[GitHub] [kafka] mjsax commented on a change in pull request #9744: KAFKA-10062: Add a method to retrieve the current timestamp as known by the Streams app

2021-03-09 Thread GitBox


mjsax commented on a change in pull request #9744:
URL: https://github.com/apache/kafka/pull/9744#discussion_r590766321



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
##
@@ -289,4 +291,33 @@ Cancellable schedule(final Duration interval,
  */
 Map appConfigsWithPrefix(final String prefix);
 
+/**
+ * Return the current system timestamp (also called wall-clock time) in 
milliseconds.
+ *
+ * 
+ * Note: this method returns the internally cached system timestamp from 
the Kafka Stream runtime.
+ * Thus, it may return a different value compared to {@code 
System.currentTimeMillis()}

Review comment:
   ```suggestion
* Thus, it may return a different value compared to {@code 
System.currentTimeMillis()}.
   ```

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
##
@@ -289,4 +291,33 @@ Cancellable schedule(final Duration interval,
  */
 Map appConfigsWithPrefix(final String prefix);
 
+/**
+ * Return the current system timestamp (also called wall-clock time) in 
milliseconds.
+ *
+ * 
+ * Note: this method returns the internally cached system timestamp from 
the Kafka Stream runtime.
+ * Thus, it may return a different value compared to {@code 
System.currentTimeMillis()}
+ * 

Review comment:
   ```suggestion
   ```

##
File path: 
streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
##
@@ -354,10 +372,31 @@ public void setHeaders(final Headers headers) {
  * but for the purpose of driving unit tests, you can set it directly. 
Setting this attribute doesn't affect the others.
  *
  * @param timestamp A record timestamp
+ * @deprecated Use {@link MockProcessorContext#setRecordTimestamp(long)} 
instead.

Review comment:
   ```suggestion
* @deprecated Since 3.0.0; use {@link 
MockProcessorContext#setRecordTimestamp(long)} instead.
   ```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #9744: KAFKA-10062: Add a method to retrieve the current timestamp as known by the Streams app

2021-03-03 Thread GitBox


mjsax commented on a change in pull request #9744:
URL: https://github.com/apache/kafka/pull/9744#discussion_r586727033



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
##
@@ -289,4 +291,33 @@ Cancellable schedule(final Duration interval,
  */
 Map appConfigsWithPrefix(final String prefix);
 
+/**
+ * Return the current system timestamp (also called wall-clock time) in 
milliseconds.
+ *
+ * 
+ * Note: this method returns the internally cached system timestamp from 
the Kafka Stream runtime.
+ * Thus, it may return a different value compared to {@code 
System.currentTimeMillis()} .
+ * 

Review comment:
   nit. can be removed

##
File path: 
streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
##
@@ -353,11 +371,19 @@ public void setHeaders(final Headers headers) {
  * The context exposes this metadata for use in the processor. Normally, 
they are set by the Kafka Streams framework,
  * but for the purpose of driving unit tests, you can set it directly. 
Setting this attribute doesn't affect the others.
  *
- * @param timestamp A record timestamp
+ * @param recordTimestamp A record timestamp
  */
 @SuppressWarnings({"WeakerAccess", "unused"})
-public void setTimestamp(final long timestamp) {
-this.timestamp = timestamp;
+public void setRecordTimestamp(final long recordTimestamp) {
+this.recordTimestamp = recordTimestamp;
+}
+
+public void setCurrentSystemTimeMs(final long currentSystemTimeMs) {
+this.currentSystemTimeMs = currentSystemTimeMs;
+}
+
+public void setCurrentStreamTimeMs(final long currentStreamTimeMs) {
+this.currentStreamTimeMs = currentStreamTimeMs;

Review comment:
   This class is public API, so we cannot remove `setTimestamp` but can 
only deprecate it.
   
   We also need to update the KIP to mention the deprecation and the newly 
added methods.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #9744: KAFKA-10062: Add a method to retrieve the current timestamp as known by the Streams app

2021-02-05 Thread GitBox


mjsax commented on a change in pull request #9744:
URL: https://github.com/apache/kafka/pull/9744#discussion_r570716520



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
##
@@ -45,7 +45,6 @@
 private boolean initialized;
 protected ProcessorRecordContext recordContext;
 protected ProcessorNode currentNode;
-private long currentSystemTimeMs;

Review comment:
   The main motivation to add `ProcessorContext#currentSystemTime()` was to 
be able to return the mocked wall-clock time in `TopologyTestDriver`. Even if 
we return the cached time from `AbstractProcessorContext`, we will be able to 
return the mocked time, as we update the cached time based on the mocked time 
in `TopologyTestDriver`.
   
   - `InternalMockProcessorContext` is just for our own unit testing -- it's 
fine to add the new `Time` field, it's not a public facing change anyway
   - Originally we changed `ProcessorContextImpl` because we remove 
`AbstractProcessorContext#currentSystemTime()` -- as suggested by Guozhang, we 
should keep the cached time in `AbstractProcessorContext()`, and thus we don't 
need `ProcessorContextImpl#currentSystemTime()` any longer.
   - `GlobalProcessorContextImpl` is a different code path, and thus the 
changes of this PR are fine
   
   > If we add new cachedSystemTimeMs field in AbstractProcessorContext, when 
do you want to return this field?
   
   Yes, we want to return this field. (We get this behaviro by adding back 
`AbstractProcessorContext#currentSystemTime()` (and the cached time in this 
class) and removing `ProcessorContextImpl#currentSystemTime()`.
   
   > Are earlier changes not valid to return time from StreamTask?
   
   Yes, I think we can revert all changes from `StreamTask`.
   
   Does this make sense?

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
##
@@ -45,7 +45,6 @@
 private boolean initialized;
 protected ProcessorRecordContext recordContext;
 protected ProcessorNode currentNode;
-private long currentSystemTimeMs;

Review comment:
   The main motivation to add `ProcessorContext#currentSystemTime()` was to 
be able to return the mocked wall-clock time in `TopologyTestDriver`. Even if 
we return the cached time from `AbstractProcessorContext`, we will be able to 
return the mocked time, as we update the cached time based on the mocked time 
in `TopologyTestDriver`.
   
   - `InternalMockProcessorContext` is just for our own unit testing -- it's 
fine to add the new `Time` field, it's not a public facing change anyway
   - Originally we changed `ProcessorContextImpl` because we remove 
`AbstractProcessorContext#currentSystemTime()` -- as suggested by Guozhang, we 
should keep the cached time in `AbstractProcessorContext()`, and thus we don't 
need `ProcessorContextImpl#currentSystemTime()` any longer.
   - `GlobalProcessorContextImpl` is a different code path, and thus the 
changes of this PR are fine
   
   > If we add new cachedSystemTimeMs field in AbstractProcessorContext, when 
do you want to return this field?
   
   Yes, we want to return this field. (We get this behavior by adding back 
`AbstractProcessorContext#currentSystemTime()` (and the cached time in this 
class) and removing `ProcessorContextImpl#currentSystemTime()`.
   
   > Are earlier changes not valid to return time from StreamTask?
   
   Yes, I think we can revert all changes from `StreamTask`.
   
   Does this make sense?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #9744: KAFKA-10062: Add a method to retrieve the current timestamp as known by the Streams app

2021-02-04 Thread GitBox


mjsax commented on a change in pull request #9744:
URL: https://github.com/apache/kafka/pull/9744#discussion_r570716520



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
##
@@ -45,7 +45,6 @@
 private boolean initialized;
 protected ProcessorRecordContext recordContext;
 protected ProcessorNode currentNode;
-private long currentSystemTimeMs;

Review comment:
   The main motivation to add `ProcessorContext#currentSystemTime()` was to 
be able to return the mocked wall-clock time in `TopologyTestDriver`. Even if 
we return the cached time from `AbstractProcessorContext`, we will be able to 
return the mocked time, as we update the cached time based on the mocked time 
in `TopologyTestDriver`.
   
   - `InternalMockProcessorContext` is just for our own unit testing -- it's 
fine to add the new `Time` field, it's not a public facing change anyway
   - Originally we changed `ProcessorContextImpl` because we remove 
`AbstractProcessorContext#currentSystemTime()` -- as suggested by Guozhang, we 
should keep the cached time in `AbstractProcessorContext()`, and thus we don't 
need `ProcessorContextImpl#currentSystemTime()` any longer.
   - `GlobalProcessorContextImpl` is a different code path, and thus the 
changes of this PR are fine
   
   > If we add new cachedSystemTimeMs field in AbstractProcessorContext, when 
do you want to return this field?
   
   Yes, we want to return this field. (We get this behavior by adding back 
`AbstractProcessorContext#currentSystemTime()` (and the cached time in this 
class) and removing `ProcessorContextImpl#currentSystemTime()`.
   
   > Are earlier changes not valid to return time from StreamTask?
   
   Yes, I think we can revert all changes from `StreamTask`.
   
   Does this make sense?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #9744: KAFKA-10062: Add a method to retrieve the current timestamp as known by the Streams app

2021-02-04 Thread GitBox


mjsax commented on a change in pull request #9744:
URL: https://github.com/apache/kafka/pull/9744#discussion_r570716520



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
##
@@ -45,7 +45,6 @@
 private boolean initialized;
 protected ProcessorRecordContext recordContext;
 protected ProcessorNode currentNode;
-private long currentSystemTimeMs;

Review comment:
   The main motivation to add `ProcessorContext#currentSystemTime()` was to 
be able to return the mocked wall-clock time in `TopologyTestDriver`. Even if 
we return the cached time from `AbstractProcessorContext`, we will be able to 
return the mocked time, as we update the cached time based on the mocked time 
in `TopologyTestDriver`.
   
   - `InternalMockProcessorContext` is just for our own unit testing -- it's 
fine to add the new `Time` field, it's not a public facing change anyway
   - Originally we changed `ProcessorContextImpl` because we remove 
`AbstractProcessorContext#currentSystemTime()` -- as suggested by Guozhang, we 
should keep the cached time in `AbstractProcessorContext()`, and thus we don't 
need `ProcessorContextImpl#currentSystemTime()` any longer.
   - `GlobalProcessorContextImpl` is a different code path, and thus the 
changes of this PR are fine
   
   > If we add new cachedSystemTimeMs field in AbstractProcessorContext, when 
do you want to return this field?
   
   Yes, we want to return this field. (We get this behaviro by adding back 
`AbstractProcessorContext#currentSystemTime()` (and the cached time in this 
class) and removing `ProcessorContextImpl#currentSystemTime()`.
   
   > Are earlier changes not valid to return time from StreamTask?
   
   Yes, I think we can revert all changes from `StreamTask`.
   
   Does this make sense?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #9744: KAFKA-10062: Add a method to retrieve the current timestamp as known by the Streams app

2021-02-02 Thread GitBox


mjsax commented on a change in pull request #9744:
URL: https://github.com/apache/kafka/pull/9744#discussion_r568389885



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
##
@@ -45,7 +45,6 @@
 private boolean initialized;
 protected ProcessorRecordContext recordContext;
 protected ProcessorNode currentNode;
-private long currentSystemTimeMs;

Review comment:
   If there is an advantage to add the method to the 
`InternalProcessorContext ` interface, sure why not. Don't think it's a big 
difference, and because it all internal code, we can change it anytime in the 
future.
   
   I think we should always return the cached time.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #9744: KAFKA-10062: Add a method to retrieve the current timestamp as known by the Streams app

2021-02-01 Thread GitBox


mjsax commented on a change in pull request #9744:
URL: https://github.com/apache/kafka/pull/9744#discussion_r568389885



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
##
@@ -45,7 +45,6 @@
 private boolean initialized;
 protected ProcessorRecordContext recordContext;
 protected ProcessorNode currentNode;
-private long currentSystemTimeMs;

Review comment:
   If there is an advantage to add the method to the 
`InternalProcessorContext ` interface, sure why not. Don't think it's a big 
difference, and because it all internal code, we can change it anytime in the 
future.
   
   I think we should always return the cached time.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #9744: KAFKA-10062: Add a method to retrieve the current timestamp as known by the Streams app

2021-01-26 Thread GitBox


mjsax commented on a change in pull request #9744:
URL: https://github.com/apache/kafka/pull/9744#discussion_r564992729



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
##
@@ -289,4 +291,36 @@ Cancellable schedule(final Duration interval,
  */
 Map appConfigsWithPrefix(final String prefix);
 
+/**
+ * Return the current system timestamp (also called wall-clock time) in 
milliseconds.
+ *
+ * 
+ * Note: this method returns the internally cached system timestamp from 
the Kafka Stream runtime.
+ * Thus, it may return a different value compared to 
`System.currentTimeMillis()`.

Review comment:
   nit:
   ```
   `System.currentTimeMillis()` -> {@code System.currentTimeMillis()} 
   
   or: {link System#currentTimeMillis()}
   ```

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
##
@@ -45,7 +45,6 @@
 private boolean initialized;
 protected ProcessorRecordContext recordContext;
 protected ProcessorNode currentNode;
-private long currentSystemTimeMs;

Review comment:
   @rohitrmd did you see Guozhang comment? Seems we should not remove this 
variable (what is a little unfortunate, but I guess desirable behavior) -- 
maybe we can rename it to `cachedSystemTimeMs`?
   
   I guess we want the caching for the `GlobalProcessorContext`, too? (For this 
case, we would need to update the JavaDocs of the `ProcessorContext` interface 
accordingly.)
   
   We should also add a test for the corresponding contest-test classes that 
they don't advance system time automatically, but only in the advance is 
triggered explicitly.

##
File path: 
streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
##
@@ -55,14 +55,9 @@ public MockInternalProcessorContext(final Properties config, 
final TaskId taskId
 super(config, taskId, stateDir);
 }
 
-@Override
-public void setSystemTimeMs(long timeMs) {

Review comment:
   I guess a follow up PR works, too.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
##
@@ -289,4 +291,36 @@ Cancellable schedule(final Duration interval,
  */
 Map appConfigsWithPrefix(final String prefix);
 
+/**
+ * Return the current system timestamp (also called wall-clock time) in 
milliseconds.
+ *
+ * 
+ * Note: this method returns the internally cached system timestamp from 
the Kafka Stream runtime.
+ * Thus, it may return a different value compared to 
`System.currentTimeMillis()`.
+ * 
+ *
+ * For a global processor, Kafka Streams does not cache system time and 
thus calling this method will return
+ * the same value as `System.currentTimeMillis()`.

Review comment:
   as above

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
##
@@ -289,4 +291,36 @@ Cancellable schedule(final Duration interval,
  */
 Map appConfigsWithPrefix(final String prefix);
 
+/**
+ * Return the current system timestamp (also called wall-clock time) in 
milliseconds.
+ *
+ * 
+ * Note: this method returns the internally cached system timestamp from 
the Kafka Stream runtime.
+ * Thus, it may return a different value compared to 
`System.currentTimeMillis()`.
+ * 
+ *
+ * For a global processor, Kafka Streams does not cache system time and 
thus calling this method will return
+ * the same value as `System.currentTimeMillis()`.
+ *
+ * @return the current system timestamp in milliseconds
+ */
+long currentSystemTimeMs();
+
+/**
+ * Return the current stream-time in milliseconds.
+ *
+ * 
+ * Stream-time is the maximum observed {@link TimestampExtractor record 
timestamp} so far
+ * (including the currently processed record), i.e., it can be considered 
a high-watermark.
+ * Stream-time is tracked on a per-task basis and is preserved across 
restarts and during task migration.
+ * 
+ *
+ * Note: this method is not supported for global processors (cf. {@link 
Topology#addGlobalStore} (...)
+ * and {@link StreamsBuilder#addGlobalStore} (...),
+ * because there is no concept of stream-time for this case.
+ * Calling this method in a global processor with result in an {@link 
UnsupportedOperationException}.

Review comment:
   typo: `with` -> `will`





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #9744: KAFKA-10062: Add a method to retrieve the current timestamp as known by the Streams app

2021-01-12 Thread GitBox


mjsax commented on a change in pull request #9744:
URL: https://github.com/apache/kafka/pull/9744#discussion_r556207831



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
##
@@ -45,7 +45,6 @@
 private boolean initialized;
 protected ProcessorRecordContext recordContext;
 protected ProcessorNode currentNode;
-private long currentSystemTimeMs;

Review comment:
   @guozhangwang -- I am just wondering if removing this variable (what I 
think conceptually makes sense) might have an perf impact as it seems 
`currentSystemTimeMs` acts as some kind of cache -- and instead of using the 
cached time, we would call `time.currentTimeMillis()` in 
`ProcessorContextImpl#forwardInternal()` now.
   
   Thoughts?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #9744: KAFKA-10062: Add a method to retrieve the current timestamp as known by the Streams app

2021-01-12 Thread GitBox


mjsax commented on a change in pull request #9744:
URL: https://github.com/apache/kafka/pull/9744#discussion_r556207831



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
##
@@ -45,7 +45,6 @@
 private boolean initialized;
 protected ProcessorRecordContext recordContext;
 protected ProcessorNode currentNode;
-private long currentSystemTimeMs;

Review comment:
   @guozhangwang -- I am just wondering if removing this variable (what I 
think conceptually make sense) might have an perf impact as it seems 
`currentSystemTimeMs` acts as some kind of cache -- and instead of using the 
cached time, we would call `time.currentTimeMillis()` in 
`ProcessorContextImpl#forwardInternal()` now.
   
   Thoughts?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #9744: KAFKA-10062: Add a method to retrieve the current timestamp as known by the Streams app

2021-01-12 Thread GitBox


mjsax commented on a change in pull request #9744:
URL: https://github.com/apache/kafka/pull/9744#discussion_r556207151



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java
##
@@ -39,9 +39,7 @@ private ProcessorContextUtils() {}
  * removing the need for this method.
  */
 public static long getCurrentSystemTime(final ProcessorContext context) {

Review comment:
   nit (not introduced in this PR, but would be a nice side cleanup): 
rename to `currentSystemTime(...)`





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #9744: KAFKA-10062: Add a method to retrieve the current timestamp as known by the Streams app

2021-01-12 Thread GitBox


mjsax commented on a change in pull request #9744:
URL: https://github.com/apache/kafka/pull/9744#discussion_r556206559



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##
@@ -494,7 +494,9 @@ public void process(final Record record) {
 
 // e2e latency = 10
 task.addRecords(partition1, singletonList(getConsumerRecord(0, 0L)));
-task.process(10L);
+time = new MockTime(0L, 10L, 0L);

Review comment:
   In L159, we define a global mock time object:
   ```
   private MockTime time = new MockTime();
   ```
   
   that is passed into `StreamTask`. You can call `time.sleep(...)` to advance 
the mock time.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #9744: KAFKA-10062: Add a method to retrieve the current timestamp as known by the Streams app

2021-01-12 Thread GitBox


mjsax commented on a change in pull request #9744:
URL: https://github.com/apache/kafka/pull/9744#discussion_r549547331



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java
##
@@ -166,4 +166,14 @@ public long timestamp() {
 public Map appConfigsWithPrefix(final String prefix) {
 return delegate.appConfigsWithPrefix(prefix);
 }
+
+@Override
+public long currentSystemTimeMs() {
+throw new UnsupportedOperationException("this method is not supported 
in ForwardingDisabledProcessor context");

Review comment:
   Why do we throw here? It seems to be safe to get the time from 
`delegate` object? We only disable `forward` but nothing else. (same below)





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #9744: KAFKA-10062: Add a method to retrieve the current timestamp as known by the Streams app

2020-12-28 Thread GitBox


mjsax commented on a change in pull request #9744:
URL: https://github.com/apache/kafka/pull/9744#discussion_r549546234



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
##
@@ -289,4 +289,17 @@ Cancellable schedule(final Duration interval,
  */
 Map appConfigsWithPrefix(final String prefix);
 
+/**
+ * Returns current cached wall-clock system timestamp in milliseconds.

Review comment:
   nit: `Return` without `s` -- we use imperative to write JavaDocs.
   
   I would remove `cached` and add a dedicated second sentence about it. Also 
`wall-clock time` and `system time` are synonymous and thus `wall-clock system 
time` sounds a little odd.
   
   What about:
   ```
   Return the current system timestamp (also called wall-clock time) in 
milliseconds.
   
   Note: this method returns the internally cached system timestamp from the 
Kafka Stream runtime. Thus, it may return a different value compared to 
`System.currentTimeMillis()`.
   
   For a global processor, Kafka Streams does not cache system time and thus 
calling this method will return the same value as `System.currentTimeMillis()`.
   
   @return the current system timestamp in milliseconds
   ```

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java
##
@@ -166,4 +166,14 @@ public long timestamp() {
 public Map appConfigsWithPrefix(final String prefix) {
 return delegate.appConfigsWithPrefix(prefix);
 }
+
+@Override
+public long currentSystemTimeMs() {
+throw new UnsupportedOperationException("this method is not supported 
in ForwardingDisabledProcessor context");

Review comment:
   Why do we through here? It seems to be safe to get the time from 
`delegate` object? We only disable `forward` but nothing else. (same below)

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
##
@@ -115,6 +116,16 @@ public void commit() {
 //no-op
 }
 
+@Override
+public long currentSystemTimeMs() {
+return Time.SYSTEM.milliseconds();
+}
+
+@Override
+public long currentStreamTimeMs() {
+throw new UnsupportedOperationException("this method is not supported 
in global processor context.");

Review comment:
   This makes sense, but we might want to document this somewhere else, ie, 
in the corresponding docs. To be fair, not sure atm, if we have much content 
about it and/or where it add it?
   
   We should for sure document it in the JavaDocs, ie, the `ProcessorContext` 
interface. (cf my comment above)
   
   About the error message:
   ```
   throw new UnsupportedOperationException("There is no concept of stream-time 
for a global processor.");
   ```

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
##
@@ -289,4 +289,17 @@ Cancellable schedule(final Duration interval,
  */
 Map appConfigsWithPrefix(final String prefix);
 
+/**
+ * Returns current cached wall-clock system timestamp in milliseconds.
+ *
+ * @return the current cached wall-clock system timestamp in milliseconds
+ */
+long currentSystemTimeMs();
+
+/**
+ * Returns the maximum timestamp of any record yet processed by the task.

Review comment:
   What about:
   ```
   Return the current stream-time in milliseconds.
   
   Stream-time is the maximum observed {@link TimestampExtractor record 
timestamp} so far
   (including the currently processed record), i.e., it can be considered a 
high-watermark.
   Stream-time is tracked on a per-task basis and is preserved across restarts 
and during task migration.
   
   Note: this method is not supported for global processors (cf. {@link 
Topology#addGlobalStore(...)}
   and {@link StreamsBuilder#addGlobalStore(...)}, because there is no concept 
of stream-time for this case.
   Calling this method in a global processor with result in an {@link 
UnsupportedOperationException}.
   
   @return the current stream-time in milliseconds
   ```

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreToProcessorContextAdapter.java
##
@@ -160,4 +160,14 @@ public long timestamp() {
 public Map appConfigsWithPrefix(final String prefix) {
 return delegate.appConfigsWithPrefix(prefix);
 }
+
+@Override
+public long currentSystemTimeMs() {
+throw new UnsupportedOperationException("this method is not supported 
in StoreToProcessorContextAdapter");

Review comment:
   We should align the error message to the ones from above: `"StateStores 
can't access system time."` (same below).

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##
@@ -,13 +,21 @@ RecordCollector recordCollector() {