import { WebSocketSubject } from 'rxjs/webSocket';
import {
  type Observable,
  Subject,
  map,
  type Subscription,
  retry,
  timer,
  startWith,
  distinctUntilChanged
} from 'rxjs';
import { inject, singleton } from 'tsyringe';
import type { FetchPolicy } from '@apollo/client';
import type { Maybe, Optional } from '@oms/shared/util-types';
import { Logger, cleanMaybe } from '@oms/shared/util';
import {
  GetMontageInstrumentDocument,
  type GetMontageInstrumentQuery,
  type GetMontageInstrumentQueryVariables,
  type MontageInstrumentFragment
} from '@oms/generated/frontend';
import type { SimpleInstrument } from '@app/common/types/instrument/instrument.types';
import { OfflineDb } from '@app/data-access/offline/offline-database';
import { ApolloClientRPC } from '@app/data-access/api/apollo-client-rpc';
import { GQLResponse } from '@app/data-access/api/graphql/graphql-response';
import type {
  MontageFilter,
  Level2IntegrationEvent,
  Level2QuotePage
} from '../trading/montage/montage.types';
import {
  isLevel2IntegrationEvent,
  isMontageInstrumentFragment
} from '../trading/montage/utils/montage.utils';
import type { FactSetQuery } from './level2-websocket.types';
import { LEVEL2_WEBSOCKET_URL } from './level2-websocket.constants';

@singleton()
export class Level2WebSocketSubject extends WebSocketSubject<Level2IntegrationEvent | FactSetQuery[]> {
  constructor() {
    super({
      url: LEVEL2_WEBSOCKET_URL,
      openObserver: {
        next: () => {},
        complete() {},
        error() {}
      },
      closeObserver: {
        next: () => {},
        complete() {},
        error() {}
      }
    });
  }
}

@singleton()
export class Level2WebsocketService {
  protected messageSubject$: Subject<Level2IntegrationEvent>;

  protected subscription?: Subscription;

  protected fetchPolicy: FetchPolicy = 'cache-first';

  protected logger: Logger;
  protected name: string = 'Level2WebsocketService';

  // 🏗️ Constructor ----------------------------------------------------------------- /

  constructor(
    @inject(ApolloClientRPC) protected apolloClient: ApolloClientRPC,
    @inject(GQLResponse) protected gqlResponse: GQLResponse,
    @inject(OfflineDb) protected offlineDb: OfflineDb,
    @inject(Level2WebSocketSubject) protected socket$: Level2WebSocketSubject
  ) {
    this.logger = Logger.labeled(this.name);
    this.socket$ = socket$;
    this.messageSubject$ = new Subject();
  }

  // 📢 Public ----------------------------------------------------------------- /

  public connect() {
    this.subscription = this.socket$
      .pipe(
        retry({
          delay: (_error, retryIndex) => {
            return timer(Math.pow(2, retryIndex) * 200); // exponential backoff
          }
        })
      )
      .subscribe((event) => {
        if (isLevel2IntegrationEvent(event)) {
          this.messageSubject$.next(event);
        } else {
          this.logger.scope('connect').error('🟢 Message does not conform to Level2IntegrationEvent type');
        }
      });
    this.logger.scope('connect').debug('🟢 Connected to WebSocket');
  }

  /**
   * This subscribes to the current instrument selected on the given Montage window.
   * Just pass the scoped Actor ID for the window you want to subscribe to.
   *
   * @param scopedActorId - The scoped Actor ID for the window
   * @param onError - Optionally, provide a callback to handle errors
   * @returns A Subscription (ensure to unsubscribe this)
   */
  public subscribeToInstrumentFrom(scopedActorId: string, onError?: (e: Error) => void): Subscription {
    return this.getInstrumentId$(scopedActorId).subscribe((instrumentId) => {
      this.getMontageInstrument(instrumentId)
        .then((instrument) => {
          if (instrument) {
            this.subscribeToInstrument(instrument);
          } else {
            this.reset();
          }
        })
        .catch((e) => {
          if (onError && e instanceof Error) {
            onError(e);
          } else {
            this.logger.scope('subscribeToInstrumentFromMontage').error(e);
          }
          this.reset();
        });
    });
  }

  public subscribeToInstrument(instrument: MontageInstrumentFragment): void;

  public subscribeToInstrument(instrument: SimpleInstrument): void;

  public subscribeToInstrument(displayCode: string): void;

  // Implementation only ------- /
  public subscribeToInstrument(instrument: MontageInstrumentFragment | SimpleInstrument | string) {
    if (typeof instrument === 'string') {
      return this.subscribeTo(instrument, (displayCode) => this.trimDisplayCode(displayCode));
    }
    if (isMontageInstrumentFragment(instrument)) {
      return this.subscribeTo(instrument, ({ mappings }) => this.trimDisplayCode(mappings.displayCode));
    } else {
      return this.subscribeTo(instrument, ({ displayCode }) => this.trimDisplayCode(displayCode));
    }
  }

  public level2Quotes$(filter?: MontageFilter): Observable<Level2QuotePage[]> {
    return this.messages$.pipe(
      map((event) => this.extractEventOrders(event, filter)),
      startWith([])
    );
  }

  public reset() {
    this.subscription?.unsubscribe();
    delete this.subscription;
    this.logger.scope('reset').debug('🔁 Reset all subscriptions');
  }

  public destroy() {
    this.reset();
    this.socket$.complete();
    this.messageSubject$.complete();
    this.logger.scope('destroy').debug('🗑️ WebSocket destroyed');
  }

  // 🔒 Protected / private --------------------------------------------------------------- /

  protected get messages$(): Observable<Level2IntegrationEvent> {
    return this.messageSubject$.asObservable();
  }

  protected subscribeTo<T>(value: T, getIdentifier: (value: T) => string) {
    const identifier = getIdentifier(value);
    this.socket$.next([{ identifier, identifierType: 'tickerExchange', maxLevels: 20 }]);
    this.logger.scope('subscribeTo').debug(`🗞️ Subscribed to ${identifier}`);
  }

  protected getInstrumentId$(scopedActorId: string) {
    return this.offlineDb.collections.montage.findOne(scopedActorId).$.pipe(
      map((document) => document?.instrumentId),
      distinctUntilChanged()
    );
  }

  protected async getMontageInstrument(instrumentId?: string): Promise<Optional<MontageInstrumentFragment>> {
    if (!instrumentId) return undefined;
    const result = await this.gqlResponse
      .wrapQuery<GetMontageInstrumentQuery, GetMontageInstrumentQueryVariables>({
        query: GetMontageInstrumentDocument,
        variables: { instrumentId },
        fetchPolicy: this.fetchPolicy
      })
      .exec();
    return result.mapTo(
      ({ data }) => cleanMaybe(data.instrument),
      (errors) => {
        errors.forEach((error, index) => {
          this.logger.scope(['getMontageInstrument', `${index}`]).error(error);
        });
        return undefined;
      }
    );
  }

  protected extractEventOrders(event: Level2IntegrationEvent, filter?: MontageFilter): Level2QuotePage[] {
    const { bids, asks } = event;
    const { type } = filter ?? {};
    if (!type) return [...bids, ...asks];
    return type === 'bid' ? bids : asks;
  }

  protected trimDisplayCode(displayCode: Maybe<string>, fallback?: string): string {
    return displayCode?.split('-')[0] || fallback || '';
  }
}

export default Level2WebsocketService;
