This is an automated email from the ASF dual-hosted git repository.
mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/master by this push:
new c86b5d5 [MINOR] Improved spark exec type selection for unary aggregates
c86b5d5 is described below
commit c86b5d55896df7e429878f9debac6f95ef080284
Author: Matthias Boehm <[email protected]>
AuthorDate: Mon Jul 26 13:15:37 2021 +0200
[MINOR] Improved spark exec type selection for unary aggregates
This patch refines the execution type selection logic of unary
aggregates. By default, the memory estimates and the memory budget drive
the decision between CP (in-memory) and Spark (distributed, out-of-core).
However, a unary aggregate largely reduces the data size. For that
reason, we also pull unary aggregates whose inputs are created by Spark
instructions into Spark execution, which significantly reduces the
collected data (i.e., the data transferred from the Spark executors to
the Spark driver).
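To make the size argument concrete, here is a back-of-the-envelope sketch. It assumes dense double-precision matrices at 8 bytes per cell and ignores block and object overheads. `AggSizeSketch` and `denseBytes` are hypothetical names, not SystemDS APIs.

```java
// Hypothetical sketch (not SystemDS code): dense size estimates, assuming
// 8 bytes per double cell and ignoring block/object overheads.
final class AggSizeSketch {
    // Estimated size in bytes of a dense rows x cols matrix of doubles.
    static long denseBytes(long rows, long cols) {
        return rows * cols * 8L;
    }

    public static void main(String[] args) {
        long rows = 10_000_000L, cols = 1_000L;
        long input = denseBytes(rows, cols); // full input, collected if the aggregate runs in CP
        long rowSums = denseBytes(rows, 1L); // rowSums(X) output, collected if it runs in Spark
        long total = denseBytes(1L, 1L);     // sum(X) output, a single value
        // prints "80000000000 80000000 8"
        System.out.println(input + " " + rowSums + " " + total);
    }
}
```

With these assumed dimensions, running `rowSums` in Spark collects about 1000x less data than collecting its input into the driver.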
For robustness, we previously did that only if the unary aggregate was
the only parent, to avoid redundant Spark execution in cases where the
intermediate would be pulled into local memory anyway. Instead of just
looking at the number of parents of the input, we now do a more informed
analysis of where these parents are executed: if all other parents are
in Spark, it is preferable to pull this single unary aggregate into
Spark execution mode as well.
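The refined rule can be modeled in isolation as a minimal sketch. The `ExecType`, `Node`, and `UaggRule` types below are hypothetical stand-ins for the SystemDS Hop classes. It assumes every node's execution type is already known, and it passes the forced-CP flag and the `requiresAggregation` result in as plain booleans.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the refined rule; these types are
// illustrative stand-ins, not the SystemDS Hop API.
enum ExecType { CP, SPARK }

final class Node {
    final String name;
    final ExecType etype;        // assumed to be already decided for every node
    final boolean isCheckpoint;  // stands in for "input instanceof DataOp"
    final List<Node> parents = new ArrayList<>();

    Node(String name, ExecType etype, boolean isCheckpoint) {
        this.name = name;
        this.etype = etype;
        this.isCheckpoint = isCheckpoint;
    }
}

final class UaggRule {
    // Decides whether a CP unary aggregate over 'input' should be pulled into Spark.
    // 'forcedCP' mirrors _etypeForced == CP; 'requiresAgg' mirrors requiresAggregation(...).
    static boolean pullIntoSpark(Node uagg, Node input, boolean forcedCP, boolean requiresAgg) {
        if (uagg.etype != ExecType.CP || forcedCP)
            return false; // only refine CP decisions that were not forced
        if (input.isCheckpoint || input.etype != ExecType.SPARK)
            return false; // input must be produced by a Spark operation
        boolean onlyParent = input.parents.size() == 1;
        // new in this patch: all other consumers of the input already run in Spark
        boolean othersInSpark = input.parents.stream()
            .filter(p -> p != uagg)
            .allMatch(p -> p.etype == ExecType.SPARK);
        return onlyParent || othersInSpark || !requiresAgg;
    }

    public static void main(String[] args) {
        Node x = new Node("X", ExecType.SPARK, false);
        Node colSums = new Node("colSums(X)", ExecType.CP, false);
        Node tx = new Node("t(X)", ExecType.SPARK, false);
        x.parents.add(colSums);
        x.parents.add(tx);
        System.out.println(pullIntoSpark(colSums, x, false, true)); // true: t(X) is in Spark

        x.parents.add(new Node("X %*% v", ExecType.CP, false));
        System.out.println(pullIntoSpark(colSums, x, false, true)); // false: a CP consumer exists
    }
}
```

Because `allMatch` returns `true` on an empty stream, the new condition already holds when the aggregate is the only parent. The explicit `size() == 1` check is kept here only to mirror the diff.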
---
src/main/java/org/apache/sysds/hops/AggUnaryOp.java | 5 ++++-
1 file changed, 4 insertions(+), 1 deletion(-)
diff --git a/src/main/java/org/apache/sysds/hops/AggUnaryOp.java b/src/main/java/org/apache/sysds/hops/AggUnaryOp.java
index 9c18f49..424f730 100644
--- a/src/main/java/org/apache/sysds/hops/AggUnaryOp.java
+++ b/src/main/java/org/apache/sysds/hops/AggUnaryOp.java
@@ -372,10 +372,13 @@ public class AggUnaryOp extends MultiThreadedHop
 		//spark-specific decision refinement (execute unary aggregate w/ spark input and
 		//single parent also in spark because it's likely cheap and reduces data transfer)
+		//we also allow multiple parents, if all other parents are already in Spark mode
 		if( _etype == ExecType.CP && _etypeForced != ExecType.CP
 			&& !(getInput().get(0) instanceof DataOp) //input is not checkpoint
 			&& (getInput().get(0).getParent().size()==1 //uagg is only parent, or
-			   || !requiresAggregation(getInput().get(0), _direction)) //w/o agg
+			   || getInput().get(0).getParent().stream().filter(h -> h != this)
+				.allMatch(h -> h.optFindExecType() == ExecType.SPARK)
+			   || !requiresAggregation(getInput().get(0), _direction)) //w/o agg
 			&& getInput().get(0).optFindExecType() == ExecType.SPARK )
 		{
 			//pull unary aggregate into spark