'use strict';
const debug = require('debug')('egg:util:messenger:ipc');
const is = require('is-type-of');
const sendmessage = require('sendmessage');
const EventEmitter = require('events');
/**
* Communication between app worker and agent worker by IPC channel
*/
class Messenger extends EventEmitter {
constructor() {
super();
this.pid = String(process.pid);
// pids of agent or app maneged by master
// - retrieve app worker pids when it's an agent worker
// - retrieve agent worker pids when it's an app worker
this.opids = [];
this.on('egg-pids', pids => {
this.opids = pids;
});
this._onMessage = this._onMessage.bind(this);
process.on('message', this._onMessage);
}
/**
* Send message to all agent and app
* @param {String} action - message key
* @param {Object} data - message value
* @return {Messenger} this
*/
broadcast(action, data) {
debug('[%s] broadcast %s with %j', this.pid, action, data);
this.send(action, data, 'app');
this.send(action, data, 'agent');
return this;
}
/**
* send message to the specified process
* @param {String} pid - the process id of the receiver
* @param {String} action - message key
* @param {Object} data - message value
* @return {Messenger} this
*/
sendTo(pid, action, data) {
debug('[%s] send %s with %j to %s', this.pid, action, data, pid);
sendmessage(process, {
action,
data,
receiverPid: String(pid),
});
return this;
}
/**
* send message to one app worker by random
* - if it's running in agent, it will send to one of app workers
* - if it's running in app, it will send to agent
* @param {String} action - message key
* @param {Object} data - message value
* @return {Messenger} this
*/
sendRandom(action, data) {
/* istanbul ignore if */
if (!this.opids.length) return this;
const pid = random(this.opids);
this.sendTo(String(pid), action, data);
return this;
}
/**
* send message to app
* @param {String} action - message key
* @param {Object} data - message value
* @return {Messenger} this
*/
sendToApp(action, data) {
debug('[%s] send %s with %j to all app', this.pid, action, data);
this.send(action, data, 'app');
return this;
}
/**
* send message to agent
* @param {String} action - message key
* @param {Object} data - message value
* @return {Messenger} this
*/
sendToAgent(action, data) {
debug('[%s] send %s with %j to all agent', this.pid, action, data);
this.send(action, data, 'agent');
return this;
}
/**
* @param {String} action - message key
* @param {Object} data - message value
* @param {String} to - let master know how to send message
* @return {Messenger} this
*/
send(action, data, to) {
sendmessage(process, {
action,
data,
to,
});
return this;
}
_onMessage(message) {
if (message && is.string(message.action)) {
debug('[%s] got message %s with %j, receiverPid: %s',
this.pid, message.action, message.data, message.receiverPid);
this.emit(message.action, message.data);
}
}
close() {
process.removeListener('message', this._onMessage);
this.removeAllListeners();
}
/**
* @method Messenger#on
* @param {String} action - message key
* @param {Object} data - message value
*/
}
module.exports = Messenger;
function random(arr) {
const index = Math.floor(Math.random() * arr.length);
return arr[index];
}