This is an automated email from the ASF dual-hosted git repository. zjffdu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push: new e27bef1 [ZEPPELIN-4475]. Spark job progress is not correct e27bef1 is described below commit e27bef1cc62dba01399a3d7c5c857af5914a56ea Author: Jeff Zhang <zjf...@apache.org> AuthorDate: Mon Dec 9 21:25:27 2019 +0800 [ZEPPELIN-4475]. Spark job progress is not correct ### What is this PR for? When calculating the job progress, we should only take into account the first job that matches the jobGroup. Because each paragraph has one unique jobGroup, and one paragraph may run multiple times, we should only look for the first job which matches the jobGroup. ### What type of PR is it? [Bug Fix ] ### Todos * [ ] - Task ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-4475 ### How should this be tested? * CI pass ### Screenshots (if appropriate) ### Questions: * Do the license files need an update? No * Are there breaking changes for older versions? No * Does this need documentation? No Author: Jeff Zhang <zjf...@apache.org> Closes #3548 from zjffdu/ZEPPELIN-4475 and squashes the following commits: 452b964a6 [Jeff Zhang] [ZEPPELIN-4475]. 
Spark job progress is not correct --- .../apache/zeppelin/spark/JobProgressUtil.scala | 34 ++++++++++++++-------- 1 file changed, 22 insertions(+), 12 deletions(-) diff --git a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/JobProgressUtil.scala b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/JobProgressUtil.scala index 3b44c9a..79018c8 100644 --- a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/JobProgressUtil.scala +++ b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/JobProgressUtil.scala @@ -18,22 +18,32 @@ package org.apache.zeppelin.spark import org.apache.spark.SparkContext +import org.slf4j.{Logger, LoggerFactory} object JobProgressUtil { - def progress(sc: SparkContext, jobGroup : String):Int = { - val jobIds = sc.statusTracker.getJobIdsForGroup(jobGroup) - val jobs = jobIds.flatMap { id => sc.statusTracker.getJobInfo(id) } - val stages = jobs.flatMap { job => - job.stageIds().flatMap(sc.statusTracker.getStageInfo) - } + protected lazy val LOGGER: Logger = LoggerFactory.getLogger(getClass) - val taskCount = stages.map(_.numTasks).sum - val completedTaskCount = stages.map(_.numCompletedTasks).sum - if (taskCount == 0) { - 0 - } else { - (100 * completedTaskCount.toDouble / taskCount).toInt + def progress(sc: SparkContext, jobGroup : String):Int = { + // Each paragraph has one unique jobGroup, and one paragraph may run multiple times. 
+ // So only look for the first job which match the jobGroup + val jobInfo = sc.statusTracker + .getJobIdsForGroup(jobGroup) + .headOption + .flatMap(jobId => sc.statusTracker.getJobInfo(jobId)) + val stagesInfoOption = jobInfo.flatMap( jobInfo => Some(jobInfo.stageIds().flatMap(sc.statusTracker.getStageInfo))) + stagesInfoOption match { + case None => 0 + case Some(stagesInfo) => + val taskCount = stagesInfo.map(_.numTasks).sum + val completedTaskCount = stagesInfo.map(_.numCompletedTasks).sum + LOGGER.debug("Total TaskCount: " + taskCount) + LOGGER.debug("Completed TaskCount: " + completedTaskCount) + if (taskCount == 0) { + 0 + } else { + (100 * completedTaskCount.toDouble / taskCount).toInt + } } } }