Hi,

I’m afraid not. You would have to wait for one job to finish before starting 
the next one.

Best,
Aljoscha
> On 15. Jun 2017, at 20:11, Flavio Pompermaier <pomperma...@okkam.it> wrote:
> 
> Hi Aljoscha,
> we're still investigating possible solutions here. Yes, as you correctly said 
> there are links between data of different keys so we can only proceed with 
> the next job only once we are sure at 100% that all input data has been 
> consumed and no other data will be read until this last jobs ends.
> There should be some sort of synchronization between these 2 jobs...is that 
> possible right now in Flink?
> 
> Thanks a lot for the support,
> Flavio
> 
> On Thu, Jun 15, 2017 at 12:16 PM, Aljoscha Krettek <aljos...@apache.org 
> <mailto:aljos...@apache.org>> wrote:
> Hi,
> 
> Trying to revive this somewhat older thread: have you made any progress? I 
> think going with a ProcessFunction that keeps all your state internally and 
> periodically outputs to, say, Elasticsearch using a sink seems like the way 
> to go? You can do the periodic emission using timers in the ProcessFunction. 
> 
> In your use case, does the data you would store in the Flink managed state 
> have links between data of different keys? This sounds like it could be a 
> problem when it comes to consistency when outputting to an external system.
> 
> Best,
> Aljoscha
> 
>> On 17. May 2017, at 14:12, Flavio Pompermaier <pomperma...@okkam.it 
>> <mailto:pomperma...@okkam.it>> wrote:
>> 
>> Hi to all,
>> there are a lot of useful discussion points :)
>> 
>> I'll try to answer to everybody.
>> 
>> @Ankit: 
>> right now we're using Parquet on HDFS to store thrift objects. Those objects 
>> are essentially structured like
>> key
>> alternative_key
>> list of tuples (representing the state of my Object)
>> This model could be potentially modeled as a Monoid and it's very well 
>> suited for a stateful streaming computation where updates to a single key 
>> state are not as expansive as a call to any db to get the current list of 
>> tuples and update back that list with for an update (IMHO). Maybe here I'm 
>> overestimating Flink streaming capabilities...
>> serialization should be ok using thrift, but Flink advice to use tuples to 
>> have better performance so just after reading the data from disk (as a 
>> ThriftObject) we convert them to its equivalent representation as 
>> Tuple3<String, String, List<Tuple4>> version
>> Since I currently use Flink to ingest data that (in the end) means adding 
>> tuples to my objects, it would be perfect to have an "online" state of the 
>> grouped tuples in order to:
>> add/remove tuples to my object very quickly
>> from time to time, scan the whole online data (or a part of it) and 
>> "translate" it into one ore more JSON indices (and put them into 
>> Elasticsearch)
>> @Fabian:
>> You're right that batch processes are bot very well suited to work with 
>> services that can fail...if in a map function the remote call fails all the 
>> batch job fails...this should be less problematic with streaming because 
>> there's checkpointing and with async IO  is should be the possibile to add 
>> some retry/backoff policies in order to not overload remote services like db 
>> or solr/es indices (maybe it's not already there but it should be possible 
>> to add). Am I wrong?
>> 
>> @Kostas:
>> 
>> From what I understood Queryable state is usefult for gets...what if I need 
>> to scan the entire db? For us it could be better do periodically dump the 
>> state to RocksDb or HDFS but, as I already said, I'm not sure if it is safe 
>> to start a batch job that reads the dumped data while, in the meantime, a 
>> possible update of this dump could happen...is there any potential problem 
>> to data consistency (indeed tuples within grouped objects have references to 
>> other objects keys)?
>> 
>> Best,
>> Flavio
>> 
>> On Wed, May 17, 2017 at 10:18 AM, Kostas Kloudas 
>> <k.klou...@data-artisans.com <mailto:k.klou...@data-artisans.com>> wrote:
>> Hi Flavio,
>> 
>> For setting the retries, unfortunately there is no such setting yet and, if 
>> I am not wrong, in case of a failure of a request, 
>> an exception will be thrown and the job will restart. I am also including 
>> Till in the thread as he may know better.
>> 
>> For consistency guarantees and concurrency control, this depends on your 
>> underlying backend. But if you want to 
>> have end-to-end control, then you could do as Ankit suggested at his point 
>> 3), i.e have a single job for the whole pipeline
>>  (if this fits your needs of course). This will allow you to set your own 
>> “precedence” rules for your operations.
>> 
>> Now finally, there is no way currently to expose the state of a job to 
>> another job. The way to do so is either Queryable
>> State, or writing to a Sink. If the problem for having one job is that you 
>> emit one element at a time, you can always group
>> elements together and emit downstream less often, in batches.
>>  
>> Finally, if  you need 2 jobs, you can always use a hybrid solution where you 
>> keep your current state in Flink, and you dump it 
>> to a Sink that is queryable once per week for example. The Sink then can be 
>> queried at any time, and data will be at most one 
>> week old.
>> 
>> Thanks,
>> Kostas
>> 
>>> On May 17, 2017, at 9:35 AM, Fabian Hueske <fhue...@gmail.com 
>>> <mailto:fhue...@gmail.com>> wrote:
>>> 
>>> Hi Ankit, just a brief comment on the batch job is easier than streaming 
>>> job argument. I'm not sure about that. 
>>> I can see that just the batch job might seem easier to implement, but this 
>>> is only one part of the whole story. The operational side of using batch is 
>>> more complex IMO. 
>>> You need a tool to ingest your stream, you need storage for the ingested 
>>> data, you need a periodic scheduler to kick of your batch job, and you need 
>>> to take care of failures if something goes wrong. 
>>> The streaming case, this is not needed or the framework does it for you.
>>> 
>>> Just my 2 cents, Fabian
>>> 
>>> 2017-05-16 20:58 GMT+02:00 Jain, Ankit <ankit.j...@here.com 
>>> <mailto:ankit.j...@here.com>>:
>>> Hi Flavio,
>>> 
>>> While you wait on an update from Kostas, wanted to understand the use-case 
>>> better and share my thoughts-
>>> 
>>>  
>>> 
>>> 1)       Why is current batch mode expensive? Where are you persisting the 
>>> data after updates? Way I see it by moving to Flink, you get to use 
>>> RocksDB(a key-value store) that makes your lookups faster – probably right 
>>> now you are using a non-indexed store like S3 maybe?
>>> 
>>> So, gain is coming from moving to a better persistence store suited to your 
>>> use-case than from batch->streaming. Myabe consider just going with a 
>>> different data store.
>>> 
>>> IMHO, stream should only be used if you really want to act on the new 
>>> events in real-time. It is generally harder to get a streaming job correct 
>>> than a batch one.
>>> 
>>>  
>>> 
>>> 2)       If current setup is expensive due to serialization-deserialization 
>>> then that should be fixed by moving to a faster format (maybe AVRO? - I 
>>> don’t have a lot of expertise in that). I don’t see how that problem will 
>>> go away with Flink – so still need to handle serialization.
>>> 
>>>  
>>> 
>>> 3)       Even if you do decide to move to Flink – I think you can do this 
>>> with one job, two jobs are not needed. At every incoming event, check the 
>>> previous state and update/output to kafka or whatever data store you are 
>>> using.
>>> 
>>>  
>>> 
>>>  
>>> 
>>> Thanks
>>> 
>>> Ankit
>>> 
>>>  
>>> 
>>> From: Flavio Pompermaier <pomperma...@okkam.it 
>>> <mailto:pomperma...@okkam.it>>
>>> Date: Tuesday, May 16, 2017 at 9:31 AM
>>> To: Kostas Kloudas <k.klou...@data-artisans.com 
>>> <mailto:k.klou...@data-artisans.com>>
>>> Cc: user <user@flink.apache.org <mailto:user@flink.apache.org>>
>>> Subject: Re: Stateful streaming question
>>> 
>>>  
>>> 
>>> Hi Kostas,
>>> 
>>> thanks for your quick response. 
>>> 
>>> I also thought about using Async IO, I just need to figure out how to 
>>> correctly handle parallelism and number of async requests. 
>>> 
>>> However that's probably the way to go..is it possible also to set a number 
>>> of retry attempts/backoff when the async request fails (maybe due to a too 
>>> busy server)?
>>> 
>>>  
>>> 
>>> For the second part I think it's ok to persist the state into RocksDB or 
>>> HDFS, my question is indeed about that: is it safe to start reading (with 
>>> another Flink job) from RocksDB or HDFS having an updatable state "pending" 
>>> on it? Should I ensure that state updates are not possible until the other 
>>> Flink job hasn't finish to read the persisted data?
>>> 
>>>  
>>> 
>>> And another question...I've tried to draft such a processand basically I 
>>> have the following code:
>>> 
>>>  
>>> 
>>> DataStream<MyGroupedObj> groupedObj = tuples.keyBy(0)
>>> 
>>>         .flatMap(new RichFlatMapFunction<Tuple4, MyGroupedObj>() {
>>> 
>>>  
>>> 
>>>           private transient ValueState<MyGroupedObj> state;
>>> 
>>>  
>>> 
>>>           @Override
>>> 
>>>           public void flatMap(Tuple4 t, Collector<MyGroupedObj> out) throws 
>>> Exception {
>>> 
>>>             MyGroupedObj current = state.value();
>>> 
>>>             if (current == null) {
>>> 
>>>               current = new MyGroupedObj();
>>> 
>>>             }
>>> 
>>>             ....
>>> 
>>>            current.addTuple(t);
>>> 
>>>             ... 
>>> 
>>>             state.update(current);
>>> 
>>>             out.collect(current);
>>> 
>>>           }
>>> 
>>>           
>>> 
>>>           @Override
>>> 
>>>           public void open(Configuration config) {
>>> 
>>>             ValueStateDescriptor<MyGroupedObj> descriptor =
>>> 
>>>                       new ValueStateDescriptor<>( 
>>> "test",TypeInformation.of(MyGroupedObj.class));
>>> 
>>>               state = getRuntimeContext().getState(descriptor);
>>> 
>>>           }
>>> 
>>>         });
>>> 
>>>     groupedObj.print();
>>> 
>>>  
>>> 
>>> but obviously this way I emit the updated object on every update while, 
>>> actually, I just want to persist the ValueState somehow (and make it 
>>> available to another job that runs one/moth for example). Is that possible?
>>> 
>>>  
>>> 
>>>  
>>> 
>>> On Tue, May 16, 2017 at 5:57 PM, Kostas Kloudas 
>>> <k.klou...@data-artisans.com <mailto:k.klou...@data-artisans.com>> wrote:
>>> 
>>> Hi Flavio,
>>> 
>>>  
>>> 
>>> From what I understand, for the first part you are correct. You can use 
>>> Flink’s internal state to keep your enriched data.
>>> 
>>> In fact, if you are also querying an external system to enrich your data, 
>>> it is worth looking at the AsyncIO feature:
>>> 
>>>  
>>> 
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/asyncio.html
>>>  
>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/asyncio.html>
>>>  
>>> 
>>> Now for the second part, currently in Flink you cannot iterate over all 
>>> registered keys for which you have state. A pointer 
>>> 
>>> to look at the may be useful is the queryable state:
>>> 
>>>  
>>> 
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/queryable_state.html
>>>  
>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/queryable_state.html>
>>>  
>>> 
>>> This is still an experimental feature, but let us know your opinion if you 
>>> use it.
>>> 
>>>  
>>> 
>>> Finally, an alternative would be to keep state in Flink, and periodically 
>>> flush it to an external storage system, which you can
>>> 
>>> query at will.
>>> 
>>>  
>>> 
>>> Thanks,
>>> 
>>> Kostas
>>> 
>>>  
>>> 
>>>  
>>> 
>>> On May 16, 2017, at 4:38 PM, Flavio Pompermaier <pomperma...@okkam.it 
>>> <mailto:pomperma...@okkam.it>> wrote:
>>> 
>>>  
>>> 
>>> Hi to all,
>>> 
>>> we're still playing with Flink streaming part in order to see whether it 
>>> can improve our current batch pipeline.
>>> 
>>> At the moment, we have a job that translate incoming data (as Row) into 
>>> Tuple4, groups them together by the first field and persist the result to 
>>> disk (using a thrift object). When we need to add tuples to those grouped 
>>> objects we need to read again the persisted data, flat it back to Tuple4, 
>>> union with the new tuples, re-group by key and finally persist.
>>> 
>>>  
>>> 
>>> This is very expansive to do with batch computation while is should pretty 
>>> straightforward to do with streaming (from what I understood): I just need 
>>> to use ListState. Right?
>>> 
>>> Then, let's say I need to scan all the data of the stateful computation 
>>> (key and values), in order to do some other computation, I'd like to know:
>>> 
>>> how to do that? I.e. create a DataSet/DataSource<Key,Value> from the 
>>> stateful data in the stream
>>> is there any problem to access the stateful data without stopping incoming 
>>> data (and thus possible updates to the states)?
>>> Thanks in advance for the support,
>>> 
>>> Flavio
>>> 
>>>  
>>> 
>>>  
>>> 
>>> 
>>> 
>>> 
>>>  
>>> 
>>> --
>>> 
>>> Flavio Pompermaier
>>> Development Department
>>> 
>>> OKKAM S.r.l.
>>> Tel. +(39) 0461 1823908 <tel:+39%200461%20182%203908>
>> 
>> 
> 
> 

Reply via email to