No problem, it was an interesting little bughunt. I'll create a JIRA and PR
this afternoon/evening.
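
For anyone finding this thread in the archive, here is a toy sketch of why
the original loop never terminates. It does not use the Kafka client at all:
`FakeConsumer`, `restore` and the `maxPolls` cap are hypothetical names made
up for illustration, and the offsets assume the two records were written in
a single transaction, so the commit marker sits at offset 2 and the
highWatermark is 3.

```java
import java.util.ArrayList;
import java.util.List;

public class CommitMarkerDemo {

    // Hypothetical stand-in for a consumer on a single partition (NOT the Kafka API).
    static final class FakeConsumer {
        private final long[] recordOffsets; // offsets that hold real records
        private final long logEndOffset;    // one past the last slot, markers included
        private long position = 0L;

        FakeConsumer(long[] recordOffsets, long logEndOffset) {
            this.recordOffsets = recordOffsets;
            this.logEndOffset = logEndOffset;
        }

        // Returns the offsets of real records at or after the current position,
        // then moves the position past everything, including the commit marker.
        List<Long> poll() {
            final List<Long> batch = new ArrayList<>();
            for (long recordOffset : recordOffsets) {
                if (recordOffset >= position) {
                    batch.add(recordOffset);
                }
            }
            position = logEndOffset;
            return batch;
        }

        long position() {
            return position;
        }
    }

    // Runs the restore loop and returns the number of polls it needed,
    // or -1 if it was still spinning after maxPolls polls.
    static int restore(boolean usePosition, int maxPolls) {
        // Assumed layout: records at offsets 0 and 1, commit marker at offset 2.
        final FakeConsumer consumer = new FakeConsumer(new long[] {0L, 1L}, 3L);
        final long highWatermark = 3L;
        long offset = 0L;
        int polls = 0;
        while (offset < highWatermark) {
            if (polls == maxPolls) {
                return -1; // give up; the real loop would spin forever here
            }
            for (long recordOffset : consumer.poll()) {
                offset = recordOffset + 1; // never moves past the marker's slot
            }
            polls++;
            if (usePosition) {
                offset = consumer.position(); // steps over the marker
            }
        }
        return polls;
    }

    public static void main(String[] args) {
        System.out.println("record offset only: " + restore(false, 10));
        System.out.println("consumer position:  " + restore(true, 10));
    }
}
```

With only the record offset, the loop stalls at offset 2 and hits the poll
cap (returns -1). With the consumer position it finishes after one poll.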

Thanks
Alex

On Wed, Nov 8, 2017 at 3:05 PM Matthias J. Sax <matth...@confluent.io>
wrote:

> Cool. Thanks for cycling back. What you describe verifies my suspicion.
> Your patch makes sense.
>
> Do you want to create a JIRA and provide a PR to fix this? We could
> include this in 0.11.0.2, which we plan to release soon (so if you want
> to contribute the patch, please do it right away -- we want to cut the
> first RC for 0.11.0.2 this Friday). If you don't want to do it, it's
> also fine, and I (or someone else) can do it. Please let us know.
>
> For 0.11.0.1, the workaround would be to not use transactions when you
> write to the topic that you read as global state/KTable.
>
> Thanks a lot for reporting the issue and helping to nail it down!
>
>
> -Matthias
>
> On 11/8/17 3:13 AM, Alex Good wrote:
> > Previously deleting and recreating the topic has solved the problem.
> >
> > Based on what you've said about the offset correction I did a quick
> > test by building kafka streams myself with the following code in
> > `GlobalStateManagerImpl#restoreState()`
> >
> >     while (offset < highWatermark) {
> >         final ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
> >         final List<KeyValue<byte[], byte[]>> restoreRecords = new ArrayList<>();
> >         for (ConsumerRecord<byte[], byte[]> record : records) {
> >             offset = record.offset() + 1;
> >             if (record.key() != null) {
> >                 restoreRecords.add(KeyValue.pair(record.key(), record.value()));
> >             }
> >         }
> >         stateRestoreAdapter.restoreAll(restoreRecords);
> >         stateRestoreListener.onBatchRestored(topicPartition, storeName, offset, restoreRecords.size());
> >         restoreCount += restoreRecords.size();
> >         offset = consumer.position(topicPartition);
> >     }
> >
> > Note the recalculation of the offset using consumer position at the end
> > of the loop. That fixed the issue so may serve as further verification
> > of your hypothesis?
> >
> > In the meantime I suppose the workaround is to not produce transactional
> > messages to topics backing a GlobalKTable?
> >
> > Thanks
> > Alex
> >
> >
> > On Tue, Nov 7, 2017 at 8:35 PM Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> >> Did you delete and recreate the topic of the GlobalStore?
> >>
> >> I did have a look into the code, too, and think there is a bug in
> >> `GlobalStateManagerImpl#restoreState()`. I did some initial
> >> investigation using an existing test, and the test passed without
> >> transactional data but fails if the global store data is written using
> >> transactions.
> >>
> >> Note: if transactions are available, commit markers will take "one spot"
> >> in the partitions. Currently, we break the loop using consumer record
> >> offset
> >>
> >>> offset = record.offset() + 1;
> >>
> >> but I think, if there is a commit marker, the highWatermark is one
> >> offset larger and thus this loop never terminates. We would need to
> >> update the offset using the consumer position instead, which should
> >> step over the commit marker correctly.
> >>
> >> Will look into this in more detail tomorrow. Would still be valuable, if
> >> you could verify my suspicion.
> >>
> >> Thanks!
> >>
> >> -Matthias
> >>
> >> On 11/7/17 7:01 PM, Alex Good wrote:
> >>> Disabling transactions doesn't seem to have changed anything. I've had
> >>> a read through the kafka streams source code, specifically the parts
> >>> relating to the restoration of the global stores and I can't see
> >>> anything obvious I should look at.
> >>>
> >>> @Ted will do, here's a pastebin of the most recent run
> >>> https://pastebin.com/rw2WbFyt
> >>>
> >>> Thanks
> >>> Alex
> >>>
> >>> On Tue, Nov 7, 2017 at 5:12 PM Ted Yu <yuzhih...@gmail.com> wrote:
> >>>
> >>>> Alex:
> >>>> In the future, please use pastebin if the log is not too large.
> >>>>
> >>>> When people find this thread in mailing list archive, the attachment
> >>>> wouldn't be there.
> >>>>
> >>>> Thanks
> >>>>
> >>>> On Tue, Nov 7, 2017 at 8:32 AM, Matthias J. Sax <matth...@confluent.io>
> >>>> wrote:
> >>>>
> >>>>> Alex,
> >>>>>
> >>>>> I am not sure, but maybe it's a bug. I noticed that you read
> >>>>> transactional data. Can you try to write to the topic without using
> >>>>> transactions and/or set the consumer into READ_UNCOMMITTED mode to
> >>>>> verify? It's only a guess that it might be related to transactions
> >>>>> and it would be great to verify or rule it out.
> >>>>>
> >>>>> Thanks a lot!
> >>>>>
> >>>>> -Matthias
> >>>>>
> >>>>>
> >>>>> On 11/7/17 3:15 PM, Alex Good wrote:
> >>>>>> Hi All,
> >>>>>>
> >>>>>> I have a simple kafka streams app that seems to hang when restoring
> >>>>>> state for a GlobalKTable. We're running in a test environment at the
> >>>>>> moment and the topic it is loading from only has two messages in it,
> >>>>>> I don't know if the very low volume of messages would affect the
> >>>>>> restore?
> >>>>>>
> >>>>>> I've attached a log, the topic in question is invoices-state. As you
> >>>>>> can see the GlobalStreamThread appears to load the two messages in
> >>>>>> the topic and then continues to send read requests to the topic
> >>>>>> despite having caught up. Any tips on debugging this would be very
> >>>>>> welcome.
> >>>>>>
> >>>>>> Thanks
> >>>>>> Alex
> >>>>>
> >>>>>
> >>>>
> >>>
> >>
> >>
> >
>
>
