I think it might be possible to do but I’m not aware of anyone working on that 
and I haven’t seen anyone on the mailing lists express interest in working on 
that.

> On 16. Jun 2017, at 11:31, Flavio Pompermaier <pomperma...@okkam.it> wrote:
> 
> Ok thanks for the clarification. Do you think it could be possible (sooner or 
> later) to have in Flink some sort of synchronization between jobs (as in this 
> case where the input datastream should be "paused" until the second job 
> finishes)? I know I coould use something like Oozie or Falcon to orchestrate 
> jobs but I'd prefer to avoid to add them to our architecture..
> 
> Best,
> Flavio
> 
> On Fri, Jun 16, 2017 at 11:23 AM, Aljoscha Krettek <aljos...@apache.org 
> <mailto:aljos...@apache.org>> wrote:
> Hi,
> 
> I’m afraid not. You would have to wait for one job to finish before starting 
> the next one.
> 
> Best,
> Aljoscha
>> On 15. Jun 2017, at 20:11, Flavio Pompermaier <pomperma...@okkam.it 
>> <mailto:pomperma...@okkam.it>> wrote:
>> 
>> Hi Aljoscha,
>> we're still investigating possible solutions here. Yes, as you correctly 
>> said there are links between data of different keys so we can only proceed 
>> with the next job only once we are sure at 100% that all input data has been 
>> consumed and no other data will be read until this last jobs ends.
>> There should be some sort of synchronization between these 2 jobs...is that 
>> possible right now in Flink?
>> 
>> Thanks a lot for the support,
>> Flavio
>> 
>> On Thu, Jun 15, 2017 at 12:16 PM, Aljoscha Krettek <aljos...@apache.org 
>> <mailto:aljos...@apache.org>> wrote:
>> Hi,
>> 
>> Trying to revive this somewhat older thread: have you made any progress? I 
>> think going with a ProcessFunction that keeps all your state internally and 
>> periodically outputs to, say, Elasticsearch using a sink seems like the way 
>> to go? You can do the periodic emission using timers in the ProcessFunction. 
>> 
>> In your use case, does the data you would store in the Flink managed state 
>> have links between data of different keys? This sounds like it could be a 
>> problem when it comes to consistency when outputting to an external system.
>> 
>> Best,
>> Aljoscha
>> 
>>> On 17. May 2017, at 14:12, Flavio Pompermaier <pomperma...@okkam.it 
>>> <mailto:pomperma...@okkam.it>> wrote:
>>> 
>>> Hi to all,
>>> there are a lot of useful discussion points :)
>>> 
>>> I'll try to answer to everybody.
>>> 
>>> @Ankit: 
>>> right now we're using Parquet on HDFS to store thrift objects. Those 
>>> objects are essentially structured like
>>> key
>>> alternative_key
>>> list of tuples (representing the state of my Object)
>>> This model could be potentially modeled as a Monoid and it's very well 
>>> suited for a stateful streaming computation where updates to a single key 
>>> state are not as expansive as a call to any db to get the current list of 
>>> tuples and update back that list with for an update (IMHO). Maybe here I'm 
>>> overestimating Flink streaming capabilities...
>>> serialization should be ok using thrift, but Flink advice to use tuples to 
>>> have better performance so just after reading the data from disk (as a 
>>> ThriftObject) we convert them to its equivalent representation as 
>>> Tuple3<String, String, List<Tuple4>> version
>>> Since I currently use Flink to ingest data that (in the end) means adding 
>>> tuples to my objects, it would be perfect to have an "online" state of the 
>>> grouped tuples in order to:
>>> add/remove tuples to my object very quickly
>>> from time to time, scan the whole online data (or a part of it) and 
>>> "translate" it into one ore more JSON indices (and put them into 
>>> Elasticsearch)
>>> @Fabian:
>>> You're right that batch processes are bot very well suited to work with 
>>> services that can fail...if in a map function the remote call fails all the 
>>> batch job fails...this should be less problematic with streaming because 
>>> there's checkpointing and with async IO  is should be the possibile to add 
>>> some retry/backoff policies in order to not overload remote services like 
>>> db or solr/es indices (maybe it's not already there but it should be 
>>> possible to add). Am I wrong?
>>> 
>>> @Kostas:
>>> 
>>> From what I understood Queryable state is usefult for gets...what if I need 
>>> to scan the entire db? For us it could be better do periodically dump the 
>>> state to RocksDb or HDFS but, as I already said, I'm not sure if it is safe 
>>> to start a batch job that reads the dumped data while, in the meantime, a 
>>> possible update of this dump could happen...is there any potential problem 
>>> to data consistency (indeed tuples within grouped objects have references 
>>> to other objects keys)?
>>> 
>>> Best,
>>> Flavio
>>> 
>>> On Wed, May 17, 2017 at 10:18 AM, Kostas Kloudas 
>>> <k.klou...@data-artisans.com <mailto:k.klou...@data-artisans.com>> wrote:
>>> Hi Flavio,
>>> 
>>> For setting the retries, unfortunately there is no such setting yet and, if 
>>> I am not wrong, in case of a failure of a request, 
>>> an exception will be thrown and the job will restart. I am also including 
>>> Till in the thread as he may know better.
>>> 
>>> For consistency guarantees and concurrency control, this depends on your 
>>> underlying backend. But if you want to 
>>> have end-to-end control, then you could do as Ankit suggested at his point 
>>> 3), i.e have a single job for the whole pipeline
>>>  (if this fits your needs of course). This will allow you to set your own 
>>> “precedence” rules for your operations.
>>> 
>>> Now finally, there is no way currently to expose the state of a job to 
>>> another job. The way to do so is either Queryable
>>> State, or writing to a Sink. If the problem for having one job is that you 
>>> emit one element at a time, you can always group
>>> elements together and emit downstream less often, in batches.
>>>  
>>> Finally, if  you need 2 jobs, you can always use a hybrid solution where 
>>> you keep your current state in Flink, and you dump it 
>>> to a Sink that is queryable once per week for example. The Sink then can be 
>>> queried at any time, and data will be at most one 
>>> week old.
>>> 
>>> Thanks,
>>> Kostas
>>> 
>>>> On May 17, 2017, at 9:35 AM, Fabian Hueske <fhue...@gmail.com 
>>>> <mailto:fhue...@gmail.com>> wrote:
>>>> 
>>>> Hi Ankit, just a brief comment on the batch job is easier than streaming 
>>>> job argument. I'm not sure about that. 
>>>> I can see that just the batch job might seem easier to implement, but this 
>>>> is only one part of the whole story. The operational side of using batch 
>>>> is more complex IMO. 
>>>> You need a tool to ingest your stream, you need storage for the ingested 
>>>> data, you need a periodic scheduler to kick of your batch job, and you 
>>>> need to take care of failures if something goes wrong. 
>>>> The streaming case, this is not needed or the framework does it for you.
>>>> 
>>>> Just my 2 cents, Fabian
>>>> 
>>>> 2017-05-16 20:58 GMT+02:00 Jain, Ankit <ankit.j...@here.com 
>>>> <mailto:ankit.j...@here.com>>:
>>>> Hi Flavio,
>>>> 
>>>> While you wait on an update from Kostas, wanted to understand the use-case 
>>>> better and share my thoughts-
>>>> 
>>>>  
>>>> 
>>>> 1)       Why is current batch mode expensive? Where are you persisting the 
>>>> data after updates? Way I see it by moving to Flink, you get to use 
>>>> RocksDB(a key-value store) that makes your lookups faster – probably right 
>>>> now you are using a non-indexed store like S3 maybe?
>>>> 
>>>> So, gain is coming from moving to a better persistence store suited to 
>>>> your use-case than from batch->streaming. Myabe consider just going with a 
>>>> different data store.
>>>> 
>>>> IMHO, stream should only be used if you really want to act on the new 
>>>> events in real-time. It is generally harder to get a streaming job correct 
>>>> than a batch one.
>>>> 
>>>>  
>>>> 
>>>> 2)       If current setup is expensive due to 
>>>> serialization-deserialization then that should be fixed by moving to a 
>>>> faster format (maybe AVRO? - I don’t have a lot of expertise in that). I 
>>>> don’t see how that problem will go away with Flink – so still need to 
>>>> handle serialization.
>>>> 
>>>>  
>>>> 
>>>> 3)       Even if you do decide to move to Flink – I think you can do this 
>>>> with one job, two jobs are not needed. At every incoming event, check the 
>>>> previous state and update/output to kafka or whatever data store you are 
>>>> using.
>>>> 
>>>>  
>>>> 
>>>>  
>>>> 
>>>> Thanks
>>>> 
>>>> Ankit
>>>> 
>>>>  
>>>> 
>>>> From: Flavio Pompermaier <pomperma...@okkam.it 
>>>> <mailto:pomperma...@okkam.it>>
>>>> Date: Tuesday, May 16, 2017 at 9:31 AM
>>>> To: Kostas Kloudas <k.klou...@data-artisans.com 
>>>> <mailto:k.klou...@data-artisans.com>>
>>>> Cc: user <user@flink.apache.org <mailto:user@flink.apache.org>>
>>>> Subject: Re: Stateful streaming question
>>>> 
>>>>  
>>>> 
>>>> Hi Kostas,
>>>> 
>>>> thanks for your quick response. 
>>>> 
>>>> I also thought about using Async IO, I just need to figure out how to 
>>>> correctly handle parallelism and number of async requests. 
>>>> 
>>>> However that's probably the way to go..is it possible also to set a number 
>>>> of retry attempts/backoff when the async request fails (maybe due to a too 
>>>> busy server)?
>>>> 
>>>>  
>>>> 
>>>> For the second part I think it's ok to persist the state into RocksDB or 
>>>> HDFS, my question is indeed about that: is it safe to start reading (with 
>>>> another Flink job) from RocksDB or HDFS having an updatable state 
>>>> "pending" on it? Should I ensure that state updates are not possible until 
>>>> the other Flink job hasn't finish to read the persisted data?
>>>> 
>>>>  
>>>> 
>>>> And another question...I've tried to draft such a processand basically I 
>>>> have the following code:
>>>> 
>>>>  
>>>> 
>>>> DataStream<MyGroupedObj> groupedObj = tuples.keyBy(0)
>>>> 
>>>>         .flatMap(new RichFlatMapFunction<Tuple4, MyGroupedObj>() {
>>>> 
>>>>  
>>>> 
>>>>           private transient ValueState<MyGroupedObj> state;
>>>> 
>>>>  
>>>> 
>>>>           @Override
>>>> 
>>>>           public void flatMap(Tuple4 t, Collector<MyGroupedObj> out) 
>>>> throws Exception {
>>>> 
>>>>             MyGroupedObj current = state.value();
>>>> 
>>>>             if (current == null) {
>>>> 
>>>>               current = new MyGroupedObj();
>>>> 
>>>>             }
>>>> 
>>>>             ....
>>>> 
>>>>            current.addTuple(t);
>>>> 
>>>>             ... 
>>>> 
>>>>             state.update(current);
>>>> 
>>>>             out.collect(current);
>>>> 
>>>>           }
>>>> 
>>>>           
>>>> 
>>>>           @Override
>>>> 
>>>>           public void open(Configuration config) {
>>>> 
>>>>             ValueStateDescriptor<MyGroupedObj> descriptor =
>>>> 
>>>>                       new ValueStateDescriptor<>( 
>>>> "test",TypeInformation.of(MyGroupedObj.class));
>>>> 
>>>>               state = getRuntimeContext().getState(descriptor);
>>>> 
>>>>           }
>>>> 
>>>>         });
>>>> 
>>>>     groupedObj.print();
>>>> 
>>>>  
>>>> 
>>>> but obviously this way I emit the updated object on every update while, 
>>>> actually, I just want to persist the ValueState somehow (and make it 
>>>> available to another job that runs one/moth for example). Is that possible?
>>>> 
>>>>  
>>>> 
>>>>  
>>>> 
>>>> On Tue, May 16, 2017 at 5:57 PM, Kostas Kloudas 
>>>> <k.klou...@data-artisans.com <mailto:k.klou...@data-artisans.com>> wrote:
>>>> 
>>>> Hi Flavio,
>>>> 
>>>>  
>>>> 
>>>> From what I understand, for the first part you are correct. You can use 
>>>> Flink’s internal state to keep your enriched data.
>>>> 
>>>> In fact, if you are also querying an external system to enrich your data, 
>>>> it is worth looking at the AsyncIO feature:
>>>> 
>>>>  
>>>> 
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/asyncio.html
>>>>  
>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/asyncio.html>
>>>>  
>>>> 
>>>> Now for the second part, currently in Flink you cannot iterate over all 
>>>> registered keys for which you have state. A pointer 
>>>> 
>>>> to look at the may be useful is the queryable state:
>>>> 
>>>>  
>>>> 
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/queryable_state.html
>>>>  
>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/queryable_state.html>
>>>>  
>>>> 
>>>> This is still an experimental feature, but let us know your opinion if you 
>>>> use it.
>>>> 
>>>>  
>>>> 
>>>> Finally, an alternative would be to keep state in Flink, and periodically 
>>>> flush it to an external storage system, which you can
>>>> 
>>>> query at will.
>>>> 
>>>>  
>>>> 
>>>> Thanks,
>>>> 
>>>> Kostas
>>>> 
>>>>  
>>>> 
>>>>  
>>>> 
>>>> On May 16, 2017, at 4:38 PM, Flavio Pompermaier <pomperma...@okkam.it 
>>>> <mailto:pomperma...@okkam.it>> wrote:
>>>> 
>>>>  
>>>> 
>>>> Hi to all,
>>>> 
>>>> we're still playing with Flink streaming part in order to see whether it 
>>>> can improve our current batch pipeline.
>>>> 
>>>> At the moment, we have a job that translate incoming data (as Row) into 
>>>> Tuple4, groups them together by the first field and persist the result to 
>>>> disk (using a thrift object). When we need to add tuples to those grouped 
>>>> objects we need to read again the persisted data, flat it back to Tuple4, 
>>>> union with the new tuples, re-group by key and finally persist.
>>>> 
>>>>  
>>>> 
>>>> This is very expansive to do with batch computation while is should pretty 
>>>> straightforward to do with streaming (from what I understood): I just need 
>>>> to use ListState. Right?
>>>> 
>>>> Then, let's say I need to scan all the data of the stateful computation 
>>>> (key and values), in order to do some other computation, I'd like to know:
>>>> 
>>>> how to do that? I.e. create a DataSet/DataSource<Key,Value> from the 
>>>> stateful data in the stream
>>>> is there any problem to access the stateful data without stopping incoming 
>>>> data (and thus possible updates to the states)?
>>>> Thanks in advance for the support,
>>>> 
>>>> Flavio
>>>> 
>>>>  
>>>> 
>>>>  
>>>> 
>>>> 
>>>> 
>>>> 
>>>>  
>>>> 
>>>> --
>>>> 
>>>> Flavio Pompermaier
>>>> Development Department
>>>> 
>>>> OKKAM S.r.l.
>>>> Tel. +(39) 0461 1823908 <tel:+39%200461%20182%203908>
>>> 
>>> 
>> 
>> 
> 
> 
> 

Reply via email to