Hyperf 3.1 如何定义 AMQP 的优先队列

PHP  

投递者Test2Producer.php

  1. <?php
  2. declare(strict_types=1);
  3. namespace App\Queue\Producer;
  4. use Hyperf\Amqp\Annotation\Producer;
  5. use Hyperf\Amqp\Message\ProducerMessage;
  /**
   * Producer message for the "test2" exchange that carries a per-message priority.
   */
  #[Producer(exchange: "test2", routingKey: "test2")]
  class Test2Producer extends ProducerMessage
  {
      /**
       * @param mixed $id       Value stored in the payload under the 'id' key.
       * @param int   $priority Message priority; a larger value is delivered first.
       *                        It must not exceed the queue's x-max-priority.
       */
      public function __construct($id, $priority = 0)
      {
          $this->payload = [
              'id' => $id,
          ];

          // Set only the 'priority' key instead of replacing the whole array, so
          // that any defaults declared by the parent ProducerMessage survive.
          // NOTE(review): in Hyperf these defaults are expected to include
          // 'content_type' and a persistent 'delivery_mode' - confirm against
          // the installed hyperf/amqp version.
          $this->properties['priority'] = $priority;
      }
  }

消费者Test2Consumer.php

  1. <?php
  2. declare(strict_types=1);
  3. namespace App\Queue\Consumer;
  4. use Hyperf\Amqp\Annotation\Consumer;
  5. use Hyperf\Amqp\Message\ConsumerMessage;
  6. use Hyperf\Amqp\Result;
  7. use PhpAmqpLib\Message\AMQPMessage;
  8. use Hyperf\Amqp\Builder\QueueBuilder;
  9. use PhpAmqpLib\Wire\AMQPTable;
  /**
   * Consumer for the "test2" queue, which is declared as a priority queue.
   */
  #[Consumer(exchange: "test2", routingKey: "test2", queue: "test2", nums: 1)]
  class Test2Consumer extends ConsumerMessage
  {
      /**
       * Builds the queue declaration, adding the arguments that make it a
       * priority queue.
       */
      public function getQueueBuilder(): QueueBuilder
      {
          $builder = new QueueBuilder();
          $builder->setQueue($this->getQueue());
          $builder->setArguments(new AMQPTable([
              // Keep this small; large values have a performance cost.
              'x-max-priority' => 5,
          ]));

          return $builder;
      }

      /**
       * Returns the basic.qos settings applied to the consuming channel.
       *
       * Explicit values are used instead of nulls. A prefetch_count of 1 keeps
       * unacknowledged deliveries to one at a time, so waiting messages stay in
       * the queue where the broker can order them by priority.
       * NOTE(review): the previous all-null values were presumably sent as
       * 0 / 0 / false (no prefetch limit) - confirm against php-amqplib.
       *
       * @return array{prefetch_size: int, prefetch_count: int, global: bool}
       */
      public function getQos(): array
      {
          return [
              'prefetch_size' => 0,
              'prefetch_count' => 1,
              'global' => false,
          ];
      }

      /**
       * Handles one delivered message: dumps the decoded payload and acknowledges it.
       *
       * @param mixed $data Decoded message payload.
       */
      public function consumeMessage($data, AMQPMessage $message): Result
      {
          var_dump($data);

          return Result::ACK;
      }

      /**
       * Whether this consumer is enabled; always true here.
       */
      public function isEnable(): bool
      {
          return true;
      }
  }

测试投递

  // Resolve the producer once: the lookup does not depend on the loop variable.
  // NOTE(review): Producer here presumably has to be the Hyperf\Amqp\Producer
  // service, not the Hyperf\Amqp\Annotation\Producer attribute class - confirm
  // the `use` statement in the calling file.
  $producer = ApplicationContext::getContainer()->get(Producer::class);

  // Publish 100 messages, each from its own coroutine.
  for ($i = 0; $i < 100; ++$i) {
      co(static function () use ($i, $producer): void {
          // Even ids get priority 1, odd ids get priority 2.
          $priority = $i % 2 === 0 ? 1 : 2;

          $producer->produce(new Test2Producer($i, $priority));
      });
  }


评论 0

发表评论

Top