import { GroupedObservable, Observable, OperatorFunction, ReplaySubject, Subject } from 'rxjs';

/**
 * Splits the source into groups of *consecutive* values that share a key.
 *
 * Every time the key produced by `keySelector` differs from the key of the
 * previous value, the currently open group is completed and a new group is
 * emitted downstream. The same key may therefore appear in several groups if
 * its values are not adjacent in the source.
 *
 * Each group is backed by an unbounded `ReplaySubject`, so a consumer that
 * subscribes to a group after it was emitted still receives all of the
 * group's values.
 *
 * Keys are compared with strict inequality (`!==`).
 *
 * @param keySelector Computes the grouping key for a source value.
 * @returns An operator that maps a stream of values to a stream of grouped
 *   observables, each exposing its key through the `key` property.
 *
 * Error handling: a source error, or an exception thrown by `keySelector`,
 * is delivered first to the open group (if any) and then to the outer
 * observer. Source completion completes the open group and then the outer
 * observer.
 */
export const seriesGroupBy =
  <T, K>(keySelector: (value: T) => K): OperatorFunction<T, GroupedObservable<K, T>> =>
    (source: Observable<T>): Observable<GroupedObservable<K, T>> =>
      new Observable<GroupedObservable<K, T>>(observer => {
        // Run-tracking state is created per subscription. Keeping it outside
        // this callback would make concurrent or repeated subscriptions share
        // (and corrupt) the same key and subject.
        let currentKey: K | undefined;
        let group$: ReplaySubject<T> | null = null;
        // Set once keySelector has thrown, so that a synchronous source that
        // keeps pushing values cannot open further groups after the error.
        let failed = false;

        return source.subscribe({
          next: value => {
            if (failed) {
              return;
            }

            let key: K;
            try {
              key = keySelector(value);
            } catch (err) {
              // Route selector failures through the error channel instead of
              // letting them escape the handler as an unhandled exception.
              failed = true;
              group$?.error(err);
              observer.error(err);
              return;
            }

            let active = group$;
            // "No group yet" is detected via the subject itself rather than a
            // sentinel key, so `null`/`undefined` are valid first keys.
            if (active === null || key !== currentKey) {
              active?.complete();

              active = new ReplaySubject<T>();
              group$ = active;
              currentKey = key;

              // ReplaySubject declares no `key` member, so the grouped
              // observable is assembled dynamically; `any` is confined to this
              // hand-off (the original typed the whole variable as `any`).
              // eslint-disable-next-line @typescript-eslint/no-explicit-any
              const grouped: any = Object.assign(active, { key });
              observer.next(grouped);
            }

            active.next(value);
          },
          error: err => {
            group$?.error(err);
            observer.error(err);
          },
          complete: () => {
            group$?.complete();
            observer.complete();
          },
        });
      });
