Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2454#discussion_r157134967
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -442,10 +435,13 @@ private boolean isEmitTuple(List<Object> tuple) {
             return tuple != null || kafkaSpoutConfig.isEmitNullTuples();
         }
     
    -    private void commitOffsetsForAckedTuples() {
    -        // Find offsets that are ready to be committed for every topic partition
    +    private void commitOffsetsForAckedTuples(Set<TopicPartition> assignedPartitions) {
    +        // Find offsets that are ready to be committed for every assigned topic partition
    +        final Map<TopicPartition, OffsetManager> assignedOffsetManagers = offsetManagers.entrySet().stream()
    +            .filter(entry -> assignedPartitions.contains(entry.getKey()))
    +            .collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue()));
    --- End diff --
    
    Will fix


---
