[ https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15795304#comment-15795304 ]
Mitch Seymour commented on KAFKA-4113:
--------------------------------------

I believe I am experiencing the same issue that [~gfodor] describes above (note: I'm using the 0.10.0 client libraries since we haven't upgraded our brokers yet). If I read a compacted source topic as a changelog stream using KStreamBuilder#table, with an application id that doesn't have any consumer metadata yet, the state store does not get initialized as I would expect. This results in failed joins in our application (as [~mjsax] also noted above).

I have been able to work around this issue by creating a dummy consumer before starting the stream and checking for null metadata using a ConsumerRebalanceListener. If the metadata is null, I do a single poll on the source topic and then exit. I added an example of this approach to a public gist: https://gist.github.com/mitch-seymour/3427ecd0a577cf26b67021e964d6be6c

I am not yet sure whether that approach is safe (please feel free to share any feedback or concerns). It seems to work, but occasionally I get the following error on startup:

{noformat}
Exception in thread "main" org.apache.kafka.common.errors.InconsistentGroupProtocolException: The group member's supported protocols are incompatible with those of existing members.
{noformat}

The error goes away after a restart, so I'm not sure what causes it. Anyway, I am happy to help with this issue if needed.

> Allow KTable bootstrap
> ----------------------
>
>                 Key: KAFKA-4113
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4113
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: streams
>            Reporter: Matthias J. Sax
>            Assignee: Guozhang Wang
>
> On the mailing list, there are multiple requests about the possibility to
> "fully populate" a KTable before actual stream processing starts.
> Even if it is somewhat difficult to define when the initial populating phase
> should end, there are multiple possibilities:
> The main idea is that there is a rarely updated topic that contains the
> data. Only after this topic has been read completely and the KTable is ready
> should the application start processing. This implies that on startup, the
> current partition sizes (end offsets) must be fetched and stored, and after
> the KTable has been populated up to those offsets, stream processing can start.
>
> Other discussed ideas are:
> 1) an initial fixed time period for populating
>    (it might be hard for a user to estimate the correct value)
> 2) an "idle" period, i.e., if the KTable receives no update for a certain
>    time, we consider it populated
> 3) a timestamp cut-off point, i.e., all records with an older timestamp
>    belong to the initial populating phase
>
> The API change is not decided yet, and the API design is part of this JIRA.
> One suggestion (for option (3)) was:
> {noformat}
> // Populate the table, without reading any other topics, until a record
> // with timestamp 1000 is seen.
> KTable table = builder.table("topic", 1000);
> {noformat}
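The "main idea" from the description (snapshot the end offsets at startup, then start processing once the table has caught up to them) can be sketched in plain Java, independent of any Kafka client. `BootstrapTracker` and its methods are hypothetical names, not part of Kafka Streams. The sketch assumes the end offsets are captured once at startup and that the caller reports each partition's consumer position (the offset of the next record to read) after every poll. Comparing positions rather than the offsets of the last records seen is meant to keep the check from stalling if compaction has removed records near the end of a partition.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not a Kafka or Kafka Streams API: decides when a
// table's source partitions have been read up to the end offsets that were
// captured once at startup.
final class BootstrapTracker {
    // partition -> end offset at startup (offset of the next record to be written)
    private final Map<Integer, Long> targetOffsets;
    // partition -> consumer position (offset of the next record to read)
    private final Map<Integer, Long> positions = new HashMap<>();

    BootstrapTracker(Map<Integer, Long> endOffsetsAtStartup) {
        this.targetOffsets = new HashMap<>(endOffsetsAtStartup);
    }

    // Called after each poll with the consumer's current position for a partition.
    void onPosition(int partition, long nextOffset) {
        positions.put(partition, nextOffset);
    }

    // True once every partition's position has reached its startup end offset.
    // Partitions that were empty at startup (end offset 0) are trivially done;
    // partitions with no reported position are treated as being at offset 0.
    boolean isBootstrapped() {
        for (Map.Entry<Integer, Long> e : targetOffsets.entrySet()) {
            long position = positions.getOrDefault(e.getKey(), 0L);
            if (position < e.getValue()) {
                return false; // still behind the startup snapshot on this partition
            }
        }
        return true;
    }
}
```

Records appended after the snapshot do not extend the bootstrap phase, which matches the "fetch and store the current partition sizes" wording. With the 0.10.1+ Java consumer, `KafkaConsumer#endOffsets` could supply the startup snapshot and `KafkaConsumer#position` the per-partition progress; on the 0.10.0 client mentioned above, `seekToEnd` followed by `position` yields the same end offsets.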
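The three alternative criteria listed in the description (fixed period, idle period, timestamp cut-off) reduce to simple predicates. The sketch below is a hypothetical illustration, not a proposed or existing Kafka Streams API; the class and method names are invented, and all times are assumed to be epoch milliseconds.

```java
// Hypothetical helpers, not a Kafka Streams API: the alternative "populated"
// criteria from the issue description, expressed as plain predicates.
// All times are epoch milliseconds.
final class BootstrapCriteria {
    private BootstrapCriteria() {}

    // Option 1: the populating phase lasts a fixed period after startup.
    static boolean fixedPeriodElapsed(long startupMs, long nowMs, long periodMs) {
        return nowMs - startupMs >= periodMs;
    }

    // Option 2: the table counts as populated once no update has been
    // applied for at least idleMs.
    static boolean idlePeriodElapsed(long lastUpdateMs, long nowMs, long idleMs) {
        return nowMs - lastUpdateMs >= idleMs;
    }

    // Option 3: records with a timestamp older than the cut-off belong to the
    // initial populating phase; the first record at or after it ends the phase.
    static boolean belongsToBootstrap(long recordTimestampMs, long cutoffMs) {
        return recordTimestampMs < cutoffMs;
    }
}
```

Options 1 and 2 differ only in their reference point (startup versus the last update), which is why option 2 adapts to the amount of data in the topic while option 1 depends on the user's estimate. Option 3 is driven by record timestamps rather than wall-clock time.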