import { Injectable } from '@angular/core';
import { CacheableHttpClient } from '../cache/cacheable-http-client';
import { concat, Observable, of } from 'rxjs';
import { RxStompService } from '../../stomp/rx-stomp.service';
import { CommunicationEndpoint } from './communication-endpoint';
import { filter, map, mergeAll, mergeMap, scan, withLatestFrom } from 'rxjs/operators';
import { UserRestService } from '../../rest-services/user-rest.service';

@Injectable({providedIn: 'root'})
export class CommunicationClient {
  constructor(private http: CacheableHttpClient,
    private rxStompService: RxStompService,
    private userRestService: UserRestService) {
  }

  public get<T>(endpoint: CommunicationEndpoint<T>, websocketToggleOn = false, options?: any): Observable<T> {
    if (!endpoint.topics$ || !endpoint.merger || !websocketToggleOn) {
      return this.http.get<T>(endpoint.url, options);
    }

   return concat(this.performGet(endpoint, options), endpoint.topics$.pipe(
      withLatestFrom(this.userRestService.getUser()),
      mergeMap(([t, user]) =>
        t.map(tt => this.rxStompService.watch(`/topic/${tt}`, {'x-queue-name': user.gid + '-' + new Date().getTime()})
          .pipe(
            filter(m => m.headers['x-fleet-message'] === endpoint.messageType),
            map(m => {
                  return JSON.parse(m.body) as T;
              }
            )))),
      mergeAll())).pipe(scan((acc, val) => !acc ? val : this.updateCache(endpoint.merger(acc, val), endpoint, options)));
  }

  private updateCache<T>(data: T, endpoint: CommunicationEndpoint<T>, options?: any): T {
    this.http.updateCache(endpoint.url, options, data);
    return data;
  }

  performGet<T>(endpoint: CommunicationEndpoint<T>, options: any): Observable<T> {
    if (endpoint.url !== '') {
      return this.http.get<T>(endpoint.url, options);
    }
    return of<T>(null);
  }
}
