Repository: spark
Updated Branches:
  refs/heads/master 2dd37d827 -> d3abb3699
[SPARK-21788][SS] Handle more exceptions when stopping a streaming query

## What changes were proposed in this pull request?

Add more cases we should view as a normal query stop rather than a failure.

## How was this patch tested?

The new unit tests.

Author: Shixiong Zhu <zsxw...@gmail.com>

Closes #18997 from zsxwing/SPARK-21788.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d3abb369
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d3abb369
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d3abb369

Branch: refs/heads/master
Commit: d3abb36990d928a8445a8c69ddebeabdfeb1484d
Parents: 2dd37d8
Author: Shixiong Zhu <zsxw...@gmail.com>
Authored: Thu Aug 24 10:23:59 2017 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Thu Aug 24 10:23:59 2017 -0700

----------------------------------------------------------------------
 .../execution/streaming/StreamExecution.scala | 34 ++++++++++-
 .../spark/sql/streaming/StreamSuite.scala | 60 +++++++++++++++++++-
 2 files changed, 89 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d3abb369/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 432b2d4..c224f2f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -17,9 +17,10 @@ package org.apache.spark.sql.execution.streaming -import java.io.{InterruptedIOException, IOException} +import java.io.{InterruptedIOException, IOException, 
UncheckedIOException} +import java.nio.channels.ClosedByInterruptException import java.util.UUID -import java.util.concurrent.{CountDownLatch, TimeUnit} +import java.util.concurrent.{CountDownLatch, ExecutionException, TimeUnit} import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.locks.ReentrantLock @@ -27,6 +28,7 @@ import scala.collection.mutable.{Map => MutableMap} import scala.collection.mutable.ArrayBuffer import scala.util.control.NonFatal +import com.google.common.util.concurrent.UncheckedExecutionException import org.apache.hadoop.fs.Path import org.apache.spark.internal.Logging @@ -335,7 +337,7 @@ class StreamExecution( // `stop()` is already called. Let `finally` finish the cleanup. } } catch { - case _: InterruptedException | _: InterruptedIOException if state.get == TERMINATED => + case e if isInterruptedByStop(e) => // interrupted by stop() updateStatusMessage("Stopped") case e: IOException if e.getMessage != null @@ -407,6 +409,32 @@ class StreamExecution( } } + private def isInterruptedByStop(e: Throwable): Boolean = { + if (state.get == TERMINATED) { + e match { + // InterruptedIOException - thrown when an I/O operation is interrupted + // ClosedByInterruptException - thrown when an I/O operation upon a channel is interrupted + case _: InterruptedException | _: InterruptedIOException | _: ClosedByInterruptException => + true + // The cause of the following exceptions may be one of the above exceptions: + // + // UncheckedIOException - thrown by codes that cannot throw a checked IOException, such as + // BiFunction.apply + // ExecutionException - thrown by codes running in a thread pool and these codes throw an + // exception + // UncheckedExecutionException - thrown by codes that cannot throw a checked + // ExecutionException, such as BiFunction.apply + case e2 @ (_: UncheckedIOException | _: ExecutionException | _: UncheckedExecutionException) + if e2.getCause != null => + isInterruptedByStop(e2.getCause) + case _ => + 
false + } + } else { + false + } + } + /** * Populate the start offsets to start the execution at the current offsets stored in the sink * (i.e. avoid reprocessing data that we have already processed). This function must be called http://git-wip-us.apache.org/repos/asf/spark/blob/d3abb369/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala index 012cccf..d0b2041 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala @@ -17,12 +17,14 @@ package org.apache.spark.sql.streaming -import java.io.{File, InterruptedIOException, IOException} -import java.util.concurrent.{CountDownLatch, TimeoutException, TimeUnit} +import java.io.{File, InterruptedIOException, IOException, UncheckedIOException} +import java.nio.channels.ClosedByInterruptException +import java.util.concurrent.{CountDownLatch, ExecutionException, TimeoutException, TimeUnit} import scala.reflect.ClassTag import scala.util.control.ControlThrowable +import com.google.common.util.concurrent.UncheckedExecutionException import org.apache.commons.io.FileUtils import org.apache.hadoop.conf.Configuration @@ -691,6 +693,31 @@ class StreamSuite extends StreamTest { } } } + + for (e <- Seq( + new InterruptedException, + new InterruptedIOException, + new ClosedByInterruptException, + new UncheckedIOException("test", new ClosedByInterruptException), + new ExecutionException("test", new InterruptedException), + new UncheckedExecutionException("test", new InterruptedException))) { + test(s"view ${e.getClass.getSimpleName} as a normal query stop") { + ThrowingExceptionInCreateSource.createSourceLatch = new CountDownLatch(1) + ThrowingExceptionInCreateSource.exception = 
e + val query = spark + .readStream + .format(classOf[ThrowingExceptionInCreateSource].getName) + .load() + .writeStream + .format("console") + .start() + assert(ThrowingExceptionInCreateSource.createSourceLatch + .await(streamingTimeout.toMillis, TimeUnit.MILLISECONDS), + "ThrowingExceptionInCreateSource.createSource wasn't called before timeout") + query.stop() + assert(query.exception.isEmpty) + } + } } abstract class FakeSource extends StreamSourceProvider { @@ -824,3 +851,32 @@ class TestStateStoreProvider extends StateStoreProvider { override def getStore(version: Long): StateStore = null } + +/** A fake source that throws `ThrowingExceptionInCreateSource.exception` in `createSource` */ +class ThrowingExceptionInCreateSource extends FakeSource { + + override def createSource( + spark: SQLContext, + metadataPath: String, + schema: Option[StructType], + providerName: String, + parameters: Map[String, String]): Source = { + ThrowingExceptionInCreateSource.createSourceLatch.countDown() + try { + Thread.sleep(30000) + throw new TimeoutException("sleep was not interrupted in 30 seconds") + } catch { + case _: InterruptedException => + throw ThrowingExceptionInCreateSource.exception + } + } +} + +object ThrowingExceptionInCreateSource { + /** + * A latch to allow the user to wait until `ThrowingExceptionInCreateSource.createSource` is + * called. + */ + @volatile var createSourceLatch: CountDownLatch = null + @volatile var exception: Exception = null +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org