[ https://issues.apache.org/jira/browse/KAFKA-6323?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16346143#comment-16346143 ]
ASF GitHub Bot commented on KAFKA-6323: --------------------------------------- mjsax closed pull request #4301: KAFKA-6323: punctuate with WALL_CLOCK_TIME triggered immediately URL: https://github.com/apache/kafka/pull/4301 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java index d4393aacba7..42902a866b1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java @@ -102,11 +102,22 @@ * <ul> * <li>{@link PunctuationType#STREAM_TIME} - uses "stream time", which is advanced by the processing of messages * in accordance with the timestamp as extracted by the {@link TimestampExtractor} in use. + * The first punctuation will be triggered by the first record that is processed. * <b>NOTE:</b> Only advanced if messages arrive</li> * <li>{@link PunctuationType#WALL_CLOCK_TIME} - uses system time (the wall-clock time), * which is advanced at the polling interval ({@link org.apache.kafka.streams.StreamsConfig#POLL_MS_CONFIG}) - * independent of whether new messages arrive. <b>NOTE:</b> This is best effort only as its granularity is limited - * by how long an iteration of the processing loop takes to complete</li> + * independent of whether new messages arrive. + * The first punctuation will be triggered after interval has elapsed. 
+ * <b>NOTE:</b> This is best effort only as its granularity is limited by how long an iteration of the + * processing loop takes to complete</li> + * </ul> + * + * <b>Skipping punctuations:</b> Punctuations will not be triggered more than once at any given timestamp. + * This means that "missed" punctuation will be skipped. + * It's possible to "miss" a punctuation if: + * <ul> + * <li>with {@link PunctuationType#STREAM_TIME}, when stream time advances more than interval</li> + * <li>with {@link PunctuationType#WALL_CLOCK_TIME}, on GC pause, too short interval, ...</li> * </ul> * * @param intervalMs the time interval between punctuations in milliseconds diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java index cf50005fcd1..9c0ec88b69f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java @@ -27,12 +27,19 @@ // this Cancellable will be re-pointed at the successor schedule in next() private final RepointableCancellable cancellable; - PunctuationSchedule(ProcessorNode node, long interval, Punctuator punctuator) { - this(node, 0L, interval, punctuator, new RepointableCancellable()); + PunctuationSchedule(final ProcessorNode node, + final long time, + final long interval, + final Punctuator punctuator) { + this(node, time, interval, punctuator, new RepointableCancellable()); cancellable.setSchedule(this); } - private PunctuationSchedule(ProcessorNode node, long time, long interval, Punctuator punctuator, RepointableCancellable cancellable) { + private PunctuationSchedule(final ProcessorNode node, + final long time, + final long interval, + final Punctuator punctuator, + final RepointableCancellable cancellable) { super(node, time); this.interval = interval; this.punctuator = 
punctuator; @@ -59,14 +66,19 @@ boolean isCancelled() { return isCancelled; } - public PunctuationSchedule next(long currTimestamp) { - PunctuationSchedule nextSchedule; - // we need to special handle the case when it is firstly triggered (i.e. the timestamp - // is equal to the interval) by reschedule based on the currTimestamp - if (timestamp == 0L) - nextSchedule = new PunctuationSchedule(value, currTimestamp + interval, interval, punctuator, cancellable); - else - nextSchedule = new PunctuationSchedule(value, timestamp + interval, interval, punctuator, cancellable); + public PunctuationSchedule next(final long currTimestamp) { + long nextPunctuationTime = timestamp + interval; + if (currTimestamp >= nextPunctuationTime) { + // we missed one ore more punctuations + // avoid scheduling a new punctuations immediately, this can happen: + // - when using STREAM_TIME punctuation and there was a gap i.e., no data was + // received for at least 2*interval + // - when using WALL_CLOCK_TIME and there was a gap i.e., punctuation was delayed for at least 2*interval (GC pause, overload, ...) 
+ final long intervalsMissed = (currTimestamp - timestamp) / interval; + nextPunctuationTime = timestamp + (intervalsMissed + 1) * interval; + } + + final PunctuationSchedule nextSchedule = new PunctuationSchedule(value, nextPunctuationTime, interval, punctuator, cancellable); cancellable.setSchedule(nextSchedule); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 11b2f89317b..56c0ab31f4b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -578,16 +578,40 @@ public int addRecords(final TopicPartition partition, final Iterable<ConsumerRec * @throws IllegalStateException if the current node is not null */ public Cancellable schedule(final long interval, final PunctuationType type, final Punctuator punctuator) { + switch (type) { + case STREAM_TIME: + // align punctuation to 0L, punctuate as soon as we have data + return schedule(0L, interval, type, punctuator); + case WALL_CLOCK_TIME: + // align punctuation to now, punctuate after interval has elapsed + return schedule(time.milliseconds() + interval, interval, type, punctuator); + default: + throw new IllegalArgumentException("Unrecognized PunctuationType: " + type); + } + } + + /** + * Schedules a punctuation for the processor + * + * @param startTime time of the first punctuation + * @param interval the interval in milliseconds + * @param type the punctuation type + * @throws IllegalStateException if the current node is not null + */ + Cancellable schedule(final long startTime, final long interval, final PunctuationType type, final Punctuator punctuator) { if (processorContext.currentNode() == null) { throw new IllegalStateException(String.format("%sCurrent node is null", logPrefix)); } - final PunctuationSchedule schedule = new 
PunctuationSchedule(processorContext.currentNode(), interval, punctuator); + final PunctuationSchedule schedule = new PunctuationSchedule(processorContext.currentNode(), startTime, interval, punctuator); switch (type) { case STREAM_TIME: + // STREAM_TIME punctuation is data driven, will first punctuate as soon as stream-time is known and >= time, + // stream-time is known when we have received at least one record from each input topic return streamTimePunctuationQueue.schedule(schedule); case WALL_CLOCK_TIME: + // WALL_CLOCK_TIME is driven by the wall clock time, will first punctuate when now >= time return systemTimePunctuationQueue.schedule(schedule); default: throw new IllegalArgumentException("Unrecognized PunctuationType: " + type); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java index 1570c9bdd13..09c7a0a3183 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java @@ -40,7 +40,7 @@ public void punctuate(long timestamp) { } }; - final PunctuationSchedule sched = new PunctuationSchedule(node, 100L, punctuator); + final PunctuationSchedule sched = new PunctuationSchedule(node, 0L, 100L, punctuator); final long now = sched.timestamp - 100L; queue.schedule(sched); @@ -66,6 +66,64 @@ public void punctuate(ProcessorNode node, long time, PunctuationType type, Punct queue.mayPunctuate(now + 200L, PunctuationType.STREAM_TIME, processorNodePunctuator); assertEquals(2, processor.punctuatedAt.size()); + + queue.mayPunctuate(now + 1001L, PunctuationType.STREAM_TIME, processorNodePunctuator); + assertEquals(3, processor.punctuatedAt.size()); + + queue.mayPunctuate(now + 1002L, PunctuationType.STREAM_TIME, processorNodePunctuator); + assertEquals(3, 
processor.punctuatedAt.size()); + + queue.mayPunctuate(now + 1100L, PunctuationType.STREAM_TIME, processorNodePunctuator); + assertEquals(4, processor.punctuatedAt.size()); + } + + @Test + public void testPunctuationIntervalCustomAlignment() { + final TestProcessor processor = new TestProcessor(); + final ProcessorNode<String, String> node = new ProcessorNode<>("test", processor, null); + final PunctuationQueue queue = new PunctuationQueue(); + final Punctuator punctuator = new Punctuator() { + @Override + public void punctuate(long timestamp) { + node.processor().punctuate(timestamp); + } + }; + + final PunctuationSchedule sched = new PunctuationSchedule(node, 50L, 100L, punctuator); + final long now = sched.timestamp - 50L; + + queue.schedule(sched); + + ProcessorNodePunctuator processorNodePunctuator = new ProcessorNodePunctuator() { + @Override + public void punctuate(ProcessorNode node, long time, PunctuationType type, Punctuator punctuator) { + punctuator.punctuate(time); + } + }; + + queue.mayPunctuate(now, PunctuationType.STREAM_TIME, processorNodePunctuator); + assertEquals(0, processor.punctuatedAt.size()); + + queue.mayPunctuate(now + 49L, PunctuationType.STREAM_TIME, processorNodePunctuator); + assertEquals(0, processor.punctuatedAt.size()); + + queue.mayPunctuate(now + 50L, PunctuationType.STREAM_TIME, processorNodePunctuator); + assertEquals(1, processor.punctuatedAt.size()); + + queue.mayPunctuate(now + 149L, PunctuationType.STREAM_TIME, processorNodePunctuator); + assertEquals(1, processor.punctuatedAt.size()); + + queue.mayPunctuate(now + 150L, PunctuationType.STREAM_TIME, processorNodePunctuator); + assertEquals(2, processor.punctuatedAt.size()); + + queue.mayPunctuate(now + 1051L, PunctuationType.STREAM_TIME, processorNodePunctuator); + assertEquals(3, processor.punctuatedAt.size()); + + queue.mayPunctuate(now + 1052L, PunctuationType.STREAM_TIME, processorNodePunctuator); + assertEquals(3, processor.punctuatedAt.size()); + + 
queue.mayPunctuate(now + 1150L, PunctuationType.STREAM_TIME, processorNodePunctuator); + assertEquals(4, processor.punctuatedAt.size()); } private static class TestProcessor extends AbstractProcessor<String, String> { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index 92cfe66c3f6..1165d76cf2e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -322,63 +322,173 @@ public void testMaybePunctuateStreamTime() { task.initializeTopology(); task.addRecords(partition1, records( + new ConsumerRecord<>(partition1.topic(), partition1.partition(), 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), - new ConsumerRecord<>(partition1.topic(), partition1.partition(), 30, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), - new ConsumerRecord<>(partition1.topic(), partition1.partition(), 40, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue) + new ConsumerRecord<>(partition1.topic(), partition1.partition(), 32, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), + new ConsumerRecord<>(partition1.topic(), partition1.partition(), 40, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), + new ConsumerRecord<>(partition1.topic(), partition1.partition(), 60, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue) )); task.addRecords(partition2, records( new ConsumerRecord<>(partition2.topic(), partition2.partition(), 25, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), new ConsumerRecord<>(partition2.topic(), partition2.partition(), 35, 0L, TimestampType.CREATE_TIME, 0L, 0, 
0, recordKey, recordValue), - new ConsumerRecord<>(partition2.topic(), partition2.partition(), 45, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue) + new ConsumerRecord<>(partition2.topic(), partition2.partition(), 45, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), + new ConsumerRecord<>(partition2.topic(), partition2.partition(), 61, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue) )); assertTrue(task.maybePunctuateStreamTime()); assertTrue(task.process()); - assertEquals(5, task.numBuffered()); + assertEquals(8, task.numBuffered()); assertEquals(1, source1.numReceived); assertEquals(0, source2.numReceived); + assertTrue(task.maybePunctuateStreamTime()); + + assertTrue(task.process()); + assertEquals(7, task.numBuffered()); + assertEquals(2, source1.numReceived); + assertEquals(0, source2.numReceived); + assertFalse(task.maybePunctuateStreamTime()); assertTrue(task.process()); - assertEquals(4, task.numBuffered()); - assertEquals(1, source1.numReceived); + assertEquals(6, task.numBuffered()); + assertEquals(2, source1.numReceived); + assertEquals(1, source2.numReceived); + + assertTrue(task.maybePunctuateStreamTime()); + + assertTrue(task.process()); + assertEquals(5, task.numBuffered()); + assertEquals(3, source1.numReceived); assertEquals(1, source2.numReceived); + assertFalse(task.maybePunctuateStreamTime()); + + assertTrue(task.process()); + assertEquals(4, task.numBuffered()); + assertEquals(3, source1.numReceived); + assertEquals(2, source2.numReceived); + assertTrue(task.maybePunctuateStreamTime()); assertTrue(task.process()); assertEquals(3, task.numBuffered()); + assertEquals(4, source1.numReceived); + assertEquals(2, source2.numReceived); + + assertFalse(task.maybePunctuateStreamTime()); + + assertTrue(task.process()); + assertEquals(2, task.numBuffered()); + assertEquals(4, source1.numReceived); + assertEquals(3, source2.numReceived); + + assertTrue(task.maybePunctuateStreamTime()); + + 
assertTrue(task.process()); + assertEquals(1, task.numBuffered()); + assertEquals(5, source1.numReceived); + assertEquals(3, source2.numReceived); + + assertFalse(task.maybePunctuateStreamTime()); + + assertTrue(task.process()); + assertEquals(0, task.numBuffered()); + assertEquals(5, source1.numReceived); + assertEquals(4, source2.numReceived); + + assertFalse(task.process()); + assertFalse(task.maybePunctuateStreamTime()); + + processorStreamTime.supplier.checkAndClearPunctuateResult(PunctuationType.STREAM_TIME, 0L, 20L, 32L, 40L, 60L); + } + + @SuppressWarnings("unchecked") + @Test + public void shouldPunctuateOnceStreamTimeAfterGap() { + task = createStatelessTask(false); + task.initializeStateStores(); + task.initializeTopology(); + + task.addRecords(partition1, records( + new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), + new ConsumerRecord<>(partition1.topic(), partition1.partition(), 142, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), + new ConsumerRecord<>(partition1.topic(), partition1.partition(), 155, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), + new ConsumerRecord<>(partition1.topic(), partition1.partition(), 160, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue) + )); + + task.addRecords(partition2, records( + new ConsumerRecord<>(partition2.topic(), partition2.partition(), 25, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), + new ConsumerRecord<>(partition2.topic(), partition2.partition(), 145, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), + new ConsumerRecord<>(partition2.topic(), partition2.partition(), 159, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), + new ConsumerRecord<>(partition2.topic(), partition2.partition(), 161, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue) + )); + + assertTrue(task.maybePunctuateStreamTime()); // 
punctuate at 20 + + assertTrue(task.process()); + assertEquals(7, task.numBuffered()); + assertEquals(1, source1.numReceived); + assertEquals(0, source2.numReceived); + + assertFalse(task.maybePunctuateStreamTime()); + + assertTrue(task.process()); + assertEquals(6, task.numBuffered()); + assertEquals(1, source1.numReceived); + assertEquals(1, source2.numReceived); + + assertTrue(task.maybePunctuateStreamTime()); // punctuate at 142 + + // only one punctuation after 100ms gap + assertFalse(task.maybePunctuateStreamTime()); + + assertTrue(task.process()); + assertEquals(5, task.numBuffered()); assertEquals(2, source1.numReceived); assertEquals(1, source2.numReceived); assertFalse(task.maybePunctuateStreamTime()); assertTrue(task.process()); - assertEquals(2, task.numBuffered()); + assertEquals(4, task.numBuffered()); assertEquals(2, source1.numReceived); assertEquals(2, source2.numReceived); - assertTrue(task.maybePunctuateStreamTime()); + assertTrue(task.maybePunctuateStreamTime()); // punctuate at 155 assertTrue(task.process()); - assertEquals(1, task.numBuffered()); + assertEquals(3, task.numBuffered()); assertEquals(3, source1.numReceived); assertEquals(2, source2.numReceived); assertFalse(task.maybePunctuateStreamTime()); assertTrue(task.process()); - assertEquals(0, task.numBuffered()); + assertEquals(2, task.numBuffered()); assertEquals(3, source1.numReceived); assertEquals(3, source2.numReceived); + assertTrue(task.maybePunctuateStreamTime()); // punctuate at 160, still aligned on the initial punctuation + + assertTrue(task.process()); + assertEquals(1, task.numBuffered()); + assertEquals(4, source1.numReceived); + assertEquals(3, source2.numReceived); + + assertFalse(task.maybePunctuateStreamTime()); + + assertTrue(task.process()); + assertEquals(0, task.numBuffered()); + assertEquals(4, source1.numReceived); + assertEquals(4, source2.numReceived); + assertFalse(task.process()); assertFalse(task.maybePunctuateStreamTime()); - 
processorStreamTime.supplier.checkAndClearPunctuateResult(PunctuationType.STREAM_TIME, 20L, 30L, 40L); + processorStreamTime.supplier.checkAndClearPunctuateResult(PunctuationType.STREAM_TIME, 20L, 142L, 155L, 160L); } @SuppressWarnings("unchecked") @@ -425,9 +535,14 @@ public void shouldPunctuateSystemTimeWhenIntervalElapsed() { assertTrue(task.maybePunctuateSystemTime()); time.sleep(10); assertTrue(task.maybePunctuateSystemTime()); - time.sleep(10); + time.sleep(9); + assertFalse(task.maybePunctuateSystemTime()); + time.sleep(1); + assertTrue(task.maybePunctuateSystemTime()); + time.sleep(20); assertTrue(task.maybePunctuateSystemTime()); - processorSystemTime.supplier.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, now + 10, now + 20, now + 30); + assertFalse(task.maybePunctuateSystemTime()); + processorSystemTime.supplier.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, now + 10, now + 20, now + 30, now + 50); } @Test @@ -435,11 +550,36 @@ public void shouldNotPunctuateSystemTimeWhenIntervalNotElapsed() { task = createStatelessTask(false); task.initializeStateStores(); task.initializeTopology(); - long now = time.milliseconds(); - assertTrue(task.maybePunctuateSystemTime()); // first time we always punctuate + assertFalse(task.maybePunctuateSystemTime()); time.sleep(9); assertFalse(task.maybePunctuateSystemTime()); - processorSystemTime.supplier.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, now); + processorSystemTime.supplier.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME); + } + + @Test + public void shouldPunctuateOnceSystemTimeAfterGap() { + task = createStatelessTask(false); + task.initializeStateStores(); + task.initializeTopology(); + long now = time.milliseconds(); + time.sleep(100); + assertTrue(task.maybePunctuateSystemTime()); + assertFalse(task.maybePunctuateSystemTime()); + time.sleep(10); + assertTrue(task.maybePunctuateSystemTime()); + time.sleep(12); + 
assertTrue(task.maybePunctuateSystemTime()); + time.sleep(7); + assertFalse(task.maybePunctuateSystemTime()); + time.sleep(1); // punctuate at now + 130 + assertTrue(task.maybePunctuateSystemTime()); + time.sleep(105); // punctuate at now + 235 + assertTrue(task.maybePunctuateSystemTime()); + assertFalse(task.maybePunctuateSystemTime()); + time.sleep(5); // punctuate at now + 240, still aligned on the initial punctuation + assertTrue(task.maybePunctuateSystemTime()); + assertFalse(task.maybePunctuateSystemTime()); + processorSystemTime.supplier.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, now + 100, now + 110, now + 122, now + 130, now + 235, now + 240); } @Test diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java index 4100f186566..5073efdb64c 100644 --- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java +++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java @@ -593,36 +593,35 @@ public void shouldPunctuateOnStreamsTime() { testDriver.pipeInput(consumerRecordFactory.create(SOURCE_TOPIC_1, key1, value1, 42L)); assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations)); + expectedPunctuations.add(51L); testDriver.pipeInput(consumerRecordFactory.create(SOURCE_TOPIC_1, key1, value1, 51L)); assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations)); - expectedPunctuations.add(52L); testDriver.pipeInput(consumerRecordFactory.create(SOURCE_TOPIC_1, key1, value1, 52L)); assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations)); + expectedPunctuations.add(61L); testDriver.pipeInput(consumerRecordFactory.create(SOURCE_TOPIC_1, key1, value1, 61L)); assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations)); - expectedPunctuations.add(65L); testDriver.pipeInput(consumerRecordFactory.create(SOURCE_TOPIC_1, 
key1, value1, 65L)); assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations)); + expectedPunctuations.add(71L); testDriver.pipeInput(consumerRecordFactory.create(SOURCE_TOPIC_1, key1, value1, 71L)); assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations)); - expectedPunctuations.add(72L); testDriver.pipeInput(consumerRecordFactory.create(SOURCE_TOPIC_1, key1, value1, 72L)); assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations)); - expectedPunctuations.add(95L); expectedPunctuations.add(95L); testDriver.pipeInput(consumerRecordFactory.create(SOURCE_TOPIC_1, key1, value1, 95L)); assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations)); + expectedPunctuations.add(101L); testDriver.pipeInput(consumerRecordFactory.create(SOURCE_TOPIC_1, key1, value1, 101L)); assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations)); - expectedPunctuations.add(102L); testDriver.pipeInput(consumerRecordFactory.create(SOURCE_TOPIC_1, key1, value1, 102L)); assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations)); } @@ -637,21 +636,23 @@ public void shouldPunctuateOnWallClockTime() { final List<Long> expectedPunctuations = new LinkedList<>(); - expectedPunctuations.add(5L); testDriver.advanceWallClockTime(5L); assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations)); + expectedPunctuations.add(14L); testDriver.advanceWallClockTime(9L); assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations)); - expectedPunctuations.add(15L); testDriver.advanceWallClockTime(1L); assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations)); - expectedPunctuations.add(35L); expectedPunctuations.add(35L); testDriver.advanceWallClockTime(20L); assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations)); + + expectedPunctuations.add(40L); + testDriver.advanceWallClockTime(5L); + assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations)); } @Test 
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > punctuate with WALL_CLOCK_TIME triggered immediately > ---------------------------------------------------- > > Key: KAFKA-6323 > URL: https://issues.apache.org/jira/browse/KAFKA-6323 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 1.0.0 > Reporter: Frederic Arno > Assignee: Frederic Arno > Priority: Major > Fix For: 1.1.0, 1.0.1 > > > When working on a custom Processor from which I am scheduling a punctuation > using WALL_CLOCK_TIME, I've noticed that whatever punctuation interval I > set, a call to my Punctuator is always triggered immediately. > Having a quick look at kafka-streams' code, I found that all > PunctuationSchedule's timestamps are matched against the current time in > order to decide whether or not to trigger the punctuator > (org.apache.kafka.streams.processor.internals.PunctuationQueue#mayPunctuate). > However, I've only seen code that initializes PunctuationSchedule's timestamp > to 0, which I guess is what is causing an immediate punctuation. > At least when using WALL_CLOCK_TIME, shouldn't the PunctuationSchedule's > timestamp be initialized to current time + interval? -- This message was sent by Atlassian JIRA (v7.6.3#76005)