Repository: spark
Updated Branches:
  refs/heads/master 1d5d2a9d0 -> c86a57f4d


[SPARK-19437] Rectify spark executor id in HeartbeatReceiverSuite.

## What changes were proposed in this pull request?

In `HeartbeatReceiverSuite`, the executor IDs are currently set as below:
```
  private val executorId1 = "executor-1"
  private val executorId2 = "executor-2"
```

The executorId is sent to the driver when the executor registers, as below:

```
test("expire dead hosts should kill executors with replacement (SPARK-8119)")  {
  ...
  fakeSchedulerBackend.driverEndpoint.askSync[Boolean](
      RegisterExecutor(executorId1, dummyExecutorEndpointRef1, "1.2.3.4", 0, 
Map.empty))
  ...
}
```

When `CoarseGrainedSchedulerBackend` receives `RegisterExecutor`, the executorId 
is compared with `currentExecutorIdCounter` as below:
```
case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls)  =>
  if (executorDataMap.contains(executorId)) {
    executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + 
executorId))
    context.reply(true)
  } else {
  ...
  executorDataMap.put(executorId, data)
  if (currentExecutorIdCounter < executorId.toInt) {
    currentExecutorIdCounter = executorId.toInt
  }
  ...
```

`executorId.toInt` will throw a `NumberFormatException`, because an ID such as 
"executor-1" is not a valid integer.

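This unit test currently passes only because of `askWithRetry`: when the 
exception is caught, the RPC is sent again. By then the executorId has already 
been put into `executorDataMap`, so the retry takes the `if` branch (duplicate 
executor ID) and returns true.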

**To fix**
Change the executor IDs to numeric strings and replace `askWithRetry` with 
`askSync`; see https://github.com/apache/spark/pull/16690

## How was this patch tested?
This fix only touches a unit test, so there is no need to add another one. (If this patch 
involves UI changes, please attach a screenshot; otherwise, remove this)

Author: jinxing <jinx...@meituan.com>

Closes #16779 from jinxing64/SPARK-19437.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c86a57f4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c86a57f4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c86a57f4

Branch: refs/heads/master
Commit: c86a57f4d1a39ab9602733a09d8fec13506cc6d4
Parents: 1d5d2a9
Author: jinxing <jinx...@meituan.com>
Authored: Thu Feb 2 23:18:16 2017 -0800
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Thu Feb 2 23:18:16 2017 -0800

----------------------------------------------------------------------
 .../apache/spark/HeartbeatReceiverSuite.scala   | 26 ++++++++++----------
 1 file changed, 13 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c86a57f4/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala 
b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
index 7b6a231..8891648 100644
--- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
@@ -46,8 +46,8 @@ class HeartbeatReceiverSuite
   with PrivateMethodTester
   with LocalSparkContext {
 
-  private val executorId1 = "executor-1"
-  private val executorId2 = "executor-2"
+  private val executorId1 = "1"
+  private val executorId2 = "2"
 
   // Shared state that must be reset before and after each test
   private var scheduler: TaskSchedulerImpl = null
@@ -93,12 +93,12 @@ class HeartbeatReceiverSuite
 
   test("task scheduler is set correctly") {
     assert(heartbeatReceiver.scheduler === null)
-    heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet)
+    heartbeatReceiverRef.askSync[Boolean](TaskSchedulerIsSet)
     assert(heartbeatReceiver.scheduler !== null)
   }
 
   test("normal heartbeat") {
-    heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet)
+    heartbeatReceiverRef.askSync[Boolean](TaskSchedulerIsSet)
     addExecutorAndVerify(executorId1)
     addExecutorAndVerify(executorId2)
     triggerHeartbeat(executorId1, executorShouldReregister = false)
@@ -116,14 +116,14 @@ class HeartbeatReceiverSuite
   }
 
   test("reregister if heartbeat from unregistered executor") {
-    heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet)
+    heartbeatReceiverRef.askSync[Boolean](TaskSchedulerIsSet)
     // Received heartbeat from unknown executor, so we ask it to re-register
     triggerHeartbeat(executorId1, executorShouldReregister = true)
     assert(getTrackedExecutors.isEmpty)
   }
 
   test("reregister if heartbeat from removed executor") {
-    heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet)
+    heartbeatReceiverRef.askSync[Boolean](TaskSchedulerIsSet)
     addExecutorAndVerify(executorId1)
     addExecutorAndVerify(executorId2)
     // Remove the second executor but not the first
@@ -140,7 +140,7 @@ class HeartbeatReceiverSuite
 
   test("expire dead hosts") {
     val executorTimeout = heartbeatReceiver.invokePrivate(_executorTimeoutMs())
-    heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet)
+    heartbeatReceiverRef.askSync[Boolean](TaskSchedulerIsSet)
     addExecutorAndVerify(executorId1)
     addExecutorAndVerify(executorId2)
     triggerHeartbeat(executorId1, executorShouldReregister = false)
@@ -149,7 +149,7 @@ class HeartbeatReceiverSuite
     heartbeatReceiverClock.advance(executorTimeout / 2)
     triggerHeartbeat(executorId1, executorShouldReregister = false)
     heartbeatReceiverClock.advance(executorTimeout)
-    heartbeatReceiverRef.askWithRetry[Boolean](ExpireDeadHosts)
+    heartbeatReceiverRef.askSync[Boolean](ExpireDeadHosts)
     // Only the second executor should be expired as a dead host
     verify(scheduler).executorLost(Matchers.eq(executorId2), any())
     val trackedExecutors = getTrackedExecutors
@@ -173,11 +173,11 @@ class HeartbeatReceiverSuite
     val dummyExecutorEndpoint2 = new FakeExecutorEndpoint(rpcEnv)
     val dummyExecutorEndpointRef1 = rpcEnv.setupEndpoint("fake-executor-1", 
dummyExecutorEndpoint1)
     val dummyExecutorEndpointRef2 = rpcEnv.setupEndpoint("fake-executor-2", 
dummyExecutorEndpoint2)
-    fakeSchedulerBackend.driverEndpoint.askWithRetry[Boolean](
+    fakeSchedulerBackend.driverEndpoint.askSync[Boolean](
       RegisterExecutor(executorId1, dummyExecutorEndpointRef1, "1.2.3.4", 0, 
Map.empty))
-    fakeSchedulerBackend.driverEndpoint.askWithRetry[Boolean](
+    fakeSchedulerBackend.driverEndpoint.askSync[Boolean](
       RegisterExecutor(executorId2, dummyExecutorEndpointRef2, "1.2.3.5", 0, 
Map.empty))
-    heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet)
+    heartbeatReceiverRef.askSync[Boolean](TaskSchedulerIsSet)
     addExecutorAndVerify(executorId1)
     addExecutorAndVerify(executorId2)
     triggerHeartbeat(executorId1, executorShouldReregister = false)
@@ -195,7 +195,7 @@ class HeartbeatReceiverSuite
     // Here we use a timeout of O(seconds), but in practice this whole test 
takes O(10ms).
     val executorTimeout = heartbeatReceiver.invokePrivate(_executorTimeoutMs())
     heartbeatReceiverClock.advance(executorTimeout * 2)
-    heartbeatReceiverRef.askWithRetry[Boolean](ExpireDeadHosts)
+    heartbeatReceiverRef.askSync[Boolean](ExpireDeadHosts)
     val killThread = heartbeatReceiver.invokePrivate(_killExecutorThread())
     killThread.shutdown() // needed for awaitTermination
     killThread.awaitTermination(10L, TimeUnit.SECONDS)
@@ -213,7 +213,7 @@ class HeartbeatReceiverSuite
       executorShouldReregister: Boolean): Unit = {
     val metrics = TaskMetrics.empty
     val blockManagerId = BlockManagerId(executorId, "localhost", 12345)
-    val response = heartbeatReceiverRef.askWithRetry[HeartbeatResponse](
+    val response = heartbeatReceiverRef.askSync[HeartbeatResponse](
       Heartbeat(executorId, Array(1L -> metrics.accumulators()), 
blockManagerId))
     if (executorShouldReregister) {
       assert(response.reregisterBlockManager)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to