import { Injectable } from "@angular/core";
import { Socket } from "../interfaces/socket.interface";
import { BehaviorSubject, Observable, of, Subject, timer } from "rxjs";
import { filter, map, share, take, takeUntil } from "rxjs/operators";
import * as hash from "object-hash";
import {
    AuthMessage,
    ResponseMessage,
    SocketMessage,
    SocketOperation,
    SocketRequestOperation,
    SubscriptionMessage
} from "../interfaces/socket2/socket.interface";
import { WebSocketSubject } from "../rxjs/WebSocketSubject";
import { SocketFactoryService } from "./socket-factory.service";

/**
 * Application-wide WebSocket client service.
 *
 * Obtains a single `WebSocketSubject` from `SocketFactoryService` at
 * construction time and exposes:
 *  - filtered message streams (subscription, response and auth messages),
 *  - `on()` for shared, multiplexed subscriptions,
 *  - `send()` for one-off requests whose response is correlated through an
 *    incrementing `requestId`.
 *
 * While the connection is open, a heartbeat request is sent every 2500 ms.
 */
@Injectable({
    providedIn: "root"
})
export class Socket2Service implements Socket {
    private _socket: WebSocketSubject<any>;
    /** Shared subscription streams created by `on()`, keyed by subscription id. */
    private _listeners: { [key: string]: Observable<any> } = {};
    /** Id attached to the next request that expects a response. Starts at 0. */
    private _nextRequestId = 0;
    private _onOpen$: Subject<any>;
    private _isConnected$: BehaviorSubject<boolean>;
    private _onClose$: Subject<any>;
    // NOTE(review): `_messages$`, `_messagesSubscription$` and `_snapshot` are
    // never assigned anywhere in this class.
    private _messages$: BehaviorSubject<SocketMessage>;
    private _messagesSubscription$: BehaviorSubject<Observable<SocketMessage>>;
    /** Requests accepted while no socket instance existed; flushed on open. */
    private _queuedRequests: any[] = [];
    // NOTE(review): nothing in this class emits on `_onDisconnect$` or
    // `_onReconnect$`; they are only exposed through the getters below.
    private _onDisconnect$: Subject<void>;
    private _onReconnect$: Subject<void>;
    private _snapshot: any;

    constructor(private _socketFactory: SocketFactoryService) {
        this._onOpen$ = new Subject<Event>();
        this._onClose$ = new Subject<CloseEvent>();
        this._isConnected$ = new BehaviorSubject<boolean>(false);
        this._onDisconnect$ = new Subject<void>();
        this._onReconnect$ = new Subject<void>();
        this._makeConnection();
    }

    /** The underlying socket subject created by the factory. */
    get socket(): WebSocketSubject<SocketMessage> {
        return this._socket;
    }

    /** Every message emitted by the socket, unfiltered. */
    get messages(): Observable<any> {
        return this.socket;
    }

    /** Messages whose operation is `Event` or `Update`. */
    get subscriptionMessages(): Observable<SubscriptionMessage> {
        return this.messages.pipe(
            filter(
                (message: SubscriptionMessage) =>
                    message.operation === SocketOperation.Event ||
                    message.operation === SocketOperation.Update
            )
        );
    }

    /** Messages whose operation is `Response`. */
    get responseMessages(): Observable<ResponseMessage> {
        return this.messages.pipe(
            filter(
                (message: ResponseMessage) =>
                    message.operation === SocketOperation.Response
            )
        );
    }

    /** Messages whose operation is `Authorized` or `Unauthorized`. */
    get authMessages(): Observable<AuthMessage> {
        return this.messages.pipe(
            filter(
                (message: AuthMessage) =>
                    message.operation === SocketOperation.Authorized ||
                    message.operation === SocketOperation.Unauthorized
            )
        );
    }

    /** Emits `true` on every open event and `false` on every close event. */
    get isConnected(): BehaviorSubject<boolean> {
        return this._isConnected$;
    }

    get onDisconnect(): Subject<void> {
        return this._onDisconnect$;
    }

    get onReconnect(): Subject<void> {
        return this._onReconnect$;
    }

    get snapshot(): any {
        return this._snapshot;
    }

    /** Sends a request with the `Close` operation over the socket. */
    close(): void {
        this._sendRequest({ operation: SocketRequestOperation.Close });
    }

    /**
     * Creates the socket through the factory and wires the open/close
     * observers: connection state tracking, flushing of queued requests and
     * the periodic heartbeat.
     */
    private _makeConnection(): void {
        this._socket = this._socketFactory.getSocket({
            url: this.getUrl(),
            openObserver: this._onOpen$,
            closeObserver: this._onClose$
        });

        this._onOpen$.subscribe(() => {
            this._isConnected$.next(true);

            // Drain the queue before sending so the same requests are not
            // re-sent on every subsequent open event.
            const pending = this._queuedRequests;
            this._queuedRequests = [];
            for (const request of pending) {
                this._socket.next(request);
            }

            // Heartbeat every 2500 ms until the next close event. Sent without
            // a request id, so no response is awaited.
            timer(2500, 2500)
                .pipe(takeUntil(this._onClose$))
                .subscribe(() => {
                    this._sendRequest(
                        { operation: SocketRequestOperation.Heartbeat },
                        false
                    );
                });
        });
        this._onClose$.subscribe(() => {
            this._isConnected$.next(false);
        });
    }

    /**
     * Returns a shared stream for the given subscription, creating it on
     * first use. Identical arguments hash to the same subscription id and
     * therefore reuse the same stream.
     *
     * @param product Product the subscription refers to.
     * @param source Source within the product.
     * @param data Optional payload included in the subscribe request.
     * @param includeFields Optional field list included in the subscribe request.
     * @returns A stream of `message.data[0].val` for each matching
     *          `Event`/`Update` message.
     */
    on<T>(
        product: string,
        source: string,
        data?: any,
        includeFields?: string[]
    ): Observable<any | T> {
        const subscriptionId = this.getSubscriptionId({
            product,
            source,
            data,
            includeFields
        });

        if (!this._listeners[subscriptionId]) {
            // NOTE(review): `multiplex` is assumed to follow the RxJS
            // WebSocketSubject contract (subscribe message factory,
            // unsubscribe message factory, message filter) — confirm against
            // the project-local WebSocketSubject.
            this._listeners[subscriptionId] = this._socket
                .multiplex(
                    () => {
                        const request = this._getRequest(
                            SocketRequestOperation.Subscribe,
                            product,
                            source,
                            data,
                            includeFields
                        );

                        request.subscriptionId = subscriptionId;

                        return request;
                    },
                    () => {
                        // Forget the cached stream so a later `on()` call
                        // creates a fresh subscription.
                        delete this._listeners[subscriptionId];
                        return {
                            operation: SocketRequestOperation.Unsubscribe,
                            subscriptionId
                        };
                    },
                    (message: SubscriptionMessage) =>
                        (message.operation === SocketOperation.Event ||
                            message.operation === SocketOperation.Update) &&
                        message.subscriptionId &&
                        message.subscriptionId === subscriptionId
                )
                .pipe(
                    map((message) => message.data[0].val),
                    share()
                );
        }

        return this._listeners[subscriptionId];
    }

    /**
     * Only sends a `Close` request.
     * NOTE(review): presumably the underlying socket re-establishes the
     * connection after closing — confirm.
     */
    reconnect(): void {
        this.close();
    }

    /**
     * Builds and sends a request.
     *
     * @param operation Operation to perform; `Subscribe` requests also get a
     *                  `subscriptionId` derived from the other arguments.
     * @param product Product the request refers to.
     * @param source Source within the product.
     * @param data Optional payload; omitted from the request when falsy.
     * @param includeFields Optional field list; omitted when falsy.
     * @returns A stream emitting the first `Response` message whose
     *          `requestId` matches this request, then completing.
     */
    send(
        operation: SocketRequestOperation,
        product: string,
        source: string,
        data?: any,
        includeFields?: string[]
    ): Observable<any> {
        const request = this._getRequest(
            operation,
            product,
            source,
            data,
            includeFields
        );

        if (operation === SocketRequestOperation.Subscribe) {
            this._addSubscriptionId(request, {
                product,
                source,
                data,
                includeFields
            });
        }

        return this._sendRequest(request);
    }

    /** Builds the request object, omitting `data`/`includeFields` when falsy. */
    private _getRequest(
        operation: SocketRequestOperation,
        product: string,
        source: string,
        data?: any,
        includeFields?: string[]
    ) {
        const request: any = {
            operation,
            product,
            source
        };

        if (data) {
            request.data = data;
        }

        if (includeFields) {
            request.includeFields = includeFields;
        }

        return request;
    }

    /** Derives a subscription id as the SHA-1 object hash of the key. */
    private getSubscriptionId(key: {
        product: string;
        source: string;
        data?: any;
        includeFields?: string[];
    }) {
        return hash.sha1(key);
    }

    /** Sends an `Unsubscribe` request for a known listener and forgets it. */
    private _off(subscriptionId: string) {
        if (this._listeners[subscriptionId]) {
            this._sendRequest({
                operation: SocketRequestOperation.Unsubscribe,
                subscriptionId
            });
            delete this._listeners[subscriptionId];
        }
    }

    /** Sets `request.subscriptionId` from the hash of `key`; returns `request`. */
    private _addSubscriptionId(request: any, key: any) {
        request.subscriptionId = this.getSubscriptionId(key);

        return request;
    }

    /**
     * Sends the request over the socket, or queues it when no socket instance
     * exists yet.
     *
     * @param request Request object; mutated to carry `requestId` when one is
     *                assigned.
     * @param withRequestId When `true` (default) and a socket exists, assigns
     *                      the next request id and returns the matching
     *                      response stream.
     * @returns The response stream, or `undefined` when no request id was
     *          assigned (`withRequestId` is `false` or there is no socket).
     */
    private _sendRequest(request: any, withRequestId = true): Observable<any> {
        let observable: Observable<unknown>;

        if (withRequestId && this._socket) {
            const requestId = this._nextRequestId;

            request.requestId = requestId;

            // Strict equality only: request ids start at 0, so a truthiness
            // guard on `event.requestId` would drop the response to the very
            // first request. Messages without a `requestId` still never match.
            observable = this.responseMessages.pipe(
                filter((event: any) => event.requestId === requestId),
                take(1)
            );

            this._nextRequestId++;
        }

        if (this._socket) {
            this._socket.next(request);
        } else {
            this._queuedRequests.push(request);
        }

        return observable;
    }

    /**
     * Builds the socket URL from the current page location: `wss://` when the
     * page is served over HTTPS, `ws://` otherwise, followed by the host and
     * the fixed `/sf/rtc/ws` path.
     */
    private getUrl(): string {
        return (
            (location.protocol === "https:" ? "wss://" : "ws://") +
            window.location.host +
            "/sf/rtc/ws"
        );
    }
}
