I think it might be possible to do, but I’m not aware of anyone working on that and I haven’t seen anyone on the mailing lists express interest in it.
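There is no built-in synchronization primitive for this, but when both jobs are submitted from the same driver program, the "wait for one job to finish before starting the next one" workaround mentioned above is straightforward, because execute() blocks until the submitted job has finished (in the default attached mode). A minimal sketch, with defineFirstJob/defineSecondJob as hypothetical placeholders for the two pipelines:

    import org.apache.flink.api.java.ExecutionEnvironment;

    public class SequentialJobs {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            defineFirstJob(env);          // add the sources/sinks of the first job here
            env.execute("first job");     // blocks until the first job has finished

            defineSecondJob(env);         // add the follow-up pipeline here
            env.execute("second job");    // submitted only after the first job completed
        }

        // hypothetical placeholders for the two pipelines
        private static void defineFirstJob(ExecutionEnvironment env) { /* ... */ }
        private static void defineSecondJob(ExecutionEnvironment env) { /* ... */ }
    }

This only helps when the first input is bounded (a batch/DataSet job); it does not pause a running datastream, which is what the question below is really about.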
> On 16. Jun 2017, at 11:31, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>
> Ok thanks for the clarification. Do you think it could be possible (sooner or
> later) to have in Flink some sort of synchronization between jobs (as in this
> case, where the input datastream should be "paused" until the second job
> finishes)? I know I could use something like Oozie or Falcon to orchestrate
> jobs but I'd prefer to avoid adding them to our architecture..
>
> Best,
> Flavio
>
> On Fri, Jun 16, 2017 at 11:23 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
> Hi,
>
> I’m afraid not. You would have to wait for one job to finish before starting
> the next one.
>
> Best,
> Aljoscha
>
>> On 15. Jun 2017, at 20:11, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>
>> Hi Aljoscha,
>> we're still investigating possible solutions here. Yes, as you correctly
>> said, there are links between data of different keys, so we can only proceed
>> with the next job once we are 100% sure that all input data has been
>> consumed and no other data will be read until this last job ends.
>> There should be some sort of synchronization between these 2 jobs... is that
>> possible right now in Flink?
>>
>> Thanks a lot for the support,
>> Flavio
>>
>> On Thu, Jun 15, 2017 at 12:16 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
>> Hi,
>>
>> Trying to revive this somewhat older thread: have you made any progress? I
>> think going with a ProcessFunction that keeps all your state internally and
>> periodically outputs to, say, Elasticsearch using a sink seems like the way
>> to go. You can do the periodic emission using timers in the ProcessFunction.
>>
>> In your use case, does the data you would store in the Flink managed state
>> have links between data of different keys? This sounds like it could be a
>> problem when it comes to consistency when outputting to an external system.
>>
>> Best,
>> Aljoscha
>>
>>> On 17. May 2017, at 14:12, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>
>>> Hi to all,
>>> there are a lot of useful discussion points :)
>>>
>>> I'll try to answer to everybody.
>>>
>>> @Ankit:
>>> right now we're using Parquet on HDFS to store thrift objects. Those
>>> objects are essentially structured like:
>>>   - key
>>>   - alternative_key
>>>   - list of tuples (representing the state of my Object)
>>> This model could potentially be modeled as a Monoid and it's very well
>>> suited for a stateful streaming computation, where updates to a single
>>> key's state are not as expensive as calling a db to get the current list
>>> of tuples and writing the updated list back for every update (IMHO). Maybe
>>> here I'm overestimating Flink streaming capabilities...
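A minimal sketch of how this per-key state, combined with the periodic emission via timers that Aljoscha suggests above, could look. It assumes the ProcessFunction API of Flink 1.3+ (where ProcessFunction is a rich function, so keyed state can be set up in open()); MyGroupedObj, the Tuple4<String, String, String, String> element type and the daily flush interval are placeholders:

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.tuple.Tuple4;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;

    public class PeriodicFlushFunction
            extends ProcessFunction<Tuple4<String, String, String, String>, MyGroupedObj> {

        private static final long ONE_DAY_MS = 24 * 60 * 60 * 1000L;

        private transient ValueState<MyGroupedObj> state;

        @Override
        public void open(Configuration config) {
            state = getRuntimeContext().getState(new ValueStateDescriptor<>(
                "grouped", TypeInformation.of(MyGroupedObj.class)));
        }

        @Override
        public void processElement(Tuple4<String, String, String, String> t,
                                   Context ctx, Collector<MyGroupedObj> out) throws Exception {
            MyGroupedObj current = state.value();
            if (current == null) {
                current = new MyGroupedObj();
                // first tuple for this key: schedule the first periodic flush
                ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + ONE_DAY_MS);
            }
            current.addTuple(t);    // cheap local update, no call to an external db
            state.update(current);  // note: nothing is emitted here
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<MyGroupedObj> out)
                throws Exception {
            MyGroupedObj current = state.value();
            if (current != null) {
                out.collect(current);  // periodic emission, e.g. towards an Elasticsearch sink
            }
            // schedule the next flush for this key
            ctx.timerService().registerProcessingTimeTimer(timestamp + ONE_DAY_MS);
        }
    }

Applied to a keyed stream of tuples (same shape as in the draft code further down in the thread):

    DataStream<MyGroupedObj> periodic = tuples.keyBy(0).process(new PeriodicFlushFunction());

processElement() only touches local state, while onTimer() controls how often the accumulated object is pushed downstream, so the update path stays cheap and the external index only sees periodic snapshots.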
>>> Serialization should be ok using thrift, but Flink advises to use tuples
>>> for better performance, so just after reading the data from disk (as a
>>> ThriftObject) we convert it to its equivalent Tuple3<String, String,
>>> List<Tuple4>> representation.
>>> Since I currently use Flink to ingest data that (in the end) means adding
>>> tuples to my objects, it would be perfect to have an "online" state of the
>>> grouped tuples in order to:
>>>   - add/remove tuples to my object very quickly
>>>   - from time to time, scan the whole online data (or a part of it) and
>>>     "translate" it into one or more JSON indices (and put them into
>>>     Elasticsearch)
>>>
>>> @Fabian:
>>> You're right that batch processes are not very well suited to work with
>>> services that can fail... if a remote call in a map function fails, the
>>> whole batch job fails. This should be less problematic with streaming
>>> because there's checkpointing, and with async IO it should be possible to
>>> add some retry/backoff policies in order to not overload remote services
>>> like db or solr/es indices (maybe it's not already there but it should be
>>> possible to add). Am I wrong?
>>>
>>> @Kostas:
>>>
>>> From what I understood, Queryable State is useful for gets... what if I
>>> need to scan the entire db? For us it could be better to periodically dump
>>> the state to RocksDB or HDFS but, as I already said, I'm not sure if it is
>>> safe to start a batch job that reads the dumped data while, in the
>>> meantime, a possible update of this dump could happen... is there any
>>> potential problem for data consistency (indeed tuples within grouped
>>> objects have references to other objects' keys)?
>>>
>>> Best,
>>> Flavio
>>>
>>> On Wed, May 17, 2017 at 10:18 AM, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>>> Hi Flavio,
>>>
>>> For setting the retries, unfortunately there is no such setting yet and, if
>>> I am not wrong, in case of a failure of a request, an exception will be
>>> thrown and the job will restart. I am also including Till in the thread as
>>> he may know better.
>>>
>>> For consistency guarantees and concurrency control, this depends on your
>>> underlying backend. But if you want to have end-to-end control, then you
>>> could do as Ankit suggested at his point 3), i.e. have a single job for the
>>> whole pipeline (if this fits your needs of course). This will allow you to
>>> set your own “precedence” rules for your operations.
>>>
>>> Now finally, there is currently no way to expose the state of a job to
>>> another job. The way to do so is either Queryable State, or writing to a
>>> Sink. If the problem with having one job is that you emit one element at a
>>> time, you can always group elements together and emit downstream less
>>> often, in batches.
>>>
>>> Finally, if you need 2 jobs, you can always use a hybrid solution where you
>>> keep your current state in Flink, and you dump it to a Sink that is
>>> queryable, once per week for example. The Sink can then be queried at any
>>> time, and data will be at most one week old.
>>>
>>> Thanks,
>>> Kostas
>>>
>>>> On May 17, 2017, at 9:35 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>
>>>> Hi Ankit, just a brief comment on the "batch job is easier than streaming
>>>> job" argument. I'm not sure about that.
>>>> I can see that just the batch job might seem easier to implement, but this
>>>> is only one part of the whole story.
>>>> The operational side of using batch is more complex IMO.
>>>> You need a tool to ingest your stream, you need storage for the ingested
>>>> data, you need a periodic scheduler to kick off your batch job, and you
>>>> need to take care of failures if something goes wrong.
>>>> In the streaming case, this is not needed or the framework does it for you.
>>>>
>>>> Just my 2 cents, Fabian
>>>>
>>>> 2017-05-16 20:58 GMT+02:00 Jain, Ankit <ankit.j...@here.com>:
>>>> Hi Flavio,
>>>>
>>>> While you wait on an update from Kostas, I wanted to understand the
>>>> use-case better and share my thoughts-
>>>>
>>>> 1) Why is the current batch mode expensive? Where are you persisting the
>>>> data after updates? The way I see it, by moving to Flink you get to use
>>>> RocksDB (a key-value store) that makes your lookups faster – probably
>>>> right now you are using a non-indexed store like S3 maybe?
>>>>
>>>> So the gain is coming from moving to a persistence store better suited to
>>>> your use-case rather than from batch->streaming. Maybe consider just going
>>>> with a different data store.
>>>>
>>>> IMHO, streaming should only be used if you really want to act on the new
>>>> events in real-time. It is generally harder to get a streaming job correct
>>>> than a batch one.
>>>>
>>>> 2) If the current setup is expensive due to serialization-deserialization,
>>>> then that should be fixed by moving to a faster format (maybe AVRO? – I
>>>> don’t have a lot of expertise in that). I don’t see how that problem will
>>>> go away with Flink – so you still need to handle serialization.
>>>>
>>>> 3) Even if you do decide to move to Flink – I think you can do this with
>>>> one job, two jobs are not needed. At every incoming event, check the
>>>> previous state and update/output to Kafka or whatever data store you are
>>>> using.
>>>>
>>>> Thanks
>>>> Ankit
>>>>
>>>> From: Flavio Pompermaier <pomperma...@okkam.it>
>>>> Date: Tuesday, May 16, 2017 at 9:31 AM
>>>> To: Kostas Kloudas <k.klou...@data-artisans.com>
>>>> Cc: user <user@flink.apache.org>
>>>> Subject: Re: Stateful streaming question
>>>>
>>>> Hi Kostas,
>>>> thanks for your quick response.
>>>> I also thought about using Async IO, I just need to figure out how to
>>>> correctly handle parallelism and the number of async requests.
>>>> However, that's probably the way to go... is it possible also to set a
>>>> number of retry attempts/backoff when the async request fails (maybe due
>>>> to a too busy server)?
>>>>
>>>> For the second part I think it's ok to persist the state into RocksDB or
>>>> HDFS, my question is indeed about that: is it safe to start reading (with
>>>> another Flink job) from RocksDB or HDFS while having an updatable state
>>>> "pending" on it? Should I ensure that state updates are not possible until
>>>> the other Flink job has finished reading the persisted data?
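On the retry/backoff question above: there is no retry setting built into the async I/O operator (as noted earlier in the thread), but retries can be hand-rolled inside the AsyncFunction itself. A rough sketch, assuming the Flink 1.2/1.3-era async API (asyncInvoke with an AsyncCollector; later versions use a ResultFuture instead) and a hypothetical non-blocking client whose query() returns a CompletableFuture<String>; imports are omitted:

    class RetryingAsyncRequest extends RichAsyncFunction<String, String> {

        private static final int MAX_ATTEMPTS = 3;
        private static final long BASE_BACKOFF_MS = 500;

        private transient AsyncClient client;                  // hypothetical non-blocking client
        private transient ScheduledExecutorService retryExecutor;

        @Override
        public void open(Configuration parameters) {
            // client = ...;  create/connect the hypothetical async client here
            retryExecutor = Executors.newSingleThreadScheduledExecutor();
        }

        @Override
        public void asyncInvoke(String key, AsyncCollector<String> collector) {
            attempt(key, collector, 1);
        }

        private void attempt(String key, AsyncCollector<String> collector, int attemptNo) {
            client.query(key).whenComplete((result, error) -> {
                if (error == null) {
                    collector.collect(Collections.singleton(result));   // success
                } else if (attemptNo < MAX_ATTEMPTS) {
                    // back off and retry; the operator's overall timeout still applies
                    retryExecutor.schedule(
                        () -> attempt(key, collector, attemptNo + 1),
                        BASE_BACKOFF_MS * attemptNo, TimeUnit.MILLISECONDS);
                } else {
                    collector.collect(error);                           // give up
                }
            });
        }

        @Override
        public void close() {
            retryExecutor.shutdown();
        }
    }

    // usage, with "keys" as a hypothetical input DataStream<String>:
    DataStream<String> enriched = AsyncDataStream.unorderedWait(
        keys, new RetryingAsyncRequest(), 30000, TimeUnit.MILLISECONDS, 100);

Failed records still fail the job once the attempts are exhausted (or the operator timeout fires), so this only smooths over transient overload of the remote service.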
>>>> And another question... I've tried to draft such a process and basically
>>>> I have the following code:
>>>>
>>>> DataStream<MyGroupedObj> groupedObj = tuples.keyBy(0)
>>>>   .flatMap(new RichFlatMapFunction<Tuple4, MyGroupedObj>() {
>>>>
>>>>     private transient ValueState<MyGroupedObj> state;
>>>>
>>>>     @Override
>>>>     public void flatMap(Tuple4 t, Collector<MyGroupedObj> out) throws Exception {
>>>>       MyGroupedObj current = state.value();
>>>>       if (current == null) {
>>>>         current = new MyGroupedObj();
>>>>       }
>>>>       ....
>>>>       current.addTuple(t);
>>>>       ...
>>>>       state.update(current);
>>>>       out.collect(current);
>>>>     }
>>>>
>>>>     @Override
>>>>     public void open(Configuration config) {
>>>>       ValueStateDescriptor<MyGroupedObj> descriptor =
>>>>         new ValueStateDescriptor<>("test", TypeInformation.of(MyGroupedObj.class));
>>>>       state = getRuntimeContext().getState(descriptor);
>>>>     }
>>>>   });
>>>>
>>>> groupedObj.print();
>>>>
>>>> but obviously this way I emit the updated object on every update while,
>>>> actually, I just want to persist the ValueState somehow (and make it
>>>> available to another job that runs once a month, for example). Is that
>>>> possible?
>>>>
>>>> On Tue, May 16, 2017 at 5:57 PM, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>>>> Hi Flavio,
>>>>
>>>> From what I understand, for the first part you are correct. You can use
>>>> Flink’s internal state to keep your enriched data.
>>>> In fact, if you are also querying an external system to enrich your data,
>>>> it is worth looking at the AsyncIO feature:
>>>>
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/asyncio.html
>>>>
>>>> Now for the second part, currently in Flink you cannot iterate over all
>>>> registered keys for which you have state. A pointer that may be useful to
>>>> look at is the queryable state:
>>>>
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/queryable_state.html
>>>>
>>>> This is still an experimental feature, but let us know your opinion if you
>>>> use it.
>>>>
>>>> Finally, an alternative would be to keep the state in Flink, and
>>>> periodically flush it to an external storage system, which you can query
>>>> at will.
>>>>
>>>> Thanks,
>>>> Kostas
>>>>
>>>> On May 16, 2017, at 4:38 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>
>>>> Hi to all,
>>>> we're still playing with the Flink streaming part in order to see whether
>>>> it can improve our current batch pipeline.
>>>> At the moment, we have a job that translates incoming data (as Row) into
>>>> Tuple4, groups it by the first field and persists the result to disk
>>>> (using a thrift object). When we need to add tuples to those grouped
>>>> objects we need to read the persisted data again, flatten it back to
>>>> Tuple4, union it with the new tuples, re-group by key and finally persist.
>>>> This is very expensive to do with batch computation while it should be
>>>> pretty straightforward to do with streaming (from what I understood): I
>>>> just need to use ListState. Right?
>>>> Then, let's say I need to scan all the data of the stateful computation
>>>> (keys and values) in order to do some other computation. I'd like to know:
>>>>   - how to do that? I.e. create a DataSet/DataSource<Key,Value> from the
>>>>     stateful data in the stream
>>>>   - is there any problem accessing the stateful data without stopping
>>>>     incoming data (and thus possible updates to the state)?
>>>>
>>>> Thanks in advance for the support,
>>>> Flavio
>>>>
>>>> --
>>>> Flavio Pompermaier
>>>> Development Department
>>>> OKKAM S.r.l.
>>>> Tel. +(39) 0461 1823908