Repository: spark
Updated Branches:
  refs/heads/master 12f490b5c -> c979c8bba


[SPARK-14131][STREAMING] SQL Improved fix for avoiding potential deadlocks in 
HDFSMetadataLog

## What changes were proposed in this pull request?
Current fix for deadlock disables interrupts in the StreamExecution which 
getting offsets for all sources, and when writing to any metadata log, to avoid 
potential deadlocks in HDFSMetadataLog(see JIRA for more details). However, 
disabling interrupts can have unintended consequences in other sources. So I am 
making the fix more narrow, by disabling interrupt it only in the 
HDFSMetadataLog. This is a narrower fix for something risky like disabling 
interrupt.

## How was this patch tested?
Existing tests.

Author: Tathagata Das <tathagata.das1...@gmail.com>

Closes #14292 from tdas/SPARK-14131.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c979c8bb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c979c8bb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c979c8bb

Branch: refs/heads/master
Commit: c979c8bba02bc89cb9ad81b212f085a8a5490a07
Parents: 12f490b
Author: Tathagata Das <tathagata.das1...@gmail.com>
Authored: Mon Jul 25 16:09:22 2016 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Mon Jul 25 16:09:22 2016 -0700

----------------------------------------------------------------------
 .../execution/streaming/HDFSMetadataLog.scala   | 31 ++++++++++----
 .../execution/streaming/StreamExecution.scala   | 28 ++++---------
 .../streaming/FileStreamSinkLogSuite.scala      |  4 +-
 .../streaming/HDFSMetadataLogSuite.scala        | 10 +++--
 .../apache/spark/sql/test/SQLTestUtils.scala    | 43 +++++++++++++++++++-
 5 files changed, 80 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c979c8bb/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index 069e41b..698f07b 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -32,6 +32,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.serializer.JavaSerializer
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.util.UninterruptibleThread
 
 
 /**
@@ -91,18 +92,30 @@ class HDFSMetadataLog[T: ClassTag](sparkSession: 
SparkSession, path: String)
     serializer.deserialize[T](ByteBuffer.wrap(bytes))
   }
 
+  /**
+   * Store the metadata for the specified batchId and return `true` if 
successful. If the batchId's
+   * metadata has already been stored, this method will return `false`.
+   *
+   * Note that this method must be called on a 
[[org.apache.spark.util.UninterruptibleThread]]
+   * so that interrupts can be disabled while writing the batch file. This is 
because there is a
+   * potential dead-lock in Hadoop "Shell.runCommand" before 2.5.0 
(HADOOP-10622). If the thread
+   * running "Shell.runCommand" is interrupted, then the thread can get 
deadlocked. In our
+   * case, `writeBatch` creates a file using HDFS API and calls 
"Shell.runCommand" to set the
+   * file permissions, and can get deadlocked if the stream execution thread 
is stopped by
+   * interrupt. Hence, we make sure that this method is called on 
[[UninterruptibleThread]] which
+   * allows us to disable interrupts here. Also see SPARK-14131.
+   */
   override def add(batchId: Long, metadata: T): Boolean = {
     get(batchId).map(_ => false).getOrElse {
-      // Only write metadata when the batch has not yet been written.
-      try {
-        writeBatch(batchId, serialize(metadata))
-        true
-      } catch {
-        case e: IOException if "java.lang.InterruptedException" == 
e.getMessage =>
-          // create may convert InterruptedException to IOException. Let's 
convert it back to
-          // InterruptedException so that this failure won't crash 
StreamExecution
-          throw new InterruptedException("Creating file is interrupted")
+      // Only write metadata when the batch has not yet been written
+      Thread.currentThread match {
+        case ut: UninterruptibleThread =>
+          ut.runUninterruptibly { writeBatch(batchId, serialize(metadata)) }
+        case _ =>
+          throw new IllegalStateException(
+            "HDFSMetadataLog.add() must be executed on a 
o.a.spark.util.UninterruptibleThread")
       }
+      true
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c979c8bb/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index c90dcc5..af2229a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -110,7 +110,11 @@ class StreamExecution(
   /* Get the call site in the caller thread; will pass this into the micro 
batch thread */
   private val callSite = Utils.getCallSite()
 
-  /** The thread that runs the micro-batches of this stream. */
+  /**
+   * The thread that runs the micro-batches of this stream. Note that this 
thread must be
+   * [[org.apache.spark.util.UninterruptibleThread]] to avoid potential 
deadlocks in using
+   * [[HDFSMetadataLog]]. See SPARK-14131 for more details.
+   */
   private[sql] val microBatchThread =
     new UninterruptibleThread(s"stream execution thread for $name") {
       override def run(): Unit = {
@@ -269,19 +273,11 @@ class StreamExecution(
    * batchId counter is incremented and a new log entry is written with the 
newest offsets.
    */
   private def constructNextBatch(): Unit = {
-    // There is a potential dead-lock in Hadoop "Shell.runCommand" before 
2.5.0 (HADOOP-10622).
-    // If we interrupt some thread running Shell.runCommand, we may hit this 
issue.
-    // As "FileStreamSource.getOffset" will create a file using HDFS API and 
call "Shell.runCommand"
-    // to set the file permission, we should not interrupt "microBatchThread" 
when running this
-    // method. See SPARK-14131.
-    //
     // Check to see what new data is available.
     val hasNewData = {
       awaitBatchLock.lock()
       try {
-        val newData = microBatchThread.runUninterruptibly {
-          uniqueSources.flatMap(s => s.getOffset.map(o => s -> o))
-        }
+        val newData = uniqueSources.flatMap(s => s.getOffset.map(o => s -> o))
         availableOffsets ++= newData
 
         if (dataAvailable) {
@@ -295,16 +291,8 @@ class StreamExecution(
       }
     }
     if (hasNewData) {
-      // There is a potential dead-lock in Hadoop "Shell.runCommand" before 
2.5.0 (HADOOP-10622).
-      // If we interrupt some thread running Shell.runCommand, we may hit this 
issue.
-      // As "offsetLog.add" will create a file using HDFS API and call 
"Shell.runCommand" to set
-      // the file permission, we should not interrupt "microBatchThread" when 
running this method.
-      // See SPARK-14131.
-      microBatchThread.runUninterruptibly {
-        assert(
-          offsetLog.add(currentBatchId, 
availableOffsets.toCompositeOffset(sources)),
-          s"Concurrent update to the log.  Multiple streaming jobs detected 
for $currentBatchId")
-      }
+      assert(offsetLog.add(currentBatchId, 
availableOffsets.toCompositeOffset(sources)),
+        s"Concurrent update to the log. Multiple streaming jobs detected for 
$currentBatchId")
       logInfo(s"Committed offsets for batch $currentBatchId.")
     } else {
       awaitBatchLock.lock()

http://git-wip-us.apache.org/repos/asf/spark/blob/c979c8bb/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
index a7b2cfe..39fd1f0 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
@@ -190,7 +190,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with 
SharedSQLContext {
     }
   }
 
-  test("compact") {
+  testWithUninterruptibleThread("compact") {
     withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
       withFileStreamSinkLog { sinkLog =>
         for (batchId <- 0 to 10) {
@@ -210,7 +210,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with 
SharedSQLContext {
     }
   }
 
-  test("delete expired file") {
+  testWithUninterruptibleThread("delete expired file") {
     // Set FILE_SINK_LOG_CLEANUP_DELAY to 0 so that we can detect the deleting 
behaviour
     // deterministically
     withSQLConf(

http://git-wip-us.apache.org/repos/asf/spark/blob/c979c8bb/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
index ef2b479..ab5a2d2 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
@@ -33,6 +33,7 @@ import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.sql.execution.streaming.FakeFileSystem._
 import 
org.apache.spark.sql.execution.streaming.HDFSMetadataLog.{FileContextManager, 
FileManager, FileSystemManager}
 import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.util.UninterruptibleThread
 
 class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
 
@@ -56,7 +57,7 @@ class HDFSMetadataLogSuite extends SparkFunSuite with 
SharedSQLContext {
     }
   }
 
-  test("HDFSMetadataLog: basic") {
+  testWithUninterruptibleThread("HDFSMetadataLog: basic") {
     withTempDir { temp =>
       val dir = new File(temp, "dir") // use non-existent directory to test 
whether log make the dir
       val metadataLog = new HDFSMetadataLog[String](spark, dir.getAbsolutePath)
@@ -81,7 +82,8 @@ class HDFSMetadataLogSuite extends SparkFunSuite with 
SharedSQLContext {
     }
   }
 
-  testQuietly("HDFSMetadataLog: fallback from FileContext to FileSystem") {
+  testWithUninterruptibleThread(
+    "HDFSMetadataLog: fallback from FileContext to FileSystem", quietly = 
true) {
     spark.conf.set(
       s"fs.$scheme.impl",
       classOf[FakeFileSystem].getName)
@@ -101,7 +103,7 @@ class HDFSMetadataLogSuite extends SparkFunSuite with 
SharedSQLContext {
     }
   }
 
-  test("HDFSMetadataLog: restart") {
+  testWithUninterruptibleThread("HDFSMetadataLog: restart") {
     withTempDir { temp =>
       val metadataLog = new HDFSMetadataLog[String](spark, 
temp.getAbsolutePath)
       assert(metadataLog.add(0, "batch0"))
@@ -124,7 +126,7 @@ class HDFSMetadataLogSuite extends SparkFunSuite with 
SharedSQLContext {
       val waiter = new Waiter
       val maxBatchId = 100
       for (id <- 0 until 10) {
-        new Thread() {
+        new UninterruptibleThread(s"HDFSMetadataLog: metadata directory 
collision - thread $id") {
           override def run(): Unit = waiter {
             val metadataLog =
               new HDFSMetadataLog[String](spark, temp.getAbsolutePath)

http://git-wip-us.apache.org/repos/asf/spark/blob/c979c8bb/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
index 5286ee5..d4d8e3e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
@@ -22,6 +22,7 @@ import java.util.UUID
 
 import scala.language.implicitConversions
 import scala.util.Try
+import scala.util.control.NonFatal
 
 import org.apache.hadoop.conf.Configuration
 import org.scalatest.BeforeAndAfterAll
@@ -34,7 +35,7 @@ import org.apache.spark.sql.catalyst.FunctionIdentifier
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.execution.FilterExec
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{UninterruptibleThread, Utils}
 
 /**
  * Helper trait that should be extended by all SQL test suites.
@@ -247,6 +248,46 @@ private[sql] trait SQLTestUtils
       }
     }
   }
+
+  /** Run a test on a separate [[UninterruptibleThread]]. */
+  protected def testWithUninterruptibleThread(name: String, quietly: Boolean = 
false)
+    (body: => Unit): Unit = {
+    val timeoutMillis = 10000
+    @transient var ex: Throwable = null
+
+    def runOnThread(): Unit = {
+      val thread = new UninterruptibleThread(s"Testing thread for test $name") 
{
+        override def run(): Unit = {
+          try {
+            body
+          } catch {
+            case NonFatal(e) =>
+              ex = e
+          }
+        }
+      }
+      thread.setDaemon(true)
+      thread.start()
+      thread.join(timeoutMillis)
+      if (thread.isAlive) {
+        thread.interrupt()
+        // If this interrupt does not work, then this thread is most likely 
running something that
+        // is not interruptible. There is not much point to wait for the 
thread to termniate, and
+        // we rather let the JVM terminate the thread on exit.
+        fail(
+          s"Test '$name' running on o.a.s.util.UninterruptibleThread timed out 
after" +
+            s" $timeoutMillis ms")
+      } else if (ex != null) {
+        throw ex
+      }
+    }
+
+    if (quietly) {
+      testQuietly(name) { runOnThread() }
+    } else {
+      test(name) { runOnThread() }
+    }
+  }
 }
 
 private[sql] object SQLTestUtils {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to