Distribute DataSet to subset of nodes

2015-09-13 Thread Stefan Bunk
Hi! Here is my problem: I have 10 nodes on which I want to execute a flatMap operator on a DataSet. In the open method of the operator, some data is read from disk and preprocessed, which the operator needs. The problem is that the data does not fit in memory on one node; however, half of the da
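
For reference, an operator of the kind described here (a RichFlatMapFunction that loads and preprocesses side data from local disk in its open() method) could look roughly like the sketch below. The file path, the tab-separated format, and the lookup logic are made-up placeholders, not code from this thread.

    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.Collector;

    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.util.HashMap;
    import java.util.Map;

    // Sketch of the operator described above: open() reads and preprocesses
    // side data from local disk before flatMap() is called on the DataSet elements.
    public class LookupFlatMap extends RichFlatMapFunction<String, String> {

        private transient Map<String, String> lookup;

        @Override
        public void open(Configuration parameters) throws Exception {
            lookup = new HashMap<>();
            // Hypothetical path; in the scenario above this side data is too
            // large to hold in memory on a single node.
            for (String line : Files.readAllLines(Paths.get("/data/side-data.tsv"))) {
                String[] kv = line.split("\t", 2);
                lookup.put(kv[0], kv[1]);
            }
        }

        @Override
        public void flatMap(String value, Collector<String> out) {
            String match = lookup.get(value);
            if (match != null) {
                out.collect(match);
            }
        }
    }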

Re: Distribute DataSet to subset of nodes

2015-09-13 Thread Sachin Goel
Hi Stefan, just a clarification: the output corresponding to an element based on the whole data will be a union of the outputs based on the two halves. Is this what you're trying to achieve? [It appears so, since every flatMap task will independently produce outputs.] In that case, one solu

Re: Distribute DataSet to subset of nodes

2015-09-13 Thread Sachin Goel
Of course, someone else might have better ideas regarding the partitioner. :) On Sep 14, 2015 1:12 AM, "Sachin Goel" wrote: > Hi Stefan > Just a clarification: The output corresponding to an element based on the > whole data will be a union of the outputs based on the two halves. Is this > what you'

Re: Distribute DataSet to subset of nodes

2015-09-14 Thread Fabian Hueske
Hi Stefan, I agree with Sachin's approach. That should be the easiest solution and would look like:

    env.setParallelism(10);            // default is 10
    DataSet data = env.read(...)       // large data set
    DataSet smallData1 = env.read(...) // read first part of small data
    DataSet smallData2 = env.read(...) // re
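
A rough, self-contained version of this plan is sketched below. The broadcast-set name, the file paths, and the placeholder FilterByBroadcast operator are assumptions for illustration, not Fabian's actual code; only the structure (parallelism 10 overall, two flatMaps with parallelism 5, one half of the small data broadcast to each) follows the mail.

    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.Collector;

    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class TwoHalvesJob {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(10); // default parallelism is 10

            DataSet<String> data       = env.readTextFile("hdfs:///path/large");   // large data set
            DataSet<String> smallData1 = env.readTextFile("hdfs:///path/small-1"); // first half of the small data
            DataSet<String> smallData2 = env.readTextFile("hdfs:///path/small-2"); // second half of the small data

            // The same flatMap runs twice, each instance using half of the slots
            // and getting one half of the small data as a broadcast set.
            DataSet<String> mapped1 = data.flatMap(new FilterByBroadcast())
                    .withBroadcastSet(smallData1, "small")
                    .setParallelism(5);
            DataSet<String> mapped2 = data.flatMap(new FilterByBroadcast())
                    .withBroadcastSet(smallData2, "small")
                    .setParallelism(5);

            // As noted earlier in the thread, the result over the whole small data
            // is the union of the results over the two halves.
            mapped1.union(mapped2).writeAsText("hdfs:///path/result");
            env.execute("two-halves job");
        }

        // Placeholder operator; the real per-element logic is not shown in the thread.
        public static class FilterByBroadcast extends RichFlatMapFunction<String, String> {
            private Set<String> small;

            @Override
            public void open(Configuration parameters) {
                List<String> half = getRuntimeContext().getBroadcastVariable("small");
                small = new HashSet<>(half);
            }

            @Override
            public void flatMap(String value, Collector<String> out) {
                if (small.contains(value)) {
                    out.collect(value);
                }
            }
        }
    }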

Re: Distribute DataSet to subset of nodes

2015-09-14 Thread Stefan Bunk
Hi, actually, I am distributing my data before the program starts, without using broadcast sets. However, the approach should still work, under one condition:

    > DataSet mapped1 = data.flatMap(yourMap).withBroadcastSet(smallData1, "data").setParallelism(5);
    > DataSet mapped2 = data.flatMap(you

Re: Distribute DataSet to subset of nodes

2015-09-14 Thread Fabian Hueske
Hi Stefan, forcing the scheduling of tasks to certain nodes and reading files from the local file system in a multi-node setup is actually quite tricky and requires a bit of understanding of the internals. It is possible, and I can help you with that, but I would recommend using a shared filesystem suc

Re: Distribute DataSet to subset of nodes

2015-09-15 Thread Stefan Bunk
Hi Fabian, I think we might have a misunderstanding here. I have already copied the first file to five nodes and the second file to five other nodes, outside of Flink. In the open() method of the operator, I just read that file via normal Java means. I do not see why this is tricky or how HDFS s

Re: Distribute DataSet to subset of nodes

2015-09-15 Thread Fabian Hueske
Hi Stefan, the problem is that you cannot directly influence the scheduling of tasks to nodes, so you cannot ensure that a task is able to read the data you put in the local filesystem of its node. HDFS gives you a shared file system, which means that each node can read data from anywhere in the cluster. I assumed t

Re: Distribute DataSet to subset of nodes

2015-09-16 Thread Stefan Bunk
Hi Fabian, the local file problem would, however, not exist if I just copy both halves to all nodes, right? Let's say I have a file `1st` and a file `2nd`, which I copy to all nodes. Now, with your approach from above, I do:

    // helper broadcast datasets to know on which half to operate
    val data1stH

Re: Distribute DataSet to subset of nodes

2015-09-17 Thread Fabian Hueske
Hi Stefan, I think I have a solution for your problem :-)
1) Distribute both parts of the small data to each machine (you have done that).
2) Your mapper should have a parallelism of 10; the tasks with ID 0 to 4 (get the ID via RichFunction.getRuntimeContext().getIndexOfThisSubtask()) read the first h
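
Step 2 could be sketched as follows; the local file names and the simple set lookup are assumptions, only the subtask-index check comes from this mail. How each element of the large DataSet then reaches one subtask of each group is what the partitioning discussion in the following mails is about.

    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.Collector;

    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.util.HashSet;
    import java.util.Set;

    // Step 2 of the proposal: run with parallelism 10 and let each subtask decide
    // from its own index which of the two locally available halves to load.
    public class HalfAwareFlatMap extends RichFlatMapFunction<String, String> {

        private transient Set<String> half;

        @Override
        public void open(Configuration parameters) throws Exception {
            int subtask = getRuntimeContext().getIndexOfThisSubtask();
            // Subtasks 0 to 4 read the first half, subtasks 5 to 9 the second half.
            // Both files were copied to every node in step 1; the paths are hypothetical.
            String path = subtask < 5 ? "/data/small-1st" : "/data/small-2nd";
            half = new HashSet<>(Files.readAllLines(Paths.get(path)));
        }

        @Override
        public void flatMap(String value, Collector<String> out) {
            if (half.contains(value)) {
                out.collect(value);
            }
        }
    }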

Re: Distribute DataSet to subset of nodes

2015-09-21 Thread Stefan Bunk
Hi Fabian, that sounds good, thank you. One final question: as I said earlier, this also distributes data in some unnecessary cases, say, ID 4 sending data to ID 3. Is there no way to find out the ID of the current node? I guess that number is already available on the node and just needs to be expos

Re: Distribute DataSet to subset of nodes

2015-09-21 Thread Fabian Hueske
The custom partitioner does not know its task id, but the mapper that assigns the partition ids knows its subtask id. So if the mapper with subtask id 2 assigns partition ids 2 and 7, only 7 will be sent over the network. On Sep 21, 2015 6:56 PM, "Stefan Bunk" wrote: > Hi Fabian, > > that sounds g
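
A sketch of that division of labour, assuming the mapper pairs its own subtask index i with (i + 5) % 10 (consistent with the 2-and-7 example above) and carries the target id in a Tuple2 field:

    import org.apache.flink.api.common.functions.Partitioner;
    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.Collector;

    public class TwoCopiesRouting {

        // Tags every element with two target subtask ids: the mapper's own subtask
        // index (that copy stays local) and the index of a subtask in the other group.
        public static class AssignTargets extends RichFlatMapFunction<String, Tuple2<Integer, String>> {
            private int ownIndex;

            @Override
            public void open(Configuration parameters) {
                ownIndex = getRuntimeContext().getIndexOfThisSubtask();
            }

            @Override
            public void flatMap(String value, Collector<Tuple2<Integer, String>> out) {
                out.collect(new Tuple2<>(ownIndex, value));            // processed against the local half
                out.collect(new Tuple2<>((ownIndex + 5) % 10, value)); // sent to the other group
            }
        }

        // Routes each record to the subtask named in its first tuple field.
        public static class IdPartitioner implements Partitioner<Integer> {
            @Override
            public int partition(Integer key, int numPartitions) {
                return key % numPartitions;
            }
        }
    }

Wired together, this would be roughly data.flatMap(new AssignTargets()).setParallelism(10).partitionCustom(new IdPartitioner(), 0), followed by a flatMap with parallelism 10 that, like the HalfAwareFlatMap sketched after the 2015-09-17 mail, selects its half of the small data by subtask index (adapted to take the tagged tuples as input).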

Re: Distribute DataSet to subset of nodes

2015-09-21 Thread Stefan Bunk
Of course! On 21 September 2015 at 19:10, Fabian Hueske wrote: > The custom partitioner does not know its task id but the mapper that > assigns the partition ids knows its subtaskid. > > So if the mapper with subtask id 2 assigns partition ids 2 and 7, only 7 > will be send over the network. > O

Re: Distribute DataSet to subset of nodes

2015-10-19 Thread Stefan Bunk
Hi Fabian, I implemented your approach from above. However, the runtime decides to run two subtasks on the same node, resulting in an out-of-memory error. I thought partitioning really does partition the data to nodes, but now it seems like it partitions to tasks, and tasks can be on the same
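
One way to confirm such co-location (not from the thread, just a plain-Java check one could drop into the operator) is to log the host name next to the subtask index in open():

    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.Collector;

    import java.net.InetAddress;

    // Diagnostic sketch: report which physical host each parallel subtask runs on.
    // If two subtask indexes report the same host, they were scheduled onto one node.
    public class WhereAmIFlatMap extends RichFlatMapFunction<String, String> {

        @Override
        public void open(Configuration parameters) throws Exception {
            int subtask = getRuntimeContext().getIndexOfThisSubtask();
            String host = InetAddress.getLocalHost().getHostName();
            System.err.println("subtask " + subtask + " runs on " + host);
        }

        @Override
        public void flatMap(String value, Collector<String> out) {
            out.collect(value); // pass-through; only the open() log matters here
        }
    }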