[GitHub] spark pull request #19698: [SPARK-20648][core] Port JobsTab and StageTab to ...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19698
---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/19698#discussion_r150655624

--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala ---
@@ -66,13 +66,8 @@ private[spark] class AppStatusStore(store: KVStore) {
     filtered.asScala.map(_.info).toSeq
   }
-  def executorSummary(executorId: String): Option[v1.ExecutorSummary] = {
-    try {
-      Some(store.read(classOf[ExecutorSummaryWrapper], executorId).info)
-    } catch {
-      case _: NoSuchElementException =>
-        None
-    }
+  def executorSummary(executorId: String): v1.ExecutorSummary = {
+    store.read(classOf[ExecutorSummaryWrapper], executorId).info
--- End diff --

nit: indentation
Github user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/19698#discussion_r150654983

--- Diff: core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala ---
@@ -1036,8 +1006,13 @@ private[ui] class TaskDataSource(
       shuffleRead,
       shuffleWrite,
       bytesSpilled,
-      taskData.errorMessage.getOrElse(""),
-      logs)
+      info.errorMessage.getOrElse(""),
+      executorLogs(info.executorId))
+  }
+
+  private def executorLogs(id: String): Map[String, String] = {
+    executors.getOrElseUpdate(id,
+      store.executorSummary(id).map(_.executorLogs).getOrElse(Map.empty))
--- End diff --

ah right, good point
Github user vanzin commented on a diff in the pull request:
https://github.com/apache/spark/pull/19698#discussion_r150648166

--- Diff: core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala ---
@@ -182,173 +185,183 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") {
   }
   def render(request: HttpServletRequest): Seq[Node] = {
-    val listener = parent.jobProgresslistener
+    // stripXSS is called first to remove suspicious characters used in XSS attacks
+    val parameterId = UIUtils.stripXSS(request.getParameter("id"))
+    require(parameterId != null && parameterId.nonEmpty, "Missing id parameter")
-    listener.synchronized {
-      // stripXSS is called first to remove suspicious characters used in XSS attacks
-      val parameterId = UIUtils.stripXSS(request.getParameter("id"))
-      require(parameterId != null && parameterId.nonEmpty, "Missing id parameter")
-
-      val jobId = parameterId.toInt
-      val jobDataOption = listener.jobIdToData.get(jobId)
-      if (jobDataOption.isEmpty) {
-        val content =
-
-            No information to display for job {jobId}
-
-        return UIUtils.headerSparkPage(
-          s"Details for Job $jobId", content, parent)
-      }
-      val jobData = jobDataOption.get
-      val isComplete = jobData.status != JobExecutionStatus.RUNNING
-      val stages = jobData.stageIds.map { stageId =>
-        // This could be empty if the JobProgressListener hasn't received information about the
-        // stage or if the stage information has been garbage collected
-        listener.stageIdToInfo.getOrElse(stageId,
-          new StageInfo(stageId, 0, "Unknown", 0, Seq.empty, Seq.empty, "Unknown"))
+    val jobId = parameterId.toInt
+    val jobDataOption = Try(store.job(jobId)).toOption
--- End diff --

I added a shared method; I'm not too happy with it, but this code needs to be resilient to data disappearing (either because events don't arrive or because data is cleaned up to save memory), so there's not really a good way around it...
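Being resilient to disappearing data mostly means never assuming a lookup succeeds. A minimal sketch of the fallback pattern the new `JobPage` code uses for stages (take the latest attempt if there is one, otherwise render a placeholder). `StageRecord` and `StageLookup` are hypothetical, heavily simplified stand-ins for `v1.StageData` and the inline code, and the assumption that the attempts list is ordered oldest first is this sketch's, not the store's documented contract:

```scala
// Hypothetical, simplified stand-in for v1.StageData (the real class has many more fields).
final case class StageRecord(stageId: Int, attemptId: Int, status: String, name: String)

object StageLookup {
  // Placeholder used when the store has no data for a stage, either because the
  // listener never received its events or because the data was cleaned up.
  def placeholder(stageId: Int): StageRecord =
    StageRecord(stageId, attemptId = 0, status = "PENDING", name = "Unknown")

  // `attempts` plays the role of store.stageData(stageId); assumed oldest attempt first.
  def latestOrPlaceholder(stageId: Int, attempts: Seq[StageRecord]): StageRecord =
    attempts.lastOption.getOrElse(placeholder(stageId))
}
```

The page then renders the placeholder like any other stage, so a missing record degrades the display instead of failing the whole request.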
Github user vanzin commented on a diff in the pull request:
https://github.com/apache/spark/pull/19698#discussion_r150648236

--- Diff: core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala ---
@@ -1036,8 +1006,13 @@ private[ui] class TaskDataSource(
       shuffleRead,
       shuffleWrite,
       bytesSpilled,
-      taskData.errorMessage.getOrElse(""),
-      logs)
+      info.errorMessage.getOrElse(""),
+      executorLogs(info.executorId))
+  }
+
+  private def executorLogs(id: String): Map[String, String] = {
+    executors.getOrElseUpdate(id,
+      store.executorSummary(id).map(_.executorLogs).getOrElse(Map.empty))
--- End diff --

Pretty sure there's a separate instance of this class per request.
Github user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/19698#discussion_r150637980

--- Diff: core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala ---
@@ -19,35 +19,52 @@ package org.apache.spark.ui.jobs
 import javax.servlet.http.HttpServletRequest
+import scala.collection.JavaConverters._
+
+import org.apache.spark.JobExecutionStatus
 import org.apache.spark.scheduler.SchedulingMode
-import org.apache.spark.ui.{SparkUI, SparkUITab, UIUtils}
+import org.apache.spark.status.AppStatusStore
+import org.apache.spark.ui._
 /** Web UI showing progress status of all jobs in the given SparkContext. */
-private[ui] class JobsTab(val parent: SparkUI) extends SparkUITab(parent, "jobs") {
+private[ui] class JobsTab(parent: SparkUI, store: AppStatusStore)
+  extends SparkUITab(parent, "jobs") {
+
   val sc = parent.sc
   val killEnabled = parent.killEnabled
-  val jobProgresslistener = parent.jobProgressListener
-  val operationGraphListener = parent.operationGraphListener
-  def isFairScheduler: Boolean =
-    jobProgresslistener.schedulingMode == Some(SchedulingMode.FAIR)
+  def isFairScheduler: Boolean = {
+    val configName = "spark.scheduler.mode"
+    val config = sc match {
+      case Some(_sc) =>
+        _sc.conf.getOption(configName)
--- End diff --

how come this needs to check the sc.conf, but StagesTab doesn't? Also, it doesn't seem like the old code checked this either.
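For readers following the question above, a hedged sketch of what the new check amounts to: read `spark.scheduler.mode` from the live configuration when a `SparkContext` exists. The diff is cut off before the `None` branch, so the fallback to recorded environment properties here is an assumption, as are the `SchedulerModeCheck` name, the `Map`-based configuration, and the case-insensitive comparison:

```scala
// Hypothetical sketch; liveConf is Some(...) only when a SparkContext is available.
object SchedulerModeCheck {
  val ConfigName = "spark.scheduler.mode"

  def isFairScheduler(
      liveConf: Option[Map[String, String]],
      recordedEnv: Map[String, String]): Boolean = {
    val mode = liveConf match {
      case Some(conf) => conf.get(ConfigName)
      // Assumption: without a live context, fall back to properties recorded in the store.
      case None => recordedEnv.get(ConfigName)
    }
    // Case-insensitive comparison is a choice of this sketch.
    mode.exists(_.equalsIgnoreCase("FAIR"))
  }
}
```

Having both tabs share one such helper would also sidestep the JobsTab/StagesTab inconsistency raised in the comment.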
Github user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/19698#discussion_r150622074

--- Diff: core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala ---
@@ -182,173 +185,183 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") {
   }
   def render(request: HttpServletRequest): Seq[Node] = {
-    val listener = parent.jobProgresslistener
+    // stripXSS is called first to remove suspicious characters used in XSS attacks
+    val parameterId = UIUtils.stripXSS(request.getParameter("id"))
+    require(parameterId != null && parameterId.nonEmpty, "Missing id parameter")
-    listener.synchronized {
-      // stripXSS is called first to remove suspicious characters used in XSS attacks
-      val parameterId = UIUtils.stripXSS(request.getParameter("id"))
-      require(parameterId != null && parameterId.nonEmpty, "Missing id parameter")
-
-      val jobId = parameterId.toInt
-      val jobDataOption = listener.jobIdToData.get(jobId)
-      if (jobDataOption.isEmpty) {
-        val content =
-
-            No information to display for job {jobId}
-
-        return UIUtils.headerSparkPage(
-          s"Details for Job $jobId", content, parent)
-      }
-      val jobData = jobDataOption.get
-      val isComplete = jobData.status != JobExecutionStatus.RUNNING
-      val stages = jobData.stageIds.map { stageId =>
-        // This could be empty if the JobProgressListener hasn't received information about the
-        // stage or if the stage information has been garbage collected
-        listener.stageIdToInfo.getOrElse(stageId,
-          new StageInfo(stageId, 0, "Unknown", 0, Seq.empty, Seq.empty, "Unknown"))
+    val jobId = parameterId.toInt
+    val jobDataOption = Try(store.job(jobId)).toOption
+    if (jobDataOption.isEmpty) {
+      val content =
+
+          No information to display for job {jobId}
+
+      return UIUtils.headerSparkPage(
+        s"Details for Job $jobId", content, parent)
+    }
+    val jobData = jobDataOption.get
+    val isComplete = jobData.status != JobExecutionStatus.RUNNING
+    val stages = jobData.stageIds.map { stageId =>
+      // This could be empty if the listener hasn't received information about the
+      // stage or if the stage information has been garbage collected
+      store.stageData(stageId).lastOption.getOrElse {
+        new v1.StageData(
+          v1.StageStatus.PENDING,
+          stageId,
+          0, 0, 0, 0, 0, 0, 0,
+          0L, 0L, None, None, None, None,
+          0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L,
+          "Unknown",
+          None,
+          "Unknown",
+          null,
+          Nil,
+          Nil,
+          None,
+          None,
+          Map())
       }
+    }
-      val activeStages = Buffer[StageInfo]()
-      val completedStages = Buffer[StageInfo]()
-      // If the job is completed, then any pending stages are displayed as "skipped":
-      val pendingOrSkippedStages = Buffer[StageInfo]()
-      val failedStages = Buffer[StageInfo]()
-      for (stage <- stages) {
-        if (stage.submissionTime.isEmpty) {
-          pendingOrSkippedStages += stage
-        } else if (stage.completionTime.isDefined) {
-          if (stage.failureReason.isDefined) {
-            failedStages += stage
-          } else {
-            completedStages += stage
-          }
+    val activeStages = Buffer[v1.StageData]()
+    val completedStages = Buffer[v1.StageData]()
+    // If the job is completed, then any pending stages are displayed as "skipped":
+    val pendingOrSkippedStages = Buffer[v1.StageData]()
+    val failedStages = Buffer[v1.StageData]()
+    for (stage <- stages) {
+      if (stage.submissionTime.isEmpty) {
+        pendingOrSkippedStages += stage
+      } else if (stage.completionTime.isDefined) {
+        if (stage.status == v1.StageStatus.FAILED) {
+          failedStages += stage
         } else {
-          activeStages += stage
+          completedStages += stage
         }
+      } else {
+        activeStages += stage
       }
+    }
-      val basePath = "jobs/job"
+    val basePath = "jobs/job"
-      val pendingOrSkippedTableId =
-        if (isComplete) {
-          "pending"
-        } else {
-          "skipped"
-        }
+    val pendingOrSkippedTableId =
+      if (isComplete) {
+        "pending"
+      } else {
+        "skipped"
+      }
-      val activeStagesTable =
-        new StageTableBase(request,
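The bucketing loop in the new `JobPage` code assigns each stage by submission time, completion time, and status. A minimal, self-contained sketch of the same rules using `partition`; `StageView`, `StageBuckets`, and `StageBucketing` are hypothetical simplified types, and the `failed` flag stands in for the `status == v1.StageStatus.FAILED` check:

```scala
// Hypothetical simplified stage type; the real code uses v1.StageData.
final case class StageView(
    id: Int,
    submissionTime: Option[Long],
    completionTime: Option[Long],
    failed: Boolean)

final case class StageBuckets(
    active: Seq[StageView],
    completed: Seq[StageView],
    pendingOrSkipped: Seq[StageView],
    failed: Seq[StageView])

object StageBucketing {
  def bucket(stages: Seq[StageView]): StageBuckets = {
    // Never submitted: pending while the job runs, displayed as "skipped" once it completes.
    val (notSubmitted, submitted) = stages.partition(_.submissionTime.isEmpty)
    // Submitted but without a completion time: still active.
    val (finished, active) = submitted.partition(_.completionTime.isDefined)
    // Finished stages are split by outcome.
    val (failed, completed) = finished.partition(_.failed)
    StageBuckets(active, completed, notSubmitted, failed)
  }
}
```

`partition` keeps the input order inside each bucket, matching what appending to the `Buffer`s in a single loop produces.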
Github user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/19698#discussion_r150631869

--- Diff: core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala ---
@@ -856,22 +832,24 @@ private[ui] class TaskTableRowData(
     val logs: Map[String, String])
 private[ui] class TaskDataSource(
-    tasks: Seq[TaskUIData],
+    tasks: Seq[TaskData],
     hasAccumulators: Boolean,
     hasInput: Boolean,
     hasOutput: Boolean,
     hasShuffleRead: Boolean,
     hasShuffleWrite: Boolean,
     hasBytesSpilled: Boolean,
-    lastUpdateTime: Option[Long],
     currentTime: Long,
     pageSize: Int,
     sortColumn: String,
     desc: Boolean,
     store: AppStatusStore) extends PagedDataSource[TaskTableRowData](pageSize) {
   import StagePage._
-  // Convert TaskUIData to TaskTableRowData which contains the final contents to show in the table
+  // Keep an internal cache of executor log maps so that long task lists render faster.
+  private val executors = new HashMap[String, Map[String, String]]()
--- End diff --

nit: rename to something like `executorIdToLogs`
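The cache memoizes one log lookup per executor, so a page listing thousands of tasks only queries the store once per executor. A minimal sketch of that idea using the suggested `executorIdToLogs` name; `TaskLogResolver` is hypothetical and `loadLogs` stands in for the `store.executorSummary(...)` lookup. Because a fresh instance is created per request (as noted elsewhere in this thread), a plain `mutable.HashMap` is enough here:

```scala
import scala.collection.mutable

// Hypothetical per-request helper; loadLogs stands in for the AppStatusStore lookup.
final class TaskLogResolver(loadLogs: String => Map[String, String]) {
  // One entry per executor ID, filled the first time a task from that executor is rendered.
  private val executorIdToLogs = new mutable.HashMap[String, Map[String, String]]()

  def executorLogs(executorId: String): Map[String, String] =
    // getOrElseUpdate evaluates loadLogs only when the key is missing.
    executorIdToLogs.getOrElseUpdate(executorId, loadLogs(executorId))
}
```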
Github user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/19698#discussion_r150632556

--- Diff: core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala ---
@@ -1036,8 +1006,13 @@ private[ui] class TaskDataSource(
       shuffleRead,
       shuffleWrite,
       bytesSpilled,
-      taskData.errorMessage.getOrElse(""),
-      logs)
+      info.errorMessage.getOrElse(""),
+      executorLogs(info.executorId))
+  }
+
+  private def executorLogs(id: String): Map[String, String] = {
+    executors.getOrElseUpdate(id,
+      store.executorSummary(id).map(_.executorLogs).getOrElse(Map.empty))
--- End diff --

I think you need to protect `executors` from a race if two UI threads both call this.
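If an instance were ever shared between UI threads, the check-then-insert in `mutable.HashMap.getOrElseUpdate` could race. One option, sketched here under that hypothetical, is `java.util.concurrent.ConcurrentHashMap.computeIfAbsent`, whose Javadoc states the mapping function is applied at most once per key. `SharedTaskLogResolver` and `loadLogs` are illustrative names, not Spark code:

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical shared variant: safe to call from several threads at once.
final class SharedTaskLogResolver(loadLogs: String => Map[String, String]) {
  private val executorIdToLogs = new ConcurrentHashMap[String, Map[String, String]]()

  def executorLogs(executorId: String): Map[String, String] =
    // computeIfAbsent is atomic per key, so loadLogs runs at most once per executor ID.
    executorIdToLogs.computeIfAbsent(executorId,
      new java.util.function.Function[String, Map[String, String]] {
        override def apply(id: String): Map[String, String] = loadLogs(id)
      })
}
```

The trade-off is that other threads asking for the same key block while the loader runs, which is acceptable for a quick store lookup.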
Github user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/19698#discussion_r150626334

--- Diff: core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala ---
@@ -17,50 +17,49 @@ package org.apache.spark.ui.jobs
+import java.util.NoSuchElementException
 import javax.servlet.http.HttpServletRequest
 import scala.xml.Node
-import org.apache.spark.scheduler.StageInfo
+import org.apache.spark.status.PoolData
+import org.apache.spark.status.api.v1._
 import org.apache.spark.ui.{UIUtils, WebUIPage}
 /** Page showing specific pool details */
 private[ui] class PoolPage(parent: StagesTab) extends WebUIPage("pool") {
-  private val sc = parent.sc
-  private val listener = parent.progressListener
   def render(request: HttpServletRequest): Seq[Node] = {
-    listener.synchronized {
-      // stripXSS is called first to remove suspicious characters used in XSS attacks
-      val poolName = Option(UIUtils.stripXSS(request.getParameter("poolname"))).map { poolname =>
-        UIUtils.decodeURLParameter(poolname)
-      }.getOrElse {
-        throw new IllegalArgumentException(s"Missing poolname parameter")
-      }
+    // stripXSS is called first to remove suspicious characters used in XSS attacks
+    val poolName = Option(UIUtils.stripXSS(request.getParameter("poolname"))).map { poolname =>
+      UIUtils.decodeURLParameter(poolname)
+    }.getOrElse {
+      throw new IllegalArgumentException(s"Missing poolname parameter")
+    }
-      val poolToActiveStages = listener.poolToActiveStages
-      val activeStages = poolToActiveStages.get(poolName) match {
-        case Some(s) => s.values.toSeq
-        case None => Seq.empty[StageInfo]
-      }
-      val shouldShowActiveStages = activeStages.nonEmpty
-      val activeStagesTable =
-        new StageTableBase(request, activeStages, "", "activeStage", parent.basePath, "stages/pool",
-          parent.progressListener, parent.isFairScheduler, parent.killEnabled,
-          isFailedStage = false)
+    // For now, pool information is only accessible in live UIs
--- End diff --

weird that the PoolPage is even hooked up when there isn't a live UI (but I think you have the right change here; I wouldn't change that behavior as part of this)
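The `poolname` handling above treats a missing parameter as a client error and URL-decodes a present one. A hedged sketch of that shape; `PoolParam` is a hypothetical name, `stripXSS` is omitted, and a single `java.net.URLDecoder.decode` call stands in for `UIUtils.decodeURLParameter`, whose exact behavior may differ:

```scala
import java.net.URLDecoder

// Hypothetical helper mirroring the poolname handling in PoolPage.
object PoolParam {
  def poolName(raw: String): String =
    Option(raw) // request.getParameter returns null when the parameter is absent
      .map(p => URLDecoder.decode(p, "UTF-8"))
      .getOrElse(throw new IllegalArgumentException("Missing poolname parameter"))
}
```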
Github user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/19698#discussion_r150622874

--- Diff: core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala ---
@@ -182,173 +185,183 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") {
   }
   def render(request: HttpServletRequest): Seq[Node] = {
-    val listener = parent.jobProgresslistener
+    // stripXSS is called first to remove suspicious characters used in XSS attacks
+    val parameterId = UIUtils.stripXSS(request.getParameter("id"))
+    require(parameterId != null && parameterId.nonEmpty, "Missing id parameter")
-    listener.synchronized {
-      // stripXSS is called first to remove suspicious characters used in XSS attacks
-      val parameterId = UIUtils.stripXSS(request.getParameter("id"))
-      require(parameterId != null && parameterId.nonEmpty, "Missing id parameter")
-
-      val jobId = parameterId.toInt
-      val jobDataOption = listener.jobIdToData.get(jobId)
-      if (jobDataOption.isEmpty) {
-        val content =
-
-            No information to display for job {jobId}
-
-        return UIUtils.headerSparkPage(
-          s"Details for Job $jobId", content, parent)
-      }
-      val jobData = jobDataOption.get
-      val isComplete = jobData.status != JobExecutionStatus.RUNNING
-      val stages = jobData.stageIds.map { stageId =>
-        // This could be empty if the JobProgressListener hasn't received information about the
-        // stage or if the stage information has been garbage collected
-        listener.stageIdToInfo.getOrElse(stageId,
-          new StageInfo(stageId, 0, "Unknown", 0, Seq.empty, Seq.empty, "Unknown"))
+    val jobId = parameterId.toInt
+    val jobDataOption = Try(store.job(jobId)).toOption
--- End diff --

most places, you do an explicit try/catch, I assume because you only want to convert `NoSuchElementException` to `None`. Does that concern not apply here? Also, since you do that so much, maybe it's worth a helper, so that you could use it like this?
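The helper suggested above can be sketched as a by-name wrapper that converts only `NoSuchElementException` into `None` and lets every other exception propagate. That is the difference from `Try(...).toOption`, which also swallows unrelated non-fatal errors. The `StoreLookups.asOption` name is hypothetical:

```scala
object StoreLookups {
  // Evaluate a lookup; map "not found" to None and rethrow anything else.
  def asOption[T](lookup: => T): Option[T] =
    try Some(lookup)
    catch { case _: NoSuchElementException => None }
}
```

With a helper like this, the line in question could read `val jobDataOption = asOption(store.job(jobId))`, and a genuine bug in the lookup would still surface instead of rendering as "No information to display".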
Github user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/19698#discussion_r150564678

--- Diff: core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala ---
@@ -23,19 +23,21 @@ import javax.servlet.http.HttpServletRequest
 import scala.collection.JavaConverters._
 import scala.collection.mutable.{HashMap, ListBuffer}
+import scala.util.Try
--- End diff --

unused (and more imports here can be cleaned up)
Github user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/19698#discussion_r150573812

--- Diff: core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala ---
@@ -285,106 +272,120 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
   }
   def render(request: HttpServletRequest): Seq[Node] = {
-    val listener = parent.jobProgresslistener
-    listener.synchronized {
-      val startTime = listener.startTime
-      val endTime = listener.endTime
-      val activeJobs = listener.activeJobs.values.toSeq
-      val completedJobs = listener.completedJobs.reverse
-      val failedJobs = listener.failedJobs.reverse
-
-      val activeJobsTable =
-        jobsTable(request, "active", "activeJob", activeJobs, killEnabled = parent.killEnabled)
-      val completedJobsTable =
-        jobsTable(request, "completed", "completedJob", completedJobs, killEnabled = false)
-      val failedJobsTable =
-        jobsTable(request, "failed", "failedJob", failedJobs, killEnabled = false)
-
-      val shouldShowActiveJobs = activeJobs.nonEmpty
-      val shouldShowCompletedJobs = completedJobs.nonEmpty
-      val shouldShowFailedJobs = failedJobs.nonEmpty
-
-      val completedJobNumStr = if (completedJobs.size == listener.numCompletedJobs) {
-        s"${completedJobs.size}"
-      } else {
-        s"${listener.numCompletedJobs}, only showing ${completedJobs.size}"
+    val appInfo = store.applicationInfo()
+    val startTime = appInfo.attempts.head.startTime.getTime()
+    val endTime = appInfo.attempts.head.endTime.getTime()
+
+    val activeJobs = new ListBuffer[v1.JobData]()
+    val _completedJobs = new ListBuffer[v1.JobData]()
+    val _failedJobs = new ListBuffer[v1.JobData]()
+
+    store.jobsList(null).foreach { job =>
+      job.status match {
+        case JobExecutionStatus.SUCCEEDED =>
+          _completedJobs += job
+        case JobExecutionStatus.FAILED =>
+          _failedJobs += job
+        case _ =>
+          activeJobs += job
       }
+    }
-      val summary: NodeSeq =
-
-
-
-              User:
-              {parent.getSparkUser}
-
-
-              Total Uptime:
-              {
-                if (endTime < 0 && parent.sc.isDefined) {
-                  UIUtils.formatDuration(System.currentTimeMillis() - startTime)
-                } else if (endTime > 0) {
-                  UIUtils.formatDuration(endTime - startTime)
-                }
-              }
-
-
-              Scheduling Mode:
-              {listener.schedulingMode.map(_.toString).getOrElse("Unknown")}
-
+    val completedJobs = _completedJobs.toSeq.reverse
+    val failedJobs = _failedJobs.toSeq.reverse
--- End diff --

actually, is the `reverse` necessary at all? If you trace through, it seems to only go to `JobsDataSource`, where it's sorted anyway.
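The new `AllJobsPage` code buckets jobs by status in one pass over `store.jobsList(null)`. A sketch of that bucketing with a simplified, hypothetical `JobRow` type, plus the observation behind the question about `reverse`: if the rows are sorted later by a key that is unique per row, reversing them first cannot change what is displayed. `sortedForDisplay` is an illustrative stand-in for whatever ordering `JobsDataSource` applies, not its actual code:

```scala
// Hypothetical simplified job row; status strings mirror JobExecutionStatus names.
final case class JobRow(jobId: Int, status: String)

object JobBuckets {
  // Returns (active, completed, failed); anything not SUCCEEDED or FAILED counts as active.
  def split(jobs: Seq[JobRow]): (Seq[JobRow], Seq[JobRow], Seq[JobRow]) = {
    val completed = jobs.filter(_.status == "SUCCEEDED")
    val failed = jobs.filter(_.status == "FAILED")
    val active = jobs.filterNot(j => j.status == "SUCCEEDED" || j.status == "FAILED")
    (active, completed, failed)
  }

  // Stand-in for the data source's sort (here: newest job ID first, a unique key).
  def sortedForDisplay(jobs: Seq[JobRow]): Seq[JobRow] = jobs.sortBy(-_.jobId)
}
```

If the sort key could tie (for example, sorting by a status column), a stable sort would preserve the incoming order among ties, and only then could a prior `reverse` change the output.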
GitHub user vanzin opened a pull request: https://github.com/apache/spark/pull/19698

[SPARK-20648][core] Port JobsTab and StageTab to the new UI backend.

This change is a little larger because there's a lot of logic behind these pages, all of it tightly tied to internal types and listeners; some of that logic had to be implemented in the new listener, and the needed data had to be exposed through the API types.

- Added missing StageData and ExecutorStageSummary fields which are used by the UI. Some JSON golden files needed to be updated to account for the new fields.
- Save RDD graph data in the store. This tries to re-use existing types as much as possible, so that the code doesn't need to be rewritten, which means it's probably not very optimal.
- Some old classes (e.g. JobProgressListener) still remain, since they're used in other parts of the code; they're no longer used by the UI, though, and will be cleaned up in a separate change.
- Save information about active pools in the store. This data is not really used in the SHS, but it's not a lot of data, so it's still recorded when replaying applications.
- Because the new store sorts things slightly differently from the previous code, some JSON golden files had elements within them shuffled around.
- The retention unit test in UISeleniumSuite was disabled because the code to throw away old stages / tasks hasn't been added yet.
- The job description field in the API tries to follow the old behavior, which leaves it empty most of the time, even though there's information to fill it in. For stages, a new field was added to hold the description (which is basically the job description), so that the UI can be rendered in the old way.
- A new stage status ("SKIPPED") was added to account for the fact that the API couldn't represent that state before. Without this, the stage would show up as "PENDING" in the UI, which is now based on API types.
- The API used to expose "executorRunTime" as the value of the task's duration, which wasn't really correct (also because that value was easily available from the metrics object); this change fixes that by storing the correct duration, which also means a few expectation files needed to be updated to account for the new durations and sorting differences due to the changed values.
- Added changes to implement SPARK-20713 and SPARK-21922 in the new code.

Tested with existing unit tests (and by using the UI a lot).

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/vanzin/spark SPARK-20648

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/19698.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #19698

commit a22c45889d8fc0982caf4325eb729048537872bb
Author: Marcelo Vanzin
Date: 2017-01-31T21:31:55Z

    [SPARK-20648][core] Port JobsTab and StageTab to the new UI backend.
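The description adds a "SKIPPED" stage status so that a stage that never ran in a finished job no longer has to be shown as "PENDING". A hedged sketch of how such a status could be derived; the `StageStatus` model and the rule "never submitted and job finished means SKIPPED" are assumptions drawn from the description and the `JobPage` comment, not the new listener's actual code:

```scala
// Hypothetical status model; names mirror the description, not Spark's enum.
sealed trait StageStatus
object StageStatus {
  case object Pending extends StageStatus
  case object Active extends StageStatus
  case object Complete extends StageStatus
  case object Failed extends StageStatus
  case object Skipped extends StageStatus
}

object DerivedStageStatus {
  def of(submitted: Boolean, finished: Boolean, failed: Boolean,
      jobFinished: Boolean): StageStatus = {
    if (!submitted) {
      // A stage that never ran is "skipped" once its job is done, "pending" otherwise.
      if (jobFinished) StageStatus.Skipped else StageStatus.Pending
    } else if (!finished) {
      StageStatus.Active
    } else if (failed) {
      StageStatus.Failed
    } else {
      StageStatus.Complete
    }
  }
}
```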