Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4928#discussion_r148226195
  
    --- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
 ---
    @@ -242,10 +243,25 @@ public void 
addDiscoveredPartitions(List<KafkaTopicPartition> newPartitions) thr
         * @param commitCallback The callback that the user should trigger when 
a commit request completes or fails.
         * @throws Exception This method forwards exceptions.
         */
    -   public abstract void commitInternalOffsetsToKafka(
    +   public final void commitInternalOffsetsToKafka(
    +                   Map<KafkaTopicPartition, Long> offsets,
    +                   @Nonnull KafkaCommitCallback commitCallback) throws 
Exception {
    +           // Ignore sentinels. They might appear here if snapshot has 
started before actual offsets values
    +           // replaced sentinels
    +           doCommitInternalOffsetsToKafka(filerOutSentinels(offsets), 
commitCallback);
    +   }
    +
    +   protected abstract void doCommitInternalOffsetsToKafka(
                        Map<KafkaTopicPartition, Long> offsets,
                        @Nonnull KafkaCommitCallback commitCallback) throws 
Exception;
     
    +   private Map<KafkaTopicPartition, Long> 
filerOutSentinels(Map<KafkaTopicPartition, Long> offsets) {
    --- End diff --
    
    typo: `filterOutSentinels`, missing `t`.


---

Reply via email to