class Observable {
constructor(producer) {
this.subscribe = producer
}
}
Observable.prototype.of = function (...values) {
const producer = observer => {
values.forEach(v => observer.next(v));
observer.complete();
}
return new Observable(producer)
}
Observable.prototype.delay = function (time) {
const producer = observer => {
return this.subscribe({
next(value) {
setTimeout(() => observer.next(value), time)
},
complete() {
setTimeout(() => observer.complete(), time)
}
})
};
return new Observable(producer);
}
Observable.prototype.combineLatest = function (...observables) {
const length = observables.length + 1;
const producer = outObserver => {
const values = [...Array(length)].map(_ => undefined);
const hasValue = [...Array(length)].map(_ => false);
const hasComplete = [...Array(length)].map(_ => false);
const next = (x, index) => {
values[index] = x;
hasValue[index] = true;
if (hasValue.every(x => x === true)) outObserver.next(values);
};
const complete = (index) => {
hasComplete[index] = true;
if (hasComplete.every(x => x === true)) outObserver.complete();
};
observables.forEach((observable, index) => {
observable.subscribe({
next: (x) => next(x, index + 1),
complete: () => complete(index + 1),
});
});
this.subscribe({
next: (x) => next(x, 0),
complete: () => complete(0),
});
};
return new Observable(producer);
}
const observer = {
next(value) { console.log(value) },
complete() { console.log('Done') }
}
new Observable()
.of(0)
.combineLatest(
new Observable().of(1, 4),
new Observable().of(2),
new Observable().of(3).delay(1000),
)
.subscribe(observer);