RE: Could Spark batch processing live within Spark Streaming?
Hi Raj,

What you need seems to be an event-based initiation of a DStream. I have not seen one yet. There are many types of DStreams that Spark implements, and you can also implement your own. InputDStream is a close match for your requirement. See this for the available options with InputDStream: https://spark.apache.org/docs/1.2.0/api/java/org/apache/spark/streaming/dstream/InputDStream.html

You could use a QueueInputDStream. It waits for a message to arrive at a configured queue before it starts processing.

Another requirement could be to send an event from one SparkContext (your batch context) to another (your streaming context). I have not seen a built-in API-based solution for this. Broadcast variables (https://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables) are tied to the SparkContext that created them. There are a couple of ad-hoc approaches I have seen: http://mail-archives.apache.org/mod_mbox/spark-dev/201410.mbox/%3ccadtdqqk-wv9xdeeb-qh3biltjf0a46a5kuqmt2njzwvwuhg...@mail.gmail.com%3E

Prajod

From: diplomatic Guru [mailto:diplomaticg...@gmail.com]
Sent: 11 June 2015 18:55
To: user@spark.apache.org
Subject: Could Spark batch processing live within Spark Streaming?

Hello all,

I was wondering if it is possible to have a high-latency batch processing job coexist within a Spark Streaming job? If it's possible, could we share the state of the batch job with the Spark Streaming job? For example, when the long-running batch computation is complete, could we inform Spark Streaming that the batch job is complete?

Kind regards,
Raj
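A minimal sketch of the queue-based option (StreamingContext.queueStream, which is backed by a QueueInputDStream), assuming the batch work and the streaming job share one SparkContext and JVM; the marker string and app name are made up. Signalling between two separate SparkContexts would still need an external channel, as noted above.

    import scala.collection.mutable
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object QueueStreamSketch {
      def main(args: Array[String]): Unit = {
        val sc  = new SparkContext(new SparkConf().setAppName("queue-stream-sketch"))
        val ssc = new StreamingContext(sc, Seconds(5))

        // The streaming side sees nothing until an RDD is pushed into this queue.
        val queue  = new mutable.Queue[RDD[String]]()
        val stream = ssc.queueStream(queue, oneAtATime = true)
        stream.foreachRDD { rdd =>
          if (!rdd.isEmpty()) println(s"batch-completion signal received: ${rdd.count()} record(s)")
        }
        ssc.start()

        // Elsewhere in the same JVM (e.g. when the long-running batch computation
        // finishes), enqueue a marker RDD to wake the streaming side up.
        queue += sc.makeRDD(Seq("batch-job-finished"))

        ssc.awaitTermination()
      }
    }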
Re: Reading Really Big File Stream from HDFS
Using sc.textFile will also read the file from HDFS line by line through an iterator; it does not need to fit entirely into memory, so it still works even with a small amount of memory.

2015-06-12 13:19 GMT+08:00 SLiZn Liu sliznmail...@gmail.com:

Hmm, you have a good point. So should I load the file with `sc.textFile()` and specify a high number of partitions, so that the file is split into partitions in memory across the cluster?

On Thu, Jun 11, 2015 at 9:27 PM ayan guha guha.a...@gmail.com wrote:

Why do you need to use a stream in this use case? The 50 GB does not need to be in memory. Give it a try with a high number of partitions.

On 11 Jun 2015 23:09, SLiZn Liu sliznmail...@gmail.com wrote:

Hi Spark Users,

I'm trying to load a literally big file (50 GB when compressed as gzip, stored in HDFS) by receiving a DStream using `ssc.textFileStream`, as this file cannot fit in my memory. However, it looks like no RDD will be received until I copy this big file to a prior-specified location on HDFS. Ideally, I'd like to read this file a small number of lines at a time, but receiving a file stream requires additional writing to HDFS. Any idea how to achieve this?

BEST REGARDS,
Todd Leo
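A minimal sketch of the suggestion above, with a hypothetical HDFS path and partition count. One caveat: a single gzip file is not splittable, so sc.textFile reads it as one partition and the data only spreads across the cluster after a repartition.

    import org.apache.spark.{SparkConf, SparkContext}

    object BigGzipFileSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("big-gzip-file-sketch"))

        // Lines are pulled through iterators partition by partition; the whole
        // 50 GB never has to sit in memory at once unless you cache it.
        val lines = sc.textFile("hdfs:///data/big-file.gz")   // hypothetical path
          .repartition(400)                                   // tune to the cluster

        val nonEmpty = lines.filter(_.nonEmpty).count()
        println(s"non-empty lines: $nonEmpty")
      }
    }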
Re: 回复: Re: 回复: Re: 回复: Re: 回复: Re: Met OOM when fetching more than 1,000,000 rows.
Thanks for the extra details and explanations, Chaocai, will try to reproduce this when I get a chance.

Cheng

On 6/12/15 3:44 PM, 姜超才 wrote:

I said the OOM occurred on the slave node because I monitored memory utilization during the query task; on the driver, very little memory was occupied. And I remember I have seen the OOM stderr log on a slave node, but recently there seems to be no OOM log on the slave nodes.

Following the command, data, environment and code I gave you, the OOM can be reproduced 100% in cluster mode.

Thanks,
SuperJ
RE: 回复: Re: 回复: Re: 回复: Re: 回复: Re: Met OOM when fetching more than 1,000,000 rows.
Not sure if Spark RDD will provide an API to fetch the records one by one from the final result set, instead of pulling them all (or a whole partition of data) into the driver memory. That seems like a big change.

From: Cheng Lian [mailto:l...@databricks.com]
Sent: Friday, June 12, 2015 3:51 PM
To: 姜超才; Hester wang; user@spark.apache.org
Subject: Re: 回复: Re: 回复: Re: 回复: Re: 回复: Re: Met OOM when fetching more than 1,000,000 rows.

Thanks for the extra details and explanations, Chaocai, will try to reproduce this when I get a chance.

Cheng
Re: 回复: Re: 回复: Re: 回复: Re: 回复: Re: Met OOM when fetching more than 1,000,000 rows.
My guess for why local mode is OK while the standalone cluster doesn't work is that in cluster mode, task results are serialized and sent to the driver side. The driver needs to deserialize the results, and thus occupies much more memory than in local mode (where task result de/serialization is not performed).

Cheng

On 6/12/15 4:17 PM, Cheng, Hao wrote:

Not sure if Spark Core will provide an API to fetch the records one by one from the block manager, instead of pulling them all into the driver memory.
Re: Spark Streaming reads from stdin or output from command line utility
Is it a lot of data that is expected to come through stdin? I mean, is it even worth parallelizing the computation using something like Spark Streaming?

On Thu, Jun 11, 2015 at 9:56 PM, Heath Guo heath...@fb.com wrote:

Thanks for your reply! In my use case, it would be a stream from only one stdin. Also, I'm working with Scala. It would be great if you could talk about the multi-stdin case as well! Thanks.

From: Tathagata Das t...@databricks.com
Date: Thursday, June 11, 2015 at 8:11 PM
To: Heath Guo heath...@fb.com
Cc: user user@spark.apache.org
Subject: Re: Spark Streaming reads from stdin or output from command line utility

Are you going to receive data from one stdin on one machine, or many stdins on many machines?

On Thu, Jun 11, 2015 at 7:25 PM, foobar heath...@fb.com wrote:

Hi, I'm new to Spark Streaming, and I want to create an application where Spark Streaming could create a DStream from stdin. Basically, I have a command line utility that generates stream data, and I'd like to pipe that data into a DStream. What's the best way to do that? I thought rdd.pipe() could help, but that seems to require an RDD in the first place, which does not apply here. Thanks!

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-reads-from-stdin-or-output-from-command-line-utility-tp23289.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: 回复: Re: 回复: Re: 回复: Re: Met OOM when fetching more than 1,000,000 rows.
Hi Chaocai,

Glad that 1.4 fixes your case. However, I'm a bit confused by your last comment saying "The OOM or lost heartbeat occurred on a slave node", because from the log files you attached at first, those OOMs actually happen on the driver side (the Thrift server log only contains log lines from the driver side). Did you see OOM in the executor stderr output? I ask this because a large portion of users are still on 1.3, and we may want to deliver a fix if there really is a bug that causes unexpected OOM.

Cheng

On 6/12/15 3:14 PM, 姜超才 wrote:

Hi Lian, Today I updated my Spark to v1.4. This issue is resolved. Thanks, SuperJ

----- Original message -----
From: 姜超才 jiangchao...@haiyisoft.com
To: Cheng Lian l...@databricks.com, Hester wang hester9...@gmail.com, user@spark.apache.org
Subject: 回复: Re: 回复: Re: 回复: Re: Met OOM when fetching more than 1,000,000 rows.
Date: 2015/06/11 08:56:28 (Thu)

No problem in local mode. I can get all rows with "Select * from foo;". The OOM or lost heartbeat occurred on a slave node. Thanks, SuperJ

----- Original message -----
From: Cheng Lian
To: 姜超才, Hester wang
Subject: Re: 回复: Re: 回复: Re: Met OOM when fetching more than 1,000,000 rows.
Date: 2015/06/10 19:58:59 (Wed)

Hm, I tried the following with 0.13.1 and 0.13.0 on my laptop (don't have access to a cluster for now) but couldn't reproduce this issue. Your program just executed smoothly... :-/

Command line used to start the Thrift server:
./sbin/start-thriftserver.sh --driver-memory 4g --master local

SQL statements used to create the table with your data:
create table foo(k string, v double);
load data local inpath '/tmp/bar' into table foo;

Tried this via Beeline:
select * from foo limit 160;

Also tried the Java program you provided. Could you also try to verify whether this single-node local mode works for you? Will investigate this with a cluster when I get a chance.

Cheng

On 6/10/15 5:19 PM, 姜超才 wrote:

When spark.sql.thriftServer.incrementalCollect is set and driver memory is set to 7G, things seem stable and simple: it quickly runs through the query line, but when traversing the result set (while rs.hasNext), it quickly hits the OOM: java heap space. See attachment.

/usr/local/spark/spark-1.3.0/sbin/start-thriftserver.sh --master spark://cx-spark-001:7077 --conf spark.executor.memory=4g --conf spark.driver.memory=7g --conf spark.shuffle.consolidateFiles=true --conf spark.shuffle.manager=sort --conf spark.executor.extraJavaOptions=-XX:-UseGCOverheadLimit --conf spark.file.transferTo=false --conf spark.akka.timeout=2000 --conf spark.storage.memoryFraction=0.4 --conf spark.cores.max=8 --conf spark.kryoserializer.buffer.mb=256 --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.akka.frameSize=512 --driver-class-path /usr/local/hive/lib/classes12.jar --conf spark.sql.thriftServer.incrementalCollect=true

Thanks, SuperJ

----- Original message -----
From: Cheng Lian
To: 姜超才, Hester wang
Subject: Re: 回复: Re: Met OOM when fetching more than 1,000,000 rows.
Date: 2015/06/10 16:37:34 (Wed)

Also, if the data isn't confidential, would you mind sending me a compressed copy (don't cc user@spark.apache.org)?

Cheng

On 6/10/15 4:23 PM, 姜超才 wrote:

Hi Lian,

Thanks for your quick response. I forgot to mention that I have tuned driver memory from 2G to 4G, which seems to have given a minor improvement: the failure mode when fetching 1,400,000 rows changed from OOM: GC overhead limit exceeded to losing the worker heartbeat after 120s. I will try to set spark.sql.thriftServer.incrementalCollect, continue increasing driver memory to 7G, and send the result to you.
Thanks, SuperJ

----- Original message -----
From: Cheng Lian
To: Hester wang
Subject: Re: Met OOM when fetching more than 1,000,000 rows.
Date: 2015/06/10 16:15:47 (Wed)

Hi Xiaohan,

Would you please try setting spark.sql.thriftServer.incrementalCollect to true and increasing the driver memory size? In this way, HiveThriftServer2 uses RDD.toLocalIterator rather than RDD.collect().iterator to return the result set. The key difference is that RDD.toLocalIterator retrieves a single partition at a time, thus avoiding holding the whole result set on the driver side. The memory issue happens on the driver side rather than the executor side, so tuning executor memory size doesn't help.

Cheng

On 6/10/15 3:46 PM, Hester wang wrote:

Hi Lian,

I met a Spark SQL problem. I'd really appreciate it if you could give me some help! Below is a detailed description of the problem; for more information, attached are the original code and the log that you may need.

Problem: I want to query my table, which is stored in Hive, through the Spark SQL JDBC interface and fetch more than 1,000,000 rows, but I met an OOM.

sql = select * from TEMP_ADMIN_150601_01 limit XXX ;

My Env: 5 nodes = one master + 4 workers, 1000M network switch, Redhat 6.5. Each node: 8G RAM, 500G hard disk. Java 1.6, Scala 2.10.4, Hadoop
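A minimal sketch of the difference Cheng describes, independent of the Thrift server itself: collect() materialises every partition on the driver at once, while toLocalIterator pulls one partition at a time, so the driver only ever holds one partition's worth of rows.

    import org.apache.spark.{SparkConf, SparkContext}

    object LocalIteratorSketch {
      def main(args: Array[String]): Unit = {
        val sc  = new SparkContext(new SparkConf().setAppName("local-iterator-sketch"))
        val big = sc.parallelize(1 to 10000000, numSlices = 200)

        // val all = big.collect()   // would pull all 200 partitions to the driver at once

        // toLocalIterator runs one job per partition and streams each partition back;
        // this is the mechanism spark.sql.thriftServer.incrementalCollect switches
        // the Thrift server to use.
        var n = 0L
        big.toLocalIterator.foreach(_ => n += 1)
        println(s"iterated over $n rows without collecting them all")
      }
    }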
RE: 回复: Re: 回复: Re: 回复: Re: 回复: Re: Met OOM when fetching more than 1,000,000 rows.
Not sure if Spark Core will provide an API to fetch the records one by one from the block manager, instead of pulling them all into the driver memory.

From: Cheng Lian [mailto:l...@databricks.com]
Sent: Friday, June 12, 2015 3:51 PM
To: 姜超才; Hester wang; user@spark.apache.org
Subject: Re: 回复: Re: 回复: Re: 回复: Re: 回复: Re: Met OOM when fetching more than 1,000,000 rows.

Thanks for the extra details and explanations, Chaocai, will try to reproduce this when I get a chance.

Cheng
Re: Spark 1.4 release date
It was released yesterday. On Friday, June 12, 2015, ayan guha guha.a...@gmail.com wrote: Hi When is official spark 1.4 release date? Best Ayan
Re: Spark 1.4: Python API for getting Kafka offsets in direct mode?
The Scala KafkaRDD uses a trait to handle this, but it is not so easy and straightforward in Python; we would need a specific API to handle it. I'm not sure whether there is any simple workaround, maybe we should think carefully about it.

2015-06-12 13:59 GMT+08:00 Amit Ramesh a...@yelp.com:

Thanks, Jerry. That's what I suspected based on the code I looked at. Any pointers on what is needed to build in this support would be great. This is critical to the project we are currently working on. Thanks!

On Thu, Jun 11, 2015 at 10:54 PM, Saisai Shao sai.sai.s...@gmail.com wrote:

OK, I get it. I think the Python-based Kafka direct API currently does not provide an equivalent of the Scala one; maybe we should figure out how to add this to the Python API as well.

2015-06-12 13:48 GMT+08:00 Amit Ramesh a...@yelp.com:

Hi Jerry,

Take a look at this example: https://spark.apache.org/docs/latest/streaming-kafka-integration.html#tab_scala_2

The offsets are needed because as RDDs get generated within Spark, the offsets move further along. With direct Kafka mode the current offsets are no longer persisted in Zookeeper but rather within Spark itself. If you want to be able to use Zookeeper-based monitoring tools to keep track of progress, then this is needed.

In my specific case we need to persist Kafka offsets externally so that we can continue from where we left off after a code deployment. In other words, we need exactly-once processing guarantees across code deployments. Spark does not support any state persistence across deployments, so this is something we need to handle on our own. Hope that helps. Let me know if not. Thanks!

Amit

On Thu, Jun 11, 2015 at 10:02 PM, Saisai Shao sai.sai.s...@gmail.com wrote:

Hi, what do you mean by getting the offsets from the RDD? From my understanding, the offsetRange is a parameter you provide to KafkaRDD, so why do you still want to get back the one you previously set?

Thanks
Jerry

2015-06-12 12:36 GMT+08:00 Amit Ramesh a...@yelp.com:

Congratulations on the release of 1.4!

I have been trying out the direct Kafka support in Python but haven't been able to figure out how to get the offsets from the RDD. It looks like the documentation is yet to be updated to include Python examples (https://spark.apache.org/docs/latest/streaming-kafka-integration.html). I am specifically looking for the equivalent of https://spark.apache.org/docs/latest/streaming-kafka-integration.html#tab_scala_2. I tried digging through the Python code but could not find anything related. Any pointers would be greatly appreciated. Thanks!

Amit
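For comparison, a minimal sketch in Scala of the pattern the thread treats as the Scala-side equivalent (the HasOffsetRanges cast on the direct stream's RDDs, as in the linked documentation example); the broker address, topic and batch interval are made up.

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

    object DirectStreamOffsetsSketch {
      def main(args: Array[String]): Unit = {
        val ssc = new StreamingContext(new SparkConf().setAppName("direct-offsets-sketch"), Seconds(10))
        val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")   // hypothetical broker
        val topics      = Set("events")                                   // hypothetical topic

        val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
          ssc, kafkaParams, topics)

        stream.foreachRDD { rdd =>
          // The cast only works on the RDDs produced directly by the direct stream.
          val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
          ranges.foreach { o =>
            // Persist (topic, partition, untilOffset) to ZooKeeper or a DB here.
            println(s"${o.topic} ${o.partition}: ${o.fromOffset} -> ${o.untilOffset}")
          }
        }

        ssc.start()
        ssc.awaitTermination()
      }
    }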
If not stop StreamingContext gracefully, will checkpoint data be consistent?
This is a quick question about checkpointing: if the StreamingContext is not stopped gracefully, will the checkpoint still be consistent? Or should I always shut down the application gracefully in order to be able to use the checkpoint?

Thank you very much!
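A minimal sketch of the two pieces usually combined here, with a hypothetical checkpoint directory and source. StreamingContext.getOrCreate restores from the checkpoint on restart; checkpointing is designed to allow recovery even after an ungraceful stop (e.g. a crashed driver), at the cost of possibly repeating work from the last batch, while a graceful stop lets already-received batches finish before exiting.

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object GracefulStopSketch {
      def main(args: Array[String]): Unit = {
        val checkpointDir = "hdfs:///checkpoints/my-app"   // hypothetical path

        // Only used when no checkpoint exists yet; on restart the graph is
        // rebuilt from the checkpoint data instead.
        def create(): StreamingContext = {
          val ssc = new StreamingContext(new SparkConf().setAppName("graceful-stop-sketch"), Seconds(10))
          ssc.checkpoint(checkpointDir)
          val lines = ssc.socketTextStream("some-host", 9999)   // hypothetical source
          lines.count().print()
          ssc
        }

        val ssc = StreamingContext.getOrCreate(checkpointDir, create _)
        ssc.start()

        // Graceful stop: wait for received batches to be processed before exiting.
        ssc.awaitTerminationOrTimeout(60 * 1000)
        ssc.stop(stopSparkContext = true, stopGracefully = true)
      }
    }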
Spark 1.4 release date
Hi, when is the official Spark 1.4 release date? Best, Ayan
Re: Spark Streaming reads from stdin or output from command line utility
Yes, it is lots of data, and the utility I'm working with prints out an infinite real-time data stream. Thanks.

From: Tathagata Das t...@databricks.com
Date: Thursday, June 11, 2015 at 11:43 PM
To: Heath Guo heath...@fb.com
Cc: user user@spark.apache.org
Subject: Re: Spark Streaming reads from stdin or output from command line utility

Is it a lot of data that is expected to come through stdin? I mean, is it even worth parallelizing the computation using something like Spark Streaming?
Re: Spark Streaming reads from stdin or output from command line utility
Would using socketTextStream and `yourApp | nc -lk port` work? Not sure how resilient the socket receiver is, though. I've been playing with it for a little demo and I don't yet understand its reconnection behavior. I would also think that putting some elastic buffer in between would be a good idea to decouple producer from consumer; Kafka would be my first choice.

-kr, Gerard.

On Fri, Jun 12, 2015 at 8:46 AM, Heath Guo heath...@fb.com wrote:

Yes, it is lots of data, and the utility I'm working with prints out an infinite real-time data stream. Thanks.
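A minimal sketch of that setup, with a made-up host and port: the utility's stdout is piped into `nc -lk 9999` on the producing machine, and the streaming job attaches a socket receiver to that port.

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object StdinOverSocketSketch {
      def main(args: Array[String]): Unit = {
        val ssc = new StreamingContext(new SparkConf().setAppName("stdin-over-socket"), Seconds(5))

        // On the producing machine:  your_utility | nc -lk 9999
        // Each text line arriving on the socket becomes one record in the DStream.
        val lines = ssc.socketTextStream("producer-host", 9999, StorageLevel.MEMORY_AND_DISK_SER)

        lines.count().print()

        ssc.start()
        ssc.awaitTermination()
      }
    }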
Re: BigDecimal problem in parquet file
On 6/10/15 8:53 PM, Bipin Nag wrote:

Hi Cheng,

I am using the Spark 1.3.1 binary available for Hadoop 2.6. I am loading an existing parquet file, then repartitioning and saving it. Doing this gives the error. The code for this doesn't look like the problem; I have a feeling the source - the existing parquet file - is the culprit.

I created that parquet file using a JdbcRDD (pulled from Microsoft SQL Server). First I saved the JdbcRDD as an object file on disk, then loaded it again, made a DataFrame from it using a schema, and saved it as parquet. Following is the code:

For saving the JdbcRDD (name - fully qualified table name, pk - string for the primary key, lastpk - last id to pull):

    val myRDD = new JdbcRDD(sc,
      () => DriverManager.getConnection(url, username, password),
      "SELECT * FROM " + name + " WITH (NOLOCK) WHERE ? <= " + pk + " and " + pk + " <= ?",
      1, lastpk, 1, JdbcRDD.resultSetToObjectArray)
    myRDD.saveAsObjectFile("rawdata/" + name)

For applying the schema and saving the parquet:

    val myschema = schemamap(name)
    val myrdd = sc.objectFile[Array[Object]]("/home/bipin/rawdata/" + name)
      .map(x => org.apache.spark.sql.Row(x: _*))

Have you tried to print out x here to check its contents? My guess is that x actually contains unit values. For example, the following Spark shell code can reproduce a similar exception:

    import org.apache.spark.sql.types._
    import org.apache.spark.sql.Row

    val schema = StructType(StructField("dec", DecimalType(10, 0)) :: Nil)
    val rdd = sc.parallelize(1 to 10).map(_ => Array(())).map(arr => Row(arr: _*))
    val df = sqlContext.createDataFrame(rdd, schema)
    df.saveAsParquetFile("file:///tmp/foo")

    val actualdata = sqlContext.createDataFrame(myrdd, myschema)
    actualdata.saveAsParquetFile("/home/bipin/stageddata/" + name)

The schema StructType can be made manually, though I pull the table's metadata and build one. It is a simple string translation (see the SQL docs https://msdn.microsoft.com/en-us/library/ms378878%28v=sql.110%29.aspx and/or Spark data types https://spark.apache.org/docs/1.3.1/sql-programming-guide.html#data-types).

That is how I created the parquet file. Any help to solve the issue is appreciated.

Thanks
Bipin

On 9 June 2015 at 20:44, Cheng Lian lian.cs@gmail.com wrote:

Would you please provide a snippet that reproduces this issue? What version of Spark were you using?
Cheng

On 6/9/15 8:18 PM, bipin wrote:

Hi, When I try to save my data frame as a parquet file I get the following error:

java.lang.ClassCastException: scala.runtime.BoxedUnit cannot be cast to org.apache.spark.sql.types.Decimal
at org.apache.spark.sql.parquet.RowWriteSupport.writePrimitive(ParquetTableSupport.scala:220)
at org.apache.spark.sql.parquet.RowWriteSupport.writeValue(ParquetTableSupport.scala:192)
at org.apache.spark.sql.parquet.RowWriteSupport.write(ParquetTableSupport.scala:171)
at org.apache.spark.sql.parquet.RowWriteSupport.write(ParquetTableSupport.scala:134)
at parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:120)
at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:81)
at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:37)
at org.apache.spark.sql.parquet.ParquetRelation2.org$apache$spark$sql$parquet$ParquetRelation2$writeShard$1(newParquet.scala:671)
at org.apache.spark.sql.parquet.ParquetRelation2$anonfun$insert$2.apply(newParquet.scala:689)
at org.apache.spark.sql.parquet.ParquetRelation2$anonfun$insert$2.apply(newParquet.scala:689)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

How to fix this problem?

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/BigDecimal-problem-in-parquet-file-tp23221.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
Optimizing Streaming from Websphere MQ
Hi,

I have created a custom receiver in Java which receives data from Websphere MQ, and I am only writing the received records to HDFS. I have consulted many forums for optimizing the speed of a Spark Streaming application. Here are a few:

* Spark official docs: http://spark.apache.org/docs/latest/streaming-programming-guide.html#performance-tuning
* Virdata: http://www.virdata.com/tuning-spark/
* TD's slides (a bit old but useful): http://www.slideshare.net/spark-project/deep-divewithsparkstreaming-tathagatadassparkmeetup20130617

I took away mainly these points for my case:

* setting the batch interval to 1 sec
* controlling spark.streaming.blockInterval = 200ms
* inputStream.repartition(3)

But that did not improve the actual speed (records/sec) of my receiver, which is at most 5-10 records/sec. This is way below my expectation. Am I missing something?

Regards,
Umesh Chaudhary
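A sketch of where those knobs go, with a stub receiver standing in for the real Websphere MQ receiver and a made-up output path. With a 1 s batch and a 200 ms block interval each receiver yields roughly 5 blocks (tasks) per batch, and the repartition spreads them further before the write; none of this by itself explains a 5-10 records/sec ceiling, which may point at the receiver's own read loop or the MQ client rather than at Spark.

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.receiver.Receiver

    // Stand-in for the custom Websphere MQ receiver described in the mail.
    class StubMQReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER) {
      override def onStart(): Unit = {
        new Thread("stub-mq-receiver") {
          override def run(): Unit = {
            while (!isStopped()) { store("record"); Thread.sleep(10) }
          }
        }.start()
      }
      override def onStop(): Unit = {}
    }

    object ReceiverTuningSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("mq-receiver-tuning-sketch")
          .set("spark.streaming.blockInterval", "200")   // milliseconds

        val ssc = new StreamingContext(conf, Seconds(1)) // 1 s batch interval

        val mqStream = ssc.receiverStream(new StubMQReceiver)

        // Spread the received blocks over more partitions before the heavy work.
        mqStream.repartition(3).saveAsTextFiles("hdfs:///data/mq/records")   // hypothetical path

        ssc.start()
        ssc.awaitTermination()
      }
    }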
How to use Window Operations with kafka Direct-API?
Hi, I'm using Spark Streaming (1.3.1). I want to get exactly-once messaging from Kafka and use window operations on the DStream. When using window operations (e.g. DStream#reduceByKeyAndWindow) with the Kafka direct API, a java.lang.ClassCastException occurs as follows.

--- stacktrace ---
java.lang.ClassCastException: org.apache.spark.rdd.MapPartitionsRDD cannot be cast to org.apache.spark.streaming.kafka.HasOffsetRanges
at org.apache.spark.examples.streaming.JavaDirectKafkaWordCountWithReduceByKeyAndWindow$3.call(JavaDirectKafkaWordCountWithReduceByKeyAndWindow.java:146)
at org.apache.spark.examples.streaming.JavaDirectKafkaWordCountWithReduceByKeyAndWindow$3.call(JavaDirectKafkaWordCountWithReduceByKeyAndWindow.java:1)
at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:311)
at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:311)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:176)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:175)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
at java.lang.Thread.run(Thread.java:722)

--- my source ---
JavaStreamingContext jssc = new JavaStreamingContext(_ctx, batchInterval);
jssc.checkpoint(checkpoint);

JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
    jssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
    kafkaParams, topicsSet);

JavaPairDStream<String, List<String>> pairDS = messages.mapToPair(...);

JavaPairDStream<String, List<String>> windowDs = pairDS.reduceByKeyAndWindow(
    new Function2<List<String>, List<String>, List<String>>() {
      @Override
      public List<String> call(List<String> list1, List<String> list2) throws Exception {
        ...
      }
    }, windowDuration, slideDuration);

windowDs.foreachRDD(new Function<JavaPairRDD<String, List<String>>, Void>() {
  @Override
  public Void call(JavaPairRDD<String, List<String>> rdd) throws Exception {
    OffsetRange[] offsetsList = ((HasOffsetRanges) rdd.rdd()).offsetRanges(); // ClassCastException occurred
    KafkaCluster kc = new KafkaCluster(toScalaMap(kafkaParams));
    for (OffsetRange offsets : offsetsList) {
      TopicAndPartition topicAndPartition = new TopicAndPartition(offsets.topic(), offsets.partition());
      HashMap<TopicAndPartition, Object> map = new HashMap<TopicAndPartition, Object>();
      map.put(topicAndPartition, offsets.untilOffset());
      kc.setConsumerOffsets("group1", toScalaMap(map));
    }
    return null;
  }
});

Thanks!
--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-Window-Operations-with-kafka-Direct-API-tp23293.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Upgrade to parquet 1.6.0
At the time 1.3.x was released, 1.6.0 hadn't been released yet, and we didn't have enough time to upgrade to and test Parquet 1.6.0 for Spark 1.4.0. But we've already upgraded Parquet to 1.7.0 (which is exactly the same as 1.6.0, with the package name renamed from com.twitter to org.apache.parquet) on the master branch recently.

Cheng

On 6/12/15 6:16 PM, Eric Eijkelenboom wrote:

Hi, what is the reason that Spark still comes with Parquet 1.6.0rc3? It seems like newer Parquet versions are available (e.g. 1.6.0). This would fix problems with ‘spark.sql.parquet.filterPushdown’, which currently is disabled by default because of a bug in Parquet 1.6.0rc3.

Thanks! Eric
Re: BigDecimal problem in parquet file
Hi Cheng,

Yes, some rows contain unit instead of decimal values. I believe some rows from the original source don't have any value, i.e. they are null, and that shows up as unit. How do Spark SQL and Parquet handle null in place of decimal values, assuming that field is nullable? I will have to change it properly. Thanks for helping out.

Bipin

On 12 June 2015 at 14:57, Cheng Lian lian.cs@gmail.com wrote:

Have you tried to print out x here to check its contents? My guess is that x actually contains unit values.
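A rough sketch of one way to feed such rows in, under the assumption (not verified against 1.3.x) that a nullable DecimalType column accepts null where the source had no value, while the () placeholders coming out of the object file are what trigger the BoxedUnit cast error. All names, paths and values are made up.

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.{Row, SQLContext}
    import org.apache.spark.sql.types._

    object NullDecimalSketch {
      def main(args: Array[String]): Unit = {
        val sc         = new SparkContext(new SparkConf().setAppName("null-decimal-sketch"))
        val sqlContext = new SQLContext(sc)

        val schema = StructType(StructField("dec", DecimalType(10, 0), nullable = true) :: Nil)

        // Rows as they might come out of JdbcRDD.resultSetToObjectArray: some carry
        // a value, some carry () where the column was NULL.
        val raw = sc.parallelize(Seq(Array[Any](BigDecimal(42)), Array[Any](())))

        // Replace the unit placeholders with real nulls before building Rows.
        val rows = raw.map(_.map { case _: Unit => null; case v => v }).map(arr => Row(arr: _*))

        val df = sqlContext.createDataFrame(rows, schema)
        df.saveAsParquetFile("/tmp/null-decimal-sketch.parquet")
      }
    }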
Re: Spark 1.4 release date
Thanks a lot. On 12 Jun 2015 19:46, Todd Nist tsind...@gmail.com wrote: It was released yesterday. On Friday, June 12, 2015, ayan guha guha.a...@gmail.com wrote: Hi When is official spark 1.4 release date? Best Ayan
Re: Spark 1.4 release date
Here is a Spark 1.4 release blog post by Databricks: https://databricks.com/blog/2015/06/11/announcing-apache-spark-1-4.html

Guru Medasani gdm...@gmail.com

On Jun 12, 2015, at 7:08 AM, ayan guha guha.a...@gmail.com wrote:

Thanks a lot.

On 12 Jun 2015 19:46, Todd Nist tsind...@gmail.com wrote:

It was released yesterday.

On Friday, June 12, 2015, ayan guha guha.a...@gmail.com wrote:

Hi When is official spark 1.4 release date? Best Ayan
Upgrade to parquet 1.6.0
Hi What is the reason that Spark still comes with Parquet 1.6.0rc3? It seems like newer Parquet versions are available (e.g. 1.6.0). This would fix problems with ‘spark.sql.parquet.filterPushdown’, which currently is disabled by default, because of a bug in Parquet 1.6.0rc3. Thanks! Eric
Re: Upgrade to parquet 1.6.0
Great, thanks for the extra info! On 12 Jun 2015, at 12:41, Cheng Lian lian.cs@gmail.com wrote: At the time 1.3.x was released, 1.6.0 hasn't been released yet. And we didn't have enough time to upgrade and test Parquet 1.6.0 for Spark 1.4.0. But we've already upgraded Parquet to 1.7.0 (which is exactly the same as 1.6.0 with package name renamed from com.twitter to org.apache.parquet) on master branch recently. Cheng On 6/12/15 6:16 PM, Eric Eijkelenboom wrote: Hi What is the reason that Spark still comes with Parquet 1.6.0rc3? It seems like newer Parquet versions are available (e.g. 1.6.0). This would fix problems with ‘spark.sql.parquet.filterPushdown’, which currently is disabled by default, because of a bug in Parquet 1.6.0rc3. Thanks! Eric
Re: How to use Window Operations with kafka Direct-API?
Hi Shao, Thank you for your quick prompt. I was disappointed. I will try window operations with Receiver-based Approach(KafkaUtils.createStream). Thank you again, ZIGEN 2015/06/12 17:18、Saisai Shao sai.sai.s...@gmail.com のメッセージ: I think you could not use offsetRange in such way, when you transform a DirectKafkaInputDStream into WindowedDStream, internally the KafkaRDD is changed into normal RDD, but offsetRange is a specific attribute for KafkaRDD, so when you convert a normal RDD to HasOffsetRanges, you will meet such exception. you could only do something like: directKafkaInputDStream.foreachRDD { rdd = rdd.asInstanceOf[HasOffsetRanges] ... } Apply foreachRDD directly on DirectKafkaInputDStream. 2015-06-12 16:10 GMT+08:00 ZIGEN dbviewer.zi...@gmail.com: Hi, I'm using Spark Streaming(1.3.1). I want to get exactly-once messaging from Kafka and use Window operations of DStraem, When Window operations(eg DStream#reduceByKeyAndWindow) with kafka Direct-API java.lang.ClassCastException occurs as follows. --- stacktrace -- java.lang.ClassCastException: org.apache.spark.rdd.MapPartitionsRDD cannot be cast to org.apache.spark.streaming.kafka.HasOffsetRanges at org.apache.spark.examples.streaming.JavaDirectKafkaWordCountWithReduceByKeyAndWindow$3.call(JavaDirectKafkaWordCountWithReduceByKeyAndWindow.java:146) at org.apache.spark.examples.streaming.JavaDirectKafkaWordCountWithReduceByKeyAndWindow$3.call(JavaDirectKafkaWordCountWithReduceByKeyAndWindow.java:1) at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:311) at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:311) at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534) at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40) at scala.util.Try$.apply(Try.scala:161) at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:176) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:175) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603) at java.lang.Thread.run(Thread.java:722) --- my source --- JavaStreamingContext jssc = new JavaStreamingContext(_ctx, batchInterval); jssc.checkpoint(checkpoint); JavaPairInputDStreamString, String messages = KafkaUtils.createDirectStream (jssc, String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topicsSet); JavaPairDStreamString, Listlt;String pairDS = messages.mapToPair(...); JavaPairDStreamString, Listlt;String windowDs = pairDS.reduceByKeyAndWindow(new Function2Listlt;String, ListString, ListString() { @Override public ListString call(ListString list1, ListString list2) throws 
Exception { ... } }, windowDuration, slideDuration); windowDs.foreachRDD(new FunctionJavaPairRDDlt;String,Listlt;String, Void() { @Override public Void call(JavaPairRDDString, Listlt;String rdd) throws Exception { OffsetRange[] offsetsList = ((HasOffsetRanges) rdd.rdd()).offsetRanges(); // ClassCastException occurred KafkaCluster kc = new KafkaCluster(toScalaMap(kafkaParams)); for (OffsetRange offsets : offsetsList) { TopicAndPartition topicAndPartition = new TopicAndPartition(offsets.topic(), offsets.partition()); HashMapTopicAndPartition, Object map = new HashMapTopicAndPartition, Object(); map.put(topicAndPartition, offsets.untilOffset()); kc.setConsumerOffsets(group1, toScalaMap(map)); } return null; } }); Thanks! -- View this message in context:
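As a concrete illustration of Shao's point: the offsets have to be captured while each batch RDD is still a KafkaRDD, i.e. in a transform/foreachRDD applied directly to the DirectKafkaInputDStream, before any window operation. Below is a minimal Scala sketch of that pattern; ssc, kafkaParams, topicsSet, windowDuration and slideDuration are assumed to be the same values as in the original (Java) code, and the offset store itself is left abstract.

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

val directStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topicsSet)

// Grab the offsets here, while each batch RDD is still a KafkaRDD.
var offsetRanges = Array.empty[OffsetRange]
val tagged = directStream.transform { rdd =>
  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd
}

// Window operations produce plain RDDs, so never cast their RDDs to HasOffsetRanges.
val windowed = tagged
  .map { case (_, value) => (value, 1) }
  .reduceByKeyAndWindow(_ + _, windowDuration, slideDuration)

windowed.foreachRDD { rdd =>
  // Process the windowed result, then commit offsetRanges to your own offset store.
  // Note that a window spans several batches, so the last captured range no longer
  // maps one-to-one onto the windowed output -- which is why exactly-once is harder here.
}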
Scheduling and node affinity
I would like to know if Spark has any facility by which particular tasks can be scheduled to run on chosen nodes. The use case: we have a large custom-format database. It is partitioned and the segments are stored on local SSD on multiple nodes. Incoming queries are matched against the database; this involves either sending each key to the correct node, or sending a batch to all nodes simultaneously, where the queries are filtered and processed against one segment each, and the results are merged at the end. Currently we are doing this with htcondor using a DAG to define the workflow and requirements expressions to match particular jobs to particular databases, but it's coarse-grained and more suited for batch processing than real-time, as well as being cumbersome to define and manage. I wonder whether Spark would suit this workflow, and if so how? It seems that either we would need to schedule parts of our jobs on the appropriate nodes, which I can't see how to do: http://spark.apache.org/docs/latest/job-scheduling.html Or possibly we could define our partitioned database as a custom type of RDD - however then we would need to define operations which work on two RDDs simultaneously (i.e. the static database and the incoming set of queries) which doesn't seem to fit Spark well AFAICS. Any other ideas how we could approach this, either with Spark or suggestions for other frameworks to look at? (We would actually prefer non-Java frameworks but are happy to look at all options) Thanks, Brian Candler. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
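For the "schedule parts of our jobs on the appropriate nodes" part, the closest built-in hook is an RDD's preferred locations: a custom RDD can tell the scheduler which host holds each database segment, and Spark will try to run the matching task there (it is a locality preference, not a hard constraint). A rough Scala sketch, where SegmentPartition and the segment-to-host mapping are entirely hypothetical:

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// One partition per local database segment (hypothetical layout).
case class SegmentPartition(index: Int, host: String, segmentPath: String) extends Partition

class SegmentRDD(sc: SparkContext, segments: Seq[(String, String)]) // (host, segmentPath)
  extends RDD[String](sc, Nil) {

  override protected def getPartitions: Array[Partition] =
    segments.zipWithIndex.map { case ((host, path), i) => SegmentPartition(i, host, path) }.toArray

  // Hint: run the task for this partition on the node that holds the segment.
  override protected def getPreferredLocations(split: Partition): Seq[String] =
    Seq(split.asInstanceOf[SegmentPartition].host)

  override def compute(split: Partition, context: TaskContext): Iterator[String] = {
    val p = split.asInstanceOf[SegmentPartition]
    // Open the local segment at p.segmentPath and answer the queries against it.
    Iterator.empty
  }
}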
Re: Spark Streaming, updateStateByKey and mapPartitions() - and lazy DatabaseConnection
Cody, On 12 Jun 2015, at 17:26, Cody Koeninger c...@koeninger.org wrote: There are several database apis that use a thread local or singleton reference to a connection pool (we use ScalikeJDBC currently, but there are others). You can use mapPartitions earlier in the chain to make sure the connection pool is set up on that executor, then use it inside updateStateByKey Thanks. You are saying I should just make an arbitrary use of the ‘connection’ to invoke the ‘lazy’. E.g. like this: object SomeDB { lazy val conn = new SomeDB( “some serializable config) } Then somewhere else: theTrackingEvents.map(toPairs).mapPartitions(iter = iter.map( pair = { SomeDb.conn.init pair } )).updateStateByKey[Session](myUpdateFunction _) An in myUpdateFunction def myUpdateFunction( …) { SomeDb.conn.store( … ) } Correct? Jan On Fri, Jun 12, 2015 at 10:07 AM, algermissen1971 algermissen1...@icloud.com wrote: Hi, I have a scenario with spark streaming, where I need to write to a database from within updateStateByKey[1]. That means that inside my update function I need a connection. I have so far understood that I should create a new (lazy) connection for every partition. But since I am not working in foreachRDD I wonder where I can iterate over the partitions. Should I use mapPartitions() somewhere up the chain? Jan [1] The use case being saving ‘done' sessions during web tracking. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
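For reference, a cleaned-up Scala sketch of the pattern being discussed: a lazily initialised per-JVM singleton, touched once per partition via mapPartitions so the connection exists on each executor before updateStateByKey needs it. The helper class, config string and domain types below are placeholders standing in for Jan's real code.

// Placeholder domain types from the thread.
case class TrackingEvent(userId: String, payload: String)
case class Session(userId: String, events: Seq[TrackingEvent])

// Hypothetical client class; one connection holder per executor JVM.
class SomeDbClient(config: String) {
  def store(session: Session): Unit = { /* write the finished session */ }
}

object SomeDb {
  // Lazily created the first time any task on this executor touches it.
  lazy val conn = new SomeDbClient("some serializable config")
}

def myUpdateFunction(events: Seq[TrackingEvent], state: Option[Session]): Option[Session] = {
  // ... merge the new events into the session; when a session is finished:
  // SomeDb.conn.store(finishedSession)
  state
}

val sessions = theTrackingEvents            // DStream[TrackingEvent], from the thread
  .map(toPairs)                             // DStream[(String, TrackingEvent)]
  .mapPartitions { iter =>
    SomeDb.conn                             // touch the lazy val once per partition
    iter                                    // pass the iterator through unchanged
  }
  .updateStateByKey[Session](myUpdateFunction _)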
Extracting k-means cluster values along with centers?
Greetings. I have been following some of the tutorials online for Spark k-means clustering. I would like to be able to just dump all the cluster values and their centroids to text file so I can explore the data. I have the clusters as such: val clusters = KMeans.train(parsedData, numClusters, numIterations) clusters res2: org.apache.spark.mllib.clustering.KMeansModel = org.apache.spark.mllib.clustering.KMeansModel@59de440b Is there a way to build something akin to a key value RDD that has the center as the key and the array of values associated with that center as the value? I don't see anything in the tutorials, API docs, or the Learning book for how to do this. Thank you
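One way to get what the post asks for is to use the trained model's predict() to tag each point with its cluster index and then key by the corresponding center. A hedged Scala sketch, reusing parsedData and clusters from above (the output path is arbitrary):

import org.apache.spark.mllib.linalg.Vector

// Tag every point with the center of the cluster it is assigned to.
val centers: Array[Vector] = clusters.clusterCenters
val assignments = parsedData.map(point => (centers(clusters.predict(point)), point))

// Either group the points under each center ...
val grouped = assignments.groupByKey()

// ... or just dump "center <tab> point" lines to text for exploration.
assignments
  .map { case (center, point) => s"$center\t$point" }
  .saveAsTextFile("kmeans-assignments")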
--jars not working?
Spark version is 1.3.0 (will upgrade as soon as we upgrade past mesos 0.19.0)... Regardless, I'm running into a really weird situation where when I pass --jars to bin/spark-shell I can't reference those classes on the repl. Is this expected? The logs even tell me that my jars have been added, and yet the classes inside of them are not available. Am I missing something obvious?
Re: Spark Streaming, updateStateByKey and mapPartitions() - and lazy DatabaseConnection
On 12 Jun 2015, at 23:19, Cody Koeninger c...@koeninger.org wrote: A. No, it's called once per partition. Usually you have more partitions than executors, so it will end up getting called multiple times per executor. But you can use a lazy val, singleton, etc to make sure the setup only takes place once per JVM. B. I cant speak to the specifics there ... but as long as you're making sure the setup gets called at most once per executor, before the work that needs it ... should be ok. Great thanks so much - (I guess I am not yet clear about the relationship of partition / executor / stage, but I get the idea.) Jan On Fri, Jun 12, 2015 at 4:11 PM, algermissen1971 algermissen1...@icloud.com wrote: On 12 Jun 2015, at 22:59, Cody Koeninger c...@koeninger.org wrote: Close. the mapPartitions call doesn't need to do anything at all to the iter. mapPartitions { iter = SomeDb.conn.init iter } Yes, thanks! Maybe you can confirm two more things and then you helped me make a giant leap today: a) When using spark streaming, will this happen exactly once per executor? I mean: is mapPartitions called once per executor for the lifetime of the stream? Or should I rather think once per stage? b) I actually need an ActorSystem and FlowMaterializer (for making an Akka-HTTP request to store the data), not a DB connection - I presume this does not changethe concept? Jan On Fri, Jun 12, 2015 at 3:55 PM, algermissen1971 algermissen1...@icloud.com wrote: Cody, On 12 Jun 2015, at 17:26, Cody Koeninger c...@koeninger.org wrote: There are several database apis that use a thread local or singleton reference to a connection pool (we use ScalikeJDBC currently, but there are others). You can use mapPartitions earlier in the chain to make sure the connection pool is set up on that executor, then use it inside updateStateByKey Thanks. You are saying I should just make an arbitrary use of the ‘connection’ to invoke the ‘lazy’. E.g. like this: object SomeDB { lazy val conn = new SomeDB( “some serializable config) } Then somewhere else: theTrackingEvents.map(toPairs).mapPartitions(iter = iter.map( pair = { SomeDb.conn.init pair } )).updateStateByKey[Session](myUpdateFunction _) An in myUpdateFunction def myUpdateFunction( …) { SomeDb.conn.store( … ) } Correct? Jan On Fri, Jun 12, 2015 at 10:07 AM, algermissen1971 algermissen1...@icloud.com wrote: Hi, I have a scenario with spark streaming, where I need to write to a database from within updateStateByKey[1]. That means that inside my update function I need a connection. I have so far understood that I should create a new (lazy) connection for every partition. But since I am not working in foreachRDD I wonder where I can iterate over the partitions. Should I use mapPartitions() somewhere up the chain? Jan [1] The use case being saving ‘done' sessions during web tracking. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark 1.4: Python API for getting Kafka offsets in direct mode?
Hi, If you want I would be happy to work in this. I have worked with KafkaUtils.createDirectStream before, in a pull request that wasn't accepted https://github.com/apache/spark/pull/5367. I'm fluent with Python and I'm starting to feel comfortable with Scala, so if someone opens a JIRA I can take it. Greetings, Juan Rodriguez 2015-06-12 15:59 GMT+02:00 Cody Koeninger c...@koeninger.org: The scala api has 2 ways of calling createDirectStream. One of them allows you to pass a message handler that gets full access to the kafka MessageAndMetadata, including offset. I don't know why the python api was developed with only one way to call createDirectStream, but the first thing I'd look at would be adding that functionality back in. If someone wants help creating a patch for that, just let me know. Dealing with offsets on a per-message basis may not be as efficient as dealing with them on a batch basis using the HasOffsetRanges interface... but if efficiency was a primary concern, you probably wouldn't be using Python anyway. On Fri, Jun 12, 2015 at 1:05 AM, Saisai Shao sai.sai.s...@gmail.com wrote: Scala KafkaRDD uses a trait to handle this problem, but it is not so easy and straightforward in Python, where we need to have a specific API to handle this, I'm not sure is there any simple workaround to fix this, maybe we should think carefully about it. 2015-06-12 13:59 GMT+08:00 Amit Ramesh a...@yelp.com: Thanks, Jerry. That's what I suspected based on the code I looked at. Any pointers on what is needed to build in this support would be great. This is critical to the project we are currently working on. Thanks! On Thu, Jun 11, 2015 at 10:54 PM, Saisai Shao sai.sai.s...@gmail.com wrote: OK, I get it, I think currently Python based Kafka direct API do not provide such equivalence like Scala, maybe we should figure out to add this into Python API also. 2015-06-12 13:48 GMT+08:00 Amit Ramesh a...@yelp.com: Hi Jerry, Take a look at this example: https://spark.apache.org/docs/latest/streaming-kafka-integration.html#tab_scala_2 The offsets are needed because as RDDs get generated within spark the offsets move further along. With direct Kafka mode the current offsets are no more persisted in Zookeeper but rather within Spark itself. If you want to be able to use zookeeper based monitoring tools to keep track of progress, then this is needed. In my specific case we need to persist Kafka offsets externally so that we can continue from where we left off after a code deployment. In other words, we need exactly-once processing guarantees across code deployments. Spark does not support any state persistence across deployments so this is something we need to handle on our own. Hope that helps. Let me know if not. Thanks! Amit On Thu, Jun 11, 2015 at 10:02 PM, Saisai Shao sai.sai.s...@gmail.com wrote: Hi, What is your meaning of getting the offsets from the RDD, from my understanding, the offsetRange is a parameter you offered to KafkaRDD, why do you still want to get the one previous you set into? Thanks Jerry 2015-06-12 12:36 GMT+08:00 Amit Ramesh a...@yelp.com: Congratulations on the release of 1.4! I have been trying out the direct Kafka support in python but haven't been able to figure out how to get the offsets from the RDD. Looks like the documentation is yet to be updated to include Python examples ( https://spark.apache.org/docs/latest/streaming-kafka-integration.html). I am specifically looking for the equivalent of https://spark.apache.org/docs/latest/streaming-kafka-integration.html#tab_scala_2. 
I tried digging through the python code but could not find anything related. Any pointers would be greatly appreciated. Thanks! Amit
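For reference, the two Scala-side options Cody describes look roughly as follows (a sketch only; ssc, kafkaParams, fromOffsets and directStream are placeholders, and neither variant is exposed in the 1.4 Python API yet):

import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

// Option 1: per-message offsets via the messageHandler variant.
// This variant requires explicit starting offsets (fromOffsets: Map[TopicAndPartition, Long]).
val perMessage = KafkaUtils.createDirectStream[
    String, String, StringDecoder, StringDecoder, (String, Int, Long, String)](
  ssc, kafkaParams, fromOffsets,
  (mmd: MessageAndMetadata[String, String]) => (mmd.topic, mmd.partition, mmd.offset, mmd.message()))

// Option 2: per-batch offsets via HasOffsetRanges, on a stream built with the topic-set variant.
directStream.foreachRDD { rdd =>
  val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  ranges.foreach(r => println(s"${r.topic} ${r.partition} ${r.fromOffset} ${r.untilOffset}"))
}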
How to use spark for map-reduce flow to filter N columns, top M rows of all csv files under a folder?
To be concrete, say we have a folder with thousands of tab-delimited csv files with the following attribute format (each csv file is about 10GB):

id  name  address  city  ...
1   Matt  add1     LA    ...
2   Will  add2     LA    ...
3   Lucy  add3     SF    ...
...

And we have a lookup table based on "name" above:

name  gender
Matt  M
Lucy  F
...

Now we are interested in outputting, from the top 1000 rows of each csv file, the following format:

id  name  gender
1   Matt  M
...

Can we use pyspark to efficiently handle this?
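One way to approach this: broadcast the small name-to-gender lookup and process each file separately, so "top 1000 rows" stays per file. A hedged Scala sketch of the idea (the PySpark API is analogous; paths, column positions and output locations below are assumptions):

// The small name -> gender table: collect it on the driver and broadcast it.
val genderByName = sc.textFile("hdfs:///lookup/name_gender.tsv")
  .map(_.split("\t"))
  .map(cols => (cols(0), cols(1)))
  .collectAsMap()
val lookup = sc.broadcast(genderByName)

// Process one file at a time so the "top 1000" limit is per file, not global.
val paths: Seq[String] = Seq("hdfs:///data/file1.tsv", "hdfs:///data/file2.tsv")

paths.zipWithIndex.foreach { case (path, i) =>
  sc.textFile(path)
    .zipWithIndex()
    .filter { case (_, rowIdx) => rowIdx < 1000 }               // keep the first 1000 lines of this file
    .map { case (line, _) => line.split("\t") }
    .map(cols => (cols(0), cols(1)))                            // keep only id and name
    .map { case (id, name) => s"$id\t$name\t${lookup.value.getOrElse(name, "NA")}" }
    .saveAsTextFile(s"hdfs:///output/file-$i")
}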
Re: [Spark-1.4.0] NoSuchMethodError: com.fasterxml.jackson.module.scala.deser.BigDecimalDeserializer
Has anyone met the same problem? 2015-06-12 23:40 GMT+08:00 Tao Li litao.bupt...@gmail.com: Hi all: I compiled the new Spark 1.4.0 version today. But when I run the WordCount demo, it throws a NoSuchMethodError: java.lang.NoSuchMethodError: com.fasterxml.jackson.module.scala.deser.BigDecimalDeserializer. I found the default fasterxml.jackson.version is 2.4.4. Is there anything wrong with the jackson version?
Re: how to use a properties file from a url in spark-submit
Turns out one of the other developers wrapped the jobs in script and did a cd to another folder in the script before executing spark-submit. On 12 June 2015 at 14:20, Matthew Jones mle...@gmail.com wrote: Hmm either spark-submit isn't picking up the relative path or Chronos is not setting your working directory to your sandbox. Try using cd $MESOS_SANDBOX spark-submit --properties-file props.properties On Fri, Jun 12, 2015 at 12:32 PM Gary Ogden gog...@gmail.com wrote: That's a great idea. I did what you suggested and added the url to the props file in the uri of the json. The properties file now shows up in the sandbox. But when it goes to run spark-submit with --properties-file props.properties it fails to find it: Exception in thread main java.lang.IllegalArgumentException: requirement failed: Properties file props.properties does not exist On 11 June 2015 at 22:17, Matthew Jones mle...@gmail.com wrote: If you are using chronos you can just put the url in the task json and chronos will download it into your sandbox. Then just use spark-submit --properties-file app.properties. On Thu, 11 Jun 2015 15:52 Marcelo Vanzin van...@cloudera.com wrote: That's not supported. You could use wget / curl to download the file to a temp location before running spark-submit, though. On Thu, Jun 11, 2015 at 12:48 PM, Gary Ogden gog...@gmail.com wrote: I have a properties file that is hosted at a url. I would like to be able to use the url in the --properties-file parameter when submitting a job to mesos using spark-submit via chronos I would rather do this than use a file on the local server. This doesn't seem to work though when submitting from chronos: bin/spark-submit --properties-file http://server01/props/app.properties Inside the properties file: spark.executor.memory=256M spark.cores.max=1 spark.shuffle.consolidateFiles=true spark.task.cpus=1 spark.deploy.defaultCores=1 spark.driver.cores=1 spark.scheduler.mode=FAIR So how do I specify a properties file in a url? -- Marcelo
Re: Reading file from S3, facing java.lang.NoClassDefFoundError: org/jets3t/service/ServiceException
Looks like your spark is not able to pick up the HADOOP_CONF. To fix this, you can actually add jets3t-0.9.0.jar to the classpath (sc.addJar(/path/to/jets3t-0.9.0.jar). Thanks Best Regards On Thu, Jun 11, 2015 at 6:44 PM, shahab shahab.mok...@gmail.com wrote: Hi, I tried to read a csv file from amazon s3, but I get the following exception which I have no clue how to solve this. I tried both spark 1.3.1 and 1.2.1, but no success. Any idea how to solve this is appreciated. best, /Shahab the code: val hadoopConf=sc.hadoopConfiguration; hadoopConf.set(fs.s3.impl, org.apache.hadoop.fs.s3native.NativeS3FileSystem) hadoopConf.set(fs.s3.awsAccessKeyId, aws_access_key_id) hadoopConf.set(fs.s3.awsSecretAccessKey, aws_secret_access_key) val csv = sc.textFile(s3n://mybucket/info.csv) // original file val data = csv.map(line = line.split(,).map(elem = elem.trim)) //lines in rows Here is the exception I faced: Exception in thread main java.lang.NoClassDefFoundError: org/jets3t/service/ServiceException at org.apache.hadoop.fs.s3native.NativeS3FileSystem.createDefaultStore( NativeS3FileSystem.java:280) at org.apache.hadoop.fs.s3native.NativeS3FileSystem.initialize( NativeS3FileSystem.java:270) at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2397) at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:89) at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2431) at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2413) at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:368) at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296) at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus( FileInputFormat.java:256) at org.apache.hadoop.mapred.FileInputFormat.listStatus( FileInputFormat.java:228) at org.apache.hadoop.mapred.FileInputFormat.getSplits( FileInputFormat.java:304) at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:203) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions( MapPartitionsRDD.scala:32) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions( MapPartitionsRDD.scala:32) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1512) at org.apache.spark.rdd.RDD.count(RDD.scala:1006)
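For completeness, the string quoting got stripped from the snippet above; with the suggested jar added it would look roughly like the sketch below. Note the s3n.* property names (the path uses s3n://), that the access-key variables are placeholders, and that sc.addJar only ships the jar to executors -- if the error occurs on the driver, passing the jar via --jars or --driver-class-path may also be needed.

// Ship the missing jets3t classes to the executors (path is hypothetical).
sc.addJar("/path/to/jets3t-0.9.0.jar")

val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoopConf.set("fs.s3n.awsAccessKeyId", aws_access_key_id)
hadoopConf.set("fs.s3n.awsSecretAccessKey", aws_secret_access_key)

val csv = sc.textFile("s3n://mybucket/info.csv")
val data = csv.map(_.split(",").map(_.trim))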
Re: spark stream and spark sql with data warehouse
This is a good start, if you haven't read it already http://spark.apache.org/docs/latest/streaming-programming-guide.html#dataframe-and-sql-operations Thanks Best Regards On Thu, Jun 11, 2015 at 8:17 PM, 唐思成 jadetan...@qq.com wrote: Hi all: We are trying to using spark to do some real time data processing. I need do some sql-like query and analytical tasks with the real time data against historical normalized data stored in data bases. Is there anyone has done this kind of work or design? Any suggestion or material would be truly welcomed.
[Spark 1.4.0] java.lang.UnsupportedOperationException: Not implemented by the TFS FileSystem implementation
Hello. I used to be able to run debug my Spark apps in Eclipse for Spark 1.3.1 by creating a launch and setting the vm var -Dspark.master=local[4]. I am not playing with the new 1.4 and trying out some of my simple samples, which all fail with the same exception as shown below. Running them with spark-submit works fine. Anybody has any hints for getting it to work in the IDE again? It seems to be related to loading input files, which path I provide via the main args and the load via sc.textFile() in Java8. Are there any new options that I missed to tell the app to use the local file system? Exception in thread main java.lang.UnsupportedOperationException: Not implemented by the TFS FileSystem implementation at org.apache.hadoop.fs.FileSystem.getScheme(FileSystem.java:213) at org.apache.hadoop.fs.FileSystem.loadFileSystems( FileSystem.java:2401) at org.apache.hadoop.fs.FileSystem.getFileSystemClass( FileSystem.java:2411) at org.apache.hadoop.fs.FileSystem.createFileSystem( FileSystem.java:2428) at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:88) at org.apache.hadoop.fs.FileSystem$Cache.getInternal( FileSystem.java:2467) at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2449) at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:367) at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:166) at org.apache.hadoop.mapred.JobConf.getWorkingDirectory( JobConf.java:653) at org.apache.hadoop.mapred.FileInputFormat.setInputPaths( FileInputFormat.java:389) at org.apache.hadoop.mapred.FileInputFormat.setInputPaths( FileInputFormat.java:362) at org.apache.spark.SparkContext$$anonfun$28.apply( SparkContext.scala:762) at org.apache.spark.SparkContext$$anonfun$28.apply( SparkContext.scala:762) at org.apache.spark.rdd.HadoopRDD$$anonfun$getJobConf$6.apply( HadoopRDD.scala:172) at org.apache.spark.rdd.HadoopRDD$$anonfun$getJobConf$6.apply( HadoopRDD.scala:172) at scala.Option.map(Option.scala:145) at org.apache.spark.rdd.HadoopRDD.getJobConf(HadoopRDD.scala:172) at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:196) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219 ) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217 ) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions( MapPartitionsRDD.scala:32) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219 ) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217 ) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions( MapPartitionsRDD.scala:32) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219 ) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217 ) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions( MapPartitionsRDD.scala:32) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219 ) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217 ) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1535) at org.apache.spark.rdd.RDD.reduce(RDD.scala:900) at org.apache.spark.api.java.JavaRDDLike$class.reduce( JavaRDDLike.scala:357) at org.apache.spark.api.java.AbstractJavaRDDLike.reduce( 
JavaRDDLike.scala:46) at com.databricks.apps.logs.LogAnalyzer.main(LogAnalyzer.java:60) Thanks and best regards, Peter Haumer.
Re: Limit Spark Shuffle Disk Usage
You can disable shuffle spill (spark.shuffle.spill http://spark.apache.org/docs/latest/configuration.html#shuffle-behavior) if you are having enough memory to hold that much data. I believe adding more resources would be your only choice. Thanks Best Regards On Thu, Jun 11, 2015 at 9:46 PM, Al M alasdair.mcbr...@gmail.com wrote: I am using Spark on a machine with limited disk space. I am using it to analyze very large (100GB to 1TB per file) data sets stored in HDFS. When I analyze these datasets, I will run groups, joins and cogroups. All of these operations mean lots of shuffle files written to disk. Unfortunately what happens is my disk fills up very quickly (I only have 40GB free). Then my process dies because I don't have enough space on disk. I don't want to write my shuffles to HDFS because it's already pretty full. The shuffle files are cleared up between runs, but this doesnt help when a single run requires 300GB+ shuffle disk space. Is there any way that I can limit the amount of disk space used by my shuffles? I could set up a cron job to delete old shuffle files whilst the job is still running, but I'm concerned that they are left there for a good reason. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Limit-Spark-Shuffle-Disk-Usage-tp23279.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
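If memory really is large enough, the setting mentioned above would be applied roughly like this (a sketch; spark.local.dir is only useful if there is another, larger volume to point shuffle files at):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.shuffle.spill", "false")          // keep shuffle data in memory instead of spilling to disk
  .set("spark.local.dir", "/mnt/bigger-disk")   // hypothetical path on a larger volume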
Re: --jars not working?
You can verify if the jars are shipped properly by looking at the driver UI (running on 4040) Environment tab. Thanks Best Regards On Sat, Jun 13, 2015 at 12:43 AM, Jonathan Coveney jcove...@gmail.com wrote: Spark version is 1.3.0 (will upgrade as soon as we upgrade past mesos 0.19.0)... Regardless, I'm running into a really weird situation where when I pass --jars to bin/spark-shell I can't reference those classes on the repl. Is this expected? The logs even tell me that my jars have been added, and yet the classes inside of them are not available. Am I missing something obvious?
Re: takeSample() results in two stages
It launches two jobs because it doesn't know ahead of time how big your RDD is, so it doesn't know what the sampling rate should be. After counting all the records, it can determine what the sampling rate should be -- then it does another pass through the data, sampling by the rate it has just determined. Note that this suggests: (a) if you know the size of your RDD ahead of time, you could eliminate that first pass, and (b) since you end up computing the input RDD twice, it may make sense to cache it. On Thu, Jun 11, 2015 at 11:43 AM, barmaley o...@solver.com wrote: I've observed interesting behavior in Spark 1.3.1, the reason for which is not clear. Doing something as simple as sc.textFile(...).takeSample(...) always results in two stages: Spark's takeSample() results in two stages http://apache-spark-user-list.1001560.n3.nabble.com/file/n23280/Capture.jpg -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/takeSample-results-in-two-stages-tp23280.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
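In code, the two suggestions look roughly like this (a sketch; the input path is hypothetical and numRecords stands for whatever size you already know):

// (b) cache the input so the counting pass and the sampling pass don't both re-read it
val rdd = sc.textFile("hdfs:///path/to/data").cache()

// (a) if the size is already known, skip takeSample's counting job and sample directly
val desired = 1000L
val fraction = math.min(1.0, desired.toDouble / numRecords)
val sampled = rdd.sample(withReplacement = false, fraction, seed = 42L).collect()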
Re: Optimizing Streaming from Websphere MQ
How many cores are you allocating for your job? And how many receivers are you having? It would be good if you can post your custom receiver code, it will help people to understand it better and shed some light. Thanks Best Regards On Fri, Jun 12, 2015 at 12:58 PM, Chaudhary, Umesh umesh.chaudh...@searshc.com wrote: Hi, I have created a Custom Receiver in Java which receives data from Websphere MQ and I am only writing the received records on HDFS. I have referred many forums for optimizing speed of spark streaming application. Here I am listing a few: · Spark Official http://spark.apache.org/docs/latest/streaming-programming-guide.html#performance-tuning · VIrdata http://www.virdata.com/tuning-spark/ · TD’s Slide (A bit Old but Useful) http://www.slideshare.net/spark-project/deep-divewithsparkstreaming-tathagatadassparkmeetup20130617 I got mainly two point for my applicability : · giving batch interval as 1 sec · Controlling “spark.streaming.blockInterval” =200ms · inputStream.repartition(3) But that did not improve my actual speed (records/sec) of receiver which is MAX 5-10 records /sec. This is way less from my expectation. Am I missing something? Regards, Umesh Chaudhary This message, including any attachments, is the property of Sears Holdings Corporation and/or one of its subsidiaries. It is confidential and may contain proprietary or legally privileged information. If you are not the intended recipient, please delete it without reading the contents. Thank you.
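Without seeing the receiver code it is hard to say more, but the usual first step is to run several receivers in parallel and union them, which is what the repartition/blockInterval advice assumes; remember each receiver permanently occupies one core, so cores must cover receivers plus processing. A sketch, with MQReceiver and mqConfig standing in for the custom Websphere MQ receiver and its configuration:

// Several receivers pull from MQ in parallel; each occupies one core on an executor.
val streams = (1 to 3).map(_ => ssc.receiverStream(new MQReceiver(mqConfig)))

// Union the receivers, then spread the received blocks across the available cores.
val unified = ssc.union(streams)
unified.repartition(6).saveAsTextFiles("hdfs:///mq/records")   // path and counts are illustrative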
Re: Re: How to keep a SQLContext instance alive in a spark streaming application's life cycle?
BTW, in Spark 1.4 announced today, I added SQLContext.getOrCreate. So you dont need to create the singleton yourself. On Wed, Jun 10, 2015 at 3:21 AM, Sergio Jiménez Barrio drarse.a...@gmail.com wrote: Note: CCing user@spark.apache.org First, you must check if the RDD is empty: messages.foreachRDD { rdd = if (!rdd.isEmpty) { }} Now, you can obtain the instance of a SQLContext: val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext) *Optional* In this moment, I like work with DataFrame. I convert RDD to DataFrame. I see that you recive a JSON: val df :DataFrame = sqlContext.jsonRDD(message, getSchema(getSchemaStr)).toDF() My getSchema function create a Schema of my JSON: def getSchemaStr() :String = feature1 feature2 ... def getSchema(schema: String) :StructType = StructType (schema.split( ).map(fieldName = StructField(fieldName, StringType, true))) I hope you helps. Regards. 2015-06-09 17:36 GMT+02:00 codingforfun [via Apache Spark User List] ml-node+s1001560n23226...@n3.nabble.com: I don't know why, you said “Why? I tried this solution and works fine.” means your SQLContext instance alive all the streaming application’s life time, rather than one bath duration ? My code as below: object SQLContextSingleton extends java.io.Serializable{ @transient private var instance: SQLContext = null // Instantiate SQLContext on demand def getInstance(sparkContext: SparkContext): SQLContext = synchronized { if (instance == null) { instance = new SQLContext(sparkContext) } instance } } // type_-typex, id_-id, url_-url case class (time: Timestamp, id: Int, openfrom: Int, tab: Int) extends Serializable case class Count(x: Int) @transient val ssc = new StreamingContext(sc, new Duration(5 * 1000)) ssc.checkpoint(.) val kafkaParams = Map(metadata.broker.list - 10.20.30.40:9092,) @transient val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set(topic_name)) @transient val dddstream= newsIdDStream.map(x = x._2).flatMap(x = x.split(\n)) dddstream.foreachRDD { rdd = SQLContextSingleton.getInstance(rdd.sparkContext).jsonRDD(rdd).registerTempTable(ttable) val ret = SQLContextSingleton.getInstance(rdd.sparkContext).sql(SELECT COUNT(*) FROM ttable) ret.foreach{ x = println(x(0)) } } ssc.start() ssc.awaitTermination() 在 2015-06-09 17:41:44,drarse [via Apache Spark User List] [hidden email] http:///user/SendEmail.jtp?type=nodenode=23226i=0 写道: Why? I tried this solution and works fine. 
El martes, 9 de junio de 2015, codingforfun [via Apache Spark User List] [hidden email] http:///user/SendEmail.jtp?type=nodenode=23219i=0 escribió: Hi drarse, thanks for replying, the way you said use a singleton object does not work 在 2015-06-09 16:24:25,drarse [via Apache Spark User List] [hidden email] http:///user/SendEmail.jtp?type=nodenode=23218i=0 写道: The best way is create a singleton object like: object SQLContextSingleton { @transient private var instance: SQLContext = null // Instantiate SQLContext on demand def getInstance(sparkContext: SparkContext): SQLContext = synchronized { if (instance == null) { instance = new SQLContext(sparkContext) } instance }} You have more information in the programming guide: https://spark.apache.org/docs/latest/streaming-programming-guide.html#dataframe-and-sql-operations 2015-06-09 9:27 GMT+02:00 codingforfun [via Apache Spark User List] [hidden email] http:///user/SendEmail.jtp?type=nodenode=23216i=0: I used SQLContext in a spark streaming application as blew: case class topic_name (f1: Int, f2: Int) val sqlContext = new SQLContext(sc) @transient val ssc = new StreamingContext(sc, new Duration(5 * 1000)) ssc.checkpoint(.) val theDStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set(topic_name)) theDStream.map(x = x._2).foreach { rdd = sqlContext.jsonRDD(newsIdRDD).registerTempTable(topic_name) sqlContext.sql(select count(*) from topic_name).foreach { x = WriteToFile(file_path, x(0).toString) } } ssc.start() ssc.awaitTermination() I found i could only get every 5 seconds's count of message, because The lifetime of this temporary table is tied to the SQLContext that was used to create this DataFrame, i guess every 5 seconds, a new sqlContext will be create and the temporary table can only alive just 5 seconds, i want to the sqlContext and the temporary table alive all the streaming application's life cycle, how to do it? Thanks~ -- If you reply to
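With the getOrCreate added in 1.4 that TD mentions, the hand-rolled singleton above collapses to something like the following sketch (using the same foreachRDD structure as the earlier code):

import org.apache.spark.sql.SQLContext

dddstream.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    // Returns the existing SQLContext for this SparkContext, or creates one the first time.
    val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
    sqlContext.jsonRDD(rdd).registerTempTable("ttable")
    sqlContext.sql("SELECT COUNT(*) FROM ttable").foreach(row => println(row(0)))
  }
}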
Parquet Multiple Output
Hi, I have a scenario where I'd like to store an RDD using parquet format in many files, which correspond to days, such as 2015/01/01, 2015/02/02, etc. So far I have used this method http://stackoverflow.com/questions/23995040/write-to-multiple-outputs-by-key-spark-one-spark-job to store text files (then I have to read the text files back, convert them to parquet, and store them again). Has anyone tried to store many parquet files from one RDD? Thanks, Xin
Re: Spark SQL and Skewed Joins
2. Does 1.3.2 or 1.4 have any enhancements that can help? I tried to use 1.3.1 but SPARK-6967 prohibits me from doing so. Now that 1.4 is available, would any of the JOIN enhancements help this situation? I would try Spark 1.4 after running SET spark.sql.planner.sortMergeJoin=true. Please report back if this works for you.
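For reference, that flag can be set per session, e.g. (a sketch, assuming an existing sqlContext):

// Spark 1.4: ask the planner to use sort-merge join instead of the default shuffled hash join.
sqlContext.sql("SET spark.sql.planner.sortMergeJoin=true")
// equivalently:
sqlContext.setConf("spark.sql.planner.sortMergeJoin", "true")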
Are there ways to restrict what parameters users can set for a Spark job?
For example, Hive lets you set a whole bunch of parameters (# of reducers, # of mappers, size of reducers, cache size, max memory to use for a join), while Impala gives users a much smaller subset of parameters to work with, which makes it nice to give to a BI team. Is there a way to restrict which parameters a user can set for a Spark job? Maybe to cap the # of executors, or cap the memory for each executor, or to enforce a default setting no matter what parameters are used. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Are-there-ways-to-restrict-what-parameters-users-can-set-for-a-Spark-job-tp23301.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: [Spark 1.4.0]How to set driver's system property using spark-submit options?
Hi Andrew, Thanks a lot! Indeed, it doesn't start with spark, the following properties are read by implementation of the driver rather than spark conf: --conf spooky.root=s3n://spooky- \ --conf spooky.checkpoint=s3://spooky-checkpoint \ This used to work from Spark 1.0.0 to 1.3.1. Do you know the new way to set the same properties? Yours Peng On 12 June 2015 at 14:20, Andrew Or and...@databricks.com wrote: Hi Peng, Setting properties through --conf should still work in Spark 1.4. From the warning it looks like the config you are trying to set does not start with the prefix spark.. What is the config that you are trying to set? -Andrew 2015-06-12 11:17 GMT-07:00 Peng Cheng pc...@uow.edu.au: In Spark 1.3.x, the system property of the driver can be set by --conf option, shared between setting spark properties and system properties. In Spark 1.4.0 this feature is removed, the driver instead log the following warning: Warning: Ignoring non-spark config property: xxx.xxx=v How do set driver's system property in 1.4.0? Is there a reason it is removed without a deprecation warning? Thanks a lot for your advices. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-4-0-How-to-set-driver-s-system-property-using-spark-submit-options-tp23298.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Resource allocation configurations for Spark on Yarn
Hi Team, Sharing one article which summarize the Resource allocation configurations for Spark on Yarn: Resource allocation configurations for Spark on Yarn http://www.openkb.info/2015/06/resource-allocation-configurations-for.html -- Thanks, www.openkb.info (Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)
Dynamic allocator requests -1 executors
Hey all, I've recently run into an issue where spark dynamicAllocation has asked for -1 executors from YARN. Unfortunately, this raises an exception that kills the executor-allocation thread and the application can't request more resources. Has anyone seen this before? It is spurious and the application usually works, but when this gets hit it becomes unusable when getting stuck at minimum YARN resources. Stacktrace below. Thanks! -Pat 470 ERROR [2015-06-12 16:44:39,724] org.apache.spark.util.Utils: Uncaught exception in thread spark-dynamic-executor-allocation-0 471 ! java.lang.IllegalArgumentException: Attempted to request a negative number of executor(s) -1 from the cluster manager. Please specify a positive number! 472 ! at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.requestTotalExecutors(CoarseGrainedSchedulerBackend.scala:338) ~[spark-core_2.10-1.3.1.jar:1. 473 ! at org.apache.spark.SparkContext.requestTotalExecutors(SparkContext.scala:1137) ~[spark-core_2.10-1.3.1.jar:1.3.1] 474 ! at org.apache.spark.ExecutorAllocationManager.addExecutors(ExecutorAllocationManager.scala:294) ~[spark-core_2.10-1.3.1.jar:1.3.1] 475 ! at org.apache.spark.ExecutorAllocationManager.addOrCancelExecutorRequests(ExecutorAllocationManager.scala:263) ~[spark-core_2.10-1.3.1.jar:1.3.1] 476 ! at org.apache.spark.ExecutorAllocationManager.org$apache$spark$ExecutorAllocationManager$$schedule(ExecutorAllocationManager.scala:230) ~[spark-core_2.10-1.3.1.j 477 ! at org.apache.spark.ExecutorAllocationManager$$anon$1$$anonfun$run$1.apply$mcV$sp(ExecutorAllocationManager.scala:189) ~[spark-core_2.10-1.3.1.jar:1.3.1] 478 ! at org.apache.spark.ExecutorAllocationManager$$anon$1$$anonfun$run$1.apply(ExecutorAllocationManager.scala:189) ~[spark-core_2.10-1.3.1.jar:1.3.1] 479 ! at org.apache.spark.ExecutorAllocationManager$$anon$1$$anonfun$run$1.apply(ExecutorAllocationManager.scala:189) ~[spark-core_2.10-1.3.1.jar:1.3.1] 480 ! at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1618) ~[spark-core_2.10-1.3.1.jar:1.3.1] 481 ! at org.apache.spark.ExecutorAllocationManager$$anon$1.run(ExecutorAllocationManager.scala:189) [spark-core_2.10-1.3.1.jar:1.3.1] 482 ! at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [na:1.7.0_71] 483 ! at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304) [na:1.7.0_71] 484 ! at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178) [na:1.7.0_71] 485 ! at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [na:1.7.0_71] 486 ! at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_71] 487 ! at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_71]
[Spark] What is the most efficient way to do such a join and column manipulation?
Hi, I want to use spark to select N columns, top M rows of all csv files under a folder. To be concrete, say we have a folder with thousands of tab-delimited csv files with the following attribute format (each csv file is about 10GB):

id  name  address  city  ...
1   Matt  add1     LA    ...
2   Will  add2     LA    ...
3   Lucy  add3     SF    ...
...

And we have a lookup table based on "name" above:

name  gender
Matt  M
Lucy  F
...

Now we are interested in outputting, from the top 100K rows of each csv file, the following format:

id  name  gender
1   Matt  M
...

Can we use pyspark to efficiently handle this?
Re: [Spark 1.4.0]How to set driver's system property using spark-submit options?
This is the SPARK JIRA which introduced the warning: [SPARK-7037] [CORE] Inconsistent behavior for non-spark config properties in spark-shell and spark-submit On Fri, Jun 12, 2015 at 4:34 PM, Peng Cheng rhw...@gmail.com wrote: Hi Andrew, Thanks a lot! Indeed, it doesn't start with spark, the following properties are read by implementation of the driver rather than spark conf: --conf spooky.root=s3n://spooky- \ --conf spooky.checkpoint=s3://spooky-checkpoint \ This used to work from Spark 1.0.0 to 1.3.1. Do you know the new way to set the same properties? Yours Peng On 12 June 2015 at 14:20, Andrew Or and...@databricks.com wrote: Hi Peng, Setting properties through --conf should still work in Spark 1.4. From the warning it looks like the config you are trying to set does not start with the prefix spark.. What is the config that you are trying to set? -Andrew 2015-06-12 11:17 GMT-07:00 Peng Cheng pc...@uow.edu.au: In Spark 1.3.x, the system property of the driver can be set by --conf option, shared between setting spark properties and system properties. In Spark 1.4.0 this feature is removed, the driver instead log the following warning: Warning: Ignoring non-spark config property: xxx.xxx=v How do set driver's system property in 1.4.0? Is there a reason it is removed without a deprecation warning? Thanks a lot for your advices. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-4-0-How-to-set-driver-s-system-property-using-spark-submit-options-tp23298.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark 1.4: Python API for getting Kafka offsets in direct mode?
Hi Juan, I have created a ticket for this: https://issues.apache.org/jira/browse/SPARK-8337 Thanks! Amit On Fri, Jun 12, 2015 at 3:17 PM, Juan Rodríguez Hortalá juan.rodriguez.hort...@gmail.com wrote: Hi, If you want I would be happy to work in this. I have worked with KafkaUtils.createDirectStream before, in a pull request that wasn't accepted https://github.com/apache/spark/pull/5367. I'm fluent with Python and I'm starting to feel comfortable with Scala, so if someone opens a JIRA I can take it. Greetings, Juan Rodriguez 2015-06-12 15:59 GMT+02:00 Cody Koeninger c...@koeninger.org: The scala api has 2 ways of calling createDirectStream. One of them allows you to pass a message handler that gets full access to the kafka MessageAndMetadata, including offset. I don't know why the python api was developed with only one way to call createDirectStream, but the first thing I'd look at would be adding that functionality back in. If someone wants help creating a patch for that, just let me know. Dealing with offsets on a per-message basis may not be as efficient as dealing with them on a batch basis using the HasOffsetRanges interface... but if efficiency was a primary concern, you probably wouldn't be using Python anyway. On Fri, Jun 12, 2015 at 1:05 AM, Saisai Shao sai.sai.s...@gmail.com wrote: Scala KafkaRDD uses a trait to handle this problem, but it is not so easy and straightforward in Python, where we need to have a specific API to handle this, I'm not sure is there any simple workaround to fix this, maybe we should think carefully about it. 2015-06-12 13:59 GMT+08:00 Amit Ramesh a...@yelp.com: Thanks, Jerry. That's what I suspected based on the code I looked at. Any pointers on what is needed to build in this support would be great. This is critical to the project we are currently working on. Thanks! On Thu, Jun 11, 2015 at 10:54 PM, Saisai Shao sai.sai.s...@gmail.com wrote: OK, I get it, I think currently Python based Kafka direct API do not provide such equivalence like Scala, maybe we should figure out to add this into Python API also. 2015-06-12 13:48 GMT+08:00 Amit Ramesh a...@yelp.com: Hi Jerry, Take a look at this example: https://spark.apache.org/docs/latest/streaming-kafka-integration.html#tab_scala_2 The offsets are needed because as RDDs get generated within spark the offsets move further along. With direct Kafka mode the current offsets are no more persisted in Zookeeper but rather within Spark itself. If you want to be able to use zookeeper based monitoring tools to keep track of progress, then this is needed. In my specific case we need to persist Kafka offsets externally so that we can continue from where we left off after a code deployment. In other words, we need exactly-once processing guarantees across code deployments. Spark does not support any state persistence across deployments so this is something we need to handle on our own. Hope that helps. Let me know if not. Thanks! Amit On Thu, Jun 11, 2015 at 10:02 PM, Saisai Shao sai.sai.s...@gmail.com wrote: Hi, What is your meaning of getting the offsets from the RDD, from my understanding, the offsetRange is a parameter you offered to KafkaRDD, why do you still want to get the one previous you set into? Thanks Jerry 2015-06-12 12:36 GMT+08:00 Amit Ramesh a...@yelp.com: Congratulations on the release of 1.4! I have been trying out the direct Kafka support in python but haven't been able to figure out how to get the offsets from the RDD. 
Looks like the documentation is yet to be updated to include Python examples ( https://spark.apache.org/docs/latest/streaming-kafka-integration.html). I am specifically looking for the equivalent of https://spark.apache.org/docs/latest/streaming-kafka-integration.html#tab_scala_2. I tried digging through the python code but could not find anything related. Any pointers would be greatly appreciated. Thanks! Amit
Re: NullPointerException with functions.rand()
Created PR and verified the example given by Justin works with the change: https://github.com/apache/spark/pull/6793 Cheers On Wed, Jun 10, 2015 at 7:15 PM, Ted Yu yuzhih...@gmail.com wrote: Looks like the NPE came from this line: @transient protected lazy val rng = new XORShiftRandom(seed + TaskContext.get().partitionId()) Could TaskContext.get() be null ? On Wed, Jun 10, 2015 at 6:15 PM, Justin Yip yipjus...@prediction.io wrote: Hello, I am using 1.4.0 and found the following weird behavior. This case works fine: scala sc.parallelize(Seq((1,2), (3, 100))).toDF.withColumn(index, rand(30)).show() +--+---+---+ |_1| _2| index| +--+---+---+ | 1| 2| 0.6662967911724369| | 3|100|0.35734504984676396| +--+---+---+ However, when I use sqlContext.createDataFrame instead, I get a NPE: scala sqlContext.createDataFrame(Seq((1,2), (3, 100))).withColumn(index, rand(30)).show() java.lang.NullPointerException at org.apache.spark.sql.catalyst.expressions.RDG.rng$lzycompute(random.scala:39) at org.apache.spark.sql.catalyst.expressions.RDG.rng(random.scala:39) .. Does any one know why? Thanks. Justin -- View this message in context: NullPointerException with functions.rand() http://apache-spark-user-list.1001560.n3.nabble.com/NullPointerException-with-functions-rand-tp23267.html Sent from the Apache Spark User List mailing list archive http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.
Re: Parquet Multiple Output
Spark 1.4 supports dynamic partitioning: you can first convert your RDD to a DataFrame and then save the contents partitioned by a date column. Say you have a DataFrame df containing three columns a, b, and c; you may have something like this: df.write.partitionBy("a", "b").mode("overwrite").parquet("path/to/file") Cheng On 6/13/15 5:31 AM, Xin Liu wrote: Hi, I have a scenario where I'd like to store an RDD using parquet format in many files, which correspond to days, such as 2015/01/01, 2015/02/02, etc. So far I used this method http://stackoverflow.com/questions/23995040/write-to-multiple-outputs-by-key-spark-one-spark-job to store text files (then I have to read text files and convert to parquet and store again). Has anyone tried to store many parquet files from one RDD? Thanks, Xin - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
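If the data only carries a timestamp, the partition column has to be derived first. A rough sketch (the "eventTime" column holding epoch milliseconds is an assumption):

import org.apache.spark.sql.functions.{col, udf}

// Derive a yyyy-MM-dd string from a hypothetical epoch-millis column.
val toDay = udf((ts: Long) =>
  new java.text.SimpleDateFormat("yyyy-MM-dd").format(new java.util.Date(ts)))

val withDate = df.withColumn("date", toDay(col("eventTime")))

// Produces one directory per day, e.g. path/to/days/date=2015-01-01/...
withDate.write.partitionBy("date").mode("overwrite").parquet("path/to/days")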
Re: [Spark 1.4.0]How to set driver's system property using spark-submit options?
Thanks all for your information. Andrew, I dig out one of your old post which is relevant: http://apache-spark-user-list.1001560.n3.nabble.com/little-confused-about-SPARK-JAVA-OPTS-alternatives-td5798.html But didn't mention how to supply the properties that don't start with spark. On 12 June 2015 at 19:39, Ted Yu yuzhih...@gmail.com wrote: This is the SPARK JIRA which introduced the warning: [SPARK-7037] [CORE] Inconsistent behavior for non-spark config properties in spark-shell and spark-submit On Fri, Jun 12, 2015 at 4:34 PM, Peng Cheng rhw...@gmail.com wrote: Hi Andrew, Thanks a lot! Indeed, it doesn't start with spark, the following properties are read by implementation of the driver rather than spark conf: --conf spooky.root=s3n://spooky- \ --conf spooky.checkpoint=s3://spooky-checkpoint \ This used to work from Spark 1.0.0 to 1.3.1. Do you know the new way to set the same properties? Yours Peng On 12 June 2015 at 14:20, Andrew Or and...@databricks.com wrote: Hi Peng, Setting properties through --conf should still work in Spark 1.4. From the warning it looks like the config you are trying to set does not start with the prefix spark.. What is the config that you are trying to set? -Andrew 2015-06-12 11:17 GMT-07:00 Peng Cheng pc...@uow.edu.au: In Spark 1.3.x, the system property of the driver can be set by --conf option, shared between setting spark properties and system properties. In Spark 1.4.0 this feature is removed, the driver instead log the following warning: Warning: Ignoring non-spark config property: xxx.xxx=v How do set driver's system property in 1.4.0? Is there a reason it is removed without a deprecation warning? Thanks a lot for your advices. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-4-0-How-to-set-driver-s-system-property-using-spark-submit-options-tp23298.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Spark SQL and Skewed Joins
Greetings, I am trying to implement a classic star schema ETL pipeline using Spark SQL, 1.2.1. I am running into problems with shuffle joins, for those dimension tables which have skewed keys and are too large to let Spark broadcast them. I have a few questions 1. Can I split my queries so a unique, skewed key gets processed by by multiple reducer steps? I have tried this (using a UNION) but I am always left with the 199/200 executors complete, which times out and even starts throwing memory errors. That single executor is processing 95% of the 80G fact table for the single skewed key. 2. Does 1.3.2 or 1.4 have any enhancements that can help? I tried to use 1.3.1 but SPARK-6967 prohibits me from doing so.Now that 1.4 is available, would any of the JOIN enhancements help this situation? 3. Do you have suggestions for memory config if I wanted to broadcast 2G dimension tables? Is this even feasible? Do table broadcasts wind up in the heap or in dedicated storage space? Thanks for your help, Jon
Reliable SQS Receiver for Spark Streaming
I would like to have a Spark Streaming SQS Receiver which deletes SQS messages only after they have been successfully stored on S3. For this, a custom receiver can be implemented with the semantics of a reliable receiver: the store(multiple-records) call blocks until the given records have been stored and replicated inside Spark. If write-ahead logs are enabled, all the data received from a receiver gets written into a write-ahead log in the configured checkpoint directory, and the checkpoint directory can be pointed to S3. After the store(multiple-records) blocking call finishes, are the records already stored in the checkpoint directory (and can thus be safely deleted from SQS)? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
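Going by the reliable-receiver semantics quoted above, the answer should be yes: with write-ahead logs enabled, the blocking store(ArrayBuffer) variant returns only once the block has been persisted, so deleting the corresponding SQS messages after it returns is safe. A bare-bones sketch of such a receiver; the SQS calls (receiveBatch, deleteBatch, Message) are placeholders, not a real client API:

import scala.collection.mutable.ArrayBuffer
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class ReliableSqsReceiver(queueUrl: String)
    extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER) {

  def onStart(): Unit = {
    new Thread("SQS Receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  def onStop(): Unit = { /* the receive loop checks isStopped() */ }

  private def receive(): Unit = {
    while (!isStopped()) {
      val messages = receiveBatch(queueUrl)            // placeholder for an SQS long poll
      if (messages.nonEmpty) {
        // Blocks until the records are written to the WAL / replicated.
        store(ArrayBuffer(messages.map(_.body): _*))
        // Only now is it safe to acknowledge (delete) the messages in SQS.
        deleteBatch(queueUrl, messages)                // placeholder
      }
    }
  }

  // Placeholders standing in for real SQS client calls.
  private case class Message(body: String, receiptHandle: String)
  private def receiveBatch(url: String): Seq[Message] = Seq.empty
  private def deleteBatch(url: String, ms: Seq[Message]): Unit = ()
}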
Re: BigDecimal problem in parquet file
Maybe it's related to a bug, which was fixed by https://github.com/apache/spark/pull/6558 recently. On Fri, Jun 12, 2015 at 5:38 AM, Bipin Nag bipin@gmail.com wrote: Hi Cheng, Yes, some rows contain unit instead of decimal values. I believe some rows from the original source I had don't have any value, i.e. it is null, and that shows up as unit. How does spark-sql or parquet handle null in place of decimal values, assuming that field is nullable? I will have to change it properly. Thanks for helping out. Bipin On 12 June 2015 at 14:57, Cheng Lian lian.cs@gmail.com wrote: On 6/10/15 8:53 PM, Bipin Nag wrote: Hi Cheng, I am using the Spark 1.3.1 binary available for Hadoop 2.6. I am loading an existing parquet file, then repartitioning and saving it. Doing this gives the error. The code for this doesn't look like it is causing the problem; I have a feeling the source - the existing parquet - is the culprit. I created that parquet using a JdbcRDD (pulled from Microsoft SQL Server). First I saved the JdbcRDD as an object file on disk, then loaded it again, made a DataFrame from it using a schema, then saved it as parquet. Following is the code. For saving the JdbcRDD (name - fully qualified table name, pk - string for primary key, lastpk - last id to pull):

val myRDD = new JdbcRDD(
  sc,
  () => DriverManager.getConnection(url, username, password),
  "SELECT * FROM " + name + " WITH (NOLOCK) WHERE ? <= " + pk + " and " + pk + " <= ?",
  1, lastpk, 1,
  JdbcRDD.resultSetToObjectArray)
myRDD.saveAsObjectFile("rawdata/" + name)

For applying the schema and saving the parquet:

val myschema = schemamap(name)
val myrdd = sc.objectFile[Array[Object]]("/home/bipin/rawdata/" + name)
  .map(x => org.apache.spark.sql.Row(x: _*))

Have you tried to print out x here to check its contents? My guess is that x actually contains unit values. For example, the following Spark shell code can reproduce a similar exception:

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

val schema = StructType(StructField("dec", DecimalType(10, 0)) :: Nil)
val rdd = sc.parallelize(1 to 10).map(_ => Array(())).map(arr => Row(arr: _*))
val df = sqlContext.createDataFrame(rdd, schema)
df.saveAsParquetFile("file:///tmp/foo")

val actualdata = sqlContext.createDataFrame(myrdd, myschema)
actualdata.saveAsParquetFile("/home/bipin/stageddata/" + name)

The schema StructType can be made manually, though I pull the table's metadata and make one. It is a simple string translation (see the SQL docs and/or Spark data types). That is how I created the parquet file. Any help to solve the issue is appreciated. Thanks, Bipin On 9 June 2015 at 20:44, Cheng Lian lian.cs@gmail.com wrote: Would you please provide a snippet that reproduces this issue? What version of Spark were you using?
Cheng On 6/9/15 8:18 PM, bipin wrote: Hi, When I try to save my data frame as a parquet file I get the following error: java.lang.ClassCastException: scala.runtime.BoxedUnit cannot be cast to org.apache.spark.sql.types.Decimal at org.apache.spark.sql.parquet.RowWriteSupport.writePrimitive(ParquetTableSupport.scala:220) at org.apache.spark.sql.parquet.RowWriteSupport.writeValue(ParquetTableSupport.scala:192) at org.apache.spark.sql.parquet.RowWriteSupport.write(ParquetTableSupport.scala:171) at org.apache.spark.sql.parquet.RowWriteSupport.write(ParquetTableSupport.scala:134) at parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:120) at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:81) at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:37) at org.apache.spark.sql.parquet.ParquetRelation2.org$apache$spark$sql$parquet$ParquetRelation2$writeShard$1(newParquet.scala:671) at org.apache.spark.sql.parquet.ParquetRelation2$anonfun$insert$2.apply(newParquet.scala:689) at org.apache.spark.sql.parquet.ParquetRelation2$anonfun$insert$2.apply(newParquet.scala:689) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) How to fix this problem ? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/BigDecimal-problem-in-parquet-file-tp23221.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
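A small sketch of the cleanup Bipin describes: map the BoxedUnit placeholders (named in the exception above) to SQL NULLs before building the Rows, so the nullable DecimalType column receives null instead of a unit value. The variable names follow the thread; treat the whole block as illustrative:

val raw = sc.objectFile[Array[Object]]("/home/bipin/rawdata/" + name)   // as in the thread

val cleanedRows = raw.map { arr =>
  org.apache.spark.sql.Row(arr.map {
    case _: scala.runtime.BoxedUnit => null            // unit placeholder becomes a SQL NULL
    case other => other
  }: _*)
}

val df = sqlContext.createDataFrame(cleanedRows, myschema)   // myschema as built by schemamap(name)
df.saveAsParquetFile("/home/bipin/stageddata/" + name)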
Broadcast value
Hi, I am reading a broadcast value from a file. I want to use it when creating Rating objects (ALS), but I am getting null. Here is my code: https://gist.github.com/yaseminn/d6afd0263f6db6ea4ec5 . Lines 17 and 18 are OK, but line 19 returns null, so line 21 gives me an error. I don't understand why. Do you have any idea? Best, yasemin -- hiç ender hiç
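The gist itself is not reproduced in the archive, so purely as a generic illustration: a broadcast lookup built on the driver and read with .value on the executors while constructing MLlib Rating objects. All names here (scoresByUser, the input format, ratings.txt) are hypothetical:

import org.apache.spark.mllib.recommendation.Rating

// Built on the driver, e.g. parsed from a local file; contents are hypothetical.
val scoresByUser: Map[Int, Double] = Map(1 -> 4.5, 2 -> 3.0)
val scoresBc = sc.broadcast(scoresByUser)

// "user,product" lines; the broadcast is read with .value inside the closure.
val ratings = sc.textFile("ratings.txt").map { line =>
  val Array(user, product) = line.split(",")
  Rating(user.toInt, product.toInt, scoresBc.value.getOrElse(user.toInt, 0.0))
}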
Writing data to hbase using Sparkstreaming
Hi, I am trying to write data that is produced by a Kafka command-line producer for some topic. I am facing a problem and am unable to proceed. Below is my code, which I package as a jar and run through spark-submit. Am I doing something wrong inside foreachRDD()? What is wrong with the SparkKafkaDemo$2.call(SparkKafkaDemo.java:63) line in the error message below?

SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaDemo").setMaster("local")
    .setSparkHome("/Users/kvk/softwares/spark-1.3.1-bin-hadoop2.4");
// Create the context with a 1 second batch size
JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, new Duration(5000));
int numThreads = 2;
Map<String, Integer> topicMap = new HashMap<String, Integer>();
// topicMap.put("viewTopic", numThreads);
topicMap.put("nonview", numThreads);
JavaPairReceiverInputDStream<String, String> messages =
    KafkaUtils.createStream(jsc, "localhost", "ViewConsumer", topicMap);
JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
    @Override
    public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
    }
});
lines.foreachRDD(new Function<JavaRDD<String>, Void>() {
    @Override
    public Void call(JavaRDD<String> stringJavaRDD) throws Exception {
        JavaPairRDD<ImmutableBytesWritable, Put> hbasePuts = stringJavaRDD.mapToPair(
            new PairFunction<String, ImmutableBytesWritable, Put>() {
                @Override
                public Tuple2<ImmutableBytesWritable, Put> call(String line) throws Exception {
                    Put put = new Put(Bytes.toBytes("Rowkey" + Math.random()));
                    put.addColumn(Bytes.toBytes("firstFamily"), Bytes.toBytes("firstColumn"),
                        Bytes.toBytes(line + "fc"));
                    return new Tuple2<ImmutableBytesWritable, Put>(new ImmutableBytesWritable(), put);
                }
            });
        // save to HBase - Spark built-in API method
        hbasePuts.saveAsNewAPIHadoopDataset(newAPIJobConfiguration1.getConfiguration());
        return null;
    }
});
jsc.start();
jsc.awaitTermination();

I see the error below when I run it:
./bin/spark-submit --class SparkKafkaDemo --master local /Users/kvk/IntelliJWorkspace/HbaseDemo/HbaseDemo.jar Exception in thread main org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158) at org.apache.spark.SparkContext.clean(SparkContext.scala:1623) at org.apache.spark.rdd.RDD.map(RDD.scala:286) at org.apache.spark.api.java.JavaRDDLike$class.mapToPair(JavaRDDLike.scala:113) at org.apache.spark.api.java.AbstractJavaRDDLike.mapToPair(JavaRDDLike.scala:46) at SparkKafkaDemo$2.call(SparkKafkaDemo.java:63) at SparkKafkaDemo$2.call(SparkKafkaDemo.java:60) at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:311) at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:311) at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534) at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40) at scala.util.Try$.apply(Try.scala:161) at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:176) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:175) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at
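No answer is recorded in this archive, but a common cause of this particular trace is that the anonymous inner classes capture their enclosing, non-serializable objects, for example the Hadoop Job/Configuration referenced as newAPIJobConfiguration1. One way around it, shown here as a hedged sketch in Scala for brevity, is to create everything HBase-related inside foreachPartition on the executors (table name and column layout are illustrative):

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HTable, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.streaming.dstream.DStream

def writeToHBase(lines: DStream[String]): Unit =
  lines.foreachRDD { rdd =>
    rdd.foreachPartition { records =>
      // Everything HBase-related is created here, on the executor, so the closure
      // shipped from the driver does not drag in a non-serializable Configuration/Job.
      val table = new HTable(HBaseConfiguration.create(), "demo_table") // illustrative table name
      records.foreach { line =>
        val put = new Put(Bytes.toBytes("Rowkey" + math.random))
        put.addColumn(Bytes.toBytes("firstFamily"), Bytes.toBytes("firstColumn"), Bytes.toBytes(line + "fc"))
        table.put(put)
      }
      table.close()
    }
  }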
Re: How to pass arguments dynamically, that needs to be used in executors
Thanks Todd, that solved my problem. Regards, Gaurav (please excuse spelling mistakes) Sent from phone On Jun 11, 2015 6:42 PM, Todd Nist tsind...@gmail.com wrote: Hi Gaurav, Seems like you could use a broadcast variable for this if I understand your use case. Create it in the driver based on the CommandLineArguments and then use it in the workers. https://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables So something like:

Broadcast<Integer> cmdLineArg = sc.broadcast(Integer.parseInt(args[12]));

Then just reference the broadcast variable in your workers. It will get shipped once to all nodes in the cluster and can be referenced by them. HTH. -Todd On Thu, Jun 11, 2015 at 8:23 AM, gaurav sharma sharmagaura...@gmail.com wrote: Hi, I am using a Kafka + Spark cluster for a real-time aggregation analytics use case in production. Cluster details: 6 nodes, each node running 1 Spark and 1 Kafka process. Node 1 - 1 Master, 1 Worker, 1 Driver, 1 Kafka process. Nodes 2,3,4,5,6 - 1 Worker process each, 1 Kafka process each. Spark version 1.3.0, Kafka version 0.8.1. I am using Kafka Directstream for the Kafka-Spark integration. The analytics code is written using the Spark Java API. Problem statement: I want to accept a parameter as a command line argument and pass it on to the executors (I want to use the parameter in the rdd.foreach method, which is executed on the executors). I understand that when the driver is started, only the jar is transported to all the Workers. But I need to use the dynamically passed command line argument in the reduce operation executed on the executors. Code snippets for a better understanding of my problem:

public class KafkaReconcilationJob {
    private static Logger logger = Logger.getLogger(KafkaReconcilationJob.class);
    public static void main(String[] args) throws Exception {
        CommandLineArguments.CLICK_THRESHOLD = Integer.parseInt(args[12]); // --- I want to use this command line argument
    }
}

JavaRDD<AggregatedAdeStats> adeAggregatedFilteredData = adeAudGeoAggDataRdd.filter(new Function<AggregatedAdeStats, Boolean>() {
    @Override
    public Boolean call(AggregatedAdeStats adeAggregatedObj) throws Exception {
        if (adeAggregatedObj.getImpr() > CommandLineArguments.IMPR_THRESHOLD
                || adeAggregatedObj.getClick() > CommandLineArguments.CLICK_THRESHOLD) {
            return true;
        } else {
            return false;
        }
    }
});

The above-mentioned filter operation gets executed on an executor, which has 0 as the value of the static field CommandLineArguments.CLICK_THRESHOLD. Regards, Gaurav
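To make the fix concrete: the broadcast has to be read via its value inside the function that runs on the executors, rather than through a static field set in main(). A short sketch in Scala (the Java code above would call cmdLineArg.value() the same way); the RDD and getter names are taken from the snippet above:

// Created once in the driver from the command line.
val clickThreshold = sc.broadcast(args(12).toInt)

// Read with .value inside the closure, on the executors. A static field assigned in
// main() is only updated in the driver JVM and would still be 0 on the executors.
val filtered = adeAudGeoAggDataRdd.filter(stats => stats.getClick() > clickThreshold.value)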
Re: how to use a properties file from a url in spark-submit
That's a great idea. I did what you suggested and added the url to the props file in the uri of the json. The properties file now shows up in the sandbox. But when it goes to run spark-submit with --properties-file props.properties it fails to find it: Exception in thread main java.lang.IllegalArgumentException: requirement failed: Properties file props.properties does not exist On 11 June 2015 at 22:17, Matthew Jones mle...@gmail.com wrote: If you are using chronos you can just put the url in the task json and chronos will download it into your sandbox. Then just use spark-submit --properties-file app.properties. On Thu, 11 Jun 2015 15:52 Marcelo Vanzin van...@cloudera.com wrote: That's not supported. You could use wget / curl to download the file to a temp location before running spark-submit, though. On Thu, Jun 11, 2015 at 12:48 PM, Gary Ogden gog...@gmail.com wrote: I have a properties file that is hosted at a url. I would like to be able to use the url in the --properties-file parameter when submitting a job to mesos using spark-submit via chronos I would rather do this than use a file on the local server. This doesn't seem to work though when submitting from chronos: bin/spark-submit --properties-file http://server01/props/app.properties Inside the properties file: spark.executor.memory=256M spark.cores.max=1 spark.shuffle.consolidateFiles=true spark.task.cpus=1 spark.deploy.defaultCores=1 spark.driver.cores=1 spark.scheduler.mode=FAIR So how do I specify a properties file in a url? -- Marcelo
Re: ClassCastException: BlockManagerId cannot be cast to [B
Hello, Just in case someone finds the same issue, it was caused by running the streaming app with different version of the cluster jars (the uber jar contained both yarn and spark). Regards J -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/ClassCastException-BlockManagerId-cannot-be-cast-to-B-tp23276p23296.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?
These are both really good posts: you should try and get them in to the documentation. with anything implementing dynamicness, there are some fun problems (a) detecting the delays in the workflow. There's some good ideas here (b) deciding where to address it. That means you need to monitor the entire pipeline —which you should be doing in production anyway. (c) choosing the action. More nodes, more memory CPU (not that useful for Java code, even when YARN adds support for dynamic container resize) (d) choosing the size of the action. In a shared cluster, extra resources for one app comes at the expense of others. If you have pre-emption turned on in YARN, the scheduler can take containers off lower priority work, which automates a lot of this decision making. That will lose other work though, so to justify it you'd better hang on those containers (e) deciding if/when to hand things back. Scaling things down can be very expensive if lots of state has to get rebuilt elsewhere. I think Apache Helix from LinkedIn has done some good work here -worth looking at to see what lessons code to lift. And as you'd expect, it sits right behind Kafka in production. I think it gets away with low delays to scale up/down and relying on low rebuild costs. [In the work I've been doing with colleagues on dynamic HBase and Accumulo clusters, we've not attempted to do any autoscale, because scale down is an expensive decision...we're focusing on liveness detection and reaction, then publishing the metrics needed to allow people or cross-application tools to make the decision) On 12 Jun 2015, at 04:38, Dmitry Goldenberg dgoldenberg...@gmail.commailto:dgoldenberg...@gmail.com wrote: Yes, Tathagata, thank you. For #1, the 'need detection', one idea we're entertaining is timestamping the messages coming into the Kafka topics. The consumers would check the interval between the time they get the message and that message origination timestamp. As Kafka topics start to fill up more, we would presumably see longer and longer wait times (delays) for messages to be getting processed by the consumers. The consumers would then start firing off critical events into an Event Analyzer/Aggregator which would decide that more resources are needed, then ask the Provisioning Component to allocate N new machines. We do want to set maxRatePerPartition in order to not overwhelm the consumers and run out of memory. Machine provisioning may take a while, and if left with no maxRate guards, our consumers could run out of memory. Since there are no receivers, if the cluster gets a new executor, it will automatically start getting used to run tasks... no need to do anything further. This is great, actually. We were wondering whether we'd need to restart the consumers once the new machines have been added. Tathagata's point implies, as I read it, that no further orchestration is needed, the load will start getting redistributed automatically. This makes implementation of autoscaling a lot simpler, as far as #3. One issue that's not yet been covered much is the scenario when *fewer* cluster resources become required (a system load valley rather than a peak). To detect a low volume, we'd need to measure the throughput in messages per second over time. Real low volumes would cause firing off of critical events signaling to the Analyzer that machines could be decommissioned. 
If machines are being decommissioned, it would seem that the consumers would need to get acquiesced (allowed to process any current batch, then shut down), then they would restart themselves or be restarted. Thoughts on this? There is also a hefty #4 here which is the hysteresis of this, where the system operates adaptively and learns over time, remembering the history of cluster expansions and contractions and allowing a certain slack for letting things cool down or heat up more gradually; also not contracting or expanding too frequently. PID controllers and thermostat types of design patterns have been mentioned before in this discussion. If you look at the big cloud apps, they dynamically reallocate VM images based on load history, with Netflix being the poster user: Hadoop work in the quiet hours, user interaction evenings and weekends. Excluding special events (including holidays), there's a lot of regularity over time, which lets you predict workload in advance. It's like your thermostat knowing fridays are cold and it should crank up the heating in advance. On Thu, Jun 11, 2015 at 11:08 PM, Tathagata Das t...@databricks.commailto:t...@databricks.com wrote: Let me try to add some clarity in the different thought directions that's going on in this thread. 1. HOW TO DETECT THE NEED FOR MORE CLUSTER RESOURCES? If there are not rate limits set up, the most reliable way to detect whether the current Spark cluster is being insufficient to handle the data load is to use the StreamingListner interface which
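Tathagata's message is cut off here in the archive, but the StreamingListener interface it refers to can be used roughly like this to detect growing batch delays (a sketch against the 1.3/1.4 API; the threshold and the reaction to the signal are application-specific):

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

class BacklogListener(maxDelayMs: Long) extends StreamingListener {
  override def onBatchCompleted(batch: StreamingListenerBatchCompleted): Unit = {
    val delay = batch.batchInfo.totalDelay.getOrElse(0L)
    if (delay > maxDelayMs) {
      // Fire whatever event your provisioning / analyzer component listens to.
      println(s"Streaming is falling behind: total delay ${delay} ms")
    }
  }
}

// ssc: the StreamingContext
ssc.addStreamingListener(new BacklogListener(maxDelayMs = 60000))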
Re: How to use Window Operations with kafka Direct-API?
The other thing to keep in mind about Spark window operations against Kafka is that Spark Streaming is based on the current system clock, not the time embedded in your messages. So you're going to get a fundamentally wrong answer from a window operation after a failure / restart, regardless of whether you're using the createStream or createDirectStream api. On Fri, Jun 12, 2015 at 9:14 AM, Cody Koeninger c...@koeninger.org wrote: Casting to HasOffsetRanges would be meaningless anyway if done after an operation that changes partitioning. You can still use the messageHandler argument to createDirectStream to get access to offsets on a per-message basis. Also, it doesn't look like what you're doing is particularly concerned about transactional correctness (since you're saving offsets to a kafka api backed by zookeeper), so you can try doing a transform as the first step in your stream, and casting to HasOffsetRanges there. On Fri, Jun 12, 2015 at 5:03 AM, zigen dbviewer.zi...@gmail.com wrote: Hi Shao, Thank you for your prompt reply. I was disappointed. I will try window operations with the Receiver-based approach (KafkaUtils.createStream). Thank you again, ZIGEN On 2015/06/12 17:18, Saisai Shao sai.sai.s...@gmail.com wrote: I think you cannot use offsetRange in such a way: when you transform a DirectKafkaInputDStream into a WindowedDStream, the KafkaRDD is internally changed into a normal RDD, but offsetRange is a specific attribute of KafkaRDD, so when you cast a normal RDD to HasOffsetRanges, you will get such an exception. You can only do something like: directKafkaInputDStream.foreachRDD { rdd => rdd.asInstanceOf[HasOffsetRanges] ... } i.e. apply foreachRDD directly on the DirectKafkaInputDStream. 2015-06-12 16:10 GMT+08:00 ZIGEN dbviewer.zi...@gmail.com: Hi, I'm using Spark Streaming (1.3.1). I want to get exactly-once messaging from Kafka and use window operations on the DStream. When I use window operations (e.g. DStream#reduceByKeyAndWindow) with the Kafka direct API, a java.lang.ClassCastException occurs as follows.
--- stacktrace -- java.lang.ClassCastException: org.apache.spark.rdd.MapPartitionsRDD cannot be cast to org.apache.spark.streaming.kafka.HasOffsetRanges at org.apache.spark.examples.streaming.JavaDirectKafkaWordCountWithReduceByKeyAndWindow$3.call(JavaDirectKafkaWordCountWithReduceByKeyAndWindow.java:146) at org.apache.spark.examples.streaming.JavaDirectKafkaWordCountWithReduceByKeyAndWindow$3.call(JavaDirectKafkaWordCountWithReduceByKeyAndWindow.java:1) at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:311) at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:311) at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534) at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40) at scala.util.Try$.apply(Try.scala:161) at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:176) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:175) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603) at java.lang.Thread.run(Thread.java:722) --- my source --- JavaStreamingContext jssc = new JavaStreamingContext(_ctx, batchInterval); jssc.checkpoint(checkpoint); JavaPairInputDStreamString, String messages = KafkaUtils.createDirectStream (jssc, String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topicsSet); JavaPairDStreamString, Listlt;String pairDS = messages.mapToPair(...); JavaPairDStreamString, Listlt;String windowDs = pairDS.reduceByKeyAndWindow(new Function2Listlt;String, ListString, ListString() { @Override public ListString call(ListString list1, ListString list2) throws Exception { ... } }, windowDuration, slideDuration); windowDs.foreachRDD(new
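A sketch of Cody's suggestion above: capture the offsets in a transform applied directly to the direct stream, before any windowing changes the RDD type. The direct stream (a (String, String) pair DStream from KafkaUtils.createDirectStream) is assumed to exist, and how the offsets are then persisted to ZooKeeper is application-specific and omitted:

import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

var latestOffsets: Array[OffsetRange] = Array.empty

val tagged = directKafkaStream.transform { rdd =>
  // transform sees the KafkaRDD itself, so the cast is still valid here
  latestOffsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd
}

// Windowing after this point operates on plain RDDs; the offsets were already captured above.
val counts = tagged
  .map { case (_, value) => (value, 1L) }
  .reduceByKeyAndWindow(_ + _, Seconds(60), Seconds(10))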
RE: How to set spark master URL to contain domain name?
I think the problem is that in my local etc/hosts file, I have 10.196.116.95 WIN02 I will remove it and try. Thanks for the help. Ningjun From: prajod.vettiyat...@wipro.com [mailto:prajod.vettiyat...@wipro.com] Sent: Friday, June 12, 2015 1:44 AM To: Wang, Ningjun (LNG-NPV) Cc: user@spark.apache.org Subject: RE: How to set spark master URL to contain domain name? Hi Ningjun, This is probably a configuration difference between WIN01 and WIN02. Execute ipconfig /all on the windows command line on both machines and compare them. Also if you have a localhost entry in the hosts file, it should not have the wrong sequence. See the first answer in this link: http://stackoverflow.com/questions/6049260/fully-qualified-machine-name-java-with-etc-hosts Regards, Prajod From: Wang, Ningjun (LNG-NPV) [mailto:ningjun.w...@lexisnexis.com] Sent: 12 June 2015 01:32 To: user@spark.apache.org Subject: How to set spark master URL to contain domain name? I start the spark master on windows using bin\spark-class.cmd org.apache.spark.deploy.master.Master Then I go to http://localhost:8080/ to find the master URL; it is spark://WIN02:7077 Here WIN02 is my machine name. Why is it missing the domain name? If I start the spark master on other machines, the master URL will contain the domain name, e.g. spark://WIN01.mycompany.com:7077 Only on machine WIN02, the master URL does not contain the domain name. How can I configure WIN02 so that the spark master URL will also contain the domain name, e.g. spark://WIN02.mycompany.com Thanks Ningjun
Spark Streaming - Can i BIND Spark Executor to Kafka Partition Leader
Hi, I am using a Kafka + Spark cluster for a real-time aggregation analytics use case in production. *Cluster details* *6 nodes*, each node running 1 Spark and 1 Kafka process. Node 1 - 1 Master, 1 Worker, 1 Driver, 1 Kafka process. Nodes 2,3,4,5,6 - 1 Worker process each, 1 Kafka process each. Spark version 1.3.0, Kafka version 0.8.1. I am using the *Kafka Directstream* for the Kafka-Spark integration. The analytics code is written using the Spark Java API. *Problem Statement:* We are dealing with about *10 M records per hour*. My Spark Streaming batch runs at a *1 hour interval* (at 11:30, 12:30, 1:30 and so on). Since I am using the direct stream, it reads all the data for the past hour at 11:30, 12:30, 1:30 and so on. As of now it takes *about 3 minutes* to read the data, with network bandwidth utilization of *100-200 MBPS per node* (out of the 6 node Spark cluster). Since I am running both Spark and Kafka on the same machines, *I WANT TO BIND MY SPARK EXECUTOR TO THE KAFKA PARTITION LEADER*, so as to eliminate the network bandwidth consumption of Spark. I understand that the number of partitions created in Spark for a direct stream is equal to the number of partitions on Kafka, which is why I got curious whether there might be such a provision in Spark. Regards, Gaurav
Issues with `when` in Column class
I’m trying to iterate through a list of Columns and create new Columns based on a condition. However, the when method keeps giving me errors that don’t quite make sense. If I do `when(col === “abc”, 1).otherwise(0)` I get the following error at compile time: [error] not found: value when However, this works in the REPL just fine after I import org.apache.spark.sql.Column. On the other hand, if I do `col.when(col === “abc”, 1).otherwise(0)`, it will compile successfully, but then at runtime, I get this error: java.lang.IllegalArgumentException: when() can only be applied on a Column previously generated by when() function This appears to be pretty circular logic. How can `when` only be applied to a Column previously generated by `when`? How would I call `when` in the first place?
Spark Streaming, updateStateByKey and mapPartitions() - and lazy DatabaseConnection
Hi, I have a scenario with spark streaming, where I need to write to a database from within updateStateByKey[1]. That means that inside my update function I need a connection. I have so far understood that I should create a new (lazy) connection for every partition. But since I am not working in foreachRDD I wonder where I can iterate over the partitions. Should I use mapPartitions() somewhere up the chain? Jan [1] The use case being saving ‘done' sessions during web tracking. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
[Spark-1.4.0]jackson-databind conflict?
I'm using Play-2.4 with play-json-2.4, It works fine with spark-1.3.1, but it failed after I upgrade Spark to spark-1.4.0:( sc.parallelize(1 to 1).count code [info] com.fasterxml.jackson.databind.JsonMappingException: Could not find creator property with name 'id' (in class org.apache.spark.rdd.RDDOperationScope) [info] at [Source: {id:0,name:parallelize}; line: 1, column: 1] [info] at com.fasterxml.jackson.databind.JsonMappingException.from(JsonMappingException.java:148) [info] at com.fasterxml.jackson.databind.DeserializationContext.mappingException(DeserializationContext.java:843) [info] at com.fasterxml.jackson.databind.deser.BeanDeserializerFactory.addBeanProps(BeanDeserializerFactory.java:533) [info] at com.fasterxml.jackson.databind.deser.BeanDeserializerFactory.buildBeanDeserializer(BeanDeserializerFactory.java:220) [info] at com.fasterxml.jackson.databind.deser.BeanDeserializerFactory.createBeanDeserializer(BeanDeserializerFactory.java:143) [info] at com.fasterxml.jackson.databind.deser.DeserializerCache._createDeserializer2(DeserializerCache.java:409) [info] at com.fasterxml.jackson.databind.deser.DeserializerCache._createDeserializer(DeserializerCache.java:358) [info] at com.fasterxml.jackson.databind.deser.DeserializerCache._createAndCache2(DeserializerCache.java:265) [info] at com.fasterxml.jackson.databind.deser.DeserializerCache._createAndCacheValueDeserializer(DeserializerCache.java:245) [info] at com.fasterxml.jackson.databind.deser.DeserializerCache.findValueDeserializer(DeserializerCache.java:143) /code -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-4-0-jackson-databind-conflict-tp23295.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: [Spark-1.4.0]jackson-databind conflict?
I see the same thing in an app that uses Jackson 2.5. Downgrading to 2.4 made it work. I meant to go back and figure out if there's something that can be done to work around this in Spark or elsewhere, but for now, harmonize your Jackson version at 2.4.x if you can. On Fri, Jun 12, 2015 at 4:20 PM, Earthson earthson...@gmail.com wrote: I'm using Play-2.4 with play-json-2.4, It works fine with spark-1.3.1, but it failed after I upgrade Spark to spark-1.4.0:( sc.parallelize(1 to 1).count code [info] com.fasterxml.jackson.databind.JsonMappingException: Could not find creator property with name 'id' (in class org.apache.spark.rdd.RDDOperationScope) [info] at [Source: {id:0,name:parallelize}; line: 1, column: 1] [info] at com.fasterxml.jackson.databind.JsonMappingException.from(JsonMappingException.java:148) [info] at com.fasterxml.jackson.databind.DeserializationContext.mappingException(DeserializationContext.java:843) [info] at com.fasterxml.jackson.databind.deser.BeanDeserializerFactory.addBeanProps(BeanDeserializerFactory.java:533) [info] at com.fasterxml.jackson.databind.deser.BeanDeserializerFactory.buildBeanDeserializer(BeanDeserializerFactory.java:220) [info] at com.fasterxml.jackson.databind.deser.BeanDeserializerFactory.createBeanDeserializer(BeanDeserializerFactory.java:143) [info] at com.fasterxml.jackson.databind.deser.DeserializerCache._createDeserializer2(DeserializerCache.java:409) [info] at com.fasterxml.jackson.databind.deser.DeserializerCache._createDeserializer(DeserializerCache.java:358) [info] at com.fasterxml.jackson.databind.deser.DeserializerCache._createAndCache2(DeserializerCache.java:265) [info] at com.fasterxml.jackson.databind.deser.DeserializerCache._createAndCacheValueDeserializer(DeserializerCache.java:245) [info] at com.fasterxml.jackson.databind.deser.DeserializerCache.findValueDeserializer(DeserializerCache.java:143) /code -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-4-0-jackson-databind-conflict-tp23295.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
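For an sbt-based Play project like the one above, the harmonisation Sean describes can be expressed as dependency overrides; a sketch, with 2.4.4 chosen because that is the Jackson line Spark 1.4 ships with (adjust the exact 2.4.x patch version as needed):

// build.sbt - force every Jackson module onto the 2.4.x line used by Spark 1.4
dependencyOverrides ++= Set(
  "com.fasterxml.jackson.core" % "jackson-databind" % "2.4.4",
  "com.fasterxml.jackson.core" % "jackson-core" % "2.4.4",
  "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.4.4"
)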
Re: Issues with `when` in Column class
Hi Chris, Have you imported org.apache.spark.sql.functions._? Thanks, Yin On Fri, Jun 12, 2015 at 8:05 AM, Chris Freeman cfree...@alteryx.com wrote: I’m trying to iterate through a list of Columns and create new Columns based on a condition. However, the when method keeps giving me errors that don’t quite make sense. If I do `when(col === “abc”, 1).otherwise(0)` I get the following error at compile time: [error] not found: value when However, this works in the REPL just fine after I import org.apache.spark.sql.Column. On the other hand, if I do `col.when(col === “abc”, 1).otherwise(0)`, it will compile successfully, but then at runtime, I get this error: java.lang.IllegalArgumentException: when() can only be applied on a Column previously generated by when() function This appears to be pretty circular logic. How can `when` only be applied to a Column previously generated by `when`? How would I call `when` in the first place?
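For reference, the working form after that import looks like this; the DataFrame and column names are illustrative:

import org.apache.spark.sql.functions._

// when/otherwise come from the functions object, not from an arbitrary Column
val flagged = df.withColumn("flag", when(col("category") === "abc", 1).otherwise(0))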
[Spark-1.4.0] NoSuchMethodError: com.fasterxml.jackson.module.scala.deser.BigDecimalDeserializer
Hi all: I compiled the new Spark 1.4.0 version today. But when I run the WordCount demo, it throws a NoSuchMethodError: *java.lang.NoSuchMethodError: com.fasterxml.jackson.module.scala.deser.BigDecimalDeserializer*. I found the default *fasterxml.jackson.version* is *2.4.4*. Is there anything wrong with the jackson version?
Apache Spark architecture
I am trying to find complete documentation about the internal architecture of Apache Spark, but have had no results. For example, I'm trying to understand the following: assume that we have a 1Tb text file on HDFS (3 nodes in the cluster, replication factor is 1). This file will be split into 128Mb chunks and each chunk will be stored on only one node. We run Spark Workers on these nodes. I know that Spark tries to work with data stored in HDFS on the same node (to avoid network I/O). For example, I'm trying to do a word count on this 1Tb text file. Here are my questions: 1. Will Spark load a chunk (128Mb) into RAM, count the words, then delete it from memory, and do this sequentially? What if there is not enough available RAM? 2. When will Spark use non-local data on HDFS? 3. What if I need to do a more complex task, where the results of each iteration on each Worker need to be transferred to all other Workers (shuffling?) - do I need to write them myself to HDFS and then read them back? For example, I can't understand how K-means clustering or gradient descent works on Spark. I would appreciate any link to an Apache Spark architecture guide. -- Best regards, Vitalii Duk
Re: Spark Streaming, updateStateByKey and mapPartitions() - and lazy DatabaseConnection
There are several database apis that use a thread local or singleton reference to a connection pool (we use ScalikeJDBC currently, but there are others). You can use mapPartitions earlier in the chain to make sure the connection pool is set up on that executor, then use it inside updateStateByKey On Fri, Jun 12, 2015 at 10:07 AM, algermissen1971 algermissen1...@icloud.com wrote: Hi, I have a scenario with spark streaming, where I need to write to a database from within updateStateByKey[1]. That means that inside my update function I need a connection. I have so far understood that I should create a new (lazy) connection for every partition. But since I am not working in foreachRDD I wonder where I can iterate over the partitions. Should I use mapPartitions() somewhere up the chain? Jan [1] The use case being saving ‘done' sessions during web tracking. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
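A rough sketch of the singleton-pool idea Cody describes: a lazily initialised, per-JVM store used from inside the updateStateByKey function. The Session type, the "session is done" rule, and the pool/save bodies are all placeholders standing in for whatever JDBC library you use (e.g. ScalikeJDBC):

import org.apache.spark.streaming.dstream.DStream

case class Session(events: Seq[String], done: Boolean)

object SessionStore {
  // Runs at most once per executor JVM, the first time save() is called there.
  private lazy val poolInitialized: Boolean = {
    // placeholder: configure the real JDBC connection pool here
    true
  }
  def save(session: Session): Unit = {
    require(poolInitialized)
    // placeholder: borrow a pooled connection and write the finished session
  }
}

// events: DStream[(sessionId, event)], assumed to exist upstream
def track(events: DStream[(String, String)]): DStream[(String, Session)] =
  events.updateStateByKey[Session] { (newEvents: Seq[String], current: Option[Session]) =>
    // Illustrative rule: a session with no new events in this batch is considered done.
    val updated = Session(current.map(_.events).getOrElse(Seq.empty) ++ newEvents, done = newEvents.isEmpty)
    if (updated.done) {
      SessionStore.save(updated)   // runs on the executor, using the JVM-local pool
      None                         // drop the finished session from the state
    } else {
      Some(updated)
    }
  }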
Re: Issues with `when` in Column class
That did it! Thanks! From: Yin Huai Date: Friday, June 12, 2015 at 10:31 AM To: Chris Freeman Cc: user@spark.apache.orgmailto:user@spark.apache.org Subject: Re: Issues with `when` in Column class Hi Chris, Have you imported org.apache.spark.sql.functions._? Thanks, Yin On Fri, Jun 12, 2015 at 8:05 AM, Chris Freeman cfree...@alteryx.commailto:cfree...@alteryx.com wrote: I’m trying to iterate through a list of Columns and create new Columns based on a condition. However, the when method keeps giving me errors that don’t quite make sense. If I do `when(col === “abc”, 1).otherwise(0)` I get the following error at compile time: [error] not found: value when However, this works in the REPL just fine after I import org.apache.spark.sql.Column. On the other hand, if I do `col.when(col === “abc”, 1).otherwise(0)`, it will compile successfully, but then at runtime, I get this error: java.lang.IllegalArgumentException: when() can only be applied on a Column previously generated by when() function This appears to be pretty circular logic. How can `when` only be applied to a Column previously generated by `when`? How would I call `when` in the first place?
Re: how to use a properties file from a url in spark-submit
Hmm either spark-submit isn't picking up the relative path or Chronos is not setting your working directory to your sandbox. Try using cd $MESOS_SANDBOX spark-submit --properties-file props.properties On Fri, Jun 12, 2015 at 12:32 PM Gary Ogden gog...@gmail.com wrote: That's a great idea. I did what you suggested and added the url to the props file in the uri of the json. The properties file now shows up in the sandbox. But when it goes to run spark-submit with --properties-file props.properties it fails to find it: Exception in thread main java.lang.IllegalArgumentException: requirement failed: Properties file props.properties does not exist On 11 June 2015 at 22:17, Matthew Jones mle...@gmail.com wrote: If you are using chronos you can just put the url in the task json and chronos will download it into your sandbox. Then just use spark-submit --properties-file app.properties. On Thu, 11 Jun 2015 15:52 Marcelo Vanzin van...@cloudera.com wrote: That's not supported. You could use wget / curl to download the file to a temp location before running spark-submit, though. On Thu, Jun 11, 2015 at 12:48 PM, Gary Ogden gog...@gmail.com wrote: I have a properties file that is hosted at a url. I would like to be able to use the url in the --properties-file parameter when submitting a job to mesos using spark-submit via chronos I would rather do this than use a file on the local server. This doesn't seem to work though when submitting from chronos: bin/spark-submit --properties-file http://server01/props/app.properties Inside the properties file: spark.executor.memory=256M spark.cores.max=1 spark.shuffle.consolidateFiles=true spark.task.cpus=1 spark.deploy.defaultCores=1 spark.driver.cores=1 spark.scheduler.mode=FAIR So how do I specify a properties file in a url? -- Marcelo
Re: Is it possible to see Spark jobs on MapReduce job history ? (running Spark on YARN cluster)
For that you need SPARK-1537 and the patch to go with it It is still the spark web UI, it just hands off storage and retrieval of the history to the underlying Yarn timeline server, rather than through the filesystem. You'll get to see things as they go along too. If you do want to try it, please have a go, and provide any feedback on the JIRA/pull request. I should warn: it needs Hadoop 2.6 (Apache, HDP 2.2, CDH5.4), due to some API changes. While the patch is for 1.4+, I already have a local branch with it applied to spark 1.3.1 On 12 Jun 2015, at 03:01, Elkhan Dadashov elkhan8...@gmail.com wrote: Hi all, I wonder if anyone has used use MapReduce Job History to show Spark jobs. I can see my Spark jobs (Spark running on Yarn cluster) on Resource manager (RM). I start Spark History server, and then through Spark's web-based user interface I can monitor the cluster (and track cluster and job statistics). Basically Yarn RM gets linked to Spark History server, which enables monitoring. But instead of using Spark History Server , is it possible to see Spark jobs on MapReduce job history ? (in addition to seeing them on RM) (I know through yarn logs -applicationId app ID we can get all logs after Spark job has completed, but my concern is to see the logs and completed jobs through common web ui - MapReduce Job History ) Thanks in advance. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark 1.4: Python API for getting Kafka offsets in direct mode?
The scala api has 2 ways of calling createDirectStream. One of them allows you to pass a message handler that gets full access to the kafka MessageAndMetadata, including offset. I don't know why the python api was developed with only one way to call createDirectStream, but the first thing I'd look at would be adding that functionality back in. If someone wants help creating a patch for that, just let me know. Dealing with offsets on a per-message basis may not be as efficient as dealing with them on a batch basis using the HasOffsetRanges interface... but if efficiency was a primary concern, you probably wouldn't be using Python anyway. On Fri, Jun 12, 2015 at 1:05 AM, Saisai Shao sai.sai.s...@gmail.com wrote: Scala KafkaRDD uses a trait to handle this problem, but it is not so easy and straightforward in Python, where we need to have a specific API to handle this, I'm not sure is there any simple workaround to fix this, maybe we should think carefully about it. 2015-06-12 13:59 GMT+08:00 Amit Ramesh a...@yelp.com: Thanks, Jerry. That's what I suspected based on the code I looked at. Any pointers on what is needed to build in this support would be great. This is critical to the project we are currently working on. Thanks! On Thu, Jun 11, 2015 at 10:54 PM, Saisai Shao sai.sai.s...@gmail.com wrote: OK, I get it, I think currently Python based Kafka direct API do not provide such equivalence like Scala, maybe we should figure out to add this into Python API also. 2015-06-12 13:48 GMT+08:00 Amit Ramesh a...@yelp.com: Hi Jerry, Take a look at this example: https://spark.apache.org/docs/latest/streaming-kafka-integration.html#tab_scala_2 The offsets are needed because as RDDs get generated within spark the offsets move further along. With direct Kafka mode the current offsets are no more persisted in Zookeeper but rather within Spark itself. If you want to be able to use zookeeper based monitoring tools to keep track of progress, then this is needed. In my specific case we need to persist Kafka offsets externally so that we can continue from where we left off after a code deployment. In other words, we need exactly-once processing guarantees across code deployments. Spark does not support any state persistence across deployments so this is something we need to handle on our own. Hope that helps. Let me know if not. Thanks! Amit On Thu, Jun 11, 2015 at 10:02 PM, Saisai Shao sai.sai.s...@gmail.com wrote: Hi, What is your meaning of getting the offsets from the RDD, from my understanding, the offsetRange is a parameter you offered to KafkaRDD, why do you still want to get the one previous you set into? Thanks Jerry 2015-06-12 12:36 GMT+08:00 Amit Ramesh a...@yelp.com: Congratulations on the release of 1.4! I have been trying out the direct Kafka support in python but haven't been able to figure out how to get the offsets from the RDD. Looks like the documentation is yet to be updated to include Python examples ( https://spark.apache.org/docs/latest/streaming-kafka-integration.html). I am specifically looking for the equivalent of https://spark.apache.org/docs/latest/streaming-kafka-integration.html#tab_scala_2. I tried digging through the python code but could not find anything related. Any pointers would be greatly appreciated. Thanks! Amit
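For comparison, the batch-level access that the Python API currently lacks is the pattern from the Kafka integration guide linked above, which in Scala looks like this (the direct stream is assumed to already exist):

import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

var offsetRanges = Array.empty[OffsetRange]

directKafkaStream.transform { rdd =>
  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd
}.foreachRDD { rdd =>
  for (o <- offsetRanges) {
    println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
  }
}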
Re: spark-sql from CLI ---EXCEPTION: java.lang.OutOfMemoryError: Java heap space
Sent from my phone On Jun 11, 2015, at 8:43 AM, Sanjay Subramanian sanjaysubraman...@yahoo.com.INVALID wrote: hey guys Using Hive and Impala daily intensively. Want to transition to spark-sql in CLI mode Currently in my sandbox I am using the Spark (standalone mode) in the CDH distribution (starving developer version 5.3.3) 3 datanode hadoop cluster 32GB RAM per node 8 cores per node spark 1.2.0+cdh5.3.3+371 I am testing some stuff on one view and getting memory errors Possibly reason is default memory per executor showing on 18080 is 512M These options when used to start the spark-sql CLI does not seem to have any effect --total-executor-cores 12 --executor-memory 4G /opt/cloudera/parcels/CDH/lib/spark/bin/spark-sql -e select distinct isr,event_dt,age,age_cod,sex,year,quarter from aers.aers_demo_view aers.aers_demo_view (7 million+ records) === isr bigint case id event_dtbigint Event date age double age of patient age_cod string days,months years sex string M or F yearint quarter int VIEW DEFINITION CREATE VIEW `aers.aers_demo_view` AS SELECT `isr` AS `isr`, `event_dt` AS `event_dt`, `age` AS `age`, `age_cod` AS `age_cod`, `gndr_cod` AS `sex`, `year` AS `year`, `quarter` AS `quarter` FROM (SELECT `aers_demo_v1`.`isr`, `aers_demo_v1`.`event_dt`, `aers_demo_v1`.`age`, `aers_demo_v1`.`age_cod`, `aers_demo_v1`.`gndr_cod`, `aers_demo_v1`.`year`, `aers_demo_v1`.`quarter` FROM `aers`.`aers_demo_v1` UNION ALL SELECT `aers_demo_v2`.`isr`, `aers_demo_v2`.`event_dt`, `aers_demo_v2`.`age`, `aers_demo_v2`.`age_cod`, `aers_demo_v2`.`gndr_cod`, `aers_demo_v2`.`year`, `aers_demo_v2`.`quarter` FROM `aers`.`aers_demo_v2` UNION ALL SELECT `aers_demo_v3`.`isr`, `aers_demo_v3`.`event_dt`, `aers_demo_v3`.`age`, `aers_demo_v3`.`age_cod`, `aers_demo_v3`.`gndr_cod`, `aers_demo_v3`.`year`, `aers_demo_v3`.`quarter` FROM `aers`.`aers_demo_v3` UNION ALL SELECT `aers_demo_v4`.`isr`, `aers_demo_v4`.`event_dt`, `aers_demo_v4`.`age`, `aers_demo_v4`.`age_cod`, `aers_demo_v4`.`gndr_cod`, `aers_demo_v4`.`year`, `aers_demo_v4`.`quarter` FROM `aers`.`aers_demo_v4` UNION ALL SELECT `aers_demo_v5`.`primaryid` AS `ISR`, `aers_demo_v5`.`event_dt`, `aers_demo_v5`.`age`, `aers_demo_v5`.`age_cod`, `aers_demo_v5`.`gndr_cod`, `aers_demo_v5`.`year`, `aers_demo_v5`.`quarter` FROM `aers`.`aers_demo_v5` UNION ALL SELECT `aers_demo_v6`.`primaryid` AS `ISR`, `aers_demo_v6`.`event_dt`, `aers_demo_v6`.`age`, `aers_demo_v6`.`age_cod`, `aers_demo_v6`.`sex` AS `GNDR_COD`, `aers_demo_v6`.`year`, `aers_demo_v6`.`quarter` FROM `aers`.`aers_demo_v6`) `aers_demo_view` 15/06/11 08:36:36 WARN DefaultChannelPipeline: An exception was thrown by a user handler while handling an exception event ([id: 0x01b99855, /10.0.0.19:58117 = /10.0.0.19:52016] EXCEPTION: java.lang.OutOfMemoryError: Java heap space) java.lang.OutOfMemoryError: Java heap space at org.jboss.netty.buffer.HeapChannelBuffer.init(HeapChannelBuffer.java:42) at org.jboss.netty.buffer.BigEndianHeapChannelBuffer.init(BigEndianHeapChannelBuffer.java:34) at org.jboss.netty.buffer.ChannelBuffers.buffer(ChannelBuffers.java:134) at org.jboss.netty.buffer.HeapChannelBufferFactory.getBuffer(HeapChannelBufferFactory.java:68) at org.jboss.netty.buffer.AbstractChannelBufferFactory.getBuffer(AbstractChannelBufferFactory.java:48) at org.jboss.netty.handler.codec.frame.FrameDecoder.newCumulationBuffer(FrameDecoder.java:507) at org.jboss.netty.handler.codec.frame.FrameDecoder.updateCumulation(FrameDecoder.java:345) at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:312) at 
org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268) at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255) at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88) at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:109) at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:312) at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:90) at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) 15/06/11 08:36:40 ERROR Utils: Uncaught exception in thread task-result-getter-0 java.lang.OutOfMemoryError: GC
Re: Optimisation advice for Avro-Parquet merge job
Hey Kiran, Thanks very much for the response. I left for vacation before I could try this out, but I'll experiment once I get back and let you know how it goes. Thanks! James. On 8 June 2015 at 12:34, kiran lonikar loni...@gmail.com wrote: It turns out my assumption on load and unionAll being blocking is not correct. They are transformations. So instead of just running only the load and unionAll in the run() methods, I think you will have to save the intermediate dfInput[i] to temp (parquet) files (possibly to in memory DFS like http://tachyon-project.org/) in the run() methods. The second for loop will also have to load from the intermediate parquet files. Then finally save the final dfInput[0] to the HDFS. I think this way of parallelizing will force the cluster to utilize the all the resources. -Kiran On Mon, Jun 8, 2015 at 12:30 PM, kiran lonikar loni...@gmail.com wrote: James, As I can see, there are three distinct parts to your program: - for loop - synchronized block - final outputFrame.save statement Can you do a separate timing measurement by putting a simple System.currentTimeMillis() around these blocks to know how much they are taking and then try to optimize where it takes longest? In the second block, you may want to measure the time for the two statements. Improving this boils down to playing with spark settings. Now consider the first block: I think this is a classic case of merge sort or a reduce tree. You already tried to improve this by submitting jobs in parallel using executor pool/Callable etc. To further improve the parallelization, I suggest you use a reduce tree like approach. For example, lets say you want to compute sum of all elements of an array in parallel. The way its solved for a GPU like platform is you divide your input array initially in chunks of 2, compute those n/2 sums parallely on separate threads and save the result in the first of the two elements. In the next iteration, you compute n/4 sums parallely of the earlier sums and so on till you are left with only two elements whose sum gives you final sum. You are performing many sequential unionAll operations for inputs.size() avro files. Assuming the unionAll() on DataFrame is blocking (and not a simple transformation like on RDDs) and actually performs the union operation, you will certainly benefit by parallelizing this loop. You may change the loop to something like below: // pseudo code only int n = inputs.size() // initialize executor executor = new FixedThreadPoolExecutor(n/2) dfInput = new DataFrame[n/2] for(int i =0;i n/2;i++) { executor.submit(new Runnable() { public void run() { // union of i and i+n/2 // showing [] only to bring out array access. Replace with dfInput(i) in your code dfInput[i] = sqlContext.load(inputPaths.get(i), com.databricks.spark.avro).unionAll(sqlContext.load(inputsPath.get(i + n/2), com.databricks.spark.avro)) } }); } executor.awaitTermination(0, TimeUnit.SECONDS) int steps = log(n)/log(2.0) for(s = 2; s steps;s++) { int stride = n/(1 s); // n/(2^s) for(int i = 0;i stride;i++) { executor.submit(new Runnable() { public void run() { // union of i and i+n/2 // showing [] only to bring out array access. Replace with dfInput(i) and dfInput(i+stride) in your code dfInput[i] = dfInput[i].unionAll(dfInput[i + stride]) } }); } executor.awaitTermination(0, TimeUnit.SECONDS) } Let me know if it helped. -Kiran On Thu, Jun 4, 2015 at 8:57 PM, James Aley james.a...@swiftkey.com wrote: Thanks for the confirmation! 
We're quite new to Spark, so a little reassurance is a good thing to have sometimes :-) The thing that's concerning me at the moment is that my job doesn't seem to run any faster with more compute resources added to the cluster, and this is proving a little tricky to debug. There are a lot of variables, so here's what we've tried already and the apparent impact. If anyone has any further suggestions, we'd love to hear! * Increase the minimum number of output files (targetPartitions above), so that input groups smaller than our minimum chunk size can still be worked on by more than one executor. This does measurably speed things up, but obviously it's a trade-off, as the original goal for this job is to merge our data into fewer, larger files. * Submit many jobs in parallel, by running the above code in a Callable, on an executor pool. This seems to help, to some extent, but I'm not sure what else needs to be configured alongside it -- driver threads, scheduling policy, etc. We set scheduling to FAIR when doing this, as that seemed like the right approach, but we're not 100% confident. It seemed to help quite substantially anyway, so perhaps this just needs further tuning? * Increasing executors,
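Kiran's pseudo-code earlier in this thread has lost its quotes and angle brackets in the archive; as a cleaner rendering of the same pairwise reduce-tree idea, here is a compact Scala sketch. The paths and save location are placeholders, and, as Kiran notes above, load and unionAll are lazy, so this only builds a balanced union plan; forcing the intermediate rounds would require persisting or writing them out (e.g. to Parquet or Tachyon) before continuing:

// inputPaths: Seq[String] of Avro directories to merge; sqlContext assumed to exist
def mergeAvro(inputPaths: Seq[String], output: String): Unit = {
  var frames = inputPaths.map(p => sqlContext.load(p, "com.databricks.spark.avro")).toVector

  // Pairwise union: combine frames (0, n/2), (1, n/2 + 1), ... until one frame is left.
  while (frames.size > 1) {
    val half = frames.size / 2
    val merged = (0 until half).map(i => frames(i).unionAll(frames(i + half))).toVector
    // With an odd count, carry the last unpaired frame over to the next round unchanged.
    frames = if (frames.size % 2 == 0) merged else merged :+ frames.last
  }

  frames.head.saveAsParquetFile(output)
}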
Re: Spark 1.4 release date
Thanks guys, my question must look like a stupid one today :) Looking forward to testing out 1.4.0; just downloaded it. Congrats to the team on this much anticipated release. On Fri, Jun 12, 2015 at 10:12 PM, Guru Medasani gdm...@gmail.com wrote: Here is a Spark 1.4 release blog by Databricks. https://databricks.com/blog/2015/06/11/announcing-apache-spark-1-4.html Guru Medasani gdm...@gmail.com On Jun 12, 2015, at 7:08 AM, ayan guha guha.a...@gmail.com wrote: Thanks a lot. On 12 Jun 2015 19:46, Todd Nist tsind...@gmail.com wrote: It was released yesterday. On Friday, June 12, 2015, ayan guha guha.a...@gmail.com wrote: Hi When is the official Spark 1.4 release date? Best Ayan -- Best Regards, Ayan Guha
Re: Spark Java API and minimum set of 3rd party dependencies
You don't add dependencies to your app -- you mark Spark as 'provided' in the build and you rely on the deployed Spark environment to provide it. On Fri, Jun 12, 2015 at 7:14 PM, Elkhan Dadashov elkhan8...@gmail.com wrote: Hi all, We want to integrate Spark in our Java application using the Spark Java Api and run then on the Yarn clusters. If i want to run Spark on Yarn, which dependencies are must for including ? I looked at Spark POM which lists that Spark requires 50+ 3rd party dependencies. Is there minimum set of Spark dependencies which are necessary for Spark Java API (for Spark client run on Yarn cluster) ? Thanks in advance. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
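In Maven terms, Sean's suggestion looks roughly like this; the cluster's Spark installation then supplies spark-core and its transitive dependencies at runtime, so they never enter your application's artifact (version shown to match the 1.4.0-era discussion, adjust as needed):

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>1.4.0</version>
  <scope>provided</scope>
</dependency>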
[Spark 1.4.0]How to set driver's system property using spark-submit options?
In Spark 1.3.x, a system property of the driver could be set with the --conf option, which was shared between setting spark properties and system properties. In Spark 1.4.0 this feature is removed; the driver instead logs the following warning: Warning: Ignoring non-spark config property: xxx.xxx=v How do I set the driver's system properties in 1.4.0? Is there a reason this was removed without a deprecation warning? Thanks a lot for your advice. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-4-0-How-to-set-driver-s-system-property-using-spark-submit-options-tp23298.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: [Spark 1.4.0] How to set driver's system property using spark-submit options?
Hi Peng, Setting properties through --conf should still work in Spark 1.4. From the warning it looks like the config you are trying to set does not start with the "spark." prefix. What is the config that you are trying to set? -Andrew 2015-06-12 11:17 GMT-07:00 Peng Cheng pc...@uow.edu.au: In Spark 1.3.x, the system property of the driver can be set by the --conf option, which is shared between setting Spark properties and system properties. In Spark 1.4.0 this feature is removed; the driver instead logs the following warning: Warning: Ignoring non-spark config property: xxx.xxx=v How do I set the driver's system property in 1.4.0? Is there a reason it was removed without a deprecation warning? Thanks a lot for your advice. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-4-0-How-to-set-driver-s-system-property-using-spark-submit-options-tp23298.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
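To make the "spark." prefix point concrete: anything passed as --conf spark.something=value is accepted by spark-submit and becomes visible in the driver's SparkConf. A small sketch, where "spark.myapp.endpoint" is a made-up key used only for illustration:

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("conf-read-sketch"))
// Passed on the command line as: --conf spark.myapp.endpoint=http://example.com
val endpoint = sc.getConf.get("spark.myapp.endpoint", "default-value")

If the value really has to remain a plain JVM system property (read via System.getProperty), it can usually be passed through the driver's JVM options instead, e.g. spark-submit's --driver-java-options or the spark.driver.extraJavaOptions config.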
Re: Exception when using CLUSTER BY or ORDER BY
Tom, Can you file a JIRA and attach a small reproducible test case if possible? On Tue, May 19, 2015 at 1:50 PM, Thomas Dudziak tom...@gmail.com wrote: Under certain circumstances that I haven't yet been able to isolate, I get the following error when doing a HQL query using HiveContext (Spark 1.3.1 on Mesos, fine-grained mode). Is this a known problem or should I file a JIRA for it ? org.apache.spark.SparkException: Can only zip RDDs with same number of elements in each partition at org.apache.spark.rdd.RDD$$anonfun$zip$1$$anon$1.hasNext(RDD.scala:746) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at org.apache.spark.util.random.SamplingUtils$.reservoirSampleAndCount(SamplingUtils.scala:56) at org.apache.spark.RangePartitioner$$anonfun$8.apply(Partitioner.scala:259) at org.apache.spark.RangePartitioner$$anonfun$8.apply(Partitioner.scala:257) at org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:647) at org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:647) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)
Re: Spark Java API and minimum set of 3rd party dependencies
Thanks for the prompt response, Sean. The issue is that we are restricted in the dependencies we can include in our project. There are 2 issues when including dependencies: 1) there are several dependencies which both we and Spark have, but with conflicting versions. 2) there are dependencies which Spark has and our project does not. How do we handle these 2 cases when including Spark dependencies (direct and transitive ones)? We need to include all dependencies (so there should be no 3rd-party transitive dependency) in our POM file, more like the Apache Ivy style of dependency management (which includes all transitive dependencies in the POM file) rather than the Maven style. Our main goal is: we want to integrate Spark in our Java application using the Spark Java API and run it on YARN clusters. Thanks a lot. On Fri, Jun 12, 2015 at 11:17 AM, Sean Owen so...@cloudera.com wrote: You don't add dependencies to your app -- you mark Spark as 'provided' in the build and you rely on the deployed Spark environment to provide it. On Fri, Jun 12, 2015 at 7:14 PM, Elkhan Dadashov elkhan8...@gmail.com wrote: Hi all, We want to integrate Spark in our Java application using the Spark Java API and run it on YARN clusters. If I want to run Spark on YARN, which dependencies are a must to include? I looked at the Spark POM, which lists 50+ 3rd-party dependencies. Is there a minimum set of Spark dependencies necessary for the Spark Java API (for a Spark client run on a YARN cluster)? Thanks in advance. -- Best regards, Elkhan Dadashov
Spark Java API and minimum set of 3rd party dependencies
Hi all, We want to integrate Spark in our Java application using the Spark Java API and run it on YARN clusters. If I want to run Spark on YARN, which dependencies are a must to include? I looked at the Spark POM http://central.maven.org/maven2/org/apache/spark/spark-core_2.10/1.3.1/spark-core_2.10-1.3.1.pom which lists 50+ 3rd-party dependencies. Is there a minimum set of Spark dependencies necessary for the Spark Java API (for a Spark client run on a YARN cluster)? Thanks in advance.
log4j configuration ignored for some classes only
Hi all, I am running Spark standalone (local[*]), and have tried to cut back on some of the logging noise from the framework by editing log4j.properties in spark/conf. The following lines are working as expected: log4j.logger.org.apache.spark=WARN log4j.logger.org.apache.spark.storage.BlockManager=ERROR (I've even guaranteed that it's definitely using my configuration by adding a prefix to the conversion pattern). However, I am still getting log messages at INFO from classes like: org.apache.spark.Logging$class (should be covered by the org.apache.spark setting) kafka.utils.Logging$class (when I add a similar setting for kafka.utils) I suspect it's because these are inner classes. It still happens even when I go up a level and add configurations like log4j.logger.org=WARN. Is this a known bug in log4j? Is there any known way to suppress these, ideally through configuration rather than programmatically? Many thanks
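For reference, the programmatic route mentioned above would look roughly like this with the log4j 1.x API that Spark 1.x bundles (a sketch of the workaround only, not an answer to why the configuration route is being ignored):

import org.apache.log4j.{Level, Logger}

// Run once at application start-up, before the noisy classes begin logging.
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("kafka").setLevel(Level.WARN)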
Fwd: Spark/PySpark errors on mysterious missing /tmp file
(This question is also present on StackOverflow http://stackoverflow.com/questions/30656083/spark-pyspark-errors-on-mysterious-missing-tmp-file ) I'm having issues with pyspark and a missing /tmp file. I've narrowed down the behavior to a short snippet. a=sc.parallelize([(16646160,1)]) # yes, just a single element b=stuff # this is read in from a text file above - the contents are shown below # b=sc.parallelize(b.collect()) a.join(b).take(10) This fails, but if I include the commented line (which should be the same thing), then it succeeds. Here is the error: --- Py4JJavaError Traceback (most recent call last) ipython-input-101-90fe86df7879 in module() 3 b=stuff.map(lambda x:(16646160,1)) 4 #b=sc.parallelize(b.collect()) 5 a.join(b).take(10) 6 b.take(10) /usr/lib/spark/python/pyspark/rdd.py in take(self, num) 1109 1110 p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts)) - res = self.context.runJob(self, takeUpToNumLeft, p, True) 1112 1113 items += res /usr/lib/spark/python/pyspark/context.py in runJob(self, rdd, partitionFunc, partitions, allowLocal) 816 # SparkContext#runJob. 817 mappedRDD = rdd.mapPartitions(partitionFunc) -- 818 it = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, javaPartitions, allowLocal) 819 return list(mappedRDD._collect_iterator_through_file(it)) 820 /usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args) 536 answer = self.gateway_client.send_command(command) 537 return_value = get_return_value(answer, self.gateway_client, -- 538 self.target_id, self.name) 539 540 for temp_arg in temp_args: /usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name) 298 raise Py4JJavaError( 299 'An error occurred while calling {0}{1}{2}.\n'. -- 300 format(target_id, '.', name), value) 301 else: 302 raise Py4JError( Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 210.0 failed 1 times, most recent failure: Lost task 1.0 in stage 210.0 (TID 884, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last): File /usr/lib/spark/python/pyspark/worker.py, line 92, in main command = pickleSer.loads(command.value) File /usr/lib/spark/python/pyspark/broadcast.py, line 106, in value self._value = self.load(self._path) File /usr/lib/spark/python/pyspark/broadcast.py, line 87, in load with open(path, 'rb', 1 20) as f: IOError: [Errno 2] No such file or directory: '/tmp/spark-4a8c591e-9192-4198-a608-c7daa3a5d494/tmpuzsAVM' at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:137) at org.apache.spark.api.python.PythonRDD$$anon$1.init(PythonRDD.scala:174) at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:96) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:242) at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:204) at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:204) at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1468) at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:203) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202) at
Re: spark-sql from CLI ---EXCEPTION: java.lang.OutOfMemoryError: Java heap space
It sounds like this might be caused by a memory configuration problem. In addition to looking at the executor memory, I'd also bump up the driver memory, since it appears that your shell is running out of memory when collecting a large query result. Sent from my phone On Jun 11, 2015, at 8:43 AM, Sanjay Subramanian sanjaysubraman...@yahoo.com.INVALID wrote: hey guys Using Hive and Impala daily intensively. Want to transition to spark-sql in CLI mode Currently in my sandbox I am using the Spark (standalone mode) in the CDH distribution (starving developer version 5.3.3) 3 datanode hadoop cluster 32GB RAM per node 8 cores per node spark 1.2.0+cdh5.3.3+371 I am testing some stuff on one view and getting memory errors Possibly reason is default memory per executor showing on 18080 is 512M These options when used to start the spark-sql CLI does not seem to have any effect --total-executor-cores 12 --executor-memory 4G /opt/cloudera/parcels/CDH/lib/spark/bin/spark-sql -e select distinct isr,event_dt,age,age_cod,sex,year,quarter from aers.aers_demo_view aers.aers_demo_view (7 million+ records) === isr bigint case id event_dtbigint Event date age double age of patient age_cod string days,months years sex string M or F yearint quarter int VIEW DEFINITION CREATE VIEW `aers.aers_demo_view` AS SELECT `isr` AS `isr`, `event_dt` AS `event_dt`, `age` AS `age`, `age_cod` AS `age_cod`, `gndr_cod` AS `sex`, `year` AS `year`, `quarter` AS `quarter` FROM (SELECT `aers_demo_v1`.`isr`, `aers_demo_v1`.`event_dt`, `aers_demo_v1`.`age`, `aers_demo_v1`.`age_cod`, `aers_demo_v1`.`gndr_cod`, `aers_demo_v1`.`year`, `aers_demo_v1`.`quarter` FROM `aers`.`aers_demo_v1` UNION ALL SELECT `aers_demo_v2`.`isr`, `aers_demo_v2`.`event_dt`, `aers_demo_v2`.`age`, `aers_demo_v2`.`age_cod`, `aers_demo_v2`.`gndr_cod`, `aers_demo_v2`.`year`, `aers_demo_v2`.`quarter` FROM `aers`.`aers_demo_v2` UNION ALL SELECT `aers_demo_v3`.`isr`, `aers_demo_v3`.`event_dt`, `aers_demo_v3`.`age`, `aers_demo_v3`.`age_cod`, `aers_demo_v3`.`gndr_cod`, `aers_demo_v3`.`year`, `aers_demo_v3`.`quarter` FROM `aers`.`aers_demo_v3` UNION ALL SELECT `aers_demo_v4`.`isr`, `aers_demo_v4`.`event_dt`, `aers_demo_v4`.`age`, `aers_demo_v4`.`age_cod`, `aers_demo_v4`.`gndr_cod`, `aers_demo_v4`.`year`, `aers_demo_v4`.`quarter` FROM `aers`.`aers_demo_v4` UNION ALL SELECT `aers_demo_v5`.`primaryid` AS `ISR`, `aers_demo_v5`.`event_dt`, `aers_demo_v5`.`age`, `aers_demo_v5`.`age_cod`, `aers_demo_v5`.`gndr_cod`, `aers_demo_v5`.`year`, `aers_demo_v5`.`quarter` FROM `aers`.`aers_demo_v5` UNION ALL SELECT `aers_demo_v6`.`primaryid` AS `ISR`, `aers_demo_v6`.`event_dt`, `aers_demo_v6`.`age`, `aers_demo_v6`.`age_cod`, `aers_demo_v6`.`sex` AS `GNDR_COD`, `aers_demo_v6`.`year`, `aers_demo_v6`.`quarter` FROM `aers`.`aers_demo_v6`) `aers_demo_view` 15/06/11 08:36:36 WARN DefaultChannelPipeline: An exception was thrown by a user handler while handling an exception event ([id: 0x01b99855, /10.0.0.19:58117 = /10.0.0.19:52016] EXCEPTION: java.lang.OutOfMemoryError: Java heap space) java.lang.OutOfMemoryError: Java heap space at org.jboss.netty.buffer.HeapChannelBuffer.init(HeapChannelBuffer.java:42) at org.jboss.netty.buffer.BigEndianHeapChannelBuffer.init(BigEndianHeapChannelBuffer.java:34) at org.jboss.netty.buffer.ChannelBuffers.buffer(ChannelBuffers.java:134) at org.jboss.netty.buffer.HeapChannelBufferFactory.getBuffer(HeapChannelBufferFactory.java:68) at org.jboss.netty.buffer.AbstractChannelBufferFactory.getBuffer(AbstractChannelBufferFactory.java:48) at 
org.jboss.netty.handler.codec.frame.FrameDecoder.newCumulationBuffer(FrameDecoder.java:507) at org.jboss.netty.handler.codec.frame.FrameDecoder.updateCumulation(FrameDecoder.java:345) at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:312) at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268) at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255) at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88) at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:109) at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:312) at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:90) at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at
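To make the earlier suggestion concrete, one way to pass a larger driver heap to the CLI (a sketch: the 8G figure is illustrative, and since bin/spark-sql is a thin wrapper over spark-submit, --driver-memory should be accepted alongside the executor options already being passed):

/opt/cloudera/parcels/CDH/lib/spark/bin/spark-sql --driver-memory 8G --executor-memory 4G --total-executor-cores 12 -e "select distinct isr,event_dt,age,age_cod,sex,year,quarter from aers.aers_demo_view"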