[jira] [Commented] (KAFKA-4113) Allow KTable bootstrap
[ https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17270955#comment-17270955 ] Matthias J. Sax commented on KAFKA-4113:

I guess you will need to replicate the input data into a second topic, and then use one topic with timestamp 0 and the other topic with "normal" timestamps. If you have only one topic, each record will be processed one by one, and thus you cannot have a fully bootstrapped table when processing the first record: with Kafka Streams it's not possible to read a topic to "bootstrap" and afterwards read the same topic a second time.

> Allow KTable bootstrap
> ----------------------
>
> Key: KAFKA-4113
> URL: https://issues.apache.org/jira/browse/KAFKA-4113
> Project: Kafka
> Issue Type: New Feature
> Components: streams
> Reporter: Matthias J. Sax
> Priority: Major
>
> On the mailing list, there are multiple requests about the possibility to
> "fully populate" a KTable before actual stream processing starts.
> Even if it is somewhat difficult to define when the initial populating phase
> should end, there are multiple possibilities:
> The main idea is that there is a rarely updated topic that contains the
> data. Only after this topic has been read completely and the KTable is ready
> should the application start processing. This implies that on startup,
> the current partition sizes must be fetched and stored, and after the KTable
> has been populated up to those offsets, stream processing can start.
> Other discussed ideas are:
> 1) an initial fixed time period for populating
> (it might be hard for a user to estimate the correct value)
> 2) an "idle" period, i.e., if no update to a KTable is done for a certain
> time, we consider it as populated
> 3) a timestamp cut-off point, i.e., all records with an older timestamp
> belong to the initial populating phase
> The API change is not decided yet, and the API design is part of this JIRA.
> One suggestion (for option (4)) was:
> {noformat}
> KTable table = builder.table("topic", 1000); // populate the table without
> // reading any other topics until seeing one record with timestamp 1000.
> {noformat}

-- This message was sent by Atlassian Jira (v8.3.4#803005)
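The "main idea" in the issue description (fetch the end offsets of the table topic's partitions at startup, then hold back stream processing until the table has been read up to those offsets) can be sketched as a small gate. This is a hypothetical helper, not a Kafka Streams API: partitions are plain integers, and the end offsets are assumed to be captured once at startup, for example with `KafkaConsumer.endOffsets(...)` from the plain consumer API. Positions are advanced from the offsets of applied records, which is a simplification; with transactional topics the end offset can lie past the last data record, so a real implementation would compare the consumer position instead.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Kafka Streams: tracks whether every table partition
// has been read up to the end offset that was captured once at startup.
final class BootstrapGate {
    // End offset per partition at startup (the offset the next written record would get).
    private final Map<Integer, Long> targetOffsets;
    // Next offset to be read per partition, derived from the records applied so far.
    private final Map<Integer, Long> positions = new HashMap<>();

    BootstrapGate(Map<Integer, Long> endOffsetsAtStartup) {
        this.targetOffsets = new HashMap<>(endOffsetsAtStartup);
    }

    // Call after the table record at `offset` in `partition` has been applied to the store.
    void onTableRecord(int partition, long offset) {
        positions.merge(partition, offset + 1, Long::max);
    }

    // Stream-side processing may start once this returns true.
    boolean isBootstrapped() {
        for (Map.Entry<Integer, Long> target : targetOffsets.entrySet()) {
            long position = positions.getOrDefault(target.getKey(), 0L);
            if (position < target.getValue()) {
                return false;
            }
        }
        return true;
    }
}
```

With end offsets {0: 3, 1: 2}, the gate opens only after offsets 0 to 2 of partition 0 and offsets 0 to 1 of partition 1 have been applied.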
[jira] [Commented] (KAFKA-4113) Allow KTable bootstrap
[ https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17270739#comment-17270739 ] Stuart Perks commented on KAFKA-4113:

[~mjsax] I have a scenario with a KTable built from a compacted topic. I convert the KTable to a stream, flatMap and re-key the records, and then join them on the new keys with the same KTable, so it is basically a self-join on the KTable. With caching off, this processes every message that the topic has not yet compacted away. I always want to join against the latest data in the KTable, so a bootstrap function would be great, but it does not seem to be happening. The custom timestamp extractor returning 0 does not work here: the data is the same, so both the stream and the table would get timestamp 0. With normal time semantics, every record is processed, even for the same key. Are there any other ideas for always joining with the latest data in a KTable when the join is already driven from the same KTable? Making the timestamp extractor distinguish between the KTable and KTable.toStream() seems unlikely.
[jira] [Commented] (KAFKA-4113) Allow KTable bootstrap
[ https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16938125#comment-16938125 ] Matthias J. Sax commented on KAFKA-4113:

Yes, increasing max.task.idle.ms helps to get stricter ordering guarantees across the input partitions.
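As a sketch of the configuration discussed in this exchange, the properties below set `max.task.idle.ms` using plain string keys so that the snippet runs without the Kafka jars; in an application you would normally use the `StreamsConfig.MAX_TASK_IDLE_MS_CONFIG` constant. The class name, application id, broker address, and idle value are illustrative assumptions, not recommendations.

```java
import java.util.Properties;

// Hypothetical helper that builds Kafka Streams properties for a stream-table join app.
final class JoinTimingConfig {
    static Properties build(String applicationId, String bootstrapServers, long maxTaskIdleMs) {
        Properties props = new Properties();
        props.setProperty("application.id", applicationId);
        props.setProperty("bootstrap.servers", bootstrapServers);
        // How long a task may stay idle waiting for data on all of its input partitions
        // before it processes records from only some of them (config added in 2.1, default 0).
        props.setProperty("max.task.idle.ms", Long.toString(maxTaskIdleMs));
        return props;
    }
}
```

A larger value trades latency for a better chance that both join inputs have buffered data when the task compares timestamps.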
[jira] [Commented] (KAFKA-4113) Allow KTable bootstrap
[ https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16938108#comment-16938108 ] Robert L Brooks Jr commented on KAFKA-4113:

[~mjsax] Thanks for your quick responses; this was very helpful. For the proposed solution, would we also need to change the config max.task.idle.ms?
[jira] [Commented] (KAFKA-4113) Allow KTable bootstrap
[ https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16938074#comment-16938074 ] Matthias J. Sax commented on KAFKA-4113:

If you use the `WallclockTimestampExtractor`, timestamps are assigned _after_ the records are pulled from Kafka, using the wall-clock time of your application (i.e., client side). By the way: each time you re-read existing data, a different timestamp will be used, because the wall-clock time will always be different. Hence, the order in which records are processed is non-deterministic, because there is no contract about what `poll()` returns. So your understanding seems correct.

What you could do is write a custom timestamp extractor that returns `0` for each table-side record and the wall-clock time for each stream-side record. In `extract()` you get a `ConsumerRecord` and can inspect the topic name to distinguish between the two. Because `0` is smaller than any wall-clock time, you can "bootstrap" the table to the end of its topic before any stream-side record gets processed.
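The timestamp rule suggested above can be sketched as follows. To keep it runnable without the Kafka jars, the decision logic is a plain class; in an application it would sit inside a class implementing `org.apache.kafka.streams.processor.TimestampExtractor`, whose `extract(ConsumerRecord<Object, Object> record, long partitionTime)` method can read `record.topic()`, and that class would be registered via `default.timestamp.extractor`. The class name and topic names are hypothetical.

```java
// Hypothetical, dependency-free version of the "0 for the table, wall clock for the stream" rule.
// Inside a TimestampExtractor, extract() would call timestampFor(record.topic()).
final class BootstrapTimestampRule {
    private final String tableTopic;

    BootstrapTimestampRule(String tableTopic) {
        this.tableTopic = tableTopic;
    }

    // Table-side records get 0, so they sort before every stream-side record;
    // all other records get the supplied wall-clock time.
    long timestampFor(String topic, long wallClockMs) {
        return tableTopic.equals(topic) ? 0L : wallClockMs;
    }

    // Convenience overload that reads the wall clock itself.
    long timestampFor(String topic) {
        return timestampFor(topic, System.currentTimeMillis());
    }
}
```

Two caveats from the thread apply: because wall-clock time is used for the stream side, reprocessing the same data assigns different timestamps, and because the rule keys on the topic name, it cannot separate the two roles when a single topic feeds both the table and the stream (the case discussed in the first two comments on this page).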
[jira] [Commented] (KAFKA-4113) Allow KTable bootstrap
[ https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16938024#comment-16938024 ] Robert L Brooks Jr commented on KAFKA-4113:

Okay, so what happens if the stream and broker are set up using processing-time semantics, i.e., default.timestamp.extractor = WallclockTimestampExtractor? It doesn't sound like the table is initialized from a snapshot before any join processing is done, or given any prioritization. So I guess this entire thread is about the fact that you can get failed joins in scenarios like the following, using the default event-time semantics:

Premise: each topic has 1 partition; each record consists of a String key and an Integer representing the time it was produced.
application-stream (100,000 records): [key1, 5], [key5, 3], [key2, 2], [key3, 4], [key4, 3]
employee-stream (10,000 records): [key1, 1], [key2, 2], [key3, 4], [key4, 3], [key5, 5], ...

Time of processing (left join):
application-table:  null  null  maybe  maybe  null
employee-stream:    key1  key2  key3   key4   key5
result of join:     fail  fail  maybe  maybe  fail

Is this the correct understanding?
[jira] [Commented] (KAFKA-4113) Allow KTable bootstrap
[ https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16937950#comment-16937950 ] Matthias J. Sax commented on KAFKA-4113:

Not sure if I understand. Let's do an example (only showing timestamps, consuming from left (small offset) to right):

Table-topic-ts:  10, 12, 14, 16, 17, 19, 20
Stream-topic-ts: 18, 19, 20, 21, 23

Assuming auto.offset.reset=earliest, we would compare the timestamps of both topics on a per-partition basis (i.e., stream-p0 compared to table-p0, etc.). In our example, the table records have the smallest timestamps and would be processed first; hence, the table would be "bootstrapped" from 10 to 17. Afterwards, the stream record with timestamp 18 would be processed, because the next table update has the larger timestamp 19. If records from both inputs have the same timestamp, there is currently no guarantee which record is processed first; it's a known gap that we need to address at some point.

Does this answer your question?
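The ordering in this example can be reproduced with a simplified model in which a task always takes the head record with the smaller timestamp. The model assumes both inputs are fully available (a real task only compares records it has already fetched, which is where `max.task.idle.ms` comes in), and it breaks ties in favour of the table, which is an arbitrary choice since, as noted above, Kafka Streams gives no ordering guarantee for equal timestamps. The class name is hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Simplified model of timestamp-based record selection between one table partition
// and one stream partition; "T" marks a table record and "S" a stream record.
final class TimestampMergeModel {
    static List<String> processingOrder(long[] tableTs, long[] streamTs) {
        Deque<Long> table = new ArrayDeque<>();
        Deque<Long> stream = new ArrayDeque<>();
        for (long ts : tableTs) table.add(ts);
        for (long ts : streamTs) stream.add(ts);

        List<String> order = new ArrayList<>();
        while (!table.isEmpty() || !stream.isEmpty()) {
            // Take the table head if the stream is drained or the table head is not newer.
            boolean takeTable = stream.isEmpty()
                    || (!table.isEmpty() && table.peekFirst() <= stream.peekFirst());
            if (takeTable) {
                order.add("T" + table.pollFirst());
            } else {
                order.add("S" + stream.pollFirst());
            }
        }
        return order;
    }
}
```

For the timestamps above, the model yields T10, T12, T14, T16, T17, S18, T19, S19, T20, S20, S21, S23.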
[jira] [Commented] (KAFKA-4113) Allow KTable bootstrap
[ https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16937936#comment-16937936 ] Robert L Brooks Jr commented on KAFKA-4113:

I have been reading through these comments to try to understand the KTable bootstrap process. Please correct me if my takeaway is incorrect. It is as follows: when the stream starts up, KTables are created by taking a snapshot of the source topic based on the largest record time in the joining stream's source topic. The KTable's state store should be fully populated with the snapshot records before the streaming application processes any joins. Furthermore, KTable processing is prioritized over KStream processing. Is this the correct understanding of the KTable bootstrap process?
[jira] [Commented] (KAFKA-4113) Allow KTable bootstrap
[ https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16748284#comment-16748284 ] Matthias J. Sax commented on KAFKA-4113:

Globally: [https://kafka.apache.org/21/documentation/streams/developer-guide/memory-mgmt.html]
For one operator, via the `Materialized` parameter: [https://kafka.apache.org/documentation/streams/developer-guide/dsl-api.html#ktable-ktable-join]
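A minimal sketch of the global option: setting the record cache size to zero disables caching for all operators. The helper class is hypothetical and uses plain string keys so it runs without the Kafka jars. For a single operator, the DSL equivalent is passing `Materialized.as("store-name").withCachingDisabled()` to the operator, where `store-name` is a placeholder.

```java
import java.util.Properties;

// Hypothetical helper: returns a copy of the given Kafka Streams properties with the
// record cache disabled for every operator in the topology.
final class CachingConfig {
    static Properties withCachingDisabled(Properties base) {
        Properties props = new Properties();
        props.putAll(base);
        // 0 bytes of record cache: every update is forwarded downstream instead of being
        // deduplicated in the cache. Newer releases (3.4+) deprecate this key in favour
        // of "statestore.cache.max.bytes".
        props.setProperty("cache.max.bytes.buffering", "0");
        return props;
    }
}
```

Copying instead of mutating keeps the caller's base configuration unchanged, which makes it easy to compare runs with and without caching.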
[jira] [Commented] (KAFKA-4113) Allow KTable bootstrap
[ https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1674#comment-1674 ] Edmondo Porcu commented on KAFKA-4113:

How do you disable caching?
[jira] [Commented] (KAFKA-4113) Allow KTable bootstrap
[ https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16747512#comment-16747512 ] Matthias J. Sax commented on KAFKA-4113:

Could it be related to caching? Did you disable caching? If not, try disabling it.
[jira] [Commented] (KAFKA-4113) Allow KTable bootstrap
[ https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16746672#comment-16746672 ] Edmondo Porcu commented on KAFKA-4113:

Just checked: our app is already using 2.1 as a library.
[jira] [Commented] (KAFKA-4113) Allow KTable bootstrap
[ https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16746657#comment-16746657 ] Matthias J. Sax commented on KAFKA-4113:

Updating Kafka Streams is sufficient; the fix is client-side.
[jira] [Commented] (KAFKA-4113) Allow KTable bootstrap
[ https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16746634#comment-16746634 ] Edmondo Porcu commented on KAFKA-4113:

Obviously a stupid question: is that 2.1 on the broker side or for the kafka-streams jar?
[jira] [Commented] (KAFKA-4113) Allow KTable bootstrap
[ https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16746592#comment-16746592 ] Matthias J. Sax commented on KAFKA-4113: I guess what you describe is related to timestamp synchronization. This is fixed in 2.1 via https://issues.apache.org/jira/browse/KAFKA-3514. Can you try out 2.1 to see if it fixes the issue? The problem is that timestamp synchronization is best effort before 2.1. KTable-KTable joins are "eventually consistent" for the case you describe, but there is no guarantee that you get all intermediate results on replay. In 2.1, this should work fully consistently on replay.
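A toy model of the timestamp synchronization mentioned above may help. This is not Kafka Streams code: `TimestampSync`, `Rec`, and `processingOrder` are hypothetical names. The sketch assumes every input buffer already holds records and always processes the buffered record with the smallest timestamp next. The real runtime also has to decide what to do when a partition has no buffered data yet, and this sketch ignores that case entirely.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of timestamp synchronization (hypothetical names, not the Kafka Streams API):
// among all buffered input records, always process the one with the smallest timestamp.
class TimestampSync {
    record Rec(String topic, String key, long ts) {}

    // Drains all buffers; ties go to the buffer that comes first in iteration order.
    static List<Rec> processingOrder(Map<String, Deque<Rec>> buffers) {
        List<Rec> order = new ArrayList<>();
        while (true) {
            Deque<Rec> next = null;
            for (Deque<Rec> buf : buffers.values()) {
                if (!buf.isEmpty()
                        && (next == null || buf.peekFirst().ts() < next.peekFirst().ts())) {
                    next = buf;
                }
            }
            if (next == null) {
                return order; // every buffer is empty
            }
            order.add(next.pollFirst());
        }
    }

    public static void main(String[] args) {
        Map<String, Deque<Rec>> buffers = new LinkedHashMap<>();
        buffers.put("stream", new ArrayDeque<>(List.of(new Rec("stream", "a", 5), new Rec("stream", "b", 20))));
        buffers.put("table", new ArrayDeque<>(List.of(new Rec("table", "a", 1), new Rec("table", "b", 10))));
        // The table update for "a" (ts 1) is processed before the stream record for "a" (ts 5).
        processingOrder(buffers).forEach(System.out::println);
    }
}
```

With a perfect timestamp order like this, a table row with an older timestamp is always applied before any newer stream record that looks it up.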
[jira] [Commented] (KAFKA-4113) Allow KTable bootstrap
[ https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16744779#comment-16744779 ] Edmondo Porcu commented on KAFKA-4113: Sure. More precisely, we are running a KTable-KTable Kafka Streams app that joins two compacted topics. They have the same number of partitions and the same key. When the app is running and we get new items on the topics, everything works fine. Since our brokers are still on 0.11, the app sometimes crashes with an OutOfOrderException. When it restarts, since we have no local storage, it consumes the whole changelog. When this happens, we see some join failures at startup, i.e., data that we know (and have checked) exists with the correct timestamps on both topics does not trigger a join output. We performed the following checks:
# The data is in the topics at the right time for both the left and the right side, with the right timestamp.
# A missed join can be re-triggered by making either the left side or the right side tick again.
# In the end, since one of the two KTables is the output of a KStream-KStream join in a separate app consuming from a topic produced by Kafka Connect, we end up updating the timestamp columns in the database to solve the problem.
# Note that for point 1 we have verified that the data is always available in the KTables, so the join mentioned in item 3 of this list works correctly and is executed in a separate app. The one failing is the KTable-KTable join.
Our impression is that when the OutOfOrderException occurs and the app restarts, one of the two topics is consumed faster than the other (one of the two topics is much larger in terms of data size), and therefore the inner-join lookup fails.
[jira] [Commented] (KAFKA-4113) Allow KTable bootstrap
[ https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16744273#comment-16744273 ] Matthias J. Sax commented on KAFKA-4113: KTable-KTable joins should not be affected, because both sides are stateful (in contrast to KStream-KTable joins, for which only the KTable side is stateful). Can you describe in more detail what "join failures" you observe, and what exactly you mean by this (missing expected join results?). Also, what do you mean by "we reprocess a long topic"? What does "long" mean? It sounds like your topic is configured with retention but not compaction?
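To illustrate why keeping state on both sides matters, here is a toy model that replays the same two updates in both orders. It is not Kafka Streams code: `JoinStateDemo` and `Update` are hypothetical names, and real joins also deal with deletes, caching, and repartitioning. The table-table inner join produces the same result in either order. The stream-table join drops a stream record that arrives before its table row.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model (not Kafka Streams code) contrasting a KTable-KTable inner join, where both
// sides keep state, with a KStream-KTable inner join, where only the table side keeps state.
class JoinStateDemo {
    // left == true means the left input (the "stream" side in the stream-table case).
    record Update(boolean left, String key, String value) {}

    // Both sides are materialized, so whichever side arrives second still finds its partner.
    static List<String> tableTableJoin(List<Update> updates) {
        Map<String, String> leftState = new HashMap<>();
        Map<String, String> rightState = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (Update u : updates) {
            (u.left() ? leftState : rightState).put(u.key(), u.value());
            String l = leftState.get(u.key());
            String r = rightState.get(u.key());
            if (l != null && r != null) {
                out.add(u.key() + "=" + l + "+" + r);
            }
        }
        return out;
    }

    // Only the table (right) side is materialized; a stream record is looked up once and
    // then forgotten, so a stream record arriving before its table row is never joined.
    static List<String> streamTableJoin(List<Update> updates) {
        Map<String, String> tableState = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (Update u : updates) {
            if (!u.left()) {
                tableState.put(u.key(), u.value());
            } else if (tableState.containsKey(u.key())) {
                out.add(u.key() + "=" + u.value() + "+" + tableState.get(u.key()));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Update> leftFirst = List.of(new Update(true, "k", "order"), new Update(false, "k", "customer"));
        System.out.println(tableTableJoin(leftFirst));  // [k=order+customer]
        System.out.println(streamTableJoin(leftFirst)); // []
    }
}
```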
[jira] [Commented] (KAFKA-4113) Allow KTable bootstrap
[ https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16743260#comment-16743260 ] Edmondo Porcu commented on KAFKA-4113: I think we might be experiencing this with a KTable-KTable join: we have some join failures at startup of the app. Could this be the cause?
[jira] [Commented] (KAFKA-4113) Allow KTable bootstrap
[ https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16535324#comment-16535324 ] Matthias J. Sax commented on KAFKA-4113: Thanks for the feedback [~twbecker] and [~graphex]! We prioritize features based on user feedback. In the beginning there were not many complaints about it. For GlobalKTable the behavior is different by design, because GlobalKTables are designed for "static" data. From my point of view, the design space has two dimensions: partitioned vs. broadcasted data, and timestamp-aligned vs. non-aligned. Currently, we only offer partitioned plus aligned (KTable) and broadcasted plus non-aligned (GlobalKTable). Thus, we are missing two combinations. Bootstrapping/pre-loading only makes sense for the non-aligned cases IMHO. We brainstormed about making the strategy pluggable at some point, but never pushed it forward so far. I see some additional use cases for which this might make sense. It's all about feature prioritization and how much we can get done... Of course, it's an open-source project and contributions are very welcome :) I personally believe that the timestamp-aligned semantics are correct and we should not sacrifice them. As mentioned above, I am happy to complement the design space and offer all 4 KTable variants. The non-timestamp-aligned KTable should not be too hard to implement. The broadcast plus timestamp-alignment variant is the most difficult one. The pluggable strategy might also not be too hard to implement. But all of those would require a KIP to get a sound design.
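The two-dimensional design space described above can be written down as a small lookup. In this sketch, only `KTable` and `GlobalKTable` are real Kafka Streams abstractions. The enums and `TableDesignSpace` are hypothetical names, and the mapping reflects the state of things at the time of the comment.

```java
import java.util.Optional;

// Hypothetical encoding of the design space from the comment: distribution x time semantics.
// Only KTable and GlobalKTable are real abstractions; the other two cells were missing.
class TableDesignSpace {
    enum Distribution { PARTITIONED, BROADCAST }
    enum TimeSemantics { TIMESTAMP_ALIGNED, NON_ALIGNED }

    // Returns the abstraction that existed for a combination, or empty if none did.
    static Optional<String> existingAbstraction(Distribution d, TimeSemantics t) {
        if (d == Distribution.PARTITIONED && t == TimeSemantics.TIMESTAMP_ALIGNED) {
            return Optional.of("KTable");
        }
        if (d == Distribution.BROADCAST && t == TimeSemantics.NON_ALIGNED) {
            return Optional.of("GlobalKTable");
        }
        return Optional.empty();
    }

    // Per the comment, bootstrapping/pre-loading only makes sense without timestamp alignment.
    static boolean bootstrapMakesSense(TimeSemantics t) {
        return t == TimeSemantics.NON_ALIGNED;
    }

    public static void main(String[] args) {
        for (Distribution d : Distribution.values()) {
            for (TimeSemantics t : TimeSemantics.values()) {
                System.out.println(d + " x " + t + " -> "
                        + existingAbstraction(d, t).orElse("(missing)")
                        + ", bootstrap: " + bootstrapMakesSense(t));
            }
        }
    }
}
```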
[jira] [Commented] (KAFKA-4113) Allow KTable bootstrap
[ https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16535230#comment-16535230 ] Sean McKibben commented on KAFKA-4113: Thanks for pointing out KAFKA-3514. After being frustrated with both these issues for over a year, I'm surprised I missed it. For all my use cases, it would be better to have the option for the preloading that GlobalKTables have, rather than even a working best-effort timestamp alignment. Sorry if my previous comment didn't make that very clear. This feature also seems much easier to implement, test, and document than KAFKA-3514. I've spent weeks artificially backdating my compacted KTable topics only to see no benefit. What I really want is just a flag to ensure that my KTable is kept as up to date as possible through whatever non-timestamp-related semantics are easiest to implement. That is already available for GlobalKTables, but it won't scale for my use cases.
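The "preload like a GlobalKTable" behavior requested here corresponds to the main idea in the issue description. That idea is to snapshot the table topic's end offsets at startup, load the table up to them, and only then process the stream. The sketch below models this with in-memory lists standing in for topics. `BootstrapThenJoin` is a hypothetical name, and this is not an existing Kafka Streams option.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical "bootstrap, then process" model; lists stand in for topic partitions.
class BootstrapThenJoin {
    record KV(String key, String value) {}

    static List<String> run(List<KV> usersLog, List<KV> eventsLog) {
        Map<String, String> users = new HashMap<>();
        // Phase 1: snapshot the end offset at startup and load the table up to it.
        // In a live topic, records appended after the snapshot would be interleaved with
        // stream processing rather than delaying it indefinitely.
        int endOffsetAtStartup = usersLog.size();
        for (int offset = 0; offset < endOffsetAtStartup; offset++) {
            KV rec = usersLog.get(offset);
            users.put(rec.key(), rec.value());
        }
        // Phase 2: only now process stream records, as a left join against the loaded table.
        List<String> enriched = new ArrayList<>();
        for (KV event : eventsLog) {
            enriched.add(event.value() + "+" + users.getOrDefault(event.key(), "null"));
        }
        return enriched;
    }

    public static void main(String[] args) {
        List<KV> users = List.of(new KV("u1", "alice"), new KV("u2", "bob"));
        List<KV> events = List.of(new KV("u1", "click"), new KV("u3", "view"));
        System.out.println(run(users, events)); // [click+alice, view+null]
    }
}
```

Snapshotting the end offset, rather than waiting for the topic to go quiet, gives a well-defined end to the bootstrap phase even for a table topic that keeps receiving updates.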
[jira] [Commented] (KAFKA-4113) Allow KTable bootstrap
[ https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16535220#comment-16535220 ] Tommy Becker commented on KAFKA-4113: [~mjsax] has any thought been given to making pluggable the strategy for choosing which topics to process from? I feel the current timestamp behavior is one such strategy, but for some other use cases a simple topic-level prioritization would be sufficient. For example, when the table's backing topic receives far less traffic than the stream topic, I think it could be reasonable to always prefer messages from the table topic over the stream topic. Such a scheme could work for a lot of cases and is quite a bit easier to reason about and implement.
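A pluggable "which input next?" strategy, as suggested above, could look roughly like this. `NextInputStrategy`, `TIMESTAMP`, and `prefer` are hypothetical names, not part of Kafka Streams. The sketch drains the same buffers twice to show how a topic-priority strategy differs from the timestamp strategy.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical pluggable input-selection strategy; not an existing Kafka Streams interface.
class PluggableChooser {
    record Rec(String topic, long ts) {}

    interface NextInputStrategy {
        // Picks the topic to read next; only topics with buffered records are passed in.
        String choose(Map<String, Deque<Rec>> nonEmptyBuffers);
    }

    // Timestamp strategy: the buffer whose head record has the lowest timestamp wins.
    static final NextInputStrategy TIMESTAMP = buffers -> buffers.entrySet().stream()
            .min((a, b) -> Long.compare(a.getValue().peekFirst().ts(), b.getValue().peekFirst().ts()))
            .orElseThrow()
            .getKey();

    // Priority strategy: always read the preferred topic (e.g. the table topic) while it has data.
    static NextInputStrategy prefer(String topic) {
        return buffers -> buffers.containsKey(topic) ? topic : TIMESTAMP.choose(buffers);
    }

    // Repeatedly asks the strategy for a topic and takes one record from it.
    static List<Rec> drain(Map<String, Deque<Rec>> buffers, NextInputStrategy strategy) {
        List<Rec> order = new ArrayList<>();
        while (true) {
            Map<String, Deque<Rec>> nonEmpty = new LinkedHashMap<>();
            buffers.forEach((topic, queue) -> {
                if (!queue.isEmpty()) {
                    nonEmpty.put(topic, queue);
                }
            });
            if (nonEmpty.isEmpty()) {
                return order;
            }
            order.add(buffers.get(strategy.choose(nonEmpty)).pollFirst());
        }
    }

    static Map<String, Deque<Rec>> sample() {
        Map<String, Deque<Rec>> buffers = new LinkedHashMap<>();
        buffers.put("events", new ArrayDeque<>(List.of(new Rec("events", 1), new Rec("events", 2))));
        buffers.put("users", new ArrayDeque<>(List.of(new Rec("users", 10))));
        return buffers;
    }

    public static void main(String[] args) {
        System.out.println(drain(sample(), TIMESTAMP));        // events first, users last
        System.out.println(drain(sample(), prefer("users")));  // users first
    }
}
```

The priority strategy falls back to timestamp order once the preferred topic has no buffered records. That fallback is one possible design choice, not the only one.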
[jira] [Commented] (KAFKA-4113) Allow KTable bootstrap
[ https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16535133#comment-16535133 ] Matthias J. Sax commented on KAFKA-4113: [~graphex] What you report is a bug and is tracked via KAFKA-3514. It's at the top of the list of things we want to fix! I completely agree that the bug tracked in KAFKA-3514 breaks the expected behavior that a KTable with older record timestamps should be loaded before processing of stream records starts. The difference is that it's a bug, while the request discussed here is a semantic change request. Thus, I think it's best to discuss both separately.
[jira] [Commented] (KAFKA-4113) Allow KTable bootstrap
[ https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16535084#comment-16535084 ] Sean McKibben commented on KAFKA-4113: I think the sad reality of Kafka Streams' behavior right now is even worse than what is portrayed in this ticket. Let's say I have a Kafka cluster with 2 topics, "events" and "users", and I want to produce a joined "enriched_events" topic using a Kafka Streams KStream-KTable join. Let's also say that my Kafka cluster currently has 1 billion user records in the "users" topic and 100 million event records in the "events" topic, with about 100k new events coming in per second. We can further say that the timestamp of every message in the "users" topic is earlier than any of the timestamps in the "events" topic (and of course both topics are keyed by user id and partitioned the same way). What I would expect from my new Kafka Streams app is that I could make a KTable out of "users" and do a leftJoin with events. After waiting for the users KTable to populate the RocksDB store on each instance of my app, I would see events+users messages flow right into "enriched_events". Unfortunately, what actually happens is that the app quickly processes the 100 million events while slowly populating the RocksDB instance. As a result, "enriched_events" receives almost no enrichment from corresponding values in my KTable, regardless of any timestamp management. Only after the app has burned through the 100 million event backlog that existed when it started, and then kept processing 100k events per second for a really long time, will the local RocksDB store be even mostly populated. Only then do we see a reasonable number of successful joins with users flowing into "enriched_events".
The only time I've seen behavior remotely similar to what is described here as best effort is when I restart a Kafka Streams app with the same application ID after it has been running in steady state for a long time. In that case, though it is difficult to actually see what is going on, there appears to be some degree of KTable preloading. I have found only one workable way to keep "enriched_events" from filling up with un-enriched events. I make a new topic, "events_controlled", to use as my Kafka Streams KStream source, and keep it completely empty. Then I start up my app and manually watch the lag of my app reading the "users" topic until it gets all the way to 0. Only then do I start a separate application to copy messages from "events" to "events_controlled". This is a pretty high-touch solution and is far from ideal in any scenario. I really don't think the data model of a compacted topic is sufficient to reasonably even attempt to provide "join-at-the-time" semantics as described in this ticket. If you need historical joins, use a database with a history for each record. If the KTable you're joining to is backed by a compacted Kafka topic, your source of truth is, by definition, only intended to contain the latest value for each key in your dataset. This is very much at odds with the "best effort" timestamp alignment strategy, which is your only option for KStream-KTable match semantics. That strategy doesn't even appear to make any effort during the first run to preload anything into the KTable.
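The manual gating step in this workaround boils down to a lag check on the "users" partitions. The sketch below assumes the log end offsets and the app's committed offsets are already available as plain maps, for example as reported by `kafka-consumer-groups.sh --describe` for the app's group. `LagGate` and its methods are hypothetical names.

```java
import java.util.Map;

// Hypothetical helper for the manual "wait until lag is 0" step. Offsets are passed in
// as plain maps (partition number -> offset) rather than fetched from a cluster.
class LagGate {
    // Lag of one partition = log end offset - committed offset. A partition with no commit
    // is treated as starting at offset 0 (a simplification: retention can move the log start).
    static long totalLag(Map<Integer, Long> endOffsets, Map<Integer, Long> committedOffsets) {
        long lag = 0;
        for (Map.Entry<Integer, Long> e : endOffsets.entrySet()) {
            long committed = committedOffsets.getOrDefault(e.getKey(), 0L);
            lag += Math.max(0L, e.getValue() - committed);
        }
        return lag;
    }

    // Gate for starting the copier from "events" to "events_controlled".
    static boolean readyToStartCopier(Map<Integer, Long> endOffsets, Map<Integer, Long> committedOffsets) {
        return totalLag(endOffsets, committedOffsets) == 0;
    }

    public static void main(String[] args) {
        Map<Integer, Long> end = Map.of(0, 100L, 1, 50L);
        System.out.println(totalLag(end, Map.of(0, 100L, 1, 40L)));           // 10
        System.out.println(readyToStartCopier(end, Map.of(0, 100L, 1, 50L))); // true
    }
}
```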
[jira] [Commented] (KAFKA-4113) Allow KTable bootstrap
[ https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16534356#comment-16534356 ] Matthias J. Sax commented on KAFKA-4113: I see your point. I still think that the timestamp-based semantics are superior, and I am personally in favor of keeping them. However, I agree that a temporally decoupled table also has a broad spectrum of use cases and we should allow it. It should not replace the current KTable, but complement it IMHO. The behavior and semantics would be similar to GlobalKTables. About "table backing topic is almost certainly log-compacted which means you can't achieve these semantics regardless as these older values are now gone": if you want to do reprocessing, I agree. However, time synchronization is not just important for reprocessing; it provides sound semantics in general. Without it, the computation is inherently non-deterministic (which I believe is not what most people want). To fix the reprocessing case, we would need to "protect" the head of the log from compaction, i.e., the retention time of the input stream and the non-compacted head of the log must be equally large. There is a config `min.compaction.lag.ms`, but I am actually not 100% sure if it can be used for this purpose. Would need to double check. Maybe [~guozhang] knows? Semantically, it is sound that you cannot do reprocessing if you lost old table state. Note that reprocessing should compute the same result as the original run (if you don't change the program). If log compaction deletes old data, you obviously cannot reprocess it. Using the latest KTable data would result in joining old stream records with "future" table data (future relative to the stream records, of course). That would produce a different result and would be incorrect, too.
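The reprocessing argument can be made concrete with a toy versioned table for a single key. An "as of" lookup returns the value that was current at the stream record's timestamp. A "latest" lookup may return a value from the record's future. After compaction, the old version is gone entirely. `AsOfVsLatest` is a hypothetical class, not a Kafka API. The email values echo the Orders/Customers example raised in this thread.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical versioned table for a single key: every version is kept by timestamp,
// so a lookup can be done "as of" a stream record's timestamp or against the latest value.
class AsOfVsLatest {
    private final TreeMap<Long, String> versions = new TreeMap<>();

    void put(long ts, String value) {
        versions.put(ts, value);
    }

    // Timestamp-aligned semantics: the value that was current when the stream record happened.
    String asOf(long ts) {
        Map.Entry<Long, String> e = versions.floorEntry(ts);
        return e == null ? null : e.getValue();
    }

    // "Latest" semantics: whatever the table holds now, possibly from the record's future.
    String latest() {
        return versions.isEmpty() ? null : versions.lastEntry().getValue();
    }

    // Simplified log compaction: keep only the newest version.
    void compact() {
        if (versions.size() > 1) {
            Map.Entry<Long, String> last = versions.lastEntry();
            versions.clear();
            versions.put(last.getKey(), last.getValue());
        }
    }

    public static void main(String[] args) {
        AsOfVsLatest customer = new AsOfVsLatest();
        customer.put(10, "old@example.com");
        customer.put(50, "new@example.com");
        // Reprocessing an order with timestamp 20: the two semantics disagree.
        System.out.println(customer.asOf(20) + " vs " + customer.latest()); // old@example.com vs new@example.com
        customer.compact();
        // After compaction, the version that was current at ts 20 no longer exists.
        System.out.println(customer.asOf(20)); // null
    }
}
```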
[jira] [Commented] (KAFKA-4113) Allow KTable bootstrap
[ https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16534259#comment-16534259 ] Tommy Becker commented on KAFKA-4113: Coming from Samza, I find it very surprising that there is no way to do this. Say I have 2 topics with existing data, 1 stream and 1 table, and I write a Kafka Streams application to join them. It seems very likely that initial records in the stream (possibly quite many) will not be joined properly, because the corresponding message in the table's backing topic has not yet been read. The timestamp semantics make sense in that I suppose some use cases consider the value that was current in the table at the time of an incoming message "better" than the latest value (though I suspect they are a minority). But in reality, the table's backing topic is almost certainly log-compacted. That means you can't achieve these semantics regardless, because the older values are gone. Worse, the new values have newer timestamps, which perpetuates the problem we're talking about.
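The "newer timestamps perpetuate the problem" point can be checked with a small simulation. First, compact a table log down to the latest record per key. Then count the stream records whose timestamp precedes the table row for their key; under strict timestamp ordering, these find nothing to join with. `CompactionTimestamps` is a hypothetical name, and the compaction is a simplified model of Kafka log compaction.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation: how compaction shifts table timestamps past older stream records.
class CompactionTimestamps {
    record Rec(String key, String value, long ts) {}

    // Simplified log compaction: keep only the last record per key, in latest-offset order.
    static List<Rec> compact(List<Rec> log) {
        Map<String, Rec> latest = new LinkedHashMap<>();
        for (Rec r : log) {
            latest.remove(r.key()); // re-insert so iteration order follows the latest offset
            latest.put(r.key(), r);
        }
        return new ArrayList<>(latest.values());
    }

    // Stream records that strict timestamp ordering would process before any table row for
    // their key exists (equal timestamps are treated as joined; keys absent from the table count).
    static long unmatchedStreamRecords(List<Rec> table, List<Rec> stream) {
        Map<String, Long> firstTableTs = new LinkedHashMap<>();
        for (Rec r : table) {
            firstTableTs.merge(r.key(), r.ts(), Math::min);
        }
        return stream.stream()
                .filter(s -> s.ts() < firstTableTs.getOrDefault(s.key(), Long.MAX_VALUE))
                .count();
    }

    public static void main(String[] args) {
        List<Rec> usersLog = List.of(new Rec("u1", "v1", 1), new Rec("u1", "v2", 100), new Rec("u2", "w", 2));
        List<Rec> events = List.of(new Rec("u1", "click", 10), new Rec("u2", "view", 20));
        System.out.println(unmatchedStreamRecords(usersLog, events));          // 0
        System.out.println(unmatchedStreamRecords(compact(usersLog), events)); // 1
    }
}
```

Before compaction, both events find an earlier table row. After compaction, the only surviving row for "u1" has timestamp 100, so the event at timestamp 10 is processed first and misses it.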
[jira] [Commented] (KAFKA-4113) Allow KTable bootstrap
[ https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512051#comment-16512051 ] Don commented on KAFKA-4113:
Hi Ben, you had a very interesting link in the original comment. Coincidentally, we were looking into how to bootstrap a KTable/GlobalKTable when you posted it. It's returning a 404 now :/ Was there any technical reason for removing it?
> Allow KTable bootstrap
> --
>
> Key: KAFKA-4113
> URL: https://issues.apache.org/jira/browse/KAFKA-4113
> Project: Kafka
> Issue Type: New Feature
> Components: streams
> Reporter: Matthias J. Sax
> Assignee: Guozhang Wang
> Priority: Major
>
> On the mailing list, there are multiple requests about the possibility to
> "fully populate" a KTable before actual stream processing starts.
> Even if it is somewhat difficult to define when the initial populating phase
> should end, there are multiple possibilities:
> The main idea is that there is a rarely updated topic that contains the
> data. Only after this topic has been read completely and the KTable is ready
> should the application start processing. This implies that on startup,
> the current partition sizes must be fetched and stored, and after the KTable
> has been populated up to those offsets, stream processing can start.
> Other discussed ideas are:
> 1) an initial fixed time period for populating
> (it might be hard for a user to estimate the correct value)
> 2) an "idle" period, i.e., if a KTable receives no update for a certain time,
> we consider it populated
> 3) a timestamp cut-off point, i.e., all records with an older timestamp
> belong to the initial populating phase
> The API change is not decided yet, and the API design is part of this JIRA.
> One suggestion (for option (4)) was:
> {noformat}
> KTable table = builder.table("topic", 1000); // populate the table without reading any other topics until a record with timestamp 1000 is seen
> {noformat}
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
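The "main idea" in the quoted issue description (fetch the end offsets of the table topic at startup, populate the table up to those offsets, and only then start stream processing) can be illustrated with a small, self-contained Java model. This is a hypothetical sketch, not the Kafka Streams API and not how Kafka Streams restores state: the single-partition "topic" is a plain `List`, an offset is a list index, and the class and method names are invented for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of "populate the table up to the end offset captured at
// startup, then start stream processing". Not the Kafka Streams API.
final class OffsetBootstrapSketch {

    /** One record of a single-partition, compacted "table topic"; a null value is a tombstone. */
    static final class TableRecord {
        final String key;
        final String value;

        TableRecord(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Phase 1: apply table-topic records at offsets [0, endOffsetAtStartup) to an empty table. */
    static Map<String, String> bootstrap(List<TableRecord> tableTopic, int endOffsetAtStartup) {
        Map<String, String> table = new HashMap<>();
        for (int offset = 0; offset < endOffsetAtStartup; offset++) {
            TableRecord r = tableTopic.get(offset);
            if (r.value == null) {
                table.remove(r.key);        // tombstone: delete the key
            } else {
                table.put(r.key, r.value);  // upsert: the latest value for a key wins
            }
        }
        return table;
    }

    /** Phase 2: only after bootstrapping, look up each stream record's key in the table. */
    static List<String> joinStream(List<String> streamKeys, Map<String, String> table) {
        List<String> joined = new ArrayList<>();
        for (String key : streamKeys) {
            String value = table.get(key);
            joined.add(key + "=" + (value == null ? "<no match>" : value));
        }
        return joined;
    }
}
```

Records appended after the end offset was captured are not waited for, so a continuously updated table topic cannot delay the start indefinitely. In a real consumer-based implementation the end offsets would presumably come from `KafkaConsumer#endOffsets`, with one offset per partition rather than a single index.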
[jira] [Commented] (KAFKA-4113) Allow KTable bootstrap
[ https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16510911#comment-16510911 ] Ben Stopford commented on KAFKA-4113:
Whilst I like the 'time-aligned' approach to loading KTables very much, it definitely catches people out. I think this is compounded by the fact that GlobalKTables (GKTables) don't behave like this: they bootstrap themselves on startup rather than being time-aligned. Different use cases actually suit one or the other better (as noted above). For example, if you're joining Orders to Customers and reprocessing, you might want the 'as at' version of the customer (say, with an old email address) or the latest version of the customer (with their most recent email). So I think Kafka Streams should ideally support both (a) preloaded and (b) event-time-aligned loading in both types of table, letting the user define the behaviour. I've tried to explain the background to this in a bit more detail [here|http://www.benstopford.com/2018/06/13/things-can-trip-building-streams-apps/].
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
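Ben's Orders/Customers example can be made concrete with a toy Java model contrasting (a) a preloaded table with (b) event-time alignment. It is a sketch under simplifying assumptions, not Kafka Streams' actual record-selection logic: both inputs are in-memory lists, "time-aligned" is modelled as a merge of both inputs ordered by timestamp (with table updates applied first on equal timestamps, an arbitrary choice here), and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model of joining an Orders stream with a Customers table.
// It does not use or reproduce Kafka Streams' internal record-selection logic.
final class JoinSemanticsSketch {

    /** A timestamped record keyed by customer id: either a customer update or an order. */
    static final class Event {
        final long timestamp;
        final String customerId;
        final String payload;         // email for a customer update, order id for an order
        final boolean isTableUpdate;

        Event(long timestamp, String customerId, String payload, boolean isTableUpdate) {
            this.timestamp = timestamp;
            this.customerId = customerId;
            this.payload = payload;
            this.isTableUpdate = isTableUpdate;
        }
    }

    /** (b) Event-time aligned: interleave both inputs by timestamp, so each order is joined
     *  with the customer version that was current "as at" the order's timestamp. */
    static List<String> timeAligned(List<Event> customers, List<Event> orders) {
        List<Event> merged = new ArrayList<>(customers);
        merged.addAll(orders);
        // Stable sort by timestamp; on equal timestamps, table updates go first (arbitrary choice).
        merged.sort(Comparator.comparingLong((Event e) -> e.timestamp)
                .thenComparingInt(e -> e.isTableUpdate ? 0 : 1));
        Map<String, String> table = new HashMap<>();
        List<String> joined = new ArrayList<>();
        for (Event e : merged) {
            if (e.isTableUpdate) {
                table.put(e.customerId, e.payload);
            } else {
                joined.add(e.payload + "->" + table.getOrDefault(e.customerId, "<no match>"));
            }
        }
        return joined;
    }

    /** (a) Preloaded: apply all customer updates (in the given log order) first, then
     *  join every order against the latest version of each customer. */
    static List<String> preloaded(List<Event> customers, List<Event> orders) {
        Map<String, String> table = new HashMap<>();
        for (Event c : customers) {
            table.put(c.customerId, c.payload);
        }
        List<String> joined = new ArrayList<>();
        for (Event o : orders) {
            joined.add(o.payload + "->" + table.getOrDefault(o.customerId, "<no match>"));
        }
        return joined;
    }
}
```

If a customer's email changes at timestamp 10, an order at timestamp 5 joins with the old email under (b) but with the new email under (a), while an order after timestamp 10 sees the new email either way. This mirrors the 'as at' versus 'latest' distinction in the comment above.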
[jira] [Commented] (KAFKA-4113) Allow KTable bootstrap
[ https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16160518#comment-16160518 ] Elias Levy commented on KAFKA-4113:
OK in the sense that it is not a fatal failure, but not in the sense that it is the desired behavior. I concede that it may not be the desirable behavior in all use cases.
-- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-4113) Allow KTable bootstrap
[ https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16160493#comment-16160493 ] Jay Kreps commented on KAFKA-4113:
But I suppose you must be okay with that not happening too, since in steady state you'll be joining data in the table with data in the stream at about the same timestamp. I think my argument is that if you are okay with this behavior in normal operation, you should also be okay with it during bootstrapping (and in many cases, joining on future data and producing results that wouldn't occur in steady-state operation is not desirable).
-- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-4113) Allow KTable bootstrap
[ https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16160435#comment-16160435 ] Elias Levy commented on KAFKA-4113:
In response to [~jkreps]: in our use case, it would be best for the join to be performed against the latest {{KTable}} data. So yes, we do prefer the {{KTable}} to be fully loaded before the join occurs, even if its timestamps are far ahead of the {{KStream}}.
-- This message was sent by Atlassian JIRA (v6.4.14#64029)