This is an automated email from the ASF dual-hosted git repository.

frankvicky pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4e97dadc8ef MINOR: Moved resetting checkpointed offsets after stopping 
KS when the new protocol is used. (#21597)
4e97dadc8ef is described below

commit 4e97dadc8ef865559437369145cecfd7f8114a8b
Author: Nikita Shupletsov <[email protected]>
AuthorDate: Fri Feb 27 06:33:34 2026 -0800

    MINOR: Moved resetting checkpointed offsets after stopping KS when the new 
protocol is used. (#21597)
    
    Sometimes the offsets get updated during the first run, which leads to
    a smaller number of restored records and, in turn, makes the test
    flaky.  Resetting the checkpointed offsets in the same place where we
    reset the committed offsets allows us to start from the beginning.
    
    Reviewers: Matthias J. Sax <[email protected]>, TengYao Chi
    <[email protected]>
---
 .../integration/RestoreIntegrationTest.java        | 36 +++++++++-------------
 1 file changed, 15 insertions(+), 21 deletions(-)

diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
index 32df2b6f653..b58dd655a2d 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
@@ -285,15 +285,9 @@ public class RestoreIntegrationTest {
         createStateForRestoration(inputStream, 0);
         if (!useNewProtocol) {
             setCommittedOffset(inputStream, offsetLimitDelta, useNewProtocol);
+            setCheckpointedOffset(props, inputStream, offsetCheckpointed);
         }
 
-        final StateDirectory stateDirectory = new StateDirectory(new 
StreamsConfig(props), new MockTime(), true, false);
-        // note here the checkpointed offset is the last processed record's 
offset, so without control message we should write this offset - 1
-        new OffsetCheckpoint(new 
File(stateDirectory.getOrCreateDirectoryForTask(new TaskId(0, 0)), 
".checkpoint"))
-            .write(Collections.singletonMap(new TopicPartition(inputStream, 
0), (long) offsetCheckpointed - 1));
-        new OffsetCheckpoint(new 
File(stateDirectory.getOrCreateDirectoryForTask(new TaskId(0, 1)), 
".checkpoint"))
-            .write(Collections.singletonMap(new TopicPartition(inputStream, 
1), (long) offsetCheckpointed - 1));
-
         final CountDownLatch shutdownLatch = new CountDownLatch(1);
 
         topology.addReadOnlyStateStore(
@@ -320,7 +314,8 @@ public class RestoreIntegrationTest {
             // For new protocol, we need to stop the streams instance before 
altering offsets
             kafkaStreams.close(Duration.ofSeconds(60));
             setCommittedOffset(inputStream, offsetLimitDelta, useNewProtocol);
-            
+            setCheckpointedOffset(props, inputStream, offsetCheckpointed);
+
             // Restart the streams instance with a new startup latch
             
             kafkaStreams = new KafkaStreams(topology, props);
@@ -352,15 +347,9 @@ public class RestoreIntegrationTest {
         createStateForRestoration(inputStream, 0);
         if (!useNewProtocol) {
             setCommittedOffset(inputStream, offsetLimitDelta, useNewProtocol);
+            setCheckpointedOffset(props, inputStream, offsetCheckpointed);
         }
 
-        final StateDirectory stateDirectory = new StateDirectory(new 
StreamsConfig(props), new MockTime(), true, false);
-        // note here the checkpointed offset is the last processed record's 
offset, so without control message we should write this offset - 1
-        new OffsetCheckpoint(new 
File(stateDirectory.getOrCreateDirectoryForTask(new TaskId(0, 0)), 
".checkpoint"))
-            .write(Collections.singletonMap(new TopicPartition(inputStream, 
0), (long) offsetCheckpointed - 1));
-        new OffsetCheckpoint(new 
File(stateDirectory.getOrCreateDirectoryForTask(new TaskId(0, 1)), 
".checkpoint"))
-            .write(Collections.singletonMap(new TopicPartition(inputStream, 
1), (long) offsetCheckpointed - 1));
-
         final CountDownLatch startupLatch = new CountDownLatch(1);
         final CountDownLatch shutdownLatch = new CountDownLatch(1);
 
@@ -389,6 +378,7 @@ public class RestoreIntegrationTest {
             // For new protocol, we need to stop the streams instance before 
altering offsets
             kafkaStreams.close();
             setCommittedOffset(inputStream, offsetLimitDelta, useNewProtocol);
+            setCheckpointedOffset(props, inputStream, offsetCheckpointed);
 
             // Restart the streams instance with a new startup latch
             kafkaStreams = new KafkaStreams(builder.build(props), props);
@@ -424,12 +414,7 @@ public class RestoreIntegrationTest {
         createStateForRestoration(changelog, 0);
         createStateForRestoration(inputStream, 10000);
 
-        final StateDirectory stateDirectory = new StateDirectory(new 
StreamsConfig(props), new MockTime(), true, false);
-        // note here the checkpointed offset is the last processed record's 
offset, so without control message we should write this offset - 1
-        new OffsetCheckpoint(new 
File(stateDirectory.getOrCreateDirectoryForTask(new TaskId(0, 0)), 
".checkpoint"))
-            .write(Collections.singletonMap(new TopicPartition(changelog, 0), 
(long) offsetCheckpointed - 1));
-        new OffsetCheckpoint(new 
File(stateDirectory.getOrCreateDirectoryForTask(new TaskId(0, 1)), 
".checkpoint"))
-            .write(Collections.singletonMap(new TopicPartition(changelog, 1), 
(long) offsetCheckpointed - 1));
+        setCheckpointedOffset(props, changelog, offsetCheckpointed);
 
         final CountDownLatch startupLatch = new CountDownLatch(1);
         final CountDownLatch shutdownLatch = new CountDownLatch(1);
@@ -988,6 +973,15 @@ public class RestoreIntegrationTest {
         }
     }
 
+    private void setCheckpointedOffset(final Properties props, final String 
inputStream, final long offsetCheckpointed) throws IOException {
+        final StateDirectory stateDirectory = new StateDirectory(new 
StreamsConfig(props), new MockTime(), true, false);
+        // note here the checkpointed offset is the last processed record's 
offset, so without control message we should write this offset - 1
+        new OffsetCheckpoint(new 
File(stateDirectory.getOrCreateDirectoryForTask(new TaskId(0, 0)), 
".checkpoint"))
+                .write(Collections.singletonMap(new 
TopicPartition(inputStream, 0), offsetCheckpointed - 1));
+        new OffsetCheckpoint(new 
File(stateDirectory.getOrCreateDirectoryForTask(new TaskId(0, 1)), 
".checkpoint"))
+                .write(Collections.singletonMap(new 
TopicPartition(inputStream, 1), offsetCheckpointed - 1));
+    }
+
     private void waitForTransitionTo(final Set<KafkaStreams.State> observed, 
final KafkaStreams.State state, final Duration timeout) throws Exception {
         waitForCondition(
             () -> observed.contains(state),

Reply via email to