hyperf3.1如何定义amqp的优先队列
- 开发技术
- 2025-03-06
- 103
- 0
投递者Test2Producer.php
<?php
declare(strict_types=1);
namespace App\Queue\Producer;
use Hyperf\Amqp\Annotation\Producer;
use Hyperf\Amqp\Message\ProducerMessage;
#[Producer(exchange: "test2", routingKey: "test2")]
class Test2Producer extends ProducerMessage
{
public function __construct($id, $priority=0)
{
$this->payload = [
'id' => $id
];
$this->properties = [
'priority' => $priority //数值越大越优先,数值不能超过x-max-priority的值
];
}
}
消费者Test2Consumer.php
<?php
declare(strict_types=1);
namespace App\Queue\Consumer;
use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Message\ConsumerMessage;
use Hyperf\Amqp\Result;
use PhpAmqpLib\Message\AMQPMessage;
use Hyperf\Amqp\Builder\QueueBuilder;
use PhpAmqpLib\Wire\AMQPTable;
#[Consumer(exchange: "test2", routingKey: "test2", queue: "test2", nums: 1)]
class Test2Consumer extends ConsumerMessage
{
// 设置队列的Arguments
public function getQueueBuilder(): QueueBuilder
{
$builder = new QueueBuilder();
$builder->setQueue($this->getQueue());
$builder->setArguments(new AMQPTable([
'x-max-priority' => 5, //不要设置过大,有性能问题
]));
return $builder;
}
// 设置basic.qos
public function getQos(): array
{
return [
'prefetch_size' => null,
'prefetch_count' => null,
'global' => null
];
}
public function consumeMessage($data, AMQPMessage $message): Result
{
var_dump($data);
return Result::ACK;
}
public function isEnable(): bool
{
return true;
}
}
测试投递
for ($i = 0; $i < 100; $i++) {
co(function () use ($i) {
$priority = $i % 2 == 0 ? 1 : 2;
$message = new Test2Producer($i, $priority);
$producer = ApplicationContext::getContainer()->get(Producer::class);
$producer->produce($message);
});
}