Sure thing.  Here you go:
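
For context, I printed this with roughly the following call (a sketch; it
assumes the same sqlContext and the rdd1/rdd2 tables registered in the test
program quoted below):

import sqlContext._  // same implicits as in the test program below

// Dump the logical, optimized, and physical plans for the failing join query.
println(sql(
  "SELECT rdd1.key, rdd1.value, rdd2.value FROM rdd1 join rdd2 on " +
  "rdd1.key = rdd2.key order by rdd1.key"
).queryExecution)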

== Logical Plan ==
Sort [key#0 ASC]
 Project [key#0,value#1,value#2]
  Join Inner, Some((key#0 = key#3))
   SparkLogicalPlan (ExistingRdd [key#0,value#1], MapPartitionsRDD[2] at mapPartitions at basicOperators.scala:176)
   SparkLogicalPlan (ExistingRdd [value#2,key#3], MapPartitionsRDD[4] at mapPartitions at basicOperators.scala:176)

== Optimized Logical Plan ==
Sort [key#0 ASC]
 Project [key#0,value#1,value#2]
  Join Inner, Some((key#0 = key#3))
   SparkLogicalPlan (ExistingRdd [key#0,value#1], MapPartitionsRDD[2] at mapPartitions at basicOperators.scala:176)
   SparkLogicalPlan (ExistingRdd [value#2,key#3], MapPartitionsRDD[4] at mapPartitions at basicOperators.scala:176)

== Physical Plan ==
Sort [key#0:0 ASC], true
 Exchange (RangePartitioning [key#0 ASC], 200)
  Project [key#0:0,value#1:1,value#2:2]
   HashJoin [key#0], [key#3], BuildRight
    Exchange (HashPartitioning [key#0], 200)
     ExistingRdd [key#0,value#1], MapPartitionsRDD[2] at mapPartitions at basicOperators.scala:176
    Exchange (HashPartitioning [key#3], 200)
     ExistingRdd [value#2,key#3], MapPartitionsRDD[4] at mapPartitions at basicOperators.scala:176


Also, in case it's helpful, here's the entire stack trace:

java.lang.NoClassDefFoundError: Could not initialize class org.apache.spark.sql.catalyst.types.IntegerType$
at java.io.ObjectStreamClass.hasStaticInitializer(Native Method)
at java.io.ObjectStreamClass.computeDefaultSUID(ObjectStreamClass.java:1730)
at java.io.ObjectStreamClass.access$100(ObjectStreamClass.java:72)
at java.io.ObjectStreamClass$1.run(ObjectStreamClass.java:225)
at java.security.AccessController.doPrivileged(Native Method)
at java.io.ObjectStreamClass.getSerialVersionUID(ObjectStreamClass.java:222)
at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:588)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1621)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1516)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1770)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1349)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1914)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1797)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1349)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:369)
at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:622)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1001)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1892)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1797)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1349)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1914)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1797)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1349)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1914)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1797)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1349)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1914)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1797)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1349)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1914)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1797)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1349)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1914)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1797)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1349)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1914)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1797)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1349)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:369)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
at org.apache.spark.scheduler.ShuffleMapTask$.deserializeInfo(ShuffleMapTask.scala:63)
at org.apache.spark.scheduler.ShuffleMapTask.readExternal(ShuffleMapTask.scala:135)
at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1836)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1795)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1349)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:369)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:165)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:701)


On Tue, Jul 15, 2014 at 1:05 PM, Michael Armbrust <mich...@databricks.com>
wrote:

> Can you print out the queryExecution?
>
> (i.e. println(sql(....).queryExecution))
>
>
> On Tue, Jul 15, 2014 at 12:44 PM, Keith Simmons <keith.simm...@gmail.com>
> wrote:
>
>> To give a few more details of my environment in case that helps you
>> reproduce:
>>
>> * I'm running Spark 1.0.1 downloaded as a tarball, not built myself
>> * I'm running in standalone mode, with 1 master and 1 worker, both on the
>> same machine (though the same error occurs with two workers on two machines)
>> * I'm using spark-core and spark-sql 1.0.1 pulled via Maven
>>
>> Here's my build.sbt:
>>
>> name := "spark-test"
>>
>> version := "1.0"
>>
>> scalaVersion := "2.10.4"
>>
>> resolvers += "Akka Repository" at "http://repo.akka.io/releases/";
>>
>> resolvers += "Cloudera Repository" at "
>> https://repository.cloudera.com/artifactory/cloudera-repos/";
>>
>> libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.0.1" %
>> "provided"
>>
>> libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.1" %
>> "provided"
>>
>>
>> On Tue, Jul 15, 2014 at 12:21 PM, Zongheng Yang <zonghen...@gmail.com>
>> wrote:
>>
>>> FWIW, I am unable to reproduce this using the example program locally.
>>>
>>> On Tue, Jul 15, 2014 at 11:56 AM, Keith Simmons <keith.simm...@gmail.com>
>>> wrote:
>>> > Nope.  All of them are registered from the driver program.
>>> >
>>> > However, I think we've found the culprit.  If the join column between two
>>> > tables is not in the same column position in both tables, it triggers what
>>> > appears to be a bug.  For example, this program fails:
>>> >
>>> > import org.apache.spark.SparkContext._
>>> > import org.apache.spark.SparkContext
>>> > import org.apache.spark.SparkConf
>>> > import org.apache.spark.sql.SQLContext
>>> > import org.apache.spark.sql.SchemaRDD
>>> > import org.apache.spark.sql.catalyst.types._
>>> >
>>> > case class Record(value: String, key: Int)
>>> > case class Record2(key: Int, value: String)
>>> >
>>> > object TestJob {
>>> >
>>> >   def main(args: Array[String]) {
>>> >     run()
>>> >   }
>>> >
>>> >   private def run() {
>>> >     val sparkConf = new SparkConf()
>>> >     sparkConf.setAppName("TestJob")
>>> >     sparkConf.set("spark.cores.max", "8")
>>> >     sparkConf.set("spark.storage.memoryFraction", "0.1")
>>> >     sparkConf.set("spark.shuffle.memoryFracton", "0.2")
>>> >     sparkConf.set("spark.executor.memory", "2g")
>>> >
>>> sparkConf.setJars(List("target/scala-2.10/spark-test-assembly-1.0.jar"))
>>> >     sparkConf.setMaster(s"spark://dev1.dev.pulse.io:7077")
>>> >     sparkConf.setSparkHome("/home/pulseio/spark/current")
>>> >     val sc = new SparkContext(sparkConf)
>>> >
>>> >     val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>>> >     import sqlContext._
>>> >
>>> >     val rdd1 = sc.parallelize((1 to 100).map(i => Record(s"val_$i", i)))
>>> >     val rdd2 = sc.parallelize((1 to 100).map(i => Record2(i, s"rdd_$i")))
>>> >     rdd1.registerAsTable("rdd1")
>>> >     rdd2.registerAsTable("rdd2")
>>> >
>>> >     sql("SELECT * FROM rdd1").collect.foreach { row => println(row) }
>>> >
>>> >     sql("SELECT rdd1.key, rdd1.value, rdd2.value FROM rdd1 join rdd2 on
>>> > rdd1.key = rdd2.key order by rdd1.key").collect.foreach { row =>
>>> > println(row) }
>>> >
>>> >     sc.stop()
>>> >   }
>>> >
>>> > }
>>> >
>>> > If you change the definition of Record and Record2 to the following, it
>>> > succeeds:
>>> >
>>> > case class Record(key: Int, value: String)
>>> > case class Record2(key: Int, value: String)
>>> >
>>> > as does:
>>> >
>>> > case class Record(value: String, key: Int)
>>> > case class Record2(value: String, key: Int)
>>> >
>>> > Let me know if you need any more details.
>>> >
>>> >
>>> > On Tue, Jul 15, 2014 at 11:14 AM, Michael Armbrust <mich...@databricks.com>
>>> > wrote:
>>> >>
>>> >> Are you registering multiple RDDs of case classes as tables concurrently?
>>> >> You are possibly hitting SPARK-2178, which is caused by SI-6240.
>>> >>
>>> >>
>>> >> On Tue, Jul 15, 2014 at 10:49 AM, Keith Simmons <keith.simm...@gmail.com>
>>> >> wrote:
>>> >>>
>>> >>> Hi folks,
>>> >>>
>>> >>> I'm running into the following error when trying to perform a join in my
>>> >>> code:
>>> >>>
>>> >>> java.lang.NoClassDefFoundError: Could not initialize class org.apache.spark.sql.catalyst.types.LongType$
>>> >>>
>>> >>> I see similar errors for StringType$ and also:
>>> >>>
>>> >>>  scala.reflect.runtime.ReflectError: value apache is not a package.
>>> >>>
>>> >>> Strangely, if I just work with a single table, everything is fine. I can
>>> >>> iterate through the records in both tables and print them out without a
>>> >>> problem.
>>> >>>
>>> >>> Furthermore, this code worked without an exception in Spark 1.0.0
>>> >>> (though the join caused some field corruption, possibly related to
>>> >>> https://issues.apache.org/jira/browse/SPARK-1994).  The data is coming from
>>> >>> a custom protocol-buffer-based format on HDFS that is being mapped into the
>>> >>> individual record types without a problem.
>>> >>>
>>> >>> The immediate cause seems to be a task trying to deserialize one or more
>>> >>> SQL case classes before loading the Spark uber jar, but I have no idea why
>>> >>> this is happening, or why it only happens when I do a join.  Ideas?
>>> >>>
>>> >>> Keith
>>> >>>
>>> >>> P.S. If it's relevant, we're using the Kryo serializer.
>>> >>>
>>> >>>
>>> >>
>>> >
>>>
>>
>>
>
