MJ Deng created SPARK-48758: ------------------------------- Summary: Race condition between executor registration and heartbeat Key: SPARK-48758 URL: https://issues.apache.org/jira/browse/SPARK-48758 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 3.4.3 Reporter: MJ Deng
We found a race condition in our production jobs: an executor finishes registration, but when it starts to heartbeat, the driver reports that the executor is still unknown. The HeartbeatReceiver learns about newly registered executors through the listener bus. If a slow listener is queued ahead of it, that notification is delayed, and a heartbeat arriving in the meantime is treated as coming from an unknown executor. The test case below demonstrates the issue: test("test the executor registered states are consistent between scheduler and heartbeater") { val rpcEnv = sc.env.rpcEnv val fakeClusterManager = new FakeClusterManager(rpcEnv) val fakeClusterManagerRef = rpcEnv.setupEndpoint("fake-cm", fakeClusterManager) val listenerBus = new LiveListenerBus(sc.conf) val mockMetricsSystem = mock(classOf[MetricsSystem]) listenerBus.start(sc, mockMetricsSystem) when(sc.listenerBus).thenReturn(listenerBus) when(sc.heartbeatReceiver).thenReturn(heartbeatReceiverRef) val fakeSchedulerBackend = new FakeSchedulerBackend(scheduler, rpcEnv, fakeClusterManagerRef) when(sc.schedulerBackend).thenReturn(fakeSchedulerBackend) heartbeatReceiverRef.askSync[Boolean](TaskSchedulerIsSet) // simulate the slow event listener in the listener queue before the heartbeatReceiver listener sc.listenerBus.addToManagementQueue(new SparkListener() { override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = { if (executorAdded.executorId != "driver") { Thread.sleep(2000) } } }) // register the heartbeatReceiver listener to make sure it's after the // slow listener above sc.listenerBus.addToManagementQueue(heartbeatReceiver) fakeSchedulerBackend.start() val dummyExecutorEndpoint1 = new FakeExecutorEndpoint(rpcEnv) val dummyExecutorEndpointRef1 = rpcEnv.setupEndpoint("fake-executor-1", dummyExecutorEndpoint1) // register the executor fakeSchedulerBackend.driverEndpoint.askSync[Boolean]( RegisterExecutor(executorId1, dummyExecutorEndpointRef1, "1.2.3.4", 0, Map.empty)) // simulate the executor starting to heartbeat val response = heartbeatReceiverRef.askSync[HeartbeatResponse]( Heartbeat(executorId1, Array.empty[(Long, Seq[AccumulatorV2[_, _]])], BlockManagerId(executorId1, "1.2.3.4", 8008), new ExecutorMetrics(Array.empty[Long]))) 
assert(!response.reregisterBlockManager) } I will propose a PR with the fix soon. -- This message was sent by Atlassian Jira (v8.20.10#820010) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org