Nodejs cluster 模块
cluster 和 child_process 模块子进程的区别
child_process 执行 shell 命令、利用多进程执行代码
cluster 通过多进程 master、worker 实现多个 HTTP 应用服务器架构
总结写前面
cluster 模块是 node 利用多进程处理网络连接的应用架构
cluster 通过进程 IPC 通道共享主进程的 server handle 句柄创建 socket 文件描述符 实现子进程共同监听同一端口
cluster 在 http 网络请求中采用 RoundRobin 轮询的负载均衡方式对 woker 进行调度
框架图
多进程通信,子进程监听同一端口为什么不冲突
不同进程之间的 server 通过 IPC 通道共享监听某个端口的 socket 连接句柄来解决冲突。
// master.js
const { createServer } = require('net')
const { fork } = require('child_process')
const cpus = require('os').cpus()
const netServer = createServer().listen(3000) // create TCP server
for (let i = 0; i < cpus.length; i++) {
const worker = fork('worker.js')
worker.send('server', netServer)
console.log('worker process created, pid: %s ppid: %s', worker.pid, process.pid);
}
// worker.js
const http = require('http')
const server = http.createServer((req, res) => { // this.on('connection', connectionListener);
res.end('I am worker, pid: ' + process.pid + ', ppid: ' + process.ppid);
})
let _handle
process.on('message', (msg, handle) => {
if (msg !== 'server') return
_handle = handle
_handle.on('connection', (socket) => { // _http_server.js 中实现, this.on('connection', connectionListener)
server.emit('connection', socket); // 与子进程 server 共享 socket 处理连接后执行子进程 http.createServer 的 callback
})
})
server 共享 socket 过程
看下 createServer
的处理过程就可以知道 server.emit('connection', socket);
是如何共享 socket 并何时触发 createServer
回调对用户进行响应的
-
_http_server 创建 creatServer
Server 绑定 request 事件处理 (req, res) => {} 函数返回响应 和 connection 事件 共享 socket。
// Server 构造函数
function Server {
...
if (requestListener) {
this.on('request', requestListener);
}
this.on('connection', connectionListener);
}
function connectionListener(socket) {
defaultTriggerAsyncIdScope(
getOrSetAsyncId(socket), connectionListenerInternal, this, socket
);
}
function connectionListenerInternal(server, socket) {
// ...
parser.onIncoming = parserOnIncoming.bind(undefined, server, socket, state)
}
function resOnFinish(req, res, socket, state, server) {
// ...
res.detachSocket(socket); // 关闭 socket
// ...
}
function parserOnIncoming(server, socket, state, req, keepAlive) {
// ...
res.on('finish', resOnFinish.bind(undefined, req, res, socket, state, server));
if (req.headers.expect !== undefined &&
(req.httpVersionMajor === 1 && req.httpVersionMinor === 1)) {
// ...
} else {
server.emit('request', req, res);
}
}
cluster 源码
// cluster.js
const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;
if (cluster.isMaster) {
console.log(`Master 进程 ${process.pid} 正在运行`);
for (let i = 0; i < 1; i++) { // 衍生工作进程。
cluster.fork();
}
cluster.on('exit', (worker, code, signal) => { console.log(`Worker ${worker.process.pid} 已退出`) });
} else {
http.createServer((req, res) => res.end(`你好世界 ${process.pid}`)).listen(8000);
console.log(`Worker 进程 ${process.pid} 已启动`);
}
主进程创建子进程 cluster fork
createWorkerProcess 通过 child_process fork 创建子进程源码
在主进程中 cluster.fork 通过 child_process fork 创建子进程
function createWorkerProcess(id, env) {
// ...
return fork(cluster.settings.exec, cluster.settings.args, {
cwd: cluster.settings.cwd,
env: workerEnv,
serialization: cluster.settings.serialization,
silent: cluster.settings.silent,
windowsHide: cluster.settings.windowsHide,
execArgv: execArgv,
stdio: cluster.settings.stdio,
gid: cluster.settings.gid,
uid: cluster.settings.uid
});
}
在子进程中的 http.createServer
当在子进程通过 createServer 并 listen 端口时,net 模块会根据 isMaster 来当前进程是主进程直接监听端口,当前进程是子进程则通过 IPC 通道获取 master 进程的服务器(server)句柄(handle),并监听(listen)它。
listenInCluster
http.createServer().listen(port) 会执行 net 模块的 Server.prototype.listen 方法 调用 listenInCluster,在此方法中根据 isMaster 判断,子进程时通过 cluster 模块 cluster._getServer 方法与 master 建立 IPC 通道获取 master 中 创建 server 的 handle 并在子进程代码中进行 listen。
listenInCluster 源码
function listenInCluster(server, address, port, addressType, backlog, fd, exclusive, flags) {
if (cluster === undefined) cluster = require('cluster');
if (cluster.isMaster || exclusive) {
server._listen2(address, port, addressType, backlog, fd, flags);
return;
}
const serverQuery = {
address: address,
port: port,
addressType: addressType,
fd: fd,
flags,
};
// 通过 IPC 通道获取 master server 的 handle 进行监听
cluster._getServer(server, serverQuery, listenOnMasterHandle);
function listenOnMasterHandle(err, handle) {
err = checkBindError(err, port, handle);
if (err) {
const ex = exceptionWithHostPort(err, 'bind', address, port);
return server.emit('error', ex);
}
server._handle = handle; // 重用 master handle
server._listen2(address, port, addressType, backlog, fd, flags);
}
}
子进程 cluster._getServer
子进程中 _getServer 向 master 通过 IPC 发送 node 内部消息为 act: 'queryServer ' 的通信获取 master handle
子进程 cluster._getServer
// lib/internal/cluster/master.js
// obj 在 http 请求 TCP 连接中是 net 的 Server 实例,UDP 连接是 dgram 的 Socket 实例
cluster._getServer = function(obj, options, cb) {
...
const message = {
act: 'queryServer',
index,
data: null,
...options
};
message.address = address;
if (obj._getServerData)
message.data = obj._getServerData();
// 向 master 发送 querServer 消息
send(message, (reply, handle) => {
if (typeof obj._setServerData === 'function')
obj._setServerData(reply.data);
if (handle)
shared(reply, handle, indexesKey, cb); // Shared listen socket. UDP 连接处理方式
else
rr(reply, indexesKey, cb); // Round-robin. TCP 连接 rr 模式
});
obj.once('listening', () => {
...
}
}
主进程 master 中 queryServer
// master 监听内部消息
function onmessage(message, handle) {
const worker = this;
if (message.act === 'online')
online(worker);
else if (message.act === 'queryServer')
queryServer(worker, message); // queryServer
else if (message.act === 'listening')
listening(worker, message);
else if (message.act === 'exitedAfterDisconnect')
exitedAfterDisconnect(worker, message);
else if (message.act === 'close')
close(worker, message);
}
queryServer 当不存在 RoundRobinHandle 实例时会创建一个, 通过 RoundRobinHandle 原型 add 方法添加 woker 到 实例 this.all 属性中,用来进行 master 对 worker 的负载均衡策略。
function queryServer(worker, message) {
...
let handle = handles.get(key);
// 创建 TCP RoundRobinHandle rr 实例, master 逻辑
if (handle === undefined) {
...
let constructor = RoundRobinHandle;
handle = new constructor(key, address, message);
handles.set(key, handle);
}
...
handle.add(worker, (errno, reply, handle) => {
const { data } = handles.get(key);
if (errno)
handles.delete(key); // Gives other workers a chance to retry.
// handle.add 后去执行子进程 queryServe 的 cb,告知采用 UDP 或 TCP
send(worker, {
errno,
key,
ack: message.seq,
data,
...reply
}, handle);
});
}
RoundRobinHandle 实例创建
创建 server,重写 server._handle 句柄的 onconnection 改用轮询的方式分发句柄给子进程处理
在 master 进程中会接收、传递请求给 worker 处理,RoundRobinHandle 的作用就是用来对 woker 进行分发、任务交接、调度的负载均衡策略,同时进程间共享的 TCP server handle 是在 RoundRobinHandle 实例创建时生成的。
- RoundRobinHandle 实例创建,重写 server._handle.onconnection 处理请求
通过传入无操作的回调用 net.createServer 创建 server 并监听端口,通过对 server.once('listening') 的监听重写 this.server._handle 的 onconnection,当 server 的 handle 遇到 connection 事件时将会使用 RoundRobinHandle 实例的 distribute 进行 handle 的分发
// lib/internal/cluster/round_robin_handle.js
function RoundRobinHandle(key, address, { port, fd, flags }) {
this.key = key;
this.all = new Map(); // 所有的 woker
this.free = new Map(); // 空闲可用的 woker
this.handles = [];
this.handle = null;
this.server = net.createServer(assert.fail); // assert.fail typeof Function, 这里给了个没用的 onconnection 回调用来生成 server
if (fd >= 0)
this.server.listen({ fd });
else if (port >= 0) { // fd: undefined, port: 9000
this.server.listen({ // 监听 address port, 触发 listening 事件
port,
host: address,
// Currently, net module only supports `ipv6Only` option in `flags`.
ipv6Only: Boolean(flags & constants.UV_TCP_IPV6ONLY),
});
} else
this.server.listen(address); // UNIX socket path.
// 在调用 server.listen() 后绑定服务器时触发。
this.server.once('listening', () => {
this.handle = this.server._handle;
this.handle.onconnection = (err, handle) => this.distribute(err, handle); // 改写 net.Server onconnection
this.server._handle = null;
this.server = null;
});
}
RoundRobinHandle 轮询分配策略
RoundRobinHandle 通过轮询分配 handle 给 woker 的负载策略共享 handle 的 socket 解决子进程共同监听一个端口处理请求。
最后就回到文章中最开始 server 共享 socket 过程 中触发 createServer((req, res) => {}) 回调的内容。
参考
源码分析
cluster-base
cluster 模块的主要功能实现
egg-cluster 模块的实现
cluster 模块是用来处理网络连接的多进程模块,egg-cluster 通过 cluster 模块对 egg 进行多进程管理的基础模块
在 egg-cluster 中:
master 主进程类似守护进程在后台执行
agent 是由 child_process 模块 fork 创建,当 master 退出时会优雅的退出 agent 进程(防止变为孤儿进程被系统 init 收养 parentId: 0)
woker 是由 cluster 模块 fork 创建,用来处理 http 请求
可以参考文章 egg-cluster
发表评论 (审核通过后显示评论):