This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/master by this push:
     new e0b15af  [MINOR] Multi-threaded parfor result file deletion (latency mitigation)
e0b15af is described below

commit e0b15afc5205821e86b4c1165132b7c004bc7771
Author: Matthias Boehm <[email protected]>
AuthorDate: Fri Oct 9 21:36:37 2020 +0200

    [MINOR] Multi-threaded parfor result file deletion (latency mitigation)
    
    After parfor operations, a result-merge implementation merges the partial
    results from parfor workers into the final result variables. In case of
    remote parfor, we have (#result variables x #parfor tasks) files, which -
    in case of in-memory result merge - are read into the driver, aggregated,
    and finally deleted. In sub-optimal cluster configurations, the delete can
    have substantial latency (independent of file size). To mitigate this
    latency, we now delete these files in a multi-threaded manner, which
    showed good performance. Note that we refrain from asynchronous deletion
    to avoid additional synchronization when parfor loops are nested in
    surrounding while/for loops (where the same files might be written
    multiple times).
---
 .../runtime/controlprogram/ParForProgramBlock.java | 20 ++++----
 .../sysds/test/usertest/pythonapi/StartupTest.java | 55 ++++++++++------------
 2 files changed, 35 insertions(+), 40 deletions(-)

diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java b/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java
index d5ef760..1e4281c 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java
@@ -99,6 +99,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
+import java.util.stream.Stream;
 
 
 
@@ -1046,12 +1047,12 @@ public class ParForProgramBlock extends ForProgramBlock
         * @param out output matrix
         * @param in array of input matrix objects
         */
-       private static void cleanWorkerResultVariables(ExecutionContext ec, 
MatrixObject out, MatrixObject[] in) {
-               for( MatrixObject tmp : in ) {
-                       //check for empty inputs (no iterations executed)
-                       if( tmp != null && tmp != out )
-                               ec.cleanupCacheableData(tmp);
-               }
+       private static void cleanWorkerResultVariables(ExecutionContext ec, 
MatrixObject out, MatrixObject[] in, boolean parallel) {
+               //check for empty inputs (no iterations executed)
+               Stream<MatrixObject> results = Arrays.stream(in).filter(m -> 
m!=null && m!=out);
+               //perform cleanup (parallel to mitigate file deletion 
bottlenecks)
+               (parallel ? results.parallel() : results)
+                       .forEach(m -> ec.cleanupCacheableData(m));
        }
        
        /**
@@ -1432,7 +1433,7 @@ public class ParForProgramBlock extends ForProgramBlock
                                                ec.cleanupDataObject(exdata);
                                        
                                        //cleanup of intermediate result 
variables
-                                       cleanWorkerResultVariables( ec, out, in 
);
+                                       cleanWorkerResultVariables( ec, out, 
in, true );
                                        
                                        //set merged result variable
                                        ec.setVariable(var._name, outNew);
@@ -1657,13 +1658,12 @@ public class ParForProgramBlock extends ForProgramBlock
                                        }
                
                                        //cleanup of intermediate result 
variables
-                                       cleanWorkerResultVariables( _ec, out, 
in );
+                                       cleanWorkerResultVariables( _ec, out, 
in, false );
                                }
                                
                                _success = true;
                        }
-                       catch(Exception ex)
-                       {
+                       catch(Exception ex) {
                                LOG.error("Error executing result merge: ", ex);
                        }
                }
diff --git a/src/test/java/org/apache/sysds/test/usertest/pythonapi/StartupTest.java b/src/test/java/org/apache/sysds/test/usertest/pythonapi/StartupTest.java
index 561bdf1..0106099 100644
--- a/src/test/java/org/apache/sysds/test/usertest/pythonapi/StartupTest.java
+++ b/src/test/java/org/apache/sysds/test/usertest/pythonapi/StartupTest.java
@@ -19,40 +19,35 @@
 
 package org.apache.sysds.test.usertest.pythonapi;
 
-import static org.junit.Assert.assertTrue;
-
-import java.io.ByteArrayOutputStream;
-import java.io.PrintStream;
-
 import org.apache.sysds.api.PythonDMLScript;
 import org.junit.Test;
 
 /** Simple tests to verify startup of Python Gateway server happens without 
crashes */
 public class StartupTest {
 
-    @Test(expected = IllegalArgumentException.class)
-    public void testStartupIncorrect_1() {
-        PythonDMLScript.main(new String[] {});
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testStartupIncorrect_2() {
-        PythonDMLScript.main(new String[] {""});
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testStartupIncorrect_3() {
-        PythonDMLScript.main(new String[] {"131", "131"});
-    }
-
-    @Test(expected = NumberFormatException.class)
-    public void testStartupIncorrect_4() {
-        PythonDMLScript.main(new String[] {"Hello"});
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testStartupIncorrect_5() {
-        // Number out of range
-        PythonDMLScript.main(new String[] {"918757"});
-    }
+       @Test(expected = IllegalArgumentException.class)
+       public void testStartupIncorrect_1() {
+               PythonDMLScript.main(new String[] {});
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void testStartupIncorrect_2() {
+               PythonDMLScript.main(new String[] {""});
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void testStartupIncorrect_3() {
+               PythonDMLScript.main(new String[] {"131", "131"});
+       }
+
+       @Test(expected = NumberFormatException.class)
+       public void testStartupIncorrect_4() {
+               PythonDMLScript.main(new String[] {"Hello"});
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void testStartupIncorrect_5() {
+               // Number out of range
+               PythonDMLScript.main(new String[] {"918757"});
+       }
 }

Reply via email to