[ 
https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15468817#comment-15468817
 ] 

Guozhang Wang commented on KAFKA-4113:
--------------------------------------

More thoughts regarding this issue after discussing with [~jkreps] offline:

1. Within a single stream, records may not be ordered by their timestamps. And
since Kafka Streams always tries to process records following the source
topic's log-append ordering, this results in so-called {{late arriving
records}}.

2. When a single Kafka Streams task has multiple input streams, it tries to
"synchronize" these streams based on their timestamps in a best-effort manner.
Therefore we may process a record with a higher timestamp from one stream
before a record with a lower timestamp from another stream.

3. For table-stream joins, ideally we want to process records strictly
following the timestamp ordering across multiple streams; in that case, I
believe users do NOT really need to {{bootstrap a KTable}}, since as long as
the timestamps of the stream and the table changelog are defined correctly, we
are guaranteed to have the correct table snapshot when joining with the stream.
For example, say your table's update traffic is only once a week, with the
update's timestamp defined as the end of the week (EOW); then when you
(re-)start the application, any stream record whose timestamp is before the
EOW time is guaranteed to be joined against the "old" table snapshot, and any
stream record whose timestamp is after the EOW time is joined against the
"updated" table snapshot, which is the right behavior.
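
To make this concrete, here is a minimal sketch of such a join topology in the
DSL (topic names are made up for illustration, default serdes are assumed, and
the newer {{StreamsBuilder}} API is used for brevity):

{noformat}
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

// Sketch only: topic names are hypothetical.
StreamsBuilder builder = new StreamsBuilder();

// Rarely-updated table, e.g. updated once a week with EOW timestamps.
KTable<String, String> profiles = builder.table("profile-updates");

// High-volume record stream to be enriched.
KStream<String, String> clicks = builder.stream("clicks");

// Under strict timestamp ordering, a click timestamped before the EOW update
// joins against the old snapshot, and one timestamped after it joins against
// the new snapshot.
KStream<String, String> enriched =
    clicks.leftJoin(profiles, (click, profile) -> click + "|" + profile);

enriched.to("enriched-clicks");
{noformat}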

4. However, because of 1) and 2) above we are not strictly following the
timestamp ordering, so the join result is not guaranteed to be "correct" or
deterministic. In addition, note that the above reasoning assumes that the
timestamps on both streams are defined in a consistent manner, for example
when both records are generated by the same application using the same clock
for setting the timestamps; otherwise, for example if the joining streams are
not produced by the same application or service and there is a time drift
between their clocks, then even {{strictly following the timestamp-based
ordering}} may still not generate the correct result. Scenarios like the one
[~mjsax] mentioned can then occur, where a KTable's record may not be available
yet when the KStream's record has arrived and is being enriched with the
KTable, if the KStream record's timestamp is indeed defined to be smaller than
the corresponding KTable record's timestamp.
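
As an aside, how consistently the timestamps are defined is entirely up to the
configured timestamp extractor. A sketch of extracting an application-embedded
event time could look like the following (the {{Timestamped}} payload interface
is hypothetical, and the two-argument {{extract}} signature is the one used by
more recent Streams versions):

{noformat}
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

// Sketch only: assumes the record value carries its own event time in millis.
public class EmbeddedTimeExtractor implements TimestampExtractor {

    @Override
    public long extract(final ConsumerRecord<Object, Object> record, final long partitionTime) {
        if (record.value() instanceof Timestamped) {
            return ((Timestamped) record.value()).eventTimeMs();
        }
        return record.timestamp(); // fall back to the broker/producer timestamp
    }

    // Hypothetical marker interface for values that embed an event time.
    public interface Timestamped {
        long eventTimeMs();
    }
}
{noformat}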

5. Therefore, users propose {{bootstrap a KTable}} mainly as a way to give the
table's changelog stream a bit of an "advantage" over the purely
timestamp-based ordering, so that its records are more likely to be processed
before records of the other stream with similar timestamps. On the other hand,
because of 4) above I think it is very hard, or even impossible, to get
absolutely "correct" answers; the best we can aim for is deterministic answers
(that is also the motivation for the window retention period in Kafka Streams,
and for watermarks / hints indicating that there are no more late records,
along with triggering mechanisms, in other frameworks I think).

Following these arguments, here is a list of proposals I'm thinking about to
tackle this requirement:

1. Give users the flexibility to define ordering priorities across multiple
streams when determining which record to process next (i.e. how to
"synchronize" them). There are different ways to expose this API; for example,
Samza uses a user-customizable {{MessageChooser}} interface.
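
For illustration, such a pluggable chooser could look roughly like the
following (a hypothetical interface, not an existing Kafka Streams or Samza
API):

{noformat}
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;

// Hypothetical interface: given the buffered head record of each input
// partition, the user decides which one the task should process next.
public interface RecordChooser {
    ConsumerRecord<byte[], byte[]> choose(List<ConsumerRecord<byte[], byte[]>> partitionHeads);
}
{noformat}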

2. More restrictive than 1) above: we only allow users to specify an amount of
time by which one stream should run a little "in advance" of the other
streams, such that its records with timestamp {{t + delta}}, where {{delta}} is
configurable, are considered at the same time as the other streams' records
with timestamp {{t}}. I think most of the options proposed in the description
of this ticket fall into this category.
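
The ordering rule under this option boils down to a few lines (names are
illustrative only; {{advantagedDeltaMs}} stands for the configurable
{{delta}}):

{noformat}
// Sketch only: when choosing the next record across streams, the "advantaged"
// stream's records are compared using timestamp - delta, so a record with
// timestamp t + delta competes as if it had timestamp t.
static long effectiveTimestamp(long recordTimestampMs,
                               boolean fromAdvantagedStream,
                               long advantagedDeltaMs) {
    return fromAdvantagedStream ? recordTimestampMs - advantagedDeltaMs
                                : recordTimestampMs;
}
{noformat}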

3. Different from proposals 1) / 2) above: we change the implementation of
KTable-KStream joins to also materialize the KStream in a sliding window, so
that when a record from the KStream arrives, it joins against the current
snapshot of the KTable in its backing state store; and when a record from the
KTable arrives, it joins against the KStream's materialized window store,
matching any records whose timestamp is smaller than the KTable update
record's.
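
Here is an illustrative (non-production) sketch of that join logic, with plain
in-memory maps standing in for the KTable state store and the KStream's window
store (not the actual Streams internals; eviction / retention is omitted):

{noformat}
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

class StreamTableJoinSketch {
    private final Map<String, String> tableStore = new HashMap<>();                  // KTable snapshot
    private final Map<String, TreeMap<Long, String>> streamBuffer = new HashMap<>(); // KStream records by timestamp

    void onStreamRecord(String key, String value, long ts) {
        // Join against the current KTable snapshot ...
        emit(key, value + "|" + tableStore.get(key));
        // ... and retain the stream record so later table updates can re-join it.
        streamBuffer.computeIfAbsent(key, k -> new TreeMap<>()).put(ts, value);
    }

    void onTableUpdate(String key, String newValue, long ts) {
        tableStore.put(key, newValue);
        TreeMap<Long, String> buffered = streamBuffer.get(key);
        if (buffered != null) {
            // Re-join buffered stream records whose timestamp is smaller than the update's.
            for (String streamValue : buffered.headMap(ts).values()) {
                emit(key, streamValue + "|" + newValue);
            }
        }
    }

    private void emit(String key, String joined) {
        System.out.println(key + " -> " + joined);
    }
}
{noformat}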

4. This is complementary to 2) / 3) above: if we do not make the stream
synchronization mechanism customizable as proposed in 1), then we can at least
consider making it deterministic, so that any join type generates
deterministic results as well.

Thoughts [~mjsax] [~enothereska] [~damianguy]?

> Allow KTable bootstrap
> ----------------------
>
>                 Key: KAFKA-4113
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4113
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: streams
>            Reporter: Matthias J. Sax
>            Assignee: Guozhang Wang
>
> On the mailing list, there have been multiple requests about the possibility 
> to "fully populate" a KTable before actual stream processing starts.
> Even if it is somewhat difficult to define when the initial populating phase 
> should end, there are multiple possibilities:
> The main idea is that there is a rarely updated topic that contains the data. 
> Only after this topic has been read completely and the KTable is ready should 
> the application start processing. This would mean that on startup, the 
> current partition sizes (end offsets) must be fetched and stored, and after 
> the KTable got populated up to those offsets, stream processing can start.
> Other discussed ideas are:
> 1) an initial fixed time period for populating
> (it might be hard for a user to estimate the correct value)
> 2) an "idle" period, i.e., if no update to a KTable happened for a certain 
> time, we consider it populated
> 3) a timestamp cut-off point, i.e., all records with an older timestamp 
> belong to the initial populating phase
> The API change is not decided yet, and the API design is part of this JIRA.
> One suggestion (for option (4)) was:
> {noformat}
> KTable table = builder.table("topic", 1000); // populate the table without 
> reading any other topics until we see one record with timestamp 1000.
> {noformat}


