实现功能
- 守护进程
- master/worker进程模型
- 自定义进程用户
- 自定义进程名称
- 自定义worker进程数量
- 日志记录
- 标准输入输出重定向
- master进程回调:启动回调、停止回调
- worker进程回调:启动回调、停止回调
- 异常处理,错误处理
脚本不支持在windows下运行。
多进程管理类
<?php
namespace IYUU;
use Error;
use Exception;
/**
* PHP多进程管理类
* Class worker
*/
class worker
{
/**
* worker进程数
*
* @access public
* @var int
*/
public $count = 1;
/**
* 进程编号
* - worker进程从1开始,0被master进程所使用
* @var int
*/
public $id = 0;
/**
* 进程PID
* @var int
*/
public $pid = 0;
/**
* master,worker进程启动时间戳
* @var int
*/
public $time_start = 0;
/**
* 进程用户
*
* @access public
* @var string
*/
public $user = '';
/**
* 进程名
*
* @access public
* @var string
*/
public $title = 'Worker';
/**
* 每个进程是否只运行一次
*
* @access public
* @var bool
*/
public $run_once = true;
/**
* master进程启动回调
*
* @access public
* @var null
*/
public $onMasterStart = null;
/**
* master进程停止回调
*
* @access public
* @var null
*/
public $onMasterStop = null;
/**
* worker进程启动回调
*
* @access public
* @var null
*/
public $onWorkerStart = null;
/**
* worker进程停止回调
*
* @access public
* @var null
*/
public $onWorkerStop = null;
/**
* master进程PID
* @var int
*/
protected static $_master_pid = 0;
/**
* 所有worker进程映射表
* - 进程id编号与进程pid的映射表,格式为:[[worker_id=>pid], ..]
* @var array
*/
protected static $_worker_pidMap = [];
/**
* Status starting.
* @var string
*/
const STATUS_STARTING = 'starting';
/**
* Status running.
* @var string
*/
const STATUS_RUNNING = 'running';
/**
* Status shutdown.
* @var string
*/
const STATUS_SHUTDOWN = 'shutdown';
/**
* Status reloading.
* @var string
*/
const STATUS_RELOADING = 'reload';
/**
* master,worker进程运行状态 [starting|running|shutdown|reload]
* @var string
*/
protected static $_status = self::STATUS_STARTING;
// Define OS Type
const OS_TYPE_LINUX = 'linux';
const OS_TYPE_WINDOWS = 'windows';
/**
* OS.
* @var string
*/
protected static $_OS = self::OS_TYPE_LINUX;
/**
* Start file.
* @var string
*/
protected static $_startFile = '';
/**
* Standard output stream
* @var resource
*/
protected static $_outputStream = null;
/**
* If $outputStream support decorated
* @var bool
*/
protected static $_outputDecorated = null;
/**
* Daemonize.
*
* @access public
* @var bool
*/
public static $daemonize = false;
/**
* Stdout file.
*
* @access public
* @var string
*/
public static $stdoutFile = '/dev/null';
/**
* 保存master进程PID的文件.
*
* @access public
* @var string
*/
public static $pidFile = '';
/**
* Log file.
*
* @access public
* @var mixed
*/
public static $logFile = '';
/**
* PHP built-in error types.
* @var array
*/
protected static $_errorType = array(
\E_ERROR => 'E_ERROR', // 1
\E_WARNING => 'E_WARNING', // 2
\E_PARSE => 'E_PARSE', // 4
\E_NOTICE => 'E_NOTICE', // 8
\E_CORE_ERROR => 'E_CORE_ERROR', // 16
\E_CORE_WARNING => 'E_CORE_WARNING', // 32
\E_COMPILE_ERROR => 'E_COMPILE_ERROR', // 64
\E_COMPILE_WARNING => 'E_COMPILE_WARNING', // 128
\E_USER_ERROR => 'E_USER_ERROR', // 256
\E_USER_WARNING => 'E_USER_WARNING', // 512
\E_USER_NOTICE => 'E_USER_NOTICE', // 1024
\E_STRICT => 'E_STRICT', // 2048
\E_RECOVERABLE_ERROR => 'E_RECOVERABLE_ERROR', // 4096
\E_DEPRECATED => 'E_DEPRECATED', // 8192
\E_USER_DEPRECATED => 'E_USER_DEPRECATED' // 16384
);
/**
* 构造函数 worker constructor
* @access public
*/
public function __construct()
{
static::checkSapiEnv(); // 环境检查
}
/**
* 检查运行环境.
*/
protected static function checkSapiEnv()
{
// Only for cli.
if (\PHP_SAPI !== 'cli') {
exit("Only run in command line mode \n");
}
if (\DIRECTORY_SEPARATOR === '\\') {
self::$_OS = self::OS_TYPE_WINDOWS;
}
if (static::$_OS !== self::OS_TYPE_LINUX) {
exit('OS:'.self::$_OS);
}
if(version_compare(PHP_VERSION, "7.1.0", "<"))
{
exit('php version < 7.1.0');
}
}
/**
* 运行worker实例
* @access public
* @throws Exception
*/
public function run()
{
$this->init(); // 初始化
$this->parseCommand(); // 解析命令行参数
$this->daemonize(); // 变为守护进程
$this->installSignal(); // 安装信号
$this->saveMasterPid(); // 保存master进程pid
$this->forkWorkersForLinux(); // fork若干个worker进程
$this->resetStd(); // 重定向标准输入和输出
$this->monitorWorkers(); // 监控信号、管理worker进程
}
/**
* 初始化.
*/
protected function init()
{
set_error_handler(function($code, $msg, $file, $line){
static::safeEcho("$msg in file $file on line $line\n");
});
// Start file.
$backtrace = \debug_backtrace();
static::$_startFile = $backtrace[\count($backtrace) - 1]['file'];
$unique_prefix = \str_replace('/', '_', static::$_startFile);
// Pid file.
if (empty(static::$pidFile)) {
static::$pidFile = __DIR__ . "/$unique_prefix.pid";
}
// Log file.
if (empty(static::$logFile)) {
static::$logFile = __DIR__ . '/logFile_'.date("Ymd").'.log';
}
$log_file = (string)static::$logFile;
if (!\is_file($log_file)) {
\touch($log_file);
\chmod($log_file, 0622);
}
// State.
static::$_status = static::STATUS_STARTING;
// Count
$this->count = $this->count < 1 ? 1 : $this->count;
// Process title.
}
/**
* 解析命令行参数 Parse command.
*
* @return void
*/
protected function parseCommand()
{
if (static::$_OS !== self::OS_TYPE_LINUX) {
return;
}
global $argv;
}
/**
* 变为守护进程 Run as deamon mode.
*
* @throws Exception
*/
protected function daemonize()
{
if (!static::$daemonize) {
return;
}
\umask(0);
// 第一次pcntl_fork()
$pid = \pcntl_fork();
if (-1 === $pid) {
// 创建子进程失败
throw new Exception('Fork fail');
} elseif ($pid > 0) {
// 主进程安全退出
exit(0);
}
// 子进程创建成功,设置为会话leader
if (-1 === \posix_setsid()) {
throw new Exception("Setsid fail");
}
// 第二次pcntl_fork() Fork again avoid SVR4 system regain the control of terminal.
$pid = \pcntl_fork();
if (-1 === $pid) {
// 孙子进程创建失败
throw new Exception("Fork fail");
} elseif (0 !== $pid) {
// 子进程安全退出
exit(0);
}
// 孙子进程创建成功,变为master常驻进程
}
/**
* 安装信号处理函数
*/
protected function installSignal()
{
// stop
pcntl_signal(SIGINT, array($this, 'signalHandler'), false);
// stop
pcntl_signal(SIGTERM, array($this, 'signalHandler'), false);
// reload
pcntl_signal(SIGUSR1, array($this, 'signalHandler'), false);
// status
pcntl_signal(SIGUSR2, array($this, 'signalHandler'), false);
// ignore
pcntl_signal(SIGPIPE, SIG_IGN, false);
}
/**
* 卸载信号处理函数
*/
protected function uninstallSignal()
{
// uninstall stop signal handler
pcntl_signal(SIGINT, SIG_IGN, false);
pcntl_signal(SIGTERM, SIG_IGN, false);
// uninstall reload signal handler
pcntl_signal(SIGUSR1, SIG_IGN, false);
// uninstall status signal handler
pcntl_signal(SIGUSR2, SIG_IGN, false);
}
/**
* 信号处理函数,会被其他类调用(必须为public)
*
* @access public
* @param int $signal
*/
public function signalHandler(int $signal)
{
switch ($signal) {
// stop 2
case SIGINT:
case SIGTERM:
// master进程和worker进程都会调用
$this->stop_all();
break;
// reload 30
case SIGUSR1:
echo "reload\n";
break;
// show status 31
case SIGUSR2:
echo "status\n";
break;
}
}
/**
* 保存master进程pid
* @throws Exception
*/
protected function saveMasterPid()
{
static::$_master_pid = \posix_getpid();
if (false === \file_put_contents(static::$pidFile, static::$_master_pid)) {
throw new Exception('can not save pid to ' . static::$pidFile);
}
}
/**
* 获取master进程的pid
*
* @access public
* @return int
*/
public static function getMasterPid(): int
{
return static::$_master_pid ? static::$_master_pid : (int)file_get_contents(static::$pidFile);
}
/**
* Fork若干个worker进程.
*
* @throws Exception
*/
protected function forkWorkersForLinux()
{
// master进程
$this->time_start = microtime(true);
$this->id = 0; // 0被master进程使用
$this->pid = posix_getpid();
$this->setProcessTitle($this->title." master process pid:{$this->pid} start_file=" . static::$_startFile);
// master进程启动回调
if ($this->onMasterStart) {
try {
call_user_func($this->onMasterStart, $this);
} catch (Exception $e) {
static::log($e);
// 避免快速无限循环退出.
sleep(2);
exit(250);
} catch (Error $e) {
static::log($e);
// 避免快速无限循环退出.
sleep(2);
exit(250);
}
}
// worker进程从1开始,0被master进程所使用
for ($i = 1; $i <= $this->count; $i++) {
$this->forkOneWorkerForLinux($i);
}
}
/**
* Fork若干个worker进程.
*
* @param int $worker_id
* @throws Exception
*/
protected function forkOneWorkerForLinux(int $worker_id)
{
$pid = pcntl_fork();
// For master process.
if ($pid > 0) {
// 主进程记录子进程pid
self::$_worker_pidMap[$worker_id] = $pid;
}
// For child processes.
elseif (0 === $pid) {
/**
* worker子进程运行
*/
srand();
mt_srand();
if (static::$_status === self::STATUS_STARTING) {
static::resetStd();
}
// worker进程
$this->time_start = microtime(true);
$this->id = $worker_id;
$this->pid = posix_getpid();
$this->setProcessTitle($this->title." worker process id:{$worker_id}, pid:{$this->pid}");
$this->setProcessUser($this->user);
// 清空master进程克隆过来的worker进程ID
self::$_worker_pidMap = array();
// 设置worker进程的运行状态为运行中
self::$_status = self::STATUS_RUNNING;
// 注册进程退出回调,用来检查是否有错误(子进程里面注册)
register_shutdown_function(array($this, 'check_errors'));
// 卸载信号处理函数
// $this->uninstall_signal();
// restore_error_handler();
// 如果设置了worker进程启动回调函数
if ($this->onWorkerStart) {
try {
call_user_func($this->onWorkerStart, $this);
} catch (Exception $e) {
static::log($e);
sleep(1);
exit(250);
} catch (Error $e) {
static::log($e);
sleep(1);
exit(250);
}
}
// 停止当前worker实例
$this->stop();
// 这里用0表示正常退出
exit(0);
} else {
throw new Exception("forkOneWorker fail");
}
}
/**
* 设置当前进程的名称,在ps aux命令中有用
* 注意 需要php>=5.5或者安装了protitle扩展
* @param string $title
*/
protected function setProcessTitle(string $title = '')
{
if (!empty($title)) {
\set_error_handler(function(){});
// >=php 5.5
if (\function_exists('cli_set_process_title')) {
\cli_set_process_title($title);
} // Need proctitle when php<=5.5 .
elseif (\extension_loaded('proctitle') && \function_exists('setproctitle')) {
\setproctitle($title);
}
\restore_error_handler();
}
}
/**
* 尝试设置运行当前进程的用户
* @param $user_name
*/
protected function setProcessUser($user_name)
{
// 用户名为空 或者 当前用户不是root用户
if (empty($user_name) || posix_getuid() !== 0) {
return;
}
$user_info = posix_getpwnam($user_name);
if (!$user_info) {
static::log("Warning: User {$this->user} not exsits");
return;
}
$uid = $user_info['uid'];
$gid = $user_info['gid'];
// 设置uid、gid
if ($uid != posix_getuid() || $gid != posix_getgid()) {
if (!\posix_setgid($gid) || !\posix_initgroups($user_info['name'], $gid) || !\posix_setuid($uid)) {
static::log("Warning: change gid or uid fail.");
}
}
}
/**
* 重定向标准输入和输出
* Redirect standard input and output.
*
* @throws Exception
*/
protected static function resetStd()
{
if (!static::$daemonize) {
return;
}
global $STDOUT, $STDERR;
$handle = \fopen(static::$stdoutFile, "a");
if ($handle) {
unset($handle);
\set_error_handler(function(){});
if ($STDOUT) {
\fclose($STDOUT);
}
if ($STDERR) {
\fclose($STDERR);
}
\fclose(\STDOUT);
\fclose(\STDERR);
$STDOUT = \fopen(static::$stdoutFile, "a");
$STDERR = \fopen(static::$stdoutFile, "a");
// change output stream
static::$_outputStream = null;
static::outputStream($STDOUT);
\restore_error_handler();
return;
}
throw new Exception('Can not open stdoutFile ' . static::$stdoutFile);
}
/**
* @param null $stream
* @return bool|resource
*/
private static function outputStream($stream = null)
{
if (!$stream) {
$stream = static::$_outputStream ? static::$_outputStream : \STDOUT;
}
/**
* is_resource() https://www.php.net/manual/zh/function.is-resource.php
* get_resource_type() https://www.php.net/manual/zh/function.get-resource-type.php
*/
if (!$stream || !\is_resource($stream) || 'stream' !== \get_resource_type($stream)) {
return false;
}
/**
* fstat() https://www.php.net/manual/zh/function.fstat.php
* 通过已打开的文件指针取得文件信息
*/
$stat = \fstat($stream);
if (!$stat) {
return false;
}
if (($stat['mode'] & 0170000) === 0100000) {
// file
static::$_outputDecorated = false;
} else {
static::$_outputDecorated =
static::$_OS === self::OS_TYPE_LINUX &&
\function_exists('posix_isatty') &&
\posix_isatty($stream);
}
return static::$_outputStream = $stream;
}
/**
* 停止当前worker实例
* 正常运行结束和接受信号退出,都会调用这个方法
*/
public function stop()
{
// worker进程结束回调函数
if ($this->onWorkerStop) {
try {
call_user_func($this->onWorkerStop, $this);
} catch (Exception $e) {
static::log($e);
exit(250);
} catch (Error $e) {
static::log($e);
exit(250);
}
}
// 设置worker进程的运行状态为关闭
self::$_status = self::STATUS_SHUTDOWN;
}
/**
* 执行关闭流程(所有进程)
* 事件触发,非正常程序执行完毕
* @return void
*/
public function stop_all()
{
// master进程
if (self::$_master_pid === posix_getpid()) {
// 设置master进程的运行状态为关闭
self::$_status = self::STATUS_SHUTDOWN;
// 循环给worker进程发送关闭信号
foreach (self::$_worker_pidMap as $worker_id => $pid) {
if ($worker_id && ($pid !== 0)) {
// SIGINT | SIGKILL
posix_kill($pid, SIGINT);
}
}
}
// worker进程
else {
// 接收到master进程发送的关闭信号之后退出,这里应该考虑业务的完整性,不能强行exit
$this->stop();
exit(0);
}
}
/**
* 监控所有子进程的退出事件及退出码
* @access public
* @return void
* @throws Exception
*/
public function monitorWorkers()
{
// 设置master进程的运行状态为运行中
self::$_status = self::STATUS_RUNNING;
while (1) {
// 调用等待信号的处理器
pcntl_signal_dispatch();
/**
* https://www.php.net/manual/zh/function.pcntl-wait.php
* pcntl_wait()返回退出的子进程进程号,发生错误时返回 -1,如果提供了 WNOHANG 作为 option(wait3可用的系统)并且没有可用子进程时返回 0。
* pcntl_wait()将会存储状态信息到 status 参数上,这个通过 status 参数返回的状态信息可以用以下函数 pcntl_wifexited(), pcntl_wifstopped(),
* pcntl_wifsignaled(), pcntl_wexitstatus(), pcntl_wtermsig() 以及 pcntl_wstopsig() 获取其具体的值。
*/
// 挂起进程,直到有子进程退出或者被信号打断
$status = 0;
$pid = pcntl_wait($status, WUNTRACED);
// 再次调用等待信号的处理器
pcntl_signal_dispatch();
// 子进程退出信号
if ($pid > 0) {
// 如果不是正常退出,是被kill等杀掉的
if ($status !== 0) {
static::log("worker process [pid:{$pid}] exit with status $status");
}
// key 和 value 互换
$worker_pids = array_flip(self::$_worker_pidMap);
// 通过 pid 得到 worker_id
$worker_id = $worker_pids[$pid];
// 这里不unset掉,是为了进程重启
self::$_worker_pidMap[$worker_id] = 0;
// 再生成一个worker
if ((self::$_status !== self::STATUS_SHUTDOWN) && !$this->run_once) {
$this->forkOneWorkerForLinux($worker_id);
}
}
// 如果master进程状态为关闭 且 所有子进程都退出了,触发主进程退出函数
if ((self::$_status === self::STATUS_SHUTDOWN) && !static::getAllWorkerPids()) {
$this->exitAndClearAll();
}
// 如果进程为单次运行 且 所有子进程都推出了,触发主进程退出函数
if ($this->run_once && !static::getAllWorkerPids()) {
$this->exitAndClearAll();
}
}
}
/**
* 获取所有worker进程的pid.
* - 只要有一个worker进程还存在进程pid,就不算全部退出
* @return array
*/
protected static function getAllWorkerPids(): array
{
return array_filter(self::$_worker_pidMap, function ($pid, $worker_id){
// 选出worker进程的pid(排除进程编号为0的master进程)
return $worker_id && ($pid !== 0);
}, ARRAY_FILTER_USE_BOTH);
}
/**
* 退出当前进程.
*
* @return void
*/
protected function exitAndClearAll()
{
@\unlink(static::$pidFile);
static::log("Worker[" . \basename(static::$_startFile) . "] has been stopped");
// master进程退出回调函数
if ($this->onMasterStop) {
try {
call_user_func($this->onMasterStop, $this);
} catch (Exception $e) {
static::log($e);
exit(250);
} catch (Error $e) {
static::log($e);
exit(250);
}
}
exit(0);
}
/**
* 检查错误,PHP exit之前会执行
*/
public function check_errors()
{
// 如果当前worker进程不是正常退出
if (self::$_status != self::STATUS_SHUTDOWN) {
$error_msg = static::$_OS === self::OS_TYPE_LINUX ? 'Worker['. \posix_getpid() .'] process terminated' : 'Worker process terminated';
$errors = error_get_last();
if ($errors && ($errors['type'] === E_ERROR ||
$errors['type'] === E_PARSE ||
$errors['type'] === E_CORE_ERROR ||
$errors['type'] === E_COMPILE_ERROR ||
$errors['type'] === E_RECOVERABLE_ERROR)) {
$error_msg .= static::getErrorType($errors['type']) . " {$errors['message']} in {$errors['file']} on line {$errors['line']}";
}
// 记录错误日志
static::log($error_msg);
}
}
/**
* 获取错误类型对应的意义
* @param int $type
* @return string
*/
protected static function getErrorType(int $type): string
{
if(isset(self::$_errorType[$type])) {
return self::$_errorType[$type];
}
return '';
}
/**
* Safe Echo.
* @param string $msg
* @param bool $decorated
* @return bool
*/
public static function safeEcho($msg, bool $decorated = false): bool
{
$stream = static::outputStream();
if (!$stream) {
return false;
}
if (!$decorated) {
$line = $white = $green = $end = '';
if (static::$_outputDecorated) {
$line = "\033[1A\n\033[K";
$white = "\033[47;30m";
$green = "\033[32;40m";
$end = "\033[0m";
}
$msg = \str_replace(array('<n>', '<w>', '<g>'), array($line, $white, $green), $msg);
$msg = \str_replace(array('</n>', '</w>', '</g>'), $end, $msg);
} elseif (!static::$_outputDecorated) {
return false;
}
\fwrite($stream, $msg);
\fflush($stream);
return true;
}
/**
* Log.
*
* @param mixed $msg
* @return void
*/
public static function log($msg)
{
$msg = $msg . "\n";
if (!static::$daemonize) {
static::safeEcho($msg);
}
\file_put_contents((string)static::$logFile, \date('Y-m-d H:i:s') . ' ' . 'pid:'
. (static::$_OS === self::OS_TYPE_LINUX ? \posix_getpid() : 1) . ' ' . $msg, \FILE_APPEND | \LOCK_EX);
}
}
测试代码
<?php
require_once __DIR__ . '/worker.php';
use IYUU\worker;
worker::$stdoutFile = __DIR__ . '/stdoutFile.log';
worker::$logFile = __DIR__ . '/logFile.log';
worker::$pidFile = __DIR__ . '/pidFile.pid';
worker::$daemonize = false;
$worker = new worker();
$worker->count = 2;
$worker->title = 'IYUUPTT';
$worker->run_once = false; // 每个进程是否只运行一次
$worker->onMasterStart = function ($worker) {
echo 'onMasterStart回调函数,master进程pid: ' . $worker->pid . PHP_EOL;
};
$worker->onMasterStop = function ($worker) {
sleep(2);
echo 'onMasterStop回调函数,master进程pid: ' . $worker->pid . PHP_EOL;
};
$worker->onWorkerStart = function ($worker) {
echo 'Master process id: '.$worker::getMasterPid();
echo sprintf(' Worker id: %d, pid: %d', $worker->id, $worker->pid).PHP_EOL.PHP_EOL;
sleep(10);
if ($worker->id === 1) {
sleep(30);
posix_kill($worker::getMasterPid(), SIGTERM);
}
};
$worker->onWorkerStop = function ($worker) {
sleep(2);
echo 'onWorkerStop回调函数,worker进程pid: ' . $worker->pid . PHP_EOL;
};
try {
$worker->run();
} catch (\Exception $e) {
echo $e->getMessage();
}
版权属于:大卫科技Blog
本文链接:https://www.iyuu.cn/archives/437/
转载时须注明出处