import { Injectable } from '@angular/core';
import * as signalR from '@microsoft/signalr';
import { LogLevel } from '@microsoft/signalr';
import { EMPTY, Observable, of, Subject, throwError, from } from 'rxjs';
import { filter, take, takeUntil, tap } from 'rxjs/operators';
import { ScriptInjectorService } from '../../../utils/script-injector/script-injector.service';
import { TokenService } from '../../token/token.service';
import { MessageQueue } from '../message-queue.abstract';
import { SignalRConnectionConfig } from '../types';

/**
 * Message-queue implementation backed by a `@microsoft/signalr` hub connection.
 *
 * Lifecycle: `init()` injects the client script and builds the connection,
 * `start()` opens it, `stop()` / `destroy()` close it. Hub traffic and
 * connection state changes are exposed through the `on*()` streams.
 *
 * NOTE(review): `ngUnsubscribe$` and `unsubscribe()` are not declared in this
 * file and are presumably inherited from `MessageQueue` — confirm there.
 */
@Injectable({
  providedIn: 'root',
})
export class SignalRV3Service extends MessageQueue {
  connection: any; //signalR connection reference; undefined until init() has emitted
  chatHub: any;
  hub: any;

  // Internal event buses; consumers subscribe via the on*() accessors below.
  private message$ = new Subject<string>();
  private error$ = new Subject<any>();
  private disconnect$ = new Subject<any>();
  private reconnect$ = new Subject<any>();

  // Automatic-reconnect tuning; both can be overridden through init()'s config.
  private retryMax = 25;
  private retryWait = 125;
  private scripts: string[] = [];

  constructor(private token: TokenService, private scriptLoader: ScriptInjectorService) {
    super();
  }

  /**
   * Injects the script named by `connectionConfig.$Url`, then builds the hub
   * connection and registers its event handlers.
   *
   * The connection is built as a side effect of the returned observable, so
   * nothing happens until the caller subscribes.
   *
   * @param connectionConfig hub URL, script URL and optional retry overrides
   *   (`retries`, `retryWait`).
   * @returns the script-injection stream, limited to its first emission.
   */
  init(connectionConfig: SignalRConnectionConfig): Observable<any> {
    this.scripts = [connectionConfig.$Url];
    return this.scriptLoader.injectMultipleScripts(this.scripts).pipe(
      take(1),
      tap(() => {
        if (connectionConfig.retries !== undefined) this.retryMax = connectionConfig.retries;
        if (connectionConfig.retryWait !== undefined) this.retryWait = connectionConfig.retryWait;

        this.connection = new signalR.HubConnectionBuilder()
          .withUrl(connectionConfig.hubUrl, {
            accessTokenFactory: () => this.token.get(),
          })
          .withAutomaticReconnect({
            nextRetryDelayInMilliseconds: retryContext => {
              this.error$.next(retryContext.retryReason);
              // Returning null tells SignalR to stop reconnecting. `>=` (rather
              // than `===`) also stops when retryMax is negative or non-integer.
              if (retryContext.previousRetryCount >= this.retryMax) return null;
              // Polynomial back-off: previousRetryCount^1.25 * retryWait milliseconds.
              return Math.pow(retryContext.previousRetryCount, 1.25) * this.retryWait;
            },
          })
          .configureLogging(LogLevel.Information)
          .build();

        //SignalR event handlers.
        this.connection.on('broadcastMessage', (msg: string) => this.message$.next(msg));
        this.connection.onclose((error?: Error) => {
          this.disconnect$.next(error);
        });
        this.connection.onreconnecting((error?: Error) => {
          this.error$.next(error);
        });
        this.connection.onreconnected((connectionId?: string) => {
          this.reconnect$.next(connectionId);
        });
      })
    );
  }

  /**
   * Opens the connection built by `init()`.
   *
   * @returns an observable wrapping the promise returned by `connection.start()`.
   */
  start(): Observable<any> {
    return from(this.connection.start());
  }

  /**
   * Requests that the connection be closed.
   *
   * @returns `EMPTY` once the stop has been requested, or an erroring
   *   observable if the request throws synchronously (for example when
   *   `init()` has not created a connection yet).
   */
  stop() {
    try {
      // The try/catch only sees synchronous throws; a rejection of the stop
      // promise is forwarded to the error stream instead of going unhandled.
      this.connection.stop().catch((err: unknown) => this.error$.next(err));
      return EMPTY;
    } catch (err) {
      return throwError(err);
    }
  }

  /**
   * Removes the injected scripts, calls `unsubscribe()` and stops the connection.
   *
   * @returns the observable produced by `stop()`.
   */
  destroy() {
    this.scriptLoader.removeMultipleScripts(this.scripts);
    this.scripts = [];
    this.unsubscribe();
    return this.stop();
  }

  /**
   * Stream of `broadcastMessage` payloads received from the hub.
   *
   * @param filterOn a string (only messages starting with it pass), a
   *   predicate, or nothing (every message passes).
   */
  onMessage(filterOn: ((msg: string) => boolean) | string | void): Observable<string> {
    const msgFilter = this.createFilterFunction(filterOn);
    return this.message$.asObservable().pipe(
      filter(msg => msgFilter(msg)),
      takeUntil(this.ngUnsubscribe$)
    );
  }

  /** Emits the close error (if any) each time the connection's `onclose` handler fires. */
  onDisconnect() {
    return this.disconnect$.asObservable().pipe(takeUntil(this.ngUnsubscribe$));
  }

  /** Emits the connection id each time the connection's `onreconnected` handler fires. */
  onReconnect() {
    // Must read reconnect$; this previously piped disconnect$, so subscribers
    // were notified of disconnects and never of reconnects.
    return this.reconnect$.asObservable().pipe(takeUntil(this.ngUnsubscribe$));
  }

  /**
   * Emits retry reasons, reconnecting errors, and rejections of `send()` /
   * `stop()` requests.
   */
  onError() {
    return this.error$.asObservable().pipe(takeUntil(this.ngUnsubscribe$));
  }

  /**
   * Invokes the hub's `send` method with `msg`.
   *
   * A rejected invocation is reported on the error stream rather than left as
   * an unhandled promise rejection.
   */
  send(msg: string) {
    this.connection.invoke('send', msg).catch((err: unknown) => this.error$.next(err));
  }

  /** No-op: the connection is built with automatic reconnect, so this only logs and emits `true`. */
  reconnect() {
    console.log('v3 has auto reconnect so no manual reconnect needed');
    return of(true);
  }

  /**
   * Normalises the `onMessage` filter argument into a predicate.
   *
   * @param matching a prefix string, a predicate, or nothing.
   * @returns a prefix test for strings, the predicate itself for functions,
   *   and an always-true predicate otherwise.
   */
  private createFilterFunction(
    matching: ((msg: string) => boolean) | string | void
  ): (msg: string) => boolean {
    if (typeof matching === 'string') {
      // Copy into a const so the narrowed string type survives inside the closure.
      const prefix = matching;
      return msg => msg.startsWith(prefix);
    }
    if (typeof matching === 'function') {
      return matching;
    }
    return () => true;
  }
}
