Hi Aljoscha,

So basically, I am reading events from a Kafka topic. Each of these events
has a corresponding eventId and a list of modes.

*Job1*
1. When I first read from the Kafka topic, I key by the eventIds and use a
ProcessFunction to create a state named "events". The list of modes is also
used to generate keys, along with some other values from the original
event. So in the end we store (list of modekeys, event, timestamp) inside
our state.

For ex: For eventId 1 -> ([key1,key2,key3], event1, timestamp)

This function also traverses the list of generated keys and collects a
series of (key, event) pairs, which are sent to the downstream operator (a
rough sketch follows the example below).
For ex:
(key1, event1)
(key2, event1)
(key3, event1)
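
Roughly, step 1 looks like the following. This is just a minimal sketch:
Event stands in for our event POJO, and generateKeys() is a placeholder for
our actual key generation from the modes plus other values of the event.

import java.util.List;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// simplified stand-in for our actual event POJO
class Event {
    public String eventId;
    public List<String> modes;

    public String getEventId() { return eventId; }
    public List<String> getModes() { return modes; }
}

public class EventsFunction extends ProcessFunction<Event, Tuple2<String, Event>> {

    // the "events" state, scoped to the eventId we keyed by:
    // (list of mode keys, event, timestamp)
    private transient ValueState<Tuple3<List<String>, Event, Long>> events;

    @Override
    public void open(Configuration parameters) {
        events = getRuntimeContext().getState(new ValueStateDescriptor<>(
                "events",
                TypeInformation.of(new TypeHint<Tuple3<List<String>, Event, Long>>() {})));
    }

    @Override
    public void processElement(Event event, Context ctx,
                               Collector<Tuple2<String, Event>> out) throws Exception {
        List<String> modeKeys = generateKeys(event);

        // store (list of modekeys, event, timestamp) in the "events" state
        events.update(Tuple3.of(modeKeys, event, System.currentTimeMillis()));

        // fan out one (key, event) pair per generated key
        for (String key : modeKeys) {
            out.collect(Tuple2.of(key, event));
        }
    }

    // placeholder: the real keys are built from the modes plus other
    // values of the original event
    private List<String> generateKeys(Event event) {
        return event.getModes();
    }
}

This runs after keying by the eventId, i.e. something like
stream.keyBy(e -> e.getEventId()).process(new EventsFunction()).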

2. The (key, event) pairs collected above are then used for this second
step, where I key by the keys sent by the upstream operator. In another
ProcessFunction, I create a second state called "matchkeys". Here, we fetch
the old value for the key, append the new value to the list (or create a
new list if none exists) and update the state.

For ex.
Fetch key1 -> ([eventId1, eventId5], timestamp)
Append the current eventId21 to the list for key1
Update the state: key1 -> ([eventId1, eventId5, eventId21], newTimestamp)

From here we collect the event and send it to the downstream operator. A
sketch of this second function follows.
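
Again only a sketch, reusing the Event stand-in from above. The
setQueryable() call is there because this is the state we later need to
reach from Job2:

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class MatchKeysFunction extends ProcessFunction<Tuple2<String, Event>, Event> {

    // the "matchkeys" state, scoped to the generated key we keyed by:
    // (list of eventIds, timestamp)
    private transient ValueState<Tuple2<List<String>, Long>> matchKeys;

    @Override
    public void open(Configuration parameters) {
        ValueStateDescriptor<Tuple2<List<String>, Long>> descriptor =
                new ValueStateDescriptor<>(
                        "matchkeys",
                        TypeInformation.of(new TypeHint<Tuple2<List<String>, Long>>() {}));
        // expose the state for external queries
        descriptor.setQueryable("matchkeys");
        matchKeys = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void processElement(Tuple2<String, Event> in, Context ctx,
                               Collector<Event> out) throws Exception {
        // fetch the old value for this key, or start a new list
        Tuple2<List<String>, Long> current = matchKeys.value();
        List<String> eventIds =
                (current == null) ? new ArrayList<String>() : current.f0;

        // append the current eventId and update the state
        eventIds.add(in.f1.getEventId());
        matchKeys.update(Tuple2.of(eventIds, System.currentTimeMillis()));

        out.collect(in.f1);
    }
}

This one runs after keying by the generated key, i.e.
pairs.keyBy(t -> t.f0).process(new MatchKeysFunction()).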

This is how the 2 states are created and managed.

Now, the events collected in step 2 are sent over a Kafka topic, from which
our Job 2 starts reading.

*Job2*
It reads the events from the Kafka topic into which they were sunk from
Job1. (We could have read directly from the original topic, but we need the
synchronization, because otherwise we wouldn't find the events in the
states.)

1. So after reading from the Kafka topic, the list of keys is generated
from the modes again. But this time, while traversing over the keys, the
"matchkeys" state is accessed to get a list of eventIds, which in turn are
used to access the "events" state for their lists of keys. This recursion
only goes down to a depth of 1, after which all the fetched events are
consolidated and merged (as they refer to the same txn), and this "super"
event with values from all the related events is sent over a different
topic. A sketch of the state lookup follows.
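
Assuming the lookup goes through the queryable state client, it would look
roughly like this. This is only a sketch against the newer, proxy-based
client API, where the client talks to a queryable state proxy on a
TaskManager; "proxy-host", the default proxy port 9069, and the job id
handling are placeholders. (In older versions the client is instead
constructed from a Configuration containing the JobManager address, which
is what my original question about the address and port was about.)

import java.util.List;
import java.util.concurrent.CompletableFuture;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.queryablestate.client.QueryableStateClient;

public class MatchKeysLookup {

    // fetch the list of eventIds stored under one generated key in the
    // "matchkeys" state of Job1
    static List<String> lookupEventIds(QueryableStateClient client,
                                       JobID job1Id,
                                       String key) throws Exception {
        ValueStateDescriptor<Tuple2<List<String>, Long>> descriptor =
                new ValueStateDescriptor<>(
                        "matchkeys",
                        TypeInformation.of(new TypeHint<Tuple2<List<String>, Long>>() {}));

        CompletableFuture<ValueState<Tuple2<List<String>, Long>>> result =
                client.getKvState(job1Id, "matchkeys", key,
                        BasicTypeInfo.STRING_TYPE_INFO, descriptor);

        Tuple2<List<String>, Long> value = result.get().value();
        return (value == null) ? null : value.f0;
    }

    public static void main(String[] args) throws Exception {
        // "proxy-host" and the job id (passed as a hex string) are placeholders
        QueryableStateClient client = new QueryableStateClient("proxy-host", 9069);
        List<String> eventIds = lookupEventIds(
                client, JobID.fromHexString(args[0]), "key1");
        System.out.println(eventIds);
    }
}

The same kind of lookup against the "events" state then gives us the stored
(modekeys, event, timestamp) tuples for each of those eventIds.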


I am not really sure how clear I was in explaining the situation, but let
me know if you need any further information.

Also, I am curious to know why using the QueryableStateClient within the
same job that creates the state leads to inconsistencies. Isn't the state
client only capable of reading data, without modifying it?

Thanks for all the help, btw :)

Best Regards,
Biplob


