GitHub user jiangxb1987 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20888#discussion_r181756636
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala ---
    @@ -156,43 +156,52 @@ class DataFrameRangeSuite extends QueryTest with SharedSQLContext with Eventuall
       test("Cancelling stage in a query with Range.") {
         val slices = 10
     
    -    val listener = new SparkListener {
    -      override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
    -        eventually(timeout(10.seconds)) {
    -          assert(DataFrameRangeSuite.isTaskStarted)
    +    // Save and restore the value because SparkContext is shared
    +    val savedInterruptOnCancel = sparkContext
    +      .getLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL)
    +
    +    try {
    +      sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "true")
    +
    +      for (codegen <- Seq(true, false)) {
    +        val latch = new CountDownLatch(2)
    +
    +        val listener = new SparkListener {
    +          override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
    +            sparkContext.cancelStage(taskStart.stageId)
    +            latch.countDown()
    +          }
             }
    -        sparkContext.cancelStage(taskStart.stageId)
    -        DataFrameRangeSuite.semaphore.release(slices)
    -      }
    -    }
     
    -    sparkContext.addSparkListener(listener)
    -    for (codegen <- Seq(true, false)) {
    -      withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> codegen.toString()) {
    -        DataFrameRangeSuite.semaphore.drainPermits()
    -        DataFrameRangeSuite.isTaskStarted = false
    -        val ex = intercept[SparkException] {
    -          sparkContext.range(0, 10000L, numSlices = slices).mapPartitions { x =>
    -            DataFrameRangeSuite.isTaskStarted = true
    -            // Block waiting for the listener to cancel the stage.
    -            DataFrameRangeSuite.semaphore.acquire()
    -            x
    -          }.toDF("id").agg(sum("id")).collect()
    +        sparkContext.addSparkListener(listener)
    +        withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> codegen.toString()) {
    +          val ex = intercept[SparkException] {
    +            sparkContext.range(0, 10000L, numSlices = slices).mapPartitions { x =>
    --- End diff --
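    
    The hunk is cut off inside the `try`, so the restore half of the new save/restore pattern is not visible here. A minimal sketch of the idiom the added lines set up (an assumption about the shape of the rest, not a quote from the PR; `sc` stands for the shared SparkContext):
    
        import org.apache.spark.SparkContext
    
        val saved = sc.getLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL)
        try {
          // Ask the scheduler to interrupt task threads when a job is cancelled.
          sc.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "true")
          // ... run the job under test ...
        } finally {
          // Restore the original value; setLocalProperty(key, null) clears it,
          // so a previously unset property is left unset on the shared context.
          sc.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, saved)
        }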
    
    Why do we need to specify `numSlices`?
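    
    For reference, `numSlices` pins the partition count of the range RDD, and with it the number of tasks the stage launches, instead of leaving it to `sc.defaultParallelism`, which varies with the test environment. A minimal, self-contained sketch (the object name and local master are made up for illustration):
    
        import org.apache.spark.sql.SparkSession
    
        object NumSlicesSketch extends App {
          val spark = SparkSession.builder().master("local[4]").appName("sketch").getOrCreate()
          val sc = spark.sparkContext
    
          // numSlices fixes the number of partitions, i.e. the number of tasks.
          assert(sc.range(0, 10000L, numSlices = 10).getNumPartitions == 10)
    
          // Without it, the count falls back to sc.defaultParallelism
          // (here 4, from the local[4] master).
          println(sc.range(0, 10000L).getNumPartitions)
    
          spark.stop()
        }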

