import {Injectable, Type} from '@angular/core';
import * as Logger from 'js-logger';
import {merge, Observable, Subject} from 'rxjs';
import {filter, finalize, first, map, share, takeUntil, tap} from 'rxjs/operators';
import {SyncServiceBase} from '../sync/sync.service.base';
import {
    ConnectionCloseEvent,
    ConnectionEvent,
    ConnectionOpenEvent,
    DataReceivedEvent,
    DeckStatus,
    OvenClient,
    OvenEvent,
} from './oven-data-types';
import {WachtelClient} from './wachtel-client';

// Named js-logger instance used throughout this module for connection lifecycle and missing-deck messages.
const logger = Logger.get('oven-data-manager');

/**
 * Thrown when a deck configuration references an oven type
 * for which no client implementation is registered.
 */
export class UnsupportedTypeError extends Error {
    constructor(message?: string) {
        super(message);
        // When compiled to ES5, `super()` returns a plain Error and the subclass prototype is lost,
        // which breaks `instanceof UnsupportedTypeError`. Restoring it is a no-op on ES2015+ targets.
        Object.setPrototypeOf(this, new.target.prototype);
        // Make logs and `toString()` show the concrete error type instead of the generic "Error".
        this.name = 'UnsupportedTypeError';
    }
}

/**
 * Thrown when a requested entity (e.g. a deck with a given id) does not exist.
 */
export class NotFoundError extends Error {
    constructor(message?: string) {
        super(message);
        // When compiled to ES5, `super()` returns a plain Error and the subclass prototype is lost,
        // which breaks `instanceof NotFoundError`. Restoring it is a no-op on ES2015+ targets.
        Object.setPrototypeOf(this, new.target.prototype);
        // Make logs and `toString()` show the concrete error type instead of the generic "Error".
        this.name = 'NotFoundError';
    }
}

/**
 * Provides per-deck status streams on top of shared oven connections.
 *
 * Connections are keyed by the host address of the deck configuration, so every deck
 * configured with the same address reuses one client connection.
 */
@Injectable()
export class OvenConnectionManagerService {
    /** Maps the oven type name of a deck configuration to the client class that talks to it. */
    private static readonly ovenTypes = new Map<string, Type<OvenClient>>([
        ['WACHTEL', WachtelClient],
    ]);

    /** Shared event streams of the known connections, keyed by host address. */
    private readonly connections = new Map<string, Observable<OvenEvent>>();

    constructor(private sync: SyncServiceBase) {
    }

    /**
     * Streams the status of a single deck.
     *
     * The deck configuration is looked up by id in the current sync data. The returned stream
     * emits the entry at position `deck_number - 1` of every received data event and skips
     * events that carry no entry for that position.
     *
     * @param deckId Id of the deck configuration in the sync data.
     * @return Stream of deck statuses; subscribing opens (or joins) the connection to the deck's host.
     * @throws NotFoundError If the sync data contains no deck with the given id.
     * @throws UnsupportedTypeError If no connection for the host exists yet and the configured oven type is unknown.
     */
    getDeckStatus$(deckId: number): Observable<DeckStatus> {
        const deckConfig = (this.sync.syncData || {oven_decks: []}).oven_decks.find(d => d.id === deckId);
        if (deckConfig === undefined) {
            throw new NotFoundError(`Deck with id ${deckId} not found.`);
        }

        const deck$ = this.getOvenConnection(deckConfig)
            .pipe(
                filter((e): e is DataReceivedEvent => e instanceof DataReceivedEvent),
                map((e: DataReceivedEvent) => e.data.decks[deckConfig.deck_number - 1]),
            );

        // Log this only at first receive.
        const missingDeck$ = deck$.pipe(
            first(),
            filter(deck => deck === undefined),
            tap(() => logger.warn(`There is no deck in status for deck number ${deckConfig.deck_number} of deck with id ${deckId}.`)),
        );

        // Merge missingDeck$, although its single event will be filtered out,
        // because we want to inject logging without affecting teardown of deck$ and its parents.
        // Subscribing to deck$ separately instead would keep the connection open
        // if no data is received before the external unsubscribe.
        return merge(deck$, missingDeck$).pipe(filter(deck => deck !== undefined));
    }

    /**
     * Returns the shared event stream for the host of the given deck configuration,
     * creating and caching it on first request.
     *
     * @param deckConfig Deck configuration; `address` and `type` are read here,
     *                   the remaining fields are only used for log messages.
     * @return A shared stream of OvenEvents (connection status and data receive)
     *         which holds only one connection to each host for all subscribers.
     *         The connection is automatically opened on the first subscribe and closed
     *         when the last subscriber unsubscribes.
     *         Errors of the client's event stream are forwarded to all subscribers.
     * @throws UnsupportedTypeError If no client class is registered for `deckConfig.type`.
     */
    private getOvenConnection(deckConfig: any): Observable<OvenEvent> {
        const host = deckConfig.address;

        // If we want one connection per oven/deck instance, we need an id-based key.
        let connection = this.connections.get(host);

        if (connection !== undefined) {
            return connection;
        }

        // Resolve the client class eagerly: don't throw exceptions in the subscribe callback!
        const clientClass = OvenConnectionManagerService.getOvenClientClass(deckConfig.type);

        // One connection per host and automatic disconnect from client on last unsubscribe.
        connection = new Observable<OvenEvent>(subscriber => {
            logger.debug('Creating oven client to host: ' + host);
            const client = new clientClass(host);
            client.connect();

            // Typed as void so that the argument-less next() below is valid on every RxJS version.
            const teardown = new Subject<void>();
            const untilTeardownEvents = client.events.pipe(takeUntil(teardown));

            untilTeardownEvents.subscribe({
                next: event => subscriber.next(event),
                // Forward failures instead of leaving subscribers attached to a dead stream.
                // Erroring the subscriber also runs the teardown below (disconnect + cache cleanup).
                error: error => subscriber.error(error),
                // Completion is intentionally not forwarded: the shared stream only ends by unsubscribe.
            });
            OvenConnectionManagerService.logConnectionEvents(untilTeardownEvents, deckConfig);

            return () => {
                // Stop listening before disconnecting so our own disconnect is not logged as an interruption.
                teardown.next();
                teardown.complete();
                client.disconnect();
                this.connections.delete(host);
            };
        }).pipe(share());
        this.connections.set(host, connection);

        return connection;
    }

    /**
     * Subscribes to the connection events of the given stream and logs them:
     * open as "established", close as "interrupted" and the end of the stream as "terminated".
     *
     * @param untilTeardownEvents Client events which stop as soon as the connection is torn down.
     * @param deckConfig Deck configuration used to build the log message prefix.
     */
    private static logConnectionEvents(untilTeardownEvents: Observable<OvenEvent>, deckConfig: any): void {
        const id = deckConfig.address_entity === 'Oven' ? deckConfig.oven_id : deckConfig.id;
        const prefix = `Connection of ${deckConfig.address_entity} with id ${id} to host ${deckConfig.address}`;

        untilTeardownEvents
            .pipe(
                filter(e => e instanceof ConnectionEvent),
                finalize(() => logger.info(prefix + ' terminated.')),
            )
            .subscribe(connectionEvent => {
                if (connectionEvent instanceof ConnectionOpenEvent) {
                    logger.info(prefix + ' established.');
                }
                if (connectionEvent instanceof ConnectionCloseEvent) {
                    // Because we unsubscribe before disconnect, close is here always an interruption.
                    logger.warn(prefix + ' interrupted.');
                }
            })
        ;
    }

    /**
     * Looks up the client class registered for an oven type.
     *
     * @param ovenType Oven type name as used in the deck configuration.
     * @return The client class to instantiate for this oven type.
     * @throws UnsupportedTypeError If the type is not registered.
     */
    private static getOvenClientClass(ovenType: string): Type<OvenClient> {
        const type = OvenConnectionManagerService.ovenTypes.get(ovenType);

        if (type === undefined) {
            throw new UnsupportedTypeError(`Type of oven ${ovenType} is not supported`);
        }

        return type;
    }
}
