RE: Flink performance tuning

2016-05-17 Thread Serhiy Boychenko
Cheerz,

Basically the data is stored in CSV format. The flatMap which I have 
implemented does:
String[] tokens = value.split(",");
out.collect(new Tuple2<String, Double>(tokens[0], Double.valueOf(tokens[2])));

The result calculation looks like:
DataSet<Tuple2<String, String>> statistics = rawData.flatMap(new 
VariableParser()).groupBy(0).reduceGroup(new ReduceStats());

ReduceStats implements GroupReduceFunction; it iterates over the values, adds 
them to a DescriptiveStatistics instance, and at the end outputs the min, max 
and average.
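For reference, the per-group computation boils down to a single pass over the grouped values. Below is a minimal plain-Java sketch of that logic (class and method names are illustrative; in the actual job this sits inside the Flink GroupReduceFunction, with DescriptiveStatistics doing the bookkeeping):

```java
import java.util.Arrays;
import java.util.List;

public class StatsSketch {
    // One pass over a group's values, accumulating min, max and sum,
    // mirroring what ReduceStats emits per signal name.
    static double[] minMaxAvg(List<Double> values) {
        double min = Double.POSITIVE_INFINITY;
        double max = Double.NEGATIVE_INFINITY;
        double sum = 0.0;
        for (double v : values) {
            min = Math.min(min, v);
            max = Math.max(max, v);
            sum += v;
        }
        return new double[] {min, max, sum / values.size()};
    }

    public static void main(String[] args) {
        double[] r = minMaxAvg(Arrays.asList(4.0, 1.0, 7.0));
        System.out.println(r[0] + " " + r[1] + " " + r[2]);
    }
}
```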

I ran the new experiments with the suggested configuration, and what I noticed 
is that only one task slot is being occupied. Something I am doing must be wrong:
Task Managers: 3, Task Slots: 21, Available Task Slots: 20


Best regards,
Serhiy.

From: Robert Metzger [mailto:rmetz...@apache.org]
Sent: 13 May 2016 15:26
To: user@flink.apache.org
Subject: Re: Flink performance tuning

Hi,

Can you try running the job with 8 slots, 7 GB (maybe you need to go down to 6 
GB) and only three TaskManagers (-n 3) ?
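Spelled out as a command (my reading of the suggestion, assuming -tm takes megabytes as in the yarn-session.sh invocations elsewhere in this thread):

```shell
# 3 TaskManagers, 8 slots each, 1 GB JobManager heap, 7 GB TaskManager heap.
# Drop -tm to 6144 if YARN kills the containers for exceeding memory limits.
./bin/yarn-session.sh -n 3 -s 8 -jm 1024 -tm 7168
```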

I'm suggesting this, because you have many small JVMs running on your machines. 
On such small machines you can probably get much more use out of your available 
memory by running a few big task managers (which can share all the common 
management infra).
Another plus of running a few bigger JVMs is that you reduce network overhead, 
because communication can happen within the process and less network transfer 
is required.

Another big factor for performance is the datatypes used. How do you represent 
your data in Flink? (Are you using the TupleX types, or POJOs?)
How do you select the key for the grouping?

Regards,
Robert


On Fri, May 13, 2016 at 11:25 AM, Serhiy Boychenko 
<serhiy.boyche...@cern.ch> wrote:
Hey,

I have successfully integrated Flink into our very small test cluster (3 
machines with 8 cores, 8 GBytes of memory and 2x1 TB disks each). Basically I 
started the session using YARN as the RM, and the data is being read from HDFS.
/yarn-session.sh -n 21 -s 1 -jm 1024 -tm 1024

My code is very simple: a flatMap is applied to the CSV data to extract the 
signal name and value, then I group by signal name and perform a group reduce 
on the data in order to calculate the max, min and average of the collected values.

I have observed that on 3 nodes the average processing rate is around 
11 Mbytes/second. I compared the results with a MapReduce execution (without 
any kind of tuning) and I am quite surprised, since Hadoop's performance is 
85 Mbytes/second when executing the same query on the same data. I have read a 
few reports claiming that Flink performs better than MR and other tools. I am 
wondering what is wrong. Any clue?

The processing rate is calculated according to the following formula:
Overall processing rate = (sum of the total amount of data read per job) / 
(sum of the total time the jobs were running, including staging periods)
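As a worked example of that formula (the numbers below are purely illustrative, not measurements from this thread), a short Java sketch:

```java
public class RateSketch {
    // Overall rate = total bytes read across all jobs divided by total
    // wall-clock time (including staging), reported in MiB per second.
    static double overallRateMBps(long[] bytesReadPerJob, double[] secondsPerJob) {
        long totalBytes = 0;
        double totalSeconds = 0.0;
        for (long b : bytesReadPerJob) totalBytes += b;
        for (double s : secondsPerJob) totalSeconds += s;
        return (totalBytes / (1024.0 * 1024.0)) / totalSeconds;
    }

    public static void main(String[] args) {
        // Two jobs: 1 GiB in 60 s and 2 GiB in 120 s
        // -> 3072 MiB / 180 s, roughly 17 MiB/s overall.
        System.out.println(overallRateMBps(
                new long[] {1L << 30, 2L << 30}, new double[] {60.0, 120.0}));
    }
}
```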

Best regards,
Serhiy.




RE: Running Flink jobs directly from Eclipse

2016-04-06 Thread Serhiy Boychenko
What about the YARN (and HDFS) configuration? Should I put yarn-site.xml 
directly on the classpath, or can I set the variables in the execution 
environment? I will give it a try tomorrow morning, will report back, and if 
successful blog about it ofc ☺
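One common approach (an assumption on my part, not something verified in this thread) is to let Flink's YARN client discover the cluster configuration via the standard Hadoop environment variables, so nothing has to be hard-coded or placed on the classpath:

```shell
# Assumption: this directory contains yarn-site.xml, core-site.xml and
# hdfs-site.xml for the target cluster (the path itself is illustrative).
export HADOOP_CONF_DIR=/etc/hadoop/conf
export YARN_CONF_DIR=$HADOOP_CONF_DIR
```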

From: Christophe Salperwyck [mailto:christophe.salperw...@gmail.com]
Sent: 06 April 2016 13:41
To: user@flink.apache.org
Subject: Re: Running Flink jobs directly from Eclipse

For me it was taking the local jar and uploading it to the cluster.

2016-04-06 13:16 GMT+02:00 Shannon Carey <sca...@expedia.com>:
Thanks for the info! It is a bit difficult to tell based on the documentation 
whether or not you need to put your jar onto the Flink master node and run the 
flink command from there in order to get a job running. The documentation on 
https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/yarn_setup.html
 isn't very explicit about where you can run the flink command from, and 
doesn't mention that you can run the job programmatically instead of using the 
CLI.

From: Christophe Salperwyck <christophe.salperw...@gmail.com>
Date: Wednesday, April 6, 2016 at 1:24 PM
To: user@flink.apache.org
Subject: Re: Running Flink jobs directly from Eclipse

From my side I was starting the YARN session from the cluster:
flink-0.10.1/bin/yarn-session.sh -n 64 -s 4 -jm 4096 -tm 4096

Then getting the IP/port from the WebUI and then from Eclipse:
ExecutionEnvironment env = 
ExecutionEnvironment.createRemoteEnvironment("xx.xx.xx.xx", 40631, 
"target/FlinkTest-0.0.1-SNAPSHOT-jar-with-dependencies.jar");

The JAR needs to be built beforehand.

Hope it helps!
Christophe

2016-04-06 9:25 GMT+02:00 Serhiy Boychenko <serhiy.boyche...@cern.ch>:
Cheerz,

I have been working for the last few months on a comparison of different data 
processing engines and recently came across Apache Flink. After reading 
different academic papers comparing Flink with other data processing engines, I 
would definitely give it a shot. The only issue I am currently having is that I 
am unable to submit Flink jobs directly from Eclipse (to a YARN cluster). I am 
wondering if you have any guidelines on how I could do the submission not from 
the client but from Eclipse directly. (I was unable to find anything related, 
with the exception of setting up Eclipse for working on Flink core.)

Best regards,
Serhiy.




