This is an automated email from the ASF dual-hosted git repository.
frankvicky pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4e97dadc8ef MINOR: Moved resetting checkpointed offsets after stopping
KS when the new protocol is used. (#21597)
4e97dadc8ef is described below
commit 4e97dadc8ef865559437369145cecfd7f8114a8b
Author: Nikita Shupletsov <[email protected]>
AuthorDate: Fri Feb 27 06:33:34 2026 -0800
MINOR: Moved resetting checkpointed offsets after stopping KS when the new
protocol is used. (#21597)
Sometimes we manage to update the offsets during the first run, which
leads to a smaller number of restored records and in turn makes the
test flaky. Moving the resetting of checkpointed offsets to the place
where we reset the committed offsets allows us to start from the
beginning.
Reviewers: Matthias J. Sax <[email protected]>, TengYao Chi
<[email protected]>
---
.../integration/RestoreIntegrationTest.java | 36 +++++++++-------------
1 file changed, 15 insertions(+), 21 deletions(-)
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
index 32df2b6f653..b58dd655a2d 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
@@ -285,15 +285,9 @@ public class RestoreIntegrationTest {
createStateForRestoration(inputStream, 0);
if (!useNewProtocol) {
setCommittedOffset(inputStream, offsetLimitDelta, useNewProtocol);
+ setCheckpointedOffset(props, inputStream, offsetCheckpointed);
}
- final StateDirectory stateDirectory = new StateDirectory(new
StreamsConfig(props), new MockTime(), true, false);
- // note here the checkpointed offset is the last processed record's
offset, so without control message we should write this offset - 1
- new OffsetCheckpoint(new
File(stateDirectory.getOrCreateDirectoryForTask(new TaskId(0, 0)),
".checkpoint"))
- .write(Collections.singletonMap(new TopicPartition(inputStream,
0), (long) offsetCheckpointed - 1));
- new OffsetCheckpoint(new
File(stateDirectory.getOrCreateDirectoryForTask(new TaskId(0, 1)),
".checkpoint"))
- .write(Collections.singletonMap(new TopicPartition(inputStream,
1), (long) offsetCheckpointed - 1));
-
final CountDownLatch shutdownLatch = new CountDownLatch(1);
topology.addReadOnlyStateStore(
@@ -320,7 +314,8 @@ public class RestoreIntegrationTest {
// For new protocol, we need to stop the streams instance before
altering offsets
kafkaStreams.close(Duration.ofSeconds(60));
setCommittedOffset(inputStream, offsetLimitDelta, useNewProtocol);
-
+ setCheckpointedOffset(props, inputStream, offsetCheckpointed);
+
// Restart the streams instance with a new startup latch
kafkaStreams = new KafkaStreams(topology, props);
@@ -352,15 +347,9 @@ public class RestoreIntegrationTest {
createStateForRestoration(inputStream, 0);
if (!useNewProtocol) {
setCommittedOffset(inputStream, offsetLimitDelta, useNewProtocol);
+ setCheckpointedOffset(props, inputStream, offsetCheckpointed);
}
- final StateDirectory stateDirectory = new StateDirectory(new
StreamsConfig(props), new MockTime(), true, false);
- // note here the checkpointed offset is the last processed record's
offset, so without control message we should write this offset - 1
- new OffsetCheckpoint(new
File(stateDirectory.getOrCreateDirectoryForTask(new TaskId(0, 0)),
".checkpoint"))
- .write(Collections.singletonMap(new TopicPartition(inputStream,
0), (long) offsetCheckpointed - 1));
- new OffsetCheckpoint(new
File(stateDirectory.getOrCreateDirectoryForTask(new TaskId(0, 1)),
".checkpoint"))
- .write(Collections.singletonMap(new TopicPartition(inputStream,
1), (long) offsetCheckpointed - 1));
-
final CountDownLatch startupLatch = new CountDownLatch(1);
final CountDownLatch shutdownLatch = new CountDownLatch(1);
@@ -389,6 +378,7 @@ public class RestoreIntegrationTest {
// For new protocol, we need to stop the streams instance before
altering offsets
kafkaStreams.close();
setCommittedOffset(inputStream, offsetLimitDelta, useNewProtocol);
+ setCheckpointedOffset(props, inputStream, offsetCheckpointed);
// Restart the streams instance with a new startup latch
kafkaStreams = new KafkaStreams(builder.build(props), props);
@@ -424,12 +414,7 @@ public class RestoreIntegrationTest {
createStateForRestoration(changelog, 0);
createStateForRestoration(inputStream, 10000);
- final StateDirectory stateDirectory = new StateDirectory(new
StreamsConfig(props), new MockTime(), true, false);
- // note here the checkpointed offset is the last processed record's
offset, so without control message we should write this offset - 1
- new OffsetCheckpoint(new
File(stateDirectory.getOrCreateDirectoryForTask(new TaskId(0, 0)),
".checkpoint"))
- .write(Collections.singletonMap(new TopicPartition(changelog, 0),
(long) offsetCheckpointed - 1));
- new OffsetCheckpoint(new
File(stateDirectory.getOrCreateDirectoryForTask(new TaskId(0, 1)),
".checkpoint"))
- .write(Collections.singletonMap(new TopicPartition(changelog, 1),
(long) offsetCheckpointed - 1));
+ setCheckpointedOffset(props, changelog, offsetCheckpointed);
final CountDownLatch startupLatch = new CountDownLatch(1);
final CountDownLatch shutdownLatch = new CountDownLatch(1);
@@ -988,6 +973,15 @@ public class RestoreIntegrationTest {
}
}
+ private void setCheckpointedOffset(final Properties props, final String
inputStream, final long offsetCheckpointed) throws IOException {
+ final StateDirectory stateDirectory = new StateDirectory(new
StreamsConfig(props), new MockTime(), true, false);
+ // note here the checkpointed offset is the last processed record's
offset, so without control message we should write this offset - 1
+ new OffsetCheckpoint(new
File(stateDirectory.getOrCreateDirectoryForTask(new TaskId(0, 0)),
".checkpoint"))
+ .write(Collections.singletonMap(new
TopicPartition(inputStream, 0), offsetCheckpointed - 1));
+ new OffsetCheckpoint(new
File(stateDirectory.getOrCreateDirectoryForTask(new TaskId(0, 1)),
".checkpoint"))
+ .write(Collections.singletonMap(new
TopicPartition(inputStream, 1), offsetCheckpointed - 1));
+ }
+
private void waitForTransitionTo(final Set<KafkaStreams.State> observed,
final KafkaStreams.State state, final Duration timeout) throws Exception {
waitForCondition(
() -> observed.contains(state),