[ 
https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15587966#comment-15587966
 ] 

Greg Fodor commented on KAFKA-4113:
-----------------------------------

Having played around with Kafka Streams for a while now, I am still confused 
(and we still get burned) by this. Let me walk through a case and see if you 
can spot where I am misunderstanding.

Say we have a topic that is a user-table changelog, keyed by user id with user 
records as values. And we have a clickstream topic that is just user id to URL. 
For the sake of this example, let's assume our Kafka Streams job has been 
running from t = 0, when both topics were empty, so there's no bootstrapping 
problem.

In the Kafka Streams DSL, I would tap the user table topic via 
`KStreamBuilder#table`. As best I can tell, this creates a KTable with:
- An unlogged RocksDB state store (which is going to land on disk)
- A new source that is the table topic

After this, I'm going to tap the clickstream as a KStream and inner-join it on 
user id, and for this example let's assume I sink the result out to a new 
topic as well.

As my node is humming along, it is writing the user id -> user record k/vs to 
the local RocksDB but is *not* writing those changes to a changelog topic, 
because the store is not marked as logged. When it reads a record from the 
KStream, the join is performed by looking up the key in the state store. As 
mentioned, my understanding is that the join against the stream will wait until 
the records for the KTable with earlier timestamps have been consumed. This 
makes sense.
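A minimal, self-contained sketch of the join semantics described above (plain Java, with a HashMap standing in for the RocksDB store; the class and method names are hypothetical, not the Streams API):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch: the KTable side materializes user records into a local
// key/value store; the KStream side joins each click against that store.
class StreamTableJoinSketch {
    // Stands in for the unlogged RocksDB state store.
    static final Map<String, String> userStore = new HashMap<>();

    // KTable side: each user-changelog record upserts into the local store.
    static void consumeTableRecord(String userId, String userRecord) {
        userStore.put(userId, userRecord);
    }

    // KStream side: inner join -- a click whose key is absent is dropped.
    static Optional<String> joinClick(String userId, String url) {
        String user = userStore.get(userId);
        return user == null
                ? Optional.empty()                       // join fails: key absent
                : Optional.of(user + " clicked " + url); // joined output record
    }

    public static void main(String[] args) {
        consumeTableRecord("u1", "alice");
        System.out.println(joinClick("u1", "/home")); // Optional[alice clicked /home]
        System.out.println(joinClick("u2", "/home")); // Optional.empty -- no user yet
    }
}
```

The point is only that every join lookup goes through the local store, so the join is exactly as complete as that store's contents.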

If I terminate and restart the java process, the Kafka consumer for the KTable 
will pick up at the last committed offset for the user-table topic. It may 
re-consume a few seconds' worth of records and re-write a few keys in the 
RocksDB store, but after that it still has the full historical state of the 
topic. So joins against any user id will continue to work.
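A sketch of why the plain-restart case stays correct (plain Java; the map stands in for the on-disk RocksDB files, which survive the restart):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch: JVM restart -- the RocksDB files survive, so re-consuming from the
// committed offset merely re-applies a recent suffix; the store stays complete.
class JvmRestartSketch {
    // The user-table topic: (userId, record) pairs, full history retained.
    static final List<String[]> topic = List.of(
            new String[]{"u1", "alice"},
            new String[]{"u2", "bob"},
            new String[]{"u3", "carol"});

    // Re-apply topic records from the given offset onto an existing store.
    static Map<String, String> reconsume(Map<String, String> store, int offset) {
        for (int i = offset; i < topic.size(); i++)
            store.put(topic.get(i)[0], topic.get(i)[1]);
        return store;
    }

    public static void main(String[] args) {
        // Before the restart: the store has consumed the whole topic.
        Map<String, String> persisted = reconsume(new HashMap<>(), 0);

        // Restart: same store on disk; only the suffix past the committed
        // offset is replayed, harmlessly overwriting a few keys.
        reconsume(persisted, 2);
        System.out.println(persisted.size()); // 3 -- still the full table
    }
}
```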

Where things completely stop making sense for me is if I lose the node. If I 
lose the node, I lose my RocksDB, which is not logged and so is not backed by a 
changelog topic. When I bring up a new node, my understanding is that the 
consumer will *not* start at the beginning of the topic used for the KTable; it 
will just pick up at the last committed offset. So what I end up with is a 
RocksDB that only contains the last couple of records from the user-table 
topic. This is obviously really broken, because now my joins will start 
failing. (And it seems I was lulled into complacency here, since I was robust 
across JVM restarts but not across node failures.) I believe this problem also 
happens in a more nefarious way upon rebalances: if a partition of the KTable 
gets reassigned, the new node will also have a partially complete RocksDB store 
for that partition, since it will just consume from the last committed offset. 
Similarly, and even scarier, if the partition gets assigned back to the 
original node, that node now has a RocksDB store with a small gap, covering the 
key changes that happened while it was assigned to another node.
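The node-loss failure mode can be simulated in a few lines (plain Java; `committedOffset` and the in-memory maps are stand-ins for the consumer group offset and the lost RocksDB, so this is only an illustration of the scenario described above):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the node-loss scenario: the table topic retains full history,
// but a fresh node resumes from the committed offset, not from offset 0.
class NodeLossSketch {
    // The user-table topic: (userId, record) pairs, full history retained.
    static final List<String[]> topic = List.of(
            new String[]{"u1", "alice"},
            new String[]{"u2", "bob"},
            new String[]{"u3", "carol"});

    // Rebuild a local store by consuming the topic from a given offset.
    static Map<String, String> restoreFrom(int offset) {
        Map<String, String> store = new HashMap<>();
        for (int i = offset; i < topic.size(); i++)
            store.put(topic.get(i)[0], topic.get(i)[1]);
        return store;
    }

    public static void main(String[] args) {
        int committedOffset = 2; // the group had already committed past u1, u2

        // Fresh node, empty disk: resuming at the committed offset loses u1/u2.
        Map<String, String> afterNodeLoss = restoreFrom(committedOffset);
        System.out.println(afterNodeLoss.containsKey("u1")); // false -- joins on u1 fail

        // Only a replay from offset 0 recovers the full table.
        Map<String, String> fullRestore = restoreFrom(0);
        System.out.println(fullRestore.containsKey("u1")); // true
    }
}
```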

I am not sure if I am missing something here, but this has been the behavior we 
have seen. The workarounds we have used for this problem are:
- write a routine that lets us reset the KTable topic's consumer offsets to 
zero (still doesn't help with a rebalance)
- perform a "touch" on the database records we are flushing to Kafka, so new 
copies of all the records are appended to the topic via Kafka Connect and 
forced into the RocksDB stores (this works well, but is obviously terrible)
- put a dummy aggregation/reduce after the tap of the KTable topic, which 
forces things into a logged state store that will be fully materialized on 
startup if it is missing
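The third workaround helps because a logged store mirrors every update into its own changelog topic, which is replayed from the beginning whenever the local store is missing. A stdlib-only sketch of that recovery path (hypothetical names, not the Streams internals):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch: a "logged" store writes every put into a changelog; a fresh node
// rebuilds the store by replaying the entire changelog, independent of the
// source topic's committed consumer offset.
class LoggedStoreSketch {
    final List<String[]> changelog = new ArrayList<>(); // stands in for the changelog topic
    Map<String, String> store = new HashMap<>();        // stands in for RocksDB

    void put(String key, String value) {
        store.put(key, value);
        changelog.add(new String[]{key, value}); // every change is logged
    }

    // Node loss: local disk is gone, but the changelog topic survives in Kafka.
    void loseLocalState() {
        store = new HashMap<>();
    }

    // Startup with no local store: replay the changelog from offset 0.
    void restoreFromChangelog() {
        for (String[] kv : changelog) store.put(kv[0], kv[1]);
    }

    public static void main(String[] args) {
        LoggedStoreSketch s = new LoggedStoreSketch();
        s.put("u1", "alice");
        s.put("u2", "bob");
        s.loseLocalState();
        s.restoreFromChangelog();
        System.out.println(s.store.containsKey("u1")); // true -- full table recovered
    }
}
```

This is also why the dummy aggregation works at all: it is the logged store, not the source-topic offset, that determines what survives a node loss.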

Thoughts?

> Allow KTable bootstrap
> ----------------------
>
>                 Key: KAFKA-4113
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4113
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: streams
>            Reporter: Matthias J. Sax
>            Assignee: Guozhang Wang
>
> On the mailing list, there have been multiple requests for the ability to 
> "fully populate" a KTable before actual stream processing starts.
> Even if it is somewhat difficult to define when the initial populating phase 
> should end, there are multiple possibilities:
> The main idea is that there is a rarely updated topic that contains the 
> data. Only after this topic has been read completely and the KTable is ready 
> should the application start processing. This implies that on startup, the 
> current partition sizes must be fetched and stored, and after the KTable has 
> been populated up to those offsets, stream processing can start.
> Other discussed ideas are:
> 1) an initial fixed time period for populating
> (it might be hard for a user to estimate the correct value)
> 2) an "idle" period, i.e., if no update to a KTable happens for a certain
> time, we consider it populated
> 3) a timestamp cut-off point, i.e., all records with an older timestamp
> belong to the initial populating phase
> The API change is not decided yet, and the API design is part of this JIRA.
> One suggestion (for option (4)) was:
> {noformat}
> KTable table = builder.table("topic", 1000); // populate the table without 
> reading any other topics until see one record with timestamp 1000.
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)