Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/4928#discussion_r148226195 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java --- @@ -242,10 +243,25 @@ public void addDiscoveredPartitions(List<KafkaTopicPartition> newPartitions) thr * @param commitCallback The callback that the user should trigger when a commit request completes or fails. * @throws Exception This method forwards exceptions. */ - public abstract void commitInternalOffsetsToKafka( + public final void commitInternalOffsetsToKafka( + Map<KafkaTopicPartition, Long> offsets, + @Nonnull KafkaCommitCallback commitCallback) throws Exception { + // Ignore sentinels. They might appear here if snapshot has started before actual offsets values + // replaced sentinels + doCommitInternalOffsetsToKafka(filerOutSentinels(offsets), commitCallback); + } + + protected abstract void doCommitInternalOffsetsToKafka( Map<KafkaTopicPartition, Long> offsets, @Nonnull KafkaCommitCallback commitCallback) throws Exception; + private Map<KafkaTopicPartition, Long> filerOutSentinels(Map<KafkaTopicPartition, Long> offsets) { --- End diff -- typo: the method name should be `filterOutSentinels`; `filerOutSentinels` is missing a `t`.
---