http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/runtime/instructions/spark/QuaternarySPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/QuaternarySPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/QuaternarySPInstruction.java
index c021c04..1953e99 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/QuaternarySPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/QuaternarySPInstruction.java
@@ -51,7 +51,7 @@ import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
 import org.apache.sysml.runtime.instructions.cp.DoubleObject;
 import org.apache.sysml.runtime.instructions.spark.data.LazyIterableIterator;
-import 
org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcastMatrix;
+import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcast;
 import 
org.apache.sysml.runtime.instructions.spark.functions.FilterNonEmptyBlocksFunction;
 import org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
@@ -237,8 +237,8 @@ public class QuaternarySPInstruction extends 
ComputationSPInstruction
                        || 
WeightedCrossEntropy.OPCODE.equalsIgnoreCase(getOpcode())
                        || 
WeightedUnaryMM.OPCODE.equalsIgnoreCase(getOpcode())) 
                {
-                       PartitionedBroadcastMatrix bc1 = 
sec.getBroadcastForVariable( input2.getName() );
-                       PartitionedBroadcastMatrix bc2 = 
sec.getBroadcastForVariable( input3.getName() );
+                       PartitionedBroadcast<MatrixBlock> bc1 = 
sec.getBroadcastForVariable( input2.getName() );
+                       PartitionedBroadcast<MatrixBlock> bc2 = 
sec.getBroadcastForVariable( input3.getName() );
                        
                        //partitioning-preserving mappartitions (key access 
required for broadcast loopkup)
                        boolean noKeyChange = (qop.wtype3 == null || 
qop.wtype3.isBasic()); //only wdivmm changes keys
@@ -251,8 +251,8 @@ public class QuaternarySPInstruction extends 
ComputationSPInstruction
                //reduce-side operation (two/three/four rdd inputs, 
zero/one/two broadcasts)
                else 
                {
-                       PartitionedBroadcastMatrix bc1 = _cacheU ? 
sec.getBroadcastForVariable( input2.getName() ) : null;
-                       PartitionedBroadcastMatrix bc2 = _cacheV ? 
sec.getBroadcastForVariable( input3.getName() ) : null;
+                       PartitionedBroadcast<MatrixBlock> bc1 = _cacheU ? 
sec.getBroadcastForVariable( input2.getName() ) : null;
+                       PartitionedBroadcast<MatrixBlock> bc2 = _cacheV ? 
sec.getBroadcastForVariable( input3.getName() ) : null;
                        JavaPairRDD<MatrixIndexes,MatrixBlock> inU = (!_cacheU) 
? sec.getBinaryBlockRDDHandleForVariable( input2.getName() ) : null;
                        JavaPairRDD<MatrixIndexes,MatrixBlock> inV = (!_cacheV) 
? sec.getBinaryBlockRDDHandleForVariable( input3.getName() ) : null;
                        JavaPairRDD<MatrixIndexes,MatrixBlock> inW = 
(qop.hasFourInputs() && !_input4.isLiteral()) ? 
@@ -361,10 +361,10 @@ public class QuaternarySPInstruction extends 
ComputationSPInstruction
                private static final long serialVersionUID = 
-3175397651350954930L;
                
                protected QuaternaryOperator _qop = null;
-               protected PartitionedBroadcastMatrix _pmU = null;
-               protected PartitionedBroadcastMatrix _pmV = null;
+               protected PartitionedBroadcast<MatrixBlock> _pmU = null;
+               protected PartitionedBroadcast<MatrixBlock> _pmV = null;
                
-               public RDDQuaternaryBaseFunction( QuaternaryOperator qop, 
PartitionedBroadcastMatrix bcU, PartitionedBroadcastMatrix bcV ) {
+               public RDDQuaternaryBaseFunction( QuaternaryOperator qop, 
PartitionedBroadcast<MatrixBlock> bcU, PartitionedBroadcast<MatrixBlock> bcV ) {
                        _qop = qop;             
                        _pmU = bcU;
                        _pmV = bcV;
@@ -388,7 +388,7 @@ public class QuaternarySPInstruction extends 
ComputationSPInstruction
        {
                private static final long serialVersionUID = 
-8209188316939435099L;
                
-               public RDDQuaternaryFunction1( QuaternaryOperator qop, 
PartitionedBroadcastMatrix bcU, PartitionedBroadcastMatrix bcV ) 
+               public RDDQuaternaryFunction1( QuaternaryOperator qop, 
PartitionedBroadcast<MatrixBlock> bcU, PartitionedBroadcast<MatrixBlock> bcV ) 
                        throws DMLRuntimeException
                {
                        super(qop, bcU, bcV);
@@ -415,8 +415,8 @@ public class QuaternarySPInstruction extends 
ComputationSPInstruction
                                MatrixBlock blkIn = arg._2();
                                MatrixBlock blkOut = new MatrixBlock();
                                
-                               MatrixBlock mbU = 
_pmU.getMatrixBlock((int)ixIn.getRowIndex(), 1);
-                               MatrixBlock mbV = 
_pmV.getMatrixBlock((int)ixIn.getColumnIndex(), 1);
+                               MatrixBlock mbU = 
_pmU.getBlock((int)ixIn.getRowIndex(), 1);
+                               MatrixBlock mbV = 
_pmV.getBlock((int)ixIn.getColumnIndex(), 1);
                                
                                //execute core operation
                                blkIn.quaternaryOperations(_qop, mbU, mbV, 
null, blkOut);
@@ -437,7 +437,7 @@ public class QuaternarySPInstruction extends 
ComputationSPInstruction
        {
                private static final long serialVersionUID = 
7493974462943080693L;
                
-               public RDDQuaternaryFunction2( QuaternaryOperator qop, 
PartitionedBroadcastMatrix bcU, PartitionedBroadcastMatrix bcV ) 
+               public RDDQuaternaryFunction2( QuaternaryOperator qop, 
PartitionedBroadcast<MatrixBlock> bcU, PartitionedBroadcast<MatrixBlock> bcV ) 
                        throws DMLRuntimeException
                {
                        super(qop, bcU, bcV);
@@ -452,8 +452,8 @@ public class QuaternarySPInstruction extends 
ComputationSPInstruction
                        MatrixBlock blkIn2 = arg0._2()._2();
                        MatrixBlock blkOut = new MatrixBlock();
                        
-                       MatrixBlock mbU = 
(_pmU!=null)?_pmU.getMatrixBlock((int)ixIn.getRowIndex(), 1) : blkIn2;
-                       MatrixBlock mbV = 
(_pmV!=null)?_pmV.getMatrixBlock((int)ixIn.getColumnIndex(), 1) : blkIn2;
+                       MatrixBlock mbU = 
(_pmU!=null)?_pmU.getBlock((int)ixIn.getRowIndex(), 1) : blkIn2;
+                       MatrixBlock mbV = 
(_pmV!=null)?_pmV.getBlock((int)ixIn.getColumnIndex(), 1) : blkIn2;
                        MatrixBlock mbW = (_qop.hasFourInputs()) ? blkIn2 : 
null;
                        
                        //execute core operation
@@ -473,7 +473,7 @@ public class QuaternarySPInstruction extends 
ComputationSPInstruction
        {
                private static final long serialVersionUID = 
-2294086455843773095L;
                
-               public RDDQuaternaryFunction3( QuaternaryOperator qop, 
PartitionedBroadcastMatrix bcU, PartitionedBroadcastMatrix bcV ) 
+               public RDDQuaternaryFunction3( QuaternaryOperator qop, 
PartitionedBroadcast<MatrixBlock> bcU, PartitionedBroadcast<MatrixBlock> bcV ) 
                        throws DMLRuntimeException
                {
                        super(qop, bcU, bcV);
@@ -490,8 +490,8 @@ public class QuaternarySPInstruction extends 
ComputationSPInstruction
                        
                        MatrixBlock blkOut = new MatrixBlock();
                        
-                       MatrixBlock mbU = 
(_pmU!=null)?_pmU.getMatrixBlock((int)ixIn.getRowIndex(), 1) : blkIn2;
-                       MatrixBlock mbV = 
(_pmV!=null)?_pmV.getMatrixBlock((int)ixIn.getColumnIndex(), 1) : 
+                       MatrixBlock mbU = 
(_pmU!=null)?_pmU.getBlock((int)ixIn.getRowIndex(), 1) : blkIn2;
+                       MatrixBlock mbV = 
(_pmV!=null)?_pmV.getBlock((int)ixIn.getColumnIndex(), 1) : 
                                              (_pmU!=null)? blkIn2 : blkIn3;
                        MatrixBlock mbW = (_qop.hasFourInputs())? blkIn3 : null;
                        

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/runtime/instructions/spark/UaggOuterChainSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/UaggOuterChainSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/UaggOuterChainSPInstruction.java
index e2cb782..8974a7e 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/UaggOuterChainSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/UaggOuterChainSPInstruction.java
@@ -42,7 +42,7 @@ import org.apache.sysml.runtime.functionobjects.ReduceRow;
 import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
 import org.apache.sysml.runtime.instructions.spark.data.LazyIterableIterator;
-import 
org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcastMatrix;
+import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcast;
 import 
org.apache.sysml.runtime.instructions.spark.functions.AggregateDropCorrectionFunction;
 import org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
@@ -158,7 +158,7 @@ public class UaggOuterChainSPInstruction extends 
BinarySPInstruction
                }
                else
                {
-                       PartitionedBroadcastMatrix bv = 
sec.getBroadcastForVariable( bcastVar ); 
+                       PartitionedBroadcast<MatrixBlock> bv = 
sec.getBroadcastForVariable( bcastVar ); 
                        
                        //partitioning-preserving map-to-pair (under 
constraints)
                        out = in1.mapPartitionsToPair( new 
RDDMapGenUAggOuterChainFunction(bv, _uaggOp, _aggOp, _bOp, mcIn), noKeyChange 
);     
@@ -314,7 +314,7 @@ public class UaggOuterChainSPInstruction extends 
BinarySPInstruction
        {
                private static final long serialVersionUID = 
8197406787010296291L;
 
-               private PartitionedBroadcastMatrix _pbc = null;
+               private PartitionedBroadcast<MatrixBlock> _pbc = null;
                
                // Operators
                private AggregateUnaryOperator _uaggOp = null;
@@ -327,7 +327,7 @@ public class UaggOuterChainSPInstruction extends 
BinarySPInstruction
                private MatrixValue _tmpVal1 = null;
                private MatrixValue _tmpVal2 = null;
 
-               public 
RDDMapGenUAggOuterChainFunction(PartitionedBroadcastMatrix binput, 
AggregateUnaryOperator uaggOp, AggregateOperator aggOp, BinaryOperator bOp, 
+               public 
RDDMapGenUAggOuterChainFunction(PartitionedBroadcast<MatrixBlock> binput, 
AggregateUnaryOperator uaggOp, AggregateOperator aggOp, BinaryOperator bOp, 
                                MatrixCharacteristics mc)
                {
                        _pbc = binput;
@@ -374,7 +374,7 @@ public class UaggOuterChainSPInstruction extends 
BinarySPInstruction
                                
                                for(int bidx=1; bidx <= in2_colBlocks; bidx++) 
                                {
-                                       MatrixValue in2Val = 
_pbc.getMatrixBlock(1, bidx);
+                                       MatrixValue in2Val = _pbc.getBlock(1, 
bidx);
                                        
                                        //outer block operation
                                        
OperationsOnMatrixValues.performBinaryIgnoreIndexes(in1Val, in2Val, _tmpVal1, 
_bOp);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java
index f42c29e..0cfec66 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java
@@ -146,7 +146,7 @@ public class WriteSPInstruction extends SPInstruction
                        //prepare output info according to meta data
                        String outFmt = input3.getName();
                        OutputInfo oi = OutputInfo.stringToOutputInfo(outFmt);
-                       
+                               
                        //core matrix/frame write
                        if( input1.getDataType()==DataType.MATRIX )
                                processMatrixWriteInstruction(sec, fname, oi);
@@ -311,7 +311,7 @@ public class WriteSPInstruction extends SPInstruction
                }
                
                // write meta data file
-               MapReduceTool.writeMetaDataFile(fname + ".mtd", 
input1.getValueType(), DataType.FRAME, mc, oi, formatProperties);       
+               MapReduceTool.writeMetaDataFile(fname + ".mtd", 
input1.getValueType(), null, DataType.FRAME, mc, oi, formatProperties); 
        }
        
        /**

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/runtime/instructions/spark/data/BroadcastObject.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/BroadcastObject.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/BroadcastObject.java
index 13c68e2..6091edf 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/BroadcastObject.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/BroadcastObject.java
@@ -22,15 +22,16 @@ package org.apache.sysml.runtime.instructions.spark.data;
 import java.lang.ref.SoftReference;
 
 import org.apache.spark.broadcast.Broadcast;
+import org.apache.sysml.runtime.controlprogram.caching.CacheBlock;
 
-public class BroadcastObject extends LineageObject
+public class BroadcastObject<T extends CacheBlock> extends LineageObject
 {
        //soft reference storage for graceful cleanup in case of memory pressure
-       private SoftReference<PartitionedBroadcastMatrix> _bcHandle = null;
+       protected SoftReference<PartitionedBroadcast<T>> _bcHandle = null;
        
-       public BroadcastObject( PartitionedBroadcastMatrix bvar, String varName 
)
+       public BroadcastObject( PartitionedBroadcast<T> bvar, String varName )
        {
-               _bcHandle = new SoftReference<PartitionedBroadcastMatrix>(bvar);
+               _bcHandle = new SoftReference<PartitionedBroadcast<T>>(bvar);
                _varName = varName;
        }
        
@@ -38,7 +39,8 @@ public class BroadcastObject extends LineageObject
         * 
         * @return
         */
-       public PartitionedBroadcastMatrix getBroadcast()
+       @SuppressWarnings("rawtypes")
+       public PartitionedBroadcast getBroadcast()
        {
                return _bcHandle.get();
        }
@@ -50,13 +52,13 @@ public class BroadcastObject extends LineageObject
        public boolean isValid() 
        {
                //check for evicted soft reference
-               PartitionedBroadcastMatrix pbm = _bcHandle.get();
+               PartitionedBroadcast<T> pbm = _bcHandle.get();
                if( pbm == null )
                        return false;
                
                //check for validity of individual broadcasts
-               Broadcast<PartitionedMatrixBlock>[] tmp = pbm.getBroadcasts();
-               for( Broadcast<PartitionedMatrixBlock> bc : tmp )
+               Broadcast<PartitionedBlock<T>>[] tmp = pbm.getBroadcasts();
+               for( Broadcast<PartitionedBlock<T>> bc : tmp )
                        if( !bc.isValid() )
                                return false;           
                return true;

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBlock.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBlock.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBlock.java
new file mode 100644
index 0000000..20fcd0b
--- /dev/null
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBlock.java
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.instructions.spark.data;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutput;
+import java.io.ObjectOutputStream;
+import java.lang.reflect.Array;
+import java.lang.reflect.Constructor;
+import java.util.ArrayList;
+
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.controlprogram.caching.CacheBlock;
+import org.apache.sysml.runtime.matrix.data.Pair;
+import org.apache.sysml.runtime.util.FastBufferedDataInputStream;
+import org.apache.sysml.runtime.util.FastBufferedDataOutputStream;
+import org.apache.sysml.runtime.util.IndexRange;
+
+/**
+ * This class is for partitioned matrix/frame blocks, to be used
+ * as broadcasts. Distributed tasks require block-partitioned broadcasts but a 
lazy partitioning per
+ * task would create instance-local copies and hence replicate broadcast 
variables which are shared
+ * by all tasks within an executor.  
+ * 
+ */
+public class PartitionedBlock<T extends CacheBlock> implements Externalizable
+{
+
+       protected T[] _partBlocks = null; 
+       protected long _rlen = -1;
+       protected long _clen = -1;
+       protected int _brlen = -1;
+       protected int _bclen = -1;
+       protected int _offset = 0;
+       
+       /**
+        * Creates an empty instance: no block array is allocated and all
+        * dimension fields keep their initial values. The fields are filled
+        * later, e.g., by readExternal or createPartition.
+        */
+       public PartitionedBlock() {
+               //do nothing (required for Externalizable)
+       }
+       
+       
+       /**
+        * @return the overall number of rows (_rlen) covered by this partitioned block
+        */
+       public long getNumRows() {
+               return _rlen;
+       }
+       
+       /**
+        * @return the overall number of columns (_clen) covered by this partitioned block
+        */
+       public long getNumCols() {
+               return _clen;
+       }
+       
+       /**
+        * @return the number of rows per block (_brlen, widened from int to long)
+        */
+       public long getNumRowsPerBlock() {
+               return _brlen;
+       }
+       
+       /**
+        * @return the number of columns per block (_bclen, widened from int to long)
+        */
+       public long getNumColumnsPerBlock() {
+               return _bclen;
+       }
+       
+       /**
+        * Computes how many blocks are needed to cover all rows, i.e.,
+        * ceil(_rlen / _brlen).
+        * 
+        * @return number of row blocks
+        */
+       public int getNumRowBlocks() 
+       {
+               double blocksPerDim = (double)_rlen / _brlen;
+               return (int)Math.ceil(blocksPerDim);
+       }
+       
+       /**
+        * Computes how many blocks are needed to cover all columns, i.e.,
+        * ceil(_clen / _bclen).
+        * 
+        * @return number of column blocks
+        */
+       public int getNumColumnBlocks() 
+       {
+               double blocksPerDim = (double)_clen / _bclen;
+               return (int)Math.ceil(blocksPerDim);
+       }
+       
+       /**
+        * Partitions the given block into a row-major array of sub-blocks of
+        * at most brlen x bclen cells. Each sub-block is a new instance of the
+        * input's runtime class (created via its no-arg constructor) that is
+        * filled by block.sliceOperations.
+        * 
+        * @param block input block to be partitioned
+        * @param brlen number of rows per block
+        * @param bclen number of columns per block
+        * @throws RuntimeException wrapping any failure during allocation or slicing
+        */
+       @SuppressWarnings("unchecked")
+       public PartitionedBlock(T block, int brlen, int bclen) 
+       {
+               //get the dimensions of the input block
+               int rlen = block.getNumRows();
+               int clen = block.getNumColumns();
+               
+               //partitioning input broadcast
+               _rlen = rlen;
+               _clen = clen;
+               _brlen = brlen;
+               _bclen = bclen;
+
+               //block counts depend on the dimension fields set above
+               int nrblks = getNumRowBlocks();
+               int ncblks = getNumColumnBlocks();
+               
+               try
+               {
+                       //typed array of the input's runtime class, filled in row-major order (ix)
+                       _partBlocks = 
(T[])Array.newInstance((block.getClass()), nrblks * ncblks);
+                       for( int i=0, ix=0; i<nrblks; i++ )
+                               for( int j=0; j<ncblks; j++, ix++ )
+                               {
+                                       T tmp = (T) 
block.getClass().newInstance();
+                                       //bounds are clipped to the input dimensions for boundary blocks
+                                       //NOTE(review): presumably 0-based inclusive bounds - confirm against CacheBlock.sliceOperations
+                                       block.sliceOperations(i*_brlen, 
Math.min((i+1)*_brlen, rlen)-1, 
+                                                                  j*_bclen, 
Math.min((j+1)*_bclen, clen)-1, tmp);
+                                       _partBlocks[ix] = tmp;
+                               }
+               }
+               catch(Exception ex) {
+                       throw new RuntimeException("Failed partitioning of 
broadcast variable input.", ex);
+               }
+               
+               //a freshly partitioned block holds all blocks, starting at position 0
+               _offset = 0;
+       }
+
+       /**
+        * Creates a partitioned block of the given dimensions whose block array
+        * is allocated but not populated (all entries are null until assigned,
+        * e.g., via setBlock). The given block is only used to determine the
+        * runtime class of the array elements.
+        * 
+        * @param rlen overall number of rows
+        * @param clen overall number of columns
+        * @param brlen number of rows per block
+        * @param bclen number of columns per block
+        * @param block instance whose runtime class defines the array element type
+        */
+       @SuppressWarnings("unchecked")
+       public PartitionedBlock(int rlen, int clen, int brlen, int bclen, T 
block) 
+       {
+               //partitioning input broadcast
+               _rlen = rlen;
+               _clen = clen;
+               _brlen = brlen;
+               _bclen = bclen;
+               
+               //allocate one array slot per block of the block grid
+               int nrblks = getNumRowBlocks();
+               int ncblks = getNumColumnBlocks();
+               _partBlocks = (T[])Array.newInstance((block.getClass()), nrblks 
* ncblks);
+
+       }
+       
+       /**
+        * Returns the block at the given 1-based block position. The position
+        * is linearized in row-major order and shifted by the offset of this
+        * partition.
+        * 
+        * @param rowIndex 1-based row-block index
+        * @param colIndex 1-based column-block index
+        * @return the block stored at the requested position
+        * @throws DMLRuntimeException if an index is outside the block grid
+        */
+       public T getBlock(int rowIndex, int colIndex) 
+               throws DMLRuntimeException 
+       {
+               final int numRowBlks = getNumRowBlocks();
+               final int numColBlks = getNumColumnBlocks();
+               
+               //reject positions outside the logical block grid
+               boolean validRow = (rowIndex >= 1 && rowIndex <= numRowBlks);
+               boolean validCol = (colIndex >= 1 && colIndex <= numColBlks);
+               if( !(validRow && validCol) ) {
+                       throw new DMLRuntimeException("Block indexes ["+rowIndex+","+colIndex+"] out of range ["+numRowBlks+","+numColBlks+"]");
+               }
+               
+               //row-major position relative to the first block of this partition
+               int pos = (rowIndex-1)*numColBlks + (colIndex-1) - _offset;
+               return _partBlocks[pos];
+       }
+       
+       /**
+        * Stores the given block at the given 1-based block position. The
+        * position is linearized in row-major order and shifted by the offset
+        * of this partition.
+        * 
+        * @param rowIndex 1-based row-block index
+        * @param colIndex 1-based column-block index
+        * @param block block to store at the given position
+        * @throws DMLRuntimeException if an index is outside the block grid
+        */
+       public void setBlock(int rowIndex, int colIndex, T block) 
+               throws DMLRuntimeException
+       {
+               final int numRowBlks = getNumRowBlocks();
+               final int numColBlks = getNumColumnBlocks();
+               
+               //reject positions outside the logical block grid
+               boolean validRow = (rowIndex >= 1 && rowIndex <= numRowBlks);
+               boolean validCol = (colIndex >= 1 && colIndex <= numColBlks);
+               if( !(validRow && validCol) ) {
+                       throw new DMLRuntimeException("Block indexes ["+rowIndex+","+colIndex+"] out of range ["+numRowBlks+","+numColBlks+"]");
+               }
+               
+               //row-major position relative to the first block of this partition
+               int pos = (rowIndex-1)*numColBlks + (colIndex-1) - _offset;
+               _partBlocks[pos] = block;
+       }
+       
+       /**
+        * Creates a new partitioned block that shares the dimension meta data
+        * of this object but only references the numBlks blocks starting at
+        * the given offset. The blocks themselves are not copied. This object
+        * is left unmodified.
+        * 
+        * @param offset position of the first block in the block array of this object
+        * @param numBlks number of blocks of the new partition
+        * @param block instance whose runtime class defines the array element type
+        * @return new partition with _offset set to the given offset
+        */
+       @SuppressWarnings("unchecked")
+       public PartitionedBlock<T> createPartition( int offset, int numBlks, T block )
+       {
+               PartitionedBlock<T> ret = new PartitionedBlock<T>();
+               ret._rlen = _rlen;
+               ret._clen = _clen;
+               ret._brlen = _brlen;
+               ret._bclen = _bclen;
+               ret._offset = offset;
+               
+               //allocate the block array of the new partition; the array of this
+               //object must stay untouched because it is the source of the copy
+               //NOTE(review): offset is used as a direct index into _partBlocks, which
+               //presumably assumes this object has _offset 0 - confirm with callers
+               ret._partBlocks = (T[])Array.newInstance(block.getClass(), numBlks);
+               System.arraycopy(_partBlocks, offset, ret._partBlocks, 0, numBlks);
+               
+               return ret;
+       }
+
+       /**
+        * Computes the in-memory size as a fixed overhead (24 for the header
+        * plus 32 for the block array) plus the in-memory sizes reported by
+        * all blocks.
+        * 
+        * @return in-memory size as computed from the constants and block sizes
+        */
+       public long getInMemorySize()
+       {
+               //fixed overhead: header + block array
+               long total = 24 + 32;
+               if( _partBlocks == null )
+                       return total;
+               
+               for( T blk : _partBlocks )
+                       total += blk.getInMemorySize();
+               return total;
+       }
+       
+       /**
+        * Computes the serialized size as a fixed header of 24 plus the exact
+        * serialized sizes reported by all blocks.
+        * 
+        * NOTE(review): the 24-byte header constant looks smaller than the
+        * fixed-size fields emitted by writeHeaderAndPayload (two longs and
+        * four ints) - confirm.
+        * 
+        * @return serialized size as computed from the header constant and block sizes
+        */
+       public long getExactSerializedSize()
+       {
+               long total = 24; //header
+               if( _partBlocks == null )
+                       return total;
+               
+               for( T blk : _partBlocks )
+                       total += blk.getExactSerializedSize();
+               return total;
+       }
+
+       /**
+        * Utility for slice operations over partitioned blocks, where the
+        * index range can cover multiple blocks. Every block touched by the
+        * range is sliced via block.performSlice and the partial results are
+        * collected; a single partial result is returned directly, otherwise
+        * all partial results are merged into a newly allocated block of
+        * (ru-rl+1) x (cu-cl+1) cells.
+        * 
+        * NOTE(review): the block index arithmetic (x-1)/blocksize+1 suggests
+        * 1-based inclusive cell indexes - confirm against callers.
+        * 
+        * @param rl lower row index of the slice
+        * @param ru upper row index of the slice
+        * @param cl lower column index of the slice
+        * @param cu upper column index of the slice
+        * @param block instance used to obtain the result list, to perform the
+        *        per-block slices, and to determine the class of the output
+        * @return the sliced block
+        * @throws DMLRuntimeException if a block index is out of range or the
+        *         reflective allocation or merge of the output fails
+        */
+       @SuppressWarnings("unchecked")
+       public T sliceOperations(long rl, long ru, long cl, long cu, T block) 
+               throws DMLRuntimeException 
+       {
+               //narrow the range to int for block index and output size computation
+               int lrl = (int) rl;
+               int lru = (int) ru;
+               int lcl = (int) cl;
+               int lcu = (int) cu;
+               
+               //result list plus 1-based indexes of the first/last touched row and column blocks
+               ArrayList<Pair<?, ?>> allBlks = block.getPairList();
+               int start_iix = (lrl-1)/_brlen+1;
+               int end_iix = (lru-1)/_brlen+1;
+               int start_jix = (lcl-1)/_bclen+1;
+               int end_jix = (lcu-1)/_bclen+1;
+                               
+               //slice every touched block and collect the partial results
+               for( int iix = start_iix; iix <= end_iix; iix++ )
+                       for(int jix = start_jix; jix <= end_jix; jix++)         
+                       {
+                               T in = getBlock(iix, jix);
+                               IndexRange ixrange = new IndexRange(rl, ru, cl, 
cu);
+                               ArrayList<Pair<?, ?>> outlist = 
block.performSlice(ixrange, _brlen, _bclen, iix, jix, in);
+                               allBlks.addAll(outlist);
+                       }
+               
+               //a single partial result already is the final output
+               if(allBlks.size() == 1) {
+                       return (T) allBlks.get(0).getValue();
+               }
+               else {
+                       //allocate output block via the (int, int, boolean) constructor
+                       //of the runtime class and merge all partial results into it
+                       Constructor<?> constr;
+                       try {
+                               constr = 
block.getClass().getConstructor(int.class, int.class, boolean.class);
+                               T ret = (T) constr.newInstance(lru-lrl+1, 
lcu-lcl+1, false);
+                               for(Pair<?, ?> kv : allBlks) {
+                                       ret.merge((T)kv.getValue(), false);
+                               }
+                               return ret;
+                       } catch (Exception e) {
+                               throw new DMLRuntimeException(e);
+                       }
+               }
+       }
+
+       /**
+        * Redirects the default java serialization via externalizable to the
+        * custom header/payload format of this class. If the input is an
+        * ObjectInputStream, it is wrapped into a FastBufferedDataInputStream
+        * before reading; any other input is read directly.
+        * 
+        * @param is object input to read from
+        * @throws IOException if reading from the input fails
+        */
+       public void readExternal(ObjectInput is) 
+               throws IOException
+       {
+               //fast deserialize of dense/sparse blocks via the buffered wrapper
+               DataInput source = (is instanceof ObjectInputStream) ?
+                       new FastBufferedDataInputStream((ObjectInputStream)is) : is;
+               
+               readHeaderAndPayload(source);
+       }
+       
+       /**
+        * Redirects the default java serialization via externalizable to the
+        * custom header/payload format of this class. If the output is an
+        * ObjectOutputStream, it is wrapped into a FastBufferedDataOutputStream
+        * which is flushed after writing; any other output is written directly.
+        * 
+        * @param os object output to write to
+        * @throws IOException if writing to the output fails
+        */
+       public void writeExternal(ObjectOutput os) 
+               throws IOException
+       {
+               //default serialize (general case)
+               if( !(os instanceof ObjectOutputStream) ) {
+                       writeHeaderAndPayload(os);
+                       return;
+               }
+               
+               //fast serialize of dense/sparse blocks via the buffered wrapper
+               FastBufferedDataOutputStream buffered = 
+                       new FastBufferedDataOutputStream((ObjectOutputStream)os);
+               writeHeaderAndPayload(buffered);
+               buffered.flush();
+       }
+       
+       /**
+        * Writes the header (dimensions as longs; block sizes and offset as
+        * ints), the class name of the block elements (length-prefixed UTF-8
+        * bytes), the number of blocks, and finally the blocks themselves.
+        * The class name is required by readHeaderAndPayload to re-create
+        * the typed block array and its elements.
+        * 
+        * @param dos data output to write to
+        * @throws IOException if writing to the output fails
+        */
+       private void writeHeaderAndPayload(DataOutput dos) 
+               throws IOException
+       {
+               dos.writeLong(_rlen);
+               dos.writeLong(_clen);
+               dos.writeInt(_brlen);
+               dos.writeInt(_bclen);
+               dos.writeInt(_offset);
+               
+               //element class of the block array (set on allocation via Array.newInstance)
+               byte[] cname = _partBlocks.getClass().getComponentType().getName().getBytes("UTF-8");
+               dos.writeInt(cname.length);
+               dos.write(cname);
+               
+               dos.writeInt(_partBlocks.length);
+               for( T block : _partBlocks )
+                       block.write(dos);
+       }
+
+       /**
+        * Reads the header and all blocks in exactly the order and with the
+        * types used by writeHeaderAndPayload. The block array and every block
+        * are instantiated from the serialized element class name (via its
+        * no-arg constructor) before the block content is read.
+        * 
+        * @param dis data input to read from
+        * @throws IOException if reading the header fails
+        * @throws RuntimeException wrapping any failure while instantiating or reading the blocks
+        */
+       @SuppressWarnings("unchecked")
+       private void readHeaderAndPayload(DataInput dis) 
+               throws IOException
+       {
+               //dimensions are serialized as longs (reading ints would misalign the stream)
+               _rlen = dis.readLong();
+               _clen = dis.readLong();
+               _brlen = dis.readInt();
+               _bclen = dis.readInt();
+               _offset = dis.readInt();
+               
+               byte[] cname = new byte[dis.readInt()];
+               dis.readFully(cname);
+               
+               int len = dis.readInt();
+               
+               try
+               {
+                       //array of the element class (not of PartitionedBlock itself)
+                       Class<?> clazz = Class.forName(new String(cname, "UTF-8"));
+                       _partBlocks = (T[])Array.newInstance(clazz, len);
+                       for( int i=0; i<len; i++ ) {
+                               //elements of a new array are null and need to be created first
+                               T tmp = (T) clazz.newInstance();
+                               tmp.readFields(dis);
+                               _partBlocks[i] = tmp;
+                       }
+               }
+               catch(Exception ex) {
+                       throw new RuntimeException("Failed deserialization of partitioned broadcast blocks.", ex);
+               }
+               
+       }
+       
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBroadcast.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBroadcast.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBroadcast.java
new file mode 100644
index 0000000..d924cc0
--- /dev/null
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBroadcast.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.instructions.spark.data;
+
+import java.io.Serializable;
+
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.controlprogram.caching.CacheBlock;
+
+/**
+ * This class is a wrapper around an array of broadcasts of partitioned 
matrix/frame blocks,
+ * which is required due to 2GB limitations of Spark's broadcast handling. 
Without this
+ * partitioning of Broadcast<PartitionedBlock> into 
Broadcast<PartitionedBlock>[],
+ * we got java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE 
issue.
+ * Despite various jiras, this issue still showed up in Spark 1.4/1.5. 
+ * 
+ */
+public class PartitionedBroadcast<T extends CacheBlock> implements Serializable
+{
+       private static final long serialVersionUID = 7041959166079438401L;
+
+       protected static final long BROADCAST_PARTSIZE = 200L*1024*1024; //200M 
cells ~ 1.6GB 
+       
+       private Broadcast<PartitionedBlock<T>>[] _pbc = null;
+       
+       public PartitionedBroadcast() {
+               //do nothing (required for Externalizable)
+       }
+       
+       public PartitionedBroadcast(Broadcast<PartitionedBlock<T>>[] broadcasts)
+       {
+               _pbc = broadcasts;
+       }
+       
+       public Broadcast<PartitionedBlock<T>>[] getBroadcasts() {
+               return _pbc;
+       }
+       
+       /**
+        * 
+        * @return
+        */
+       public int getNumRowBlocks() {
+               return _pbc[0].value().getNumRowBlocks();
+       }
+       
+       public int getNumColumnBlocks() {
+               return _pbc[0].value().getNumColumnBlocks();
+       }
+       
+       /**
+        * 
+        * @param rlen
+        * @param clen
+        * @param brlen
+        * @param bclen
+        * @return
+        */
+       public static int computeBlocksPerPartition(long rlen, long clen, long 
brlen, long bclen) {
+               return (int) Math.floor( BROADCAST_PARTSIZE /  
+                               Math.min(rlen, brlen) / Math.min(clen, bclen));
+       }
+       /**
+        * 
+        * @param rowIndex
+        * @param colIndex
+        * @return
+        * @throws DMLRuntimeException 
+        */
+       public T getBlock(int rowIndex, int colIndex) 
+               throws DMLRuntimeException 
+       {
+               int pix = 0;
+               
+               if( _pbc.length > 1 ) { 
+                       //compute partition index
+                       PartitionedBlock<T> tmp = _pbc[0].value();
+                       int numPerPart = 
computeBlocksPerPartition(tmp.getNumRows(), tmp.getNumCols(), 
+                                       tmp.getNumRowsPerBlock(), 
tmp.getNumColumnsPerBlock());
+                       int ix = 
(rowIndex-1)*tmp.getNumColumnBlocks()+(colIndex-1);
+                       pix = ix / numPerPart;
+               }
+                       
+               return _pbc[pix].value().getBlock(rowIndex, colIndex);
+       }
+       
+       public T sliceOperations(long rl, long ru, long cl, long cu, T block) 
+                       throws DMLRuntimeException 
+       {
+               T ret = null;
+               
+               for( Broadcast<PartitionedBlock<T>> bc : _pbc ) {
+                       PartitionedBlock<T> pm = bc.value();
+                       T tmp = pm.sliceOperations(rl, ru, cl, cu, block);
+                       if( ret != null )
+                               ret.merge(tmp, false);
+                       else
+                               ret = tmp;
+               }
+               
+               return ret;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBroadcastMatrix.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBroadcastMatrix.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBroadcastMatrix.java
deleted file mode 100644
index f79d8c1..0000000
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBroadcastMatrix.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.instructions.spark.data;
-
-import java.io.Serializable;
-
-import org.apache.spark.broadcast.Broadcast;
-
-import org.apache.sysml.runtime.DMLRuntimeException;
-import org.apache.sysml.runtime.matrix.data.MatrixBlock;
-
-/**
- * This class is a wrapper around an array of broadcasts of partitioned matrix 
blocks,
- * which is required due to 2GB limitations of Spark's broadcast handling. 
Without this
- * partitioning of Broadcast<PartitionedMatrixBlock> into 
Broadcast<PartitionedMatrixBlock>[],
- * we got java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE 
issue.
- * Despite various jiras, this issue still showed up in Spark 1.4/1.5. 
- * 
- */
-public class PartitionedBroadcastMatrix implements Serializable
-{
-       private static final long serialVersionUID = 1225135967889810877L;
-       private static final long BROADCAST_PARTSIZE = 200L*1024*1024; //200M 
cells ~ 1.6GB 
-       
-       private Broadcast<PartitionedMatrixBlock>[] _pbc = null;
-       
-       public PartitionedBroadcastMatrix(Broadcast<PartitionedMatrixBlock>[] 
broadcasts)
-       {
-               _pbc = broadcasts;
-       }
-       
-       public Broadcast<PartitionedMatrixBlock>[] getBroadcasts() {
-               return _pbc;
-       }
-       
-       /**
-        * 
-        * @return
-        */
-       public int getNumRowBlocks() {
-               return _pbc[0].value().getNumRowBlocks();
-       }
-       
-       public int getNumColumnBlocks() {
-               return _pbc[0].value().getNumColumnBlocks();
-       }
-       
-       /**
-        * 
-        * @param rowIndex
-        * @param colIndex
-        * @return
-        * @throws DMLRuntimeException 
-        */
-       public MatrixBlock getMatrixBlock(int rowIndex, int colIndex) 
-               throws DMLRuntimeException 
-       {
-               if( _pbc.length > 1 ) { 
-                       //compute partition index
-                       PartitionedMatrixBlock tmp = _pbc[0].value();
-                       int numPerPart = 
computeBlocksPerPartition(tmp.getNumRows(), tmp.getNumCols(), 
-                                       tmp.getNumRowsPerBlock(), 
tmp.getNumColumnsPerBlock());
-                       int ix = 
(rowIndex-1)*tmp.getNumColumnBlocks()+(colIndex-1);
-                       int pix = ix / numPerPart;
-                       
-                       //get matrix block from partition
-                       return _pbc[pix].value().getMatrixBlock(rowIndex, 
colIndex);    
-               }
-               else { //single partition
-                       return _pbc[0].value().getMatrixBlock(rowIndex, 
colIndex);
-               }
-               
-       }
-       
-       public MatrixBlock sliceOperations(long rl, long ru, long cl, long cu, 
MatrixBlock matrixBlock) 
-               throws DMLRuntimeException 
-       {
-               MatrixBlock ret = null;
-               
-               for( Broadcast<PartitionedMatrixBlock> bc : _pbc ) {
-                       PartitionedMatrixBlock pm = bc.value();
-                       MatrixBlock tmp = pm.sliceOperations(rl, ru, cl, cu, 
new MatrixBlock());
-                       if( ret != null )
-                               ret.merge(tmp, false);
-                       else
-                               ret = tmp;
-               }
-               
-               return ret;
-       }
-       
-       /**
-        * 
-        * @param rlen
-        * @param clen
-        * @param brlen
-        * @param bclen
-        * @return
-        */
-       public static int computeBlocksPerPartition(long rlen, long clen, long 
brlen, long bclen) {
-               return (int) Math.floor( BROADCAST_PARTSIZE /  
-                               Math.min(rlen, brlen) / Math.min(clen, bclen));
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedMatrixBlock.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedMatrixBlock.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedMatrixBlock.java
deleted file mode 100644
index 901b130..0000000
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedMatrixBlock.java
+++ /dev/null
@@ -1,385 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.instructions.spark.data;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutput;
-import java.io.ObjectOutputStream;
-import java.util.ArrayList;
-
-import scala.Tuple2;
-
-import org.apache.sysml.runtime.DMLRuntimeException;
-import org.apache.sysml.runtime.instructions.spark.utils.SparkUtils;
-import org.apache.sysml.runtime.matrix.data.MatrixBlock;
-import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
-import org.apache.sysml.runtime.matrix.data.OperationsOnMatrixValues;
-import org.apache.sysml.runtime.matrix.mapred.IndexedMatrixValue;
-import org.apache.sysml.runtime.util.FastBufferedDataInputStream;
-import org.apache.sysml.runtime.util.FastBufferedDataOutputStream;
-import org.apache.sysml.runtime.util.IndexRange;
-
-/**
- * The main purpose of this class is to provide a handle for partitioned 
matrix blocks, to be used
- * as broadcasts. Distributed tasks require block-partitioned broadcasts but a 
lazy partitioning per
- * task would create instance-local copies and hence replicate broadcast 
variables which are shared
- * by all tasks within an executor.  
- * 
- */
-public class PartitionedMatrixBlock implements Externalizable
-{
-
-       private static final long serialVersionUID = -5706923809800365593L;
-
-       private MatrixBlock[] _partBlocks = null; 
-       private int _rlen = -1;
-       private int _clen = -1;
-       private int _brlen = -1;
-       private int _bclen = -1;
-       private int _offset = 0;
-       
-       public PartitionedMatrixBlock() {
-               //do nothing (required for Externalizable)
-       }
-       
-       public PartitionedMatrixBlock(MatrixBlock mb, int brlen, int bclen) 
-       {
-               //get the input matrix block
-               int rlen = mb.getNumRows();
-               int clen = mb.getNumColumns();
-               
-               //partitioning input broadcast
-               _rlen = rlen;
-               _clen = clen;
-               _brlen = brlen;
-               _bclen = bclen;
-               
-               int nrblks = getNumRowBlocks();
-               int ncblks = getNumColumnBlocks();
-               _partBlocks = new MatrixBlock[nrblks * ncblks];
-               
-               try
-               {
-                       for( int i=0, ix=0; i<nrblks; i++ )
-                               for( int j=0; j<ncblks; j++, ix++ )
-                               {
-                                       MatrixBlock tmp = new MatrixBlock();
-                                       mb.sliceOperations(i*brlen, 
Math.min((i+1)*brlen, rlen)-1, 
-                                                                  j*bclen, 
Math.min((j+1)*bclen, clen)-1, tmp);
-                                       _partBlocks[ix] = tmp;
-                               }
-               }
-               catch(Exception ex) {
-                       throw new RuntimeException("Failed partitioning of 
broadcast variable input.", ex);
-               }
-               
-               _offset = 0;
-       }
-       
-       public PartitionedMatrixBlock(int rlen, int clen, int brlen, int bclen) 
-       {
-               //partitioning input broadcast
-               _rlen = rlen;
-               _clen = clen;
-               _brlen = brlen;
-               _bclen = bclen;
-               
-               int nrblks = getNumRowBlocks();
-               int ncblks = getNumColumnBlocks();
-               _partBlocks = new MatrixBlock[nrblks * ncblks];         
-       }
-       
-       public long getNumRows() {
-               return _rlen;
-       }
-       
-       public long getNumCols() {
-               return _clen;
-       }
-       
-       public long getNumRowsPerBlock() {
-               return _brlen;
-       }
-       
-       public long getNumColumnsPerBlock() {
-               return _bclen;
-       }
-       
-       /**
-        * 
-        * @return
-        */
-       public int getNumRowBlocks() 
-       {
-               return (int)Math.ceil((double)_rlen/_brlen);
-       }
-       
-       /**
-        * 
-        * @return
-        */
-       public int getNumColumnBlocks() 
-       {
-               return (int)Math.ceil((double)_clen/_bclen);
-       }
-       
-       /**
-        * 
-        * @param rowIndex
-        * @param colIndex
-        * @return
-        * @throws DMLRuntimeException 
-        */
-       public MatrixBlock getMatrixBlock(int rowIndex, int colIndex) 
-               throws DMLRuntimeException 
-       {
-               //check for valid block index
-               int nrblks = getNumRowBlocks();
-               int ncblks = getNumColumnBlocks();
-               if( rowIndex <= 0 || rowIndex > nrblks || colIndex <= 0 || 
colIndex > ncblks ) {
-                       throw new DMLRuntimeException("Block indexes 
["+rowIndex+","+colIndex+"] out of range ["+nrblks+","+ncblks+"]");
-               }
-               
-               //get the requested matrix block
-               int rix = rowIndex - 1;
-               int cix = colIndex - 1;
-               int ix = rix*ncblks+cix - _offset;
-               return _partBlocks[ ix ];
-       }
-       
-       /**
-        * 
-        * @param rowIndex
-        * @param colIndex
-        * @param mb
-        * @throws DMLRuntimeException
-        */
-       public void setMatrixBlock(int rowIndex, int colIndex, MatrixBlock mb) 
-               throws DMLRuntimeException
-       {
-               //check for valid block index
-               int nrblks = getNumRowBlocks();
-               int ncblks = getNumColumnBlocks();
-               if( rowIndex <= 0 || rowIndex > nrblks || colIndex <= 0 || 
colIndex > ncblks ) {
-                       throw new DMLRuntimeException("Block indexes 
["+rowIndex+","+colIndex+"] out of range ["+nrblks+","+ncblks+"]");
-               }
-               
-               //get the requested matrix block
-               int rix = rowIndex - 1;
-               int cix = colIndex - 1;
-               int ix = rix*ncblks+cix - _offset;
-               _partBlocks[ ix ] = mb;
-               
-       }
-       
-       /**
-        * 
-        * @return
-        */
-       public long estimateSizeInMemory()
-       {
-               long ret = 24; //header
-               ret += 32;    //block array
-               
-               if( _partBlocks != null )
-                       for( MatrixBlock mb : _partBlocks )
-                               ret += mb.estimateSizeInMemory();
-               
-               return ret;
-       }
-       
-       /**
-        * 
-        * @return
-        */
-       public long estimateSizeOnDisk()
-       {
-               long ret = 24; //header
-               
-               if( _partBlocks != null )
-                       for( MatrixBlock mb : _partBlocks )
-                               ret += mb.estimateSizeOnDisk();
-               
-               return ret;
-       }
-       
-       /**
-        * 
-        * @param offset
-        * @param numBlks
-        * @return
-        */
-       public PartitionedMatrixBlock createPartition( int offset, int numBlks )
-       {
-               PartitionedMatrixBlock ret = new PartitionedMatrixBlock();
-               ret._rlen = _rlen;
-               ret._clen = _clen;
-               ret._brlen = _brlen;
-               ret._bclen = _bclen;
-               ret._partBlocks = new MatrixBlock[numBlks];
-               ret._offset = offset;
-               
-               System.arraycopy(_partBlocks, offset, ret._partBlocks, 0, 
numBlks);
-               
-               return ret;
-       }
-
-       /**
-        * Utility for slice operations over partitioned matrices, where the 
index range can cover
-        * multiple blocks. The result is always a single result matrix block. 
All semantics are 
-        * equivalent to the core matrix block slice operations. 
-        * 
-        * @param rl
-        * @param ru
-        * @param cl
-        * @param cu
-        * @param matrixBlock
-        * @return
-        * @throws DMLRuntimeException 
-        */
-       public MatrixBlock sliceOperations(long rl, long ru, long cl, long cu, 
MatrixBlock matrixBlock) 
-               throws DMLRuntimeException 
-       {
-               int lrl = (int) rl;
-               int lru = (int) ru;
-               int lcl = (int) cl;
-               int lcu = (int) cu;
-               
-               ArrayList<Tuple2<MatrixIndexes, MatrixBlock>> allBlks = new 
ArrayList<Tuple2<MatrixIndexes,MatrixBlock>>();
-               int start_iix = (lrl-1)/_brlen+1;
-               int end_iix = (lru-1)/_brlen+1;
-               int start_jix = (lcl-1)/_bclen+1;
-               int end_jix = (lcu-1)/_bclen+1;
-                               
-               for( int iix = start_iix; iix <= end_iix; iix++ )
-                       for(int jix = start_jix; jix <= end_jix; jix++)         
-                       {
-                               MatrixBlock in = getMatrixBlock(iix, jix);
-                               IndexedMatrixValue imv = new 
IndexedMatrixValue(new MatrixIndexes(iix, jix), in);
-                               
-                               ArrayList<IndexedMatrixValue> outlist = new 
ArrayList<IndexedMatrixValue>();
-                               IndexRange ixrange = new IndexRange(rl, ru, cl, 
cu);
-                               OperationsOnMatrixValues.performSlice(imv, 
ixrange, _brlen, _bclen, outlist);
-                               
allBlks.addAll(SparkUtils.fromIndexedMatrixBlock(outlist));
-                       }
-               
-               if(allBlks.size() == 1) {
-                       return allBlks.get(0)._2;
-               }
-               else {
-                       //allocate output matrix
-                       MatrixBlock ret = new MatrixBlock(lru-lrl+1, lcu-lcl+1, 
false);
-                       for(Tuple2<MatrixIndexes, MatrixBlock> kv : allBlks) {
-                               ret.merge(kv._2, false);
-                       }
-                       return ret;
-               }
-       }
-       
-       /**
-        * Redirects the default java serialization via externalizable to our 
default 
-        * hadoop writable serialization for efficient broadcast 
deserialization. 
-        * 
-        * @param is
-        * @throws IOException
-        */
-       public void readExternal(ObjectInput is) 
-               throws IOException
-       {
-               DataInput dis = is;
-               
-               if( is instanceof ObjectInputStream ) {
-                       //fast deserialize of dense/sparse blocks
-                       ObjectInputStream ois = (ObjectInputStream)is;
-                       dis = new FastBufferedDataInputStream(ois);
-               }
-               
-               readHeaderAndPayload(dis);
-       }
-       
-       /**
-        * Redirects the default java serialization via externalizable to our 
default 
-        * hadoop writable serialization for efficient broadcast serialization. 
-        * 
-        * @param is
-        * @throws IOException
-        */
-       public void writeExternal(ObjectOutput os) 
-               throws IOException
-       {
-               if( os instanceof ObjectOutputStream ) {
-                       //fast serialize of dense/sparse blocks
-                       ObjectOutputStream oos = (ObjectOutputStream)os;
-                       FastBufferedDataOutputStream fos = new 
FastBufferedDataOutputStream(oos);
-                       writeHeaderAndPayload(fos);
-                       fos.flush();
-               }
-               else {
-                       //default serialize (general case)
-                       writeHeaderAndPayload(os);      
-               }
-       }
-       
-       /**
-        * 
-        * @param dos
-        * @throws IOException 
-        */
-       private void writeHeaderAndPayload(DataOutput dos) 
-               throws IOException
-       {
-               dos.writeInt(_rlen);
-               dos.writeInt(_clen);
-               dos.writeInt(_brlen);
-               dos.writeInt(_bclen);
-               dos.writeInt(_offset);
-               dos.writeInt(_partBlocks.length);
-               for( MatrixBlock mb : _partBlocks )
-                       mb.write(dos);
-       }
-
-       /**
-        * 
-        * @param din
-        * @throws IOException 
-        */
-       private void readHeaderAndPayload(DataInput dis) 
-               throws IOException
-       {
-               _rlen = dis.readInt();
-               _clen = dis.readInt();
-               _brlen = dis.readInt();
-               _bclen = dis.readInt();
-               _offset = dis.readInt();
-               
-               int len = dis.readInt();
-               _partBlocks = new MatrixBlock[len];
-               
-               for( int i=0; i<len; i++ ) {
-                       _partBlocks[i] = new MatrixBlock();
-                       _partBlocks[i].readFields(dis);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/CopyFrameBlockPairFunction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/CopyFrameBlockPairFunction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/CopyFrameBlockPairFunction.java
index 9e31878..df891f7 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/CopyFrameBlockPairFunction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/CopyFrameBlockPairFunction.java
@@ -57,4 +57,4 @@ public class CopyFrameBlockPairFunction implements 
PairFunction<Tuple2<LongWrita
                        return new Tuple2<Long,FrameBlock>(arg0._1().get(), 
arg0._2());
                }
        }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractGroup.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractGroup.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractGroup.java
index e8c6dbe..2a3aedf 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractGroup.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractGroup.java
@@ -27,7 +27,7 @@ import org.apache.spark.api.java.function.PairFlatMapFunction;
 import scala.Tuple2;
 
 import org.apache.sysml.hops.OptimizerUtils;
-import 
org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcastMatrix;
+import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcast;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysml.runtime.matrix.data.WeightedCell;
@@ -139,9 +139,9 @@ public abstract class ExtractGroup implements Serializable
        {
                private static final long serialVersionUID = 
5709955602290131093L;
                
-               private PartitionedBroadcastMatrix _pbm = null;
+               private PartitionedBroadcast<MatrixBlock> _pbm = null;
                
-               public ExtractGroupBroadcast( PartitionedBroadcastMatrix pbm, 
long bclen, long ngroups, Operator op ) {
+               public ExtractGroupBroadcast( PartitionedBroadcast<MatrixBlock> 
pbm, long bclen, long ngroups, Operator op ) {
                        super(bclen, ngroups, op);
                        _pbm = pbm;
                }
@@ -152,7 +152,7 @@ public abstract class ExtractGroup implements Serializable
                                throws Exception 
                {
                        MatrixIndexes ix = arg._1;
-                       MatrixBlock group = 
_pbm.getMatrixBlock((int)ix.getRowIndex(), 1);
+                       MatrixBlock group = 
_pbm.getBlock((int)ix.getRowIndex(), 1);
                        MatrixBlock target = arg._2;
                        
                        return execute(ix, group, target);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/MatrixVectorBinaryOpFunction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/MatrixVectorBinaryOpFunction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/MatrixVectorBinaryOpFunction.java
index b110861..bc6035d 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/MatrixVectorBinaryOpFunction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/MatrixVectorBinaryOpFunction.java
@@ -25,7 +25,7 @@ import org.apache.spark.broadcast.Broadcast;
 import scala.Tuple2;
 
 import org.apache.sysml.lops.BinaryM.VectorType;
-import org.apache.sysml.runtime.instructions.spark.data.PartitionedMatrixBlock;
+import org.apache.sysml.runtime.instructions.spark.data.PartitionedBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysml.runtime.matrix.operators.BinaryOperator;
@@ -36,10 +36,10 @@ public class MatrixVectorBinaryOpFunction implements 
PairFunction<Tuple2<MatrixI
        private static final long serialVersionUID = -7695883019452417300L;
        
        private BinaryOperator _op = null;
-       private Broadcast<PartitionedMatrixBlock> _pmV = null;
+       private Broadcast<PartitionedBlock<MatrixBlock>> _pmV = null;
        private VectorType _vtype = null;
        
-       public MatrixVectorBinaryOpFunction( BinaryOperator op, 
Broadcast<PartitionedMatrixBlock> binput, VectorType vtype ) 
+       public MatrixVectorBinaryOpFunction( BinaryOperator op, 
Broadcast<PartitionedBlock<MatrixBlock>> binput, VectorType vtype ) 
        {
                _op = op;
                _pmV = binput;
@@ -56,7 +56,7 @@ public class MatrixVectorBinaryOpFunction implements 
PairFunction<Tuple2<MatrixI
                //get the rhs block 
                int rix= (int)((_vtype==VectorType.COL_VECTOR) ? 
ix.getRowIndex() : 1);
                int cix= (int)((_vtype==VectorType.COL_VECTOR) ? 1 : 
ix.getColumnIndex());
-               MatrixBlock in2 = _pmV.value().getMatrixBlock(rix, cix);
+               MatrixBlock in2 = _pmV.value().getBlock(rix, cix);
                        
                //execute the binary operation
                MatrixBlock ret = (MatrixBlock) (in1.binaryOperations (_op, 
in2, new MatrixBlock()));

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/MatrixVectorBinaryOpPartitionFunction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/MatrixVectorBinaryOpPartitionFunction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/MatrixVectorBinaryOpPartitionFunction.java
index b298693..597c028 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/MatrixVectorBinaryOpPartitionFunction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/MatrixVectorBinaryOpPartitionFunction.java
@@ -27,7 +27,7 @@ import scala.Tuple2;
 
 import org.apache.sysml.lops.BinaryM.VectorType;
 import org.apache.sysml.runtime.instructions.spark.data.LazyIterableIterator;
-import 
org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcastMatrix;
+import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcast;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysml.runtime.matrix.operators.BinaryOperator;
@@ -37,10 +37,10 @@ public class MatrixVectorBinaryOpPartitionFunction 
implements PairFlatMapFunctio
        private static final long serialVersionUID = 9096091404578628534L;
        
        private BinaryOperator _op = null;
-       private PartitionedBroadcastMatrix _pmV = null;
+       private PartitionedBroadcast<MatrixBlock> _pmV = null;
        private VectorType _vtype = null;
        
-       public MatrixVectorBinaryOpPartitionFunction( BinaryOperator op, 
PartitionedBroadcastMatrix binput, VectorType vtype ) 
+       public MatrixVectorBinaryOpPartitionFunction( BinaryOperator op, 
PartitionedBroadcast<MatrixBlock> binput, VectorType vtype ) 
        {
                _op = op;
                _pmV = binput;
@@ -76,7 +76,7 @@ public class MatrixVectorBinaryOpPartitionFunction implements 
PairFlatMapFunctio
                        //get the rhs block 
                        int rix= (int)((_vtype==VectorType.COL_VECTOR) ? 
ix.getRowIndex() : 1);
                        int cix= (int)((_vtype==VectorType.COL_VECTOR) ? 1 : 
ix.getColumnIndex());
-                       MatrixBlock in2 = _pmV.getMatrixBlock(rix, cix);
+                       MatrixBlock in2 = _pmV.getBlock(rix, cix);
                                
                        //execute the binary operation
                        MatrixBlock ret = (MatrixBlock) (in1.binaryOperations 
(_op, in2, new MatrixBlock()));

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/OuterVectorBinaryOpFunction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/OuterVectorBinaryOpFunction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/OuterVectorBinaryOpFunction.java
index 2947c72..3b90ff3 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/OuterVectorBinaryOpFunction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/OuterVectorBinaryOpFunction.java
@@ -25,7 +25,7 @@ import org.apache.spark.api.java.function.PairFlatMapFunction;
 
 import scala.Tuple2;
 
-import 
org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcastMatrix;
+import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcast;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysml.runtime.matrix.operators.BinaryOperator;
@@ -35,9 +35,9 @@ public class OuterVectorBinaryOpFunction implements 
PairFlatMapFunction<Tuple2<M
        private static final long serialVersionUID = 1730704346934726826L;
        
        private BinaryOperator _op;
-       private PartitionedBroadcastMatrix _pmV;
+       private PartitionedBroadcast<MatrixBlock> _pmV;
        
-       public OuterVectorBinaryOpFunction( BinaryOperator op, 
PartitionedBroadcastMatrix binput ) 
+       public OuterVectorBinaryOpFunction( BinaryOperator op, 
PartitionedBroadcast<MatrixBlock> binput ) 
        {
                _op = op;
                _pmV = binput;
@@ -84,7 +84,7 @@ public class OuterVectorBinaryOpFunction implements 
PairFlatMapFunction<Tuple2<M
                                MatrixIndexes ix = _currBlk._1();
                                MatrixBlock in1 = _currBlk._2();
                                
-                               MatrixBlock in2 = _pmV.getMatrixBlock(1, 
_currPos);
+                               MatrixBlock in2 = _pmV.getBlock(1, _currPos);
                                MatrixBlock resultBlk = 
(MatrixBlock)in1.binaryOperations (_op, in2, new MatrixBlock());
                                resultBlk.examSparsity(); 
                                ret = new Tuple2<MatrixIndexes,MatrixBlock>(

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
index 274efe0..b63724a 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
@@ -178,7 +178,6 @@ public class FrameRDDConverterUtils
                return textCellToBinaryBlockLongIndex(sc, input, mcOut, 
lschema);
        }
 
-               
        /**
         * 
         * @param sc
@@ -198,7 +197,7 @@ public class FrameRDDConverterUtils
                
                //aggregate partial matrix blocks
                JavaPairRDD<Long,FrameBlock> out = 
-                               RDDAggregateUtils.mergeByFrameKey( output ); 
+                               (JavaPairRDD<Long, FrameBlock>) 
RDDAggregateUtils.mergeByFrameKey( output ); 
 
                return out;
        }
@@ -264,7 +263,7 @@ public class FrameRDDConverterUtils
                        
                        //aggregate partial frame blocks
                        if(mcIn.getCols() > mcIn.getColsPerBlock())
-                               out = RDDAggregateUtils.mergeByFrameKey( out ); 
        //TODO: Will need better merger
+                               out = (JavaPairRDD<Long, FrameBlock>) 
RDDAggregateUtils.mergeByFrameKey( out );
                }
                else
                        out = input.mapToPair(new 
MatrixToBinaryBlockOneColumnBlockFunction(mcIn));
@@ -396,8 +395,6 @@ public class FrameRDDConverterUtils
                private boolean _fill = false;
                private int _maxRowsPerBlock = -1; 
                
-               protected static final int BUFFER_SIZE = 1 * 1000 * 1000; //1M 
elements, size of default matrix block 
-
                
                public CSVToBinaryBlockFunction(MatrixCharacteristics mc, 
boolean hasHeader, String delim, boolean fill)
                {
@@ -405,7 +402,7 @@ public class FrameRDDConverterUtils
                        _hasHeader = hasHeader;
                        _delim = delim;
                        _fill = fill;
-                       _maxRowsPerBlock = Math.max((int) (BUFFER_SIZE/_clen), 
1);
+                       _maxRowsPerBlock = Math.max((int) 
(FrameBlock.BUFFER_SIZE/_clen), 1);
                }
 
                @Override
@@ -529,10 +526,7 @@ public class FrameRDDConverterUtils
        {
                private static final long serialVersionUID = 
-729614449626680946L;
 
-               //internal buffer size (aligned w/ default matrix block size)
-               protected static final int BUFFER_SIZE = 1 * 1000 * 1000; //1M 
elements (8MB), size of default matrix block
                protected int _bufflen = -1;
-               
                protected long _rlen = -1;
                protected long _clen = -1;
                
@@ -542,7 +536,7 @@ public class FrameRDDConverterUtils
                        _clen = mc.getCols();
                        
                        //determine upper bounded buffer len
-                       _bufflen = (int) Math.min(_rlen*_clen, BUFFER_SIZE);
+                       _bufflen = (int) Math.min(_rlen*_clen, 
FrameBlock.BUFFER_SIZE);
                }
 
 
@@ -624,15 +618,12 @@ public class FrameRDDConverterUtils
                private int _maxRowsPerBlock = -1;
        
                
-               protected static final int BUFFER_SIZE = 1 * 1000 * 1000; //1M 
elements (Default matrix block size) 
-
-               
                public MatrixToBinaryBlockFunction(MatrixCharacteristics mc)
                {
                        _brlen = mc.getRowsPerBlock();
                        _bclen = mc.getColsPerBlock();
                        _clen = mc.getCols();
-                       _maxRowsPerBlock = Math.max((int) (BUFFER_SIZE/_clen), 
1);
+                       _maxRowsPerBlock = Math.max((int) 
(FrameBlock.BUFFER_SIZE/_clen), 1);
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDSortUtils.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDSortUtils.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDSortUtils.java
index 48a8496..288a8b5 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDSortUtils.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDSortUtils.java
@@ -38,7 +38,7 @@ import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
 import 
org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
 import org.apache.sysml.runtime.functionobjects.SortIndex;
-import org.apache.sysml.runtime.instructions.spark.data.PartitionedMatrixBlock;
+import org.apache.sysml.runtime.instructions.spark.data.PartitionedBlock;
 import org.apache.sysml.runtime.instructions.spark.data.RowMatrixBlock;
 import 
org.apache.sysml.runtime.instructions.spark.functions.ReplicateVectorFunction;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
@@ -218,8 +218,8 @@ public class RDDSortUtils
                        
sortedIxSrc.quickSetValue((int)sortedIx.quickGetValue(i,0)-1, 0, i+1);          
        
 
                //broadcast index vector
-               PartitionedMatrixBlock pmb = new 
PartitionedMatrixBlock(sortedIxSrc, brlen, bclen);             
-               Broadcast<PartitionedMatrixBlock> _pmb = 
sec.getSparkContext().broadcast(pmb);  
+               PartitionedBlock<MatrixBlock> pmb = new 
PartitionedBlock<MatrixBlock>(sortedIxSrc, brlen, bclen);               
+               Broadcast<PartitionedBlock<MatrixBlock>> _pmb = 
sec.getSparkContext().broadcast(pmb);   
 
                //sort data with broadcast index vector
                JavaPairRDD<MatrixIndexes, RowMatrixBlock> ret = data
@@ -641,9 +641,9 @@ public class RDDSortUtils
                private long _rlen = -1;
                private int _brlen = -1;
 
-               private Broadcast<PartitionedMatrixBlock> _pmb = null;
+               private Broadcast<PartitionedBlock<MatrixBlock>> _pmb = null;
                
-               public ShuffleMatrixBlockRowsInMemFunction(long rlen, int 
brlen, Broadcast<PartitionedMatrixBlock> pmb)
+               public ShuffleMatrixBlockRowsInMemFunction(long rlen, int 
brlen, Broadcast<PartitionedBlock<MatrixBlock>> pmb)
                {
                        _rlen = rlen;
                        _brlen = brlen;
@@ -694,7 +694,7 @@ public class RDDSortUtils
                                        //produce next output tuple
                                        MatrixIndexes ixmap = _currBlk._1();
                                        MatrixBlock data = _currBlk._2();
-                                       MatrixBlock mbTargetIndex = 
_pmb.value().getMatrixBlock((int)ixmap.getRowIndex(), 1);
+                                       MatrixBlock mbTargetIndex = 
_pmb.value().getBlock((int)ixmap.getRowIndex(), 1);
                                        
                                        long valix = (long) 
mbTargetIndex.getValue(_currPos, 0);
                                        long rix = 
UtilFunctions.computeBlockIndex(valix, _brlen);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java
index 98975da..90d0148 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java
@@ -92,6 +92,29 @@ public class SparkUtils
                return ret;
        }
        
+       /**
+        * 
+        * @param in
+        * @return
+        */
+       public static Pair<MatrixIndexes,MatrixBlock> 
fromIndexedMatrixBlockToPair( IndexedMatrixValue in ){
+               return new Pair<MatrixIndexes,MatrixBlock>(in.getIndexes(), 
(MatrixBlock)in.getValue());
+       }
+       
+       /**
+        * 
+        * @param in
+        * @return
+        */
+       public static ArrayList<Pair<MatrixIndexes,MatrixBlock>> 
fromIndexedMatrixBlockToPair( ArrayList<IndexedMatrixValue> in )
+       {
+               ArrayList<Pair<MatrixIndexes,MatrixBlock>> ret = new 
ArrayList<Pair<MatrixIndexes,MatrixBlock>>();
+               for( IndexedMatrixValue imv : in )
+                       ret.add(fromIndexedMatrixBlockToPair(imv));
+               
+               return ret;
+       }
+       
        
        /**
         * 
@@ -102,7 +125,6 @@ public class SparkUtils
                return new Tuple2<Long, FrameBlock>(in.getKey(), in.getValue());
        }
        
-       
        /**
         * 
         * @param in
@@ -132,6 +154,25 @@ public class SparkUtils
        
        /**
         * 
+        * @param in
+        * @return
+        */
+       public static Pair<Long,FrameBlock> toIndexedFrameBlock( 
Tuple2<Long,FrameBlock> in ) {
+               return new Pair<Long,FrameBlock>(in._1(), in._2());
+       }
+       
+       /**
+        * 
+        * @param ix
+        * @param fb
+        * @return
+        */
+       public static Pair<Long,FrameBlock> toIndexedFrameBlock( Long ix, 
FrameBlock fb ) {
+               return new Pair<Long,FrameBlock>(ix, fb);
+       }
+
+       /**
+        * 
         * @param mb
         * @param blen
         * @return

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java 
b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
index 0065fe8..e51bd9d 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
@@ -46,10 +46,12 @@ import org.apache.sysml.runtime.util.UtilFunctions;
  * 
  */
 @SuppressWarnings({"rawtypes","unchecked"}) //allow generic native arrays
-public class FrameBlock implements Writable, CacheBlock, Externalizable
+public class FrameBlock implements Writable, CacheBlock, Externalizable  
 {
        private static final long serialVersionUID = -3993450030207130665L;
        
+       public static final int BUFFER_SIZE = 1 * 1000 * 1000; //1M elements, 
size of default matrix block 
+
        //internal configuration
        private static final boolean REUSE_RECODE_MAPS = true;
        
@@ -146,6 +148,16 @@ public class FrameBlock implements Writable, CacheBlock, 
Externalizable
        }
        
        /**
+        * Sets the schema of the frame block.
+        * 
+        * @param schema the new schema; column names are recreated for schema.size() columns
+        */
+       public void setSchema(List<ValueType> schema) {
+               _schema = schema;
+               _colnames = createColNames(schema.size());
+       }
+
+       /**
         * Returns the column names of the frame block.
         * 
         * @return
@@ -288,13 +300,14 @@ public class FrameBlock implements Writable, CacheBlock, 
Externalizable
         * @param val
         */
        public void set(int r, int c, Object val) {
-               _coldata.get(c).set(r, val);
+               _coldata.get(c).set(r, 
UtilFunctions.objectToObject(_schema.get(c), val));
        }
-       
+
        public void reset(int nrow) 
        {
                getSchema().clear();
                getColumnNames().clear();
+               
                if(_coldata != null)
                        for(int i=0; i < _coldata.size(); ++i)
                                _coldata.get(i)._size = nrow;
@@ -567,6 +580,7 @@ public class FrameBlock implements Writable, CacheBlock, 
Externalizable
                        throw new DMLRuntimeException("Invalid values for frame 
indexing: ["+(rl+1)+":"+(ru+1)+"," + (cl+1)+":"+(cu+1)+"] " +
                                                        "must be within frame 
dimensions ["+getNumRows()+","+getNumColumns()+"].");
                }               
+
                if ( (ru-rl+1) < rhsFrame.getNumRows() || (cu-cl+1) < 
rhsFrame.getNumColumns()) {
                        throw new DMLRuntimeException("Invalid values for frame 
indexing: " +
                                        "dimensions of the source frame 
["+rhsFrame.getNumRows()+"x" + rhsFrame.getNumColumns() + "] " +
@@ -574,12 +588,14 @@ public class FrameBlock implements Writable, CacheBlock, 
Externalizable
                                        (rl+1) +":" + (ru+1) + ", " + (cl+1) + 
":" + (cu+1) + "].");
                }
                
+               
                //allocate output frame (incl deep copy schema)
                if( ret == null )
                        ret = new FrameBlock();
-               ret._numRows = _numRows;
+               ret._numRows = _numRows;                                        
                        
                ret._schema = new ArrayList<ValueType>(_schema);
                ret._colnames = new ArrayList<String>(_colnames);
+               ret._colmeta = new ArrayList<ColumnMetadata>(_colmeta);
                
                //copy data to output and partial overwrite w/ rhs
                for( int j=0; j<getNumColumns(); j++ ) {
@@ -618,9 +634,10 @@ public class FrameBlock implements Writable, CacheBlock, 
Externalizable
         * @param ret
         * @return
         */
-       public FrameBlock sliceOperations(int rl, int ru, int cl, int cu, 
FrameBlock ret) 
+       public FrameBlock sliceOperations(int rl, int ru, int cl, int cu, 
CacheBlock retCache) 
                throws DMLRuntimeException
        {
+               FrameBlock ret = (FrameBlock)retCache;
                // check the validity of bounds
                if (   rl < 0 || rl >= getNumRows() || ru < rl || ru >= 
getNumRows()
                        || cl < 0 || cu >= getNumColumns() || cu < cl || cu >= 
getNumColumns() ) {
@@ -638,6 +655,7 @@ public class FrameBlock implements Writable, CacheBlock, 
Externalizable
                for( int j=cl; j<=cu; j++ ) {
                        ret._schema.add(_schema.get(j));
                        ret._colnames.add(_colnames.get(j));
+                       ret._colmeta.add(_colmeta.get(j));
                }       
                ret._numRows = ru-rl+1;
 
@@ -652,6 +670,41 @@ public class FrameBlock implements Writable, CacheBlock, 
Externalizable
                return ret;
        }
        
+       
+       public void sliceOperations(ArrayList<Pair<Long,FrameBlock>> outlist, 
IndexRange range, int rowCut)
+       {
+               FrameBlock top=null, bottom=null;
+               Iterator<Pair<Long,FrameBlock>> p=outlist.iterator();
+               
+               if(range.rowStart<rowCut)
+                       top=(FrameBlock) p.next().getValue();
+               
+               if(range.rowEnd>=rowCut)
+                       bottom=(FrameBlock) p.next().getValue();
+               
+               if(getNumRows() > 0)
+               {
+                       int r=(int) range.rowStart;
+                       
+                       for(; r<Math.min(rowCut, range.rowEnd+1); r++)
+                       {
+                               Object[] row = new Object[(int) 
(range.colEnd-range.colStart+1)];
+                               for(int c=(int) range.colStart; 
c<range.colEnd+1; c++)
+                                       row[(int) (c-range.colStart)] = 
get(r,c);
+                               top.appendRow(row);
+                       }
+
+                       for(; r<=range.rowEnd; r++)
+                       {
+                               Object[] row = new Object[(int) 
(range.colEnd-range.colStart+1)];
+                               for(int c=(int) range.colStart; 
c<range.colEnd+1; c++)
+                                       row[(int) (c-range.colStart)] = 
get(r,c);
+                               bottom.appendRow(row);
+                       }
+               }
+
+       }
+
        /**
         * Appends the given argument frameblock 'that' to this frameblock by 
         * creating a deep copy to prevent side effects. For cbind, the frames
@@ -789,12 +842,18 @@ public class FrameBlock implements Writable, CacheBlock, 
Externalizable
                return map;
        }
 
+       public void merge(CacheBlock that, boolean bDummy) 
+                       throws DMLRuntimeException
+       {
+               merge((FrameBlock)that);
+       }
+       
        /**
         * 
         * @param that
         * @throws DMLRuntimeException 
         */
-       public void merge(FrameBlock that ) 
+       public void merge(FrameBlock that) 
                throws DMLRuntimeException
        {
                //check for empty input source (nothing to merge)
@@ -802,35 +861,77 @@ public class FrameBlock implements Writable, CacheBlock, 
Externalizable
                        return;
                
                //check dimensions (before potentially copy to prevent implicit 
dimension change) 
-               if( getNumRows() != that.getNumRows() || getNumColumns() != 
that.getNumColumns() )
+               if ( getNumRows() != that.getNumRows() || getNumColumns() != 
that.getNumColumns() )
                        throw new DMLRuntimeException("Dimension mismatch on 
merge disjoint (target="+getNumRows()+"x"+getNumColumns()+", 
source="+that.getNumRows()+"x"+that.getNumColumns()+")");
                
                //core frame block merge through cell copy
-               for( int i=0; i<getNumRows(); i++ ) {
+               for( int i=0; i<that.getNumRows(); i++ ) {
                        for( int j=0; j<getNumColumns(); j++ ) {
-                               switch( _schema.get(j) ) {
-                                       case STRING:  
-                                               if (that.get(i,j) != null)
-                                                       set(i,j,that.get(i, j));
-                                               break;
-                                       case BOOLEAN: 
-                                               if ((Boolean)that.get(i,j) != 
Boolean.getBoolean("false"))
-                                                       set(i,j,that.get(i, j));
-                                               break;
-                                       case INT:     
-                                               if ((Long)that.get(i,j) != 0)
-                                                       set(i,j,that.get(i, j));
-                                               break;
-                                       case DOUBLE:  
-                                               if ((Double)that.get(i,j) != 
0.0)
-                                                       set(i,j,that.get(i, j));
-                                               break;
-                                       default: throw new 
RuntimeException("Unsupported value type: "+_schema.get(j));
-                               }
+                               Object obj = 
UtilFunctions.objectToObject(getSchema().get(j), that.get(i,j), true);
+                               if (obj != null)                        // Do 
not update with "null" data
+                                       set(i, j,obj);
                        }
                }
                
        }
+       
+       /**
+        * This function zeroes out the data in the slicing window applicable to 
this block.
+        * 
+        * 
+        * @param result
+        * @param range
+        * @param complementary
+        * @param iRowStartSrc
+        * @param iRowStartDest
+        * @param brlen
+        * @param iMaxRowsToCopy
+        * 
+        */
+       public FrameBlock zeroOutOperations(FrameBlock result, IndexRange 
range, boolean complementary, int iRowStartSrc, int iRowStartDest, int brlen, 
int iMaxRowsToCopy)
+                       throws DMLRuntimeException 
+       {
+               int clen = getNumColumns();
+               
+               if(result==null)
+                       result=new FrameBlock(getSchema());
+               else 
+               {
+                       result.reset(0);
+                       result.setSchema(getSchema());
+               }
+               result.ensureAllocatedColumns(brlen);
+               
+               if(complementary)
+               {
+                       for(int r=(int) range.rowStart; 
r<=range.rowEnd&&r+iRowStartDest<brlen; r++)
+                       {
+                               for(int c=(int) range.colStart; 
c<=range.colEnd; c++)
+                                       result.set(r+iRowStartDest, c, 
get(r+iRowStartSrc,c));
+                       }
+               }else
+               {
+                       int r=iRowStartDest;
+                       for(; r<(int)range.rowStart && 
r-iRowStartDest<iMaxRowsToCopy ; r++)
+                               for(int c=0; c<clen; c++/*, offset++*/)
+                                       result.set(r, c, 
get(r+iRowStartSrc-iRowStartDest,c));
+                       
+                       for(; r<=(int)range.rowEnd && 
r-iRowStartDest<iMaxRowsToCopy ; r++)
+                       {
+                               for(int c=0; c<(int)range.colStart; c++)
+                                       result.set(r, c, 
get(r+iRowStartSrc-iRowStartDest,c));
+
+                               for(int c=(int)range.colEnd+1; c<clen; c++)
+                                       result.set(r, c, 
get(r+iRowStartSrc-iRowStartDest,c));
+                       }
+                       
+                       for(; r-iRowStartDest<iMaxRowsToCopy ; r++)
+                               for(int c=0; c<clen; c++)
+                                       result.set(r, c, 
get(r+iRowStartSrc-iRowStartDest,c));
+               }
+               
+               return result;
+       }
 
        
        ///////
@@ -887,6 +988,7 @@ public class FrameBlock implements Writable, CacheBlock, 
Externalizable
                }
        }
        
+       
        /**
         * 
         */
@@ -949,7 +1051,7 @@ public class FrameBlock implements Writable, CacheBlock, 
Externalizable
                        _data[index] = value;
                }
                public void set(int rl, int ru, Array value) {
-                       System.arraycopy(((StringArray)value)._data, 0, _data, 
rl, ru-rl+1);
+                       set(rl, ru, value, 0);
                }
                public void set(int rl, int ru, Array value, int rlSrc) {
                        System.arraycopy(((StringArray)value)._data, rlSrc, 
_data, rl, ru-rl+1);
@@ -995,7 +1097,7 @@ public class FrameBlock implements Writable, CacheBlock, 
Externalizable
                        _data[index] = (value!=null) ? value : false;
                }
                public void set(int rl, int ru, Array value) {
-                       System.arraycopy(((BooleanArray)value)._data, 0, _data, 
rl, ru-rl+1);
+                       set(rl, ru, value, 0);
                }
                public void set(int rl, int ru, Array value, int rlSrc) {
                        System.arraycopy(((BooleanArray)value)._data, rlSrc, 
_data, rl, ru-rl+1);
@@ -1042,7 +1144,7 @@ public class FrameBlock implements Writable, CacheBlock, 
Externalizable
                        _data[index] = (value!=null) ? value : 0L;
                }
                public void set(int rl, int ru, Array value) {
-                       System.arraycopy(((LongArray)value)._data, 0, _data, 
rl, ru-rl+1);
+                       set(rl, ru, value, 0);
                }
                public void set(int rl, int ru, Array value, int rlSrc) {
                        System.arraycopy(((LongArray)value)._data, rlSrc, 
_data, rl, ru-rl+1);
@@ -1089,7 +1191,7 @@ public class FrameBlock implements Writable, CacheBlock, 
Externalizable
                        _data[index] = (value!=null) ? value : 0d;
                }
                public void set(int rl, int ru, Array value) {
-                       System.arraycopy(((DoubleArray)value)._data, 0, _data, 
rl, ru-rl+1);
+                       set(rl,ru, value, 0);
                }
                public void set(int rl, int ru, Array value, int rlSrc) {
                        System.arraycopy(((DoubleArray)value)._data, rlSrc, 
_data, rl, ru-rl+1);
@@ -1147,4 +1249,15 @@ public class FrameBlock implements Writable, CacheBlock, 
Externalizable
                        _mvValue = mvVal;
                }
        }
+
+       @Override
+       public ArrayList getPairList() {
+               return new ArrayList<Pair<Long, FrameBlock>>();
+       }
+
+       public ArrayList<Pair<?, ?>> performSlice(IndexRange ixrange, int 
brlen, int bclen, int iix, int jix, CacheBlock in) throws DMLRuntimeException
+       {
+               return OperationsOnMatrixValues.performSlice(ixrange, brlen, 
bclen, iix, jix, (FrameBlock)in);
+       }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java 
b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
index 54cba0f..9b40cbd 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
@@ -86,6 +86,7 @@ import org.apache.sysml.runtime.util.IndexRange;
 import org.apache.sysml.runtime.util.UtilFunctions;
 
 
+
 public class MatrixBlock extends MatrixValue implements CacheBlock, 
Externalizable
 {
        private static final long serialVersionUID = 7319972089143154056L;
@@ -1734,6 +1735,12 @@ public class MatrixBlock extends MatrixValue implements 
CacheBlock, Externalizab
                                Arrays.fill(denseBlock, ix2, ix2+rowLen, 0);
        }
 
+       public void merge(CacheBlock that, boolean appendOnly) 
+                       throws DMLRuntimeException
+       {
+               merge((MatrixBlock)that, appendOnly);
+       }
+
        
        /**
         * Merge disjoint: merges all non-zero values of the given input into 
the current
@@ -4034,7 +4041,7 @@ public class MatrixBlock extends MatrixValue implements 
CacheBlock, Externalizab
         * 
         * @throws DMLRuntimeException 
         */
-       public MatrixBlock sliceOperations(int rl, int ru, int cl, int cu, 
MatrixBlock ret) 
+       public MatrixBlock sliceOperations(int rl, int ru, int cl, int cu, 
CacheBlock ret) 
                throws DMLRuntimeException 
        {       
                // check the validity of bounds
@@ -4046,7 +4053,7 @@ public class MatrixBlock extends MatrixValue implements 
CacheBlock, Externalizab
                
                // Output matrix will have the same sparsity as that of the 
input matrix.
                // (assuming a uniform distribution of non-zeros in the input)
-               MatrixBlock result=checkType(ret);
+               MatrixBlock result=checkType((MatrixBlock)ret);
                long estnnz= (long) 
((double)this.nonZeros/rlen/clen*(ru-rl+1)*(cu-cl+1));
                boolean result_sparsity = this.sparse && 
MatrixBlock.evalSparseFormatInMemory(ru-rl+1, cu-cl+1, estnnz);
                if(result==null)
@@ -6213,4 +6220,15 @@ public class MatrixBlock extends MatrixValue implements 
CacheBlock, Externalizab
                }
                public SparsityEstimate(){}
        }
+
+       public ArrayList<Pair<MatrixIndexes, MatrixBlock>> getPairList()
+       {
+               return new ArrayList<Pair<MatrixIndexes,MatrixBlock>>();
+       }
+       
+       @SuppressWarnings("unchecked")
+       public ArrayList<Pair<?, ?>> performSlice(IndexRange ixrange, int 
brlen, int bclen, int iix, int jix, CacheBlock in) throws DMLRuntimeException
+       {
+               return OperationsOnMatrixValues.performSlice(ixrange, brlen, 
bclen, iix, jix, (MatrixBlock)in);
+       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/runtime/matrix/data/OperationsOnMatrixValues.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/matrix/data/OperationsOnMatrixValues.java
 
b/src/main/java/org/apache/sysml/runtime/matrix/data/OperationsOnMatrixValues.java
index 03a36fe..be12228 100644
--- 
a/src/main/java/org/apache/sysml/runtime/matrix/data/OperationsOnMatrixValues.java
+++ 
b/src/main/java/org/apache/sysml/runtime/matrix/data/OperationsOnMatrixValues.java
@@ -21,10 +21,14 @@
 package org.apache.sysml.runtime.matrix.data;
 
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
 
+import org.apache.sysml.parser.Expression.ValueType;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.caching.MatrixObject.UpdateType;
 import org.apache.sysml.runtime.functionobjects.Builtin;
+import org.apache.sysml.runtime.instructions.spark.utils.SparkUtils;
 import org.apache.sysml.lops.PartialAggregate.CorrectionLocationType;
 import org.apache.sysml.runtime.matrix.mapred.IndexedMatrixValue;
 import org.apache.sysml.runtime.matrix.operators.AggregateBinaryOperator;
@@ -292,6 +296,17 @@ public class OperationsOnMatrixValues
                value1.aggregateBinaryOperations(value1, value2, valueOut, op);
        }
        
+       @SuppressWarnings("rawtypes")
+       public static ArrayList performSlice(IndexRange ixrange, int brlen, int 
bclen, int iix, int jix, MatrixBlock in) 
+                       throws DMLRuntimeException
+       {
+               IndexedMatrixValue imv = new IndexedMatrixValue(new 
MatrixIndexes(iix, jix), (MatrixBlock)in);
+               ArrayList<IndexedMatrixValue> outlist = new 
ArrayList<IndexedMatrixValue>();
+               performSlice(imv, ixrange, brlen, bclen, outlist);
+       
+               return SparkUtils.fromIndexedMatrixBlockToPair(outlist);
+       }
+
        /**
         * 
         * @param val
@@ -463,4 +478,144 @@ public class OperationsOnMatrixValues
                        }
                }
        }
+       
+       @SuppressWarnings("rawtypes")
+       public static ArrayList performSlice(IndexRange ixrange, int brlen, int 
bclen, int iix, int jix, FrameBlock in) 
+                       throws DMLRuntimeException
+       {
+               Pair<Long, FrameBlock> lfp = new Pair<Long, FrameBlock>(new 
Long(((iix-1)*brlen)+1), in);
+               ArrayList<Pair<Long, FrameBlock>> outlist = new 
ArrayList<Pair<Long, FrameBlock>>();
+               performSlice(lfp, ixrange, brlen, bclen, outlist);
+       
+               return outlist;
+       }
+
+       
+       /**
+        * Extracts the part of the input frame block that overlaps the overall 
slice range that was requested.
+        * 
+        * @param in
+        * @param ixrange
+        * @param brlen
+        * @param bclen
+        * @param outlist
+        * @throws DMLRuntimeException
+        */
+       public static void performSlice(Pair<Long,FrameBlock> in, IndexRange 
ixrange, int brlen, int bclen, ArrayList<Pair<Long,FrameBlock>> outlist) 
+               throws DMLRuntimeException
+       {
+               long index = in.getKey();
+               FrameBlock block = in.getValue();
+               
+               // Get Block indexes (rows and columns boundaries)
+               long cellIndexTopRow = index;
+               long cellIndexBottomRow = index+block.getNumRows()-1;
+               long cellIndexLeftCol = 1;
+               long cellIndexRightCol = block.getNumColumns();
+               
+               // Calculate block boundaries with range of slice to be 
performed (Global index)
+               long cellIndexOverlapTop = Math.max(cellIndexTopRow, 
ixrange.rowStart);
+               long cellIndexOverlapBottom = Math.min(cellIndexBottomRow, 
ixrange.rowEnd);
+               long cellIndexOverlapLeft = Math.max(cellIndexLeftCol, 
ixrange.colStart);
+               long cellIndexOverlapRight = Math.min(cellIndexRightCol, 
ixrange.colEnd);
+               
+               //check if block is outside the indexing range
+               if(cellIndexOverlapTop>cellIndexOverlapBottom || 
cellIndexOverlapLeft>cellIndexOverlapRight) {
+                       return;
+               }
+               
+               // Create IndexRange for the slice to be performed on this 
block.
+               IndexRange tmpRange = new IndexRange(
+                               cellIndexOverlapTop - index,
+                               cellIndexOverlapBottom - index,
+                               cellIndexOverlapLeft -1,
+                               cellIndexOverlapRight - 1);
+               
+               // Get top row cutting point (slice start row relative to this block's first row).
+               int rowCut=(int)(ixrange.rowStart-index);
+               
+               // Get indices for result block
+               long 
resultBlockIndexTop=UtilFunctions.computeBlockIndex(cellIndexOverlapTop, brlen);
+               long 
resultBlockIndexBottom=UtilFunctions.computeBlockIndex(cellIndexOverlapBottom, 
brlen);
+               
+               //allocate space for the output value
+               for(long r=resultBlockIndexTop; r<=resultBlockIndexBottom; r++)
+               {
+                       List<ValueType> schema = 
UtilFunctions.getSubSchema(block.getSchema(), tmpRange.colStart, 
tmpRange.colEnd);
+                       long iResultIndex = (r-1)*brlen+tmpRange.rowStart;
+                       Pair<Long,FrameBlock> out=new Pair<Long,FrameBlock>(new 
Long(iResultIndex+1), new FrameBlock(schema));
+                       outlist.add(out);
+               }
+               
+               //execute actual slice operation
+               block.sliceOperations(outlist, tmpRange, rowCut);
+       }
+
+       /**
+        * 
+        * @param in
+        * @param ixrange
+        * @param brlenLeft
+        * @param clenLeft
+        * @param rlen
+        * @param clen
+        * @param outlist
+        * @throws DMLRuntimeException
+        */
+       public static void performShift(Pair<Long,FrameBlock> in, IndexRange 
ixrange, int brlenLeft, int clenLeft/*, int bclen*/, long rlen, long clen, 
ArrayList<Pair<Long,FrameBlock>> outlist) 
+               throws DMLRuntimeException
+       {
+               Long ix = in.getKey();
+               FrameBlock fb = in.getValue();
+               long start_lhs_globalRowIndex = ixrange.rowStart + (ix-1);
+               long start_lhs_globalColIndex = ixrange.colStart;
+               long end_lhs_globalRowIndex = start_lhs_globalRowIndex + 
fb.getNumRows() - 1;
+               long end_lhs_globalColIndex = ixrange.colEnd;
+               
+               long start_lhs_rowIndex = 
UtilFunctions.computeBlockIndex(start_lhs_globalRowIndex, brlenLeft);
+               long end_lhs_rowIndex = 
UtilFunctions.computeBlockIndex(end_lhs_globalRowIndex, brlenLeft);
+
+               for(long leftRowIndex = start_lhs_rowIndex; leftRowIndex <= 
end_lhs_rowIndex; leftRowIndex++) {
+                               
+                       // Calculate global index range of the right hand side block within the left hand side
+                       long lhs_rl = Math.max((leftRowIndex-1)*brlenLeft+1, 
start_lhs_globalRowIndex);
+                       long lhs_ru = Math.min(leftRowIndex*brlenLeft, 
end_lhs_globalRowIndex);
+                       long lhs_cl = start_lhs_globalColIndex;
+                       long lhs_cu = end_lhs_globalColIndex;
+                       
+                       int lhs_lrl = UtilFunctions.computeCellInBlock(lhs_rl, 
brlenLeft);
+                       int lhs_lru = UtilFunctions.computeCellInBlock(lhs_ru, 
brlenLeft);
+                       int lhs_lcl = (int)lhs_cl-1;
+                       int lhs_lcu = (int)lhs_cu-1;
+                       
+                       long rhs_rl = lhs_rl - (ixrange.rowStart-1) - (ix-1);
+                       long rhs_ru = rhs_rl + (lhs_ru - lhs_rl);
+                       long rhs_cl = lhs_cl - ixrange.colStart + 1;
+                       long rhs_cu = rhs_cl + (lhs_cu - lhs_cl);
+                       
+                       // local indices are 0 (zero) based.
+                       int rhs_lrl = (int) 
(UtilFunctions.computeCellInBlock(rhs_rl, fb.getNumRows()));
+                       int rhs_lru = (int) 
(UtilFunctions.computeCellInBlock(rhs_ru, fb.getNumRows()));
+                       int rhs_lcl = (int)rhs_cl-1;
+                       int rhs_lcu = (int)rhs_cu-1;
+                                                                               
                                                                                
+                       FrameBlock slicedRHSBlk = fb.sliceOperations(rhs_lrl, 
rhs_lru, rhs_lcl, rhs_lcu, new FrameBlock());
+                       
+                       int lbclen = clenLeft;
+                       
+                       List<ValueType> schemaPartialLeft = 
Collections.nCopies(lhs_lcl, ValueType.STRING);
+                       List<ValueType> schemaRHS = 
UtilFunctions.getSubSchema(fb.getSchema(), rhs_lcl, rhs_lcl-lhs_lcl+lhs_lcu);
+                       List<ValueType> schema = new 
ArrayList<ValueType>(schemaPartialLeft);
+                       schema.addAll(schemaRHS);
+                       List<ValueType> schemaPartialRight = 
Collections.nCopies(lbclen-schema.size(), ValueType.STRING);
+                       schema.addAll(schemaPartialRight);
+                       FrameBlock resultBlock = new FrameBlock(schema);
+                       int iRHSRows = 
(int)(leftRowIndex<=rlen/brlenLeft?brlenLeft:rlen-(rlen/brlenLeft)*brlenLeft);
+                       resultBlock.ensureAllocatedColumns(iRHSRows);
+                       
+                       resultBlock = 
resultBlock.leftIndexingOperations(slicedRHSBlk, lhs_lrl, lhs_lru, lhs_lcl, 
lhs_lcu, new FrameBlock());
+                       outlist.add(new Pair<Long, 
FrameBlock>((leftRowIndex-1)*brlenLeft+1, resultBlock));
+               }
+       }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/runtime/matrix/data/OutputInfo.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/OutputInfo.java 
b/src/main/java/org/apache/sysml/runtime/matrix/data/OutputInfo.java
index 423607c..4d4a975 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/OutputInfo.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/OutputInfo.java
@@ -92,7 +92,7 @@ public class OutputInfo implements Serializable
                else 
                        throw new DMLRuntimeException("Unrecognized output 
info: " + oi);
        }
-
+               
        public static OutputInfo stringToOutputInfo (String str) {
                if ( str.equalsIgnoreCase("textcell")) {
                        return TextCellOutputInfo;

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/runtime/util/FastBufferedDataOutputStream.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/util/FastBufferedDataOutputStream.java 
b/src/main/java/org/apache/sysml/runtime/util/FastBufferedDataOutputStream.java
index e7aebb4..b059a06 100644
--- 
a/src/main/java/org/apache/sysml/runtime/util/FastBufferedDataOutputStream.java
+++ 
b/src/main/java/org/apache/sysml/runtime/util/FastBufferedDataOutputStream.java
@@ -201,7 +201,8 @@ public class FastBufferedDataOutputStream extends 
FilterOutputStream implements
 
        @Override
        public void writeUTF(String s) throws IOException {
-               throw new IOException("Not supported.");
+               byte[] strBytes = s.getBytes("UTF-8");
+               write(strBytes, 0, strBytes.length);
        }
 
 

Reply via email to