Repository: systemml Updated Branches: refs/heads/master d2efa65c8 -> 11e460573
[MINOR] Replace QuaternaryOp copy/pasted code with methods Closes #640. Project: http://git-wip-us.apache.org/repos/asf/systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/11e46057 Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/11e46057 Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/11e46057 Branch: refs/heads/master Commit: 11e4605735f139b18b008e34a2d505397097389c Parents: d2efa65 Author: Deron Eriksson <de...@apache.org> Authored: Mon Aug 28 10:19:48 2017 -0700 Committer: Deron Eriksson <de...@apache.org> Committed: Mon Aug 28 10:19:48 2017 -0700 ---------------------------------------------------------------------- .../org/apache/sysml/hops/QuaternaryOp.java | 368 +++++-------------- 1 file changed, 88 insertions(+), 280 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/systemml/blob/11e46057/src/main/java/org/apache/sysml/hops/QuaternaryOp.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/hops/QuaternaryOp.java b/src/main/java/org/apache/sysml/hops/QuaternaryOp.java index 17188be..bfbaae7 100644 --- a/src/main/java/org/apache/sysml/hops/QuaternaryOp.java +++ b/src/main/java/org/apache/sysml/hops/QuaternaryOp.java @@ -321,6 +321,74 @@ public class QuaternaryOp extends Hop implements MultiThreadedHop setLops( wsloss ); } + private Lop obtainlU(Hop U, Hop V, boolean cacheU, double m1Size) throws HopsException, LopsException { + Lop lU = null; + if (cacheU) { + // partitioning of U for read through distributed cache + boolean needPartU = !U.dimsKnown() || U.getDim1() * U.getDim2() > DistributedCacheInput.PARTITION_SIZE; + lU = U.constructLops(); + if (needPartU) { // requires partitioning + lU = new DataPartition(lU, DataType.MATRIX, ValueType.DOUBLE, + (m1Size > OptimizerUtils.getLocalMemBudget()) ? ExecType.MR : ExecType.CP, + PDataPartitionFormat.ROW_BLOCK_WISE_N); + lU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), getRowsInBlock(), getColsInBlock(), + U.getNnz()); + setLineNumbers(lU); + } + } else { + // replication of U for shuffle to target block + Lop offset = createOffsetLop(V, false); // ncol of t(V) -> nrow of V determines num replicates + lU = new RepMat(U.constructLops(), offset, true, V.getDataType(), V.getValueType()); + lU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), U.getRowsInBlock(), U.getColsInBlock(), + U.getNnz()); + setLineNumbers(lU); + + Group grpU = new Group(lU, Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE); + grpU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), U.getRowsInBlock(), U.getColsInBlock(), + -1); + setLineNumbers(grpU); + lU = grpU; + } + return lU; + } + + private Lop obtainlV(Hop U, Hop V, boolean cacheV, double m2Size) throws HopsException, LopsException { + Lop lV = null; + if (cacheV) { + // partitioning of V for read through distributed cache + boolean needPartV = !V.dimsKnown() || V.getDim1() * V.getDim2() > DistributedCacheInput.PARTITION_SIZE; + lV = V.constructLops(); + if (needPartV) { // requires partitioning + lV = new DataPartition(lV, DataType.MATRIX, ValueType.DOUBLE, + (m2Size > OptimizerUtils.getLocalMemBudget()) ? ExecType.MR : ExecType.CP, + PDataPartitionFormat.ROW_BLOCK_WISE_N); + lV.getOutputParameters().setDimensions(V.getDim1(), V.getDim2(), getRowsInBlock(), getColsInBlock(), + V.getNnz()); + setLineNumbers(lV); + } + } else { + // replication of t(V) for shuffle to target block + Transform ltV = new Transform(V.constructLops(), HopsTransf2Lops.get(ReOrgOp.TRANSPOSE), getDataType(), + getValueType(), ExecType.MR); + ltV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), V.getColsInBlock(), V.getRowsInBlock(), + V.getNnz()); + setLineNumbers(ltV); + + Lop offset = createOffsetLop(U, false); // nrow of U determines num replicates + lV = new RepMat(ltV, offset, false, V.getDataType(), V.getValueType()); + lV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), V.getColsInBlock(), V.getRowsInBlock(), + V.getNnz()); + setLineNumbers(lV); + + Group grpV = new Group(lV, Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE); + grpV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), V.getColsInBlock(), V.getRowsInBlock(), + -1); + setLineNumbers(grpV); + lV = grpV; + } + return lV; + } + private void constructMRLopsWeightedSquaredLoss(WeightsType wtype) throws HopsException, LopsException { @@ -395,62 +463,10 @@ public class QuaternaryOp extends Hop implements MultiThreadedHop grpW.getOutputParameters().setDimensions(W.getDim1(), W.getDim2(), W.getRowsInBlock(), W.getColsInBlock(), -1); setLineNumbers(grpW); } - - Lop lU = null; - if( cacheU ) { - //partitioning of U for read through distributed cache - boolean needPartU = !U.dimsKnown() || U.getDim1() * U.getDim2() > DistributedCacheInput.PARTITION_SIZE; - lU = U.constructLops(); - if( needPartU ){ //requires partitioning - lU = new DataPartition(lU, DataType.MATRIX, ValueType.DOUBLE, (m1Size>OptimizerUtils.getLocalMemBudget())?ExecType.MR:ExecType.CP, PDataPartitionFormat.ROW_BLOCK_WISE_N); - lU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), getRowsInBlock(), getColsInBlock(), U.getNnz()); - setLineNumbers(lU); - } - } - else { - //replication of U for shuffle to target block - Lop offset = createOffsetLop(V, false); //ncol of t(V) -> nrow of V determines num replicates - lU = new RepMat(U.constructLops(), offset, true, V.getDataType(), V.getValueType()); - lU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), - U.getRowsInBlock(), U.getColsInBlock(), U.getNnz()); - setLineNumbers(lU); - - Group grpU = new Group(lU, Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE); - grpU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), U.getRowsInBlock(), U.getColsInBlock(), -1); - setLineNumbers(grpU); - lU = grpU; - } - - Lop lV = null; - if( cacheV ) { - //partitioning of V for read through distributed cache - boolean needPartV = !V.dimsKnown() || V.getDim1() * V.getDim2() > DistributedCacheInput.PARTITION_SIZE; - lV = V.constructLops(); - if( needPartV ){ //requires partitioning - lV = new DataPartition(lV, DataType.MATRIX, ValueType.DOUBLE, (m2Size>OptimizerUtils.getLocalMemBudget())?ExecType.MR:ExecType.CP, PDataPartitionFormat.ROW_BLOCK_WISE_N); - lV.getOutputParameters().setDimensions(V.getDim1(), V.getDim2(), getRowsInBlock(), getColsInBlock(), V.getNnz()); - setLineNumbers(lV); - } - } - else { - //replication of t(V) for shuffle to target block - Transform ltV = new Transform( V.constructLops(), HopsTransf2Lops.get(ReOrgOp.TRANSPOSE), getDataType(), getValueType(), ExecType.MR); - ltV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), - V.getColsInBlock(), V.getRowsInBlock(), V.getNnz()); - setLineNumbers(ltV); - - Lop offset = createOffsetLop(U, false); //nrow of U determines num replicates - lV = new RepMat(ltV, offset, false, V.getDataType(), V.getValueType()); - lV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), - V.getColsInBlock(), V.getRowsInBlock(), V.getNnz()); - setLineNumbers(lV); - - Group grpV = new Group(lV, Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE); - grpV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), V.getColsInBlock(), V.getRowsInBlock(), -1); - setLineNumbers(grpV); - lV = grpV; - } - + + Lop lU = obtainlU(U, V, cacheU, m1Size); + Lop lV = obtainlV(U, V, cacheV, m2Size); + //reduce-side wsloss w/ or without broadcast Lop wsloss = new WeightedSquaredLossR( grpX, lU, lV, grpW, DataType.MATRIX, ValueType.DOUBLE, wtype, cacheU, cacheV, ExecType.MR); @@ -596,62 +612,10 @@ public class QuaternaryOp extends Hop implements MultiThreadedHop Group grpX = new Group(X.constructLops(), Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE); grpX.getOutputParameters().setDimensions(X.getDim1(), X.getDim2(), X.getRowsInBlock(), X.getColsInBlock(), X.getNnz()); setLineNumbers(grpX); - - Lop lU = null; - if( cacheU ) { - //partitioning of U for read through distributed cache - boolean needPartU = !U.dimsKnown() || U.getDim1() * U.getDim2() > DistributedCacheInput.PARTITION_SIZE; - lU = U.constructLops(); - if( needPartU ){ //requires partitioning - lU = new DataPartition(lU, DataType.MATRIX, ValueType.DOUBLE, (m1Size>OptimizerUtils.getLocalMemBudget())?ExecType.MR:ExecType.CP, PDataPartitionFormat.ROW_BLOCK_WISE_N); - lU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), getRowsInBlock(), getColsInBlock(), U.getNnz()); - setLineNumbers(lU); - } - } - else { - //replication of U for shuffle to target block - Lop offset = createOffsetLop(V, false); //ncol of t(V) -> nrow of V determines num replicates - lU = new RepMat(U.constructLops(), offset, true, V.getDataType(), V.getValueType()); - lU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), - U.getRowsInBlock(), U.getColsInBlock(), U.getNnz()); - setLineNumbers(lU); - - Group grpU = new Group(lU, Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE); - grpU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), U.getRowsInBlock(), U.getColsInBlock(), -1); - setLineNumbers(grpU); - lU = grpU; - } - - Lop lV = null; - if( cacheV ) { - //partitioning of V for read through distributed cache - boolean needPartV = !V.dimsKnown() || V.getDim1() * V.getDim2() > DistributedCacheInput.PARTITION_SIZE; - lV = V.constructLops(); - if( needPartV ){ //requires partitioning - lV = new DataPartition(lV, DataType.MATRIX, ValueType.DOUBLE, (m2Size>OptimizerUtils.getLocalMemBudget())?ExecType.MR:ExecType.CP, PDataPartitionFormat.ROW_BLOCK_WISE_N); - lV.getOutputParameters().setDimensions(V.getDim1(), V.getDim2(), getRowsInBlock(), getColsInBlock(), V.getNnz()); - setLineNumbers(lV); - } - } - else { - //replication of t(V) for shuffle to target block - Transform ltV = new Transform( V.constructLops(), HopsTransf2Lops.get(ReOrgOp.TRANSPOSE), getDataType(), getValueType(), ExecType.MR); - ltV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), - V.getColsInBlock(), V.getRowsInBlock(), V.getNnz()); - setLineNumbers(ltV); - - Lop offset = createOffsetLop(U, false); //nrow of U determines num replicates - lV = new RepMat(ltV, offset, false, V.getDataType(), V.getValueType()); - lV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), - V.getColsInBlock(), V.getRowsInBlock(), V.getNnz()); - setLineNumbers(lV); - - Group grpV = new Group(lV, Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE); - grpV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), V.getColsInBlock(), V.getRowsInBlock(), -1); - setLineNumbers(grpV); - lV = grpV; - } - + + Lop lU = obtainlU(U, V, cacheU, m1Size); + Lop lV = obtainlV(U, V, cacheV, m2Size); + //reduce-side wsig w/ or without broadcast Lop wsigmoid = new WeightedSigmoidR( grpX, lU, lV, DataType.MATRIX, ValueType.DOUBLE, wtype, cacheU, cacheV, ExecType.MR); @@ -792,62 +756,10 @@ public class QuaternaryOp extends Hop implements MultiThreadedHop grpX = new Group(grpX, Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE); grpX.getOutputParameters().setDimensions(X.getDim1(), X.getDim2(), X.getRowsInBlock(), X.getColsInBlock(), X.getNnz()); setLineNumbers(grpX); - - Lop lU = null; - if( cacheU ) { - //partitioning of U for read through distributed cache - boolean needPartU = !U.dimsKnown() || U.getDim1() * U.getDim2() > DistributedCacheInput.PARTITION_SIZE; - lU = U.constructLops(); - if( needPartU ){ //requires partitioning - lU = new DataPartition(lU, DataType.MATRIX, ValueType.DOUBLE, (m1Size>OptimizerUtils.getLocalMemBudget())?ExecType.MR:ExecType.CP, PDataPartitionFormat.ROW_BLOCK_WISE_N); - lU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), getRowsInBlock(), getColsInBlock(), U.getNnz()); - setLineNumbers(lU); - } - } - else { - //replication of U for shuffle to target block - Lop offset = createOffsetLop(V, false); //ncol of t(V) -> nrow of V determines num replicates - lU = new RepMat(U.constructLops(), offset, true, V.getDataType(), V.getValueType()); - lU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), - U.getRowsInBlock(), U.getColsInBlock(), U.getNnz()); - setLineNumbers(lU); - - Group grpU = new Group(lU, Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE); - grpU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), U.getRowsInBlock(), U.getColsInBlock(), -1); - setLineNumbers(grpU); - lU = grpU; - } - - Lop lV = null; - if( cacheV ) { - //partitioning of V for read through distributed cache - boolean needPartV = !V.dimsKnown() || V.getDim1() * V.getDim2() > DistributedCacheInput.PARTITION_SIZE; - lV = V.constructLops(); - if( needPartV ){ //requires partitioning - lV = new DataPartition(lV, DataType.MATRIX, ValueType.DOUBLE, (m2Size>OptimizerUtils.getLocalMemBudget())?ExecType.MR:ExecType.CP, PDataPartitionFormat.ROW_BLOCK_WISE_N); - lV.getOutputParameters().setDimensions(V.getDim1(), V.getDim2(), getRowsInBlock(), getColsInBlock(), V.getNnz()); - setLineNumbers(lV); - } - } - else { - //replication of t(V) for shuffle to target block - Transform ltV = new Transform( V.constructLops(), HopsTransf2Lops.get(ReOrgOp.TRANSPOSE), getDataType(), getValueType(), ExecType.MR); - ltV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), - V.getColsInBlock(), V.getRowsInBlock(), V.getNnz()); - setLineNumbers(ltV); - - Lop offset = createOffsetLop(U, false); //nrow of U determines num replicates - lV = new RepMat(ltV, offset, false, V.getDataType(), V.getValueType()); - lV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), - V.getColsInBlock(), V.getRowsInBlock(), V.getNnz()); - setLineNumbers(lV); - - Group grpV = new Group(lV, Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE); - grpV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), V.getColsInBlock(), V.getRowsInBlock(), -1); - setLineNumbers(grpV); - lV = grpV; - } - + + Lop lU = obtainlU(U, V, cacheU, m1Size); + Lop lV = obtainlV(U, V, cacheV, m2Size); + //reduce-side wdivmm w/ or without broadcast Lop wdivmm = new WeightedDivMMR( grpW, lU, lV, grpX, DataType.MATRIX, ValueType.DOUBLE, wtype, cacheU, cacheV, ExecType.MR); @@ -1006,62 +918,10 @@ public class QuaternaryOp extends Hop implements MultiThreadedHop Group grpX = new Group(X.constructLops(), Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE); grpX.getOutputParameters().setDimensions(X.getDim1(), X.getDim2(), X.getRowsInBlock(), X.getColsInBlock(), -1); setLineNumbers(grpX); - - Lop lU = null; - if( cacheU ) { - //partitioning of U for read through distributed cache - boolean needPartU = !U.dimsKnown() || U.getDim1() * U.getDim2() > DistributedCacheInput.PARTITION_SIZE; - lU = U.constructLops(); - if( needPartU ){ //requires partitioning - lU = new DataPartition(lU, DataType.MATRIX, ValueType.DOUBLE, (m1Size>OptimizerUtils.getLocalMemBudget())?ExecType.MR:ExecType.CP, PDataPartitionFormat.ROW_BLOCK_WISE_N); - lU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), getRowsInBlock(), getColsInBlock(), U.getNnz()); - setLineNumbers(lU); - } - } - else { - //replication of U for shuffle to target block - Lop offset = createOffsetLop(V, false); //ncol of t(V) -> nrow of V determines num replicates - lU = new RepMat(U.constructLops(), offset, true, V.getDataType(), V.getValueType()); - lU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), - U.getRowsInBlock(), U.getColsInBlock(), U.getNnz()); - setLineNumbers(lU); - - Group grpU = new Group(lU, Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE); - grpU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), U.getRowsInBlock(), U.getColsInBlock(), -1); - setLineNumbers(grpU); - lU = grpU; - } - - Lop lV = null; - if( cacheV ) { - //partitioning of V for read through distributed cache - boolean needPartV = !V.dimsKnown() || V.getDim1() * V.getDim2() > DistributedCacheInput.PARTITION_SIZE; - lV = V.constructLops(); - if( needPartV ){ //requires partitioning - lV = new DataPartition(lV, DataType.MATRIX, ValueType.DOUBLE, (m2Size>OptimizerUtils.getLocalMemBudget())?ExecType.MR:ExecType.CP, PDataPartitionFormat.ROW_BLOCK_WISE_N); - lV.getOutputParameters().setDimensions(V.getDim1(), V.getDim2(), getRowsInBlock(), getColsInBlock(), V.getNnz()); - setLineNumbers(lV); - } - } - else { - //replication of t(V) for shuffle to target block - Transform ltV = new Transform( V.constructLops(), HopsTransf2Lops.get(ReOrgOp.TRANSPOSE), getDataType(), getValueType(), ExecType.MR); - ltV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), - V.getColsInBlock(), V.getRowsInBlock(), V.getNnz()); - setLineNumbers(ltV); - - Lop offset = createOffsetLop(U, false); //nrow of U determines num replicates - lV = new RepMat(ltV, offset, false, V.getDataType(), V.getValueType()); - lV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), - V.getColsInBlock(), V.getRowsInBlock(), V.getNnz()); - setLineNumbers(lV); - - Group grpV = new Group(lV, Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE); - grpV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), V.getColsInBlock(), V.getRowsInBlock(), -1); - setLineNumbers(grpV); - lV = grpV; - } - + + Lop lU = obtainlU(U, V, cacheU, m1Size); + Lop lV = obtainlV(U, V, cacheV, m2Size); + //reduce-side wcemm w/ or without broadcast Lop wcemm = new WeightedCrossEntropyR( grpX, lU, lV, eps.constructLops(), DataType.MATRIX, ValueType.DOUBLE, wtype, cacheU, cacheV, ExecType.MR); @@ -1215,62 +1075,10 @@ public class QuaternaryOp extends Hop implements MultiThreadedHop Group grpX = new Group(X.constructLops(), Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE); grpX.getOutputParameters().setDimensions(X.getDim1(), X.getDim2(), X.getRowsInBlock(), X.getColsInBlock(), X.getNnz()); setLineNumbers(grpX); - - Lop lU = null; - if( cacheU ) { - //partitioning of U for read through distributed cache - boolean needPartU = !U.dimsKnown() || U.getDim1() * U.getDim2() > DistributedCacheInput.PARTITION_SIZE; - lU = U.constructLops(); - if( needPartU ){ //requires partitioning - lU = new DataPartition(lU, DataType.MATRIX, ValueType.DOUBLE, (m1Size>OptimizerUtils.getLocalMemBudget())?ExecType.MR:ExecType.CP, PDataPartitionFormat.ROW_BLOCK_WISE_N); - lU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), getRowsInBlock(), getColsInBlock(), U.getNnz()); - setLineNumbers(lU); - } - } - else { - //replication of U for shuffle to target block - Lop offset = createOffsetLop(V, false); //ncol of t(V) -> nrow of V determines num replicates - lU = new RepMat(U.constructLops(), offset, true, V.getDataType(), V.getValueType()); - lU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), - U.getRowsInBlock(), U.getColsInBlock(), U.getNnz()); - setLineNumbers(lU); - - Group grpU = new Group(lU, Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE); - grpU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), U.getRowsInBlock(), U.getColsInBlock(), -1); - setLineNumbers(grpU); - lU = grpU; - } - - Lop lV = null; - if( cacheV ) { - //partitioning of V for read through distributed cache - boolean needPartV = !V.dimsKnown() || V.getDim1() * V.getDim2() > DistributedCacheInput.PARTITION_SIZE; - lV = V.constructLops(); - if( needPartV ){ //requires partitioning - lV = new DataPartition(lV, DataType.MATRIX, ValueType.DOUBLE, (m2Size>OptimizerUtils.getLocalMemBudget())?ExecType.MR:ExecType.CP, PDataPartitionFormat.ROW_BLOCK_WISE_N); - lV.getOutputParameters().setDimensions(V.getDim1(), V.getDim2(), getRowsInBlock(), getColsInBlock(), V.getNnz()); - setLineNumbers(lV); - } - } - else { - //replication of t(V) for shuffle to target block - Transform ltV = new Transform( V.constructLops(), HopsTransf2Lops.get(ReOrgOp.TRANSPOSE), getDataType(), getValueType(), ExecType.MR); - ltV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), - V.getColsInBlock(), V.getRowsInBlock(), V.getNnz()); - setLineNumbers(ltV); - - Lop offset = createOffsetLop(U, false); //nrow of U determines num replicates - lV = new RepMat(ltV, offset, false, V.getDataType(), V.getValueType()); - lV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), - V.getColsInBlock(), V.getRowsInBlock(), V.getNnz()); - setLineNumbers(lV); - - Group grpV = new Group(lV, Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE); - grpV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), V.getColsInBlock(), V.getRowsInBlock(), -1); - setLineNumbers(grpV); - lV = grpV; - } - + + Lop lU = obtainlU(U, V, cacheU, m1Size); + Lop lV = obtainlV(U, V, cacheV, m2Size); + //reduce-side wumm w/ or without broadcast Lop wumm = new WeightedUnaryMMR( grpX, lU, lV, DataType.MATRIX, ValueType.DOUBLE, wtype, uop, cacheU, cacheV, ExecType.MR);