untitled notebook

endpointsharetweet
class Observable { constructor(producer) { this.subscribe = producer } } Observable.prototype.of = function (...values) { const producer = observer => { values.forEach(v => observer.next(v)); observer.complete(); } return new Observable(producer) } Observable.prototype.delay = function (time) { const producer = observer => { return this.subscribe({ next(value) { setTimeout(() => observer.next(value), time) }, complete() { setTimeout(() => observer.complete(), time) } }) }; return new Observable(producer); } Observable.prototype.combineLatest = function (...observables) { const length = observables.length + 1; const producer = outObserver => { const values = [...Array(length)].map(_ => undefined); const hasValue = [...Array(length)].map(_ => false); const hasComplete = [...Array(length)].map(_ => false); const next = (x, index) => { values[index] = x; hasValue[index] = true; if (hasValue.every(x => x === true)) outObserver.next(values); }; const complete = (index) => { hasComplete[index] = true; if (hasComplete.every(x => x === true)) outObserver.complete(); }; observables.forEach((observable, index) => { observable.subscribe({ next: (x) => next(x, index + 1), complete: () => complete(index + 1), }); }); this.subscribe({ next: (x) => next(x, 0), complete: () => complete(0), }); }; return new Observable(producer); } const observer = { next(value) { console.log(value) }, complete() { console.log('Done') } } new Observable() .of(0) .combineLatest( new Observable().of(1, 4), new Observable().of(2), new Observable().of(3).delay(1000), ) .subscribe(observer);

no comments

    sign in to comment

    running