[
https://issues.apache.org/jira/browse/FLINK-1018?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Markus Holzemer updated FLINK-1018:
-----------------------------------
Attachment: LogisticRegression.java
> Logistic Regression deadlocks
> -----------------------------
>
> Key: FLINK-1018
> URL: https://issues.apache.org/jira/browse/FLINK-1018
> Project: Flink
> Issue Type: Bug
> Reporter: Markus Holzemer
> Attachments: LogisticRegression.java
>
>
> We are currently running our implementation of logistic regression with batch
> gradient descent on the cluster.
> Unfortunately, for datasets larger than 1 GB it seems to deadlock inside the
> iteration. This means the first iteration never finishes.
> The iteration does a map over all points; the map gets the iteration input as
> a broadcast variable. The result of the map is reduced, and the result of the
> reducer (1 tuple) is crossed with the iteration input.
> There should be no reason for the deadlock, since the data is still quite
> small compared to the cluster size (4 nodes with 32 GB each). Also, the data
> size stays constant throughout the algorithm.
> Here is the generated plan. I will also attach the full algorithm.
> {code}
> {
> "nodes": [
> {
> "id": 2,
> "type": "source",
> "pact": "Data Source",
> "contents": "[([0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0,
> 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.",
> "parallelism": "1",
> "subtasks_per_instance": "1",
> "global_properties": [
> { "name": "Partitioning", "value": "RANDOM" },
> { "name": "Partitioning Order", "value": "(none)" },
> { "name": "Uniqueness", "value": "not unique" }
> ],
> "local_properties": [
> { "name": "Order", "value": "(none)" },
> { "name": "Grouping", "value": "not grouped" },
> { "name": "Uniqueness", "value": "not unique" }
> ],
> "estimates": [
> { "name": "Est. Output Size", "value": "(unknown)" },
> { "name": "Est. Cardinality", "value": "(unknown)" }
> ],
> "costs": [
> { "name": "Network", "value": "0.0 B" },
> { "name": "Disk I/O", "value": "0.0 B" },
> { "name": "CPU", "value": "0.0 " },
> { "name": "Cumulative Network", "value": "0.0 B" },
> { "name": "Cumulative Disk I/O", "value": "0.0 B" },
> { "name": "Cumulative CPU", "value": "0.0 " }
> ],
> "compiler_hints": [
> { "name": "Output Size (bytes)", "value": "(none)" },
> { "name": "Output Cardinality", "value": "(none)" },
> { "name": "Avg. Output Record Size (bytes)", "value":
> "(none)" },
> { "name": "Filter Factor", "value": "(none)" }
> ]
> },
> {
> "step_function": [
> {
> "id": 7,
> "type": "source",
> "pact": "Data Source",
> "contents": "TextInputFormat (D:/Devel/HIGGS-0.0001.csv) -
> UTF-8",
> "parallelism": "2",
> "subtasks_per_instance": "2",
> "global_properties": [
> { "name": "Partitioning", "value": "RANDOM" },
> { "name": "Partitioning Order", "value": "(none)" },
> { "name": "Uniqueness", "value": "not unique" }
> ],
> "local_properties": [
> { "name": "Order", "value": "(none)" },
> { "name": "Grouping", "value": "not grouped" },
> { "name": "Uniqueness", "value": "not unique" }
> ],
> "estimates": [
> { "name": "Est. Output Size", "value": "83.27 MB" },
> { "name": "Est. Cardinality", "value": "113.9. K" }
> ],
> "costs": [
> { "name": "Network", "value": "0.0 B" },
> { "name": "Disk I/O", "value": "83.27 MB" },
> { "name": "CPU", "value": "0.0 " },
> { "name": "Cumulative Network", "value": "0.0 B" },
> { "name": "Cumulative Disk I/O", "value": "83.27 MB" },
> { "name": "Cumulative CPU", "value": "0.0 " }
> ],
> "compiler_hints": [
> { "name": "Output Size (bytes)", "value": "(none)" },
> { "name": "Output Cardinality", "value": "(none)" },
> { "name": "Avg. Output Record Size (bytes)", "value":
> "(none)" },
> { "name": "Filter Factor", "value": "(none)" }
> ]
> },
> {
> "id": 6,
> "type": "pact",
> "pact": "Map",
> "contents":
> "de.tu_berlin.impro3.stratosphere.classification.logreg.LogisticRegression$6",
> "parallelism": "2",
> "subtasks_per_instance": "2",
> "predecessors": [
> {"id": 7, "ship_strategy": "Forward"}
> ],
> "driver_strategy": "Map",
> "global_properties": [
> { "name": "Partitioning", "value": "RANDOM" },
> { "name": "Partitioning Order", "value": "(none)" },
> { "name": "Uniqueness", "value": "not unique" }
> ],
> "local_properties": [
> { "name": "Order", "value": "(none)" },
> { "name": "Grouping", "value": "not grouped" },
> { "name": "Uniqueness", "value": "not unique" }
> ],
> "estimates": [
> { "name": "Est. Output Size", "value": "(unknown)" },
> { "name": "Est. Cardinality", "value": "113.9. K" }
> ],
> "costs": [
> { "name": "Network", "value": "0.0 B" },
> { "name": "Disk I/O", "value": "0.0 B" },
> { "name": "CPU", "value": "0.0 " },
> { "name": "Cumulative Network", "value": "0.0 B" },
> { "name": "Cumulative Disk I/O", "value": "83.27 MB" },
> { "name": "Cumulative CPU", "value": "0.0 " }
> ],
> "compiler_hints": [
> { "name": "Output Size (bytes)", "value": "(none)" },
> { "name": "Output Cardinality", "value": "(none)" },
> { "name": "Avg. Output Record Size (bytes)", "value":
> "(none)" },
> { "name": "Filter Factor", "value": "(none)" }
> ]
> },
> {
> "id": 9,
> "type": "pact",
> "pact": "Map",
> "contents":
> "de.tu_berlin.impro3.stratosphere.classification.logreg.LogisticRegression$1",
> "parallelism": "2",
> "subtasks_per_instance": "2",
> "predecessors": [
> {"id": 6, "ship_strategy": "Forward"}
> ],
> "driver_strategy": "Map",
> "global_properties": [
> { "name": "Partitioning", "value": "RANDOM" },
> { "name": "Partitioning Order", "value": "(none)" },
> { "name": "Uniqueness", "value": "not unique" }
> ],
> "local_properties": [
> { "name": "Order", "value": "(none)" },
> { "name": "Grouping", "value": "not grouped" },
> { "name": "Uniqueness", "value": "not unique" }
> ],
> "estimates": [
> { "name": "Est. Output Size", "value": "(unknown)" },
> { "name": "Est. Cardinality", "value": "113.9. K" }
> ],
> "costs": [
> { "name": "Network", "value": "0.0 B" },
> { "name": "Disk I/O", "value": "0.0 B" },
> { "name": "CPU", "value": "0.0 " },
> { "name": "Cumulative Network", "value": "0.0 B" },
> { "name": "Cumulative Disk I/O", "value": "41.63 MB" },
> { "name": "Cumulative CPU", "value": "0.0 " }
> ],
> "compiler_hints": [
> { "name": "Output Size (bytes)", "value": "(none)" },
> { "name": "Output Cardinality", "value": "(none)" },
> { "name": "Avg. Output Record Size (bytes)", "value":
> "(none)" },
> { "name": "Filter Factor", "value": "(none)" }
> ]
> },
> {
> "id": 8,
> "type": "pact",
> "pact": "Reduce",
> "contents":
> "de.tu_berlin.impro3.stratosphere.classification.logreg.LogisticRegression$2",
> "parallelism": "1",
> "subtasks_per_instance": "1",
> "predecessors": [
> {"id": 9, "ship_strategy": "Forward"}
> ],
> "driver_strategy": "Reduce All",
> "global_properties": [
> { "name": "Partitioning", "value": "RANDOM" },
> { "name": "Partitioning Order", "value": "(none)" },
> { "name": "Uniqueness", "value": "not unique" }
> ],
> "local_properties": [
> { "name": "Order", "value": "(none)" },
> { "name": "Grouping", "value": "not grouped" },
> { "name": "Uniqueness", "value": "not unique" }
> ],
> "estimates": [
> { "name": "Est. Output Size", "value": "(unknown)" },
> { "name": "Est. Cardinality", "value": "(unknown)" }
> ],
> "costs": [
> { "name": "Network", "value": "0.0 B" },
> { "name": "Disk I/O", "value": "0.0 B" },
> { "name": "CPU", "value": "0.0 " },
> { "name": "Cumulative Network", "value": "0.0 B" },
> { "name": "Cumulative Disk I/O", "value": "41.63 MB" },
> { "name": "Cumulative CPU", "value": "0.0 " }
> ],
> "compiler_hints": [
> { "name": "Output Size (bytes)", "value": "(none)" },
> { "name": "Output Cardinality", "value": "(none)" },
> { "name": "Avg. Output Record Size (bytes)", "value":
> "(none)" },
> { "name": "Filter Factor", "value": "(none)" }
> ]
> },
> {
> "id": 10,
> "type": "pact",
> "pact": "Bulk Partial Solution",
> "contents": "Partial Solution",
> "parallelism": "2",
> "subtasks_per_instance": "2",
> "global_properties": [
> { "name": "Partitioning", "value": "RANDOM" },
> { "name": "Partitioning Order", "value": "(none)" },
> { "name": "Uniqueness", "value": "not unique" }
> ],
> "local_properties": [
> { "name": "Order", "value": "(none)" },
> { "name": "Grouping", "value": "not grouped" },
> { "name": "Uniqueness", "value": "not unique" }
> ],
> "estimates": [
> { "name": "Est. Output Size", "value": "(unknown)" },
> { "name": "Est. Cardinality", "value": "(unknown)" }
> ],
> "costs": [
> { "name": "Network", "value": "0.0 B" },
> { "name": "Disk I/O", "value": "0.0 B" },
> { "name": "CPU", "value": "0.0 " },
> { "name": "Cumulative Network", "value": "0.0 B" },
> { "name": "Cumulative Disk I/O", "value": "0.0 B" },
> { "name": "Cumulative CPU", "value": "0.0 " }
> ],
> "compiler_hints": [
> { "name": "Output Size (bytes)", "value": "(none)" },
> { "name": "Output Cardinality", "value": "(none)" },
> { "name": "Avg. Output Record Size (bytes)", "value":
> "(none)" },
> { "name": "Filter Factor", "value": "(none)" }
> ]
> },
> {
> "id": 5,
> "type": "pact",
> "pact": "Map",
> "contents":
> "de.tu_berlin.impro3.stratosphere.classification.logreg.LogisticRegression$3",
> "parallelism": "2",
> "subtasks_per_instance": "2",
> "predecessors": [
> {"id": 6, "side": "first", "ship_strategy": "Forward",
> "temp_mode": "CACHED"},
> {"id": 8, "side": "second", "ship_strategy":
> "Broadcast"},
> {"id": 10, "side": "second", "ship_strategy":
> "Broadcast"}
> ],
> "driver_strategy": "Map",
> "global_properties": [
> { "name": "Partitioning", "value": "RANDOM" },
> { "name": "Partitioning Order", "value": "(none)" },
> { "name": "Uniqueness", "value": "not unique" }
> ],
> "local_properties": [
> { "name": "Order", "value": "(none)" },
> { "name": "Grouping", "value": "not grouped" },
> { "name": "Uniqueness", "value": "not unique" }
> ],
> "estimates": [
> { "name": "Est. Output Size", "value": "(unknown)" },
> { "name": "Est. Cardinality", "value": "113.9. K" }
> ],
> "costs": [
> { "name": "Network", "value": "0.0 B" },
> { "name": "Disk I/O", "value": "(unknown)" },
> { "name": "CPU", "value": "(unknown)" },
> { "name": "Cumulative Network", "value": "0.0 B" },
> { "name": "Cumulative Disk I/O", "value": "(unknown)" },
> { "name": "Cumulative CPU", "value": "(unknown)" }
> ],
> "compiler_hints": [
> { "name": "Output Size (bytes)", "value": "(none)" },
> { "name": "Output Cardinality", "value": "(none)" },
> { "name": "Avg. Output Record Size (bytes)", "value":
> "(none)" },
> { "name": "Filter Factor", "value": "(none)" }
> ]
> },
> {
> "id": 4,
> "type": "pact",
> "pact": "Reduce",
> "contents":
> "de.tu_berlin.impro3.stratosphere.classification.logreg.LogisticRegression$4",
> "parallelism": "1",
> "subtasks_per_instance": "1",
> "predecessors": [
> {"id": 5, "ship_strategy": "Forward"}
> ],
> "driver_strategy": "Reduce All",
> "global_properties": [
> { "name": "Partitioning", "value": "RANDOM" },
> { "name": "Partitioning Order", "value": "(none)" },
> { "name": "Uniqueness", "value": "not unique" }
> ],
> "local_properties": [
> { "name": "Order", "value": "(none)" },
> { "name": "Grouping", "value": "not grouped" },
> { "name": "Uniqueness", "value": "not unique" }
> ],
> "estimates": [
> { "name": "Est. Output Size", "value": "(unknown)" },
> { "name": "Est. Cardinality", "value": "(unknown)" }
> ],
> "costs": [
> { "name": "Network", "value": "0.0 B" },
> { "name": "Disk I/O", "value": "0.0 B" },
> { "name": "CPU", "value": "0.0 " },
> { "name": "Cumulative Network", "value": "0.0 B" },
> { "name": "Cumulative Disk I/O", "value": "(unknown)" },
> { "name": "Cumulative CPU", "value": "(unknown)" }
> ],
> "compiler_hints": [
> { "name": "Output Size (bytes)", "value": "(none)" },
> { "name": "Output Cardinality", "value": "(none)" },
> { "name": "Avg. Output Record Size (bytes)", "value":
> "(none)" },
> { "name": "Filter Factor", "value": "(none)" }
> ]
> },
> {
> "id": 3,
> "type": "pact",
> "pact": "Cross",
> "contents":
> "de.tu_berlin.impro3.stratosphere.classification.logreg.LogisticRegression$5",
> "parallelism": "2",
> "subtasks_per_instance": "2",
> "predecessors": [
> {"id": 4, "side": "first", "ship_strategy": "Forward"},
> {"id": 10, "side": "second", "ship_strategy":
> "Broadcast", "temp_mode": "PIPELINE_BREAKER"}
> ],
> "driver_strategy": "Nested Loops (Blocked Outer:
> de.tu_berlin.impro3.stratosphere.classification.logreg.LogisticRegression$4)",
> "global_properties": [
> { "name": "Partitioning", "value": "RANDOM" },
> { "name": "Partitioning Order", "value": "(none)" },
> { "name": "Uniqueness", "value": "not unique" }
> ],
> "local_properties": [
> { "name": "Order", "value": "(none)" },
> { "name": "Grouping", "value": "not grouped" },
> { "name": "Uniqueness", "value": "not unique" }
> ],
> "estimates": [
> { "name": "Est. Output Size", "value": "(unknown)" },
> { "name": "Est. Cardinality", "value": "(unknown)" }
> ],
> "costs": [
> { "name": "Network", "value": "(unknown)" },
> { "name": "Disk I/O", "value": "(unknown)" },
> { "name": "CPU", "value": "(unknown)" },
> { "name": "Cumulative Network", "value": "(unknown)" },
> { "name": "Cumulative Disk I/O", "value": "(unknown)" },
> { "name": "Cumulative CPU", "value": "(unknown)" }
> ],
> "compiler_hints": [
> { "name": "Output Size (bytes)", "value": "(none)" },
> { "name": "Output Cardinality", "value": "(none)" },
> { "name": "Avg. Output Record Size (bytes)", "value":
> "(none)" },
> { "name": "Filter Factor", "value": "(none)" }
> ]
> }
> ],
> "partial_solution": 10,
> "next_partial_solution": 3,
> "id": 1,
> "type": "bulk_iteration",
> "pact": "Bulk Iteration",
> "contents": "Bulk Iteration",
> "parallelism": "2",
> "subtasks_per_instance": "2",
> "predecessors": [
> {"id": 2, "ship_strategy": "Forward"}
> ],
> "global_properties": [
> { "name": "Partitioning", "value": "RANDOM" },
> { "name": "Partitioning Order", "value": "(none)" },
> { "name": "Uniqueness", "value": "not unique" }
> ],
> "local_properties": [
> { "name": "Order", "value": "(none)" },
> { "name": "Grouping", "value": "not grouped" },
> { "name": "Uniqueness", "value": "not unique" }
> ],
> "estimates": [
> { "name": "Est. Output Size", "value": "(unknown)" },
> { "name": "Est. Cardinality", "value": "(unknown)" }
> ],
> "costs": [
> { "name": "Network", "value": "(unknown)" },
> { "name": "Disk I/O", "value": "(unknown)" },
> { "name": "CPU", "value": "(unknown)" },
> { "name": "Cumulative Network", "value": "(unknown)" },
> { "name": "Cumulative Disk I/O", "value": "(unknown)" },
> { "name": "Cumulative CPU", "value": "(unknown)" }
> ],
> "compiler_hints": [
> { "name": "Output Size (bytes)", "value": "(none)" },
> { "name": "Output Cardinality", "value": "(none)" },
> { "name": "Avg. Output Record Size (bytes)", "value":
> "(none)" },
> { "name": "Filter Factor", "value": "(none)" }
> ]
> },
> {
> "id": 0,
> "type": "sink",
> "pact": "Data Sink",
> "contents": "TextOutputFormat (D:/Devel/theta) - UTF-8",
> "parallelism": "2",
> "subtasks_per_instance": "2",
> "predecessors": [
> {"id": 1, "ship_strategy": "Forward"}
> ],
> "global_properties": [
> { "name": "Partitioning", "value": "RANDOM" },
> { "name": "Partitioning Order", "value": "(none)" },
> { "name": "Uniqueness", "value": "not unique" }
> ],
> "local_properties": [
> { "name": "Order", "value": "(none)" },
> { "name": "Grouping", "value": "not grouped" },
> { "name": "Uniqueness", "value": "not unique" }
> ],
> "estimates": [
> { "name": "Est. Output Size", "value": "(unknown)" },
> { "name": "Est. Cardinality", "value": "(unknown)" }
> ],
> "costs": [
> { "name": "Network", "value": "0.0 B" },
> { "name": "Disk I/O", "value": "0.0 B" },
> { "name": "CPU", "value": "0.0 " },
> { "name": "Cumulative Network", "value": "(unknown)" },
> { "name": "Cumulative Disk I/O", "value": "(unknown)" },
> { "name": "Cumulative CPU", "value": "(unknown)" }
> ],
> "compiler_hints": [
> { "name": "Output Size (bytes)", "value": "(none)" },
> { "name": "Output Cardinality", "value": "(none)" },
> { "name": "Avg. Output Record Size (bytes)", "value":
> "(none)" },
> { "name": "Filter Factor", "value": "(none)" }
> ]
> }
> ]
> }
> {code}
--
This message was sent by Atlassian JIRA
(v6.2#6252)