import { Injectable } from '@angular/core';
import { defer, Observable, combineLatest, NEVER } from 'rxjs';
import { switchMap, map, shareReplay } from 'rxjs/operators';
import { RpcService } from './rpc.service';
import { retryAfter } from 'shared/rxjs';

/**
 * Hands out one shared observable per context name, each emitting the set of
 * users reported by `rpc.online.observeContext` for that context.
 *
 * Streams are created lazily on first request and kept in `contextObservers`;
 * nothing in this class removes an entry once it has been added.
 */
@Injectable({ providedIn: 'root' })
export class OnlineUsersService {
  /** Shared streams created so far, keyed by the context name they observe. */
  contextObservers = new Map<string, Observable<Set<string>>>();

  constructor(private rpc: RpcService) { }

  /**
   * Returns the shared stream for `context`, building and caching it on the
   * first call so that later calls with the same name get the same instance.
   *
   * The stream follows `rpc.isConnected$`: a truthy status (re)opens the
   * remote observation, a falsy status switches to `NEVER`, so nothing is
   * emitted until the next truthy status.
   *
   * @param context Name of the context to observe.
   * @returns An observable of user sets. Subscribers share a single upstream
   *   subscription, and late subscribers are replayed the most recent set
   *   while that subscription is active.
   */
  observeContext(context: string) {
    const cached = this.contextObservers.get(context);
    if (cached) {
      return cached;
    }

    // Annotated explicitly so the method's inferred return type stays
    // Observable<Set<string>> regardless of how RpcService is typed.
    const users$: Observable<Set<string>> = this.rpc.isConnected$.pipe(
      // switchMap drops the previous inner subscription on every status change.
      switchMap(connected => connected ? this.openContextStream(context) : NEVER),
      // refCount: true tears the upstream down once the last subscriber leaves.
      shareReplay({ refCount: true, bufferSize: 1 }),
    );

    this.contextObservers.set(context, users$);
    return users$;
  }

  /**
   * Builds the cold stream that asks the RPC layer to observe `context` and
   * wraps every emitted collection in a fresh `Set`.
   */
  private openContextStream(context: string): Observable<Set<string>> {
    // The async wrapper makes `defer` receive a promise, so the RPC call only
    // happens on subscription and a synchronous throw surfaces as a stream error.
    // NOTE(review): presumably observeContext resolves to an observable of user
    // ids — confirm against RpcService.
    const remote$ = defer(async () => this.rpc.online.observeContext(context));

    return remote$.pipe(
      switchMap(inner$ => inner$),
      map(users => new Set(users)),
    );
  }
}
