This is an automated email from the ASF dual-hosted git repository.
arnabp20 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new b9f4686 [SYSTEMDS-3245] CSR sparse support for transformapply
b9f4686 is described below
commit b9f4686e2153ff44e41378d76301a6d24ffd1666
Author: arnabp <[email protected]>
AuthorDate: Mon Dec 13 09:53:41 2021 +0100
[SYSTEMDS-3245] CSR sparse support for transformapply
This patch adds support for CSR sparse output in
transformapply. CSR is more efficient for the memory-bound apply
phase. Multi-threaded apply is now 30% faster when dummycoding
100 columns, each having 5M rows and 100K distinct values.
---
.../runtime/transform/encode/ColumnEncoder.java | 18 +++++-
.../transform/encode/ColumnEncoderDummycode.java | 41 ++++++++++----
.../transform/encode/ColumnEncoderPassThrough.java | 23 +++++++-
.../transform/encode/MultiColumnEncoder.java | 64 ++++++++++++++--------
4 files changed, 107 insertions(+), 39 deletions(-)
diff --git
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoder.java
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoder.java
index f69da6f..9db3772 100644
--- a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoder.java
+++ b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoder.java
@@ -41,6 +41,8 @@ import org.apache.sysds.conf.ConfigurationManager;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.controlprogram.caching.CacheBlock;
import org.apache.sysds.runtime.data.SparseRowVector;
+import org.apache.sysds.runtime.data.SparseBlock;
+import org.apache.sysds.runtime.data.SparseBlockCSR;
import org.apache.sysds.runtime.matrix.data.FrameBlock;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.util.DependencyTask;
@@ -129,6 +131,7 @@ public abstract class ColumnEncoder implements Encoder,
Comparable<ColumnEncoder
}*/
protected void applySparse(CacheBlock in, MatrixBlock out, int
outputCol, int rowStart, int blk){
+ boolean mcsr = MatrixBlock.DEFAULT_SPARSEBLOCK ==
SparseBlock.Type.MCSR;
int index = _colID - 1;
// Apply loop tiling to exploit CPU caches
double[] codes = getCodeCol(in, rowStart, blk);
@@ -137,9 +140,18 @@ public abstract class ColumnEncoder implements Encoder,
Comparable<ColumnEncoder
for(int i = rowStart; i < rowEnd; i+=B) {
int lim = Math.min(i+B, rowEnd);
for (int ii=i; ii<lim; ii++) {
- SparseRowVector row = (SparseRowVector)
out.getSparseBlock().get(ii);
- row.values()[index] = codes[ii-rowStart];
- row.indexes()[index] = outputCol;
+ if (mcsr) {
+ SparseRowVector row = (SparseRowVector)
out.getSparseBlock().get(ii);
+ row.values()[index] =
codes[ii-rowStart];
+ row.indexes()[index] = outputCol;
+ }
+ else { //csr
+ // Manually fill the column-indexes and
values array
+ SparseBlockCSR csrblock =
(SparseBlockCSR)out.getSparseBlock();
+ int rptr[] = csrblock.rowPointers();
+ csrblock.indexes()[rptr[ii]+index] =
outputCol;
+ csrblock.values()[rptr[ii]+index] =
codes[ii-rowStart];
+ }
}
}
}
diff --git
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderDummycode.java
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderDummycode.java
index a5670ae..63cf86c 100644
---
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderDummycode.java
+++
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderDummycode.java
@@ -32,6 +32,8 @@ import java.util.Set;
import org.apache.sysds.api.DMLScript;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.controlprogram.caching.CacheBlock;
+import org.apache.sysds.runtime.data.SparseBlock;
+import org.apache.sysds.runtime.data.SparseBlockCSR;
import org.apache.sysds.runtime.matrix.data.FrameBlock;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.util.DependencyTask;
@@ -85,6 +87,7 @@ public class ColumnEncoderDummycode extends ColumnEncoder {
throw new DMLRuntimeException("ColumnEncoderDummycode
called with: " + in.getClass().getSimpleName() +
" and not MatrixBlock");
}
+ boolean mcsr = MatrixBlock.DEFAULT_SPARSEBLOCK ==
SparseBlock.Type.MCSR;
Set<Integer> sparseRowsWZeros = null;
int index = _colID - 1;
for(int r = rowStart; r < getEndIndex(in.getNumRows(),
rowStart, blk); r++) {
@@ -103,17 +106,35 @@ public class ColumnEncoderDummycode extends ColumnEncoder
{
//
// indexes = [0,2] ===> indexes = [0,3]
// values = [1,2] values = [1,1]
- double val =
out.getSparseBlock().get(r).values()[index];
- if(Double.isNaN(val)){
- if(sparseRowsWZeros == null)
- sparseRowsWZeros = new HashSet<>();
- sparseRowsWZeros.add(r);
- out.getSparseBlock().get(r).values()[index] = 0;
- continue;
+ if (mcsr) {
+ double val =
out.getSparseBlock().get(r).values()[index];
+ if(Double.isNaN(val)){
+ if(sparseRowsWZeros == null)
+ sparseRowsWZeros = new
HashSet<>();
+ sparseRowsWZeros.add(r);
+
out.getSparseBlock().get(r).values()[index] = 0;
+ continue;
+ }
+ int nCol = outputCol + (int) val - 1;
+ out.getSparseBlock().get(r).indexes()[index] =
nCol;
+ out.getSparseBlock().get(r).values()[index] = 1;
+ }
+ else { //csr
+ SparseBlockCSR csrblock =
(SparseBlockCSR)out.getSparseBlock();
+ int rptr[] = csrblock.rowPointers();
+ double val = csrblock.values()[rptr[r]+index];
+ if(Double.isNaN(val)){
+ if(sparseRowsWZeros == null)
+ sparseRowsWZeros = new
HashSet<>();
+ sparseRowsWZeros.add(r);
+ csrblock.values()[rptr[r]+index] = 0;
//test
+ continue;
+ }
+ // Manually fill the column-indexes and values
array
+ int nCol = outputCol + (int) val - 1;
+ csrblock.indexes()[rptr[r]+index] = nCol;
+ csrblock.values()[rptr[r]+index] = 1;
}
- int nCol = outputCol + (int) val - 1;
- out.getSparseBlock().get(r).indexes()[index] = nCol;
- out.getSparseBlock().get(r).values()[index] = 1;
}
if(sparseRowsWZeros != null){
addSparseRowsWZeros(sparseRowsWZeros);
diff --git
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderPassThrough.java
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderPassThrough.java
index 57405b0..f8b467d 100644
---
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderPassThrough.java
+++
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderPassThrough.java
@@ -27,6 +27,8 @@ import java.util.Set;
import org.apache.sysds.api.DMLScript;
import org.apache.sysds.runtime.controlprogram.caching.CacheBlock;
+import org.apache.sysds.runtime.data.SparseBlock;
+import org.apache.sysds.runtime.data.SparseBlockCSR;
import org.apache.sysds.runtime.data.SparseRowVector;
import org.apache.sysds.runtime.matrix.data.FrameBlock;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
@@ -77,6 +79,7 @@ public class ColumnEncoderPassThrough extends ColumnEncoder {
protected void applySparse(CacheBlock in, MatrixBlock out, int
outputCol, int rowStart, int blk){
Set<Integer> sparseRowsWZeros = null;
+ boolean mcsr = MatrixBlock.DEFAULT_SPARSEBLOCK ==
SparseBlock.Type.MCSR;
int index = _colID - 1;
// Apply loop tiling to exploit CPU caches
double[] codes = getCodeCol(in, rowStart, blk);
@@ -86,14 +89,28 @@ public class ColumnEncoderPassThrough extends ColumnEncoder
{
int lim = Math.min(i+B, rowEnd);
for (int ii=i; ii<lim; ii++) {
double v = codes[ii-rowStart];
- SparseRowVector row = (SparseRowVector)
out.getSparseBlock().get(ii);
if(v == 0) {
if(sparseRowsWZeros == null)
sparseRowsWZeros = new
HashSet<>();
sparseRowsWZeros.add(ii);
}
- row.values()[index] = v;
- row.indexes()[index] = outputCol;
+ if (mcsr) {
+ SparseRowVector row = (SparseRowVector)
out.getSparseBlock().get(ii);
+ row.values()[index] = v;
+ row.indexes()[index] = outputCol;
+ }
+ else { //csr
+ if(v == 0) {
+ if(sparseRowsWZeros == null)
+ sparseRowsWZeros = new
HashSet<>();
+ sparseRowsWZeros.add(ii);
+ }
+ // Manually fill the column-indexes and
values array
+ SparseBlockCSR csrblock =
(SparseBlockCSR)out.getSparseBlock();
+ int rptr[] = csrblock.rowPointers();
+ csrblock.indexes()[rptr[ii]+index] =
outputCol;
+ csrblock.values()[rptr[ii]+index] =
codes[ii-rowStart];
+ }
}
}
if(sparseRowsWZeros != null){
diff --git
a/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
b/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
index 7206600..76301ce 100644
---
a/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
+++
b/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
@@ -48,7 +48,7 @@ import org.apache.sysds.hops.OptimizerUtils;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.controlprogram.caching.CacheBlock;
import org.apache.sysds.runtime.data.SparseBlock;
-import org.apache.sysds.runtime.data.SparseBlockMCSR;
+import org.apache.sysds.runtime.data.SparseBlockCSR;
import org.apache.sysds.runtime.data.SparseRowVector;
import org.apache.sysds.runtime.matrix.data.FrameBlock;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
@@ -325,33 +325,51 @@ public class MultiColumnEncoder implements Encoder {
private static void outputMatrixPreProcessing(MatrixBlock output,
CacheBlock input, boolean hasDC) {
long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
- output.allocateBlock();
if(output.isInSparseFormat()) {
- SparseBlock block = output.getSparseBlock();
- if(!(block instanceof SparseBlockMCSR))
- throw new RuntimeException(
- "Transform apply currently only
supported for MCSR sparse and dense output Matrices");
- if (hasDC && OptimizerUtils.getTransformNumThreads()>1)
{
- // DC forces a single threaded allocation after
the build phase and
- // before the apply starts. Below code
parallelizes sparse allocation.
- IntStream.range(0, output.getNumRows())
- .parallel().forEach(r -> {
- block.allocate(r,
input.getNumColumns());
-
((SparseRowVector)block.get(r)).setSize(input.getNumColumns());
- });
+ if (MatrixBlock.DEFAULT_SPARSEBLOCK !=
SparseBlock.Type.CSR
+ && MatrixBlock.DEFAULT_SPARSEBLOCK !=
SparseBlock.Type.MCSR)
+ throw new RuntimeException("Transformapply is
only supported for MCSR and CSR output matrix");
+ boolean mcsr = MatrixBlock.DEFAULT_SPARSEBLOCK ==
SparseBlock.Type.MCSR;
+ if (mcsr) {
+ output.allocateBlock();
+ SparseBlock block = output.getSparseBlock();
+ if (hasDC &&
OptimizerUtils.getTransformNumThreads()>1) {
+ // DC forces a single threaded
allocation after the build phase and
+ // before the apply starts. Below code
parallelizes sparse allocation.
+ IntStream.range(0, output.getNumRows())
+ .parallel().forEach(r -> {
+ block.allocate(r,
input.getNumColumns());
+
((SparseRowVector)block.get(r)).setSize(input.getNumColumns());
+ });
+ }
+ else {
+ for(int r = 0; r < output.getNumRows();
r++) {
+ // allocate all sparse rows so
MT sync can be done.
+ // should be rare that rows
have only 0
+ block.allocate(r,
input.getNumColumns());
+ // Setting the size here makes
it possible to run all sparse apply tasks without any sync
+ // could become problematic if
the input is very sparse since we allocate the same size as the input
+ // should be fine in theory ;)
+
((SparseRowVector)block.get(r)).setSize(input.getNumColumns());
+ }
+ }
}
- else {
- for(int r = 0; r < output.getNumRows(); r++) {
- // allocate all sparse rows so MT sync
can be done.
- // should be rare that rows have only 0
- block.allocate(r,
input.getNumColumns());
- // Setting the size here makes it
possible to run all sparse apply tasks without any sync
- // could become problematic if the
input is very sparse since we allocate the same size as the input
- // should be fine in theory ;)
-
((SparseRowVector)block.get(r)).setSize(input.getNumColumns());
+ else { //csr
+ int size = output.getNumRows() *
input.getNumColumns();
+ SparseBlock csrblock = new
SparseBlockCSR(output.getNumRows(), size, size);
+ // Manually fill the row pointers based on
nnzs/row (= #cols in the input)
+ // Not using the set() methods to 1) avoid
binary search and shifting,
+ // 2) reduce thread contentions on the arrays
+ int[] rptr =
((SparseBlockCSR)csrblock).rowPointers();
+ for (int i=0; i<rptr.length-1; i++) { //TODO:
parallelize
+ rptr[i+1] = rptr[i] +
input.getNumColumns();
}
+ output.setSparseBlock(csrblock);
}
}
+ else //dense
+ output.allocateBlock();
+
if(DMLScript.STATISTICS) {
LOG.debug("Elapsed time for allocation: "+ ((double)
System.nanoTime() - t0) / 1000000 + " ms");
Statistics.incTransformOutMatrixPreProcessingTime(System.nanoTime()-t0);