Hi Aljoscha,
So basically, I am reading events from a kafka topic. These events have corresponding eventIds and a list of modes. *Job1* 1. When I first read from the kafka topic, I key by the eventId's and use a processfuntion to create a state named "events". Also, the list of modes are used to generate keys along with some other values from the original event. So in the end we store (list of modekeys, event, timestamp) inside our state. For ex: For eventId 1 -> ([key1,key2,key3], event1, timestamp) This function also traverses over the list of generated keys and a series of values are collected which is then sent to the downstream operator. For ex: (key1, event1) (key2, event2) (key3, event3) 2. The values which were collected above is then used for this second step, where I key by the keys sent by the upstream operator. And then in another process function, I create a second state called "matchkeys". Here, we access old value for the key and add the new value to the list (if available or create a new list) and update the state. For ex. Fetch key1 -> ([eventid1, eventid5], timestamp) Add value of current eventId21 for the key key1 Update the state for key1 -> ([eventid1, eventid5, eventid21], newTimestamp) >From here we collect (event) and send to downstream operator This is how the 2 states are created and managed. Now, the event sent from upstream is sent over a kafka topic from where our Job 2 starts reading. *Job2* It reads the events from the kafka topic where the events were sinked form Job1 (We could've read directly from the original topic, but we need synchronization because otherwise we wouldn't find the events in the states) 1. So after reading from the kafka topic, the list of keys are generated from the modes again. But this time while traversing over the keys, the state "matchkeys" is accessed to get a list of eventId's, which in turn would be used to access the "events" state for a list of keys. This recursion would happen only till a depth of 1, after which all the fetched events would be consolidated and merged(as they refer to the same txn) and this "super" event with values from all the related events is sent over a different topic. I am not really sure how clear was I in explaining the situation, but let me know if you need any further information. Also, I am curious to know why using QueryableStateClient within the same job which creates the state lead to inconsistencies. Isn't the state client only capable of reading data without modifying it? Thanks for all help, btw :) Best Regards, Biplob -- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Getting-JobManager-address-and-port-within-a-running-job-tp14656p14780.html Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.