I just took a look at the PR - it is, indeed, huge. But it is probably not
too hard to review as it is mostly fresh code. It is true that there hasn't
been a ton of work on RabbitMQ so maybe the reviewer isn't obvious. There are
3 committers on this thread who seem to have the expertise and interest...


On Mon, Jan 6, 2020 at 2:25 PM Daniel Robert <daniel.rob...@acm.org> wrote:

> Alright, a bit late but this took me a while.
> Thanks for all the input so far. I have rewritten much of the RabbitMq IO
> connector and have it ready to go in a draft pr:
> https://github.com/apache/beam/pull/10509
> This should incorporate a lot of what's been discussed here, in terms of
> watermarking, serialization, error handling, etc. It also clarifies/cleans
> up a lot of very confusing documentation/api settings pertaining to using
> 'queues vs exchanges' and adds clarifying documentation on various valid
> AMQP paradigms.
> Watermarking/timestamp management is mostly stolen from KafkaIO and
> modified as appropriate.
> This also does a lot to improve resource management in terms of Connection
> and Channel usage, largely modeled after JdbcIO's ConnectionHandlerProvider
> concept.
> I'm not entirely sure how best to proceed from here, hence the email. It's
> a huge PR, but it has no specific backing ticket (it should), and
> historically there haven't been many eyes on RabbitMq PRs.
> Thanks,
> -Danny
> On 11/14/19 4:13 PM, Jan Lukavský wrote:
> On 11/14/19 9:50 PM, Daniel Robert wrote:
> Alright, thanks everybody. I'm really appreciative of the conversation
> here. I think I see where my disconnect is and how this might all work
> together for me. There are some bugs in the current rabbit implementation
> that I think have confused my understanding of the intended semantics. I'm
> coming around to seeing how such a system with rabbit's restrictions can
> work properly in Beam (I'd totally forgotten about 'dedupe' support in
> Beam) but I want to clarify some implementation questions after pulling
> everyone's notes together.
> RabbitMQ reader should not bother accepting an existing CheckpointMark in
> its constructor (in 'ack-based' systems this is unnecessary per Eugene's
> original reply). It should construct its own CheckpointMark at construction
> time and use it throughout its lifecycle.
> At some point later, the CheckpointMark will be 'finalized'. If this
> CheckpointMark has been Serialized (via Coder or otherwise) or its
> underlying connection has been severed, this step will fail. This would
> mean at some point the messages are redelivered to Beam on some other
> Reader, so no data loss. If it has not been serialized, the acks will take
> place just fine, even if much later.
> If the system is using processing-time as event-time, however, the
> redelivery of these messages would effectively change the ordering and
> potentially the window they arrived in. I *believe* that Beam deduping
> seems to be managed per-window so if 'finalizeCheckpoint' is attempted (and
> fails) would these messages appear in a new window?
> This is very much likely to happen with any source, if it would assign
> something like *now* to event time. That is ill defined and if the source
> cannot provide some retry-persistent estimate of real event-time, then I'd
> suggest forcing the user to specify a UDF to extract event time from the
> payload. Everything else would probably break (at least if any
> timestamp-related windowing would be used in the pipeline).
> Perhaps my questions are now:
> - how should a CheckpointMark communicate failure to the Beam
> An exception thrown should fail the checkpoint and therefore retry
> everything from the last checkpoint.
> - how does Beam handle a CheckpointMark.finalizeCheckpoint failure, if the
> API dictates such a thing?
> See above.
> - is there a provision that would need to be made for processing-time
> sources that can fail a checkpointmark.finalizeCheckpoint call? (I'm
> nervous redelivered messages would appear in another window)
> You are nervous for a reason. :) I strongly believe processing time source
> should be considered anti-pattern, at least in situations where there is
> any time manipulation downstream (time-windows, stateful processing, ...).
> - What is the relationship lifecycle-wise between a CheckpointMark and a
> Reader? My understanding is a CheckpointMark may outlive a Reader, is that
> correct?
> Definitely. But the same instance bound to the lifecycle of the reader
> would be used to finalizeCheckpoint (if that ever happens).
> Thanks for bearing with me everyone. It feels a bit unfortunate my first
> foray into beam is reliant on this rabbit connector but I'm learning a lot
> and I'm very grateful for the help. PRs pending once I get this all
> straightened out in my head.
> -Danny
> On 11/14/19 2:35 PM, Eugene Kirpichov wrote:
> Hi Daniel,
> On Wed, Nov 13, 2019 at 8:26 PM Daniel Robert <daniel.rob...@acm.org>
> wrote:
>> I believe I've nailed down a situation that happens in practice that
>> causes Beam and Rabbit to be incompatible. It seems that runners can and do
>> make assumptions about the serializability (via Coder) of a CheckpointMark.
>> To start, these are the semantics of RabbitMQ:
>> - the client establishes a connection to the server
>> - client opens a channel on the connection
>> - messages are either pulled or pushed to the client from the server
>> along this channel
>> - when messages are done processing, they are acknowledged *client-side*
>> and must be acknowledged on the *same channel* that originally received the
>> message.
>> Since a channel (or any open connection) is non-serializable, it means
>> that a CheckpointMark that has been serialized cannot ever be used to
>> acknowledge these messages and correctly 'finalize' the checkpoint. It
>> also, as previously discussed in this thread, implies a rabbit Reader
>> cannot accept an existing CheckpointMark at all; the Reader and the
>> CheckpointMark must share the same connection to the rabbit server
>> ("channel").
> This is correct.
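
To make the point above concrete, here is a rough sketch of an ack-based checkpoint mark whose channel is `transient`. It is not the RabbitMqIO code: `AckChannel` is a hypothetical stand-in for the real client channel, `AckCheckpointMark` and `roundTrip` are illustrative names, and Java serialization is used only as a stand-in for a Coder. It shows that the in-memory instance can still ack, while a copy that has been through serialization cannot.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a broker channel; not the RabbitMQ client API. */
interface AckChannel {
  void ack(long deliveryTag) throws IOException;
}

/** Sketch of an ack-based checkpoint mark; all names are illustrative. */
final class AckCheckpointMark implements Serializable {
  private final List<Long> deliveryTags = new ArrayList<>();
  // The channel cannot be serialized, so it is lost on any round trip.
  private transient AckChannel channel;

  AckCheckpointMark(AckChannel channel) {
    this.channel = channel;
  }

  void record(long deliveryTag) {
    deliveryTags.add(deliveryTag);
  }

  /** Acks on the original channel, or fails if this mark was deserialized. */
  void finalizeCheckpoint() throws IOException {
    if (channel == null) {
      throw new IOException("mark was serialized; original channel is gone");
    }
    for (long tag : deliveryTags) {
      channel.ack(tag);
    }
    deliveryTags.clear();
  }

  /** Java-serialization round trip, standing in for encoding via a Coder. */
  static AckCheckpointMark roundTrip(AckCheckpointMark mark)
      throws IOException, ClassNotFoundException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(mark);
    }
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      return (AckCheckpointMark) in.readObject();
    }
  }
}
```

Throwing from `finalizeCheckpoint()` on the deserialized copy means the messages stay unacked, so the broker would eventually redeliver them rather than lose them.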
>> Next, I've found how DirectRunner (and presumably others) can attempt to
>> serialize a CheckpointMark that has not been finalized. In
>> https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L150,
>> the DirectRunner applies a probability and if it hits, it sets the current
>> reader to 'null' but retains the existing CheckpointMark, which it then
>> attempts to pass to a new reader via a Coder.
> Correct, this simulates a failure scenario:
> - Runner was reading the source and, after finalizing a bunch of previous
> CheckpointMarks, obtained a new one and serialized it so things can be
> restored in case of failure
> - A failure happened before the current CheckpointMark could be finalized,
> which means Beam was not able to guarantee that elements after the
> last-finalized mark have been durably processed, so we may need to re-read
> them, so runner recreates a reader from the current mark.
>> This puts the shard, the runner, and the reader with differing views of
>> the world. In UnboundedReadEvaluatorFactory's processElement function, a
>> call to getReader(shard) (
>> https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L132
>> ) clones the shard's checkpoint mark and passes that to the new reader. The
>> reader ignores it, creating its own, but even if it accepted it, it would
>> be accepting a serialized CheckpointMark, which wouldn't work.
> Correct in the sense that for a RabbitMQ reader, a CheckpointMark doesn't
> affect what the reader will read: it depends only on the broker's internal
> state (which in turn depends on which messages have been acked by previous
> finalized CheckpointMarks).
>> Later, the runner calls finishRead (
>> https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L246
>> ). The shard's CheckpointMark (unserialized; which should still be valid)
>> is finalized. The reader's CheckpointMark (which may be a different
>> instance) becomes the return value, which is referred to as
>> "finishedCheckpoint" in the calling code, which is misleading at best and
>> problematic at worst as *this* checkpoint has not been finalized.
> I'm not following what is the problem here. In that code, "oldMark" is the
> last checkpoint mark to be finalized - calling finalizeCheckpoint on it
> signals that Beam has durably processed all the messages read from the
> reader until that mark. "mark" (the new one) represents the state of the
> reader after the last finalized mark, so it should not be finalized.
> I.e. AFAIR in a hypothetical runner (which DirectRunner tries to emulate)
> things go like this:
> Create a reader
> Let mark M1 = reader.getCheckpointMark()
> Durably persist M1 as the "restore point" of this reader
> ...read messages A B C from reader and durably process them...
> Finalize M1 (acks A B C)
> Let mark M2 = reader.getCheckpointMark()
> Durably persist M2 as the "restore point" of this reader
> ...read messages D E F and durably process them...
> Finalize M2 (acks D E F)
> Now let's imagine a failure.
> Durably persist M2 as the "restore point" of this reader
> ...read messages D E, and then a failure happens
> Recreate reader from M2 (reader ignores M2 but it doesn't matter)
> Since M2 was not finalized, messages D E F were not acked, and RabbitMQ
> will redeliver them to this reader. D E will be processed twice, but only
> the results of this new processing will be durable.
> Finalize M2 (acks D E F)
> Etc.
> Basically you can think of this as a series of micro-bundles, where
> micro-bundles are delimited by checkpoint marks, and each micro-bundle is a
> runner-side transaction which either commits or discards the results of
> processing all messages in this micro-bundle. After a micro-bundle [M1, M2)
> commits, the runner calls M1.finalizeCheckpoint() and persists M2 as
> the new restore point in case of failure.
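
The micro-bundle walk-through above can be simulated with a toy in-memory broker. This is only a sketch under assumptions: `ToyBroker` and `MicroBundleDemo` are made-up names, the broker models just "unacked messages are requeued when the channel goes away", and a runner-side commit is modelled as appending to a list.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy in-memory broker; a hypothetical model, not RabbitMQ itself. */
final class ToyBroker {
  private final Deque<String> ready = new ArrayDeque<>();
  private final List<String> unacked = new ArrayList<>();

  void publish(String msg) {
    ready.addLast(msg);
  }

  /** Delivers the next message and tracks it as unacked; null if none is ready. */
  String deliver() {
    String msg = ready.pollFirst();
    if (msg != null) {
      unacked.add(msg);
    }
    return msg;
  }

  /** Acks the given messages (what finalizing a checkpoint mark would do). */
  void ackAll(List<String> msgs) {
    unacked.removeAll(msgs);
  }

  /** Channel loss: unacked messages return to the front of the queue, in order. */
  void channelClosed() {
    for (int i = unacked.size() - 1; i >= 0; i--) {
      ready.addFirst(unacked.get(i));
    }
    unacked.clear();
  }
}

final class MicroBundleDemo {
  /** Reads up to n messages as one micro-bundle. */
  static List<String> readBundle(ToyBroker broker, int n) {
    List<String> bundle = new ArrayList<>();
    for (int i = 0; i < n; i++) {
      String msg = broker.deliver();
      if (msg == null) {
        break;
      }
      bundle.add(msg);
    }
    return bundle;
  }

  /** Returns the messages whose processing was durably committed, in order. */
  static List<String> run() {
    ToyBroker broker = new ToyBroker();
    for (String m : new String[] {"A", "B", "C", "D", "E", "F"}) {
      broker.publish(m);
    }
    List<String> durable = new ArrayList<>();

    // First bundle: read A B C, commit, then finalize (acks A B C).
    List<String> first = readBundle(broker, 3);
    durable.addAll(first);
    broker.ackAll(first);

    // Second bundle: read D E, then fail before commit; results are discarded.
    readBundle(broker, 2);
    broker.channelClosed(); // the reader is lost, so D E are requeued

    // Recreated reader: D E F are delivered again, committed, then acked.
    List<String> retry = readBundle(broker, 3);
    durable.addAll(retry);
    broker.ackAll(retry);
    return durable;
  }
}
```

In this model D and E are read twice, but only the second processing is committed, which matches the "processed twice, durable once" outcome described above.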
>> So, tl;dr: I cannot find any means of maintaining a persistent connection
>> to the server for finalizing checkpoints that is safe across runners. If
>> there's a guarantee all of the shards are on the same JVM instance, I could
>> rely on global, static collections/instances as a workaround, but if other
>> runners might serialize this across the wire, I'm stumped. The only
>> workable situation I can think of right now is to proactively acknowledge
>> messages as they are received and effectively no-op in finalizeCheckpoint.
>> This is very different, semantically, and can lead to dropped messages if a
>> pipeline doesn't finish processing the given message.
>> Any help would be much appreciated.
> If I'm misunderstanding something above, could you describe in detail a
> scenario that leads to message loss, or (less severe) to more-than-once
> durable processing of the same message?
>> Thanks,
>> -Danny
>> On 11/7/19 10:27 PM, Eugene Kirpichov wrote:
>> Hi Daniel,
>> This is probably insufficiently well documented. The CheckpointMark is
>> used for two purposes:
>> 1) To persistently store some notion of how much of the stream has been
>> consumed, so that if something fails we can tell the underlying streaming
>> system where to start reading when we re-create the reader. This is why
>> CheckpointMark is Serializable. E.g. this makes sense for Kafka.
>> 2) To do acks - to let the underlying streaming system know that the Beam
>> pipeline will never need data up to this CheckpointMark. Acking does not
>> require serializability - runners call ack() on the same in-memory instance
>> of CheckpointMark that was produced by the reader. E.g. this makes sense
>> for RabbitMq or Pubsub.
>> In practice, these two capabilities tend to be mutually exclusive: some
>> streaming systems can provide a serializable CheckpointMark, some can do
>> acks, some can do neither - but very few (or none) can do both, and it's
>> debatable whether it even makes sense for a system to provide both
>> capabilities: usually acking is an implicit form of streaming-system-side
>> checkpointing, i.e. when you re-create the reader you don't actually need
>> to carry over any information from an old CheckpointMark - the necessary
>> state (which records should be delivered) is maintained on the streaming
>> system side.
>> These two are lumped together into one API simply because that was the
>> best design option we came up with (not for lack of trying, but suggestions
>> very much welcome - AFAIK nobody is happy with it).
>> RabbitMQ is under #2 - it can't do serializable checkpoint marks, but it
>> can do acks. So you can simply ignore the non-serializability.
>> On Thu, Nov 7, 2019 at 12:07 PM Daniel Robert <daniel.rob...@acm.org>
>> wrote:
>>> (Background: I recently upgraded RabbitMqIO from the 4.x to 5.x library.
>>> As part of this I switched to a pull-based API rather than the
>>> previously-used push-based. This has caused some nebulous problems so I
>>> put up a correction PR that I think needs some eyes fairly quickly as
>>> I'd consider master to be broken for rabbitmq right now. The PR keeps
>>> the upgrade but reverts to the same push-based implementation as in 4.x:
>>> https://github.com/apache/beam/pull/9977 )
>>> Regardless, in trying to get the pull-based API to work, I'm finding the
>>> interactions between rabbitmq and beam with CheckpointMark to be
>>> fundamentally impossible to implement so I'm hoping for some input here.
>>> CheckpointMark itself must be Serializable; presumably this means it gets
>>> shuffled around between nodes. However 'Channel', the tunnel through
>>> which it communicates with Rabbit to ack messages and finalize the
>>> checkpoint, is non-Serializable. Like most other CheckpointMark
>>> implementations, Channel is 'transient'. When a new CheckpointMark is
>>> instantiated, it's given a Channel. If an existing one is supplied to
>>> the Reader's constructor (part of the 'startReader()' interface), the
>>> channel is overwritten.
>>> *However*, Rabbit does not support 'ack'ing messages on a channel other
>>> than the one that consumed them in the first place. Attempting to do so
>>> results in a '406 (PRECONDITION-FAILED) - unknown delivery tag'. (See
>>> https://www.grzegorowski.com/rabbitmq-406-channel-closed-precondition-failed
>>> ).
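
A toy model of why an ack on the wrong channel fails, assuming only that delivery tags are scoped to the channel that issued them. `ToyChannel` is a hypothetical class, not the RabbitMQ client, and the exception merely imitates the 406 error text quoted above.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy model of a channel with channel-scoped delivery tags (hypothetical). */
final class ToyChannel {
  private long nextTag = 1;
  private final Set<Long> outstanding = new HashSet<>();

  /** Simulates receiving one message; returns its delivery tag. */
  long receive() {
    long tag = nextTag++;
    outstanding.add(tag);
    return tag;
  }

  /** Acks a tag issued by this channel; otherwise fails the way a 406 would. */
  void ack(long tag) {
    if (!outstanding.remove(tag)) {
      throw new IllegalStateException(
          "406 PRECONDITION_FAILED - unknown delivery tag " + tag);
    }
  }
}
```

A tag handed out by one `ToyChannel` is unknown to a second, freshly opened one, so a checkpoint mark that is given a new channel has nothing valid to ack on it.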
>>> Truthfully, I don't really understand how the current implementation is
>>> working; it seems like a happy accident. But I'm curious if someone
>>> could help me debug and implement how to bridge the
>>> re-usable/serializable CheckpointMark requirement in Beam with this
>>> limitation of Rabbit.
>>> Thanks,
>>> -Daniel Robert

