Hey Krzysztof! Everything looks standard there.
Let me ask a few questions to make sure I understand the setup correctly:

- You are running a two-node setup. Does one node run both the master and a worker, while the other runs only a worker? Or do you have a dedicated master node?
- Are the small-data and large-data runs exactly the same, apart from the size of the input file?

Most importantly:

- Is the input file available on the workers, and is it available under the exact path "/home/krzysztof/stratosphere05/generatedFrequencies2.txt"?

My guess right now is still that the workers do not see the file properly.

Greetings,
Stephan

On Sun, Jun 29, 2014 at 4:50 PM, Krzysztof Pasierbinski <krzysztof.pasierbin...@dfki.de> wrote:

> Hi Stephan,
> I have a two-node configuration. The first node is a master and a worker;
> the second node is a worker. The file path, the Flink (Stratosphere) version,
> and the operating system are the same on both nodes.
> My test program is in the attachment (a simple modification of the "Word count"
> example).
> The execution plan looks like this:
> {
>   "nodes": [
>
>   {
>     "id": 4,
>     "type": "source",
>     "pact": "Data Source",
>     "contents": "TextInputFormat (file:/home/krzysztof/stratosphere05/generatedFrequencies2.txt) - UTF-8",
>     "parallelism": "2",
>     "subtasks_per_instance": "1",
>     "global_properties": [
>       { "name": "Partitioning", "value": "RANDOM" },
>       { "name": "Partitioning Order", "value": "(none)" },
>       { "name": "Uniqueness", "value": "not unique" }
>     ],
>     "local_properties": [
>       { "name": "Order", "value": "(none)" },
>       { "name": "Grouping", "value": "not grouped" },
>       { "name": "Uniqueness", "value": "not unique" }
>     ],
>     "estimates": [
>       { "name": "Est. Output Size", "value": "3..6 GB" },
>       { "name": "Est. Cardinality", "value": "98.43 M" }
>     ],
>     "costs": [
>       { "name": "Network", "value": "0.0 B" },
>       { "name": "Disk I/O", "value": "3..6 GB" },
>       { "name": "CPU", "value": "0.0 " },
>       { "name": "Cumulative Network", "value": "0.0 B" },
>       { "name": "Cumulative Disk I/O", "value": "3..6 GB" },
>       { "name": "Cumulative CPU", "value": "0.0 " }
>     ],
>     "compiler_hints": [
>       { "name": "Output Size (bytes)", "value": "(none)" },
>       { "name": "Output Cardinality", "value": "(none)" },
>       { "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
>       { "name": "Filter Factor", "value": "(none)" }
>     ]
>   },
>   {
>     "id": 3,
>     "type": "pact",
>     "pact": "FlatMap",
>     "contents": "eu.stratosphere.example.java.wordcount.WordCount$Tokenizer",
>     "parallelism": "2",
>     "subtasks_per_instance": "1",
>     "predecessors": [
>       {"id": 4, "ship_strategy": "Forward"}
>     ],
>     "driver_strategy": "Map",
>     "global_properties": [
>       { "name": "Partitioning", "value": "RANDOM" },
>       { "name": "Partitioning Order", "value": "(none)" },
>       { "name": "Uniqueness", "value": "not unique" }
>     ],
>     "local_properties": [
>       { "name": "Order", "value": "(none)" },
>       { "name": "Grouping", "value": "not grouped" },
>       { "name": "Uniqueness", "value": "not unique" }
>     ],
>     "estimates": [
>       { "name": "Est. Output Size", "value": "(unknown)" },
>       { "name": "Est. Cardinality", "value": "492.19 M" }
>     ],
>     "costs": [
>       { "name": "Network", "value": "0.0 B" },
>       { "name": "Disk I/O", "value": "0.0 B" },
>       { "name": "CPU", "value": "0.0 " },
>       { "name": "Cumulative Network", "value": "0.0 B" },
>       { "name": "Cumulative Disk I/O", "value": "1..82 GB" },
>       { "name": "Cumulative CPU", "value": "0.0 " }
>     ],
>     "compiler_hints": [
>       { "name": "Output Size (bytes)", "value": "(none)" },
>       { "name": "Output Cardinality", "value": "(none)" },
>       { "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
>       { "name": "Filter Factor", "value": "(none)" }
>     ]
>   },
>   {
>     "id": 2,
>     "type": "pact",
>     "pact": "GroupReduce",
>     "contents": "SUM(2),MAX(1)",
>     "parallelism": "2",
>     "subtasks_per_instance": "1",
>     "predecessors": [
>       {"id": 3, "ship_strategy": "Forward"}
>     ],
>     "driver_strategy": "Sorted Combine",
>     "global_properties": [
>       { "name": "Partitioning", "value": "RANDOM" },
>       { "name": "Partitioning Order", "value": "(none)" },
>       { "name": "Uniqueness", "value": "not unique" }
>     ],
>     "local_properties": [
>       { "name": "Order", "value": "(none)" },
>       { "name": "Grouping", "value": "not grouped" },
>       { "name": "Uniqueness", "value": "not unique" }
>     ],
>     "estimates": [
>       { "name": "Est. Output Size", "value": "(unknown)" },
>       { "name": "Est. Cardinality", "value": "492.19 M" }
>     ],
>     "costs": [
>       { "name": "Network", "value": "0.0 B" },
>       { "name": "Disk I/O", "value": "0.0 B" },
>       { "name": "CPU", "value": "0.0 " },
>       { "name": "Cumulative Network", "value": "0.0 B" },
>       { "name": "Cumulative Disk I/O", "value": "1..82 GB" },
>       { "name": "Cumulative CPU", "value": "0.0 " }
>     ],
>     "compiler_hints": [
>       { "name": "Output Size (bytes)", "value": "(none)" },
>       { "name": "Output Cardinality", "value": "(none)" },
>       { "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
>       { "name": "Filter Factor", "value": "(none)" }
>     ]
>   },
>   {
>     "id": 1,
>     "type": "pact",
>     "pact": "GroupReduce",
>     "contents": "SUM(2),MAX(1)",
>     "parallelism": "2",
>     "subtasks_per_instance": "1",
>     "predecessors": [
>       {"id": 2, "ship_strategy": "Hash Partition on [0]",
>        "local_strategy": "Sort (combining) on [0:ASC]"}
>     ],
>     "driver_strategy": "Sorted Group Reduce",
>     "global_properties": [
>       { "name": "Partitioning", "value": "HASH_PARTITIONED" },
>       { "name": "Partitioned on", "value": "[0]" },
>       { "name": "Partitioning Order", "value": "(none)" },
>       { "name": "Uniqueness", "value": "not unique" }
>     ],
>     "local_properties": [
>       { "name": "Order", "value": "[0:ASC]" },
>       { "name": "Grouped on", "value": "[0]" },
>       { "name": "Uniqueness", "value": "not unique" }
>     ],
>     "estimates": [
>       { "name": "Est. Output Size", "value": "(unknown)" },
>       { "name": "Est. Cardinality", "value": "(unknown)" }
>     ],
>     "costs": [
>       { "name": "Network", "value": "(unknown)" },
>       { "name": "Disk I/O", "value": "(unknown)" },
>       { "name": "CPU", "value": "(unknown)" },
>       { "name": "Cumulative Network", "value": "(unknown)" },
>       { "name": "Cumulative Disk I/O", "value": "(unknown)" },
>       { "name": "Cumulative CPU", "value": "(unknown)" }
>     ],
>     "compiler_hints": [
>       { "name": "Output Size (bytes)", "value": "(none)" },
>       { "name": "Output Cardinality", "value": "(none)" },
>       { "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
>       { "name": "Filter Factor", "value": "(none)" }
>     ]
>   },
>   {
>     "id": 0,
>     "type": "sink",
>     "pact": "Data Sink",
>     "contents": "eu.stratosphere.api.java.io.CsvOutputFormat@5bf734",
>     "parallelism": "2",
>     "subtasks_per_instance": "1",
>     "predecessors": [
>       {"id": 1, "ship_strategy": "Forward"}
>     ],
>     "global_properties": [
>       { "name": "Partitioning", "value": "HASH_PARTITIONED" },
>       { "name": "Partitioned on", "value": "[0]" },
>       { "name": "Partitioning Order", "value": "(none)" },
>       { "name": "Uniqueness", "value": "not unique" }
>     ],
>     "local_properties": [
>       { "name": "Order", "value": "[0:ASC]" },
>       { "name": "Grouped on", "value": "[0]" },
>       { "name": "Uniqueness", "value": "not unique" }
>     ],
>     "estimates": [
>       { "name": "Est. Output Size", "value": "(unknown)" },
>       { "name": "Est. Cardinality", "value": "(unknown)" }
>     ],
>     "costs": [
>       { "name": "Network", "value": "0.0 B" },
>       { "name": "Disk I/O", "value": "0.0 B" },
>       { "name": "CPU", "value": "0.0 " },
>       { "name": "Cumulative Network", "value": "(unknown)" },
>       { "name": "Cumulative Disk I/O", "value": "(unknown)" },
>       { "name": "Cumulative CPU", "value": "(unknown)" }
>     ],
>     "compiler_hints": [
>       { "name": "Output Size (bytes)", "value": "(none)" },
>       { "name": "Output Cardinality", "value": "(none)" },
>       { "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
>       { "name": "Filter Factor", "value": "(none)" }
>     ]
>   },
>   {
>     "id": 8,
>     "type": "pact",
>     "pact": "FlatMap",
>     "contents": "eu.stratosphere.example.java.wordcount.WordCount$Tokenizer2",
>     "parallelism": "2",
>     "subtasks_per_instance": "1",
>     "predecessors": [
>       {"id": 4, "ship_strategy": "Forward"}
>     ],
>     "driver_strategy": "Map",
>     "global_properties": [
>       { "name": "Partitioning", "value": "RANDOM" },
>       { "name": "Partitioning Order", "value": "(none)" },
>       { "name": "Uniqueness", "value": "not unique" }
>     ],
>     "local_properties": [
>       { "name": "Order", "value": "(none)" },
>       { "name": "Grouping", "value": "not grouped" },
>       { "name": "Uniqueness", "value": "not unique" }
>     ],
>     "estimates": [
>       { "name": "Est. Output Size", "value": "(unknown)" },
>       { "name": "Est. Cardinality", "value": "492.19 M" }
>     ],
>     "costs": [
>       { "name": "Network", "value": "0.0 B" },
>       { "name": "Disk I/O", "value": "0.0 B" },
>       { "name": "CPU", "value": "0.0 " },
>       { "name": "Cumulative Network", "value": "0.0 B" },
>       { "name": "Cumulative Disk I/O", "value": "1..82 GB" },
>       { "name": "Cumulative CPU", "value": "0.0 " }
>     ],
>     "compiler_hints": [
>       { "name": "Output Size (bytes)", "value": "(none)" },
>       { "name": "Output Cardinality", "value": "(none)" },
>       { "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
>       { "name": "Filter Factor", "value": "(none)" }
>     ]
>   },
>   {
>     "id": 7,
>     "type": "pact",
>     "pact": "GroupReduce",
>     "contents": "SUM(2),MIN(1)",
>     "parallelism": "2",
>     "subtasks_per_instance": "1",
>     "predecessors": [
>       {"id": 8, "ship_strategy": "Forward"}
>     ],
>     "driver_strategy": "Sorted Combine",
>     "global_properties": [
>       { "name": "Partitioning", "value": "RANDOM" },
>       { "name": "Partitioning Order", "value": "(none)" },
>       { "name": "Uniqueness", "value": "not unique" }
>     ],
>     "local_properties": [
>       { "name": "Order", "value": "(none)" },
>       { "name": "Grouping", "value": "not grouped" },
>       { "name": "Uniqueness", "value": "not unique" }
>     ],
>     "estimates": [
>       { "name": "Est. Output Size", "value": "(unknown)" },
>       { "name": "Est. Cardinality", "value": "492.19 M" }
>     ],
>     "costs": [
>       { "name": "Network", "value": "0.0 B" },
>       { "name": "Disk I/O", "value": "0.0 B" },
>       { "name": "CPU", "value": "0.0 " },
>       { "name": "Cumulative Network", "value": "0.0 B" },
>       { "name": "Cumulative Disk I/O", "value": "1..82 GB" },
>       { "name": "Cumulative CPU", "value": "0.0 " }
>     ],
>     "compiler_hints": [
>       { "name": "Output Size (bytes)", "value": "(none)" },
>       { "name": "Output Cardinality", "value": "(none)" },
>       { "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
>       { "name": "Filter Factor", "value": "(none)" }
>     ]
>   },
>   {
>     "id": 6,
>     "type": "pact",
>     "pact": "GroupReduce",
>     "contents": "SUM(2),MIN(1)",
>     "parallelism": "2",
>     "subtasks_per_instance": "1",
>     "predecessors": [
>       {"id": 7, "ship_strategy": "Hash Partition on [0]",
>        "local_strategy": "Sort (combining) on [0:ASC]"}
>     ],
>     "driver_strategy": "Sorted Group Reduce",
>     "global_properties": [
>       { "name": "Partitioning", "value": "HASH_PARTITIONED" },
>       { "name": "Partitioned on", "value": "[0]" },
>       { "name": "Partitioning Order", "value": "(none)" },
>       { "name": "Uniqueness", "value": "not unique" }
>     ],
>     "local_properties": [
>       { "name": "Order", "value": "[0:ASC]" },
>       { "name": "Grouped on", "value": "[0]" },
>       { "name": "Uniqueness", "value": "not unique" }
>     ],
>     "estimates": [
>       { "name": "Est. Output Size", "value": "(unknown)" },
>       { "name": "Est. Cardinality", "value": "(unknown)" }
>     ],
>     "costs": [
>       { "name": "Network", "value": "(unknown)" },
>       { "name": "Disk I/O", "value": "(unknown)" },
>       { "name": "CPU", "value": "(unknown)" },
>       { "name": "Cumulative Network", "value": "(unknown)" },
>       { "name": "Cumulative Disk I/O", "value": "(unknown)" },
>       { "name": "Cumulative CPU", "value": "(unknown)" }
>     ],
>     "compiler_hints": [
>       { "name": "Output Size (bytes)", "value": "(none)" },
>       { "name": "Output Cardinality", "value": "(none)" },
>       { "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
>       { "name": "Filter Factor", "value": "(none)" }
>     ]
>   },
>   {
>     "id": 5,
>     "type": "sink",
>     "pact": "Data Sink",
>     "contents": "eu.stratosphere.api.java.io.CsvOutputFormat@1ced484",
>     "parallelism": "2",
>     "subtasks_per_instance": "1",
>     "predecessors": [
>       {"id": 6, "ship_strategy": "Forward"}
>     ],
>     "global_properties": [
>       { "name": "Partitioning", "value": "HASH_PARTITIONED" },
>       { "name": "Partitioned on", "value": "[0]" },
>       { "name": "Partitioning Order", "value": "(none)" },
>       { "name": "Uniqueness", "value": "not unique" }
>     ],
>     "local_properties": [
>       { "name": "Order", "value": "[0:ASC]" },
>       { "name": "Grouped on", "value": "[0]" },
>       { "name": "Uniqueness", "value": "not unique" }
>     ],
>     "estimates": [
>       { "name": "Est. Output Size", "value": "(unknown)" },
>       { "name": "Est. Cardinality", "value": "(unknown)" }
>     ],
>     "costs": [
>       { "name": "Network", "value": "0.0 B" },
>       { "name": "Disk I/O", "value": "0.0 B" },
>       { "name": "CPU", "value": "0.0 " },
>       { "name": "Cumulative Network", "value": "(unknown)" },
>       { "name": "Cumulative Disk I/O", "value": "(unknown)" },
>       { "name": "Cumulative CPU", "value": "(unknown)" }
>     ],
>     "compiler_hints": [
>       { "name": "Output Size (bytes)", "value": "(none)" },
>       { "name": "Output Cardinality", "value": "(none)" },
>       { "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
>       { "name": "Filter Factor", "value": "(none)" }
>     ]
>   }
>   ]
> }
>
> -----Original Message-----
> From: ewenstep...@gmail.com [mailto:ewenstep...@gmail.com] On Behalf Of Stephan Ewen
> Sent: Sunday, June 29, 2014 16:33
> To: dev@flink.incubator.apache.org
> Subject: Re: Cluster execution of an example program ("Word count") and a
> problem related to the modified example
>
> Hi Krzysztof!
>
> Indeed, the input size should not matter. Can you tell us the details of
> the setup that worked?
>
> The built-in examples work without a distributed file system because they
> do not depend on files. The example programs set a Java Collection as the
> input, which gets distributed as part of the program.
>
> Stephan
>
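P.S. The most important question above can be checked directly on each node. Below is a minimal Java sketch using only `java.nio.file`. It assumes you run it on every worker as the same OS user that runs the TaskManager. The class name `InputFileCheck` and its `check` method are hypothetical, not part of Flink/Stratosphere.

The plan lists the source as `file:/home/krzysztof/...`. That presumably means each worker opens that path on its own local disk, so the check uses the path without the `file:` prefix.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

/**
 * Hypothetical helper, not part of Flink/Stratosphere: run it on every
 * worker node to see whether the job's input path is usable locally.
 */
public class InputFileCheck {

    /** Returns a one-line status report for the given local path. */
    static String check(String pathString) {
        Path path = Paths.get(pathString);
        if (!Files.exists(path)) {
            return "MISSING: " + path;
        }
        if (!Files.isRegularFile(path)) {
            return "NOT A REGULAR FILE: " + path;
        }
        // Readability is checked for the user running this program,
        // so run it as the same user that runs the TaskManager.
        if (!Files.isReadable(path)) {
            return "NOT READABLE: " + path;
        }
        try {
            // Print the size so the copies on different nodes can be compared.
            return "OK: " + path + " (" + Files.size(path) + " bytes)";
        } catch (IOException e) {
            return "ERROR: " + path + " (" + e.getMessage() + ")";
        }
    }

    public static void main(String[] args) {
        // Defaults to the input path from the execution plan; pass another path as the first argument.
        String target = args.length > 0
                ? args[0]
                : "/home/krzysztof/stratosphere05/generatedFrequencies2.txt";
        System.out.println(check(target));
    }
}
```

Comparing the reported byte counts across the two nodes would also catch a copy that exists on both machines but differs in length, for example after an incomplete transfer.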