[ 
https://issues.apache.org/jira/browse/KAFKA-6323?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16346143#comment-16346143
 ] 

ASF GitHub Bot commented on KAFKA-6323:
---------------------------------------

mjsax closed pull request #4301: KAFKA-6323: punctuate with WALL_CLOCK_TIME 
triggered immediately
URL: https://github.com/apache/kafka/pull/4301
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
index d4393aacba7..42902a866b1 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
@@ -102,11 +102,22 @@
      * <ul>
      *   <li>{@link PunctuationType#STREAM_TIME} - uses "stream time", which 
is advanced by the processing of messages
      *   in accordance with the timestamp as extracted by the {@link 
TimestampExtractor} in use.
+     *   The first punctuation will be triggered by the first record that is 
processed.
      *   <b>NOTE:</b> Only advanced if messages arrive</li>
      *   <li>{@link PunctuationType#WALL_CLOCK_TIME} - uses system time (the 
wall-clock time),
      *   which is advanced at the polling interval ({@link 
org.apache.kafka.streams.StreamsConfig#POLL_MS_CONFIG})
-     *   independent of whether new messages arrive. <b>NOTE:</b> This is best 
effort only as its granularity is limited
-     *   by how long an iteration of the processing loop takes to complete</li>
+     *   independent of whether new messages arrive.
+     *   The first punctuation will be triggered after interval has elapsed.
+     *   <b>NOTE:</b> This is best effort only as its granularity is limited 
by how long an iteration of the
+     *   processing loop takes to complete</li>
+     * </ul>
+     *
+     * <b>Skipping punctuations:</b> Punctuations will not be triggered more 
than once at any given timestamp.
+     * This means that "missed" punctuation will be skipped.
+     * It's possible to "miss" a punctuation if:
+     * <ul>
+     *   <li>with {@link PunctuationType#STREAM_TIME}, when stream time 
advances more than interval</li>
+     *   <li>with {@link PunctuationType#WALL_CLOCK_TIME}, on GC pause, too 
short interval, ...</li>
      * </ul>
      *
      * @param intervalMs the time interval between punctuations in milliseconds
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java
index cf50005fcd1..9c0ec88b69f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java
@@ -27,12 +27,19 @@
     // this Cancellable will be re-pointed at the successor schedule in next()
     private final RepointableCancellable cancellable;
 
-    PunctuationSchedule(ProcessorNode node, long interval, Punctuator 
punctuator) {
-        this(node, 0L, interval, punctuator, new RepointableCancellable());
+    PunctuationSchedule(final ProcessorNode node,
+                        final long time,
+                        final long interval,
+                        final Punctuator punctuator) {
+        this(node, time, interval, punctuator, new RepointableCancellable());
         cancellable.setSchedule(this);
     }
 
-    private PunctuationSchedule(ProcessorNode node, long time, long interval, 
Punctuator punctuator, RepointableCancellable cancellable) {
+    private PunctuationSchedule(final ProcessorNode node,
+                                final long time,
+                                final long interval,
+                                final Punctuator punctuator,
+                                final RepointableCancellable cancellable) {
         super(node, time);
         this.interval = interval;
         this.punctuator = punctuator;
@@ -59,14 +66,19 @@ boolean isCancelled() {
         return isCancelled;
     }
 
-    public PunctuationSchedule next(long currTimestamp) {
-        PunctuationSchedule nextSchedule;
-        // we need to special handle the case when it is firstly triggered 
(i.e. the timestamp
-        // is equal to the interval) by reschedule based on the currTimestamp
-        if (timestamp == 0L)
-            nextSchedule = new PunctuationSchedule(value, currTimestamp + 
interval, interval, punctuator, cancellable);
-        else
-            nextSchedule = new PunctuationSchedule(value, timestamp + 
interval, interval, punctuator, cancellable);
+    public PunctuationSchedule next(final long currTimestamp) {
+        long nextPunctuationTime = timestamp + interval;
+        if (currTimestamp >= nextPunctuationTime) {
+            // we missed one ore more punctuations
+            // avoid scheduling a new punctuations immediately, this can 
happen:
+            // - when using STREAM_TIME punctuation and there was a gap i.e., 
no data was
+            //   received for at least 2*interval
+            // - when using WALL_CLOCK_TIME and there was a gap i.e., 
punctuation was delayed for at least 2*interval (GC pause, overload, ...)
+            final long intervalsMissed = (currTimestamp - timestamp) / 
interval;
+            nextPunctuationTime = timestamp + (intervalsMissed + 1) * interval;
+        }
+
+        final PunctuationSchedule nextSchedule = new 
PunctuationSchedule(value, nextPunctuationTime, interval, punctuator, 
cancellable);
 
         cancellable.setSchedule(nextSchedule);
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 11b2f89317b..56c0ab31f4b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -578,16 +578,40 @@ public int addRecords(final TopicPartition partition, 
final Iterable<ConsumerRec
      * @throws IllegalStateException if the current node is not null
      */
     public Cancellable schedule(final long interval, final PunctuationType 
type, final Punctuator punctuator) {
+        switch (type) {
+            case STREAM_TIME:
+                // align punctuation to 0L, punctuate as soon as we have data
+                return schedule(0L, interval, type, punctuator);
+            case WALL_CLOCK_TIME:
+                // align punctuation to now, punctuate after interval has 
elapsed
+                return schedule(time.milliseconds() + interval, interval, 
type, punctuator);
+            default:
+                throw new IllegalArgumentException("Unrecognized 
PunctuationType: " + type);
+        }
+    }
+
+    /**
+     * Schedules a punctuation for the processor
+     *
+     * @param startTime time of the first punctuation
+     * @param interval the interval in milliseconds
+     * @param type the punctuation type
+     * @throws IllegalStateException if the current node is not null
+     */
+    Cancellable schedule(final long startTime, final long interval, final 
PunctuationType type, final Punctuator punctuator) {
         if (processorContext.currentNode() == null) {
             throw new IllegalStateException(String.format("%sCurrent node is 
null", logPrefix));
         }
 
-        final PunctuationSchedule schedule = new 
PunctuationSchedule(processorContext.currentNode(), interval, punctuator);
+        final PunctuationSchedule schedule = new 
PunctuationSchedule(processorContext.currentNode(), startTime, interval, 
punctuator);
 
         switch (type) {
             case STREAM_TIME:
+                // STREAM_TIME punctuation is data driven, will first 
punctuate as soon as stream-time is known and >= time,
+                // stream-time is known when we have received at least one 
record from each input topic
                 return streamTimePunctuationQueue.schedule(schedule);
             case WALL_CLOCK_TIME:
+                // WALL_CLOCK_TIME is driven by the wall clock time, will 
first punctuate when now >= time
                 return systemTimePunctuationQueue.schedule(schedule);
             default:
                 throw new IllegalArgumentException("Unrecognized 
PunctuationType: " + type);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java
index 1570c9bdd13..09c7a0a3183 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java
@@ -40,7 +40,7 @@ public void punctuate(long timestamp) {
             }
         };
 
-        final PunctuationSchedule sched = new PunctuationSchedule(node, 100L, 
punctuator);
+        final PunctuationSchedule sched = new PunctuationSchedule(node, 0L, 
100L, punctuator);
         final long now = sched.timestamp - 100L;
 
         queue.schedule(sched);
@@ -66,6 +66,64 @@ public void punctuate(ProcessorNode node, long time, 
PunctuationType type, Punct
 
         queue.mayPunctuate(now + 200L, PunctuationType.STREAM_TIME, 
processorNodePunctuator);
         assertEquals(2, processor.punctuatedAt.size());
+
+        queue.mayPunctuate(now + 1001L, PunctuationType.STREAM_TIME, 
processorNodePunctuator);
+        assertEquals(3, processor.punctuatedAt.size());
+
+        queue.mayPunctuate(now + 1002L, PunctuationType.STREAM_TIME, 
processorNodePunctuator);
+        assertEquals(3, processor.punctuatedAt.size());
+
+        queue.mayPunctuate(now + 1100L, PunctuationType.STREAM_TIME, 
processorNodePunctuator);
+        assertEquals(4, processor.punctuatedAt.size());
+    }
+
+    @Test
+    public void testPunctuationIntervalCustomAlignment() {
+        final TestProcessor processor = new TestProcessor();
+        final ProcessorNode<String, String> node = new ProcessorNode<>("test", 
processor, null);
+        final PunctuationQueue queue = new PunctuationQueue();
+        final Punctuator punctuator = new Punctuator() {
+            @Override
+            public void punctuate(long timestamp) {
+                node.processor().punctuate(timestamp);
+            }
+        };
+
+        final PunctuationSchedule sched = new PunctuationSchedule(node, 50L, 
100L, punctuator);
+        final long now = sched.timestamp - 50L;
+
+        queue.schedule(sched);
+
+        ProcessorNodePunctuator processorNodePunctuator = new 
ProcessorNodePunctuator() {
+            @Override
+            public void punctuate(ProcessorNode node, long time, 
PunctuationType type, Punctuator punctuator) {
+                punctuator.punctuate(time);
+            }
+        };
+
+        queue.mayPunctuate(now, PunctuationType.STREAM_TIME, 
processorNodePunctuator);
+        assertEquals(0, processor.punctuatedAt.size());
+
+        queue.mayPunctuate(now + 49L, PunctuationType.STREAM_TIME, 
processorNodePunctuator);
+        assertEquals(0, processor.punctuatedAt.size());
+
+        queue.mayPunctuate(now + 50L, PunctuationType.STREAM_TIME, 
processorNodePunctuator);
+        assertEquals(1, processor.punctuatedAt.size());
+
+        queue.mayPunctuate(now + 149L, PunctuationType.STREAM_TIME, 
processorNodePunctuator);
+        assertEquals(1, processor.punctuatedAt.size());
+
+        queue.mayPunctuate(now + 150L, PunctuationType.STREAM_TIME, 
processorNodePunctuator);
+        assertEquals(2, processor.punctuatedAt.size());
+
+        queue.mayPunctuate(now + 1051L, PunctuationType.STREAM_TIME, 
processorNodePunctuator);
+        assertEquals(3, processor.punctuatedAt.size());
+
+        queue.mayPunctuate(now + 1052L, PunctuationType.STREAM_TIME, 
processorNodePunctuator);
+        assertEquals(3, processor.punctuatedAt.size());
+
+        queue.mayPunctuate(now + 1150L, PunctuationType.STREAM_TIME, 
processorNodePunctuator);
+        assertEquals(4, processor.punctuatedAt.size());
     }
 
     private static class TestProcessor extends AbstractProcessor<String, 
String> {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 92cfe66c3f6..1165d76cf2e 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -322,63 +322,173 @@ public void testMaybePunctuateStreamTime() {
         task.initializeTopology();
 
         task.addRecords(partition1, records(
+                new ConsumerRecord<>(partition1.topic(), 
partition1.partition(), 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, 
recordValue),
                 new ConsumerRecord<>(partition1.topic(), 
partition1.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, 
recordValue),
-                new ConsumerRecord<>(partition1.topic(), 
partition1.partition(), 30, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, 
recordValue),
-                new ConsumerRecord<>(partition1.topic(), 
partition1.partition(), 40, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, 
recordValue)
+                new ConsumerRecord<>(partition1.topic(), 
partition1.partition(), 32, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, 
recordValue),
+                new ConsumerRecord<>(partition1.topic(), 
partition1.partition(), 40, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, 
recordValue),
+                new ConsumerRecord<>(partition1.topic(), 
partition1.partition(), 60, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, 
recordValue)
         ));
 
         task.addRecords(partition2, records(
                 new ConsumerRecord<>(partition2.topic(), 
partition2.partition(), 25, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, 
recordValue),
                 new ConsumerRecord<>(partition2.topic(), 
partition2.partition(), 35, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, 
recordValue),
-                new ConsumerRecord<>(partition2.topic(), 
partition2.partition(), 45, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, 
recordValue)
+                new ConsumerRecord<>(partition2.topic(), 
partition2.partition(), 45, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, 
recordValue),
+                new ConsumerRecord<>(partition2.topic(), 
partition2.partition(), 61, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, 
recordValue)
         ));
 
         assertTrue(task.maybePunctuateStreamTime());
 
         assertTrue(task.process());
-        assertEquals(5, task.numBuffered());
+        assertEquals(8, task.numBuffered());
         assertEquals(1, source1.numReceived);
         assertEquals(0, source2.numReceived);
 
+        assertTrue(task.maybePunctuateStreamTime());
+
+        assertTrue(task.process());
+        assertEquals(7, task.numBuffered());
+        assertEquals(2, source1.numReceived);
+        assertEquals(0, source2.numReceived);
+
         assertFalse(task.maybePunctuateStreamTime());
 
         assertTrue(task.process());
-        assertEquals(4, task.numBuffered());
-        assertEquals(1, source1.numReceived);
+        assertEquals(6, task.numBuffered());
+        assertEquals(2, source1.numReceived);
+        assertEquals(1, source2.numReceived);
+
+        assertTrue(task.maybePunctuateStreamTime());
+
+        assertTrue(task.process());
+        assertEquals(5, task.numBuffered());
+        assertEquals(3, source1.numReceived);
         assertEquals(1, source2.numReceived);
 
+        assertFalse(task.maybePunctuateStreamTime());
+
+        assertTrue(task.process());
+        assertEquals(4, task.numBuffered());
+        assertEquals(3, source1.numReceived);
+        assertEquals(2, source2.numReceived);
+
         assertTrue(task.maybePunctuateStreamTime());
 
         assertTrue(task.process());
         assertEquals(3, task.numBuffered());
+        assertEquals(4, source1.numReceived);
+        assertEquals(2, source2.numReceived);
+
+        assertFalse(task.maybePunctuateStreamTime());
+
+        assertTrue(task.process());
+        assertEquals(2, task.numBuffered());
+        assertEquals(4, source1.numReceived);
+        assertEquals(3, source2.numReceived);
+
+        assertTrue(task.maybePunctuateStreamTime());
+
+        assertTrue(task.process());
+        assertEquals(1, task.numBuffered());
+        assertEquals(5, source1.numReceived);
+        assertEquals(3, source2.numReceived);
+
+        assertFalse(task.maybePunctuateStreamTime());
+
+        assertTrue(task.process());
+        assertEquals(0, task.numBuffered());
+        assertEquals(5, source1.numReceived);
+        assertEquals(4, source2.numReceived);
+
+        assertFalse(task.process());
+        assertFalse(task.maybePunctuateStreamTime());
+
+        
processorStreamTime.supplier.checkAndClearPunctuateResult(PunctuationType.STREAM_TIME,
 0L, 20L, 32L, 40L, 60L);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldPunctuateOnceStreamTimeAfterGap() {
+        task = createStatelessTask(false);
+        task.initializeStateStores();
+        task.initializeTopology();
+
+        task.addRecords(partition1, records(
+                new ConsumerRecord<>(partition1.topic(), 
partition1.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, 
recordValue),
+                new ConsumerRecord<>(partition1.topic(), 
partition1.partition(), 142, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 
recordKey, recordValue),
+                new ConsumerRecord<>(partition1.topic(), 
partition1.partition(), 155, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 
recordKey, recordValue),
+                new ConsumerRecord<>(partition1.topic(), 
partition1.partition(), 160, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 
recordKey, recordValue)
+        ));
+
+        task.addRecords(partition2, records(
+                new ConsumerRecord<>(partition2.topic(), 
partition2.partition(), 25, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, 
recordValue),
+                new ConsumerRecord<>(partition2.topic(), 
partition2.partition(), 145, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 
recordKey, recordValue),
+                new ConsumerRecord<>(partition2.topic(), 
partition2.partition(), 159, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 
recordKey, recordValue),
+                new ConsumerRecord<>(partition2.topic(), 
partition2.partition(), 161, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 
recordKey, recordValue)
+        ));
+
+        assertTrue(task.maybePunctuateStreamTime()); // punctuate at 20
+
+        assertTrue(task.process());
+        assertEquals(7, task.numBuffered());
+        assertEquals(1, source1.numReceived);
+        assertEquals(0, source2.numReceived);
+
+        assertFalse(task.maybePunctuateStreamTime());
+
+        assertTrue(task.process());
+        assertEquals(6, task.numBuffered());
+        assertEquals(1, source1.numReceived);
+        assertEquals(1, source2.numReceived);
+
+        assertTrue(task.maybePunctuateStreamTime()); // punctuate at 142
+
+        // only one punctuation after 100ms gap
+        assertFalse(task.maybePunctuateStreamTime());
+
+        assertTrue(task.process());
+        assertEquals(5, task.numBuffered());
         assertEquals(2, source1.numReceived);
         assertEquals(1, source2.numReceived);
 
         assertFalse(task.maybePunctuateStreamTime());
 
         assertTrue(task.process());
-        assertEquals(2, task.numBuffered());
+        assertEquals(4, task.numBuffered());
         assertEquals(2, source1.numReceived);
         assertEquals(2, source2.numReceived);
 
-        assertTrue(task.maybePunctuateStreamTime());
+        assertTrue(task.maybePunctuateStreamTime()); // punctuate at 155
 
         assertTrue(task.process());
-        assertEquals(1, task.numBuffered());
+        assertEquals(3, task.numBuffered());
         assertEquals(3, source1.numReceived);
         assertEquals(2, source2.numReceived);
 
         assertFalse(task.maybePunctuateStreamTime());
 
         assertTrue(task.process());
-        assertEquals(0, task.numBuffered());
+        assertEquals(2, task.numBuffered());
         assertEquals(3, source1.numReceived);
         assertEquals(3, source2.numReceived);
 
+        assertTrue(task.maybePunctuateStreamTime()); // punctuate at 160, 
still aligned on the initial punctuation
+
+        assertTrue(task.process());
+        assertEquals(1, task.numBuffered());
+        assertEquals(4, source1.numReceived);
+        assertEquals(3, source2.numReceived);
+
+        assertFalse(task.maybePunctuateStreamTime());
+
+        assertTrue(task.process());
+        assertEquals(0, task.numBuffered());
+        assertEquals(4, source1.numReceived);
+        assertEquals(4, source2.numReceived);
+
         assertFalse(task.process());
         assertFalse(task.maybePunctuateStreamTime());
 
-        
processorStreamTime.supplier.checkAndClearPunctuateResult(PunctuationType.STREAM_TIME,
 20L, 30L, 40L);
+        
processorStreamTime.supplier.checkAndClearPunctuateResult(PunctuationType.STREAM_TIME,
 20L, 142L, 155L, 160L);
     }
 
     @SuppressWarnings("unchecked")
@@ -425,9 +535,14 @@ public void shouldPunctuateSystemTimeWhenIntervalElapsed() 
{
         assertTrue(task.maybePunctuateSystemTime());
         time.sleep(10);
         assertTrue(task.maybePunctuateSystemTime());
-        time.sleep(10);
+        time.sleep(9);
+        assertFalse(task.maybePunctuateSystemTime());
+        time.sleep(1);
+        assertTrue(task.maybePunctuateSystemTime());
+        time.sleep(20);
         assertTrue(task.maybePunctuateSystemTime());
-        
processorSystemTime.supplier.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME,
 now + 10, now + 20, now + 30);
+        assertFalse(task.maybePunctuateSystemTime());
+        
processorSystemTime.supplier.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME,
 now + 10, now + 20, now + 30, now + 50);
     }
 
     @Test
@@ -435,11 +550,36 @@ public void 
shouldNotPunctuateSystemTimeWhenIntervalNotElapsed() {
         task = createStatelessTask(false);
         task.initializeStateStores();
         task.initializeTopology();
-        long now = time.milliseconds();
-        assertTrue(task.maybePunctuateSystemTime()); // first time we always 
punctuate
+        assertFalse(task.maybePunctuateSystemTime());
         time.sleep(9);
         assertFalse(task.maybePunctuateSystemTime());
-        
processorSystemTime.supplier.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME,
 now);
+        
processorSystemTime.supplier.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME);
+    }
+
+    @Test
+    public void shouldPunctuateOnceSystemTimeAfterGap() {
+        task = createStatelessTask(false);
+        task.initializeStateStores();
+        task.initializeTopology();
+        long now = time.milliseconds();
+        time.sleep(100);
+        assertTrue(task.maybePunctuateSystemTime());
+        assertFalse(task.maybePunctuateSystemTime());
+        time.sleep(10);
+        assertTrue(task.maybePunctuateSystemTime());
+        time.sleep(12);
+        assertTrue(task.maybePunctuateSystemTime());
+        time.sleep(7);
+        assertFalse(task.maybePunctuateSystemTime());
+        time.sleep(1); // punctuate at now + 130
+        assertTrue(task.maybePunctuateSystemTime());
+        time.sleep(105); // punctuate at now + 235
+        assertTrue(task.maybePunctuateSystemTime());
+        assertFalse(task.maybePunctuateSystemTime());
+        time.sleep(5); // punctuate at now + 240, still aligned on the initial 
punctuation
+        assertTrue(task.maybePunctuateSystemTime());
+        assertFalse(task.maybePunctuateSystemTime());
+        
processorSystemTime.supplier.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME,
 now + 100, now + 110, now + 122, now + 130, now + 235, now + 240);
     }
 
     @Test
diff --git 
a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
 
b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
index 4100f186566..5073efdb64c 100644
--- 
a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
+++ 
b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -593,36 +593,35 @@ public void shouldPunctuateOnStreamsTime() {
         testDriver.pipeInput(consumerRecordFactory.create(SOURCE_TOPIC_1, 
key1, value1, 42L));
         assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations));
 
+        expectedPunctuations.add(51L);
         testDriver.pipeInput(consumerRecordFactory.create(SOURCE_TOPIC_1, 
key1, value1, 51L));
         assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations));
 
-        expectedPunctuations.add(52L);
         testDriver.pipeInput(consumerRecordFactory.create(SOURCE_TOPIC_1, 
key1, value1, 52L));
         assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations));
 
+        expectedPunctuations.add(61L);
         testDriver.pipeInput(consumerRecordFactory.create(SOURCE_TOPIC_1, 
key1, value1, 61L));
         assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations));
 
-        expectedPunctuations.add(65L);
         testDriver.pipeInput(consumerRecordFactory.create(SOURCE_TOPIC_1, 
key1, value1, 65L));
         assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations));
 
+        expectedPunctuations.add(71L);
         testDriver.pipeInput(consumerRecordFactory.create(SOURCE_TOPIC_1, 
key1, value1, 71L));
         assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations));
 
-        expectedPunctuations.add(72L);
         testDriver.pipeInput(consumerRecordFactory.create(SOURCE_TOPIC_1, 
key1, value1, 72L));
         assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations));
 
-        expectedPunctuations.add(95L);
         expectedPunctuations.add(95L);
         testDriver.pipeInput(consumerRecordFactory.create(SOURCE_TOPIC_1, 
key1, value1, 95L));
         assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations));
 
+        expectedPunctuations.add(101L);
         testDriver.pipeInput(consumerRecordFactory.create(SOURCE_TOPIC_1, 
key1, value1, 101L));
         assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations));
 
-        expectedPunctuations.add(102L);
         testDriver.pipeInput(consumerRecordFactory.create(SOURCE_TOPIC_1, 
key1, value1, 102L));
         assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations));
     }
@@ -637,21 +636,23 @@ public void shouldPunctuateOnWallClockTime() {
 
         final List<Long> expectedPunctuations = new LinkedList<>();
 
-        expectedPunctuations.add(5L);
         testDriver.advanceWallClockTime(5L);
         assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations));
 
+        expectedPunctuations.add(14L);
         testDriver.advanceWallClockTime(9L);
         assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations));
 
-        expectedPunctuations.add(15L);
         testDriver.advanceWallClockTime(1L);
         assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations));
 
-        expectedPunctuations.add(35L);
         expectedPunctuations.add(35L);
         testDriver.advanceWallClockTime(20L);
         assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations));
+
+        expectedPunctuations.add(40L);
+        testDriver.advanceWallClockTime(5L);
+        assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations));
     }
 
     @Test


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> punctuate with WALL_CLOCK_TIME triggered immediately
> ----------------------------------------------------
>
>                 Key: KAFKA-6323
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6323
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.0.0
>            Reporter: Frederic Arno
>            Assignee: Frederic Arno
>            Priority: Major
>             Fix For: 1.1.0, 1.0.1
>
>
> When working on a custom Processor from which I am scheduling a punctuation 
> using WALL_CLOCK_TIME. I've noticed that whatever the punctuation interval I 
> set, a call to my Punctuator is always triggered immediately.
> Having a quick look at kafka-streams' code, I could find that all 
> PunctuationSchedule's timestamps are matched against the current time in 
> order to decide whether or not to trigger the punctuator 
> (org.apache.kafka.streams.processor.internals.PunctuationQueue#mayPunctuate). 
> However, I've only seen code that initializes PunctuationSchedule's timestamp 
> to 0, which I guess is what is causing an immediate punctuation.
> At least when using WALL_CLOCK_TIME, shouldn't the PunctuationSchedule's 
> timestamp be initialized to current time + interval?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to