This is an automated email from the ASF dual-hosted git repository.
arnabp20 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new 33b58c97a4 [SYSTEMDS-3359] Sample-based recode map size estimation
33b58c97a4 is described below
commit 33b58c97a49c63a107dc629cc9f9258c66e462be
Author: arnabp <[email protected]>
AuthorDate: Mon Apr 25 23:24:09 2022 +0200
[SYSTEMDS-3359] Sample-based recode map size estimation
This patch improves the transformencode optimizer by adding
a sample-based estimation of the recode/partial recode map
sizes. We fall back to full column recode build tasks if the
partial maps do not fit in memory. Future commits will
fine-tune the task graph with 1) column-specific build block
counts and 2) serialized column-build tasks -- both based on
the memory estimates.
Closes #1597
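
[Editorial note] For readers who want the gist without stepping through the
diff below, the following stand-alone Java sketch illustrates the idea of the
patch under simplifying assumptions. The class and method names
(RecodeMapSizeSketch, estimateMapSize, chooseBuildBlocks) are hypothetical and
not part of SystemDS, and a naive scale-up of the sampled distinct count stands
in for the Hass-and-Stokes estimator that the patch actually calls through
SampleEstimatorFactory.distinctCount. It only shows how a row sample can yield
a recode-map size estimate and how a worst-case partial-map footprint can force
the build-block count back to 1.

import java.util.HashMap;
import java.util.Map;
import java.util.Random;

// Hypothetical illustration only; not SystemDS code.
public class RecodeMapSizeSketch {

	// Estimate the in-memory size of a recode map from a row sample.
	// A naive scale-up replaces the sample-based Hass-and-Stokes estimator used in the patch.
	static long estimateMapSize(String[] column, double sampleFraction, long seed) {
		Random rnd = new Random(seed);
		Map<String, Integer> sampleFreq = new HashMap<>();
		long totKeySize = 0;
		int sampleSize = Math.max(1, (int) (sampleFraction * column.length));
		for (int i = 0; i < sampleSize; i++) {
			String key = column[rnd.nextInt(column.length)];
			if (key == null)
				continue;
			// Count the key; on first occurrence, account for its size (chars + object header)
			if (sampleFreq.merge(key, 1, Integer::sum) == 1)
				totKeySize += key.length() * 2L + 16;
		}
		if (sampleFreq.isEmpty())
			return 0;
		// Naive distinct-count extrapolation from the sample to the full column
		long estDistinct = Math.min(column.length,
			(long) (sampleFreq.size() * (column.length / (double) sampleSize)));
		long avgKeySize = totKeySize / sampleFreq.size();
		long valSize = 16L; // boxed Long code value: 8 bytes + header
		return estDistinct * (avgKeySize + valSize);
	}

	// Fall back to a single build block if the worst-case partial maps exceed the budget,
	// mirroring the fallback described in the commit message.
	static int chooseBuildBlocks(int numBlocks, long estMapSize, long maxMemBytes) {
		long worstCasePartialMaps = estMapSize * numBlocks; // each partial map may hold all distincts
		return (worstCasePartialMaps > maxMemBytes) ? 1 : numBlocks;
	}

	public static void main(String[] args) {
		String[] col = new String[1_000_000];
		for (int i = 0; i < col.length; i++)
			col[i] = "cat" + (i % 50_000);
		long estSize = estimateMapSize(col, 0.1, 42);
		int blocks = chooseBuildBlocks(8, estSize, 512L * 1024 * 1024);
		System.out.println("estimated map size: " + estSize + " bytes, build blocks: " + blocks);
	}
}
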
---
.../estim/CompressedSizeEstimatorSample.java | 2 +-
.../runtime/transform/encode/ColumnEncoder.java | 9 +++++
.../transform/encode/ColumnEncoderComposite.java | 8 ++++
.../transform/encode/ColumnEncoderRecode.java | 35 ++++++++++++++++
.../transform/encode/MultiColumnEncoder.java | 46 +++++++++++++++++++++-
.../sysds/utils/stats/TransformStatistics.java | 8 ++++
6 files changed, 106 insertions(+), 2 deletions(-)
diff --git a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorSample.java b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorSample.java
index 484f90e63f..041fa9b86c 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorSample.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorSample.java
@@ -226,7 +226,7 @@ public class CompressedSizeEstimatorSample extends CompressedSizeEstimator {
return _sample.getNonZeros();
}
- private static int[] getSortedSample(int range, int sampleSize, long seed, int k) {
+ public static int[] getSortedSample(int range, int sampleSize, long seed, int k) {
// set meta data and allocate dense block
final int[] a = new int[sampleSize];
diff --git a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoder.java b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoder.java
index bb543d8986..9c7832c2bf 100644
--- a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoder.java
+++ b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoder.java
@@ -60,6 +60,7 @@ public abstract class ColumnEncoder implements Encoder, Comparable<ColumnEncoder
private static final long serialVersionUID = 2299156350718979064L;
protected int _colID;
protected ArrayList<Integer> _sparseRowsWZeros = null;
+ protected long _estMetaSize = 0;
protected enum TransformType{
BIN, RECODE, DUMMYCODE, FEATURE_HASH, PASS_THROUGH, N_A
@@ -281,6 +282,14 @@ public abstract class ColumnEncoder implements Encoder, Comparable<ColumnEncoder
_colID += columnOffset;
}
+ public void setEstMetaSize(long estSize) {
+ _estMetaSize = estSize;
+ }
+
+ public long getEstMetaSize() {
+ return _estMetaSize;
+ }
+
@Override
public int compareTo(ColumnEncoder o) {
return Integer.compare(getEncoderType(this), getEncoderType(o));
diff --git a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderComposite.java b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderComposite.java
index ac2156e89b..59dd3157c0 100644
--- a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderComposite.java
+++ b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderComposite.java
@@ -360,6 +360,14 @@ public class ColumnEncoderComposite extends ColumnEncoder {
return false;
}
+ public void computeRCDMapSizeEstimate(CacheBlock in, int[] sampleIndices) {
+ for (ColumnEncoder e : _columnEncoders)
+ if (e.getClass().equals(ColumnEncoderRecode.class))
+ ((ColumnEncoderRecode) e).computeRCDMapSizeEstimate(in, sampleIndices);
+ long totEstSize = _columnEncoders.stream().mapToLong(ColumnEncoder::getEstMetaSize).sum();
+ setEstMetaSize(totEstSize);
+ }
+
@Override
public void shiftCol(int columnOffset) {
super.shiftCol(columnOffset);
diff --git a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderRecode.java b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderRecode.java
index 3afce5a3b5..8ed89856d8 100644
--- a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderRecode.java
+++ b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderRecode.java
@@ -34,6 +34,7 @@ import java.util.concurrent.Callable;
import org.apache.sysds.api.DMLScript;
import org.apache.sysds.lops.Lop;
+import org.apache.sysds.runtime.compress.estim.sample.SampleEstimatorFactory;
import org.apache.sysds.runtime.controlprogram.caching.CacheBlock;
import org.apache.sysds.runtime.matrix.data.FrameBlock;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
@@ -128,6 +129,40 @@ public class ColumnEncoderRecode extends ColumnEncoder {
return (tmp != null) ? tmp : -1;
}
+ public void computeRCDMapSizeEstimate(CacheBlock in, int[] sampleIndices) {
+ if (getEstMetaSize() != 0)
+ return;
+
+ // Find the frequencies of distinct values in the sample
+ HashMap<String, Integer> distinctFreq = new HashMap<>();
+ long totSize = 0;
+ for (int sind : sampleIndices) {
+ String key = in.getString(sind, _colID-1);
+ if (key == null)
+ continue;
+ //distinctFreq.put(key, distinctFreq.getOrDefault(key, (long)0) + 1);
+ if (distinctFreq.containsKey(key))
+ distinctFreq.put(key, distinctFreq.get(key) + 1);
+ else {
+ distinctFreq.put(key, 1);
+ // Maintain total size of the keys
+ totSize += (key.length() * 2L + 16); //sizeof(String) = len(chars) + header
+ }
+ }
+
+ // Estimate total #distincts using Hass and Stokes estimator
+ int[] freq = distinctFreq.values().stream().mapToInt(v -> v).toArray();
+ int estDistCount = SampleEstimatorFactory.distinctCount(freq, in.getNumRows(),
+ sampleIndices.length, SampleEstimatorFactory.EstimationType.HassAndStokes);
+
+ // Compute total size estimates for each partial recode map
+ // We assume each partial map contains all distinct values and have the same size
+ long avgKeySize = totSize / distinctFreq.size();
+ long valSize = 16L; //sizeof(Long) = 8 + header
+ long estMapSize = estDistCount * (avgKeySize + valSize);
+ setEstMetaSize(estMapSize);
+ }
+
@Override
protected TransformType getTransformType() {
return TransformType.RECODE;
diff --git a/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java b/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
index 019a140a2a..fe49097296 100644
--- a/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
+++ b/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
@@ -44,7 +44,9 @@ import org.apache.sysds.common.Types.ValueType;
import org.apache.sysds.conf.ConfigurationManager;
import org.apache.sysds.hops.OptimizerUtils;
import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.compress.estim.CompressedSizeEstimatorSample;
import org.apache.sysds.runtime.controlprogram.caching.CacheBlock;
+import org.apache.sysds.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
import org.apache.sysds.runtime.data.SparseBlock;
import org.apache.sysds.runtime.data.SparseBlockCSR;
import org.apache.sysds.runtime.data.SparseRowVector;
@@ -401,11 +403,14 @@ public class MultiColumnEncoder implements Encoder {
int nRow = in.getNumRows();
int nThread = OptimizerUtils.getTransformNumThreads(); //VCores
int minNumRows = 16000; //min rows per partition
+ List<ColumnEncoderComposite> recodeEncoders = new ArrayList<>();
// Count #Builds and #Applies (= #Col)
int nBuild = 0;
for (ColumnEncoderComposite e : _columnEncoders)
- if (e.hasBuild())
+ if (e.hasBuild()) {
nBuild++;
+ recodeEncoders.add(e);
+ }
int nApply = in.getNumColumns();
// #BuildBlocks = (2 * #PhysicalCores)/#build
if (numBlocks[0] == 0 && nBuild > 0 && nBuild < nThread)
@@ -420,6 +425,19 @@ public class MultiColumnEncoder implements Encoder {
while (numBlocks[1] > 1 && nRow/numBlocks[1] < minNumRows)
numBlocks[1]--;
+ // Reduce #build blocks if all don't fit in memory
+ if (numBlocks[0] > 1) {
+ // Estimate recode map sizes
+ estimateRCMapSize(in, recodeEncoders);
+ long totEstSize = recodeEncoders.stream().mapToLong(ColumnEncoderComposite::getEstMetaSize).sum();
+ // Worst case scenario: all partial maps contain all distinct values
+ long totPartMapSize = totEstSize * numBlocks[0];
+ if (totPartMapSize > InfrastructureAnalyzer.getLocalMaxMemory())
+ numBlocks[0] = 1;
+ // TODO: Maintain #blocks per encoder. Reduce only the ones with large maps
+ // TODO: If this not enough, add dependencies between recode build tasks
+ }
+
// Set to 1 if not set by the above logics
for (int i=0; i<2; i++)
if (numBlocks[i] == 0)
@@ -428,6 +446,32 @@ public class MultiColumnEncoder implements Encoder {
return numBlocks;
}
+ private void estimateRCMapSize(CacheBlock in, List<ColumnEncoderComposite> rcList) {
+ long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
+ // Collect sample row indices
+ int k = OptimizerUtils.getTransformNumThreads();
+ int sampleSize = (int) (0.1 * in.getNumRows());
+ int seed = (int) System.nanoTime();
+ int[] sampleInds = CompressedSizeEstimatorSample.getSortedSample(in.getNumRows(), sampleSize, seed, 1);
+
+ // Concurrent (col-wise) recode map size estimation
+ ExecutorService myPool = CommonThreadPool.get(k);
+ try {
+ myPool.submit(() -> {
+ rcList.stream().parallel().forEach(e -> {
+ e.computeRCDMapSizeEstimate(in, sampleInds);
+ });
+ }).get();
+ }
+ catch(Exception ex) {
+ throw new DMLRuntimeException(ex);
+ }
+
+ if(DMLScript.STATISTICS) {
+ LOG.debug("Elapsed time for RC map size estimation: " + ((double) System.nanoTime() - t0) / 1000000 + " ms");
+ TransformStatistics.incMapSizeEstimationTime(System.nanoTime() - t0);
+ }
+ }
private static void outputMatrixPreProcessing(MatrixBlock output, CacheBlock input, boolean hasDC) {
long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
diff --git a/src/main/java/org/apache/sysds/utils/stats/TransformStatistics.java b/src/main/java/org/apache/sysds/utils/stats/TransformStatistics.java
index d572b42832..f1a4f1f3d8 100644
--- a/src/main/java/org/apache/sysds/utils/stats/TransformStatistics.java
+++ b/src/main/java/org/apache/sysds/utils/stats/TransformStatistics.java
@@ -40,6 +40,7 @@ public class TransformStatistics {
private static final LongAdder outMatrixPreProcessingTime = new LongAdder();
private static final LongAdder outMatrixPostProcessingTime = new LongAdder();
+ private static final LongAdder mapSizeEstimationTime = new LongAdder();
public static void incEncoderCount(long encoders) {
encoderCount.add(encoders);
@@ -93,6 +94,10 @@ public class TransformStatistics {
outMatrixPostProcessingTime.add(t);
}
+ public static void incMapSizeEstimationTime(long t) {
+ mapSizeEstimationTime.add(t);
+ }
+
public static long getEncodeBuildTime() {
return binningBuildTime.longValue() +
imputeBuildTime.longValue() +
recodeBuildTime.longValue();
@@ -121,6 +126,7 @@ public class TransformStatistics {
imputeApplyTime.reset();
outMatrixPreProcessingTime.reset();
outMatrixPostProcessingTime.reset();
+ mapSizeEstimationTime.reset();
}
public static String displayStatistics() {
@@ -168,6 +174,8 @@ public class TransformStatistics {
outMatrixPreProcessingTime.longValue()*1e-9)).append(" sec.\n");
sb.append("TransformEncode PostProc. time:\t").append(String.format("%.3f",
outMatrixPostProcessingTime.longValue()*1e-9)).append(" sec.\n");
+ sb.append("TransformEncode SizeEst. time:\t").append(String.format("%.3f",
+ mapSizeEstimationTime.longValue()*1e-9)).append(" sec.\n");
return sb.toString();
}
return "";