Elias Levy created FLINK-4502:
---------------------------------
Summary: Cassandra connector documentation has misleading
consistency guarantees
Key: FLINK-4502
URL: https://issues.apache.org/jira/browse/FLINK-4502
Project: Flink
Issue Type: Bug
Components: Documentation
Affects Versions: 1.1.0
Reporter: Elias Levy
The Cassandra connector documentation states that "enableWriteAheadLog() is an
optional method, that allows exactly-once processing for non-deterministic
algorithms." This claim appears to be false.
From what I gather, the write ahead log feature of the connector works as
follows:
- The sink is replaced with a stateful operator that writes incoming messages
to the state backend, grouped by the checkpoint they belong to.
- When the operator is notified that a Flink checkpoint has been completed, it
does the following for each pending checkpoint up to and including the completed one:
* reads its messages from the state backend
* writes them to Cassandra
* records that it has committed them to Cassandra for the specific checkpoint
and operator instance
* and erases them from the state backend.
This process attempts to avoid resubmitting queries to Cassandra that would
otherwise occur when recovering a job from a checkpoint and having messages
replayed.
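To make the flow above concrete, here is a minimal sketch of my understanding of it. It is a toy model, not the connector's code: the class WriteAheadSketch, its fields, and its method names are all hypothetical, a TreeMap stands in for the state backend, and a plain list stands in for Cassandra.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical toy model of the write-ahead flow; not the actual Flink classes.
class WriteAheadSketch {
    // Stand-in for the state backend: buffered messages keyed by checkpoint id.
    private final TreeMap<Long, List<String>> pending = new TreeMap<>();
    // Messages received since the last checkpoint.
    private List<String> current = new ArrayList<>();
    // Stand-in for Cassandra.
    final List<String> external = new ArrayList<>();
    // Stand-in for the committer's record of the last committed checkpoint.
    long lastCommitted = -1;

    // Incoming messages are buffered instead of being written out directly.
    void processElement(String value) {
        current.add(value);
    }

    // On a checkpoint, file the buffered messages under that checkpoint id.
    void snapshotState(long checkpointId) {
        pending.put(checkpointId, current);
        current = new ArrayList<>();
    }

    // Once the checkpoint is reported complete, flush it and all older ones.
    void notifyCheckpointComplete(long checkpointId) {
        // headMap(id, true) is a live view of all entries with key <= id.
        Map<Long, List<String>> ready = pending.headMap(checkpointId, true);
        for (Map.Entry<Long, List<String>> e : ready.entrySet()) {
            external.addAll(e.getValue()); // write the messages out
            lastCommitted = e.getKey();    // record the commit
        }
        ready.clear(); // erase the flushed checkpoints from state
    }

    int pendingCheckpoints() {
        return pending.size();
    }
}
```

Note that in this model the write, the commit record, and the state cleanup are three separate steps, which is what the rest of this report is about.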
Alas, this does not guarantee exactly-once semantics at the sink. The writes
to Cassandra that occur when the operator is notified that a checkpoint has
completed are not atomic, and they are potentially non-idempotent. If the job
dies while writing to Cassandra, or before committing the checkpoint via the
committer, queries will be replayed when the job recovers. Thus the
documentation appears to be incorrect in stating that this provides exactly-once
semantics.
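A small simulation may help illustrate the failure mode. This is only a sketch under assumptions: ReplayAfterCrashSketch and its members are hypothetical names, a long field stands in for a Cassandra counter column (a non-idempotent update), and an exception stands in for the job dying partway through the writes.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical toy model: a crash between the writes and the commit.
class ReplayAfterCrashSketch {
    long counter = 0;          // stand-in for a Cassandra counter column
    boolean committed = false; // stand-in for the committer's record

    // Applies the increments one at a time. failAfter simulates the job dying
    // after that many writes; a negative value means no failure.
    void writeCheckpoint(List<Integer> increments, int failAfter) {
        if (committed) {
            return; // already committed, nothing to replay
        }
        int written = 0;
        for (int inc : increments) {
            if (written == failAfter) {
                throw new IllegalStateException("simulated crash");
            }
            counter += inc; // non-idempotent write
            written++;
        }
        committed = true; // only reached if every write succeeded
    }

    // One checkpoint holding three increments of 1, with a crash after two.
    static long run() {
        ReplayAfterCrashSketch sink = new ReplayAfterCrashSketch();
        List<Integer> increments = Arrays.asList(1, 1, 1);
        try {
            sink.writeCheckpoint(increments, 2);
        } catch (IllegalStateException e) {
            // The job restarts; the checkpoint was never marked committed.
        }
        sink.writeCheckpoint(increments, -1); // full replay after recovery
        return sink.counter;
    }
}
```

With exactly-once semantics the counter would end at 3. In this model run() returns 5, because the two increments applied before the crash are applied again on replay.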
There also seems to be an issue in GenericWriteAheadSink's
notifyOfCompletedCheckpoint, which may result in incorrect output. If
sendValues returns false because a write failed, instead of bailing, it simply
moves on to the next checkpoint to commit if there is one, keeping the previous
one around to try again later. That alone could result in newer data being
overwritten with older data when the previous checkpoint is retried. In
practice, though, because CassandraCommitter implements isCheckpointCommitted as
checkpointID <= this.lastCommittedCheckpointID, when the sink goes back to
retry the uncommitted older checkpoint it will consider it already committed,
even though some of its data may not have been written out, and the data will
be discarded.
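The sequence I have in mind can be sketched as follows. This is a toy model of the behavior as I read it, not the actual Flink source: the class SkippedCheckpointSketch, the pending map, and the failingCheckpoint switch are hypothetical. Only the comparison inside isCheckpointCommitted is taken from CassandraCommitter as quoted above.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical toy model of a failed older checkpoint being skipped, then dropped.
class SkippedCheckpointSketch {
    final TreeMap<Long, List<String>> pending = new TreeMap<>(); // state backend stand-in
    final List<String> written = new ArrayList<>();              // Cassandra stand-in
    long lastCommittedCheckpointID = -1;
    long failingCheckpoint = -1; // sendValues fails once for this checkpoint id

    // Same comparison as quoted from CassandraCommitter above.
    boolean isCheckpointCommitted(long checkpointID) {
        return checkpointID <= this.lastCommittedCheckpointID;
    }

    // Returns false to simulate a failed write; nothing is written in that case.
    boolean sendValues(long checkpointID, List<String> values) {
        if (checkpointID == failingCheckpoint) {
            failingCheckpoint = -1; // fail only on the first attempt
            return false;
        }
        written.addAll(values);
        return true;
    }

    void notifyOfCompletedCheckpoint(long completedID) {
        Iterator<Map.Entry<Long, List<String>>> it =
                pending.headMap(completedID, true).entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, List<String>> e = it.next();
            long id = e.getKey();
            if (isCheckpointCommitted(id)) {
                it.remove(); // considered already written, so its data is dropped
            } else if (sendValues(id, e.getValue())) {
                lastCommittedCheckpointID = id;
                it.remove();
            }
            // On failure the entry is kept and the loop moves on to the next one.
        }
    }
}
```

In this model, if the write for checkpoint 1 fails and the write for checkpoint 2 succeeds, lastCommittedCheckpointID becomes 2. On the next notification checkpoint 1 passes the isCheckpointCommitted test and is removed without its data ever reaching the external store.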