guozhangwang commented on a change in pull request #8964:
URL: https://github.com/apache/kafka/pull/8964#discussion_r448093846



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
##########
@@ -38,13 +41,39 @@
  */
 final class StateManagerUtil {
     static final String CHECKPOINT_FILE_NAME = ".checkpoint";
+    static final long OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT = 10_000L;
 
     private StateManagerUtil() {}
 
     static RecordConverter converterForStore(final StateStore store) {
         return isTimestamped(store) ? rawValueToTimestampedValue() : identity();
     }
 
+    static boolean checkpointNeeded(final boolean enforceCheckpoint,
+                                    final Map<TopicPartition, Long> oldOffsetSnapshot,
+                                    final Map<TopicPartition, Long> newOffsetSnapshot) {
+        // we should always have the old snapshot after the state stores are registered;
+        // if it is null it means the registration is not done and hence we should not overwrite the checkpoint
+        if (oldOffsetSnapshot == null)
+            return false;
+
+        // if the previous snapshot is empty while the current snapshot is not, then we should always checkpoint;
+        // note if the task is stateless, or stateful but with no logged stores, the snapshot would also be empty
+        // and hence it's okay to not checkpoint
+        if (oldOffsetSnapshot.isEmpty() && !newOffsetSnapshot.isEmpty())
+            return true;
+
+        // we can checkpoint if the difference between the current and the previous snapshot is large enough
+        long totalOffsetDelta = 0L;
+        for (final Map.Entry<TopicPartition, Long> entry : newOffsetSnapshot.entrySet()) {
+            totalOffsetDelta += Math.abs(oldOffsetSnapshot.getOrDefault(entry.getKey(), 0L) - entry.getValue());
+        }
+
+        // when enforcing checkpoint is required, we should overwrite the checkpoint if it is different from the old one;
+        // otherwise, we only overwrite the checkpoint if it is largely different from the old one
+        return enforceCheckpoint ? totalOffsetDelta > 0 : totalOffsetDelta > OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT;

Review comment:
       I'm a bit torn about this optimization to avoid checkpointing twice: even if we have not made any progress since loading the checkpoint, we still need to write a checkpoint upon closing if we have never written one before, and I use an empty map as the indicator for that.
   
   Given that we are now less likely to checkpoint upon suspending, the chance of checkpointing twice (once when transitioning to suspended and again when transitioning to closed) is smaller, so I'm thinking I may just remove this optimization to make the logic a bit simpler. LMK WDYT.
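To make the decision rule in `checkpointNeeded` easier to reason about, here is a minimal, self-contained sketch that mirrors the logic in the diff, assuming plain `String` partition names in place of `TopicPartition` so it compiles without Kafka on the classpath. The class name `CheckpointDecisionSketch` and the partition name `changelog-0` are hypothetical; `main` walks through the cases the comment touches on: registration not finished, first checkpoint, an enforced checkpoint with no progress (the optimization being discussed), and a non-enforced checkpoint past the threshold.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical standalone sketch of the checkpointNeeded rule from the diff;
// String partition names stand in for TopicPartition.
final class CheckpointDecisionSketch {
    static final long OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT = 10_000L;

    static boolean checkpointNeeded(final boolean enforceCheckpoint,
                                    final Map<String, Long> oldOffsetSnapshot,
                                    final Map<String, Long> newOffsetSnapshot) {
        // null: state stores are not registered yet, so never overwrite the checkpoint
        if (oldOffsetSnapshot == null) {
            return false;
        }
        // empty old snapshot: nothing written yet, so write as soon as there is something to write
        if (oldOffsetSnapshot.isEmpty() && !newOffsetSnapshot.isEmpty()) {
            return true;
        }
        // sum of absolute per-partition offset differences
        long totalOffsetDelta = 0L;
        for (final Map.Entry<String, Long> entry : newOffsetSnapshot.entrySet()) {
            totalOffsetDelta += Math.abs(oldOffsetSnapshot.getOrDefault(entry.getKey(), 0L) - entry.getValue());
        }
        // enforced: write on any change; otherwise: write only past the threshold
        return enforceCheckpoint ? totalOffsetDelta > 0 : totalOffsetDelta > OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT;
    }

    public static void main(final String[] args) {
        final Map<String, Long> loaded = new HashMap<>();
        loaded.put("changelog-0", 100L);
        final Map<String, Long> unchanged = new HashMap<>(loaded);
        final Map<String, Long> advanced = new HashMap<>();
        advanced.put("changelog-0", 10_101L); // delta of 10,001 exceeds the threshold

        System.out.println(checkpointNeeded(false, null, advanced));         // false: not registered yet
        System.out.println(checkpointNeeded(true, new HashMap<>(), loaded)); // true: first checkpoint
        System.out.println(checkpointNeeded(true, loaded, unchanged));       // false: enforced but no progress
        System.out.println(checkpointNeeded(false, loaded, advanced));       // true: past the threshold
    }
}
```

Under one reading of the proposal, dropping the optimization would make the enforced branch unconditional, i.e. `return enforceCheckpoint || totalOffsetDelta > OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT;`, trading an occasional redundant write (suspend followed by close) for simpler logic.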

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -542,13 +530,22 @@ public void closeCleanAndRecycleState() {
         log.info("Closed clean and recycled state");
     }
 
-    private void writeCheckpoint() {
+    /**
+     * The following exceptions may be thrown from the state manager flushing call
+     *
+     * @throws TaskMigratedException recoverable error sending changelog records that would cause the task to be removed
+     * @throws StreamsException fatal error when flushing the state store, for example sending changelog records failed
+     *                          or flushing the state store hits IO errors; such errors should cause the thread to die
+     */
+    @Override
+    protected void maybeWriteCheckpoint(final boolean enforceCheckpoint) {

Review comment:
       I decided to separate updating the changelog offsets from actually writing them to the checkpoint file, since even when we do not want to write the file, we still need to update the offsets.
   
   I have not yet removed the parameter from `checkpoint` because the global task is still using it. I plan to remove it when consolidating the global task.
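The separation described in this comment can be sketched as follows: the in-memory offsets are refreshed on every call, while the checkpoint is rewritten only when the decision rule allows it. This is a hypothetical, self-contained illustration rather than the `StreamTask` code: the class `CheckpointingTaskSketch`, its fields and accessors, and the extra `processedOffsets` parameter on `maybeWriteCheckpoint` are assumptions, and the "checkpoint file" is just an in-memory map whose emptiness means "never written yet".

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, not the actual StreamTask / state manager code.
final class CheckpointingTaskSketch {
    static final long OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT = 10_000L;

    // Latest changelog offsets; updated on every call, whether or not we write.
    private final Map<String, Long> currentOffsets = new HashMap<>();
    // Stand-in for the checkpoint file; empty means nothing has been written yet.
    private Map<String, Long> checkpointed = Collections.emptyMap();
    private int writeCount = 0;

    void maybeWriteCheckpoint(final boolean enforceCheckpoint, final Map<String, Long> processedOffsets) {
        // Step 1: always update the in-memory offsets.
        currentOffsets.putAll(processedOffsets);
        // Step 2: only "write the file" when the decision rule says so.
        if (checkpointNeeded(enforceCheckpoint)) {
            checkpointed = new HashMap<>(currentOffsets);
            writeCount++;
        }
    }

    private boolean checkpointNeeded(final boolean enforceCheckpoint) {
        if (checkpointed.isEmpty() && !currentOffsets.isEmpty()) {
            return true; // first checkpoint for this task
        }
        long totalOffsetDelta = 0L;
        for (final Map.Entry<String, Long> entry : currentOffsets.entrySet()) {
            totalOffsetDelta += Math.abs(checkpointed.getOrDefault(entry.getKey(), 0L) - entry.getValue());
        }
        return enforceCheckpoint ? totalOffsetDelta > 0 : totalOffsetDelta > OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT;
    }

    int writeCount() { return writeCount; }

    long inMemoryOffset(final String partition) { return currentOffsets.getOrDefault(partition, -1L); }

    long checkpointedOffset(final String partition) { return checkpointed.getOrDefault(partition, -1L); }
}
```

For example, a first call at offset 50 writes the checkpoint; a later non-enforced call at offset 500 only updates the in-memory offset (a delta of 450 is below the threshold); an enforced call at 500 then writes it, and a repeated enforced call with no further progress is skipped.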



