Hi Stephan, I have a two-node configuration. The first node is both a master and a worker; the second node is a worker only. The file path, the Flink (Stratosphere) version, and the operating system are the same on both nodes. My test program is attached (a simple modification of the "Word count" example). The execution plan looks like this: { "nodes": [
{ "id": 4, "type": "source", "pact": "Data Source", "contents": "TextInputFormat (file:/home/krzysztof/stratosphere05/generatedFrequencies2.txt) - UTF-8", "parallelism": "2", "subtasks_per_instance": "1", "global_properties": [ { "name": "Partitioning", "value": "RANDOM" }, { "name": "Partitioning Order", "value": "(none)" }, { "name": "Uniqueness", "value": "not unique" } ], "local_properties": [ { "name": "Order", "value": "(none)" }, { "name": "Grouping", "value": "not grouped" }, { "name": "Uniqueness", "value": "not unique" } ], "estimates": [ { "name": "Est. Output Size", "value": "3..6 GB" }, { "name": "Est. Cardinality", "value": "98.43 M" } ], "costs": [ { "name": "Network", "value": "0.0 B" }, { "name": "Disk I/O", "value": "3..6 GB" }, { "name": "CPU", "value": "0.0 " }, { "name": "Cumulative Network", "value": "0.0 B" }, { "name": "Cumulative Disk I/O", "value": "3..6 GB" }, { "name": "Cumulative CPU", "value": "0.0 " } ], "compiler_hints": [ { "name": "Output Size (bytes)", "value": "(none)" }, { "name": "Output Cardinality", "value": "(none)" }, { "name": "Avg. Output Record Size (bytes)", "value": "(none)" }, { "name": "Filter Factor", "value": "(none)" } ] }, { "id": 3, "type": "pact", "pact": "FlatMap", "contents": "eu.stratosphere.example.java.wordcount.WordCount$Tokenizer", "parallelism": "2", "subtasks_per_instance": "1", "predecessors": [ {"id": 4, "ship_strategy": "Forward"} ], "driver_strategy": "Map", "global_properties": [ { "name": "Partitioning", "value": "RANDOM" }, { "name": "Partitioning Order", "value": "(none)" }, { "name": "Uniqueness", "value": "not unique" } ], "local_properties": [ { "name": "Order", "value": "(none)" }, { "name": "Grouping", "value": "not grouped" }, { "name": "Uniqueness", "value": "not unique" } ], "estimates": [ { "name": "Est. Output Size", "value": "(unknown)" }, { "name": "Est. 
Cardinality", "value": "492.19 M" } ], "costs": [ { "name": "Network", "value": "0.0 B" }, { "name": "Disk I/O", "value": "0.0 B" }, { "name": "CPU", "value": "0.0 " }, { "name": "Cumulative Network", "value": "0.0 B" }, { "name": "Cumulative Disk I/O", "value": "1..82 GB" }, { "name": "Cumulative CPU", "value": "0.0 " } ], "compiler_hints": [ { "name": "Output Size (bytes)", "value": "(none)" }, { "name": "Output Cardinality", "value": "(none)" }, { "name": "Avg. Output Record Size (bytes)", "value": "(none)" }, { "name": "Filter Factor", "value": "(none)" } ] }, { "id": 2, "type": "pact", "pact": "GroupReduce", "contents": "SUM(2),MAX(1)", "parallelism": "2", "subtasks_per_instance": "1", "predecessors": [ {"id": 3, "ship_strategy": "Forward"} ], "driver_strategy": "Sorted Combine", "global_properties": [ { "name": "Partitioning", "value": "RANDOM" }, { "name": "Partitioning Order", "value": "(none)" }, { "name": "Uniqueness", "value": "not unique" } ], "local_properties": [ { "name": "Order", "value": "(none)" }, { "name": "Grouping", "value": "not grouped" }, { "name": "Uniqueness", "value": "not unique" } ], "estimates": [ { "name": "Est. Output Size", "value": "(unknown)" }, { "name": "Est. Cardinality", "value": "492.19 M" } ], "costs": [ { "name": "Network", "value": "0.0 B" }, { "name": "Disk I/O", "value": "0.0 B" }, { "name": "CPU", "value": "0.0 " }, { "name": "Cumulative Network", "value": "0.0 B" }, { "name": "Cumulative Disk I/O", "value": "1..82 GB" }, { "name": "Cumulative CPU", "value": "0.0 " } ], "compiler_hints": [ { "name": "Output Size (bytes)", "value": "(none)" }, { "name": "Output Cardinality", "value": "(none)" }, { "name": "Avg. 
Output Record Size (bytes)", "value": "(none)" }, { "name": "Filter Factor", "value": "(none)" } ] }, { "id": 1, "type": "pact", "pact": "GroupReduce", "contents": "SUM(2),MAX(1)", "parallelism": "2", "subtasks_per_instance": "1", "predecessors": [ {"id": 2, "ship_strategy": "Hash Partition on [0]", "local_strategy": "Sort (combining) on [0:ASC]"} ], "driver_strategy": "Sorted Group Reduce", "global_properties": [ { "name": "Partitioning", "value": "HASH_PARTITIONED" }, { "name": "Partitioned on", "value": "[0]" }, { "name": "Partitioning Order", "value": "(none)" }, { "name": "Uniqueness", "value": "not unique" } ], "local_properties": [ { "name": "Order", "value": "[0:ASC]" }, { "name": "Grouped on", "value": "[0]" }, { "name": "Uniqueness", "value": "not unique" } ], "estimates": [ { "name": "Est. Output Size", "value": "(unknown)" }, { "name": "Est. Cardinality", "value": "(unknown)" } ], "costs": [ { "name": "Network", "value": "(unknown)" }, { "name": "Disk I/O", "value": "(unknown)" }, { "name": "CPU", "value": "(unknown)" }, { "name": "Cumulative Network", "value": "(unknown)" }, { "name": "Cumulative Disk I/O", "value": "(unknown)" }, { "name": "Cumulative CPU", "value": "(unknown)" } ], "compiler_hints": [ { "name": "Output Size (bytes)", "value": "(none)" }, { "name": "Output Cardinality", "value": "(none)" }, { "name": "Avg. 
Output Record Size (bytes)", "value": "(none)" }, { "name": "Filter Factor", "value": "(none)" } ] }, { "id": 0, "type": "sink", "pact": "Data Sink", "contents": "eu.stratosphere.api.java.io.CsvOutputFormat@5bf734", "parallelism": "2", "subtasks_per_instance": "1", "predecessors": [ {"id": 1, "ship_strategy": "Forward"} ], "global_properties": [ { "name": "Partitioning", "value": "HASH_PARTITIONED" }, { "name": "Partitioned on", "value": "[0]" }, { "name": "Partitioning Order", "value": "(none)" }, { "name": "Uniqueness", "value": "not unique" } ], "local_properties": [ { "name": "Order", "value": "[0:ASC]" }, { "name": "Grouped on", "value": "[0]" }, { "name": "Uniqueness", "value": "not unique" } ], "estimates": [ { "name": "Est. Output Size", "value": "(unknown)" }, { "name": "Est. Cardinality", "value": "(unknown)" } ], "costs": [ { "name": "Network", "value": "0.0 B" }, { "name": "Disk I/O", "value": "0.0 B" }, { "name": "CPU", "value": "0.0 " }, { "name": "Cumulative Network", "value": "(unknown)" }, { "name": "Cumulative Disk I/O", "value": "(unknown)" }, { "name": "Cumulative CPU", "value": "(unknown)" } ], "compiler_hints": [ { "name": "Output Size (bytes)", "value": "(none)" }, { "name": "Output Cardinality", "value": "(none)" }, { "name": "Avg. Output Record Size (bytes)", "value": "(none)" }, { "name": "Filter Factor", "value": "(none)" } ] }, { "id": 8, "type": "pact", "pact": "FlatMap", "contents": "eu.stratosphere.example.java.wordcount.WordCount$Tokenizer2", "parallelism": "2", "subtasks_per_instance": "1", "predecessors": [ {"id": 4, "ship_strategy": "Forward"} ], "driver_strategy": "Map", "global_properties": [ { "name": "Partitioning", "value": "RANDOM" }, { "name": "Partitioning Order", "value": "(none)" }, { "name": "Uniqueness", "value": "not unique" } ], "local_properties": [ { "name": "Order", "value": "(none)" }, { "name": "Grouping", "value": "not grouped" }, { "name": "Uniqueness", "value": "not unique" } ], "estimates": [ { "name": "Est. 
Output Size", "value": "(unknown)" }, { "name": "Est. Cardinality", "value": "492.19 M" } ], "costs": [ { "name": "Network", "value": "0.0 B" }, { "name": "Disk I/O", "value": "0.0 B" }, { "name": "CPU", "value": "0.0 " }, { "name": "Cumulative Network", "value": "0.0 B" }, { "name": "Cumulative Disk I/O", "value": "1..82 GB" }, { "name": "Cumulative CPU", "value": "0.0 " } ], "compiler_hints": [ { "name": "Output Size (bytes)", "value": "(none)" }, { "name": "Output Cardinality", "value": "(none)" }, { "name": "Avg. Output Record Size (bytes)", "value": "(none)" }, { "name": "Filter Factor", "value": "(none)" } ] }, { "id": 7, "type": "pact", "pact": "GroupReduce", "contents": "SUM(2),MIN(1)", "parallelism": "2", "subtasks_per_instance": "1", "predecessors": [ {"id": 8, "ship_strategy": "Forward"} ], "driver_strategy": "Sorted Combine", "global_properties": [ { "name": "Partitioning", "value": "RANDOM" }, { "name": "Partitioning Order", "value": "(none)" }, { "name": "Uniqueness", "value": "not unique" } ], "local_properties": [ { "name": "Order", "value": "(none)" }, { "name": "Grouping", "value": "not grouped" }, { "name": "Uniqueness", "value": "not unique" } ], "estimates": [ { "name": "Est. Output Size", "value": "(unknown)" }, { "name": "Est. Cardinality", "value": "492.19 M" } ], "costs": [ { "name": "Network", "value": "0.0 B" }, { "name": "Disk I/O", "value": "0.0 B" }, { "name": "CPU", "value": "0.0 " }, { "name": "Cumulative Network", "value": "0.0 B" }, { "name": "Cumulative Disk I/O", "value": "1..82 GB" }, { "name": "Cumulative CPU", "value": "0.0 " } ], "compiler_hints": [ { "name": "Output Size (bytes)", "value": "(none)" }, { "name": "Output Cardinality", "value": "(none)" }, { "name": "Avg. 
Output Record Size (bytes)", "value": "(none)" }, { "name": "Filter Factor", "value": "(none)" } ] }, { "id": 6, "type": "pact", "pact": "GroupReduce", "contents": "SUM(2),MIN(1)", "parallelism": "2", "subtasks_per_instance": "1", "predecessors": [ {"id": 7, "ship_strategy": "Hash Partition on [0]", "local_strategy": "Sort (combining) on [0:ASC]"} ], "driver_strategy": "Sorted Group Reduce", "global_properties": [ { "name": "Partitioning", "value": "HASH_PARTITIONED" }, { "name": "Partitioned on", "value": "[0]" }, { "name": "Partitioning Order", "value": "(none)" }, { "name": "Uniqueness", "value": "not unique" } ], "local_properties": [ { "name": "Order", "value": "[0:ASC]" }, { "name": "Grouped on", "value": "[0]" }, { "name": "Uniqueness", "value": "not unique" } ], "estimates": [ { "name": "Est. Output Size", "value": "(unknown)" }, { "name": "Est. Cardinality", "value": "(unknown)" } ], "costs": [ { "name": "Network", "value": "(unknown)" }, { "name": "Disk I/O", "value": "(unknown)" }, { "name": "CPU", "value": "(unknown)" }, { "name": "Cumulative Network", "value": "(unknown)" }, { "name": "Cumulative Disk I/O", "value": "(unknown)" }, { "name": "Cumulative CPU", "value": "(unknown)" } ], "compiler_hints": [ { "name": "Output Size (bytes)", "value": "(none)" }, { "name": "Output Cardinality", "value": "(none)" }, { "name": "Avg. 
Output Record Size (bytes)", "value": "(none)" }, { "name": "Filter Factor", "value": "(none)" } ] }, { "id": 5, "type": "sink", "pact": "Data Sink", "contents": "eu.stratosphere.api.java.io.CsvOutputFormat@1ced484", "parallelism": "2", "subtasks_per_instance": "1", "predecessors": [ {"id": 6, "ship_strategy": "Forward"} ], "global_properties": [ { "name": "Partitioning", "value": "HASH_PARTITIONED" }, { "name": "Partitioned on", "value": "[0]" }, { "name": "Partitioning Order", "value": "(none)" }, { "name": "Uniqueness", "value": "not unique" } ], "local_properties": [ { "name": "Order", "value": "[0:ASC]" }, { "name": "Grouped on", "value": "[0]" }, { "name": "Uniqueness", "value": "not unique" } ], "estimates": [ { "name": "Est. Output Size", "value": "(unknown)" }, { "name": "Est. Cardinality", "value": "(unknown)" } ], "costs": [ { "name": "Network", "value": "0.0 B" }, { "name": "Disk I/O", "value": "0.0 B" }, { "name": "CPU", "value": "0.0 " }, { "name": "Cumulative Network", "value": "(unknown)" }, { "name": "Cumulative Disk I/O", "value": "(unknown)" }, { "name": "Cumulative CPU", "value": "(unknown)" } ], "compiler_hints": [ { "name": "Output Size (bytes)", "value": "(none)" }, { "name": "Output Cardinality", "value": "(none)" }, { "name": "Avg. Output Record Size (bytes)", "value": "(none)" }, { "name": "Filter Factor", "value": "(none)" } ] } ] } -----Ursprüngliche Nachricht----- Von: ewenstep...@gmail.com [mailto:ewenstep...@gmail.com] Im Auftrag von Stephan Ewen Gesendet: Sonntag, 29. Juni 2014 16:33 An: dev@flink.incubator.apache.org Betreff: Re: Cluster execution of an example program ("Word count") and a problem related to the modificated example Hi Krzysztof! Indeed, the input size should not matter. Can you tell us the details of the setup that worked? The built-in examples work without distributed file system, because they do not depend on files. 
The example programs set a Java Collection as the input, which gets distributed as part of the program. Stephan