在 ThinkPHP5.0 中使用 Kafka
豆包:thinkphp5.0 kafka扩展
在 ThinkPHP5.0 中使用 Kafka,通常需要先安装 PHP 的 Kafka 扩展,再在 ThinkPHP5.0 项目中进行相应的配置和使用,以下是具体步骤:
安装 PHP 的 Kafka 扩展
在 ThinkPHP5.0 中配置和使用 Kafka
php
Copy
<?phpreturn [ // 其他配置项... 'kafka' => [ 'bootstrap_servers' => 'your_kafka_server:9092', 'topic' => 'your_topic_name', // 其他Kafka相关配置参数 ],];
php
Copy
<?phpnamespace app\index\controller;use think\Controller;use RdKafka\Producer;class KafkaProducer extends Controller{
public function produceMessage()
{
$conf = new \RdKafka\Conf();
$conf->set('bootstrap.servers', config('kafka.bootstrap_servers'));
$producer = new Producer($conf);
$topic = $producer->newTopic(config('kafka.topic'));
$message = '这是一条测试消息';
$topic->produce(RD_KAFKA_PARTITION_UA, 0, $message);
$producer->flush(10000);
return '消息发送成功';
}}php
Copy
<?phpnamespace app\index\controller;use think\Controller;use RdKafka\Consumer;use RdKafka\Conf;class KafkaConsumer extends Controller{
public function consumeMessage()
{
$conf = new Conf();
$conf->set('bootstrap.servers', config('kafka.bootstrap_servers'));
$conf->set('group.id', 'test_group');
$consumer = new Consumer($conf);
$consumer->subscribe([config('kafka.topic')]);
while (true) {
$message = $consumer->consume(120 * 1000);
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
echo '收到消息: '. $message->payload. "";
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
echo "到达分区末尾";
break;
default:
echo '消费消息出错: '. $message->errstr(). "";
break;
}
}
}}除了php-rdkafka扩展,还可以使用kafka-php扩展,它提供了更高级的功能和更便捷的 API,可以通过 Composer 安装。