Hi Michal,

yes, I know its beyond the scope of KIP-138, but from previous messages 
from Matthias I thought that he will create a new ticket, but it seems 
that instead he added it to KAFKA-3514. I will update that ticket with my 
thoughts.

Thanks,
Sini



From:   Michal Borowiecki <michal.borowie...@openbet.com>
To:     users@kafka.apache.org
Date:   2017/05/17 10:15
Subject:        Re: Order of punctuate() and process() in a stream 
processor



Hi Sini, 

This is beyond the score of KIP-138 but 
https://issues.apache.org/jira/browse/KAFKA-3514 exists to track such 
improvements

Thanks, 

Michal

On 17 May 2017 5:10 p.m., Peter Sinoros Szabo 
<peter.sinoros-sz...@hu.ibm.com> wrote:

Hi,

I see, now its clear why the repeated punctuations use the same time value 

in that case.

Do you have a JIRA ticket to track improvement ideas for that?

It would be great to have an option to:
- advance the stream time before calling the process() on a new record  - 
this would prevent to process a message in the wrong punctuation 
"segment".
- use fine grained advance of stream time for the "missed" punctuations  - 

this would ease the processing of burst messages after some silence. I do 
not see if KIP-138 may solve this or not.

Regards

-Sini



From:   "Matthias J. Sax" <matth...@confluent.io>
To:     users@kafka.apache.org
Date:   2017/05/12 19:19
Subject:        Re: Order of punctuate() and process() in a stream 
processor



Thanks for sharing.

As punctuate is called with "streams time" you see the same time value
multiple times. It's again due to the coarse grained advance of "stream
time".

@Thomas: I think, the way we handle it just simplifies the
implementation of punctuations. I don't see any other "advantage".


I will create a JIRA to track this -- we are currently working on some
improvements of punctuation and time management already, and it seems to
be another valuable improvement.


-Matthias


On 5/12/17 10:07 AM, Peter Sinoros Szabo wrote:
> Well, this is also a good question, because it is triggered with the 
same 
> timestamp 3 times, so in order to create my update for both three 
seconds, 
> I will have to count the number of punctuations and calculate the missed 


> stream times for myself. It's ok for me to trigger it 3 times, but the 
> timestamp should not be the same in each, but should be increased by the 


> schedule time in each punctuate.
> 
> - Sini
> 
> 
> 
> From:   Thomas Becker <tobec...@tivo.com>
> To:     "users@kafka.apache.org" <users@kafka.apache.org>
> Date:   2017/05/12 18:57
> Subject:        RE: Order of punctuate() and process() in a stream 
> processor
> 
> 
> 
> I'm a bit troubled by the fact that it fires 3 times despite the stream 
> time being advanced all at once; is there a scenario when this is 
> beneficial?
> 
> ________________________________________
> From: Matthias J. Sax [matth...@confluent.io]
> Sent: Friday, May 12, 2017 12:38 PM
> To: users@kafka.apache.org
> Subject: Re: Order of punctuate() and process() in a stream processor
> 
> Hi Peter,
> 
> It's by design. Streams internally tracks time progress (so-called
> "streams time"). "streams time" get advanced *after* processing a 
record.
> 
> Thus, in your case, "stream time" is still at its old value before it
> processed the first message of you send "burst". After that, "streams
> time" is advanced by 3 seconds, and thus, punctuate fires 3 time.
> 
> I guess, we could change the design and include scheduled punctuations
> when advancing "streams time". But atm, we just don't do this.
> 
> Does this make sense?
> 
> Is this critical for your use case? Or do you just want to understand
> what's happening?
> 
> 
> -Matthias
> 
> 
> On 5/12/17 8:59 AM, Peter Sinoros Szabo wrote:
>> Hi,
>>
>>
>> Let's assume the following case.
>> - a stream processor that uses the Processor API
>> - context.schedule(1000) is called in the init()
>> - the processor reads only one topic that has one partition
>> - using custom timestamp extractor, but that timestamp is just a wall
>> clock time
>>
>>
>> Image the following events:
>> 1., for 10 seconds I send in 5 messages / second
>> 2., does not send any messages for 3 seconds
>> 3., starts the 5 messages / second again
>>
>> I see that punctuate() is not called during the 3 seconds when I do not
>> send any messages. This is ok according to the documentation, because
>> there is not any new messages to trigger the punctuate() call. When the
>> first few messages arrives after a restart the sending (point 3. above) 


> I
>> see the following sequence of method calls:
>>
>> 1., process() on the 1st message
>> 2., punctuate() is called 3 times
>> 3., process() on the 2nd message
>> 4., process() on each following message
>>
>> What I would expect instead is that punctuate() is called first and 
then
>> process() is called on the messages, because the first message's 
> timestamp
>> is already 3 seconds older then the last punctuate() was called, so the
>> first message belongs after the 3 punctuate() calls.
>>
>> Please let me know if this is a bug or intentional, in this case what 
is
>> the reason for processing one message before punctuate() is called?
>>
>>
>> Thanks,
>> Peter
>>
>> Péter Sinóros-Szabó
>> Software Engineer
>>
>> Ustream, an IBM Company
>> Andrassy ut 39, H-1061 Budapest
>> Mobile: +36203693050
>> Email: peter.sinoros-sz...@hu.ibm.com
>>
> 
> ________________________________
> 
> This email and any attachments may contain confidential and privileged 
> material for the sole use of the intended recipient. Any review, 
copying, 
> or distribution of this email (or any attachments) by others is 
> prohibited. If you are not the intended recipient, please contact the 
> sender immediately and permanently delete this email and any 
attachments. 
> No employee or agent of TiVo Inc. is authorized to conclude any binding 
> agreement on behalf of TiVo Inc. by email. Binding agreements with TiVo 
> Inc. may only be made by a signed written agreement.
> 
> 
> 
> 
> 

[attachment "signature.asc" deleted by Peter Sinoros Szabo/Hungary/IBM] 







Reply via email to