Nope.  All of them are registered from the driver program.

However, I think we've found the culprit.  If the join column between two
tables is not in the same column position in both tables, it triggers what
appears to be a bug.  For example, this program fails:

import org.apache.spark.SparkContext._
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SchemaRDD
import org.apache.spark.sql.catalyst.types._

case class Record(value: String, key: Int)
case class Record2(key: Int, value: String)

object TestJob {

  def main(args: Array[String]) {
    run()
  }

  private def run() {
    val sparkConf = new SparkConf()
    sparkConf.setAppName("TestJob")
    sparkConf.set("spark.cores.max", "8")
    sparkConf.set("spark.storage.memoryFraction", "0.1")
    sparkConf.set("spark.shuffle.memoryFracton", "0.2")
    sparkConf.set("spark.executor.memory", "2g")
    sparkConf.setJars(List("target/scala-2.10/spark-test-assembly-1.0.jar"))
    sparkConf.setMaster(s"spark://dev1.dev.pulse.io:7077")
    sparkConf.setSparkHome("/home/pulseio/spark/current")
    val sc = new SparkContext(sparkConf)

    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext._

    val rdd1 = sc.parallelize((1 to 100).map(i => Record(s"val_$i", i)))
    val rdd2 = sc.parallelize((1 to 100).map(i => Record2(i, s"rdd_$i")))
    rdd1.registerAsTable("rdd1")
    rdd2.registerAsTable("rdd2")

    sql("SELECT * FROM rdd1").collect.foreach { row => println(row) }

    sql("SELECT rdd1.key, rdd1.value, rdd2.value FROM rdd1 join rdd2 on
rdd1.key = rdd2.key order by rdd1.key").collect.foreach { row =>
println(row) }

    sc.stop()
  }

}

If you change the definition of Record and Record2 to the following, it
succeeds:

case class Record(key: Int, value: String)
case class Record2(key: Int, value: String)

as does:

case class Record(value: String, key: Int)
case class Record2(value: String, key: Int)

Let me know if you need anymore details.


On Tue, Jul 15, 2014 at 11:14 AM, Michael Armbrust <mich...@databricks.com>
wrote:

> Are you registering multiple RDDs of case classes as tables concurrently?
>  You are possibly hitting SPARK-2178
> <https://issues.apache.org/jira/browse/SPARK-2178> which is caused by
> SI-6240 <https://issues.scala-lang.org/browse/SI-6240>.
>
>
> On Tue, Jul 15, 2014 at 10:49 AM, Keith Simmons <keith.simm...@gmail.com>
> wrote:
>
>> HI folks,
>>
>> I'm running into the following error when trying to perform a join in my
>> code:
>>
>> java.lang.NoClassDefFoundError: Could not initialize class
>> org.apache.spark.sql.catalyst.types.LongType$
>>
>> I see similar errors for StringType$ and also:
>>
>>  scala.reflect.runtime.ReflectError: value apache is not a package.
>>
>> Strangely, if I just work with a single table, everything is fine. I can
>> iterate through the records in both tables and print them out without a
>> problem.
>>
>> Furthermore, this code worked without an exception in Spark 1.0.0
>> (thought the join caused some field corruption, possibly related to
>> https://issues.apache.org/jira/browse/SPARK-1994
>> <https://www.google.com/url?q=https%3A%2F%2Fissues.apache.org%2Fjira%2Fbrowse%2FSPARK-1994&sa=D&sntz=1&usg=AFQjCNHNxePxWgmuymCQSprulDZZcOn4-Q>).
>>  The data is coming from a custom protocol buffer based format on hdfs that
>> is being mapped into the individual record types without a problem.
>>
>> The immediate cause seems to be a task trying to deserialize one or more
>> SQL case classes before loading the spark uber jar, but I have no idea why
>> this is happening, or why it only happens when I do a join.  Ideas?
>>
>> Keith
>>
>> P.S. If it's relevant, we're using the Kryo serializer.
>>
>>
>>
>

Reply via email to