mjsax commented on code in PR #21755:
URL: https://github.com/apache/kafka/pull/21755#discussion_r2936110075


##########
streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowedSerializerTest.java:
##########
@@ -130,7 +131,12 @@ public void shouldPassHeadersToUnderlyingSerializer() {
 
         testSerializer.serialize("dummy", headers, data);
 
-        verify(mockSerializer).serialize(anyString(), eq(headers), eq(key));
+        verify(mockSerializer, times(1)).serialize(anyString(), eq(headers), eq(key));
+        verify(mockSerializer, never()).serialize(anyString(), eq(key));
+
+        testSerializer.serializeBaseKey("dummy", headers, data);
+
+        verify(mockSerializer, times(2)).serialize(anyString(), eq(headers), eq(key));

Review Comment:
   Same. Should we verify the topic name too, i.e., match on `eq("dummy")` instead of `anyString()`?



##########
streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedSerializerTest.java:
##########
@@ -130,7 +131,12 @@ public void shouldPassHeadersToUnderlyingSerializer() {
 
         testSerializer.serialize("dummy", headers, data);
 
-        verify(mockSerializer).serialize(anyString(), eq(headers), eq(key));
+        verify(mockSerializer, times(1)).serialize(anyString(), eq(headers), eq(key));
+        verify(mockSerializer, never()).serialize(anyString(), eq(key));
+
+        testSerializer.serializeBaseKey("dummy", headers, data);
+
+        verify(mockSerializer, times(2)).serialize(anyString(), eq(headers), eq(key));

Review Comment:
   We know that the topic name is set to `"dummy"`, so it might be better to
verify it too (i.e., `eq("dummy")` instead of `anyString()`)? The same applies
to the similar test above.
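
   To illustrate what the stricter check buys us, here is a dependency-free
sketch (not the actual test). `RecordingSerializer` and `ForwardingSerializer`
are hypothetical stand-ins for the mocked inner serializer and the windowed
serializer under test, and headers are reduced to a plain `Object`. In the
Mockito test itself, the equivalent would presumably be matching on
`eq("dummy")` rather than `anyString()`.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical recording fake standing in for the mocked inner serializer.
final class RecordingSerializer {
    // Topic received by each serialize() call, in call order.
    final List<String> topics = new ArrayList<>();

    byte[] serialize(String topic, Object headers, String key) {
        topics.add(topic);
        return key.getBytes(StandardCharsets.UTF_8);
    }
}

// Hypothetical stand-in for the windowed serializer under test:
// both entry points forward topic and headers to the inner serializer.
final class ForwardingSerializer {
    private final RecordingSerializer inner;

    ForwardingSerializer(RecordingSerializer inner) {
        this.inner = inner;
    }

    byte[] serialize(String topic, Object headers, String key) {
        return inner.serialize(topic, headers, key);
    }

    byte[] serializeBaseKey(String topic, Object headers, String key) {
        return inner.serialize(topic, headers, key);
    }
}
```

   Asserting that the recorded topics are exactly `"dummy"` for both calls
would catch a wrapper that rewrites the topic before delegating, which
`anyString()` would let through.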



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java:
##########
@@ -16,10 +16,11 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.kstream.Windowed;
 
 public interface WindowedSerializer<T> extends Serializer<Windowed<T>> {
 
-    byte[] serializeBaseKey(String topic, Windowed<T> data);
+    byte[] serializeBaseKey(String topic, Headers headers, Windowed<T> data);

Review Comment:
   This change, and the corresponding related ones, made me realize there is
another issue. Of course, the root cause is unrelated to this PR, but this
change implies a public API change... I filed
https://issues.apache.org/jira/browse/KAFKA-20313 for future cleanup.
   
   But I am wondering if we need to do it differently for backward
compatibility, and add a new overload which takes `headers`, with a default
impl that calls the existing one and drops the headers?
   
   We would also need to document this in the KIP. \cc @aliehsaeedii 
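
   A minimal sketch of the backward-compatible shape I have in mind, written
with stand-in types so it compiles on its own. `HeadersStandIn`,
`WindowedStandIn`, `WindowedSerializerSketch`, and `LegacyStringSerializer` are
hypothetical placeholders (for `Headers`, `Windowed<T>`, `WindowedSerializer<T>`,
and an existing user implementation), not the actual Kafka code.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for org.apache.kafka.common.header.Headers.
final class HeadersStandIn {
}

// Hypothetical stand-in for org.apache.kafka.streams.kstream.Windowed<T>.
final class WindowedStandIn<T> {
    final T key;

    WindowedStandIn(T key) {
        this.key = key;
    }
}

// Sketch of the interface shape: the existing method stays abstract,
// and the new overload is a default method that drops the headers.
interface WindowedSerializerSketch<T> {

    byte[] serializeBaseKey(String topic, WindowedStandIn<T> data);

    default byte[] serializeBaseKey(String topic, HeadersStandIn headers, WindowedStandIn<T> data) {
        return serializeBaseKey(topic, data);
    }
}

// An implementation written against the old single-method interface.
// It only overrides the two-argument method and still compiles unchanged.
final class LegacyStringSerializer implements WindowedSerializerSketch<String> {
    @Override
    public byte[] serializeBaseKey(String topic, WindowedStandIn<String> data) {
        return data.key.getBytes(StandardCharsets.UTF_8);
    }
}
```

   With this shape, existing implementations keep working and callers can
switch to the headers-aware overload; implementations that care about headers
would override the new method.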
   
   


