Hi Kostas,
thanks for your quick response.
I also thought about using Async IO; I just need to figure out how to
correctly handle the parallelism and the number of async requests.
That's probably the way to go, though. Is it also possible to set a number
of retry attempts and a backoff for when an async request fails (for example
because the server is too busy)?
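
To make the retry question concrete, this is roughly the behaviour I have in
mind, written by hand in plain Java (no Flink API involved). It is only a
sketch: RetryWithBackoff, retry, backoffMillis and all their parameters are
names I made up for illustration, and the request is assumed to be any call
that returns a CompletableFuture.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class RetryWithBackoff {

    // Delay before retry number `attempt` (0-based): base * 2^attempt, capped at maxMillis.
    static long backoffMillis(long baseMillis, int attempt, long maxMillis) {
        long delay = baseMillis << Math.min(attempt, 20); // bound the shift to avoid overflow
        return Math.min(delay, maxMillis);
    }

    // Runs `request`; on failure re-runs it up to `maxRetries` more times,
    // waiting an exponentially growing delay between attempts.
    static <T> CompletableFuture<T> retry(Supplier<CompletableFuture<T>> request,
                                          int maxRetries, long baseMillis, long maxMillis,
                                          ScheduledExecutorService scheduler) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(request, 0, maxRetries, baseMillis, maxMillis, scheduler, result);
        return result;
    }

    private static <T> void attempt(Supplier<CompletableFuture<T>> request, int n,
                                    int maxRetries, long baseMillis, long maxMillis,
                                    ScheduledExecutorService scheduler,
                                    CompletableFuture<T> result) {
        CompletableFuture<T> future;
        try {
            future = request.get();
        } catch (RuntimeException e) {
            // Treat a synchronous failure like a failed future.
            future = new CompletableFuture<>();
            future.completeExceptionally(e);
        }
        future.whenComplete((value, error) -> {
            if (error == null) {
                result.complete(value);
            } else if (n >= maxRetries) {
                result.completeExceptionally(error); // give up, propagate last error
            } else {
                scheduler.schedule(
                    () -> attempt(request, n + 1, maxRetries, baseMillis, maxMillis,
                                  scheduler, result),
                    backoffMillis(baseMillis, n, maxMillis), TimeUnit.MILLISECONDS);
            }
        });
    }
}
```

With something like this, the async call would complete its result only once
retry(...) completes, so a busy server would just delay the element instead of
failing it immediately. I don't know whether Async IO offers this out of the
box, hence the question.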

For the second part, I think it's OK to persist the state into RocksDB or
HDFS. My question is indeed about that: is it safe to start reading (with
another Flink job) from RocksDB or HDFS while state updates may still be
pending on it? Should I ensure that state updates are not possible until the
other Flink job has finished reading the persisted data?

And another question: I've tried to draft such a process and basically I
have the following code:

DataStream<MyGroupedObj> groupedObj = tuples.keyBy(0)
        .flatMap(new RichFlatMapFunction<Tuple4, MyGroupedObj>() {

          private transient ValueState<MyGroupedObj> state;

          @Override
          public void flatMap(Tuple4 t, Collector<MyGroupedObj> out) throws Exception {
            MyGroupedObj current = state.value();
            if (current == null) {
              current = new MyGroupedObj();
            }
            // ... merge t into current (omitted) ...
            state.update(current);
            out.collect(current); // emits the updated object on every update
          }

          @Override
          public void open(Configuration config) {
            ValueStateDescriptor<MyGroupedObj> descriptor =
                      new ValueStateDescriptor<>(
                          "groupedObj",        // state name (placeholder)
                          MyGroupedObj.class); // state type
            state = getRuntimeContext().getState(descriptor);
          }
        });

but obviously this way I emit the updated object on every update while,
actually, I just want to persist the ValueState somehow (and make it
available to another job that runs, for example, once a month). Is that
possible?
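
What I have in mind is something like the following, sketched in plain Java
rather than with Flink state (SnapshotFlusher, update, flush and the sink are
hypothetical names, and String keys/values stand in for my real types). Updates
only touch the in-memory state; nothing is emitted until flush() is called
(e.g. from a timer), and the reader receives an immutable copy, so later
updates cannot change what it is reading.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class SnapshotFlusher {

    // Grouped values per key; updated in place on every incoming element.
    private final Map<String, List<String>> state = new HashMap<>();

    // Receives each snapshot, e.g. a writer to external storage (assumption).
    private final Consumer<Map<String, List<String>>> sink;

    public SnapshotFlusher(Consumer<Map<String, List<String>>> sink) {
        this.sink = sink;
    }

    // Called for every incoming element: updates state, emits nothing.
    public synchronized void update(String key, String value) {
        state.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    // Called periodically: copies the state under the lock, then hands the
    // read-only copy to the sink outside the lock so updates are not blocked
    // while the sink writes.
    public void flush() {
        Map<String, List<String>> copy = new HashMap<>();
        synchronized (this) {
            for (Map.Entry<String, List<String>> e : state.entrySet()) {
                copy.put(e.getKey(),
                         Collections.unmodifiableList(new ArrayList<>(e.getValue())));
            }
        }
        sink.accept(Collections.unmodifiableMap(copy));
    }
}
```

The copy on every flush is obviously not free for large state, so this only
shows the behaviour I'm after, not how I'd expect it to be implemented.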

On Tue, May 16, 2017 at 5:57 PM, Kostas Kloudas <k.klou...@data-artisans.com
> wrote:

> Hi Flavio,
> From what I understand, for the first part you are correct. You can use
> Flink's internal state to keep your enriched data.
> In fact, if you are also querying an external system to enrich your data,
> it is worth looking at the AsyncIO feature:
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/asyncio.html
> Now for the second part, currently in Flink you cannot iterate over all
> registered keys for which you have state. A pointer
> that may be useful to look at is the queryable state:
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/queryable_state.html
> This is still an experimental feature, but let us know your opinion if you
> use it.
> Finally, an alternative would be to keep state in Flink, and periodically
> flush it to an external storage system, which you can
> query at will.
> Thanks,
> Kostas
> On May 16, 2017, at 4:38 PM, Flavio Pompermaier <pomperma...@okkam.it>
> wrote:
> Hi to all,
> we're still playing with the streaming part of Flink in order to see
> whether it can improve our current batch pipeline.
> At the moment, we have a job that translates incoming data (as Row) into
> Tuple4, groups them together by the first field and persists the result to
> disk (using a thrift object). When we need to add tuples to those grouped
> objects we need to read the persisted data again, flatten it back to Tuple4,
> union it with the new tuples, re-group by key and finally persist.
> This is very expensive to do with batch computation, while it should be
> pretty straightforward to do with streaming (from what I understood): I
> just need to use ListState. Right?
> Then, let's say I need to scan all the data of the stateful computation
> (key and values), in order to do some other computation, I'd like to know:
>    - how to do that? I.e. create a DataSet/DataSource<Key,Value> from the
>    stateful data in the stream
>    - is there any problem with accessing the stateful data without stopping
>    incoming data (and thus possible updates to the states)?
> Thanks in advance for the support,
> Flavio

Flavio Pompermaier
Development Department

OKKAM S.r.l.
Tel. +(39) 0461 1823908
