This is an automated email from the ASF dual-hosted git repository.
mboehm7 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new df59aa20a8 [SYSTEMDS-3290] Parallelism-aware DAG linearization / op
scheduling
df59aa20a8 is described below
commit df59aa20a8344dcab9deac4cfecfd79105b4120d
Author: sweetpellegrino <[email protected]>
AuthorDate: Sat Mar 16 20:09:11 2024 +0100
[SYSTEMDS-3290] Parallelism-aware DAG linearization / op scheduling
Closes #1982.
---
src/main/java/org/apache/sysds/lops/Lop.java | 14 ++
.../lops/compile/linearization/ILinearize.java | 6 +-
.../linearization/PipelineAwareLinearize.java | 229 +++++++++++++++++++++
.../linearization/DagLinearizationTest.java | 2 +-
.../functions/linearization/ILinearizeTest.java | 164 +++++++++++++++
.../SystemDS-config-pipeline-depth-first.xml | 22 ++
6 files changed, 434 insertions(+), 3 deletions(-)
diff --git a/src/main/java/org/apache/sysds/lops/Lop.java
b/src/main/java/org/apache/sysds/lops/Lop.java
index b7ae1ffe78..5f25000cc9 100644
--- a/src/main/java/org/apache/sysds/lops/Lop.java
+++ b/src/main/java/org/apache/sysds/lops/Lop.java
@@ -157,6 +157,12 @@ public abstract class Lop
*/
protected boolean _asynchronous = false;
+ /**
+ * Id of the pipeline to which this lop belongs. Lops assigned to
+ * different pipelines are candidates for parallel execution.
+ * Defaults to -1, i.e., no pipeline id assigned yet
+ * (PipelineAwareLinearize assigns non-negative ids).
+ */
+ protected int _pipelineID = -1;
+
/**
* Estimated size for the output produced by this Lop in bytes.
*/
@@ -418,6 +424,14 @@ public abstract class Lop
return _asynchronous;
}
+	/**
+	 * Assigns this lop to a pipeline.
+	 *
+	 * @param id id of the pipeline this lop belongs to
+	 */
+	public void setPipelineID(int id) {
+		this._pipelineID = id;
+	}
+
+	/**
+	 * Returns the id of the pipeline this lop belongs to.
+	 *
+	 * @return pipeline id (-1 by default, i.e., if none was assigned)
+	 */
+	public int getPipelineID() {
+		return this._pipelineID;
+	}
+
public void setMemoryEstimates(double outMem, double totMem, double
interMem, double bcMem) {
_outputMemEstimate = outMem;
_memEstimate = totMem;
diff --git
a/src/main/java/org/apache/sysds/lops/compile/linearization/ILinearize.java
b/src/main/java/org/apache/sysds/lops/compile/linearization/ILinearize.java
index 0cfe80b683..288017bf1b 100644
--- a/src/main/java/org/apache/sysds/lops/compile/linearization/ILinearize.java
+++ b/src/main/java/org/apache/sysds/lops/compile/linearization/ILinearize.java
@@ -66,7 +66,7 @@ public class ILinearize {
public static Log LOG = LogFactory.getLog(ILinearize.class.getName());
public enum DagLinearization {
+		// PIPELINE_DEPTH_FIRST: depth-first order that additionally assigns
+		// pipeline ids to the lops (PipelineAwareLinearize.pipelineDepthFirst)
- DEPTH_FIRST, BREADTH_FIRST, MIN_INTERMEDIATE, MAX_PARALLELIZE,
AUTO
+ DEPTH_FIRST, BREADTH_FIRST, MIN_INTERMEDIATE, MAX_PARALLELIZE,
AUTO, PIPELINE_DEPTH_FIRST;
}
public static List<Lop> linearize(List<Lop> v) {
@@ -82,6 +82,8 @@ public class ILinearize {
return doMinIntermediateSort(v);
case BREADTH_FIRST:
return doBreadthFirstSort(v);
+ case PIPELINE_DEPTH_FIRST:
+ return
PipelineAwareLinearize.pipelineDepthFirst(v);
case DEPTH_FIRST:
default:
return depthFirst(v);
@@ -101,7 +103,7 @@ public class ILinearize {
* @param v List of lops to sort
* @return Sorted list of lops
*/
- private static List<Lop> depthFirst(List<Lop> v) {
+ protected static List<Lop> depthFirst(List<Lop> v) {
// partition nodes into leaf/inner nodes and dag root nodes,
// + sort leaf/inner nodes by ID to force depth-first scheduling
// + append root nodes in order of their original definition
diff --git
a/src/main/java/org/apache/sysds/lops/compile/linearization/PipelineAwareLinearize.java
b/src/main/java/org/apache/sysds/lops/compile/linearization/PipelineAwareLinearize.java
new file mode 100644
index 0000000000..dcc476a688
--- /dev/null
+++
b/src/main/java/org/apache/sysds/lops/compile/linearization/PipelineAwareLinearize.java
@@ -0,0 +1,229 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.sysds.lops.compile.linearization;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.sysds.lops.Lop;
+import org.apache.sysds.lops.OperatorOrderingUtils;
+
+public class PipelineAwareLinearize {
+
+ // Minimum number of nodes in DAG for applying algorithm
+ private final static int IGNORE_LIMIT = 0;
+
+ // Relevant parameter for Step 3:
+ // (Force) merge for pipelines size of [0, ..., HARD_LIMIT]
+ private final static int HARD_LIMIT = 4;
+ // Merges two pipelines if p1.size() + p2.size() < UPPER_BOUND
+ private final static int UPPER_BOUND = 10;
+
+ /**
+ * Sort lops depth-first while assigning the nodes to pipelines
+ *
+ * @param v List of lops to sort
+ * @return Sorted list of lops with set _pipelineID on the Lop Object
+ */
+ public static List<Lop> pipelineDepthFirst(List<Lop> v) {
+
+ // If size of DAG is smaller than IGNORE_LIMIT, give all nodes
the same pipeline id
+ if(v.size() <= IGNORE_LIMIT) {
+ v.forEach(l -> l.setPipelineID(1));
+ return ILinearize.depthFirst(v);
+ }
+
+ // Find all root nodes (starting points for the depth-first
traversal)
+ List<Lop> roots = v.stream()
+ .filter(OperatorOrderingUtils::isLopRoot)
+ .collect(Collectors.toList());
+
+ // Initialize necessary data objects
+ Integer pipelineId = 0;
+ // Stores a resulting depth first sorted list of lops (same as
in depthFirst())
+ // Returned by this function
+ ArrayList<Lop> opList = new ArrayList<>();
+ // Stores the pipeline ids and the corresponding lops
+ // for further refinement of pipeline assignements
+ Map<Integer, List<Lop>> pipelineMap = new HashMap<>();
+
+ // Step 1: Depth-first assignment of pipeline ids to the roots
+ for (Lop r : roots) {
+ pipelineId = depthFirst(r, pipelineId, opList,
pipelineMap) + 1;
+ }
+ //DEVPrintDAG.asGraphviz("Step1", v);
+
+ // Step 2: Merge pipelines with only one node to another
(connected) pipeline
+ PipelineAwareLinearize.mergeSingleNodePipelines(pipelineMap);
+ //DEVPrintDAG.asGraphviz("Step2", v);
+
+ // Step 3: Merge small pipelines into bigger ones
+ PipelineAwareLinearize.mergeSmallPipelines(pipelineMap);
+ //DEVPrintDAG.asGraphviz("Step3", v);
+
+ // Reset the visited status of all nodes
+ roots.forEach(Lop::resetVisitStatus);
+
+ return opList;
+ }
+
+ // Step 1: Depth-first assignment of pipeline ids to the roots
+ // Finds the branching out/in of Lops, that could be parallized
+ // (and with it assiging of different pipeline ids)
+ private static int depthFirst(Lop root, int pipelineId, List<Lop>
opList, Map<Integer, List<Lop>> pipelineMap) {
+
+ // Abort if the node was already visited
+ if (root.isVisited()) {
+ return root.getPipelineID();
+ }
+
+ // Assign pipeline id to the node, given by the parent
+ // Set the root node as visited
+ root.setPipelineID(pipelineId);
+ root.setVisited();
+
+ // Add the root node to the pipeline list
+ if(pipelineMap.containsKey(pipelineId)) {
+ pipelineMap.get(pipelineId).add(root);
+ } else {
+ ArrayList<Lop> lopList = new ArrayList<>();
+ lopList.add(root);
+ pipelineMap.put(pipelineId, lopList);
+ }
+
+ // Children as inputs, as we are traversing the lops bottom up
+ List<Lop> children = root.getInputs();
+ // If root node has only one child, use the same pipeline id as
root node
+ if (children.size() == 1) {
+ Lop child = children.get(0);
+ // We need to find the max pipeline id of the child,
because the child could branch out
+ pipelineId = Math.max(pipelineId, depthFirst(child,
pipelineId, opList, pipelineMap));
+ } else {
+ // Iteration over all children
+ for (int i = 0; i < children.size(); i++) {
+ Lop child = children.get(i);
+
+ // If the child has only one output, or all
outputs are the root node, use the same pipeline id as parent
+ if(child.getOutputs().size() == 1 ||
+ (child.getOutputs().size() > 1 &&
child.getOutputs().stream().allMatch(o -> o == root))) {
+ // No need for max, because the child
can only have one output
+ depthFirst(child, root.getPipelineID(),
opList, pipelineMap);
+ } else {
+ // We need to find the max pipeline id
of the child, because the child could branch out
+ pipelineId = Math.max(pipelineId,
depthFirst(child, pipelineId + 1, opList, pipelineMap));
+ }
+ }
+ }
+
+ opList.add(root);
+ return pipelineId;
+ }
+
+ // Step 2: Merge pipelines with only one node to another (connected)
pipeline
+ // Return map by reference
+ private static void mergeSingleNodePipelines(Map<Integer, List<Lop>>
map) {
+
+ Map<Integer, List<Lop>> pipelinesWithOneNode =
map.entrySet().stream()
+ .filter(e -> e.getValue().size() == 1)
+ .collect(Collectors.toMap( e-> e.getKey(), e ->
e.getValue()));
+
+ if(pipelinesWithOneNode.size() == 0)
+ return;
+
+ pipelinesWithOneNode.entrySet().stream().forEach(e -> {
+ Lop lop = e.getValue().get(0);
+
+ // Merge to an existing output node
+ if (lop.getOutputs().size() > 0) {
+
lop.setPipelineID(lop.getOutputs().get(0).getPipelineID());
+ // If no outputs are present, merge to an existing
input node
+ } else if (lop.getInputs().size() > 0) {
+
lop.setPipelineID(lop.getInputs().get(0).getPipelineID());
+ }
+ // else (no inputs and no outputs): do nothing
(unreachable node?)
+ // Remove the pipeline from the list of pipelines
+ if (lop.getOutputs().size() > 0 ||
lop.getInputs().size() > 0) {
+ map.get(lop.getPipelineID()).add(lop);
+ map.remove(e.getKey());
+ }
+ });
+ }
+
+ // Step 3: Merge small pipelines into bigger ones
+ // Heuristic: Merge the smallest pipeline with the second smallest
pipeline
+ // We don't care about whether the pipelines are connected or not
+ // This reduces the overhead as we avoid calculating the entire
combinatorial problem space
+ // for finding an optimal solution.
+ // An optimal solution could be defined as a solution that reduces
unnecessary overhead from
+ // too small pipelines (if executed in parallel, e.g., in a separate
thread)
+ // and still find a maximum number of pipelines (for maximal
parallelization)
+
+ // A proposed way to achieve a balance between avoiding too small
pipelines and maximizing the number of pipelines:
+ // HARD_LIMIT: If the size of a pipeline is smaller than HARD_LIMIT, it
will be merged with the next smallest pipeline.
+ // UPPER_BOUND: If the combined size of the two smallest pipelines is
less than UPPER_BOUND, merge them.
+ // Return map by reference
+ private static void mergeSmallPipelines(Map<Integer, List<Lop>> map) {
+
+ // Needs to have atleast two pipelines
+ if(map.size() < 2)
+ return;
+
+ // Sort the pipelines by size
+ List<Map.Entry<Integer, Integer>> sortedPipelineSizes =
getPipelinesSortedBySize(map);
+
+ Map.Entry<Integer, Integer> sm0 = sortedPipelineSizes.get(0);
+ Map.Entry<Integer, Integer> sm1 = sortedPipelineSizes.get(1);
+
+ while((sm0 != null && sm1 != null) &&
+ (sm0.getValue() < HARD_LIMIT ||
+ sm0.getValue() + sm1.getValue() < UPPER_BOUND)
+ ) {
+
+ // Merge pipelines as they satifiy the conditions
+ int mergeIntoId = sm1.getKey();
+ map.get(sm0.getKey()).forEach(l ->
l.setPipelineID(mergeIntoId));
+ map.get(mergeIntoId).addAll(map.get(sm0.getKey()));
+ map.remove(sm0.getKey());
+
+ //Get new list of sizes from updated map!
+ sortedPipelineSizes = getPipelinesSortedBySize(map);
+
+ //Update sm0 and sm1, if possible
+ if(sortedPipelineSizes.size() < 2) {
+ sm0 = null;
+ sm1 = null;
+ }
+ else {
+ sm0 = sortedPipelineSizes.get(0);
+ sm1 = sortedPipelineSizes.get(1);
+ }
+ }
+ }
+
+ private static List<Map.Entry<Integer, Integer>>
getPipelinesSortedBySize(Map<Integer, List<Lop>> map) {
+ return map.entrySet().stream()
+
.sorted(Map.Entry.comparingByValue(Comparator.comparingInt(List::size)))
+ .map(e -> Map.entry(e.getKey(),
e.getValue().size()))
+ .collect(Collectors.toList());
+ }
+}
diff --git
a/src/test/java/org/apache/sysds/test/functions/linearization/DagLinearizationTest.java
b/src/test/java/org/apache/sysds/test/functions/linearization/DagLinearizationTest.java
index 843faa36b9..305992bbb6 100644
---
a/src/test/java/org/apache/sysds/test/functions/linearization/DagLinearizationTest.java
+++
b/src/test/java/org/apache/sysds/test/functions/linearization/DagLinearizationTest.java
@@ -39,7 +39,7 @@ public class DagLinearizationTest extends AutomatedTestBase {
private final String testNames[] = {"matrixmult_dag_linearization",
"csplineCG_dag_linearization",
"linear_regression_dag_linearization"};
- private final String testConfigs[] = {"breadth-first", "depth-first",
"min-intermediate", "max-parallelize"};
+ private final String testConfigs[] = {"breadth-first", "depth-first",
"min-intermediate", "max-parallelize", "pipeline-depth-first"};
private final String testDir = "functions/linearization/";
diff --git
a/src/test/java/org/apache/sysds/test/functions/linearization/ILinearizeTest.java
b/src/test/java/org/apache/sysds/test/functions/linearization/ILinearizeTest.java
new file mode 100644
index 0000000000..e6680c29c2
--- /dev/null
+++
b/src/test/java/org/apache/sysds/test/functions/linearization/ILinearizeTest.java
@@ -0,0 +1,164 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.sysds.test.functions.linearization;
+
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Arrays;
+
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.apache.sysds.test.TestUtils;
+import org.junit.Test;
+
+//additional imports
+import org.apache.sysds.common.Types.DataType;
+import org.apache.sysds.common.Types.OpOp2;
+import org.apache.sysds.common.Types.ValueType;
+import org.apache.sysds.lops.compile.linearization.ILinearize;
+import org.apache.sysds.lops.Lop;
+import org.apache.sysds.conf.ConfigurationManager;
+import org.apache.sysds.conf.DMLConfig;
+import org.apache.sysds.lops.BinaryScalar;
+import org.apache.sysds.lops.Data;
+
+public class ILinearizeTest extends AutomatedTestBase {
+
+ private final String testDir = "functions/linearization/";
+ private final String configFile =
"SystemDS-config-pipeline-depth-first.xml";
+
+ private String getConfPath() {
+ return SCRIPT_DIR + "/" + testDir + configFile;
+ }
+
+ @Override
+ public void setUp() {
+ setOutputBuffering(true);
+ disableConfigFile = true;
+ TestUtils.clearAssertionInformation();
+ addTestConfiguration("test", new TestConfiguration(testDir,
"test"));
+ }
+
+ @Test
+ public void testLinearize_Pipeline() {
+
+ try {
+ DMLConfig dmlconf =
DMLConfig.readConfigurationFile(getConfPath());
+ ConfigurationManager.setGlobalConfig(dmlconf);
+
System.out.print(ConfigurationManager.getLinearizationOrder());
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ // Set up Test example
+ // For testing we don't build a runnable programm
+ // We only want to know, whether the assignement of pipeline
ids work as intended
+ // n1 n2
+ // \ / \
+ // o1 n3 n4
+ // \ |
+ // | n5
+ // r1 o2
+ // \ / \
+ // r2 |
+ // | |
+ // r3 |
+ // | |
+ // r4 |
+ // | \ /
+ // r5 y1
+ // / \
+ // y2 y5
+ // | |
+ // y3 |
+ // \ /
+ // y6
+
+ List<Lop> lops = new ArrayList<>();
+ // Dummy inputs for filling the inputs of the lops
+ // Needed for the constructors
+ Lop d1 = Data.createLiteralLop(ValueType.INT32, "1");
+ Lop d2 = Data.createLiteralLop(ValueType.INT32, "1");
+
+ // Start with creating the leafs, as the constructors await
inputs
+ // Disconnected graph
+ Lop n1 = new BinaryScalar(d1, d2, OpOp2.PLUS, DataType.SCALAR,
ValueType.INT32);
+ Lop n2 = new BinaryScalar(d1, d2, OpOp2.PLUS, DataType.SCALAR,
ValueType.INT32);
+ Lop n3 = new BinaryScalar(n1, n2, OpOp2.PLUS, DataType.SCALAR,
ValueType.INT32);
+ Lop n4 = new BinaryScalar(n2, d2, OpOp2.PLUS, DataType.SCALAR,
ValueType.INT32);
+ Lop n5 = new BinaryScalar(n4, d2, OpOp2.PLUS, DataType.SCALAR,
ValueType.INT32);
+ lops.add(n1);
+ lops.add(n2);
+ lops.add(n3);
+ lops.add(n4);
+ lops.add(n5);
+
+ // First pipeline (after step 1)
+ Lop o1 = new BinaryScalar(d1, d2, OpOp2.PLUS, DataType.SCALAR,
ValueType.INT32);
+ Lop o2 = new BinaryScalar(o1, d2, OpOp2.PLUS, DataType.SCALAR,
ValueType.INT32);
+ // Second pipeline (after step 1)
+ Lop r1 = new BinaryScalar(d1, d2, OpOp2.PLUS, DataType.SCALAR,
ValueType.INT32);
+ Lop r2 = new BinaryScalar(r1, o2, OpOp2.PLUS, DataType.SCALAR,
ValueType.INT32);
+ Lop r3 = new BinaryScalar(r2, d2, OpOp2.PLUS, DataType.SCALAR,
ValueType.INT32);
+ Lop r4 = new BinaryScalar(r3, d2, OpOp2.PLUS, DataType.SCALAR,
ValueType.INT32);
+ Lop r5 = new BinaryScalar(r4, d2, OpOp2.PLUS, DataType.SCALAR,
ValueType.INT32);
+ // Third pipeline (after step 1)
+ Lop y1 = new BinaryScalar(o2, r4, OpOp2.PLUS, DataType.SCALAR,
ValueType.INT32);
+ Lop y2 = new BinaryScalar(y1, d2, OpOp2.PLUS, DataType.SCALAR,
ValueType.INT32);
+ Lop y3 = new BinaryScalar(y2, d2, OpOp2.PLUS, DataType.SCALAR,
ValueType.INT32);
+ Lop y4 = new BinaryScalar(y1, d2, OpOp2.PLUS, DataType.SCALAR,
ValueType.INT32);
+ Lop y5 = new BinaryScalar(y3, y4, OpOp2.PLUS, DataType.SCALAR,
ValueType.INT32);
+
+ // add all lops to the list
+ lops.add(o1);
+ lops.add(o2);
+ lops.add(r1);
+ lops.add(r2);
+ lops.add(r3);
+ lops.add(r4);
+ lops.add(r5);
+ lops.add(y1);
+ lops.add(y2);
+ lops.add(y3);
+ lops.add(y4);
+ lops.add(y5);
+
+ // Remove dummy inputs
+ lops.forEach(l -> {l.getInputs().remove(d1);
l.getInputs().remove(d2);});
+
+ // RUN LINEARIZATION
+ ILinearize.linearize(lops);
+
+ // Set up expected pipelines
+ Map<Integer, List<Lop>> pipelineMap = new HashMap<>();
+ pipelineMap.put(4, Arrays.asList(n1, n2, n3, n4, n5, o1, o2));
+ pipelineMap.put(3, Arrays.asList(r1, r2, r3, r4, r5));
+ pipelineMap.put(5, Arrays.asList(y1, y2, y3, y4, y5));
+
+ // Check if all lops are in the correct pipeline
+ pipelineMap.get(4).forEach(l -> {if (l.getPipelineID() != 4)
fail("Pipeline ID not set correctly");});
+ pipelineMap.get(3).forEach(l -> {if (l.getPipelineID() != 3)
fail("Pipeline ID not set correctly");});
+ pipelineMap.get(5).forEach(l -> {if (l.getPipelineID() != 5)
fail("Pipeline ID not set correctly");});
+ }
+}
diff --git
a/src/test/scripts/functions/linearization/SystemDS-config-pipeline-depth-first.xml
b/src/test/scripts/functions/linearization/SystemDS-config-pipeline-depth-first.xml
new file mode 100644
index 0000000000..84d510569b
--- /dev/null
+++
b/src/test/scripts/functions/linearization/SystemDS-config-pipeline-depth-first.xml
@@ -0,0 +1,22 @@
+<!--
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+-->
+
+<root>
+
<sysds.compile.linearization>pipeline_depth_first</sysds.compile.linearization>
+</root>