This is an automated email from the ASF dual-hosted git repository.
arnabp20 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new 9196234 [MINOR] Add transformencode tuning parameters to config
9196234 is described below
commit 9196234061b33541b9471ec6eb49af841c7ec20d
Author: arnabp <[email protected]>
AuthorDate: Sat Nov 6 15:00:10 2021 +0100
[MINOR] Add transformencode tuning parameters to config
---
src/main/java/org/apache/sysds/conf/ConfigurationManager.java | 8 ++++++++
src/main/java/org/apache/sysds/conf/DMLConfig.java | 4 ++++
src/main/java/org/apache/sysds/hops/OptimizerUtils.java | 3 ++-
.../cp/MultiReturnParameterizedBuiltinCPInstruction.java | 2 +-
.../sysds/runtime/transform/encode/MultiColumnEncoder.java | 10 +++++-----
.../transform/TransformFrameEncodeMultithreadedTest.java | 2 +-
6 files changed, 21 insertions(+), 8 deletions(-)
diff --git a/src/main/java/org/apache/sysds/conf/ConfigurationManager.java
b/src/main/java/org/apache/sysds/conf/ConfigurationManager.java
index 4162d84..b1fa997 100644
--- a/src/main/java/org/apache/sysds/conf/ConfigurationManager.java
+++ b/src/main/java/org/apache/sysds/conf/ConfigurationManager.java
@@ -175,6 +175,10 @@ public class ConfigurationManager
return
getDMLConfig().getBooleanValue(DMLConfig.PARALLEL_ENCODE);
}
+ public static boolean isStagedParallelTransform() {
+ return
getDMLConfig().getBooleanValue(DMLConfig.PARALLEL_ENCODE_STAGED);
+ }
+
public static int getParallelApplyBlocks(){
return
getDMLConfig().getIntValue(DMLConfig.PARALLEL_ENCODE_APPLY_BLOCKS);
}
@@ -182,6 +186,10 @@ public class ConfigurationManager
public static int getParallelBuildBlocks(){
return
getDMLConfig().getIntValue(DMLConfig.PARALLEL_ENCODE_BUILD_BLOCKS);
}
+
+ public static int getNumThreads() {
+ return
getDMLConfig().getIntValue(DMLConfig.PARALLEL_ENCODE_NUM_THREADS);
+ }
public static boolean isParallelParFor() {
return
getCompilerConfigFlag(ConfigType.PARALLEL_LOCAL_OR_REMOTE_PARFOR);
diff --git a/src/main/java/org/apache/sysds/conf/DMLConfig.java
b/src/main/java/org/apache/sysds/conf/DMLConfig.java
index c956862..c020fdd 100644
--- a/src/main/java/org/apache/sysds/conf/DMLConfig.java
+++ b/src/main/java/org/apache/sysds/conf/DMLConfig.java
@@ -68,8 +68,10 @@ public class DMLConfig
public static final String CP_PARALLEL_OPS =
"sysds.cp.parallel.ops";
public static final String CP_PARALLEL_IO =
"sysds.cp.parallel.io";
public static final String PARALLEL_ENCODE = "sysds.parallel.encode"; // boolean: enable multi-threaded transformencode and apply
+ public static final String PARALLEL_ENCODE_STAGED =
"sysds.parallel.encode.staged";
public static final String PARALLEL_ENCODE_APPLY_BLOCKS =
"sysds.parallel.encode.applyBlocks";
public static final String PARALLEL_ENCODE_BUILD_BLOCKS =
"sysds.parallel.encode.buildBlocks";
+ public static final String PARALLEL_ENCODE_NUM_THREADS =
"sysds.parallel.encode.numThreads";
public static final String COMPRESSED_LINALG =
"sysds.compressed.linalg";
public static final String COMPRESSED_LOSSY =
"sysds.compressed.lossy";
public static final String COMPRESSED_VALID_COMPRESSIONS =
"sysds.compressed.valid.compressions";
@@ -130,8 +132,10 @@ public class DMLConfig
_defaultVals.put(CP_PARALLEL_OPS, "true" );
_defaultVals.put(CP_PARALLEL_IO, "true" );
_defaultVals.put(PARALLEL_ENCODE, "false" );
+ _defaultVals.put(PARALLEL_ENCODE_STAGED, "false" );
_defaultVals.put(PARALLEL_ENCODE_APPLY_BLOCKS, "1");
_defaultVals.put(PARALLEL_ENCODE_BUILD_BLOCKS, "1");
+ _defaultVals.put(PARALLEL_ENCODE_NUM_THREADS, "-1");
_defaultVals.put(COMPRESSED_LINALG,
Compression.CompressConfig.FALSE.name() );
_defaultVals.put(COMPRESSED_LOSSY, "false" );
_defaultVals.put(COMPRESSED_VALID_COMPRESSIONS, "SDC,DDC");
diff --git a/src/main/java/org/apache/sysds/hops/OptimizerUtils.java
b/src/main/java/org/apache/sysds/hops/OptimizerUtils.java
index 1b94413..6a41ea8 100644
--- a/src/main/java/org/apache/sysds/hops/OptimizerUtils.java
+++ b/src/main/java/org/apache/sysds/hops/OptimizerUtils.java
@@ -1017,10 +1017,11 @@ public class OptimizerUtils
return ret;
}
- public static int getTransformNumThreads(int maxNumThreads)
+ public static int getTransformNumThreads()
{
//by default max local parallelism (vcores)
int ret = InfrastructureAnalyzer.getLocalParallelism();
+ int maxNumThreads = ConfigurationManager.getNumThreads();
//apply external max constraint (e.g., set by parfor or other rewrites)
if( maxNumThreads > 0 ) {
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/cp/MultiReturnParameterizedBuiltinCPInstruction.java
b/src/main/java/org/apache/sysds/runtime/instructions/cp/MultiReturnParameterizedBuiltinCPInstruction.java
index 800aa51..14dbc3a 100644
---
a/src/main/java/org/apache/sysds/runtime/instructions/cp/MultiReturnParameterizedBuiltinCPInstruction.java
+++
b/src/main/java/org/apache/sysds/runtime/instructions/cp/MultiReturnParameterizedBuiltinCPInstruction.java
@@ -87,7 +87,7 @@ public class MultiReturnParameterizedBuiltinCPInstruction
extends ComputationCPI
// execute block transform encode
MultiColumnEncoder encoder = EncoderFactory.createEncoder(spec,
colnames, fin.getNumColumns(), null);
// TODO: Assign #threads in compiler and pass via the instruction string
- MatrixBlock data = encoder.encode(fin,
OptimizerUtils.getTransformNumThreads(-1)); // build and apply
+ MatrixBlock data = encoder.encode(fin,
OptimizerUtils.getTransformNumThreads()); // build and apply
FrameBlock meta = encoder.getMetaData(new
FrameBlock(fin.getNumColumns(), ValueType.STRING));
meta.setColumnNames(colnames);
diff --git
a/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
b/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
index 3e2b310..4f5c2af 100644
---
a/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
+++
b/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
@@ -40,6 +40,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sysds.api.DMLScript;
import org.apache.sysds.common.Types;
+import org.apache.sysds.conf.ConfigurationManager;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.controlprogram.caching.CacheBlock;
import org.apache.sysds.runtime.data.SparseBlock;
@@ -56,9 +57,8 @@ import org.apache.sysds.utils.Statistics;
public class MultiColumnEncoder implements Encoder {
protected static final Log LOG =
LogFactory.getLog(MultiColumnEncoder.class.getName());
- private static final boolean MULTI_THREADED = true;
// If true build and apply separately by placing a synchronization barrier
- public static boolean MULTI_THREADED_STAGES = false;
+ public static boolean MULTI_THREADED_STAGES =
ConfigurationManager.isStagedParallelTransform();
// Only affects if MULTI_THREADED_STAGES is true
// if true apply tasks for each column will complete
@@ -87,7 +87,7 @@ public class MultiColumnEncoder implements Encoder {
public MatrixBlock encode(CacheBlock in, int k) {
MatrixBlock out;
try {
- if(MULTI_THREADED && k > 1 && !MULTI_THREADED_STAGES &&
!hasLegacyEncoder()) {
+ if(k > 1 && !MULTI_THREADED_STAGES &&
!hasLegacyEncoder()) {
out = new MatrixBlock();
DependencyThreadPool pool = new
DependencyThreadPool(k);
LOG.debug("Encoding with full DAG on " + k + "
Threads");
@@ -187,7 +187,7 @@ public class MultiColumnEncoder implements Encoder {
public void build(CacheBlock in, int k) {
if(hasLegacyEncoder() && !(in instanceof FrameBlock))
throw new DMLRuntimeException("LegacyEncoders do not
support non FrameBlock Inputs");
- if(MULTI_THREADED && k > 1) {
+ if(k > 1) {
buildMT(in, k);
}
else {
@@ -255,7 +255,7 @@ public class MultiColumnEncoder implements Encoder {
// TODO smart checks
// Block allocation for MT access
outputMatrixPreProcessing(out, in);
- if(MULTI_THREADED && k > 1) {
+ if(k > 1) {
applyMT(in, out, outputCol, k);
}
else {
diff --git
a/src/test/java/org/apache/sysds/test/functions/transform/TransformFrameEncodeMultithreadedTest.java
b/src/test/java/org/apache/sysds/test/functions/transform/TransformFrameEncodeMultithreadedTest.java
index fbf7111..bad4a5e 100644
---
a/src/test/java/org/apache/sysds/test/functions/transform/TransformFrameEncodeMultithreadedTest.java
+++
b/src/test/java/org/apache/sysds/test/functions/transform/TransformFrameEncodeMultithreadedTest.java
@@ -208,7 +208,7 @@ public class TransformFrameEncodeMultithreadedTest extends
AutomatedTestBase {
Files.readAllLines(Paths.get(SPEC)).forEach(s ->
specSb.append(s).append("\n"));
MultiColumnEncoder encoder =
EncoderFactory.createEncoder(specSb.toString(), input.getColumnNames(),
input.getNumColumns(), null);
- MultiColumnEncoder.MULTI_THREADED_STAGES = staged;
+ //MultiColumnEncoder.MULTI_THREADED_STAGES = staged;
MatrixBlock outputS = encoder.encode(input, 1);
MatrixBlock outputM = encoder.encode(input, 12);