Hi Stephan,
I have a 2-node configuration. The first node is both a master and a worker;
the second node is a worker only. The file path, Flink (Stratosphere) version
and operating system are the same on both nodes.
My test program is attached (a simple modification of the "Word count"
example).
The execution plan looks like this:
{
"nodes": [
{
"id": 4,
"type": "source",
"pact": "Data Source",
"contents": "TextInputFormat (file:/home/krzysztof/stratosphere05/generatedFrequencies2.txt) - UTF-8",
"parallelism": "2",
"subtasks_per_instance": "1",
"global_properties": [
{ "name": "Partitioning", "value": "RANDOM" },
{ "name": "Partitioning Order", "value": "(none)" },
{ "name": "Uniqueness", "value": "not unique" }
],
"local_properties": [
{ "name": "Order", "value": "(none)" },
{ "name": "Grouping", "value": "not grouped" },
{ "name": "Uniqueness", "value": "not unique" }
],
"estimates": [
{ "name": "Est. Output Size", "value": "3..6 GB" },
{ "name": "Est. Cardinality", "value": "98.43 M" } ],
"costs": [
{ "name": "Network", "value": "0.0 B" },
{ "name": "Disk I/O", "value": "3..6 GB" },
{ "name": "CPU", "value": "0.0 " },
{ "name": "Cumulative Network", "value": "0.0 B" },
{ "name": "Cumulative Disk I/O", "value": "3..6 GB" },
{ "name": "Cumulative CPU", "value": "0.0 " }
],
"compiler_hints": [
{ "name": "Output Size (bytes)", "value": "(none)" },
{ "name": "Output Cardinality", "value": "(none)" },
{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
{ "name": "Filter Factor", "value": "(none)" } ]
},
{
"id": 3,
"type": "pact",
"pact": "FlatMap",
"contents":
"eu.stratosphere.example.java.wordcount.WordCount$Tokenizer",
"parallelism": "2",
"subtasks_per_instance": "1",
"predecessors": [
{"id": 4, "ship_strategy": "Forward"}
],
"driver_strategy": "Map",
"global_properties": [
{ "name": "Partitioning", "value": "RANDOM" },
{ "name": "Partitioning Order", "value": "(none)" },
{ "name": "Uniqueness", "value": "not unique" }
],
"local_properties": [
{ "name": "Order", "value": "(none)" },
{ "name": "Grouping", "value": "not grouped" },
{ "name": "Uniqueness", "value": "not unique" }
],
"estimates": [
{ "name": "Est. Output Size", "value": "(unknown)" },
{ "name": "Est. Cardinality", "value": "492.19 M" } ],
"costs": [
{ "name": "Network", "value": "0.0 B" },
{ "name": "Disk I/O", "value": "0.0 B" },
{ "name": "CPU", "value": "0.0 " },
{ "name": "Cumulative Network", "value": "0.0 B" },
{ "name": "Cumulative Disk I/O", "value": "1..82 GB" },
{ "name": "Cumulative CPU", "value": "0.0 " }
],
"compiler_hints": [
{ "name": "Output Size (bytes)", "value": "(none)" },
{ "name": "Output Cardinality", "value": "(none)" },
{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
{ "name": "Filter Factor", "value": "(none)" } ]
},
{
"id": 2,
"type": "pact",
"pact": "GroupReduce",
"contents": "SUM(2),MAX(1)",
"parallelism": "2",
"subtasks_per_instance": "1",
"predecessors": [
{"id": 3, "ship_strategy": "Forward"}
],
"driver_strategy": "Sorted Combine",
"global_properties": [
{ "name": "Partitioning", "value": "RANDOM" },
{ "name": "Partitioning Order", "value": "(none)" },
{ "name": "Uniqueness", "value": "not unique" }
],
"local_properties": [
{ "name": "Order", "value": "(none)" },
{ "name": "Grouping", "value": "not grouped" },
{ "name": "Uniqueness", "value": "not unique" }
],
"estimates": [
{ "name": "Est. Output Size", "value": "(unknown)" },
{ "name": "Est. Cardinality", "value": "492.19 M" } ],
"costs": [
{ "name": "Network", "value": "0.0 B" },
{ "name": "Disk I/O", "value": "0.0 B" },
{ "name": "CPU", "value": "0.0 " },
{ "name": "Cumulative Network", "value": "0.0 B" },
{ "name": "Cumulative Disk I/O", "value": "1..82 GB" },
{ "name": "Cumulative CPU", "value": "0.0 " }
],
"compiler_hints": [
{ "name": "Output Size (bytes)", "value": "(none)" },
{ "name": "Output Cardinality", "value": "(none)" },
{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
{ "name": "Filter Factor", "value": "(none)" } ]
},
{
"id": 1,
"type": "pact",
"pact": "GroupReduce",
"contents": "SUM(2),MAX(1)",
"parallelism": "2",
"subtasks_per_instance": "1",
"predecessors": [
{"id": 2, "ship_strategy": "Hash Partition on [0]",
"local_strategy": "Sort (combining) on [0:ASC]"}
],
"driver_strategy": "Sorted Group Reduce",
"global_properties": [
{ "name": "Partitioning", "value": "HASH_PARTITIONED" },
{ "name": "Partitioned on", "value": "[0]" },
{ "name": "Partitioning Order", "value": "(none)" },
{ "name": "Uniqueness", "value": "not unique" }
],
"local_properties": [
{ "name": "Order", "value": "[0:ASC]" },
{ "name": "Grouped on", "value": "[0]" },
{ "name": "Uniqueness", "value": "not unique" }
],
"estimates": [
{ "name": "Est. Output Size", "value": "(unknown)" },
{ "name": "Est. Cardinality", "value": "(unknown)" } ],
"costs": [
{ "name": "Network", "value": "(unknown)" },
{ "name": "Disk I/O", "value": "(unknown)" },
{ "name": "CPU", "value": "(unknown)" },
{ "name": "Cumulative Network", "value": "(unknown)" },
{ "name": "Cumulative Disk I/O", "value": "(unknown)" },
{ "name": "Cumulative CPU", "value": "(unknown)" }
],
"compiler_hints": [
{ "name": "Output Size (bytes)", "value": "(none)" },
{ "name": "Output Cardinality", "value": "(none)" },
{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
{ "name": "Filter Factor", "value": "(none)" } ]
},
{
"id": 0,
"type": "sink",
"pact": "Data Sink",
"contents": "eu.stratosphere.api.java.io.CsvOutputFormat@5bf734",
"parallelism": "2",
"subtasks_per_instance": "1",
"predecessors": [
{"id": 1, "ship_strategy": "Forward"}
],
"global_properties": [
{ "name": "Partitioning", "value": "HASH_PARTITIONED" },
{ "name": "Partitioned on", "value": "[0]" },
{ "name": "Partitioning Order", "value": "(none)" },
{ "name": "Uniqueness", "value": "not unique" }
],
"local_properties": [
{ "name": "Order", "value": "[0:ASC]" },
{ "name": "Grouped on", "value": "[0]" },
{ "name": "Uniqueness", "value": "not unique" }
],
"estimates": [
{ "name": "Est. Output Size", "value": "(unknown)" },
{ "name": "Est. Cardinality", "value": "(unknown)" } ],
"costs": [
{ "name": "Network", "value": "0.0 B" },
{ "name": "Disk I/O", "value": "0.0 B" },
{ "name": "CPU", "value": "0.0 " },
{ "name": "Cumulative Network", "value": "(unknown)" },
{ "name": "Cumulative Disk I/O", "value": "(unknown)" },
{ "name": "Cumulative CPU", "value": "(unknown)" }
],
"compiler_hints": [
{ "name": "Output Size (bytes)", "value": "(none)" },
{ "name": "Output Cardinality", "value": "(none)" },
{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
{ "name": "Filter Factor", "value": "(none)" } ]
},
{
"id": 8,
"type": "pact",
"pact": "FlatMap",
"contents":
"eu.stratosphere.example.java.wordcount.WordCount$Tokenizer2",
"parallelism": "2",
"subtasks_per_instance": "1",
"predecessors": [
{"id": 4, "ship_strategy": "Forward"}
],
"driver_strategy": "Map",
"global_properties": [
{ "name": "Partitioning", "value": "RANDOM" },
{ "name": "Partitioning Order", "value": "(none)" },
{ "name": "Uniqueness", "value": "not unique" }
],
"local_properties": [
{ "name": "Order", "value": "(none)" },
{ "name": "Grouping", "value": "not grouped" },
{ "name": "Uniqueness", "value": "not unique" }
],
"estimates": [
{ "name": "Est. Output Size", "value": "(unknown)" },
{ "name": "Est. Cardinality", "value": "492.19 M" } ],
"costs": [
{ "name": "Network", "value": "0.0 B" },
{ "name": "Disk I/O", "value": "0.0 B" },
{ "name": "CPU", "value": "0.0 " },
{ "name": "Cumulative Network", "value": "0.0 B" },
{ "name": "Cumulative Disk I/O", "value": "1..82 GB" },
{ "name": "Cumulative CPU", "value": "0.0 " }
],
"compiler_hints": [
{ "name": "Output Size (bytes)", "value": "(none)" },
{ "name": "Output Cardinality", "value": "(none)" },
{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
{ "name": "Filter Factor", "value": "(none)" } ]
},
{
"id": 7,
"type": "pact",
"pact": "GroupReduce",
"contents": "SUM(2),MIN(1)",
"parallelism": "2",
"subtasks_per_instance": "1",
"predecessors": [
{"id": 8, "ship_strategy": "Forward"}
],
"driver_strategy": "Sorted Combine",
"global_properties": [
{ "name": "Partitioning", "value": "RANDOM" },
{ "name": "Partitioning Order", "value": "(none)" },
{ "name": "Uniqueness", "value": "not unique" }
],
"local_properties": [
{ "name": "Order", "value": "(none)" },
{ "name": "Grouping", "value": "not grouped" },
{ "name": "Uniqueness", "value": "not unique" }
],
"estimates": [
{ "name": "Est. Output Size", "value": "(unknown)" },
{ "name": "Est. Cardinality", "value": "492.19 M" } ],
"costs": [
{ "name": "Network", "value": "0.0 B" },
{ "name": "Disk I/O", "value": "0.0 B" },
{ "name": "CPU", "value": "0.0 " },
{ "name": "Cumulative Network", "value": "0.0 B" },
{ "name": "Cumulative Disk I/O", "value": "1..82 GB" },
{ "name": "Cumulative CPU", "value": "0.0 " }
],
"compiler_hints": [
{ "name": "Output Size (bytes)", "value": "(none)" },
{ "name": "Output Cardinality", "value": "(none)" },
{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
{ "name": "Filter Factor", "value": "(none)" } ]
},
{
"id": 6,
"type": "pact",
"pact": "GroupReduce",
"contents": "SUM(2),MIN(1)",
"parallelism": "2",
"subtasks_per_instance": "1",
"predecessors": [
{"id": 7, "ship_strategy": "Hash Partition on [0]",
"local_strategy": "Sort (combining) on [0:ASC]"}
],
"driver_strategy": "Sorted Group Reduce",
"global_properties": [
{ "name": "Partitioning", "value": "HASH_PARTITIONED" },
{ "name": "Partitioned on", "value": "[0]" },
{ "name": "Partitioning Order", "value": "(none)" },
{ "name": "Uniqueness", "value": "not unique" }
],
"local_properties": [
{ "name": "Order", "value": "[0:ASC]" },
{ "name": "Grouped on", "value": "[0]" },
{ "name": "Uniqueness", "value": "not unique" }
],
"estimates": [
{ "name": "Est. Output Size", "value": "(unknown)" },
{ "name": "Est. Cardinality", "value": "(unknown)" } ],
"costs": [
{ "name": "Network", "value": "(unknown)" },
{ "name": "Disk I/O", "value": "(unknown)" },
{ "name": "CPU", "value": "(unknown)" },
{ "name": "Cumulative Network", "value": "(unknown)" },
{ "name": "Cumulative Disk I/O", "value": "(unknown)" },
{ "name": "Cumulative CPU", "value": "(unknown)" }
],
"compiler_hints": [
{ "name": "Output Size (bytes)", "value": "(none)" },
{ "name": "Output Cardinality", "value": "(none)" },
{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
{ "name": "Filter Factor", "value": "(none)" } ]
},
{
"id": 5,
"type": "sink",
"pact": "Data Sink",
"contents": "eu.stratosphere.api.java.io.CsvOutputFormat@1ced484",
"parallelism": "2",
"subtasks_per_instance": "1",
"predecessors": [
{"id": 6, "ship_strategy": "Forward"}
],
"global_properties": [
{ "name": "Partitioning", "value": "HASH_PARTITIONED" },
{ "name": "Partitioned on", "value": "[0]" },
{ "name": "Partitioning Order", "value": "(none)" },
{ "name": "Uniqueness", "value": "not unique" }
],
"local_properties": [
{ "name": "Order", "value": "[0:ASC]" },
{ "name": "Grouped on", "value": "[0]" },
{ "name": "Uniqueness", "value": "not unique" }
],
"estimates": [
{ "name": "Est. Output Size", "value": "(unknown)" },
{ "name": "Est. Cardinality", "value": "(unknown)" } ],
"costs": [
{ "name": "Network", "value": "0.0 B" },
{ "name": "Disk I/O", "value": "0.0 B" },
{ "name": "CPU", "value": "0.0 " },
{ "name": "Cumulative Network", "value": "(unknown)" },
{ "name": "Cumulative Disk I/O", "value": "(unknown)" },
{ "name": "Cumulative CPU", "value": "(unknown)" }
],
"compiler_hints": [
{ "name": "Output Size (bytes)", "value": "(none)" },
{ "name": "Output Cardinality", "value": "(none)" },
{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
{ "name": "Filter Factor", "value": "(none)" } ]
}
]
}
-----Ursprüngliche Nachricht-----
Von: [email protected] [mailto:[email protected]] Im Auftrag von
Stephan Ewen
Gesendet: Sonntag, 29. Juni 2014 16:33
An: [email protected]
Betreff: Re: Cluster execution of an example program ("Word count") and a
problem related to the modificated example
Hi Krzysztof!
Indeed, the input size should not matter. Can you tell us the details of the
setup that worked?
The built-in examples work without a distributed file system, because they do
not depend on files. The example programs set a Java Collection as the input,
which gets distributed as part of the program.
Stephan