mjsax commented on code in PR #21639:
URL: https://github.com/apache/kafka/pull/21639#discussion_r2934498413
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreWithHeaders.java:
##########
@@ -43,29 +46,70 @@ public class ChangeLoggingSessionBytesStoreWithHeaders
@Override
public void remove(final Windowed<Bytes> sessionKey) {
- wrapped().remove(sessionKey);
- internalContext.logChange(
- name(),
- SessionKeySchema.toBinary(sessionKey),
- null,
- internalContext.recordContext().timestamp(),
- internalContext.recordContext().headers(),
- wrapped().getPosition()
- );
+ handleDelete(sessionKey);
}
@Override
public void put(final Windowed<Bytes> sessionKey, final byte[]
aggregationWithHeaders) {
- wrapped().put(sessionKey, aggregationWithHeaders);
- internalContext.logChange(
- name(),
- SessionKeySchema.toBinary(sessionKey),
- rawAggregation(aggregationWithHeaders),
- internalContext.recordContext().timestamp(),
- aggregationWithHeaders == null
- ? internalContext.recordContext().headers()
- : headers(aggregationWithHeaders),
- wrapped().getPosition()
+ if (aggregationWithHeaders == null) {
+ // Deletion path (put with null) - use same logic as remove()
+ handleDelete(sessionKey);
+ } else {
+ // Normal put path
+ wrapped().put(sessionKey, aggregationWithHeaders);
+ internalContext.logChange(
+ name(),
+ SessionKeySchema.toBinary(sessionKey),
+ rawAggregation(aggregationWithHeaders),
+ internalContext.recordContext().timestamp(),
+ headers(aggregationWithHeaders),
+ wrapped().getPosition()
+ );
+ }
+ }
+
+ private void handleDelete(final Windowed<Bytes> sessionKey) {
+ final ProcessorRecordContext currentContext =
internalContext.recordContext();
+
+ // Fetch old value to extract its headers (if exists)
+ final byte[] oldAggregationWithHeaders = wrapped().fetchSession(
+ sessionKey.key(),
+ sessionKey.window().start(),
+ sessionKey.window().end()
);
+
+ // Create new headers object to isolate delete operation from input
record
+ final Headers newHeaders = oldAggregationWithHeaders != null
+ ? new RecordHeaders(headers(oldAggregationWithHeaders))
+ : new RecordHeaders(currentContext.headers());
+
+ // Create temporary context with new headers
+ final ProcessorRecordContext temporaryContext =
+ new ProcessorRecordContext(
+ currentContext.timestamp(),
+ currentContext.offset(),
+ currentContext.partition(),
+ currentContext.topic(),
+ newHeaders
+ );
+
+ internalContext.setRecordContext(temporaryContext);
Review Comment:
Should we move this inside the try-catch block?
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders.java:
##########
@@ -44,20 +46,59 @@ public class
ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders
super(inner);
}
+ @Override
+ public byte[] delete(final Bytes key) {
+ final ProcessorRecordContext currentContext =
internalContext.recordContext();
+ final byte[] oldValue = wrapped().get(key);
+
+ // Copy headers to the new headers object:
+ // - If old value exists, use its headers
+ // - Otherwise, use current context's headers
+ // Key doesn't exist - use current context headers
+ final RecordHeaders newHeaders = oldValue != null
+ ? new RecordHeaders(headers(oldValue))
+ : new RecordHeaders(currentContext.headers());
+
+ // Create temporary context with new headers to avoid polluting input
record's context
+ final ProcessorRecordContext temporaryContext =
+ new ProcessorRecordContext(
+ oldValue != null ? timestamp(oldValue) :
currentContext.timestamp(),
+ currentContext.offset(),
+ currentContext.partition(),
+ currentContext.topic(),
+ newHeaders
+ );
+
+ internalContext.setRecordContext(temporaryContext);
+
+ try {
+ final byte[] deletedValue = wrapped().delete(key);
+
+ // Log with null value - will use temporary.headers() which we
prepared above
+ log(key, null, temporaryContext.timestamp(),
temporaryContext.headers());
Review Comment:
```suggestion
log(key, null, temporaryContext.timestamp(), newHeaders);
```
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders.java:
##########
@@ -44,20 +46,59 @@ public class
ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders
super(inner);
}
+ @Override
+ public byte[] delete(final Bytes key) {
+ final ProcessorRecordContext currentContext =
internalContext.recordContext();
+ final byte[] oldValue = wrapped().get(key);
+
+ // Copy headers to the new headers object:
+ // - If old value exists, use its headers
+ // - Otherwise, use current context's headers
+ // Key doesn't exist - use current context headers
+ final RecordHeaders newHeaders = oldValue != null
+ ? new RecordHeaders(headers(oldValue))
+ : new RecordHeaders(currentContext.headers());
+
+ // Create temporary context with new headers to avoid polluting input
record's context
+ final ProcessorRecordContext temporaryContext =
+ new ProcessorRecordContext(
+ oldValue != null ? timestamp(oldValue) :
currentContext.timestamp(),
+ currentContext.offset(),
+ currentContext.partition(),
+ currentContext.topic(),
+ newHeaders
+ );
+
+ internalContext.setRecordContext(temporaryContext);
Review Comment:
move inside `try` -- I guess also applies to other code?
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreWithHeaders.java:
##########
@@ -43,29 +46,70 @@ public class ChangeLoggingSessionBytesStoreWithHeaders
@Override
public void remove(final Windowed<Bytes> sessionKey) {
- wrapped().remove(sessionKey);
- internalContext.logChange(
- name(),
- SessionKeySchema.toBinary(sessionKey),
- null,
- internalContext.recordContext().timestamp(),
- internalContext.recordContext().headers(),
- wrapped().getPosition()
- );
+ handleDelete(sessionKey);
}
@Override
public void put(final Windowed<Bytes> sessionKey, final byte[]
aggregationWithHeaders) {
- wrapped().put(sessionKey, aggregationWithHeaders);
- internalContext.logChange(
- name(),
- SessionKeySchema.toBinary(sessionKey),
- rawAggregation(aggregationWithHeaders),
- internalContext.recordContext().timestamp(),
- aggregationWithHeaders == null
- ? internalContext.recordContext().headers()
- : headers(aggregationWithHeaders),
- wrapped().getPosition()
+ if (aggregationWithHeaders == null) {
+ // Deletion path (put with null) - use same logic as remove()
+ handleDelete(sessionKey);
+ } else {
+ // Normal put path
+ wrapped().put(sessionKey, aggregationWithHeaders);
+ internalContext.logChange(
+ name(),
+ SessionKeySchema.toBinary(sessionKey),
+ rawAggregation(aggregationWithHeaders),
+ internalContext.recordContext().timestamp(),
+ headers(aggregationWithHeaders),
+ wrapped().getPosition()
+ );
+ }
+ }
+
+ private void handleDelete(final Windowed<Bytes> sessionKey) {
+ final ProcessorRecordContext currentContext =
internalContext.recordContext();
+
+ // Fetch old value to extract its headers (if exists)
+ final byte[] oldAggregationWithHeaders = wrapped().fetchSession(
Review Comment:
Not sure if we really want to get the headers from the old value? I think
rather not? I don't see the need semantically, and it makes the operation twice as
expensive.
Let's hear what others think.
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders.java:
##########
@@ -44,20 +46,59 @@ public class
ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders
super(inner);
}
+ @Override
+ public byte[] delete(final Bytes key) {
+ final ProcessorRecordContext currentContext =
internalContext.recordContext();
+ final byte[] oldValue = wrapped().get(key);
+
+ // Copy headers to the new headers object:
+ // - If old value exists, use its headers
+ // - Otherwise, use current context's headers
+ // Key doesn't exist - use current context headers
+ final RecordHeaders newHeaders = oldValue != null
+ ? new RecordHeaders(headers(oldValue))
+ : new RecordHeaders(currentContext.headers());
+
+ // Create temporary context with new headers to avoid polluting input
record's context
+ final ProcessorRecordContext temporaryContext =
+ new ProcessorRecordContext(
+ oldValue != null ? timestamp(oldValue) :
currentContext.timestamp(),
Review Comment:
I don't think we should fetch the old value, and can just use
`currentContext.timestamp()` (you also did it this way in
`ChangeLoggingSessionBytesStoreWithHeaders`)
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreWithHeaders.java:
##########
@@ -43,29 +46,70 @@ public class ChangeLoggingSessionBytesStoreWithHeaders
@Override
public void remove(final Windowed<Bytes> sessionKey) {
- wrapped().remove(sessionKey);
- internalContext.logChange(
- name(),
- SessionKeySchema.toBinary(sessionKey),
- null,
- internalContext.recordContext().timestamp(),
- internalContext.recordContext().headers(),
- wrapped().getPosition()
- );
+ handleDelete(sessionKey);
}
@Override
public void put(final Windowed<Bytes> sessionKey, final byte[]
aggregationWithHeaders) {
- wrapped().put(sessionKey, aggregationWithHeaders);
- internalContext.logChange(
- name(),
- SessionKeySchema.toBinary(sessionKey),
- rawAggregation(aggregationWithHeaders),
- internalContext.recordContext().timestamp(),
- aggregationWithHeaders == null
- ? internalContext.recordContext().headers()
- : headers(aggregationWithHeaders),
- wrapped().getPosition()
+ if (aggregationWithHeaders == null) {
+ // Deletion path (put with null) - use same logic as remove()
+ handleDelete(sessionKey);
+ } else {
+ // Normal put path
+ wrapped().put(sessionKey, aggregationWithHeaders);
+ internalContext.logChange(
+ name(),
+ SessionKeySchema.toBinary(sessionKey),
+ rawAggregation(aggregationWithHeaders),
+ internalContext.recordContext().timestamp(),
+ headers(aggregationWithHeaders),
+ wrapped().getPosition()
+ );
+ }
+ }
+
+ private void handleDelete(final Windowed<Bytes> sessionKey) {
+ final ProcessorRecordContext currentContext =
internalContext.recordContext();
+
+ // Fetch old value to extract its headers (if exists)
+ final byte[] oldAggregationWithHeaders = wrapped().fetchSession(
+ sessionKey.key(),
+ sessionKey.window().start(),
+ sessionKey.window().end()
);
+
+ // Create new headers object to isolate delete operation from input
record
+ final Headers newHeaders = oldAggregationWithHeaders != null
+ ? new RecordHeaders(headers(oldAggregationWithHeaders))
+ : new RecordHeaders(currentContext.headers());
+
+ // Create temporary context with new headers
+ final ProcessorRecordContext temporaryContext =
+ new ProcessorRecordContext(
+ currentContext.timestamp(),
+ currentContext.offset(),
+ currentContext.partition(),
+ currentContext.topic(),
+ newHeaders
+ );
+
+ internalContext.setRecordContext(temporaryContext);
+
+ try {
+ wrapped().put(sessionKey, null);
Review Comment:
Wondering if using `remove(sessionKey)` would be nicer? Not sure if there is
an actual difference?
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreWithHeaders.java:
##########
@@ -43,29 +46,70 @@ public class ChangeLoggingSessionBytesStoreWithHeaders
@Override
public void remove(final Windowed<Bytes> sessionKey) {
- wrapped().remove(sessionKey);
- internalContext.logChange(
- name(),
- SessionKeySchema.toBinary(sessionKey),
- null,
- internalContext.recordContext().timestamp(),
- internalContext.recordContext().headers(),
- wrapped().getPosition()
- );
+ handleDelete(sessionKey);
}
@Override
public void put(final Windowed<Bytes> sessionKey, final byte[]
aggregationWithHeaders) {
- wrapped().put(sessionKey, aggregationWithHeaders);
- internalContext.logChange(
- name(),
- SessionKeySchema.toBinary(sessionKey),
- rawAggregation(aggregationWithHeaders),
- internalContext.recordContext().timestamp(),
- aggregationWithHeaders == null
- ? internalContext.recordContext().headers()
- : headers(aggregationWithHeaders),
- wrapped().getPosition()
+ if (aggregationWithHeaders == null) {
+ // Deletion path (put with null) - use same logic as remove()
+ handleDelete(sessionKey);
+ } else {
+ // Normal put path
+ wrapped().put(sessionKey, aggregationWithHeaders);
+ internalContext.logChange(
+ name(),
+ SessionKeySchema.toBinary(sessionKey),
+ rawAggregation(aggregationWithHeaders),
+ internalContext.recordContext().timestamp(),
+ headers(aggregationWithHeaders),
+ wrapped().getPosition()
+ );
+ }
+ }
+
+ private void handleDelete(final Windowed<Bytes> sessionKey) {
+ final ProcessorRecordContext currentContext =
internalContext.recordContext();
+
+ // Fetch old value to extract its headers (if exists)
+ final byte[] oldAggregationWithHeaders = wrapped().fetchSession(
+ sessionKey.key(),
+ sessionKey.window().start(),
+ sessionKey.window().end()
);
+
+ // Create new headers object to isolate delete operation from input
record
+ final Headers newHeaders = oldAggregationWithHeaders != null
+ ? new RecordHeaders(headers(oldAggregationWithHeaders))
+ : new RecordHeaders(currentContext.headers());
+
+ // Create temporary context with new headers
+ final ProcessorRecordContext temporaryContext =
+ new ProcessorRecordContext(
+ currentContext.timestamp(),
+ currentContext.offset(),
+ currentContext.partition(),
+ currentContext.topic(),
+ newHeaders
+ );
+
+ internalContext.setRecordContext(temporaryContext);
+
+ try {
+ wrapped().put(sessionKey, null);
+
+ // Log change - will use temporaryContext.headers()
+ internalContext.logChange(
+ name(),
+ SessionKeySchema.toBinary(sessionKey),
Review Comment:
This call shows a problem in the current PR -- what we need to fix is,
that when the "user key" (ie, the `key` wrapped by `Windowed<>`) gets
serialized, we need to pass headers into the serializer. Thus, we need the
fix in the metered layer, which does the serialization. Here, we already have
`Windowed<Bytes> sessionKey` at hand, and passing headers into `toBinary`
doesn't make sense, as nothing gets really serialized any longer.
All your code changes must go into the metered layer (plus passing the header
into the key-serializer), and the changelogging layer can stay unmodified as it
already does the right thing: it uses `internalContext.logChange(...,
internalContext.recordContext().headers(), ...)` in `remove()` and also for the
`put(key, null)` case.
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreWithHeaders.java:
##########
@@ -43,29 +46,70 @@ public class ChangeLoggingSessionBytesStoreWithHeaders
@Override
public void remove(final Windowed<Bytes> sessionKey) {
- wrapped().remove(sessionKey);
- internalContext.logChange(
- name(),
- SessionKeySchema.toBinary(sessionKey),
- null,
- internalContext.recordContext().timestamp(),
- internalContext.recordContext().headers(),
- wrapped().getPosition()
- );
+ handleDelete(sessionKey);
}
@Override
public void put(final Windowed<Bytes> sessionKey, final byte[]
aggregationWithHeaders) {
- wrapped().put(sessionKey, aggregationWithHeaders);
- internalContext.logChange(
- name(),
- SessionKeySchema.toBinary(sessionKey),
- rawAggregation(aggregationWithHeaders),
- internalContext.recordContext().timestamp(),
- aggregationWithHeaders == null
- ? internalContext.recordContext().headers()
- : headers(aggregationWithHeaders),
- wrapped().getPosition()
+ if (aggregationWithHeaders == null) {
+ // Deletion path (put with null) - use same logic as remove()
+ handleDelete(sessionKey);
+ } else {
+ // Normal put path
+ wrapped().put(sessionKey, aggregationWithHeaders);
+ internalContext.logChange(
+ name(),
+ SessionKeySchema.toBinary(sessionKey),
+ rawAggregation(aggregationWithHeaders),
+ internalContext.recordContext().timestamp(),
+ headers(aggregationWithHeaders),
+ wrapped().getPosition()
+ );
+ }
+ }
+
+ private void handleDelete(final Windowed<Bytes> sessionKey) {
+ final ProcessorRecordContext currentContext =
internalContext.recordContext();
+
+ // Fetch old value to extract its headers (if exists)
+ final byte[] oldAggregationWithHeaders = wrapped().fetchSession(
+ sessionKey.key(),
+ sessionKey.window().start(),
+ sessionKey.window().end()
);
+
+ // Create new headers object to isolate delete operation from input
record
+ final Headers newHeaders = oldAggregationWithHeaders != null
+ ? new RecordHeaders(headers(oldAggregationWithHeaders))
+ : new RecordHeaders(currentContext.headers());
Review Comment:
Interesting idea -- I originally thought it would be sufficient to just use
`new RecordHeaders()`. But making a deep copy of the current context headers is
neat. I like it.
Also curious what others think.
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreWithHeaders.java:
##########
@@ -43,29 +46,70 @@ public class ChangeLoggingSessionBytesStoreWithHeaders
@Override
public void remove(final Windowed<Bytes> sessionKey) {
- wrapped().remove(sessionKey);
- internalContext.logChange(
- name(),
- SessionKeySchema.toBinary(sessionKey),
- null,
- internalContext.recordContext().timestamp(),
- internalContext.recordContext().headers(),
- wrapped().getPosition()
- );
+ handleDelete(sessionKey);
}
@Override
public void put(final Windowed<Bytes> sessionKey, final byte[]
aggregationWithHeaders) {
- wrapped().put(sessionKey, aggregationWithHeaders);
- internalContext.logChange(
- name(),
- SessionKeySchema.toBinary(sessionKey),
- rawAggregation(aggregationWithHeaders),
- internalContext.recordContext().timestamp(),
- aggregationWithHeaders == null
- ? internalContext.recordContext().headers()
- : headers(aggregationWithHeaders),
- wrapped().getPosition()
+ if (aggregationWithHeaders == null) {
+ // Deletion path (put with null) - use same logic as remove()
+ handleDelete(sessionKey);
+ } else {
+ // Normal put path
+ wrapped().put(sessionKey, aggregationWithHeaders);
+ internalContext.logChange(
+ name(),
+ SessionKeySchema.toBinary(sessionKey),
+ rawAggregation(aggregationWithHeaders),
+ internalContext.recordContext().timestamp(),
+ headers(aggregationWithHeaders),
+ wrapped().getPosition()
+ );
+ }
+ }
+
+ private void handleDelete(final Windowed<Bytes> sessionKey) {
+ final ProcessorRecordContext currentContext =
internalContext.recordContext();
+
+ // Fetch old value to extract its headers (if exists)
+ final byte[] oldAggregationWithHeaders = wrapped().fetchSession(
+ sessionKey.key(),
+ sessionKey.window().start(),
+ sessionKey.window().end()
);
+
+ // Create new headers object to isolate delete operation from input
record
+ final Headers newHeaders = oldAggregationWithHeaders != null
+ ? new RecordHeaders(headers(oldAggregationWithHeaders))
+ : new RecordHeaders(currentContext.headers());
+
+ // Create temporary context with new headers
+ final ProcessorRecordContext temporaryContext =
+ new ProcessorRecordContext(
+ currentContext.timestamp(),
+ currentContext.offset(),
+ currentContext.partition(),
+ currentContext.topic(),
+ newHeaders
+ );
+
+ internalContext.setRecordContext(temporaryContext);
+
+ try {
+ wrapped().put(sessionKey, null);
+
+ // Log change - will use temporaryContext.headers()
+ internalContext.logChange(
+ name(),
+ SessionKeySchema.toBinary(sessionKey),
+ null,
+ temporaryContext.timestamp(),
+ temporaryContext.headers(),
Review Comment:
```suggestion
newHeaders,
```
We can save a method call :)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]