Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207443099 --- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala --- @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.{Timer, TimerTask} +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerStageCompleted} + +/** + * A coordinator that handles all global sync requests from BarrierTaskContext. Each global sync + * request is generated by `BarrierTaskContext.barrier()`, and identified by + * stageId + stageAttemptId + barrierEpoch. Reply to all the blocking global sync requests upon + * receiving all the requests for a group of `barrier()` calls. If the coordinator doesn't collect + * enough global sync requests within a configured time, fail all the requests due to timeout. 
+ */ +private[spark] class BarrierCoordinator( + timeout: Int, + listenerBus: LiveListenerBus, + override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging { + + private val timer = new Timer("BarrierCoordinator barrier epoch increment timer") + + private val listener = new SparkListener { + override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { + val stageInfo = stageCompleted.stageInfo + // Remove internal data from a finished stage attempt. + cleanupSyncRequests(stageInfo.stageId, stageInfo.attemptNumber) + barrierEpochByStageIdAndAttempt.remove((stageInfo.stageId, stageInfo.attemptNumber)) + cancelTimerTask(stageInfo.stageId, stageInfo.attemptNumber) + } + } + + // Epoch counter for each barrier (stage, attempt). + private val barrierEpochByStageIdAndAttempt = new ConcurrentHashMap[(Int, Int), Int] + + // Remember all the blocking global sync requests for each barrier (stage, attempt). + private val syncRequestsByStageIdAndAttempt = + new ConcurrentHashMap[(Int, Int), ArrayBuffer[RpcCallContext]] + + // Remember all the TimerTasks for each barrier (stage, attempt). + private val timerTaskByStageIdAndAttempt = new ConcurrentHashMap[(Int, Int), TimerTask] + + // Number of tasks for each stage. + private val numTasksByStage = new ConcurrentHashMap[Int, Int] + + override def onStart(): Unit = { + super.onStart() + listenerBus.addToStatusQueue(listener) + } + + /** + * Get the array of [[RpcCallContext]]s that correspond to a barrier sync request from a stage + * attempt. + */ + private def getOrInitSyncRequests( + stageId: Int, + stageAttemptId: Int, + numTasks: Int): ArrayBuffer[RpcCallContext] = { + syncRequestsByStageIdAndAttempt.putIfAbsent((stageId, stageAttemptId), + new ArrayBuffer[RpcCallContext](numTasks)) + syncRequestsByStageIdAndAttempt.get((stageId, stageAttemptId)) + } + + /** + * Clean up the array of [[RpcCallContext]]s that correspond to a barrier sync request from a + * stage attempt. 
+ */ + private def cleanupSyncRequests(stageId: Int, stageAttemptId: Int): Unit = { + val requests = syncRequestsByStageIdAndAttempt.remove((stageId, stageAttemptId)) + if (requests != null) { + requests.clear() + } + logInfo(s"Removed all the pending barrier sync requests from Stage $stageId (Attempt " + + s"$stageAttemptId).") + } + + /** + * Get the barrier epoch that corresponds to a barrier sync request from a stage attempt. + */ + private def getOrInitBarrierEpoch(stageId: Int, stageAttemptId: Int): Int = { + barrierEpochByStageIdAndAttempt.putIfAbsent((stageId, stageAttemptId), 0) + barrierEpochByStageIdAndAttempt.get((stageId, stageAttemptId)) + } + + /** + * Increase the barrier epoch that corresponds to a barrier sync request from a stage attempt. + */ + private def increaseBarrierEpoch(stageId: Int, stageAttemptId: Int): Unit = { + val barrierEpoch = barrierEpochByStageIdAndAttempt.getOrDefault((stageId, stageAttemptId), -1) + if (barrierEpoch >= 0) { + barrierEpochByStageIdAndAttempt.put((stageId, stageAttemptId), barrierEpoch + 1) + } else { + // The stage attempt already finished, don't update barrier epoch. + } + } + + /** + * Cancel TimerTask for a stage attempt. + */ + private def cancelTimerTask(stageId: Int, stageAttemptId: Int): Unit = { + val timerTask = timerTaskByStageIdAndAttempt.get((stageId, stageAttemptId)) + if (timerTask != null) { --- End diff -- Not safe. Use ~~~scala val timerTask = timerTaskByStageIdAndAttempt.remove((stageId, stageAttemptId)) if (timerTask != null) { timerTask.cancel() } ~~~
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org