This is an automated email from the ASF dual-hosted git repository.

arnabp20 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new 9196234  [MINOR] Add transformencode tuning parameters to config
9196234 is described below

commit 9196234061b33541b9471ec6eb49af841c7ec20d
Author: arnabp <[email protected]>
AuthorDate: Sat Nov 6 15:00:10 2021 +0100

    [MINOR] Add transformencode tuning parameters to config
---
 src/main/java/org/apache/sysds/conf/ConfigurationManager.java  |  8 ++++++++
 src/main/java/org/apache/sysds/conf/DMLConfig.java             |  4 ++++
 src/main/java/org/apache/sysds/hops/OptimizerUtils.java        |  3 ++-
 .../cp/MultiReturnParameterizedBuiltinCPInstruction.java       |  2 +-
 .../sysds/runtime/transform/encode/MultiColumnEncoder.java     | 10 +++++-----
 .../transform/TransformFrameEncodeMultithreadedTest.java       |  2 +-
 6 files changed, 21 insertions(+), 8 deletions(-)

diff --git a/src/main/java/org/apache/sysds/conf/ConfigurationManager.java b/src/main/java/org/apache/sysds/conf/ConfigurationManager.java
index 4162d84..b1fa997 100644
--- a/src/main/java/org/apache/sysds/conf/ConfigurationManager.java
+++ b/src/main/java/org/apache/sysds/conf/ConfigurationManager.java
@@ -175,6 +175,10 @@ public class ConfigurationManager
                return 
getDMLConfig().getBooleanValue(DMLConfig.PARALLEL_ENCODE);
        }
 
+       public static boolean isStagedParallelTransform() {
+               return 
getDMLConfig().getBooleanValue(DMLConfig.PARALLEL_ENCODE_STAGED);
+       }
+
        public static int getParallelApplyBlocks(){
                return 
getDMLConfig().getIntValue(DMLConfig.PARALLEL_ENCODE_APPLY_BLOCKS);
        }
@@ -182,6 +186,10 @@ public class ConfigurationManager
        public static int getParallelBuildBlocks(){
                return 
getDMLConfig().getIntValue(DMLConfig.PARALLEL_ENCODE_BUILD_BLOCKS);
        }
+       
+       public static int getNumThreads() {
+               return 
getDMLConfig().getIntValue(DMLConfig.PARALLEL_ENCODE_NUM_THREADS);
+       }
 
        public static boolean isParallelParFor() {
                return 
getCompilerConfigFlag(ConfigType.PARALLEL_LOCAL_OR_REMOTE_PARFOR);
diff --git a/src/main/java/org/apache/sysds/conf/DMLConfig.java b/src/main/java/org/apache/sysds/conf/DMLConfig.java
index c956862..c020fdd 100644
--- a/src/main/java/org/apache/sysds/conf/DMLConfig.java
+++ b/src/main/java/org/apache/sysds/conf/DMLConfig.java
@@ -68,8 +68,10 @@ public class DMLConfig
        public static final String CP_PARALLEL_OPS      = 
"sysds.cp.parallel.ops";
        public static final String CP_PARALLEL_IO       = 
"sysds.cp.parallel.io";
        public static final String PARALLEL_ENCODE      = 
"sysds.parallel.encode";  // boolean: enable multi-threaded transformencode and 
apply
+       public static final String PARALLEL_ENCODE_STAGED = 
"sysds.parallel.encode.staged";
        public static final String PARALLEL_ENCODE_APPLY_BLOCKS = 
"sysds.parallel.encode.applyBlocks";
        public static final String PARALLEL_ENCODE_BUILD_BLOCKS = 
"sysds.parallel.encode.buildBlocks";
+       public static final String PARALLEL_ENCODE_NUM_THREADS  = 
"sysds.parallel.encode.numThreads";
        public static final String COMPRESSED_LINALG    = 
"sysds.compressed.linalg";
        public static final String COMPRESSED_LOSSY     = 
"sysds.compressed.lossy";
        public static final String COMPRESSED_VALID_COMPRESSIONS = 
"sysds.compressed.valid.compressions";
@@ -130,8 +132,10 @@ public class DMLConfig
                _defaultVals.put(CP_PARALLEL_OPS,        "true" );
                _defaultVals.put(CP_PARALLEL_IO,         "true" );
                _defaultVals.put(PARALLEL_ENCODE,        "false" );
+               _defaultVals.put(PARALLEL_ENCODE_STAGED, "false" );
                _defaultVals.put(PARALLEL_ENCODE_APPLY_BLOCKS, "1");
                _defaultVals.put(PARALLEL_ENCODE_BUILD_BLOCKS, "1");
+               _defaultVals.put(PARALLEL_ENCODE_NUM_THREADS, "-1");
                _defaultVals.put(COMPRESSED_LINALG,      
Compression.CompressConfig.FALSE.name() );
                _defaultVals.put(COMPRESSED_LOSSY,       "false" );
                _defaultVals.put(COMPRESSED_VALID_COMPRESSIONS, "SDC,DDC");
diff --git a/src/main/java/org/apache/sysds/hops/OptimizerUtils.java b/src/main/java/org/apache/sysds/hops/OptimizerUtils.java
index 1b94413..6a41ea8 100644
--- a/src/main/java/org/apache/sysds/hops/OptimizerUtils.java
+++ b/src/main/java/org/apache/sysds/hops/OptimizerUtils.java
@@ -1017,10 +1017,11 @@ public class OptimizerUtils
                return ret;
        }
 
-       public static int getTransformNumThreads(int maxNumThreads)
+       public static int getTransformNumThreads()
        {
                //by default max local parallelism (vcores) 
                int ret = InfrastructureAnalyzer.getLocalParallelism();
+               int maxNumThreads = ConfigurationManager.getNumThreads();
                
                //apply external max constraint (e.g., set by parfor or other 
rewrites)
                if( maxNumThreads > 0 ) {
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/cp/MultiReturnParameterizedBuiltinCPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/cp/MultiReturnParameterizedBuiltinCPInstruction.java
index 800aa51..14dbc3a 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/cp/MultiReturnParameterizedBuiltinCPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/cp/MultiReturnParameterizedBuiltinCPInstruction.java
@@ -87,7 +87,7 @@ public class MultiReturnParameterizedBuiltinCPInstruction extends ComputationCPI
                // execute block transform encode
                MultiColumnEncoder encoder = EncoderFactory.createEncoder(spec, 
colnames, fin.getNumColumns(), null);
                // TODO: Assign #threads in compiler and pass via the 
instruction string
-               MatrixBlock data = encoder.encode(fin, 
OptimizerUtils.getTransformNumThreads(-1)); // build and apply
+               MatrixBlock data = encoder.encode(fin, 
OptimizerUtils.getTransformNumThreads()); // build and apply
                FrameBlock meta = encoder.getMetaData(new 
FrameBlock(fin.getNumColumns(), ValueType.STRING));
                meta.setColumnNames(colnames);
 
diff --git a/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java b/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
index 3e2b310..4f5c2af 100644
--- a/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
+++ b/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
@@ -40,6 +40,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.sysds.api.DMLScript;
 import org.apache.sysds.common.Types;
+import org.apache.sysds.conf.ConfigurationManager;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.controlprogram.caching.CacheBlock;
 import org.apache.sysds.runtime.data.SparseBlock;
@@ -56,9 +57,8 @@ import org.apache.sysds.utils.Statistics;
 public class MultiColumnEncoder implements Encoder {
 
        protected static final Log LOG = 
LogFactory.getLog(MultiColumnEncoder.class.getName());
-       private static final boolean MULTI_THREADED = true;
        // If true build and apply separately by placing a synchronization 
barrier
-       public static boolean MULTI_THREADED_STAGES = false;
+       public static boolean MULTI_THREADED_STAGES = 
ConfigurationManager.isStagedParallelTransform();
 
        // Only affects if  MULTI_THREADED_STAGES is true
        // if true apply tasks for each column will complete
@@ -87,7 +87,7 @@ public class MultiColumnEncoder implements Encoder {
        public MatrixBlock encode(CacheBlock in, int k) {
                MatrixBlock out;
                try {
-                       if(MULTI_THREADED && k > 1 && !MULTI_THREADED_STAGES && 
!hasLegacyEncoder()) {
+                       if(k > 1 && !MULTI_THREADED_STAGES && 
!hasLegacyEncoder()) {
                                out = new MatrixBlock();
                                DependencyThreadPool pool = new 
DependencyThreadPool(k);
                                LOG.debug("Encoding with full DAG on " + k + " 
Threads");
@@ -187,7 +187,7 @@ public class MultiColumnEncoder implements Encoder {
        public void build(CacheBlock in, int k) {
                if(hasLegacyEncoder() && !(in instanceof FrameBlock))
                        throw new DMLRuntimeException("LegacyEncoders do not 
support non FrameBlock Inputs");
-               if(MULTI_THREADED && k > 1) {
+               if(k > 1) {
                        buildMT(in, k);
                }
                else {
@@ -255,7 +255,7 @@ public class MultiColumnEncoder implements Encoder {
                // TODO smart checks
                // Block allocation for MT access
                outputMatrixPreProcessing(out, in);
-               if(MULTI_THREADED && k > 1) {
+               if(k > 1) {
                        applyMT(in, out, outputCol, k);
                }
                else {
diff --git 
a/src/test/java/org/apache/sysds/test/functions/transform/TransformFrameEncodeMultithreadedTest.java
 
b/src/test/java/org/apache/sysds/test/functions/transform/TransformFrameEncodeMultithreadedTest.java
index fbf7111..bad4a5e 100644
--- a/src/test/java/org/apache/sysds/test/functions/transform/TransformFrameEncodeMultithreadedTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/transform/TransformFrameEncodeMultithreadedTest.java
@@ -208,7 +208,7 @@ public class TransformFrameEncodeMultithreadedTest extends AutomatedTestBase {
                        Files.readAllLines(Paths.get(SPEC)).forEach(s -> 
specSb.append(s).append("\n"));
                        MultiColumnEncoder encoder = 
EncoderFactory.createEncoder(specSb.toString(), input.getColumnNames(),
                                input.getNumColumns(), null);
-                       MultiColumnEncoder.MULTI_THREADED_STAGES = staged;
+                       //MultiColumnEncoder.MULTI_THREADED_STAGES = staged;
 
                        MatrixBlock outputS = encoder.encode(input, 1);
                        MatrixBlock outputM = encoder.encode(input, 12);

Reply via email to