-----BEGIN PGP SIGNED MESSAGE-----
Hash: SHA512

>> This really helped to understand that grace period takes care of
>> out of order records rather than late arriving records.

Well, the grace period defines if (or when) an out-of-order record is
consider late. Of course, per definition of "late', those records are
not processed but are dropped.

Note, that a late record is a special case of an out-of-order record.
If data is ordered, it cannot be late.

About ordering guarantees: Brokers guarantee offset-order per
partition. However, for out-of-order data we consider record
timestamps, but not offsets.

You are correct thought, that if you configure a topic to use
"AppendTime", out-of-order records are not possible. However, using
"AppendTime" to "avoid" out-of-order data is kind of a hack, as you
loose actual event-time semantics (ie, if you want to join on
event-time and you change your config to use "AppendTime" instead you
modify your data and will get a different result -- note that the
timestamp is a first class citizen with this regard and modifying it
something you should be careful about).

If a topic is configure with "CreateTime" (what is the default and in
general the most useful configuration), than out-of-order records are
possible:

- - a single producer might "reorder" data if it retries sends
internally (using `max.in.flight.request=1` or `idempotence=true`
would guarantee order)

- - an application can set an explicit timestamp for each record before
writing it into a topic; hence, if the upstream producer application
does send out-of-order data, it would land like this in the topic

- - often multiple producer are writing into the same topic partition:
for this case, writes are interleaved and thus out-of-order records
are expected in general (note that his pattern applies to Kafka
Streams in each repartition step, and thus, even if you input topic
have ordered data, repartitioning introduced out-of-order records
downstream).

Hence, even with the default timestamp extractor you might see
out-of-order data. Similar, for a custom timestamp extractor. In the
end it does not really make a big difference if the timestamp is
stored in the payload or in the record timestamp field: for both
cases, it really depends on the upstream application that produces the
data.

Btw: I gave a talk about time semantics at Kafka Summit recently, so
you might want to check out the recording (there will be a follow up
talk at Kafka Summit London in April focusing on time semantics in
Kafka Streams):

https://www.confluent.io/kafka-summit-san-francisco-2019/whats-the-time-
and-why


- -Matthias

On 2/22/20 7:43 PM, Sachin Mittal wrote:
> Hi, This really helped to understand that grace period takes care
> of out of order records rather than late arriving records.
>
> I however have a question that why would a record arrive out of
> order. Doesn't kafka guarantees the order. If we use default
> timestamp extractor then it will use the embedded time stamp in the
> record which would be: - event time if message time stamp type is
> create time. or - ingestion time if the message time stamp type is
> set as log append time.
>
> I guess is these two cases especially the second case when we use
> log append time, out of order record will never happen. Please let
> me know if my this understanding is correct. So in this case there
> would be no point setting grace period.
>
> I suppose grace period makes sense when we use a custom timestamp
> extractor where timestamp is extracted based on record's payload,
> in this case there are chances that records are processed out of
> order.
>
> Please confirm this.
>
> Thanks Sachin
>
>
>
>
>
>
> On Sat, Feb 22, 2020 at 5:05 PM Matthias J. Sax <mj...@apache.org>
> wrote:
>
>> Sachin,
>>
>> "late" data is data that arrives after the grace period and is
>> not processed but dropped for this reason. What you mean is
>> "out-of-order data" for which you can use the grace period to
>> process it -- increasing the window size would be a semantic
>> change, while increasing the grace period allows you get the same
>> result for ordered and unordered input.
>>
>> Let's look at an example with a join-window of 5 seconds
>> <key,value,timestamp>
>>
>> Stream1: <k1,v1,10> <k2,v2,20> <k1,v3,26> Stream2: <k1,w1,12>
>> <k2,w2,30> <k1,w3,13>
>>
>> With a grace period of zero, the computation and result would be
>> as follows:
>>
>> s1 -> k1 (insert into store) s2 -> k1 (insert into store + join)
>> -> result <k1,v1+w1,12> s1 -> k2 (insert into store + remove k1
>> from store because window size is only 5 and grace period is
>> zero) s1 -> k1 (insert into store + remove k2 from store because
>> window size is only 5 and grace period is zero) s2 -> k2 (insert
>> into store -> no result because k2 from s1 was already removed)
>> s2 -> k1 (out-of-order record that is also late, drop on the
>> floor).
>>
>> Note that the last record from s2 should actually be in the
>> result and if it would not have been out-or-order it would have
>> joined with the first record from s1.
>>
>> If we increase the grace period (note that default grace period
>> is 24h) to for example to 50, we would get the following:
>>
>> s1 -> k1 (insert into store) s2 -> k1 (insert into store + join)
>> -> result <k1,v1+w1,12> s1 -> k2 (insert into store) s1 -> k1
>> (insert into store -- does not join because window is only 5) s2
>> -> k2 (insert into store -- does not join because window is only
>> 5) s2 -> k1 (out-of-order record, outside of the window but
>> processed normally because it's within the grace period: insert
>> into store + join) -> result <k1,v1+w3,13>
>>
>> This result is semantically "the same" as the result above -- if
>> is different though as we allow to process out-of-order data. The
>> missing join result from the last record of s2 and the first
>> record of s1 is now in the result as desired.
>>
>> On the other hand, if we increase the window size to 50, we get
>> a semantically different result:
>>
>> s1 -> k1 (insert into store) s2 -> k1 (insert into store + join)
>> -> result <k1,v1+w1,12> s1 -> k2 (insert into store) s1 -> k1
>> (insert into store + join) -> result <k1,v3+w1,26> s2 -> k2
>> (insert into store + join) -> result <k2,v2+w2,30> s2 -> k1
>> (out-of-order record, within the window: insert into store +
>> join) -> 2 result2 <k1,v1+w3,13>, <k1,v3+w3,26>
>>
>> Because we changes the window size, we get 5 result records
>> instead of 2 (or 1) as in the first two examples.
>>
>> Does this make sense?
>>
>>
>> -Matthias
>>
>>
>> On 2/21/20 7:35 PM, Sachin Mittal wrote:
>>> Hi, Reading the kafka docs I see that grace period is defined
>>> as: the time to admit late-arriving events after the end of the
>>> window
>>>
>>> I however have not understood as when to use it?
>>>
>>> If I see that some records are arriving after the end of the
>>> window and hence not included in the join, should I not simply
>>> increase the window size to accommodate that.
>>>
>>> When do I really need to use grace and not alter the window
>>> size.
>>>
>>> Thanks Sachin
>>>
>>
>>
>
-----BEGIN PGP SIGNATURE-----

iQIzBAEBCgAdFiEEI8mthP+5zxXZZdDSO4miYXKq/OgFAl5Sal0ACgkQO4miYXKq
/Oh6zxAAng5mModZjyI+TGuZibI1kSdfSwGjmOWnMMdyw1XLCp64G+7x4ZgL3b3v
9bNpgEjmF4Y3C5SOKNXE8s02pVeqkCYN8JHJfa7laS3GeKiSyCARCno5y8eRItbA
JQnq9MgEY8I+sIWZbdviMU/FgDKVS2E2TTI5H/XMx0EtClYUakWrIlQ3ubMVeakH
9DJBrV+VZbCKYOmnGMpyJ635oImuHLl0+ANgZAIbngrV0jPZ7Mc9WY8F1nLwzrwZ
8GwRhcDHkd4HSLK7THWxHFPPvoV2BbyzDxWHon9gYahk5rOyaATKeTOoWsu7AOqA
uxTgL97LdK8Ovj0ZFkGYwGmI1lMNbtQBYJlcO2wePuEVZBfLgRH+62KmYqmhaWaZ
mbXOBhZZYSWc3xdKbDedKUB3CqocV5hgZM5MLBaHeJ8KJa0iChH+frzOp59BHt0B
vJrEeojuoe9TeMfKanAiGAu+0OAFh5Bvmy8tc8sM6hdZ4QWOeg0dMrPzMiMFYFqf
NEkphFRdC0m8JD1CuvCsgue0pTRqL1y9B/MzgIRZQjKkR4GCO0J3j5PeF0bSjtyp
evRvZnq2VarZxNf2daJ0KppsoV659ff/5DH4WL5J+m8y2bnBFVVC/y1cBTt9cNSI
B/dGHVCJFiGalNgFsC8eipAXqxG5/xrE0vH4atqfFE4vnTNRYYg=
=qp80
-----END PGP SIGNATURE-----

Reply via email to