Re: DataSet: combineGroup/reduceGroup with large number of groups

2017-06-16 Thread Urs Schoenenberger
Hi Fabian,

thanks, that is very helpful indeed - I now understand why the DataSet
drivers insist on sorting the buffers and then processing instead of
keeping state.

In our case, the state should easily fit into the heap of the cluster,
though. In a quick&dirty example I tried just now, the MapPartition
solution outperforms GroupReduce/Combine by a factor of 3; I'm looking
forward to testing this on our real data set soon.

Two things I'd like to clarify:

- Your suggestion of limiting the size of the HashMap in the
MapPartitionFunction is meant to reduce the risks of OOMEs, right? If
I'm confident my state fits into heap, there's no reason to do this?

- With your DataStream suggestion, I can't tell when to schedule a
processing time timer. I would therefore need to use an event time timer
(at Long.MAX_VALUE-1, say), and modify my source to emit a watermark
with Long.MAX_VALUE after it reaches the end of input, correct?
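
Roughly what I have in mind on the source side (untested sketch; RecordT is the placeholder type from my original mail, and hasMoreInput()/nextRecord() stand in for the actual input reading):

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;

public abstract class BoundedRecordSource implements SourceFunction<RecordT> {

    private volatile boolean running = true;

    // placeholders for reading the bounded input
    protected abstract boolean hasMoreInput();
    protected abstract RecordT nextRecord();

    @Override
    public void run(SourceContext<RecordT> ctx) throws Exception {
        while (running && hasMoreInput()) {
            ctx.collect(nextRecord());
        }
        // signal "end of time" so that event-time timers registered at Long.MAX_VALUE - 1 can fire
        ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
    }

    @Override
    public void cancel() {
        running = false;
    }
}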

Thanks,
Urs

On 16.06.2017 17:58, Fabian Hueske wrote:
> Hi Urs,
> 
> on the DataSet API, the only memory-safe way to do it is a
> GroupReduceFunction.
> As you observed, this requires a full sort of the dataset, which can be quite
> expensive, but after the sort the computation is streamed.
> You could also try to manually implement a hash-based combiner using a
> MapPartitionFunction. The function would hold a HashMap keyed on the grouping key,
> with a fixed size that needs to be manually tuned.
> When you have to insert a new record into the HashMap but it has reached the
> max size, you have to evict a record first. Since all of this happens on
> the heap, it won't be memory-safe and might fail with an OOME.
> 
> On the DataStream API you can use a ProcessFunction with keyed ValueState
> for the current AggregateT of each key. For each record you fetch the
> Aggregate from the state and update it.
> To emit the results at the end, you'll need to register a timer, because the
> final aggregates are stored in the local state but never emitted otherwise.
> Another thing to consider is the state backend. You'll probably have to use
> the RocksDBStateBackend to be able to spill state to disk.
> 
> Hope this helps,
> Fabian
> 
> 
> 2017-06-16 17:00 GMT+02:00 Urs Schoenenberger <
> urs.schoenenber...@tngtech.com>:
> 
>> Hi,
>>
>> I'm working on a batch job (roughly 10 billion records of input, 10
>> million groups) that is essentially a 'fold' over each group, that is, I
>> have a function
>>
>> AggregateT addToAggrate(AggregateT agg, RecordT record) {...}
>>
>> and want to fold this over each group in my DataSet.
>>
>> My understanding is that I cannot use .groupBy(0).reduce(...) since the
>> ReduceFunction only supports the case where AggregateT is the same as
>> RecordT.
>>
>> A simple solution using .reduceGroup(...) works, but spills all input
>> data in the reduce step, which produces a lot of slow & expensive Disk IO.
>>
>> Therefore, we tried using .combineGroup(...).reduceGroup(...), but
>> experienced a similar amount of spilling. Checking the source of the
>> *Combine drivers, it seems that they accumulate events in a buffer, sort
>> the buffer by key, and combine adjacent records in the same group. This
>> does not work in my case due to the large number of groups - the records
>> in the buffer are most likely to all belong to different groups. The
>> "combine" phase therefore becomes a noop turning a single RecordT into
>> an AggregateT, and the reduce phase has 10 billion AggregateTs to combine.
>>
>> Is there a way of modelling this computation efficiently with the
>> DataSet API? Alternatively, can I turn this into a DataStream job? (The
>> implementation there would simply be a MapFunction on a KeyedStream with
>> the AggregateT residing in keyed state, although I don't know how I
>> would emit this state at the end of the data stream only.)
>>
>> Thanks,
>> Urs
>>
>> --
>> Urs Schönenberger
>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>>

Re: Using Contextual Data

2017-06-16 Thread Doron Keller
Thank you for your response.

Please let me know if this is doable in CEP, or if you need more information.

Doron

From: "Tzu-Li (Gordon) Tai" mailto:tzuli...@apache.org>>
Date: Sunday, April 16, 2017 at 11:05 PM
To: "user@flink.apache.org" 
mailto:user@flink.apache.org>>, 
"kklou...@apache.org" 
mailto:kklou...@apache.org>>
Subject: Re: Using Contextual Data

Hi,

It seems like you want to load the context tables from external databases when
the CEP operators start, and allow the pattern detection logic to refer to those
tables. I’m not entirely sure if this is possible with the Flink CEP library.
I’ve looped in Klou, who is more familiar with the CEP library and would
probably have more insight on this (it's currently the national holidays, so we
might not be responsive at the moment).

Otherwise, this is definitely achievable using the DataStream API. More
specifically, you would want to implement the rich variants of the functions and load
the contextual tables in `open()`. The occurrence frequency of events would be
kept as registered state in the case you described.
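
For what it's worth, a rough sketch of that approach (untested; Event, Alert, THRESHOLD, loadTableFromDb() and getValue() are illustrative placeholders, and the function is assumed to run on a keyed stream):

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

import java.util.Map;

public abstract class ContextualRule extends RichFlatMapFunction<Event, Alert> {

    private static final long THRESHOLD = 5;              // the "x times" from the question

    private transient Map<String, String> contextTable;   // loaded once per parallel instance
    private transient ValueState<Long> occurrences;       // per-key occurrence counter

    // placeholder: load the contextual table from the external database
    protected abstract Map<String, String> loadTableFromDb() throws Exception;

    @Override
    public void open(Configuration parameters) throws Exception {
        contextTable = loadTableFromDb();
        occurrences = getRuntimeContext().getState(
            new ValueStateDescriptor<>("occurrences", Long.class));
    }

    @Override
    public void flatMap(Event event, Collector<Alert> out) throws Exception {
        Long seen = occurrences.value();
        long updated = (seen == null ? 0L : seen) + 1;
        occurrences.update(updated);
        // trigger when a value from the context table has been seen more than THRESHOLD times
        if (contextTable.containsKey(event.getValue()) && updated > THRESHOLD) {
            out.collect(new Alert(event));
        }
    }
}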

Hope this helps!

Cheers,
Gordon



On 14 April 2017 at 7:41:46 AM, Doron Keller 
(do...@exabeam.com) wrote:
Hello Flink,

Is it possible to use contextual data as part of Event Processing (CEP)?

I would like to create some tables associated with each key. These tables would 
need to be updated as information streams in. The values in these tables would 
also have to be referenced as part of the rule logic. E.g. if a certain value 
was seen more than x times, trigger.

Finally, when the system is starting up these tables would have to be loaded 
from a database.

Thanks,
Doron


confusing RocksDBStateBackend parameters

2017-06-16 Thread Bowen Li
Hello guys,
  I've been trying to figure out differences among several parameters
of RocksDBStateBackend. The confusing parameters are:

  In flink-conf.yaml:
  1. state.backend.fs.checkpointdir
  2. state.backend.rocksdb.checkpointdir
  3. state.checkpoints.dir

  and
  4. the parameter 'checkpointDataUri' that you pass to the RocksDBStateBackend
constructor in `public RocksDBStateBackend(URI checkpointDataUri)`

This email thread explained the first three well. But what's the 4th one for?
What's its difference from the others? I'd appreciate your clarification.
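
For context, this is how I construct the backend (the HDFS path is just an example):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BackendSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // the 4th parameter in question, passed to the constructor
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));
    }
}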

   Thanks very much!
Bowen


Re: DataSet: combineGroup/reduceGroup with large number of groups

2017-06-16 Thread Fabian Hueske
Hi Urs,

on the DataSet API, the only memory-safe way to do it is a
GroupReduceFunction.
As you observed, this requires a full sort of the dataset, which can be quite
expensive, but after the sort the computation is streamed.
You could also try to manually implement a hash-based combiner using a
MapPartitionFunction. The function would hold a HashMap keyed on the grouping key,
with a fixed size that needs to be manually tuned.
When you have to insert a new record into the HashMap but it has reached the
max size, you have to evict a record first. Since all of this happens on
the heap, it won't be memory-safe and might fail with an OOME.
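
A rough sketch of such a manual combiner (untested; RecordT, AggregateT, KeyT, the key extraction, and MAX_SIZE are placeholders, and addToAggrate is the fold function from Urs' mail):

import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.util.Collector;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public abstract class HashCombiner implements MapPartitionFunction<RecordT, AggregateT> {

    private static final int MAX_SIZE = 1_000_000;   // needs manual tuning to the available heap

    // the fold function from the original mail
    protected abstract AggregateT addToAggrate(AggregateT agg, RecordT record);

    @Override
    public void mapPartition(Iterable<RecordT> records, Collector<AggregateT> out) {
        Map<KeyT, AggregateT> partialAggs = new HashMap<>();
        for (RecordT record : records) {
            KeyT key = record.getKey();               // placeholder key extraction
            partialAggs.put(key, addToAggrate(partialAggs.get(key), record));
            if (partialAggs.size() > MAX_SIZE) {
                // evict one (arbitrary) entry to bound heap usage
                Iterator<Map.Entry<KeyT, AggregateT>> it = partialAggs.entrySet().iterator();
                out.collect(it.next().getValue());
                it.remove();
            }
        }
        // flush the remaining partial aggregates; a downstream groupBy/reduceGroup still has to merge them per key
        for (AggregateT agg : partialAggs.values()) {
            out.collect(agg);
        }
    }
}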

On the DataStream API you can use a ProcessFunction with keyed ValueState
for the current AggregateT of each key. For each record you fetch the
Aggregate from the state and update it.
To emit the results at the end, you'll need to register a timer, because the
final aggregates are stored in the local state but never emitted otherwise.
Another thing to consider is the state backend. You'll probably have to use
the RocksDBStateBackend to be able to spill state to disk.
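
A minimal sketch of that DataStream variant (untested; RecordT/AggregateT are the placeholder types from the original mail, and the timer is registered at Long.MAX_VALUE - 1 so that it fires once a final end-of-input watermark arrives):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public abstract class FoldPerKey extends ProcessFunction<RecordT, AggregateT> {

    private transient ValueState<AggregateT> aggState;

    // the fold function from the original mail
    protected abstract AggregateT addToAggrate(AggregateT agg, RecordT record);

    @Override
    public void open(Configuration parameters) {
        aggState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("agg", AggregateT.class));
    }

    @Override
    public void processElement(RecordT record, Context ctx, Collector<AggregateT> out) throws Exception {
        aggState.update(addToAggrate(aggState.value(), record));
        // re-registering the same timestamp for the same key is deduplicated by the timer service
        ctx.timerService().registerEventTimeTimer(Long.MAX_VALUE - 1);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<AggregateT> out) throws Exception {
        // emit the final aggregate for the current key
        out.collect(aggState.value());
    }
}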

Hope this helps,
Fabian


2017-06-16 17:00 GMT+02:00 Urs Schoenenberger <
urs.schoenenber...@tngtech.com>:

> Hi,
>
> I'm working on a batch job (roughly 10 billion records of input, 10
> million groups) that is essentially a 'fold' over each group, that is, I
> have a function
>
> AggregateT addToAggrate(AggregateT agg, RecordT record) {...}
>
> and want to fold this over each group in my DataSet.
>
> My understanding is that I cannot use .groupBy(0).reduce(...) since the
> ReduceFunction only supports the case where AggregateT is the same as
> RecordT.
>
> A simple solution using .reduceGroup(...) works, but spills all input
> data in the reduce step, which produces a lot of slow & expensive Disk IO.
>
> Therefore, we tried using .combineGroup(...).reduceGroup(...), but
> experienced a similar amount of spilling. Checking the source of the
> *Combine drivers, it seems that they accumulate events in a buffer, sort
> the buffer by key, and combine adjacent records in the same group. This
> does not work in my case due to the large number of groups - the records
> in the buffer are most likely to all belong to different groups. The
> "combine" phase therefore becomes a noop turning a single RecordT into
> an AggregateT, and the reduce phase has 10 billion AggregateTs to combine.
>
> Is there a way of modelling this computation efficiently with the
> DataSet API? Alternatively, can I turn this into a DataStream job? (The
> implementation there would simply be a MapFunction on a KeyedStream with
> the AggregateT residing in keyed state, although I don't know how I
> would emit this state at the end of the data stream only.)
>
> Thanks,
> Urs
>
> --
> Urs Schönenberger
> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>


DataSet: combineGroup/reduceGroup with large number of groups

2017-06-16 Thread Urs Schoenenberger
Hi,

I'm working on a batch job (roughly 10 billion records of input, 10
million groups) that is essentially a 'fold' over each group, that is, I
have a function

AggregateT addToAggrate(AggregateT agg, RecordT record) {...}

and want to fold this over each group in my DataSet.

My understanding is that I cannot use .groupBy(0).reduce(...) since the
ReduceFunction only supports the case where AggregateT is the same as
RecordT.

A simple solution using .reduceGroup(...) works, but spills all input
data in the reduce step, which produces a lot of slow & expensive Disk IO.

Therefore, we tried using .combineGroup(...).reduceGroup(...), but
experienced a similar amount of spilling. Checking the source of the
*Combine drivers, it seems that they accumulate events in a buffer, sort
the buffer by key, and combine adjacent records in the same group. This
does not work in my case due to the large number of groups - the records
in the buffer are most likely to all belong to different groups. The
"combine" phase therefore becomes a noop turning a single RecordT into
an AggregateT, and the reduce phase has 10 billion AggregateTs to combine.

Is there a way of modelling this computation efficiently with the
DataSet API? Alternatively, can I turn this into a DataStream job? (The
implementation there would simply be a MapFunction on a KeyedStream with
the AggregateT residing in keyed state, although I don't know how I
would emit this state at the end of the data stream only.)

Thanks,
Urs

-- 
Urs Schönenberger
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082


Re: Flink CEP not emitting timed out events properly

2017-06-16 Thread Biplob Biswas
Hi Kostas,

Thanks for that suggestion, I will try that next. I have out-of-order
events on one of my Kafka topics, and that's why I am using a
BoundedOutOfOrdernessTimestampExtractor(). Now that this doesn't work as
expected, I will try to work with the base class as you suggested.

This behaviour is not at all consistent and not what I was
expecting; I will update with the results I get from my experiments.

Thanks again,
Biplob





--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-CEP-not-emitting-timed-out-events-properly-tp13794p13807.html


Re: Flink CEP not emitting timed out events properly

2017-06-16 Thread Kostas Kloudas
Hi Biplob,

If  you know what you want, you can always write your custom 
AssignerWithPeriodicWatermarks that does your job. If you want
to just increase the watermark, you could simply check if you have
received any elements and if not, emit a watermark with the timestamp
of the previous watermark + X.
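
For example, a rough sketch of such an assigner (untested; MyEvent, getTimestamp(), and the two constants are placeholders, and whether unconditionally advancing the watermark is acceptable depends on your data):

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class AdvancingWatermarkAssigner implements AssignerWithPeriodicWatermarks<MyEvent> {

    private static final long STEP_MS = 1000L;                   // the "X" to advance by when idle
    private static final long MAX_OUT_OF_ORDERNESS_MS = 10_000L;

    private long currentMaxTimestamp = Long.MIN_VALUE + MAX_OUT_OF_ORDERNESS_MS;
    private long lastEmittedWatermark = Long.MIN_VALUE;
    private boolean seenElementSinceLastWatermark = false;

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        long ts = element.getTimestamp();                         // placeholder accessor
        currentMaxTimestamp = Math.max(currentMaxTimestamp, ts);
        seenElementSinceLastWatermark = true;
        return ts;
    }

    @Override
    public Watermark getCurrentWatermark() {
        long next;
        if (seenElementSinceLastWatermark) {
            next = currentMaxTimestamp - MAX_OUT_OF_ORDERNESS_MS;
        } else {
            // no elements since the last watermark: advance it by a fixed step
            next = lastEmittedWatermark + STEP_MS;
        }
        seenElementSinceLastWatermark = false;
        lastEmittedWatermark = Math.max(lastEmittedWatermark, next);
        return new Watermark(lastEmittedWatermark);
    }
}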

Kostas

> On Jun 16, 2017, at 3:28 PM, Biplob Biswas  wrote:
> 
> Hi Kostas,
> 
> Thanks for the reply, makes things a bit more clear.
> 
> Also, I went through this link and it is something similar I am trying to
> observe. 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Listening-to-timed-out-patterns-in-Flink-CEP-td9371.html
> 
> I am checking for timed out events and when I am using event time, its
> behaviour is non-deterministic. For one pattern it's generating a few
> 'matched events' and for a different pattern no 'matched events'. And almost
> no timedout events in any of the patterns unless I run the series of mock
> events a second time, during which I get a series of anomaly events. 
> 
> I had a topic created with this issue but I didn't get any satisfactory
> solutions there, so was testing it with processing time whether it works
> even or not. 
> 
> https://gist.github.com/revolutionisme/cf675ceee1492b93be020d4526bc9d38
> https://gist.github.com/revolutionisme/38578e631f7a15f02cb2488f9fe56c76
> 
> I would really like to know how to increment the watermark without any
> events coming in, such that at least the timedout events are emitted by the
> system. 
> 
> 
> 
> 
> --
> View this message in context: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-CEP-not-emitting-timed-out-events-properly-tp13794p13800.html



Re: Fink: KafkaProducer Data Loss

2017-06-16 Thread ninad
Hi Aljoscha,

I gather you guys aren't able to reproduce this. 

Here are the answers to your questions:

How do you ensure that you only shut down the brokers once Flink has read
all the data that you expect it to read

Ninad: I am able to see the number of messages received on the Flink Job UI.

And, how do you ensure that the offset that Flink checkpoints in step 3) is
the offset that corresponds to the end of your test data.

Ninad: I haven't explicitly verified which offsets were checkpointed. When I
say that a checkpoint was successful, I am referring to the Flink logs: as
long as Flink says that my last successful checkpoint was #7, then on
recovery it restores its state from checkpoint #7.


What is the difference between steps 3) and 5)?

Ninad: I didn't realize that windows are merged eagerly. I have a session
window with interval of 30 secs. Once I see from the UI that all the
messages have been received, I don't see the following logs for 30 secs. So
that's why I thought that the windows are merged once the window trigger is
fired.

Ex:

I verified from the UI that all messages were received. 

I then see this checkpoint in the logs:
2017-06-01 20:21:49,012 DEBUG
org.apache.flink.streaming.runtime.tasks.StreamTask   - Notification
of complete checkpoint for task TriggerWindow(ProcessingTimeSessionWindows
(3),
ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@e56b3293},
ProcessingTimeTrigger(), WindowedStream.apply(WindowedStream.java:521)
) -> Sink: sink.http.sep (1/1)


I then see the windows being merged after a few seconds:

2017-06-01 20:22:14,300 DEBUG
org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet  -
Merging [TimeWindow{start=1496348534287, end=1496348564287},
TimeWindow{start=1496348534300, end=1496348564300}] into
TimeWindow{start=1496348534287, end=1496348564300}


So, point 3 is referring to these logs "MergingWindowSet - Merging .."
And point 4 is referring to the data in windows being evaluated.

Hope this helps. Thanks.



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fink-KafkaProducer-Data-Loss-tp11413p13805.html


Re: How choose between YARN/Mesos/StandAlone Flink

2017-06-16 Thread Chesnay Schepler

Hello,

just quickly chiming in for clarification/correction purposes:

Flink can work in multi-node environments without YARN/Mesos. If you are
only starting out, or have short-lived workloads (i.e. a job that does not
have to run for days/weeks), I would recommend standalone mode for ease of entry.

Also, Flink does not use resource managers to distribute tasks, but to manage
Job- and TaskManagers, which in standalone mode you have to do yourself.

On 16.06.2017 15:37, Biplob Biswas wrote:

Hi Andrea,

If you are using Flink for research and/or testing purpose, standalone Flink
is more or less sufficient. Although if you have a huge amount of data, it
may take forever to process data with only one node/machine and that's where
a cluster would be needed. A yarn and mesos cluster could provide you high
availability and fault tolerance so that you don't lose your data if
something happens to one of the nodes in your cluster setup. Also, AFAIK,
flink relies on a resource manager like Yarn or Mesos to distribute the task
between multiple nodes so that you don't have to worry about that
distribution.

For the rest, I would like the experts here to correct me and add more info
here.



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-choose-between-YARN-Mesos-StandAlone-Flink-tp13793p13801.html





Re: Unable to use Flink RocksDB state backend due to endianness mismatch

2017-06-16 Thread Stefan Richter
Hi,

we would also like to update to the latest RocksDB and drop FRocksDB 
altogether. But unfortunately, newer versions of RocksDB have issues with 
certain features in the Java API that we use in Flink, for example this one 
https://github.com/facebook/rocksdb/issues/1964. Once those problems are
fixed, we can consider upgrading.

Best,
Stefan 

> On 15.06.2017 at 18:23, Ziyad Muhammed wrote:
> 
> Hi Stefan
> 
> I could solve the issue by building FRocksDB with a patch for the PPC
> architecture. However, this patch is already applied to the latest version of
> RocksDB, whereas FRocksDB seems not to have been updated in a while. It would be nice to
> have it updated with this patch.
> 
> Thanks
> Ziyad
> 
> 
> On Tue, Jun 6, 2017 at 10:01 AM, Stefan Richter wrote:
> Hi,
> 
> RocksDB is a native library with JNI binding. It is included as a dependency 
> and does not build from source when you build Flink. The included jar 
> provides native code for Linux, OSX, and Windows on x86-64. From the 
> Exception, I would conclude you are using a different CPU architecture that 
> is big endian. In this case, you would have to compile RocksDB yourself on 
> your native platform and replace the jar that ships with Flink.
> 
> Best,
> Stefan
> 
>> On 03.06.2017 at 14:16, Ziyad Muhammed wrote:
>> 
>> Dear all,
>> 
>> My Flink job reads from a Kafka topic and stores the data in a RocksDB state 
>> backend, in order to make use of the queryable state. I'm able to run the 
>> job and query the state on my local machine. But when deploying on the 
>> cluster, I'm getting the below error:
>> 
>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
>> execution failed.
>> at 
>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:900)
>> at 
>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
>> at 
>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
>> at 
>> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>> at 
>> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>> at 
>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at 
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at 
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> 
>> Caused by: java.lang.IllegalStateException: Could not initialize keyed state 
>> backend.
>> at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:286)
>> at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:199)
>> at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:666)
>> at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:654)
>> at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:257)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>> at java.lang.Thread.run(Thread.java:745)
>> 
>> Caused by: java.lang.Exception: Could not load the native RocksDB library
>> at 
>> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.ensureRocksDBIsLoaded(RocksDBStateBackend.java:483)
>> at 
>> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:235)
>> at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:785)
>> at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:277)
>> ... 6 more
>> 
>> Caused by: java.lang.UnsatisfiedLinkError: 
>> /data/hadoop/tmp/rocksdb-lib-32bc6a5551b331596649309a808b287d/librocksdbjni-linux64.so:
>>  
>> /data/hadoop/tmp/rocksdb-lib-32bc6a5551b331596649309a808b287d/librocksdbjni-linux64.so:
>>  ELF file data encoding not big-endian (Possible cause: endianness mismatch)
>> at java.lang.ClassLoader$NativeLibrary.load(Native Method)
>> at java.lang.ClassLoader.loadLibrary0(ClassLoader.java:1941)
>> at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1824)
>> at java.lang.Runtime.load0(Runtime.java:809)
>> at java.lang.System.load(System.java:1086)
>> at 
>> org.rocksdb.NativeLibraryLoader.loadLibraryFromJar(NativeLibraryLoader.java:78)
>> at org.rocksdb.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:56)
>> 

Re: Kafka and Flink integration

2017-06-16 Thread nragon
My custom object is used across the whole job, so it'll be part of the checkpoints. Can
you point me to some references with some examples?



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-and-Flink-integration-tp13792p13802.html


Re: How choose between YARN/Mesos/StandAlone Flink

2017-06-16 Thread Biplob Biswas
Hi Andrea,

If you are using Flink for research and/or testing purpose, standalone Flink
is more or less sufficient. Although if you have a huge amount of data, it
may take forever to process data with only one node/machine and that's where
a cluster would be needed. A yarn and mesos cluster could provide you high
availability and fault tolerance so that you don't lose your data if
something happens to one of the nodes in your cluster setup. Also, AFAIK,
flink relies on a resource manager like Yarn or Mesos to distribute the task
between multiple nodes so that you don't have to worry about that
distribution.

For the rest, I would like the experts here to correct me and add more info
here.



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-choose-between-YARN-Mesos-StandAlone-Flink-tp13793p13801.html


Re: Flink CEP not emitting timed out events properly

2017-06-16 Thread Biplob Biswas
Hi Kostas,

Thanks for the reply, makes things a bit more clear.

Also, I went through this link and it is something similar I am trying to
observe. 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Listening-to-timed-out-patterns-in-Flink-CEP-td9371.html

I am checking for timed out events and when I am using event time, its
behaviour is non-deterministic. For one pattern it's generating a few
'matched events' and for a different pattern no 'matched events'. And almost
no timedout events in any of the patterns unless I run the series of mock
events a second time, during which I get a series of anomaly events. 

I had a topic created with this issue but I didn't get any satisfactory
solutions there, so was testing it with processing time whether it works
even or not. 

https://gist.github.com/revolutionisme/cf675ceee1492b93be020d4526bc9d38
https://gist.github.com/revolutionisme/38578e631f7a15f02cb2488f9fe56c76

I would really like to know how to increment the watermark without any
events coming in, such that at least the timedout events are emitted by the
system. 




--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-CEP-not-emitting-timed-out-events-properly-tp13794p13800.html


Add custom configuration files to TMs classpath on YARN

2017-06-16 Thread Mikhail Pryakhin
Hi all,

I run my Flink job on a YARN cluster and need to supply job configuration
parameters via a configuration file alongside the job jar (the configuration
file can't be packaged into the job's jar file).
I tried to put the configuration file into the folder that is passed via the
--yarnship option to the flink run command. The file is then copied to the
YARN cluster and added to the JVM classpath like 'path/application.conf', but it is
ignored by the TM JVM as it is neither a jar (zip) file nor a directory...

I looked through the YarnClusterDescriptor class, where the ENV_FLINK_CLASSPATH
is built, and haven't found any option to tell Flink (the YarnClusterDescriptor
in particular) to add my configuration file to the TM JVM classpath... Is there
any way to do so? If not, would you consider adding the ability to ship arbitrary files?
(In Spark I can just pass any files via the --files option.)

Thanks in advance.

Kind Regards,
Mike Pryakhin













Re: Latency and Throughput

2017-06-16 Thread Chesnay Schepler

Hello,

You don't have to measure anything yourself, since Flink exposes 
throughput/latency metrics as described in the System metrics/latency 
tracking sections of the metrics documentation.


You only have to set up a reporter that fetches these metrics (see the
reporter section) and calculate sums/averages across the job.
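
If, in addition, you want to count something in your own code, a minimal sketch of registering a custom metric in a user function (names are illustrative):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

public class CountingMapper extends RichMapFunction<String, String> {

    private transient Counter recordsProcessed;

    @Override
    public void open(Configuration config) {
        // exported by whichever reporter is configured; the rate of this counter is the throughput
        recordsProcessed = getRuntimeContext().getMetricGroup().counter("recordsProcessed");
    }

    @Override
    public String map(String value) {
        recordsProcessed.inc();
        return value;
    }
}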


Regards,
Chesnay

On 16.06.2017 14:42, Paolo Cristofanelli wrote:

Hi,
it is my first question that I am asking in this mailing list, so I 
hope you would forgive me if I miss something.
I have started using flink recently, and now I would like to compute 
some statistics, like throughput and latency, for my programs.
I was reading this URL from the documentation ( 
https://ci.apache.org/projects/flink/flink-docs-release-1.2/monitoring/metrics.html 
), but I do not understand how to use meters and how to measure 
latency from this classes.
I have also asked on stack overflow. ( 
https://stackoverflow.com/questions/44587645/throughput-and-latency-on-apache-flink 
)


Thank for you time,
Best Regards
Paolo





Latency and Throughput

2017-06-16 Thread Paolo Cristofanelli
Hi,
it is my first question that I am asking in this mailing list, so I hope
you would forgive me if I miss something.
I have started using flink recently, and now I would like to compute some
statistics, like throughput and latency, for my programs.
I was reading this URL from the documentation (
https://ci.apache.org/projects/flink/flink-docs-release-1.2/monitoring/metrics.html
), but I do not understand how to use meters and how to measure latency
from these classes.
I have also asked on stack overflow. (
https://stackoverflow.com/questions/44587645/throughput-and-latency-on-apache-flink
)

Thanks for your time,
Best Regards
Paolo


Re: Flink CEP not emitting timed out events properly

2017-06-16 Thread Kostas Kloudas
Hi Biplob,

With processing time there are no watermarks in the stream. 
The problem that you are seeing is because in processing time, the CEP
library expects the “next” element to come, in order to investigate if 
some of the patterns have timed-out.

Kostas

> On Jun 16, 2017, at 1:29 PM, Biplob Biswas  wrote:
> 
> Hi,
> 
> I am having some issues with FlinkCEP again. This time I am using processing
> time for my CEP job where I am reading from multiple kafka topics and using
> the pattern API to create a rule. I am outputting both, the matched events
> as well as timeout events.
> 
> Now my problem is, I am sending some event over one of the topics such that
> subsequent events wouldn't be generated within the time specified and I
> expect a timed out event.
> 
> But it is not generating the timed out event even after 2 minutes (specified
> interval) and it's only generating the previous timed out events when I am
> sending an extra message over the kafka topic. 
> 
> I am not sure why is this happening, for example:
> 
> 2> RecordReadEventType 1483278179000 044023a4-edec-439c-b221-806740972da2 []
> 2> RecordReadEventType 1483278179000 044023a4-edec-439c-b221-806740972da1 []
> 2> RecordReadEventType 1483278179000 044023a4-edec-439c-b221-806740972da5 []
> 2> Anomaly Events: {first=[RecordReadEventType 1483278179000
> 044023a4-edec-439c-b221-806740972da2 []]} @ 1497612386342
> 2> Anomaly Events: {first=[RecordReadEventType 1483278179000
> 044023a4-edec-439c-b221-806740972da1 []]} @ 1497612386342
> 
> in the example above, the anomaly events are generated only after sending
> the event with event id - 044023a4-edec-439c-b221-806740972da5 
> 
> and that too the anomaly event for this particular event is not generated.
> 
> I suspected that the watermark was not updated automatically for the last
> event and it's only updated when a new event comes in the system. So, I
> added the setAutoWatermarkInterval(1000) to the code but no avail.
> 
> 
> Thanks & Regards,
> Biplob Biswas
> 
> 
> 
> --
> View this message in context: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-CEP-not-emitting-timed-out-events-properly-tp13794.html



Re: Kafka and Flink integration

2017-06-16 Thread Tzu-Li (Gordon) Tai
Hi!

It’s usually always recommended to register your classes with Kryo, to avoid 
the somewhat inefficient classname writing.
Also, depending on the case, to decrease serialization overhead, nothing really 
beats specific custom serialization. So, you can also register specific 
serializers for Kryo to use for the type.
If you need to store these custom objects as managed state for your operators, 
you can also have your own custom Flink TypeSerializer for that.
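
For example, a minimal sketch of the registrations mentioned above (MyEvent and MyEventKryoSerializer are illustrative placeholders):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KryoSetup {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // avoid writing the full class name with every record
        env.getConfig().registerKryoType(MyEvent.class);
        // or register a dedicated Kryo serializer for the type
        env.getConfig().registerTypeWithKryoSerializer(MyEvent.class, MyEventKryoSerializer.class);
    }
}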

Best,
Gordon

On 16 June 2017 at 12:27:06 PM, nragon (nuno.goncal...@wedotechnologies.com) 
wrote:

I have to produce custom objects into kafka and read them with flink. Any  
tuning advices to use kryo? Such as class registration or something like  
that? Any examples?  

Thanks  



--  
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-and-Flink-integration-tp13792.html
  


Flink CEP not emitting timed out events properly

2017-06-16 Thread Biplob Biswas
Hi,

I am having some issues with FlinkCEP again. This time I am using processing
time for my CEP job, where I am reading from multiple Kafka topics and using
the pattern API to create a rule. I am outputting both the matched events
and the timed-out events.

Now my problem is that I am sending an event over one of the topics such that
subsequent events won't be generated within the specified time, so I
expect a timed-out event.

But it is not generating the timed-out event even after 2 minutes (the specified
interval); it only generates the previous timed-out events when I
send an extra message over the Kafka topic.

I am not sure why is this happening, for example:

2> RecordReadEventType 1483278179000 044023a4-edec-439c-b221-806740972da2 []
2> RecordReadEventType 1483278179000 044023a4-edec-439c-b221-806740972da1 []
2> RecordReadEventType 1483278179000 044023a4-edec-439c-b221-806740972da5 []
2> Anomaly Events: {first=[RecordReadEventType 1483278179000
044023a4-edec-439c-b221-806740972da2 []]} @ 1497612386342
2> Anomaly Events: {first=[RecordReadEventType 1483278179000
044023a4-edec-439c-b221-806740972da1 []]} @ 1497612386342

In the example above, the anomaly events are generated only after sending
the event with event id 044023a4-edec-439c-b221-806740972da5,

and even then the anomaly event for that particular event is not generated.

I suspected that the watermark was not updated automatically for the last
event and is only updated when a new event comes into the system. So I
added setAutoWatermarkInterval(1000) to the code, but to no avail.


Thanks & Regards,
Biplob Biswas



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-CEP-not-emitting-timed-out-events-properly-tp13794.html


How choose between YARN/Mesos/StandAlone Flink

2017-06-16 Thread AndreaKinn
Hi,
I browsed the Flink documentation but didn't find a deep comparison between the
features of Flink in standalone deployment/YARN/Mesos, apart from technical guides
to set them up.

I'm a newbie in cluster computing, so I have never used YARN or Mesos. I've
just learned something about their functionality on Google. Anyway, I would
like to understand how to choose which deployment mode I should use.
It seems that with YARN or Mesos I will get automatic failure
recovery, but if this is true... what are the reasons to set up Flink without
them?

I would like to build a cluster where stream computing is performed. I use
Kafka as the source and Cassandra as the sink of data, so the cluster is just a
real-time processor which doesn't store data: the data just passes through it.
There is no pre-existing cluster; my interest is research, so I would like to
understand which setup best fits my purpose.

Can you explain how to choose between these cluster setups, or provide me with
some links to learn from?

Thanks in advance,
Andrea





--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-choose-between-YARN-Mesos-StandAlone-Flink-tp13793.html


Kafka and Flink integration

2017-06-16 Thread nragon
I have to produce custom objects into Kafka and read them with Flink. Any
tuning advice for using Kryo, such as class registration or something like
that? Any examples?

Thanks



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-and-Flink-integration-tp13792.html


Re: Guava version conflict

2017-06-16 Thread Tzu-Li (Gordon) Tai
That’s actually what I’m trying to figure out right now, what had changed since 
then ...


On 16 June 2017 at 12:23:57 PM, Flavio Pompermaier (pomperma...@okkam.it) wrote:

I've never tested with flink 1.3.0, I have the problem with Flink 1.2.1. Didn't 
you say that you also had non-shaded guava dependencies in the flink dist jar 
some days ago?

On Fri, Jun 16, 2017 at 12:19 PM, Tzu-Li (Gordon) Tai  
wrote:
Hi Flavio,

I was just doing some end-to-end rebuild Flink + cluster execution with ES sink 
tests, and it seems like the Guava shading problem isn’t there anymore in the 
flink-dist jar.

On the `release-1.3` branch, built with Maven 3.0.5, the Guava dependencies in 
flink-dist are all properly shaded.
Tested with ES 2.3.5 (the default Elasticsearch 2 connector version) and ES 
2.4.1 (overwritten the ES 2 version and rebuilt the ES connector), all worked 
fine without the Guava conflict issue.

So I’m pretty sure that if the problem still exists for you, the conflict would
have come from other dependencies in your code.

Cheers,
Gordon


On 15 June 2017 at 8:24:48 PM, Flavio Pompermaier (pomperma...@okkam.it) wrote:

Hi Gordon,
any news on this?

On Mon, Jun 12, 2017 at 9:54 AM, Tzu-Li (Gordon) Tai  
wrote:
This seems like a shading problem then.
I’ve tested this again with Maven 3.0.5, even without building against CDH 
Hadoop binaries the flink-dist jar contains non-shaded Guava dependencies.

Let me investigate a bit and get back to this!

Cheers,
Gordon


On 8 June 2017 at 2:47:02 PM, Flavio Pompermaier (pomperma...@okkam.it) wrote:

On an empty machine (with Ubuntu 14.04.5 LTS) and an empty maven local repo I 
did:
git clone https://github.com/apache/flink.git && cd flink && git checkout 
tags/release-1.2.1
/opt/devel/apache-maven-3.3.9/bin/mvn clean install 
-Dhadoop.version=2.6.0-cdh5.9.0 -Dhbase.version=1.2.0-cdh5.9.0 
-Dhadoop.core.version=2.6.0-mr1-cdh5.9.0 -DskipTests -Pvendor-repos
cd flink-dist
/opt/devel/apache-maven-3.3.9/bin/mvn clean install 
-Dhadoop.version=2.6.0-cdh5.9.0 -Dhbase.version=1.2.0-cdh5.9.0 
-Dhadoop.core.version=2.6.0-mr1-cdh5.9.0 -DskipTests -Pvendor-repos
jar tf target/flink-1.2.1-bin/flink-1.2.1/lib/flink-dist_2.10-1.2.1.jar  | grep 
MoreExecutors
And I still see guava dependencies:

org/apache/flink/hadoop/shaded/com/google/common/util/concurrent/MoreExecutors$1.class
org/apache/flink/hadoop/shaded/com/google/common/util/concurrent/MoreExecutors$SameThreadExecutorService.class
org/apache/flink/hadoop/shaded/com/google/common/util/concurrent/MoreExecutors$ListeningDecorator.class
org/apache/flink/hadoop/shaded/com/google/common/util/concurrent/MoreExecutors$ScheduledListeningDecorator.class
org/apache/flink/hadoop/shaded/com/google/common/util/concurrent/MoreExecutors.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$1.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$2.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$3.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$4.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$Application$1.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$Application.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$DirectExecutor.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$DirectExecutorService.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$ListeningDecorator.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$ScheduledListeningDecorator$ListenableScheduledTask.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$ScheduledListeningDecorator$NeverSuccessfulListenableFutureTask.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$ScheduledListeningDecorator.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors.class
com/google/common/util/concurrent/MoreExecutors$1.class
com/google/common/util/concurrent/MoreExecutors$SameThreadExecutorService.class
com/google/common/util/concurrent/MoreExecutors$ListeningDecorator.class
com/google/common/util/concurrent/MoreExecutors$ScheduledListeningDecorator.class
com/google/common/util/concurrent/MoreExecutors.class

It seems that it mixes together Guava 11 (probably coming from CDH dependencies)
and Guava 18 classes.

Also, using Maven 3.0.5 leads to the same output :(

Best,
Flavio 

On Wed, Jun 7, 2017 at 6:21 PM, Tzu-Li (Gordon) Tai  wrote:
Yes, those should not be in the flink-dist jar, so the root reason should be 
that the shading isn’t working properly for your custom build.

If possible, could you try building Flink again with a lower Maven version as 
specified in the doc, and see if that works?
If so, it could be that Maven 3.3.x simply isn’t shading properly even with the 
double compilation trick.


On 7 June 2017 at 6:17:15 PM, Flavio

Re: Guava version conflict

2017-06-16 Thread Flavio Pompermaier
I've never tested with flink 1.3.0, I have the problem with Flink 1.2.1.
Didn't you say that you also had non-shaded guava dependencies in the flink
dist jar some days ago?

On Fri, Jun 16, 2017 at 12:19 PM, Tzu-Li (Gordon) Tai 
wrote:

> Hi Flavio,
>
> I was just doing some end-to-end rebuild Flink + cluster execution with ES
> sink tests, and it seems like the Guava shading problem isn’t there anymore
> in the flink-dist jar.
>
> On the `release-1.3` branch, built with Maven 3.0.5, the Guava
> dependencies in flink-dist are all properly shaded.
> Tested with ES 2.3.5 (the default Elasticsearch 2 connector version) and
> ES 2.4.1 (overwritten the ES 2 version and rebuilt the ES connector), all
> worked fine without the Guava conflict issue.
>
> So I’m pretty sure that if the problem still exists for you, the conflict
> would have come from other dependencies in your code.
>
> Cheers,
> Gordon
>
>
> On 15 June 2017 at 8:24:48 PM, Flavio Pompermaier (pomperma...@okkam.it)
> wrote:
>
> Hi Gordon,
> any news on this?
>
> On Mon, Jun 12, 2017 at 9:54 AM, Tzu-Li (Gordon) Tai 
> wrote:
>
>> This seems like a shading problem then.
>> I’ve tested this again with Maven 3.0.5, even without building against
>> CDH Hadoop binaries the flink-dist jar contains non-shaded Guava
>> dependencies.
>>
>> Let me investigate a bit and get back to this!
>>
>> Cheers,
>> Gordon
>>
>>
>> On 8 June 2017 at 2:47:02 PM, Flavio Pompermaier (pomperma...@okkam.it)
>> wrote:
>>
>> On an empty machine (with Ubuntu 14.04.5 LTS) and an empty maven local
>> repo I did:
>>
>>1. git clone https://github.com/apache/flink.git && cd flink && git
>>checkout tags/release-1.2.1
>>2. /opt/devel/apache-*maven-3.3.9*/bin/mvn clean install
>>-Dhadoop.version=2.6.0-cdh5.9.0 -Dhbase.version=1.2.0-cdh5.9.0
>>-Dhadoop.core.version=2.6.0-mr1-cdh5.9.0 -DskipTests -Pvendor-repos
>>3. cd flink-dist
>>4. /opt/devel/apache-maven-3.3.9/bin/mvn clean install
>>-Dhadoop.version=2.6.0-cdh5.9.0 -Dhbase.version=1.2.0-cdh5.9.0
>>-Dhadoop.core.version=2.6.0-mr1-cdh5.9.0 -DskipTests -Pvendor-repos
>>5. jar tf target/flink-1.2.1-bin/flink-1.2.1/lib/flink-dist_2.10-1.2.1.jar
>> | grep MoreExecutors
>>
>> And I still see guava dependencies:
>>
>> org/apache/flink/hadoop/shaded/com/google/common/util/concur
>> rent/MoreExecutors$1.class
>> org/apache/flink/hadoop/shaded/com/google/common/util/concur
>> rent/MoreExecutors$SameThreadExecutorService.class
>> org/apache/flink/hadoop/shaded/com/google/common/util/concur
>> rent/MoreExecutors$ListeningDecorator.class
>> org/apache/flink/hadoop/shaded/com/google/common/util/concur
>> rent/MoreExecutors$ScheduledListeningDecorator.class
>> org/apache/flink/hadoop/shaded/com/google/common/util/concur
>> rent/MoreExecutors.class
>> org/apache/flink/shaded/com/google/common/util/concurrent/Mo
>> reExecutors$1.class
>> org/apache/flink/shaded/com/google/common/util/concurrent/Mo
>> reExecutors$2.class
>> org/apache/flink/shaded/com/google/common/util/concurrent/Mo
>> reExecutors$3.class
>> org/apache/flink/shaded/com/google/common/util/concurrent/Mo
>> reExecutors$4.class
>> org/apache/flink/shaded/com/google/common/util/concurrent/Mo
>> reExecutors$Application$1.class
>> org/apache/flink/shaded/com/google/common/util/concurrent/Mo
>> reExecutors$Application.class
>> org/apache/flink/shaded/com/google/common/util/concurrent/Mo
>> reExecutors$DirectExecutor.class
>> org/apache/flink/shaded/com/google/common/util/concurrent/Mo
>> reExecutors$DirectExecutorService.class
>> org/apache/flink/shaded/com/google/common/util/concurrent/Mo
>> reExecutors$ListeningDecorator.class
>> org/apache/flink/shaded/com/google/common/util/concurrent/Mo
>> reExecutors$ScheduledListeningDecorator$ListenableScheduledTask.class
>> org/apache/flink/shaded/com/google/common/util/concurrent/Mo
>> reExecutors$ScheduledListeningDecorator$NeverSuccessfulListe
>> nableFutureTask.class
>> org/apache/flink/shaded/com/google/common/util/concurrent/Mo
>> reExecutors$ScheduledListeningDecorator.class
>> org/apache/flink/shaded/com/google/common/util/concurrent/Mo
>> reExecutors.class
>> com/google/common/util/concurrent/MoreExecutors$1.class
>> com/google/common/util/concurrent/MoreExecutors$SameThreadEx
>> ecutorService.class
>> com/google/common/util/concurrent/MoreExecutors$ListeningDecorator.class
>> com/google/common/util/concurrent/MoreExecutors$ScheduledLis
>> teningDecorator.class
>> com/google/common/util/concurrent/MoreExecutors.class
>>
>> It seems that it mixes together Guava 11 (probably coming from CDH
>> dependencies) and Guava 18 classes.
>>
>> Also, using Maven 3.0.5 leads to the same output :(
>>
>> Best,
>> Flavio
>>
>> On Wed, Jun 7, 2017 at 6:21 PM, Tzu-Li (Gordon) Tai 
>> wrote:
>>
>>> Yes, those should not be in the flink-dist jar, so the root reason
>>> should be that the shading isn’t working properly for your custom build.
>>>
>>> If possible, could you try building Flink again with a lower Maven
>>> versi

Re: Guava version conflict

2017-06-16 Thread Tzu-Li (Gordon) Tai
Hi Flavio,

I was just doing some end-to-end rebuild Flink + cluster execution with ES sink 
tests, and it seems like the Guava shading problem isn’t there anymore in the 
flink-dist jar.

On the `release-1.3` branch, built with Maven 3.0.5, the Guava dependencies in 
flink-dist are all properly shaded.
Tested with ES 2.3.5 (the default Elasticsearch 2 connector version) and ES 
2.4.1 (overwritten the ES 2 version and rebuilt the ES connector), all worked 
fine without the Guava conflict issue.

So I’m pretty sure that if the problem still exists for you, the conflict would
have come from other dependencies in your code.

Cheers,
Gordon


On 15 June 2017 at 8:24:48 PM, Flavio Pompermaier (pomperma...@okkam.it) wrote:

Hi Gordon,
any news on this?

On Mon, Jun 12, 2017 at 9:54 AM, Tzu-Li (Gordon) Tai  
wrote:
This seems like a shading problem then.
I’ve tested this again with Maven 3.0.5, even without building against CDH 
Hadoop binaries the flink-dist jar contains non-shaded Guava dependencies.

Let me investigate a bit and get back to this!

Cheers,
Gordon


On 8 June 2017 at 2:47:02 PM, Flavio Pompermaier (pomperma...@okkam.it) wrote:

On an empty machine (with Ubuntu 14.04.5 LTS) and an empty maven local repo I 
did:
git clone https://github.com/apache/flink.git && cd flink && git checkout 
tags/release-1.2.1
/opt/devel/apache-maven-3.3.9/bin/mvn clean install 
-Dhadoop.version=2.6.0-cdh5.9.0 -Dhbase.version=1.2.0-cdh5.9.0 
-Dhadoop.core.version=2.6.0-mr1-cdh5.9.0 -DskipTests -Pvendor-repos
cd flink-dist
/opt/devel/apache-maven-3.3.9/bin/mvn clean install 
-Dhadoop.version=2.6.0-cdh5.9.0 -Dhbase.version=1.2.0-cdh5.9.0 
-Dhadoop.core.version=2.6.0-mr1-cdh5.9.0 -DskipTests -Pvendor-repos
jar tf target/flink-1.2.1-bin/flink-1.2.1/lib/flink-dist_2.10-1.2.1.jar  | grep 
MoreExecutors
And I still see guava dependencies:

org/apache/flink/hadoop/shaded/com/google/common/util/concurrent/MoreExecutors$1.class
org/apache/flink/hadoop/shaded/com/google/common/util/concurrent/MoreExecutors$SameThreadExecutorService.class
org/apache/flink/hadoop/shaded/com/google/common/util/concurrent/MoreExecutors$ListeningDecorator.class
org/apache/flink/hadoop/shaded/com/google/common/util/concurrent/MoreExecutors$ScheduledListeningDecorator.class
org/apache/flink/hadoop/shaded/com/google/common/util/concurrent/MoreExecutors.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$1.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$2.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$3.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$4.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$Application$1.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$Application.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$DirectExecutor.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$DirectExecutorService.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$ListeningDecorator.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$ScheduledListeningDecorator$ListenableScheduledTask.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$ScheduledListeningDecorator$NeverSuccessfulListenableFutureTask.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$ScheduledListeningDecorator.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors.class
com/google/common/util/concurrent/MoreExecutors$1.class
com/google/common/util/concurrent/MoreExecutors$SameThreadExecutorService.class
com/google/common/util/concurrent/MoreExecutors$ListeningDecorator.class
com/google/common/util/concurrent/MoreExecutors$ScheduledListeningDecorator.class
com/google/common/util/concurrent/MoreExecutors.class

It seems that it mixes together Guava 11 (probably coming from CDH dependencies)
and Guava 18 classes.

Also, using Maven 3.0.5 leads to the same output :(

Best,
Flavio 

On Wed, Jun 7, 2017 at 6:21 PM, Tzu-Li (Gordon) Tai  wrote:
Yes, those should not be in the flink-dist jar, so the root reason should be 
that the shading isn’t working properly for your custom build.

If possible, could you try building Flink again with a lower Maven version as 
specified in the doc, and see if that works?
If so, it could be that Maven 3.3.x simply isn’t shading properly even with the 
double compilation trick.


On 7 June 2017 at 6:17:15 PM, Flavio Pompermaier (pomperma...@okkam.it) wrote:

What I did was to take the sources of the new ES connector and I took them into 
my code.
Flink was compiled with maven 3.3+ but I did the double compilation as 
specified in the Flink build section.
In flink dist I see guava classes, e.g.:

com/google/common/util/concurrent/MoreExecutors$1.class
com/google/common/util/concurrent/MoreExecutors$SameThreadExecutorServi

Re: Using Custom Partitioner in Streaming with parallelism 1 adding latency

2017-06-16 Thread Aljoscha Krettek
Hi,

These two documentation pages might be interesting:
 - https://ci.apache.org/projects/flink/flink-docs-release-1.3/concepts/runtime.html
 - https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/datastream_api.html#task-chaining-and-resource-groups

Flink will chain operations together whenever this is possible and the basic 
prerequisites for this are: the parallelism is the same and the connection 
pattern is “forwarding”, i.e. there is no broadcast, shuffle, or custom 
partitioning scheme.
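
For illustration, a minimal sketch of the map -> custom partitioner -> flatMap case from the question (untested; Event, MyMapper, MyFlatMap, and the key extraction are placeholders): map and flatMap end up in separate chains because the custom partitioning breaks the forwarding pattern.

import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;

public class PartitionerWiring {

    public static DataStream<Event> wire(DataStream<Event> input) {
        DataStream<Event> mapped = input.map(new MyMapper());   // can still be chained with its predecessor

        DataStream<Event> partitioned = mapped.partitionCustom(
            new Partitioner<String>() {
                @Override
                public int partition(String key, int numPartitions) {
                    return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
                }
            },
            new KeySelector<Event, String>() {
                @Override
                public String getKey(Event event) {
                    return event.getKey();                       // placeholder key extraction
                }
            });

        // runs in a separate chain: records cross a (local) channel between the two tasks
        return partitioned.flatMap(new MyFlatMap());
    }
}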

Best,
Aljoscha 
> On 16. Jun 2017, at 06:42, sohimankotia  wrote:
> 
> You are right Aljoscha. The job graph is split after introducing the partitioner.
> 
> I was under the impression that if parallelism is set, everything will be chained
> together.
> Can you explain how data will flow for map -> partitioner -> flatmap in that
> case, or point me to the right documentation?
> 
> Is it possible that adding the partitioner can add more than 100 seconds of delay?
> 
> Sorry for the dumb question, but I did not find any detailed documentation on
> the Flink website. Most of the links, e.g.
> https://cwiki.apache.org/confluence/display/FLINK/Parallelism+and+Scheduling
> are also empty as of now.
> 
> 
> 
> 
> 
> --
> View this message in context: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Using-Custom-Partitioner-in-Streaming-with-parallelism-1-adding-latency-tp13766p13774.html



Re: User self resource file.

2017-06-16 Thread Aljoscha Krettek
Yes, this is what I’m suggesting. I think you could clear the path when the 
operator/function shuts down, i.e. in the close() method.
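
A rough sketch of that pattern (untested; it assumes the archive was shipped via Flink's distributed cache under the name "resources", and unzip()/deleteRecursively() are placeholder helpers):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

import java.io.File;
import java.nio.file.Files;
import java.nio.file.Path;

public abstract class ResourceUsingMapper extends RichMapFunction<String, String> {

    private transient Path resourceDir;

    // placeholder helpers for extracting the archive and cleaning up
    protected abstract void unzip(File archive, Path targetDir) throws Exception;
    protected abstract void deleteRecursively(Path dir) throws Exception;

    @Override
    public void open(Configuration parameters) throws Exception {
        // the archive previously registered as a cached file under the name "resources"
        File archive = getRuntimeContext().getDistributedCache().getFile("resources");
        resourceDir = Files.createTempDirectory("resources");
        unzip(archive, resourceDir);
    }

    @Override
    public String map(String value) throws Exception {
        // ... use the files under resourceDir ...
        return value;
    }

    @Override
    public void close() throws Exception {
        // clear the temporary path when the function shuts down
        deleteRecursively(resourceDir);
    }
}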

> On 15. Jun 2017, at 14:25, yunfan123  wrote:
> 
> So your suggestion is that I create an archive of all the files in the resources,
> then get the distributed cache entry for this file and extract it to a path,
> and use this path as my resource path?
> But at what point should I clear the temp path?
> 
> 
> 
> --
> View this message in context: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/User-self-resource-file-tp13693p13765.html



Re: Streaming use case: Row enrichment

2017-06-16 Thread Flavio Pompermaier
Understood..Thanks anyway Aljoscha!

On Fri, Jun 16, 2017 at 11:55 AM, Aljoscha Krettek 
wrote:

> Hi,
>
> The problem with that is that the file is being read by (possibly, very
> likely) multiple operators in parallel. The file source works like this:
> there is a ContinuousFileMonitoringFunction (this is an actual Flink
> source) that monitors a directory and when a new file appears sends several
> (non overlapping) input splits downstream. After the source, there is an
> operator (ContinuousFileReaderOperator) that receives splits, reads the
> contents of the file at the split and sends it downstream. There is thus no
> central point where we would know that a file was completely processed.
>
> Best,
> Aljoscha
>
> On 16. Jun 2017, at 11:26, Flavio Pompermaier 
> wrote:
>
> Is it really necessary to wait for the file to reach the end of the
> pipeline? Isn't it sufficient to know that it has been read and the source
> operator has been checkpointed (I don't know if I'm using this word
> correctly... I mean that all the file splits have been processed and Flink
> won't reprocess them anymore)?
>
> Best,
> Flavio
>
> On Fri, Jun 16, 2017 at 11:22 AM, Aljoscha Krettek 
> wrote:
>
>> Hi,
>>
>> I was referring to
>>
>> StreamExecutionEnvironment.readFile(
>>   FileInputFormat inputFormat,
>>   String filePath,
>>   FileProcessingMode watchType,
>>   long interval)
>>
>> Where you can specify whether the source should shut down once all files
>> have been read (PROCESS_ONCE) or whether the source should continue to
>> monitor the input directory for new files (PROCESS_CONTINUOUSLY).
>>
>> I think there is currently no built-in way of removing files from the
>> input directory once they have been processed because it’s not possible to
>> know when the contents of a given files have passed through the complete
>> pipeline.
>>
>> Best,
>> Aljoscha
>>
>> On 15. Jun 2017, at 20:00, Flavio Pompermaier 
>> wrote:
>>
>> Hi Aljosha,
>> thanks for the great suggestions, I wasn't aware of
>> AsyncDataStream.unorderedWait and BucketingSink setBucketer().
>> Most probably that's exactly what I was looking for... (I just need to find
>> the time to test it.)
>> Just one last question: what are you referring to with "you could use a
>> different readFile() method where you can specify that you want to
>> continue monitoring the directory for new files"? Is there a way to delete
>> the input files, or move them to a backup dir, once they have been enriched?
>>
>> Best Flavio
>>
>>
>>
>> On Thu, Jun 15, 2017 at 2:30 PM, Aljoscha Krettek 
>> wrote:
>>
>>> Ok, just trying to make sure I understand everything: You have this:
>>>
>>> 1. A bunch of data in HDFS that you want to enrich
>>> 2. An external service (Solr/ES) that you query for enriching the data
>>> rows stored in 1.
>>> 3. You need to store the enriched rows in HDFS again
>>>
>>> I think you could just do this (roughly):
>>>
>>> StreamExecutionEnvironment env = …;
>>>
>>> DataStream<Row> input = env.readFile(new RowCsvInputFormat(…), “<hdfs path>”);
>>>
>>> DataStream<Row> enriched = input.flatMap(new MyEnricherThatCallsES());
>>> // or
>>> DataStream<Row> enriched = AsyncDataStream.unorderedWait(input, …) //
>>> yes, the interface for this is a bit strange
>>>
>>> BucketingSink<Row> sink = new BucketingSink<>(“<base path>”);
>>> // this is responsible for putting files into buckets, so that you don’t
>>> have too many small HDFS files
>>> sink.setBucketer(new MyBucketer());
>>> enriched.addSink(sink)
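As a rough sketch of what the AsyncDataStream.unorderedWait() variant above could look like
(Flink 1.2/1.3-era async I/O API; EnrichmentClient and its lookupAsync() method are hypothetical
stand-ins for an asynchronous ES/Solr client, and the imports of the Flink async classes
(RichAsyncFunction, AsyncCollector, AsyncDataStream) are omitted here):

import java.util.Collections;
import java.util.concurrent.TimeUnit;

public class EsEnricher extends RichAsyncFunction<Row, Row> {

    private transient EnrichmentClient client;   // hypothetical async client

    @Override
    public void open(Configuration parameters) {
        client = new EnrichmentClient();
    }

    @Override
    public void asyncInvoke(Row input, AsyncCollector<Row> collector) {
        client.lookupAsync(input)                          // assumed to return a CompletableFuture<Row>
              .whenComplete((enrichedRow, failure) -> {
                  if (failure != null) {
                      // a retry/backoff policy could be plugged in here instead of failing directly
                      collector.collect(failure);
                  } else {
                      collector.collect(Collections.singleton(enrichedRow));
                  }
              });
    }

    @Override
    public void close() {
        client.close();
    }
}

// wiring it in, with at most 50 in-flight requests as a simple throttle:
DataStream<Row> enriched =
    AsyncDataStream.unorderedWait(input, new EsEnricher(), 30, TimeUnit.SECONDS, 50);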
>>>
>>> In this case, the file source will close once all files are read and the
>>> job will finish. If you don’t want this you can also use a different
>>> readFile() method where you can specify  that you want to continue
>>> monitoring the directory for new files.
>>>
>>> Best,
>>> Aljoscha
>>>
>>> On 6. Jun 2017, at 17:38, Flavio Pompermaier 
>>> wrote:
>>>
>>> Hi Aljosha,
>>> thanks for getting back to me on this! I'll try to simplify the thread
>>> starting from what we want to achieve.
>>>
>>> At the moment we execute some queries to a db and we store the data into
>>> Parquet directories (one for each query).
>>> Let's say we then create a DataStream from each dir; what we would
>>> like to achieve is to perform some sort of throttling of the queries to
>>> perform against this external service (in order to not overload it with too many
>>> queries... but we also need to run as many queries as possible in order to
>>> execute this process in a reasonable time).
>>>
>>> The current batch process has the downside that you must know a priori
>>> the right parallelism of the job, while the streaming process should be able
>>> to rescale when needed [1], so it should be easier to tune the job
>>> parallelism without losing all the already performed queries [2].
>>> Moreover, if the job crashes you lose all the work done up to that moment
>>> because there's no checkpointing...
>>> My initial idea was to read from HDFS and put the data into Kafka to be
>>> able to change the number of consumers at runtime (according to the
>>> maxmi

Re: Stateful streaming question

2017-06-16 Thread Aljoscha Krettek
I think it might be possible to do, but I’m not aware of anyone working on that,
and I haven’t seen anyone on the mailing lists express interest in working on it.

> On 16. Jun 2017, at 11:31, Flavio Pompermaier  wrote:
> 
> Ok, thanks for the clarification. Do you think it could be possible (sooner or 
> later) to have in Flink some sort of synchronization between jobs (as in this 
> case, where the input datastream should be "paused" until the second job 
> finishes)? I know I could use something like Oozie or Falcon to orchestrate 
> jobs, but I'd prefer to avoid adding them to our architecture.
> 
> Best,
> Flavio
> 
> On Fri, Jun 16, 2017 at 11:23 AM, Aljoscha Krettek  > wrote:
> Hi,
> 
> I’m afraid not. You would have to wait for one job to finish before starting 
> the next one.
> 
> Best,
> Aljoscha
>> On 15. Jun 2017, at 20:11, Flavio Pompermaier > > wrote:
>> 
>> Hi Aljoscha,
>> we're still investigating possible solutions here. Yes, as you correctly 
>> said, there are links between data of different keys, so we can proceed 
>> with the next job only once we are sure at 100% that all input data has been 
>> consumed and no other data will be read until this last job ends.
>> There should be some sort of synchronization between these 2 jobs... is that 
>> possible right now in Flink?
>> 
>> Thanks a lot for the support,
>> Flavio
>> 
>> On Thu, Jun 15, 2017 at 12:16 PM, Aljoscha Krettek > > wrote:
>> Hi,
>> 
>> Trying to revive this somewhat older thread: have you made any progress? I 
>> think going with a ProcessFunction that keeps all your state internally and 
>> periodically outputs to, say, Elasticsearch using a sink seems like the way 
>> to go? You can do the periodic emission using timers in the ProcessFunction. 
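A minimal sketch of that ProcessFunction pattern (Flink 1.3-era APIs assumed; TupleEvent and
GroupedObject are hypothetical stand-ins for the per-key data model discussed in this thread):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class PeriodicEmitFunction extends ProcessFunction<TupleEvent, GroupedObject> {

    private static final long EMIT_INTERVAL_MS = 10 * 60 * 1000;   // e.g. emit every 10 minutes

    private transient ValueState<GroupedObject> state;

    @Override
    public void open(Configuration parameters) {
        state = getRuntimeContext().getState(
                new ValueStateDescriptor<>("grouped-object", GroupedObject.class));
    }

    @Override
    public void processElement(TupleEvent event, Context ctx, Collector<GroupedObject> out) throws Exception {
        GroupedObject obj = state.value();
        if (obj == null) {
            obj = new GroupedObject(event.getKey());
            // first element for this key: schedule the first periodic emission
            ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + EMIT_INTERVAL_MS);
        }
        obj.addOrRemoveTuple(event);   // cheap in-memory update of the per-key state
        state.update(obj);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<GroupedObject> out) throws Exception {
        GroupedObject obj = state.value();
        if (obj != null) {
            out.collect(obj);   // downstream: convert to JSON and hand it to an Elasticsearch sink
        }
        // re-register so that this key keeps being emitted periodically
        ctx.timerService().registerProcessingTimeTimer(timestamp + EMIT_INTERVAL_MS);
    }
}

// used as: stream.keyBy(event -> event.getKey()).process(new PeriodicEmitFunction()).addSink(esSink);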
>> 
>> In your use case, does the data you would store in the Flink managed state 
>> have links between data of different keys? This sounds like it could be a 
>> problem when it comes to consistency when outputting to an external system.
>> 
>> Best,
>> Aljoscha
>> 
>>> On 17. May 2017, at 14:12, Flavio Pompermaier >> > wrote:
>>> 
>>> Hi to all,
>>> there are a lot of useful discussion points :)
>>> 
>>> I'll try to answer to everybody.
>>> 
>>> @Ankit: 
>>> right now we're using Parquet on HDFS to store thrift objects. Those 
>>> objects are essentially structured like
>>> key
>>> alternative_key
>>> list of tuples (representing the state of my Object)
>>> This model could potentially be modeled as a Monoid and it's very well 
>>> suited for a stateful streaming computation where updates to a single key 
>>> state are not as expensive as a call to any db to get the current list of 
>>> tuples and write that list back for an update (IMHO). Maybe here I'm 
>>> overestimating Flink streaming capabilities...
>>> serialization should be ok using thrift, but Flink advises to use tuples to 
>>> have better performance, so just after reading the data from disk (as a 
>>> ThriftObject) we convert it to its equivalent representation as a 
>>> Tuple3 version
>>> Since I currently use Flink to ingest data that (in the end) means adding 
>>> tuples to my objects, it would be perfect to have an "online" state of the 
>>> grouped tuples in order to:
>>> add/remove tuples to my object very quickly
>>> from time to time, scan the whole online data (or a part of it) and 
>>> "translate" it into one ore more JSON indices (and put them into 
>>> Elasticsearch)
>>> @Fabian:
>>> You're right that batch processes are not very well suited to work with 
>>> services that can fail... if in a map function the remote call fails, the whole 
>>> batch job fails... this should be less problematic with streaming because 
>>> there's checkpointing, and with async IO it should be possible to add 
>>> some retry/backoff policies in order to not overload remote services like 
>>> db or solr/es indices (maybe it's not already there but it should be 
>>> possible to add). Am I wrong?
>>> 
>>> @Kostas:
>>> 
>>> From what I understood, Queryable state is useful for gets... what if I need 
>>> to scan the entire db? For us it could be better to periodically dump the 
>>> state to RocksDb or HDFS but, as I already said, I'm not sure if it is safe 
>>> to start a batch job that reads the dumped data while, in the meantime, a 
>>> possible update of this dump could happen...is there any potential problem 
>>> to data consistency (indeed tuples within grouped objects have references 
>>> to other objects keys)?
>>> 
>>> Best,
>>> Flavio
>>> 
>>> On Wed, May 17, 2017 at 10:18 AM, Kostas Kloudas 
>>> mailto:k.klou...@data-artisans.com>> wrote:
>>> Hi Flavio,
>>> 
>>> For setting the retries, unfortunately there is no such setting yet and, if 
>>> I am not wrong, in case of a failure of a request, 
>>> an exception will be thrown and the job will restart. I am also including 
>>> Till in the thread as 

Re: Streaming use case: Row enrichment

2017-06-16 Thread Aljoscha Krettek
Hi,

The problem with that is that the file is being read by (possibly, very likely) 
multiple operators in parallel. The file source works like this: there is a 
ContinuousFileMonitoringFunction (this is an actual Flink source) that monitors 
a directory and when a new file appears sends several (non overlapping) input 
splits downstream. After the source, there is an operator 
(ContinuousFileReaderOperator) that receives splits, reads the contents of the 
file at the split and sends it downstream. There is thus no central point where 
we would know that a file was completely processed.

Best,
Aljoscha

> On 16. Jun 2017, at 11:26, Flavio Pompermaier  wrote:
> 
> Is it really necessary to wait for the file to reach the end of the pipeline? 
> Isn't it sufficient to know that it has been read and the source operator has 
> been checkpointed (I don't know if I'm using this word correctly... I mean 
> that all the file splits have been processed and Flink won't reprocess them 
> anymore)?
> 
> Best,
> Flavio
> 
> On Fri, Jun 16, 2017 at 11:22 AM, Aljoscha Krettek  > wrote:
> Hi,
> 
> I was referring to
> 
> StreamExecutionEnvironment.readFile(
>   FileInputFormat<OUT> inputFormat,
>   String filePath,
>   FileProcessingMode watchType,
>   long interval)
> 
> where you can specify whether the source should shut down once all files have 
> been read (PROCESS_ONCE) or whether the source should continue to monitor the 
> input directory for new files (PROCESS_CONTINUOUSLY).
> 
> I think there is currently no built-in way of removing files from the input 
> directory once they have been processed because it’s not possible to know 
> when the contents of a given file have passed through the complete pipeline.
> 
> Best,
> Aljoscha
> 
>> On 15. Jun 2017, at 20:00, Flavio Pompermaier > > wrote:
>> 
>> Hi Aljosha,
>> thanks for the great suggestions, I wasn't aware of 
>> AsyncDataStream.unorderedWait and BucketingSink setBucketer().
>> Most probably that's exactly what I was looking for... (I just need to find 
>> the time to test it.)
>> Just one last question: what are you referring to with "you could use a 
>> different readFile() method where you can specify that you want to continue 
>> monitoring the directory for new files"? Is there a way to delete the input 
>> files, or move them to a backup dir, once they have been enriched?
>> 
>> Best Flavio
>> 
>> 
>> 
>> On Thu, Jun 15, 2017 at 2:30 PM, Aljoscha Krettek > > wrote:
>> Ok, just trying to make sure I understand everything: You have this:
>> 
>> 1. A bunch of data in HDFS that you want to enrich
>> 2. An external service (Solr/ES) that you query for enriching the data rows 
>> stored in 1.
>> 3. You need to store the enriched rows in HDFS again
>> 
>> I think you could just do this (roughly):
>> 
>> StreamExecutionEnvironment env = …;
>> 
>> DataStream<Row> input = env.readFile(new RowCsvInputFormat(…), “<hdfs path>”);
>> 
>> DataStream<Row> enriched = input.flatMap(new MyEnricherThatCallsES());
>> // or
>> DataStream<Row> enriched = AsyncDataStream.unorderedWait(input, …) // yes, 
>> the interface for this is a bit strange
>> 
>> BucketingSink<Row> sink = new BucketingSink<>(“<base path>”);
>> // this is responsible for putting files into buckets, so that you don’t 
>> have too many small HDFS files
>> sink.setBucketer(new MyBucketer());
>> enriched.addSink(sink)
>> 
>> In this case, the file source will close once all files are read and the job 
>> will finish. If you don’t want this you can also use a different readFile() 
>> method where you can specify  that you want to continue monitoring the 
>> directory for new files.
>> 
>> Best,
>> Aljoscha
>> 
>>> On 6. Jun 2017, at 17:38, Flavio Pompermaier >> > wrote:
>>> 
>>> Hi Aljosha,
>>> thanks for getting back to me on this! I'll try to simplify the thread 
>>> starting from what we want to achieve.
>>> 
>>> At the moment we execute some queries to a db and we store the data into 
>>> Parquet directories (one for each query). 
>>> Let's say we then create a DataStream from each dir; what we would 
>>> like to achieve is to perform some sort of throttling of the queries to 
>>> perform against this external service (in order to not overload it with too many 
>>> queries... but we also need to run as many queries as possible in order to 
>>> execute this process in a reasonable time). 
>>> 
>>> The current batch process has the downside that you must know a priori the 
>>> right parallelism of the job, while the streaming process should be able to 
>>> rescale when needed [1], so it should be easier to tune the job parallelism 
>>> without losing all the already performed queries [2]. Moreover, if the job 
>>> crashes you lose all the work done up to that moment because there's no 
>>> checkpointing...
>>> My initial idea was to read from HDFS and put the data into Kafka to be 
>>> able to change the number of consumers at runtime (according to the 
>>> maximum 

Re: Question about the custom partitioner

2017-06-16 Thread Xingcan Cui
Hi Aljoscha,

Thanks for your explanation. I'll try what you suggest.

Best,
Xingcan

On Fri, Jun 16, 2017 at 5:19 PM, Aljoscha Krettek 
wrote:

> Hi,
>
> I’m afraid that’s not possible out-of-box with the current APIs. I
> actually don’t know why the user-facing Partitioner only allows returning
> one target because the internal StreamPartitioner (which extends
> ChannelSelector) allows returning multiple target partitions.
>
> You can hack around the API by manually creating your own
> StreamPartitioner and applying it to the DataStream as
> DataStream.partitionCustom() and DataStream.setConnectionType() (the first
> calls the latter) do.
>
> Best,
> Aljoscha
>
> On 14. Jun 2017, at 09:09, Xingcan Cui  wrote:
>
> Hi all,
>
> I want to duplicate records to multiple downstream tasks (not all of them,
> so broadcasting won't work) in the stream environment.
> However, it seems that the current custom partitioner can return only one
> partition index.
> Why does this restriction exist, or am I missing something?
>
> Thanks,
> Xingcan
>
>
>


Re: Stateful streaming question

2017-06-16 Thread Flavio Pompermaier
Ok, thanks for the clarification. Do you think it could be possible (sooner
or later) to have in Flink some sort of synchronization between jobs (as in
this case, where the input datastream should be "paused" until the second
job finishes)? I know I could use something like Oozie or Falcon to
orchestrate jobs, but I'd prefer to avoid adding them to our architecture.

Best,
Flavio

On Fri, Jun 16, 2017 at 11:23 AM, Aljoscha Krettek 
wrote:

> Hi,
>
> I’m afraid not. You would have to wait for one job to finish before
> starting the next one.
>
> Best,
> Aljoscha
>
> On 15. Jun 2017, at 20:11, Flavio Pompermaier 
> wrote:
>
> Hi Aljoscha,
> we're still investigating possible solutions here. Yes, as you correctly
> said, there are links between data of different keys, so we can proceed
> with the next job only once we are sure at 100% that all input data has
> been consumed and no other data will be read until this last job ends.
> There should be some sort of synchronization between these 2 jobs... is
> that possible right now in Flink?
>
> Thanks a lot for the support,
> Flavio
>
> On Thu, Jun 15, 2017 at 12:16 PM, Aljoscha Krettek 
> wrote:
>
>> Hi,
>>
>> Trying to revive this somewhat older thread: have you made any progress?
>> I think going with a ProcessFunction that keeps all your state internally
>> and periodically outputs to, say, Elasticsearch using a sink seems like the
>> way to go? You can do the periodic emission using timers in the
>> ProcessFunction.
>>
>> In your use case, does the data you would store in the Flink managed
>> state have links between data of different keys? This sounds like it could
>> be a problem when it comes to consistency when outputting to an external
>> system.
>>
>> Best,
>> Aljoscha
>>
>> On 17. May 2017, at 14:12, Flavio Pompermaier 
>> wrote:
>>
>> Hi to all,
>> there are a lot of useful discussion points :)
>>
>> I'll try to answer to everybody.
>>
>> @Ankit:
>>
>>    - right now we're using Parquet on HDFS to store thrift objects.
>>      Those objects are essentially structured like
>>        - key
>>        - alternative_key
>>        - list of tuples (representing the state of my Object)
>>      This model could potentially be modeled as a Monoid and it's very well
>>      suited for a stateful streaming computation where updates to a single
>>      key state are not as expensive as a call to any db to get the current
>>      list of tuples and write that list back for an update (IMHO).
>>      Maybe here I'm overestimating Flink streaming capabilities...
>>    - serialization should be ok using thrift, but Flink advises to use
>>      tuples to have better performance, so just after reading the data from
>>      disk (as a ThriftObject) we convert it to its equivalent representation
>>      as a Tuple3 version
>>    - Since I currently use Flink to ingest data that (in the end) means
>>      adding tuples to my objects, it would be perfect to have an "online"
>>      state of the grouped tuples in order to:
>>        - add/remove tuples to my object very quickly
>>        - from time to time, scan the whole online data (or a part of it)
>>          and "translate" it into one or more JSON indices (and put them into
>>          Elasticsearch)
>>
>> @Fabian:
>> You're right that batch processes are not very well suited to work with
>> services that can fail... if in a map function the remote call fails, the whole
>> batch job fails... this should be less problematic with streaming because
>> there's checkpointing, and with async IO it should be possible to add
>> some retry/backoff policies in order to not overload remote services like
>> db or solr/es indices (maybe it's not already there but it should be
>> possible to add). Am I wrong?
>>
>> @Kostas:
>>
>> From what I understood, Queryable state is useful for gets... what if I
>> need to scan the entire db? For us it could be better to periodically dump
>> the state to RocksDb or HDFS but, as I already said, I'm not sure if it is
>> safe to start a batch job that reads the dumped data while, in the
>> meantime, a possible update of this dump could happen...is there any
>> potential problem to data consistency (indeed tuples within grouped objects
>> have references to other objects keys)?
>>
>> Best,
>> Flavio
>>
>> On Wed, May 17, 2017 at 10:18 AM, Kostas Kloudas <
>> k.klou...@data-artisans.com> wrote:
>>
>>> Hi Flavio,
>>>
>>> For setting the retries, unfortunately there is no such setting yet and,
>>> if I am not wrong, in case of a failure of a request,
>>> an exception will be thrown and the job will restart. I am also
>>> including Till in the thread as he may know better.
>>>
>>> For consistency guarantees and concurrency control, this depends on your
>>> underlying backend. But if you want to
>>> have end-to-end control, then you could do as Ankit suggested at his
>>> point 3), i.e have a single job for the whole pipeline
>>>  (if this fits your needs of course). This will allow you to set your
>>>

Re: Streaming use case: Row enrichment

2017-06-16 Thread Flavio Pompermaier
Is it really necessary to wait for the file to reach the end of the
pipeline? Isn't it sufficient to know that it has been read and the source
operator has been checkpointed (I don't know if I'm using this word
correctly... I mean that all the file splits have been processed and Flink
won't reprocess them anymore)?

Best,
Flavio

On Fri, Jun 16, 2017 at 11:22 AM, Aljoscha Krettek 
wrote:

> Hi,
>
> I was referring to
>
> StreamExecutionEnvironment.readFile(
>   FileInputFormat<OUT> inputFormat,
>   String filePath,
>   FileProcessingMode watchType,
>   long interval)
>
> where you can specify whether the source should shut down once all files
> have been read (PROCESS_ONCE) or whether the source should continue to
> monitor the input directory for new files (PROCESS_CONTINUOUSLY).
>
> I think there is currently no built-in way of removing files from the
> input directory once they have been processed because it’s not possible to
> know when the contents of a given file have passed through the complete
> pipeline.
>
> Best,
> Aljoscha
>
> On 15. Jun 2017, at 20:00, Flavio Pompermaier 
> wrote:
>
> Hi Aljosha,
> thanks for the great suggestions, I wasn't aware of 
> AsyncDataStream.unorderedWait
> and BucketingSink setBucketer().
> Most probably that's exactly what I was looking for... (I just need to find
> the time to test it.)
> Just one last question: what are you referring to with "you could use a
> different readFile() method where you can specify that you want to
> continue monitoring the directory for new files"? Is there a way to delete
> the input files, or move them to a backup dir, once they have been enriched?
>
> Best Flavio
>
>
>
> On Thu, Jun 15, 2017 at 2:30 PM, Aljoscha Krettek 
> wrote:
>
>> Ok, just trying to make sure I understand everything: You have this:
>>
>> 1. A bunch of data in HDFS that you want to enrich
>> 2. An external service (Solr/ES) that you query for enriching the data
>> rows stored in 1.
>> 3. You need to store the enriched rows in HDFS again
>>
>> I think you could just do this (roughly):
>>
>> StreamExecutionEnvironment env = …;
>>
>> DataStream<Row> input = env.readFile(new RowCsvInputFormat(…), “<hdfs path>”);
>>
>> DataStream<Row> enriched = input.flatMap(new MyEnricherThatCallsES());
>> // or
>> DataStream<Row> enriched = AsyncDataStream.unorderedWait(input, …) //
>> yes, the interface for this is a bit strange
>>
>> BucketingSink<Row> sink = new BucketingSink<>(“<base path>”);
>> // this is responsible for putting files into buckets, so that you don’t
>> have too many small HDFS files
>> sink.setBucketer(new MyBucketer());
>> enriched.addSink(sink)
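For instance, a concrete bucketing setup could look roughly like this (a sketch assuming the
Flink 1.3-era classes from flink-connector-filesystem; the output path is a placeholder):

BucketingSink<Row> sink = new BucketingSink<>("hdfs:///enriched-output");
sink.setBucketer(new DateTimeBucketer<Row>("yyyy-MM-dd--HH"));   // one bucket (sub-directory) per hour
sink.setBatchSize(128 * 1024 * 1024);                            // roll part files at roughly 128 MB
enriched.addSink(sink);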
>>
>> In this case, the file source will close once all files are read and the
>> job will finish. If you don’t want this you can also use a different
>> readFile() method where you can specify  that you want to continue
>> monitoring the directory for new files.
>>
>> Best,
>> Aljoscha
>>
>> On 6. Jun 2017, at 17:38, Flavio Pompermaier 
>> wrote:
>>
>> Hi Aljosha,
>> thanks for getting back to me on this! I'll try to simplify the thread
>> starting from what we want to achieve.
>>
>> At the moment we execute some queries to a db and we store the data into
>> Parquet directories (one for each query).
>> Let's say we then create a DataStream from each dir; what we would
>> like to achieve is to perform some sort of throttling of the queries to
>> perform against this external service (in order to not overload it with too many
>> queries... but we also need to run as many queries as possible in order to
>> execute this process in a reasonable time).
>>
>> The current batch process has the downside that you must know a priori
>> the right parallelism of the job, while the streaming process should be able
>> to rescale when needed [1], so it should be easier to tune the job
>> parallelism without losing all the already performed queries [2].
>> Moreover, if the job crashes you lose all the work done up to that moment
>> because there's no checkpointing...
>> My initial idea was to read from HDFS and put the data into Kafka to be
>> able to change the number of consumers at runtime (according to the maximum
>> parallelism we can achieve with the external service), but maybe this could
>> be done in an easier way (we've only started using streaming a short time
>> ago, so we may see things as more complicated than they are).
>>
>> Moreover, as the last step, we need to know when all the data has been
>> enriched so we can stop this first streaming job and we can start with the
>> next one (that cannot run if the acquisition job is still in progress
>> because it can break referential integrity). Is there any example of such a
>> use case?
>>
>> [1] at the moment manually... maybe automatically in the future, right?
>> [2] with the batch job, if we want to change the parallelism we need to
>> stop it and relaunch it, losing all the already enriched data because
>> there's no checkpointing there
>>
>> On Tue, Jun 6, 2017 at 4:46 PM, Aljoscha Krettek 
>> wrote:
>>
>>> Hi Flavio,
>>>
>>> I’ll try and answer your

Re: Stateful streaming question

2017-06-16 Thread Aljoscha Krettek
Hi,

I’m afraid not. You would have to wait for one job to finish before starting 
the next one.

Best,
Aljoscha
> On 15. Jun 2017, at 20:11, Flavio Pompermaier  wrote:
> 
> Hi Aljoscha,
> we're still investigating possible solutions here. Yes, as you correctly said, 
> there are links between data of different keys, so we can proceed with 
> the next job only once we are sure at 100% that all input data has been 
> consumed and no other data will be read until this last job ends.
> There should be some sort of synchronization between these 2 jobs... is that 
> possible right now in Flink?
> 
> Thanks a lot for the support,
> Flavio
> 
> On Thu, Jun 15, 2017 at 12:16 PM, Aljoscha Krettek  > wrote:
> Hi,
> 
> Trying to revive this somewhat older thread: have you made any progress? I 
> think going with a ProcessFunction that keeps all your state internally and 
> periodically outputs to, say, Elasticsearch using a sink seems like the way 
> to go? You can do the periodic emission using timers in the ProcessFunction. 
> 
> In your use case, does the data you would store in the Flink managed state 
> have links between data of different keys? This sounds like it could be a 
> problem when it comes to consistency when outputting to an external system.
> 
> Best,
> Aljoscha
> 
>> On 17. May 2017, at 14:12, Flavio Pompermaier > > wrote:
>> 
>> Hi to all,
>> there are a lot of useful discussion points :)
>> 
>> I'll try to answer to everybody.
>> 
>> @Ankit: 
>> right now we're using Parquet on HDFS to store thrift objects. Those objects 
>> are essentially structured like
>> key
>> alternative_key
>> list of tuples (representing the state of my Object)
>> This model could potentially be modeled as a Monoid and it's very well 
>> suited for a stateful streaming computation where updates to a single key 
>> state are not as expensive as a call to any db to get the current list of 
>> tuples and write that list back for an update (IMHO). Maybe here I'm 
>> overestimating Flink streaming capabilities...
>> serialization should be ok using thrift, but Flink advises to use tuples to 
>> have better performance, so just after reading the data from disk (as a 
>> ThriftObject) we convert it to its equivalent representation as a 
>> Tuple3 version
>> Since I currently use Flink to ingest data that (in the end) means adding 
>> tuples to my objects, it would be perfect to have an "online" state of the 
>> grouped tuples in order to:
>> add/remove tuples to my object very quickly
>> from time to time, scan the whole online data (or a part of it) and 
>> "translate" it into one ore more JSON indices (and put them into 
>> Elasticsearch)
>> @Fabian:
>> You're right that batch processes are not very well suited to work with 
>> services that can fail... if in a map function the remote call fails, the whole 
>> batch job fails... this should be less problematic with streaming because 
>> there's checkpointing, and with async IO it should be possible to add 
>> some retry/backoff policies in order to not overload remote services like db 
>> or solr/es indices (maybe it's not already there but it should be possible 
>> to add). Am I wrong?
>> 
>> @Kostas:
>> 
>> From what I understood, Queryable state is useful for gets... what if I need 
>> to scan the entire db? For us it could be better to periodically dump the 
>> state to RocksDb or HDFS but, as I already said, I'm not sure if it is safe 
>> to start a batch job that reads the dumped data while, in the meantime, a 
>> possible update of this dump could happen...is there any potential problem 
>> to data consistency (indeed tuples within grouped objects have references to 
>> other objects keys)?
>> 
>> Best,
>> Flavio
>> 
>> On Wed, May 17, 2017 at 10:18 AM, Kostas Kloudas 
>> mailto:k.klou...@data-artisans.com>> wrote:
>> Hi Flavio,
>> 
>> For setting the retries, unfortunately there is no such setting yet and, if 
>> I am not wrong, in case of a failure of a request, 
>> an exception will be thrown and the job will restart. I am also including 
>> Till in the thread as he may know better.
>> 
>> For consistency guarantees and concurrency control, this depends on your 
>> underlying backend. But if you want to 
>> have end-to-end control, then you could do as Ankit suggested at his point 
>> 3), i.e have a single job for the whole pipeline
>>  (if this fits your needs of course). This will allow you to set your own 
>> “precedence” rules for your operations.
>> 
>> Now finally, there is no way currently to expose the state of a job to 
>> another job. The way to do so is either Queryable
>> State, or writing to a Sink. If the problem for having one job is that you 
>> emit one element at a time, you can always group
>> elements together and emit downstream less often, in batches.
>>  
>> Finally, if  you need 2 jobs, you can always use a hybrid solution where you 
>> keep your current state in Flink, and you d

Re: Streaming use case: Row enrichment

2017-06-16 Thread Aljoscha Krettek
Hi,

I was referring to

StreamExecutionEnvironment.readFile(
  FileInputFormat<OUT> inputFormat,
  String filePath,
  FileProcessingMode watchType,
  long interval)

where you can specify whether the source should shut down once all files have 
been read (PROCESS_ONCE) or whether the source should continue to monitor the 
input directory for new files (PROCESS_CONTINUOUSLY).

I think there is currently no built-in way of removing files from the input 
directory once they have been processed because it’s not possible to know when 
the contents of a given file have passed through the complete pipeline.
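For example, a continuously monitoring variant could look roughly like this (paths and field
types are placeholders, and the exact RowCsvInputFormat constructor arguments depend on the
Flink version):

TypeInformation<?>[] fieldTypes = new TypeInformation<?>[] {
        BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO };

DataStream<Row> rows = env.readFile(
        new RowCsvInputFormat(new Path("hdfs:///input"), fieldTypes),
        "hdfs:///input",
        FileProcessingMode.PROCESS_CONTINUOUSLY,   // keep monitoring the directory for new files
        60_000L);                                  // directory re-scan interval in milliseconds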

Best,
Aljoscha

> On 15. Jun 2017, at 20:00, Flavio Pompermaier  wrote:
> 
> Hi Aljosha,
> thanks for the great suggestions, I wasn't aware of 
> AsyncDataStream.unorderedWait and BucketingSink setBucketer().
> Most probably that's exactly what I was looking for... (I just need to find the 
> time to test it.)
> Just one last question: what are you referring to with "you could use a 
> different readFile() method where you can specify that you want to continue 
> monitoring the directory for new files"? Is there a way to delete the input 
> files, or move them to a backup dir, once they have been enriched?
> 
> Best Flavio
> 
> 
> 
> On Thu, Jun 15, 2017 at 2:30 PM, Aljoscha Krettek  > wrote:
> Ok, just trying to make sure I understand everything: You have this:
> 
> 1. A bunch of data in HDFS that you want to enrich
> 2. An external service (Solr/ES) that you query for enriching the data rows 
> stored in 1.
> 3. You need to store the enriched rows in HDFS again
> 
> I think you could just do this (roughly):
> 
> StreamExecutionEnvironment env = …;
> 
> DataStream<Row> input = env.readFile(new RowCsvInputFormat(…), “<hdfs path>”);
> 
> DataStream<Row> enriched = input.flatMap(new MyEnricherThatCallsES());
> // or
> DataStream<Row> enriched = AsyncDataStream.unorderedWait(input, …) // yes, 
> the interface for this is a bit strange
> 
> BucketingSink<Row> sink = new BucketingSink<>(“<base path>”);
> // this is responsible for putting files into buckets, so that you don’t have 
> too many small HDFS files
> sink.setBucketer(new MyBucketer());
> enriched.addSink(sink)
> 
> In this case, the file source will close once all files are read and the job 
> will finish. If you don’t want this you can also use a different readFile() 
> method where you can specify  that you want to continue monitoring the 
> directory for new files.
> 
> Best,
> Aljoscha
> 
>> On 6. Jun 2017, at 17:38, Flavio Pompermaier > > wrote:
>> 
>> Hi Aljosha,
>> thanks for getting back to me on this! I'll try to simplify the thread 
>> starting from what we want to achieve.
>> 
>> At the moment we execute some queries to a db and we store the data into 
>> Parquet directories (one for each query). 
>> Let's say we then create a DataStream from each dir; what we would like 
>> to achieve is to perform some sort of throttling of the queries to perform 
>> against this external service (in order to not overload it with too many 
>> queries... but we also need to run as many queries as possible in order to 
>> execute this process in a reasonable time). 
>> 
>> The current batch process has the downside that you must know a priori the 
>> right parallelism of the job, while the streaming process should be able to 
>> rescale when needed [1], so it should be easier to tune the job parallelism 
>> without losing all the already performed queries [2]. Moreover, if the job 
>> crashes you lose all the work done up to that moment because there's no 
>> checkpointing...
>> My initial idea was to read from HDFS and put the data into Kafka to be able 
>> to change the number of consumers at runtime (according to the maximum 
>> parallelism we can achieve with the external service), but maybe this could 
>> be done in an easier way (we've only started using streaming a short time ago, 
>> so we may see things as more complicated than they are).
>> 
>> Moreover, as the last step, we need to know when all the data has been 
>> enriched so we can stop this first streaming job and we can start with the 
>> next one (that cannot run if the acquisition job is still in progress 
>> because it can break referential integrity). Is there any example of such a 
>> use case?
>> 
>> [1] at the moment manually... maybe automatically in the future, right?
>> [2] with the batch job, if we want to change the parallelism we need to stop 
>> it and relaunch it, losing all the already enriched data because there's no 
>> checkpointing there
>> 
>> On Tue, Jun 6, 2017 at 4:46 PM, Aljoscha Krettek > > wrote:
>> Hi Flavio,
>> 
>> I’ll try and answer your questions:
>> 
>> Regarding 1. Why do you need to first read the data from HDFS into Kafka (or 
>> another queue)? Using StreamExecutionEnvironment.readFile(FileInputFormat, 
>> String, FileProcessingMode, long) you can monitor a directory in HDFS and 
>> process the files that are there and any newly arriving files. For batching

Re: Flink cluster : Client is not connected to any Elasticsearch nodes!

2017-06-16 Thread Flavio Pompermaier
Before this connector was improved to be resilient to ES problems, we used to
use Logstash to index into ES, and it was really cumbersome... this connector
eases the work of indexing into ES a lot: it's much faster, it can index
directly without persisting to a file, and it's much easier to filter
documents or remove attributes. Moreover, it removes a somewhat unnecessary
component from our architecture. The only problem we still have is with the
shading of Flink and the conflict between Guava versions, but we solved that
by shading ES and using this shaded version as the dependency of the Flink ES
connector.

Best,
Flavio

On Fri, Jun 16, 2017 at 11:14 AM, Tzu-Li (Gordon) Tai 
wrote:

> Hi Dhinesh,
>
> One other thing that came to mind: the Elasticsearch 2 connector, by
> default, uses ES 2.3.5.
> If you’re using an Elasticsearch 2 with major version higher than that,
> you need to build the connector with the matching version.
> When running lower major version clients against a higher major version ES
> installation, this exception is very common.
>
> Best,
> Gordon
>
>
> On 6 June 2017 at 6:39:52 PM, Tzu-Li (Gordon) Tai (tzuli...@apache.org)
> wrote:
>
> Hi Dhinesh,
>
> Could it be that you didn’t configure the network binding address of the
> ES installation properly?
> You need to make sure it isn’t bound to 127.0.0.1, which I think in some
> Elasticsearch versions is the default binding.
>
> Also, just a reminder if you haven’t done so, please make sure that the ES
> dependencies are properly bundled for cluster execution. See [1] for details
> on this.
>
> Cheers,
> Gordon
>
> [1] https://ci.apache.org/projects/flink/flink-docs-
> release-1.3/dev/linking.html
>
> On 6 June 2017 at 12:01:54 AM, dhinesh raja (dhinesh.r...@bizruntime.com)
> wrote:
>
> Dear Team,
>
> I am running a Flink streaming job with the Elasticsearch 2 connector. I am able
> to run it in my Eclipse, but when I run it on a local Flink cluster I get this
> error. Could you please help me with this? I have attached my code. I am using
> Flink 1.2.0 and Elasticsearch 2.x.
>
>
> java.lang.RuntimeException: Client is not connected to any Elasticsearch 
> nodes!
> at 
> org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:172)
> at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:112)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:375)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:251)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:665)
> at java.lang.Thread.run(Thread.java:748)
>
>
> --
> Thanks & Regards
>
> Dhinesh Raja. M
>
>
>


Re: Question about the custom partitioner

2017-06-16 Thread Aljoscha Krettek
Hi,

I’m afraid that’s not possible out-of-box with the current APIs. I actually 
don’t know why the user-facing Partitioner only allows returning one target 
because the internal StreamPartitioner (which extends ChannelSelector) allows 
returning multiple target partitions.

You can hack around the API by manually creating your own StreamPartitioner and 
applying it to the DataStream as DataStream.partitionCustom() and 
DataStream.setConnectionType() (the first calls the latter) do.
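A rough sketch of that hack, assuming the internal 1.3-era classes (the partitioner below and
the way it is applied are illustrative, roughly what partitionCustom()/setConnectionType() do
internally; MyEvent is a hypothetical element type):

import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

public class DuplicatingPartitioner<T> extends StreamPartitioner<T> {

    private final int[] returnChannels = new int[2];

    @Override
    public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record, int numberOfOutputChannels) {
        // illustrative routing: send each record to a "primary" channel plus its neighbour
        int primary = Math.abs(record.getInstance().getValue().hashCode()) % numberOfOutputChannels;
        returnChannels[0] = primary;
        returnChannels[1] = (primary + 1) % numberOfOutputChannels;
        return returnChannels;
    }

    @Override
    public StreamPartitioner<T> copy() {
        return this;
    }

    @Override
    public String toString() {
        return "DUPLICATING";
    }
}

// applying it to an existing DataStream<MyEvent> called "input":
DataStream<MyEvent> duplicated = new DataStream<>(
        input.getExecutionEnvironment(),
        new PartitionTransformation<>(input.getTransformation(), new DuplicatingPartitioner<>()));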

Best,
Aljoscha

> On 14. Jun 2017, at 09:09, Xingcan Cui  wrote:
> 
> Hi all,
> 
> I want to duplicate records to multiple downstream tasks (not all of them, 
> so broadcasting won't work) in the stream environment.
> However, it seems that the current custom partitioner can return only one 
> partition index.
> Why does this restriction exist, or am I missing something?
> 
> Thanks,
> Xingcan



Re: Flink cluster : Client is not connected to any Elasticsearch nodes!

2017-06-16 Thread Tzu-Li (Gordon) Tai
Hi Dhinesh,

One other thing that came to mind: the Elasticsearch 2 connector, by default, 
uses ES 2.3.5.
If you’re using an Elasticsearch 2 with major version higher than that, you 
need to build the connector with the matching version.
When running lower major version clients against a higher major version ES 
installation, this exception is very common.

Best,
Gordon


On 6 June 2017 at 6:39:52 PM, Tzu-Li (Gordon) Tai (tzuli...@apache.org) wrote:

Hi Dhinesh,

Could it be that you didn’t configure the network binding address of the ES 
installation properly?
You need to make sure it isn’t bound to 127.0.0.1, which I think in some 
Elasticsearch versions is the default binding.
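On the Flink side, the ES2 connector is pointed at the nodes via a list of transport addresses,
which must match what Elasticsearch actually binds to. A sketch (the cluster name, host name and
MyEsSinkFunction are placeholders; MyEsSinkFunction stands for your ElasticsearchSinkFunction):

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink;

Map<String, String> userConfig = new HashMap<>();
userConfig.put("cluster.name", "my-es-cluster");   // must match the cluster.name of the ES installation

List<InetSocketAddress> transportAddresses = new ArrayList<>();
transportAddresses.add(new InetSocketAddress(InetAddress.getByName("es-host"), 9300));

// "stream" is the DataStream you want to index
stream.addSink(new ElasticsearchSink<>(userConfig, transportAddresses, new MyEsSinkFunction()));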

Also, just a reminder if you haven’t done so, please make sure that the ES 
dependencies are properly bundled for cluster execution. See [1] for details on 
this.

Cheers,
Gordon

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/linking.html

On 6 June 2017 at 12:01:54 AM, dhinesh raja (dhinesh.r...@bizruntime.com) wrote:

Dear Team,

I am running a Flink streaming job with the Elasticsearch 2 connector. I am able to 
run it in my Eclipse, but when I run it on a local Flink cluster I get this error. Could 
you please help me with this? I have attached my code. I am using Flink 1.2.0 and 
Elasticsearch 2.x.


 java.lang.RuntimeException: Client is not connected to any Elasticsearch nodes!
at 
org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:172)
at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:112)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:375)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:251)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:665)
at java.lang.Thread.run(Thread.java:748)

--
Thanks & Regards

Dhinesh Raja. M




Re: Painful KryoException: java.lang.IndexOutOfBoundsException on Flink Batch Api scala

2017-06-16 Thread Tzu-Li (Gordon) Tai
Hi Andrea,

I’ve rallied back to this and wanted to check on the status. Have you managed 
to solve this in the end, or is this still a problem for you?

If it’s still a problem, would you be able to provide a complete runnable 
example job that can reproduce the problem (ideally via a git branch I can 
clone and run :))?
This would help me with digging a bit more into the issue. Thanks a lot!

Best,
Gordon


On 8 June 2017 at 6:58:46 PM, Andrea Spina (andrea.sp...@radicalbit.io) wrote:

Hi guys,

thank you for your interest. Yes @Flavio, I tried both the 1.2.0 and 1.3.0
versions.
Following Gordon's suggestion I tried setting setReference to false, but sadly
it didn't help. What I did then was to declare a custom serializer as
follows:

class BlockSerializer extends Serializer[Block] with Serializable {

  override def read(kryo: Kryo, input: Input, block: Class[Block]): Block = {
    val serializer = new SparseMatrixSerializer

    val blockData = kryo.readObject(input, classOf[SparseMatrix], serializer)
    new Block(blockData)
  }

  override def write(kryo: Kryo, output: Output, block: Block): Unit = {
    val serializer = new SparseMatrixSerializer

    kryo.register(classOf[SparseMatrix], serializer)
    kryo.writeObject(output, block.blockData, serializer)

    output.close()
  }

}

class SparseMatrixSerializer extends Serializer[SparseMatrix] with Serializable {

  override def read(kryo: Kryo, input: Input, sparse: Class[SparseMatrix]): SparseMatrix = {
    val collectionIntSerializer = new CollectionSerializer()
    collectionIntSerializer.setElementClass(classOf[Int], new IntSerializer)
    val collectionDoubleSerializer = new CollectionSerializer()
    collectionDoubleSerializer.setElementClass(classOf[Double], new DoubleSerializer)

    val numRows = input.readInt
    val numCols = input.readInt
    val colPtrs = kryo.readObject(input, classOf[java.util.ArrayList[Int]], collectionIntSerializer).asScala.toArray
    val rowIndices = kryo.readObject(input, classOf[java.util.ArrayList[Int]], collectionIntSerializer).asScala.toArray
    val data = kryo.readObject(input, classOf[java.util.ArrayList[Double]], collectionDoubleSerializer).asScala.toArray

    input.close()

    new SparseMatrix(numRows = numRows, numCols = numCols, colPtrs = colPtrs, rowIndices = rowIndices, data = data)
  }

  override def write(kryo: Kryo, output: Output, sparseMatrix: SparseMatrix): Unit = {

    val collectionIntSerializer = new CollectionSerializer()
    collectionIntSerializer.setElementClass(classOf[Int], new IntSerializer)

    val collectionDoubleSerializer = new CollectionSerializer()
    collectionDoubleSerializer.setElementClass(classOf[Double], new DoubleSerializer)

    kryo.register(classOf[java.util.ArrayList[Int]], collectionIntSerializer)
    kryo.register(classOf[java.util.ArrayList[Double]], collectionDoubleSerializer)

    output.writeInt(sparseMatrix.numRows)
    output.writeInt(sparseMatrix.numCols)
    kryo.writeObject(output, sparseMatrix.colPtrs.toList.asJava, collectionIntSerializer)
    kryo.writeObject(output, sparseMatrix.rowIndices.toList.asJava, collectionIntSerializer)
    kryo.writeObject(output, sparseMatrix.data.toList.asJava, collectionDoubleSerializer)

    output.close()
  }

}

What I obtained is the same exception as before, but with a different accessed
index and size.

Caused by: java.lang.Exception: The data preparation for task 'CHAIN  
GroupReduce (GroupReduce at  
my.org.path.benchmarks.matrices.flink.distributed.BlockMatrix.$times(BlockMatrix.scala:103))
  
-> Map (Map at  
my.org.path.benchmarks.matrices.flink.MatrixMultiplication$.main(MatrixMultiplication.scala:189))'
  
, caused an error: Error obtaining the sorted input: Thread 'SortMerger  
Reading Thread' terminated due to an exception: Index: 1, Size: 0  
at  
org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)  
at  
org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)  
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)  
at java.lang.Thread.run(Thread.java:745)  
Caused by: java.lang.RuntimeException: Error obtaining the sorted input:  
Thread 'SortMerger Reading Thread' terminated due to an exception: Index: 1,  
Size: 0  
at  
org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
  
at  
org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1095)  
at  
org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
  
at  
org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)  
... 3 more  
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'  
terminated due to an exception: Index: 1, Size: 0  
at  
org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
  
Caused by: java.lang.IndexOutOfBoundsException: Index: 1, Size: 0  
at java.util.ArrayList.rangeCheck(ArrayList.java:653)  
at java.util.ArrayL