vvcephei commented on a change in pull request #8964:
URL: https://github.com/apache/kafka/pull/8964#discussion_r463867098



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##########
@@ -461,6 +463,42 @@ public void flush() {
         }
     }
 
+    public void flushCache() {
+        RuntimeException firstException = null;
+        // attempting to flush the stores
+        if (!stores.isEmpty()) {
+            log.debug("Flushing all store caches registered in the state 
manager: {}", stores);
+            for (final StateStoreMetadata metadata : stores.values()) {
+                final StateStore store = metadata.stateStore;
+
+                try {
+                    // buffer should be flushed to send all records to 
changelog
+                    if (store instanceof TimeOrderedKeyValueBuffer) {
+                        store.flush();
+                    } else if (store instanceof CachedStateStore) {
+                        ((CachedStateStore) store).flushCache();
+                    }

Review comment:
       Seems like we're not handling the case where the store is neither a 
TimeOrderedKeyValueBuffer nor a CachedStateStore. Should we log a different 
message than "Flushed cache or buffer" in that case, to indicate that we _didn't_ flush it?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -267,80 +266,26 @@ public void handleAssignment(final Map<TaskId, 
Set<TopicPartition>> activeTasks,
                 // check for tasks that were owned previously but have changed 
active/standby status
                 tasksToRecycle.add(task);
             } else {
-                tasksToClose.add(task);
-            }
-        }
-
-        for (final Task task : tasksToClose) {
-            try {
-                if (task.isActive()) {
-                    // Active tasks are revoked and suspended/committed during 
#handleRevocation
-                    if (!task.state().equals(State.SUSPENDED)) {
-                        log.error("Active task {} should be suspended prior to 
attempting to close but was in {}",
-                                  task.id(), task.state());
-                        throw new IllegalStateException("Active task " + 
task.id() + " should have been suspended");
-                    }
-                } else {
-                    task.suspend();
-                    task.prepareCommit();
-                    task.postCommit();
-                }
-                completeTaskCloseClean(task);
-                cleanUpTaskProducer(task, taskCloseExceptions);
-                tasks.remove(task.id());
-            } catch (final RuntimeException e) {
-                final String uncleanMessage = String.format(
-                    "Failed to close task %s cleanly. Attempting to close 
remaining tasks before re-throwing:",
-                    task.id());
-                log.error(uncleanMessage, e);
-                taskCloseExceptions.put(task.id(), e);
-                // We've already recorded the exception (which is the point of 
clean).
-                // Now, we should go ahead and complete the close because a 
half-closed task is no good to anyone.
-                dirtyTasks.add(task);
+                tasksToCloseClean.add(task);
             }
         }
 
-        for (final Task oldTask : tasksToRecycle) {
-            final Task newTask;
-            try {
-                if (oldTask.isActive()) {
-                    if (!oldTask.state().equals(State.SUSPENDED)) {
-                        // Active tasks are revoked and suspended/committed 
during #handleRevocation
-                        log.error("Active task {} should be suspended prior to 
attempting to close but was in {}",
-                                  oldTask.id(), oldTask.state());
-                        throw new IllegalStateException("Active task " + 
oldTask.id() + " should have been suspended");
-                    }
-                    final Set<TopicPartition> partitions = 
standbyTasksToCreate.remove(oldTask.id());
-                    newTask = 
standbyTaskCreator.createStandbyTaskFromActive((StreamTask) oldTask, 
partitions);
-                    cleanUpTaskProducer(oldTask, taskCloseExceptions);
-                } else {
-                    oldTask.suspend();
-                    oldTask.prepareCommit();
-                    oldTask.postCommit();
-                    final Set<TopicPartition> partitions = 
activeTasksToCreate.remove(oldTask.id());
-                    newTask = 
activeTaskCreator.createActiveTaskFromStandby((StandbyTask) oldTask, 
partitions, mainConsumer);
-                }
-                tasks.remove(oldTask.id());
-                addNewTask(newTask);
-            } catch (final RuntimeException e) {
-                final String uncleanMessage = String.format("Failed to recycle 
task %s cleanly. Attempting to close remaining tasks before re-throwing:", 
oldTask.id());
-                log.error(uncleanMessage, e);
-                taskCloseExceptions.put(oldTask.id(), e);
-                dirtyTasks.add(oldTask);
-            }
-        }
+        // close and recycle those tasks
+        handleCloseAndRecycle(tasksToRecycle, tasksToCloseClean, 
tasksToCloseDirty, activeTasksToCreate, standbyTasksToCreate, 
taskCloseExceptions);
 
-        for (final Task task : dirtyTasks) {
+        // for tasks that cannot be cleanly closed or recycled, close them 
dirty
+        for (final Task task : tasksToCloseDirty) {

Review comment:
       It seems a little odd for `handleCloseAndRecycle` to handle everything 
else but only populate the `tasksToCloseDirty` list instead of also closing those tasks dirty itself.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
##########
@@ -38,13 +41,39 @@
  */
 final class StateManagerUtil {
     static final String CHECKPOINT_FILE_NAME = ".checkpoint";
+    static final long OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT = 10_000L;
 
     private StateManagerUtil() {}
 
     static RecordConverter converterForStore(final StateStore store) {
         return isTimestamped(store) ? rawValueToTimestampedValue() : 
identity();
     }
 
+    static boolean checkpointNeeded(final boolean enforceCheckpoint,
+                                    final Map<TopicPartition, Long> 
oldOffsetSnapshot,
+                                    final Map<TopicPartition, Long> 
newOffsetSnapshot) {
+        // we should always have the old snapshot post completing the register 
state stores;
+        // if it is null it means the registration is not done and hence we 
should not overwrite the checkpoint
+        if (oldOffsetSnapshot == null)
+            return false;
+
+        // if the previous snapshot is empty while the current snapshot is not 
then we should always checkpoint;
+        // note if the task is stateless or stateful but no stores logged, the 
snapshot would also be empty
+        // and hence it's okay to not checkpoint
+        if (oldOffsetSnapshot.isEmpty() && !newOffsetSnapshot.isEmpty())
+            return true;
+
+        // we can checkpoint if the the difference between the current and the 
previous snapshot is large enough
+        long totalOffsetDelta = 0L;
+        for (final Map.Entry<TopicPartition, Long> entry : 
newOffsetSnapshot.entrySet()) {
+            totalOffsetDelta += 
Math.abs(oldOffsetSnapshot.getOrDefault(entry.getKey(), 0L) - entry.getValue());

Review comment:
       What's the rationale for subtracting the larger value from the smaller 
one (`old - new`) and then taking the absolute value, instead of computing `new - old` directly?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to