mjsax commented on a change in pull request #10720: URL: https://github.com/apache/kafka/pull/10720#discussion_r634859119
########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java ########## @@ -141,16 +144,32 @@ public void close() { @Override public void init(final ProcessorContext context) { + internalProcessorContext = (InternalProcessorContext) context; parentGetter.init(context); valueTransformer.init(new ForwardingDisabledProcessorContext(context)); } @Override public ValueAndTimestamp<V1> get(final K key) { final ValueAndTimestamp<V> valueAndTimestamp = parentGetter.get(key); - return ValueAndTimestamp.make( + + final ProcessorRecordContext currentContext = internalProcessorContext.recordContext(); Review comment: This is the actual fix. Before calling `valueTransformer.transform()` we need to set a different record context to allow `Transform` to access the correct metadata of the record it processed. Before this fix, the context would contain metadata (in particular record timestamp) or the "currently processed record" that triggers the lookup. This breaks the applied `Transformer` if it accessed the record timestamp via `context.timestamp()` that is supposed to return the timestamp of the record `Transform` processes (ie, the timestamp store next to the value in the state store). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org