On 11/14/19 2:32 AM, Jan Lukavský wrote:
Hi Danny,
as Eugene pointed out, there are essentially two "modes of operation"
of CheckpointMark. It can:
a) be used to restore the state of a reader (in the call to
UnboundedSource#createReader)
b) confirm processed elements in CheckpointMark#finalizeCheckpoint
If your source doesn't provide a persistent position in the data stream
that can be referred to and serialized (Kafka offsets would be an
example of this), then what you actually need to serialize is not the
channel, but a way to restore it - e.g. by opening a new channel
with a given 'consumer group name'. Then you just use this checkpoint
to commit your processed data in finalizeCheckpoint.
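Jan's suggestion can be sketched in plain Java. All names here (SubscriptionCheckpointMark, subscriptionId, the Object standing in for a channel) are hypothetical illustrations, not RabbitMqIO code: the mark persists only a serializable identifier, and the live channel is transient, to be re-established after deserialization.

```java
import java.io.*;

// Hypothetical sketch: the mark persists only the subscription id;
// the live channel is transient and must be re-opened on restore.
public class SubscriptionCheckpointMark implements Serializable {
    private final String subscriptionId;  // survives serialization
    private transient Object channel;     // dropped on serialization

    public SubscriptionCheckpointMark(String subscriptionId, Object channel) {
        this.subscriptionId = subscriptionId;
        this.channel = channel;
    }

    public String getSubscriptionId() { return subscriptionId; }
    public Object getChannel() { return channel; }

    // Round-trip helper used to demonstrate what survives serialization.
    static SubscriptionCheckpointMark roundTrip(SubscriptionCheckpointMark mark)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(mark);
        }
        try (ObjectInputStream ois = new ObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray()))) {
            return (SubscriptionCheckpointMark) ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        SubscriptionCheckpointMark original =
                new SubscriptionCheckpointMark("consumer-group-1", new Object());
        SubscriptionCheckpointMark restored = roundTrip(original);
        // The identifier survives; the live channel does not and must be re-opened.
        System.out.println(restored.getSubscriptionId()); // consumer-group-1
        System.out.println(restored.getChannel());        // null
    }
}
```

Whether this actually works for RabbitMQ depends on the broker question debated below: it requires that acks be valid on a channel other than the one that received the message.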
Note that finalizeCheckpoint is not guaranteed to be called - that
can happen when an error occurs and the source has to be
rewound - which is what the direct runner emulates with the probability
of 'readerReuseChance'.
I'm reading the RabbitMQ documentation only quickly, but if I
understand it correctly, you have to create a subscription to the
broker, serialize the identifier of the subscription into the
CheckpointMark, and then just recover the subscription in the call to
UnboundedSource#createReader. That should do the trick.
I have not seen any such documentation in rabbit. My understanding is it
has to be the same physical connection and channel. Can you cite the
source you were looking at?
-Danny
Hope this helps. Sorry if I'm not using 100% correct RabbitMQ
terminology; as I said, I'm not quite familiar with it.
Best,
Jan
On 11/14/19 5:26 AM, Daniel Robert wrote:
I believe I've nailed down a situation that happens in practice that
causes Beam and Rabbit to be incompatible. It seems that runners can
and do make assumptions about the serializability (via Coder) of a
CheckpointMark.
To start, these are the semantics of RabbitMQ:
- the client establishes a connection to the server
- client opens a channel on the connection
- messages are either pulled or pushed to the client from the server
along this channel
- when messages are done processing, they are acknowledged
*client-side* and must be acknowledged on the *same channel* that
originally received the message.
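The "same channel" rule can be illustrated with a small fake broker. Every class here (FakeChannel, deliver) is an illustrative stand-in, not the RabbitMQ client API: delivery tags are scoped to the channel that issued them, and acking on any other channel fails the way the broker does with a 406.

```java
import java.util.*;

// Illustrative model of RabbitMQ's ack scoping: delivery tags are only
// meaningful on the channel that delivered the message.
public class AckScopeDemo {
    static class FakeChannel {
        private final Map<Long, String> pending = new HashMap<>();
        private long nextTag = 1;

        long deliver(String message) {
            long tag = nextTag++;
            pending.put(tag, message);
            return tag;
        }

        // Mirrors the broker behavior: unknown tag -> 406 PRECONDITION-FAILED.
        void basicAck(long deliveryTag) {
            if (pending.remove(deliveryTag) == null) {
                throw new IllegalStateException(
                        "406 (PRECONDITION-FAILED) - unknown delivery tag " + deliveryTag);
            }
        }
    }

    public static void main(String[] args) {
        FakeChannel receiving = new FakeChannel();
        FakeChannel other = new FakeChannel();

        long tag = receiving.deliver("msg-1");
        receiving.basicAck(tag);            // OK: same channel

        long tag2 = receiving.deliver("msg-2");
        try {
            other.basicAck(tag2);           // fails: different channel
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```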
Since a channel (or any open connection) is non-serializable, it
means that a CheckpointMark that has been serialized cannot ever be
used to acknowledge these messages and correctly 'finalize' the
checkpoint. It also, as previously discussed in this thread, implies
a rabbit Reader cannot accept an existing CheckpointMark at all; the
Reader and the CheckpointMark must share the same connection to the
rabbit server ("channel").
Next, I've found how DirectRunner (and presumably others) can attempt
to serialize a CheckpointMark that has not been finalized. In
https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L150,
the DirectRunner applies a probability and if it hits, it sets the
current reader to 'null' but retains the existing CheckpointMark,
which it then attempts to pass to a new reader via a Coder.
This leaves the shard, the runner, and the reader with differing views
of the world. In UnboundedReadEvaluatorFactory's processElement
function, a call to getReader(shard) (
https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L132
) clones the shard's checkpoint mark and passes that to the new
reader. The reader ignores it, creating its own, but even if it
accepted it, it would be accepting a serialized CheckpointMark, which
wouldn't work. Later, the runner calls finishRead (
https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L246
). The shard's CheckpointMark (unserialized; which should still be
valid) is finalized. The reader's CheckpointMark (which may be a
different instance) becomes the return value, which is referred to as
"finishedCheckpoint" in the calling code, which is misleading at best
and problematic at worst as *this* checkpoint has not been finalized.
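The hand-off described above reduces to a round-trip sketch, with plain java.io serialization standing in for a Beam Coder (Mark and its fields are hypothetical names): the clone is a distinct instance, and any transient channel it held is gone, so finalizing the clone cannot ack anything.

```java
import java.io.*;

// Sketch of what a coder-based clone does to a checkpoint mark that
// holds a live (transient, non-serializable) channel.
public class CoderCloneDemo {
    static class Mark implements Serializable {
        final long deliveryTag;
        transient AutoCloseable channel;   // stands in for a live rabbit channel
        Mark(long tag, AutoCloseable ch) { deliveryTag = tag; channel = ch; }
    }

    // Encode/decode with java.io serialization, as a SerializableCoder would.
    static Mark roundTrip(Mark m) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(m);
        }
        try (ObjectInputStream ois = new ObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray()))) {
            return (Mark) ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Mark original = new Mark(42L, () -> {});
        Mark copy = roundTrip(original);
        System.out.println(copy != original);      // true: a different instance
        System.out.println(copy.deliveryTag);      // 42: plain data survives
        System.out.println(copy.channel == null);  // true: nothing left to ack on
    }
}
```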
So, tl;dr: I cannot find any means of maintaining a persistent
connection to the server for finalizing checkpoints that is safe
across runners. If there's a guarantee all of the shards are on the
same JVM instance, I could rely on global, static
collections/instances as a workaround, but if other runners might
serialize this across the wire, I'm stumped. The only workable
situation I can think of right now is to proactively acknowledge
messages as they are received and effectively no-op in
finalizeCheckpoint. This is very different, semantically, and can
lead to dropped messages if a pipeline doesn't finish processing the
given message.
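The workaround described above, ack on receipt and make finalizeCheckpoint a no-op, can be sketched as follows. FakeChannel and both class names are illustrative; as noted, this trades finalize-time acking for possible message loss on failure.

```java
import java.io.*;
import java.util.*;

// Sketch of the eager-ack workaround: messages are acknowledged as they
// are read, so the checkpoint mark has nothing left to do when finalized.
public class EagerAckDemo {
    static class FakeChannel {
        final List<Long> acked = new ArrayList<>();
        void basicAck(long deliveryTag) { acked.add(deliveryTag); }
    }

    // The mark is trivially serializable because it holds no live resources.
    static class EagerCheckpointMark implements Serializable {
        void finalizeCheckpoint() {
            // Intentionally a no-op: every message was already acked on receipt.
            // If the pipeline fails after the ack, those messages are lost,
            // which is the semantic weakening discussed in the thread.
        }
    }

    public static void main(String[] args) {
        FakeChannel channel = new FakeChannel();
        long[] deliveries = {1, 2, 3};
        for (long tag : deliveries) {
            channel.basicAck(tag);      // ack immediately, before processing
        }
        new EagerCheckpointMark().finalizeCheckpoint();  // nothing left to do
        System.out.println(channel.acked);  // [1, 2, 3]
    }
}
```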
Any help would be much appreciated.
Thanks,
-Danny
On 11/7/19 10:27 PM, Eugene Kirpichov wrote:
Hi Daniel,
This is probably insufficiently well documented. The CheckpointMark
is used for two purposes:
1) To persistently store some notion of how much of the stream has
been consumed, so that if something fails we can tell the underlying
streaming system where to start reading when we re-create the
reader. This is why CheckpointMark is Serializable. E.g. this makes
sense for Kafka.
2) To do acks - to let the underlying streaming system know that the
Beam pipeline will never need data up to this CheckpointMark. Acking
does not require serializability - runners call ack() on the same
in-memory instance of CheckpointMark that was produced by the
reader. E.g. this makes sense for RabbitMq or Pubsub.
In practice, these two capabilities tend to be mutually exclusive:
some streaming systems can provide a serializable CheckpointMark,
some can do acks, some can do neither - but very few (or none) can
do both, and it's debatable whether it even makes sense for a system
to provide both capabilities: usually acking is an implicit form of
streaming-system-side checkpointing, i.e. when you re-create the
reader you don't actually need to carry over any information from an
old CheckpointMark - the necessary state (which records should be
delivered) is maintained on the streaming system side.
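Eugene's split between the two capabilities can be sketched with two minimal marks (both hypothetical, not Beam or connector code): one carries a restorable position as plain data, the other only supports acking through an in-memory handle.

```java
import java.io.*;

// Two hypothetical checkpoint marks illustrating the two capabilities.
public class TwoModesDemo {
    // Mode 1 (Kafka-like): the position is plain data, fully serializable;
    // there is no broker-side work to do at finalize time.
    static class OffsetMark implements Serializable {
        final long offset;
        OffsetMark(long offset) { this.offset = offset; }
    }

    // Mode 2 (RabbitMQ/Pubsub-like): nothing restorable to persist; the
    // useful work happens in finalizeCheckpoint on the same in-memory
    // instance the reader produced.
    static class AckMark {
        final Runnable ack;          // in-memory only, not serializable
        boolean finalized = false;
        AckMark(Runnable ack) { this.ack = ack; }
        void finalizeCheckpoint() { ack.run(); finalized = true; }
    }

    public static void main(String[] args) {
        OffsetMark offsets = new OffsetMark(1234L);
        System.out.println(offsets.offset);     // restorable position: 1234

        final boolean[] acked = {false};
        AckMark ackMark = new AckMark(() -> acked[0] = true);
        ackMark.finalizeCheckpoint();           // ack on the in-memory instance
        System.out.println(acked[0]);           // true
    }
}
```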
These two are lumped together into one API simply because that was
the best design option we came up with (not for lack of trying, but
suggestions very much welcome - AFAIK nobody is happy with it).
RabbitMQ is under #2 - it can't do serializable checkpoint marks,
but it can do acks. So you can simply ignore the non-serializability.
On Thu, Nov 7, 2019 at 12:07 PM Daniel Robert <daniel.rob...@acm.org
<mailto:daniel.rob...@acm.org>> wrote:
(Background: I recently upgraded RabbitMqIO from the 4.x to the 5.x
library. As part of this I switched to a pull-based API rather than
the previously-used push-based one. This has caused some nebulous
problems, so I put up a correction PR that I think needs some eyes
fairly quickly, as I'd consider master to be broken for rabbitmq
right now. The PR keeps the upgrade but reverts to the same
push-based implementation as in 4.x:
https://github.com/apache/beam/pull/9977 )
Regardless, in trying to get the pull-based API to work, I'm finding
the interactions between rabbitmq and beam with CheckpointMark to be
fundamentally impossible to implement, so I'm hoping for some input
here.
CheckpointMark itself must be Serializable; presumably this means it
gets shuffled around between nodes. However 'Channel', the tunnel
through which it communicates with Rabbit to ack messages and
finalize the checkpoint, is non-Serializable. Like most other
CheckpointMark implementations, Channel is 'transient'. When a new
CheckpointMark is instantiated, it's given a Channel. If an existing
one is supplied to the Reader's constructor (part of the
'startReader()' interface), the channel is overwritten.
*However*, Rabbit does not support 'ack'ing messages on a channel
other than the one that consumed them in the first place. Attempting
to do so results in a '406 (PRECONDITION-FAILED) - unknown delivery
tag'. (See
https://www.grzegorowski.com/rabbitmq-406-channel-closed-precondition-failed
).
Truthfully, I don't really understand how the current implementation
is working; it seems like a happy accident. But I'm curious if
someone could help me figure out how to bridge the
re-usable/serializable CheckpointMark requirement in Beam with this
limitation of Rabbit.
Thanks,
-Daniel Robert