eduwercamacaro commented on code in PR #20403:
URL: https://github.com/apache/kafka/pull/20403#discussion_r2330605579
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##########
@@ -1104,11 +1104,18 @@ private void initializeTopology() {
// initialize the task by initializing all its processor nodes in the topology
log.trace("Initializing processor nodes of the topology");
for (final ProcessorNode<?, ?, ?, ?> node : topology.processors()) {
- processorContext.setCurrentNode(node);
+ final ProcessorRecordContext recordContext = new ProcessorRecordContext(
+ time.milliseconds(),
Review Comment:
You are right; if the init method runs for a long time, every record stored
during it will carry the same timestamp. That’s an excellent point.
A single timestamp captured once before init starts would not be accurate
for those records.
I like your idea of providing a function that returns the current timestamp,
but I think we should add a specialized class (say,
`InitProcessorRecordContext`) that extends `ProcessorRecordContext`. Normal
record processing would then be unaffected, because
`InitProcessorRecordContext` would be used only for the init method.
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##########
@@ -1104,11 +1104,18 @@ private void initializeTopology() {
// initialize the task by initializing all its processor nodes in the topology
log.trace("Initializing processor nodes of the topology");
for (final ProcessorNode<?, ?, ?, ?> node : topology.processors()) {
- processorContext.setCurrentNode(node);
+ final ProcessorRecordContext recordContext = new ProcessorRecordContext(
+ time.milliseconds(),
Review Comment:
I'm thinking of something like this:
```java
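public class InitProcessorRecordContext extends ProcessorRecordContext {
    // ....

    // Clock that is consulted on every timestamp() call.
    private final Time time;

    public InitProcessorRecordContext(final Time time) {
        // No real input record exists during init, so offset, partition
        // and topic are placeholders.
        super(time.milliseconds(), -1, -1, null, new RecordHeaders());
        this.time = time;
    }

    @Override
    public long timestamp() {
        // Return the current time instead of the value captured at construction.
        return time.milliseconds();
    }
}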
```
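To make the difference concrete, here is a minimal, self-contained sketch that does not depend on Kafka. `FixedContext` and `LiveContext` are hypothetical stand-ins for `ProcessorRecordContext` and the proposed `InitProcessorRecordContext`, and a `LongSupplier` stands in for `Time`; none of these names exist in the PR.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

public class InitTimestampSketch {

    /** Hypothetical stand-in for ProcessorRecordContext: timestamp is fixed at construction. */
    static class FixedContext {
        private final long timestamp;

        FixedContext(final long timestamp) {
            this.timestamp = timestamp;
        }

        long timestamp() {
            return timestamp;
        }
    }

    /** Hypothetical stand-in for InitProcessorRecordContext: re-reads the clock on every call. */
    static class LiveContext extends FixedContext {
        private final LongSupplier clock;

        LiveContext(final LongSupplier clock) {
            super(clock.getAsLong());
            this.clock = clock;
        }

        @Override
        long timestamp() {
            return clock.getAsLong();
        }
    }

    public static void main(final String[] args) {
        // Manually controlled clock so the example is deterministic.
        final AtomicLong now = new AtomicLong(1_000L);
        final LongSupplier clock = now::get;

        final FixedContext fixed = new FixedContext(clock.getAsLong());
        final FixedContext live = new LiveContext(clock);

        // Simulate an init method that takes five seconds.
        now.addAndGet(5_000L);

        System.out.println("fixed = " + fixed.timestamp()); // prints "fixed = 1000"
        System.out.println("live  = " + live.timestamp());  // prints "live  = 6000"
    }
}
```

Because the override lives in a subclass, code that uses the base context is untouched, which is the reason for preferring a specialized class here.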