Github user hmcl commented on a diff in the pull request:
https://github.com/apache/storm/pull/2453#discussion_r156499862
--- Diff:
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
---
@@ -625,4 +620,21 @@ public String toString() {
private String getTopicsString() {
return kafkaSpoutConfig.getSubscription().getTopicsString();
}
+
+ private static class PollablePartitionInfo {
+ private final Set<TopicPartition> pollablePartitions;
+ //The subset of earliest retriable offsets that are on pollable
partitions
+ private final Map<TopicPartition, Long>
pollableEarliestRetriableOffsets;
+
+ public PollablePartitionInfo(Set<TopicPartition>
pollablePartitions, Map<TopicPartition, Long> earliestRetriableOffsets) {
+ this.pollablePartitions = pollablePartitions;
+ this.pollableEarliestRetriableOffsets =
earliestRetriableOffsets.entrySet().stream()
+ .filter(entry ->
pollablePartitions.contains(entry.getKey()))
+ .collect(Collectors.toMap(entry -> entry.getKey(), entry
-> entry.getValue()));
+ }
+
+ public boolean isPollAllowed() {
--- End diff --
why not simply `isEmpty()` ?
I really don't thing that _allowed_ is a very suitable name here because we
are not checking for permissions or anything like that. If it is in the
pollable set it is always allowed.
---