import { map, filter } from 'rxjs/operators';
import { Observable, fromEvent } from 'rxjs';
import { EventEmitter } from 'events';

/**
 * A class which provides a rate-limited observable based on a source
 * observable, through the use of a bounded buffer.
 *
 * Items from the source are queued and handed out one at a time, with more
 * than `minimumInterval` milliseconds between consecutive emissions. Items
 * that arrive while the buffer already holds `maxBufferItems` entries are
 * dropped.
 *
 * This has been assembled for use by the toast popup. If it is found useful,
 * we might want to make it generic and put it in a common utils package.
 *
 * The source is subscribed to in the constructor; call `dispose()` when the
 * instance is no longer needed to release that subscription and any pending
 * timer.
 *
 * NOTE(review): every subscription to `observable` attaches its own listener
 * to the internal trigger and takes items from the same shared buffer, so
 * this looks designed for a single subscriber - confirm before sharing it.
 * Items are only handed out when the trigger fires (a new source item or the
 * scheduled timer), so items buffered while nobody is subscribed remain in
 * the buffer until the next trigger.
 */
export class ObservableRateLimited<T> {

    /** The rate-limited output observable. */
    public observable: Observable<T>;

    /**
     * Emits a boolean every time the internal emission trigger fires: `true`
     * when the buffer holds at least `maxBufferItems` items at that moment,
     * `false` otherwise.
     *
     * NOTE(review): Node's EventEmitter calls listeners in registration
     * order, so if `observable` was subscribed before `overflow`, one item
     * has already been removed from the buffer when this value is computed.
     */
    public overflow: Observable<boolean>;

    // Internal trigger: an "EMIT" event asks the output pipeline to hand out
    // the next buffered item.
    private readonly itemTriggerEmitter: EventEmitter = new EventEmitter();

    // Interval between emissions, in milliseconds. The 5 second default is
    // kept when the constructor is given a non-finite interval.
    private minInterval = 5000;

    // Items received from the source and not yet re-emitted.
    private itemBuffer: Array<T> = [];

    // Timestamp (ms since epoch) of the last re-emitted item; 0 means none yet.
    private itemEmittedLastTime = 0;

    // Handle of the pending timer that will re-check the interval, or null
    // when no timer is pending.
    private scheduledEmission: ReturnType<typeof setTimeout> | null = null;

    // Subscription to the source observable, kept so dispose() can release it.
    private readonly sourceSubscription: { unsubscribe(): void };

    /**
     * @param sourceObservable The observable whose items are to be rate limited.
     * @param minimumInterval Minimum time in milliseconds between emissions.
     *        A non-finite value (NaN, Infinity, undefined from untyped
     *        callers) is ignored and the 5000 ms default is used instead.
     * @param maxBufferItems Maximum number of items held in the buffer; items
     *        arriving while the buffer is full are dropped.
     */
    constructor(sourceObservable: Observable<T>, minimumInterval: number, maxBufferItems: number) {

        // Set the minimum time between emissions. A non-finite interval would
        // make the elapsed-time comparison always false and the timer delay
        // NaN, so fall back to the default in that case.
        if (isFinite(minimumInterval)) {
            this.minInterval = minimumInterval;
        }

        // Listen to the source observable and place any emitted item into the
        // buffer, then emit it as soon as it is safe to do so.
        this.sourceSubscription = sourceObservable.subscribe((item: T) => {

            // Only add while there is room in the buffer.
            if (this.itemBuffer.length < maxBufferItems) {
                this.itemBuffer.push(item);
            }

            this.bufferEmitItem();
        });

        // Output pipeline driven by the "EMIT" trigger. The trigger carries no
        // payload; it is ignored when the buffer is empty, otherwise the
        // oldest item is removed and sent out.
        this.observable = fromEvent(this.itemTriggerEmitter, "EMIT").pipe(
            filter(() => this.itemBuffer.length > 0),
            map((): T => {
                // The filter above guarantees the buffer is non-empty here.
                const item = this.itemBuffer.shift() as T;
                this.itemEmittedLastTime = Date.now();

                // Any pending timer was aimed at the previous emission time;
                // cancel it and schedule the next check from now.
                this.clearScheduledEmission();
                this.bufferEmitItem();
                return item;
            }));

        // Sends out true / false on every trigger: true if the buffer is
        // full at that moment.
        this.overflow = fromEvent(this.itemTriggerEmitter, "EMIT").pipe(
            map((): boolean => this.itemBuffer.length >= maxBufferItems));
    }

    /**
     * Releases the resources held by this instance: unsubscribes from the
     * source observable, cancels any pending timer and discards items still
     * waiting in the buffer. After this call no further triggers are fired,
     * so `observable` and `overflow` emit nothing more.
     */
    public dispose(): void {
        this.sourceSubscription.unsubscribe();
        this.clearScheduledEmission();
        this.itemBuffer = [];
    }

    /**
     * Fires the emission trigger if more than the minimum interval has passed
     * since the last emitted item; otherwise makes sure a timer is pending
     * that will run this check again once the interval is over.
     */
    private bufferEmitItem(): void {
        const elapsed = Date.now() - this.itemEmittedLastTime;

        // Has the minimum time period passed? - yes, then trigger the emit.
        if (elapsed > this.minInterval) {
            this.itemTriggerEmitter.emit("EMIT");
            return;
        }

        // Not time to emit yet: schedule a re-check just after the minimum
        // interval - unless one is scheduled already.
        if (this.scheduledEmission === null) {
            this.scheduledEmission = setTimeout(() => {
                // This timer has fired, so the handle is no longer pending.
                // Clearing it first lets the check below schedule a retry if
                // the interval has, unexpectedly, still not elapsed.
                this.scheduledEmission = null;
                this.bufferEmitItem();
            }, this.minInterval - elapsed + 1);
        }
    }

    /**
     * Cancels the pending re-check timer, if there is one.
     */
    private clearScheduledEmission(): void {
        if (this.scheduledEmission !== null) {
            clearTimeout(this.scheduledEmission);
            this.scheduledEmission = null;
        }
    }
}
