Repository: kafka
Updated Branches:
  refs/heads/trunk 260d487cd -> 5dabca025


KAFKA-4758; Connect missing checks for NO_TIMESTAMP

Author: rnpridgeon <ryan.n.pridg...@gmail.com>

Reviewers: Ewen Cheslack-Postava <m...@ewencp.org>, Jason Gustafson <ja...@confluent.io>

Closes #2533 from rnpridgeon/no_timestamp


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5dabca02
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5dabca02
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5dabca02

Branch: refs/heads/trunk
Commit: 5dabca02594e4b71fb6ff275283a3851f1c62d60
Parents: 260d487
Author: rnpridgeon <ryan.n.pridg...@gmail.com>
Authored: Sun Feb 12 16:01:20 2017 -0800
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Sun Feb 12 16:06:33 2017 -0800

----------------------------------------------------------------------
 .../kafka/connect/runtime/WorkerSinkTask.java   |  4 +-
 .../kafka/connect/runtime/WorkerSourceTask.java |  4 +-
 .../apache/kafka/connect/util/ConnectUtils.java | 31 +++++++++++++++
 .../connect/runtime/WorkerSinkTaskTest.java     |  4 +-
 .../connect/runtime/WorkerSourceTaskTest.java   | 41 ++++++++++++++++++++
 5 files changed, 79 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5dabca02/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 5cc70d5..6de97b2 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -27,7 +27,6 @@ import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.data.SchemaAndValue;
@@ -36,6 +35,7 @@ import org.apache.kafka.connect.errors.RetriableException;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.sink.SinkTask;
 import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.util.ConnectUtils;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.apache.kafka.connect.util.SinkUtils;
 import org.slf4j.Logger;
@@ -403,7 +403,7 @@ class WorkerSinkTask extends WorkerTask {
                     keyAndSchema.schema(), keyAndSchema.value(),
                     valueAndSchema.schema(), valueAndSchema.value(),
                     msg.offset(),
-                    (msg.timestampType() == TimestampType.NO_TIMESTAMP_TYPE) ? null : msg.timestamp(),
+                    ConnectUtils.checkAndConvertTimestamp(msg.timestamp()),
                     msg.timestampType());
             record = transformationChain.apply(record);
             if (record != null) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/5dabca02/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index adf1582..e86924b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -30,6 +30,7 @@ import org.apache.kafka.connect.source.SourceTask;
 import org.apache.kafka.connect.storage.Converter;
 import org.apache.kafka.connect.storage.OffsetStorageReader;
 import org.apache.kafka.connect.storage.OffsetStorageWriter;
+import org.apache.kafka.connect.util.ConnectUtils;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -195,7 +196,8 @@ class WorkerSourceTask extends WorkerTask {
 
             byte[] key = keyConverter.fromConnectData(record.topic(), record.keySchema(), record.key());
             byte[] value = valueConverter.fromConnectData(record.topic(), record.valueSchema(), record.value());
-            final ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(record.topic(), record.kafkaPartition(), record.timestamp(), key, value);
+            final ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(record.topic(), record.kafkaPartition(),
+                    ConnectUtils.checkAndConvertTimestamp(record.timestamp()), key, value);
             log.trace("Appending record with key {}, value {}", record.key(), record.value());
             // We need this queued first since the callback could happen immediately (even synchronously in some cases).
             // Because of this we need to be careful about handling retries -- we always save the previously attempted

http://git-wip-us.apache.org/repos/asf/kafka/blob/5dabca02/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java
new file mode 100644
index 0000000..9bb1468
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.common.record.InvalidRecordException;
+import org.apache.kafka.common.record.Record;
+
+public final class ConnectUtils {
+    public static Long checkAndConvertTimestamp(Long timestamp) {
+        if (timestamp == null || timestamp >= 0)
+            return timestamp;
+        else if (timestamp == Record.NO_TIMESTAMP)
+            return null;
+        else
+            throw new InvalidRecordException(String.format("Invalid record timestamp %d", timestamp));
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/5dabca02/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index f93e385..e777902 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -534,7 +534,7 @@ public class WorkerSinkTaskTest {
     @Test
     public void testMissingTimestampPropagation() throws Exception {
         expectInitializeTask();
-        expectConsumerPoll(1, Record.NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE);
+        expectConsumerPoll(1, Record.NO_TIMESTAMP, TimestampType.CREATE_TIME);
         expectConversionAndTransformation(1);
 
         Capture<Collection<SinkRecord>> records = EasyMock.newCapture(CaptureType.ALL);
@@ -551,7 +551,7 @@ public class WorkerSinkTaskTest {
 
         // we expect null for missing timestamp, the sentinel value of Record.NO_TIMESTAMP is Kafka's API
         assertEquals(null, record.timestamp());
-        assertEquals(TimestampType.NO_TIMESTAMP_TYPE, record.timestampType());
+        assertEquals(TimestampType.CREATE_TIME, record.timestampType());
 
         PowerMock.verifyAll();
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5dabca02/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index 8c07ba1..789d2c7 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.InvalidRecordException;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
@@ -438,6 +439,46 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         PowerMock.verifyAll();
     }
 
+    @Test(expected = InvalidRecordException.class)
+    public void testSendRecordsCorruptTimestamp() throws Exception {
+        final Long timestamp = -3L;
+        createWorkerTask();
+
+        List<SourceRecord> records = Collections.singletonList(
+                new SourceRecord(PARTITION, OFFSET, "topic", null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD, timestamp)
+        );
+
+        Capture<ProducerRecord<byte[], byte[]>> sent = expectSendRecordAnyTimes();
+
+        PowerMock.replayAll();
+
+        Whitebox.setInternalState(workerTask, "toSend", records);
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        assertEquals(null, sent.getValue().timestamp());
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testSendRecordsNoTimestamp() throws Exception {
+        final Long timestamp = -1L;
+        createWorkerTask();
+
+        List<SourceRecord> records = Collections.singletonList(
+                new SourceRecord(PARTITION, OFFSET, "topic", null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD, timestamp)
+        );
+
+        Capture<ProducerRecord<byte[], byte[]>> sent = expectSendRecordAnyTimes();
+
+        PowerMock.replayAll();
+
+        Whitebox.setInternalState(workerTask, "toSend", records);
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        assertEquals(null, sent.getValue().timestamp());
+
+        PowerMock.verifyAll();
+    }
+
     @Test
     public void testSendRecordsRetries() throws Exception {
         createWorkerTask();

Reply via email to