David Morávek created FLINK-23896:
-------------------------------------
Summary: The new KafkaSink drops data if job fails between
checkpoint and transaction commit.
Key: FLINK-23896
URL: https://issues.apache.org/jira/browse/FLINK-23896
Project: Flink
Issue Type: Bug
Components: Connectors / Kafka
Reporter: David Morávek
Fix For: 1.14.0
* Any time a new *transactional producer* is started,
"[KafkaProducer#initTransactions()|https://kafka.apache.org/24/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#initTransactions--]"
needs to be called in order to obtain a new *ProducerId* from the
*TransactionCoordinator* (a Kafka broker component).
** A new *ProducerId* / epoch is handed out every time another producer registers
with the same *TransactionalId*.
** Handing out the new ProducerId *FENCES* all prior ProducerIds and *ABORTS*
all uncommitted transactions associated with them.
* *KafkaCommitter* uses a separate producer that hacks into Kafka internals and
resumes the transaction from the stored epoch and producer id, without registering
a new ProducerId for itself. The committer commits the transaction using the
*ProducerId* stored in the *KafkaCommittable* state.
* If a *new producer is started before the transaction is committed* (this can
happen in some failover scenarios), the ProducerId stored in the state is already
*FENCED* and the commit fails with a *ProducerFencedException*. Because we
currently ignore this exception (we only log a warning), we effectively *DROP*
all uncommitted data from the previous checkpoint (a reproduction sketch follows
this list).
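For reference, here is a minimal, self-contained sketch of the fencing behavior described in the first and last bullets, using only the public kafka-clients API. The broker address, topic name and transactional id are placeholder values, not anything taken from the Flink or Kafka code bases:

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.ProducerFencedException;

public class FencingDemo {

    private static KafkaProducer<String, String> newTransactionalProducer(String transactionalId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("transactional.id", transactionalId);
        return new KafkaProducer<>(props);
    }

    public static void main(String[] args) {
        String transactionalId = "demo-transactional-id"; // placeholder id

        // First producer: registers the TransactionalId and obtains a ProducerId/epoch.
        KafkaProducer<String, String> first = newTransactionalProducer(transactionalId);
        first.initTransactions();
        first.beginTransaction();
        first.send(new ProducerRecord<>("demo-topic", "key", "value")); // uncommitted write

        // Second producer with the SAME TransactionalId: initTransactions() fences the
        // first producer and aborts its pending transaction.
        KafkaProducer<String, String> second = newTransactionalProducer(transactionalId);
        second.initTransactions();

        try {
            // The first producer's ProducerId/epoch is now stale, so the commit
            // is expected to fail.
            first.commitTransaction();
        } catch (ProducerFencedException e) {
            // The uncommitted records of the first producer are gone; swallowing this
            // exception would silently accept the loss.
            System.out.println("First producer was fenced: " + e.getMessage());
        }
    }
}
{code}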
Basically, any job failure that happens between successfully taking a checkpoint
and committing the transactions will trigger this behavior.
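To make the data-loss path concrete, the sketch below illustrates the commit handling described above in simplified, hypothetical form. It is *not* the actual KafkaCommitter source; {{SwallowedFenceExample}} and {{resumedProducer}} are made-up names standing in for the producer that resumed the transaction from the *KafkaCommittable* state:

{code:java}
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Hypothetical illustration of the commit handling described in this issue;
 * NOT the actual KafkaCommitter code, only a sketch of the problematic pattern.
 */
public class SwallowedFenceExample {

    private static final Logger LOG = LoggerFactory.getLogger(SwallowedFenceExample.class);

    /**
     * 'resumedProducer' stands in for the producer that resumed the transaction
     * from the ProducerId/epoch stored in the KafkaCommittable state.
     */
    static void commit(KafkaProducer<byte[], byte[]> resumedProducer) {
        try {
            resumedProducer.commitTransaction();
        } catch (ProducerFencedException e) {
            // Logging and moving on means the records written between the last
            // checkpoint and the failure are never committed, i.e. silently dropped.
            LOG.warn("Transaction was fenced; uncommitted data is lost.", e);
            // A safer reaction would be to fail the commit so the error surfaces
            // instead of being treated as success.
        }
    }
}
{code}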