[ https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15587966#comment-15587966 ]
Greg Fodor commented on KAFKA-4113:
-----------------------------------

Having played around with Kafka Streams for a while now, I am still confused (and we still get burned) by this. Let me walk through a case and see if you can spot where I am misunderstanding.

Say we have a topic that is a user table changelog, with user id keys and user records, and a clickstream topic that just maps user ids to URLs. For the sake of this example, let's assume our Kafka Streams job has been running from t = 0, when both topics were empty, so there is no bootstrapping problem.

In the Kafka Streams DSL, I would tap the user table topic via `KStreamBuilder#table`. As best I can tell, this creates a KTable with:
- an unlogged RocksDB state store (which is going to land on disk)
- a new source that is the table topic

After this, I tap the clickstream as a KStream and inner join it against the KTable on user id, and just for this example let's assume I sink it all out to a new topic too (see the sketch below).

As my node is humming along, it is writing the user id -> user record k/vs to the local RocksDB, but it is *not* logging the changes to a topic, because the store is not marked as logged. When it reads a record from the KStream, the join is performed by looking up the key in the state store. As mentioned, my understanding is that the join against the stream will wait until the records for the KTable with earlier timestamps have been consumed. This makes sense.

If I terminate and restart the Java process, the Kafka consumer for the KTable will pick up at the last committed offset for the user table topic. It may re-consume a few seconds' worth of records and re-write a few keys in the RocksDB store, but after that it still has the full historical state of the topic, so joins against any user id continue to work.

Where things completely stop making sense for me is if I lose the node. If I lose the node, I lose my RocksDB, which is not logged and therefore not backed by a changelog topic. When I bring up a new node, my understanding is that the consumer will *not* start at the beginning of the topic used for the KTable; it will just pick up at the last committed offset. So what I end up with is a RocksDB that contains only the last couple of records from the user table topic. This is obviously really broken, because now my joins will start failing. (And it seems I was lulled into complacency here, since I was robust across JVM restarts but not across node failures.)

I believe this problem also happens in a more nefarious way upon rebalances: if a partition of the KTable gets reassigned, the new node will likewise have a partially complete RocksDB store for that partition, since it will just consume from the last committed offset. Similarly, and even scarier, if the partition gets assigned back to the original node, that node now has a RocksDB store with a small gap covering the key changes that happened during the period when the partition was assigned to another node. I am not sure if I am missing something here, but this has been the behavior we have seen.
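For concreteness, here is a minimal sketch of the topology described above, written against the 0.10.1-era DSL (the exact overloads shifted slightly between releases); the topic names, store name, and the joined value format are hypothetical placeholders, not our actual job:

{noformat}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

KStreamBuilder builder = new KStreamBuilder();

// Tap the user table changelog: user id -> serialized user record.
// This is backed by a local RocksDB store that, per the behavior
// described above, is not logged to a changelog topic.
KTable<String, String> users =
    builder.table(Serdes.String(), Serdes.String(),
                  "user-table-topic", "users-store");

// Tap the clickstream: user id -> url.
KStream<String, String> clicks =
    builder.stream(Serdes.String(), Serdes.String(), "clickstream-topic");

// Inner join on user id: each click's url is combined with whatever
// user record is currently in the KTable's state store, then sunk out.
clicks.join(users, (url, user) -> user + "|" + url)
      .to(Serdes.String(), Serdes.String(), "enriched-clicks-topic");
{noformat}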
The workarounds we have done for this problem are:
- write a routine that lets us reset the KTable topic's consumer offsets to zero (this still doesn't help with a rebalance)
- perform a "touch" on the database records we are flushing to Kafka, so that new copies of all of the records are appended to the topic via Kafka Connect and forced into the RocksDB stores (this works well, but is obviously terrible)
- put a dummy aggregation/reduce after the tap of the KTable topic, which forces things into a logged state store that will be fully materialized on startup if it is missing (see the sketch at the end of this message)

Thoughts?

> Allow KTable bootstrap
> ----------------------
>
>          Key: KAFKA-4113
>          URL: https://issues.apache.org/jira/browse/KAFKA-4113
>      Project: Kafka
>   Issue Type: Sub-task
>   Components: streams
>     Reporter: Matthias J. Sax
>     Assignee: Guozhang Wang
>
> On the mailing list, there are multiple requests about the possibility to
> "fully populate" a KTable before actual stream processing starts.
> Even if it is somewhat difficult to define when the initial populating phase
> should end, there are multiple possibilities.
> The main idea is that there is a rarely updated topic that contains the
> data. Only after this topic has been read completely and the KTable is ready
> should the application start processing. This implies that on startup, the
> current partition end offsets must be fetched and stored, and once the KTable
> has been populated up to those offsets, stream processing can start.
> Other discussed ideas are:
> 1) an initial fixed time period for populating
>    (it might be hard for a user to estimate the correct value)
> 2) an "idle" period, i.e., if no update to a KTable is seen for a certain
>    time, we consider it populated
> 3) a timestamp cut-off point, i.e., all records with an older timestamp
>    belong to the initial populating phase
> The API change is not decided yet, and the API design is part of this JIRA.
> One suggestion (for option (4)) was:
> {noformat}
> KTable table = builder.table("topic", 1000); // populate the table without
> reading any other topics until we see one record with timestamp 1000.
> {noformat}
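For reference, here is a minimal sketch of the third workaround, using the same hypothetical topic/store names as the sketch above: a dummy reduce that keeps the latest value per key, which forces the table contents into a logged (changelog-backed) store that a fresh node rebuilds from scratch before processing resumes:

{noformat}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

KStreamBuilder builder = new KStreamBuilder();

// Workaround (3): instead of builder.table(...), consume the table topic
// as a stream and reduce it into a store. The reduce's store is logged,
// so a node that lost its local RocksDB restores it from the changelog
// topic instead of silently starting from the last committed offset.
KTable<String, String> users = builder
    .stream(Serdes.String(), Serdes.String(), "user-table-topic")
    .groupByKey(Serdes.String(), Serdes.String())
    // Always take the newest record per key, mimicking KTable semantics.
    .reduce((oldValue, newValue) -> newValue, "users-logged-store");
{noformat}

Note that this sketch ignores tombstones (records with null values are dropped before the reduce), so deletes from the source table would need separate handling.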