Hey Krzysztof, thanks for posting your questions. Replies are inline.
On 28 Jun 2014, at 18:59, Krzysztof Pasierbinski <krzysztof.pasierbin...@dfki.de> wrote:

> Hi all,
> I am new to Flink/Stratosphere so I would like to welcome everyone. I was
> trying to configure a simple 2-node cluster (Ubuntu 12.04) and I came across
> following problems (using Stratosphere 0.5):
>
> 1) I execute a "Word count" example and after the execution I get the
> result split into 2 files (each one on the different node). How can I modify
> this program to get the whole result as a one merged file on the master node?

You can add an ungrouped group reduce at the end, which produces a single file:

    input
        .flatMap(...)
        .groupBy(0)
        .sum(1)
        // No groupBy() before this step: a single GroupReduce instance receives all (word, count) pairs
        .reduceGroup(new GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
            @Override
            public void reduce(Iterator<Tuple2<String, Integer>> values,
                               Collector<Tuple2<String, Integer>> out) throws Exception {
                // Forward every record unchanged
                while (values.hasNext()) {
                    out.collect(values.next());
                }
            }
        });

You currently get one file per reducer because the reducers work on separate groups of data and run on different machines. Along the same lines, you could also do the summing directly inside the GroupReduceFunction and save the extra reduce step.

Keep in mind, though, that multiple output files are usually not a problem in practice, since further processing steps can simply take the whole output directory as their input. The single group reduce, on the other hand, does not scale to large data sets, because one reducer on one machine does all the processing.

>
> 2) I have wrote my own program basing on a "Word count" program's
> structure. There aren't any problems with the execution in a local mode for
> any size of the file. For small files it works fine on the cluster too.
> Unfortunately I get following error for bigger files (2000 lines):
>
> 06/28/2014 18:15:20: DataSource(TextInputFormat (file:/home/krzysztof/stratosphere05/generatedFrequencies.txt) - UTF-8) (1/2) switched to FAILED
> java.io.IOException: Error opening the Input Split file:/home/krzysztof/stratosphere05/generatedFrequencies.txt [41847,41847]: /home/krzysztof/stratosphere05/generatedFrequencies.txt (No such file or directory)
>     at eu.stratosphere.api.common.io.FileInputFormat.open(FileInputFormat.java:616)
>     at eu.stratosphere.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:441)
>     at eu.stratosphere.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:44)
>     at eu.stratosphere.pact.runtime.task.DataSourceTask.invoke(DataSourceTask.java:140)
>     at eu.stratosphere.nephele.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:351)
>     at java.lang.Thread.run(Thread.java:701)
> Caused by: java.io.FileNotFoundException: /home/krzysztof/stratosphere05/generatedFrequencies.txt (No such file or directory)
>     at java.io.FileInputStream.open(Native Method)
>     at java.io.FileInputStream.<init>(FileInputStream.java:140)
>     at eu.stratosphere.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:44)
>     at eu.stratosphere.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:135)
>     at eu.stratosphere.api.common.io.FileInputFormat$InputSplitOpenThread.run(FileInputFormat.java:756)

From this error message alone I'm not sure yet whether the problem is with the input split or whether the file really does not exist on that machine. Note that the input path uses the file: scheme, and the trace shows it being opened through LocalFileSystem, so each parallel DataSource instance (here subtask 1/2) reads the file from the local disk of the node it runs on. The file therefore has to exist under exactly the same path on both machines, for example through a shared mount or by putting it into HDFS. So, just to make sure: do both machines have the same DFS mounted?

If you want, you can also send me the program and the input file, so I can check whether the problem is in your setup or not.

Best,
Ufuk
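P.S. To answer the file question above quickly, you could run a small stand-alone check on each node before submitting the job. This is only a sketch in plain Java with no Stratosphere dependency; the class name InputFileCheck is made up for this example. It resolves a file: URI against the local file system of the machine it runs on and reports whether the file exists and is readable there. It does not handle hdfs: or other schemes.

```java
import java.io.File;
import java.net.InetAddress;
import java.net.URI;
import java.net.UnknownHostException;

// Hypothetical helper (not part of Stratosphere): run it on every node to see
// whether a file: input path resolves to a readable file on that machine.
public class InputFileCheck {

    // Resolves an absolute file: URI on this machine's local file system.
    // new File(URI) throws IllegalArgumentException for non-file: or relative URIs.
    public static boolean isReadableLocally(String fileUri) {
        File f = new File(URI.create(fileUri));
        return f.isFile() && f.canRead();
    }

    static String hostName() {
        try {
            return InetAddress.getLocalHost().getHostName();
        } catch (UnknownHostException e) {
            return "unknown-host";
        }
    }

    public static void main(String[] args) {
        if (args.length == 0) {
            System.err.println("usage: java InputFileCheck file:/absolute/path ...");
            System.exit(2);
        }
        boolean allReadable = true;
        for (String uri : args) {
            boolean ok;
            try {
                ok = isReadableLocally(uri);
            } catch (IllegalArgumentException e) {
                System.err.println(uri + ": not an absolute file: URI (" + e.getMessage() + ")");
                ok = false;
            }
            System.out.println(hostName() + " " + (ok ? "OK" : "MISSING") + " " + uri);
            allReadable &= ok;
        }
        // Non-zero exit code if any path is missing, handy in a shell loop over the nodes.
        System.exit(allReadable ? 0 : 1);
    }
}
```

For example, running `java InputFileCheck file:/home/krzysztof/stratosphere05/generatedFrequencies.txt` on both nodes should print OK on each of them if the file is in place everywhere.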