GitHub user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2454#discussion_r157134967

    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -442,10 +435,13 @@ private boolean isEmitTuple(List<Object> tuple) {
             return tuple != null || kafkaSpoutConfig.isEmitNullTuples();
         }

    -    private void commitOffsetsForAckedTuples() {
    -        // Find offsets that are ready to be committed for every topic partition
    +    private void commitOffsetsForAckedTuples(Set<TopicPartition> assignedPartitions) {
    +        // Find offsets that are ready to be committed for every assigned topic partition
    +        final Map<TopicPartition, OffsetManager> assignedOffsetManagers = offsetManagers.entrySet().stream()
    +            .filter(entry -> assignedPartitions.contains(entry.getKey()))
    +            .collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue()));
    --- End diff --

    Will fix
---