Re: Understanding concept of grace in JoinWindows

2020-02-23 Thread Sachin Mittal
Hi,
All this makes perfect sense now, and I could not be clearer on how
Kafka and Streams handle time.
So if we use event-time semantics (with or without a custom timestamp
extractor), getting out-of-order records is expected, and one's stream
topology design should take care of it.

Right now log append time works in our case because we don't have more than
one producer writing to a partition of a topic.
But yes, once we need multiple producers, I suppose we need to consider
using event time and taking care of out-of-order records.

BTW, do you ever plan to host summits in India (Mumbai)? I would be happy
to attend one.

Thanks again
Sachin


On Sun, Feb 23, 2020 at 5:35 PM Matthias J. Sax  wrote:

> >> This really helped me understand that the grace period takes care
> >> of out-of-order records rather than late-arriving records.
>
> Well, the grace period defines if (or when) an out-of-order record is
> considered late. Of course, per the definition of "late", those records
> are not processed but are dropped.
>
> Note that a late record is a special case of an out-of-order record.
> If data is ordered, it cannot be late.
>
> About ordering guarantees: Brokers guarantee offset-order per
> partition. However, for out-of-order data we consider record
> timestamps, not offsets.
>
> You are correct, though, that if you configure a topic to use
> "AppendTime", out-of-order records are not possible. However, using
> "AppendTime" to "avoid" out-of-order data is kind of a hack, as you
> lose actual event-time semantics (i.e., if you want to join on
> event-time and you change your config to use "AppendTime" instead, you
> modify your data and will get a different result -- note that the
> timestamp is a first-class citizen in this regard, and modifying it
> is something you should be careful about).
>
> If a topic is configured with "CreateTime" (which is the default and,
> in general, the most useful configuration), then out-of-order records
> are possible:
>
> - a single producer might "reorder" data if it retries sends
> internally (using `max.in.flight.requests.per.connection=1` or
> `enable.idempotence=true` would guarantee order)
>
> - an application can set an explicit timestamp for each record before
> writing it into a topic; hence, if the upstream producer application
> sends out-of-order data, it will land like this in the topic
>
> - often multiple producers are writing into the same topic partition:
> in this case, writes are interleaved and thus out-of-order records
> are expected in general (note that this pattern applies to Kafka
> Streams in each repartition step; thus, even if your input topics
> have ordered data, repartitioning introduces out-of-order records
> downstream).
>
> Hence, even with the default timestamp extractor you might see
> out-of-order data. The same holds for a custom timestamp extractor. In
> the end, it does not make a big difference whether the timestamp is
> stored in the payload or in the record timestamp field: in both
> cases, it really depends on the upstream application that produces the
> data.
>
> Btw: I gave a talk about time semantics at Kafka Summit recently, so
> you might want to check out the recording (there will be a follow-up
> talk at Kafka Summit London in April focusing on time semantics in
> Kafka Streams):
>
> https://www.confluent.io/kafka-summit-san-francisco-2019/whats-the-time-and-why
>
>
> -Matthias
>
> On 2/22/20 7:43 PM, Sachin Mittal wrote:
> > Hi, This really helped me understand that the grace period takes
> > care of out-of-order records rather than late-arriving records.
> >
> > I however have a question: why would a record arrive out of
> > order? Doesn't Kafka guarantee the order? If we use the default
> > timestamp extractor, it will use the embedded timestamp in the
> > record, which would be: event time if the message timestamp type is
> > create time, or ingestion time if the message timestamp type is
> > set as log append time.
> >
> > I guess in these two cases, especially the second case where we use
> > log append time, out-of-order records will never happen. Please let
> > me know if this understanding is correct. So in this case there
> > would be no point in setting a grace period.
> >
> > I suppose the grace period makes sense when we use a custom timestamp
> > extractor where the timestamp is extracted from the record's payload;
> > in this case there are chances that records are processed out of
> > order.
> >
> > Please confirm this.
> >
> > Thanks Sachin
> >
> >
> >
> >
> >
> >
> > On Sat, Feb 22, 2020 at 5:05 PM Matthias J. Sax 
> > wrote:
> >
> >> Sachin,
> >>
> >> "late" data is data that arrives after the grace period and is
> >> not processed but dropped for this reason. What you mean is
> >> "out-of-order data" for which you can use the grace period to
> >> process it -- increasing the window size would be a semanti

Re: Understanding concept of grace in JoinWindows

2020-02-23 Thread Matthias J. Sax

>> This really helped me understand that the grace period takes care
>> of out-of-order records rather than late-arriving records.

Well, the grace period defines if (or when) an out-of-order record is
considered late. Of course, per the definition of "late", those records
are not processed but are dropped.

Note that a late record is a special case of an out-of-order record.
If data is ordered, it cannot be late.

About ordering guarantees: Brokers guarantee offset-order per
partition. However, for out-of-order data we consider record
timestamps, not offsets.

You are correct, though, that if you configure a topic to use
"AppendTime", out-of-order records are not possible. However, using
"AppendTime" to "avoid" out-of-order data is kind of a hack, as you
lose actual event-time semantics (i.e., if you want to join on
event-time and you change your config to use "AppendTime" instead, you
modify your data and will get a different result -- note that the
timestamp is a first-class citizen in this regard, and modifying it
is something you should be careful about).
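
As a side note on how the timestamp type is chosen: it is the topic-level
config `message.timestamp.type`, with values `CreateTime` or
`LogAppendTime`. A minimal sketch using the Java admin client (the broker
address and topic name below are made up for the example):

import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopicSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // hypothetical broker address
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // "CreateTime" keeps the producer-provided (event-time) timestamp;
            // "LogAppendTime" overwrites it with the broker's ingestion time
            NewTopic topic = new NewTopic("my-events", 3, (short) 1) // hypothetical topic
                    .configs(Map.of("message.timestamp.type", "CreateTime"));
            admin.createTopics(List.of(topic)).all().get();
        }
    }
}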

If a topic is configured with "CreateTime" (which is the default and,
in general, the most useful configuration), then out-of-order records
are possible:

- a single producer might "reorder" data if it retries sends
internally (using `max.in.flight.requests.per.connection=1` or
`enable.idempotence=true` would guarantee order; see the sketch after
this list)

- an application can set an explicit timestamp for each record before
writing it into a topic; hence, if the upstream producer application
sends out-of-order data, it will land like this in the topic

- often multiple producers are writing into the same topic partition:
in this case, writes are interleaved and thus out-of-order records
are expected in general (note that this pattern applies to Kafka
Streams in each repartition step; thus, even if your input topics
have ordered data, repartitioning introduces out-of-order records
downstream).
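
To illustrate the first two points, a minimal producer sketch (the broker
address, topic name, and the delayed-reading scenario are made up for the
example):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerOrderingSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // hypothetical broker address
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // either of these prevents reordering on internal retries:
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        // props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // an explicit event-time timestamp: if this record is written after
            // records with larger timestamps, it lands out-of-order in the topic
            long eventTime = System.currentTimeMillis() - 60_000; // e.g., a delayed reading
            producer.send(new ProducerRecord<>("my-events", null, eventTime, "key", "value"));
        }
    }
}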

Hence, even with the default timestamp extractor you might see
out-of-order data. The same holds for a custom timestamp extractor. In
the end, it does not make a big difference whether the timestamp is
stored in the payload or in the record timestamp field: in both cases,
it really depends on the upstream application that produces the data.
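
For illustration, a sketch of such a payload-based extractor (the
`SensorReading` type is made up); it would be registered via the
`default.timestamp.extractor` Streams config:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class PayloadTimestampExtractor implements TimestampExtractor {

    // hypothetical payload type that carries its own event time
    public static class SensorReading {
        public final long eventTimeMs;
        public SensorReading(long eventTimeMs) { this.eventTimeMs = eventTimeMs; }
    }

    @Override
    public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
        if (record.value() instanceof SensorReading) {
            // use the event time embedded in the payload
            return ((SensorReading) record.value()).eventTimeMs;
        }
        // otherwise fall back to the record's embedded timestamp field
        return record.timestamp();
    }
}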

Btw: I gave a talk about time semantics at Kafka Summit recently, so
you might want to check out the recording (there will be a follow-up
talk at Kafka Summit London in April focusing on time semantics in
Kafka Streams):

https://www.confluent.io/kafka-summit-san-francisco-2019/whats-the-time-and-why


-Matthias

On 2/22/20 7:43 PM, Sachin Mittal wrote:
> Hi, This really helped me understand that the grace period takes care
> of out-of-order records rather than late-arriving records.
>
> I however have a question: why would a record arrive out of
> order? Doesn't Kafka guarantee the order? If we use the default
> timestamp extractor, it will use the embedded timestamp in the
> record, which would be: event time if the message timestamp type is
> create time, or ingestion time if the message timestamp type is
> set as log append time.
>
> I guess in these two cases, especially the second case where we use
> log append time, out-of-order records will never happen. Please let
> me know if this understanding is correct. So in this case there
> would be no point in setting a grace period.
>
> I suppose the grace period makes sense when we use a custom timestamp
> extractor where the timestamp is extracted from the record's payload;
> in this case there are chances that records are processed out of
> order.
>
> Please confirm this.
>
> Thanks Sachin
>
>
>
>
>
>
> On Sat, Feb 22, 2020 at 5:05 PM Matthias J. Sax 
> wrote:
>
>> Sachin,
>>
>> "late" data is data that arrives after the grace period and is
>> not processed but dropped for this reason. What you mean is
>> "out-of-order data" for which you can use the grace period to
>> process it -- increasing the window size would be a semantic
>> change, while increasing the grace period allows you get the same
>> result for ordered and unordered input.
>>
> >> Let's look at an example with a join-window of 5 seconds:
> >>
> >> Stream1: <...> <...> <...>
> >> Stream2: <...> <...> <...>
>>
>> With a grace period of zero, the computation and result would be
>> as follows:
>>
> >> s1 -> k1 (insert into store)
> >> s2 -> k1 (insert into store + join)
> >> -> result <...>
> >> s1 -> k2 (insert into store + remove k1 from store because window
> >> size is only 5 and grace period is zero)
> >> s1 -> k1 (insert into store + remove k2 from store because window
> >> size is only 5 and grace period is zero)
> >> s2 -> k2 (insert into store -> no result because k2 from s1 was
> >> already removed)
> >> s2 -> k1 (out-of-order record that is also late, drop on the floor).
>>
> >> Note that the last record from s2 should actually be in the
> >> result: had it not been out-of-order, it would have joined with
> >> the first record from s1.
>>
> >> If we increase the grace period ...

Re: Understanding concept of grace in JoinWindows

2020-02-22 Thread Sachin Mittal
Hi,
This really helped me understand that the grace period takes care of
out-of-order records rather than late-arriving records.

I however have a question: why would a record arrive out of order?
Doesn't Kafka guarantee the order?
If we use the default timestamp extractor, it will use the embedded
timestamp in the record, which would be:
- event time if the message timestamp type is create time, or
- ingestion time if the message timestamp type is set as log append time.

I guess in these two cases, especially the second case where we use log
append time, out-of-order records will never happen.
Please let me know if this understanding is correct.
So in this case there would be no point in setting a grace period.

I suppose the grace period makes sense when we use a custom timestamp
extractor where the timestamp is extracted from the record's payload;
in this case there are chances that records are processed out of order.

Please confirm this.

Thanks
Sachin






On Sat, Feb 22, 2020 at 5:05 PM Matthias J. Sax  wrote:

> Sachin,
>
> "late" data is data that arrives after the grace period and is not
> processed but dropped for this reason. What you mean is "out-of-order
> data" for which you can use the grace period to process it -- increasing
> the window size would be a semantic change, while increasing the grace
> period allows you get the same result for ordered and unordered input.
>
> Let's look at an example with a join-window of 5 seconds:
>
> Stream1: <...> <...> <...>
> Stream2: <...> <...> <...>
>
> With a grace period of zero, the computation and result would be as
> follows:
>
> s1 -> k1 (insert into store)
> s2 -> k1 (insert into store + join)
> -> result <...>
> s1 -> k2 (insert into store + remove k1 from store because window size
> is only 5 and grace period is zero)
> s1 -> k1 (insert into store + remove k2 from store because window size
> is only 5 and grace period is zero)
> s2 -> k2 (insert into store -> no result because k2 from s1 was already
> removed)
> s2 -> k1 (out-of-order record that is also late, drop on the floor).
>
> Note that the last record from s2 should actually be in the result:
> had it not been out-of-order, it would have joined with the first
> record from s1.
>
> If we increase the grace period (note that the default grace period
> is 24h), for example to 50, we would get the following:
>
> s1 -> k1 (insert into store)
> s2 -> k1 (insert into store + join)
> -> result <...>
> s1 -> k2 (insert into store)
> s1 -> k1 (insert into store -- does not join because window is only 5)
> s2 -> k2 (insert into store -- does not join because window is only 5)
> s2 -> k1 (out-of-order record, outside of the window but processed
> normally because it's within the grace period: insert into store + join)
> -> result <...>
>
> This result is semantically "the same" as the result above -- it is
> different, though, as we now allow out-of-order data to be processed.
> The missing join result from the last record of s2 and the first record
> of s1 is now in the result, as desired.
>
> On the other hand, if we increase the window size to 50, we get a
> semantically different result:
>
> s1 -> k1 (insert into store)
> s2 -> k1 (insert into store + join)
> -> result <...>
> s1 -> k2 (insert into store)
> s1 -> k1 (insert into store + join)
> -> result <...>
> s2 -> k2 (insert into store + join)
> -> result <...>
> s2 -> k1 (out-of-order record, within the window: insert into store + join)
> -> 2 results <...>, <...>
>
> Because we changed the window size, we get 5 result records instead
> of 2 (or 1) as in the first two examples.
>
> Does this make sense?
>
>
> -Matthias
>
>
> On 2/21/20 7:35 PM, Sachin Mittal wrote:
> > Hi,
> > Reading the Kafka docs, I see that the grace period is defined as:
> > the time to admit late-arriving events after the end of the window.
> >
> > I however have not understood when to use it.
> >
> > If I see that some records are arriving after the end of the window
> > and hence not included in the join, should I not simply increase the
> > window size to accommodate them?
> >
> > When do I really need to use grace rather than alter the window size?
> >
> > Thanks
> > Sachin
> >
>
>


Re: Understanding concept of grace in JoinWindows

2020-02-22 Thread Matthias J. Sax
Sachin,

"late" data is data that arrives after the grace period and is not
processed but dropped for this reason. What you mean is "out-of-order
data" for which you can use the grace period to process it -- increasing
the window size would be a semantic change, while increasing the grace
period allows you get the same result for ordered and unordered input.

Let's look at an example with a join-window of 5 seconds:

Stream1: <...> <...> <...>
Stream2: <...> <...> <...>

With a grace period of zero, the computation and result would be as follows:

s1 -> k1 (insert into store)
s2 -> k1 (insert into store + join)
-> result <...>
s1 -> k2 (insert into store + remove k1 from store because window size
is only 5 and grace period is zero)
s1 -> k1 (insert into store + remove k2 from store because window size
is only 5 and grace period is zero)
s2 -> k2 (insert into store -> no result because k2 from s1 was already
removed)
s2 -> k1 (out-of-order record that is also late, drop on the floor).

Note that the last record from s2 should actually be in the result:
had it not been out-of-order, it would have joined with the first
record from s1.

If we increase the grace period (note that the default grace period
is 24h), for example to 50, we would get the following:

s1 -> k1 (insert into store)
s2 -> k1 (insert into store + join)
-> result <...>
s1 -> k2 (insert into store)
s1 -> k1 (insert into store -- does not join because window is only 5)
s2 -> k2 (insert into store -- does not join because window is only 5)
s2 -> k1 (out-of-order record, outside of the window but processed
normally because it's within the grace period: insert into store + join)
-> result <...>

This result is semantically "the same" as the result above -- it is
different, though, as we now allow out-of-order data to be processed.
The missing join result from the last record of s2 and the first record
of s1 is now in the result, as desired.

On the other hand, if we increase the window size to 50, we get a
semantically different result:

s1 -> k1 (insert into store)
s2 -> k1 (insert into store + join)
-> result <...>
s1 -> k2 (insert into store)
s1 -> k1 (insert into store + join)
-> result <...>
s2 -> k2 (insert into store + join)
-> result <...>
s2 -> k1 (out-of-order record, within the window: insert into store + join)
-> 2 results <...>, <...>

Because we changed the window size, we get 5 result records instead of 2
(or 1) as in the first two examples.
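
For reference, a sketch of the two variants in the 2.x DSL (topic names
are made up; default serdes assumed): the first join keeps the 5-second
window and only widens the grace period, the second widens the window
itself and thus changes the semantics:

import java.time.Duration;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;

public class JoinWindowsSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> s1 = builder.stream("stream1"); // hypothetical topics
        KStream<String, String> s2 = builder.stream("stream2");

        // same semantics as the ordered case, but tolerates out-of-order
        // records up to 50 seconds behind stream time:
        s1.join(s2, (v1, v2) -> v1 + "," + v2,
                JoinWindows.of(Duration.ofSeconds(5)).grace(Duration.ofSeconds(50)));

        // semantically different: records up to 50 seconds apart now join:
        s1.join(s2, (v1, v2) -> v1 + "," + v2,
                JoinWindows.of(Duration.ofSeconds(50)));
    }
}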

Does this make sense?


-Matthias


On 2/21/20 7:35 PM, Sachin Mittal wrote:
> Hi,
> Reading the Kafka docs, I see that the grace period is defined as:
> the time to admit late-arriving events after the end of the window.
>
> I however have not understood when to use it.
>
> If I see that some records are arriving after the end of the window
> and hence not included in the join, should I not simply increase the
> window size to accommodate them?
>
> When do I really need to use grace rather than alter the window size?
> 
> Thanks
> Sachin
> 




Understanding concept of grace in JoinWindows

2020-02-21 Thread Sachin Mittal
Hi,
Reading the Kafka docs, I see that the grace period is defined as:
the time to admit late-arriving events after the end of the window.

I however have not understood when to use it.

If I see that some records are arriving after the end of the window and
hence not included in the join, should I not simply increase the window
size to accommodate them?

When do I really need to use grace rather than alter the window size?

Thanks
Sachin