This is an automated email from the ASF dual-hosted git repository.
arnabp20 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new db383202d9 [SYSTEMDS-3466] Asynchronous (Future-based) execution of Spark instructions
db383202d9 is described below
commit db383202d9fb45ea4ba2fe0877eb1cad30503b12
Author: Arnab Phani <[email protected]>
AuthorDate: Wed Nov 16 18:32:10 2022 +0100
[SYSTEMDS-3466] Asynchronous (Future-based) execution of Spark instructions
This patch introduces future-based asynchronous execution of Spark actions.
We wrap the result matrix block in a future, create a matrix object handle
for it, and maintain that handle in the symbol table. This extension allows
triggering a chain of Spark instructions asynchronously and fetching the
results only when they are needed. For now, this path is used only for
single-block unary aggregates when prefetching is enabled.
TODO: Account for the memory required by the future results, and maintain
the lineage of broadcast variables to avoid their premature removal.
Closes #1733
---
.../controlprogram/context/ExecutionContext.java | 11 +++
.../controlprogram/context/MatrixObjectFuture.java | 87 ++++++++++++++++++++++
.../spark/AggregateUnarySPInstruction.java | 76 ++++++++++++++++---
3 files changed, 164 insertions(+), 10 deletions(-)
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/context/ExecutionContext.java b/src/main/java/org/apache/sysds/runtime/controlprogram/context/ExecutionContext.java
index 7e3076c620..63eb3baa8d 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/context/ExecutionContext.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/context/ExecutionContext.java
@@ -67,6 +67,7 @@ import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.Future;
import java.util.stream.Collectors;
public class ExecutionContext {
@@ -601,6 +602,16 @@ public class ExecutionContext {
mo.release();
}
+	/**
+	 * Binds {@code varName} to a matrix object whose block is produced asynchronously.
+	 * If the variable already exists, its value type, file name and metadata are
+	 * carried over to the new future-backed object, which then replaces it in the
+	 * symbol table.
+	 *
+	 * @param varName name of the output variable
+	 * @param fmb future that yields the output matrix block
+	 */
+	public void setMatrixOutput(String varName, Future<MatrixBlock> fmb) {
+		if (isAutoCreateVars() && !containsVariable(varName)) {
+			//no existing variable to copy metadata from: create and register a
+			//fresh future-backed handle (it must be bound, not just constructed)
+			setVariable(varName, new MatrixObjectFuture(Types.ValueType.FP64,
+				OptimizerUtils.getUniqueTempFileName(), fmb));
+			return;
+		}
+		//reuse value type, file name and metadata of the existing output variable
+		MatrixObject mo = getMatrixObject(varName);
+		setVariable(varName, new MatrixObjectFuture(mo, fmb));
+	}
+
public void setMatrixOutput(String varName, MatrixBlock outputData,
UpdateType flag) {
if( isAutoCreateVars() && !containsVariable(varName) )
setVariable(varName, createMatrixObject(outputData));
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/context/MatrixObjectFuture.java b/src/main/java/org/apache/sysds/runtime/controlprogram/context/MatrixObjectFuture.java
new file mode 100644
index 0000000000..3cbc7eff09
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/context/MatrixObjectFuture.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.controlprogram.context;
+
+import org.apache.sysds.common.Types.ValueType;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+
+import java.util.concurrent.Future;
+
+/**
+ * Matrix object whose matrix block is produced asynchronously (e.g. by a Spark
+ * job submitted to an executor) and exposed through a {@link Future}. Reading the
+ * block waits until the asynchronous computation has finished.
+ */
+public class MatrixObjectFuture extends MatrixObject
+{
+	private static final String RELEASED_MSG =
+		"Future matrix block already released or cleared.";
+
+	/** Pending result; set to null by {@link #release()} and {@link #clearData(long)}. */
+	protected Future<MatrixBlock> _futureData;
+
+	/**
+	 * Creates a future-backed matrix object without metadata.
+	 *
+	 * @param vt value type of the matrix
+	 * @param file file name associated with this object
+	 * @param fmb future that yields the matrix block
+	 */
+	public MatrixObjectFuture(ValueType vt, String file, Future<MatrixBlock> fmb) {
+		super(vt, file, null);
+		_futureData = fmb;
+	}
+
+	/**
+	 * Creates a future-backed matrix object that reuses the value type, file name
+	 * and metadata of an existing matrix object.
+	 *
+	 * @param mo matrix object to copy value type, file name and metadata from
+	 * @param fmb future that yields the matrix block
+	 */
+	public MatrixObjectFuture(MatrixObject mo, Future<MatrixBlock> fmb) {
+		super(mo.getValueType(), mo.getFileName(), mo.getMetaData());
+		_futureData = fmb;
+	}
+
+	/**
+	 * Waits for and returns the matrix block without calling {@code acquire}.
+	 *
+	 * @throws DMLRuntimeException if the future was released/cleared, the wait was
+	 *   interrupted, or the asynchronous computation failed
+	 */
+	MatrixBlock getMatrixBlock() {
+		return awaitFutureData();
+	}
+
+	/**
+	 * Acquires this object for reading and waits for the asynchronous result.
+	 *
+	 * @return the matrix block produced by the future
+	 * @throws DMLRuntimeException if the object is not readable, holds an in-memory
+	 *   block, was already released/cleared, or the computation failed
+	 */
+	@Override
+	public MatrixBlock acquireRead() {
+		return acquireReadIntern();
+	}
+
+	// NOTE(review): the object's monitor stays held while waiting on the future, so
+	// other synchronized methods (e.g. clearData) block until the result arrives.
+	private synchronized MatrixBlock acquireReadIntern() {
+		//validate state before touching the future so these errors reach the caller
+		//unwrapped and acquire() is not applied for a read that cannot succeed
+		if(!isAvailableToRead())
+			throw new DMLRuntimeException("MatrixObject not available to read.");
+		if(_data != null)
+			throw new DMLRuntimeException("_data must be null for future matrix object/block.");
+		if(_futureData == null)
+			throw new DMLRuntimeException(RELEASED_MSG);
+		acquire(false, false);
+		return awaitFutureData();
+	}
+
+	/** Blocks on the pending future and returns its result. */
+	private MatrixBlock awaitFutureData() {
+		Future<MatrixBlock> future = _futureData;
+		if(future == null)
+			throw new DMLRuntimeException(RELEASED_MSG);
+		try {
+			return future.get();
+		}
+		catch(InterruptedException e) {
+			//restore the interrupt flag so callers further up can still observe it
+			Thread.currentThread().interrupt();
+			throw new DMLRuntimeException(e);
+		}
+		catch(Exception e) {
+			throw new DMLRuntimeException(e);
+		}
+	}
+
+	/**
+	 * Drops the reference to the future; the block cannot be read through this
+	 * handle afterwards.
+	 */
+	// NOTE(review): this does not call super.release(), so the state change made by
+	// acquire(false, false) in acquireReadIntern is not undone here — confirm intent.
+	@Override
+	public void release() {
+		releaseIntern();
+	}
+
+	private synchronized void releaseIntern() {
+		_futureData = null;
+	}
+
+	/**
+	 * Clears the in-memory block, the pending future and the cache state, and
+	 * marks this object as empty and not dirty.
+	 */
+	@Override
+	public synchronized void clearData(long tid) {
+		_data = null;
+		_futureData = null;
+		clearCache();
+		setCacheLineage(null);
+		setDirty(false);
+		setEmpty();
+	}
+}
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/AggregateUnarySPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/AggregateUnarySPInstruction.java
index 38b032ca67..52bab3958f 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/spark/AggregateUnarySPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/AggregateUnarySPInstruction.java
@@ -25,6 +25,7 @@ import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.sysds.common.Types;
import org.apache.sysds.common.Types.CorrectionLocationType;
+import org.apache.sysds.conf.ConfigurationManager;
import org.apache.sysds.hops.AggBinaryOp.SparkAggType;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
@@ -43,9 +44,15 @@ import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
import org.apache.sysds.runtime.matrix.data.OperationsOnMatrixValues;
import org.apache.sysds.runtime.matrix.operators.AggregateOperator;
import org.apache.sysds.runtime.matrix.operators.AggregateUnaryOperator;
+import org.apache.sysds.runtime.matrix.operators.Operator;
import org.apache.sysds.runtime.meta.DataCharacteristics;
+import org.apache.sysds.runtime.util.CommonThreadPool;
import scala.Tuple2;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
public class AggregateUnarySPInstruction extends UnarySPInstruction {
private SparkAggType _aggtype = null;
private AggregateOperator _aop = null;
@@ -102,19 +109,36 @@ public class AggregateUnarySPInstruction extends UnarySPInstruction {
//perform aggregation if necessary and put output into symbol
table
if( _aggtype == SparkAggType.SINGLE_BLOCK )
{
- if( auop.sparseSafe )
- out = out.filter(new
FilterNonEmptyBlocksFunction());
+			if (ConfigurationManager.isPrefetchEnabled()) {
+				//Trigger the chain of Spark operations asynchronously and keep a future
+				//to the result in the symbol table instead of the materialized block.
+				//TODO: Reserve memory for the future matrix block
+				try {
+					// NOTE(review): unsynchronized lazy initialization; concurrent callers
+					// could each create a pool — confirm this path is single-threaded.
+					if(CommonThreadPool.triggerRemoteOPsPool == null)
+						CommonThreadPool.triggerRemoteOPsPool = Executors.newCachedThreadPool();
+					//aggregate 'out' (not 'in') so the asynchronous path sees the same RDD
+					//as the synchronous else-branch, including any transformations applied
+					//to 'out' earlier in this method
+					RDDAggregateTask task = new RDDAggregateTask(_optr, _aop, out, mc);
+					Future<MatrixBlock> futureOut = CommonThreadPool.triggerRemoteOPsPool.submit(task);
+					sec.setMatrixOutput(output.getName(), futureOut);
+				}
+				catch(Exception ex) {
+					throw new DMLRuntimeException(ex);
+				}
+			}
- JavaRDD<MatrixBlock> out2 = out.map(
- new RDDUAggFunction2(auop,
mc.getBlocksize()));
- MatrixBlock out3 = RDDAggregateUtils.aggStable(out2,
aggop);
+ else {
+ if( auop.sparseSafe )
+ out = out.filter(new
FilterNonEmptyBlocksFunction());
- //drop correction after aggregation
- out3.dropLastRowsOrColumns(aggop.correction);
+ JavaRDD<MatrixBlock> out2 = out.map(
+ new RDDUAggFunction2(auop,
mc.getBlocksize()));
+ MatrixBlock out3 =
RDDAggregateUtils.aggStable(out2, aggop);
- //put output block into symbol table (no lineage
because single block)
- //this also includes implicit maintenance of matrix
characteristics
- sec.setMatrixOutput(output.getName(), out3);
+ //drop correction after aggregation
+ out3.dropLastRowsOrColumns(aggop.correction);
+
+ //put output block into symbol table (no
lineage because single block)
+ //this also includes implicit maintenance of
matrix characteristics
+ sec.setMatrixOutput(output.getName(), out3);
+ }
}
else //MULTI_BLOCK or NONE
{
@@ -337,4 +361,36 @@ public class AggregateUnarySPInstruction extends UnarySPInstruction {
return out;
}
}
+
+	/**
+	 * Task computing the single-block unary aggregate of an input RDD, so that it
+	 * can be submitted to an executor and its result consumed later via a Future.
+	 */
+	private static class RDDAggregateTask implements Callable<MatrixBlock>
+	{
+		private final Operator _optr;
+		private final AggregateOperator _aop;
+		private final JavaPairRDD<MatrixIndexes, MatrixBlock> _in;
+		private final DataCharacteristics _mc;
+
+		/**
+		 * @param optr unary aggregate operator; must be an {@link AggregateUnaryOperator}
+		 * @param aop aggregate operator passed to the stable aggregation of the partial blocks
+		 * @param input input blocks to aggregate
+		 * @param dc data characteristics of the input (supplies the block size)
+		 */
+		RDDAggregateTask(Operator optr, AggregateOperator aop,
+			JavaPairRDD<MatrixIndexes, MatrixBlock> input, DataCharacteristics dc) {
+			_optr = optr;
+			_aop = aop;
+			_in = input;
+			_mc = dc;
+		}
+
+		/**
+		 * Runs the aggregation and returns the single result block with its
+		 * correction rows/columns dropped.
+		 */
+		@Override
+		public MatrixBlock call() {
+			AggregateUnaryOperator auop = (AggregateUnaryOperator) _optr;
+			JavaPairRDD<MatrixIndexes, MatrixBlock> blocks = _in;
+			//for sparse-safe operators, only non-empty blocks are aggregated
+			if( auop.sparseSafe )
+				blocks = blocks.filter(new FilterNonEmptyBlocksFunction());
+
+			JavaRDD<MatrixBlock> partials = blocks.map(
+				new RDDUAggFunction2(auop, _mc.getBlocksize()));
+			MatrixBlock result = RDDAggregateUtils.aggStable(partials, _aop);
+
+			//drop correction after aggregation
+			result.dropLastRowsOrColumns(_aop.correction);
+			return result;
+		}
+	}
}