Hi all,
As promised, here is my take on how one could implement the previously
discussed hybrid semantics using the two PunctuationType callbacks (one
for STREAM_TIME and one for SYSTEM_TIME).
However, there's a twist.
Since currently calling context.schedule() adds a new
PunctuationSchedule and does not overwrite the previous one, a slight
change would be required:
a) either PunctuationSchedules become cancellable,
b) or calling schedule() overwrites (cancels) the previous schedule
with the given PunctuationType (but that's not how it works currently).
Below is an example assuming approach a) is implemented by having
schedule return Cancellable instead of void.
    ProcessorContext context;
    long streamTimeInterval = ...;
    // e.g. systemTimeUpperBound = streamTimeInterval + some tolerance
    long systemTimeUpperBound = ...;
    Cancellable streamTimeSchedule;
    Cancellable systemTimeSchedule;
    long lastStreamTimePunctuation = -1;

    public void init(ProcessorContext context) {
        this.context = context;
        streamTimeSchedule = context.schedule(PunctuationType.STREAM_TIME,
                streamTimeInterval, this::streamTimePunctuate);
        systemTimeSchedule = context.schedule(PunctuationType.SYSTEM_TIME,
                systemTimeUpperBound, this::systemTimePunctuate);
    }

    public void streamTimePunctuate(long streamTime) {
        periodicBusiness(streamTime);
        // stream time advanced, so restart the system-time upper bound
        systemTimeSchedule.cancel();
        systemTimeSchedule = context.schedule(PunctuationType.SYSTEM_TIME,
                systemTimeUpperBound, this::systemTimePunctuate);
    }

    public void systemTimePunctuate(long systemTime) {
        periodicBusiness(context.timestamp());
        // the upper bound fired first, so restart the stream-time schedule
        streamTimeSchedule.cancel();
        streamTimeSchedule = context.schedule(PunctuationType.STREAM_TIME,
                streamTimeInterval, this::streamTimePunctuate);
    }

    public void periodicBusiness(long streamTime) {
        // guard against streamTime == -1, easy enough.
        // if you need system time instead, just use System.currentTimeMillis()

        // do something businessy here
    }
Where Cancellable is either an interface containing just a single void
cancel() method or also boolean isCancelled() like here
<http://doc.akka.io/japi/akka/2.5.0/akka/actor/Cancellable.html>.
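For illustration, here is a minimal sketch of what such an interface could look like. The name SimpleCancellable and the idea that the scheduler checks isCancelled() before firing are assumptions made for this sketch, not part of any existing Kafka Streams API.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical: the type that schedule() could return under approach a).
interface Cancellable {
    void cancel();
    boolean isCancelled();
}

// Hypothetical minimal implementation. A scheduler holding this handle
// could check isCancelled() before invoking the callback and drop the
// schedule once it has been cancelled.
final class SimpleCancellable implements Cancellable {
    private final AtomicBoolean cancelled = new AtomicBoolean(false);

    @Override
    public void cancel() {
        // idempotent: cancelling twice has no further effect
        cancelled.set(true);
    }

    @Override
    public boolean isCancelled() {
        return cancelled.get();
    }
}
```

Including isCancelled() costs little and would let callers (and the scheduler itself) inspect the state of a schedule without extra bookkeeping.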
Please let me know your opinions on whether we should proceed in this
direction or leave "hybrid" considerations out of scope.
Looking forward to hearing your thoughts.
Thanks,
Michal
On 30/04/17 20:07, Michal Borowiecki wrote:
Hi Matthias,
I'd like to start moving the discarded ideas into Rejected
Alternatives section. Before I do, I want to tidy them up, ensure
they've each been given proper treatment.
To that end let me go back to one of your earlier comments about the
original suggestion (A) to put that to bed.
On 04/04/17 06:44, Matthias J. Sax wrote:
(A) You argue, that users can still "punctuate" on event-time via
process(), but I am not sure if this is possible. Note, that users only
get record timestamps via context.timestamp(). Thus, users would need to
track the time progress per partition (based on the partitions they
observe via context.partition()). (This alone puts a huge burden on the
user by itself.) However, users are not notified at startup what
partitions are assigned, and users are not notified when partitions get
revoked. Because this information is not available, it's not possible to
"manually advance" stream-time, and thus event-time punctuation within
process() seems not to be possible -- or do you see a way to get it
done? And even if, it might still be too clumsy to use.
I might have missed something but I'm guessing your worry about users
having to track time progress /per partition/ comes from what
stream-time does currently.
But I'm not sure that those semantics of stream-time are ideal for
users of punctuate.
That is, if stream-time punctuate didn't exist and users had to use
process(), would they actually want to use the current semantics of
stream time?
As a reminder stream time, in all its glory, is (not exactly
actually, but when trying to be absolutely precise here I spotted
KAFKA-5144 <https://issues.apache.org/jira/browse/KAFKA-5144> so I
think this approximation suffices to illustrate the point):
    a minimum across all input partitions of (
        if (msgs never received by partition) -1;
        else {
            a non-descending minimum of (the per-batch minimum msg timestamp)
        }
    )
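To make the approximation above concrete, here is a small self-contained sketch of it. The class and method names (StreamTimeSketch, onBatch, streamTime) are hypothetical and this is not how StreamTask is actually implemented; it only mirrors the formula as written, with the same caveat that it is an approximation.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of the approximate stream-time formula.
final class StreamTimeSketch {
    static final long NO_DATA = -1L;

    // per-partition time; stays at -1 until the partition receives data
    private final Map<Integer, Long> partitionTime = new HashMap<>();

    StreamTimeSketch(int... partitions) {
        for (int p : partitions) {
            partitionTime.put(p, NO_DATA);
        }
    }

    // Feed the minimum message timestamp of one batch for a partition.
    // Taking the max with the previous value keeps it non-descending.
    void onBatch(int partition, long batchMinTimestamp) {
        partitionTime.merge(partition, batchMinTimestamp, Long::max);
    }

    // Minimum across all input partitions; -1 if any has no data yet.
    long streamTime() {
        if (partitionTime.isEmpty()) {
            return NO_DATA;
        }
        long min = Long.MAX_VALUE;
        for (long t : partitionTime.values()) {
            min = Math.min(min, t);
        }
        return min;
    }
}
```

With two partitions, stream time stays at -1 until both have received a batch, and afterwards it is held back by whichever partition is furthest behind.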
Would that really be clear enough to the users of punctuate? Do they
care for such a convoluted notion of time? I see how this can be
useful for StreamTask to pick the next partition to take a record
from but for punctuate?
If users had to implement punctuation with process(), is that what
they would have chosen as their notion of time?
I'd argue not.
None of the processors implementing the rich windowing/join
operations in the DSL use punctuate.
Let's take the KStreamKStreamJoinProcessor as an example: in its
process() method it simply uses context().timestamp(), which, since
it's called from process(), simply returns, per the javadoc:
If it is triggered while processing a record streamed from the source
processor, timestamp is defined as the timestamp of the current input
record;
So they don't use that convoluted formula for stream-time. Instead,
they only care about the timestamp of the current record. I think
that having users track just that wouldn't be that much of a burden.
I don't think they need to care about which partitions got assigned
or not. And StreamTask would still be picking records first from the
partition having the lowest timestamp to try to "synchronize" the
streams as it does now.
What users would have to do in their Processor implementations is
somewhere along the lines of:
    long lastPunctuationTime = 0;
    long interval = <some-number>; // millis

    @Override
    public void process(K key, V value) {
        while (ctx.timestamp() >= lastPunctuationTime + interval) {
            punctuate(ctx.timestamp());
            // I'm not sure of the merit of this vs.
            // lastPunctuationTime = ctx.timestamp();
            // but that's what PunctuationQueue does currently
            lastPunctuationTime += interval;
        }
        // do some other business logic here
    }
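The same idea as a self-contained sketch that can be run without Kafka. RecordTimePunctuator and its punctuations list are hypothetical stand-ins: process(long) takes the record timestamp directly instead of reading it from context.timestamp(), and "punctuating" just records the timestamp it was called with.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a Processor that punctuates from process().
final class RecordTimePunctuator {
    private final long interval; // millis
    private long lastPunctuationTime = 0;

    // timestamps passed to each simulated punctuate() call
    final List<Long> punctuations = new ArrayList<>();

    RecordTimePunctuator(long interval) {
        this.interval = interval;
    }

    // recordTimestamp plays the role of context.timestamp() in process()
    void process(long recordTimestamp) {
        while (recordTimestamp >= lastPunctuationTime + interval) {
            punctuations.add(recordTimestamp); // i.e. punctuate(timestamp)
            lastPunctuationTime += interval;   // catch up one interval at a time
        }
        // other business logic would go here
    }
}
```

With an interval of 10, records at timestamps 5, 12 and 35 produce no punctuation, one punctuation, and two punctuations respectively, because the loop catches up one interval at a time. One consequence of starting lastPunctuationTime at 0 is that the first record carrying an epoch-millisecond timestamp would trigger a very long catch-up loop, so a real implementation would probably initialise it from the first record seen.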
Looking forward to your thoughts.
Cheers,
Michal
--
Signature
<http://www.openbet.com/> Michal Borowiecki
Senior Software Engineer L4
T: +44 208 742 1600
+44 203 249 8448
E: michal.borowie...@openbet.com
W: www.openbet.com <http://www.openbet.com/>
OpenBet Ltd
Chiswick Park Building 9
566 Chiswick High Rd
London
W4 5XT
UK
<https://www.openbet.com/email_promo>
This message is confidential and intended only for the addressee. If
you have received this message in error, please immediately notify
the postmas...@openbet.com <mailto:postmas...@openbet.com> and delete
it from your system as well as any copies. The content of e-mails as
well as traffic data may be monitored by OpenBet for employment and
security purposes. To protect the environment please do not print
this e-mail unless necessary. OpenBet Ltd. Registered Office:
Chiswick Park Building 9, 566 Chiswick High Road, London, W4 5XT,
United Kingdom. A company registered in England and Wales. Registered
no. 3134634. VAT no. GB927523612