php常驻内存、master/worker进程模型【笔记】

感谢 https://github.com/walkor/Workerman

实现功能

  1. 守护进程
  2. master/worker进程模型
  3. 自定义进程用户
  4. 自定义进程名称
  5. 自定义worker进程数量
  6. 日志记录
  7. 标准输入输出重定向
  8. master进程回调:启动回调、停止回调
  9. worker进程回调:启动回调、停止回调
  10. 异常处理,错误处理

脚本不支持在windows下运行。

多进程管理类

<?php
namespace IYUU;

use Error;
use Exception;
/**
 * PHP多进程管理类
 * Class worker
 */
class worker
{
    /**
     * worker进程数
     *
     * @access public
     * @var int
     */
    public $count = 1;

    /**
     * 进程编号
     * - worker进程从1开始,0被master进程所使用
     * @var int
     */
    public $id = 0;

    /**
     * 进程PID
     * @var int
     */
    public $pid = 0;

    /**
     * master,worker进程启动时间戳
     * @var int
     */
    public $time_start = 0;

    /**
     * 进程用户
     *
     * @access public
     * @var string
     */
    public $user = '';

    /**
     * 进程名
     *
     * @access public
     * @var string
     */
    public $title = 'Worker';

    /**
     * 每个进程是否只运行一次
     *
     * @access public
     * @var bool
     */
    public $run_once = true;

    /**
     * master进程启动回调
     *
     * @access public
     * @var null
     */
    public $onMasterStart = null;

    /**
     * master进程停止回调
     *
     * @access public
     * @var null
     */
    public $onMasterStop  = null;

    /**
     * worker进程启动回调
     *
     * @access public
     * @var null
     */
    public $onWorkerStart = null;

    /**
     * worker进程停止回调
     *
     * @access public
     * @var null
     */
    public $onWorkerStop  = null;

    /**
     * master进程PID
     * @var int
     */
    protected static $_master_pid = 0;

    /**
     * 所有worker进程映射表
     * - 进程id编号与进程pid的映射表,格式为:[[worker_id=>pid], ..]
     * @var array
     */
    protected static $_worker_pidMap = [];

    /**
     * Status starting.
     * @var string
     */
    const STATUS_STARTING = 'starting';

    /**
     * Status running.
     * @var string
     */
    const STATUS_RUNNING = 'running';

    /**
     * Status shutdown.
     * @var string
     */
    const STATUS_SHUTDOWN = 'shutdown';

    /**
     * Status reloading.
     * @var string
     */
    const STATUS_RELOADING = 'reload';
    /**
     * master,worker进程运行状态 [starting|running|shutdown|reload]
     * @var string
     */
    protected static $_status = self::STATUS_STARTING;

    // Define OS Type
    const OS_TYPE_LINUX   = 'linux';
    const OS_TYPE_WINDOWS = 'windows';
    /**
     * OS.
     * @var string
     */
    protected static $_OS = self::OS_TYPE_LINUX;

    /**
     * Start file.
     * @var string
     */
    protected static $_startFile = '';

    /**
     * Standard output stream
     * @var resource
     */
    protected static $_outputStream = null;

    /**
     * If $outputStream support decorated
     * @var bool
     */
    protected static $_outputDecorated = null;
    /**
     * Daemonize.
     *
     * @access public
     * @var bool
     */
    public static $daemonize = false;

    /**
     * Stdout file.
     *
     * @access public
     * @var string
     */
    public static $stdoutFile = '/dev/null';

    /**
     * 保存master进程PID的文件.
     *
     * @access public
     * @var string
     */
    public static $pidFile = '';

    /**
     * Log file.
     *
     * @access public
     * @var mixed
     */
    public static $logFile = '';

    /**
     * PHP built-in error types.
     * @var array
     */
    protected static $_errorType = array(
        \E_ERROR             => 'E_ERROR',             // 1
        \E_WARNING           => 'E_WARNING',           // 2
        \E_PARSE             => 'E_PARSE',             // 4
        \E_NOTICE            => 'E_NOTICE',            // 8
        \E_CORE_ERROR        => 'E_CORE_ERROR',        // 16
        \E_CORE_WARNING      => 'E_CORE_WARNING',      // 32
        \E_COMPILE_ERROR     => 'E_COMPILE_ERROR',     // 64
        \E_COMPILE_WARNING   => 'E_COMPILE_WARNING',   // 128
        \E_USER_ERROR        => 'E_USER_ERROR',        // 256
        \E_USER_WARNING      => 'E_USER_WARNING',      // 512
        \E_USER_NOTICE       => 'E_USER_NOTICE',       // 1024
        \E_STRICT            => 'E_STRICT',            // 2048
        \E_RECOVERABLE_ERROR => 'E_RECOVERABLE_ERROR', // 4096
        \E_DEPRECATED        => 'E_DEPRECATED',        // 8192
        \E_USER_DEPRECATED   => 'E_USER_DEPRECATED'   // 16384
    );

    /**
     * 构造函数 worker constructor
     * @access public
     */
    public function __construct()
    {
        static::checkSapiEnv();     // 环境检查
    }

    /**
     * 检查运行环境.
     */
    protected static function checkSapiEnv()
    {
        // Only for cli.
        if (\PHP_SAPI !== 'cli') {
            exit("Only run in command line mode \n");
        }
        if (\DIRECTORY_SEPARATOR === '\\') {
            self::$_OS = self::OS_TYPE_WINDOWS;
        }
        if (static::$_OS !== self::OS_TYPE_LINUX) {
            exit('OS:'.self::$_OS);
        }
        if(version_compare(PHP_VERSION, "7.1.0", "<"))
        {
            exit('php version < 7.1.0');
        }
    }

    /**
     * 运行worker实例
     * @access public
     * @throws Exception
     */
    public function run()
    {
        $this->init();          // 初始化
        $this->parseCommand();  // 解析命令行参数
        $this->daemonize();     // 变为守护进程
        $this->installSignal(); // 安装信号
        $this->saveMasterPid(); // 保存master进程pid
        $this->forkWorkersForLinux();   // fork若干个worker进程
        $this->resetStd();              // 重定向标准输入和输出
        $this->monitorWorkers();    // 监控信号、管理worker进程
    }

    /**
     * 初始化.
     */
    protected function init()
    {
        set_error_handler(function($code, $msg, $file, $line){
            static::safeEcho("$msg in file $file on line $line\n");
        });

        // Start file.
        $backtrace        = \debug_backtrace();
        static::$_startFile = $backtrace[\count($backtrace) - 1]['file'];

        $unique_prefix = \str_replace('/', '_', static::$_startFile);

        // Pid file.
        if (empty(static::$pidFile)) {
            static::$pidFile = __DIR__ . "/$unique_prefix.pid";
        }

        // Log file.
        if (empty(static::$logFile)) {
            static::$logFile = __DIR__ . '/logFile_'.date("Ymd").'.log';
        }
        $log_file = (string)static::$logFile;
        if (!\is_file($log_file)) {
            \touch($log_file);
            \chmod($log_file, 0622);
        }

        // State.
        static::$_status = static::STATUS_STARTING;

        // Count
        $this->count = $this->count < 1 ? 1 : $this->count;

        // Process title.
    }

    /**
     * 解析命令行参数 Parse command.
     *
     * @return void
     */
    protected function parseCommand()
    {
        if (static::$_OS !== self::OS_TYPE_LINUX) {
            return;
        }
        global $argv;
    }

    /**
     * 变为守护进程 Run as deamon mode.
     *
     * @throws Exception
     */
    protected function daemonize()
    {
        if (!static::$daemonize) {
            return;
        }
        \umask(0);
        // 第一次pcntl_fork()
        $pid = \pcntl_fork();
        if (-1 === $pid) {
            // 创建子进程失败
            throw new Exception('Fork fail');
        } elseif ($pid > 0) {
            // 主进程安全退出
            exit(0);
        }
        // 子进程创建成功,设置为会话leader
        if (-1 === \posix_setsid()) {
            throw new Exception("Setsid fail");
        }
        // 第二次pcntl_fork() Fork again avoid SVR4 system regain the control of terminal.
        $pid = \pcntl_fork();
        if (-1 === $pid) {
            // 孙子进程创建失败
            throw new Exception("Fork fail");
        } elseif (0 !== $pid) {
            // 子进程安全退出
            exit(0);
        }
        // 孙子进程创建成功,变为master常驻进程
    }

    /**
     * 安装信号处理函数
     */
    protected function installSignal()
    {
        // stop
        pcntl_signal(SIGINT, array($this, 'signalHandler'), false);
        // stop
        pcntl_signal(SIGTERM, array($this, 'signalHandler'), false);
        // reload
        pcntl_signal(SIGUSR1, array($this, 'signalHandler'), false);
        // status
        pcntl_signal(SIGUSR2, array($this, 'signalHandler'), false);
        // ignore
        pcntl_signal(SIGPIPE, SIG_IGN, false);
    }

    /**
     * 卸载信号处理函数
     */
    protected function uninstallSignal()
    {
        // uninstall stop signal handler
        pcntl_signal(SIGINT, SIG_IGN, false);
        pcntl_signal(SIGTERM, SIG_IGN, false);
        // uninstall reload signal handler
        pcntl_signal(SIGUSR1, SIG_IGN, false);
        // uninstall status signal handler
        pcntl_signal(SIGUSR2, SIG_IGN, false);
    }

    /**
     * 信号处理函数,会被其他类调用(必须为public)
     *
     * @access public
     * @param int $signal
     */
    public function signalHandler(int $signal)
    {
        switch ($signal) {
            // stop 2
            case SIGINT:
            case SIGTERM:
                // master进程和worker进程都会调用
                $this->stop_all();
                break;
            // reload 30
            case SIGUSR1:
                echo "reload\n";
                break;
            // show status 31
            case SIGUSR2:
                echo "status\n";
                break;
        }
    }

    /**
     * 保存master进程pid
     * @throws Exception
     */
    protected function saveMasterPid()
    {
        static::$_master_pid = \posix_getpid();
        if (false === \file_put_contents(static::$pidFile, static::$_master_pid)) {
            throw new Exception('can not save pid to ' . static::$pidFile);
        }
    }

    /**
     * 获取master进程的pid
     *
     * @access public
     * @return int
     */
    public static function getMasterPid(): int
    {
        return static::$_master_pid ? static::$_master_pid : (int)file_get_contents(static::$pidFile);
    }

    /**
     * Fork若干个worker进程.
     *
     * @throws Exception
     */
    protected function forkWorkersForLinux()
    {
        // master进程
        $this->time_start = microtime(true);
        $this->id = 0;   // 0被master进程使用
        $this->pid = posix_getpid();
        $this->setProcessTitle($this->title." master process pid:{$this->pid} start_file=" . static::$_startFile);
        // master进程启动回调
        if ($this->onMasterStart) {
            try {
                call_user_func($this->onMasterStart, $this);
            } catch (Exception $e) {
                static::log($e);
                // 避免快速无限循环退出.
                sleep(2);
                exit(250);
            } catch (Error $e) {
                static::log($e);
                // 避免快速无限循环退出.
                sleep(2);
                exit(250);
            }
        }

        // worker进程从1开始,0被master进程所使用
        for ($i = 1; $i <= $this->count; $i++) {
            $this->forkOneWorkerForLinux($i);
        }
    }

    /**
     * Fork若干个worker进程.
     *
     * @param int $worker_id
     * @throws Exception
     */
    protected function forkOneWorkerForLinux(int $worker_id)
    {
        $pid = pcntl_fork();
        // For master process.
        if ($pid > 0) {
            // 主进程记录子进程pid
            self::$_worker_pidMap[$worker_id] = $pid;
        }
        // For child processes.
        elseif (0 === $pid) {
            /**
             * worker子进程运行
             */
            srand();
            mt_srand();
            if (static::$_status === self::STATUS_STARTING) {
                static::resetStd();
            }
            // worker进程
            $this->time_start = microtime(true);
            $this->id = $worker_id;
            $this->pid = posix_getpid();
            $this->setProcessTitle($this->title." worker process id:{$worker_id}, pid:{$this->pid}");
            $this->setProcessUser($this->user);
            // 清空master进程克隆过来的worker进程ID
            self::$_worker_pidMap = array();
            // 设置worker进程的运行状态为运行中
            self::$_status = self::STATUS_RUNNING;
            // 注册进程退出回调,用来检查是否有错误(子进程里面注册)
            register_shutdown_function(array($this, 'check_errors'));
            // 卸载信号处理函数
            // $this->uninstall_signal();
            // restore_error_handler();
            // 如果设置了worker进程启动回调函数
            if ($this->onWorkerStart) {
                try {
                    call_user_func($this->onWorkerStart, $this);
                } catch (Exception $e) {
                    static::log($e);
                    sleep(1);
                    exit(250);
                } catch (Error $e) {
                    static::log($e);
                    sleep(1);
                    exit(250);
                }
            }

            // 停止当前worker实例
            $this->stop();
            // 这里用0表示正常退出
            exit(0);
        } else {
            throw new Exception("forkOneWorker fail");
        }
    }

    /**
     * 设置当前进程的名称,在ps aux命令中有用
     * 注意 需要php>=5.5或者安装了protitle扩展
     * @param string $title
     */
    protected function setProcessTitle(string $title = '')
    {
        if (!empty($title)) {
            \set_error_handler(function(){});
            // >=php 5.5
            if (\function_exists('cli_set_process_title')) {
                \cli_set_process_title($title);
            } // Need proctitle when php<=5.5 .
            elseif (\extension_loaded('proctitle') && \function_exists('setproctitle')) {
                \setproctitle($title);
            }
            \restore_error_handler();
        }
    }

    /**
     * 尝试设置运行当前进程的用户
     * @param $user_name
     */
    protected function setProcessUser($user_name)
    {
        // 用户名为空 或者 当前用户不是root用户
        if (empty($user_name) || posix_getuid() !== 0) {
            return;
        }
        $user_info = posix_getpwnam($user_name);
        if (!$user_info) {
            static::log("Warning: User {$this->user} not exsits");
            return;
        }
        $uid = $user_info['uid'];
        $gid = $user_info['gid'];
        // 设置uid、gid
        if ($uid != posix_getuid() || $gid != posix_getgid()) {
            if (!\posix_setgid($gid) || !\posix_initgroups($user_info['name'], $gid) || !\posix_setuid($uid)) {
                static::log("Warning: change gid or uid fail.");
            }
        }
    }

    /**
     * 重定向标准输入和输出
     * Redirect standard input and output.
     *
     * @throws Exception
     */
    protected static function resetStd()
    {
        if (!static::$daemonize) {
            return;
        }
        global $STDOUT, $STDERR;
        $handle = \fopen(static::$stdoutFile, "a");
        if ($handle) {
            unset($handle);
            \set_error_handler(function(){});
            if ($STDOUT) {
                \fclose($STDOUT);
            }
            if ($STDERR) {
                \fclose($STDERR);
            }
            \fclose(\STDOUT);
            \fclose(\STDERR);
            $STDOUT = \fopen(static::$stdoutFile, "a");
            $STDERR = \fopen(static::$stdoutFile, "a");
            // change output stream
            static::$_outputStream = null;
            static::outputStream($STDOUT);
            \restore_error_handler();
            return;
        }

        throw new Exception('Can not open stdoutFile ' . static::$stdoutFile);
    }

    /**
     * @param null $stream
     * @return bool|resource
     */
    private static function outputStream($stream = null)
    {
        if (!$stream) {
            $stream = static::$_outputStream ? static::$_outputStream : \STDOUT;
        }
        /**
         * is_resource()        https://www.php.net/manual/zh/function.is-resource.php
         * get_resource_type()  https://www.php.net/manual/zh/function.get-resource-type.php
         */
        if (!$stream || !\is_resource($stream) || 'stream' !== \get_resource_type($stream)) {
            return false;
        }
        /**
         * fstat()              https://www.php.net/manual/zh/function.fstat.php
         * 通过已打开的文件指针取得文件信息
         */
        $stat = \fstat($stream);
        if (!$stat) {
            return false;
        }
        if (($stat['mode'] & 0170000) === 0100000) {
            // file
            static::$_outputDecorated = false;
        } else {
            static::$_outputDecorated =
                static::$_OS === self::OS_TYPE_LINUX &&
                \function_exists('posix_isatty') &&
                \posix_isatty($stream);
        }
        return static::$_outputStream = $stream;
    }

    /**
     * 停止当前worker实例
     * 正常运行结束和接受信号退出,都会调用这个方法
     */
    public function stop()
    {
        // worker进程结束回调函数
        if ($this->onWorkerStop) {
            try {
                call_user_func($this->onWorkerStop, $this);
            } catch (Exception $e) {
                static::log($e);
                exit(250);
            } catch (Error $e) {
                static::log($e);
                exit(250);
            }
        }
        // 设置worker进程的运行状态为关闭
        self::$_status = self::STATUS_SHUTDOWN;
    }

    /**
     * 执行关闭流程(所有进程)
     * 事件触发,非正常程序执行完毕
     * @return void
     */
    public function stop_all()
    {
        // master进程
        if (self::$_master_pid === posix_getpid()) {
            // 设置master进程的运行状态为关闭
            self::$_status = self::STATUS_SHUTDOWN;
            // 循环给worker进程发送关闭信号
            foreach (self::$_worker_pidMap as $worker_id => $pid) {
                if ($worker_id && ($pid !== 0)) {
                    // SIGINT | SIGKILL
                    posix_kill($pid, SIGINT);
                }
            }
        }
        // worker进程
        else {
            // 接收到master进程发送的关闭信号之后退出,这里应该考虑业务的完整性,不能强行exit
            $this->stop();
            exit(0);
        }
    }

    /**
     * 监控所有子进程的退出事件及退出码
     * @access public
     * @return void
     * @throws Exception
     */
    public function monitorWorkers()
    {
        // 设置master进程的运行状态为运行中
        self::$_status = self::STATUS_RUNNING;
        while (1) {
            // 调用等待信号的处理器
            pcntl_signal_dispatch();
            /**
             * https://www.php.net/manual/zh/function.pcntl-wait.php
             * pcntl_wait()返回退出的子进程进程号,发生错误时返回 -1,如果提供了 WNOHANG 作为 option(wait3可用的系统)并且没有可用子进程时返回 0。
             * pcntl_wait()将会存储状态信息到 status 参数上,这个通过 status 参数返回的状态信息可以用以下函数 pcntl_wifexited(), pcntl_wifstopped(),
             * pcntl_wifsignaled(), pcntl_wexitstatus(), pcntl_wtermsig() 以及 pcntl_wstopsig() 获取其具体的值。
             */
            // 挂起进程,直到有子进程退出或者被信号打断
            $status = 0;
            $pid = pcntl_wait($status, WUNTRACED);
            // 再次调用等待信号的处理器
            pcntl_signal_dispatch();
            // 子进程退出信号
            if ($pid > 0) {
                // 如果不是正常退出,是被kill等杀掉的
                if ($status !== 0) {
                    static::log("worker process [pid:{$pid}] exit with status $status");
                }
                // key 和 value 互换
                $worker_pids = array_flip(self::$_worker_pidMap);
                // 通过 pid 得到 worker_id
                $worker_id = $worker_pids[$pid];
                // 这里不unset掉,是为了进程重启
                self::$_worker_pidMap[$worker_id] = 0;
                // 再生成一个worker
                if ((self::$_status !== self::STATUS_SHUTDOWN) && !$this->run_once) {
                    $this->forkOneWorkerForLinux($worker_id);
                }
            }

            // 如果master进程状态为关闭 且 所有子进程都退出了,触发主进程退出函数
            if ((self::$_status === self::STATUS_SHUTDOWN) && !static::getAllWorkerPids()) {
                $this->exitAndClearAll();
            }
            // 如果进程为单次运行 且 所有子进程都推出了,触发主进程退出函数
            if ($this->run_once && !static::getAllWorkerPids()) {
                $this->exitAndClearAll();
            }
        }
    }

    /**
     * 获取所有worker进程的pid.
     * - 只要有一个worker进程还存在进程pid,就不算全部退出
     * @return array
     */
    protected static function getAllWorkerPids(): array
    {
        return array_filter(self::$_worker_pidMap, function ($pid, $worker_id){
            // 选出worker进程的pid(排除进程编号为0的master进程)
            return $worker_id && ($pid !== 0);
        }, ARRAY_FILTER_USE_BOTH);
    }

    /**
     * 退出当前进程.
     *
     * @return void
     */
    protected function exitAndClearAll()
    {
        @\unlink(static::$pidFile);
        static::log("Worker[" . \basename(static::$_startFile) . "] has been stopped");
        // master进程退出回调函数
        if ($this->onMasterStop) {
            try {
                call_user_func($this->onMasterStop, $this);
            } catch (Exception $e) {
                static::log($e);
                exit(250);
            } catch (Error $e) {
                static::log($e);
                exit(250);
            }
        }
        exit(0);
    }

    /**
     * 检查错误,PHP exit之前会执行
     */
    public function check_errors()
    {
        // 如果当前worker进程不是正常退出
        if (self::$_status != self::STATUS_SHUTDOWN) {
            $error_msg = static::$_OS === self::OS_TYPE_LINUX ? 'Worker['. \posix_getpid() .'] process terminated' : 'Worker process terminated';
            $errors = error_get_last();
            if ($errors && ($errors['type'] === E_ERROR ||
                    $errors['type'] === E_PARSE ||
                    $errors['type'] === E_CORE_ERROR ||
                    $errors['type'] === E_COMPILE_ERROR ||
                    $errors['type'] === E_RECOVERABLE_ERROR)) {
                $error_msg .= static::getErrorType($errors['type']) . " {$errors['message']} in {$errors['file']} on line {$errors['line']}";
            }
            // 记录错误日志
            static::log($error_msg);
        }
    }

    /**
     * 获取错误类型对应的意义
     * @param int $type
     * @return string
     */
    protected static function getErrorType(int $type): string
    {
        if(isset(self::$_errorType[$type])) {
            return self::$_errorType[$type];
        }

        return '';
    }

    /**
     * Safe Echo.
     * @param string $msg
     * @param bool $decorated
     * @return bool
     */
    public static function safeEcho($msg, bool $decorated = false): bool
    {
        $stream = static::outputStream();
        if (!$stream) {
            return false;
        }
        if (!$decorated) {
            $line = $white = $green = $end = '';
            if (static::$_outputDecorated) {
                $line = "\033[1A\n\033[K";
                $white = "\033[47;30m";
                $green = "\033[32;40m";
                $end = "\033[0m";
            }
            $msg = \str_replace(array('<n>', '<w>', '<g>'), array($line, $white, $green), $msg);
            $msg = \str_replace(array('</n>', '</w>', '</g>'), $end, $msg);
        } elseif (!static::$_outputDecorated) {
            return false;
        }
        \fwrite($stream, $msg);
        \fflush($stream);
        return true;
    }

    /**
     * Log.
     *
     * @param mixed $msg
     * @return void
     */
    public static function log($msg)
    {
        $msg = $msg . "\n";
        if (!static::$daemonize) {
            static::safeEcho($msg);
        }
        \file_put_contents((string)static::$logFile, \date('Y-m-d H:i:s') . ' ' . 'pid:'
            . (static::$_OS === self::OS_TYPE_LINUX ? \posix_getpid() : 1) . ' ' . $msg, \FILE_APPEND | \LOCK_EX);
    }
}

测试代码

<?php
require_once __DIR__ . '/worker.php';
use IYUU\worker;
worker::$stdoutFile = __DIR__ . '/stdoutFile.log';
worker::$logFile    = __DIR__ . '/logFile.log';
worker::$pidFile    = __DIR__ . '/pidFile.pid';
worker::$daemonize  = false;

$worker = new worker();
$worker->count = 2;
$worker->title = 'IYUUPTT';
$worker->run_once = false;    // 每个进程是否只运行一次
$worker->onMasterStart = function ($worker) {
    echo 'onMasterStart回调函数,master进程pid: ' . $worker->pid . PHP_EOL;
};
$worker->onMasterStop = function ($worker) {
    sleep(2);
    echo 'onMasterStop回调函数,master进程pid: ' . $worker->pid . PHP_EOL;
};
$worker->onWorkerStart = function ($worker) {
    echo 'Master process id: '.$worker::getMasterPid();
    echo sprintf(' Worker id: %d, pid: %d', $worker->id, $worker->pid).PHP_EOL.PHP_EOL;
    sleep(10);
    if ($worker->id === 1) {
        sleep(30);
        posix_kill($worker::getMasterPid(), SIGTERM);
    }
};
$worker->onWorkerStop = function ($worker) {
    sleep(2);
    echo 'onWorkerStop回调函数,worker进程pid: ' . $worker->pid . PHP_EOL;
};
try {
    $worker->run();
} catch (\Exception $e) {
    echo $e->getMessage();
}
最后修改:2021 年 07 月 09 日 01 : 49 AM
如果觉得我的文章对你有用,请随意赞赏

发表评论