This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 133108c KAFKA-6850: Add Record Header support to Kafka Streams Processor API (KIP-244) (#4955) 133108c is described below commit 133108cdacf7ee1cc4569e797f2cdf9ec60f7fdd Author: Jorge Quilcate Otoya <quilcate.jo...@gmail.com> AuthorDate: Wed May 23 00:44:37 2018 +0200 KAFKA-6850: Add Record Header support to Kafka Streams Processor API (KIP-244) (#4955) Reviewers: Matthias J. Sax <matth...@confluent.io>, Guozhang Wang <guozh...@confluent.io> --- .../kafka/streams/processor/ProcessorContext.java | 7 + .../internals/AbstractProcessorContext.java | 10 ++ .../ForwardingDisabledProcessorContext.java | 6 + .../processor/internals/GlobalStateUpdateTask.java | 3 +- .../internals/ProcessorRecordContext.java | 17 +- .../processor/internals/RecordCollector.java | 3 + .../processor/internals/RecordCollectorImpl.java | 7 +- .../streams/processor/internals/RecordContext.java | 7 + .../processor/internals/RecordDeserializer.java | 2 +- .../streams/processor/internals/SinkNode.java | 2 +- .../streams/processor/internals/StampedRecord.java | 5 + .../processor/internals/StandbyContextImpl.java | 3 + .../streams/processor/internals/StreamTask.java | 8 +- .../state/internals/CachingKeyValueStore.java | 13 +- .../state/internals/CachingSessionStore.java | 11 +- .../state/internals/CachingWindowStore.java | 11 +- .../streams/state/internals/LRUCacheEntry.java | 18 +- .../streams/state/internals/StoreChangeLogger.java | 3 +- .../integration/utils/IntegrationTestUtils.java | 92 +++++++++- .../kstream/internals/KGroupedStreamImplTest.java | 2 +- .../internals/KStreamGlobalKTableJoinTest.java | 2 +- .../internals/KStreamGlobalKTableLeftJoinTest.java | 2 +- .../kstream/internals/KStreamKTableJoinTest.java | 4 +- .../internals/KStreamKTableLeftJoinTest.java | 2 +- ...KStreamSessionWindowAggregateProcessorTest.java | 2 +- .../internals/KTableKTableInnerJoinTest.java | 2 +- .../internals/KTableKTableLeftJoinTest.java | 2 
+- .../internals/KTableKTableOuterJoinTest.java | 2 +- .../internals/KTableKTableRightJoinTest.java | 2 +- .../internals/KTableTransformValuesTest.java | 4 +- .../internals/AbstractProcessorContextTest.java | 28 ++- .../processor/internals/ProcessorTopologyTest.java | 65 ++++++- .../processor/internals/RecordCollectorTest.java | 68 +++---- .../processor/internals/RecordContextStub.java | 21 ++- .../internals/RecordDeserializerTest.java | 10 +- .../streams/state/KeyValueStoreTestDriver.java | 3 + .../state/internals/CachingKeyValueStoreTest.java | 3 +- .../state/internals/CachingSessionStoreTest.java | 2 +- .../state/internals/CachingWindowStoreTest.java | 2 +- .../ChangeLoggingKeyValueBytesStoreTest.java | 2 + .../ChangeLoggingSessionBytesStoreTest.java | 2 + .../ChangeLoggingWindowBytesStoreTest.java | 2 + .../streams/state/internals/NamedCacheTest.java | 32 ++-- .../state/internals/RocksDBWindowStoreTest.java | 4 +- .../state/internals/StoreChangeLoggerTest.java | 13 ++ .../streams/state/internals/ThreadCacheTest.java | 8 +- .../kafka/test/InternalMockProcessorContext.java | 12 +- .../org/apache/kafka/test/KStreamTestDriver.java | 7 +- .../org/apache/kafka/test/NoOpRecordCollector.java | 3 + .../apache/kafka/streams/TopologyTestDriver.java | 20 ++- .../streams/processor/MockProcessorContext.java | 13 +- .../kafka/streams/test/ConsumerRecordFactory.java | 170 ++++++++++++++++-- .../apache/kafka/streams/test/OutputVerifier.java | 199 +++++++++++++++++++++ .../kafka/streams/MockProcessorContextTest.java | 2 +- .../kafka/streams/TopologyTestDriverTest.java | 19 +- .../streams/test/ConsumerRecordFactoryTest.java | 7 +- 56 files changed, 839 insertions(+), 132 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java index 79d191c..f2a9f64 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java +++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.processor; import org.apache.kafka.common.annotation.InterfaceStability; +import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.streams.StreamsMetrics; import org.apache.kafka.streams.errors.StreamsException; @@ -198,6 +199,12 @@ public interface ProcessorContext { long offset(); /** + * Returns the headers of the current input record + * @return the headers + */ + Headers headers(); + + /** * Returns the current timestamp. * * If it is triggered while processing a record streamed from the source processor, timestamp is defined as the timestamp of the current input record; the timestamp is extracted from diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java index 9687477..3338669 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.processor.internals; +import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.processor.StateRestoreCallback; @@ -142,6 +143,15 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte return recordContext.offset(); } + @Override + public Headers headers() { + if (recordContext == null) { + throw new IllegalStateException("This should not happen as headers() should only be called while a record is processed"); + } + + return recordContext.headers(); + } + /** * @throws IllegalStateException if timestamp is null */ diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java index 35a0a7e..7e2610c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.processor.internals; +import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.streams.StreamsMetrics; import org.apache.kafka.streams.errors.StreamsException; @@ -133,6 +134,11 @@ public final class ForwardingDisabledProcessorContext implements ProcessorContex } @Override + public Headers headers() { + return delegate.headers(); + } + + @Override public long timestamp() { return delegate.timestamp(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java index 26bf493..d387713 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java @@ -89,7 +89,8 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer { new ProcessorRecordContext(deserialized.timestamp(), deserialized.offset(), deserialized.partition(), - deserialized.topic()); + deserialized.topic(), + deserialized.headers()); processorContext.setRecordContext(recordContext); processorContext.setCurrentNode(sourceNodeAndDeserializer.sourceNode()); sourceNodeAndDeserializer.sourceNode().process(deserialized.key(), deserialized.value()); diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java index 92acfc9..c071525 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java @@ -16,6 +16,8 @@ */ package org.apache.kafka.streams.processor.internals; +import org.apache.kafka.common.header.Headers; + import java.util.Objects; public class ProcessorRecordContext implements RecordContext { @@ -24,16 +26,19 @@ public class ProcessorRecordContext implements RecordContext { private final long offset; private final String topic; private final int partition; + private final Headers headers; public ProcessorRecordContext(final long timestamp, final long offset, final int partition, - final String topic) { + final String topic, + final Headers headers) { this.timestamp = timestamp; this.offset = offset; this.topic = topic; this.partition = partition; + this.headers = headers; } public long offset() { @@ -59,6 +64,11 @@ public class ProcessorRecordContext implements RecordContext { } @Override + public Headers headers() { + return headers; + } + + @Override public boolean equals(final Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; @@ -66,11 +76,12 @@ public class ProcessorRecordContext implements RecordContext { return timestamp == that.timestamp && offset == that.offset && partition == that.partition && - Objects.equals(topic, that.topic); + Objects.equals(topic, that.topic) && + Objects.equals(headers, that.headers); } @Override public int hashCode() { - return Objects.hash(timestamp, offset, topic, partition); + return Objects.hash(timestamp, offset, topic, partition, headers); } } diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java index b083869..bf10da2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java @@ -18,6 +18,7 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.processor.StreamPartitioner; @@ -28,6 +29,7 @@ public interface RecordCollector { <K, V> void send(final String topic, final K key, final V value, + final Headers headers, final Integer partition, final Long timestamp, final Serializer<K> keySerializer, @@ -36,6 +38,7 @@ public interface RecordCollector { <K, V> void send(final String topic, final K key, final V value, + final Headers headers, final Long timestamp, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java index 8167539..1c8b0a0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java @@ -34,6 +34,7 @@ import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.UnknownServerException; import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.serialization.Serializer; import 
org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.errors.ProductionExceptionHandler; @@ -77,6 +78,7 @@ public class RecordCollectorImpl implements RecordCollector { public <K, V> void send(final String topic, final K key, final V value, + final Headers headers, final Long timestamp, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, @@ -93,7 +95,7 @@ public class RecordCollectorImpl implements RecordCollector { } } - send(topic, key, value, partition, timestamp, keySerializer, valueSerializer); + send(topic, key, value, headers, partition, timestamp, keySerializer, valueSerializer); } private boolean productionExceptionIsFatal(final Exception exception) { @@ -142,6 +144,7 @@ public class RecordCollectorImpl implements RecordCollector { public <K, V> void send(final String topic, final K key, final V value, + final Headers headers, final Integer partition, final Long timestamp, final Serializer<K> keySerializer, @@ -150,7 +153,7 @@ public class RecordCollectorImpl implements RecordCollector { final byte[] keyBytes = keySerializer.serialize(topic, key); final byte[] valBytes = valueSerializer.serialize(topic, value); - final ProducerRecord<byte[], byte[]> serializedRecord = new ProducerRecord<>(topic, partition, timestamp, keyBytes, valBytes); + final ProducerRecord<byte[], byte[]> serializedRecord = new ProducerRecord<>(topic, partition, timestamp, keyBytes, valBytes, headers); try { producer.send(serializedRecord, new Callback() { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordContext.java index dd58f4c..15add71 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordContext.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.processor.internals; +import 
org.apache.kafka.common.header.Headers; import org.apache.kafka.streams.processor.Processor; /** @@ -47,4 +48,10 @@ public interface RecordContext { * @return The partition the record was received on */ int partition(); + + /** + * @return The headers from the record received from Kafka + */ + Headers headers(); + } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java index 36e2c9a..ade9664 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java @@ -63,7 +63,7 @@ class RecordDeserializer { rawRecord.serializedKeySize(), rawRecord.serializedValueSize(), sourceNode.deserializeKey(rawRecord.topic(), rawRecord.headers(), rawRecord.key()), - sourceNode.deserializeValue(rawRecord.topic(), rawRecord.headers(), rawRecord.value())); + sourceNode.deserializeValue(rawRecord.topic(), rawRecord.headers(), rawRecord.value()), rawRecord.headers()); } catch (final Exception deserializationException) { final DeserializationExceptionHandler.DeserializationHandlerResponse response; try { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java index 0fbd6dc..7711905 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java @@ -84,7 +84,7 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> { } try { - collector.send(topic, key, value, timestamp, keySerializer, valSerializer, partitioner); + collector.send(topic, key, value, context.headers(), timestamp, keySerializer, valSerializer, partitioner); } catch (final ClassCastException e) { final String keyClass 
= key == null ? "unknown because key is null" : key.getClass().getName(); final String valueClass = value == null ? "unknown because value is null" : value.getClass().getName(); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StampedRecord.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StampedRecord.java index 243c41a..aa9b79d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StampedRecord.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StampedRecord.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.header.Headers; public class StampedRecord extends Stamped<ConsumerRecord<Object, Object>> { @@ -44,6 +45,10 @@ public class StampedRecord extends Stamped<ConsumerRecord<Object, Object>> { return value.offset(); } + public Headers headers() { + return value.headers(); + } + @Override public String toString() { return value.toString() + ", timestamp = " + timestamp; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java index 6aeca44..14f986c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.StreamsConfig; @@ -40,6 +41,7 @@ class StandbyContextImpl extends AbstractProcessorContext implements RecordColle public <K, V> void send(final String topic, final K 
key, final V value, + final Headers headers, final Integer partition, final Long timestamp, final Serializer<K> keySerializer, @@ -50,6 +52,7 @@ class StandbyContextImpl extends AbstractProcessorContext implements RecordColle public <K, V> void send(final String topic, final K key, final V value, + final Headers headers, final Long timestamp, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 633e7ad..e2be3e2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -343,7 +343,13 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator } private void updateProcessorContext(final StampedRecord record, final ProcessorNode currNode) { - processorContext.setRecordContext(new ProcessorRecordContext(record.timestamp, record.offset(), record.partition(), record.topic())); + processorContext.setRecordContext( + new ProcessorRecordContext( + record.timestamp, + record.offset(), + record.partition(), + record.topic(), + record.headers())); processorContext.setCurrentNode(currNode); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java index 525e92d..285bde5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java @@ -224,8 +224,17 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im } private void putInternal(final Bytes key, final byte[] value) { - cache.put(cacheName, key, new LRUCacheEntry(value, true, 
context.offset(), - context.timestamp(), context.partition(), context.topic())); + cache.put( + cacheName, + key, + new LRUCacheEntry( + value, + context.headers(), + true, + context.offset(), + context.timestamp(), + context.partition(), + context.topic())); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java index 1bb2ea7..c099faf 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java @@ -141,8 +141,15 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i public void put(final Windowed<Bytes> key, byte[] value) { validateStoreOpen(); final Bytes binaryKey = Bytes.wrap(SessionKeySchema.toBinary(key)); - final LRUCacheEntry entry = new LRUCacheEntry(value, true, context.offset(), - key.window().end(), context.partition(), context.topic()); + final LRUCacheEntry entry = + new LRUCacheEntry( + value, + context.headers(), + true, + context.offset(), + key.window().end(), + context.partition(), + context.topic()); cache.put(cacheName, cacheFunction.cacheKey(binaryKey), entry); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java index 7e58b68..ca24ffd 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java @@ -150,8 +150,15 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl validateStoreOpen(); final Bytes keyBytes = WindowKeySchema.toStoreKeyBinary(key, timestamp, 0); - final LRUCacheEntry entry = new LRUCacheEntry(value, true, context.offset(), - 
timestamp, context.partition(), context.topic()); + final LRUCacheEntry entry = + new LRUCacheEntry( + value, + context.headers(), + true, + context.offset(), + timestamp, + context.partition(), + context.topic()); cache.put(name, cacheFunction.cacheKey(keyBytes), entry); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java index af7059b..78c0331 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.header.Headers; import org.apache.kafka.streams.processor.internals.RecordContext; /** @@ -24,6 +25,7 @@ import org.apache.kafka.streams.processor.internals.RecordContext; class LRUCacheEntry implements RecordContext { public final byte[] value; + private final Headers headers; private final long offset; private final String topic; private final int partition; @@ -33,13 +35,18 @@ class LRUCacheEntry implements RecordContext { private boolean isDirty; LRUCacheEntry(final byte[] value) { - this(value, false, -1, -1, -1, ""); + this(value, null, false, -1, -1, -1, ""); } - LRUCacheEntry(final byte[] value, final boolean isDirty, - final long offset, final long timestamp, final int partition, + LRUCacheEntry(final byte[] value, + final Headers headers, + final boolean isDirty, + final long offset, + final long timestamp, + final int partition, final String topic) { this.value = value; + this.headers = headers; this.partition = partition; this.topic = topic; this.offset = offset; @@ -78,6 +85,11 @@ class LRUCacheEntry implements RecordContext { return partition; } + @Override + public Headers headers() { + return headers; + } + void markClean() { isDirty = false; } diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java index 1055df5..a8a04c6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java @@ -55,7 +55,8 @@ class StoreChangeLogger<K, V> { if (collector != null) { final Serializer<K> keySerializer = serialization.keySerializer(); final Serializer<V> valueSerializer = serialization.valueSerializer(); - collector.send(this.topic, key, value, this.partition, context.timestamp(), keySerializer, valueSerializer); + // Sending null headers to changelog topics (KIP-244) + collector.send(this.topic, key, value, null, this.partition, context.timestamp(), keySerializer, valueSerializer); } } } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java index d306ee4..fe897c7 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java @@ -30,6 +30,7 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.Metric; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.requests.UpdateMetadataRequest; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; @@ -91,6 +92,12 @@ public class IntegrationTestUtils { IntegrationTestUtils.produceKeyValuesSynchronously(topic, records, producerConfig, time, false); } + public static <K, V> void produceKeyValuesSynchronously( + final String topic, final Collection<KeyValue<K, V>> 
records, final Properties producerConfig, final Headers headers, final Time time) + throws ExecutionException, InterruptedException { + IntegrationTestUtils.produceKeyValuesSynchronously(topic, records, producerConfig, headers, time, false); + } + /** * @param topic Kafka topic to write the data records to * @param records Data records to write to Kafka @@ -102,10 +109,21 @@ public class IntegrationTestUtils { public static <K, V> void produceKeyValuesSynchronously( final String topic, final Collection<KeyValue<K, V>> records, final Properties producerConfig, final Time time, final boolean enableTransactions) throws ExecutionException, InterruptedException { + IntegrationTestUtils.produceKeyValuesSynchronously(topic, records, producerConfig, null, time, enableTransactions); + } + + public static <K, V> void produceKeyValuesSynchronously(final String topic, + final Collection<KeyValue<K, V>> records, + final Properties producerConfig, + final Headers headers, + final Time time, + final boolean enableTransactions) + throws ExecutionException, InterruptedException { for (final KeyValue<K, V> record : records) { produceKeyValuesSynchronouslyWithTimestamp(topic, Collections.singleton(record), producerConfig, + headers, time.milliseconds(), enableTransactions); time.sleep(1L); @@ -123,20 +141,39 @@ public class IntegrationTestUtils { public static <K, V> void produceKeyValuesSynchronouslyWithTimestamp(final String topic, final Collection<KeyValue<K, V>> records, final Properties producerConfig, + final Headers headers, + final Long timestamp) + throws ExecutionException, InterruptedException { + IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(topic, records, producerConfig, headers, timestamp, false); + } + + public static <K, V> void produceKeyValuesSynchronouslyWithTimestamp(final String topic, + final Collection<KeyValue<K, V>> records, + final Properties producerConfig, final Long timestamp, final boolean enableTransactions) throws ExecutionException, 
InterruptedException { + IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(topic, records, producerConfig, null, timestamp, enableTransactions); + } + + public static <K, V> void produceKeyValuesSynchronouslyWithTimestamp(final String topic, + final Collection<KeyValue<K, V>> records, + final Properties producerConfig, + final Headers headers, + final Long timestamp, + final boolean enabledTransactions) + throws ExecutionException, InterruptedException { try (Producer<K, V> producer = new KafkaProducer<>(producerConfig)) { - if (enableTransactions) { + if (enabledTransactions) { producer.initTransactions(); producer.beginTransaction(); } for (final KeyValue<K, V> record : records) { final Future<RecordMetadata> f = producer.send( - new ProducerRecord<>(topic, null, timestamp, record.key, record.value)); + new ProducerRecord<>(topic, null, timestamp, record.key, record.value, headers)); f.get(); } - if (enableTransactions) { + if (enabledTransactions) { producer.commitTransaction(); } producer.flush(); @@ -194,6 +231,12 @@ public class IntegrationTestUtils { } } + public static <K, V> List<ConsumerRecord<K, V>> waitUntilMinRecordsReceived(final Properties consumerConfig, + final String topic, + final int expectedNumRecords) throws InterruptedException { + return waitUntilMinRecordsReceived(consumerConfig, topic, expectedNumRecords, DEFAULT_TIMEOUT); + } + public static <K, V> List<KeyValue<K, V>> waitUntilMinKeyValueRecordsReceived(final Properties consumerConfig, final String topic, final int expectedNumRecords) throws InterruptedException { @@ -232,6 +275,27 @@ public class IntegrationTestUtils { return accumData; } + public static <K, V> List<ConsumerRecord<K, V>> waitUntilMinRecordsReceived(final Properties consumerConfig, + final String topic, + final int expectedNumRecords, + final long waitTime) throws InterruptedException { + final List<ConsumerRecord<K, V>> accumData = new ArrayList<>(); + try (final Consumer<K, V> consumer = 
createConsumer(consumerConfig)) { + final TestCondition valuesRead = new TestCondition() { + @Override + public boolean conditionMet() { + final List<ConsumerRecord<K, V>> readData = + readRecords(topic, consumer, waitTime, expectedNumRecords); + accumData.addAll(readData); + return accumData.size() >= expectedNumRecords; + } + }; + final String conditionDetails = "Did not receive all " + expectedNumRecords + " records from topic " + topic; + TestUtils.waitForCondition(valuesRead, waitTime, conditionDetails); + } + return accumData; + } + public static <V> List<V> waitUntilMinValuesRecordsReceived(final Properties consumerConfig, final String topic, final int expectedNumRecords) throws InterruptedException { @@ -380,21 +444,33 @@ public class IntegrationTestUtils { final Consumer<K, V> consumer, final long waitTime, final int maxMessages) { - final List<KeyValue<K, V>> consumedValues; + final List<KeyValue<K, V>> consumedValues = new ArrayList<>(); + final List<ConsumerRecord<K, V>> records = readRecords(topic, consumer, waitTime, maxMessages); + for (final ConsumerRecord<K, V> record : records) { + consumedValues.add(new KeyValue<>(record.key(), record.value())); + } + return consumedValues; + } + + private static <K, V> List<ConsumerRecord<K, V>> readRecords(final String topic, + final Consumer<K, V> consumer, + final long waitTime, + final int maxMessages) { + final List<ConsumerRecord<K, V>> consumerRecords; consumer.subscribe(Collections.singletonList(topic)); final int pollIntervalMs = 100; - consumedValues = new ArrayList<>(); + consumerRecords = new ArrayList<>(); int totalPollTimeMs = 0; while (totalPollTimeMs < waitTime && - continueConsuming(consumedValues.size(), maxMessages)) { + continueConsuming(consumerRecords.size(), maxMessages)) { totalPollTimeMs += pollIntervalMs; final ConsumerRecords<K, V> records = consumer.poll(pollIntervalMs); for (final ConsumerRecord<K, V> record : records) { - consumedValues.add(new KeyValue<>(record.key(), 
record.value())); + consumerRecords.add(record); } } - return consumedValues; + return consumerRecords; } private static boolean continueConsuming(final int messagesConsumed, final int maxMessages) { diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java index e5acd01..1517f0e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java @@ -531,7 +531,7 @@ public class KGroupedStreamImplTest { driver.pipeInput(recordFactory.create(TOPIC, "1", "D")); driver.pipeInput(recordFactory.create(TOPIC, "3", "E")); driver.pipeInput(recordFactory.create(TOPIC, "3", "F")); - driver.pipeInput(recordFactory.create(TOPIC, "3", null)); + driver.pipeInput(recordFactory.create(TOPIC, "3", (String) null)); } private void doCountWindowed(final List<KeyValue<Windowed<String>, Long>> results) { diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java index ae76362..d5c5a54 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java @@ -106,7 +106,7 @@ public class KStreamGlobalKTableJoinTest { private void pushNullValueToGlobalTable(final int messageCount) { final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer()); for (int i = 0; i < messageCount; i++) { - driver.pipeInput(recordFactory.create(globalTableTopic, "FKey" + expectedKeys[i], null)); + driver.pipeInput(recordFactory.create(globalTableTopic, "FKey" + 
expectedKeys[i], (String) null)); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java index 95fe8b9..248c3ee 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java @@ -108,7 +108,7 @@ public class KStreamGlobalKTableLeftJoinTest { private void pushNullValueToGlobalTable(final int messageCount) { final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer()); for (int i = 0; i < messageCount; i++) { - driver.pipeInput(recordFactory.create(globalTableTopic, "FKey" + expectedKeys[i], null)); + driver.pipeInput(recordFactory.create(globalTableTopic, "FKey" + expectedKeys[i], (String) null)); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java index 43eaf3b..6ffce04 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java @@ -97,7 +97,7 @@ public class KStreamKTableJoinTest { private void pushNullValueToTable() { for (int i = 0; i < 2; i++) { - driver.pipeInput(recordFactory.create(tableTopic, expectedKeys[i], null)); + driver.pipeInput(recordFactory.create(tableTopic, expectedKeys[i], (String) null)); } } @@ -205,7 +205,7 @@ public class KStreamKTableJoinTest { @Test public void shouldLogAndMeterWhenSkippingNullLeftValue() { final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(); - driver.pipeInput(recordFactory.create(streamTopic, 1, null)); + 
driver.pipeInput(recordFactory.create(streamTopic, 1, (String) null)); LogCaptureAppender.unregister(appender); assertEquals(1.0, getMetricByName(driver.metrics(), "skipped-records-total", "stream-metrics").metricValue()); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java index 251e58e..1c3e027 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java @@ -93,7 +93,7 @@ public class KStreamKTableLeftJoinTest { private void pushNullValueToTable(final int messageCount) { for (int i = 0; i < messageCount; i++) { - driver.pipeInput(recordFactory.create(tableTopic, expectedKeys[i], null)); + driver.pipeInput(recordFactory.create(tableTopic, expectedKeys[i], (String) null)); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java index afc9be1..6b5e577 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java @@ -308,7 +308,7 @@ public class KStreamSessionWindowAggregateProcessorTest { public void shouldLogAndMeterWhenSkippingNullKey() { initStore(false); processor.init(context); - context.setRecordContext(new ProcessorRecordContext(-1, -2, -3, "topic")); + context.setRecordContext(new ProcessorRecordContext(-1, -2, -3, "topic", null)); final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(); processor.process(null, "1"); LogCaptureAppender.unregister(appender); diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java index 0c56531..cd29b50 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java @@ -361,7 +361,7 @@ public class KTableKTableInnerJoinTest { ).get(); final MockProcessorContext context = new MockProcessorContext(); - context.setRecordMetadata("left", -1, -2, -3); + context.setRecordMetadata("left", -1, -2, null, -3); join.init(context); final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(); join.process(null, new Change<>("new", "old")); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java index ef64f75..9be6189 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java @@ -424,7 +424,7 @@ public class KTableKTableLeftJoinTest { ).get(); final MockProcessorContext context = new MockProcessorContext(); - context.setRecordMetadata("left", -1, -2, -3); + context.setRecordMetadata("left", -1, -2, null, -3); join.init(context); final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(); join.process(null, new Change<>("new", "old")); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java index e897ec3..3995fcf 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java +++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java @@ -366,7 +366,7 @@ public class KTableKTableOuterJoinTest { ).get(); final MockProcessorContext context = new MockProcessorContext(); - context.setRecordMetadata("left", -1, -2, -3); + context.setRecordMetadata("left", -1, -2, null, -3); join.init(context); final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(); join.process(null, new Change<>("new", "old")); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoinTest.java index d7411cb..d4805a2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoinTest.java @@ -41,7 +41,7 @@ public class KTableKTableRightJoinTest { ).get(); final MockProcessorContext context = new MockProcessorContext(); - context.setRecordMetadata("left", -1, -2, -3); + context.setRecordMetadata("left", -1, -2, null, -3); join.init(context); final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(); join.process(null, new Change<>("new", "old")); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java index 7c12dad..be20c86 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java @@ -325,7 +325,7 @@ public class KTableTransformValuesTest { driver.pipeInput(recordFactory.create(INPUT_TOPIC, "A", "a", 0L)); driver.pipeInput(recordFactory.create(INPUT_TOPIC, "B", "b", 0L)); - driver.pipeInput(recordFactory.create(INPUT_TOPIC, "D", 
null, 0L)); + driver.pipeInput(recordFactory.create(INPUT_TOPIC, "D", (String) null, 0L)); assertThat(output(), hasItems("A:A->a!", "B:B->b!", "D:D->null!")); assertThat("Store should not be materialized", driver.getKeyValueStore(QUERYABLE_NAME), is(nullValue())); @@ -349,7 +349,7 @@ public class KTableTransformValuesTest { driver.pipeInput(recordFactory.create(INPUT_TOPIC, "A", "a", 0L)); driver.pipeInput(recordFactory.create(INPUT_TOPIC, "B", "b", 0L)); - driver.pipeInput(recordFactory.create(INPUT_TOPIC, "C", null, 0L)); + driver.pipeInput(recordFactory.create(INPUT_TOPIC, "C", (String) null, 0L)); assertThat(output(), hasItems("A:A->a!", "B:B->b!", "C:C->null!")); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java index 86806b2..9aaa8a6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java @@ -16,6 +16,10 @@ */ package org.apache.kafka.streams.processor.internals; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.StreamsConfig; @@ -44,7 +48,8 @@ public class AbstractProcessorContextTest { private final MockStreamsMetrics metrics = new MockStreamsMetrics(new Metrics()); private final AbstractProcessorContext context = new TestProcessorContext(metrics); private final MockStateStore stateStore = new MockStateStore("store", false); - private final RecordContext recordContext = new RecordContextStub(10, System.currentTimeMillis(), 1, "foo"); + 
private final Headers headers = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())}); + private final RecordContext recordContext = new RecordContextStub(10, System.currentTimeMillis(), 1, "foo", headers); @Before public void before() { @@ -141,6 +146,27 @@ public class AbstractProcessorContextTest { assertThat(context.timestamp(), equalTo(recordContext.timestamp())); } + @Test + public void shouldReturnHeadersFromRecordContext() { + assertThat(context.headers(), equalTo(recordContext.headers())); + } + + @Test + public void shouldReturnNullIfHeadersAreNotSet() { + context.setRecordContext(new RecordContextStub(0, 0, 0, AbstractProcessorContext.NONEXIST_TOPIC)); + assertThat(context.headers(), nullValue()); + } + + @Test + public void shouldThrowIllegalStateExceptionOnHeadersIfNoRecordContext() { + context.setRecordContext(null); + try { + context.headers(); + } catch (final IllegalStateException e) { + // pass + } + } + @SuppressWarnings("unchecked") @Test public void appConfigsShouldReturnParsedValues() { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java index e247647..033c0e1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java @@ -18,6 +18,10 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.serialization.Deserializer; import 
org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; @@ -64,6 +68,9 @@ public class ProcessorTopologyTest { private static final String OUTPUT_TOPIC_2 = "output-topic-2"; private static final String THROUGH_TOPIC_1 = "through-topic-1"; + private static final Header HEADER = new RecordHeader("key", "value".getBytes()); + private static final Headers HEADERS = new RecordHeaders(new Header[]{HEADER}); + private final TopologyWrapper topology = new TopologyWrapper(); private final MockProcessorSupplier mockProcessorSupplier = new MockProcessorSupplier(); private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER, 0L); @@ -71,6 +78,7 @@ public class ProcessorTopologyTest { private TopologyTestDriver driver; private final Properties props = new Properties(); + @Before public void setup() { // Create a new directory in which we'll put all of the state for this test, enabling running tests in parallel ... 
@@ -338,10 +346,33 @@ public class ProcessorTopologyTest { assertNextOutputRecord(OUTPUT_TOPIC_1, "key3", "value3", partition, 40L); } + @Test + public void shouldConsiderHeaders() { + final int partition = 10; + driver = new TopologyTestDriver(createSimpleTopology(partition), props); + driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1", HEADERS, 10L)); + driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key2", "value2", HEADERS, 20L)); + driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key3", "value3", HEADERS, 30L)); + assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1", HEADERS, partition, 10L); + assertNextOutputRecord(OUTPUT_TOPIC_1, "key2", "value2", HEADERS, partition, 20L); + assertNextOutputRecord(OUTPUT_TOPIC_1, "key3", "value3", HEADERS, partition, 30L); + } + + @Test + public void shouldAddHeaders() { + driver = new TopologyTestDriver(createAddHeaderTopology(), props); + driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1", 10L)); + driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key2", "value2", 20L)); + driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key3", "value3", 30L)); + assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1", HEADERS, 10L); + assertNextOutputRecord(OUTPUT_TOPIC_1, "key2", "value2", HEADERS, 20L); + assertNextOutputRecord(OUTPUT_TOPIC_1, "key3", "value3", HEADERS, 30L); + } + private void assertNextOutputRecord(final String topic, final String key, final String value) { - assertNextOutputRecord(topic, key, value, null, 0L); + assertNextOutputRecord(topic, key, value, (Integer) null, 0L); } private void assertNextOutputRecord(final String topic, @@ -354,6 +385,23 @@ public class ProcessorTopologyTest { private void assertNextOutputRecord(final String topic, final String key, final String value, + final Headers headers, + final Long timestamp) { + assertNextOutputRecord(topic, key, value, headers, null, timestamp); + } + + private void assertNextOutputRecord(final String 
topic, + final String key, + final String value, + final Integer partition, + final Long timestamp) { + assertNextOutputRecord(topic, key, value, new RecordHeaders(), partition, timestamp); + } + + private void assertNextOutputRecord(final String topic, + final String key, + final String value, + final Headers headers, final Integer partition, final Long timestamp) { final ProducerRecord<String, String> record = driver.readOutput(topic, STRING_DESERIALIZER, STRING_DESERIALIZER); @@ -362,6 +410,7 @@ public class ProcessorTopologyTest { assertEquals(value, record.value()); assertEquals(partition, record.partition()); assertEquals(timestamp, record.timestamp()); + assertEquals(headers, record.headers()); } private void assertNoOutputRecord(final String topic) { @@ -458,6 +507,12 @@ public class ProcessorTopologyTest { .addSink("sink-2", OUTPUT_TOPIC_2, constantPartitioner(partition), "processor-2"); } + private Topology createAddHeaderTopology() { + return topology.addSource("source-1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1) + .addProcessor("processor-1", define(new AddHeaderProcessor()), "source-1") + .addSink("sink-1", OUTPUT_TOPIC_1, "processor-1"); + } + /** * A processor that simply forwards all messages to all children. */ @@ -478,6 +533,14 @@ public class ProcessorTopologyTest { } } + protected static class AddHeaderProcessor extends AbstractProcessor<String, String> { + @Override + public void process(final String key, final String value) { + context().headers().add(HEADER); + context().forward(key, value); + } + } + /** * A processor that removes custom timestamp information from messages and forwards modified messages to each child. * A message contains custom timestamp information if the value is in ".*@[0-9]+" format. 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java index 8a2f171..e439372 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java @@ -27,6 +27,10 @@ import org.apache.kafka.common.MetricName; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.stats.Sum; @@ -85,14 +89,16 @@ public class RecordCollectorTest { new Metrics().sensor("skipped-records") ); - collector.send("topic1", "999", "0", 0, null, stringSerializer, stringSerializer); - collector.send("topic1", "999", "0", 0, null, stringSerializer, stringSerializer); - collector.send("topic1", "999", "0", 0, null, stringSerializer, stringSerializer); + final Headers headers = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())}); - collector.send("topic1", "999", "0", 1, null, stringSerializer, stringSerializer); - collector.send("topic1", "999", "0", 1, null, stringSerializer, stringSerializer); + collector.send("topic1", "999", "0", null, 0, null, stringSerializer, stringSerializer); + collector.send("topic1", "999", "0", null, 0, null, stringSerializer, stringSerializer); + collector.send("topic1", "999", "0", null, 0, null, stringSerializer, stringSerializer); - collector.send("topic1", "999", "0", 2, null, stringSerializer, stringSerializer); + collector.send("topic1", 
"999", "0", headers, 1, null, stringSerializer, stringSerializer); + collector.send("topic1", "999", "0", headers, 1, null, stringSerializer, stringSerializer); + + collector.send("topic1", "999", "0", headers, 2, null, stringSerializer, stringSerializer); final Map<TopicPartition, Long> offsets = collector.offsets(); @@ -101,9 +107,9 @@ public class RecordCollectorTest { assertEquals((Long) 0L, offsets.get(new TopicPartition("topic1", 2))); // ignore StreamPartitioner - collector.send("topic1", "999", "0", 0, null, stringSerializer, stringSerializer); - collector.send("topic1", "999", "0", 1, null, stringSerializer, stringSerializer); - collector.send("topic1", "999", "0", 2, null, stringSerializer, stringSerializer); + collector.send("topic1", "999", "0", null, 0, null, stringSerializer, stringSerializer); + collector.send("topic1", "999", "0", null, 1, null, stringSerializer, stringSerializer); + collector.send("topic1", "999", "0", headers, 2, null, stringSerializer, stringSerializer); assertEquals((Long) 3L, offsets.get(new TopicPartition("topic1", 0))); assertEquals((Long) 2L, offsets.get(new TopicPartition("topic1", 1))); @@ -121,17 +127,19 @@ public class RecordCollectorTest { new Metrics().sensor("skipped-records") ); - collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner); - collector.send("topic1", "9", "0", null, stringSerializer, stringSerializer, streamPartitioner); - collector.send("topic1", "27", "0", null, stringSerializer, stringSerializer, streamPartitioner); - collector.send("topic1", "81", "0", null, stringSerializer, stringSerializer, streamPartitioner); - collector.send("topic1", "243", "0", null, stringSerializer, stringSerializer, streamPartitioner); + final Headers headers = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())}); + + collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner); + collector.send("topic1", "9", "0", null, 
null, stringSerializer, stringSerializer, streamPartitioner); + collector.send("topic1", "27", "0", null, null, stringSerializer, stringSerializer, streamPartitioner); + collector.send("topic1", "81", "0", null, null, stringSerializer, stringSerializer, streamPartitioner); + collector.send("topic1", "243", "0", null, null, stringSerializer, stringSerializer, streamPartitioner); - collector.send("topic1", "28", "0", null, stringSerializer, stringSerializer, streamPartitioner); - collector.send("topic1", "82", "0", null, stringSerializer, stringSerializer, streamPartitioner); - collector.send("topic1", "244", "0", null, stringSerializer, stringSerializer, streamPartitioner); + collector.send("topic1", "28", "0", headers, null, stringSerializer, stringSerializer, streamPartitioner); + collector.send("topic1", "82", "0", headers, null, stringSerializer, stringSerializer, streamPartitioner); + collector.send("topic1", "244", "0", headers, null, stringSerializer, stringSerializer, streamPartitioner); - collector.send("topic1", "245", "0", null, stringSerializer, stringSerializer, streamPartitioner); + collector.send("topic1", "245", "0", null, null, stringSerializer, stringSerializer, streamPartitioner); final Map<TopicPartition, Long> offsets = collector.offsets(); @@ -155,7 +163,7 @@ public class RecordCollectorTest { new DefaultProductionExceptionHandler(), new Metrics().sensor("skipped-records")); - collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner); + collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner); } @SuppressWarnings("unchecked") @@ -174,10 +182,10 @@ public class RecordCollectorTest { new DefaultProductionExceptionHandler(), new Metrics().sensor("skipped-records")); - collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner); + collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner); 
try { - collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner); + collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner); fail("Should have thrown StreamsException"); } catch (final StreamsException expected) { /* ok */ } } @@ -198,9 +206,9 @@ public class RecordCollectorTest { new AlwaysContinueProductionExceptionHandler(), new Metrics().sensor("skipped-records")); - collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner); + collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner); - collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner); + collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner); } @SuppressWarnings("unchecked") @@ -223,7 +231,7 @@ public class RecordCollectorTest { logContext, new AlwaysContinueProductionExceptionHandler(), sensor); - collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner); + collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner); assertEquals(1.0, metrics.metrics().get(metricName).metricValue()); assertTrue(logCaptureAppender.getMessages().contains("test Error sending records (key=[3] value=[0] timestamp=[null]) to topic=[topic1] and partition=[0]; The exception handler chose to CONTINUE processing in spite of this error.")); LogCaptureAppender.unregister(logCaptureAppender); @@ -245,7 +253,7 @@ public class RecordCollectorTest { new DefaultProductionExceptionHandler(), new Metrics().sensor("skipped-records")); - collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner); + collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner); try { collector.flush(); @@ -269,7 +277,7 @@ public class RecordCollectorTest 
{ new AlwaysContinueProductionExceptionHandler(), new Metrics().sensor("skipped-records")); - collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner); + collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner); collector.flush(); } @@ -290,7 +298,7 @@ public class RecordCollectorTest { new DefaultProductionExceptionHandler(), new Metrics().sensor("skipped-records")); - collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner); + collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner); try { collector.close(); @@ -314,7 +322,7 @@ public class RecordCollectorTest { new AlwaysContinueProductionExceptionHandler(), new Metrics().sensor("skipped-records")); - collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner); + collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner); collector.close(); } @@ -334,7 +342,7 @@ public class RecordCollectorTest { logContext, new DefaultProductionExceptionHandler(), new Metrics().sensor("skipped-records")); - collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner); + collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner); } @SuppressWarnings("unchecked") @@ -352,6 +360,6 @@ public class RecordCollectorTest { logContext, new AlwaysContinueProductionExceptionHandler(), new Metrics().sensor("skipped-records")); - collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner); + collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordContextStub.java 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordContextStub.java index 0af5e17..7afd51e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordContextStub.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordContextStub.java @@ -16,25 +16,37 @@ */ package org.apache.kafka.streams.processor.internals; +import org.apache.kafka.common.header.Headers; + public class RecordContextStub implements RecordContext { private final long offset; private long timestamp; private final int partition; private final String topic; + private final Headers headers; public RecordContextStub() { - this(-1, -1, -1, ""); + this(-1, -1, -1, "", null); } public RecordContextStub(final long offset, final long timestamp, final int partition, - final String topic) { + final String topic, + final Headers headers) { this.offset = offset; this.timestamp = timestamp; this.partition = partition; this.topic = topic; + this.headers = headers; + } + + public RecordContextStub(final long offset, + final long timestamp, + final int partition, + final String topic) { + this(offset, timestamp, partition, topic, null); } @Override @@ -61,4 +73,9 @@ public class RecordContextStub implements RecordContext { public int partition() { return partition; } + + @Override + public Headers headers() { + return headers; + } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java index de8e17b..36988c0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java @@ -17,7 +17,10 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.header.Header; 
import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.utils.LogContext; @@ -29,16 +32,18 @@ import static org.junit.Assert.assertEquals; public class RecordDeserializerTest { + private final RecordHeaders headers = new RecordHeaders(new Header[] {new RecordHeader("key", "value".getBytes())}); private final ConsumerRecord<byte[], byte[]> rawRecord = new ConsumerRecord<>("topic", 1, 1, 10, TimestampType.LOG_APPEND_TIME, - 5, + 5L, 3, 5, new byte[0], - new byte[0]); + new byte[0], + headers); @SuppressWarnings("deprecation") @@ -63,6 +68,7 @@ public class RecordDeserializerTest { assertEquals("value", record.value()); assertEquals(rawRecord.timestamp(), record.timestamp()); assertEquals(TimestampType.CREATE_TIME, record.timestampType()); + assertEquals(rawRecord.headers(), record.headers()); } static class TheSourceNode extends SourceNode<Object, Object> { diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java index 4e80fa7..4b9e6a1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java @@ -18,6 +18,7 @@ package org.apache.kafka.streams.state; import org.apache.kafka.clients.producer.MockProducer; import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.Deserializer; @@ -198,6 +199,7 @@ public class KeyValueStoreTestDriver<K, V> { public <K1, V1> void send(final String topic, 
final K1 key, final V1 value, + final Headers headers, final Integer partition, final Long timestamp, final Serializer<K1> keySerializer, @@ -214,6 +216,7 @@ public class KeyValueStoreTestDriver<K, V> { public <K1, V1> void send(final String topic, final K1 key, final V1 value, + final Headers headers, final Long timestamp, final Serializer<K1> keySerializer, final Serializer<V1> valueSerializer, diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java index 3e0241e..2f6aac7 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java @@ -75,7 +75,8 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest { cache = new ThreadCache(new LogContext("testCache "), maxCacheSizeBytes, new MockStreamsMetrics(new Metrics())); context = new InternalMockProcessorContext(null, null, null, (RecordCollector) null, cache); topic = "topic"; - context.setRecordContext(new ProcessorRecordContext(10, 0, 0, topic)); + context.setRecordContext( + new ProcessorRecordContext(10, 0, 0, topic, null)); store.init(context, null); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java index b77f4e9..baa9ee4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java @@ -76,7 +76,7 @@ public class CachingSessionStoreTest { ); cache = new ThreadCache(new LogContext("testCache "), MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics())); context = new InternalMockProcessorContext(TestUtils.tempDirectory(), 
null, null, null, cache); - context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, "topic")); + context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, "topic", null)); cachingStore.init(context, cachingStore); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java index a87b2e4..b8808ca 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java @@ -81,7 +81,7 @@ public class CachingWindowStoreTest { cache = new ThreadCache(new LogContext("testCache "), MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics())); topic = "topic"; context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, (RecordCollector) null, cache); - context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, topic)); + context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, topic, null)); cachingStore.init(context, cachingStore); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java index 5bb0de7..7f5a08e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; @@ -57,6 +58,7 @@ public class 
ChangeLoggingKeyValueBytesStoreTest { public <K, V> void send(final String topic, K key, V value, + Headers headers, Integer partition, Long timestamp, Serializer<K> keySerializer, diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java index a658186..edcaa05 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.kstream.Windowed; @@ -49,6 +50,7 @@ public class ChangeLoggingSessionBytesStoreTest { public <K, V> void send(final String topic, K key, V value, + Headers headers, Integer partition, Long timestamp, Serializer<K> keySerializer, diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java index 956172e..e56887e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.kstream.Windowed; @@ -47,6 +48,7 @@ public class ChangeLoggingWindowBytesStoreTest { public <K, V> void send(final String topic, K key, V value, + Headers 
headers, Integer partition, Long timestamp, Serializer<K> keySerializer, diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java index 9ae0feb..92653ce 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java @@ -16,6 +16,10 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; @@ -42,6 +46,7 @@ import static org.junit.Assert.assertSame; public class NamedCacheTest { + private final Headers headers = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())}); private NamedCache cache; private StreamsMetricsImpl metrics; private final String taskIDString = "0.0"; @@ -64,7 +69,7 @@ public class NamedCacheTest { for (int i = 0; i < toInsert.size(); i++) { byte[] key = toInsert.get(i).key.getBytes(); byte[] value = toInsert.get(i).value.getBytes(); - cache.put(Bytes.wrap(key), new LRUCacheEntry(value, true, 1, 1, 1, "")); + cache.put(Bytes.wrap(key), new LRUCacheEntry(value, null, true, 1, 1, 1, "")); LRUCacheEntry head = cache.first(); LRUCacheEntry tail = cache.last(); assertEquals(new String(head.value), toInsert.get(i).value); @@ -170,9 +175,9 @@ public class NamedCacheTest { @Test public void shouldFlushDirtEntriesOnEviction() { final List<ThreadCache.DirtyEntry> flushed = new ArrayList<>(); - cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new byte[]{10}, true, 0, 0, 0, "")); + cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new 
byte[]{10}, headers, true, 0, 0, 0, "")); cache.put(Bytes.wrap(new byte[]{1}), new LRUCacheEntry(new byte[]{20})); - cache.put(Bytes.wrap(new byte[]{2}), new LRUCacheEntry(new byte[]{30}, true, 0, 0, 0, "")); + cache.put(Bytes.wrap(new byte[]{2}), new LRUCacheEntry(new byte[]{30}, headers, true, 0, 0, 0, "")); cache.setListener(new ThreadCache.DirtyEntryFlushListener() { @Override @@ -185,6 +190,7 @@ public class NamedCacheTest { assertEquals(2, flushed.size()); assertEquals(Bytes.wrap(new byte[] {0}), flushed.get(0).key()); + assertEquals(headers, flushed.get(0).recordContext().headers()); assertArrayEquals(new byte[] {10}, flushed.get(0).newValue()); assertEquals(Bytes.wrap(new byte[] {2}), flushed.get(1).key()); assertArrayEquals(new byte[] {30}, flushed.get(1).newValue()); @@ -193,9 +199,9 @@ public class NamedCacheTest { @Test public void shouldGetRangeIteratorOverKeys() { - cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new byte[]{10}, true, 0, 0, 0, "")); + cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new byte[]{10}, headers, true, 0, 0, 0, "")); cache.put(Bytes.wrap(new byte[]{1}), new LRUCacheEntry(new byte[]{20})); - cache.put(Bytes.wrap(new byte[]{2}), new LRUCacheEntry(new byte[]{30}, true, 0, 0, 0, "")); + cache.put(Bytes.wrap(new byte[]{2}), new LRUCacheEntry(new byte[]{30}, null, true, 0, 0, 0, "")); final Iterator<Bytes> iterator = cache.keyRange(Bytes.wrap(new byte[]{1}), Bytes.wrap(new byte[]{2})); assertEquals(Bytes.wrap(new byte[]{1}), iterator.next()); @@ -205,9 +211,9 @@ public class NamedCacheTest { @Test public void shouldGetIteratorOverAllKeys() { - cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new byte[]{10}, true, 0, 0, 0, "")); + cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new byte[]{10}, headers, true, 0, 0, 0, "")); cache.put(Bytes.wrap(new byte[]{1}), new LRUCacheEntry(new byte[]{20})); - cache.put(Bytes.wrap(new byte[]{2}), new LRUCacheEntry(new byte[]{30}, true, 0, 0, 0, "")); + 
cache.put(Bytes.wrap(new byte[]{2}), new LRUCacheEntry(new byte[]{30}, null, true, 0, 0, 0, "")); final Iterator<Bytes> iterator = cache.allKeys(); assertEquals(Bytes.wrap(new byte[]{0}), iterator.next()); @@ -223,8 +229,8 @@ public class NamedCacheTest { @Test(expected = IllegalStateException.class) public void shouldThrowIllegalStateExceptionWhenTryingToOverwriteDirtyEntryWithCleanEntry() { - cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new byte[]{10}, true, 0, 0, 0, "")); - cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new byte[]{10}, false, 0, 0, 0, "")); + cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new byte[]{10}, headers, true, 0, 0, 0, "")); + cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new byte[]{10}, null, false, 0, 0, 0, "")); } @Test @@ -235,8 +241,8 @@ public class NamedCacheTest { // no-op } }); - cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(null, true, 0, 0, 0, "")); - cache.put(Bytes.wrap(new byte[]{1}), new LRUCacheEntry(new byte[]{20}, true, 0, 0, 0, "")); + cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(null, headers, true, 0, 0, 0, "")); + cache.put(Bytes.wrap(new byte[]{1}), new LRUCacheEntry(new byte[]{20}, null, true, 0, 0, 0, "")); cache.flush(); assertEquals(1, cache.size()); assertNotNull(cache.get(Bytes.wrap(new byte[]{1}))); @@ -244,7 +250,7 @@ public class NamedCacheTest { @Test public void shouldBeReentrantAndNotBreakLRU() { - final LRUCacheEntry dirty = new LRUCacheEntry(new byte[]{3}, true, 0, 0, 0, ""); + final LRUCacheEntry dirty = new LRUCacheEntry(new byte[]{3}, null, true, 0, 0, 0, ""); final LRUCacheEntry clean = new LRUCacheEntry(new byte[]{3}); cache.put(Bytes.wrap(new byte[]{0}), dirty); cache.put(Bytes.wrap(new byte[]{1}), clean); @@ -290,7 +296,7 @@ public class NamedCacheTest { @Test public void shouldNotThrowIllegalArgumentAfterEvictingDirtyRecordAndThenPuttingNewRecordWithSameKey() { - final LRUCacheEntry dirty = new LRUCacheEntry(new byte[]{3}, true, 0, 0, 0, 
""); + final LRUCacheEntry dirty = new LRUCacheEntry(new byte[]{3}, null, true, 0, 0, 0, ""); final LRUCacheEntry clean = new LRUCacheEntry(new byte[]{3}); final Bytes key = Bytes.wrap(new byte[] {3}); cache.setListener(new ThreadCache.DirtyEntryFlushListener() { diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java index 92edbd8..be4ede8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java @@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.clients.producer.MockProducer; import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; @@ -90,6 +91,7 @@ public class RocksDBWindowStoreTest { public <K1, V1> void send(final String topic, final K1 key, final V1 value, + final Headers headers, final Integer partition, final Long timestamp, final Serializer<K1> keySerializer, @@ -160,7 +162,7 @@ public class RocksDBWindowStoreTest { } private ProcessorRecordContext createRecordContext(final long time) { - return new ProcessorRecordContext(time, 0, 0, "topic"); + return new ProcessorRecordContext(time, 0, 0, "topic", null); } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java index 6bacd91..5afe14f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java @@ -17,6 +17,8 @@ 
package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.LogContext; @@ -38,6 +40,7 @@ public class StoreChangeLoggerTest { private final String topic = "topic"; private final Map<Integer, String> logged = new HashMap<>(); + private final Map<Integer, Headers> loggedHeaders = new HashMap<>(); private final InternalMockProcessorContext context = new InternalMockProcessorContext(StateSerdes.withBuiltinTypes(topic, Integer.class, String.class), new RecordCollectorImpl(null, "StoreChangeLoggerTest", new LogContext("StoreChangeLoggerTest "), new DefaultProductionExceptionHandler(), new Metrics().sensor("skipped-records")) { @@ -45,17 +48,20 @@ public class StoreChangeLoggerTest { public <K1, V1> void send(final String topic, final K1 key, final V1 value, + final Headers headers, final Integer partition, final Long timestamp, final Serializer<K1> keySerializer, final Serializer<V1> valueSerializer) { logged.put((Integer) key, (String) value); + loggedHeaders.put((Integer) key, headers); } @Override public <K1, V1> void send(final String topic, final K1 key, final V1 value, + final Headers headers, final Long timestamp, final Serializer<K1> keySerializer, final Serializer<V1> valueSerializer, @@ -80,6 +86,13 @@ public class StoreChangeLoggerTest { changeLogger.logChange(0, null); assertNull(logged.get(0)); + } + + @Test + public void shouldNotSendRecordHeadersToChangelogTopic() { + context.headers().add(new RecordHeader("key", "value".getBytes())); + changeLogger.logChange(0, "zero"); + assertNull(loggedHeaders.get(0)); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java index 164e71e..d100ae5 
100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java @@ -59,7 +59,7 @@ public class ThreadCacheTest { for (KeyValue<String, String> kvToInsert : toInsert) { Bytes key = Bytes.wrap(kvToInsert.key.getBytes()); byte[] value = kvToInsert.value.getBytes(); - cache.put(namespace, key, new LRUCacheEntry(value, true, 1L, 1L, 1, "")); + cache.put(namespace, key, new LRUCacheEntry(value, null, true, 1L, 1L, 1, "")); } for (KeyValue<String, String> kvToInsert : toInsert) { @@ -89,7 +89,7 @@ public class ThreadCacheTest { String keyStr = "K" + i; Bytes key = Bytes.wrap(keyStr.getBytes()); byte[] value = new byte[valueSizeBytes]; - cache.put(namespace, key, new LRUCacheEntry(value, true, 1L, 1L, 1, "")); + cache.put(namespace, key, new LRUCacheEntry(value, null, true, 1L, 1L, 1, "")); } @@ -171,7 +171,7 @@ public class ThreadCacheTest { for (KeyValue<String, String> kvToInsert : toInsert) { final Bytes key = Bytes.wrap(kvToInsert.key.getBytes()); final byte[] value = kvToInsert.value.getBytes(); - cache.put(namespace, key, new LRUCacheEntry(value, true, 1, 1, 1, "")); + cache.put(namespace, key, new LRUCacheEntry(value, null, true, 1, 1, 1, "")); } for (int i = 0; i < expected.size(); i++) { @@ -520,7 +520,7 @@ public class ThreadCacheTest { } private LRUCacheEntry dirtyEntry(final byte[] key) { - return new LRUCacheEntry(key, true, -1, -1, -1, ""); + return new LRUCacheEntry(key, null, true, -1, -1, -1, ""); } private LRUCacheEntry cleanEntry(final byte[] key) { diff --git a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java index eb72e13..e5571eb 100644 --- a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java +++ b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java @@ -16,6 +16,8 @@ */ 
package org.apache.kafka.test; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.streams.KeyValue; @@ -242,7 +244,7 @@ public class InternalMockProcessorContext extends AbstractProcessorContext imple // and also not throwing exceptions if record context is not available. public void setTime(final long timestamp) { if (recordContext != null) { - recordContext = new ProcessorRecordContext(timestamp, recordContext.offset(), recordContext.partition(), recordContext.topic()); + recordContext = new ProcessorRecordContext(timestamp, recordContext.offset(), recordContext.partition(), recordContext.topic(), recordContext.headers()); } this.timestamp = timestamp; } @@ -279,6 +281,14 @@ public class InternalMockProcessorContext extends AbstractProcessorContext imple return recordContext.offset(); } + @Override + public Headers headers() { + if (recordContext == null) { + return new RecordHeaders(); + } + return recordContext.headers(); + } + Map<String, StateStore> allStateStores() { return Collections.unmodifiableMap(storeMap); } diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java index 2c3461a..698cdc7 100644 --- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.test; +import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; @@ -90,7 +91,7 @@ public class KStreamTestDriver extends ExternalResource { final ThreadCache cache = new ThreadCache(logContext, cacheSize, new MockStreamsMetrics(new Metrics())); context = new 
InternalMockProcessorContext(stateDir, keySerde, valSerde, new MockRecordCollector(), cache); - context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "topic")); + context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "topic", null)); // init global topology first as it will add stores to the // store map that are required for joins etc. @@ -229,7 +230,7 @@ public class KStreamTestDriver extends ExternalResource { } private ProcessorRecordContext createRecordContext(final String topicName, final long timestamp) { - return new ProcessorRecordContext(timestamp, -1, -1, topicName); + return new ProcessorRecordContext(timestamp, -1, -1, topicName, null); } private class MockRecordCollector extends RecordCollectorImpl { @@ -241,6 +242,7 @@ public class KStreamTestDriver extends ExternalResource { public <K, V> void send(final String topic, final K key, final V value, + final Headers headers, final Long timestamp, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, @@ -255,6 +257,7 @@ public class KStreamTestDriver extends ExternalResource { public <K, V> void send(final String topic, final K key, final V value, + final Headers headers, final Integer partition, final Long timestamp, final Serializer<K> keySerializer, diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpRecordCollector.java b/streams/src/test/java/org/apache/kafka/test/NoOpRecordCollector.java index 66271a0..893d356 100644 --- a/streams/src/test/java/org/apache/kafka/test/NoOpRecordCollector.java +++ b/streams/src/test/java/org/apache/kafka/test/NoOpRecordCollector.java @@ -17,6 +17,7 @@ package org.apache.kafka.test; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.streams.processor.internals.RecordCollector; @@ -30,6 +31,7 @@ public class NoOpRecordCollector implements 
RecordCollector { public <K, V> void send(final String topic, final K key, final V value, + final Headers headers, final Integer partition, final Long timestamp, final Serializer<K> keySerializer, @@ -39,6 +41,7 @@ public class NoOpRecordCollector implements RecordCollector { public <K, V> void send(final String topic, final K key, final V value, + final Headers headers, final Long timestamp, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java index c237ca7..e46ec6a 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java @@ -227,7 +227,7 @@ public class TopologyTestDriver implements Closeable { * @param config the configuration for the topology */ TopologyTestDriver(final InternalTopologyBuilder builder, - final Properties config) { + final Properties config) { this(builder, config, System.currentTimeMillis()); } @@ -382,14 +382,16 @@ public class TopologyTestDriver implements Closeable { offset, consumerRecord.timestamp(), consumerRecord.timestampType(), - ConsumerRecord.NULL_CHECKSUM, + (long) ConsumerRecord.NULL_CHECKSUM, consumerRecord.serializedKeySize(), consumerRecord.serializedValueSize(), consumerRecord.key(), - consumerRecord.value()))); + consumerRecord.value(), + consumerRecord.headers()))); // Process the record ... 
- ((InternalProcessorContext) task.context()).setRecordContext(new ProcessorRecordContext(consumerRecord.timestamp(), offset, topicPartition.partition(), topicName)); + ((InternalProcessorContext) task.context()).setRecordContext( + new ProcessorRecordContext(consumerRecord.timestamp(), offset, topicPartition.partition(), topicName, consumerRecord.headers())); task.process(); task.maybePunctuateStreamTime(); task.commit(); @@ -407,11 +409,12 @@ public class TopologyTestDriver implements Closeable { offset, consumerRecord.timestamp(), consumerRecord.timestampType(), - ConsumerRecord.NULL_CHECKSUM, + (long) ConsumerRecord.NULL_CHECKSUM, consumerRecord.serializedKeySize(), consumerRecord.serializedValueSize(), consumerRecord.key(), - consumerRecord.value())); + consumerRecord.value(), + consumerRecord.headers())); globalStateTask.flushState(); } } @@ -467,7 +470,8 @@ public class TopologyTestDriver implements Closeable { serializedKey == null ? 0 : serializedKey.length, serializedValue == null ? 
0 : serializedValue.length, serializedKey, - serializedValue)); + serializedValue, + record.headers())); } } } @@ -536,7 +540,7 @@ public class TopologyTestDriver implements Closeable { } final K key = keyDeserializer.deserialize(record.topic(), record.key()); final V value = valueDeserializer.deserialize(record.topic(), record.value()); - return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(), key, value); + return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(), key, value, record.headers()); } /** diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java index 3e29cde..b14a791 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.processor; import org.apache.kafka.common.annotation.InterfaceStability; +import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.streams.KeyValue; @@ -61,6 +62,7 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S private String topic; private Integer partition; private Long offset; + private Headers headers; private Long timestamp; // mocks ================================================ @@ -250,10 +252,11 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S * @param offset A record offset * @param timestamp A record timestamp */ - public void setRecordMetadata(final String topic, final int partition, final long offset, final long timestamp) { + public void setRecordMetadata(final String topic, final int partition, final long offset, final Headers headers, final 
long timestamp) { this.topic = topic; this.partition = partition; this.offset = offset; + this.headers = headers; this.timestamp = timestamp; } @@ -289,6 +292,9 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S this.offset = offset; } + public void setHeaders(final Headers headers) { + this.headers = headers; + } /** * The context exposes this metadata for use in the processor. Normally, they are set by the Kafka Streams framework, @@ -325,6 +331,11 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S } @Override + public Headers headers() { + return headers; + } + + @Override public long timestamp() { if (timestamp == null) { throw new IllegalStateException("Timestamp must be set before use via setRecordMetadata() or setTimestamp()."); diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/test/ConsumerRecordFactory.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/test/ConsumerRecordFactory.java index b0ccd61..507249d 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/test/ConsumerRecordFactory.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/test/ConsumerRecordFactory.java @@ -18,6 +18,8 @@ package org.apache.kafka.streams.test; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.annotation.InterfaceStability; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.KeyValue; @@ -153,20 +155,23 @@ public class ConsumerRecordFactory<K, V> { } /** - * Create a {@link ConsumerRecord} with the given topic name, key, value, and timestamp. + * Create a {@link ConsumerRecord} with the given topic name, key, value, headers, and timestamp. * Does not auto advance internally tracked time. 
* * @param topicName the topic name * @param key the record key * @param value the record value + * @param headers the record headers * @param timestampMs the record timestamp * @return the generated {@link ConsumerRecord} */ public ConsumerRecord<byte[], byte[]> create(final String topicName, final K key, final V value, + final Headers headers, final long timestampMs) { Objects.requireNonNull(topicName, "topicName cannot be null."); + Objects.requireNonNull(headers, "headers cannot be null."); final byte[] serializedKey = keySerializer.serialize(topicName, key); final byte[] serializedValue = valueSerializer.serialize(topicName, value); return new ConsumerRecord<>( @@ -175,11 +180,29 @@ public class ConsumerRecordFactory<K, V> { -1L, timestampMs, TimestampType.CREATE_TIME, - ConsumerRecord.NULL_CHECKSUM, + (long) ConsumerRecord.NULL_CHECKSUM, serializedKey == null ? 0 : serializedKey.length, serializedValue == null ? 0 : serializedValue.length, serializedKey, - serializedValue); + serializedValue, + headers); + } + + /** + * Create a {@link ConsumerRecord} with the given topic name and given topic, key, value, and timestamp. + * Does not auto advance internally tracked time. + * + * @param topicName the topic name + * @param key the record key + * @param value the record value + * @param timestampMs the record timestamp + * @return the generated {@link ConsumerRecord} + */ + public ConsumerRecord<byte[], byte[]> create(final String topicName, + final K key, + final V value, + final long timestampMs) { + return create(topicName, key, value, new RecordHeaders(), timestampMs); } /** @@ -194,16 +217,33 @@ public class ConsumerRecordFactory<K, V> { public ConsumerRecord<byte[], byte[]> create(final K key, final V value, final long timestampMs) { + return create(key, value, new RecordHeaders(), timestampMs); + } + + /** + * Create a {@link ConsumerRecord} with default topic name and given key, value, headers, and timestamp. 
+ * Does not auto advance internally tracked time. + * + * @param key the record key + * @param value the record value + * @param headers the record headers + * @param timestampMs the record timestamp + * @return the generated {@link ConsumerRecord} + */ + public ConsumerRecord<byte[], byte[]> create(final K key, + final V value, + final Headers headers, + final long timestampMs) { if (topicName == null) { throw new IllegalStateException("ConsumerRecordFactory was created without defaultTopicName. " + "Use #create(String topicName, K key, V value, long timestampMs) instead."); } - return create(topicName, key, value, timestampMs); + return create(topicName, key, value, headers, timestampMs); } /** * Create a {@link ConsumerRecord} with the given topic name, key, and value. - * The timestamp will be generated from the constructor provided and time will auto advance. + * The timestamp will be generated based on the constructor provided start time and time will auto advance. * * @param topicName the topic name * @param key the record key @@ -215,12 +255,31 @@ public class ConsumerRecordFactory<K, V> { final V value) { final long timestamp = timeMs; timeMs += advanceMs; - return create(topicName, key, value, timestamp); + return create(topicName, key, value, new RecordHeaders(), timestamp); + } + + /** + * Create a {@link ConsumerRecord} with the given topic name, key, value, and headers. + * The timestamp will be generated based on the constructor provided start time and time will auto advance. 
+ * + * @param topicName the topic name + * @param key the record key + * @param value the record value + * @param headers the record headers + * @return the generated {@link ConsumerRecord} + */ + public ConsumerRecord<byte[], byte[]> create(final String topicName, + final K key, + final V value, + final Headers headers) { + final long timestamp = timeMs; + timeMs += advanceMs; + return create(topicName, key, value, headers, timestamp); } /** * Create a {@link ConsumerRecord} with default topic name and given key and value. - * The timestamp will be generated from the constructor provided and time will auto advance. + * The timestamp will be generated based on the constructor provided start time and time will auto advance. * * @param key the record key * @param value the record value @@ -228,46 +287,110 @@ public class ConsumerRecordFactory<K, V> { */ public ConsumerRecord<byte[], byte[]> create(final K key, final V value) { + return create(key, value, new RecordHeaders()); + } + + /** + * Create a {@link ConsumerRecord} with default topic name and given key, value, and headers. + * The timestamp will be generated based on the constructor provided start time and time will auto advance. + * + * @param key the record key + * @param value the record value + * @param headers the record headers + * @return the generated {@link ConsumerRecord} + */ + public ConsumerRecord<byte[], byte[]> create(final K key, + final V value, + final Headers headers) { if (topicName == null) { throw new IllegalStateException("ConsumerRecordFactory was created without defaultTopicName. " + "Use #create(String topicName, K key, V value) instead."); } - return create(topicName, key, value); + return create(topicName, key, value, headers); } /** * Create a {@link ConsumerRecord} with {@code null}-key and the given topic name, value, and timestamp. + * Does not auto advance internally tracked time. 
+ * + * @param topicName the topic name + * @param value the record value + * @param timestampMs the record timestamp + * @return the generated {@link ConsumerRecord} + */ + public ConsumerRecord<byte[], byte[]> create(final String topicName, + final V value, + final long timestampMs) { + return create(topicName, null, value, new RecordHeaders(), timestampMs); + } + + /** + * Create a {@link ConsumerRecord} with {@code null}-key and the given topic name, value, headers, and timestamp. + * Does not auto advance internally tracked time. * * @param topicName the topic name * @param value the record value + * @param headers the record headers * @param timestampMs the record timestamp * @return the generated {@link ConsumerRecord} */ public ConsumerRecord<byte[], byte[]> create(final String topicName, final V value, + final Headers headers, final long timestampMs) { - return create(topicName, null, value, timestampMs); + return create(topicName, null, value, headers, timestampMs); } /** * Create a {@link ConsumerRecord} with default topic name and {@code null}-key as well as given value and timestamp. + * Does not auto advance internally tracked time. + * + * @param value the record value + * @param timestampMs the record timestamp + * @return the generated {@link ConsumerRecord} + */ + public ConsumerRecord<byte[], byte[]> create(final V value, + final long timestampMs) { + return create(value, new RecordHeaders(), timestampMs); + } + + /** + * Create a {@link ConsumerRecord} with default topic name and {@code null}-key as well as given value, headers, and timestamp. + * Does not auto advance internally tracked time. 
* * @param value the record value + * @param headers the record headers * @param timestampMs the record timestamp * @return the generated {@link ConsumerRecord} */ public ConsumerRecord<byte[], byte[]> create(final V value, + final Headers headers, final long timestampMs) { if (topicName == null) { throw new IllegalStateException("ConsumerRecordFactory was created without defaultTopicName. " + "Use #create(String topicName, V value, long timestampMs) instead."); } - return create(topicName, value, timestampMs); + return create(topicName, value, headers, timestampMs); + } + + /** + * Create a {@link ConsumerRecord} with {@code null}-key and the given topic name, value, and headers. + * The timestamp will be generated based on the constructor provided start time and time will auto advance. + * + * @param topicName the topic name + * @param value the record value + * @param headers the record headers + * @return the generated {@link ConsumerRecord} + */ + public ConsumerRecord<byte[], byte[]> create(final String topicName, + final V value, + final Headers headers) { + return create(topicName, null, value, headers); } /** * Create a {@link ConsumerRecord} with {@code null}-key and the given topic name and value. - * The timestamp will be generated from the constructor provided and time will auto advance. + * The timestamp will be generated based on the constructor provided start time and time will auto advance. * * @param topicName the topic name * @param value the record value @@ -275,27 +398,40 @@ public class ConsumerRecordFactory<K, V> { */ public ConsumerRecord<byte[], byte[]> create(final String topicName, final V value) { - return create(topicName, null, value); + return create(topicName, null, value, new RecordHeaders()); } /** * Create a {@link ConsumerRecord} with default topic name and {@code null}-key was well as given value. - * The timestamp will be generated from the constructor provided and time will auto advance. 
+ * The timestamp will be generated based on the constructor provided start time and time will auto advance. * * @param value the record value * @return the generated {@link ConsumerRecord} */ public ConsumerRecord<byte[], byte[]> create(final V value) { + return create(value, new RecordHeaders()); + } + + /** + * Create a {@link ConsumerRecord} with default topic name and {@code null}-key as well as given value and headers. + * The timestamp will be generated based on the constructor provided start time and time will auto advance. + * + * @param value the record value + * @param headers the record headers + * @return the generated {@link ConsumerRecord} + */ + public ConsumerRecord<byte[], byte[]> create(final V value, + final Headers headers) { if (topicName == null) { throw new IllegalStateException("ConsumerRecordFactory was created without defaultTopicName. " + "Use #create(String topicName, V value, long timestampMs) instead."); } - return create(topicName, value); + return create(topicName, value, headers); } /** * Creates {@link ConsumerRecord consumer records} with the given topic name, keys, and values. - * The timestamp will be generated from the constructor provided and time will auto advance. + * The timestamp will be generated based on the constructor provided start time and time will auto advance. * * @param topicName the topic name * @param keyValues the record keys and values @@ -314,7 +450,7 @@ public class ConsumerRecordFactory<K, V> { /** * Creates {@link ConsumerRecord consumer records} with default topic name as well as given keys and values. - * The timestamp will be generated from the constructor provided and time will auto advance. + * The timestamp will be generated based on the constructor provided start time and time will auto advance. 
* * @param keyValues the record keys and values * @return the generated {@link ConsumerRecord consumer records} @@ -350,7 +486,7 @@ public class ConsumerRecordFactory<K, V> { long timestamp = startTimestamp; for (final KeyValue<K, V> keyValue : keyValues) { - records.add(create(topicName, keyValue.key, keyValue.value, timestamp)); + records.add(create(topicName, keyValue.key, keyValue.value, new RecordHeaders(), timestamp)); timestamp += advanceMs; } diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/test/OutputVerifier.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/test/OutputVerifier.java index 09ed294..aedb910 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/test/OutputVerifier.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/test/OutputVerifier.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.test; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.Headers; import org.apache.kafka.streams.TopologyTestDriver; import java.util.Objects; @@ -238,4 +239,202 @@ public class OutputVerifier { compareKeyValueTimestamp(record, expectedRecord.key(), expectedRecord.value(), expectedRecord.timestamp()); } + /** + * Compares a {@link ProducerRecord} with the provided value and headers and throws an {@link AssertionError} if + * the {@code ProducerRecord}'s value or headers is not equal to the expected value or headers. 
+ * + * @param record a output {@code ProducerRecord} for verification + * @param expectedValue the expected value of the {@code ProducerRecord} + * @param expectedHeaders the expected headers of the {@code ProducerRecord} + * @param <K> the key type + * @param <V> the value type + * @throws AssertionError if {@code ProducerRecord}'s value or headers is not equal to {@code expectedValue} or {@code expectedHeaders} + */ + public static <K, V> void compareValueHeaders(final ProducerRecord<K, V> record, + final V expectedValue, + final Headers expectedHeaders) throws AssertionError { + Objects.requireNonNull(record); + + final V recordValue = record.value(); + final Headers recordHeaders = record.headers(); + final AssertionError error = new AssertionError("Expected value=" + expectedValue + " with headers=" + expectedHeaders + + " but was value=" + recordValue + " with headers=" + recordHeaders); + + if (recordValue != null) { + if (!recordValue.equals(expectedValue)) { + throw error; + } + } else if (expectedValue != null) { + throw error; + } + + if (recordHeaders != null) { + if (!recordHeaders.equals(expectedHeaders)) { + throw error; + } + } else if (expectedHeaders != null) { + throw error; + } + } + + /** + * Compares the values and headers of two {@link ProducerRecord}'s and throws an {@link AssertionError} if the + * values or headers are not equal to each other. 
+ * + * @param record a output {@code ProducerRecord} for verification + * @param expectedRecord a {@code ProducerRecord} for verification + * @param <K> the key type + * @param <V> the value type + * @throws AssertionError if {@code ProducerRecord}'s value or headers is not equal to {@code expectedRecord}'s value or headers + */ + public static <K, V> void compareValueHeaders(final ProducerRecord<K, V> record, + final ProducerRecord<K, V> expectedRecord) throws AssertionError { + Objects.requireNonNull(expectedRecord); + compareValueHeaders(record, expectedRecord.value(), expectedRecord.headers()); + } + + /** + * Compares a {@link ProducerRecord} with the provided key, value, and headers and throws an + * {@link AssertionError} if the {@code ProducerRecord}'s key, value, or headers is not equal to the expected key, + * value, or headers. + * + * @param record a output {@code ProducerRecord} for verification + * @param expectedKey the expected key of the {@code ProducerRecord} + * @param expectedValue the expected value of the {@code ProducerRecord} + * @param expectedHeaders the expected headers of the {@code ProducerRecord} + * @param <K> the key type + * @param <V> the value type + * @throws AssertionError if {@code ProducerRecord}'s key, value, headers is not equal to {@code expectedKey}, + * {@code expectedValue}, or {@code expectedHeaders} + */ + public static <K, V> void compareKeyValueHeaders(final ProducerRecord<K, V> record, + final K expectedKey, + final V expectedValue, + final Headers expectedHeaders) throws AssertionError { + Objects.requireNonNull(record); + + final K recordKey = record.key(); + final V recordValue = record.value(); + final Headers recordHeaders = record.headers(); + final AssertionError error = new AssertionError("Expected <" + expectedKey + ", " + expectedValue + "> with headers=" + expectedHeaders + + " but was <" + recordKey + ", " + recordValue + "> with headers=" + recordHeaders); + + if (recordKey != null) { + if 
(!recordKey.equals(expectedKey)) { + throw error; + } + } else if (expectedKey != null) { + throw error; + } + + if (recordValue != null) { + if (!recordValue.equals(expectedValue)) { + throw error; + } + } else if (expectedValue != null) { + throw error; + } + + if (recordHeaders != null) { + if (!recordHeaders.equals(expectedHeaders)) { + throw error; + } + } else if (expectedHeaders != null) { + throw error; + } + } + + /** + * Compares the keys, values, and headers of two {@link ProducerRecord}'s and throws an {@link AssertionError} if + * the keys, values, or headers are not equal to each other. + * + * @param record a output {@code ProducerRecord} for verification + * @param expectedRecord a {@code ProducerRecord} for verification + * @param <K> the key type + * @param <V> the value type + * @throws AssertionError if {@code ProducerRecord}'s key, value, or headers is not equal to + * {@code expectedRecord}'s key, value, or headers + */ + public static <K, V> void compareKeyValueHeaders(final ProducerRecord<K, V> record, + final ProducerRecord<K, V> expectedRecord) throws AssertionError { + Objects.requireNonNull(expectedRecord); + compareKeyValueHeaders(record, expectedRecord.key(), expectedRecord.value(), expectedRecord.headers()); + } + + /** + * Compares a {@link ProducerRecord} with the provided key, value, headers, and timestamp and throws an + * {@link AssertionError} if the {@code ProducerRecord}'s key, value, headers, or timestamp is not equal to the expected key, + * value, headers, or timestamp. 
+ * + * @param record an output {@code ProducerRecord} for verification + * @param expectedKey the expected key of the {@code ProducerRecord} + * @param expectedValue the expected value of the {@code ProducerRecord} + * @param expectedHeaders the expected headers of the {@code ProducerRecord} + * @param expectedTimestamp the expected timestamp of the {@code ProducerRecord} + * @param <K> the key type + * @param <V> the value type + * @throws AssertionError if {@code ProducerRecord}'s key, value, headers, or timestamp is not equal to {@code expectedKey}, + * {@code expectedValue}, {@code expectedHeaders}, or {@code expectedTimestamp} + */ + public static <K, V> void compareKeyValueHeadersTimestamp(final ProducerRecord<K, V> record, + final K expectedKey, + final V expectedValue, + final Headers expectedHeaders, + final long expectedTimestamp) throws AssertionError { + Objects.requireNonNull(record); + + final K recordKey = record.key(); + final V recordValue = record.value(); + final Headers recordHeaders = record.headers(); + final long recordTimestamp = record.timestamp(); + final AssertionError error = new AssertionError("Expected <" + expectedKey + ", " + expectedValue + ">" + + " with timestamp=" + expectedTimestamp + " and headers=" + expectedHeaders + + " but was <" + recordKey + ", " + recordValue + ">" + + " with timestamp=" + recordTimestamp + " and headers=" + recordHeaders); + + if (recordKey != null) { + if (!recordKey.equals(expectedKey)) { + throw error; + } + } else if (expectedKey != null) { + throw error; + } + + if (recordValue != null) { + if (!recordValue.equals(expectedValue)) { + throw error; + } + } else if (expectedValue != null) { + throw error; + } + + if (recordHeaders != null) { + if (!recordHeaders.equals(expectedHeaders)) { + throw error; + } + } else if (expectedHeaders != null) { + throw error; + } + + if (recordTimestamp != expectedTimestamp) { + throw error; + } + } + + /** + * Compares the keys, values, headers, and timestamp of two {@link ProducerRecord}'s and throws an 
{@link AssertionError} if + * the keys, values, headers, or timestamps are not equal to each other. + * + * @param record a output {@code ProducerRecord} for verification + * @param expectedRecord a {@code ProducerRecord} for verification + * @param <K> the key type + * @param <V> the value type + * @throws AssertionError if {@code ProducerRecord}'s key, value, headers, or timestamp is not equal to + * {@code expectedRecord}'s key, value, headers, or timestamp + */ + public static <K, V> void compareKeyValueHeadersTimestamp(final ProducerRecord<K, V> record, + final ProducerRecord<K, V> expectedRecord) throws AssertionError { + Objects.requireNonNull(expectedRecord); + compareKeyValueHeadersTimestamp(record, expectedRecord.key(), expectedRecord.value(), expectedRecord.headers(), expectedRecord.timestamp()); + } } diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java index dbb26e0..64d5b12 100644 --- a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java +++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java @@ -286,7 +286,7 @@ public class MockProcessorContextTest { } context.resetForwards(); - context.setRecordMetadata("t1", 0, 0L, 0L); + context.setRecordMetadata("t1", 0, 0L, null, 0L); { processor.process("foo", 5L); diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java index 6cc96a2..2d446d1 100644 --- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java +++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java @@ -18,6 +18,10 @@ package org.apache.kafka.streams; import org.apache.kafka.clients.consumer.ConsumerRecord; import 
org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.LongSerializer; @@ -74,10 +78,12 @@ public class TopologyTestDriverTest { new ByteArraySerializer(), new ByteArraySerializer()); + private final Headers headers = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())}); + private final byte[] key1 = new byte[0]; private final byte[] value1 = new byte[0]; private final long timestamp1 = 42L; - private final ConsumerRecord<byte[], byte[]> consumerRecord1 = consumerRecordFactory.create(SOURCE_TOPIC_1, key1, value1, timestamp1); + private final ConsumerRecord<byte[], byte[]> consumerRecord1 = consumerRecordFactory.create(SOURCE_TOPIC_1, key1, value1, headers, timestamp1); private final byte[] key2 = new byte[0]; private final byte[] value2 = new byte[0]; @@ -107,6 +113,7 @@ public class TopologyTestDriverTest { private long timestamp; private long offset; private String topic; + private Headers headers; Record(final ConsumerRecord consumerRecord) { key = consumerRecord.key(); @@ -114,15 +121,18 @@ public class TopologyTestDriverTest { timestamp = consumerRecord.timestamp(); offset = consumerRecord.offset(); topic = consumerRecord.topic(); + headers = consumerRecord.headers(); } Record(final Object key, final Object value, + final Headers headers, final long timestamp, final long offset, final String topic) { this.key = key; this.value = value; + this.headers = headers; this.timestamp = timestamp; this.offset = offset; this.topic = topic; @@ -146,12 +156,13 @@ public class TopologyTestDriverTest { offset == record.offset && Objects.equals(key, record.key) && 
Objects.equals(value, record.value) && - Objects.equals(topic, record.topic); + Objects.equals(topic, record.topic) && + Objects.equals(headers, record.headers); } @Override public int hashCode() { - return Objects.hash(key, value, timestamp, offset, topic); + return Objects.hash(key, value, headers, timestamp, offset, topic); } } @@ -201,7 +212,7 @@ public class TopologyTestDriverTest { @Override public void process(Object key, Object value) { - processedRecords.add(new Record(key, value, context.timestamp(), context.offset(), context.topic())); + processedRecords.add(new Record(key, value, context.headers(), context.timestamp(), context.offset(), context.topic())); context.forward(key, value); } diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/test/ConsumerRecordFactoryTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/test/ConsumerRecordFactoryTest.java index 469d241..855aa9f 100644 --- a/streams/test-utils/src/test/java/org/apache/kafka/streams/test/ConsumerRecordFactoryTest.java +++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/test/ConsumerRecordFactoryTest.java @@ -63,12 +63,17 @@ public class ConsumerRecordFactoryTest { } @Test(expected = NullPointerException.class) + public void shouldNotAllowToCreateTopicWithNullHeaders() { + factory.create(topicName, rawKey, value, null, timestamp); + } + + @Test(expected = NullPointerException.class) public void shouldNotAllowToCreateTopicWithNullTopicNameWithDefaultTimestamp() { factory.create(null, rawKey, value); } @Test(expected = NullPointerException.class) - public void shouldNotAllowToCreateTopicWithNullTopicNameWithNulKey() { + public void shouldNotAllowToCreateTopicWithNullTopicNameWithNullKey() { factory.create((String) null, value, timestamp); } -- To stop receiving notification emails like this one, please contact mj...@apache.org.