[ https://issues.apache.org/jira/browse/SPARK-19462?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ian updated SPARK-19462:
------------------------
    Description: 
The property spark.sql.adaptive.enabled must be set to "true" for the issue to be reproduced.
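
For reference, a minimal way to set it in a Spark 1.6 spark-shell (a sketch; the launch flags are illustrative, not part of the original report):
{code}
// at launch time:
//   spark-shell --master yarn --conf spark.sql.adaptive.enabled=true

// or from within an already-running shell, before executing any query:
sqlContext.setConf("spark.sql.adaptive.enabled", "true")

// verify the setting took effect
sqlContext.getConf("spark.sql.adaptive.enabled")   // "true"
{code}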

Reproducible steps using spark-shell:
0. We use YARN as the cluster manager; spark-shell runs in client mode.
1. Launch spark-shell.
2. Run the following:
{code}
val df1 = sc.parallelize(1 to 1000, 2).toDF("number")
df1.registerTempTable("test")

val data1 = sqlContext.sql("SELECT * FROM test WHERE number > 50")
data1.collect

val data2 = sqlContext.sql("SELECT number, count(*) cnt FROM test GROUP BY number")
data2.collect

// everything is fine up to this point
// manually kill both the AM and all the NMs of the spark-shell app

// re-run data1.collect, the result is returned successfully
data1.collect

// but data2.collect will fail
data2.collect

// stacktrace
Caused by: java.lang.RuntimeException: Exchange not implemented for UnknownPartitioning(1)
  at scala.sys.package$.error(package.scala:27)
  at org.apache.spark.sql.execution.Exchange.org$apache$spark$sql$execution$Exchange$$getPartitionKeyExtractor$1(Exchange.scala:198)
  at org.apache.spark.sql.execution.Exchange$$anonfun$3.apply(Exchange.scala:208)
  at org.apache.spark.sql.execution.Exchange$$anonfun$3.apply(Exchange.scala:207)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21.apply(RDD.scala:728)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21.apply(RDD.scala:728)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
  at org.apache.spark.scheduler.Task.run(Task.scala:89)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)

{code}

The difference between data1 and data2 is whether a ShuffledRowRDD is present in the RDD lineage. When the lineage contains a ShuffledRowRDD, the failure above can be observed whenever node failures or container loss occur.
{code}
scala> data2.rdd.toDebugString
res6: String =
(1) MapPartitionsRDD[20] at rdd at <console>:26 []
 |  MapPartitionsRDD[19] at rdd at <console>:26 []
 |  ShuffledRowRDD[8] at collect at <console>:26 []
 +-(2) MapPartitionsRDD[7] at collect at <console>:26 []
    |  MapPartitionsRDD[6] at collect at <console>:26 []
    |  MapPartitionsRDD[5] at collect at <console>:26 []
    |  MapPartitionsRDD[1] at intRddToDataFrameHolder at <console>:25 []
    |  ParallelCollectionRDD[0] at parallelize at <console>:25 []

{code}
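
For a quick programmatic check of which DataFrames are exposed to this failure mode, the lineage string can be inspected directly (a minimal sketch; ShuffledRowRDD is an internal class, so matching on the debug string is the simplest test from the shell):
{code}
// true if the DataFrame's RDD lineage contains a shuffle stage, i.e. the
// case that fails after AM/NM loss when adaptive execution is enabled
def dependsOnShuffle(df: org.apache.spark.sql.DataFrame): Boolean =
  df.rdd.toDebugString.contains("ShuffledRowRDD")

dependsOnShuffle(data1)  // false: recomputes successfully after node loss
dependsOnShuffle(data2)  // true: fails with UnknownPartitioning(1)
{code}
Consistent with the note above that the property must be "true" to reproduce, queries run with spark.sql.adaptive.enabled at its default ("false") do not hit this code path.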



> when spark.sql.adaptive.enabled is enabled, DF is not resilient to 
> node/container failure
> -----------------------------------------------------------------------------------------
>
>                 Key: SPARK-19462
>                 URL: https://issues.apache.org/jira/browse/SPARK-19462
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.3
>            Reporter: Ian
>