On Fri, Nov 8, 2019 at 5:57 AM Daniel Robert <daniel.rob...@acm.org> wrote:
> Thanks Eugene and Reuven.
>
> In response to Eugene, I'd like to confirm I have this correct: In the
> rabbit-style use case of "stream-system-side checkpointing", it is safe
> (and arguably the correct behavior) to ignore the supplied CheckpointMark
> argument in `createReader(options, checkpointmark)` and in the constructor
> for the reader, and instead always instantiate a new CheckpointMark during
> construction. Is that correct?
>

Yes, this is correct.

> In response to Reuven: noted, however I was mostly using serialization in
> the general sense. That is, there does not seem to be any means of
> deserializing a RabbitMqCheckpointMark such that it can continue to provide
> value to a runner. Whether it's java serialization, avro, or any other
> Coder, the 'channel' itself cannot "come along for the ride", which leaves
> the rest of the internal state mostly unusable except for perhaps some
> historical, immutable use case.
>
> -Danny
> On 11/8/19 2:01 AM, Reuven Lax wrote:
>
> Just to clarify one thing: CheckpointMark does not need to be Java
> Serializable. All that's needed is to return a Coder for the CheckpointMark
> in getCheckpointMarkCoder.
>
> On Thu, Nov 7, 2019 at 7:29 PM Eugene Kirpichov <j...@google.com> wrote:
>
>> Hi Daniel,
>>
>> This is probably insufficiently well documented. The CheckpointMark is
>> used for two purposes:
>> 1) To persistently store some notion of how much of the stream has been
>> consumed, so that if something fails we can tell the underlying streaming
>> system where to start reading when we re-create the reader. This is why
>> CheckpointMark is Serializable. E.g. this makes sense for Kafka.
>> 2) To do acks - to let the underlying streaming system know that the Beam
>> pipeline will never need data up to this CheckpointMark. Acking does not
>> require serializability - runners call ack() on the same in-memory instance
>> of CheckpointMark that was produced by the reader. E.g. this makes sense
>> for RabbitMq or Pubsub.
>>
>> In practice, these two capabilities tend to be mutually exclusive: some
>> streaming systems can provide a serializable CheckpointMark, some can do
>> acks, some can do neither - but very few (or none) can do both, and it's
>> debatable whether it even makes sense for a system to provide both
>> capabilities: usually acking is an implicit form of streaming-system-side
>> checkpointing, i.e. when you re-create the reader you don't actually need
>> to carry over any information from an old CheckpointMark - the necessary
>> state (which records should be delivered) is maintained on the streaming
>> system side.
>>
>> These two are lumped together into one API simply because that was the
>> best design option we came up with (not for lack of trying, but suggestions
>> very much welcome - AFAIK nobody is happy with it).
>>
>> RabbitMQ is under #2 - it can't do serializable checkpoint marks, but it
>> can do acks. So you can simply ignore the non-serializability.
>>
>> On Thu, Nov 7, 2019 at 12:07 PM Daniel Robert <daniel.rob...@acm.org> wrote:
>>
>>> (Background: I recently upgraded RabbitMqIO from the 4.x to 5.x library.
>>> As part of this I switched to a pull-based API rather than the
>>> previously-used push-based. This has caused some nebulous problems so
>>> I put up a correction PR that I think needs some eyes fairly quickly as
>>> I'd consider master to be broken for rabbitmq right now. The PR keeps
>>> the upgrade but reverts to the same push-based implementation as in 4.x:
>>> https://github.com/apache/beam/pull/9977 )
>>>
>>> Regardless, in trying to get the pull-based API to work, I'm finding the
>>> interactions between rabbitmq and beam with CheckpointMark to be
>>> fundamentally impossible to implement so I'm hoping for some input here.
>>>
>>> CheckpointMark itself must be Serializable; presumably this means it gets
>>> shuffled around between nodes.
>>> However 'Channel', the tunnel through
>>> which it communicates with Rabbit to ack messages and finalize the
>>> checkpoint, is non-Serializable. Like most other CheckpointMark
>>> implementations, Channel is 'transient'. When a new CheckpointMark is
>>> instantiated, it's given a Channel. If an existing one is supplied to
>>> the Reader's constructor (part of the 'startReader()' interface), the
>>> channel is overwritten.
>>>
>>> *However*, Rabbit does not support 'ack'ing messages on a channel other
>>> than the one that consumed them in the first place. Attempting to do so
>>> results in a '406 (PRECONDITION-FAILED) - unknown delivery tag'. (See
>>>
>>> https://www.grzegorowski.com/rabbitmq-406-channel-closed-precondition-failed
>>> ).
>>>
>>> Truthfully, I don't really understand how the current implementation is
>>> working; it seems like a happy accident. But I'm curious if someone
>>> could help me debug and implement how to bridge the
>>> re-usable/serializable CheckpointMark requirement in Beam with this
>>> limitation of Rabbit.
>>>
>>> Thanks,
>>> -Daniel Robert
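
For anyone reading this thread later, here is a rough, self-contained sketch of the ack-only pattern discussed above: the reader ignores whatever CheckpointMark it is handed and always builds a fresh one bound to its own channel, so acks only ever go out on the channel that consumed the messages. Everything in it is a hypothetical stand-in written for illustration (AckChannel, Mark, AckingMark, SketchReader, RecordingChannel). It does not use the real Beam or RabbitMQ client classes and is not the RabbitMqIO implementation; only the shape of basicAck(deliveryTag, multiple) and finalizeCheckpoint() is borrowed from those libraries.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the RabbitMQ client's Channel; only acking is modeled.
interface AckChannel {
    void basicAck(long deliveryTag, boolean multiple);
}

// Hypothetical stand-in for a Beam CheckpointMark; only finalization is modeled.
interface Mark {
    void finalizeCheckpoint();
}

// A mark that remembers delivery tags and acks them on the channel that consumed them.
final class AckingMark implements Mark {
    private final AckChannel channel; // in-memory only; never meant to be encoded
    private final List<Long> deliveryTags = new ArrayList<>();

    AckingMark(AckChannel channel) {
        this.channel = channel;
    }

    void track(long deliveryTag) {
        deliveryTags.add(deliveryTag);
    }

    @Override
    public void finalizeCheckpoint() {
        // Ack each tag individually on the original channel, then forget them.
        for (long tag : deliveryTags) {
            channel.basicAck(tag, false);
        }
        deliveryTags.clear();
    }
}

// A reader that deliberately ignores any mark passed in from a previous reader.
final class SketchReader {
    private final AckChannel channel;
    private AckingMark current;

    SketchReader(AckChannel channel, Mark restoredOrNull) {
        // restoredOrNull is unused on purpose: its delivery tags belong to another
        // channel, and the broker itself tracks what still needs to be delivered.
        this.channel = channel;
        this.current = new AckingMark(channel);
    }

    void onDelivery(long deliveryTag) {
        current.track(deliveryTag);
    }

    // Hands out the mark covering everything read so far and starts a new one.
    AckingMark checkpoint() {
        AckingMark done = current;
        current = new AckingMark(channel);
        return done;
    }
}

// Test double that records which tags were acked.
final class RecordingChannel implements AckChannel {
    final List<Long> acked = new ArrayList<>();

    @Override
    public void basicAck(long deliveryTag, boolean multiple) {
        acked.add(deliveryTag);
    }
}

class AckSketchDemo {
    public static void main(String[] args) {
        RecordingChannel channel = new RecordingChannel();
        SketchReader reader = new SketchReader(channel, null);
        reader.onDelivery(1L);
        reader.onDelivery(2L);
        reader.checkpoint().finalizeCheckpoint();
        System.out.println(channel.acked); // prints [1, 2]
    }
}
```

The point of the sketch is that the mark and the channel share a lifetime: a mark produced by one reader is finalized in memory against that reader's channel, and a re-created reader starts clean instead of trying to reuse old delivery tags.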