This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 8c0e2d8  [SPARK-35027][CORE] Close the inputStream in FileAppender when writin…
8c0e2d8 is described below

commit 8c0e2d838c2aa386bc164dd960d12312366d3732
Author: Jie <gt.hu.ch...@gmail.com>
AuthorDate: Tue Jul 20 21:23:51 2021 -0500

    [SPARK-35027][CORE] Close the inputStream in FileAppender when writin…
    
    ### What changes were proposed in this pull request?
    
    1. add "closeStreams" to FileAppender and RollingFileAppender
    2. set "closeStreams" to "true" in ExecutorRunner
    
    ### Why are the changes needed?
    
    The executor can hang when the disk is full or another exception occurs
    while writing to the outputStream; the root cause is that the "inputStream"
    is not closed after the error happens:
    1. ExecutorRunner creates two file appenders for the pipe: one for stdout,
    one for stderr
    2. FileAppender.appendStreamToFile exits its copy loop when writing to the
    outputStream fails
    3. FileAppender closes the outputStream, but leaves the inputStream, which
    refers to the pipe's stdout and stderr, open
    4. The executor then hangs when printing a log message if the pipe is full
    (nothing consumes the output)
    5. From the driver side, the task never completes

    With this fix, step 4 throws an exception instead of hanging; the driver can
    catch the exception and reschedule the failed task to other executors.
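
    The essential change in FileAppender.appendStreamToFile (shown in full in the
    diff below) is to close the inputStream in the cleanup path, guarded by the
    new flag, before closing the output file:

    ```scala
    // Cleanup block run when the copy loop ends: close the read end of the pipe
    // first (when requested), then close the output file. With the pipe closed,
    // a producer blocked on a full pipe gets an IOException instead of hanging.
    try {
      if (closeStreams) {
        inputStream.close()
      }
    } finally {
      closeFile()
    }
    ```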
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Add new tests for the "closeStreams" flag in FileAppenderSuite
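
    The new tests follow the shape below (condensed from the FileAppenderSuite
    changes in the diff; testFile is the suite's temporary file, and the mocked
    InputStream lets the test verify that close() is called):

    ```scala
    import java.io.InputStream

    import org.mockito.Mockito.{mock, verify}

    val inputStream = mock(classOf[InputStream])
    // Appender over the mocked stream with the new flag enabled.
    val appender = new FileAppender(inputStream, testFile, closeStreams = true)
    appender.stop()
    appender.awaitTermination()
    // The input side must have been closed as part of shutdown.
    verify(inputStream).close()
    ```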
    
    Closes #33263 from jhu-chang/SPARK-35027.
    
    Authored-by: Jie <gt.hu.ch...@gmail.com>
    Signed-off-by: Sean Owen <sro...@gmail.com>
    (cherry picked from commit 1a8c6755a1802afdb9a73793e9348d322176125a)
    Signed-off-by: Sean Owen <sro...@gmail.com>
---
 .../spark/deploy/worker/ExecutorRunner.scala       |  4 +--
 .../apache/spark/util/logging/FileAppender.scala   | 37 ++++++++++++++++------
 .../spark/util/logging/RollingFileAppender.scala   |  6 ++--
 .../org/apache/spark/util/FileAppenderSuite.scala  | 35 ++++++++++++++++++++
 4 files changed, 68 insertions(+), 14 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index 2e26ccf..974c2d6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -185,11 +185,11 @@ private[deploy] class ExecutorRunner(
 
       // Redirect its stdout and stderr to files
       val stdout = new File(executorDir, "stdout")
-      stdoutAppender = FileAppender(process.getInputStream, stdout, conf)
+      stdoutAppender = FileAppender(process.getInputStream, stdout, conf, true)
 
       val stderr = new File(executorDir, "stderr")
       Files.write(header, stderr, StandardCharsets.UTF_8)
-      stderrAppender = FileAppender(process.getErrorStream, stderr, conf)
+      stderrAppender = FileAppender(process.getErrorStream, stderr, conf, true)
 
       state = ExecutorState.RUNNING
       worker.send(ExecutorStateChanged(appId, execId, state, None, None))
diff --git a/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala b/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala
index 7107be2..2243239 100644
--- a/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala
+++ b/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala
@@ -26,8 +26,12 @@ import org.apache.spark.util.{IntParam, Utils}
 /**
  * Continuously appends the data from an input stream into the given file.
  */
-private[spark] class FileAppender(inputStream: InputStream, file: File, bufferSize: Int = 8192)
-  extends Logging {
+private[spark] class FileAppender(
+  inputStream: InputStream,
+  file: File,
+  bufferSize: Int = 8192,
+  closeStreams: Boolean = false
+) extends Logging {
   @volatile private var outputStream: FileOutputStream = null
   @volatile private var markedForStop = false     // has the appender been asked to stopped
 
@@ -76,7 +80,13 @@ private[spark] class FileAppender(inputStream: InputStream, file: File, bufferSi
           }
         }
       } {
-        closeFile()
+        try {
+          if (closeStreams) {
+            inputStream.close()
+          }
+        } finally {
+          closeFile()
+        }
       }
     } catch {
       case e: Exception =>
@@ -113,7 +123,12 @@ private[spark] class FileAppender(inputStream: InputStream, file: File, bufferSi
 private[spark] object FileAppender extends Logging {
 
   /** Create the right appender based on Spark configuration */
-  def apply(inputStream: InputStream, file: File, conf: SparkConf): FileAppender = {
+  def apply(
+    inputStream: InputStream,
+    file: File,
+    conf: SparkConf,
+    closeStreams: Boolean = false
+  ) : FileAppender = {
 
     val rollingStrategy = conf.get(config.EXECUTOR_LOGS_ROLLING_STRATEGY)
     val rollingSizeBytes = conf.get(config.EXECUTOR_LOGS_ROLLING_MAX_SIZE)
@@ -141,9 +156,10 @@ private[spark] object FileAppender extends Logging {
       validatedParams.map {
         case (interval, pattern) =>
           new RollingFileAppender(
-            inputStream, file, new TimeBasedRollingPolicy(interval, pattern), conf)
+            inputStream, file, new TimeBasedRollingPolicy(interval, pattern), conf,
+            closeStreams = closeStreams)
       }.getOrElse {
-        new FileAppender(inputStream, file)
+        new FileAppender(inputStream, file, closeStreams = closeStreams)
       }
     }
 
@@ -151,17 +167,18 @@ private[spark] object FileAppender extends Logging {
       rollingSizeBytes match {
         case IntParam(bytes) =>
           logInfo(s"Rolling executor logs enabled for $file with rolling every 
$bytes bytes")
-          new RollingFileAppender(inputStream, file, new 
SizeBasedRollingPolicy(bytes), conf)
+          new RollingFileAppender(
+            inputStream, file, new SizeBasedRollingPolicy(bytes), conf, 
closeStreams = closeStreams)
         case _ =>
           logWarning(
             s"Illegal size [$rollingSizeBytes] for rolling executor logs, 
rolling logs not enabled")
-          new FileAppender(inputStream, file)
+          new FileAppender(inputStream, file, closeStreams = closeStreams)
       }
     }
 
     rollingStrategy match {
       case "" =>
-        new FileAppender(inputStream, file)
+        new FileAppender(inputStream, file, closeStreams = closeStreams)
       case "time" =>
         createTimeBasedAppender()
       case "size" =>
@@ -170,7 +187,7 @@ private[spark] object FileAppender extends Logging {
         logWarning(
           s"Illegal strategy [$rollingStrategy] for rolling executor logs, " +
             s"rolling logs not enabled")
-        new FileAppender(inputStream, file)
+        new FileAppender(inputStream, file, closeStreams = closeStreams)
     }
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala b/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
index b73f422..68a5923 100644
--- a/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
+++ b/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
@@ -36,14 +36,16 @@ import org.apache.spark.internal.config
  * @param rollingPolicy           Policy based on which files will be rolled over.
  * @param conf                    SparkConf that is used to pass on extra configurations
  * @param bufferSize              Optional buffer size. Used mainly for testing.
+ * @param closeStreams            Option flag: whether to close the inputStream at the end.
  */
 private[spark] class RollingFileAppender(
     inputStream: InputStream,
     activeFile: File,
     val rollingPolicy: RollingPolicy,
     conf: SparkConf,
-    bufferSize: Int = RollingFileAppender.DEFAULT_BUFFER_SIZE
-  ) extends FileAppender(inputStream, activeFile, bufferSize) {
+    bufferSize: Int = RollingFileAppender.DEFAULT_BUFFER_SIZE,
+    closeStreams: Boolean = false
+  ) extends FileAppender(inputStream, activeFile, bufferSize, closeStreams) {
 
   private val maxRetainedFiles = conf.get(config.EXECUTOR_LOGS_ROLLING_MAX_RETAINED_FILES)
   private val enableCompression = conf.get(config.EXECUTOR_LOGS_ROLLING_ENABLE_COMPRESSION)
diff --git a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
index 12d9757..71010a1 100644
--- a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
@@ -61,6 +61,15 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
     assert(Files.toString(testFile, StandardCharsets.UTF_8) === header + testString)
   }
 
+  test("SPARK-35027: basic file appender - close stream") {
+    val inputStream = mock(classOf[InputStream])
+    val appender = new FileAppender(inputStream, testFile, closeStreams = true)
+    Thread.sleep(10)
+    appender.stop()
+    appender.awaitTermination()
+    verify(inputStream).close()
+  }
+
   test("rolling file appender - time-based rolling") {
     // setup input stream and appender
     val testOutputStream = new PipedOutputStream()
@@ -96,6 +105,32 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
       appender, testOutputStream, textToAppend, rolloverIntervalMillis, isCompressed = true)
   }
 
+  test("SPARK-35027: rolling file appender - time-based rolling close stream") 
{
+    val inputStream = mock(classOf[InputStream])
+    val sparkConf = new SparkConf()
+    sparkConf.set(config.EXECUTOR_LOGS_ROLLING_STRATEGY.key, "time")
+    val appender = FileAppender(inputStream, testFile, sparkConf, closeStreams 
= true)
+    assert(
+      
appender.asInstanceOf[RollingFileAppender].rollingPolicy.isInstanceOf[TimeBasedRollingPolicy])
+    Thread.sleep(10)
+    appender.stop()
+    appender.awaitTermination()
+    verify(inputStream).close()
+  }
+
+  test("SPARK-35027: rolling file appender - size-based rolling close stream") 
{
+    val inputStream = mock(classOf[InputStream])
+    val sparkConf = new SparkConf()
+    sparkConf.set(config.EXECUTOR_LOGS_ROLLING_STRATEGY.key, "size")
+    val appender = FileAppender(inputStream, testFile, sparkConf, closeStreams 
= true)
+    assert(
+      
appender.asInstanceOf[RollingFileAppender].rollingPolicy.isInstanceOf[SizeBasedRollingPolicy])
+    Thread.sleep(10)
+    appender.stop()
+    appender.awaitTermination()
+    verify(inputStream).close()
+  }
+
   test("rolling file appender - size-based rolling") {
     // setup input stream and appender
     val testOutputStream = new PipedOutputStream()

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
