[ https://issues.apache.org/jira/browse/FLINK-38453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18023736#comment-18023736 ]
Hongshun Wang commented on FLINK-38453:
---------------------------------------
Hi [~arvid], I also want to solve this problem. If we support
[FLIP-537|https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment],
we can solve it easily.
> KafkaEnumerator doesn't restore offsets of owned split
> ------------------------------------------------------
>
> Key: FLINK-38453
> URL: https://issues.apache.org/jira/browse/FLINK-38453
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: kafka-5.0.0
> Reporter: Arvid Heise
> Assignee: Arvid Heise
> Priority: Major
>
> KafkaEnumerator's state contains only the TopicPartitions, not their offsets,
> so, contrary to the design intent, it does not hold the full split state.
> There are a couple of issues with that approach. First, it implicitly assumes
> that splits are fully assigned to readers before the first checkpoint.
> Otherwise, on recovery from such a checkpoint, the enumerator invokes the
> offset initializer again, leading to inconsistencies (with LATEST, some
> partitions may be initialized during the first attempt and others during the
> second attempt).
> Through the addSplitsBack callback, the same scenarios can also occur later in
> BATCH mode, where they actually lead to duplicate rows (with EARLIEST or
> SPECIFIC-OFFSETS) or data loss (with LATEST). Finally, KafkaSource cannot be
> used safely as part of a HybridSource, because the offset initializer cannot
> even be recreated on recovery.
> All cases are solved by also retaining the offset in the enumerator state.
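A minimal Python sketch of the checkpoint-recovery scenario described above, under stated assumptions: `Split`, `Enumerator`, `latest` and the `log_end` map are hypothetical stand-ins, not Flink's KafkaSourceEnumerator or OffsetsInitializer API. A split is reduced to a partition plus a starting offset, and the reader-side state of an already assigned split is assumed to be restored separately. The addSplitsBack and EARLIEST cases are not modelled.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, Iterable, Optional

# Hypothetical broker view: current log-end offset per partition.
log_end: Dict[str, int] = {}


def latest(partition: str) -> int:
    """Hypothetical LATEST initializer: the log-end offset at call time."""
    return log_end[partition]


@dataclass
class Split:
    partition: str
    start_offset: int


@dataclass
class Enumerator:
    initializer: Callable[[str], int]
    keep_offsets: bool  # False models the reported bug, True the proposed fix
    pending: Dict[str, Split] = field(default_factory=dict)  # not yet assigned

    def discover(self, partitions: Iterable[str]) -> None:
        for p in partitions:
            self.pending[p] = Split(p, self.initializer(p))

    def assign(self, partition: str) -> Split:
        return self.pending.pop(partition)

    def snapshot(self) -> Dict[str, Optional[int]]:
        if self.keep_offsets:
            return {p: s.start_offset for p, s in self.pending.items()}
        return {p: None for p in self.pending}  # TopicPartitions only

    def restore(self, state: Dict[str, Optional[int]]) -> None:
        for p, offset in state.items():
            # Without a stored offset, the initializer has to run again.
            start = offset if offset is not None else self.initializer(p)
            self.pending[p] = Split(p, start)


def run(keep_offsets: bool) -> Dict[str, int]:
    log_end.clear()
    log_end.update({"t-0": 100, "t-1": 200})
    enum = Enumerator(latest, keep_offsets)
    enum.discover(["t-0", "t-1"])   # attempt 1: LATEST -> 100 and 200
    first = enum.assign("t-0")      # t-0 reaches a reader before the checkpoint
    state = enum.snapshot()         # t-1 is still unassigned at checkpoint time
    log_end.update({"t-0": 150, "t-1": 250})  # producers keep writing; job fails
    recovered = Enumerator(latest, keep_offsets)
    recovered.restore(state)        # attempt 2
    second = recovered.assign("t-1")
    # t-0's start offset is assumed to come back from the reader's own state.
    return {first.partition: first.start_offset,
            second.partition: second.start_offset}


print(run(keep_offsets=False))  # {'t-0': 100, 't-1': 250}
print(run(keep_offsets=True))   # {'t-0': 100, 't-1': 200}
```

When only TopicPartitions are stored, t-1 is re-initialized in the second attempt and skips offsets 200 to 249, while t-0 still starts from the first attempt's position. Retaining the offset in the snapshot makes both partitions start from the same LATEST resolution.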
--
This message was sent by Atlassian Jira
(v8.20.10#820010)