(Background: I recently upgraded RabbitMqIO from the 4.x to the 5.x library. As part of this I switched to a pull-based API rather than the push-based one used previously. This has caused some nebulous problems, so I have put up a correction PR that I think needs some eyes fairly quickly, as I'd consider master to be broken for rabbitmq right now. The PR keeps the upgrade but reverts to the same push-based implementation as in 4.x: https://github.com/apache/beam/pull/9977 )

Regardless, in trying to get the pull-based API to work, I'm finding the interactions between rabbitmq and beam with CheckpointMark to be fundamentally impossible to implement so I'm hoping for some input here.

CheckpointMark itself must be Serializable, which presumably means it gets shuffled around between nodes. However, 'Channel', the tunnel through which it communicates with Rabbit to ack messages and finalize the checkpoint, is not Serializable. As in most other CheckpointMark implementations, the Channel field is therefore 'transient'. When a new CheckpointMark is instantiated, it's given a Channel. If an existing CheckpointMark is supplied to the Reader's constructor (part of the 'startReader()' interface), its channel is overwritten.
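
To make the serialization concern concrete, here is a minimal sketch in plain Java (no Beam or RabbitMQ dependency). FakeChannel and SketchCheckpointMark are hypothetical stand-ins I made up for illustration, not the actual RabbitMqIO classes; the only point is that the delivery tags survive a serialization round trip while the transient channel reference does not.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class TransientChannelSketch {

  /** Hypothetical stand-in for a RabbitMQ Channel; deliberately not Serializable. */
  static class FakeChannel {
    final int id;

    FakeChannel(int id) {
      this.id = id;
    }
  }

  /** Hypothetical checkpoint mark: the delivery tags are serialized, the channel is not. */
  static class SketchCheckpointMark implements Serializable {
    private static final long serialVersionUID = 1L;

    final List<Long> deliveryTags = new ArrayList<>();
    transient FakeChannel channel; // skipped by Java serialization
  }

  /** Serializes the mark to bytes and reads it back, as a runner moving it between nodes might. */
  static SketchCheckpointMark roundTrip(SketchCheckpointMark mark)
      throws IOException, ClassNotFoundException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(mark);
    }
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      return (SketchCheckpointMark) in.readObject();
    }
  }

  public static void main(String[] args) throws Exception {
    SketchCheckpointMark mark = new SketchCheckpointMark();
    mark.channel = new FakeChannel(1);
    mark.deliveryTags.add(1L);
    mark.deliveryTags.add(2L);

    SketchCheckpointMark restored = roundTrip(mark);
    System.out.println("tags survive: " + restored.deliveryTags);
    System.out.println("channel survives: " + (restored.channel != null));
  }
}
```

So after a round trip the mark still knows which delivery tags it wants to ack, but it no longer has the channel they were delivered on, and something has to hand it a channel again.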

*However*, Rabbit does not support 'ack'ing messages on a channel other than the one that consumed them in the first place. Attempting to do so results in a '406 (PRECONDITION-FAILED) - unknown delivery tag'. (See https://www.grzegorowski.com/rabbitmq-406-channel-closed-precondition-failed ).
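
The failure mode can be modelled with a toy in-memory channel. This is only a sketch of the scoping rule: FakeChannel, deliver(), ack() and tryAck() are hypothetical names, not the RabbitMQ client API, and the exception merely imitates the broker's 406 error. It assumes, as in RabbitMQ, that each channel numbers its own deliveries starting from 1.

```java
import java.util.HashSet;
import java.util.Set;

public class ChannelScopedAckSketch {

  /** Toy model of a channel; NOT the RabbitMQ client API. */
  static class FakeChannel {
    private long nextTag = 1; // tags are counted per channel
    private final Set<Long> unacked = new HashSet<>();

    /** Simulates the broker delivering one message on this channel. */
    long deliver() {
      long tag = nextTag++;
      unacked.add(tag);
      return tag;
    }

    /** Acks a delivery; fails if this channel never delivered that tag. */
    void ack(long tag) {
      if (!unacked.remove(tag)) {
        throw new IllegalStateException("PRECONDITION_FAILED - unknown delivery tag " + tag);
      }
    }
  }

  /** Returns true if the ack was accepted, false if the channel rejected the tag. */
  static boolean tryAck(FakeChannel channel, long tag) {
    try {
      channel.ack(tag);
      return true;
    } catch (IllegalStateException e) {
      return false;
    }
  }

  public static void main(String[] args) {
    FakeChannel consuming = new FakeChannel();
    FakeChannel replacement = new FakeChannel();

    long tag = consuming.deliver();

    // The replacement channel has delivered nothing, so it does not know this tag.
    System.out.println("ack on replacement channel: " + tryAck(replacement, tag));
    // The channel that delivered the message can ack it.
    System.out.println("ack on consuming channel: " + tryAck(consuming, tag));
  }
}
```

Note that in this model, if the replacement channel had already delivered a message of its own, tag 1 would be accepted there but would acknowledge a different message, which is arguably worse than the error.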

Truthfully, I don't really understand how the current implementation is working; it seems like a happy accident. But I'm curious if someone could help me debug and implement how to bridge the re-usable/serializable CheckpointMark requirement in Beam with this limitation of Rabbit.

Thanks,
-Daniel Robert
