投递者：Test2Producer.php
<?php

declare(strict_types=1);

namespace App\Queue\Producer;

use Hyperf\Amqp\Annotation\Producer;
use Hyperf\Amqp\Message\ProducerMessage;

/**
 * Producer message published to the "test2" exchange with routing key "test2".
 *
 * The payload carries a single 'id' entry, and the message is tagged with an
 * AMQP 'priority' property so a priority-enabled queue can order it.
 */
#[Producer(exchange: "test2", routingKey: "test2")]
class Test2Producer extends ProducerMessage
{
    /**
     * @param mixed $id       Value stored in the payload under the 'id' key.
     * @param mixed $priority Message priority; a larger value is delivered first.
     *                        It must not exceed the queue's x-max-priority value.
     */
    public function __construct($id, $priority = 0)
    {
        $this->payload = [
            'id' => $id,
        ];

        // Set only the 'priority' key rather than replacing the whole array, so
        // that properties inherited from the parent class are not discarded.
        // NOTE(review): Hyperf's ProducerMessage is understood to default to
        // content_type 'text/plain' and a persistent delivery_mode; overwriting
        // the array would drop them — confirm against the installed version.
        $this->properties['priority'] = $priority;
    }
}
消费者：Test2Consumer.php
<?php

declare(strict_types=1);

namespace App\Queue\Consumer;

use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Builder\QueueBuilder;
use Hyperf\Amqp\Message\ConsumerMessage;
use Hyperf\Amqp\Result;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

/**
 * Consumer for the "test2" queue (exchange "test2", routing key "test2"),
 * declared as a priority queue and run with a single consumer.
 */
#[Consumer(exchange: "test2", routingKey: "test2", queue: "test2", nums: 1)]
class Test2Consumer extends ConsumerMessage
{
    /**
     * Builds the queue declaration, including the queue arguments.
     *
     * @return QueueBuilder Builder for this consumer's queue with x-max-priority set.
     */
    public function getQueueBuilder(): QueueBuilder
    {
        $builder = new QueueBuilder();
        $builder->setQueue($this->getQueue());
        $builder->setArguments(new AMQPTable([
            // Keep this small; a large priority range has a performance cost.
            'x-max-priority' => 5,
        ]));

        return $builder;
    }

    /**
     * Returns the basic.qos settings for this consumer.
     *
     * The prefetch count is limited to one unacknowledged message. Without a
     * limit the broker pushes messages to the consumer as soon as they arrive,
     * so they never wait in the queue and cannot be reordered by priority.
     *
     * @return array{prefetch_size: int, prefetch_count: int, global: bool}
     */
    public function getQos(): array
    {
        return [
            // 0 means no limit on the prefetch window size in bytes.
            'prefetch_size' => 0,
            // Deliver one message at a time so queued messages can be prioritised.
            'prefetch_count' => 1,
            'global' => false,
        ];
    }

    /**
     * Handles one delivered message by dumping its decoded payload.
     *
     * @param mixed       $data    Decoded message payload.
     * @param AMQPMessage $message Raw AMQP message.
     *
     * @return Result Always Result::ACK.
     */
    public function consumeMessage($data, AMQPMessage $message): Result
    {
        var_dump($data);

        return Result::ACK;
    }

    /**
     * Reports whether this consumer is enabled.
     *
     * @return bool Always true.
     */
    public function isEnable(): bool
    {
        return true;
    }
}
测试投递
// Publish 100 messages, each from its own coroutine.
// Even ids are sent with priority 1, odd ids with priority 2.
for ($i = 0; $i < 100; ++$i) {
    co(function () use ($i) {
        $isEven = $i % 2 === 0;
        $priority = $isEven ? 1 : 2;

        // Resolve the producer service from the container and publish the message.
        $container = ApplicationContext::getContainer();
        $container->get(Producer::class)->produce(new Test2Producer($i, $priority));
    });
}