[ https://issues.apache.org/jira/browse/KAFKA-6850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16484711#comment-16484711 ]

ASF GitHub Bot commented on KAFKA-6850:
---------------------------------------

mjsax closed pull request #4955: KAFKA-6850: Add Record Header support to Kafka Streams Processor API (KIP-244)
URL: https://github.com/apache/kafka/pull/4955

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
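
The diff below threads record headers through the Streams Processor API: ProcessorContext, RecordContext, and ProcessorRecordContext gain a headers() accessor, and RecordCollector/SinkNode pass the headers of the current record on to output topics. As a minimal sketch only (not part of this PR; the class name and the "trace-id" header key are made up for illustration), a processor written against the new API could read and tag headers roughly like this:

    import java.nio.charset.StandardCharsets;

    import org.apache.kafka.common.header.Headers;
    import org.apache.kafka.streams.processor.AbstractProcessor;

    // Illustrative only: demonstrates the headers() accessor added by KIP-244.
    public class TraceTaggingProcessor extends AbstractProcessor<String, String> {
        @Override
        public void process(final String key, final String value) {
            // headers() returns the headers of the record currently being processed.
            final Headers headers = context().headers();
            // Headers are mutable, so the processor can tag the record before forwarding it;
            // the sink node then sends the headers along with the output record.
            headers.add("trace-id", key == null ? new byte[0] : key.getBytes(StandardCharsets.UTF_8));
            context().forward(key, value);
        }
    }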

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
index 79d191ce672..f2a9f64359b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.processor;
 
 import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.errors.StreamsException;
@@ -197,6 +198,12 @@ Cancellable schedule(final long intervalMs,
      */
     long offset();
 
+    /**
+     * Returns the headers of the current input record
+     * @return the headers
+     */
+    Headers headers();
+
     /**
      * Returns the current timestamp.
      *
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
index 96874776e6e..33386696eea 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
@@ -142,6 +143,15 @@ public long offset() {
         return recordContext.offset();
     }
 
+    @Override
+    public Headers headers() {
+        if (recordContext == null) {
+            throw new IllegalStateException("This should not happen as 
headers() should only be called while a record is processed");
+        }
+
+        return recordContext.headers();
+    }
+
     /**
      * @throws IllegalStateException if timestamp is null
      */
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java
index 35a0a7ebe1b..7e2610cc8fe 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.errors.StreamsException;
@@ -132,6 +133,11 @@ public long offset() {
         return delegate.offset();
     }
 
+    @Override
+    public Headers headers() {
+        return delegate.headers();
+    }
+
     @Override
     public long timestamp() {
         return delegate.timestamp();
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
index 26bf4933d61..d38771362ed 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
@@ -89,7 +89,8 @@ public void update(final ConsumerRecord<byte[], byte[]> record) {
                 new ProcessorRecordContext(deserialized.timestamp(),
                     deserialized.offset(),
                     deserialized.partition(),
-                    deserialized.topic());
+                    deserialized.topic(),
+                    deserialized.headers());
             processorContext.setRecordContext(recordContext);
             processorContext.setCurrentNode(sourceNodeAndDeserializer.sourceNode());
             sourceNodeAndDeserializer.sourceNode().process(deserialized.key(), deserialized.value());
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
index 92acfc9a50c..c0715259fcf 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.common.header.Headers;
+
 import java.util.Objects;
 
 public class ProcessorRecordContext implements RecordContext {
@@ -24,16 +26,19 @@
     private final long offset;
     private final String topic;
     private final int partition;
+    private final Headers headers;
 
     public ProcessorRecordContext(final long timestamp,
                                   final long offset,
                                   final int partition,
-                                  final String topic) {
+                                  final String topic,
+                                  final Headers headers) {
 
         this.timestamp = timestamp;
         this.offset = offset;
         this.topic = topic;
         this.partition = partition;
+        this.headers = headers;
     }
 
     public long offset() {
@@ -58,6 +63,11 @@ public int partition() {
         return partition;
     }
 
+    @Override
+    public Headers headers() {
+        return headers;
+    }
+
     @Override
     public boolean equals(final Object o) {
         if (this == o) return true;
@@ -66,11 +76,12 @@ public boolean equals(final Object o) {
         return timestamp == that.timestamp &&
                 offset == that.offset &&
                 partition == that.partition &&
-                Objects.equals(topic, that.topic);
+                Objects.equals(topic, that.topic) &&
+                Objects.equals(headers, that.headers);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(timestamp, offset, topic, partition);
+        return Objects.hash(timestamp, offset, topic, partition, headers);
     }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
index b0838691814..bf10da2b5e7 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
@@ -18,6 +18,7 @@
 
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 
@@ -28,6 +29,7 @@
     <K, V> void send(final String topic,
                      final K key,
                      final V value,
+                     final Headers headers,
                      final Integer partition,
                      final Long timestamp,
                      final Serializer<K> keySerializer,
@@ -36,6 +38,7 @@
     <K, V> void send(final String topic,
                      final K key,
                      final V value,
+                     final Headers headers,
                      final Long timestamp,
                      final Serializer<K> keySerializer,
                      final Serializer<V> valueSerializer,
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index 8167539f450..1c8b0a09f48 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -34,6 +34,7 @@
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.errors.ProductionExceptionHandler;
@@ -77,6 +78,7 @@ public RecordCollectorImpl(final Producer<byte[], byte[]> producer,
     public <K, V> void send(final String topic,
                             final K key,
                             final V value,
+                            final Headers headers,
                             final Long timestamp,
                             final Serializer<K> keySerializer,
                             final Serializer<V> valueSerializer,
@@ -93,7 +95,7 @@ public RecordCollectorImpl(final Producer<byte[], byte[]> producer,
             }
         }
 
-        send(topic, key, value, partition, timestamp, keySerializer, valueSerializer);
+        send(topic, key, value, headers, partition, timestamp, keySerializer, valueSerializer);
     }
 
     private boolean productionExceptionIsFatal(final Exception exception) {
@@ -142,6 +144,7 @@ private boolean productionExceptionIsFatal(final Exception exception) {
     public <K, V> void send(final String topic,
                             final K key,
                             final V value,
+                            final Headers headers,
                             final Integer partition,
                             final Long timestamp,
                             final Serializer<K> keySerializer,
@@ -150,7 +153,7 @@ private boolean productionExceptionIsFatal(final Exception exception) {
         final byte[] keyBytes = keySerializer.serialize(topic, key);
         final byte[] valBytes = valueSerializer.serialize(topic, value);
 
-        final ProducerRecord<byte[], byte[]> serializedRecord = new ProducerRecord<>(topic, partition, timestamp, keyBytes, valBytes);
+        final ProducerRecord<byte[], byte[]> serializedRecord = new ProducerRecord<>(topic, partition, timestamp, keyBytes, valBytes, headers);
 
         try {
             producer.send(serializedRecord, new Callback() {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordContext.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordContext.java
index dd58f4c02a8..15add71d89a 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordContext.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordContext.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.streams.processor.Processor;
 
 /**
@@ -47,4 +48,10 @@
      * @return The partition the record was received on
      */
     int partition();
+
+    /**
+     * @return The headers from the record received from Kafka
+     */
+    Headers headers();
+
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
index 36e2c9a8e58..ade96649fe4 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
@@ -63,7 +63,7 @@
                 rawRecord.serializedKeySize(),
                 rawRecord.serializedValueSize(),
                 sourceNode.deserializeKey(rawRecord.topic(), rawRecord.headers(), rawRecord.key()),
-                sourceNode.deserializeValue(rawRecord.topic(), rawRecord.headers(), rawRecord.value()));
+                sourceNode.deserializeValue(rawRecord.topic(), rawRecord.headers(), rawRecord.value()), rawRecord.headers());
         } catch (final Exception deserializationException) {
             final 
DeserializationExceptionHandler.DeserializationHandlerResponse response;
             try {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
index 0fbd6dce62f..7711905423e 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
@@ -84,7 +84,7 @@ public void process(final K key, final V value) {
         }
 
         try {
-            collector.send(topic, key, value, timestamp, keySerializer, valSerializer, partitioner);
+            collector.send(topic, key, value, context.headers(), timestamp, keySerializer, valSerializer, partitioner);
         } catch (final ClassCastException e) {
             final String keyClass = key == null ? "unknown because key is null" : key.getClass().getName();
             final String valueClass = value == null ? "unknown because value is null" : value.getClass().getName();
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StampedRecord.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StampedRecord.java
index 243c41a8e8b..aa9b79d2c13 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StampedRecord.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StampedRecord.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.Headers;
 
 public class StampedRecord extends Stamped<ConsumerRecord<Object, Object>> {
 
@@ -44,6 +45,10 @@ public long offset() {
         return value.offset();
     }
 
+    public Headers headers() {
+        return value.headers();
+    }
+
     @Override
     public String toString() {
         return value.toString() + ", timestamp = " + timestamp;
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
index 6aeca44af08..14f986c1c9b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.StreamsConfig;
@@ -40,6 +41,7 @@
         public <K, V> void send(final String topic,
                                 final K key,
                                 final V value,
+                                final Headers headers,
                                 final Integer partition,
                                 final Long timestamp,
                                 final Serializer<K> keySerializer,
@@ -50,6 +52,7 @@
         public <K, V> void send(final String topic,
                                 final K key,
                                 final V value,
+                                final Headers headers,
                                 final Long timestamp,
                                 final Serializer<K> keySerializer,
                                 final Serializer<V> valueSerializer,
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 633e7ad295b..e2be3e29172 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -343,7 +343,13 @@ public void punctuate(final ProcessorNode node, final long timestamp, final Punc
     }
 
     private void updateProcessorContext(final StampedRecord record, final ProcessorNode currNode) {
-        processorContext.setRecordContext(new ProcessorRecordContext(record.timestamp, record.offset(), record.partition(), record.topic()));
+        processorContext.setRecordContext(
+            new ProcessorRecordContext(
+                record.timestamp,
+                record.offset(),
+                record.partition(),
+                record.topic(),
+                record.headers()));
         processorContext.setCurrentNode(currNode);
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index 525e92df22f..285bde5aedf 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -224,8 +224,17 @@ public void put(final Bytes key, final byte[] value) {
     }
 
     private void putInternal(final Bytes key, final byte[] value) {
-        cache.put(cacheName, key, new LRUCacheEntry(value, true, context.offset(),
-              context.timestamp(), context.partition(), context.topic()));
+        cache.put(
+            cacheName,
+            key,
+            new LRUCacheEntry(
+                value,
+                context.headers(),
+                true,
+                context.offset(),
+                context.timestamp(),
+                context.partition(),
+                context.topic()));
     }
 
     @Override
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
index 1bb2ea750c5..c099fafc269 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
@@ -141,8 +141,15 @@ public void remove(final Windowed<Bytes> sessionKey) {
     public void put(final Windowed<Bytes> key, byte[] value) {
         validateStoreOpen();
         final Bytes binaryKey = Bytes.wrap(SessionKeySchema.toBinary(key));
-        final LRUCacheEntry entry = new LRUCacheEntry(value, true, context.offset(),
-                                                      key.window().end(), context.partition(), context.topic());
+        final LRUCacheEntry entry =
+            new LRUCacheEntry(
+                value,
+                context.headers(),
+                true,
+                context.offset(),
+                key.window().end(),
+                context.partition(),
+                context.topic());
         cache.put(cacheName, cacheFunction.cacheKey(binaryKey), entry);
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index 7e58b684f23..ca24ffd4e4d 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -150,8 +150,15 @@ public synchronized void put(final Bytes key, final byte[] value, final long tim
         validateStoreOpen();
         
         final Bytes keyBytes = WindowKeySchema.toStoreKeyBinary(key, timestamp, 0);
-        final LRUCacheEntry entry = new LRUCacheEntry(value, true, context.offset(),
-                                                      timestamp, context.partition(), context.topic());
+        final LRUCacheEntry entry =
+            new LRUCacheEntry(
+                value,
+                context.headers(),
+                true,
+                context.offset(),
+                timestamp,
+                context.partition(),
+                context.topic());
         cache.put(name, cacheFunction.cacheKey(keyBytes), entry);
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java
index af7059b30e3..78c0331a5af 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.streams.processor.internals.RecordContext;
 
 /**
@@ -24,6 +25,7 @@
 class LRUCacheEntry implements RecordContext {
 
     public final byte[] value;
+    private final Headers headers;
     private final long offset;
     private final String topic;
     private final int partition;
@@ -33,13 +35,18 @@
     private boolean isDirty;
 
     LRUCacheEntry(final byte[] value) {
-        this(value, false, -1, -1, -1, "");
+        this(value, null, false, -1, -1, -1, "");
     }
 
-    LRUCacheEntry(final byte[] value, final boolean isDirty,
-                  final long offset, final long timestamp, final int partition,
+    LRUCacheEntry(final byte[] value,
+                  final Headers headers,
+                  final boolean isDirty,
+                  final long offset,
+                  final long timestamp,
+                  final int partition,
                   final String topic) {
         this.value = value;
+        this.headers = headers;
         this.partition = partition;
         this.topic = topic;
         this.offset = offset;
@@ -78,6 +85,11 @@ public int partition() {
         return partition;
     }
 
+    @Override
+    public Headers headers() {
+        return headers;
+    }
+
     void markClean() {
         isDirty = false;
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
index 1055df56534..a8a04c67470 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
@@ -55,7 +55,8 @@ void logChange(final K key, final V value) {
         if (collector != null) {
             final Serializer<K> keySerializer = serialization.keySerializer();
             final Serializer<V> valueSerializer = serialization.valueSerializer();
-            collector.send(this.topic, key, value, this.partition, context.timestamp(), keySerializer, valueSerializer);
+            // Sending null headers to changelog topics (KIP-244)
+            collector.send(this.topic, key, value, null, this.partition, context.timestamp(), keySerializer, valueSerializer);
         }
     }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index d306ee4607e..fe897c7ac30 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -30,6 +30,7 @@
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.requests.UpdateMetadataRequest;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
@@ -91,6 +92,12 @@ public static void purgeLocalStreamsState(final Properties streamsConfiguration)
         IntegrationTestUtils.produceKeyValuesSynchronously(topic, records, producerConfig, time, false);
     }
 
+    public static <K, V> void produceKeyValuesSynchronously(
+        final String topic, final Collection<KeyValue<K, V>> records, final Properties producerConfig, final Headers headers, final Time time)
+        throws ExecutionException, InterruptedException {
+        IntegrationTestUtils.produceKeyValuesSynchronously(topic, records, producerConfig, headers, time, false);
+    }
+
     /**
      * @param topic               Kafka topic to write the data records to
      * @param records             Data records to write to Kafka
@@ -102,10 +109,21 @@ public static void purgeLocalStreamsState(final Properties streamsConfiguration)
     public static <K, V> void produceKeyValuesSynchronously(
         final String topic, final Collection<KeyValue<K, V>> records, final Properties producerConfig, final Time time, final boolean enableTransactions)
         throws ExecutionException, InterruptedException {
+        IntegrationTestUtils.produceKeyValuesSynchronously(topic, records, producerConfig, null, time, enableTransactions);
+    }
+
+    public static <K, V> void produceKeyValuesSynchronously(final String topic,
+                                                            final Collection<KeyValue<K, V>> records,
+                                                            final Properties producerConfig,
+                                                            final Headers headers,
+                                                            final Time time,
+                                                            final boolean enableTransactions)
+        throws ExecutionException, InterruptedException {
         for (final KeyValue<K, V> record : records) {
             produceKeyValuesSynchronouslyWithTimestamp(topic,
                 Collections.singleton(record),
                 producerConfig,
+                headers,
                 time.milliseconds(),
                 enableTransactions);
             time.sleep(1L);
@@ -120,23 +138,42 @@ public static void purgeLocalStreamsState(final Properties streamsConfiguration)
         IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(topic, records, producerConfig, timestamp, false);
     }
 
+    public static <K, V> void produceKeyValuesSynchronouslyWithTimestamp(final String topic,
+                                                                         final Collection<KeyValue<K, V>> records,
+                                                                         final Properties producerConfig,
+                                                                         final Headers headers,
+                                                                         final Long timestamp)
+        throws ExecutionException, InterruptedException {
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(topic, records, producerConfig, headers, timestamp, false);
+    }
+
     public static <K, V> void produceKeyValuesSynchronouslyWithTimestamp(final String topic,
                                                                          final Collection<KeyValue<K, V>> records,
                                                                          final Properties producerConfig,
                                                                          final Long timestamp,
                                                                          final boolean enableTransactions)
         throws ExecutionException, InterruptedException {
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(topic, records, producerConfig, null, timestamp, enableTransactions);
+    }
+
+    public static <K, V> void produceKeyValuesSynchronouslyWithTimestamp(final String topic,
+                                                                         final Collection<KeyValue<K, V>> records,
+                                                                         final Properties producerConfig,
+                                                                         final Headers headers,
+                                                                         final Long timestamp,
+                                                                         final boolean enabledTransactions)
+        throws ExecutionException, InterruptedException {
         try (Producer<K, V> producer = new KafkaProducer<>(producerConfig)) {
-            if (enableTransactions) {
+            if (enabledTransactions) {
                 producer.initTransactions();
                 producer.beginTransaction();
             }
             for (final KeyValue<K, V> record : records) {
                 final Future<RecordMetadata> f = producer.send(
-                    new ProducerRecord<>(topic, null, timestamp, record.key, record.value));
+                    new ProducerRecord<>(topic, null, timestamp, record.key, record.value, headers));
                 f.get();
             }
-            if (enableTransactions) {
+            if (enabledTransactions) {
                 producer.commitTransaction();
             }
             producer.flush();
@@ -194,6 +231,12 @@ public static void waitForCompletion(final KafkaStreams streams,
         }
     }
 
+    public static <K, V> List<ConsumerRecord<K, V>> waitUntilMinRecordsReceived(final Properties consumerConfig,
+                                                                                final String topic,
+                                                                                final int expectedNumRecords) throws InterruptedException {
+        return waitUntilMinRecordsReceived(consumerConfig, topic, expectedNumRecords, DEFAULT_TIMEOUT);
+    }
+
     public static <K, V> List<KeyValue<K, V>> waitUntilMinKeyValueRecordsReceived(final Properties consumerConfig,
                                                                                   final String topic,
                                                                                   final int expectedNumRecords) throws InterruptedException {
@@ -232,6 +275,27 @@ public boolean conditionMet() {
         return accumData;
     }
 
+    public static <K, V> List<ConsumerRecord<K, V>> waitUntilMinRecordsReceived(final Properties consumerConfig,
+                                                                                final String topic,
+                                                                                final int expectedNumRecords,
+                                                                                final long waitTime) throws InterruptedException {
+        final List<ConsumerRecord<K, V>> accumData = new ArrayList<>();
+        try (final Consumer<K, V> consumer = createConsumer(consumerConfig)) {
+            final TestCondition valuesRead = new TestCondition() {
+                @Override
+                public boolean conditionMet() {
+                    final List<ConsumerRecord<K, V>> readData =
+                        readRecords(topic, consumer, waitTime, expectedNumRecords);
+                    accumData.addAll(readData);
+                    return accumData.size() >= expectedNumRecords;
+                }
+            };
+            final String conditionDetails = "Did not receive all " + 
expectedNumRecords + " records from topic " + topic;
+            TestUtils.waitForCondition(valuesRead, waitTime, conditionDetails);
+        }
+        return accumData;
+    }
+
    public static <V> List<V> waitUntilMinValuesRecordsReceived(final Properties consumerConfig,
                                                                final String topic,
                                                                final int expectedNumRecords) throws InterruptedException {
@@ -380,21 +444,33 @@ public boolean conditionMet() {
                                                             final Consumer<K, V> consumer,
                                                             final long waitTime,
                                                             final int maxMessages) {
-        final List<KeyValue<K, V>> consumedValues;
+        final List<KeyValue<K, V>> consumedValues = new ArrayList<>();
+        final List<ConsumerRecord<K, V>> records = readRecords(topic, consumer, waitTime, maxMessages);
+        for (final ConsumerRecord<K, V> record : records) {
+            consumedValues.add(new KeyValue<>(record.key(), record.value()));
+        }
+        return consumedValues;
+    }
+
+    private static <K, V> List<ConsumerRecord<K, V>> readRecords(final String topic,
+                                                                 final Consumer<K, V> consumer,
+                                                                 final long waitTime,
+                                                                 final int maxMessages) {
+        final List<ConsumerRecord<K, V>> consumerRecords;
         consumer.subscribe(Collections.singletonList(topic));
         final int pollIntervalMs = 100;
-        consumedValues = new ArrayList<>();
+        consumerRecords = new ArrayList<>();
         int totalPollTimeMs = 0;
         while (totalPollTimeMs < waitTime &&
-            continueConsuming(consumedValues.size(), maxMessages)) {
+            continueConsuming(consumerRecords.size(), maxMessages)) {
             totalPollTimeMs += pollIntervalMs;
            final ConsumerRecords<K, V> records = consumer.poll(pollIntervalMs);
 
             for (final ConsumerRecord<K, V> record : records) {
-                consumedValues.add(new KeyValue<>(record.key(), record.value()));
+                consumerRecords.add(record);
             }
         }
-        return consumedValues;
+        return consumerRecords;
     }
 
     private static boolean continueConsuming(final int messagesConsumed, final int maxMessages) {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
index e5acd01e051..1517f0e9b6b 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
@@ -531,7 +531,7 @@ private void processData(final TopologyTestDriver driver) {
         driver.pipeInput(recordFactory.create(TOPIC, "1", "D"));
         driver.pipeInput(recordFactory.create(TOPIC, "3", "E"));
         driver.pipeInput(recordFactory.create(TOPIC, "3", "F"));
-        driver.pipeInput(recordFactory.create(TOPIC, "3", null));
+        driver.pipeInput(recordFactory.create(TOPIC, "3", (String) null));
     }
 
     private void doCountWindowed(final List<KeyValue<Windowed<String>, Long>> results) {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
index ae76362c71a..d5c5a54c0b4 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
@@ -106,7 +106,7 @@ private void pushToGlobalTable(final int messageCount, final String valuePrefix)
     private void pushNullValueToGlobalTable(final int messageCount) {
        final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
         for (int i = 0; i < messageCount; i++) {
-            driver.pipeInput(recordFactory.create(globalTableTopic, "FKey" + 
expectedKeys[i], null));
+            driver.pipeInput(recordFactory.create(globalTableTopic, "FKey" + 
expectedKeys[i], (String) null));
         }
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
index 95fe8b97099..248c3eed138 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
@@ -108,7 +108,7 @@ private void pushToGlobalTable(final int messageCount, final String valuePrefix)
     private void pushNullValueToGlobalTable(final int messageCount) {
        final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
         for (int i = 0; i < messageCount; i++) {
-            driver.pipeInput(recordFactory.create(globalTableTopic, "FKey" + 
expectedKeys[i], null));
+            driver.pipeInput(recordFactory.create(globalTableTopic, "FKey" + 
expectedKeys[i], (String) null));
         }
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
index 43eaf3b2e94..6ffce04c837 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
@@ -97,7 +97,7 @@ private void pushToTable(final int messageCount, final String valuePrefix) {
 
     private void pushNullValueToTable() {
         for (int i = 0; i < 2; i++) {
-            driver.pipeInput(recordFactory.create(tableTopic, expectedKeys[i], null));
+            driver.pipeInput(recordFactory.create(tableTopic, expectedKeys[i], (String) null));
         }
     }
 
@@ -205,7 +205,7 @@ public void shouldLogAndMeterWhenSkippingNullLeftKey() {
     @Test
     public void shouldLogAndMeterWhenSkippingNullLeftValue() {
        final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
-        driver.pipeInput(recordFactory.create(streamTopic, 1, null));
+        driver.pipeInput(recordFactory.create(streamTopic, 1, (String) null));
         LogCaptureAppender.unregister(appender);
 
        assertEquals(1.0, getMetricByName(driver.metrics(), "skipped-records-total", "stream-metrics").metricValue());
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
index 251e58e3a9e..1c3e027ff9f 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
@@ -93,7 +93,7 @@ private void pushToTable(final int messageCount, final String valuePrefix) {
 
     private void pushNullValueToTable(final int messageCount) {
         for (int i = 0; i < messageCount; i++) {
-            driver.pipeInput(recordFactory.create(tableTopic, expectedKeys[i], null));
+            driver.pipeInput(recordFactory.create(tableTopic, expectedKeys[i], (String) null));
         }
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index afc9be12c3f..6b5e5773b20 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -308,7 +308,7 @@ public void shouldImmediatelyForwardRemovedSessionsWhenMerging() {
     public void shouldLogAndMeterWhenSkippingNullKey() {
         initStore(false);
         processor.init(context);
-        context.setRecordContext(new ProcessorRecordContext(-1, -2, -3, "topic"));
+        context.setRecordContext(new ProcessorRecordContext(-1, -2, -3, "topic", null));
        final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
         processor.process(null, "1");
         LogCaptureAppender.unregister(appender);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java
index 0c5653171b2..cd29b50f668 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java
@@ -361,7 +361,7 @@ public void shouldLogAndMeterSkippedRecordsDueToNullLeftKey() {
         ).get();
 
         final MockProcessorContext context = new MockProcessorContext();
-        context.setRecordMetadata("left", -1, -2, -3);
+        context.setRecordMetadata("left", -1, -2, null, -3);
         join.init(context);
        final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
         join.process(null, new Change<>("new", "old"));
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
index ef64f758b71..9be61899946 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
@@ -424,7 +424,7 @@ public void shouldLogAndMeterSkippedRecordsDueToNullLeftKey() {
         ).get();
 
         final MockProcessorContext context = new MockProcessorContext();
-        context.setRecordMetadata("left", -1, -2, -3);
+        context.setRecordMetadata("left", -1, -2, null, -3);
         join.init(context);
        final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
         join.process(null, new Change<>("new", "old"));
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
index e897ec30a93..3995fcf8f93 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
@@ -366,7 +366,7 @@ public void shouldLogAndMeterSkippedRecordsDueToNullLeftKey() {
         ).get();
 
         final MockProcessorContext context = new MockProcessorContext();
-        context.setRecordMetadata("left", -1, -2, -3);
+        context.setRecordMetadata("left", -1, -2, null, -3);
         join.init(context);
        final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
         join.process(null, new Change<>("new", "old"));
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoinTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoinTest.java
index d7411cb38d7..d4805a20897 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoinTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoinTest.java
@@ -41,7 +41,7 @@ public void shouldLogAndMeterSkippedRecordsDueToNullLeftKey() {
         ).get();
 
         final MockProcessorContext context = new MockProcessorContext();
-        context.setRecordMetadata("left", -1, -2, -3);
+        context.setRecordMetadata("left", -1, -2, null, -3);
         join.init(context);
        final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
         join.process(null, new Change<>("new", "old"));
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
index 7c12dad2e64..be20c864a35 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
@@ -325,7 +325,7 @@ public void shouldTransformValuesWithKey() {
 
         driver.pipeInput(recordFactory.create(INPUT_TOPIC, "A", "a", 0L));
         driver.pipeInput(recordFactory.create(INPUT_TOPIC, "B", "b", 0L));
-        driver.pipeInput(recordFactory.create(INPUT_TOPIC, "D", null, 0L));
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC, "D", (String) null, 
0L));
 
         assertThat(output(), hasItems("A:A->a!", "B:B->b!", "D:D->null!"));
         assertThat("Store should not be materialized", 
driver.getKeyValueStore(QUERYABLE_NAME), is(nullValue()));
@@ -349,7 +349,7 @@ public void shouldTransformValuesWithKeyAndMaterialize() {
 
         driver.pipeInput(recordFactory.create(INPUT_TOPIC, "A", "a", 0L));
         driver.pipeInput(recordFactory.create(INPUT_TOPIC, "B", "b", 0L));
-        driver.pipeInput(recordFactory.create(INPUT_TOPIC, "C", null, 0L));
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC, "C", (String) null, 
0L));
 
         assertThat(output(), hasItems("A:A->a!", "B:B->b!", "C:C->null!"));
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
index 86806b2fe5a..9aaa8a6147a 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
@@ -16,6 +16,10 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.StreamsConfig;
@@ -44,7 +48,8 @@
     private final MockStreamsMetrics metrics = new MockStreamsMetrics(new Metrics());
     private final AbstractProcessorContext context = new TestProcessorContext(metrics);
     private final MockStateStore stateStore = new MockStateStore("store", false);
-    private final RecordContext recordContext = new RecordContextStub(10, System.currentTimeMillis(), 1, "foo");
+    private final Headers headers = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())});
+    private final RecordContext recordContext = new RecordContextStub(10, System.currentTimeMillis(), 1, "foo", headers);
 
     @Before
     public void before() {
@@ -141,6 +146,27 @@ public void shouldReturnTimestampFromRecordContext() {
         assertThat(context.timestamp(), equalTo(recordContext.timestamp()));
     }
 
+    @Test
+    public void shouldReturnHeadersFromRecordContext() {
+        assertThat(context.headers(), equalTo(recordContext.headers()));
+    }
+
+    @Test
+    public void shouldReturnNullIfHeadersAreNotSet() {
+        context.setRecordContext(new RecordContextStub(0, 0, 0, AbstractProcessorContext.NONEXIST_TOPIC));
+        assertThat(context.headers(), nullValue());
+    }
+
+    @Test
+    public void shouldThrowIllegalStateExceptionOnHeadersIfNoRecordContext() {
+        context.setRecordContext(null);
+        try {
+            context.headers();
+        } catch (final IllegalStateException e) {
+            // pass
+        }
+    }
+
     @SuppressWarnings("unchecked")
     @Test
     public void appConfigsShouldReturnParsedValues() {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index e2476479cee..033c0e13825 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -18,6 +18,10 @@
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
@@ -64,6 +68,9 @@
     private static final String OUTPUT_TOPIC_2 = "output-topic-2";
     private static final String THROUGH_TOPIC_1 = "through-topic-1";
 
+    private static final Header HEADER = new RecordHeader("key", 
"value".getBytes());
+    private static final Headers HEADERS = new RecordHeaders(new 
Header[]{HEADER});
+
     private final TopologyWrapper topology = new TopologyWrapper();
     private final MockProcessorSupplier mockProcessorSupplier = new MockProcessorSupplier();
     private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER, 0L);
@@ -71,6 +78,7 @@
     private TopologyTestDriver driver;
     private final Properties props = new Properties();
 
+
     @Before
     public void setup() {
        // Create a new directory in which we'll put all of the state for this test, enabling running tests in parallel ...
@@ -338,10 +346,33 @@ public void shouldConsiderModifiedTimeStamps() {
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key3", "value3", partition, 
40L);
     }
 
+    @Test
+    public void shouldConsiderHeaders() {
+        final int partition = 10;
+        driver = new TopologyTestDriver(createSimpleTopology(partition), props);
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1", HEADERS, 10L));
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key2", "value2", HEADERS, 20L));
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key3", "value3", HEADERS, 30L));
+        assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1", HEADERS, partition, 10L);
+        assertNextOutputRecord(OUTPUT_TOPIC_1, "key2", "value2", HEADERS, partition, 20L);
+        assertNextOutputRecord(OUTPUT_TOPIC_1, "key3", "value3", HEADERS, partition, 30L);
+    }
+
+    @Test
+    public void shouldAddHeaders() {
+        driver = new TopologyTestDriver(createAddHeaderTopology(), props);
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1", 
10L));
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key2", "value2", 
20L));
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key3", "value3", 
30L));
+        assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1", HEADERS, 10L);
+        assertNextOutputRecord(OUTPUT_TOPIC_1, "key2", "value2", HEADERS, 20L);
+        assertNextOutputRecord(OUTPUT_TOPIC_1, "key3", "value3", HEADERS, 30L);
+    }
+
     private void assertNextOutputRecord(final String topic,
                                         final String key,
                                         final String value) {
-        assertNextOutputRecord(topic, key, value, null, 0L);
+        assertNextOutputRecord(topic, key, value, (Integer) null, 0L);
     }
 
     private void assertNextOutputRecord(final String topic,
@@ -351,17 +382,35 @@ private void assertNextOutputRecord(final String topic,
         assertNextOutputRecord(topic, key, value, partition, 0L);
     }
 
+    private void assertNextOutputRecord(final String topic,
+                                        final String key,
+                                        final String value,
+                                        final Headers headers,
+                                        final Long timestamp) {
+        assertNextOutputRecord(topic, key, value, headers, null, timestamp);
+    }
+
     private void assertNextOutputRecord(final String topic,
                                         final String key,
                                         final String value,
                                         final Integer partition,
                                         final Long timestamp) {
+        assertNextOutputRecord(topic, key, value, new RecordHeaders(), partition, timestamp);
+    }
+
+    private void assertNextOutputRecord(final String topic,
+                                        final String key,
+                                        final String value,
+                                        final Headers headers,
+                                        final Integer partition,
+                                        final Long timestamp) {
        final ProducerRecord<String, String> record = driver.readOutput(topic, STRING_DESERIALIZER, STRING_DESERIALIZER);
         assertEquals(topic, record.topic());
         assertEquals(key, record.key());
         assertEquals(value, record.value());
         assertEquals(partition, record.partition());
         assertEquals(timestamp, record.timestamp());
+        assertEquals(headers, record.headers());
     }
 
     private void assertNoOutputRecord(final String topic) {
@@ -458,6 +507,12 @@ private Topology createSimpleMultiSourceTopology(int partition) {
                 .addSink("sink-2", OUTPUT_TOPIC_2, constantPartitioner(partition), "processor-2");
     }
 
+    private Topology createAddHeaderTopology() {
+        return topology.addSource("source-1", STRING_DESERIALIZER, 
STRING_DESERIALIZER, INPUT_TOPIC_1)
+                .addProcessor("processor-1", define(new AddHeaderProcessor()), 
"source-1")
+                .addSink("sink-1", OUTPUT_TOPIC_1, "processor-1");
+    }
+
     /**
      * A processor that simply forwards all messages to all children.
      */
@@ -478,6 +533,14 @@ public void process(final String key, final String value) {
         }
     }
 
+    protected static class AddHeaderProcessor extends AbstractProcessor<String, String> {
+        @Override
+        public void process(final String key, final String value) {
+            context().headers().add(HEADER);
+            context().forward(key, value);
+        }
+    }
+
     /**
      * A processor that removes custom timestamp information from messages and forwards modified messages to each child.
      * A message contains custom timestamp information if the value is in ".*@[0-9]+" format.
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
index 8a2f1713fe9..e439372d130 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
@@ -27,6 +27,10 @@
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Sum;
@@ -85,14 +89,16 @@ public void testSpecificPartition() {
             new Metrics().sensor("skipped-records")
         );
 
-        collector.send("topic1", "999", "0", 0, null, stringSerializer, 
stringSerializer);
-        collector.send("topic1", "999", "0", 0, null, stringSerializer, 
stringSerializer);
-        collector.send("topic1", "999", "0", 0, null, stringSerializer, 
stringSerializer);
+        final Headers headers = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())});
 
-        collector.send("topic1", "999", "0", 1, null, stringSerializer, 
stringSerializer);
-        collector.send("topic1", "999", "0", 1, null, stringSerializer, 
stringSerializer);
+        collector.send("topic1", "999", "0", null, 0, null, stringSerializer, 
stringSerializer);
+        collector.send("topic1", "999", "0", null, 0, null, stringSerializer, 
stringSerializer);
+        collector.send("topic1", "999", "0", null, 0, null, stringSerializer, 
stringSerializer);
 
-        collector.send("topic1", "999", "0", 2, null, stringSerializer, 
stringSerializer);
+        collector.send("topic1", "999", "0", headers, 1, null, 
stringSerializer, stringSerializer);
+        collector.send("topic1", "999", "0", headers, 1, null, 
stringSerializer, stringSerializer);
+
+        collector.send("topic1", "999", "0", headers, 2, null, 
stringSerializer, stringSerializer);
 
         final Map<TopicPartition, Long> offsets = collector.offsets();
 
@@ -101,9 +107,9 @@ public void testSpecificPartition() {
         assertEquals((Long) 0L, offsets.get(new TopicPartition("topic1", 2)));
 
         // ignore StreamPartitioner
-        collector.send("topic1", "999", "0", 0, null, stringSerializer, 
stringSerializer);
-        collector.send("topic1", "999", "0", 1, null, stringSerializer, 
stringSerializer);
-        collector.send("topic1", "999", "0", 2, null, stringSerializer, 
stringSerializer);
+        collector.send("topic1", "999", "0", null, 0, null, stringSerializer, 
stringSerializer);
+        collector.send("topic1", "999", "0", null, 1, null, stringSerializer, 
stringSerializer);
+        collector.send("topic1", "999", "0", headers, 2, null, 
stringSerializer, stringSerializer);
 
         assertEquals((Long) 3L, offsets.get(new TopicPartition("topic1", 0)));
         assertEquals((Long) 2L, offsets.get(new TopicPartition("topic1", 1)));
@@ -121,17 +127,19 @@ public void testStreamPartitioner() {
             new Metrics().sensor("skipped-records")
         );
 
-        collector.send("topic1", "3", "0", null, stringSerializer, 
stringSerializer, streamPartitioner);
-        collector.send("topic1", "9", "0", null, stringSerializer, 
stringSerializer, streamPartitioner);
-        collector.send("topic1", "27", "0", null, stringSerializer, 
stringSerializer, streamPartitioner);
-        collector.send("topic1", "81", "0", null, stringSerializer, 
stringSerializer, streamPartitioner);
-        collector.send("topic1", "243", "0", null, stringSerializer, 
stringSerializer, streamPartitioner);
+        final Headers headers = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())});
+
+        collector.send("topic1", "3", "0", null, null, stringSerializer, 
stringSerializer, streamPartitioner);
+        collector.send("topic1", "9", "0", null, null, stringSerializer, 
stringSerializer, streamPartitioner);
+        collector.send("topic1", "27", "0", null, null, stringSerializer, 
stringSerializer, streamPartitioner);
+        collector.send("topic1", "81", "0", null, null, stringSerializer, 
stringSerializer, streamPartitioner);
+        collector.send("topic1", "243", "0", null, null, stringSerializer, 
stringSerializer, streamPartitioner);
 
-        collector.send("topic1", "28", "0", null, stringSerializer, 
stringSerializer, streamPartitioner);
-        collector.send("topic1", "82", "0", null, stringSerializer, 
stringSerializer, streamPartitioner);
-        collector.send("topic1", "244", "0", null, stringSerializer, 
stringSerializer, streamPartitioner);
+        collector.send("topic1", "28", "0", headers, null, stringSerializer, 
stringSerializer, streamPartitioner);
+        collector.send("topic1", "82", "0", headers, null, stringSerializer, 
stringSerializer, streamPartitioner);
+        collector.send("topic1", "244", "0", headers, null, stringSerializer, 
stringSerializer, streamPartitioner);
 
-        collector.send("topic1", "245", "0", null, stringSerializer, 
stringSerializer, streamPartitioner);
+        collector.send("topic1", "245", "0", null, null, stringSerializer, 
stringSerializer, streamPartitioner);
 
         final Map<TopicPartition, Long> offsets = collector.offsets();
 
@@ -155,7 +163,7 @@ public void 
shouldThrowStreamsExceptionOnAnyExceptionButProducerFencedException(
             new DefaultProductionExceptionHandler(),
             new Metrics().sensor("skipped-records"));
 
-        collector.send("topic1", "3", "0", null, stringSerializer, 
stringSerializer, streamPartitioner);
+        collector.send("topic1", "3", "0", null, null, stringSerializer, 
stringSerializer, streamPartitioner);
     }
 
     @SuppressWarnings("unchecked")
@@ -174,10 +182,10 @@ public void 
shouldThrowStreamsExceptionOnSubsequentCallIfASendFailsWithDefaultEx
             new DefaultProductionExceptionHandler(),
             new Metrics().sensor("skipped-records"));
 
-        collector.send("topic1", "3", "0", null, stringSerializer, 
stringSerializer, streamPartitioner);
+        collector.send("topic1", "3", "0", null, null, stringSerializer, 
stringSerializer, streamPartitioner);
 
         try {
-            collector.send("topic1", "3", "0", null, stringSerializer, 
stringSerializer, streamPartitioner);
+            collector.send("topic1", "3", "0", null, null, stringSerializer, 
stringSerializer, streamPartitioner);
             fail("Should have thrown StreamsException");
         } catch (final StreamsException expected) { /* ok */ }
     }
@@ -198,9 +206,9 @@ public void 
shouldNotThrowStreamsExceptionOnSubsequentCallIfASendFailsWithContin
             new AlwaysContinueProductionExceptionHandler(),
             new Metrics().sensor("skipped-records"));
 
-        collector.send("topic1", "3", "0", null, stringSerializer, 
stringSerializer, streamPartitioner);
+        collector.send("topic1", "3", "0", null, null, stringSerializer, 
stringSerializer, streamPartitioner);
 
-        collector.send("topic1", "3", "0", null, stringSerializer, 
stringSerializer, streamPartitioner);
+        collector.send("topic1", "3", "0", null, null, stringSerializer, 
stringSerializer, streamPartitioner);
     }
 
     @SuppressWarnings("unchecked")
@@ -223,7 +231,7 @@ public void 
shouldRecordSkippedMetricAndLogWarningIfSendFailsWithContinueExcepti
             logContext,
             new AlwaysContinueProductionExceptionHandler(),
             sensor);
-        collector.send("topic1", "3", "0", null, stringSerializer, 
stringSerializer, streamPartitioner);
+        collector.send("topic1", "3", "0", null, null, stringSerializer, 
stringSerializer, streamPartitioner);
         assertEquals(1.0, metrics.metrics().get(metricName).metricValue());
         assertTrue(logCaptureAppender.getMessages().contains("test Error 
sending records (key=[3] value=[0] timestamp=[null]) to topic=[topic1] and 
partition=[0]; The exception handler chose to CONTINUE processing in spite of 
this error."));
         LogCaptureAppender.unregister(logCaptureAppender);
@@ -245,7 +253,7 @@ public void 
shouldThrowStreamsExceptionOnFlushIfASendFailedWithDefaultExceptionH
             new DefaultProductionExceptionHandler(),
             new Metrics().sensor("skipped-records"));
 
-        collector.send("topic1", "3", "0", null, stringSerializer, 
stringSerializer, streamPartitioner);
+        collector.send("topic1", "3", "0", null, null, stringSerializer, 
stringSerializer, streamPartitioner);
 
         try {
             collector.flush();
@@ -269,7 +277,7 @@ public void 
shouldNotThrowStreamsExceptionOnFlushIfASendFailedWithContinueExcept
             new AlwaysContinueProductionExceptionHandler(),
             new Metrics().sensor("skipped-records"));
 
-        collector.send("topic1", "3", "0", null, stringSerializer, 
stringSerializer, streamPartitioner);
+        collector.send("topic1", "3", "0", null, null, stringSerializer, 
stringSerializer, streamPartitioner);
 
         collector.flush();
     }
@@ -290,7 +298,7 @@ public void 
shouldThrowStreamsExceptionOnCloseIfASendFailedWithDefaultExceptionH
             new DefaultProductionExceptionHandler(),
             new Metrics().sensor("skipped-records"));
 
-        collector.send("topic1", "3", "0", null, stringSerializer, 
stringSerializer, streamPartitioner);
+        collector.send("topic1", "3", "0", null, null, stringSerializer, 
stringSerializer, streamPartitioner);
 
         try {
             collector.close();
@@ -314,7 +322,7 @@ public void 
shouldNotThrowStreamsExceptionOnCloseIfASendFailedWithContinueExcept
             new AlwaysContinueProductionExceptionHandler(),
             new Metrics().sensor("skipped-records"));
 
-        collector.send("topic1", "3", "0", null, stringSerializer, 
stringSerializer, streamPartitioner);
+        collector.send("topic1", "3", "0", null, null, stringSerializer, 
stringSerializer, streamPartitioner);
 
         collector.close();
     }
@@ -334,7 +342,7 @@ public void 
shouldThrowIfTopicIsUnknownWithDefaultExceptionHandler() {
             logContext,
             new DefaultProductionExceptionHandler(),
             new Metrics().sensor("skipped-records"));
-        collector.send("topic1", "3", "0", null, stringSerializer, 
stringSerializer, streamPartitioner);
+        collector.send("topic1", "3", "0", null, null, stringSerializer, 
stringSerializer, streamPartitioner);
     }
 
     @SuppressWarnings("unchecked")
@@ -352,6 +360,6 @@ public void 
shouldThrowIfTopicIsUnknownWithContinueExceptionHandler() {
             logContext,
             new AlwaysContinueProductionExceptionHandler(),
             new Metrics().sensor("skipped-records"));
-        collector.send("topic1", "3", "0", null, stringSerializer, 
stringSerializer, streamPartitioner);
+        collector.send("topic1", "3", "0", null, null, stringSerializer, 
stringSerializer, streamPartitioner);
     }
 }
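
For orientation only (not part of the diff): after this change the RecordCollector#send(...) overloads accept the record Headers between the value and the partition or timestamp arguments, so the call sites above take the following shape; "collector", "headers", the serializers, and the partitioner are assumed to be set up as in the tests.

    // fixed-partition overload: (topic, key, value, headers, partition, timestamp, keySer, valueSer)
    collector.send("topic1", "key", "value", headers, 0, null, stringSerializer, stringSerializer);
    // partitioner overload: (topic, key, value, headers, timestamp, keySer, valueSer, partitioner)
    collector.send("topic1", "key", "value", headers, null, stringSerializer, stringSerializer, streamPartitioner);
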
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordContextStub.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordContextStub.java
index 0af5e17980c..7afd51eb06d 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordContextStub.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordContextStub.java
@@ -16,25 +16,37 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.common.header.Headers;
+
 public class RecordContextStub implements RecordContext {
 
     private final long offset;
     private long timestamp;
     private final int partition;
     private final String topic;
+    private final Headers headers;
 
     public RecordContextStub() {
-        this(-1, -1, -1, "");
+        this(-1, -1, -1, "", null);
     }
 
     public RecordContextStub(final long offset,
                              final long timestamp,
                              final int partition,
-                             final String topic) {
+                             final String topic,
+                             final Headers headers) {
         this.offset = offset;
         this.timestamp = timestamp;
         this.partition = partition;
         this.topic = topic;
+        this.headers = headers;
+    }
+
+    public RecordContextStub(final long offset,
+                             final long timestamp,
+                             final int partition,
+                             final String topic) {
+        this(offset, timestamp, partition, topic, null);
     }
 
     @Override
@@ -61,4 +73,9 @@ public String topic() {
     public int partition() {
         return partition;
     }
+
+    @Override
+    public Headers headers() {
+        return headers;
+    }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java
index de8e17bb954..36988c0175b 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java
@@ -17,7 +17,10 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.utils.LogContext;
@@ -29,16 +32,18 @@
 
 public class RecordDeserializerTest {
 
+    private final RecordHeaders headers = new RecordHeaders(new Header[] {new RecordHeader("key", "value".getBytes())});
     private final ConsumerRecord<byte[], byte[]> rawRecord = new 
ConsumerRecord<>("topic",
         1,
         1,
         10,
         TimestampType.LOG_APPEND_TIME,
-        5,
+        5L,
         3,
         5,
         new byte[0],
-        new byte[0]);
+        new byte[0],
+        headers);
 
 
     @SuppressWarnings("deprecation")
@@ -63,6 +68,7 @@ public void 
shouldReturnNewConsumerRecordWithDeserializedValueWhenNoExceptions()
         assertEquals("value", record.value());
         assertEquals(rawRecord.timestamp(), record.timestamp());
         assertEquals(TimestampType.CREATE_TIME, record.timestampType());
+        assertEquals(rawRecord.headers(), record.headers());
     }
 
     static class TheSourceNode extends SourceNode<Object, Object> {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index 4e80fa7d81d..4b9e6a16eaf 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -18,6 +18,7 @@
 
 import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.Deserializer;
@@ -198,6 +199,7 @@ private KeyValueStoreTestDriver(final StateSerdes<K, V> 
serdes) {
             public <K1, V1> void send(final String topic,
                                       final K1 key,
                                       final V1 value,
+                                      final Headers headers,
                                       final Integer partition,
                                       final Long timestamp,
                                       final Serializer<K1> keySerializer,
@@ -214,6 +216,7 @@ private KeyValueStoreTestDriver(final StateSerdes<K, V> 
serdes) {
             public <K1, V1> void send(final String topic,
                                       final K1 key,
                                       final V1 value,
+                                      final Headers headers,
                                       final Long timestamp,
                                       final Serializer<K1> keySerializer,
                                       final Serializer<V1> valueSerializer,
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
index 3e0241e3043..2f6aac79d0b 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
@@ -75,7 +75,8 @@ public void setUp() {
         cache = new ThreadCache(new LogContext("testCache "), 
maxCacheSizeBytes, new MockStreamsMetrics(new Metrics()));
         context = new InternalMockProcessorContext(null, null, null, 
(RecordCollector) null, cache);
         topic = "topic";
-        context.setRecordContext(new ProcessorRecordContext(10, 0, 0, topic));
+        context.setRecordContext(
+            new ProcessorRecordContext(10, 0, 0, topic, null));
         store.init(context, null);
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
index b77f4e979d5..baa9ee49b44 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
@@ -76,7 +76,7 @@ public void setUp() {
                                                  );
         cache = new ThreadCache(new LogContext("testCache "), 
MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
         context = new InternalMockProcessorContext(TestUtils.tempDirectory(), 
null, null, null, cache);
-        context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 
0, 0, "topic"));
+        context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 
0, 0, "topic", null));
         cachingStore.init(context, cachingStore);
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
index a87b2e41890..b8808caf475 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
@@ -81,7 +81,7 @@ public void setUp() {
         cache = new ThreadCache(new LogContext("testCache "), 
MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
         topic = "topic";
         context = new InternalMockProcessorContext(TestUtils.tempDirectory(), 
null, null, (RecordCollector) null, cache);
-        context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 
0, 0, topic));
+        context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 
0, 0, topic, null));
         cachingStore.init(context, cachingStore);
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
index 5bb0de7a0eb..7f5a08e598d 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
@@ -57,6 +58,7 @@ public void before() {
             public <K, V> void send(final String topic,
                                     K key,
                                     V value,
+                                    Headers headers,
                                     Integer partition,
                                     Long timestamp,
                                     Serializer<K> keySerializer,
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
index a658186931f..edcaa05e32b 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.Windowed;
@@ -49,6 +50,7 @@
         public <K, V> void send(final String topic,
                                 K key,
                                 V value,
+                                Headers headers,
                                 Integer partition,
                                 Long timestamp,
                                 Serializer<K> keySerializer,
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
index 956172ebd46..e56887e410a 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.Windowed;
@@ -47,6 +48,7 @@
         public <K, V> void send(final String topic,
                                 K key,
                                 V value,
+                                Headers headers,
                                 Integer partition,
                                 Long timestamp,
                                 Serializer<K> keySerializer,
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
index 9ae0feb1850..92653ce69b2 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
@@ -16,6 +16,10 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
@@ -42,6 +46,7 @@
 
 public class NamedCacheTest {
 
+    private final Headers headers = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())});
     private NamedCache cache;
     private StreamsMetricsImpl metrics;
     private final String taskIDString = "0.0";
@@ -64,7 +69,7 @@ public void 
shouldKeepTrackOfMostRecentlyAndLeastRecentlyUsed() throws IOExcepti
         for (int i = 0; i < toInsert.size(); i++) {
             byte[] key = toInsert.get(i).key.getBytes();
             byte[] value = toInsert.get(i).value.getBytes();
-            cache.put(Bytes.wrap(key), new LRUCacheEntry(value, true, 1, 1, 1, 
""));
+            cache.put(Bytes.wrap(key), new LRUCacheEntry(value, null, true, 1, 
1, 1, ""));
             LRUCacheEntry head = cache.first();
             LRUCacheEntry tail = cache.last();
             assertEquals(new String(head.value), toInsert.get(i).value);
@@ -170,9 +175,9 @@ public void shouldEvictEldestEntry() {
     @Test
     public void shouldFlushDirtEntriesOnEviction() {
         final List<ThreadCache.DirtyEntry> flushed = new ArrayList<>();
-        cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new byte[]{10}, 
true, 0, 0, 0, ""));
+        cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new byte[]{10}, 
headers, true, 0, 0, 0, ""));
         cache.put(Bytes.wrap(new byte[]{1}), new LRUCacheEntry(new 
byte[]{20}));
-        cache.put(Bytes.wrap(new byte[]{2}), new LRUCacheEntry(new byte[]{30}, 
true, 0, 0, 0, ""));
+        cache.put(Bytes.wrap(new byte[]{2}), new LRUCacheEntry(new byte[]{30}, 
headers, true, 0, 0, 0, ""));
 
         cache.setListener(new ThreadCache.DirtyEntryFlushListener() {
             @Override
@@ -185,6 +190,7 @@ public void apply(final List<ThreadCache.DirtyEntry> dirty) 
{
 
         assertEquals(2, flushed.size());
         assertEquals(Bytes.wrap(new byte[] {0}), flushed.get(0).key());
+        assertEquals(headers, flushed.get(0).recordContext().headers());
         assertArrayEquals(new byte[] {10}, flushed.get(0).newValue());
         assertEquals(Bytes.wrap(new byte[] {2}), flushed.get(1).key());
         assertArrayEquals(new byte[] {30}, flushed.get(1).newValue());
@@ -193,9 +199,9 @@ public void apply(final List<ThreadCache.DirtyEntry> dirty) 
{
 
     @Test
     public void shouldGetRangeIteratorOverKeys() {
-        cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new byte[]{10}, 
true, 0, 0, 0, ""));
+        cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new byte[]{10}, 
headers, true, 0, 0, 0, ""));
         cache.put(Bytes.wrap(new byte[]{1}), new LRUCacheEntry(new 
byte[]{20}));
-        cache.put(Bytes.wrap(new byte[]{2}), new LRUCacheEntry(new byte[]{30}, 
true, 0, 0, 0, ""));
+        cache.put(Bytes.wrap(new byte[]{2}), new LRUCacheEntry(new byte[]{30}, 
null, true, 0, 0, 0, ""));
 
         final Iterator<Bytes> iterator = cache.keyRange(Bytes.wrap(new 
byte[]{1}), Bytes.wrap(new byte[]{2}));
         assertEquals(Bytes.wrap(new byte[]{1}), iterator.next());
@@ -205,9 +211,9 @@ public void shouldGetRangeIteratorOverKeys() {
 
     @Test
     public void shouldGetIteratorOverAllKeys() {
-        cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new byte[]{10}, 
true, 0, 0, 0, ""));
+        cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new byte[]{10}, 
headers, true, 0, 0, 0, ""));
         cache.put(Bytes.wrap(new byte[]{1}), new LRUCacheEntry(new 
byte[]{20}));
-        cache.put(Bytes.wrap(new byte[]{2}), new LRUCacheEntry(new byte[]{30}, 
true, 0, 0, 0, ""));
+        cache.put(Bytes.wrap(new byte[]{2}), new LRUCacheEntry(new byte[]{30}, 
null, true, 0, 0, 0, ""));
 
         final Iterator<Bytes> iterator = cache.allKeys();
         assertEquals(Bytes.wrap(new byte[]{0}), iterator.next());
@@ -223,8 +229,8 @@ public void 
shouldNotThrowNullPointerWhenCacheIsEmptyAndEvictionCalled() {
 
     @Test(expected = IllegalStateException.class)
     public void 
shouldThrowIllegalStateExceptionWhenTryingToOverwriteDirtyEntryWithCleanEntry() 
{
-        cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new byte[]{10}, 
true, 0, 0, 0, ""));
-        cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new byte[]{10}, 
false, 0, 0, 0, ""));
+        cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new byte[]{10}, 
headers, true, 0, 0, 0, ""));
+        cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new byte[]{10}, 
null, false, 0, 0, 0, ""));
     }
 
     @Test
@@ -235,8 +241,8 @@ public void apply(final List<ThreadCache.DirtyEntry> dirty) 
{
                 // no-op
             }
         });
-        cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(null, true, 0, 
0, 0, ""));
-        cache.put(Bytes.wrap(new byte[]{1}), new LRUCacheEntry(new byte[]{20}, 
true, 0, 0, 0, ""));
+        cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(null, headers, 
true, 0, 0, 0, ""));
+        cache.put(Bytes.wrap(new byte[]{1}), new LRUCacheEntry(new byte[]{20}, 
null, true, 0, 0, 0, ""));
         cache.flush();
         assertEquals(1, cache.size());
         assertNotNull(cache.get(Bytes.wrap(new byte[]{1})));
@@ -244,7 +250,7 @@ public void apply(final List<ThreadCache.DirtyEntry> dirty) 
{
 
     @Test
     public void shouldBeReentrantAndNotBreakLRU() {
-        final LRUCacheEntry dirty = new LRUCacheEntry(new byte[]{3}, true, 0, 
0, 0, "");
+        final LRUCacheEntry dirty = new LRUCacheEntry(new byte[]{3}, null, 
true, 0, 0, 0, "");
         final LRUCacheEntry clean = new LRUCacheEntry(new byte[]{3});
         cache.put(Bytes.wrap(new byte[]{0}), dirty);
         cache.put(Bytes.wrap(new byte[]{1}), clean);
@@ -290,7 +296,7 @@ public void apply(final List<ThreadCache.DirtyEntry> dirty) 
{
 
     @Test
     public void 
shouldNotThrowIllegalArgumentAfterEvictingDirtyRecordAndThenPuttingNewRecordWithSameKey()
 {
-        final LRUCacheEntry dirty = new LRUCacheEntry(new byte[]{3}, true, 0, 
0, 0, "");
+        final LRUCacheEntry dirty = new LRUCacheEntry(new byte[]{3}, null, 
true, 0, 0, 0, "");
         final LRUCacheEntry clean = new LRUCacheEntry(new byte[]{3});
         final Bytes key = Bytes.wrap(new byte[] {3});
         cache.setListener(new ThreadCache.DirtyEntryFlushListener() {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index 92edbd8d31c..be4ede8ae91 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -18,6 +18,7 @@
 
 import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
@@ -90,6 +91,7 @@
         public <K1, V1> void send(final String topic,
                                   final K1 key,
                                   final V1 value,
+                                  final Headers headers,
                                   final Integer partition,
                                   final Long timestamp,
                                   final Serializer<K1> keySerializer,
@@ -160,7 +162,7 @@ public void shouldOnlyIterateOpenSegments() {
     }
 
     private ProcessorRecordContext createRecordContext(final long time) {
-        return new ProcessorRecordContext(time, 0, 0, "topic");
+        return new ProcessorRecordContext(time, 0, 0, "topic", null);
     }
 
     @Test
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
index 6bacd910cce..5afe14f8a0a 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
@@ -17,6 +17,8 @@
 package org.apache.kafka.streams.state.internals;
 
 
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeader;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.LogContext;
@@ -38,6 +40,7 @@
     private final String topic = "topic";
 
     private final Map<Integer, String> logged = new HashMap<>();
+    private final Map<Integer, Headers> loggedHeaders = new HashMap<>();
 
     private final InternalMockProcessorContext context = new 
InternalMockProcessorContext(StateSerdes.withBuiltinTypes(topic, Integer.class, 
String.class),
         new RecordCollectorImpl(null, "StoreChangeLoggerTest", new 
LogContext("StoreChangeLoggerTest "), new DefaultProductionExceptionHandler(), 
new Metrics().sensor("skipped-records")) {
@@ -45,17 +48,20 @@
             public <K1, V1> void send(final String topic,
                                       final K1 key,
                                       final V1 value,
+                                      final Headers headers,
                                       final Integer partition,
                                       final Long timestamp,
                                       final Serializer<K1> keySerializer,
                                       final Serializer<V1> valueSerializer) {
                 logged.put((Integer) key, (String) value);
+                loggedHeaders.put((Integer) key, headers);
             }
 
             @Override
             public <K1, V1> void send(final String topic,
                                       final K1 key,
                                       final V1 value,
+                                      final Headers headers,
                                       final Long timestamp,
                                       final Serializer<K1> keySerializer,
                                       final Serializer<V1> valueSerializer,
@@ -80,6 +86,13 @@ public void testAddRemove() {
 
         changeLogger.logChange(0, null);
         assertNull(logged.get(0));
+    }
+
+    @Test
+    public void shouldNotSendRecordHeadersToChangelogTopic() {
+        context.headers().add(new RecordHeader("key", "value".getBytes()));
+        changeLogger.logChange(0, "zero");
 
+        assertNull(loggedHeaders.get(0));
     }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
index 164e71ed664..d100ae5372b 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
@@ -59,7 +59,7 @@ public void basicPutGet() throws IOException {
         for (KeyValue<String, String> kvToInsert : toInsert) {
             Bytes key = Bytes.wrap(kvToInsert.key.getBytes());
             byte[] value = kvToInsert.value.getBytes();
-            cache.put(namespace, key, new LRUCacheEntry(value, true, 1L, 1L, 
1, ""));
+            cache.put(namespace, key, new LRUCacheEntry(value, null, true, 1L, 
1L, 1, ""));
         }
 
         for (KeyValue<String, String> kvToInsert : toInsert) {
@@ -89,7 +89,7 @@ private void checkOverheads(double entryFactor, double 
systemFactor, long desire
             String keyStr = "K" + i;
             Bytes key = Bytes.wrap(keyStr.getBytes());
             byte[] value = new byte[valueSizeBytes];
-            cache.put(namespace, key, new LRUCacheEntry(value, true, 1L, 1L, 
1, ""));
+            cache.put(namespace, key, new LRUCacheEntry(value, null, true, 1L, 
1L, 1, ""));
         }
 
 
@@ -171,7 +171,7 @@ public void apply(final List<ThreadCache.DirtyEntry> dirty) 
{
         for (KeyValue<String, String> kvToInsert : toInsert) {
             final Bytes key = Bytes.wrap(kvToInsert.key.getBytes());
             final byte[] value = kvToInsert.value.getBytes();
-            cache.put(namespace, key, new LRUCacheEntry(value, true, 1, 1, 1, 
""));
+            cache.put(namespace, key, new LRUCacheEntry(value, null, true, 1, 
1, 1, ""));
         }
 
         for (int i = 0; i < expected.size(); i++) {
@@ -520,7 +520,7 @@ public void shouldCalculateSizeInBytes() {
     }
 
     private LRUCacheEntry dirtyEntry(final byte[] key) {
-        return new LRUCacheEntry(key, true, -1, -1, -1, "");
+        return new LRUCacheEntry(key, null, true, -1, -1, -1, "");
     }
 
     private LRUCacheEntry cleanEntry(final byte[] key) {
diff --git 
a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java 
b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
index eb72e13d80b..e5571eb43c3 100644
--- 
a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
+++ 
b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.test;
 
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.KeyValue;
@@ -242,7 +244,7 @@ public void commit() { }
     // and also not throwing exceptions if record context is not available.
     public void setTime(final long timestamp) {
         if (recordContext != null) {
-            recordContext = new ProcessorRecordContext(timestamp, 
recordContext.offset(), recordContext.partition(), recordContext.topic());
+            recordContext = new ProcessorRecordContext(timestamp, 
recordContext.offset(), recordContext.partition(), recordContext.topic(), 
recordContext.headers());
         }
         this.timestamp = timestamp;
     }
@@ -279,6 +281,14 @@ public long offset() {
         return recordContext.offset();
     }
 
+    @Override
+    public Headers headers() {
+        if (recordContext == null) {
+            return new RecordHeaders();
+        }
+        return recordContext.headers();
+    }
+
     Map<String, StateStore> allStateStores() {
         return Collections.unmodifiableMap(storeMap);
     }
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java 
b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index 2c3461a8ad4..698cdc7ff85 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.test;
 
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
@@ -90,7 +91,7 @@ public void setUp(final StreamsBuilder builder,
 
         final ThreadCache cache = new ThreadCache(logContext, cacheSize, new 
MockStreamsMetrics(new Metrics()));
         context = new InternalMockProcessorContext(stateDir, keySerde, 
valSerde, new MockRecordCollector(), cache);
-        context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "topic"));
+        context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "topic", 
null));
 
         // init global topology first as it will add stores to the
         // store map that are required for joins etc.
@@ -229,7 +230,7 @@ private void closeState() {
     }
 
     private ProcessorRecordContext createRecordContext(final String topicName, 
final long timestamp) {
-        return new ProcessorRecordContext(timestamp, -1, -1, topicName);
+        return new ProcessorRecordContext(timestamp, -1, -1, topicName, null);
     }
 
     private class MockRecordCollector extends RecordCollectorImpl {
@@ -241,6 +242,7 @@ private ProcessorRecordContext createRecordContext(final 
String topicName, final
         public <K, V> void send(final String topic,
                                 final K key,
                                 final V value,
+                                final Headers headers,
                                 final Long timestamp,
                                 final Serializer<K> keySerializer,
                                 final Serializer<V> valueSerializer,
@@ -255,6 +257,7 @@ private ProcessorRecordContext createRecordContext(final 
String topicName, final
         public <K, V> void send(final String topic,
                                 final K key,
                                 final V value,
+                                final Headers headers,
                                 final Integer partition,
                                 final Long timestamp,
                                 final Serializer<K> keySerializer,
diff --git 
a/streams/src/test/java/org/apache/kafka/test/NoOpRecordCollector.java 
b/streams/src/test/java/org/apache/kafka/test/NoOpRecordCollector.java
index 66271a0115e..893d3566c6a 100644
--- a/streams/src/test/java/org/apache/kafka/test/NoOpRecordCollector.java
+++ b/streams/src/test/java/org/apache/kafka/test/NoOpRecordCollector.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.test;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
@@ -30,6 +31,7 @@
     public <K, V> void send(final String topic,
                             final K key,
                             final V value,
+                            final Headers headers,
                             final Integer partition,
                             final Long timestamp,
                             final Serializer<K> keySerializer,
@@ -39,6 +41,7 @@
     public <K, V> void send(final String topic,
                             final K key,
                             final V value,
+                            final Headers headers,
                             final Long timestamp,
                             final Serializer<K> keySerializer,
                             final Serializer<V> valueSerializer,
diff --git 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index c237ca77e1d..e46ec6a35d0 100644
--- 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -227,7 +227,7 @@ public TopologyTestDriver(final Topology topology,
      * @param config the configuration for the topology
      */
     TopologyTestDriver(final InternalTopologyBuilder builder,
-                              final Properties config) {
+                       final Properties config) {
         this(builder, config,  System.currentTimeMillis());
 
     }
@@ -382,14 +382,16 @@ public void pipeInput(final ConsumerRecord<byte[], 
byte[]> consumerRecord) {
                 offset,
                 consumerRecord.timestamp(),
                 consumerRecord.timestampType(),
-                ConsumerRecord.NULL_CHECKSUM,
+                (long) ConsumerRecord.NULL_CHECKSUM,
                 consumerRecord.serializedKeySize(),
                 consumerRecord.serializedValueSize(),
                 consumerRecord.key(),
-                consumerRecord.value())));
+                consumerRecord.value(),
+                consumerRecord.headers())));
 
             // Process the record ...
-            ((InternalProcessorContext) task.context()).setRecordContext(new 
ProcessorRecordContext(consumerRecord.timestamp(), offset, 
topicPartition.partition(), topicName));
+            ((InternalProcessorContext) task.context()).setRecordContext(
+                    new ProcessorRecordContext(consumerRecord.timestamp(), 
offset, topicPartition.partition(), topicName, consumerRecord.headers()));
             task.process();
             task.maybePunctuateStreamTime();
             task.commit();
@@ -407,11 +409,12 @@ public void pipeInput(final ConsumerRecord<byte[], 
byte[]> consumerRecord) {
                 offset,
                 consumerRecord.timestamp(),
                 consumerRecord.timestampType(),
-                ConsumerRecord.NULL_CHECKSUM,
+                (long) ConsumerRecord.NULL_CHECKSUM,
                 consumerRecord.serializedKeySize(),
                 consumerRecord.serializedValueSize(),
                 consumerRecord.key(),
-                consumerRecord.value()));
+                consumerRecord.value(),
+                consumerRecord.headers()));
             globalStateTask.flushState();
         }
     }
@@ -467,7 +470,8 @@ private void captureOutputRecords() {
                     serializedKey == null ? 0 : serializedKey.length,
                     serializedValue == null ? 0 : serializedValue.length,
                     serializedKey,
-                    serializedValue));
+                    serializedValue,
+                    record.headers()));
             }
         }
     }
@@ -536,7 +540,7 @@ public void advanceWallClockTime(final long advanceMs) {
         }
         final K key = keyDeserializer.deserialize(record.topic(), 
record.key());
         final V value = valueDeserializer.deserialize(record.topic(), 
record.value());
-        return new ProducerRecord<>(record.topic(), record.partition(), 
record.timestamp(), key, value);
+        return new ProducerRecord<>(record.topic(), record.partition(), 
record.timestamp(), key, value, record.headers());
     }
 
     /**
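
As a usage sketch (not part of the patch), the test-utils changes above allow a test to pipe a record with headers through a topology and inspect the headers of the output record. Everything below is illustrative; the topic names, application id, and the trivial pass-through topology are assumptions made for the example.

    import java.util.Properties;

    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.header.internals.RecordHeader;
    import org.apache.kafka.common.header.internals.RecordHeaders;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.TopologyTestDriver;
    import org.apache.kafka.streams.test.ConsumerRecordFactory;

    public class HeaderPassThroughExample {
        public static void main(final String[] args) {
            // a trivial pass-through topology: input-topic -> output-topic
            final StreamsBuilder builder = new StreamsBuilder();
            builder.stream("input-topic").to("output-topic");

            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "header-pass-through-example");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

            final ConsumerRecordFactory<String, String> factory = new ConsumerRecordFactory<>(
                "input-topic", Serdes.String().serializer(), Serdes.String().serializer());
            final RecordHeaders headers =
                new RecordHeaders(new RecordHeader[]{new RecordHeader("trace-id", "42".getBytes())});

            final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props);
            try {
                driver.pipeInput(factory.create("input-topic", "k", "v", headers, 0L));
                final ProducerRecord<String, String> output = driver.readOutput(
                    "output-topic", Serdes.String().deserializer(), Serdes.String().deserializer());
                // with this patch the input headers are carried through to the output record
                System.out.println("output headers: " + output.headers());
            } finally {
                driver.close();
            }
        }
    }
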
diff --git 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
index 3e29cdeabd1..b14a7915dc9 100644
--- 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
+++ 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.processor;
 
 import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.KeyValue;
@@ -61,6 +62,7 @@
     private String topic;
     private Integer partition;
     private Long offset;
+    private Headers headers;
     private Long timestamp;
 
     // mocks ================================================
@@ -250,10 +252,11 @@ public StreamsMetrics metrics() {
      * @param offset    A record offset
      * @param timestamp A record timestamp
      */
-    public void setRecordMetadata(final String topic, final int partition, 
final long offset, final long timestamp) {
+    public void setRecordMetadata(final String topic, final int partition, 
final long offset, final Headers headers, final long timestamp) {
         this.topic = topic;
         this.partition = partition;
         this.offset = offset;
+        this.headers = headers;
         this.timestamp = timestamp;
     }
 
@@ -289,6 +292,9 @@ public void setOffset(final long offset) {
         this.offset = offset;
     }
 
+    public void setHeaders(final Headers headers) {
+        this.headers = headers;
+    }
 
     /**
      * The context exposes this metadata for use in the processor. Normally, 
they are set by the Kafka Streams framework,
@@ -324,6 +330,11 @@ public long offset() {
         return offset;
     }
 
+    @Override
+    public Headers headers() {
+        return headers;
+    }
+
     @Override
     public long timestamp() {
         if (timestamp == null) {
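
A minimal sketch (not part of the patch) of how a unit test could exercise a processor's header handling against the extended MockProcessorContext; the anonymous processor and the "processed-by" header key are hypothetical.

    import org.apache.kafka.common.header.internals.RecordHeader;
    import org.apache.kafka.common.header.internals.RecordHeaders;
    import org.apache.kafka.streams.processor.AbstractProcessor;
    import org.apache.kafka.streams.processor.MockProcessorContext;

    public class HeaderProcessorUnitTestExample {
        public static void main(final String[] args) {
            // hypothetical processor that stamps every record with a header
            final AbstractProcessor<String, String> processor = new AbstractProcessor<String, String>() {
                @Override
                public void process(final String key, final String value) {
                    context().headers().add(new RecordHeader("processed-by", "example".getBytes()));
                    context().forward(key, value);
                }
            };

            final MockProcessorContext context = new MockProcessorContext();
            processor.init(context);
            // the extended overload supplies the record headers to the mock context
            context.setRecordMetadata("topic", 0, 0L, new RecordHeaders(), 0L);
            processor.process("key", "value");
            // the header added by the processor is visible through headers()
            System.out.println("headers after process(): " + context.headers());
        }
    }
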
diff --git 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/test/ConsumerRecordFactory.java
 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/test/ConsumerRecordFactory.java
index b0ccd61500e..507249d0d2e 100644
--- 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/test/ConsumerRecordFactory.java
+++ 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/test/ConsumerRecordFactory.java
@@ -18,6 +18,8 @@
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.KeyValue;
@@ -153,20 +155,23 @@ public void advanceTimeMs(final long advanceMs) {
     }
 
     /**
-     * Create a {@link ConsumerRecord} with the given topic name, key, value, 
and timestamp.
+     * Create a {@link ConsumerRecord} with the given topic name, key, value, 
headers, and timestamp.
      * Does not auto advance internally tracked time.
      *
      * @param topicName the topic name
      * @param key the record key
      * @param value the record value
+     * @param headers the record headers
      * @param timestampMs the record timestamp
      * @return the generated {@link ConsumerRecord}
      */
     public ConsumerRecord<byte[], byte[]> create(final String topicName,
                                                  final K key,
                                                  final V value,
+                                                 final Headers headers,
                                                  final long timestampMs) {
         Objects.requireNonNull(topicName, "topicName cannot be null.");
+        Objects.requireNonNull(headers, "headers cannot be null.");
         final byte[] serializedKey = keySerializer.serialize(topicName, key);
         final byte[] serializedValue = valueSerializer.serialize(topicName, 
value);
         return new ConsumerRecord<>(
@@ -175,11 +180,29 @@ public void advanceTimeMs(final long advanceMs) {
             -1L,
             timestampMs,
             TimestampType.CREATE_TIME,
-            ConsumerRecord.NULL_CHECKSUM,
+            (long) ConsumerRecord.NULL_CHECKSUM,
             serializedKey == null ? 0 : serializedKey.length,
             serializedValue == null ? 0 : serializedValue.length,
             serializedKey,
-            serializedValue);
+            serializedValue,
+            headers);
+    }
+
+    /**
+     * Create a {@link ConsumerRecord} with the given topic name, key, value, and timestamp.
+     * Does not auto advance internally tracked time.
+     *
+     * @param topicName the topic name
+     * @param key the record key
+     * @param value the record value
+     * @param timestampMs the record timestamp
+     * @return the generated {@link ConsumerRecord}
+     */
+    public ConsumerRecord<byte[], byte[]> create(final String topicName,
+                                                 final K key,
+                                                 final V value,
+                                                 final long timestampMs) {
+        return create(topicName, key, value, new RecordHeaders(), timestampMs);
     }
 
     /**
@@ -194,16 +217,33 @@ public void advanceTimeMs(final long advanceMs) {
     public ConsumerRecord<byte[], byte[]> create(final K key,
                                                  final V value,
                                                  final long timestampMs) {
+        return create(key, value, new RecordHeaders(), timestampMs);
+    }
+
+    /**
+     * Create a {@link ConsumerRecord} with default topic name and given key, 
value, headers, and timestamp.
+     * Does not auto advance internally tracked time.
+     *
+     * @param key the record key
+     * @param value the record value
+     * @param headers the record headers
+     * @param timestampMs the record timestamp
+     * @return the generated {@link ConsumerRecord}
+     */
+    public ConsumerRecord<byte[], byte[]> create(final K key,
+                                                 final V value,
+                                                 final Headers headers,
+                                                 final long timestampMs) {
         if (topicName == null) {
             throw new IllegalStateException("ConsumerRecordFactory was created 
without defaultTopicName. " +
                 "Use #create(String topicName, K key, V value, long 
timestampMs) instead.");
         }
-        return create(topicName, key, value, timestampMs);
+        return create(topicName, key, value, headers, timestampMs);
     }
 
     /**
      * Create a {@link ConsumerRecord} with the given topic name, key, and 
value.
-     * The timestamp will be generated from the constructor provided and time 
will auto advance.
+     * The timestamp will be generated based on the constructor provided start 
time and time will auto advance.
      *
      * @param topicName the topic name
      * @param key the record key
@@ -215,12 +255,31 @@ public void advanceTimeMs(final long advanceMs) {
                                                  final V value) {
         final long timestamp = timeMs;
         timeMs += advanceMs;
-        return create(topicName, key, value, timestamp);
+        return create(topicName, key, value, new RecordHeaders(), timestamp);
+    }
+
+    /**
+     * Create a {@link ConsumerRecord} with the given topic name, key, value, 
and headers.
+     * The timestamp will be generated based on the constructor provided start 
time and time will auto advance.
+     *
+     * @param topicName the topic name
+     * @param key the record key
+     * @param value the record value
+     * @param headers the record headers
+     * @return the generated {@link ConsumerRecord}
+     */
+    public ConsumerRecord<byte[], byte[]> create(final String topicName,
+                                                 final K key,
+                                                 final V value,
+                                                 final Headers headers) {
+        final long timestamp = timeMs;
+        timeMs += advanceMs;
+        return create(topicName, key, value, headers, timestamp);
     }
 
     /**
      * Create a {@link ConsumerRecord} with default topic name and given key 
and value.
-     * The timestamp will be generated from the constructor provided and time 
will auto advance.
+     * The timestamp will be generated based on the constructor provided start 
time and time will auto advance.
      *
      * @param key the record key
      * @param value the record value
@@ -228,46 +287,110 @@ public void advanceTimeMs(final long advanceMs) {
      */
     public ConsumerRecord<byte[], byte[]> create(final K key,
                                                  final V value) {
+        return create(key, value, new RecordHeaders());
+    }
+
+    /**
+     * Create a {@link ConsumerRecord} with default topic name and given key, 
value, and headers.
+     * The timestamp will be generated based on the constructor provided start 
time and time will auto advance.
+     *
+     * @param key the record key
+     * @param value the record value
+     * @param headers the record headers
+     * @return the generated {@link ConsumerRecord}
+     */
+    public ConsumerRecord<byte[], byte[]> create(final K key,
+                                                 final V value,
+                                                 final Headers headers) {
         if (topicName == null) {
             throw new IllegalStateException("ConsumerRecordFactory was created 
without defaultTopicName. " +
                 "Use #create(String topicName, K key, V value) instead.");
         }
-        return create(topicName, key, value);
+        return create(topicName, key, value, headers);
     }
 
     /**
      * Create a {@link ConsumerRecord} with {@code null}-key and the given 
topic name, value, and timestamp.
+     * Does not auto advance internally tracked time.
+     *
+     * @param topicName the topic name
+     * @param value the record value
+     * @param timestampMs the record timestamp
+     * @return the generated {@link ConsumerRecord}
+     */
+    public ConsumerRecord<byte[], byte[]> create(final String topicName,
+                                                 final V value,
+                                                 final long timestampMs) {
+        return create(topicName, null, value, new RecordHeaders(), 
timestampMs);
+    }
+
+    /**
+     * Create a {@link ConsumerRecord} with {@code null}-key and the given 
topic name, value, headers, and timestamp.
+     * Does not auto advance internally tracked time.
      *
      * @param topicName the topic name
      * @param value the record value
+     * @param headers the record headers
      * @param timestampMs the record timestamp
      * @return the generated {@link ConsumerRecord}
      */
     public ConsumerRecord<byte[], byte[]> create(final String topicName,
                                                  final V value,
+                                                 final Headers headers,
                                                  final long timestampMs) {
-        return create(topicName, null, value, timestampMs);
+        return create(topicName, null, value, headers, timestampMs);
     }
 
     /**
      * Create a {@link ConsumerRecord} with default topic name and {@code 
null}-key as well as given value and timestamp.
+     * Does not auto advance internally tracked time.
+     *
+     * @param value the record value
+     * @param timestampMs the record timestamp
+     * @return the generated {@link ConsumerRecord}
+     */
+    public ConsumerRecord<byte[], byte[]> create(final V value,
+                                                 final long timestampMs) {
+        return create(value, new RecordHeaders(), timestampMs);
+    }
+
+    /**
+     * Create a {@link ConsumerRecord} with default topic name and {@code 
null}-key as well as given value, headers, and timestamp.
+     * Does not auto advance internally tracked time.
      *
      * @param value the record value
+     * @param headers the record headers
      * @param timestampMs the record timestamp
      * @return the generated {@link ConsumerRecord}
      */
     public ConsumerRecord<byte[], byte[]> create(final V value,
+                                                 final Headers headers,
                                                  final long timestampMs) {
         if (topicName == null) {
             throw new IllegalStateException("ConsumerRecordFactory was created 
without defaultTopicName. " +
                 "Use #create(String topicName, V value, long timestampMs) 
instead.");
         }
-        return create(topicName, value, timestampMs);
+        return create(topicName, value, headers, timestampMs);
+    }
+
+    /**
+     * Create a {@link ConsumerRecord} with {@code null}-key and the given 
topic name, value, and headers.
+     * The timestamp will be generated based on the constructor provided start 
time and time will auto advance.
+     *
+     * @param topicName the topic name
+     * @param value the record value
+     * @param headers the record headers
+     * @return the generated {@link ConsumerRecord}
+     */
+    public ConsumerRecord<byte[], byte[]> create(final String topicName,
+                                                 final V value,
+                                                 final Headers headers) {
+        return create(topicName, null, value, headers);
     }
 
     /**
      * Create a {@link ConsumerRecord} with {@code null}-key and the given 
topic name and value.
-     * The timestamp will be generated from the constructor provided and time 
will auto advance.
+     * The timestamp will be generated based on the constructor provided start 
time and time will auto advance.
      *
      * @param topicName the topic name
      * @param value the record value
@@ -275,27 +398,40 @@ public void advanceTimeMs(final long advanceMs) {
      */
     public ConsumerRecord<byte[], byte[]> create(final String topicName,
                                                  final V value) {
-        return create(topicName, null, value);
+        return create(topicName, null, value, new RecordHeaders());
     }
 
     /**
      * Create a {@link ConsumerRecord} with default topic name and {@code 
null}-key as well as given value.
-     * The timestamp will be generated from the constructor provided and time 
will auto advance.
+     * The timestamp will be generated based on the constructor provided start 
time and time will auto advance.
      *
      * @param value the record value
      * @return the generated {@link ConsumerRecord}
      */
     public ConsumerRecord<byte[], byte[]> create(final V value) {
+        return create(value, new RecordHeaders());
+    }
+
+    /**
+     * Create a {@link ConsumerRecord} with default topic name and {@code 
null}-key as well as given value and headers.
+     * The timestamp will be generated based on the constructor provided start 
time and time will auto advance.
+     *
+     * @param value the record value
+     * @param headers the record headers
+     * @return the generated {@link ConsumerRecord}
+     */
+    public ConsumerRecord<byte[], byte[]> create(final V value,
+                                                 final Headers headers) {
         if (topicName == null) {
             throw new IllegalStateException("ConsumerRecordFactory was created 
without defaultTopicName. " +
                 "Use #create(String topicName, V value, long timestampMs) 
instead.");
         }
-        return create(topicName, value);
+        return create(topicName, value, headers);
     }
 
     /**
      * Creates {@link ConsumerRecord consumer records} with the given topic 
name, keys, and values.
-     * The timestamp will be generated from the constructor provided and time 
will auto advance.
+     * The timestamp will be generated based on the constructor provided start 
time and time will auto advance.
      *
      * @param topicName the topic name
      * @param keyValues the record keys and values
@@ -314,7 +450,7 @@ public void advanceTimeMs(final long advanceMs) {
 
     /**
      * Creates {@link ConsumerRecord consumer records} with default topic name 
as well as given keys and values.
-     * The timestamp will be generated from the constructor provided and time 
will auto advance.
+     * The timestamp will be generated based on the constructor provided start 
time and time will auto advance.
      *
      * @param keyValues the record keys and values
      * @return the generated {@link ConsumerRecord consumer records}
@@ -350,7 +486,7 @@ public void advanceTimeMs(final long advanceMs) {
 
         long timestamp = startTimestamp;
         for (final KeyValue<K, V> keyValue : keyValues) {
-            records.add(create(topicName, keyValue.key, keyValue.value, 
timestamp));
+            records.add(create(topicName, keyValue.key, keyValue.value, new 
RecordHeaders(), timestamp));
             timestamp += advanceMs;
         }
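
   For context, a minimal usage sketch of the headers-aware create(...) overloads added 
   above; the topic name, String serializers, and the header key/value are illustrative 
   assumptions rather than anything defined in the PR:

       import org.apache.kafka.clients.consumer.ConsumerRecord;
       import org.apache.kafka.common.header.internals.RecordHeader;
       import org.apache.kafka.common.header.internals.RecordHeaders;
       import org.apache.kafka.common.serialization.StringSerializer;
       import org.apache.kafka.streams.test.ConsumerRecordFactory;

       // factory with a default topic name ("input-topic" is an assumed, illustrative name)
       final ConsumerRecordFactory<String, String> factory =
           new ConsumerRecordFactory<>("input-topic", new StringSerializer(), new StringSerializer());

       // headers are attached to the generated record; the timestamp is taken from the
       // factory's start time and auto-advances on each call
       final RecordHeaders headers = new RecordHeaders();
       headers.add(new RecordHeader("source", "test".getBytes()));
       final ConsumerRecord<byte[], byte[]> record = factory.create("some-key", "some-value", headers);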
 
diff --git 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/test/OutputVerifier.java
 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/test/OutputVerifier.java
index 09ed29439ba..aedb910e28c 100644
--- 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/test/OutputVerifier.java
+++ 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/test/OutputVerifier.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.test;
 
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.streams.TopologyTestDriver;
 
 import java.util.Objects;
@@ -238,4 +239,202 @@
         compareKeyValueTimestamp(record, expectedRecord.key(), 
expectedRecord.value(), expectedRecord.timestamp());
     }
 
+    /**
+     * Compares a {@link ProducerRecord} with the provided value and headers 
and throws an {@link AssertionError} if
+     * the {@code ProducerRecord}'s value or headers is not equal to the 
expected value or headers.
+     *
+     * @param record an output {@code ProducerRecord} for verification
+     * @param expectedValue the expected value of the {@code ProducerRecord}
+     * @param expectedHeaders the expected headers of the {@code 
ProducerRecord}
+     * @param <K> the key type
+     * @param <V> the value type
+     * @throws AssertionError if {@code ProducerRecord}'s value or headers is 
not equal to {@code expectedValue} or {@code expectedHeaders}
+     */
+    public static <K, V> void compareValueHeaders(final ProducerRecord<K, V> 
record,
+                                                  final V expectedValue,
+                                                  final Headers 
expectedHeaders) throws AssertionError {
+        Objects.requireNonNull(record);
+
+        final V recordValue = record.value();
+        final Headers recordHeaders = record.headers();
+        final AssertionError error = new AssertionError("Expected value=" + 
expectedValue + " with headers=" + expectedHeaders +
+                " but was value=" + recordValue + " with headers=" + 
recordHeaders);
+
+        if (recordValue != null) {
+            if (!recordValue.equals(expectedValue)) {
+                throw error;
+            }
+        } else if (expectedValue != null) {
+            throw error;
+        }
+
+        if (recordHeaders != null) {
+            if (!recordHeaders.equals(expectedHeaders)) {
+                throw error;
+            }
+        } else if (expectedHeaders != null) {
+            throw error;
+        }
+    }
+
+    /**
+     * Compares the values and headers of two {@link ProducerRecord}s and 
throws an {@link AssertionError} if the
+     * values or headers are not equal to each other.
+     *
+     * @param record an output {@code ProducerRecord} for verification
+     * @param expectedRecord a {@code ProducerRecord} for verification
+     * @param <K> the key type
+     * @param <V> the value type
+     * @throws AssertionError if {@code ProducerRecord}'s value or headers is 
not equal to {@code expectedRecord}'s value or headers
+     */
+    public static <K, V> void compareValueHeaders(final ProducerRecord<K, V> 
record,
+                                                  final ProducerRecord<K, V> 
expectedRecord) throws AssertionError {
+        Objects.requireNonNull(expectedRecord);
+        compareValueHeaders(record, expectedRecord.value(), 
expectedRecord.headers());
+    }
+
+    /**
+     * Compares a {@link ProducerRecord} with the provided key, value, and 
headers and throws an
+     * {@link AssertionError} if the {@code ProducerRecord}'s key, value, or 
headers is not equal to the expected key,
+     * value, or headers.
+     *
+     * @param record an output {@code ProducerRecord} for verification
+     * @param expectedKey the expected key of the {@code ProducerRecord}
+     * @param expectedValue the expected value of the {@code ProducerRecord}
+     * @param expectedHeaders the expected headers of the {@code 
ProducerRecord}
+     * @param <K> the key type
+     * @param <V> the value type
+     * @throws AssertionError if {@code ProducerRecord}'s key, value, or 
headers is not equal to {@code expectedKey},
+     *                        {@code expectedValue}, or {@code expectedHeaders}
+     */
+    public static <K, V> void compareKeyValueHeaders(final ProducerRecord<K, 
V> record,
+                                                     final K expectedKey,
+                                                     final V expectedValue,
+                                                     final Headers 
expectedHeaders) throws AssertionError {
+        Objects.requireNonNull(record);
+
+        final K recordKey = record.key();
+        final V recordValue = record.value();
+        final Headers recordHeaders = record.headers();
+        final AssertionError error = new AssertionError("Expected <" + 
expectedKey + ", " + expectedValue + "> with headers=" + expectedHeaders +
+                " but was <" + recordKey + ", " + recordValue + "> with 
headers=" + recordHeaders);
+
+        if (recordKey != null) {
+            if (!recordKey.equals(expectedKey)) {
+                throw error;
+            }
+        } else if (expectedKey != null) {
+            throw error;
+        }
+
+        if (recordValue != null) {
+            if (!recordValue.equals(expectedValue)) {
+                throw error;
+            }
+        } else if (expectedValue != null) {
+            throw error;
+        }
+
+        if (recordHeaders != null) {
+            if (!recordHeaders.equals(expectedHeaders)) {
+                throw error;
+            }
+        } else if (expectedHeaders != null) {
+            throw error;
+        }
+    }
+
+    /**
+     * Compares the keys, values, and headers of two {@link ProducerRecord}s 
and throws an {@link AssertionError} if
+     * the keys, values, or headers are not equal to each other.
+     *
+     * @param record an output {@code ProducerRecord} for verification
+     * @param expectedRecord a {@code ProducerRecord} for verification
+     * @param <K> the key type
+     * @param <V> the value type
+     * @throws AssertionError if {@code ProducerRecord}'s key, value, or 
headers is not equal to
+     *                        {@code expectedRecord}'s key, value, or headers
+     */
+    public static <K, V> void compareKeyValueHeaders(final ProducerRecord<K, 
V> record,
+                                                     final ProducerRecord<K, 
V> expectedRecord) throws AssertionError {
+        Objects.requireNonNull(expectedRecord);
+        compareKeyValueHeaders(record, expectedRecord.key(), 
expectedRecord.value(), expectedRecord.headers());
+    }
+
+    /**
+     * Compares a {@link ProducerRecord} with the provided key, value, 
headers, and timestamp and throws an
+     * {@link AssertionError} if the {@code ProducerRecord}'s key, value, 
headers, or timestamp is not equal to the expected key,
+     * value, headers, or timestamp.
+     *
+     * @param record an output {@code ProducerRecord} for verification
+     * @param expectedKey the expected key of the {@code ProducerRecord}
+     * @param expectedValue the expected value of the {@code ProducerRecord}
+     * @param expectedHeaders the expected headers of the {@code 
ProducerRecord}
+     * @param expectedTimestamp the expected timestamp of the {@code 
ProducerRecord}
+     * @param <K> the key type
+     * @param <V> the value type
+     * @throws AssertionError if {@code ProducerRecord}'s key, value, headers, 
or timestamp is not equal to {@code expectedKey},
+     *                        {@code expectedValue}, {@code expectedHeaders}, or 
{@code expectedTimestamp}
+     */
+    public static <K, V> void compareKeyValueHeadersTimestamp(final 
ProducerRecord<K, V> record,
+                                                              final K 
expectedKey,
+                                                              final V 
expectedValue,
+                                                              final Headers 
expectedHeaders,
+                                                              final long 
expectedTimestamp) throws AssertionError {
+        Objects.requireNonNull(record);
+
+        final K recordKey = record.key();
+        final V recordValue = record.value();
+        final Headers recordHeaders = record.headers();
+        final long recordTimestamp = record.timestamp();
+        final AssertionError error = new AssertionError("Expected <" + 
expectedKey + ", " + expectedValue + ">" +
+                " with timestamp=" + expectedTimestamp + " and headers=" + 
expectedHeaders +
+                " but was <" + recordKey + ", " + recordValue + ">" +
+                " with timestamp=" + recordTimestamp + " and headers=" + 
recordHeaders);
+
+        if (recordKey != null) {
+            if (!recordKey.equals(expectedKey)) {
+                throw error;
+            }
+        } else if (expectedKey != null) {
+            throw error;
+        }
+
+        if (recordValue != null) {
+            if (!recordValue.equals(expectedValue)) {
+                throw error;
+            }
+        } else if (expectedValue != null) {
+            throw error;
+        }
+
+        if (recordHeaders != null) {
+            if (!recordHeaders.equals(expectedHeaders)) {
+                throw error;
+            }
+        } else if (expectedHeaders != null) {
+            throw error;
+        }
+
+        if (recordTimestamp != expectedTimestamp) {
+            throw error;
+        }
+    }
+
+    /**
+     * Compares the keys, values, headers, and timestamps of two {@link 
ProducerRecord}s and throws an {@link AssertionError} if
+     * the keys, values, headers, or timestamps are not equal to each other.
+     *
+     * @param record an output {@code ProducerRecord} for verification
+     * @param expectedRecord a {@code ProducerRecord} for verification
+     * @param <K> the key type
+     * @param <V> the value type
+     * @throws AssertionError if {@code ProducerRecord}'s key, value, headers, 
or timestamp is not equal to
+     *                        {@code expectedRecord}'s key, value, headers, or 
timestamp
+     */
+    public static <K, V> void compareKeyValueHeadersTimestamp(final 
ProducerRecord<K, V> record,
+                                                              final 
ProducerRecord<K, V> expectedRecord) throws AssertionError {
+        Objects.requireNonNull(expectedRecord);
+        compareKeyValueHeadersTimestamp(record, expectedRecord.key(), 
expectedRecord.value(), expectedRecord.headers(), expectedRecord.timestamp());
+    }
 }
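
   A minimal sketch of how the headers-aware verification helpers above might be used 
   against output read from a TopologyTestDriver; the driver, topic name, deserializers, 
   and expected values are illustrative assumptions, not part of the diff:

       import org.apache.kafka.clients.producer.ProducerRecord;
       import org.apache.kafka.common.header.internals.RecordHeader;
       import org.apache.kafka.common.header.internals.RecordHeaders;
       import org.apache.kafka.common.serialization.StringDeserializer;
       import org.apache.kafka.streams.TopologyTestDriver;
       import org.apache.kafka.streams.test.OutputVerifier;

       // 'driver' is assumed to be a TopologyTestDriver created with the topology under test
       static void verifyOutputHasHeaders(final TopologyTestDriver driver) {
           final ProducerRecord<String, String> output =
               driver.readOutput("output-topic", new StringDeserializer(), new StringDeserializer());

           final RecordHeaders expectedHeaders = new RecordHeaders();
           expectedHeaders.add(new RecordHeader("source", "test".getBytes()));

           // throws AssertionError if the key, value, or headers differ from the expected ones
           OutputVerifier.compareKeyValueHeaders(output, "some-key", "expected-value", expectedHeaders);
       }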
diff --git 
a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
 
b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
index dbb26e0e8bd..64d5b12dc0b 100644
--- 
a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
+++ 
b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
@@ -286,7 +286,7 @@ public void process(final String key, final Object value) {
         }
 
         context.resetForwards();
-        context.setRecordMetadata("t1", 0, 0L, 0L);
+        context.setRecordMetadata("t1", 0, 0L, null, 0L);
 
         {
             processor.process("foo", 5L);
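
   As an aside, a small sketch of exercising the updated 
   MockProcessorContext#setRecordMetadata(...) with headers when unit-testing a processor 
   in isolation; the topic, partition, offset, and header key/value below are illustrative 
   assumptions:

       import org.apache.kafka.common.header.internals.RecordHeader;
       import org.apache.kafka.common.header.internals.RecordHeaders;
       import org.apache.kafka.streams.processor.MockProcessorContext;

       final MockProcessorContext context = new MockProcessorContext();
       final RecordHeaders headers = new RecordHeaders();
       headers.add(new RecordHeader("source", "test".getBytes()));

       // a processor initialized with this context will now see these headers via context.headers()
       context.setRecordMetadata("t1", 0, 0L, headers, 0L);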
diff --git 
a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
 
b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
index 6cc96a23f0e..2d446d1de2c 100644
--- 
a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
+++ 
b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -18,6 +18,10 @@
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.LongDeserializer;
 import org.apache.kafka.common.serialization.LongSerializer;
@@ -74,10 +78,12 @@
         new ByteArraySerializer(),
         new ByteArraySerializer());
 
+    private final Headers headers = new RecordHeaders(new Header[]{new 
RecordHeader("key", "value".getBytes())});
+
     private final byte[] key1 = new byte[0];
     private final byte[] value1 = new byte[0];
     private final long timestamp1 = 42L;
-    private final ConsumerRecord<byte[], byte[]> consumerRecord1 = 
consumerRecordFactory.create(SOURCE_TOPIC_1, key1, value1, timestamp1);
+    private final ConsumerRecord<byte[], byte[]> consumerRecord1 = 
consumerRecordFactory.create(SOURCE_TOPIC_1, key1, value1, headers, timestamp1);
 
     private final byte[] key2 = new byte[0];
     private final byte[] value2 = new byte[0];
@@ -107,6 +113,7 @@
         private long timestamp;
         private long offset;
         private String topic;
+        private Headers headers;
 
         Record(final ConsumerRecord consumerRecord) {
             key = consumerRecord.key();
@@ -114,15 +121,18 @@
             timestamp = consumerRecord.timestamp();
             offset = consumerRecord.offset();
             topic = consumerRecord.topic();
+            headers = consumerRecord.headers();
         }
 
         Record(final Object key,
                final Object value,
+               final Headers headers,
                final long timestamp,
                final long offset,
                final String topic) {
             this.key = key;
             this.value = value;
+            this.headers = headers;
             this.timestamp = timestamp;
             this.offset = offset;
             this.topic = topic;
@@ -146,12 +156,13 @@ public boolean equals(final Object o) {
                 offset == record.offset &&
                 Objects.equals(key, record.key) &&
                 Objects.equals(value, record.value) &&
-                Objects.equals(topic, record.topic);
+                Objects.equals(topic, record.topic) &&
+                Objects.equals(headers, record.headers);
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(key, value, timestamp, offset, topic);
+            return Objects.hash(key, value, headers, timestamp, offset, topic);
         }
     }
 
@@ -201,7 +212,7 @@ public void init(ProcessorContext context) {
 
         @Override
         public void process(Object key, Object value) {
-            processedRecords.add(new Record(key, value, context.timestamp(), 
context.offset(), context.topic()));
+            processedRecords.add(new Record(key, value, context.headers(), 
context.timestamp(), context.offset(), context.topic()));
             context.forward(key, value);
         }
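
   For completeness, a sketch of a Processor reading the headers of the current record 
   through ProcessorContext#headers(), along the lines of the test processor above; the 
   class name and the "trace-id" header key are illustrative assumptions:

       import org.apache.kafka.common.header.Header;
       import org.apache.kafka.streams.processor.AbstractProcessor;

       public class HeaderAwareProcessor extends AbstractProcessor<String, String> {
           @Override
           public void process(final String key, final String value) {
               // headers() exposes the headers of the record currently being processed
               for (final Header header : context().headers()) {
                   if (header.key().equals("trace-id")) {
                       // e.g. propagate or log header.value()
                   }
               }
               context().forward(key, value);
           }
       }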
 
diff --git 
a/streams/test-utils/src/test/java/org/apache/kafka/streams/test/ConsumerRecordFactoryTest.java
 
b/streams/test-utils/src/test/java/org/apache/kafka/streams/test/ConsumerRecordFactoryTest.java
index 469d241fa73..855aa9f0865 100644
--- 
a/streams/test-utils/src/test/java/org/apache/kafka/streams/test/ConsumerRecordFactoryTest.java
+++ 
b/streams/test-utils/src/test/java/org/apache/kafka/streams/test/ConsumerRecordFactoryTest.java
@@ -62,13 +62,18 @@ public void shouldNotAllowToCreateTopicWithNullTopicName() {
         factory.create(null, rawKey, value, timestamp);
     }
 
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowToCreateTopicWithNullHeaders() {
+        factory.create(topicName, rawKey, value, null, timestamp);
+    }
+
     @Test(expected = NullPointerException.class)
     public void 
shouldNotAllowToCreateTopicWithNullTopicNameWithDefaultTimestamp() {
         factory.create(null, rawKey, value);
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAllowToCreateTopicWithNullTopicNameWithNulKey() {
+    public void shouldNotAllowToCreateTopicWithNullTopicNameWithNullKey() {
         factory.create((String) null, value, timestamp);
     }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> KIP-244: Add Record Header support to Kafka Streams Processor API
> -----------------------------------------------------------------
>
>                 Key: KAFKA-6850
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6850
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Jorge Quilcate
>            Assignee: Jorge Quilcate
>            Priority: Major
>              Labels: kip
>
> Add support for headers on Streams Processor API.
> KIP documentation: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-244%3A+Add+Record+Header+support+to+Kafka+Streams+Processor+API]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
