Hi Matthias,

I have configured the GlobalKTable to stream from a topic and the application is working fine. However, during automated build test cases I sometimes get the exception below. I believe this could be caused by a race between the actual topic creation and the service startup, since topic creation may not complete immediately (the API returns a future). My question: is there any property I can set so that the Streams application retries establishing the global store a few times before giving up with an exception?

logger_name=com.avaya.analytics.dsi.DsiMain thread_name=main level=DEBUG level_value=10000
stack_trace:
org.apache.kafka.streams.errors.StreamsException: There are no partitions available for topic analytics-group-cache when initializing global store group-cache-store
    at org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl.topicPartitionsForStore(GlobalStateManagerImpl.java:265)
    at org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl.register(GlobalStateManagerImpl.java:190)
    at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:101)
    at org.apache.kafka.streams.state.internals.InMemoryKeyValueStore.init(InMemoryKeyValueStore.java:58)
    at org.apache.kafka.streams.state.internals.TimestampedKeyValueStoreBuilder$InMemoryTimestampedKeyValueStoreMarker.init(TimestampedKeyValueStoreBuilder.java:100)
    at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:58)
    at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:112)
    at org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl.initialize(GlobalStateManagerImpl.java:136)
    at org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask.initialize(GlobalStateUpdateTask.java:61)
    at org.apache.kafka.streams.processor.internals.GlobalStreamThread$StateConsumer.initialize(GlobalStreamThread.java:229)
    at org.apache.kafka.streams.processor.internals.GlobalStreamThread.initialize(GlobalStreamThread.java:345)
    at org.apache.kafka.streams.processor.internals.GlobalStreamThread.run(GlobalStreamThread.java:270)

On Thu, May 28, 2020 at 1:27 PM Pushkar Deole <pdeole2...@gmail.com> wrote:
> Matthias,
>
> I realized that the actual problem is totally different from what the exception suggests. The problem was that the client was not configured with an SSL truststore while the server is SSL-enabled.
> I also found this open bug on Kafka:
> https://issues.apache.org/jira/browse/KAFKA-4493
> After setting the SSL properties on the Streams application, I am able to get it up and running.
>
> Because of this, the issue is very difficult to debug. The above bug should be fixed as soon as possible, or at least a proper exception should be thrown.
>
> On Wed, May 27, 2020 at 10:59 PM Pushkar Deole <pdeole2...@gmail.com> wrote:
>> Thanks... I will try increasing the memory in case you don't spot anything wrong with the code. Other services also have streams and a GlobalKTable, but they use spring-kafka. I don't think that should matter; it should work with plain kafka-streams code unless I am missing some configuration/setting here.
>>
>> On Wed, May 27, 2020 at 10:26 PM Matthias J. Sax <mj...@apache.org> wrote:
>>> There is no hook. There is only a restore listener, but it is only used during startup when the global store is loaded. It's not used during regular processing.
>>>
>>> Depending on your usage, maybe you can switch to a global store instead of a GlobalKTable? That way, you can implement a custom `Processor` and add a hook manually.
>>>
>>> I don't see anything wrong with your setup. It's unclear if/why the global store would require a lot of memory...
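The restore listener Matthias mentions can at least report when the global store's initial load starts and finishes. A minimal sketch, assuming it is registered with `KafkaStreams#setGlobalStateRestoreListener` before `start()`; the class name, the `restoredRecords` field and the log format are illustrative only, and, as noted above, the callbacks fire during restoration, not for updates applied during regular processing.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateRestoreListener;

// Sketch (hypothetical class): times the startup load of each store partition.
// Register before start(): kafkaStreams.setGlobalStateRestoreListener(new TimingRestoreListener());
// It is NOT invoked for updates applied during regular processing.
class TimingRestoreListener implements StateRestoreListener {
    private final Map<TopicPartition, Long> startNanos = new ConcurrentHashMap<>();
    // Records restored per partition, kept so the result can be inspected.
    final Map<TopicPartition, Long> restoredRecords = new ConcurrentHashMap<>();

    @Override
    public void onRestoreStart(TopicPartition partition, String storeName,
                               long startingOffset, long endingOffset) {
        startNanos.put(partition, System.nanoTime());
        System.out.printf("restore start: store=%s partition=%s offsets=%d..%d%n",
                storeName, partition, startingOffset, endingOffset);
    }

    @Override
    public void onBatchRestored(TopicPartition partition, String storeName,
                                long batchEndOffset, long numRestored) {
        // Progress callback; unused in this sketch.
    }

    @Override
    public void onRestoreEnd(TopicPartition partition, String storeName, long totalRestored) {
        Long start = startNanos.remove(partition);
        long tookMs = start == null ? -1 : (System.nanoTime() - start) / 1_000_000;
        restoredRecords.put(partition, totalRestored);
        System.out.printf("restore end: store=%s partition=%s records=%d took=%d ms%n",
                storeName, partition, totalRestored, tookMs);
    }
}
```

For continuous "when did this update arrive" tracking, Matthias's other suggestion (a custom global-store processor) is the relevant one; the listener only covers the initial load.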
>>>
>>> -Matthias
>>>
>>> On 5/27/20 7:41 AM, Pushkar Deole wrote:
>>>> Matthias,
>>>> I tried with the default store as well but I get the same error. Can you please check if I am initializing the global store the right way:
>>>>
>>>> public void setupGlobalCacheTables(String theKafkaServers) {
>>>>     Properties props = new Properties();
>>>>     props.put(StreamsConfig.APPLICATION_ID_CONFIG, DEFAULT_APPLICATION_ID);
>>>>     props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, theKafkaServers);
>>>>     StreamsBuilder streamsBuilder = new StreamsBuilder();
>>>>     groupCacheTable = streamsBuilder.globalTable(GROUP_CACHE_TOPIC,
>>>>             Consumed.with(Serdes.String(), GroupCacheSerdes.groupCache()),
>>>>             Materialized.as(GROUP_CACHE_STORE_NAME));
>>>>     Topology groupCacheTopology = streamsBuilder.build();
>>>>     kafkaStreams = new KafkaStreams(groupCacheTopology, props);
>>>>     kafkaStreams.start();
>>>>
>>>>     Runtime.getRuntime().addShutdownHook(new Thread(() -> {
>>>>         LOG.info("Stopping the stream");
>>>>         kafkaStreams.close();
>>>>     }));
>>>> }
>>>>
>>>> On Wed, May 27, 2020 at 5:06 PM Pushkar Deole <pdeole2...@gmail.com> wrote:
>>>>> Hi Matthias,
>>>>>
>>>>> By the way, I used the in-memory global store and the service is giving an out-of-memory error during startup. Unfortunately I don't have a stack trace now, but when I got it the first time, the error was coming from somewhere in memorypool.allocate or a similar method. If I get the stack trace again, I will share it with you.
>>>>> However, the topic the store is reading from is empty, so I am not sure why the GlobalKTable is trying to occupy a lot of space. The pod memory request and limit are 500 MiB and 750 MiB respectively, so I believe the state store should fit into this memory since the topic is empty. Can you provide inputs on this?
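According to the May 28 message at the top of this thread, the real cause was that the Streams client had no SSL truststore configured while the brokers are SSL-enabled (KAFKA-4493 is referenced there). A minimal sketch of adding those settings to the properties built in `setupGlobalCacheTables()`; `StreamsSslProps` and `withSsl` are hypothetical names, and the truststore path and password are placeholders. The string keys are the values of `CommonClientConfigs.SECURITY_PROTOCOL_CONFIG`, `SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG` and `SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG`; Kafka Streams passes such unprefixed client settings on to its internal clients, including the global consumer.

```java
import java.util.Properties;

// Sketch (hypothetical helper): adds the client-side SSL settings reported
// missing in this thread. Truststore path and password are placeholders.
final class StreamsSslProps {
    private StreamsSslProps() {}

    static Properties withSsl(Properties base, String truststorePath, String truststorePassword) {
        Properties props = new Properties();
        props.putAll(base); // keep application.id, bootstrap.servers, ...
        // Same keys as CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
        // SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG and SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG.
        props.put("security.protocol", "SSL");
        props.put("ssl.truststore.location", truststorePath);
        props.put("ssl.truststore.password", truststorePassword);
        return props;
    }
}
```

Usage, under the same assumptions: `kafkaStreams = new KafkaStreams(groupCacheTopology, StreamsSslProps.withSsl(props, "/path/to/truststore.jks", "<password>"));`. Copying into a new `Properties` leaves the caller's base settings untouched.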
>>>>>
>>>>>
>>>>> On Wed, May 27, 2020 at 2:17 PM Pushkar Deole <pdeole2...@gmail.com> wrote:
>>>>>> Ok... got it... Is there any hook that I can attach to the GlobalKTable or global store? I want to know when the global store is updated with data from the topic: the hook I specified should be invoked so that I can do some activity like logging. This will allow me to know how long the global store took to sync up with the topic after the event has been put on the topic.
>>>>>>
>>>>>> On Tue, May 26, 2020 at 10:58 PM Matthias J. Sax <mj...@apache.org> wrote:
>>>>>>> For example, it could be some "static" information, like a mapping from zip code to city name.
>>>>>>>
>>>>>>> Something that usually does not change over time.
>>>>>>>
>>>>>>>
>>>>>>> -Matthias
>>>>>>>
>>>>>>> On 5/25/20 9:55 PM, Pushkar Deole wrote:
>>>>>>>> Matthias,
>>>>>>>>
>>>>>>>> I am wondering what you mean by "Global stores hold "auxiliary" data that is provided from "outside" of the app".
>>>>>>>>
>>>>>>>> Could you give an example use case of what you mean by auxiliary data provided from outside the app?
>>>>>>>>
>>>>>>>> On Sat, May 2, 2020 at 1:58 AM Matthias J. Sax <mj...@apache.org> wrote:
>>>>>>>>> The two kinds of stores serve different purposes.
>>>>>>>>>
>>>>>>>>> Regular stores allow you to store state the application computes. Writing into the changelog is a fault-tolerance mechanism.
>>>>>>>>>
>>>>>>>>> Global stores hold "auxiliary" data that is provided from "outside" of the app. There is no changelog topic, only the input topic (which is used to re-create the global state).
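The point that a global store has no changelog and is re-created from its input topic can be illustrated with a toy model. This is plain Java, not Kafka code: `GlobalStoreReplay`, `Rec` and `rebuild` are hypothetical names, and the sketch assumes GlobalKTable-like semantics where a later record for a key overwrites the earlier one and a null value (tombstone) deletes the key. The zip-code data follows Matthias's example.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model (plain Java, not Kafka code): rebuild a global store's contents by
// replaying its input topic from the beginning. A later record for the same
// key overwrites the earlier one; a null value (tombstone) removes the key.
final class GlobalStoreReplay {
    private GlobalStoreReplay() {}

    /** One record of the (simulated) input topic. */
    static final class Rec {
        final String key;
        final String value; // null = tombstone

        Rec(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    static Map<String, String> rebuild(List<Rec> topic) {
        Map<String, String> store = new HashMap<>();
        for (Rec r : topic) {
            if (r.value == null) {
                store.remove(r.key);        // tombstone deletes the key
            } else {
                store.put(r.key, r.value);  // last write per key wins
            }
        }
        return store;
    }
}
```

Replaying `("10115", "Berlin")`, `("80331", "Munich")`, `("10115", "Berlin-Mitte")`, `("80331", null)` leaves a single entry, `10115 -> Berlin-Mitte`. Because the topic is the source of truth, every instance that replays it ends up with the same contents, just not at the same moment.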
>>>>>>>>>
>>>>>>>>> Local stores are sharded, and updates are "sync" as they don't need to be shared with anybody else.
>>>>>>>>>
>>>>>>>>> For global stores, as all instances need to be updated, updates are async (we don't know when each instance will update its own global store replica).
>>>>>>>>>
>>>>>>>>>>> Say one stream thread updates the topic for the global store and starts processing the next event, wherein the processor tries to read the global store, which may not have been synced with the topic?
>>>>>>>>>
>>>>>>>>> Correct. There is no guarantee when the update to the global store will be applied. As said, global stores are not designed to hold data the application computes.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> -Matthias
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On 4/30/20 11:11 PM, Pushkar Deole wrote:
>>>>>>>>>> Thanks... will try with GlobalKTable.
>>>>>>>>>> As a side question, I didn't really understand the significance of the global state store, which kind of works in the reverse way to a local state store: a local state store is updated and then saved to the changelog topic, whereas for a global state store the topic is updated first and then synced to the global state store. Do these two work in sync, i.e. the update to the topic and to the global state store?
>>>>>>>>>>
>>>>>>>>>> Say one stream thread updates the topic for the global store and starts processing the next event, wherein the processor tries to read the global store, which may not have been synced with the topic?
>>>>>>>>>>
>>>>>>>>>> On Fri, May 1, 2020 at 3:35 AM Matthias J. Sax <mj...@apache.org> wrote:
>>>>>>>>>>> Yes.
>>>>>>>>>>>
>>>>>>>>>>> A `GlobalKTable` uses a global store internally.
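Matthias's suggestion of a global store with a custom `Processor` (to get the update hook Pushkar asked about) could look roughly like this. It is a sketch assuming Kafka Streams 3.x, where `StreamsBuilder.addGlobalStore` accepts a `ProcessorSupplier` from `org.apache.kafka.streams.processor.api`; older releases use the older `org.apache.kafka.streams.processor.Processor` interface instead. `GlobalStoreWithHook`, `UpdateHook`, `UpdateWithHook`, the topic and store names and the String serdes are all illustrative.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

// Sketch (assumes Kafka Streams 3.x): a global store added "manually" so that a
// custom state-update processor can call a hook after each applied update.
final class GlobalStoreWithHook {
    private GlobalStoreWithHook() {}

    static final String TOPIC = "analytics-group-cache"; // illustrative
    static final String STORE = "group-cache-store";     // illustrative

    /** Hypothetical callback invoked on the global thread after each update. */
    interface UpdateHook {
        void onUpdate(String key, String value, long recordTimestampMs);
    }

    static final class UpdateWithHook implements Processor<String, String, Void, Void> {
        private final UpdateHook hook;
        private KeyValueStore<String, String> store;

        UpdateWithHook(UpdateHook hook) {
            this.hook = hook;
        }

        @Override
        public void init(ProcessorContext<Void, Void> context) {
            store = context.getStateStore(STORE);
        }

        @Override
        public void process(Record<String, String> record) {
            if (record.key() == null) {
                return; // the store cannot hold null keys; skip such records
            }
            if (record.value() == null) {
                store.delete(record.key()); // tombstone
            } else {
                store.put(record.key(), record.value());
            }
            hook.onUpdate(record.key(), record.value(), record.timestamp());
        }
    }

    static Topology build(UpdateHook hook) {
        StreamsBuilder builder = new StreamsBuilder();
        ProcessorSupplier<String, String, Void, Void> supplier = () -> new UpdateWithHook(hook);
        builder.addGlobalStore(
                Stores.keyValueStoreBuilder(
                                Stores.inMemoryKeyValueStore(STORE), Serdes.String(), Serdes.String())
                        .withLoggingDisabled(), // restored from the input topic, not a changelog
                TOPIC,
                Consumed.with(Serdes.String(), Serdes.String()),
                supplier);
        return builder.build();
    }
}
```

A hook that logs `System.currentTimeMillis() - recordTimestampMs` would approximate the sync delay discussed in this thread, assuming the record timestamp is the producer's create time and clocks are roughly aligned. The hook runs on the global stream thread, so it should stay cheap.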
>>>>>>>>>>>
>>>>>>>>>>> You can also use `StreamsBuilder.addGlobalStore()` or `Topology.addGlobalStore()` to add a global store "manually".
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> -Matthias
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On 4/30/20 7:42 AM, Pushkar Deole wrote:
>>>>>>>>>>>> Thanks Matthias.
>>>>>>>>>>>> Can you elaborate on the replicated caching layer part?
>>>>>>>>>>>> When you say global stores, do you mean a GlobalKTable created from a topic, e.g. using the StreamsBuilder.globalTable(String topic) method?
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Apr 30, 2020 at 12:44 PM Matthias J. Sax <mj...@apache.org> wrote:
>>>>>>>>>>>>> It's not possible to modify a state store from "outside".
>>>>>>>>>>>>>
>>>>>>>>>>>>> If you want to build a "replicated caching layer", you could use global stores and write into the corresponding topics to update all stores. Of course, those updates would be async.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>
>>>>>>>>>>>>> On 4/29/20 10:52 PM, Pushkar Deole wrote:
>>>>>>>>>>>>>> Hi All,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I am wondering if this is possible: I have been asked to use state stores as a general replicated cache among multiple service instances. However, the state store is created through StreamsBuilder but is not actually modified through the stream processor topology; it is to be modified from outside the stream topology. So, essentially, the state store is just created from StreamsBuilder and then used as an application-level cache that gets replicated between application instances. Is this possible using state stores?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Secondly, if possible, is this a good design approach?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I would appreciate your response, since I don't know the internals of state stores.
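Matthias's "replicated caching layer" suggestion means the application never writes to the store directly: it produces to the store's input topic, and every instance's global store picks the change up asynchronously. A minimal sketch with String keys and values; `CacheWriter` is a hypothetical name, and it takes the `Producer` interface so that a `KafkaProducer` (or `MockProducer` in tests) can be passed in.

```java
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Sketch (hypothetical class): "update the cache" by producing to the global
// store's input topic. Every instance's global store applies the change
// asynchronously, so a read right after put()/remove() may still see the old value.
final class CacheWriter {
    private final Producer<String, String> producer; // KafkaProducer in production, MockProducer in tests
    private final String topic;

    CacheWriter(Producer<String, String> producer, String topic) {
        this.producer = producer;
        this.topic = topic;
    }

    void put(String key, String value) {
        producer.send(new ProducerRecord<>(topic, key, value));
    }

    void remove(String key) {
        // A null value is a tombstone; a GlobalKTable deletes the key when it reads it.
        producer.send(new ProducerRecord<>(topic, key, null));
    }
}
```

Because of that asynchrony, code that writes an entry and immediately reads it back from the local global store should not assume it will see the new value, as discussed in this thread.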