Hello Upesh,

Could you confirm a few more things for me:

1. After you have stopped the application and wiped out the state dir, check
that the corresponding changelog topic indeed has one record at offset 0.
This can be done via admin#listOffsets (get the earliest and latest offsets,
which should be 0 and 1 respectively).
2. After you have resumed the application, check from which starting position
we are restoring the changelog. This can be done by implementing
`stateRestoreListener.onRestoreStart(partition, storeName, startOffset,
restoreEndOffset);`; the start offset should be 0.

If both of them check out fine as expected, then from the code I think
bufferedLimitIndex should be updated to 1.
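
The expectation above can be sketched in plain Java, with no Kafka
dependencies. This is a simplified model and not the actual
StoreChangelogReader code: the `BufferedChangelog` class, its `buffer`
method, and the rule that the limit index covers every buffered record whose
offset is below the restore end offset are all assumptions made for
illustration. The scenario is the one from this thread: one record at offset
0 and a restore end offset of 1.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of a changelog restore buffer.
// NOT the Kafka Streams source; it only illustrates the expectation above.
final class BufferedChangelog {
    private final List<Long> bufferedOffsets = new ArrayList<>();
    private final Long restoreEndOffset; // null models "not yet initialized"
    private int bufferedLimitIndex = 0;

    BufferedChangelog(Long restoreEndOffset) {
        this.restoreEndOffset = restoreEndOffset;
    }

    // Buffer one fetched record. If its offset lies below the end offset
    // (or no end offset is known yet), extend the limit index to cover it.
    void buffer(long offset) {
        bufferedOffsets.add(offset);
        if (restoreEndOffset == null || offset < restoreEndOffset) {
            bufferedLimitIndex = bufferedOffsets.size();
        }
    }

    int bufferedLimitIndex() {
        return bufferedLimitIndex;
    }

    // Assumed scenario from this thread: one record at offset 0, end offset 1.
    static int limitIndexForSingleRecord() {
        BufferedChangelog changelog = new BufferedChangelog(1L);
        changelog.buffer(0L);
        return changelog.bufferedLimitIndex();
    }

    public static void main(String[] args) {
        System.out.println("bufferedLimitIndex = " + limitIndexForSingleRecord());
    }
}
```

Under these assumptions the index stays at 0 only while nothing has been
fetched, or when every fetched record sits at or beyond the end offset.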


Guozhang

On Wed, Mar 24, 2021 at 5:14 PM Upesh Desai <ude...@itrsgroup.com> wrote:

> Hi Guozhang,
>
>
>
> Here are some of the answers to your questions I see during my testing:
>
>
>
>    1. ChangelogMetadata#restoreEndOffset == 1 ; This is expected as in my
>    test 1 record had been added to the store. However the numRecords variable
>    is still set to 0
>    2. For that particular test, `hasRestoredToEnd()` indeed returns true
>    as well. But it is confusing since the store is actually empty / that
>    record I added does not exist in the store when trying to check for it.
>    3. N/A
>
>
>
> A little more information, the records we add to this store/changelog are
> of type <CustomKey,byte[]> where the value is always set to an empty byte
> array `new byte[0]`. A couple other variations I have tried are setting to
> a non-empty static byte array such as `new byte[1]` or `new byte[]{1}`.
>
>
>
> Hope this gives a little more clarity and hope to hear from you soon.
>
>
>
> Thanks,
>
> Upesh
>
>
> Upesh Desai  |  Senior Software Developer  |  ude...@itrsgroup.com
> www.itrsgroup.com <https://www.itrsgroup.com/>
>
> *From: *Guozhang Wang <wangg...@gmail.com>
> *Date: *Wednesday, March 24, 2021 at 1:37 PM
> *To: *Users <users@kafka.apache.org>
> *Cc: *Bart Lilje <bli...@itrsgroup.com>
> *Subject: *Re: Kafka Streams Processor API state stores not restored via
> changelog topics
>
> Hello Upesh,
>
> Thanks for the detailed report. I looked through the code and tried to
> reproduce the issue, but so far have not succeeded. I think I may need some
> further information from you to help my further investigation.
>
> 1) The `bufferedLimitIndex == 0` itself does not necessarily mean there's
> an issue, as long as it could still be bumped later (i.e. it is possible
> that the restore consumer has not fetched data yet). What's key, though, is
> to check `ChangelogMetadata#restoreEndOffset`: for active tasks, it would
> be created with a null value, and then be initialized once. ChangelogReader
> would stop restoring once the current offset has reached or passed this
> value, or if this value itself is 0.
>
> 2) If `restoreEndOffset` is initialized to a non-zero value, then check
> whether the restoration indeed completed without applying any records; this
> is indicated by `hasRestoredToEnd()` returning true.
>
> 3) If `restoreEndOffset` is initialized to 0, then we need to check why: off
> the top of my head, I can only think that the consumer's end offset request
> got a response of 0, indicating the changelog is now empty.
>
>
> Guozhang
>
>
> On Tue, Mar 23, 2021 at 8:44 AM Upesh Desai <ude...@itrsgroup.com> wrote:
>
> > Hi all,
> >
> >
> >
> > Our team thinks we discovered a bug over the weekend within the Kafka
> > Streams / Processor API. We are running 2.7.0.
> >
> >
> >
> > When configuring a state store backed by a changelog topic with the
> > cleanup policy configuration set to “compact,delete”:
> >
> >
> >
> > final StoreBuilder<KeyValueStore<k,v>> store = Stores
> >   .keyValueStoreBuilder(
> >     Stores.persistentKeyValueStore(STORE_ID),
> >     kSerde,
> >     vSerde)
> >   .withLoggingEnabled(Map.of(
> >     RETENTION_MS_CONFIG, "90000000",
> >     CLEANUP_POLICY_CONFIG, "compact,delete"))
> >   .withCachingEnabled();
> >
> >
> >
> > Here is how we reproduced the problem:
> >
> >    1. Records are written to the state store, and subsequently produced
> >    to the changelog topic.
> >    2. Stop streams application
> >    3. Delete state.dir directory
> >    4. Restart streams application
> >    5. Confirm state store is initialized empty with no records restored
> >    from changelog
> >
> >
> >
> > We see this problem with both in-memory and RocksDB backed state stores.
> > For a persistent state store, if the streams application is restarted
> > without the state dir being deleted, the application still does not
> > “restore” from the changelog, but records are still seen in the state
> > store.
> >
> >
> >
> > When rolling back to 2.6, we do not see this issue.
> >
> >
> >
> > Doing some debugging in the source code, in the StoreChangelogReader
> > class I found that the number of records to restore is always 0 based on
> > the below snippet:
> >
> >
> >
> > private void restoreChangelog(final ChangelogMetadata changelogMetadata) {
> >     final ProcessorStateManager stateManager = changelogMetadata.stateManager;
> >     final StateStoreMetadata storeMetadata = changelogMetadata.storeMetadata;
> >     final TopicPartition partition = storeMetadata.changelogPartition();
> >     final String storeName = storeMetadata.store().name();
> >     final int numRecords = changelogMetadata.bufferedLimitIndex;
> >
> >
> >
> > Where ‘changelogMetadata.bufferedLimitIndex’ always == 0.
> >
> >
> >
> > My question to you all is, 1) Is this expected behavior? 2) If not, is it
> > a bug?
> >
> >
> >
> > Hope to get some clarity, and thanks in advance!
> >
> >
> >
> > Best,
> > Upesh
> > Upesh Desai
> > Senior Software Developer
> > ude...@itrsgroup.com
> > www.itrsgroup.com <https://www.itrsgroup.com/>
> > Internet communications are not secure and therefore the ITRS Group does
> > not accept legal responsibility for the contents of this message. Any
> > view or opinions presented are solely those of the author and do not
> > necessarily represent those of the ITRS Group unless otherwise
> > specifically stated.
> >
> >
> > *Disclaimer*
> >
> > The information contained in this communication from the sender is
> > confidential. It is intended solely for use by the recipient and others
> > authorized to receive it. If you are not the recipient, you are hereby
> > notified that any disclosure, copying, distribution or taking action in
> > relation of the contents of this information is strictly prohibited and
> > may be unlawful.
> >
> > This email has been scanned for viruses and malware, and may have been
> > automatically archived by *Mimecast Ltd*, an innovator in Software as a
> > Service (SaaS) for business. Providing a *safer* and *more useful* place
> > for your human generated data. Specializing in; Security, archiving and
> > compliance.
> >
>
>
> --
> -- Guozhang
>
>


-- 
-- Guozhang
