import { Injectable } from "@angular/core";
import * as SockJS from "sockjs-client";
import {
    BehaviorSubject,
    EMPTY,
    fromEventPattern,
    merge,
    Observable,
    Subject,
    Subscription,
    timer
} from "rxjs";
import {
    filter,
    finalize,
    map,
    mergeMap,
    publishReplay,
    refCount,
    share,
    switchAll,
    tap
} from "rxjs/operators";
import { SocketRequest } from "../interfaces/socket-request.interface";
import { Auth } from "../interfaces/auth.interface";
import { SocketMessage } from "../interfaces/socket-message.interface";
import { SendAcknowledgeMessage } from "../interfaces/send-acknowledge-message.interface";
import { SocketMessageType } from "../enums/socket-message-type.enum";
import { SocketErrorMessage } from "../interfaces/socket-error-message.interface";
import { HttpClient } from "@angular/common/http";

import { dayjs } from "../date-and-time/plugins/dayjs/index";
import { LoggerService } from "./logger.service";

@Injectable({
    providedIn: "root"
})
export class SocketService {
    private _deadManTimeout = 30000;
    private _connectionTimeout = 7500;

    private _socket: WebSocket;
    private _heartbeat$: Observable<SockJS.BaseEvent>;
    private _messages$: Observable<SockJS.MessageEvent>;
    private _processedMessages$: Observable<
        Auth | SocketMessage | SendAcknowledgeMessage
    >;
    private _open$: Observable<SockJS.OpenEvent>;
    private _close$: Observable<SockJS.CloseEvent>;
    private _error$: Observable<any>;
    private _heartbeatSubscription: Subscription;
    private _isConnectedSubscription: Subscription;
    private _deadManSubscription: Subscription;
    private _isConnected$: BehaviorSubject<boolean>;
    private _onReconnect$: Subject<void>;
    private _onDisconnect$: Subject<void>;
    private _deadMan$: BehaviorSubject<Observable<number>>;
    private _connectionTries = 0;
    private _messageQueue: string[] = [];
    private _listeners: { [key: string]: Observable<SocketMessage> } = {};
    private _currentSubscriptions: {
        heartbeat: BehaviorSubject<Observable<SockJS.BaseEvent>>;
        messages: BehaviorSubject<Observable<SockJS.MessageEvent>>;
        open: BehaviorSubject<Observable<SockJS.OpenEvent>>;
        close: BehaviorSubject<Observable<SockJS.CloseEvent>>;
        error: BehaviorSubject<Observable<any>>;
    };

    private _snapshot: {
        isConnected: boolean;
        lastMessage: SocketMessage | null;
        lastAuth: Auth | null;
        lastHeartbeat: dayjs.Dayjs | null;
    };

    constructor(private _http: HttpClient, private logger: LoggerService) {
        this._currentSubscriptions = {
            heartbeat: new BehaviorSubject(EMPTY),
            messages: new BehaviorSubject(EMPTY),
            open: new BehaviorSubject(EMPTY),
            close: new BehaviorSubject(EMPTY),
            error: new BehaviorSubject(EMPTY)
        };

        this._initConnectionEvents();
        this._initMessageEvents();
        this._initBaseSubscriptions();
        this._initSnapshot();
    }

    get messages() {
        return this._processedMessages$;
    }

    get snapshot() {
        return this._snapshot;
    }

    get authMessages(): Observable<Auth> {
        return this.messages.pipe(
            filter((message: Auth) => message.type === SocketMessageType.AUTH),
            publishReplay(1),
            refCount()
        );
    }

    get acknowledgeMessages(): Observable<SendAcknowledgeMessage> {
        return this.messages.pipe(
            filter((message: SendAcknowledgeMessage) =>
                message.hasOwnProperty("ACK")
            )
        );
    }

    get fetchMessages(): Observable<SocketMessage> {
        return this.messages.pipe(
            filter(
                (message: SocketMessage) =>
                    message.type === SocketMessageType.FETCH
            ),
            mergeMap(
                // TODO: what error handling do we need to add?
                // TODO: how do I write tests for this?
                (message: SocketMessage) =>
                    this._http.get<SocketMessage>(
                        "/sf/admin/mw-updates/message/" + message.data + ".json"
                    )
            )
        );
    }

    get realSubscriptionMessages(): Observable<SocketMessage> {
        return this.messages.pipe(
            filter(
                (message: SocketMessage) =>
                    !message.type ||
                    message.type === SocketMessageType.SEND ||
                    message.type === SocketMessageType.UPDATE ||
                    message.hasOwnProperty("error")
            )
        );
    }

    get subscriptionMessages(): Observable<SocketMessage> {
        return merge(this.realSubscriptionMessages, this.fetchMessages);
    }

    get errorMessages(): Observable<SocketErrorMessage> {
        return this.subscriptionMessages.pipe(
            filter((message: SocketErrorMessage) =>
                message.hasOwnProperty("error")
            ),
            share()
        );
    }

    get isConnected(): BehaviorSubject<boolean> {
        return this._isConnected$;
    }

    get onReconnect(): Observable<void> {
        return this._onReconnect$;
    }

    get onDisconnect(): Observable<void> {
        return this._onDisconnect$;
    }

    get socket(): WebSocket {
        return this._socket;
    }

    processQueuedMessages(auth: Auth): void {
        if (auth && auth.data.authenticated) {
            while (this._messageQueue.length > 0) {
                this._socket.send(this._messageQueue.splice(0, 1)[0]);
            }
        }
    }

    makeConnection(): Observable<boolean> {
        this._socket = this._getNewSocket();

        const { heartbeat, messages, open, close, error } =
            this._getSocketEvents(this._socket);

        this._currentSubscriptions.heartbeat.next(heartbeat);
        this._currentSubscriptions.messages.next(messages);
        this._currentSubscriptions.open.next(open);
        this._currentSubscriptions.close.next(close);
        this._currentSubscriptions.error.next(error);

        this._deadMan$.next(timer(this._connectionTimeout));

        return this.isConnected;
    }

    reconnect(heartFailure?: boolean): void {
        if (this._socket) {
            this.close();
            delete this._socket;
        }

        this._connectionTries++;

        if (this._connectionTries > 2) {
            this._onDisconnect$.next();
        } else {
            let subscription = this.makeConnection().subscribe(
                (isConnected) => {
                    if (isConnected) {
                        this._connectionTries = 0;
                        this._onReconnect$.next();
                        subscription.unsubscribe();
                    }
                }
            );
        }
    }

    close(): void {
        this._socket?.close();
        this._snapshot = {
            isConnected: false,
            lastMessage: null,
            lastAuth: null,
            lastHeartbeat: null
        };
        this._isConnected$.next(false);
        this._deadMan$.next(EMPTY);
    }

    on(path: string): Observable<SocketMessage> {
        if (!this._listeners[path]) {
            this._listeners[path] = this.subscriptionMessages.pipe(
                filter(
                    (event: SocketMessage) =>
                        event.path && event.path.startsWith(path)
                ),
                finalize(() => {
                    this.off(path);
                }),
                share()
            ) as Observable<SocketMessage>;
        }
        return this._listeners[path];
    }

    send(
        type: string,
        method: string,
        path: string,
        data?: any,
        product?: string,
        requestId?: number
    ) {
        let request: SocketRequest = {
            path,
            product,
            data: {}
        };

        if (typeof type !== "undefined" && type !== null) {
            request.type = type;
        }
        if (typeof method !== "undefined" && method !== null) {
            request.method = method;
        }
        if (typeof data !== "undefined" && data !== null) {
            request.data = data;
        }
        if (typeof requestId !== "undefined" && requestId !== null) {
            request.requestId = requestId;
        }

        let packet = JSON.stringify(request);

        if (this._snapshot.isConnected && this._snapshot.lastAuth) {
            if (packet.length > 96 * 1024) {
                let postRequest =
                    '{"websocketid":"' +
                    this._snapshot.lastAuth.session.socketId +
                    '","sessionid":"' +
                    this._snapshot.lastAuth.session.sessionId +
                    '","data":' +
                    packet +
                    "}";

                this._http.post("/sf/websocket/send", postRequest).subscribe();
            } else {
                this._socket.send(packet);
            }
        } else {
            this._messageQueue.push(packet);
        }
    }

    private off(path: string): void {
        if (this._listeners[path]) {
            delete this._listeners[path];
        }
    }

    private _getSocketEvents(socket: WebSocket): {
        heartbeat: Observable<SockJS.BaseEvent>;
        messages: Observable<SockJS.MessageEvent>;
        open: Observable<SockJS.OpenEvent>;
        close: Observable<SockJS.CloseEvent>;
        error: Observable<any>;
    } {
        return {
            heartbeat: this._getSocketEvent<SockJS.BaseEvent>(
                socket,
                "heartbeat"
            ),
            messages: this._getSocketEvent<SockJS.MessageEvent>(
                socket,
                "message"
            ),
            open: this._getSocketEvent<SockJS.OpenEvent>(socket, "open"),
            close: this._getSocketEvent<SockJS.CloseEvent>(socket, "close"),
            error: this._getSocketEvent<any>(socket, "error")
        };
    }

    private _getNewSocket(): WebSocket {
        return new SockJS(this.getUrl(), null, {
            transports: ["websocket", "xhr-polling"]
        });
    }

    private getUrl(): string {
        return (
            window.location.href.substring(
                0,
                window.location.href.indexOf("/sf/")
            ) + "/sf/websocket"
        );
    }

    private _getSocketEvent<T>(
        socket: WebSocket,
        eventName: string
    ): Observable<T> {
        return fromEventPattern<T>(
            (handler) => socket.addEventListener(eventName, handler),
            (handler) => socket.removeEventListener(eventName, handler)
        );
    }

    private _processMessage(
        event: SockJS.MessageEvent
    ): Auth | SocketMessage | SendAcknowledgeMessage {
        let data = JSON.parse(event.data);
        return data;
    }

    private _processHeartbeat(event: SockJS.BaseEvent): void {
        this._snapshot.lastHeartbeat = dayjs();
        this._deadMan$.next(timer(this._deadManTimeout));
    }

    private _initMessageEvents(): void {
        this._heartbeat$ = this._currentSubscriptions.heartbeat.pipe(
            switchAll()
        );
        this._messages$ = this._currentSubscriptions.messages.pipe(
            switchAll(),
            share()
        );
        this._open$ = this._currentSubscriptions.open.pipe(
            switchAll(),
            share()
        );
        this._close$ = this._currentSubscriptions.close.pipe(
            switchAll(),
            share()
        );
        this._error$ = this._currentSubscriptions.error.pipe(
            switchAll(),
            share()
        );
        this._processedMessages$ = this._messages$.pipe(
            map(this._processMessage.bind(this)),
            share<Auth | SocketMessage>()
        );
    }

    private _initConnectionEvents(): void {
        this._isConnected$ = new BehaviorSubject<boolean>(false);
        this._onReconnect$ = new Subject<void>();
        this._onDisconnect$ = new Subject<void>();
        this._deadMan$ = new BehaviorSubject<Observable<undefined>>(EMPTY);
    }

    private _initBaseSubscriptions(): void {
        this._isConnectedSubscription = this._open$.subscribe((event) => {
            this._isConnected$.next(true);
        });
        this._heartbeatSubscription = this._heartbeat$.subscribe(
            this._processHeartbeat.bind(this)
        );
        this._deadManSubscription = this._deadMan$
            .pipe(switchAll())
            .subscribe((value) => {
                this.reconnect();
            });
        this._close$.subscribe((close) => {
            if (close.wasClean === false) {
                let multiplier = Math.max(1, this._connectionTries * 2);
                this.logger.error(
                    "Failed to connect to the socket. Trying again in " +
                        (this._connectionTimeout * multiplier) / 1000 +
                        " seconds"
                );
                this._deadMan$.next(
                    timer(this._connectionTimeout * multiplier)
                );
            } else if (close.code === 3000 && close.reason === "Go away!") {
                this.reconnect();
            }
        });
        this.authMessages.subscribe((auth) => {
            this.processQueuedMessages(auth);
        });
    }

    private _initSnapshot(): void {
        this._snapshot = {
            isConnected: false,
            lastMessage: null,
            lastAuth: null,
            lastHeartbeat: null
        };

        this.isConnected.subscribe((isConnected) => {
            this._snapshot.isConnected = isConnected;
        });

        this.authMessages.subscribe((auth) => {
            this._snapshot.lastAuth = auth;
        });

        this.subscriptionMessages.subscribe((message) => {
            this._snapshot.lastMessage = message;
        });
    }
}
