I'm pretty new to the Beam ecosystem, so apologies if this is not the right forum for this.

My team has been learning and starting to use Beam over the past few months, and we have run into myriad problems with the RabbitMqIO connector for java, aspects of which seem perhaps fundamentally broken or incorrect in the released implementation. I've tracked our significant issues down and opened tickets and PRs for them. I'm not certain what the typical response time is, but given the severity of the issues (as I perceive them), I'd like to escalate some of these PRs and try to get some fixes into the next Beam release.

I originally opened BEAM-8390 (https://github.com/apache/beam/pull/9782) as it was impossible to set the 'useCorrelationId' property (implying this functionality was also untested). Since then, I've found and PR'd the following, which are awaiting feedback/approval:

1. Watermarks not advancing

Ticket/PR: BEAM-8347 - https://github.com/apache/beam/pull/9820

Impact: Under low message volumes, the watermark never advances and windows can never fire 'on time'.

Notes: The RabbitMq UnboundedSource uses the 'oldest known time' as its watermark, whereas other similar sources (and the documentation on watermarking) indicate that for monotonically increasing timestamps (the case with a queue) the watermark should be the most recent time. I have a few open questions about testing and implementation details in the PR, but it should work as-is.
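
To illustrate the difference between the two strategies, here is a minimal, self-contained sketch in plain Java. It is not the RabbitMqIO code and uses no Beam classes; `WatermarkSketch`, `OldestSeen` and `NewestSeen` are hypothetical names made up for this example, and it assumes message timestamps never go backwards.

```java
import java.time.Instant;

public class WatermarkSketch {

    /** Hypothetical tracker that holds the watermark at the oldest timestamp observed. */
    public static final class OldestSeen {
        private Instant watermark; // null until the first message arrives

        public void observe(Instant timestamp) {
            if (watermark == null || timestamp.isBefore(watermark)) {
                watermark = timestamp;
            }
        }

        public Instant current() {
            return watermark;
        }
    }

    /** Hypothetical tracker that advances the watermark to the newest timestamp observed. */
    public static final class NewestSeen {
        private Instant watermark; // null until the first message arrives

        public void observe(Instant timestamp) {
            if (watermark == null || timestamp.isAfter(watermark)) {
                watermark = timestamp;
            }
        }

        public Instant current() {
            return watermark;
        }
    }

    public static void main(String[] args) {
        Instant first = Instant.parse("2019-11-01T00:00:00Z");
        Instant second = first.plusSeconds(600); // next message arrives ten minutes later

        OldestSeen oldest = new OldestSeen();
        NewestSeen newest = new NewestSeen();
        for (Instant ts : new Instant[] {first, second}) {
            oldest.observe(ts);
            newest.observe(ts);
        }

        // The oldest-seen tracker is still at the first timestamp;
        // the newest-seen tracker has moved on to the second.
        System.out.println("oldest-seen watermark: " + oldest.current());
        System.out.println("newest-seen watermark: " + newest.current());
    }
}
```

With the oldest-seen strategy the reported time stays pinned to the first message, so a window ending after that point would never be considered complete; the newest-seen strategy moves forward with each message.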

2. Exchanges are always declared, which fails if a pre-existing exchange differs

Ticket/PR: BEAM-8513 - https://github.com/apache/beam/pull/9937

Impact: It is impossible to utilize an existing, durable exchange.

Notes: I'm hooking Beam up to an existing topic exchange (an 'event bus') that is 'durable'. The current RabbitMqIO implementation will always attempt to declare the exchange, and does so as non-durable, which causes rabbit to fail the declaration. (Interestingly, qpid does not fail in this scenario.) The PR allows the caller to disable declaring the exchange, similar to `withQueueDeclare` for declaring a queue.

This PR also calls out a lot of the documentation that seems misleading, since it implies that one interacts with either queues *or* exchanges, which is not how AMQP fundamentally operates. The implementation of the RabbitMqIO connector before this PR seems like it probably works with the default exchange and maybe a fanout exchange, but not a topic exchange.

3. Library versions

Tickets/PR: BEAM-7434, BEAM-5895, and BEAM-5894 - https://github.com/apache/beam/pull/9900

Impact: The rabbitmq amqp client for java released the 5.x line in September of 2017. Some automated tickets were opened to upgrade, plus a manual ticket to drop the use of the deprecated QueueConsumer API.

Notes: The upgrade was relatively simple, but I implemented it using a pull-based API rather than a push-based one (Consumer), which may warrant some discussion. I'm used to discussing this type of thing over PRs, but I'm happy to do whatever the community prefers.

---

Numbers 1 and 2 above are 'dealbreaker' issues for my team. They effectively make rabbitmq unusable as an unbounded source, forcing developers to fork and modify the code. Number 3 is much less significant and I've put it here more for 'good, clean living' than an urgent issue.

Aside from the open issues, given the low response rate so far, I'd be more than happy to take on a more active role in maintaining/reviewing the rabbitmq io for java. For now, however, is this list the best way to 'bump' these open issues and move forward? Further, is the general approach before opening a PR to ask some preliminary questions in this email list?

Thank you,
-Daniel Robert
