Stephane Maarek created KAFKA-6092:
--------------------------------------

             Summary: Time passed in punctuate call is currentTime, not punctuate schedule time.
                 Key: KAFKA-6092
                 URL: https://issues.apache.org/jira/browse/KAFKA-6092
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 0.11.0.0
            Reporter: Stephane Maarek


The Javadoc for Transformer states that calling context.schedule(1000) causes
punctuate to be called every 1000 ms. This is not entirely accurate: if no data
is received for a while, punctuate is not called at all.

{code}
void init(ProcessorContext context) {
    this.context = context;
    this.state = context.getStateStore("myTransformState");
    context.schedule(1000); // call #punctuate() each 1000ms
}
{code}

When new data finally arrives after, say, 20 seconds, punctuate plays catch-up
and is called 20 times as soon as that data is received.
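The catch-up behaviour described above can be illustrated with a small simulation. This is a hypothetical model, not Kafka's actual punctuation code: it assumes that punctuation is only checked when a record arrives and that every missed interval is replayed one call at a time, each call receiving the current time as reported in this issue. The class `CatchUpDemo` and its methods are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a data-driven punctuation schedule (not Kafka's code).
class CatchUpDemo {
    private final long intervalMs;
    private long nextScheduledMs;
    // Timestamps handed to each simulated punctuate() call.
    private final List<Long> calls = new ArrayList<>();

    CatchUpDemo(long startMs, long intervalMs) {
        this.intervalMs = intervalMs;
        this.nextScheduledMs = startMs + intervalMs;
    }

    // Assumption: punctuation is only evaluated when a record arrives,
    // so nothing happens while no data is received.
    void onRecord(long nowMs) {
        // Replay every interval that became due since the last punctuation.
        while (nextScheduledMs <= nowMs) {
            calls.add(nowMs); // reported behaviour: the current time is passed
            nextScheduledMs += intervalMs;
        }
    }

    List<Long> calls() {
        return calls;
    }

    public static void main(String[] args) {
        CatchUpDemo demo = new CatchUpDemo(0L, 1000L);
        // No records for 20 seconds, then a single record arrives at t = 20000 ms.
        demo.onRecord(20_000L);
        System.out.println(demo.calls().size()); // 20
    }
}
```

In this model all 20 catch-up calls fire back to back when the record arrives, and every one of them receives the same timestamp.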

The signature of punctuate is:
{code}
KeyValue punctuate(long timestamp) {
    // can access this.state
    // can emit as many new KeyValue pairs as required via this.context#forward()
    return null; // don't return result -- can also be "new KeyValue()"
}
{code}

However, the timestamp being passed is the current timestamp at the moment
punctuate is called, not the time for which the punctuation was scheduled. This
is very confusing, and I think the timestamp should represent the time at which
the punctuation was scheduled to run. Passing the current timestamp adds little
information, since it can easily be obtained using System.currentTimeMillis().
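To make the proposed change concrete, the sketch below replays the same 20-second gap twice: once passing the current time to each catch-up call (the behaviour reported above) and once passing the time each call was scheduled for (the behaviour proposed here). The class `PunctuateTimestamps` and its `replay` helper are hypothetical; they only model which timestamps would be handed to punctuate and do not call Kafka Streams.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model comparing the two timestamp choices (not Kafka's code).
class PunctuateTimestamps {

    // Returns the timestamps that would be passed to punctuate() for every
    // interval that became due after lastScheduledMs, up to and including nowMs.
    static List<Long> replay(long lastScheduledMs, long intervalMs, long nowMs,
                             boolean passScheduledTime) {
        List<Long> passed = new ArrayList<>();
        for (long due = lastScheduledMs + intervalMs; due <= nowMs; due += intervalMs) {
            // Either the time this call was due, or the time it actually runs.
            passed.add(passScheduledTime ? due : nowMs);
        }
        return passed;
    }

    public static void main(String[] args) {
        // 1000 ms interval, last punctuation at t = 0, next record at t = 20000 ms.
        List<Long> current = replay(0L, 1000L, 20_000L, false);
        List<Long> scheduled = replay(0L, 1000L, 20_000L, true);
        System.out.println(current.subList(0, 3));   // [20000, 20000, 20000]
        System.out.println(scheduled.subList(0, 3)); // [1000, 2000, 3000]
    }
}
```

Passing the scheduled time lets each catch-up call tell which interval it covers, whereas passing the current time gives all 20 calls an identical value.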



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
