import { CountryConfigRestService } from '../../rest-services/country-config-rest.service';
import { MyFilterWebsocketService } from '../my-filter-websocket/my-filter-websocket.service';
import { Injectable } from '@angular/core';
import { map, switchMap, takeWhile } from 'rxjs/operators';

import { BehaviorSubject, Observable, of } from 'rxjs';
import { EquipmentRestService } from '../../rest-services/equipment-rest.service';
import { EquipmentStatusForCustomer } from '../../models/equipment/equipment-status';
import { Customer } from '../../models/customer/customer';
import { CacheableHttpClient } from '../cache/cacheable-http-client';
import { equipmentStatusRestName } from '../../rest-services/equipment-rest-constants.service';
import { EquipmentStatusWebsocketStateService } from './equipment-status-websocket-state.service';

@Injectable({providedIn: 'root'})
export class EquipmentStatusWebsocketService {

  equipmentStatusWebserviceMessage$ = new BehaviorSubject<EquipmentStatusForCustomer[]>([]);

  // Sometimes messages start coming sooner, than equipment status call returns response.
  // Therefore, we need to cache those notMergedMessages for times, when we get response from BE
  notMergedMessages: EquipmentStatusForCustomer[] = [];

  constructor(
    private countryConfig: CountryConfigRestService,
    private equipmentRestService: EquipmentRestService,
    private myFilterWebsocketService: MyFilterWebsocketService,
    private cacheableHttpClient: CacheableHttpClient,
    private equipmentStatusWebsocketStateService: EquipmentStatusWebsocketStateService) {
  }

  activateWebsocketEquipmentStatusIfToggleIsActive() {
    this.countryConfig.getConfig().pipe(
      takeWhile(config => config.FEATURE_TOGGLE_EQUIPMENT_STATUS_CACHE === 'true'),
      switchMap(config => {
        this.equipmentStatusWebsocketStateService.isWebsocketForEquipmentStatusActive = config.FEATURE_TOGGLE_EQUIPMENT_STATUS_CACHE === 'true';

        // When new value comes, it will cancel previous watchers created in startWatchersForCustomerIds,
        // this is crucial part for correct functioning with my equipment switch,
        // so be aware before any change
        return this.myFilterWebsocketService.myShowingEquipmentCustomersList$;
      }),
      switchMap((customerList) => this.startWatchersForCustomerIds(customerList)
      )).subscribe((equipmentStatusList: EquipmentStatusForCustomer[]) => {

      if (equipmentStatusList) {
        this.setLightEquipmentStatusWidget();

        this.processIncomingMessages(equipmentStatusList);
      }
    });
  }

  private processIncomingMessages(equipmentStatusList: EquipmentStatusForCustomer[]) {
    if (equipmentStatusList.length > 0) {

      const cachedResponse: EquipmentStatusForCustomer[] =
        this.cacheableHttpClient.getCachedValueForRequest(equipmentStatusRestName, null);

      if (cachedResponse) {
        const updatedEquipmentStatusList: EquipmentStatusForCustomer[] = this.mergeMessageToList(cachedResponse, equipmentStatusList);

        this.cacheableHttpClient.updateCacheWithoutTriggeringReload(equipmentStatusRestName, null, updatedEquipmentStatusList);
        this.equipmentStatusWebserviceMessage$.next(updatedEquipmentStatusList);
      } else {
        this.notMergedMessages = this.mergeMessageToList(equipmentStatusList, this.notMergedMessages);
      }
    } else {
      this.lightUpEquipmentStatusWidgetImmediately();
    }
  }

  private startWatchersForCustomerIds(customerList: Customer[]) {
    return this.equipmentRestService.getEquipmentStatusFromWebsocketMessages(
      of(customerList.map(customer => customer.customerId))
    );
  }

  setLightEquipmentStatusWidget() {
    this.myFilterWebsocketService.isEquipmentStatusWidgetLight = true;
  }

  lightUpEquipmentStatusWidgetImmediately() {
      this.myFilterWebsocketService.lightUpEquipmentStatusWidget$.next(true);
  }

  mergeNewMessageToExistingList(responseList: EquipmentStatusForCustomer[]): Observable<EquipmentStatusForCustomer[]> {
    return this.equipmentStatusWebserviceMessage$.pipe(map(mergedList => {
        if (mergedList.length === 0) {
          // When response come from BE, we could be in state that some messages already came to FE
          // Therefore we are merging messages which are already present on FE with response list from BE
          const mergedResponseWithAllMessagesList = this.notMergedMessages.length !== 0 ?
            this.mergeMessageToList(responseList, this.notMergedMessages) : responseList;
          this.cacheableHttpClient.updateCacheWithoutTriggeringReload(equipmentStatusRestName, null, mergedResponseWithAllMessagesList);
          this.notMergedMessages = [];

          return mergedResponseWithAllMessagesList;
        } else {
          return mergedList;
        }
      }
    ));
  }

  private mergeMessageToList = (cacheValues: EquipmentStatusForCustomer[],
    messageValues: EquipmentStatusForCustomer[]) => {
    return [...messageValues, ...cacheValues].reduce((uniqueList: EquipmentStatusForCustomer[],
      uniqueListItem: EquipmentStatusForCustomer) => {
      return uniqueList.find(item => item.customerId === uniqueListItem.customerId) ? uniqueList : [...uniqueList, uniqueListItem];
    }, []);
  }

  resetWebsocketMessageMerging() {
    this.equipmentStatusWebserviceMessage$.next([]);
  }
}
