ableegoldman commented on a change in pull request #8994:
URL: https://github.com/apache/kafka/pull/8994#discussion_r451884449



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -95,7 +96,7 @@
      *          |      | Assigned (3)| <----+
      *          |      +-----+-------+      |
      *          |            |              |
-     *          |            |              |
+     *          |            |--------------+

Review comment:
       What's up with this?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -193,6 +197,28 @@ private void closeAndRevive(final Map<Task, 
Collection<TopicPartition>> taskWith
                 log.error("Error suspending corrupted task {} ", task.id(), 
swallow);
             }
             task.closeDirty();
+            // Pause so we won't poll any more records for this task until it 
has been re-initialized
+            // Note, closeDirty already clears the partitiongroup for the task.
+            mainConsumer().pause(task.inputPartitions());
+            final Map<TopicPartition, OffsetAndMetadata> committed = 
mainConsumer().committed(task.inputPartitions());
+            for (final TopicPartition topicPartition : task.inputPartitions()) 
{
+                final OffsetAndMetadata offsetAndMetadata = 
committed.get(topicPartition);
+                if (offsetAndMetadata == null) {
+                    final OffsetResetStrategy strategy = 
resetStrategy.apply(topicPartition);
+                    switch (strategy) {
+                        case EARLIEST:
+                            
mainConsumer().seekToBeginning(Collections.singleton(topicPartition));
+                            break;
+                        case LATEST:
+                            
mainConsumer().seekToBeginning(Collections.singleton(topicPartition));

Review comment:
       Should this be `seekToEnd` ?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to