mjsax commented on code in PR #21639:
URL: https://github.com/apache/kafka/pull/21639#discussion_r2934498413


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreWithHeaders.java:
##########
@@ -43,29 +46,70 @@ public class ChangeLoggingSessionBytesStoreWithHeaders
 
     @Override
     public void remove(final Windowed<Bytes> sessionKey) {
-        wrapped().remove(sessionKey);
-        internalContext.logChange(
-            name(),
-            SessionKeySchema.toBinary(sessionKey),
-            null,
-            internalContext.recordContext().timestamp(),
-            internalContext.recordContext().headers(),
-            wrapped().getPosition()
-        );
+        handleDelete(sessionKey);
     }
 
     @Override
     public void put(final Windowed<Bytes> sessionKey, final byte[] 
aggregationWithHeaders) {
-        wrapped().put(sessionKey, aggregationWithHeaders);
-        internalContext.logChange(
-            name(),
-            SessionKeySchema.toBinary(sessionKey),
-            rawAggregation(aggregationWithHeaders),
-            internalContext.recordContext().timestamp(),
-            aggregationWithHeaders == null
-                ? internalContext.recordContext().headers()
-                : headers(aggregationWithHeaders),
-            wrapped().getPosition()
+        if (aggregationWithHeaders == null) {
+            // Deletion path (put with null) - use same logic as remove()
+            handleDelete(sessionKey);
+        } else {
+            // Normal put path
+            wrapped().put(sessionKey, aggregationWithHeaders);
+            internalContext.logChange(
+                name(),
+                SessionKeySchema.toBinary(sessionKey),
+                rawAggregation(aggregationWithHeaders),
+                internalContext.recordContext().timestamp(),
+                headers(aggregationWithHeaders),
+                wrapped().getPosition()
+            );
+        }
+    }
+
+    private void handleDelete(final Windowed<Bytes> sessionKey) {
+        final ProcessorRecordContext currentContext = 
internalContext.recordContext();
+
+        // Fetch old value to extract its headers (if exists)
+        final byte[] oldAggregationWithHeaders = wrapped().fetchSession(
+            sessionKey.key(),
+            sessionKey.window().start(),
+            sessionKey.window().end()
         );
+
+        // Create new headers object to isolate delete operation from input 
record
+        final Headers newHeaders = oldAggregationWithHeaders != null
+            ? new RecordHeaders(headers(oldAggregationWithHeaders))
+            : new RecordHeaders(currentContext.headers());
+
+        // Create temporary context with new headers
+        final ProcessorRecordContext temporaryContext =
+            new ProcessorRecordContext(
+                currentContext.timestamp(),
+                currentContext.offset(),
+                currentContext.partition(),
+                currentContext.topic(),
+                newHeaders
+            );
+
+        internalContext.setRecordContext(temporaryContext);

Review Comment:
   Should we move this inside the try-catch block?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders.java:
##########
@@ -44,20 +46,59 @@ public class 
ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders
         super(inner);
     }
 
+    @Override
+    public byte[] delete(final Bytes key) {
+        final ProcessorRecordContext currentContext = 
internalContext.recordContext();
+        final byte[] oldValue = wrapped().get(key);
+
+        // Copy headers to the new headers object:
+        // - If old value exists, use its headers
+        // - Otherwise, use current context's headers
+        // Key doesn't exist - use current context headers
+        final RecordHeaders newHeaders = oldValue != null
+            ? new RecordHeaders(headers(oldValue))
+            : new RecordHeaders(currentContext.headers());
+
+        // Create temporary context with new headers to avoid polluting input 
record's context
+        final ProcessorRecordContext temporaryContext =
+            new ProcessorRecordContext(
+                oldValue != null ? timestamp(oldValue) : 
currentContext.timestamp(),
+                currentContext.offset(),
+                currentContext.partition(),
+                currentContext.topic(),
+                newHeaders
+            );
+
+        internalContext.setRecordContext(temporaryContext);
+
+        try {
+            final byte[] deletedValue = wrapped().delete(key);
+
+            // Log with null value - will use temporary.headers() which we 
prepared above
+            log(key, null, temporaryContext.timestamp(), 
temporaryContext.headers());

Review Comment:
   ```suggestion
               log(key, null, temporaryContext.timestamp(), newHeaders);
   ```



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders.java:
##########
@@ -44,20 +46,59 @@ public class 
ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders
         super(inner);
     }
 
+    @Override
+    public byte[] delete(final Bytes key) {
+        final ProcessorRecordContext currentContext = 
internalContext.recordContext();
+        final byte[] oldValue = wrapped().get(key);
+
+        // Copy headers to the new headers object:
+        // - If old value exists, use its headers
+        // - Otherwise, use current context's headers
+        // Key doesn't exist - use current context headers
+        final RecordHeaders newHeaders = oldValue != null
+            ? new RecordHeaders(headers(oldValue))
+            : new RecordHeaders(currentContext.headers());
+
+        // Create temporary context with new headers to avoid polluting input 
record's context
+        final ProcessorRecordContext temporaryContext =
+            new ProcessorRecordContext(
+                oldValue != null ? timestamp(oldValue) : 
currentContext.timestamp(),
+                currentContext.offset(),
+                currentContext.partition(),
+                currentContext.topic(),
+                newHeaders
+            );
+
+        internalContext.setRecordContext(temporaryContext);

Review Comment:
   Move this inside the `try` block -- I guess this also applies to other code?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreWithHeaders.java:
##########
@@ -43,29 +46,70 @@ public class ChangeLoggingSessionBytesStoreWithHeaders
 
     @Override
     public void remove(final Windowed<Bytes> sessionKey) {
-        wrapped().remove(sessionKey);
-        internalContext.logChange(
-            name(),
-            SessionKeySchema.toBinary(sessionKey),
-            null,
-            internalContext.recordContext().timestamp(),
-            internalContext.recordContext().headers(),
-            wrapped().getPosition()
-        );
+        handleDelete(sessionKey);
     }
 
     @Override
     public void put(final Windowed<Bytes> sessionKey, final byte[] 
aggregationWithHeaders) {
-        wrapped().put(sessionKey, aggregationWithHeaders);
-        internalContext.logChange(
-            name(),
-            SessionKeySchema.toBinary(sessionKey),
-            rawAggregation(aggregationWithHeaders),
-            internalContext.recordContext().timestamp(),
-            aggregationWithHeaders == null
-                ? internalContext.recordContext().headers()
-                : headers(aggregationWithHeaders),
-            wrapped().getPosition()
+        if (aggregationWithHeaders == null) {
+            // Deletion path (put with null) - use same logic as remove()
+            handleDelete(sessionKey);
+        } else {
+            // Normal put path
+            wrapped().put(sessionKey, aggregationWithHeaders);
+            internalContext.logChange(
+                name(),
+                SessionKeySchema.toBinary(sessionKey),
+                rawAggregation(aggregationWithHeaders),
+                internalContext.recordContext().timestamp(),
+                headers(aggregationWithHeaders),
+                wrapped().getPosition()
+            );
+        }
+    }
+
+    private void handleDelete(final Windowed<Bytes> sessionKey) {
+        final ProcessorRecordContext currentContext = 
internalContext.recordContext();
+
+        // Fetch old value to extract its headers (if exists)
+        final byte[] oldAggregationWithHeaders = wrapped().fetchSession(

Review Comment:
   Not sure if we really want to get the headers from the old value? I would 
rather say no -- I don't see the need semantically, and it makes the operation 
twice as expensive.
   
   Let's hear what others think.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders.java:
##########
@@ -44,20 +46,59 @@ public class 
ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders
         super(inner);
     }
 
+    @Override
+    public byte[] delete(final Bytes key) {
+        final ProcessorRecordContext currentContext = 
internalContext.recordContext();
+        final byte[] oldValue = wrapped().get(key);
+
+        // Copy headers to the new headers object:
+        // - If old value exists, use its headers
+        // - Otherwise, use current context's headers
+        // Key doesn't exist - use current context headers
+        final RecordHeaders newHeaders = oldValue != null
+            ? new RecordHeaders(headers(oldValue))
+            : new RecordHeaders(currentContext.headers());
+
+        // Create temporary context with new headers to avoid polluting input 
record's context
+        final ProcessorRecordContext temporaryContext =
+            new ProcessorRecordContext(
+                oldValue != null ? timestamp(oldValue) : 
currentContext.timestamp(),

Review Comment:
   I don't think we should fetch the old value, and can just use 
`currentContext.timestamp()` (you also did it this way in 
`ChangeLoggingSessionBytesStoreWithHeaders`)



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreWithHeaders.java:
##########
@@ -43,29 +46,70 @@ public class ChangeLoggingSessionBytesStoreWithHeaders
 
     @Override
     public void remove(final Windowed<Bytes> sessionKey) {
-        wrapped().remove(sessionKey);
-        internalContext.logChange(
-            name(),
-            SessionKeySchema.toBinary(sessionKey),
-            null,
-            internalContext.recordContext().timestamp(),
-            internalContext.recordContext().headers(),
-            wrapped().getPosition()
-        );
+        handleDelete(sessionKey);
     }
 
     @Override
     public void put(final Windowed<Bytes> sessionKey, final byte[] 
aggregationWithHeaders) {
-        wrapped().put(sessionKey, aggregationWithHeaders);
-        internalContext.logChange(
-            name(),
-            SessionKeySchema.toBinary(sessionKey),
-            rawAggregation(aggregationWithHeaders),
-            internalContext.recordContext().timestamp(),
-            aggregationWithHeaders == null
-                ? internalContext.recordContext().headers()
-                : headers(aggregationWithHeaders),
-            wrapped().getPosition()
+        if (aggregationWithHeaders == null) {
+            // Deletion path (put with null) - use same logic as remove()
+            handleDelete(sessionKey);
+        } else {
+            // Normal put path
+            wrapped().put(sessionKey, aggregationWithHeaders);
+            internalContext.logChange(
+                name(),
+                SessionKeySchema.toBinary(sessionKey),
+                rawAggregation(aggregationWithHeaders),
+                internalContext.recordContext().timestamp(),
+                headers(aggregationWithHeaders),
+                wrapped().getPosition()
+            );
+        }
+    }
+
+    private void handleDelete(final Windowed<Bytes> sessionKey) {
+        final ProcessorRecordContext currentContext = 
internalContext.recordContext();
+
+        // Fetch old value to extract its headers (if exists)
+        final byte[] oldAggregationWithHeaders = wrapped().fetchSession(
+            sessionKey.key(),
+            sessionKey.window().start(),
+            sessionKey.window().end()
         );
+
+        // Create new headers object to isolate delete operation from input 
record
+        final Headers newHeaders = oldAggregationWithHeaders != null
+            ? new RecordHeaders(headers(oldAggregationWithHeaders))
+            : new RecordHeaders(currentContext.headers());
+
+        // Create temporary context with new headers
+        final ProcessorRecordContext temporaryContext =
+            new ProcessorRecordContext(
+                currentContext.timestamp(),
+                currentContext.offset(),
+                currentContext.partition(),
+                currentContext.topic(),
+                newHeaders
+            );
+
+        internalContext.setRecordContext(temporaryContext);
+
+        try {
+            wrapped().put(sessionKey, null);

Review Comment:
   Wondering if using `remove(sessionKey)` would be nicer? Not sure if there is 
an actual difference?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreWithHeaders.java:
##########
@@ -43,29 +46,70 @@ public class ChangeLoggingSessionBytesStoreWithHeaders
 
     @Override
     public void remove(final Windowed<Bytes> sessionKey) {
-        wrapped().remove(sessionKey);
-        internalContext.logChange(
-            name(),
-            SessionKeySchema.toBinary(sessionKey),
-            null,
-            internalContext.recordContext().timestamp(),
-            internalContext.recordContext().headers(),
-            wrapped().getPosition()
-        );
+        handleDelete(sessionKey);
     }
 
     @Override
     public void put(final Windowed<Bytes> sessionKey, final byte[] 
aggregationWithHeaders) {
-        wrapped().put(sessionKey, aggregationWithHeaders);
-        internalContext.logChange(
-            name(),
-            SessionKeySchema.toBinary(sessionKey),
-            rawAggregation(aggregationWithHeaders),
-            internalContext.recordContext().timestamp(),
-            aggregationWithHeaders == null
-                ? internalContext.recordContext().headers()
-                : headers(aggregationWithHeaders),
-            wrapped().getPosition()
+        if (aggregationWithHeaders == null) {
+            // Deletion path (put with null) - use same logic as remove()
+            handleDelete(sessionKey);
+        } else {
+            // Normal put path
+            wrapped().put(sessionKey, aggregationWithHeaders);
+            internalContext.logChange(
+                name(),
+                SessionKeySchema.toBinary(sessionKey),
+                rawAggregation(aggregationWithHeaders),
+                internalContext.recordContext().timestamp(),
+                headers(aggregationWithHeaders),
+                wrapped().getPosition()
+            );
+        }
+    }
+
+    private void handleDelete(final Windowed<Bytes> sessionKey) {
+        final ProcessorRecordContext currentContext = 
internalContext.recordContext();
+
+        // Fetch old value to extract its headers (if exists)
+        final byte[] oldAggregationWithHeaders = wrapped().fetchSession(
+            sessionKey.key(),
+            sessionKey.window().start(),
+            sessionKey.window().end()
         );
+
+        // Create new headers object to isolate delete operation from input 
record
+        final Headers newHeaders = oldAggregationWithHeaders != null
+            ? new RecordHeaders(headers(oldAggregationWithHeaders))
+            : new RecordHeaders(currentContext.headers());
+
+        // Create temporary context with new headers
+        final ProcessorRecordContext temporaryContext =
+            new ProcessorRecordContext(
+                currentContext.timestamp(),
+                currentContext.offset(),
+                currentContext.partition(),
+                currentContext.topic(),
+                newHeaders
+            );
+
+        internalContext.setRecordContext(temporaryContext);
+
+        try {
+            wrapped().put(sessionKey, null);
+
+            // Log change - will use temporaryContext.headers()
+            internalContext.logChange(
+                name(),
+                SessionKeySchema.toBinary(sessionKey),

Review Comment:
   This call shows a problem in the current PR -- what we need to fix is 
that when the "user key" (ie, the `key` wrapped by `Windowed<>`) gets 
serialized, we need to pass headers into the serializer. Thus, we need the 
fix in the metered layer, which does the serialization. Here, we already have 
`Windowed<Bytes> sessionKey` at hand, and passing headers into `toBinary` 
doesn't make sense, as nothing really gets serialized any longer.
   
   All your code changes must go into the metered layer (plus passing the header 
into the key-serializer), and the changelogging layer can stay unmodified as it 
already does the right thing: it uses `internalContext.logChange(..., 
internalContext.recordContext().headers(),...` in `remove()` and also for 
the `put(key, null)` case.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreWithHeaders.java:
##########
@@ -43,29 +46,70 @@ public class ChangeLoggingSessionBytesStoreWithHeaders
 
     @Override
     public void remove(final Windowed<Bytes> sessionKey) {
-        wrapped().remove(sessionKey);
-        internalContext.logChange(
-            name(),
-            SessionKeySchema.toBinary(sessionKey),
-            null,
-            internalContext.recordContext().timestamp(),
-            internalContext.recordContext().headers(),
-            wrapped().getPosition()
-        );
+        handleDelete(sessionKey);
     }
 
     @Override
     public void put(final Windowed<Bytes> sessionKey, final byte[] 
aggregationWithHeaders) {
-        wrapped().put(sessionKey, aggregationWithHeaders);
-        internalContext.logChange(
-            name(),
-            SessionKeySchema.toBinary(sessionKey),
-            rawAggregation(aggregationWithHeaders),
-            internalContext.recordContext().timestamp(),
-            aggregationWithHeaders == null
-                ? internalContext.recordContext().headers()
-                : headers(aggregationWithHeaders),
-            wrapped().getPosition()
+        if (aggregationWithHeaders == null) {
+            // Deletion path (put with null) - use same logic as remove()
+            handleDelete(sessionKey);
+        } else {
+            // Normal put path
+            wrapped().put(sessionKey, aggregationWithHeaders);
+            internalContext.logChange(
+                name(),
+                SessionKeySchema.toBinary(sessionKey),
+                rawAggregation(aggregationWithHeaders),
+                internalContext.recordContext().timestamp(),
+                headers(aggregationWithHeaders),
+                wrapped().getPosition()
+            );
+        }
+    }
+
+    private void handleDelete(final Windowed<Bytes> sessionKey) {
+        final ProcessorRecordContext currentContext = 
internalContext.recordContext();
+
+        // Fetch old value to extract its headers (if exists)
+        final byte[] oldAggregationWithHeaders = wrapped().fetchSession(
+            sessionKey.key(),
+            sessionKey.window().start(),
+            sessionKey.window().end()
         );
+
+        // Create new headers object to isolate delete operation from input 
record
+        final Headers newHeaders = oldAggregationWithHeaders != null
+            ? new RecordHeaders(headers(oldAggregationWithHeaders))
+            : new RecordHeaders(currentContext.headers());

Review Comment:
   Interesting idea -- I originally thought it would be sufficient to just use 
`new RecordHeaders()`. But making a deep copy of the current context headers is 
neat. I like it.
   
   Also curious what others think.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreWithHeaders.java:
##########
@@ -43,29 +46,70 @@ public class ChangeLoggingSessionBytesStoreWithHeaders
 
     @Override
     public void remove(final Windowed<Bytes> sessionKey) {
-        wrapped().remove(sessionKey);
-        internalContext.logChange(
-            name(),
-            SessionKeySchema.toBinary(sessionKey),
-            null,
-            internalContext.recordContext().timestamp(),
-            internalContext.recordContext().headers(),
-            wrapped().getPosition()
-        );
+        handleDelete(sessionKey);
     }
 
     @Override
     public void put(final Windowed<Bytes> sessionKey, final byte[] 
aggregationWithHeaders) {
-        wrapped().put(sessionKey, aggregationWithHeaders);
-        internalContext.logChange(
-            name(),
-            SessionKeySchema.toBinary(sessionKey),
-            rawAggregation(aggregationWithHeaders),
-            internalContext.recordContext().timestamp(),
-            aggregationWithHeaders == null
-                ? internalContext.recordContext().headers()
-                : headers(aggregationWithHeaders),
-            wrapped().getPosition()
+        if (aggregationWithHeaders == null) {
+            // Deletion path (put with null) - use same logic as remove()
+            handleDelete(sessionKey);
+        } else {
+            // Normal put path
+            wrapped().put(sessionKey, aggregationWithHeaders);
+            internalContext.logChange(
+                name(),
+                SessionKeySchema.toBinary(sessionKey),
+                rawAggregation(aggregationWithHeaders),
+                internalContext.recordContext().timestamp(),
+                headers(aggregationWithHeaders),
+                wrapped().getPosition()
+            );
+        }
+    }
+
+    private void handleDelete(final Windowed<Bytes> sessionKey) {
+        final ProcessorRecordContext currentContext = 
internalContext.recordContext();
+
+        // Fetch old value to extract its headers (if exists)
+        final byte[] oldAggregationWithHeaders = wrapped().fetchSession(
+            sessionKey.key(),
+            sessionKey.window().start(),
+            sessionKey.window().end()
         );
+
+        // Create new headers object to isolate delete operation from input 
record
+        final Headers newHeaders = oldAggregationWithHeaders != null
+            ? new RecordHeaders(headers(oldAggregationWithHeaders))
+            : new RecordHeaders(currentContext.headers());
+
+        // Create temporary context with new headers
+        final ProcessorRecordContext temporaryContext =
+            new ProcessorRecordContext(
+                currentContext.timestamp(),
+                currentContext.offset(),
+                currentContext.partition(),
+                currentContext.topic(),
+                newHeaders
+            );
+
+        internalContext.setRecordContext(temporaryContext);
+
+        try {
+            wrapped().put(sessionKey, null);
+
+            // Log change - will use temporaryContext.headers()
+            internalContext.logChange(
+                name(),
+                SessionKeySchema.toBinary(sessionKey),
+                null,
+                temporaryContext.timestamp(),
+                temporaryContext.headers(),

Review Comment:
   ```suggestion
                   newHeaders,
   ```
   We can save a method call :) 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to