I just took a look at the PR: it is, indeed, huge. But it is probably not too hard to review, as it is mostly fresh code. It is true that there hasn't been a ton of work on RabbitMQ, so maybe the reviewer isn't obvious. There are 3 committers on this thread who seem to have the expertise and interest...
Kenn

On Mon, Jan 6, 2020 at 2:25 PM Daniel Robert <daniel.rob...@acm.org> wrote:
> Alright, a bit late, but this took me a while.
>
> Thanks for all the input so far. I have rewritten much of the RabbitMq IO connector and have it ready to go in a draft PR: https://github.com/apache/beam/pull/10509
>
> This should incorporate a lot of what's been discussed here, in terms of watermarking, serialization, error handling, etc. It also clarifies/cleans up a lot of very confusing documentation/API settings pertaining to using 'queues vs exchanges' and adds clarifying documentation on various valid AMQP paradigms.
>
> Watermarking/timestamp management is mostly stolen from KafkaIO and modified as appropriate.
>
> This also does a lot to improve resource management in terms of Connection and Channel usage, largely modeled after JdbcIO's ConnectionHandlerProvider concept.
>
> I'm not entirely sure how best to proceed from here, hence the email. It's a huge PR, but it has no specific backing ticket (it should), and historically there haven't been many eyes on RabbitMq PRs.
>
> Thanks,
> -Danny
> On 11/14/19 4:13 PM, Jan Lukavský wrote:
>
> On 11/14/19 9:50 PM, Daniel Robert wrote:
>
> Alright, thanks everybody. I'm really appreciative of the conversation here. I think I see where my disconnect is and how this might all work together for me. There are some bugs in the current rabbit implementation that I think have confused my understanding of the intended semantics. I'm coming around to seeing how such a system with rabbit's restrictions can work properly in Beam (I'd totally forgotten about 'dedupe' support in Beam), but I want to clarify some implementation questions after pulling everyone's notes together.
>
> The RabbitMQ reader should not bother accepting an existing CheckpointMark in its constructor (in 'ack-based' systems this is unnecessary, per Eugene's original reply).
> It should construct its own CheckpointMark at construction time and use it throughout its lifecycle.
>
> At some point later, the CheckpointMark will be 'finalized'. If this CheckpointMark has been serialized (via Coder or otherwise) or its underlying connection has been severed, this step will fail. This would mean that at some point the messages are redelivered to Beam on some other Reader, so there is no data loss. If it has not been serialized, the acks will take place just fine, even if much later.
>
> If the system is using processing time as event time, however, the redelivery of these messages would effectively change the ordering and potentially the window they arrived in. I *believe* that Beam deduping is managed per window, so if 'finalizeCheckpoint' is attempted (and fails), would these messages appear in a new window?
>
> This is very likely to happen with any source that assigns something like *now* as the event time. That is ill-defined, and if the source cannot provide some retry-persistent estimate of the real event time, then I'd suggest forcing the user to specify a UDF to extract the event time from the payload. Everything else would probably break (at least if any timestamp-related windowing is used in the pipeline).
>
> Perhaps my questions are now:
> - how should a CheckpointMark communicate failure to Beam?
>
> An exception thrown should fail the checkpoint and therefore retry everything from the last checkpoint.
>
> - how does Beam handle a CheckpointMark.finalizeCheckpoint failure, if the API dictates such a thing?
>
> See above.
>
> - is there a provision that would need to be made for processing-time sources that can fail a CheckpointMark.finalizeCheckpoint call? (I'm nervous redelivered messages would appear in another window.)
>
> You are nervous for a reason.
> :) I strongly believe a processing-time source should be considered an anti-pattern, at least in situations where there is any time manipulation downstream (time windows, stateful processing, ...).
>
> - What is the relationship, lifecycle-wise, between a CheckpointMark and a Reader? My understanding is that a CheckpointMark may outlive a Reader; is that correct?
>
> Definitely. But the same instance bound to the lifecycle of the reader would be used to finalizeCheckpoint (if that ever happens).
>
> Thanks for bearing with me, everyone. It feels a bit unfortunate that my first foray into Beam is reliant on this rabbit connector, but I'm learning a lot and I'm very grateful for the help. PRs pending once I get this all straightened out in my head.
>
> -Danny
> On 11/14/19 2:35 PM, Eugene Kirpichov wrote:
>
> Hi Daniel,
>
> On Wed, Nov 13, 2019 at 8:26 PM Daniel Robert <daniel.rob...@acm.org> wrote:
>
>> I believe I've nailed down a situation that happens in practice that causes Beam and Rabbit to be incompatible. It seems that runners can and do make assumptions about the serializability (via Coder) of a CheckpointMark.
>>
>> To start, these are the semantics of RabbitMQ:
>>
>> - the client establishes a connection to the server
>> - the client opens a channel on the connection
>> - messages are either pulled or pushed to the client from the server along this channel
>> - when messages are done processing, they are acknowledged *client-side* and must be acknowledged on the *same channel* that originally received the message.
>>
>> Since a channel (or any open connection) is non-serializable, a CheckpointMark that has been serialized can never be used to acknowledge these messages and correctly 'finalize' the checkpoint.
>> It also, as previously discussed in this thread, implies that a rabbit Reader cannot accept an existing CheckpointMark at all; the Reader and the CheckpointMark must share the same connection to the rabbit server ("channel").
>>
> This is correct.
>
>> Next, I've found how DirectRunner (and presumably others) can attempt to serialize a CheckpointMark that has not been finalized. In https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L150, the DirectRunner applies a probability and, if it hits, sets the current reader to 'null' but retains the existing CheckpointMark, which it then attempts to pass to a new reader via a Coder.
>>
> Correct, this simulates a failure scenario:
> - The runner was reading the source and, after finalizing a bunch of previous CheckpointMarks, obtained a new one and serialized it so things can be restored in case of failure.
> - A failure happened before the current CheckpointMark could be finalized, which means Beam was not able to guarantee that elements after the last-finalized mark have been durably processed, so we may need to re-read them; hence the runner recreates a reader from the current mark.
>
>> This leaves the shard, the runner, and the reader with differing views of the world. In UnboundedReadEvaluatorFactory's processElement function, a call to getReader(shard) (https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L132) clones the shard's checkpoint mark and passes that to the new reader. The reader ignores it, creating its own, but even if it accepted it, it would be accepting a serialized CheckpointMark, which wouldn't work.
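To make the serialization problem concrete, here is a minimal sketch in plain Java with no Beam or RabbitMQ dependencies. FakeChannel and AckCheckpointMark are hypothetical stand-ins, not RabbitMqIO's real classes, and a Java-serialization round trip stands in for encoding and decoding the mark with a Coder. It shows that the delivery tags survive the trip while the transient channel comes back as null, so finalizing the copy can only fail.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an open AMQP channel. Like a live network channel,
// it does not implement Serializable. This is NOT the RabbitMQ client API.
class FakeChannel {
  final List<Long> acked = new ArrayList<>();

  void basicAck(long deliveryTag) {
    acked.add(deliveryTag);
  }
}

// Hypothetical ack-based checkpoint mark (not RabbitMqIO's actual class).
class AckCheckpointMark implements Serializable {
  private static final long serialVersionUID = 1L;

  // Plain data such as delivery tags survives serialization...
  final List<Long> deliveryTags = new ArrayList<>();
  // ...but a transient field is restored as null on deserialization.
  transient FakeChannel channel;

  AckCheckpointMark(FakeChannel channel) {
    this.channel = channel;
  }

  // Same shape as Beam's CheckpointMark.finalizeCheckpoint(), which may throw IOException.
  void finalizeCheckpoint() throws IOException {
    if (channel == null) {
      throw new IOException("no channel: this mark was deserialized, so it cannot ack");
    }
    for (long tag : deliveryTags) {
      channel.basicAck(tag);
    }
    deliveryTags.clear();
  }

  // Java-serialization round trip, standing in for encoding/decoding with a Coder.
  static AckCheckpointMark roundTrip(AckCheckpointMark mark) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(mark);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (AckCheckpointMark) in.readObject();
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }
}

class SerializationDemo {
  public static void main(String[] args) {
    FakeChannel channel = new FakeChannel();
    AckCheckpointMark mark = new AckCheckpointMark(channel);
    mark.deliveryTags.add(1L);
    mark.deliveryTags.add(2L);

    AckCheckpointMark copy = AckCheckpointMark.roundTrip(mark);
    System.out.println("copy.channel = " + copy.channel);           // null
    System.out.println("copy.deliveryTags = " + copy.deliveryTags); // [1, 2]
    try {
      copy.finalizeCheckpoint();
    } catch (IOException e) {
      System.out.println("finalize on copy failed: " + e.getMessage());
    }
  }
}
```

Throwing from finalizeCheckpoint() here mirrors what the thread describes: the acks never happen on the copy, and the unacked messages are left for the broker to redeliver to some reader. The original, never-serialized mark can still ack on its own channel.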
>>
> Correct in the sense that, for a RabbitMQ reader, a CheckpointMark doesn't affect what the reader will read: it depends only on the broker's internal state (which in turn depends on which messages have been acked by previously finalized CheckpointMarks).
>
>> Later, the runner calls finishRead (https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L246). The shard's CheckpointMark (unserialized, so it should still be valid) is finalized. The reader's CheckpointMark (which may be a different instance) becomes the return value, which is referred to as "finishedCheckpoint" in the calling code; that is misleading at best and problematic at worst, as *this* checkpoint has not been finalized.
>>
> I'm not following what the problem is here. In that code, "oldMark" is the last checkpoint mark to be finalized: calling finalizeCheckpoint on it signals that Beam has durably processed all the messages read from the reader until that mark. "mark" (the new one) represents the state of the reader after the last finalized mark, so it should not be finalized.
>
> I.e. AFAIR, in a hypothetical runner (which DirectRunner tries to emulate) things go like this:
>
> Create a reader
> Let mark M1 = reader.getCheckpointMark()
> Durably persist M1 as the "restore point" of this reader
> ...read messages A B C from reader and durably process them...
> Finalize M1 (acks A B C)
> Let mark M2 = reader.getCheckpointMark()
> Durably persist M2 as the "restore point" of this reader
> ...read messages D E F and durably process them...
> Finalize M2 (acks D E F)
>
> Now let's imagine a failure.
> Durably persist M2 as the "restore point" of this reader
> ...read messages D E, and then a failure happens...
> Recreate reader from M2 (reader ignores M2, but it doesn't matter)
> Since M2 was not finalized, messages D E F were not acked, and RabbitMQ will redeliver them to this reader. D E will be processed twice, but only the results of this new processing will be durable.
> Finalize M2 (acks D E F)
> Etc.
>
> Basically, you can think of this as a series of micro-bundles, where micro-bundles are delimited by checkpoint marks, and each micro-bundle is a runner-side transaction which either commits or discards the results of processing all messages in this micro-bundle. After a micro-bundle [M1, M2) commits, the runner calls M1.finalizeCheckpoint() and persists M2 as the new restore point in case of failure.
>
>> So, tl;dr: I cannot find any means of maintaining a persistent connection to the server for finalizing checkpoints that is safe across runners. If there's a guarantee that all of the shards are on the same JVM instance, I could rely on global, static collections/instances as a workaround, but if other runners might serialize this across the wire, I'm stumped. The only workable solution I can think of right now is to proactively acknowledge messages as they are received and effectively no-op in finalizeCheckpoint. This is very different, semantically, and can lead to dropped messages if a pipeline doesn't finish processing the given message.
>>
>> Any help would be much appreciated.
>>
> If I'm misunderstanding something above, could you describe in detail a scenario that leads to message loss, or (less severe) to more-than-once durable processing of the same message?
>
>> Thanks,
>> -Danny
>> On 11/7/19 10:27 PM, Eugene Kirpichov wrote:
>>
>> Hi Daniel,
>>
>> This is probably insufficiently well documented.
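The micro-bundle model Eugene walks through above can be simulated in a few lines. This is a hypothetical sketch, not runner code: FakeBroker models only the one RabbitMQ property that matters here (unacked messages are redelivered when the consumer goes away), and each bundle's list of delivery tags plays the role of its CheckpointMark. Committing output strictly before acking is what lets D and E be processed twice while appearing only once in the durable output.

```java
import java.util.AbstractMap;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory broker with ack-based redelivery; NOT the RabbitMQ client API.
class FakeBroker {
  private final Deque<String> ready = new ArrayDeque<>();
  private final Map<Long, String> unacked = new LinkedHashMap<>();
  private long nextTag = 1;

  FakeBroker(String... messages) {
    ready.addAll(Arrays.asList(messages));
  }

  // Delivers the next message (or null when empty); it stays unacked until ack() is called.
  Map.Entry<Long, String> deliver() {
    String body = ready.pollFirst();
    if (body == null) {
      return null;
    }
    long tag = nextTag++;
    unacked.put(tag, body);
    return new AbstractMap.SimpleEntry<>(tag, body);
  }

  void ack(long tag) {
    unacked.remove(tag);
  }

  // Simulates the consumer's channel dying: unacked messages return to the
  // front of the queue in their original order, ready for redelivery.
  void requeueUnacked() {
    List<String> pending = new ArrayList<>(unacked.values());
    Collections.reverse(pending);
    for (String body : pending) {
      ready.addFirst(body);
    }
    unacked.clear();
  }
}

// Hypothetical runner loop; each micro-bundle is an all-or-nothing transaction.
class MicroBundleRunner {
  final List<String> durableOutput = new ArrayList<>();

  // Reads up to `size` messages. If failAfter >= 0, a crash is simulated after
  // that many reads: the bundle's output is discarded and nothing is acked.
  boolean runBundle(FakeBroker broker, int size, int failAfter) {
    List<Long> markTags = new ArrayList<>();   // what this bundle's mark would ack
    List<String> pendingOutput = new ArrayList<>();
    for (int i = 0; i < size; i++) {
      if (i == failAfter) {
        broker.requeueUnacked();               // connection lost; broker will redeliver
        return false;                          // pendingOutput is simply dropped
      }
      Map.Entry<Long, String> delivery = broker.deliver();
      if (delivery == null) {
        break;
      }
      markTags.add(delivery.getKey());
      pendingOutput.add(delivery.getValue().toLowerCase()); // "process" the message
    }
    durableOutput.addAll(pendingOutput);       // 1) commit the bundle's results
    for (long tag : markTags) {
      broker.ack(tag);                         // 2) only then finalize (ack) the mark
    }
    return true;
  }
}

class MicroBundleDemo {
  public static void main(String[] args) {
    FakeBroker broker = new FakeBroker("A", "B", "C", "D", "E", "F");
    MicroBundleRunner runner = new MicroBundleRunner();
    runner.runBundle(broker, 3, -1); // reads A B C, commits, acks
    runner.runBundle(broker, 3, 2);  // reads D E, then "fails"; nothing acked
    runner.runBundle(broker, 3, -1); // D E redelivered, plus F; commits, acks
    System.out.println(runner.durableOutput); // [a, b, c, d, e, f]
  }
}
```

Running MicroBundleDemo prints [a, b, c, d, e, f]: the failed bundle's partial output is discarded, and only the retried bundle's results become durable.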
>> The CheckpointMark is used for two purposes:
>> 1) To persistently store some notion of how much of the stream has been consumed, so that if something fails we can tell the underlying streaming system where to start reading when we re-create the reader. This is why CheckpointMark is Serializable. E.g. this makes sense for Kafka.
>> 2) To do acks: to let the underlying streaming system know that the Beam pipeline will never need data up to this CheckpointMark. Acking does not require serializability: runners call finalizeCheckpoint() on the same in-memory instance of CheckpointMark that was produced by the reader. E.g. this makes sense for RabbitMq or Pubsub.
>>
>> In practice, these two capabilities tend to be mutually exclusive: some streaming systems can provide a serializable CheckpointMark, some can do acks, some can do neither, but very few (or none) can do both, and it's debatable whether it even makes sense for a system to provide both capabilities. Usually acking is an implicit form of streaming-system-side checkpointing, i.e. when you re-create the reader you don't actually need to carry over any information from an old CheckpointMark; the necessary state (which records should be delivered) is maintained on the streaming system side.
>>
>> These two are lumped together into one API simply because that was the best design option we came up with (not for lack of trying, but suggestions are very much welcome; AFAIK nobody is happy with it).
>>
>> RabbitMQ is under #2: it can't do serializable checkpoint marks, but it can do acks. So you can simply ignore the non-serializability.
>>
>> On Thu, Nov 7, 2019 at 12:07 PM Daniel Robert <daniel.rob...@acm.org> wrote:
>>
>>> (Background: I recently upgraded RabbitMqIO from the 4.x to the 5.x library. As part of this I switched to a pull-based API rather than the previously used push-based one.
>>> This has caused some nebulous problems, so I put up a correction PR that I think needs some eyes fairly quickly, as I'd consider master to be broken for RabbitMQ right now. The PR keeps the upgrade but reverts to the same push-based implementation as in 4.x: https://github.com/apache/beam/pull/9977 )
>>>
>>> Regardless, in trying to get the pull-based API to work, I'm finding the interactions between RabbitMQ and Beam with CheckpointMark to be fundamentally impossible to implement, so I'm hoping for some input here.
>>>
>>> CheckpointMark itself must be Serializable; presumably this means it gets shuffled around between nodes. However, 'Channel', the tunnel through which it communicates with Rabbit to ack messages and finalize the checkpoint, is non-Serializable. As in most other CheckpointMark implementations, the Channel field is 'transient'. When a new CheckpointMark is instantiated, it's given a Channel. If an existing one is supplied to the Reader's constructor (part of the 'createReader()' interface), the channel is overwritten.
>>>
>>> *However*, Rabbit does not support 'ack'ing messages on a channel other than the one that consumed them in the first place. Attempting to do so results in a '406 (PRECONDITION_FAILED) - unknown delivery tag'. (See https://www.grzegorowski.com/rabbitmq-406-channel-closed-precondition-failed )
>>>
>>> Truthfully, I don't really understand how the current implementation is working; it seems like a happy accident. But I'm curious if someone could help me debug and implement a way to bridge the reusable/serializable CheckpointMark requirement in Beam with this limitation of Rabbit.
>>>
>>> Thanks,
>>> -Daniel Robert
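The channel restriction behind the 406 error can be modelled the same way. In this hypothetical sketch (FakeAmqpChannel is not the com.rabbitmq.client API), each channel numbers its deliveries independently and only accepts acks for tags it handed out itself, which is why a mark that has lost its original channel has no valid channel left to ack on.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of channel-scoped delivery tags; NOT the com.rabbitmq.client API.
class FakeAmqpChannel {
  private long nextTag = 1;                    // every channel numbers deliveries from 1
  private final Map<Long, String> unacked = new HashMap<>();

  long deliver(String body) {
    long tag = nextTag++;
    unacked.put(tag, body);
    return tag;
  }

  void basicAck(long tag) {
    if (unacked.remove(tag) == null) {
      // Models the broker rejecting a tag this channel never delivered (or already acked).
      throw new IllegalStateException("406 PRECONDITION_FAILED - unknown delivery tag " + tag);
    }
  }
}

class ChannelScopeDemo {
  public static void main(String[] args) {
    FakeAmqpChannel readerChannel = new FakeAmqpChannel();
    long tag = readerChannel.deliver("A");     // tag 1 on the original channel

    FakeAmqpChannel otherChannel = new FakeAmqpChannel(); // e.g. opened by a new Reader
    try {
      otherChannel.basicAck(tag);
    } catch (IllegalStateException e) {
      System.out.println("ack on other channel: " + e.getMessage());
    }
    readerChannel.basicAck(tag);               // succeeds on the channel that received it
    System.out.println("ack on original channel: ok");
  }
}
```

In this model, the only way to ack is to keep the in-memory mark together with the live channel that received the messages, which is the "acks, not serializable positions" category Eugene describes above.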