mjsax commented on a change in pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#discussion_r441047360



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -474,26 +468,24 @@ public void update(final Set<TopicPartition> 
topicPartitions, final Map<String,
 
     @Override
     public void closeAndRecycleState() {
-        suspend();
-        prepareCommit();
-        writeCheckpointIfNeed();
-
         switch (state()) {
-            case CREATED:
             case SUSPENDED:
                 stateMgr.recycle();
                 recordCollector.close();
 
                 break;
 
-            case RESTORING: // we should have transitioned to `SUSPENDED` 
already
-            case RUNNING: // we should have transitioned to `SUSPENDED` already
+            case CREATED:
+            case RESTORING:
+            case RUNNING:
             case CLOSED:
                 throw new IllegalStateException("Illegal state " + state() + " 
while recycling active task " + id);
             default:
                 throw new IllegalStateException("Unknown state " + state() + " 
while recycling active task " + id);
         }
 
+        // we cannot `clear()` the `PartitionGroup` in `suspend()` already, 
but only after committing,
+        // because otherwise we loose the partition-time information
         partitionGroup.clear();

Review comment:
       As we always suspend a task before closing (even for unclean closing), I 
think we can actually remove this call?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to