RE: Should I avoid "state" in a Spark application?
Can someone look at my questions? Thanks again!

From: Haopu Wang
Sent: June 12, 2016 16:40
To: u...@spark.apache.org
Subject: Should I avoid "state" in a Spark application?

I have a Spark application whose structure is below:

    var ts: Long = 0L
    dstream1.foreachRDD { (x, time) =>
      ts = time
      x.do_something()
      ...
    }
    ...
    process_data(dstream2, ts, ...)

I assume the foreachRDD call can update the "ts" variable, which is then used in the Spark tasks of the "process_data" function. From my test on a standalone Spark cluster, it is working. But should I be concerned if I switch to YARN? I have also seen articles recommending that state be avoided in Scala programming. Without the state variable, how could this be done? Any comments or suggestions are appreciated.

Thanks,
Haopu
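One way to avoid the shared driver-side variable, sketched below on the assumption that the timestamp needed is just the batch time (which is the same for every DStream in a given batch): take the Time argument that foreachRDD already hands you on the DStream you are processing, instead of carrying it across DStreams through a var. The names processData and wire are illustrative, not from the original program.

    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.Time
    import org.apache.spark.streaming.dstream.DStream

    // Stand-in for the original process_data logic.
    def processData(rdd: RDD[String], batchTime: Time): Unit = {
      // ... use batchTime here instead of reading a shared var ...
    }

    def wire(dstream2: DStream[String]): Unit = {
      // The batch time travels with the batch itself, so there is no
      // driver-side mutable state to reason about on YARN or anywhere else.
      dstream2.foreachRDD { (rdd, time) => processData(rdd, time) }
    }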
RE: [SparkStreaming] NPE in DStreamCheckPointData.scala:125
Can someone help? Thank you!

From: Haopu Wang
Sent: Monday, June 15, 2015 3:36 PM
To: user; dev@spark.apache.org
Subject: [SparkStreaming] NPE in DStreamCheckPointData.scala:125

I use the attached program to test checkpointing. It's quite simple. When I run the program a second time, it loads the checkpoint data, which is expected; however, I see an NPE in the driver log. Do you have any idea about the issue? I'm on Spark 1.4.0, thank you very much!

== logs ==
15/06/15 15:27:17 [THREAD ID=main] INFO FileInputDStream: Restoring checkpoint data
15/06/15 15:27:17 [THREAD ID=main] INFO FileInputDStream$FileInputDStreamCheckpointData: Restoring files for time 143435313 ms - []
15/06/15 15:27:17 [THREAD ID=main] INFO FileInputDStream$FileInputDStreamCheckpointData: Restoring files for time 143435314 ms - []
15/06/15 15:27:17 [THREAD ID=main] INFO FileInputDStream$FileInputDStreamCheckpointData: Restoring files for time 143435315 ms - []
15/06/15 15:27:17 [THREAD ID=main] INFO FileInputDStream$FileInputDStreamCheckpointData: Restoring files for time 143435316 ms - []
15/06/15 15:27:17 [THREAD ID=main] INFO FileInputDStream$FileInputDStreamCheckpointData: Restoring files for time 143435317 ms - []
15/06/15 15:27:17 [THREAD ID=main] INFO FileInputDStream$FileInputDStreamCheckpointData: Restoring files for time 143435318 ms - []
15/06/15 15:27:17 [THREAD ID=main] INFO FileInputDStream$FileInputDStreamCheckpointData: Restoring files for time 143435319 ms - []
15/06/15 15:27:17 [THREAD ID=main] INFO FileInputDStream: Restored checkpoint data
15/06/15 15:27:17 [THREAD ID=main] INFO MappedDStream: Restored checkpoint data
15/06/15 15:27:17 [THREAD ID=main] INFO MappedDStream: Restored checkpoint data
15/06/15 15:27:17 [THREAD ID=main] INFO ForEachDStream: Restored checkpoint data
15/06/15 15:27:17 [THREAD ID=main] INFO DStreamGraph: Restored checkpoint data
15/06/15 15:27:17 [THREAD ID=main] ERROR StreamingContext: Error starting the context, marking it as stopped
java.io.IOException: java.lang.NullPointerException
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1242)
    at org.apache.spark.streaming.dstream.DStreamCheckpointData.writeObject(DStreamCheckpointData.scala:123)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:440)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$writeObject$1.apply$mcV$sp(DStream.scala:498)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1239)
    at org.apache.spark.streaming.dstream.DStream.writeObject(DStream.scala:493)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:440)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply$mcV$sp(DStreamGraph.scala:181)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1239)
    at org.apache.spark.streaming.DStreamGraph.writeObject(DStreamGraph.scala:176)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.Delega
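For reference, the canonical recovery-safe driver skeleton builds the entire DStream graph inside a factory function passed to StreamingContext.getOrCreate; a recovery-time serialization failure like the one above is worth checking against this shape. The attached program is not reproduced in the archive, so the class and path names below are illustrative only:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object CheckpointTest {
      val checkpointDir = "/tmp/cp"   // illustrative path

      def createContext(): StreamingContext = {
        val conf = new SparkConf().setAppName("CheckpointTest")
        val ssc = new StreamingContext(conf, Seconds(1))
        ssc.checkpoint(checkpointDir)
        // The whole graph -- sources, transformations, and output ops --
        // must be defined inside this factory so it can be rebuilt
        // identically when recovering from the checkpoint.
        ssc.textFileStream("/tmp/in").map(_.length).print()
        ssc
      }

      def main(args: Array[String]): Unit = {
        // The first run creates a fresh context; later runs restore it
        // from checkpointDir instead of calling createContext.
        val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
        ssc.start()
        ssc.awaitTermination()
      }
    }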
RE: [SparkSQL] cannot filter by a DateType column
Sorry, I was using Spark 1.3.x. I cannot reproduce it in master. Should I still open a JIRA, so that the fix can be back-ported to the 1.3 branch? Thanks again!

From: Michael Armbrust [mailto:mich...@databricks.com]
Sent: Saturday, May 09, 2015 2:41 AM
To: Haopu Wang
Cc: user; dev@spark.apache.org
Subject: Re: [SparkSQL] cannot filter by a DateType column

What version of Spark are you using? It appears that at least in master we are doing the conversion correctly, but it's possible older versions of applySchema do not. If you can reproduce the same bug in master, can you open a JIRA?

On Fri, May 8, 2015 at 1:36 AM, Haopu Wang wrote:

I want to filter a DataFrame based on a Date column. If the DataFrame object is constructed from a Scala case class, it works (comparing either as String or as Date). But if the DataFrame is generated by applying a schema to an RDD, it doesn't work. Below are the exception and the test code. Do you have any idea about the error? Thank you very much!

exception=
java.lang.ClassCastException: java.sql.Date cannot be cast to java.lang.Integer
    at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:106)
    at org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToString$2$$anonfun$apply$6.apply(Cast.scala:116)
    at org.apache.spark.sql.catalyst.expressions.Cast.org$apache$spark$sql$catalyst$expressions$Cast$$buildCast(Cast.scala:111)
    at org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToString$2.apply(Cast.scala:116)
    at org.apache.spark.sql.catalyst.expressions.Cast.eval(Cast.scala:426)
    at org.apache.spark.sql.catalyst.expressions.GreaterThanOrEqual.eval(predicates.scala:305)
    at org.apache.spark.sql.catalyst.expressions.InterpretedPredicate$$anonfun$apply$1.apply(predicates.scala:30)
    at org.apache.spark.sql.catalyst.expressions.InterpretedPredicate$$anonfun$apply$1.apply(predicates.scala:30)
    at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:390)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)

code=
    val conf = new SparkConf().setAppName("DFTest").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val sqlCtx = new HiveContext(sc)
    import sqlCtx.implicits._

    case class Test(dt: java.sql.Date)
    val df = sc.makeRDD(Seq(Test(new java.sql.Date(115, 4, 7)))).toDF()

    var r = df.filter("dt >= '2015-05-06'")
    r.explain(true)
    r.show
    println("==")

    var r2 = df.filter("dt >= cast('2015-05-06' as DATE)")
    r2.explain(true)
    r2.show
    println("==")

    // "df2" doesn't filter correctly!!
    val rdd2 = sc.makeRDD(Seq(Row(new java.sql.Date(115, 4, 7))))
    val schema = StructType(Array(StructField("dt", DateType, false)))
    val df2 = sqlCtx.applySchema(rdd2, schema)

    r = df2.filter("dt >= '2015-05-06'")
    r.explain(true)
    r.show
    println("==")

    r2 = df2.filter("dt >= cast('2015-05-06' as DATE)")
    r2.explain(true)
    r2.show
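For anyone reading this on 1.3+: applySchema was deprecated in favor of createDataFrame, which takes the same arguments, so the schema-based construction above would read (same rdd2 and schema as in the repro):

    // Equivalent construction on Spark 1.3+, where applySchema is deprecated.
    val df2 = sqlCtx.createDataFrame(rdd2, schema)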
RE: Is SQLContext thread-safe?
Hi, in a test on Spark SQL 1.3.0, multiple threads issue SELECTs on the same SQLContext instance, but the exception below is thrown. So it looks like SQLContext is NOT thread-safe? I think this is not the desired behavior.

==
java.lang.RuntimeException: [1.1] failure: ``insert'' expected but identifier select found

select id ,ext.d from UNIT_TEST
^
    at scala.sys.package$.error(package.scala:27)
    at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(AbstractSparkSQLParser.scala:40)
    at org.apache.spark.sql.SQLContext$$anonfun$2.apply(SQLContext.scala:130)
    at org.apache.spark.sql.SQLContext$$anonfun$2.apply(SQLContext.scala:130)
    at org.apache.spark.sql.SparkSQLParser$$anonfun$org$apache$spark$sql$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:96)
    at org.apache.spark.sql.SparkSQLParser$$anonfun$org$apache$spark$sql$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:95)
    at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
    at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
    at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
    at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
    at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
    at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
    at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
    at scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
    at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
    at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
    at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
    at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
    at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890)
    at scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110)
    at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(AbstractSparkSQLParser.scala:38)
    at org.apache.spark.sql.SQLContext$$anonfun$parseSql$1.apply(SQLContext.scala:134)
    at org.apache.spark.sql.SQLContext$$anonfun$parseSql$1.apply(SQLContext.scala:134)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.sql.SQLContext.parseSql(SQLContext.scala:134)
    at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:915)

-----Original Message-----
From: Cheng, Hao [mailto:hao.ch...@intel.com]
Sent: Monday, March 02, 2015 9:05 PM
To: Haopu Wang; user
Subject: RE: Is SQLContext thread-safe?

Yes it is thread safe, at least it's supposed to be.

-----Original Message-----
From: Haopu Wang [mailto:hw...@qilinsoft.com]
Sent: Monday, March 2, 2015 4:43 PM
To: user
Subject: Is SQLContext thread-safe?

Hi, is it safe to use the same SQLContext to do SELECT operations in different threads at the same time? Thank you very much!
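The failure above happens inside parseSql, which suggests the SQL parser is the piece that misbehaves under concurrency. If that turns out to be the case on this Spark version, one defensive workaround (a sketch, not an official API) is to serialize just the sql() call while still letting the resulting DataFrames execute concurrently; the wrapper class here is hypothetical:

    import org.apache.spark.sql.{DataFrame, SQLContext}

    // Hypothetical wrapper: only the parse step shown failing in the stack
    // trace above is guarded by the lock; actions on the returned DataFrame
    // can still run on many threads.
    class SynchronizedSql(sqlContext: SQLContext) {
      private val lock = new Object
      def sql(query: String): DataFrame = lock.synchronized {
        sqlContext.sql(query)
      }
    }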
RE: Can I call aggregate UDF in DataFrame?
Great! Thank you!

From: Reynold Xin [mailto:r...@databricks.com]
Sent: Thursday, April 02, 2015 8:11 AM
To: Haopu Wang
Cc: user; dev@spark.apache.org
Subject: Re: Can I call aggregate UDF in DataFrame?

You totally can.
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala#L792
There is also an attempt at adding stddev here already: https://github.com/apache/spark/pull/5228

On Thu, Mar 26, 2015 at 12:37 AM, Haopu Wang wrote:

Specifically, there are only 5 aggregate functions in class org.apache.spark.sql.GroupedData: sum/max/min/mean/count. Can I plug in a function to calculate stddev? Thank you!
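Until a built-in stddev lands (see the PR above), one workaround sketch on 1.3-era DataFrame APIs is to assemble it from the existing avg aggregate, using sqrt(E[x^2] - E[x]^2); the DataFrame df and the column names "k" and "x" here are hypothetical placeholders:

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions._

    // Population standard deviation from the first and second moments.
    def stddevByKey(df: DataFrame): DataFrame =
      df.groupBy("k")
        .agg(avg(col("x") * col("x")).as("ex2"), avg(col("x")).as("ex"))
        .select(col("k"), sqrt(col("ex2") - col("ex") * col("ex")).as("stddev_x"))

Note this computes the population variant; a sample stddev needs the usual n/(n-1) correction, which can be added with a count aggregate.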
RE: [SparkSQL] Reuse HiveContext with different Hive warehouses?
Hao, thanks for the response.

For Q1: in my case, I have a tool on the Spark shell which serves multiple users, and they may use different Hive installations. I took a look at the code of HiveContext. It looks like I cannot do that today, because the "catalog" field cannot be changed after initialization:

    /* A catalyst metadata catalog that points to the Hive Metastore. */
    @transient
    override protected[sql] lazy val catalog = new HiveMetastoreCatalog(this) with OverrideCatalog

For Q2: I checked HDFS and it is running as a cluster. I can also run DDL from the Spark shell with HiveContext. To reproduce the exception, I just run the script below; it happens in the last step.

15/03/11 14:24:48 INFO SparkILoop: Created sql context (with Hive support)..
SQL context available as sqlContext.
scala> sqlContext.sql("SET hive.metastore.warehouse.dir=hdfs://server:8020/space/warehouse")
scala> sqlContext.sql("CREATE TABLE IF NOT EXISTS src(key INT, value STRING)")
scala> sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
scala> var output = sqlContext.sql("SELECT key,value FROM src")
scala> output.saveAsTable("outputtable")

From: Cheng, Hao [mailto:hao.ch...@intel.com]
Sent: Wednesday, March 11, 2015 8:25 AM
To: Haopu Wang; user; dev@spark.apache.org
Subject: RE: [SparkSQL] Reuse HiveContext with different Hive warehouses?

I am not sure Hive supports changing the metastore after it is initialized; I guess not. Spark SQL relies entirely on the Hive metastore in HiveContext, which is probably why it doesn't work as expected for Q1. BTW, in most cases people configure the metastore settings in hive-site.xml and never change them afterwards; is there any reason you want to change that at runtime?

For Q2, probably something is wrong in the configuration; it seems HDFS has run in pseudo/single-node mode. Can you double-check that? Or can you run DDL (like creating a table) from the Spark shell with HiveContext?

From: Haopu Wang [mailto:hw...@qilinsoft.com]
Sent: Tuesday, March 10, 2015 6:38 PM
To: user; dev@spark.apache.org
Subject: [SparkSQL] Reuse HiveContext with different Hive warehouses?

I'm using a Spark 1.3.0 RC3 build with Hive support. In the Spark shell, I want to reuse the HiveContext instance with different warehouse locations. Below are the steps for my test (assume I have already loaded a file into table "src").

==
15/03/10 18:22:59 INFO SparkILoop: Created sql context (with Hive support)..
SQL context available as sqlContext.
scala> sqlContext.sql("SET hive.metastore.warehouse.dir=/test/w")
scala> sqlContext.sql("SELECT * from src").saveAsTable("table1")
scala> sqlContext.sql("SET hive.metastore.warehouse.dir=/test/w2")
scala> sqlContext.sql("SELECT * from src").saveAsTable("table2")
==

After these steps, the tables are stored in "/test/w" only. I expected "table2" to be stored in the "/test/w2" folder.

Another question: if I set "hive.metastore.warehouse.dir" to an HDFS folder, I cannot use saveAsTable()? Is this by design? The exception stack trace is below:

==
15/03/10 18:35:28 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0
15/03/10 18:35:28 INFO SparkContext: Created broadcast 0 from broadcast at TableReader.scala:74
java.lang.IllegalArgumentException: Wrong FS: hdfs://server:8020/space/warehouse/table2, expected: file:///
    at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:643)
    at org.apache.hadoop.fs.FileSystem.makeQualified(FileSystem.java:463)
    at org.apache.hadoop.fs.FilterFileSystem.makeQualified(FilterFileSystem.java:118)
    at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$6.apply(newParquet.scala:252)
    at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$6.apply(newParquet.scala:251)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache.refresh(newParquet.scala:251)
    at org.apache.spark.sql.parquet.ParquetRelation2.<init>(newParquet.scala:370)
    at org.apache.spark.sql.parquet.DefaultSource.createRelation(newParquet.scala:96)
    at org.apache.spark.sql.parquet.DefaultSource.createRelation(newParquet.scala:125)
    at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:308)
[SparkSQL] Reuse HiveContext with different Hive warehouses?
I'm using a Spark 1.3.0 RC3 build with Hive support. In the Spark shell, I want to reuse the HiveContext instance with different warehouse locations. Below are the steps for my test (assume I have already loaded a file into table "src").

==
15/03/10 18:22:59 INFO SparkILoop: Created sql context (with Hive support)..
SQL context available as sqlContext.
scala> sqlContext.sql("SET hive.metastore.warehouse.dir=/test/w")
scala> sqlContext.sql("SELECT * from src").saveAsTable("table1")
scala> sqlContext.sql("SET hive.metastore.warehouse.dir=/test/w2")
scala> sqlContext.sql("SELECT * from src").saveAsTable("table2")
==

After these steps, the tables are stored in "/test/w" only. I expected "table2" to be stored in the "/test/w2" folder.

Another question: if I set "hive.metastore.warehouse.dir" to an HDFS folder, I cannot use saveAsTable()? Is this by design? The exception stack trace is below:

==
15/03/10 18:35:28 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0
15/03/10 18:35:28 INFO SparkContext: Created broadcast 0 from broadcast at TableReader.scala:74
java.lang.IllegalArgumentException: Wrong FS: hdfs://server:8020/space/warehouse/table2, expected: file:///
    at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:643)
    at org.apache.hadoop.fs.FileSystem.makeQualified(FileSystem.java:463)
    at org.apache.hadoop.fs.FilterFileSystem.makeQualified(FilterFileSystem.java:118)
    at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$6.apply(newParquet.scala:252)
    at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$6.apply(newParquet.scala:251)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache.refresh(newParquet.scala:251)
    at org.apache.spark.sql.parquet.ParquetRelation2.<init>(newParquet.scala:370)
    at org.apache.spark.sql.parquet.DefaultSource.createRelation(newParquet.scala:96)
    at org.apache.spark.sql.parquet.DefaultSource.createRelation(newParquet.scala:125)
    at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:308)
    at org.apache.spark.sql.hive.execution.CreateMetastoreDataSourceAsSelect.run(commands.scala:217)
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:55)
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:55)
    at org.apache.spark.sql.execution.ExecutedCommand.execute(commands.scala:65)
    at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:1088)
    at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:1088)
    at org.apache.spark.sql.DataFrame.saveAsTable(DataFrame.scala:1048)
    at org.apache.spark.sql.DataFrame.saveAsTable(DataFrame.scala:998)
    at org.apache.spark.sql.DataFrame.saveAsTable(DataFrame.scala:964)
    at org.apache.spark.sql.DataFrame.saveAsTable(DataFrame.scala:942)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:20)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:25)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:27)
    at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:29)
    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:31)
    at $iwC$$iwC$$iwC.<init>(<console>:33)
    at $iwC$$iwC.<init>(<console>:35)
    at $iwC.<init>(<console>:37)
    at <init>(<console>:39)

Thank you very much!
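Given that the warehouse directory is effectively fixed once the HiveContext is initialized, one workaround sketch for the multi-user tool (paths and table names here are illustrative) is to bypass the warehouse entirely: write each user's output to an explicit location and register temp tables on demand, without touching the metastore:

    // Write to an explicit per-user path instead of relying on
    // hive.metastore.warehouse.dir, which is fixed after HiveContext init.
    val output = sqlContext.sql("SELECT key, value FROM src")
    output.saveAsParquetFile("hdfs://server:8020/space/userA/outputtable")

    // Load it back later and expose it to SQL without a metastore entry.
    sqlContext.parquetFile("hdfs://server:8020/space/userA/outputtable")
      .registerTempTable("outputtable")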
Spark Streaming and SchemaRDD usage
Hi, in the roadmap of Spark in 2015 (link: http://files.meetup.com/3138542/Spark%20in%202015%20Talk%20-%20Wendell.pptx), I saw that SchemaRDD is designed to be the basis of BOTH Spark Streaming and Spark SQL. My question is: what's the typical usage of SchemaRDD in a Spark Streaming application? Thank you very much!
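One typical pattern, sketched here on 1.3-style APIs (where SchemaRDD became DataFrame) rather than anything the roadmap specifically promises: convert each micro-batch into a table inside foreachRDD and query it with SQL. The Event type and names below are hypothetical:

    import org.apache.spark.sql.SQLContext
    import org.apache.spark.streaming.dstream.DStream

    case class Event(user: String, value: Double) // hypothetical record type

    def wire(stream: DStream[Event], sqlContext: SQLContext): Unit = {
      import sqlContext.implicits._
      stream.foreachRDD { rdd =>
        // Each micro-batch becomes a queryable table for this interval.
        rdd.toDF().registerTempTable("events")
        sqlContext.sql("SELECT user, SUM(value) FROM events GROUP BY user").show()
      }
    }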
RE: HiveContext cannot be serialized
Reynold and Michael, thank you so much for the quick response. This problem also happens on branch-1.1; would you mind resolving it on branch-1.1 as well? Thanks again!

From: Reynold Xin [mailto:r...@databricks.com]
Sent: Tuesday, February 17, 2015 3:44 AM
To: Michael Armbrust
Cc: Haopu Wang; dev@spark.apache.org
Subject: Re: HiveContext cannot be serialized

I submitted a patch: https://github.com/apache/spark/pull/4628

On Mon, Feb 16, 2015 at 10:59 AM, Michael Armbrust wrote:
I was suggesting you mark the variable that is holding the HiveContext '@transient' since the scala compiler is not correctly propagating this through the tuple extraction. This is only a workaround. We can also remove the tuple extraction.

On Mon, Feb 16, 2015 at 10:47 AM, Reynold Xin wrote:
Michael - it is already transient. This should probably be considered a bug in the scala compiler, but we can easily work around it by removing the use of destructuring binding.

On Mon, Feb 16, 2015 at 10:41 AM, Michael Armbrust wrote:
I'd suggest marking the HiveContext as @transient since it's not valid to use it on the slaves anyway.

On Mon, Feb 16, 2015 at 4:27 AM, Haopu Wang wrote:

> While investigating this issue (at the end of this email), I took a look at HiveContext's code and found this change (https://github.com/apache/spark/commit/64945f868443fbc59cb34b34c16d782dda0fb63d#diff-ff50aea397a607b79df9bec6f2a841db):
>
> - @transient protected[hive] lazy val hiveconf = new HiveConf(classOf[SessionState])
> - @transient protected[hive] lazy val sessionState = {
> -   val ss = new SessionState(hiveconf)
> -   setConf(hiveconf.getAllProperties)  // Have SQLConf pick up the initial set of HiveConf.
> -   ss
> - }
> + @transient protected[hive] lazy val (hiveconf, sessionState) =
> +   Option(SessionState.get())
> +     .orElse {
>
> With the new change, the Scala compiler always generates a Tuple2 field in HiveContext, as below:
>
> private Tuple2 x$3;
> private transient OutputStream outputBuffer;
> private transient HiveConf hiveconf;
> private transient SessionState sessionState;
> private transient HiveMetastoreCatalog catalog;
>
> That "x$3" field holds a HiveConf object, which cannot be serialized. Can you suggest how to resolve this issue? Thank you very much!
>
> I have a streaming application which registers a temp table on a HiveContext for each batch duration. The application runs well on Spark 1.1.0, but I get the error below on 1.1.1. Do you have any suggestions to resolve it? Thank you!
>
> java.io.NotSerializableException: org.apache.hadoop.hive.conf.HiveConf
> - field (class "scala.Tuple2", name: "_1", type: "class java.lang.Object")
> - object (class "scala.Tuple2", (Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml, org.apache.hadoop.hive.conf.LoopingByteArrayInputStream@2158ce23, org.apache.hadoop.hive.ql.session.SessionState@49b6eef9))
> - field (class "org.apache.spark.sql.hive.HiveContext", name: "x$3", type: "class scala.Tuple2")
> - object (class "org.apache.spark.sql.hive.HiveContext", org.apache.spark.sql.hive.HiveContext@4e6e66a4)
> - field (class "example.BaseQueryableDStream$$anonfun$registerTempTable$2", name: "sqlContext$1", type: "class org.apache.spark.sql.SQLContext")
> - object (class "example.BaseQueryableDStream$$anonfun$registerTempTable$2", )
> - field (class "org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1", name: "foreachFunc$1", type: "interface scala.Function1")
> - object (class "org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1", )
> - field (class "org.apache.spark.streaming.dstream.ForEachDStream", name: "org$apache$spark$streaming$dstream$ForEachDStream$$foreachFunc", type: "interface scala.Function2")
> - object (class "org.apache.spark.streaming.dstream.ForEachDStream", org.apache.spark.streaming.dstream.ForEachDStream@5ccbdc20)
> - element of array (index: 0)
> - array (class "[Ljava.lang.Object;", size: 16)
> - field (class "scala.collectio
HiveContext cannot be serialized
While investigating this issue (at the end of this email), I took a look at HiveContext's code and found this change (https://github.com/apache/spark/commit/64945f868443fbc59cb34b34c16d782dda0fb63d#diff-ff50aea397a607b79df9bec6f2a841db):

    - @transient protected[hive] lazy val hiveconf = new HiveConf(classOf[SessionState])
    - @transient protected[hive] lazy val sessionState = {
    -   val ss = new SessionState(hiveconf)
    -   setConf(hiveconf.getAllProperties)  // Have SQLConf pick up the initial set of HiveConf.
    -   ss
    - }
    + @transient protected[hive] lazy val (hiveconf, sessionState) =
    +   Option(SessionState.get())
    +     .orElse {

With the new change, the Scala compiler always generates a Tuple2 field in HiveContext, as below:

    private Tuple2 x$3;
    private transient OutputStream outputBuffer;
    private transient HiveConf hiveconf;
    private transient SessionState sessionState;
    private transient HiveMetastoreCatalog catalog;

That "x$3" field holds a HiveConf object, which cannot be serialized. Can you suggest how to resolve this issue? Thank you very much!

I have a streaming application which registers a temp table on a HiveContext for each batch duration. The application runs well on Spark 1.1.0, but I get the error below on 1.1.1. Do you have any suggestions to resolve it? Thank you!

java.io.NotSerializableException: org.apache.hadoop.hive.conf.HiveConf
- field (class "scala.Tuple2", name: "_1", type: "class java.lang.Object")
- object (class "scala.Tuple2", (Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml, org.apache.hadoop.hive.conf.LoopingByteArrayInputStream@2158ce23, org.apache.hadoop.hive.ql.session.SessionState@49b6eef9))
- field (class "org.apache.spark.sql.hive.HiveContext", name: "x$3", type: "class scala.Tuple2")
- object (class "org.apache.spark.sql.hive.HiveContext", org.apache.spark.sql.hive.HiveContext@4e6e66a4)
- field (class "example.BaseQueryableDStream$$anonfun$registerTempTable$2", name: "sqlContext$1", type: "class org.apache.spark.sql.SQLContext")
- object (class "example.BaseQueryableDStream$$anonfun$registerTempTable$2", )
- field (class "org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1", name: "foreachFunc$1", type: "interface scala.Function1")
- object (class "org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1", )
- field (class "org.apache.spark.streaming.dstream.ForEachDStream", name: "org$apache$spark$streaming$dstream$ForEachDStream$$foreachFunc", type: "interface scala.Function2")
- object (class "org.apache.spark.streaming.dstream.ForEachDStream", org.apache.spark.streaming.dstream.ForEachDStream@5ccbdc20)
- element of array (index: 0)
- array (class "[Ljava.lang.Object;", size: 16)
- field (class "scala.collection.mutable.ArrayBuffer", name: "array", type: "class [Ljava.lang.Object;")
- object (class "scala.collection.mutable.ArrayBuffer", ArrayBuffer(org.apache.spark.streaming.dstream.ForEachDStream@5ccbdc20))
- field (class "org.apache.spark.streaming.DStreamGraph", name: "outputStreams", type: "class scala.collection.mutable.ArrayBuffer")
- custom writeObject data (class "org.apache.spark.streaming.DStreamGraph")
- object (class "org.apache.spark.streaming.DStreamGraph", org.apache.spark.streaming.DStreamGraph@776ae7da)
- field (class "org.apache.spark.streaming.Checkpoint", name: "graph", type: "class org.apache.spark.streaming.DStreamGraph")
- root object (class "org.apache.spark.streaming.Checkpoint", org.apache.spark.streaming.Checkpoint@5eade065)
    at java.io.ObjectOutputStream.writeObject0(Unknown Source)
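A sketch of the capture-avoidance pattern that sidesteps this class of NotSerializableException (the holder object name is hypothetical): look the context up at runtime inside foreachRDD instead of capturing an outer HiveContext in the closure, so neither the context nor its HiveConf tuple field ever reaches the serializer:

    import org.apache.spark.SparkContext
    import org.apache.spark.sql.hive.HiveContext

    // Lazily-created singleton. Scala objects are not serialized into
    // closures, so fetching the context here keeps it out of the
    // streaming checkpoint entirely.
    object HiveContextHolder {
      @transient private var instance: HiveContext = _
      def get(sc: SparkContext): HiveContext = synchronized {
        if (instance == null) instance = new HiveContext(sc)
        instance
      }
    }

    // Usage inside an output operation:
    // dstream.foreachRDD { rdd =>
    //   val hc = HiveContextHolder.get(rdd.sparkContext)
    //   ... registerTempTable / sql ...
    // }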
Do you know any Spark modeling tool?
Hi, I think a modeling tool may be helpful because sometimes it's hard or tricky to program Spark directly. I don't know if such a tool already exists. Thanks!
RE: Spark SQL question: why build hashtable for both sides in HashOuterJoin?
Liquan, yes, for a full outer join, a hash table on each side is more efficient. For a left/right outer join, it looks like one hash table should be enough.

From: Liquan Pei [mailto:liquan...@gmail.com]
Sent: September 30, 2014 18:34
To: Haopu Wang
Cc: dev@spark.apache.org; user
Subject: Re: Spark SQL question: why build hashtable for both sides in HashOuterJoin?

Hi Haopu,

How about full outer join? One hash table may not be efficient for this case.

Liquan

On Mon, Sep 29, 2014 at 11:47 PM, Haopu Wang wrote:
Hi Liquan, thanks for the response. In your example, I think the hash table should be built on the "right" side, so Spark can iterate through the left side and find matches in the right side from the hash table efficiently. Please comment and suggest, thanks again!

From: Liquan Pei [mailto:liquan...@gmail.com]
Sent: September 30, 2014 12:31
To: Haopu Wang
Cc: dev@spark.apache.org; user
Subject: Re: Spark SQL question: why build hashtable for both sides in HashOuterJoin?

Hi Haopu,

My understanding is that the hash tables on both the left and right side are used to include null values in the result efficiently. If a hash table is built on only one side, say the left side, and we perform a left outer join, then for each row on the left side a scan over the right side is needed to make sure there are no matching tuples for that row.

Hope this helps!
Liquan

On Mon, Sep 29, 2014 at 8:36 PM, Haopu Wang wrote:
I took a look at HashOuterJoin: it builds a hash table for both sides. This consumes quite a lot of memory when the partition is big, and it doesn't reduce the iteration over the streamed relation, right? Thanks!

--
Liquan Pei
Department of Physics
University of Massachusetts Amherst
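Haopu's left/right outer join point, sketched on plain Scala collections rather than Spark's actual HashOuterJoin operator: hashing only the build (right) side is enough for a left outer join, because an unmatched streamed row can be null-padded the moment it streams past. A full outer join is different: it must also emit build-side rows that never matched any streamed row, which requires tracking match state on the build side too.

    // Left outer join with a hash table on the build (right) side only.
    // K/L/R are simplified stand-ins for the join key and row types.
    def leftOuterJoin[K, L, R](
        streamed: Iterator[(K, L)],
        build: Seq[(K, R)]): Iterator[(L, Option[R])] = {
      // Hash the build side once.
      val hashed: Map[K, Seq[R]] =
        build.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
      streamed.flatMap { case (k, l) =>
        hashed.get(k) match {
          case Some(rs) => rs.iterator.map(r => (l, Some(r))) // matched rows
          case None     => Iterator((l, None))                // null-pad, no rescan
        }
      }
    }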
FW: Spark SQL 1.1.0: NPE when joining two cached tables
Forwarding to the dev mailing list for help.

From: Haopu Wang
Sent: September 22, 2014 16:35
To: u...@spark.apache.org
Subject: Spark SQL 1.1.0: NPE when joining two cached tables

I have two data sets and want to join them on their first fields. Sample data are below:

data set 1: id2,name1,2,300.0
data set 2: id1,

The code is something like below:

    val sparkConf = new SparkConf().setAppName("JoinInScala")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)
    sqlContext.setConf("spark.sql.inMemoryColumnarStorage.compressed", "true")
    import org.apache.spark.sql._

    val testdata = sc.textFile(args(0) + "data.txt").map(_.split(","))
      .map(p => Row(p(0), p(1).trim, p(2).trim.toLong, p(3).trim.toDouble))
    val fields = new Array[StructField](4)
    fields(0) = StructField("id", StringType, false)
    fields(1) = StructField("name", StringType, false)
    fields(2) = StructField("agg1", LongType, false)
    fields(3) = StructField("agg2", DoubleType, false)
    val schema = StructType(fields)
    val data = sqlContext.applySchema(testdata, schema)
    data.registerTempTable("datatable")
    sqlContext.cacheTable("datatable")

    val refdata = sc.textFile(args(0) + "ref.txt").map(_.split(","))
      .map(p => Row(p(0), p(1).trim))
    val reffields = new Array[StructField](2)
    reffields(0) = StructField("id", StringType, false)
    reffields(1) = StructField("data", StringType, true)
    val refschema = StructType(reffields)
    val refschemardd = sqlContext.applySchema(refdata, refschema)
    refschemardd.registerTempTable("ref")
    sqlContext.cacheTable("ref")

    val results = sqlContext.sql("SELECT d.id, d.name, d.agg1, d.agg2, ref.data FROM datatable as d JOIN ref ON d.id = ref.id")
    results.foreach(T => Unit)

But I got the NullPointerException below. If I comment out the two cacheTable() calls, the program runs well. Please shed some light, thank you!
Exception in thread "main" java.lang.NullPointerException
    at org.apache.spark.sql.columnar.InMemoryRelation.statistics$lzycompute(InMemoryColumnarTableScan.scala:43)
    at org.apache.spark.sql.columnar.InMemoryRelation.statistics(InMemoryColumnarTableScan.scala:42)
    at org.apache.spark.sql.execution.SparkStrategies$HashJoin$.apply(SparkStrategies.scala:83)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
    at org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:268)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59)
    at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:402)
    at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:400)
    at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:406)
    at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:406)
    at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:409)
    at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:409)
    at org.apache.spark.sql.SchemaRDD.getDependencies(SchemaRDD.scala:120)
    at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:191)
    at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:189)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.dependencies(RDD.scala:189)
    at org.apache.spark.rdd.RDD.firstParent(RDD.scala:1233)
    at org.apache.spark.sql.SchemaRDD.getPartitions(SchemaRDD.scala:117)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:202
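The trace shows the NPE surfacing while the planner reads InMemoryRelation.statistics inside the HashJoin strategy, i.e. while deciding whether to broadcast one side of the join. As an untested workaround sketch only (not a confirmed fix for this bug), disabling size-based broadcast-join selection would keep planning away from that statistics lookup:

    // Hypothetical workaround: -1 disables automatic broadcast joins, so the
    // planner no longer consults the cached relation's size statistics.
    sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "-1")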