Forwarding to the dev mailing list for help.
________________________________
From: Haopu Wang
Sent: September 22, 2014 16:35
To: u...@spark.apache.org
Subject: Spark SQL 1.1.0: NPE when join two cached table

I have two data sets and want to join them on the first field of each. Sample data are below:

data set 1:
id2,name1,2,300.0

data set 2:
id1,aaaaaaaaaaaa

The code is something like below:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql._

    val sparkConf = new SparkConf().setAppName("JoinInScala")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)
    sqlContext.setConf("spark.sql.inMemoryColumnarStorage.compressed", "true")

    // Load the main data set, apply a four-column schema, and cache it.
    val testdata = sc.textFile(args(0) + "data.txt").map(_.split(","))
      .map(p => Row(p(0), p(1).trim, p(2).trim.toLong, p(3).trim.toDouble))
    val fields = new Array[StructField](4)
    fields(0) = StructField("id", StringType, false)
    fields(1) = StructField("name", StringType, false)
    fields(2) = StructField("agg1", LongType, false)
    fields(3) = StructField("agg2", DoubleType, false)
    val schema = StructType(fields)
    val data = sqlContext.applySchema(testdata, schema)
    data.registerTempTable("datatable")
    sqlContext.cacheTable("datatable")

    // Load the reference data set, apply a two-column schema, and cache it.
    val refdata = sc.textFile(args(0) + "ref.txt").map(_.split(","))
      .map(p => Row(p(0), p(1).trim))
    val reffields = new Array[StructField](2)
    reffields(0) = StructField("id", StringType, false)
    reffields(1) = StructField("data", StringType, true)
    val refschema = StructType(reffields)
    val refschemardd = sqlContext.applySchema(refdata, refschema)
    refschemardd.registerTempTable("ref")
    sqlContext.cacheTable("ref")

    // Join the two cached tables and force evaluation.
    val results = sqlContext.sql(
      "SELECT d.id, d.name, d.agg1, d.agg2, ref.data " +
      "FROM datatable AS d JOIN ref ON d.id = ref.id")
    results.foreach(_ => ())

But I got the NullPointerException below. If I comment out the two cacheTable() calls, the program runs well. Please shed some light, thank you!
Exception in thread "main" java.lang.NullPointerException
    at org.apache.spark.sql.columnar.InMemoryRelation.statistics$lzycompute(InMemoryColumnarTableScan.scala:43)
    at org.apache.spark.sql.columnar.InMemoryRelation.statistics(InMemoryColumnarTableScan.scala:42)
    at org.apache.spark.sql.execution.SparkStrategies$HashJoin$.apply(SparkStrategies.scala:83)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
    at org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:268)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59)
    at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:402)
    at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:400)
    at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:406)
    at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:406)
    at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:409)
    at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:409)
    at org.apache.spark.sql.SchemaRDD.getDependencies(SchemaRDD.scala:120)
    at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:191)
    at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:189)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.dependencies(RDD.scala:189)
    at org.apache.spark.rdd.RDD.firstParent(RDD.scala:1233)
    at org.apache.spark.sql.SchemaRDD.getPartitions(SchemaRDD.scala:117)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1135)
    at org.apache.spark.rdd.RDD.foreach(RDD.scala:759)
    at Join$$anonfun$main$1.apply$mcVI$sp(Join.scala:44)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
    at Join$.main(Join.scala:42)
    at Join.main(Join.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
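
Reading the trace: the NPE surfaces while the planner forces InMemoryRelation.statistics to pick a join strategy (the SparkStrategies$HashJoin frame), a code path that is only taken when the tables are cached, which matches the observation that removing the cacheTable() calls makes the program run. One way to narrow this down is to scan each cached table on its own first; if both scans succeed, the failure is confined to join planning. A diagnostic sketch, not part of the original program, reusing the sqlContext and table names above:

    // Scan each cached table separately; a plain scan typically does not
    // force the relation's statistics, so it may succeed even when the
    // join fails.
    sqlContext.sql("SELECT * FROM datatable").count()
    sqlContext.sql("SELECT * FROM ref").count()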
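The statistics$lzycompute frame also explains why cacheTable() itself succeeds and the crash appears only at query time: a Scala lazy val is evaluated on first access, so a reference left null at construction blows up only when the field is finally forced, which here happens during planning. A generic illustration of that pattern (illustrative names only, not Spark's actual source):

    // A lazy val that dereferences a null field fails on first access,
    // not at construction time.
    class Context {
      val defaultSizeInBytes: Long = 10L * 1024 * 1024
    }

    class CachedRelation(context: Context) {
      // Constructing the relation with context == null succeeds...
      lazy val statistics: Long = context.defaultSizeInBytes
    }

    object LazyNpeDemo {
      def main(args: Array[String]): Unit = {
        val rel = new CachedRelation(null) // fine so far
        println(rel.statistics)            // ...the NullPointerException is
                                           // thrown only here, when the lazy
                                           // val is first forced
      }
    }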