[SparkSQL] What is Exchange in the physical plan for?
Hi,

DataFrame.explain() shows the physical plan of a query. I noticed there are a lot of `Exchange`s in it, like below:

Project [period#20L,categoryName#0,regionName#10,action#15,list_id#16L]
 ShuffledHashJoin [region#18], [regionCode#9], BuildRight
  Exchange (HashPartitioning [region#18], 12)
   Project [categoryName#0,list_id#16L,period#20L,action#15,region#18]
    ShuffledHashJoin [refCategoryID#3], [category#17], BuildRight
     Exchange (HashPartitioning [refCategoryID#3], 12)
      Project [categoryName#0,refCategoryID#3]
       PhysicalRDD [categoryName#0,familyName#1,parentRefCategoryID#2,refCategoryID#3], MapPartitionsRDD[5] at mapPartitions at SQLContext.scala:439
     Exchange (HashPartitioning [category#17], 12)
      Project [timestamp_sec#13L AS period#20L,category#17,region#18,action#15,list_id#16L]
       PhysicalRDD [syslog#12,timestamp_sec#13L,timestamp_usec#14,action#15,list_id#16L,category#17,region#18,expiration_time#19], MapPartitionsRDD[16] at map at SQLContext.scala:394
  Exchange (HashPartitioning [regionCode#9], 12)
   Project [regionName#10,regionCode#9]
    PhysicalRDD [cityName#4,countryCode#5,countryName#6,dptCode#7,dptName#8,regionCode#9,regionName#10,zipCode#11], MapPartitionsRDD[11] at mapPartitions at SQLContext.scala:439

I also found its class: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala

So what does it mean? A minimal snippet that reproduces this kind of plan is sketched below.

Thank you.

Hao
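P.S. Here is a minimal, self-contained sketch (hypothetical table and column names; assuming the Spark 1.3+ DataFrame API, since explain() is called on a DataFrame above) of a join between two DataFrames that are not co-partitioned. The resulting plan should show an Exchange (HashPartitioning ...) on each side of the join, similar to the plan above:

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)  // `sc` is an existing SparkContext
import sqlContext.implicits._

// Two small DataFrames joined on a key. Neither side is partitioned on
// the join key, so the planner has to repartition both inputs before
// the join can run; that repartitioning appears as Exchange nodes.
val categories = sc.parallelize(Seq(("books", 3), ("music", 7)))
  .toDF("categoryName", "refCategoryID")
val events = sc.parallelize(Seq((3, "FR"), (7, "DE")))
  .toDF("category", "region")

// Print the physical plan; each join input should sit under an
// Exchange (HashPartitioning ...) node.
events.join(categories, $"category" === $"refCategoryID").explain()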
scala.MatchError on SparkSQL when creating ArrayType of StructType
Hi,

I am using SparkSQL on the 1.1.0 branch. The following code leads to a scala.MatchError at org.apache.spark.sql.catalyst.expressions.Cast.cast$lzycompute(Cast.scala:247):

val scm = StructType(inputRDD.schema.fields.init :+
  StructField("list", ArrayType(
    StructType(Seq(
      StructField("date", StringType, nullable = false),
      StructField("nbPurchase", IntegerType, nullable = false))))))

// purchaseRDD is an RDD[sql.Row] whose schema corresponds to scm.
// It is transformed from inputRDD.
val schemaRDD = hiveContext.applySchema(purchaseRDD, scm)
schemaRDD.registerTempTable("t_purchase")

Here's the stack trace:

scala.MatchError: ArrayType(StructType(List(StructField(date,StringType,true), StructField(n_reachat,IntegerType,true))),true) (of class org.apache.spark.sql.catalyst.types.ArrayType)
    at org.apache.spark.sql.catalyst.expressions.Cast.cast$lzycompute(Cast.scala:247)
    at org.apache.spark.sql.catalyst.expressions.Cast.cast(Cast.scala:247)
    at org.apache.spark.sql.catalyst.expressions.Cast.eval(Cast.scala:263)
    at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:84)
    at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:66)
    at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:50)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1(InsertIntoHiveTable.scala:149)
    at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$1.apply(InsertIntoHiveTable.scala:158)
    at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$1.apply(InsertIntoHiveTable.scala:158)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
    at org.apache.spark.scheduler.Task.run(Task.scala:54)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)

The strange thing is that `nullable` of the `date` and `nbPurchase` fields is reported as true in the error while it was set to false in the code. If I set both to true, it works. But, in fact, they should not be nullable.

Here's what I find at Cast.scala:247 on the 1.1.0 branch; note there is no case for ArrayType, which would explain the MatchError:

private[this] lazy val cast: Any => Any = dataType match {
  case StringType => castToString
  case BinaryType => castToBinary
  case DecimalType => castToDecimal
  case TimestampType => castToTimestamp
  case BooleanType => castToBoolean
  case ByteType => castToByte
  case ShortType => castToShort
  case IntegerType => castToInt
  case FloatType => castToFloat
  case LongType => castToLong
  case DoubleType => castToDouble
}

Any idea? A standalone reproduction of the setup is sketched below.

Thank you.

Hao
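P.S. For a standalone reproduction, here is a minimal sketch of the same pattern (hypothetical column names and data; Spark 1.1-era API, where the SQL types and Row are exposed via org.apache.spark.sql._): an array-of-struct column appended to a flat schema and applied to an RDD[Row].

import org.apache.spark.sql._

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)  // `sc` is an existing SparkContext

// Element type of the nested array: one struct per purchase.
val itemType = StructType(Seq(
  StructField("date", StringType, nullable = false),
  StructField("nbPurchase", IntegerType, nullable = false)))

// Stand-in for inputRDD.schema.fields.init plus the new array column.
val scm = StructType(Seq(
  StructField("userId", StringType, nullable = false),
  StructField("list", ArrayType(itemType), nullable = true)))

// Nested structs are represented as Rows inside a Seq.
val purchaseRDD = sc.parallelize(Seq(
  Row("u1", Seq(Row("2014-09-01", 2), Row("2014-09-02", 1)))))

val schemaRDD = hiveContext.applySchema(purchaseRDD, scm)
schemaRDD.registerTempTable("t_purchase")

// Per the original stack trace, the failure surfaces in
// InsertIntoHiveTable, so writing the table (e.g. an INSERT from
// t_purchase into a Hive table) is presumably what triggers the Cast
// and, with the inner fields declared nullable = false, the MatchError.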