// lib/core/messenger/ipc.js

'use strict';

const debug = require('debug')('egg:util:messenger:ipc');
const is = require('is-type-of');
const sendmessage = require('sendmessage');
const EventEmitter = require('events');

/**
 * Communication between app worker and agent worker by IPC channel
 */
class Messenger extends EventEmitter {

  /**
   * Set up the messenger for the current process: remember its pid, track
   * the pids announced through the `egg-pids` event and start listening to
   * the process `message` event.
   */
  constructor() {
    super();
    this.pid = String(process.pid);

    // pids of agent or app managed by master
    // - app worker pids when this is an agent worker
    // - agent worker pids when this is an app worker
    this.opids = [];
    const rememberPids = pids => {
      this.opids = pids;
    };
    this.on('egg-pids', rememberPids);

    // keep the bound handler on the instance so close() can remove
    // exactly the listener registered here
    this._onMessage = this._onMessage.bind(this);
    process.on('message', this._onMessage);
  }

  /**
   * Send message to all agent and app
   * @param {String} action - message key
   * @param {Object} data - message value
   * @return {Messenger} this
   */
  broadcast(action, data) {
    debug('[%s] broadcast %s with %j', this.pid, action, data);
    // app first, then agent
    for (const target of [ 'app', 'agent' ]) {
      this.send(action, data, target);
    }
    return this;
  }

  /**
   * Send message to the specified process
   * @param {String} pid - the process id of the receiver
   * @param {String} action - message key
   * @param {Object} data - message value
   * @return {Messenger} this
   */
  sendTo(pid, action, data) {
    debug('[%s] send %s with %j to %s', this.pid, action, data, pid);
    // the receiver pid always travels as a string
    const receiverPid = String(pid);
    sendmessage(process, { action, data, receiverPid });
    return this;
  }

  /**
   * Send message to one randomly picked process out of `this.opids`
   * - if it's running in agent, it will send to one of app workers
   * - if it's running in app, it will send to agent
   * Nothing is sent while no pids are known.
   * @param {String} action - message key
   * @param {Object} data - message value
   * @return {Messenger} this
   */
  sendRandom(action, data) {
    const { opids } = this;
    /* istanbul ignore if */
    if (!opids.length) return this;
    this.sendTo(String(random(opids)), action, data);
    return this;
  }

  /**
   * Send message to app
   * @param {String} action - message key
   * @param {Object} data - message value
   * @return {Messenger} this
   */
  sendToApp(action, data) {
    debug('[%s] send %s with %j to all app', this.pid, action, data);
    this.send(action, data, 'app');
    return this;
  }

  /**
   * Send message to agent
   * @param {String} action - message key
   * @param {Object} data - message value
   * @return {Messenger} this
   */
  sendToAgent(action, data) {
    debug('[%s] send %s with %j to all agent', this.pid, action, data);
    this.send(action, data, 'agent');
    return this;
  }

  /**
   * Hand a message over to `sendmessage` on the current process
   * @param {String} action - message key
   * @param {Object} data - message value
   * @param {String} to - let master know how to send message
   * @return {Messenger} this
   */
  send(action, data, to) {
    const message = { action, data, to };
    sendmessage(process, message);
    return this;
  }

  /**
   * Handler of the process `message` event: re-emit the message on this
   * messenger under its action name. Messages without a string `action`
   * are ignored.
   * @param {Object} message - incoming message
   * @private
   */
  _onMessage(message) {
    if (!message || !is.string(message.action)) return;

    const { action, data, receiverPid } = message;
    debug('[%s] got message %s with %j, receiverPid: %s',
      this.pid, action, data, receiverPid);
    this.emit(action, data);
  }

  /**
   * Stop listening to the process `message` event and drop every listener
   * registered on this messenger.
   */
  close() {
    process.removeListener('message', this._onMessage);
    this.removeAllListeners();
  }

  /**
   * @method Messenger#on
   * @param {String} action - message key
   * @param {Object} data - message value
   */
}

module.exports = Messenger;

/**
 * Pick one element of an array at a pseudo-random index (Math.random).
 * @param {Array} arr - candidates
 * @return {*} the picked element, `undefined` when `arr` is empty
 */
function random(arr) {
  return arr[Math.floor(arr.length * Math.random())];
}