RE: Should I avoid "state" in a Spark application?

2016-06-12 Thread Haopu Wang
Can someone look at my questions? Thanks again!

 



From: Haopu Wang 
Sent: 2016年6月12日 16:40
To: u...@spark.apache.org
Subject: Should I avoid "state" in a Spark application?

 

I have a Spark application whose structure is below:

 

var ts: Long = 0L

dstream1.foreachRDD { (x, time) => {
  ts = time
  x.do_something()...
} }

..

process_data(dstream2, ts, ..)

 

I assume the foreachRDD call can update the "ts" variable, which is then used in the Spark tasks of the "process_data" function.

 

From my test on a standalone Spark cluster, it is working. But should I be concerned if I switch to YARN?

 

I also saw some articles recommending avoiding state in Scala programming. Without the state variable, how could that be done?
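For illustration, below is a minimal sketch (not the original application) of one way to avoid the mutable driver-side variable: let each DStream carry its own batch time through transform(), so nothing has to be captured from another stream's foreachRDD. The function and stream names here are assumptions.

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Time
import org.apache.spark.streaming.dstream.DStream

// Illustrative body: tag every record of this batch with its batch time.
def processData(rdd: RDD[String], batchTime: Time): RDD[String] =
  rdd.map(record => s"${batchTime.milliseconds},$record")

// Each batch of dstream2 is processed with the time of that batch,
// without any shared mutable state on the driver.
def wire(dstream2: DStream[String]): DStream[String] =
  dstream2.transform((rdd, time) => processData(rdd, time))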

 

Any comments or suggestions are appreciated.

 

Thanks,

Haopu



RE: [SparkStreaming] NPE in DStreamCheckPointData.scala:125

2015-06-17 Thread Haopu Wang
Can someone help? Thank you!



From: Haopu Wang 
Sent: Monday, June 15, 2015 3:36 PM
To: user; dev@spark.apache.org
Subject: [SparkStreaming] NPE in DStreamCheckPointData.scala:125



I use the attached program to test checkpoint. It's quite simple.

 

When I run the program a second time, it loads the checkpoint data as expected; however, I see an NPE in the driver log.

 

Do you have any idea about the issue? I'm on Spark 1.4.0, thank you very
much!
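For reference, here is a minimal sketch of the checkpoint-recovery pattern being tested, assuming the whole DStream graph is created inside the factory passed to StreamingContext.getOrCreate; the paths and the input stream are illustrative, not the attached program.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "/tmp/checkpoint"   // illustrative path

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("CheckpointTest")
  val ssc = new StreamingContext(conf, Seconds(10))
  ssc.checkpoint(checkpointDir)
  // All DStreams must be defined here so they can be restored from the checkpoint.
  val lines = ssc.textFileStream("/tmp/input") // illustrative input folder
  lines.map(_.length).print()
  ssc
}

// On the second run this restores the graph from the checkpoint instead of rebuilding it.
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()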

 

== logs ==

 

15/06/15 15:27:17 [THREAD ID=main] INFO FileInputDStream: Restoring checkpoint data
15/06/15 15:27:17 [THREAD ID=main] INFO FileInputDStream$FileInputDStreamCheckpointData: Restoring files for time 143435313 ms - []
15/06/15 15:27:17 [THREAD ID=main] INFO FileInputDStream$FileInputDStreamCheckpointData: Restoring files for time 143435314 ms - []
15/06/15 15:27:17 [THREAD ID=main] INFO FileInputDStream$FileInputDStreamCheckpointData: Restoring files for time 143435315 ms - []
15/06/15 15:27:17 [THREAD ID=main] INFO FileInputDStream$FileInputDStreamCheckpointData: Restoring files for time 143435316 ms - []
15/06/15 15:27:17 [THREAD ID=main] INFO FileInputDStream$FileInputDStreamCheckpointData: Restoring files for time 143435317 ms - []
15/06/15 15:27:17 [THREAD ID=main] INFO FileInputDStream$FileInputDStreamCheckpointData: Restoring files for time 143435318 ms - []
15/06/15 15:27:17 [THREAD ID=main] INFO FileInputDStream$FileInputDStreamCheckpointData: Restoring files for time 143435319 ms - []
15/06/15 15:27:17 [THREAD ID=main] INFO FileInputDStream: Restored checkpoint data
15/06/15 15:27:17 [THREAD ID=main] INFO MappedDStream: Restored checkpoint data
15/06/15 15:27:17 [THREAD ID=main] INFO MappedDStream: Restored checkpoint data
15/06/15 15:27:17 [THREAD ID=main] INFO ForEachDStream: Restored checkpoint data
15/06/15 15:27:17 [THREAD ID=main] INFO DStreamGraph: Restored checkpoint data
15/06/15 15:27:17 [THREAD ID=main] ERROR StreamingContext: Error starting the context, marking it as stopped
java.io.IOException: java.lang.NullPointerException
   at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1242)
   at org.apache.spark.streaming.dstream.DStreamCheckpointData.writeObject(DStreamCheckpointData.scala:123)
   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at java.lang.reflect.Method.invoke(Method.java:606)
   at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
   at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:440)
   at org.apache.spark.streaming.dstream.DStream$$anonfun$writeObject$1.apply$mcV$sp(DStream.scala:498)
   at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1239)
   at org.apache.spark.streaming.dstream.DStream.writeObject(DStream.scala:493)
   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at java.lang.reflect.Method.invoke(Method.java:606)
   at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
   at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
   at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:440)
   at org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply$mcV$sp(DStreamGraph.scala:181)
   at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1

[SparkStreaming] NPE in DStreamCheckPointData.scala:125

2015-06-15 Thread Haopu Wang
I use the attached program to test checkpoint. It's quite simple.

 

When I run the program a second time, it loads the checkpoint data as expected; however, I see an NPE in the driver log.

 

Do you have any idea about the issue? I'm on Spark 1.4.0, thank you very
much!

 

== logs ==

 

15/06/15 15:27:17 [THREAD ID=main] INFO FileInputDStream: Restoring checkpoint data
15/06/15 15:27:17 [THREAD ID=main] INFO FileInputDStream$FileInputDStreamCheckpointData: Restoring files for time 143435313 ms - []
15/06/15 15:27:17 [THREAD ID=main] INFO FileInputDStream$FileInputDStreamCheckpointData: Restoring files for time 143435314 ms - []
15/06/15 15:27:17 [THREAD ID=main] INFO FileInputDStream$FileInputDStreamCheckpointData: Restoring files for time 143435315 ms - []
15/06/15 15:27:17 [THREAD ID=main] INFO FileInputDStream$FileInputDStreamCheckpointData: Restoring files for time 143435316 ms - []
15/06/15 15:27:17 [THREAD ID=main] INFO FileInputDStream$FileInputDStreamCheckpointData: Restoring files for time 143435317 ms - []
15/06/15 15:27:17 [THREAD ID=main] INFO FileInputDStream$FileInputDStreamCheckpointData: Restoring files for time 143435318 ms - []
15/06/15 15:27:17 [THREAD ID=main] INFO FileInputDStream$FileInputDStreamCheckpointData: Restoring files for time 143435319 ms - []
15/06/15 15:27:17 [THREAD ID=main] INFO FileInputDStream: Restored checkpoint data
15/06/15 15:27:17 [THREAD ID=main] INFO MappedDStream: Restored checkpoint data
15/06/15 15:27:17 [THREAD ID=main] INFO MappedDStream: Restored checkpoint data
15/06/15 15:27:17 [THREAD ID=main] INFO ForEachDStream: Restored checkpoint data
15/06/15 15:27:17 [THREAD ID=main] INFO DStreamGraph: Restored checkpoint data
15/06/15 15:27:17 [THREAD ID=main] ERROR StreamingContext: Error starting the context, marking it as stopped
java.io.IOException: java.lang.NullPointerException
   at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1242)
   at org.apache.spark.streaming.dstream.DStreamCheckpointData.writeObject(DStreamCheckpointData.scala:123)
   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at java.lang.reflect.Method.invoke(Method.java:606)
   at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
   at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:440)
   at org.apache.spark.streaming.dstream.DStream$$anonfun$writeObject$1.apply$mcV$sp(DStream.scala:498)
   at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1239)
   at org.apache.spark.streaming.dstream.DStream.writeObject(DStream.scala:493)
   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at java.lang.reflect.Method.invoke(Method.java:606)
   at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
   at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
   at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:440)
   at org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply$mcV$sp(DStreamGraph.scala:181)
   at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1239)
   at org.apache.spark.streaming.DStreamGraph.writeObject(DStreamGraph.scala:176)
   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
   at sun.reflect.Delega

RE: [SparkSQL] cannot filter by a DateType column

2015-05-10 Thread Haopu Wang
Sorry, I was using Spark 1.3.x.

 

I cannot reproduce it in master.

 

But should I still open a JIRA so that I can request the fix to be back-ported to the 1.3.x branch? Thanks again!

 



From: Michael Armbrust [mailto:mich...@databricks.com] 
Sent: Saturday, May 09, 2015 2:41 AM
To: Haopu Wang
Cc: user; dev@spark.apache.org
Subject: Re: [SparkSQL] cannot filter by a DateType column

 

What version of Spark are you using? It appears that at least in master we are doing the conversion correctly, but it's possible older versions of applySchema do not. If you can reproduce the same bug in master, can you open a JIRA?

 

On Fri, May 8, 2015 at 1:36 AM, Haopu Wang  wrote:

I want to filter a DataFrame based on a Date column. 

 

If the DataFrame object is constructed from a Scala case class, it works (comparing either as String or as Date). But if the DataFrame is generated by applying a schema to an RDD, it doesn't work. Below are the exception and test code.

 

Do you have any idea about the error? Thank you very much!

 

exception=

java.lang.ClassCastException: java.sql.Date cannot be cast to java.lang.Integer
at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:106)
at org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToString$2$$anonfun$apply$6.apply(Cast.scala:116)
at org.apache.spark.sql.catalyst.expressions.Cast.org$apache$spark$sql$catalyst$expressions$Cast$$buildCast(Cast.scala:111)
at org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToString$2.apply(Cast.scala:116)
at org.apache.spark.sql.catalyst.expressions.Cast.eval(Cast.scala:426)
at org.apache.spark.sql.catalyst.expressions.GreaterThanOrEqual.eval(predicates.scala:305)
at org.apache.spark.sql.catalyst.expressions.InterpretedPredicate$$anonfun$apply$1.apply(predicates.scala:30)
at org.apache.spark.sql.catalyst.expressions.InterpretedPredicate$$anonfun$apply$1.apply(predicates.scala:30)
at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:390)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)

 

code=

 

val conf = new SparkConf().setAppName("DFTest").setMaster("local[*]")
val sc = new SparkContext(conf)
val sqlCtx = new HiveContext(sc)
import sqlCtx.implicits._

case class Test(dt: java.sql.Date)

val df = sc.makeRDD(Seq(Test(new java.sql.Date(115, 4, 7)))).toDF

var r = df.filter("dt >= '2015-05-06'")
r.explain(true)
r.show
println("==")

var r2 = df.filter("dt >= cast('2015-05-06' as DATE)")
r2.explain(true)
r2.show
println("==")

// "df2" doesn't filter correctly!!
val rdd2 = sc.makeRDD(Seq(Row(new java.sql.Date(115, 4, 7))))

val schema = StructType(Array(StructField("dt", DateType, false)))

val df2 = sqlCtx.applySchema(rdd2, schema)

r = df2.filter("dt >= '2015-05-06'")
r.explain(true)
r.show
println("==")

r2 = df2.filter("dt >= cast('2015-05-06' as DATE)")
r2.explain(true)
r2.show

 

 



[SparkSQL] cannot filter by a DateType column

2015-05-08 Thread Haopu Wang
I want to filter a DataFrame based on a Date column. 

 

If the DataFrame object is constructed from a Scala case class, it works (comparing either as String or as Date). But if the DataFrame is generated by applying a schema to an RDD, it doesn't work. Below are the exception and test code.

 

Do you have any idea about the error? Thank you very much!

 

exception=

java.lang.ClassCastException: java.sql.Date cannot be cast to java.lang.Integer
at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:106)
at org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToString$2$$anonfun$apply$6.apply(Cast.scala:116)
at org.apache.spark.sql.catalyst.expressions.Cast.org$apache$spark$sql$catalyst$expressions$Cast$$buildCast(Cast.scala:111)
at org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToString$2.apply(Cast.scala:116)
at org.apache.spark.sql.catalyst.expressions.Cast.eval(Cast.scala:426)
at org.apache.spark.sql.catalyst.expressions.GreaterThanOrEqual.eval(predicates.scala:305)
at org.apache.spark.sql.catalyst.expressions.InterpretedPredicate$$anonfun$apply$1.apply(predicates.scala:30)
at org.apache.spark.sql.catalyst.expressions.InterpretedPredicate$$anonfun$apply$1.apply(predicates.scala:30)
at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:390)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)

 

code=

 

val conf = new SparkConf().setAppName("DFTest").setMaster("local[*]")
val sc = new SparkContext(conf)
val sqlCtx = new HiveContext(sc)
import sqlCtx.implicits._

case class Test(dt: java.sql.Date)

val df = sc.makeRDD(Seq(Test(new java.sql.Date(115, 4, 7)))).toDF

var r = df.filter("dt >= '2015-05-06'")
r.explain(true)
r.show
println("==")

var r2 = df.filter("dt >= cast('2015-05-06' as DATE)")
r2.explain(true)
r2.show
println("==")

// "df2" doesn't filter correctly!!
val rdd2 = sc.makeRDD(Seq(Row(new java.sql.Date(115, 4, 7))))

val schema = StructType(Array(StructField("dt", DateType, false)))

val df2 = sqlCtx.applySchema(rdd2, schema)

r = df2.filter("dt >= '2015-05-06'")
r.explain(true)
r.show
println("==")

r2 = df2.filter("dt >= cast('2015-05-06' as DATE)")
r2.explain(true)
r2.show

 



RE: Is SQLContext thread-safe?

2015-04-30 Thread Haopu Wang
Hi, in a test on Spark SQL 1.3.0, multiple threads are running SELECT statements on the same SQLContext instance, but the exception below is thrown, so it looks like SQLContext is NOT thread-safe? I think this is not the desired behavior.

==

java.lang.RuntimeException: [1.1] failure: ``insert'' expected but identifier select found

select id ,ext.d from UNIT_TEST
^
 at scala.sys.package$.error(package.scala:27)
 at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(AbstractSparkSQLParser.scala:40)
 at org.apache.spark.sql.SQLContext$$anonfun$2.apply(SQLContext.scala:130)
 at org.apache.spark.sql.SQLContext$$anonfun$2.apply(SQLContext.scala:130)
 at org.apache.spark.sql.SparkSQLParser$$anonfun$org$apache$spark$sql$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:96)
 at org.apache.spark.sql.SparkSQLParser$$anonfun$org$apache$spark$sql$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:95)
 at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
 at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
 at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
 at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
 at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
 at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
 at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
 at scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
 at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
 at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
 at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
 at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
 at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
 at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
 at scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890)
 at scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110)
 at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(AbstractSparkSQLParser.scala:38)
 at org.apache.spark.sql.SQLContext$$anonfun$parseSql$1.apply(SQLContext.scala:134)
 at org.apache.spark.sql.SQLContext$$anonfun$parseSql$1.apply(SQLContext.scala:134)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.sql.SQLContext.parseSql(SQLContext.scala:134)
 at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:915)
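As a possible interim workaround (an assumption on my side, not an official fix), access to the shared parser can be serialized while the returned DataFrames are still used from their own threads; a minimal sketch:

import org.apache.spark.sql.{DataFrame, SQLContext}

// Hypothetical helper: only the parse/analysis step is serialized;
// the returned DataFrame can then be used by the calling thread.
object SqlParseLock {
  private val lock = new Object
  def sql(sqlContext: SQLContext, query: String): DataFrame =
    lock.synchronized { sqlContext.sql(query) }
}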

-Original Message-
From: Cheng, Hao [mailto:hao.ch...@intel.com] 
Sent: Monday, March 02, 2015 9:05 PM
To: Haopu Wang; user
Subject: RE: Is SQLContext thread-safe?

Yes it is thread safe, at least it's supposed to be.

-Original Message-----
From: Haopu Wang [mailto:hw...@qilinsoft.com] 
Sent: Monday, March 2, 2015 4:43 PM
To: user
Subject: Is SQLContext thread-safe?

Hi, is it safe to use the same SQLContext to do Select operations in
different threads at the same time? Thank you very much!

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



RE: Can I call aggregate UDF in DataFrame?

2015-04-01 Thread Haopu Wang
Great! Thank you!

 



From: Reynold Xin [mailto:r...@databricks.com] 
Sent: Thursday, April 02, 2015 8:11 AM
To: Haopu Wang
Cc: user; dev@spark.apache.org
Subject: Re: Can I call aggregate UDF in DataFrame?

 

You totally can.

 

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/
apache/spark/sql/DataFrame.scala#L792

 

There is also an attempt at adding stddev here already:
https://github.com/apache/spark/pull/5228
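Until a built-in stddev lands, one possible workaround is to drop to the RDD API and aggregate the moments per key. This is only a sketch, assuming a DataFrame df with a string column "key" and a double column "value":

// Population standard deviation per key, computed from count, sum and sum of squares.
val stddevByKey = df.rdd
  .map(row => (row.getString(0), row.getDouble(1)))
  .aggregateByKey((0L, 0.0, 0.0))(
    (acc, v) => (acc._1 + 1, acc._2 + v, acc._3 + v * v),
    (a, b)   => (a._1 + b._1, a._2 + b._2, a._3 + b._3))
  .mapValues { case (n, sum, sumSq) =>
    val mean = sum / n
    math.sqrt(sumSq / n - mean * mean)
  }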

 

 

 

On Thu, Mar 26, 2015 at 12:37 AM, Haopu Wang 
wrote:

Specifically, there are only 5 aggregate functions in class org.apache.spark.sql.GroupedData: sum/max/min/mean/count.

Can I plug in a function to calculate stddev?

Thank you!


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

 



Can I call aggregate UDF in DataFrame?

2015-03-26 Thread Haopu Wang
Specifically, there are only 5 aggregate functions in class org.apache.spark.sql.GroupedData: sum/max/min/mean/count.

Can I plug in a function to calculate stddev?

Thank you!


-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



RE: [SparkSQL] Reuse HiveContext to different Hive warehouse?

2015-03-10 Thread Haopu Wang
Hao, thanks for the response.

 

For Q1, in my case I have a tool on the Spark shell which serves multiple users who may use different Hive installations. I took a look at the code of HiveContext. It looks like I cannot do that today, because the "catalog" field cannot be changed after initialization.

 

  /* A catalyst metadata catalog that points to the Hive Metastore. */
  @transient
  override protected[sql] lazy val catalog = new HiveMetastoreCatalog(this) with OverrideCatalog

 

For Q2, I checked HDFS and it is running as a cluster. I can run DDL from the Spark shell with HiveContext as well. To reproduce the exception, I just run the script below. It happens in the last step.

 

15/03/11 14:24:48 INFO SparkILoop: Created sql context (with Hive support)..
SQL context available as sqlContext.

scala> sqlContext.sql("SET hive.metastore.warehouse.dir=hdfs://server:8020/space/warehouse")
scala> sqlContext.sql("CREATE TABLE IF NOT EXISTS src(key INT, value STRING)")
scala> sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
scala> var output = sqlContext.sql("SELECT key,value FROM src")
scala> output.saveAsTable("outputtable")

 



From: Cheng, Hao [mailto:hao.ch...@intel.com] 
Sent: Wednesday, March 11, 2015 8:25 AM
To: Haopu Wang; user; dev@spark.apache.org
Subject: RE: [SparkSQL] Reuse HiveContext to different Hive warehouse?

 

I am not so sure whether Hive supports changing the metastore after it has been initialized; I guess not. Spark SQL relies entirely on the Hive Metastore in HiveContext, which is probably why it doesn't work as expected for Q1.

 

BTW, in most cases people configure the metastore settings in hive-site.xml and do not change them afterwards. Is there any reason you want to change them at runtime?

 

For Q2, there is probably something wrong in the configuration; it seems HDFS is running in pseudo/single-node mode. Can you double-check that? Or can you run DDL (like creating a table) from the Spark shell with HiveContext?


 

From: Haopu Wang [mailto:hw...@qilinsoft.com] 
Sent: Tuesday, March 10, 2015 6:38 PM
To: user; dev@spark.apache.org
Subject: [SparkSQL] Reuse HiveContext to different Hive warehouse?

 

I'm using Spark 1.3.0 RC3 build with Hive support.

 

In the Spark shell, I want to reuse the HiveContext instance with different warehouse locations. Below are the steps of my test (assume I have loaded a file into table "src").

 

==

15/03/10 18:22:59 INFO SparkILoop: Created sql context (with Hive support)..

SQL context available as sqlContext.

scala> sqlContext.sql("SET hive.metastore.warehouse.dir=/test/w")

scala> sqlContext.sql("SELECT * from src").saveAsTable("table1")

scala> sqlContext.sql("SET hive.metastore.warehouse.dir=/test/w2")

scala> sqlContext.sql("SELECT * from src").saveAsTable("table2")

==

After these steps, the tables are stored in "/test/w" only. I expected "table2" to be stored in the "/test/w2" folder.

 

Another question: if I set "hive.metastore.warehouse.dir" to an HDFS folder, I cannot use saveAsTable()? Is this by design? The exception stack trace is below:

==

15/03/10 18:35:28 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0
15/03/10 18:35:28 INFO SparkContext: Created broadcast 0 from broadcast at TableReader.scala:74
java.lang.IllegalArgumentException: Wrong FS: hdfs://server:8020/space/warehouse/table2, expected: file:///
at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:643)
at org.apache.hadoop.fs.FileSystem.makeQualified(FileSystem.java:463)
at org.apache.hadoop.fs.FilterFileSystem.makeQualified(FilterFileSystem.java:118)
at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$6.apply(newParquet.scala:252)
at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$6.apply(newParquet.scala:251)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache.refresh(newParquet.scala:251)
at org.apache.spark.sql.parquet.ParquetRelation2.<init>(newParquet.scala:370)
at org.apache.spark.sql.parquet.DefaultSource.createRelation(newParquet.scala:96)
at org.apache.spark.sql.parquet.DefaultSource.createRelation(newParquet.scala:125)
at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:308)
  

[SparkSQL] Reuse HiveContext to different Hive warehouse?

2015-03-10 Thread Haopu Wang
I'm using Spark 1.3.0 RC3 build with Hive support.

 

In the Spark shell, I want to reuse the HiveContext instance with different warehouse locations. Below are the steps of my test (assume I have loaded a file into table "src").

 

==

15/03/10 18:22:59 INFO SparkILoop: Created sql context (with Hive support)..

SQL context available as sqlContext.

scala> sqlContext.sql("SET hive.metastore.warehouse.dir=/test/w")

scala> sqlContext.sql("SELECT * from src").saveAsTable("table1")

scala> sqlContext.sql("SET hive.metastore.warehouse.dir=/test/w2")

scala> sqlContext.sql("SELECT * from src").saveAsTable("table2")

==

After these steps, the tables are stored in "/test/w" only. I expected "table2" to be stored in the "/test/w2" folder.

 

Another question: if I set "hive.metastore.warehouse.dir" to an HDFS folder, I cannot use saveAsTable()? Is this by design? The exception stack trace is below:

==

15/03/10 18:35:28 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0
15/03/10 18:35:28 INFO SparkContext: Created broadcast 0 from broadcast at TableReader.scala:74
java.lang.IllegalArgumentException: Wrong FS: hdfs://server:8020/space/warehouse/table2, expected: file:///
at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:643)
at org.apache.hadoop.fs.FileSystem.makeQualified(FileSystem.java:463)
at org.apache.hadoop.fs.FilterFileSystem.makeQualified(FilterFileSystem.java:118)
at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$6.apply(newParquet.scala:252)
at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$6.apply(newParquet.scala:251)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache.refresh(newParquet.scala:251)
at org.apache.spark.sql.parquet.ParquetRelation2.<init>(newParquet.scala:370)
at org.apache.spark.sql.parquet.DefaultSource.createRelation(newParquet.scala:96)
at org.apache.spark.sql.parquet.DefaultSource.createRelation(newParquet.scala:125)
at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:308)
at org.apache.spark.sql.hive.execution.CreateMetastoreDataSourceAsSelect.run(commands.scala:217)
at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:55)
at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:55)
at org.apache.spark.sql.execution.ExecutedCommand.execute(commands.scala:65)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:1088)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:1088)
at org.apache.spark.sql.DataFrame.saveAsTable(DataFrame.scala:1048)
at org.apache.spark.sql.DataFrame.saveAsTable(DataFrame.scala:998)
at org.apache.spark.sql.DataFrame.saveAsTable(DataFrame.scala:964)
at org.apache.spark.sql.DataFrame.saveAsTable(DataFrame.scala:942)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:20)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:25)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:27)
at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:29)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:31)
at $iwC$$iwC$$iwC.<init>(<console>:33)
at $iwC$$iwC.<init>(<console>:35)
at $iwC.<init>(<console>:37)
at <init>(<console>:39)

 

Thank you very much!

 



Spark Streaming and SchemaRDD usage

2015-03-04 Thread Haopu Wang
Hi, in the roadmap of Spark in 2015 (link:
http://files.meetup.com/3138542/Spark%20in%202015%20Talk%20-%20Wendell.p
ptx), I saw SchemaRDD is designed to be the basis of BOTH Spark
Streaming and Spark SQL.

My question is: what's the typical usage of SchemaRDD in a Spark
Streaming application? Thank you very much!


-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



RE: HiveContext cannot be serialized

2015-02-16 Thread Haopu Wang
Reynold and Michael, thank you so much for the quick response.

 

This problem also happens on branch-1.1; would you mind resolving it on branch-1.1 as well? Thanks again!

 



From: Reynold Xin [mailto:r...@databricks.com] 
Sent: Tuesday, February 17, 2015 3:44 AM
To: Michael Armbrust
Cc: Haopu Wang; dev@spark.apache.org
Subject: Re: HiveContext cannot be serialized

 

I submitted a patch

 

https://github.com/apache/spark/pull/4628

 

On Mon, Feb 16, 2015 at 10:59 AM, Michael Armbrust
 wrote:

I was suggesting you mark the variable that is holding the HiveContext
'@transient' since the scala compiler is not correctly propagating this
through the tuple extraction.  This is only a workaround.  We can also
remove the tuple extraction.
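To illustrate that workaround, here is a minimal sketch of keeping the HiveContext reference @transient so it is never pulled into a serialized closure. The class, field, and query names are illustrative, not taken from the original application.

import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.streaming.dstream.DStream

class StreamingJob(@transient val sc: SparkContext) extends Serializable {
  // Driver-side only; skipped during serialization and re-created lazily if needed.
  @transient lazy val hiveContext = new HiveContext(sc)

  def process(dstream: DStream[String]): Unit = {
    dstream.foreachRDD { rdd =>
      // The foreachRDD body runs on the driver, so using hiveContext here is fine;
      // only the operations on rdd itself are shipped to the executors.
      hiveContext.sql("SHOW TABLES").count()
    }
  }
}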

 

On Mon, Feb 16, 2015 at 10:47 AM, Reynold Xin 
wrote:

Michael - it is already transient. This should probably be considered a bug in the Scala compiler, but we can easily work around it by removing the use of destructuring binding.

 

On Mon, Feb 16, 2015 at 10:41 AM, Michael Armbrust
 wrote:

I'd suggest marking the HiveContext as @transient since it's not valid to use it on the slaves anyway.


On Mon, Feb 16, 2015 at 4:27 AM, Haopu Wang  wrote:

> While investigating this issue (at the end of this email), I took a look at HiveContext's code and found this change
> (https://github.com/apache/spark/commit/64945f868443fbc59cb34b34c16d782dda0fb63d#diff-ff50aea397a607b79df9bec6f2a841db):
>
> -  @transient protected[hive] lazy val hiveconf = new HiveConf(classOf[SessionState])
> -  @transient protected[hive] lazy val sessionState = {
> -    val ss = new SessionState(hiveconf)
> -    setConf(hiveconf.getAllProperties)  // Have SQLConf pick up the initial set of HiveConf.
> -    ss
> -  }
> +  @transient protected[hive] lazy val (hiveconf, sessionState) =
> +    Option(SessionState.get())
> +      .orElse {
>
> With this change, the Scala compiler always generates a Tuple2 field in HiveContext, as below:
>
> private Tuple2 x$3;
> private transient OutputStream outputBuffer;
> private transient HiveConf hiveconf;
> private transient SessionState sessionState;
> private transient HiveMetastoreCatalog catalog;
>
> That "x$3" field's key is a HiveConf object, which cannot be serialized. So can you suggest how to resolve this issue? Thank you very much!
>
> 
>
> I have a streaming application which registers a temp table on a HiveContext for each batch duration.
> The application runs well in Spark 1.1.0. But I get the error below from 1.1.1.
> Do you have any suggestions to resolve it? Thank you!
>
> java.io.NotSerializableException: org.apache.hadoop.hive.conf.HiveConf
> - field (class "scala.Tuple2", name: "_1", type: "class java.lang.Object")
> - object (class "scala.Tuple2", (Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml, org.apache.hadoop.hive.conf.LoopingByteArrayInputStream@2158ce23, org.apache.hadoop.hive.ql.session.SessionState@49b6eef9))
> - field (class "org.apache.spark.sql.hive.HiveContext", name: "x$3", type: "class scala.Tuple2")
> - object (class "org.apache.spark.sql.hive.HiveContext", org.apache.spark.sql.hive.HiveContext@4e6e66a4)
> - field (class "example.BaseQueryableDStream$$anonfun$registerTempTable$2", name: "sqlContext$1", type: "class org.apache.spark.sql.SQLContext")
> - object (class "example.BaseQueryableDStream$$anonfun$registerTempTable$2", )
> - field (class "org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1", name: "foreachFunc$1", type: "interface scala.Function1")
> - object (class "org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1", )
> - field (class "org.apache.spark.streaming.dstream.ForEachDStream", name: "org$apache$spark$streaming$dstream$ForEachDStream$$foreachFunc", type: "interface scala.Function2")
> - object (class "org.apache.spark.streaming.dstream.ForEachDStream", org.apache.spark.streaming.dstream.ForEachDStream@5ccbdc20)
> - element of array (index: 0)
> - array (class "[Ljava.lang.Object;", size: 16)
> - field (class "scala.collectio

HiveContext cannot be serialized

2015-02-16 Thread Haopu Wang
While investigating this issue (at the end of this email), I took a look at HiveContext's code and found this change (https://github.com/apache/spark/commit/64945f868443fbc59cb34b34c16d782dda0fb63d#diff-ff50aea397a607b79df9bec6f2a841db):

 

-  @transient protected[hive] lazy val hiveconf = new HiveConf(classOf[SessionState])
-  @transient protected[hive] lazy val sessionState = {
-    val ss = new SessionState(hiveconf)
-    setConf(hiveconf.getAllProperties)  // Have SQLConf pick up the initial set of HiveConf.
-    ss
-  }
+  @transient protected[hive] lazy val (hiveconf, sessionState) =
+    Option(SessionState.get())
+      .orElse {

 

With this change, the Scala compiler always generates a Tuple2 field in HiveContext, as below:

 

private Tuple2 x$3;

private transient OutputStream outputBuffer;

private transient HiveConf hiveconf;

private transient SessionState sessionState;

private transient HiveMetastoreCatalog catalog;

 

That "x$3" field's key is HiveConf object that cannot be serialized. So
can you suggest how to resolve this issue? Thank you very much!

 



 

I have a streaming application which registers a temp table on a HiveContext for each batch duration.

The application runs well in Spark 1.1.0, but I get the error below from 1.1.1.

Do you have any suggestions to resolve it? Thank you!

 

java.io.NotSerializableException: org.apache.hadoop.hive.conf.HiveConf
- field (class "scala.Tuple2", name: "_1", type: "class java.lang.Object")
- object (class "scala.Tuple2", (Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml, org.apache.hadoop.hive.conf.LoopingByteArrayInputStream@2158ce23, org.apache.hadoop.hive.ql.session.SessionState@49b6eef9))
- field (class "org.apache.spark.sql.hive.HiveContext", name: "x$3", type: "class scala.Tuple2")
- object (class "org.apache.spark.sql.hive.HiveContext", org.apache.spark.sql.hive.HiveContext@4e6e66a4)
- field (class "example.BaseQueryableDStream$$anonfun$registerTempTable$2", name: "sqlContext$1", type: "class org.apache.spark.sql.SQLContext")
- object (class "example.BaseQueryableDStream$$anonfun$registerTempTable$2", )
- field (class "org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1", name: "foreachFunc$1", type: "interface scala.Function1")
- object (class "org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1", )
- field (class "org.apache.spark.streaming.dstream.ForEachDStream", name: "org$apache$spark$streaming$dstream$ForEachDStream$$foreachFunc", type: "interface scala.Function2")
- object (class "org.apache.spark.streaming.dstream.ForEachDStream", org.apache.spark.streaming.dstream.ForEachDStream@5ccbdc20)
- element of array (index: 0)
- array (class "[Ljava.lang.Object;", size: 16)
- field (class "scala.collection.mutable.ArrayBuffer", name: "array", type: "class [Ljava.lang.Object;")
- object (class "scala.collection.mutable.ArrayBuffer", ArrayBuffer(org.apache.spark.streaming.dstream.ForEachDStream@5ccbdc20))
- field (class "org.apache.spark.streaming.DStreamGraph", name: "outputStreams", type: "class scala.collection.mutable.ArrayBuffer")
- custom writeObject data (class "org.apache.spark.streaming.DStreamGraph")
- object (class "org.apache.spark.streaming.DStreamGraph", org.apache.spark.streaming.DStreamGraph@776ae7da)
- field (class "org.apache.spark.streaming.Checkpoint", name: "graph", type: "class org.apache.spark.streaming.DStreamGraph")
- root object (class "org.apache.spark.streaming.Checkpoint", org.apache.spark.streaming.Checkpoint@5eade065)
at java.io.ObjectOutputStream.writeObject0(Unknown Source)

 

 

 



Do you know any Spark modeling tool?

2014-12-25 Thread Haopu Wang
Hi, I think a modeling tool may be helpful because sometimes it's
hard/tricky to program Spark. I don't know if there is already such a
tool.

Thanks!

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



RE: Spark SQL question: why build hashtable for both sides in HashOuterJoin?

2014-10-07 Thread Haopu Wang
Liquan, yes, for a full outer join, building a hash table on both sides is more efficient.

 

For a left/right outer join, it looks like one hash table should be enough.
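To make that concrete, here is a hedged sketch using plain Scala collections (not Spark's implementation) of a left outer join that builds a single hash table on the right side and streams the left side:

// Build side: right; probe/streamed side: left. Unmatched left rows still
// produce a row paired with None, so no second hash table is needed.
def leftOuterJoin[K, L, R](left: Iterator[(K, L)],
                           right: Iterable[(K, R)]): Iterator[(K, (L, Option[R]))] = {
  val table: Map[K, Seq[R]] = right.groupBy(_._1).mapValues(_.map(_._2).toSeq).toMap
  left.flatMap { case (k, l) =>
    table.get(k) match {
      case Some(rs) => rs.iterator.map(r => (k, (l, Option(r))))
      case None     => Iterator((k, (l, None: Option[R])))
    }
  }
}

A full outer join is different: rows from the build side that never matched also have to be emitted, which is why handling both sides helps there.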

 



From: Liquan Pei [mailto:liquan...@gmail.com] 
Sent: 2014年9月30日 18:34
To: Haopu Wang
Cc: dev@spark.apache.org; user
Subject: Re: Spark SQL question: why build hashtable for both sides in 
HashOuterJoin?

 

Hi Haopu,

 

How about full outer join? One hash table may not be efficient for this case. 

 

Liquan

 

On Mon, Sep 29, 2014 at 11:47 PM, Haopu Wang  wrote:

Hi, Liquan, thanks for the response.

 

In your example, I think the hash table should be built on the "right" side, so 
Spark can iterate through the left side and find matches in the right side from 
the hash table efficiently. Please comment and suggest, thanks again!

 



From: Liquan Pei [mailto:liquan...@gmail.com] 
Sent: 2014年9月30日 12:31
To: Haopu Wang
Cc: dev@spark.apache.org; user
Subject: Re: Spark SQL question: why build hashtable for both sides in 
HashOuterJoin?

 

Hi Haopu,

 

My understanding is that the hash tables on both the left and right sides are used to include null values in the result in an efficient manner. If a hash table is only built on one side, let's say the left side, and we perform a left outer join, then for each row on the left side a scan over the right side is needed to make sure that there are no matching tuples for that row on the left side.

 

Hope this helps!

Liquan

 

On Mon, Sep 29, 2014 at 8:36 PM, Haopu Wang  wrote:

I took a look at HashOuterJoin and it builds a hash table for both sides.

This consumes quite a lot of memory when the partition is big, and it doesn't reduce the iteration over the streamed relation, right?

Thanks!

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org





 

-- 
Liquan Pei 
Department of Physics 
University of Massachusetts Amherst 





 

-- 
Liquan Pei 
Department of Physics 
University of Massachusetts Amherst 



RE: Spark SQL question: why build hashtable for both sides in HashOuterJoin?

2014-09-29 Thread Haopu Wang
Hi, Liquan, thanks for the response.

 

In your example, I think the hash table should be built on the "right" side, so 
Spark can iterate through the left side and find matches in the right side from 
the hash table efficiently. Please comment and suggest, thanks again!

 



From: Liquan Pei [mailto:liquan...@gmail.com] 
Sent: 2014年9月30日 12:31
To: Haopu Wang
Cc: dev@spark.apache.org; user
Subject: Re: Spark SQL question: why build hashtable for both sides in 
HashOuterJoin?

 

Hi Haopu,

 

My understanding is that the hash tables on both the left and right sides are used to include null values in the result in an efficient manner. If a hash table is only built on one side, let's say the left side, and we perform a left outer join, then for each row on the left side a scan over the right side is needed to make sure that there are no matching tuples for that row on the left side.

 

Hope this helps!

Liquan

 

On Mon, Sep 29, 2014 at 8:36 PM, Haopu Wang  wrote:

I took a look at HashOuterJoin and it builds a hash table for both sides.

This consumes quite a lot of memory when the partition is big, and it doesn't reduce the iteration over the streamed relation, right?

Thanks!

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org





 

-- 
Liquan Pei 
Department of Physics 
University of Massachusetts Amherst 



Spark SQL question: why build hashtable for both sides in HashOuterJoin?

2014-09-29 Thread Haopu Wang
I took a look at HashOuterJoin and it builds a hash table for both sides.

This consumes quite a lot of memory when the partition is big, and it doesn't reduce the iteration over the streamed relation, right?

Thanks!

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



FW: Spark SQL 1.1.0: NPE when join two cached table

2014-09-22 Thread Haopu Wang
Forwarding to the dev mailing list for help.

 



From: Haopu Wang 
Sent: 2014年9月22日 16:35
To: u...@spark.apache.org
Subject: Spark SQL 1.1.0: NPE when join two cached table

 

I have two data sets and want to join them on their first fields. Sample data are below:

 

data set 1:

  id2,name1,2,300.0

 

data set 2:

  id1,

 

The code is something like below:

 

val sparkConf = new SparkConf().setAppName("JoinInScala")
val sc = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sc)
sqlContext.setConf("spark.sql.inMemoryColumnarStorage.compressed", "true")
import org.apache.spark.sql._

val testdata = sc.textFile(args(0) + "data.txt").map(_.split(","))
  .map(p => Row(p(0), p(1).trim, p(2).trim.toLong, p(3).trim.toDouble))

val fields = new Array[StructField](4)
fields(0) = StructField("id", StringType, false);
fields(1) = StructField("name", StringType, false);
fields(2) = StructField("agg1", LongType, false);
fields(3) = StructField("agg2", DoubleType, false);
val schema = StructType(fields);

val data = sqlContext.applySchema(testdata, schema)
data.registerTempTable("datatable")
sqlContext.cacheTable("datatable")

val refdata = sc.textFile(args(0) + "ref.txt").map(_.split(","))
  .map(p => Row(p(0), p(1).trim))

val reffields = new Array[StructField](2)
reffields(0) = StructField("id", StringType, false);
reffields(1) = StructField("data", StringType, true);
val refschema = StructType(reffields);

val refschemardd = sqlContext.applySchema(refdata, refschema)
refschemardd.registerTempTable("ref")
sqlContext.cacheTable("ref")

val results = sqlContext.sql("SELECT d.id,d.name,d.agg1,d.agg2,ref.data FROM datatable as d join ref on d.id=ref.id")
results.foreach(T => Unit);

 

But I got the NullPointerException below. If I comment out the two "cacheTable()" calls, the program runs well. Please shed some light, thank you!

 

Exception in thread "main" java.lang.NullPointerException
at org.apache.spark.sql.columnar.InMemoryRelation.statistics$lzycompute(InMemoryColumnarTableScan.scala:43)
at org.apache.spark.sql.columnar.InMemoryRelation.statistics(InMemoryColumnarTableScan.scala:42)
at org.apache.spark.sql.execution.SparkStrategies$HashJoin$.apply(SparkStrategies.scala:83)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
at org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:268)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59)
at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:402)
at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:400)
at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:406)
at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:406)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:409)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:409)
at org.apache.spark.sql.SchemaRDD.getDependencies(SchemaRDD.scala:120)
at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:191)
at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:189)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.dependencies(RDD.scala:189)
at org.apache.spark.rdd.RDD.firstParent(RDD.scala:1233)
at org.apache.spark.sql.SchemaRDD.getPartitions(SchemaRDD.scala:117)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:202