新的 RabbitMQ Pika 驱动程序实现

https://blueprints.launchpad.net/oslo.messaging/+spec/rabbit-pika

本规范提出一个新的 RabbitMQ Pika 驱动程序实现。

问题描述

当前的 RabbitMQ oslo.messaging 驱动程序使用 Kombu 客户端库。但新的特性支持和错误修复在 Kombu 中出现的速度比 Pika 慢。现在我们在 HA 模式下使用 RabbitMQ 稳定工作时遇到了问题。当我们向 RabbitQM 开发人员寻求帮助修复错误时,他们说这是可能的,但首先,请将您的环境更新到推荐的库堆栈。这是开发此驱动程序的主要原因。

此外,Pika 支持现代 RabbitMQ 特性,例如直接回复和心跳。我认为使用此功能的首选方式是代替我们自己开发它。

提议的变更

在本规范中,我建议创建一个全新的驱动程序,该驱动程序

  1. 应该以最佳方式开发,考虑到所有 Pika 客户端库的新特性和最佳实践;

  2. 应该与当前驱动程序接口完全兼容;

  3. 可以更改内部结构,并且不保证与 Kombu 驱动程序兼容(这意味着您不能对某些服务使用旧的 Kombu 驱动程序,而对其他服务使用新的 Pika 驱动程序)

  4. 仅支持当前实际的功能,不包含任何已弃用的功能。

特性及其设计

在调查 oslo.messaging 驱动程序期间,我分离出几个主要支持的特性

  1. RPC - 将消息以不可靠的方式快速发送到单个远程服务器,该服务器使用目标定义,并获取回复。它具有很小的超时时间(几秒钟),因此服务器应该在超时时间内接收并处理此消息(由超时时间定义),否则将被跳过。

  2. CAST - 将消息以不可靠的方式发送到由目标定义的远程服务器集合。服务器应该在超时时间内接收此消息(由超时时间定义),否则将被跳过。如果服务以某种方式未侦听主题,或者发生了一些连接问题,并且我们无法快速恢复,则该服务器将永远无法接收到该消息。

  3. NOTIFY - CAST 的可靠版本 - 我们不能丢失消息。如果您发送通知并且 send_notification 方法没有返回任何错误,则消息应该存储并等待远程服务器启动并获得与我们的 RabbitMQ 代理的连接。

Eventlet 兼容性

Pika 有一些连接适配器,用于与不同的框架一起工作。它没有专门为 eventlet 设计的适配器。但是可以使用“BlockingConnection”适配器和 eventlet monkey patching。它工作得很好。我发现的唯一问题是,它试图使用“select.epull”API,而当前的 eventlet 实现并未对其进行修补。因此,我添加了代码,如果 eventlet 已修补,则从“select”模块中删除“pull”和“epull”属性。在这种情况下,Pika 使用标准的 select api,该 api 已由 eventlet 正确修补。

心跳

Pika 有自己的心跳机制。“BlockingConnection”适配器具有“process_data_events”方法,该方法循环侦听来自 RabbitMQ 的响应。应该在发送请求或注册消费者后执行此方法。此方法运行循环

(代码片段来自 https://github.com/pika/pika/blob/master/pika/adapters/blocking_connection.py#L410)

while not is_done():
    self._impl.ioloop.poll()
    self._impl.ioloop.process_timeouts()

根据配置的心跳超时时间发送心跳(在 process_timeout_method 内部)。因此,对于具有定义间隔的所有侦听器,心跳都在工作。对于用于消息发布连接,只有在连接处于活动状态时(即,您正在执行 publish 方法时)才会发送心跳。

备选方案

  1. 使用旧驱动程序;

  2. 不要开发全新的驱动程序,尝试仅替换客户端库并保留当前驱动程序的逻辑

Impact on Existing APIs

无。

安全影响

性能影响

由于更优化的实现,性能应该会更好。

Configuration Impact

应该添加配置选项来设置 Pika 客户端库。

建议以下配置可能性

连接选项(代表 Pika 功能):

  • “channel_max”(默认 - 0)- 允许的最大通道数,

  • “frame_max”(默认 - 131072)- AMQP 帧的最大字节大小,

  • “heartbeat_interval”(默认 - 0)- 发送心跳的频率,

  • “ssl”(默认 - False)- 是否启用 SSL,

  • “ssl_options”(默认 - None)- 如果启用 ssl 时的 SSL 选项,

  • “socket_timeout”(默认 - 0.25)- Pika 连接的套接字超时时间,

连接池选项:

  • “pool_max_size”(默认 - 10)- 要保留的连接的最大数量,

  • “pool_max_overflow”(默认 - 10)- 在之上创建的连接的最大数量

  • “pool_timeout”(默认 - 30)- 等待可用连接的秒数,

  • “pool_recycle”(默认 - None)- 连接的生命周期(自创建以来)以秒为单位,或者没有回收则为 None。过期的连接在获取时关闭,

  • “pool_stale”(默认 - None)- 自释放以来被认为过期的连接的秒数,或者没有陈旧性则为 None。过期的连接在获取时关闭

重新连接策略选项:

  • “connection_retry_attempts”(默认 - 3)- 连接问题时重新连接的重试次数,

  • “connection_retry_delay”(默认 - 0.1)- 连接问题时重新连接的重试延迟,

  • “rejected_message_retry_attempts”(默认 - 3)- 重新发送被拒绝的消息的重试次数,

  • “rejected_message_retry_delay”(默认 - 0.1)- 重新发送被拒绝的消息的重试延迟

RPC 选项:

  • “rpc_queue_expiration”(默认 - 60)- 没有消费者的 rpc 队列的生存时间(以秒为单位),

  • “default_rpc_exchange”(默认 - “openstack_rpc”)- 发送 RPC 消息的交换机名称,

  • “rpc_reply_exchange”(默认 - “openstack_rpc_reply”)- 接收 RPC 回复的交换机名称。

通知选项:

  • “notification_persistence”(默认 - False)- 持久化通知消息,

  • “default_notification_exchange”(默认 - “openstack_notification”)- 发送通知的交换机名称。

开发人员影响

Devstack 应该进行调整,以便能够设置使用新驱动程序的 gate 测试环境。

Testing Impact

应该调整功能测试(无需更改测试逻辑)。

实现

负责人

dukhlov, yosh-m

主要负责人

dukhlov

里程碑

完成目标里程碑:mitaka

工作项

  • 设计和实现基于 Pika 库功能的 rpc 功能(发送和侦听驱动程序方法)

  • 设计和实现基于 Pika 库功能的 notify 功能(send_notification 和 listen_notifications 驱动程序方法)

  • 调整功能测试

  • 调整 devstack 以能够设置具有新的 Pika 驱动程序的环境

孵化

无。

采用

部署指南可能会略有不同,因为添加了一些新的配置选项(例如,为每个管道分配额外的端口)。

值得注意的是,在稳定期,旧驱动程序和新驱动程序都将保留在仓库中。稳定后,旧驱动程序将通过标准的弃用路径被弃用。

oslo.messaging

预计 API 稳定

新的驱动程序应该能够使用调整后的 devstack 成功运行。调整后的 oslo.messaging 功能测试应该能够在 devstack-gate 中成功通过。

文档影响

应该编写详细的文档字符串

依赖项

pika 库

参考资料

  1. https://github.com/dukhlov/oslo.messaging/blob/master/oslo_messaging/_drivers/impl_pika.py

  2. https://review.openstack.org/#/c/226348/

注意

本作品采用知识共享署名 3.0 非移植许可协议授权。 http://creativecommons.org/licenses/by/3.0/legalcode