[GitHub] [kafka] rohitrmd commented on a change in pull request #9744: KAFKA-10062: Add a method to retrieve the current timestamp as known by the Streams app

2021-03-04 Thread GitBox


rohitrmd commented on a change in pull request #9744:
URL: https://github.com/apache/kafka/pull/9744#discussion_r587996458



##
File path: 
streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
##
@@ -353,11 +371,19 @@ public void setHeaders(final Headers headers) {
      * The context exposes this metadata for use in the processor. Normally, they are set by the Kafka Streams framework,
      * but for the purpose of driving unit tests, you can set it directly. Setting this attribute doesn't affect the others.
      *
-     * @param timestamp A record timestamp
+     * @param recordTimestamp A record timestamp
      */
     @SuppressWarnings({"WeakerAccess", "unused"})
-    public void setTimestamp(final long timestamp) {
-        this.timestamp = timestamp;
+    public void setRecordTimestamp(final long recordTimestamp) {
+        this.recordTimestamp = recordTimestamp;
+    }
+
+    public void setCurrentSystemTimeMs(final long currentSystemTimeMs) {
+        this.currentSystemTimeMs = currentSystemTimeMs;
+    }
+
+    public void setCurrentStreamTimeMs(final long currentStreamTimeMs) {
+        this.currentStreamTimeMs = currentStreamTimeMs;

Review comment:
   @mjsax I added the setTimestamp method back; it sets the record timestamp 
and carries the @Deprecated annotation. I also updated the 
[KIP](https://cwiki.apache.org/confluence/display/KAFKA/KIP-622%3A+Add+currentSystemTimeMs+and+currentStreamTimeMs+to+ProcessorContext).
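
The backward-compatible shape described in this comment could look roughly like the sketch below. It is not the code from the PR: `SketchMockContext` is a hypothetical, simplified stand-in for MockProcessorContext, and having the deprecated setTimestamp delegate to setRecordTimestamp is an assumption about how the old method keeps its behavior.

```java
// Hypothetical, simplified stand-in for MockProcessorContext; not the PR's code.
class SketchMockContext {
    private long recordTimestamp;
    private long currentSystemTimeMs;
    private long currentStreamTimeMs;

    /** New name for the setter that controls the record timestamp. */
    public void setRecordTimestamp(final long recordTimestamp) {
        this.recordTimestamp = recordTimestamp;
    }

    /**
     * Old name, kept so that existing tests still compile.
     * Assumption: it simply delegates to the new setter.
     */
    @Deprecated
    public void setTimestamp(final long timestamp) {
        setRecordTimestamp(timestamp);
    }

    public void setCurrentSystemTimeMs(final long currentSystemTimeMs) {
        this.currentSystemTimeMs = currentSystemTimeMs;
    }

    public void setCurrentStreamTimeMs(final long currentStreamTimeMs) {
        this.currentStreamTimeMs = currentStreamTimeMs;
    }

    /** Record timestamp, regardless of which setter was used. */
    public long timestamp() {
        return recordTimestamp;
    }

    public long currentSystemTimeMs() {
        return currentSystemTimeMs;
    }

    public long currentStreamTimeMs() {
        return currentStreamTimeMs;
    }
}
```

With this shape, a test that still calls setTimestamp(42L) and a test that calls setRecordTimestamp(42L) observe the same value from timestamp(), and neither setter touches the system-time or stream-time values.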





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rohitrmd commented on a change in pull request #9744: KAFKA-10062: Add a method to retrieve the current timestamp as known by the Streams app

2021-02-06 Thread GitBox


rohitrmd commented on a change in pull request #9744:
URL: https://github.com/apache/kafka/pull/9744#discussion_r571532911



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
##
@@ -45,7 +45,6 @@
 private boolean initialized;
 protected ProcessorRecordContext recordContext;
 protected ProcessorNode currentNode;
-private long currentSystemTimeMs;

Review comment:
   @mjsax Yes, thank you for the explanation. I made the changes as per the review. 









[GitHub] [kafka] rohitrmd commented on a change in pull request #9744: KAFKA-10062: Add a method to retrieve the current timestamp as known by the Streams app

2021-02-05 Thread GitBox


rohitrmd commented on a change in pull request #9744:
URL: https://github.com/apache/kafka/pull/9744#discussion_r570605728



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
##
@@ -45,7 +45,6 @@
 private boolean initialized;
 protected ProcessorRecordContext recordContext;
 protected ProcessorNode currentNode;
-private long currentSystemTimeMs;

Review comment:
   @mjsax Can you please explain again what is expected now? To me, this 
seems contrary to the initial KIP. As I understood the KIP, we wanted to 
return the system time from StreamTask. Considering the changes in this PR: 
   
   1. We added a time field to InternalMockProcessorContext, which we return 
when currentSystemTimeMs() is called. 
   2. When ProcessorContextImpl's currentSystemTimeMs() method is called, we 
return the time from the StreamTask's time field. 
   3. We also added a time field to GlobalProcessorContextImpl, which we return 
from currentSystemTimeMs().
   4. If we add a new cachedSystemTimeMs field to AbstractProcessorContext, when 
do you want to return this field? Are the earlier changes that return the time 
from StreamTask no longer valid?
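
To make the question concrete, here is a minimal sketch of the two places the value could come from. All class names are hypothetical simplifications rather than the Kafka classes, and the sketch does not claim which option the reviewer intends: option A returns a value cached on the abstract base class, option B asks the task's clock on every call.

```java
// Hypothetical clock owner; stands in for the time field kept by the task.
class SketchTask {
    private long nowMs;

    void advanceTo(final long nowMs) {
        this.nowMs = nowMs;
    }

    long currentSystemTimeMs() {
        return nowMs;
    }
}

// Hypothetical base class holding the cached value from point 4.
abstract class SketchAbstractContext {
    protected long cachedSystemTimeMs;

    void setCachedSystemTimeMs(final long timeMs) {
        this.cachedSystemTimeMs = timeMs;
    }

    abstract long currentSystemTimeMs();
}

// Option A: return the cached value; someone must refresh it explicitly.
class CachedContext extends SketchAbstractContext {
    @Override
    long currentSystemTimeMs() {
        return cachedSystemTimeMs;
    }
}

// Option B: delegate to the task on every call; nothing to refresh.
class DelegatingContext extends SketchAbstractContext {
    private final SketchTask task;

    DelegatingContext(final SketchTask task) {
        this.task = task;
    }

    @Override
    long currentSystemTimeMs() {
        return task.currentSystemTimeMs();
    }
}
```

The trade-off in this toy version is that option A can go stale if the cache is not updated, while option B always reflects the task's clock but ties the context to a task instance.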










[GitHub] [kafka] rohitrmd commented on a change in pull request #9744: KAFKA-10062: Add a method to retrieve the current timestamp as known by the Streams app

2021-01-30 Thread GitBox


rohitrmd commented on a change in pull request #9744:
URL: https://github.com/apache/kafka/pull/9744#discussion_r567356968



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
##
@@ -45,7 +45,6 @@
 private boolean initialized;
 protected ProcessorRecordContext recordContext;
 protected ProcessorNode currentNode;
-private long currentSystemTimeMs;

Review comment:
   @mjsax I am also confused about which time we want to return when 
currentSystemTimeMs() is called within the ProcessorContext hierarchy. If it's 
called on an AbstractProcessorContext reference, should we return 
cachedSystemTimeMs, and if it's called on a ProcessorContextImpl reference, 
should we return streamTask.currentSystemTimeMs()? Or do we want a 
completely new method that returns the cached time when 
ProcessorContextImpl#forwardInternal() is called?









[GitHub] [kafka] rohitrmd commented on a change in pull request #9744: KAFKA-10062: Add a method to retrieve the current timestamp as known by the Streams app

2021-01-28 Thread GitBox


rohitrmd commented on a change in pull request #9744:
URL: https://github.com/apache/kafka/pull/9744#discussion_r566587951



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
##
@@ -45,7 +45,6 @@
 private boolean initialized;
 protected ProcessorRecordContext recordContext;
 protected ProcessorNode currentNode;
-private long currentSystemTimeMs;

Review comment:
   @mjsax Do you recommend adding a method to AbstractProcessorContext to set 
cachedSystemTimeMs (setCachedSystemTimeMs), or do you want me to add 
setSystemTimeMs back to 
[InternalProcessorContext](https://github.com/apache/kafka/pull/9744/files#diff-34daeb287c7e79c8ccd757daa4e17d6ab585d54844f6e5e8676853762a08cedcL49)
 and set the system time as it was done before? 









[GitHub] [kafka] rohitrmd commented on a change in pull request #9744: KAFKA-10062: Add a method to retrieve the current timestamp as known by the Streams app

2021-01-23 Thread GitBox


rohitrmd commented on a change in pull request #9744:
URL: https://github.com/apache/kafka/pull/9744#discussion_r563247511



##
File path: 
streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
##
@@ -55,14 +55,9 @@ public MockInternalProcessorContext(final Properties config, final TaskId taskId
 super(config, taskId, stateDir);
 }
 
-@Override
-public void setSystemTimeMs(long timeMs) {

Review comment:
   @mjsax Should I try to do this in another PR? 









[GitHub] [kafka] rohitrmd commented on a change in pull request #9744: KAFKA-10062: Add a method to retrieve the current timestamp as known by the Streams app

2021-01-03 Thread GitBox


rohitrmd commented on a change in pull request #9744:
URL: https://github.com/apache/kafka/pull/9744#discussion_r551055293



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##
@@ -494,7 +494,9 @@ public void process(final Record record) {
 
 // e2e latency = 10
 task.addRecords(partition1, singletonList(getConsumerRecord(0, 0L)));
-task.process(10L);
+time = new MockTime(0L, 10L, 0L);

Review comment:
   @mjsax I had to change StreamTaskTest because of the following issue:
   StreamTask's process(wallClockTime) method updates the processor context's 
system time here: 
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L685
   AbstractProcessorContext has another systemTime field: 
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java#L48
 which is updated in StreamTask's process method. I removed this field from 
AbstractProcessorContext because we want to fetch the time from StreamTask. But the 
latency is measured 
[here](https://github.com/apache/kafka/blob/b9dfc196aaee91f4fa23fe6563261c6f2d01ac33/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L284)
 and now uses StreamTask's time field, which is not updated.
   I couldn't find any other way to change the time field in StreamTask short of 
adding a setTime method to it. If I don't change the time in the StreamTask object, 
subsequent metric tests fail because stream-time is not updated. 
   Do you have a recommendation for how I can fix it without adding a setTime 
method?
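
The failure mode described in this comment can be reproduced in miniature. The sketch below uses hypothetical classes (`FixedClock`, `LatencyTask`), not Kafka's MockTime or StreamTask, and assumes only what the comment states: latency is computed from the task's own clock rather than from the wall-clock value passed to process(). Under that assumption, passing 10 to process() no longer changes the measured latency; only advancing the clock the task was built with does.

```java
// Hypothetical mutable clock; plays the role a mock time source plays in a test.
class FixedClock {
    private long nowMs;

    FixedClock(final long nowMs) {
        this.nowMs = nowMs;
    }

    void sleep(final long ms) {
        nowMs += ms;
    }

    long milliseconds() {
        return nowMs;
    }
}

// Hypothetical task that measures latency against its own clock.
class LatencyTask {
    private final FixedClock time;
    private long lastLatencyMs = -1L;

    LatencyTask(final FixedClock time) {
        this.time = time;
    }

    // wallClockTimeMs is accepted but deliberately unused for the latency,
    // mirroring the situation described in the comment.
    void process(final long wallClockTimeMs, final long recordTimestampMs) {
        lastLatencyMs = time.milliseconds() - recordTimestampMs;
    }

    long lastLatencyMs() {
        return lastLatencyMs;
    }
}
```

In this toy version, a test that wants a latency of 10 for a record with timestamp 0 has to move the clock forward by 10 before calling process(), which is the same kind of change the comment describes making in StreamTaskTest.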








