[SparkSQL] What is Exchange in the physical plan for?

2015-06-08 Thread invkrh
Hi,

DataFrame.explain() shows the physical plan of a query. I noticed there are
a lot of `Exchange` operators in it, like below:

Project [period#20L,categoryName#0,regionName#10,action#15,list_id#16L]
 ShuffledHashJoin [region#18], [regionCode#9], BuildRight
  Exchange (HashPartitioning [region#18], 12)
   Project [categoryName#0,list_id#16L,period#20L,action#15,region#18]
    ShuffledHashJoin [refCategoryID#3], [category#17], BuildRight
     Exchange (HashPartitioning [refCategoryID#3], 12)
      Project [categoryName#0,refCategoryID#3]
       PhysicalRDD [categoryName#0,familyName#1,parentRefCategoryID#2,refCategoryID#3], MapPartitionsRDD[5] at mapPartitions at SQLContext.scala:439
     Exchange (HashPartitioning [category#17], 12)
      Project [timestamp_sec#13L AS period#20L,category#17,region#18,action#15,list_id#16L]
       PhysicalRDD [syslog#12,timestamp_sec#13L,timestamp_usec#14,action#15,list_id#16L,category#17,region#18,expiration_time#19], MapPartitionsRDD[16] at map at SQLContext.scala:394
  Exchange (HashPartitioning [regionCode#9], 12)
   Project [regionName#10,regionCode#9]
    PhysicalRDD [cityName#4,countryCode#5,countryName#6,dptCode#7,dptName#8,regionCode#9,regionName#10,zipCode#11], MapPartitionsRDD[11] at mapPartitions at SQLContext.scala:439
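
For context, the query behind this plan is shaped roughly like the following sketch. The DataFrame names `listings`, `categories`, and `regions` are hypothetical, reconstructed from the column names in the plan:

// Sketch only: `listings`, `categories`, and `regions` are hypothetical
// DataFrames; the column names come from the plan above.
// Assumes Spark 1.3+ and `import sqlContext.implicits._` for $"..." columns.
val result = listings
  .select($"timestamp_sec".as("period"), $"category", $"region",
          $"action", $"list_id")
  .join(categories, $"category" === categories("refCategoryID"))
  .join(regions, $"region" === regions("regionCode"))
  .select("period", "categoryName", "regionName", "action", "list_id")

result.explain() // prints the physical plan above, Exchange nodes included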

I also found its class:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala.

So what is this `Exchange` operator for?

Thank you.

Hao.






scala.MatchError on SparkSQL when creating ArrayType of StructType

2014-12-04 Thread invkrh
Hi,

I am using Spark SQL on the 1.1.0 branch.

The following code leads to a scala.MatchError at
org.apache.spark.sql.catalyst.expressions.Cast.cast$lzycompute(Cast.scala:247):

val scm = StructType(inputRDD.schema.fields.init :+
  StructField("list",
    ArrayType(
      StructType(Seq(
        StructField("date", StringType, nullable = false),
        StructField("nbPurchase", IntegerType, nullable = false)))),
    nullable = false))

// purchaseRDD is an RDD[sql.Row] whose schema corresponds to scm.
// It is transformed from inputRDD.
val schemaRDD = hiveContext.applySchema(purchaseRDD, scm)
schemaRDD.registerTempTable("t_purchase")
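
Judging from the stack trace below, the error fires when the data is written
into a Hive table, roughly like this (a sketch; the target table name
t_purchase_out is hypothetical):

// Hypothetical write that triggers the cast; see the stack trace below.
hiveContext.sql("INSERT INTO TABLE t_purchase_out SELECT * FROM t_purchase")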

Here's the stack trace:
scala.MatchError: ArrayType(StructType(List(StructField(date,StringType,true), StructField(n_reachat,IntegerType,true))),true) (of class org.apache.spark.sql.catalyst.types.ArrayType)
  at org.apache.spark.sql.catalyst.expressions.Cast.cast$lzycompute(Cast.scala:247)
  at org.apache.spark.sql.catalyst.expressions.Cast.cast(Cast.scala:247)
  at org.apache.spark.sql.catalyst.expressions.Cast.eval(Cast.scala:263)
  at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:84)
  at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:66)
  at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:50)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
  at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1(InsertIntoHiveTable.scala:149)
  at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$1.apply(InsertIntoHiveTable.scala:158)
  at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$1.apply(InsertIntoHiveTable.scala:158)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
  at org.apache.spark.scheduler.Task.run(Task.scala:54)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
  at java.lang.Thread.run(Thread.java:744)

The strange thing is that the nullable flag of the date and nbPurchase fields
shows up as true in the error message, while it was false in the code. If I
set both to true, it works. But, in fact, they should not be nullable.
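
Concretely, the variant that works looks like this (a sketch of the
workaround just described):

// Workaround sketch: the same schema with the nested fields declared nullable.
val scmNullable = StructType(inputRDD.schema.fields.init :+
  StructField("list",
    ArrayType(
      StructType(Seq(
        StructField("date", StringType, nullable = true),
        StructField("nbPurchase", IntegerType, nullable = true)))),
    nullable = false))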

Here's what I find at Cast.scala:247 on the 1.1.0 branch; note that the match
covers only primitive types, with no case for ArrayType or other complex types:

  private[this] lazy val cast: Any => Any = dataType match {
    case StringType => castToString
    case BinaryType => castToBinary
    case DecimalType => castToDecimal
    case TimestampType => castToTimestamp
    case BooleanType => castToBoolean
    case ByteType => castToByte
    case ShortType => castToShort
    case IntegerType => castToInt
    case FloatType => castToFloat
    case LongType => castToLong
    case DoubleType => castToDouble
  }
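
For reference, that is exactly the shape of failure a non-exhaustive match
produces at runtime; a minimal, self-contained sketch with toy types (not
Spark's) shows the mechanics:

// Minimal sketch: a match with no case for the value it receives
// throws scala.MatchError at runtime, just like Cast.cast above.
sealed trait Toy
case object SimpleType extends Toy
case object ComplexType extends Toy // plays the role of ArrayType

def cast(t: Toy): String = t match {
  case SimpleType => "castToSimple"
  // no case for ComplexType
}

cast(ComplexType) // throws scala.MatchError: ComplexType (of class ComplexType$)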

Any idea? Thank you.

Hao


