Hey Krzysztof,

thanks for posting your questions. Replies are inline.

On 28 Jun 2014, at 18:59, Krzysztof Pasierbinski 
<krzysztof.pasierbin...@dfki.de> wrote:

> Hi all,
> I am new to Flink/Stratosphere so I would like to welcome everyone. I was 
> trying to configure a simple 2-node cluster (Ubuntu 12.04) and I came across 
> following problems (using Stratosphere 0.5):
> 
> 1)      I execute a "Word count" example and after the execution I get the 
> result split into 2 files (each one on the different node). How can I modify 
> this program to get the whole result as a one merged file on the master node?

You can add an ungrouped group reduce, which would create a single file.

input
    .flatMap(...)
    .groupBy(0)
    .sum(1)
    // no groupBy() before this reduceGroup, so a single reducer sees all records
    .reduceGroup(new GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
        @Override
        public void reduce(Iterator<Tuple2<String, Integer>> values,
                           Collector<Tuple2<String, Integer>> out) throws Exception {
            // forward every record unchanged
            while (values.hasNext()) {
                out.collect(values.next());
            }
        }
    });


Because the reducers work on separate groups of data and run on different 
machines, you get one file per reducer. Along the same lines, you can also use 
a GroupReduceFunction directly for the sum and save the extra reduce. But keep 
in mind that multiple files should not be a problem in practice, since further 
processing steps can just take the output directory as input, and that the 
ungrouped group reduce does not scale to large data sets, since a single 
reducer on a single machine does all the processing.
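
If you only need one file at the very end, another option is to leave the job 
parallel and concatenate the part files afterwards. The following is a minimal 
sketch in plain Java, not part of the Stratosphere API; the class name 
MergeParts and the assumption that all part files sit in one directory that 
is reachable from the machine where you run it are mine.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper, not part of Stratosphere: merges the part files of a job output.
final class MergeParts {

    /**
     * Concatenates all regular files in outputDir (sorted by name) into target
     * and returns the number of part files that were merged.
     */
    static int merge(Path outputDir, Path target) throws IOException {
        List<Path> parts;
        try (Stream<Path> files = Files.list(outputDir)) {
            parts = files
                    .filter(Files::isRegularFile)
                    // skip the target itself in case it lives in the same directory
                    .filter(p -> !p.toAbsolutePath().equals(target.toAbsolutePath()))
                    .sorted()
                    .collect(Collectors.toList());
        }
        try (OutputStream out = Files.newOutputStream(target)) {
            for (Path part : parts) {
                Files.copy(part, out); // append the bytes of this part file
            }
        }
        return parts.size();
    }

    public static void main(String[] args) throws IOException {
        // usage: java MergeParts <output directory> <merged file>
        int merged = merge(Paths.get(args[0]), Paths.get(args[1]));
        System.out.println("Merged " + merged + " part file(s) into " + args[1]);
    }
}
```

This only sees the files that exist on the machine where it runs, so with 
local output paths you would first have to copy the parts from the other node.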

> 
> 2)      I have wrote my own program basing on a "Word count" program's 
> structure. There aren't any problems with the execution in a local mode for 
> any size of the file. For small files it works fine on the cluster too. 
> Unfortunately I get following error for bigger files (2000 lines):
> 
> 06/28/2014 18:15:20:    DataSource(TextInputFormat 
> (file:/home/krzysztof/stratosphere05/generatedFrequencies.txt) - UTF-8) (1/2) 
> switched to FAILED
> java.io.IOException: Error opening the Input Split 
> file:/home/krzysztof/stratosphere05/generatedFrequencies.txt [41847,41847]: 
> /home/krzysztof/stratosphere05/generatedFrequencies.txt (No such file or 
> directory)
>    at 
> eu.stratosphere.api.common.io.FileInputFormat.open(FileInputFormat.java:616)
>    at 
> eu.stratosphere.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:441)
>    at 
> eu.stratosphere.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:44)
>    at 
> eu.stratosphere.pact.runtime.task.DataSourceTask.invoke(DataSourceTask.java:140)
>    at 
> eu.stratosphere.nephele.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:351)
>    at java.lang.Thread.run(Thread.java:701)
> Caused by: java.io.FileNotFoundException: 
> /home/krzysztof/stratosphere05/generatedFrequencies.txt (No such file or 
> directory)
>    at java.io.FileInputStream.open(Native Method)
>    at java.io.FileInputStream.<init>(FileInputStream.java:140)
>    at 
> eu.stratosphere.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:44)
>    at 
> eu.stratosphere.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:135)
>    at 
> eu.stratosphere.api.common.io.FileInputFormat$InputSplitOpenThread.run(FileInputFormat.java:756)

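From this error message alone I can't tell yet whether this is a problem with 
the InputSplit or whether the file really does not exist on that machine. The 
path is a file:/ URI and the trace goes through LocalFileSystem, so each node 
reads it from its own local file system. So just to make sure: do both machines 
have the same DFS mounted, i.e. is the file visible under this path on both 
nodes? If you want, you can also send me the program and the file, so I can 
check whether it is a problem with your setup or not.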
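
A quick way to check this is to run a small test on each node. The following 
is a minimal sketch in plain Java; the class name InputPathCheck is made up 
for illustration and it does not use any Stratosphere classes. It only tests 
whether a local path or file: URI points to a readable regular file on the 
machine where it runs.

```java
import java.net.InetAddress;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper, not part of Stratosphere: run it on every node of the cluster.
final class InputPathCheck {

    /** Accepts either a plain path or a file: URI such as file:/home/user/input.txt. */
    static Path toLocalPath(String location) {
        return location.startsWith("file:")
                ? Paths.get(URI.create(location))
                : Paths.get(location);
    }

    /** Returns true if the location is an existing, readable regular file on this machine. */
    static boolean isReadableFile(String location) {
        Path path = toLocalPath(location);
        return Files.isRegularFile(path) && Files.isReadable(path);
    }

    public static void main(String[] args) throws Exception {
        // usage: java InputPathCheck <path or file: URI> [more paths...]
        String host = InetAddress.getLocalHost().getHostName();
        for (String location : args) {
            System.out.println(host + ": " + location + " readable = " + isReadableFile(location));
        }
    }
}
```

If it reports false on one of the two machines, the file is simply missing 
there and the InputSplit is not the problem.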

Best,

Ufuk
