import { Injectable } from "@angular/core";
import {
    BehaviorSubject,
    EMPTY,
    from,
    Observable,
    of,
    Subject,
    Subscriber
} from "rxjs";
import { v4 as uuid } from "uuid";
import { HttpClient } from "@angular/common/http";
import {
    auditTime,
    buffer,
    catchError,
    concatMap,
    filter,
    finalize,
    map,
    shareReplay,
    switchMap,
    tap
} from "rxjs/operators";

/**
 * Long-polling client for the `/sf/longpoll` endpoints.
 *
 * A logical connection is identified by a client-generated UUID ("socket ID").
 * Subscribing to {@link LongPollingService.poll$} opens the connection and
 * starts a poll loop; values pushed into {@link LongPollingService.send} are
 * batched and POSTed to the server under the current socket ID.
 */
@Injectable({
    providedIn: "root"
})
export class LongPollingService {
    /** Current socket ID; emitting a new value restarts the connection. */
    private _socketID: BehaviorSubject<string> = new BehaviorSubject(uuid());

    /** Trigger stream: each emission requests one more poll round-trip. */
    private _pollForUpdate$: BehaviorSubject<any> = new BehaviorSubject<any>(
        null
    );

    /** Outgoing operations waiting to be batched and sent. */
    private _sendOperations$: Subject<any> = new Subject<any>();

    /**
     * Stream of individual operations received from the server.
     *
     * For every socket ID the pipeline first opens the connection, then polls
     * repeatedly. Only non-empty array responses are forwarded, and each array
     * is flattened into its elements. An element whose `operation` is
     * `"disconnect"` is not forwarded; it triggers {@link reconnect} instead.
     *
     * The stream is shared among subscribers and replays the most recent
     * operation to late subscribers. When it is torn down (termination or the
     * last subscriber leaving) a fresh socket ID is generated so the next
     * subscription starts a new connection.
     *
     * NOTE: an error from the "open connection" request is not caught here and
     * is delivered to subscribers of this stream.
     */
    poll$ = this._socketID.pipe(
        this._openConnection(),
        // The open response itself is ignored; carry the socket ID forward.
        map(() => this._socketID.getValue()),
        this._pollForUpdates(),
        filter((ops) => Array.isArray(ops) && ops.length !== 0),
        switchMap((ops: any[]) => from(ops)),
        switchMap((response: any) => {
            if (response.operation === "disconnect") {
                this.reconnect();

                return EMPTY;
            }

            return of(response);
        }),
        finalize(() => this._socketID.next(uuid())),
        shareReplay({ refCount: true, bufferSize: 1 })
    );

    /**
     * Sink for outgoing operations. Call `send.next(op)` or pass it to
     * `subscribe`. Replaced with a fresh subscriber on {@link reconnect}.
     */
    send: Subscriber<any> = this._getSubscriber();

    /**
     * Wires up the outgoing pipeline: operations are collected into a batch
     * that is flushed 250 ms after the first operation of the batch arrives,
     * then POSTed to `/sf/longpoll/op` with the socket ID current at send time.
     *
     * @param _http Angular HTTP client used for all requests.
     */
    constructor(private _http: HttpClient) {
        this._sendOperations$
            .pipe(
                buffer(this._sendOperations$.pipe(auditTime(250))),
                // concatMap (not switchMap): a new batch must never cancel a
                // POST that is still in flight, and batches must stay ordered.
                concatMap((ops) =>
                    this._http
                        .post("/sf/longpoll/op", {
                            socketID: this._socketID.getValue(),
                            ops
                        })
                        .pipe(
                            // Handle the failure per request so one failed POST
                            // does not terminate the whole send pipeline. The
                            // failed batch is dropped, not retried.
                            catchError((error) => {
                                console.error(
                                    "LongPollingService: failed to send operations",
                                    error
                                );

                                return EMPTY;
                            })
                        )
                )
            )
            .subscribe();
    }

    /**
     * Starts a new logical connection by emitting a fresh socket ID and
     * replaces {@link send} with a new subscriber.
     */
    reconnect() {
        this._socketID.next(uuid());
        this.send = this._getSubscriber();
    }

    /**
     * Builds a subscriber that forwards every value to the outgoing queue.
     * Errors and completion from whatever feeds the subscriber are ignored on
     * purpose so they never reach the shared queue.
     */
    private _getSubscriber<T>(): Subscriber<T> {
        return Subscriber.create<T>(
            (x) => {
                this._sendOperations$.next(x);
            },
            () => {},
            () => {}
        );
    }

    /**
     * Operator: for each socket ID, POSTs an "open" request (`isNew: true`,
     * no operations) to `/sf/longpoll/op` and emits the response. A newer
     * socket ID cancels an open request that is still pending.
     */
    private _openConnection() {
        return (socketID: Observable<string>) =>
            socketID.pipe(
                switchMap((socketID) =>
                    this._http.post("/sf/longpoll/op", {
                        isNew: true,
                        socketID,
                        ops: []
                    })
                )
            );
    }

    /**
     * Operator: for each socket ID, runs a poll loop against
     * `/sf/longpoll/poll` and emits every response.
     *
     * Each successful response pushes a new trigger, so the next poll starts
     * once the current one completes. A failed poll calls {@link reconnect}
     * and emits nothing; no new trigger is pushed for the old socket ID.
     */
    private _pollForUpdates() {
        return (socketID: Observable<string>) =>
            socketID.pipe(
                switchMap((socketID: string) => {
                    const poll$ = this._http
                        .post("/sf/longpoll/poll", { socketID })
                        .pipe(
                            // Queue the next poll before handing the response on.
                            tap(() => this._pollForUpdate$.next(null)),
                            catchError(() => {
                                this.reconnect();

                                return EMPTY;
                            })
                        );

                    return this._pollForUpdate$.pipe(concatMap(() => poll$));
                })
            );
    }
}
