mjsax commented on a change in pull request #10720:
URL: https://github.com/apache/kafka/pull/10720#discussion_r634859119



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
##########
@@ -141,16 +144,32 @@ public void close() {
 
         @Override
         public void init(final ProcessorContext context) {
+            internalProcessorContext = (InternalProcessorContext) context;
             parentGetter.init(context);
             valueTransformer.init(new ForwardingDisabledProcessorContext(context));
         }
 
         @Override
         public ValueAndTimestamp<V1> get(final K key) {
             final ValueAndTimestamp<V> valueAndTimestamp = parentGetter.get(key);
-            return ValueAndTimestamp.make(
+
+            final ProcessorRecordContext currentContext = internalProcessorContext.recordContext();

Review comment:
       This is the actual fix. Before calling `valueTransformer.transform()` we 
need to set a different record context so that `Transform` can access the 
correct metadata of the record it processes.
   
   Before this fix, the context would contain the metadata (in particular the 
record timestamp) of the "currently processed record" that triggers the lookup. 
This breaks the applied `Transformer` if it accesses the record timestamp via 
`context.timestamp()`, which is supposed to return the timestamp of the record 
`Transform` processes (i.e., the timestamp stored next to the value in the state 
store).
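
   The pattern described above can be sketched in isolation. This is not the 
Kafka Streams code from the PR: `RecordContext`, `Context`, and 
`transformWithStoredTimestamp` are hypothetical stand-ins, and restoring the 
previous context in a `finally` block is an assumption about how the saved 
`currentContext` is meant to be used.

```java
import java.util.function.Function;

public class RecordContextSwapSketch {

    // Hypothetical stand-in for the record metadata; only the timestamp is modeled.
    static final class RecordContext {
        final long timestamp;

        RecordContext(final long timestamp) {
            this.timestamp = timestamp;
        }
    }

    // Hypothetical stand-in for a processor context that exposes the
    // metadata of whatever record context is currently set.
    static final class Context {
        private RecordContext recordContext;

        Context(final RecordContext initial) {
            this.recordContext = initial;
        }

        RecordContext recordContext() {
            return recordContext;
        }

        void setRecordContext(final RecordContext recordContext) {
            this.recordContext = recordContext;
        }

        long timestamp() {
            return recordContext.timestamp;
        }
    }

    // Swap in a context carrying the stored timestamp, run the transformer,
    // then restore the context of the record that triggered the lookup.
    static <V, R> R transformWithStoredTimestamp(final Context context,
                                                 final long storedTimestamp,
                                                 final V value,
                                                 final Function<V, R> transformer) {
        final RecordContext saved = context.recordContext();
        context.setRecordContext(new RecordContext(storedTimestamp));
        try {
            return transformer.apply(value);
        } finally {
            // Assumption: the caller's context must be visible again afterwards.
            context.setRecordContext(saved);
        }
    }

    public static void main(final String[] args) {
        // 500 is the timestamp of the record that triggers the lookup,
        // 100 is the timestamp stored next to the value in the state store.
        final Context context = new Context(new RecordContext(500L));
        final Long seenByTransformer =
            transformWithStoredTimestamp(context, 100L, "value", v -> context.timestamp());
        System.out.println("seen by transformer: " + seenByTransformer);
        System.out.println("after lookup: " + context.timestamp());
    }
}
```

   In this sketch the transformer observes the stored timestamp while it runs, 
and the triggering record's timestamp is visible again once the lookup returns.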





