[SYSTEMML-562] Frame Left Indexing

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/266d4c8c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/266d4c8c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/266d4c8c

Branch: refs/heads/master
Commit: 266d4c8ced805f3bbd56086ae2394a3adfeff0d6
Parents: 3e4ceaf
Author: Arvind Surve <ac...@yahoo.com>
Authored: Tue Jun 21 21:40:47 2016 -0700
Committer: Arvind Surve <ac...@yahoo.com>
Committed: Tue Jun 21 21:40:47 2016 -0700

----------------------------------------------------------------------
 .../org/apache/sysml/hops/OptimizerUtils.java   |   9 +
 .../apache/sysml/hops/recompile/Recompiler.java |   3 +-
 .../controlprogram/caching/CacheBlock.java      |  22 +-
 .../controlprogram/caching/CacheableData.java   |   7 +-
 .../controlprogram/caching/FrameObject.java     |  34 +-
 .../controlprogram/caching/MatrixObject.java    |   2 +-
 .../context/SparkExecutionContext.java          | 129 +++++-
 .../instructions/SPInstructionParser.java       |   4 +-
 .../spark/AppendMSPInstruction.java             |  22 +-
 .../instructions/spark/BinarySPInstruction.java |   4 +-
 .../spark/FrameIndexingSPInstruction.java       | 330 +++++++++++++++
 .../spark/IndexingSPInstruction.java            | 119 ++++++
 .../spark/MapmmChainSPInstruction.java          |  22 +-
 .../instructions/spark/MapmmSPInstruction.java  |  28 +-
 .../spark/MatrixIndexingSPInstruction.java      |  74 +---
 .../instructions/spark/PMapmmSPInstruction.java |  14 +-
 .../ParameterizedBuiltinSPInstruction.java      |  22 +-
 .../instructions/spark/PmmSPInstruction.java    |  10 +-
 .../spark/QuaternarySPInstruction.java          |  34 +-
 .../spark/UaggOuterChainSPInstruction.java      |  10 +-
 .../instructions/spark/WriteSPInstruction.java  |   4 +-
 .../spark/data/BroadcastObject.java             |  18 +-
 .../spark/data/PartitionedBlock.java            | 399 +++++++++++++++++++
 .../spark/data/PartitionedBroadcast.java        | 122 ++++++
 .../spark/data/PartitionedBroadcastMatrix.java  | 121 ------
 .../spark/data/PartitionedMatrixBlock.java      | 385 ------------------
 .../functions/CopyFrameBlockPairFunction.java   |   2 +-
 .../spark/functions/ExtractGroup.java           |   8 +-
 .../functions/MatrixVectorBinaryOpFunction.java |   8 +-
 .../MatrixVectorBinaryOpPartitionFunction.java  |   8 +-
 .../functions/OuterVectorBinaryOpFunction.java  |   8 +-
 .../spark/utils/FrameRDDConverterUtils.java     |  19 +-
 .../instructions/spark/utils/RDDSortUtils.java  |  12 +-
 .../instructions/spark/utils/SparkUtils.java    |  43 +-
 .../sysml/runtime/matrix/data/FrameBlock.java   | 175 ++++++--
 .../sysml/runtime/matrix/data/MatrixBlock.java  |  22 +-
 .../matrix/data/OperationsOnMatrixValues.java   | 155 +++++++
 .../sysml/runtime/matrix/data/OutputInfo.java   |   2 +-
 .../util/FastBufferedDataOutputStream.java      |   3 +-
 .../sysml/runtime/util/MapReduceTool.java       |  76 ++--
 .../sysml/runtime/util/UtilFunctions.java       | 141 ++++++-
 .../test/integration/AutomatedTestBase.java     | 131 ++++++
 .../functions/frame/FrameConverterTest.java     |   3 +
 .../functions/frame/FrameIndexingDistTest.java  | 203 ++++++++++
 .../functions/frame/FrameMatrixCastingTest.java |   2 +-
 .../org/apache/sysml/test/utils/TestUtils.java  | 131 +++++-
 .../functions/frame/FrameIndexingDistTest.R     |  42 ++
 .../functions/frame/FrameIndexingDistTest.dml   |  32 ++
 48 files changed, 2373 insertions(+), 801 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/OptimizerUtils.java 
b/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
index e6c8b88..8e53e96 100644
--- a/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
+++ b/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
@@ -67,6 +67,9 @@ public class OptimizerUtils
        /** Default blocksize if unspecified or for testing purposes */
        public static final int DEFAULT_BLOCKSIZE = 1000;
        
+       /** Default frame blocksize */
+       public static final int DEFAULT_FRAME_BLOCKSIZE = 1000;
+       
        /** Default optimization level if unspecified */
        public static final OptimizationLevel DEFAULT_OPTLEVEL = 
                        OptimizationLevel.O2_LOCAL_MEMORY_DEFAULT;
@@ -388,6 +391,12 @@ public class OptimizerUtils
                DEFAULT_SIZE = getDefaultSize();
        }
        
+       
+       public static int getDefaultFrameSize()
+       {
+               return DEFAULT_FRAME_BLOCKSIZE;
+       }
+       
        /**
         * Returns memory budget (according to util factor) in bytes
         * 

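A note on the new constant: DEFAULT_FRAME_BLOCKSIZE and getDefaultFrameSize()
define the row-wise blocking used for frames in this commit; frame blocks span
all columns of the frame and at most 1000 rows. A minimal sketch (not part of
the commit; class and method names are hypothetical) of the implied number of
row blocks:

    import org.apache.sysml.hops.OptimizerUtils;

    public class FrameBlockingSketch {
        // Number of row blocks for a frame with rlen rows; columns are kept in
        // a single block per row block, so only the row dimension is blocked.
        public static long numRowBlocks(long rlen) {
            int brlen = OptimizerUtils.getDefaultFrameSize(); // 1000 by default
            return (rlen + brlen - 1) / brlen;                // ceil(rlen / brlen)
        }
    }

For example, a 2500 x 40 frame is split into 3 row blocks of at most 1000 rows each.
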
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java 
b/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java
index ec89775..5e65bf1 100644
--- a/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java
+++ b/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java
@@ -2129,7 +2129,8 @@ public class Recompiler
                                        
                                        DataType dt = 
DataType.valueOf(String.valueOf(mtd.get(DataExpression.DATATYPEPARAM)).toUpperCase());
                                        dop.setDataType(dt);
-                                       
dop.setValueType(ValueType.valueOf(String.valueOf(mtd.get(DataExpression.VALUETYPEPARAM)).toUpperCase()));
+                                       if(dt != DataType.FRAME)
+                                               
dop.setValueType(ValueType.valueOf(String.valueOf(mtd.get(DataExpression.VALUETYPEPARAM)).toUpperCase()));
                                        
dop.setDim1((dt==DataType.MATRIX||dt==DataType.FRAME)?Long.parseLong(mtd.get(DataExpression.READROWPARAM).toString()):0);
                                        
dop.setDim2((dt==DataType.MATRIX||dt==DataType.FRAME)?Long.parseLong(mtd.get(DataExpression.READCOLPARAM).toString()):0);
                                }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheBlock.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheBlock.java 
b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheBlock.java
index 2cc9471..f58a7c2 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheBlock.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheBlock.java
@@ -19,7 +19,12 @@
 
 package org.apache.sysml.runtime.controlprogram.caching;
 
+import java.util.ArrayList;
+
 import org.apache.hadoop.io.Writable;
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.matrix.data.Pair;
+import org.apache.sysml.runtime.util.IndexRange;
 
 
 /**
@@ -40,7 +45,7 @@ public interface CacheBlock extends Writable
         * @return
         */
        public long getExactSerializedSize();
-       
+
        /**
         * Indicates if the cache block is subject to shallow serialized,
         * which is generally true if in-memory size and serialized size
@@ -54,4 +59,19 @@ public interface CacheBlock extends Writable
         * Free unnecessarily allocated empty block.
         */
        public void compactEmptyBlock();
+       
+       public int getNumRows();
+       public int getNumColumns();
+       
+       public CacheBlock sliceOperations(int rl, int ru, int cl, int cu, 
CacheBlock block) 
+                       throws DMLRuntimeException;
+       
+       public void merge(CacheBlock that, boolean appendOnly) 
+                       throws DMLRuntimeException;
+       
+       @SuppressWarnings("rawtypes")
+       public  ArrayList getPairList();
+
+       ArrayList<Pair<?, ?>> performSlice(IndexRange ixrange, int brlen, int 
bclen, int iix, int jix, CacheBlock in) 
+                       throws DMLRuntimeException;
 }

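The CacheBlock interface now exposes generic dimensions, slicing, and merging so
that matrix and frame blocks can be processed uniformly (e.g., by the new
PartitionedBlock). A minimal sketch using only the methods declared above, under
the assumption that the indices are zero-based and inclusive and that the last
argument of sliceOperations is the target block (the helper class itself is
hypothetical):

    import org.apache.sysml.runtime.DMLRuntimeException;
    import org.apache.sysml.runtime.controlprogram.caching.CacheBlock;

    public class CacheBlockSketch {
        // Slice the first row of any cache block (matrix or frame) into the
        // given target block via the generic interface.
        public static CacheBlock firstRow(CacheBlock in, CacheBlock target)
            throws DMLRuntimeException
        {
            int cu = in.getNumColumns() - 1; // inclusive upper column index
            return in.sliceOperations(0, 0, 0, cu, target);
        }

        // Merge a partial result into an accumulator block, e.g. when combining
        // the zeroed-out lhs with the shifted rhs of a left indexing.
        public static void mergeInto(CacheBlock acc, CacheBlock part)
            throws DMLRuntimeException
        {
            acc.merge(part, false); // appendOnly flag as declared above
        }
    }
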
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java
index 479a520..4bab6b8 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java
@@ -178,7 +178,7 @@ public abstract class CacheableData<T extends CacheBlock> 
extends Data
        //note: we use the abstraction of LineageObjects for two reasons: (1) 
to keep track of cleanup
        //for lazily evaluated RDDs, and (2) as abstraction for environments 
that do not necessarily have spark libraries available
        private RDDObject _rddHandle = null; //RDD handle
-       private BroadcastObject _bcHandle = null; //Broadcast handle
+       private BroadcastObject<T> _bcHandle = null; //Broadcast handle
        public GPUObject _gpuHandle = null;
        
        /**
@@ -362,7 +362,7 @@ public abstract class CacheableData<T extends CacheBlock> 
extends Data
                        rdd.setBackReference(this);
        }
        
-       public BroadcastObject getBroadcastHandle() {
+       public BroadcastObject<T> getBroadcastHandle() {
                return _bcHandle;
        }
        
@@ -370,6 +370,7 @@ public abstract class CacheableData<T extends CacheBlock> 
extends Data
         * 
         * @param bc
         */
+       @SuppressWarnings({ "rawtypes", "unchecked" })
        public void setBroadcastHandle( BroadcastObject bc ) {
                //cleanup potential old back reference
                if( _bcHandle != null )
@@ -1064,7 +1065,7 @@ public abstract class CacheableData<T extends CacheBlock> 
extends Data
                        {
                                mc = new MatrixCharacteristics(mc.getRows(), 
mc.getCols(), ConfigurationManager.getBlocksize(), 
ConfigurationManager.getBlocksize(), mc.getNonZeros());
                        }
-                       MapReduceTool.writeMetaDataFile (filePathAndName + 
".mtd", valueType, dataType, mc, oinfo, formatProperties);
+                       MapReduceTool.writeMetaDataFile (filePathAndName + 
".mtd", valueType, null, dataType, mc, oinfo, formatProperties);
                }
        }
        

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java
index 33436b6..04d1543 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java
@@ -107,6 +107,10 @@ public class FrameObject extends CacheableData<FrameBlock>
                }
        }
        
+       public void setSchema(List<ValueType> schema) {
+               _schema = schema;
+       }
+               
        @Override
        public void refreshMetaData() 
                throws CacheException
@@ -139,7 +143,7 @@ public class FrameObject extends CacheableData<FrameBlock>
                MatrixCharacteristics mc = getMatrixCharacteristics();
                return mc.getCols();
        }
-
+       
        @Override
        protected FrameBlock readBlobFromCache(String fname) throws IOException 
{
                return (FrameBlock)LazyWriteBuffer.readBlock(fname, false);
@@ -173,11 +177,19 @@ public class FrameObject extends CacheableData<FrameBlock>
                return data;
        }
 
+       /**
+        * Reads the frame block of this frame object from its backing RDD.
+        * 
+        * @param rdd
+        * @param status
+        * 
+        * @return
+        */
        @Override
        protected FrameBlock readBlobFromRDD(RDDObject rdd, MutableBoolean 
status)
                        throws IOException 
        {
-               //note: the read of a matrix block from an RDD might trigger
+               //note: the read of a frame block from an RDD might trigger
                //lazy evaluation of pending transformations.
                RDDObject lrdd = rdd;
 
@@ -186,17 +198,20 @@ public class FrameObject extends CacheableData<FrameBlock>
                
                MatrixFormatMetaData iimd = (MatrixFormatMetaData) _metaData;
                MatrixCharacteristics mc = iimd.getMatrixCharacteristics();
-               FrameBlock fb = null;
                
-               try  {
+               FrameBlock fb = null;
+               try 
+               {
                        //prevent unnecessary collect through rdd checkpoint
                        if( rdd.allowsShortCircuitCollect() ) {
                                lrdd = (RDDObject)rdd.getLineageChilds().get(0);
                        }
                        
-                       //collect frame block from binary block RDD
+                       //obtain frame block from RDD
                        int rlen = (int)mc.getRows();
                        int clen = (int)mc.getCols();
+
+                       //collect frame block from binary block RDD
                        fb = SparkExecutionContext.toFrameBlock(lrdd, _schema, 
rlen, clen);     
                }
                catch(DMLRuntimeException ex) {
@@ -205,7 +220,7 @@ public class FrameObject extends CacheableData<FrameBlock>
                
                //sanity check correct output
                if( fb == null ) {
-                       throw new IOException("Unable to load matrix from rdd: 
"+lrdd.getVarName());
+                       throw new IOException("Unable to load frame from rdd: 
"+lrdd.getVarName());
                }
                
                return fb;
@@ -225,12 +240,13 @@ public class FrameObject extends CacheableData<FrameBlock>
                throws IOException, DMLRuntimeException 
        {
                //prepare output info
-        MatrixFormatMetaData iimd = (MatrixFormatMetaData) _metaData;
-           OutputInfo oinfo = (ofmt != null ? 
OutputInfo.stringToOutputInfo(ofmt) 
-                : InputInfo.getMatchingOutputInfo (iimd.getInputInfo()));
+               MatrixFormatMetaData iimd = (MatrixFormatMetaData) _metaData;
+               OutputInfo oinfo = (ofmt != null ? 
OutputInfo.stringToOutputInfo (ofmt ) 
+                               : InputInfo.getMatchingOutputInfo 
(iimd.getInputInfo ()));
            
                //note: the write of an RDD to HDFS might trigger
                //lazy evaluation of pending transformations.                   
        
                SparkExecutionContext.writeFrameRDDtoHDFS(rdd, fname, oinfo);   
        }
+
 }

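FrameObject now materializes its FrameBlock transparently from either HDFS or a
backing RDD (readBlobFromRDD above collects the binary-block RDD via
SparkExecutionContext.toFrameBlock). A minimal caller-side sketch, mirroring the
acquireRead/release pattern used by the new frame broadcast path in this commit
(the helper class is hypothetical):

    import org.apache.sysml.runtime.DMLRuntimeException;
    import org.apache.sysml.runtime.controlprogram.caching.FrameObject;
    import org.apache.sysml.runtime.matrix.data.FrameBlock;

    public class FrameObjectSketch {
        // Pins the frame block (reading it from HDFS or the RDD handle on
        // demand), inspects its dimensions, and unpins it again.
        public static long countCells(FrameObject fo) throws DMLRuntimeException {
            FrameBlock fb = fo.acquireRead(); // may trigger readBlobFromRDD/HDFS read
            long cells = (long) fb.getNumRows() * fb.getNumColumns();
            fo.release();                     // allow eviction again
            return cells;
        }
    }
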
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java
index d781730..2144ef8 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java
@@ -229,7 +229,7 @@ public class MatrixObject extends CacheableData<MatrixBlock>
                        if(_data != null &&  
                                        // Not a column vector
                                        _data.getNumRows() != 1 && 
_data.getNumColumns() != 1) {
-                               double[] arr = 
((MatrixBlock)_data).getDenseBlock();
+                               double[] arr = _data.getDenseBlock();
                                LibMatrixDNN.cacheReuseableData(arr);
                        }
                }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
index 1516286..a2e2f79 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
@@ -20,6 +20,7 @@
 package org.apache.sysml.runtime.controlprogram.context;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 
@@ -57,8 +58,8 @@ import 
org.apache.sysml.runtime.instructions.spark.SPInstruction;
 import org.apache.sysml.runtime.instructions.spark.data.BlockPartitioner;
 import org.apache.sysml.runtime.instructions.spark.data.BroadcastObject;
 import org.apache.sysml.runtime.instructions.spark.data.LineageObject;
-import 
org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcastMatrix;
-import org.apache.sysml.runtime.instructions.spark.data.PartitionedMatrixBlock;
+import org.apache.sysml.runtime.instructions.spark.data.PartitionedBlock;
+import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcast;
 import org.apache.sysml.runtime.instructions.spark.data.RDDObject;
 import 
org.apache.sysml.runtime.instructions.spark.functions.CopyBinaryCellFunction;
 import 
org.apache.sysml.runtime.instructions.spark.functions.CopyBlockPairFunction;
@@ -69,8 +70,8 @@ import 
org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils.
 import org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils;
 import org.apache.sysml.runtime.instructions.spark.utils.SparkUtils;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
-import org.apache.sysml.runtime.matrix.data.InputInfo;
 import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.matrix.data.InputInfo;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixCell;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
@@ -250,6 +251,22 @@ public class SparkExecutionContext extends ExecutionContext
        }
        
        /**
+        * Spark instructions should call this for all frame inputs except 
broadcast
+        * variables.
+        * 
+        * @param varname
+        * @return
+        * @throws DMLRuntimeException
+        */
+       @SuppressWarnings("unchecked")
+       public JavaPairRDD<Long,FrameBlock> 
getFrameBinaryBlockRDDHandleForVariable( String varname ) 
+               throws DMLRuntimeException 
+       {
+               JavaPairRDD<Long,FrameBlock> out = 
(JavaPairRDD<Long,FrameBlock>) getRDDHandleForVariable( varname, 
InputInfo.BinaryBlockInputInfo);
+               return out;
+       }
+       
+       /**
         * 
         * @param varname
         * @param inputInfo
@@ -456,14 +473,14 @@ public class SparkExecutionContext extends 
ExecutionContext
         * @throws DMLRuntimeException
         */
        @SuppressWarnings("unchecked")
-       public PartitionedBroadcastMatrix getBroadcastForVariable( String 
varname ) 
+       public PartitionedBroadcast<MatrixBlock> getBroadcastForVariable( 
String varname ) 
                throws DMLRuntimeException
        {               
                long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
 
                MatrixObject mo = getMatrixObject(varname);
                
-               PartitionedBroadcastMatrix bret = null;
+               PartitionedBroadcast<MatrixBlock> bret = null;
                
                //reuse existing broadcast handle
                if( mo.getBroadcastHandle()!=null 
@@ -481,20 +498,20 @@ public class SparkExecutionContext extends 
ExecutionContext
                        
                        //create partitioned matrix block and release memory 
consumed by input
                        MatrixBlock mb = mo.acquireRead();
-                       PartitionedMatrixBlock pmb = new 
PartitionedMatrixBlock(mb, brlen, bclen);
+                       PartitionedBlock<MatrixBlock> pmb = new 
PartitionedBlock<MatrixBlock>(mb, brlen, bclen);
                        mo.release();
                        
                        //determine coarse-grained partitioning
-                       int numPerPart = 
PartitionedBroadcastMatrix.computeBlocksPerPartition(mo.getNumRows(), 
mo.getNumColumns(), brlen, bclen);
+                       int numPerPart = 
PartitionedBroadcast.computeBlocksPerPartition(mo.getNumRows(), 
mo.getNumColumns(), brlen, bclen);
                        int numParts = (int) 
Math.ceil((double)pmb.getNumRowBlocks()*pmb.getNumColumnBlocks() / numPerPart); 
-                       Broadcast<PartitionedMatrixBlock>[] ret = new 
Broadcast[numParts];
+                       Broadcast<PartitionedBlock<MatrixBlock>>[] ret = new 
Broadcast[numParts];
                                        
                        //create coarse-grained partitioned broadcasts
                        if( numParts > 1 ) {
                                for( int i=0; i<numParts; i++ ) {
                                        int offset = i * numPerPart;
                                        int numBlks = Math.min(numPerPart, 
pmb.getNumRowBlocks()*pmb.getNumColumnBlocks()-offset);
-                                       PartitionedMatrixBlock tmp = 
pmb.createPartition(offset, numBlks);
+                                       PartitionedBlock<MatrixBlock> tmp = 
pmb.createPartition(offset, numBlks, new MatrixBlock());
                                        ret[i] = 
getSparkContext().broadcast(tmp);
                                }
                        }
@@ -502,8 +519,8 @@ public class SparkExecutionContext extends ExecutionContext
                                ret[0] = getSparkContext().broadcast( pmb);
                        }
                
-                       bret = new PartitionedBroadcastMatrix(ret);
-                       BroadcastObject bchandle = new BroadcastObject(bret, 
varname);
+                       bret = new PartitionedBroadcast<MatrixBlock>(ret);
+                       BroadcastObject<MatrixBlock> bchandle = new 
BroadcastObject<MatrixBlock>(bret, varname);
                        mo.setBroadcastHandle(bchandle);
                }
                
@@ -515,6 +532,76 @@ public class SparkExecutionContext extends ExecutionContext
                return bret;
        }
        
+
+       /**
+        * Returns a partitioned broadcast for the given frame variable, creating and caching it if necessary.
+        * @param varname
+        * @return
+        * @throws DMLRuntimeException
+        */
+       
+       @SuppressWarnings("unchecked")
+       public PartitionedBroadcast<FrameBlock> getBroadcastForFrameVariable( 
String varname) 
+               throws DMLRuntimeException
+       {               
+               long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
+
+               FrameObject fo = getFrameObject(varname);
+               
+               PartitionedBroadcast<FrameBlock> bret = null;
+               
+               //reuse existing broadcast handle
+               if( fo.getBroadcastHandle()!=null 
+                       && fo.getBroadcastHandle().isValid() ) 
+               {
+                       bret = fo.getBroadcastHandle().getBroadcast();
+               }
+               
+               //create new broadcast handle (never created, evicted)
+               if( bret == null ) 
+               {
+                       //obtain meta data for frame 
+                       int bclen = (int) fo.getNumColumns();
+                       int brlen = OptimizerUtils.getDefaultFrameSize();
+                       
+                       //create partitioned frame block and release memory 
consumed by input
+                       FrameBlock mb = fo.acquireRead();
+                       PartitionedBlock<FrameBlock> pmb = new 
PartitionedBlock<FrameBlock>(mb, brlen, bclen);
+                       fo.release();
+                       
+                       //determine coarse-grained partitioning
+                       int numPerPart = 
PartitionedBroadcast.computeBlocksPerPartition(fo.getNumRows(), 
fo.getNumColumns(), brlen, bclen);
+                       int numParts = (int) 
Math.ceil((double)pmb.getNumRowBlocks()*pmb.getNumColumnBlocks() / numPerPart); 
+                       Broadcast<PartitionedBlock<FrameBlock>>[] ret = new 
Broadcast[numParts];
+                                       
+                       //create coarse-grained partitioned broadcasts
+                       if( numParts > 1 ) {
+                               for( int i=0; i<numParts; i++ ) {
+                                       int offset = i * numPerPart;
+                                       int numBlks = Math.min(numPerPart, 
pmb.getNumRowBlocks()*pmb.getNumColumnBlocks()-offset);
+                                       PartitionedBlock<FrameBlock> tmp = 
pmb.createPartition(offset, numBlks, new FrameBlock());
+                                       ret[i] = 
getSparkContext().broadcast(tmp);
+                               }
+                       }
+                       else { //single partition
+                               ret[0] = getSparkContext().broadcast( pmb);
+                       }
+               
+                       bret = new PartitionedBroadcast<FrameBlock>(ret);
+                       BroadcastObject<FrameBlock> bchandle = new 
BroadcastObject<FrameBlock>(bret, varname);
+                       fo.setBroadcastHandle(bchandle);
+               }
+               
+               if (DMLScript.STATISTICS) {
+                       Statistics.accSparkBroadCastTime(System.nanoTime() - 
t0);
+                       Statistics.incSparkBroadcastCount(1);
+               }
+               
+               return bret;
+       }
+       
+
+       
        /**
         * 
         * @param varname
@@ -804,7 +891,7 @@ public class SparkExecutionContext extends ExecutionContext
                
                return out;
        }
-       
+               
        /**
         * 
         * @param rdd
@@ -816,13 +903,13 @@ public class SparkExecutionContext extends 
ExecutionContext
         * @return
         * @throws DMLRuntimeException
         */
-       public static PartitionedMatrixBlock 
toPartitionedMatrixBlock(JavaPairRDD<MatrixIndexes,MatrixBlock> rdd, int rlen, 
int clen, int brlen, int bclen, long nnz) 
+       public static PartitionedBlock<MatrixBlock> 
toPartitionedMatrixBlock(JavaPairRDD<MatrixIndexes,MatrixBlock> rdd, int rlen, 
int clen, int brlen, int bclen, long nnz) 
                throws DMLRuntimeException
        {
                
                long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
 
-               PartitionedMatrixBlock out = new PartitionedMatrixBlock(rlen, 
clen, brlen, bclen);
+               PartitionedBlock<MatrixBlock> out = new 
PartitionedBlock<MatrixBlock>(rlen, clen, brlen, bclen, new MatrixBlock());
                List<Tuple2<MatrixIndexes,MatrixBlock>> list = rdd.collect();
                
                //copy blocks one-at-a-time into output matrix block
@@ -831,7 +918,7 @@ public class SparkExecutionContext extends ExecutionContext
                        //unpack index-block pair
                        MatrixIndexes ix = keyval._1();
                        MatrixBlock block = keyval._2();
-                       out.setMatrixBlock((int)ix.getRowIndex(), 
(int)ix.getColumnIndex(), block);
+                       out.setBlock((int)ix.getRowIndex(), 
(int)ix.getColumnIndex(), block);
                }
                
                if (DMLScript.STATISTICS) {
@@ -873,6 +960,9 @@ public class SparkExecutionContext extends ExecutionContext
        {
                long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
 
+               if(schema == null)
+                       schema = Collections.nCopies(clen, ValueType.STRING);
+
                //create output frame block (w/ lazy allocation)
                FrameBlock out = new FrameBlock(schema);
                out.ensureAllocatedColumns(rlen);
@@ -887,7 +977,7 @@ public class SparkExecutionContext extends ExecutionContext
                        FrameBlock block = keyval._2();
                
                        //copy into output frame
-                       out.copy( ix, ix+block.getNumRows(), 
+                       out.copy( ix, ix+block.getNumRows()-1, 
                                          0, block.getNumColumns()-1, block );  
                }
                
@@ -977,7 +1067,7 @@ public class SparkExecutionContext extends ExecutionContext
                throws DMLRuntimeException 
        {
                RDDObject parent = getCacheableData(varParent).getRDDHandle();
-               BroadcastObject child = 
getCacheableData(varChild).getBroadcastHandle();
+               BroadcastObject<?> child = 
getCacheableData(varChild).getBroadcastHandle();
                
                parent.addLineageChild( child );
        }
@@ -1049,6 +1139,7 @@ public class SparkExecutionContext extends 
ExecutionContext
         * @param lob
         * @throws IOException
         */
+       @SuppressWarnings({ "rawtypes", "unchecked" })
        private void rCleanupLineageObject(LineageObject lob) 
                throws IOException
        {               
@@ -1071,9 +1162,9 @@ public class SparkExecutionContext extends 
ExecutionContext
                        }
                }
                else if( lob instanceof BroadcastObject ) {
-                       PartitionedBroadcastMatrix pbm = 
((BroadcastObject)lob).getBroadcast();
+                       PartitionedBroadcast pbm = 
((BroadcastObject)lob).getBroadcast();
                        if( pbm != null ) //robustness for evictions
-                               for( Broadcast<PartitionedMatrixBlock> bc : 
pbm.getBroadcasts() )
+                               for( Broadcast<PartitionedBlock> bc : 
pbm.getBroadcasts() )
                                        cleanupBroadcastVariable(bc);
                }
        

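Together, getFrameBinaryBlockRDDHandleForVariable and getBroadcastForFrameVariable
give Spark instructions the same two access paths for frames that already exist
for matrices: a binary-block RDD keyed by 1-based row offsets, and a coarse-grained
PartitionedBroadcast. A minimal consumer-side sketch (class and method names are
hypothetical; it assumes frames keep a single column block, as set up above):

    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.sysml.hops.OptimizerUtils;
    import org.apache.sysml.runtime.DMLRuntimeException;
    import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
    import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcast;
    import org.apache.sysml.runtime.matrix.data.FrameBlock;

    public class FrameInputSketch {
        // RDD path: used for all frame inputs except broadcast variables.
        public static JavaPairRDD<Long, FrameBlock> frameRdd(
            SparkExecutionContext sec, String var) throws DMLRuntimeException
        {
            return sec.getFrameBinaryBlockRDDHandleForVariable(var);
        }

        // Broadcast path: frames use a single column block, so a block lookup
        // only needs the 1-based row block index.
        public static FrameBlock broadcastBlock(SparkExecutionContext sec,
            String var, long rowOffset) throws DMLRuntimeException
        {
            PartitionedBroadcast<FrameBlock> bc =
                sec.getBroadcastForFrameVariable(var);
            int brlen = OptimizerUtils.getDefaultFrameSize();
            int rowBlockIx = (int) ((rowOffset - 1) / brlen) + 1;
            return bc.getBlock(rowBlockIx, 1);
        }
    }
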
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java 
b/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java
index 52b712b..663e6b4 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java
@@ -52,9 +52,9 @@ import 
org.apache.sysml.runtime.instructions.spark.CovarianceSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.CpmmSPInstruction;
 import 
org.apache.sysml.runtime.instructions.spark.CumulativeAggregateSPInstruction;
 import 
org.apache.sysml.runtime.instructions.spark.CumulativeOffsetSPInstruction;
+import org.apache.sysml.runtime.instructions.spark.IndexingSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.MapmmChainSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.MapmmSPInstruction;
-import org.apache.sysml.runtime.instructions.spark.MatrixIndexingSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.MatrixReshapeSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.PMapmmSPInstruction;
 import 
org.apache.sysml.runtime.instructions.spark.ParameterizedBuiltinSPInstruction;
@@ -317,7 +317,7 @@ public class SPInstructionParser extends InstructionParser
                                return 
AggregateTernarySPInstruction.parseInstruction(str);
                                
                        case MatrixIndexing:
-                               return 
MatrixIndexingSPInstruction.parseInstruction(str);
+                               return 
IndexingSPInstruction.parseInstruction(str);
                                
                        case Reorg:
                                return ReorgSPInstruction.parseInstruction(str);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendMSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendMSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendMSPInstruction.java
index 0c1d22c..26be3eb 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendMSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendMSPInstruction.java
@@ -34,7 +34,7 @@ import 
org.apache.sysml.runtime.functionobjects.OffsetColumnIndex;
 import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
 import org.apache.sysml.runtime.instructions.spark.data.LazyIterableIterator;
-import 
org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcastMatrix;
+import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcast;
 import org.apache.sysml.runtime.instructions.spark.utils.SparkUtils;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
@@ -91,7 +91,7 @@ public class AppendMSPInstruction extends BinarySPInstruction
                int bclen = mc1.getColsPerBlock();
                
                JavaPairRDD<MatrixIndexes,MatrixBlock> in1 = 
sec.getBinaryBlockRDDHandleForVariable( input1.getName() );
-               PartitionedBroadcastMatrix in2 = sec.getBroadcastForVariable( 
input2.getName() );
+               PartitionedBroadcast<MatrixBlock> in2 = 
sec.getBroadcastForVariable( input2.getName() );
                long off = sec.getScalarInput( _offset.getName(), 
_offset.getValueType(), _offset.isLiteral()).getLongValue();
                
                //execute map-append operations (partitioning preserving if 
#in-blocks = #out-blocks)
@@ -138,14 +138,14 @@ public class AppendMSPInstruction extends 
BinarySPInstruction
        {
                private static final long serialVersionUID = 
2738541014432173450L;
                
-               private PartitionedBroadcastMatrix _pm = null;
+               private PartitionedBroadcast<MatrixBlock> _pm = null;
                private boolean _cbind = true;
                private long _offset; 
                private int _brlen; 
                private int _bclen;
                private long _lastBlockColIndex;
                
-               public MapSideAppendFunction(PartitionedBroadcastMatrix binput, 
boolean cbind, long offset, int brlen, int bclen)  
+               public MapSideAppendFunction(PartitionedBroadcast<MatrixBlock> 
binput, boolean cbind, long offset, int brlen, int bclen)  
                {
                        _pm = binput;
                        _cbind = cbind;
@@ -184,12 +184,12 @@ public class AppendMSPInstruction extends 
BinarySPInstruction
                                if( _cbind ) {
                                        ret.add( new Tuple2<MatrixIndexes, 
MatrixBlock>(
                                                        new 
MatrixIndexes(ix.getRowIndex(), ix.getColumnIndex()+1),
-                                                       
_pm.getMatrixBlock((int)ix.getRowIndex(), 1)) );
+                                                       
_pm.getBlock((int)ix.getRowIndex(), 1)) );
                                }
                                else { //rbind
                                        ret.add( new Tuple2<MatrixIndexes, 
MatrixBlock>(
                                                        new 
MatrixIndexes(ix.getRowIndex()+1, ix.getColumnIndex()),
-                                                       _pm.getMatrixBlock(1, 
(int)ix.getColumnIndex())) );     
+                                                       _pm.getBlock(1, 
(int)ix.getColumnIndex())) );   
                                }
                        }
                        //case 3: append operation on boundary block
@@ -202,7 +202,7 @@ public class AppendMSPInstruction extends 
BinarySPInstruction
                                
                                MatrixBlock value_in2 = null;
                                if( _cbind ) {
-                                       value_in2 = 
_pm.getMatrixBlock((int)ix.getRowIndex(), 1);
+                                       value_in2 = 
_pm.getBlock((int)ix.getRowIndex(), 1);
                                        
if(in1.getValue().getNumColumns()+value_in2.getNumColumns()>_bclen) {
                                                IndexedMatrixValue second=new 
IndexedMatrixValue(new MatrixIndexes(), new MatrixBlock());
                                                
second.getIndexes().setIndexes(ix.getRowIndex(), ix.getColumnIndex()+1);
@@ -210,7 +210,7 @@ public class AppendMSPInstruction extends 
BinarySPInstruction
                                        }
                                }
                                else { //rbind
-                                       value_in2 = _pm.getMatrixBlock(1, 
(int)ix.getColumnIndex());
+                                       value_in2 = _pm.getBlock(1, 
(int)ix.getColumnIndex());
                                        
if(in1.getValue().getNumRows()+value_in2.getNumRows()>_brlen) {
                                                IndexedMatrixValue second=new 
IndexedMatrixValue(new MatrixIndexes(), new MatrixBlock());
                                                
second.getIndexes().setIndexes(ix.getRowIndex()+1, ix.getColumnIndex());
@@ -233,11 +233,11 @@ public class AppendMSPInstruction extends 
BinarySPInstruction
        {
                private static final long serialVersionUID = 
5767240739761027220L;
 
-               private PartitionedBroadcastMatrix _pm = null;
+               private PartitionedBroadcast<MatrixBlock> _pm = null;
                private boolean _cbind = true;
                private long _lastBlockColIndex = -1;
                
-               public 
MapSideAppendPartitionFunction(PartitionedBroadcastMatrix binput, boolean 
cbind, long offset, int brlen, int bclen)  
+               public 
MapSideAppendPartitionFunction(PartitionedBroadcast<MatrixBlock> binput, 
boolean cbind, long offset, int brlen, int bclen)  
                {
                        _pm = binput;
                        _cbind = cbind;
@@ -280,7 +280,7 @@ public class AppendMSPInstruction extends 
BinarySPInstruction
                                else {
                                        int rowix = _cbind ? 
(int)ix.getRowIndex() : 1;
                                        int colix = _cbind ? 1 : 
(int)ix.getColumnIndex();                                      
-                                       MatrixBlock in2 = 
_pm.getMatrixBlock(rowix, colix);
+                                       MatrixBlock in2 = _pm.getBlock(rowix, 
colix);
                                        MatrixBlock out = 
in1.appendOperations(in2, new MatrixBlock(), _cbind);
                                        return new 
Tuple2<MatrixIndexes,MatrixBlock>(ix, out);
                                }       

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/runtime/instructions/spark/BinarySPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/BinarySPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/BinarySPInstruction.java
index 4d2e78c..750a624 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/BinarySPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/BinarySPInstruction.java
@@ -29,7 +29,7 @@ import 
org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
 import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
 import org.apache.sysml.runtime.instructions.cp.ScalarObject;
-import 
org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcastMatrix;
+import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcast;
 import 
org.apache.sysml.runtime.instructions.spark.functions.MatrixMatrixBinaryOpFunction;
 import 
org.apache.sysml.runtime.instructions.spark.functions.MatrixScalarUnaryFunction;
 import 
org.apache.sysml.runtime.instructions.spark.functions.MatrixVectorBinaryOpPartitionFunction;
@@ -154,7 +154,7 @@ public abstract class BinarySPInstruction extends 
ComputationSPInstruction
                String rddVar = input1.getName(); 
                String bcastVar = input2.getName();
                JavaPairRDD<MatrixIndexes,MatrixBlock> in1 = 
sec.getBinaryBlockRDDHandleForVariable( rddVar );
-               PartitionedBroadcastMatrix in2 = sec.getBroadcastForVariable( 
bcastVar );
+               PartitionedBroadcast<MatrixBlock> in2 = 
sec.getBroadcastForVariable( bcastVar );
                MatrixCharacteristics mc1 = 
sec.getMatrixCharacteristics(rddVar);
                MatrixCharacteristics mc2 = 
sec.getMatrixCharacteristics(bcastVar);
                

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameIndexingSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameIndexingSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameIndexingSPInstruction.java
new file mode 100644
index 0000000..c1d8766
--- /dev/null
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameIndexingSPInstruction.java
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.instructions.spark;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+
+import scala.Tuple2;
+
+import org.apache.sysml.hops.AggBinaryOp.SparkAggType;
+import org.apache.sysml.hops.OptimizerUtils;
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
+import org.apache.sysml.runtime.instructions.cp.CPOperand;
+import org.apache.sysml.runtime.instructions.spark.data.LazyIterableIterator;
+import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcast;
+import org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils;
+import org.apache.sysml.runtime.instructions.spark.utils.SparkUtils;
+import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
+import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.matrix.data.OperationsOnMatrixValues;
+import org.apache.sysml.runtime.matrix.data.Pair;
+import org.apache.sysml.runtime.matrix.operators.Operator;
+import org.apache.sysml.runtime.util.IndexRange;
+import org.apache.sysml.runtime.util.UtilFunctions;
+
+public class FrameIndexingSPInstruction  extends IndexingSPInstruction
+{
+       
+       /*
+        * This class implements the frame indexing functionality inside Spark. 
 
+        * Example instructions: 
+        *     rangeReIndex:mVar1:Var2:Var3:Var4:Var5:mVar6
+        *         input=mVar1, output=mVar6, 
+        *         bounds = (Var2,Var3,Var4,Var5)
+        *         rowindex_lower: Var2, rowindex_upper: Var3 
+        *         colindex_lower: Var4, colindex_upper: Var5
+        *     leftIndex:mVar1:mVar2:Var3:Var4:Var5:Var6:mVar7
+        *         triggered by "mVar1[Var3:Var4, Var5:Var6] = mVar2"
+        *         the result is stored in mVar7
+        *  
+        */
+       public FrameIndexingSPInstruction(Operator op, CPOperand in, CPOperand 
rl, CPOperand ru, CPOperand cl, CPOperand cu, 
+                                                 CPOperand out, SparkAggType 
aggtype, String opcode, String istr)
+       {
+               super(op, in, rl, ru, cl, cu, out, aggtype, opcode, istr);
+       }
+       
+       public FrameIndexingSPInstruction(Operator op, CPOperand lhsInput, 
CPOperand rhsInput, CPOperand rl, CPOperand ru, CPOperand cl, CPOperand cu, 
+                                                 CPOperand out, String opcode, 
String istr)
+       {
+               super(op, lhsInput, rhsInput, rl, ru, cl, cu, out, opcode, 
istr);
+       }
+       
+       
+       @Override
+       public void processInstruction(ExecutionContext ec)
+                       throws DMLRuntimeException 
+       {       
+               SparkExecutionContext sec = (SparkExecutionContext)ec;
+               String opcode = getOpcode();
+               
+               //get indexing range
+               long rl = ec.getScalarInput(rowLower.getName(), 
rowLower.getValueType(), rowLower.isLiteral()).getLongValue();
+               long ru = ec.getScalarInput(rowUpper.getName(), 
rowUpper.getValueType(), rowUpper.isLiteral()).getLongValue();
+               long cl = ec.getScalarInput(colLower.getName(), 
colLower.getValueType(), colLower.isLiteral()).getLongValue();
+               long cu = ec.getScalarInput(colUpper.getName(), 
colUpper.getValueType(), colUpper.isLiteral()).getLongValue();
+               IndexRange ixrange = new IndexRange(rl, ru, cl, cu);
+               
+               //left indexing
+               if ( opcode.equalsIgnoreCase("leftIndex") || 
opcode.equalsIgnoreCase("mapLeftIndex"))
+               {
+                       JavaPairRDD<Long,FrameBlock> in1 = 
sec.getFrameBinaryBlockRDDHandleForVariable( input1.getName() );
+                       PartitionedBroadcast<FrameBlock> broadcastIn2 = null;
+                       JavaPairRDD<Long,FrameBlock> in2 = null;
+                       JavaPairRDD<Long,FrameBlock> out = null;
+                       
+                       //update and check output dimensions
+                       MatrixCharacteristics mcOut = 
sec.getMatrixCharacteristics(output.getName());
+                       MatrixCharacteristics mcLeft = 
ec.getMatrixCharacteristics(input1.getName());
+                       mcOut.set(mcLeft.getRows(), mcLeft.getCols(), 
mcLeft.getRowsPerBlock(), mcLeft.getColsPerBlock());
+                       checkValidOutputDimensions(mcOut);
+                       
+                       //note: always frame rhs, scalars are preprocessed via 
cast to 1x1 frame
+                       MatrixCharacteristics mcRight = 
ec.getMatrixCharacteristics(input2.getName());
+                               
+                       //sanity check matching index range and rhs dimensions
+                       if(!mcRight.dimsKnown()) {
+                               throw new DMLRuntimeException("The right input 
frame dimensions are not specified for FrameIndexingSPInstruction");
+                       }
+                       if(!(ru-rl+1 == mcRight.getRows() && cu-cl+1 == 
mcRight.getCols())) {
+                               throw new DMLRuntimeException("Invalid index 
range of leftindexing: ["+rl+":"+ru+","+cl+":"+cu+"] vs 
["+mcRight.getRows()+"x"+mcRight.getCols()+"]." );
+                       }
+                       
+                       if(opcode.equalsIgnoreCase("mapLeftIndex")) 
+                       {
+                               broadcastIn2 = 
sec.getBroadcastForFrameVariable( input2.getName());
+                               
+                               //partitioning-preserving mapPartitions (key 
access required for broadcast lookup)
+                               out = in1.mapPartitionsToPair(
+                                               new 
LeftIndexPartitionFunction(broadcastIn2, ixrange, mcOut), true);
+                       }
+                       else { //general case
+                               
+                               // zero-out lhs
+                               in1 = in1.flatMapToPair(new ZeroOutLHS(false, 
ixrange, mcLeft));
+                               
+                               // slice rhs, shift and merge with lhs
+                               in2 = 
sec.getFrameBinaryBlockRDDHandleForVariable( input2.getName() )
+                                           .flatMapToPair(new 
SliceRHSForLeftIndexing(ixrange, mcLeft));
+
+                               out = (JavaPairRDD<Long, FrameBlock>) 
RDDAggregateUtils.mergeByFrameKey(in1.union(in2));
+                       }
+                       
+                       sec.setRDDHandleForVariable(output.getName(), out);
+                       sec.addLineageRDD(output.getName(), input1.getName());
+                       if( broadcastIn2 != null)
+                               sec.addLineageBroadcast(output.getName(), 
input2.getName());
+                       if(in2 != null) 
+                               sec.addLineageRDD(output.getName(), 
input2.getName());
+               }
+               else
+                       throw new DMLRuntimeException("Invalid opcode (" + 
opcode +") encountered in FrameIndexingSPInstruction.");             
+       }
+               
+       /**
+        * 
+        * @param mcOut
+        * @throws DMLRuntimeException
+        */
+       private static void checkValidOutputDimensions(MatrixCharacteristics 
mcOut) 
+               throws DMLRuntimeException
+       {
+               if(!mcOut.dimsKnown()) {
+                       throw new 
DMLRuntimeException("FrameIndexingSPInstruction: The updated output dimensions 
are invalid: " + mcOut);
+               }
+       }
+       
+       /**
+        * 
+        */
+       private static class SliceRHSForLeftIndexing implements 
PairFlatMapFunction<Tuple2<Long,FrameBlock>, Long, FrameBlock> 
+       {
+               private static final long serialVersionUID = 
5724800998701216440L;
+
+               private IndexRange _ixrange = null; 
+               private int _brlen = -1; 
+               private int _bclen = -1;
+               private long _rlen = -1;
+               private long _clen = -1;
+               
+               public SliceRHSForLeftIndexing(IndexRange ixrange, 
MatrixCharacteristics mcLeft) {
+                       _ixrange = ixrange;
+                       _rlen = mcLeft.getRows();
+                       _clen = mcLeft.getCols();
+                       _brlen = (int) 
Math.min(OptimizerUtils.getDefaultFrameSize(), _rlen);
+                       _bclen = (int) mcLeft.getCols();
+               }
+
+               @Override
+               public Iterable<Tuple2<Long, FrameBlock>> call(Tuple2<Long, 
FrameBlock> rightKV) 
+                       throws Exception 
+               {
+                       Pair<Long,FrameBlock> in = 
SparkUtils.toIndexedFrameBlock(rightKV);                     
+                       ArrayList<Pair<Long,FrameBlock>> out = new 
ArrayList<Pair<Long,FrameBlock>>();
+                       OperationsOnMatrixValues.performShift(in, _ixrange, 
_brlen, _bclen, _rlen, _clen, out);
+                       return SparkUtils.fromIndexedFrameBlock(out);
+               }               
+       }
+       
+       /**
+        * 
+        */
+       private static class ZeroOutLHS implements 
PairFlatMapFunction<Tuple2<Long,FrameBlock>, Long,FrameBlock> 
+       {
+               private static final long serialVersionUID = 
-2672267231152496854L;
+
+               private boolean _complement = false;
+               private IndexRange _ixrange = null;
+               private int _brlen = -1;
+               private int _bclen = -1;
+               private long _rlen = -1;
+               
+               public ZeroOutLHS(boolean complement, IndexRange range, 
MatrixCharacteristics mcLeft) {
+                       _complement = complement;
+                       _ixrange = range;
+                       _brlen = (int) OptimizerUtils.getDefaultFrameSize();
+                       _bclen = (int) mcLeft.getCols();
+                       _rlen = mcLeft.getRows();
+               }
+               
+               @Override
+               public Iterable<Tuple2<Long, FrameBlock>> call(Tuple2<Long, 
FrameBlock> kv) 
+                       throws Exception 
+               {
+                       ArrayList<Pair<Long,FrameBlock>> out = new 
ArrayList<Pair<Long,FrameBlock>>();
+
+                       IndexRange curBlockRange = new 
IndexRange(_ixrange.rowStart, _ixrange.rowEnd, _ixrange.colStart, 
_ixrange.colEnd);
+                       
+                       // Global index of row (1-based)
+                       long lGblStartRow = 
((kv._1.longValue()-1)/_brlen)*_brlen+1;
+                       FrameBlock zeroBlk = null;
+                       int iMaxRowsToCopy = 0;
+                       
+                       // Starting local row (0-based) in the target block 
at which the copy begins. 
+                       int iRowStartDest = 
UtilFunctions.computeCellInBlock(kv._1, _brlen);
+                       for(int iRowStartSrc = 0; 
iRowStartSrc<kv._2.getNumRows(); iRowStartSrc += iMaxRowsToCopy, lGblStartRow 
+= _brlen) {
+                               IndexRange range = 
UtilFunctions.getSelectedRangeForZeroOut(new Pair<Long, FrameBlock>(kv._1, 
kv._2), _brlen, _bclen, curBlockRange, lGblStartRow-1, lGblStartRow);
+                               if(range.rowStart == -1 && range.rowEnd == -1 
&& range.colStart == -1 && range.colEnd == -1) {
+                                       throw new Exception("Error while 
getting range for zero-out");
+                               }
+                               //Maximum number of rows in the target block 
+                               int iMaxRows=(int) Math.min(_brlen, 
_rlen-lGblStartRow+1);
+                               
+                               // Maximum number of rows to be copied from 
source block to target.
+                               iMaxRowsToCopy = Math.min(iMaxRows, 
kv._2.getNumRows()-iRowStartSrc);
+                               iMaxRowsToCopy = Math.min(iMaxRowsToCopy, 
iMaxRows-iRowStartDest);
+                               
+                               // Zero out the applicable range in this block
+                               zeroBlk = (FrameBlock) 
kv._2.zeroOutOperations(new FrameBlock(), range, _complement, iRowStartSrc, 
iRowStartDest, iMaxRows, iMaxRowsToCopy);
+                               out.add(new Pair<Long, 
FrameBlock>(lGblStartRow, zeroBlk));
+                               curBlockRange.rowStart =  lGblStartRow + _brlen;
+                               iRowStartDest = 
UtilFunctions.computeCellInBlock(iRowStartDest+iMaxRowsToCopy+1, _brlen);
+                       }
+                       return SparkUtils.fromIndexedFrameBlock(out);
+               }
+       }
+       
+       /**
+        * Applies the left indexing per partition: for each left-hand-side block
+        * that overlaps the index range, the corresponding slice of the broadcast
+        * right-hand-side frame is copied in via leftIndexingOperations.
+        */
+       private static class LeftIndexPartitionFunction implements 
PairFlatMapFunction<Iterator<Tuple2<Long,FrameBlock>>, Long, FrameBlock> 
+       {
+               private static final long serialVersionUID = 
-911940376947364915L;
+
+               private PartitionedBroadcast<FrameBlock> _binput;
+               private IndexRange _ixrange = null;
+               
+               public 
LeftIndexPartitionFunction(PartitionedBroadcast<FrameBlock> binput, IndexRange 
ixrange, MatrixCharacteristics mc) 
+               {
+                       _binput = binput;
+                       _ixrange = ixrange;
+               }
+
+               @Override
+               public Iterable<Tuple2<Long, FrameBlock>> 
call(Iterator<Tuple2<Long, FrameBlock>> arg0)
+                       throws Exception 
+               {
+                       return new LeftIndexPartitionIterator(arg0);
+               }
+               
+               /**
+                * Lazy iterator that performs the block-wise left-indexing update.
+                */
+               private class LeftIndexPartitionIterator extends 
LazyIterableIterator<Tuple2<Long, FrameBlock>>
+               {
+                       public LeftIndexPartitionIterator(Iterator<Tuple2<Long, 
FrameBlock>> in) {
+                               super(in);
+                       }
+                       
+                       @Override
+                       protected Tuple2<Long, FrameBlock> 
computeNext(Tuple2<Long, FrameBlock> arg) 
+                               throws Exception 
+                       {
+                               int iNumRowsInBlock = arg._2.getNumRows();
+                               int iNumCols = arg._2.getNumColumns();
+                               if(!UtilFunctions.isInFrameBlockRange(arg._1(), 
iNumRowsInBlock, iNumCols, _ixrange)) {
+                                       return arg;
+                               }
+                               
+                               // Calculate global index of left hand side 
block
+                               long lhs_rl = Math.max(_ixrange.rowStart, 
arg._1); //Math.max(_ixrange.rowStart, (arg._1-1)*iNumRowsInBlock + 1);
+                               long lhs_ru = Math.min(_ixrange.rowEnd, 
arg._1+iNumRowsInBlock-1);
+                               long lhs_cl = Math.max(_ixrange.colStart, 1);
+                               long lhs_cu = Math.min(_ixrange.colEnd, 
iNumCols);
+                               
+                               // Calculate global index of right hand side 
block
+                               long rhs_rl = lhs_rl - _ixrange.rowStart + 1;
+                               long rhs_ru = rhs_rl + (lhs_ru - lhs_rl);
+                               long rhs_cl = lhs_cl - _ixrange.colStart + 1;
+                               long rhs_cu = rhs_cl + (lhs_cu - lhs_cl);
+                               
+                               // Provide local zero-based index to 
leftIndexingOperations
+                               int lhs_lrl = (int)(lhs_rl- arg._1);
+                               int lhs_lru = (int)(lhs_ru- arg._1);
+                               int lhs_lcl = (int)lhs_cl-1;
+                               int lhs_lcu = (int)lhs_cu-1;
+
+                               FrameBlock ret = arg._2;
+                               int brlen = OptimizerUtils.DEFAULT_BLOCKSIZE;
+                               long rhs_rl_pb = rhs_rl;
+                               long rhs_ru_pb = Math.min(rhs_ru, 
(((rhs_rl-1)/brlen)+1)*brlen); 
+                               while(rhs_rl_pb <= rhs_ru_pb) {
+                                       // Provide global zero-based index to 
sliceOperations, but only for one RHS partition block at a time.
+                                       FrameBlock slicedRHSMatBlock = 
_binput.sliceOperations(rhs_rl_pb, rhs_ru_pb, rhs_cl, rhs_cu, new FrameBlock());
+                                       
+                                       // Provide local zero-based index to 
leftIndexingOperations
+                                       int lhs_lrl_pb = (int) (lhs_lrl + 
(rhs_rl_pb - rhs_rl));
+                                       int lhs_lru_pb = (int) (lhs_lru + 
(rhs_ru_pb - rhs_ru));
+                                       ret = 
ret.leftIndexingOperations(slicedRHSMatBlock, lhs_lrl_pb, lhs_lru_pb, lhs_lcl, 
lhs_lcu, new FrameBlock());
+                                       rhs_rl_pb = rhs_ru_pb + 1;
+                                       rhs_ru_pb = Math.min(rhs_ru, 
rhs_ru_pb+brlen);
+                               }
+                               
+                               return new Tuple2<Long, FrameBlock>(arg._1, 
ret);
+                       }
+               }
+       }
+}
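
A note on the index arithmetic in LeftIndexPartitionFunction.computeNext above: the 1-based global index range of the assignment is intersected with the rows covered by the current block, rebased to the right-hand side's coordinate system, and finally converted to 0-based local offsets. The following self-contained sketch replays that mapping for one hypothetical block; all concrete values are made up for illustration, only the formulas come from the code above.

// Hypothetical sketch of the global-to-local index mapping used above.
public class IndexMappingSketch {
    public static void main(String[] args) {
        long blockStartRow = 1001;   // arg._1: first global row of this block (1-based)
        int  numRowsInBlock = 1000;  // arg._2.getNumRows()
        int  numCols = 10;           // arg._2.getNumColumns()
        // assignment F[1500:2500, 2:4] = R  ->  index range of the left indexing
        long ixRowStart = 1500, ixRowEnd = 2500, ixColStart = 2, ixColEnd = 4;

        // global LHS range covered by this block
        long lhs_rl = Math.max(ixRowStart, blockStartRow);
        long lhs_ru = Math.min(ixRowEnd, blockStartRow + numRowsInBlock - 1);
        long lhs_cl = Math.max(ixColStart, 1);
        long lhs_cu = Math.min(ixColEnd, numCols);

        // corresponding RHS range, rebased to start at 1
        long rhs_rl = lhs_rl - ixRowStart + 1;
        long rhs_ru = rhs_rl + (lhs_ru - lhs_rl);
        long rhs_cl = lhs_cl - ixColStart + 1;
        long rhs_cu = rhs_cl + (lhs_cu - lhs_cl);

        // local 0-based offsets inside this block, as passed to leftIndexingOperations
        int lhs_lrl = (int) (lhs_rl - blockStartRow);
        int lhs_lru = (int) (lhs_ru - blockStartRow);

        System.out.println("LHS global rows " + lhs_rl + ".." + lhs_ru
            + " -> local rows " + lhs_lrl + ".." + lhs_lru
            + ", RHS rows " + rhs_rl + ".." + rhs_ru
            + ", RHS cols " + rhs_cl + ".." + rhs_cu);
    }
}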

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/runtime/instructions/spark/IndexingSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/IndexingSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/IndexingSPInstruction.java
new file mode 100644
index 0000000..04433dc
--- /dev/null
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/IndexingSPInstruction.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.instructions.spark;
+
+import org.apache.sysml.hops.AggBinaryOp.SparkAggType;
+import org.apache.sysml.parser.Expression.DataType;
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.instructions.InstructionUtils;
+import org.apache.sysml.runtime.instructions.cp.CPOperand;
+import org.apache.sysml.runtime.matrix.operators.Operator;
+import org.apache.sysml.runtime.matrix.operators.SimpleOperator;
+
+public abstract class IndexingSPInstruction  extends UnarySPInstruction
+{
+       
+       /*
+        * This class implements the indexing functionality for matrix and frame objects inside Spark.
+        * Example instructions: 
+        *     rangeReIndex:mVar1:Var2:Var3:Var4:Var5:mVar6
+        *         input=mVar1, output=mVar6, 
+        *         bounds = (Var2,Var3,Var4,Var5)
+        *         rowindex_lower: Var2, rowindex_upper: Var3 
+        *         colindex_lower: Var4, colindex_upper: Var5
+        *     leftIndex:mVar1:mVar2:Var3:Var4:Var5:Var6:mVar7
+        *         triggered by "mVar1[Var3:Var4, Var5:Var6] = mVar2"
+        *         the result is stored in mVar7
+        *  
+        */
+       protected CPOperand rowLower, rowUpper, colLower, colUpper;
+       protected SparkAggType _aggType = null;
+       
+       public IndexingSPInstruction(Operator op, CPOperand in, CPOperand rl, 
CPOperand ru, CPOperand cl, CPOperand cu, 
+                                                 CPOperand out, SparkAggType 
aggtype, String opcode, String istr)
+       {
+               super(op, in, out, opcode, istr);
+               rowLower = rl;
+               rowUpper = ru;
+               colLower = cl;
+               colUpper = cu;
+
+               _aggType = aggtype;
+       }
+       
+       public IndexingSPInstruction(Operator op, CPOperand lhsInput, CPOperand 
rhsInput, CPOperand rl, CPOperand ru, CPOperand cl, CPOperand cu, 
+                                                 CPOperand out, String opcode, 
String istr)
+       {
+               super(op, lhsInput, rhsInput, out, opcode, istr);
+               rowLower = rl;
+               rowUpper = ru;
+               colLower = cl;
+               colUpper = cu;
+       }
+       
+       public static IndexingSPInstruction parseInstruction ( String str ) 
+               throws DMLRuntimeException 
+       {       
+               String[] parts = 
InstructionUtils.getInstructionPartsWithValueType(str);
+               String opcode = parts[0];
+               
+               if ( opcode.equalsIgnoreCase("rangeReIndex") ) {
+                       if ( parts.length == 8 ) {
+                               // Example: 
rangeReIndex:mVar1:Var2:Var3:Var4:Var5:mVar6
+                               CPOperand in = new CPOperand(parts[1]);
+                               CPOperand rl = new CPOperand(parts[2]);
+                               CPOperand ru = new CPOperand(parts[3]);
+                               CPOperand cl = new CPOperand(parts[4]);
+                               CPOperand cu = new CPOperand(parts[5]);
+                               CPOperand out = new CPOperand(parts[6]);
+                               SparkAggType aggtype = 
SparkAggType.valueOf(parts[7]);
+                               if( in.getDataType()==DataType.MATRIX )
+                                       return new 
MatrixIndexingSPInstruction(new SimpleOperator(null), in, rl, ru, cl, cu, out, 
aggtype, opcode, str);
+                               else
+                                       return new 
FrameIndexingSPInstruction(new SimpleOperator(null), in, rl, ru, cl, cu, out, 
aggtype, opcode, str);
+                       }
+                       else {
+                               throw new DMLRuntimeException("Invalid number 
of operands in instruction: " + str);
+                       }
+               } 
+               else if ( opcode.equalsIgnoreCase("leftIndex") || 
opcode.equalsIgnoreCase("mapLeftIndex")) {
+                       if ( parts.length == 8 ) {
+                               // Example: leftIndex:mVar1:mVar2:Var3:Var4:Var5:Var6:mVar7
+                               CPOperand lhsInput = new CPOperand(parts[1]);
+                               CPOperand rhsInput = new CPOperand(parts[2]);
+                               CPOperand rl = new CPOperand(parts[3]);
+                               CPOperand ru = new CPOperand(parts[4]);
+                               CPOperand cl = new CPOperand(parts[5]);
+                               CPOperand cu = new CPOperand(parts[6]);
+                               CPOperand out = new CPOperand(parts[7]);
+                               if( lhsInput.getDataType()==DataType.MATRIX )
+                                       return new 
MatrixIndexingSPInstruction(new SimpleOperator(null), lhsInput, rhsInput, rl, 
ru, cl, cu, out, opcode, str);
+                               else
+                                       return new 
FrameIndexingSPInstruction(new SimpleOperator(null), lhsInput, rhsInput, rl, 
ru, cl, cu, out, opcode, str);
+                       }
+                       else {
+                               throw new DMLRuntimeException("Invalid number 
of operands in instruction: " + str);
+                       }
+               }
+               else {
+                       throw new DMLRuntimeException("Unknown opcode while parsing an IndexingSPInstruction: " + str);
+               }
+       }
+}
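
IndexingSPInstruction.parseInstruction now dispatches on both the opcode and the data type of the first input, returning a MatrixIndexingSPInstruction for matrices and a FrameIndexingSPInstruction for frames. The sketch below mirrors that dispatch with plain strings; the ':' separator and the SINGLE_BLOCK aggregation tag are illustrative stand-ins for the real instruction encoding handled by InstructionUtils.

// Hypothetical stand-in for the opcode/data-type dispatch in parseInstruction.
public class IndexingDispatchSketch {
    enum DataType { MATRIX, FRAME }

    static String dispatch(String instr, DataType firstInputType) {
        String[] parts = instr.split(":");
        String opcode = parts[0];
        boolean rangeReIndex = opcode.equalsIgnoreCase("rangeReIndex");
        boolean leftIndex = opcode.equalsIgnoreCase("leftIndex")
            || opcode.equalsIgnoreCase("mapLeftIndex");
        if ((rangeReIndex || leftIndex) && parts.length == 8)
            return (firstInputType == DataType.MATRIX)
                ? "MatrixIndexingSPInstruction" : "FrameIndexingSPInstruction";
        throw new IllegalArgumentException("Unknown or malformed instruction: " + instr);
    }

    public static void main(String[] args) {
        // prints FrameIndexingSPInstruction
        System.out.println(dispatch(
            "rangeReIndex:fVar1:Var2:Var3:Var4:Var5:fVar6:SINGLE_BLOCK", DataType.FRAME));
    }
}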

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmChainSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmChainSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmChainSPInstruction.java
index 159eddf..99d1978 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmChainSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmChainSPInstruction.java
@@ -33,7 +33,7 @@ import 
org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
 import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
-import 
org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcastMatrix;
+import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcast;
 import org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
@@ -127,7 +127,7 @@ public class MapmmChainSPInstruction extends SPInstruction
                
                //get rdd and broadcast inputs
                JavaPairRDD<MatrixIndexes,MatrixBlock> inX = 
sec.getBinaryBlockRDDHandleForVariable( _input1.getName() );
-               PartitionedBroadcastMatrix inV = sec.getBroadcastForVariable( 
_input2.getName() );
+               PartitionedBroadcast<MatrixBlock> inV = 
sec.getBroadcastForVariable( _input2.getName() );
                
                //execute mapmmchain (guaranteed to have single output block)
                MatrixBlock out = null;
@@ -137,7 +137,7 @@ public class MapmmChainSPInstruction extends SPInstruction
                        out = RDDAggregateUtils.sumStable(tmp);         
                }
                else { // ChainType.XtwXv / ChainType.XtXvy
-                       PartitionedBroadcastMatrix inW = 
sec.getBroadcastForVariable( _input3.getName() );
+                       PartitionedBroadcast<MatrixBlock> inW = 
sec.getBroadcastForVariable( _input3.getName() );
                        RDDMapMMChainFunction2 fmmc = new 
RDDMapMMChainFunction2(inV, inW, _chainType);
                        JavaPairRDD<MatrixIndexes,MatrixBlock> tmp = 
inX.mapToPair(fmmc);
                        out = RDDAggregateUtils.sumStable(tmp);         
@@ -157,9 +157,9 @@ public class MapmmChainSPInstruction extends SPInstruction
        {
                private static final long serialVersionUID = 
8197406787010296291L;
 
-               private PartitionedBroadcastMatrix _pmV = null;
+               private PartitionedBroadcast<MatrixBlock> _pmV = null;
                
-               public RDDMapMMChainFunction( PartitionedBroadcastMatrix bV) 
+               public RDDMapMMChainFunction( PartitionedBroadcast<MatrixBlock> 
bV) 
                        throws DMLRuntimeException
                {                       
                        //get first broadcast vector (always single block)
@@ -170,7 +170,7 @@ public class MapmmChainSPInstruction extends SPInstruction
                public MatrixBlock call( MatrixBlock arg0 ) 
                        throws Exception 
                {
-                       MatrixBlock pmV = _pmV.getMatrixBlock(1, 1);
+                       MatrixBlock pmV = _pmV.getBlock(1, 1);
                        
                        //execute mapmmchain operation
                        MatrixBlock out = new MatrixBlock();
@@ -186,11 +186,11 @@ public class MapmmChainSPInstruction extends SPInstruction
        {
                private static final long serialVersionUID = 
-7926980450209760212L;
 
-               private PartitionedBroadcastMatrix _pmV = null;
-               private PartitionedBroadcastMatrix _pmW = null;
+               private PartitionedBroadcast<MatrixBlock> _pmV = null;
+               private PartitionedBroadcast<MatrixBlock> _pmW = null;
                private ChainType _chainType = null;
                
-               public RDDMapMMChainFunction2( PartitionedBroadcastMatrix bV, 
PartitionedBroadcastMatrix bW, ChainType chain) 
+               public RDDMapMMChainFunction2( 
PartitionedBroadcast<MatrixBlock> bV, PartitionedBroadcast<MatrixBlock> bW, 
ChainType chain) 
                        throws DMLRuntimeException
                {                       
                        //get both broadcast vectors (first always single block)
@@ -203,7 +203,7 @@ public class MapmmChainSPInstruction extends SPInstruction
                public Tuple2<MatrixIndexes, MatrixBlock> call( 
Tuple2<MatrixIndexes, MatrixBlock> arg0 ) 
                        throws Exception 
                {
-                       MatrixBlock pmV = _pmV.getMatrixBlock(1, 1);
+                       MatrixBlock pmV = _pmV.getBlock(1, 1);
                        
                        MatrixIndexes ixIn = arg0._1();
                        MatrixBlock blkIn = arg0._2();
@@ -213,7 +213,7 @@ public class MapmmChainSPInstruction extends SPInstruction
                        MatrixBlock blkOut = new MatrixBlock();
                        
                        //execute mapmmchain operation
-                       blkIn.chainMatrixMultOperations(pmV, 
_pmW.getMatrixBlock(rowIx,1), blkOut, _chainType);
+                       blkIn.chainMatrixMultOperations(pmV, 
_pmW.getBlock(rowIx,1), blkOut, _chainType);
                                
                        //output new tuple
                        return new Tuple2<MatrixIndexes, MatrixBlock>(ixOut, 
blkOut);
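
The hunks in this and the following files are largely mechanical: PartitionedBroadcastMatrix is replaced by the generic PartitionedBroadcast<MatrixBlock>, and getMatrixBlock(r, c) by getBlock(r, c), so the same broadcast plumbing can also carry FrameBlock. The simplified container below only illustrates that access pattern; it is not the PartitionedBroadcast/PartitionedBlock implementation added by this commit.

import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for a partitioned broadcast container
// exposing a type-agnostic getBlock(rowBlockIx, colBlockIx) accessor.
public class PartitionedBroadcastSketch<T> {
    private final Map<Long, T> blocks = new HashMap<>();
    private final int numColBlocks;

    public PartitionedBroadcastSketch(int numColBlocks) {
        this.numColBlocks = numColBlocks;
    }

    // 1-based block indexes, mirroring the getBlock(r, c) calls in the diff
    public void putBlock(int rowBlockIx, int colBlockIx, T block) {
        blocks.put(key(rowBlockIx, colBlockIx), block);
    }

    public T getBlock(int rowBlockIx, int colBlockIx) {
        return blocks.get(key(rowBlockIx, colBlockIx));
    }

    private long key(int r, int c) {
        return (long) (r - 1) * numColBlocks + (c - 1);
    }

    public static void main(String[] args) {
        PartitionedBroadcastSketch<String> pb = new PartitionedBroadcastSketch<>(2);
        pb.putBlock(1, 1, "block(1,1)");
        System.out.println(pb.getBlock(1, 1));
    }
}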

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java
index 400ccf0..5b3e9ad 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java
@@ -40,7 +40,7 @@ import org.apache.sysml.runtime.functionobjects.Plus;
 import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
 import org.apache.sysml.runtime.instructions.spark.data.LazyIterableIterator;
-import 
org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcastMatrix;
+import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcast;
 import 
org.apache.sysml.runtime.instructions.spark.functions.FilterNonEmptyBlocksFunction;
 import org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
@@ -117,7 +117,7 @@ public class MapmmSPInstruction extends BinarySPInstruction
                
                //get inputs
                JavaPairRDD<MatrixIndexes,MatrixBlock> in1 = 
sec.getBinaryBlockRDDHandleForVariable( rddVar );
-               PartitionedBroadcastMatrix in2 = sec.getBroadcastForVariable( 
bcastVar ); 
+               PartitionedBroadcast<MatrixBlock> in2 = 
sec.getBroadcastForVariable( bcastVar ); 
                
                //empty input block filter
                if( !_outputEmpty )
@@ -196,9 +196,9 @@ public class MapmmSPInstruction extends BinarySPInstruction
 
                private CacheType _type = null;
                private AggregateBinaryOperator _op = null;
-               private PartitionedBroadcastMatrix _pbc = null;
+               private PartitionedBroadcast<MatrixBlock> _pbc = null;
                
-               public RDDMapMMFunction( CacheType type, 
PartitionedBroadcastMatrix binput )
+               public RDDMapMMFunction( CacheType type, 
PartitionedBroadcast<MatrixBlock> binput )
                {
                        _type = type;
                        _pbc = binput;
@@ -221,7 +221,7 @@ public class MapmmSPInstruction extends BinarySPInstruction
                        if( _type == CacheType.LEFT )
                        {
                                //get the right hand side matrix
-                               MatrixBlock left = _pbc.getMatrixBlock(1, 
(int)ixIn.getRowIndex());
+                               MatrixBlock left = _pbc.getBlock(1, 
(int)ixIn.getRowIndex());
                                
                                //execute matrix-vector mult
                                
OperationsOnMatrixValues.performAggregateBinary( 
@@ -230,7 +230,7 @@ public class MapmmSPInstruction extends BinarySPInstruction
                        else //if( _type == CacheType.RIGHT )
                        {
                                //get the right hand side matrix
-                               MatrixBlock right = 
_pbc.getMatrixBlock((int)ixIn.getColumnIndex(), 1);
+                               MatrixBlock right = 
_pbc.getBlock((int)ixIn.getColumnIndex(), 1);
                                
                                //execute matrix-vector mult
                                OperationsOnMatrixValues.performAggregateBinary(
@@ -252,9 +252,9 @@ public class MapmmSPInstruction extends BinarySPInstruction
        
                private CacheType _type = null;
                private AggregateBinaryOperator _op = null;
-               private PartitionedBroadcastMatrix _pbc = null;
+               private PartitionedBroadcast<MatrixBlock> _pbc = null;
                
-               public RDDMapMMPartitionFunction( CacheType type, 
PartitionedBroadcastMatrix binput )
+               public RDDMapMMPartitionFunction( CacheType type, 
PartitionedBroadcast<MatrixBlock> binput )
                {
                        _type = type;
                        _pbc = binput;
@@ -293,7 +293,7 @@ public class MapmmSPInstruction extends BinarySPInstruction
                                if( _type == CacheType.LEFT )
                                {
                                        //get the right hand side matrix
-                                       MatrixBlock left = 
_pbc.getMatrixBlock(1, (int)ixIn.getRowIndex());
+                                       MatrixBlock left = _pbc.getBlock(1, 
(int)ixIn.getRowIndex());
                                        
                                        //execute index preserving matrix 
multiplication
                                        left.aggregateBinaryOperations(left, 
blkIn, blkOut, _op);                                               
@@ -301,7 +301,7 @@ public class MapmmSPInstruction extends BinarySPInstruction
                                else //if( _type == CacheType.RIGHT )
                                {
                                        //get the right hand side matrix
-                                       MatrixBlock right = 
_pbc.getMatrixBlock((int)ixIn.getColumnIndex(), 1);
+                                       MatrixBlock right = 
_pbc.getBlock((int)ixIn.getColumnIndex(), 1);
 
                                        //execute index preserving matrix 
multiplication
                                        blkIn.aggregateBinaryOperations(blkIn, 
right, blkOut, _op);     
@@ -322,9 +322,9 @@ public class MapmmSPInstruction extends BinarySPInstruction
                
                private CacheType _type = null;
                private AggregateBinaryOperator _op = null;
-               private PartitionedBroadcastMatrix _pbc = null;
+               private PartitionedBroadcast<MatrixBlock> _pbc = null;
                
-               public RDDFlatMapMMFunction( CacheType type, 
PartitionedBroadcastMatrix binput )
+               public RDDFlatMapMMFunction( CacheType type, 
PartitionedBroadcast<MatrixBlock> binput )
                {
                        _type = type;
                        _pbc = binput;
@@ -349,7 +349,7 @@ public class MapmmSPInstruction extends BinarySPInstruction
                                int len = _pbc.getNumRowBlocks();
                                for( int i=1; i<=len; i++ ) 
                                {
-                                       MatrixBlock left = 
_pbc.getMatrixBlock(i, (int)ixIn.getRowIndex());
+                                       MatrixBlock left = _pbc.getBlock(i, 
(int)ixIn.getRowIndex());
                                        MatrixIndexes ixOut = new 
MatrixIndexes();
                                        MatrixBlock blkOut = new MatrixBlock();
                                        
@@ -367,7 +367,7 @@ public class MapmmSPInstruction extends BinarySPInstruction
                                for( int j=1; j<=len; j++ ) 
                                {
                                        //get the right hand side matrix
-                                       MatrixBlock right = 
_pbc.getMatrixBlock((int)ixIn.getColumnIndex(), j);
+                                       MatrixBlock right = 
_pbc.getBlock((int)ixIn.getColumnIndex(), j);
                                        MatrixIndexes ixOut = new 
MatrixIndexes();
                                        MatrixBlock blkOut = new MatrixBlock();
                                        

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixIndexingSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixIndexingSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixIndexingSPInstruction.java
index 4248999..e7bf894 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixIndexingSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixIndexingSPInstruction.java
@@ -33,10 +33,9 @@ import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.caching.MatrixObject.UpdateType;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
-import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
 import org.apache.sysml.runtime.instructions.spark.data.LazyIterableIterator;
-import 
org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcastMatrix;
+import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcast;
 import org.apache.sysml.runtime.instructions.spark.functions.IsBlockInRange;
 import org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils;
 import org.apache.sysml.runtime.instructions.spark.utils.SparkUtils;
@@ -46,13 +45,11 @@ import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysml.runtime.matrix.data.OperationsOnMatrixValues;
 import org.apache.sysml.runtime.matrix.mapred.IndexedMatrixValue;
 import org.apache.sysml.runtime.matrix.operators.Operator;
-import org.apache.sysml.runtime.matrix.operators.SimpleOperator;
 import org.apache.sysml.runtime.util.IndexRange;
 import org.apache.sysml.runtime.util.UtilFunctions;
 
-public class MatrixIndexingSPInstruction  extends UnarySPInstruction
+public class MatrixIndexingSPInstruction  extends IndexingSPInstruction
 {
-       
        /*
         * This class implements the matrix indexing functionality inside CP.  
         * Example instructions: 
@@ -66,72 +63,17 @@ public class MatrixIndexingSPInstruction  extends 
UnarySPInstruction
         *         the result is stored in mVar7
         *  
         */
-       protected CPOperand rowLower, rowUpper, colLower, colUpper;
-       protected SparkAggType _aggType = null;
-       
+
        public MatrixIndexingSPInstruction(Operator op, CPOperand in, CPOperand 
rl, CPOperand ru, CPOperand cl, CPOperand cu, 
                                                  CPOperand out, SparkAggType 
aggtype, String opcode, String istr)
        {
-               super(op, in, out, opcode, istr);
-               rowLower = rl;
-               rowUpper = ru;
-               colLower = cl;
-               colUpper = cu;
-
-               _aggType = aggtype;
+               super(op, in, rl, ru, cl, cu, out, aggtype, opcode, istr);
        }
        
        public MatrixIndexingSPInstruction(Operator op, CPOperand lhsInput, 
CPOperand rhsInput, CPOperand rl, CPOperand ru, CPOperand cl, CPOperand cu, 
                                                  CPOperand out, String opcode, 
String istr)
        {
-               super(op, lhsInput, rhsInput, out, opcode, istr);
-               rowLower = rl;
-               rowUpper = ru;
-               colLower = cl;
-               colUpper = cu;
-       }
-       
-       public static MatrixIndexingSPInstruction parseInstruction ( String str 
) 
-               throws DMLRuntimeException 
-       {       
-               String[] parts = 
InstructionUtils.getInstructionPartsWithValueType(str);
-               String opcode = parts[0];
-               
-               if ( opcode.equalsIgnoreCase("rangeReIndex") ) {
-                       if ( parts.length == 8 ) {
-                               // Example: 
rangeReIndex:mVar1:Var2:Var3:Var4:Var5:mVar6
-                               CPOperand in = new CPOperand(parts[1]);
-                               CPOperand rl = new CPOperand(parts[2]);
-                               CPOperand ru = new CPOperand(parts[3]);
-                               CPOperand cl = new CPOperand(parts[4]);
-                               CPOperand cu = new CPOperand(parts[5]);
-                               CPOperand out = new CPOperand(parts[6]);
-                               SparkAggType aggtype = 
SparkAggType.valueOf(parts[7]);
-                               return new MatrixIndexingSPInstruction(new 
SimpleOperator(null), in, rl, ru, cl, cu, out, aggtype, opcode, str);
-                       }
-                       else {
-                               throw new DMLRuntimeException("Invalid number 
of operands in instruction: " + str);
-                       }
-               } 
-               else if ( opcode.equalsIgnoreCase("leftIndex") || 
opcode.equalsIgnoreCase("mapLeftIndex")) {
-                       if ( parts.length == 8 ) {
-                               // Example: 
leftIndex:mVar1:mvar2:Var3:Var4:Var5:Var6:mVar7
-                               CPOperand lhsInput = new CPOperand(parts[1]);
-                               CPOperand rhsInput = new CPOperand(parts[2]);
-                               CPOperand rl = new CPOperand(parts[3]);
-                               CPOperand ru = new CPOperand(parts[4]);
-                               CPOperand cl = new CPOperand(parts[5]);
-                               CPOperand cu = new CPOperand(parts[6]);
-                               CPOperand out = new CPOperand(parts[7]);
-                               return new MatrixIndexingSPInstruction(new 
SimpleOperator(null), lhsInput, rhsInput, rl, ru, cl, cu, out, opcode, str);
-                       }
-                       else {
-                               throw new DMLRuntimeException("Invalid number 
of operands in instruction: " + str);
-                       }
-               }
-               else {
-                       throw new DMLRuntimeException("Unknown opcode while 
parsing a MatrixIndexingSPInstruction: " + str);
-               }
+               super(op, lhsInput, rhsInput, rl, ru, cl, cu, out, opcode, 
istr);
        }
        
        @Override
@@ -181,7 +123,7 @@ public class MatrixIndexingSPInstruction  extends 
UnarySPInstruction
                else if ( opcode.equalsIgnoreCase("leftIndex") || 
opcode.equalsIgnoreCase("mapLeftIndex"))
                {
                        JavaPairRDD<MatrixIndexes,MatrixBlock> in1 = 
sec.getBinaryBlockRDDHandleForVariable( input1.getName() );
-                       PartitionedBroadcastMatrix broadcastIn2 = null;
+                       PartitionedBroadcast<MatrixBlock> broadcastIn2 = null;
                        JavaPairRDD<MatrixIndexes,MatrixBlock> in2 = null;
                        JavaPairRDD<MatrixIndexes,MatrixBlock> out = null;
                        
@@ -334,12 +276,12 @@ public class MatrixIndexingSPInstruction  extends 
UnarySPInstruction
        {
                private static final long serialVersionUID = 
1757075506076838258L;
                
-               private PartitionedBroadcastMatrix _binput;
+               private PartitionedBroadcast<MatrixBlock> _binput;
                private IndexRange _ixrange = null;
                private int _brlen = -1;
                private int _bclen = -1;
                
-               public LeftIndexPartitionFunction(PartitionedBroadcastMatrix 
binput, IndexRange ixrange, MatrixCharacteristics mc) 
+               public 
LeftIndexPartitionFunction(PartitionedBroadcast<MatrixBlock> binput, IndexRange 
ixrange, MatrixCharacteristics mc) 
                {
                        _binput = binput;
                        _ixrange = ixrange;

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/runtime/instructions/spark/PMapmmSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/PMapmmSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/PMapmmSPInstruction.java
index 0b0ee35..a69a67b 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/PMapmmSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/PMapmmSPInstruction.java
@@ -38,7 +38,7 @@ import org.apache.sysml.runtime.functionobjects.Multiply;
 import org.apache.sysml.runtime.functionobjects.Plus;
 import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
-import org.apache.sysml.runtime.instructions.spark.data.PartitionedMatrixBlock;
+import org.apache.sysml.runtime.instructions.spark.data.PartitionedBlock;
 import org.apache.sysml.runtime.instructions.spark.functions.IsBlockInRange;
 import org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
@@ -115,8 +115,8 @@ public class PMapmmSPInstruction extends BinarySPInstruction
                                        .mapToPair(new 
PMapMMRebaseBlocksFunction(i/mc1.getRowsPerBlock()));
                        
                        int rlen = (int)Math.min(mc1.getRows()-i, 
NUM_ROWBLOCKS*mc1.getRowsPerBlock());
-                       PartitionedMatrixBlock pmb = 
SparkExecutionContext.toPartitionedMatrixBlock(rdd, rlen, (int)mc1.getCols(), 
mc1.getRowsPerBlock(), mc1.getColsPerBlock(), -1L);
-                       Broadcast<PartitionedMatrixBlock> bpmb = 
sec.getSparkContext().broadcast(pmb);
+                       PartitionedBlock<MatrixBlock> pmb = 
SparkExecutionContext.toPartitionedMatrixBlock(rdd, rlen, (int)mc1.getCols(), 
mc1.getRowsPerBlock(), mc1.getColsPerBlock(), -1L);
+                       Broadcast<PartitionedBlock<MatrixBlock>> bpmb = 
sec.getSparkContext().broadcast(pmb);
                        
                        //matrix multiplication
                        JavaPairRDD<MatrixIndexes,MatrixBlock> rdd2 = in2
@@ -178,10 +178,10 @@ public class PMapmmSPInstruction extends 
BinarySPInstruction
                private static final long serialVersionUID = 
-4520080421816885321L;
 
                private AggregateBinaryOperator _op = null;
-               private Broadcast<PartitionedMatrixBlock> _pbc = null;
+               private Broadcast<PartitionedBlock<MatrixBlock>> _pbc = null;
                private long _offset = -1;
                
-               public PMapMMFunction( Broadcast<PartitionedMatrixBlock> 
binput, long offset )
+               public PMapMMFunction( Broadcast<PartitionedBlock<MatrixBlock>> 
binput, long offset )
                {
                        _pbc = binput;
                        _offset = offset;
@@ -195,7 +195,7 @@ public class PMapmmSPInstruction extends BinarySPInstruction
                public Iterable<Tuple2<MatrixIndexes, MatrixBlock>> 
call(Tuple2<MatrixIndexes, MatrixBlock> arg0)
                        throws Exception 
                {
-                       PartitionedMatrixBlock pm = _pbc.value();
+                       PartitionedBlock<MatrixBlock> pm = _pbc.value();
                        
                        MatrixIndexes ixIn = arg0._1();
                        MatrixBlock blkIn = arg0._2();
@@ -207,7 +207,7 @@ public class PMapmmSPInstruction extends BinarySPInstruction
                        
                        //get the right hand side matrix
                        for( int i=1; i<=pm.getNumRowBlocks(); i++ ) {
-                               MatrixBlock left = pm.getMatrixBlock(i, 
(int)ixIn.getRowIndex());
+                               MatrixBlock left = pm.getBlock(i, 
(int)ixIn.getRowIndex());
                        
                                //execute matrix-vector mult
                                
OperationsOnMatrixValues.performAggregateBinary( 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
index 0d45ae1..a80dd1b 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
@@ -45,7 +45,7 @@ import org.apache.sysml.runtime.functionobjects.ValueFunction;
 import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
 import org.apache.sysml.runtime.instructions.mr.GroupedAggregateInstruction;
-import 
org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcastMatrix;
+import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcast;
 import 
org.apache.sysml.runtime.instructions.spark.functions.ExtractGroup.ExtractGroupBroadcast;
 import 
org.apache.sysml.runtime.instructions.spark.functions.ExtractGroup.ExtractGroupJoin;
 import 
org.apache.sysml.runtime.instructions.spark.functions.ExtractGroupNWeights;
@@ -196,7 +196,7 @@ public class ParameterizedBuiltinSPInstruction  extends 
ComputationSPInstruction
                        String targetVar = params.get(Statement.GAGG_TARGET);
                        String groupsVar = params.get(Statement.GAGG_GROUPS);   
                
                        JavaPairRDD<MatrixIndexes,MatrixBlock> target = 
sec.getBinaryBlockRDDHandleForVariable(targetVar);
-                       PartitionedBroadcastMatrix groups = 
sec.getBroadcastForVariable(groupsVar);
+                       PartitionedBroadcast<MatrixBlock> groups = 
sec.getBroadcastForVariable(groupsVar);
                        MatrixCharacteristics mc1 = 
sec.getMatrixCharacteristics( targetVar );
                        MatrixCharacteristics mcOut = 
sec.getMatrixCharacteristics(output.getName());
                        CPOperand ngrpOp = new 
CPOperand(params.get(Statement.GAGG_NUM_GROUPS));
@@ -252,7 +252,7 @@ public class ParameterizedBuiltinSPInstruction  extends 
ComputationSPInstruction
                                
                                //execute basic grouped aggregate (extract and 
preagg)
                                if( broadcastGroups ) {
-                                       PartitionedBroadcastMatrix pbm = 
sec.getBroadcastForVariable(groupsVar);
+                                       PartitionedBroadcast<MatrixBlock> pbm = 
sec.getBroadcastForVariable(groupsVar);
                                        groupWeightedCells = target
                                                        .flatMapToPair(new 
ExtractGroupBroadcast(pbm, mc1.getColsPerBlock(), ngroups, _optr));             
                             
                                }
@@ -314,7 +314,7 @@ public class ParameterizedBuiltinSPInstruction  extends 
ComputationSPInstruction
                                //get input rdd handle
                                JavaPairRDD<MatrixIndexes,MatrixBlock> in = 
sec.getBinaryBlockRDDHandleForVariable( rddInVar );
                                JavaPairRDD<MatrixIndexes,MatrixBlock> off;
-                               PartitionedBroadcastMatrix broadcastOff;
+                               PartitionedBroadcast<MatrixBlock> broadcastOff;
                                long brlen = mcIn.getRowsPerBlock();
                                long bclen = mcIn.getColsPerBlock();
                                long numRep = (long)Math.ceil( rows ? 
(double)mcIn.getCols()/bclen : (double)mcIn.getRows()/brlen);
@@ -529,9 +529,9 @@ public class ParameterizedBuiltinSPInstruction  extends 
ComputationSPInstruction
                private long _brlen;
                private long _bclen;
                
-               private PartitionedBroadcastMatrix _off = null;
+               private PartitionedBroadcast<MatrixBlock> _off = null;
                                
-               public RDDRemoveEmptyFunctionInMem(boolean rmRows, long len, 
long brlen, long bclen, PartitionedBroadcastMatrix off) 
+               public RDDRemoveEmptyFunctionInMem(boolean rmRows, long len, 
long brlen, long bclen, PartitionedBroadcast<MatrixBlock> off) 
                {
                        _rmRows = rmRows;
                        _len = len;
@@ -549,9 +549,9 @@ public class ParameterizedBuiltinSPInstruction  extends 
ComputationSPInstruction
                        //IndexedMatrixValue offsets = 
SparkUtils.toIndexedMatrixBlock(arg0._1(),arg0._2()._2());
                        IndexedMatrixValue offsets = null;
                        if(_rmRows)
-                               offsets = 
SparkUtils.toIndexedMatrixBlock(arg0._1(), 
_off.getMatrixBlock((int)arg0._1().getRowIndex(), 1));
+                               offsets = 
SparkUtils.toIndexedMatrixBlock(arg0._1(), 
_off.getBlock((int)arg0._1().getRowIndex(), 1));
                        else
-                               offsets = 
SparkUtils.toIndexedMatrixBlock(arg0._1(), _off.getMatrixBlock(1, 
(int)arg0._1().getColumnIndex()));
+                               offsets = 
SparkUtils.toIndexedMatrixBlock(arg0._1(), _off.getBlock(1, 
(int)arg0._1().getColumnIndex()));
                        
                        //execute remove empty operations
                        ArrayList<IndexedMatrixValue> out = new 
ArrayList<IndexedMatrixValue>();
@@ -606,13 +606,13 @@ public class ParameterizedBuiltinSPInstruction  extends 
ComputationSPInstruction
        {
                private static final long serialVersionUID = 
6795402640178679851L;
                
-               private PartitionedBroadcastMatrix _pbm = null;
+               private PartitionedBroadcast<MatrixBlock> _pbm = null;
                private Operator _op = null;
                private int _ngroups = -1;
                private int _brlen = -1;
                private int _bclen = -1;
                
-               public RDDMapGroupedAggFunction(PartitionedBroadcastMatrix pbm, 
Operator op, int ngroups, int brlen, int bclen) 
+               public 
RDDMapGroupedAggFunction(PartitionedBroadcast<MatrixBlock> pbm, Operator op, 
int ngroups, int brlen, int bclen) 
                {
                        _pbm = pbm;
                        _op = op;
@@ -628,7 +628,7 @@ public class ParameterizedBuiltinSPInstruction  extends 
ComputationSPInstruction
                        //get all inputs
                        MatrixIndexes ix = arg0._1();
                        MatrixBlock target = arg0._2();         
-                       MatrixBlock groups = 
_pbm.getMatrixBlock((int)ix.getRowIndex(), 1);
+                       MatrixBlock groups = 
_pbm.getBlock((int)ix.getRowIndex(), 1);
                        
                        //execute map grouped aggregate operations
                        IndexedMatrixValue in1 = 
SparkUtils.toIndexedMatrixBlock(ix, target);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/runtime/instructions/spark/PmmSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/PmmSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/PmmSPInstruction.java
index 44313e6..b1e7d07 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/PmmSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/PmmSPInstruction.java
@@ -37,7 +37,7 @@ import org.apache.sysml.runtime.functionobjects.Multiply;
 import org.apache.sysml.runtime.functionobjects.Plus;
 import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
-import 
org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcastMatrix;
+import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcast;
 import org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
@@ -106,7 +106,7 @@ public class PmmSPInstruction extends BinarySPInstruction
                
                //get inputs
                JavaPairRDD<MatrixIndexes,MatrixBlock> in1 = 
sec.getBinaryBlockRDDHandleForVariable( rddVar );
-               PartitionedBroadcastMatrix in2 = sec.getBroadcastForVariable( 
bcastVar ); 
+               PartitionedBroadcast<MatrixBlock> in2 = 
sec.getBroadcastForVariable( bcastVar ); 
                
                //execute pmm instruction
                JavaPairRDD<MatrixIndexes,MatrixBlock> out = in1
@@ -130,11 +130,11 @@ public class PmmSPInstruction extends BinarySPInstruction
        {
                private static final long serialVersionUID = 
-1696560050436469140L;
                
-               private PartitionedBroadcastMatrix _pmV = null;
+               private PartitionedBroadcast<MatrixBlock> _pmV = null;
                private long _rlen = -1;
                private int _brlen = -1;
                
-               public RDDPMMFunction( CacheType type, 
PartitionedBroadcastMatrix binput, long rlen, int brlen ) 
+               public RDDPMMFunction( CacheType type, 
PartitionedBroadcast<MatrixBlock> binput, long rlen, int brlen ) 
                        throws DMLRuntimeException
                {
                        _brlen = brlen;
@@ -151,7 +151,7 @@ public class PmmSPInstruction extends BinarySPInstruction
                        MatrixBlock mb2 = arg0._2();
                        
                        //get the right hand side matrix
-                       MatrixBlock mb1 = 
_pmV.getMatrixBlock((int)ixIn.getRowIndex(), 1);
+                       MatrixBlock mb1 = 
_pmV.getBlock((int)ixIn.getRowIndex(), 1);
                        
                        //compute target block indexes
                        long minPos = UtilFunctions.toLong( mb1.minNonZero() );
