Sure thing. Here you go:

== Logical Plan ==
Sort [key#0 ASC]
 Project [key#0,value#1,value#2]
  Join Inner, Some((key#0 = key#3))
   SparkLogicalPlan (ExistingRdd [key#0,value#1], MapPartitionsRDD[2] at mapPartitions at basicOperators.scala:176)
   SparkLogicalPlan (ExistingRdd [value#2,key#3], MapPartitionsRDD[4] at mapPartitions at basicOperators.scala:176)
== Optimized Logical Plan ==
Sort [key#0 ASC]
 Project [key#0,value#1,value#2]
  Join Inner, Some((key#0 = key#3))
   SparkLogicalPlan (ExistingRdd [key#0,value#1], MapPartitionsRDD[2] at mapPartitions at basicOperators.scala:176)
   SparkLogicalPlan (ExistingRdd [value#2,key#3], MapPartitionsRDD[4] at mapPartitions at basicOperators.scala:176)

== Physical Plan ==
Sort [key#0:0 ASC], true
 Exchange (RangePartitioning [key#0 ASC], 200)
  Project [key#0:0,value#1:1,value#2:2]
   HashJoin [key#0], [key#3], BuildRight
    Exchange (HashPartitioning [key#0], 200)
     ExistingRdd [key#0,value#1], MapPartitionsRDD[2] at mapPartitions at basicOperators.scala:176
    Exchange (HashPartitioning [key#3], 200)
     ExistingRdd [value#2,key#3], MapPartitionsRDD[4] at mapPartitions at basicOperators.scala:176

Also, in case it's helpful, here's the entire stack trace:

java.lang.NoClassDefFoundError: Could not initialize class org.apache.spark.sql.catalyst.types.IntegerType$
        at java.io.ObjectStreamClass.hasStaticInitializer(Native Method)
        at java.io.ObjectStreamClass.computeDefaultSUID(ObjectStreamClass.java:1730)
        at java.io.ObjectStreamClass.access$100(ObjectStreamClass.java:72)
        at java.io.ObjectStreamClass$1.run(ObjectStreamClass.java:225)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.io.ObjectStreamClass.getSerialVersionUID(ObjectStreamClass.java:222)
        at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:588)
        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1621)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1516)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1770)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1349)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1914)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1797)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1349)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:369)
        at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:622)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1001)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1892)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1797)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1349)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1914)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1797)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1349)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1914)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1797)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1349)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1914)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1797)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1349)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1914)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1797)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1349)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1914)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1797)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1349)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1914)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1797)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1349)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:369)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
        at org.apache.spark.scheduler.ShuffleMapTask$.deserializeInfo(ShuffleMapTask.scala:63)
        at org.apache.spark.scheduler.ShuffleMapTask.readExternal(ShuffleMapTask.scala:135)
        at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1836)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1795)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1349)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:369)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:85)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:165)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:701)

On Tue, Jul 15, 2014 at 1:05 PM, Michael Armbrust <mich...@databricks.com> wrote:

> Can you print out the queryExecution?
>
> (i.e. println(sql(....).queryExecution))
>
>
> On Tue, Jul 15, 2014 at 12:44 PM, Keith Simmons <keith.simm...@gmail.com> wrote:
>
>> To give a few more details of my environment, in case that helps you reproduce:
>>
>> * I'm running Spark 1.0.1 downloaded as a tarball, not built myself.
>> * I'm running in standalone mode, with 1 master and 1 worker, both on the same machine (though the same error occurs with two workers on two machines).
>> * I'm using spark-core and spark-sql 1.0.1 pulled via Maven.
>>
>> Here's my build.sbt:
>>
>> name := "spark-test"
>>
>> version := "1.0"
>>
>> scalaVersion := "2.10.4"
>>
>> resolvers += "Akka Repository" at "http://repo.akka.io/releases/"
>>
>> resolvers += "Cloudera Repository" at "https://repository.cloudera.com/artifactory/cloudera-repos/"
>>
>> libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.0.1" % "provided"
>>
>> libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.1" % "provided"
>>
>>
>> On Tue, Jul 15, 2014 at 12:21 PM, Zongheng Yang <zonghen...@gmail.com> wrote:
>>
>>> FWIW, I am unable to reproduce this using the example program locally.
>>>
>>> On Tue, Jul 15, 2014 at 11:56 AM, Keith Simmons <keith.simm...@gmail.com> wrote:
>>> > Nope. All of them are registered from the driver program.
>>> >
>>> > However, I think we've found the culprit.
>>> > If the join column between two tables is not in the same column position in both tables, it triggers what appears to be a bug. For example, this program fails:
>>> >
>>> > import org.apache.spark.SparkContext._
>>> > import org.apache.spark.SparkContext
>>> > import org.apache.spark.SparkConf
>>> > import org.apache.spark.sql.SQLContext
>>> > import org.apache.spark.sql.SchemaRDD
>>> > import org.apache.spark.sql.catalyst.types._
>>> >
>>> > case class Record(value: String, key: Int)
>>> > case class Record2(key: Int, value: String)
>>> >
>>> > object TestJob {
>>> >
>>> >   def main(args: Array[String]) {
>>> >     run()
>>> >   }
>>> >
>>> >   private def run() {
>>> >     val sparkConf = new SparkConf()
>>> >     sparkConf.setAppName("TestJob")
>>> >     sparkConf.set("spark.cores.max", "8")
>>> >     sparkConf.set("spark.storage.memoryFraction", "0.1")
>>> >     sparkConf.set("spark.shuffle.memoryFraction", "0.2")
>>> >     sparkConf.set("spark.executor.memory", "2g")
>>> >     sparkConf.setJars(List("target/scala-2.10/spark-test-assembly-1.0.jar"))
>>> >     sparkConf.setMaster("spark://dev1.dev.pulse.io:7077")
>>> >     sparkConf.setSparkHome("/home/pulseio/spark/current")
>>> >     val sc = new SparkContext(sparkConf)
>>> >
>>> >     val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>>> >     import sqlContext._
>>> >
>>> >     val rdd1 = sc.parallelize((1 to 100).map(i => Record(s"val_$i", i)))
>>> >     val rdd2 = sc.parallelize((1 to 100).map(i => Record2(i, s"rdd_$i")))
>>> >     rdd1.registerAsTable("rdd1")
>>> >     rdd2.registerAsTable("rdd2")
>>> >
>>> >     sql("SELECT * FROM rdd1").collect.foreach { row => println(row) }
>>> >
>>> >     sql("SELECT rdd1.key, rdd1.value, rdd2.value FROM rdd1 join rdd2 on rdd1.key = rdd2.key order by rdd1.key").collect.foreach { row => println(row) }
>>> >
>>> >     sc.stop()
>>> >   }
>>> >
>>> > }
>>> >
>>> > If you change the definitions of Record and Record2 to the following, it succeeds:
>>> >
>>> > case class Record(key: Int, value: String)
>>> > case class Record2(key: Int, value: String)
>>> >
>>> > as does:
>>> >
>>> > case class Record(value: String, key: Int)
>>> > case class Record2(value: String, key: Int)
>>> >
>>> > Let me know if you need any more details.
>>> >
>>> >
>>> > On Tue, Jul 15, 2014 at 11:14 AM, Michael Armbrust <mich...@databricks.com> wrote:
>>> >>
>>> >> Are you registering multiple RDDs of case classes as tables concurrently?
>>> >> You are possibly hitting SPARK-2178, which is caused by SI-6240.
>>> >>
>>> >>
>>> >> On Tue, Jul 15, 2014 at 10:49 AM, Keith Simmons <keith.simm...@gmail.com> wrote:
>>> >>>
>>> >>> Hi folks,
>>> >>>
>>> >>> I'm running into the following error when trying to perform a join in my code:
>>> >>>
>>> >>> java.lang.NoClassDefFoundError: Could not initialize class org.apache.spark.sql.catalyst.types.LongType$
>>> >>>
>>> >>> I see similar errors for StringType$ and also:
>>> >>>
>>> >>> scala.reflect.runtime.ReflectError: value apache is not a package.
>>> >>>
>>> >>> Strangely, if I just work with a single table, everything is fine. I can iterate through the records in both tables and print them out without a problem.
>>> >>>
>>> >>> Furthermore, this code worked without an exception in Spark 1.0.0 (though the join caused some field corruption, possibly related to https://issues.apache.org/jira/browse/SPARK-1994).
>>> >>> The data is coming from a custom protocol buffer based format on HDFS that is being mapped into the individual record types without a problem.
>>> >>>
>>> >>> The immediate cause seems to be a task trying to deserialize one or more SQL case classes before loading the Spark uber jar, but I have no idea why this is happening, or why it only happens when I do a join. Ideas?
>>> >>>
>>> >>> Keith
>>> >>>
>>> >>> P.S. If it's relevant, we're using the Kryo serializer.
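---

For anyone reproducing this thread's steps, here is a minimal sketch of the diagnostic Michael suggests above, applied to the failing join from Keith's reproduction program. It assumes that program's sqlContext and its registered rdd1/rdd2 tables, and the Spark 1.0.x SchemaRDD API; it is illustrative, not part of the original exchange.

    // Minimal sketch, assuming the sqlContext and the tables rdd1/rdd2
    // registered by the reproduction program above (Spark 1.0.x API).
    import sqlContext._

    // Build the failing join. Planning happens on the driver, so the
    // plans can be printed even though collect() fails on the executors.
    val joined = sql(
      "SELECT rdd1.key, rdd1.value, rdd2.value FROM rdd1 " +
      "JOIN rdd2 ON rdd1.key = rdd2.key ORDER BY rdd1.key")

    // Printing queryExecution produces the == Logical Plan == /
    // == Optimized Logical Plan == / == Physical Plan == dump shown at
    // the top of this message.
    println(joined.queryExecution)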