Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2453#discussion_r156013674
  
    --- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 ---
    @@ -625,4 +619,15 @@ public String toString() {
         private String getTopicsString() {
             return kafkaSpoutConfig.getSubscription().getTopicsString();
         }
    +    
    +    private static class PollingInfo {
    +        private final Set<TopicPartition> pollablePartitions;
    +        private final Map<TopicPartition, Long> 
pollableEarliestRetriableOffsets;
    +        
    +        public PollingInfo(Set<TopicPartition> pollablePartitions, 
Map<TopicPartition, Long> earliestRetriableOffsets) {
    +            this.pollablePartitions = pollablePartitions;
    +            earliestRetriableOffsets.keySet().removeIf(tp -> 
!pollablePartitions.contains(tp));
    --- End diff --
    
    minor: might be better to copy the map first, and remove the elements to 
avoid side-effect to the parameter.


---

Reply via email to