MJ Deng created SPARK-48758:
-------------------------------

             Summary: Race condition between executor registration and heartbeat
                 Key: SPARK-48758
                 URL: https://issues.apache.org/jira/browse/SPARK-48758
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 3.4.3
            Reporter: MJ Deng


We found a race condition in our production jobs: an executor finishes
registration, but when it starts to heartbeat, the driver reports that the
executor is still unknown.

The test case below demonstrates the issue:

  test("test the executor registered states are consistent between scheduler and heartbeater") {
    val rpcEnv = sc.env.rpcEnv
    val fakeClusterManager = new FakeClusterManager(rpcEnv)
    val fakeClusterManagerRef = rpcEnv.setupEndpoint("fake-cm", fakeClusterManager)
    val listenerBus = new LiveListenerBus(sc.conf)
    val mockMetricsSystem = mock(classOf[MetricsSystem])
    listenerBus.start(sc, mockMetricsSystem)
    when(sc.listenerBus).thenReturn(listenerBus)
    when(sc.heartbeatReceiver).thenReturn(heartbeatReceiverRef)
    val fakeSchedulerBackend = new FakeSchedulerBackend(scheduler, rpcEnv, fakeClusterManagerRef)
    when(sc.schedulerBackend).thenReturn(fakeSchedulerBackend)
 
    heartbeatReceiverRef.askSync[Boolean](TaskSchedulerIsSet)
 
    // simulate the slow event listener in the listener queue before the
    // heartbeatReceiver listener
    sc.listenerBus.addToManagementQueue(new SparkListener() {
      override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = {
        if (executorAdded.executorId != "driver") {
          Thread.sleep(2000)
        }
      }
    })
    // register the heartbeatReceiver listener to make sure it's after the
    // slow listener above
    sc.listenerBus.addToManagementQueue(heartbeatReceiver)
 
    fakeSchedulerBackend.start()
    val dummyExecutorEndpoint1 = new FakeExecutorEndpoint(rpcEnv)
    val dummyExecutorEndpointRef1 =
      rpcEnv.setupEndpoint("fake-executor-1", dummyExecutorEndpoint1)
 
    // register the executor
    fakeSchedulerBackend.driverEndpoint.askSync[Boolean](
      RegisterExecutor(executorId1, dummyExecutorEndpointRef1, "1.2.3.4", 0, Map.empty))
 
    // simulate the executor starting to heartbeat
    val response = heartbeatReceiverRef.askSync[HeartbeatResponse](
        Heartbeat(executorId1,
          Array.empty[(Long, Seq[AccumulatorV2[_, _]])],
          BlockManagerId(executorId1, "1.2.3.4", 8008),
          new ExecutorMetrics(Array.empty[Long])))
 
    assert(!response.reregisterBlockManager)
  }
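
The interleaving that the test provokes can also be modelled without Spark. The sketch below is a toy model only, not Spark code: ToyDriver, its methods, and RaceDemo are hypothetical names, and a single-threaded java.util.concurrent executor stands in for one asynchronous listener queue. It assumes, as the test above does, that the scheduler side learns about the executor synchronously at registration, while the heartbeat side learns about it only once its listener event has been processed behind a slow listener.

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}

// Toy model (hypothetical names): two components track registered executors,
// but only one of them is updated synchronously.
final class ToyDriver(slowListenerMillis: Long) {
  // Updated synchronously when the executor registers.
  private val schedulerKnown = ConcurrentHashMap.newKeySet[String]()
  // Updated only after the "executor added" event has been processed.
  private val receiverKnown = ConcurrentHashMap.newKeySet[String]()
  // One thread standing in for a single asynchronous listener queue.
  private val listenerQueue = Executors.newSingleThreadExecutor()

  def register(executorId: String): Unit = {
    schedulerKnown.add(executorId)
    listenerQueue.execute(new Runnable {
      override def run(): Unit = {
        Thread.sleep(slowListenerMillis) // slow listener earlier in the queue
        receiverKnown.add(executorId)    // heartbeat-side listener runs after it
      }
    })
  }

  def schedulerKnows(executorId: String): Boolean =
    schedulerKnown.contains(executorId)

  // True when the heartbeat side would treat the executor as unknown.
  def heartbeatAsksReregister(executorId: String): Boolean =
    !receiverKnown.contains(executorId)

  // Wait for queued listener work to finish, then stop the queue thread.
  def drainAndStop(): Unit = {
    listenerQueue.shutdown()
    listenerQueue.awaitTermination(10, TimeUnit.SECONDS)
  }
}

object RaceDemo {
  def main(args: Array[String]): Unit = {
    val driver = new ToyDriver(slowListenerMillis = 500)
    driver.register("exec-1")
    // Heartbeat sent right after registration returns.
    val early = driver.heartbeatAsksReregister("exec-1")
    driver.drainAndStop()
    // Heartbeat sent after the listener queue has caught up.
    val late = driver.heartbeatAsksReregister("exec-1")
    println(s"scheduler knows exec-1: ${driver.schedulerKnows("exec-1")}")
    println(s"early heartbeat treated as unknown: $early")
    println(s"late heartbeat treated as unknown: $late")
  }
}
```

In this model the early heartbeat sees an unknown executor even though registration has already returned, which mirrors the failing assertion on response.reregisterBlockManager in the test above. It says nothing about how the actual fix should be implemented.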
 




I will propose a PR with the fix soon.


