[ 
https://issues.apache.org/jira/browse/KAFKA-10062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17136432#comment-17136432
 ] 

Piotr Smolinski commented on KAFKA-10062:
-----------------------------------------

The motivation is to use the same source of wall-clock time in the execution 
environment and in tests.

Imagine a case where you have to execute an action when a message arrives, 
based on the current time, and then potentially again some time later. Right 
now, when the message arrives, you have access to the message time via the 
timestamp() method of ProcessorContext:

[https://kafka.apache.org/25/javadoc/index.html?org/apache/kafka/streams/processor/ProcessorContext.html]

Next you can schedule a punctuator and specify which time it should use, 
wall-clock or stream. The problem is that these three times have different 
semantics. timestamp() just tells you the timestamp of the current message. 
Stream time is whatever the system thinks the time is, based on timestamp(), 
but in theory it should only advance. For example, if the timestamps in the 
topic are mixed (set as producer time), the message time may jump back and 
forth. Wall-clock time is also ever-advancing; NTP may set it back to correct 
drift, but we should expect that this time also only advances.

Now, if you want to implement a solution that acts on wall-clock time, 
currently you may take the shortcut and just read 
System.currentTimeMillis(). The problem is that in tests you have no control 
over it. In a punctuate() call, the time comes from the time implementation 
held by the task:

[https://github.com/apache/kafka/blob/2.5.0/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L75]
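To illustrate why a static System.currentTimeMillis() read is hard to test, here is a minimal sketch of the alternative, an injected time source. It is plain Java with no Kafka dependency: EscalationCheck and timeoutMs are hypothetical names, and LongSupplier is only a stand-in for whatever time implementation the runtime holds.

```java
import java.util.function.LongSupplier;

// Hypothetical example class; not part of Kafka Streams.
final class EscalationCheck {
    private final LongSupplier clock;   // injected source of wall-clock milliseconds
    private final long timeoutMs;

    EscalationCheck(LongSupplier clock, long timeoutMs) {
        this.clock = clock;
        this.timeoutMs = timeoutMs;
    }

    // True if at least timeoutMs has passed since the reference timestamp.
    boolean isOverdue(long referenceTimestampMs) {
        return clock.getAsLong() - referenceTimestampMs >= timeoutMs;
    }
}
```

In production code the supplier could be System::currentTimeMillis; in a test it can be backed by a value that the test advances by hand.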

ProcessorContextImpl already has a reference path to this time object, but it 
is private:

[https://github.com/apache/kafka/blob/2.5.0/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L50]

Therefore, if you have a reference to a ProcessorContext that unwraps to a 
ProcessorContextImpl, you have the necessary path:

context->ProcessorContextImpl#task->StreamTask#time

The problem is that these fields are private.
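For illustration, this is roughly what walking such a private path by reflection looks like. The sketch deliberately uses hypothetical stand-in classes (FakeContext, FakeTask, FakeTime) that only mimic the shape context -> task -> time; it does not touch the real Kafka classes, and the value 42 is arbitrary.

```java
import java.lang.reflect.Field;

// Hypothetical stand-ins that mimic only the shape of the private fields.
final class FakeTime {
    long milliseconds() { return 42L; } // arbitrary fixed value for the sketch
}

final class FakeTask {
    private final FakeTime time = new FakeTime();
}

final class FakeContext {
    private final FakeTask task = new FakeTask();
}

final class PrivateFieldReader {
    // Reads a private field by name, bypassing access checks.
    static Object read(Object target, String fieldName) {
        try {
            Field field = target.getClass().getDeclaredField(fieldName);
            field.setAccessible(true);
            return field.get(target);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot read field " + fieldName, e);
        }
    }

    // Follows context -> task -> time, mirroring the path described above.
    static long currentTimeMs(FakeContext context) {
        Object task = read(context, "task");
        FakeTime time = (FakeTime) read(task, "time");
        return time.milliseconds();
    }
}
```

This kind of access depends on private field names and breaks silently when the internals change, which is the reason for asking for a public accessor instead.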

That's about system time.

StreamTask has two punctuation queues, one for system time and one for stream 
time. The stream time comes from PartitionGroup:

[https://github.com/apache/kafka/blob/2.5.0/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L841]

The time is advanced before processing a new record whose timestamp is later 
than the currently known streamTime:

[https://github.com/apache/kafka/blob/2.5.0/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java#L136]

Therefore the time available as ProcessorContext#timestamp may be earlier than 
the streamTime if the records are not processed in timestamp order.
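The relationship between record timestamps and stream time can be modelled in a few lines. This is a simplified sketch that assumes stream time is just the highest record timestamp observed so far; StreamTimeModel is a hypothetical class and not the PartitionGroup code linked above.

```java
// Hypothetical model of stream time; not the PartitionGroup implementation.
final class StreamTimeModel {
    // Long.MIN_VALUE marks "no record seen yet" in this sketch.
    private long streamTime = Long.MIN_VALUE;

    // Called before a record is processed; returns the stream time afterwards.
    long observe(long recordTimestampMs) {
        streamTime = Math.max(streamTime, recordTimestampMs);
        return streamTime;
    }

    long streamTime() {
        return streamTime;
    }
}
```

With records arriving at timestamps 100, 70 and 130, the model reports 100, 100 and 130: the second record's own timestamp (70) is earlier than the stream time at the moment it is processed.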

To get the streamTime, follow the path:

context->ProcessorContextImpl#task->StreamTask#streamTime()

StreamTask#streamTime() is package-private.

My expectation is that we get public API methods in ProcessorContext to access 
these times, with the implementation in ProcessorContextImpl:

public long currentSystemTimeMs() {
    return task.systemTime();
}

public long currentStreamTimeMs() {
    return task.streamTime();
}

In GlobalProcessorContext the methods may stay unsupported, especially because 
GlobalProcessorContext does not support schedule().
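As a rough sketch of how calling code could use such accessors for the escalation use case from the issue description: TimeAccess below is a hypothetical interface that carries only the two proposed method names, and EscalationStage with its parameters is likewise invented for illustration.

```java
// Hypothetical interface carrying only the two proposed accessors.
interface TimeAccess {
    long currentSystemTimeMs();
    long currentStreamTimeMs();
}

// Hypothetical helper; the stage arithmetic is an illustrative assumption.
final class EscalationStage {
    // Number of whole timeout intervals that have elapsed since the reference
    // timestamp, measured on wall-clock time; 0 if the reference is not in the past.
    static long stageOnArrival(TimeAccess context, long referenceTimestampMs, long stageTimeoutMs) {
        long elapsedMs = context.currentSystemTimeMs() - referenceTimestampMs;
        return elapsedMs <= 0 ? 0 : elapsedMs / stageTimeoutMs;
    }
}
```

Because the time is read through the context, a test can supply a context with a fixed or manually advanced clock and get deterministic stages.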

> Add a method to retrieve the current timestamp as known by the Streams app
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-10062
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10062
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Piotr Smolinski
>            Assignee: William Bottrell
>            Priority: Major
>              Labels: needs-kip, newbie
>
> Please add to ProcessorContext a method to retrieve the current timestamp 
> compatible with the Punctuator#punctuate(long) method.
> Proposal in ProcessorContext:
> long getTimestamp(PunctuationType type);
> The method should return time value as known by the Punctuator scheduler with 
> the respective PunctuationType.
> The use-case is tracking of a process with timeout-based escalation.
> A transformer receives process events and, if an event is missing, executes 
> an action (emits a message) after a given escalation timeout (several 
> stages). The initial message may already arrive with a reference timestamp in 
> the past and may trigger a different action upon arrival depending on how far 
> in the past it is.
> If the timeout only had to be computed against some future time, Punctuator 
> would be perfectly sufficient. The problem is that I have to evaluate the 
> current time-related state once the message arrives.
> I am using wall-clock time. Normally accessing System.currentTimeMillis() is 
> sufficient, but it breaks in unit testing with TopologyTestDriver, where the 
> app wall clock time is different from the system-wide one.
> To access the mentioned clock I am using reflection to access 
> ProcessorContextImpl#task and then StreamTask#time.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
