在 ThinkPHP5.0 中使用 Kafka
豆包:thinkphp5.0 kafka扩展
在 ThinkPHP5.0 中使用 Kafka,通常需要先安装 PHP 的 Kafka 扩展,再在 ThinkPHP5.0 项目中进行相应的配置和使用,以下是具体步骤:
安装 PHP 的 Kafka 扩展
在 ThinkPHP5.0 中配置和使用 Kafka
php
Copy
<?phpreturn [ // 其他配置项... 'kafka' => [ 'bootstrap_servers' => 'your_kafka_server:9092', 'topic' => 'your_topic_name', // 其他Kafka相关配置参数 ],];
php
Copy
<?phpnamespace app\index\controller;use think\Controller;use RdKafka\Producer;class KafkaProducer extends Controller{ public function produceMessage() { $conf = new \RdKafka\Conf(); $conf->set('bootstrap.servers', config('kafka.bootstrap_servers')); $producer = new Producer($conf); $topic = $producer->newTopic(config('kafka.topic')); $message = '这是一条测试消息'; $topic->produce(RD_KAFKA_PARTITION_UA, 0, $message); $producer->flush(10000); return '消息发送成功'; }}
php
Copy
<?phpnamespace app\index\controller;use think\Controller;use RdKafka\Consumer;use RdKafka\Conf;class KafkaConsumer extends Controller{ public function consumeMessage() { $conf = new Conf(); $conf->set('bootstrap.servers', config('kafka.bootstrap_servers')); $conf->set('group.id', 'test_group'); $consumer = new Consumer($conf); $consumer->subscribe([config('kafka.topic')]); while (true) { $message = $consumer->consume(120 * 1000); switch ($message->err) { case RD_KAFKA_RESP_ERR_NO_ERROR: echo '收到消息: '. $message->payload. ""; break; case RD_KAFKA_RESP_ERR__PARTITION_EOF: echo "到达分区末尾"; break; default: echo '消费消息出错: '. $message->errstr(). ""; break; } } }}
除了php-rdkafka
扩展,还可以使用kafka-php
扩展,它提供了更高级的功能和更便捷的 API,可以通过 Composer 安装。