I believe I've nailed down a situation that happens in practice that
causes Beam and Rabbit to be incompatible. It seems that runners can and
do make assumptions about the serializability (via Coder) of a
CheckpointMark.
To start, these are the semantics of RabbitMQ:
- the client establishes a connection to the server
- client opens a channel on the connection
- messages are either pulled by the client or pushed to it by the server
along this channel
- when messages are done processing, they are acknowledged *client-side*
and must be acknowledged on the *same channel* that originally received
the message.
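As a rough illustration of that last point, here is a minimal Java sketch. FakeChannel and ChannelScopeDemo are hypothetical stand-ins written for this example, not the RabbitMQ client API; they only model the rule that a delivery tag can be acknowledged solely on the channel that issued it.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for a broker channel; NOT the RabbitMQ client API.
class FakeChannel {
  private long nextTag = 1;
  private final Set<Long> unacked = new HashSet<>();

  // Deliver a message and return its delivery tag, which is scoped to this channel.
  long deliver() {
    long tag = nextTag++;
    unacked.add(tag);
    return tag;
  }

  // An ack succeeds only if this channel issued the tag and it is still unacked.
  void ack(long tag) {
    if (!unacked.remove(tag)) {
      throw new IllegalStateException(
          "406 (PRECONDITION-FAILED) - unknown delivery tag " + tag);
    }
  }
}

class ChannelScopeDemo {
  // Returns true if acking the tag on the given channel is rejected.
  static boolean ackRejected(FakeChannel channel, long tag) {
    try {
      channel.ack(tag);
      return false;
    } catch (IllegalStateException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    FakeChannel original = new FakeChannel();
    FakeChannel other = new FakeChannel();
    long tag = original.deliver();
    System.out.println("rejected on other channel: " + ackRejected(other, tag));
    System.out.println("rejected on original channel: " + ackRejected(original, tag));
  }
}
```

In this model the ack on the second channel is rejected while the ack on the original channel goes through, which is the behavior the rest of this message depends on.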
Because a channel (or any open connection) is non-serializable, a
CheckpointMark that has been serialized can never be used to acknowledge
these messages and correctly 'finalize' the checkpoint. As previously
discussed in this thread, it also implies that a rabbit Reader cannot
accept an existing CheckpointMark at all; the Reader and the
CheckpointMark must share the same connection to the rabbit server
("channel").
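To show why a serialized mark cannot ack, here is a small sketch that assumes the Coder falls back to plain Java serialization. SketchMark and TransientDemo are hypothetical names for this example, and the Object field merely stands in for a live channel.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical checkpoint mark: the tags are serializable, the channel is not.
class SketchMark implements Serializable {
  private static final long serialVersionUID = 1L;

  final List<Long> deliveryTags = new ArrayList<>();
  transient Object channel; // stands in for a live, non-serializable channel

  SketchMark(Object channel) {
    this.channel = channel;
  }
}

class TransientDemo {
  // Serialize and deserialize the mark, as a runner might when cloning it.
  static SketchMark roundTrip(SketchMark mark) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(mark);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (SketchMark) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    SketchMark mark = new SketchMark(new Object());
    mark.deliveryTags.add(7L);
    SketchMark copy = roundTrip(mark);
    System.out.println("tags survive: " + copy.deliveryTags);
    System.out.println("channel survives: " + (copy.channel != null));
  }
}
```

The delivery tags come back intact, but the transient channel is null on the copy, so the copy has nothing on which to ack.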
Next, I've found how DirectRunner (and presumably others) can attempt to
serialize a CheckpointMark that has not been finalized. In
https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L150,
the DirectRunner applies a probability and, if it hits, sets the
current reader to 'null' but retains the existing CheckpointMark, which
it then attempts to pass to a new reader via a Coder.
This leaves the shard, the runner, and the reader with differing views of
the world. In UnboundedReadEvaluatorFactory's processElement function, a
call to getReader(shard) (
https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L132
) clones the shard's checkpoint mark and passes that to the new reader.
The reader ignores it and creates its own; even if the reader accepted
it, it would be accepting a serialized CheckpointMark, which wouldn't work.
Later, the runner calls finishRead (
https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L246
). The shard's CheckpointMark (unserialized, so it should still be
valid) is finalized. The reader's CheckpointMark (which may be a
different instance) becomes the return value. The calling code refers to
it as "finishedCheckpoint", which is misleading at best and problematic
at worst, as *this* checkpoint has not been finalized.
So, tl;dr: I cannot find any means of maintaining a persistent
connection to the server for finalizing checkpoints that is safe across
runners. If there's a guarantee all of the shards are on the same JVM
instance, I could rely on global, static collections/instances as a
workaround, but if other runners might serialize this across the wire,
I'm stumped. The only workable solution I can think of right now is to
proactively acknowledge messages as they are received and effectively
no-op in finalizeCheckpoint. This is semantically very different and
can lead to dropped messages if a pipeline doesn't finish processing a
given message.
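For reference, the static-collection workaround mentioned above might look roughly like the sketch below. LiveChannel, ChannelRegistry, and RegistryMark are hypothetical names, and the whole approach assumes the mark is finalized in the same JVM that opened the channel.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical live resource standing in for an open broker channel.
class LiveChannel {
  int acked = 0;

  void ack(long tag) {
    acked++;
  }
}

// JVM-wide registry; entries are invisible to any other JVM.
class ChannelRegistry {
  private static final Map<String, LiveChannel> CHANNELS = new ConcurrentHashMap<>();

  static String register(LiveChannel channel) {
    String id = UUID.randomUUID().toString();
    CHANNELS.put(id, channel);
    return id;
  }

  static LiveChannel lookup(String id) {
    return CHANNELS.get(id);
  }

  static void unregister(String id) {
    CHANNELS.remove(id);
  }
}

// The mark carries only a serializable id and resolves the channel lazily.
class RegistryMark implements Serializable {
  private static final long serialVersionUID = 1L;

  final String channelId;
  final long lastTag;

  RegistryMark(String channelId, long lastTag) {
    this.channelId = channelId;
    this.lastTag = lastTag;
  }

  // Returns false when the channel is unknown here, e.g. after crossing to another JVM.
  boolean finalizeMark() {
    LiveChannel channel = ChannelRegistry.lookup(channelId);
    if (channel == null) {
      return false;
    }
    channel.ack(lastTag);
    return true;
  }

  // Java-serialization round trip, simulating a runner cloning the mark.
  RegistryMark roundTrip() {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(this);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (RegistryMark) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }
}
```

A cloned mark can still reach its channel through the registry, but only while the registry entry exists in the current JVM; once the mark crosses the wire, the lookup returns nothing and the ack cannot happen.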
Any help would be much appreciated.
Thanks,
-Danny
On 11/7/19 10:27 PM, Eugene Kirpichov wrote:
Hi Daniel,
This is probably insufficiently well documented. The CheckpointMark is
used for two purposes:
1) To persistently store some notion of how much of the stream has
been consumed, so that if something fails we can tell the underlying
streaming system where to start reading when we re-create the reader.
This is why CheckpointMark is Serializable. E.g. this makes sense for
Kafka.
2) To do acks - to let the underlying streaming system know that the
Beam pipeline will never need data up to this CheckpointMark. Acking
does not require serializability - runners call ack() on the same
in-memory instance of CheckpointMark that was produced by the reader.
E.g. this makes sense for RabbitMQ or Pubsub.
In practice, these two capabilities tend to be mutually exclusive:
some streaming systems can provide a serializable CheckpointMark, some
can do acks, some can do neither - but very few (or none) can do both,
and it's debatable whether it even makes sense for a system to provide
both capabilities: usually acking is an implicit form of
streaming-system-side checkpointing, i.e. when you re-create the
reader you don't actually need to carry over any information from an
old CheckpointMark - the necessary state (which records should be
delivered) is maintained on the streaming system side.
These two are lumped together into one API simply because that was the
best design option we came up with (not for lack of trying, but
suggestions very much welcome - AFAIK nobody is happy with it).
RabbitMQ is under #2 - it can't do serializable checkpoint marks, but
it can do acks. So you can simply ignore the non-serializability.
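A minimal sketch of what "ignore the non-serializability" could look like, using hypothetical AckChannel, AckMark, and AckReader classes (none of these are Beam or RabbitMQ types): the reader discards any restored mark, and acking happens on the in-memory mark that shares the reader's channel.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical channel that records which tags were acked on it.
class AckChannel {
  final List<Long> acked = new ArrayList<>();

  void ack(long tag) {
    acked.add(tag);
  }
}

// Hypothetical mark: holds a direct reference to the reader's channel.
class AckMark {
  private final AckChannel channel;
  private final List<Long> pending = new ArrayList<>();

  AckMark(AckChannel channel) {
    this.channel = channel;
  }

  void add(long tag) {
    pending.add(tag);
  }

  // Ack everything covered by this mark on the channel that received it.
  void finalizeCheckpoint() {
    for (long tag : pending) {
      channel.ack(tag);
    }
    pending.clear();
  }
}

class AckReader {
  private final AckChannel channel = new AckChannel();
  private AckMark current;

  // A restored mark is deliberately ignored; the broker redelivers unacked messages.
  AckReader(AckMark restoredOrNull) {
    this.current = new AckMark(channel);
  }

  void receive(long tag) {
    current.add(tag);
  }

  // Hand the current mark to the runner and start a fresh one on the same channel.
  AckMark checkpoint() {
    AckMark done = current;
    current = new AckMark(channel);
    return done;
  }

  AckChannel channel() {
    return channel;
  }
}
```

This only works if the runner calls finalizeCheckpoint on the very instance returned by checkpoint(), which is the guarantee described in point 2 above.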
On Thu, Nov 7, 2019 at 12:07 PM Daniel Robert <daniel.rob...@acm.org> wrote:
(Background: I recently upgraded RabbitMqIO from the 4.x to 5.x library.
As part of this I switched to a pull-based API rather than the
previously-used push-based one. This has caused some nebulous problems, so
I put up a correction PR that I think needs some eyes fairly quickly, as
I'd consider master to be broken for rabbitmq right now. The PR keeps
the upgrade but reverts to the same push-based implementation as in 4.x:
https://github.com/apache/beam/pull/9977 )
Regardless, in trying to get the pull-based API to work, I'm finding the
interactions between rabbitmq and beam with CheckpointMark to be
fundamentally impossible to implement, so I'm hoping for some input here.
CheckpointMark itself must be Serializable; presumably this means it gets
shuffled around between nodes. However, 'Channel', the tunnel through
which it communicates with Rabbit to ack messages and finalize the
checkpoint, is non-Serializable. As in most other CheckpointMark
implementations, the Channel field is 'transient'. When a new
CheckpointMark is instantiated, it's given a Channel. If an existing one
is supplied to the Reader's constructor (part of the 'startReader()'
interface), the channel is overwritten.
*However*, Rabbit does not support 'ack'ing messages on a channel other
than the one that consumed them in the first place. Attempting to do so
results in a '406 (PRECONDITION-FAILED) - unknown delivery tag'. (See
https://www.grzegorowski.com/rabbitmq-406-channel-closed-precondition-failed
).
Truthfully, I don't really understand how the current implementation is
working; it seems like a happy accident. But I'm curious if someone
could help me debug this and work out how to bridge the
re-usable/serializable CheckpointMark requirement in Beam with this
limitation of Rabbit.
Thanks,
-Daniel Robert