Repository: systemml
Updated Branches:
  refs/heads/master bda61b600 -> 1a58946a0


[SYSTEMML-2503/04] Fix correctness of in-place and broadcast cumagg ops

This patch fixes correctness issues of in-place cumulative aggregate
operations as well as the handling of lineage tracing on spark cumagg
offset. In addition, the patch also includes a minor performance
improvement that avoids unnecessary copying of offset vectors on cumagg
offset operations.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/1a58946a
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/1a58946a
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/1a58946a

Branch: refs/heads/master
Commit: 1a58946a0a335ccae61d0cf3873a937467ae5544
Parents: bda61b6
Author: Matthias Boehm <mboe...@gmail.com>
Authored: Sat Dec 8 13:40:33 2018 +0100
Committer: Matthias Boehm <mboe...@gmail.com>
Committed: Sat Dec 8 13:40:33 2018 +0100

----------------------------------------------------------------------
 .../instructions/spark/CumulativeOffsetSPInstruction.java |  9 ++++++---
 .../apache/sysml/runtime/matrix/data/LibMatrixAgg.java    | 10 ++++++----
 .../org/apache/sysml/runtime/matrix/data/MatrixBlock.java |  4 ++--
 .../java/org/apache/sysml/runtime/util/DataConverter.java |  9 ++++++++-
 4 files changed, 22 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/1a58946a/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeOffsetSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeOffsetSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeOffsetSPInstruction.java
index 1b26060..3dba53e 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeOffsetSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeOffsetSPInstruction.java
@@ -32,6 +32,7 @@ import scala.Tuple2;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
 import org.apache.sysml.runtime.functionobjects.Builtin;
+import org.apache.sysml.runtime.functionobjects.Builtin.BuiltinCode;
 import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
 import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcast;
@@ -94,8 +95,9 @@ public class CumulativeOffsetSPInstruction extends 
BinarySPInstruction {
                //get and join inputs
                JavaPairRDD<MatrixIndexes,MatrixBlock> inData = 
sec.getBinaryBlockRDDHandleForVariable(input1.getName());
                JavaPairRDD<MatrixIndexes,Tuple2<MatrixBlock,MatrixBlock>> 
joined = null;
+               boolean broadcast = _broadcast && 
!SparkUtils.isHashPartitioned(inData);
                
-               if( _broadcast && !SparkUtils.isHashPartitioned(inData) ) {
+               if( broadcast ) {
                        //broadcast offsets and broadcast join with data
                        PartitionedBroadcast<MatrixBlock> inAgg = 
sec.getBroadcastForVariable(input2.getName());
                        joined = inData.mapToPair(new 
RDDCumSplitLookupFunction(inAgg,_initValue, rlen, brlen));
@@ -119,7 +121,7 @@ public class CumulativeOffsetSPInstruction extends 
BinarySPInstruction {
                        updateUnaryOutputMatrixCharacteristics(sec);
                sec.setRDDHandleForVariable(output.getName(), out);
                sec.addLineageRDD(output.getName(), input1.getName());
-               sec.addLineage(output.getName(), input2.getName(), _broadcast);
+               sec.addLineage(output.getName(), input2.getName(), broadcast);
        }
 
        private static class RDDCumSplitFunction implements 
PairFlatMapFunction<Tuple2<MatrixIndexes, MatrixBlock>, MatrixIndexes, 
MatrixBlock> 
@@ -229,7 +231,8 @@ public class CumulativeOffsetSPInstruction extends 
BinarySPInstruction {
                        
                        //blockwise cumagg computation, incl offset aggregation
                        return LibMatrixAgg.cumaggregateUnaryMatrix(dblkIn, 
blkOut, _uop,
-                               DataConverter.convertToDoubleVector(oblkIn));
+                               DataConverter.convertToDoubleVector(oblkIn, 
false,
+                               ((Builtin)_uop.fn).bFunc == 
BuiltinCode.CUMSUM));
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/systemml/blob/1a58946a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java 
b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java
index 5e785d9..ed7d8f1 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java
@@ -294,14 +294,16 @@ public class LibMatrixAgg
                final int n2 = out.clen;
                
                //filter empty input blocks (incl special handling for 
sparse-unsafe operations)
-               if( in.isEmptyBlock(false) && (agg == null || aggtype == 
AggType.CUM_SUM_PROD ) ) {
+               if( in.isEmpty() && (agg == null || aggtype == 
AggType.CUM_SUM_PROD) ) {
                        return aggregateUnaryMatrixEmpty(in, out, aggtype, 
null);
                }
                
                //allocate output arrays (if required)
-               if( !uop.isInplace() || in.isInSparseFormat() ) {
+               if( !uop.isInplace() || in.isInSparseFormat() || in.isEmpty() ) 
{
                        out.reset(m2, n2, false); //always dense
                        out.allocateDenseBlock();
+                       if( in.isEmpty() )
+                               in.allocateBlock();
                }
                else {
                        out = in;
@@ -340,14 +342,14 @@ public class LibMatrixAgg
                final int mk = aggtype==AggType.CUM_KAHAN_SUM?2:1;
                
                //filter empty input blocks (incl special handling for 
sparse-unsafe operations)
-               if( in.isEmptyBlock(false) ){
+               if( in.isEmpty() ){
                        return aggregateUnaryMatrixEmpty(in, out, aggtype, 
null);
                }
 
                //Timing time = new Timing(true);
                
                //allocate output arrays (if required)
-               if( !uop.isInplace() || in.isInSparseFormat() ) {
+               if( !uop.isInplace() || in.isInSparseFormat() || in.isEmpty() ) 
{
                        out.reset(m2, n2, false); //always dense
                        out.allocateDenseBlock();
                }

http://git-wip-us.apache.org/repos/asf/systemml/blob/1a58946a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java 
b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
index 7e93598..d1f9ac9 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
@@ -2656,9 +2656,9 @@ public class MatrixBlock extends MatrixValue implements 
CacheBlock, Externalizab
                if( LibMatrixAgg.isSupportedUnaryOperator(op) ) {
                        //e.g., cumsum/cumprod/cummin/cumax/cumsumprod
                        if( op.getNumThreads() > 1 )
-                               LibMatrixAgg.cumaggregateUnaryMatrix(this, ret, 
op, op.getNumThreads());
+                               ret = 
LibMatrixAgg.cumaggregateUnaryMatrix(this, ret, op, op.getNumThreads());
                        else
-                               LibMatrixAgg.cumaggregateUnaryMatrix(this, ret, 
op);
+                               ret = 
LibMatrixAgg.cumaggregateUnaryMatrix(this, ret, op);
                }
                else if(!sparse && !isEmptyBlock(false)
                        && 
OptimizerUtils.isMaxLocalParallelism(op.getNumThreads())) {

http://git-wip-us.apache.org/repos/asf/systemml/blob/1a58946a/src/main/java/org/apache/sysml/runtime/util/DataConverter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/DataConverter.java 
b/src/main/java/org/apache/sysml/runtime/util/DataConverter.java
index f4d3fb5..0c99959 100644
--- a/src/main/java/org/apache/sysml/runtime/util/DataConverter.java
+++ b/src/main/java/org/apache/sysml/runtime/util/DataConverter.java
@@ -344,8 +344,15 @@ public class DataConverter
                return convertToDoubleVector(mb, true);
        }
        
-       public static double[] convertToDoubleVector( MatrixBlock mb, boolean 
deep )
+       public static double[] convertToDoubleVector( MatrixBlock mb, boolean 
deep ) {
+               return convertToDoubleVector(mb, deep, false);
+       }
+       
+       public static double[] convertToDoubleVector( MatrixBlock mb, boolean 
deep, boolean allowNull )
        {
+               if( mb.isEmpty() && allowNull )
+                       return null;
+               
                int rows = mb.getNumRows();
                int cols = mb.getNumColumns();
                double[] ret = (!mb.isInSparseFormat() && mb.isAllocated() && 
!deep) ? 

Reply via email to