import Ably from 'ably';
import * as Sentry from '@sentry/react';
import Queue from 'src/utils/Queue';
import edgeDataApi from 'src/apis/edgeDataApi';


/*
 Solution 1: Queuing
  Either:
    inforce order of operations, and enforce one done before next begins
    Add timeout to each request
 Solution 2: Batching
  Take in connection / disconnection requests
  Batch for X ms, and ignore everything except the most recent batch req.
  if connect -> connect: connect
  if connect -> disconnect: disconnect
  if connect -> disconnect -> connect: connect
*/


// Will be called internally by Ably.Realtime whenever it needs to authenticate. Get sigend tokenRequest from our server, and feed to ably via callback().
const __fetchRequestToken = async (tokenParams, callback) => {
  let err = null;
  let tokenRequest = null;
  try {
    const { data } = await edgeDataApi.post('/stream/auth', {});
    tokenRequest = data;

    if (!tokenRequest || !tokenRequest?.timestamp) {
      tokenRequest = null;
      err = Error('Empty token request');
    }
  } catch (error) {
    console.log(error);
    err = error;
  }

  callback(err, tokenRequest);
};

/**
 * Singleton to manage All ably connections. Supports multiplexing on handlerIds, and will
 * open/close connections only as needed.
 **/
const AblyService = {

  realtime: new Ably.Realtime({
    authCallback: __fetchRequestToken,
    autoConnect: false,
    transportParams: { heartbeatInterval: 60_000 } // Maybe this helps us beat the chrome background tab problem.
  }),
  connected: false,
  initialized: false,
  /** Maps Connections (subscriptions) to Handlers (clients), in a One-to-Many relationship */
  subscriptionItems: new Map(),
  /** Map of channel->callback events */
  channelLifecycleCallbacks: new Map(),
  /** Ensure the order of events is maintained. Prevents some multiplex bugs from occuring. */
  queue: new Queue(),
  /** True if the queue is currently clearing */
  processingQueue: false,


  /**
   * Every internal command will be placed on the queue.
   * Whenever an item is placed on the queue, it will IMMEDIATELY begin processing.
   * Commands enqueued during processing will be processed after the current command is finished.
   * @param {function} command - The async operation to queue
   * @param {*} args - The arguments to pass to the command
   **/
  enqueueCommand: async function (command, args) {
    this.queue.enqueue({ command, args });

    if (!this.processingQueue) {
      // console.log('[ablyQueue] START PROCESSING QUEUE', this.queue.length);
      this.processingQueue = true;
      while (this.queue.length) {
        try {
          const queueItem = this.queue.dequeue();
          const results = await queueItem.command.apply(this, queueItem.args);
          if (results) {
            // console.log(`[ablyQueue] Dequeued ${queueItem.command.name}\nargs: ${JSON.stringify(queueItem.args)} \nresults: ${results.join(', ')}`);
          }
        } catch (err) {
          Sentry.captureException(err);
        }
      }
      this.processingQueue = false;
      // console.log(`[ablyQueue] END PROCESSING QUEUE, TIME: ${+new Date() - processTimeStart}ms`);
    }
  },


  /**
   * Opens the connection.
   * Pass single or array of channelCfgs, which contain the channel name(s) and the callback(s) to be fired.
   * Will initialize Ably if not already done so.
   *
   * A channel can have multiple handlers. All handlers within a channel will be called when that channel recieves a message.
   *
   * If a channel is opened for the first time, (it has no active handlers yet), optional onOpenChannel will be called.
   * If a channel is closed, (it has no more handlers), optional onCloseChannel will be called.
   * If a channel already exists (it has some handlers), then the new handler will be appended to it. It will not open a new channel.
   *
   *
   * channel: {
   *  channelName: 'charts:AAPL',
   *  channelLifecycleCallbacks: { onOpenChannel() => {},  onCloseChannel() => {}},
   *  handlers: [
   *     { handlerID: 'xddd', onMessage: () => {}, onPresence: () => {}}
   *     { handlerID: 'dgfsa', onMessage: () => {}}
   *  ]
   * }
  */
  openConnection: function (channelName, handler) {
    if (!channelName) {
      throw new Error(`[ablyService:openConnection ${+ new Date()}] No channel name to open`);
    } else if (!handler) {
      throw new Error(`[ablyService:openConnection ${+ new Date()}] Handlers for '${channelName}' invalid`);
    } else if (!handler.handlerId) {
      throw new Error(`[ablyService:openConnection ${+ new Date()}] Handler for '${channelName}' does not have ID`);
    } else if (!handler?.onMessage && !handler?.onPresence) {
      throw new Error(`[ablyService:openConnection ${+ new Date()}] Handler '${handler.handlerId}' for '${channelName}' does not have callback`);
    }

    void this.enqueueCommand(
      this.__openConnection,
      [channelName, handler]
    );
  },


  // Remove a single handler. Will close entire channel if no handlers left.
  closeHandler: function (handlerId) {
    void this.enqueueCommand(
      this.__closeHandler,
      [handlerId]
    );
  },


  // Close a channel. Will close entire connection if no channels left
  closeChannel: function (channelName) {
    void this.enqueueCommand(
      this.__closeChannel,
      [channelName]
    );
  },


  /* Will only be called once on each channel. Is not called per-handler.
   * Can be accidentally rewritten if you call this function with different callbacks on the same handler.
   * If the channel is already open without a handler, and you set an onOpen, it will never be called.
   **/
  setChannelLifecycleCallbacks: function (channel, { onOpenChannel, onCloseChannel }) {
    this.channelLifecycleCallbacks.set(channel, { onOpenChannel, onCloseChannel });
  },


  //////////////////// PRIVATE METHODS ////////////////////

  /* Because this action is within the queue, we shouldn't have any double-connection issues.
     Theoretically if 3 calls are made to open, the first one will connect, and the others will
     await it and then skip connecting.
  */
  __openConnection: async function (channelName, handler) {
    return new Promise(resolve => {
      const results = [];
      if (!this.connected) {
        this.realtime.connection.connect();

        if (!this.initialized) {
          this.initialized = true;
          this.realtime.connection.on(stateChange => {
            console.log(`[ablyService:stateChange ${+ new Date()}] New connection state is [${stateChange.current}]`);
          });
          results.push('Initialized');
        }

        this.realtime.connection.once('connected', () => {
          this.connected = true;
          results.push('Connection Established');
          this.__connectToChannel(channelName, handler).then(connectionResults => {
            resolve([...results, ...connectionResults]);
          });
        });
      } else {
        this.__connectToChannel(channelName, handler).then(connectionResults => {
          resolve([...results, ...connectionResults]);
        });
      }
    });
  },


  __connectToChannel: function (channelName, handler) {
    return new Promise(resolve => {
      let subscriptionItem = this.subscriptionItems.get(channelName);

      // If we don't have a channel for this handler, create and subscribe
      if (!subscriptionItem) {
        this.subscriptionItems.set(channelName, {
          handlers: [handler]
        });

        const channel = this.realtime.channels.get(channelName);
        subscriptionItem = this.subscriptionItems.get(channelName);

        const channelLifecycleCallback = this.channelLifecycleCallbacks.get(channelName);
        if (channelLifecycleCallback && !!channelLifecycleCallback.onOpenChannel) {
          channelLifecycleCallback.onOpenChannel(subscriptionItem);
        }

        try {
          this.__enter_presence_if_callback(channel, handler);
        } catch (err) {
          // empty
        }

        channel.subscribe(message => {
          // 10/14/24 - News stream is now automatically decoding the data...
          let data = null;
          try {
            // regular
            data = JSON.parse(message.data);
          } catch (err) {
            // news stream
            data = message.data;
          }

          subscriptionItem.handlers.forEach(({ onMessage }) => {
            !!onMessage && onMessage(data, message.name, subscriptionItem);
          });
        });

        return resolve(['Create Subscription', 'Add Handler']);
      }

      const handlerItemIndex = subscriptionItem.handlers.findIndex(h => h.handlerId === handler.handlerId);

      // if we already have a channel, add this handler to that channel.
      if (handlerItemIndex === -1) {
        subscriptionItem.handlers.push(handler);
        return resolve(['Add Handler']);
      }

      // If we already have a channel, and this handlerId is a duplicate, rebind the handler.
      subscriptionItem.handlers[handlerItemIndex] = handler;

      return resolve(['Rebind Handler']);
    });
  },


  __enter_presence_if_callback: function (channel, handler) {
    if (handler?.onPresence) {
      channel.presence.enter((err) => {
        if (err) {
          console.warn(`[ablyService:__connectToChannel ${+ new Date()}] presence.enter() error`);
          // console.error(err);
        } else {
          handler.onPresence()
        }
      });
    }
  },


  __closeHandler: async function (handlerId) {
    const results = [];
    for (const channelName of this.subscriptionItems.keys()) {
      const subscriptionItem = this.subscriptionItems.get(channelName);
      const handlerIdx = subscriptionItem.handlers.findIndex(h => h.handlerId === handlerId);
      if (handlerIdx !== -1) {
        results.push('Closed handler');
        subscriptionItem.handlers.splice(handlerIdx, 1);
        if (!subscriptionItem.handlers.length) {
          const closeResults = await this.__closeConnection(channelName);
          if (closeResults && closeResults.length) {
            results.push(...closeResults);
          }
        }
      }
    }
    return results;
  },


  // Close a channel. Will close entire connection if no channels left
  __closeChannel: async function (channelName) {
    const subscriptionItem = this.subscriptionItems.get(channelName);
    if (subscriptionItem) {
      return await this.__closeConnection(channelName);
    }
  },


  __closeConnection: function (channelName) {
    return new Promise((resolve, reject) => {
      const results = [];

      const subscriptionItem = this.subscriptionItems.get(channelName);
      if (!subscriptionItem) {
        return reject('Subscription does not exist');
      }

      const channel = this.realtime.channels.get(channelName);
      channel.unsubscribe();
      channel.detach((err) => {
        if (err) {
          // We never seem to get useful errors here. Just going to batch them.
          Sentry.withScope(scope => {
            scope.setTransactionName('AblyService.__closeConnection channel.detach() Error');
            scope.setLevel('warning')
            try {
              const msg = err?.message.toLowerCase();
              if (!msg.includes('connection closed') && !msg.includes('already detached')) {
                const message = `${err?.message}, status:${err?.statusCode}, code:${err?.code}, cause:${err?.cause}`
                Sentry.captureException(new Error(message), 'warning');
              }
            } catch (err) {
              Sentry.captureException(new Error('Ably failed to __closeConnection'));
            } finally {
              resolve(results);
            }
          })
        } else {
          const channelLifecycleCallback = this.channelLifecycleCallbacks.get(channelName);
          if (channelLifecycleCallback && !!channelLifecycleCallback.onCloseChannel) {
            channelLifecycleCallback.onCloseChannel(subscriptionItem);
          }

          if (!this.subscriptionItems.size) {
            this.realtime.connection.close();
            this.connected = false;
            results.push('Closed Ably Completely');
          }

          this.subscriptionItems.delete(channelName);
          results.push('Closed channel');
          return resolve(results);
        }
      });
    });
  },
};

export default AblyService;


