mjsax commented on a change in pull request #9744:
URL: https://github.com/apache/kafka/pull/9744#discussion_r549546234



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
##########
@@ -289,4 +289,17 @@ Cancellable schedule(final Duration interval,
      */
     Map<String, Object> appConfigsWithPrefix(final String prefix);
 
+    /**
+     * Returns current cached wall-clock system timestamp in milliseconds.

Review comment:
       nit: `Return` without `s` -- we use the imperative mood in JavaDocs.
   
   I would remove `cached` and add a dedicated second sentence about it. Also, `wall-clock time` and `system time` are synonymous, and thus `wall-clock system time` sounds a little odd.
   
   What about:
   ```
   Return the current system timestamp (also called wall-clock time) in milliseconds.
   <p>
   Note: this method returns the internally cached system timestamp from the Kafka Streams runtime. Thus, it may return a different value compared to {@code System.currentTimeMillis()}.
   <p>
   For a global processor, Kafka Streams does not cache system time and thus calling this method will return the same value as {@code System.currentTimeMillis()}.
   
   @return the current system timestamp in milliseconds
   ```
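A minimal sketch of why a cached value can lag behind `System.currentTimeMillis()`. It assumes a hypothetical `CachedWallClock` that refreshes only when the runtime explicitly calls `refresh()`; the class name and refresh policy are illustrative and are not the actual Kafka Streams internals.

```java
import java.util.function.LongSupplier;

// Hypothetical stand-in for an internally cached wall-clock timestamp.
// This is NOT the Kafka Streams implementation; it only shows why a cached
// value can differ from System.currentTimeMillis() between refreshes.
final class CachedWallClock {
    private final LongSupplier source; // supplies the "real" wall-clock time
    private long cachedMs;

    CachedWallClock(final LongSupplier source) {
        this.source = source;
        this.cachedMs = source.getAsLong();
    }

    // Assumed to be called by the runtime, e.g. once per processing-loop iteration.
    void refresh() {
        cachedMs = source.getAsLong();
    }

    // What a processor would see via currentSystemTimeMs(): the last cached value.
    long currentSystemTimeMs() {
        return cachedMs;
    }
}
```

In production the source could be `System::currentTimeMillis`; in a test, a lambda over a mutable value makes the lag between refreshes observable.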

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java
##########
@@ -166,4 +166,14 @@ public long timestamp() {
     public Map<String, Object> appConfigsWithPrefix(final String prefix) {
         return delegate.appConfigsWithPrefix(prefix);
     }
+
+    @Override
+    public long currentSystemTimeMs() {
+        throw new UnsupportedOperationException("this method is not supported in ForwardingDisabledProcessor context");

Review comment:
       Why do we throw here? It seems safe to get the time from the `delegate` object: we only disable `forward`, but nothing else. (Same below.)
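The delegation suggested here could look roughly like the following sketch. `SimpleContext` is a hypothetical, heavily trimmed stand-in for `ProcessorContext`, and the exception message is illustrative. Only `forward` is disabled, while the time accessors are passed through to the delegate.

```java
// Hypothetical, simplified context interface; the real ProcessorContext has many more methods.
interface SimpleContext {
    void forward(Object key, Object value);
    long currentSystemTimeMs();
    long currentStreamTimeMs();
}

// Sketch of a wrapper that disables forward() but delegates everything else,
// in the spirit of ForwardingDisabledProcessorContext (not its actual code).
final class ForwardingDisabledContext implements SimpleContext {
    private final SimpleContext delegate;

    ForwardingDisabledContext(final SimpleContext delegate) {
        this.delegate = delegate;
    }

    @Override
    public void forward(final Object key, final Object value) {
        // The only operation this wrapper is meant to block.
        throw new UnsupportedOperationException("forward() is not supported in this context");
    }

    @Override
    public long currentSystemTimeMs() {
        return delegate.currentSystemTimeMs(); // reading time is safe to delegate
    }

    @Override
    public long currentStreamTimeMs() {
        return delegate.currentStreamTimeMs(); // same reasoning as for system time
    }
}
```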

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
##########
@@ -115,6 +116,16 @@ public void commit() {
         //no-op
     }
 
+    @Override
+    public long currentSystemTimeMs() {
+        return Time.SYSTEM.milliseconds();
+    }
+
+    @Override
+    public long currentStreamTimeMs() {
+        throw new UnsupportedOperationException("this method is not supported in global processor context.");

Review comment:
       This makes sense, but we might want to document this somewhere else, i.e., in the corresponding docs. To be fair, I am not sure atm if we have much content about it and/or where to add it.
   
   We should for sure document it in the JavaDocs, i.e., in the `ProcessorContext` interface (cf. my comment above).
   
   About the error message:
   ```
   throw new UnsupportedOperationException("There is no concept of stream-time for a global processor.");
   ```

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
##########
@@ -289,4 +289,17 @@ Cancellable schedule(final Duration interval,
      */
     Map<String, Object> appConfigsWithPrefix(final String prefix);
 
+    /**
+     * Returns current cached wall-clock system timestamp in milliseconds.
+     *
+     * @return the current cached wall-clock system timestamp in milliseconds
+     */
+    long currentSystemTimeMs();
+
+    /**
+     * Returns the maximum timestamp of any record yet processed by the task.

Review comment:
       What about:
   ```
   Return the current stream-time in milliseconds.
   <p>
   Stream-time is the maximum observed {@link TimestampExtractor record timestamp} so far
   (including the currently processed record), i.e., it can be considered a high-watermark.
   Stream-time is tracked on a per-task basis and is preserved across restarts and during task migration.
   <p>
   Note: this method is not supported for global processors (cf. {@link Topology#addGlobalStore(...)}
   and {@link StreamsBuilder#addGlobalStore(...)}), because there is no concept of stream-time for this case.
   Calling this method in a global processor will result in an {@link UnsupportedOperationException}.
   
   @return the current stream-time in milliseconds
   ```
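To make the high-watermark semantics concrete, here is a hypothetical per-task tracker (not the `PartitionGroup` code the PR delegates to). The `UNKNOWN` sentinel of `-1` is an assumption for this sketch. An out-of-order record with a smaller timestamp does not move stream-time backwards.

```java
// Hypothetical per-task tracker illustrating "stream-time = max observed record timestamp".
// It only sketches the semantics described in the JavaDoc; it is not Kafka Streams code.
final class StreamTimeTracker {
    // Assumed sentinel for "no record observed yet"; the real sentinel may differ.
    static final long UNKNOWN = -1L;

    private long streamTimeMs = UNKNOWN;

    // Observe the timestamp of the record currently being processed.
    void observe(final long recordTimestampMs) {
        // Stream-time only advances; late (out-of-order) records never decrease it.
        streamTimeMs = Math.max(streamTimeMs, recordTimestampMs);
    }

    long currentStreamTimeMs() {
        return streamTimeMs;
    }
}
```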

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreToProcessorContextAdapter.java
##########
@@ -160,4 +160,14 @@ public long timestamp() {
     public Map<String, Object> appConfigsWithPrefix(final String prefix) {
         return delegate.appConfigsWithPrefix(prefix);
     }
+
+    @Override
+    public long currentSystemTimeMs() {
+        throw new UnsupportedOperationException("this method is not supported in StoreToProcessorContextAdapter");

Review comment:
       We should align the error message with the ones above: `"StateStores can't access system time."` (same below).

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -1111,13 +1111,21 @@ RecordCollector recordCollector() {
         return recordCollector;
     }
 
+    public long streamTime() {

Review comment:
       Why is this `public` now? It seems it's only newly called in `ProcessorContextImpl`, which is in the same package, and thus it can stay package-private (default visibility).

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -70,7 +70,8 @@
     // visible for testing
     static final byte LATEST_MAGIC_BYTE = 1;
 
-    private final Time time;
+    // This time is wall clock time

Review comment:
       Yes, `Time` is always wall-clock time -- the comment seems unnecessary.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -1111,13 +1111,21 @@ RecordCollector recordCollector() {
         return recordCollector;
     }
 
+    public long streamTime() {
+        return partitionGroup.streamTime();
+    }
+
+    public long currentSystemTimeMs() {
+        return time.milliseconds(); 
+    }
+
     // below are visible for testing only
     int numBuffered() {
         return partitionGroup.numBuffered();
     }
 
-    long streamTime() {
-        return partitionGroup.streamTime();
+    void setTime(final Time time) {

Review comment:
       I don't think we need this method (cf. my other comment in the test code 
where it is used).

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
##########
@@ -555,6 +559,12 @@ public void shouldThrowUnsupportedOperationExceptionOnRecordContext() {
         );
     }
 
+    @Test
+    public void shouldMatchSystemAndStreamTime() {

Review comment:
       This should be two tests, one for each method.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -70,7 +70,8 @@
     // visible for testing
     static final byte LATEST_MAGIC_BYTE = 1;
 
-    private final Time time;
+    // This time is wall clock time
+    private Time time;

Review comment:
       I think we should keep this as `final` (compare my other comments).

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreToProcessorContextAdapterTest.java
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.easymock.EasyMockRunner;
+import org.easymock.Mock;
+import org.easymock.MockType;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+@RunWith(EasyMockRunner.class)
+public class StoreToProcessorContextAdapterTest {

Review comment:
       Thanks for adding a test!!!
   
   Would you mind adding tests for all other methods of `StoreToProcessorContextAdapter`, too, to get full test coverage as a side improvement?

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##########
@@ -494,7 +494,9 @@ public void process(final Record<Integer, Integer> record) {
 
         // e2e latency = 10
         task.addRecords(partition1, singletonList(getConsumerRecord(0, 0L)));
-        task.process(10L);
+        time = new MockTime(0L, 10L, 0L);
+        task.setTime(time);
+        task.process(time.milliseconds());

Review comment:
       Why do we not keep `task.process(10L);` ? Similar below.

##########
File path: 
streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
##########
@@ -369,6 +370,16 @@ public long timestamp() {
         return recordContext.timestamp();
     }
 
+    @Override
+    public long currentSystemTimeMs() {
+        return Time.SYSTEM.milliseconds();

Review comment:
       We should pass in a `Time` parameter into the constructor of 
`InternalMockProcessorContext` and return `time.milliseconds()` here so we can 
mock the time.
   
   If this method is unused, it's also ok to just throw an exception for now and add mocking capabilities later when needed.
   
   (Same below.)

##########
File path: streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
##########
@@ -120,6 +120,16 @@ public Cancellable schedule(final Duration interval,
     @Override
     public void commit() {}
 
+    @Override
+    public long currentSystemTimeMs() {
+        throw new UnsupportedOperationException("this method is not supported in NoOpProcessorContext");

Review comment:
       Nit: I guess we could add support if needed. Maybe better to say `"Not implemented yet."`? (Same below.)

##########
File path: 
streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
##########
@@ -55,14 +55,9 @@ public MockInternalProcessorContext(final Properties config, final TaskId taskId
         super(config, taskId, stateDir);
     }
 
-    @Override
-    public void setSystemTimeMs(long timeMs) {

Review comment:
       The purpose of this class seems to be to allow mocking the time -- given 
that we add proper time support now, I am wondering if we can actually delete 
the whole class?

##########
File path: 
streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
##########
@@ -261,6 +261,16 @@ public TaskId taskId() {
         return config.originalsWithPrefix(prefix);
     }
 
+    @Override
+    public long currentSystemTimeMs() {
+        return timestamp;

Review comment:
       `timestamp` is the current record timestamp and thus should not be 
returned here (same below).
   
   It seems we actually need to extend this test-util class, which is a small change to the KIP. Note that test-utils are public API, and thus those changes also need a KIP. You don't need to worry about your KIP; we can just update it accordingly.
   
   It seems we need two more internal fields, one for each time. The public API change would be to add corresponding setters. We might also rename `setTimestamp` to `setRecordTimestamp` for clarity, so we would have three setters: for record, system, and stream time.
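A trimmed-down sketch of the proposed shape, assuming three independent fields. Only `setRecordTimestamp` is named in the comment; `setCurrentSystemTimeMs` and `setCurrentStreamTimeMs` are hypothetical names the KIP would have to settle, and `SketchMockProcessorContext` is not the real test-util class.

```java
// Hypothetical sketch of the proposed test-util change: three independent
// time fields, each with its own setter. Only setRecordTimestamp is named in
// the review; the other setter names are illustrative placeholders.
final class SketchMockProcessorContext {
    private long recordTimestamp;     // timestamp of the "current" record
    private long currentSystemTimeMs; // mocked wall-clock time
    private long currentStreamTimeMs; // mocked stream-time

    void setRecordTimestamp(final long recordTimestamp) {
        this.recordTimestamp = recordTimestamp;
    }

    void setCurrentSystemTimeMs(final long currentSystemTimeMs) {
        this.currentSystemTimeMs = currentSystemTimeMs;
    }

    void setCurrentStreamTimeMs(final long currentStreamTimeMs) {
        this.currentStreamTimeMs = currentStreamTimeMs;
    }

    long timestamp() {
        return recordTimestamp;
    }

    // Returns the mocked system time, never the record timestamp.
    long currentSystemTimeMs() {
        return currentSystemTimeMs;
    }

    long currentStreamTimeMs() {
        return currentStreamTimeMs;
    }
}
```

Keeping the three values in separate fields is what prevents the bug flagged above, where `currentSystemTimeMs()` returned the record timestamp.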

##########
File path: 
streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
##########
@@ -1878,4 +1877,33 @@ public void shouldRespectTaskIdling() {
             );
         }
     }
+
+    @Test
+    public void shouldMatchSystemTime() {

Review comment:
       I think we can remove those two new tests.
   
   If you really want to test it, you should rather add a `Processor` to the Topology that gets the current system and stream time and verifies them this way, i.e., we should add an integration test rather than a unit test.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
##########
@@ -115,6 +116,16 @@ public void commit() {
         //no-op
     }
 
+    @Override
+    public long currentSystemTimeMs() {
+        return Time.SYSTEM.milliseconds();

Review comment:
       We should pass a `Time` reference to the `GlobalProcessorContextImpl` and call `time.milliseconds()` here -- otherwise, we cannot mock the time (cf. `TopologyTestDriver`) for testing.
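A minimal sketch of the suggested constructor injection. `java.util.function.LongSupplier` stands in for Kafka's `Time` (e.g. `Time.SYSTEM` in production, a mock time under `TopologyTestDriver`), and `SketchGlobalContext` is a hypothetical name. The stream-time accessor reuses the error message proposed earlier in this review.

```java
import java.util.function.LongSupplier;

// Hypothetical sketch of constructor injection for a global processor context.
// LongSupplier stands in for Kafka's Time abstraction; this is not the actual class.
final class SketchGlobalContext {
    private final LongSupplier time;

    SketchGlobalContext(final LongSupplier time) {
        this.time = time;
    }

    long currentSystemTimeMs() {
        // Read from the injected clock instead of a hard-coded system clock,
        // so tests can control the returned value.
        return time.getAsLong();
    }

    long currentStreamTimeMs() {
        throw new UnsupportedOperationException("There is no concept of stream-time for a global processor.");
    }
}
```

Production code would pass something like `System::currentTimeMillis`, while a test passes a controllable clock.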

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -1111,13 +1111,21 @@ RecordCollector recordCollector() {
         return recordCollector;
     }
 
+    public long streamTime() {
+        return partitionGroup.streamTime();
+    }
+
+    public long currentSystemTimeMs() {

Review comment:
       As above: seems this can be package-private?

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
##########
@@ -179,6 +179,10 @@ public void appConfigsShouldReturnUnrecognizedValues() {
             equalTo("user-supplied-value"));
     }
 
+    @Test(expected = UnsupportedOperationException.class)
+    public void shouldThrowOnCurrentStreamTime() {

Review comment:
       This test does not seem useful, as it only tests the `TestProcessorContext` defined below.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##########
@@ -494,7 +494,9 @@ public void process(final Record<Integer, Integer> record) {
 
         // e2e latency = 10
         task.addRecords(partition1, singletonList(getConsumerRecord(0, 0L)));
-        task.process(10L);
+        time = new MockTime(0L, 10L, 0L);

Review comment:
       We already have a global `MockTime` object in this test that we should reuse instead of creating a new one. This way, we can avoid adding `setTime` and keep the `Time` member variable `final`. -- Or is there a reason why we cannot reuse the global `MockTime` object?
   
   Also, I don't really see any call to the newly added methods in this test, 
so why do we change this test method at all?
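A sketch of the alternative described here, using simplified, hypothetical types: the clock is injected once and stays `final`, and the test advances a single shared mock clock instead of swapping it via `setTime`. `SketchMockTime` and `SketchTask` are illustrative stand-ins, not the real `MockTime` and `StreamTask`.

```java
import java.util.function.LongSupplier;

// Hypothetical mutable mock clock, in the spirit of a shared MockTime in a test class.
final class SketchMockTime implements LongSupplier {
    private long nowMs;

    SketchMockTime(final long startMs) {
        this.nowMs = startMs;
    }

    // Advance time deterministically; the same instance is seen by everything holding it.
    void sleep(final long ms) {
        nowMs += ms;
    }

    @Override
    public long getAsLong() {
        return nowMs;
    }
}

// Hypothetical task: the clock is injected once and stays final, so no setTime() is needed.
final class SketchTask {
    private final LongSupplier time;

    SketchTask(final LongSupplier time) {
        this.time = time;
    }

    long currentSystemTimeMs() {
        return time.getAsLong();
    }
}
```

Because the task holds a reference to the shared clock, advancing that clock in the test is enough; the task never needs a mutable `time` field.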

##########
File path: 
streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
##########
@@ -116,7 +117,7 @@ public InternalMockProcessorContext(final StreamsMetricsImpl streamsMetrics) {
         );
     }
 
-    public InternalMockProcessorContext(final File stateDir,
+    public  InternalMockProcessorContext(final File stateDir,

Review comment:
       nit: revert insert of additional space

##########
File path: 
streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
##########
@@ -505,7 +506,7 @@ private void setupTask(final StreamsConfig streamsConfig,
                 streamsMetrics
             );
 
-            final InternalProcessorContext context = new ProcessorContextImpl(

Review comment:
       I don't think we need this change. The new tests you added below seem to re-test `ProcessorContextImpl`, which is already tested in `ProcessorContextImplTest`, and thus we don't need them.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

