Re: Flink application does not scale as expected, please help!

2018-06-18 Thread Ovidiu-Cristian MARCU
Hi all,

Allow me to add some comments/questions on this issue, which is very interesting.
According to the documentation [1], the pipeline example assumes the source runs 
with the same parallelism as the subsequent map operator, and the scheduler tries 
to collocate source and map tasks if possible.

For an application that configures the source with a different parallelism, assuming 
N task managers each with m slots: if I configure the source operator with 
parallelism m, could all of the source's tasks be scheduled on the first task manager?
I think the same story holds for sink tasks.
So, in general, is there any control over the scheduling of source and sink tasks?
Would it be possible to enforce that the scheduling of source tasks is balanced 
across task managers? I am not sure if this is the default.
If the source creates a non-keyed stream, can we enforce the source to push 
records to local map tasks?

For Siew’s example, the keyBy after source#map complicates things further, since 
each key can potentially be processed on another task manager.
At least the keyed operator should run with the same parallelism as source and 
map and be pipelined in the same slot (maybe a slot sharing group configuration 
could enforce that).

DataStream AggregatedRecordWithAuditStream = sourceStringStream
    .map(new JsonToRecordTranslator(markerFactory.getMarker(), inputlink))
    .name("JsonRecTranslator")
    .setParallelism(pJ2R)
    .keyBy(new KeySelector<Record, String>() {
        private static final long serialVersionUID = 1L;

        @Override
        public String getKey(Record r) throws Exception {
            return r.getUNIQUE_KEY();
        }
    })
    .process(new ProcessAggregation(aggrDuration, markerFactory.getMarker(), markerFactory.getMarker()))
    .setParallelism(pAggr)
    .name("AggregationDuration: " + aggrDuration + "ms");


Thanks,
Ovidiu

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.5/internals/job_scheduling.html
 


> On 18 Jun 2018, at 10:05, Fabian Hueske  wrote:
> 
> Not sure if TM local assignment is explicitly designed in 1.5.0, but it might 
> be an artifact of how slots are registered in the resource manager. 
> Till (in CC) should know how that works.
> 
> Anyway, tasks that run in the same TM exchange data via in-memory channels 
> which is of course much faster than going over the network.
> So yes, a performance drop when tasks are scheduled to different TMs is not 
> unexpected IMO.
> You can check that by starting multiple TMs with a single slot each and 
> running your job on that setup.
> 
> Best, Fabian
> 
> 
> 
> 2018-06-18 9:57 GMT+02:00 Siew Wai Yow:
> Hi Fabian,
> 
> We are using Flink 1.5.0. Is there any difference in the scheduler in Flink 1.5.0?
> 
> "Hence, applications might scale better until tasks are scheduled to 
> different machines."
> 
> This seems to be the case. We have 32 vCPUs and 16 slots on one TM machine. So 
> scaling works perfectly for 1-2-4-8-16 because everything happens in the same TM. 
> When we scale to 32 the performance drops, not even on par with the 
> parallelism-16 case. Is this something expected? Thank you.
> 
> Regards,
> Yow
> 
> From: Fabian Hueske <fhue...@gmail.com>
> Sent: Monday, June 18, 2018 3:47 PM
> To: Siew Wai Yow
> Cc: Jörn Franke; user@flink.apache.org 
> 
> Subject: Re: Flink application does not scale as expected, please help!
>  
> Hi,
> 
> Which Flink version are you using?
> Did you try to analyze the bottleneck of the application, i.e., is it CPU, 
> disk IO, or network bound?
> 
> Regarding the task scheduling. AFAIK, before version 1.5.0, Flink tried to 
> schedule tasks on the same machine to reduce the amount of network transfer.
> Hence, applications might scale better until tasks are scheduled to different 
> machines.
> 
> Fabian
> 
> 2018-06-16 12:20 GMT+02:00 Siew Wai Yow:
> Hi Jorn, Please find the source @https://github.com/swyow/flink_sample_git 
> 
> Thank you!
> 
> From: Jörn Franke <jornfra...@gmail.com>
> Sent: Saturday, June 16, 2018 6:03 PM
> 
> To: Siew Wai Yow
> Cc: user@flink.apache.org 
> Subject: Re: Flink application does not scale as expected, please help!
>  
> Can you share the app source on gitlab, github or bitbucket etc? 
> 
> On 16. Jun 2018, at 11:46, Siew Wai Yow wrote:
> 
>> Hi, there is an interesting finding: the reason the low-parallelism runs work 
>> much better is that all tasks run in the same TM. Once we scale up, tasks are 
>> distributed to different TMs and the performance is worse than in the 
>> low-parallelism case. Is this something expected? The more I scale, the less I 
>> get?
>> 
>> From: Siew Wai Yow <wai_...@hotmail.com>
>> Sent: Saturday, June 16, 2018 5:09 PM
>> To: Jörn Franke
>> Cc: user@flink.apache.org 

Re: parallelism for window operations

2017-01-27 Thread Ovidiu-Cristian MARCU
Now I see (the documentation is clear); just a correction:

Because I set PInput as the slot sharing group for flatMap, source and flatMap are 
in different slots.
That also means S6 and S7 are the same slot, as expected, because they share the 
same slot sharing group "output".
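
A minimal self-contained sketch of the inheritance rule that explains this 
(hypothetical tokenizer; group name as above): an operator with no explicit slot 
sharing group inherits the group of its input, while operators in different groups 
are scheduled to different slots.

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class SlotGroupInheritance {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // The source stays in the "default" slot sharing group.
        DataStream<String> text = env.socketTextStream("localhost", 9999);

        // An explicit group on flatMap separates it from the source, so the
        // two operators end up in different slots.
        DataStream<String> tokens = text
            .flatMap(new FlatMapFunction<String, String>() {
                @Override
                public void flatMap(String line, Collector<String> out) {
                    for (String token : line.split(" ")) {
                        out.collect(token);
                    }
                }
            })
            .slotSharingGroup("PInput");

        // No explicit group here: this operator inherits "PInput" from its input.
        tokens.filter(t -> !t.isEmpty()).print();

        env.execute("slot sharing group inheritance");
    }
}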

Best,
Ovidiu 
> On 27 Jan 2017, at 10:43, Ovidiu-Cristian MARCU 
> <ovidiu-cristian.ma...@inria.fr> wrote:
> 
> Thank you, Fabian!
> 
> It works. Here is what I did and the results, as an example for other users:
> In total 7 slots are occupied (I am not sure how to check that Source + Flat Map 
> are in the same slot; I assumed slot S1 would be that one; also, S6 and S7 are 
> different, although I set the same name for the slot sharing group).
> 
>   // get input data by connecting to the socket
>   DataStream<String> text = env.socketTextStream("localhost", port, "\n");
> 
>   DataStream<Tuple8<String, String, String, Integer, String, Double, Long, Long>> input =
>       text.flatMap(...).slotSharingGroup("PInput").setParallelism(1); // ONE SLOT S1
> 
>   DataStream<Double> counts1 = input.keyBy(0).countWindow(windowSize, slideSize)
>       .apply(new WindowFunction<Tuple8<String, String, String, Integer, String,
>               Double, Long, Long>, Double, Tuple, GlobalWindow>() {
>           ...
>       })
>       .slotSharingGroup("firstWindow").setParallelism(1).setMaxParallelism(1); // ONE SLOT S2
> 
>   DataStream<Double> counts2 = input.keyBy(2).countWindow(windowSize, slideSize)
>       .apply(new WindowFunction<Tuple8<String, String, String, Integer, String,
>               Double, Long, Long>, Double, Tuple, GlobalWindow>() {
>           ...
>       })
>       .slotSharingGroup("secondWindow").setParallelism(3).setMaxParallelism(3); // THREE SLOTS S3, S4, S5
> 
>   counts1.writeAsText(params.get("output1"))
>       .slotSharingGroup("output").setParallelism(1); // ONE SLOT S6
>   counts2.writeAsText(params.get("output2"))
>       .slotSharingGroup("output").setParallelism(1); // ONE SLOT S7
> 
>   env.execute("Socket Window WordCount");
> 
> 
> Best,
> Ovidiu
> 
>> On 27 Jan 2017, at 10:13, Fabian Hueske <fhue...@gmail.com 
>> <mailto:fhue...@gmail.com>> wrote:
>> 
>> Hi Ovidiu,
>> 
>> you can control the slot assignment by assigning operators to 
>> SlotSharingGroups.
>> For example like this:
>> 
>> someStream.filter(...).slotSharingGroup("name");
>> 
>> Operators in different groups are scheduled to different slots. By default, 
>> all operators are in the same group.
>> Have a look at the docs as well [1]
>> 
>> Best, Fabian
>> 
>> [1] 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/index.html#task-chaining-and-resource-groups
>>  
>> <https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/index.html#task-chaining-and-resource-groups>
>> 
>> 2017-01-26 22:30 GMT+01:00 Ovidiu-Cristian MARCU 
>> <ovidiu-cristian.ma...@inria.fr <mailto:ovidiu-cristian.ma...@inria.fr>>:
>> Hi,
>> 
>> I have the following program configured with parallelism 2.
>> After running this example I see only 2 slots are busy.
>> 
>> How can I ensure counts1 and counts2 are executed on their own slots with 
>> the given parallelism (in this case 2 slots each)?
>> 
>>  port = params.getInt("port");
>> 
>>  // get the execution environment
>>  final StreamExecutionEnvironment env = 
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> 
>>  env.setParallelism(params.getInt("paral", 2));
>>  env.setMaxParallelism(params.getInt("paral", 2));
>> 
>>  // get input data by connecting to the socket
>>  DataStream<String> text = env.socketTextStream("localhost", port, "\n");
>> 
>>  DataStream<Tuple8<String, String, String, Integer, String, 
>> Double, Long, Long>> input = text.flatMap(...);
>>  DataStream counts1 = null;
>> 
>>  counts1 = input.keyBy(0).countWindow(windowSize, slideSize)
>>  .apply(new WindowFunction<Tuple8<String, 
>> String, String, Integer, String, Double, Long, Long>, Double, Tuple, 
>> GlobalWindow>() {
>> ...
>>  });
>> 
>>  DataStream counts2 = 
>> input.keyBy(1).countWindow(windowSize, slid

Re: Monitoring REST API

2016-12-21 Thread Ovidiu-Cristian MARCU
Hi Lydia,

I have used sar monitoring (sar -u -n DEV -p -d -r 1) and plotted the average 
over multiple nodes.

1) So for each node you can collect the sar output and obtain, for example:

Linux 3.2.0-4-amd64 (parasilo-4.rennes.grid5000.fr)   2016-01-27   _x86_64_   (16 CPU)

12:54:09   CPU   %user   %nice   %system   %iowait   %steal   %idle
12:54:10   all   4.63    0.00    3.25      0.13      0.00     91.99

12:54:09   kbmemfree   kbmemused   %memused   kbbuffers   kbcached   kbcommit   %commit   kbactive   kbinact
12:54:10   129538812   2525308     1.91       1292        85876      3662636    2.69      2111652    55132

12:54:09   DEV   tps     rd_sec/s   wr_sec/s   avgrq-sz   avgqu-sz   await   svctm   %util
12:54:10   sda   28.71   2708.91    87.13      97.38      0.03       1.10    0.97    2.77

12:54:09   IFACE   rxpck/s   txpck/s   rxkB/s    txkB/s   rxcmp/s   txcmp/s   rxmcst/s
12:54:10   eth0    632.67    587.13    3173.60   58.47    0.00      0.00      0.00

2) Calculate the average over your nodes (with synchronized clocks) and obtain a 
final output over which you can run your plot scripts:

LINE   DATE       FILENAME   CPU_user   CPU_SYS   KBMEMFREE   KBMEMUSED   MEMUSED   DISK_UTIL   DISK_RKBs   DISK_WKBs   _IO_RSTs   _IO_WSTs
1      12:54:10   res1Avg    6.12       1.25      129554704   2509412     1.90      6.00        4253.63     87.04       3944.00    88.00
2      12:54:11   res1Avg    3.41       0.28      129523432   2540690     1.92      4.00        2335.82     51.62       2692.00    0.00
3      12:54:12   res1Avg    0.06       0.03      129522000   2542120     1.92      1.60        0.16        0.59        2048.00    32.00
4      12:54:13   res1Avg    0.09       0.06      129520936   2543182     1.92      0.60        0.19        0.59        2048.00    0.00
5      12:54:14   res1Avg    0.06       0.06      129518448   2545670     1.93      6.80        4.31        169.47      4044.00    16.00

For other metrics specific to Flink’s execution you may need to rely on the 
various metrics Flink currently exposes.
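
If you just want to track the per-job counters over time, a minimal sketch that 
polls the REST endpoint mentioned in your message below (the JobManager host and 
the job id are placeholders passed on the command line) could look like:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URL;

public class JobStatsPoller {
    public static void main(String[] args) throws Exception {
        String host = args[0];  // e.g. "master"
        String jobId = args[1]; // the job id from the dashboard

        // Returns the JSON job summary with read/write records and bytes per
        // vertex; as you observed, CPU and network rates are not in there.
        URL url = new URL("http://" + host + ":8081/jobs/" + jobId);
        try (BufferedReader in = new BufferedReader(new InputStreamReader(url.openStream()))) {
            String line;
            while ((line = in.readLine()) != null) {
                System.out.println(line);
            }
        }
    }
}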

Best,
Ovidiu

> On 21 Dec 2016, at 19:55, Lydia Ickler  wrote:
> 
> Hi all,
> 
> I have a question regarding the Monitoring REST API;
> 
> I want to analyze the behavior of my program with regards to I/O MiB/s, 
> Network MiB/s and CPU % as the authors of this paper did. 
> (https://hal.inria.fr/hal-01347638v2/document)
> From the JSON file at http://master:8081/jobs/jobid/ I get a summary including 
> the information of read/write records and read/write bytes.
> Unfortunately the entries of Network or CPU are either (unknown) or 0.0. I am 
> running my program on a cluster with up to 32 nodes.
> 
> Where can I find the values for e.g. CPU or Network?
> 
> Thanks in advance!
> Lydia
> 



Re: Parameters to Control Intra-node Parallelism

2016-07-13 Thread Ovidiu-Cristian MARCU
Hi,

I would pay attention to the memory settings, so that heap + off-heap + network 
buffers for both TMs can be served from your node’s RAM.
Also, there is a correlation between the number of network buffers, the 
parallelism, and your workflow’s operators; the suggested formula for 
numberOfBuffers does not work in every case. (Note also that with the default 
32 KB buffer size, the 36864 buffers below pin roughly 1.2 GB per TM.)

I guess the numberOfBuffers could be determined automatically from the 
parallelism and the workflow’s operators, but I am not sure how to do that.

Best,
Ovidiu

> On 12 Jul 2016, at 21:18, Saliya Ekanayake <esal...@gmail.com> wrote:
> 
> Hi Ovidiu,
> 
> Checking the /var/log/messages based on Greg's response revealed TMs were 
> killed due to out of memory. Here's the node architecture. Each node has 
> 128GB of RAM. I was trying to run 2 TMs per node binding each to 12 cores (or 
> 1 socket). The total number of nodes were 16. I finally, managed to get it 
> working with the following (non-default) settings.
> 
> taskmanager.heap.mb: 12288
> taskmanager.numberOfTaskSlots: 12
> akka.ask.timeout: 1000 s
> taskmanager.network.numberOfBuffers: 36864
> 
> Note the number of buffers value; this had to be higher (twice as high, in this 
> case) than what's suggested in Flink (#slots-per-TM^2 * #TMs * 4, which would be 
> 12*12*32*4 = 18432). Otherwise, it would throw the not-enough-buffers error.
> 
> Thank you,
> Saliya
> 
> 
> 
> On Tue, Jul 12, 2016 at 7:39 AM, Ovidiu-Cristian MARCU 
> <ovidiu-cristian.ma...@inria.fr <mailto:ovidiu-cristian.ma...@inria.fr>> 
> wrote:
> Hi,
> 
> Can you post your configuration parameters (exclude default settings) and 
> cluster description?
> 
> Best,
> Ovidiu
> 
>> On 11 Jul 2016, at 17:49, Saliya Ekanayake <esal...@gmail.com 
>> <mailto:esal...@gmail.com>> wrote:
>> 
>> Thank you Greg, I'll check if this was the cause for my TMs to disappear.
>> 
>> On Mon, Jul 11, 2016 at 11:34 AM, Greg Hogan <c...@greghogan.com 
>> <mailto:c...@greghogan.com>> wrote:
>> The OOM killer doesn't give warning so you'll need to call dmesg or look in 
>> /var/log/messages or similar. The following reports that Debian flavors may 
>> use /var/log/syslog.
>>   
>> http://stackoverflow.com/questions/624857/finding-which-process-was-killed-by-linux-oom-killer
>>  
>> <http://stackoverflow.com/questions/624857/finding-which-process-was-killed-by-linux-oom-killer>
>> 
>> On Sun, Jul 10, 2016 at 11:55 PM, Saliya Ekanayake <esal...@gmail.com 
>> <mailto:esal...@gmail.com>> wrote:
>> Greg,
>> 
>> where did you see the OOM log as shown in this mail thread? In my case none 
>> of the TaskManagers nor JobManger reports an error like this.
>> 
>> On Sun, Jul 10, 2016 at 8:45 PM, Greg Hogan <c...@greghogan.com 
>> <mailto:c...@greghogan.com>> wrote:
>> These symptoms sound similar to what I was experiencing in the following 
>> thread. Flink can have some unexpected memory usage which can result in an 
>> OOM kill by the kernel, and this becomes more pronounced as the cluster size 
>> grows.
>>   https://www.mail-archive.com/dev@flink.apache.org/msg06346.html 
>> <https://www.mail-archive.com/dev@flink.apache.org/msg06346.html>
>> 
>> On Fri, Jul 8, 2016 at 12:46 PM, Saliya Ekanayake <esal...@gmail.com 
>> <mailto:esal...@gmail.com>> wrote:
>> I checked, but JVMs didn't crash. No puppet or other services like that.
>> 
>> One thing I found is that things work OK when I have a smaller number of 
>> slaves. For example, here I was trying to run on 16 nodes giving 2 TMs each. 
>> Then I reduced it to 4 nodes each with 2 TMs, which worked.
>> 
>> 
>> 
>> On Fri, Jul 8, 2016 at 12:31 PM, Robert Metzger <rmetz...@apache.org 
>> <mailto:rmetz...@apache.org>> wrote:
>> Hi,
>> from the TaskManager logs, I can not see anything suspicious.
>> It's a bit weird that the TaskManager logs just end, without any shutdown 
>> messages. Usually the TMs log some shut down stuff when they are stopping.
>> Also, if they would be still running, I would expect some error messages 
>> from akka about the connection status.
>> So the only thing I conclude is that one of the TMs was killed by the OS or 
>> the JVM crashed. Did you check if that happened?
>> 
>> Do you have any service like puppet that is controlling processes?
>> 
>> 
>> On Thu, Jul 7, 2016 at 5:46 PM, Saliya Ekanayake <esal...@gmail.com 
>> <mailto:esal...@gmail.com>> wrote:
>> I see two logs (attached), but there's only 1 TaskManger process. Also, the 
>> Web console says it can 

Re: Parameters to Control Intra-node Parallelism

2016-07-12 Thread Ovidiu-Cristian MARCU
Hi,

Can you post your configuration parameters (exclude default settings) and 
cluster description?

Best,
Ovidiu
> On 11 Jul 2016, at 17:49, Saliya Ekanayake  wrote:
> 
> Thank you Greg, I'll check if this was the cause for my TMs to disappear.
> 
> On Mon, Jul 11, 2016 at 11:34 AM, Greg Hogan wrote:
> The OOM killer doesn't give warning so you'll need to call dmesg or look in 
> /var/log/messages or similar. The following reports that Debian flavors may 
> use /var/log/syslog.
>   
> http://stackoverflow.com/questions/624857/finding-which-process-was-killed-by-linux-oom-killer
>  
> 
> 
> On Sun, Jul 10, 2016 at 11:55 PM, Saliya Ekanayake wrote:
> Greg,
> 
> where did you see the OOM log as shown in this mail thread? In my case none 
> of the TaskManagers nor JobManger reports an error like this.
> 
> On Sun, Jul 10, 2016 at 8:45 PM, Greg Hogan wrote:
> These symptoms sound similar to what I was experiencing in the following 
> thread. Flink can have some unexpected memory usage which can result in an 
> OOM kill by the kernel, and this becomes more pronounced as the cluster size 
> grows.
>   https://www.mail-archive.com/dev@flink.apache.org/msg06346.html 
> 
> 
> On Fri, Jul 8, 2016 at 12:46 PM, Saliya Ekanayake wrote:
> I checked, but JVMs didn't crash. No puppet or other services like that.
> 
> One thing I found is that things work OK when I have a smaller number of 
> slaves. For example, here I was trying to run on 16 nodes giving 2 TMs each. 
> Then I reduced it to 4 nodes each with 2 TMs, which worked.
> 
> 
> 
> On Fri, Jul 8, 2016 at 12:31 PM, Robert Metzger wrote:
> Hi,
> from the TaskManager logs, I can not see anything suspicious.
> It's a bit weird that the TaskManager logs just end, without any shutdown 
> messages. Usually the TMs log some shut down stuff when they are stopping.
> Also, if they would be still running, I would expect some error messages from 
> akka about the connection status.
> So the only thing I conclude is that one of the TMs was killed by the OS or 
> the JVM crashed. Did you check if that happened?
> 
> Do you have any service like puppet that is controlling processes?
> 
> 
> On Thu, Jul 7, 2016 at 5:46 PM, Saliya Ekanayake wrote:
> I see two logs (attached), but there's only 1 TaskManger process. Also, the 
> Web console says it can find only 1 TM. 
> 
> However, I see this part in JM log, which shows there was a second TM at one 
> point, but it was unregistered. Any thoughts?
> 
> --
> 
> - Registered TaskManager at j-002 
> (akka.tcp://flink@172.16.0.2:42888/user/taskmanager) as 
> 1c65415701f19978c8a8cdc75c993717. Current number of registered hosts is 1. 
> Current number of alive task slots is 12.
> 
> 2016-07-07 11:32:40,363 WARN  akka.remote.ReliableDeliverySupervisor - 
> Association with remote system [akka.tcp://flink@172.16.0.2:42888] has failed, 
> address is now gated for [5000] ms. Reason is: [Disassociated].
> 
> 2016-07-07 11:32:42,722 INFO  
> org.apache.flink.runtime.instance.InstanceManager - Registered TaskManager at 
> j-002 (akka.tcp://flink@172.16.0.2:37373/user/taskmanager) as 
> 9c4ec66f5acbc19f7931fcae8345cd4e. Current number of registered hosts is 2. 
> Current number of alive task slots is 24.
> 
> 2016-07-07 11:33:15,316 WARN  Remoting - Tried to associate with unreachable 
> remote address [akka.tcp://flink@172.16.0.2:42888]. Address is now gated for 
> 5000 ms, all messages to this address will be delivered to dead letters. 
> Reason: Connection refused: /172.16.0.2:42888
> 
> 2016-07-07 11:33:15,320 INFO  org.apache.flink.runtime.jobmanager.JobManager 
> - Task manager akka.tcp://flink@172.16.0.2:42888/user/taskmanager terminated.
> 2016-07-07 11:33:15,320 INFO  
> org.apache.flink.runtime.instance.InstanceManager - Unregistered task manager 
> akka.tcp://flink@172.16.0.2:42888/user/taskmanager. Number of registered task 
> managers 1. Number of available slots 12.
> 
> 
> On Thu, Jul 7, 2016 at 4:27 AM, Ufuk Celebi wrote:
> No that should suffice. Can you check whether there are any task
> manager logs for the second TM on that machine
> (taskmanager-X-j-011.log where X is the TM 

Re: Optimizations not performed - please confirm

2016-06-29 Thread Ovidiu-Cristian MARCU
Thank you, Aljoscha!
I received a similar update from Fabian; only now I see the user list was not 
in CC.

Fabian: The optimizer hasn’t been touched (except for bugfixes and new operators) 
for quite some time. These limitations are still present and I don’t expect them 
to be removed anytime soon. IMO, it is more likely that certain optimizations 
like join reordering will be done for Table API / SQL queries by the Calcite 
optimizer and pushed through the Flink DataSet optimizer.

I agree that for join reordering optimisations it makes sense to rely on Calcite.
My goal is to understand how the current documentation correlates with the status 
of the Flink framework.

I did an experimental study comparing Flink and Spark for many workloads at very 
large scale (I’ll share the results soon), and I would like to develop a few 
ideas on top of Flink (from the results, Flink is the winner in most of the use 
cases, and it is our choice for the platform on which to develop and grow).

My interest is in understanding more about Flink today. I am familiar with most 
of the papers written about it, and I am also following the documentation.
I am looking at the DataSet API, the runtime, and the current architecture.

Best,
Ovidiu

> On 29 Jun 2016, at 17:27, Aljoscha Krettek <aljos...@apache.org> wrote:
> 
> Hi,
> I think this document is still up-to-date since not much was done in these 
> parts of the code for the 1.0 release and after that.
> 
> Maybe Timo can give some insights into what optimizations are done in the 
> Table API/SQL that will be released in an updated version in 1.1.
> 
> Cheers,
> Aljoscha
> 
> +Timo, Explicitly adding Timo
> 
> On Tue, 28 Jun 2016 at 21:41 Ovidiu-Cristian MARCU 
> <ovidiu-cristian.ma...@inria.fr <mailto:ovidiu-cristian.ma...@inria.fr>> 
> wrote:
> Hi,
> 
> The optimizer internals described in this document [1] are probably not 
> up-to-date.
> Can you please confirm if this is still valid:
> 
> “The following optimizations are not performed
> Join reordering (or operator reordering in general): Joins / Filters / 
> Reducers are not re-ordered in Flink. This is a high opportunity 
> optimization, but with high risk in the absence of good estimates about the 
> data characteristics. Flink is not doing these optimizations at this point.
> Index vs. Table Scan selection: In Flink, all data sources are always 
> scanned. The data source (the input format) may apply clever mechanism to not 
> scan all the data, but pre-select and project. Examples are the RCFile / 
> ORCFile / Parquet input formats."
> Any update of this page will be very helpful.
> 
> Thank you.
> 
> Best,
> Ovidiu
> [1] https://cwiki.apache.org/confluence/display/FLINK/Optimizer+Internals 
> <https://cwiki.apache.org/confluence/display/FLINK/Optimizer+Internals>


Optimizations not performed - please confirm

2016-06-28 Thread Ovidiu-Cristian MARCU
Hi,

The optimizer internals described in this document [1] are probably not 
up-to-date.
Can you please confirm if this is still valid:

“The following optimizations are not performed
Join reordering (or operator reordering in general): Joins / Filters / Reducers 
are not re-ordered in Flink. This is a high opportunity optimization, but with 
high risk in the absence of good estimates about the data characteristics. 
Flink is not doing these optimizations at this point.
Index vs. Table Scan selection: In Flink, all data sources are always scanned. 
The data source (the input format) may apply clever mechanism to not scan all 
the data, but pre-select and project. Examples are the RCFile / ORCFile / 
Parquet input formats."
Any update of this page will be very helpful.

Thank you.

Best,
Ovidiu
[1] https://cwiki.apache.org/confluence/display/FLINK/Optimizer+Internals 


Re: Flink Version 1.1

2016-05-18 Thread Ovidiu-Cristian MARCU
Hi

We are also very interested in the future SQL (SQL on streaming) support in the 
next release (even if it is partial work that works :) ).

Thank you!

Best,
Ovidiu

> On 18 May 2016, at 14:42, Stephan Ewen  wrote:
> 
> Hi!
> 
> That question is coming up more and more.
> I think we should start the public discussion about the 1.1 release planning, 
> scope, and release manager in the next days.
> 
> Stephan
> 
> 
> On Wed, May 18, 2016 at 1:53 PM, simon peyer wrote:
> Hi Marton
> 
> Thanks for the answer.
> I'm looking for the Session Windows feature described in 
> http://data-artisans.com/session-windowing-in-flink/.
> I think its already available in the snapshot version, but I would like to 
> have it in a stable version.
> This is why I'm interested in the release date of flink 1.1
> 
> Thanks
> --Simon
> 
> 
>> On 18 May 2016, at 13:45, Márton Balassi wrote:
>> 
>> Hey Simon,
>> 
>> The general policy is that the community aims to release a major version 
>> every 3 months. That would mean the next release coming out in early to mid 
>> June. I am not aware of the 1.1.0 schedule yet, but it is about time to 
>> start the discussion on that.
>> 
>> Are you looking for a specific feature that 1.1.0 would bring to the table?
>> 
>> Best,
>> 
>> Marton
>> 
>> On Wed, May 18, 2016 at 1:28 PM, simon peyer wrote:
>> Hi guys
>> 
>> When are you expecting to release a stable version of flink 1.1?
>> 
>> --Cheers
>> Simon
>> 
> 
> 



What / Where / When / How questions in Spark 2.0 ?

2016-05-16 Thread Ovidiu-Cristian MARCU
Hi,

We can see in [2] many interesting (and expected!) improvements (promises) such 
as extended SQL support, a unified API (DataFrames, DataSets), an improved engine 
(Tungsten relates to ideas from modern compilers and MPP databases, similar to 
Flink [3]), structured streaming, etc. It seems we are witnessing a smart 
unification of Big Data analytics (Spark and Flink, the best of two worlds)!

How does Spark respond to the missing What/Where/When/How questions 
(capabilities) highlighted in the unified Beam model [1]?

Best,
Ovidiu

[1] 
https://cloud.google.com/blog/big-data/2016/05/why-apache-beam-a-google-perspective
 

[2] 
https://databricks.com/blog/2016/05/11/spark-2-0-technical-preview-easier-faster-and-smarter.html
 

[3] http://stratosphere.eu/project/publications/ 





Hash tables - joins, cogroup, deltaIteration

2016-04-18 Thread Ovidiu-Cristian MARCU
Hi,

Can you please confirm whether there is any update regarding the hash table use 
cases? In [1] it is specified that hash tables are used for joins and for the 
solution set in iterations (with pending work to use them for 
grouping/aggregations).

I am interested in the progress of that pending work, and also in whether you are 
considering a hybrid implementation where joins, the solution set in delta 
iterations, and CoGroup can also spill to disk when not enough memory is 
available for these hash tables.
 
[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=53741525 
 
Memory Management (Batch API)

Thanks

Best,
Ovidiu

Re: Flink performance pre-packaged vs. self-compiled

2016-04-14 Thread Ovidiu-Cristian MARCU
Hi,

Your assumption may be incorrect regarding the TeraSort use case with 
eastcirclek's implementation.
How many times did you run your program?
It would be helpful to give more details about your experiment, in terms of 
configuration and dataset size.

Best,
Ovidiu

> On 14 Apr 2016, at 17:14, Robert Schmidtke  wrote:
> 
> I have tried multiple Maven and Scala Versions, but to no avail. I can't seem 
> to achieve performance of the downloaded archive. I am stumped by this and 
> will need to do more experiments when I have more time.
> 
> Robert
> 
> On Thu, Apr 14, 2016 at 1:13 PM, Robert Schmidtke wrote:
> Hi Robert,
> 
> thanks for the hint! Looks like something I could have figured out myself 
> -.-" I'll let you know if I find something.
> 
> Robert
> 
> On Thu, Apr 14, 2016 at 1:06 PM, Robert Metzger wrote:
> Hi Robert,
> 
> check out the tools/create_release_files.sh file in the source tree. There 
> you can see how we are building the release binaries.
> It would be quite interesting to find out what caused the performance 
> difference.
> 
> On Wed, Apr 13, 2016 at 5:03 PM, Robert Schmidtke wrote:
> Hi everyone,
> 
> I'm using Flink 0.10.2 for some benchmarks and had to add some small changes 
> to Flink, which led me to compiling and running it myself. This is when I 
> noticed a performance difference in the pre-packaged Flink version that I 
> downloaded from the web 
> (http://archive.apache.org/dist/flink/flink-0.10.2/flink-0.10.2-bin-hadoop27.tgz)
>  versus the form of the release-0.10 branch I built myself (mvn 
> -Dhadoop.version=2.7.1 -Dscala-2.11 -DskipTests -Drat.skip=true clean install 
> // mvn version 3.0.4).
> 
> I ran some version of TeraSort (https://github.com/eastcirclek/terasort) and I 
> noticed that the pre-packaged version of Flink performs 10-20% better than the 
> one I built myself (the only tweaks I made are in the CliFrontend after the job 
> has finished running, so I would rule out bad programming on my side).
> 
> Has anyone come across this before? Or could you provide me with clearer 
> build instructions in order to reproduce the downloadable archive as closely 
> as possible? Thanks in advance!
> 
> Robert
> 
> -- 
> My GPG Key ID: 336E2680
> 
> 
> 
> 
> -- 
> My GPG Key ID: 336E2680
> 
> 
> 
> -- 
> My GPG Key ID: 336E2680



Re: off-heap size feature request

2016-03-19 Thread Ovidiu-Cristian MARCU
Thanks!
I will try this one: taskmanager.memory.size. So I should expect this to be the 
off-heap memory size, right?
I am using taskmanager.heap.mb=some value and taskmanager.memory.off-heap: true.
Memory usage goes up to 99%.

The documentation is confusing:

taskmanager.memory.size: The amount of memory (in megabytes) that the task 
manager reserves on the JVM’s heap space for sorting, hash tables, and caching 
of intermediate results. If unspecified (-1), the memory manager will take a 
fixed ratio of the heap memory available to the JVM, as specified by 
taskmanager.memory.fraction.  [1]

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/config.html#managed-memory
 
<https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/config.html#managed-memory>

Best,
Ovidiu

> On 16 Mar 2016, at 12:13, Fabian Hueske <fhue...@gmail.com> wrote:
> 
> Hi Ovidiu,
> 
> the parameters to configure the amount of managed memory 
> (taskmanager.memory.size, taskmanager.memory.fraction) are valid for on and 
> off-heap memory. 
> 
> Have you tried these parameters and didn't they work as expected?
> 
> Best, Fabian
> 
> 
> 2016-03-16 11:43 GMT+01:00 Ovidiu-Cristian MARCU 
> <ovidiu-cristian.ma...@inria.fr <mailto:ovidiu-cristian.ma...@inria.fr>>:
> Hi,
> 
> Is it possible to add a parameter off-heap.size for the task manager off-heap 
> memory [1]?
> 
> It is not possible to limit the off-heap memory size, at least I found 
> nothing in the documentation.
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/config.html#managed-memory
>  
> <https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/config.html#managed-memory>
> 
> Best,
> Ovidiu
> 



Re: off-heap size feature request

2016-03-18 Thread Ovidiu-Cristian MARCU
Updating the documentation will partially remove the confusion.
Heap memory is managed by the JVM, while off-heap memory lives outside the JVM, 
so I would define its size separately.

In my case, I see memory usage going up to full utilisation on a node with 
128 GB of RAM under the following conditions:
taskmanager.heap.mb = 12*1024
taskmanager.memory.off-heap: true
taskmanager.memory.fraction: 0.6

So the memory fraction is applied to the free memory and, in the absence of the 
taskmanager.memory.size parameter, the fraction defines the off-heap size.
Everything else in the memory usage is network buffers, I guess 
(taskmanager.network.numberOfBuffers * taskmanager.network.bufferSizeInBytes), 
along with the OS cache.

I would like parameters like:
taskmanager.off-heap.size or taskmanager.off-heap.fraction
taskmanager.off-heap.enabled: true or false
and the same for heap.

Thanks for clarification.

Best,
Ovidiu
 
> On 16 Mar 2016, at 13:43, Fabian Hueske <fhue...@gmail.com> wrote:
> 
> Oh yes, good point! The documentation needs to be updated to something like:
> 
> "The amount of memory (in megabytes) that the task manager reserves on-heap 
> or off-heap (depending on taskmanager.memory.off-heap) for sorting, hash 
> tables, and caching of intermediate results. If unspecified (-1), the memory 
> manager will take a fixed ratio with respect to the size of the task manager 
> JVM as specified by taskmanager.memory.fraction."
> 
> I will open a JIRA to fix the documentation.
> 
> If you do the following
> taskmanager.heap.mb: 4096
> taskmanager.memory.size: 2048
> taskmanager.memory.off-heap: true
> 
> The TaskManager will be started with a 2GB (4096MB-2048MB) JVM and allocate 
> (2048MB) as off-heap memory. Hence, the overall process size will be roughly 
> 4GB. The parameter name "taskmanager.heap.mb" is a bit confusing in case of 
> off-heap memory usage, because it does not define this size of the heap but 
> of the overall process.
> 
> Hope this helps,
> Fabian
> 
> 
> 
> 2016-03-16 12:40 GMT+01:00 Ovidiu-Cristian MARCU 
> <ovidiu-cristian.ma...@inria.fr <mailto:ovidiu-cristian.ma...@inria.fr>>:
> Thanks!
> I will try this one: taskmanager.memory.size. So I should expect this will be 
> the off-heap memory size, right?
> I am using taskmanager.heap.mb=some value, taskmanager.memory.off-heap: true
> Memory usage goes up to 99%.
> 
> The documentation is confusing:
> 
> taskmanager.memory.size: The amount of memory (in megabytes) that the task 
> manager reserves on the JVM’s heap space for sorting, hash tables, and 
> caching of intermediate results. If unspecified (-1), the memory manager will 
> take a fixed ratio of the heap memory available to the JVM, as specified by 
> taskmanager.memory.fraction.  [1]
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/config.html#managed-memory
>  
> <https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/config.html#managed-memory>
> 
> Best,
> Ovidiu
> 
>> On 16 Mar 2016, at 12:13, Fabian Hueske <fhue...@gmail.com 
>> <mailto:fhue...@gmail.com>> wrote:
>> 
>> Hi Ovidiu,
>> 
>> the parameters to configure the amount of managed memory 
>> (taskmanager.memory.size, taskmanager.memory.fraction) are valid for on and 
>> off-heap memory. 
>> 
>> Have you tried these parameters and didn't they work as expected?
>> 
>> Best, Fabian
>> 
>> 
>> 2016-03-16 11:43 GMT+01:00 Ovidiu-Cristian MARCU 
>> <ovidiu-cristian.ma...@inria.fr <mailto:ovidiu-cristian.ma...@inria.fr>>:
>> Hi,
>> 
>> Is it possible to add a parameter off-heap.size for the task manager 
>> off-heap memory [1]?
>> 
>> It is not possible to limit the off-heap memory size, at least I found 
>> nothing in the documentation.
>> 
>> [1] 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/config.html#managed-memory
>>  
>> <https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/config.html#managed-memory>
>> 
>> Best,
>> Ovidiu
>> 
> 
> 



Not enough free slots to run the job

2016-03-18 Thread Ovidiu-Cristian MARCU
Hi,

For a situation where a program specifies a maximum parallelism (so it is 
supposed to use all available task slots), it is possible that one of the task 
managers is not registered, for various reasons.
In this case the job will fail because there are not enough free slots to run it.

To me this means the scheduler is limited to statically assigning tasks to the 
task slots the job is configured with.

Instead, I would like to be able to specify a minimum parallelism for a job, with 
the possibility of dynamically using more task slots if additional ones become 
available.
Another use case: if during the execution of a job we lose one node, and thus 
some task slots, the job should recover and continue its execution as long as the 
minimum parallelism is still ensured, instead of simply failing.

Is it possible to make such changes?

Best,
Ovidiu

off-heap size feature request

2016-03-16 Thread Ovidiu-Cristian MARCU
Hi,

Is it possible to add a parameter off-heap.size for the task manager off-heap 
memory [1]?

It is not possible to limit the off-heap memory size, at least I found nothing 
in the documentation.

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/config.html#managed-memory
 


Best,
Ovidiu

Re: Memory ran out PageRank

2016-03-16 Thread Ovidiu-Cristian MARCU
Hi,

Regarding the solution set going out of memory, I would like an issue to be 
filed against it.

Looking into the code for CompactingHashTable, I see:

The hash table is internally divided into two parts: The hash index, and the 
partition buffers that store the actual records. When records are inserted or 
updated, the hash table appends the records to its corresponding partition, and 
inserts or updates the entry in the hash index. In the case that the hash table 
runs out of memory, it compacts a partition by walking through the hash index 
and copying all reachable elements into a fresh partition. After that, it 
releases the memory of the partition to compact.

It is not clear what the expected behaviour is when the hash table runs out of memory.

By contrast, Spark works on RDDs, which can be cached in memory or spilled to 
disk; something similar could be done for all the components that are currently 
built in memory and never spilled to disk, to avoid OutOfMemory errors.
What do you think?
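
For reference, a minimal sketch (toy data and an identity step function, not the 
Gelly PageRank code) of switching a DataSet delta iteration to the unmanaged, 
heap-object solution set that the workaround suggested earlier in the thread 
refers to; note the capital M in the DataSet API method name:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.tuple.Tuple2;

public class UnmanagedSolutionSetSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Toy (id, value) pairs standing in for the vertex state.
        DataSet<Tuple2<Long, Long>> initialSolutionSet =
                env.fromElements(new Tuple2<>(1L, 1L), new Tuple2<>(2L, 2L));
        DataSet<Tuple2<Long, Long>> initialWorkset = initialSolutionSet;

        DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
                initialSolutionSet.iterateDelta(initialWorkset, 10, 0); // key on field 0

        // Keep the solution set as regular objects on the heap instead of the
        // managed-memory CompactingHashTable, which cannot spill.
        iteration.setSolutionSetUnManaged(true);

        // Identity step function, just to make the sketch complete.
        DataSet<Tuple2<Long, Long>> delta = iteration.getWorkset()
                .map(new MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
                    @Override
                    public Tuple2<Long, Long> map(Tuple2<Long, Long> v) {
                        return v;
                    }
                });

        iteration.closeWith(delta, delta).print();
    }
}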

Best,
Ovidiu

> On 14 Mar 2016, at 18:48, Ufuk Celebi <u...@apache.org> wrote:
> 
> Probably the limitation is that the number of keys is different in the
> real and the synthetic data set respectively. Can you confirm this?
> 
> The solution set for delta iterations is currently implemented as an
> in-memory hash table that works on managed memory segments, but is not
> spillable.
> 
> – Ufuk
> 
> On Mon, Mar 14, 2016 at 6:30 PM, Ovidiu-Cristian MARCU
> <ovidiu-cristian.ma...@inria.fr> wrote:
>> 
>> This problem is surprising as I was able to run PR and CC on a larger graph 
>> (2bil edges) but with this synthetic graph (1bil edges groups of 10) I ran 
>> out of memory; regarding configuration (memory and parallelism, other 
>> internals) I was using the same.
>> There is some limitation somewhere I will try to understand more what is 
>> happening.
>> 
>> Best,
>> Ovidiu
>> 
>>> On 14 Mar 2016, at 18:06, Martin Junghanns <m.jungha...@mailbox.org> wrote:
>>> 
>>> Hi,
>>> 
>>> I understand the confusion. So far, I did not run into the problem, but I 
>>> think this needs to be addressed as all our graph processing abstractions 
>>> are implemented on top of the delta iteration.
>>> 
>>> According to the previous mailing list discussion, the problem is with the 
>>> solution set and its missing ability to spill.
>>> 
>>> If this is still the case, we should open an issue for that. Any 
>>> further opinions on that?
>>> 
>>> Cheers,
>>> Martin
>>> 
>>> 
>>> On 14.03.2016 17:55, Ovidiu-Cristian MARCU wrote:
>>>> Thank you for this alternative.
>>>> I don’t understand how the workaround will fix this on systems with 
>>>> limited memory and maybe larger graph.
>>>> 
>>>> Running Connected Components on the same graph gives the same problem.
>>>> 
>>>> IterationHead(Unnamed Delta Iteration)(82/88) switched to FAILED
>>>> java.lang.RuntimeException: Memory ran out. Compaction failed. 
>>>> numPartitions: 32 minPartition: 31 maxPartition: 32 number of overflow 
>>>> segments: 417 bucketSize: 827 Overall memory: 149159936 Partition memory: 
>>>> 65601536 Message: Index: 32, Size: 31
>>>>at 
>>>> org.apache.flink.runtime.operators.hash.CompactingHashTable.insertRecordIntoPartition(CompactingHashTable.java:469)
>>>>at 
>>>> org.apache.flink.runtime.operators.hash.CompactingHashTable.insertOrReplaceRecord(CompactingHashTable.java:414)
>>>>at 
>>>> org.apache.flink.runtime.operators.hash.CompactingHashTable.buildTableWithUniqueKey(CompactingHashTable.java:325)
>>>>at 
>>>> org.apache.flink.runtime.iterative.task.IterationHeadTask.readInitialSolutionSet(IterationHeadTask.java:212)
>>>>at 
>>>> org.apache.flink.runtime.iterative.task.IterationHeadTask.run(IterationHeadTask.java:273)
>>>>at 
>>>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
>>>>at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>>>>at java.lang.Thread.run(Thread.java:745)
>>>> 
>>>> Best,
>>>> Ovidiu
>>>> 
>>>>> On 14 Mar 2016, at 17:36, Martin Junghanns <m.jungha...@mailbox.org> 
>>>>> wrote:
>>>>> 
>>>>> Hi
>>>>> 
>>>>> I think this is the same issue we had before on the list [1]. Stephan 
>>>>> recommended the followi

Re: Memory ran out PageRank

2016-03-14 Thread Ovidiu-Cristian MARCU
Correction: the CC I am running successfully is on top of your friend, Spark :)

Best,
Ovidiu
> On 14 Mar 2016, at 20:38, Ovidiu-Cristian MARCU 
> <ovidiu-cristian.ma...@inria.fr> wrote:
> 
> Yes, largely different. I was expecting the solution set to be spillable.
> This is a rather hard limitation; the layout of the data makes the difference.
> 
> By contrast, I am able to run CC successfully on the synthetic data, but there 
> the RDDs are persisted in memory or on disk.
> 
> Best,
> Ovidiu
> 
>> On 14 Mar 2016, at 18:48, Ufuk Celebi <u...@apache.org> wrote:
>> 
>> Probably the limitation is that the number of keys is different in the
>> real and the synthetic data set respectively. Can you confirm this?
>> 
>> The solution set for delta iterations is currently implemented as an
>> in-memory hash table that works on managed memory segments, but is not
>> spillable.
>> 
>> – Ufuk
>> 
>> On Mon, Mar 14, 2016 at 6:30 PM, Ovidiu-Cristian MARCU
>> <ovidiu-cristian.ma...@inria.fr> wrote:
>>> 
>>> This problem is surprising as I was able to run PR and CC on a larger graph 
>>> (2bil edges) but with this synthetic graph (1bil edges groups of 10) I ran 
>>> out of memory; regarding configuration (memory and parallelism, other 
>>> internals) I was using the same.
>>> There is some limitation somewhere I will try to understand more what is 
>>> happening.
>>> 
>>> Best,
>>> Ovidiu
>>> 
>>>> On 14 Mar 2016, at 18:06, Martin Junghanns <m.jungha...@mailbox.org> wrote:
>>>> 
>>>> Hi,
>>>> 
>>>> I understand the confusion. So far, I did not run into the problem, but I 
>>>> think this needs to be addressed as all our graph processing abstractions 
>>>> are implemented on top of the delta iteration.
>>>> 
>>>> According to the previous mailing list discussion, the problem is with the 
>>>> solution set and its missing ability to spill.
>>>> 
>>>> If this is still the case, we should open an issue for that. Any 
>>>> further opinions on that?
>>>> 
>>>> Cheers,
>>>> Martin
>>>> 
>>>> 
>>>> On 14.03.2016 17:55, Ovidiu-Cristian MARCU wrote:
>>>>> Thank you for this alternative.
>>>>> I don’t understand how the workaround will fix this on systems with 
>>>>> limited memory and maybe larger graph.
>>>>> 
>>>>> Running Connected Components on the same graph gives the same problem.
>>>>> 
>>>>> IterationHead(Unnamed Delta Iteration)(82/88) switched to FAILED
>>>>> java.lang.RuntimeException: Memory ran out. Compaction failed. 
>>>>> numPartitions: 32 minPartition: 31 maxPartition: 32 number of overflow 
>>>>> segments: 417 bucketSize: 827 Overall memory: 149159936 Partition memory: 
>>>>> 65601536 Message: Index: 32, Size: 31
>>>>>   at 
>>>>> org.apache.flink.runtime.operators.hash.CompactingHashTable.insertRecordIntoPartition(CompactingHashTable.java:469)
>>>>>   at 
>>>>> org.apache.flink.runtime.operators.hash.CompactingHashTable.insertOrReplaceRecord(CompactingHashTable.java:414)
>>>>>   at 
>>>>> org.apache.flink.runtime.operators.hash.CompactingHashTable.buildTableWithUniqueKey(CompactingHashTable.java:325)
>>>>>   at 
>>>>> org.apache.flink.runtime.iterative.task.IterationHeadTask.readInitialSolutionSet(IterationHeadTask.java:212)
>>>>>   at 
>>>>> org.apache.flink.runtime.iterative.task.IterationHeadTask.run(IterationHeadTask.java:273)
>>>>>   at 
>>>>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
>>>>>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>>>>>   at java.lang.Thread.run(Thread.java:745)
>>>>> 
>>>>> Best,
>>>>> Ovidiu
>>>>> 
>>>>>> On 14 Mar 2016, at 17:36, Martin Junghanns <m.jungha...@mailbox.org> 
>>>>>> wrote:
>>>>>> 
>>>>>> Hi
>>>>>> 
>>>>>> I think this is the same issue we had before on the list [1]. Stephan 
>>>>>> recommended the following workaround:
>>>>>> 
>>>>>>> A possible workaround is to use the option 
>>>>>>> "setSolutionSetUnman

Re: Memory ran out PageRank

2016-03-14 Thread Ovidiu-Cristian MARCU
Yes, largely different. I was expecting the solution set to be spillable.
This is a rather hard limitation; the layout of the data makes the difference.

By contrast, I am able to run CC successfully on the synthetic data, but there 
the RDDs are persisted in memory or on disk.

Best,
Ovidiu

> On 14 Mar 2016, at 18:48, Ufuk Celebi <u...@apache.org> wrote:
> 
> Probably the limitation is that the number of keys is different in the
> real and the synthetic data set respectively. Can you confirm this?
> 
> The solution set for delta iterations is currently implemented as an
> in-memory hash table that works on managed memory segments, but is not
> spillable.
> 
> – Ufuk
> 
> On Mon, Mar 14, 2016 at 6:30 PM, Ovidiu-Cristian MARCU
> <ovidiu-cristian.ma...@inria.fr> wrote:
>> 
>> This problem is surprising as I was able to run PR and CC on a larger graph 
>> (2bil edges) but with this synthetic graph (1bil edges groups of 10) I ran 
>> out of memory; regarding configuration (memory and parallelism, other 
>> internals) I was using the same.
>> There is some limitation somewhere I will try to understand more what is 
>> happening.
>> 
>> Best,
>> Ovidiu
>> 
>>> On 14 Mar 2016, at 18:06, Martin Junghanns <m.jungha...@mailbox.org> wrote:
>>> 
>>> Hi,
>>> 
>>> I understand the confusion. So far, I did not run into the problem, but I 
>>> think this needs to be addressed as all our graph processing abstractions 
>>> are implemented on top of the delta iteration.
>>> 
>>> According to the previous mailing list discussion, the problem is with the 
>>> solution set and its missing ability to spill.
>>> 
>>> If this is still the case, we should open an issue for that. Any 
>>> further opinions on that?
>>> 
>>> Cheers,
>>> Martin
>>> 
>>> 
>>> On 14.03.2016 17:55, Ovidiu-Cristian MARCU wrote:
>>>> Thank you for this alternative.
>>>> I don’t understand how the workaround will fix this on systems with 
>>>> limited memory and maybe larger graph.
>>>> 
>>>> Running Connected Components on the same graph gives the same problem.
>>>> 
>>>> IterationHead(Unnamed Delta Iteration)(82/88) switched to FAILED
>>>> java.lang.RuntimeException: Memory ran out. Compaction failed. 
>>>> numPartitions: 32 minPartition: 31 maxPartition: 32 number of overflow 
>>>> segments: 417 bucketSize: 827 Overall memory: 149159936 Partition memory: 
>>>> 65601536 Message: Index: 32, Size: 31
>>>>at 
>>>> org.apache.flink.runtime.operators.hash.CompactingHashTable.insertRecordIntoPartition(CompactingHashTable.java:469)
>>>>at 
>>>> org.apache.flink.runtime.operators.hash.CompactingHashTable.insertOrReplaceRecord(CompactingHashTable.java:414)
>>>>at 
>>>> org.apache.flink.runtime.operators.hash.CompactingHashTable.buildTableWithUniqueKey(CompactingHashTable.java:325)
>>>>at 
>>>> org.apache.flink.runtime.iterative.task.IterationHeadTask.readInitialSolutionSet(IterationHeadTask.java:212)
>>>>at 
>>>> org.apache.flink.runtime.iterative.task.IterationHeadTask.run(IterationHeadTask.java:273)
>>>>at 
>>>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
>>>>at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>>>>at java.lang.Thread.run(Thread.java:745)
>>>> 
>>>> Best,
>>>> Ovidiu
>>>> 
>>>>> On 14 Mar 2016, at 17:36, Martin Junghanns <m.jungha...@mailbox.org> 
>>>>> wrote:
>>>>> 
>>>>> Hi
>>>>> 
>>>>> I think this is the same issue we had before on the list [1]. Stephan 
>>>>> recommended the following workaround:
>>>>> 
>>>>>> A possible workaround is to use the option 
>>>>>> "setSolutionSetUnmanaged(true)"
>>>>>> on the iteration. That will eliminate the fragmentation issue, at least.
>>>>> 
>>>>> Unfortunately, you cannot set this when using graph.run(new PageRank(...))
>>>>> 
>>>>> I created a Gist which shows you how to set this using PageRank
>>>>> 
>>>>> https://gist.github.com/s1ck/801a8ef97ce374b358df
>>>>> 
>>>>&

Re: Memory ran out PageRank

2016-03-14 Thread Ovidiu-Cristian MARCU

This problem is surprising, as I was able to run PR and CC on a larger graph 
(2 billion edges), but with this synthetic graph (1 billion edges, groups of 10) 
I ran out of memory; the configuration (memory, parallelism, other internals) 
was the same.
There is some limitation somewhere; I will try to understand more about what is 
happening.

Best,
Ovidiu

> On 14 Mar 2016, at 18:06, Martin Junghanns <m.jungha...@mailbox.org> wrote:
> 
> Hi,
> 
> I understand the confusion. So far, I did not run into the problem, but I 
> think this needs to be addressed as all our graph processing abstractions are 
> implemented on top of the delta iteration.
> 
> According to the previous mailing list discussion, the problem is with the 
> solution set and its missing ability to spill.
> 
> If this is still the case, we should open an issue for that. Any further 
> opinions on that?
> 
> Cheers,
> Martin
> 
> 
> On 14.03.2016 17:55, Ovidiu-Cristian MARCU wrote:
>> Thank you for this alternative.
>> I don’t understand how the workaround will fix this on systems with limited 
>> memory and maybe larger graph.
>> 
>> Running Connected Components on the same graph gives the same problem.
>> 
>> IterationHead(Unnamed Delta Iteration)(82/88) switched to FAILED
>> java.lang.RuntimeException: Memory ran out. Compaction failed. 
>> numPartitions: 32 minPartition: 31 maxPartition: 32 number of overflow 
>> segments: 417 bucketSize: 827 Overall memory: 149159936 Partition memory: 
>> 65601536 Message: Index: 32, Size: 31
>> at 
>> org.apache.flink.runtime.operators.hash.CompactingHashTable.insertRecordIntoPartition(CompactingHashTable.java:469)
>> at 
>> org.apache.flink.runtime.operators.hash.CompactingHashTable.insertOrReplaceRecord(CompactingHashTable.java:414)
>> at 
>> org.apache.flink.runtime.operators.hash.CompactingHashTable.buildTableWithUniqueKey(CompactingHashTable.java:325)
>> at 
>> org.apache.flink.runtime.iterative.task.IterationHeadTask.readInitialSolutionSet(IterationHeadTask.java:212)
>> at 
>> org.apache.flink.runtime.iterative.task.IterationHeadTask.run(IterationHeadTask.java:273)
>> at 
>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>> at java.lang.Thread.run(Thread.java:745)
>> 
>> Best,
>> Ovidiu
>> 
>>> On 14 Mar 2016, at 17:36, Martin Junghanns <m.jungha...@mailbox.org> wrote:
>>> 
>>> Hi
>>> 
>>> I think this is the same issue we had before on the list [1]. Stephan 
>>> recommended the following workaround:
>>> 
>>>> A possible workaround is to use the option "setSolutionSetUnmanaged(true)"
>>>> on the iteration. That will eliminate the fragmentation issue, at least.
>>> 
>>> Unfortunately, you cannot set this when using graph.run(new PageRank(...))
>>> 
>>> I created a Gist which shows you how to set this using PageRank
>>> 
>>> https://gist.github.com/s1ck/801a8ef97ce374b358df
>>> 
>>> Please let us know if it worked out for you.
>>> 
>>> Cheers,
>>> Martin
>>> 
>>> [1] 
>>> http://mail-archives.apache.org/mod_mbox/flink-user/201508.mbox/%3CCAELUF_ByPAB%2BPXWLemPzRH%3D-awATeSz4sGz4v9TmnvFku3%3Dx3A%40mail.gmail.com%3E
>>> 
>>> On 14.03.2016 16:55, Ovidiu-Cristian MARCU wrote:
>>>> Hi,
>>>> 
>>>> While running PageRank on a synthetic graph I run into this problem:
>>>> Any advice on how should I proceed to overcome this memory issue?
>>>> 
>>>> IterationHead(Vertex-centric iteration 
>>>> (org.apache.flink.graph.library.PageRank$VertexRankUpdater@7712cae0 | 
>>>> org.apache.flink.graph.library.PageRank$RankMesseng$
>>>> java.lang.RuntimeException: Memory ran out. Compaction failed. 
>>>> numPartitions: 32 minPartition: 24 maxPartition: 25 number of overflow 
>>>> segments: 328 bucketSize: 638 Overall memory: 115539968 Partition memory: 
>>>> 50659328 Message: Index: 25, Size: 24
>>>> at 
>>>> org.apache.flink.runtime.operators.hash.CompactingHashTable.insertRecordIntoPartition(CompactingHashTable.java:469)
>>>> at 
>>>> org.apache.flink.runtime.operators.hash.CompactingHashTable.insertOrReplaceRecord(CompactingHashTable.java:414)
>>>> at 
>>>> org.apache.flink.runtime.operators.hash.CompactingHashTable.buildTableWithUniqueKey(CompactingHashTable.java:325)
>>>> at 
>>>> org.apache.flink.runtime.iterative.task.IterationHeadTask.readInitialSolutionSet(IterationHeadTask.java:212)
>>>> at 
>>>> org.apache.flink.runtime.iterative.task.IterationHeadTask.run(IterationHeadTask.java:273)
>>>> at 
>>>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>>>> at java.lang.Thread.run(Thread.java:745)
>>>> 
>>>> Thanks!
>>>> 
>>>> Best,
>>>> Ovidiu
>>>> 
>> 
>> 



Re: Memory ran out PageRank

2016-03-14 Thread Ovidiu-Cristian MARCU
Thank you for this alternative.
I don’t understand how the workaround will fix this on systems with limited 
memory and maybe a larger graph.

Running Connected Components on the same graph gives the same problem.

IterationHead(Unnamed Delta Iteration)(82/88) switched to FAILED
java.lang.RuntimeException: Memory ran out. Compaction failed. numPartitions: 
32 minPartition: 31 maxPartition: 32 number of overflow segments: 417 
bucketSize: 827 Overall memory: 149159936 Partition memory: 65601536 Message: 
Index: 32, Size: 31
at 
org.apache.flink.runtime.operators.hash.CompactingHashTable.insertRecordIntoPartition(CompactingHashTable.java:469)
at 
org.apache.flink.runtime.operators.hash.CompactingHashTable.insertOrReplaceRecord(CompactingHashTable.java:414)
at 
org.apache.flink.runtime.operators.hash.CompactingHashTable.buildTableWithUniqueKey(CompactingHashTable.java:325)
at 
org.apache.flink.runtime.iterative.task.IterationHeadTask.readInitialSolutionSet(IterationHeadTask.java:212)
at 
org.apache.flink.runtime.iterative.task.IterationHeadTask.run(IterationHeadTask.java:273)
at 
org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)

Best,
Ovidiu

> On 14 Mar 2016, at 17:36, Martin Junghanns <m.jungha...@mailbox.org> wrote:
> 
> Hi
> 
> I think this is the same issue we had before on the list [1]. Stephan 
> recommended the following workaround:
> 
>> A possible workaround is to use the option "setSolutionSetUnmanaged(true)"
>> on the iteration. That will eliminate the fragmentation issue, at least.
> 
> Unfortunately, you cannot set this when using graph.run(new PageRank(...))
> 
> I created a Gist which shows you how to set this using PageRank
> 
> https://gist.github.com/s1ck/801a8ef97ce374b358df
> 
> Please let us know if it worked out for you.
> 
> Cheers,
> Martin
> 
> [1] 
> http://mail-archives.apache.org/mod_mbox/flink-user/201508.mbox/%3CCAELUF_ByPAB%2BPXWLemPzRH%3D-awATeSz4sGz4v9TmnvFku3%3Dx3A%40mail.gmail.com%3E
> 
> On 14.03.2016 16:55, Ovidiu-Cristian MARCU wrote:
>> Hi,
>> 
>> While running PageRank on a synthetic graph I run into this problem:
>> Any advice on how should I proceed to overcome this memory issue?
>> 
>> IterationHead(Vertex-centric iteration 
>> (org.apache.flink.graph.library.PageRank$VertexRankUpdater@7712cae0 | 
>> org.apache.flink.graph.library.PageRank$RankMesseng$
>> java.lang.RuntimeException: Memory ran out. Compaction failed. 
>> numPartitions: 32 minPartition: 24 maxPartition: 25 number of overflow 
>> segments: 328 bucketSize: 638 Overall memory: 115539968 Partition memory: 
>> 50659328 Message: Index: 25, Size: 24
>> at 
>> org.apache.flink.runtime.operators.hash.CompactingHashTable.insertRecordIntoPartition(CompactingHashTable.java:469)
>> at 
>> org.apache.flink.runtime.operators.hash.CompactingHashTable.insertOrReplaceRecord(CompactingHashTable.java:414)
>> at 
>> org.apache.flink.runtime.operators.hash.CompactingHashTable.buildTableWithUniqueKey(CompactingHashTable.java:325)
>> at 
>> org.apache.flink.runtime.iterative.task.IterationHeadTask.readInitialSolutionSet(IterationHeadTask.java:212)
>> at 
>> org.apache.flink.runtime.iterative.task.IterationHeadTask.run(IterationHeadTask.java:273)
>> at 
>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>> at java.lang.Thread.run(Thread.java:745)
>> 
>> Thanks!
>> 
>> Best,
>> Ovidiu
>> 



Memory ran out PageRank

2016-03-14 Thread Ovidiu-Cristian MARCU
Hi,

While running PageRank on a synthetic graph I ran into this problem: 
Any advice on how I should proceed to overcome this memory issue?

IterationHead(Vertex-centric iteration 
(org.apache.flink.graph.library.PageRank$VertexRankUpdater@7712cae0 | 
org.apache.flink.graph.library.PageRank$RankMesseng$
java.lang.RuntimeException: Memory ran out. Compaction failed. numPartitions: 
32 minPartition: 24 maxPartition: 25 number of overflow segments: 328 
bucketSize: 638 Overall memory: 115539968 Partition memory: 50659328 Message: 
Index: 25, Size: 24
at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertRecordIntoPartition(CompactingHashTable.java:469)
at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertOrReplaceRecord(CompactingHashTable.java:414)
at org.apache.flink.runtime.operators.hash.CompactingHashTable.buildTableWithUniqueKey(CompactingHashTable.java:325)
at org.apache.flink.runtime.iterative.task.IterationHeadTask.readInitialSolutionSet(IterationHeadTask.java:212)
at org.apache.flink.runtime.iterative.task.IterationHeadTask.run(IterationHeadTask.java:273)
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)

Thanks!

Best,
Ovidiu

Re: Batch Processing Fault Tolerance (DataSet API)

2016-02-22 Thread Ovidiu-Cristian MARCU
Thank you, Till!

Does the current (in-progress) implementation also consider the problem of 
losing the task slots of the failed node(s), i.e. something related to 
[2]?

[2] https://issues.apache.org/jira/browse/FLINK-3047

Best,
Ovidiu

> On 22 Feb 2016, at 18:13, Till Rohrmann <trohrm...@apache.org> wrote:
> 
> Hi Ovidiu,
> 
> at the moment Flink's batch fault tolerance restarts the whole job in case of 
> a failure. However, parts of the logic to do partial backtracking such as 
> intermediate result partitions and the backtracking algorithm are already 
> implemented or exist as a PR [1]. So we hope to complete the partial 
> backtracking soon.
> 
> [1] https://github.com/apache/flink/pull/640 
> 
> Cheers,
> Till
> 
> On Mon, Feb 22, 2016 at 6:00 PM, Ovidiu-Cristian MARCU 
> <ovidiu-cristian.ma...@inria.fr> wrote:
> Hi
> 
> In case of a node failure, what does 'Fault tolerance for programs 
> in the DataSet API works by retrying failed executions’ [1] mean?
> - work already done by the rest of the nodes is not lost, only the work of the 
> lost node is recomputed, and job execution continues
> or
> - the entire job execution is retried
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/fault_tolerance.html
> 
> Best,
> Ovidiu 
> 
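
For context, a minimal sketch of how retries are configured in the DataSet API 
of this era, following the fault tolerance page linked above; with whole-job 
restarts, a failure anywhere re-executes the program from the beginning:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;

public class RetryConfigSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Retry the whole job up to 3 times, waiting 5 seconds between attempts.
        env.setNumberOfExecutionRetries(3);
        env.getConfig().setExecutionRetryDelay(5000); // milliseconds

        // A trivial placeholder pipeline; print() triggers execution.
        env.fromElements(1, 2, 3)
           .map(new MapFunction<Integer, Integer>() {
               @Override
               public Integer map(Integer value) {
                   return value * 2;
               }
           })
           .print();
    }
}

These settings only control how often the whole execution is retried; the 
partial backtracking Till mentions is what would avoid recomputing the work 
already done by the surviving nodes.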



Batch Processing Fault Tolerance (DataSet API)

2016-02-22 Thread Ovidiu-Cristian MARCU
Hi

In case of a node failure, what does 'Fault tolerance for programs in 
the DataSet API works by retrying failed executions’ [1] mean?
- work already done by the rest of the nodes is not lost, only the work of the 
lost node is recomputed, and job execution continues
or
- the entire job execution is retried

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/fault_tolerance.html
 


Best,
Ovidiu 

Apache Flink Web Dashboard - Completed Job history

2015-12-16 Thread Ovidiu-Cristian MARCU
Hi

If I restart Flink, I no longer see the history of completed jobs.
Is this a missing feature, or what should I do to see the completed-job 
history?

Best regards,
Ovidiu

Re: flink connectors

2015-11-27 Thread Ovidiu-Cristian MARCU
Hi,

The main question here is why the release distribution doesn’t contain the 
connector dependencies.
It is fair to say that it does not have to (which connectors to include, or all 
of them?). So, just like Spark, Flink offers a binary distribution for Hadoop 
only, without bundling other dependencies.

The thing to consider is whether it would help, on the flink.apache.org download 
page, to offer a customised page that lets the user also choose a dependency 
(connector) to be included in the downloaded binary.

Best regards,
Ovidiu



> On 27 Nov 2015, at 14:52, Matthias J. Sax  wrote:
> 
> If I understand the question right, you just want to download the jar
> manually?
> 
> Just go to the maven repository website and download the jar from there.
> 
> 
> -Matthias
> 
> On 11/27/2015 02:49 PM, Robert Metzger wrote:
>> Maybe there is a maven mirror you can access from your network?
>> 
>> This site contains a list of some mirrors
>> http://stackoverflow.com/questions/5233610/what-are-the-official-mirrors-of-the-maven-central-repository
>> You don't have to use the maven tool, you can also manually browse for
>> the jars and download what you need.
>> 
>> 
>> On Fri, Nov 27, 2015 at 2:46 PM, Fabian Hueske wrote:
>> 
>>You can always build Flink from source, but apart from that I am not
>>aware of an alternative.
>> 
>>2015-11-27 14:42 GMT+01:00 Radu Tudoran:
>> 
>>Hi,
>> 
>>Is there any alternative that avoids Maven?
>> 
>>That is why I was curious if there is a binary distribution of
>>this available for download directly.
>> 
>> 
>>Dr. Radu Tudoran
>>Research Engineer, IT R&D Division
>>HUAWEI TECHNOLOGIES Duesseldorf GmbH, European Research Center
>>Riesstrasse 25, 80992 München
>> 
>>*From:* Fabian Hueske [mailto:fhue...@gmail.com]
>>*Sent:* Friday, November 27, 2015 2:41 PM
>>*To:* user@flink.apache.org
>>*Subject:* Re: flink connectors
>> 
>> 
>>Hi Radu,
>> 
>>the connectors are available in Maven Central.
>> 
>>Just add them as a dependency in your project and they will be
>>fetched and included.
>> 
>>Best, Fabian
>> 
>> 
>>2015-11-27 14:38 GMT+01:00 Radu Tudoran:
>> 
>>Hi,
>> 
>> 
>> 
>>I was trying to use flink connectors. However, when I tried to
>>import this
>> 
>> 
>> 
>>import org.apache.flink.streaming.connectors.*;
>> 
>> 
>> 
>>I saw that they are not present in the binary distribution as
>>downloaded from the website (flink-dist-0.10.0.jar). Is this
>>intentional? Is there also a binary distribution that contains
>>these connectors?
>> 
>> 
>> 
>>Regards,
>> 
>> 
>> 
>>Dr. Radu Tudoran
>>Research Engineer, IT R&D Division
>>HUAWEI TECHNOLOGIES Duesseldorf GmbH, European Research Center
>>Riesstrasse 25, 80992 München
>> 
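
To make Fabian's point concrete: once a connector artifact (for example 
flink-connector-kafka from the 0.10 line) is added as a project dependency, 
its classes resolve even though they are not inside flink-dist. A minimal 
sketch with placeholder connection settings and topic name (class names are 
as of 0.10 and moved in later releases):

import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class ConnectorClasspathSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder settings for a local Kafka/ZooKeeper pair.
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("zookeeper.connect", "localhost:2181");
        props.setProperty("group.id", "example-group");

        // This class lives in flink-connector-kafka, not in flink-dist,
        // which is why the plain binary download cannot resolve the import.
        DataStream<String> stream = env.addSource(
            new FlinkKafkaConsumer082<>("example-topic",
                new SimpleStringSchema(), props));

        stream.print();
        env.execute("connector classpath sketch");
    }
}

The same pattern applies to the other connectors; only the Maven artifact, and 
therefore the import, changes.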

Re: Apache Flink on Hadoop YARN using a YARN Session

2015-11-20 Thread Ovidiu-Cristian MARCU
Hi Robert,

In this case, if both the standalone and YARN modes run jobs when they have 
resources, which one is it better to rely on?
I would be interested in a feature like the dynamic resource allocation with a 
fair scheduler that Spark has implemented.
If you guys consider this feature, I will be glad to join as a contributor 
as well.

Best regards,
Ovidiu
 

> On 20 Nov 2015, at 14:53, Robert Metzger <rmetz...@apache.org> wrote:
> 
> Hi Ovidiu,
> 
> good choice on your research topic ;)
> 
> I think doing some hands on experiments will help you to understand much 
> better how Flink works and what you can do with it.
> 
> If I got it right:
> -with standalone (cluster) you can run multiple workloads if you have enough 
> resources, else the job will be rejected.
> -with a yarn session, yarn will accept the job but will only execute it when 
> there are enough resources.
> 
> That's not right. The YARN session and standalone cluster mode are basically 
> the same.
> Both the YARN session and the cluster mode will run jobs in parallel if there 
> are enough resources, and they both will reject jobs if not enough 
> resources are there.
> 
> 
> My point on scheduling: 
> If I have an installation (Flink over Yarn for example) and in my cluster I 
> have enough resources to serve multiple requests.
> Some jobs are running permanently, some are not. I want to be able to 
> schedule jobs concurrently. My options right now, if I understand correctly, 
> are either to wait for the current job to finish (assuming it has acquired all 
> the available resources) or to stop the current job, in case I have other 
> jobs with higher priorities. This could be related also to the resource 
> elasticity you mentioned.
> 
> Yes, resource elasticity in Flink will mitigate such issues. We would be able 
> to respond to YARN's preemption requests if jobs with higher priorities are 
> requesting additional resources. 
> 
> On Fri, Nov 20, 2015 at 2:07 PM, Ovidiu-Cristian MARCU 
> <ovidiu-cristian.ma...@inria.fr> wrote:
> Thank you, Robert!
> 
> My research interests include Flink (I am a PhD student, BigStorage EU 
> project, Inria Rennes), so I am currently preparing some experiments in order 
> to better understand how it works.
> 
> If I got it right:
> -with standalone (cluster) you can run multiple workloads if you have enough 
> resources, else the job will be rejected.
> -with a yarn session, yarn will accept the job but will only execute it when 
> there are enough resources.
> 
> My point on scheduling: 
> If I have an installation (Flink over Yarn for example) and in my cluster I 
> have enough resources to serve multiple requests.
> Some jobs are running permanently, some are not. I want to be able to 
> schedule jobs concurrently. My options right now, if I understand correctly, 
> are either to wait for the current job to finish (assuming it has acquired all 
> the available resources) or to stop the current job, in case I have other 
> jobs with higher priorities. This could be related also to the resource 
> elasticity you mentioned.
> 
> Best regards,
> Ovidiu
> 
>> On 20 Nov 2015, at 13:34, Robert Metzger <rmetz...@apache.org> wrote:
>> 
>> Hi,
>> I'll fix the link in the YARN documentation. Thank you for reporting the 
>> issue.
>> 
>> I'm not aware of any discussions or implementations related to the 
>> scheduling. From my experience working with users and also from the mailing 
>> list, I don't think that such features are very important.
>> Since streaming jobs usually run permanently, there is no need to queue jobs 
>> somehow.
>> For batch jobs, YARN is taking care of the resource allocation (in practice 
>> this means that the job has to wait until the required resources are 
>> available).
>> 
>> There are some discussions (and user requests) regarding resource elasticity 
>> going on and I think we'll add features for dynamically changing the size of 
>> a Flink cluster on YARN while a job is running.
>> 
>> Which features are you missing wrt scheduling in Flink? Please let me 
>> know if there is anything blocking you from using Flink in production and 
>> we'll see what we can do.
>> 
>> Regards,
>> Robert
>> 
>> 
>> 
>> On Fri, Nov 20, 2015 at 1:24 PM, Ovidiu-Cristian MARCU 
>> <ovidiu-cristian.ma...@inria.fr> wrote:
>> Hi,
>> 
>> The link to FAQ 
>> (https://ci.apache.org/projects/flink/flink-docs-release-0.10/faq.html 

Apache Flink on Hadoop YARN using a YARN Session

2015-11-20 Thread Ovidiu-Cristian MARCU
Hi,

I am currently interested in experimenting on Flink over Hadoop YARN.
I am working from the documentation we have here: 
https://ci.apache.org/projects/flink/flink-docs-release-0.10/setup/yarn_setup.html
 


There is a subsection Start Flink Session which states the following: A session 
will start all required Flink services (JobManager and TaskManagers) so that 
you can submit programs to the cluster. Note that you can run multiple programs 
per session.

Can you be more precise regarding the multiple programs per session? If I 
submit multiple programs concurrently, what will happen (can I)? Will they 
run in a FIFO fashion, or what should I expect?

The internals section specifies that users can execute multiple Flink YARN 
sessions in parallel. This is great; it invites static partitioning of 
resources in order to run multiple applications concurrently. Do you support a 
fair scheduler similar to what Spark claims it has?

The FAQ section resource 
(https://ci.apache.org/projects/flink/flink-docs-release-0.10/faq.html) 
is missing; can this be updated?

Thank you.

Best regards,
Ovidiu
 

Re: Apache Flink on Hadoop YARN using a YARN Session

2015-11-20 Thread Ovidiu-Cristian MARCU
Hi,

The link to the FAQ 
(https://ci.apache.org/projects/flink/flink-docs-release-0.10/faq.html) is on 
the YARN setup 0.10 documentation page 
(https://ci.apache.org/projects/flink/flink-docs-release-0.10/setup/yarn_setup.html),
in this sentence: If you have troubles using the Flink YARN client, 
have a look in the FAQ section.

Are the scheduling features considered for the next releases?

Thank you.
Best regards,
Ovidiu

> On 20 Nov 2015, at 11:59, Robert Metzger <rmetz...@apache.org> wrote:
> 
> Hi Ovidiu,
> 
> you can submit multiple programs to a running Flink cluster (or a YARN 
> session). Flink does currently not have any queuing mechanism.
> The JobManager will reject a program if there are not enough free resources 
> for it. If there are enough resources for multiple programs, they'll run 
> concurrently.
> Note that Flink is not starting separate JVMs for the programs, so if one 
> program is doing a System.exit(0), it is killing the entire JVM, including 
> other running programs.
> 
> You can start as many YARN sessions (or single jobs to YARN) as you have 
> resources available on the cluster. The resource allocation is up to the 
> scheduler you've configured in YARN.
> 
> In general, we recommend starting a YARN session per program. You can also 
> directly submit a Flink program to YARN.
> 
> Where did you find the link to the FAQ? The link on the front page is 
> working: http://flink.apache.org/faq.html
> 
> 
> 
> On Fri, Nov 20, 2015 at 11:41 AM, Ovidiu-Cristian MARCU 
> <ovidiu-cristian.ma...@inria.fr> wrote:
> Hi,
> 
> I am currently interested in experimenting on Flink over Hadoop YARN.
> I am working from the documentation we have here: 
> https://ci.apache.org/projects/flink/flink-docs-release-0.10/setup/yarn_setup.html
> 
> There is a subsection Start Flink Session which states the following: A 
> session will start all required Flink services (JobManager and TaskManagers) 
> so that you can submit programs to the cluster. Note that you can run 
> multiple programs per session.
> 
> Can you be more precise regarding the multiple programs per session? If I 
> submit multiple programs concurrently, what will happen (can I)? Will they 
> run in a FIFO fashion, or what should I expect?
> 
> The internals section specifies that users can execute multiple Flink YARN 
> sessions in parallel. This is great; it invites static partitioning of 
> resources in order to run multiple applications concurrently. Do you support 
> a fair scheduler similar to what Spark claims it has?
> 
> The FAQ section resource 
> (https://ci.apache.org/projects/flink/flink-docs-release-0.10/faq.html) 
> is missing; can this be updated?
> 
> Thank you.
> 
> Best regards,
> Ovidiu
>  
>