import { Injectable, NgZone } from '@angular/core';
import { KeycloakService } from 'keycloak-angular';
import qs from 'qs';
import { Observable, from, of } from 'rxjs';
import { mergeMap } from 'rxjs/operators';
import EventSource from 'sse-events';

@Injectable({
    providedIn: 'root'
})
export class SSEService {

    constructor(private keycloakService: KeycloakService, private zone: NgZone) { }

    /**
     * Create an event source of POST request
     * @param API url 
     * @formData data (file, ...etc.)
     */
    // public getEventSourceWithPost(url: string, formData: FormData): Observable<any> {
    //     throw new Error('Method not implemented.');
    // }

    /**
    * Create an event source of GET request
    * @param API url 
    * @payload data
    */
    public getEventSourceWithGet(url: string): Observable<any> {
        return this.newEventSource('GET', url);
    }

    public subscribeEvents(url, eventTypes = [], filters?: object): Observable<any> {
        return this.newEventSource('GET', url, { eventTypes, filters });
    }


    private newEventSource(method: string, url: string, params = {}): Observable<any> {
        const query = qs.stringify(params);

        return from(this.keycloakService.getToken()).pipe(
            mergeMap(token => of({
                method,
                headers: {
                    Authorization: `Bearer ${token}`
                }
            })),
            mergeMap(options => new Observable(observer => {
                const sse = new EventSource({
                    url: `${url}?${query}`,
                    options: {
                        headers: options.headers
                    },
                    serverErrorCodes: [502, 503, 504, 403],
                    retryOnServerError: true,
                    reconnectInterval: 500,
                    retryOnNetworkError: true
                });

                // eslint-disable-next-line no-console
                this.addEventListener(sse, 'init');
                // eslint-disable-next-line no-console
                this.addEventListener(sse, 'ping');

                params['eventTypes'].forEach(event => {
                    this.addEventListener(sse, event, message => {
                        this.zone.run(() => observer.next(message));
                    });
                });

                sse.open();
            }))
        );
    }

    private addEventListener(sse, event, callback?) {
        sse.addEventListener(event, message => {
            message.type = event;
            callback?.(message);
        });
    }
}
