Aljoscha's approach is probably better, but to answer your questions...

> How do you send a request from one Flink job to another?

All of our Flink jobs communicate over Kafka. The main Flink job listens to both a "live" Kafka source and a "historical" Kafka source, and the historical Flink job listens to a "request" Kafka source. When the main job gets an event for a key it has no state for, it writes a request to the "request" topic. The historical job reads the request, grabs the relevant old events from GCS, and writes them to the "historical" Kafka topic. The "historical" and "live" sources are merged and proceed through the main Flink job as one stream.
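The request/reply loop above can be simulated in a few lines of plain Python. This is not Flink or Kafka code: `Topic` is a hypothetical in-memory stand-in for a Kafka topic, the `archive` dict stands in for GCS, and all function names are made up for illustration. The historical side flags the last event it replays; sending a flagged empty marker when a key has no history is an extra assumption of this sketch, so the main job is never left waiting.

```python
from collections import deque
from dataclasses import dataclass
from typing import Optional


@dataclass
class Event:
    key: str
    payload: Optional[str]
    source: str = "live"  # "live" or "historical"
    last: bool = False    # set by the historical job on the final replayed event


class Topic:
    """Hypothetical in-memory stand-in for a Kafka topic (a simple FIFO queue)."""

    def __init__(self):
        self._messages = deque()

    def send(self, message):
        self._messages.append(message)

    def poll(self):
        return self._messages.popleft() if self._messages else None


def request_history_if_unknown(event, known_keys, request_topic):
    """Main job: write the key to the request topic the first time it is seen."""
    if event.key not in known_keys:
        known_keys.add(event.key)
        request_topic.send(event.key)


def serve_requests(request_topic, historical_topic, archive):
    """Historical job: replay archived events for every pending request.

    `archive` (key -> list of payloads) stands in for GCS. The final replayed
    event is flagged; a key with no history gets a flagged empty marker.
    """
    while (key := request_topic.poll()) is not None:
        payloads = archive.get(key, [])
        if not payloads:
            historical_topic.send(Event(key, None, "historical", last=True))
            continue
        for i, payload in enumerate(payloads):
            is_last = i == len(payloads) - 1
            historical_topic.send(Event(key, payload, "historical", last=is_last))


def merged(live_topic, historical_topic):
    """Main job: read both sources as one stream (historical first, for simplicity)."""
    while True:
        event = historical_topic.poll() or live_topic.poll()
        if event is None:
            return
        yield event


# Usage: two live events for keys the main job has never seen.
live_topic, request_topic, historical_topic = Topic(), Topic(), Topic()
known_keys = set()
archive = {"item-1": ["v1", "v2"]}  # "item-2" has no history at all

for event in (Event("item-1", "v3"), Event("item-2", "w1")):
    request_history_if_unknown(event, known_keys, request_topic)
    live_topic.send(event)

serve_requests(request_topic, historical_topic, archive)
stream = list(merged(live_topic, historical_topic))
print([(e.key, e.payload, e.source, e.last) for e in stream])
```

In a real Flink job the merge would more likely be a `union` of the two Kafka source streams, which interleaves records in no guaranteed order; draining the historical topic first is only a simplification here.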

> How do you handle the switchover between the live stream and the historical stream? Do you somehow block the live stream source and detect when the historical data source is no longer emitting new elements?

When the main job sends a request to the historical job, it starts storing any events that come in for that key. Historical events are processed immediately as they arrive. The historical Flink job flags the last event it sends, so when the main Flink job sees the flagged event, it knows it has caught up to where it was when it sent the request. It can then process the events it stored, and once that is done it is caught up to the live stream, so it can stop storing events for that key and process them as normal. Keep in mind that this is the dangerous part I was talking about: memory in the main job keeps growing until all the "historical" events have been processed.

> In my case I would want the Flink state to always contain the latest state of every item (except when the job first starts and there's a period of time where it's rebuilding its state from the Kafka log).

You could absolutely do it by reading from the beginning of a Kafka topic. We use GCS because it is really cheap storage, and we are not planning on keeping events on the Kafka topic forever.

> Since I would have everything needed to rebuild the state persisted in a Kafka topic, I don't think I would need a second Flink job for this?

We use a second Flink job because we didn't want to block a Flink task slot while a single key catches up. Our key domain is much larger than our number of task slots, so a single task slot handles multiple keys. If you go with the single-job approach (which might be the right approach for you guys), any other keys on that task slot will be blocked until that one key's state has been rebuilt.
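The per-key switchover described above (store live events for a key until the flagged historical event arrives, then process the stored events) can be sketched as a small state machine. Again this is a plain-Python simulation with hypothetical names (`CatchUpProcessor`, `send_request`), not the actual job; in Flink the two dicts would presumably become keyed state on the merged stream. `buffered_count()` makes the memory risk visible: it grows with every live event that arrives while a key is catching up.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List, Optional


@dataclass
class Event:
    key: str
    payload: Optional[str]
    source: str = "live"  # "live" or "historical"
    last: bool = False    # flag the historical job puts on its final event


class CatchUpProcessor:
    """Per-key switchover logic, simulated outside Flink.

    - First live event for an unknown key: send a request, start buffering.
    - Historical events: applied immediately.
    - Flagged historical event: apply it, then apply the buffered live events
      in arrival order and stop buffering for that key.
    """

    def __init__(self, send_request: Callable[[str], None]):
        self.send_request = send_request
        self.state: Dict[str, List[str]] = {}     # key -> applied payloads
        self.buffers: Dict[str, List[Event]] = {}  # keys currently catching up

    def _apply(self, event: Event) -> None:
        if event.payload is not None:  # None marks "no history for this key"
            self.state.setdefault(event.key, []).append(event.payload)

    def process(self, event: Event) -> None:
        key = event.key
        if event.source == "historical":
            self._apply(event)
            if event.last and key in self.buffers:
                for buffered in self.buffers.pop(key):
                    self._apply(buffered)
            return
        if key in self.buffers:        # still catching up: hold the live event
            self.buffers[key].append(event)
        elif key not in self.state:    # unknown key: request history, start buffering
            self.buffers[key] = [event]
            self.send_request(key)
        else:                          # caught up: process normally
            self._apply(event)

    def buffered_count(self) -> int:
        """Live events currently held in memory (the part that can grow)."""
        return sum(len(b) for b in self.buffers.values())


requests = []
proc = CatchUpProcessor(send_request=requests.append)
proc.process(Event("item-1", "v3"))                           # unknown key: request + buffer
proc.process(Event("item-1", "v4"))                           # still catching up: buffer
peak_buffered = proc.buffered_count()                         # 2 live events held
proc.process(Event("item-1", "v1", "historical"))             # applied immediately
proc.process(Event("item-1", "v2", "historical", last=True))  # flagged: drain buffer
proc.process(Event("item-1", "v5"))                           # caught up: normal path
print(proc.state["item-1"])  # ['v1', 'v2', 'v3', 'v4', 'v5']
```

The sketch assumes the archive holds exactly the events that precede the first buffered live event; duplicates or gaps at that boundary are not handled here.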

Hope that helps,

On Fri, Jul 29, 2016 at 5:27 AM, Josh <jof...@gmail.com> wrote:
> Hi Jason,
>
> Thanks for the reply - I didn't quite understand all of it though!
>
> > it sends a request to the historical flink job for the old data
> How do you send a request from one Flink job to another?
>
> > It continues storing the live events until all the events from the historical job have been processed, then it processes the stored events, and finally starts processing the live stream again.
> How do you handle the switchover between the live stream and the historical stream? Do you somehow block the live stream source and detect when the historical data source is no longer emitting new elements?
>
> > So in your case it looks like what you could do is send a request to the "historical" job whenever you get an item that you don't yet have the current state of.
> In my case I would want the Flink state to always contain the latest state of every item (except when the job first starts and there's a period of time where it's rebuilding its state from the Kafka log). Since I would have everything needed to rebuild the state persisted in a Kafka topic, I don't think I would need a second Flink job for this?
>
> Thanks,
> Josh
>
> On Thu, Jul 28, 2016 at 6:57 PM, Jason Brelloch <jb.bc....@gmail.com> wrote:
>> Hey Josh,
>>
>> The way we replay historical data is we have a second Flink job that listens to the same live stream, and stores every single event in Google Cloud Storage.
>>
>> When the main Flink job that is processing the live stream gets a request for a specific data set that it has not been processing yet, it sends a request to the historical flink job for the old data. The live job then starts storing relevant events from the live stream in state.
>> It continues storing the live events until all the events from the historical job have been processed, then it processes the stored events, and finally starts processing the live stream again.
>>
>> As long as it's properly keyed (we key on the specific data set), it doesn't block anything, keeps everything ordered, and eventually catches up. It also allows us to completely blow away state and rebuild it from scratch.
>>
>> So in your case it looks like what you could do is send a request to the "historical" job whenever you get an item that you don't yet have the current state of.
>>
>> The potential problems you may have are that it may not be possible to store every single historical event, and that you need to make sure there is enough memory to handle the ever-increasing state size while the historical events are being replayed (and make sure to clear the state when it is done).
>>
>> It's a little complicated, and pretty expensive, but it works. Let me know if something doesn't make sense.
>>
>> On Thu, Jul 28, 2016 at 1:14 PM, Josh <jof...@gmail.com> wrote:
>>> Hi all,
>>>
>>> I was wondering what approaches people usually take with reprocessing data with Flink - specifically the case where you want to upgrade a Flink job, and make it reprocess historical data before continuing to process a live stream.
>>>
>>> I'm wondering if we can do something similar to the 'simple rewind' or 'parallel rewind' which Samza uses to solve this problem, discussed here:
>>> https://samza.apache.org/learn/documentation/0.10/jobs/reprocessing.html
>>>
>>> Having used Flink over the past couple of months, the main issue I've had involves Flink's internal state - from my experience it seems it is easy to break the state when upgrading a job, or when changing the parallelism of operators, plus there's no easy way to view/access an internal key-value state from outside Flink.
>>>
>>> For an example of what I mean, consider a Flink job which consumes a stream of 'updates' to items, and maintains a key-value store of items within Flink's internal state (e.g. in RocksDB). The job also writes the updated items to a Kafka topic:
>>>
>>> http://oi64.tinypic.com/34q5opf.jpg
>>>
>>> My worry with this is that the state in RocksDB could be lost or become incompatible with an updated version of the job. If this happens, we need to be able to rebuild Flink's internal key-value store in RocksDB. So I'd like to be able to do something like this (which I believe is the Samza solution):
>>>
>>> http://oi67.tinypic.com/219ri95.jpg
>>>
>>> Has anyone done something like this already with Flink? If so are there any examples of how to do this replay & switchover (rebuild state by consuming from a historical log, then switch over to processing the live stream)?
>>>
>>> Thanks for any insights,
>>> Josh
>>
>> --
>> *Jason Brelloch* | Product Developer
>> 3405 Piedmont Rd. NE, Suite 325, Atlanta, GA 30305
>> <http://www.bettercloud.com/>
>> Subscribe to the BetterCloud Monitor
>> <https://www.bettercloud.com/monitor?utm_source=bettercloud_email&utm_medium=email_signature&utm_campaign=monitor_launch>
>> -
>> Get IT delivered to your inbox