vvcephei commented on a change in pull request #8994:
URL: https://github.com/apache/kafka/pull/8994#discussion_r451938104



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -193,6 +197,28 @@ private void closeAndRevive(final Map<Task, 
Collection<TopicPartition>> taskWith
                 log.error("Error suspending corrupted task {} ", task.id(), 
swallow);
             }
             task.closeDirty();
+            // Pause so we won't poll any more records for this task until it 
has been re-initialized
+            // Note, closeDirty already clears the partitiongroup for the task.
+            mainConsumer().pause(task.inputPartitions());
+            final Map<TopicPartition, OffsetAndMetadata> committed = 
mainConsumer().committed(task.inputPartitions());
+            for (final TopicPartition topicPartition : task.inputPartitions()) 
{
+                final OffsetAndMetadata offsetAndMetadata = 
committed.get(topicPartition);
+                if (offsetAndMetadata == null) {
+                    final OffsetResetStrategy strategy = 
resetStrategy.apply(topicPartition);
+                    switch (strategy) {
+                        case EARLIEST:
+                            
mainConsumer().seekToBeginning(Collections.singleton(topicPartition));
+                            break;
+                        case LATEST:
+                            
mainConsumer().seekToBeginning(Collections.singleton(topicPartition));
+                            break;
+                        default:
+                            throw new IllegalArgumentException("Unexpected 
reset strategy: " + strategy);
+                    }
+                } else {
+                    mainConsumer().seek(topicPartition, offsetAndMetadata);
+                }
+            }

Review comment:
       I think we can, too. I'll give it a shot, since it doesn't seem crazy to 
you :) 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to