[GitHub] [kafka] mjsax commented on a change in pull request #9744: KAFKA-10062: Add a method to retrieve the current timestamp as known by the Streams app
mjsax commented on a change in pull request #9744: URL: https://github.com/apache/kafka/pull/9744#discussion_r564992729 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java ## @@ -289,4 +291,36 @@ Cancellable schedule(final Duration interval, */ Map appConfigsWithPrefix(final String prefix); +/** + * Return the current system timestamp (also called wall-clock time) in milliseconds. + * + * + * Note: this method returns the internally cached system timestamp from the Kafka Stream runtime. + * Thus, it may return a different value compared to `System.currentTimeMillis()`. Review comment: nit: ``` `System.currentTimeMillis()` -> {@code System.currentTimeMillis()} or: {link System#currentTimeMillis()} ``` ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java ## @@ -45,7 +45,6 @@ private boolean initialized; protected ProcessorRecordContext recordContext; protected ProcessorNode currentNode; -private long currentSystemTimeMs; Review comment: @rohitrmd did you see Guozhang comment? Seems we should not remove this variable (what is a little unfortunate, but I guess desirable behavior) -- maybe we can rename it to `cachedSystemTimeMs`? I guess we want the caching for the `GlobalProcessorContext`, too? (For this case, we would need to update the JavaDocs of the `ProcessorContext` interface accordingly.) We should also add a test for the corresponding contest-test classes that they don't advance system time automatically, but only in the advance is triggered explicitly. ## File path: streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java ## @@ -55,14 +55,9 @@ public MockInternalProcessorContext(final Properties config, final TaskId taskId super(config, taskId, stateDir); } -@Override -public void setSystemTimeMs(long timeMs) { Review comment: I guess a follow up PR works, too. ## File path: streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java ## @@ -289,4 +291,36 @@ Cancellable schedule(final Duration interval, */ Map appConfigsWithPrefix(final String prefix); +/** + * Return the current system timestamp (also called wall-clock time) in milliseconds. + * + * + * Note: this method returns the internally cached system timestamp from the Kafka Stream runtime. + * Thus, it may return a different value compared to `System.currentTimeMillis()`. + * + * + * For a global processor, Kafka Streams does not cache system time and thus calling this method will return + * the same value as `System.currentTimeMillis()`. Review comment: as above ## File path: streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java ## @@ -289,4 +291,36 @@ Cancellable schedule(final Duration interval, */ Map appConfigsWithPrefix(final String prefix); +/** + * Return the current system timestamp (also called wall-clock time) in milliseconds. + * + * + * Note: this method returns the internally cached system timestamp from the Kafka Stream runtime. + * Thus, it may return a different value compared to `System.currentTimeMillis()`. + * + * + * For a global processor, Kafka Streams does not cache system time and thus calling this method will return + * the same value as `System.currentTimeMillis()`. + * + * @return the current system timestamp in milliseconds + */ +long currentSystemTimeMs(); + +/** + * Return the current stream-time in milliseconds. + * + * + * Stream-time is the maximum observed {@link TimestampExtractor record timestamp} so far + * (including the currently processed record), i.e., it can be considered a high-watermark. + * Stream-time is tracked on a per-task basis and is preserved across restarts and during task migration. + * + * + * Note: this method is not supported for global processors (cf. {@link Topology#addGlobalStore} (...) + * and {@link StreamsBuilder#addGlobalStore} (...), + * because there is no concept of stream-time for this case. + * Calling this method in a global processor with result in an {@link UnsupportedOperationException}. Review comment: typo: `with` -> `will` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a change in pull request #9744: KAFKA-10062: Add a method to retrieve the current timestamp as known by the Streams app
mjsax commented on a change in pull request #9744: URL: https://github.com/apache/kafka/pull/9744#discussion_r568389885 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java ## @@ -45,7 +45,6 @@ private boolean initialized; protected ProcessorRecordContext recordContext; protected ProcessorNode currentNode; -private long currentSystemTimeMs; Review comment: If there is an advantage to add the method to the `InternalProcessorContext ` interface, sure why not. Don't think it's a big difference, and because it all internal code, we can change it anytime in the future. I think we should always return the cached time. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a change in pull request #9744: KAFKA-10062: Add a method to retrieve the current timestamp as known by the Streams app
mjsax commented on a change in pull request #9744: URL: https://github.com/apache/kafka/pull/9744#discussion_r568389885 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java ## @@ -45,7 +45,6 @@ private boolean initialized; protected ProcessorRecordContext recordContext; protected ProcessorNode currentNode; -private long currentSystemTimeMs; Review comment: If there is an advantage to add the method to the `InternalProcessorContext ` interface, sure why not. Don't think it's a big difference, and because it all internal code, we can change it anytime in the future. I think we should always return the cached time. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a change in pull request #9744: KAFKA-10062: Add a method to retrieve the current timestamp as known by the Streams app
mjsax commented on a change in pull request #9744: URL: https://github.com/apache/kafka/pull/9744#discussion_r570716520 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java ## @@ -45,7 +45,6 @@ private boolean initialized; protected ProcessorRecordContext recordContext; protected ProcessorNode currentNode; -private long currentSystemTimeMs; Review comment: The main motivation to add `ProcessorContext#currentSystemTime()` was to be able to return the mocked wall-clock time in `TopologyTestDriver`. Even if we return the cached time from `AbstractProcessorContext`, we will be able to return the mocked time, as we update the cached time based on the mocked time in `TopologyTestDriver`. - `InternalMockProcessorContext` is just for our own unit testing -- it's fine to add the new `Time` field, it's not a public facing change anyway - Originally we changed `ProcessorContextImpl` because we remove `AbstractProcessorContext#currentSystemTime()` -- as suggested by Guozhang, we should keep the cached time in `AbstractProcessorContext()`, and thus we don't need `ProcessorContextImpl#currentSystemTime()` any longer. - `GlobalProcessorContextImpl` is a different code path, and thus the changes of this PR are fine > If we add new cachedSystemTimeMs field in AbstractProcessorContext, when do you want to return this field? Yes, we want to return this field. (We get this behaviro by adding back `AbstractProcessorContext#currentSystemTime()` (and the cached time in this class) and removing `ProcessorContextImpl#currentSystemTime()`. > Are earlier changes not valid to return time from StreamTask? Yes, I think we can revert all changes from `StreamTask`. Does this make sense? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a change in pull request #9744: KAFKA-10062: Add a method to retrieve the current timestamp as known by the Streams app
mjsax commented on a change in pull request #9744: URL: https://github.com/apache/kafka/pull/9744#discussion_r570716520 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java ## @@ -45,7 +45,6 @@ private boolean initialized; protected ProcessorRecordContext recordContext; protected ProcessorNode currentNode; -private long currentSystemTimeMs; Review comment: The main motivation to add `ProcessorContext#currentSystemTime()` was to be able to return the mocked wall-clock time in `TopologyTestDriver`. Even if we return the cached time from `AbstractProcessorContext`, we will be able to return the mocked time, as we update the cached time based on the mocked time in `TopologyTestDriver`. - `InternalMockProcessorContext` is just for our own unit testing -- it's fine to add the new `Time` field, it's not a public facing change anyway - Originally we changed `ProcessorContextImpl` because we remove `AbstractProcessorContext#currentSystemTime()` -- as suggested by Guozhang, we should keep the cached time in `AbstractProcessorContext()`, and thus we don't need `ProcessorContextImpl#currentSystemTime()` any longer. - `GlobalProcessorContextImpl` is a different code path, and thus the changes of this PR are fine > If we add new cachedSystemTimeMs field in AbstractProcessorContext, when do you want to return this field? Yes, we want to return this field. (We get this behavior by adding back `AbstractProcessorContext#currentSystemTime()` (and the cached time in this class) and removing `ProcessorContextImpl#currentSystemTime()`. > Are earlier changes not valid to return time from StreamTask? Yes, I think we can revert all changes from `StreamTask`. Does this make sense? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a change in pull request #9744: KAFKA-10062: Add a method to retrieve the current timestamp as known by the Streams app
mjsax commented on a change in pull request #9744: URL: https://github.com/apache/kafka/pull/9744#discussion_r570716520 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java ## @@ -45,7 +45,6 @@ private boolean initialized; protected ProcessorRecordContext recordContext; protected ProcessorNode currentNode; -private long currentSystemTimeMs; Review comment: The main motivation to add `ProcessorContext#currentSystemTime()` was to be able to return the mocked wall-clock time in `TopologyTestDriver`. Even if we return the cached time from `AbstractProcessorContext`, we will be able to return the mocked time, as we update the cached time based on the mocked time in `TopologyTestDriver`. - `InternalMockProcessorContext` is just for our own unit testing -- it's fine to add the new `Time` field, it's not a public facing change anyway - Originally we changed `ProcessorContextImpl` because we remove `AbstractProcessorContext#currentSystemTime()` -- as suggested by Guozhang, we should keep the cached time in `AbstractProcessorContext()`, and thus we don't need `ProcessorContextImpl#currentSystemTime()` any longer. - `GlobalProcessorContextImpl` is a different code path, and thus the changes of this PR are fine > If we add new cachedSystemTimeMs field in AbstractProcessorContext, when do you want to return this field? Yes, we want to return this field. (We get this behaviro by adding back `AbstractProcessorContext#currentSystemTime()` (and the cached time in this class) and removing `ProcessorContextImpl#currentSystemTime()`. > Are earlier changes not valid to return time from StreamTask? Yes, I think we can revert all changes from `StreamTask`. Does this make sense? ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java ## @@ -45,7 +45,6 @@ private boolean initialized; protected ProcessorRecordContext recordContext; protected ProcessorNode currentNode; -private long currentSystemTimeMs; Review comment: The main motivation to add `ProcessorContext#currentSystemTime()` was to be able to return the mocked wall-clock time in `TopologyTestDriver`. Even if we return the cached time from `AbstractProcessorContext`, we will be able to return the mocked time, as we update the cached time based on the mocked time in `TopologyTestDriver`. - `InternalMockProcessorContext` is just for our own unit testing -- it's fine to add the new `Time` field, it's not a public facing change anyway - Originally we changed `ProcessorContextImpl` because we remove `AbstractProcessorContext#currentSystemTime()` -- as suggested by Guozhang, we should keep the cached time in `AbstractProcessorContext()`, and thus we don't need `ProcessorContextImpl#currentSystemTime()` any longer. - `GlobalProcessorContextImpl` is a different code path, and thus the changes of this PR are fine > If we add new cachedSystemTimeMs field in AbstractProcessorContext, when do you want to return this field? Yes, we want to return this field. (We get this behavior by adding back `AbstractProcessorContext#currentSystemTime()` (and the cached time in this class) and removing `ProcessorContextImpl#currentSystemTime()`. > Are earlier changes not valid to return time from StreamTask? Yes, I think we can revert all changes from `StreamTask`. Does this make sense? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a change in pull request #9744: KAFKA-10062: Add a method to retrieve the current timestamp as known by the Streams app
mjsax commented on a change in pull request #9744: URL: https://github.com/apache/kafka/pull/9744#discussion_r586727033 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java ## @@ -289,4 +291,33 @@ Cancellable schedule(final Duration interval, */ Map appConfigsWithPrefix(final String prefix); +/** + * Return the current system timestamp (also called wall-clock time) in milliseconds. + * + * + * Note: this method returns the internally cached system timestamp from the Kafka Stream runtime. + * Thus, it may return a different value compared to {@code System.currentTimeMillis()} . + * Review comment: nit. can be removed ## File path: streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java ## @@ -353,11 +371,19 @@ public void setHeaders(final Headers headers) { * The context exposes this metadata for use in the processor. Normally, they are set by the Kafka Streams framework, * but for the purpose of driving unit tests, you can set it directly. Setting this attribute doesn't affect the others. * - * @param timestamp A record timestamp + * @param recordTimestamp A record timestamp */ @SuppressWarnings({"WeakerAccess", "unused"}) -public void setTimestamp(final long timestamp) { -this.timestamp = timestamp; +public void setRecordTimestamp(final long recordTimestamp) { +this.recordTimestamp = recordTimestamp; +} + +public void setCurrentSystemTimeMs(final long currentSystemTimeMs) { +this.currentSystemTimeMs = currentSystemTimeMs; +} + +public void setCurrentStreamTimeMs(final long currentStreamTimeMs) { +this.currentStreamTimeMs = currentStreamTimeMs; Review comment: This class is public API, so we cannot remove `setTimestamp` but can only deprecate it. We also need to update the KIP to mention the deprecation and the newly added methods. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a change in pull request #9744: KAFKA-10062: Add a method to retrieve the current timestamp as known by the Streams app
mjsax commented on a change in pull request #9744: URL: https://github.com/apache/kafka/pull/9744#discussion_r590766321 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java ## @@ -289,4 +291,33 @@ Cancellable schedule(final Duration interval, */ Map appConfigsWithPrefix(final String prefix); +/** + * Return the current system timestamp (also called wall-clock time) in milliseconds. + * + * + * Note: this method returns the internally cached system timestamp from the Kafka Stream runtime. + * Thus, it may return a different value compared to {@code System.currentTimeMillis()} Review comment: ```suggestion * Thus, it may return a different value compared to {@code System.currentTimeMillis()}. ``` ## File path: streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java ## @@ -289,4 +291,33 @@ Cancellable schedule(final Duration interval, */ Map appConfigsWithPrefix(final String prefix); +/** + * Return the current system timestamp (also called wall-clock time) in milliseconds. + * + * + * Note: this method returns the internally cached system timestamp from the Kafka Stream runtime. + * Thus, it may return a different value compared to {@code System.currentTimeMillis()} + * Review comment: ```suggestion ``` ## File path: streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java ## @@ -354,10 +372,31 @@ public void setHeaders(final Headers headers) { * but for the purpose of driving unit tests, you can set it directly. Setting this attribute doesn't affect the others. * * @param timestamp A record timestamp + * @deprecated Use {@link MockProcessorContext#setRecordTimestamp(long)} instead. Review comment: ```suggestion * @deprecated Since 3.0.0; use {@link MockProcessorContext#setRecordTimestamp(long)} instead. ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a change in pull request #9744: KAFKA-10062: Add a method to retrieve the current timestamp as known by the Streams app
mjsax commented on a change in pull request #9744: URL: https://github.com/apache/kafka/pull/9744#discussion_r549546234 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java ## @@ -289,4 +289,17 @@ Cancellable schedule(final Duration interval, */ Map appConfigsWithPrefix(final String prefix); +/** + * Returns current cached wall-clock system timestamp in milliseconds. Review comment: nit: `Return` without `s` -- we use imperative to write JavaDocs. I would remove `cached` and add a dedicated second sentence about it. Also `wall-clock time` and `system time` are synonymous and thus `wall-clock system time` sounds a little odd. What about: ``` Return the current system timestamp (also called wall-clock time) in milliseconds. Note: this method returns the internally cached system timestamp from the Kafka Stream runtime. Thus, it may return a different value compared to `System.currentTimeMillis()`. For a global processor, Kafka Streams does not cache system time and thus calling this method will return the same value as `System.currentTimeMillis()`. @return the current system timestamp in milliseconds ``` ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java ## @@ -166,4 +166,14 @@ public long timestamp() { public Map appConfigsWithPrefix(final String prefix) { return delegate.appConfigsWithPrefix(prefix); } + +@Override +public long currentSystemTimeMs() { +throw new UnsupportedOperationException("this method is not supported in ForwardingDisabledProcessor context"); Review comment: Why do we through here? It seems to be safe to get the time from `delegate` object? We only disable `forward` but nothing else. (same below) ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java ## @@ -115,6 +116,16 @@ public void commit() { //no-op } +@Override +public long currentSystemTimeMs() { +return Time.SYSTEM.milliseconds(); +} + +@Override +public long currentStreamTimeMs() { +throw new UnsupportedOperationException("this method is not supported in global processor context."); Review comment: This makes sense, but we might want to document this somewhere else, ie, in the corresponding docs. To be fair, not sure atm, if we have much content about it and/or where it add it? We should for sure document it in the JavaDocs, ie, the `ProcessorContext` interface. (cf my comment above) About the error message: ``` throw new UnsupportedOperationException("There is no concept of stream-time for a global processor."); ``` ## File path: streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java ## @@ -289,4 +289,17 @@ Cancellable schedule(final Duration interval, */ Map appConfigsWithPrefix(final String prefix); +/** + * Returns current cached wall-clock system timestamp in milliseconds. + * + * @return the current cached wall-clock system timestamp in milliseconds + */ +long currentSystemTimeMs(); + +/** + * Returns the maximum timestamp of any record yet processed by the task. Review comment: What about: ``` Return the current stream-time in milliseconds. Stream-time is the maximum observed {@link TimestampExtractor record timestamp} so far (including the currently processed record), i.e., it can be considered a high-watermark. Stream-time is tracked on a per-task basis and is preserved across restarts and during task migration. Note: this method is not supported for global processors (cf. {@link Topology#addGlobalStore(...)} and {@link StreamsBuilder#addGlobalStore(...)}, because there is no concept of stream-time for this case. Calling this method in a global processor with result in an {@link UnsupportedOperationException}. @return the current stream-time in milliseconds ``` ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreToProcessorContextAdapter.java ## @@ -160,4 +160,14 @@ public long timestamp() { public Map appConfigsWithPrefix(final String prefix) { return delegate.appConfigsWithPrefix(prefix); } + +@Override +public long currentSystemTimeMs() { +throw new UnsupportedOperationException("this method is not supported in StoreToProcessorContextAdapter"); Review comment: We should align the error message to the ones from above: `"StateStores can't access system time."` (same below). ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ## @@ -,13 +,21 @@ RecordCollector recordCollector() {
[GitHub] [kafka] mjsax commented on a change in pull request #9744: KAFKA-10062: Add a method to retrieve the current timestamp as known by the Streams app
mjsax commented on a change in pull request #9744: URL: https://github.com/apache/kafka/pull/9744#discussion_r549547331 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java ## @@ -166,4 +166,14 @@ public long timestamp() { public Map appConfigsWithPrefix(final String prefix) { return delegate.appConfigsWithPrefix(prefix); } + +@Override +public long currentSystemTimeMs() { +throw new UnsupportedOperationException("this method is not supported in ForwardingDisabledProcessor context"); Review comment: Why do we throw here? It seems to be safe to get the time from `delegate` object? We only disable `forward` but nothing else. (same below) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a change in pull request #9744: KAFKA-10062: Add a method to retrieve the current timestamp as known by the Streams app
mjsax commented on a change in pull request #9744: URL: https://github.com/apache/kafka/pull/9744#discussion_r556206559 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java ## @@ -494,7 +494,9 @@ public void process(final Record record) { // e2e latency = 10 task.addRecords(partition1, singletonList(getConsumerRecord(0, 0L))); -task.process(10L); +time = new MockTime(0L, 10L, 0L); Review comment: In L159, we define a global mock time object: ``` private MockTime time = new MockTime(); ``` that is passed into `StreamTask`. You can call `time.sleep(...)` to advance the mock time. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a change in pull request #9744: KAFKA-10062: Add a method to retrieve the current timestamp as known by the Streams app
mjsax commented on a change in pull request #9744: URL: https://github.com/apache/kafka/pull/9744#discussion_r556207151 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java ## @@ -39,9 +39,7 @@ private ProcessorContextUtils() {} * removing the need for this method. */ public static long getCurrentSystemTime(final ProcessorContext context) { Review comment: nit (not introduced in this PR, but would be a nice side cleanup): rename to `currentSystemTime(...)` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a change in pull request #9744: KAFKA-10062: Add a method to retrieve the current timestamp as known by the Streams app
mjsax commented on a change in pull request #9744: URL: https://github.com/apache/kafka/pull/9744#discussion_r556207831 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java ## @@ -45,7 +45,6 @@ private boolean initialized; protected ProcessorRecordContext recordContext; protected ProcessorNode currentNode; -private long currentSystemTimeMs; Review comment: @guozhangwang -- I am just wondering if removing this variable (what I think conceptually make sense) might have an perf impact as it seems `currentSystemTimeMs` acts as some kind of cache -- and instead of using the cached time, we would call `time.currentTimeMillis()` in `ProcessorContextImpl#forwardInternal()` now. Thoughts? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a change in pull request #9744: KAFKA-10062: Add a method to retrieve the current timestamp as known by the Streams app
mjsax commented on a change in pull request #9744: URL: https://github.com/apache/kafka/pull/9744#discussion_r556207831 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java ## @@ -45,7 +45,6 @@ private boolean initialized; protected ProcessorRecordContext recordContext; protected ProcessorNode currentNode; -private long currentSystemTimeMs; Review comment: @guozhangwang -- I am just wondering if removing this variable (what I think conceptually makes sense) might have an perf impact as it seems `currentSystemTimeMs` acts as some kind of cache -- and instead of using the cached time, we would call `time.currentTimeMillis()` in `ProcessorContextImpl#forwardInternal()` now. Thoughts? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org