Repository: kafka
Updated Branches:
  refs/heads/trunk d54616bc3 -> 53fd22a76
KAFKA-3716; Validate all timestamps are not negative Author: Guozhang Wang <wangg...@gmail.com> Reviewers: Eno Thereska, Ismael Juma Closes #1393 from guozhangwang/K3716-check-non-negative-timestamps Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/53fd22a7 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/53fd22a7 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/53fd22a7 Branch: refs/heads/trunk Commit: 53fd22a76613b309b7941a5b0c64f17523b39202 Parents: d54616b Author: Guozhang Wang <wangg...@gmail.com> Authored: Tue May 17 11:25:49 2016 -0700 Committer: Guozhang Wang <wangg...@gmail.com> Committed: Tue May 17 11:25:49 2016 -0700 ---------------------------------------------------------------------- .../main/java/org/apache/kafka/streams/kstream/Windows.java | 6 +----- .../apache/kafka/streams/processor/internals/RecordQueue.java | 5 +++++ .../apache/kafka/streams/processor/internals/StreamTask.java | 7 ++++++- 3 files changed, 12 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/53fd22a7/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java index 06cacb4..c64a80f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java @@ -17,9 +17,7 @@ package org.apache.kafka.streams.kstream; - import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; /** * The window specification interface that can be extended for windowing operation in joins and aggregations. 
@@ -32,8 +30,6 @@ public abstract class Windows<W extends Window> { private static final long DEFAULT_MAINTAIN_DURATION = 24 * 60 * 60 * 1000L; // one day - private static final AtomicInteger NAME_INDEX = new AtomicInteger(0); - protected String name; private long maintainDurationMs; @@ -86,7 +82,7 @@ public abstract class Windows<W extends Window> { } /** - * Creates all windows that contain the provided timestamp. + * Creates all windows that contain the provided timestamp, indexed by non-negative window start timestamps. * * @param timestamp the timestamp window should get created for * @return a map of {@code windowStartTimestamp -> Window} entries http://git-wip-us.apache.org/repos/asf/kafka/blob/53fd22a7/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java index 6911a45..7e5baf3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java @@ -20,6 +20,7 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.TimestampExtractor; import java.util.ArrayDeque; @@ -84,6 +85,10 @@ public class RecordQueue { rawRecord.serializedValueSize(), key, value); long timestamp = timestampExtractor.extract(record); + // validate that timestamp must be non-negative + if (timestamp < 0) + throw new StreamsException("Extracted timestamp value is negative, which is not allowed."); + StampedRecord stampedRecord = new 
StampedRecord(record, timestamp); fifoQueue.addLast(stampedRecord); http://git-wip-us.apache.org/repos/asf/kafka/blob/53fd22a7/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index d9efb6d..e7e24fb 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -209,7 +209,12 @@ public class StreamTask extends AbstractTask implements Punctuator { public boolean maybePunctuate() { long timestamp = partitionGroup.timestamp(); - return punctuationQueue.mayPunctuate(timestamp, this); + // if the timestamp is not known yet, meaning there is not enough data accumulated + // to reason stream partition time, then skip. + if (timestamp == TimestampTracker.NOT_KNOWN) + return false; + else + return punctuationQueue.mayPunctuate(timestamp, this); } /**