[GitHub] spark pull request #21214: [SPARK-23775][TEST] Make DataFrameRangeSuite not ...

2018-05-07 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/21214


---




[GitHub] spark pull request #21214: [SPARK-23775][TEST] Make DataFrameRangeSuite not ...

2018-05-03 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21214#discussion_r185842584
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala ---
@@ -153,23 +153,17 @@ class DataFrameRangeSuite extends QueryTest with SharedSQLContext with Eventuall
 
   test("Cancelling stage in a query with Range.") {
     val listener = new SparkListener {
-      override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
-        eventually(timeout(10.seconds), interval(1.millis)) {
-          assert(DataFrameRangeSuite.stageToKill > 0)
-        }
-        sparkContext.cancelStage(DataFrameRangeSuite.stageToKill)
+      override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
+        sparkContext.cancelStage(taskStart.stageId)
       }
     }
 
     sparkContext.addSparkListener(listener)
     for (codegen <- Seq(true, false)) {
       withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> codegen.toString()) {
-        DataFrameRangeSuite.stageToKill = -1
         val ex = intercept[SparkException] {
-          spark.range(0, 1000L, 1, 1).map { x =>
-            DataFrameRangeSuite.stageToKill = TaskContext.get().stageId()
-            x
-          }.toDF("id").agg(sum("id")).collect()
+          spark.range(0, 1000L, 1, 1)
--- End diff --

You'd probably need to use the object field for that, which would be fine 
since you'd just be using it, not trying to overwrite it at any point.

But the current code is probably ok too. It's unlikely the job will actually 
finish before the other threads have had a chance to run; I just wish that 
were a little more explicit in the code.
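
For illustration, a minimal sketch of that object-field idea, not the 
committed code; the `taskStarted` field name is hypothetical:

```scala
import java.util.concurrent.CountDownLatch

object DataFrameRangeSuite {
  // Held in the companion object so the task closure references a static
  // field instead of capturing a non-serializable local latch. It is a val:
  // tasks only await() it and the listener only calls countDown(), so the
  // reference is never reassigned and cannot be overwritten mid-test.
  val taskStarted = new CountDownLatch(1)
}
```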


---




[GitHub] spark pull request #21214: [SPARK-23775][TEST] Make DataFrameRangeSuite not ...

2018-05-03 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/21214#discussion_r185836245
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala ---
@@ -153,23 +153,17 @@ class DataFrameRangeSuite extends QueryTest with SharedSQLContext with Eventuall
 
   test("Cancelling stage in a query with Range.") {
     val listener = new SparkListener {
-      override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
-        eventually(timeout(10.seconds), interval(1.millis)) {
-          assert(DataFrameRangeSuite.stageToKill > 0)
-        }
-        sparkContext.cancelStage(DataFrameRangeSuite.stageToKill)
+      override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
+        sparkContext.cancelStage(taskStart.stageId)
       }
     }
 
     sparkContext.addSparkListener(listener)
     for (codegen <- Seq(true, false)) {
       withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> codegen.toString()) {
-        DataFrameRangeSuite.stageToKill = -1
         val ex = intercept[SparkException] {
-          spark.range(0, 1000L, 1, 1).map { x =>
-            DataFrameRangeSuite.stageToKill = TaskContext.get().stageId()
-            x
-          }.toDF("id").agg(sum("id")).collect()
+          spark.range(0, 1000L, 1, 1)
--- End diff --

How would you overcome the first bullet point?


---




[GitHub] spark pull request #21214: [SPARK-23775][TEST] Make DataFrameRangeSuite not ...

2018-05-03 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21214#discussion_r185834096
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala ---
@@ -153,23 +153,17 @@ class DataFrameRangeSuite extends QueryTest with SharedSQLContext with Eventuall
 
   test("Cancelling stage in a query with Range.") {
     val listener = new SparkListener {
-      override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
-        eventually(timeout(10.seconds), interval(1.millis)) {
-          assert(DataFrameRangeSuite.stageToKill > 0)
-        }
-        sparkContext.cancelStage(DataFrameRangeSuite.stageToKill)
+      override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
+        sparkContext.cancelStage(taskStart.stageId)
       }
     }
 
     sparkContext.addSparkListener(listener)
     for (codegen <- Seq(true, false)) {
       withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> codegen.toString()) {
-        DataFrameRangeSuite.stageToKill = -1
         val ex = intercept[SparkException] {
-          spark.range(0, 1000L, 1, 1).map { x =>
-            DataFrameRangeSuite.stageToKill = TaskContext.get().stageId()
-            x
-          }.toDF("id").agg(sum("id")).collect()
+          spark.range(0, 1000L, 1, 1)
--- End diff --

You could use `latch.await(timeout)` instead of just `latch.await()`, and 
throw an exception if it times out. That would avoid the test blocking 
indefinitely, and would add an explicit wait instead of using a large count to 
emulate it.
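
A short sketch of that bounded wait, assuming a latch shared through an 
object field as discussed elsewhere in this thread; the 10-second timeout 
is illustrative:

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

val latch = new CountDownLatch(1)
// ...countDown() is invoked elsewhere, e.g. in the SparkListener...

// await(timeout, unit) returns false when the latch has not reached zero in
// time, so the test can fail fast instead of hanging the whole build.
if (!latch.await(10, TimeUnit.SECONDS)) {
  throw new IllegalStateException("Timed out waiting for the task-start signal")
}
```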


---




[GitHub] spark pull request #21214: [SPARK-23775][TEST] Make DataFrameRangeSuite not ...

2018-05-03 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/21214#discussion_r185792166
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala ---
@@ -153,23 +153,17 @@ class DataFrameRangeSuite extends QueryTest with SharedSQLContext with Eventuall
 
   test("Cancelling stage in a query with Range.") {
     val listener = new SparkListener {
-      override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
-        eventually(timeout(10.seconds), interval(1.millis)) {
-          assert(DataFrameRangeSuite.stageToKill > 0)
-        }
-        sparkContext.cancelStage(DataFrameRangeSuite.stageToKill)
+      override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
+        sparkContext.cancelStage(taskStart.stageId)
       }
     }
 
     sparkContext.addSparkListener(listener)
     for (codegen <- Seq(true, false)) {
       withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> codegen.toString()) {
-        DataFrameRangeSuite.stageToKill = -1
         val ex = intercept[SparkException] {
-          spark.range(0, 1000L, 1, 1).map { x =>
-            DataFrameRangeSuite.stageToKill = TaskContext.get().stageId()
-            x
-          }.toDF("id").agg(sum("id")).collect()
+          spark.range(0, 1000L, 1, 1)
--- End diff --

Yeah, after extensive testing and experimentation I was considering this as 
well, but I was hesitant to stop the executor thread in any way. The reasons:
- Executor code can be synchronized only through object variables; otherwise 
a `NotSerializableException` is thrown (see the sketch below). And once the 
`CountDownLatch` is an object variable, a race similar to the one where 
`DataFrameRangeSuite.stageToKill` was overwritten can appear.
- `spark.range(0, 1000L, 1, 1)` takes ~1-2 minutes to compute, which is 
quite a big time window for the test not to be flaky this way.
- A minor concern (rather a note): if no timeout-based synchronization is 
used, the test code gets stuck in `collect` and never reaches 
`waitUntilEmpty` and `eventually...`.

All in all, I wouldn't recommend it because it could end up in a 5-hour 
build timeout.
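
To illustrate the first bullet, a hypothetical snippet, not from the PR, 
that fails because `java.util.concurrent.CountDownLatch` is not 
`Serializable` and the local variable is captured by the `map` closure 
(assuming the suite's `spark` session and implicits):

```scala
import java.util.concurrent.CountDownLatch
import org.apache.spark.sql.functions.sum

val localLatch = new CountDownLatch(1)  // local variable, captured below

spark.range(0, 1000L, 1, 1).map { x =>
  // The closure drags localLatch into the task, so task serialization throws
  // SparkException: Task not serializable
  // (caused by java.io.NotSerializableException).
  localLatch.await()
  x
}.toDF("id").agg(sum("id")).collect()
```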



---




[GitHub] spark pull request #21214: [SPARK-23775][TEST] Make DataFrameRangeSuite not ...

2018-05-02 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21214#discussion_r185687677
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala ---
@@ -153,23 +153,17 @@ class DataFrameRangeSuite extends QueryTest with SharedSQLContext with Eventuall
 
   test("Cancelling stage in a query with Range.") {
     val listener = new SparkListener {
-      override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
-        eventually(timeout(10.seconds), interval(1.millis)) {
-          assert(DataFrameRangeSuite.stageToKill > 0)
-        }
-        sparkContext.cancelStage(DataFrameRangeSuite.stageToKill)
+      override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
+        sparkContext.cancelStage(taskStart.stageId)
       }
     }
 
     sparkContext.addSparkListener(listener)
     for (codegen <- Seq(true, false)) {
       withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> codegen.toString()) {
-        DataFrameRangeSuite.stageToKill = -1
         val ex = intercept[SparkException] {
-          spark.range(0, 1000L, 1, 1).map { x =>
-            DataFrameRangeSuite.stageToKill = TaskContext.get().stageId()
-            x
-          }.toDF("id").agg(sum("id")).collect()
+          spark.range(0, 1000L, 1, 1)
--- End diff --

+1


---




[GitHub] spark pull request #21214: [SPARK-23775][TEST] Make DataFrameRangeSuite not ...

2018-05-02 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21214#discussion_r185649136
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala ---
@@ -180,6 +174,8 @@ class DataFrameRangeSuite extends QueryTest with SharedSQLContext with Eventuall
             fail("Expected the cause to be SparkException, got " + cause.toString() + " instead.")
         }
       }
+      // Wait until all ListenerBus events consumed to make sure cancelStage called for all stage
--- End diff --

nit: all stages


---




[GitHub] spark pull request #21214: [SPARK-23775][TEST] Make DataFrameRangeSuite not ...

2018-05-02 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21214#discussion_r185650518
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala ---
@@ -153,23 +153,17 @@ class DataFrameRangeSuite extends QueryTest with SharedSQLContext with Eventuall
 
   test("Cancelling stage in a query with Range.") {
     val listener = new SparkListener {
-      override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
-        eventually(timeout(10.seconds), interval(1.millis)) {
-          assert(DataFrameRangeSuite.stageToKill > 0)
-        }
-        sparkContext.cancelStage(DataFrameRangeSuite.stageToKill)
+      override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
+        sparkContext.cancelStage(taskStart.stageId)
       }
     }
 
     sparkContext.addSparkListener(listener)
     for (codegen <- Seq(true, false)) {
       withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> codegen.toString()) {
-        DataFrameRangeSuite.stageToKill = -1
         val ex = intercept[SparkException] {
-          spark.range(0, 1000L, 1, 1).map { x =>
-            DataFrameRangeSuite.stageToKill = TaskContext.get().stageId()
-            x
-          }.toDF("id").agg(sum("id")).collect()
+          spark.range(0, 1000L, 1, 1)
--- End diff --

This is ok-ish but this kind of test is still racy. There's no guarantee 
the job won't finish before the events are posted to the bus, processed by the 
listener, and the stage is cancelled. The large count is just an attempt to 
make that less likely.

You could use a `CountDownLatch` for that - wait for it in the task (so 
that the task start event is fired), and signal it in the listener.
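
A rough sketch of that coordination, assuming the latch is reachable from 
the task through an object field (names are illustrative, and the latch is 
one-shot, which is part of the trade-off debated elsewhere in this thread):

```scala
import java.util.concurrent.CountDownLatch
import org.apache.spark.SparkException
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart}
import org.apache.spark.sql.functions.sum

object TaskStartSignal {
  val latch = new CountDownLatch(1)  // one-shot, never reassigned
}

val listener = new SparkListener {
  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
    sparkContext.cancelStage(taskStart.stageId)
    TaskStartSignal.latch.countDown()  // signal: cancellation was requested
  }
}
sparkContext.addSparkListener(listener)

val ex = intercept[SparkException] {
  spark.range(0, 1000L, 1, 1).map { x =>
    TaskStartSignal.latch.await()  // task cannot finish before the listener fires
    x
  }.toDF("id").agg(sum("id")).collect()
}
```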


---




[GitHub] spark pull request #21214: [SPARK-23775][TEST] Make DataFrameRangeSuite not ...

2018-05-02 Thread gaborgsomogyi
GitHub user gaborgsomogyi opened a pull request:

https://github.com/apache/spark/pull/21214

[SPARK-23775][TEST] Make DataFrameRangeSuite not flaky

## What changes were proposed in this pull request?

DataFrameRangeSuite.test("Cancelling stage in a query with Range.") sometimes 
ends up in an infinite loop and times out the build.

There were multiple issues with the test:

1. The first valid stage ID is zero when the test is started alone rather 
than as part of the suite, so the following code waits until it times out:

```
eventually(timeout(10.seconds), interval(1.millis)) {
  assert(DataFrameRangeSuite.stageToKill > 0)
}
```

2. `DataFrameRangeSuite.stageToKill` was overwritten by the task's thread 
after the reset, which ended up cancelling the same stage twice. This caused 
the infinite wait.

This PR fixes the mentioned flakiness by removing the shared 
`DataFrameRangeSuite.stageToKill` and using `onTaskStart`, where the stage ID 
is provided. To make sure `cancelStage` is called for all stages, 
`waitUntilEmpty` is called on the `ListenerBus` (see the sketch below).
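
For reference, a sketch of that last step; `listenerBus` is package-private 
to Spark and visible to the suite, and the 20-second timeout is illustrative:

```scala
import scala.concurrent.duration._

// Drain the listener bus so every queued onTaskStart event, and therefore
// every cancelStage call, has been processed before the test moves on.
sparkContext.listenerBus.waitUntilEmpty(20.seconds.toMillis)
```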

In [PR20888](https://github.com/apache/spark/pull/20888) a fix was attempted 
by:
* Stopping the executor thread with `wait`
* Waiting until all `cancelStage` calls happened
* Killing the executor thread by setting 
`SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL`

but this thread killing sometimes left the shared `SparkContext` in a state 
where further tasks couldn't be submitted. As a result, 
DataFrameRangeSuite.test("Cancelling stage in a query with Range.") itself 
passed properly, but the next test inside the suite hung.

## How was this patch tested?

The existing unit test was executed 10k times.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/gaborgsomogyi/spark SPARK-23775_1

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21214.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21214


commit 9781cbee95f338d5e1bcd61190c7a938155803bf
Author: Gabor Somogyi 
Date:   2018-05-02T09:23:38Z

[SPARK-23775][TEST] Make DataFrameRangeSuite not flaky




---
