Github user HeartSaVioR commented on a diff in the pull request:
https://github.com/apache/storm/pull/2453#discussion_r156013674
--- Diff:
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
---
@@ -625,4 +619,15 @@ public String toString() {
private String getTopicsString() {
return kafkaSpoutConfig.getSubscription().getTopicsString();
}
+
+ private static class PollingInfo {
+ private final Set<TopicPartition> pollablePartitions;
+ private final Map<TopicPartition, Long>
pollableEarliestRetriableOffsets;
+
+ public PollingInfo(Set<TopicPartition> pollablePartitions,
Map<TopicPartition, Long> earliestRetriableOffsets) {
+ this.pollablePartitions = pollablePartitions;
+ earliestRetriableOffsets.keySet().removeIf(tp ->
!pollablePartitions.contains(tp));
--- End diff --
minor: might be better to copy the map first, and remove the elements to
avoid side-effect to the parameter.
---