[jira] [Commented] (KAFKA-4113) Allow KTable bootstrap

2021-01-24 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17270955#comment-17270955
 ] 

Matthias J. Sax commented on KAFKA-4113:


I guess you will need to replicate the input data into a second topic, and then 
use one topic with timestamp 0, and the other topic with "normal" timestamps.

If you have only one topic, reach record will be processed one-by-one and thus, 
you cannot have a fully bootstrapped table when processing the first record – 
with KafkaStreams it's not possible to read a topic to "bootstrap" and 
afterwards read the topic a second time.

 

> Allow KTable bootstrap
> --
>
> Key: KAFKA-4113
> URL: https://issues.apache.org/jira/browse/KAFKA-4113
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
>
> On the mailing list, there are multiple request about the possibility to 
> "fully populate" a KTable before actual stream processing start.
> Even if it is somewhat difficult to define, when the initial populating phase 
> should end, there are multiple possibilities:
> The main idea is, that there is a rarely updated topic that contains the 
> data. Only after this topic got read completely and the KTable is ready, the 
> application should start processing. This would indicate, that on startup, 
> the current partition sizes must be fetched and stored, and after KTable got 
> populated up to those offsets, stream processing can start.
> Other discussed ideas are:
> 1) an initial fixed time period for populating
> (it might be hard for a user to estimate the correct value)
> 2) an "idle" period, ie, if no update to a KTable for a certain time is
> done, we consider it as populated
> 3) a timestamp cut off point, ie, all records with an older timestamp
> belong to the initial populating phase
> The API change is not decided yet, and the API desing is part of this JIRA.
> One suggestion (for option (4)) was:
> {noformat}
> KTable table = builder.table("topic", 1000); // populate the table without 
> reading any other topics until see one record with timestamp 1000.
> {noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-4113) Allow KTable bootstrap

2021-01-23 Thread Stuart Perks (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17270739#comment-17270739
 ] 

Stuart Perks commented on KAFKA-4113:
-

[~mjsax] I have a scenario where I have a KTable which is compacted topic, 
which I convert to a stream and then stream the data, flatmap it and rekey and 
join with other data on the new keys with the same KTable. With cache off this 
scenario will process each message if the topic has not compacted them.

So basically a self join on the KTable. I want to always use the latest data on 
the KTable so a bootstrap function would be great does not seem to be 
happening. If I attempt the 0 custom timestamp extractor this does not work as 
data is the same so both stream and table would be 0. Keeping the normal time 
semantic it seems of processing each record even if the same key. 

Are there any other ideas of ways around this to always join with the latest 
data on a KTable when i am already driving the join from the same KTable.

Distinguish the timestamp extractor differently between the KTable and the 
KTable.toStream seems unlikely. 







> Allow KTable bootstrap
> --
>
> Key: KAFKA-4113
> URL: https://issues.apache.org/jira/browse/KAFKA-4113
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
>
> On the mailing list, there are multiple request about the possibility to 
> "fully populate" a KTable before actual stream processing start.
> Even if it is somewhat difficult to define, when the initial populating phase 
> should end, there are multiple possibilities:
> The main idea is, that there is a rarely updated topic that contains the 
> data. Only after this topic got read completely and the KTable is ready, the 
> application should start processing. This would indicate, that on startup, 
> the current partition sizes must be fetched and stored, and after KTable got 
> populated up to those offsets, stream processing can start.
> Other discussed ideas are:
> 1) an initial fixed time period for populating
> (it might be hard for a user to estimate the correct value)
> 2) an "idle" period, ie, if no update to a KTable for a certain time is
> done, we consider it as populated
> 3) a timestamp cut off point, ie, all records with an older timestamp
> belong to the initial populating phase
> The API change is not decided yet, and the API desing is part of this JIRA.
> One suggestion (for option (4)) was:
> {noformat}
> KTable table = builder.table("topic", 1000); // populate the table without 
> reading any other topics until see one record with timestamp 1000.
> {noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-4113) Allow KTable bootstrap

2019-09-25 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16938125#comment-16938125
 ] 

Matthias J. Sax commented on KAFKA-4113:


Yes, increasing max.task.idle.ms help to get stricter guarantees.

> Allow KTable bootstrap
> --
>
> Key: KAFKA-4113
> URL: https://issues.apache.org/jira/browse/KAFKA-4113
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Guozhang Wang
>Priority: Major
>
> On the mailing list, there are multiple request about the possibility to 
> "fully populate" a KTable before actual stream processing start.
> Even if it is somewhat difficult to define, when the initial populating phase 
> should end, there are multiple possibilities:
> The main idea is, that there is a rarely updated topic that contains the 
> data. Only after this topic got read completely and the KTable is ready, the 
> application should start processing. This would indicate, that on startup, 
> the current partition sizes must be fetched and stored, and after KTable got 
> populated up to those offsets, stream processing can start.
> Other discussed ideas are:
> 1) an initial fixed time period for populating
> (it might be hard for a user to estimate the correct value)
> 2) an "idle" period, ie, if no update to a KTable for a certain time is
> done, we consider it as populated
> 3) a timestamp cut off point, ie, all records with an older timestamp
> belong to the initial populating phase
> The API change is not decided yet, and the API desing is part of this JIRA.
> One suggestion (for option (4)) was:
> {noformat}
> KTable table = builder.table("topic", 1000); // populate the table without 
> reading any other topics until see one record with timestamp 1000.
> {noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-4113) Allow KTable bootstrap

2019-09-25 Thread Robert L Brooks Jr (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16938108#comment-16938108
 ] 

Robert L Brooks Jr commented on KAFKA-4113:
---

[~mjsax] Thanks for your quick responses. This was very helpful.  For the 
proposed solution, we would also need to change this config correct - 
max.task.idle.ms?

> Allow KTable bootstrap
> --
>
> Key: KAFKA-4113
> URL: https://issues.apache.org/jira/browse/KAFKA-4113
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Guozhang Wang
>Priority: Major
>
> On the mailing list, there are multiple request about the possibility to 
> "fully populate" a KTable before actual stream processing start.
> Even if it is somewhat difficult to define, when the initial populating phase 
> should end, there are multiple possibilities:
> The main idea is, that there is a rarely updated topic that contains the 
> data. Only after this topic got read completely and the KTable is ready, the 
> application should start processing. This would indicate, that on startup, 
> the current partition sizes must be fetched and stored, and after KTable got 
> populated up to those offsets, stream processing can start.
> Other discussed ideas are:
> 1) an initial fixed time period for populating
> (it might be hard for a user to estimate the correct value)
> 2) an "idle" period, ie, if no update to a KTable for a certain time is
> done, we consider it as populated
> 3) a timestamp cut off point, ie, all records with an older timestamp
> belong to the initial populating phase
> The API change is not decided yet, and the API desing is part of this JIRA.
> One suggestion (for option (4)) was:
> {noformat}
> KTable table = builder.table("topic", 1000); // populate the table without 
> reading any other topics until see one record with timestamp 1000.
> {noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-4113) Allow KTable bootstrap

2019-09-25 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16938074#comment-16938074
 ] 

Matthias J. Sax commented on KAFKA-4113:


If you use the WallClockTime extractor, timestamps are assigned _after_ they 
are pulled from Kafka with the wall-clock time from you application (ie, client 
side) – btw: each time you re-read existing data, a different timestamp will be 
uses because wall-clock time will always be different. Hence, it's 
non-deterministic in which order records are processed because there is no 
contract what `poll()` returns.

So your understanding seems correct.

What you could do it, to write a custom timestamp extractor, and return `0` for 
each table side record and wall-clock time for each stream side record. In 
`extract()` to get a `ConsumerRecord` and can inspect the topic name to 
distinguish between both. Because `0` is smaller than wall-clock time, you can 
"bootstrap" the table to the end of the topic before any stream-side record 
gets processed.

> Allow KTable bootstrap
> --
>
> Key: KAFKA-4113
> URL: https://issues.apache.org/jira/browse/KAFKA-4113
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Guozhang Wang
>Priority: Major
>
> On the mailing list, there are multiple request about the possibility to 
> "fully populate" a KTable before actual stream processing start.
> Even if it is somewhat difficult to define, when the initial populating phase 
> should end, there are multiple possibilities:
> The main idea is, that there is a rarely updated topic that contains the 
> data. Only after this topic got read completely and the KTable is ready, the 
> application should start processing. This would indicate, that on startup, 
> the current partition sizes must be fetched and stored, and after KTable got 
> populated up to those offsets, stream processing can start.
> Other discussed ideas are:
> 1) an initial fixed time period for populating
> (it might be hard for a user to estimate the correct value)
> 2) an "idle" period, ie, if no update to a KTable for a certain time is
> done, we consider it as populated
> 3) a timestamp cut off point, ie, all records with an older timestamp
> belong to the initial populating phase
> The API change is not decided yet, and the API desing is part of this JIRA.
> One suggestion (for option (4)) was:
> {noformat}
> KTable table = builder.table("topic", 1000); // populate the table without 
> reading any other topics until see one record with timestamp 1000.
> {noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-4113) Allow KTable bootstrap

2019-09-25 Thread Robert L Brooks Jr (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16938024#comment-16938024
 ] 

Robert L Brooks Jr commented on KAFKA-4113:
---

Okay, so what happens if the stream and broker are set up using the 
process-time semantics, default.time.extractor = WallClockTime? It doesn't 
sound like the table is initialized from a snapshot before any join processing 
is done, or given any prioritization. So I guess this entire thread is about 
the fact that you can have failed joins given similar scenarios, using default 
event-time:

premise: each topic has 1 partition, each record consists of a String key and 
Integer representing time produced.

application-stream( 100,000 records): [key1, 5], [key5, 3], [key2, 2], [key3, 
4], [key4, 3]

employee-stream(10,000 records):       [key1, 1], [key2, 2], [key3, 4], [key4, 
3], [key5, 5]...

Time of processing Left Join

application-table:        null   null    maybe    maybe   null

employee-stream:      key1  key2  key3       key4      key5

results from Join:        fail    fail     maybe    maybe   fail

 

Is this the correct understanding?

> Allow KTable bootstrap
> --
>
> Key: KAFKA-4113
> URL: https://issues.apache.org/jira/browse/KAFKA-4113
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Guozhang Wang
>Priority: Major
>
> On the mailing list, there are multiple request about the possibility to 
> "fully populate" a KTable before actual stream processing start.
> Even if it is somewhat difficult to define, when the initial populating phase 
> should end, there are multiple possibilities:
> The main idea is, that there is a rarely updated topic that contains the 
> data. Only after this topic got read completely and the KTable is ready, the 
> application should start processing. This would indicate, that on startup, 
> the current partition sizes must be fetched and stored, and after KTable got 
> populated up to those offsets, stream processing can start.
> Other discussed ideas are:
> 1) an initial fixed time period for populating
> (it might be hard for a user to estimate the correct value)
> 2) an "idle" period, ie, if no update to a KTable for a certain time is
> done, we consider it as populated
> 3) a timestamp cut off point, ie, all records with an older timestamp
> belong to the initial populating phase
> The API change is not decided yet, and the API desing is part of this JIRA.
> One suggestion (for option (4)) was:
> {noformat}
> KTable table = builder.table("topic", 1000); // populate the table without 
> reading any other topics until see one record with timestamp 1000.
> {noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-4113) Allow KTable bootstrap

2019-09-25 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16937950#comment-16937950
 ] 

Matthias J. Sax commented on KAFKA-4113:


Not sure If I understand. Let's do an example (only showing timestamps, 
consuming from left (small offset) to right).

Table-topic-ts: 10, 12, 14, 16, 17, 19, 20

Stream-Topic:ts: 18, 19, 20, 21, 23

Assuming auto.offset.reset=earliest, we would compare the timestamps for both 
topic (on a per partition basis, ie, stream-p0 compare to table-p0 etc). In our 
example, the table records have the smallest timestamps and would be processed 
first, hence, the table would be "bootstraped" from 10 to 17. Afterwards, 
stream record with timestamp 18 would be processed because the next table 
update has larger timestamp 19.

If the records from both inputs have the same timestamp, there is currently no 
guarantee which record is processed first – it's a know gap that we need to 
address at some point.

Does this answer your question?

> Allow KTable bootstrap
> --
>
> Key: KAFKA-4113
> URL: https://issues.apache.org/jira/browse/KAFKA-4113
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Guozhang Wang
>Priority: Major
>
> On the mailing list, there are multiple request about the possibility to 
> "fully populate" a KTable before actual stream processing start.
> Even if it is somewhat difficult to define, when the initial populating phase 
> should end, there are multiple possibilities:
> The main idea is, that there is a rarely updated topic that contains the 
> data. Only after this topic got read completely and the KTable is ready, the 
> application should start processing. This would indicate, that on startup, 
> the current partition sizes must be fetched and stored, and after KTable got 
> populated up to those offsets, stream processing can start.
> Other discussed ideas are:
> 1) an initial fixed time period for populating
> (it might be hard for a user to estimate the correct value)
> 2) an "idle" period, ie, if no update to a KTable for a certain time is
> done, we consider it as populated
> 3) a timestamp cut off point, ie, all records with an older timestamp
> belong to the initial populating phase
> The API change is not decided yet, and the API desing is part of this JIRA.
> One suggestion (for option (4)) was:
> {noformat}
> KTable table = builder.table("topic", 1000); // populate the table without 
> reading any other topics until see one record with timestamp 1000.
> {noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-4113) Allow KTable bootstrap

2019-09-25 Thread Robert L Brooks Jr (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16937936#comment-16937936
 ] 

Robert L Brooks Jr commented on KAFKA-4113:
---

I have been reading through these comments to try to understand the KTable 
Bootstrap process. Please correct me if my take away is incorrect, it is as 
follows:

When the stream starts up KTables are created by taking a snapshot of the 
source topic based on the largest record time in the joining Stream source 
topic. The KTable's state store should be fully populated with the snapshot 
records before the streaming process any joins. Furthermore, KTables processing 
is prioritized over KStream processing.

Is this the correct understanding of the KTable bootstrap process?

> Allow KTable bootstrap
> --
>
> Key: KAFKA-4113
> URL: https://issues.apache.org/jira/browse/KAFKA-4113
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Guozhang Wang
>Priority: Major
>
> On the mailing list, there are multiple request about the possibility to 
> "fully populate" a KTable before actual stream processing start.
> Even if it is somewhat difficult to define, when the initial populating phase 
> should end, there are multiple possibilities:
> The main idea is, that there is a rarely updated topic that contains the 
> data. Only after this topic got read completely and the KTable is ready, the 
> application should start processing. This would indicate, that on startup, 
> the current partition sizes must be fetched and stored, and after KTable got 
> populated up to those offsets, stream processing can start.
> Other discussed ideas are:
> 1) an initial fixed time period for populating
> (it might be hard for a user to estimate the correct value)
> 2) an "idle" period, ie, if no update to a KTable for a certain time is
> done, we consider it as populated
> 3) a timestamp cut off point, ie, all records with an older timestamp
> belong to the initial populating phase
> The API change is not decided yet, and the API desing is part of this JIRA.
> One suggestion (for option (4)) was:
> {noformat}
> KTable table = builder.table("topic", 1000); // populate the table without 
> reading any other topics until see one record with timestamp 1000.
> {noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-4113) Allow KTable bootstrap

2019-01-21 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16748284#comment-16748284
 ] 

Matthias J. Sax commented on KAFKA-4113:


Globally: 
[https://kafka.apache.org/21/documentation/streams/developer-guide/memory-mgmt.html]
For one operator via `Materialized` parameter: 
[https://kafka.apache.org/documentation/streams/developer-guide/dsl-api.html#ktable-ktable-join]

> Allow KTable bootstrap
> --
>
> Key: KAFKA-4113
> URL: https://issues.apache.org/jira/browse/KAFKA-4113
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Guozhang Wang
>Priority: Major
>
> On the mailing list, there are multiple request about the possibility to 
> "fully populate" a KTable before actual stream processing start.
> Even if it is somewhat difficult to define, when the initial populating phase 
> should end, there are multiple possibilities:
> The main idea is, that there is a rarely updated topic that contains the 
> data. Only after this topic got read completely and the KTable is ready, the 
> application should start processing. This would indicate, that on startup, 
> the current partition sizes must be fetched and stored, and after KTable got 
> populated up to those offsets, stream processing can start.
> Other discussed ideas are:
> 1) an initial fixed time period for populating
> (it might be hard for a user to estimate the correct value)
> 2) an "idle" period, ie, if no update to a KTable for a certain time is
> done, we consider it as populated
> 3) a timestamp cut off point, ie, all records with an older timestamp
> belong to the initial populating phase
> The API change is not decided yet, and the API desing is part of this JIRA.
> One suggestion (for option (4)) was:
> {noformat}
> KTable table = builder.table("topic", 1000); // populate the table without 
> reading any other topics until see one record with timestamp 1000.
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4113) Allow KTable bootstrap

2019-01-21 Thread Edmondo Porcu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1674#comment-1674
 ] 

Edmondo Porcu commented on KAFKA-4113:
--

How do you disable caching?

> Allow KTable bootstrap
> --
>
> Key: KAFKA-4113
> URL: https://issues.apache.org/jira/browse/KAFKA-4113
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Guozhang Wang
>Priority: Major
>
> On the mailing list, there are multiple request about the possibility to 
> "fully populate" a KTable before actual stream processing start.
> Even if it is somewhat difficult to define, when the initial populating phase 
> should end, there are multiple possibilities:
> The main idea is, that there is a rarely updated topic that contains the 
> data. Only after this topic got read completely and the KTable is ready, the 
> application should start processing. This would indicate, that on startup, 
> the current partition sizes must be fetched and stored, and after KTable got 
> populated up to those offsets, stream processing can start.
> Other discussed ideas are:
> 1) an initial fixed time period for populating
> (it might be hard for a user to estimate the correct value)
> 2) an "idle" period, ie, if no update to a KTable for a certain time is
> done, we consider it as populated
> 3) a timestamp cut off point, ie, all records with an older timestamp
> belong to the initial populating phase
> The API change is not decided yet, and the API desing is part of this JIRA.
> One suggestion (for option (4)) was:
> {noformat}
> KTable table = builder.table("topic", 1000); // populate the table without 
> reading any other topics until see one record with timestamp 1000.
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4113) Allow KTable bootstrap

2019-01-20 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16747512#comment-16747512
 ] 

Matthias J. Sax commented on KAFKA-4113:


Can it be related to caching? Did you disable caching? If not, try disabling it.

> Allow KTable bootstrap
> --
>
> Key: KAFKA-4113
> URL: https://issues.apache.org/jira/browse/KAFKA-4113
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Guozhang Wang
>Priority: Major
>
> On the mailing list, there are multiple request about the possibility to 
> "fully populate" a KTable before actual stream processing start.
> Even if it is somewhat difficult to define, when the initial populating phase 
> should end, there are multiple possibilities:
> The main idea is, that there is a rarely updated topic that contains the 
> data. Only after this topic got read completely and the KTable is ready, the 
> application should start processing. This would indicate, that on startup, 
> the current partition sizes must be fetched and stored, and after KTable got 
> populated up to those offsets, stream processing can start.
> Other discussed ideas are:
> 1) an initial fixed time period for populating
> (it might be hard for a user to estimate the correct value)
> 2) an "idle" period, ie, if no update to a KTable for a certain time is
> done, we consider it as populated
> 3) a timestamp cut off point, ie, all records with an older timestamp
> belong to the initial populating phase
> The API change is not decided yet, and the API desing is part of this JIRA.
> One suggestion (for option (4)) was:
> {noformat}
> KTable table = builder.table("topic", 1000); // populate the table without 
> reading any other topics until see one record with timestamp 1000.
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4113) Allow KTable bootstrap

2019-01-18 Thread Edmondo Porcu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16746672#comment-16746672
 ] 

Edmondo Porcu commented on KAFKA-4113:
--

Just checked, our app is already using 2.1 as a library. 

> Allow KTable bootstrap
> --
>
> Key: KAFKA-4113
> URL: https://issues.apache.org/jira/browse/KAFKA-4113
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Guozhang Wang
>Priority: Major
>
> On the mailing list, there are multiple request about the possibility to 
> "fully populate" a KTable before actual stream processing start.
> Even if it is somewhat difficult to define, when the initial populating phase 
> should end, there are multiple possibilities:
> The main idea is, that there is a rarely updated topic that contains the 
> data. Only after this topic got read completely and the KTable is ready, the 
> application should start processing. This would indicate, that on startup, 
> the current partition sizes must be fetched and stored, and after KTable got 
> populated up to those offsets, stream processing can start.
> Other discussed ideas are:
> 1) an initial fixed time period for populating
> (it might be hard for a user to estimate the correct value)
> 2) an "idle" period, ie, if no update to a KTable for a certain time is
> done, we consider it as populated
> 3) a timestamp cut off point, ie, all records with an older timestamp
> belong to the initial populating phase
> The API change is not decided yet, and the API desing is part of this JIRA.
> One suggestion (for option (4)) was:
> {noformat}
> KTable table = builder.table("topic", 1000); // populate the table without 
> reading any other topics until see one record with timestamp 1000.
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4113) Allow KTable bootstrap

2019-01-18 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16746657#comment-16746657
 ] 

Matthias J. Sax commented on KAFKA-4113:


Updating Kafka Streams is sufficient – the fix is client side.

> Allow KTable bootstrap
> --
>
> Key: KAFKA-4113
> URL: https://issues.apache.org/jira/browse/KAFKA-4113
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Guozhang Wang
>Priority: Major
>
> On the mailing list, there are multiple request about the possibility to 
> "fully populate" a KTable before actual stream processing start.
> Even if it is somewhat difficult to define, when the initial populating phase 
> should end, there are multiple possibilities:
> The main idea is, that there is a rarely updated topic that contains the 
> data. Only after this topic got read completely and the KTable is ready, the 
> application should start processing. This would indicate, that on startup, 
> the current partition sizes must be fetched and stored, and after KTable got 
> populated up to those offsets, stream processing can start.
> Other discussed ideas are:
> 1) an initial fixed time period for populating
> (it might be hard for a user to estimate the correct value)
> 2) an "idle" period, ie, if no update to a KTable for a certain time is
> done, we consider it as populated
> 3) a timestamp cut off point, ie, all records with an older timestamp
> belong to the initial populating phase
> The API change is not decided yet, and the API desing is part of this JIRA.
> One suggestion (for option (4)) was:
> {noformat}
> KTable table = builder.table("topic", 1000); // populate the table without 
> reading any other topics until see one record with timestamp 1000.
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4113) Allow KTable bootstrap

2019-01-18 Thread Edmondo Porcu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16746634#comment-16746634
 ] 

Edmondo Porcu commented on KAFKA-4113:
--

Obviously stupid question: 2.1 broker side or kafka-streams jar side?

> Allow KTable bootstrap
> --
>
> Key: KAFKA-4113
> URL: https://issues.apache.org/jira/browse/KAFKA-4113
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Guozhang Wang
>Priority: Major
>
> On the mailing list, there are multiple request about the possibility to 
> "fully populate" a KTable before actual stream processing start.
> Even if it is somewhat difficult to define, when the initial populating phase 
> should end, there are multiple possibilities:
> The main idea is, that there is a rarely updated topic that contains the 
> data. Only after this topic got read completely and the KTable is ready, the 
> application should start processing. This would indicate, that on startup, 
> the current partition sizes must be fetched and stored, and after KTable got 
> populated up to those offsets, stream processing can start.
> Other discussed ideas are:
> 1) an initial fixed time period for populating
> (it might be hard for a user to estimate the correct value)
> 2) an "idle" period, ie, if no update to a KTable for a certain time is
> done, we consider it as populated
> 3) a timestamp cut off point, ie, all records with an older timestamp
> belong to the initial populating phase
> The API change is not decided yet, and the API desing is part of this JIRA.
> One suggestion (for option (4)) was:
> {noformat}
> KTable table = builder.table("topic", 1000); // populate the table without 
> reading any other topics until see one record with timestamp 1000.
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4113) Allow KTable bootstrap

2019-01-18 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16746592#comment-16746592
 ] 

Matthias J. Sax commented on KAFKA-4113:


I guess what you describe related to timestamp synchronization. This is fixed 
in 2.1 via https://issues.apache.org/jira/browse/KAFKA-3514 – can you try out 
2.1 to see if this fixed the issue? The problem is, that timestamp 
synchronization is best effort before 2.1 and KTable-KTable joins are 
"eventually consistent" for the case you describe but there is no guarantee 
that you get all intermediate result on replay. In 2.1, this should work fully 
consistent on replay.

> Allow KTable bootstrap
> --
>
> Key: KAFKA-4113
> URL: https://issues.apache.org/jira/browse/KAFKA-4113
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Guozhang Wang
>Priority: Major
>
> On the mailing list, there are multiple request about the possibility to 
> "fully populate" a KTable before actual stream processing start.
> Even if it is somewhat difficult to define, when the initial populating phase 
> should end, there are multiple possibilities:
> The main idea is, that there is a rarely updated topic that contains the 
> data. Only after this topic got read completely and the KTable is ready, the 
> application should start processing. This would indicate, that on startup, 
> the current partition sizes must be fetched and stored, and after KTable got 
> populated up to those offsets, stream processing can start.
> Other discussed ideas are:
> 1) an initial fixed time period for populating
> (it might be hard for a user to estimate the correct value)
> 2) an "idle" period, ie, if no update to a KTable for a certain time is
> done, we consider it as populated
> 3) a timestamp cut off point, ie, all records with an older timestamp
> belong to the initial populating phase
> The API change is not decided yet, and the API desing is part of this JIRA.
> One suggestion (for option (4)) was:
> {noformat}
> KTable table = builder.table("topic", 1000); // populate the table without 
> reading any other topics until see one record with timestamp 1000.
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4113) Allow KTable bootstrap

2019-01-17 Thread Edmondo Porcu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16744779#comment-16744779
 ] 

Edmondo Porcu commented on KAFKA-4113:
--

Sure, most precisely, we are running a Ktable-KTable Kstreams app which 
performs a join of two compacted topics: they have the same number of partition 
and the same key, and when the up is "running" and we get new items on the 
topics, everything works fine.

 

Since we are still having our brokers in 0.11, sometimes the app crashes with 
OutOfOrderException and as it restarts, since we have no local storage, it will 
consume all the changelog. When this happens, we see some join failures at 
startup, i.e. data that we know and we checked exist with the correct 
timestamps on both topics which doesn't trigger an output join. 

 

We performed the following checks
 # The data is in the topics at the right time for both left and right side, 
with the right timestamp
 # Missed join can be re-triggered by making either the left side or the right 
side tick again
 # In the end, since one of the two Ktable is a join of kstream-kstream 
separate app consuming from a topic produced by Kafka connect, we end up 
updating the timestamp columns in the database to solve the problem
 # Note that at point 1 we have verified that the data is always available in 
the Ktables, so the join mentioned at the 3 item of this list works correctly 
and is executed in a separate app. The one failing is the ktable-ktable 

 

The impression is that when OutOfOrderException occurs and the app restarts, 
one of the two topics is consumed quicker than the other one (one of the two 
topics is much larger in terms of data size) and therefore the lookup of the 
Inner Join failst. 

> Allow KTable bootstrap
> --
>
> Key: KAFKA-4113
> URL: https://issues.apache.org/jira/browse/KAFKA-4113
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Guozhang Wang
>Priority: Major
>
> On the mailing list, there are multiple request about the possibility to 
> "fully populate" a KTable before actual stream processing start.
> Even if it is somewhat difficult to define, when the initial populating phase 
> should end, there are multiple possibilities:
> The main idea is, that there is a rarely updated topic that contains the 
> data. Only after this topic got read completely and the KTable is ready, the 
> application should start processing. This would indicate, that on startup, 
> the current partition sizes must be fetched and stored, and after KTable got 
> populated up to those offsets, stream processing can start.
> Other discussed ideas are:
> 1) an initial fixed time period for populating
> (it might be hard for a user to estimate the correct value)
> 2) an "idle" period, ie, if no update to a KTable for a certain time is
> done, we consider it as populated
> 3) a timestamp cut off point, ie, all records with an older timestamp
> belong to the initial populating phase
> The API change is not decided yet, and the API desing is part of this JIRA.
> One suggestion (for option (4)) was:
> {noformat}
> KTable table = builder.table("topic", 1000); // populate the table without 
> reading any other topics until see one record with timestamp 1000.
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4113) Allow KTable bootstrap

2019-01-16 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16744273#comment-16744273
 ] 

Matthias J. Sax commented on KAFKA-4113:


KTable-KTable join should not be effected because both sides are stateful (in 
contrast to KStream-KTable join for which only KTable side is stateful). Can 
you describe in more detail what "join failures" you observe, and what you 
exactly mean by this (missing expected join results?).

Also, what do you mean by "we reprocess a long topic" – what does long mean? Is 
sounds like, you topic is configures with retention but not compaction?

> Allow KTable bootstrap
> --
>
> Key: KAFKA-4113
> URL: https://issues.apache.org/jira/browse/KAFKA-4113
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Guozhang Wang
>Priority: Major
>
> On the mailing list, there are multiple request about the possibility to 
> "fully populate" a KTable before actual stream processing start.
> Even if it is somewhat difficult to define, when the initial populating phase 
> should end, there are multiple possibilities:
> The main idea is, that there is a rarely updated topic that contains the 
> data. Only after this topic got read completely and the KTable is ready, the 
> application should start processing. This would indicate, that on startup, 
> the current partition sizes must be fetched and stored, and after KTable got 
> populated up to those offsets, stream processing can start.
> Other discussed ideas are:
> 1) an initial fixed time period for populating
> (it might be hard for a user to estimate the correct value)
> 2) an "idle" period, ie, if no update to a KTable for a certain time is
> done, we consider it as populated
> 3) a timestamp cut off point, ie, all records with an older timestamp
> belong to the initial populating phase
> The API change is not decided yet, and the API desing is part of this JIRA.
> One suggestion (for option (4)) was:
> {noformat}
> KTable table = builder.table("topic", 1000); // populate the table without 
> reading any other topics until see one record with timestamp 1000.
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4113) Allow KTable bootstrap

2019-01-15 Thread Edmondo Porcu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16743260#comment-16743260
 ] 

Edmondo Porcu commented on KAFKA-4113:
--

 I think we might be experiencing this with KTable-KTable , we have some join 
failures at startup of the app.Could this be the cause?

> Allow KTable bootstrap
> --
>
> Key: KAFKA-4113
> URL: https://issues.apache.org/jira/browse/KAFKA-4113
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Guozhang Wang
>Priority: Major
>
> On the mailing list, there are multiple request about the possibility to 
> "fully populate" a KTable before actual stream processing start.
> Even if it is somewhat difficult to define, when the initial populating phase 
> should end, there are multiple possibilities:
> The main idea is, that there is a rarely updated topic that contains the 
> data. Only after this topic got read completely and the KTable is ready, the 
> application should start processing. This would indicate, that on startup, 
> the current partition sizes must be fetched and stored, and after KTable got 
> populated up to those offsets, stream processing can start.
> Other discussed ideas are:
> 1) an initial fixed time period for populating
> (it might be hard for a user to estimate the correct value)
> 2) an "idle" period, ie, if no update to a KTable for a certain time is
> done, we consider it as populated
> 3) a timestamp cut off point, ie, all records with an older timestamp
> belong to the initial populating phase
> The API change is not decided yet, and the API desing is part of this JIRA.
> One suggestion (for option (4)) was:
> {noformat}
> KTable table = builder.table("topic", 1000); // populate the table without 
> reading any other topics until see one record with timestamp 1000.
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4113) Allow KTable bootstrap

2018-07-06 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16535324#comment-16535324
 ] 

Matthias J. Sax commented on KAFKA-4113:


Thanks for the feedback [~twbecker] and [~graphex]! We prioritize feature based 
on user feedback. In the beginning there was not much complaint about it.

For GlobalKTable the behavior is different by design, because GlobalKTables are 
designed for "static" data. From my point of view, the design space has two 
dimensions: partitioned vs broadcasted data, and timestamp-alignment or 
non-alignment. Currently, we only offer partitions plus aligned (KTable) and 
broadcasted plus non-aligned (GlobalKTable). Thus, we are missing two more.

Bootstrapping/pre-loading only makes sense for the non-aligned cases IMHO.

We brainstormed about making the strategy plugable at some point – but never 
pushed it forward so far. I see some more additional use-cases for which this 
might make sense. It's all about feature prioritization and how much we can get 
done... Of course, it's an open-source project and contributions are very 
welcome :)

I personally believe that the timestamp aligned semantic is correct and we 
should not sacrifice it. As mentioned above, I am happy to complement the 
design space and offer all 4 KTable variants. The non-timestamp aligned KTable 
should not be too hard to implement. The broadcast plus timestamp alignment 
thing is the most difficult one. The plugable strategy might also not be too 
hard to implement. But all of those would require a KIP to get a sound design.

> Allow KTable bootstrap
> --
>
> Key: KAFKA-4113
> URL: https://issues.apache.org/jira/browse/KAFKA-4113
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Guozhang Wang
>Priority: Major
>
> On the mailing list, there are multiple request about the possibility to 
> "fully populate" a KTable before actual stream processing start.
> Even if it is somewhat difficult to define, when the initial populating phase 
> should end, there are multiple possibilities:
> The main idea is, that there is a rarely updated topic that contains the 
> data. Only after this topic got read completely and the KTable is ready, the 
> application should start processing. This would indicate, that on startup, 
> the current partition sizes must be fetched and stored, and after KTable got 
> populated up to those offsets, stream processing can start.
> Other discussed ideas are:
> 1) an initial fixed time period for populating
> (it might be hard for a user to estimate the correct value)
> 2) an "idle" period, ie, if no update to a KTable for a certain time is
> done, we consider it as populated
> 3) a timestamp cut off point, ie, all records with an older timestamp
> belong to the initial populating phase
> The API change is not decided yet, and the API desing is part of this JIRA.
> One suggestion (for option (4)) was:
> {noformat}
> KTable table = builder.table("topic", 1000); // populate the table without 
> reading any other topics until see one record with timestamp 1000.
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4113) Allow KTable bootstrap

2018-07-06 Thread Sean McKibben (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16535230#comment-16535230
 ] 

Sean McKibben commented on KAFKA-4113:
--

Thanks for pointing out KAFKA-3514, after being frustrated with both these 
issues for over a year, I'm surprised I missed it. It would be better for all 
my use cases to have the option for the preloading that global KTables have, 
rather than even a working best effort timestamp alignment; sorry if my 
previous comment didn't make that very clear.

Seems like this feature would be much easier to implement, test, and document 
than 3514 as well. I've spent weeks artificially backdating my compacted KTable 
topics only to see no benefit, when what I really want is just a flag to ensure 
that my KTable is kept as up to date as possible through whatever 
non-timestamp-related semantics are easiest to implement. Something that is 
already available for global KTables, but won't scale for my use cases.

> Allow KTable bootstrap
> --
>
> Key: KAFKA-4113
> URL: https://issues.apache.org/jira/browse/KAFKA-4113
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Guozhang Wang
>Priority: Major
>
> On the mailing list, there are multiple request about the possibility to 
> "fully populate" a KTable before actual stream processing start.
> Even if it is somewhat difficult to define, when the initial populating phase 
> should end, there are multiple possibilities:
> The main idea is, that there is a rarely updated topic that contains the 
> data. Only after this topic got read completely and the KTable is ready, the 
> application should start processing. This would indicate, that on startup, 
> the current partition sizes must be fetched and stored, and after KTable got 
> populated up to those offsets, stream processing can start.
> Other discussed ideas are:
> 1) an initial fixed time period for populating
> (it might be hard for a user to estimate the correct value)
> 2) an "idle" period, ie, if no update to a KTable for a certain time is
> done, we consider it as populated
> 3) a timestamp cut off point, ie, all records with an older timestamp
> belong to the initial populating phase
> The API change is not decided yet, and the API desing is part of this JIRA.
> One suggestion (for option (4)) was:
> {noformat}
> KTable table = builder.table("topic", 1000); // populate the table without 
> reading any other topics until see one record with timestamp 1000.
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4113) Allow KTable bootstrap

2018-07-06 Thread Tommy Becker (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16535220#comment-16535220
 ] 

Tommy Becker commented on KAFKA-4113:
-

[~mjsax] has any thought been given to making the strategy for choosing which 
topics to process from pluggable? I feel like the current timestamp behavior is 
one such strategy, but for some other use-cases I feel that a simple 
topic-level prioritization would be sufficient. For example, in the case where 
the table backing topic receives way less traffic than the stream topic, I 
think it could be reasonable to always prefer messages from the table topic 
over the stream topic. Such a scheme could work for a lot of cases and is quite 
a bit easier to reason about and implement.

> Allow KTable bootstrap
> --
>
> Key: KAFKA-4113
> URL: https://issues.apache.org/jira/browse/KAFKA-4113
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Guozhang Wang
>Priority: Major
>
> On the mailing list, there are multiple request about the possibility to 
> "fully populate" a KTable before actual stream processing start.
> Even if it is somewhat difficult to define, when the initial populating phase 
> should end, there are multiple possibilities:
> The main idea is, that there is a rarely updated topic that contains the 
> data. Only after this topic got read completely and the KTable is ready, the 
> application should start processing. This would indicate, that on startup, 
> the current partition sizes must be fetched and stored, and after KTable got 
> populated up to those offsets, stream processing can start.
> Other discussed ideas are:
> 1) an initial fixed time period for populating
> (it might be hard for a user to estimate the correct value)
> 2) an "idle" period, ie, if no update to a KTable for a certain time is
> done, we consider it as populated
> 3) a timestamp cut off point, ie, all records with an older timestamp
> belong to the initial populating phase
> The API change is not decided yet, and the API desing is part of this JIRA.
> One suggestion (for option (4)) was:
> {noformat}
> KTable table = builder.table("topic", 1000); // populate the table without 
> reading any other topics until see one record with timestamp 1000.
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4113) Allow KTable bootstrap

2018-07-06 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16535133#comment-16535133
 ] 

Matthias J. Sax commented on KAFKA-4113:


[~graphex] What you report is a bug and tracked via KAFKA-3514 – it's on the 
top list of things we want to fix! I completely agree that KAFKA-3514 breaks 
the expected behavior that a KTable with older record timestamps should be 
loaded before processing stream records starts. The difference is, that it's a 
bug, while the request discussed here is a semantic change request – thus, I 
think it's best to discuss both separately.

> Allow KTable bootstrap
> --
>
> Key: KAFKA-4113
> URL: https://issues.apache.org/jira/browse/KAFKA-4113
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Guozhang Wang
>Priority: Major
>
> On the mailing list, there are multiple request about the possibility to 
> "fully populate" a KTable before actual stream processing start.
> Even if it is somewhat difficult to define, when the initial populating phase 
> should end, there are multiple possibilities:
> The main idea is, that there is a rarely updated topic that contains the 
> data. Only after this topic got read completely and the KTable is ready, the 
> application should start processing. This would indicate, that on startup, 
> the current partition sizes must be fetched and stored, and after KTable got 
> populated up to those offsets, stream processing can start.
> Other discussed ideas are:
> 1) an initial fixed time period for populating
> (it might be hard for a user to estimate the correct value)
> 2) an "idle" period, ie, if no update to a KTable for a certain time is
> done, we consider it as populated
> 3) a timestamp cut off point, ie, all records with an older timestamp
> belong to the initial populating phase
> The API change is not decided yet, and the API desing is part of this JIRA.
> One suggestion (for option (4)) was:
> {noformat}
> KTable table = builder.table("topic", 1000); // populate the table without 
> reading any other topics until see one record with timestamp 1000.
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4113) Allow KTable bootstrap

2018-07-06 Thread Sean McKibben (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16535084#comment-16535084
 ] 

Sean McKibben commented on KAFKA-4113:
--

I think the sad reality of Kafka Streams' behavior right now is even worse than 
is being portrayed in this ticket. Let's say I have a Kafka cluster with 2 
topics, "events" and "users" and I want to produce a joined "enriched_events" 
topic using Kafka Streams KStream-KTable join. Let's also say that my Kafka 
cluster currently has 1 billion user records in the users topic, and 100 
million event records in the events topic, with about 100k new events coming in 
per second. We can even further say that the timestamps for every message in 
the "users" topic is before any of the timestamps in the "events" topic (and of 
course both topics are keyed with the user id and partitioned the same).

What I would expect from my new Kafka Streams app is that I would be able to 
make a KTable out of "users" and do a leftJoin with events, and be able to see 
messages with events+users flow right in to enriched_events, after waiting for 
the users KTable to populate the RocksDB database on each instance of my app. 
Unfortunately, what actually happens is that the app quickly processes the 100 
million events while slowly populating the RocksDB instance, so enriched_events 
receives almost no enrichment from corresponding values in my KTable, 
regardless of any timestamp management.

Only after the app has burned through the 100 million event backlog that 
existed when it started, and further continued to process 100k events per 
second for a really long time, will the local RocksDB even be mostly populated 
and we'll see a reasonable number of successful joins with users flowing into 
"enriched_events".

The only time I've seen behavior remotely similar to what is described here as 
the best effort is when i restart a Kafka Streams with the same Application ID 
after it has been running in steady state for a long time. In that case, though 
it is difficult to actually see what is going on, there appears to be some 
degree of KTable preloading occurring.

The only workable solution I've been able to find that avoids "enriched_events" 
being filled with a bunch of un-enriched events, is to make a new topic, 
"events_controlled" to use as my Kafka Streams KStream and keep it completely 
empty, then start up my app, manually watch the lag of my app reading the 
"users" topic until it gets all the way to 0, and then start a separate 
application to copy messages from "events" to "events_controlled". This is a 
pretty high touch solution and is far from ideal in any scenario.

I really don't think the data model of a compacted topic is sufficient to 
reasonably even attempt to provide "join-at-the-time" semantics as described in 
this ticket. If you need historical joining, use a database with a history for 
each record. If you're using a compacted Kafka topic as a KTable, you're 
joining to, your source of truth is, by definition, only intended to contain 
the latest value for each key in your dataset. This is very much at odds with 
the "best effort" timestamp alignment strategy that acts as your only option 
for KStream-KTable match semantics, and which doesn't even appear to provide 
any effort during first run to preload anything into the KTable.

> Allow KTable bootstrap
> --
>
> Key: KAFKA-4113
> URL: https://issues.apache.org/jira/browse/KAFKA-4113
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Guozhang Wang
>Priority: Major
>
> On the mailing list, there are multiple request about the possibility to 
> "fully populate" a KTable before actual stream processing start.
> Even if it is somewhat difficult to define, when the initial populating phase 
> should end, there are multiple possibilities:
> The main idea is, that there is a rarely updated topic that contains the 
> data. Only after this topic got read completely and the KTable is ready, the 
> application should start processing. This would indicate, that on startup, 
> the current partition sizes must be fetched and stored, and after KTable got 
> populated up to those offsets, stream processing can start.
> Other discussed ideas are:
> 1) an initial fixed time period for populating
> (it might be hard for a user to estimate the correct value)
> 2) an "idle" period, ie, if no update to a KTable for a certain time is
> done, we consider it as populated
> 3) a timestamp cut off point, ie, all records with an older timestamp
> belong to the initial populating phase
> The API change is not decided yet, and the API desing is part of this JIRA.
> One suggestion (for option (4)) was:
> {noformat}
> KTable table = builder.table("topic", 1000); // p

[jira] [Commented] (KAFKA-4113) Allow KTable bootstrap

2018-07-05 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16534356#comment-16534356
 ] 

Matthias J. Sax commented on KAFKA-4113:


I see you point. I still think, that the timestamp based semantic is superior 
and I am personally in favor to keep it. However, I agree that a timely 
decoupled table also has a broad use-case spectrum and we should allow this – 
however, it should not replace the current KTable, but complement it IMHO. The 
behavior and semantics would be similar to GlobalKTables.

About, "table backing topic is almost certainly log-compacted which means you 
can't achieve these semantics regardless as these older values are now gone" – 
if you want to do re-processing I agree. However, the time-synchronization is 
not just important for re-processing, but provides sound semantics in general. 
Without it, the computation is inherently non-deterministic (what I believe is 
not what most people want).

To fix the re-processing case, we would need to "protect" the head of the log 
from compaction: ie, the retention time of the input stream and the 
non-compacted head of the log must be equally large – there is a config 
`min.compaction.lag` but I am actually not 100% sure if it can be used for this 
purpose. Would need to double check. Maybe [~guozhang] knows?

Semantically, it is sound that you cannot do reprocessing if you lost old table 
state – note that, reprocessing should ensure that you compute the same result 
(if you don't change the program) than in the original run – if log compaction 
deletes old data, you can obviously not reprocess it. Using the latest KTable 
data will result in joining old stream records with "future" table data (future 
in this case is relative future to the stream records of course) and thus 
produce a different result and would be incorrect, too.

> Allow KTable bootstrap
> --
>
> Key: KAFKA-4113
> URL: https://issues.apache.org/jira/browse/KAFKA-4113
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Guozhang Wang
>Priority: Major
>
> On the mailing list, there are multiple request about the possibility to 
> "fully populate" a KTable before actual stream processing start.
> Even if it is somewhat difficult to define, when the initial populating phase 
> should end, there are multiple possibilities:
> The main idea is, that there is a rarely updated topic that contains the 
> data. Only after this topic got read completely and the KTable is ready, the 
> application should start processing. This would indicate, that on startup, 
> the current partition sizes must be fetched and stored, and after KTable got 
> populated up to those offsets, stream processing can start.
> Other discussed ideas are:
> 1) an initial fixed time period for populating
> (it might be hard for a user to estimate the correct value)
> 2) an "idle" period, ie, if no update to a KTable for a certain time is
> done, we consider it as populated
> 3) a timestamp cut off point, ie, all records with an older timestamp
> belong to the initial populating phase
> The API change is not decided yet, and the API desing is part of this JIRA.
> One suggestion (for option (4)) was:
> {noformat}
> KTable table = builder.table("topic", 1000); // populate the table without 
> reading any other topics until see one record with timestamp 1000.
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4113) Allow KTable bootstrap

2018-07-05 Thread Tommy Becker (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16534259#comment-16534259
 ] 

Tommy Becker commented on KAFKA-4113:
-

Coming from Samza, I find it very surprising that there is no way to do this. 
If I have 2 topics, 1 stream and 1 table with existing data and write a 
KafkaStreams application to do a join, it seems very likely that initial 
records in the stream (possible quite many) will not be joined properly, as the 
corresponding message in the table backing topic has not yet been read.

The timestamp semantics makes sense in that I suppose there are some use-cases 
where you'd consider the value that was current in the table at the time of 
some incoming message as "better" than the latest value (though I suspect they 
are a minority). But in reality, the table backing topic is almost certainly 
log-compacted which means you can't achieve these semantics regardless as these 
older values are now gone, and worse, the new values have newer timestamps 
which perpetuate the problem we're talking about.

> Allow KTable bootstrap
> --
>
> Key: KAFKA-4113
> URL: https://issues.apache.org/jira/browse/KAFKA-4113
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Guozhang Wang
>Priority: Major
>
> On the mailing list, there are multiple request about the possibility to 
> "fully populate" a KTable before actual stream processing start.
> Even if it is somewhat difficult to define, when the initial populating phase 
> should end, there are multiple possibilities:
> The main idea is, that there is a rarely updated topic that contains the 
> data. Only after this topic got read completely and the KTable is ready, the 
> application should start processing. This would indicate, that on startup, 
> the current partition sizes must be fetched and stored, and after KTable got 
> populated up to those offsets, stream processing can start.
> Other discussed ideas are:
> 1) an initial fixed time period for populating
> (it might be hard for a user to estimate the correct value)
> 2) an "idle" period, ie, if no update to a KTable for a certain time is
> done, we consider it as populated
> 3) a timestamp cut off point, ie, all records with an older timestamp
> belong to the initial populating phase
> The API change is not decided yet, and the API desing is part of this JIRA.
> One suggestion (for option (4)) was:
> {noformat}
> KTable table = builder.table("topic", 1000); // populate the table without 
> reading any other topics until see one record with timestamp 1000.
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4113) Allow KTable bootstrap

2018-06-14 Thread Don (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512051#comment-16512051
 ] 

Don commented on KAFKA-4113:


Hi Ben, you had a very interesting link in the original comment. Coincidentally 
we were looking into how to bootstrap KTable/GLobalKTable when you posted this. 
It's giving 404 now :/
Was there any technical reason for removing it? 

> Allow KTable bootstrap
> --
>
> Key: KAFKA-4113
> URL: https://issues.apache.org/jira/browse/KAFKA-4113
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Guozhang Wang
>Priority: Major
>
> On the mailing list, there are multiple request about the possibility to 
> "fully populate" a KTable before actual stream processing start.
> Even if it is somewhat difficult to define, when the initial populating phase 
> should end, there are multiple possibilities:
> The main idea is, that there is a rarely updated topic that contains the 
> data. Only after this topic got read completely and the KTable is ready, the 
> application should start processing. This would indicate, that on startup, 
> the current partition sizes must be fetched and stored, and after KTable got 
> populated up to those offsets, stream processing can start.
> Other discussed ideas are:
> 1) an initial fixed time period for populating
> (it might be hard for a user to estimate the correct value)
> 2) an "idle" period, ie, if no update to a KTable for a certain time is
> done, we consider it as populated
> 3) a timestamp cut off point, ie, all records with an older timestamp
> belong to the initial populating phase
> The API change is not decided yet, and the API desing is part of this JIRA.
> One suggestion (for option (4)) was:
> {noformat}
> KTable table = builder.table("topic", 1000); // populate the table without 
> reading any other topics until see one record with timestamp 1000.
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4113) Allow KTable bootstrap

2018-06-13 Thread Ben Stopford (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16510911#comment-16510911
 ] 

Ben Stopford commented on KAFKA-4113:
-

Whilst I like the 'time-aligned' approach to loading KTables very much, it 
definitely catches people out. I think this is compounded by the fact that 
GKTables don't behave like this (they bootstrap themselves on startup rather 
than being time aligned).

Different use cases actually better suit one or the other (as noted above). So 
for example, if you're joining Orders to Customers and doing reprocessing you 
might want the 'as at' version of the customer (say with an old email address) 
or the latest version of the customer (with their most recent email).

So I think KStreams should support both (a) preloaded or (b) event time ideally 
in both types of table, letting the user define the behaviour.

I've tried to explain the background to this in a bit more detail 
[here|http://www.benstopford.com/2018/06/13/things-can-trip-building-streams-apps/].
 

> Allow KTable bootstrap
> --
>
> Key: KAFKA-4113
> URL: https://issues.apache.org/jira/browse/KAFKA-4113
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Guozhang Wang
>Priority: Major
>
> On the mailing list, there are multiple request about the possibility to 
> "fully populate" a KTable before actual stream processing start.
> Even if it is somewhat difficult to define, when the initial populating phase 
> should end, there are multiple possibilities:
> The main idea is, that there is a rarely updated topic that contains the 
> data. Only after this topic got read completely and the KTable is ready, the 
> application should start processing. This would indicate, that on startup, 
> the current partition sizes must be fetched and stored, and after KTable got 
> populated up to those offsets, stream processing can start.
> Other discussed ideas are:
> 1) an initial fixed time period for populating
> (it might be hard for a user to estimate the correct value)
> 2) an "idle" period, ie, if no update to a KTable for a certain time is
> done, we consider it as populated
> 3) a timestamp cut off point, ie, all records with an older timestamp
> belong to the initial populating phase
> The API change is not decided yet, and the API desing is part of this JIRA.
> One suggestion (for option (4)) was:
> {noformat}
> KTable table = builder.table("topic", 1000); // populate the table without 
> reading any other topics until see one record with timestamp 1000.
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4113) Allow KTable bootstrap

2017-09-10 Thread Elias Levy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16160518#comment-16160518
 ] 

Elias Levy commented on KAFKA-4113:
---

OK in the sense it is not a fatal failure, but not in the sense that it is the 
desired behavior.  I concede that it may not be the desirable behavior on all 
use cases.

> Allow KTable bootstrap
> --
>
> Key: KAFKA-4113
> URL: https://issues.apache.org/jira/browse/KAFKA-4113
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Guozhang Wang
>
> On the mailing list, there are multiple request about the possibility to 
> "fully populate" a KTable before actual stream processing start.
> Even if it is somewhat difficult to define, when the initial populating phase 
> should end, there are multiple possibilities:
> The main idea is, that there is a rarely updated topic that contains the 
> data. Only after this topic got read completely and the KTable is ready, the 
> application should start processing. This would indicate, that on startup, 
> the current partition sizes must be fetched and stored, and after KTable got 
> populated up to those offsets, stream processing can start.
> Other discussed ideas are:
> 1) an initial fixed time period for populating
> (it might be hard for a user to estimate the correct value)
> 2) an "idle" period, ie, if no update to a KTable for a certain time is
> done, we consider it as populated
> 3) a timestamp cut off point, ie, all records with an older timestamp
> belong to the initial populating phase
> The API change is not decided yet, and the API desing is part of this JIRA.
> One suggestion (for option (4)) was:
> {noformat}
> KTable table = builder.table("topic", 1000); // populate the table without 
> reading any other topics until see one record with timestamp 1000.
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-4113) Allow KTable bootstrap

2017-09-10 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16160493#comment-16160493
 ] 

Jay Kreps commented on KAFKA-4113:
--

But I suppose you must be okay with that not happening too, since in steady 
state you'll be joining data in the table with data in the stream at about the 
same timestamp. I think my argument is that if you are okay with this behavior 
in normal operation you should also be okay with it during bootstrapping (and 
in many cases joining on future data and producing results that wouldn't occur 
in steady state operation is not desirable).

> Allow KTable bootstrap
> --
>
> Key: KAFKA-4113
> URL: https://issues.apache.org/jira/browse/KAFKA-4113
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Guozhang Wang
>
> On the mailing list, there are multiple request about the possibility to 
> "fully populate" a KTable before actual stream processing start.
> Even if it is somewhat difficult to define, when the initial populating phase 
> should end, there are multiple possibilities:
> The main idea is, that there is a rarely updated topic that contains the 
> data. Only after this topic got read completely and the KTable is ready, the 
> application should start processing. This would indicate, that on startup, 
> the current partition sizes must be fetched and stored, and after KTable got 
> populated up to those offsets, stream processing can start.
> Other discussed ideas are:
> 1) an initial fixed time period for populating
> (it might be hard for a user to estimate the correct value)
> 2) an "idle" period, ie, if no update to a KTable for a certain time is
> done, we consider it as populated
> 3) a timestamp cut off point, ie, all records with an older timestamp
> belong to the initial populating phase
> The API change is not decided yet, and the API desing is part of this JIRA.
> One suggestion (for option (4)) was:
> {noformat}
> KTable table = builder.table("topic", 1000); // populate the table without 
> reading any other topics until see one record with timestamp 1000.
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-4113) Allow KTable bootstrap

2017-09-10 Thread Elias Levy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16160435#comment-16160435
 ] 

Elias Levy commented on KAFKA-4113:
---

In response to [~jkreps], in our user case it would be best for the join to be 
performed against the latest {{KTable}} data.  So yes, we do prefer the 
{{KTable}} to be fully loaded, even if it's timestamps are far ahead of the 
{{KStream}}, before the join occurs.

> Allow KTable bootstrap
> --
>
> Key: KAFKA-4113
> URL: https://issues.apache.org/jira/browse/KAFKA-4113
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Guozhang Wang
>
> On the mailing list, there are multiple request about the possibility to 
> "fully populate" a KTable before actual stream processing start.
> Even if it is somewhat difficult to define, when the initial populating phase 
> should end, there are multiple possibilities:
> The main idea is, that there is a rarely updated topic that contains the 
> data. Only after this topic got read completely and the KTable is ready, the 
> application should start processing. This would indicate, that on startup, 
> the current partition sizes must be fetched and stored, and after KTable got 
> populated up to those offsets, stream processing can start.
> Other discussed ideas are:
> 1) an initial fixed time period for populating
> (it might be hard for a user to estimate the correct value)
> 2) an "idle" period, ie, if no update to a KTable for a certain time is
> done, we consider it as populated
> 3) a timestamp cut off point, ie, all records with an older timestamp
> belong to the initial populating phase
> The API change is not decided yet, and the API desing is part of this JIRA.
> One suggestion (for option (4)) was:
> {noformat}
> KTable table = builder.table("topic", 1000); // populate the table without 
> reading any other topics until see one record with timestamp 1000.
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)