vvcephei commented on a change in pull request #8994: URL: https://github.com/apache/kafka/pull/8994#discussion_r451938104
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -193,6 +197,28 @@ private void closeAndRevive(final Map<Task, Collection<TopicPartition>> taskWith
                 log.error("Error suspending corrupted task {} ", task.id(), swallow);
             }
             task.closeDirty();
+            // Pause so we won't poll any more records for this task until it has been re-initialized
+            // Note, closeDirty already clears the partitiongroup for the task.
+            mainConsumer().pause(task.inputPartitions());
+            final Map<TopicPartition, OffsetAndMetadata> committed = mainConsumer().committed(task.inputPartitions());
+            for (final TopicPartition topicPartition : task.inputPartitions()) {
+                final OffsetAndMetadata offsetAndMetadata = committed.get(topicPartition);
+                if (offsetAndMetadata == null) {
+                    final OffsetResetStrategy strategy = resetStrategy.apply(topicPartition);
+                    switch (strategy) {
+                        case EARLIEST:
+                            mainConsumer().seekToBeginning(Collections.singleton(topicPartition));
+                            break;
+                        case LATEST:
+                            mainConsumer().seekToBeginning(Collections.singleton(topicPartition));
+                            break;
+                        default:
+                            throw new IllegalArgumentException("Unexpected reset strategy: " + strategy);
+                    }
+                } else {
+                    mainConsumer().seek(topicPartition, offsetAndMetadata);
+                }
+            }

Review comment:
I think we can, too. I'll give it a shot, since it doesn't seem crazy to you :)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org