Repository: systemml Updated Branches: refs/heads/master 95cbbd656 -> 7a3447a50
[SYSTEMML-2500] Async matrix allocation on Spark RDD collect This patch introduces a general performance improvement for RDD collect operations into driver memory by interleaving the matrix allocation with the collect (and pending RDD evaluation). This is generally useful because it reduces the serial fraction of parallel programs. For example, for 100 distributed sum(cumsum(X)) operations, it reduced the total runtime from 1,102s to 1,006s. Project: http://git-wip-us.apache.org/repos/asf/systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/77a7ef15 Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/77a7ef15 Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/77a7ef15 Branch: refs/heads/master Commit: 77a7ef155d5f3546d053c7f3d11b1ff3b8021834 Parents: 95cbbd6 Author: Matthias Boehm <mboe...@gmail.com> Authored: Sat Dec 1 17:08:45 2018 +0100 Committer: Matthias Boehm <mboe...@gmail.com> Committed: Sat Dec 1 17:08:45 2018 +0100 ---------------------------------------------------------------------- .../controlprogram/caching/LazyWriteBuffer.java | 4 ++++ .../controlprogram/context/SparkExecutionContext.java | 14 ++++++++++---- .../org/apache/sysml/runtime/io/IOUtilFunctions.java | 10 ++++++++++ .../apache/sysml/runtime/matrix/data/MatrixBlock.java | 10 ++++++++++ 4 files changed, 34 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/systemml/blob/77a7ef15/src/main/java/org/apache/sysml/runtime/controlprogram/caching/LazyWriteBuffer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/LazyWriteBuffer.java b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/LazyWriteBuffer.java index 391f21a..d1dc801 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/LazyWriteBuffer.java +++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/LazyWriteBuffer.java @@ -272,6 +272,10 @@ public class LazyWriteBuffer } } + public static ExecutorService getUtilThreadPool() { + return _fClean != null ? _fClean._pool : null; + } + /** * Extended LinkedHashMap with convenience methods for adding and removing * last/first entries. http://git-wip-us.apache.org/repos/asf/systemml/blob/77a7ef15/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java index 8981c87..b04aad0 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java @@ -24,6 +24,7 @@ import java.util.Arrays; import java.util.HashMap; import java.util.LinkedList; import java.util.List; +import java.util.concurrent.Future; import java.util.stream.Collectors; import java.util.stream.LongStream; @@ -46,7 +47,6 @@ import org.apache.sysml.conf.ConfigurationManager; import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM; import org.apache.sysml.api.mlcontext.MLContext; import org.apache.sysml.api.mlcontext.MLContextUtil; -import org.apache.sysml.conf.ConfigurationManager; import org.apache.sysml.hops.OptimizerUtils; import org.apache.sysml.lops.Checkpoint; import org.apache.sysml.parser.Expression.ValueType; @@ -72,6 +72,7 @@ import org.apache.sysml.runtime.instructions.spark.functions.CreateSparseBlockFu import org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils.LongFrameToLongWritableFrameFunction; import org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils; import org.apache.sysml.runtime.instructions.spark.utils.SparkUtils; 
+import org.apache.sysml.runtime.io.IOUtilFunctions; import org.apache.sysml.runtime.matrix.MatrixCharacteristics; import org.apache.sysml.runtime.matrix.data.FrameBlock; import org.apache.sysml.runtime.matrix.data.InputInfo; @@ -824,7 +825,7 @@ public class SparkExecutionContext extends ExecutionContext long t0 = ConfigurationManager.isStatistics() ? System.nanoTime() : 0; MatrixBlock out = null; - + if( rlen <= brlen && clen <= bclen ) //SINGLE BLOCK { //special case without copy and nnz maintenance @@ -846,9 +847,14 @@ public class SparkExecutionContext extends ExecutionContext //create output matrix block (w/ lazy allocation) out = new MatrixBlock(rlen, clen, sparse, lnnz); - + + //kickoff asynchronous allocation + Future<MatrixBlock> fout = out.allocateBlockAsync(); + + //trigger pending RDD operations and collect blocks List<Tuple2<MatrixIndexes,MatrixBlock>> list = rdd.collect(); - + out = IOUtilFunctions.get(fout); //wait for allocation + //copy blocks one-at-a-time into output matrix block long aNnz = 0; for( Tuple2<MatrixIndexes,MatrixBlock> keyval : list ) http://git-wip-us.apache.org/repos/asf/systemml/blob/77a7ef15/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java b/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java index 4d7e133..6f76d3f 100644 --- a/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java +++ b/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java @@ -31,6 +31,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; import java.util.LinkedList; +import java.util.concurrent.Future; import org.apache.commons.io.input.ReaderInputStream; import org.apache.commons.lang.StringUtils; @@ -624,4 +625,13 @@ public class IOUtilFunctions buff.get(ret, buff.position(), len); return ret; } + + public static <T> T get(Future<T> in) { + 
try { + return in.get(); + } + catch(Exception e) { + throw new DMLRuntimeException(e); + } + } } http://git-wip-us.apache.org/repos/asf/systemml/blob/77a7ef15/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java index ad59e20..ae5ab84 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java @@ -31,8 +31,11 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.Iterator; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.stream.IntStream; +import org.apache.commons.lang3.concurrent.ConcurrentUtils; import org.apache.commons.math3.random.Well1024a; import org.apache.hadoop.io.DataInputBuffer; import org.apache.sysml.conf.ConfigurationManager; @@ -42,6 +45,7 @@ import org.apache.sysml.lops.MapMultChain.ChainType; import org.apache.sysml.lops.PartialAggregate.CorrectionLocationType; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.controlprogram.caching.CacheBlock; +import org.apache.sysml.runtime.controlprogram.caching.LazyWriteBuffer; import org.apache.sysml.runtime.controlprogram.caching.MatrixObject.UpdateType; import org.apache.sysml.runtime.functionobjects.Builtin; import org.apache.sysml.runtime.functionobjects.Builtin.BuiltinCode; @@ -332,6 +336,12 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab allocateDenseBlock( true ); return this; } + + public Future<MatrixBlock> allocateBlockAsync() { + ExecutorService pool = LazyWriteBuffer.getUtilThreadPool(); + return (pool != null) ? 
pool.submit(() -> allocateBlock()) : //async + ConcurrentUtils.constantFuture(allocateBlock()); //fallback sync + } public MatrixBlock allocateBlock() { if( sparse )