Repository: kafka Updated Branches: refs/heads/0.10.0 0cc997ae7 -> 4e557f8ef
KAFKA-3434; add old constructor to ConsumerRecord Author: Jason Gustafson <[email protected]> Reviewers: Grant Henke <[email protected]>, Ismael Juma <[email protected]>, Ewen Cheslack-Postava <[email protected]> Closes #1123 from hachikuji/KAFKA-3434 (cherry picked from commit cb78223bf90aca4f75699f36c1a82db7661a62f3) Signed-off-by: Ewen Cheslack-Postava <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4e557f8e Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4e557f8e Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4e557f8e Branch: refs/heads/0.10.0 Commit: 4e557f8ef60d46a8870704655c9a35092f74d125 Parents: 0cc997a Author: Jason Gustafson <[email protected]> Authored: Wed Mar 23 22:36:19 2016 -0700 Committer: Ewen Cheslack-Postava <[email protected]> Committed: Wed Mar 23 22:36:53 2016 -0700 ---------------------------------------------------------------------- .../kafka/clients/consumer/ConsumerRecord.java | 29 ++++++++++++ .../clients/consumer/internals/Fetcher.java | 4 +- .../clients/consumer/ConsumerRecordTest.java | 48 ++++++++++++++++++++ 3 files changed, 79 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/4e557f8e/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java index 4165534..586156e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java @@ -12,6 +12,7 @@ */ package org.apache.kafka.clients.consumer; +import org.apache.kafka.common.record.Record; import org.apache.kafka.common.record.TimestampType; /** @@ -19,6 +20,10 @@ import org.apache.kafka.common.record.TimestampType; * record is being received and an offset that points to the record in a Kafka partition. */ public final class ConsumerRecord<K, V> { + public static final long NO_TIMESTAMP = Record.NO_TIMESTAMP; + public static final int NULL_SIZE = -1; + public static final int NULL_CHECKSUM = -1; + private final String topic; private final int partition; private final long offset; @@ -31,6 +36,27 @@ public final class ConsumerRecord<K, V> { private final V value; /** + * Creates a record to be received from a specified topic and partition (provided for + * compatibility with Kafka 0.9 before the message format supported timestamps and before + * serialized metadata were exposed). + * + * @param topic The topic this record is received from + * @param partition The partition of the topic this record is received from + * @param offset The offset of this record in the corresponding Kafka partition + * @param key The key of the record, if one exists (null is allowed) + * @param value The record contents + */ + public ConsumerRecord(String topic, + int partition, + long offset, + K key, + V value) { + this(topic, partition, offset, NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE, + NULL_CHECKSUM, NULL_SIZE, NULL_SIZE, key, value); + } + + + /** * Creates a record to be received from a specified topic and partition * * @param topic The topic this record is received from @@ -38,6 +64,9 @@ public final class ConsumerRecord<K, V> { * @param offset The offset of this record in the corresponding Kafka partition * @param timestamp The timestamp of the record. * @param timestampType The timestamp type + * @param checksum The checksum (CRC32) of the full record + * @param serializedKeySize The length of the serialized key + * @param serializedValueSize The length of the serialized value * @param key The key of the record, if one exists (null is allowed) * @param value The record contents */ http://git-wip-us.apache.org/repos/asf/kafka/blob/4e557f8e/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index 802a2f0..9a26551 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -653,8 +653,8 @@ public class Fetcher<K, V> { return new ConsumerRecord<>(partition.topic(), partition.partition(), offset, timestamp, timestampType, logEntry.record().checksum(), - keyByteArray == null ? -1 : keyByteArray.length, - valueByteArray == null ? -1 : valueByteArray.length, + keyByteArray == null ? ConsumerRecord.NULL_SIZE : keyByteArray.length, + valueByteArray == null ? ConsumerRecord.NULL_SIZE : valueByteArray.length, key, value); } catch (KafkaException e) { throw e; http://git-wip-us.apache.org/repos/asf/kafka/blob/4e557f8e/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordTest.java new file mode 100644 index 0000000..d1d3b24 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordTest.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ +package org.apache.kafka.clients.consumer; + +import org.apache.kafka.common.record.TimestampType; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class ConsumerRecordTest { + + @Test + public void testOldConstructor() { + String topic = "topic"; + int partition = 0; + long offset = 23; + String key = "key"; + String value = "value"; + + ConsumerRecord record = new ConsumerRecord(topic, partition, offset, key, value); + assertEquals(topic, record.topic()); + assertEquals(partition, record.partition()); + assertEquals(offset, record.offset()); + assertEquals(key, record.key()); + assertEquals(value, record.value()); + assertEquals(TimestampType.NO_TIMESTAMP_TYPE, record.timestampType()); + assertEquals(ConsumerRecord.NO_TIMESTAMP, record.timestamp()); + assertEquals(ConsumerRecord.NULL_CHECKSUM, record.checksum()); + assertEquals(ConsumerRecord.NULL_SIZE, record.serializedKeySize()); + assertEquals(ConsumerRecord.NULL_SIZE, record.serializedValueSize()); + } + + +}
