Repository: systemml
Updated Branches:
  refs/heads/master 4cf95c92e -> 0a984a43b


[SYSTEMML-1906] Performance codegen row ops over compressed matrices

This patch improves the performance of codegen row-wise operations over
compressed dense and sparse matrices. Previously, we used dense/sparse row
iterators over the compressed matrix block, which internally reused the
existing column group iterators. However, for the OLE and RLE encoding
schemes, these iterators were realized via a set of value iterators,
which performed poorly. The key challenge is the translation from the
value-based, column-wise compressed form to uncompressed rows.

We now use dedicated row iterators for the individual column encoding
schemes. For DDC and UC, these are trivial. For OLE and RLE, however, we
determine a vector of dictionary codes per logical segment (via a single
pass over all value offset lists) and simply read out the value tuples
for these codes in the individual next() calls. Furthermore, we now use
static task partitioning (with segment alignment) for multi-threaded row
operations to avoid unnecessary iterator initialization overhead. On the
Airline78 (dense) and Mnist8m (sparse) datasets, this patch improves
performance from 4.1s to 1s and from 66s to 21s, respectively.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/0a984a43
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/0a984a43
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/0a984a43

Branch: refs/heads/master
Commit: 0a984a43b1062aaf58bf7a2ce019adef264ba459
Parents: 4cf95c9
Author: Matthias Boehm <mboe...@gmail.com>
Authored: Thu Sep 14 00:26:45 2017 -0700
Committer: Matthias Boehm <mboe...@gmail.com>
Committed: Thu Sep 14 00:27:04 2017 -0700

----------------------------------------------------------------------
 .../sysml/runtime/codegen/SpoofRowwise.java     |  33 +--
 .../apache/sysml/runtime/compress/ColGroup.java |  19 ++
 .../sysml/runtime/compress/ColGroupDDC.java     |  47 +++-
 .../sysml/runtime/compress/ColGroupDDC1.java    |  12 +-
 .../sysml/runtime/compress/ColGroupDDC2.java    |  14 +-
 .../sysml/runtime/compress/ColGroupOLE.java     |  67 ++++-
 .../sysml/runtime/compress/ColGroupRLE.java     |  78 +++++-
 .../runtime/compress/ColGroupUncompressed.java  |  51 ++++
 .../runtime/compress/CompressedMatrixBlock.java |  76 ++----
 .../CompressedRowAggregateLargeTest.java        | 270 +++++++++++++++++++
 .../functions/codegen/ZPackageSuite.java        |   1 +
 11 files changed, 589 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/0a984a43/src/main/java/org/apache/sysml/runtime/codegen/SpoofRowwise.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/codegen/SpoofRowwise.java 
b/src/main/java/org/apache/sysml/runtime/codegen/SpoofRowwise.java
index f1cef34..659059e 100644
--- a/src/main/java/org/apache/sysml/runtime/codegen/SpoofRowwise.java
+++ b/src/main/java/org/apache/sysml/runtime/codegen/SpoofRowwise.java
@@ -29,6 +29,7 @@ import java.util.concurrent.Future;
 import java.util.stream.IntStream;
 
 import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.compress.BitmapEncoder;
 import org.apache.sysml.runtime.compress.CompressedMatrixBlock;
 import org.apache.sysml.runtime.instructions.cp.DoubleObject;
 import org.apache.sysml.runtime.instructions.cp.ScalarObject;
@@ -185,20 +186,25 @@ public abstract class SpoofRowwise extends SpoofOperator
                        && LibSpoofPrimitives.isFlipOuter(out.getNumRows(), 
out.getNumColumns());
                
                //input preparation
+               MatrixBlock a = inputs.get(0);
                SideInput[] b = prepInputMatrices(inputs, 1, inputs.size()-1, 
true, _tB1);
                double[] scalars = prepInputScalars(scalarObjects);
                
                //core parallel execute
                ExecutorService pool = Executors.newFixedThreadPool( k );
-               int nk = UtilFunctions.roundToNext(Math.min(8*k,m/32), k);
+               int nk = (a instanceof CompressedMatrixBlock) ? k :
+                       UtilFunctions.roundToNext(Math.min(8*k,m/32), k);
                int blklen = (int)(Math.ceil((double)m/nk));
+               if( a instanceof CompressedMatrixBlock )
+                       blklen = BitmapEncoder.getAlignedBlocksize(blklen);
+               
                try
                {
                        if( _type.isColumnAgg() || _type == RowType.FULL_AGG ) {
                                //execute tasks
                                ArrayList<ParColAggTask> tasks = new 
ArrayList<ParColAggTask>();
                                for( int i=0; i<nk & i*blklen<m; i++ )
-                                       tasks.add(new 
ParColAggTask(inputs.get(0), b, scalars, n, n2, i*blklen, 
Math.min((i+1)*blklen, m)));
+                                       tasks.add(new ParColAggTask(a, b, 
scalars, n, n2, i*blklen, Math.min((i+1)*blklen, m)));
                                List<Future<double[]>> taskret = 
pool.invokeAll(tasks); 
                                //aggregate partial results
                                int len = _type.isColumnAgg() ? 
out.getNumRows()*out.getNumColumns() : 1;
@@ -210,7 +216,7 @@ public abstract class SpoofRowwise extends SpoofOperator
                                //execute tasks
                                ArrayList<ParExecTask> tasks = new 
ArrayList<ParExecTask>();
                                for( int i=0; i<nk & i*blklen<m; i++ )
-                                       tasks.add(new 
ParExecTask(inputs.get(0), b, out, scalars, n, n2, i*blklen, 
Math.min((i+1)*blklen, m)));
+                                       tasks.add(new ParExecTask(a, b, out, 
scalars, n, n2, i*blklen, Math.min((i+1)*blklen, m)));
                                List<Future<Long>> taskret = 
pool.invokeAll(tasks);
                                //aggregate nnz, no need to aggregate results
                                long nnz = 0;
@@ -304,24 +310,9 @@ public abstract class SpoofRowwise extends SpoofOperator
                if( a.isEmptyBlock(false) )
                        return;
                
-               if( !a.isInSparseFormat() ) { //DENSE
-                       Iterator<double[]> iter = a.getDenseRowIterator(rl, ru);
-                       for( int i=rl; iter.hasNext(); i++ ) {
-                               genexec(iter.next(), 0, b, scalars, c, n, i);
-                       }
-               }
-               else { //SPARSE
-                       Iterator<SparseRow> iter = a.getSparseRowIterator(rl, 
ru);
-                       SparseRow empty = new SparseRowVector(1);
-                       for( int i=rl; iter.hasNext(); i++ ) {
-                               SparseRow row = iter.next();
-                               if( !row.isEmpty() )
-                                       genexec(row.values(), 
-                                               row.indexes(), 0, b, scalars, 
c, row.size(), n, i);
-                               else
-                                       genexec(empty.values(), 
-                                               empty.indexes(), 0, b, scalars, 
c, 0, n, i);
-                       }
+               Iterator<double[]> iter = a.getDenseRowIterator(rl, ru);
+               for( int i=rl; iter.hasNext(); i++ ) {
+                       genexec(iter.next(), 0, b, scalars, c, n, i);
                }
        }
        

http://git-wip-us.apache.org/repos/asf/systemml/blob/0a984a43/src/main/java/org/apache/sysml/runtime/compress/ColGroup.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroup.java 
b/src/main/java/org/apache/sysml/runtime/compress/ColGroup.java
index dbed2b4..ba6509c 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/ColGroup.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroup.java
@@ -261,10 +261,29 @@ public abstract class ColGroup implements Serializable
        public abstract void unaryAggregateOperations(AggregateUnaryOperator 
op, MatrixBlock result)
                throws DMLRuntimeException;
        
+       /**
+        * Create a column group iterator for a row index range.
+        * 
+        * @param rl row lower index, inclusive
+        * @param ru row upper index, exclusive
+        * @param inclZeros include zero values into scope of iterator
+        * @param rowMajor use a row major iteration order
+        * @return an iterator instance
+        */
        public abstract Iterator<IJV> getIterator(int rl, int ru,
                        boolean inclZeros, boolean rowMajor);
        
        /**
+        * Create a dense row iterator for a row index range. This iterator
+        * implies the inclusion of zeros and row-major iteration order.
+        * 
+        * @param rl row lower index, inclusive
+        * @param ru row upper index, exclusive
+        * @return an iterator instance
+        */
+       public abstract Iterator<double[]> getRowIterator(int rl, int ru);
+       
+       /**
         * Count the number of non-zeros per row
         * 
         * @param rnnz non-zeros per row

http://git-wip-us.apache.org/repos/asf/systemml/blob/0a984a43/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC.java 
b/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC.java
index 4bf7c20..3618651 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC.java
@@ -214,7 +214,16 @@ public abstract class ColGroupDDC extends ColGroupValue
                        }       
                }
        }
-
+       
+       /**
+        * Generic get value for byte-length-agnostic access
+        * to first column.
+        * 
+        * @param r global row index
+        * @return value
+        */
+       protected abstract double getData(int r);
+       
        /**
         * Generic get value for byte-length-agnostic access.
         * 
@@ -233,6 +242,8 @@ public abstract class ColGroupDDC extends ColGroupValue
         */
        protected abstract void setData(int r, int code);
        
+       protected abstract int getCode(int r);
+       
        @Override
        public long estimateInMemorySize() {
                return super.estimateInMemorySize();
@@ -244,6 +255,11 @@ public abstract class ColGroupDDC extends ColGroupValue
                return new DDCIterator(rl, ru, inclZeros);
        }
        
+       @Override
+       public Iterator<double[]> getRowIterator(int rl, int ru) {
+               return new DDCRowIterator(rl, ru);
+       }
+       
        private class DDCIterator implements Iterator<IJV>
        {
                //iterator configuration 
@@ -288,4 +304,33 @@ public abstract class ColGroupDDC extends ColGroupValue
                        while( !_inclZeros && _value==0);
                }
        }
+       
+       private class DDCRowIterator implements Iterator<double[]>
+       {
+               //iterator configuration 
+               private final int _ru;
+               //iterator state
+               private final double[] _buff = new double[getNumCols()]; 
+               private int _rpos = -1;
+               
+               public DDCRowIterator(int rl, int ru) {
+                       _ru = ru;
+                       _rpos = rl;
+               }
+
+               @Override
+               public boolean hasNext() {
+                       return (_rpos < _ru);
+               }
+
+               @Override
+               public double[] next() {
+                       //copy entire value tuple and 
+                       final int clen = getNumCols();
+                       System.arraycopy(getValues(), getCode(_rpos)*clen, 
_buff, 0, clen);
+                       //advance position to next row
+                       _rpos++;
+                       return _buff;
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/systemml/blob/0a984a43/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC1.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC1.java 
b/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC1.java
index d003aa5..89ca931 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC1.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC1.java
@@ -83,6 +83,11 @@ public class ColGroupDDC1 extends ColGroupDDC
        }
        
        @Override
+       protected double getData(int r) {
+               return _values[(_data[r]&0xFF)];
+       }
+       
+       @Override
        protected double getData(int r, int colIx) {
                return _values[(_data[r]&0xFF)*getNumCols()+colIx];
        }
@@ -93,6 +98,11 @@ public class ColGroupDDC1 extends ColGroupDDC
        }
        
        @Override
+       protected int getCode(int r) {
+               return (_data[r]&0xFF);
+       }
+       
+       @Override
        public void write(DataOutput out) throws IOException {
                int numCols = getNumCols();
                int numVals = getNumValues();
@@ -288,7 +298,7 @@ public class ColGroupDDC1 extends ColGroupDDC
                //temporary array also avoids false sharing in multi-threaded 
environments
                double[] vals = allocDVector(numVals, true);
                for( int i=0; i<nrow; i++ )
-                       vals[_data[i]&0xFF] += a.getData(i, 0);
+                       vals[_data[i]&0xFF] += a.getData(i);
                
                //post-scaling of pre-aggregate with distinct values
                postScaling(vals, c);

http://git-wip-us.apache.org/repos/asf/systemml/blob/0a984a43/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC2.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC2.java 
b/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC2.java
index ec7aa18..d9c851d 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC2.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC2.java
@@ -85,16 +85,26 @@ public class ColGroupDDC2 extends ColGroupDDC
        }
        
        @Override
+       protected double getData(int r) {
+               return _values[_data[r]];
+       }
+       
+       @Override
        protected double getData(int r, int colIx) {
                return _values[_data[r]*getNumCols()+colIx];
        }
-
+       
        @Override
        protected void setData(int r, int code) {
                _data[r] = (char)code;
        }
        
        @Override
+       protected int getCode(int r) {
+               return _data[r];
+       }
+       
+       @Override
        public void write(DataOutput out) throws IOException {
                int numCols = getNumCols();
                int numVals = getNumValues();
@@ -281,7 +291,7 @@ public class ColGroupDDC2 extends ColGroupDDC
                        //temporary array also avoids false sharing in 
multi-threaded environments
                        double[] vals = allocDVector(numVals, true);
                        for( int i=0; i<nrow; i++ ) {
-                               vals[_data[i]] += a.getData(i, 0);
+                               vals[_data[i]] += a.getData(i);
                        }
                        
                        //post-scaling of pre-aggregate with distinct values

http://git-wip-us.apache.org/repos/asf/systemml/blob/0a984a43/src/main/java/org/apache/sysml/runtime/compress/ColGroupOLE.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroupOLE.java 
b/src/main/java/org/apache/sysml/runtime/compress/ColGroupOLE.java
index 7574c73..1f2cd50 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/ColGroupOLE.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroupOLE.java
@@ -473,7 +473,7 @@ public class ColGroupOLE extends ColGroupOffset
                        //iterate over bitmap blocks and add partial results
                        double vsum = 0;
                        for( int j = boff+1; j < boff+1+_data[boff]; j++ )
-                               vsum += a.getData(_data[j], 0);
+                               vsum += a.getData(_data[j]);
                        
                        //scale partial results by values and write results
                        for( int j = 0; j < numCols; j++ )
@@ -780,7 +780,12 @@ public class ColGroupOLE extends ColGroupOffset
        public Iterator<Integer> getIterator(int k, int rl, int ru) {
                return new OLEValueIterator(k, rl, ru);
        }
-
+       
+       @Override
+       public Iterator<double[]> getRowIterator(int rl, int ru) {
+               return new OLERowIterator(rl, ru);
+       }
+       
        private class OLEValueIterator implements Iterator<Integer>
        {
                private final int _ru;
@@ -848,4 +853,62 @@ public class ColGroupOLE extends ColGroupOffset
                        }
                }
        }
+       
+       private class OLERowIterator implements Iterator<double[]>
+       {
+               //iterator configuration 
+               private final int _ru;
+               //iterator state
+               private final double[] _buff = new double[getNumCols()]; 
+               private final int[] _apos;
+               private final int[] _vcodes;
+               private int _rpos = -1;
+               
+               public OLERowIterator(int rl, int ru) {
+                       _ru = ru;
+                       _rpos = rl;
+                       _apos = skipScan(getNumValues(), rl);
+                       _vcodes = new 
int[Math.min(BitmapEncoder.BITMAP_BLOCK_SZ, ru-rl)];
+                       getNextSegment();
+               }
+               
+               @Override
+               public boolean hasNext() {
+                       return (_rpos < _ru);
+               }
+               
+               @Override
+               public double[] next() {
+                       //copy entire value tuple or reset to zero
+                       int ix = _rpos%BitmapEncoder.BITMAP_BLOCK_SZ;
+                       final int clen = getNumCols();
+                       if( _vcodes[ix] >= 0 )
+                               System.arraycopy(getValues(), _vcodes[ix]*clen, 
_buff, 0, clen);
+                       else
+                               Arrays.fill(_buff, 0);
+                       //advance position to next row
+                       _rpos++;
+                       if( _rpos%BitmapEncoder.BITMAP_BLOCK_SZ==0 && _rpos<_ru 
)
+                               getNextSegment();
+                       return _buff;
+               }
+               
+               public void getNextSegment() {
+                       //materialize value codes for entire segment in a 
+                       //single pass over all values (store value code by pos)
+                       Arrays.fill(_vcodes, -1);
+                       final int numVals = getNumValues();
+                       for (int k = 0; k < numVals; k++)  {
+                               int boff = _ptr[k];
+                               int blen = len(k);
+                               int bix = _apos[k];
+                               if( bix < blen ) {
+                                       int slen = _data[boff+bix];
+                                       for(int blckIx = 1; blckIx <= slen; 
blckIx++)
+                                               _vcodes[_data[boff+bix + 
blckIx]] = k;
+                                       _apos[k] += slen+1;
+                               }
+                       }
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/systemml/blob/0a984a43/src/main/java/org/apache/sysml/runtime/compress/ColGroupRLE.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroupRLE.java 
b/src/main/java/org/apache/sysml/runtime/compress/ColGroupRLE.java
index 964e513..478fd31 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/ColGroupRLE.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroupRLE.java
@@ -790,11 +790,17 @@ public class ColGroupRLE extends ColGroupOffset
                return new RLEValueIterator(k, 0, getNumRows());
        }
        
+       
        @Override
        public Iterator<Integer> getIterator(int k, int rl, int ru) {
                return new RLEValueIterator(k, rl, ru);
        }
-
+       
+       @Override
+       public Iterator<double[]> getRowIterator(int rl, int ru) {
+               return new RLERowIterator(rl, ru);
+       }
+       
        private class RLEValueIterator implements Iterator<Integer>
        {
                private final int _ru;
@@ -848,4 +854,74 @@ public class ColGroupRLE extends ColGroupOffset
                        }
                }
        }
+       
+       private class RLERowIterator implements Iterator<double[]>
+       {
+               //iterator configuration 
+               private final int _ru;
+               //iterator state
+               private final double[] _buff = new double[getNumCols()];
+               private final int[] _astart;
+               private final int[] _apos;
+               private final int[] _vcodes;
+               private int _rpos = -1;
+               
+               public RLERowIterator(int rl, int ru) {
+                       _ru = ru;
+                       _rpos = rl;
+                       _astart = new int[getNumValues()];
+                       _apos = skipScan(getNumValues(), rl, _astart);
+                       _vcodes = new 
int[Math.min(BitmapEncoder.BITMAP_BLOCK_SZ, ru-rl)];
+                       getNextSegment();
+               }
+               
+               @Override
+               public boolean hasNext() {
+                       return (_rpos < _ru);
+               }
+               
+               @Override
+               public double[] next() {
+                       //copy entire value tuple or reset to zero
+                       int ix = _rpos%BitmapEncoder.BITMAP_BLOCK_SZ;
+                       final int clen = getNumCols();
+                       if( _vcodes[ix] >= 0 )
+                               System.arraycopy(getValues(), _vcodes[ix]*clen, 
_buff, 0, clen);
+                       else
+                               Arrays.fill(_buff, 0);
+                       //advance position to next row
+                       _rpos++;
+                       if( _rpos%BitmapEncoder.BITMAP_BLOCK_SZ==0 && _rpos<_ru 
)
+                               getNextSegment();
+                       return _buff;
+               }
+               
+               public void getNextSegment() {
+                       //materialize value codes for entire segment in a 
+                       //single pass over all values (store value code by pos)
+                       Arrays.fill(_vcodes, -1);
+                       final int numVals = getNumValues();
+                       final int blksz = BitmapEncoder.BITMAP_BLOCK_SZ;
+                       for (int k = 0; k < numVals; k++) {
+                               int boff = _ptr[k];
+                               int blen = len(k);
+                               int bix = _apos[k];
+                               int start = _astart[k];
+                               int end = (_rpos/blksz+1)*blksz;
+                               while( bix < blen && start < end ) {
+                                       int lstart = _data[boff + bix];
+                                       int llen = _data[boff + bix + 1];
+                                       //set codes of entire run, with 
awareness of unaligned runs/segments
+                                       Arrays.fill(_vcodes, 
Math.min(Math.max(_rpos, start+lstart), end)-_rpos, 
+                                               
Math.min(start+lstart+llen,end)-_rpos, k);
+                                       if( start+lstart+llen >= end )
+                                               break;
+                                       start += lstart + llen;
+                                       bix += 2;
+                               }
+                               _apos[k] = bix;
+                               _astart[k] = start;
+                       }
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/systemml/blob/0a984a43/src/main/java/org/apache/sysml/runtime/compress/ColGroupUncompressed.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/compress/ColGroupUncompressed.java 
b/src/main/java/org/apache/sysml/runtime/compress/ColGroupUncompressed.java
index 7e54e4e..b27215a 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/ColGroupUncompressed.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroupUncompressed.java
@@ -34,6 +34,7 @@ import org.apache.sysml.runtime.matrix.data.IJV;
 import org.apache.sysml.runtime.matrix.data.LibMatrixAgg;
 import org.apache.sysml.runtime.matrix.data.LibMatrixMult;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.matrix.data.SparseBlock;
 import org.apache.sysml.runtime.matrix.data.SparseBlock.Type;
 import org.apache.sysml.runtime.matrix.operators.AggregateUnaryOperator;
 import org.apache.sysml.runtime.matrix.operators.ScalarOperator;
@@ -416,6 +417,11 @@ public class ColGroupUncompressed extends ColGroup
                return new UCIterator(rl, ru, inclZeros);
        }
        
+       @Override
+       public Iterator<double[]> getRowIterator(int rl, int ru) {
+               return new UCRowIterator(rl, ru);
+       }
+       
        private class UCIterator implements Iterator<IJV>
        {
                //iterator configuration
@@ -460,4 +466,49 @@ public class ColGroupUncompressed extends ColGroup
                        while( !_inclZeros && _value==0 );
                }
        }
+       
+       private class UCRowIterator implements Iterator<double[]>
+       {
+               //iterator configuration
+               private final int _ru;
+               //iterator state
+               private final double[] _buff = new double[getNumCols()];
+               private int _rpos = -1;
+               
+               public UCRowIterator(int rl, int ru) {
+                       _ru = ru;
+                       _rpos = rl;
+               }
+               
+               @Override
+               public boolean hasNext() {
+                       return (_rpos < _ru);
+               }
+               
+               @Override
+               public double[] next() {
+                       //copy entire dense/sparse row
+                       if( _data.isAllocated() ) {
+                               if( _data.isInSparseFormat() ) {
+                                       Arrays.fill(_buff, 0); //reset
+                                       if( 
!_data.getSparseBlock().isEmpty(_rpos) ) {
+                                               SparseBlock sblock = 
_data.getSparseBlock();
+                                               int apos = sblock.pos(_rpos);
+                                               int alen = sblock.size(_rpos);
+                                               int[] aix = 
sblock.indexes(_rpos);
+                                               double[] avals = 
sblock.values(_rpos);
+                                               for(int k=apos; k<apos+alen; 
k++)
+                                                       _buff[aix[k]] = 
avals[k];
+                                       }
+                               }
+                               else {
+                                       final int clen = getNumCols();
+                                       System.arraycopy(_data.getDenseBlock(), 
_rpos*clen, _buff, 0, clen);
+                               }
+                       }
+                       //advance position to next row
+                       _rpos++;
+                       return _buff;
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/systemml/blob/0a984a43/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java 
b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
index 418394b..3299594 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
@@ -2305,9 +2305,7 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
                protected final int _ru;
                
                //iterator state
-               private Iterator<IJV>[] _iters = null;
-               protected final int[] _ixbuff = new int[clen];
-               protected final double[] _vbuff = new double[clen];
+               protected Iterator<double[]>[] _iters = null;
                protected int _rpos;
                
                @SuppressWarnings("unchecked")
@@ -2318,43 +2316,16 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
                        //initialize array of column group iterators
                        _iters = new Iterator[_colGroups.size()];
                        for( int i=0; i<_colGroups.size(); i++ )
-                               _iters[i] = _colGroups.get(i).getIterator(
-                                       _rl, _ru, true, true);
-                       Arrays.fill(_ixbuff, -1);
+                               _iters[i] = 
_colGroups.get(i).getRowIterator(_rl, _ru);
                        
                        //get initial row
-                       _rpos = rl-1;
-                       getNextRow();
+                       _rpos = rl;
                }
                
                @Override
                public boolean hasNext() {
                        return (_rpos < _ru);
                }
-               
-               @Override
-               public abstract T next();
-               
-               protected void getNextRow() {
-                       _rpos++;
-                       //read iterators if necessary
-                       for(int j=0; j<_iters.length; j++) {
-                               ColGroup grp = _colGroups.get(j);
-                               if( _ixbuff[grp.getColIndex(0)] < _rpos ) {
-                                       if( _iters[j].hasNext() ) {
-                                               for( int k=0; 
k<grp.getNumCols(); k++ ) {
-                                                       IJV cell = 
_iters[j].next();
-                                                       _ixbuff[cell.getJ()] = 
cell.getI();
-                                                       _vbuff[cell.getJ()] = 
cell.getV();
-                                               }
-                                       }
-                                       else {
-                                               for( int k=0; 
k<grp.getNumCols(); k++ )
-                                                       
_ixbuff[grp.getColIndex(k)] = _ru;
-                                       }
-                               }
-                       }
-               }
        }
        
        private class DenseRowIterator extends RowIterator<double[]>
@@ -2367,13 +2338,15 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
                
                @Override
                public double[] next() {
-                       if( !hasNext() )
-                               throw new RuntimeException("No more rows in row 
partition ["+_rl+","+_ru+")");
-                       //copy currently buffered row entries
-                       for( int j=0; j<clen; j++ )
-                               _ret[j] = (_ixbuff[j] == _rpos) ? _vbuff[j] : 0;
+                       //copy group rows into consolidated row
+                       for(int j=0; j<_iters.length; j++) {
+                               ColGroup grp = _colGroups.get(j);
+                               double[] row = _iters[j].next();
+                               for( int k=0; k<row.length; k++ )
+                                       _ret[grp.getColIndex(k)] = row[k];
+                       }
                        //advance to next row and return buffer
-                       getNextRow();
+                       _rpos++;
                        return _ret;
                }
        }
@@ -2381,27 +2354,28 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
        private class SparseRowIterator extends RowIterator<SparseRow>
        {
                private final SparseRowVector _ret = new SparseRowVector(clen);
+               private final double[] _tmp = new double[clen]; //dense scratch row, never cleared between rows (assumes col groups cover all clen columns -- TODO confirm)
                
                public SparseRowIterator(int rl, int ru) {
                        super(rl, ru);
                }
-
-               @Override
-               public boolean hasNext() {
-                       return (_rpos < _ru);
-               }
-
+               
                @Override
                public SparseRow next() {
-                       if( !hasNext() )
-                               throw new RuntimeException("No more rows in row 
partition ["+_rl+","+_ru+")");
-                       //copy currently buffered row entries
+                       //copy group rows into consolidated dense vector
+                       //to avoid binary search+shifting or final sort
+                       for(int j=0; j<_iters.length; j++) {
+                               ColGroup grp = _colGroups.get(j);
+                               double[] row = _iters[j].next();
+                               for( int k=0; k<row.length; k++ )
+                                       _tmp[grp.getColIndex(k)] = row[k];
+                       }
+                       //append all dense values; zeros presumably skipped inside SparseRowVector.append (TODO confirm)
                        _ret.setSize(0);
-                       for( int j=0; j<clen; j++ )
-                               if( _ixbuff[j] == _rpos )
-                                       _ret.append(j, _vbuff[j]);
+                       for(int i=0; i<_tmp.length; i++)
+                               _ret.append(i, _tmp[i]);
                        //advance to next row and return buffer
-                       getNextRow();
+                       _rpos++;
                        return _ret;
                }
        }

http://git-wip-us.apache.org/repos/asf/systemml/blob/0a984a43/src/test/java/org/apache/sysml/test/integration/functions/codegen/CompressedRowAggregateLargeTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/sysml/test/integration/functions/codegen/CompressedRowAggregateLargeTest.java
 
b/src/test/java/org/apache/sysml/test/integration/functions/codegen/CompressedRowAggregateLargeTest.java
new file mode 100644
index 0000000..13f99cc
--- /dev/null
+++ 
b/src/test/java/org/apache/sysml/test/integration/functions/codegen/CompressedRowAggregateLargeTest.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.test.integration.functions.codegen;
+
+import java.io.File;
+import java.util.HashMap;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.apache.sysml.api.DMLScript;
+import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
+import org.apache.sysml.hops.OptimizerUtils;
+import org.apache.sysml.lops.LopProperties.ExecType;
+import org.apache.sysml.runtime.compress.BitmapEncoder;
+import org.apache.sysml.runtime.compress.CompressedMatrixBlock;
+import org.apache.sysml.runtime.matrix.data.MatrixValue.CellIndex;
+import org.apache.sysml.test.integration.AutomatedTestBase;
+import org.apache.sysml.test.integration.TestConfiguration;
+import org.apache.sysml.test.utils.TestUtils;
+
+public class CompressedRowAggregateLargeTest extends AutomatedTestBase 
+{      
+       private static final String TEST_NAME1 = "CompressedRowAggregateMain";
+       private static final String TEST_DIR = "functions/codegen/";
+       private static final String TEST_CLASS_DIR = TEST_DIR + 
CompressedRowAggregateLargeTest.class.getSimpleName() + "/";
+       private final static String TEST_CONF = 
"SystemML-config-codegen-compress.xml";
+       private final static File   TEST_CONF_FILE = new File(SCRIPT_DIR + 
TEST_DIR, TEST_CONF);
+       
+       private static final int rows = 7*BitmapEncoder.BITMAP_BLOCK_SZ;
+       private static final int cols = 7;
+       private static final double sparsity1 = 0.9;
+       private static final double sparsity2 = 0.1;
+       private static final double sparsity3 = 0.0;
+       private static final double eps = Math.pow(10, -4); //large values
+       
+       public enum SparsityType { //selects sparsity1/sparsity2/sparsity3
+               DENSE,
+               SPARSE,
+               EMPTY,
+       }
+       
+       public enum ValueType { //value pattern -> intended column encoding
+               RAND, //UC
+               CONST, //RLE
+               RAND_ROUND_OLE, //OLE
+               RAND_ROUND_DDC, //DDC
+       }
+       
+       @Override
+       public void setUp() {
+               TestUtils.clearAssertionInformation();
+               addTestConfiguration( TEST_NAME1, new 
TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[] { "R" }) );
+       }
+               
+       @Test
+       public void testCompressedRowAggregateMainDenseConstCP() {
+               testCompressedRowAggregate( TEST_NAME1, SparsityType.DENSE, 
ValueType.CONST, ExecType.CP );
+       }
+       
+       @Test
+       public void testCompressedRowAggregateMainDenseRandCP() {
+               testCompressedRowAggregate( TEST_NAME1, SparsityType.DENSE, 
ValueType.RAND, ExecType.CP );
+       }
+       
+       @Test
+       public void testCompressedRowAggregateMainDenseRand2CP() {
+               testCompressedRowAggregate( TEST_NAME1, SparsityType.DENSE, 
ValueType.RAND_ROUND_DDC, ExecType.CP );
+       }
+       
+       @Test
+       public void testCompressedRowAggregateMainDenseRand3CP() {
+               testCompressedRowAggregate( TEST_NAME1, SparsityType.DENSE, 
ValueType.RAND_ROUND_OLE, ExecType.CP );
+       }
+       
+       @Test
+       public void testCompressedRowAggregateMainSparseConstCP() {
+               testCompressedRowAggregate( TEST_NAME1, SparsityType.SPARSE, 
ValueType.CONST, ExecType.CP );
+       }
+       
+       @Test
+       public void testCompressedRowAggregateMainSparseRandCP() {
+               testCompressedRowAggregate( TEST_NAME1, SparsityType.SPARSE, 
ValueType.RAND, ExecType.CP );
+       }
+       
+       @Test
+       public void testCompressedRowAggregateMainSparseRand2CP() {
+               testCompressedRowAggregate( TEST_NAME1, SparsityType.SPARSE, 
ValueType.RAND_ROUND_DDC, ExecType.CP );
+       }
+       
+       @Test
+       public void testCompressedRowAggregateMainSparseRand3CP() {
+               testCompressedRowAggregate( TEST_NAME1, SparsityType.SPARSE, 
ValueType.RAND_ROUND_OLE, ExecType.CP );
+       }
+       
+       @Test
+       public void testCompressedRowAggregateMainEmptyConstCP() {
+               testCompressedRowAggregate( TEST_NAME1, SparsityType.EMPTY, 
ValueType.CONST, ExecType.CP );
+       }
+       
+       @Test
+       public void testCompressedRowAggregateMainEmptyRandCP() {
+               testCompressedRowAggregate( TEST_NAME1, SparsityType.EMPTY, 
ValueType.RAND, ExecType.CP );
+       }
+       
+       @Test
+       public void testCompressedRowAggregateMainEmptyRand2CP() {
+               testCompressedRowAggregate( TEST_NAME1, SparsityType.EMPTY, 
ValueType.RAND_ROUND_DDC, ExecType.CP );
+       }
+       
+       @Test
+       public void testCompressedRowAggregateMainEmptyRand3CP() {
+               testCompressedRowAggregate( TEST_NAME1, SparsityType.EMPTY, 
ValueType.RAND_ROUND_OLE, ExecType.CP );
+       }
+       
+       @Test
+       public void testCompressedRowAggregateMainDenseConstSP() {
+               testCompressedRowAggregate( TEST_NAME1, SparsityType.DENSE, 
ValueType.CONST, ExecType.SPARK );
+       }
+       
+       @Test
+       public void testCompressedRowAggregateMainDenseRandSP() {
+               testCompressedRowAggregate( TEST_NAME1, SparsityType.DENSE, 
ValueType.RAND, ExecType.SPARK );
+       }
+       
+       @Test
+       public void testCompressedRowAggregateMainDenseRand2SP() {
+               testCompressedRowAggregate( TEST_NAME1, SparsityType.DENSE, 
ValueType.RAND_ROUND_DDC, ExecType.SPARK );
+       }
+       
+       @Test
+       public void testCompressedRowAggregateMainDenseRand3SP() {
+               testCompressedRowAggregate( TEST_NAME1, SparsityType.DENSE, 
ValueType.RAND_ROUND_OLE, ExecType.SPARK );
+       }
+       
+       @Test
+       public void testCompressedRowAggregateMainSparseConstSP() {
+               testCompressedRowAggregate( TEST_NAME1, SparsityType.SPARSE, 
ValueType.CONST, ExecType.SPARK );
+       }
+       
+       @Test
+       public void testCompressedRowAggregateMainSparseRandSP() {
+               testCompressedRowAggregate( TEST_NAME1, SparsityType.SPARSE, 
ValueType.RAND, ExecType.SPARK );
+       }
+       
+       @Test
+       public void testCompressedRowAggregateMainSparseRand2SP() {
+               testCompressedRowAggregate( TEST_NAME1, SparsityType.SPARSE, 
ValueType.RAND_ROUND_DDC, ExecType.SPARK );
+       }
+       
+       @Test
+       public void testCompressedRowAggregateMainSparseRand3SP() {
+               testCompressedRowAggregate( TEST_NAME1, SparsityType.SPARSE, 
ValueType.RAND_ROUND_OLE, ExecType.SPARK );
+       }
+       
+       @Test
+       public void testCompressedRowAggregateMainEmptyConstSP() {
+               testCompressedRowAggregate( TEST_NAME1, SparsityType.EMPTY, 
ValueType.CONST, ExecType.SPARK );
+       }
+       
+       @Test
+       public void testCompressedRowAggregateMainEmptyRandSP() {
+               testCompressedRowAggregate( TEST_NAME1, SparsityType.EMPTY, 
ValueType.RAND, ExecType.SPARK );
+       }
+       
+       @Test
+       public void testCompressedRowAggregateMainEmptyRand2SP() {
+               testCompressedRowAggregate( TEST_NAME1, SparsityType.EMPTY, 
ValueType.RAND_ROUND_DDC, ExecType.SPARK );
+       }
+       
+       @Test
+       public void testCompressedRowAggregateMainEmptyRand3SP() {
+               testCompressedRowAggregate( TEST_NAME1, SparsityType.EMPTY, 
ValueType.RAND_ROUND_OLE, ExecType.SPARK );
+       }
+       
+       private void testCompressedRowAggregate(String testname, SparsityType 
stype, ValueType vtype, ExecType et)
+       {       
+               boolean oldRewrites = 
OptimizerUtils.ALLOW_ALGEBRAIC_SIMPLIFICATION;
+               RUNTIME_PLATFORM platformOld = rtplatform;
+               switch( et ){
+                       case MR: rtplatform = RUNTIME_PLATFORM.HADOOP; break;
+                       case SPARK: rtplatform = RUNTIME_PLATFORM.SPARK; break;
+                       default: rtplatform = RUNTIME_PLATFORM.HYBRID_SPARK; 
break;
+               }
+       
+               boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
+               if( rtplatform == RUNTIME_PLATFORM.SPARK || rtplatform == 
RUNTIME_PLATFORM.HYBRID_SPARK )
+                       DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+               
+               try
+               {
+                       OptimizerUtils.ALLOW_ALGEBRAIC_SIMPLIFICATION = true;
+                       TestConfiguration config = 
getTestConfiguration(testname);
+                       loadTestConfiguration(config);
+                       
+                       String HOME = SCRIPT_DIR + TEST_DIR;
+                       fullDMLScriptName = HOME + testname + ".dml";
+                       programArgs = new String[]{"-explain", "-stats", 
+                                       "-args", input("X"), output("R") };
+                       
+                       fullRScriptName = HOME + testname + ".R";
+                       rCmd = getRCmd(inputDir(), expectedDir());              
        
+
+                       //determine input sparsity from sparsity type
+                       double sparsity = -1;
+                       switch( stype ){
+                               case DENSE: sparsity = sparsity1; break;
+                               case SPARSE: sparsity = sparsity2; break;
+                               case EMPTY: sparsity = sparsity3; break;
+                       }
+                       
+                       //generate input data
+                       double min = (vtype==ValueType.CONST)? 10 : -10;
+                       double[][] X = TestUtils.generateTestMatrix(rows, cols, 
min, 10, sparsity, 7);
+                       if( vtype==ValueType.RAND_ROUND_OLE || 
vtype==ValueType.RAND_ROUND_DDC ) {
+                               CompressedMatrixBlock.ALLOW_DDC_ENCODING = 
(vtype==ValueType.RAND_ROUND_DDC); //DDC disabled for the OLE case; reset in finally
+                               X = TestUtils.round(X);
+                       }
+                       writeInputMatrixWithMTD("X", X, true);
+                       
+                       //run DML script and R reference script
+                       runTest(true, false, null, -1); 
+                       runRScript(true); 
+                       
+                       //compare matrices 
+                       HashMap<CellIndex, Double> dmlfile = 
readDMLMatrixFromHDFS("R");
+                       HashMap<CellIndex, Double> rfile  = 
readRMatrixFromFS("R");     
+                       TestUtils.compareMatrices(dmlfile, rfile, eps, 
"Stat-DML", "Stat-R");
+                       
Assert.assertTrue(heavyHittersContainsSubString("spoofRA", 2) 
+                               || heavyHittersContainsSubString("sp_spoofRA", 
2));
+                       
Assert.assertTrue(heavyHittersContainsSubString("compress")
+                               || 
heavyHittersContainsSubString("sp_compress"));
+               }
+               finally {
+                       rtplatform = platformOld;
+                       DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
+                       OptimizerUtils.ALLOW_ALGEBRAIC_SIMPLIFICATION = 
oldRewrites;
+                       OptimizerUtils.ALLOW_AUTO_VECTORIZATION = true;
+                       OptimizerUtils.ALLOW_OPERATOR_FUSION = true; //NOTE(review): these two flags are reset to true, not restored to saved values
+                       CompressedMatrixBlock.ALLOW_DDC_ENCODING = true;
+               }
+       }       
+
+       /**
+        * Override default configuration with custom test configuration to 
ensure
+        * scratch space and local temporary directory locations are also 
updated.
+        */
+       @Override
+       protected File getConfigTemplateFile() {
+               // Instrumentation in this test's output log to show custom 
configuration file used for template.
+               System.out.println("This test case overrides default 
configuration with " + TEST_CONF_FILE.getPath());
+               return TEST_CONF_FILE;
+       }
+}

http://git-wip-us.apache.org/repos/asf/systemml/blob/0a984a43/src/test_suites/java/org/apache/sysml/test/integration/functions/codegen/ZPackageSuite.java
----------------------------------------------------------------------
diff --git 
a/src/test_suites/java/org/apache/sysml/test/integration/functions/codegen/ZPackageSuite.java
 
b/src/test_suites/java/org/apache/sysml/test/integration/functions/codegen/ZPackageSuite.java
index fda71a5..75b66a1 100644
--- 
a/src/test_suites/java/org/apache/sysml/test/integration/functions/codegen/ZPackageSuite.java
+++ 
b/src/test_suites/java/org/apache/sysml/test/integration/functions/codegen/ZPackageSuite.java
@@ -42,6 +42,7 @@ import org.junit.runners.Suite;
        CompressedMultiAggregateTest.class,
        CompressedOuterProductTest.class,
        CompressedRowAggregateTest.class,
+       CompressedRowAggregateLargeTest.class,
        CPlanComparisonTest.class,
        CPlanVectorPrimitivesTest.class,
        DAGCellwiseTmplTest.class,

Reply via email to