[ https://issues.apache.org/jira/browse/KAFKA-7483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16644173#comment-16644173 ]

ASF GitHub Bot commented on KAFKA-7483:
---------------------------------------

guozhangwang closed pull request #5751: KAFKA-7483: Allow streams to pass headers through Serializer.
URL: https://github.com/apache/kafka/pull/5751

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index 7e192973425..5df14ee2815 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -153,8 +153,8 @@ private boolean productionExceptionIsFatal(final Exception exception) {
                             final Serializer<K> keySerializer,
                             final Serializer<V> valueSerializer) {
         checkForException();
-        final byte[] keyBytes = keySerializer.serialize(topic, key);
-        final byte[] valBytes = valueSerializer.serialize(topic, value);
+        final byte[] keyBytes = keySerializer.serialize(topic, headers, key);
+        final byte[] valBytes = valueSerializer.serialize(topic, headers, value);
 
         final ProducerRecord<byte[], byte[]> serializedRecord = new ProducerRecord<>(topic, partition, timestamp, keyBytes, valBytes, headers);
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
index 4f89a1e756f..c4e58be7c1f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
@@ -65,17 +65,14 @@
     );
 
     private final Cluster cluster = new Cluster("cluster", Collections.singletonList(Node.noNode()), infos,
-        Collections.<String>emptySet(), Collections.<String>emptySet());
+        Collections.emptySet(), Collections.emptySet());
 
 
     private final ByteArraySerializer byteArraySerializer = new ByteArraySerializer();
     private final StringSerializer stringSerializer = new StringSerializer();
 
-    private final StreamPartitioner<String, Object> streamPartitioner = new StreamPartitioner<String, Object>() {
-        @Override
-        public Integer partition(final String topic, final String key, final Object value, final int numPartitions) {
-            return Integer.parseInt(key) % numPartitions;
-        }
+    private final StreamPartitioner<String, Object> streamPartitioner = (topic, key, value, numPartitions) -> {
+        return Integer.parseInt(key) % numPartitions;
     };
 
     @Test
@@ -362,4 +359,55 @@ public void shouldThrowIfTopicIsUnknownWithContinueExceptionHandler() {
         });
         collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
     }
+
+    @Test
+    public void testRecordHeaderPassThroughSerializer() {
+        final CustomStringSerializer keySerializer = new CustomStringSerializer();
+        final CustomStringSerializer valueSerializer = new CustomStringSerializer();
+        keySerializer.configure(Collections.EMPTY_MAP, true);
+
+        final RecordCollectorImpl collector = new RecordCollectorImpl(
+                "test",
+                logContext,
+                new DefaultProductionExceptionHandler(),
+                new Metrics().sensor("skipped-records")
+        );
+        final MockProducer<byte[], byte[]> mockProducer = new MockProducer<>(cluster, true, new DefaultPartitioner(),
+                byteArraySerializer, byteArraySerializer);
+        collector.init(mockProducer);
+
+        collector.send("topic1", "3", "0", new RecordHeaders(), null, keySerializer, valueSerializer, streamPartitioner);
+
+        final List<ProducerRecord<byte[], byte[]>> recordHistory = mockProducer.history();
+        for (final ProducerRecord<byte[], byte[]> sentRecord : recordHistory) {
+            final Headers headers = sentRecord.headers();
+            assertEquals(2, headers.toArray().length);
+            assertEquals(new RecordHeader("key", "key".getBytes()), headers.lastHeader("key"));
+            assertEquals(new RecordHeader("value", "value".getBytes()), headers.lastHeader("value"));
+        }
+    }
+
+    private static class CustomStringSerializer extends StringSerializer {
+
+        private boolean isKey;
+
+        private CustomStringSerializer() {
+        }
+
+        @Override
+        public void configure(final Map<String, ?> configs, final boolean isKey) {
+            this.isKey = isKey;
+            super.configure(configs, isKey);
+        }
+
+        @Override
+        public byte[] serialize(final String topic, final Headers headers, final String data) {
+            if (isKey) {
+                headers.add(new RecordHeader("key", "key".getBytes()));
+            } else {
+                headers.add(new RecordHeader("value", "value".getBytes()));
+            }
+            return serialize(topic, data);
+        }
+    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index 699963395e9..096f792d78d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -206,8 +206,8 @@ private KeyValueStoreTestDriver(final StateSerdes<K, V> serdes) {
                                       final Serializer<V1> valueSerializer) {
                 // for byte arrays we need to wrap it for comparison
 
-                final K keyTest = serdes.keyFrom(keySerializer.serialize(topic, key));
-                final V valueTest = serdes.valueFrom(valueSerializer.serialize(topic, value));
+                final K keyTest = serdes.keyFrom(keySerializer.serialize(topic, headers, key));
+                final V valueTest = serdes.valueFrom(valueSerializer.serialize(topic, headers, value));
 
                 recordFlushed(keyTest, valueTest);
             }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index 08f019feffa..cd0f49a7372 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -96,8 +96,8 @@
                                   final Serializer<K1> keySerializer,
                                   final Serializer<V1> valueSerializer) {
             changeLog.add(new KeyValue<>(
-                keySerializer.serialize(topic, key),
-                valueSerializer.serialize(topic, value))
+                keySerializer.serialize(topic, headers, key),
+                valueSerializer.serialize(topic, headers, value))
             );
         }
     };
diff --git a/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java b/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java
index bc918ea1b2b..244b35f1465 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java
@@ -61,8 +61,9 @@ public void bufferRecord(final ConsumerRecord<K, V> record) {
         recordBuffer.add(
             new ConsumerRecord<>(record.topic(), record.partition(), record.offset(), record.timestamp(),
                                  record.timestampType(), 0L, 0, 0,
-                                 keySerializer.serialize(record.topic(), record.key()),
-                                 valueSerializer.serialize(record.topic(), record.value())));
+                                 keySerializer.serialize(record.topic(), record.headers(), record.key()),
+                                 valueSerializer.serialize(record.topic(), record.headers(), record.value()),
+                                 record.headers()));
         endOffset = record.offset();
 
         super.updateEndOffsets(Collections.singletonMap(assignedPartition, endOffset));
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/test/ConsumerRecordFactory.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/test/ConsumerRecordFactory.java
index 108dafdfdba..87ec7c1fcc3 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/test/ConsumerRecordFactory.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/test/ConsumerRecordFactory.java
@@ -180,8 +180,8 @@ public void advanceTimeMs(final long advanceMs) {
                                                  final long timestampMs) {
         Objects.requireNonNull(topicName, "topicName cannot be null.");
         Objects.requireNonNull(headers, "headers cannot be null.");
-        final byte[] serializedKey = keySerializer.serialize(topicName, key);
-        final byte[] serializedValue = valueSerializer.serialize(topicName, value);
+        final byte[] serializedKey = keySerializer.serialize(topicName, headers, key);
+        final byte[] serializedValue = valueSerializer.serialize(topicName, headers, value);
         return new ConsumerRecord<>(
             topicName,
             -1,
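Beyond the diff itself: with headers now passed into the serializer, an application can plug in a headers-aware serializer that overwrites metadata inherited from the source record instead of letting it pass through unchanged. A minimal sketch, assuming a hypothetical "schema.id" header key and class name (neither is part of this PR):

import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.StringSerializer;

// Hypothetical example: replaces, rather than appends to, the schema metadata
// header so the sink topic never carries the source record's stale value.
public class SchemaOverwritingSerializer extends StringSerializer {

    @Override
    public byte[] serialize(final String topic, final Headers headers, final String data) {
        // Drop whatever "schema.id" header was carried over from the source record.
        headers.remove("schema.id");
        // Write the metadata matching what this serializer actually produces.
        headers.add("schema.id", "42".getBytes(StandardCharsets.UTF_8));
        // Delegate the byte conversion to the plain two-argument overload.
        return serialize(topic, data);
    }
}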


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Streams should allow headers to be passed to Serializer
> -------------------------------------------------------
>
>                 Key: KAFKA-7483
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7483
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Kamal Chandraprakash
>            Assignee: Kamal Chandraprakash
>            Priority: Major
>
> We are storing schema metadata for the record key and value in headers. The
> serializer includes this metadata in the record header. While doing a simple
> record transformation (x transformed to y) in streams, the same header that
> was passed from the source is pushed to the sink topic. This leads to errors
> while reading the sink topic.
> We should call the overloaded `serialize(topic, headers, object)` method in
> [RecordCollectorImpl|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java#L156],
> which in turn adds the correct metadata to the record header.
> With this, sink topic readers have the option to read all the values for a
> header key using `Headers#headers`, or only the overwritten value using
> `Headers#lastHeader`.
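To make that reader-side distinction concrete, here is a minimal sketch of inspecting such headers on the sink topic (the "schema.id" key and the helper class are illustrative assumptions, not part of this issue):

import java.nio.charset.StandardCharsets;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;

public final class SchemaHeaderReader {

    // Prints both views of a record's "schema.id" headers.
    static void inspectSchemaHeaders(final ConsumerRecord<byte[], byte[]> record) {
        final Headers headers = record.headers();

        // Headers#lastHeader: only the most recently written value for the key,
        // i.e. the metadata the headers-aware serializer wrote at the sink.
        final Header latest = headers.lastHeader("schema.id");
        if (latest != null) {
            System.out.println("latest schema: " + new String(latest.value(), StandardCharsets.UTF_8));
        }

        // Headers#headers: every value added under the key, oldest first,
        // including any header carried over from the source record.
        for (final Header header : headers.headers("schema.id")) {
            System.out.println("schema entry: " + new String(header.value(), StandardCharsets.UTF_8));
        }
    }
}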



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
