Hi all, I am new to Flink/Stratosphere, so I would like to say hello to everyone. I was trying to configure a simple 2-node cluster (Ubuntu 12.04) with Stratosphere 0.5 and ran into the following problems:
1) I executed the "Word count" example, and afterwards the result is split into 2 files (one on each node). How can I modify this program so that the whole result ends up as a single merged file on the master node?

2) I have written my own program based on the structure of the "Word count" program. It runs without problems in local mode for any input file size, and for small files it also works fine on the cluster. Unfortunately, for bigger files (2000 lines) I get the following error:

06/28/2014 18:15:20: Job execution switched to status SCHEDULED
06/28/2014 18:15:20: DataSource(TextInputFormat (file:/home/krzysztof/stratosphere05/generatedFrequencies.txt) - UTF-8) (1/2) switched to SCHEDULED
06/28/2014 18:15:20: CHAIN FlatMap (eu.stratosphere.example.java.wordcount.WordCount$Tokenizer) -> Combine (SUM(2),MAX(1)) (1/2) switched to SCHEDULED
06/28/2014 18:15:20: Reduce (SUM(2),MAX(1)) (1/2) switched to SCHEDULED
06/28/2014 18:15:20: DataSink(eu.stratosphere.api.java.io.CsvOutputFormat@1c2ab6c) (1/2) switched to SCHEDULED
06/28/2014 18:15:20: Reduce (SUM(2),MAX(1)) (2/2) switched to SCHEDULED
06/28/2014 18:15:20: DataSink(eu.stratosphere.api.java.io.CsvOutputFormat@1c2ab6c) (2/2) switched to SCHEDULED
06/28/2014 18:15:20: CHAIN FlatMap (eu.stratosphere.example.java.wordcount.WordCount$Tokenizer2) -> Combine (SUM(2),MIN(1)) (1/2) switched to SCHEDULED
06/28/2014 18:15:20: Reduce (SUM(2),MIN(1)) (1/2) switched to SCHEDULED
06/28/2014 18:15:20: DataSink(eu.stratosphere.api.java.io.CsvOutputFormat@1e24af0) (1/2) switched to SCHEDULED
06/28/2014 18:15:20: Reduce (SUM(2),MIN(1)) (2/2) switched to SCHEDULED
06/28/2014 18:15:20: DataSink(eu.stratosphere.api.java.io.CsvOutputFormat@1e24af0) (2/2) switched to SCHEDULED
06/28/2014 18:15:20: DataSource(TextInputFormat (file:/home/krzysztof/stratosphere05/generatedFrequencies.txt) - UTF-8) (2/2) switched to SCHEDULED
06/28/2014 18:15:20: CHAIN FlatMap (eu.stratosphere.example.java.wordcount.WordCount$Tokenizer) -> Combine (SUM(2),MAX(1)) (2/2) switched to SCHEDULED
06/28/2014 18:15:20: CHAIN FlatMap (eu.stratosphere.example.java.wordcount.WordCount$Tokenizer2) -> Combine (SUM(2),MIN(1)) (2/2) switched to SCHEDULED
06/28/2014 18:15:20: Reduce (SUM(2),MAX(1)) (1/2) switched to ASSIGNED
06/28/2014 18:15:20: CHAIN FlatMap (eu.stratosphere.example.java.wordcount.WordCount$Tokenizer2) -> Combine (SUM(2),MIN(1)) (1/2) switched to ASSIGNED
06/28/2014 18:15:20: DataSink(eu.stratosphere.api.java.io.CsvOutputFormat@1e24af0) (1/2) switched to ASSIGNED
06/28/2014 18:15:20: DataSource(TextInputFormat (file:/home/krzysztof/stratosphere05/generatedFrequencies.txt) - UTF-8) (1/2) switched to ASSIGNED
06/28/2014 18:15:20: DataSink(eu.stratosphere.api.java.io.CsvOutputFormat@1c2ab6c) (1/2) switched to ASSIGNED
06/28/2014 18:15:20: Reduce (SUM(2),MIN(1)) (1/2) switched to ASSIGNED
06/28/2014 18:15:20: CHAIN FlatMap (eu.stratosphere.example.java.wordcount.WordCount$Tokenizer) -> Combine (SUM(2),MAX(1)) (1/2) switched to ASSIGNED
06/28/2014 18:15:20: CHAIN FlatMap (eu.stratosphere.example.java.wordcount.WordCount$Tokenizer) -> Combine (SUM(2),MAX(1)) (2/2) switched to ASSIGNED
06/28/2014 18:15:20: DataSink(eu.stratosphere.api.java.io.CsvOutputFormat@1e24af0) (2/2) switched to ASSIGNED
06/28/2014 18:15:20: CHAIN FlatMap (eu.stratosphere.example.java.wordcount.WordCount$Tokenizer2) -> Combine (SUM(2),MIN(1)) (2/2) switched to ASSIGNED
06/28/2014 18:15:20: Reduce (SUM(2),MAX(1)) (2/2) switched to ASSIGNED
06/28/2014 18:15:20: DataSource(TextInputFormat (file:/home/krzysztof/stratosphere05/generatedFrequencies.txt) - UTF-8) (2/2) switched to ASSIGNED
06/28/2014 18:15:20: Reduce (SUM(2),MIN(1)) (2/2) switched to ASSIGNED
06/28/2014 18:15:20: DataSink(eu.stratosphere.api.java.io.CsvOutputFormat@1c2ab6c) (2/2) switched to ASSIGNED
06/28/2014 18:15:20: DataSource(TextInputFormat (file:/home/krzysztof/stratosphere05/generatedFrequencies.txt) - UTF-8) (1/2) switched to READY
06/28/2014 18:15:20: DataSource(TextInputFormat (file:/home/krzysztof/stratosphere05/generatedFrequencies.txt) - UTF-8) (2/2) switched to READY
06/28/2014 18:15:20: DataSource(TextInputFormat (file:/home/krzysztof/stratosphere05/generatedFrequencies.txt) - UTF-8) (1/2) switched to STARTING
06/28/2014 18:15:20: DataSource(TextInputFormat (file:/home/krzysztof/stratosphere05/generatedFrequencies.txt) - UTF-8) (2/2) switched to STARTING
06/28/2014 18:15:20: Job execution switched to status RUNNING
06/28/2014 18:15:20: DataSource(TextInputFormat (file:/home/krzysztof/stratosphere05/generatedFrequencies.txt) - UTF-8) (2/2) switched to RUNNING
06/28/2014 18:15:20: CHAIN FlatMap (eu.stratosphere.example.java.wordcount.WordCount$Tokenizer) -> Combine (SUM(2),MAX(1)) (2/2) switched to READY
06/28/2014 18:15:20: CHAIN FlatMap (eu.stratosphere.example.java.wordcount.WordCount$Tokenizer) -> Combine (SUM(2),MAX(1)) (2/2) switched to STARTING
06/28/2014 18:15:20: DataSource(TextInputFormat (file:/home/krzysztof/stratosphere05/generatedFrequencies.txt) - UTF-8) (1/2) switched to RUNNING
06/28/2014 18:15:20: CHAIN FlatMap (eu.stratosphere.example.java.wordcount.WordCount$Tokenizer) -> Combine (SUM(2),MAX(1)) (2/2) switched to RUNNING
06/28/2014 18:15:20: CHAIN FlatMap (eu.stratosphere.example.java.wordcount.WordCount$Tokenizer) -> Combine (SUM(2),MAX(1)) (1/2) switched to CANCELING
06/28/2014 18:15:20: CHAIN FlatMap (eu.stratosphere.example.java.wordcount.WordCount$Tokenizer) -> Combine (SUM(2),MAX(1)) (1/2) switched to CANCELED
06/28/2014 18:15:20: Reduce (SUM(2),MAX(1)) (1/2) switched to CANCELING
06/28/2014 18:15:20: Reduce (SUM(2),MAX(1)) (1/2) switched to CANCELED
06/28/2014 18:15:20: DataSink(eu.stratosphere.api.java.io.CsvOutputFormat@1c2ab6c) (1/2) switched to CANCELING
06/28/2014 18:15:20: DataSink(eu.stratosphere.api.java.io.CsvOutputFormat@1c2ab6c) (1/2) switched to CANCELED
06/28/2014 18:15:20: Reduce (SUM(2),MAX(1)) (2/2) switched to CANCELING
06/28/2014 18:15:20: Reduce (SUM(2),MAX(1)) (2/2) switched to CANCELED
06/28/2014 18:15:20: DataSink(eu.stratosphere.api.java.io.CsvOutputFormat@1c2ab6c) (2/2) switched to CANCELING
06/28/2014 18:15:20: DataSink(eu.stratosphere.api.java.io.CsvOutputFormat@1c2ab6c) (2/2) switched to CANCELED
06/28/2014 18:15:20: CHAIN FlatMap (eu.stratosphere.example.java.wordcount.WordCount$Tokenizer2) -> Combine (SUM(2),MIN(1)) (1/2) switched to CANCELING
06/28/2014 18:15:20: CHAIN FlatMap (eu.stratosphere.example.java.wordcount.WordCount$Tokenizer2) -> Combine (SUM(2),MIN(1)) (1/2) switched to CANCELED
06/28/2014 18:15:20: Reduce (SUM(2),MIN(1)) (1/2) switched to CANCELING
06/28/2014 18:15:20: Reduce (SUM(2),MIN(1)) (1/2) switched to CANCELED
06/28/2014 18:15:20: DataSink(eu.stratosphere.api.java.io.CsvOutputFormat@1e24af0) (1/2) switched to CANCELING
06/28/2014 18:15:20: DataSink(eu.stratosphere.api.java.io.CsvOutputFormat@1e24af0) (1/2) switched to CANCELED
06/28/2014 18:15:20: Reduce (SUM(2),MIN(1)) (2/2) switched to CANCELING
06/28/2014 18:15:20: Reduce (SUM(2),MIN(1)) (2/2) switched to CANCELED
06/28/2014 18:15:20: DataSink(eu.stratosphere.api.java.io.CsvOutputFormat@1e24af0) (2/2) switched to CANCELING
06/28/2014 18:15:20: DataSink(eu.stratosphere.api.java.io.CsvOutputFormat@1e24af0) (2/2) switched to CANCELED
06/28/2014 18:15:20: DataSource(TextInputFormat (file:/home/krzysztof/stratosphere05/generatedFrequencies.txt) - UTF-8) (2/2) switched to CANCELING
06/28/2014 18:15:20: CHAIN FlatMap (eu.stratosphere.example.java.wordcount.WordCount$Tokenizer) -> Combine (SUM(2),MAX(1)) (2/2) switched to CANCELING
06/28/2014 18:15:20: CHAIN FlatMap (eu.stratosphere.example.java.wordcount.WordCount$Tokenizer2) -> Combine (SUM(2),MIN(1)) (2/2) switched to CANCELING
06/28/2014 18:15:20: CHAIN FlatMap (eu.stratosphere.example.java.wordcount.WordCount$Tokenizer2) -> Combine (SUM(2),MIN(1)) (2/2) switched to CANCELED
06/28/2014 18:15:20: DataSource(TextInputFormat (file:/home/krzysztof/stratosphere05/generatedFrequencies.txt) - UTF-8) (1/2) switched to FAILED
java.io.IOException: Error opening the Input Split file:/home/krzysztof/stratosphere05/generatedFrequencies.txt [41847,41847]: /home/krzysztof/stratosphere05/generatedFrequencies.txt (No such file or directory)
	at eu.stratosphere.api.common.io.FileInputFormat.open(FileInputFormat.java:616)
	at eu.stratosphere.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:441)
	at eu.stratosphere.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:44)
	at eu.stratosphere.pact.runtime.task.DataSourceTask.invoke(DataSourceTask.java:140)
	at eu.stratosphere.nephele.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:351)
	at java.lang.Thread.run(Thread.java:701)
Caused by: java.io.FileNotFoundException: /home/krzysztof/stratosphere05/generatedFrequencies.txt (No such file or directory)
	at java.io.FileInputStream.open(Native Method)
	at java.io.FileInputStream.<init>(FileInputStream.java:140)
	at eu.stratosphere.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:44)
	at eu.stratosphere.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:135)
	at eu.stratosphere.api.common.io.FileInputFormat$InputSplitOpenThread.run(FileInputFormat.java:756)
06/28/2014 18:15:20: DataSource(TextInputFormat (file:/home/krzysztof/stratosphere05/generatedFrequencies.txt) - UTF-8) (2/2) switched to CANCELED
06/28/2014 18:15:20: Job execution switched to status FAILED
Error: The program execution failed: java.io.IOException: Error opening the Input Split file:/home/krzysztof/stratosphere05/generatedFrequencies.txt [41847,41847]: /home/krzysztof/stratosphere05/generatedFrequencies.txt (No such file or directory)
	at eu.stratosphere.api.common.io.FileInputFormat.open(FileInputFormat.java:616)
	at eu.stratosphere.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:441)
	at eu.stratosphere.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:44)
	at eu.stratosphere.pact.runtime.task.DataSourceTask.invoke(DataSourceTask.java:140)
	at eu.stratosphere.nephele.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:351)
	at java.lang.Thread.run(Thread.java:701)
Caused by: java.io.FileNotFoundException: /home/krzysztof/stratosphere05/generatedFrequencies.txt (No such file or directory)
	at java.io.FileInputStream.open(Native Method)
	at java.io.FileInputStream.<init>(FileInputStream.java:140)
	at eu.stratosphere.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:44)
	at eu.stratosphere.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:135)
	at eu.stratosphere.api.common.io.FileInputFormat$InputSplitOpenThread.run(FileInputFormat.java:756)
eu.stratosphere.client.program.ProgramInvocationException: The program execution failed: java.io.IOException: Error opening the Input Split file:/home/krzysztof/stratosphere05/generatedFrequencies.txt [41847,41847]: /home/krzysztof/stratosphere05/generatedFrequencies.txt (No such file or directory)
	at eu.stratosphere.api.common.io.FileInputFormat.open(FileInputFormat.java:616)
	at eu.stratosphere.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:441)
	at eu.stratosphere.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:44)
	at eu.stratosphere.pact.runtime.task.DataSourceTask.invoke(DataSourceTask.java:140)
	at eu.stratosphere.nephele.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:351)
	at java.lang.Thread.run(Thread.java:701)
Caused by: java.io.FileNotFoundException: /home/krzysztof/stratosphere05/generatedFrequencies.txt (No such file or directory)
	at java.io.FileInputStream.open(Native Method)
	at java.io.FileInputStream.<init>(FileInputStream.java:140)
	at eu.stratosphere.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:44)
	at eu.stratosphere.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:135)
	at eu.stratosphere.api.common.io.FileInputFormat$InputSplitOpenThread.run(FileInputFormat.java:756)
	at eu.stratosphere.client.program.Client.run(Client.java:297)
	at eu.stratosphere.client.program.Client.run(Client.java:263)
	at eu.stratosphere.client.program.Client.run(Client.java:257)
	at eu.stratosphere.client.program.ContextEnvironment.execute(ContextEnvironment.java:50)
	at eu.stratosphere.example.java.wordcount.WordCount.main(WordCount.java:109)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at eu.stratosphere.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:383)
	at eu.stratosphere.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:301)
	at eu.stratosphere.client.program.Client.run(Client.java:215)
	at eu.stratosphere.client.CliFrontend.executeProgram(CliFrontend.java:327)
	at eu.stratosphere.client.CliFrontend.run(CliFrontend.java:314)
	at eu.stratosphere.client.CliFrontend.parseParameters(CliFrontend.java:927)
	at eu.stratosphere.client.CliFrontend.main(CliFrontend.java:951)

Cluster configuration:
Slaves: 192.168.11.216, 192.168.11.202
Job Manager (jobmanager.rpc.address): 192.168.11.202

I would be very grateful for any help.

Best regards,
Krzysztof Pasierbiński