Repository: kafka Updated Branches: refs/heads/trunk 23f239b06 -> 002b377da
KAFKA-3196; Added checksum and size to RecordMetadata and ConsumerRecord This is the second (remaining) part of KIP-42. See https://cwiki.apache.org/confluence/display/KAFKA/KIP-42%3A+Add+Producer+and+Consumer+Interceptors Author: Anna Povzner <[email protected]> Reviewers: Ismael Juma <[email protected]>, Jason Gustafson <[email protected]>, Jun Rao <[email protected]> Closes #951 from apovzner/kafka-3196 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/002b377d Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/002b377d Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/002b377d Branch: refs/heads/trunk Commit: 002b377dad9c956cd0ae0597981f29698883b6d5 Parents: 23f239b Author: Anna Povzner <[email protected]> Authored: Wed Mar 2 09:40:34 2016 -0800 Committer: Jun Rao <[email protected]> Committed: Wed Mar 2 09:40:34 2016 -0800 ---------------------------------------------------------------------- .../clients/consumer/ConsumerInterceptor.java | 5 ++- .../kafka/clients/consumer/ConsumerRecord.java | 37 ++++++++++++++++++- .../clients/consumer/internals/Fetcher.java | 14 +++++--- .../kafka/clients/producer/MockProducer.java | 6 ++-- .../kafka/clients/producer/RecordMetadata.java | 38 ++++++++++++++++++-- .../internals/FutureRecordMetadata.java | 24 +++++++++++-- .../clients/producer/internals/RecordBatch.java | 12 +++++-- .../apache/kafka/common/record/Compressor.java | 16 ++++++--- .../kafka/common/record/MemoryRecords.java | 6 ++-- .../clients/consumer/MockConsumerTest.java | 4 +-- .../internals/ConsumerInterceptorsTest.java | 8 +++-- .../kafka/clients/producer/RecordSendTest.java | 9 +++-- .../internals/ProducerInterceptorsTest.java | 2 +- .../kafka/test/MockConsumerInterceptor.java | 6 ++-- .../connect/runtime/WorkerSinkTaskTest.java | 2 +- .../runtime/WorkerSinkTaskThreadedTest.java | 4 +-- .../connect/runtime/WorkerSourceTaskTest.java | 3 +- .../connect/storage/KafkaConfigStorageTest.java | 24 ++++++------- .../storage/KafkaOffsetBackingStoreTest.java | 16 ++++----- .../storage/KafkaStatusBackingStoreTest.java | 2 +- .../kafka/connect/util/KafkaBasedLogTest.java | 16 ++++----- .../scala/kafka/tools/ConsoleConsumer.scala | 4 +-- .../scala/kafka/tools/SimpleConsumerShell.scala | 4 ++- .../kafka/api/BaseConsumerTest.scala | 3 ++ .../kafka/api/BaseProducerSendTest.scala | 7 ++++ .../processor/internals/RecordQueue.java | 7 ++-- .../processor/internals/PartitionGroupTest.java | 12 +++---- .../internals/ProcessorStateManagerTest.java | 6 ++-- .../processor/internals/RecordQueueTest.java | 18 +++++----- .../processor/internals/StandbyTaskTest.java | 18 +++++----- .../processor/internals/StreamTaskTest.java | 30 ++++++++-------- .../kafka/test/ProcessorTopologyTestDriver.java | 2 +- 32 files changed, 251 insertions(+), 114 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java index 5c13a41..70ea444 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java @@ -38,7 +38,10 @@ public interface ConsumerInterceptor<K, V> extends Configurable { /** * This is called just before the records are returned by {@link org.apache.kafka.clients.consumer.KafkaConsumer#poll(long)} * <p> - * This method is allowed to modify consumer records, in which case the new records will be returned. + * This method is allowed to modify consumer records, in which case the new records will be + * returned. There is no limitation on number of records that could be returned from this + * method. I.e., the interceptor can filter the records or generate new records. + * <p> * Any exception thrown by this method will be caught by the caller, logged, but not propagated to the client. * <p> * Since the consumer may run multiple interceptors, a particular interceptor's onConsume() callback will be called http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java index 42e0a90..4165534 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java @@ -24,6 +24,9 @@ public final class ConsumerRecord<K, V> { private final long offset; private final long timestamp; private final TimestampType timestampType; + private final long checksum; + private final int serializedKeySize; + private final int serializedValueSize; private final K key; private final V value; @@ -43,6 +46,9 @@ public final class ConsumerRecord<K, V> { long offset, long timestamp, TimestampType timestampType, + long checksum, + int serializedKeySize, + int serializedValueSize, K key, V value) { if (topic == null) @@ -52,6 +58,9 @@ public final class ConsumerRecord<K, V> { this.offset = offset; this.timestamp = timestamp; this.timestampType = timestampType; + this.checksum = checksum; + this.serializedKeySize = serializedKeySize; + this.serializedValueSize = serializedValueSize; this.key = key; this.value = value; } @@ -105,9 +114,35 @@ public final class ConsumerRecord<K, V> { return timestampType; } + /** + * The checksum (CRC32) of the record. + */ + public long checksum() { + return this.checksum; + } + + /** + * The size of the serialized, uncompressed key in bytes. If key is null, the returned size + * is -1. + */ + public int serializedKeySize() { + return this.serializedKeySize; + } + + /** + * The size of the serialized, uncompressed value in bytes. If value is null, the + * returned size is -1. + */ + public int serializedValueSize() { + return this.serializedValueSize; + } + @Override public String toString() { return "ConsumerRecord(topic = " + topic() + ", partition = " + partition() + ", offset = " + offset() - + ", " + timestampType + " = " + timestamp + ", key = " + key + ", value = " + value + ")"; + + ", " + timestampType + " = " + timestamp + ", checksum = " + checksum + + ", serialized key size = " + serializedKeySize + + ", serialized value size = " + serializedValueSize + + ", key = " + key + ", value = " + value + ")"; } } http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index 5d92a76..e2a5548 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -644,11 +644,17 @@ public class Fetcher<K, V> { long timestamp = logEntry.record().timestamp(); TimestampType timestampType = logEntry.record().timestampType(); ByteBuffer keyBytes = logEntry.record().key(); - K key = keyBytes == null ? null : this.keyDeserializer.deserialize(partition.topic(), Utils.toArray(keyBytes)); + byte[] keyByteArray = keyBytes == null ? null : Utils.toArray(keyBytes); + K key = keyBytes == null ? null : this.keyDeserializer.deserialize(partition.topic(), keyByteArray); ByteBuffer valueBytes = logEntry.record().value(); - V value = valueBytes == null ? null : this.valueDeserializer.deserialize(partition.topic(), Utils.toArray(valueBytes)); - - return new ConsumerRecord<>(partition.topic(), partition.partition(), offset, timestamp, timestampType, key, value); + byte[] valueByteArray = valueBytes == null ? null : Utils.toArray(valueBytes); + V value = valueBytes == null ? null : this.valueDeserializer.deserialize(partition.topic(), valueByteArray); + + return new ConsumerRecord<>(partition.topic(), partition.partition(), offset, + timestamp, timestampType, logEntry.record().checksum(), + keyByteArray == null ? -1 : keyByteArray.length, + valueByteArray == null ? -1 : valueByteArray.length, + key, value); } catch (KafkaException e) { throw e; } catch (RuntimeException e) { http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java index 5f97bae..109b0ca 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java @@ -117,10 +117,12 @@ public class MockProducer<K, V> implements Producer<K, V> { if (this.cluster.partitionsForTopic(record.topic()) != null) partition = partition(record, this.cluster); ProduceRequestResult result = new ProduceRequestResult(); - FutureRecordMetadata future = new FutureRecordMetadata(result, 0, Record.NO_TIMESTAMP); + FutureRecordMetadata future = new FutureRecordMetadata(result, 0, Record.NO_TIMESTAMP, 0, 0, 0); TopicPartition topicPartition = new TopicPartition(record.topic(), partition); long offset = nextOffset(topicPartition); - Completion completion = new Completion(topicPartition, offset, new RecordMetadata(topicPartition, 0, offset, Record.NO_TIMESTAMP), result, callback); + Completion completion = new Completion(topicPartition, offset, + new RecordMetadata(topicPartition, 0, offset, Record.NO_TIMESTAMP, 0, 0, 0), + result, callback); this.sent.add(record); if (autoComplete) completion.complete(null); http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java index d9ea239..c60a53d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java @@ -30,19 +30,28 @@ public final class RecordMetadata { // user provided one. Otherwise, it will be the producer local time when the producer record was handed to the // producer. private final long timestamp; + private final long checksum; + private final int serializedKeySize; + private final int serializedValueSize; private final TopicPartition topicPartition; - private RecordMetadata(TopicPartition topicPartition, long offset, long timestamp) { + private RecordMetadata(TopicPartition topicPartition, long offset, long timestamp, long + checksum, int serializedKeySize, int serializedValueSize) { super(); this.offset = offset; this.timestamp = timestamp; + this.checksum = checksum; + this.serializedKeySize = serializedKeySize; + this.serializedValueSize = serializedValueSize; this.topicPartition = topicPartition; } - public RecordMetadata(TopicPartition topicPartition, long baseOffset, long relativeOffset, long timestamp) { + public RecordMetadata(TopicPartition topicPartition, long baseOffset, long relativeOffset, + long timestamp, long checksum, int serializedKeySize, int serializedValueSize) { // ignore the relativeOffset if the base offset is -1, // since this indicates the offset is unknown - this(topicPartition, baseOffset == -1 ? baseOffset : baseOffset + relativeOffset, timestamp); + this(topicPartition, baseOffset == -1 ? baseOffset : baseOffset + relativeOffset, + timestamp, checksum, serializedKeySize, serializedValueSize); } /** @@ -60,6 +69,29 @@ public final class RecordMetadata { } /** + * The checksum (CRC32) of the record. + */ + public long checksum() { + return this.checksum; + } + + /** + * The size of the serialized, uncompressed key in bytes. If key is null, the returned size + * is -1. + */ + public int serializedKeySize() { + return this.serializedKeySize; + } + + /** + * The size of the serialized, uncompressed value in bytes. If value is null, the returned + * size is -1. + */ + public int serializedValueSize() { + return this.serializedValueSize; + } + + /** * The topic the record was appended to */ public String topic() { http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java index a140371..d5995a3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java @@ -27,11 +27,18 @@ public final class FutureRecordMetadata implements Future<RecordMetadata> { private final ProduceRequestResult result; private final long relativeOffset; private final long timestamp; + private final long checksum; + private final int serializedKeySize; + private final int serializedValueSize; - public FutureRecordMetadata(ProduceRequestResult result, long relativeOffset, long timestamp) { + public FutureRecordMetadata(ProduceRequestResult result, long relativeOffset, long timestamp, + long checksum, int serializedKeySize, int serializedValueSize) { this.result = result; this.relativeOffset = relativeOffset; this.timestamp = timestamp; + this.checksum = checksum; + this.serializedKeySize = serializedKeySize; + this.serializedValueSize = serializedValueSize; } @Override @@ -61,7 +68,8 @@ public final class FutureRecordMetadata implements Future<RecordMetadata> { } RecordMetadata value() { - return new RecordMetadata(result.topicPartition(), this.result.baseOffset(), this.relativeOffset, this.timestamp); + return new RecordMetadata(result.topicPartition(), this.result.baseOffset(), this.relativeOffset, + this.timestamp, this.checksum, this.serializedKeySize, this.serializedValueSize); } public long relativeOffset() { @@ -72,6 +80,18 @@ public final class FutureRecordMetadata implements Future<RecordMetadata> { return this.timestamp; } + public long checksum() { + return this.checksum; + } + + public int serializedKeySize() { + return this.serializedKeySize; + } + + public int serializedValueSize() { + return this.serializedValueSize; + } + @Override public boolean isCancelled() { return false; http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java index af9095d..7b5fbbe 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java @@ -67,10 +67,13 @@ public final class RecordBatch { if (!this.records.hasRoomFor(key, value)) { return null; } else { - this.records.append(offsetCounter++, timestamp, key, value); + long checksum = this.records.append(offsetCounter++, timestamp, key, value); this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value)); this.lastAppendTime = now; - FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount, timestamp); + FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount, + timestamp, checksum, + key == null ? -1 : key.length, + value == null ? -1 : value.length); if (callback != null) thunks.add(new Thunk(callback, future)); this.recordCount++; @@ -97,7 +100,10 @@ public final class RecordBatch { if (exception == null) { // If the timestamp returned by server is NoTimestamp, that means CreateTime is used. Otherwise LogAppendTime is used. RecordMetadata metadata = new RecordMetadata(this.topicPartition, baseOffset, thunk.future.relativeOffset(), - timestamp == Record.NO_TIMESTAMP ? thunk.future.timestamp() : timestamp); + timestamp == Record.NO_TIMESTAMP ? thunk.future.timestamp() : timestamp, + thunk.future.checksum(), + thunk.future.serializedKeySize(), + thunk.future.serializedValueSize()); thunk.callback.onCompletion(metadata, null); } else { thunk.callback.onCompletion(null, exception); http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/clients/src/main/java/org/apache/kafka/common/record/Compressor.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java index cde2178..afa85a4 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java +++ b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java @@ -201,16 +201,24 @@ public class Compressor { } } - public void putRecord(long timestamp, byte[] key, byte[] value, CompressionType type, int valueOffset, int valueSize) { + /** + * @return CRC of the record + */ + public long putRecord(long timestamp, byte[] key, byte[] value, CompressionType type, + int valueOffset, int valueSize) { // put a record as un-compressed into the underlying stream long crc = Record.computeChecksum(timestamp, key, value, type, valueOffset, valueSize); byte attributes = Record.computeAttributes(type); putRecord(crc, attributes, timestamp, key, value, valueOffset, valueSize); - + return crc; } - public void putRecord(long timestamp, byte[] key, byte[] value) { - putRecord(timestamp, key, value, CompressionType.NONE, 0, -1); + /** + * Put a record as uncompressed into the underlying stream + * @return CRC of the record + */ + public long putRecord(long timestamp, byte[] key, byte[] value) { + return putRecord(timestamp, key, value, CompressionType.NONE, 0, -1); } private void putRecord(final long crc, final byte attributes, final long timestamp, final byte[] key, final byte[] value, final int valueOffset, final int valueSize) { http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java index 01da1e2..f37ef39 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java @@ -88,16 +88,18 @@ public class MemoryRecords implements Records { /** * Append a new record and offset to the buffer + * @return crc of the record */ - public void append(long offset, long timestamp, byte[] key, byte[] value) { + public long append(long offset, long timestamp, byte[] key, byte[] value) { if (!writable) throw new IllegalStateException("Memory records is not writable"); int size = Record.recordSize(key, value); compressor.putLong(offset); compressor.putInt(size); - compressor.putRecord(timestamp, key, value); + long crc = compressor.putRecord(timestamp, key, value); compressor.recordWritten(size + Records.LOG_OVERHEAD); + return crc; } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java index 3ef5c8b..70b1c09 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java @@ -43,8 +43,8 @@ public class MockConsumerTest { beginningOffsets.put(new TopicPartition("test", 1), 0L); consumer.updateBeginningOffsets(beginningOffsets); consumer.seek(new TopicPartition("test", 0), 0); - ConsumerRecord<String, String> rec1 = new ConsumerRecord<String, String>("test", 0, 0, 0L, TimestampType.CREATE_TIME, "key1", "value1"); - ConsumerRecord<String, String> rec2 = new ConsumerRecord<String, String>("test", 0, 1, 0L, TimestampType.CREATE_TIME, "key2", "value2"); + ConsumerRecord<String, String> rec1 = new ConsumerRecord<String, String>("test", 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, "key1", "value1"); + ConsumerRecord<String, String> rec2 = new ConsumerRecord<String, String>("test", 0, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, "key2", "value2"); consumer.addRecord(rec1); consumer.addRecord(rec2); ConsumerRecords<String, String> recs = consumer.poll(1); http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerInterceptorsTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerInterceptorsTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerInterceptorsTest.java index 25843c7..4259c75 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerInterceptorsTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerInterceptorsTest.java @@ -44,7 +44,7 @@ public class ConsumerInterceptorsTest { private final TopicPartition filterTopicPart1 = new TopicPartition("test5", filterPartition1); private final TopicPartition filterTopicPart2 = new TopicPartition("test6", filterPartition2); private final ConsumerRecord<Integer, Integer> consumerRecord = - new ConsumerRecord<>(topic, partition, 0, 0L, TimestampType.CREATE_TIME, 1, 1); + new ConsumerRecord<>(topic, partition, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 1, 1); private int onCommitCount = 0; private int onConsumeCount = 0; @@ -117,9 +117,11 @@ public class ConsumerInterceptorsTest { List<ConsumerRecord<Integer, Integer>> list1 = new ArrayList<>(); list1.add(consumerRecord); List<ConsumerRecord<Integer, Integer>> list2 = new ArrayList<>(); - list2.add(new ConsumerRecord<>(filterTopicPart1.topic(), filterTopicPart1.partition(), 0, 0L, TimestampType.CREATE_TIME, 1, 1)); + list2.add(new ConsumerRecord<>(filterTopicPart1.topic(), filterTopicPart1.partition(), 0, + 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 1, 1)); List<ConsumerRecord<Integer, Integer>> list3 = new ArrayList<>(); - list3.add(new ConsumerRecord<>(filterTopicPart2.topic(), filterTopicPart2.partition(), 0, 0L, TimestampType.CREATE_TIME, 1, 1)); + list3.add(new ConsumerRecord<>(filterTopicPart2.topic(), filterTopicPart2.partition(), 0, + 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 1, 1)); records.put(tp, list1); records.put(filterTopicPart1, list2); records.put(filterTopicPart2, list3); http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java index 5591129..d820dab 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java @@ -45,7 +45,8 @@ public class RecordSendTest { @Test public void testTimeout() throws Exception { ProduceRequestResult request = new ProduceRequestResult(); - FutureRecordMetadata future = new FutureRecordMetadata(request, relOffset, Record.NO_TIMESTAMP); + FutureRecordMetadata future = new FutureRecordMetadata(request, relOffset, + Record.NO_TIMESTAMP, 0, 0, 0); assertFalse("Request is not completed", future.isDone()); try { future.get(5, TimeUnit.MILLISECONDS); @@ -63,7 +64,8 @@ public class RecordSendTest { */ @Test(expected = ExecutionException.class) public void testError() throws Exception { - FutureRecordMetadata future = new FutureRecordMetadata(asyncRequest(baseOffset, new CorruptRecordException(), 50L), relOffset, Record.NO_TIMESTAMP); + FutureRecordMetadata future = new FutureRecordMetadata(asyncRequest(baseOffset, new CorruptRecordException(), 50L), + relOffset, Record.NO_TIMESTAMP, 0, 0, 0); future.get(); } @@ -72,7 +74,8 @@ public class RecordSendTest { */ @Test public void testBlocking() throws Exception { - FutureRecordMetadata future = new FutureRecordMetadata(asyncRequest(baseOffset, null, 50L), relOffset, Record.NO_TIMESTAMP); + FutureRecordMetadata future = new FutureRecordMetadata(asyncRequest(baseOffset, null, 50L), + relOffset, Record.NO_TIMESTAMP, 0, 0, 0); assertEquals(baseOffset + relOffset, future.get().offset()); } http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerInterceptorsTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerInterceptorsTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerInterceptorsTest.java index 26d15d0..5a32dda 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerInterceptorsTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerInterceptorsTest.java @@ -128,7 +128,7 @@ public class ProducerInterceptorsTest { ProducerInterceptors<Integer, String> interceptors = new ProducerInterceptors<>(interceptorList); // verify onAck is called on all interceptors - RecordMetadata meta = new RecordMetadata(tp, 0, 0, 0); + RecordMetadata meta = new RecordMetadata(tp, 0, 0, 0, 0, 0, 0); interceptors.onAcknowledgement(meta, null); assertEquals(2, onAckCount); http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/clients/src/test/java/org/apache/kafka/test/MockConsumerInterceptor.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/test/MockConsumerInterceptor.java b/clients/src/test/java/org/apache/kafka/test/MockConsumerInterceptor.java index 3246578..0c187cb 100644 --- a/clients/src/test/java/org/apache/kafka/test/MockConsumerInterceptor.java +++ b/clients/src/test/java/org/apache/kafka/test/MockConsumerInterceptor.java @@ -25,7 +25,6 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.ConfigException; -import org.apache.kafka.common.record.TimestampType; import java.util.ArrayList; import java.util.HashMap; @@ -57,7 +56,10 @@ public class MockConsumerInterceptor implements ConsumerInterceptor<String, Stri List<ConsumerRecord<String, String>> lst = new ArrayList<>(); for (ConsumerRecord<String, String> record: records.records(tp)) { lst.add(new ConsumerRecord<>(record.topic(), record.partition(), record.offset(), - 0L, TimestampType.CREATE_TIME, record.key(), record.value().toUpperCase())); + record.timestamp(), record.timestampType(), + record.checksum(), record.serializedKeySize(), + record.serializedValueSize(), + record.key(), record.value().toUpperCase())); } recordMap.put(tp, lst); } http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java index 6721609..aef3344 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java @@ -293,7 +293,7 @@ public class WorkerSinkTaskTest { public ConsumerRecords<byte[], byte[]> answer() throws Throwable { List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>(); for (int i = 0; i < numMessages; i++) - records.add(new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned + i, 0L, TimestampType.CREATE_TIME, RAW_KEY, RAW_VALUE)); + records.add(new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned + i, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, RAW_KEY, RAW_VALUE)); recordsReturned += numMessages; return new ConsumerRecords<>( numMessages > 0 ? http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java index 77f1ed0..b37b34f 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java @@ -520,7 +520,7 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest { Collections.singletonMap( new TopicPartition(TOPIC, PARTITION), Arrays.asList( - new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, 0L, TimestampType.CREATE_TIME, RAW_KEY, RAW_VALUE) + new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, RAW_KEY, RAW_VALUE) ))); recordsReturned++; return records; @@ -548,7 +548,7 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest { Collections.singletonMap( new TopicPartition(TOPIC, PARTITION), Arrays.asList( - new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, 0L, TimestampType.CREATE_TIME, RAW_KEY, RAW_VALUE) + new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, RAW_KEY, RAW_VALUE) ))); recordsReturned++; return records; http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java index 8e9eb72..8fb8bb5 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java @@ -408,7 +408,8 @@ public class WorkerSourceTaskTest extends ThreadedTest { public Future<RecordMetadata> answer() throws Throwable { synchronized (producerCallbacks) { for (org.apache.kafka.clients.producer.Callback cb : producerCallbacks.getValues()) { - cb.onCompletion(new RecordMetadata(new TopicPartition("foo", 0), 0, 0, 0L), null); + cb.onCompletion(new RecordMetadata(new TopicPartition("foo", 0), 0, 0, + 0L, 0L, 0, 0), null); } producerCallbacks.reset(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigStorageTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigStorageTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigStorageTest.java index e878e12..cfc713f 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigStorageTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigStorageTest.java @@ -289,14 +289,14 @@ public class KafkaConfigStorageTest { expectConfigure(); // Overwrite each type at least once to ensure we see the latest data after loading List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList( - new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)), - new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1)), - new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)), - new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(3)), - new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4)), + new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)), + new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1)), + new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)), + new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(3)), + new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4)), // Connector after root update should make it through, task update shouldn't - new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(5)), - new ConsumerRecord<>(TOPIC, 0, 6, 0L, TimestampType.CREATE_TIME, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(6))); + new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(5)), + new ConsumerRecord<>(TOPIC, 0, 6, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(6))); LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap(); deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0)); deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0)); @@ -343,12 +343,12 @@ public class KafkaConfigStorageTest { expectConfigure(); List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList( - new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)), + new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)), // This is the record that has been compacted: //new ConsumerRecord<>(TOPIC, 0, 1, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1)), - new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)), - new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4)), - new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(5))); + new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)), + new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4)), + new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(5))); LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap(); deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0)); deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0)); @@ -484,7 +484,7 @@ public class KafkaConfigStorageTest { public Future<Void> answer() throws Throwable { TestFuture<Void> future = new TestFuture<Void>(); for (Map.Entry<String, byte[]> entry : serializedConfigs.entrySet()) - capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 0, logOffset++, 0L, TimestampType.CREATE_TIME, entry.getKey(), entry.getValue())); + capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 0, logOffset++, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, entry.getKey(), entry.getValue())); future.resolveOnGet((Void) null); return future; } http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java index 61763a8..22bb376 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java @@ -126,10 +126,10 @@ public class KafkaOffsetBackingStoreTest { public void testReloadOnStart() throws Exception { expectConfigure(); expectStart(Arrays.asList( - new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, TP0_KEY.array(), TP0_VALUE.array()), - new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, TP1_KEY.array(), TP1_VALUE.array()), - new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, TP0_KEY.array(), TP0_VALUE_NEW.array()), - new ConsumerRecord<>(TOPIC, 1, 1, 0L, TimestampType.CREATE_TIME, TP1_KEY.array(), TP1_VALUE_NEW.array()) + new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP0_KEY.array(), TP0_VALUE.array()), + new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP1_KEY.array(), TP1_VALUE.array()), + new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP0_KEY.array(), TP0_VALUE_NEW.array()), + new ConsumerRecord<>(TOPIC, 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP1_KEY.array(), TP1_VALUE_NEW.array()) )); expectStop(); @@ -177,8 +177,8 @@ public class KafkaOffsetBackingStoreTest { PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() { @Override public Object answer() throws Throwable { - capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, TP0_KEY.array(), TP0_VALUE.array())); - capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, TP1_KEY.array(), TP1_VALUE.array())); + capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP0_KEY.array(), TP0_VALUE.array())); + capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP1_KEY.array(), TP1_VALUE.array())); secondGetReadToEndCallback.getValue().onCompletion(null, null); return null; } @@ -190,8 +190,8 @@ public class KafkaOffsetBackingStoreTest { PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() { @Override public Object answer() throws Throwable { - capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, TP0_KEY.array(), TP0_VALUE_NEW.array())); - capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 1, 1, 0L, TimestampType.CREATE_TIME, TP1_KEY.array(), TP1_VALUE_NEW.array())); + capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP0_KEY.array(), TP0_VALUE_NEW.array())); + capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP1_KEY.array(), TP1_VALUE_NEW.array())); thirdGetReadToEndCallback.getValue().onCompletion(null, null); return null; } http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java index cdbab64..8acd31f 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java @@ -367,7 +367,7 @@ public class KafkaStatusBackingStoreTest extends EasyMockSupport { private static ConsumerRecord<String, byte[]> consumerRecord(long offset, String key, byte[] value) { return new ConsumerRecord<>(STATUS_TOPIC, 0, offset, System.currentTimeMillis(), - TimestampType.CREATE_TIME, key, value); + TimestampType.CREATE_TIME, 0L, 0, 0, key, value); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java index b2246f5..ec58cb6 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java @@ -183,7 +183,7 @@ public class KafkaBasedLogTest { consumer.schedulePollTask(new Runnable() { @Override public void run() { - consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, TP0_KEY, TP0_VALUE)); + consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP0_KEY, TP0_VALUE)); } }); consumer.scheduleNopPollTask(); @@ -191,7 +191,7 @@ public class KafkaBasedLogTest { consumer.schedulePollTask(new Runnable() { @Override public void run() { - consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, TP1_KEY, TP1_VALUE)); + consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP1_KEY, TP1_VALUE)); } }); consumer.schedulePollTask(new Runnable() { @@ -298,16 +298,16 @@ public class KafkaBasedLogTest { consumer.schedulePollTask(new Runnable() { @Override public void run() { - consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, TP0_KEY, TP0_VALUE)); - consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, TP0_KEY, TP0_VALUE_NEW)); - consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, TP1_KEY, TP1_VALUE)); + consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP0_KEY, TP0_VALUE)); + consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP0_KEY, TP0_VALUE_NEW)); + consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP1_KEY, TP1_VALUE)); } }); consumer.schedulePollTask(new Runnable() { @Override public void run() { - consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 1, 0L, TimestampType.CREATE_TIME, TP1_KEY, TP1_VALUE_NEW)); + consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP1_KEY, TP1_VALUE_NEW)); } }); @@ -363,8 +363,8 @@ public class KafkaBasedLogTest { consumer.schedulePollTask(new Runnable() { @Override public void run() { - consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, TP0_KEY, TP0_VALUE_NEW)); - consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, TP0_KEY, TP0_VALUE_NEW)); + consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP0_KEY, TP0_VALUE_NEW)); + consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP0_KEY, TP0_VALUE_NEW)); } }); http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/core/src/main/scala/kafka/tools/ConsoleConsumer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala index 855025e..50add72 100755 --- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala @@ -127,8 +127,8 @@ object ConsoleConsumer extends Logging { } messageCount += 1 try { - formatter.writeTo(new ConsumerRecord(msg.topic, msg.partition, msg.offset, msg.timestamp, msg.timestampType, - msg.key, msg.value), System.out) + formatter.writeTo(new ConsumerRecord(msg.topic, msg.partition, msg.offset, msg.timestamp, + msg.timestampType, 0, 0, 0, msg.key, msg.value), System.out) } catch { case e: Throwable => if (skipMessageOnError) { http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala index b4b68e0..6ad68b6 100755 --- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala +++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala @@ -223,8 +223,10 @@ object SimpleConsumerShell extends Logging { val message = messageAndOffset.message val key = if (message.hasKey) Utils.readBytes(message.key) else null val value = if (message.isNull) null else Utils.readBytes(message.payload) + val serializedKeySize = if (message.hasKey) key.size else -1 + val serializedValueSize = if (message.isNull) -1 else value.size formatter.writeTo(new ConsumerRecord(topic, partitionId, offset, message.timestamp, - message.timestampType, key, value), System.out) + message.timestampType, message.checksum, serializedKeySize, serializedValueSize, key, value), System.out) numMessagesConsumed += 1 } catch { case e: Throwable => http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala index 19a8882..684b38f 100644 --- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala @@ -309,6 +309,9 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging { val keyAndValueIndex = startingKeyAndValueIndex + i assertEquals(s"key $keyAndValueIndex", new String(record.key)) assertEquals(s"value $keyAndValueIndex", new String(record.value)) + // this is true only because K and V are byte arrays + assertEquals(s"key $keyAndValueIndex".length, record.serializedKeySize) + assertEquals(s"value $keyAndValueIndex".length, record.serializedValueSize) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala index 807b8bb..2d89bf8 100644 --- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala +++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala @@ -96,6 +96,13 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { assertEquals(offset, metadata.offset()) assertEquals(topic, metadata.topic()) assertEquals(partition, metadata.partition()) + offset match { + case 0 => assertEquals(metadata.serializedKeySize + metadata.serializedValueSize, "key".getBytes.length + "value".getBytes.length) + case 1 => assertEquals(metadata.serializedKeySize(), "key".getBytes.length) + case 2 => assertEquals(metadata.serializedValueSize, "value".getBytes.length) + case _ => assertTrue(metadata.serializedValueSize > 0) + } + assertNotEquals(metadata.checksum(), 0) offset += 1 } else { fail("Send callback returns the following exception", exception) http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java index 62bf307..6911a45 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java @@ -77,8 +77,11 @@ public class RecordQueue { Object key = source.deserializeKey(rawRecord.topic(), rawRecord.key()); Object value = source.deserializeValue(rawRecord.topic(), rawRecord.value()); - ConsumerRecord<Object, Object> record = new ConsumerRecord<>(rawRecord.topic(), rawRecord.partition(), - rawRecord.offset(), rawRecord.timestamp(), TimestampType.CREATE_TIME, key, value); + ConsumerRecord<Object, Object> record = new ConsumerRecord<>(rawRecord.topic(), rawRecord.partition(), rawRecord.offset(), + rawRecord.timestamp(), TimestampType.CREATE_TIME, + rawRecord.checksum(), + rawRecord.serializedKeySize(), + rawRecord.serializedValueSize(), key, value); long timestamp = timestampExtractor.extract(record); StampedRecord stampedRecord = new StampedRecord(record, timestamp); http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java index 61f6dbf..5bf1b5e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java @@ -60,17 +60,17 @@ public class PartitionGroupTest { // add three 3 records with timestamp 1, 3, 5 to partition-1 List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList( - new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, recordKey, recordValue), - new ConsumerRecord<>("topic", 1, 3, 0L, TimestampType.CREATE_TIME, recordKey, recordValue), - new ConsumerRecord<>("topic", 1, 5, 0L, TimestampType.CREATE_TIME, recordKey, recordValue)); + new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), + new ConsumerRecord<>("topic", 1, 3, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), + new ConsumerRecord<>("topic", 1, 5, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)); group.addRawRecords(partition1, list1); // add three 3 records with timestamp 2, 4, 6 to partition-2 List<ConsumerRecord<byte[], byte[]>> list2 = Arrays.asList( - new ConsumerRecord<>("topic", 1, 2, 0L, TimestampType.CREATE_TIME, recordKey, recordValue), - new ConsumerRecord<>("topic", 1, 4, 0L, TimestampType.CREATE_TIME, recordKey, recordValue), - new ConsumerRecord<>("topic", 1, 6, 0L, TimestampType.CREATE_TIME, recordKey, recordValue)); + new ConsumerRecord<>("topic", 1, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), + new ConsumerRecord<>("topic", 1, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), + new ConsumerRecord<>("topic", 1, 6, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)); group.addRawRecords(partition2, list2); http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java index 916079d..14cb493 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java @@ -85,7 +85,7 @@ public class ProcessorStateManagerTest { public void bufferRecord(ConsumerRecord<Integer, Integer> record) { recordBuffer.add( new ConsumerRecord<>(record.topic(), record.partition(), record.offset(), 0L, - TimestampType.CREATE_TIME, + TimestampType.CREATE_TIME, 0L, 0, 0, serializer.serialize(record.topic(), record.key()), serializer.serialize(record.topic(), record.value()))); endOffset = record.offset(); @@ -269,7 +269,7 @@ public class ProcessorStateManagerTest { int key = i * 10; expectedKeys.add(key); restoreConsumer.bufferRecord( - new ConsumerRecord<>(persistentStoreTopicName, 2, 0L, offset, TimestampType.CREATE_TIME, key, 0) + new ConsumerRecord<>(persistentStoreTopicName, 2, 0L, offset, TimestampType.CREATE_TIME, 0L, 0, 0, key, 0) ); } @@ -322,7 +322,7 @@ public class ProcessorStateManagerTest { int key = i; expectedKeys.add(i); restoreConsumer.bufferRecord( - new ConsumerRecord<>(nonPersistentStoreTopicName, 2, 0L, offset, TimestampType.CREATE_TIME, key, 0) + new ConsumerRecord<>(nonPersistentStoreTopicName, 2, 0L, offset, TimestampType.CREATE_TIME, 0L, 0, 0, key, 0) ); } http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java index 36f38e6..8d9c91c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java @@ -51,9 +51,9 @@ public class RecordQueueTest { // add three 3 out-of-order records with timestamp 2, 1, 3 List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList( - new ConsumerRecord<>("topic", 1, 2, 0L, TimestampType.CREATE_TIME, recordKey, recordValue), - new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, recordKey, recordValue), - new ConsumerRecord<>("topic", 1, 3, 0L, TimestampType.CREATE_TIME, recordKey, recordValue)); + new ConsumerRecord<>("topic", 1, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), + new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), + new ConsumerRecord<>("topic", 1, 3, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)); queue.addRawRecords(list1, timestampExtractor); @@ -73,9 +73,9 @@ public class RecordQueueTest { // add three 3 out-of-order records with timestamp 4, 1, 2 // now with 3, 4, 1, 2 List<ConsumerRecord<byte[], byte[]>> list2 = Arrays.asList( - new ConsumerRecord<>("topic", 1, 4, 0L, TimestampType.CREATE_TIME, recordKey, recordValue), - new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, recordKey, recordValue), - new ConsumerRecord<>("topic", 1, 2, 0L, TimestampType.CREATE_TIME, recordKey, recordValue)); + new ConsumerRecord<>("topic", 1, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), + new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), + new ConsumerRecord<>("topic", 1, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)); queue.addRawRecords(list2, timestampExtractor); @@ -100,9 +100,9 @@ public class RecordQueueTest { // add three more records with 4, 5, 6 List<ConsumerRecord<byte[], byte[]>> list3 = Arrays.asList( - new ConsumerRecord<>("topic", 1, 4, 0L, TimestampType.CREATE_TIME, recordKey, recordValue), - new ConsumerRecord<>("topic", 1, 5, 0L, TimestampType.CREATE_TIME, recordKey, recordValue), - new ConsumerRecord<>("topic", 1, 6, 0L, TimestampType.CREATE_TIME, recordKey, recordValue)); + new ConsumerRecord<>("topic", 1, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), + new ConsumerRecord<>("topic", 1, 5, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), + new ConsumerRecord<>("topic", 1, 6, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)); queue.addRawRecords(list3, timestampExtractor); http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java index e0be587..295f0dd 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java @@ -153,7 +153,7 @@ public class StandbyTaskTest { restoreStateConsumer.assign(new ArrayList<>(task.changeLogPartitions())); task.update(partition1, - records(new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, 0L, TimestampType.CREATE_TIME, recordKey, recordValue)) + records(new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)) ); } finally { @@ -172,9 +172,9 @@ public class StandbyTaskTest { restoreStateConsumer.assign(new ArrayList<>(task.changeLogPartitions())); for (ConsumerRecord<Integer, Integer> record : Arrays.asList( - new ConsumerRecord<>(partition2.topic(), partition2.partition(), 10, 0L, TimestampType.CREATE_TIME, 1, 100), - new ConsumerRecord<>(partition2.topic(), partition2.partition(), 20, 0L, TimestampType.CREATE_TIME, 2, 100), - new ConsumerRecord<>(partition2.topic(), partition2.partition(), 30, 0L, TimestampType.CREATE_TIME, 3, 100))) { + new ConsumerRecord<>(partition2.topic(), partition2.partition(), 10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 1, 100), + new ConsumerRecord<>(partition2.topic(), partition2.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 2, 100), + new ConsumerRecord<>(partition2.topic(), partition2.partition(), 30, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 3, 100))) { restoreStateConsumer.bufferRecord(record); } @@ -235,11 +235,11 @@ public class StandbyTaskTest { restoreStateConsumer.assign(new ArrayList<>(task.changeLogPartitions())); for (ConsumerRecord<Integer, Integer> record : Arrays.asList( - new ConsumerRecord<>(ktable.topic(), ktable.partition(), 10, 0L, TimestampType.CREATE_TIME, 1, 100), - new ConsumerRecord<>(ktable.topic(), ktable.partition(), 20, 0L, TimestampType.CREATE_TIME, 2, 100), - new ConsumerRecord<>(ktable.topic(), ktable.partition(), 30, 0L, TimestampType.CREATE_TIME, 3, 100), - new ConsumerRecord<>(ktable.topic(), ktable.partition(), 40, 0L, TimestampType.CREATE_TIME, 4, 100), - new ConsumerRecord<>(ktable.topic(), ktable.partition(), 50, 0L, TimestampType.CREATE_TIME, 5, 100))) { + new ConsumerRecord<>(ktable.topic(), ktable.partition(), 10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 1, 100), + new ConsumerRecord<>(ktable.topic(), ktable.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 2, 100), + new ConsumerRecord<>(ktable.topic(), ktable.partition(), 30, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 3, 100), + new ConsumerRecord<>(ktable.topic(), ktable.partition(), 40, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 4, 100), + new ConsumerRecord<>(ktable.topic(), ktable.partition(), 50, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 5, 100))) { restoreStateConsumer.bufferRecord(record); } http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index 0430566..1f401db 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -108,15 +108,15 @@ public class StreamTaskTest { StreamTask task = new StreamTask(new TaskId(0, 0), "jobId", partitions, topology, consumer, producer, restoreStateConsumer, config, null); task.addRecords(partition1, records( - new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, 0L, TimestampType.CREATE_TIME, recordKey, recordValue), - new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, 0L, TimestampType.CREATE_TIME, recordKey, recordValue), - new ConsumerRecord<>(partition1.topic(), partition1.partition(), 30, 0L, TimestampType.CREATE_TIME, recordKey, recordValue) + new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), + new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), + new ConsumerRecord<>(partition1.topic(), partition1.partition(), 30, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue) )); task.addRecords(partition2, records( - new ConsumerRecord<>(partition2.topic(), partition2.partition(), 25, 0L, TimestampType.CREATE_TIME, recordKey, recordValue), - new ConsumerRecord<>(partition2.topic(), partition2.partition(), 35, 0L, TimestampType.CREATE_TIME, recordKey, recordValue), - new ConsumerRecord<>(partition2.topic(), partition2.partition(), 45, 0L, TimestampType.CREATE_TIME, recordKey, recordValue) + new ConsumerRecord<>(partition2.topic(), partition2.partition(), 25, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), + new ConsumerRecord<>(partition2.topic(), partition2.partition(), 35, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), + new ConsumerRecord<>(partition2.topic(), partition2.partition(), 45, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue) )); assertEquals(5, task.process()); @@ -159,15 +159,15 @@ public class StreamTaskTest { StreamTask task = new StreamTask(new TaskId(1, 1), "jobId", partitions, topology, consumer, producer, restoreStateConsumer, config, null); task.addRecords(partition1, records( - new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, 0L, TimestampType.CREATE_TIME, recordKey, recordValue), - new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, 0L, TimestampType.CREATE_TIME, recordKey, recordValue) + new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), + new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue) )); task.addRecords(partition2, records( - new ConsumerRecord<>(partition2.topic(), partition2.partition(), 35, 0L, TimestampType.CREATE_TIME, recordKey, recordValue), - new ConsumerRecord<>(partition2.topic(), partition2.partition(), 45, 0L, TimestampType.CREATE_TIME, recordKey, recordValue), - new ConsumerRecord<>(partition2.topic(), partition2.partition(), 55, 0L, TimestampType.CREATE_TIME, recordKey, recordValue), - new ConsumerRecord<>(partition2.topic(), partition2.partition(), 65, 0L, TimestampType.CREATE_TIME, recordKey, recordValue) + new ConsumerRecord<>(partition2.topic(), partition2.partition(), 35, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), + new ConsumerRecord<>(partition2.topic(), partition2.partition(), 45, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), + new ConsumerRecord<>(partition2.topic(), partition2.partition(), 55, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), + new ConsumerRecord<>(partition2.topic(), partition2.partition(), 65, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue) )); assertEquals(5, task.process()); @@ -178,9 +178,9 @@ public class StreamTaskTest { assertTrue(consumer.paused().contains(partition2)); task.addRecords(partition1, records( - new ConsumerRecord<>(partition1.topic(), partition1.partition(), 30, 0L, TimestampType.CREATE_TIME, recordKey, recordValue), - new ConsumerRecord<>(partition1.topic(), partition1.partition(), 40, 0L, TimestampType.CREATE_TIME, recordKey, recordValue), - new ConsumerRecord<>(partition1.topic(), partition1.partition(), 50, 0L, TimestampType.CREATE_TIME, recordKey, recordValue) + new ConsumerRecord<>(partition1.topic(), partition1.partition(), 30, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), + new ConsumerRecord<>(partition1.topic(), partition1.partition(), 40, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), + new ConsumerRecord<>(partition1.topic(), partition1.partition(), 50, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue) )); assertEquals(2, consumer.paused().size()); http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java index e414d80..34fd10c 100644 --- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java @@ -201,7 +201,7 @@ public class ProcessorTopologyTestDriver { } // Add the record ... long offset = offsetsByTopicPartition.get(tp).incrementAndGet(); - task.addRecords(tp, records(new ConsumerRecord<byte[], byte[]>(tp.topic(), tp.partition(), offset, 0L, TimestampType.CREATE_TIME, key, value))); + task.addRecords(tp, records(new ConsumerRecord<byte[], byte[]>(tp.topic(), tp.partition(), offset, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, key, value))); producer.clear(); // Process the record ... task.process();
