Repository: spark
Updated Branches:
  refs/heads/branch-2.2 2db610100 -> b8fa79cec


[SPARK-13747][CORE] Add ThreadUtils.awaitReady and disallow Await.ready

## What changes were proposed in this pull request?

Add `ThreadUtils.awaitReady` similar to `ThreadUtils.awaitResult` and disallow 
`Await.ready`.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <shixi...@databricks.com>

Closes #17763 from zsxwing/awaitready.

(cherry picked from commit 324a904d8e80089d8865e4c7edaedb92ab2ec1b2)
Signed-off-by: Shixiong Zhu <shixi...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b8fa79ce
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b8fa79ce
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b8fa79ce

Branch: refs/heads/branch-2.2
Commit: b8fa79cec7a15e748bf9916e8a3c6476e0d350a3
Parents: 2db6101
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Wed May 17 17:21:46 2017 -0700
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Wed May 17 17:21:56 2017 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/FutureAction.scala   |  2 +-
 .../apache/spark/scheduler/DAGScheduler.scala   |  7 +------
 .../org/apache/spark/storage/BlockManager.scala |  6 +++---
 .../org/apache/spark/util/ThreadUtils.scala     | 21 ++++++++++++++++++++
 .../org/apache/spark/SparkContextSuite.scala    |  5 ++---
 .../netty/NettyBlockTransferSecuritySuite.scala |  5 +++--
 .../scheduler/SchedulerIntegrationSuite.scala   |  4 ++--
 .../spark/storage/BlockInfoManagerSuite.scala   |  2 +-
 .../kinesis/KinesisCheckpointerSuite.scala      |  2 ++
 scalastyle-config.xml                           | 11 ++++++++++
 .../streaming/util/FileBasedWriteAheadLog.scala |  2 ++
 11 files changed, 49 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b8fa79ce/core/src/main/scala/org/apache/spark/FutureAction.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala 
b/core/src/main/scala/org/apache/spark/FutureAction.scala
index a50600f..0899693 100644
--- a/core/src/main/scala/org/apache/spark/FutureAction.scala
+++ b/core/src/main/scala/org/apache/spark/FutureAction.scala
@@ -261,7 +261,7 @@ class JavaFutureActionWrapper[S, T](futureAction: 
FutureAction[S], converter: S
 
   private def getImpl(timeout: Duration): T = {
     // This will throw TimeoutException on timeout:
-    Await.ready(futureAction, timeout)
+    ThreadUtils.awaitReady(futureAction, timeout)
     futureAction.value.get match {
       case scala.util.Success(value) => converter(value)
       case scala.util.Failure(exception) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/b8fa79ce/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index aab177f..35f6b36 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -618,12 +618,7 @@ class DAGScheduler(
       properties: Properties): Unit = {
     val start = System.nanoTime
     val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, 
properties)
-    // Note: Do not call Await.ready(future) because that calls 
`scala.concurrent.blocking`,
-    // which causes concurrent SQL executions to fail if a fork-join pool is 
used. Note that
-    // due to idiosyncrasies in Scala, `awaitPermission` is not actually used 
anywhere so it's
-    // safe to pass in null here. For more detail, see SPARK-13747.
-    val awaitPermission = null.asInstanceOf[scala.concurrent.CanAwait]
-    waiter.completionFuture.ready(Duration.Inf)(awaitPermission)
+    ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
     waiter.completionFuture.value.get match {
       case scala.util.Success(_) =>
         logInfo("Job %d finished: %s, took %f s".format

http://git-wip-us.apache.org/repos/asf/spark/blob/b8fa79ce/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index ad0dc3c..6c363c5 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -23,7 +23,7 @@ import java.nio.channels.Channels
 
 import scala.collection.mutable
 import scala.collection.mutable.HashMap
-import scala.concurrent.{Await, ExecutionContext, Future}
+import scala.concurrent.{ExecutionContext, Future}
 import scala.concurrent.duration._
 import scala.reflect.ClassTag
 import scala.util.Random
@@ -334,7 +334,7 @@ private[spark] class BlockManager(
     val task = asyncReregisterTask
     if (task != null) {
       try {
-        Await.ready(task, Duration.Inf)
+        ThreadUtils.awaitReady(task, Duration.Inf)
       } catch {
         case NonFatal(t) =>
           throw new Exception("Error occurred while waiting for async. 
reregistration", t)
@@ -909,7 +909,7 @@ private[spark] class BlockManager(
       if (level.replication > 1) {
         // Wait for asynchronous replication to finish
         try {
-          Await.ready(replicationFuture, Duration.Inf)
+          ThreadUtils.awaitReady(replicationFuture, Duration.Inf)
         } catch {
           case NonFatal(t) =>
             throw new Exception("Error occurred while waiting for replication 
to finish", t)

http://git-wip-us.apache.org/repos/asf/spark/blob/b8fa79ce/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala 
b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
index 1aa4456..81aaf79 100644
--- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
@@ -206,4 +206,25 @@ private[spark] object ThreadUtils {
     }
   }
   // scalastyle:on awaitresult
+
+  // scalastyle:off awaitready
+  /**
+   * Preferred alternative to `Await.ready()`.
+   *
+   * @see [[awaitResult]]
+   */
+  @throws(classOf[SparkException])
+  def awaitReady[T](awaitable: Awaitable[T], atMost: Duration): awaitable.type 
= {
+    try {
+      // `awaitPermission` is not actually used anywhere so it's safe to pass 
in null here.
+      // See SPARK-13747.
+      val awaitPermission = null.asInstanceOf[scala.concurrent.CanAwait]
+      awaitable.ready(atMost)(awaitPermission)
+    } catch {
+      // TimeoutException is thrown in the current thread, so no need to wrap 
+the exception.
+      case NonFatal(t) if !t.isInstanceOf[TimeoutException] =>
+        throw new SparkException("Exception thrown in awaitResult: ", t)
+    }
+  }
+  // scalastyle:on awaitready
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/b8fa79ce/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala 
b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 7e26139..27945a9 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -23,7 +23,6 @@ import java.nio.charset.StandardCharsets
 import java.util.concurrent.TimeUnit
 
 import scala.concurrent.duration._
-import scala.concurrent.Await
 
 import com.google.common.io.Files
 import org.apache.hadoop.conf.Configuration
@@ -35,7 +34,7 @@ import org.scalatest.concurrent.Eventually
 import org.scalatest.Matchers._
 
 import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart, 
SparkListenerTaskEnd, SparkListenerTaskStart}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ThreadUtils, Utils}
 
 
 class SparkContextSuite extends SparkFunSuite with LocalSparkContext with 
Eventually {
@@ -315,7 +314,7 @@ class SparkContextSuite extends SparkFunSuite with 
LocalSparkContext with Eventu
       sc = new SparkContext(new 
SparkConf().setAppName("test").setMaster("local"))
       val future = sc.parallelize(Seq(0)).foreachAsync(_ => 
{Thread.sleep(1000L)})
       sc.cancelJobGroup("nonExistGroupId")
-      Await.ready(future, Duration(2, TimeUnit.SECONDS))
+      ThreadUtils.awaitReady(future, Duration(2, TimeUnit.SECONDS))
 
       // In SPARK-6414, sc.cancelJobGroup will cause NullPointerException and 
cause
       // SparkContext to shutdown, so the following assertion will fail.

http://git-wip-us.apache.org/repos/asf/spark/blob/b8fa79ce/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala
 
b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala
index fe89558..792a1d7 100644
--- 
a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala
@@ -22,7 +22,7 @@ import java.nio._
 import java.nio.charset.StandardCharsets
 import java.util.concurrent.TimeUnit
 
-import scala.concurrent.{Await, Promise}
+import scala.concurrent.Promise
 import scala.concurrent.duration._
 import scala.util.{Failure, Success, Try}
 
@@ -36,6 +36,7 @@ import org.apache.spark.network.{BlockDataManager, 
BlockTransferService}
 import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
 import org.apache.spark.network.shuffle.BlockFetchingListener
 import org.apache.spark.storage.{BlockId, ShuffleBlockId}
+import org.apache.spark.util.ThreadUtils
 
 class NettyBlockTransferSecuritySuite extends SparkFunSuite with MockitoSugar 
with ShouldMatchers {
   test("security default off") {
@@ -166,7 +167,7 @@ class NettyBlockTransferSecuritySuite extends SparkFunSuite 
with MockitoSugar wi
         }
       })
 
-    Await.ready(promise.future, FiniteDuration(10, TimeUnit.SECONDS))
+    ThreadUtils.awaitReady(promise.future, FiniteDuration(10, 
TimeUnit.SECONDS))
     promise.future.value.get
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/b8fa79ce/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
 
b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
index 8300607..37b0898 100644
--- 
a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
@@ -21,7 +21,7 @@ import java.util.concurrent.{TimeoutException, TimeUnit}
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
 
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
-import scala.concurrent.{Await, Future}
+import scala.concurrent.Future
 import scala.concurrent.duration.{Duration, SECONDS}
 import scala.language.existentials
 import scala.reflect.ClassTag
@@ -260,7 +260,7 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: 
ClassTag] extends Spa
    */
   def awaitJobTermination(jobFuture: Future[_], duration: Duration): Unit = {
     try {
-      Await.ready(jobFuture, duration)
+      ThreadUtils.awaitReady(jobFuture, duration)
     } catch {
       case te: TimeoutException if backendException.get() != null =>
         val msg = raw"""

http://git-wip-us.apache.org/repos/asf/spark/blob/b8fa79ce/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
index 1b32580..917db76 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
@@ -152,7 +152,7 @@ class BlockInfoManagerSuite extends SparkFunSuite with 
BeforeAndAfterEach {
     // one should acquire the write lock. The second thread should block until 
the winner of the
     // write race releases its lock.
     val winningFuture: Future[Boolean] =
-      Await.ready(Future.firstCompletedOf(Seq(lock1Future, lock2Future)), 
1.seconds)
+      ThreadUtils.awaitReady(Future.firstCompletedOf(Seq(lock1Future, 
lock2Future)), 1.seconds)
     assert(winningFuture.value.get.get)
     val winningTID = blockInfoManager.get("block").get.writerTask
     assert(winningTID === 1 || winningTID === 2)

http://git-wip-us.apache.org/repos/asf/spark/blob/b8fa79ce/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala
----------------------------------------------------------------------
diff --git 
a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala
 
b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala
index fef24ed..8d56d4b 100644
--- 
a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala
+++ 
b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala
@@ -140,7 +140,9 @@ class KinesisCheckpointerSuite extends TestSuiteBase
       ExecutionContext.global)
 
     intercept[TimeoutException] {
+      // scalastyle:off awaitready
       Await.ready(f, 50 millis)
+      // scalastyle:on awaitready
     }
 
     clock.advance(checkpointInterval.milliseconds / 2)

http://git-wip-us.apache.org/repos/asf/spark/blob/b8fa79ce/scalastyle-config.xml
----------------------------------------------------------------------
diff --git a/scalastyle-config.xml b/scalastyle-config.xml
index 1f48d71..0a4073b 100644
--- a/scalastyle-config.xml
+++ b/scalastyle-config.xml
@@ -203,6 +203,17 @@ This file is divided into 3 sections:
     ]]></customMessage>
   </check>
 
+  <check customId="awaitready" level="error" 
class="org.scalastyle.file.RegexChecker" enabled="true">
+    <parameters><parameter name="regex">Await\.ready</parameter></parameters>
+    <customMessage><![CDATA[
+      Are you sure that you want to use Await.ready? In most cases, you should 
use ThreadUtils.awaitReady instead.
+      If you must use Await.ready, wrap the code block with
+      // scalastyle:off awaitready
+      Await.ready(...)
+      // scalastyle:on awaitready
+    ]]></customMessage>
+  </check>
+
   <!-- As of SPARK-9613 JavaConversions should be replaced with JavaConverters 
-->
   <check customId="javaconversions" level="error" 
class="org.scalastyle.scalariform.TokenChecker" enabled="true">
     <parameters><parameter 
name="regex">JavaConversions</parameter></parameters>

http://git-wip-us.apache.org/repos/asf/spark/blob/b8fa79ce/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
index 845f554..1e5f187 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
@@ -189,7 +189,9 @@ private[streaming] class FileBasedWriteAheadLog(
           val f = Future { deleteFile(logInfo) }(executionContext)
           if (waitForCompletion) {
             import scala.concurrent.duration._
+            // scalastyle:off awaitready
             Await.ready(f, 1 second)
+            // scalastyle:on awaitready
           }
         } catch {
           case e: RejectedExecutionException =>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to