[GitHub] spark pull request #18092: [SPARK-20640][CORE]Make rpc timeout and retry for...

2017-05-25 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/18092#discussion_r118547246
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -170,11 +170,17 @@ private[spark] class BlockManager(
   // service, or just our own Executor's BlockManager.
   private[spark] var shuffleServerId: BlockManagerId = _
 
+  private val registrationTimeout =
+conf.getTimeAsMs("spark.shuffle.registration.timeout", "5s")
--- End diff --

For new configurations, should we be putting these into the `config` 
package object? See 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/internal/config/package.scala
 and https://github.com/apache/spark/pull/10205
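As a sketch of the pattern being suggested (the key and default mirror the ones under discussion in this PR; the accessor line and placement are illustrative, not the author's actual change):

```scala
// Sketch: declare the setting once in the internal config package object
// (org/apache/spark/internal/config/package.scala). The key and default
// mirror this PR's discussion; exact wording is up to the author.
private[spark] val SHUFFLE_REGISTRATION_TIMEOUT =
  ConfigBuilder("spark.shuffle.registration.timeout")
    .timeConf(TimeUnit.MILLISECONDS)
    .createWithDefault(5000)

// BlockManager can then read the typed entry instead of a raw string key:
private val registrationTimeout = conf.get(SHUFFLE_REGISTRATION_TIMEOUT)
```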


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18092: [SPARK-20640][CORE]Make rpc timeout and retry for...

2017-05-26 Thread liyichao
Github user liyichao commented on a diff in the pull request:

https://github.com/apache/spark/pull/18092#discussion_r118671187
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -170,11 +170,17 @@ private[spark] class BlockManager(
   // service, or just our own Executor's BlockManager.
   private[spark] var shuffleServerId: BlockManagerId = _
 
+  private val registrationTimeout =
+conf.getTimeAsMs("spark.shuffle.registration.timeout", "5s")
--- End diff --

Updated, and added a unit test.





[GitHub] spark pull request #18092: [SPARK-20640][CORE]Make rpc timeout and retry for...

2017-06-10 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/18092#discussion_r121269084
  
--- Diff: 
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---
@@ -1281,6 +1285,57 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
 assert(master.getLocations("item").isEmpty)
   }
 
+  test("SPARK-20640: Shuffle registration timeout and maxAttempts conf are 
working") {
+val shufflePort = 1
--- End diff --

I'm afraid that this may lead to flakiness in Jenkins: we run multiple 
concurrent builds on the machine and they aren't containerized, so hardcoding 
ports in unit tests risks port conflicts (especially when several jobs kick off 
at about the same time; this actually _is_ an issue in practice).

If you need to know the port that it binds to then I would recommend using 
`Utils.startServiceOnPort` (see examples of this in existing tests elsewhere in 
the codebase, such as in the Kafka module).
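For illustration, here is a self-contained sketch of the bind-with-retry idea behind `Utils.startServiceOnPort`, using a plain `java.net.ServerSocket` rather than any Spark class (Spark's real helper additionally handles port 0 and logs each retry; the `startOnPort` name and retry count here are made up for the example):

```scala
import java.net.{BindException, ServerSocket}

// Try the candidate port and, on a BindException, move to the next
// port rather than failing outright. This mimics the behavior of
// Spark's Utils.startServiceOnPort in miniature.
def startOnPort(candidate: Int, maxRetries: Int = 16): ServerSocket = {
  for (attempt <- 0 to maxRetries) {
    val port = candidate + attempt
    try {
      return new ServerSocket(port) // success: the service owns this port
    } catch {
      case _: BindException => // port already taken, try the next one
    }
  }
  throw new BindException(s"could not bind after $maxRetries retries")
}

// Occupy a port, then confirm the helper binds to a different one
// instead of failing with a conflict.
val occupied = new ServerSocket(0) // port 0 = let the OS pick a free port
val server = startOnPort(occupied.getLocalPort)
assert(server.getLocalPort != occupied.getLocalPort)
server.close()
occupied.close()
```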





[GitHub] spark pull request #18092: [SPARK-20640][CORE]Make rpc timeout and retry for...

2017-06-10 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/18092#discussion_r121269114
  
--- Diff: 
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---
@@ -1281,6 +1285,57 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
 assert(master.getLocations("item").isEmpty)
   }
 
+  test("SPARK-20640: Shuffle registration timeout and maxAttempts conf are 
working") {
+val shufflePort = 1
+val tryAgainMsg = "test_spark_20640_try_again"
+conf.set("spark.shuffle.service.enabled", "true")
+conf.set("spark.shuffle.service.port", shufflePort.toString)
+// a server which delays response 50ms and must try twice for success.
+def newShuffleServer(): TransportServer = {
+  val attempts = new mutable.HashMap[String, Int]()
+  val handler = new NoOpRpcHandler {
+override def receive(client: TransportClient, message: ByteBuffer,
+ callback: RpcResponseCallback): Unit = {
+  val msgObj = BlockTransferMessage.Decoder.fromByteBuffer(message)
+  msgObj match {
+case exec: RegisterExecutor =>
+  Thread.sleep(50)
--- End diff --

Do we actually need this sleep? What if we simply never returned any 
response when `attempt < 2`?





[GitHub] spark pull request #18092: [SPARK-20640][CORE]Make rpc timeout and retry for...

2017-06-10 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/18092#discussion_r121269120
  
--- Diff: 
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---
@@ -1281,6 +1285,57 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
 assert(master.getLocations("item").isEmpty)
   }
 
+  test("SPARK-20640: Shuffle registration timeout and maxAttempts conf are 
working") {
+val shufflePort = 1
+val tryAgainMsg = "test_spark_20640_try_again"
+conf.set("spark.shuffle.service.enabled", "true")
+conf.set("spark.shuffle.service.port", shufflePort.toString)
+// a server which delays response 50ms and must try twice for success.
+def newShuffleServer(): TransportServer = {
+  val attempts = new mutable.HashMap[String, Int]()
+  val handler = new NoOpRpcHandler {
+override def receive(client: TransportClient, message: ByteBuffer,
+ callback: RpcResponseCallback): Unit = {
+  val msgObj = BlockTransferMessage.Decoder.fromByteBuffer(message)
+  msgObj match {
+case exec: RegisterExecutor =>
+  Thread.sleep(50)
+  val attempt = attempts.getOrElse(exec.execId, 0) + 1
+  attempts(exec.execId) = attempt
+  if (attempt < 2) {
+callback.onFailure(new Exception(tryAgainMsg))
+return
+  }
+  callback.onSuccess(ByteBuffer.wrap(new Array[Byte](0)))
+  }
+}
+  }
+
+  val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", 
numUsableCores = 0)
+  val transCtx = new TransportContext(transConf, handler, true)
+  transCtx.createServer(shufflePort, 
Nil.asInstanceOf[Seq[TransportServerBootstrap]].asJava)
--- End diff --

Nit: you can just write `Seq.empty[TransportServerBootstrap].asJava`
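For reference, a quick sketch showing the two forms are interchangeable (plain Scala collections, no Spark classes involved):

```scala
import scala.collection.JavaConverters._

// Both expressions yield an equivalent empty java.util.List; the
// Seq.empty form simply avoids the cast through Nil.
val viaCast  = Nil.asInstanceOf[Seq[String]].asJava
val viaEmpty = Seq.empty[String].asJava
assert(viaCast.isEmpty && viaEmpty.isEmpty)
```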





[GitHub] spark pull request #18092: [SPARK-20640][CORE]Make rpc timeout and retry for...

2017-06-10 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/18092#discussion_r121269157
  
--- Diff: 
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---
@@ -1281,6 +1285,57 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
 assert(master.getLocations("item").isEmpty)
   }
 
+  test("SPARK-20640: Shuffle registration timeout and maxAttempts conf are 
working") {
+val shufflePort = 1
+val tryAgainMsg = "test_spark_20640_try_again"
+conf.set("spark.shuffle.service.enabled", "true")
+conf.set("spark.shuffle.service.port", shufflePort.toString)
+// a server which delays response 50ms and must try twice for success.
+def newShuffleServer(): TransportServer = {
+  val attempts = new mutable.HashMap[String, Int]()
+  val handler = new NoOpRpcHandler {
+override def receive(client: TransportClient, message: ByteBuffer,
+ callback: RpcResponseCallback): Unit = {
+  val msgObj = BlockTransferMessage.Decoder.fromByteBuffer(message)
+  msgObj match {
+case exec: RegisterExecutor =>
+  Thread.sleep(50)
+  val attempt = attempts.getOrElse(exec.execId, 0) + 1
+  attempts(exec.execId) = attempt
+  if (attempt < 2) {
+callback.onFailure(new Exception(tryAgainMsg))
+return
+  }
+  callback.onSuccess(ByteBuffer.wrap(new Array[Byte](0)))
+  }
+}
+  }
+
+  val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", 
numUsableCores = 0)
+  val transCtx = new TransportContext(transConf, handler, true)
+  transCtx.createServer(shufflePort, 
Nil.asInstanceOf[Seq[TransportServerBootstrap]].asJava)
+}
+newShuffleServer()
+
+conf.set(SHUFFLE_REGISTRATION_TIMEOUT.key, "40")
+conf.set(SHUFFLE_REGISTRATION_MAX_ATTEMPTS.key, "1")
+var e = intercept[SparkException]{
+  makeBlockManager(8000, "executor1")
+}.getMessage
+assert(e.contains("TimeoutException"))
+
+conf.set(SHUFFLE_REGISTRATION_TIMEOUT.key, "1000")
+conf.set(SHUFFLE_REGISTRATION_MAX_ATTEMPTS.key, "1")
+e = intercept[SparkException]{
--- End diff --

Ahh, I see why you needed the `sleep()` above: so we can actually return an 
error in the non-timeout case.





[GitHub] spark pull request #18092: [SPARK-20640][CORE]Make rpc timeout and retry for...

2017-06-10 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/18092#discussion_r121269201
  
--- Diff: 
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---
@@ -1281,6 +1285,57 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
 assert(master.getLocations("item").isEmpty)
   }
 
+  test("SPARK-20640: Shuffle registration timeout and maxAttempts conf are 
working") {
+val shufflePort = 1
+val tryAgainMsg = "test_spark_20640_try_again"
+conf.set("spark.shuffle.service.enabled", "true")
+conf.set("spark.shuffle.service.port", shufflePort.toString)
+// a server which delays response 50ms and must try twice for success.
+def newShuffleServer(): TransportServer = {
+  val attempts = new mutable.HashMap[String, Int]()
+  val handler = new NoOpRpcHandler {
+override def receive(client: TransportClient, message: ByteBuffer,
+ callback: RpcResponseCallback): Unit = {
+  val msgObj = BlockTransferMessage.Decoder.fromByteBuffer(message)
+  msgObj match {
+case exec: RegisterExecutor =>
+  Thread.sleep(50)
+  val attempt = attempts.getOrElse(exec.execId, 0) + 1
+  attempts(exec.execId) = attempt
+  if (attempt < 2) {
+callback.onFailure(new Exception(tryAgainMsg))
+return
+  }
+  callback.onSuccess(ByteBuffer.wrap(new Array[Byte](0)))
+  }
+}
+  }
+
+  val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", 
numUsableCores = 0)
+  val transCtx = new TransportContext(transConf, handler, true)
+  transCtx.createServer(shufflePort, 
Nil.asInstanceOf[Seq[TransportServerBootstrap]].asJava)
+}
+newShuffleServer()
--- End diff --

Do we need to shut down the server created here or otherwise perform any 
cleanup after the test finishes?





[GitHub] spark pull request #18092: [SPARK-20640][CORE]Make rpc timeout and retry for...

2017-06-10 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/18092#discussion_r121269224
  
--- Diff: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
 ---
@@ -60,10 +61,19 @@
   public ExternalShuffleClient(
   TransportConf conf,
   SecretKeyHolder secretKeyHolder,
-  boolean authEnabled) {
+  boolean authEnabled,
+  long registrationTimeoutMilli) {
 this.conf = conf;
 this.secretKeyHolder = secretKeyHolder;
 this.authEnabled = authEnabled;
+this.registrationTimeoutMilli = registrationTimeoutMilli;
+  }
+
+  public ExternalShuffleClient(
--- End diff --

Do we actually need this backwards-compatible constructor? AFAIK this 
interface is internal only and shouldn't be used from outside of Spark.





[GitHub] spark pull request #18092: [SPARK-20640][CORE]Make rpc timeout and retry for...

2017-06-11 Thread liyichao
Github user liyichao commented on a diff in the pull request:

https://github.com/apache/spark/pull/18092#discussion_r121281612
  
--- Diff: 
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---
@@ -1281,6 +1285,57 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
 assert(master.getLocations("item").isEmpty)
   }
 
+  test("SPARK-20640: Shuffle registration timeout and maxAttempts conf are 
working") {
+val shufflePort = 1
+val tryAgainMsg = "test_spark_20640_try_again"
+conf.set("spark.shuffle.service.enabled", "true")
+conf.set("spark.shuffle.service.port", shufflePort.toString)
+// a server which delays response 50ms and must try twice for success.
+def newShuffleServer(): TransportServer = {
+  val attempts = new mutable.HashMap[String, Int]()
+  val handler = new NoOpRpcHandler {
+override def receive(client: TransportClient, message: ByteBuffer,
+ callback: RpcResponseCallback): Unit = {
+  val msgObj = BlockTransferMessage.Decoder.fromByteBuffer(message)
+  msgObj match {
+case exec: RegisterExecutor =>
+  Thread.sleep(50)
+  val attempt = attempts.getOrElse(exec.execId, 0) + 1
+  attempts(exec.execId) = attempt
+  if (attempt < 2) {
+callback.onFailure(new Exception(tryAgainMsg))
+return
+  }
+  callback.onSuccess(ByteBuffer.wrap(new Array[Byte](0)))
+  }
+}
+  }
+
+  val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", 
numUsableCores = 0)
+  val transCtx = new TransportContext(transConf, handler, true)
+  transCtx.createServer(shufflePort, 
Nil.asInstanceOf[Seq[TransportServerBootstrap]].asJava)
+}
+newShuffleServer()
+
+conf.set(SHUFFLE_REGISTRATION_TIMEOUT.key, "40")
+conf.set(SHUFFLE_REGISTRATION_MAX_ATTEMPTS.key, "1")
+var e = intercept[SparkException]{
+  makeBlockManager(8000, "executor1")
+}.getMessage
+assert(e.contains("TimeoutException"))
+
+conf.set(SHUFFLE_REGISTRATION_TIMEOUT.key, "1000")
+conf.set(SHUFFLE_REGISTRATION_MAX_ATTEMPTS.key, "1")
+e = intercept[SparkException]{
--- End diff --

Hi, what do you suggest? When `attempt < 2`, we already return the error 
`tryAgainMsg`. The request must fail if the specified time has not yet passed, 
and succeed otherwise, so there seems to be no way to arrange that other than 
`sleep`.





[GitHub] spark pull request #18092: [SPARK-20640][CORE]Make rpc timeout and retry for...

2017-06-15 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/18092#discussion_r122238555
  
--- Diff: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java
 ---
@@ -61,7 +61,7 @@ public MesosExternalShuffleClient(
   TransportConf conf,
   SecretKeyHolder secretKeyHolder,
   boolean authEnabled) {
-super(conf, secretKeyHolder, authEnabled);
+super(conf, secretKeyHolder, authEnabled, 5000);
--- End diff --

Let's put this magic number into a config value.





[GitHub] spark pull request #18092: [SPARK-20640][CORE]Make rpc timeout and retry for...

2017-06-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18092#discussion_r122617448
  
--- Diff: 
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---
@@ -1281,6 +1286,59 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
 assert(master.getLocations("item").isEmpty)
   }
 
+  test("SPARK-20640: Shuffle registration timeout and maxAttempts conf are 
working") {
+val tryAgainMsg = "test_spark_20640_try_again"
+// a server which delays response 50ms and must try twice for success.
+def newShuffleServer(port: Int): (TransportServer, Int) = {
+  val attempts = new mutable.HashMap[String, Int]()
+  val handler = new NoOpRpcHandler {
+override def receive(client: TransportClient, message: ByteBuffer,
--- End diff --

nit:
```
def xxx(
    para1: XXX,
    para2: XXX): XXX
```





[GitHub] spark pull request #18092: [SPARK-20640][CORE]Make rpc timeout and retry for...

2017-06-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18092#discussion_r122617592
  
--- Diff: 
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---
@@ -1281,6 +1286,59 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
 assert(master.getLocations("item").isEmpty)
   }
 
+  test("SPARK-20640: Shuffle registration timeout and maxAttempts conf are 
working") {
+val tryAgainMsg = "test_spark_20640_try_again"
+// a server which delays response 50ms and must try twice for success.
+def newShuffleServer(port: Int): (TransportServer, Int) = {
+  val attempts = new mutable.HashMap[String, Int]()
+  val handler = new NoOpRpcHandler {
+override def receive(client: TransportClient, message: ByteBuffer,
+ callback: RpcResponseCallback): Unit = {
+  val msgObj = BlockTransferMessage.Decoder.fromByteBuffer(message)
+  msgObj match {
+case exec: RegisterExecutor =>
+  Thread.sleep(50)
+  val attempt = attempts.getOrElse(exec.execId, 0) + 1
+  attempts(exec.execId) = attempt
+  if (attempt < 2) {
+callback.onFailure(new Exception(tryAgainMsg))
+return
+  }
+  callback.onSuccess(ByteBuffer.wrap(new Array[Byte](0)))
+  }
+}
+  }
+
+  val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", 
numUsableCores = 0)
+  val transCtx = new TransportContext(transConf, handler, true)
+  (transCtx.createServer(port, 
Seq.empty[TransportServerBootstrap].asJava), port)
+}
+val candidatePort = RandomUtils.nextInt(1024, 65536)
+val (server, shufflePort) = Utils.startServiceOnPort(candidatePort,
--- End diff --

Will this be flaky, e.g. if the port is occupied by other test suites?





[GitHub] spark pull request #18092: [SPARK-20640][CORE]Make rpc timeout and retry for...

2017-06-18 Thread liyichao
Github user liyichao commented on a diff in the pull request:

https://github.com/apache/spark/pull/18092#discussion_r122620196
  
--- Diff: 
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---
@@ -1281,6 +1286,59 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
 assert(master.getLocations("item").isEmpty)
   }
 
+  test("SPARK-20640: Shuffle registration timeout and maxAttempts conf are 
working") {
+val tryAgainMsg = "test_spark_20640_try_again"
+// a server which delays response 50ms and must try twice for success.
+def newShuffleServer(port: Int): (TransportServer, Int) = {
+  val attempts = new mutable.HashMap[String, Int]()
+  val handler = new NoOpRpcHandler {
+override def receive(client: TransportClient, message: ByteBuffer,
+ callback: RpcResponseCallback): Unit = {
+  val msgObj = BlockTransferMessage.Decoder.fromByteBuffer(message)
+  msgObj match {
+case exec: RegisterExecutor =>
+  Thread.sleep(50)
+  val attempt = attempts.getOrElse(exec.execId, 0) + 1
+  attempts(exec.execId) = attempt
+  if (attempt < 2) {
+callback.onFailure(new Exception(tryAgainMsg))
+return
+  }
+  callback.onSuccess(ByteBuffer.wrap(new Array[Byte](0)))
+  }
+}
+  }
+
+  val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", 
numUsableCores = 0)
+  val transCtx = new TransportContext(transConf, handler, true)
+  (transCtx.createServer(port, 
Seq.empty[TransportServerBootstrap].asJava), port)
+}
+val candidatePort = RandomUtils.nextInt(1024, 65536)
+val (server, shufflePort) = Utils.startServiceOnPort(candidatePort,
--- End diff --

No, because `startServiceOnPort` handles the port-conflict case.





[GitHub] spark pull request #18092: [SPARK-20640][CORE]Make rpc timeout and retry for...

2017-06-18 Thread liyichao
Github user liyichao commented on a diff in the pull request:

https://github.com/apache/spark/pull/18092#discussion_r122620600
  
--- Diff: 
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---
@@ -1281,6 +1286,59 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
 assert(master.getLocations("item").isEmpty)
   }
 
+  test("SPARK-20640: Shuffle registration timeout and maxAttempts conf are 
working") {
+val tryAgainMsg = "test_spark_20640_try_again"
+// a server which delays response 50ms and must try twice for success.
+def newShuffleServer(port: Int): (TransportServer, Int) = {
+  val attempts = new mutable.HashMap[String, Int]()
+  val handler = new NoOpRpcHandler {
+override def receive(client: TransportClient, message: ByteBuffer,
--- End diff --

Updated. By the way, I am a little confused.

First, when I insert a line break before the parameters, IntelliJ auto-indents like this:

```
override def receive(
  client: TransportClient
```

Second, in the same file, near line 1349, `fetchBlocks` is indented like this:

```
override def fetchBlocks(
host: String,
port: Int,
execId: String,
blockIds: Array[String],
listener: BlockFetchingListener,
shuffleFiles: Array[File]): Unit = {
```





[GitHub] spark pull request #18092: [SPARK-20640][CORE]Make rpc timeout and retry for...

2017-06-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18092#discussion_r122620995
  
--- Diff: 
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---
@@ -1281,6 +1286,61 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
 assert(master.getLocations("item").isEmpty)
   }
 
+  test("SPARK-20640: Shuffle registration timeout and maxAttempts conf are 
working") {
+val tryAgainMsg = "test_spark_20640_try_again"
+// a server which delays response 50ms and must try twice for success.
+def newShuffleServer(port: Int): (TransportServer, Int) = {
+  val attempts = new mutable.HashMap[String, Int]()
+  val handler = new NoOpRpcHandler {
+override def receive(
+ client: TransportClient,
--- End diff --

I mean 4 spaces indention, not to align with `def`...





[GitHub] spark pull request #18092: [SPARK-20640][CORE]Make rpc timeout and retry for...

2017-06-18 Thread liyichao
Github user liyichao commented on a diff in the pull request:

https://github.com/apache/spark/pull/18092#discussion_r122621671
  
--- Diff: 
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---
@@ -1281,6 +1286,61 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
 assert(master.getLocations("item").isEmpty)
   }
 
+  test("SPARK-20640: Shuffle registration timeout and maxAttempts conf are 
working") {
+val tryAgainMsg = "test_spark_20640_try_again"
+// a server which delays response 50ms and must try twice for success.
+def newShuffleServer(port: Int): (TransportServer, Int) = {
+  val attempts = new mutable.HashMap[String, Int]()
+  val handler = new NoOpRpcHandler {
+override def receive(
+ client: TransportClient,
--- End diff --

Oh.





[GitHub] spark pull request #18092: [SPARK-20640][CORE]Make rpc timeout and retry for...

2017-06-19 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/18092#discussion_r122661511
  
--- Diff: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
 ---
@@ -49,6 +49,7 @@
   private final TransportConf conf;
   private final boolean authEnabled;
   private final SecretKeyHolder secretKeyHolder;
+  private final long registrationTimeoutMilli;
--- End diff --

"MS" or "Millis" is more consistent. Milli suggests something different. 
https://en.wikipedia.org/wiki/Milli_Vanilli





[GitHub] spark pull request #18092: [SPARK-20640][CORE]Make rpc timeout and retry for...

2017-06-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18092#discussion_r122918254
  
--- Diff: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
 ---
@@ -132,7 +135,7 @@ public void registerWithShuffleServer(
 checkInit();
 try (TransportClient client = 
clientFactory.createUnmanagedClient(host, port)) {
   ByteBuffer registerMessage = new RegisterExecutor(appId, execId, 
executorInfo).toByteBuffer();
-  client.sendRpcSync(registerMessage, 5000 /* timeoutMs */);
--- End diff --

Previously we used `Ms`; can we keep that instead of `Millis`?





[GitHub] spark pull request #18092: [SPARK-20640][CORE]Make rpc timeout and retry for...

2017-06-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18092#discussion_r122918471
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -303,6 +303,16 @@ package object config {
   .bytesConf(ByteUnit.BYTE)
   .createWithDefault(100 * 1024 * 1024)
 
+  private[spark] val SHUFFLE_REGISTRATION_TIMEOUT =
+ConfigBuilder("spark.shuffle.registration.timeout")
+  .timeConf(TimeUnit.MILLISECONDS)
+  .createWithDefault(5000)
+
+  private[spark] val SHUFFLE_REGISTRATION_MAX_ATTEMPTS =
--- End diff --

ditto





[GitHub] spark pull request #18092: [SPARK-20640][CORE]Make rpc timeout and retry for...

2017-06-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18092#discussion_r122918448
  
--- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -303,6 +303,16 @@ package object config {
   .bytesConf(ByteUnit.BYTE)
   .createWithDefault(100 * 1024 * 1024)
 
+  private[spark] val SHUFFLE_REGISTRATION_TIMEOUT =
+ConfigBuilder("spark.shuffle.registration.timeout")
--- End diff --

can you add `.doc("xxx")` to explain it?
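A sketch of what adding `.doc(...)` to these entries might look like. The doc wording mirrors the docs/configuration.md entries quoted elsewhere in this thread, and the body of `SHUFFLE_REGISTRATION_MAX_ATTEMPTS` (truncated in the diff above) is an assumption; treat both as illustrative rather than the final committed text:

```scala
// Illustrative sketch: the two new config entries with .doc() strings
// attached, as the reviewer requests. The maxAttempts builder body is
// assumed, not taken from the diff.
private[spark] val SHUFFLE_REGISTRATION_TIMEOUT =
  ConfigBuilder("spark.shuffle.registration.timeout")
    .doc("Timeout in milliseconds for registration to the external shuffle service.")
    .timeConf(TimeUnit.MILLISECONDS)
    .createWithDefault(5000)

private[spark] val SHUFFLE_REGISTRATION_MAX_ATTEMPTS =
  ConfigBuilder("spark.shuffle.registration.maxAttempts")
    .doc("When we fail to register to the external shuffle service, we will " +
      "retry for maxAttempts times.")
    .intConf
    .createWithDefault(3)
```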





[GitHub] spark pull request #18092: [SPARK-20640][CORE]Make rpc timeout and retry for...

2017-06-20 Thread sitalkedia
Github user sitalkedia commented on a diff in the pull request:

https://github.com/apache/spark/pull/18092#discussion_r123040340
  
--- Diff: docs/configuration.md ---
@@ -639,6 +639,20 @@ Apart from these, the following properties are also available, and may be useful
   
 
 
+<tr>
+  <td><code>spark.shuffle.registration.timeout</code></td>
+  <td>5000</td>
+  <td>
+    Timeout in milliseconds for registration to the external service.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.shuffle.registration.maxAttempts</code></td>
+  <td>3</td>
+  <td>
+    When we fail to register to the external service, we will retry for maxAttempts times.
--- End diff --

nit:  When we fail to register to the external shuffle service, we will 
retry for maxAttempts times.





[GitHub] spark pull request #18092: [SPARK-20640][CORE]Make rpc timeout and retry for...

2017-06-20 Thread sitalkedia
Github user sitalkedia commented on a diff in the pull request:

https://github.com/apache/spark/pull/18092#discussion_r123040230
  
--- Diff: docs/configuration.md ---
@@ -639,6 +639,20 @@ Apart from these, the following properties are also available, and may be useful
   
 
 
+<tr>
+  <td><code>spark.shuffle.registration.timeout</code></td>
+  <td>5000</td>
+  <td>
+    Timeout in milliseconds for registration to the external service.
--- End diff --

nit: Timeout in milliseconds for registration to the external shuffle 
service.





[GitHub] spark pull request #18092: [SPARK-20640][CORE]Make rpc timeout and retry for...

2017-06-21 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/18092

