'use strict';
const debug = require('debug')('egg:util:messenger:local');
const is = require('is-type-of');
const EventEmitter = require('events');
/**
* Communication between app worker and agent worker with EventEmitter
*/
class Messenger extends EventEmitter {
  /**
   * @param {Object} egg - the instance owning this messenger. The code reads
   *   `egg.type`, and `egg.agent` (when type is 'application') or
   *   `egg.application` (otherwise) to locate the other side.
   */
  constructor(egg) {
    super();
    this.egg = egg;
    // Every debug line below is prefixed with `this.pid`; without this
    // assignment the prefix would always print as `undefined`.
    this.pid = String(process.pid);
  }

  /**
   * Send message to all agent and app
   * @param {String} action - message key
   * @param {Object} data - message value
   * @return {Messenger} this
   */
  broadcast(action, data) {
    debug('[%s] broadcast %s with %j', this.pid, action, data);
    this.send(action, data, 'both');
    return this;
  }

  /**
   * send message to the specified process
   * Notice: in single process mode, it only can send to self process,
   * and it will send to both agent and app's messengers.
   * A pid that is not the current process id is ignored.
   * @param {String|Number} pid - the process id of the receiver
   * @param {String} action - message key
   * @param {Object} data - message value
   * @return {Messenger} this
   */
  sendTo(pid, action, data) {
    debug('[%s] send %s with %j to %s', this.pid, action, data, pid);
    // `process.pid` is a number while callers are documented to pass a string,
    // so compare both sides as strings; a strict comparison against the raw
    // number would silently drop every string pid.
    if (String(pid) !== String(process.pid)) return this;
    this.send(action, data, 'both');
    return this;
  }

  /**
   * send message to one worker by random
   * Notice: in single process mode, we only start one agent worker and one app worker
   * - if it's running in agent, it will send to the app worker
   * - if it's running in app, it will send to agent
   * @param {String} action - message key
   * @param {Object} data - message value
   * @return {Messenger} this
   */
  sendRandom(action, data) {
    debug('[%s] send %s with %j to opposite', this.pid, action, data);
    this.send(action, data, 'opposite');
    return this;
  }

  /**
   * send message to app
   * @param {String} action - message key
   * @param {Object} data - message value
   * @return {Messenger} this
   */
  sendToApp(action, data) {
    debug('[%s] send %s with %j to all app', this.pid, action, data);
    this.send(action, data, 'application');
    return this;
  }

  /**
   * send message to agent
   * @param {String} action - message key
   * @param {Object} data - message value
   * @return {Messenger} this
   */
  sendToAgent(action, data) {
    debug('[%s] send %s with %j to all agent', this.pid, action, data);
    this.send(action, data, 'agent');
    return this;
  }

  /**
   * Deliver a message to the in-process messenger(s) selected by `to`.
   * @param {String} action - message key
   * @param {Object} data - message value
   * @param {String} [to] - 'application', 'agent', 'both' or 'opposite';
   *   when omitted, the side opposite to `egg.type` is used
   * @return {Messenger} this
   */
  send(action, data, to) {
    // use nextTick to keep it async as IPC messenger
    process.nextTick(() => {
      const { egg } = this;
      const isApplication = egg.type === 'application';
      const application = isApplication ? egg : egg.application;
      const agent = isApplication ? egg.agent : egg;
      const opposite = isApplication ? agent : application;
      const target = to || (isApplication ? 'agent' : 'application');

      // A side that is missing or has no messenger is skipped silently.
      // Each receiver gets its own message object.
      const deliver = receiver => {
        if (receiver && receiver.messenger) {
          receiver.messenger._onMessage({ action, data });
        }
      };

      if (target === 'application' || target === 'both') deliver(application);
      if (target === 'agent' || target === 'both') deliver(agent);
      if (target === 'opposite') deliver(opposite);
    });
    return this;
  }

  /**
   * Emit a received message as an event named after its action.
   * Messages without a string `action` are ignored.
   * @param {Object} message - `{ action, data }`
   */
  _onMessage(message) {
    if (message && is.string(message.action)) {
      debug('[%s] got message %s with %j', this.pid, message.action, message.data);
      this.emit(message.action, message.data);
    }
  }

  /**
   * Remove every listener registered on this messenger.
   */
  close() {
    this.removeAllListeners();
  }

  /**
   * Listen for messages of a given action (inherited from EventEmitter).
   * @method Messenger#on
   * @param {String} action - message key
   * @param {Function} listener - called with the message value (`data`)
   */
}
module.exports = Messenger;