Re: Facing error: java.lang.ArrayIndexOutOfBoundsException while executing SparkSQL join query
The issue is now resolved. One of the CSV files had an incorrect record at the end.
On Fri, Feb 27, 2015 at 4:24 PM, anamika gupta anamika.guo...@gmail.com wrote: I have three tables with the following schema:

case class date_d(WID: Int, CALENDAR_DATE: java.sql.Timestamp, DATE_STRING: String, DAY_OF_WEEK: String, DAY_OF_MONTH: Int, DAY_OF_YEAR: Int, END_OF_MONTH_FLAG: String, YEARWEEK: Int, CALENDAR_MONTH: String, MONTH_NUM: Int, YEARMONTH: Int, QUARTER: Int, YEAR: Int)

case class interval_f(ORG_ID: Int, CHANNEL_WID: Int, SDP_WID: Int, MEAS_WID: Int, DATE_WID: Int, TIME_WID: Int, VALIDATION_STATUS_CD: Int, VAL_FAIL_CD: Int, INTERVAL_FLAG_CD: Int, CHANGE_METHOD_WID: Int, SOURCE_LAST_UPD_TIME: java.sql.Timestamp, INTERVAL_END_TIME: java.sql.Timestamp, LOCKED: String, EXT_VERSION_TIME: java.sql.Timestamp, INTERVAL_VALUE: Double, INSERT_TIME: java.sql.Timestamp, LAST_UPD_TIME: java.sql.Timestamp)

class sdp_d(WID: Option[Int], BATCH_ID: Option[Int], SRC_ID: Option[String], ORG_ID: Option[Int], CLASS_WID: Option[Int], DESC_TEXT: Option[String], PREMISE_WID: Option[Int], FEED_LOC: Option[String], GPS_LAT: Option[Double], GPS_LONG: Option[Double], PULSE_OUTPUT_BLOCK: Option[String], UDC_ID: Option[String], UNIVERSAL_ID: Option[String], IS_VIRTUAL_FLG: Option[String], SEAL_INFO: Option[String], ACCESS_INFO: Option[String], ALT_ACCESS_INFO: Option[String], LOC_INFO: Option[String], ALT_LOC_INFO: Option[String], TYPE: Option[String], SUB_TYPE: Option[String], TIMEZONE_ID: Option[Int], GIS_ID: Option[String], BILLED_UPTO_TIME: Option[java.sql.Timestamp], POWER_STATUS: Option[String], LOAD_STATUS: Option[String], BILLING_HOLD_STATUS: Option[String], INSERT_TIME: Option[java.sql.Timestamp], LAST_UPD_TIME: Option[java.sql.Timestamp]) extends Product {
  @throws(classOf[IndexOutOfBoundsException])
  override def productElement(n: Int) = n match {
    case 0 => WID; case 1 => BATCH_ID; case 2 => SRC_ID; case 3 => ORG_ID; case 4 => CLASS_WID; case 5 => DESC_TEXT; case 6 => PREMISE_WID; case 7 => FEED_LOC; case 8 => GPS_LAT; case 9 => GPS_LONG; case 10 => PULSE_OUTPUT_BLOCK; case 11 => UDC_ID; case 12 => UNIVERSAL_ID; case 13 => IS_VIRTUAL_FLG; case 14 => SEAL_INFO; case 15 => ACCESS_INFO; case 16 => ALT_ACCESS_INFO; case 17 => LOC_INFO; case 18 => ALT_LOC_INFO; case 19 => TYPE; case 20 => SUB_TYPE; case 21 => TIMEZONE_ID; case 22 => GIS_ID; case 23 => BILLED_UPTO_TIME; case 24 => POWER_STATUS; case 25 => LOAD_STATUS; case 26 => BILLING_HOLD_STATUS; case 27 => INSERT_TIME; case 28 => LAST_UPD_TIME
    case _ => throw new IndexOutOfBoundsException(n.toString())
  }
  override def productArity: Int = 29
  override def canEqual(that: Any): Boolean = that.isInstanceOf[sdp_d]
}

Non-join queries work fine:

val q1 = sqlContext.sql("SELECT YEAR, DAY_OF_YEAR, MAX(WID), MIN(WID), COUNT(*) FROM date_d GROUP BY YEAR, DAY_OF_YEAR ORDER BY YEAR, DAY_OF_YEAR")
res4: Array[org.apache.spark.sql.Row] = Array([2014,305,20141101,20141101,1], [2014,306,20141102,20141102,1], [2014,307,20141103,20141103,1], [2014,308,20141104,20141104,1], [2014,309,20141105,20141105,1], [2014,310,20141106,20141106,1], [2014,311,20141107,20141107,1], [2014,312,20141108,20141108,1], [2014,313,20141109,20141109,1], [2014,314,20141110,20141110,1], [2014,315,2014,2014,1], [2014,316,20141112,20141112,1], [2014,317,20141113,20141113,1], [2014,318,20141114,20141114,1], [2014,319,20141115,20141115,1], [2014,320,20141116,20141116,1], [2014,321,20141117,20141117,1], [2014,322,20141118,20141118,1], [2014,323,20141119,20141119,1], [2014,324,20141120,20141120,1], [2014,325,20141121,20141121,1], [2014,326,20141122,20141122,1], [2014,327,20141123,20141123,1], [2014,328,20141...

But the join queries throw this error: java.lang.ArrayIndexOutOfBoundsException

scala> val q = sqlContext.sql("select * from date_d dd join interval_f intf on intf.DATE_WID = dd.WID where intf.DATE_WID >= 20141101 AND intf.DATE_WID <= 20141110")
q: org.apache.spark.sql.SchemaRDD = SchemaRDD[38] at RDD at SchemaRDD.scala:103
== Query Plan ==
== Physical Plan ==
Project [WID#0,CALENDAR_DATE#1,DATE_STRING#2,DAY_OF_WEEK#3,DAY_OF_MONTH#4,DAY_OF_YEAR#5,END_OF_MONTH_FLAG#6,YEARWEEK#7,CALENDAR_MONTH#8,MONTH_NUM#9,YEARMONTH#10,QUARTER#11,YEAR#12,ORG_ID#13,CHANNEL_WID#14,SDP_WID#15,MEAS_WID#16,DATE_WID#17,TIME_WID#18,VALIDATION_STATUS_CD#19,VAL_FAIL_CD#20,INTERVAL_FLAG_CD#21,CHANGE_METHOD_WID#22,SOURCE_LAST_UPD_TIME#23,INTERVAL_END_TIME#24,LOCKED#25,EXT_VERSION_TIME#26,INTERVAL_VALUE#27,INSERT_TIME#28,LAST_UPD_TIME#29]
 ShuffledHashJoin [WID#0], [DATE_WID#17], BuildRight
  Exchange (HashPartitioning [WID#0], 200)
   InMemoryColumnarTableScan [WID#0,CALENDAR_DATE#1,DATE_STRING#2,DAY_OF_WEEK#3,DAY_OF_MONTH#4,DAY_OF_YEAR#5,END_OF_MONTH_FLA...

scala> q.take(5).foreach(println)
15/02/27 15:50:26 INFO SparkContext: Starting job: runJob at basicOperators.scala:136
15/02/27 15:50:26 INFO DAGScheduler: Registering RDD 46
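Since the root cause was a single malformed trailing record, a minimal sketch of guarding the CSV load before mapping rows into the case classes above may be useful. The file name, field count check, and column parsing are illustrative assumptions, not the original code:

    val expectedFields = 13  // arity of date_d
    val dateD = sc.textFile("date_d.csv")
      .map(_.split(",", -1))
      .filter(_.length == expectedFields)   // drop truncated or garbled records
      .map(f => date_d(f(0).toInt, java.sql.Timestamp.valueOf(f(1)), f(2), f(3),
        f(4).toInt, f(5).toInt, f(6), f(7).toInt, f(8), f(9).toInt,
        f(10).toInt, f(11).toInt, f(12).toInt))
    // with `import sqlContext.createSchemaRDD` in scope the RDD can then be
    // registered as a table: dateD.registerTempTable("date_d")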
SparkSQL production readiness
Hi, I am exploring SparkSQL for my purposes of performing large relational operations across a cluster. However, it seems to be in alpha right now. Is there any indication when it would be considered production-level? I don't see any info on the site. Regards, Ashish
RE: SparkSQL production readiness
Hopefully the alpha tag will be removed in 1.4.0, if the community can review code a little bit faster :P Thanks, Daoyuan From: Ashish Mukherjee [mailto:ashish.mukher...@gmail.com] Sent: Saturday, February 28, 2015 4:28 PM To: user@spark.apache.org Subject: SparkSQL production readiness Hi, I am exploring SparkSQL for my purposes of performing large relational operations across a cluster. However, it seems to be in alpha right now. Is there any indication when it would be considered production-level? I don't see any info on the site. Regards, Ashish
Re: Upgrade to Spark 1.2.1 using Guava
Maybe, but any time the workaround is to use spark-submit --conf spark.executor.extraClassPath=/guava.jar blah, that means that standalone apps must have hard-coded paths that are honored on every worker. And as you know a lib is pretty much blocked from use of this version of Spark—hence the blocker severity. I could easily be wrong but userClassPathFirst doesn’t seem to be the issue. There is no class conflict.
On Feb 27, 2015, at 7:13 PM, Sean Owen so...@cloudera.com wrote: This seems like a job for userClassPathFirst. Or could be. It's definitely an issue of visibility between where the serializer is and where the user class is. At the top you said Pat that you didn't try this, but why not?
On Fri, Feb 27, 2015 at 10:11 PM, Pat Ferrel p...@occamsmachete.com wrote: I’ll try to find a Jira for it. I hope a fix is in 1.3.
On Feb 27, 2015, at 1:59 PM, Pat Ferrel p...@occamsmachete.com wrote: Thanks! That worked.
On Feb 27, 2015, at 1:50 PM, Pat Ferrel p...@occamsmachete.com wrote: I don’t use spark-submit; I have a standalone app. So I guess you want me to add that key/value to the conf in my code and make sure it exists on workers.
On Feb 27, 2015, at 1:47 PM, Marcelo Vanzin van...@cloudera.com wrote: On Fri, Feb 27, 2015 at 1:42 PM, Pat Ferrel p...@occamsmachete.com wrote: I changed it in the spark master conf, which is also the only worker. I added a path to the jar that has guava in it. Still can’t find the class. Sorry, I'm still confused about what config you're changing. I'm suggesting using: spark-submit --conf spark.executor.extraClassPath=/guava.jar blah -- Marcelo
Running in-memory SQL on streamed relational data
Hi, I have been looking at Spark Streaming , which seems to be for the use case of live streams which are processed one line at a time generally in real-time. Since SparkSQL reads data from some filesystem, I was wondering if there is something which connects SparkSQL with Spark Streaming, so I can send live relational tuples in a stream (rather than read filesystem data) for SQL operations. Also, at present, doing it with Spark Streaming would have complexities of handling multiple Dstreams etc. since I may want to run multiple adhoc queries of this kind on adhoc data I stream through. Has anyone done this kind of thing with Spark before? i.e combination of SparkSQL with Streaming. Regards, Ashish
Re: Running in-memory SQL on streamed relational data
I think you can do simple operations like foreachRDD or transform to get access to the RDDs in the stream and then you can do SparkSQL over it. Thanks Best Regards On Sat, Feb 28, 2015 at 3:27 PM, Ashish Mukherjee ashish.mukher...@gmail.com wrote: Hi, I have been looking at Spark Streaming , which seems to be for the use case of live streams which are processed one line at a time generally in real-time. Since SparkSQL reads data from some filesystem, I was wondering if there is something which connects SparkSQL with Spark Streaming, so I can send live relational tuples in a stream (rather than read filesystem data) for SQL operations. Also, at present, doing it with Spark Streaming would have complexities of handling multiple Dstreams etc. since I may want to run multiple adhoc queries of this kind on adhoc data I stream through. Has anyone done this kind of thing with Spark before? i.e combination of SparkSQL with Streaming. Regards, Ashish
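A minimal sketch of the foreachRDD approach against the Spark 1.2-era API; the case class, socket source, batch interval, and query are illustrative assumptions only:

    import org.apache.spark.sql.SQLContext
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    case class Event(user: String, amount: Double)

    val ssc = new StreamingContext(sc, Seconds(10))
    val sqlContext = new SQLContext(sc)
    import sqlContext.createSchemaRDD   // lets an RDD of case classes register as a table

    val events = ssc.socketTextStream("localhost", 9999)
      .map(_.split(","))
      .map(a => Event(a(0), a(1).toDouble))

    events.foreachRDD { rdd =>
      rdd.registerTempTable("events")   // re-registered for each micro-batch
      sqlContext.sql("SELECT user, SUM(amount) FROM events GROUP BY user")
        .collect()
        .foreach(println)
    }

    ssc.start()
    ssc.awaitTermination()

Each micro-batch becomes an ordinary RDD inside foreachRDD, so any SparkSQL query can run over it; the table only ever contains the current batch.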
SORT BY and ORDER BY file size v/s RAM size
Hi devs,
Is there any connection between the input file size and RAM size for sorting using SparkSQL? I tried a 1 GB file with 8 GB RAM with 4 cores and got java.lang.OutOfMemoryError: GC overhead limit exceeded. Or could it be for any other reason? It's working for other SparkSQL operations.
15/02/28 16:33:03 INFO Utils: Successfully started service 'sparkDriver' on port 41392.
15/02/28 16:33:03 INFO SparkEnv: Registering MapOutputTracker
15/02/28 16:33:03 INFO SparkEnv: Registering BlockManagerMaster
15/02/28 16:33:03 INFO DiskBlockManager: Created local directory at /tmp/spark-ecf4d6f0-c526-48fa-bd8a-d74a8bf64820/spark-4865c193-05e6-4aa1-999b-ab8c426479ab
15/02/28 16:33:03 INFO MemoryStore: MemoryStore started with capacity 944.7 MB
15/02/28 16:33:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/02/28 16:33:03 INFO HttpFileServer: HTTP File server directory is /tmp/spark-af545c0b-15e6-4efa-a151-2c73faba8948/spark-987f58b4-5735-4965-91d1-38f238f4bb11
15/02/28 16:33:03 INFO HttpServer: Starting HTTP Server
15/02/28 16:33:03 INFO Utils: Successfully started service 'HTTP file server' on port 44588.
15/02/28 16:33:08 INFO Utils: Successfully started service 'SparkUI' on port 4040.
15/02/28 16:33:08 INFO SparkUI: Started SparkUI at http://10.30.9.7:4040
15/02/28 16:33:08 INFO Executor: Starting executor ID driver on host localhost
15/02/28 16:33:08 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@10.30.9.7:41392/user/HeartbeatReceiver
15/02/28 16:33:08 INFO NettyBlockTransferService: Server created on 34475
15/02/28 16:33:08 INFO BlockManagerMaster: Trying to register BlockManager
15/02/28 16:33:08 INFO BlockManagerMasterActor: Registering block manager localhost:34475 with 944.7 MB RAM, BlockManagerId(driver, localhost, 34475)
15/02/28 16:33:08 INFO BlockManagerMaster: Registered BlockManager
15/02/28 16:33:09 INFO MemoryStore: ensureFreeSpace(193213) called with curMem=0, maxMem=990550425
15/02/28 16:33:09 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 188.7 KB, free 944.5 MB)
15/02/28 16:33:09 INFO MemoryStore: ensureFreeSpace(25432) called with curMem=193213, maxMem=990550425
15/02/28 16:33:09 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 24.8 KB, free 944.5 MB)
15/02/28 16:33:09 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:34475 (size: 24.8 KB, free: 944.6 MB)
15/02/28 16:33:09 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0
15/02/28 16:33:09 INFO SparkContext: Created broadcast 0 from textFile at SortSQL.scala:20
15/02/28 16:33:10 INFO HiveMetaStore: 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
15/02/28 16:33:10 INFO ObjectStore: ObjectStore, initialize called
15/02/28 16:33:10 INFO Persistence: Property datanucleus.cache.level2 unknown - will be ignored
15/02/28 16:33:10 INFO Persistence: Property hive.metastore.integral.jdo.pushdown unknown - will be ignored
15/02/28 16:33:12 INFO ObjectStore: Setting MetaStore object pin classes with hive.metastore.cache.pinobjtypes=Table,StorageDescriptor,SerDeInfo,Partition,Database,Type,FieldSchema,Order
15/02/28 16:33:12 INFO MetaStoreDirectSql: MySQL check failed, assuming we are not on mysql: Lexical error at line 1, column 5. Encountered: @ (64), after : .
15/02/28 16:33:13 INFO Datastore: The class org.apache.hadoop.hive.metastore.model.MFieldSchema is tagged as embedded-only so does not have its own datastore table.
15/02/28 16:33:13 INFO Datastore: The class org.apache.hadoop.hive.metastore.model.MOrder is tagged as embedded-only so does not have its own datastore table.
15/02/28 16:33:13 INFO Datastore: The class org.apache.hadoop.hive.metastore.model.MFieldSchema is tagged as embedded-only so does not have its own datastore table.
15/02/28 16:33:13 INFO Datastore: The class org.apache.hadoop.hive.metastore.model.MOrder is tagged as embedded-only so does not have its own datastore table.
15/02/28 16:33:13 INFO Query: Reading in results for query org.datanucleus.store.rdbms.query.SQLQuery@0 since the connection used is closing
15/02/28 16:33:13 INFO ObjectStore: Initialized ObjectStore
15/02/28 16:33:14 INFO HiveMetaStore: Added admin role in metastore
15/02/28 16:33:14 INFO HiveMetaStore: Added public role in metastore
15/02/28 16:33:14 INFO HiveMetaStore: No user is added in admin role, since config is empty
15/02/28 16:33:14 INFO SessionState: No Tez session required at this point. hive.execution.engine=mr.
15/02/28 16:33:14 INFO ParseDriver: Parsing command: SELECT * FROM people SORT BY B DESC
15/02/28 16:33:14 INFO ParseDriver: Parse Completed
15/02/28 16:33:14 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
15/02/28 16:33:14 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
15/02/28 16:33:14 INFO deprecation:
Reg. Difference in Performance
Hi, I am running Spark applications in GCE. I set up clusters with different numbers of nodes, varying from 1 to 7. The machines are single-core machines. I set spark.default.parallelism to the number of nodes in the cluster for each cluster. I ran the four applications available in the Spark Examples, SparkTC, SparkALS, SparkLR, and SparkPi, for each of the configurations. What I notice is the following: in the case of SparkTC and SparkALS, the time to complete the job increases with the increase in the number of nodes in the cluster, whereas in SparkLR and SparkPi, the time to complete the job remains the same across all the configurations. Could anyone explain this to me? Thank You Regards, Deep
bitten by spark.yarn.executor.memoryOverhead
hey, running my first map-red like (meaning disk-to-disk, avoiding in-memory RDDs) computation in spark on yarn i immediately got bitten by a too low spark.yarn.executor.memoryOverhead. however it took me about an hour to find out this was the cause. at first i observed failing shuffles leading to restarting of tasks, then i realized this was because executors could not be reached, then i noticed containers got shut down and reallocated in resourcemanager logs (no mention of errors, it seemed the containers finished their business and shut down successfully), and finally i found the reason in nodemanager logs. i don't think this is a pleasant first experience. i realize spark.yarn.executor.memoryOverhead needs to be set differently from situation to situation. but shouldn't the default be a somewhat higher value so that these errors are unlikely, and then the experts that are willing to deal with these errors can tune it lower? so why not make the default 10% instead of 7%? that gives something that works in most situations out of the box (at the cost of being a little wasteful). it worked for me.
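For reference, a hedged sketch of raising the overhead explicitly (the value is interpreted in megabytes; 1024 is only an example). Depending on deploy mode it may need to be passed on the spark-submit command line (--conf spark.yarn.executor.memoryOverhead=1024) rather than set in code:

    val conf = new org.apache.spark.SparkConf()
      .setAppName("my-job")                                  // placeholder name
      .set("spark.yarn.executor.memoryOverhead", "1024")     // MB of off-heap headroom per executor
    val sc = new org.apache.spark.SparkContext(conf)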
Re: bitten by spark.yarn.executor.memoryOverhead
I have created SPARK-6085 with pull request: https://github.com/apache/spark/pull/4836 Cheers On Sat, Feb 28, 2015 at 12:08 PM, Corey Nolet cjno...@gmail.com wrote: +1 to a better default as well. We were working find until we ran against a real dataset which was much larger than the test dataset we were using locally. It took me a couple days and digging through many logs to figure out this value was what was causing the problem. On Sat, Feb 28, 2015 at 11:38 AM, Ted Yu yuzhih...@gmail.com wrote: Having good out-of-box experience is desirable. +1 on increasing the default. On Sat, Feb 28, 2015 at 8:27 AM, Sean Owen so...@cloudera.com wrote: There was a recent discussion about whether to increase or indeed make configurable this kind of default fraction. I believe the suggestion there too was that 9-10% is a safer default. Advanced users can lower the resulting overhead value; it may still have to be increased in some cases, but a fatter default may make this kind of surprise less frequent. I'd support increasing the default; any other thoughts? On Sat, Feb 28, 2015 at 3:34 PM, Koert Kuipers ko...@tresata.com wrote: hey, running my first map-red like (meaning disk-to-disk, avoiding in memory RDDs) computation in spark on yarn i immediately got bitten by a too low spark.yarn.executor.memoryOverhead. however it took me about an hour to find out this was the cause. at first i observed failing shuffles leading to restarting of tasks, then i realized this was because executors could not be reached, then i noticed in containers got shut down and reallocated in resourcemanager logs (no mention of errors, it seemed the containers finished their business and shut down successfully), and finally i found the reason in nodemanager logs. i dont think this is a pleasent first experience. i realize spark.yarn.executor.memoryOverhead needs to be set differently from situation to situation. but shouldnt the default be a somewhat higher value so that these errors are unlikely, and then the experts that are willing to deal with these errors can tune it lower? so why not make the default 10% instead of 7%? that gives something that works in most situations out of the box (at the cost of being a little wasteful). it worked for me. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: bitten by spark.yarn.executor.memoryOverhead
Thanks for taking this on Ted! On Sat, Feb 28, 2015 at 4:17 PM, Ted Yu yuzhih...@gmail.com wrote: I have created SPARK-6085 with pull request: https://github.com/apache/spark/pull/4836 Cheers On Sat, Feb 28, 2015 at 12:08 PM, Corey Nolet cjno...@gmail.com wrote: +1 to a better default as well. We were working find until we ran against a real dataset which was much larger than the test dataset we were using locally. It took me a couple days and digging through many logs to figure out this value was what was causing the problem. On Sat, Feb 28, 2015 at 11:38 AM, Ted Yu yuzhih...@gmail.com wrote: Having good out-of-box experience is desirable. +1 on increasing the default. On Sat, Feb 28, 2015 at 8:27 AM, Sean Owen so...@cloudera.com wrote: There was a recent discussion about whether to increase or indeed make configurable this kind of default fraction. I believe the suggestion there too was that 9-10% is a safer default. Advanced users can lower the resulting overhead value; it may still have to be increased in some cases, but a fatter default may make this kind of surprise less frequent. I'd support increasing the default; any other thoughts? On Sat, Feb 28, 2015 at 3:34 PM, Koert Kuipers ko...@tresata.com wrote: hey, running my first map-red like (meaning disk-to-disk, avoiding in memory RDDs) computation in spark on yarn i immediately got bitten by a too low spark.yarn.executor.memoryOverhead. however it took me about an hour to find out this was the cause. at first i observed failing shuffles leading to restarting of tasks, then i realized this was because executors could not be reached, then i noticed in containers got shut down and reallocated in resourcemanager logs (no mention of errors, it seemed the containers finished their business and shut down successfully), and finally i found the reason in nodemanager logs. i dont think this is a pleasent first experience. i realize spark.yarn.executor.memoryOverhead needs to be set differently from situation to situation. but shouldnt the default be a somewhat higher value so that these errors are unlikely, and then the experts that are willing to deal with these errors can tune it lower? so why not make the default 10% instead of 7%? that gives something that works in most situations out of the box (at the cost of being a little wasteful). it worked for me. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: SparkSQL production readiness
We are planning to remove the alpha tag in 1.3.0. On Sat, Feb 28, 2015 at 12:30 AM, Wang, Daoyuan daoyuan.w...@intel.com wrote: Hopefully the alpha tag will be remove in 1.4.0, if the community can review code a little bit faster :P Thanks, Daoyuan *From:* Ashish Mukherjee [mailto:ashish.mukher...@gmail.com] *Sent:* Saturday, February 28, 2015 4:28 PM *To:* user@spark.apache.org *Subject:* SparkSQL production readiness Hi, I am exploring SparkSQL for my purposes of performing large relational operations across a cluster. However, it seems to be in alpha right now. Is there any indication when it would be considered production-level? I don't see any info on the site. Regards, Ashish
Re: Tools to manage workflows on Spark
Thanks, Ashish! Is Oozie integrated with Spark? I knew it can accommodate some Hadoop jobs. On Sat, Feb 28, 2015 at 6:07 PM, Ashish Nigam ashnigamt...@gmail.com wrote: Qiang, Did you look at Oozie? We use oozie to run spark jobs in production. On Feb 28, 2015, at 2:45 PM, Qiang Cao caoqiang...@gmail.com wrote: Hi Everyone, We need to deal with workflows on Spark. In our scenario, each workflow consists of multiple processing steps. Among different steps, there could be dependencies. I'm wondering if there are tools available that can help us schedule and manage workflows on Spark. I'm looking for something like pig on Hadoop, but it should fully function on Spark. Any suggestion? Thanks in advance! Qiang
Re: Reg. Difference in Performance
Hi Deep, Compute times may not be very meaningful for small examples like those. If you increase the sizes of the examples, then you may start to observe more meaningful trends and speedups. Joseph On Sat, Feb 28, 2015 at 7:26 AM, Deep Pradhan pradhandeep1...@gmail.com wrote: Hi, I am running Spark applications in GCE. I set up cluster with different number of nodes varying from 1 to 7. The machines are single core machines. I set the spark.default.parallelism to the number of nodes in the cluster for each cluster. I ran the four applications available in Spark Examples, SparkTC, SparkALS, SparkLR, SparkPi for each of the configurations. What I notice is the following: In case of SparkTC and SparkALS, the time to complete the job increases with the increase in number of nodes in cluster, where as in SparkLR and SparkPi, the time to complete the job remains the same across all the configurations. Could anyone explain me this? Thank You Regards, Deep
Re: Tools to manage workflows on Spark
You have to call spark-submit from oozie. I used this link to get the idea for my implementation - http://mail-archives.apache.org/mod_mbox/oozie-user/201404.mbox/%3CCAHCsPn-0Grq1rSXrAZu35yy_i4T=fvovdox2ugpcuhkwmjp...@mail.gmail.com%3E On Feb 28, 2015, at 3:25 PM, Qiang Cao caoqiang...@gmail.com wrote: Thanks, Ashish! Is Oozie integrated with Spark? I knew it can accommodate some Hadoop jobs. On Sat, Feb 28, 2015 at 6:07 PM, Ashish Nigam ashnigamt...@gmail.com wrote: Qiang, Did you look at Oozie? We use oozie to run spark jobs in production. On Feb 28, 2015, at 2:45 PM, Qiang Cao caoqiang...@gmail.com wrote: Hi Everyone, We need to deal with workflows on Spark. In our scenario, each workflow consists of multiple processing steps. Among different steps, there could be dependencies. I'm wondering if there are tools available that can help us schedule and manage workflows on Spark. I'm looking for something like pig on Hadoop, but it should fully function on Spark. Any suggestion? Thanks in advance! Qiang
How to debug a hung spark application
Hi, I have a spark application that hangs on doing just one task (the rest of the 200-300 tasks get completed in reasonable time). I can see in the Thread dump which function gets stuck; however, I don't have a clue as to what value is causing that behaviour. Also, logging the inputs before the function is executed does not help, as the actual message gets buried in logs. How does one go about debugging such a case? Also, is there a way I can wrap my function inside some sort of timer-based environment, and if it took too long I would throw a stack trace or something of that sort. Thanks - Manas Kar -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-debug-a-hung-spark-application-tp21859.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
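One hedged sketch of the timer idea described above: wrap the suspect function in a Future with a timeout and dump thread stack traces when it fires. The names and the timeout value are illustrative, and the underlying computation keeps running after the timeout because the Future is not cancelled:

    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration._
    import scala.concurrent.ExecutionContext.Implicits.global

    def withTimeout[T](limit: FiniteDuration)(body: => T): T =
      try {
        Await.result(Future(body), limit)
      } catch {
        case e: java.util.concurrent.TimeoutException =>
          // print every thread's stack so the stuck frame shows up in the executor log
          import scala.collection.JavaConverters._
          Thread.getAllStackTraces.asScala.foreach { case (t, frames) =>
            System.err.println(s"Thread ${t.getName}:\n  " + frames.mkString("\n  "))
          }
          throw e
      }

    // usage inside the RDD operation that hangs (suspectFunction is a placeholder):
    // rdd.map(x => withTimeout(10.minutes) { suspectFunction(x) })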
Re: How to debug a Hung task
For what it's worth, I was seeing mysterious hangs, but it went away when upgrading from Spark 1.2 to 1.2.1. I don't know if this is your problem. Also, I'm using AWS EMR images, which were also upgraded. Anyway, that's my experience. -Mike
From: Manas Kar manasdebashis...@gmail.com
To: user@spark.apache.org
Sent: Friday, February 27, 2015 3:50 PM
Subject: How to debug a Hung task
Hi, I have a spark application that hangs on doing just one task (Rest 200-300 task gets completed in reasonable time). I can see in the Thread dump which function gets stuck how ever I don't have a clue as to what value is causing that behaviour. Also, logging the inputs before the function is executed does not help as the actual message gets buried in logs. How do one go about debugging such case? Also, is there a way I can wrap my function inside some sort of timer based environment and if it took too long I would throw a stack trace or some sort. Thanks Manas
Re: Getting to proto buff classes in Spark Context
Maybe try including the jar with --driver-class-path jar On Feb 26, 2015, at 12:16 PM, Akshat Aranya aara...@gmail.com wrote: My guess would be that you are packaging too many things in your job, which is causing problems with the classpath. When your jar goes in first, you get the correct version of protobuf, but some other version of something else. When your jar goes in later, other things work, but protobuf breaks. This is just a guess though; take a look at what you're packaging in your jar and look for things that Spark or Kafka could also be using. On Thu, Feb 26, 2015 at 10:06 AM, necro351 . necro...@gmail.com wrote: Hello everyone, We are trying to decode a message inside a Spark job that we receive from Kafka. The message is encoded using Proto Buff. The problem is when decoding we get class-not-found exceptions. We have tried remedies we found online in Stack Exchange and mail list archives but nothing seems to work. (This question is a re-ask, but we really cannot figure this one out.) We created a standalone repository with a very simple Spark job that exhibits the above issues. The Spark job reads the messages from the FS, decodes them, and prints them. It's easy to check out and try to see the exception yourself: just uncomment the code that prints the messages from within the RDD. The only sources are the generated Proto Buff java sources and a small Spark job that decodes a message. I'd appreciate it if anyone could take a look. https://github.com/vibhav/spark-protobuf We tried a couple of remedies already. Setting spark.files.userClassPathFirst didn't fix the problem for us. I am not very familiar with the Spark and Scala environment, so please correct any incorrect assumptions or statements I make. However, I don't believe this to be a classpath visibility issue. I wrote a small helper method to print out the classpath from both the driver and worker, and the output is identical. (I'm printing out System.getProperty("java.class.path") -- is there a better way to do this or check the class path?). You can print out the class paths the same way we are from the example project above. Furthermore, userClassPathFirst seems to have a detrimental effect on otherwise working code, which I cannot explain or do not understand. For example, I created a trivial RDD as such:

val l = List(1, 2, 3)
sc.makeRDD(l).foreach((x: Int) => { println(x.toString) })

With userClassPathFirst set, I encounter a java.lang.ClassCastException trying to execute that code. Is that to be expected? You can re-create this issue by commenting out the block of code that tries to print the above in the example project we linked to above. We also tried dynamically adding the jar with .addJar to the Spark Context but this seemed to have no effect. Thanks in advance for any help y'all can provide.
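For reference, a hedged sketch of the kind of helper described above for comparing the driver and executor classpaths; the function name and partition count are illustrations, not the code from the linked repository:

    def printClassPaths(sc: org.apache.spark.SparkContext): Unit = {
      println("driver  : " + System.getProperty("java.class.path"))
      sc.makeRDD(1 to 100, 10)
        .map(_ => System.getProperty("java.class.path"))   // runs on the executors
        .distinct()
        .collect()
        .foreach(cp => println("executor: " + cp))
    }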
Re: Problem getting program to run on 15TB input
A correction to my first post: There is also a repartition right before groupByKey to help avoid the too-many-open-files error:

rdd2.union(rdd1).map(...).filter(...).repartition(15000).groupByKey().map(...).flatMap(...).saveAsTextFile()

On Sat, Feb 28, 2015 at 11:10 AM, Arun Luthra arun.lut...@gmail.com wrote: The job fails before getting to groupByKey. I see a lot of timeout errors in the yarn logs, like:
15/02/28 12:47:16 WARN util.AkkaUtils: Error sending message in 1 attempts akka.pattern.AskTimeoutException: Timed out
and
15/02/28 12:47:49 WARN util.AkkaUtils: Error sending message in 2 attempts java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
and some of these are followed by:
15/02/28 12:48:02 ERROR executor.CoarseGrainedExecutorBackend: Driver Disassociated [akka.tcp://sparkExecutor@...] - [akka.tcp://sparkDriver@...] disassociated! Shutting down.
15/02/28 12:48:02 ERROR executor.Executor: Exception in task 421027.0 in stage 1.0 (TID 336601) java.io.FileNotFoundException: /hadoop/yarn/local//spark-local-20150228123450-3a71/36/shuffle_0_421027_0 (No such file or directory)

On Sat, Feb 28, 2015 at 9:33 AM, Paweł Szulc paul.sz...@gmail.com wrote: I would first check whether there is any possibility that after doing groupbykey one of the groups does not fit in one of the executors' memory. To back up my theory, instead of doing groupbykey + map try reducebykey + mapvalues. Let me know if that helped. Pawel Szulc http://rabbitonweb.com

sob., 28 lut 2015, 6:22 PM Arun Luthra użytkownik arun.lut...@gmail.com napisał: So, actually I am removing the persist for now, because there is significant filtering that happens after calling textFile()... but I will keep that option in mind. I just tried a few different combinations of number of executors, executor memory, and more importantly, number of tasks... all three times it failed when approximately 75.1% of the tasks were completed (no matter how many tasks resulted from repartitioning the data in textfile(..., N)). Surely this is a strong clue to something?

On Fri, Feb 27, 2015 at 1:07 PM, Burak Yavuz brk...@gmail.com wrote: Hi, Not sure if it can help, but `StorageLevel.MEMORY_AND_DISK_SER` generates many small objects that lead to very long GC time, causing the executor losts, heartbeat not received, and GC overhead limit exceeded messages. Could you try using `StorageLevel.MEMORY_AND_DISK` instead? You can also try `OFF_HEAP` (and use Tachyon). Burak

On Fri, Feb 27, 2015 at 11:39 AM, Arun Luthra arun.lut...@gmail.com wrote: My program in pseudocode looks like this:

val conf = new SparkConf().setAppName("Test")
  .set("spark.storage.memoryFraction", "0.2") // default 0.6
  .set("spark.shuffle.memoryFraction", "0.12") // default 0.2
  .set("spark.shuffle.manager", "SORT") // preferred setting for optimized joins
  .set("spark.shuffle.consolidateFiles", "true") // helpful for too many files open
  .set("spark.mesos.coarse", "true") // helpful for MapOutputTracker errors?
  .set("spark.akka.frameSize", "500") // helpful when using consolidateFiles=true
  .set("spark.akka.askTimeout", "30")
  .set("spark.shuffle.compress", "false") // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
  .set("spark.file.transferTo", "false") // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
  .set("spark.core.connection.ack.wait.timeout", "600") // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
  .set("spark.speculation", "true")
  .set("spark.worker.timeout", "600") // http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
  .set("spark.akka.timeout", "300") // http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
  .set("spark.storage.blockManagerSlaveTimeoutMs", "12")
  .set("spark.driver.maxResultSize", "2048") // in response to error: Total size of serialized results of 39901 tasks (1024.0 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "com.att.bdcoe.cip.ooh.MyRegistrator")
  .set("spark.kryo.registrationRequired", "true")

val rdd1 = sc.textFile(file1).persist(StorageLevel.MEMORY_AND_DISK_SER).map(_.split("\\|", -1))...filter(...)
val rdd2 = sc.textFile(file2).persist(StorageLevel.MEMORY_AND_DISK_SER).map(_.split("\\|", -1))...filter(...)
rdd2.union(rdd1).map(...).filter(...).groupByKey().map(...).flatMap(...).saveAsTextFile()

I run the code with:
--num-executors 500 \
--driver-memory 20g \
--executor-memory 20g \
--executor-cores 32 \

I'm using kryo serialization on everything, including broadcast variables. Spark creates 145k tasks, and the first stage includes everything before groupByKey(). It fails before getting to groupByKey. I have
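A hedged sketch of the reducebykey + mapvalues shape Paweł suggests, as an alternative to the groupByKey call in the pseudocode above. It only applies when the per-key logic can be expressed as an associative merge; `parsedAndFiltered`, `outputPath`, and the three helpers are assumptions standing in for the application-specific pieces:

    type Record = Array[String]                        // assumption: one parsed, split row
    def keyOf(r: Record): String = ???                 // application-specific key
    def combine(a: Record, b: Record): Record = ???    // associative merge of two records for one key
    def summarize(r: Record): String = ???             // final per-key output line

    val keyed = parsedAndFiltered.map(r => (keyOf(r), r))   // the same records that fed groupByKey
    keyed
      .reduceByKey(combine, 15000)     // combines map-side and can spill; never holds a whole group
      .mapValues(summarize)
      .saveAsTextFile(outputPath)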
Re: Upgrade to Spark 1.2.1 using Guava
Yes. I ran into this problem with the Mahout snapshot and Spark 1.2.0. I didn't really try to figure out why it was a problem, since there were already too many moving parts in my app. Obviously there is a classpath issue somewhere. /Erlend
On 27 Feb 2015 22:30, Pat Ferrel p...@occamsmachete.com wrote: @Erlend hah, we were trying to merge your PR and ran into this—small world. You actually compile the JavaSerializer source in your project? @Marcelo do you mean by modifying spark.executor.extraClassPath on all workers, that didn’t seem to work?
On Feb 27, 2015, at 1:23 PM, Erlend Hamnaberg erl...@hamnaberg.net wrote: Hi. I have had a similar issue. I had to pull the JavaSerializer source into my own project, just so I got the classloading of this class under control. This must be a class loader issue with Spark. -E
On Fri, Feb 27, 2015 at 8:52 PM, Pat Ferrel p...@occamsmachete.com wrote: I understand that I need to supply Guava to Spark. The HashBiMap is created in the client and broadcast to the workers. So it is needed in both. To achieve this there is a deps.jar with Guava (and Scopt but that is only for the client). Scopt is found so I know the jar is fine for the client. I pass in the deps.jar to the context creation code. I’ve checked the content of the jar and have verified that it is used at context creation time. I register the serializer as follows:

class MyKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) = {
    val h: HashBiMap[String, Int] = HashBiMap.create[String, Int]()
    //kryo.addDefaultSerializer(h.getClass, new JavaSerializer())
    log.info("\n\n\nRegister Serializer for " + h.getClass.getCanonicalName + "\n\n\n") // just to be sure this does indeed get logged
    kryo.register(classOf[com.google.common.collect.HashBiMap[String, Int]], new JavaSerializer())
  }
}

The job proceeds up until the broadcast value, a HashBiMap, is deserialized, which is where I get the following error. Have I missed a step for deserialization of broadcast values? Odd that serialization happened but deserialization failed. I’m running on a standalone localhost-only cluster.

15/02/27 11:40:34 WARN scheduler.TaskSetManager: Lost task 1.0 in stage 4.0 (TID 9, 192.168.0.2): java.io.IOException: com.esotericsoftware.kryo.KryoException: Error during Java deserialization.
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1093) at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164) at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64) at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64) at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87) at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70) at my.TDIndexedDatasetReader$$anonfun$5.apply(TextDelimitedReaderWriter.scala:95) at my.TDIndexedDatasetReader$$anonfun$5.apply(TextDelimitedReaderWriter.scala:94) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:366) at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:211) at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Caused by: com.esotericsoftware.kryo.KryoException: Error during Java deserialization. at com.esotericsoftware.kryo.serializers.JavaSerializer.read(JavaSerializer.java:42) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732) at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:144) at org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:216) at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:177) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1090) ... 19 more root eror == Caused by: java.lang.ClassNotFoundException: com.google.common.collect.HashBiMap at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at
Tools to manage workflows on Spark
Hi Everyone, We need to deal with workflows on Spark. In our scenario, each workflow consists of multiple processing steps. Among different steps, there could be dependencies. I'm wondering if there are tools available that can help us schedule and manage workflows on Spark. I'm looking for something like pig on Hadoop, but it should fully function on Spark. Any suggestion? Thanks in advance! Qiang
Re: Tools to manage workflows on Spark
Qiang, Did you look at Oozie? We use oozie to run spark jobs in production. On Feb 28, 2015, at 2:45 PM, Qiang Cao caoqiang...@gmail.com wrote: Hi Everyone, We need to deal with workflows on Spark. In our scenario, each workflow consists of multiple processing steps. Among different steps, there could be dependencies. I'm wondering if there are tools available that can help us schedule and manage workflows on Spark. I'm looking for something like pig on Hadoop, but it should fully function on Spark. Any suggestion? Thanks in advance! Qiang
Re: Unable to find org.apache.spark.sql.catalyst.ScalaReflection class
Ted, spark-catalyst_2.11-1.2.1.jar is present in the class path. BTW, I am running the code locally in an Eclipse workspace. Here’s the complete exception stack trace -

Exception in thread "main" scala.ScalaReflectionException: class org.apache.spark.sql.catalyst.ScalaReflection in JavaMirror with primordial classloader with boot classpath [/Applications/eclipse/plugins/org.scala-lang.scala-library_2.11.5.v20150101-184742-3fafbc204f.jar:/Applications/eclipse/plugins/org.scala-lang.scala-reflect_2.11.5.v20150101-184742-3fafbc204f.jar:/Applications/eclipse/plugins/org.scala-lang.scala-actors_2.11.5.v20150101-184742-3fafbc204f.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_31.jdk/Contents/Home/jre/lib/resources.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_31.jdk/Contents/Home/jre/lib/rt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_31.jdk/Contents/Home/jre/lib/sunrsasign.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_31.jdk/Contents/Home/jre/lib/jsse.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_31.jdk/Contents/Home/jre/lib/jce.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_31.jdk/Contents/Home/jre/lib/charsets.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_31.jdk/Contents/Home/jre/lib/jfr.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_31.jdk/Contents/Home/jre/classes] not found.
at scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:123)
at scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:22)
at org.apache.spark.sql.catalyst.ScalaReflection$$typecreator1$1.apply(ScalaReflection.scala:115)
at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute(TypeTags.scala:232)
at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:232)
at scala.reflect.api.TypeTags$class.typeOf(TypeTags.scala:341)
at scala.reflect.api.Universe.typeOf(Universe.scala:61)
at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:115)
at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:33)
at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:100)
at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:33)
at org.apache.spark.sql.catalyst.ScalaReflection$class.attributesFor(ScalaReflection.scala:94)
at org.apache.spark.sql.catalyst.ScalaReflection$.attributesFor(ScalaReflection.scala:33)
at org.apache.spark.sql.SQLContext.createSchemaRDD(SQLContext.scala:111)

——

On Feb 28, 2015, at 9:31 AM, Ted Yu yuzhih...@gmail.com wrote: Have you verified that the spark-catalyst_2.10 jar was in the classpath? Cheers

On Sat, Feb 28, 2015 at 9:18 AM, Ashish Nigam ashnigamt...@gmail.com wrote: Hi, I wrote a very simple program in Scala to convert an existing RDD to a SchemaRDD. But the createSchemaRDD function is throwing the exception

Exception in thread "main" scala.ScalaReflectionException: class org.apache.spark.sql.catalyst.ScalaReflection in JavaMirror with primordial classloader with boot classpath [.] not found

Here's more info on the versions I am using -
<scala.binary.version>2.11</scala.binary.version>
<spark.version>1.2.1</spark.version>
<scala.version>2.11.5</scala.version>

Please let me know how I can resolve this problem. Thanks Ashish
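For reference, a hedged sbt-style sketch of keeping the Scala binary version and the Spark artifacts aligned, since a 2.10/2.11 mismatch is one common source of this kind of reflection/classloading error. This is an assumption about the build setup, not a confirmed diagnosis of the problem above:

    // build.sbt
    scalaVersion := "2.11.5"

    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core" % "1.2.1" % "provided",  // %% resolves the _2.11 artifacts
      "org.apache.spark" %% "spark-sql"  % "1.2.1" % "provided"
    )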
Re: Is there any Sparse Matrix implementation in Spark/MLib?
Hi Shahab, There are actually a few distributed Matrix types which support sparse representations: RowMatrix, IndexedRowMatrix, and CoordinateMatrix. The documentation has a bit more info about the various uses: http://spark.apache.org/docs/latest/mllib-data-types.html#distributed-matrix The Spark 1.3 RC includes a new one: BlockMatrix. But since these are distributed, they are represented using RDDs, so they of course will not be as fast as computations on smaller, locally stored matrices. Joseph On Fri, Feb 27, 2015 at 4:39 AM, Ritesh Kumar Singh riteshoneinamill...@gmail.com wrote: try using breeze (scala linear algebra library) On Fri, Feb 27, 2015 at 5:56 PM, shahab shahab.mok...@gmail.com wrote: Thanks a lot Vijay, let me see how it performs. Best Shahab On Friday, February 27, 2015, Vijay Saraswat vi...@saraswat.org wrote: Available in GML -- http://x10-lang.org/x10-community/applications/global- matrix-library.html We are exploring how to make it available within Spark. Any ideas would be much appreciated. On 2/27/15 7:01 AM, shahab wrote: Hi, I just wonder if there is any Sparse Matrix implementation available in Spark, so it can be used in spark application? best, /Shahab - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
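A small sketch of the types Joseph mentions (MLlib 1.2-era API), assuming an existing SparkContext `sc`; the sizes and values are arbitrary examples:

    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}

    // local sparse vector: length 10, non-zeros at indices 1 and 4
    val sv = Vectors.sparse(10, Array(1, 4), Array(3.0, 7.5))

    // distributed sparse matrix built from (row, col, value) entries
    val entries = sc.parallelize(Seq(MatrixEntry(0, 1, 3.0), MatrixEntry(5, 2, -1.2)))
    val mat = new CoordinateMatrix(entries)
    val rowMat = mat.toRowMatrix()   // convert when row-oriented operations are needed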
Re: Some questions after playing a little with the new ml.Pipeline.
Hi Jao, You can use external tools and libraries if they can be called from your Spark program or script (with appropriate conversion of data types, etc.). The best way to apply a pre-trained model to a dataset would be to call the model from within a closure, e.g.: myRDD.map { myDatum = preTrainedModel.predict(myDatum) } If your data is distributed in an RDD (myRDD), then the above call will distribute the computation of prediction using the pre-trained model. It will require that all of your Spark workers be able to run the preTrainedModel; that may mean installing Caffe and dependencies on all nodes in the compute cluster. For the second question, I would modify the above call as follows: myRDD.mapPartitions { myDataOnPartition = val myModel = // instantiate neural network on this partition myDataOnPartition.map { myDatum = myModel.predict(myDatum) } } I hope this helps! Joseph On Fri, Feb 27, 2015 at 10:27 PM, Jaonary Rabarisoa jaon...@gmail.com wrote: Dear all, We mainly do large scale computer vision task (image classification, retrieval, ...). The pipeline is really great stuff for that. We're trying to reproduce the tutorial given on that topic during the latest spark summit ( http://ampcamp.berkeley.edu/5/exercises/image-classification-with-pipelines.html ) using the master version of spark pipeline and dataframe. The tutorial shows different examples of feature extraction stages before running machine learning algorithms. Even the tutorial is straightforward to reproduce with this new API, we still have some questions : - Can one use external tools (e.g via pipe) as a pipeline stage ? An example of use case is to extract feature learned with convolutional neural network. In our case, this corresponds to a pre-trained neural network with Caffe library (http://caffe.berkeleyvision.org/) . - The second question is about the performance of the pipeline. Library such as Caffe processes the data in batch and instancing one Caffe network can be time consuming when this network is very deep. So, we can gain performance if we minimize the number of Caffe network creation and give data in batch to the network. In the pipeline, this corresponds to run transformers that work on a partition basis and give the whole partition to a single caffe network. How can we create such a transformer ? Best, Jao
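A hedged sketch of the second pattern above, with `loadPretrainedModel`, the `predict` call, and `myRDD` standing in for the Caffe-specific pieces:

    val predictions = myRDD.mapPartitions { myDataOnPartition =>
      val myModel = loadPretrainedModel()              // expensive setup happens once per partition
      myDataOnPartition.map(d => myModel.predict(d))   // applied lazily as the iterator is consumed
    }

Because the map over the partition's iterator is lazy, the single model instance is reused for every datum in the partition without pulling the whole partition into memory at once.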
Re: bitten by spark.yarn.executor.memoryOverhead
There was a recent discussion about whether to increase or indeed make configurable this kind of default fraction. I believe the suggestion there too was that 9-10% is a safer default. Advanced users can lower the resulting overhead value; it may still have to be increased in some cases, but a fatter default may make this kind of surprise less frequent. I'd support increasing the default; any other thoughts? On Sat, Feb 28, 2015 at 3:34 PM, Koert Kuipers ko...@tresata.com wrote: hey, running my first map-red like (meaning disk-to-disk, avoiding in memory RDDs) computation in spark on yarn i immediately got bitten by a too low spark.yarn.executor.memoryOverhead. however it took me about an hour to find out this was the cause. at first i observed failing shuffles leading to restarting of tasks, then i realized this was because executors could not be reached, then i noticed in containers got shut down and reallocated in resourcemanager logs (no mention of errors, it seemed the containers finished their business and shut down successfully), and finally i found the reason in nodemanager logs. i dont think this is a pleasent first experience. i realize spark.yarn.executor.memoryOverhead needs to be set differently from situation to situation. but shouldnt the default be a somewhat higher value so that these errors are unlikely, and then the experts that are willing to deal with these errors can tune it lower? so why not make the default 10% instead of 7%? that gives something that works in most situations out of the box (at the cost of being a little wasteful). it worked for me. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: bitten by spark.yarn.executor.memoryOverhead
Having good out-of-box experience is desirable. +1 on increasing the default. On Sat, Feb 28, 2015 at 8:27 AM, Sean Owen so...@cloudera.com wrote: There was a recent discussion about whether to increase or indeed make configurable this kind of default fraction. I believe the suggestion there too was that 9-10% is a safer default. Advanced users can lower the resulting overhead value; it may still have to be increased in some cases, but a fatter default may make this kind of surprise less frequent. I'd support increasing the default; any other thoughts? On Sat, Feb 28, 2015 at 3:34 PM, Koert Kuipers ko...@tresata.com wrote: hey, running my first map-red like (meaning disk-to-disk, avoiding in memory RDDs) computation in spark on yarn i immediately got bitten by a too low spark.yarn.executor.memoryOverhead. however it took me about an hour to find out this was the cause. at first i observed failing shuffles leading to restarting of tasks, then i realized this was because executors could not be reached, then i noticed in containers got shut down and reallocated in resourcemanager logs (no mention of errors, it seemed the containers finished their business and shut down successfully), and finally i found the reason in nodemanager logs. i dont think this is a pleasent first experience. i realize spark.yarn.executor.memoryOverhead needs to be set differently from situation to situation. but shouldnt the default be a somewhat higher value so that these errors are unlikely, and then the experts that are willing to deal with these errors can tune it lower? so why not make the default 10% instead of 7%? that gives something that works in most situations out of the box (at the cost of being a little wasteful). it worked for me. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Problem getting program to run on 15TB input
All stated symptoms are consistent with GC pressure (other nodes timeout trying to connect because of a long stop-the-world), quite possibly due to groupByKey. groupByKey is a very expensive operation as it may bring all the data for a particular partition into memory (in particular, it cannot spill values for a single key, so if you have a single very skewed key you can get behavior like this). On Sat, Feb 28, 2015 at 11:33 AM, Paweł Szulc paul.sz...@gmail.com wrote: But groupbykey will repartition according to numer of keys as I understand how it works. How do you know that you haven't reached the groupbykey phase? Are you using a profiler or do yoi base that assumption only on logs? sob., 28 lut 2015, 8:12 PM Arun Luthra użytkownik arun.lut...@gmail.com napisał: A correction to my first post: There is also a repartition right before groupByKey to help avoid too-many-open-files error: rdd2.union(rdd1).map(...).filter(...).repartition(15000).groupByKey().map(...).flatMap(...).saveAsTextFile() On Sat, Feb 28, 2015 at 11:10 AM, Arun Luthra arun.lut...@gmail.com wrote: The job fails before getting to groupByKey. I see a lot of timeout errors in the yarn logs, like: 15/02/28 12:47:16 WARN util.AkkaUtils: Error sending message in 1 attempts akka.pattern.AskTimeoutException: Timed out and 15/02/28 12:47:49 WARN util.AkkaUtils: Error sending message in 2 attempts java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] and some of these are followed by: 15/02/28 12:48:02 ERROR executor.CoarseGrainedExecutorBackend: Driver Disassociated [akka.tcp://sparkExecutor@...] - [akka.tcp://sparkDriver@...] disassociated! Shutting down. 15/02/28 12:48:02 ERROR executor.Executor: Exception in task 421027.0 in stage 1.0 (TID 336601) java.io.FileNotFoundException: /hadoop/yarn/local//spark-local-20150228123450-3a71/36/shuffle_0_421027_0 (No such file or directory) On Sat, Feb 28, 2015 at 9:33 AM, Paweł Szulc paul.sz...@gmail.com wrote: I would first check whether there is any possibility that after doing groupbykey one of the groups does not fit in one of the executors' memory. To back up my theory, instead of doing groupbykey + map try reducebykey + mapvalues. Let me know if that helped. Pawel Szulc http://rabbitonweb.com sob., 28 lut 2015, 6:22 PM Arun Luthra użytkownik arun.lut...@gmail.com napisał: So, actually I am removing the persist for now, because there is significant filtering that happens after calling textFile()... but I will keep that option in mind. I just tried a few different combinations of number of executors, executor memory, and more importantly, number of tasks... *all three times it failed when approximately 75.1% of the tasks were completed (no matter how many tasks resulted from repartitioning the data in textfile(..., N))*. Surely this is a strong clue to something? On Fri, Feb 27, 2015 at 1:07 PM, Burak Yavuz brk...@gmail.com wrote: Hi, Not sure if it can help, but `StorageLevel.MEMORY_AND_DISK_SER` generates many small objects that lead to very long GC time, causing the executor losts, heartbeat not received, and GC overhead limit exceeded messages. Could you try using `StorageLevel.MEMORY_AND_DISK` instead? You can also try `OFF_HEAP` (and use Tachyon). 
Burak On Fri, Feb 27, 2015 at 11:39 AM, Arun Luthra arun.lut...@gmail.com wrote: My program in pseudocode looks like this: val conf = new SparkConf().setAppName(Test) .set(spark.storage.memoryFraction,0.2) // default 0.6 .set(spark.shuffle.memoryFraction,0.12) // default 0.2 .set(spark.shuffle.manager,SORT) // preferred setting for optimized joins .set(spark.shuffle.consolidateFiles,true) // helpful for too many files open .set(spark.mesos.coarse, true) // helpful for MapOutputTracker errors? .set(spark.akka.frameSize,500) // helpful when using consildateFiles=true .set(spark.akka.askTimeout, 30) .set(spark.shuffle.compress,false) // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html .set(spark.file.transferTo,false) // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html .set(spark.core.connection.ack.wait.timeout,600) // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html .set(spark.speculation,true) .set(spark.worker.timeout,600) // http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html .set(spark.akka.timeout,300) // http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html .set(spark.storage.blockManagerSlaveTimeoutMs,12) .set(spark.driver.maxResultSize,2048) // in response to error: Total size of serialized results of 39901 tasks (1024.0 MB) is bigger than spark.driver.maxResultSize (1024.0 MB) .set(spark.serializer,
Re: Scheduler hang?
Moving user to bcc. What I found was that the TaskSetManager for my task set that had 5 tasks had preferred locations set for 4 of the 5. Three had localhost/driver and had completed. The one that had nothing had also completed. The last one was set by our code to be my IP address. Local mode can hang on this because of https://issues.apache.org/jira/browse/SPARK-4939 addressed by https://github.com/apache/spark/pull/4147, which is obviously not an optimal solution but since it's only local mode, it's very good enough. I'm not going to wait for those seconds to tick by to complete the task, so I'll fix the IP address reporting side for local mode in my code. On Thu, Feb 26, 2015 at 8:32 PM, Victor Tso-Guillen v...@paxata.com wrote: Of course, breakpointing on every status update and revive offers invocation kept the problem from happening. Where could the race be? On Thu, Feb 26, 2015 at 7:55 PM, Victor Tso-Guillen v...@paxata.com wrote: Love to hear some input on this. I did get a standalone cluster up on my local machine and the problem didn't present itself. I'm pretty confident that means the problem is in the LocalBackend or something near it. On Thu, Feb 26, 2015 at 1:37 PM, Victor Tso-Guillen v...@paxata.com wrote: Okay I confirmed my suspicions of a hang. I made a request that stopped progressing, though the already-scheduled tasks had finished. I made a separate request that was small enough not to hang, and it kicked the hung job enough to finish. I think what's happening is that the scheduler or the local backend is not kicking the revive offers messaging at the right time, but I have to dig into the code some more to nail the culprit. Anyone on these list have experience in those code areas that could help? On Thu, Feb 26, 2015 at 2:27 AM, Victor Tso-Guillen v...@paxata.com wrote: Thanks for the link. Unfortunately, I turned on rdd compression and nothing changed. I tried moving netty - nio and no change :( On Thu, Feb 26, 2015 at 2:01 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Not many that i know of, but i bumped into this one https://issues.apache.org/jira/browse/SPARK-4516 Thanks Best Regards On Thu, Feb 26, 2015 at 3:26 PM, Victor Tso-Guillen v...@paxata.com wrote: Is there any potential problem from 1.1.1 to 1.2.1 with shuffle dependencies that produce no data? On Thu, Feb 26, 2015 at 1:56 AM, Victor Tso-Guillen v...@paxata.com wrote: The data is small. The job is composed of many small stages. * I found that with fewer than 222 the problem exhibits. What will be gained by going higher? * Pushing up the parallelism only pushes up the boundary at which the system appears to hang. I'm worried about some sort of message loss or inconsistency. * Yes, we are using Kryo. * I'll try that, but I'm again a little confused why you're recommending this. I'm stumped so might as well? On Wed, Feb 25, 2015 at 11:13 PM, Akhil Das ak...@sigmoidanalytics.com wrote: What operation are you trying to do and how big is the data that you are operating on? Here's a few things which you can try: - Repartition the RDD to a higher number than 222 - Specify the master as local[*] or local[10] - Use Kryo Serializer (.set(spark.serializer, org.apache.spark.serializer.KryoSerializer)) - Enable RDD Compression (.set(spark.rdd.compress,true) ) Thanks Best Regards On Thu, Feb 26, 2015 at 10:15 AM, Victor Tso-Guillen v...@paxata.com wrote: I'm getting this really reliably on Spark 1.2.1. Basically I'm in local mode with parallelism at 8. I have 222 tasks and I never seem to get far past 40. 
Usually in the 20s to 30s it will just hang. The last logging is below, and a screenshot of the UI. 2015-02-25 20:39:55.779 GMT-0800 INFO [task-result-getter-3] TaskSetManager - Finished task 3.0 in stage 16.0 (TID 22) in 612 ms on localhost (1/5) 2015-02-25 20:39:55.825 GMT-0800 INFO [Executor task launch worker-10] Executor - Finished task 1.0 in stage 16.0 (TID 20). 2492 bytes result sent to driver 2015-02-25 20:39:55.825 GMT-0800 INFO [Executor task launch worker-8] Executor - Finished task 2.0 in stage 16.0 (TID 21). 2492 bytes result sent to driver 2015-02-25 20:39:55.831 GMT-0800 INFO [task-result-getter-0] TaskSetManager - Finished task 1.0 in stage 16.0 (TID 20) in 670 ms on localhost (2/5) 2015-02-25 20:39:55.836 GMT-0800 INFO [task-result-getter-1] TaskSetManager - Finished task 2.0 in stage 16.0 (TID 21) in 674 ms on localhost (3/5) 2015-02-25 20:39:55.891 GMT-0800 INFO [Executor task launch worker-9] Executor - Finished task 0.0 in stage 16.0 (TID 19). 2492 bytes result sent to driver 2015-02-25 20:39:55.896 GMT-0800 INFO [task-result-getter-2] TaskSetManager - Finished task 0.0 in stage 16.0 (TID 19) in 740 ms on localhost (4/5) What should I make of this? Where do I start? Thanks, Victor
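A minimal sketch of the local-mode settings Akhil suggested earlier in this thread (Kryo, RDD compression, local[*], repartitioning past 222); the app name and input path are placeholders, and as the thread shows these settings did not ultimately cure the hang, which turned out to be in the local backend's revive-offers timing.

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("local-hang-repro")                  // placeholder app name
  .setMaster("local[*]")                           // or local[10] to pin the thread count
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.rdd.compress", "true")
val sc = new SparkContext(conf)

// Repartition above the 222 tasks mentioned in the thread.
val data = sc.textFile("/path/to/input").repartition(300)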
Re: Failed to parse Hive query
Hi, I reconfigured everything. Still facing the same issue. Can someone please help? On Friday, February 27, 2015, Anusha Shamanur anushas...@gmail.com wrote: I do. What tags should I change in this? I changed the value of hive.exec.scratchdir to /tmp/hive. What else? On Fri, Feb 27, 2015 at 2:14 PM, Michael Armbrust mich...@databricks.com wrote: Do you have a hive-site.xml file or a core-site.xml file? Perhaps something is misconfigured there? On Fri, Feb 27, 2015 at 7:17 AM, Anusha Shamanur anushas...@gmail.com wrote: Hi, I am trying to do this in spark-shell: val hiveCtx = new org.apache.spark.sql.hive.HiveContext(sc) val listTables = hiveCtx.hql("show tables") The second line fails to execute with this message: warning: there were 1 deprecation warning(s); re-run with -deprecation for details org.apache.spark.sql.hive.HiveQl$ParseException: Failed to parse: show tables at org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:239) at org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:50) at org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:49) at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136) at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) ... at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.lang.NullPointerException: Conf non-local session path expected to be non-null at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:204) at org.apache.hadoop.hive.ql.session.SessionState.getHDFSSessionPath(SessionState.java:586) at org.apache.hadoop.hive.ql.Context.<init>(Context.java:129) at org.apache.hadoop.hive.ql.Context.<init>(Context.java:116) at org.apache.spark.sql.hive.HiveQl$.getAst(HiveQl.scala:227) at org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:240) ... 87 more Any help would be appreciated. -- Sent from Gmail mobile -- Regards, Anusha -- Sent from Gmail mobile
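A hedged sketch of checking the scratch-dir setting from the shell itself rather than only in hive-site.xml; setConf is the SQLContext/HiveContext API, but whether this clears the "non-local session path" NPE depends on how the Hive session was initialized, so treat it as something to try, not a confirmed fix. The path is only an example.

import org.apache.spark.sql.hive.HiveContext

val hiveCtx = new HiveContext(sc)
hiveCtx.setConf("hive.exec.scratchdir", "/tmp/hive")   // example path; must exist and be writable
val listTables = hiveCtx.sql("show tables")            // sql() replaces the deprecated hql()
listTables.collect().foreach(println)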
Scalable JDBCRDD
Hi Spark community, We have a use case where we need to pull huge amounts of data from a SQL query against a database into Spark. We need to execute the query against our huge database and not a substitute (SparkSQL, Hive, etc) because of a couple of factors including custom functions used in the queries that only our database has. We started by looking at JDBC RDD, which utilizes a prepared statement with two parameters that are meant to be used to partition the result set to the workers... e.g.: select * from table limit ?,? turns into select * from table limit 1,100 on worker 1 select * from table limit 101,200 on worker 2 This will not work for us because our database cannot support multiple executions of these queries without being crippled. But, additionally, our database doesn't support the above LIMIT syntax and we don't have a generic way of partitioning the various queries. As a result -- we started by forking JDBCRDD and made a version that executes the SQL query once in getPartitions into a Vector and then hands each worker node an index and iterator. Here's a snippet of getPartitions and compute:

override def getPartitions: Array[Partition] = {
  // Compute the DB query once here
  val results = computeQuery
  (0 until numPartitions).map(i => {
    // TODO: would be better to do this partitioning when scrolling through result set if still loading into memory
    val partitionItems = results.drop(i).sliding(1, numPartitions).flatten.toVector
    new DBPartition(i, partitionItems)
  }).toArray
}

override def compute(thePart: Partition, context: TaskContext) = new NextIterator[T] {
  val part = thePart.asInstanceOf[DBPartition[T]]
  // Shift the result vector to our index number and then do a sliding iterator over it
  val iterator = part.items.iterator

  override def getNext : T = {
    if (iterator.hasNext) {
      iterator.next()
    } else {
      finished = true
      null.asInstanceOf[T]
    }
  }

  override def close: Unit = ()
}

This is a little better since we can just execute the query once. However, the result set needs to fit in memory. We've been trying to brainstorm a way to A) have that result set distributed out to the worker RDD partitions as it's streaming in from the cursor? B) have the result set spill to disk if it exceeds memory and do something clever around the iterators? C) something else? We're not familiar enough yet with all of the workings of Spark to know how to proceed on this. We also thought of the work-around of having the DB query dump to HDFS/S3 and then pick it up from there, but it adds more moving parts and latency to our processing. Does anyone have a clever suggestion? Are we missing something? thanks, Michal
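For reference, the stock JdbcRDD pattern being contrasted above looks roughly like the sketch below (the connection URL, table and column names are placeholders); it binds the two '?' placeholders to per-partition bounds on a numeric column, which is exactly what this database cannot support, so it is shown only to make the difference from the forked single-execution version clear.

import java.sql.{DriverManager, ResultSet}
import org.apache.spark.rdd.JdbcRDD

val rdd = new JdbcRDD(
  sc,
  () => DriverManager.getConnection("jdbc:mysql://dbhost/db", "user", "pass"),
  "SELECT payload FROM my_table WHERE id >= ? AND id <= ?",   // the two placeholders JdbcRDD fills per partition
  1L, 1000000L,        // overall bounds of the partitioning column
  20,                  // number of partitions, i.e. number of query executions
  (rs: ResultSet) => rs.getString("payload"))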
Re: Problem getting program to run on 15TB input
I would first check whether there is any possibility that after doing groupbykey one of the groups does not fit in one of the executors' memory. To back up my theory, instead of doing groupbykey + map try reducebykey + mapvalues. Let me know if that helped. Pawel Szulc http://rabbitonweb.com sob., 28 lut 2015, 6:22 PM Arun Luthra użytkownik arun.lut...@gmail.com napisał: So, actually I am removing the persist for now, because there is significant filtering that happens after calling textFile()... but I will keep that option in mind. I just tried a few different combinations of number of executors, executor memory, and more importantly, number of tasks... *all three times it failed when approximately 75.1% of the tasks were completed (no matter how many tasks resulted from repartitioning the data in textfile(..., N))*. Surely this is a strong clue to something? On Fri, Feb 27, 2015 at 1:07 PM, Burak Yavuz brk...@gmail.com wrote: Hi, Not sure if it can help, but `StorageLevel.MEMORY_AND_DISK_SER` generates many small objects that lead to very long GC time, causing the "executor lost", "heartbeat not received", and "GC overhead limit exceeded" messages. Could you try using `StorageLevel.MEMORY_AND_DISK` instead? You can also try `OFF_HEAP` (and use Tachyon). Burak On Fri, Feb 27, 2015 at 11:39 AM, Arun Luthra arun.lut...@gmail.com wrote: My program in pseudocode looks like this:

val conf = new SparkConf().setAppName("Test")
  .set("spark.storage.memoryFraction", "0.2") // default 0.6
  .set("spark.shuffle.memoryFraction", "0.12") // default 0.2
  .set("spark.shuffle.manager", "SORT") // preferred setting for optimized joins
  .set("spark.shuffle.consolidateFiles", "true") // helpful for too many files open
  .set("spark.mesos.coarse", "true") // helpful for MapOutputTracker errors?
  .set("spark.akka.frameSize", "500") // helpful when using consolidateFiles=true
  .set("spark.akka.askTimeout", "30")
  .set("spark.shuffle.compress", "false") // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
  .set("spark.file.transferTo", "false") // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
  .set("spark.core.connection.ack.wait.timeout", "600") // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
  .set("spark.speculation", "true")
  .set("spark.worker.timeout", "600") // http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
  .set("spark.akka.timeout", "300") // http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
  .set("spark.storage.blockManagerSlaveTimeoutMs", "12")
  .set("spark.driver.maxResultSize", "2048") // in response to error: Total size of serialized results of 39901 tasks (1024.0 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "com.att.bdcoe.cip.ooh.MyRegistrator")
  .set("spark.kryo.registrationRequired", "true")

val rdd1 = sc.textFile(file1).persist(StorageLevel.MEMORY_AND_DISK_SER).map(_.split("\\|", -1))...filter(...)
val rdd2 = sc.textFile(file2).persist(StorageLevel.MEMORY_AND_DISK_SER).map(_.split("\\|", -1))...filter(...)

rdd2.union(rdd1).map(...).filter(...).groupByKey().map(...).flatMap(...).saveAsTextFile()

I run the code with:
  --num-executors 500 \
  --driver-memory 20g \
  --executor-memory 20g \
  --executor-cores 32 \

I'm using kryo serialization on everything, including broadcast variables. Spark creates 145k tasks, and the first stage includes everything before groupByKey(). It fails before getting to groupByKey.
I have tried doubling and tripling the number of partitions when calling textFile, with no success. Very similar code (trivial changes, to accommodate different input) worked on a smaller input (~8TB)... Not that it was easy to get that working. Errors vary, here is what I am getting right now: ERROR SendingConnection: Exception while reading SendingConnection ... java.nio.channels.ClosedChannelException (^ guessing that is symptom of something else) WARN BlockManagerMasterActor: Removing BlockManager BlockManagerId(...) with no recent heart beats: 120030ms exceeds 12ms (^ guessing that is symptom of something else) ERROR ActorSystemImpl: Uncaught fatal error from thread (...) shutting down ActorSystem [sparkDriver] *java.lang.OutOfMemoryError: GC overhead limit exceeded* Other times I will get messages about executor lost... about 1 message per second, after ~50k tasks complete, until there are almost no executors left and progress slows to nothing. I ran with verbose GC info; I do see failing yarn containers that have multiple (like 30) Full GC messages but I don't know how to interpret if that is the problem. Typical Full GC time taken seems ok: [Times: user=23.30 sys=0.06, real=1.94 secs]
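A toy sketch of Paweł's reduceByKey + mapvalues suggestion, assuming the per-key work can be phrased as an associative merge; the key/value types and functions here are stand-ins, not the poster's real logic.

import org.apache.spark.{SparkConf, SparkContext}

object ReduceByKeySketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("reduceByKey-sketch"))
    val pairs = sc.parallelize(Seq(("a", 1L), ("b", 2L), ("a", 3L)))   // stand-in keyed records

    // groupByKey materializes every value of a key before the map runs:
    val grouped = pairs.groupByKey().map { case (k, vs) => (k, vs.sum) }

    // reduceByKey merges map-side, so a skewed key never has to fit into one buffer:
    val reduced = pairs.reduceByKey(_ + _).mapValues(sum => sum)   // put per-key post-processing in mapValues

    reduced.collect().foreach(println)
    sc.stop()
  }
}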
Re: Problem getting program to run on 15TB input
But groupbykey will repartition according to the number of keys, as I understand how it works. How do you know that you haven't reached the groupbykey phase? Are you using a profiler or do you base that assumption only on logs? sob., 28 lut 2015, 8:12 PM Arun Luthra użytkownik arun.lut...@gmail.com napisał: A correction to my first post: There is also a repartition right before groupByKey to help avoid the too-many-open-files error: rdd2.union(rdd1).map(...).filter(...).repartition(15000).groupByKey().map(...).flatMap(...).saveAsTextFile() On Sat, Feb 28, 2015 at 11:10 AM, Arun Luthra arun.lut...@gmail.com wrote: The job fails before getting to groupByKey. I see a lot of timeout errors in the yarn logs, like: 15/02/28 12:47:16 WARN util.AkkaUtils: Error sending message in 1 attempts akka.pattern.AskTimeoutException: Timed out and 15/02/28 12:47:49 WARN util.AkkaUtils: Error sending message in 2 attempts java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] and some of these are followed by: 15/02/28 12:48:02 ERROR executor.CoarseGrainedExecutorBackend: Driver Disassociated [akka.tcp://sparkExecutor@...] - [akka.tcp://sparkDriver@...] disassociated! Shutting down. 15/02/28 12:48:02 ERROR executor.Executor: Exception in task 421027.0 in stage 1.0 (TID 336601) java.io.FileNotFoundException: /hadoop/yarn/local//spark-local-20150228123450-3a71/36/shuffle_0_421027_0 (No such file or directory) On Sat, Feb 28, 2015 at 9:33 AM, Paweł Szulc paul.sz...@gmail.com wrote: ...
Re: Problem getting program to run on 15TB input
So, actually I am removing the persist for now, because there is significant filtering that happens after calling textFile()... but I will keep that option in mind. I just tried a few different combinations of number of executors, executor memory, and more importantly, number of tasks... *all three times it failed when approximately 75.1% of the tasks were completed (no matter how many tasks resulted from repartitioning the data in textfile(..., N))*. Surely this is a strong clue to something? On Fri, Feb 27, 2015 at 1:07 PM, Burak Yavuz brk...@gmail.com wrote: Hi, Not sure if it can help, but `StorageLevel.MEMORY_AND_DISK_SER` generates many small objects that lead to very long GC time, causing the executor losts, heartbeat not received, and GC overhead limit exceeded messages. Could you try using `StorageLevel.MEMORY_AND_DISK` instead? You can also try `OFF_HEAP` (and use Tachyon). Burak On Fri, Feb 27, 2015 at 11:39 AM, Arun Luthra arun.lut...@gmail.com wrote: My program in pseudocode looks like this: val conf = new SparkConf().setAppName(Test) .set(spark.storage.memoryFraction,0.2) // default 0.6 .set(spark.shuffle.memoryFraction,0.12) // default 0.2 .set(spark.shuffle.manager,SORT) // preferred setting for optimized joins .set(spark.shuffle.consolidateFiles,true) // helpful for too many files open .set(spark.mesos.coarse, true) // helpful for MapOutputTracker errors? .set(spark.akka.frameSize,500) // helpful when using consildateFiles=true .set(spark.akka.askTimeout, 30) .set(spark.shuffle.compress,false) // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html .set(spark.file.transferTo,false) // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html .set(spark.core.connection.ack.wait.timeout,600) // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html .set(spark.speculation,true) .set(spark.worker.timeout,600) // http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html .set(spark.akka.timeout,300) // http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html .set(spark.storage.blockManagerSlaveTimeoutMs,12) .set(spark.driver.maxResultSize,2048) // in response to error: Total size of serialized results of 39901 tasks (1024.0 MB) is bigger than spark.driver.maxResultSize (1024.0 MB) .set(spark.serializer, org.apache.spark.serializer.KryoSerializer) .set(spark.kryo.registrator,com.att.bdcoe.cip.ooh.MyRegistrator) .set(spark.kryo.registrationRequired, true) val rdd1 = sc.textFile(file1).persist(StorageLevel.MEMORY_AND_DISK_SER).map(_.split(\\|, -1)...filter(...) val rdd2 = sc.textFile(file2).persist(StorageLevel.MEMORY_AND_DISK_SER).map(_.split(\\|, -1)...filter(...) rdd2.union(rdd1).map(...).filter(...).groupByKey().map(...).flatMap(...).saveAsTextFile() I run the code with: --num-executors 500 \ --driver-memory 20g \ --executor-memory 20g \ --executor-cores 32 \ I'm using kryo serialization on everything, including broadcast variables. Spark creates 145k tasks, and the first stage includes everything before groupByKey(). It fails before getting to groupByKey. I have tried doubling and tripling the number of partitions when calling textFile, with no success. Very similar code (trivial changes, to accomodate different input) worked on a smaller input (~8TB)... Not that it was easy to get that working. Errors vary, here is what I am getting right now: ERROR SendingConnection: Exception while reading SendingConnection ... 
java.nio.channels.ClosedChannelException (^ guessing that is symptom of something else) WARN BlockManagerMasterActor: Removing BlockManager BlockManagerId(...) with no recent heart beats: 120030ms exceeds 12ms (^ guessing that is symptom of something else) ERROR ActorSystemImpl: Uncaught fatal error from thread (...) shutting down ActorSystem [sparkDriver] *java.lang.OutOfMemoryError: GC overhead limit exceeded* Other times I will get messages about executor lost... about 1 message per second, after ~~50k tasks complete, until there are almost no executors left and progress slows to nothing. I ran with verbose GC info; I do see failing yarn containers that have multiple (like 30) Full GC messages but I don't know how to interpret if that is the problem. Typical Full GC time taken seems ok: [Times: user=23.30 sys=0.06, real=1.94 secs] Suggestions, please? Huge thanks for useful suggestions, Arun
Re: Problem getting program to run on 15TB input
The job fails before getting to groupByKey. I see a lot of timeout errors in the yarn logs, like: 15/02/28 12:47:16 WARN util.AkkaUtils: Error sending message in 1 attempts akka.pattern.AskTimeoutException: Timed out and 15/02/28 12:47:49 WARN util.AkkaUtils: Error sending message in 2 attempts java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] and some of these are followed by: 15/02/28 12:48:02 ERROR executor.CoarseGrainedExecutorBackend: Driver Disassociated [akka.tcp://sparkExecutor@...] - [akka.tcp://sparkDriver@...] disassociated! Shutting down. 15/02/28 12:48:02 ERROR executor.Executor: Exception in task 421027.0 in stage 1.0 (TID 336601) java.io.FileNotFoundException: /hadoop/yarn/local//spark-local-20150228123450-3a71/36/shuffle_0_421027_0 (No such file or directory) On Sat, Feb 28, 2015 at 9:33 AM, Paweł Szulc paul.sz...@gmail.com wrote: I would first check whether there is any possibility that after doing groupbykey one of the groups does not fit in one of the executors' memory. To back up my theory, instead of doing groupbykey + map try reducebykey + mapvalues. Let me know if that helped. Pawel Szulc http://rabbitonweb.com sob., 28 lut 2015, 6:22 PM Arun Luthra użytkownik arun.lut...@gmail.com napisał: So, actually I am removing the persist for now, because there is significant filtering that happens after calling textFile()... but I will keep that option in mind. I just tried a few different combinations of number of executors, executor memory, and more importantly, number of tasks... *all three times it failed when approximately 75.1% of the tasks were completed (no matter how many tasks resulted from repartitioning the data in textfile(..., N))*. Surely this is a strong clue to something? On Fri, Feb 27, 2015 at 1:07 PM, Burak Yavuz brk...@gmail.com wrote: Hi, Not sure if it can help, but `StorageLevel.MEMORY_AND_DISK_SER` generates many small objects that lead to very long GC time, causing the executor losts, heartbeat not received, and GC overhead limit exceeded messages. Could you try using `StorageLevel.MEMORY_AND_DISK` instead? You can also try `OFF_HEAP` (and use Tachyon). Burak On Fri, Feb 27, 2015 at 11:39 AM, Arun Luthra arun.lut...@gmail.com wrote: My program in pseudocode looks like this: val conf = new SparkConf().setAppName(Test) .set(spark.storage.memoryFraction,0.2) // default 0.6 .set(spark.shuffle.memoryFraction,0.12) // default 0.2 .set(spark.shuffle.manager,SORT) // preferred setting for optimized joins .set(spark.shuffle.consolidateFiles,true) // helpful for too many files open .set(spark.mesos.coarse, true) // helpful for MapOutputTracker errors? 
.set(spark.akka.frameSize,500) // helpful when using consildateFiles=true .set(spark.akka.askTimeout, 30) .set(spark.shuffle.compress,false) // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html .set(spark.file.transferTo,false) // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html .set(spark.core.connection.ack.wait.timeout,600) // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html .set(spark.speculation,true) .set(spark.worker.timeout,600) // http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html .set(spark.akka.timeout,300) // http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html .set(spark.storage.blockManagerSlaveTimeoutMs,12) .set(spark.driver.maxResultSize,2048) // in response to error: Total size of serialized results of 39901 tasks (1024.0 MB) is bigger than spark.driver.maxResultSize (1024.0 MB) .set(spark.serializer, org.apache.spark.serializer.KryoSerializer) .set(spark.kryo.registrator,com.att.bdcoe.cip.ooh.MyRegistrator) .set(spark.kryo.registrationRequired, true) val rdd1 = sc.textFile(file1).persist(StorageLevel.MEMORY_AND_DISK_SER).map(_.split(\\|, -1)...filter(...) val rdd2 = sc.textFile(file2).persist(StorageLevel.MEMORY_AND_DISK_SER).map(_.split(\\|, -1)...filter(...) rdd2.union(rdd1).map(...).filter(...).groupByKey().map(...).flatMap(...).saveAsTextFile() I run the code with: --num-executors 500 \ --driver-memory 20g \ --executor-memory 20g \ --executor-cores 32 \ I'm using kryo serialization on everything, including broadcast variables. Spark creates 145k tasks, and the first stage includes everything before groupByKey(). It fails before getting to groupByKey. I have tried doubling and tripling the number of partitions when calling textFile, with no success. Very similar code (trivial changes, to accomodate different input) worked on a smaller input (~8TB)... Not that it was easy to get that working. Errors vary, here is what I am getting right now: ERROR SendingConnection:
getting this error while runing
conf = SparkConf().setAppName(spark_calc3merged).setMaster(spark://ec2-54-145-68-13.compute-1.amazonaws.com:7077) sc = SparkContext(conf=conf,pyFiles=[/root/platinum.py,/root/collections2.py]) 15/02/28 19:06:38 WARN scheduler.TaskSetManager: Lost task 5.0 in stage 3.0 (TID 38, ip-10-80-15-145.ec2.internal): com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 0, required: 73065 com.esotericsoftware.kryo.io.Output.require(Output.java:138) com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:220) com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:206) com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:29) com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:18) com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549) com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:312) com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293) com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:156) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:745) 15/02/28 19:06:38 INFO scheduler.TaskSetManager: Starting task 5.1 in stage 3.0 (TID 44, ip-10-80-15-145.ec2.internal, NODE_LOCAL, 1502 bytes) 15/02/28 19:06:38 INFO scheduler.TaskSetManager: Finished task 8.0 in stage 3.0 (TID 41) in 7040 ms on ip-10-80-98-118.ec2.internal (9/11) 15/02/28 19:06:38 INFO scheduler.TaskSetManager: Finished task 9.0 in stage 3.0 (TID 42) in 7847 ms on ip-10-80-15-145.ec2.internal (10/11) 15/02/28 19:06:50 WARN scheduler.TaskSetManager: Lost task 5.1 in stage 3.0 (TID 44, ip-10-80-15-145.ec2.internal): com.esotericsoftware.kryo.KryoException: Buffer overflow. 
Available: 0, required: 73065 com.esotericsoftware.kryo.io.Output.require(Output.java:138) com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:220) com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:206) com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:29) com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:18) com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549) com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:312) com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293) com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:156) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:745) 15/02/28 19:06:50 INFO scheduler.TaskSetManager: Starting task 5.2 in stage 3.0 (TID 45, ip-10-80-98-118.ec2.internal, NODE_LOCAL, 1502 bytes) 15/02/28 19:07:01 WARN scheduler.TaskSetManager: Lost task 5.2 in stage 3.0 (TID 45, ip-10-80-98-118.ec2.internal): com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 0, required: 73065 com.esotericsoftware.kryo.io.Output.require(Output.java:138) com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:220) com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:206) com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:29) com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:18) com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549) com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:312) com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293) com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:156) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
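A hedged sketch of a common remedy for this Kryo "Buffer overflow" exception: enlarging the serializer's buffers. The keys below are the Spark 1.x names (they were renamed in later releases), the values are guesses to be tuned, and the same keys can be set on the PySpark SparkConf shown above.

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("spark_calc3merged")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryoserializer.buffer.mb", "8")         // initial per-task serialization buffer, in MB
  .set("spark.kryoserializer.buffer.max.mb", "256")   // ceiling the buffer may grow to, in MB
val sc = new SparkContext(conf)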
Re: Problem getting program to run on 15TB input
The Spark UI names the line number and name of the operation (repartition in this case) that it is performing. Only if this information is wrong (just a possibility), could it have started groupByKey already. I will try to analyze the amount of skew in the data by using reduceByKey (or simply countByKey) which is relatively inexpensive. For the purposes of this algorithm I can simply log and remove keys with huge counts, before doing groupByKey. On Sat, Feb 28, 2015 at 11:38 AM, Aaron Davidson ilike...@gmail.com wrote: All stated symptoms are consistent with GC pressure (other nodes timeout trying to connect because of a long stop-the-world), quite possibly due to groupByKey. groupByKey is a very expensive operation as it may bring all the data for a particular partition into memory (in particular, it cannot spill values for a single key, so if you have a single very skewed key you can get behavior like this). On Sat, Feb 28, 2015 at 11:33 AM, Paweł Szulc paul.sz...@gmail.com wrote: But groupbykey will repartition according to numer of keys as I understand how it works. How do you know that you haven't reached the groupbykey phase? Are you using a profiler or do yoi base that assumption only on logs? sob., 28 lut 2015, 8:12 PM Arun Luthra użytkownik arun.lut...@gmail.com napisał: A correction to my first post: There is also a repartition right before groupByKey to help avoid too-many-open-files error: rdd2.union(rdd1).map(...).filter(...).repartition(15000).groupByKey().map(...).flatMap(...).saveAsTextFile() On Sat, Feb 28, 2015 at 11:10 AM, Arun Luthra arun.lut...@gmail.com wrote: The job fails before getting to groupByKey. I see a lot of timeout errors in the yarn logs, like: 15/02/28 12:47:16 WARN util.AkkaUtils: Error sending message in 1 attempts akka.pattern.AskTimeoutException: Timed out and 15/02/28 12:47:49 WARN util.AkkaUtils: Error sending message in 2 attempts java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] and some of these are followed by: 15/02/28 12:48:02 ERROR executor.CoarseGrainedExecutorBackend: Driver Disassociated [akka.tcp://sparkExecutor@...] - [akka.tcp://sparkDriver@...] disassociated! Shutting down. 15/02/28 12:48:02 ERROR executor.Executor: Exception in task 421027.0 in stage 1.0 (TID 336601) java.io.FileNotFoundException: /hadoop/yarn/local//spark-local-20150228123450-3a71/36/shuffle_0_421027_0 (No such file or directory) On Sat, Feb 28, 2015 at 9:33 AM, Paweł Szulc paul.sz...@gmail.com wrote: I would first check whether there is any possibility that after doing groupbykey one of the groups does not fit in one of the executors' memory. To back up my theory, instead of doing groupbykey + map try reducebykey + mapvalues. Let me know if that helped. Pawel Szulc http://rabbitonweb.com sob., 28 lut 2015, 6:22 PM Arun Luthra użytkownik arun.lut...@gmail.com napisał: So, actually I am removing the persist for now, because there is significant filtering that happens after calling textFile()... but I will keep that option in mind. I just tried a few different combinations of number of executors, executor memory, and more importantly, number of tasks... *all three times it failed when approximately 75.1% of the tasks were completed (no matter how many tasks resulted from repartitioning the data in textfile(..., N))*. Surely this is a strong clue to something? 
On Fri, Feb 27, 2015 at 1:07 PM, Burak Yavuz brk...@gmail.com wrote: Hi, Not sure if it can help, but `StorageLevel.MEMORY_AND_DISK_SER` generates many small objects that lead to very long GC time, causing the executor losts, heartbeat not received, and GC overhead limit exceeded messages. Could you try using `StorageLevel.MEMORY_AND_DISK` instead? You can also try `OFF_HEAP` (and use Tachyon). Burak On Fri, Feb 27, 2015 at 11:39 AM, Arun Luthra arun.lut...@gmail.com wrote: My program in pseudocode looks like this: val conf = new SparkConf().setAppName(Test) .set(spark.storage.memoryFraction,0.2) // default 0.6 .set(spark.shuffle.memoryFraction,0.12) // default 0.2 .set(spark.shuffle.manager,SORT) // preferred setting for optimized joins .set(spark.shuffle.consolidateFiles,true) // helpful for too many files open .set(spark.mesos.coarse, true) // helpful for MapOutputTracker errors? .set(spark.akka.frameSize,500) // helpful when using consildateFiles=true .set(spark.akka.askTimeout, 30) .set(spark.shuffle.compress,false) // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html .set(spark.file.transferTo,false) // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html .set(spark.core.connection.ack.wait.timeout,600) // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html .set(spark.speculation,true)
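A toy sketch of the countByKey approach Arun describes above (count, log and drop the hot keys before groupByKey); the threshold is arbitrary, and countByKey is only practical here if the number of distinct keys is small enough to collect to the driver.

import org.apache.spark.{SparkConf, SparkContext}

object SkewCheckSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("skew-check"))
    val pairs = sc.parallelize(Seq(("hot", 1), ("hot", 2), ("hot", 3), ("cold", 4)))   // stand-in data

    val counts = pairs.countByKey()                                   // Map[K, Long], collected on the driver
    val hotKeys = counts.filter { case (_, n) => n > 2 }.keySet       // arbitrary threshold
    hotKeys.foreach(k => println(s"dropping skewed key $k with ${counts(k)} records"))

    val hotB = sc.broadcast(hotKeys)
    val grouped = pairs.filter { case (k, _) => !hotB.value.contains(k) }.groupByKey()
    grouped.collect().foreach(println)
    sc.stop()
  }
}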
Re: bitten by spark.yarn.executor.memoryOverhead
+1 to a better default as well. We were working fine until we ran against a real dataset which was much larger than the test dataset we were using locally. It took me a couple of days and digging through many logs to figure out this value was what was causing the problem. On Sat, Feb 28, 2015 at 11:38 AM, Ted Yu yuzhih...@gmail.com wrote: Having a good out-of-box experience is desirable. +1 on increasing the default. On Sat, Feb 28, 2015 at 8:27 AM, Sean Owen so...@cloudera.com wrote: There was a recent discussion about whether to increase or indeed make configurable this kind of default fraction. I believe the suggestion there too was that 9-10% is a safer default. Advanced users can lower the resulting overhead value; it may still have to be increased in some cases, but a fatter default may make this kind of surprise less frequent. I'd support increasing the default; any other thoughts? On Sat, Feb 28, 2015 at 3:34 PM, Koert Kuipers ko...@tresata.com wrote: hey, running my first map-red like (meaning disk-to-disk, avoiding in-memory RDDs) computation in spark on yarn I immediately got bitten by a too low spark.yarn.executor.memoryOverhead. however it took me about an hour to find out this was the cause. at first I observed failing shuffles leading to restarting of tasks, then I realized this was because executors could not be reached, then I noticed that containers got shut down and reallocated in resourcemanager logs (no mention of errors, it seemed the containers finished their business and shut down successfully), and finally I found the reason in nodemanager logs. I don't think this is a pleasant first experience. I realize spark.yarn.executor.memoryOverhead needs to be set differently from situation to situation. but shouldn't the default be a somewhat higher value so that these errors are unlikely, and then the experts that are willing to deal with these errors can tune it lower? so why not make the default 10% instead of 7%? that gives something that works in most situations out of the box (at the cost of being a little wasteful). it worked for me.
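For readers hitting the same thing, a hedged sketch of setting the overhead explicitly; 1024 MB is only an example and the right value is workload-dependent. It can equally be passed as --conf spark.yarn.executor.memoryOverhead=1024 on the spark-submit command line.

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("yarn-job")                                  // placeholder app name
  .set("spark.yarn.executor.memoryOverhead", "1024")       // MB reserved per executor on top of --executor-memory
val sc = new SparkContext(conf)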
Re: Unable to find org.apache.spark.sql.catalyst.ScalaReflection class
Have you verified that spark-catalyst_2.10 jar was in the classpath ? Cheers On Sat, Feb 28, 2015 at 9:18 AM, Ashish Nigam ashnigamt...@gmail.com wrote: Hi, I wrote a very simple program in scala to convert an existing RDD to SchemaRDD. But createSchemaRDD function is throwing exception Exception in thread main scala.ScalaReflectionException: class org.apache.spark.sql.catalyst.ScalaReflection in JavaMirror with primordial classloader with boot classpath [.] not found Here's more info on the versions I am using - scala.binary.version2.11/scala.binary.version spark.version1.2.1/spark.version scala.version2.11.5/scala.version Please let me know how can I resolve this problem. Thanks Ashish
Re: Missing shuffle files
Just wanted to point out - raising the memory overhead (as I saw in the logs) was the fix for this issue and I have not seen dying executors since this value was increased On Tue, Feb 24, 2015 at 3:52 AM, Anders Arpteg arp...@spotify.com wrote: If you're thinking of the yarn memory overhead, then yes, I have increased that as well. However, I'm glad to say that my job finished successfully finally. Besides the timeout and memory settings, performing repartitioning (with shuffling) at the right time seems to be the key to make this large job succeed. With all the transformations in the job, the partition distribution was becoming increasingly skewed. Not easy to figure out when and to what number of partitions to set, and it takes forever to tweak these settings since it works perfectly for small datasets and you'll have to experiment with large time-consuming jobs. Imagine if there was an automatic partition reconfiguration function that automagically did that... On Tue, Feb 24, 2015 at 3:20 AM, Corey Nolet cjno...@gmail.com wrote: I *think* this may have been related to the default memory overhead setting being too low. I raised the value to 1G and tried my job again but I had to leave the office before it finished. It did get further but I'm not exactly sure if that's just because I raised the memory. I'll see tomorrow - but I have a suspicion this may have been the cause of the executors being killed by the application master. On Feb 23, 2015 5:25 PM, Corey Nolet cjno...@gmail.com wrote: I've got the opposite problem with regards to partitioning. I've got over 6000 partitions for some of these RDDs which immediately blows the heap somehow - I'm still not exactly sure how. If I coalesce them down to about 600-800 partitions, I get the problems where the executors are dying without any other error messages (other than telling me the executor was lost in the UI). If I don't coalesce, I pretty immediately get Java heap space exceptions that kill the job altogether. Putting in the timeouts didn't seem to help the case where I am coalescing. Also, I don't see any differences between 'disk only' and 'memory and disk' storage levels - both of them are having the same problems. I notice large shuffle files (30-40gb) that only seem to spill a few hundred mb. On Mon, Feb 23, 2015 at 4:28 PM, Anders Arpteg arp...@spotify.com wrote: Sounds very similar to what I experienced, Corey. Something that seems to at least help with my problems is to have more partitions. I am already fighting between ending up with too many partitions in the end and having too few in the beginning. By coalescing as late as possible and avoiding too few in the beginning, the problems seem to decrease. Also, by increasing spark.akka.askTimeout and spark.core.connection.ack.wait.timeout significantly (~700 secs), the problems seem to almost disappear. Don't want to celebrate yet, still a long way left before the job completes, but it's looking better... On Mon, Feb 23, 2015 at 9:54 PM, Corey Nolet cjno...@gmail.com wrote: I'm looking @ my yarn container logs for some of the executors which appear to be failing (with the missing shuffle files). I see exceptions that say client.TransportClientFactory: Found inactive connection to host/ip:port, closing it. Right after that I see shuffle.RetryingBlockFetcher: Exception while beginning fetch of 1 outstanding blocks.
java.io.IOException: Failed to connect to host/ip:port Right after that exception I see RECEIVED SIGNAL 15: SIGTERM Finally, following the sigterm, I see FileNotFoundException: /hdfs/01/yarn/nm/usercache../spark-local-uuid/shuffle_5_09_0.data (No such file or directory) I'm looking @ the nodemanager and application master logs and I see no indications whatsoever that there were any memory issues during this period of time. The Spark UI is telling me none of the executors are really using too much memory when this happens. It is a big job that's caching several 100's of GB but each node manager on the cluster has 64gb of ram just for yarn containers (physical nodes have 128gb). On this cluster, we have 128 nodes. I've also tried using DISK_ONLY storage level but to no avail. Any further ideas on how to track this down? Again, we're able to run this same job on about 1/5th of the data just fine. The only thing that's pointing me towards a memory issue is that it seems to be happening in the same stages each time and when I lower the memory that each executor has allocated it happens in earlier stages but I can't seem to find anything that says an executor (or container for that matter) has run low on memory. On Mon, Feb 23, 2015 at 9:24 AM, Anders Arpteg arp...@spotify.com wrote: No, unfortunately we're not making use of dynamic allocation or the external shuffle service. Hoping that we could reconfigure our cluster to make use of it, but since it requires changes to the cluster itself (and not just the
Unable to find org.apache.spark.sql.catalyst.ScalaReflection class
Hi, I wrote a very simple program in scala to convert an existing RDD to SchemaRDD. But the createSchemaRDD function is throwing an exception: Exception in thread "main" scala.ScalaReflectionException: class org.apache.spark.sql.catalyst.ScalaReflection in JavaMirror with primordial classloader with boot classpath [.] not found Here's more info on the versions I am using - <scala.binary.version>2.11</scala.binary.version> <spark.version>1.2.1</spark.version> <scala.version>2.11.5</scala.version> Please let me know how I can resolve this problem. Thanks Ashish
Re: getting this error while runing
Also, the data file is on HDFS.
Re: Unable to find org.apache.spark.sql.catalyst.ScalaReflection class
Also, can scala version play any role here? I am using scala 2.11.5 but all spark packages have dependency to scala 2.11.2 Just wanted to make sure that scala version is not an issue here. On Sat, Feb 28, 2015 at 9:18 AM, Ashish Nigam ashnigamt...@gmail.com wrote: Hi, I wrote a very simple program in scala to convert an existing RDD to SchemaRDD. But createSchemaRDD function is throwing exception Exception in thread main scala.ScalaReflectionException: class org.apache.spark.sql.catalyst.ScalaReflection in JavaMirror with primordial classloader with boot classpath [.] not found Here's more info on the versions I am using - scala.binary.version2.11/scala.binary.version spark.version1.2.1/spark.version scala.version2.11.5/scala.version Please let me know how can I resolve this problem. Thanks Ashish
Re: Accumulator in SparkUI for streaming
So somehow Spark Streaming doesn't support display of named accumulators in the WebUI? On Tue, Feb 24, 2015 at 7:58 AM, Petar Zecevic petar.zece...@gmail.com wrote: Interesting. Accumulators are shown on the Web UI if you are using the ordinary SparkContext (Spark 1.2). It just has to be named (and that's what you did). scala> val acc = sc.accumulator(0, "test accumulator") acc: org.apache.spark.Accumulator[Int] = 0 scala> val rdd = sc.parallelize(1 to 1000) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:12 scala> rdd.foreach(x => acc += 1) scala> acc.value res1: Int = 1000 The Stage details page shows: On 20.2.2015. 9:25, Tim Smith wrote: On Spark 1.2: I am trying to capture # records read from a kafka topic: val inRecords = ssc.sparkContext.accumulator(0, "InRecords") .. kInStreams.foreach( k => { k.foreachRDD ( rdd => inRecords += rdd.count().toInt ) inRecords.value Question is how do I get the accumulator to show up in the UI? I tried inRecords.value but that didn't help. Pretty sure it isn't showing up in Stage metrics. What's the trick here? collect? Thanks, Tim
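A minimal, self-contained sketch of the pattern Tim describes, with a named accumulator bumped from foreachRDD; the socket source stands in for the Kafka stream. Whether the named accumulator then appears on the streaming job's stage pages is exactly the open question in this thread.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingAccumulatorSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("acc-demo").setMaster("local[2]"))
    val ssc = new StreamingContext(sc, Seconds(10))
    val inRecords = sc.accumulator(0, "InRecords")          // named, so it is eligible for the UI

    val lines = ssc.socketTextStream("localhost", 9999)     // stand-in for the Kafka DStream
    lines.foreachRDD { rdd => inRecords += rdd.count().toInt }

    ssc.start()
    ssc.awaitTermination()
  }
}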
Re: Tools to manage workflows on Spark
Sorry not really. Spork is a way to migrate your existing pig scripts to Spark or write new pig jobs then can execute on spark. For orchestration you are better off using Oozie especially if you are using other execution engines/systems besides spark. Regards, Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoid.com http://www.sigmoidanalytics.com/ @mayur_rustagi http://www.twitter.com/mayur_rustagi On Sat, Feb 28, 2015 at 6:59 PM, Qiang Cao caoqiang...@gmail.com wrote: Thanks Mayur! I'm looking for something that would allow me to easily describe and manage a workflow on Spark. A workflow in my context is a composition of Spark applications that may depend on one another based on hdfs inputs/outputs. Is Spork a good fit? The orchestration I want is on app level. On Sat, Feb 28, 2015 at 9:38 PM, Mayur Rustagi mayur.rust...@gmail.com wrote: We do maintain it but in apache repo itself. However Pig cannot do orchestration for you. I am not sure what you are looking at from Pig in this context. Regards, Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoid.com http://www.sigmoidanalytics.com/ @mayur_rustagi http://www.twitter.com/mayur_rustagi On Sat, Feb 28, 2015 at 6:36 PM, Ted Yu yuzhih...@gmail.com wrote: Here was latest modification in spork repo: Mon Dec 1 10:08:19 2014 Not sure if it is being actively maintained. On Sat, Feb 28, 2015 at 6:26 PM, Qiang Cao caoqiang...@gmail.com wrote: Thanks for the pointer, Ashish! I was also looking at Spork https://github.com/sigmoidanalytics/spork Pig-on-Spark), but wasn't sure if that's the right direction. On Sat, Feb 28, 2015 at 6:36 PM, Ashish Nigam ashnigamt...@gmail.com wrote: You have to call spark-submit from oozie. I used this link to get the idea for my implementation - http://mail-archives.apache.org/mod_mbox/oozie-user/201404.mbox/%3CCAHCsPn-0Grq1rSXrAZu35yy_i4T=fvovdox2ugpcuhkwmjp...@mail.gmail.com%3E On Feb 28, 2015, at 3:25 PM, Qiang Cao caoqiang...@gmail.com wrote: Thanks, Ashish! Is Oozie integrated with Spark? I knew it can accommodate some Hadoop jobs. On Sat, Feb 28, 2015 at 6:07 PM, Ashish Nigam ashnigamt...@gmail.com wrote: Qiang, Did you look at Oozie? We use oozie to run spark jobs in production. On Feb 28, 2015, at 2:45 PM, Qiang Cao caoqiang...@gmail.com wrote: Hi Everyone, We need to deal with workflows on Spark. In our scenario, each workflow consists of multiple processing steps. Among different steps, there could be dependencies. I'm wondering if there are tools available that can help us schedule and manage workflows on Spark. I'm looking for something like pig on Hadoop, but it should fully function on Spark. Any suggestion? Thanks in advance! Qiang
Re: Tools to manage workflows on Spark
Thanks for the pointer, Ashish! I was also looking at Spork https://github.com/sigmoidanalytics/spork Pig-on-Spark), but wasn't sure if that's the right direction. On Sat, Feb 28, 2015 at 6:36 PM, Ashish Nigam ashnigamt...@gmail.com wrote: You have to call spark-submit from oozie. I used this link to get the idea for my implementation - http://mail-archives.apache.org/mod_mbox/oozie-user/201404.mbox/%3CCAHCsPn-0Grq1rSXrAZu35yy_i4T=fvovdox2ugpcuhkwmjp...@mail.gmail.com%3E On Feb 28, 2015, at 3:25 PM, Qiang Cao caoqiang...@gmail.com wrote: Thanks, Ashish! Is Oozie integrated with Spark? I knew it can accommodate some Hadoop jobs. On Sat, Feb 28, 2015 at 6:07 PM, Ashish Nigam ashnigamt...@gmail.com wrote: Qiang, Did you look at Oozie? We use oozie to run spark jobs in production. On Feb 28, 2015, at 2:45 PM, Qiang Cao caoqiang...@gmail.com wrote: Hi Everyone, We need to deal with workflows on Spark. In our scenario, each workflow consists of multiple processing steps. Among different steps, there could be dependencies. I'm wondering if there are tools available that can help us schedule and manage workflows on Spark. I'm looking for something like pig on Hadoop, but it should fully function on Spark. Any suggestion? Thanks in advance! Qiang
Connection pool in workers
Hi guys, I am new to Spark and we are running a small project that collects data from Kinesis and inserts it into Mongo. I would like to share a high-level view of how it is done and would love your input on it. I am fetching Kinesis data and for each RDD - Parsing the String data - Inserting into a Mongo store So what I understand is that the parsing we do for each RDD is serialized and sent to the workers. So when I want to write to Mongo, each worker creates a new connection to write the data. Is there any way I can use a connection pool? By the way, I am using Scala and Spark Streaming. A.K.M. Ashrafuzzaman Lead Software Engineer NewsCred (M) 880-175-5592433
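The usual answer is to open (or borrow) one client per partition inside foreachPartition rather than one per record. A hedged sketch of that pattern, where MongoWriter is a placeholder for whatever pooled client your Mongo driver provides (it is not a real library API):

import org.apache.spark.streaming.dstream.DStream

object MongoWriter {
  // One client per executor JVM; Scala objects initialise lazily, on first use inside a task.
  lazy val client: AnyRef = createPooledClient()
  def createPooledClient(): AnyRef = ???     // placeholder: e.g. a MongoClient configured with a connection pool
  def insert(record: String): Unit = ???     // placeholder write that goes through `client`
}

def saveStream(parsed: DStream[String]): Unit = {
  parsed.foreachRDD { rdd =>
    rdd.foreachPartition { records =>
      // Every record in this partition reuses the same client instead of reconnecting per record.
      records.foreach(MongoWriter.insert)
    }
  }
}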
Re: Tools to manage workflows on Spark
Here was latest modification in spork repo: Mon Dec 1 10:08:19 2014 Not sure if it is being actively maintained. On Sat, Feb 28, 2015 at 6:26 PM, Qiang Cao caoqiang...@gmail.com wrote: Thanks for the pointer, Ashish! I was also looking at Spork https://github.com/sigmoidanalytics/spork Pig-on-Spark), but wasn't sure if that's the right direction. On Sat, Feb 28, 2015 at 6:36 PM, Ashish Nigam ashnigamt...@gmail.com wrote: You have to call spark-submit from oozie. I used this link to get the idea for my implementation - http://mail-archives.apache.org/mod_mbox/oozie-user/201404.mbox/%3CCAHCsPn-0Grq1rSXrAZu35yy_i4T=fvovdox2ugpcuhkwmjp...@mail.gmail.com%3E On Feb 28, 2015, at 3:25 PM, Qiang Cao caoqiang...@gmail.com wrote: Thanks, Ashish! Is Oozie integrated with Spark? I knew it can accommodate some Hadoop jobs. On Sat, Feb 28, 2015 at 6:07 PM, Ashish Nigam ashnigamt...@gmail.com wrote: Qiang, Did you look at Oozie? We use oozie to run spark jobs in production. On Feb 28, 2015, at 2:45 PM, Qiang Cao caoqiang...@gmail.com wrote: Hi Everyone, We need to deal with workflows on Spark. In our scenario, each workflow consists of multiple processing steps. Among different steps, there could be dependencies. I'm wondering if there are tools available that can help us schedule and manage workflows on Spark. I'm looking for something like pig on Hadoop, but it should fully function on Spark. Any suggestion? Thanks in advance! Qiang
Re: Tools to manage workflows on Spark
Thanks Mayur! I'm looking for something that would allow me to easily describe and manage a workflow on Spark. A workflow in my context is a composition of Spark applications that may depend on one another based on hdfs inputs/outputs. Is Spork a good fit? The orchestration I want is on app level. On Sat, Feb 28, 2015 at 9:38 PM, Mayur Rustagi mayur.rust...@gmail.com wrote: We do maintain it but in apache repo itself. However Pig cannot do orchestration for you. I am not sure what you are looking at from Pig in this context. Regards, Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoid.com http://www.sigmoidanalytics.com/ @mayur_rustagi http://www.twitter.com/mayur_rustagi On Sat, Feb 28, 2015 at 6:36 PM, Ted Yu yuzhih...@gmail.com wrote: Here was latest modification in spork repo: Mon Dec 1 10:08:19 2014 Not sure if it is being actively maintained. On Sat, Feb 28, 2015 at 6:26 PM, Qiang Cao caoqiang...@gmail.com wrote: Thanks for the pointer, Ashish! I was also looking at Spork https://github.com/sigmoidanalytics/spork Pig-on-Spark), but wasn't sure if that's the right direction. On Sat, Feb 28, 2015 at 6:36 PM, Ashish Nigam ashnigamt...@gmail.com wrote: You have to call spark-submit from oozie. I used this link to get the idea for my implementation - http://mail-archives.apache.org/mod_mbox/oozie-user/201404.mbox/%3CCAHCsPn-0Grq1rSXrAZu35yy_i4T=fvovdox2ugpcuhkwmjp...@mail.gmail.com%3E On Feb 28, 2015, at 3:25 PM, Qiang Cao caoqiang...@gmail.com wrote: Thanks, Ashish! Is Oozie integrated with Spark? I knew it can accommodate some Hadoop jobs. On Sat, Feb 28, 2015 at 6:07 PM, Ashish Nigam ashnigamt...@gmail.com wrote: Qiang, Did you look at Oozie? We use oozie to run spark jobs in production. On Feb 28, 2015, at 2:45 PM, Qiang Cao caoqiang...@gmail.com wrote: Hi Everyone, We need to deal with workflows on Spark. In our scenario, each workflow consists of multiple processing steps. Among different steps, there could be dependencies. I'm wondering if there are tools available that can help us schedule and manage workflows on Spark. I'm looking for something like pig on Hadoop, but it should fully function on Spark. Any suggestion? Thanks in advance! Qiang
Re: Reg. Difference in Performance
You mean the size of the data that we take? Thank You Regards, Deep On Sun, Mar 1, 2015 at 6:04 AM, Joseph Bradley jos...@databricks.com wrote: Hi Deep, Compute times may not be very meaningful for small examples like those. If you increase the sizes of the examples, then you may start to observe more meaningful trends and speedups. Joseph On Sat, Feb 28, 2015 at 7:26 AM, Deep Pradhan pradhandeep1...@gmail.com wrote: Hi, I am running Spark applications in GCE. I set up cluster with different number of nodes varying from 1 to 7. The machines are single core machines. I set the spark.default.parallelism to the number of nodes in the cluster for each cluster. I ran the four applications available in Spark Examples, SparkTC, SparkALS, SparkLR, SparkPi for each of the configurations. What I notice is the following: In case of SparkTC and SparkALS, the time to complete the job increases with the increase in number of nodes in cluster, where as in SparkLR and SparkPi, the time to complete the job remains the same across all the configurations. Could anyone explain me this? Thank You Regards, Deep
Re: Tools to manage workflows on Spark
We do maintain it but in apache repo itself. However Pig cannot do orchestration for you. I am not sure what you are looking at from Pig in this context. Regards, Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoid.com http://www.sigmoidanalytics.com/ @mayur_rustagi http://www.twitter.com/mayur_rustagi On Sat, Feb 28, 2015 at 6:36 PM, Ted Yu yuzhih...@gmail.com wrote: Here was latest modification in spork repo: Mon Dec 1 10:08:19 2014 Not sure if it is being actively maintained. On Sat, Feb 28, 2015 at 6:26 PM, Qiang Cao caoqiang...@gmail.com wrote: Thanks for the pointer, Ashish! I was also looking at Spork https://github.com/sigmoidanalytics/spork Pig-on-Spark), but wasn't sure if that's the right direction. On Sat, Feb 28, 2015 at 6:36 PM, Ashish Nigam ashnigamt...@gmail.com wrote: You have to call spark-submit from oozie. I used this link to get the idea for my implementation - http://mail-archives.apache.org/mod_mbox/oozie-user/201404.mbox/%3CCAHCsPn-0Grq1rSXrAZu35yy_i4T=fvovdox2ugpcuhkwmjp...@mail.gmail.com%3E On Feb 28, 2015, at 3:25 PM, Qiang Cao caoqiang...@gmail.com wrote: Thanks, Ashish! Is Oozie integrated with Spark? I knew it can accommodate some Hadoop jobs. On Sat, Feb 28, 2015 at 6:07 PM, Ashish Nigam ashnigamt...@gmail.com wrote: Qiang, Did you look at Oozie? We use oozie to run spark jobs in production. On Feb 28, 2015, at 2:45 PM, Qiang Cao caoqiang...@gmail.com wrote: Hi Everyone, We need to deal with workflows on Spark. In our scenario, each workflow consists of multiple processing steps. Among different steps, there could be dependencies. I'm wondering if there are tools available that can help us schedule and manage workflows on Spark. I'm looking for something like pig on Hadoop, but it should fully function on Spark. Any suggestion? Thanks in advance! Qiang
Re: Unable to find org.apache.spark.sql.catalyst.ScalaReflection class
I think it's possible that the problem is that the scala compiler is not being loaded by the primordial classloader (but instead by some child classloader) and thus the scala reflection mirror is failing to initialize when it can't find it. Unfortunately, the only solution that I know of is to load all required jars when the JVM starts. On Sat, Feb 28, 2015 at 5:26 PM, Ashish Nigam ashnigamt...@gmail.com wrote: Also, can scala version play any role here? I am using scala 2.11.5 but all spark packages have a dependency on scala 2.11.2 Just wanted to make sure that scala version is not an issue here. On Sat, Feb 28, 2015 at 9:18 AM, Ashish Nigam ashnigamt...@gmail.com wrote: Hi, I wrote a very simple program in scala to convert an existing RDD to SchemaRDD. But the createSchemaRDD function is throwing an exception: Exception in thread "main" scala.ScalaReflectionException: class org.apache.spark.sql.catalyst.ScalaReflection in JavaMirror with primordial classloader with boot classpath [.] not found Here's more info on the versions I am using - <scala.binary.version>2.11</scala.binary.version> <spark.version>1.2.1</spark.version> <scala.version>2.11.5</scala.version> Please let me know how I can resolve this problem. Thanks Ashish
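A hedged build.sbt sketch of keeping the Scala binary version and the Spark artifacts aligned (the versions shown are examples); a mismatch between the compiler on the classpath and the artifacts' binary version, or a missing spark-catalyst jar at runtime, can both surface as this kind of reflection failure.

scalaVersion := "2.11.5"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.2.1" % "provided",
  "org.apache.spark" %% "spark-sql"  % "1.2.1" % "provided"   // brings in spark-catalyst transitively
)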