mjsax commented on a change in pull request #9744: URL: https://github.com/apache/kafka/pull/9744#discussion_r549546234
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java ##########
@@ -289,4 +289,17 @@ Cancellable schedule(final Duration interval,
     */
    Map<String, Object> appConfigsWithPrefix(final String prefix);
+    /**
+     * Returns current cached wall-clock system timestamp in milliseconds.

Review comment:
nit: `Return` without `s` -- we use the imperative mood in JavaDocs. I would remove `cached` and add a dedicated second sentence about it. Also, `wall-clock time` and `system time` are synonymous, and thus `wall-clock system time` sounds a little odd. What about:
```
Return the current system timestamp (also called wall-clock time) in milliseconds.
<p>
Note: this method returns the internally cached system timestamp from the Kafka Streams runtime.
Thus, it may return a different value compared to `System.currentTimeMillis()`.
<p>
For a global processor, Kafka Streams does not cache system time and thus calling this method will return the same value as `System.currentTimeMillis()`.

@return the current system timestamp in milliseconds
```

########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java ##########
@@ -166,4 +166,14 @@ public long timestamp() {
    public Map<String, Object> appConfigsWithPrefix(final String prefix) {
        return delegate.appConfigsWithPrefix(prefix);
    }
+
+    @Override
+    public long currentSystemTimeMs() {
+        throw new UnsupportedOperationException("this method is not supported in ForwardingDisabledProcessor context");

Review comment:
Why do we throw here? It seems to be safe to get the time from the `delegate` object? We only disable `forward` but nothing else.
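To illustrate the delegation suggested above, here is a minimal sketch. `TimeAwareContext` and `ForwardingDisabledContextSketch` are hypothetical, trimmed-down stand-ins, not the actual Kafka Streams types: only `forward` is disabled, while the time accessors simply pass through to the delegate.

```java
// Hypothetical, trimmed-down stand-in for the context interface under review.
interface TimeAwareContext {
    long currentSystemTimeMs();

    long currentStreamTimeMs();

    void forward(Object key, Object value);
}

// Hypothetical wrapper: disables only forward() and delegates everything else.
final class ForwardingDisabledContextSketch implements TimeAwareContext {
    private final TimeAwareContext delegate;

    ForwardingDisabledContextSketch(final TimeAwareContext delegate) {
        this.delegate = delegate;
    }

    @Override
    public long currentSystemTimeMs() {
        // Reading the time has no side effects, so delegating is safe.
        return delegate.currentSystemTimeMs();
    }

    @Override
    public long currentStreamTimeMs() {
        return delegate.currentStreamTimeMs();
    }

    @Override
    public void forward(final Object key, final Object value) {
        throw new UnsupportedOperationException("forward() is disabled in this context");
    }
}
```

With this shape, the wrapper needs no special-casing per accessor: anything that does not forward records can be delegated as-is.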
(same below)

########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java ##########
@@ -115,6 +116,16 @@ public void commit() {
        //no-op
    }
+    @Override
+    public long currentSystemTimeMs() {
+        return Time.SYSTEM.milliseconds();
+    }
+
+    @Override
+    public long currentStreamTimeMs() {
+        throw new UnsupportedOperationException("this method is not supported in global processor context.");

Review comment:
This makes sense, but we might want to document this somewhere else, i.e., in the corresponding docs. To be fair, I am not sure at the moment whether we have much content about it, or where to add it. We should definitely document it in the JavaDocs, i.e., in the `ProcessorContext` interface (cf. my comment above).

About the error message:
```
throw new UnsupportedOperationException("There is no concept of stream-time for a global processor.");
```

########## File path: streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java ##########
@@ -289,4 +289,17 @@ Cancellable schedule(final Duration interval,
     */
    Map<String, Object> appConfigsWithPrefix(final String prefix);
+    /**
+     * Returns current cached wall-clock system timestamp in milliseconds.
+     *
+     * @return the current cached wall-clock system timestamp in milliseconds
+     */
+    long currentSystemTimeMs();
+
+    /**
+     * Returns the maximum timestamp of any record yet processed by the task.

Review comment:
What about:
```
Return the current stream-time in milliseconds.
<p>
Stream-time is the maximum observed {@link TimestampExtractor record timestamp} so far
(including the currently processed record), i.e., it can be considered a high-watermark.
Stream-time is tracked on a per-task basis and is preserved across restarts and during task migration.
<p>
Note: this method is not supported for global processors (cf. {@link Topology#addGlobalStore(...)}
and {@link StreamsBuilder#addGlobalStore(...)}), because there is no concept of stream-time for this case.
Calling this method in a global processor will result in an {@link UnsupportedOperationException}.

@return the current stream-time in milliseconds
```

########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreToProcessorContextAdapter.java ##########
@@ -160,4 +160,14 @@ public long timestamp() {
    public Map<String, Object> appConfigsWithPrefix(final String prefix) {
        return delegate.appConfigsWithPrefix(prefix);
    }
+
+    @Override
+    public long currentSystemTimeMs() {
+        throw new UnsupportedOperationException("this method is not supported in StoreToProcessorContextAdapter");

Review comment:
We should align the error message with the ones above: `"StateStores can't access system time."` (same below).

########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ##########
@@ -1111,13 +1111,21 @@ RecordCollector recordCollector() {
        return recordCollector;
    }
+    public long streamTime() {

Review comment:
Why is this `public` now? It seems it's only newly called in `ProcessorContextImpl`, which is in the same package, and thus it can stay (default) package-private.
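To make the "high-watermark" wording in the suggested JavaDoc concrete, here is a minimal sketch. `StreamTimeTracker` and `GlobalContextSketch` are hypothetical classes for illustration only, and the `Long.MIN_VALUE` starting value is an assumption of this sketch, not what Kafka Streams uses internally. An out-of-order record never moves stream-time backwards, and the global variant throws with the error message proposed above.

```java
// Hypothetical tracker: stream-time is the maximum record timestamp observed so far.
final class StreamTimeTracker {
    // Assumed "nothing observed yet" sentinel, for this sketch only.
    private long streamTime = Long.MIN_VALUE;

    // Called once per record, including the record currently being processed.
    void observe(final long recordTimestamp) {
        streamTime = Math.max(streamTime, recordTimestamp);
    }

    long currentStreamTimeMs() {
        return streamTime;
    }
}

// Hypothetical global-processor context: there is no stream-time to report.
final class GlobalContextSketch {
    long currentStreamTimeMs() {
        throw new UnsupportedOperationException(
            "There is no concept of stream-time for a global processor.");
    }
}
```

For example, observing timestamps 5, 3, 9, and 8 in that order yields a stream-time of 9; the late record with timestamp 3 leaves it at 5 rather than lowering it.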
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ##########
@@ -70,7 +70,8 @@
    // visible for testing
    static final byte LATEST_MAGIC_BYTE = 1;
-    private final Time time;
+    // This time is wall clock time

Review comment:
Yes, `Time` is always wall-clock time -- the comment seems unnecessary.

########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ##########
@@ -1111,13 +1111,21 @@ RecordCollector recordCollector() {
        return recordCollector;
    }
+    public long streamTime() {
+        return partitionGroup.streamTime();
+    }
+
+    public long currentSystemTimeMs() {
+        return time.milliseconds();
+    }
+
    // below are visible for testing only
    int numBuffered() {
        return partitionGroup.numBuffered();
    }
-    long streamTime() {
-        return partitionGroup.streamTime();
+    void setTime(final Time time) {

Review comment:
I don't think we need this method (cf. my other comment in the test code where it is used).

########## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java ##########
@@ -555,6 +559,12 @@ public void shouldThrowUnsupportedOperationExceptionOnRecordContext() {
        );
    }
+    @Test
+    public void shouldMatchSystemAndStreamTime() {

Review comment:
This should be two tests, one for each method.

########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ##########
@@ -70,7 +70,8 @@
    // visible for testing
    static final byte LATEST_MAGIC_BYTE = 1;
-    private final Time time;
+    // This time is wall clock time
+    private Time time;

Review comment:
I think we should keep this as `final` (compare my other comments).

########## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreToProcessorContextAdapterTest.java ##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.easymock.EasyMockRunner;
+import org.easymock.Mock;
+import org.easymock.MockType;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+@RunWith(EasyMockRunner.class)
+public class StoreToProcessorContextAdapterTest {

Review comment:
Thanks for adding a test!!! Would you mind adding tests for all the other methods of `StoreToProcessorContextAdapter`, too, to get full test coverage as a side improvement?

########## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java ##########
@@ -494,7 +494,9 @@ public void process(final Record<Integer, Integer> record) {
        // e2e latency = 10
        task.addRecords(partition1, singletonList(getConsumerRecord(0, 0L)));
-        task.process(10L);
+        time = new MockTime(0L, 10L, 0L);
+        task.setTime(time);
+        task.process(time.milliseconds());

Review comment:
Why do we not keep `task.process(10L);`? Similar below.
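A minimal sketch of the alternative to `setTime`: inject the clock once through the constructor, keep the field `final`, and let the test advance the same shared mock clock. `TimeSource`, `ManualClock`, and `TaskSketch` are hypothetical stand-ins (loosely modeled on Kafka's `Time` and `MockTime`), not the real classes or their APIs.

```java
// Hypothetical stand-in for a wall-clock abstraction such as Kafka's Time.
interface TimeSource {
    long milliseconds();
}

// Hypothetical mock clock that only moves when the test advances it.
final class ManualClock implements TimeSource {
    private long nowMs;

    ManualClock(final long startMs) {
        this.nowMs = startMs;
    }

    @Override
    public long milliseconds() {
        return nowMs;
    }

    void advance(final long deltaMs) {
        nowMs += deltaMs;
    }
}

// Hypothetical task: the clock is injected once and stays final, so no setter is needed.
final class TaskSketch {
    private final TimeSource time;

    TaskSketch(final TimeSource time) {
        this.time = time;
    }

    // Package-private accessor, as suggested in the review.
    long currentSystemTimeMs() {
        return time.milliseconds();
    }
}
```

Because the test holds a reference to the same clock it passed into the task, advancing that clock is enough; swapping in a new clock object is never required.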
########## File path: streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java ##########
@@ -369,6 +370,16 @@ public long timestamp() {
        return recordContext.timestamp();
    }
+    @Override
+    public long currentSystemTimeMs() {
+        return Time.SYSTEM.milliseconds();

Review comment:
We should pass a `Time` parameter into the constructor of `InternalMockProcessorContext` and return `time.milliseconds()` here, so that we can mock the time. If this method is unused, it's also OK to just throw an exception for now and add mocking capabilities later when needed. (Same below.)

########## File path: streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java ##########
@@ -120,6 +120,16 @@ public Cancellable schedule(final Duration interval,
    @Override
    public void commit() {}
+    @Override
+    public long currentSystemTimeMs() {
+        throw new UnsupportedOperationException("this method is not supported in NoOpProcessorContext");

Review comment:
Nit: I guess we could add support if needed. Maybe it's better to say `"Not implemented yet."`? (Same below.)

########## File path: streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java ##########
@@ -55,14 +55,9 @@ public MockInternalProcessorContext(final Properties config, final TaskId taskId
        super(config, taskId, stateDir);
    }
-    @Override
-    public void setSystemTimeMs(long timeMs) {

Review comment:
The purpose of this class seems to be to allow mocking the time. Given that we add proper time support now, I am wondering if we can actually delete the whole class?

########## File path: streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java ##########
@@ -261,6 +261,16 @@ public TaskId taskId() {
        return config.originalsWithPrefix(prefix);
    }
+    @Override
+    public long currentSystemTimeMs() {
+        return timestamp;

Review comment:
`timestamp` is the current record timestamp and thus should not be returned here (same below).
It seems we actually need to extend this test-utils class, which is a small change to the KIP. Note that test-utils are public API, and thus those changes also need a KIP. You don't need to worry about your KIP; we can just update it accordingly. It seems we need two more internal fields, one for each time. The public API change would be to add corresponding setters. We might also rename `setTimestamp` to `setRecordTimestamp` for clarity, so we would have three setters: for record, system, and stream time.

########## File path: streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java ##########
@@ -1878,4 +1877,33 @@ public void shouldRespectTaskIdling() {
            );
        }
    }
+    @Test
+    public void shouldMatchSystemTime() {

Review comment:
I think we can remove those two new tests. If you really want to test it, you should rather add a `Processor` to the `Topology` that gets the current system and stream time and verify it this way, i.e., we should add an integration test rather than a unit test.

########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java ##########
@@ -115,6 +116,16 @@ public void commit() {
        //no-op
    }
+    @Override
+    public long currentSystemTimeMs() {
+        return Time.SYSTEM.milliseconds();

Review comment:
We should pass a `Time` reference to `GlobalProcessorContextImpl` and call `time.milliseconds()` here. Otherwise, we cannot mock the time (cf. `TopologyTestDriver`) for testing.

########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ##########
@@ -1111,13 +1111,21 @@ RecordCollector recordCollector() {
        return recordCollector;
    }
+    public long streamTime() {
+        return partitionGroup.streamTime();
+    }
+
+    public long currentSystemTimeMs() {

Review comment:
As above: it seems this can be package-private?
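The three-setter shape proposed above could look roughly like the following sketch. `MockTimeContextSketch` and its setter names are hypothetical and chosen for illustration; the actual public API would be whatever the updated KIP specifies. The point is that each notion of time gets its own field, so `currentSystemTimeMs()` no longer returns the record timestamp.

```java
// Hypothetical sketch: one independent field per notion of time.
final class MockTimeContextSketch {
    private long recordTimestamp;
    private long currentSystemTimeMs;
    private long currentStreamTimeMs;

    void setRecordTimestamp(final long recordTimestamp) {
        this.recordTimestamp = recordTimestamp;
    }

    void setCurrentSystemTimeMs(final long currentSystemTimeMs) {
        this.currentSystemTimeMs = currentSystemTimeMs;
    }

    void setCurrentStreamTimeMs(final long currentStreamTimeMs) {
        this.currentStreamTimeMs = currentStreamTimeMs;
    }

    // Timestamp of the record currently being processed.
    long timestamp() {
        return recordTimestamp;
    }

    // Wall-clock time, deliberately independent of the record timestamp.
    long currentSystemTimeMs() {
        return currentSystemTimeMs;
    }

    // Stream-time as configured by the test.
    long currentStreamTimeMs() {
        return currentStreamTimeMs;
    }
}
```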
########## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java ##########
@@ -179,6 +179,10 @@ public void appConfigsShouldReturnUnrecognizedValues() {
            equalTo("user-supplied-value"));
    }
+    @Test(expected = UnsupportedOperationException.class)
+    public void shouldThrowOnCurrentStreamTime() {

Review comment:
This test does not seem useful, as it tests the `TestProcessorContext` from below.

########## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java ##########
@@ -494,7 +494,9 @@ public void process(final Record<Integer, Integer> record) {
        // e2e latency = 10
        task.addRecords(partition1, singletonList(getConsumerRecord(0, 0L)));
-        task.process(10L);
+        time = new MockTime(0L, 10L, 0L);

Review comment:
We already have a global `MockTime` object in this test that we should reuse instead of creating a new one. This way, we can avoid adding `setTime` and keep the `Time` member variable `final`. Or is there a reason why we cannot reuse the global `MockTime` object?

Also, I don't really see any call to the newly added methods in this test, so why do we change this test method at all?

########## File path: streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java ##########
@@ -116,7 +117,7 @@ public InternalMockProcessorContext(final StreamsMetricsImpl streamsMetrics) {
        );
    }
-    public InternalMockProcessorContext(final File stateDir,
+    public InternalMockProcessorContext(final File stateDir,

Review comment:
nit: revert the insertion of the additional space.

########## File path: streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java ##########
@@ -505,7 +506,7 @@ private void setupTask(final StreamsConfig streamsConfig,
            streamsMetrics
        );
-        final InternalProcessorContext context = new ProcessorContextImpl(

Review comment:
I don't think we need this change.
The new tests you added below seem to re-test `ProcessorContextImpl`, which is already tested in `ProcessorContextImplTest`, and thus we don't need them.