This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.1 by this push: new 772d764 KAFKA-8615: Change to track partition time breaks TimestampExtractor (#7054) 772d764 is described below commit 772d7647fe3a7fe3c3ff65a0348f3aa61664ef43 Author: A. Sophie Blee-Goldman <sop...@confluent.io> AuthorDate: Thu Jul 18 13:54:46 2019 -0700 KAFKA-8615: Change to track partition time breaks TimestampExtractor (#7054) The timestamp extractor takes a previousTimestamp parameter which should be the partition time. This PR adds back in partition time tracking for the extractor, and renames previousTimestamp --> partitionTime Reviewers: Guozhang Wang <wangg...@gmail.com>, Bill Bejeck <bbej...@gmail.com>, Matthias J. Sax <mj...@apache.org> --- .../examples/pageview/JsonTimestampExtractor.java | 2 +- .../processor/ExtractRecordMetadataTimestamp.java | 10 +- .../streams/processor/FailOnInvalidTimestamp.java | 4 +- .../processor/LogAndSkipOnInvalidTimestamp.java | 4 +- .../streams/processor/TimestampExtractor.java | 4 +- .../UsePreviousTimeOnInvalidTimestamp.java | 10 +- .../processor/WallclockTimestampExtractor.java | 4 +- .../processor/internals/PartitionGroup.java | 36 ++++--- .../streams/processor/internals/RecordQueue.java | 23 ++++- .../streams/processor/internals/StreamTask.java | 10 +- .../apache/kafka/streams/StreamsConfigTest.java | 2 +- .../processor/internals/PartitionGroupTest.java | 8 +- .../processor/internals/RecordQueueTest.java | 106 ++++++++++++++++++--- .../apache/kafka/test/MockTimestampExtractor.java | 2 +- 14 files changed, 166 insertions(+), 59 deletions(-) diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java index 4f6257a..d760183 100644 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java +++ 
b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java @@ -27,7 +27,7 @@ import org.apache.kafka.streams.processor.TimestampExtractor; public class JsonTimestampExtractor implements TimestampExtractor { @Override - public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) { + public long extract(final ConsumerRecord<Object, Object> record, final long partitionTime) { if (record.value() instanceof PageViewTypedDemo.PageView) { return ((PageViewTypedDemo.PageView) record.value()).timestamp; } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ExtractRecordMetadataTimestamp.java b/streams/src/main/java/org/apache/kafka/streams/processor/ExtractRecordMetadataTimestamp.java index 79c8dd3..3c7428a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/ExtractRecordMetadataTimestamp.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/ExtractRecordMetadataTimestamp.java @@ -50,15 +50,15 @@ abstract class ExtractRecordMetadataTimestamp implements TimestampExtractor { * Extracts the embedded metadata timestamp from the given {@link ConsumerRecord}. 
* * @param record a data record - * @param previousTimestamp the latest extracted valid timestamp of the current record's partition˙ (could be -1 if unknown) + * @param partitionTime the highest extracted valid timestamp of the current record's partition (could be -1 if unknown) * @return the embedded metadata timestamp of the given {@link ConsumerRecord} */ @Override - public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) { + public long extract(final ConsumerRecord<Object, Object> record, final long partitionTime) { final long timestamp = record.timestamp(); if (timestamp < 0) { - return onInvalidTimestamp(record, timestamp, previousTimestamp); + return onInvalidTimestamp(record, timestamp, partitionTime); } return timestamp; @@ -69,10 +69,10 @@ abstract class ExtractRecordMetadataTimestamp implements TimestampExtractor { * * @param record a data record * @param recordTimestamp the timestamp extractor from the record - * @param previousTimestamp the latest extracted valid timestamp of the current record's partition˙ (could be -1 if unknown) + * @param partitionTime the highest extracted valid timestamp of the current record's partition (could be -1 if unknown) * @return a new timestamp for the record (if negative, record will not be processed but dropped silently) */ public abstract long onInvalidTimestamp(final ConsumerRecord<Object, Object> record, final long recordTimestamp, - final long previousTimestamp); + final long partitionTime); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/FailOnInvalidTimestamp.java b/streams/src/main/java/org/apache/kafka/streams/processor/FailOnInvalidTimestamp.java index 87cb0de..40d3e0e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/FailOnInvalidTimestamp.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/FailOnInvalidTimestamp.java @@ -54,14 +54,14 @@ public class FailOnInvalidTimestamp extends 
ExtractRecordMetadataTimestamp { * * @param record a data record * @param recordTimestamp the timestamp extractor from the record - * @param previousTimestamp the latest extracted valid timestamp of the current record's partition˙ (could be -1 if unknown) + * @param partitionTime the highest extracted valid timestamp of the current record's partition (could be -1 if unknown) * @return nothing; always raises an exception * @throws StreamsException on every invocation */ @Override public long onInvalidTimestamp(final ConsumerRecord<Object, Object> record, final long recordTimestamp, - final long previousTimestamp) + final long partitionTime) throws StreamsException { final String message = "Input record " + record + " has invalid (negative) timestamp. " + diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/LogAndSkipOnInvalidTimestamp.java b/streams/src/main/java/org/apache/kafka/streams/processor/LogAndSkipOnInvalidTimestamp.java index 0561e61..b759e5b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/LogAndSkipOnInvalidTimestamp.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/LogAndSkipOnInvalidTimestamp.java @@ -56,13 +56,13 @@ public class LogAndSkipOnInvalidTimestamp extends ExtractRecordMetadataTimestamp * * @param record a data record * @param recordTimestamp the timestamp extractor from the record - * @param previousTimestamp the latest extracted valid timestamp of the current record's partition˙ (could be -1 if unknown) + * @param partitionTime the highest extracted valid timestamp of the current record's partition (could be -1 if unknown) * @return the originally extracted timestamp of the record */ @Override public long onInvalidTimestamp(final ConsumerRecord<Object, Object> record, final long recordTimestamp, - final long previousTimestamp) { + final long partitionTime) { log.warn("Input record {} will be dropped because it has an invalid (negative) timestamp.", record); return recordTimestamp; } 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java index 0780dc0..1e6d6cd 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java @@ -46,8 +46,8 @@ public interface TimestampExtractor { * * * @param record a data record - * @param previousTimestamp the latest extracted valid timestamp of the current record's partition˙ (could be -1 if unknown) + * @param partitionTime the highest extracted valid timestamp of the current record's partition (could be -1 if unknown) * @return the timestamp of the record */ - long extract(ConsumerRecord<Object, Object> record, long previousTimestamp); + long extract(ConsumerRecord<Object, Object> record, long partitionTime); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/UsePreviousTimeOnInvalidTimestamp.java b/streams/src/main/java/org/apache/kafka/streams/processor/UsePreviousTimeOnInvalidTimestamp.java index dd952cc..89e2fd3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/UsePreviousTimeOnInvalidTimestamp.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/UsePreviousTimeOnInvalidTimestamp.java @@ -51,20 +51,20 @@ public class UsePreviousTimeOnInvalidTimestamp extends ExtractRecordMetadataTime * * @param record a data record * @param recordTimestamp the timestamp extractor from the record - * @param previousTimestamp the latest extracted valid timestamp of the current record's partition˙ (could be -1 if unknown) + * @param partitionTime the highest extracted valid timestamp of the current record's partition (could be -1 if unknown) * @return the provided latest extracted valid timestamp as new timestamp for the record * @throws StreamsException if latest extracted valid timestamp is unknown */ @Override public long 
onInvalidTimestamp(final ConsumerRecord<Object, Object> record, final long recordTimestamp, - final long previousTimestamp) + final long partitionTime) throws StreamsException { - if (previousTimestamp < 0) { + if (partitionTime < 0) { throw new StreamsException("Could not infer new timestamp for input record " + record - + " because latest extracted valid timestamp is unknown."); + + " because partition time is unknown."); } - return previousTimestamp; + return partitionTime; } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java index ad3b3bc..baa1cb6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java @@ -38,11 +38,11 @@ public class WallclockTimestampExtractor implements TimestampExtractor { * Return the current wall clock time as timestamp. 
* * @param record a data record - * @param previousTimestamp the latest extracted valid timestamp of the current record's partition˙ (could be -1 if unknown) + * @param partitionTime the highest extracted valid timestamp of the current record's partition (could be -1 if unknown) * @return the current wall clock time, expressed in milliseconds since midnight, January 1, 1970 UTC */ @Override - public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) { + public long extract(final ConsumerRecord<Object, Object> record, final long partitionTime) { return System.currentTimeMillis(); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java index 1fdd454..1311354 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java @@ -31,10 +31,24 @@ import java.util.Set; * group, a.k.a. the stream time of the associated task. It is defined as the maximum timestamp of * all the records having been retrieved for processing from this PartitionGroup so far. * - * We decide from which partition to retrieve the next record to process based on partitions' timestamps. - * The timestamp of a specific partition is initialized as UNKNOWN (-1), and is updated with the head record's timestamp - * if it is smaller (i.e. it should be monotonically increasing); when the partition's buffer becomes empty and there is - * no head record, the partition's timestamp will not be updated any more. + * In other words, it represents the "same" partition over multiple co-partitioned topics, and it is used + * to buffer records from that partition in each of the contained topic-partitions. + * Each StreamTask has exactly one PartitionGroup. 
+ * + * PartitionGroup implements the algorithm that determines in what order buffered records are selected for processing. + * + * Specifically, when polled, it returns the record from the topic-partition with the lowest stream-time. + * Stream-time for a topic-partition is defined as the highest timestamp + * yet observed at the head of that topic-partition. + * + * PartitionGroup also maintains a stream-time for the group as a whole. + * This is defined as the highest timestamp of any record yet polled from the PartitionGroup. + * Note however that any computation that depends on stream-time should track it on a per-operator basis to obtain an + * accurate view of the local time as seen by that processor. + * + * The PartitionGroup's stream-time is initially UNKNOWN (-1), and it is set to a known value upon first poll. + * As a consequence of the definition, the PartitionGroup's stream-time is non-decreasing + * (i.e., it increases or stays the same over time). */ public class PartitionGroup { @@ -64,7 +78,7 @@ public class PartitionGroup { } PartitionGroup(final Map<TopicPartition, RecordQueue> partitionQueues, final Sensor recordLatenessSensor) { - nonEmptyQueuesByTime = new PriorityQueue<>(partitionQueues.size(), Comparator.comparingLong(RecordQueue::timestamp)); + nonEmptyQueuesByTime = new PriorityQueue<>(partitionQueues.size(), Comparator.comparingLong(RecordQueue::headRecordTimestamp)); this.partitionQueues = partitionQueues; this.recordLatenessSensor = recordLatenessSensor; totalBuffered = 0; @@ -97,7 +111,7 @@ public class PartitionGroup { nonEmptyQueuesByTime.offer(queue); } - // always update the stream time to the record's timestamp yet to be processed if it is larger + // always update the stream-time to the record's timestamp yet to be processed if it is larger if (record.timestamp > streamTime) { streamTime = record.timestamp; recordLatenessSensor.record(0); @@ -128,8 +142,8 @@ public class PartitionGroup { nonEmptyQueuesByTime.offer(recordQueue); 
// if all partitions now are non-empty, set the flag - // we do not need to update the stream time here since this task will definitely be - // processed next, and hence the stream time will be updated when we retrieved records by then + // we do not need to update the stream-time here since this task will definitely be + // processed next, and hence the stream-time will be updated when we retrieved records by then if (nonEmptyQueuesByTime.size() == this.partitionQueues.size()) { allBuffered = true; } @@ -145,10 +159,9 @@ public class PartitionGroup { } /** - * Return the timestamp of this partition group as the smallest - * partition timestamp among all its partitions + * Return the stream-time of this partition group defined as the largest timestamp seen across all partitions */ - public long timestamp() { + public long streamTime() { return streamTime; } @@ -180,6 +193,7 @@ public class PartitionGroup { public void clear() { nonEmptyQueuesByTime.clear(); + streamTime = RecordQueue.UNKNOWN; for (final RecordQueue queue : partitionQueues.values()) { queue.clear(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java index 7f3c08d..c182313 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java @@ -31,8 +31,8 @@ import java.util.ArrayDeque; /** * RecordQueue is a FIFO queue of {@link StampedRecord} (ConsumerRecord + timestamp). It also keeps track of the - * partition timestamp defined as the minimum timestamp of records in its queue; in addition, its partition - * timestamp is monotonically increasing such that once it is advanced, it will not be decremented. + * partition timestamp defined as the largest timestamp seen on the partition so far; this is passed to the + * timestamp extractor. 
*/ public class RecordQueue { @@ -47,6 +47,7 @@ public class RecordQueue { private final ArrayDeque<ConsumerRecord<byte[], byte[]>> fifoQueue; private StampedRecord headRecord = null; + private long partitionTime = RecordQueue.UNKNOWN; RecordQueue(final TopicPartition partition, final SourceNode source, @@ -136,20 +137,30 @@ public class RecordQueue { } /** - * Returns the tracked partition timestamp + * Returns the head record's timestamp * * @return timestamp */ - public long timestamp() { + public long headRecordTimestamp() { return headRecord == null ? UNKNOWN : headRecord.timestamp; } /** + * Returns the tracked partition time + * + * @return partition time + */ + long partitionTime() { + return partitionTime; + } + + /** * Clear the fifo queue of its elements, also clear the time tracker's kept stamped elements */ public void clear() { fifoQueue.clear(); headRecord = null; + partitionTime = RecordQueue.UNKNOWN; } private void updateHead() { @@ -164,7 +175,7 @@ public class RecordQueue { final long timestamp; try { - timestamp = timestampExtractor.extract(deserialized, timestamp()); + timestamp = timestampExtractor.extract(deserialized, partitionTime); } catch (final StreamsException internalFatalExtractorException) { throw internalFatalExtractorException; } catch (final Exception fatalUserException) { @@ -185,6 +196,8 @@ public class RecordQueue { } headRecord = new StampedRecord(deserialized, timestamp); + + partitionTime = Math.max(partitionTime, timestamp); } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index e78cd36..5ee00e8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -788,14 +788,14 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator * @throws 
TaskMigratedException if the task producer got fenced (EOS only) */ public boolean maybePunctuateStreamTime() { - final long timestamp = partitionGroup.timestamp(); + final long streamTime = partitionGroup.streamTime(); // if the timestamp is not known yet, meaning there is not enough data accumulated // to reason stream partition time, then skip. - if (timestamp == RecordQueue.UNKNOWN) { + if (streamTime == RecordQueue.UNKNOWN) { return false; } else { - final boolean punctuated = streamTimePunctuationQueue.mayPunctuate(timestamp, PunctuationType.STREAM_TIME, this); + final boolean punctuated = streamTimePunctuationQueue.mayPunctuate(streamTime, PunctuationType.STREAM_TIME, this); if (punctuated) { commitNeeded = true; @@ -813,9 +813,9 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator * @throws TaskMigratedException if the task producer got fenced (EOS only) */ public boolean maybePunctuateSystemTime() { - final long timestamp = time.milliseconds(); + final long systemTime = time.milliseconds(); - final boolean punctuated = systemTimePunctuationQueue.mayPunctuate(timestamp, PunctuationType.WALL_CLOCK_TIME, this); + final boolean punctuated = systemTimePunctuationQueue.mayPunctuate(systemTime, PunctuationType.WALL_CLOCK_TIME, this); if (punctuated) { commitNeeded = true; diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java index 38563be..138f901 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java @@ -658,7 +658,7 @@ public class StreamsConfigTest { public static class MockTimestampExtractor implements TimestampExtractor { @Override - public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) { + public long extract(final ConsumerRecord<Object, Object> record, final long partitionTime) { 
return 0; } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java index 6b95bdf..cfc814f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java @@ -107,7 +107,7 @@ public class PartitionGroupTest { // st: -1 since no records was being processed yet verifyBuffered(6, 3, 3); - assertEquals(-1L, group.timestamp()); + assertEquals(-1L, group.streamTime()); assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue()); StampedRecord record; @@ -143,7 +143,7 @@ public class PartitionGroupTest { // 2:[4, 6] // st: 2 (just adding records shouldn't change it) verifyBuffered(6, 4, 2); - assertEquals(2L, group.timestamp()); + assertEquals(2L, group.streamTime()); assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue()); // get one record, time should be advanced @@ -221,7 +221,7 @@ public class PartitionGroupTest { group.addRawRecords(partition1, list1); verifyBuffered(3, 3, 0); - assertEquals(-1L, group.timestamp()); + assertEquals(-1L, group.streamTime()); assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue()); StampedRecord record; @@ -258,7 +258,7 @@ public class PartitionGroupTest { private void verifyTimes(final StampedRecord record, final long recordTime, final long streamTime) { assertEquals(recordTime, record.timestamp); - assertEquals(streamTime, group.timestamp()); + assertEquals(streamTime, group.streamTime()); } private void verifyBuffered(final int totalBuffered, final int partitionOneBuffered, final int partitionTwoBuffered) { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java index c16cb2a..6dadb49 100644 --- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java @@ -101,7 +101,7 @@ public class RecordQueueTest { assertTrue(queue.isEmpty()); assertEquals(0, queue.size()); - assertEquals(RecordQueue.UNKNOWN, queue.timestamp()); + assertEquals(RecordQueue.UNKNOWN, queue.headRecordTimestamp()); // add three 3 out-of-order records with timestamp 2, 1, 3 final List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList( @@ -112,17 +112,17 @@ public class RecordQueueTest { queue.addRawRecords(list1); assertEquals(3, queue.size()); - assertEquals(2L, queue.timestamp()); + assertEquals(2L, queue.headRecordTimestamp()); // poll the first record, now with 1, 3 assertEquals(2L, queue.poll().timestamp); assertEquals(2, queue.size()); - assertEquals(1L, queue.timestamp()); + assertEquals(1L, queue.headRecordTimestamp()); // poll the second record, now with 3 assertEquals(1L, queue.poll().timestamp); assertEquals(1, queue.size()); - assertEquals(3L, queue.timestamp()); + assertEquals(3L, queue.headRecordTimestamp()); // add three 3 out-of-order records with timestamp 4, 1, 2 // now with 3, 4, 1, 2 @@ -134,24 +134,24 @@ public class RecordQueueTest { queue.addRawRecords(list2); assertEquals(4, queue.size()); - assertEquals(3L, queue.timestamp()); + assertEquals(3L, queue.headRecordTimestamp()); // poll the third record, now with 4, 1, 2 assertEquals(3L, queue.poll().timestamp); assertEquals(3, queue.size()); - assertEquals(4L, queue.timestamp()); + assertEquals(4L, queue.headRecordTimestamp()); // poll the rest records assertEquals(4L, queue.poll().timestamp); - assertEquals(1L, queue.timestamp()); + assertEquals(1L, queue.headRecordTimestamp()); assertEquals(1L, queue.poll().timestamp); - assertEquals(2L, queue.timestamp()); + assertEquals(2L, queue.headRecordTimestamp()); assertEquals(2L, queue.poll().timestamp); assertTrue(queue.isEmpty()); assertEquals(0, 
queue.size()); - assertEquals(RecordQueue.UNKNOWN, queue.timestamp()); + assertEquals(RecordQueue.UNKNOWN, queue.headRecordTimestamp()); // add three more records with 4, 5, 6 final List<ConsumerRecord<byte[], byte[]>> list3 = Arrays.asList( @@ -162,24 +162,51 @@ public class RecordQueueTest { queue.addRawRecords(list3); assertEquals(3, queue.size()); - assertEquals(4L, queue.timestamp()); + assertEquals(4L, queue.headRecordTimestamp()); // poll one record again, the timestamp should advance now assertEquals(4L, queue.poll().timestamp); assertEquals(2, queue.size()); - assertEquals(5L, queue.timestamp()); + assertEquals(5L, queue.headRecordTimestamp()); // clear the queue queue.clear(); assertTrue(queue.isEmpty()); assertEquals(0, queue.size()); - assertEquals(RecordQueue.UNKNOWN, queue.timestamp()); + assertEquals(RecordQueue.UNKNOWN, queue.headRecordTimestamp()); // re-insert the three records with 4, 5, 6 queue.addRawRecords(list3); assertEquals(3, queue.size()); - assertEquals(4L, queue.timestamp()); + assertEquals(4L, queue.headRecordTimestamp()); + } + + @Test + public void shouldTrackPartitionTimeAsMaxSeenTimestamp() { + + assertTrue(queue.isEmpty()); + assertEquals(0, queue.size()); + assertEquals(RecordQueue.UNKNOWN, queue.headRecordTimestamp()); + + // add four out-of-order records with timestamps 2, 1, 3, 4 + final List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList( + new ConsumerRecord<>("topic", 1, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), + new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), + new ConsumerRecord<>("topic", 1, 3, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), + new ConsumerRecord<>("topic", 1, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)); + + assertEquals(queue.partitionTime(), RecordQueue.UNKNOWN); + + queue.addRawRecords(list1); + + assertEquals(queue.partitionTime(), 2L); + + queue.poll(); + 
assertEquals(queue.partitionTime(), 2L); + + queue.poll(); + assertEquals(queue.partitionTime(), 3L); } @Test(expected = StreamsException.class) @@ -253,4 +280,57 @@ public class RecordQueueTest { assertEquals(0, queue.size()); } + + @Test + public void shouldPassPartitionTimeToTimestampExtractor() { + + final PartitionTimeTrackingTimestampExtractor timestampExtractor = new PartitionTimeTrackingTimestampExtractor(); + final RecordQueue queue = new RecordQueue( + new TopicPartition(topics[0], 1), + mockSourceNodeWithMetrics, + timestampExtractor, + new LogAndFailExceptionHandler(), + context, + new LogContext()); + + assertTrue(queue.isEmpty()); + assertEquals(0, queue.size()); + assertEquals(RecordQueue.UNKNOWN, queue.headRecordTimestamp()); + + // add four out-of-order records with timestamps 2, 1, 3, 4 + final List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList( + new ConsumerRecord<>("topic", 1, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), + new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), + new ConsumerRecord<>("topic", 1, 3, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), + new ConsumerRecord<>("topic", 1, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)); + + assertEquals(RecordQueue.UNKNOWN, timestampExtractor.partitionTime); + + queue.addRawRecords(list1); + + // no (known) timestamp has yet been passed to the timestamp extractor + assertEquals(RecordQueue.UNKNOWN, timestampExtractor.partitionTime); + + queue.poll(); + assertEquals(2L, timestampExtractor.partitionTime); + + queue.poll(); + assertEquals(2L, timestampExtractor.partitionTime); + + queue.poll(); + assertEquals(3L, timestampExtractor.partitionTime); + + } + + class PartitionTimeTrackingTimestampExtractor implements TimestampExtractor { + private long partitionTime = RecordQueue.UNKNOWN; + + public long extract(final ConsumerRecord<Object, Object> record, final long 
partitionTime) { + if (partitionTime < this.partitionTime) { + throw new IllegalStateException("Partition time should not decrease"); + } + this.partitionTime = partitionTime; + return record.offset(); + } + } } diff --git a/streams/src/test/java/org/apache/kafka/test/MockTimestampExtractor.java b/streams/src/test/java/org/apache/kafka/test/MockTimestampExtractor.java index 1701164..f437772 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockTimestampExtractor.java +++ b/streams/src/test/java/org/apache/kafka/test/MockTimestampExtractor.java @@ -23,7 +23,7 @@ import org.apache.kafka.streams.processor.TimestampExtractor; public class MockTimestampExtractor implements TimestampExtractor { @Override - public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) { + public long extract(final ConsumerRecord<Object, Object> record, final long partitionTime) { return record.offset(); } }