mjsax commented on a change in pull request #9744: URL: https://github.com/apache/kafka/pull/9744#discussion_r570716520
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
##########
@@ -45,7 +45,6 @@
     private boolean initialized;
     protected ProcessorRecordContext recordContext;
     protected ProcessorNode<?, ?, ?, ?> currentNode;
-    private long currentSystemTimeMs;

Review comment:
The main motivation for adding `ProcessorContext#currentSystemTime()` was to be able to return the mocked wall-clock time in `TopologyTestDriver`. Even if we return the cached time from `AbstractProcessorContext`, we can still return the mocked time, because `TopologyTestDriver` updates the cached time from the mocked time.

- `InternalMockProcessorContext` is just for our own unit testing -- it's fine to add the new `Time` field; it's not a public-facing change anyway.
- Originally we changed `ProcessorContextImpl` because we removed `AbstractProcessorContext#currentSystemTime()` -- as suggested by Guozhang, we should keep the cached time in `AbstractProcessorContext`, and thus we don't need `ProcessorContextImpl#currentSystemTime()` any longer.
- `GlobalProcessorContextImpl` is a different code path, and thus the changes of this PR are fine.

> If we add new cachedSystemTimeMs field in AbstractProcessorContext, when do you want to return this field?

Yes, we want to return this field. We get this behavior by adding back `AbstractProcessorContext#currentSystemTime()` (and the cached time in this class) and removing `ProcessorContextImpl#currentSystemTime()`.

> Are earlier changes not valid to return time from StreamTask?

Yes, I think we can revert all changes from `StreamTask`.

Does this make sense?
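
To illustrate the point about the cached time, here is a minimal, self-contained sketch. It is not the actual Kafka code: every class and method name in it (`SketchMockClock`, `SketchAbstractContext`, `SketchContextImpl`, `SketchTestDriver`, `setSystemTimeMs`) is a hypothetical stand-in, and it assumes only what is described above, namely that the abstract context holds the cached time and that the test driver refreshes it from its mocked clock.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a mockable wall clock (not Kafka's own mock time class).
final class SketchMockClock {
    private final AtomicLong nowMs;

    SketchMockClock(final long startMs) {
        this.nowMs = new AtomicLong(startMs);
    }

    long milliseconds() {
        return nowMs.get();
    }

    void sleep(final long ms) {
        nowMs.addAndGet(ms);
    }
}

// Hypothetical stand-in for the abstract context: it owns the cached time,
// so subclasses do not need their own override of the accessor.
abstract class SketchAbstractContext {
    private long cachedSystemTimeMs;

    void setSystemTimeMs(final long timeMs) {
        cachedSystemTimeMs = timeMs;
    }

    long currentSystemTimeMs() {
        return cachedSystemTimeMs;
    }
}

// Hypothetical concrete context: inherits the accessor unchanged.
final class SketchContextImpl extends SketchAbstractContext { }

// Hypothetical stand-in for the test driver: advancing the mocked clock
// also refreshes the cached time held by the context.
final class SketchTestDriver {
    final SketchMockClock clock = new SketchMockClock(0L);
    final SketchAbstractContext context = new SketchContextImpl();

    void advanceWallClockTime(final long ms) {
        clock.sleep(ms);
        context.setSystemTimeMs(clock.milliseconds());
    }
}

class CachedTimeSketch {
    public static void main(final String[] args) {
        final SketchTestDriver driver = new SketchTestDriver();
        driver.advanceWallClockTime(100L);
        // The context reports the mocked time, not the real system clock.
        System.out.println(driver.context.currentSystemTimeMs());
    }
}
```

Because the accessor lives in the abstract class, the concrete context needs no override, which is why `ProcessorContextImpl#currentSystemTime()` becomes unnecessary under this approach.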