There is no such hook. There is only a restore listener, but it is only
used during startup, while the global store is loaded. It's not used
during regular processing.
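
To illustrate the difference, here is a small plain-Java model with no Kafka dependency. `RestoreListenerModel`, `RestoreCallbacks` and `GlobalReplica` are hypothetical names. The listener fires while the replica is bootstrapped from the topic at startup, but not for records applied afterwards. In Kafka Streams itself, the corresponding listener is a `StateRestoreListener` registered with `KafkaStreams#setGlobalStateRestoreListener()`. The model only mimics the shape of its three callbacks and is not that API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model with hypothetical names; this is NOT the Kafka Streams API.
public class RestoreListenerModel {

    // Hypothetical stand-in for the three callbacks a restore listener receives.
    interface RestoreCallbacks {
        void onRestoreStart(long startOffset, long endOffset);
        void onBatchRestored(long batchEndOffset, long numRestored);
        void onRestoreEnd(long totalRestored);
    }

    // Hypothetical local replica of a global store.
    static final class GlobalReplica {
        final Map<String, String> store = new HashMap<>();
        private final RestoreCallbacks listener;

        GlobalReplica(RestoreCallbacks listener) {
            this.listener = listener;
        }

        // Startup: load everything currently in the topic, batch by batch, notifying the listener.
        void bootstrap(List<String[]> topic, int batchSize) {
            listener.onRestoreStart(0, topic.size());
            long total = 0;
            for (int i = 0; i < topic.size(); i += batchSize) {
                int end = Math.min(i + batchSize, topic.size());
                for (String[] kv : topic.subList(i, end)) {
                    store.put(kv[0], kv[1]);
                }
                total += end - i;
                listener.onBatchRestored(end, end - i);
            }
            listener.onRestoreEnd(total);
        }

        // Regular processing: later records are applied without any listener callback.
        void applyUpdate(String key, String value) {
            store.put(key, value);
        }
    }

    // Bootstraps five records in batches of two, then applies one regular update.
    static List<String> demo() {
        List<String> events = new ArrayList<>();
        RestoreCallbacks logging = new RestoreCallbacks() {
            public void onRestoreStart(long start, long end) { events.add("start " + start + "-" + end); }
            public void onBatchRestored(long end, long n) { events.add("batch " + n); }
            public void onRestoreEnd(long total) { events.add("end " + total); }
        };
        GlobalReplica replica = new GlobalReplica(logging);
        List<String[]> topic = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            topic.add(new String[] {"k" + i, "v" + i});
        }
        replica.bootstrap(topic, 2);
        replica.applyUpdate("k9", "v9"); // no callback fires for this one
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Running `main` prints only the five restore events. The later `applyUpdate` call adds nothing, which is why a restore listener cannot serve as a per-update hook.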

Depending on your usage, maybe you can switch to a global store instead
of a GlobalKTable? That way, you can implement a custom `Processor` and
add a hook manually.
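
Here is a rough plain-Java model of that idea. If your own processor performs the write into the global store, it can call a hook right after each write. For example, the hook could log the lag between the record's timestamp and the moment the record became visible locally. All names (`GlobalStoreHookModel`, `UpdateHook`, `GroupCacheUpdater`) are hypothetical, and no Kafka classes are used. In Kafka Streams, such a processor would be registered via `StreamsBuilder.addGlobalStore()` or `Topology.addGlobalStore()`. The timestamps would come from the record (for example via `ProcessorContext#timestamp()`) and from the wall clock. The model passes them in explicitly so the output is deterministic. The lag figure also assumes that producer and consumer clocks are roughly in sync.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model with hypothetical names; this is NOT the Kafka Streams API.
public class GlobalStoreHookModel {

    // Hypothetical hook invoked after each record has been written into the local replica.
    interface UpdateHook {
        void afterUpdate(String key, String value, long lagMs);
    }

    // Hypothetical processor that owns the writes into the global store replica.
    static final class GroupCacheUpdater {
        final Map<String, String> replica = new HashMap<>();
        private final UpdateHook hook;

        GroupCacheUpdater(UpdateHook hook) {
            this.hook = hook;
        }

        // Plays the role of Processor#process(): apply the record, then call the hook.
        void process(String key, String value, long recordTimestampMs, long nowMs) {
            if (value == null) {
                replica.remove(key); // treat a null value (tombstone) as a delete
            } else {
                replica.put(key, value);
            }
            hook.afterUpdate(key, value, nowMs - recordTimestampMs);
        }
    }

    // Feeds two records with fixed timestamps so the computed lag is deterministic.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        GroupCacheUpdater updater = new GroupCacheUpdater(
            (key, value, lagMs) -> log.add(key + " synced after " + lagMs + " ms"));
        updater.process("group-1", "members=3", 1_000L, 1_250L);
        updater.process("group-1", null, 2_000L, 2_040L);
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

Keeping the hook behind a small interface separates the logging concern from the store-update logic.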

I don't see anything wrong with your setup. It's unclear to me if/why
the global store would require a lot of memory...


-Matthias

On 5/27/20 7:41 AM, Pushkar Deole wrote:
> Matthias,
> I tried with the default store as well but I'm getting the same error. Can
> you please check whether I am initializing the global store the right way?
> 
> public void setupGlobalCacheTables(String theKafkaServers) {
>     Properties props = new Properties();
>     props.put(StreamsConfig.APPLICATION_ID_CONFIG, DEFAULT_APPLICATION_ID);
>     props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, theKafkaServers);
>
>     StreamsBuilder streamsBuilder = new StreamsBuilder();
>     groupCacheTable = streamsBuilder.globalTable(
>         GROUP_CACHE_TOPIC,
>         Consumed.with(Serdes.String(), GroupCacheSerdes.groupCache()),
>         Materialized.as(GROUP_CACHE_STORE_NAME));
>
>     Topology groupCacheTopology = streamsBuilder.build();
>     kafkaStreams = new KafkaStreams(groupCacheTopology, props);
>     kafkaStreams.start();
>
>     Runtime.getRuntime().addShutdownHook(new Thread(() -> {
>         LOG.info("Stopping the stream");
>         kafkaStreams.close();
>     }));
> }
> 
> On Wed, May 27, 2020 at 5:06 PM Pushkar Deole <pdeole2...@gmail.com> wrote:
> 
>> Hi Matthias,
>>
>> By the way, I used the in-memory global store and the service is giving
>> an out-of-memory error during startup. Unfortunately I don't have a stack
>> trace now, but when I got the stack trace the first time, the error was
>> coming from somewhere in memorypool.allocate or a similar method. If I
>> get the stack trace again, I will share it with you.
>> However, the topic the store reads from is empty, so I am not sure why
>> the GlobalKTable is trying to occupy a lot of space. The pod memory
>> request and limit are 500 MiB and 750 MiB respectively, so I believe the
>> state store should fit into this memory since the topic is empty. Can you
>> provide some input on this?
>>
>>
>> On Wed, May 27, 2020 at 2:17 PM Pushkar Deole <pdeole2...@gmail.com> wrote:
>>
>>> Ok... got it... is there any hook that I can attach to the GlobalKTable
>>> or global store? What I mean is: I want to know when the global store is
>>> updated with data from the topic. In that case, the hook I specified
>>> should be invoked so I can do some activity like logging. This would let
>>> me know how long the global store took to sync up with the topic after
>>> the event has been put on the topic.
>>>
>>> On Tue, May 26, 2020 at 10:58 PM Matthias J. Sax <mj...@apache.org> wrote:
>>>
>>>> For example, it could be some "static" information, like a mapping from
>>>> zip code to city name.
>>>>
>>>> Something that usually does not change over time.
>>>>
>>>>
>>>> -Matthias
>>>>
>>>> On 5/25/20 9:55 PM, Pushkar Deole wrote:
>>>>> Matthias,
>>>>>
>>>>> I am wondering what you mean by "Global stores hold "auxiliary" data
>>>>> that is provided from "outside" of the app".
>>>>>
>>>>> Would you be able to give an example use case of what you mean by
>>>>> auxiliary data provided from outside the app?
>>>>>
>>>>> On Sat, May 2, 2020 at 1:58 AM Matthias J. Sax <mj...@apache.org> wrote:
>>>>>
>>>>>> Both stores serve different purposes.
>>>>>>
>>>>>> Regular stores allow you to store state the application computes.
>>>>>> Writing into the changelog is a fault-tolerance mechanism.
>>>>>>
>>>>>> Global stores hold "auxiliary" data that is provided from "outside" of
>>>>>> the app. There is no changelog topic, but only the input topic (that is
>>>>>> used to re-create the global state).
>>>>>>
>>>>>> Local stores are sharded and updates are "sync" as they don't need to
>>>>>> be shared with anybody else.
>>>>>>
>>>>>> For global stores, as all instances need to be updated, updates are
>>>>>> async (we don't know when which instance will update its own global
>>>>>> store replica).
>>>>>>
>>>>>>>> Say one stream thread updates the topic for the global store and starts
>>>>>>>> processing the next event, wherein the processor tries to read the
>>>>>>>> global store, which may not have been synced with the topic?
>>>>>>
>>>>>> Correct. There is no guarantee when the update to the global store will
>>>>>> be applied. As said, global stores are not designed to hold data the
>>>>>> application computes.
>>>>>>
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>>
>>>>>> On 4/30/20 11:11 PM, Pushkar Deole wrote:
>>>>>>> Thanks... will try with GlobalKTable.
>>>>>>> As a side question, I didn't really understand the significance of the
>>>>>>> global state store, which kind of works in the reverse way to a local
>>>>>>> state store, i.e. a local state store is updated and then saved to the
>>>>>>> changelog topic, whereas in the case of a global state store the topic
>>>>>>> is updated first and then synced to the global state store. Do these
>>>>>>> two work in sync, i.e. the update to the topic and the global state
>>>>>>> store?
>>>>>>>
>>>>>>> Say one stream thread updates the topic for the global store and starts
>>>>>>> processing the next event, wherein the processor tries to read the
>>>>>>> global store, which may not have been synced with the topic?
>>>>>>>
>>>>>>> On Fri, May 1, 2020 at 3:35 AM Matthias J. Sax <mj...@apache.org> wrote:
>>>>>>>
>>>>>>>> Yes.
>>>>>>>>
>>>>>>>> A `GlobalKTable` uses a global store internally.
>>>>>>>>
>>>>>>>> You can also use `StreamsBuilder.addGlobalStore()` or
>>>>>>>> `Topology.addGlobalStore()` to add a global store "manually".
>>>>>>>>
>>>>>>>>
>>>>>>>> -Matthias
>>>>>>>>
>>>>>>>>
>>>>>>>> On 4/30/20 7:42 AM, Pushkar Deole wrote:
>>>>>>>>> Thanks Matthias.
>>>>>>>>> Can you elaborate on the replicated caching layer part?
>>>>>>>>> When you say global stores, do you mean a GlobalKTable created from a
>>>>>>>>> topic, e.g. using the StreamsBuilder.globalTable(String topic) method?
>>>>>>>>>
>>>>>>>>> On Thu, Apr 30, 2020 at 12:44 PM Matthias J. Sax <mj...@apache.org> wrote:
>>>>>>>>>
>>>>>>>>>> It's not possible to modify a state store from "outside".
>>>>>>>>>>
>>>>>>>>>> If you want to build a "replicated caching layer", you could use global
>>>>>>>>>> stores and write into the corresponding topics to update all stores. Of
>>>>>>>>>> course, those updates would be async.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> -Matthias
>>>>>>>>>>
>>>>>>>>>> On 4/29/20 10:52 PM, Pushkar Deole wrote:
>>>>>>>>>>> Hi All,
>>>>>>>>>>>
>>>>>>>>>>> I am wondering if this is possible: I have been asked to use state
>>>>>>>>>>> stores as a general replicated cache among multiple service instances.
>>>>>>>>>>> However, the state store is created through StreamsBuilder but is not
>>>>>>>>>>> actually modified through the stream processor topology; instead, it
>>>>>>>>>>> is to be modified from outside the stream topology. So, essentially,
>>>>>>>>>>> the state store is just to be created from StreamsBuilder and then
>>>>>>>>>>> used as an application-level cache that will get replicated between
>>>>>>>>>>> application instances. Is this possible using state stores?
>>>>>>>>>>>
>>>>>>>>>>> Secondly, if possible, is this a good design approach?
>>>>>>>>>>>
>>>>>>>>>>> I'd appreciate your response since I don't know the internals of
>>>>>>>>>>> state stores.
>>>>>>>>>>>
> 
