Repository: spark
Updated Branches:
  refs/heads/branch-1.1 8c7957446 -> bd3ce2ffb


[SPARK-2677] BasicBlockFetchIterator#next can wait forever

Author: Kousuke Saruta <saru...@oss.nttdata.co.jp>

Closes #1632 from sarutak/SPARK-2677 and squashes the following commits:

cddbc7b [Kousuke Saruta] Removed Exception throwing when 
ConnectionManager#handleMessage receives ack for non-referenced message
d3bd2a8 [Kousuke Saruta] Modified configuration.md for 
spark.core.connection.ack.timeout
e85f88b [Kousuke Saruta] Removed useless synchronized blocks
7ed48be [Kousuke Saruta] Modified ConnectionManager to use ackTimeoutMonitor 
ConnectionManager-wide
9b620a6 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark 
into SPARK-2677
0dd9ad3 [Kousuke Saruta] Modified typo in ConnectionManagerSuite.scala
7cbb8ca [Kousuke Saruta] Modified to match with scalastyle
8a73974 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark 
into SPARK-2677
ade279a [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark 
into SPARK-2677
0174d6a [Kousuke Saruta] Modified ConnectionManager.scala to handle the case 
remote Executor cannot ack
a454239 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark 
into SPARK-2677
9b7b7c1 [Kousuke Saruta] (WIP) Modifying ConnectionManager.scala

(cherry picked from commit 76fa0eaf515fd6771cdd69422b1259485debcae5)
Signed-off-by: Josh Rosen <joshro...@apache.org>
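
The hang described in SPARK-2677 comes from waiting on a future whose backing
promise is never completed. A minimal illustration of that failure mode (plain
Scala, not Spark code; the Promise and its Int element type are made up):

    import scala.concurrent.{Await, Promise}
    import scala.concurrent.duration._

    // A future backed by a promise that is never completed: waiting on it with
    // no timeout blocks forever, which is effectively what
    // BasicBlockFetchIterator#next did when the remote executor never acked.
    val never = Promise[Int]()
    // Await.result(never.future, Duration.Inf)  // would block forever
    Await.result(never.future, 5.seconds)        // throws TimeoutException after 5 seconds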


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bd3ce2ff
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bd3ce2ff
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bd3ce2ff

Branch: refs/heads/branch-1.1
Commit: bd3ce2ffb8964abb4d59918ebb2c230fe4614aa2
Parents: 8c79574
Author: Kousuke Saruta <saru...@oss.nttdata.co.jp>
Authored: Sat Aug 16 14:15:58 2014 -0700
Committer: Josh Rosen <joshro...@apache.org>
Committed: Sat Aug 16 14:16:15 2014 -0700

----------------------------------------------------------------------
 .../spark/network/ConnectionManager.scala       | 45 +++++++++++++++-----
 .../spark/network/ConnectionManagerSuite.scala  | 44 ++++++++++++++++++-
 docs/configuration.md                           |  9 ++++
 3 files changed, 87 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bd3ce2ff/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
index 95f96b8..37d69a9 100644
--- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
@@ -22,6 +22,7 @@ import java.nio._
 import java.nio.channels._
 import java.nio.channels.spi._
 import java.net._
+import java.util.{Timer, TimerTask}
 import java.util.concurrent.atomic.AtomicInteger
 
 import java.util.concurrent.{LinkedBlockingDeque, TimeUnit, ThreadPoolExecutor}
@@ -61,17 +62,17 @@ private[spark] class ConnectionManager(
     var ackMessage: Option[Message] = None
 
     def markDone(ackMessage: Option[Message]) {
-      this.synchronized {
-        this.ackMessage = ackMessage
-        completionHandler(this)
-      }
+      this.ackMessage = ackMessage
+      completionHandler(this)
     }
   }
 
   private val selector = SelectorProvider.provider.openSelector()
+  private val ackTimeoutMonitor = new Timer("AckTimeoutMonitor", true)
 
   // default to 30 second timeout waiting for authentication
   private val authTimeout = conf.getInt("spark.core.connection.auth.wait.timeout", 30)
+  private val ackTimeout = conf.getInt("spark.core.connection.ack.wait.timeout", 60)
 
   private val handleMessageExecutor = new ThreadPoolExecutor(
     conf.getInt("spark.core.connection.handler.threads.min", 20),
@@ -652,19 +653,27 @@ private[spark] class ConnectionManager(
           }
         }
         if (bufferMessage.hasAckId()) {
-          val sentMessageStatus = messageStatuses.synchronized {
+          messageStatuses.synchronized {
             messageStatuses.get(bufferMessage.ackId) match {
               case Some(status) => {
                 messageStatuses -= bufferMessage.ackId
-                status
+                status.markDone(Some(message))
               }
               case None => {
-                throw new Exception("Could not find reference for received ack message " +
-                  message.id)
+                /**
+                 * We can reach this code in two cases:
+                 *
+                 * (1) An invalid ack was sent due to buggy code.
+                 *
+                 * (2) An ack arrived after its SendMessageStatus timed out.
+                 *     To avoid such late-arriving acks caused by long pauses
+                 *     (e.g. GC), set spark.core.connection.ack.wait.timeout
+                 *     to a value larger than the default.
+                 */
+                logWarning(s"Could not find reference for received ack message ${message.id}")
               }
             }
           }
-          sentMessageStatus.markDone(Some(message))
         } else {
           var ackMessage : Option[Message] = None
           try {
@@ -836,9 +845,23 @@ private[spark] class ConnectionManager(
   def sendMessageReliably(connectionManagerId: ConnectionManagerId, message: Message)
       : Future[Message] = {
     val promise = Promise[Message]()
+
+    val timeoutTask = new TimerTask {
+      override def run(): Unit = {
+        messageStatuses.synchronized {
+          messageStatuses.remove(message.id).foreach ( s => {
+            promise.failure(
+              new IOException(s"sendMessageReliably failed because ack " +
+                "was not received within ${ackTimeout} sec"))
+          })
+        }
+      }
+    }
+
     val status = new MessageStatus(message, connectionManagerId, s => {
+      timeoutTask.cancel()
       s.ackMessage match {
-        case None =>  // Indicates a failure where we either never sent or never got ACK'd
+        case None => // Indicates a failure where we either never sent or never got ACK'd
           promise.failure(new IOException("sendMessageReliably failed without being ACK'd"))
         case Some(ackMessage) =>
           if (ackMessage.hasError) {
@@ -852,6 +875,8 @@ private[spark] class ConnectionManager(
     messageStatuses.synchronized {
       messageStatuses += ((message.id, status))
     }
+
+    ackTimeoutMonitor.schedule(timeoutTask, ackTimeout * 1000)
     sendMessage(connectionManagerId, message)
     promise.future
   }
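
Taken together, the hunk above adds an ack timeout around the promise returned
by sendMessageReliably: register the MessageStatus, schedule a TimerTask on the
shared ackTimeoutMonitor, cancel the task when the ack arrives, and fail the
promise if the task fires first. A self-contained sketch of the same pattern
(the AckTracker class and all of its names are illustrative, not part of this
patch):

    import java.io.IOException
    import java.util.{Timer, TimerTask}
    import scala.collection.mutable
    import scala.concurrent.{Future, Promise}

    // Sketch of the timeout pattern used above (illustrative names, not Spark API).
    class AckTracker(ackTimeoutSecs: Int) {
      private val timer = new Timer("AckTimeoutMonitor", true)  // single daemon timer thread
      private val pending = mutable.HashMap[Int, (Promise[String], TimerTask)]()

      def send(messageId: Int): Future[String] = {
        val promise = Promise[String]()
        val timeoutTask = new TimerTask {
          override def run(): Unit = pending.synchronized {
            // No ack arrived in time: drop the entry and fail the future.
            pending.remove(messageId).foreach { case (p, _) =>
              p.failure(new IOException(s"no ack received within $ackTimeoutSecs sec"))
            }
          }
        }
        pending.synchronized { pending(messageId) = (promise, timeoutTask) }
        timer.schedule(timeoutTask, ackTimeoutSecs * 1000L)
        // ... the real code transmits the message over the wire here ...
        promise.future
      }

      def onAck(messageId: Int, ack: String): Unit = pending.synchronized {
        // Cancel the pending timeout and complete the future; late or unknown
        // acks fall through here and are only logged in the real code.
        pending.remove(messageId).foreach { case (p, task) =>
          task.cancel()
          p.success(ack)
        }
      }
    }

Using a single daemon Timer keeps the timeout bookkeeping off the
message-handling threads, at the cost of running all timeout callbacks
sequentially on that one timer thread.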

http://git-wip-us.apache.org/repos/asf/spark/blob/bd3ce2ff/core/src/test/scala/org/apache/spark/network/ConnectionManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/network/ConnectionManagerSuite.scala b/core/src/test/scala/org/apache/spark/network/ConnectionManagerSuite.scala
index 846537d..e2f4d4c 100644
--- a/core/src/test/scala/org/apache/spark/network/ConnectionManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/network/ConnectionManagerSuite.scala
@@ -19,14 +19,19 @@ package org.apache.spark.network
 
 import java.io.IOException
 import java.nio._
+import java.util.concurrent.TimeoutException
 
 import org.apache.spark.{SecurityManager, SparkConf}
 import org.scalatest.FunSuite
 
+import org.mockito.Mockito._
+import org.mockito.Matchers._
+
+import scala.concurrent.TimeoutException
 import scala.concurrent.{Await, TimeoutException}
 import scala.concurrent.duration._
 import scala.language.postfixOps
-import scala.util.Try
+import scala.util.{Failure, Success, Try}
 
 /**
   * Test the ConnectionManager with various security settings.
@@ -255,5 +260,42 @@ class ConnectionManagerSuite extends FunSuite {
 
   }
 
+  test("sendMessageReliably timeout") {
+    val clientConf = new SparkConf
+    clientConf.set("spark.authenticate", "false")
+    val ackTimeout = 30
+    clientConf.set("spark.core.connection.ack.wait.timeout", s"${ackTimeout}")
+
+    val clientSecurityManager = new SecurityManager(clientConf)
+    val manager = new ConnectionManager(0, clientConf, clientSecurityManager)
+
+    val serverConf = new SparkConf
+    serverConf.set("spark.authenticate", "false")
+    val serverSecurityManager = new SecurityManager(serverConf)
+    val managerServer = new ConnectionManager(0, serverConf, serverSecurityManager)
+    managerServer.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
+      // Sleep for much longer than the ack timeout to simulate a slow or hung server
+      Thread.sleep(ackTimeout * 3 * 1000)
+      None
+    })
+
+    val size = 10 * 1024 * 1024
+    val buffer = ByteBuffer.allocate(size).put(Array.tabulate[Byte](size)(x => x.toByte))
+    buffer.flip
+    val bufferMessage = Message.createBufferMessage(buffer.duplicate)
+
+    val future = manager.sendMessageReliably(managerServer.id, bufferMessage)
+
+    // The future should fail with an IOException within ackTimeout seconds.
+    // Otherwise, Await.result throws a TimeoutException instead, which would
+    // make the intercept[IOException] below fail the test.
+    intercept[IOException] {
+      Await.result(future, (ackTimeout * 2) second)
+    }
+
+    manager.stop()
+    managerServer.stop()
+  }
+
 }
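
The test drives the timeout path with a blocking Await. In non-test code the
same failure can be handled asynchronously; a sketch under the assumption of a
generic Future[String] reply (handleReply and the element type are
placeholders, not Spark's API):

    import java.io.IOException
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.Future
    import scala.util.{Failure, Success}

    // Before this change a lost ack left the future pending forever; now it
    // fails with an IOException after spark.core.connection.ack.wait.timeout.
    def handleReply(reply: Future[String]): Unit = {
      reply.onComplete {
        case Success(ack)            => println(s"got ack: $ack")
        case Failure(e: IOException) => println(s"ack timed out or send failed: ${e.getMessage}")
        case Failure(e)              => println(s"unexpected failure: $e")
      }
    }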
 

http://git-wip-us.apache.org/repos/asf/spark/blob/bd3ce2ff/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index c408c46..981170d 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -885,6 +885,15 @@ Apart from these, the following properties are also available, and may be useful
   </td>
 </tr>
 <tr>
+  <td><code>spark.core.connection.ack.wait.timeout</code></td>
+  <td>60</td>
+  <td>
+    Number of seconds to wait for an ack before the connection times out and
+    gives up. To avoid spurious timeouts caused by long pauses such as GC,
+    increase this value.
+  </td>
+</tr>
+<tr>
   <td><code>spark.ui.filters</code></td>
   <td>None</td>
   <td>
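
As a usage note, the new property can be raised on jobs whose executors see
long GC pauses. A sketch of setting it programmatically (the app name and the
120-second value are illustrative; it can equally go in spark-defaults.conf or
be passed with spark-submit --conf):

    import org.apache.spark.SparkConf

    // Give slow or GC-heavy executors more time to ack before
    // sendMessageReliably fails; the default is 60 seconds.
    val conf = new SparkConf()
      .setAppName("example-app")
      .set("spark.core.connection.ack.wait.timeout", "120")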

