[ https://issues.apache.org/jira/browse/FLINK-4723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15547536#comment-15547536 ]
ASF GitHub Bot commented on FLINK-4723:
---------------------------------------

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2580

Thanks for the review @StephanEwen.

Concerning changing the contract for `commitSpecificOffsetsToKafka`: makes sense, I don't really like excessive copying either. With proper tests on both 0.8 and 0.9, I think it's reasonable to change this. I'll update this, and probably also rename `commitSpecificOffsetsToKafka` to reflect the contract behaviour.

Thanks for the tip on test stability, I'll do that ;)

> Unify behaviour of committed offsets to Kafka / ZK for Kafka 0.8 and 0.9 consumer
> ---------------------------------------------------------------------------------
>
> Key: FLINK-4723
> URL: https://issues.apache.org/jira/browse/FLINK-4723
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.2.0, 1.1.3
>
>
> The proper "behaviour" of the offsets committed back to Kafka / ZK should be "the next offset that consumers should read (in Kafka terms, the 'position')".
> This is already fixed for the 0.9 consumer by FLINK-4618, which increments by 1 the offsets that the 0.9 consumer commits back to Kafka, so that the internal {{KafkaConsumer}} picks up the correct start position when committed offsets are present. This fix was required because, with the Kafka 0.9 APIs, the start position is implicitly determined from the committed offsets.
> However, since the 0.8 consumer handles offset committing and the start position using Flink's own {{ZookeeperOffsetHandler}} rather than Kafka's high-level APIs, the 0.8 consumer did not require a fix.
> I propose to still unify the behaviour of committed offsets across 0.8 and 0.9 under the definition above.
> Otherwise, if users first use the 0.8 consumer to read data, leaving Flink-committed offsets in ZK, and then use a high-level 0.8 Kafka consumer to read the same topic in a non-Flink application, the first record that application reads will be a duplicate of the last record Flink already processed (because, as described above, Kafka high-level consumers expect the committed offset to be "the next record to process", not "the last processed record").
> This requires that the offsets the 0.8 consumer commits to ZK also be incremented by 1, and that Flink's internal offsets be initialized accordingly from the acquired ZK offsets.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
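A minimal Java sketch of the offset convention proposed in the issue, assuming Flink internally tracks the offset of the last processed record. The class `OffsetConvention`, its methods, and the use of -1 to mean "nothing processed yet" are hypothetical illustrations, not Flink's actual `ZookeeperOffsetHandler` or `commitSpecificOffsetsToKafka` code. Committing adds 1 so that the stored value is the Kafka "position", and starting from a committed offset subtracts 1 so that the first record read is exactly the committed one.

```java
// Hypothetical sketch of the FLINK-4723 offset convention; not Flink's actual code.
public final class OffsetConvention {

    private OffsetConvention() {}

    /**
     * Converts Flink's internal "last processed offset" into the value to commit
     * to Kafka / ZK, which high-level consumers read as the next record to fetch.
     * Assumption of this sketch: -1 means no record has been processed yet.
     */
    public static long toCommittedOffset(long lastProcessedOffset) {
        if (lastProcessedOffset < -1) {
            throw new IllegalArgumentException("invalid last processed offset: " + lastProcessedOffset);
        }
        return lastProcessedOffset + 1;
    }

    /**
     * Converts an offset found in Kafka / ZK (the next record to read) back into
     * Flink's internal "last processed offset" when initializing the consumer.
     */
    public static long toInternalOffset(long committedOffset) {
        if (committedOffset < 0) {
            throw new IllegalArgumentException("invalid committed offset: " + committedOffset);
        }
        return committedOffset - 1;
    }

    public static void main(String[] args) {
        long lastProcessed = 41L; // Flink has processed records up to offset 41
        long committed = toCommittedOffset(lastProcessed);
        System.out.println("offset committed to ZK: " + committed); // prints 42

        long restored = toInternalOffset(committed);
        // Reading resumes at the record after the last processed one.
        System.out.println("first record read after restart: " + (restored + 1)); // prints 42
    }
}
```

Under this convention, a job that has processed records up to offset 41 commits 42, so a non-Flink high-level 0.8 consumer starting from the ZK offset begins at record 42 instead of re-reading record 41.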