Hi Stephan,
I have a 2-node configuration. The first node is both a master and a worker; the 
second node is a worker. The file path, Flink (Stratosphere) version, and operating 
system are the same on both nodes.
My test program is attached (a simple modification of the "Word count" 
example).  
The execution plan looks like this:
{
    "nodes": [

    {
        "id": 4,
        "type": "source",
        "pact": "Data Source",
        "contents": "TextInputFormat (file:/home/krzysztof/stratosphere05/generatedFrequencies2.txt) - UTF-8",
        "parallelism": "2",
        "subtasks_per_instance": "1",
        "global_properties": [
            { "name": "Partitioning", "value": "RANDOM" },
            { "name": "Partitioning Order", "value": "(none)" },
            { "name": "Uniqueness", "value": "not unique" }
        ],
        "local_properties": [
            { "name": "Order", "value": "(none)" },
            { "name": "Grouping", "value": "not grouped" },
            { "name": "Uniqueness", "value": "not unique" }
        ],
        "estimates": [
            { "name": "Est. Output Size", "value": "3..6 GB" },
            { "name": "Est. Cardinality", "value": "98.43 M" }        ],
        "costs": [
            { "name": "Network", "value": "0.0 B" },
            { "name": "Disk I/O", "value": "3..6 GB" },
            { "name": "CPU", "value": "0.0 " },
            { "name": "Cumulative Network", "value": "0.0 B" },
            { "name": "Cumulative Disk I/O", "value": "3..6 GB" },
            { "name": "Cumulative CPU", "value": "0.0 " }
        ],
        "compiler_hints": [
            { "name": "Output Size (bytes)", "value": "(none)" },
            { "name": "Output Cardinality", "value": "(none)" },
            { "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
            { "name": "Filter Factor", "value": "(none)" }        ]
    },
    {
        "id": 3,
        "type": "pact",
        "pact": "FlatMap",
        "contents": 
"eu.stratosphere.example.java.wordcount.WordCount$Tokenizer",
        "parallelism": "2",
        "subtasks_per_instance": "1",
        "predecessors": [
            {"id": 4, "ship_strategy": "Forward"}
        ],
        "driver_strategy": "Map",
        "global_properties": [
            { "name": "Partitioning", "value": "RANDOM" },
            { "name": "Partitioning Order", "value": "(none)" },
            { "name": "Uniqueness", "value": "not unique" }
        ],
        "local_properties": [
            { "name": "Order", "value": "(none)" },
            { "name": "Grouping", "value": "not grouped" },
            { "name": "Uniqueness", "value": "not unique" }
        ],
        "estimates": [
            { "name": "Est. Output Size", "value": "(unknown)" },
            { "name": "Est. Cardinality", "value": "492.19 M" }        ],
        "costs": [
            { "name": "Network", "value": "0.0 B" },
            { "name": "Disk I/O", "value": "0.0 B" },
            { "name": "CPU", "value": "0.0 " },
            { "name": "Cumulative Network", "value": "0.0 B" },
            { "name": "Cumulative Disk I/O", "value": "1..82 GB" },
            { "name": "Cumulative CPU", "value": "0.0 " }
        ],
        "compiler_hints": [
            { "name": "Output Size (bytes)", "value": "(none)" },
            { "name": "Output Cardinality", "value": "(none)" },
            { "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
            { "name": "Filter Factor", "value": "(none)" }        ]
    },
    {
        "id": 2,
        "type": "pact",
        "pact": "GroupReduce",
        "contents": "SUM(2),MAX(1)",
        "parallelism": "2",
        "subtasks_per_instance": "1",
        "predecessors": [
            {"id": 3, "ship_strategy": "Forward"}
        ],
        "driver_strategy": "Sorted Combine",
        "global_properties": [
            { "name": "Partitioning", "value": "RANDOM" },
            { "name": "Partitioning Order", "value": "(none)" },
            { "name": "Uniqueness", "value": "not unique" }
        ],
        "local_properties": [
            { "name": "Order", "value": "(none)" },
            { "name": "Grouping", "value": "not grouped" },
            { "name": "Uniqueness", "value": "not unique" }
        ],
        "estimates": [
            { "name": "Est. Output Size", "value": "(unknown)" },
            { "name": "Est. Cardinality", "value": "492.19 M" }        ],
        "costs": [
            { "name": "Network", "value": "0.0 B" },
            { "name": "Disk I/O", "value": "0.0 B" },
            { "name": "CPU", "value": "0.0 " },
            { "name": "Cumulative Network", "value": "0.0 B" },
            { "name": "Cumulative Disk I/O", "value": "1..82 GB" },
            { "name": "Cumulative CPU", "value": "0.0 " }
        ],
        "compiler_hints": [
            { "name": "Output Size (bytes)", "value": "(none)" },
            { "name": "Output Cardinality", "value": "(none)" },
            { "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
            { "name": "Filter Factor", "value": "(none)" }        ]
    },
    {
        "id": 1,
        "type": "pact",
        "pact": "GroupReduce",
        "contents": "SUM(2),MAX(1)",
        "parallelism": "2",
        "subtasks_per_instance": "1",
        "predecessors": [
            {"id": 2, "ship_strategy": "Hash Partition on [0]", 
"local_strategy": "Sort (combining) on [0:ASC]"}
        ],
        "driver_strategy": "Sorted Group Reduce",
        "global_properties": [
            { "name": "Partitioning", "value": "HASH_PARTITIONED" },
            { "name": "Partitioned on", "value": "[0]" },
            { "name": "Partitioning Order", "value": "(none)" },
            { "name": "Uniqueness", "value": "not unique" }
        ],
        "local_properties": [
            { "name": "Order", "value": "[0:ASC]" },
            { "name": "Grouped on", "value": "[0]" },
            { "name": "Uniqueness", "value": "not unique" }
        ],
        "estimates": [
            { "name": "Est. Output Size", "value": "(unknown)" },
            { "name": "Est. Cardinality", "value": "(unknown)" }        ],
        "costs": [
            { "name": "Network", "value": "(unknown)" },
            { "name": "Disk I/O", "value": "(unknown)" },
            { "name": "CPU", "value": "(unknown)" },
            { "name": "Cumulative Network", "value": "(unknown)" },
            { "name": "Cumulative Disk I/O", "value": "(unknown)" },
            { "name": "Cumulative CPU", "value": "(unknown)" }
        ],
        "compiler_hints": [
            { "name": "Output Size (bytes)", "value": "(none)" },
            { "name": "Output Cardinality", "value": "(none)" },
            { "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
            { "name": "Filter Factor", "value": "(none)" }        ]
    },
    {
        "id": 0,
        "type": "sink",
        "pact": "Data Sink",
        "contents": "eu.stratosphere.api.java.io.CsvOutputFormat@5bf734",
        "parallelism": "2",
        "subtasks_per_instance": "1",
        "predecessors": [
            {"id": 1, "ship_strategy": "Forward"}
        ],
        "global_properties": [
            { "name": "Partitioning", "value": "HASH_PARTITIONED" },
            { "name": "Partitioned on", "value": "[0]" },
            { "name": "Partitioning Order", "value": "(none)" },
            { "name": "Uniqueness", "value": "not unique" }
        ],
        "local_properties": [
            { "name": "Order", "value": "[0:ASC]" },
            { "name": "Grouped on", "value": "[0]" },
            { "name": "Uniqueness", "value": "not unique" }
        ],
        "estimates": [
            { "name": "Est. Output Size", "value": "(unknown)" },
            { "name": "Est. Cardinality", "value": "(unknown)" }        ],
        "costs": [
            { "name": "Network", "value": "0.0 B" },
            { "name": "Disk I/O", "value": "0.0 B" },
            { "name": "CPU", "value": "0.0 " },
            { "name": "Cumulative Network", "value": "(unknown)" },
            { "name": "Cumulative Disk I/O", "value": "(unknown)" },
            { "name": "Cumulative CPU", "value": "(unknown)" }
        ],
        "compiler_hints": [
            { "name": "Output Size (bytes)", "value": "(none)" },
            { "name": "Output Cardinality", "value": "(none)" },
            { "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
            { "name": "Filter Factor", "value": "(none)" }        ]
    },
    {
        "id": 8,
        "type": "pact",
        "pact": "FlatMap",
        "contents": 
"eu.stratosphere.example.java.wordcount.WordCount$Tokenizer2",
        "parallelism": "2",
        "subtasks_per_instance": "1",
        "predecessors": [
            {"id": 4, "ship_strategy": "Forward"}
        ],
        "driver_strategy": "Map",
        "global_properties": [
            { "name": "Partitioning", "value": "RANDOM" },
            { "name": "Partitioning Order", "value": "(none)" },
            { "name": "Uniqueness", "value": "not unique" }
        ],
        "local_properties": [
            { "name": "Order", "value": "(none)" },
            { "name": "Grouping", "value": "not grouped" },
            { "name": "Uniqueness", "value": "not unique" }
        ],
        "estimates": [
            { "name": "Est. Output Size", "value": "(unknown)" },
            { "name": "Est. Cardinality", "value": "492.19 M" }        ],
        "costs": [
            { "name": "Network", "value": "0.0 B" },
            { "name": "Disk I/O", "value": "0.0 B" },
            { "name": "CPU", "value": "0.0 " },
            { "name": "Cumulative Network", "value": "0.0 B" },
            { "name": "Cumulative Disk I/O", "value": "1..82 GB" },
            { "name": "Cumulative CPU", "value": "0.0 " }
        ],
        "compiler_hints": [
            { "name": "Output Size (bytes)", "value": "(none)" },
            { "name": "Output Cardinality", "value": "(none)" },
            { "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
            { "name": "Filter Factor", "value": "(none)" }        ]
    },
    {
        "id": 7,
        "type": "pact",
        "pact": "GroupReduce",
        "contents": "SUM(2),MIN(1)",
        "parallelism": "2",
        "subtasks_per_instance": "1",
        "predecessors": [
            {"id": 8, "ship_strategy": "Forward"}
        ],
        "driver_strategy": "Sorted Combine",
        "global_properties": [
            { "name": "Partitioning", "value": "RANDOM" },
            { "name": "Partitioning Order", "value": "(none)" },
            { "name": "Uniqueness", "value": "not unique" }
        ],
        "local_properties": [
            { "name": "Order", "value": "(none)" },
            { "name": "Grouping", "value": "not grouped" },
            { "name": "Uniqueness", "value": "not unique" }
        ],
        "estimates": [
            { "name": "Est. Output Size", "value": "(unknown)" },
            { "name": "Est. Cardinality", "value": "492.19 M" }        ],
        "costs": [
            { "name": "Network", "value": "0.0 B" },
            { "name": "Disk I/O", "value": "0.0 B" },
            { "name": "CPU", "value": "0.0 " },
            { "name": "Cumulative Network", "value": "0.0 B" },
            { "name": "Cumulative Disk I/O", "value": "1..82 GB" },
            { "name": "Cumulative CPU", "value": "0.0 " }
        ],
        "compiler_hints": [
            { "name": "Output Size (bytes)", "value": "(none)" },
            { "name": "Output Cardinality", "value": "(none)" },
            { "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
            { "name": "Filter Factor", "value": "(none)" }        ]
    },
    {
        "id": 6,
        "type": "pact",
        "pact": "GroupReduce",
        "contents": "SUM(2),MIN(1)",
        "parallelism": "2",
        "subtasks_per_instance": "1",
        "predecessors": [
            {"id": 7, "ship_strategy": "Hash Partition on [0]", 
"local_strategy": "Sort (combining) on [0:ASC]"}
        ],
        "driver_strategy": "Sorted Group Reduce",
        "global_properties": [
            { "name": "Partitioning", "value": "HASH_PARTITIONED" },
            { "name": "Partitioned on", "value": "[0]" },
            { "name": "Partitioning Order", "value": "(none)" },
            { "name": "Uniqueness", "value": "not unique" }
        ],
        "local_properties": [
            { "name": "Order", "value": "[0:ASC]" },
            { "name": "Grouped on", "value": "[0]" },
            { "name": "Uniqueness", "value": "not unique" }
        ],
        "estimates": [
            { "name": "Est. Output Size", "value": "(unknown)" },
            { "name": "Est. Cardinality", "value": "(unknown)" }        ],
        "costs": [
            { "name": "Network", "value": "(unknown)" },
            { "name": "Disk I/O", "value": "(unknown)" },
            { "name": "CPU", "value": "(unknown)" },
            { "name": "Cumulative Network", "value": "(unknown)" },
            { "name": "Cumulative Disk I/O", "value": "(unknown)" },
            { "name": "Cumulative CPU", "value": "(unknown)" }
        ],
        "compiler_hints": [
            { "name": "Output Size (bytes)", "value": "(none)" },
            { "name": "Output Cardinality", "value": "(none)" },
            { "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
            { "name": "Filter Factor", "value": "(none)" }        ]
    },
    {
        "id": 5,
        "type": "sink",
        "pact": "Data Sink",
        "contents": "eu.stratosphere.api.java.io.CsvOutputFormat@1ced484",
        "parallelism": "2",
        "subtasks_per_instance": "1",
        "predecessors": [
            {"id": 6, "ship_strategy": "Forward"}
        ],
        "global_properties": [
            { "name": "Partitioning", "value": "HASH_PARTITIONED" },
            { "name": "Partitioned on", "value": "[0]" },
            { "name": "Partitioning Order", "value": "(none)" },
            { "name": "Uniqueness", "value": "not unique" }
        ],
        "local_properties": [
            { "name": "Order", "value": "[0:ASC]" },
            { "name": "Grouped on", "value": "[0]" },
            { "name": "Uniqueness", "value": "not unique" }
        ],
        "estimates": [
            { "name": "Est. Output Size", "value": "(unknown)" },
            { "name": "Est. Cardinality", "value": "(unknown)" }        ],
        "costs": [
            { "name": "Network", "value": "0.0 B" },
            { "name": "Disk I/O", "value": "0.0 B" },
            { "name": "CPU", "value": "0.0 " },
            { "name": "Cumulative Network", "value": "(unknown)" },
            { "name": "Cumulative Disk I/O", "value": "(unknown)" },
            { "name": "Cumulative CPU", "value": "(unknown)" }
        ],
        "compiler_hints": [
            { "name": "Output Size (bytes)", "value": "(none)" },
            { "name": "Output Cardinality", "value": "(none)" },
            { "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
            { "name": "Filter Factor", "value": "(none)" }        ]
    }
    ]
}

-----Ursprüngliche Nachricht-----
Von: ewenstep...@gmail.com [mailto:ewenstep...@gmail.com] Im Auftrag von 
Stephan Ewen
Gesendet: Sonntag, 29. Juni 2014 16:33
An: dev@flink.incubator.apache.org
Betreff: Re: Cluster execution of an example program ("Word count") and a 
problem related to the modificated example

Hi Krzysztof!

Indeed, the input size should not matter. Can you tell us the details of the 
setup that worked?

The built-in examples work without distributed file system, because they do not 
depend on files. The example programs set a Java Collection as the input, which 
gets distributed as part of the program.

Stephan

Reply via email to