This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new fc88d3df [SPARK-27164][CORE] RDD.countApprox on empty RDDs schedules jobs which never complete
fc88d3df is described below

commit fc88d3df5c53e0ade04af8ad9006ed543950c467
Author: Ajith <ajith2...@gmail.com>
AuthorDate: Sun Mar 17 12:56:41 2019 -0500

    [SPARK-27164][CORE] RDD.countApprox on empty RDDs schedules jobs which never complete
    
    ## What changes were proposed in this pull request?
    
    When a result stage has zero tasks, the job-end event is never fired, so the job appears to be running forever in the UI. Example: sc.emptyRDD[Int].countApprox(1000) never finishes, even though it has no tasks to launch.
    
    ## How was this patch tested?
    
    Added a unit test in DAGSchedulerSuite.
    
    Closes #24100 from ajithme/emptyRDD.
    
    Authored-by: Ajith <ajith2...@gmail.com>
    Signed-off-by: Sean Owen <sean.o...@databricks.com>
---
 .../scala/org/apache/spark/scheduler/DAGScheduler.scala     | 11 +++++++++--
 .../org/apache/spark/scheduler/DAGSchedulerSuite.scala      | 13 +++++++++++++
 2 files changed, 22 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index dd1b259..9177c1b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -767,10 +767,17 @@ private[spark] class DAGScheduler(
       callSite: CallSite,
       timeout: Long,
       properties: Properties): PartialResult[R] = {
-    val listener = new ApproximateActionListener(rdd, func, evaluator, timeout)
-    val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
     val partitions = (0 until rdd.partitions.length).toArray
     val jobId = nextJobId.getAndIncrement()
+    if (partitions.isEmpty) {
+      // Return immediately if the job is running 0 tasks
+      val time = clock.getTimeMillis()
+      listenerBus.post(SparkListenerJobStart(jobId, time, Seq[StageInfo](), properties))
+      listenerBus.post(SparkListenerJobEnd(jobId, time, JobSucceeded))
+      return new PartialResult(evaluator.currentResult(), true)
+    }
+    val listener = new ApproximateActionListener(rdd, func, evaluator, timeout)
+    val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
     eventProcessLoop.post(JobSubmitted(
       jobId, rdd, func2, partitions, callSite, listener, SerializationUtils.clone(properties)))
     listener.awaitResult()    // Will throw an exception if the job fails
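
A minimal sketch, not part of the patch, of why the early return above unblocks callers: a PartialResult constructed with isFinal = true is already resolved, so getFinalValue() returns without waiting on any scheduler events.

```scala
import org.apache.spark.partial.{BoundedDouble, PartialResult}

// BoundedDouble(mean, confidence, low, high); the values here are purely
// illustrative. What matters is the isFinal = true flag.
val done = new PartialResult(new BoundedDouble(0.0, 1.0, 0.0, 0.0), true)
done.getFinalValue()  // returns the initial value immediately, no blocking
```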
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index e17d264..e74f462 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.scheduler
 
 import java.util.Properties
+import java.util.concurrent.{CountDownLatch, TimeUnit}
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
 
 import scala.annotation.meta.param
@@ -2849,6 +2850,18 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     }
   }
 
+  test("SPARK-27164: RDD.countApprox on empty RDDs schedules jobs which never 
complete") {
+    val latch = new CountDownLatch(1)
+    val jobListener = new SparkListener {
+      override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
+        latch.countDown()
+      }
+    }
+    sc.addSparkListener(jobListener)
+    sc.emptyRDD[Int].countApprox(10000).getFinalValue()
+    assert(latch.await(10, TimeUnit.SECONDS))
+  }
+
   /**
   * Assert that the supplied TaskSet has exactly the given hosts as its preferred locations.
    * Note that this checks only the host and not the executor ID.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
