Repository: kafka Updated Branches: refs/heads/trunk 260d487cd -> 5dabca025
KAFKA-4758; Connect missing checks for NO_TIMESTAMP Author: rnpridgeon <ryan.n.pridg...@gmail.com> Reviewers: Ewen Cheslack-Postava <m...@ewencp.org>, Jason Gustafson <ja...@confluent.io> Closes #2533 from rnpridgeon/no_timestamp Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5dabca02 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5dabca02 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5dabca02 Branch: refs/heads/trunk Commit: 5dabca02594e4b71fb6ff275283a3851f1c62d60 Parents: 260d487 Author: rnpridgeon <ryan.n.pridg...@gmail.com> Authored: Sun Feb 12 16:01:20 2017 -0800 Committer: Jason Gustafson <ja...@confluent.io> Committed: Sun Feb 12 16:06:33 2017 -0800 ---------------------------------------------------------------------- .../kafka/connect/runtime/WorkerSinkTask.java | 4 +- .../kafka/connect/runtime/WorkerSourceTask.java | 4 +- .../apache/kafka/connect/util/ConnectUtils.java | 31 +++++++++++++++ .../connect/runtime/WorkerSinkTaskTest.java | 4 +- .../connect/runtime/WorkerSourceTaskTest.java | 41 ++++++++++++++++++++ 5 files changed, 79 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/5dabca02/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java index 5cc70d5..6de97b2 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java @@ -27,7 +27,6 @@ import org.apache.kafka.clients.consumer.OffsetCommitCallback; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; -import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.data.SchemaAndValue; @@ -36,6 +35,7 @@ import org.apache.kafka.connect.errors.RetriableException; import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.sink.SinkTask; import org.apache.kafka.connect.storage.Converter; +import org.apache.kafka.connect.util.ConnectUtils; import org.apache.kafka.connect.util.ConnectorTaskId; import org.apache.kafka.connect.util.SinkUtils; import org.slf4j.Logger; @@ -403,7 +403,7 @@ class WorkerSinkTask extends WorkerTask { keyAndSchema.schema(), keyAndSchema.value(), valueAndSchema.schema(), valueAndSchema.value(), msg.offset(), - (msg.timestampType() == TimestampType.NO_TIMESTAMP_TYPE) ? null : msg.timestamp(), + ConnectUtils.checkAndConvertTimestamp(msg.timestamp()), msg.timestampType()); record = transformationChain.apply(record); if (record != null) { http://git-wip-us.apache.org/repos/asf/kafka/blob/5dabca02/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java index adf1582..e86924b 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java @@ -30,6 +30,7 @@ import org.apache.kafka.connect.source.SourceTask; import org.apache.kafka.connect.storage.Converter; import org.apache.kafka.connect.storage.OffsetStorageReader; import org.apache.kafka.connect.storage.OffsetStorageWriter; +import org.apache.kafka.connect.util.ConnectUtils; import org.apache.kafka.connect.util.ConnectorTaskId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -195,7 +196,8 @@ class WorkerSourceTask extends WorkerTask { byte[] key = keyConverter.fromConnectData(record.topic(), record.keySchema(), record.key()); byte[] value = valueConverter.fromConnectData(record.topic(), record.valueSchema(), record.value()); - final ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(record.topic(), record.kafkaPartition(), record.timestamp(), key, value); + final ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(record.topic(), record.kafkaPartition(), + ConnectUtils.checkAndConvertTimestamp(record.timestamp()), key, value); log.trace("Appending record with key {}, value {}", record.key(), record.value()); // We need this queued first since the callback could happen immediately (even synchronously in some cases). // Because of this we need to be careful about handling retries -- we always save the previously attempted http://git-wip-us.apache.org/repos/asf/kafka/blob/5dabca02/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java new file mode 100644 index 0000000..9bb1468 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ +package org.apache.kafka.connect.util; + +import org.apache.kafka.common.record.InvalidRecordException; +import org.apache.kafka.common.record.Record; + +public final class ConnectUtils { + public static Long checkAndConvertTimestamp(Long timestamp) { + if (timestamp == null || timestamp >= 0) + return timestamp; + else if (timestamp == Record.NO_TIMESTAMP) + return null; + else + throw new InvalidRecordException(String.format("Invalid record timestamp %d", timestamp)); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/5dabca02/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java index f93e385..e777902 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java @@ -534,7 +534,7 @@ public class WorkerSinkTaskTest { @Test public void testMissingTimestampPropagation() throws Exception { expectInitializeTask(); - expectConsumerPoll(1, Record.NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE); + expectConsumerPoll(1, Record.NO_TIMESTAMP, TimestampType.CREATE_TIME); expectConversionAndTransformation(1); Capture<Collection<SinkRecord>> records = EasyMock.newCapture(CaptureType.ALL); @@ -551,7 +551,7 @@ public class WorkerSinkTaskTest { // we expect null for missing timestamp, the sentinel value of Record.NO_TIMESTAMP is Kafka's API assertEquals(null, record.timestamp()); - assertEquals(TimestampType.NO_TIMESTAMP_TYPE, record.timestampType()); + assertEquals(TimestampType.CREATE_TIME, record.timestampType()); PowerMock.verifyAll(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/5dabca02/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java index 8c07ba1..789d2c7 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java @@ -21,6 +21,7 @@ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.record.InvalidRecordException; import org.apache.kafka.common.utils.Time; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; @@ -438,6 +439,46 @@ public class WorkerSourceTaskTest extends ThreadedTest { PowerMock.verifyAll(); } + @Test(expected = InvalidRecordException.class) + public void testSendRecordsCorruptTimestamp() throws Exception { + final Long timestamp = -3L; + createWorkerTask(); + + List<SourceRecord> records = Collections.singletonList( + new SourceRecord(PARTITION, OFFSET, "topic", null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD, timestamp) + ); + + Capture<ProducerRecord<byte[], byte[]>> sent = expectSendRecordAnyTimes(); + + PowerMock.replayAll(); + + Whitebox.setInternalState(workerTask, "toSend", records); + Whitebox.invokeMethod(workerTask, "sendRecords"); + assertEquals(null, sent.getValue().timestamp()); + + PowerMock.verifyAll(); + } + + @Test + public void testSendRecordsNoTimestamp() throws Exception { + final Long timestamp = -1L; + createWorkerTask(); + + List<SourceRecord> records = Collections.singletonList( + new SourceRecord(PARTITION, OFFSET, "topic", null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD, timestamp) + ); + + Capture<ProducerRecord<byte[], byte[]>> sent = expectSendRecordAnyTimes(); + + PowerMock.replayAll(); + + Whitebox.setInternalState(workerTask, "toSend", records); + Whitebox.invokeMethod(workerTask, "sendRecords"); + assertEquals(null, sent.getValue().timestamp()); + + PowerMock.verifyAll(); + } + @Test public void testSendRecordsRetries() throws Exception { createWorkerTask();