Forwarding to the dev mailing list for help.

________________________________

From: Haopu Wang 
Sent: September 22, 2014 16:35
To: u...@spark.apache.org
Subject: Spark SQL 1.1.0: NPE when join two cached table

 

I have two data sets and want to join them on their first fields. Sample data are below:

 

data set 1:

  id2,name1,2,300.0

 

data set 2:

  id1,aaaaaaaaaaaa

 

The code is roughly as follows:

 

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql._

    val sparkConf = new SparkConf().setAppName("JoinInScala")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)
    sqlContext.setConf("spark.sql.inMemoryColumnarStorage.compressed", "true")

    // First data set: id, name, agg1, agg2
    val testdata = sc.textFile(args(0) + "data.txt").map(_.split(","))
      .map(p => Row(p(0), p(1).trim, p(2).trim.toLong, p(3).trim.toDouble))

    val fields = new Array[StructField](4)
    fields(0) = StructField("id", StringType, false)
    fields(1) = StructField("name", StringType, false)
    fields(2) = StructField("agg1", LongType, false)
    fields(3) = StructField("agg2", DoubleType, false)
    val schema = StructType(fields)

    val data = sqlContext.applySchema(testdata, schema)
    data.registerTempTable("datatable")
    sqlContext.cacheTable("datatable")

    // Second (reference) data set: id, data
    val refdata = sc.textFile(args(0) + "ref.txt").map(_.split(","))
      .map(p => Row(p(0), p(1).trim))

    val reffields = new Array[StructField](2)
    reffields(0) = StructField("id", StringType, false)
    reffields(1) = StructField("data", StringType, true)
    val refschema = StructType(reffields)

    val refschemardd = sqlContext.applySchema(refdata, refschema)
    refschemardd.registerTempTable("ref")
    sqlContext.cacheTable("ref")

    // Join the two cached tables on id and force evaluation
    val results = sqlContext.sql(
      "SELECT d.id, d.name, d.agg1, d.agg2, ref.data " +
      "FROM datatable AS d JOIN ref ON d.id = ref.id")
    results.foreach(_ => ())

 

But I got the NullPointerException below. If I comment out the two "cacheTable()" calls, the program runs fine (a sketch of that working variant follows the stack trace). Please shed some light on this, thank you!

 

Exception in thread "main" java.lang.NullPointerException
        at org.apache.spark.sql.columnar.InMemoryRelation.statistics$lzycompute(InMemoryColumnarTableScan.scala:43)
        at org.apache.spark.sql.columnar.InMemoryRelation.statistics(InMemoryColumnarTableScan.scala:42)
        at org.apache.spark.sql.execution.SparkStrategies$HashJoin$.apply(SparkStrategies.scala:83)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
        at org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:268)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59)
        at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:402)
        at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:400)
        at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:406)
        at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:406)
        at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:409)
        at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:409)
        at org.apache.spark.sql.SchemaRDD.getDependencies(SchemaRDD.scala:120)
        at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:191)
        at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:189)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.dependencies(RDD.scala:189)
        at org.apache.spark.rdd.RDD.firstParent(RDD.scala:1233)
        at org.apache.spark.sql.SchemaRDD.getPartitions(SchemaRDD.scala:117)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1135)
        at org.apache.spark.rdd.RDD.foreach(RDD.scala:759)
        at Join$$anonfun$main$1.apply$mcVI$sp(Join.scala:44)
        at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
        at Join$.main(Join.scala:42)
        at Join.main(Join.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
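
For reference, here is a minimal sketch of the variant that runs cleanly for me. It assumes the same input files, schemas, and temp tables as the code above; the only change is that the two cacheTable() calls are left out:

    // Identical setup to the code above; only the two cacheTable()
    // calls are commented out.
    val data = sqlContext.applySchema(testdata, schema)
    data.registerTempTable("datatable")
    // sqlContext.cacheTable("datatable")   // with this line removed...

    val refschemardd = sqlContext.applySchema(refdata, refschema)
    refschemardd.registerTempTable("ref")
    // sqlContext.cacheTable("ref")         // ...and this one, no NPE occurs

    val results = sqlContext.sql(
      "SELECT d.id, d.name, d.agg1, d.agg2, ref.data " +
      "FROM datatable AS d JOIN ref ON d.id = ref.id")
    results.foreach(_ => ())                // completes without the exception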
