Re: Spark SQL reads all leaf directories on a partitioned Hive table
Thank you, Subash. It works! On Tue, Aug 13, 2019 at 5:58 AM Subash Prabakar wrote: > I had the similar issue reading the external parquet table . In my case I > had permission issue in one partition so I added filter to exclude that > partition but still the spark didn’t prune it. Then I read that in order > for spark to be aware of all the partitions it first read the folders and > then updated its metastore . Then the sql is applied on TOP of it. Instead > of using the existing hive SerDe and this property is only for parquet > files. > > Hive metastore Parquet table conversion > <https://spark.apache.org/docs/2.3.0/sql-programming-guide.html#hive-metastore-parquet-table-conversion> > > When reading from and writing to Hive metastore Parquet tables, Spark SQL > will try to use its own Parquet support instead of Hive SerDe for better > performance. This behavior is controlled by the > spark.sql.hive.convertMetastoreParquetconfiguration, and is turned on by > default. > > Reference: > https://spark.apache.org/docs/2.3.0/sql-programming-guide.html > > Set the above property to false . It should work. > > If anyone have better explanation please let me know - I have same > question. Why only parquet has this problem ? > > Thanks > Subash > > On Fri, 9 Aug 2019 at 16:18, Hao Ren wrote: > >> Hi Mich, >> >> Thank you for your reply. >> I need to be more clear about the environment. I am using spark-shell to >> run the query. >> Actually, the query works even without core-site, hdfs-site being under >> $SPARK_HOME/conf. >> My problem is efficiency. Because all of the partitions was scanned >> instead of the one in question during the execution of the spark sql query. >> This is why this simple query takes too much time. >> I would like to know how to improve this by just reading the specific >> partition in question. >> >> Feel free to ask more questions if I am not clear. 
>> >> Best regards, >> Hao >> >> On Thu, Aug 8, 2019 at 9:05 PM Mich Talebzadeh >> wrote: >> >>> also need others as well using soft link ls -l >>> >>> cd $SPARK_HOME/conf >>> >>> hive-site.xml -> ${HIVE_HOME/conf/hive-site.xml >>> core-site.xml -> ${HADOOP_HOME}/etc/hadoop/core-site.xml >>> hdfs-site.xml -> ${HADOOP_HOME}/etc/hadoop/hdfs-site.xml >>> >>> Dr Mich Talebzadeh >>> >>> >>> >>> LinkedIn * >>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw >>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* >>> >>> >>> >>> http://talebzadehmich.wordpress.com >>> >>> >>> *Disclaimer:* Use it at your own risk. Any and all responsibility for >>> any loss, damage or destruction of data or any other property which may >>> arise from relying on this email's technical content is explicitly >>> disclaimed. The author will in no case be liable for any monetary damages >>> arising from such loss, damage or destruction. >>> >>> >>> >>> >>> On Thu, 8 Aug 2019 at 15:16, Hao Ren wrote: >>> >>>> >>>> >>>> -- Forwarded message - >>>> From: Hao Ren >>>> Date: Thu, Aug 8, 2019 at 4:15 PM >>>> Subject: Re: Spark SQL reads all leaf directories on a partitioned Hive >>>> table >>>> To: Gourav Sengupta >>>> >>>> >>>> Hi Gourva, >>>> >>>> I am using enableHiveSupport. >>>> The table was not created by Spark. The table already exists in Hive. >>>> All I did is just reading it by using SQL query in Spark. >>>> FYI, I put hive-site.xml in spark/conf/ directory to make sure that >>>> Spark can access to Hive. >>>> >>>> Hao >>>> >>>> On Thu, Aug 8, 2019 at 1:24 PM Gourav Sengupta < >>>> gourav.sengu...@gmail.com> wrote: >>>> >>>>> Hi, >>>>> >>>>> Just out of curiosity did you start the SPARK session using >>>>> enableHiveSupport() ? >>>>> >>>>> Or are you creating the table using SPARK? 
>>>>> >>>>> >>>>> Regards, >>>>> Gourav >>>>> >>>>> On Wed, Aug 7, 2019 at 3:28 PM Hao Ren wrote: >>>>> >>>>>> Hi, >>>>>> I am using Spark SQL 2.3.3 to read a hive table which is
Re: Spark SQL reads all leaf directories on a partitioned Hive table
Hi Mich, Thank you for your reply. I need to be more clear about the environment. I am using spark-shell to run the query. Actually, the query works even without core-site, hdfs-site being under $SPARK_HOME/conf. My problem is efficiency. Because all of the partitions was scanned instead of the one in question during the execution of the spark sql query. This is why this simple query takes too much time. I would like to know how to improve this by just reading the specific partition in question. Feel free to ask more questions if I am not clear. Best regards, Hao On Thu, Aug 8, 2019 at 9:05 PM Mich Talebzadeh wrote: > also need others as well using soft link ls -l > > cd $SPARK_HOME/conf > > hive-site.xml -> ${HIVE_HOME/conf/hive-site.xml > core-site.xml -> ${HADOOP_HOME}/etc/hadoop/core-site.xml > hdfs-site.xml -> ${HADOOP_HOME}/etc/hadoop/hdfs-site.xml > > Dr Mich Talebzadeh > > > > LinkedIn * > https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw > <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* > > > > http://talebzadehmich.wordpress.com > > > *Disclaimer:* Use it at your own risk. Any and all responsibility for any > loss, damage or destruction of data or any other property which may arise > from relying on this email's technical content is explicitly disclaimed. > The author will in no case be liable for any monetary damages arising from > such loss, damage or destruction. > > > > > On Thu, 8 Aug 2019 at 15:16, Hao Ren wrote: > >> >> >> -- Forwarded message - >> From: Hao Ren >> Date: Thu, Aug 8, 2019 at 4:15 PM >> Subject: Re: Spark SQL reads all leaf directories on a partitioned Hive >> table >> To: Gourav Sengupta >> >> >> Hi Gourva, >> >> I am using enableHiveSupport. >> The table was not created by Spark. The table already exists in Hive. All >> I did is just reading it by using SQL query in Spark. >> FYI, I put hive-site.xml in spark/conf/ directory to make sure that Spark >> can access to Hive. 
>> >> Hao >> >> On Thu, Aug 8, 2019 at 1:24 PM Gourav Sengupta >> wrote: >> >>> Hi, >>> >>> Just out of curiosity did you start the SPARK session using >>> enableHiveSupport() ? >>> >>> Or are you creating the table using SPARK? >>> >>> >>> Regards, >>> Gourav >>> >>> On Wed, Aug 7, 2019 at 3:28 PM Hao Ren wrote: >>> >>>> Hi, >>>> I am using Spark SQL 2.3.3 to read a hive table which is partitioned by >>>> day, hour, platform, request_status and is_sampled. The underlying data is >>>> in parquet format on HDFS. >>>> Here is the SQL query to read just *one partition*. >>>> >>>> ``` >>>> spark.sql(""" >>>> SELECT rtb_platform_id, SUM(e_cpm) >>>> FROM raw_logs.fact_request >>>> WHERE day = '2019-08-01' >>>> AND hour = '00' >>>> AND platform = 'US' >>>> AND request_status = '3' >>>> AND is_sampled = 1 >>>> GROUP BY rtb_platform_id >>>> """).show >>>> ``` >>>> >>>> However, from the Spark web UI, the stage description shows: >>>> >>>> ``` >>>> Listing leaf files and directories for 201616 paths: >>>> viewfs://root/user/bilogs/logs/fact_request/day=2018-08-01/hour=11/platform=AS/request_status=0/is_sampled=0, >>>> ... >>>> ``` >>>> >>>> It seems the job is reading all of the partitions of the table and the >>>> job takes too long for just one partition. One workaround is using >>>> `spark.read.parquet` API to read parquet files directly. Spark has >>>> partition-awareness for partitioned directories. >>>> >>>> But still, I would like to know if there is a way to leverage >>>> partition-awareness via Hive by using `spark.sql` API? >>>> >>>> Any help is highly appreciated! >>>> >>>> Thank you. >>>> >>>> -- >>>> Hao Ren >>>> >>> >> >> -- >> Hao Ren >> >> Software Engineer in Machine Learning @ Criteo >> >> Paris, France >> >> >> -- >> Hao Ren >> >> Software Engineer in Machine Learning @ Criteo >> >> Paris, France >> > -- Hao Ren Software Engineer in Machine Learning @ Criteo Paris, France
Fwd: Spark SQL reads all leaf directories on a partitioned Hive table
-- Forwarded message - From: Hao Ren Date: Thu, Aug 8, 2019 at 4:15 PM Subject: Re: Spark SQL reads all leaf directories on a partitioned Hive table To: Gourav Sengupta Hi Gourva, I am using enableHiveSupport. The table was not created by Spark. The table already exists in Hive. All I did is just reading it by using SQL query in Spark. FYI, I put hive-site.xml in spark/conf/ directory to make sure that Spark can access to Hive. Hao On Thu, Aug 8, 2019 at 1:24 PM Gourav Sengupta wrote: > Hi, > > Just out of curiosity did you start the SPARK session using > enableHiveSupport() ? > > Or are you creating the table using SPARK? > > > Regards, > Gourav > > On Wed, Aug 7, 2019 at 3:28 PM Hao Ren wrote: > >> Hi, >> I am using Spark SQL 2.3.3 to read a hive table which is partitioned by >> day, hour, platform, request_status and is_sampled. The underlying data is >> in parquet format on HDFS. >> Here is the SQL query to read just *one partition*. >> >> ``` >> spark.sql(""" >> SELECT rtb_platform_id, SUM(e_cpm) >> FROM raw_logs.fact_request >> WHERE day = '2019-08-01' >> AND hour = '00' >> AND platform = 'US' >> AND request_status = '3' >> AND is_sampled = 1 >> GROUP BY rtb_platform_id >> """).show >> ``` >> >> However, from the Spark web UI, the stage description shows: >> >> ``` >> Listing leaf files and directories for 201616 paths: >> viewfs://root/user/bilogs/logs/fact_request/day=2018-08-01/hour=11/platform=AS/request_status=0/is_sampled=0, >> ... >> ``` >> >> It seems the job is reading all of the partitions of the table and the >> job takes too long for just one partition. One workaround is using >> `spark.read.parquet` API to read parquet files directly. Spark has >> partition-awareness for partitioned directories. >> >> But still, I would like to know if there is a way to leverage >> partition-awareness via Hive by using `spark.sql` API? >> >> Any help is highly appreciated! >> >> Thank you. 
>> >> -- >> Hao Ren >> > -- Hao Ren Software Engineer in Machine Learning @ Criteo Paris, France -- Hao Ren Software Engineer in Machine Learning @ Criteo Paris, France
Spark SQL reads all leaf directories on a partitioned Hive table
Hi,

I am using Spark SQL 2.3.3 to read a Hive table which is partitioned by day, hour, platform, request_status and is_sampled. The underlying data is in Parquet format on HDFS. Here is the SQL query to read just *one partition*:

```
spark.sql("""
  SELECT rtb_platform_id, SUM(e_cpm)
  FROM raw_logs.fact_request
  WHERE day = '2019-08-01'
    AND hour = '00'
    AND platform = 'US'
    AND request_status = '3'
    AND is_sampled = 1
  GROUP BY rtb_platform_id
""").show
```

However, from the Spark web UI, the stage description shows:

```
Listing leaf files and directories for 201616 paths:
viewfs://root/user/bilogs/logs/fact_request/day=2018-08-01/hour=11/platform=AS/request_status=0/is_sampled=0,
...
```

It seems the job is listing all of the partitions of the table, so it takes far too long for just one partition. One workaround is to use the `spark.read.parquet` API to read the Parquet files directly, since Spark has partition awareness for partitioned directories.

Still, I would like to know: is there a way to leverage partition awareness via Hive when using the `spark.sql` API?

Any help is highly appreciated!

Thank you.

--
Hao Ren
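For the record, the resolution that emerged earlier in this thread: Parquet tables read through `spark.sql` are handled by Spark's own Parquet support rather than the Hive SerDe, and disabling that conversion made partition pruning work. A minimal sketch of how one might set the property (name taken from the Spark 2.3 SQL programming guide linked in the thread):

```
# spark-defaults.conf, or pass with --conf on spark-shell / spark-submit
spark.sql.hive.convertMetastoreParquet=false
```

Note this trades Spark's built-in Parquet reader (and its performance benefit) for the Hive SerDe path, so it is a workaround rather than a general recommendation.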
[Spark Streaming] map and window operation on DStream only process one batch
Spark Streaming v1.6.2, Kafka v0.10.1

I am reading msgs from Kafka. What surprised me is that the following DStream only processes the first batch:

KafkaUtils.createDirectStream[
  String, String, StringDecoder, StringDecoder](streamingContext, kafkaParams, Set(topic))
  .map(_._2)
  .window(Seconds(windowLengthInSec))

Some logs as below are endlessly repeated:

16/11/22 14:20:40 INFO MappedDStream: Slicing from 1479820835000 ms to 147982084 ms (aligned to 1479820835000 ms and 147982084 ms)
16/11/22 14:20:40 INFO JobScheduler: Added jobs for time 147982084 ms

And the action on the DStream is just an RDD count:

windowedStream foreachRDD { rdd => rdd.count }

From the web UI, only the first batch is in status Processing; the others are all Queued. However, if I permute the map and window operations, everything is OK:

KafkaUtils.createDirectStream[
  String, String, StringDecoder, StringDecoder](streamingContext, kafkaParams, Set(topic))
  .window(Seconds(windowLengthInSec))
  .map(_._2)

I think the two are equivalent, but they are not. Furthermore, if I replace my KafkaDStream with a QueueStream, it works regardless of the order of the map and window operations. I am not sure whether this is related to KafkaDStream specifically or to DStream in general.

Any help is appreciated.

--
Hao Ren
Data Engineer @ leboncoin
Paris, France
Re: [SPARK-2.0][SQL] UDF containing non-serializable object does not work as expected
Yes, it is. You can define a udf like that. Basically, it's a udf Int => Int which is a closure contains a non serializable object. The latter should cause Task not serializable exception. Hao On Mon, Aug 8, 2016 at 5:08 AM, Muthu Jayakumar <bablo...@gmail.com> wrote: > Hello Hao Ren, > > Doesn't the code... > > val add = udf { > (a: Int) => a + notSer.value > } > Mean UDF function that Int => Int ? > > Thanks, > Muthu > > On Sun, Aug 7, 2016 at 2:31 PM, Hao Ren <inv...@gmail.com> wrote: > >> I am playing with spark 2.0 >> What I tried to test is: >> >> Create a UDF in which there is a non serializable object. >> What I expected is when this UDF is called during materializing the >> dataFrame where the UDF is used in "select", an task non serializable >> exception should be thrown. >> It depends also which "action" is called on that dataframe. >> >> Here is the code for reproducing the pb: >> >> >> object DataFrameSerDeTest extends App { >> >> class A(val value: Int) // It is not serializable >> >> def run() = { >> val spark = SparkSession >> .builder() >> .appName("DataFrameSerDeTest") >> .master("local[*]") >> .getOrCreate() >> >> import org.apache.spark.sql.functions.udf >> import spark.sqlContext.implicits._ >> >> val notSer = new A(2) >> val add = udf { >> (a: Int) => a + notSer.value >> } >> val df = spark.createDataFrame(Seq( >> (1, 2), >> (2, 2), >> (3, 2), >> (4, 2) >> )).toDF("key", "value") >> .select($"key", add($"value").as("added")) >> >> df.show() // *It should not work because the udf contains a >> non-serializable object, but it works* >> >> df.filter($"key" === 2).show() // *It does not work as expected >> (org.apache.spark.SparkException: Task not serializable)* >> } >> >> run() >> } >> >> >> Also, I tried collect(), count(), first(), limit(). All of them worked >> without non-serializable exceptions. >> It seems only filter() throws the exception. (feature or bug ?) >> >> Any ideas ? Or I just messed things up ? 
>> Any help is highly appreciated. >> >> -- >> Hao Ren >> >> Data Engineer @ leboncoin >> >> Paris, France >> > > -- Hao Ren Data Engineer @ leboncoin Paris, France
[SPARK-2.0][SQL] UDF containing non-serializable object does not work as expected
I am playing with Spark 2.0. What I tried to test is:

Create a UDF in which there is a non-serializable object.
What I expected is that when this UDF is called while materializing the dataFrame in which the UDF is used in a select, a task-not-serializable exception should be thrown. It also depends on which action is called on that dataframe.

Here is the code for reproducing the pb:

object DataFrameSerDeTest extends App {

  class A(val value: Int) // It is not serializable

  def run() = {
    val spark = SparkSession
      .builder()
      .appName("DataFrameSerDeTest")
      .master("local[*]")
      .getOrCreate()

    import org.apache.spark.sql.functions.udf
    import spark.sqlContext.implicits._

    val notSer = new A(2)
    val add = udf {
      (a: Int) => a + notSer.value
    }
    val df = spark.createDataFrame(Seq(
      (1, 2),
      (2, 2),
      (3, 2),
      (4, 2)
    )).toDF("key", "value")
      .select($"key", add($"value").as("added"))

    df.show() // It should not work because the udf contains a non-serializable object, but it works

    df.filter($"key" === 2).show() // It does not work as expected (org.apache.spark.SparkException: Task not serializable)
  }

  run()
}

Also, I tried collect(), count(), first(), limit(). All of them worked without non-serializable exceptions. It seems only filter() throws the exception. (feature or bug?)

Any ideas? Or did I just mess things up?
Any help is highly appreciated.

--
Hao Ren
Data Engineer @ leboncoin
Paris, France
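For reference, the closure in question genuinely is not Java-serializable, which is consistent with filter() failing once Spark tries to ship it; actions that manage to run without shipping the closure would not trip the check. This can be verified without Spark at all. A standalone sketch (the helper isJavaSerializable is invented here for illustration):

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

class A(val value: Int) // not serializable, as in the mail above

// Helper (invented for this sketch): can obj survive Java serialization?
def isJavaSerializable(obj: AnyRef): Boolean =
  try {
    new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
    true
  } catch {
    case _: NotSerializableException => false
  }

// Build the closure inside a method so it captures a local A instance,
// just like the udf body captures notSer.
def buildClosure(): Int => Int = {
  val notSer = new A(2)
  (a: Int) => a + notSer.value
}

val add = buildClosure()
println(add(1))                  // the closure itself computes fine locally
println(isJavaSerializable(add)) // but it drags the non-serializable A along
```

The same check is essentially what Spark performs (via the JavaSerializer) before shipping a task, hence the "Task not serializable" message.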
[MLlib] Term Frequency in TF-IDF seems incorrect
When computing term frequency, we can use either the HashingTF or the CountVectorizer feature extractor. However, both of them just use the raw number of times a term appears in a document, which is not a true frequency. Actually, it should be divided by the length of the document. Is this intended behavior?

--
Hao Ren
Data Engineer @ leboncoin
Paris, France
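To illustrate the distinction being raised: the extractors return the raw-count quantity below, while a normalized term frequency would divide by the document length. A minimal plain-Scala sketch (function names invented here; keyed by the term itself rather than a hashed index):

```scala
// Raw counts: what HashingTF / CountVectorizer effectively compute.
def rawCounts(doc: Seq[String]): Map[String, Int] =
  doc.groupBy(identity).map { case (term, occ) => term -> occ.size }

// "True" term frequency: raw count divided by document length.
def termFrequency(doc: Seq[String]): Map[String, Double] = {
  val n = doc.size.toDouble
  rawCounts(doc).map { case (term, c) => term -> c / n }
}

val doc = Seq("spark", "sql", "spark", "hive")
println(rawCounts(doc))     // spark -> 2, sql -> 1, hive -> 1
println(termFrequency(doc)) // spark -> 0.5, sql -> 0.25, hive -> 0.25
```

Note that for the downstream IDF multiplication the constant 1/n factor per document often does not change relative rankings within a document, which may be why the raw count is used; that rationale is a conjecture, not something stated in the MLlib docs.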
Re: SQLContext and HiveContext parse a query string differently ?
Basically, I want to run the following query: select 'a\'b', case(null as Array) However, neither HiveContext and SQLContext can execute it without exception. I have tried sql(select 'a\'b', case(null as Array)) and df.selectExpr("'a\'b'", "case(null as Array)") Neither of them works. >From the exceptions, I find the query is parsed differently. On Fri, May 13, 2016 at 8:00 AM, Yong Zhang <java8...@hotmail.com> wrote: > Not sure what do you mean? You want to have one exactly query running fine > in both sqlContext and HiveContext? The query parser are different, why do > you want to have this feature? Do I understand your question correctly? > > Yong > > -- > Date: Thu, 12 May 2016 13:09:34 +0200 > Subject: SQLContext and HiveContext parse a query string differently ? > From: inv...@gmail.com > To: user@spark.apache.org > > > HI, > > I just want to figure out why the two contexts behavior differently even > on a simple query. > In a netshell, I have a query in which there is a String containing single > quote and casting to Array/Map. > I have tried all the combination of diff type of sql context and query > call api (sql, df.select, df.selectExpr). > I can't find one rules all. > > Here is the code for reproducing the problem. 
> > - > > import org.apache.spark.sql.SQLContext > import org.apache.spark.sql.hive.HiveContext > import org.apache.spark.{SparkConf, SparkContext} > > object Test extends App { > > val sc = new SparkContext("local[2]", "test", new SparkConf) > val hiveContext = new HiveContext(sc) > val sqlContext = new SQLContext(sc) > > val context = hiveContext > // val context = sqlContext > > import context.implicits._ > > val df = Seq((Seq(1, 2), 2)).toDF("a", "b") > df.registerTempTable("tbl") > df.printSchema() > > // case 1 > context.sql("select cast(a as array) from tbl").show() > // HiveContext => org.apache.spark.sql.AnalysisException: cannot recognize > input near 'array' '<' 'string' in primitive type specification; line 1 pos 17 > // SQLContext => OK > > // case 2 > context.sql("select 'a\\'b'").show() > // HiveContext => OK > // SQLContext => failure: ``union'' expected but ErrorToken(unclosed string > literal) found > > // case 3 > df.selectExpr("cast(a as array)").show() // OK with HiveContext and > SQLContext > > // case 4 > df.selectExpr("'a\\'b'").show() // HiveContext, SQLContext => failure: end > of input expected > } > > - > > Any clarification / workaround is high appreciated. > > -- > Hao Ren > > Data Engineer @ leboncoin > > Paris, France > -- Hao Ren Data Engineer @ leboncoin Paris, France
SQLContext and HiveContext parse a query string differently ?
Hi,

I just want to figure out why the two contexts behave differently even on a simple query. In a nutshell, I have a query in which there is a string containing a single quote, and a cast to Array/Map. I have tried all the combinations of the different SQL contexts and query call APIs (sql, df.select, df.selectExpr). I can't find one that rules them all.

Here is the code for reproducing the problem.

---------

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}

object Test extends App {

  val sc = new SparkContext("local[2]", "test", new SparkConf)
  val hiveContext = new HiveContext(sc)
  val sqlContext = new SQLContext(sc)

  val context = hiveContext
  // val context = sqlContext

  import context.implicits._

  val df = Seq((Seq(1, 2), 2)).toDF("a", "b")
  df.registerTempTable("tbl")
  df.printSchema()

  // case 1
  context.sql("select cast(a as array<string>) from tbl").show()
  // HiveContext => org.apache.spark.sql.AnalysisException: cannot recognize input near 'array' '<' 'string' in primitive type specification; line 1 pos 17
  // SQLContext => OK

  // case 2
  context.sql("select 'a\\'b'").show()
  // HiveContext => OK
  // SQLContext => failure: ``union'' expected but ErrorToken(unclosed string literal) found

  // case 3
  df.selectExpr("cast(a as array<string>)").show() // OK with HiveContext and SQLContext

  // case 4
  df.selectExpr("'a\\'b'").show() // HiveContext, SQLContext => failure: end of input expected
}

---------

Any clarification / workaround is highly appreciated.

--
Hao Ren
Data Engineer @ leboncoin
Paris, France
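A side note that may help in reading the cases above: there are two escaping layers at play, Scala's and SQL's, so what the SQL parser actually receives is not what appears in the Scala source. A plain-Scala check (no Spark needed), just to make the layers explicit:

```scala
// In Scala source, "\\" denotes a single backslash character.
// So this Scala string holds the 6 characters  ' a \ ' b '  and it is
// the SQL-level parser that must then interpret \' as an escaped quote.
val sqlLiteral = "'a\\'b'"
println(sqlLiteral)
println(sqlLiteral.length)
```

The differing results between HiveContext and SQLContext then come down to whether each SQL-level parser accepts `\'` inside a single-quoted literal, which is the point of cases 2 and 4.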
Re: Can not kill driver properly
Update: I am using --supervise flag for fault tolerance. On Mon, Mar 21, 2016 at 4:16 PM, Hao Ren <inv...@gmail.com> wrote: > Using spark 1.6.1 > Spark Streaming Jobs are submitted via spark-submit (cluster mode) > > I tried to kill drivers via webUI, it does not work. These drivers are > still running. > I also tried: > 1. spark-submit --master --kill > 2. ./bin/spark-class org.apache.spark.deploy.Client kill > > > Neither works. The workaround is to ssh to the driver node, then kill -9 > ... > jsp shows the same classname DriverWrapper, so need to pick carefully... > > Any idea why this happens ? > BTW, my streaming job's batch duration is one hour. So do we need to wait > for job processing to kill kill driver ? > > -- > Hao Ren > > Data Engineer @ leboncoin > > Paris, France > -- Hao Ren Data Engineer @ leboncoin Paris, France
Can not kill driver properly
Using Spark 1.6.1. Spark Streaming jobs are submitted via spark-submit (cluster mode).

I tried to kill drivers via the web UI; it does not work, these drivers are still running. I also tried:

1. spark-submit --master <master-url> --kill <driver-id>
2. ./bin/spark-class org.apache.spark.deploy.Client kill <master-url> <driver-id>

Neither works. The workaround is to ssh to the driver node, then kill -9 ...
jps shows the same class name, DriverWrapper, for every driver, so one needs to pick the process carefully...

Any idea why this happens?
BTW, my streaming job's batch duration is one hour. So do we need to wait for the current batch to finish processing before killing the driver?

--
Hao Ren
Data Engineer @ leboncoin
Paris, France
[Streaming] textFileStream has no events shown in web UI
Just a quick question: when using textFileStream, I did not see any events via the web UI.

Actually, I am uploading files to S3 every 5 seconds, and the mini-batch duration is 30 seconds. On the web UI:

Input Rate
Avg: 0.00 events/sec

But the scheduling delay and processing time are correct, and the output of the stream is also correct. Not sure why the web UI has not detected any events.

Thank you.

--
Hao Ren
Data Engineer @ leboncoin
Paris, France
Re: [Streaming] Difference between windowed stream and stream with large batch size?
Any ideas ? Feel free to ask me more details, if my questions are not clear. Thank you. On Mon, Mar 7, 2016 at 3:38 PM, Hao Ren <inv...@gmail.com> wrote: > I want to understand the advantage of using windowed stream. > > For example, > > Stream 1: > initial duration = 5 s, > and then transformed into a stream windowed by (*windowLength = *30s, > *slideInterval > = *30s) > > Stream 2: > Duration = 30 s > > Questions: > > 1. Is Stream 1 equivalent to Stream 2 on behavior ? Do users observe the > same result ? > 2. If yes, what is the advantage of one vs. another ? Performance or > something else ? > 3. Is a stream with large batch reasonable , say 30 mins or even an hour ? > > Thank you. > > -- > Hao Ren > > Data Engineer @ leboncoin > > Paris, France > -- Hao Ren Data Engineer @ leboncoin Paris, France
[Streaming] Difference between windowed stream and stream with large batch size?
I want to understand the advantage of using a windowed stream.

For example:

Stream 1: initial batch duration = 5 s, then transformed into a stream windowed by (windowLength = 30 s, slideInterval = 30 s)

Stream 2: batch duration = 30 s

Questions:

1. Is Stream 1 equivalent to Stream 2 in behavior? Do users observe the same result?
2. If yes, what is the advantage of one over the other? Performance or something else?
3. Is a stream with a large batch reasonable, say 30 mins or even an hour?

Thank you.

--
Hao Ren
Data Engineer @ leboncoin
Paris, France
Unresolved dep when building project with spark 1.6
Hi,

I am upgrading my project to Spark 1.6. It seems that the deps are broken.

Deps used in sbt:

val scalaVersion = "2.10"
val sparkVersion = "1.6.0"
val hadoopVersion = "2.7.1"

// Libraries
val scalaTest = "org.scalatest" %% "scalatest" % "2.2.4" % "test"
val sparkSql = "org.apache.spark" %% "spark-sql" % sparkVersion
val sparkML = "org.apache.spark" %% "spark-mllib" % sparkVersion
val hadoopAWS = "org.apache.hadoop" % "hadoop-aws" % hadoopVersion
val scopt = "com.github.scopt" %% "scopt" % "3.3.0"
val jodacvt = "org.joda" % "joda-convert" % "1.8.1"

sbt exception:

[warn] ::::::::::::::::::::::::::::::::::::::::::::::
[warn] ::          UNRESOLVED DEPENDENCIES         ::
[warn] ::::::::::::::::::::::::::::::::::::::::::::::
[warn] :: org.fusesource.leveldbjni#leveldbjni-all;1.8:
[warn]    org.fusesource.leveldbjni#leveldbjni-all;1.8!leveldbjni-all.pom(pom.original)
[warn]    origin location must be absolute:
[warn]    file:/home/invkrh/.m2/repository/org/fusesource/leveldbjni/leveldbjni-all/1.8/leveldbjni-all-1.8.pom
[warn] ::::::::::::::::::::::::::::::::::::::::::::::
[warn]
[warn] Note: Unresolved dependencies path:
[warn]   org.fusesource.leveldbjni:leveldbjni-all:1.8
[warn]     +- org.apache.spark:spark-network-shuffle_2.10:1.6.0
[warn]     +- org.apache.spark:spark-core_2.10:1.6.0
[warn]     +- org.apache.spark:spark-catalyst_2.10:1.6.0
[warn]     +- org.apache.spark:spark-sql_2.10:1.6.0 (/home/invkrh/workspace/scala/confucius/botdet/build.sbt#L14)
[warn]     +- org.apache.spark:spark-mllib_2.10:1.6.0 (/home/invkrh/workspace/scala/confucius/botdet/build.sbt#L14)
[warn]     +- fr.leboncoin:botdet_2.10:0.1
sbt.ResolveException: unresolved dependency: org.fusesource.leveldbjni#leveldbjni-all;1.8: org.fusesource.leveldbjni#leveldbjni-all;1.8!leveldbjni-all.pom(pom.original) origin location must be absolute: file:/home/invkrh/.m2/repository/org/fusesource/leveldbjni/leveldbjni-all/1.8/leveldbjni-all-1.8.pom

Thank you.

--
Hao Ren
Data Engineer @ leboncoin
Paris, France
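The `file:/home/invkrh/.m2/...` path in the error suggests Ivy picked up a cached or corrupted descriptor from the local Maven repository rather than resolving from Maven Central. A commonly reported workaround for this exact "origin location must be absolute" message (an assumption, not verified against this particular setup) is to delete the locally cached artifact so the next `sbt update` re-downloads it:

```shell
# Remove the possibly corrupted leveldbjni descriptor from the local
# Maven repo; sbt/Ivy will re-resolve it on the next update.
rm -rf ~/.m2/repository/org/fusesource/leveldbjni
```

If the problem persists, clearing the corresponding `~/.ivy2/cache/org.fusesource.leveldbjni` entry as well may be worth trying.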
ClosureCleaner does not work for java code
It seems that, in order to serialize the anonymous class `test.AbstractTest$1` (ops), Spark serializes `test.AbstractTest` first, which should not be serialized. The difference is in the type of RDD: in the Java code, JavaRDD is used. I am wondering whether the ClosureCleaner works well with JavaRDD. According to the Spark code, JavaRDD apparently delegates to the Scala API:

def map[R](f: JFunction[T, R]): JavaRDD[R] =
  new JavaRDD(rdd.map(f)(fakeClassTag))(fakeClassTag)

You can reproduce this issue easily; any help is appreciated.

--
Hao Ren
Data Engineer @ leboncoin
Paris, France
How to distribute non-serializable object in transform task or broadcast ?
Is there any workaround to distribute a non-serializable object for an RDD transformation or as a broadcast variable?

Say I have an object of class C which is not serializable. Class C is in a jar package; I have no control over it. Now I need to distribute it, either by an RDD transformation or by broadcast.

I tried to subclass the class C with the Serializable interface. It works for serialization, but deserialization does not work, since there is no parameter-less constructor for the class C, and deserialization is broken with an invalid-constructor exception.

I think it's a common use case. Any help is appreciated.

--
Hao Ren
Data Engineer @ leboncoin
Paris, France
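One common workaround (a sketch, not specific to any particular class C): instead of subclassing, wrap the object in a small serializable holder that carries only the constructor arguments and rebuilds the instance lazily on the receiving side via @transient lazy val. Broadcasting the holder then works, because the non-serializable instance itself never crosses the wire. All names below (Heavy, HeavyHolder, roundTrip) are invented for illustration:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Stand-in for the third-party class C: no no-arg constructor, not Serializable.
class Heavy(val value: Int)

// Serializable wrapper: only the Int constructor argument is serialized;
// the Heavy instance is rebuilt lazily on first access after deserialization.
class HeavyHolder(v: Int) extends Serializable {
  @transient lazy val heavy: Heavy = new Heavy(v)
}

// Simulate what Spark does when shipping a broadcast or a closure.
def roundTrip[T](obj: T): T = {
  val buf = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(buf)
  out.writeObject(obj)
  out.close()
  new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray))
    .readObject().asInstanceOf[T]
}

val shipped = roundTrip(new HeavyHolder(42))
println(shipped.heavy.value) // 42
```

An alternative with the same idea: build the instance inside mapPartitions (one per partition) so it is constructed on the executor and never serialized at all.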
DataFrame writer removes fields which is null for all rows
Consider the following code:

val df = Seq((1, 3), (2, 3)).toDF("key", "value").registerTempTable("tbl")

sqlContext.sql("select key, null as value from tbl")
  .write.format("json").mode(SaveMode.Overwrite).save("test")

sqlContext.read.format("json").load("test").printSchema()

It shows:

root
 |-- key: long (nullable = true)

The field `value` is removed from the schema when saving the DF to a json file, since it is null for all rows. Saving to a parquet file is the same: null fields are missing!

It seems that this is the default behavior for DF, but I would like to keep the null fields for schema consistency. Are there some options/configs for this purpose?

Thx.

--
Hao Ren
Data Engineer @ leboncoin
Paris, France
S3 Read / Write makes executors deadlocked
Given the following code, which just reads from S3, then saves files to S3:

val inputFileName: String = "s3n://input/file/path"
val outputFileName: String = "s3n://output/file/path"

val conf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[4]")
val sparkContext = new SparkContext(conf)

// Problem here: executor threads locked
sparkContext.textFile(inputFileName).saveAsTextFile(outputFileName)

// But this one works
sparkContext.textFile(inputFileName).count()

It blocks without showing any exceptions or errors. jstack shows that all executors are locked. The thread dump is at the end of this post. I am using spark-1.4.0 on my PC, which has 4 CPU cores. There are 21 parquet files in the input directory, 500 KB / file. In addition, if we change the last action to a non-IO-bound one, for example count(), it works. It seems that S3 read and write in the same stage makes executors deadlocked. I encountered the same problem when using DataFrame load/save operations; jira created: https://issues.apache.org/jira/browse/SPARK-8869

"Executor task launch worker-3" #69 daemon prio=5 os_prio=0 tid=0x7f7bd4036800 nid=0x1296 in Object.wait() [0x7f7c1099a000]
   java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    at org.apache.commons.httpclient.MultiThreadedHttpConnectionManager.doGetConnection(MultiThreadedHttpConnectionManager.java:518)
    - locked 0xe56745b8 (a org.apache.commons.httpclient.MultiThreadedHttpConnectionManager$ConnectionPool)
    at org.apache.commons.httpclient.MultiThreadedHttpConnectionManager.getConnectionWithTimeout(MultiThreadedHttpConnectionManager.java:416)
    at org.apache.commons.httpclient.HttpMethodDirector.executeMethod(HttpMethodDirector.java:153)
    at org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:397)
    at org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:323)
    at org.jets3t.service.impl.rest.httpclient.RestS3Service.performRequest(RestS3Service.java:342)
    at org.jets3t.service.impl.rest.httpclient.RestS3Service.performRestHead(RestS3Service.java:718)
    at org.jets3t.service.impl.rest.httpclient.RestS3Service.getObjectImpl(RestS3Service.java:1599)
    at org.jets3t.service.impl.rest.httpclient.RestS3Service.getObjectDetailsImpl(RestS3Service.java:1535)
    at org.jets3t.service.S3Service.getObjectDetails(S3Service.java:1987)
    at org.jets3t.service.S3Service.getObjectDetails(S3Service.java:1332)
    at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:111)
    at sun.reflect.GeneratedMethodAccessor1.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
    at org.apache.hadoop.fs.s3native.$Proxy15.retrieveMetadata(Unknown Source)
    at org.apache.hadoop.fs.s3native.NativeS3FileSystem.getFileStatus(NativeS3FileSystem.java:414)
    at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1397)
    at org.apache.hadoop.fs.s3native.NativeS3FileSystem.create(NativeS3FileSystem.java:341)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:905)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:798)
    at org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:123)
    at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:90)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1104)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1095)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
    at org.apache.spark.scheduler.Task.run(Task.scala:70)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

-- Hao Ren Data Engineer @ leboncoin Paris, France
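For anyone hitting this before a jets3t upgrade, here is an untested workaround sketch: force the read stage to finish (returning its HTTP connections to the pool) before the write stage starts, by caching and materializing the RDD first. The paths below are placeholders, not real buckets.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object S3CopyWorkaround {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("s3-copy").setMaster("local[4]")
    val sc = new SparkContext(conf)

    // Placeholder paths; replace with real buckets.
    val input = "s3n://input/file/path"
    val output = "s3n://output/file/path"

    // count() materializes every partition into the cache, so all S3
    // read connections are released before any writer asks for one.
    val data = sc.textFile(input).persist(StorageLevel.MEMORY_AND_DISK)
    data.count()

    // The save now reads from the cache, not from S3.
    data.saveAsTextFile(output)
    sc.stop()
  }
}
```

This only sidesteps the connection-pool contention; it does not fix the underlying jets3t issue.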
Re: S3 Read / Write makes executors deadlocked
I have tested on another PC which has 8 CPU cores. But it hangs when the default parallelism level >= 4, e.g. sparkConf.setMaster("local[*]"). local[1] ~ local[3] work well; 4 is the mysterious boundary.

It seems that I am not the only one who encountered this problem: https://issues.apache.org/jira/browse/SPARK-8898

Here is Sean's answer for the jira above: this is a jets3t problem. You will have to manage to update it in your build or get EC2 + Hadoop 2 to work, which I think can be done. At least, this is just a subset of "EC2 should support Hadoop 2" and/or that the EC2 support should move out of Spark anyway. I don't know that there's another action to take in Spark.

But I just use sbt to get the published Spark 1.4, and it does not work on my local PC, not on EC2. Seriously, I do think something should be done for Spark, because S3 read/write is quite a common use case. Any help on this issue is highly appreciated. If you need more info, check out the jira I created: https://issues.apache.org/jira/browse/SPARK-8869

On Thu, Jul 16, 2015 at 11:39 AM, Hao Ren inv...@gmail.com wrote:

Given the following code, which just reads from S3, then saves files to S3:

val inputFileName: String = "s3n://input/file/path"
val outputFileName: String = "s3n://output/file/path"

val conf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[4]")
val sparkContext = new SparkContext(conf)

// Problem here: executor threads locked
sparkContext.textFile(inputFileName).saveAsTextFile(outputFileName)

// But this one works
sparkContext.textFile(inputFileName).count()

It blocks without showing any exceptions or errors. jstack shows that all executors are locked. The thread dump is at the end of this post. I am using spark-1.4.0 on my PC, which has 4 CPU cores. There are 21 parquet files in the input directory, 500 KB / file. In addition, if we change the last action to a non-IO-bound one, for example count(), it works. It seems that S3 read and write in the same stage makes executors deadlocked.
Re: spark-submit can not resolve spark-hive_2.10
Thanks for the reply. Actually, I don't think excluding spark-hive from spark-submit --packages is a good idea. I don't want to recompile Spark by assembly for my cluster every time a new Spark release is out. I prefer using the binary version of Spark and then adding some jars for job execution, e.g. adding spark-hive for HiveContext usage. FYI, spark-hive is just 1.2 MB: http://mvnrepository.com/artifact/org.apache.spark/spark-hive_2.10/1.4.0

On Wed, Jul 8, 2015 at 2:03 AM, Burak Yavuz brk...@gmail.com wrote: spark-hive is excluded when using --packages, because it can be included in the spark-assembly by adding -Phive during mvn package or sbt assembly. Best, Burak

On Tue, Jul 7, 2015 at 8:06 AM, Hao Ren inv...@gmail.com wrote: I want to add spark-hive as a dependency to submit my job, but it seems that spark-submit can not resolve it.

$ ./bin/spark-submit \
    --packages org.apache.spark:spark-hive_2.10:1.4.0,org.postgresql:postgresql:9.3-1103-jdbc3,joda-time:joda-time:2.8.1 \
    --class fr.leboncoin.etl.jobs.dwh.AdStateTraceDWHTransform \
    --master spark://localhost:7077 \

Ivy Default Cache set to: /home/invkrh/.ivy2/cache
The jars for the packages stored in: /home/invkrh/.ivy2/jars
https://repository.jboss.org/nexus/content/repositories/releases/ added as a remote repository with the name: repo-1
:: loading settings :: url = jar:file:/home/invkrh/workspace/scala/spark/assembly/target/scala-2.10/spark-assembly-1.4.0-SNAPSHOT-hadoop2.2.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.spark#spark-hive_2.10 added as a dependency
org.postgresql#postgresql added as a dependency
joda-time#joda-time added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
    confs: [default]
    found org.postgresql#postgresql;9.3-1103-jdbc3 in local-m2-cache
    found joda-time#joda-time;2.8.1 in central
:: resolution report :: resolve 139ms :: artifacts dl 3ms
    :: modules in use:
    joda-time#joda-time;2.8.1 from central in [default]
    org.postgresql#postgresql;9.3-1103-jdbc3 from local-m2-cache in [default]
    ---------------------------------------------------------------------
    |                  |            modules            ||   artifacts   |
    |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
    ---------------------------------------------------------------------
    |      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
    ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent
    confs: [default]
    0 artifacts copied, 2 already retrieved (0kB/6ms)
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/sql/hive/HiveContext
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:633)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.hive.HiveContext
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 7 more
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/07/07 16:57:59 INFO Utils: Shutdown hook called

Any help is appreciated. Thank you. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-submit-can-not-resolve-spark-hive-2-10-tp23695.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Hao Ren Data Engineer @ leboncoin Paris, France
[SPARK-SQL] Window Functions optimization
Hi, I would like to know: has any optimization been done for window functions in Spark SQL? For example:

select key,
       max(value1) over (partition by key) as m1,
       max(value2) over (partition by key) as m2,
       max(value3) over (partition by key) as m3
from table

The query above creates 3 fields based on the same partition rule. The question is: will spark-sql partition the table 3 times in the same way to get the three max values, or just partition once if it finds that the partition rule is the same? It would be nice if someone could point out some lines of code on it. Thank you. Hao -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SPARK-SQL-Window-Functions-optimization-tp23796.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
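One way to answer this empirically (a sketch; it assumes a HiveContext named `hiveContext` and a registered table with these columns) is to look at the physical plan and count the Window and Exchange operators. If the optimizer merges the three identical window specs, the plan should contain a single shuffle rather than three:

```scala
val df = hiveContext.sql("""
  select key,
         max(value1) over (partition by key) as m1,
         max(value2) over (partition by key) as m2,
         max(value3) over (partition by key) as m3
  from table
""")

// Print logical and physical plans: one Exchange for all three
// aggregates means the partitioning is shared; three means it is not.
df.explain(true)
```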
spark-submit can not resolve spark-hive_2.10
I want to add spark-hive as a dependency to submit my job, but it seems that spark-submit can not resolve it.

$ ./bin/spark-submit \
    --packages org.apache.spark:spark-hive_2.10:1.4.0,org.postgresql:postgresql:9.3-1103-jdbc3,joda-time:joda-time:2.8.1 \
    --class fr.leboncoin.etl.jobs.dwh.AdStateTraceDWHTransform \
    --master spark://localhost:7077 \

Ivy Default Cache set to: /home/invkrh/.ivy2/cache
The jars for the packages stored in: /home/invkrh/.ivy2/jars
https://repository.jboss.org/nexus/content/repositories/releases/ added as a remote repository with the name: repo-1
:: loading settings :: url = jar:file:/home/invkrh/workspace/scala/spark/assembly/target/scala-2.10/spark-assembly-1.4.0-SNAPSHOT-hadoop2.2.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.spark#spark-hive_2.10 added as a dependency
org.postgresql#postgresql added as a dependency
joda-time#joda-time added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
    confs: [default]
    found org.postgresql#postgresql;9.3-1103-jdbc3 in local-m2-cache
    found joda-time#joda-time;2.8.1 in central
:: resolution report :: resolve 139ms :: artifacts dl 3ms
    :: modules in use:
    joda-time#joda-time;2.8.1 from central in [default]
    org.postgresql#postgresql;9.3-1103-jdbc3 from local-m2-cache in [default]
    ---------------------------------------------------------------------
    |                  |            modules            ||   artifacts   |
    |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
    ---------------------------------------------------------------------
    |      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
    ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent
    confs: [default]
    0 artifacts copied, 2 already retrieved (0kB/6ms)
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/sql/hive/HiveContext
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:633)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.hive.HiveContext
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 7 more
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/07/07 16:57:59 INFO Utils: Shutdown hook called

Any help is appreciated. Thank you. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-submit-can-not-resolve-spark-hive-2-10-tp23695.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
[SPARK-SQL] Re-use col alias in the select clause to avoid sub query
Hi, I want to re-use a column alias in the select clause to avoid a sub-query. For example:

select check(key) as b, abs(b) as abs, value1, value2, ..., value30
from test

The query above does not work, because b is not defined in test's schema. Instead, I should change the query to the following:

select check(key) as b, abs(check(key)) as abs, value1, value2, ..., value30
from test

Apparently, the check function is called twice. In my use case, the check function is time-consuming. The workaround is to use a sub-query:

select b, abs(b) as abs, value1, value2, ..., value30
from (
  select check(key) as b, value1, value2, ..., value30
  from test
) t

The problem is that I have to repeat the following 30 columns twice. Imagine the following case, which does not work:

select check(key) as b, abs(b) as absv, tan(absv) as tanv, value1, value2, ..., value30
from test

In order not to call my check function many times, I would need to nest 3 sub-queries, which makes the query too long and hard to read. I am wondering whether we can reuse a column alias in an efficient way? Thank you -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SPARK-SQL-Re-use-col-alias-in-the-select-clause-to-avoid-sub-query-tp23645.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
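With the DataFrame API the repetition can be avoided without any sub-query: compute the expensive column once with withColumn and refer to it afterwards. A sketch below, assuming `check` has been registered as a Spark SQL UDF (the registration itself is hypothetical here):

```scala
import org.apache.spark.sql.functions.{abs, callUDF, col, tan}

// Assumption: `check` was registered elsewhere, e.g.
//   sqlContext.udf.register("check", (k: String) => ...)
// (the function is spelled callUdf in some 1.x releases)
val result = sqlContext.table("test")
  .withColumn("b", callUDF("check", col("key")))  // check() evaluated once
  .withColumn("absv", abs(col("b")))              // reuses column b
  .withColumn("tanv", tan(col("absv")))           // chains on absv
// value1 ... value30 are carried along untouched, no need to repeat them
```

Each withColumn refers to the already-computed column, so the chain of derived columns never re-invokes the UDF.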
Re: SparkContext and JavaSparkContext
It seems that JavaSparkContext is just a wrapper around the scala SparkContext. In JavaSparkContext, the scala one is used to do all the work. If I pass the same scala SparkContext to initialize a JavaSparkContext, I still manipulate the same SparkContext. Sry for spamming. Hao

On Mon, Jun 29, 2015 at 11:15 AM, Hao Ren inv...@gmail.com wrote: Hi, I am working on a legacy project using spark java code. I have a function which takes sqlContext as an argument; however, I need a JavaSparkContext in that function. It seems that sqlContext.sparkContext() returns a scala SparkContext. I did not find any API for casting a scala SparkContext to a java one except: new JavaSparkContext(sqlContext.sparkContext()). I think it will create a new SparkContext, so there will be multiple SparkContexts at run time. According to some posts, there are some limitations on this, but I did not encounter them. Questions: What is the best way to cast a scala SparkContext to a java one? What problems will multiple SparkContexts cause? Thank you. =) Hao -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SparkContext-and-JavaSparkContext-tp23525.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Hao Ren Data Engineer @ leboncoin Paris, France
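A quick sketch confirming the wrapper view: new JavaSparkContext(sc) holds a reference to the existing context rather than creating a second one, and there is also a JavaSparkContext.fromSparkContext factory that does the same thing:

```scala
import org.apache.spark.api.java.JavaSparkContext

val sc = sqlContext.sparkContext           // the existing scala context
val jsc = new JavaSparkContext(sc)         // wraps, does not re-create
assert(jsc.sc eq sc)                       // same underlying object

// Equivalent factory method:
val jsc2 = JavaSparkContext.fromSparkContext(sc)
assert(jsc2.sc eq sc)
```

So no second SparkContext is ever created; the multiple-contexts limitation does not apply here.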
SparkContext and JavaSparkContext
Hi, I am working on a legacy project using spark java code. I have a function which takes sqlContext as an argument; however, I need a JavaSparkContext in that function. It seems that sqlContext.sparkContext() returns a scala SparkContext. I did not find any API for casting a scala SparkContext to a java one except: new JavaSparkContext(sqlContext.sparkContext()). I think it will create a new SparkContext, so there will be multiple SparkContexts at run time. According to some posts, there are some limitations on this, but I did not encounter them. Questions: What is the best way to cast a scala SparkContext to a java one? What problems will multiple SparkContexts cause? Thank you. =) Hao -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SparkContext-and-JavaSparkContext-tp23525.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: map vs mapPartitions
It's not the number of executors that matters, but the # of CPU cores of your cluster. Each partition will be loaded on a core for computing. e.g. A cluster of 3 nodes has 24 cores, and you divide the RDD into 24 partitions (24 tasks for a narrow dependency). Then all 24 partitions will be loaded to your cluster in parallel, one on each core. You may notice that some tasks finish more quickly than others. So divide the RDD into (2~3) x (# of cores) partitions for better pipeline performance. Say we have 72 partitions in your RDD: initially 24 tasks run on 24 cores, then first done, first served, until all 72 tasks are processed.

Back to your original question: map and mapPartitions are both transformations, but at different granularity. map = apply the function on each record in each partition. mapPartitions = apply the function on each partition. But the rule is the same: one partition per core.

Hope it helps. Hao

On Thu, Jun 25, 2015 at 1:28 PM, Shushant Arora shushantaror...@gmail.com wrote: say the source is HDFS, and the file is divided into 10 partitions. So what will `input` contain in public Iterable<Integer> call(Iterator<String> input)? Say I have 10 executors in the job, each having a single partition. Will it have some part of the partition or the complete one? And if only a part, when I call input.next(), will it fetch the rest, or how is that handled?

On Thu, Jun 25, 2015 at 3:11 PM, Sean Owen so...@cloudera.com wrote: No, or at least, it depends on how the source of the partitions was implemented.

On Thu, Jun 25, 2015 at 12:16 PM, Shushant Arora shushantaror...@gmail.com wrote: Does mapPartitions keep complete partitions in memory of the executor as an iterable?
JavaRDD<String> rdd = jsc.textFile(path);
JavaRDD<Integer> output = rdd.mapPartitions(
    new FlatMapFunction<Iterator<String>, Integer>() {
      public Iterable<Integer> call(Iterator<String> input) throws Exception {
        List<Integer> output = new ArrayList<Integer>();
        while (input.hasNext()) {
          output.add(input.next().length());
        }
        return output;
      }
    });

Here, is `input` present in memory, and can it contain a complete partition of GBs? Will this function call(Iterator<String> input) be called only number-of-partitions times (say 10 in this example), not number-of-lines times (say 1000)? And what's the use of mapPartitionsWithIndex? Thanks -- Hao Ren Data Engineer @ leboncoin Paris, France
Fwd: map vs mapPartitions
-- Forwarded message --
From: Hao Ren inv...@gmail.com
Date: Thu, Jun 25, 2015 at 7:03 PM
Subject: Re: map vs mapPartitions
To: Shushant Arora shushantaror...@gmail.com

In fact, map and mapPartitions produce RDDs of the same type: MapPartitionsRDD. Check the RDD api source code below:

def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}

def mapPartitions[U: ClassTag](
    f: Iterator[T] => Iterator[U],
    preservesPartitioning: Boolean = false): RDD[U] = withScope {
  val cleanedF = sc.clean(f)
  new MapPartitionsRDD(
    this,
    (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
    preservesPartitioning)
}

So even map uses an iterator! For map, `iter.map(cleanF)` means that when an action is called, the passed function is applied to all records in each partition. For mapPartitions, your function is applied to an iterator, with no guarantee that all records will be loaded in memory. For example, if the function just takes the first record, e.g. rdd.mapPartitions(iter => Iterator.single(iter.next)), the iterator is not traversed. It really depends on your function. It gives you control at the partition level, just that. The two APIs are for different purposes; the choice depends on your need. In the given example, your mapPartitions is doing the same thing as map = rdd.map(_.length). The performance is the same.

On Thu, Jun 25, 2015 at 5:36 PM, Shushant Arora shushantaror...@gmail.com wrote: yes, 1 partition per core, and mapPartitions applies the function on each partition. The question is: does the complete partition load into memory so that the function can be applied to it, or is it an iterator where iterator.next() loads the next record? And if the latter, how is it more efficient than map, which also works on 1 record at a time? Is the only difference that only the while loop below runs per record, as in map, while the code above it runs once per partition?
public Iterable<Integer> call(Iterator<String> input) throws Exception {
  List<Integer> output = new ArrayList<Integer>();
  while (input.hasNext()) {
    output.add(input.next().length());
  }
}

so if I don't have any heavy code above the while loop, performance will be the same as the map function.

On Thu, Jun 25, 2015 at 6:51 PM, Hao Ren inv...@gmail.com wrote: It's not the number of executors that matters, but the # of CPU cores of your cluster. Each partition will be loaded on a core for computing. e.g. A cluster of 3 nodes has 24 cores, and you divide the RDD into 24 partitions (24 tasks for a narrow dependency). Then all 24 partitions will be loaded to your cluster in parallel, one on each core. You may notice that some tasks finish more quickly than others. So divide the RDD into (2~3) x (# of cores) partitions for better pipeline performance. Say we have 72 partitions in your RDD: initially 24 tasks run on 24 cores, then first done, first served, until all 72 tasks are processed. Back to your original question: map and mapPartitions are both transformations, but at different granularity. map = apply the function on each record in each partition. mapPartitions = apply the function on each partition. But the rule is the same: one partition per core. Hope it helps. Hao

On Thu, Jun 25, 2015 at 1:28 PM, Shushant Arora shushantaror...@gmail.com wrote: say the source is HDFS, and the file is divided into 10 partitions. So what will `input` contain in public Iterable<Integer> call(Iterator<String> input)? Say I have 10 executors in the job, each having a single partition. Will it have some part of the partition or the complete one? And if only a part, when I call input.next(), will it fetch the rest, or how is that handled?

On Thu, Jun 25, 2015 at 3:11 PM, Sean Owen so...@cloudera.com wrote: No, or at least, it depends on how the source of the partitions was implemented.

On Thu, Jun 25, 2015 at 12:16 PM, Shushant Arora shushantaror...@gmail.com wrote: Does mapPartitions keep complete partitions in memory of the executor as an iterable?
JavaRDD<String> rdd = jsc.textFile(path);
JavaRDD<Integer> output = rdd.mapPartitions(
    new FlatMapFunction<Iterator<String>, Integer>() {
      public Iterable<Integer> call(Iterator<String> input) throws Exception {
        List<Integer> output = new ArrayList<Integer>();
        while (input.hasNext()) {
          output.add(input.next().length());
        }
        return output;
      }
    });

Here, is `input` present in memory, and can it contain a complete partition of GBs? Will this function call(Iterator<String> input) be called only number-of-partitions times (say 10 in this example), not number-of-lines times (say 1000)? And what's the use of mapPartitionsWithIndex? Thanks -- Hao Ren Data Engineer @ leboncoin Paris, France
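The laziness point above can be seen without Spark at all: `Iterator.map` defers work until the iterator is consumed, while collecting into a List (as the Java while-loop example does) materializes the whole partition. A plain-Scala sketch:

```scala
object IteratorLaziness extends App {
  var evaluated = 0
  val partition: Iterator[String] = Iterator("a", "bb", "ccc")

  // map-style: nothing runs yet; records are processed one by one on demand
  val lazyLengths = partition.map { s => evaluated += 1; s.length }
  assert(evaluated == 0)                 // still nothing evaluated

  // consuming only the first element evaluates only one record
  val first = lazyLengths.next()
  assert(first == 1 && evaluated == 1)

  // collecting into a List (as in the while-loop example) instead
  // materializes every remaining record at once
  val rest = lazyLengths.toList
  assert(evaluated == 3)
}
```

This is exactly why rdd.mapPartitions(iter => Iterator.single(iter.next)) never traverses the partition, while a function that builds a List holds the whole partition in memory.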
Big performance difference when joining 3 tables in different order
Hi, I encountered a performance issue when joining 3 tables in sparkSQL. Here is the query:

SELECT g.period, c.categoryName, z.regionName, action, list_id, cnt
FROM t_category c, t_zipcode z, click_meter_site_grouped g
WHERE c.refCategoryID = g.category AND z.regionCode = g.region

I need to pay a lot of attention to the table order in the FROM clause. If not, some orders make the driver break, some orders make the job extremely slow, and only one order makes the job finish quickly. For the slow one, I noticed that a table is loaded 56 times (!!!) from its CSV file. I would like to know more about the join implementation in SparkSQL to understand the issue (auto broadcast, etc). For those who want to know more about the details, here is the jira: https://issues.apache.org/jira/browse/SPARK-8102 Any help is welcome. =) Thx Hao -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Big-performance-difference-when-joining-3-tables-in-different-order-tp23150.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
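Two knobs worth trying while investigating (a sketch, not a recommendation; the 50 MB threshold is just an example value): raise spark.sql.autoBroadcastJoinThreshold so the small dimension tables are broadcast regardless of their position in the FROM clause, and inspect the plan to see which join strategy was actually picked:

```scala
// Default threshold is 10 MB; broadcast anything under ~50 MB instead.
hiveContext.setConf("spark.sql.autoBroadcastJoinThreshold",
  (50 * 1024 * 1024).toString)

val joined = hiveContext.sql("""
  SELECT g.period, c.categoryName, z.regionName, action, list_id, cnt
  FROM t_category c, t_zipcode z, click_meter_site_grouped g
  WHERE c.refCategoryID = g.category AND z.regionCode = g.region
""")

// Look for BroadcastHashJoin (good for small dims) vs a shuffled join,
// and whether any table source appears more than once in the plan.
joined.explain()
```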
Re: why does groupByKey return RDD[(K, Iterable[V])] not RDD[(K, CompactBuffer[V])] ?
Should I repost this to dev list ? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/why-does-groupByKey-return-RDD-K-Iterable-V-not-RDD-K-CompactBuffer-V-tp22616p22640.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
why does groupByKey return RDD[(K, Iterable[V])] not RDD[(K, CompactBuffer[V])] ?
Hi, Just a quick question regarding the source code of groupByKey: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L453 In the end, it casts CompactBuffer to Iterable. But why? Any advantage? Thank you. Hao. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/why-does-groupByKey-return-RDD-K-Iterable-V-not-RDD-K-CompactBuffer-V-tp22616.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
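Presumably this is ordinary interface hiding: by exposing only Iterable[V], the concrete buffer stays a private implementation detail that can be swapped out later without breaking callers. A minimal plain-Scala sketch of the same pattern (TinyBuffer is a made-up stand-in for CompactBuffer):

```scala
object Grouping {
  // Analogous to CompactBuffer: an optimized container kept private
  // so its API never becomes a public compatibility promise.
  private class TinyBuffer[A](xs: A*) extends Iterable[A] {
    private val data = xs.toArray
    def iterator: Iterator[A] = data.iterator
  }

  // Callers only ever see Iterable[A]; the buffer implementation can be
  // replaced in a later version without touching any caller.
  def group[A](xs: Seq[A]): Iterable[A] = new TinyBuffer(xs: _*)
}
```

Returning RDD[(K, CompactBuffer[V])] would instead freeze CompactBuffer (a private[spark] class) into the public API forever.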
SQL can not create Hive database
Hi, I am working in local mode. The following code:

hiveContext.setConf("hive.metastore.warehouse.dir", "/home/spark/hive/warehouse")
hiveContext.sql("create database if not exists db1")

throws:

15/04/09 13:53:16 ERROR RetryingHMSHandler: MetaException(message:Unable to create database path file:/user/hive/warehouse/db1.db, failed to create database db1)

It seems that it uses the hdfs path, not the local one specified in hiveContext. Any idea? Thank you. Hao -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SQL-can-t-not-create-Hive-database-tp22435.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: The difference between SparkSql/DataFrame join and Rdd join
    at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:236)
    at akka.actor.ActorSystemImpl$TerminationCallbacks.ready(ActorSystem.scala:817)
    at akka.actor.ActorSystemImpl$TerminationCallbacks.ready(ActorSystem.scala:786)
    at scala.concurrent.Await$$anonfun$ready$1.apply(package.scala:86)
    at scala.concurrent.Await$$anonfun$ready$1.apply(package.scala:86)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    at scala.concurrent.Await$.ready(package.scala:86)
    at akka.actor.ActorSystemImpl.awaitTermination(ActorSystem.scala:642)
    at akka.actor.ActorSystemImpl.awaitTermination(ActorSystem.scala:643)
    at org.apache.spark.deploy.worker.Worker$.main(Worker.scala:531)
    at org.apache.spark.deploy.worker.Worker.main(Worker.scala)

"VM Thread" prio=10 tid=0x7f149407d000 nid=0xe75 runnable
"GC task thread#0 (ParallelGC)" prio=10 tid=0x7f149401f000 nid=0xe6d runnable
"GC task thread#1 (ParallelGC)" prio=10 tid=0x7f1494021000 nid=0xe6e runnable
"GC task thread#2 (ParallelGC)" prio=10 tid=0x7f1494022800 nid=0xe6f runnable
"GC task thread#3 (ParallelGC)" prio=10 tid=0x7f1494024800 nid=0xe70 runnable
"GC task thread#4 (ParallelGC)" prio=10 tid=0x7f1494026800 nid=0xe71 runnable
"GC task thread#5 (ParallelGC)" prio=10 tid=0x7f1494028000 nid=0xe72 runnable
"GC task thread#6 (ParallelGC)" prio=10 tid=0x7f149402a000 nid=0xe73 runnable
"GC task thread#7 (ParallelGC)" prio=10 tid=0x7f149402c000 nid=0xe74 runnable
"VM Periodic Task Thread" prio=10 tid=0x7f14940c2800 nid=0xe7c waiting on condition

JNI global references: 230

Tell me if anything else is needed. Thank you. Hao.

On Tue, Apr 7, 2015 at 8:00 PM, Michael Armbrust mich...@databricks.com wrote: The joins here are totally different implementations, but it is worrisome that you are seeing the SQL join hanging. Can you provide more information about the hang? jstack of the driver and a worker that is processing a task would be very useful.
On Tue, Apr 7, 2015 at 8:33 AM, Hao Ren inv...@gmail.com wrote: Hi, We have 2 hive tables and want to join one with the other. Initially, we ran a sql request on HiveContext, but it did not work: it was blocked on 30/600 tasks. Then we tried to load the tables into two DataFrames, and we encountered the same problem. Finally, it works with RDD.join. What we did is basically transforming the 2 tables into 2 pair RDDs, then calling a join operation. It works great, in about 500 s. However, a workaround is just a workaround, since we have to transform hive tables into RDDs, which is really annoying. Just wondering whether the underlying code of the DF/SQL join operation is the same as the RDD one, knowing that there is a syntax analysis layer for DF/SQL, while RDD's join is straightforward on two pair RDDs.

SQL request:
--------------
select v1.receipt_id, v1.sku, v1.amount, v1.qty, v2.discount
from table1 as v1
left join table2 as v2 on v1.receipt_id = v2.receipt_id
where v1.sku != ''

DataFrame:
-------------
val rdd1 = ss.hiveContext.table("table1")
val rdd1Filt = rdd1.filter(rdd1.col("sku") !== "")
val rdd2 = ss.hiveContext.table("table2")
val rddJoin = rdd1Filt.join(rdd2, rdd1Filt("receipt_id") === rdd2("receipt_id"))
rddJoin.saveAsTable("testJoinTable", SaveMode.Overwrite)

The RDD workaround in this case is a bit cumbersome; in short, we just created 2 RDDs, joined them, and then applied a new schema on the result RDD. This approach works, at least all tasks were finished, while the DF/SQL approach doesn't. Any idea? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/The-differentce-between-SparkSql-DataFram-join-and-Rdd-join-tp22407.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Hao Ren {Data, Software} Engineer @ ClaraVista Paris, France
The difference between SparkSql/DataFrame join and Rdd join
Hi, We have 2 hive tables and want to join one with the other. Initially, we ran a sql request on HiveContext, but it did not work: it was blocked on 30/600 tasks. Then we tried to load the tables into two DataFrames, and we encountered the same problem. Finally, it works with RDD.join. What we did is basically transforming the 2 tables into 2 pair RDDs, then calling a join operation. It works great, in about 500 s. However, a workaround is just a workaround, since we have to transform hive tables into RDDs, which is really annoying. Just wondering whether the underlying code of the DF/SQL join operation is the same as the RDD one, knowing that there is a syntax analysis layer for DF/SQL, while RDD's join is straightforward on two pair RDDs.

SQL request:
--------------
select v1.receipt_id, v1.sku, v1.amount, v1.qty, v2.discount
from table1 as v1
left join table2 as v2 on v1.receipt_id = v2.receipt_id
where v1.sku != ''

DataFrame:
-------------
val rdd1 = ss.hiveContext.table("table1")
val rdd1Filt = rdd1.filter(rdd1.col("sku") !== "")
val rdd2 = ss.hiveContext.table("table2")
val rddJoin = rdd1Filt.join(rdd2, rdd1Filt("receipt_id") === rdd2("receipt_id"))
rddJoin.saveAsTable("testJoinTable", SaveMode.Overwrite)

The RDD workaround in this case is a bit cumbersome; in short, we just created 2 RDDs, joined them, and then applied a new schema on the result RDD. This approach works, at least all tasks were finished, while the DF/SQL approach doesn't. Any idea? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/The-differentce-between-SparkSql-DataFram-join-and-Rdd-join-tp22407.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: HiveContext setConf seems not stable
Hi,

Jira created: https://issues.apache.org/jira/browse/SPARK-6675

Thank you.

On Wed, Apr 1, 2015 at 7:50 PM, Michael Armbrust mich...@databricks.com wrote:
> Can you open a JIRA please?

--
Hao Ren
{Data, Software} Engineer @ ClaraVista
Paris, France
HiveContext setConf seems not stable
Hi,

I find that HiveContext.setConf does not work correctly. Here are some code snippets showing the problem.

Snippet 1:

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}

object Main extends App {
  val conf = new SparkConf()
    .setAppName("context-test")
    .setMaster("local[8]")
  val sc = new SparkContext(conf)
  val hc = new HiveContext(sc)

  hc.setConf("spark.sql.shuffle.partitions", "10")
  hc.setConf("hive.metastore.warehouse.dir", "/home/spark/hive/warehouse_test")

  hc.getAllConfs filter (_._1.contains("warehouse.dir")) foreach println
  hc.getAllConfs filter (_._1.contains("shuffle.partitions")) foreach println
}

Results:
(hive.metastore.warehouse.dir,/home/spark/hive/warehouse_test)
(spark.sql.shuffle.partitions,10)

Snippet 2:

...
hc.setConf("hive.metastore.warehouse.dir", "/home/spark/hive/warehouse_test")
hc.setConf("spark.sql.shuffle.partitions", "10")
hc.getAllConfs filter (_._1.contains("warehouse.dir")) foreach println
hc.getAllConfs filter (_._1.contains("shuffle.partitions")) foreach println
...

Results:
(hive.metastore.warehouse.dir,/user/hive/warehouse)
(spark.sql.shuffle.partitions,10)

You can see that I just permuted the two setConf calls, and that leads to two different Hive configurations. It seems that HiveContext cannot set a new value for the hive.metastore.warehouse.dir key when that is the first setConf call. You need another setConf call before changing hive.metastore.warehouse.dir; for example, setting hive.metastore.warehouse.dir twice also works, as in snippet 3.

Snippet 3:

...
hc.setConf("hive.metastore.warehouse.dir", "/home/spark/hive/warehouse_test")
hc.setConf("hive.metastore.warehouse.dir", "/home/spark/hive/warehouse_test")
hc.getAllConfs filter (_._1.contains("warehouse.dir")) foreach println
...

Results:
(hive.metastore.warehouse.dir,/home/spark/hive/warehouse_test)

You can reproduce this on the latest branch-1.3 (1.3.1-snapshot, htag = 7d029cb1eb6f1df1bce1a3f5784fb7ce2f981a33). I have also tested the released 1.3.0 (htag = 4aaf48d46d13129f0f9bdafd771dd80fe568a7dc); it has the same problem. Please tell me if I am missing something. Any help is highly appreciated.

Hao

--
Hao Ren
{Data, Software} Engineer @ ClaraVista
Paris, France
Broadcast variable questions
Hi,

Spark 1.2.0, standalone, local mode (for test). Here are several questions on broadcast variables:

1) Where is a broadcast variable cached on executors, in memory or on disk? I read somewhere that these variables are stored in spark.local.dir, but I cannot find any info on this in the Spark 1.2 documentation. I encountered a problem with broadcast variables: I have a loop in which a broadcast variable is created, and after 3 iterations the used memory increased quickly until full, and Spark blocked; no error message, no exception, just blocked. I would like to make sure whether this is caused by too many broadcast variables, because I did not call unpersist() on each broadcast variable.

2) I find that a broadcast variable has a destroy() and an unpersist() method; what's the difference between them? If a broadcast variable is destroyed, is it removed from where it is stored?

Hao

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Broadcast-variable-questions-tp21292.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
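For the loop described in question 1, the usual pattern is to release each broadcast before (or right after) creating the next one. A minimal sketch, with made-up data and iteration counts; the per-iteration lookup table is purely illustrative:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object BroadcastLoop extends App {
  val sc = new SparkContext(
    new SparkConf().setAppName("bc-test").setMaster("local[8]"))
  val data = sc.parallelize(1 to 1000)

  for (i <- 1 to 10) {
    // Hypothetical per-iteration lookup table; in a real job this
    // would be recomputed each round.
    val table = (1 to 100).map(k => k -> k * i).toMap
    val bc = sc.broadcast(table)

    val hits = data.filter(x => bc.value.contains(x)).count()
    println(s"iteration $i: $hits hits")

    // unpersist() drops the cached copies on the executors but keeps the
    // variable usable (it is re-broadcast on next use); destroy() removes
    // all state and makes the variable unusable afterwards.
    bc.unpersist(blocking = true)
  }
  sc.stop()
}
```

Without the unpersist() call, each iteration leaves another full copy of the broadcast data cached on the executors, which is consistent with the memory growth described above.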
Re: SchemaRDD.sample problem
Update: t1 is good. After collecting t1, I find that all rows are ok (is_new = 0). It is just after sampling that some rows appear where is_new = 1, which should have been filtered out by the WHERE clause.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SchemaRDD-sample-problem-tp20741p20833.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
SchemaRDD.sample problem
Hi,

I am using Spark SQL on the 1.2.1 branch. The problem comes from the following 4-line code:

val t1: SchemaRDD = hiveContext hql "select * from product where is_new = 0"
val tb1: SchemaRDD = t1.sample(withReplacement = false, fraction = 0.05)
tb1.registerTempTable("t1_tmp")
(hiveContext sql "select count(*) from t1_tmp where is_new = 1") collect foreach println

We know that t1 contains only rows whose is_new field is zero. After sampling t1 by taking 5% of its rows, the sampled table should normally still contain only rows where is_new = 0. However, line 4 gives a number around 5 by chance. That means there are some rows where is_new = 1 in the sampled table, which is not logically possible. I am not sure SchemaRDD.sample is doing its work well. Any idea?

Hao

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SchemaRDD-sample-problem-tp20741.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
SparkSQL 1.2.1-snapshot Left Join problem
Hi,

When running Spark SQL branch 1.2.1 on an EC2 standalone cluster, the following query does not work:

create table debug as
select v1.*
from t1 as v1
left join t2 as v2 on v1.sku = v2.sku
where v2.sku is null

Both t1 and t2 have 200 partitions; t1 has 10k rows and t2 has 4k rows. The query blocks at:

14/12/17 15:56:54 INFO TaskSetManager: Finished task 133.0 in stage 2.0 (TID 541) in 370 ms on ip-10-79-184-49.ec2.internal (122/200)

Via the WebUI, I can see there are 24 tasks running, as the cluster has 24 cores. The other tasks have succeeded. It seems that the 24 tasks are blocked and won't end. However, Spark SQL 1.1.0 works fine. There might be some problem with join on 1.2.1. Any idea?

Hao

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SparkSQL-1-2-1-snapshot-Left-Join-problem-tp20748.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
registerTempTable: Table not found
Hi,

I am using Spark SQL on 1.2.1-snapshot. Here is the problem I encountered. Basically, I want to save a SchemaRDD into the HiveContext:

val scm = StructType(Seq(
  StructField("name", StringType, nullable = false),
  StructField("cnt", IntegerType, nullable = false)
))

val schRdd = hiveContext.applySchema(ranked, scm) // ranked above is an RDD[Row] whose rows contain 2 fields
schRdd.registerTempTable("schRdd")

hiveContext sql "select count(name) from schRdd limit 20" // => ok
hiveContext sql "create table t as select * from schRdd" // => table not found

A query like select works well and gives the correct answer, but when I try to save the temp table into the HiveContext via create table as select, it does not work:

Caused by: org.apache.hadoop.hive.ql.parse.SemanticException: Line 1:32 Table not found 'schRdd'
at org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.getMetaData(SemanticAnalyzer.java:1243)

I thought that was caused by registerTempTable, so I replaced it with saveAsTable. That does not work either:

Exception in thread "main" java.lang.AssertionError: assertion failed: No plan for CreateTableAsSelect Some(sephcn), schRdd, false, None
LogicalRDD [name#6,cnt#7], MappedRDD[3] at map at Creation.scala:70
at scala.Predef$.assert(Predef.scala:179)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59)

I also checked the source code of QueryPlanner:

def apply(plan: LogicalPlan): Iterator[PhysicalPlan] = {
  // Obviously a lot to do here still...
  val iter = strategies.view.flatMap(_(plan)).toIterator
  assert(iter.hasNext, s"No plan for $plan")
  iter
}

The comment shows that there is still some work to do here. Any help is appreciated. Thx.

Hao

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/registerTempTable-Table-not-found-tp20592.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
Re: registerTempTable: Table not found
Thank you. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/registerTempTable-Table-not-found-tp20592p20594.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
scala.MatchError on SparkSQL when creating ArrayType of StructType
Hi,

I am using Spark SQL on the 1.1.0 branch. The following code leads to a scala.MatchError at org.apache.spark.sql.catalyst.expressions.Cast.cast$lzycompute(Cast.scala:247):

val scm = StructType(inputRDD.schema.fields.init :+
  StructField("list", ArrayType(
    StructType(Seq(
      StructField("date", StringType, nullable = false),
      StructField("nbPurchase", IntegerType, nullable = false)))),
    nullable = false))

// purchaseRDD is an RDD[sql.Row] whose schema corresponds to scm. It is transformed from inputRDD
val schemaRDD = hiveContext.applySchema(purchaseRDD, scm)
schemaRDD.registerTempTable("t_purchase")

Here's the stack trace:

scala.MatchError: ArrayType(StructType(List(StructField(date,StringType,true), StructField(n_reachat,IntegerType,true))),true) (of class org.apache.spark.sql.catalyst.types.ArrayType)
at org.apache.spark.sql.catalyst.expressions.Cast.cast$lzycompute(Cast.scala:247)
at org.apache.spark.sql.catalyst.expressions.Cast.cast(Cast.scala:247)
at org.apache.spark.sql.catalyst.expressions.Cast.eval(Cast.scala:263)
at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:84)
at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:66)
at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:50)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1(InsertIntoHiveTable.scala:149)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$1.apply(InsertIntoHiveTable.scala:158)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$1.apply(InsertIntoHiveTable.scala:158)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
at org.apache.spark.scheduler.Task.run(Task.scala:54)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)

The strange thing is that the nullable flags of the date and nbPurchase fields are set to true while they were false in the code. If I set both to true, it works. But, in fact, they should not be nullable. Here's what I find at Cast.scala:247 on the 1.1.0 branch:

private[this] lazy val cast: Any => Any = dataType match {
  case StringType => castToString
  case BinaryType => castToBinary
  case DecimalType => castToDecimal
  case TimestampType => castToTimestamp
  case BooleanType => castToBoolean
  case ByteType => castToByte
  case ShortType => castToShort
  case IntegerType => castToInt
  case FloatType => castToFloat
  case LongType => castToLong
  case DoubleType => castToDouble
}

There is no case for ArrayType, hence the MatchError. Any idea? Thank you.

Hao

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/scala-MatchError-on-SparkSQL-when-creating-ArrayType-of-StructType-tp20459.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
Re: EC2 cluster with SSD ebs
Hi,

I found that the ec2 script has been improved a lot, and the option --ebs-vol-type has been added to specify the EBS volume type. However, it seems that the option does not work. The command I used is the following:

$SPARK_HOME/ec2/spark-ec2 -k sparkcv -i spark.pem -m r3.4xlarge -s 3 -t r3.2xlarge --ebs-vol-type=gp2 --ebs-vol-size=200 --copy-aws-credentials launch spark-cluster

When checking the AWS EC2 console, I find 'standard' as the volume type. Any idea? Thank you. =)

Hao

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/EC2-cluster-with-SSD-ebs-tp19474p19642.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
Re: Why is ALS class serializable ?
It makes sense. Thx. =) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Why-is-ALS-class-serializable-tp19262p19472.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Why is ALS class serializable ?
Hi,

When reading through the ALS code, I find that:

class ALS private (
  private var numUserBlocks: Int,
  private var numProductBlocks: Int,
  private var rank: Int,
  private var iterations: Int,
  private var lambda: Double,
  private var implicitPrefs: Boolean,
  private var alpha: Double,
  private var seed: Long = System.nanoTime()
) extends Serializable with Logging

Why should ALS extend Serializable? If it did not, there would be an exception: task is not serializable, ALS is not serializable. Yet I did not find any closure in which ALS is referenced. Any idea? Thx.

Hao

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Why-is-ALS-class-serializable-tp19262.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
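The usual reason a class like this must be Serializable, even without an obvious closure over it, is that Scala closures built inside its methods capture the enclosing instance whenever they read an instance field, and Java serialization then walks the whole captured object graph. A standalone sketch (a hypothetical stand-in class, not the actual ALS internals) that runs without Spark:

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Hypothetical stand-in for a class like ALS: makeStep builds a function
// that reads an instance field, so the function closes over `this`.
class Trainer(val lambda: Double) extends Serializable {
  def makeStep: Double => Double = x => x * lambda // captures this.lambda
}

object ClosureDemo extends App {
  val step = new Trainer(0.1).makeStep
  // Serializing the closure (as Spark does when shipping tasks to
  // executors) also serializes the captured Trainer instance; without
  // `extends Serializable` on Trainer, writeObject would throw
  // java.io.NotSerializableException.
  val out = new ObjectOutputStream(new ByteArrayOutputStream())
  out.writeObject(step)
  out.close()
  println("closure serialized ok")
}
```

So the reference to ALS can be implicit: any lambda inside ALS that touches rank, lambda, etc. drags the whole ALS object into the serialized task.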
Re: Understanding spark operation pipeline and block storage
Does anyone have an idea on this? Thx

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Understanding-spark-operation-pipeline-and-block-storage-tp18201p19263.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
Re: Building Spark with hive does not work
nvm. It would be better if the correctness of the flags could be checked by sbt during the build.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Building-Spark-with-hive-does-not-work-tp19072p19183.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
Building Spark with hive does not work
Hi,

I am building Spark on the most recent master branch. I checked this page: https://github.com/apache/spark/blob/master/docs/sql-programming-guide.md

The cmd ./sbt/sbt -Phive -Phive-thirftserver clean assembly/assembly works fine, and a fat jar is created. However, when I start the SQL CLI, I encounter an exception:

Spark assembly has been built with Hive, including Datanucleus jars on classpath
java.lang.ClassNotFoundException: org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:270)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:337)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Failed to load main class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.
You need to build Spark with -Phive and -Phive-thriftserver.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties

It suggests building with -Phive and -Phive-thriftserver, which is actually what I have done. Any idea?

Hao

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Building-Spark-with-hive-does-not-work-tp19072.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
Re: Building Spark with hive does not work
Sry for spamming. Just after my previous post, I noticed that the command I used was:

./sbt/sbt -Phive -Phive-thirftserver clean assembly/assembly

It should be -Phive-thriftserver; the typo is the evil. Stupid me. I believe I just copy-pasted it from somewhere else without checking it; meanwhile no error message, such as "no such option", is displayed, which made me assume the flags were correct. Sry for the carelessness.

Hao

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Building-Spark-with-hive-does-not-work-tp19072p19087.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
Re: Understanding spark operation pipeline and block storage
Hey guys, feel free to ask for more details if my questions are not clear. Any insight here?

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Understanding-spark-operation-pipeline-and-block-storage-tp18201p18496.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
Understanding spark operation pipeline and block storage
Hi,

I would like to understand the pipeline of Spark's operations (transformations and actions) and some details on block storage. Let's consider the following code:

val rdd1 = SparkContext.textFile("hdfs://...")
rdd1.map(func1).map(func2).count

For example, we have a file in HDFS of about 80 GB, already split into 32 files of 2.5 GB each.

q1) How many partitions will rdd1 have?

Rule 1) Maybe 32, since there are 32 split files? In most cases this rule holds, if the file is not big in size.
Rule 2) Maybe more; I am not sure whether Spark's block store can contain a 2.5 GB partition. Is there some parameter specifying the block store size? AFAIK, the HDFS block size is used when Spark reads data from HDFS, so there will be (80 GB / hdfsBlockSize) partitions in rdd1, right? Usually the HDFS block size is 64 MB, so we would have 80 GB / 64 MB = 1280 partitions. Too many? Which criterion applies: the number of split files, or the HDFS block size?

q2) Here, func1 and func2 are sequentially added to the DAG. What is the workflow at the partition level?

Option 1: Given a partition, func1 and func2 are applied to each element of this partition sequentially. Once everything is done, we count the number of lines in the partition and send the count to the driver; then we take the next partition and do the same.
Option 2: Or else, we first apply func1 to all partitions, then apply func2 to all partitions that have had func1 applied, count the lines in each partition, and send the results to the driver.

I have done some tests, and it seems that option 1 is correct. Can anyone confirm this? So in option 1, we have 1 count job which contains 3 stages: map(func1), map(func2), count.

q3) What if we run out of memory? Suppose we have 12 cores and 15 GB of memory in the cluster.

Case 1: For example, func1 takes one line of the file and creates a big object for each line, so a partition with func1 applied becomes a large partition. If we have 12 cores in the cluster, that means we may have 12 large partitions in memory. What if these partitions are much bigger than memory? What will happen? An OOM / heap-size exception, etc.?

Case 2: Suppose the input is 80 GB, but we force the RDD to be repartitioned into 6 partitions, which is smaller than the number of cores. Normally each partition will be sent to a core, so all the input will be in memory. However, we have 15 GB of memory in the cluster. What will happen? An OOM exception? Then, could we just split the RDD into enough partitions so that 80 GB / #partitions * 12 (the # of cores) < 15 GB (the memory size)? Meanwhile, we cannot split into too many partitions, which would lead to overhead on task distribution.

If we read data from HDFS using the 64 MB HDFS block size as the partition size, we get a formula like: 64 MB * # of cores < memory, which is true in most cases. Could this explain why reading from HDFS with the block size does not lead to an OOM as in case 2, even if the data is very big in size?

Sorry for making this post a bit long; I hope I make myself clear. Any help on any question will be appreciated. Thank you.

Hao.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Understanding-spark-operation-pipeline-and-block-storage-tp18201.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
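Regarding q2, narrow transformations within one stage are compiled into a single per-partition iterator chain, which behaves like option 1: each element flows through func1 and then func2 before the next element is read. This can be mimicked with plain Scala iterators; a standalone sketch (not Spark code, the data is made up):

```scala
object PipelineDemo extends App {
  // A partition is consumed as an iterator; chained maps are fused, so
  // each element passes through func1 then func2 before the next element
  // is pulled from the source.
  def func1(x: Int): Int = { println(s"func1($x)"); x + 1 }
  def func2(x: Int): Int = { println(s"func2($x)"); x * 2 }

  val partition = Iterator(1, 2, 3)
  val count = partition.map(func1).map(func2).size
  println(s"count = $count") // count = 3

  // The printed calls interleave per element (func1 then func2 for each
  // value) instead of all func1 calls followed by all func2 calls,
  // which is the option-1 behavior described above.
}
```

This also explains why a stage with several chained maps does not materialize an intermediate collection per map: only one element is in flight per core at a time, plus whatever the functions themselves allocate (which is exactly the concern raised in q3, case 1).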
SparkSQL: set hive.metastore.warehouse.dir in CLI doesn't work
Hi,

The following query in the Spark SQL 1.1.0 CLI doesn't work:

SET hive.metastore.warehouse.dir=/home/spark/hive/warehouse;

create table test as
select v1.*, v2.card_type, v2.card_upgrade_time_black, v2.card_upgrade_time_gold
from customer v1
left join customer_loyalty v2 on v1.account_id = v2.account_id
limit 5;

Stack trace:

org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:file:/user/hive/warehouse/test is not a directory or unable to create one)
at org.apache.hadoop.hive.ql.metadata.Hive.createTable(Hive.java:602)
at org.apache.hadoop.hive.ql.metadata.Hive.createTable(Hive.java:559)
at org.apache.spark.sql.hive.HiveMetastoreCatalog.createTable(HiveMetastoreCatalog.scala:99)
at org.apache.spark.sql.hive.HiveMetastoreCatalog$CreateTables$$anonfun$apply$1.applyOrElse(HiveMetastoreCatalog.scala:116)
at org.apache.spark.sql.hive.HiveMetastoreCatalog$CreateTables$$anonfun$apply$1.applyOrElse(HiveMetastoreCatalog.scala:111)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:165)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:156)
at org.apache.spark.sql.hive.HiveMetastoreCatalog$CreateTables$.apply(HiveMetastoreCatalog.scala:111)
at org.apache.spark.sql.hive.HiveContext$QueryExecution.optimizedPlan$lzycompute(HiveContext.scala:358)
at org.apache.spark.sql.hive.HiveContext$QueryExecution.optimizedPlan(HiveContext.scala:357)
at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:402)
at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:400)
at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:406)
at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:406)
at org.apache.spark.sql.hive.HiveContext$QueryExecution.toRdd$lzycompute(HiveContext.scala:360)
at org.apache.spark.sql.hive.HiveContext$QueryExecution.toRdd(HiveContext.scala:360)
at org.apache.spark.sql.SchemaRDDLike$class.$init$(SchemaRDDLike.scala:58)
at org.apache.spark.sql.SchemaRDD.init(SchemaRDD.scala:103)
at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:98)
at org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:58)
at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:291)
at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:413)
at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:226)
at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: MetaException(message:file:/user/hive/warehouse/test is not a directory or unable to create one)
at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_core(HiveMetaStore.java:1060)
at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_with_environment_context(HiveMetaStore.java:1107)
at sun.reflect.GeneratedMethodAccessor133.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:103)
at com.sun.proxy.$Proxy15.create_table_with_environment_context(Unknown Source)
at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.createTable(HiveMetaStoreClient.java:482)
at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.createTable(HiveMetaStoreClient.java:471)
at sun.reflect.GeneratedMethodAccessor132.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:89)
at com.sun.proxy.$Proxy16.createTable(Unknown Source)
at org.apache.hadoop.hive.ql.metadata.Hive.createTable(Hive.java:596)
... 30 more

It seems that the CLI doesn't take the hive.metastore.warehouse.dir value into account when creating a table with "create table ... as select": note that the error still references the default file:/user/hive/warehouse/test. If I just create the table, like create table t (...), and then load
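One common way to sidestep the SET statement altogether (an assumption on my part, not something confirmed in this thread) is to set the warehouse location statically in a hive-site.xml on the CLI's classpath, using the same path as in the message above:

```xml
<!-- hive-site.xml fragment, placed under $SPARK_HOME/conf/
     (path taken from the SET statement in the message above) -->
<property>
  <name>hive.metastore.warehouse.dir</name>
  <value>/home/spark/hive/warehouse</value>
</property>
```

Since the metastore reads this file at startup, it avoids relying on a session-level SET being honored by the create-table-as-select path.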
Re: SparkSQL: select syntax
Update: This syntax is mainly for avoiding retyping column names. Take the example from my previous post, where a is a table of 15 columns and b has 5 columns: after a join, I have a table of (15 + 5 - 1 (the key in b)) = 19 columns, and I register the table in sqlContext. I don't want to retype all 19 column names when querying with select. This feature exists in Hive, but in Spark SQL it gives an exception. Any ideas? Thx.

Hao

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SparkSQL-select-syntax-tp16299p16364.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
Re: SparkSQL: select syntax
Thank you, Gen. I will give hiveContext a try. =) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SparkSQL-select-syntax-tp16299p16368.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org