Thanks.

I agree. It seems your options are limited. The API is not really a good fit for what you want to do... Sorry.

-Matthias

On 1/18/23 7:48 AM, Upesh Desai wrote:
Hi Matthias, thanks for your reply! Sure, so the use case is as follows.

We currently store some time series data in the state store, and it is written to a changelog as well. The time series data is bucketed (5 minutes, 1 hour, and 1 day). Our goal is to only ever have a max of 2 time buckets in the store at once. As we receive new time series data, we figure out which time bucket it belongs to and add it to its respective bucket. We have a “grace period” that allows late-arriving data to be processed even after a time bucket has ended. That’s the reason for the constraint of 2 time buckets max within the store: 1 for the previous bucket in its grace period, 1 for the current bucket.
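The bucketing and grace-period rule described above can be sketched roughly as follows. This is a minimal illustration, not code from the thread; the 5-minute bucket and 1-minute grace durations are assumptions for the example:

    import java.time.Duration;

    public class BucketAssignment {
        static final long BUCKET_MS = Duration.ofMinutes(5).toMillis();
        static final long GRACE_MS  = Duration.ofMinutes(1).toMillis();

        /** Start timestamp of the bucket a record timestamp falls into. */
        static long bucketStart(long ts) {
            return ts - (ts % BUCKET_MS);
        }

        /**
         * A record is accepted if it lands in the current bucket, or in the
         * previous bucket while that bucket's grace period is still open.
         * With this rule, at most two buckets are ever live at once.
         */
        static boolean accepts(long recordTs, long streamTime) {
            long current  = bucketStart(streamTime);
            long previous = current - BUCKET_MS;
            long b = bucketStart(recordTs);
            if (b == current) return true;
            return b == previous && streamTime < current + GRACE_MS;
        }

        public static void main(String[] args) {
            long streamTime = 10 * BUCKET_MS + 30_000;        // 30s into bucket 10
            assert accepts(streamTime, streamTime);            // current bucket
            assert accepts(9 * BUCKET_MS + 1_000, streamTime); // previous, in grace
            assert !accepts(8 * BUCKET_MS, streamTime);        // too old, dropped
        }
    }
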

So we wanted to extend the base state store and add a simple in-memory map to track the 2 time buckets per time series (that’s the store key). A couple of reasons why we don’t want to add this to a separate state store or to the existing store:

1. There is a ton of serialization/deserialization that happens behind the scenes.

2. This new time bucket tracking map would only be updated a couple of times per time bucket, and does not need to be updated on every message read.

3. There’s no API on the included stores that allows us to do this.

Therefore, I thought it best to reuse the existing store functionality: create a “new state store” that really just instantiates one of the included stores internally, add this in-memory map, and then plug into/extend the restore functionality to populate the time bucket tracking map at restore time.

It sounds like I will either have to 1) create a custom state store from scratch, or 2) see if there is a post-restore hook that can call a method to scan the whole store and build up the time bucket map before processing starts.
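For option 2, one candidate hook worth checking is `KafkaStreams#setGlobalStateRestoreListener`, whose `StateRestoreListener#onRestoreEnd` fires after a store finishes restoring. Once restoration is done, rebuilding the map is a single scan over the store. A minimal sketch of that scan, with the store keys abstracted as `(series, bucketStart)` pairs (an assumption about the key layout, since the real key encoding isn't shown in the thread):

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeSet;

    class BucketMapRebuilder {
        /** Rebuilds the per-series bucket map from restored keys,
         *  keeping only the newest two buckets per series. */
        static Map<String, TreeSet<Long>> rebuild(List<Map.Entry<String, Long>> keys) {
            Map<String, TreeSet<Long>> out = new HashMap<>();
            for (Map.Entry<String, Long> k : keys) {
                TreeSet<Long> buckets =
                    out.computeIfAbsent(k.getKey(), s -> new TreeSet<>());
                buckets.add(k.getValue());
                while (buckets.size() > 2) {
                    buckets.pollFirst();  // drop buckets older than previous+current
                }
            }
            return out;
        }
    }
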

Any advice on Kafka streams / state store logic would be appreciated!

-Upesh

Upesh Desai | Senior Software Developer | ude...@itrsgroup.com
www.itrsgroup.com

From: Matthias J. Sax <mj...@apache.org>
Date: Wednesday, January 18, 2023 at 12:50 AM
To: users@kafka.apache.org
Subject: Re: Custom Kafka Streams State Restore Logic

Guess it depends on what you actually want to achieve?

Also note: `InMemoryWindowStore` is an internal class, and thus might
change at any point, and it was never designed to be extended...


-Matthias

On 1/13/23 2:55 PM, Upesh Desai wrote:
Hello all,

I am currently working on creating a new InMemoryWindowStore by extending the default in-memory window store. One of the roadblocks I’ve run into is finding a way to add some custom logic when the state store is being restored from the changelog. I know this is possible if I write the store logic completely from scratch, but we really only want to add a tiny bit of custom logic and do not want to replicate all the existing logic.

Is there a simple way to do this? I see the default implementation in InMemoryWindowStore:

context.register(
    root,
    (RecordBatchingStateRestoreCallback) records -> {
        for (final ConsumerRecord<byte[], byte[]> record : records) {
            put(
                Bytes.wrap(extractStoreKeyBytes(record.key())),
                record.value(),
                extractStoreTimestamp(record.key())
            );
            ChangelogRecordDeserializationHelper.applyChecksAndUpdatePosition(
                record,
                consistencyEnabled,
                position
            );
        }
    }
);

Thanks in advance!

Upesh

Upesh Desai
Senior Software Developer
ude...@itrsgroup.com
www.itrsgroup.com

Internet communications are not secure and therefore the ITRS Group does not accept legal responsibility for the contents of this message. Any view or opinions presented are solely those of the author and do not necessarily represent those of the ITRS Group unless otherwise specifically stated.


