import * as Sentry from '@sentry/minimal';
import {Severity} from '@sentry/types';
import {Observable, Subject, Subscription} from 'rxjs';
import {filter, map} from 'rxjs/operators';

import {BaseTransport} from './transport';
import {
    SMPCloseEvent,
    SMPDisconnectedException,
    SMPEndpointAlreadyRegisteredException,
    SMPEvent,
    SMPException,
    SMPMessage,
    SMPMessageEvent,
    SMPMessageType,
    SMPOpenEvent,
    SMPSendNotConnectedException,
} from './types';

// Reexport some things for easier usage
export {WebsocketTransport} from './transport';
export {
    SMPCloseEvent,
    SMPDisconnectedException,
    SMPEndpointAlreadyRegisteredException,
    SMPEvent,
    SMPException,
    SMPMessage,
    SMPMessageEvent,
    SMPMessageType,
    SMPOpenEvent,
    SMPSendNotConnectedException,
} from './types';

/**
 * Callback invoked with the payload of every event received on a subscribed topic.
 *
 * SMPConnection calls the handler with the message only and does not use its return value.
 */
export type TopicHandler = (message: any) => any;

/**
 * A request that has been queued by SMPConnection.getScheduledRequest() and is sent
 * to the transport from a timer callback.
 */
interface ScheduledRequest {
    // Message type handed to transport.execute() as its first argument
    type: SMPMessageType;
    // Arguments spread after the type; callers may keep appending until the request is sent
    args: Array<any>;
    // Number of failed attempts so far; used to stop retrying after a few failures
    retryCount: number;
}

/**
 * Per-topic bookkeeping for SMPConnection.publishIfSubscribed().
 */
interface TopicStateContainer {
    // True while the last 'meta.topic_changed.<topic>' event received was 'CREATED'
    alive: boolean;
    // Most recent value passed to publishIfSubscribed() for this topic
    lastPublishedValue?: any;
    // Subscription to the 'meta.topic_changed.<topic>' stream that maintains `alive`
    subscription?: Subscription;
}

/**
 * Client-side connection for the Shop-IQ Messaging Protocol (SMP).
 *
 * Wraps a transport and layers the following on top of it:
 *  - authentication with a token from `tokenProvider` every time the transport opens,
 *  - callable endpoints (`register` / `call`),
 *  - topic based messaging (`subscribe`, `subscribe$`, `unsubscribe`, `publish`, `publishIfSubscribed`),
 *  - re-subscription and re-registration every time the transport (re)opens.
 */
export class SMPConnection {
    // Delay in milliseconds before a failed scheduled request is retried
    private static readonly REQUEST_RETRY_DELAY = 5 * 1000;

    private eventsSubject = new Subject<SMPEvent>();

    // True only while the currently open transport session has completed the AUTH request.
    // It is cleared on every close and at the start of every open, so that `connected`
    // never reports a previous session's authentication.
    private authenticated = false;

    // state management for high level features such as subscribe and register
    private registrations = new Map<string, Function>();
    private subscriptions = new Map<string, Array<TopicHandler>>();

    // Requests that are scheduled but not yet executed (scheduled using setTimeout(...,0))
    private scheduledRequests: Array<ScheduledRequest> = [];

    // State information for the publishIfSubscribed(...) method
    // which only sends published message if the topic has subscribers
    private topicStateContainers = new Map<string, TopicStateContainer>();

    /**
     * Stream of connection level events: an SMPOpenEvent once a session is authenticated
     * and every SMPCloseEvent emitted by the transport.
     */
    get events(): Observable<SMPEvent> {
        return this.eventsSubject.asObservable();
    }

    /**
     * True when the transport reports being connected and the current session is authenticated.
     */
    get connected(): boolean {
        return this.transport.connected && this.authenticated;
    }

    /**
     * The transport's `bufferedAmount`, passed through unchanged.
     */
    get bufferedAmount(): number {
        return this.transport.bufferedAmount;
    }

    /**
     * @param transport the transport used to exchange messages
     * @param tokenProvider called on every open to obtain the token sent with the AUTH request
     */
    constructor(private transport: BaseTransport,
                private tokenProvider: () => Promise<string>) {
        // Register onOpen()
        this.transport.events.pipe(
            filter((event: SMPEvent): event is SMPOpenEvent => event instanceof SMPOpenEvent),
        ).subscribe(this.onOpen.bind(this));

        // Pass SMPCloseEvent() through to our own events. The session is gone at this point,
        // so the authentication flag is dropped before listeners are notified.
        this.transport.events.pipe(
            filter((event: SMPEvent): event is SMPCloseEvent => event instanceof SMPCloseEvent),
        ).subscribe({
            next: (event: SMPCloseEvent) => {
                this.authenticated = false;
                this.eventsSubject.next(event);
            },
            error: (err: any) => this.eventsSubject.error(err),
            complete: () => this.eventsSubject.complete(),
        });

        // Subscribe onMessage() to unpacked SMPMessageEvents
        this.transport.events.pipe(
            filter((event: SMPEvent): event is SMPMessageEvent => event instanceof SMPMessageEvent),
            map(event => event.message),
        ).subscribe(this.onMessage.bind(this));
    }

    /**
     * Asks the transport to connect.
     */
    connect(): void {
        this.transport.connect();
    }

    /**
     * Asks the transport to close.
     */
    close(): void {
        this.transport.close();
    }

    /**
     * Register a handler for incoming calls to an endpoint.
     *
     * The handler is invoked with the arguments of the call (and `this` set to this connection).
     * Its return value - or the value its returned promise resolves to - is sent back as the result;
     * anything it throws is sent back as an error.
     *
     * @param endpoint the name of the endpoint
     * @param handler the function answering calls to the endpoint
     * @throws SMPEndpointAlreadyRegisteredException if a handler is already registered for the endpoint
     */
    register(endpoint: string, handler: Function): void {
        if (this.registrations.has(endpoint)) {
            throw new SMPEndpointAlreadyRegisteredException();
        }

        this.registrations.set(endpoint, handler);

        // Only create a request if we are connected atm, otherwise onOpen() will register us
        if (this.connected) {
            // Register cannot be batched (yet?), so we always create a new request
            const request = this.getScheduledRequest(SMPMessageType.REGISTER, true);
            request.args = [endpoint];
        }
    }

    /**
     * Subscribe a handler to a topic.
     *
     * Limitations: If this connection is already subscribed to topic the parameter replayLastValue will be ignored!
     *
     * @param topic the topic to subscribe to
     * @param handler a handler callback, which will be called with a single parameter (the received message)
     * @param replayLastValue whether the server will send us the last value published before we subscribed
     */
    subscribe(topic: string, handler: TopicHandler, replayLastValue: boolean = false): void {
        // We only need to send the subscribe command on the first subscription of a topic
        const handlers = this.subscriptions.get(topic);
        if (handlers !== undefined) {
            handlers.push(handler);
            return;
        }

        this.subscriptions.set(topic, [handler]);

        // Only create a request if we are connected atm, otherwise onOpen() will subscribe us
        if (this.connected) {
            const type = replayLastValue ? SMPMessageType.SUBSCRIBE_REPLAY : SMPMessageType.SUBSCRIBE;
            const request = this.getScheduledRequest(type);
            request.args.push(topic);
        }
    }

    /**
     * Observable flavour of subscribe(): every subscription to the returned observable subscribes
     * to the topic, and unsubscribing from the observable unsubscribes from the topic again.
     *
     * @param topic the topic to subscribe to
     * @param replayLastValue see subscribe()
     */
    subscribe$(topic: string, replayLastValue: boolean = false): Observable<any> {
        return new Observable(subscriber => {
            const handler = subscriber.next.bind(subscriber);
            this.subscribe(topic, handler, replayLastValue);

            return () => {
                this.unsubscribe(topic, handler);
            };
        });
    }

    /**
     * Remove a handler from a topic. Does nothing if the topic is not subscribed.
     *
     * When the last handler of a topic is removed the topic itself is unsubscribed.
     *
     * @param topic the topic the handler was subscribed to
     * @param handler the handler passed to subscribe()
     */
    unsubscribe(topic: string, handler: TopicHandler): void {
        const handlers = this.subscriptions.get(topic);
        if (handlers === undefined) {
            return;
        }

        // Remove handler from our handler list. A new array is stored instead of mutating the
        // old one, so that a dispatch currently iterating over the handlers is not disturbed.
        const remaining = handlers.filter(fn => fn !== handler);
        if (remaining.length > 0) {
            this.subscriptions.set(topic, remaining);
            return;
        }

        // There are no more handlers left for this topic so we unsubscribe
        this.subscriptions.delete(topic);

        // Only create a request if we are connected atm. Otherwise there is nothing to undo:
        // onOpen() subscribes only the topics that are still in our subscription map.
        if (this.connected) {
            const request = this.getScheduledRequest(SMPMessageType.UNSUBSCRIBE);
            request.args.push(topic);
        }
    }

    /**
     * Call an endpoint.
     *
     * @param endpoint the name of the endpoint to call
     * @param args the arguments passed along with the call
     * @returns whatever the transport resolves the CALL request with
     */
    async call(endpoint: string, ...args: Array<any>): Promise<any> {
        return this.transport.execute(SMPMessageType.CALL, endpoint, ...args);
    }

    /**
     * Publish a message on a topic and wait for the transport to finish the request.
     *
     * @param topic the topic to publish on
     * @param msg the message to publish
     */
    async publish(topic: string, msg: any): Promise<void> {
        await this.transport.execute(SMPMessageType.PUBLISH, topic, msg);
    }

    /**
     * Remember msg as the latest value of the topic and publish it only while the topic is
     * known to be alive (see getTopicStateContainer()).
     *
     * If the topic is not alive nothing is sent now; the remembered value is published as
     * soon as the topic is reported as created. Failures are logged, never thrown.
     *
     * @param topic the topic to publish on
     * @param msg the message to publish
     */
    publishIfSubscribed(topic: string, msg: any): void {
        const topicState = this.getTopicStateContainer(topic);
        topicState.lastPublishedValue = msg;

        if (topicState.alive) {
            // noinspection JSIgnoredPromiseFromCall
            this.publishTopicState(topic);
        }
    }

    /**
     * Publish the last remembered value of a topic. Never rejects: errors are only logged.
     */
    private async publishTopicState(topic: string): Promise<void> {
        const topicState = this.getTopicStateContainer(topic);

        try {
            await this.transport.execute(SMPMessageType.PUBLISH, topic, topicState.lastPublishedValue);
        } catch (e) {
            // String(e) instead of e.toString(): anything can be thrown, including null / undefined
            console.warn(`Exception while publishing message on topic ${topic}: ${String(e)}`);
        }
    }

    /**
     * Gets or creates the state container of a topic.
     *
     * Creating a container subscribes (with replay) to 'meta.topic_changed.<topic>'. The topic
     * counts as alive while the last value received there is 'CREATED'; when it becomes alive
     * the last remembered value is published.
     */
    private getTopicStateContainer(topic: string): TopicStateContainer {
        const existing = this.topicStateContainers.get(topic);
        if (existing !== undefined) {
            return existing;
        }

        const topicState: TopicStateContainer = {
            alive: false,
        };
        // Store the container before subscribing, so that publishTopicState() finds it
        // even if the change handler below should ever fire synchronously.
        this.topicStateContainers.set(topic, topicState);

        topicState.subscription = this
            .subscribe$('meta.topic_changed.' + topic, true)
            .subscribe(change => {
                topicState.alive = change === 'CREATED';

                if (topicState.alive) {
                    // noinspection JSIgnoredPromiseFromCall
                    this.publishTopicState(topic);
                }
            });

        return topicState;
    }

    /**
     * Gets or creates a ScheduledRequest with the specified message type.
     *
     * A new request is sent from a setTimeout(..., 0) callback. Until then callers may modify
     * its args, which is how several subscribe / unsubscribe calls end up in a single request.
     *
     * A request that fails while we are connected is retried after REQUEST_RETRY_DELAY, at most
     * 3 times. Failures while disconnected are dropped, because onOpen() restores the state.
     *
     * @param type The type of the request
     * @param createNew If true a new request is always created
     */
    private getScheduledRequest(type: SMPMessageType, createNew: boolean = false): ScheduledRequest {
        if (!createNew) {
            const pending = this.scheduledRequests.find(candidate => candidate.type === type);
            if (pending !== undefined) {
                return pending;
            }
        }

        const request: ScheduledRequest = {
            type,
            args: [],
            retryCount: 0,
        };

        const executeRequest = async () => {
            // Remove the request from the scheduled array, because we are sending it now
            this.scheduledRequests = this.scheduledRequests.filter(r => r !== request);

            try {
                await this.transport.execute(request.type, ...request.args);
            } catch (e) {
                if (e instanceof SMPDisconnectedException) {
                    // we can ignore disconnected errors, because we will retry in onOpen()
                    return;
                }

                if (!this.connected) {
                    // same as above, requests will be retried in onOpen()
                    return;
                }

                // Only retry a request up to 3 times before we stop retrying
                request.retryCount += 1;
                if (request.retryCount > 3) {
                    console.warn('Failed to execute request after 3 retries.', e, request);
                    Sentry.captureException(e);
                    return;
                }

                // Remember the retry in sentry and retry later
                Sentry.addBreadcrumb({
                    category: 'smp',
                    message: 'Retrying failed request because of exception: ' + String(e),
                    level: Severity.Warning,
                });
                setTimeout(executeRequest, SMPConnection.REQUEST_RETRY_DELAY);
            }
        };
        setTimeout(executeRequest, 0);

        this.scheduledRequests.push(request);
        return request;
    }

    /**
     * Handles an opened transport: authenticates, then restores all subscriptions and
     * registrations and finally forwards the open event to our own listeners.
     *
     * If authentication fails the transport is force-closed and the event is not forwarded.
     */
    private async onOpen(event: SMPOpenEvent): Promise<void> {
        // A freshly opened session is never authenticated. Without this reset `connected` would
        // still report a previous session's authentication while the AUTH request is pending.
        this.authenticated = false;

        // Before subscribing / registering we need to authenticate
        try {
            await this.transport.execute(SMPMessageType.AUTH, await this.tokenProvider());
            this.authenticated = true;
        } catch (e) {
            console.warn('Error while authenticating. Closing connection and retrying.');
            this.transport.forceClose();
            return;
        }

        // We can subscribe to all topics in a single request
        if (this.subscriptions.size > 0) {
            this.getScheduledRequest(SMPMessageType.SUBSCRIBE_REPLAY).args = Array.from(this.subscriptions.keys());
        }

        // We have to reregister every registration (one by one, there is not batch api)
        for (const endpoint of Array.from(this.registrations.keys())) {
            this.getScheduledRequest(SMPMessageType.REGISTER, true).args = [endpoint];
        }

        this.eventsSubject.next(event);
    }

    /**
     * Dispatches an incoming message by its type (first element of the message).
     *
     * SMPExceptions raised while answering are logged and swallowed; everything else is rethrown.
     */
    private async onMessage(message: SMPMessage): Promise<void> {
        const msgType = message[0];

        // Catch and log all expected errors to allow reconnect.
        try {
            if (msgType === SMPMessageType.CALL) {
                await this.onCallMessage(message);
                return;
            }
            if (msgType === SMPMessageType.EVENT) {
                this.onEventMessage(message);
                return;
            }
        } catch (e) {
            if (!(e instanceof SMPException)) {
                throw e;
            }
            console.warn('Error while sending response for received message:', e);
            return;
        }

        console.warn('Unhandled message type: ' + msgType);
    }

    /**
     * Answers a CALL message ([type, requestId, endpoint, ...args]) using the registered handler.
     */
    private async onCallMessage(message: SMPMessage): Promise<void> {
        const [, requestId, endpoint, ...callArgs] = message;

        const handler = this.registrations.get(endpoint);
        if (handler === undefined) {
            this.transport.send([SMPMessageType.ERROR, requestId, 'endpoint_not_registered']);
            return;
        }

        // The handler may throw any exception but we always have to send an answer
        let result;
        try {
            result = await Promise.resolve(handler.apply(this, callArgs));
        } catch (e) {
            // String(e) instead of e.toString(): a handler throwing null / undefined must not
            // prevent the answer from being sent
            this.transport.send([SMPMessageType.ERROR, requestId, String(e)]);
            return;
        }

        this.transport.send([SMPMessageType.SUCCESS, requestId, result]);
    }

    /**
     * Acknowledges an EVENT message ([type, requestId, topic, msg]) and passes msg to all
     * handlers of the topic. Exceptions thrown by a handler are logged and reported to Sentry,
     * they do not prevent the remaining handlers from being called.
     */
    private onEventMessage(message: SMPMessage): void {
        const [, requestId, topic, msg] = message;

        const handlers = this.subscriptions.get(topic);
        if (handlers === undefined) {
            this.transport.send([SMPMessageType.ERROR, requestId, 'topic_not_subscribed']);
            return;
        }

        this.transport.send([SMPMessageType.SUCCESS, requestId]);

        // Iterate over a snapshot: subscribe() appends to the live array, so a handler that
        // subscribes to the same topic would otherwise receive the message being dispatched.
        for (const handler of handlers.slice()) {
            try {
                handler(msg);
            } catch (e) {
                console.warn('Unhandled exception in handler', e);
                Sentry.captureException(e);
            }
        }
    }
}
