import {
    ChangeDetectorRef,
    EventEmitter,
    OnDestroy,
    Pipe,
    PipeTransform,
    ɵisObservable as isObservable,
} from '@angular/core'
import {
    Observable,
    Subscribable,
    SubscriptionLike,
} from 'rxjs'

/**
 * Options accepted as the second argument of the `stream` pipe.
 */
export interface StreamConfig {
    /**
     * Fallback used by `StreamPipe.transform` when it holds no value yet and
     * the bound stream does not expose a defined `value` property.
     */
    default: any
}

/**
 * Impure pipe that subscribes to a stream and returns the most recent value
 * it has produced.
 *
 * Differences from a plain "latest value" pipe that are visible in this class:
 *  - if the stream exposes a defined `value` property, that value is returned
 *    on the very first `transform` call;
 *  - otherwise `config.default` (when supplied) is returned until the stream
 *    emits.
 *
 * When the bound stream instance changes, the previous subscription is
 * released and the new stream is subscribed to with the same `config`.
 */
@Pipe({
    name: 'stream',
    pure: false,
})
export class StreamPipe implements PipeTransform, OnDestroy {
    /** Last value received from `source`, or the initial/default value. */
    private latestValue: any = null

    /** Active subscription to `source`; `null` while nothing is subscribed. */
    private subscription: SubscriptionLike | null = null
    /** Stream instance currently bound to the pipe; `null` when unbound. */
    private source: Subscribable<any> | EventEmitter<any> | null = null

    constructor(
        private changeDetector: ChangeDetectorRef,
    ) {}

    /**
     * Returns the latest value for `data$`, subscribing on first use and
     * re-subscribing whenever a different stream instance is passed in.
     *
     * @param data$ Stream to read from; `null`/`undefined` yields the default.
     * @param config Optional settings; `config.default` is the fallback value.
     * @returns The latest known value, the fallback, or `null`.
     * @throws Error if `data$` is truthy but not recognised as an observable.
     */
    transform<T>(data$: Observable<T> | Subscribable<T>, config?: StreamConfig): T | null
    transform<T>(data$: null | undefined, preload?: any): null
    transform<T>(data$: Observable<T> | Subscribable<T> | null | undefined, config?: StreamConfig): T | null
    transform<T>(data$: Observable<T> | Subscribable<T> | null | undefined, config?: StreamConfig): T | null {
        if (!this.source) {
            // Nothing bound yet: seed the value before subscribing so the
            // first render already has something to show.
            if (isObservable(data$) && data$['value'] !== undefined) {
                this.latestValue = data$['value']
            } else {
                this.latestValue = this.latestValue ?? config?.default ?? null
            }

            if (data$) {
                this.subscribe(data$)
            }
            return this.latestValue
        }

        if (data$ !== this.source) {
            // A different stream was bound: drop the old subscription and
            // start over. `config` must be forwarded, otherwise the default
            // is lost because cleanUpResources() resets `latestValue`.
            this.cleanUpResources()
            return this.transform(data$, config)
        }
        return this.latestValue
    }

    /** Releases the active subscription, if any, when the pipe is destroyed. */
    ngOnDestroy(): void {
        if (this.subscription) {
            this.cleanUpResources()
        }
    }

    /**
     * Records `data$` as the current source and subscribes to it.
     *
     * @throws Error if `data$` is not recognised as an observable.
     */
    private subscribe<T>(data$: Observable<T> | Subscribable<T>): void {
        this.source = data$
        if (!isObservable(data$)) {
            throw Error(`InvalidPipeArgument: '${data$}' for pipe stream`)
        }

        this.subscription = data$.subscribe({
            next: (value) => {
                // Ignore emissions from a stream that is no longer bound.
                if (data$ === this.source) {
                    this.latestValue = value
                    // Reads a non-public `destroyed` flag on the detector —
                    // presumably set once the view is torn down; TODO confirm.
                    if (!this.changeDetector['destroyed']) {
                        this.changeDetector.markForCheck()
                    }

                }
            },
            error: (e: any) => {
                // Stream errors are rethrown rather than swallowed.
                throw e
            },
        })
    }

    /** Unsubscribes and resets the pipe to its unbound state. */
    private cleanUpResources(): void {
        this.subscription?.unsubscribe()
        this.latestValue = null
        this.subscription = null
        this.source = null
    }
}
