[GitHub] spark pull request #21214: [SPARK-23775][TEST] Make DataFrameRangeSuite not ...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21214
[GitHub] spark pull request #21214: [SPARK-23775][TEST] Make DataFrameRangeSuite not ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/21214#discussion_r185842584

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala ---
@@ -153,23 +153,17 @@ class DataFrameRangeSuite extends QueryTest with SharedSQLContext with Eventually
   test("Cancelling stage in a query with Range.") {
     val listener = new SparkListener {
-      override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
-        eventually(timeout(10.seconds), interval(1.millis)) {
-          assert(DataFrameRangeSuite.stageToKill > 0)
-        }
-        sparkContext.cancelStage(DataFrameRangeSuite.stageToKill)
+      override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
+        sparkContext.cancelStage(taskStart.stageId)
       }
     }

     sparkContext.addSparkListener(listener)
     for (codegen <- Seq(true, false)) {
       withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> codegen.toString()) {
-        DataFrameRangeSuite.stageToKill = -1
         val ex = intercept[SparkException] {
-          spark.range(0, 1000L, 1, 1).map { x =>
-            DataFrameRangeSuite.stageToKill = TaskContext.get().stageId()
-            x
-          }.toDF("id").agg(sum("id")).collect()
+          spark.range(0, 1000L, 1, 1)
--- End diff --

You'd probably need to use the object field for that, which would be fine since you'd just be using it, not trying to overwrite it at any point. But the current code is probably ok too: it's unlikely the job will actually finish before the other threads have had a chance to run. I just wish that were a little more explicit in the code.
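A minimal sketch of what "use the object field" could mean here; the `taskLatch` field name is a hypothetical illustration, not something from the PR:

```scala
import java.util.concurrent.CountDownLatch

object DataFrameRangeSuite {
  // Written only by the driver before each run; the task and the listener
  // merely use it (await / countDown), so there is no racy overwrite like
  // the one the old stageToKill field suffered from.
  @volatile var taskLatch: CountDownLatch = _
}
```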
[GitHub] spark pull request #21214: [SPARK-23775][TEST] Make DataFrameRangeSuite not ...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/21214#discussion_r185836245 (on the same diff hunk as quoted above)

How do we overcome the first bullet point?
[GitHub] spark pull request #21214: [SPARK-23775][TEST] Make DataFrameRangeSuite not ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/21214#discussion_r185834096 (on the same diff hunk as quoted above)

You could use `latch.await(timeout)` instead of just `latch.await()`, and throw an exception if it times out. That would avoid the test blocking indefinitely, and would add an explicit wait instead of using a large count to emulate it.
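A sketch of the timeout-aware variant, reusing the hypothetical `taskLatch` field from the sketch above; the 10-second bound is an illustrative value. `CountDownLatch.await(timeout, unit)` returns `false` when the timeout elapses before the latch reaches zero:

```scala
import java.util.concurrent.TimeUnit

// Inside the task: bound the wait so a missed signal fails the test
// instead of hanging it indefinitely.
if (!DataFrameRangeSuite.taskLatch.await(10, TimeUnit.SECONDS)) {
  throw new IllegalStateException("listener never signalled the task start")
}
```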
[GitHub] spark pull request #21214: [SPARK-23775][TEST] Make DataFrameRangeSuite not ...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/21214#discussion_r185792166 (on the same diff hunk as quoted above)

Yeah, after extensive testing and experimentation I was considering this as well, but hesitated to stop the executor thread in any way. The reasons:

- Executor code can be synchronized only through object variables; otherwise a `NotSerializableException` is thrown (illustrated in the sketch below). With an object-held `CountDownLatch`, a race similar to the overwriting of `DataFrameRangeSuite.stageToKill` would appear.
- `spark.range(0, 1000L, 1, 1)` takes ~1-2 minutes to compute, which is quite a big time window for the test not to be flaky this way.
- A minor concern (rather a note): if no timeout-based synchronization is used, the test code gets stuck in `collect` and never reaches `waitUntilEmpty` and `eventually...`.

All in all, I would not recommend it, because it could end up in a 5-hour timeout.
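A hypothetical illustration of the first bullet point; `localLatch` is made up for the example, and `spark` plus the encoder implicits are assumed to come from the suite's `SharedSQLContext` test context:

```scala
import java.util.concurrent.CountDownLatch

// A latch held in a local variable is captured by the map closure; the job
// fails with "Task not serializable" (caused by a NotSerializableException)
// because CountDownLatch is not Serializable.
val localLatch = new CountDownLatch(1)
spark.range(0, 1000L, 1, 1).map { x =>
  localLatch.await()
  x
}.toDF("id").collect()

// Referencing a field on an object instead keeps the closure serializable:
// the object is resolved where the task runs rather than shipped with it.
// That is exactly why executor-side state must be shared, races included.
// (The countDown wiring is omitted; this only illustrates serialization.)
spark.range(0, 1000L, 1, 1).map { x =>
  DataFrameRangeSuite.taskLatch.await()
  x
}.toDF("id").collect()
```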
[GitHub] spark pull request #21214: [SPARK-23775][TEST] Make DataFrameRangeSuite not ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21214#discussion_r185687677 (on the same diff hunk as quoted above)

+1
[GitHub] spark pull request #21214: [SPARK-23775][TEST] Make DataFrameRangeSuite not ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/21214#discussion_r185649136

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala ---
@@ -180,6 +174,8 @@ class DataFrameRangeSuite extends QueryTest with SharedSQLContext with Eventually
           fail("Expected the cause to be SparkException, got " + cause.toString() + " instead.")
         }
       }
+    // Wait until all ListenerBus events consumed to make sure cancelStage called for all stage
--- End diff --

nit: all stages
[GitHub] spark pull request #21214: [SPARK-23775][TEST] Make DataFrameRangeSuite not ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/21214#discussion_r185650518 (on the same diff hunk as quoted above)

This is ok-ish, but this kind of test is still racy: there's no guarantee the job won't finish before the events are posted to the bus, processed by the listener, and the stage is cancelled. The large count is just an attempt to make that less likely. You could use a `CountDownLatch` for that: wait for it in the task (so that the task start event is fired), and signal it in the listener.
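A sketch of how that wiring could look inside the test, using the hypothetical object-held `taskLatch` from the earlier sketches (a local latch would not serialize, as discussed above); this is an illustration of the suggestion, not code from the PR:

```scala
import java.util.concurrent.CountDownLatch
import org.apache.spark.SparkException
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart}
import org.apache.spark.sql.functions.sum

DataFrameRangeSuite.taskLatch = new CountDownLatch(1)

val listener = new SparkListener {
  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
    // Request cancellation first, then release the task: the cancellation
    // is already in flight, so the job cannot run to completion first.
    sparkContext.cancelStage(taskStart.stageId)
    DataFrameRangeSuite.taskLatch.countDown()
  }
}
sparkContext.addSparkListener(listener)

val ex = intercept[SparkException] {
  spark.range(0, 1000L, 1, 1).map { x =>
    // Block until the listener has observed the task start event.
    DataFrameRangeSuite.taskLatch.await()
    x
  }.toDF("id").agg(sum("id")).collect()
}
```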
[GitHub] spark pull request #21214: [SPARK-23775][TEST] Make DataFrameRangeSuite not ...
GitHub user gaborgsomogyi opened a pull request: https://github.com/apache/spark/pull/21214

[SPARK-23775][TEST] Make DataFrameRangeSuite not flaky

## What changes were proposed in this pull request?

DataFrameRangeSuite.test("Cancelling stage in a query with Range.") sometimes got stuck in an infinite loop and timed out the build. There were multiple issues with the test:

1. The first valid stageId is zero when the test runs alone rather than as part of the suite, so the assertion `stageToKill > 0` never holds and the following code waits until the timeout:

```
eventually(timeout(10.seconds), interval(1.millis)) {
  assert(DataFrameRangeSuite.stageToKill > 0)
}
```

2. `DataFrameRangeSuite.stageToKill` was overwritten by the task's thread after the reset, which ended up cancelling the same stage twice. This caused the infinite wait.

This PR fixes the flakiness by removing the shared `DataFrameRangeSuite.stageToKill` field and using `onTaskStart`, where the stage ID is provided by the event. To make sure `cancelStage` is called for all stages, `waitUntilEmpty` is called on the `ListenerBus`.

[PR #20888](https://github.com/apache/spark/pull/20888) tried to solve this by:
* stopping the executor thread with `wait`,
* waiting for all `cancelStage` calls, and
* killing the executor thread by setting `SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL`,

but killing the thread sometimes left the shared `SparkContext` in a state where no further tasks could be submitted. As a result, DataFrameRangeSuite.test("Cancelling stage in a query with Range.") passed, but the next test in the suite hung.

## How was this patch tested?

Existing unit test executed 10k times.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/gaborgsomogyi/spark SPARK-23775_1

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21214.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #21214

commit 9781cbee95f338d5e1bcd61190c7a938155803bf
Author: Gabor Somogyi
Date: 2018-05-02T09:23:38Z

    [SPARK-23775][TEST] Make DataFrameRangeSuite not flaky
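Pieced together from the diff hunks quoted in this thread, the fixed test roughly takes the shape below. This is a condensed sketch, not a verbatim copy of the merged change; the assertion body is elided, and the 20-second drain timeout is an illustrative value:

```scala
import org.apache.spark.SparkException
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart}
import org.apache.spark.sql.functions.sum
import org.apache.spark.sql.internal.SQLConf

val listener = new SparkListener {
  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
    // The stage id comes straight from the event, so no shared mutable
    // field (and no race on it) is needed anymore.
    sparkContext.cancelStage(taskStart.stageId)
  }
}
sparkContext.addSparkListener(listener)

for (codegen <- Seq(true, false)) {
  withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> codegen.toString()) {
    val ex = intercept[SparkException] {
      spark.range(0, 1000L, 1, 1).toDF("id").agg(sum("id")).collect()
    }
    // ... assert that ex was caused by the stage cancellation ...
  }
}
// Drain pending ListenerBus events so cancelStage has been called for all
// stages before the suite moves on to the next test.
sparkContext.listenerBus.waitUntilEmpty(20000)
```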