Re: settings from props file seem to be ignored in mesos
There isn't a conf/spark-defaults.conf file in the .tgz. There's a template file, but we didn't think we'd need one. I assumed we'd use the defaults, and anything we wanted to override would go in the properties file we load via --properties-file, as well as command-line params (--master etc.). On 16 June 2015 at 04:34, Akhil Das ak...@sigmoidanalytics.com wrote: What's in your executor's (that .tgz file) conf/spark-defaults.conf file? Thanks Best Regards On Mon, Jun 15, 2015 at 7:14 PM, Gary Ogden gog...@gmail.com wrote: I'm loading these settings from a properties file:
spark.executor.memory=256M
spark.cores.max=1
spark.shuffle.consolidateFiles=true
spark.task.cpus=1
spark.deploy.defaultCores=1
spark.driver.cores=1
spark.scheduler.mode=FAIR
Once the job is submitted to mesos, I can go to the spark UI for that job (hostname:4040) and on the environment tab I see that those settings are there. If I then comment out all those settings and allow spark to use the defaults, it still appears to use the same settings in mesos. Under both runs, it still shows 1 task, 3 cpu, 1GB memory. Nothing seems to change no matter what is put in that props file, even if the settings show up in the spark environment tab.
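For anyone following along, this is the shape of our submission (the paths, master URL and class name below are illustrative, not our exact setup); the question is why the overrides in the file don't seem to take effect on Mesos:

  # job.properties (loaded with --properties-file)
  spark.executor.memory=256M
  spark.cores.max=1
  spark.scheduler.mode=FAIR

  ./bin/spark-submit \
    --master mesos://mesos-master:5050 \
    --properties-file /path/to/job.properties \
    --class com.example.MyJob my-job.jar

One thing we may try is forcing the same values on the command line with --conf spark.cores.max=1 etc., to narrow down whether it's the file or the Mesos scheduler that's ignoring them.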
Re: Spark History Server pointing to S3
Not quite sure, but try pointing the spark.history.fs.logDirectory to your s3 Thanks Best Regards On Tue, Jun 16, 2015 at 6:26 PM, Gianluca Privitera gianluca.privite...@studio.unibo.it wrote: On the Spark website it's stated in the View After the Fact section ( https://spark.apache.org/docs/latest/monitoring.html) that you can point the start-history-server.sh script to a directory in order to view the Web UI using the logs as data source. Is it possible to point that script to S3? Maybe from an EC2 instance? Thanks, Gianluca - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
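In case it helps, a sketch of what that suggestion might look like (the bucket and path are made up, and it assumes the Hadoop S3 filesystem classes plus your AWS credentials are available to the history server):

  # conf/spark-defaults.conf on the machine running the history server
  spark.history.fs.logDirectory   s3n://my-bucket/spark-event-logs
  # the applications must also have written their event logs to the same place:
  spark.eventLog.enabled          true
  spark.eventLog.dir              s3n://my-bucket/spark-event-logs

  ./sbin/start-history-server.sh

Whether the history provider can actually read from an s3n:// path depends on the Hadoop jars bundled with your build.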
Re: tasks won't run on mesos when using fine grained
On the master node, I see this printed over and over in the mesos-master.WARNING log file: W0615 06:06:51.211262 8672 hierarchical_allocator_process.hpp:589] Using the default value of 'refuse_seconds' to create the refused resources filter because the input value is negative Here's what I see in the master INFO file: I0616 12:10:55.040024 8674 http.cpp:478] HTTP request for '/master/state.json' I0616 12:10:55.425833 8669 master.cpp:3843] Sending 1 offers to framework 20150511-140547-189138442-5051-8667-0831 (Savings) at scheduler-5a5e99d4-5e16-4a48-94d5-86f751615a04@10.6.71.203:47979 I0616 12:10:55.438303 8669 master.cpp:3843] Sending 1 offers to framework 20150304-134212-222692874-5051-2300-0054 (chronos-2.3.2_mesos-0.20.1-SNAPSHOT) at scheduler-c8f2acc2-d16e-44d5-b54f-7f88d3ab39a2@10.6.70.11:57549 I0616 12:10:55.441295 8669 master.cpp:3843] Sending 1 offers to framework 20150511-140547-189138442-5051-8667-0838 (Savings) at scheduler-8b4389df-109e-49f5-8064-dd263fbec9fe@10.6.71.202:53346 I0616 12:10:55.442204 8669 master.cpp:2344] Processing reply for offers: [ 20150511-140547-189138442-5051-8667-O9282037 ] on slave 20150511-140547-189138442-5051-8667-S4 at slave(1)@10.6.71.203:5151 (secasdb01-2) for framework 20150511-140547-189138442-5051-8667-0831 (Savings) at scheduler-5a5e99d4-5e16-4a48-94d5-86f751615a04@10.6.71.203:47979 I0616 12:10:55.443111 8669 master.cpp:2344] Processing reply for offers: [ 20150511-140547-189138442-5051-8667-O9282038 ] on slave 20150304-134111-205915658-5051-1595-S0 at slave(1)@10.6.71.206:5151 (secasdb01-3) for framework 20150304-134212-222692874-5051-2300-0054 (chronos-2.3.2_mesos-0.20.1-SNAPSHOT) at scheduler-c8f2acc2-d16e-44d5-b54f-7f88d3ab39a2@10.6.70.11:57549 I0616 12:10:55.444875 8671 hierarchical_allocator_process.hpp:563] Recovered mem(*):5305; disk(*):4744; ports(*):[25001-3] (total allocatable: mem(*):5305; disk(*):4744; ports(*):[25001-3]) on slave 20150511-140547-189138442-5051-8667-S4 from framework 20150511-140547-189138442-5051-8667-0831 I0616 12:10:55.445121 8669 master.cpp:2344] Processing reply for offers: [ 20150511-140547-189138442-5051-8667-O9282039 ] on slave 20150511-140547-189138442-5051-8667-S2 at slave(1)@10.6.71.202:5151 (secasdb01-1) for framework 20150511-140547-189138442-5051-8667-0838 (Savings) at scheduler-8b4389df-109e-49f5-8064-dd263fbec9fe@10.6.71.202:53346 I0616 12:10:55.445971 8670 hierarchical_allocator_process.hpp:563] Recovered mem(*):6329; disk(*):5000; ports(*):[25001-3] (total allocatable: mem(*):6329; disk(*):5000; ports(*):[25001-3]) on slave 20150304-134111-205915658-5051-1595-S0 from framework 20150304-134212-222692874-5051-2300-0054 I0616 12:10:55.446185 8674 hierarchical_allocator_process.hpp:563] Recovered mem(*):4672; disk(*):4488; ports(*):[25001-25667, 25669-3] (total allocatable: mem(*):4672; disk(*):4488; ports(*):[25001-25667, 25669-3]) on slave 20150511-140547-189138442-5051-8667-S2 from framework 20150511-140547-189138442-5051-8667-0838 There's two savings jobs and one weather job and they're all hung right now (all started from chronos). 
Here's what the frameworks tab looks like in mesos (ID / Host / User / Name / Active Tasks / CPUs / Mem / Max Share / Registered / Re-Registered):
…5051-8667-0840 (http://intmesosmaster01:5051/#/frameworks/20150511-140547-189138442-5051-8667-0840) | secasdb01-1 | mesos | Weather | 0 | 0 | 0 B | 0% | 4 hours ago | -
…5051-8667-0838 (http://intmesosmaster01:5051/#/frameworks/20150511-140547-189138442-5051-8667-0838) | secasdb01-1 | mesos | Savings | 0 | 0 | 0 B | 0% | 4 hours ago | -
…5051-8667-0831 (http://intmesosmaster01:5051/#/frameworks/20150511-140547-189138442-5051-8667-0831) | secasdb01-2 | mesos | Savings | 0 | 0 | 0 B | 0% | 7 hours ago | -
…5051-8667-0804 (http://intmesosmaster01:5051/#/frameworks/20150511-140547-189138442-5051-8667-0804) | secasdb01-1 | mesos | AlertConsumer | 1 | 3 | 1.0 GB | 50% | 20 hours ago | -
…5051-2300-0090 (http://intmesosmaster01:5051/#/frameworks/20150304-134212-222692874-5051-2300-0090) | intMesosMaster02 | mesos | marathon | 1 | 0.5 | 128 MB | 8.333% | a month ago | a month ago
…5051-2300-0054 (http://intmesosmaster01:5051/#/frameworks/20150304-134212-222692874-5051-2300-0054) | intMesosMaster01 | root | chronos-2.3.2_mesos-0.20.1-SNAPSHOT | 3 | 2.5 | 3.0 GB | 41.667% | a month ago | a month ago
It seems that the chronos framework has reserved all the remaining cpu in the cluster but not given it to the jobs that need it (savings and weather). AlertConsumer is a marathon job that's always running and is working fine. On 16 June 2015 at 04:32, Akhil Das ak...@sigmoidanalytics.com wrote: Did you look inside all logs? Mesos logs and executor logs? Thanks Best Regards On Mon, Jun 15, 2015 at 7:09 PM, Gary Ogden gog...@gmail.com wrote: My Mesos cluster has 1.5 CPU and 17GB free. If I set: conf.set("spark.mesos.coarse", "true"); conf.set("spark.cores.max", "1"); in the SparkConf object, the job will run in the mesos cluster fine. But if I comment out those settings above so that it defaults to fine grained, the task never finishes. It just shows as 0 for everything in the mesos frameworks (# of
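For reference, a minimal sketch of the coarse-grained configuration that works for me (the master URL below is made up; commenting out the two set() calls switches back to the default fine-grained mode, which is where the tasks hang):

  import org.apache.spark.{SparkConf, SparkContext}

  val conf = new SparkConf()
    .setMaster("mesos://intmesosmaster01:5050")   // illustrative master URL
    .setAppName("Savings")
    .set("spark.mesos.coarse", "true")  // one long-lived executor per slave
    .set("spark.cores.max", "1")        // cap on the total cores the job may claim
  val sc = new SparkContext(conf)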
Spark History Server pointing to S3
On the Spark website it's stated in the View After the Fact section (https://spark.apache.org/docs/latest/monitoring.html) that you can point the start-history-server.sh script to a directory in order to view the Web UI using the logs as data source. Is it possible to point that script to S3? Maybe from an EC2 instance? Thanks, Gianluca - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark standalone mode and kerberized cluster
Thank you for the answer, it doesn't seem to work either (I haven't logged into the machine as the spark user, but ran kinit inside the spark-env script), and I also tried inside the job. I've noticed when I run pyspark that the kerberos token is used for something, but this same behavior is not present when I start a worker, so maybe those aren't designed to use kerberos... On Tue, Jun 16, 2015 at 12:10 PM, Steve Loughran ste...@hortonworks.com wrote: On 15 Jun 2015, at 15:43, Borja Garrido Bear kazebo...@gmail.com wrote: I tried running the job in a standalone cluster and I'm getting this: java.io.IOException: Failed on local exception: java.io.IOException: org.apache.hadoop.security.AccessControlException: Client cannot authenticate via:[TOKEN, KERBEROS]; Host Details : local host is: worker-node/0.0.0.0; destination host is: hdfs:9000; Both nodes can access the HDFS running spark locally, and have valid kerberos credentials. I know for the moment keytab is not supported for standalone mode, but as long as the tokens I had when initiating the workers and masters are valid, this should work, shouldn't it? I don't know anything about tokens on standalone. In YARN what we have to do is something called delegation tokens: the client asks (something) for tokens granting access to HDFS, and attaches that to the YARN container creation request, which is then handed off to the app master, which then gets to deal with (a) passing them down to launched workers and (b) dealing with token refresh (which is where keytabs come into play). Why not try sshing in to the worker node as the spark user and running kinit there to see if the problem goes away once you've logged in with Kerberos. If that works, you're going to have to automate that process across the cluster
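If it comes to automating that, a very rough sketch of the idea (host names, keytab path and principal are all made up; this only keeps the OS-level ticket cache fresh, it isn't Spark-level keytab support):

  # run kinit as the spark user on every worker before (re)starting it,
  # e.g. from cron so the ticket never expires
  for host in worker1 worker2 worker3; do
    ssh spark@$host "kinit -kt /etc/security/keytabs/spark.keytab spark/$host@EXAMPLE.COM && klist"
  done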
RE: Optimizing Streaming from Websphere MQ
Thanks Akhil for taking this point, I am also talking about the MQ bottleneck. I currently have 5 receivers for an unreliable Websphere MQ receiver implementation. Is there any proven way to convert this implementation to a reliable one? Regards, Umesh Chaudhary From: Akhil Das [mailto:ak...@sigmoidanalytics.com] Sent: Tuesday, June 16, 2015 12:44 PM To: Chaudhary, Umesh Cc: user@spark.apache.org Subject: Re: Optimizing Streaming from Websphere MQ Each receiver will run on 1 core. So if your network is not the bottleneck, then to test the consumption speed of the receivers you can simply do a dstream.count.print to see how many records it can receive. (Also it will be available in the Streaming tab of the driver UI). If you spawn 10 receivers on 10 cores then possibly no processing will happen other than receiving. Now, on the other hand, the MQ can also be the bottleneck (you could possibly configure it to achieve more parallelism). Thanks Best Regards On Mon, Jun 15, 2015 at 2:40 PM, Chaudhary, Umesh umesh.chaudh...@searshc.com wrote: Hi Akhil, Thanks for your response. I have 10 cores summed across all my 3 machines, and I am running 5-10 receivers. I have tried to test the processed number of records per second by varying the number of receivers. If I have 10 receivers (i.e. one receiver for each core), then I am not experiencing any performance benefit from it. Is it something related to the bottleneck of MQ or the Reliable Receiver? From: Akhil Das [mailto:ak...@sigmoidanalytics.com] Sent: Saturday, June 13, 2015 1:10 AM To: Chaudhary, Umesh Cc: user@spark.apache.org Subject: Re: Optimizing Streaming from Websphere MQ How many cores are you allocating for your job? And how many receivers do you have? It would be good if you can post your custom receiver code, it will help people to understand it better and shed some light. Thanks Best Regards On Fri, Jun 12, 2015 at 12:58 PM, Chaudhary, Umesh umesh.chaudh...@searshc.com wrote: Hi, I have created a Custom Receiver in Java which receives data from Websphere MQ, and I am only writing the received records to HDFS. I have referred to many forums for optimizing the speed of a spark streaming application. Here I am listing a few:
• Spark Official: http://spark.apache.org/docs/latest/streaming-programming-guide.html#performance-tuning
• Virdata: http://www.virdata.com/tuning-spark/
• TD's Slides (a bit old but useful): http://www.slideshare.net/spark-project/deep-divewithsparkstreaming-tathagatadassparkmeetup20130617
I got mainly these points for my applicability:
• giving batch interval as 1 sec
• controlling "spark.streaming.blockInterval" = 200ms
• inputStream.repartition(3)
But that did not improve my actual speed (records/sec) of the receiver, which is MAX 5-10 records/sec. This is way less than my expectation. Am I missing something? Regards, Umesh Chaudhary This message, including any attachments, is the property of Sears Holdings Corporation and/or one of its subsidiaries. It is confidential and may contain proprietary or legally privileged information. If you are not the intended recipient, please delete it without reading the contents.
Thank you.
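On the question of making the receiver reliable: the general pattern from the Spark Streaming docs is to store a whole block of received messages with store() and only acknowledge them back to MQ after store() returns, combined with checkpointing and the receiver write-ahead log. A very rough sketch of that shape - mqFetchBatch and mqAck below are placeholders, not real WebSphere MQ API calls:

  import scala.collection.mutable.ArrayBuffer
  import org.apache.spark.storage.StorageLevel
  import org.apache.spark.streaming.receiver.Receiver

  class ReliableMQReceiver(queueUri: String)
      extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

    def onStart(): Unit = {
      new Thread("MQ Receiver") {
        override def run(): Unit = receive()
      }.start()
    }

    def onStop(): Unit = { /* close the MQ connection here */ }

    private def receive(): Unit = {
      while (!isStopped()) {
        val batch: ArrayBuffer[String] = mqFetchBatch(queueUri)   // placeholder
        if (batch.nonEmpty) {
          store(batch)   // blocks until Spark has reliably stored the block
          mqAck(batch)   // acknowledge/commit to MQ only after store() returns
        }
      }
    }

    // Placeholders -- replace with real WebSphere MQ client calls.
    private def mqFetchBatch(uri: String): ArrayBuffer[String] = ArrayBuffer.empty
    private def mqAck(batch: ArrayBuffer[String]): Unit = ()
  }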
stop streaming context of job failure
Hi all, Is there a way to stop the streaming context when some batch processing fails? I want to set a reasonable retries count, say 10, and if it still fails - stop the context completely. Is that possible?
how to maintain the offset for spark streaming if HDFS is the source
Hi All, In my use case an HDFS file is the source for a Spark Stream; the job will process the data line by line, but how do I make sure to maintain the offset (the line number of the data already processed) across a restart or a new code push? Can you please advise whether there is any configuration in Spark for this? Thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/how-to-maintain-the-offset-for-spark-streaming-if-HDFS-is-the-source-tp23336.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
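Not line-level offsets, but for what it's worth, the usual way a file-based stream survives a restart is checkpoint-based recovery via StreamingContext.getOrCreate - a sketch, with made-up paths:

  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}

  val checkpointDir = "hdfs:///user/me/stream-checkpoint"   // hypothetical path

  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("hdfs-stream")
    val ssc = new StreamingContext(conf, Seconds(30))
    ssc.checkpoint(checkpointDir)
    // textFileStream only picks up files newly moved into the directory
    val lines = ssc.textFileStream("hdfs:///user/me/incoming")   // hypothetical path
    lines.foreachRDD { rdd =>
      rdd.saveAsTextFile("hdfs:///user/me/out-" + System.currentTimeMillis)   // your processing here
    }
    ssc
  }

  // On restart this rebuilds the context from the checkpoint instead of starting fresh,
  // so already-processed files are not re-read. Caveat: checkpoint recovery only works
  // while the application code is unchanged; after a code push you start from a new checkpoint.
  val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
  ssc.start()
  ssc.awaitTermination()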
RE: stop streaming context of job failure
https://spark.apache.org/docs/latest/monitoring.html Also subscribe to various Listeners for various Metrics Types, e.g. Job Stats/Statuses - this will allow you (in the driver) to decide when to stop the context gracefully (the listening and stopping can be done from a completely separate thread in the driver). See for example org.apache.spark.ui.jobs.JobProgressListener in the Java API docs (https://spark.apache.org/docs/latest/api/java/): public class JobProgressListener extends Object implements SparkListener (https://spark.apache.org/docs/latest/api/java/org/apache/spark/scheduler/SparkListener.html), Logging (https://spark.apache.org/docs/latest/api/java/org/apache/spark/Logging.html). :: DeveloperApi :: Tracks task-level information to be displayed in the UI. All access to the data structures in this class must be synchronized on the class, since the UI thread and the EventBus loop may otherwise be reading and updating the internal data structures concurrently. From: Krot Viacheslav [mailto:krot.vyaches...@gmail.com] Sent: Tuesday, June 16, 2015 2:35 PM To: user@spark.apache.org Subject: stop streaming context of job failure Hi all, Is there a way to stop the streaming context when some batch processing fails? I want to set a reasonable retries count, say 10, and if it still fails - stop the context completely. Is that possible?
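A rough sketch of that idea (the threshold and the failure counting are illustrative; whether a failed batch shows up as a failed job depends on your output operations):

  import java.util.concurrent.atomic.AtomicInteger
  import org.apache.spark.scheduler.{JobFailed, SparkListener, SparkListenerJobEnd}
  import org.apache.spark.streaming.StreamingContext

  class StopAfterFailures(ssc: StreamingContext, maxFailures: Int) extends SparkListener {
    private val failures = new AtomicInteger(0)

    override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = jobEnd.jobResult match {
      case JobFailed(_) =>
        if (failures.incrementAndGet() >= maxFailures) {
          // Stop from a separate thread, not from inside the listener callback.
          new Thread("stop-streaming-context") {
            override def run(): Unit =
              ssc.stop(stopSparkContext = true, stopGracefully = false)
          }.start()
        }
      case _ => // successful job; reset the counter here if only consecutive failures should count
    }
  }

  // In the driver, after creating the StreamingContext:
  // ssc.sparkContext.addSparkListener(new StopAfterFailures(ssc, maxFailures = 10))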
Re: Spark SQL and Skewed Joins
A skew join (where the dominant key is spread across multiple executors) is pretty standard in other frameworks, see for example in scalding: https://github.com/twitter/scalding/blob/develop/scalding-core/src/main/scala/com/twitter/scalding/JoinAlgorithms.scala This would be a great addition to spark, and ideally it belongs in spark core, not sql. It's also really a big-data problem (a single key is too large for an executor), which makes it a hard sell in my experience. The interest in truly big data in the spark community has been somewhat limited... On Tue, Jun 16, 2015 at 11:28 AM, Jon Walton jon.w.wal...@gmail.com wrote: On Fri, Jun 12, 2015 at 9:43 PM, Michael Armbrust mich...@databricks.com wrote: 2. Does 1.3.2 or 1.4 have any enhancements that can help? I tried to use 1.3.1 but SPARK-6967 prohibits me from doing so. Now that 1.4 is available, would any of the JOIN enhancements help this situation? I would try Spark 1.4 after running SET spark.sql.planner.sortMergeJoin=true. Please report back if this works for you. Hi Michael, This does help. The joins are faster and fewer executors are lost, but it seems the same core problem still exists - a single executor is handling the majority of the join (the skewed key) and bottlenecking the job. One idea I had was to split the dimension table into two halves - a small half which can be broadcast (with the skewed keys), and the other large half which could be sort merge joined (with even distribution) - and then performing two individual queries against the large fact table and unioning the results. Does this sound like a worthwhile approach? Thank you, Jon
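For what it's worth, a sketch of Jon's split idea with DataFrames (table and column names are invented; the small half is only broadcast automatically if it fits under spark.sql.autoBroadcastJoinThreshold):

  // dim = dimension table DataFrame, fact = large fact table DataFrame, joined on "key"
  val dimSkewed = dim.filter("key IN ('K1', 'K2')")       // the few heavy keys; small enough to broadcast
  val dimRest   = dim.filter("key NOT IN ('K1', 'K2')")   // everything else; evenly distributed

  // The small half should go through a broadcast join, the big half through the
  // normal shuffle/sort-merge join path.
  val joinedSkewed = fact.join(dimSkewed, fact("key") === dimSkewed("key"))
  val joinedRest   = fact.join(dimRest,   fact("key") === dimRest("key"))

  val result = joinedSkewed.unionAll(joinedRest)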
Re: spark-sql from CLI ---EXCEPTION: java.lang.OutOfMemoryError: Java heap space
Hi Josh It was great meeting u in person at the spark-summit SFO yesterday. Thanks for discussing potential solutions to the problem. I verified that 2 hive gateway nodes had not been configured correctly. My bad. I added hive-site.xml to the spark conf directories for these 2 additional hive gateway nodes. Plus I increased the driver-memory parameter to 1gb. That solved the memory issue. So the good news is I can get spark-SQL running in standalone mode (on a CDH 5.3.3 with spark 1.2 on YARN). The not so good news is that the following params have no effect: --master yarn --deploy-mode client So the spark-SQL query runs with only ONE executor :-( I am planning on bugging u for 5-10 minutes at the Spark office hours :-) and hopefully we can solve this. Thanks Best regards Sanjay Sent from my iPhone On Jun 13, 2015, at 5:38 PM, Josh Rosen rosenvi...@gmail.com wrote: Try using Spark 1.4.0 with SQL code generation turned on; this should make a huge difference. On Sat, Jun 13, 2015 at 5:08 PM, Sanjay Subramanian sanjaysubraman...@yahoo.com wrote: hey guys I tried the following settings as well. No luck --total-executor-cores 24 --executor-memory 4G BTW on the same cluster, Impala absolutely kills it. Same query, 9 seconds. No memory issues. No issues. In fact I am pretty disappointed with Spark-SQL. I have worked with Hive during the 0.9.x stages and taken projects to production successfully, and Hive actually very rarely craps out. Whether the spark folks like what I say or not, yes my expectations are pretty high of Spark-SQL if I were to change the ways we are doing things at my workplace. Until that time, we are going to be hugely dependent on Impala and Hive (with SSD speeding up the shuffle stage, even MR jobs are not that slow now). I want to clarify for those of u who may be asking why I am not using spark with Scala and insisting on using spark-sql:
- I have already pipelined data from enterprise tables to Hive
- I am using CDH 5.3.3 (Cloudera starving developers version)
- I have close to 300 tables defined in Hive external tables
- Data is on HDFS
- On an average we have 150 columns per table
- On an everyday basis, we do crazy amounts of ad-hoc joining of new and old tables in getting datasets ready for supervised ML
- I thought that quite simply I can point Spark to the Hive meta and do queries as I do - in fact the existing queries would work as is unless I am using some esoteric Hive/Impala function
Anyway, if there are some settings I can use to get spark-sql to run even in standalone mode, that will be a huge help. On the pre-production cluster I have spark on YARN but could never get it to run fairly complex queries, and I have no answers from this group or the CDH groups. So my assumption is that it's possibly not solved, else I have always got very quick answers and responses :-) to my questions on all CDH groups, Spark, Hive best regards sanjay From: Josh Rosen rosenvi...@gmail.com To: Sanjay Subramanian sanjaysubraman...@yahoo.com Cc: user@spark.apache.org user@spark.apache.org Sent: Friday, June 12, 2015 7:15 AM Subject: Re: spark-sql from CLI ---EXCEPTION: java.lang.OutOfMemoryError: Java heap space It sounds like this might be caused by a memory configuration problem. In addition to looking at the executor memory, I'd also bump up the driver memory, since it appears that your shell is running out of memory when collecting a large query result.
Sent from my phone On Jun 11, 2015, at 8:43 AM, Sanjay Subramanian sanjaysubraman...@yahoo.com.INVALID wrote: hey guys Using Hive and Impala daily, intensively. Want to transition to spark-sql in CLI mode. Currently in my sandbox I am using Spark (standalone mode) in the CDH distribution (starving developer version 5.3.3): 3 datanode hadoop cluster, 32GB RAM per node, 8 cores per node, spark 1.2.0+cdh5.3.3+371. I am testing some stuff on one view and getting memory errors. A possible reason is that the default memory per executor showing on 18080 is 512M. These options, when used to start the spark-sql CLI, do not seem to have any effect: --total-executor-cores 12 --executor-memory 4G /opt/cloudera/parcels/CDH/lib/spark/bin/spark-sql -e select distinct isr,event_dt,age,age_cod,sex,year,quarter from aers.aers_demo_view aers.aers_demo_view (7 million+ records) ===
isr        bigint   case id
event_dt   bigint   event date
age        double   age of patient
age_cod    string   days, months, years
sex        string   M or F
year       int
quarter    int
VIEW DEFINITION CREATE VIEW `aers.aers_demo_view` AS SELECT `isr` AS `isr`, `event_dt` AS `event_dt`, `age` AS `age`, `age_cod` AS `age_cod`, `gndr_cod` AS `sex`, `year` AS `year`, `quarter` AS `quarter` FROM (SELECT `aers_demo_v1`.`isr`, `aers_demo_v1`.`event_dt`,
Re: Spark SQL and Skewed Joins
On Fri, Jun 12, 2015 at 9:43 PM, Michael Armbrust mich...@databricks.com wrote: 2. Does 1.3.2 or 1.4 have any enhancements that can help? I tried to use 1.3.1 but SPARK-6967 prohibits me from doing so. Now that 1.4 is available, would any of the JOIN enhancements help this situation? I would try Spark 1.4 after running SET spark.sql.planner.sortMergeJoin=true. Please report back if this works for you. Hi Michael, This does help. The joins are faster and fewer executors are lost, but it seems the same core problem still exists - a single executor is handling the majority of the join (the skewed key) and bottlenecking the job. One idea I had was to split the dimension table into two halves - a small half which can be broadcast (with the skewed keys), and the other large half which could be sort merge joined (with even distribution) - and then performing two individual queries against the large fact table and unioning the results. Does this sound like a worthwhile approach? Thank you, Jon
Re: Spark History Server pointing to S3
It gives me an exception with org.apache.spark.deploy.history.FsHistoryProvider, a problem with the file system. I can reproduce the exception if you want. It works perfectly if I give a local path; I tested it on the 1.3.0 version. Gianluca On 16 Jun 2015, at 15:08, Akhil Das ak...@sigmoidanalytics.com wrote: Not quite sure, but try pointing the spark.history.fs.logDirectory to your s3 Thanks Best Regards On Tue, Jun 16, 2015 at 6:26 PM, Gianluca Privitera gianluca.privite...@studio.unibo.it wrote: On the Spark website it's stated in the View After the Fact section (https://spark.apache.org/docs/latest/monitoring.html) that you can point the start-history-server.sh script to a directory in order to view the Web UI using the logs as data source. Is it possible to point that script to S3? Maybe from an EC2 instance? Thanks, Gianluca - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: ALS predictALL not completing
This is 1.3.1 Ayman Farahat -- View my research on my SSRN Author page: http://ssrn.com/author=1594571 From: Nick Pentreath nick.pentre...@gmail.com To: user@spark.apache.org user@spark.apache.org Sent: Tuesday, June 16, 2015 4:23 AM Subject: Re: ALS predictALL not completing Which version of Spark are you using? On Tue, Jun 16, 2015 at 6:20 AM, afarahat ayman.fara...@yahoo.com wrote: Hello; I have a data set of about 80 Million users and 12,000 items (very sparse). I can get the training part working no problem (the model has 20 factors). However, when I try using predictAll for 80 Million x 10 items, the job does not complete. When I use a smaller data set, say 500k or a million, it completes. Any ideas or suggestions? Thanks Ayman -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/ALS-predictALL-not-completing-tp23327.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: How does one decide no of executors/cores/memory allocation?
I realize that there are a lot of ways to configure my application in spark. The part that is not clear is how I decide, say for example, how many partitions I should divide my data into, how much RAM I should have, or how many workers one should initialize. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-does-one-decide-no-of-executors-cores-memory-allocation-tp23326p23339.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
The problem when share data inside Dstream
Hello guys, I faced a problem where I cannot pass my data inside an rdd partition while trying to develop a spark streaming feature. I'm a newcomer to Spark, could you please give me any suggestion on this problem? The figure in the attachment is the code I used in my program. After I run my code, I found these logs: inside the foreachRDD scope I can get the right indexGeneratorMap value, which includes the right kva instance, CassandraAccessDatastax@5adc7292 15/06/16 12:07:35 INFO streaming.AbstractSparkStreamingMain: [RDD][Index Generator Map]: {class com.worksap.company.framework.autoindex.edp.job.SparkStreamingTestEntity=AbstractIndexGenerator{kva=com.worksap.company.access.cassandra.CassandraAccessDatastax@5adc7292, indexName='spark_streaming_test_index'}} But in the foreachPartition scope, I found my kva instance is null. I totally have no idea about it. 15/06/16 12:07:28 INFO streaming.AbstractSparkStreamingMain: [Index Generator Map]: {class com.worksap.company.framework.autoindex.edp.job.SparkStreamingTestEntity=AbstractIndexGenerator{kva=null, indexName='spark_streaming_test_index'}} If someone can give me any tips, I will really appreciate your help. Thanks in advance. Best Regards, -- Gabriel Zhang - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
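One possibility is that the kva/Cassandra accessor is not serializable, so it cannot be captured on the driver and shipped into the foreachPartition closure; the usual pattern is to create the client inside foreachPartition on the executor. A sketch in Scala with made-up helper names (createCassandraAccessor and indexRecord stand in for your own code):

  dstream.foreachRDD { rdd =>
    rdd.foreachPartition { records =>
      // Runs on the executor: build (or look up from a pool/singleton) the
      // non-serializable client here instead of capturing one from the driver.
      val kva = createCassandraAccessor()                    // placeholder factory
      records.foreach(record => indexRecord(kva, record))    // placeholder indexing logic
      // close the accessor / return it to the pool here if needed
    }
  }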
Unit Testing Spark Transformations/Actions
Hi there, I am looking to use Mockito to mock out some functionality while unit testing a Spark application. I currently have code that happily runs on a cluster, but fails when I try to run unit tests against it, throwing a SparkException: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost): java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.api.java.JavaRDDLike$$anonfun$foreach$1.f$14 of type org.apache.spark.api.java.function.VoidFunction in instance of org.apache.spark.api.java.JavaRDDLike$$anonfun$foreach$1 at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2089) (Full error/stacktrace and description on SO: http://stackoverflow.com/q/30871109/2687324). Has anyone experienced this error before while unit testing? Thanks, Mark
RE: How does one decide no of executors/cores/memory allocation?
Best is by measuring and recording how the performance of your solution scales as the workload scales - recording as in data points - and then you can do some time-series stat analysis and visualizations. For example you can start with a single box with e.g. 8 CPU cores. Use e.g. 1 or 2 partitions and 1 executor, which would correspond to 1 CPU core (JVM thread) processing your workload - scale the workload and see how the performance scales and record all data points. Then repeat the same for more cpu cores, ram and boxes - you get the idea? Then analyze your performance datasets in the way explained. Basically this stuff is known as Performance Engineering and has nothing to do with a specific product - read something on PE as well. -Original Message- From: shreesh [mailto:shreesh.la...@mail.com] Sent: Tuesday, June 16, 2015 4:22 PM To: user@spark.apache.org Subject: Re: How does one decide no of executors/cores/memory allocation? I realize that there are a lot of ways to configure my application in spark. The part that is not clear is how I decide, say for example, how many partitions I should divide my data into, how much RAM I should have, or how many workers one should initialize. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-does-one-decide-no-of-executors-cores-memory-allocation-tp23326p23339.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Creating RDD from Iterable from groupByKey results
I updated code sample so people can understand better what are my inputs and outputs. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Creating-RDD-from-Iterable-from-groupByKey-results-tp23328p23341.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: SparkR 1.4.0: read.df() function fails
Hi Esten, Looks like your sqlContext is connected to a Hadoop/Spark cluster, but the file path you specified is local?. mydf-read.df(sqlContext, /home/esten/ami/usaf.json, source=json”, Error below shows that the Input path you specified does not exist on the cluster. Pointing to the right hdfs path should be able to help here. Caused by: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs://smalldata13.hdp:8020/home/esten/ami/usaf.json hdfs://smalldata13.hdp:8020/home/esten/ami/usaf.json Guru Medasani gdm...@gmail.com On Jun 16, 2015, at 10:39 AM, Shivaram Venkataraman shiva...@eecs.berkeley.edu wrote: The error you are running into is that the input file does not exist -- You can see it from the following line Input path does not exist: hdfs://smalldata13.hdp:8020/home/esten/ami/usaf.json Thanks Shivaram On Tue, Jun 16, 2015 at 1:55 AM, esten erik.stens...@dnvgl.com mailto:erik.stens...@dnvgl.com wrote: Hi, In SparkR shell, I invoke: mydf-read.df(sqlContext, /home/esten/ami/usaf.json, source=json, header=false) I have tried various filetypes (csv, txt), all fail. RESPONSE: ERROR RBackendHandler: load on 1 failed BELOW THE WHOLE RESPONSE: 15/06/16 08:09:13 INFO MemoryStore: ensureFreeSpace(177600) called with curMem=0, maxMem=278302556 15/06/16 08:09:13 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 173.4 KB, free 265.2 MB) 15/06/16 08:09:13 INFO MemoryStore: ensureFreeSpace(16545) called with curMem=177600, maxMem=278302556 15/06/16 08:09:13 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 16.2 KB, free 265.2 MB) 15/06/16 08:09:13 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:37142 (size: 16.2 KB, free: 265.4 MB) 15/06/16 08:09:13 INFO SparkContext: Created broadcast 0 from load at NativeMethodAccessorImpl.java:-2 15/06/16 08:09:16 WARN DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded. 
15/06/16 08:09:17 ERROR RBackendHandler: load on 1 failed java.lang.reflect.InvocationTargetException at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.api.r.RBackendHandler.handleMethodCall(RBackendHandler.scala:127) at org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:74) at org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:36) at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116) at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137) at java.lang.Thread.run(Thread.java:745) Caused by: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs://smalldata13.hdp:8020/home/esten/ami/usaf.json at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:285) at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228) at
Re: SparkR 1.4.0: read.df() function fails
The error you are running into is that the input file does not exist -- You can see it from the following line Input path does not exist: hdfs://smalldata13.hdp:8020/ home/esten/ami/usaf.json Thanks Shivaram On Tue, Jun 16, 2015 at 1:55 AM, esten erik.stens...@dnvgl.com wrote: Hi, In SparkR shell, I invoke: mydf-read.df(sqlContext, /home/esten/ami/usaf.json, source=json, header=false) I have tried various filetypes (csv, txt), all fail. RESPONSE: ERROR RBackendHandler: load on 1 failed BELOW THE WHOLE RESPONSE: 15/06/16 08:09:13 INFO MemoryStore: ensureFreeSpace(177600) called with curMem=0, maxMem=278302556 15/06/16 08:09:13 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 173.4 KB, free 265.2 MB) 15/06/16 08:09:13 INFO MemoryStore: ensureFreeSpace(16545) called with curMem=177600, maxMem=278302556 15/06/16 08:09:13 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 16.2 KB, free 265.2 MB) 15/06/16 08:09:13 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:37142 (size: 16.2 KB, free: 265.4 MB) 15/06/16 08:09:13 INFO SparkContext: Created broadcast 0 from load at NativeMethodAccessorImpl.java:-2 15/06/16 08:09:16 WARN DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded. 15/06/16 08:09:17 ERROR RBackendHandler: load on 1 failed java.lang.reflect.InvocationTargetException at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.api.r.RBackendHandler.handleMethodCall(RBackendHandler.scala:127) at org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:74) at org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:36) at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116) at 
io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137) at java.lang.Thread.run(Thread.java:745) Caused by: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs://smalldata13.hdp:8020/home/esten/ami/usaf.json at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:285) at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228) at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313) at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:207) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) at
HDFS not supported by databricks cloud :-(
hey guys After day one at the spark-summit SFO, I realized sadly that (indeed) HDFS is not supported by Databricks cloud. My speed bottleneck is to transfer ~1TB of snapshot HDFS data (250+ external hive tables) to S3 :-( I want to use databricks cloud but this to me is a starting disabler. The hard road for me will be (as I believe EVERYTHING is possible. The impossible just takes longer):
- transfer all HDFS to S3
- our org does not permit AWS server side encryption, so I have to figure out if AWS KMS encrypted S3 files can be read by Hive/Impala/Spark
- modify all table locations in metadata to S3
- modify all scripts to point and write to S3 instead of HDFS
Any ideas / thoughts will be helpful. Till I can get the above figured out, I am going ahead and working hard to make spark-sql the main workhorse for creating datasets (now it's Hive and Impala) thanks regards sanjay
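PS: in case it's useful to anyone going down the same road, the rough shape of the migration I'm contemplating (bucket names and paths are made up, and the encryption question above is still open):

  # copy the HDFS snapshot to S3 (credentials supplied via fs.s3n.awsAccessKeyId /
  # fs.s3n.awsSecretAccessKey, or the s3a equivalents depending on the Hadoop version)
  hadoop distcp hdfs://namenode:8020/user/hive/warehouse/mydb.db s3n://my-bucket/warehouse/mydb.db

  # then repoint each Hive external table at its new location
  ALTER TABLE mydb.my_table SET LOCATION 's3n://my-bucket/warehouse/mydb.db/my_table';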
spark-sql CLI options does not work --master yarn --deploy-mode client
hey guys I have CDH 5.3.3 with Spark 1.2.0 (on Yarn) This does not work /opt/cloudera/parcels/CDH/lib/spark/bin/spark-sql --deploy-mode client --master yarn --driver-memory 1g -e select j.person_id, p.first_name, p.last_name, count(*) from (select person_id from cdr.cdr_mjp_joborder where person_id is not null) j join (select person_id, first_name, last_name from cdr.cdr_mjp_people where lower(last_name) like '%subramanian%') p on j.person_id = p.person_id GROUP BY j.person_id, p.first_name, p.last_name This works but only one Executor is used/opt/cloudera/parcels/CDH/lib/spark/bin/spark-sql --driver-memory 1g -e select j.person_id, p.first_name, p.last_name, count(*) from (select person_id from cdr.cdr_mjp_joborder where person_id is not null) j join (select person_id, first_name, last_name from cdr.cdr_mjp_people where lower(last_name) like '%subramanian%') p on j.person_id = p.person_id GROUP BY j.person_id, p.first_name, p.last_name Any thoughts ? I found a related link but I don't understand the language.http://blog.csdn.net/freedomboy319/article/details/46332009 thanks sanjay ERRORSError: JAVA_HOME is not set and could not be found.15/06/16 18:17:19 WARN Holder: java.lang.ClassNotFoundException: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter at java.net.URLClassLoader$1.run(URLClassLoader.java:202) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:190) at java.lang.ClassLoader.loadClass(ClassLoader.java:306) at java.lang.ClassLoader.loadClass(ClassLoader.java:247) at org.eclipse.jetty.util.Loader.loadClass(Loader.java:100) at org.eclipse.jetty.util.Loader.loadClass(Loader.java:79) at org.eclipse.jetty.servlet.Holder.doStart(Holder.java:107) at org.eclipse.jetty.servlet.FilterHolder.doStart(FilterHolder.java:90) at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64) at org.eclipse.jetty.servlet.ServletHandler.initialize(ServletHandler.java:768) at org.eclipse.jetty.servlet.ServletHandler.updateMappings(ServletHandler.java:1357) at org.eclipse.jetty.servlet.ServletHandler.setFilterMappings(ServletHandler.java:1393) at org.eclipse.jetty.servlet.ServletHandler.addFilterMapping(ServletHandler.java:1113) at org.eclipse.jetty.servlet.ServletHandler.addFilterWithMapping(ServletHandler.java:979) at org.eclipse.jetty.servlet.ServletContextHandler.addFilter(ServletContextHandler.java:332) at org.apache.spark.ui.JettyUtils$$anonfun$addFilters$1$$anonfun$apply$6.apply(JettyUtils.scala:163) at org.apache.spark.ui.JettyUtils$$anonfun$addFilters$1$$anonfun$apply$6.apply(JettyUtils.scala:163) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.ui.JettyUtils$$anonfun$addFilters$1.apply(JettyUtils.scala:163) at org.apache.spark.ui.JettyUtils$$anonfun$addFilters$1.apply(JettyUtils.scala:141) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at org.apache.spark.ui.JettyUtils$.addFilters(JettyUtils.scala:141) at org.apache.spark.scheduler.cluster.YarnSchedulerBackend$$anonfun$org$apache$spark$scheduler$cluster$YarnSchedulerBackend$$addWebUIFilter$3.apply(YarnSchedulerBackend.scala:90) at org.apache.spark.scheduler.cluster.YarnSchedulerBackend$$anonfun$org$apache$spark$scheduler$cluster$YarnSchedulerBackend$$addWebUIFilter$3.apply(YarnSchedulerBackend.scala:90) at 
scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.cluster.YarnSchedulerBackend.org$apache$spark$scheduler$cluster$YarnSchedulerBackend$$addWebUIFilter(YarnSchedulerBackend.scala:90) at org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerActor$$anonfun$receive$1.applyOrElse(YarnSchedulerBackend.scala:129) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)15/06/16 18:17:19 WARN AbstractLifeCycle: FAILED org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter-1c7ab89d: javax.servlet.UnavailableException: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilterjavax.servlet.UnavailableException: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter at org.eclipse.jetty.servlet.Holder.doStart(Holder.java:114) at
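One thing I plan to check, given the "JAVA_HOME is not set" error, is whether JAVA_HOME is actually visible to the YARN containers; a sketch of the kind of settings involved (the JDK path is illustrative, and on CDH these may belong in the safety-valve configs rather than plain files):

  # conf/spark-env.sh on the gateway node
  export JAVA_HOME=/usr/java/jdk1.7.0_67

  # or pass it through to the containers via spark-defaults.conf / --conf
  spark.executorEnv.JAVA_HOME=/usr/java/jdk1.7.0_67
  spark.yarn.appMasterEnv.JAVA_HOME=/usr/java/jdk1.7.0_67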
Re: FW: MLLIB (Spark) Question.
+cc user@spark.apache.org Reply inline. On Tue, Jun 16, 2015 at 2:31 PM, Dhar Sauptik (CR/RTC1.3-NA) Sauptik.Dhar wrote: Hi DB, Thank you for the reply. That explains a lot. I however had a few points regarding this:- 1. Just to help with the debate of not regularizing the b parameter. A standard implementation argues against regularizing the b parameter. See Pg 64 para 1 : http://statweb.stanford.edu/~tibs/ElemStatLearn/ Agreed. We just worry about it will change behavior, but we actually have a PR to change the behavior to standard one, https://github.com/apache/spark/pull/6386 2. Further, is the regularization of b also applicable for the SGD implementation. Currently the SGD vs. BFGS implementations give different results (and both the implementations don't match the IRLS algorithm). Are the SGD/BFGS implemented for different loss functions? Can you please share your thoughts on this. In SGD implementation, we don't standardize the dataset before training. As a result, those columns with low standard deviation will be penalized more, and those with high standard deviation will be penalized less. Also, standardize will help the rate of convergence. As a result, in most of package, they standardize the data implicitly, and get the weights in the standardized space, and transform back to original space so it's transparent for users. 1) LORWithSGD: No standardization, and penalize the intercept. 2) LORWithLBFGS: With standardization but penalize the intercept. 3) New LOR implementation: With standardization without penalizing the intercept. As a result, only the new implementation in Spark ML handles everything correctly. We have tests to verify that the results match R. @Naveen: Please feel free to add/comment on the above points as you see necessary. Thanks, Sauptik. -Original Message- From: DB Tsai Sent: Tuesday, June 16, 2015 2:08 PM To: Ramakrishnan Naveen (CR/RTC1.3-NA) Cc: Dhar Sauptik (CR/RTC1.3-NA) Subject: Re: FW: MLLIB (Spark) Question. Hey, In the LORWithLBFGS api you use, the intercept is regularized while other implementations don't regularize the intercept. That's why you see the difference. The intercept should not be regularized, so we fix this in new Spark ML api in spark 1.4. Since this will change the behavior in the old api if we decide to not regularize the intercept in old version, we are still debating about this. See the following code for full running example in spark 1.4 https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/ml/LogisticRegressionExample.scala And also check out my talk at spark summit. http://www.slideshare.net/dbtsai/2015-06-largescale-lasso-and-elasticnet-regularized-generalized-linear-models-at-spark-summit Sincerely, DB Tsai -- Blog: https://www.dbtsai.com PGP Key ID: 0xAF08DF8D On Mon, Jun 15, 2015 at 11:58 AM, Ramakrishnan Naveen (CR/RTC1.3-NA) Naveen.Ramakrishnan wrote: Hi DB, Hope you are doing well! One of my colleagues, Sauptik, is working with MLLib and the logistic regression based on LBFGS and is having trouble reproducing the same results when compared to Matlab. Please see below for details. I did take a look into this but seems like there’s also discrepancy between the logistic regression with SGD and LBFGS implementations in MLLib. We have attached all the codes for your analysis – it’s in PySpark though. Let us know if you have any questions or concerns. We would very much appreciate your help whenever you get a chance. Best, Naveen. 
_ From: Dhar Sauptik (CR/RTC1.3-NA) Sent: Thursday, June 11, 2015 6:03 PM To: Ramakrishnan Naveen (CR/RTC1.3-NA) Subject: MLLIB (Spark) Question. Hi Naveen, I am writing this owing to some MLLIB issues I found while using Logistic Regression. Basically, I am trying to test the stability of the L1/L2 – Logistic Regression using SGD and BFGS. Unfortunately I am unable to confirm the correctness of the algorithms. For comparison I implemented the L2-Logistic regression algorithm (using IRLS algorithm Pg. 121) From the book http://web.stanford.edu/~hastie/local.ftp/Springer/OLD/ESLII_print4.pdf . Unfortunately the solutions don’t match:- For example:- Using the Publicly available data (diabetes.csv) for L2 regularized Logistic Regression (with lamda = 0.1) we get, Solutions MATLAB CODE (IRLS):- w = 0.29429347080 0.550681766045083 0.0396336870148899 0.0641285712055971 0.101238592147879 0.261153541551578 0.178686710290069 b= -0.347396594061553 MLLIB (SGD):- (weights=[0.352873922589,0.420391294105,0.0100571908041,0.150724951988,0.238536959009,0.220329295188,0.269139932714], intercept=-0.0074992664631) MLLIB(LBFGS):- (weights=[0.787850211605,1.964589985,-0.209348425939,0.0278848173986,0.12729017522,1.58954647312,0.692671824394],
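For completeness, the new Spark ML API referred to above looks roughly like this in 1.4 (parameters are illustrative; see the linked LogisticRegressionExample.scala for the full pipeline version). Features are standardized internally during optimization with the weights transformed back to the original space, and the intercept is fit but not regularized:

  import org.apache.spark.ml.classification.LogisticRegression

  // df is a DataFrame with a "label" column and a "features" vector column
  val lor = new LogisticRegression()
    .setRegParam(0.1)           // lambda
    .setElasticNetParam(0.0)    // 0.0 = pure L2, 1.0 = pure L1
    .setFitIntercept(true)
    .setMaxIter(100)
  val model = lor.fit(df)
  println(s"weights: ${model.weights} intercept: ${model.intercept}")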
Re: What is the right algorithm to do cluster analysis with mixed numeric, categorical, and string value attributes?
Hi Sujit, That's a good point. But 1-hot encoding will make our data changing from Terabytes to Petabytes, because we have tens of categorical attributes, and some of them contain thousands of categorical values. Is there any way to make a good balance of data size and right representation of categories? -Rex On Tue, Jun 16, 2015 at 1:27 PM, Sujit Pal sujitatgt...@gmail.com wrote: Hi Rexx, In general (ie not Spark specific), its best to convert categorical data to 1-hot encoding rather than integers - that way the algorithm doesn't use the ordering implicit in the integer representation. -sujit On Tue, Jun 16, 2015 at 1:17 PM, Rex X dnsr...@gmail.com wrote: Is it necessary to convert categorical data into integers? Any tips would be greatly appreciated! -Rex On Sun, Jun 14, 2015 at 10:05 AM, Rex X dnsr...@gmail.com wrote: For clustering analysis, we need a way to measure distances. When the data contains different levels of measurement - *binary / categorical (nominal), counts (ordinal), and ratio (scale)* To be concrete, for example, working with attributes of *city, zip, satisfaction_level, price* In the meanwhile, the real data usually also contains string attributes, for example, book titles. The distance between two strings can be measured by minimum-edit-distance. In SPSS, it provides Two-Step Cluster, which can handle both ratio scale and ordinal numbers. What is right algorithm to do hierarchical clustering analysis with all these four-kind attributes above with *MLlib*? If we cannot find a right metric to measure the distance, an alternative solution is to do a topological data analysis (e.g. linkage, and etc). Can we do such kind of analysis with *GraphX*? -Rex
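One direction we are looking at is keeping the 1-hot features sparse, so only the non-zero entries are stored rather than one value per category - a sketch with MLlib vectors (sizes and indices are made up):

  import org.apache.spark.mllib.linalg.Vectors

  // A categorical attribute with 10,000 possible values, encoded 1-hot: a dense vector
  // would carry 10,000 doubles per record, the sparse form only the single active index.
  val city = Vectors.sparse(10000, Array(4237), Array(1.0))

  // Several categorical attributes can be packed into one sparse vector by giving each
  // attribute its own index range (offsets here are illustrative).
  val combined = Vectors.sparse(10000 + 500, Array(4237, 10000 + 42), Array(1.0, 1.0))

Whether that representation is enough, or the high-cardinality attributes need hashing/bucketing first, is still an open question for us.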
Re: What is the right algorithm to do cluster analysis with mixed numeric, categorical, and string value attributes?
Hi Rexx, In general (ie not Spark specific), its best to convert categorical data to 1-hot encoding rather than integers - that way the algorithm doesn't use the ordering implicit in the integer representation. -sujit On Tue, Jun 16, 2015 at 1:17 PM, Rex X dnsr...@gmail.com wrote: Is it necessary to convert categorical data into integers? Any tips would be greatly appreciated! -Rex On Sun, Jun 14, 2015 at 10:05 AM, Rex X dnsr...@gmail.com wrote: For clustering analysis, we need a way to measure distances. When the data contains different levels of measurement - *binary / categorical (nominal), counts (ordinal), and ratio (scale)* To be concrete, for example, working with attributes of *city, zip, satisfaction_level, price* In the meanwhile, the real data usually also contains string attributes, for example, book titles. The distance between two strings can be measured by minimum-edit-distance. In SPSS, it provides Two-Step Cluster, which can handle both ratio scale and ordinal numbers. What is right algorithm to do hierarchical clustering analysis with all these four-kind attributes above with *MLlib*? If we cannot find a right metric to measure the distance, an alternative solution is to do a topological data analysis (e.g. linkage, and etc). Can we do such kind of analysis with *GraphX*? -Rex
Re: HDFS not supported by databricks cloud :-(
You could consider using Zeppelin and spark on yarn as an alternative. http://zeppelin.incubator.apache.org/ Simon On 16 Jun 2015, at 17:58, Sanjay Subramanian sanjaysubraman...@yahoo.com.INVALID wrote: hey guys After day one at the spark-summit SFO, I realized sadly that (indeed) HDFS is not supported by Databricks cloud. My speed bottleneck is to transfer ~1TB of snapshot HDFS data (250+ external hive tables) to S3 :-( I want to use databricks cloud but this to me is a starting disabler. The hard road for me will be (as I believe EVERYTHING is possible. The impossible just takes longer) - transfer all HDFS to S3 - our org does not permit AWS server side encryption so I have figure out if AWS KMS encrypted S3 files can be read by Hive/Impala/Spark - modify all table locations in metadata to S3 - modify all scripts to point and write to S3 instead of Any ideas / thoughts will be helpful. Till I can get the above figured out , I am going ahead and working hard to make spark-sql as the main workhorse for creating dataset (now its Hive and Impala) thanks regards sanjay
Re: What is the right algorithm to do cluster analysis with mixed numeric, categorical, and string value attributes?
Is it necessary to convert categorical data into integers? Any tips would be greatly appreciated! -Rex On Sun, Jun 14, 2015 at 10:05 AM, Rex X dnsr...@gmail.com wrote: For clustering analysis, we need a way to measure distances. When the data contains different levels of measurement - *binary / categorical (nominal), counts (ordinal), and ratio (scale)* To be concrete, for example, working with attributes of *city, zip, satisfaction_level, price* In the meanwhile, the real data usually also contains string attributes, for example, book titles. The distance between two strings can be measured by minimum-edit-distance. In SPSS, it provides Two-Step Cluster, which can handle both ratio scale and ordinal numbers. What is right algorithm to do hierarchical clustering analysis with all these four-kind attributes above with *MLlib*? If we cannot find a right metric to measure the distance, an alternative solution is to do a topological data analysis (e.g. linkage, and etc). Can we do such kind of analysis with *GraphX*? -Rex
Pyspark Dense Matrix Multiply : One of them can fit in Memory
Hello, I would like to multiply two matrices, C = A * B, where A is m x k and B is k x l, with k and l much smaller than m, so that B can easily fit in memory. Any ideas or suggestions how to do that in Pyspark? Thanks Ayman -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Pyspark-Dense-Matrix-Multiply-One-of-them-can-fit-in-Memory-tp23344.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark on EMR
That's great news. Can I assume spark on EMR supports kinesis to hbase pipeline? On 17 Jun 2015 05:29, kamatsuoka ken...@gmail.com wrote: Spark is now officially supported on Amazon Elastic Map Reduce: http://aws.amazon.com/elasticmapreduce/details/spark/ -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-on-EMR-tp23343.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
What happens when a streaming consumer job is killed then restarted?
I'd like to understand better what happens when a streaming consumer job (with direct streaming, but also with receiver-based streaming) is killed/terminated/crashes. Assuming it was processing a batch of RDD data, what happens when the job is restarted? How much state is maintained within Spark's checkpointing to allow for little or no data loss? For the direct streaming case, would we need to update offsets in Zookeeper to achieve more fault tolerance? I'm looking at https://databricks.com/blog/2015/01/15/improved-driver-fault-tolerance-and-zero-data-loss-in-spark-streaming.html and it talks about the Write-Ahead Logs. Do they work with direct streaming? With write ahead logs in place, e.g. streaming from Kafka, where would a restarted consumer resume processing? E.g. it was processing Message# 25 out of 100 messages in the Kafka topic when it crashed or was terminated. Thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/What-happens-when-a-streaming-consumer-job-is-killed-then-restarted-tp23348.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
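For the direct-stream case, a rough sketch of checkpoint-based recovery (Spark 1.3+, spark-streaming-kafka artifact); the checkpoint directory, broker address and topic name are hypothetical. On restart, StreamingContext.getOrCreate rebuilds the context, including any unfinished batches and their offset ranges, from the checkpoint rather than calling the factory function again; the offsets of each batch are also exposed through HasOffsetRanges if you want to track them yourself in ZooKeeper or a database:

  import kafka.serializer.StringDecoder
  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}
  import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

  val checkpointDir = "hdfs:///checkpoints/my-consumer"              // hypothetical path

  def createContext(): StreamingContext = {
    val ssc = new StreamingContext(new SparkConf().setAppName("consumer"), Seconds(10))
    ssc.checkpoint(checkpointDir)
    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")  // hypothetical broker
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("mytopic"))
    stream.foreachRDD { rdd =>
      // Offset range of this batch; could be written to ZK/DB for external bookkeeping
      val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      offsets.foreach(o => println(s"${o.topic}/${o.partition}: ${o.fromOffset} -> ${o.untilOffset}"))
      // ... process rdd ...
    }
    ssc
  }

  val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
  ssc.start()
  ssc.awaitTermination()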
Custom Spark metrics?
I'm looking at the doc here: https://spark.apache.org/docs/latest/monitoring.html. Is there a way to define custom metrics in Spark, via Coda Hale perhaps, and emit those? Can a custom metrics sink be defined? And can such a sink collect some metrics, execute some metrics-handling logic, then invoke a callback to notify the Spark consumers that emitted the metrics once that logic has been executed? Thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Custom-Spark-metrics-tp23350.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
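For the sink side of the question, sinks are wired up through conf/metrics.properties (there is a metrics.properties.template in the distribution). For example, to report everything to the console every 10 seconds and expose the built-in JVM source:

  *.sink.console.class=org.apache.spark.metrics.sink.ConsoleSink
  *.sink.console.period=10
  *.sink.console.unit=seconds
  *.source.jvm.class=org.apache.spark.metrics.source.JvmSource

Custom Coda Hale sources and sinks follow the same class-name convention, though the Source/Sink traits appear to be internal to Spark, so the callback-style notification described above would have to be built on top of whatever the sink itself does; this sketch only shows the configuration mechanism.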
Suggestions for Posting on the User Mailing List
As discussed during the meetup, the following information should help while creating a topic on the User mailing list. 1) The versions of Spark and Hadoop should be included, to help reproduce the issue or understand whether the issue is a version limitation. 2) An explanation of the scenario in as much detail as possible: be specific about the purpose of the application, and include an explanation of the pipeline (if applicable). 3) Specific logs or stack traces for the issue that you are observing. A simple message with the error is good, but a stack trace helps enormously and adds a lot of context. 4) Any miscellaneous/additional information about the environment. This is a broad suggestion and can be anything from hardware and environment setup to other factors that could possibly be responsible, etc. Thank you. Regards, Neelesh. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Suggestions-for-Posting-on-the-User-Mailing-List-tp23347.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
What is Spark's data retention policy?
What is Spark's data retention policy? As in, for the jobs that are sent from the master to the worker nodes, how long do they persist on those nodes? What about the RDD data - how is that cleaned up? Are all RDDs cleaned up at GC time unless they've been .persist()'ed or .cache()'ed? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/What-is-Spark-s-data-retention-policy-tp23349.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
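For the RDD part of the question, a small illustration (paths hypothetical): cached blocks stay on the executors until unpersist() is called or the application ends, while metadata for unreferenced RDDs and shuffles is cleaned up asynchronously by Spark's ContextCleaner once the corresponding objects are garbage-collected on the driver.

  val lines  = sc.textFile("hdfs:///data/input")    // hypothetical path
  val counts = lines.map(_.length).cache()          // pin the computed blocks in executor memory
  counts.count()                                     // first action materialises and caches them
  // ... reuse counts across several jobs ...
  counts.unpersist()                                 // explicitly free the cached blocks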
Re: Spark SQL and Skewed Joins
This would be a great addition to Spark, and ideally it belongs in Spark core, not SQL. I agree that this would be a great addition, but we would likely want a specialized SQL implementation for performance reasons.
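For reference, the usual workaround today (not something Spark does for you) is to salt the hot keys so the skewed partition is split across many tasks. A rough sketch over pair RDDs, with big and small as hypothetical names for the two sides of the join:

  import scala.util.Random

  val SALT = 10
  // Spread each hot key of the large, skewed side across SALT sub-keys
  val saltedBig   = big.map   { case (k, v) => ((k, Random.nextInt(SALT)), v) }
  // Replicate every row of the smaller side once per salt value so the join still matches
  val saltedSmall = small.flatMap { case (k, v) => (0 until SALT).map(i => ((k, i), v)) }

  val joined = saltedBig.join(saltedSmall)
    .map { case ((k, _), pair) => (k, pair) }        // drop the salt afterwards

A built-in version would presumably detect the skewed keys and apply something like this automatically.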
Re: SparkR 1.4.0: read.df() function fails
Hello, Is the JSON file in HDFS or local? Is /home/esten/ami/usaf.json an HDFS path? Suggestions: 1) Specify file:/home/esten/ami/usaf.json 2) Or move the usaf.json file into HDFS, since the application is looking for the file in HDFS. Please let me know if that helps. Thank you. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SparkR-1-4-0-read-df-function-fails-tp2p23346.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: DataFrame insertIntoJDBC parallelism while writing data into a DB table
I would really appreciate it if someone could help me with this. On Monday, June 15, 2015, Mohammad Tariq donta...@gmail.com wrote: Hello list, The method *insertIntoJDBC(url: String, table: String, overwrite: Boolean)* provided by Spark DataFrame allows us to copy a DataFrame into a JDBC DB table. Similar functionality is provided by the *createJDBCTable(url: String, table: String, allowExisting: Boolean)* method. But if you look at the docs, it says that *createJDBCTable* runs a *bunch of Insert statements* in order to copy the data, while the docs for *insertIntoJDBC* don't have any such statement. Could someone please shed some light on this? How exactly does data get inserted when using the *insertIntoJDBC* method? And if it is the same as the *createJDBCTable* method, then what exactly does *bunch of Insert statements* mean? What is the criterion for deciding the number of *inserts per bunch*? How are these bunches generated? *An example* could be creating a DataFrame by reading all the files stored in a given directory. If I just do *DataFrame.save()*, it'll create the same number of output files as there are input files. What'll happen in the case of *DataFrame.df.insertIntoJDBC()*? I'm really sorry to be a pest with questions, but I could not get much help by Googling this. Thank you so much for your valuable time. Really appreciate it. Tariq, Mohammad about.me/mti http://about.me/mti
ClassNotFound exception from closure
Hi folks, running into a pretty strange issue -- I have a ClassNotFound exception from a closure?! My code looks like this:

  val jRdd1 = table.map(cassRow => {
    val lst = List(cassRow.get[Option[Any]](0), cassRow.get[Option[Any]](1))
    Row.fromSeq(lst)
  })
  println(s"This one worked ..." + jRdd1.first.toString())
  println("SILLY ---")
  val sillyRDD = sc.parallelize(1 to 100)
  val jRdd2 = sillyRDD.map(value => {
    val cols = (0 to 2).map(i => "foo").toList // 3 "foo"s per row
    println(s"Values " + cols.mkString("|"))
    Row.fromSeq(cols)
  })
  println(s"This one worked too " + jRdd2.first.toString())

and the exception I see goes: This one worked ...[Some(1234),Some(1434123162)] SILLY --- Exception in thread main java.lang.ClassNotFoundException: HardSparkJob$anonfun$3$anonfun$4 at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:270) at org.apache.spark.util.InnerClosureFinder$anon$4.visitMethodInsn(ClosureCleaner.scala:455) at com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader.accept(Unknown Source) at com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader.accept(Unknown Source) at org.apache.spark.util.ClosureCleaner$.getInnerClosureClasses(ClosureCleaner.scala:101) at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$clean(ClosureCleaner.scala:197) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132) at org.apache.spark.SparkContext.clean(SparkContext.scala:1891) at org.apache.spark.rdd.RDD$anonfun$map$1.apply(RDD.scala:294) at org.apache.spark.rdd.RDD$anonfun$map$1.apply(RDD.scala:293) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:109) at org.apache.spark.rdd.RDD.withScope(RDD.scala:286) at org.apache.spark.rdd.RDD.map(RDD.scala:293) at HardSparkJob$.testUnionViaRDD(SparkTest.scala:127) at HardSparkJob$.main(SparkTest.scala:104) at HardSparkJob.main(SparkTest.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$runMain(SparkSubmit.scala:664) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) I don't quite know what to make of this error. The stack trace shows a problem with my code at sillyRDD.map (SparkTest.scala:127). I'm running the Spark 1.4 CDH prebuilt package with bin/spark-submit --class HardSparkJob --master mesos://$MESOS_MASTER ../MyJar.jar Any insight much appreciated
Re: DataFrame insertIntoJDBC parallelism while writing data into a DB table
When all else fails look at the source ;) Looks like createJDBCTable is deprecated, but otherwise goes to the same implementation as insertIntoJDBC... https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala You can also look at DataFrameWriter in the same package...Looks like all that code will eventually write via JDBCWriteDetails in https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/jdbc/jdbc.scala...if I'm reading this correctly you'll have simultaneous writes from each partition but they don't appear to be otherwise batched (if you were thinking bulk inserts) On Mon, Jun 15, 2015 at 1:20 PM, Mohammad Tariq donta...@gmail.com wrote: Hello list, The method *insertIntoJDBC(url: String, table: String, overwrite: Boolean)* provided by Spark DataFrame allows us to copy a DataFrame into a JDBC DB table. Similar functionality is provided by the *createJDBCTable(url: String, table: String, allowExisting: Boolean) *method. But if you look at the docs it says that *createJDBCTable *runs a *bunch of Insert statements* in order to copy the data. While the docs of *insertIntoJDBC *doesn't have any such statement. Could someone please shed some light on this? How exactly data gets inserted using *insertIntoJDBC *method? And if it is same as *createJDBCTable *method, then what exactly does *bunch of Insert statements* mean? What's the criteria to decide the number *inserts/bunch*? How are these bunches generated? *An example* could be creating a DataFrame by reading all the files stored in a given directory. If I just do *DataFrame.save()*, it'll create the same number of output files as the input files. What'll happen in case of *DataFrame.df.insertIntoJDBC()*? I'm really sorry to be pest of questions, but I could net get much help by Googling about this. Thank you so much for your valuable time. really appreciate it. [image: http://] Tariq, Mohammad about.me/mti [image: http://] http://about.me/mti
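In other words, the write parallelism is simply the number of partitions of the DataFrame, with each partition opening its own connection and issuing its own inserts. A small sketch (connection string and table name hypothetical) of controlling that by repartitioning before the write:

  val url = "jdbc:mysql://dbhost:3306/mydb?user=me&password=secret"   // hypothetical

  // 8 concurrent writers, one per partition
  df.repartition(8).insertIntoJDBC(url, "target_table", false)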
Spark or Storm
Hi All, I am evaluating Spark vs. Storm (Spark Streaming) and I am not able to see what the equivalent of Storm's Bolt is inside Spark. Any help on this will be appreciated. Thanks, Ashish - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Submitting Spark Applications using Spark Submit
If you run Spark on YARN, the simplest way is replace the $SPARK_HOME/lib/spark-.jar with your own version spark jar file and run your application. The spark-submit script will upload this jar to YARN cluster automatically and then you can run your application as usual. It does not care about which version of Spark in your YARN cluster. 2015-06-17 10:42 GMT+08:00 Raghav Shankar raghav0110...@gmail.com: The documentation says spark.driver.userClassPathFirst can only be used in cluster mode. Does this mean I have to set the --deploy-mode option for spark-submit to cluster? Or can I still use the default client? My understanding is that even in the default deploy mode, spark still uses the slave machines I have on ec2. Also, the spark.driver.extraLibraryPath property mentions that I can provide a path for special libraries on the spark-submit command line options. Do my jar files in this path have to be the same name as the jar used by spark, or is it intelligent enough to identify that two jars are supposed to be the same thing? If they are supposed to be the same name, how can I find out the name I should use for my jar? Eg: If I just name my modified spark-core jar as spark.jar and put in a lib folder and provide the path of the folder to spark-submit would that be enough to tell Spark to use that spark-core jar instead of the default? Thanks, Raghav On Jun 16, 2015, at 7:19 PM, Will Briggs wrbri...@gmail.com wrote: If this is research-only, and you don't want to have to worry about updating the jars installed by default on the cluster, you can add your custom Spark jar using the spark.driver.extraLibraryPath configuration property when running spark-submit, and then use the experimental spark.driver.userClassPathFirst config to force it to use yours. See here for more details and options: https://spark.apache.org/docs/1.4.0/configuration.html On June 16, 2015, at 10:12 PM, Raghav Shankar raghav0110...@gmail.com wrote: I made the change so that I could implement top() using treeReduce(). A member on here suggested I make the change in RDD.scala to accomplish that. Also, this is for a research project, and not for commercial use. So, any advice on how I can get the spark submit to use my custom built jars would be very useful. Thanks, Raghav On Jun 16, 2015, at 6:57 PM, Will Briggs wrbri...@gmail.com wrote: In general, you should avoid making direct changes to the Spark source code. If you are using Scala, you can seamlessly blend your own methods on top of the base RDDs using implicit conversions. Regards, Will On June 16, 2015, at 7:53 PM, raggy raghav0110...@gmail.com wrote: I am trying to submit a spark application using the command line. I used the spark submit command for doing so. I initially setup my Spark application on Eclipse and have been making changes on there. I recently obtained my own version of the Spark source code and added a new method to RDD.scala. I created a new spark core jar using mvn, and I added it to my eclipse build path. My application ran perfectly fine. Now, I would like to submit it through the command line. I submitted my application like this: bin/spark-submit --master local[2] --class SimpleApp /Users/XXX/Desktop/spark2.jar The spark-submit command is within the spark project that I modified by adding new methods. 
When I do so, I get this error: java.lang.NoSuchMethodError: org.apache.spark.rdd.RDD.treeTop(ILscala/math/Ordering;)Ljava/lang/Object; at SimpleApp$.main(SimpleApp.scala:12) at SimpleApp.main(SimpleApp.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) at java.lang.reflect.Method.invoke(Method.java:597) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) When I use spark submit, where does the jar come from? How do I make sure it uses the jars that have built? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Submitting-Spark-Applications-using-Spark-Submit-tp23352.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark or Storm
I have a similar scenario where we need to bring data from Kinesis to HBase. Data velocity is 20k per 10 mins. Little manipulation of the data will be required, but that's regardless of the tool, so we will be writing that piece as Java POJOs. All env is on AWS. HBase is on a long-running EMR and Kinesis on a separate cluster. TIA. Best Ayan On 17 Jun 2015 12:13, Will Briggs wrbri...@gmail.com wrote: The programming models for the two frameworks are conceptually rather different; I haven't worked with Storm for quite some time, but based on my old experience with it, I would equate Spark Streaming more with Storm's Trident API, rather than with the raw Bolt API. Even then, there are significant differences, but it's a bit closer. If you can share your use case, we might be able to provide better guidance. Regards, Will On June 16, 2015, at 9:46 PM, asoni.le...@gmail.com wrote: Hi All, I am evaluating spark VS storm ( spark streaming ) and i am not able to see what is equivalent of Bolt in storm inside spark. Any help will be appreciated on this ? Thanks , Ashish - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Incorrect ACL checking for partitioned table in Spark SQL-1.4
*Problem Statement:* While querying a partitioned table using Spark SQL (version 1.4.0), an access-denied exception is observed on a partition the user doesn't belong to (user permissions are controlled using HDFS ACLs). The same works correctly in Hive. *Use case:* /To address multitenancy/ Consider a table containing multiple customers, each customer with multiple facilities. The table is partitioned by customer and facility. A user belonging to one facility will not have access to other facilities. This is enforced using HDFS ACLs on the corresponding directories. When querying the table as ‘user1’ belonging to ‘facility1’ and ‘customer1’ on a particular partition (using a ‘where’ clause), only access to the corresponding directory should be verified, not the entire table. The above use case works as expected when using the Hive client, versions 0.13.1 / 1.1.0. *The query used:* /select count(*) from customertable where customer=‘customer1’ and facility=‘facility1’/ *Below is the exception received in spark-shell:* org.apache.hadoop.security.AccessControlException: Permission denied: user=user1, access=READ_EXECUTE, inode=/data/customertable/customer=customer2/facility=facility2:root:supergroup:drwxrwx---:group::r-x,group:facility2:rwx at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkAccessAcl(FSPermissionChecker.java:351) at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:253) at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:185) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6512) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6494) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPathAccess(FSNamesystem.java:6419) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getListingInt(FSNamesystem.java:4954) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getListing(FSNamesystem.java:4915) at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getListing(NameNodeRpcServer.java:826) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getListing(ClientNamenodeProtocolServerSideTranslatorPB.java:612) at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java) at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:619) at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:962) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2039) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2035) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:415) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628) at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2033) at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:526) at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106) at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73) at
org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:1971) at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:1952) at org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:693) at org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:105) at org.apache.hadoop.hdfs.DistributedFileSystem$15.doCall(DistributedFileSystem.java:755) at org.apache.hadoop.hdfs.DistributedFileSystem$15.doCall(DistributedFileSystem.java:751) at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) at org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:751) at org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.org$apache$spark$sql$sources$HadoopFsRelation$FileStatusCache$$listLeafFilesAndDirs$1(interfaces.scala:390) at org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$2$$anonfun$apply$2.apply(interfaces.scala:402) at org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$2$$anonfun$apply$2.apply(interfaces.scala:402) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
Re: Spark or Storm
I have a use-case where a stream of Incoming events have to be aggregated and joined to create Complex events. The aggregation will have to happen at an interval of 1 minute (or less). The pipeline is: Upstream services --(send events)--> KAFKA --> Event Stream Processor --(enrich event)--> Complex Event Processor --> Elastic Search. From what I understand, Storm will make a very good ESP and Spark Streaming will make a good CEP. But, we are also evaluating Storm with Trident. How does Spark Streaming compare with Storm with Trident? Sridhar Chellappa On Wednesday, 17 June 2015 10:02 AM, ayan guha guha.a...@gmail.com wrote: I have a similar scenario where we need to bring data from kinesis to hbase. Data volecity is 20k per 10 mins. Little manipulation of data will be required but that's regardless of the tool so we will be writing that piece in Java pojo. All env is on aws. Hbase is on a long running EMR and kinesis on a separate cluster. TIA. Best Ayan On 17 Jun 2015 12:13, Will Briggs wrbri...@gmail.com wrote: The programming models for the two frameworks are conceptually rather different; I haven't worked with Storm for quite some time, but based on my old experience with it, I would equate Spark Streaming more with Storm's Trident API, rather than with the raw Bolt API. Even then, there are significant differences, but it's a bit closer. If you can share your use case, we might be able to provide better guidance. Regards, Will On June 16, 2015, at 9:46 PM, asoni.le...@gmail.com wrote: Hi All, I am evaluating spark VS storm ( spark streaming ) and i am not able to see what is equivalent of Bolt in storm inside spark. Any help will be appreciated on this ? Thanks , Ashish - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Submitting Spark Applications using Spark Submit
To clarify, I am using the spark standalone cluster. On Tuesday, June 16, 2015, Yanbo Liang yblia...@gmail.com wrote: If you run Spark on YARN, the simplest way is replace the $SPARK_HOME/lib/spark-.jar with your own version spark jar file and run your application. The spark-submit script will upload this jar to YARN cluster automatically and then you can run your application as usual. It does not care about which version of Spark in your YARN cluster. 2015-06-17 10:42 GMT+08:00 Raghav Shankar raghav0110...@gmail.com: The documentation says spark.driver.userClassPathFirst can only be used in cluster mode. Does this mean I have to set the --deploy-mode option for spark-submit to cluster? Or can I still use the default client? My understanding is that even in the default deploy mode, spark still uses the slave machines I have on ec2. Also, the spark.driver.extraLibraryPath property mentions that I can provide a path for special libraries on the spark-submit command line options. Do my jar files in this path have to be the same name as the jar used by spark, or is it intelligent enough to identify that two jars are supposed to be the same thing? If they are supposed to be the same name, how can I find out the name I should use for my jar? Eg: If I just name my modified spark-core jar as spark.jar and put in a lib folder and provide the path of the folder to spark-submit would that be enough to tell Spark to use that spark-core jar instead of the default? Thanks, Raghav On Jun 16, 2015, at 7:19 PM, Will Briggs wrbri...@gmail.com wrote: If this is research-only, and you don't want to have to worry about updating the jars installed by default on the cluster, you can add your custom Spark jar using the spark.driver.extraLibraryPath configuration property when running spark-submit, and then use the experimental spark.driver.userClassPathFirst config to force it to use yours. See here for more details and options: https://spark.apache.org/docs/1.4.0/configuration.html On June 16, 2015, at 10:12 PM, Raghav Shankar raghav0110...@gmail.com wrote: I made the change so that I could implement top() using treeReduce(). A member on here suggested I make the change in RDD.scala to accomplish that. Also, this is for a research project, and not for commercial use. So, any advice on how I can get the spark submit to use my custom built jars would be very useful. Thanks, Raghav On Jun 16, 2015, at 6:57 PM, Will Briggs wrbri...@gmail.com wrote: In general, you should avoid making direct changes to the Spark source code. If you are using Scala, you can seamlessly blend your own methods on top of the base RDDs using implicit conversions. Regards, Will On June 16, 2015, at 7:53 PM, raggy raghav0110...@gmail.com wrote: I am trying to submit a spark application using the command line. I used the spark submit command for doing so. I initially setup my Spark application on Eclipse and have been making changes on there. I recently obtained my own version of the Spark source code and added a new method to RDD.scala. I created a new spark core jar using mvn, and I added it to my eclipse build path. My application ran perfectly fine. Now, I would like to submit it through the command line.
I submitted my application like this: bin/spark-submit --master local[2] --class SimpleApp /Users/XXX/Desktop/spark2.jar The spark-submit command is within the spark project that I modified by adding new methods. When I do so, I get this error: java.lang.NoSuchMethodError: org.apache.spark.rdd.RDD.treeTop(ILscala/math/Ordering;)Ljava/lang/Object; at SimpleApp$.main(SimpleApp.scala:12) at SimpleApp.main(SimpleApp.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) at java.lang.reflect.Method.invoke(Method.java:597) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) When I use spark submit, where does the jar come from? How do I make sure it uses the jars that have built? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Submitting-Spark-Applications-using-Spark-Submit-tp23352.html Sent from the Apache Spark User List
Re: FW: MLLIB (Spark) Question.
Hi Dhar, For standardization, we can disable it effectively by using different regularization on each component. Thus, we're solving the same problem but having better rate of convergence. This is one of the features I will implement. Sincerely, DB Tsai -- Blog: https://www.dbtsai.com PGP Key ID: 0xAF08DF8D On Tue, Jun 16, 2015 at 8:34 PM, Dhar Sauptik (CR/RTC1.3-NA) sauptik.d...@us.bosch.com wrote: Hi DB, Thank you for the reply. The answers makes sense. I do have just one more point to add. Note that it may be better to not implicitly standardize the data. Agreed that a number of algorithms benefit from such standardization, but for many applications with contextual information such standardization may not be desirable. Users can always perform the standardization themselves. However, that's just a suggestion. Again, thank you for the clarification. Thanks, Sauptik. -Original Message- From: DB Tsai [mailto:dbt...@dbtsai.com] Sent: Tuesday, June 16, 2015 2:49 PM To: Dhar Sauptik (CR/RTC1.3-NA); Ramakrishnan Naveen (CR/RTC1.3-NA) Cc: user@spark.apache.org Subject: Re: FW: MLLIB (Spark) Question. +cc user@spark.apache.org Reply inline. On Tue, Jun 16, 2015 at 2:31 PM, Dhar Sauptik (CR/RTC1.3-NA) Sauptik.Dhar wrote: Hi DB, Thank you for the reply. That explains a lot. I however had a few points regarding this:- 1. Just to help with the debate of not regularizing the b parameter. A standard implementation argues against regularizing the b parameter. See Pg 64 para 1 : http://statweb.stanford.edu/~tibs/ElemStatLearn/ Agreed. We just worry about it will change behavior, but we actually have a PR to change the behavior to standard one, https://github.com/apache/spark/pull/6386 2. Further, is the regularization of b also applicable for the SGD implementation. Currently the SGD vs. BFGS implementations give different results (and both the implementations don't match the IRLS algorithm). Are the SGD/BFGS implemented for different loss functions? Can you please share your thoughts on this. In SGD implementation, we don't standardize the dataset before training. As a result, those columns with low standard deviation will be penalized more, and those with high standard deviation will be penalized less. Also, standardize will help the rate of convergence. As a result, in most of package, they standardize the data implicitly, and get the weights in the standardized space, and transform back to original space so it's transparent for users. 1) LORWithSGD: No standardization, and penalize the intercept. 2) LORWithLBFGS: With standardization but penalize the intercept. 3) New LOR implementation: With standardization without penalizing the intercept. As a result, only the new implementation in Spark ML handles everything correctly. We have tests to verify that the results match R. @Naveen: Please feel free to add/comment on the above points as you see necessary. Thanks, Sauptik. -Original Message- From: DB Tsai Sent: Tuesday, June 16, 2015 2:08 PM To: Ramakrishnan Naveen (CR/RTC1.3-NA) Cc: Dhar Sauptik (CR/RTC1.3-NA) Subject: Re: FW: MLLIB (Spark) Question. Hey, In the LORWithLBFGS api you use, the intercept is regularized while other implementations don't regularize the intercept. That's why you see the difference. The intercept should not be regularized, so we fix this in new Spark ML api in spark 1.4. Since this will change the behavior in the old api if we decide to not regularize the intercept in old version, we are still debating about this. 
See the following code for full running example in spark 1.4 https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/ml/LogisticRegressionExample.scala And also check out my talk at spark summit. http://www.slideshare.net/dbtsai/2015-06-largescale-lasso-and-elasticnet-regularized-generalized-linear-models-at-spark-summit Sincerely, DB Tsai -- Blog: https://www.dbtsai.com PGP Key ID: 0xAF08DF8D On Mon, Jun 15, 2015 at 11:58 AM, Ramakrishnan Naveen (CR/RTC1.3-NA) Naveen.Ramakrishnan wrote: Hi DB, Hope you are doing well! One of my colleagues, Sauptik, is working with MLLib and the logistic regression based on LBFGS and is having trouble reproducing the same results when compared to Matlab. Please see below for details. I did take a look into this but seems like there’s also discrepancy between the logistic regression with SGD and LBFGS implementations in MLLib. We have attached all the codes for your analysis – it’s in PySpark though. Let us know if you have any questions or concerns. We would very much appreciate your help whenever you get a chance. Best, Naveen. _ From: Dhar Sauptik (CR/RTC1.3-NA) Sent: Thursday, June 11, 2015
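For reference, a minimal sketch of the Spark 1.4 spark.ml API DB mentions, which standardizes internally and does not penalize the intercept; the training DataFrame with "label"/"features" columns is assumed:

  import org.apache.spark.ml.classification.LogisticRegression

  val lr = new LogisticRegression()
    .setMaxIter(100)
    .setRegParam(0.1)          // overall regularization strength
    .setElasticNetParam(0.0)   // 0.0 = L2 (ridge), 1.0 = L1 (lasso)
    .setFitIntercept(true)

  val model = lr.fit(training)
  println(s"weights: ${model.weights}  intercept: ${model.intercept}")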
Re: Spark or Storm
Probably overloading the question a bit. In Storm, Bolts have the functionality of getting triggered on events. Is that kind of functionality possible with Spark Streaming? During each phase of the data processing, the transformed data is stored to the database, and this transformed data should then be sent to a new pipeline for further processing. How can this be achieved using Spark? On Wed, Jun 17, 2015 at 10:10 AM, Spark Enthusiast sparkenthusi...@yahoo.in wrote: I have a use-case where a stream of Incoming events have to be aggregated and joined to create Complex events. The aggregation will have to happen at an interval of 1 minute (or less). The pipeline is: Upstream services --(send events)--> KAFKA --> Event Stream Processor --(enrich event)--> Complex Event Processor --> Elastic Search. From what I understand, Storm will make a very good ESP and Spark Streaming will make a good CEP. But, we are also evaluating Storm with Trident. How does Spark Streaming compare with Storm with Trident? Sridhar Chellappa On Wednesday, 17 June 2015 10:02 AM, ayan guha guha.a...@gmail.com wrote: I have a similar scenario where we need to bring data from kinesis to hbase. Data volecity is 20k per 10 mins. Little manipulation of data will be required but that's regardless of the tool so we will be writing that piece in Java pojo. All env is on aws. Hbase is on a long running EMR and kinesis on a separate cluster. TIA. Best Ayan On 17 Jun 2015 12:13, Will Briggs wrbri...@gmail.com wrote: The programming models for the two frameworks are conceptually rather different; I haven't worked with Storm for quite some time, but based on my old experience with it, I would equate Spark Streaming more with Storm's Trident API, rather than with the raw Bolt API. Even then, there are significant differences, but it's a bit closer. If you can share your use case, we might be able to provide better guidance. Regards, Will On June 16, 2015, at 9:46 PM, asoni.le...@gmail.com wrote: Hi All, I am evaluating spark VS storm ( spark streaming ) and i am not able to see what is equivalent of Bolt in storm inside spark. Any help will be appreciated on this ? Thanks , Ashish - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
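There is no direct Bolt equivalent, but the usual pattern for "store, then hand off to the next stage" in Spark Streaming is foreachRDD plus foreachPartition on each transformed DStream. A rough sketch, where transformed is a hypothetical DStream and createDbConnection / createProducer are hypothetical helpers for your own store and downstream transport:

  transformed.foreachRDD { rdd =>
    rdd.foreachPartition { records =>
      val db = createDbConnection()       // one connection per partition, not per record
      val producer = createProducer()     // e.g. a Kafka producer feeding the next pipeline
      records.foreach { rec =>
        db.save(rec)                      // persist the transformed record
        producer.send(rec)                // push it to the downstream pipeline
      }
      producer.close()
      db.close()
    }
  }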
Re: number of partitions in join: Spark documentation misleading!
Please file a JIRA for it. On Mon, Jun 15, 2015 at 8:00 AM, mrm ma...@skimlinks.com wrote: Hi all, I was looking for an explanation on the number of partitions for a joined rdd. The documentation of Spark 1.3.1. says that: For distributed shuffle operations like reduceByKey and join, the largest number of partitions in a parent RDD. https://spark.apache.org/docs/latest/configuration.html And the Partitioner.scala comments (line 51) state that: Unless spark.default.parallelism is set, the number of partitions will be the same as the number of partitions in the largest upstream RDD, as this should be least likely to cause out-of-memory errors. But this is misleading for the Python API where if you do rddA.join(rddB), the output number of partitions is the number of partitions of A plus the number of partitions of B! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/number-of-partitions-in-join-Spark-documentation-misleading-tp23316.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
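For what it's worth, the ambiguity can be sidestepped by passing the partition count explicitly, which both the Scala and Python join APIs accept:

  // rddA: 100 partitions, rddB: 20 partitions
  val joined = rddA.join(rddB, 100)   // the joined RDD then has exactly 100 partitions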
Re: Submitting Spark Applications using Spark Submit
In general, you should avoid making direct changes to the Spark source code. If you are using Scala, you can seamlessly blend your own methods on top of the base RDDs using implicit conversions. Regards, Will On June 16, 2015, at 7:53 PM, raggy raghav0110...@gmail.com wrote: I am trying to submit a spark application using the command line. I used the spark submit command for doing so. I initially setup my Spark application on Eclipse and have been making changes on there. I recently obtained my own version of the Spark source code and added a new method to RDD.scala. I created a new spark core jar using mvn, and I added it to my eclipse build path. My application ran perfectly fine. Now, I would like to submit it through the command line. I submitted my application like this: bin/spark-submit --master local[2] --class SimpleApp /Users/XXX/Desktop/spark2.jar The spark-submit command is within the spark project that I modified by adding new methods. When I do so, I get this error: java.lang.NoSuchMethodError: org.apache.spark.rdd.RDD.treeTop(ILscala/math/Ordering;)Ljava/lang/Object; at SimpleApp$.main(SimpleApp.scala:12) at SimpleApp.main(SimpleApp.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) at java.lang.reflect.Method.invoke(Method.java:597) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) When I use spark submit, where does the jar come from? How do I make sure it uses the jars that have built? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Submitting-Spark-Applications-using-Spark-Submit-tp23352.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark or Storm
The programming models for the two frameworks are conceptually rather different; I haven't worked with Storm for quite some time, but based on my old experience with it, I would equate Spark Streaming more with Storm's Trident API, rather than with the raw Bolt API. Even then, there are significant differences, but it's a bit closer. If you can share your use case, we might be able to provide better guidance. Regards, Will On June 16, 2015, at 9:46 PM, asoni.le...@gmail.com wrote: Hi All, I am evaluating spark VS storm ( spark streaming ) and i am not able to see what is equivalent of Bolt in storm inside spark. Any help will be appreciated on this ? Thanks , Ashish - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Submitting Spark Applications using Spark Submit
I made the change so that I could implement top() using treeReduce(). A member on here suggested I make the change in RDD.scala to accomplish that. Also, this is for a research project, and not for commercial use. So, any advice on how I can get the spark submit to use my custom built jars would be very useful. Thanks, Raghav On Jun 16, 2015, at 6:57 PM, Will Briggs wrbri...@gmail.com wrote: In general, you should avoid making direct changes to the Spark source code. If you are using Scala, you can seamlessly blend your own methods on top of the base RDDs using implicit conversions. Regards, Will On June 16, 2015, at 7:53 PM, raggy raghav0110...@gmail.com wrote: I am trying to submit a spark application using the command line. I used the spark submit command for doing so. I initially setup my Spark application on Eclipse and have been making changes on there. I recently obtained my own version of the Spark source code and added a new method to RDD.scala. I created a new spark core jar using mvn, and I added it to my eclipse build path. My application ran perfectly fine. Now, I would like to submit it through the command line. I submitted my application like this: bin/spark-submit --master local[2] --class SimpleApp /Users/XXX/Desktop/spark2.jar The spark-submit command is within the spark project that I modified by adding new methods. When I do so, I get this error: java.lang.NoSuchMethodError: org.apache.spark.rdd.RDD.treeTop(ILscala/math/Ordering;)Ljava/lang/Object; at SimpleApp$.main(SimpleApp.scala:12) at SimpleApp.main(SimpleApp.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) at java.lang.reflect.Method.invoke(Method.java:597) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) When I use spark submit, where does the jar come from? How do I make sure it uses the jars that have built? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Submitting-Spark-Applications-using-Spark-Submit-tp23352.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
questions on the waiting batches and scheduling delay in Streaming UI
Hi, I have a Spark Streaming program that has been running for ~25 hrs. When I check the Streaming UI tab, I see that Waiting batches is 144, but the scheduling delay is 0. I am a bit confused. If the waiting batches count is 144, that means many batches are waiting in the queue to be processed? If that is the case, the scheduling delay should be high rather than 0. Am I missing anything? Thanks, Mike
Re: Submitting Spark Applications using Spark Submit
If this is research-only, and you don't want to have to worry about updating the jars installed by default on the cluster, you can add your custom Spark jar using the spark.driver.extraLibraryPath configuration property when running spark-submit, and then use the experimental spark.driver.userClassPathFirst config to force it to use yours. See here for more details and options: https://spark.apache.org/docs/1.4.0/configuration.html On June 16, 2015, at 10:12 PM, Raghav Shankar raghav0110...@gmail.com wrote: I made the change so that I could implement top() using treeReduce(). A member on here suggested I make the change in RDD.scala to accomplish that. Also, this is for a research project, and not for commercial use. So, any advice on how I can get the spark submit to use my custom built jars would be very useful. Thanks, Raghav On Jun 16, 2015, at 6:57 PM, Will Briggs wrbri...@gmail.com wrote: In general, you should avoid making direct changes to the Spark source code. If you are using Scala, you can seamlessly blend your own methods on top of the base RDDs using implicit conversions. Regards, Will On June 16, 2015, at 7:53 PM, raggy raghav0110...@gmail.com wrote: I am trying to submit a spark application using the command line. I used the spark submit command for doing so. I initially setup my Spark application on Eclipse and have been making changes on there. I recently obtained my own version of the Spark source code and added a new method to RDD.scala. I created a new spark core jar using mvn, and I added it to my eclipse build path. My application ran perfectly fine. Now, I would like to submit it through the command line. I submitted my application like this: bin/spark-submit --master local[2] --class SimpleApp /Users/XXX/Desktop/spark2.jar The spark-submit command is within the spark project that I modified by adding new methods. When I do so, I get this error: java.lang.NoSuchMethodError: org.apache.spark.rdd.RDD.treeTop(ILscala/math/Ordering;)Ljava/lang/Object; at SimpleApp$.main(SimpleApp.scala:12) at SimpleApp.main(SimpleApp.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) at java.lang.reflect.Method.invoke(Method.java:597) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) When I use spark submit, where does the jar come from? How do I make sure it uses the jars that have built? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Submitting-Spark-Applications-using-Spark-Submit-tp23352.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Not getting event logs = spark 1.3.1
Forgot to mention this is in standalone mode. Is my configuration wrong? Thanks, Liming On 15 Jun, 2015, at 11:26 pm, Tsai Li Ming mailingl...@ltsai.com wrote: Hi, I have this in my spark-defaults.conf (same for hdfs): spark.eventLog.enabled true spark.eventLog.dir file:/tmp/spark-events spark.history.fs.logDirectory file:/tmp/spark-events While the app is running, there is a “.inprogress” directory. However, when the job completes, the directory is always empty. I’m submitting the job like this, using either the Pi or word count examples: $ bin/spark-submit /opt/spark-1.4.0-bin-hadoop2.6/examples/src/main/python/wordcount.py This used to work in 1.2.1, and I didn’t test 1.3.0. Regards, Liming - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Submitting Spark Applications using Spark Submit
The documentation says spark.driver.userClassPathFirst can only be used in cluster mode. Does this mean I have to set the --deploy-mode option for spark-submit to cluster? Or can I still use the default client? My understanding is that even in the default deploy mode, spark still uses the slave machines I have on ec2. Also, the spark.driver.extraLibraryPath property mentions that I can provide a path for special libraries on the spark-submit command line options. Do my jar files in this path have to be the same name as the jar used by spark, or is it intelligent enough to identify that two jars are supposed to be the same thing? If they are supposed to be the same name, how can I find out the name I should use for my jar? Eg: If I just name my modified spark-core jar as spark.jar and put in a lib folder and provide the path of the folder to spark-submit would that be enough to tell Spark to use that spark-core jar instead of the default? Thanks, Raghav On Jun 16, 2015, at 7:19 PM, Will Briggs wrbri...@gmail.com wrote: If this is research-only, and you don't want to have to worry about updating the jars installed by default on the cluster, you can add your custom Spark jar using the spark.driver.extraLibraryPath configuration property when running spark-submit, and then use the experimental spark.driver.userClassPathFirst config to force it to use yours. See here for more details and options: https://spark.apache.org/docs/1.4.0/configuration.html On June 16, 2015, at 10:12 PM, Raghav Shankar raghav0110...@gmail.com wrote: I made the change so that I could implement top() using treeReduce(). A member on here suggested I make the change in RDD.scala to accomplish that. Also, this is for a research project, and not for commercial use. So, any advice on how I can get the spark submit to use my custom built jars would be very useful. Thanks, Raghav On Jun 16, 2015, at 6:57 PM, Will Briggs wrbri...@gmail.com wrote: In general, you should avoid making direct changes to the Spark source code. If you are using Scala, you can seamlessly blend your own methods on top of the base RDDs using implicit conversions. Regards, Will On June 16, 2015, at 7:53 PM, raggy raghav0110...@gmail.com wrote: I am trying to submit a spark application using the command line. I used the spark submit command for doing so. I initially setup my Spark application on Eclipse and have been making changes on there. I recently obtained my own version of the Spark source code and added a new method to RDD.scala. I created a new spark core jar using mvn, and I added it to my eclipse build path. My application ran perfectly fine. Now, I would like to submit it through the command line. I submitted my application like this: bin/spark-submit --master local[2] --class SimpleApp /Users/XXX/Desktop/spark2.jar The spark-submit command is within the spark project that I modified by adding new methods. 
When I do so, I get this error: java.lang.NoSuchMethodError: org.apache.spark.rdd.RDD.treeTop(ILscala/math/Ordering;)Ljava/lang/Object; at SimpleApp$.main(SimpleApp.scala:12) at SimpleApp.main(SimpleApp.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) at java.lang.reflect.Method.invoke(Method.java:597) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) When I use spark submit, where does the jar come from? How do I make sure it uses the jars that have built? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Submitting-Spark-Applications-using-Spark-Submit-tp23352.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
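A hedged sketch of the submit invocation being discussed (all paths hypothetical). Note that jar files belong on spark.driver/executor.extraClassPath rather than extraLibraryPath, which is for native libraries, and the userClassPathFirst setting exists for both the driver and the executors:

  bin/spark-submit \
    --master spark://master:7077 \
    --class SimpleApp \
    --conf spark.driver.extraClassPath=/path/to/custom-spark-core.jar \
    --conf spark.executor.extraClassPath=/path/to/custom-spark-core.jar \
    --conf spark.driver.userClassPathFirst=true \
    --conf spark.executor.userClassPathFirst=true \
    /Users/XXX/Desktop/spark2.jar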
Submitting Spark Applications using Spark Submit
I am trying to submit a spark application using the command line. I used the spark submit command for doing so. I initially setup my Spark application on Eclipse and have been making changes on there. I recently obtained my own version of the Spark source code and added a new method to RDD.scala. I created a new spark core jar using mvn, and I added it to my eclipse build path. My application ran perfectly fine. Now, I would like to submit it through the command line. I submitted my application like this: bin/spark-submit --master local[2] --class SimpleApp /Users/XXX/Desktop/spark2.jar The spark-submit command is within the spark project that I modified by adding new methods. When I do so, I get this error: java.lang.NoSuchMethodError: org.apache.spark.rdd.RDD.treeTop(ILscala/math/Ordering;)Ljava/lang/Object; at SimpleApp$.main(SimpleApp.scala:12) at SimpleApp.main(SimpleApp.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) at java.lang.reflect.Method.invoke(Method.java:597) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) When I use spark submit, where does the jar come from? How do I make sure it uses the jars that have built? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Submitting-Spark-Applications-using-Spark-Submit-tp23352.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: cassandra with jdbcRDD
I would suggest looking at https://github.com/datastax/spark-cassandra-connector On Tue, Jun 16, 2015 at 4:01 AM, Hafiz Mujadid hafizmujadi...@gmail.com wrote: hi all! is there a way to connect cassandra with jdbcRDD ? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/cassandra-with-jdbcRDD-tp23335.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
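A minimal sketch with the connector (contact point, keyspace and table names hypothetical); the returned RDD contains CassandraRow objects rather than going through JDBC at all:

  import com.datastax.spark.connector._
  import org.apache.spark.{SparkConf, SparkContext}

  val conf = new SparkConf()
    .setAppName("CassandraRead")
    .set("spark.cassandra.connection.host", "10.0.0.1")    // hypothetical contact point

  val sc  = new SparkContext(conf)
  val rdd = sc.cassandraTable("my_keyspace", "my_table")   // RDD[CassandraRow]
  rdd.take(5).foreach(println)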
Re: How to use DataFrame with MySQL
I just ran into this too. Thanks for the tip! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-DataFrame-with-MySQL-tp22178p23351.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
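For anyone landing here later, a minimal sketch of reading a MySQL table into a DataFrame with the Spark 1.4 read API (connection details hypothetical); the MySQL JDBC driver jar has to be available on the driver and executor classpaths, e.g. via --jars:

  val df = sqlContext.read
    .format("jdbc")
    .option("url", "jdbc:mysql://dbhost:3306/mydb?user=me&password=secret")  // hypothetical
    .option("dbtable", "mytable")
    .option("driver", "com.mysql.jdbc.Driver")
    .load()

  df.printSchema()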
Unable to use more than 1 executor for spark streaming application with YARN
Hi, I am running a simple spark streaming application on hadoop 2.7.0/YARN (master: yarn-client) with 2 executors in different machines. However, while the app is running, I can see on the app web UI (tab executors) that only 1 executor keeps completing tasks over time, the other executor only works and completes tasks for some seconds. From the logs I can see an exception arising, though it is not clear what went wrong. Here is the yarn-nodemanager log: « 2015-06-17 00:29:50,967 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Starting resource-monitoring for container_1434391147618_0007_01_03 2015-06-17 00:29:50,977 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Memory usage of ProcessTree 30553 for container-id container_1434391147618_0007_01_03: 286.5 MB of 3 GB physical memory used; 2.7 GB of 6.3 GB virtual memory used 2015-06-17 00:29:53,991 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Memory usage of ProcessTree 30553 for container-id container_1434391147618_0007_01_03: 463.7 MB of 3 GB physical memory used; 2.7 GB of 6.3 GB virtual memory used 2015-06-17 00:29:57,009 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Memory usage of ProcessTree 30553 for container-id container_1434391147618_0007_01_03: 465.7 MB of 3 GB physical memory used; 2.7 GB of 6.3 GB virtual memory used 2015-06-17 00:30:00,024 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Memory usage of ProcessTree 30553 for container-id container_1434391147618_0007_01_03: 467.6 MB of 3 GB physical memory used; 2.7 GB of 6.3 GB virtual memory used 2015-06-17 00:30:03,032 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Memory usage of ProcessTree 30553 for container-id container_1434391147618_0007_01_03: 474.0 MB of 3 GB physical memory used; 2.7 GB of 6.3 GB virtual memory used 2015-06-17 00:30:06,041 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Memory usage of ProcessTree 30553 for container-id container_1434391147618_0007_01_03: 480.2 MB of 3 GB physical memory used; 2.7 GB of 6.3 GB virtual memory used 2015-06-17 00:30:09,053 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Memory usage of ProcessTree 30553 for container-id container_1434391147618_0007_01_03: 540.9 MB of 3 GB physical memory used; 2.7 GB of 6.3 GB virtual memory used 2015-06-17 00:30:12,068 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Memory usage of ProcessTree 30553 for container-id container_1434391147618_0007_01_03: 550.9 MB of 3 GB physical memory used; 2.7 GB of 6.3 GB virtual memory used 2015-06-17 00:30:15,075 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Memory usage of ProcessTree 30553 for container-id container_1434391147618_0007_01_03: 551.1 MB of 3 GB physical memory used; 2.7 GB of 6.3 GB virtual memory used 2015-06-17 00:30:18,090 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Memory usage of ProcessTree 30553 for container-id container_1434391147618_0007_01_03: 558.7 MB of 3 GB physical memory used; 2.7 GB of 6.3 GB virtual memory used 2015-06-17 00:30:20,157 WARN 
org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: Exit code from container container_1434391147618_0007_01_03 is : 1 2015-06-17 00:30:20,157 WARN org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: Exception from container-launch with container ID: container_1434391147618_0007_01_03 and exit code: 1 ExitCodeException exitCode=1: at org.apache.hadoop.util.Shell.runCommand(Shell.java:545) at org.apache.hadoop.util.Shell.run(Shell.java:456) at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:722) at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:211) at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302) at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82) at java.util.concurrent.FutureTask.run(FutureTask.java:262) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) 2015-06-17 00:30:20,157 INFO org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor: Exception from container-launch. 2015-06-17 00:30:20,157 INFO
Re: What are the likely causes of org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle?
Hi Peng, I got exactly the same error! My shuffle data is also very large. Have you figured out a way to solve it? Thanks, Jia On Fri, Apr 24, 2015 at 7:59 AM, Peng Cheng pc...@uow.edu.au wrote: I'm deploying a Spark data processing job on an EC2 cluster. The job is small for the cluster (16 cores with 120G RAM in total); the largest RDD has only 76k+ rows, but it is heavily skewed in the middle (thus requiring repartitioning) and each row has around 100k of data after serialization. The job always gets stuck in repartitioning; namely, it constantly hits the following errors and retries: org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle org.apache.spark.shuffle.FetchFailedException: Error in opening FileSegmentManagedBuffer org.apache.spark.shuffle.FetchFailedException: java.io.FileNotFoundException: /tmp/spark-... I've tried to identify the problem, but it seems like both memory and disk consumption of the machines throwing these errors are below 50%. I've also tried different configurations, including: let driver/executor memory use 60% of total memory; let netty prioritize the JVM shuffling buffer; increase the shuffling streaming buffer to 128m; use KryoSerializer and max out all buffers; increase the shuffling memoryFraction to 0.4. But none of them works. The small job always triggers the same series of errors and maxes out retries (up to 1000 times). How do I troubleshoot this in such a situation? Thanks a lot if you have any clue. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/What-are-the-likely-causes-of-org-apache-spark-shuffle-MetadataFetchFailedException-Missing-an-outpu-tp22646.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
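For anyone trying to reproduce those attempts, below is a rough, hedged sketch of how the unambiguous ones map onto Spark 1.x configuration keys; the post does not spell out the exact keys it used for the netty and streaming-buffer tweaks, so those are omitted, and the values shown are illustrative only, not a recommendation.

import org.apache.spark.SparkConf

// Sketch of a few of the settings described in the post above.
val conf = new SparkConf()
  .setAppName("repartition-heavy-job")
  .set("spark.executor.memory", "7g")  // roughly 60% of a node's RAM, per the post
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.shuffle.memoryFraction", "0.4")  // default in Spark 1.x is 0.2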
Spark Configuration of spark.worker.cleanup.appDataTtl
Hi guys: I added a parameter spark.worker.cleanup.appDataTtl 3 * 24 * 3600 in my conf/spark-default.conf, then I start my spark cluster. However I got an exception: 15/06/16 14:25:14 INFO util.Utils: Successfully started service 'sparkWorker' on port 43344. 15/06/16 14:25:14 ERROR actor.OneForOneStrategy: exception during creation akka.actor.ActorInitializationException: exception during creation at akka.actor.ActorInitializationException$.apply(Actor.scala:164) at akka.actor.ActorCell.create(ActorCell.scala:596) at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:456) at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478) at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: java.lang.reflect.InvocationTargetException at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:422) at akka.util.Reflect$.instantiate(Reflect.scala:66) at akka.actor.ArgsReflectConstructor.produce(Props.scala:352) at akka.actor.Props.newActor(Props.scala:252) at akka.actor.ActorCell.newActor(ActorCell.scala:552) at akka.actor.ActorCell.create(ActorCell.scala:578) ... 9 more Caused by: java.lang.NumberFormatException: For input string: 3 * 24 * 3600 at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65) at java.lang.Long.parseLong(Long.java:589) at java.lang.Long.parseLong(Long.java:631) at scala.collection.immutable.StringLike$class.toLong(StringLike.scala:230) at scala.collection.immutable.StringOps.toLong(StringOps.scala:31) at org.apache.spark.SparkConf$$anonfun$getLong$2.apply(SparkConf.scala:194) at org.apache.spark.SparkConf$$anonfun$getLong$2.apply(SparkConf.scala:194) at scala.Option.map(Option.scala:145) at org.apache.spark.SparkConf.getLong(SparkConf.scala:194) at org.apache.spark.deploy.worker.Worker.init(Worker.scala:89) ... 18 more How to set this parameter correctly? BTW, I searched this property in http://spark.apache.org/docs/latest/configuration.html and got no match. This property was found in http://spark.apache.org/docs/latest/spark-standalone.html with a default value 7 * 24 * 3600 (7 days), which I also tried but also failed.Thanks Thanksamp;Best regards! San.Luo
Re: Spark Configuration of spark.worker.cleanup.appDataTtl
I think you have to use 604800 instead of 7 * 24 * 3600; obviously SparkConf will not do the multiplication for you. The exception is quite explicit: Caused by: java.lang.NumberFormatException: For input string: 3 * 24 * 3600
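For reference, a working conf/spark-defaults.conf entry has to carry the pre-computed number of seconds; spark.worker.cleanup.enabled is included below on the assumption the cluster runs in standalone mode, where these worker-cleanup settings apply.

# conf/spark-defaults.conf (standalone mode); the value must be a literal number of
# seconds because Spark parses it with getLong, so no arithmetic expressions.
# 259200 seconds = 3 * 24 * 3600 (3 days); use 604800 for the documented 7-day default.
spark.worker.cleanup.enabled true
spark.worker.cleanup.appDataTtl 259200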
Spark 1.4 DataFrame Parquet file writing - missing random rows/partitions
Hi all, Looks like data frame parquet writing is very broken in Spark 1.4.0. We had no problems with Spark 1.3. When trying to save a data frame with 569610608 rows: dfc.write.format("parquet").save("/data/map_parquet_file") we get random results between runs. Caching the data frame in memory makes no difference. It looks like the write misses some of the RDD partitions. We have an RDD with 6750 partitions. When we write out we get fewer files than the number of partitions. When reading the data back in and running a count, we get a smaller number of rows. I've tried counting the rows in all different ways. All return the same result, 560214031 rows, missing about 9.4 million rows (about 1.6%). qc.read.parquet("/data/map_parquet_file").count qc.read.parquet("/data/map_parquet_file").rdd.count qc.read.parquet("/data/map_parquet_file").mapPartitions { itr => var c = 0; itr.foreach(_ => c = c + 1); Seq(c).toIterator }.reduce(_ + _) Looking at the files on HDFS, there are 6643 .parquet files, so 107 partitions are missing (about 1.6%). Then writing out the same cached DF again to a new file gives 6717 files on hdfs (about 33 files missing, or 0.5%): dfc.write.parquet("/data/map_parquet_file_2") And we get 566670107 rows back (about 3 million missing, ~0.5%): qc.read.parquet("/data/map_parquet_file_2").count Writing the same df out to json writes the expected number (6750) of part files and returns the right number of rows, 569610608. dfc.write.format("json").save("/data/map_parquet_file_3") qc.read.format("json").load("/data/map_parquet_file_3").count One thing to note is that the parquet part files on HDFS do not have the normal sequential part numbers like the json output and the parquet output in Spark 1.3: part-r-06151.gz.parquet part-r-118401.gz.parquet part-r-146249.gz.parquet part-r-196755.gz.parquet part-r-35811.gz.parquet part-r-55628.gz.parquet part-r-73497.gz.parquet part-r-97237.gz.parquet part-r-06161.gz.parquet part-r-118406.gz.parquet part-r-146254.gz.parquet part-r-196763.gz.parquet part-r-35826.gz.parquet part-r-55647.gz.parquet part-r-73500.gz.parquet _SUCCESS We are using MapR 4.0.2 for hdfs. Any ideas? Cheers, Nathan
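A hedged sketch of the kind of cross-check being described above; qc, dfc and sc are assumed to be the SQLContext, DataFrame and SparkContext from the post, and the path is the one used there. It compares the row count and the number of part files that actually landed on HDFS against the source DataFrame's row and partition counts.

import org.apache.hadoop.fs.{FileSystem, Path}

val expectedRows  = dfc.count()
val expectedParts = dfc.rdd.partitions.length

// Read back what was written and count rows.
val actualRows = qc.read.parquet("/data/map_parquet_file").count()

// Count the part files that were actually written out.
val fs = FileSystem.get(sc.hadoopConfiguration)
val actualParts = fs.listStatus(new Path("/data/map_parquet_file"))
  .count(_.getPath.getName.endsWith(".parquet"))

println(s"rows: $actualRows of $expectedRows, part files: $actualParts of $expectedParts")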
Re: How does one decide no of executors/cores/memory allocation?
Hi Shreesh, You can definitely decide how many partitions your data should break into by passing a 'minPartitions' argument in the method sc.textFile(input/path, minPartitions) and a 'numSlices' arg in the method sc.parallelize(localCollection, numSlices). In fact there is always an option to specify the number of partitions you want for your RDD in all the methods that create a first-hand RDD. Moreover, you can change the number of partitions at any point by calling one of these methods on your RDD: 'coalesce(numPartitions)': decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset. 'repartition(numPartitions)': reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network. 'repartitionAndSortWithinPartitions(partitioner)': repartition the RDD according to the given partitioner and, within each resulting partition, sort records by their keys. This is more efficient than calling repartition and then sorting within each partition because it can push the sorting down into the shuffle machinery. You can set these properties to tune your Spark environment: spark.driver.cores - number of cores to use for the driver process, only in cluster mode. spark.executor.cores - the number of cores to use on each executor. spark.driver.memory - amount of memory to use for the driver process, i.e. where SparkContext is initialized. spark.executor.memory - amount of memory to use per executor process, in the same format as JVM memory strings. You can also set the number of worker instances per node via SPARK_WORKER_INSTANCES and the number of executors to start via SPARK_EXECUTOR_INSTANCES in the spark_home/conf/spark-env.sh file. Thanks Himanshu -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-does-one-decide-no-of-executors-cores-memory-allocation-tp23326p23330.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
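Putting the advice above together, here is a minimal sketch; the values are illustrative, not tuned for any particular cluster.

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("partitioning-example")
  .set("spark.executor.cores", "2")
  .set("spark.executor.memory", "2g")
  .set("spark.driver.memory", "1g")
val sc = new SparkContext(conf)

// Ask for at least 8 partitions up front, then rebalance later if needed.
val lines = sc.textFile("input/path", 8)
val widened  = lines.repartition(16)   // full shuffle, more partitions
val narrowed = widened.coalesce(4)     // fewer partitions, avoids a full shuffle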
Re: If not stop StreamingContext gracefully, will checkpoint data be consistent?
Good question. With fileStream or textFileStream it will basically only take in the files whose timestamp is the current timestamp https://github.com/apache/spark/blob/3c0156899dc1ec1f7dfe6d7c8af47fa6dc7d00bf/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala#L172 and when checkpointing is enabled https://github.com/apache/spark/blob/3c0156899dc1ec1f7dfe6d7c8af47fa6dc7d00bf/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala#L324 it would restore the latest filenames from the checkpoint directory, which I believe will reprocess some files. Thanks Best Regards On Mon, Jun 15, 2015 at 2:49 PM, Haopu Wang hw...@qilinsoft.com wrote: Akhil, thank you for the response. I want to explore more. Suppose the application is just monitoring an HDFS folder and writing the word count of each streaming batch back into HDFS. When I kill the application before Spark takes a checkpoint, then after recovery Spark will resume processing from the timestamp of the latest checkpoint. That means some files will be processed twice and duplicate results are generated. Please correct me if the understanding is wrong, thanks again! -- From: Akhil Das [mailto:ak...@sigmoidanalytics.com] Sent: Monday, June 15, 2015 3:48 PM To: Haopu Wang Cc: user Subject: Re: If not stop StreamingContext gracefully, will checkpoint data be consistent? I think it should be fine, that's the whole point of check-pointing (in case of driver failure etc). Thanks Best Regards On Mon, Jun 15, 2015 at 6:54 AM, Haopu Wang hw...@qilinsoft.com wrote: Hi, can someone help to confirm the behavior? Thank you! -Original Message- From: Haopu Wang Sent: Friday, June 12, 2015 4:57 PM To: user Subject: If not stop StreamingContext gracefully, will checkpoint data be consistent? This is a quick question about checkpointing. The question is: if the StreamingContext is not stopped gracefully, will the checkpoint be consistent? Or should I always shut down the application gracefully in order to use the checkpoint? Thank you very much! - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
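For completeness, a minimal sketch of the driver-recovery pattern being discussed; the paths are placeholders, and the word-count logic simply mirrors the HDFS-monitoring example in the question.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("hdfs-wordcount")
  val ssc = new StreamingContext(conf, Seconds(30))
  ssc.checkpoint("hdfs:///checkpoints/hdfs-wordcount")   // checkpoint directory
  val counts = ssc.textFileStream("hdfs:///incoming")
    .flatMap(_.split(" ")).map((_, 1L)).reduceByKey(_ + _)
  counts.saveAsTextFiles("hdfs:///output/wordcount")
  ssc
}

// On restart, rebuild the context from the checkpoint if one exists, otherwise start fresh;
// whether an abrupt (non-graceful) stop reprocesses some files is exactly the question above.
val ssc = StreamingContext.getOrCreate("hdfs:///checkpoints/hdfs-wordcount", createContext _)
ssc.start()
ssc.awaitTermination()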
Re: tasks won't run on mesos when using fine grained
Did you look inside all logs? Mesos logs and executor logs? Thanks Best Regards On Mon, Jun 15, 2015 at 7:09 PM, Gary Ogden gog...@gmail.com wrote: My Mesos cluster has 1.5 CPU and 17GB free. If I set: conf.set("spark.mesos.coarse", "true"); conf.set("spark.cores.max", "1"); in the SparkConf object, the job will run fine in the mesos cluster. But if I comment out those settings above so that it defaults to fine grained, the task never finishes. It just shows as 0 for everything in the mesos frameworks (# of tasks, cpu, memory are all 0). There's nothing in the log files anywhere as to what's going on. Thanks
Spark+hive bucketing
The Spark SQL documentation states: "Tables with buckets: bucket is the hash partitioning within a Hive table partition. Spark SQL doesn't support buckets yet." What exactly does that mean? - That writing to a bucketed table won't respect this feature and data will be written in a non-bucketed manner? - That reading from a bucketed table won't use this feature to improve performance? - Both? Also, even if bucketing is not supported for reading, do we benefit from having a bucketed table just because of the way the data is stored in HDFS? If we read a bucketed table in Spark, is it more likely that data from the same bucket will be processed by the same task/executor?
Re: Optimizing Streaming from Websphere MQ
Each receiver will run on 1 core. So if your network is not the bottleneck, then to test the consumption speed of the receivers you can simply do a dstream.count.print to see how many records it can receive. (It will also be available in the Streaming tab of the driver UI.) If you spawn 10 receivers on 10 cores then possibly no processing will happen other than receiving. On the other hand, the MQ itself can also be the bottleneck (you could possibly configure it to achieve more parallelism). Thanks Best Regards On Mon, Jun 15, 2015 at 2:40 PM, Chaudhary, Umesh umesh.chaudh...@searshc.com wrote: Hi Akhil, Thanks for your response. I have 10 cores, which is the sum across all my 3 machines, and I am having 5-10 receivers. I have tried to test the number of records processed per second by varying the number of receivers. If I am having 10 receivers (i.e. one receiver for each core), then I am not experiencing any performance benefit from it. Is it something related to a bottleneck in MQ or in the reliable receiver? From: Akhil Das [mailto:ak...@sigmoidanalytics.com] Sent: Saturday, June 13, 2015 1:10 AM To: Chaudhary, Umesh Cc: user@spark.apache.org Subject: Re: Optimizing Streaming from Websphere MQ How many cores are you allocating for your job? And how many receivers are you having? It would be good if you can post your custom receiver code, it will help people to understand it better and shed some light. Thanks Best Regards On Fri, Jun 12, 2015 at 12:58 PM, Chaudhary, Umesh umesh.chaudh...@searshc.com wrote: Hi, I have created a custom receiver in Java which receives data from Websphere MQ and I am only writing the received records to HDFS. I have referred to many forums for optimizing the speed of a Spark Streaming application. Here I am listing a few: · Spark Official http://spark.apache.org/docs/latest/streaming-programming-guide.html#performance-tuning · VIrdata http://www.virdata.com/tuning-spark/ · TD's Slides (a bit old but useful) http://www.slideshare.net/spark-project/deep-divewithsparkstreaming-tathagatadassparkmeetup20130617 I got mainly two points for my application: · giving a batch interval of 1 sec · controlling "spark.streaming.blockInterval" = 200ms · inputStream.repartition(3) But that did not improve the actual speed (records/sec) of the receiver, which is at most 5-10 records/sec. This is way below my expectation. Am I missing something? Regards, Umesh Chaudhary This message, including any attachments, is the property of Sears Holdings Corporation and/or one of its subsidiaries. It is confidential and may contain proprietary or legally privileged information. If you are not the intended recipient, please delete it without reading the contents. Thank you.
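A hedged sketch of the receiver-throughput test described above; CustomMQReceiver stands in for the custom Websphere MQ receiver from the original post (it is not a real class here), and the connection details are placeholders.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("mq-intake-test")
val ssc = new StreamingContext(conf, Seconds(1))

// Each receiver pins one core, so keep numReceivers below the total core count
// if any processing besides receiving is supposed to happen.
val numReceivers = 5
val streams = (1 to numReceivers).map { _ =>
  ssc.receiverStream(new CustomMQReceiver("mqHost", 1414, "QUEUE.MANAGER", "QUEUE.NAME"))
}
val unified = ssc.union(streams)

// With no other transformations, this prints the raw number of records received per batch.
unified.count().print()

ssc.start()
ssc.awaitTermination()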
Re: How to use spark for map-reduce flow to filter N columns, top M rows of all csv files under a folder?
You can also look into https://spark.apache.org/docs/latest/tuning.html for performance tuning. Thanks Best Regards On Mon, Jun 15, 2015 at 10:28 PM, Rex X dnsr...@gmail.com wrote: Thanks very much, Akhil. That solved my problem. Best, Rex On Mon, Jun 15, 2015 at 2:16 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Something like this? val huge_data = sc.textFile("/path/to/first.csv").map(x => (x.split("\t")(1), x.split("\t")(0))) val gender_data = sc.textFile("/path/to/second.csv").map(x => (x.split("\t")(0), x)) val joined_data = huge_data.join(gender_data) joined_data.take(1000) It's Scala btw, the Python API should also be similar. Thanks Best Regards On Sat, Jun 13, 2015 at 12:16 AM, Rex X dnsr...@gmail.com wrote: To be concrete, say we have a folder with thousands of tab-delimited csv files with the following attribute format (each csv file is about 10GB): id name address city ... 1 Matt add1 LA ... 2 Will add2 LA ... 3 Lucy add3 SF ... And we have a lookup table based on name above: name gender Matt M Lucy F ... Now we are interested in outputting the top 1000 rows of each csv file into the following format: id name gender 1 Matt M ... Can we use pyspark to efficiently handle this?
Re: settings from props file seem to be ignored in mesos
Whats in your executor (that .tgz file) conf/spark-default.conf file? Thanks Best Regards On Mon, Jun 15, 2015 at 7:14 PM, Gary Ogden gog...@gmail.com wrote: I'm loading these settings from a properties file: spark.executor.memory=256M spark.cores.max=1 spark.shuffle.consolidateFiles=true spark.task.cpus=1 spark.deploy.defaultCores=1 spark.driver.cores=1 spark.scheduler.mode=FAIR Once the job is submitted to mesos, I can go to the spark UI for that job (hostname:4040) and on the environment tab. I see that those settings are there. If I then comment out all those settings and allow spark to use the defaults, it still appears to use the same settings in mesos. Under both runs, it still shows 1 task, 3 cpu, 1GB memory. Nothing seems to change no matter what is put in that props file, even if they show up in the spark environment tab.
Re: HiveContext saveAsTable create wrong partition
I found if I move the partitioned columns in schemaString and in Row to the end of the sequence, then it works correctly... On 16. juni 2015 11:14, patcharee wrote: Hi, I am using spark 1.4 and HiveContext to append data into a partitioned hive table. I found that the data insert into the table is correct, but the partition(folder) created is totally wrong. Below is my code snippet --- val schemaString = zone z year month date hh x y height u v w ph phb t p pb qvapor qgraup qnice qnrain tke_pbl el_pbl val schema = StructType( schemaString.split( ).map(fieldName = if (fieldName.equals(zone) || fieldName.equals(z) || fieldName.equals(year) || fieldName.equals(month) || fieldName.equals(date) || fieldName.equals(hh) || fieldName.equals(x) || fieldName.equals(y)) StructField(fieldName, IntegerType, true) else StructField(fieldName, FloatType, true) )) val pairVarRDD = sc.parallelize(Seq((Row(2,42,2009,3,1,0,218,365,9989.497.floatValue(),29.627113.floatValue(),19.071793.floatValue(),0.11982734.floatValue(),3174.6812.floatValue(), 97735.2.floatValue(),16.389032.floatValue(),-96.62891.floatValue(),25135.365.floatValue(),2.6476808E-5.floatValue(),0.0.floatValue(),13195.351.floatValue(), 0.0.floatValue(),0.1.floatValue(),0.0.floatValue())) )) val partitionedTestDF2 = sqlContext.createDataFrame(pairVarRDD, schema) partitionedTestDF2.write.format(org.apache.spark.sql.hive.orc.DefaultSource) .mode(org.apache.spark.sql.SaveMode.Append).partitionBy(zone,z,year,month).saveAsTable(test4DimBySpark) --- The table contains 23 columns (longer than Tuple maximum length), so I use Row Object to store raw data, not Tuple. Here is some message from spark when it saved data 15/06/16 10:39:22 INFO metadata.Hive: Renaming src:hdfs://service-10-0.local:8020/tmp/hive-patcharee/hive_2015-06-16_10-39-21_205_8768669104487548472-1/-ext-1/zone=13195/z=0/year=0/month=0/part-1;dest: hdfs://service-10-0.local:8020/apps/hive/warehouse/test4dimBySpark/zone=13195/z=0/year=0/month=0/part-1;Status:true 15/06/16 10:39:22 INFO metadata.Hive: New loading path = hdfs://service-10-0.local:8020/tmp/hive-patcharee/hive_2015-06-16_10-39-21_205_8768669104487548472-1/-ext-1/zone=13195/z=0/year=0/month=0 with partSpec {zone=13195, z=0, year=0, month=0} From the raw data (pairVarRDD) zone = 2, z = 42, year = 2009, month = 3. But spark created a partition {zone=13195, z=0, year=0, month=0}. When I queried from hive hive select * from test4dimBySpark; OK 242200931.00.0218.0365.09989.497 29.62711319.0717930.11982734-3174.681297735.2 16.389032-96.6289125135.3652.6476808E-50.0 13195 000 hive select zone, z, year, month from test4dimBySpark; OK 13195000 hive dfs -ls /apps/hive/warehouse/test4dimBySpark/*/*/*/*; Found 2 items -rw-r--r-- 3 patcharee hdfs 1411 2015-06-16 10:39 /apps/hive/warehouse/test4dimBySpark/zone=13195/z=0/year=0/month=0/part-1 The data stored in the table is correct zone = 2, z = 42, year = 2009, month = 3, but the partition created was wrong zone=13195/z=0/year=0/month=0 Is this a bug or what could be wrong? Any suggestion is appreciated. BR, Patcharee - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
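A hedged sketch of that workaround, using the column names and types from the post: list the partition columns last, in the same order they are passed to partitionBy(), both in the schema and in every Row. The builder code below is illustrative, not the original.

import org.apache.spark.sql.types._

val intDataCols   = Seq("date", "hh", "x", "y")
val floatDataCols = Seq("height", "u", "v", "w", "ph", "phb", "t", "p", "pb",
                        "qvapor", "qgraup", "qnice", "qnrain", "tke_pbl", "el_pbl")
val partitionCols = Seq("zone", "z", "year", "month")   // all IntegerType, placed last

val schema = StructType(
  intDataCols.map(StructField(_, IntegerType, nullable = true)) ++
  floatDataCols.map(StructField(_, FloatType, nullable = true)) ++
  partitionCols.map(StructField(_, IntegerType, nullable = true)))

// Each Row must follow the same order: the 19 data values first, then zone, z, year, month,
// so that .partitionBy("zone", "z", "year", "month") picks up the intended values.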
Re: Re: Spark Configuration of spark.worker.cleanup.appDataTtl
Thanks Saisai, I should have tried more. I thought it would be calculated automatically, like the default. Thanks & Best regards! San.Luo - Original Message - From: Saisai Shao sai.sai.s...@gmail.com To: 罗辉 luohui20...@sina.com Cc: user user@spark.apache.org Subject: Re: Spark Configuration of spark.worker.cleanup.appDataTtl Date: 2015-06-16 15:00 I think you have to use 604800 instead of 7 * 24 * 3600; obviously SparkConf will not do the multiplication for you. The exception is quite explicit: Caused by: java.lang.NumberFormatException: For input string: 3 * 24 * 3600
SparkR 1.4.0: read.df() function fails
Hi, In SparkR shell, I invoke: mydf-read.df(sqlContext, /home/esten/ami/usaf.json, source=json, header=false) I have tried various filetypes (csv, txt), all fail. RESPONSE: ERROR RBackendHandler: load on 1 failed BELOW THE WHOLE RESPONSE: 15/06/16 08:09:13 INFO MemoryStore: ensureFreeSpace(177600) called with curMem=0, maxMem=278302556 15/06/16 08:09:13 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 173.4 KB, free 265.2 MB) 15/06/16 08:09:13 INFO MemoryStore: ensureFreeSpace(16545) called with curMem=177600, maxMem=278302556 15/06/16 08:09:13 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 16.2 KB, free 265.2 MB) 15/06/16 08:09:13 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:37142 (size: 16.2 KB, free: 265.4 MB) 15/06/16 08:09:13 INFO SparkContext: Created broadcast 0 from load at NativeMethodAccessorImpl.java:-2 15/06/16 08:09:16 WARN DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded. 15/06/16 08:09:17 ERROR RBackendHandler: load on 1 failed java.lang.reflect.InvocationTargetException at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.api.r.RBackendHandler.handleMethodCall(RBackendHandler.scala:127) at org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:74) at org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:36) at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116) at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137) at java.lang.Thread.run(Thread.java:745) Caused by: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs://smalldata13.hdp:8020/home/esten/ami/usaf.json at 
org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:285) at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228) at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313) at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:207) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) at scala.Option.getOrElse(Option.scala:120)
HiveContext saveAsTable create wrong partition
Hi, I am using spark 1.4 and HiveContext to append data into a partitioned hive table. I found that the data insert into the table is correct, but the partition(folder) created is totally wrong. Below is my code snippet --- val schemaString = zone z year month date hh x y height u v w ph phb t p pb qvapor qgraup qnice qnrain tke_pbl el_pbl val schema = StructType( schemaString.split( ).map(fieldName = if (fieldName.equals(zone) || fieldName.equals(z) || fieldName.equals(year) || fieldName.equals(month) || fieldName.equals(date) || fieldName.equals(hh) || fieldName.equals(x) || fieldName.equals(y)) StructField(fieldName, IntegerType, true) else StructField(fieldName, FloatType, true) )) val pairVarRDD = sc.parallelize(Seq((Row(2,42,2009,3,1,0,218,365,9989.497.floatValue(),29.627113.floatValue(),19.071793.floatValue(),0.11982734.floatValue(),3174.6812.floatValue(), 97735.2.floatValue(),16.389032.floatValue(),-96.62891.floatValue(),25135.365.floatValue(),2.6476808E-5.floatValue(),0.0.floatValue(),13195.351.floatValue(), 0.0.floatValue(),0.1.floatValue(),0.0.floatValue())) )) val partitionedTestDF2 = sqlContext.createDataFrame(pairVarRDD, schema) partitionedTestDF2.write.format(org.apache.spark.sql.hive.orc.DefaultSource) .mode(org.apache.spark.sql.SaveMode.Append).partitionBy(zone,z,year,month).saveAsTable(test4DimBySpark) --- The table contains 23 columns (longer than Tuple maximum length), so I use Row Object to store raw data, not Tuple. Here is some message from spark when it saved data 15/06/16 10:39:22 INFO metadata.Hive: Renaming src:hdfs://service-10-0.local:8020/tmp/hive-patcharee/hive_2015-06-16_10-39-21_205_8768669104487548472-1/-ext-1/zone=13195/z=0/year=0/month=0/part-1;dest: hdfs://service-10-0.local:8020/apps/hive/warehouse/test4dimBySpark/zone=13195/z=0/year=0/month=0/part-1;Status:true 15/06/16 10:39:22 INFO metadata.Hive: New loading path = hdfs://service-10-0.local:8020/tmp/hive-patcharee/hive_2015-06-16_10-39-21_205_8768669104487548472-1/-ext-1/zone=13195/z=0/year=0/month=0 with partSpec {zone=13195, z=0, year=0, month=0} From the raw data (pairVarRDD) zone = 2, z = 42, year = 2009, month = 3. But spark created a partition {zone=13195, z=0, year=0, month=0}. When I queried from hive hive select * from test4dimBySpark; OK 242200931.00.0218.0365.09989.497 29.62711319.0717930.11982734-3174.681297735.2 16.389032-96.6289125135.3652.6476808E-50.0 131950 00 hive select zone, z, year, month from test4dimBySpark; OK 13195000 hive dfs -ls /apps/hive/warehouse/test4dimBySpark/*/*/*/*; Found 2 items -rw-r--r-- 3 patcharee hdfs 1411 2015-06-16 10:39 /apps/hive/warehouse/test4dimBySpark/zone=13195/z=0/year=0/month=0/part-1 The data stored in the table is correct zone = 2, z = 42, year = 2009, month = 3, but the partition created was wrong zone=13195/z=0/year=0/month=0 Is this a bug or what could be wrong? Any suggestion is appreciated. BR, Patcharee - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
cassandra with jdbcRDD
Hi all! Is there a way to connect Cassandra with JdbcRDD? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/cassandra-with-jdbcRDD-tp23335.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark standalone mode and kerberized cluster
On 15 Jun 2015, at 15:43, Borja Garrido Bear kazebo...@gmail.commailto:kazebo...@gmail.com wrote: I tried running the job in a standalone cluster and I'm getting this: java.io.IOException: Failed on local exception: java.io.IOException: org.apache.hadoop.security.AccessControlException: Client cannot authenticate via:[TOKEN, KERBEROS]; Host Details : local host is: worker-node/0.0.0.0http://0.0.0.0/; destination host is: hdfs:9000; Both nodes can access the HDFS running spark locally, and have valid kerberos credentials, I know for the moment keytab is not supported for standalone mode, but as long as the tokens I had when initiating the workers and masters are valid this should work, shouldn't it? I don't know anything about tokens on standalone. In YARN what we have to do is something called delegation tokens, the client asks (something) for tokens granting access to HDFS, and attaches that to the YARN container creation request, which is then handed off to the app master, which then gets to deal with (a) passing them down to launched workers and (b) dealing with token refresh (which is where keytabs come in to play) Why not try sshing in to the worker-node as the spark user and run kinit there to see if the problem goes away once you've logged in with Kerberos. If that works, you're going to have to automate that process across the cluster
Re: Limit Spark Shuffle Disk Usage
Hi Al M, You should try providing more main memory to the shuffle process; it might reduce the spill to disk. The default configuration for the shuffle memory fraction is 20% of the safe memory, which means 16% of the overall heap memory. So when we set executor memory, only a small fraction of it is used in the shuffle process, which induces more and more spillage to disk. The great thing here is that we can actually change that fraction and provide more memory to the shuffle; you just need to set two properties: 1: set 'spark.storage.memoryFraction' to 0.4, which is 0.6 by default. 2: set 'spark.shuffle.memoryFraction' to 0.4, which is 0.2 by default. This should make a significant difference in the shuffle's disk use. Thank you - Himanshu Mehra -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Limit-Spark-Shuffle-Disk-Usage-tp23279p23334.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
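A minimal sketch of those two settings under Spark 1.x static memory management; the 0.4 values are the ones suggested above, not a universal recommendation.

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("shuffle-memory-tuning")
  .set("spark.storage.memoryFraction", "0.4")   // default 0.6; frees heap for the shuffle
  .set("spark.shuffle.memoryFraction", "0.4")   // default 0.2; more room before spilling to disk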
Re: ALS predictALL not completing
Which version of Spark are you using? On Tue, Jun 16, 2015 at 6:20 AM, afarahat ayman.fara...@yahoo.com wrote: Hello; I have a data set of about 80 million users and 12,000 items (very sparse). I can get the training part working no problem (the model has 20 factors). However, when I try using predictAll for 80 million users x 10 items, the job does not complete. When I use a smaller data set, say 500k or a million, it completes. Any ideas or suggestions? Thanks Ayman -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/ALS-predictALL-not-completing-tp23327.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org