spark git commit: [SPARK-12062][CORE] Change Master to async rebuild UI when application completes
Repository: spark Updated Branches: refs/heads/branch-1.6 8e9a60031 -> 93095eb29 [SPARK-12062][CORE] Change Master to asyc rebuild UI when application completes This change builds the event history of completed apps asynchronously so the RPC thread will not be blocked and allow new workers to register/remove if the event log history is very large and takes a long time to rebuild. Author: Bryan Cutler Closes #10284 from BryanCutler/async-MasterUI-SPARK-12062. (cherry picked from commit c5b6b398d5e368626e589feede80355fb74c2bd8) Signed-off-by: Andrew Or Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/93095eb2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/93095eb2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/93095eb2 Branch: refs/heads/branch-1.6 Commit: 93095eb29a1e59dbdbf6220bfa732b502330e6ae Parents: 8e9a600 Author: Bryan Cutler Authored: Tue Dec 15 18:28:16 2015 -0800 Committer: Andrew Or Committed: Tue Dec 15 18:28:26 2015 -0800 -- .../org/apache/spark/deploy/master/Master.scala | 79 +--- .../spark/deploy/master/MasterMessages.scala| 2 + 2 files changed, 52 insertions(+), 29 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/93095eb2/core/src/main/scala/org/apache/spark/deploy/master/Master.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 1355e1a..fc42bf0 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -21,9 +21,11 @@ import java.io.FileNotFoundException import java.net.URLEncoder import java.text.SimpleDateFormat import java.util.Date -import java.util.concurrent.{ScheduledFuture, TimeUnit} +import java.util.concurrent.{ConcurrentHashMap, ScheduledFuture, TimeUnit} import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} +import 
scala.concurrent.duration.Duration +import scala.concurrent.{Await, ExecutionContext, Future} import scala.language.postfixOps import scala.util.Random @@ -56,6 +58,10 @@ private[deploy] class Master( private val forwardMessageThread = ThreadUtils.newDaemonSingleThreadScheduledExecutor("master-forward-message-thread") + private val rebuildUIThread = +ThreadUtils.newDaemonSingleThreadExecutor("master-rebuild-ui-thread") + private val rebuildUIContext = ExecutionContext.fromExecutor(rebuildUIThread) + private val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) private def createDateFormat = new SimpleDateFormat("MMddHHmmss") // For application IDs @@ -78,7 +84,8 @@ private[deploy] class Master( private val addressToApp = new HashMap[RpcAddress, ApplicationInfo] private val completedApps = new ArrayBuffer[ApplicationInfo] private var nextAppNumber = 0 - private val appIdToUI = new HashMap[String, SparkUI] + // Using ConcurrentHashMap so that master-rebuild-ui-thread can add a UI after asyncRebuildUI + private val appIdToUI = new ConcurrentHashMap[String, SparkUI] private val drivers = new HashSet[DriverInfo] private val completedDrivers = new ArrayBuffer[DriverInfo] @@ -191,6 +198,7 @@ private[deploy] class Master( checkForWorkerTimeOutTask.cancel(true) } forwardMessageThread.shutdownNow() +rebuildUIThread.shutdownNow() webUi.stop() restServer.foreach(_.stop()) masterMetricsSystem.stop() @@ -367,6 +375,10 @@ private[deploy] class Master( case CheckForWorkerTimeOut => { timeOutDeadWorkers() } + +case AttachCompletedRebuildUI(appId) => + // An asyncRebuildSparkUI has completed, so need to attach to master webUi + Option(appIdToUI.get(appId)).foreach { ui => webUi.attachSparkUI(ui) } } override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { @@ -809,7 +821,7 @@ private[deploy] class Master( if (completedApps.size >= RETAINED_APPLICATIONS) { val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1) completedApps.take(toRemove).foreach( a 
=> { - appIdToUI.remove(a.id).foreach { ui => webUi.detachSparkUI(ui) } + Option(appIdToUI.remove(a.id)).foreach { ui => webUi.detachSparkUI(ui) } applicationMetricsSystem.removeSource(a.appSource) }) completedApps.trimStart(toRemove) @@ -818,7 +830,7 @@ private[deploy] class Master( waitingApps -= app // If application events are logged, use them to rebuild the UI - rebuildSparkUI(app) + asyncRebuildSparkUI(app) for (exec <- app.executors.values) { killExecutor(exec) @@ -923,49 +935,57 @@ private[deploy] c
spark git commit: [SPARK-12062][CORE] Change Master to async rebuild UI when application completes
Repository: spark Updated Branches: refs/heads/master 8a215d233 -> c5b6b398d [SPARK-12062][CORE] Change Master to asyc rebuild UI when application completes This change builds the event history of completed apps asynchronously so the RPC thread will not be blocked and allow new workers to register/remove if the event log history is very large and takes a long time to rebuild. Author: Bryan Cutler Closes #10284 from BryanCutler/async-MasterUI-SPARK-12062. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c5b6b398 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c5b6b398 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c5b6b398 Branch: refs/heads/master Commit: c5b6b398d5e368626e589feede80355fb74c2bd8 Parents: 8a215d2 Author: Bryan Cutler Authored: Tue Dec 15 18:28:16 2015 -0800 Committer: Andrew Or Committed: Tue Dec 15 18:28:16 2015 -0800 -- .../org/apache/spark/deploy/master/Master.scala | 79 +--- .../spark/deploy/master/MasterMessages.scala| 2 + 2 files changed, 52 insertions(+), 29 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c5b6b398/core/src/main/scala/org/apache/spark/deploy/master/Master.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 1355e1a..fc42bf0 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -21,9 +21,11 @@ import java.io.FileNotFoundException import java.net.URLEncoder import java.text.SimpleDateFormat import java.util.Date -import java.util.concurrent.{ScheduledFuture, TimeUnit} +import java.util.concurrent.{ConcurrentHashMap, ScheduledFuture, TimeUnit} import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} +import scala.concurrent.duration.Duration +import scala.concurrent.{Await, ExecutionContext, Future} import 
scala.language.postfixOps import scala.util.Random @@ -56,6 +58,10 @@ private[deploy] class Master( private val forwardMessageThread = ThreadUtils.newDaemonSingleThreadScheduledExecutor("master-forward-message-thread") + private val rebuildUIThread = +ThreadUtils.newDaemonSingleThreadExecutor("master-rebuild-ui-thread") + private val rebuildUIContext = ExecutionContext.fromExecutor(rebuildUIThread) + private val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) private def createDateFormat = new SimpleDateFormat("MMddHHmmss") // For application IDs @@ -78,7 +84,8 @@ private[deploy] class Master( private val addressToApp = new HashMap[RpcAddress, ApplicationInfo] private val completedApps = new ArrayBuffer[ApplicationInfo] private var nextAppNumber = 0 - private val appIdToUI = new HashMap[String, SparkUI] + // Using ConcurrentHashMap so that master-rebuild-ui-thread can add a UI after asyncRebuildUI + private val appIdToUI = new ConcurrentHashMap[String, SparkUI] private val drivers = new HashSet[DriverInfo] private val completedDrivers = new ArrayBuffer[DriverInfo] @@ -191,6 +198,7 @@ private[deploy] class Master( checkForWorkerTimeOutTask.cancel(true) } forwardMessageThread.shutdownNow() +rebuildUIThread.shutdownNow() webUi.stop() restServer.foreach(_.stop()) masterMetricsSystem.stop() @@ -367,6 +375,10 @@ private[deploy] class Master( case CheckForWorkerTimeOut => { timeOutDeadWorkers() } + +case AttachCompletedRebuildUI(appId) => + // An asyncRebuildSparkUI has completed, so need to attach to master webUi + Option(appIdToUI.get(appId)).foreach { ui => webUi.attachSparkUI(ui) } } override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { @@ -809,7 +821,7 @@ private[deploy] class Master( if (completedApps.size >= RETAINED_APPLICATIONS) { val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1) completedApps.take(toRemove).foreach( a => { - appIdToUI.remove(a.id).foreach { ui => webUi.detachSparkUI(ui) } + 
Option(appIdToUI.remove(a.id)).foreach { ui => webUi.detachSparkUI(ui) } applicationMetricsSystem.removeSource(a.appSource) }) completedApps.trimStart(toRemove) @@ -818,7 +830,7 @@ private[deploy] class Master( waitingApps -= app // If application events are logged, use them to rebuild the UI - rebuildSparkUI(app) + asyncRebuildSparkUI(app) for (exec <- app.executors.values) { killExecutor(exec) @@ -923,49 +935,57 @@ private[deploy] class Master( * Return the UI if successful, else None */ private[master] def rebuildSparkUI(a