Nodejs cluster 模块

cluster 和 child_process 模块子进程的区别

child_process 执行 shell 命令、利用多进程执行代码
cluster 通过多进程 master、worker 实现多个 HTTP 应用服务器架构

总结写前面

cluster 模块是 node 利用多进程处理网络连接的应用架构
cluster 通过进程 IPC 通道共享主进程的 server handle 句柄创建 socket 文件描述符 实现子进程共同监听同一端口
cluster 在 http 网络请求中采用 RoundRobin 轮询的负载均衡方式对 woker 进行调度

框架图

http createServer 时 child 通过 IPC 通道获取 master 的 server.handle 流程

多进程通信,子进程监听同一端口为什么不冲突

不同进程之间的 server 通过 IPC 通道共享监听某个端口的 socket 连接句柄来解决冲突。

// master.js
const { createServer } = require('net')
const { fork } = require('child_process')
const cpus = require('os').cpus()

const netServer = createServer().listen(3000) // create TCP server
for (let i = 0; i < cpus.length; i++) {
  const worker = fork('worker.js')
  worker.send('server', netServer)
  console.log('worker process created, pid: %s ppid: %s', worker.pid, process.pid);
}

// worker.js
const http = require('http')

const server = http.createServer((req, res) => { //   this.on('connection', connectionListener);
  res.end('I am worker, pid: ' + process.pid + ', ppid: ' + process.ppid);
})

let _handle
process.on('message', (msg, handle) => {
  if (msg !== 'server') return
  _handle = handle
  _handle.on('connection', (socket) => { // _http_server.js 中实现, this.on('connection', connectionListener)
    server.emit('connection', socket); // 与子进程 server 共享 socket 处理连接后执行子进程 http.createServer 的 callback
  })
})

server 共享 socket 过程

看下 createServer 的处理过程就可以知道 server.emit('connection', socket); 是如何共享 socket 并何时触发 createServer 回调对用户进行响应的

// Server 构造函数
function Server {
 ... 
  if (requestListener) {
    this.on('request', requestListener);
  }

  this.on('connection', connectionListener);
}
function connectionListener(socket) {
  defaultTriggerAsyncIdScope(
    getOrSetAsyncId(socket), connectionListenerInternal, this, socket
  );
}
function connectionListenerInternal(server, socket) {
// ...
  parser.onIncoming = parserOnIncoming.bind(undefined, server, socket, state)
}
function resOnFinish(req, res, socket, state, server) {
  // ...
  res.detachSocket(socket); // 关闭 socket
  // ...
}

function parserOnIncoming(server, socket, state, req, keepAlive) {
  // ...
  res.on('finish', resOnFinish.bind(undefined, req, res, socket, state, server));

  if (req.headers.expect !== undefined &&
      (req.httpVersionMajor === 1 && req.httpVersionMinor === 1)) {
  // ...
  } else {
    server.emit('request', req, res);
  }
}

cluster 源码

// cluster.js
const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;

if (cluster.isMaster) {
  console.log(`Master 进程 ${process.pid} 正在运行`);

  for (let i = 0; i < 1; i++) { // 衍生工作进程。
    cluster.fork();
  }

  cluster.on('exit', (worker, code, signal) => { console.log(`Worker ${worker.process.pid} 已退出`) });
} else {
  http.createServer((req, res) => res.end(`你好世界 ${process.pid}`)).listen(8000);
  console.log(`Worker 进程 ${process.pid} 已启动`);
}

主进程创建子进程 cluster fork

createWorkerProcess 通过 child_process fork 创建子进程源码
在主进程中 cluster.fork 通过 child_process fork 创建子进程

function createWorkerProcess(id, env) {
// ...
  return fork(cluster.settings.exec, cluster.settings.args, {
    cwd: cluster.settings.cwd,
    env: workerEnv,
    serialization: cluster.settings.serialization,
    silent: cluster.settings.silent,
    windowsHide: cluster.settings.windowsHide,
    execArgv: execArgv,
    stdio: cluster.settings.stdio,
    gid: cluster.settings.gid,
    uid: cluster.settings.uid
  });
}

在子进程中的 http.createServer

当在子进程通过 createServer 并 listen 端口时,net 模块会根据 isMaster 来当前进程是主进程直接监听端口,当前进程是子进程则通过 IPC 通道获取 master 进程的服务器(server)句柄(handle),并监听(listen)它。

http createServer 时 child 通过 IPC 通道获取 master 的 server.handle 流程

listenInCluster

http.createServer().listen(port) 会执行 net 模块的 Server.prototype.listen 方法 调用 listenInCluster,在此方法中根据 isMaster 判断,子进程时通过 cluster 模块 cluster._getServer 方法与 master 建立 IPC 通道获取 master 中 创建 server 的 handle 并在子进程代码中进行 listen。
listenInCluster 源码

function listenInCluster(server, address, port, addressType, backlog, fd, exclusive, flags) {
  if (cluster === undefined) cluster = require('cluster');

  if (cluster.isMaster || exclusive) {
    server._listen2(address, port, addressType, backlog, fd, flags);
    return;
  }

  const serverQuery = {
    address: address,
    port: port,
    addressType: addressType,
    fd: fd,
    flags,
  };

  // 通过 IPC 通道获取 master server 的 handle 进行监听
  cluster._getServer(server, serverQuery, listenOnMasterHandle);

  function listenOnMasterHandle(err, handle) {
    err = checkBindError(err, port, handle);

    if (err) {
      const ex = exceptionWithHostPort(err, 'bind', address, port);
      return server.emit('error', ex);
    }

    server._handle = handle; // 重用 master handle

    server._listen2(address, port, addressType, backlog, fd, flags);
  }
}

子进程 cluster._getServer

子进程中 _getServer 向 master 通过 IPC 发送 node 内部消息为 act: 'queryServer ' 的通信获取 master handle
子进程 cluster._getServer

// lib/internal/cluster/master.js
// obj 在 http 请求 TCP 连接中是 net 的 Server 实例,UDP 连接是 dgram 的 Socket 实例
cluster._getServer = function(obj, options, cb) {
  ...
  const message = {
    act: 'queryServer',
    index,
    data: null,
    ...options
  };

  message.address = address;

  if (obj._getServerData)
    message.data = obj._getServerData();
  // 向 master 发送 querServer 消息
  send(message, (reply, handle) => {
    if (typeof obj._setServerData === 'function')
      obj._setServerData(reply.data);

    if (handle)
      shared(reply, handle, indexesKey, cb);  // Shared listen socket. UDP 连接处理方式
    else
      rr(reply, indexesKey, cb);              // Round-robin. TCP 连接 rr 模式
  });

  obj.once('listening', () => {
    ...
  }
}

主进程 master 中 queryServer

// master 监听内部消息
function onmessage(message, handle) {
  const worker = this;

  if (message.act === 'online')
    online(worker);
  else if (message.act === 'queryServer')
    queryServer(worker, message); // queryServer
  else if (message.act === 'listening')
    listening(worker, message);
  else if (message.act === 'exitedAfterDisconnect')
    exitedAfterDisconnect(worker, message);
  else if (message.act === 'close')
    close(worker, message);
}

queryServer 当不存在 RoundRobinHandle 实例时会创建一个, 通过 RoundRobinHandle 原型 add 方法添加 woker 到 实例 this.all 属性中,用来进行 master 对 worker 的负载均衡策略。

function queryServer(worker, message) {
  ...
  let handle = handles.get(key);
  // 创建 TCP RoundRobinHandle rr 实例, master 逻辑
  if (handle === undefined) {
    ...
    let constructor = RoundRobinHandle;
    handle = new constructor(key, address, message);
    handles.set(key, handle);
  }
  ...
  handle.add(worker, (errno, reply, handle) => {
    const { data } = handles.get(key);

    if (errno)
      handles.delete(key);  // Gives other workers a chance to retry.
     // handle.add 后去执行子进程 queryServe 的 cb,告知采用 UDP 或 TCP
    send(worker, {
      errno,
      key,
      ack: message.seq,
      data,
      ...reply
    }, handle);
  });
}

RoundRobinHandle 实例创建

创建 server,重写 server._handle 句柄的 onconnection 改用轮询的方式分发句柄给子进程处理

在 master 进程中会接收、传递请求给 worker 处理,RoundRobinHandle 的作用就是用来对 woker 进行分发、任务交接、调度的负载均衡策略,同时进程间共享的 TCP server handle 是在 RoundRobinHandle 实例创建时生成的。

  • RoundRobinHandle 实例创建,重写 server._handle.onconnection 处理请求
    通过传入无操作的回调用 net.createServer 创建 server 并监听端口,通过对 server.once('listening') 的监听重写 this.server._handle 的 onconnection,当 server 的 handle 遇到 connection 事件时将会使用 RoundRobinHandle 实例的 distribute 进行 handle 的分发
// lib/internal/cluster/round_robin_handle.js
function RoundRobinHandle(key, address, { port, fd, flags }) {
  this.key = key;
  this.all = new Map(); // 所有的 woker 
  this.free = new Map(); // 空闲可用的 woker
  this.handles = [];
  this.handle = null;
  this.server = net.createServer(assert.fail); // assert.fail typeof Function, 这里给了个没用的 onconnection 回调用来生成 server

  if (fd >= 0)
    this.server.listen({ fd });
  else if (port >= 0) { // fd: undefined, port: 9000
    this.server.listen({ // 监听 address port, 触发 listening 事件
      port,
      host: address,
      // Currently, net module only supports `ipv6Only` option in `flags`.
      ipv6Only: Boolean(flags & constants.UV_TCP_IPV6ONLY),
    });
  } else
    this.server.listen(address);  // UNIX socket path.
  // 在调用 server.listen() 后绑定服务器时触发。
  this.server.once('listening', () => {
    this.handle = this.server._handle;
    this.handle.onconnection = (err, handle) => this.distribute(err, handle); // 改写 net.Server onconnection
    this.server._handle = null;
    this.server = null;
  });
}

RoundRobinHandle 轮询分配策略

RoundRobinHandle 通过轮询分配 handle 给 woker 的负载策略共享 handle 的 socket 解决子进程共同监听一个端口处理请求。



最后就回到文章中最开始 server 共享 socket 过程 中触发 createServer((req, res) => {}) 回调的内容。

参考

源码分析
cluster-base
cluster 模块的主要功能实现

egg-cluster 模块的实现

cluster 模块是用来处理网络连接的多进程模块,egg-cluster 通过 cluster 模块对 egg 进行多进程管理的基础模块
在 egg-cluster 中:
master 主进程类似守护进程在后台执行
agent 是由 child_process 模块 fork 创建,当 master 退出时会优雅的退出 agent 进程(防止变为孤儿进程被系统 init 收养 parentId: 0)
woker 是由 cluster 模块 fork 创建,用来处理 http 请求
可以参考文章 egg-cluster

本文章由javascript技术分享原创和收集

发表评论 (审核通过后显示评论):