Matthias,

I realized that the exception and the actual problem are totally different. The problem was that the client was not configured with an SSL truststore while the server is SSL-enabled. I also found this open Kafka bug: https://issues.apache.org/jira/browse/KAFKA-4493

After setting the SSL properties on the Streams client, I am able to get it up and running.
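For anyone who hits the same thing, here is a minimal sketch of the kind of SSL settings involved. The configuration keys are the standard Kafka client names; the application id, broker address, truststore path and password are placeholders for illustration, not the values from my service, and a JKS truststore is assumed.

```java
import java.util.Properties;

public class StreamsSslConfigSketch {

    // Builds client properties for connecting to an SSL-enabled broker.
    // Keys are standard Kafka client configuration names; all values are placeholders.
    static Properties sslProperties(String bootstrapServers,
                                    String truststorePath,
                                    String truststorePassword) {
        Properties props = new Properties();
        props.put("application.id", "group-cache-app"); // hypothetical application id
        props.put("bootstrap.servers", bootstrapServers);
        props.put("security.protocol", "SSL");          // client default is PLAINTEXT
        props.put("ssl.truststore.location", truststorePath);
        props.put("ssl.truststore.password", truststorePassword);
        return props;
    }

    public static void main(String[] args) {
        Properties props = sslProperties("broker:9093", "/path/to/truststore.jks", "changeit");
        // Print the resulting configuration, masking the password.
        props.forEach((k, v) -> System.out.println(
                k + "=" + ("ssl.truststore.password".equals(k) ? "****" : v)));
    }
}
```

The resulting `Properties` object would then be passed to `new KafkaStreams(topology, props)` in place of the plain properties in my earlier snippet.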
Because of this, the issue was very difficult to debug. It would help if that bug were fixed as soon as possible, or at least if a proper exception were thrown.

On Wed, May 27, 2020 at 10:59 PM Pushkar Deole <pdeole2...@gmail.com> wrote:

> Thanks... I will try increasing the memory in case you don't spot anything
> wrong with the code. Other services also have streams and a GlobalKTable, but
> they use spring-kafka. I think that should not matter, and it should
> work with plain kafka-streams code unless I am missing some
> configuration/setting here.
>
> On Wed, May 27, 2020 at 10:26 PM Matthias J. Sax <mj...@apache.org> wrote:
>
>> There is no hook. Only a restore listener, but this one is only used
>> during startup when the global store is loaded. It's not used during
>> regular processing.
>>
>> Depending on your usage, maybe you can switch to a global store instead
>> of GlobalKTable? That way, you can implement a custom `Processor` and
>> add a hook manually?
>>
>> I don't see anything wrong with your setup. Unclear if/why the global
>> store would require a lot of memory...
>>
>>
>> -Matthias
>>
>> On 5/27/20 7:41 AM, Pushkar Deole wrote:
>> > Matthias,
>> > I tried with the default store as well but am getting the same error.
>> > Can you please check if I am initializing the global store in the right way:
>> >
>> > public void setupGlobalCacheTables(String theKafkaServers) {
>> >     Properties props = new Properties();
>> >     props.put(StreamsConfig.APPLICATION_ID_CONFIG, DEFAULT_APPLICATION_ID);
>> >     props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, theKafkaServers);
>> >     StreamsBuilder streamsBuilder = new StreamsBuilder();
>> >     groupCacheTable =
>> >         streamsBuilder.globalTable(GROUP_CACHE_TOPIC,
>> >             Consumed.with(Serdes.String(), GroupCacheSerdes.groupCache()),
>> >             Materialized.as(GROUP_CACHE_STORE_NAME));
>> >     Topology groupCacheTopology = streamsBuilder.build();
>> >     kafkaStreams = new KafkaStreams(groupCacheTopology, props);
>> >     kafkaStreams.start();
>> >
>> >     Runtime.getRuntime().addShutdownHook(new Thread(() -> {
>> >         LOG.info("Stopping the stream");
>> >         kafkaStreams.close();
>> >     }));
>> > }
>> >
>> > On Wed, May 27, 2020 at 5:06 PM Pushkar Deole <pdeole2...@gmail.com> wrote:
>> >
>> >> Hi Matthias,
>> >>
>> >> By the way, I used the in-memory global store and the service is giving
>> >> an out of memory error during startup. Unfortunately I don't have a stack
>> >> trace now, but when I got the stack trace the first time, the error was
>> >> coming from somewhere in memorypool.allocate or a similar method. If I get
>> >> the stack trace again, I will share it with you.
>> >> However, the topic the store is reading from is empty, so I am not sure
>> >> why the GlobalKTable is trying to occupy a lot of space. The pod memory
>> >> request and limit are 500 MiB and 750 MiB respectively, so the state
>> >> store should fit into this memory, I believe, since the topic is empty.
>> >> Can you provide inputs on this?
>> >>
>> >>
>> >> On Wed, May 27, 2020 at 2:17 PM Pushkar Deole <pdeole2...@gmail.com> wrote:
>> >>
>> >>> Ok... got it... Is there any hook that I can attach to the GlobalKTable
>> >>> or global store? What I mean is that I want to know when the global
>> >>> store is updated with data from the topic; in that case the hook that I
>> >>> specified should be invoked so I can do some activity like logging. This
>> >>> will allow me to know how long the global store took to sync up with the
>> >>> topic after the event has been put on the topic.
>> >>>
>> >>> On Tue, May 26, 2020 at 10:58 PM Matthias J. Sax <mj...@apache.org> wrote:
>> >>>
>> >>>> For example it could be some "static" information, like a mapping from
>> >>>> zip code to city name.
>> >>>>
>> >>>> Something that usually does not change over time.
>> >>>>
>> >>>>
>> >>>> -Matthias
>> >>>>
>> >>>> On 5/25/20 9:55 PM, Pushkar Deole wrote:
>> >>>>> Matthias,
>> >>>>>
>> >>>>> I am wondering what you mean by "Global stores hold "auxiliary" data
>> >>>>> that is provided from "outside" of the app".
>> >>>>>
>> >>>>> Would you be able to give an example use case for what you mean by
>> >>>>> auxiliary data provided from outside the app?
>> >>>>>
>> >>>>> On Sat, May 2, 2020 at 1:58 AM Matthias J. Sax <mj...@apache.org> wrote:
>> >>>>>
>> >>>>>> Both stores serve a different purpose.
>> >>>>>>
>> >>>>>> Regular stores allow you to store state the application computes.
>> >>>>>> Writing into the changelog is a fault-tolerance mechanism.
>> >>>>>>
>> >>>>>> Global stores hold "auxiliary" data that is provided from "outside" of
>> >>>>>> the app. There is no changelog topic, but only the input topic (that is
>> >>>>>> used to re-create the global state).
>> >>>>>>
>> >>>>>> Local stores are sharded and updates are "sync" as they don't need to
>> >>>>>> be shared with anybody else.
>> >>>>>>
>> >>>>>> For global stores, as all instances need to be updated, updates are
>> >>>>>> async (we don't know when which instance will update its own global
>> >>>>>> store replica).
>> >>>>>>
>> >>>>>>>> Say one stream thread updates the topic for global store and starts
>> >>>>>>>> processing next event wherein the processor tries to read the global
>> >>>>>>>> store which may not have been synced with the topic?
>> >>>>>>
>> >>>>>> Correct. There is no guarantee when the update to the global store
>> >>>>>> will be applied. As said, global stores are not designed to hold data
>> >>>>>> the application computes.
>> >>>>>>
>> >>>>>>
>> >>>>>> -Matthias
>> >>>>>>
>> >>>>>>
>> >>>>>> On 4/30/20 11:11 PM, Pushkar Deole wrote:
>> >>>>>>> thanks... will try with GlobalKTable.
>> >>>>>>> As a side question, I didn't really understand the significance of
>> >>>>>>> the global state store, which kind of works in the reverse way to a
>> >>>>>>> local state store, i.e. the local state store is updated and then
>> >>>>>>> saved to the changelog topic, whereas in the case of a global state
>> >>>>>>> store the topic is updated first and then synced to the global state
>> >>>>>>> store. Do these two work in sync, i.e. the update to the topic and
>> >>>>>>> the global state store?
>> >>>>>>>
>> >>>>>>> Say one stream thread updates the topic for global store and starts
>> >>>>>>> processing next event wherein the processor tries to read the global
>> >>>>>>> store which may not have been synced with the topic?
>> >>>>>>>
>> >>>>>>> On Fri, May 1, 2020 at 3:35 AM Matthias J. Sax <mj...@apache.org> wrote:
>> >>>>>>>
>> >>>>>>>> Yes.
>> >>>>>>>>
>> >>>>>>>> A `GlobalKTable` uses a global store internally.
>> >>>>>>>>
>> >>>>>>>> You can also use `StreamsBuilder.addGlobalStore()` or
>> >>>>>>>> `Topology.addGlobalStore()` to add a global store "manually".
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> -Matthias
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> On 4/30/20 7:42 AM, Pushkar Deole wrote:
>> >>>>>>>>> Thanks Matthias.
>> >>>>>>>>> Can you elaborate on the replicated caching layer part?
>> >>>>>>>>> When you say global stores, do you mean a GlobalKTable created from
>> >>>>>>>>> a topic, e.g. using the StreamsBuilder.globalTable(String topic) method?
>> >>>>>>>>>
>> >>>>>>>>> On Thu, Apr 30, 2020 at 12:44 PM Matthias J. Sax <mj...@apache.org> wrote:
>> >>>>>>>>>
>> >>>>>>>>>> It's not possible to modify a state store from "outside".
>> >>>>>>>>>>
>> >>>>>>>>>> If you want to build a "replicated caching layer", you could use
>> >>>>>>>>>> global stores and write into the corresponding topics to update all
>> >>>>>>>>>> stores. Of course, those updates would be async.
>> >>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>> -Matthias
>> >>>>>>>>>>
>> >>>>>>>>>> On 4/29/20 10:52 PM, Pushkar Deole wrote:
>> >>>>>>>>>>> Hi All,
>> >>>>>>>>>>>
>> >>>>>>>>>>> I am wondering if this is possible: I have been asked to use state
>> >>>>>>>>>>> stores as a general replicated cache among multiple instances of a
>> >>>>>>>>>>> service. However, the state store would be created through
>> >>>>>>>>>>> StreamsBuilder but not actually modified through the stream
>> >>>>>>>>>>> processor topology; it is to be modified from outside the stream
>> >>>>>>>>>>> topology. So, essentially, the state store is just to be created
>> >>>>>>>>>>> from StreamsBuilder and then used as an application-level cache
>> >>>>>>>>>>> that will get replicated between application instances. Is this
>> >>>>>>>>>>> possible using state stores?
>> >>>>>>>>>>>
>> >>>>>>>>>>> Secondly, if possible, is this a good design approach?
>> >>>>>>>>>>>
>> >>>>>>>>>>> Appreciate your response since I don't know the internals of state
>> >>>>>>>>>>> stores.