GitHub user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21214#discussion_r185834096
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala ---
    @@ -153,23 +153,17 @@ class DataFrameRangeSuite extends QueryTest with SharedSQLContext with Eventually
     
       test("Cancelling stage in a query with Range.") {
         val listener = new SparkListener {
    -      override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    -        eventually(timeout(10.seconds), interval(1.millis)) {
    -          assert(DataFrameRangeSuite.stageToKill > 0)
    -        }
    -        sparkContext.cancelStage(DataFrameRangeSuite.stageToKill)
    +      override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
    +        sparkContext.cancelStage(taskStart.stageId)
           }
         }
     
         sparkContext.addSparkListener(listener)
         for (codegen <- Seq(true, false)) {
          withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> codegen.toString()) {
    -        DataFrameRangeSuite.stageToKill = -1
             val ex = intercept[SparkException] {
    -          spark.range(0, 100000000000L, 1, 1).map { x =>
    -            DataFrameRangeSuite.stageToKill = TaskContext.get().stageId()
    -            x
    -          }.toDF("id").agg(sum("id")).collect()
    +          spark.range(0, 100000000000L, 1, 1)
    --- End diff --
    
    You could use `latch.await(timeout)` instead of just `latch.await()`, and throw an exception if it times out. That would keep the test from blocking indefinitely, and would make the wait explicit instead of emulating it with a large count.
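    
    A minimal sketch of that pattern (the `taskStarted` name and the 10-second timeout are illustrative assumptions, not code from this PR):
    
    ```scala
    import java.util.concurrent.{CountDownLatch, TimeUnit, TimeoutException}
    
    val taskStarted = new CountDownLatch(1)
    
    // Something like a SparkListener would call taskStarted.countDown()
    // when the awaited event fires, e.g. from onTaskStart.
    
    // Bounded wait: fail the test instead of letting it block forever.
    if (!taskStarted.await(10, TimeUnit.SECONDS)) {
      throw new TimeoutException("No task started within 10 seconds")
    }
    ```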

