Repository: kafka Updated Branches: refs/heads/0.10.1 baae90a1e -> f3e5e6d65
HOTFIX: improve error message on invalid input record timestamp Author: Matthias J. Sax <matth...@confluent.io> Reviewers: Guozhang Wang <wangg...@gmail.com> Closes #2079 from mjsax/hotfixTSExtractor-0.10.1 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f3e5e6d6 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f3e5e6d6 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f3e5e6d6 Branch: refs/heads/0.10.1 Commit: f3e5e6d65d00204a4f15f60b1f534a3b03048e71 Parents: baae90a Author: Matthias J. Sax <matth...@confluent.io> Authored: Tue Nov 1 10:21:56 2016 -0700 Committer: Guozhang Wang <wangg...@gmail.com> Committed: Tue Nov 1 10:21:56 2016 -0700 ---------------------------------------------------------------------- .../streams/processor/internals/SinkNode.java | 12 +- .../processor/internals/SinkNodeTest.java | 145 +++++++++++++++++++ 2 files changed, 156 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/f3e5e6d6/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java index 2b5692d..c330ea9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java @@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.kstream.internals.ChangedSerializer; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StreamPartitioner; @@ -69,7 +70,16 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> { @Override public void process(final K key, final V value) { RecordCollector collector = ((RecordCollector.Supplier) context).recordCollector(); - collector.send(new ProducerRecord<>(topic, null, context.timestamp(), key, value), keySerializer, valSerializer, partitioner); + + final long timestamp = context.timestamp(); + if (timestamp < 0) { + throw new StreamsException("A record consumed from an input topic has invalid (negative) timestamp, " + + "possibly because a pre-0.10 producer client was used to write this record to Kafka without embedding a timestamp, " + + "or because the input topic was created before upgrading the Kafka cluster to 0.10+. " + + "Use a different TimestampExtractor to process this data."); + } + + collector.send(new ProducerRecord<K, V>(topic, null, timestamp, key, value), keySerializer, valSerializer, partitioner); } @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/f3e5e6d6/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java new file mode 100644 index 0000000..3b41517 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java @@ -0,0 +1,145 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.processor.internals; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.streams.StreamsMetrics; +import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateRestoreCallback; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.TaskId; +import org.junit.Test; + +import java.io.File; +import java.util.Map; + +public class SinkNodeTest { + + @Test(expected = StreamsException.class) + @SuppressWarnings("unchecked") + public void invalidInputRecordTimestampTest() { + final Serializer anySerializer = Serdes.Bytes().serializer(); + + final SinkNode sink = new SinkNode<>("name", "output-topic", anySerializer, anySerializer, null); + sink.init(new MockProcessorContext()); + + sink.process(null, null); + } + + private final class MockProcessorContext implements ProcessorContext, RecordCollector.Supplier { + private final long invalidTimestamp = -1; + + @Override + public String applicationId() { + return null; + } + + @Override + public TaskId taskId() { + return null; + } + + @Override + public Serde<?> keySerde() { + return null; + } + + @Override + public Serde<?> valueSerde() { + return null; + } + + @Override + public File stateDir() { + return null; + } + + @Override + public StreamsMetrics metrics() { + return null; + } + + @Override + public void register(StateStore store, boolean loggingEnabled, StateRestoreCallback stateRestoreCallback) { + } + + @Override + public StateStore getStateStore(String name) { + return null; + } + + @Override + public void schedule(long interval) { + } + + @Override + public <K, V> void forward(K key, V value) { + } + + @Override + public <K, V> void forward(K key, V value, int childIndex) { + } + + @Override + public <K, V> void forward(K key, V value, String childName) { + } + + @Override + public void commit() { + } + + @Override + public String topic() { + return null; + } + + @Override + public int partition() { + return 0; + } + + @Override + public long offset() { + return 0; + } + + @Override + public long timestamp() { + return invalidTimestamp; + } + + @Override + public Map<String, Object> appConfigs() { + return null; + } + + @Override + public Map<String, Object> appConfigsWithPrefix(String prefix) { + return null; + } + + @Override + public RecordCollector recordCollector() { + return null; + } + } + +} \ No newline at end of file