import * as Logger from 'js-logger';
import {ConnectableObservable, merge, Observable, of, timer, Timestamp} from 'rxjs';
import {ajax} from 'rxjs/ajax';
import {
    catchError,
    delay,
    distinctUntilChanged,
    exhaustMap,
    filter,
    first,
    map,
    mapTo,
    publish,
    scan,
    share,
    tap,
    timestamp,
    withLatestFrom,
} from 'rxjs/operators';

// Named logger ('washtec-client') used for all diagnostic output in this file.
const logger = Logger.get('washtec-client');

/**
 * Status object of a car wash as processed by {@link WashtecClient}.
 *
 * Only `seconds` is read by the code in this file; the built-in emulator additionally sets `name`.
 * NOTE(review): presumably this mirrors the JSON payload of the `remainingWashingTime` REST command -
 * confirm the remaining fields against the device documentation.
 */
export interface WashtecStatus {
    // Not read in this file. NOTE(review): meaning of the values is not visible here - confirm.
    status?: string;
    // Remaining washing time in seconds; greater than or equal to 0.
    seconds: number;
    // Not read in this file. NOTE(review): presumably a progress percentage ("procent" is the field name as declared) - confirm.
    procent?: number;
    // Not read in this file. NOTE(review): semantics unknown from this file - confirm.
    reload?: number;
    // Display name of the car wash; the emulator in this file sets it to 'Car wash emulator'.
    name?: string;
}

/**
 * Polls a car wash over HTTP for its remaining washing time and exposes the result
 * as an observable that counts down locally between two polls.
 */
export class WashtecClient {
    /** Time between the start of two status polls, in milliseconds. */
    private static readonly POLL_INTERVAL = 10 * 1000;
    /**
     * Maximum tolerated deviation (in milliseconds) between the locally extrapolated remaining time
     * and a freshly received one. Within this tolerance the old reference is kept, which avoids jumps.
     */
    private static readonly TIME_JITTER_THRESHOLD = 1000;

    /** One-second tick shared by all observables returned from getTime$, so that they emit in sync. */
    private static readonly seconds$ = timer(0, 1000).pipe(share());

    /**
     * Creates an observable of the remaining washing time of the car wash at the given host.
     *
     * The host is polled every POLL_INTERVAL milliseconds. Between two polls the remaining time is
     * extrapolated from the last accepted value and its reception timestamp.
     * A request that fails, or that delivers a response without a usable `seconds` value,
     * counts as a failed poll.
     *
     * @param host Host name or address (optionally with port) that is inserted into the request URL.
     * @return An observable that emits every second remaining time or undefined, but only on change.
     *         If getTime$ is called several times, all observables are emitting synchronized.
     *         If interrupted connection is detected, the observable emits undefined.
     *         Emission starts on first successful request to host or when interruption is detected.
     *         Note that `undefined` is emitted although the declared element type is `number`.
     */
    static getTime$(host: string): Observable<number> {
        const http$ = ajax(`http://${host}/rest/SoftCareRest.cgi?cmd=remainingWashingTime`).pipe(map(r => r.response));

        logger.debug(`Creating washtec client to host: ${host}`);
        const status$ = timer(0, WashtecClient.POLL_INTERVAL).pipe(
            // exhaustMap drops poll ticks while a previous request is still pending.
            exhaustMap(() => {
                logger.debug(`Requesting status from: ${host}`);
                return (WashtecClient.EMULATE_WASH_LINK ? WashtecClient.getHttpStatusEmulator() : http$)
                    .pipe(
                        tap(status => {
                            // A successful request can still deliver an unusable body (e.g. `null` for an empty or
                            // non-JSON response). Reject it here so that it is handled like a failed request below
                            // instead of throwing later when `seconds` is read, which would terminate the stream.
                            // Number() keeps numeric strings acceptable.
                            if (!status || !isFinite(Number(status.seconds))) {
                                throw new Error('Malformed status response');
                            }
                        }),
                        // A failed poll is represented by `undefined`; the polling itself keeps running.
                        catchError(error => {
                            logger.debug(`Status request for ${host} failed: ${error.message}: ${error.xhr ? error.xhr.errorString : ''}`);
                            return of(undefined);
                        }),
                    );
            }),
            share(),
        );

        // Emits false if once received status, emits true if ${interruptionThreshold} times status request failed (undefined)
        const interruptionThreshold = 3;
        const connectionInterrupted$ = status$.pipe(
            // Number of consecutive failed polls; any successful poll resets it to 0.
            scan((count, status) => status === undefined ? count + 1 : 0, 0),
            tap(c => {
                if (c % 10 === interruptionThreshold) {  // Log when connectionInterrupted is set and then every 10th time
                    logger.info(`Connection to carwash with address ${host} interrupted.`);
                }
            }),
            map(c => c >= interruptionThreshold),
        );

        // Reference point (remaining seconds + reception timestamp) from which the countdown is extrapolated.
        const time$ = status$.pipe(
            filter(s => s !== undefined),
            map(s => s.seconds),
            tap(t => logger.debug(`Received new time ${t} from ${host}`)),
            timestamp(),
            // No seed: the first received value becomes the initial reference and is emitted as is.
            scan((ref: Timestamp<number>, act: Timestamp<number>) => {
                // Remaining time (ms) the current reference predicts for the moment the new value arrived.
                const timeRef = Math.max(0, ref.value * 1000 - (act.timestamp - ref.timestamp));

                // Keep the old reference while the new value agrees with the prediction.
                if (Math.abs(timeRef - act.value * 1000) <= WashtecClient.TIME_JITTER_THRESHOLD) {
                    return ref;
                }

                logger.debug(`Time deviation larger than threshold, continuing with actual value for: ${host}`);
                return act;
            }),
        );

        // Emits remaining time or undefined if connection is interrupted every second synchronously to other clients if it changes.
        return WashtecClient.seconds$.pipe(
            withLatestFrom(connectionInterrupted$),
            map(([, connectionInterrupted]) => connectionInterrupted),
            branch(
                connectionInterrupted => connectionInterrupted,
                interrupted => interrupted.pipe(mapTo(undefined)),
                connected => connected.pipe(
                    timestamp(),
                    withLatestFrom(time$),
                    // Reference seconds minus the whole seconds elapsed since the reference was received, never below 0.
                    map(([tick, time]) => Math.max(0, time.value - Math.trunc((tick.timestamp - time.timestamp) / 1000))),
                ),
            ),
            distinctUntilChanged(),  // Do not emit 0,0,0... and undefined,undefined,undefined...
        );
    }

    // For testing purpose only.
    // tslint:disable:member-ordering
    /** When true, getTime$ uses the built-in emulator instead of real HTTP requests. */
    private static readonly EMULATE_WASH_LINK = false;

    /**
     * Emulated car wash status, emitted every 950 ms once connected.
     * Within each cycle of 90 emissions `seconds` counts down from 75 to 0 and then stays at 0 until the cycle restarts.
     */
    private static readonly statusEmulator$ = timer(0, 950).pipe(
        map((i): WashtecStatus => ({
            seconds: Math.max(0, 75 - i % 90),
            name: 'Car wash emulator',
        })),
        publish(),
    );

    /** Connects the published emulator (calling connect() on every access) and returns it. */
    private static getStatusEmulator$(): Observable<WashtecStatus> {
        (WashtecClient.statusEmulator$ as ConnectableObservable<WashtecStatus>).connect();
        return WashtecClient.statusEmulator$;
    }

    /**
     * Emulates a single HTTP status request: delays emulator values by 500 ms, fails with Error('500')
     * while the emulated remaining time is in the range (30, 50], otherwise emits one status and completes.
     */
    private static getHttpStatusEmulator(): Observable<WashtecStatus> {
        return WashtecClient.getStatusEmulator$().pipe(
            delay(500),
            tap(s => {
                if (s.seconds > 30 && s.seconds <= 50) {
                    throw new Error('500');
                }
            }),
            first(),  // Emit once and complete like a http request.
        );
    }
}

/**
 * Custom pipeable operator that splits the source observable into two observables by a selector,
 * lets each of them be piped by an injector callback and merges the results back into one observable.
 *
 * The selector is evaluated exactly once per source value, so every value is routed to exactly one
 * branch even if the selector is stateful or otherwise impure.
 *
 * If source observable emits multiple events on one tick, the order might be changed in resulting observable.
 *
 * @param selector Decides per value (and its zero-based index) which branch receives it: truthy -> injectorTrue.
 * @param injectorTrue Receives the observable of values the selector accepted and returns the piped observable.
 * @param injectorFalse Receives the observable of values the selector rejected; defaults to passing them through unchanged.
 * @return An operator function whose result merges the observables returned by both injectors.
 */
function branch<T>(selector: (value: T, index: number) => boolean,
                   injectorTrue: (observable: Observable<T>) => Observable<any>,
                   injectorFalse: (observable: Observable<T>) => Observable<any> = o => o): (source: Observable<T>) => Observable<any> {
    return (source: Observable<T>) => {
        // Classify upstream of share(): both branches then filter on the same stored verdict
        // instead of each calling the selector again.
        const classified = source.pipe(
            map((value: T, index: number) => ({value, selected: selector(value, index)})),
            share(),
        );
        const filteredTrue = classified.pipe(
            filter(entry => !!entry.selected),
            map(entry => entry.value),
        );
        const filteredFalse = classified.pipe(
            filter(entry => !entry.selected),
            map(entry => entry.value),
        );
        return merge(injectorTrue(filteredTrue), injectorFalse(filteredFalse));
    };
}
