[ https://issues.apache.org/jira/browse/SPARK-19462?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ian updated SPARK-19462:
------------------------
Description:
The property spark.sql.adaptive.enabled needs to be set to "true" for the issue to be reproduced.

Steps to reproduce using spark-shell:
0. We use YARN as the cluster manager; spark-shell runs in client mode.
1. Launch spark-shell with spark.sql.adaptive.enabled set to "true" (for example, by passing --conf spark.sql.adaptive.enabled=true).
2. Run the following:
{code}
val df1 = sc.parallelize(1 to 1000, 2).toDF("number")
df1.registerTempTable("test")
val data1 = sqlContext.sql("SELECT * FROM test WHERE number > 50")
data1.collect
val data2 = sqlContext.sql("SELECT number, count(*) cnt FROM test GROUP BY number")
data2.collect
// everything is fine up to this point

// manually kill both the ApplicationMaster (AM) and all the NodeManagers (NMs) of the spark-shell app

// re-running data1.collect succeeds: the result is returned
data1.collect
// but data2.collect fails
data2.collect

// stacktrace
Caused by: java.lang.RuntimeException: Exchange not implemented for UnknownPartitioning(1)
	at scala.sys.package$.error(package.scala:27)
	at org.apache.spark.sql.execution.Exchange.org$apache$spark$sql$execution$Exchange$$getPartitionKeyExtractor$1(Exchange.scala:198)
	at org.apache.spark.sql.execution.Exchange$$anonfun$3.apply(Exchange.scala:208)
	at org.apache.spark.sql.execution.Exchange$$anonfun$3.apply(Exchange.scala:207)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21.apply(RDD.scala:728)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21.apply(RDD.scala:728)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
	at org.apache.spark.scheduler.Task.run(Task.scala:89)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
{code}

The difference between data1 and data2 is whether a ShuffledRowRDD is present in the lineage. When the RDD lineage contains a ShuffledRowRDD, the behavior described above can be observed whenever a node failure or container loss happens.

{code}
scala> data2.rdd.toDebugString
res6: String =
(1) MapPartitionsRDD[20] at rdd at <console>:26 []
 |  MapPartitionsRDD[19] at rdd at <console>:26 []
 |  ShuffledRowRDD[8] at collect at <console>:26 []
 +-(2) MapPartitionsRDD[7] at collect at <console>:26 []
    |  MapPartitionsRDD[6] at collect at <console>:26 []
    |  MapPartitionsRDD[5] at collect at <console>:26 []
    |  MapPartitionsRDD[1] at intRddToDataFrameHolder at <console>:25 []
    |  ParallelCollectionRDD[0] at parallelize at <console>:25 []
{code}

> when spark.sql.adaptive.enabled is enabled, DF is not resilient to node/container failure
> -----------------------------------------------------------------------------------------
>
>                 Key: SPARK-19462
>                 URL: https://issues.apache.org/jira/browse/SPARK-19462
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.3
>            Reporter: Ian
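The report ties the failure to whether a ShuffledRowRDD appears in the RDD lineage of data1 versus data2. That check can be automated by walking `RDD.dependencies` from the DataFrame's RDD. The sketch below is an illustration under stated assumptions and is not part of the original report: `LineageCheck` and `existsInLineage` are hypothetical names, and the search is written in plain Scala so it can be exercised without a cluster. The Spark-specific wiring (parents obtained via `dependencies.map(_.rdd)`, the class matched by its simple name so the snippet does not depend on the package or visibility of ShuffledRowRDD) appears only in comments and has not been verified against Spark 1.6.3.

```scala
import scala.collection.mutable

object LineageCheck {
  // Hypothetical helper (not a Spark API): depth-first search over a DAG.
  // Returns true if `root` or any node reachable through `parents`
  // satisfies `p`. The visited set ensures each node is examined once,
  // even when several children share the same ancestor.
  def existsInLineage[A](root: A)(parents: A => Seq[A])(p: A => Boolean): Boolean = {
    val visited = mutable.Set.empty[A]
    var pending: List[A] = List(root)
    while (pending.nonEmpty) {
      val node = pending.head
      pending = pending.tail
      if (visited.add(node)) {
        if (p(node)) return true
        // Push parents in front so the walk proceeds depth-first.
        pending = parents(node).toList ::: pending
      }
    }
    false
  }
}

// Assumed usage in the spark-shell session from the report (untested sketch):
//   import org.apache.spark.rdd.RDD
//   def hasShuffledRowRDD(rdd: RDD[_]): Boolean =
//     LineageCheck.existsInLineage[RDD[_]](rdd)(_.dependencies.map(_.rdd))(
//       _.getClass.getSimpleName == "ShuffledRowRDD")
//   hasShuffledRowRDD(data1.rdd)
//   hasShuffledRowRDD(data2.rdd)
```

Going by the toDebugString output in the description, such a check would be expected to find a ShuffledRowRDD for data2 but not for data1; the generic traversal keeps the graph logic separate from Spark types so it can be reused on any parent function.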