mjsax commented on a change in pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#discussion_r439695107



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
##########
@@ -56,21 +56,21 @@
      *          |            |              |     |
      *          |            v              |     |
      *          |     +------+--------+     |     |
-     *          |     | Suspended (3) | <---+     |    //TODO Suspended(3) could be removed after we've stable on KIP-429
-     *          |     +------+--------+           |
-     *          |            |                    |
-     *          |            v                    |
-     *          |      +-----+-------+            |
-     *          +----> | Closed (4)  | -----------+
+     *          +---->| Suspended (3) | ----+     |    //TODO Suspended(3) could be removed after we've stable on KIP-429
+     *                +------+--------+           |
+     *                       |                    |
+     *                       v                    |
+     *                 +-----+-------+            |
+     *                 | Closed (4)  | -----------+
      *                 +-------------+
      * </pre>
      */
     enum State {
-        CREATED(1, 4),         // 0
-        RESTORING(2, 3, 4),    // 1
-        RUNNING(3),            // 2
-        SUSPENDED(1, 4),       // 3
-        CLOSED(0);             // 4, we allow CLOSED to transit to CREATED to handle corrupted tasks
+        CREATED(1, 3),            // 0
+        RESTORING(2, 3),          // 1
+        RUNNING(3),               // 2
+        SUSPENDED(1, 3, 4),       // 3

Review comment:
       So far, we have not allowed idempotent state transitions in the state
machine itself, but handled them on the caller side. -- It seems inconsistent
to allow `SUSPENDED -> SUSPENDED` but not `CREATED -> CREATED` etc.
   
   I would recommend keeping the current pattern and avoiding the call to
`transitionTo()` if the task is already in the target state. -- I would also
be happy to change it, but in that case we should change it for _all_ cases.
However, this would enlarge the scope of this PR and I think it's better
_not_ to do that in this PR.
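   
   A minimal sketch of the caller-side guard I have in mind (the class and field
names below are made up for illustration only; this is not the actual
`Task`/`StreamTask` code):
   
       // Caller-side guard: skip the transition call when the task is already in
       // the target state, so the state machine itself never needs to accept
       // idempotent transitions such as SUSPENDED -> SUSPENDED.
       public class CallerSideGuardSketch {
           enum State { CREATED, RESTORING, RUNNING, SUSPENDED, CLOSED }

           private State state = State.RUNNING;

           private void transitionTo(final State newState) {
               // the real state machine would validate the transition here
               state = newState;
           }

           public void suspend() {
               if (state == State.SUSPENDED) {
                   return; // caller-side no-op, keeping the existing pattern
               }
               transitionTo(State.SUSPENDED);
           }

           public static void main(final String[] args) {
               final CallerSideGuardSketch task = new CallerSideGuardSketch();
               task.suspend();
               task.suspend(); // second call is a no-op, no SUSPENDED -> SUSPENDED transition
           }
       }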

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -250,14 +250,10 @@ public void completeRestoration() {
     public void suspend() {
         switch (state()) {
             case CREATED:
-            case SUSPENDED:

Review comment:
       IMHO, we should keep the `SUSPENDED` case for consistency reasons. Only 
merge `CREATED` and `RESTORING` (cf. my other comment on `Task.java`)

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
##########
@@ -112,10 +112,19 @@ public void completeRestoration() {
     @Override
     public void suspend() {
         log.trace("No-op suspend with state {}", state());
-        if (state() == State.RUNNING) {
-            transitionTo(State.SUSPENDED);
-        } else if (state() == State.RESTORING) {
-            throw new IllegalStateException("Illegal state " + state() + " while suspending standby task " + id);
+        switch (state()) {
+            case CREATED:
+            case RUNNING:
+            case SUSPENDED:

Review comment:
       The `SUSPENDED` case should be a no-op IMHO and not call `transitionTo()`
(cf. my other comment on `Task.java`).
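   
   Roughly the shape I have in mind for `StandbyTask#suspend()` (a sketch only,
reusing the names from the hunk above; which states should throw is of course
up for discussion):
   
       // Sketch: CREATED and RUNNING actually transition, SUSPENDED is a plain
       // no-op that never calls transitionTo(), everything else still throws.
       @Override
       public void suspend() {
           log.trace("No-op suspend with state {}", state());
           switch (state()) {
               case CREATED:
               case RUNNING:
                   transitionTo(State.SUSPENDED);
                   break;
               case SUSPENDED:
                   break; // already suspended: no transitionTo() call
               default:
                   throw new IllegalStateException("Illegal state " + state() + " while suspending standby task " + id);
           }
       }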

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -474,20 +470,17 @@ public void update(final Set<TopicPartition> topicPartitions, final Map<String,
 
     @Override
     public void closeAndRecycleState() {
-        suspend();
-        prepareCommit();
-        writeCheckpointIfNeed();
-
+        // Stream tasks should have already been suspended and their consumed offsets committed before recycling

Review comment:
       Is the comment necessary? The code seems self-explanatory.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -215,91 +215,54 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
                      "\tExisting standby tasks: {}",
                  activeTasks.keySet(), standbyTasks.keySet(), activeTaskIds(), standbyTaskIds());
 
-        final Map<TaskId, Set<TopicPartition>> activeTasksToCreate = new HashMap<>(activeTasks);
-        final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate = new HashMap<>(standbyTasks);
-        final Set<Task> tasksToRecycle = new HashSet<>();
-
         builder.addSubscribedTopicsFromAssignment(
             activeTasks.values().stream().flatMap(Collection::stream).collect(Collectors.toList()),
             logPrefix
         );
 
-        // first rectify all existing tasks
         final LinkedHashMap<TaskId, RuntimeException> taskCloseExceptions = new LinkedHashMap<>();
 
-        final Set<Task> tasksToClose = new HashSet<>();
-        final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask = new HashMap<>();
-        final Set<Task> additionalTasksForCommitting = new HashSet<>();
+        final Map<TaskId, Set<TopicPartition>> activeTasksToCreate = new HashMap<>(activeTasks);
+        final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate = new HashMap<>(standbyTasks);
+        final LinkedList<Task> tasksToClose = new LinkedList<>();

Review comment:
       Nit: Why `LinkedList<Task> tasksToClose`? Should we declare it just as `List`?
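   
   I.e., keep whatever concrete list type fits, but expose only the interface
(plus the `java.util.List` import if it is not there already):
   
       // Declare against the interface; the concrete list type is an implementation detail.
       final List<Task> tasksToClose = new LinkedList<>();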

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
##########
@@ -56,21 +56,21 @@
      *          |            |              |     |
      *          |            v              |     |
      *          |     +------+--------+     |     |
-     *          |     | Suspended (3) | <---+     |    //TODO Suspended(3) could be removed after we've stable on KIP-429
-     *          |     +------+--------+           |
-     *          |            |                    |
-     *          |            v                    |
-     *          |      +-----+-------+            |
-     *          +----> | Closed (4)  | -----------+
+     *          +---->| Suspended (3) | ----+     |    //TODO Suspended(3) could be removed after we've stable on KIP-429

Review comment:
       Why remove the `<` arrow? We can still transition from `RESTORING` to `SUSPENDED`.
   
   Super-nit: `+---->| Suspended` -> `+---> | Suspended`

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -714,13 +696,20 @@ void shutdown(final boolean clean) {
             }
         }
 
-        if (clean && !consumedOffsetsAndMetadataPerTask.isEmpty()) {
-            commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
+        try {
+            if (clean && !consumedOffsetsAndMetadataPerTask.isEmpty()) {
+                commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
+            }
+            for (final TaskId taskId : consumedOffsetsAndMetadataPerTask.keySet()) {
+                final Task task = tasks.get(taskId);
+                task.postCommit();
+            }
+        } catch (final RuntimeException e) {
+            firstException.compareAndSet(null, e);

Review comment:
       Why do we put the `try-catch` at the outer layer? If an exception occurs,
we stop looping and the remaining tasks never get their `postCommit()` call --
is this intended? If yes, why?
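   
   If the intent is to keep going, a per-task `try-catch` around `postCommit()`
would make that explicit. A rough sketch only (not the actual code; it reuses
the names from this hunk):
   
       // Sketch: record the first failure, but still give every remaining task
       // its postCommit() call instead of aborting the loop on the first exception.
       try {
           if (clean && !consumedOffsetsAndMetadataPerTask.isEmpty()) {
               commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
           }
       } catch (final RuntimeException e) {
           firstException.compareAndSet(null, e);
       }
       for (final TaskId taskId : consumedOffsetsAndMetadataPerTask.keySet()) {
           try {
               tasks.get(taskId).postCommit();
           } catch (final RuntimeException e) {
               firstException.compareAndSet(null, e);
           }
       }
   
   Whether `postCommit()` should run at all after a failed commit is a separate
question, but at least the behavior would be explicit either way.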

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -215,91 +215,54 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
                      "\tExisting standby tasks: {}",
                  activeTasks.keySet(), standbyTasks.keySet(), activeTaskIds(), standbyTaskIds());
 
-        final Map<TaskId, Set<TopicPartition>> activeTasksToCreate = new HashMap<>(activeTasks);
-        final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate = new HashMap<>(standbyTasks);
-        final Set<Task> tasksToRecycle = new HashSet<>();
-
         builder.addSubscribedTopicsFromAssignment(
             activeTasks.values().stream().flatMap(Collection::stream).collect(Collectors.toList()),
             logPrefix
         );
 
-        // first rectify all existing tasks
         final LinkedHashMap<TaskId, RuntimeException> taskCloseExceptions = new LinkedHashMap<>();
 
-        final Set<Task> tasksToClose = new HashSet<>();
-        final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask = new HashMap<>();
-        final Set<Task> additionalTasksForCommitting = new HashSet<>();
+        final Map<TaskId, Set<TopicPartition>> activeTasksToCreate = new HashMap<>(activeTasks);
+        final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate = new HashMap<>(standbyTasks);
+        final LinkedList<Task> tasksToClose = new LinkedList<>();
+        final Set<Task> tasksToRecycle = new HashSet<>();
         final Set<Task> dirtyTasks = new HashSet<>();
 
+        // first rectify all existing tasks
         for (final Task task : tasks.values()) {
             if (activeTasks.containsKey(task.id()) && task.isActive()) {
                 updateInputPartitionsAndResume(task, activeTasks.get(task.id()));
-                if (task.commitNeeded()) {
-                    additionalTasksForCommitting.add(task);
-                }
                 activeTasksToCreate.remove(task.id());
             } else if (standbyTasks.containsKey(task.id()) && !task.isActive()) {
                 updateInputPartitionsAndResume(task, standbyTasks.get(task.id()));
                 standbyTasksToCreate.remove(task.id());
-                // check for tasks that were owned previously but have changed active/standby status
             } else if (activeTasks.containsKey(task.id()) || standbyTasks.containsKey(task.id())) {
+                // check for tasks that were owned previously but have changed active/standby status
                 tasksToRecycle.add(task);
             } else {
-                try {
-                    task.suspend();
-                    final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
-
-                    tasksToClose.add(task);
-                    if (!committableOffsets.isEmpty()) {
-                        consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
-                    }
-                } catch (final RuntimeException e) {
-                    final String uncleanMessage = String.format(
-                        "Failed to close task %s cleanly. Attempting to close remaining tasks before re-throwing:",
-                        task.id());
-                    log.error(uncleanMessage, e);
-                    taskCloseExceptions.put(task.id(), e);
-                    // We've already recorded the exception (which is the point of clean).
-                    // Now, we should go ahead and complete the close because a half-closed task is no good to anyone.
-                    dirtyTasks.add(task);
-                }
+                tasksToClose.add(task);
             }
         }
 
-        if (!consumedOffsetsAndMetadataPerTask.isEmpty()) {
+        for (final Task task : tasksToClose) {
             try {
-                for (final Task task : additionalTasksForCommitting) {
-                    final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
-                    if (!committableOffsets.isEmpty()) {
-                        consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
+                task.suspend(); // Should be a no-op for active tasks, unless we hit an exception during handleRevocation

Review comment:
       If we hit an exception in `handleRevocation`, why would we continue here?
Are we still in a "clean enough" state to actually continue?
   
   Below we call `completeTaskCloseClean(task)`, which seems incorrect for this
case, as it might close a task cleanly even though we did not successfully
commit its offsets before.
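   
   To make the concern concrete, roughly what I would have expected instead (a
sketch only; `closeTaskDirty()` is just a placeholder for whatever the
dirty-close path should be, not necessarily an existing method):
   
       // Sketch: only tasks whose consumed offsets were actually committed go
       // through the clean-close path; otherwise fall back to a dirty close, so
       // we never "close clean" a task whose pending offsets were lost.
       boolean offsetsCommitted = false;
       try {
           if (!consumedOffsetsAndMetadataPerTask.isEmpty()) {
               commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
           }
           offsetsCommitted = true;
       } catch (final RuntimeException e) {
           log.error("Failed to commit consumed offsets before closing revoked tasks", e);
       }
       for (final Task task : tasksToClose) {
           if (offsetsCommitted) {
               completeTaskCloseClean(task);
           } else {
               closeTaskDirty(task); // placeholder for the dirty-close path
           }
       }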




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

