Re: Custom Kafka Streams State Restore Logic

2023-01-23 Thread Upesh Desai
Yeah, the workaround we’ve come up with is to use the Processor.init() method.
We confirmed that it is only called once all state stores used by that
processor have been fully restored. From there, we iterate over the entire
store and (re)build our tracking map.
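In case it helps anyone else on the list, here is a minimal, self-contained sketch of that rebuild step. Everything here is illustrative rather than our production code: a plain map stands in for the restored state store, and the "series@bucketStart" key layout just mimics how a window store pairs a record key with a window start.

```java
// Illustrative sketch of the workaround: by the time Processor.init() runs,
// Kafka Streams guarantees the processor's state stores are fully restored,
// so a single scan can rebuild the in-memory bucket-tracking map.
// A plain map stands in for the state store here.
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

public class TrackerRebuild {

    // series -> bucket start times present in the store
    static Map<String, TreeSet<Long>> rebuildTracker(Map<String, String> restoredStore) {
        Map<String, TreeSet<Long>> tracker = new HashMap<>();
        for (String compositeKey : restoredStore.keySet()) {
            // Assumed key layout "<series>@<bucketStartMillis>", mimicking how a
            // window store encodes (key, windowStart) pairs.
            int at = compositeKey.lastIndexOf('@');
            String series = compositeKey.substring(0, at);
            long bucketStart = Long.parseLong(compositeKey.substring(at + 1));
            tracker.computeIfAbsent(series, k -> new TreeSet<>()).add(bucketStart);
        }
        return tracker;
    }

    public static void main(String[] args) {
        // Pretend these entries were just restored from the changelog.
        Map<String, String> store = new TreeMap<>();
        store.put("cpu@300000", "payload");
        store.put("cpu@600000", "payload");
        store.put("mem@600000", "payload");
        Map<String, TreeSet<Long>> tracker = rebuildTracker(store);
        System.out.println(tracker.get("cpu")); // prints [300000, 600000]
    }
}
```

In the real processor this loop lives inside Processor.init() and iterates the store via store.all() instead of a map.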

It seems to work well and is performant enough thus far! Thanks for your help.

Cheers,
Upesh


Upesh Desai | Senior Software Developer | ude...@itrsgroup.com
www.itrsgroup.com

Re: Custom Kafka Streams State Restore Logic

2023-01-23 Thread Matthias J. Sax

Thanks.

I agree. Seems your options are limited. The API is not really a good 
fit for what you want to do... Sorry.


-Matthias


Re: Custom Kafka Streams State Restore Logic

2023-01-18 Thread Upesh Desai
Hi Matthias, thanks for your reply! Sure, so the use case is as follows.

We currently store some time series data in the state store, and it is stored 
to a changelog as well. The time series data is bucketed (5 minutes, 1 hour, 
and 1 day). Our goal is to only ever have a maximum of two time buckets in the 
store at once. As we receive new time series data, we figure out which time 
bucket it belongs to and add it to that bucket. We have a “grace period” that 
allows late-arriving data to be processed even after a time bucket has ended. 
That’s why we cap the store at two time buckets: one for the previous bucket, 
which is in its grace period, and one for the current bucket.
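To make the bucketing rule concrete, here is a small self-contained example; the five-minute bucket size and one-minute grace period are made-up values for illustration, not our actual configuration.

```java
// Illustrates the bucketing rule described above: a record belongs to the
// bucket containing its timestamp, and is still accepted into the previous
// bucket while that bucket's grace period has not yet expired.
public class Bucketing {

    static final long BUCKET_MS = 5 * 60 * 1000; // 5-minute buckets (illustrative)
    static final long GRACE_MS = 60 * 1000;      // 1-minute grace period (illustrative)

    // Start of the bucket a timestamp falls into.
    static long bucketStart(long tsMillis) {
        return (tsMillis / BUCKET_MS) * BUCKET_MS;
    }

    // Accept a record if it lands in the current bucket, or in the previous
    // bucket while stream time is still inside that bucket's grace window.
    static boolean accepted(long recordTs, long streamTime) {
        long current = bucketStart(streamTime);
        long bucket = bucketStart(recordTs);
        if (bucket == current) {
            return true;
        }
        return bucket == current - BUCKET_MS && streamTime < current + GRACE_MS;
    }

    public static void main(String[] args) {
        long streamTime = 600_000;                          // current bucket starts at 600000
        System.out.println(accepted(601_000, streamTime));  // true: current bucket
        System.out.println(accepted(599_000, streamTime));  // true: previous bucket, in grace
        System.out.println(accepted(599_000, 700_000));     // false: grace period expired
    }
}
```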

So we wanted to extend the base state store and add a simple in-memory map to 
track the two time buckets per time series (the time series is the store key). 
A couple of reasons why we don’t want to add this as a separate state store or 
to the existing store:
1. There is a ton of serialization / deserialization that happens behind the 
scenes.
2. This new time-bucket tracking map would only be updated a couple of times 
per time bucket, and does not need to be updated on every message read.
3. There’s no API on the included stores that allows us to do so.

Therefore, I thought it best to reuse the existing store functionality: 
create a “new state store” that really just instantiates one of the included 
stores within it, add this in-memory map, and then plug into/alter/extend the 
restore functionality to populate the time-bucket tracking map at restore 
time.
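The wrapping idea looks roughly like the decorator below. To keep it self-contained it uses a toy Store interface rather than the real Kafka Streams StateStore/WindowStore API, but the delegation shape is the same.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;

// Toy interface standing in for a state store; the real Kafka Streams
// WindowStore API is richer, but the delegation shape is the same.
interface Store {
    void put(String series, long bucketStart, byte[] value);
}

class InnerStore implements Store {
    final Map<String, byte[]> data = new HashMap<>();
    public void put(String series, long bucketStart, byte[] value) {
        data.put(series + "@" + bucketStart, value);
    }
}

// Decorator: delegates every write to the wrapped store and keeps the
// in-memory bucket-tracking map up to date as a side effect.
class TrackingStore implements Store {
    private final Store inner;
    final Map<String, TreeSet<Long>> tracker = new HashMap<>();

    TrackingStore(Store inner) { this.inner = inner; }

    public void put(String series, long bucketStart, byte[] value) {
        inner.put(series, bucketStart, value);
        TreeSet<Long> buckets = tracker.computeIfAbsent(series, k -> new TreeSet<>());
        buckets.add(bucketStart);
        // Enforce the two-bucket invariant: evict the oldest tracked bucket.
        while (buckets.size() > 2) {
            buckets.pollFirst();
        }
    }
}
```

The catch, and the reason for this whole thread, is that a wrapper like this only sees live writes: changelog restoration bypasses put(), so the tracker still has to be rebuilt after restore.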

It sounds like I will either have to 1) create a custom state store from 
scratch, or 2) see if there is a post-restore hook that can then call a method 
to scan the whole store and build up the time bucket map before starting to 
process.

Any advice on Kafka streams / state store logic would be appreciated!
-Upesh


Upesh Desai | Senior Software Developer | ude...@itrsgroup.com
www.itrsgroup.com


Re: Custom Kafka Streams State Restore Logic

2023-01-17 Thread Matthias J. Sax

Guess it depends what you actually want to achieve?

Also note: `InMemoryWindowStore` is an internal class, and thus might 
change at any point, and it was never designed to be extended...



-Matthias

On 1/13/23 2:55 PM, Upesh Desai wrote:

Hello all,

I am currently working on creating a new InMemoryWindowStore by 
extending the default in-memory window store. One of the roadblocks I’ve 
run into is finding a way to add some custom logic when the state store 
is being restored from the changelog. I know that this is possible if I 
completely write the store logic from scratch, but we really only want 
to add a tiny bit of custom logic, and do not want to have to replicate 
all the existing logic.


Is there a simple way for this to be done? I see the default 
implementation in InMemoryWindowStore:


context.register(
    root,
    (RecordBatchingStateRestoreCallback) records -> {
        for (final ConsumerRecord<byte[], byte[]> record : records) {
            put(
                Bytes.wrap(extractStoreKeyBytes(record.key())),
                record.value(),
                extractStoreTimestamp(record.key())
            );
            ChangelogRecordDeserializationHelper.applyChecksAndUpdatePosition(
                record,
                consistencyEnabled,
                position
            );
        }
    }
);

Thanks in advance!

Upesh




Upesh Desai
Senior Software Developer

ude...@itrsgroup.com
www.itrsgroup.com

Internet communications are not secure and therefore the ITRS Group does 
not accept legal responsibility for the contents of this message. Any 
view or opinions presented are solely those of the author and do not 
necessarily represent those of the ITRS Group unless otherwise 
specifically stated.

