Repository: systemml
Updated Branches:
  refs/heads/master d2efa65c8 -> 11e460573


[MINOR] Replace QuaternaryOp copy/pasted code with methods

Closes #640.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/11e46057
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/11e46057
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/11e46057

Branch: refs/heads/master
Commit: 11e4605735f139b18b008e34a2d505397097389c
Parents: d2efa65
Author: Deron Eriksson <de...@apache.org>
Authored: Mon Aug 28 10:19:48 2017 -0700
Committer: Deron Eriksson <de...@apache.org>
Committed: Mon Aug 28 10:19:48 2017 -0700

----------------------------------------------------------------------
 .../org/apache/sysml/hops/QuaternaryOp.java     | 368 +++++--------------
 1 file changed, 88 insertions(+), 280 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/11e46057/src/main/java/org/apache/sysml/hops/QuaternaryOp.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/QuaternaryOp.java b/src/main/java/org/apache/sysml/hops/QuaternaryOp.java
index 17188be..bfbaae7 100644
--- a/src/main/java/org/apache/sysml/hops/QuaternaryOp.java
+++ b/src/main/java/org/apache/sysml/hops/QuaternaryOp.java
@@ -321,6 +321,74 @@ public class QuaternaryOp extends Hop implements MultiThreadedHop
                setLops( wsloss );
        }
 
+       private Lop obtainlU(Hop U, Hop V, boolean cacheU, double m1Size) 
throws HopsException, LopsException {
+               Lop lU = null;
+               if (cacheU) {
+                       // partitioning of U for read through distributed cache
+                       boolean needPartU = !U.dimsKnown() || U.getDim1() * 
U.getDim2() > DistributedCacheInput.PARTITION_SIZE;
+                       lU = U.constructLops();
+                       if (needPartU) { // requires partitioning
+                               lU = new DataPartition(lU, DataType.MATRIX, 
ValueType.DOUBLE,
+                                               (m1Size > 
OptimizerUtils.getLocalMemBudget()) ? ExecType.MR : ExecType.CP,
+                                               
PDataPartitionFormat.ROW_BLOCK_WISE_N);
+                               
lU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), 
getRowsInBlock(), getColsInBlock(),
+                                               U.getNnz());
+                               setLineNumbers(lU);
+                       }
+               } else {
+                       // replication of U for shuffle to target block
+                       Lop offset = createOffsetLop(V, false); // ncol of t(V) 
-> nrow of V determines num replicates
+                       lU = new RepMat(U.constructLops(), offset, true, 
V.getDataType(), V.getValueType());
+                       lU.getOutputParameters().setDimensions(U.getDim1(), 
U.getDim2(), U.getRowsInBlock(), U.getColsInBlock(),
+                                       U.getNnz());
+                       setLineNumbers(lU);
+
+                       Group grpU = new Group(lU, Group.OperationTypes.Sort, 
DataType.MATRIX, ValueType.DOUBLE);
+                       grpU.getOutputParameters().setDimensions(U.getDim1(), 
U.getDim2(), U.getRowsInBlock(), U.getColsInBlock(),
+                                       -1);
+                       setLineNumbers(grpU);
+                       lU = grpU;
+               }
+               return lU;
+       }
+
+       private Lop obtainlV(Hop U, Hop V, boolean cacheV, double m2Size) 
throws HopsException, LopsException {
+               Lop lV = null;
+               if (cacheV) {
+                       // partitioning of V for read through distributed cache
+                       boolean needPartV = !V.dimsKnown() || V.getDim1() * 
V.getDim2() > DistributedCacheInput.PARTITION_SIZE;
+                       lV = V.constructLops();
+                       if (needPartV) { // requires partitioning
+                               lV = new DataPartition(lV, DataType.MATRIX, 
ValueType.DOUBLE,
+                                               (m2Size > 
OptimizerUtils.getLocalMemBudget()) ? ExecType.MR : ExecType.CP,
+                                               
PDataPartitionFormat.ROW_BLOCK_WISE_N);
+                               
lV.getOutputParameters().setDimensions(V.getDim1(), V.getDim2(), 
getRowsInBlock(), getColsInBlock(),
+                                               V.getNnz());
+                               setLineNumbers(lV);
+                       }
+               } else {
+                       // replication of t(V) for shuffle to target block
+                       Transform ltV = new Transform(V.constructLops(), 
HopsTransf2Lops.get(ReOrgOp.TRANSPOSE), getDataType(),
+                                       getValueType(), ExecType.MR);
+                       ltV.getOutputParameters().setDimensions(V.getDim2(), 
V.getDim1(), V.getColsInBlock(), V.getRowsInBlock(),
+                                       V.getNnz());
+                       setLineNumbers(ltV);
+
+                       Lop offset = createOffsetLop(U, false); // nrow of U 
determines num replicates
+                       lV = new RepMat(ltV, offset, false, V.getDataType(), 
V.getValueType());
+                       lV.getOutputParameters().setDimensions(V.getDim2(), 
V.getDim1(), V.getColsInBlock(), V.getRowsInBlock(),
+                                       V.getNnz());
+                       setLineNumbers(lV);
+
+                       Group grpV = new Group(lV, Group.OperationTypes.Sort, 
DataType.MATRIX, ValueType.DOUBLE);
+                       grpV.getOutputParameters().setDimensions(V.getDim2(), 
V.getDim1(), V.getColsInBlock(), V.getRowsInBlock(),
+                                       -1);
+                       setLineNumbers(grpV);
+                       lV = grpV;
+               }
+               return lV;
+       }
+
        private void constructMRLopsWeightedSquaredLoss(WeightsType wtype) 
                throws HopsException, LopsException
        {
@@ -395,62 +463,10 @@ public class QuaternaryOp extends Hop implements MultiThreadedHop
                                
grpW.getOutputParameters().setDimensions(W.getDim1(), W.getDim2(), 
W.getRowsInBlock(), W.getColsInBlock(), -1);
                                setLineNumbers(grpW);
                        }
-                       
-                       Lop lU = null;
-                       if( cacheU ) {
-                               //partitioning of U for read through 
distributed cache
-                               boolean needPartU = !U.dimsKnown() || 
U.getDim1() * U.getDim2() > DistributedCacheInput.PARTITION_SIZE;
-                               lU = U.constructLops();
-                               if( needPartU ){ //requires partitioning
-                                       lU = new DataPartition(lU, 
DataType.MATRIX, ValueType.DOUBLE, 
(m1Size>OptimizerUtils.getLocalMemBudget())?ExecType.MR:ExecType.CP, 
PDataPartitionFormat.ROW_BLOCK_WISE_N);
-                                       
lU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), 
getRowsInBlock(), getColsInBlock(), U.getNnz());
-                                       setLineNumbers(lU);     
-                               }
-                       }
-                       else {
-                               //replication of U for shuffle to target block
-                               Lop offset = createOffsetLop(V, false); //ncol 
of t(V) -> nrow of V determines num replicates
-                               lU = new RepMat(U.constructLops(), offset, 
true, V.getDataType(), V.getValueType());
-                               
lU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), 
-                                               U.getRowsInBlock(), 
U.getColsInBlock(), U.getNnz());
-                               setLineNumbers(lU);
-                               
-                               Group grpU = new Group(lU, 
Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE);
-                               
grpU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), 
U.getRowsInBlock(), U.getColsInBlock(), -1);
-                               setLineNumbers(grpU);
-                               lU = grpU;
-                       }
-                       
-                       Lop lV = null;
-                       if( cacheV ) {
-                               //partitioning of V for read through 
distributed cache
-                               boolean needPartV = !V.dimsKnown() || 
V.getDim1() * V.getDim2() > DistributedCacheInput.PARTITION_SIZE;
-                               lV = V.constructLops();
-                               if( needPartV ){ //requires partitioning
-                                       lV = new DataPartition(lV, 
DataType.MATRIX, ValueType.DOUBLE, 
(m2Size>OptimizerUtils.getLocalMemBudget())?ExecType.MR:ExecType.CP, 
PDataPartitionFormat.ROW_BLOCK_WISE_N);
-                                       
lV.getOutputParameters().setDimensions(V.getDim1(), V.getDim2(), 
getRowsInBlock(), getColsInBlock(), V.getNnz());
-                                       setLineNumbers(lV);     
-                               }
-                       }
-                       else {
-                               //replication of t(V) for shuffle to target 
block
-                               Transform ltV = new Transform( 
V.constructLops(), HopsTransf2Lops.get(ReOrgOp.TRANSPOSE), getDataType(), 
getValueType(), ExecType.MR);
-                               
ltV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), 
-                                               V.getColsInBlock(), 
V.getRowsInBlock(), V.getNnz());
-                               setLineNumbers(ltV);
-                               
-                               Lop offset = createOffsetLop(U, false); //nrow 
of U determines num replicates
-                               lV = new RepMat(ltV, offset, false, 
V.getDataType(), V.getValueType());
-                               
lV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), 
-                                               V.getColsInBlock(), 
V.getRowsInBlock(), V.getNnz());
-                               setLineNumbers(lV);
-                               
-                               Group grpV = new Group(lV, 
Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE);
-                               
grpV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), 
V.getColsInBlock(), V.getRowsInBlock(), -1);
-                               setLineNumbers(grpV);
-                               lV = grpV;
-                       }
-                       
+
+                       Lop lU = obtainlU(U, V, cacheU, m1Size);
+                       Lop lV = obtainlV(U, V, cacheV, m2Size);
+
                        //reduce-side wsloss w/ or without broadcast
                        Lop wsloss = new WeightedSquaredLossR( 
                                        grpX, lU, lV, grpW, DataType.MATRIX, 
ValueType.DOUBLE, wtype, cacheU, cacheV, ExecType.MR);
@@ -596,62 +612,10 @@ public class QuaternaryOp extends Hop implements MultiThreadedHop
                        Group grpX = new Group(X.constructLops(), 
Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE);
                        grpX.getOutputParameters().setDimensions(X.getDim1(), 
X.getDim2(), X.getRowsInBlock(), X.getColsInBlock(), X.getNnz());
                        setLineNumbers(grpX);
-                       
-                       Lop lU = null;
-                       if( cacheU ) {
-                               //partitioning of U for read through 
distributed cache
-                               boolean needPartU = !U.dimsKnown() || 
U.getDim1() * U.getDim2() > DistributedCacheInput.PARTITION_SIZE;
-                               lU = U.constructLops();
-                               if( needPartU ){ //requires partitioning
-                                       lU = new DataPartition(lU, 
DataType.MATRIX, ValueType.DOUBLE, 
(m1Size>OptimizerUtils.getLocalMemBudget())?ExecType.MR:ExecType.CP, 
PDataPartitionFormat.ROW_BLOCK_WISE_N);
-                                       
lU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), 
getRowsInBlock(), getColsInBlock(), U.getNnz());
-                                       setLineNumbers(lU);     
-                               }
-                       }
-                       else {
-                               //replication of U for shuffle to target block
-                               Lop offset = createOffsetLop(V, false); //ncol 
of t(V) -> nrow of V determines num replicates
-                               lU = new RepMat(U.constructLops(), offset, 
true, V.getDataType(), V.getValueType());
-                               
lU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), 
-                                               U.getRowsInBlock(), 
U.getColsInBlock(), U.getNnz());
-                               setLineNumbers(lU);
-                               
-                               Group grpU = new Group(lU, 
Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE);
-                               
grpU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), 
U.getRowsInBlock(), U.getColsInBlock(), -1);
-                               setLineNumbers(grpU);
-                               lU = grpU;
-                       }
-                       
-                       Lop lV = null;
-                       if( cacheV ) {
-                               //partitioning of V for read through 
distributed cache
-                               boolean needPartV = !V.dimsKnown() || 
V.getDim1() * V.getDim2() > DistributedCacheInput.PARTITION_SIZE;
-                               lV = V.constructLops();
-                               if( needPartV ){ //requires partitioning
-                                       lV = new DataPartition(lV, 
DataType.MATRIX, ValueType.DOUBLE, 
(m2Size>OptimizerUtils.getLocalMemBudget())?ExecType.MR:ExecType.CP, 
PDataPartitionFormat.ROW_BLOCK_WISE_N);
-                                       
lV.getOutputParameters().setDimensions(V.getDim1(), V.getDim2(), 
getRowsInBlock(), getColsInBlock(), V.getNnz());
-                                       setLineNumbers(lV);     
-                               }
-                       }
-                       else {
-                               //replication of t(V) for shuffle to target 
block
-                               Transform ltV = new Transform( 
V.constructLops(), HopsTransf2Lops.get(ReOrgOp.TRANSPOSE), getDataType(), 
getValueType(), ExecType.MR);
-                               
ltV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), 
-                                               V.getColsInBlock(), 
V.getRowsInBlock(), V.getNnz());
-                               setLineNumbers(ltV);
-                               
-                               Lop offset = createOffsetLop(U, false); //nrow 
of U determines num replicates
-                               lV = new RepMat(ltV, offset, false, 
V.getDataType(), V.getValueType());
-                               
lV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), 
-                                               V.getColsInBlock(), 
V.getRowsInBlock(), V.getNnz());
-                               setLineNumbers(lV);
-                               
-                               Group grpV = new Group(lV, 
Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE);
-                               
grpV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), 
V.getColsInBlock(), V.getRowsInBlock(), -1);
-                               setLineNumbers(grpV);
-                               lV = grpV;
-                       }
-                       
+
+                       Lop lU = obtainlU(U, V, cacheU, m1Size);
+                       Lop lV = obtainlV(U, V, cacheV, m2Size);
+
                        //reduce-side wsig w/ or without broadcast
                        Lop wsigmoid = new WeightedSigmoidR( 
                                        grpX, lU, lV, DataType.MATRIX, 
ValueType.DOUBLE, wtype, cacheU, cacheV, ExecType.MR);
@@ -792,62 +756,10 @@ public class QuaternaryOp extends Hop implements MultiThreadedHop
                                grpX = new Group(grpX, 
Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE);
                        grpX.getOutputParameters().setDimensions(X.getDim1(), 
X.getDim2(), X.getRowsInBlock(), X.getColsInBlock(), X.getNnz());
                        setLineNumbers(grpX);
-                       
-                       Lop lU = null;
-                       if( cacheU ) {
-                               //partitioning of U for read through 
distributed cache
-                               boolean needPartU = !U.dimsKnown() || 
U.getDim1() * U.getDim2() > DistributedCacheInput.PARTITION_SIZE;
-                               lU = U.constructLops();
-                               if( needPartU ){ //requires partitioning
-                                       lU = new DataPartition(lU, 
DataType.MATRIX, ValueType.DOUBLE, 
(m1Size>OptimizerUtils.getLocalMemBudget())?ExecType.MR:ExecType.CP, 
PDataPartitionFormat.ROW_BLOCK_WISE_N);
-                                       
lU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), 
getRowsInBlock(), getColsInBlock(), U.getNnz());
-                                       setLineNumbers(lU);     
-                               }
-                       }
-                       else {
-                               //replication of U for shuffle to target block
-                               Lop offset = createOffsetLop(V, false); //ncol 
of t(V) -> nrow of V determines num replicates
-                               lU = new RepMat(U.constructLops(), offset, 
true, V.getDataType(), V.getValueType());
-                               
lU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), 
-                                               U.getRowsInBlock(), 
U.getColsInBlock(), U.getNnz());
-                               setLineNumbers(lU);
-                               
-                               Group grpU = new Group(lU, 
Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE);
-                               
grpU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), 
U.getRowsInBlock(), U.getColsInBlock(), -1);
-                               setLineNumbers(grpU);
-                               lU = grpU;
-                       }
-                       
-                       Lop lV = null;
-                       if( cacheV ) {
-                               //partitioning of V for read through 
distributed cache
-                               boolean needPartV = !V.dimsKnown() || 
V.getDim1() * V.getDim2() > DistributedCacheInput.PARTITION_SIZE;
-                               lV = V.constructLops();
-                               if( needPartV ){ //requires partitioning
-                                       lV = new DataPartition(lV, 
DataType.MATRIX, ValueType.DOUBLE, 
(m2Size>OptimizerUtils.getLocalMemBudget())?ExecType.MR:ExecType.CP, 
PDataPartitionFormat.ROW_BLOCK_WISE_N);
-                                       
lV.getOutputParameters().setDimensions(V.getDim1(), V.getDim2(), 
getRowsInBlock(), getColsInBlock(), V.getNnz());
-                                       setLineNumbers(lV);     
-                               }
-                       }
-                       else {
-                               //replication of t(V) for shuffle to target 
block
-                               Transform ltV = new Transform( 
V.constructLops(), HopsTransf2Lops.get(ReOrgOp.TRANSPOSE), getDataType(), 
getValueType(), ExecType.MR);
-                               
ltV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), 
-                                               V.getColsInBlock(), 
V.getRowsInBlock(), V.getNnz());
-                               setLineNumbers(ltV);
-                               
-                               Lop offset = createOffsetLop(U, false); //nrow 
of U determines num replicates
-                               lV = new RepMat(ltV, offset, false, 
V.getDataType(), V.getValueType());
-                               
lV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), 
-                                               V.getColsInBlock(), 
V.getRowsInBlock(), V.getNnz());
-                               setLineNumbers(lV);
-                               
-                               Group grpV = new Group(lV, 
Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE);
-                               
grpV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), 
V.getColsInBlock(), V.getRowsInBlock(), -1);
-                               setLineNumbers(grpV);
-                               lV = grpV;
-                       }
-                       
+
+                       Lop lU = obtainlU(U, V, cacheU, m1Size);
+                       Lop lV = obtainlV(U, V, cacheV, m2Size);
+
                        //reduce-side wdivmm w/ or without broadcast
                        Lop wdivmm = new WeightedDivMMR( grpW, lU, lV, grpX, 
                                        DataType.MATRIX, ValueType.DOUBLE, 
wtype, cacheU, cacheV, ExecType.MR);
@@ -1006,62 +918,10 @@ public class QuaternaryOp extends Hop implements MultiThreadedHop
                        Group grpX = new Group(X.constructLops(), 
Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE);
                        grpX.getOutputParameters().setDimensions(X.getDim1(), 
X.getDim2(), X.getRowsInBlock(), X.getColsInBlock(), -1);
                        setLineNumbers(grpX);
-                       
-                       Lop lU = null;
-                       if( cacheU ) {
-                               //partitioning of U for read through 
distributed cache
-                               boolean needPartU = !U.dimsKnown() || 
U.getDim1() * U.getDim2() > DistributedCacheInput.PARTITION_SIZE;
-                               lU = U.constructLops();
-                               if( needPartU ){ //requires partitioning
-                                       lU = new DataPartition(lU, 
DataType.MATRIX, ValueType.DOUBLE, 
(m1Size>OptimizerUtils.getLocalMemBudget())?ExecType.MR:ExecType.CP, 
PDataPartitionFormat.ROW_BLOCK_WISE_N);
-                                       
lU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), 
getRowsInBlock(), getColsInBlock(), U.getNnz());
-                                       setLineNumbers(lU);     
-                               }
-                       }
-                       else {
-                               //replication of U for shuffle to target block
-                               Lop offset = createOffsetLop(V, false); //ncol 
of t(V) -> nrow of V determines num replicates
-                               lU = new RepMat(U.constructLops(), offset, 
true, V.getDataType(), V.getValueType());
-                               
lU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), 
-                                               U.getRowsInBlock(), 
U.getColsInBlock(), U.getNnz());
-                               setLineNumbers(lU);
-                               
-                               Group grpU = new Group(lU, 
Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE);
-                               
grpU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), 
U.getRowsInBlock(), U.getColsInBlock(), -1);
-                               setLineNumbers(grpU);
-                               lU = grpU;
-                       }
-                       
-                       Lop lV = null;
-                       if( cacheV ) {
-                               //partitioning of V for read through 
distributed cache
-                               boolean needPartV = !V.dimsKnown() || 
V.getDim1() * V.getDim2() > DistributedCacheInput.PARTITION_SIZE;
-                               lV = V.constructLops();
-                               if( needPartV ){ //requires partitioning
-                                       lV = new DataPartition(lV, 
DataType.MATRIX, ValueType.DOUBLE, 
(m2Size>OptimizerUtils.getLocalMemBudget())?ExecType.MR:ExecType.CP, 
PDataPartitionFormat.ROW_BLOCK_WISE_N);
-                                       
lV.getOutputParameters().setDimensions(V.getDim1(), V.getDim2(), 
getRowsInBlock(), getColsInBlock(), V.getNnz());
-                                       setLineNumbers(lV);     
-                               }
-                       }
-                       else {
-                               //replication of t(V) for shuffle to target 
block
-                               Transform ltV = new Transform( 
V.constructLops(), HopsTransf2Lops.get(ReOrgOp.TRANSPOSE), getDataType(), 
getValueType(), ExecType.MR);
-                               
ltV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), 
-                                               V.getColsInBlock(), 
V.getRowsInBlock(), V.getNnz());
-                               setLineNumbers(ltV);
-                               
-                               Lop offset = createOffsetLop(U, false); //nrow 
of U determines num replicates
-                               lV = new RepMat(ltV, offset, false, 
V.getDataType(), V.getValueType());
-                               
lV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), 
-                                               V.getColsInBlock(), 
V.getRowsInBlock(), V.getNnz());
-                               setLineNumbers(lV);
-                               
-                               Group grpV = new Group(lV, 
Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE);
-                               
grpV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), 
V.getColsInBlock(), V.getRowsInBlock(), -1);
-                               setLineNumbers(grpV);
-                               lV = grpV;
-                       }
-                       
+
+                       Lop lU = obtainlU(U, V, cacheU, m1Size);
+                       Lop lV = obtainlV(U, V, cacheV, m2Size);
+
                        //reduce-side wcemm w/ or without broadcast
                        Lop wcemm = new WeightedCrossEntropyR( grpX, lU, lV, 
eps.constructLops(),
                                        DataType.MATRIX, ValueType.DOUBLE, 
wtype, cacheU, cacheV, ExecType.MR);
@@ -1215,62 +1075,10 @@ public class QuaternaryOp extends Hop implements MultiThreadedHop
                        Group grpX = new Group(X.constructLops(), 
Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE);
                        grpX.getOutputParameters().setDimensions(X.getDim1(), 
X.getDim2(), X.getRowsInBlock(), X.getColsInBlock(), X.getNnz());
                        setLineNumbers(grpX);
-                       
-                       Lop lU = null;
-                       if( cacheU ) {
-                               //partitioning of U for read through 
distributed cache
-                               boolean needPartU = !U.dimsKnown() || 
U.getDim1() * U.getDim2() > DistributedCacheInput.PARTITION_SIZE;
-                               lU = U.constructLops();
-                               if( needPartU ){ //requires partitioning
-                                       lU = new DataPartition(lU, 
DataType.MATRIX, ValueType.DOUBLE, 
(m1Size>OptimizerUtils.getLocalMemBudget())?ExecType.MR:ExecType.CP, 
PDataPartitionFormat.ROW_BLOCK_WISE_N);
-                                       
lU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), 
getRowsInBlock(), getColsInBlock(), U.getNnz());
-                                       setLineNumbers(lU);     
-                               }
-                       }
-                       else {
-                               //replication of U for shuffle to target block
-                               Lop offset = createOffsetLop(V, false); //ncol 
of t(V) -> nrow of V determines num replicates
-                               lU = new RepMat(U.constructLops(), offset, 
true, V.getDataType(), V.getValueType());
-                               
lU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), 
-                                               U.getRowsInBlock(), 
U.getColsInBlock(), U.getNnz());
-                               setLineNumbers(lU);
-                               
-                               Group grpU = new Group(lU, 
Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE);
-                               
grpU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), 
U.getRowsInBlock(), U.getColsInBlock(), -1);
-                               setLineNumbers(grpU);
-                               lU = grpU;
-                       }
-                       
-                       Lop lV = null;
-                       if( cacheV ) {
-                               //partitioning of V for read through 
distributed cache
-                               boolean needPartV = !V.dimsKnown() || 
V.getDim1() * V.getDim2() > DistributedCacheInput.PARTITION_SIZE;
-                               lV = V.constructLops();
-                               if( needPartV ){ //requires partitioning
-                                       lV = new DataPartition(lV, 
DataType.MATRIX, ValueType.DOUBLE, 
(m2Size>OptimizerUtils.getLocalMemBudget())?ExecType.MR:ExecType.CP, 
PDataPartitionFormat.ROW_BLOCK_WISE_N);
-                                       
lV.getOutputParameters().setDimensions(V.getDim1(), V.getDim2(), 
getRowsInBlock(), getColsInBlock(), V.getNnz());
-                                       setLineNumbers(lV);     
-                               }
-                       }
-                       else {
-                               //replication of t(V) for shuffle to target 
block
-                               Transform ltV = new Transform( 
V.constructLops(), HopsTransf2Lops.get(ReOrgOp.TRANSPOSE), getDataType(), 
getValueType(), ExecType.MR);
-                               
ltV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), 
-                                               V.getColsInBlock(), 
V.getRowsInBlock(), V.getNnz());
-                               setLineNumbers(ltV);
-                               
-                               Lop offset = createOffsetLop(U, false); //nrow 
of U determines num replicates
-                               lV = new RepMat(ltV, offset, false, 
V.getDataType(), V.getValueType());
-                               
lV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), 
-                                               V.getColsInBlock(), 
V.getRowsInBlock(), V.getNnz());
-                               setLineNumbers(lV);
-                               
-                               Group grpV = new Group(lV, 
Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE);
-                               
grpV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), 
V.getColsInBlock(), V.getRowsInBlock(), -1);
-                               setLineNumbers(grpV);
-                               lV = grpV;
-                       }
-                       
+
+                       Lop lU = obtainlU(U, V, cacheU, m1Size);
+                       Lop lV = obtainlV(U, V, cacheV, m2Size);
+
                        //reduce-side wumm w/ or without broadcast
                        Lop wumm = new WeightedUnaryMMR( 
                                        grpX, lU, lV, DataType.MATRIX, 
ValueType.DOUBLE, wtype, uop, cacheU, cacheV, ExecType.MR);

Reply via email to