guozhangwang commented on a change in pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#discussion_r441026485



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -502,56 +494,24 @@ public void closeAndRecycleState() {
         log.info("Closed clean and recycled state");
     }
 
-    private void maybeScheduleCheckpoint() {
-        switch (state()) {
-            case RESTORING:
-            case SUSPENDED:
-                this.checkpoint = checkpointableOffsets();
-
-                break;
-
-            case RUNNING:
-                if (!eosEnabled) {
-                    this.checkpoint = checkpointableOffsets();
-                }
-
-                break;
-
-            case CREATED:
-            case CLOSED:
-                throw new IllegalStateException("Illegal state " + state() + " while scheduling checkpoint for active task " + id);
-
-            default:
-                throw new IllegalStateException("Unknown state " + state() + " while scheduling checkpoint for active task " + id);
-        }
-    }
-
-    private void writeCheckpointIfNeed() {
+    private void maybeWriteCheckpoint() {
         if (commitNeeded) {
+            log.error("Tried to write a checkpoint with pending uncommitted data, should complete the commit first.");
             throw new IllegalStateException("A checkpoint should only be written if no commit is needed.");
         }
-        if (checkpoint != null) {
-            stateMgr.checkpoint(checkpoint);
-            checkpoint = null;
-        }
+        stateMgr.checkpoint(checkpointableOffsets());
     }
 
     /**
-     * <pre>
-     * the following order must be followed:
-     *  1. checkpoint the state manager -- even if we crash before this step, EOS is still guaranteed
-     *  2. then if we are closing on EOS and dirty, wipe out the state store directory
-     *  3. finally release the state manager lock
-     * </pre>
+     * You must commit a task and checkpoint the state manager before closing as this will release the state dir lock
      */
     private void close(final boolean clean) {
-        if (clean) {
-            executeAndMaybeSwallow(true, this::writeCheckpointIfNeed, "state manager checkpoint", log);
+        if (clean && commitNeeded) {
+            log.debug("Tried to close clean but there was an active scheduled checkpoint, this means we failed to"

Review comment:
       nit: `was an active scheduled checkpoint` -> `was pending uncommitted data`.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -714,13 +696,20 @@ void shutdown(final boolean clean) {
             }
         }
 
-        if (clean && !consumedOffsetsAndMetadataPerTask.isEmpty()) {
-            commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
+        try {
+            if (clean && !consumedOffsetsAndMetadataPerTask.isEmpty()) {
+                commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
+            }
+            for (final TaskId taskId : consumedOffsetsAndMetadataPerTask.keySet()) {
+                final Task task = tasks.get(taskId);
+                task.postCommit();
+            }
+        } catch (final RuntimeException e) {
+            firstException.compareAndSet(null, e);

Review comment:
       Sounds good, in that case the nested try-catch would be necessary.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org