Deterministic Update

2017-06-07 Thread rhashmi
Is there any possibility to trigger a sink operator on completion of a
checkpoint?





Re: Flink streaming Python

2017-06-07 Thread yunfan123
Vote for Python: +1.
I find that it can't support a Kafka source from code.





Re: Question regarding configuring number of network buffers

2017-06-07 Thread Zhijiang(wangzhijiang999)
Hi Ray,
For your question ("Does that say that each parallel task inside the TaskManager
talks to all parallel tasks inside the same TaskManager or to all parallel tasks
across all task managers?"): each task will talk to all of its parallel upstream
and downstream tasks, both within the same TaskManager and across different
TaskManagers. The consumer and producer tasks may be deployed in the same
TaskManager or in different TaskManagers. For the case of the same TaskManager,
the local data shuffle is done directly by memory copy, and the required buffers
can be determined by #slots-per-TM^2. For the case of different TaskManagers,
the remote data shuffle is done by network transport, and a single TCP connection
between two TaskManagers is reused by all the tasks inside them, so the required
buffers can be determined by #TMs.
Considering both cases, the formula is #slots-per-TM^2 * #TMs. Hope it can 
help you.
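
A quick worked example of that formula (the cluster sizes below are purely
illustrative assumptions):

public class NetworkBufferEstimate {
    public static void main(String[] args) {
        int slotsPerTaskManager = 8;  // assumed slots per TaskManager
        int taskManagers = 4;         // assumed number of TaskManagers
        // Documentation formula: #slots-per-TM^2 * #TMs * 4
        int buffers = slotsPerTaskManager * slotsPerTaskManager * taskManagers * 4;
        // 8 * 8 * 4 * 4 = 1024 buffers for this hypothetical cluster
        System.out.println("taskmanager.network.numberOfBuffers >= " + buffers);
    }
}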
Cheers,
Zhijiang
------------------------------------------------------------------
From: Ray Ruvinskiy
Sent: Wednesday, June 7, 2017, 23:59
To: user@flink.apache.org
Subject: Question regarding configuring number of network buffers
The documentation provides the formula #slots-per-TM^2 * #TMs * 4 to determine 
the number of network buffers we should configure. The documentation also says, 
“A logical network connection exists for each point-to-point exchange of data 
over the network, which typically happens at repartitioning- or broadcasting 
steps (shuffle phase). In those, each parallel task inside the TaskManager has 
to be able to talk to all other parallel tasks.” Does that mean that each 
parallel task inside the TaskManager talks to all parallel tasks inside the same 
TaskManager, or to all parallel tasks across all task managers? Intuitively, I 
would assume the latter, but then wouldn’t the formula for determining the 
number of network buffers be more along the lines of (#slots-per-TM * #TMs)^2? 
Thanks, Ray 


Re: How to run a Flink job in EMR?

2017-06-07 Thread Foster, Craig
Ah, maybe (1) wasn’t entirely clear, so here’s a copy/pasted example of what 
I suggested:

HadoopJarStepConfig copyJar = new HadoopJarStepConfig()
  .withJar("command-runner.jar")
  .withArgs("bash","-c", "aws s3 cp s3://mybucket/myjar.jar /home/hadoop"
);
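
A sketch of the follow-up step that would then run the job from the copied jar
(same command-runner.jar pattern; the jar name, parallelism, and flags here are
illustrative placeholders, not something I have tested):

HadoopJarStepConfig runFlinkJob = new HadoopJarStepConfig()
  .withJar("command-runner.jar")
  .withArgs("bash", "-c",
      "flink run -m yarn-cluster -yn 2 -p 4 /home/hadoop/myjar.jar");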


From: "Foster, Craig" 
Date: Wednesday, June 7, 2017 at 7:21 PM
To: Chris Schneider , "user@flink.apache.org" 

Subject: Re: How to run a Flink job in EMR?


1)  Since the jar is only required on the master node you should be able to 
just run a step with a very simple script like ‘bash –c “aws s3 cp 
s3://mybucket/myjar.jar .”’
So if you were to do that using the step similar to outlined in the EMR 
documentation, but replacing withArgs with the above command as args (I think 
there’s an example of this on that same EMR docs page you refer to).
Then add another step after that which actually runs the flink job. The jar 
will be located in /home/hadoop. In the future, I’m hoping this can just be 
simplified to flink run -yn 2 -p 4 s3://mybucket/myjar.jar … but it doesn’t 
seem to be the case right now.

2)  If you ran this as a step, you should be able to see the error the 
Flink driver gives in the step’s logs.

3)  Provided your S3 bucket and EMR cluster EC2 IAM role/”instance profile” 
belong to the same account (or at least the permissions are setup such that you 
can download a file from S3 to your EC2 instances), you should be able to use 
the 
DefaultAWSCredentialsProviderChain,
 which won’t require you enter any credentials as it uses the EC2 instance 
profile credentials provider.


Hope that helps.

Thanks,
Craig


From: Chris Schneider 
Date: Wednesday, June 7, 2017 at 6:16 PM
To: "user@flink.apache.org" 
Subject: How to run a Flink job in EMR?

Hi Gang,

I’ve been trying to get some Flink code running in Amazon Web Services’s 
Elastic MapReduce, but so far the only success I’ve had required me to log into 
the master node, download my jar from S3 to there, and then run it on the 
master node from the command line using something like the following:

% bin/flink run -m yarn-cluster -yn 2 -p 4  

The two other approaches I’ve tried (based on the AWS EMR Flink 
documentation)
 that didn’t work were:

1) Add an EMR Step to launch my program as part of a Flink session - I couldn’t 
figure out how to get my job jar deployed as part of the step, and I couldn’t 
successfully configure a Bootstrap 
Action
 to deploy it before running that step.

2) Start a Long-Running Flink Session via an EMR Step (which worked) and then 
use the Flink Web UI to upload my job jar from my workstation - It killed the 
ApplicationMaster that was running the Flink Web UI without providing much 
interesting logging. I’ve appended both the container log output and the 
jobmanager.log contents to the end of this email.
In addition, it would be nice to gain access to S3 resources using credentials. 
I’ve tried using an AmazonS3ClientBuilder, and passing an 
EnvironmentVariableCredentialsProvider
 to its setCredentials method. I’d hoped that this might pick up the 
credentials I set up on my master node in the $AWS_ACCESS_KEY_ID and 
$AWS_SECRET_KEY environment variables I've exported, but I’m guessing that the 
shell this code is running in (on the slaves?) doesn’t have access to those 
variables.
Here’s a list of interesting version numbers:

flink-java-1.2.0.jar
flink-core-1.2.0.jar
flink-annotations-1.2.0.jar
emr-5.4.0 with Flink 1.2.0 installed

Any help would be greatly appreciated. I’m lusting after an example showing how 
to deploy a simple Flink jar from S3 to a running EMR cluster and then get 
Flink to launch it with an arbitrary set of Flink and user arguments. Bonus 
points for setting up an AmazonS3 Java client object without including those 
credentials within my Java source code.

Best Regards,

- Chris

Here’s the container logging from my attempt to submit my job via the Flink web 
UI:
Application application_1496707031947_0002 failed 1 times due to AM Container 
for appattempt_1496707031947_0002_01 exited with exitCode: 255
For more detailed output, check application tracking 
page:http://ip-10-85-61-122.ec2.internal:8088/cluster/app/application_1496707031947_0002
 Then, click on links to logs of each attempt.
Diagnostics: Exception from container-launch.
Container id: container_1496707031947_0002_01_01
Exit code: 255
Stack trace: ExitCodeException exitCode=255:
at 

Re: How to run a Flink job in EMR?

2017-06-07 Thread Foster, Craig
1)   Since the jar is only required on the master node, you should be able 
to just run a step with a very simple script like ‘bash –c “aws s3 cp 
s3://mybucket/myjar.jar .”’
So if you were to do that, use a step similar to the one outlined in the EMR 
documentation, but replace withArgs with the above command as args (I think 
there’s an example of this on that same EMR docs page you refer to).
Then add another step after that which actually runs the Flink job. The jar 
will be located in /home/hadoop. In the future, I’m hoping this can just be 
simplified to flink run -yn 2 -p 4 s3://mybucket/myjar.jar …, but that doesn’t 
seem to be the case right now.

2)   If you ran this as a step, you should be able to see the error the 
Flink driver gives in the step’s logs.

3)   Provided your S3 bucket and the EMR cluster’s EC2 IAM role/“instance 
profile” belong to the same account (or at least the permissions are set up such 
that you can download a file from S3 to your EC2 instances), you should be able 
to use the DefaultAWSCredentialsProviderChain, which won’t require you to enter 
any credentials as it uses the EC2 instance profile credentials provider (see 
the sketch below).
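
A minimal sketch of what (3) could look like with the AWS SDK for Java (the
region is an assumption; no keys appear in source code because the chain falls
back to the EC2 instance profile):

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;

// Credentials are resolved from env vars, system properties, profile files,
// or the EC2 instance profile, whichever is available first.
AmazonS3 s3 = AmazonS3ClientBuilder.standard()
    .withCredentials(new DefaultAWSCredentialsProviderChain())
    .withRegion(Regions.US_EAST_1)   // assumed region
    .build();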


Hope that helps.

Thanks,
Craig


From: Chris Schneider 
Date: Wednesday, June 7, 2017 at 6:16 PM
To: "user@flink.apache.org" 
Subject: How to run a Flink job in EMR?

Hi Gang,

I’ve been trying to get some Flink code running in Amazon Web Services’ 
Elastic MapReduce, but so far the only success I’ve had required me to log into 
the master node, download my jar from S3 there, and then run it on the 
master node from the command line using something like the following:

% bin/flink run -m yarn-cluster -yn 2 -p 4  

The two other approaches I’ve tried (based on the AWS EMR Flink documentation) 
that didn’t work were:

1) Add an EMR Step to launch my program as part of a Flink session - I couldn’t 
figure out how to get my job jar deployed as part of the step, and I couldn’t 
successfully configure a Bootstrap Action to deploy it before running that step.

2) Start a Long-Running Flink Session via an EMR Step (which worked) and then 
use the Flink Web UI to upload my job jar from my workstation - It killed the 
ApplicationMaster that was running the Flink Web UI without providing much 
interesting logging. I’ve appended both the container log output and the 
jobmanager.log contents to the end of this email.
In addition, it would be nice to gain access to S3 resources using credentials. 
I’ve tried using an AmazonS3ClientBuilder, and passing an 
EnvironmentVariableCredentialsProvider to its setCredentials method. I’d hoped 
that this might pick up the 
credentials I set up on my master node in the $AWS_ACCESS_KEY_ID and 
$AWS_SECRET_KEY environment variables I've exported, but I’m guessing that the 
shell this code is running in (on the slaves?) doesn’t have access to those 
variables.
Here’s a list of interesting version numbers:

flink-java-1.2.0.jar
flink-core-1.2.0.jar
flink-annotations-1.2.0.jar
emr-5.4.0 with Flink 1.2.0 installed

Any help would be greatly appreciated. I’m lusting after an example showing how 
to deploy a simple Flink jar from S3 to a running EMR cluster and then get 
Flink to launch it with an arbitrary set of Flink and user arguments. Bonus 
points for setting up an AmazonS3 Java client object without including those 
credentials within my Java source code.

Best Regards,

- Chris

Here’s the container logging from my attempt to submit my job via the Flink web 
UI:
Application application_1496707031947_0002 failed 1 times due to AM Container 
for appattempt_1496707031947_0002_01 exited with exitCode: 255
For more detailed output, check application tracking 
page:http://ip-10-85-61-122.ec2.internal:8088/cluster/app/application_1496707031947_0002
 Then, click on links to logs of each attempt.
Diagnostics: Exception from container-launch.
Container id: container_1496707031947_0002_01_01
Exit code: 255
Stack trace: ExitCodeException exitCode=255:
at org.apache.hadoop.util.Shell.runCommand(Shell.java:582)
at org.apache.hadoop.util.Shell.run(Shell.java:479)
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:773)
at 
org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:212)
at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
at 

Flink streaming Python

2017-06-07 Thread Madhukar Thota
Hi

I asked the same question back in Jan 2016 and am checking again with the
community to see if there is any update on or plan for supporting Flink
streaming in Python.


Re: Guava version conflict

2017-06-07 Thread Tzu-Li (Gordon) Tai
Yes, those should not be in the flink-dist jar, so the root cause should be 
that the shading isn’t working properly for your custom build.

If possible, could you try building Flink again with a lower Maven version as 
specified in the doc, and see if that works?
If so, it could be that Maven 3.3.x simply isn’t shading properly even with the 
double compilation trick.
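
If it helps, one way to do that quick check is a few lines of stand-alone Java
against the flink-dist jar (the jar path argument is a placeholder):

import java.util.jar.JarFile;

public class GuavaCheck {
    public static void main(String[] args) throws Exception {
        // e.g. java GuavaCheck lib/flink-dist_2.10-1.2.1.jar
        try (JarFile jar = new JarFile(args[0])) {
            jar.stream()
               .filter(e -> e.getName().startsWith("com/google/common/"))
               .forEach(e -> System.out.println(e.getName()));
        }
    }
}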


On 7 June 2017 at 6:17:15 PM, Flavio Pompermaier (pomperma...@okkam.it) wrote:

What I did was to take the sources of the new ES connector and copy them into 
my code.
Flink was compiled with Maven 3.3+, but I did the double compilation as 
specified in the Flink build section.
In the flink-dist jar I see Guava classes, e.g.:

com/google/common/util/concurrent/MoreExecutors$1.class
com/google/common/util/concurrent/MoreExecutors$SameThreadExecutorService.class
com/google/common/util/concurrent/MoreExecutors$ListeningDecorator.class
com/google/common/util/concurrent/MoreExecutors$ScheduledListeningDecorator.class
com/google/common/util/concurrent/MoreExecutors.class

Is it a problem of the shading with Maven 3.3+?

Best,
Flavio

On Wed, Jun 7, 2017 at 5:48 PM, Tzu-Li (Gordon) Tai  wrote:
Ah, I assumed you were running 1.3.0 (since you mentioned “new” ES connector).

Another thing to check, if you built Flink yourself, make sure you’re not using 
Maven 3.3+. There are shading problems when Flink is built with Maven versions 
higher than that.
The flink-dist jar should not contain any non-shaded Guava dependencies, could 
you also quickly check that?

On 7 June 2017 at 5:42:28 PM, Flavio Pompermaier (pomperma...@okkam.it) wrote:

I shaded the Elasticsearch dependency [1] and now the job works.
So I cannot run a job that needs guava 18 on Flink 1.2.1...

[1]  https://www.elastic.co/blog/to-shade-or-not-to-shade

On Wed, Jun 7, 2017 at 5:33 PM, Tzu-Li (Gordon) Tai  wrote:
Hi Flavio,

Could there be another dependency in your job that requires a conflicting 
version (w.r.t. ES 2.4.1) of Guava?
I’ve just double checked the flink-dist jar, there doesn’t seem to be any 
non-shaded Guava dependencies there, so the conflict should not have been 
caused by Flink.

Cheers,
Gordon


On 7 June 2017 at 4:12:04 PM, Flavio Pompermaier (pomperma...@okkam.it) wrote:

Hi to all,
I'm trying to use the new ES connector to index data from Flink (with ES 2.4.1).
When I try to run it from Eclipse everything is ok, when I run it from the 
cluster I get the following exception:

java.lang.NoSuchMethodError: 
com.google.common.util.concurrent.MoreExecutors.directExecutor()Ljava/util/concurrent/Executor;
    at org.elasticsearch.threadpool.ThreadPool.(ThreadPool.java:192)
    at 
org.elasticsearch.client.transport.TransportClient$Builder.build(TransportClient.java:131)

In my fat jar there are the classes of guava 18 (ES requires that version), 
Flink runs on CDH 5.9 (that use guava 11), in flink-dist jar I think that 
there's guava 11 classes while in flink-hadoop-compatibility there are shade 
guava 18 dependencies.

How can I make the job successfully run on the cluster?

Best,
Flavio




Re: Guava version conflict

2017-06-07 Thread Flavio Pompermaier
What I did was to take the sources of the new ES connector and copy them
into my code.
Flink was compiled with Maven 3.3+, but I did the double compilation as
specified in the Flink build section.
In the flink-dist jar I see Guava classes, e.g.:

com/google/common/util/concurrent/MoreExecutors$1.class
com/google/common/util/concurrent/MoreExecutors$SameThreadExecutorService.class
com/google/common/util/concurrent/MoreExecutors$ListeningDecorator.class
com/google/common/util/concurrent/MoreExecutors$ScheduledListeningDecorator.class
com/google/common/util/concurrent/MoreExecutors.class

Is it a problem of the shading with Maven 3.3+?

Best,
Flavio

On Wed, Jun 7, 2017 at 5:48 PM, Tzu-Li (Gordon) Tai 
wrote:

> Ah, I assumed you were running 1.3.0 (since you mentioned “new” ES
> connector).
>
> Another thing to check, if you built Flink yourself, make sure you’re not
> using Maven 3.3+. There are shading problems when Flink is built with Maven
> versions higher than that.
> The flink-dist jar should not contain any non-shaded Guava dependencies,
> could you also quickly check that?
>
> On 7 June 2017 at 5:42:28 PM, Flavio Pompermaier (pomperma...@okkam.it)
> wrote:
>
> I shaded the Elasticsearch dependency [1] and now the job works.
> So I cannot run a job that needs guava 18 on Flink 1.2.1...
>
> [1]  https://www.elastic.co/blog/to-shade-or-not-to-shade
>
> On Wed, Jun 7, 2017 at 5:33 PM, Tzu-Li (Gordon) Tai 
> wrote:
>
>> Hi Flavio,
>>
>> Could there be another dependency in your job that requires a conflicting
>> version (w.r.t. ES 2.4.1) of Guava?
>> I’ve just double checked the flink-dist jar, there doesn’t seem to be any
>> non-shaded Guava dependencies there, so the conflict should not have been
>> caused by Flink.
>>
>> Cheers,
>> Gordon
>>
>>
>> On 7 June 2017 at 4:12:04 PM, Flavio Pompermaier (pomperma...@okkam.it)
>> wrote:
>>
>> Hi to all,
>> I'm trying to use the new ES connector to index data from Flink (with ES
>> 2.4.1).
>> When I try to run it from Eclipse everything is ok, when I run it from
>> the cluster I get the following exception:
>>
>> java.lang.NoSuchMethodError: com.google.common.util.concurr
>> ent.MoreExecutors.directExecutor()Ljava/util/concurrent/Executor;
>> at org.elasticsearch.threadpool.ThreadPool.(ThreadPool.
>> java:192)
>> at org.elasticsearch.client.transport.TransportClient$Builder.b
>> uild(TransportClient.java:131)
>>
>> In my fat jar there are the classes of guava 18 (ES requires that
>> version), Flink runs on CDH 5.9 (that use guava 11), in flink-dist jar I
>> think that there's guava 11 classes while in flink-hadoop-compatibility
>> there are shade guava 18 dependencies.
>>
>> How can I make the job successfully run on the cluster?
>>
>> Best,
>> Flavio
>>
>>
>


Re: Painful KryoException: java.lang.IndexOutOfBoundsException on Flink Batch Api scala

2017-06-07 Thread Tzu-Li (Gordon) Tai
Hi Andrea,

I did some quick issue searching, and it seems like this is a frequently asked 
issue on Kryo: https://github.com/EsotericSoftware/kryo/issues/428.

I can’t be sure at the moment if the resolution / workaround mentioned in there 
makes sense, I’ll have to investigate a bit more.

Also, to clarify: from the stack trace, it seems like you’re simply using 
whatever serializer Kryo defaults to (i.e. FieldSerializer), and not 
registering your own, is that correct?

In the meantime, could you also try the following, rebuild Flink, and test 
whether it works: in 
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java#L349,
change setReferences to false.
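
For reference, the change amounts to flipping Kryo's reference tracking off
where the Kryo instance is created (a tiny sketch of the Kryo call only, not
the actual Flink patch):

import com.esotericsoftware.kryo.Kryo;

Kryo kryo = new Kryo();
// Reference tracking is what the MapReferenceResolver in your stack trace
// implements; disabling it is the experiment suggested above.
kryo.setReferences(false);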

Cheers,
Gordon


On 7 June 2017 at 3:39:55 PM, Andrea Spina (andrea.sp...@radicalbit.io) wrote:

Good afternoon dear Community,  

Since few days I'm really struggling to understand the reason behind this  
KryoException. Here the stack trace.  

2017-06-07 10:18:52,514 ERROR org.apache.flink.runtime.operators.BatchTask  
- Error in task code: CHAIN GroupReduce (GroupReduce at  
my.org.path.benchmarks.matrices.flink.distributed.BlockMatrix.$times(BlockMatrix.scala:103))
  
-> Map (Map at my.org.path.benchmarks.matrices.flink.MatrixMultiplicat  
ion$.main(MatrixMultiplication.scala:46)) (1/1)  
java.lang.Exception: The data preparation for task 'CHAIN GroupReduce  
(GroupReduce at xbenchmarks.matrices.flink.distributed.BlockMatrix.$times(B  
lockMatrix.scala:103)) -> Map (Map at  
my.org.path.benchmarks.matrices.flink.MatrixMultiplication$.main(MatrixMultiplication.scala:46))'
  
, caused an error: E  
rror obtaining the sorted input: Thread 'SortMerger spilling thread'  
terminated due to an exception: java.lang.IndexOutOfBoundsException: Index:  
109, Size: 5  
Serialization trace:  
blockData (my.org.path.benchmarks.matrices.flink.distributed.Block)  
at  
org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)  
at  
org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)  
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)  
at java.lang.Thread.run(Thread.java:745)  
Caused by: java.lang.RuntimeException: Error obtaining the sorted input:  
Thread 'SortMerger spilling thread' terminated due to an exception:  
java.lang.IndexOu  
tOfBoundsException: Index: 109, Size: 5  
Serialization trace:  
blockData (my.org.path.benchmarks.matrices.flink\.distributed.Block)  
at  
org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
  
at  
org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1095)  
at  
org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
  
at  
org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)  
... 3 more  
Caused by: java.io.IOException: Thread 'SortMerger spilling thread'  
terminated due to an exception: java.lang.IndexOutOfBoundsException: Index:  
109, Size: 5  
Serialization trace:  
blockData (my.org.path.benchmarks.matrices.flink.distributed.Block)  
at  
org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
  
Caused by: com.esotericsoftware.kryo.KryoException:  
java.lang.IndexOutOfBoundsException: Index: 109, Size: 5  
Serialization trace:  
blockData (my.org.path.benchmarks.matrices.flink.distributed.Block)  
at  
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)  
at  
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
  
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)  
at  
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:250)
  
at  
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:264)
  
at  
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:274)
  
at  
org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:98)
  
at  
org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:98)
  
at  
org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:98)
  
at  
org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:519)
  
at  
org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
  
at  
org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
  
Caused by: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5  
at java.util.ArrayList.rangeCheck(ArrayList.java:653)  
at java.util.ArrayList.get(ArrayList.java:429)  
at  
com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
  
at 

Re: Queries regarding FlinkCEP

2017-06-07 Thread Biplob Biswas
Hi Dawid,


Yes, now I understand what you meant. I added exactly the input you
asked me to, and I still get no alerts.

I also observed that I am not getting alerts even with normal ordering of
timestamps and with the AscendingTimestampExtractor.

I am attaching an image where I entered the data from the console producer; the
console should print out the alerts along with the events, but only the
events are printed.



  

 








Re: Guava version conflict

2017-06-07 Thread Tzu-Li (Gordon) Tai
Hi Flavio,

Could there be another dependency in your job that requires a conflicting 
version (w.r.t. ES 2.4.1) of Guava?
I’ve just double checked the flink-dist jar, there doesn’t seem to be any 
non-shaded Guava dependencies there, so the conflict should not have been 
caused by Flink.

Cheers,
Gordon


On 7 June 2017 at 4:12:04 PM, Flavio Pompermaier (pomperma...@okkam.it) wrote:

Hi to all,
I'm trying to use the new ES connector to index data from Flink (with ES 2.4.1).
When I try to run it from Eclipse everything is ok, when I run it from the 
cluster I get the following exception:

java.lang.NoSuchMethodError: 
com.google.common.util.concurrent.MoreExecutors.directExecutor()Ljava/util/concurrent/Executor;
    at org.elasticsearch.threadpool.ThreadPool.(ThreadPool.java:192)
    at 
org.elasticsearch.client.transport.TransportClient$Builder.build(TransportClient.java:131)

In my fat jar there are the classes of guava 18 (ES requires that version), 
Flink runs on CDH 5.9 (that use guava 11), in flink-dist jar I think that 
there's guava 11 classes while in flink-hadoop-compatibility there are shade 
guava 18 dependencies.

How can I make the job successfully run on the cluster?

Best,
Flavio


Guava version conflict

2017-06-07 Thread Flavio Pompermaier
Hi to all,
I'm trying to use the new ES connector to index data from Flink (with ES
2.4.1).
When I try to run it from Eclipse everything is OK, but when I run it from the
cluster I get the following exception:

java.lang.NoSuchMethodError:
com.google.common.util.concurrent.MoreExecutors.directExecutor()Ljava/util/concurrent/Executor;
at org.elasticsearch.threadpool.ThreadPool.<init>(ThreadPool.java:192)
at
org.elasticsearch.client.transport.TransportClient$Builder.build(TransportClient.java:131)

In my fat jar there are the classes of Guava 18 (ES requires that version),
Flink runs on CDH 5.9 (which uses Guava 11), and in the flink-dist jar I think
there are Guava 11 classes, while flink-hadoop-compatibility contains shaded
Guava 18 dependencies.

How can I make the job successfully run on the cluster?

Best,
Flavio


Painful KryoException: java.lang.IndexOutOfBoundsException on Flink Batch Api scala

2017-06-07 Thread Andrea Spina
Good afternoon dear Community,

For a few days now I've really been struggling to understand the reason behind
this KryoException. Here is the stack trace.

2017-06-07 10:18:52,514 ERROR org.apache.flink.runtime.operators.BatchTask  
   
- Error in task code:  CHAIN GroupReduce (GroupReduce at
my.org.path.benchmarks.matrices.flink.distributed.BlockMatrix.$times(BlockMatrix.scala:103))
-> Map (Map at my.org.path.benchmarks.matrices.flink.MatrixMultiplicat
ion$.main(MatrixMultiplication.scala:46)) (1/1)
java.lang.Exception: The data preparation for task 'CHAIN GroupReduce
(GroupReduce at xbenchmarks.matrices.flink.distributed.BlockMatrix.$times(B
lockMatrix.scala:103)) -> Map (Map at
my.org.path.benchmarks.matrices.flink.MatrixMultiplication$.main(MatrixMultiplication.scala:46))'
, caused an error: E
rror obtaining the sorted input: Thread 'SortMerger spilling thread'
terminated due to an exception: java.lang.IndexOutOfBoundsException: Index:
109, Size: 5
Serialization trace:
blockData (my.org.path.benchmarks.matrices.flink.distributed.Block)
at
org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
at
org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
Thread 'SortMerger spilling thread' terminated due to an exception:
java.lang.IndexOu
tOfBoundsException: Index: 109, Size: 5
Serialization trace:
blockData (my.org.path.benchmarks.matrices.flink.distributed.Block)
at
org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
at
org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1095)
at
org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
at
org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
... 3 more
Caused by: java.io.IOException: Thread 'SortMerger spilling thread'
terminated due to an exception: java.lang.IndexOutOfBoundsException: Index:
109, Size: 5
Serialization trace:
blockData (my.org.path.benchmarks.matrices.flink.distributed.Block)
at
org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
Caused by: com.esotericsoftware.kryo.KryoException:
java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
Serialization trace:
blockData (my.org.path.benchmarks.matrices.flink.distributed.Block)
at
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:250)
at
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:264)
at
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:274)
at
org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:98)
at
org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:98)
at
org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:98)
at
org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:519)
at
org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
at
org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
Caused by: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
at java.util.ArrayList.rangeCheck(ArrayList.java:653)
at java.util.ArrayList.get(ArrayList.java:429)
at
com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:677)
at
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
... 11 more
2017-06-07 10:18:52,594 INFO 
org.apache.flink.runtime.taskmanager.TaskManager  - Memory usage
stats: [HEAP: 2744/4096/4096 MB, NON HEAP: 78/80/-1 MB (used/committed/max)]
2017-06-07 10:18:52,766 INFO 
org.apache.flink.runtime.taskmanager.TaskManager  - Direct
memory stats: Count: 13, Total Capacity: 1390280, Used Memory: 1390281
2017-06-07 10:18:52,766 INFO 
org.apache.flink.runtime.taskmanager.TaskManager  - Off-heap
pool stats: [Code Cache: 14/15/240 MB (used/committed/max)], [Metaspace:
57/58/-1 MB (used/committed/max)], [Compressed Class Space: 

Re: Queryable State Client with 1.3.0-rc0

2017-06-07 Thread Aljoscha Krettek
Hi Claudio,

Quick question: what exactly was your call for getting the local environment 
with web UI? Did you also have a custom Configuration where you specified, for 
example, that the queryable state server should be enabled?

I can make an example work where I start a local cluster in one process (in the 
IDE) and then query from another process (also started in the IDE) but only if 
I manually start the LocalFlinkMiniCluster, as outlined in my last mail. I’m 
talking about Flink 1.2.x here.

Best,
Aljoscha
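
For context, a sketch of the kind of call I mean (the queryable-state option
key below is written from memory and may differ between versions, so treat it
as an assumption):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

Configuration config = new Configuration();
// assumed key for enabling the queryable state server
config.setBoolean("query.server.enable", true);
StreamExecutionEnvironment env =
    StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);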

> On 6. Jun 2017, at 17:23, Aljoscha Krettek  wrote:
> 
> Hi Claudio,
> 
> Quick follow up: querying a locally started cluster does not work out-of-box 
> anymore in Flink 1.3. You can manually start a mini cluster that has the 
> required settings, though. You would do something like this:
> 
> Configuration configuration = new Configuration();
> configuration.addAll(jobGraph.getJobConfiguration());
> 
> configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
> configuration.setString(JobManagerOptions.ADDRESS, "localhost");
> configuration.setInteger(JobManagerOptions.PORT, 6123);
> conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
> 
> flinkMiniCluster = new LocalFlinkMiniCluster(
>configuration,
>HighAvailabilityServicesUtils.createHighAvailabilityServices(
>configuration,
>Executors.directExecutor(),
>
> HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION),
>false);
> 
> flinkMiniCluster.start();
> 
> And then you can create a remote StreamExecutionEnvironment using 
> StreamExecutionEnvironment.createRemoteEnvironment() to submit your job to 
> that cluster.
> 
> You can stop the cluster using flinkMiniCluster.stop()
> 
> I hope this helps?
> 
> Best,
> Aljoscha
> 
>> On 6. Jun 2017, at 16:33, Aljoscha Krettek  wrote:
>> 
>> Hi Claudio,
>> 
>> The documentation for this was recently updated: 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/queryable_state.html#querying-state.
>>  Could you see if that helps? The important bit for you is probably this:
>> 
>> final HighAvailabilityServices highAvailabilityServices =
>>  HighAvailabilityServicesUtils.createHighAvailabilityServices(
>>   config,
>>   Executors.newSingleThreadScheduledExecutor(),
>>   
>> HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);
>> 
>> If that doesn’t help we’ll need to delve deeper.
>> 
>> Best,
>> Aljoscha
>> 
>>> On 11. May 2017, at 22:21, Fahey, Claudio  wrote:
>>> 
>>> I’ve been using QueryableStateClient in Flink 1.2 successfully. I have now 
>>> upgraded to release-1.3.0-rc0 and QueryableStateClient now requires a 
>>> HighAvailabilityServices parameter. The documentation hasn’t been updated 
>>> on using HighAvailabilityServices so I’m a bit lost on what exactly I 
>>> should specify for that parameter. For development, I want to connect to a 
>>> Flink Job Manager that I created from a different process using 
>>> StreamExecutionEnvironment.createLocalEnvironmentWithWebUI. Can somebody 
>>> provide the code needed to create the appropriate HighAvailabilityServices 
>>> parameter?
>>> 
>>> I have tried the following code:
>>> 
>>>  val jobManagerIpcAddress = “localhost”
>>>  val jobManagerIpcPort = 6123
>>>  configuration.setString(JobManagerOptions.ADDRESS, jobManagerIpcAddress)
>>>  configuration.setInteger(JobManagerOptions.PORT, jobManagerIpcPort)
>>>  private val highAvailabilityServices = new 
>>> StandaloneHaServices(jobManagerIpcAddress, jobManagerIpcAddress)
>>>  private val client = new QueryableStateClient(configuration, 
>>> highAvailabilityServices)
>>> 
>>> It results in:
>>> 
>>> Exception in thread "main" akka.actor.ActorNotFound: Actor not found for: 
>>> ActorSelection[Anchor(akka://flink/), Path(/localhost)]
>>> 
>>> 
>>> Claudio Fahey
>>> Chief Solutions Architect, Analytics
>>> Dell EMC | Emerging Technologies Team
>>> 
>> 
> 



Re: AsyncCollector Does not release the thread (1.2.1)

2017-06-07 Thread Aljoscha Krettek
Hi Steve,

I’m assuming you are using Flink 1.2.x? If yes, then I’m afraid you
re-discovered this issue: https://issues.apache.org/jira/browse/FLINK-6435.
It was fixed in Flink 1.3.0. Is it possible for you to update to that version,
or do you think it’s important that we backport that fix to the Flink 1.2.x line?

Best,
Aljoscha
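
For reference, a minimal sketch of the empty-collection workaround discussed
further down (adapted from Steve's snippet; the async API import paths are
written from memory of the 1.2/1.3 API and should be double-checked, and the
external call is a placeholder):

import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;

import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;

public class ExternalLookup implements AsyncFunction<String, String> {
    @Override
    public void asyncInvoke(String input, AsyncCollector<String> collector) {
        CompletableFuture
            .supplyAsync(() -> callExternalApi(input))        // placeholder external call
            .thenAccept(result -> collector.collect(result))  // normal path
            .exceptionally(ex -> {
                // Workaround on 1.2.x: complete with an empty result
                // so the async slot is always released.
                collector.collect(Collections.<String>emptyList());
                return null;
            });
    }

    private Collection<String> callExternalApi(String input) {
        return Collections.singletonList(input);              // stand-in for the real call
    }
}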

> On 6. Jun 2017, at 19:34, Aljoscha Krettek  wrote:
> 
> Ok, thanks for letting us know. I’ll investigate.
>> On 6. Jun 2017, at 19:28, Steve Robert > > wrote:
>> 
>> Hi Aljoscha ,
>> 
>> thank you for your reply,
>> Yes, the queue is being filled up and no more elements are being processed (in 
>> relation to the limit defined at the "orderedWait" function call).
>> To add additional information, if I run the test on a local cluster I can 
>> see that the job never ends because the AsyncFunction stay blocked As if 
>> there was no call to  the "collect" method
>> Best,
>> Steve
>> 
>> On Tue, Jun 6, 2017 at 4:56 PM, Aljoscha Krettek > > wrote:
>> Hi,
>> 
>> As far as I know calling collect(Throwable) should also finish the promise 
>> that would otherwise fulfilled by successfully collecting a result. If not 
>> then you might have found a bug. What makes you think that the Thread is not 
>> being released? Is your queue being filled up and no more elements are being 
>> processed?
>> 
>> Regarding your other question, yes, you can collect an empty Collection for 
>> signalling that there was no result.
>> 
>> Best,
>> Aljoscha
>> 
>>> On 8. May 2017, at 21:47, Steve Robert >> > wrote:
>>> 
>>> Hi guys, 
>>> 
>>> AsyncCollector.collect(Throwable) method  seem to  not release  the Thread.
>>> This scenario may be problematic when calling an external API
>>> In the case of a timeout error there is no data to collect.
>>> 
>>> for example :
>>> 
>>>   CompletableFuture.supplyAsync(() -> asyncCallTask(input))
>>> .thenAccept((Collection> 
>>> result) -> {
>>> 
>>> this.tupleEmited.getAndIncrement();
>>> 
>>> asyncCollector.collect(result);
>>> })
>>> .exceptionally((ex) -> {
>>> asyncCollector.collect(ex);
>>> return null;
>>> });
>>> }
>>> it is possible to create an empty Collection and collect this empty 
>>> collection to force the Thread to be released but this workflow seems 
>>> strange to me.
>>> thank for your help
>>>  
>>> 
>>> -- 
>>> Steve Robert  
>>> Software Engineer
>>> srob...@qualys.com 
>>> T <>
>>> Qualys, Inc. – Continuous Security
>>> Blog  | Community  
>>> | Twitter 
>>>  
>> 
>> 
>> 
>> -- 
>> Steve Robert  
>> Software Engineer
>> srob...@qualys.com 
>> T <>
>> Qualys, Inc. – Continuous Security
>> Blog  | Community  | 
>> Twitter 
>>  



Re: Flink and swapping question

2017-06-07 Thread Flavio Pompermaier
I forgot to mention that my jobs are all batch (at the moment).

Do you think that this problem could be related to

   - http://www.evanjones.ca/java-bytebuffer-leak.html#comment-3240054880
   - and http://www.evanjones.ca/java-native-leak-bug.html

Kurt also told me to add "env.java.opts:
-Dio.netty.recycler.maxCapacity.default=1".

Best,
Flavio

On Tue, Jun 6, 2017 at 7:42 PM, Flavio Pompermaier 
wrote:

> Hi Stephan,
> I also think that the error is more related to netty.
> The only suspicious library I use are parquet or thrift.
> I'm not using off-heap memory.
> What do you mean for "crazy high number of concurrent network
> shuffles"?how can I count that?
> We're using java 8.
>
> Thanks a lot,
> Flavio
>
>
>
> On 6 Jun 2017 7:13 pm, "Stephan Ewen"  wrote:
>
> Hi!
>
> I would actually be surprised if this is an issue in core Flink.
>
>   - The MaxDirectMemory parameter is pretty meaningless, it really is a
> max and does not have an impact on how much is actually allocated.
>
>   - In most cases we had reported so far, the leak was in a library that
> was used in the user code
>
>   - If you do not use offheap memory in Flink, then there are few other
> culprits that can cause high virtual memory consumption:
>   - Netty, if you bumped the Netty version in a custom build
>   - Flink's Netty, if the job has a crazy high number of concurrent
> network shuffles (we are talking 1000s here)
>   - Some old Java versions have I/O memory leaks (I think some older
> Java 6 and Java 7 versions were affected)
>
>
> To diagnose that better:
>
>   - Are these batch or streaming jobs?
>   - If it is streaming, which state backend are you using?
>
> Stephan
>
>
> On Tue, Jun 6, 2017 at 12:00 PM, Fabian Hueske  wrote:
>
>> Hi Flavio,
>>
>> can you post all the memory configuration parameters of your workers?
>> Did you investigate whether the direct or heap memory grew?
>>
>> Thanks, Fabian
>>
>> 2017-05-29 20:53 GMT+02:00 Flavio Pompermaier :
>>
>>> Hi to all,
>>> I'm still trying to understand what's going on our production Flink
>>> cluster.
>>> The facts are:
>>>
>>> 1. The Flink cluster runs on 5 VMWare VMs managed by ESXi
>>> 2. On a specific  job we have, without limiting the direct memory to 5g,
>>> the TM gets killed by the OS almost immediately because the memory required
>>> by the TM, at some point, becomes huge, like > 100 GB (others jobs seem to
>>> be less affected by the problem )
>>> 3. Although the memory consumption is much better this way, the Flink TM
>>> memory continuously grow job after job (of this problematic type): we set
>>> TM max heap to 14 GB and the JVM required memory can be ~ 30 Gb. How is
>>> that possible?
>>>
>>> My fear is that there's some annoying memory leak / bad memory
>>> allocation in the Flink network level, but I can't have any evidence of
>>> this (except the fact that the vm which doesn't have a hdfs datanode
>>> underneath the Flink TM is the one with the biggest TM virtual memory
>>> consumption).
>>>
>>> Thanks for the help ,
>>> Flavio
>>>
>>> On 29 May 2017 15:37, "Nico Kruber"  wrote:
>>>
 FYI: taskmanager.sh sets this parameter but also states the following:

   # Long.MAX_VALUE in TB: This is an upper bound, much less direct
 memory will
 be used
   TM_MAX_OFFHEAP_SIZE="8388607T"


 Nico

 On Monday, 29 May 2017 15:19:47 CEST Aljoscha Krettek wrote:
 > Hi Flavio,
 >
 > Is this running on YARN or bare metal? Did you manage to find out
 where this
 > insanely large parameter is coming from?
 >
 > Best,
 > Aljoscha
 >
 > > On 25. May 2017, at 19:36, Flavio Pompermaier 
 > > wrote:
 > >
 > > Hi to all,
 > > I think we found the root cause of all the problems. Looking ad
 dmesg
 > > there was a "crazy" total-vm size associated to the OOM error, a
 LOT much
 > > bigger than the TaskManager's available memory. In our case, the TM
 had a
 > > max heap of 14 GB while the dmsg error was reporting a required
 amount of
 > > memory in the order of 60 GB!
 > >
 > > [ 5331.992539] Out of memory: Kill process 24221 (java) score 937 or
 > > sacrifice child [ 5331.992619] Killed process 24221 (java)
 > > total-vm:64800680kB, anon-rss:31387544kB, file-rss:6064kB,
 shmem-rss:0kB
 > >
 > > That wasn't definitively possible usin an ordinary JVM (and our TM
 was
 > > running without off-heap settings) so we've looked at the
 parameters used
 > > to run the TM JVM and indeed there was a reall huge amount of memory
 > > given to MaxDirectMemorySize. With my big surprise Flink runs a TM
 with
 > > this parameter set to 8.388.607T..does it make any sense?? Is it
 > > documented anywhere the importance of this parameter (and why it is
 used

Re: Running job in "dry mode"?

2017-06-07 Thread Maciek Próchniak



On 07/06/2017 10:27, Maciek Próchniak wrote:




On 07/06/2017 10:07, Tzu-Li (Gordon) Tai wrote:

Hi Maciek,

Is there any particular reason why you do not wish to start running 
the Kafka sources on the test run?
Otherwise, it would be perfectly fine to start the test job for 
testing to see if everything works, and keep that savepoint 
eventually for the non-dry run.


well, I want to make sure I don't interfere with the currently running
production process. While I could use a different consumer, I certainly
don't want to have events emitted. Although it may work if I have some
dummy sinks... I'll think about that...
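
A minimal sketch of that idea, assuming the output is routed into a sink that
simply drops records (DiscardingSink ships with Flink; the stream variable is a
stand-in for whatever the production job emits):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;

// In the dry-run variant of the job, replace the real sinks so nothing is emitted:
void attachDryRunSink(DataStream<String> events) {
    events.addSink(new DiscardingSink<String>());
}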


Also, what our integration tests for migrating across Flink versions 
typically do is have some dummy collection source (`fromElements`) 
for the test job.
yes, I also tried that. Unfortunately I encountered a problem: when I
replace the Kafka source with a collection source, the state becomes
incompatible, because the collection source has different state than the
Kafka one...


thanks,
maciek


Cheers,
Gordon

On 7 June 2017 at 7:34:25 AM, Maciek Próchniak (m...@touk.pl 
) wrote:



Hello,

I'd like to be able to see if new version of my job is compatible with
the old one.

I can make a savepoint and run new version from that, but I'd like 
to be
able to do it without actually starting sources and so on - so that 
e.g.

it won't start to read from my kafka topics.

Of course I can do it by substituting configuration values, or running
without network access - but this seems a bit mundane and error-prone.

Do you know about any ways to achieve this?

thanks,

maciek







[DISCUSS] Removal of twitter-inputformat

2017-06-07 Thread Chesnay Schepler

Hello,

I'm proposing to remove the Twitter-InputFormat in FLINK-6710, with an open PR
you can find here.
The PR currently has a +1 from Robert, but Timo raised some concerns 
saying that it is useful for prototyping and

advised me to start a discussion on the ML.

This format is a DelimitedInputFormat that reads JSON objects and turns 
them into a custom tweet class.
I believe this format doesn't provide much value to Flink; there's 
nothing interesting about it as an InputFormat,
as it is purely an exercise in manually converting a JSON object into 
a POJO.
This is apparent since you could just as well use 
ExecutionEnvironment#readTextFile(...) and throw the parsing logic

into a subsequent MapFunction.
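
For illustration, a rough sketch of that alternative (Tweet is a hypothetical
user POJO, the path is a placeholder, and Jackson is just one choice of JSON
parser):

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
ObjectMapper mapper = new ObjectMapper();

DataSet<Tweet> tweets = env
    .readTextFile("file:///path/to/tweets.json")
    .map(line -> mapper.readValue(line, Tweet.class))
    .returns(Tweet.class);  // type hint needed when using a lambda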

In the PR I suggested replacing this with a JsonInputFormat, but this 
was a misguided attempt at getting Timo to agree
to the removal. That format has the same problem outlined above, as it 
could be effectively implemented with a one-liner map function.


So the question now is whether we want to keep it, remove it, or replace 
it with something more general.


Regards,
Chesnay


Re: Methods that trigger execution

2017-06-07 Thread Aljoscha Krettek
Hi,

I’m afraid I don’t know that part well enough. What’s the percentage slowdown? 
(7 seconds alone doesn’t say anything.)

Maybe Till (in cc) knows more since he used to work on the ML part.

Best,
Aljoscha

> On 6. Jun 2017, at 17:45, Borja  wrote:
> 
> *Thanks so much, Aljoscha* :)
> I was stuck at this point. I didn't know that the print and collect methods
> collect all the data in one place.
> 
> The execution time has dropped a lot.
> However, I still find that Flink is slower (by just 7 seconds).
> 
> I really think I'm not getting all the performance out of Flink,
> because Flink draws the execution as a cyclic dependency graph while
> Spark uses a DAG,
> so Flink's way should result in superior scalability and
> performance compared to the DAG approach.
> 
> So... What is the problem with my code?
> 
> //Read data
> val data: DataSet[org.apache.flink.ml.common.LabeledVector] =
> MLUtils.readLibSVM(benv, "/inputPath/_.libsvm")
> 
> // Create multiple linear regression learner
> val mlr = MultipleLinearRegression()
> 
> val model = mlr.fit(data)
> 
> data.writeAsText("file:///outputPath") 
> 
> benv.execute()
> 
> 
> 