Github user hmcl commented on a diff in the pull request:
https://github.com/apache/storm/pull/2454#discussion_r157104947
--- Diff:
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
---
@@ -442,10 +435,13 @@ private boolean isEmitTuple(List<Object> tuple) {
return tuple != null || kafkaSpoutConfig.isEmitNullTuples();
}
- private void commitOffsetsForAckedTuples() {
- // Find offsets that are ready to be committed for every topic
partition
+ private void commitOffsetsForAckedTuples(Set<TopicPartition>
assignedPartitions) {
+ // Find offsets that are ready to be committed for every assigned
topic partition
+ final Map<TopicPartition, OffsetManager> assignedOffsetManagers =
offsetManagers.entrySet().stream()
+ .filter(entry -> assignedPartitions.contains(entry.getKey()))
+ .collect(Collectors.toMap(entry -> entry.getKey(), entry ->
entry.getValue()));
final Map<TopicPartition, OffsetAndMetadata> nextCommitOffsets =
new HashMap<>();
--- End diff --
An empty line before this variable declaration (`nextCommitOffsets`) would make the code easier to read.
---