This is an automated email from the ASF dual-hosted git repository.

baunsgaard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/master by this push:
     new 810ae1f  [SYSTEMDS-3065] Close SparkContext silently
810ae1f is described below

commit 810ae1f7009085d58e32e6e493fdf7be100f2780
Author: baunsgaard <[email protected]>
AuthorDate: Sat Jul 17 12:33:23 2021 +0200

    [SYSTEMDS-3065] Close SparkContext silently
    
    When executing sequences of Spark instructions, the Spark context
    sometimes throws errors while closing, after all results have already
    been correctly computed and returned.
    
    These errors are known to be inconsequential and therefore only confuse
    users. This commit suppresses these error messages, limited to the
    messages produced while closing the Spark context after execution is
    done.
---
 .../context/SparkExecutionContext.java             | 47 +++++++++++++++-------
 1 file changed, 32 insertions(+), 15 deletions(-)

diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java b/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java
index cfaa50e..1f6c275 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java
@@ -19,9 +19,22 @@
 
 package org.apache.sysds.runtime.controlprogram.context;
 
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -49,8 +62,8 @@ import org.apache.sysds.runtime.controlprogram.caching.FrameObject;
 import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysds.runtime.controlprogram.caching.TensorObject;
 import org.apache.sysds.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
-import org.apache.sysds.runtime.data.TensorBlock;
 import org.apache.sysds.runtime.data.SparseBlock;
+import org.apache.sysds.runtime.data.TensorBlock;
 import org.apache.sysds.runtime.data.TensorIndexes;
 import org.apache.sysds.runtime.instructions.cp.Data;
 import org.apache.sysds.runtime.instructions.spark.data.BroadcastObject;
@@ -78,16 +91,8 @@ import org.apache.sysds.runtime.util.HDFSTool;
 import org.apache.sysds.runtime.util.UtilFunctions;
 import org.apache.sysds.utils.MLContextProxy;
 import org.apache.sysds.utils.Statistics;
-import scala.Tuple2;
 
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.Future;
-import java.util.stream.Collectors;
-import java.util.stream.LongStream;
+import scala.Tuple2;
 
 
 public class SparkExecutionContext extends ExecutionContext
@@ -161,12 +166,24 @@ public class SparkExecutionContext extends ExecutionContext
        }
 
        public void close() {
-               synchronized( SparkExecutionContext.class ) {
-                       if( _spctx != null ) {
-                               //stop the spark context if existing
+               synchronized( SparkExecutionContext.class) {
+                       if(_spctx != null) {
+                               Logger spL = Logger.getLogger("org.apache.spark.network.client.TransportResponseHandler");
+                               spL.setLevel(Level.FATAL);
+                               OutputStream buff = new OutputStream() {
+                                       @Override
+                                       public void write(int b) {
+                                               // do Nothing
+                                       }
+                               };
+                               PrintStream old = System.err;
+                               System.setErr(new PrintStream(buff));
+                               // stop the spark context if existing
                                _spctx.stop();
-                               //make sure stopped context is never used again
+                               // make sure stopped context is never used again
                                _spctx = null;
+                               System.setErr(old);
+                               spL.setLevel(Level.ERROR);
                        }
                }
        }
@@ -1018,7 +1035,7 @@ public class SparkExecutionContext extends ExecutionContext
                        List<Tuple2<MatrixIndexes,MatrixBlock>> list = rdd.collect();
 
                        if( list.size()>1 )
-                               throw new DMLRuntimeException("Expecting no more than one result block.");
+                               throw new DMLRuntimeException("Expecting no more than one result block but got: " + list.size());
                        else if( list.size()==1 )
                                out = list.get(0)._2();
                        else //empty (e.g., after ops w/ outputEmpty=false)
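
For illustration, the silence-and-restore idiom used in close() above can be
sketched in isolation. This is a minimal sketch, not SystemDS code: the class
and method names (SilentClose, runSilently) are hypothetical, and a
try/finally block is added so that stderr and the logger level are restored
even if the wrapped action throws.

    import java.io.OutputStream;
    import java.io.PrintStream;

    import org.apache.log4j.Level;
    import org.apache.log4j.Logger;

    public class SilentClose {
        // Run the given action with stderr discarded and the named log4j
        // logger raised to FATAL, restoring both afterwards.
        public static void runSilently(String loggerName, Runnable action) {
            Logger logger = Logger.getLogger(loggerName);
            Level oldLevel = logger.getLevel();
            logger.setLevel(Level.FATAL);
            PrintStream oldErr = System.err;
            System.setErr(new PrintStream(new OutputStream() {
                @Override
                public void write(int b) {
                    // discard all bytes written to stderr
                }
            }));
            try {
                action.run();
            }
            finally {
                System.setErr(oldErr);
                logger.setLevel(oldLevel);
            }
        }
    }

A call such as
runSilently("org.apache.spark.network.client.TransportResponseHandler", sc::stop)
would then stop a JavaSparkContext sc without the shutdown noise. On Java 11+,
OutputStream.nullOutputStream() provides a built-in no-op stream in place of
the anonymous subclass.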
