import { BehaviorSubject, combineLatest, Observable, Subscription } from 'rxjs';
import { map, switchMap } from 'rxjs/operators';
import _ from 'lodash';
import { pipeFromArray } from 'rxjs/internal/util/pipe';
import { interpolateSource } from './interpolateSource';
import { transformationPipeFactory } from './transformationPipeFactory';


/**
 * Value carried on a note's output stream.
 */
export type Unit = {
    /**
     * Optional list of errors attached to the value.
     * Declared as an open array: the previous `[]` was the empty-tuple type,
     * which only ever accepted an empty array. `any[]` keeps every existing
     * reader and writer compiling.
     * NOTE(review): nothing in this file populates it; narrow the element
     * type once the producers' error shape is confirmed.
     */
    errors?: any[];
    /** The payload; its shape depends on the note's transformations. */
    data: any;
}

/**
 * Placeholder (`undefined`) used as the initial value of a note's output
 * subject in `EventBus.init`, i.e. the value observers see before the
 * pipeline has emitted anything.
 * NOTE(review): assigning `undefined` to `Unit` only type-checks with
 * `strictNullChecks` off — confirm the project's compiler settings.
 */
export const NeverResolved: Unit = undefined;

/**
 * Everything the EventBus keeps for one registered note.
 */
export type NoteStream = {
    // Identifier used by EventBus to look the stream up.
    id: string,
    // Optional name; overwritten by EventBus.update.
    name?: string,
    // Source of the note: seeded with `note.source` in EventBus.init and fed by EventBus.pushValue.
    input: BehaviorSubject<string>,
    // Live subscription feeding `output`; replaced on EventBus.update, unsubscribed on EventBus.remove.
    subscription: Subscription,
    // Processed result, exposed through EventBus.observable; starts as NeverResolved.
    output: BehaviorSubject<Unit>,
}

/**
 * Registry of note streams.
 *
 * Every registered note owns an `input` subject (its source), an `output`
 * subject (the processed {@link Unit}) and a subscription that feeds the
 * output from the input via interpolation and the note's transformations.
 */
export class EventBus {

    /** All registered note streams; a new array is emitted when notes are added or removed. */
    private streams = new BehaviorSubject<NoteStream[]>([]);

    /**
     * Returns the output subject of the note with the given id.
     *
     * @throws Error if no note with that id is registered.
     */
    public observable(id: string): BehaviorSubject<Unit> {
        return this.getStream(id).output;
    }

    /**
     * Looks up a registered stream by id.
     *
     * @throws Error if no note with that id is registered (previously this
     *         surfaced as an opaque property access on `undefined`).
     */
    private getStream(id: string): NoteStream {
        // Loose equality is kept from the original lookup so ids arriving
        // from untyped callers (e.g. `pushValue`) still match as before.
        const stream = this.streams.value.find(candidate => candidate.id == id);
        if (stream === undefined) {
            throw new Error(`EventBus: no stream registered for id "${id}"`);
        }
        return stream;
    }

    /**
     * Wires `input` to `output`.
     *
     * The pipeline re-runs whenever either `input` or the stream registry
     * emits: the source is interpolated against the current streams, wrapped
     * as a {@link Unit}, then passed through the note's transformations.
     * `output` is handed to `subscribe` as the observer, so it receives the
     * pipeline's error/complete notifications as well as its values.
     *
     * @returns the subscription; unsubscribe it to stop feeding `output`.
     */
    private createSubscription(input: Observable<string>,
        transformations: Transformation[],
        output: BehaviorSubject<Unit>): Subscription {

        const transformationPipe = transformationPipeFactory(this.streams);

        return combineLatest(input, this.streams).pipe(
            switchMap(([source, streams]) => interpolateSource(source, streams)),
            map((interpolatedSource): Unit => ({ data: interpolatedSource })),
            pipeFromArray(transformationPipe(transformations))
        )
        .subscribe(output);
    }

    /**
     * Renames an existing note and rebuilds its pipeline with the given
     * transformations. The note's `input` and `output` subjects are reused,
     * so existing observers stay attached.
     *
     * @throws Error if no note with `note.id` is registered.
     */
    update(note: { id: string, name?: string, transformations?: Transformation[] }): void {

        const noteStream = this.getStream(note.id);
        // NOTE(review): the name is changed in place without emitting on
        // `streams`, so other notes' pipelines are not re-run for a rename —
        // confirm this is intended.
        noteStream.name = note.name;
        noteStream.subscription.unsubscribe();
        noteStream.subscription = this.createSubscription(noteStream.input, note.transformations, noteStream.output);
    }

    /**
     * Registers a new note: creates its input (seeded with `note.source`) and
     * output (seeded with `NeverResolved`), connects them, and publishes the
     * extended stream list.
     */
    init(note: { id: string, name?: string, transformations?: Transformation[], source?: any }): void {

        const input = new BehaviorSubject<string>(note.source);
        const output = new BehaviorSubject<Unit>(NeverResolved);

        const subscription = this.createSubscription(input, note.transformations, output);

        const noteStream: NoteStream = {
            id: note.id,
            name: note.name,
            input,
            subscription,
            output,
        };

        this.streams.next([...this.streams.value, noteStream]);
    }

    /**
     * Pushes a new source value into the input of the note with the given id.
     *
     * @throws Error if no note with that id is registered.
     */
    pushValue({ id, source }): void {

        this.getStream(id).input.next(source);
    }

    /**
     * Unregisters the notes with the given ids. Their subscriptions are
     * unsubscribed and the remaining streams are published. Ids that are not
     * registered are ignored.
     */
    remove(ids: string[]): void {
        // A Set avoids the inherited-key false positives of a plain-object
        // lookup (an id such as "constructor" would otherwise always match).
        const doomed = new Set<string>(ids);

        // Work on the live stream objects. Deep-cloning the registry here
        // would unsubscribe copies (leaking the real subscriptions) and would
        // swap the surviving streams for copies detached from the subjects
        // that callers already hold.
        const [removed, kept] = _.partition(this.streams.value, stream => doomed.has(stream.id));

        removed.forEach(stream => stream.subscription.unsubscribe());
        this.streams.next(kept);
    }
}
