This is an automated email from the ASF dual-hosted git repository.
baunsgaard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new 344ca0b67f [MINOR] CLA Factory logging cleanup
344ca0b67f is described below
commit 344ca0b67f7d496ef841dcf57b5bdcb36076d632
Author: Sebastian Baunsgaard <[email protected]>
AuthorDate: Mon Feb 3 21:53:11 2025 +0100
[MINOR] CLA Factory logging cleanup
---
.../runtime/compress/CompressedMatrixBlock.java | 8 ++--
.../compress/CompressedMatrixBlockFactory.java | 55 ++++++++++++++++------
.../compress/CompressionSettingsBuilder.java | 3 --
.../runtime/compress/CompressionStatistics.java | 6 ++-
.../sysds/runtime/compress/colgroup/ASDC.java | 8 ++--
.../sysds/runtime/compress/colgroup/ASDCZero.java | 14 +++---
6 files changed, 62 insertions(+), 32 deletions(-)
diff --git a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java
index bee86addf2..c78d651ff0 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java
@@ -226,6 +226,7 @@ public class CompressedMatrixBlock extends MatrixBlock {
* @param colGroups new ColGroups in the MatrixBlock
*/
public void allocateColGroupList(List<AColGroup> colGroups) {
+ cachedMemorySize = -1;
_colGroups = colGroups;
}
@@ -351,7 +352,6 @@ public class CompressedMatrixBlock extends MatrixBlock {
List<Future<Long>> tasks = new ArrayList<>();
for(AColGroup g : _colGroups)
tasks.add(pool.submit(() ->
g.getNumberNonZeros(rlen)));
-
long nnz = 0;
for(Future<Long> t : tasks)
nnz += t.get();
@@ -398,7 +398,6 @@ public class CompressedMatrixBlock extends MatrixBlock {
public long estimateCompressedSizeInMemory() {
if(cachedMemorySize <= -1L) {
-
long total = baseSizeInMemory();
// take into consideration duplicate dictionaries
Set<IDictionary> dicts = new HashSet<>();
@@ -413,7 +412,6 @@ public class CompressedMatrixBlock extends MatrixBlock {
}
cachedMemorySize = total;
return total;
-
}
else
return cachedMemorySize;
@@ -1002,6 +1000,10 @@ public class CompressedMatrixBlock extends MatrixBlock {
return getUncompressed((String) null);
}
+ public MatrixBlock getUncompressed(int k){
+ return getUncompressed((String) null, k);
+ }
+
public MatrixBlock getUncompressed(String operation) {
return getUncompressed(operation,
ConfigurationManager.isParallelMatrixOperations() ?
InfrastructureAnalyzer.getLocalParallelism() : 1);
diff --git a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlockFactory.java b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlockFactory.java
index 83b21ab235..93240644a1 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlockFactory.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlockFactory.java
@@ -37,6 +37,7 @@ import
org.apache.sysds.runtime.compress.colgroup.ColGroupEmpty;
import org.apache.sysds.runtime.compress.colgroup.ColGroupFactory;
import org.apache.sysds.runtime.compress.colgroup.ColGroupUncompressed;
import org.apache.sysds.runtime.compress.cost.ACostEstimate;
+import org.apache.sysds.runtime.compress.cost.ComputationCostEstimator;
import org.apache.sysds.runtime.compress.cost.CostEstimatorBuilder;
import org.apache.sysds.runtime.compress.cost.CostEstimatorFactory;
import org.apache.sysds.runtime.compress.cost.InstructionTypeCounter;
@@ -159,7 +160,7 @@ public class CompressedMatrixBlockFactory {
return compress(mb, k, compSettings, (WTreeRoot) null);
}
- public static Future<Void> compressAsync(ExecutionContext ec, String
varName) {
+ public static Future<Void> compressAsync(ExecutionContext ec, String
varName) {
return compressAsync(ec, varName, null);
}
@@ -168,7 +169,7 @@ public class CompressedMatrixBlockFactory {
final ExecutorService pool = CommonThreadPool.get(); // We have
to guarantee that a thread pool is allocated.
return CompletableFuture.runAsync(() -> {
// method call or code to be async
- try{
+ try {
CacheableData<?> data =
ec.getCacheableData(varName);
if(data instanceof MatrixObject) {
MatrixObject mo = (MatrixObject) data;
@@ -178,10 +179,11 @@ public class CompressedMatrixBlockFactory {
ExecutionContext.createCacheableData(mb);
mo.acquireModify(mbc);
mo.release();
+ mbc.sum(); // calculate sum to
forcefully materialize counts
}
}
}
- finally{
+ finally {
pool.shutdown();
}
}, pool);
@@ -288,11 +290,16 @@ public class CompressedMatrixBlockFactory {
_stats.originalSize = mb.getInMemorySize();
_stats.originalCost = costEstimator.getCost(mb);
+ final double orgSum;
+ if(CompressedMatrixBlock.debug)
+ orgSum = mb.sum(k).getDouble(0, 0);
+ else
+ orgSum = 0;
if(mb.isEmpty()) // empty input return empty compression
return createEmpty();
res = new CompressedMatrixBlock(mb); // copy metadata and
allocate soft reference
-
+ logInit();
classifyPhase();
if(compressionGroups == null)
return abortCompression();
@@ -308,6 +315,12 @@ public class CompressedMatrixBlockFactory {
if(res == null)
return abortCompression();
+ if(CompressedMatrixBlock.debug) {
+ final double afterComp = mb.sum(k).getDouble(0, 0);
+ final double deltaSum = Math.abs(orgSum - afterComp);
+ LOG.debug("compression Sum: Before:" + orgSum + "
after: " + afterComp + " |delta|: " + deltaSum);
+ }
+
return new ImmutablePair<>(res, _stats);
}
@@ -334,7 +347,8 @@ public class CompressedMatrixBlockFactory {
final double scale = Math.sqrt(nCols);
final double threshold = _stats.estimatedCostCols / scale;
- if(threshold < _stats.originalCost) {
+ if(threshold < _stats.originalCost *
+ ((costEstimator instanceof ComputationCostEstimator) &&
!(mb instanceof CompressedMatrixBlock) ? 15 : 0.8)) {
if(nCols > 1)
coCodePhase();
else // LOG a short cocode phase (since there is one
column we don't cocode)
@@ -406,7 +420,7 @@ public class CompressedMatrixBlockFactory {
compSettings.transposed = false;
break;
default:
- compSettings.transposed =
transposeHeuristics(compressionGroups.getNumberColGroups() , mb);
+ compSettings.transposed =
transposeHeuristics(compressionGroups.getNumberColGroups(), mb);
}
}
@@ -442,20 +456,20 @@ public class CompressedMatrixBlockFactory {
_stats.compressedSize = res.getInMemorySize();
_stats.compressedCost =
costEstimator.getCost(res.getColGroups(), res.getNumRows());
-
- final double ratio = _stats.getRatio();
- final double denseRatio = _stats.getDenseRatio();
-
_stats.setColGroupsCounts(res.getColGroups());
- if(ratio < 1 && denseRatio < 100.0) {
+
+ if(_stats.compressedCost > _stats.originalCost) {
LOG.info("--dense size: " + _stats.denseSize);
LOG.info("--original size: " + _stats.originalSize);
LOG.info("--compressed size: " +
_stats.compressedSize);
- LOG.info("--compression ratio: " + ratio);
+ LOG.info("--compression ratio: " + _stats.getRatio());
+ LOG.info("--original Cost: " + _stats.originalCost);
+ LOG.info("--Compressed Cost: " +
_stats.compressedCost);
+ LOG.info("--Cost Ratio: " +
_stats.getCostRatio());
LOG.debug("--col groups types " +
_stats.getGroupsTypesString());
LOG.debug("--col groups sizes " +
_stats.getGroupsSizesString());
logLengths();
- LOG.info("Abort block compression because compression
ratio is less than 1.");
+ LOG.info("Abort block compression because cost ratio is
less than 1. ");
res = null;
setNextTimePhase(time.stop());
DMLCompressionStatistics.addCompressionTime(getLastTimePhase(), phase);
@@ -472,9 +486,23 @@ public class CompressedMatrixBlockFactory {
private Pair<MatrixBlock, CompressionStatistics> abortCompression() {
LOG.warn("Compression aborted at phase: " + phase);
+ if(mb instanceof CompressedMatrixBlock && mb.getInMemorySize()
> _stats.denseSize) {
+ MatrixBlock ucmb = ((CompressedMatrixBlock)
mb).getUncompressed("Decompressing for abort: ", k);
+ return new ImmutablePair<>(ucmb, _stats);
+ }
return new ImmutablePair<>(mb, _stats);
}
+ private void logInit() {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("--Seed used for comp : " +
compSettings.seed);
+ LOG.debug(String.format("--number columns to compress:
%10d", mb.getNumColumns()));
+ LOG.debug(String.format("--number rows to compress :
%10d", mb.getNumRows()));
+ LOG.debug(String.format("--sparsity :
%10.5f", mb.getSparsity()));
+ LOG.debug(String.format("--nonZeros :
%10d", mb.getNonZeros()));
+ }
+ }
+
private void logPhase() {
setNextTimePhase(time.stop());
DMLCompressionStatistics.addCompressionTime(getLastTimePhase(),
phase);
@@ -486,7 +514,6 @@ public class CompressedMatrixBlockFactory {
else {
switch(phase) {
case 0:
- LOG.debug("--Seed used for comp
: " + compSettings.seed);
LOG.debug("--compression phase
" + phase + " Classify : " + getLastTimePhase());
LOG.debug("--Individual Columns
Estimated Compression: " + _stats.estimatedSizeCols);
if(mb instanceof
CompressedMatrixBlock) {
diff --git a/src/main/java/org/apache/sysds/runtime/compress/CompressionSettingsBuilder.java b/src/main/java/org/apache/sysds/runtime/compress/CompressionSettingsBuilder.java
index ec5512266e..dc0908dc9b 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/CompressionSettingsBuilder.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/CompressionSettingsBuilder.java
@@ -35,10 +35,7 @@ import
org.apache.sysds.runtime.compress.estim.sample.SampleEstimatorFactory.Est
*/
public class CompressionSettingsBuilder {
private double samplingRatio;
- // private double samplePower = 0.6;
private double samplePower = 0.65;
- // private double samplePower = 0.68;
- // private double samplePower = 0.7;
private boolean allowSharedDictionary = false;
private String transposeInput;
private int seed = -1;
diff --git
a/src/main/java/org/apache/sysds/runtime/compress/CompressionStatistics.java
b/src/main/java/org/apache/sysds/runtime/compress/CompressionStatistics.java
index d54eb2c352..01e7c8bc1a 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/CompressionStatistics.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/CompressionStatistics.java
@@ -108,6 +108,10 @@ public class CompressionStatistics {
return compressedSize == 0.0 ? Double.POSITIVE_INFINITY :
(double) originalSize / compressedSize;
}
+ public double getCostRatio() {
+ return compressedSize == 0.0 ? Double.POSITIVE_INFINITY :
(double) originalCost / compressedCost;
+ }
+
public double getDenseRatio() {
return compressedSize == 0.0 ? Double.POSITIVE_INFINITY :
(double) denseSize / compressedSize;
}
@@ -121,7 +125,7 @@ public class CompressionStatistics {
sb.append("\nCompressed Size : " + compressedSize);
sb.append("\nCompressionRatio : " + getRatio());
sb.append("\nDenseCompressionRatio : " + getDenseRatio());
-
+
if(colGroupCounts != null) {
sb.append("\nCompressionTypes : " +
getGroupsTypesString());
sb.append("\nCompressionGroupSizes : " +
getGroupsSizesString());
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ASDC.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ASDC.java
index 685b18d963..d519737277 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ASDC.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ASDC.java
@@ -34,7 +34,7 @@ import
org.apache.sysds.runtime.compress.estim.EstimationFactors;
* This column group is handy in cases where sparse unsafe operations is
executed on very sparse columns. Then the zeros
* would be materialized in the group without any overhead.
*/
-public abstract class ASDC extends AMorphingMMColGroup implements
AOffsetsGroup , IContainDefaultTuple {
+public abstract class ASDC extends AMorphingMMColGroup implements
AOffsetsGroup, IContainDefaultTuple {
private static final long serialVersionUID = 769993538831949086L;
/** Sparse row indexes for the data */
@@ -62,7 +62,7 @@ public abstract class ASDC extends AMorphingMMColGroup
implements AOffsetsGroup
@Override
public final CompressedSizeInfoColGroup getCompressionInfo(int nRow) {
EstimationFactors ef = new EstimationFactors(getNumValues(),
_numRows, getNumberOffsets(), _dict.getSparsity());
- return new CompressedSizeInfoColGroup(_colIndexes, ef, nRow,
getCompType(),getEncoding());
+ return new CompressedSizeInfoColGroup(_colIndexes, ef,
estimateInMemorySize(), getCompType(), getEncoding());
}
@Override
@@ -74,12 +74,12 @@ public abstract class ASDC extends AMorphingMMColGroup
implements AOffsetsGroup
public AColGroup morph(CompressionType ct, int nRow) {
if(ct == getCompType())
return this;
- else if (ct == CompressionType.SDCFOR)
+ else if(ct == CompressionType.SDCFOR)
return this; // it does not make sense to change to FOR.
else
return super.morph(ct, nRow);
}
-
+
@Override
protected boolean allowShallowIdentityRightMult() {
return false;
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ASDCZero.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ASDCZero.java
index 1b8d6da613..3de98a1c23 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ASDCZero.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ASDCZero.java
@@ -67,7 +67,7 @@ public abstract class ASDCZero extends APreAgg implements
AOffsetsGroup, IContai
AIterator it) {
if(mb.isEmpty()) // early abort.
return;
-
+
final DenseBlock res = result.getDenseBlock();
final double[] resV = res.values(r);
final int offRet = res.pos(r);
@@ -108,11 +108,11 @@ public abstract class ASDCZero extends APreAgg implements
AOffsetsGroup, IContai
final int v = it.value();
while(apos < alen && aix[apos] < v)
apos++; // go though sparse block until offset start.
- if(cu < last)
+ if(cu < last)
leftMultByMatrixNoPreAggSingleRowSparseInside(v, it,
apos, alen, aix, aval, resV, offRet, cu);
- else if(aix[alen - 1] < last)
+ else if(aix[alen - 1] < last)
leftMultByMatrixNoPreAggSingleRowSparseLessThan(v, it,
apos, alen, aix, aval, resV, offRet);
- else
+ else
leftMultByMatrixNoPreAggSingleRowSparseTail(v, it,
apos, alen, aix, aval, resV, offRet, cu, last);
}
@@ -245,7 +245,7 @@ public abstract class ASDCZero extends APreAgg implements
AOffsetsGroup, IContai
@Override
public final CompressedSizeInfoColGroup getCompressionInfo(int nRow) {
EstimationFactors ef = new EstimationFactors(getNumValues(),
_numRows, getNumberOffsets(), _dict.getSparsity());
- return new CompressedSizeInfoColGroup(_colIndexes, ef, nRow,
getCompType(), getEncoding());
+ return new CompressedSizeInfoColGroup(_colIndexes, ef,
this.estimateInMemorySize(), getCompType(), getEncoding());
}
@Override
@@ -257,12 +257,12 @@ public abstract class ASDCZero extends APreAgg implements
AOffsetsGroup, IContai
public AColGroup morph(CompressionType ct, int nRow) {
if(ct == getCompType())
return this;
- else if (ct == CompressionType.SDCFOR)
+ else if(ct == CompressionType.SDCFOR)
return this; // it does not make sense to change to FOR.
else
return super.morph(ct, nRow);
}
-
+
@Override
protected boolean allowShallowIdentityRightMult() {
return true;