This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new fc88d3df  [SPARK-27164][CORE] RDD.countApprox on empty RDDs schedules jobs which never complete
fc88d3df is described below

commit fc88d3df5c53e0ade04af8ad9006ed543950c467
Author: Ajith <ajith2...@gmail.com>
AuthorDate: Sun Mar 17 12:56:41 2019 -0500

    [SPARK-27164][CORE] RDD.countApprox on empty RDDs schedules jobs which never complete

    ## What changes were proposed in this pull request?

    When a Result stage has zero tasks, the Job End event is never fired, so the job is shown as running forever in the UI. Example: sc.emptyRDD[Int].countApprox(1000) never finishes even though it has no tasks to launch.

    ## How was this patch tested?

    Added UT

    Closes #24100 from ajithme/emptyRDD.

    Authored-by: Ajith <ajith2...@gmail.com>
    Signed-off-by: Sean Owen <sean.o...@databricks.com>
---
 .../scala/org/apache/spark/scheduler/DAGScheduler.scala      | 11 +++++++++--
 .../org/apache/spark/scheduler/DAGSchedulerSuite.scala       | 13 +++++++++++++
 2 files changed, 22 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index dd1b259..9177c1b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -767,10 +767,17 @@ private[spark] class DAGScheduler(
       callSite: CallSite,
       timeout: Long,
       properties: Properties): PartialResult[R] = {
-    val listener = new ApproximateActionListener(rdd, func, evaluator, timeout)
-    val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
     val partitions = (0 until rdd.partitions.length).toArray
     val jobId = nextJobId.getAndIncrement()
+    if (partitions.isEmpty) {
+      // Return immediately if the job is running 0 tasks
+      val time = clock.getTimeMillis()
+      listenerBus.post(SparkListenerJobStart(jobId, time, Seq[StageInfo](), properties))
+      listenerBus.post(SparkListenerJobEnd(jobId, time, JobSucceeded))
+      return new PartialResult(evaluator.currentResult(), true)
+    }
+    val listener = new ApproximateActionListener(rdd, func, evaluator, timeout)
+    val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
     eventProcessLoop.post(JobSubmitted(
       jobId, rdd, func2, partitions, callSite, listener, SerializationUtils.clone(properties)))
     listener.awaitResult()    // Will throw an exception if the job fails
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index e17d264..e74f462 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.scheduler

 import java.util.Properties
+import java.util.concurrent.{CountDownLatch, TimeUnit}
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}

 import scala.annotation.meta.param
@@ -2849,6 +2850,18 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     }
   }

+  test("SPARK-27164: RDD.countApprox on empty RDDs schedules jobs which never complete") {
+    val latch = new CountDownLatch(1)
+    val jobListener = new SparkListener {
+      override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
+        latch.countDown()
+      }
+    }
+    sc.addSparkListener(jobListener)
+    sc.emptyRDD[Int].countApprox(10000).getFinalValue()
+    assert(latch.await(10, TimeUnit.SECONDS))
+  }
+
   /**
    * Assert that the supplied TaskSet has exactly the given hosts as its preferred locations.
    * Note that this checks only the host and not the executor ID.
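
The behavior exercised by the new unit test can also be checked outside the suite. Below is a minimal, standalone sketch of the same scenario, assuming a local-mode SparkContext; the master setting, app name, and timeout values are illustrative and not part of the patch. It registers a listener for the job-end event and calls countApprox on an empty RDD, which with this fix returns immediately and posts SparkListenerJobEnd instead of leaving the job running forever.

// Minimal verification sketch for SPARK-27164 (assumed local-mode setup;
// master, app name, and timeouts below are illustrative).
import java.util.concurrent.{CountDownLatch, TimeUnit}

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd}

object EmptyCountApproxCheck {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("SPARK-27164-check"))
    try {
      val jobEnded = new CountDownLatch(1)
      // Count down as soon as a job-end event is delivered on the listener bus.
      sc.addSparkListener(new SparkListener {
        override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = jobEnded.countDown()
      })
      // With the fix, a zero-task approximate job returns its current (empty) result
      // immediately and posts SparkListenerJobStart/JobEnd; before the fix this call
      // scheduled a job that never completed.
      val result = sc.emptyRDD[Int].countApprox(timeout = 1000).getFinalValue()
      println(s"countApprox on empty RDD returned: $result")
      println(s"saw job end event: ${jobEnded.await(10, TimeUnit.SECONDS)}")
    } finally {
      sc.stop()
    }
  }
}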
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org