guozhangwang commented on a change in pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#discussion_r439493219



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -220,12 +215,18 @@ public void handleAssignment(final Map<TaskId, 
Set<TopicPartition>> activeTasks,
             } else {
                 try {
                     task.suspend();
-                    final Map<TopicPartition, OffsetAndMetadata> 
committableOffsets = task.prepareCommit();
-
-                    tasksToClose.add(task);
-                    if (!committableOffsets.isEmpty()) {
-                        consumedOffsetsAndMetadataPerTask.put(task.id(), 
committableOffsets);
+                    if (task.commitNeeded()) {
+                        if (task.isActive()) {
+                            log.error("Active task {} was revoked and should 
have already been committed", task.id());
+                            throw new IllegalStateException("Revoked active 
task was not committed during handleRevocation");
+                        } else {
+                            task.prepareCommit();
+                            task.postCommit();
+                        }
                     }
+                    completeTaskCloseClean(task);

Review comment:
       When closing-clean a standby task, we would checkpoint the file and 
close the state store which would also flush it as well, so I think we do not 
need to call
   
   ```
   task.prepareCommit();
   task.postCommit();
   ```
   
   which is just to flush the stores and write checkpoint files, right?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -440,41 +402,35 @@ boolean tryToCompleteRestoration() {
      * @throws TaskMigratedException if the task producer got fenced (EOS only)
      */
     void handleRevocation(final Collection<TopicPartition> revokedPartitions) {
-        final Set<TopicPartition> remainingPartitions = new 
HashSet<>(revokedPartitions);
+        final Set<TopicPartition> remainingRevokedPartitions = new 
HashSet<>(revokedPartitions);

Review comment:
       nit: Add in the above javadoc that we should only revoke active tasks 
here?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -220,12 +215,18 @@ public void handleAssignment(final Map<TaskId, 
Set<TopicPartition>> activeTasks,
             } else {
                 try {
                     task.suspend();
-                    final Map<TopicPartition, OffsetAndMetadata> 
committableOffsets = task.prepareCommit();
-
-                    tasksToClose.add(task);
-                    if (!committableOffsets.isEmpty()) {
-                        consumedOffsetsAndMetadataPerTask.put(task.id(), 
committableOffsets);
+                    if (task.commitNeeded()) {

Review comment:
       nit: add a comment above `task.suspend()` that for active it should 
always be an no-op?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to