On Fri, Nov 8, 2019 at 5:57 AM Daniel Robert <daniel.rob...@acm.org> wrote:

> Thanks Euguene and Reuven.
>
> In response to Eugene, I'd like to confirm I have this correct: In the
> rabbit-style use case of "stream-system-side checkpointing", it is safe
> (and arguably the correct behavior) to ignore the supplied CheckpointMark
> argument in `createReader(options, checkpointmark)` and in the constructor
> for the and instead always instantiate a new CheckpointMark during
> construction. Is that correct?
>
Yes, this is correct.


> In response to Reuven: noted, however I was mostly using serialization in
> the general sense. That is, there does not seem to be any means of
> deserializing a RabbitMqCheckpointMark such that it can continue to provide
> value to a runner. Whether it's java serialization, avro, or any other
> Coder, the 'channel' itself cannot "come along for the ride", which leaves
> the rest of the internal state mostly unusable except for perhaps some
> historical, immutable use case.
>
> -Danny
> On 11/8/19 2:01 AM, Reuven Lax wrote:
>
> Just to clarify one thing: CheckpointMark does not need to be Java
> Seralizable. All that's needed is do return a Coder for the CheckpointMark
> in getCheckpointMarkCoder.
>
> On Thu, Nov 7, 2019 at 7:29 PM Eugene Kirpichov <j...@google.com> wrote:
>
>> Hi Daniel,
>>
>> This is probably insufficiently well documented. The CheckpointMark is
>> used for two purposes:
>> 1) To persistently store some notion of how much of the stream has been
>> consumed, so that if something fails we can tell the underlying streaming
>> system where to start reading when we re-create the reader. This is why
>> CheckpointMark is Serializable. E.g. this makes sense for Kafka.
>> 2) To do acks - to let the underlying streaming system know that the Beam
>> pipeline will never need data up to this CheckpointMark. Acking does not
>> require serializability - runners call ack() on the same in-memory instance
>> of CheckpointMark that was produced by the reader. E.g. this makes sense
>> for RabbitMq or Pubsub.
>>
>> In practice, these two capabilities tend to be mutually exclusive: some
>> streaming systems can provide a serializable CheckpointMark, some can do
>> acks, some can do neither - but very few (or none) can do both, and it's
>> debatable whether it even makes sense for a system to provide both
>> capabilities: usually acking is an implicit form of streaming-system-side
>> checkpointing, i.e. when you re-create the reader you don't actually need
>> to carry over any information from an old CheckpointMark - the necessary
>> state (which records should be delivered) is maintained on the streaming
>> system side.
>>
>> These two are lumped together into one API simply because that was the
>> best design option we came up with (not for lack of trying, but suggestions
>> very much welcome - AFAIK nobody is happy with it).
>>
>> RabbitMQ is under #2 - it can't do serializable checkpoint marks, but it
>> can do acks. So you can simply ignore the non-serializability.
>>
>> On Thu, Nov 7, 2019 at 12:07 PM Daniel Robert <daniel.rob...@acm.org>
>> wrote:
>>
>>> (Background: I recently upgraded RabbitMqIO from the 4.x to 5.x library.
>>> As part of this I switched to a pull-based API rather than the
>>> previously-used push-based. This has caused some nebulous problems so
>>> put up a correction PR that I think needs some eyes fairly quickly as
>>> I'd consider master to be broken for rabbitmq right now. The PR keeps
>>> the upgrade but reverts to the same push-based implementation as in 4.x:
>>> https://github.com/apache/beam/pull/9977 )
>>>
>>> Regardless, in trying to get the pull-based API to work, I'm finding the
>>> interactions between rabbitmq and beam with CheckpointMark to be
>>> fundamentally impossible to implement so I'm hoping for some input here.
>>>
>>> CheckointMark itself must be Serializable, presumably this means it gets
>>> shuffled around between nodes. However 'Channel', the tunnel through
>>> which it communicates with Rabbit to ack messages and finalize the
>>> checkpoint, is non-Serializable. Like most other CheckpointMark
>>> implementations, Channel is 'transient'. When a new CheckpointMark is
>>> instantiated, it's given a Channel. If an existing one is supplied to
>>> the Reader's constructor (part of the 'startReader()' interface), the
>>> channel is overwritten.
>>>
>>> *However*, Rabbit does not support 'ack'ing messages on a channel other
>>> than the one that consumed them in the first place. Attempting to do so
>>> results in a '406 (PRECONDITION-FAILED) - unknown delivery tag'. (See
>>>
>>> https://www.grzegorowski.com/rabbitmq-406-channel-closed-precondition-failed
>>> ).
>>>
>>> Truthfully, I don't really understand how the current implementation is
>>> working; it seems like a happy accident. But I'm curious if someone
>>> could help me debug and implement how to bridge the
>>> re-usable/serializable CheckpointMark requirement in Beam with this
>>> limitation of Rabbit.
>>>
>>> Thanks,
>>> -Daniel Robert
>>>
>>>

Reply via email to