No problem, it was an interesting little bughunt. I'll create a JIRA and PR this afternoon/evening.
Thanks
Alex

On Wed, Nov 8, 2017 at 3:05 PM Matthias J. Sax <matth...@confluent.io> wrote:

> Cool. Thanks for cycling back. What you describe verifies my suspicion.
> Your patch makes sense.
>
> Do you want to create a JIRA and provide a PR to fix this? We could
> include this into 0.11.0.2 that we plan to release soon (so if you want
> to contribute the patch, please do it right away -- we want to cut the
> first RC for 0.11.0.2 this Friday). If you don't want to do it, it's
> also fine, and I (or someone else) can do it. Please let us know.
>
> For 0.11.0.1, the workaround would be to not use transactions when you
> write the topic that you read as global state/KTable.
>
> Thanks a lot for reporting the issue and helping to nail it down!
>
>
> -Matthias
>
> On 11/8/17 3:13 AM, Alex Good wrote:
> > Previously deleting and recreating the topic has solved the problem.
> >
> > Based on what you've said about the offset correction I did a quick test by
> > building kafka streams myself with the following code in
> > `GlobalStateManagerImpl#restoreState()`
> >
> >     while (offset < highWatermark) {
> >         final ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
> >         final List<KeyValue<byte[], byte[]>> restoreRecords = new ArrayList<>();
> >         for (ConsumerRecord<byte[], byte[]> record : records) {
> >             offset = record.offset() + 1;
> >             if (record.key() != null) {
> >                 restoreRecords.add(KeyValue.pair(record.key(), record.value()));
> >             }
> >         }
> >         stateRestoreAdapter.restoreAll(restoreRecords);
> >         stateRestoreListener.onBatchRestored(topicPartition, storeName, offset, restoreRecords.size());
> >         restoreCount += restoreRecords.size();
> >         offset = consumer.position(topicPartition);
> >     }
> >
> > Note the recalculation of the offset using consumer position at the end of
> > the loop. That fixed the issue so may serve as further verification of your
> > hypothesis?
> >
> > In the meantime I suppose the workaround is to not produce transactional
> > messages to topics backing a GlobalKTable?
> >
> > Thanks
> > Alex
> >
> >
> > On Tue, Nov 7, 2017 at 8:35 PM Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> >> Did you delete and recreate the topic of the GlobalStore?
> >>
> >> I did have a look into the code, too, and think there is a bug in
> >> `GlobalStateManagerImpl#restoreState()`. I did some initial
> >> investigation using an existing test, and the test passed without
> >> transactional data but fails if the global store data is written using
> >> transactions.
> >>
> >> Note: if transactions are available, commit markers will take "one spot"
> >> in the partitions. Currently, we break the loop using consumer record
> >> offset
> >>
> >>> offset = record.offset() + 1;
> >>
> >> but I think, if there is a commit marker, the highWatermark is one
> >> offset larger and thus this loop never terminates. We would need to
> >> update the offset using consumer position instead that should step over
> >> the commit marker correctly.
> >>
> >> Will look into this in more detail tomorrow. Would still be valuable, if
> >> you could verify my suspicion.
> >>
> >> Thanks!
> >>
> >> -Matthias
> >>
> >> On 11/7/17 7:01 PM, Alex Good wrote:
> >>> Disabling transactions doesn't seem to have changed anything. I've had a
> >>> read through the kafka streams source code, specifically the parts relating
> >>> to the restoration of the global stores and I can't see anything obvious I
> >>> should look at.
> >>>
> >>> @Ted will do, here's a pastebin of the most recent run
> >>> https://pastebin.com/rw2WbFyt
> >>>
> >>> Thanks
> >>> Alex
> >>>
> >>> On Tue, Nov 7, 2017 at 5:12 PM Ted Yu <yuzhih...@gmail.com> wrote:
> >>>
> >>>> Alex:
> >>>> In the future, please use pastebin if the log is not too large.
> >>>>
> >>>> When people find this thread in mailing list archive, the attachment
> >>>> wouldn't be there.
> >>>>
> >>>> Thanks
> >>>>
> >>>> On Tue, Nov 7, 2017 at 8:32 AM, Matthias J. Sax <matth...@confluent.io>
> >>>> wrote:
> >>>>
> >>>>> Alex,
> >>>>>
> >>>>> I am not sure, but maybe it's a bug. I noticed that you read transaction
> >>>>> data. Can you try to write to the topic without using transactions
> >>>>> and/or set the consumer into READ_UNCOMMITTED mode to verify? It's only a
> >>>>> guess that it might be related to transactions and it would be great to
> >>>>> verify or rule it out.
> >>>>>
> >>>>> Thanks a lot!
> >>>>>
> >>>>> -Matthias
> >>>>>
> >>>>>
> >>>>> On 11/7/17 3:15 PM, Alex Good wrote:
> >>>>>> Hi All,
> >>>>>>
> >>>>>> I have a simple kafka streams app that seems to hang when restoring
> >>>>>> state for a GlobalKTable. We're running in a test environment at the
> >>>>>> moment and the topic it is loading from only has two messages in it, I
> >>>>>> don't know if the very low volume of messages would affect the restore?
> >>>>>>
> >>>>>> I've attached a log, the topic in question is invoices-state. As you can
> >>>>>> see the GlobalStreamThread appears to load the two messages in the topic
> >>>>>> and then continues to send read requests to the topic despite having
> >>>>>> caught up. Any tips on debugging this would be very welcome.
> >>>>>>
> >>>>>> Thanks
> >>>>>> Alex
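
For anyone finding this thread in the archive: the hang discussed above can be reproduced in miniature without a broker. The sketch below is only an illustration under stated assumptions, not the Kafka Streams code. `RestoreLoopSketch`, `FakeConsumer` and `restore` are hypothetical names, and the fake log simply assumes (as Matthias suspected) that a commit marker occupies one offset, is never handed to the application, and is counted in the high watermark.

```java
import java.util.ArrayList;
import java.util.List;

public class RestoreLoopSketch {

    /** Hypothetical stand-in for a consumer reading one partition. */
    static final class FakeConsumer {
        private final boolean[] isMarker; // isMarker[i] == true: offset i is a commit marker
        private int position = 0;

        FakeConsumer(boolean[] isMarker) {
            this.isMarker = isMarker;
        }

        /** Returns the offsets of all remaining data records; markers are skipped silently. */
        List<Integer> poll() {
            List<Integer> offsets = new ArrayList<>();
            while (position < isMarker.length) {
                if (!isMarker[position]) {
                    offsets.add(position);
                }
                position++; // the position advances past markers as well
            }
            return offsets;
        }

        int position() {
            return position;
        }

        /** Assumption: the high watermark counts the marker's offset too. */
        int highWatermark() {
            return isMarker.length;
        }
    }

    /**
     * Runs the restore loop and returns the number of polls it needed,
     * or -1 if it had not terminated after maxPolls polls.
     */
    static int restore(boolean[] log, boolean usePosition, int maxPolls) {
        FakeConsumer consumer = new FakeConsumer(log);
        long offset = 0;
        int polls = 0;
        while (offset < consumer.highWatermark()) {
            if (polls == maxPolls) {
                return -1; // give up: the real loop would spin forever here
            }
            polls++;
            for (int recordOffset : consumer.poll()) {
                offset = recordOffset + 1; // original logic: only data records move the offset
            }
            if (usePosition) {
                offset = consumer.position(); // patched logic: also steps over the marker
            }
        }
        return polls;
    }

    public static void main(String[] args) {
        boolean[] plain = {false, false};               // two records, no transactions
        boolean[] transactional = {false, false, true}; // two records plus a commit marker

        System.out.println(restore(plain, false, 10));         // prints 1
        System.out.println(restore(transactional, false, 10)); // prints -1 (never catches up)
        System.out.println(restore(transactional, true, 10));  // prints 1
    }
}
```

With the record-offset logic the loop stalls one offset short of the high watermark on the transactional log, which matches the behaviour in the original report (two messages loaded, then endless fetches). Taking the offset from the consumer position lets it terminate in both cases.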