This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 351a22a KAFKA-10755: Should consider commit latency when computing
next commit timestamp (#9634)
351a22a is described below
commit 351a22a12ecb79a1e1070bf1129f2e168e4e0670
Author: Matthias J. Sax <[email protected]>
AuthorDate: Fri Nov 20 18:55:40 2020 -0800
KAFKA-10755: Should consider commit latency when computing next commit
timestamp (#9634)
Reviewer: Guozhang Wang <[email protected]>
---
.../streams/processor/internals/StreamThread.java | 1 +
.../processor/internals/StreamThreadTest.java | 43 +++++++++++++++++++---
2 files changed, 39 insertions(+), 5 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index edb7709..0407b98 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -944,6 +944,7 @@ public class StreamThread extends Thread {
if (committed == -1) {
log.debug("Unable to commit as we are in the middle of a rebalance, will try again when it completes.");
} else {
+ advanceNowAndComputeLatency();
lastCommitMs = now;
}
} else {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 54361f0..a0a0e3a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -741,18 +741,41 @@ public class StreamThreadTest {
}
@Test
- public void shouldCommitAfterTheCommitInterval() {
- final long commitInterval = 1000L;
+ public void shouldCommitAfterCommitInterval() {
+ final long commitInterval = 100L;
+ final long commitLatency = 10L;
+
final Properties props = configProps(false);
props.setProperty(StreamsConfig.STATE_DIR_CONFIG, stateDir);
props.setProperty(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
Long.toString(commitInterval));
final StreamsConfig config = new StreamsConfig(props);
final Consumer<byte[], byte[]> consumer =
EasyMock.createNiceMock(Consumer.class);
- final TaskManager taskManager = mockTaskManagerCommit(consumer, 2, 1);
final StreamsMetricsImpl streamsMetrics =
new StreamsMetricsImpl(metrics, CLIENT_ID,
StreamsConfig.METRICS_LATEST, mockTime);
+
+ final AtomicBoolean committed = new AtomicBoolean(false);
+ final TaskManager taskManager = new TaskManager(
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null
+ ) {
+ @Override
+ int commit(final Collection<Task> tasksToCommit) {
+ committed.set(true);
+ // we advance time to make sure the commit delay is considered when computing the next commit timestamp
+ mockTime.sleep(commitLatency);
+ return 1;
+ }
+ };
+
final StreamThread thread = new StreamThread(
mockTime,
config,
@@ -775,11 +798,21 @@ public class StreamThreadTest {
thread.setNow(mockTime.milliseconds());
thread.maybeCommit();
- mockTime.sleep(commitInterval + 1);
+ assertTrue(committed.get());
+
+ mockTime.sleep(commitInterval);
+
+ committed.set(false);
thread.setNow(mockTime.milliseconds());
thread.maybeCommit();
+ assertFalse(committed.get());
- verify(taskManager);
+ mockTime.sleep(1);
+
+ committed.set(false);
+ thread.setNow(mockTime.milliseconds());
+ thread.maybeCommit();
+ assertTrue(committed.get());
}
@Test