This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 351a22a  KAFKA-10755: Should consider commit latency when computing 
next commit timestamp (#9634)
351a22a is described below

commit 351a22a12ecb79a1e1070bf1129f2e168e4e0670
Author: Matthias J. Sax <[email protected]>
AuthorDate: Fri Nov 20 18:55:40 2020 -0800

    KAFKA-10755: Should consider commit latency when computing next commit 
timestamp (#9634)
    
    Reviewer: Guozhang Wang <[email protected]>
---
 .../streams/processor/internals/StreamThread.java  |  1 +
 .../processor/internals/StreamThreadTest.java      | 43 +++++++++++++++++++---
 2 files changed, 39 insertions(+), 5 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index edb7709..0407b98 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -944,6 +944,7 @@ public class StreamThread extends Thread {
             if (committed == -1) {
                 log.debug("Unable to commit as we are in the middle of a 
rebalance, will try again when it completes.");
             } else {
+                advanceNowAndComputeLatency();
                 lastCommitMs = now;
             }
         } else {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 54361f0..a0a0e3a 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -741,18 +741,41 @@ public class StreamThreadTest {
     }
 
     @Test
-    public void shouldCommitAfterTheCommitInterval() {
-        final long commitInterval = 1000L;
+    public void shouldCommitAfterCommitInterval() {
+        final long commitInterval = 100L;
+        final long commitLatency = 10L;
+
         final Properties props = configProps(false);
         props.setProperty(StreamsConfig.STATE_DIR_CONFIG, stateDir);
         props.setProperty(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 
Long.toString(commitInterval));
 
         final StreamsConfig config = new StreamsConfig(props);
         final Consumer<byte[], byte[]> consumer = 
EasyMock.createNiceMock(Consumer.class);
-        final TaskManager taskManager = mockTaskManagerCommit(consumer, 2, 1);
 
         final StreamsMetricsImpl streamsMetrics =
             new StreamsMetricsImpl(metrics, CLIENT_ID, 
StreamsConfig.METRICS_LATEST, mockTime);
+
+        final AtomicBoolean committed = new AtomicBoolean(false);
+        final TaskManager taskManager = new TaskManager(
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null
+        ) {
+            @Override
+            int commit(final Collection<Task> tasksToCommit) {
+                committed.set(true);
+                // we advance time to make sure the commit delay is considered 
when computing the next commit timestamp
+                mockTime.sleep(commitLatency);
+                return 1;
+            }
+        };
+
         final StreamThread thread = new StreamThread(
             mockTime,
             config,
@@ -775,11 +798,21 @@ public class StreamThreadTest {
 
         thread.setNow(mockTime.milliseconds());
         thread.maybeCommit();
-        mockTime.sleep(commitInterval + 1);
+        assertTrue(committed.get());
+
+        mockTime.sleep(commitInterval);
+
+        committed.set(false);
         thread.setNow(mockTime.milliseconds());
         thread.maybeCommit();
+        assertFalse(committed.get());
 
-        verify(taskManager);
+        mockTime.sleep(1);
+
+        committed.set(false);
+        thread.setNow(mockTime.milliseconds());
+        thread.maybeCommit();
+        assertTrue(committed.get());
     }
 
     @Test

Reply via email to