mjsax commented on code in PR #20403:
URL: https://github.com/apache/kafka/pull/20403#discussion_r2299384167
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##########
@@ -1104,11 +1104,18 @@ private void initializeTopology() {
// initialize the task by initializing all its processor nodes in the
topology
log.trace("Initializing processor nodes of the topology");
for (final ProcessorNode<?, ?, ?, ?> node : topology.processors()) {
- processorContext.setCurrentNode(node);
+ final ProcessorRecordContext recordContext = new
ProcessorRecordContext(
+ time.milliseconds(),
Review Comment:
Not sure if this would be what we want? We would set a single TS, and if
`init()` runs for a longer period of time, all records would get the same TS?
We might need to change `ProcessorRecordContext` to have a "time function"
instead of a hard-coded value? For a record at hand, the time function would be
`() -> record.timestamp()` and return the same value each time, and for this
dummy context, we would use `() -> time.milliseconds()`.
But need to think about this more, and figure out what design we really want
to get to, and what the splash radius of changing `ProcessorRecordContext`
would be :)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]