[SYSTEMML-562] Frame Left Indexing
Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/266d4c8c Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/266d4c8c Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/266d4c8c Branch: refs/heads/master Commit: 266d4c8ced805f3bbd56086ae2394a3adfeff0d6 Parents: 3e4ceaf Author: Arvind Surve <ac...@yahoo.com> Authored: Tue Jun 21 21:40:47 2016 -0700 Committer: Arvind Surve <ac...@yahoo.com> Committed: Tue Jun 21 21:40:47 2016 -0700 ---------------------------------------------------------------------- .../org/apache/sysml/hops/OptimizerUtils.java | 9 + .../apache/sysml/hops/recompile/Recompiler.java | 3 +- .../controlprogram/caching/CacheBlock.java | 22 +- .../controlprogram/caching/CacheableData.java | 7 +- .../controlprogram/caching/FrameObject.java | 34 +- .../controlprogram/caching/MatrixObject.java | 2 +- .../context/SparkExecutionContext.java | 129 +++++- .../instructions/SPInstructionParser.java | 4 +- .../spark/AppendMSPInstruction.java | 22 +- .../instructions/spark/BinarySPInstruction.java | 4 +- .../spark/FrameIndexingSPInstruction.java | 330 +++++++++++++++ .../spark/IndexingSPInstruction.java | 119 ++++++ .../spark/MapmmChainSPInstruction.java | 22 +- .../instructions/spark/MapmmSPInstruction.java | 28 +- .../spark/MatrixIndexingSPInstruction.java | 74 +--- .../instructions/spark/PMapmmSPInstruction.java | 14 +- .../ParameterizedBuiltinSPInstruction.java | 22 +- .../instructions/spark/PmmSPInstruction.java | 10 +- .../spark/QuaternarySPInstruction.java | 34 +- .../spark/UaggOuterChainSPInstruction.java | 10 +- .../instructions/spark/WriteSPInstruction.java | 4 +- .../spark/data/BroadcastObject.java | 18 +- .../spark/data/PartitionedBlock.java | 399 +++++++++++++++++++ .../spark/data/PartitionedBroadcast.java | 122 ++++++ .../spark/data/PartitionedBroadcastMatrix.java | 121 ------ .../spark/data/PartitionedMatrixBlock.java | 
385 ------------------ .../functions/CopyFrameBlockPairFunction.java | 2 +- .../spark/functions/ExtractGroup.java | 8 +- .../functions/MatrixVectorBinaryOpFunction.java | 8 +- .../MatrixVectorBinaryOpPartitionFunction.java | 8 +- .../functions/OuterVectorBinaryOpFunction.java | 8 +- .../spark/utils/FrameRDDConverterUtils.java | 19 +- .../instructions/spark/utils/RDDSortUtils.java | 12 +- .../instructions/spark/utils/SparkUtils.java | 43 +- .../sysml/runtime/matrix/data/FrameBlock.java | 175 ++++++-- .../sysml/runtime/matrix/data/MatrixBlock.java | 22 +- .../matrix/data/OperationsOnMatrixValues.java | 155 +++++++ .../sysml/runtime/matrix/data/OutputInfo.java | 2 +- .../util/FastBufferedDataOutputStream.java | 3 +- .../sysml/runtime/util/MapReduceTool.java | 76 ++-- .../sysml/runtime/util/UtilFunctions.java | 141 ++++++- .../test/integration/AutomatedTestBase.java | 131 ++++++ .../functions/frame/FrameConverterTest.java | 3 + .../functions/frame/FrameIndexingDistTest.java | 203 ++++++++++ .../functions/frame/FrameMatrixCastingTest.java | 2 +- .../org/apache/sysml/test/utils/TestUtils.java | 131 +++++- .../functions/frame/FrameIndexingDistTest.R | 42 ++ .../functions/frame/FrameIndexingDistTest.dml | 32 ++ 48 files changed, 2373 insertions(+), 801 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/hops/OptimizerUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/hops/OptimizerUtils.java b/src/main/java/org/apache/sysml/hops/OptimizerUtils.java index e6c8b88..8e53e96 100644 --- a/src/main/java/org/apache/sysml/hops/OptimizerUtils.java +++ b/src/main/java/org/apache/sysml/hops/OptimizerUtils.java @@ -67,6 +67,9 @@ public class OptimizerUtils /** Default blocksize if unspecified or for testing purposes */ public static final int DEFAULT_BLOCKSIZE = 1000; + /** 
Default frame blocksize */ + public static final int DEFAULT_FRAME_BLOCKSIZE = 1000; + /** Default optimization level if unspecified */ public static final OptimizationLevel DEFAULT_OPTLEVEL = OptimizationLevel.O2_LOCAL_MEMORY_DEFAULT; @@ -388,6 +391,12 @@ public class OptimizerUtils DEFAULT_SIZE = getDefaultSize(); } + + public static int getDefaultFrameSize() + { + return DEFAULT_FRAME_BLOCKSIZE; + } + /** * Returns memory budget (according to util factor) in bytes * http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java b/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java index ec89775..5e65bf1 100644 --- a/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java +++ b/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java @@ -2129,7 +2129,8 @@ public class Recompiler DataType dt = DataType.valueOf(String.valueOf(mtd.get(DataExpression.DATATYPEPARAM)).toUpperCase()); dop.setDataType(dt); - dop.setValueType(ValueType.valueOf(String.valueOf(mtd.get(DataExpression.VALUETYPEPARAM)).toUpperCase())); + if(dt != DataType.FRAME) + dop.setValueType(ValueType.valueOf(String.valueOf(mtd.get(DataExpression.VALUETYPEPARAM)).toUpperCase())); dop.setDim1((dt==DataType.MATRIX||dt==DataType.FRAME)?Long.parseLong(mtd.get(DataExpression.READROWPARAM).toString()):0); dop.setDim2((dt==DataType.MATRIX||dt==DataType.FRAME)?Long.parseLong(mtd.get(DataExpression.READCOLPARAM).toString()):0); } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheBlock.java 
b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheBlock.java index 2cc9471..f58a7c2 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheBlock.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheBlock.java @@ -19,7 +19,12 @@ package org.apache.sysml.runtime.controlprogram.caching; +import java.util.ArrayList; + import org.apache.hadoop.io.Writable; +import org.apache.sysml.runtime.DMLRuntimeException; +import org.apache.sysml.runtime.matrix.data.Pair; +import org.apache.sysml.runtime.util.IndexRange; /** @@ -40,7 +45,7 @@ public interface CacheBlock extends Writable * @return */ public long getExactSerializedSize(); - + /** * Indicates if the cache block is subject to shallow serialized, * which is generally true if in-memory size and serialized size @@ -54,4 +59,19 @@ public interface CacheBlock extends Writable * Free unnecessarily allocated empty block. */ public void compactEmptyBlock(); + + public int getNumRows(); + public int getNumColumns(); + + public CacheBlock sliceOperations(int rl, int ru, int cl, int cu, CacheBlock block) + throws DMLRuntimeException; + + public void merge(CacheBlock that, boolean appendOnly) + throws DMLRuntimeException; + + @SuppressWarnings("rawtypes") + public ArrayList getPairList(); + + ArrayList<Pair<?, ?>> performSlice(IndexRange ixrange, int brlen, int bclen, int iix, int jix, CacheBlock in) + throws DMLRuntimeException; } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java index 479a520..4bab6b8 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java +++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java @@ -178,7 +178,7 @@ public abstract class CacheableData<T extends CacheBlock> extends Data //note: we use the abstraction of LineageObjects for two reasons: (1) to keep track of cleanup //for lazily evaluated RDDs, and (2) as abstraction for environments that do not necessarily have spark libraries available private RDDObject _rddHandle = null; //RDD handle - private BroadcastObject _bcHandle = null; //Broadcast handle + private BroadcastObject<T> _bcHandle = null; //Broadcast handle public GPUObject _gpuHandle = null; /** @@ -362,7 +362,7 @@ public abstract class CacheableData<T extends CacheBlock> extends Data rdd.setBackReference(this); } - public BroadcastObject getBroadcastHandle() { + public BroadcastObject<T> getBroadcastHandle() { return _bcHandle; } @@ -370,6 +370,7 @@ public abstract class CacheableData<T extends CacheBlock> extends Data * * @param bc */ + @SuppressWarnings({ "rawtypes", "unchecked" }) public void setBroadcastHandle( BroadcastObject bc ) { //cleanup potential old back reference if( _bcHandle != null ) @@ -1064,7 +1065,7 @@ public abstract class CacheableData<T extends CacheBlock> extends Data { mc = new MatrixCharacteristics(mc.getRows(), mc.getCols(), ConfigurationManager.getBlocksize(), ConfigurationManager.getBlocksize(), mc.getNonZeros()); } - MapReduceTool.writeMetaDataFile (filePathAndName + ".mtd", valueType, dataType, mc, oinfo, formatProperties); + MapReduceTool.writeMetaDataFile (filePathAndName + ".mtd", valueType, null, dataType, mc, oinfo, formatProperties); } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java 
b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java index 33436b6..04d1543 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java @@ -107,6 +107,10 @@ public class FrameObject extends CacheableData<FrameBlock> } } + public void setSchema(List<ValueType> schema) { + _schema = schema; + } + @Override public void refreshMetaData() throws CacheException @@ -139,7 +143,7 @@ public class FrameObject extends CacheableData<FrameBlock> MatrixCharacteristics mc = getMatrixCharacteristics(); return mc.getCols(); } - + @Override protected FrameBlock readBlobFromCache(String fname) throws IOException { return (FrameBlock)LazyWriteBuffer.readBlock(fname, false); @@ -173,11 +177,19 @@ public class FrameObject extends CacheableData<FrameBlock> return data; } + /** + * Read Frame object from RDD + * + * @param rdd + * @param status + * + * @param fo + */ @Override protected FrameBlock readBlobFromRDD(RDDObject rdd, MutableBoolean status) throws IOException { - //note: the read of a matrix block from an RDD might trigger + //note: the read of a frame block from an RDD might trigger //lazy evaluation of pending transformations. 
RDDObject lrdd = rdd; @@ -186,17 +198,20 @@ public class FrameObject extends CacheableData<FrameBlock> MatrixFormatMetaData iimd = (MatrixFormatMetaData) _metaData; MatrixCharacteristics mc = iimd.getMatrixCharacteristics(); - FrameBlock fb = null; - try { + FrameBlock fb = null; + try + { //prevent unnecessary collect through rdd checkpoint if( rdd.allowsShortCircuitCollect() ) { lrdd = (RDDObject)rdd.getLineageChilds().get(0); } - //collect frame block from binary block RDD + //obtain frame block from RDD int rlen = (int)mc.getRows(); int clen = (int)mc.getCols(); + + //collect frame block from binary cell RDD fb = SparkExecutionContext.toFrameBlock(lrdd, _schema, rlen, clen); } catch(DMLRuntimeException ex) { @@ -205,7 +220,7 @@ public class FrameObject extends CacheableData<FrameBlock> //sanity check correct output if( fb == null ) { - throw new IOException("Unable to load matrix from rdd: "+lrdd.getVarName()); + throw new IOException("Unable to load frame from rdd: "+lrdd.getVarName()); } return fb; @@ -225,12 +240,13 @@ public class FrameObject extends CacheableData<FrameBlock> throws IOException, DMLRuntimeException { //prepare output info - MatrixFormatMetaData iimd = (MatrixFormatMetaData) _metaData; - OutputInfo oinfo = (ofmt != null ? OutputInfo.stringToOutputInfo(ofmt) - : InputInfo.getMatchingOutputInfo (iimd.getInputInfo())); + MatrixFormatMetaData iimd = (MatrixFormatMetaData) _metaData; + OutputInfo oinfo = (ofmt != null ? OutputInfo.stringToOutputInfo (ofmt ) + : InputInfo.getMatchingOutputInfo (iimd.getInputInfo ())); //note: the write of an RDD to HDFS might trigger //lazy evaluation of pending transformations. 
SparkExecutionContext.writeFrameRDDtoHDFS(rdd, fname, oinfo); } + } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java index d781730..2144ef8 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java @@ -229,7 +229,7 @@ public class MatrixObject extends CacheableData<MatrixBlock> if(_data != null && // Not a column vector _data.getNumRows() != 1 && _data.getNumColumns() != 1) { - double[] arr = ((MatrixBlock)_data).getDenseBlock(); + double[] arr = _data.getDenseBlock(); LibMatrixDNN.cacheReuseableData(arr); } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java index 1516286..a2e2f79 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java @@ -20,6 +20,7 @@ package org.apache.sysml.runtime.controlprogram.context; import java.io.IOException; +import java.util.Collections; import java.util.LinkedList; import java.util.List; @@ -57,8 +58,8 @@ import org.apache.sysml.runtime.instructions.spark.SPInstruction; import org.apache.sysml.runtime.instructions.spark.data.BlockPartitioner; import 
org.apache.sysml.runtime.instructions.spark.data.BroadcastObject; import org.apache.sysml.runtime.instructions.spark.data.LineageObject; -import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcastMatrix; -import org.apache.sysml.runtime.instructions.spark.data.PartitionedMatrixBlock; +import org.apache.sysml.runtime.instructions.spark.data.PartitionedBlock; +import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcast; import org.apache.sysml.runtime.instructions.spark.data.RDDObject; import org.apache.sysml.runtime.instructions.spark.functions.CopyBinaryCellFunction; import org.apache.sysml.runtime.instructions.spark.functions.CopyBlockPairFunction; @@ -69,8 +70,8 @@ import org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils. import org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils; import org.apache.sysml.runtime.instructions.spark.utils.SparkUtils; import org.apache.sysml.runtime.matrix.MatrixCharacteristics; -import org.apache.sysml.runtime.matrix.data.InputInfo; import org.apache.sysml.runtime.matrix.data.FrameBlock; +import org.apache.sysml.runtime.matrix.data.InputInfo; import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.matrix.data.MatrixCell; import org.apache.sysml.runtime.matrix.data.MatrixIndexes; @@ -250,6 +251,22 @@ public class SparkExecutionContext extends ExecutionContext } /** + * Spark instructions should call this for all frame inputs except broadcast + * variables. 
+ * + * @param varname + * @return + * @throws DMLRuntimeException + */ + @SuppressWarnings("unchecked") + public JavaPairRDD<Long,FrameBlock> getFrameBinaryBlockRDDHandleForVariable( String varname ) + throws DMLRuntimeException + { + JavaPairRDD<Long,FrameBlock> out = (JavaPairRDD<Long,FrameBlock>) getRDDHandleForVariable( varname, InputInfo.BinaryBlockInputInfo); + return out; + } + + /** * * @param varname * @param inputInfo @@ -456,14 +473,14 @@ public class SparkExecutionContext extends ExecutionContext * @throws DMLRuntimeException */ @SuppressWarnings("unchecked") - public PartitionedBroadcastMatrix getBroadcastForVariable( String varname ) + public PartitionedBroadcast<MatrixBlock> getBroadcastForVariable( String varname ) throws DMLRuntimeException { long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0; MatrixObject mo = getMatrixObject(varname); - PartitionedBroadcastMatrix bret = null; + PartitionedBroadcast<MatrixBlock> bret = null; //reuse existing broadcast handle if( mo.getBroadcastHandle()!=null @@ -481,20 +498,20 @@ public class SparkExecutionContext extends ExecutionContext //create partitioned matrix block and release memory consumed by input MatrixBlock mb = mo.acquireRead(); - PartitionedMatrixBlock pmb = new PartitionedMatrixBlock(mb, brlen, bclen); + PartitionedBlock<MatrixBlock> pmb = new PartitionedBlock<MatrixBlock>(mb, brlen, bclen); mo.release(); //determine coarse-grained partitioning - int numPerPart = PartitionedBroadcastMatrix.computeBlocksPerPartition(mo.getNumRows(), mo.getNumColumns(), brlen, bclen); + int numPerPart = PartitionedBroadcast.computeBlocksPerPartition(mo.getNumRows(), mo.getNumColumns(), brlen, bclen); int numParts = (int) Math.ceil((double)pmb.getNumRowBlocks()*pmb.getNumColumnBlocks() / numPerPart); - Broadcast<PartitionedMatrixBlock>[] ret = new Broadcast[numParts]; + Broadcast<PartitionedBlock<MatrixBlock>>[] ret = new Broadcast[numParts]; //create coarse-grained partitioned broadcasts if( numParts > 1 ) { 
for( int i=0; i<numParts; i++ ) { int offset = i * numPerPart; int numBlks = Math.min(numPerPart, pmb.getNumRowBlocks()*pmb.getNumColumnBlocks()-offset); - PartitionedMatrixBlock tmp = pmb.createPartition(offset, numBlks); + PartitionedBlock<MatrixBlock> tmp = pmb.createPartition(offset, numBlks, new MatrixBlock()); ret[i] = getSparkContext().broadcast(tmp); } } @@ -502,8 +519,8 @@ public class SparkExecutionContext extends ExecutionContext ret[0] = getSparkContext().broadcast( pmb); } - bret = new PartitionedBroadcastMatrix(ret); - BroadcastObject bchandle = new BroadcastObject(bret, varname); + bret = new PartitionedBroadcast<MatrixBlock>(ret); + BroadcastObject<MatrixBlock> bchandle = new BroadcastObject<MatrixBlock>(bret, varname); mo.setBroadcastHandle(bchandle); } @@ -515,6 +532,76 @@ public class SparkExecutionContext extends ExecutionContext return bret; } + + /** + * + * @param varname + * @return + * @throws DMLRuntimeException + */ + + @SuppressWarnings("unchecked") + public PartitionedBroadcast<FrameBlock> getBroadcastForFrameVariable( String varname) + throws DMLRuntimeException + { + long t0 = DMLScript.STATISTICS ? 
System.nanoTime() : 0; + + FrameObject fo = getFrameObject(varname); + + PartitionedBroadcast<FrameBlock> bret = null; + + //reuse existing broadcast handle + if( fo.getBroadcastHandle()!=null + && fo.getBroadcastHandle().isValid() ) + { + bret = fo.getBroadcastHandle().getBroadcast(); + } + + //create new broadcast handle (never created, evicted) + if( bret == null ) + { + //obtain meta data for frame + int bclen = (int) fo.getNumColumns(); + int brlen = OptimizerUtils.getDefaultFrameSize(); + + //create partitioned frame block and release memory consumed by input + FrameBlock mb = fo.acquireRead(); + PartitionedBlock<FrameBlock> pmb = new PartitionedBlock<FrameBlock>(mb, brlen, bclen); + fo.release(); + + //determine coarse-grained partitioning + int numPerPart = PartitionedBroadcast.computeBlocksPerPartition(fo.getNumRows(), fo.getNumColumns(), brlen, bclen); + int numParts = (int) Math.ceil((double)pmb.getNumRowBlocks()*pmb.getNumColumnBlocks() / numPerPart); + Broadcast<PartitionedBlock<FrameBlock>>[] ret = new Broadcast[numParts]; + + //create coarse-grained partitioned broadcasts + if( numParts > 1 ) { + for( int i=0; i<numParts; i++ ) { + int offset = i * numPerPart; + int numBlks = Math.min(numPerPart, pmb.getNumRowBlocks()*pmb.getNumColumnBlocks()-offset); + PartitionedBlock<FrameBlock> tmp = pmb.createPartition(offset, numBlks, new FrameBlock()); + ret[i] = getSparkContext().broadcast(tmp); + } + } + else { //single partition + ret[0] = getSparkContext().broadcast( pmb); + } + + bret = new PartitionedBroadcast<FrameBlock>(ret); + BroadcastObject<FrameBlock> bchandle = new BroadcastObject<FrameBlock>(bret, varname); + fo.setBroadcastHandle(bchandle); + } + + if (DMLScript.STATISTICS) { + Statistics.accSparkBroadCastTime(System.nanoTime() - t0); + Statistics.incSparkBroadcastCount(1); + } + + return bret; + } + + + /** * * @param varname @@ -804,7 +891,7 @@ public class SparkExecutionContext extends ExecutionContext return out; } - + /** * * @param rdd @@ 
-816,13 +903,13 @@ public class SparkExecutionContext extends ExecutionContext * @return * @throws DMLRuntimeException */ - public static PartitionedMatrixBlock toPartitionedMatrixBlock(JavaPairRDD<MatrixIndexes,MatrixBlock> rdd, int rlen, int clen, int brlen, int bclen, long nnz) + public static PartitionedBlock<MatrixBlock> toPartitionedMatrixBlock(JavaPairRDD<MatrixIndexes,MatrixBlock> rdd, int rlen, int clen, int brlen, int bclen, long nnz) throws DMLRuntimeException { long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0; - PartitionedMatrixBlock out = new PartitionedMatrixBlock(rlen, clen, brlen, bclen); + PartitionedBlock<MatrixBlock> out = new PartitionedBlock<MatrixBlock>(rlen, clen, brlen, bclen, new MatrixBlock()); List<Tuple2<MatrixIndexes,MatrixBlock>> list = rdd.collect(); //copy blocks one-at-a-time into output matrix block @@ -831,7 +918,7 @@ public class SparkExecutionContext extends ExecutionContext //unpack index-block pair MatrixIndexes ix = keyval._1(); MatrixBlock block = keyval._2(); - out.setMatrixBlock((int)ix.getRowIndex(), (int)ix.getColumnIndex(), block); + out.setBlock((int)ix.getRowIndex(), (int)ix.getColumnIndex(), block); } if (DMLScript.STATISTICS) { @@ -873,6 +960,9 @@ public class SparkExecutionContext extends ExecutionContext { long t0 = DMLScript.STATISTICS ? 
System.nanoTime() : 0; + if(schema == null) + schema = Collections.nCopies(clen, ValueType.STRING); + //create output frame block (w/ lazy allocation) FrameBlock out = new FrameBlock(schema); out.ensureAllocatedColumns(rlen); @@ -887,7 +977,7 @@ public class SparkExecutionContext extends ExecutionContext FrameBlock block = keyval._2(); //copy into output frame - out.copy( ix, ix+block.getNumRows(), + out.copy( ix, ix+block.getNumRows()-1, 0, block.getNumColumns()-1, block ); } @@ -977,7 +1067,7 @@ public class SparkExecutionContext extends ExecutionContext throws DMLRuntimeException { RDDObject parent = getCacheableData(varParent).getRDDHandle(); - BroadcastObject child = getCacheableData(varChild).getBroadcastHandle(); + BroadcastObject<?> child = getCacheableData(varChild).getBroadcastHandle(); parent.addLineageChild( child ); } @@ -1049,6 +1139,7 @@ public class SparkExecutionContext extends ExecutionContext * @param lob * @throws IOException */ + @SuppressWarnings({ "rawtypes", "unchecked" }) private void rCleanupLineageObject(LineageObject lob) throws IOException { @@ -1071,9 +1162,9 @@ public class SparkExecutionContext extends ExecutionContext } } else if( lob instanceof BroadcastObject ) { - PartitionedBroadcastMatrix pbm = ((BroadcastObject)lob).getBroadcast(); + PartitionedBroadcast pbm = ((BroadcastObject)lob).getBroadcast(); if( pbm != null ) //robustness for evictions - for( Broadcast<PartitionedMatrixBlock> bc : pbm.getBroadcasts() ) + for( Broadcast<PartitionedBlock> bc : pbm.getBroadcasts() ) cleanupBroadcastVariable(bc); } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java b/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java index 52b712b..663e6b4 100644 --- 
a/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java @@ -52,9 +52,9 @@ import org.apache.sysml.runtime.instructions.spark.CovarianceSPInstruction; import org.apache.sysml.runtime.instructions.spark.CpmmSPInstruction; import org.apache.sysml.runtime.instructions.spark.CumulativeAggregateSPInstruction; import org.apache.sysml.runtime.instructions.spark.CumulativeOffsetSPInstruction; +import org.apache.sysml.runtime.instructions.spark.IndexingSPInstruction; import org.apache.sysml.runtime.instructions.spark.MapmmChainSPInstruction; import org.apache.sysml.runtime.instructions.spark.MapmmSPInstruction; -import org.apache.sysml.runtime.instructions.spark.MatrixIndexingSPInstruction; import org.apache.sysml.runtime.instructions.spark.MatrixReshapeSPInstruction; import org.apache.sysml.runtime.instructions.spark.PMapmmSPInstruction; import org.apache.sysml.runtime.instructions.spark.ParameterizedBuiltinSPInstruction; @@ -317,7 +317,7 @@ public class SPInstructionParser extends InstructionParser return AggregateTernarySPInstruction.parseInstruction(str); case MatrixIndexing: - return MatrixIndexingSPInstruction.parseInstruction(str); + return IndexingSPInstruction.parseInstruction(str); case Reorg: return ReorgSPInstruction.parseInstruction(str); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendMSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendMSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendMSPInstruction.java index 0c1d22c..26be3eb 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendMSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendMSPInstruction.java @@ -34,7 
+34,7 @@ import org.apache.sysml.runtime.functionobjects.OffsetColumnIndex; import org.apache.sysml.runtime.instructions.InstructionUtils; import org.apache.sysml.runtime.instructions.cp.CPOperand; import org.apache.sysml.runtime.instructions.spark.data.LazyIterableIterator; -import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcastMatrix; +import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcast; import org.apache.sysml.runtime.instructions.spark.utils.SparkUtils; import org.apache.sysml.runtime.matrix.MatrixCharacteristics; import org.apache.sysml.runtime.matrix.data.MatrixBlock; @@ -91,7 +91,7 @@ public class AppendMSPInstruction extends BinarySPInstruction int bclen = mc1.getColsPerBlock(); JavaPairRDD<MatrixIndexes,MatrixBlock> in1 = sec.getBinaryBlockRDDHandleForVariable( input1.getName() ); - PartitionedBroadcastMatrix in2 = sec.getBroadcastForVariable( input2.getName() ); + PartitionedBroadcast<MatrixBlock> in2 = sec.getBroadcastForVariable( input2.getName() ); long off = sec.getScalarInput( _offset.getName(), _offset.getValueType(), _offset.isLiteral()).getLongValue(); //execute map-append operations (partitioning preserving if #in-blocks = #out-blocks) @@ -138,14 +138,14 @@ public class AppendMSPInstruction extends BinarySPInstruction { private static final long serialVersionUID = 2738541014432173450L; - private PartitionedBroadcastMatrix _pm = null; + private PartitionedBroadcast<MatrixBlock> _pm = null; private boolean _cbind = true; private long _offset; private int _brlen; private int _bclen; private long _lastBlockColIndex; - public MapSideAppendFunction(PartitionedBroadcastMatrix binput, boolean cbind, long offset, int brlen, int bclen) + public MapSideAppendFunction(PartitionedBroadcast<MatrixBlock> binput, boolean cbind, long offset, int brlen, int bclen) { _pm = binput; _cbind = cbind; @@ -184,12 +184,12 @@ public class AppendMSPInstruction extends BinarySPInstruction if( _cbind ) { ret.add( new 
Tuple2<MatrixIndexes, MatrixBlock>( new MatrixIndexes(ix.getRowIndex(), ix.getColumnIndex()+1), - _pm.getMatrixBlock((int)ix.getRowIndex(), 1)) ); + _pm.getBlock((int)ix.getRowIndex(), 1)) ); } else { //rbind ret.add( new Tuple2<MatrixIndexes, MatrixBlock>( new MatrixIndexes(ix.getRowIndex()+1, ix.getColumnIndex()), - _pm.getMatrixBlock(1, (int)ix.getColumnIndex())) ); + _pm.getBlock(1, (int)ix.getColumnIndex())) ); } } //case 3: append operation on boundary block @@ -202,7 +202,7 @@ public class AppendMSPInstruction extends BinarySPInstruction MatrixBlock value_in2 = null; if( _cbind ) { - value_in2 = _pm.getMatrixBlock((int)ix.getRowIndex(), 1); + value_in2 = _pm.getBlock((int)ix.getRowIndex(), 1); if(in1.getValue().getNumColumns()+value_in2.getNumColumns()>_bclen) { IndexedMatrixValue second=new IndexedMatrixValue(new MatrixIndexes(), new MatrixBlock()); second.getIndexes().setIndexes(ix.getRowIndex(), ix.getColumnIndex()+1); @@ -210,7 +210,7 @@ public class AppendMSPInstruction extends BinarySPInstruction } } else { //rbind - value_in2 = _pm.getMatrixBlock(1, (int)ix.getColumnIndex()); + value_in2 = _pm.getBlock(1, (int)ix.getColumnIndex()); if(in1.getValue().getNumRows()+value_in2.getNumRows()>_brlen) { IndexedMatrixValue second=new IndexedMatrixValue(new MatrixIndexes(), new MatrixBlock()); second.getIndexes().setIndexes(ix.getRowIndex()+1, ix.getColumnIndex()); @@ -233,11 +233,11 @@ public class AppendMSPInstruction extends BinarySPInstruction { private static final long serialVersionUID = 5767240739761027220L; - private PartitionedBroadcastMatrix _pm = null; + private PartitionedBroadcast<MatrixBlock> _pm = null; private boolean _cbind = true; private long _lastBlockColIndex = -1; - public MapSideAppendPartitionFunction(PartitionedBroadcastMatrix binput, boolean cbind, long offset, int brlen, int bclen) + public MapSideAppendPartitionFunction(PartitionedBroadcast<MatrixBlock> binput, boolean cbind, long offset, int brlen, int bclen) { _pm = binput; _cbind = 
cbind; @@ -280,7 +280,7 @@ public class AppendMSPInstruction extends BinarySPInstruction else { int rowix = _cbind ? (int)ix.getRowIndex() : 1; int colix = _cbind ? 1 : (int)ix.getColumnIndex(); - MatrixBlock in2 = _pm.getMatrixBlock(rowix, colix); + MatrixBlock in2 = _pm.getBlock(rowix, colix); MatrixBlock out = in1.appendOperations(in2, new MatrixBlock(), _cbind); return new Tuple2<MatrixIndexes,MatrixBlock>(ix, out); } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/runtime/instructions/spark/BinarySPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/BinarySPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/BinarySPInstruction.java index 4d2e78c..750a624 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/BinarySPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/BinarySPInstruction.java @@ -29,7 +29,7 @@ import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext; import org.apache.sysml.runtime.instructions.InstructionUtils; import org.apache.sysml.runtime.instructions.cp.CPOperand; import org.apache.sysml.runtime.instructions.cp.ScalarObject; -import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcastMatrix; +import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcast; import org.apache.sysml.runtime.instructions.spark.functions.MatrixMatrixBinaryOpFunction; import org.apache.sysml.runtime.instructions.spark.functions.MatrixScalarUnaryFunction; import org.apache.sysml.runtime.instructions.spark.functions.MatrixVectorBinaryOpPartitionFunction; @@ -154,7 +154,7 @@ public abstract class BinarySPInstruction extends ComputationSPInstruction String rddVar = input1.getName(); String bcastVar = input2.getName(); JavaPairRDD<MatrixIndexes,MatrixBlock> in1 = 
sec.getBinaryBlockRDDHandleForVariable( rddVar ); - PartitionedBroadcastMatrix in2 = sec.getBroadcastForVariable( bcastVar ); + PartitionedBroadcast<MatrixBlock> in2 = sec.getBroadcastForVariable( bcastVar ); MatrixCharacteristics mc1 = sec.getMatrixCharacteristics(rddVar); MatrixCharacteristics mc2 = sec.getMatrixCharacteristics(bcastVar); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameIndexingSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameIndexingSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameIndexingSPInstruction.java new file mode 100644 index 0000000..c1d8766 --- /dev/null +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameIndexingSPInstruction.java @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.sysml.runtime.instructions.spark; + +import java.util.ArrayList; +import java.util.Iterator; + +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.function.PairFlatMapFunction; + +import scala.Tuple2; + +import org.apache.sysml.hops.AggBinaryOp.SparkAggType; +import org.apache.sysml.hops.OptimizerUtils; +import org.apache.sysml.runtime.DMLRuntimeException; +import org.apache.sysml.runtime.controlprogram.context.ExecutionContext; +import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext; +import org.apache.sysml.runtime.instructions.cp.CPOperand; +import org.apache.sysml.runtime.instructions.spark.data.LazyIterableIterator; +import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcast; +import org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils; +import org.apache.sysml.runtime.instructions.spark.utils.SparkUtils; +import org.apache.sysml.runtime.matrix.MatrixCharacteristics; +import org.apache.sysml.runtime.matrix.data.FrameBlock; +import org.apache.sysml.runtime.matrix.data.OperationsOnMatrixValues; +import org.apache.sysml.runtime.matrix.data.Pair; +import org.apache.sysml.runtime.matrix.operators.Operator; +import org.apache.sysml.runtime.util.IndexRange; +import org.apache.sysml.runtime.util.UtilFunctions; + +public class FrameIndexingSPInstruction extends IndexingSPInstruction +{ + + /* + * This class implements the frame indexing functionality inside Spark. 
+ * Example instructions: + * rangeReIndex:mVar1:Var2:Var3:Var4:Var5:mVar6 + * input=mVar1, output=mVar6, + * bounds = (Var2,Var3,Var4,Var5) + * rowindex_lower: Var2, rowindex_upper: Var3 + * colindex_lower: Var4, colindex_upper: Var5 + * leftIndex:mVar1:mVar2:Var3:Var4:Var5:Var6:mVar7 + * triggered by "mVar1[Var3:Var4, Var5:Var6] = mVar2" + * the result is stored in mVar7 + * + */ + public FrameIndexingSPInstruction(Operator op, CPOperand in, CPOperand rl, CPOperand ru, CPOperand cl, CPOperand cu, + CPOperand out, SparkAggType aggtype, String opcode, String istr) + { + super(op, in, rl, ru, cl, cu, out, aggtype, opcode, istr); + } + + public FrameIndexingSPInstruction(Operator op, CPOperand lhsInput, CPOperand rhsInput, CPOperand rl, CPOperand ru, CPOperand cl, CPOperand cu, + CPOperand out, String opcode, String istr) + { + super(op, lhsInput, rhsInput, rl, ru, cl, cu, out, opcode, istr); + } + + + @Override + public void processInstruction(ExecutionContext ec) + throws DMLRuntimeException + { + SparkExecutionContext sec = (SparkExecutionContext)ec; + String opcode = getOpcode(); + + //get indexing range + long rl = ec.getScalarInput(rowLower.getName(), rowLower.getValueType(), rowLower.isLiteral()).getLongValue(); + long ru = ec.getScalarInput(rowUpper.getName(), rowUpper.getValueType(), rowUpper.isLiteral()).getLongValue(); + long cl = ec.getScalarInput(colLower.getName(), colLower.getValueType(), colLower.isLiteral()).getLongValue(); + long cu = ec.getScalarInput(colUpper.getName(), colUpper.getValueType(), colUpper.isLiteral()).getLongValue(); + IndexRange ixrange = new IndexRange(rl, ru, cl, cu); + + //left indexing + if ( opcode.equalsIgnoreCase("leftIndex") || opcode.equalsIgnoreCase("mapLeftIndex")) + { + JavaPairRDD<Long,FrameBlock> in1 = sec.getFrameBinaryBlockRDDHandleForVariable( input1.getName() ); + PartitionedBroadcast<FrameBlock> broadcastIn2 = null; + JavaPairRDD<Long,FrameBlock> in2 = null; + JavaPairRDD<Long,FrameBlock> out = null; + + //update 
and check output dimensions + MatrixCharacteristics mcOut = sec.getMatrixCharacteristics(output.getName()); + MatrixCharacteristics mcLeft = ec.getMatrixCharacteristics(input1.getName()); + mcOut.set(mcLeft.getRows(), mcLeft.getCols(), mcLeft.getRowsPerBlock(), mcLeft.getColsPerBlock()); + checkValidOutputDimensions(mcOut); + + //note: always frame rhs, scalars are preprocessed via cast to 1x1 frame + MatrixCharacteristics mcRight = ec.getMatrixCharacteristics(input2.getName()); + + //sanity check matching index range and rhs dimensions + if(!mcRight.dimsKnown()) { + throw new DMLRuntimeException("The right input frame dimensions are not specified for FrameIndexingSPInstruction"); + } + if(!(ru-rl+1 == mcRight.getRows() && cu-cl+1 == mcRight.getCols())) { + throw new DMLRuntimeException("Invalid index range of leftindexing: ["+rl+":"+ru+","+cl+":"+cu+"] vs ["+mcRight.getRows()+"x"+mcRight.getCols()+"]." ); + } + + if(opcode.equalsIgnoreCase("mapLeftIndex")) + { + broadcastIn2 = sec.getBroadcastForFrameVariable( input2.getName()); + + //partitioning-preserving mappartitions (key access required for broadcast lookup) + out = in1.mapPartitionsToPair( + new LeftIndexPartitionFunction(broadcastIn2, ixrange, mcOut), true); + } + else { //general case + + // zero-out lhs + in1 = in1.flatMapToPair(new ZeroOutLHS(false, ixrange, mcLeft)); + + // slice rhs, shift and merge with lhs + in2 = sec.getFrameBinaryBlockRDDHandleForVariable( input2.getName() ) + .flatMapToPair(new SliceRHSForLeftIndexing(ixrange, mcLeft)); + + out = (JavaPairRDD<Long, FrameBlock>) RDDAggregateUtils.mergeByFrameKey(in1.union(in2)); + } + + sec.setRDDHandleForVariable(output.getName(), out); + sec.addLineageRDD(output.getName(), input1.getName()); + if( broadcastIn2 != null) + sec.addLineageBroadcast(output.getName(), input2.getName()); + if(in2 != null) + sec.addLineageRDD(output.getName(), input2.getName()); + } + else + throw new DMLRuntimeException("Invalid opcode (" + opcode +") encountered in
FrameIndexingSPInstruction."); + } + + /** + * + * @param mcOut + * @throws DMLRuntimeException + */ + private static void checkValidOutputDimensions(MatrixCharacteristics mcOut) + throws DMLRuntimeException + { + if(!mcOut.dimsKnown()) { + throw new DMLRuntimeException("FrameIndexingSPInstruction: The updated output dimensions are invalid: " + mcOut); + } + } + + /** + * + */ + private static class SliceRHSForLeftIndexing implements PairFlatMapFunction<Tuple2<Long,FrameBlock>, Long, FrameBlock> + { + private static final long serialVersionUID = 5724800998701216440L; + + private IndexRange _ixrange = null; + private int _brlen = -1; + private int _bclen = -1; + private long _rlen = -1; + private long _clen = -1; + + public SliceRHSForLeftIndexing(IndexRange ixrange, MatrixCharacteristics mcLeft) { + _ixrange = ixrange; + _rlen = mcLeft.getRows(); + _clen = mcLeft.getCols(); + _brlen = (int) Math.min(OptimizerUtils.getDefaultFrameSize(), _rlen); + _bclen = (int) mcLeft.getCols(); + } + + @Override + public Iterable<Tuple2<Long, FrameBlock>> call(Tuple2<Long, FrameBlock> rightKV) + throws Exception + { + Pair<Long,FrameBlock> in = SparkUtils.toIndexedFrameBlock(rightKV); + ArrayList<Pair<Long,FrameBlock>> out = new ArrayList<Pair<Long,FrameBlock>>(); + OperationsOnMatrixValues.performShift(in, _ixrange, _brlen, _bclen, _rlen, _clen, out); + return SparkUtils.fromIndexedFrameBlock(out); + } + } + + /** + * + */ + private static class ZeroOutLHS implements PairFlatMapFunction<Tuple2<Long,FrameBlock>, Long,FrameBlock> + { + private static final long serialVersionUID = -2672267231152496854L; + + private boolean _complement = false; + private IndexRange _ixrange = null; + private int _brlen = -1; + private int _bclen = -1; + private long _rlen = -1; + + public ZeroOutLHS(boolean complement, IndexRange range, MatrixCharacteristics mcLeft) { + _complement = complement; + _ixrange = range; + _brlen = (int) OptimizerUtils.getDefaultFrameSize(); + _bclen = (int) 
mcLeft.getCols(); + _rlen = mcLeft.getRows(); + } + + @Override + public Iterable<Tuple2<Long, FrameBlock>> call(Tuple2<Long, FrameBlock> kv) + throws Exception + { + ArrayList<Pair<Long,FrameBlock>> out = new ArrayList<Pair<Long,FrameBlock>>(); + + IndexRange curBlockRange = new IndexRange(_ixrange.rowStart, _ixrange.rowEnd, _ixrange.colStart, _ixrange.colEnd); + + // Global index of row (1-based) + long lGblStartRow = ((kv._1.longValue()-1)/_brlen)*_brlen+1; + FrameBlock zeroBlk = null; + int iMaxRowsToCopy = 0; + + // Starting local location (0-based) of target block where to start copy. + int iRowStartDest = UtilFunctions.computeCellInBlock(kv._1, _brlen); + for(int iRowStartSrc = 0; iRowStartSrc<kv._2.getNumRows(); iRowStartSrc += iMaxRowsToCopy, lGblStartRow += _brlen) { + IndexRange range = UtilFunctions.getSelectedRangeForZeroOut(new Pair<Long, FrameBlock>(kv._1, kv._2), _brlen, _bclen, curBlockRange, lGblStartRow-1, lGblStartRow); + if(range.rowStart == -1 && range.rowEnd == -1 && range.colStart == -1 && range.colEnd == -1) { + throw new Exception("Error while getting range for zero-out"); + } + //Maximum range of rows in target block + int iMaxRows=(int) Math.min(_brlen, _rlen-lGblStartRow+1); + + // Maximum number of rows to be copied from source block to target. 
+ iMaxRowsToCopy = Math.min(iMaxRows, kv._2.getNumRows()-iRowStartSrc); + iMaxRowsToCopy = Math.min(iMaxRowsToCopy, iMaxRows-iRowStartDest); + + // Zero out the applicable range in this block + zeroBlk = (FrameBlock) kv._2.zeroOutOperations(new FrameBlock(), range, _complement, iRowStartSrc, iRowStartDest, iMaxRows, iMaxRowsToCopy); + out.add(new Pair<Long, FrameBlock>(lGblStartRow, zeroBlk)); + curBlockRange.rowStart = lGblStartRow + _brlen; + iRowStartDest = UtilFunctions.computeCellInBlock(iRowStartDest+iMaxRowsToCopy+1, _brlen); + } + return SparkUtils.fromIndexedFrameBlock(out); + } + } + + /** + * + */ + private static class LeftIndexPartitionFunction implements PairFlatMapFunction<Iterator<Tuple2<Long,FrameBlock>>, Long, FrameBlock> + { + private static final long serialVersionUID = -911940376947364915L; + + private PartitionedBroadcast<FrameBlock> _binput; + private IndexRange _ixrange = null; + + public LeftIndexPartitionFunction(PartitionedBroadcast<FrameBlock> binput, IndexRange ixrange, MatrixCharacteristics mc) + { + _binput = binput; + _ixrange = ixrange; + } + + @Override + public Iterable<Tuple2<Long, FrameBlock>> call(Iterator<Tuple2<Long, FrameBlock>> arg0) + throws Exception + { + return new LeftIndexPartitionIterator(arg0); + } + + /** + * + */ + private class LeftIndexPartitionIterator extends LazyIterableIterator<Tuple2<Long, FrameBlock>> + { + public LeftIndexPartitionIterator(Iterator<Tuple2<Long, FrameBlock>> in) { + super(in); + } + + @Override + protected Tuple2<Long, FrameBlock> computeNext(Tuple2<Long, FrameBlock> arg) + throws Exception + { + int iNumRowsInBlock = arg._2.getNumRows(); + int iNumCols = arg._2.getNumColumns(); + if(!UtilFunctions.isInFrameBlockRange(arg._1(), iNumRowsInBlock, iNumCols, _ixrange)) { + return arg; + } + + // Calculate global index of left hand side block + long lhs_rl = Math.max(_ixrange.rowStart, arg._1); //Math.max(_ixrange.rowStart, (arg._1-1)*iNumRowsInBlock + 1); + long lhs_ru = 
Math.min(_ixrange.rowEnd, arg._1+iNumRowsInBlock-1); + long lhs_cl = Math.max(_ixrange.colStart, 1); + long lhs_cu = Math.min(_ixrange.colEnd, iNumCols); + + // Calculate global index of right hand side block + long rhs_rl = lhs_rl - _ixrange.rowStart + 1; + long rhs_ru = rhs_rl + (lhs_ru - lhs_rl); + long rhs_cl = lhs_cl - _ixrange.colStart + 1; + long rhs_cu = rhs_cl + (lhs_cu - lhs_cl); + + // Provide local zero-based index to leftIndexingOperations + int lhs_lrl = (int)(lhs_rl- arg._1); + int lhs_lru = (int)(lhs_ru- arg._1); + int lhs_lcl = (int)lhs_cl-1; + int lhs_lcu = (int)lhs_cu-1; + + FrameBlock ret = arg._2; + int brlen = OptimizerUtils.DEFAULT_BLOCKSIZE; + long rhs_rl_pb = rhs_rl; + long rhs_ru_pb = Math.min(rhs_ru, (((rhs_rl-1)/brlen)+1)*brlen); + while(rhs_rl_pb <= rhs_ru_pb) { + // Provide global zero-based index to sliceOperations, but only for one RHS partition block at a time. + FrameBlock slicedRHSMatBlock = _binput.sliceOperations(rhs_rl_pb, rhs_ru_pb, rhs_cl, rhs_cu, new FrameBlock()); + + // Provide local zero-based index to leftIndexingOperations + int lhs_lrl_pb = (int) (lhs_lrl + (rhs_rl_pb - rhs_rl)); + int lhs_lru_pb = (int) (lhs_lru + (rhs_ru_pb - rhs_ru)); + ret = ret.leftIndexingOperations(slicedRHSMatBlock, lhs_lrl_pb, lhs_lru_pb, lhs_lcl, lhs_lcu, new FrameBlock()); + rhs_rl_pb = rhs_ru_pb + 1; + rhs_ru_pb = Math.min(rhs_ru, rhs_ru_pb+brlen); + } + + return new Tuple2<Long, FrameBlock>(arg._1, ret); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/runtime/instructions/spark/IndexingSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/IndexingSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/IndexingSPInstruction.java new file mode 100644 index 0000000..04433dc --- /dev/null +++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/IndexingSPInstruction.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysml.runtime.instructions.spark; + +import org.apache.sysml.hops.AggBinaryOp.SparkAggType; +import org.apache.sysml.parser.Expression.DataType; +import org.apache.sysml.runtime.DMLRuntimeException; +import org.apache.sysml.runtime.instructions.InstructionUtils; +import org.apache.sysml.runtime.instructions.cp.CPOperand; +import org.apache.sysml.runtime.matrix.operators.Operator; +import org.apache.sysml.runtime.matrix.operators.SimpleOperator; + +public abstract class IndexingSPInstruction extends UnarySPInstruction +{ + + /* + * This class implements the matrix indexing functionality inside Spark. 
+ * Example instructions: + * rangeReIndex:mVar1:Var2:Var3:Var4:Var5:mVar6 + * input=mVar1, output=mVar6, + * bounds = (Var2,Var3,Var4,Var5) + * rowindex_lower: Var2, rowindex_upper: Var3 + * colindex_lower: Var4, colindex_upper: Var5 + * leftIndex:mVar1:mVar2:Var3:Var4:Var5:Var6:mVar7 + * triggered by "mVar1[Var3:Var4, Var5:Var6] = mVar2" + * the result is stored in mVar7 + * + */ + protected CPOperand rowLower, rowUpper, colLower, colUpper; + protected SparkAggType _aggType = null; + + public IndexingSPInstruction(Operator op, CPOperand in, CPOperand rl, CPOperand ru, CPOperand cl, CPOperand cu, + CPOperand out, SparkAggType aggtype, String opcode, String istr) + { + super(op, in, out, opcode, istr); + rowLower = rl; + rowUpper = ru; + colLower = cl; + colUpper = cu; + + _aggType = aggtype; + } + + public IndexingSPInstruction(Operator op, CPOperand lhsInput, CPOperand rhsInput, CPOperand rl, CPOperand ru, CPOperand cl, CPOperand cu, + CPOperand out, String opcode, String istr) + { + super(op, lhsInput, rhsInput, out, opcode, istr); + rowLower = rl; + rowUpper = ru; + colLower = cl; + colUpper = cu; + } + + public static IndexingSPInstruction parseInstruction ( String str ) + throws DMLRuntimeException + { + String[] parts = InstructionUtils.getInstructionPartsWithValueType(str); + String opcode = parts[0]; + + if ( opcode.equalsIgnoreCase("rangeReIndex") ) { + if ( parts.length == 8 ) { + // Example: rangeReIndex:mVar1:Var2:Var3:Var4:Var5:mVar6 + CPOperand in = new CPOperand(parts[1]); + CPOperand rl = new CPOperand(parts[2]); + CPOperand ru = new CPOperand(parts[3]); + CPOperand cl = new CPOperand(parts[4]); + CPOperand cu = new CPOperand(parts[5]); + CPOperand out = new CPOperand(parts[6]); + SparkAggType aggtype = SparkAggType.valueOf(parts[7]); + if( in.getDataType()==DataType.MATRIX ) + return new MatrixIndexingSPInstruction(new SimpleOperator(null), in, rl, ru, cl, cu, out, aggtype, opcode, str); + else + return new FrameIndexingSPInstruction(new 
SimpleOperator(null), in, rl, ru, cl, cu, out, aggtype, opcode, str); + } + else { + throw new DMLRuntimeException("Invalid number of operands in instruction: " + str); + } + } + else if ( opcode.equalsIgnoreCase("leftIndex") || opcode.equalsIgnoreCase("mapLeftIndex")) { + if ( parts.length == 8 ) { + // Example: leftIndex:mVar1:mvar2:Var3:Var4:Var5:Var6:mVar7 + CPOperand lhsInput = new CPOperand(parts[1]); + CPOperand rhsInput = new CPOperand(parts[2]); + CPOperand rl = new CPOperand(parts[3]); + CPOperand ru = new CPOperand(parts[4]); + CPOperand cl = new CPOperand(parts[5]); + CPOperand cu = new CPOperand(parts[6]); + CPOperand out = new CPOperand(parts[7]); + if( lhsInput.getDataType()==DataType.MATRIX ) + return new MatrixIndexingSPInstruction(new SimpleOperator(null), lhsInput, rhsInput, rl, ru, cl, cu, out, opcode, str); + else + return new FrameIndexingSPInstruction(new SimpleOperator(null), lhsInput, rhsInput, rl, ru, cl, cu, out, opcode, str); + } + else { + throw new DMLRuntimeException("Invalid number of operands in instruction: " + str); + } + } + else { + throw new DMLRuntimeException("Unknown opcode while parsing a IndexingSPInstruction: " + str); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmChainSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmChainSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmChainSPInstruction.java index 159eddf..99d1978 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmChainSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmChainSPInstruction.java @@ -33,7 +33,7 @@ import org.apache.sysml.runtime.controlprogram.context.ExecutionContext; import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext; 
import org.apache.sysml.runtime.instructions.InstructionUtils; import org.apache.sysml.runtime.instructions.cp.CPOperand; -import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcastMatrix; +import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcast; import org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils; import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.matrix.data.MatrixIndexes; @@ -127,7 +127,7 @@ public class MapmmChainSPInstruction extends SPInstruction //get rdd and broadcast inputs JavaPairRDD<MatrixIndexes,MatrixBlock> inX = sec.getBinaryBlockRDDHandleForVariable( _input1.getName() ); - PartitionedBroadcastMatrix inV = sec.getBroadcastForVariable( _input2.getName() ); + PartitionedBroadcast<MatrixBlock> inV = sec.getBroadcastForVariable( _input2.getName() ); //execute mapmmchain (guaranteed to have single output block) MatrixBlock out = null; @@ -137,7 +137,7 @@ public class MapmmChainSPInstruction extends SPInstruction out = RDDAggregateUtils.sumStable(tmp); } else { // ChainType.XtwXv / ChainType.XtXvy - PartitionedBroadcastMatrix inW = sec.getBroadcastForVariable( _input3.getName() ); + PartitionedBroadcast<MatrixBlock> inW = sec.getBroadcastForVariable( _input3.getName() ); RDDMapMMChainFunction2 fmmc = new RDDMapMMChainFunction2(inV, inW, _chainType); JavaPairRDD<MatrixIndexes,MatrixBlock> tmp = inX.mapToPair(fmmc); out = RDDAggregateUtils.sumStable(tmp); @@ -157,9 +157,9 @@ public class MapmmChainSPInstruction extends SPInstruction { private static final long serialVersionUID = 8197406787010296291L; - private PartitionedBroadcastMatrix _pmV = null; + private PartitionedBroadcast<MatrixBlock> _pmV = null; - public RDDMapMMChainFunction( PartitionedBroadcastMatrix bV) + public RDDMapMMChainFunction( PartitionedBroadcast<MatrixBlock> bV) throws DMLRuntimeException { //get first broadcast vector (always single block) @@ -170,7 +170,7 @@ public class 
MapmmChainSPInstruction extends SPInstruction public MatrixBlock call( MatrixBlock arg0 ) throws Exception { - MatrixBlock pmV = _pmV.getMatrixBlock(1, 1); + MatrixBlock pmV = _pmV.getBlock(1, 1); //execute mapmmchain operation MatrixBlock out = new MatrixBlock(); @@ -186,11 +186,11 @@ public class MapmmChainSPInstruction extends SPInstruction { private static final long serialVersionUID = -7926980450209760212L; - private PartitionedBroadcastMatrix _pmV = null; - private PartitionedBroadcastMatrix _pmW = null; + private PartitionedBroadcast<MatrixBlock> _pmV = null; + private PartitionedBroadcast<MatrixBlock> _pmW = null; private ChainType _chainType = null; - public RDDMapMMChainFunction2( PartitionedBroadcastMatrix bV, PartitionedBroadcastMatrix bW, ChainType chain) + public RDDMapMMChainFunction2( PartitionedBroadcast<MatrixBlock> bV, PartitionedBroadcast<MatrixBlock> bW, ChainType chain) throws DMLRuntimeException { //get both broadcast vectors (first always single block) @@ -203,7 +203,7 @@ public class MapmmChainSPInstruction extends SPInstruction public Tuple2<MatrixIndexes, MatrixBlock> call( Tuple2<MatrixIndexes, MatrixBlock> arg0 ) throws Exception { - MatrixBlock pmV = _pmV.getMatrixBlock(1, 1); + MatrixBlock pmV = _pmV.getBlock(1, 1); MatrixIndexes ixIn = arg0._1(); MatrixBlock blkIn = arg0._2(); @@ -213,7 +213,7 @@ public class MapmmChainSPInstruction extends SPInstruction MatrixBlock blkOut = new MatrixBlock(); //execute mapmmchain operation - blkIn.chainMatrixMultOperations(pmV, _pmW.getMatrixBlock(rowIx,1), blkOut, _chainType); + blkIn.chainMatrixMultOperations(pmV, _pmW.getBlock(rowIx,1), blkOut, _chainType); //output new tuple return new Tuple2<MatrixIndexes, MatrixBlock>(ixOut, blkOut); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java ---------------------------------------------------------------------- diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java index 400ccf0..5b3e9ad 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java @@ -40,7 +40,7 @@ import org.apache.sysml.runtime.functionobjects.Plus; import org.apache.sysml.runtime.instructions.InstructionUtils; import org.apache.sysml.runtime.instructions.cp.CPOperand; import org.apache.sysml.runtime.instructions.spark.data.LazyIterableIterator; -import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcastMatrix; +import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcast; import org.apache.sysml.runtime.instructions.spark.functions.FilterNonEmptyBlocksFunction; import org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils; import org.apache.sysml.runtime.matrix.MatrixCharacteristics; @@ -117,7 +117,7 @@ public class MapmmSPInstruction extends BinarySPInstruction //get inputs JavaPairRDD<MatrixIndexes,MatrixBlock> in1 = sec.getBinaryBlockRDDHandleForVariable( rddVar ); - PartitionedBroadcastMatrix in2 = sec.getBroadcastForVariable( bcastVar ); + PartitionedBroadcast<MatrixBlock> in2 = sec.getBroadcastForVariable( bcastVar ); //empty input block filter if( !_outputEmpty ) @@ -196,9 +196,9 @@ public class MapmmSPInstruction extends BinarySPInstruction private CacheType _type = null; private AggregateBinaryOperator _op = null; - private PartitionedBroadcastMatrix _pbc = null; + private PartitionedBroadcast<MatrixBlock> _pbc = null; - public RDDMapMMFunction( CacheType type, PartitionedBroadcastMatrix binput ) + public RDDMapMMFunction( CacheType type, PartitionedBroadcast<MatrixBlock> binput ) { _type = type; _pbc = binput; @@ -221,7 +221,7 @@ public class MapmmSPInstruction extends BinarySPInstruction if( _type == CacheType.LEFT 
) { //get the right hand side matrix - MatrixBlock left = _pbc.getMatrixBlock(1, (int)ixIn.getRowIndex()); + MatrixBlock left = _pbc.getBlock(1, (int)ixIn.getRowIndex()); //execute matrix-vector mult OperationsOnMatrixValues.performAggregateBinary( @@ -230,7 +230,7 @@ public class MapmmSPInstruction extends BinarySPInstruction else //if( _type == CacheType.RIGHT ) { //get the right hand side matrix - MatrixBlock right = _pbc.getMatrixBlock((int)ixIn.getColumnIndex(), 1); + MatrixBlock right = _pbc.getBlock((int)ixIn.getColumnIndex(), 1); //execute matrix-vector mult OperationsOnMatrixValues.performAggregateBinary( @@ -252,9 +252,9 @@ public class MapmmSPInstruction extends BinarySPInstruction private CacheType _type = null; private AggregateBinaryOperator _op = null; - private PartitionedBroadcastMatrix _pbc = null; + private PartitionedBroadcast<MatrixBlock> _pbc = null; - public RDDMapMMPartitionFunction( CacheType type, PartitionedBroadcastMatrix binput ) + public RDDMapMMPartitionFunction( CacheType type, PartitionedBroadcast<MatrixBlock> binput ) { _type = type; _pbc = binput; @@ -293,7 +293,7 @@ public class MapmmSPInstruction extends BinarySPInstruction if( _type == CacheType.LEFT ) { //get the right hand side matrix - MatrixBlock left = _pbc.getMatrixBlock(1, (int)ixIn.getRowIndex()); + MatrixBlock left = _pbc.getBlock(1, (int)ixIn.getRowIndex()); //execute index preserving matrix multiplication left.aggregateBinaryOperations(left, blkIn, blkOut, _op); @@ -301,7 +301,7 @@ public class MapmmSPInstruction extends BinarySPInstruction else //if( _type == CacheType.RIGHT ) { //get the right hand side matrix - MatrixBlock right = _pbc.getMatrixBlock((int)ixIn.getColumnIndex(), 1); + MatrixBlock right = _pbc.getBlock((int)ixIn.getColumnIndex(), 1); //execute index preserving matrix multiplication blkIn.aggregateBinaryOperations(blkIn, right, blkOut, _op); @@ -322,9 +322,9 @@ public class MapmmSPInstruction extends BinarySPInstruction private CacheType _type = 
null; private AggregateBinaryOperator _op = null; - private PartitionedBroadcastMatrix _pbc = null; + private PartitionedBroadcast<MatrixBlock> _pbc = null; - public RDDFlatMapMMFunction( CacheType type, PartitionedBroadcastMatrix binput ) + public RDDFlatMapMMFunction( CacheType type, PartitionedBroadcast<MatrixBlock> binput ) { _type = type; _pbc = binput; @@ -349,7 +349,7 @@ public class MapmmSPInstruction extends BinarySPInstruction int len = _pbc.getNumRowBlocks(); for( int i=1; i<=len; i++ ) { - MatrixBlock left = _pbc.getMatrixBlock(i, (int)ixIn.getRowIndex()); + MatrixBlock left = _pbc.getBlock(i, (int)ixIn.getRowIndex()); MatrixIndexes ixOut = new MatrixIndexes(); MatrixBlock blkOut = new MatrixBlock(); @@ -367,7 +367,7 @@ public class MapmmSPInstruction extends BinarySPInstruction for( int j=1; j<=len; j++ ) { //get the right hand side matrix - MatrixBlock right = _pbc.getMatrixBlock((int)ixIn.getColumnIndex(), j); + MatrixBlock right = _pbc.getBlock((int)ixIn.getColumnIndex(), j); MatrixIndexes ixOut = new MatrixIndexes(); MatrixBlock blkOut = new MatrixBlock(); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixIndexingSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixIndexingSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixIndexingSPInstruction.java index 4248999..e7bf894 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixIndexingSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixIndexingSPInstruction.java @@ -33,10 +33,9 @@ import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.controlprogram.caching.MatrixObject.UpdateType; import org.apache.sysml.runtime.controlprogram.context.ExecutionContext; import 
org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext; -import org.apache.sysml.runtime.instructions.InstructionUtils; import org.apache.sysml.runtime.instructions.cp.CPOperand; import org.apache.sysml.runtime.instructions.spark.data.LazyIterableIterator; -import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcastMatrix; +import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcast; import org.apache.sysml.runtime.instructions.spark.functions.IsBlockInRange; import org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils; import org.apache.sysml.runtime.instructions.spark.utils.SparkUtils; @@ -46,13 +45,11 @@ import org.apache.sysml.runtime.matrix.data.MatrixIndexes; import org.apache.sysml.runtime.matrix.data.OperationsOnMatrixValues; import org.apache.sysml.runtime.matrix.mapred.IndexedMatrixValue; import org.apache.sysml.runtime.matrix.operators.Operator; -import org.apache.sysml.runtime.matrix.operators.SimpleOperator; import org.apache.sysml.runtime.util.IndexRange; import org.apache.sysml.runtime.util.UtilFunctions; -public class MatrixIndexingSPInstruction extends UnarySPInstruction +public class MatrixIndexingSPInstruction extends IndexingSPInstruction { - /* * This class implements the matrix indexing functionality inside Spark.
* Example instructions: @@ -66,72 +63,17 @@ public class MatrixIndexingSPInstruction extends UnarySPInstruction * the result is stored in mVar7 * */ - protected CPOperand rowLower, rowUpper, colLower, colUpper; - protected SparkAggType _aggType = null; - + public MatrixIndexingSPInstruction(Operator op, CPOperand in, CPOperand rl, CPOperand ru, CPOperand cl, CPOperand cu, CPOperand out, SparkAggType aggtype, String opcode, String istr) { - super(op, in, out, opcode, istr); - rowLower = rl; - rowUpper = ru; - colLower = cl; - colUpper = cu; - - _aggType = aggtype; + super(op, in, rl, ru, cl, cu, out, aggtype, opcode, istr); } public MatrixIndexingSPInstruction(Operator op, CPOperand lhsInput, CPOperand rhsInput, CPOperand rl, CPOperand ru, CPOperand cl, CPOperand cu, CPOperand out, String opcode, String istr) { - super(op, lhsInput, rhsInput, out, opcode, istr); - rowLower = rl; - rowUpper = ru; - colLower = cl; - colUpper = cu; - } - - public static MatrixIndexingSPInstruction parseInstruction ( String str ) - throws DMLRuntimeException - { - String[] parts = InstructionUtils.getInstructionPartsWithValueType(str); - String opcode = parts[0]; - - if ( opcode.equalsIgnoreCase("rangeReIndex") ) { - if ( parts.length == 8 ) { - // Example: rangeReIndex:mVar1:Var2:Var3:Var4:Var5:mVar6 - CPOperand in = new CPOperand(parts[1]); - CPOperand rl = new CPOperand(parts[2]); - CPOperand ru = new CPOperand(parts[3]); - CPOperand cl = new CPOperand(parts[4]); - CPOperand cu = new CPOperand(parts[5]); - CPOperand out = new CPOperand(parts[6]); - SparkAggType aggtype = SparkAggType.valueOf(parts[7]); - return new MatrixIndexingSPInstruction(new SimpleOperator(null), in, rl, ru, cl, cu, out, aggtype, opcode, str); - } - else { - throw new DMLRuntimeException("Invalid number of operands in instruction: " + str); - } - } - else if ( opcode.equalsIgnoreCase("leftIndex") || opcode.equalsIgnoreCase("mapLeftIndex")) { - if ( parts.length == 8 ) { - // Example: 
leftIndex:mVar1:mvar2:Var3:Var4:Var5:Var6:mVar7 - CPOperand lhsInput = new CPOperand(parts[1]); - CPOperand rhsInput = new CPOperand(parts[2]); - CPOperand rl = new CPOperand(parts[3]); - CPOperand ru = new CPOperand(parts[4]); - CPOperand cl = new CPOperand(parts[5]); - CPOperand cu = new CPOperand(parts[6]); - CPOperand out = new CPOperand(parts[7]); - return new MatrixIndexingSPInstruction(new SimpleOperator(null), lhsInput, rhsInput, rl, ru, cl, cu, out, opcode, str); - } - else { - throw new DMLRuntimeException("Invalid number of operands in instruction: " + str); - } - } - else { - throw new DMLRuntimeException("Unknown opcode while parsing a MatrixIndexingSPInstruction: " + str); - } + super(op, lhsInput, rhsInput, rl, ru, cl, cu, out, opcode, istr); } @Override @@ -181,7 +123,7 @@ public class MatrixIndexingSPInstruction extends UnarySPInstruction else if ( opcode.equalsIgnoreCase("leftIndex") || opcode.equalsIgnoreCase("mapLeftIndex")) { JavaPairRDD<MatrixIndexes,MatrixBlock> in1 = sec.getBinaryBlockRDDHandleForVariable( input1.getName() ); - PartitionedBroadcastMatrix broadcastIn2 = null; + PartitionedBroadcast<MatrixBlock> broadcastIn2 = null; JavaPairRDD<MatrixIndexes,MatrixBlock> in2 = null; JavaPairRDD<MatrixIndexes,MatrixBlock> out = null; @@ -334,12 +276,12 @@ public class MatrixIndexingSPInstruction extends UnarySPInstruction { private static final long serialVersionUID = 1757075506076838258L; - private PartitionedBroadcastMatrix _binput; + private PartitionedBroadcast<MatrixBlock> _binput; private IndexRange _ixrange = null; private int _brlen = -1; private int _bclen = -1; - public LeftIndexPartitionFunction(PartitionedBroadcastMatrix binput, IndexRange ixrange, MatrixCharacteristics mc) + public LeftIndexPartitionFunction(PartitionedBroadcast<MatrixBlock> binput, IndexRange ixrange, MatrixCharacteristics mc) { _binput = binput; _ixrange = ixrange; 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/runtime/instructions/spark/PMapmmSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/PMapmmSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/PMapmmSPInstruction.java index 0b0ee35..a69a67b 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/PMapmmSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/PMapmmSPInstruction.java @@ -38,7 +38,7 @@ import org.apache.sysml.runtime.functionobjects.Multiply; import org.apache.sysml.runtime.functionobjects.Plus; import org.apache.sysml.runtime.instructions.InstructionUtils; import org.apache.sysml.runtime.instructions.cp.CPOperand; -import org.apache.sysml.runtime.instructions.spark.data.PartitionedMatrixBlock; +import org.apache.sysml.runtime.instructions.spark.data.PartitionedBlock; import org.apache.sysml.runtime.instructions.spark.functions.IsBlockInRange; import org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils; import org.apache.sysml.runtime.matrix.MatrixCharacteristics; @@ -115,8 +115,8 @@ public class PMapmmSPInstruction extends BinarySPInstruction .mapToPair(new PMapMMRebaseBlocksFunction(i/mc1.getRowsPerBlock())); int rlen = (int)Math.min(mc1.getRows()-i, NUM_ROWBLOCKS*mc1.getRowsPerBlock()); - PartitionedMatrixBlock pmb = SparkExecutionContext.toPartitionedMatrixBlock(rdd, rlen, (int)mc1.getCols(), mc1.getRowsPerBlock(), mc1.getColsPerBlock(), -1L); - Broadcast<PartitionedMatrixBlock> bpmb = sec.getSparkContext().broadcast(pmb); + PartitionedBlock<MatrixBlock> pmb = SparkExecutionContext.toPartitionedMatrixBlock(rdd, rlen, (int)mc1.getCols(), mc1.getRowsPerBlock(), mc1.getColsPerBlock(), -1L); + Broadcast<PartitionedBlock<MatrixBlock>> bpmb = sec.getSparkContext().broadcast(pmb); //matrix multiplication 
JavaPairRDD<MatrixIndexes,MatrixBlock> rdd2 = in2 @@ -178,10 +178,10 @@ public class PMapmmSPInstruction extends BinarySPInstruction private static final long serialVersionUID = -4520080421816885321L; private AggregateBinaryOperator _op = null; - private Broadcast<PartitionedMatrixBlock> _pbc = null; + private Broadcast<PartitionedBlock<MatrixBlock>> _pbc = null; private long _offset = -1; - public PMapMMFunction( Broadcast<PartitionedMatrixBlock> binput, long offset ) + public PMapMMFunction( Broadcast<PartitionedBlock<MatrixBlock>> binput, long offset ) { _pbc = binput; _offset = offset; @@ -195,7 +195,7 @@ public class PMapmmSPInstruction extends BinarySPInstruction public Iterable<Tuple2<MatrixIndexes, MatrixBlock>> call(Tuple2<MatrixIndexes, MatrixBlock> arg0) throws Exception { - PartitionedMatrixBlock pm = _pbc.value(); + PartitionedBlock<MatrixBlock> pm = _pbc.value(); MatrixIndexes ixIn = arg0._1(); MatrixBlock blkIn = arg0._2(); @@ -207,7 +207,7 @@ public class PMapmmSPInstruction extends BinarySPInstruction //get the right hand side matrix for( int i=1; i<=pm.getNumRowBlocks(); i++ ) { - MatrixBlock left = pm.getMatrixBlock(i, (int)ixIn.getRowIndex()); + MatrixBlock left = pm.getBlock(i, (int)ixIn.getRowIndex()); //execute matrix-vector mult OperationsOnMatrixValues.performAggregateBinary( http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java index 0d45ae1..a80dd1b 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java +++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java @@ -45,7 +45,7 @@ import org.apache.sysml.runtime.functionobjects.ValueFunction; import org.apache.sysml.runtime.instructions.InstructionUtils; import org.apache.sysml.runtime.instructions.cp.CPOperand; import org.apache.sysml.runtime.instructions.mr.GroupedAggregateInstruction; -import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcastMatrix; +import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcast; import org.apache.sysml.runtime.instructions.spark.functions.ExtractGroup.ExtractGroupBroadcast; import org.apache.sysml.runtime.instructions.spark.functions.ExtractGroup.ExtractGroupJoin; import org.apache.sysml.runtime.instructions.spark.functions.ExtractGroupNWeights; @@ -196,7 +196,7 @@ public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction String targetVar = params.get(Statement.GAGG_TARGET); String groupsVar = params.get(Statement.GAGG_GROUPS); JavaPairRDD<MatrixIndexes,MatrixBlock> target = sec.getBinaryBlockRDDHandleForVariable(targetVar); - PartitionedBroadcastMatrix groups = sec.getBroadcastForVariable(groupsVar); + PartitionedBroadcast<MatrixBlock> groups = sec.getBroadcastForVariable(groupsVar); MatrixCharacteristics mc1 = sec.getMatrixCharacteristics( targetVar ); MatrixCharacteristics mcOut = sec.getMatrixCharacteristics(output.getName()); CPOperand ngrpOp = new CPOperand(params.get(Statement.GAGG_NUM_GROUPS)); @@ -252,7 +252,7 @@ public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction //execute basic grouped aggregate (extract and preagg) if( broadcastGroups ) { - PartitionedBroadcastMatrix pbm = sec.getBroadcastForVariable(groupsVar); + PartitionedBroadcast<MatrixBlock> pbm = sec.getBroadcastForVariable(groupsVar); groupWeightedCells = target .flatMapToPair(new ExtractGroupBroadcast(pbm, mc1.getColsPerBlock(), ngroups, _optr)); } @@ -314,7 +314,7 @@ public class 
ParameterizedBuiltinSPInstruction extends ComputationSPInstruction //get input rdd handle JavaPairRDD<MatrixIndexes,MatrixBlock> in = sec.getBinaryBlockRDDHandleForVariable( rddInVar ); JavaPairRDD<MatrixIndexes,MatrixBlock> off; - PartitionedBroadcastMatrix broadcastOff; + PartitionedBroadcast<MatrixBlock> broadcastOff; long brlen = mcIn.getRowsPerBlock(); long bclen = mcIn.getColsPerBlock(); long numRep = (long)Math.ceil( rows ? (double)mcIn.getCols()/bclen : (double)mcIn.getRows()/brlen); @@ -529,9 +529,9 @@ public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction private long _brlen; private long _bclen; - private PartitionedBroadcastMatrix _off = null; + private PartitionedBroadcast<MatrixBlock> _off = null; - public RDDRemoveEmptyFunctionInMem(boolean rmRows, long len, long brlen, long bclen, PartitionedBroadcastMatrix off) + public RDDRemoveEmptyFunctionInMem(boolean rmRows, long len, long brlen, long bclen, PartitionedBroadcast<MatrixBlock> off) { _rmRows = rmRows; _len = len; @@ -549,9 +549,9 @@ public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction //IndexedMatrixValue offsets = SparkUtils.toIndexedMatrixBlock(arg0._1(),arg0._2()._2()); IndexedMatrixValue offsets = null; if(_rmRows) - offsets = SparkUtils.toIndexedMatrixBlock(arg0._1(), _off.getMatrixBlock((int)arg0._1().getRowIndex(), 1)); + offsets = SparkUtils.toIndexedMatrixBlock(arg0._1(), _off.getBlock((int)arg0._1().getRowIndex(), 1)); else - offsets = SparkUtils.toIndexedMatrixBlock(arg0._1(), _off.getMatrixBlock(1, (int)arg0._1().getColumnIndex())); + offsets = SparkUtils.toIndexedMatrixBlock(arg0._1(), _off.getBlock(1, (int)arg0._1().getColumnIndex())); //execute remove empty operations ArrayList<IndexedMatrixValue> out = new ArrayList<IndexedMatrixValue>(); @@ -606,13 +606,13 @@ public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction { private static final long serialVersionUID = 6795402640178679851L; - private 
PartitionedBroadcastMatrix _pbm = null; + private PartitionedBroadcast<MatrixBlock> _pbm = null; private Operator _op = null; private int _ngroups = -1; private int _brlen = -1; private int _bclen = -1; - public RDDMapGroupedAggFunction(PartitionedBroadcastMatrix pbm, Operator op, int ngroups, int brlen, int bclen) + public RDDMapGroupedAggFunction(PartitionedBroadcast<MatrixBlock> pbm, Operator op, int ngroups, int brlen, int bclen) { _pbm = pbm; _op = op; @@ -628,7 +628,7 @@ public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction //get all inputs MatrixIndexes ix = arg0._1(); MatrixBlock target = arg0._2(); - MatrixBlock groups = _pbm.getMatrixBlock((int)ix.getRowIndex(), 1); + MatrixBlock groups = _pbm.getBlock((int)ix.getRowIndex(), 1); //execute map grouped aggregate operations IndexedMatrixValue in1 = SparkUtils.toIndexedMatrixBlock(ix, target); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/runtime/instructions/spark/PmmSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/PmmSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/PmmSPInstruction.java index 44313e6..b1e7d07 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/PmmSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/PmmSPInstruction.java @@ -37,7 +37,7 @@ import org.apache.sysml.runtime.functionobjects.Multiply; import org.apache.sysml.runtime.functionobjects.Plus; import org.apache.sysml.runtime.instructions.InstructionUtils; import org.apache.sysml.runtime.instructions.cp.CPOperand; -import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcastMatrix; +import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcast; import org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils; import 
org.apache.sysml.runtime.matrix.MatrixCharacteristics; import org.apache.sysml.runtime.matrix.data.MatrixBlock; @@ -106,7 +106,7 @@ public class PmmSPInstruction extends BinarySPInstruction //get inputs JavaPairRDD<MatrixIndexes,MatrixBlock> in1 = sec.getBinaryBlockRDDHandleForVariable( rddVar ); - PartitionedBroadcastMatrix in2 = sec.getBroadcastForVariable( bcastVar ); + PartitionedBroadcast<MatrixBlock> in2 = sec.getBroadcastForVariable( bcastVar ); //execute pmm instruction JavaPairRDD<MatrixIndexes,MatrixBlock> out = in1 @@ -130,11 +130,11 @@ public class PmmSPInstruction extends BinarySPInstruction { private static final long serialVersionUID = -1696560050436469140L; - private PartitionedBroadcastMatrix _pmV = null; + private PartitionedBroadcast<MatrixBlock> _pmV = null; private long _rlen = -1; private int _brlen = -1; - public RDDPMMFunction( CacheType type, PartitionedBroadcastMatrix binput, long rlen, int brlen ) + public RDDPMMFunction( CacheType type, PartitionedBroadcast<MatrixBlock> binput, long rlen, int brlen ) throws DMLRuntimeException { _brlen = brlen; @@ -151,7 +151,7 @@ public class PmmSPInstruction extends BinarySPInstruction MatrixBlock mb2 = arg0._2(); //get the right hand side matrix - MatrixBlock mb1 = _pmV.getMatrixBlock((int)ixIn.getRowIndex(), 1); + MatrixBlock mb1 = _pmV.getBlock((int)ixIn.getRowIndex(), 1); //compute target block indexes long minPos = UtilFunctions.toLong( mb1.minNonZero() );