在 ThinkPHP5.0 中使用 Kafka

豆包:thinkphp5.0 kafka扩展


在 ThinkPHP5.0 中使用 Kafka,通常需要先安装 PHP 的 Kafka 扩展,再在 ThinkPHP5.0 项目中进行相应的配置和使用,以下是具体步骤:

安装 PHP 的 Kafka 扩展


  • 安装 librdkafka:librdkafka 是一个高性能的 C 库,是许多 Kafka 客户端的基础。在 Linux 系统上,可以通过包管理器安装,如在 Ubuntu 系统中,使用命令sudo apt-get install librdkafka-dev进行安装;在 macOS 上,可以使用 Homebrew 安装,如brew install librdkafka

  • 安装 php-rdkafka 扩展:在安装好 librdkafka 库后,可以通过 PECL 安装 php-rdkafka 扩展,运行命令pecl install rdkafka。安装完成后,需要在 php.ini 文件中添加extension=rdkafka.so来启用该扩展CSDN博客

在 ThinkPHP5.0 中配置和使用 Kafka


  • 引入扩展类:在 ThinkPHP5.0 项目中,在需要使用 Kafka 的地方引入扩展类。假设你已经安装并启用了 php-rdkafka 扩展,你可以在控制器或其他类中使用use RdKafka\Producer;use RdKafka\Consumer;等语句引入生产者和消费者类CSDN博客

  • 配置 Kafka 连接参数:在项目的配置文件中,你可以设置 Kafka 的连接参数,如bootstrap.servers等。例如,在config.php文件中添加以下配置:


php

Copy

<?phpreturn [
    // 其他配置项...
    'kafka' => [
        'bootstrap_servers' => 'your_kafka_server:9092',
        'topic' => 'your_topic_name',
        // 其他Kafka相关配置参数
    ],];


  • 编写生产者代码


php

Copy

<?phpnamespace app\index\controller;use think\Controller;use RdKafka\Producer;class KafkaProducer extends Controller{
    public function produceMessage()
    {
        $conf = new \RdKafka\Conf();
        $conf->set('bootstrap.servers', config('kafka.bootstrap_servers'));

        $producer = new Producer($conf);

        $topic = $producer->newTopic(config('kafka.topic'));

        $message = '这是一条测试消息';
        $topic->produce(RD_KAFKA_PARTITION_UA, 0, $message);

        $producer->flush(10000);

        return '消息发送成功';
    }}


  • 编写消费者代码


php

Copy

<?phpnamespace app\index\controller;use think\Controller;use RdKafka\Consumer;use RdKafka\Conf;class KafkaConsumer extends Controller{
    public function consumeMessage()
    {
        $conf = new Conf();
        $conf->set('bootstrap.servers', config('kafka.bootstrap_servers'));
        $conf->set('group.id', 'test_group');

        $consumer = new Consumer($conf);
        $consumer->subscribe([config('kafka.topic')]);

        while (true) {
            $message = $consumer->consume(120 * 1000);
            switch ($message->err) {
                case RD_KAFKA_RESP_ERR_NO_ERROR:
                    echo '收到消息: '. $message->payload. "";
                    break;
                case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                    echo "到达分区末尾";
                    break;
                default:
                    echo '消费消息出错: '. $message->errstr(). "";
                    break;
            }
        }
    }}


除了php-rdkafka扩展,还可以使用kafka-php扩展,它提供了更高级的功能和更便捷的 API,可以通过 Composer 安装。