Repository: spark
Updated Branches:
  refs/heads/master 2dd37d827 -> d3abb3699


[SPARK-21788][SS] Handle more exceptions when stopping a streaming query

## What changes were proposed in this pull request?

Add more exception cases that should be viewed as a normal query stop rather than a failure: interrupts surfacing as `ClosedByInterruptException`, and interrupts wrapped inside `UncheckedIOException`, `ExecutionException`, or `UncheckedExecutionException`, when the query state is already `TERMINATED` (i.e. `stop()` has been called).
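
For illustration, here is a minimal, self-contained sketch (not part of the patch) of the unwrapping idea. `looksLikeInterrupt` is a hypothetical stand-in for the real `StreamExecution.isInterruptedByStop`, which additionally requires the query state to be `TERMINATED`; Guava's `UncheckedExecutionException` is omitted to keep the sketch dependency-free:

```scala
import java.io.{InterruptedIOException, UncheckedIOException}
import java.nio.channels.ClosedByInterruptException
import java.util.concurrent.ExecutionException

object InterruptUnwrapDemo {
  // Hypothetical stand-in for the real check; the patch also verifies that
  // the query state is TERMINATED before treating the error as a clean stop.
  def looksLikeInterrupt(e: Throwable): Boolean = e match {
    // Direct interrupt signals.
    case _: InterruptedException | _: InterruptedIOException | _: ClosedByInterruptException =>
      true
    // Wrappers whose cause may itself be an interrupt: unwrap recursively.
    case e2 @ (_: UncheckedIOException | _: ExecutionException) if e2.getCause != null =>
      looksLikeInterrupt(e2.getCause)
    case _ =>
      false
  }

  def main(args: Array[String]): Unit = {
    println(looksLikeInterrupt(new InterruptedException))                  // true
    println(looksLikeInterrupt(
      new UncheckedIOException("io", new ClosedByInterruptException)))     // true
    println(looksLikeInterrupt(
      new ExecutionException("pool", new InterruptedException)))           // true
    println(looksLikeInterrupt(new RuntimeException("real failure")))      // false
  }
}
```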

## How was this patch tested?

New unit tests in `StreamSuite` that block inside `createSource`, stop the query, and assert that each exception type leaves `query.exception` empty.

Author: Shixiong Zhu <zsxw...@gmail.com>

Closes #18997 from zsxwing/SPARK-21788.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d3abb369
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d3abb369
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d3abb369

Branch: refs/heads/master
Commit: d3abb36990d928a8445a8c69ddebeabdfeb1484d
Parents: 2dd37d8
Author: Shixiong Zhu <zsxw...@gmail.com>
Authored: Thu Aug 24 10:23:59 2017 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Thu Aug 24 10:23:59 2017 -0700

----------------------------------------------------------------------
 .../execution/streaming/StreamExecution.scala   | 34 ++++++++++-
 .../spark/sql/streaming/StreamSuite.scala       | 60 +++++++++++++++++++-
 2 files changed, 89 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d3abb369/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 432b2d4..c224f2f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -17,9 +17,10 @@
 
 package org.apache.spark.sql.execution.streaming
 
-import java.io.{InterruptedIOException, IOException}
+import java.io.{InterruptedIOException, IOException, UncheckedIOException}
+import java.nio.channels.ClosedByInterruptException
 import java.util.UUID
-import java.util.concurrent.{CountDownLatch, TimeUnit}
+import java.util.concurrent.{CountDownLatch, ExecutionException, TimeUnit}
 import java.util.concurrent.atomic.AtomicReference
 import java.util.concurrent.locks.ReentrantLock
 
@@ -27,6 +28,7 @@ import scala.collection.mutable.{Map => MutableMap}
 import scala.collection.mutable.ArrayBuffer
 import scala.util.control.NonFatal
 
+import com.google.common.util.concurrent.UncheckedExecutionException
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.internal.Logging
@@ -335,7 +337,7 @@ class StreamExecution(
         // `stop()` is already called. Let `finally` finish the cleanup.
       }
     } catch {
-      case _: InterruptedException | _: InterruptedIOException if state.get == TERMINATED =>
+      case e if isInterruptedByStop(e) =>
         // interrupted by stop()
         updateStatusMessage("Stopped")
       case e: IOException if e.getMessage != null
@@ -407,6 +409,32 @@ class StreamExecution(
     }
   }
 
+  private def isInterruptedByStop(e: Throwable): Boolean = {
+    if (state.get == TERMINATED) {
+      e match {
+        // InterruptedIOException - thrown when an I/O operation is interrupted
+        // ClosedByInterruptException - thrown when an I/O operation upon a channel is interrupted
+        case _: InterruptedException | _: InterruptedIOException | _: ClosedByInterruptException =>
+          true
+        // The cause of the following exceptions may be one of the above exceptions:
+        //
+        // UncheckedIOException - thrown by code that cannot throw a checked IOException, such as
+        //                        BiFunction.apply
+        // ExecutionException - thrown when code running in a thread pool throws an
+        //                      exception
+        // UncheckedExecutionException - thrown by code that cannot throw a checked
+        //                               ExecutionException, such as BiFunction.apply
+        case e2 @ (_: UncheckedIOException | _: ExecutionException | _: UncheckedExecutionException)
+          if e2.getCause != null =>
+          isInterruptedByStop(e2.getCause)
+        case _ =>
+          false
+      }
+    } else {
+      false
+    }
+  }
+
   /**
    * Populate the start offsets to start the execution at the current offsets stored in the sink
    * (i.e. avoid reprocessing data that we have already processed). This function must be called
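
As background for the `ExecutionException` case in the hunk above, a small standalone sketch (illustrative only, not Spark code) of how a thread-pool boundary hides an interrupt behind a wrapper, so only `getCause` reveals it:

```scala
import java.util.concurrent.{Callable, ExecutionException, Executors}

object PoolWrappingDemo {
  def main(args: Array[String]): Unit = {
    val pool = Executors.newSingleThreadExecutor()
    // The task dies with an interrupt-style exception; Future.get re-throws
    // it wrapped in ExecutionException, never as a bare InterruptedException.
    val task = pool.submit(new Callable[Unit] {
      override def call(): Unit = throw new InterruptedException("stopped")
    })
    try task.get() catch {
      case e: ExecutionException =>
        // Only e.getCause is the original interrupt, which is why
        // isInterruptedByStop must recurse into causes.
        println(s"caught ${e.getClass.getSimpleName}, cause = ${e.getCause}")
    } finally pool.shutdown()
  }
}
```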

http://git-wip-us.apache.org/repos/asf/spark/blob/d3abb369/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index 012cccf..d0b2041 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -17,12 +17,14 @@
 
 package org.apache.spark.sql.streaming
 
-import java.io.{File, InterruptedIOException, IOException}
-import java.util.concurrent.{CountDownLatch, TimeoutException, TimeUnit}
+import java.io.{File, InterruptedIOException, IOException, UncheckedIOException}
+import java.nio.channels.ClosedByInterruptException
+import java.util.concurrent.{CountDownLatch, ExecutionException, TimeoutException, TimeUnit}
 
 import scala.reflect.ClassTag
 import scala.util.control.ControlThrowable
 
+import com.google.common.util.concurrent.UncheckedExecutionException
 import org.apache.commons.io.FileUtils
 import org.apache.hadoop.conf.Configuration
 
@@ -691,6 +693,31 @@ class StreamSuite extends StreamTest {
       }
     }
   }
+
+  for (e <- Seq(
+    new InterruptedException,
+    new InterruptedIOException,
+    new ClosedByInterruptException,
+    new UncheckedIOException("test", new ClosedByInterruptException),
+    new ExecutionException("test", new InterruptedException),
+    new UncheckedExecutionException("test", new InterruptedException))) {
+    test(s"view ${e.getClass.getSimpleName} as a normal query stop") {
+      ThrowingExceptionInCreateSource.createSourceLatch = new CountDownLatch(1)
+      ThrowingExceptionInCreateSource.exception = e
+      val query = spark
+        .readStream
+        .format(classOf[ThrowingExceptionInCreateSource].getName)
+        .load()
+        .writeStream
+        .format("console")
+        .start()
+      assert(ThrowingExceptionInCreateSource.createSourceLatch
+        .await(streamingTimeout.toMillis, TimeUnit.MILLISECONDS),
+        "ThrowingExceptionInCreateSource.createSource wasn't called before timeout")
+      query.stop()
+      assert(query.exception.isEmpty)
+    }
+  }
 }
 
 abstract class FakeSource extends StreamSourceProvider {
@@ -824,3 +851,32 @@ class TestStateStoreProvider extends StateStoreProvider {
 
   override def getStore(version: Long): StateStore = null
 }
+
+/** A fake source that throws `ThrowingExceptionInCreateSource.exception` in `createSource` */
+class ThrowingExceptionInCreateSource extends FakeSource {
+
+  override def createSource(
+    spark: SQLContext,
+    metadataPath: String,
+    schema: Option[StructType],
+    providerName: String,
+    parameters: Map[String, String]): Source = {
+    ThrowingExceptionInCreateSource.createSourceLatch.countDown()
+    try {
+      Thread.sleep(30000)
+      throw new TimeoutException("sleep was not interrupted in 30 seconds")
+    } catch {
+      case _: InterruptedException =>
+        throw ThrowingExceptionInCreateSource.exception
+    }
+  }
+}
+
+object ThrowingExceptionInCreateSource {
+  /**
+   * A latch to allow the user to wait until `ThrowingExceptionInCreateSource.createSource` is
+   * called.
+   */
+  @volatile var createSourceLatch: CountDownLatch = null
+  @volatile var exception: Exception = null
+}
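
The test above relies on a latch-plus-interruptible-sleep handshake; a standalone sketch of the same pattern (illustrative, not Spark code):

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

object LatchInterruptDemo {
  def main(args: Array[String]): Unit = {
    val entered = new CountDownLatch(1)
    val worker = new Thread(new Runnable {
      override def run(): Unit = {
        entered.countDown()       // signal: we are inside the blocking section
        try Thread.sleep(30000)   // interruptible stand-in for blocking work
        catch { case _: InterruptedException => println("stopped cleanly") }
      }
    })
    worker.start()
    // Wait until the worker is provably inside before interrupting it,
    // mirroring createSourceLatch.await(...) followed by query.stop() above.
    assert(entered.await(10, TimeUnit.SECONDS), "worker never started")
    worker.interrupt()
    worker.join()
  }
}
```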

