This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3edb406f980 KAFKA-16505: Add source raw key and value (#18739)
3edb406f980 is described below

commit 3edb406f980ef9c230e76e1de860afcc3b935b87
Author: Loïc GREFFIER <[email protected]>
AuthorDate: Thu Jun 5 09:35:03 2025 +0100

    KAFKA-16505: Add source raw key and value (#18739)
    
    This PR is part of the KIP-1034.
    
    It brings the support for the source raw key and the source raw
    value in the `ErrorHandlerContext`. Required by the routing to DLQ 
implemented
    by https://github.com/apache/kafka/pull/17942.
    
    Reviewers: Bruno Cadonna <[email protected]>
    
    Co-authored-by: Damien Gasparina <[email protected]>
---
 .../ProcessingExceptionHandlerIntegrationTest.java | 155 +++++++++++++++++++++
 .../kafka/streams/errors/ErrorHandlerContext.java  |  34 +++++
 .../internals/DefaultErrorHandlerContext.java      |  16 ++-
 .../kafka/streams/processor/RecordContext.java     |  27 ++++
 .../processor/internals/ProcessorContextImpl.java  |   5 +-
 .../streams/processor/internals/ProcessorNode.java |   4 +-
 .../internals/ProcessorRecordContext.java          |  40 +++++-
 .../processor/internals/RecordCollectorImpl.java   |  18 ++-
 .../processor/internals/RecordDeserializer.java    |   5 +-
 .../streams/processor/internals/RecordQueue.java   |   2 +-
 .../streams/processor/internals/StampedRecord.java |  32 +++++
 .../streams/processor/internals/StreamTask.java    |   8 +-
 .../state/internals/CachingKeyValueStore.java      |   4 +-
 .../state/internals/CachingSessionStore.java       |   4 +-
 .../state/internals/CachingWindowStore.java        |   4 +-
 .../streams/state/internals/LRUCacheEntry.java     |  16 ++-
 .../internals/TimeOrderedCachingWindowStore.java   |   8 +-
 .../processor/internals/ProcessorNodeTest.java     |   9 +-
 .../processor/internals/RecordCollectorTest.java   |  64 +++++++++
 .../streams/state/internals/NamedCacheTest.java    |  20 +--
 .../streams/state/internals/ThreadCacheTest.java   |  10 +-
 ...imeOrderedCachingPersistentWindowStoreTest.java |   5 +-
 .../internals/TimeOrderedWindowStoreTest.java      |   4 +-
 .../kafka/test/InternalMockProcessorContext.java   |   5 +-
 24 files changed, 464 insertions(+), 35 deletions(-)

diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java
index 13e291e887c..38711093ff8 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java
@@ -16,14 +16,17 @@
  */
 package org.apache.kafka.streams.integration;
 
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.KeyValueTimestamp;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.errors.ErrorHandlerContext;
 import 
org.apache.kafka.streams.errors.LogAndContinueProcessingExceptionHandler;
@@ -31,14 +34,22 @@ import 
org.apache.kafka.streams.errors.LogAndFailProcessingExceptionHandler;
 import org.apache.kafka.streams.errors.ProcessingExceptionHandler;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.StreamJoined;
 import org.apache.kafka.streams.processor.api.ContextualProcessor;
 import org.apache.kafka.streams.processor.api.ProcessorSupplier;
 import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.test.MockProcessorSupplier;
 
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.time.Duration;
 import java.time.Instant;
@@ -48,6 +59,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Stream;
 
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
@@ -385,6 +397,131 @@ public class ProcessingExceptionHandlerIntegrationTest {
         }
     }
 
+    static Stream<Arguments> sourceRawRecordTopologyTestCases() {
+        // Validate source raw key and source raw value for fully stateless 
topology
+        final List<ProducerRecord<String, String>> statelessTopologyEvent = 
List.of(new ProducerRecord<>("TOPIC_NAME", "ID123-1", "ID123-A1"));
+        final StreamsBuilder statelessTopologyBuilder = new StreamsBuilder();
+        statelessTopologyBuilder
+            .stream("TOPIC_NAME", Consumed.with(Serdes.String(), 
Serdes.String()))
+            .selectKey((key, value) -> "newKey")
+            .mapValues(value -> {
+                throw new RuntimeException("Error");
+            });
+
+        // Validate source raw key and source raw value for processing 
exception in aggregator with caching enabled
+        final List<ProducerRecord<String, String>> 
cacheAggregateExceptionInAggregatorEvent = List.of(new 
ProducerRecord<>("TOPIC_NAME", "INITIAL-KEY123-1", "ID123-A1"));
+        final StreamsBuilder 
cacheAggregateExceptionInAggregatorTopologyBuilder = new StreamsBuilder();
+        cacheAggregateExceptionInAggregatorTopologyBuilder
+            .stream("TOPIC_NAME", Consumed.with(Serdes.String(), 
Serdes.String()))
+            .groupBy((key, value) -> "ID123-1", Grouped.with(Serdes.String(), 
Serdes.String()))
+            .aggregate(() -> "initialValue",
+                (key, value, aggregate) -> {
+                    throw new RuntimeException("Error");
+                },
+                Materialized.<String, String, KeyValueStore<Bytes, 
byte[]>>as("aggregate")
+                    .withKeySerde(Serdes.String())
+                    .withValueSerde(Serdes.String())
+                    .withCachingEnabled());
+
+        // Validate source raw key and source raw value for processing 
exception after aggregation with caching enabled
+        final List<ProducerRecord<String, String>> 
cacheAggregateExceptionAfterAggregationEvent = List.of(new 
ProducerRecord<>("TOPIC_NAME", "INITIAL-KEY123-1", "ID123-A1"));
+        final StreamsBuilder 
cacheAggregateExceptionAfterAggregationTopologyBuilder = new StreamsBuilder();
+        cacheAggregateExceptionAfterAggregationTopologyBuilder
+            .stream("TOPIC_NAME", Consumed.with(Serdes.String(), 
Serdes.String()))
+            .groupBy((key, value) -> "ID123-1", Grouped.with(Serdes.String(), 
Serdes.String()))
+            .aggregate(() -> "initialValue",
+                (key, value, aggregate) -> value,
+                Materialized.<String, String, KeyValueStore<Bytes, 
byte[]>>as("aggregate")
+                    .withKeySerde(Serdes.String())
+                    .withValueSerde(Serdes.String())
+                    .withCachingEnabled())
+            .mapValues(value -> {
+                throw new RuntimeException("Error");
+            });
+
+        // Validate source raw key and source raw value for processing 
exception after aggregation with caching disabled
+        final List<ProducerRecord<String, String>> 
noCacheAggregateExceptionAfterAggregationEvents = List.of(new 
ProducerRecord<>("TOPIC_NAME", "INITIAL-KEY123-1", "ID123-A1"));
+        final StreamsBuilder 
noCacheAggregateExceptionAfterAggregationTopologyBuilder = new StreamsBuilder();
+        noCacheAggregateExceptionAfterAggregationTopologyBuilder
+            .stream("TOPIC_NAME", Consumed.with(Serdes.String(), 
Serdes.String()))
+            .groupBy((key, value) -> "ID123-1", Grouped.with(Serdes.String(), 
Serdes.String()))
+            .aggregate(() -> "initialValue",
+                (key, value, aggregate) -> value,
+                Materialized.<String, String, KeyValueStore<Bytes, 
byte[]>>as("aggregate")
+                    .withKeySerde(Serdes.String())
+                    .withValueSerde(Serdes.String())
+                    .withCachingDisabled())
+            .mapValues(value -> {
+                throw new RuntimeException("Error");
+            });
+
+        // Validate source raw key and source raw value for processing 
exception after table creation with caching enabled
+        final List<ProducerRecord<String, String>> cacheTableEvents = 
List.of(new ProducerRecord<>("TOPIC_NAME", "ID123-1", "ID123-A1"));
+        final StreamsBuilder cacheTableTopologyBuilder = new StreamsBuilder();
+        cacheTableTopologyBuilder
+            .table("TOPIC_NAME", Consumed.with(Serdes.String(), 
Serdes.String()),
+                Materialized.<String, String, KeyValueStore<Bytes, 
byte[]>>as("table")
+                .withKeySerde(Serdes.String())
+                .withValueSerde(Serdes.String())
+                .withCachingEnabled())
+            .mapValues(value -> {
+                throw new RuntimeException("Error");
+            });
+
+        // Validate source raw key and source raw value for processing 
exception in join
+        final List<ProducerRecord<String, String>> joinEvents = List.of(
+            new ProducerRecord<>("TOPIC_NAME_2", "INITIAL-KEY123-1", 
"ID123-A1"),
+            new ProducerRecord<>("TOPIC_NAME", "INITIAL-KEY123-2", "ID123-A1")
+        );
+        final StreamsBuilder joinTopologyBuilder = new StreamsBuilder();
+        joinTopologyBuilder
+            .stream("TOPIC_NAME", Consumed.with(Serdes.String(), 
Serdes.String()))
+            .selectKey((key, value) -> "ID123-1")
+            .leftJoin(joinTopologyBuilder.stream("TOPIC_NAME_2", 
Consumed.with(Serdes.String(), Serdes.String()))
+                    .selectKey((key, value) -> "ID123-1"),
+                (key, left, right) -> {
+                    throw new RuntimeException("Error");
+                },
+                JoinWindows.ofTimeDifferenceAndGrace(Duration.ofMinutes(5), 
Duration.ofMinutes(1)),
+                StreamJoined.with(
+                        Serdes.String(), Serdes.String(), Serdes.String())
+                    .withName("join-rekey")
+                    .withStoreName("join-store"));
+
+        return Stream.of(
+            Arguments.of(statelessTopologyEvent, 
statelessTopologyBuilder.build()),
+            Arguments.of(cacheAggregateExceptionInAggregatorEvent, 
cacheAggregateExceptionInAggregatorTopologyBuilder.build()),
+            Arguments.of(cacheAggregateExceptionAfterAggregationEvent, 
noCacheAggregateExceptionAfterAggregationTopologyBuilder.build()),
+            Arguments.of(noCacheAggregateExceptionAfterAggregationEvents, 
cacheAggregateExceptionInAggregatorTopologyBuilder.build()),
+            Arguments.of(cacheTableEvents, cacheTableTopologyBuilder.build()),
+            Arguments.of(joinEvents, joinTopologyBuilder.build())
+        );
+    }
+
+    @ParameterizedTest
+    @MethodSource("sourceRawRecordTopologyTestCases")
+    public void 
shouldVerifySourceRawKeyAndSourceRawValuePresentOrNotInErrorHandlerContext(final
 List<ProducerRecord<String, String>> events,
+                                                                               
            final Topology topology) {
+        final Properties properties = new Properties();
+        properties.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
+                AssertSourceRawRecordProcessingExceptionHandlerMockTest.class);
+
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(topology, properties, Instant.ofEpochMilli(0L))) {
+            for (final ProducerRecord<String, String> event : events) {
+                final TestInputTopic<String, String> inputTopic = 
driver.createInputTopic(event.topic(), new StringSerializer(), new 
StringSerializer());
+
+                final String key = event.key();
+                final String value = event.value();
+
+                if (event.topic().equals("TOPIC_NAME")) {
+                    assertThrows(StreamsException.class, () -> 
inputTopic.pipeInput(key, value, TIMESTAMP));
+                } else {
+                    inputTopic.pipeInput(event.key(), event.value(), 
TIMESTAMP);
+                }
+            }
+        }
+    }
+
     public static class ContinueProcessingExceptionHandlerMockTest implements 
ProcessingExceptionHandler {
         @Override
         public ProcessingExceptionHandler.ProcessingHandlerResponse 
handle(final ErrorHandlerContext context, final Record<?, ?> record, final 
Exception exception) {
@@ -422,10 +559,28 @@ public class ProcessingExceptionHandlerIntegrationTest {
         assertTrue(Arrays.asList("ID123-A2", "ID123-A5").contains((String) 
record.value()));
         assertEquals("TOPIC_NAME", context.topic());
         assertEquals("KSTREAM-PROCESSOR-0000000003", 
context.processorNodeId());
+        assertTrue(Arrays.equals("ID123-2-ERR".getBytes(), 
context.sourceRawKey())
+            || Arrays.equals("ID123-5-ERR".getBytes(), 
context.sourceRawKey()));
+        assertTrue(Arrays.equals("ID123-A2".getBytes(), 
context.sourceRawValue())
+            || Arrays.equals("ID123-A5".getBytes(), context.sourceRawValue()));
         assertEquals(TIMESTAMP.toEpochMilli(), context.timestamp());
         assertTrue(exception.getMessage().contains("Exception should be 
handled by processing exception handler"));
     }
 
+    public static class 
AssertSourceRawRecordProcessingExceptionHandlerMockTest implements 
ProcessingExceptionHandler {
+        @Override
+        public ProcessingExceptionHandler.ProcessingHandlerResponse 
handle(final ErrorHandlerContext context, final Record<?, ?> record, final 
Exception exception) {
+            assertEquals("ID123-1", 
Serdes.String().deserializer().deserialize("topic", context.sourceRawKey()));
+            assertEquals("ID123-A1", 
Serdes.String().deserializer().deserialize("topic", context.sourceRawValue()));
+            return ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL;
+        }
+
+        @Override
+        public void configure(final Map<String, ?> configs) {
+            // No-op
+        }
+    }
+
     /**
      * Metric name for dropped records total.
      *
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/errors/ErrorHandlerContext.java
 
b/streams/src/main/java/org/apache/kafka/streams/errors/ErrorHandlerContext.java
index d471673a48e..59ccab6fbf3 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/errors/ErrorHandlerContext.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/errors/ErrorHandlerContext.java
@@ -147,4 +147,38 @@ public interface ErrorHandlerContext {
      * @return The timestamp.
      */
     long timestamp();
+
+    /**
+     * Return the non-deserialized byte[] of the input message key if the 
context has been triggered by a message.
+     *
+     * <p> If this method is invoked within a {@link Punctuator#punctuate(long)
+     * punctuation callback}, or while processing a record that was forwarded 
by a punctuation
+     * callback, it will return null.
+     *
+     * <p> If this method is invoked in a sub-topology due to a repartition, 
the returned key would be one sent
+     * to the repartition topic.
+     *
+     * <p> Always returns null if this method is invoked within a
+     * ProductionExceptionHandler.handle(ErrorHandlerContext, ProducerRecord, 
Exception)
+     *
+     * @return the raw byte of the key of the source message
+     */
+    byte[] sourceRawKey();
+
+    /**
+     * Return the non-deserialized byte[] of the input message value if the 
context has been triggered by a message.
+     *
+     * <p> If this method is invoked within a {@link Punctuator#punctuate(long)
+     * punctuation callback}, or while processing a record that was forwarded 
by a punctuation
+     * callback, it will return {@code null}.
+     *
+     * <p> If this method is invoked in a sub-topology due to a repartition, 
the returned key would be one sent
+     * to the repartition topic.
+     *
+     * <p> Always returns null if this method is invoked within a
+     * ProductionExceptionHandler.handle(ErrorHandlerContext, ProducerRecord, 
Exception)
+     *
+     * @return the raw byte of the value of the source message
+     */
+    byte[] sourceRawValue();
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/errors/internals/DefaultErrorHandlerContext.java
 
b/streams/src/main/java/org/apache/kafka/streams/errors/internals/DefaultErrorHandlerContext.java
index efaa6d57e7a..0e85ce68c03 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/errors/internals/DefaultErrorHandlerContext.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/errors/internals/DefaultErrorHandlerContext.java
@@ -33,6 +33,8 @@ public class DefaultErrorHandlerContext implements 
ErrorHandlerContext {
     private final Headers headers;
     private final String processorNodeId;
     private final TaskId taskId;
+    private final byte[] sourceRawKey;
+    private final byte[] sourceRawValue;
 
     private final long timestamp;
     private final ProcessorContext processorContext;
@@ -44,7 +46,9 @@ public class DefaultErrorHandlerContext implements 
ErrorHandlerContext {
                                       final Headers headers,
                                       final String processorNodeId,
                                       final TaskId taskId,
-                                      final long timestamp) {
+                                      final long timestamp,
+                                      final byte[] sourceRawKey,
+                                      final byte[] sourceRawValue) {
         this.topic = topic;
         this.partition = partition;
         this.offset = offset;
@@ -53,6 +57,8 @@ public class DefaultErrorHandlerContext implements 
ErrorHandlerContext {
         this.taskId = taskId;
         this.processorContext = processorContext;
         this.timestamp = timestamp;
+        this.sourceRawKey = sourceRawKey;
+        this.sourceRawValue = sourceRawValue;
     }
 
     @Override
@@ -90,6 +96,14 @@ public class DefaultErrorHandlerContext implements 
ErrorHandlerContext {
         return timestamp;
     }
 
+    public byte[] sourceRawKey() {
+        return sourceRawKey;
+    }
+
+    public byte[] sourceRawValue() {
+        return sourceRawValue;
+    }
+
     @Override
     public String toString() {
         // we do exclude headers on purpose, to not accidentally log user data
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/RecordContext.java 
b/streams/src/main/java/org/apache/kafka/streams/processor/RecordContext.java
index 6b6fd91c853..f77d4f454ce 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/RecordContext.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/RecordContext.java
@@ -110,4 +110,31 @@ public interface RecordContext {
      */
     Headers headers();
 
+    /**
+     * Return the non-deserialized byte[] of the input message key if the 
context has been triggered by a message.
+     *
+     * <p> If this method is invoked within a {@link Punctuator#punctuate(long)
+     * punctuation callback}, or while processing a record that was forwarded 
by a punctuation
+     * callback, it will return {@code null}.
+     *
+     * <p> If this method is invoked in a sub-topology due to a repartition, 
the returned key would be one sent
+     * to the repartition topic.
+     *
+     * @return the raw byte of the key of the source message
+     */
+    byte[] sourceRawKey();
+
+    /**
+     * Return the non-deserialized byte[] of the input message value if the 
context has been triggered by a message.
+     *
+     * <p> If this method is invoked within a {@link Punctuator#punctuate(long)
+     * punctuation callback}, or while processing a record that was forwarded 
by a punctuation
+     * callback, it will return {@code null}.
+     *
+     * <p> If this method is invoked in a sub-topology due to a repartition, 
the returned key would be one sent
+     * to the repartition topic.
+     *
+     * @return the raw byte of the value of the source message
+     */
+    byte[] sourceRawValue();
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 8f739d0c056..93961daf97b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -260,7 +260,10 @@ public final class ProcessorContextImpl extends 
AbstractProcessorContext<Object,
                     recordContext.offset(),
                     recordContext.partition(),
                     recordContext.topic(),
-                    record.headers());
+                    record.headers(),
+                    recordContext.sourceRawKey(),
+                    recordContext.sourceRawValue()
+                );
             }
 
             if (childName == null) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index 5d245ef5f30..1dddc55ca3c 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -215,7 +215,9 @@ public class ProcessorNode<KIn, VIn, KOut, VOut> {
                 internalProcessorContext.recordContext().headers(),
                 internalProcessorContext.currentNode().name(),
                 internalProcessorContext.taskId(),
-                internalProcessorContext.recordContext().timestamp()
+                internalProcessorContext.recordContext().timestamp(),
+                internalProcessorContext.recordContext().sourceRawKey(),
+                internalProcessorContext.recordContext().sourceRawValue()
             );
 
             final ProcessingExceptionHandler.ProcessingHandlerResponse 
response;
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
index 839baaad875..8198645eb65 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
@@ -24,6 +24,7 @@ import org.apache.kafka.streams.processor.RecordContext;
 import org.apache.kafka.streams.processor.api.RecordMetadata;
 
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.Objects;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
@@ -37,6 +38,8 @@ public class ProcessorRecordContext implements RecordContext, 
RecordMetadata {
     private final String topic;
     private final int partition;
     private final Headers headers;
+    private byte[] sourceRawKey;
+    private byte[] sourceRawValue;
 
     public ProcessorRecordContext(final long timestamp,
                                   final long offset,
@@ -48,6 +51,24 @@ public class ProcessorRecordContext implements 
RecordContext, RecordMetadata {
         this.topic = topic;
         this.partition = partition;
         this.headers = Objects.requireNonNull(headers);
+        this.sourceRawKey = null;
+        this.sourceRawValue = null;
+    }
+
+    public ProcessorRecordContext(final long timestamp,
+                                  final long offset,
+                                  final int partition,
+                                  final String topic,
+                                  final Headers headers,
+                                  final byte[] sourceRawKey,
+                                  final byte[] sourceRawValue) {
+        this.timestamp = timestamp;
+        this.offset = offset;
+        this.topic = topic;
+        this.partition = partition;
+        this.headers = Objects.requireNonNull(headers);
+        this.sourceRawKey = sourceRawKey;
+        this.sourceRawValue = sourceRawValue;
     }
 
     @Override
@@ -75,6 +96,16 @@ public class ProcessorRecordContext implements 
RecordContext, RecordMetadata {
         return headers;
     }
 
+    @Override
+    public byte[] sourceRawKey() {
+        return sourceRawKey;
+    }
+
+    @Override
+    public byte[] sourceRawValue() {
+        return sourceRawValue;
+    }
+
     public long residentMemorySizeEstimate() {
         long size = 0;
         size += Long.BYTES; // value.context.timestamp
@@ -176,6 +207,11 @@ public class ProcessorRecordContext implements 
RecordContext, RecordMetadata {
         return new ProcessorRecordContext(timestamp, offset, partition, topic, 
headers);
     }
 
+    public void freeRawRecord() {
+        this.sourceRawKey = null;
+        this.sourceRawValue = null;
+    }
+
     @Override
     public boolean equals(final Object o) {
         if (this == o) {
@@ -189,7 +225,9 @@ public class ProcessorRecordContext implements 
RecordContext, RecordMetadata {
             offset == that.offset &&
             partition == that.partition &&
             Objects.equals(topic, that.topic) &&
-            Objects.equals(headers, that.headers);
+            Objects.equals(headers, that.headers) &&
+            Arrays.equals(sourceRawKey, that.sourceRawKey) &&
+            Arrays.equals(sourceRawValue, that.sourceRawValue);
     }
 
     /**
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index d47db7ea942..89cbf4d4c7d 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -259,6 +259,10 @@ public class RecordCollectorImpl implements 
RecordCollector {
 
         final ProducerRecord<byte[], byte[]> serializedRecord = new 
ProducerRecord<>(topic, partition, timestamp, keyBytes, valBytes, headers);
 
+        // As many records could be in-flight,
+        // freeing raw records in the context to reduce memory pressure
+        freeRawInputRecordFromContext(context);
+
         streamsProducer.send(serializedRecord, (metadata, exception) -> {
             try {
                 // if there's already an exception record, skip logging 
offsets or new exceptions
@@ -311,6 +315,12 @@ public class RecordCollectorImpl implements 
RecordCollector {
         });
     }
 
+    private static void freeRawInputRecordFromContext(final 
InternalProcessorContext<Void, Void> context) {
+        if (context != null && context.recordContext() != null) {
+            context.recordContext().freeRawRecord();
+        }
+    }
+
     private <K, V> void handleException(final 
ProductionExceptionHandler.SerializationExceptionOrigin origin,
                                         final String topic,
                                         final K key,
@@ -388,7 +398,9 @@ public class RecordCollectorImpl implements RecordCollector 
{
                 recordContext.headers(),
                 processorNodeId,
                 taskId,
-                recordContext.timestamp()
+                recordContext.timestamp(),
+                context.recordContext().sourceRawKey(),
+                context.recordContext().sourceRawValue()
             ) :
             new DefaultErrorHandlerContext(
                 context,
@@ -398,7 +410,9 @@ public class RecordCollectorImpl implements RecordCollector 
{
                 new RecordHeaders(),
                 processorNodeId,
                 taskId,
-                -1L
+                -1L,
+                null,
+                null
             );
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
index 6f9fe989552..153ca2e02f1 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
@@ -95,7 +95,10 @@ public class RecordDeserializer {
             rawRecord.headers(),
             sourceNodeName,
             processorContext.taskId(),
-            rawRecord.timestamp());
+            rawRecord.timestamp(),
+            rawRecord.key(),
+            rawRecord.value()
+        );
 
         final DeserializationHandlerResponse response;
         try {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
index d38d7b625ae..faa90572ca5 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
@@ -243,7 +243,7 @@ public class RecordQueue {
                 lastCorruptedRecord = raw;
                 continue;
             }
-            headRecord = new StampedRecord(deserialized, timestamp);
+            headRecord = new StampedRecord(deserialized, timestamp, raw.key(), 
raw.value());
             headRecordSizeInBytes = consumerRecordSizeInBytes(raw);
         }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StampedRecord.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StampedRecord.java
index c8ed35a9a8f..dd0a1298b67 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StampedRecord.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StampedRecord.java
@@ -23,8 +23,22 @@ import java.util.Optional;
 
 public class StampedRecord extends Stamped<ConsumerRecord<?, ?>> {
 
+    private final byte[] rawKey;
+    private final byte[] rawValue;
+
     public StampedRecord(final ConsumerRecord<?, ?> record, final long 
timestamp) {
         super(record, timestamp);
+        this.rawKey = null;
+        this.rawValue = null;
+    }
+
+    public StampedRecord(final ConsumerRecord<?, ?> record,
+                         final long timestamp,
+                         final byte[] rawKey,
+                         final byte[] rawValue) {
+        super(record, timestamp);
+        this.rawKey = rawKey;
+        this.rawValue = rawValue;
     }
 
     public String topic() {
@@ -55,8 +69,26 @@ public class StampedRecord extends Stamped<ConsumerRecord<?, 
?>> {
         return value.headers();
     }
 
+    public byte[] rawKey() {
+        return rawKey;
+    }
+
+    public byte[] rawValue() {
+        return rawValue;
+    }
+
     @Override
     public String toString() {
         return value.toString() + ", timestamp = " + timestamp;
     }
+
+    @Override
+    public boolean equals(final Object other) {
+        return super.equals(other);
+    }
+
+    @Override
+    public int hashCode() {
+        return super.hashCode();
+    }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 93737d82289..82e9c8d7fb1 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -856,7 +856,9 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator,
             record.offset(),
             record.partition(),
             record.topic(),
-            record.headers()
+            record.headers(),
+            record.rawKey(),
+            record.rawValue()
         );
         updateProcessorContext(currNode, wallClockTime, recordContext);
 
@@ -938,7 +940,9 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator,
                 recordContext.headers(),
                 node.name(),
                 id(),
-                recordContext.timestamp()
+                recordContext.timestamp(),
+                recordContext.sourceRawKey(),
+                recordContext.sourceRawValue()
             );
 
             final ProcessingExceptionHandler.ProcessingHandlerResponse 
response;
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index f59271920f5..83343d04494 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -277,7 +277,9 @@ public class CachingKeyValueStore
                     internalContext.recordContext().offset(),
                     internalContext.recordContext().timestamp(),
                     internalContext.recordContext().partition(),
-                    internalContext.recordContext().topic()
+                    internalContext.recordContext().topic(),
+                    internalContext.recordContext().sourceRawKey(),
+                    internalContext.recordContext().sourceRawValue()
                 )
             );
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
index 00dbaa5589b..ec0c1bd077d 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
@@ -140,7 +140,9 @@ class CachingSessionStore
                 internalContext.recordContext().offset(),
                 internalContext.recordContext().timestamp(),
                 internalContext.recordContext().partition(),
-                internalContext.recordContext().topic()
+                internalContext.recordContext().topic(),
+                internalContext.recordContext().sourceRawKey(),
+                internalContext.recordContext().sourceRawValue()
             );
         internalContext.cache().put(cacheName, 
cacheFunction.cacheKey(binaryKey), entry);
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index f138ff9202a..0432c1726cb 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -158,7 +158,9 @@ class CachingWindowStore
                 internalContext.recordContext().offset(),
                 internalContext.recordContext().timestamp(),
                 internalContext.recordContext().partition(),
-                internalContext.recordContext().topic()
+                internalContext.recordContext().topic(),
+                internalContext.recordContext().sourceRawKey(),
+                internalContext.recordContext().sourceRawValue()
             );
         internalContext.cache().put(cacheName, 
cacheFunction.cacheKey(keyBytes), entry);
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java
index f4233c7cb11..0cbd79714cc 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java
@@ -32,7 +32,7 @@ class LRUCacheEntry {
 
 
     LRUCacheEntry(final byte[] value) {
-        this(value, new RecordHeaders(), false, -1, -1, -1, "");
+        this(value, new RecordHeaders(), false, -1, -1, -1, "", null, null);
     }
 
     LRUCacheEntry(final byte[] value,
@@ -41,8 +41,18 @@ class LRUCacheEntry {
                   final long offset,
                   final long timestamp,
                   final int partition,
-                  final String topic) {
-        final ProcessorRecordContext context = new 
ProcessorRecordContext(timestamp, offset, partition, topic, headers);
+                  final String topic,
+                  final byte[] rawKey,
+                  final byte[] rawValue) {
+        final ProcessorRecordContext context = new ProcessorRecordContext(
+            timestamp,
+            offset,
+            partition,
+            topic,
+            headers,
+            rawKey,
+            rawValue
+        );
 
         this.record = new ContextualRecord(
             value,
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingWindowStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingWindowStore.java
index 7f443c3e32c..646cbf2ca35 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingWindowStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingWindowStore.java
@@ -261,7 +261,9 @@ class TimeOrderedCachingWindowStore
                 internalContext.recordContext().offset(),
                 internalContext.recordContext().timestamp(),
                 internalContext.recordContext().partition(),
-                internalContext.recordContext().topic()
+                internalContext.recordContext().topic(),
+                internalContext.recordContext().sourceRawKey(),
+                internalContext.recordContext().sourceRawValue()
             );
 
         // Put to index first so that base can be evicted later
@@ -279,7 +281,9 @@ class TimeOrderedCachingWindowStore
                     internalContext.recordContext().offset(),
                     internalContext.recordContext().timestamp(),
                     internalContext.recordContext().partition(),
-                    ""
+                    "",
+                    internalContext.recordContext().sourceRawKey(),
+                    internalContext.recordContext().sourceRawValue()
                 );
             final Bytes indexKey = 
KeyFirstWindowKeySchema.toStoreKeyBinary(key, windowStartTimestamp, 0);
             internalContext.cache().put(cacheName, 
indexKeyCacheFunction.cacheKey(indexKey), emptyEntry);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
index 86f617e7f34..5341cd25f0d 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
@@ -80,6 +80,8 @@ public class ProcessorNodeTest {
     private static final String NAME = "name";
     private static final String KEY = "key";
     private static final String VALUE = "value";
+    private static final byte[] RAW_KEY = KEY.getBytes();
+    private static final byte[] RAW_VALUE = VALUE.getBytes();
 
     @Test
     public void shouldThrowStreamsExceptionIfExceptionCaughtDuringInit() {
@@ -331,7 +333,9 @@ public class ProcessorNodeTest {
                 OFFSET,
                 PARTITION,
                 TOPIC,
-                new RecordHeaders()));
+                new RecordHeaders(),
+                RAW_KEY,
+                RAW_VALUE));
         when(internalProcessorContext.currentNode()).thenReturn(new 
ProcessorNode<>(NAME));
 
         return internalProcessorContext;
@@ -359,6 +363,9 @@ public class ProcessorNodeTest {
             assertEquals(internalProcessorContext.currentNode().name(), 
context.processorNodeId());
             assertEquals(internalProcessorContext.taskId(), context.taskId());
             assertEquals(internalProcessorContext.recordContext().timestamp(), 
context.timestamp());
+            
assertEquals(internalProcessorContext.recordContext().sourceRawKey(), 
context.sourceRawKey());
+            
assertEquals(internalProcessorContext.recordContext().sourceRawValue(), 
context.sourceRawValue());
+
             assertEquals(KEY, record.key());
             assertEquals(VALUE, record.value());
             assertInstanceOf(RuntimeException.class, exception);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
index b01b87ed85f..4fb5f91ba75 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
@@ -100,6 +100,8 @@ import static 
org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
@@ -1890,6 +1892,68 @@ public class RecordCollectorTest {
         ));
     }
 
+    @Test
+    public void shouldFreeRawRecordsInContextBeforeSending() {
+        final KafkaException exception = new KafkaException("KABOOM!");
+        final byte[][] sourceRawData = new byte[][]{new byte[]{}, new 
byte[]{}};
+
+        final RecordCollector collector = new RecordCollectorImpl(
+                logContext,
+                taskId,
+                getExceptionalStreamsProducerOnSend(exception),
+                new ProductionExceptionHandler() {
+                    @Override
+                    public void configure(final Map<String, ?> configs) {
+
+                    }
+
+                    @Override
+                    public ProductionExceptionHandlerResponse handle(final 
ErrorHandlerContext context, final ProducerRecord<byte[], byte[]> record, final 
Exception exception) {
+                        sourceRawData[0] = context.sourceRawKey();
+                        sourceRawData[1] = context.sourceRawValue();
+                        return ProductionExceptionHandlerResponse.CONTINUE;
+                    }
+                },
+                streamsMetrics,
+                topology
+        );
+
+        collector.send(topic, "3", "0", null, null, stringSerializer, 
stringSerializer, sinkNodeName, context, streamPartitioner);
+
+        assertNull(sourceRawData[0]);
+        assertNull(sourceRawData[1]);
+    }
+
+
+    @Test
+    public void shouldHaveRawDataDuringExceptionInSerialization() {
+        final byte[][] sourceRawData = new byte[][]{new byte[]{}, new 
byte[]{}};
+        try (final ErrorStringSerializer errorSerializer = new 
ErrorStringSerializer()) {
+            final RecordCollector collector = newRecordCollector(
+                    new ProductionExceptionHandler() {
+                        @Override
+                        @SuppressWarnings({"rawtypes", "unused"})
+                        public ProductionExceptionHandlerResponse 
handleSerializationException(final ErrorHandlerContext context, final 
ProducerRecord record, final Exception exception, final 
SerializationExceptionOrigin origin) {
+                            sourceRawData[0] = context.sourceRawKey();
+                            sourceRawData[1] = context.sourceRawValue();
+                            return ProductionExceptionHandlerResponse.CONTINUE;
+                        }
+
+                        @Override
+                        public void configure(final Map<String, ?> configs) {
+
+                        }
+                    }
+            );
+            collector.initialize();
+
+            collector.send(topic, "hello", "val", null, 0, null, (Serializer) 
errorSerializer, stringSerializer, sinkNodeName, context);
+
+            assertNotNull(sourceRawData[0]);
+            assertNotNull(sourceRawData[1]);
+        }
+    }
+
     private RecordCollector newRecordCollector(final 
ProductionExceptionHandler productionExceptionHandler) {
         return new RecordCollectorImpl(
             logContext,
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
index 9a68e258c5d..fd138c7e714 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
@@ -44,6 +44,8 @@ public class NamedCacheTest {
 
     private final Headers headers = new RecordHeaders(new Header[]{new 
RecordHeader("key", "value".getBytes())});
     private NamedCache cache;
+    private final byte[] rawKey = new byte[]{0};
+    private final byte[] rawValue = new byte[]{0};
 
     @BeforeEach
     public void setUp() {
@@ -64,7 +66,7 @@ public class NamedCacheTest {
             final byte[] key = stringStringKeyValue.key.getBytes();
             final byte[] value = stringStringKeyValue.value.getBytes();
             cache.put(Bytes.wrap(key),
-                new LRUCacheEntry(value, new RecordHeaders(), true, 1, 1, 1, 
""));
+                new LRUCacheEntry(value, new RecordHeaders(), true, 1, 1, 1, 
"", rawKey, rawValue));
             final LRUCacheEntry head = cache.first();
             final LRUCacheEntry tail = cache.last();
             assertEquals(new String(head.value()), stringStringKeyValue.value);
@@ -152,9 +154,9 @@ public class NamedCacheTest {
     @Test
     public void shouldFlushDirtEntriesOnEviction() {
         final List<ThreadCache.DirtyEntry> flushed = new ArrayList<>();
-        cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new byte[]{10}, 
headers, true, 0, 0, 0, ""));
+        cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new byte[]{10}, 
headers, true, 0, 0, 0, "", rawKey, rawValue));
         cache.put(Bytes.wrap(new byte[]{1}), new LRUCacheEntry(new 
byte[]{20}));
-        cache.put(Bytes.wrap(new byte[]{2}), new LRUCacheEntry(new byte[]{30}, 
headers, true, 0, 0, 0, ""));
+        cache.put(Bytes.wrap(new byte[]{2}), new LRUCacheEntry(new byte[]{30}, 
headers, true, 0, 0, 0, "", rawKey, rawValue));
 
         cache.setListener(flushed::addAll);
 
@@ -176,16 +178,16 @@ public class NamedCacheTest {
 
     @Test
     public void 
shouldThrowIllegalStateExceptionWhenTryingToOverwriteDirtyEntryWithCleanEntry() 
{
-        cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new byte[]{10}, 
headers, true, 0, 0, 0, ""));
+        cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new byte[]{10}, 
headers, true, 0, 0, 0, "", rawKey, rawValue));
         assertThrows(IllegalStateException.class, () -> 
cache.put(Bytes.wrap(new byte[]{0}),
-            new LRUCacheEntry(new byte[]{10}, new RecordHeaders(), false, 0, 
0, 0, "")));
+            new LRUCacheEntry(new byte[]{10}, new RecordHeaders(), false, 0, 
0, 0, "", rawKey, rawValue)));
     }
 
     @Test
     public void shouldRemoveDeletedValuesOnFlush() {
         cache.setListener(dirty -> { /* no-op */ });
-        cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(null, headers, 
true, 0, 0, 0, ""));
-        cache.put(Bytes.wrap(new byte[]{1}), new LRUCacheEntry(new byte[]{20}, 
new RecordHeaders(), true, 0, 0, 0, ""));
+        cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(null, headers, 
true, 0, 0, 0, "", rawKey, rawValue));
+        cache.put(Bytes.wrap(new byte[]{1}), new LRUCacheEntry(new byte[]{20}, 
new RecordHeaders(), true, 0, 0, 0, "", rawKey, rawValue));
         cache.flush();
         assertEquals(1, cache.size());
         assertNotNull(cache.get(Bytes.wrap(new byte[]{1})));
@@ -193,7 +195,7 @@ public class NamedCacheTest {
 
     @Test
     public void shouldBeReentrantAndNotBreakLRU() {
-        final LRUCacheEntry dirty = new LRUCacheEntry(new byte[]{3}, new 
RecordHeaders(), true, 0, 0, 0, "");
+        final LRUCacheEntry dirty = new LRUCacheEntry(new byte[]{3}, new 
RecordHeaders(), true, 0, 0, 0, "", rawKey, rawValue);
         final LRUCacheEntry clean = new LRUCacheEntry(new byte[]{3});
         cache.put(Bytes.wrap(new byte[]{0}), dirty);
         cache.put(Bytes.wrap(new byte[]{1}), clean);
@@ -236,7 +238,7 @@ public class NamedCacheTest {
 
     @Test
     public void 
shouldNotThrowIllegalArgumentAfterEvictingDirtyRecordAndThenPuttingNewRecordWithSameKey()
 {
-        final LRUCacheEntry dirty = new LRUCacheEntry(new byte[]{3}, new 
RecordHeaders(), true, 0, 0, 0, "");
+        final LRUCacheEntry dirty = new LRUCacheEntry(new byte[]{3}, new 
RecordHeaders(), true, 0, 0, 0, "", rawKey, rawValue);
         final LRUCacheEntry clean = new LRUCacheEntry(new byte[]{3});
         final Bytes key = Bytes.wrap(new byte[] {3});
         cache.setListener(dirty1 -> cache.put(key, clean));
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
index 9e904a2ab2c..a1cc0cec6fc 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
@@ -48,6 +48,8 @@ public class ThreadCacheTest {
     final String namespace2 = "0.2-namespace";
     private final LogContext logContext = new LogContext("testCache ");
     private final byte[][] bytes = new byte[][]{{0}, {1}, {2}, {3}, {4}, {5}, 
{6}, {7}, {8}, {9}, {10}};
+    private final byte[] rawKey = new byte[]{0};
+    private final byte[] rawValue = new byte[]{0};
 
     @Test
     public void basicPutGet() {
@@ -65,7 +67,7 @@ public class ThreadCacheTest {
         for (final KeyValue<String, String> kvToInsert : toInsert) {
             final Bytes key = Bytes.wrap(kvToInsert.key.getBytes());
             final byte[] value = kvToInsert.value.getBytes();
-            cache.put(namespace, key, new LRUCacheEntry(value, new 
RecordHeaders(), true, 1L, 1L, 1, ""));
+            cache.put(namespace, key, new LRUCacheEntry(value, new 
RecordHeaders(), true, 1L, 1L, 1, "", rawKey, rawValue));
         }
 
         for (final KeyValue<String, String> kvToInsert : toInsert) {
@@ -98,7 +100,7 @@ public class ThreadCacheTest {
             final String keyStr = "K" + i;
             final Bytes key = Bytes.wrap(keyStr.getBytes());
             final byte[] value = new byte[valueSizeBytes];
-            cache.put(namespace, key, new LRUCacheEntry(value, new 
RecordHeaders(), true, 1L, 1L, 1, ""));
+            cache.put(namespace, key, new LRUCacheEntry(value, new 
RecordHeaders(), true, 1L, 1L, 1, "", rawKey, rawValue));
         }
 
 
@@ -176,7 +178,7 @@ public class ThreadCacheTest {
         for (final KeyValue<String, String> kvToInsert : toInsert) {
             final Bytes key = Bytes.wrap(kvToInsert.key.getBytes());
             final byte[] value = kvToInsert.value.getBytes();
-            cache.put(namespace, key, new LRUCacheEntry(value, new 
RecordHeaders(), true, 1, 1, 1, ""));
+            cache.put(namespace, key, new LRUCacheEntry(value, new 
RecordHeaders(), true, 1, 1, 1, "", rawKey, rawValue));
         }
 
         for (int i = 0; i < expected.size(); i++) {
@@ -617,7 +619,7 @@ public class ThreadCacheTest {
     }
 
     private LRUCacheEntry dirtyEntry(final byte[] key) {
-        return new LRUCacheEntry(key, new RecordHeaders(), true, -1, -1, -1, 
"");
+        return new LRUCacheEntry(key, new RecordHeaders(), true, -1, -1, -1, 
"", rawKey, rawValue);
     }
 
     private LRUCacheEntry cleanEntry(final byte[] key) {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java
index 7aba2434457..ffa509d5188 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java
@@ -938,7 +938,10 @@ public class TimeOrderedCachingPersistentWindowStoreTest {
                 context.recordContext().offset(),
                 context.recordContext().timestamp(),
                 context.recordContext().partition(),
-                "")
+                "",
+                context.recordContext().sourceRawKey(),
+                context.recordContext().sourceRawValue()
+            )
         );
 
         underlyingStore.put(key, value, 1);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreTest.java
index 9eb9ec21b5e..9d0db9bae0f 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreTest.java
@@ -944,7 +944,9 @@ public class TimeOrderedWindowStoreTest {
                 context.recordContext().offset(),
                 context.recordContext().timestamp(),
                 context.recordContext().partition(),
-                ""
+                "",
+                context.recordContext().sourceRawKey(),
+                context.recordContext().sourceRawValue()
             )
         );
 
diff --git 
a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java 
b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
index 228df8d63a1..ed68c86c490 100644
--- 
a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
+++ 
b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
@@ -56,6 +56,7 @@ import org.apache.kafka.streams.state.internals.ThreadCache;
 import 
org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListener;
 
 import java.io.File;
+import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -244,7 +245,9 @@ public class InternalMockProcessorContext<KOut, VOut>
                 0,
                 0,
                 "topic",
-                new RecordHeaders()
+                new RecordHeaders(),
+                "sourceKey".getBytes(StandardCharsets.UTF_8),
+                "sourceValue".getBytes(StandardCharsets.UTF_8)
         );
     }
 

Reply via email to