[GitHub] [kafka] rohitrmd commented on a change in pull request #9744: KAFKA-10062: Add a method to retrieve the current timestamp as known by the Streams app
rohitrmd commented on a change in pull request #9744:
URL: https://github.com/apache/kafka/pull/9744#discussion_r587996458

## File path: streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java ##

@@ -353,11 +371,19 @@ public void setHeaders(final Headers headers) {
      * The context exposes this metadata for use in the processor. Normally, they are set by the Kafka Streams framework,
      * but for the purpose of driving unit tests, you can set it directly. Setting this attribute doesn't affect the others.
      *
-     * @param timestamp A record timestamp
+     * @param recordTimestamp A record timestamp
      */
     @SuppressWarnings({"WeakerAccess", "unused"})
-    public void setTimestamp(final long timestamp) {
-        this.timestamp = timestamp;
+    public void setRecordTimestamp(final long recordTimestamp) {
+        this.recordTimestamp = recordTimestamp;
+    }
+
+    public void setCurrentSystemTimeMs(final long currentSystemTimeMs) {
+        this.currentSystemTimeMs = currentSystemTimeMs;
+    }
+
+    public void setCurrentStreamTimeMs(final long currentStreamTimeMs) {
+        this.currentStreamTimeMs = currentStreamTimeMs;

Review comment:
@mjsax I added the setTimestamp method back; it sets the record timestamp and carries the Deprecated annotation. I also updated the [KIP](https://cwiki.apache.org/confluence/display/KAFKA/KIP-622%3A+Add+currentSystemTimeMs+and+currentStreamTimeMs+to+ProcessorContext).

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
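The setter arrangement described in this comment can be sketched roughly as follows. This is only an illustration: `SketchMockContext` and its getters are hypothetical stand-ins rather than the real `MockProcessorContext`, and having the deprecated `setTimestamp` delegate to `setRecordTimestamp` is an assumption about how "sets record timestamp" might be implemented.

```java
// Hypothetical stand-in for illustration only; not the real MockProcessorContext.
class SketchMockContext {
    private long recordTimestamp;
    private long currentSystemTimeMs;
    private long currentStreamTimeMs;

    /**
     * Old setter kept for backward compatibility.
     * Assumption: it simply delegates to the renamed setter.
     */
    @Deprecated
    public void setTimestamp(final long timestamp) {
        setRecordTimestamp(timestamp);
    }

    public void setRecordTimestamp(final long recordTimestamp) {
        this.recordTimestamp = recordTimestamp;
    }

    public void setCurrentSystemTimeMs(final long currentSystemTimeMs) {
        this.currentSystemTimeMs = currentSystemTimeMs;
    }

    public void setCurrentStreamTimeMs(final long currentStreamTimeMs) {
        this.currentStreamTimeMs = currentStreamTimeMs;
    }

    // Getters so a test can observe that the three values are independent.
    public long timestamp() {
        return recordTimestamp;
    }

    public long currentSystemTimeMs() {
        return currentSystemTimeMs;
    }

    public long currentStreamTimeMs() {
        return currentStreamTimeMs;
    }
}
```

With a shape like this, existing tests that call `setTimestamp` would keep compiling, while new tests can set the record timestamp, system time, and stream time separately.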
[GitHub] [kafka] rohitrmd commented on a change in pull request #9744: KAFKA-10062: Add a method to retrieve the current timestamp as known by the Streams app
rohitrmd commented on a change in pull request #9744:
URL: https://github.com/apache/kafka/pull/9744#discussion_r571532911

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java ##

@@ -45,7 +45,6 @@
     private boolean initialized;
     protected ProcessorRecordContext recordContext;
     protected ProcessorNode currentNode;
-    private long currentSystemTimeMs;

Review comment:
@mjsax Yes, thank you for the explanation. I made the changes as per the review.
[GitHub] [kafka] rohitrmd commented on a change in pull request #9744: KAFKA-10062: Add a method to retrieve the current timestamp as known by the Streams app
rohitrmd commented on a change in pull request #9744:
URL: https://github.com/apache/kafka/pull/9744#discussion_r570605728

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java ##

@@ -45,7 +45,6 @@
     private boolean initialized;
     protected ProcessorRecordContext recordContext;
     protected ProcessorNode currentNode;
-    private long currentSystemTimeMs;

Review comment:
@mjsax Can you please explain again what is expected now? To me this seems contrary to the initial KIP. As I understood the KIP, we wanted to return the system time from the StreamTask. Considering the changes in this PR:

1. We added a time field in InternalMockProcessorContext, which we return when currentSystemTimeMs() is called.
2. When ProcessorContextImpl's currentSystemTimeMs() method is called, we return the time from the StreamTask's time field.
3. We also added a time field in GlobalProcessorContextImpl, which we return from currentSystemTimeMs().
4. If we add a new cachedSystemTimeMs field in AbstractProcessorContext, when do you want to return this field?

Are the earlier changes not valid for returning the time from StreamTask?
[GitHub] [kafka] rohitrmd commented on a change in pull request #9744: KAFKA-10062: Add a method to retrieve the current timestamp as known by the Streams app
rohitrmd commented on a change in pull request #9744:
URL: https://github.com/apache/kafka/pull/9744#discussion_r567356968

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java ##

@@ -45,7 +45,6 @@
     private boolean initialized;
     protected ProcessorRecordContext recordContext;
     protected ProcessorNode currentNode;
-    private long currentSystemTimeMs;

Review comment:
@mjsax I am also confused about which time we want to return when currentSystemTimeMs() is called within the ProcessorContext hierarchy. If it is called on an AbstractProcessorContext reference, should we return cachedSystemTimeMs, and if it is called on a ProcessorContextImpl reference, should we return streamTask.currentSystemTimeMs()? Or do we want a completely new method that returns the cached time when ProcessorContextImpl#forwardInternal() is called?
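The two alternatives raised in this question can be made concrete with a small sketch. All class names here (`SketchTask`, `SketchAbstractContext`, `SketchContextImpl`) are hypothetical simplifications, not the real Kafka Streams classes, and the sketch only illustrates the question being asked; it does not claim to show what the PR finally implemented.

```java
// Hypothetical, simplified stand-ins; not the real Kafka Streams classes.
class SketchTask {
    private final java.util.function.LongSupplier clock;

    SketchTask(final java.util.function.LongSupplier clock) {
        this.clock = clock;
    }

    // The task reads the time from its own clock on every call.
    long currentSystemTimeMs() {
        return clock.getAsLong();
    }
}

abstract class SketchAbstractContext {
    // A cached value that only changes when someone sets it explicitly.
    protected long cachedSystemTimeMs;

    void setCachedSystemTimeMs(final long timeMs) {
        this.cachedSystemTimeMs = timeMs;
    }

    // Option A: the base class answers with the cached value.
    long currentSystemTimeMs() {
        return cachedSystemTimeMs;
    }
}

class SketchContextImpl extends SketchAbstractContext {
    private final SketchTask task;

    SketchContextImpl(final SketchTask task) {
        this.task = task;
    }

    // Option B: the concrete context delegates to the task instead.
    @Override
    long currentSystemTimeMs() {
        return task.currentSystemTimeMs();
    }

    // A separate accessor for the cached value, e.g. for internal use.
    long cachedSystemTimeMs() {
        return cachedSystemTimeMs;
    }
}
```

Because Java dispatches on the runtime type, a `SketchContextImpl` held through a `SketchAbstractContext` reference still answers with the task's time, so the static type of the reference alone would not select the cached value. Exposing the cached value therefore needs its own accessor, as sketched above.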
[GitHub] [kafka] rohitrmd commented on a change in pull request #9744: KAFKA-10062: Add a method to retrieve the current timestamp as known by the Streams app
rohitrmd commented on a change in pull request #9744:
URL: https://github.com/apache/kafka/pull/9744#discussion_r566587951

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java ##

@@ -45,7 +45,6 @@
     private boolean initialized;
     protected ProcessorRecordContext recordContext;
     protected ProcessorNode currentNode;
-    private long currentSystemTimeMs;

Review comment:
@mjsax Do you recommend adding a method to AbstractProcessorContext to set cachedSystemTimeMs (setCachedSystemTimeMs), or do you want me to add setSystemTimeMs back to [InternalProcessorContext](https://github.com/apache/kafka/pull/9744/files#diff-34daeb287c7e79c8ccd757daa4e17d6ab585d54844f6e5e8676853762a08cedcL49) and set the system time the way it was done before?
[GitHub] [kafka] rohitrmd commented on a change in pull request #9744: KAFKA-10062: Add a method to retrieve the current timestamp as known by the Streams app
rohitrmd commented on a change in pull request #9744:
URL: https://github.com/apache/kafka/pull/9744#discussion_r563247511

## File path: streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java ##

@@ -55,14 +55,9 @@ public MockInternalProcessorContext(final Properties config, final TaskId taskId
         super(config, taskId, stateDir);
     }

-    @Override
-    public void setSystemTimeMs(long timeMs) {

Review comment:
@mjsax Should I try to do this in another PR?
[GitHub] [kafka] rohitrmd commented on a change in pull request #9744: KAFKA-10062: Add a method to retrieve the current timestamp as known by the Streams app
rohitrmd commented on a change in pull request #9744:
URL: https://github.com/apache/kafka/pull/9744#discussion_r551055293

## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java ##

@@ -494,7 +494,9 @@ public void process(final Record record) {
         // e2e latency = 10
         task.addRecords(partition1, singletonList(getConsumerRecord(0, 0L)));
-        task.process(10L);
+        time = new MockTime(0L, 10L, 0L);

Review comment:
@mjsax The reason I had to change StreamTaskTest is the following issue. The StreamTask process(wallClockTime) method updates the processor context's system time here: https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L685

AbstractProcessorContext has another systemTime field: https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java#L48 which is updated in StreamTask's process method. I removed this field from AbstractProcessorContext because we want to fetch the time from StreamTask. But the latency is measured [here](https://github.com/apache/kafka/blob/b9dfc196aaee91f4fa23fe6563261c6f2d01ac33/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L284), which now reads StreamTask's time field, and that field is not updated.

I couldn't find any other way to change the time field in StreamTask unless I add a setTime method to it. If I don't change the time in the StreamTask object, subsequent metric tests fail because stream-time is not updated. Do you have a recommendation for how I can fix this without adding a setTime method?
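The testing problem described here comes down to where the latency calculation reads its clock. A minimal sketch, assuming the task owns an injected clock and computes latency from it: `ManualClock` and `LatencyTask` are hypothetical names invented for this illustration, not Kafka classes, and the real test uses Kafka's `MockTime` instead.

```java
// Hypothetical controllable clock for illustration; the real test uses Kafka's MockTime.
class ManualClock {
    private long nowMs;

    ManualClock(final long startMs) {
        this.nowMs = startMs;
    }

    long milliseconds() {
        return nowMs;
    }

    // Advance the clock without waiting.
    void sleep(final long ms) {
        nowMs += ms;
    }
}

// Hypothetical task that measures latency against its own injected clock.
class LatencyTask {
    private final ManualClock time;

    LatencyTask(final ManualClock time) {
        this.time = time;
    }

    // Latency is "now" as seen by the task's clock minus the record timestamp.
    long latencyFor(final long recordTimestampMs) {
        return time.milliseconds() - recordTimestampMs;
    }
}
```

In this arrangement a test that wants an end-to-end latency of 10 for a record stamped 0 has to move the injected clock to 10 (for example `clock.sleep(10)`) before processing; passing a wall-clock value as a method argument would have no effect on the measurement, and no setter on the task is needed.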