vvcephei commented on a change in pull request #10720:
URL: https://github.com/apache/kafka/pull/10720#discussion_r635592113
##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
##########
@@ -141,16 +144,32 @@ public void close() {
@Override
public void init(final ProcessorContext context) {
+ internalProcessorContext = (InternalProcessorContext) context;
parentGetter.init(context);
valueTransformer.init(new ForwardingDisabledProcessorContext(context));
}
@Override
public ValueAndTimestamp<V1> get(final K key) {
final ValueAndTimestamp<V> valueAndTimestamp = parentGetter.get(key);
- return ValueAndTimestamp.make(
+
+ final ProcessorRecordContext currentContext = internalProcessorContext.recordContext();
+
+ internalProcessorContext.setRecordContext(new ProcessorRecordContext(
Review comment:
Thanks, all. I share the concern that this change could suddenly break
something. Since the transformer has access to the ProcessorContext, people
may well be accessing these members within the `transform` method. For example,
a transformer may suddenly start to get an NPE on the topic name, and so forth.
This behavior change might be ok on the grounds that the context we
previously provided was actually incorrect, though. If we do go ahead with this
approach, we should be sure to note this potential breakage in the upgrade
notes. It wouldn't hurt to have a ticket as well, for visibility's sake.
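
To make the NPE concern concrete, here is a minimal sketch of how a transformer could trip over a missing topic name, and one defensive way to write it. Everything in it is a hypothetical stand-in: `RecordMetadataView` and `TopicTaggingTransformer` are not Kafka Streams types, and it assumes the topic is simply `null` on the lookup path, which is only what the concern above suggests.

```java
import java.util.Optional;

// Hypothetical stand-in for the record metadata a transformer reads from its
// context. This is NOT the Kafka Streams ProcessorContext interface.
interface RecordMetadataView {
    String topic();     // assumed to be null when no source record is available
    long timestamp();
}

// Hypothetical transformer that prefixes each value with its source topic.
final class TopicTaggingTransformer {
    private final RecordMetadataView context;

    TopicTaggingTransformer(final RecordMetadataView context) {
        this.context = context;
    }

    // Unsafe: calls a method on topic(), so a null topic throws
    // a NullPointerException.
    String transformUnsafe(final String value) {
        return context.topic().concat(":").concat(value);
    }

    // Defensive: substitutes a placeholder when the topic is absent.
    String transformSafe(final String value) {
        final String topic = Optional.ofNullable(context.topic()).orElse("<unknown>");
        return topic + ":" + value;
    }
}
```

A user whose `transform` looks like `transformUnsafe` would see the failure only after upgrading, which is why the upgrade notes seem like the right place to flag it.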
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]