Repository: kafka
Updated Branches:
  refs/heads/0.10.1 baae90a1e -> f3e5e6d65


HOTFIX: improve error message on invalid input record timestamp

Author: Matthias J. Sax <matth...@confluent.io>

Reviewers: Guozhang Wang <wangg...@gmail.com>

Closes #2079 from mjsax/hotfixTSExtractor-0.10.1


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f3e5e6d6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f3e5e6d6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f3e5e6d6

Branch: refs/heads/0.10.1
Commit: f3e5e6d65d00204a4f15f60b1f534a3b03048e71
Parents: baae90a
Author: Matthias J. Sax <matth...@confluent.io>
Authored: Tue Nov 1 10:21:56 2016 -0700
Committer: Guozhang Wang <wangg...@gmail.com>
Committed: Tue Nov 1 10:21:56 2016 -0700

----------------------------------------------------------------------
 .../streams/processor/internals/SinkNode.java   |  12 +-
 .../processor/internals/SinkNodeTest.java       | 145 +++++++++++++++++++
 2 files changed, 156 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f3e5e6d6/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
index 2b5692d..c330ea9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.internals.ChangedSerializer;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StreamPartitioner;
@@ -69,7 +70,16 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> {
     @Override
     public void process(final K key, final V value) {
         RecordCollector collector = ((RecordCollector.Supplier) context).recordCollector();
-        collector.send(new ProducerRecord<>(topic, null, context.timestamp(), key, value), keySerializer, valSerializer, partitioner);
+
+        final long timestamp = context.timestamp();
+        if (timestamp < 0) {
+            throw new StreamsException("A record consumed from an input topic has invalid (negative) timestamp, " +
+                "possibly because a pre-0.10 producer client was used to write this record to Kafka without embedding a timestamp, " +
+                "or because the input topic was created before upgrading the Kafka cluster to 0.10+. " +
+                "Use a different TimestampExtractor to process this data.");
+        }
+
+        collector.send(new ProducerRecord<K, V>(topic, null, timestamp, key, value), keySerializer, valSerializer, partitioner);
     }
 
     @Override
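
The new exception text points users at the timestamp extractor configuration as the way out. As a hedged illustration (not part of this commit), a 0.10.1 Streams application could switch from the default extractor to the built-in WallclockTimestampExtractor so that records written without an embedded timestamp are still processed; the application id and bootstrap server below are hypothetical placeholders.

    // Illustrative sketch only: configure Kafka Streams (0.10.1) to ignore broken
    // record timestamps and use the current wall-clock time instead.
    import java.util.Properties;

    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.processor.WallclockTimestampExtractor;

    public class TimestampExtractorConfigExample {
        public static Properties streamsConfig() {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");            // hypothetical app id
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical broker
            // "timestamp.extractor": WallclockTimestampExtractor never returns a negative value,
            // so SinkNode's new guard is never triggered.
            props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
                      WallclockTimestampExtractor.class.getName());
            return props;
        }
    }
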

http://git-wip-us.apache.org/repos/asf/kafka/blob/f3e5e6d6/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
new file mode 100644
index 0000000..3b41517
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.Map;
+
+public class SinkNodeTest {
+
+    @Test(expected = StreamsException.class)
+    @SuppressWarnings("unchecked")
+    public void invalidInputRecordTimestampTest() {
+        final Serializer anySerializer = Serdes.Bytes().serializer();
+
+        final SinkNode sink = new SinkNode<>("name", "output-topic", anySerializer, anySerializer, null);
+        sink.init(new MockProcessorContext());
+
+        sink.process(null, null);
+    }
+
+    private final class MockProcessorContext implements ProcessorContext, RecordCollector.Supplier {
+        private final long invalidTimestamp = -1;
+
+        @Override
+        public String applicationId() {
+            return null;
+        }
+
+        @Override
+        public TaskId taskId() {
+            return null;
+        }
+
+        @Override
+        public Serde<?> keySerde() {
+            return null;
+        }
+
+        @Override
+        public Serde<?> valueSerde() {
+            return null;
+        }
+
+        @Override
+        public File stateDir() {
+            return null;
+        }
+
+        @Override
+        public StreamsMetrics metrics() {
+            return null;
+        }
+
+        @Override
+        public void register(StateStore store, boolean loggingEnabled, StateRestoreCallback stateRestoreCallback) {
+        }
+
+        @Override
+        public StateStore getStateStore(String name) {
+            return null;
+        }
+
+        @Override
+        public void schedule(long interval) {
+        }
+
+        @Override
+        public <K, V> void forward(K key, V value) {
+        }
+
+        @Override
+        public <K, V> void forward(K key, V value, int childIndex) {
+        }
+
+        @Override
+        public <K, V> void forward(K key, V value, String childName) {
+        }
+
+        @Override
+        public void commit() {
+        }
+
+        @Override
+        public String topic() {
+            return null;
+        }
+
+        @Override
+        public int partition() {
+            return 0;
+        }
+
+        @Override
+        public long offset() {
+            return 0;
+        }
+
+        @Override
+        public long timestamp() {
+            return invalidTimestamp;
+        }
+
+        @Override
+        public Map<String, Object> appConfigs() {
+            return null;
+        }
+
+        @Override
+        public Map<String, Object> appConfigsWithPrefix(String prefix) {
+            return null;
+        }
+
+        @Override
+        public RecordCollector recordCollector() {
+            return null;
+        }
+    }
+
+}
\ No newline at end of file
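
For completeness, here is a minimal sketch of the other option the error message suggests: a custom TimestampExtractor that keeps the embedded timestamp when it is valid and falls back to wall-clock time otherwise. This is not part of the commit; the package name is hypothetical and it assumes the single-argument extract() interface shipped with 0.10.1.

    package com.example.streams;  // hypothetical package

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.processor.TimestampExtractor;

    // Illustrative sketch only: use the embedded record timestamp when it is valid,
    // otherwise substitute the current wall-clock time so SinkNode does not reject
    // the record with the new StreamsException.
    public class FallbackTimestampExtractor implements TimestampExtractor {

        @Override
        public long extract(final ConsumerRecord<Object, Object> record) {
            final long timestamp = record.timestamp();
            return timestamp >= 0 ? timestamp : System.currentTimeMillis();
        }
    }

It would be registered the same way as the WallclockTimestampExtractor example above, via the timestamp.extractor setting.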
