Porting R code to SparkR
Hi guys. This is possibly going to sound like a vague, stupid question, but I have a problem to solve and I need help, so any which way I go is only up :-) I have a bunch of R scripts (I am not an R expert) and we are currently evaluating how to translate these R scripts to SparkR DataFrame syntax. The goal is to use SparkR's parallelization. As an example, we are using, say, Corpus, tm_map and DocumentTermMatrix from library("tm"). How do we translate these to SparkR syntax? Any pointers would be helpful. thanks sanjay
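For what it's worth, SparkR's DataFrame API (as of Spark 1.5) has no direct equivalents of Corpus, tm_map or DocumentTermMatrix, so a line-by-line translation is not really possible; the closest idiom is to hold the documents in a SparkR DataFrame and express the term counting with SQL expressions. Below is only a minimal sketch of that idea, assuming the Spark 1.5 SparkR API; the sample data and column names are hypothetical.

library(SparkR)

sc <- sparkR.init(appName = "tm-to-SparkR-sketch")
sqlContext <- sparkRSQL.init(sc)

# Hypothetical stand-in for the corpus: one row per document
localDocs <- data.frame(id = c(1, 2),
                        text = c("spark makes parallel text mining possible",
                                 "tm builds a document term matrix"),
                        stringsAsFactors = FALSE)
docs <- createDataFrame(sqlContext, localDocs)

# Lower-case, split on whitespace, explode into one row per (id, term),
# then count terms per document -- roughly the long form of a document-term matrix
terms <- selectExpr(docs, "id", "explode(split(lower(text), ' ')) as term")
dtm <- count(groupBy(terms, "id", "term"))
head(dtm)

Anything more tm-specific (stemming, stopword removal, weighting) would still have to be written against these DataFrame operations or pulled back into plain R on a sample.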
Re: Is it possible Running SparkR on 2 nodes without HDFS
Cool, thanks. I have a CDH 5.4.8 cluster (Cloudera Starving Developers version) with 1 NN and 4 DNs, and Spark is running but it's 1.3.x. I want to leverage this HDFS/Hive cluster for SparkR because we do all data munging here and produce datasets for ML. I am thinking of the following idea:
1. Add 2 datanodes to the existing HDFS cluster through Cloudera Manager
2. Don't add any Spark service to these two new nodes
3. Download and install the latest Spark 1.5.1 on these two datanodes
4. Download and install R on these two datanodes
5. Configure Spark as 1 master and 1 slave on one node; on the second node, configure a slave
Will report back if this works! thanks sanjay

From: shenLiu
To: Sanjay Subramanian ; User
Sent: Monday, November 9, 2015 10:23 PM
Subject: RE: Is it possible Running SparkR on 2 nodes without HDFS

Hi Sanjay, It's better to use HDFS. Otherwise you should have copies of the csv file on all worker nodes with the same path. regards Shawn

Date: Tue, 10 Nov 2015 02:06:16 +
From: sanjaysubraman...@yahoo.com.INVALID
To: user@spark.apache.org
Subject: Is it possible Running SparkR on 2 nodes without HDFS

hey guys, I have a 2 node SparkR cluster (1 master, 1 slave) on AWS using spark-1.5.1-bin-without-hadoop.tgz. Running the SparkR job on the master node:

/opt/spark-1.5.1-bin-hadoop2.6/bin/sparkR --master spark://ip-xx-ppp-vv-ddd:7077 --packages com.databricks:spark-csv_2.10:1.2.0 --executor-cores 16 --num-executors 8 --executor-memory 8G --driver-memory 8g myRprogram.R

org.apache.spark.SparkException: Job aborted due to stage failure: Task 17 in stage 1.0 failed 4 times, most recent failure: Lost task 17.3 in stage 1.0 (TID 103, xx.ff.rr.tt): java.io.FileNotFoundException: File file:/mnt/local/1024gbxvdf1/all_adleads_cleaned_commas_in_quotes_good_file.csv does not exist
at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:534)
at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:747)
at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:524)
at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:409)
at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.(ChecksumFileSystem.java:140)
at org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSystem.java:341)
at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:766)
at org.apache.hadoop.mapred.LineRecordReader.(LineRecord

myRprogram.R:

library(SparkR)
sc <- sparkR.init(appName="SparkR-CancerData-example")
sqlContext <- sparkRSQL.init(sc)
lds <- read.df(sqlContext, "file:///mnt/local/1024gbxvdf1/all_adleads_cleaned_commas_in_quotes_good_file.csv", "com.databricks.spark.csv", header="true")
sink("file:///mnt/local/1024gbxvdf1/leads_new_data_analyis.txt")
summary(lds)

This used to run when we had a single node SparkR installation. regards sanjay
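To make Shawn's suggestion above concrete: once the CSV is sitting in HDFS, only the path passed to read.df changes. A minimal sketch, assuming the file has been copied to a hypothetical hdfs:// location (without HDFS, the same local file would have to exist at the identical path on every worker):

library(SparkR)

sc <- sparkR.init(appName = "SparkR-CancerData-example")
sqlContext <- sparkRSQL.init(sc)

# hdfs:// path below is hypothetical, e.g. after: hdfs dfs -put <local csv> /data/leads/
lds <- read.df(sqlContext,
               "hdfs:///data/leads/all_adleads_cleaned_commas_in_quotes_good_file.csv",
               source = "com.databricks.spark.csv",
               header = "true")
summary(lds)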
Is it possible Running SparkR on 2 nodes without HDFS
hey guys, I have a 2 node SparkR cluster (1 master, 1 slave) on AWS using spark-1.5.1-bin-without-hadoop.tgz. Running the SparkR job on the master node:

/opt/spark-1.5.1-bin-hadoop2.6/bin/sparkR --master spark://ip-xx-ppp-vv-ddd:7077 --packages com.databricks:spark-csv_2.10:1.2.0 --executor-cores 16 --num-executors 8 --executor-memory 8G --driver-memory 8g myRprogram.R

org.apache.spark.SparkException: Job aborted due to stage failure: Task 17 in stage 1.0 failed 4 times, most recent failure: Lost task 17.3 in stage 1.0 (TID 103, xx.ff.rr.tt): java.io.FileNotFoundException: File file:/mnt/local/1024gbxvdf1/all_adleads_cleaned_commas_in_quotes_good_file.csv does not exist
at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:534)
at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:747)
at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:524)
at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:409)
at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.(ChecksumFileSystem.java:140)
at org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSystem.java:341)
at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:766)
at org.apache.hadoop.mapred.LineRecordReader.(LineRecord

myRprogram.R:

library(SparkR)
sc <- sparkR.init(appName="SparkR-CancerData-example")
sqlContext <- sparkRSQL.init(sc)
lds <- read.df(sqlContext, "file:///mnt/local/1024gbxvdf1/all_adleads_cleaned_commas_in_quotes_good_file.csv", "com.databricks.spark.csv", header="true")
sink("file:///mnt/local/1024gbxvdf1/leads_new_data_analyis.txt")
summary(lds)

This used to run when we had a single node SparkR installation. regards sanjay
Re: Spark-sql versus Impala versus Hive
Hi guys, I am using CDH 5.3.3 and that comes with Hive 0.13.1 and Spark 1.2. So to answer your question, it's not Tez (that I believe comes with HortonWorks). This Hive query was run with Hive defaults. I used additional Hive params right now to improve the timings:
SET mapreduce.job.reduces=16;
SET mapreduce.tasktracker.map.tasks.maximum=24;
SET mapreduce.tasktracker.reduce.tasks.maximum=16;
SET mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;
SET mapreduce.map.output.compress=true;
Now: Time taken: 140.139 seconds, Fetched: 29597 row(s) (surprisingly close to spark-sql now LOL. Time to tweak spark-sql now)
EARLIER RESULTS
Hive – 326.021 seconds, Fetched: 29597 row(s)
Impala – Fetched 27625 row(s) in 17.02s
spark-sql – Time taken: 120.236 seconds
I don't have the bandwidth to manage individual components on the cluster :-) since I am solo doing all this and delivering ML solutions to production LOL. So I depend on a distribution such as CDH. The downside is that one is always a couple of versions behind. Thanks for your questions. regards sanjay

From: Michael Armbrust
To: user
Sent: Thursday, June 18, 2015 3:25 PM
Subject: Re: Spark-sql versus Impala versus Hive

I would also love to see a more recent version of Spark SQL. There have been a lot of performance improvements between 1.2 and 1.4 :)

On Thu, Jun 18, 2015 at 3:18 PM, Steve Nunez wrote:
Interesting. What were the Hive settings? Specifically it would be useful to know if this was Hive on Tez.
- Steve

From: Sanjay Subramanian
Reply-To: Sanjay Subramanian
Date: Thursday, June 18, 2015 at 11:08
To: "user@spark.apache.org"
Subject: Spark-sql versus Impala versus Hive

I just published the results of my findings here: https://bigdatalatte.wordpress.com/2015/06/18/spark-sql-versus-impala-versus-hive/
Spark-sql versus Impala versus Hive
I just published the results of my findings here: https://bigdatalatte.wordpress.com/2015/06/18/spark-sql-versus-impala-versus-hive/
Re: spark-sql from CLI --->EXCEPTION: java.lang.OutOfMemoryError: Java heap space
ok solved. Looks like breathing the spark-summit SFO air for 3 days helped a lot! Piping the 7 million records to local disk still runs out of memory, so I piped the results into another Hive table instead. I can live with that :-)

/opt/cloudera/parcels/CDH/lib/spark/bin/spark-sql -e "use aers; create table unique_aers_demo as select distinct isr,event_dt,age,age_cod,sex,year,quarter from aers.aers_demo_view " --driver-memory 4G --total-executor-cores 12 --executor-memory 4G

thanks

From: Sanjay Subramanian
To: "user@spark.apache.org"
Sent: Thursday, June 11, 2015 8:43 AM
Subject: spark-sql from CLI --->EXCEPTION: java.lang.OutOfMemoryError: Java heap space

hey guys
Using Hive and Impala daily intensively. Want to transition to spark-sql in CLI mode.
Currently in my sandbox I am using Spark (standalone mode) in the CDH distribution (starving developer version 5.3.3)
3 datanode hadoop cluster
32GB RAM per node
8 cores per node
| spark | 1.2.0+cdh5.3.3+371 |

I am testing some stuff on one view and getting memory errors. The possible reason is that the default memory per executor showing on 18080 is 512M.
These options, when used to start the spark-sql CLI, do not seem to have any effect:
--total-executor-cores 12 --executor-memory 4G

/opt/cloudera/parcels/CDH/lib/spark/bin/spark-sql -e "select distinct isr,event_dt,age,age_cod,sex,year,quarter from aers.aers_demo_view"

aers.aers_demo_view (7 million+ records)
===
isr bigint (case id)
event_dt bigint (event date)
age double (age of patient)
age_cod string (days, months, years)
sex string (M or F)
year int
quarter int

VIEW DEFINITION
CREATE VIEW `aers.aers_demo_view` AS SELECT `isr` AS `isr`, `event_dt` AS `event_dt`, `age` AS `age`, `age_cod` AS `age_cod`, `gndr_cod` AS `sex`, `year` AS `year`, `quarter` AS `quarter` FROM (
SELECT `aers_demo_v1`.`isr`, `aers_demo_v1`.`event_dt`, `aers_demo_v1`.`age`, `aers_demo_v1`.`age_cod`, `aers_demo_v1`.`gndr_cod`, `aers_demo_v1`.`year`, `aers_demo_v1`.`quarter` FROM `aers`.`aers_demo_v1`
UNION ALL
SELECT `aers_demo_v2`.`isr`, `aers_demo_v2`.`event_dt`, `aers_demo_v2`.`age`, `aers_demo_v2`.`age_cod`, `aers_demo_v2`.`gndr_cod`, `aers_demo_v2`.`year`, `aers_demo_v2`.`quarter` FROM `aers`.`aers_demo_v2`
UNION ALL
SELECT `aers_demo_v3`.`isr`, `aers_demo_v3`.`event_dt`, `aers_demo_v3`.`age`, `aers_demo_v3`.`age_cod`, `aers_demo_v3`.`gndr_cod`, `aers_demo_v3`.`year`, `aers_demo_v3`.`quarter` FROM `aers`.`aers_demo_v3`
UNION ALL
SELECT `aers_demo_v4`.`isr`, `aers_demo_v4`.`event_dt`, `aers_demo_v4`.`age`, `aers_demo_v4`.`age_cod`, `aers_demo_v4`.`gndr_cod`, `aers_demo_v4`.`year`, `aers_demo_v4`.`quarter` FROM `aers`.`aers_demo_v4`
UNION ALL
SELECT `aers_demo_v5`.`primaryid` AS `ISR`, `aers_demo_v5`.`event_dt`, `aers_demo_v5`.`age`, `aers_demo_v5`.`age_cod`, `aers_demo_v5`.`gndr_cod`, `aers_demo_v5`.`year`, `aers_demo_v5`.`quarter` FROM `aers`.`aers_demo_v5`
UNION ALL
SELECT `aers_demo_v6`.`primaryid` AS `ISR`, `aers_demo_v6`.`event_dt`, `aers_demo_v6`.`age`, `aers_demo_v6`.`age_cod`, `aers_demo_v6`.`sex` AS `GNDR_COD`, `aers_demo_v6`.`year`, `aers_demo_v6`.`quarter` FROM `aers`.`aers_demo_v6`
) `aers_demo_view`

15/06/11 08:36:36 WARN DefaultChannelPipeline: An exception was thrown by a user handler while handling an exception event ([id: 0x01b99855, /10.0.0.19:58117 => /10.0.0.19:52016] EXCEPTION: java.lang.OutOfMemoryError: Java heap space) java.lang.OutOfMemoryError: Java heap space at org.jboss.netty.buffer.HeapChannelBuffer.(HeapChannelBuffer.java:42) at org.jboss.netty.buffer.BigEndianHeapChannelBuffer.(BigEndianHeapChannelBuffer.java:34) at
org.jboss.netty.buffer.ChannelBuffers.buffer(ChannelBuffers.java:134) at org.jboss.netty.buffer.HeapChannelBufferFactory.getBuffer(HeapChannelBufferFactory.java:68) at org.jboss.netty.buffer.AbstractChannelBufferFactory.getBuffer(AbstractChannelBufferFactory.java:48) at org.jboss.netty.handler.codec.frame.FrameDecoder.newCumulationBuffer(FrameDecoder.java:507) at org.jboss.netty.handler.codec.frame.FrameDecoder.updateCumulation(FrameDecoder.java:345) at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:312) at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268) at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255) at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88) at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:109) at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:312) at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:90) at org.jboss.netty.channel.socket.
spark-sql CLI options does not work --master yarn --deploy-mode client
hey guys, I have CDH 5.3.3 with Spark 1.2.0 (on YARN).

This does not work:
/opt/cloudera/parcels/CDH/lib/spark/bin/spark-sql --deploy-mode client --master yarn --driver-memory 1g -e "select j.person_id, p.first_name, p.last_name, count(*) from (select person_id from cdr.cdr_mjp_joborder where person_id is not null) j join (select person_id, first_name, last_name from cdr.cdr_mjp_people where lower(last_name) like '%subramanian%') p on j.person_id = p.person_id GROUP BY j.person_id, p.first_name, p.last_name"

This works, but only one executor is used:
/opt/cloudera/parcels/CDH/lib/spark/bin/spark-sql --driver-memory 1g -e "select j.person_id, p.first_name, p.last_name, count(*) from (select person_id from cdr.cdr_mjp_joborder where person_id is not null) j join (select person_id, first_name, last_name from cdr.cdr_mjp_people where lower(last_name) like '%subramanian%') p on j.person_id = p.person_id GROUP BY j.person_id, p.first_name, p.last_name"

Any thoughts? I found a related link but I don't understand the language: http://blog.csdn.net/freedomboy319/article/details/46332009

thanks sanjay

ERRORS
Error: JAVA_HOME is not set and could not be found.
15/06/16 18:17:19 WARN Holder: java.lang.ClassNotFoundException: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter at java.net.URLClassLoader$1.run(URLClassLoader.java:202) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:190) at java.lang.ClassLoader.loadClass(ClassLoader.java:306) at java.lang.ClassLoader.loadClass(ClassLoader.java:247) at org.eclipse.jetty.util.Loader.loadClass(Loader.java:100) at org.eclipse.jetty.util.Loader.loadClass(Loader.java:79) at org.eclipse.jetty.servlet.Holder.doStart(Holder.java:107) at org.eclipse.jetty.servlet.FilterHolder.doStart(FilterHolder.java:90) at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64) at org.eclipse.jetty.servlet.ServletHandler.initialize(ServletHandler.java:768) at org.eclipse.jetty.servlet.ServletHandler.updateMappings(ServletHandler.java:1357) at org.eclipse.jetty.servlet.ServletHandler.setFilterMappings(ServletHandler.java:1393) at org.eclipse.jetty.servlet.ServletHandler.addFilterMapping(ServletHandler.java:1113) at org.eclipse.jetty.servlet.ServletHandler.addFilterWithMapping(ServletHandler.java:979) at org.eclipse.jetty.servlet.ServletContextHandler.addFilter(ServletContextHandler.java:332) at org.apache.spark.ui.JettyUtils$$anonfun$addFilters$1$$anonfun$apply$6.apply(JettyUtils.scala:163) at org.apache.spark.ui.JettyUtils$$anonfun$addFilters$1$$anonfun$apply$6.apply(JettyUtils.scala:163) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.ui.JettyUtils$$anonfun$addFilters$1.apply(JettyUtils.scala:163) at org.apache.spark.ui.JettyUtils$$anonfun$addFilters$1.apply(JettyUtils.scala:141) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at org.apache.spark.ui.JettyUtils$.addFilters(JettyUtils.scala:141) at org.apache.spark.scheduler.cluster.YarnSchedulerBackend$$anonfun$org$apache$spark$scheduler$cluster$YarnSchedulerBackend$$addWebUIFilter$3.apply(YarnSchedulerBackend.scala:90) at org.apache.spark.scheduler.cluster.YarnSchedulerBackend$$anonfun$org$apache$spark$scheduler$cluster$YarnSchedulerBackend$$addWebUIFilter$3.apply(YarnSchedulerBackend.scala:90) at
scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.cluster.YarnSchedulerBackend.org$apache$spark$scheduler$cluster$YarnSchedulerBackend$$addWebUIFilter(YarnSchedulerBackend.scala:90) at org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerActor$$anonfun$receive$1.applyOrElse(YarnSchedulerBackend.scala:129) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)15/06/16 18:17:19 WARN AbstractLifeCycle: FAILED org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter-1c7ab89d: javax.servlet.UnavailableException: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilterjavax.servlet.UnavailableException: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter at org.eclipse.jetty.servlet.Holder.doStart(Holder.java:114) at org.eclipse.jetty.servlet.
HDFS not supported by databricks cloud :-(
hey guys, After day one at the spark-summit SFO, I realized sadly that (indeed) HDFS is not supported by Databricks cloud. My speed bottleneck is to transfer ~1TB of snapshot HDFS data (250+ external Hive tables) to S3 :-( I want to use Databricks cloud but to me this is a starting disabler. The hard road for me will be (as I believe EVERYTHING is possible; the impossible just takes longer):
- transfer all HDFS data to S3
- our org does not permit AWS server side encryption, so I have to figure out if AWS KMS encrypted S3 files can be read by Hive/Impala/Spark
- modify all table locations in the metadata to S3 (see the sketch below)
- modify all scripts to point and write to S3 instead of HDFS
Any ideas / thoughts will be helpful. Till I can get the above figured out, I am going ahead and working hard to make spark-sql the main workhorse for creating datasets (right now it's Hive and Impala). thanks regards sanjay
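For the "modify all table locations" step, one way it could be scripted is to loop over the tables and issue ALTER TABLE ... SET LOCATION, either from the Hive CLI or through a HiveContext. Below is only a sketch of that idea from SparkR: the table list and bucket path are hypothetical (the real list would come from the metastore), and whether the URI scheme is s3n:// or s3a:// depends on the Hadoop version in play.

library(SparkR)

sc <- sparkR.init(appName = "relocate-hive-tables")
hiveContext <- sparkRHive.init(sc)  # HiveContext, so the DDL reaches the Hive metastore

# Hypothetical table list and S3 bucket
tables <- c("aers_demo_v1", "aers_demo_v2")
for (t in tables) {
  sql(hiveContext, paste0("ALTER TABLE aers.", t,
                          " SET LOCATION 's3n://my-bucket/warehouse/aers/", t, "'"))
}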
Re: spark-sql from CLI --->EXCEPTION: java.lang.OutOfMemoryError: Java heap space
Hi Josh It was great meeting u in person at the spark-summit SFO yesterday. Thanks for discussing potential solutions to the problem. I verified that 2 hive gateway nodes had not been configured correctly. My bad. I added hive-site.xml to the spark Conf directories for these 2 additional hive gateway nodes. Plus I increased the driver-memory parameter to 1gb. That solved the memory issue. So good news is I can get spark-SQL running in standalone mode (on a CDH 5.3.3 with spark 1.2 on YARN) Not so good news is that the following params have no effect --master yarn --deployment-mode client So the spark-SQL query runs with only ONE executor :-( I am planning on bugging u for 5-10 minutes at the Spark office hours :-) and hopefully we can solve this. Thanks Best regards Sanjay Sent from my iPhone > On Jun 13, 2015, at 5:38 PM, Josh Rosen wrote: > > Try using Spark 1.4.0 with SQL code generation turned on; this should make a > huge difference. > >> On Sat, Jun 13, 2015 at 5:08 PM, Sanjay Subramanian >> wrote: >> hey guys >> >> I tried the following settings as well. No luck >> >> --total-executor-cores 24 --executor-memory 4G >> >> >> BTW on the same cluster , impala absolutely kills it. same query 9 seconds. >> no memory issues. no issues. >> >> In fact I am pretty disappointed with Spark-SQL. >> I have worked with Hive during the 0.9.x stages and taken projects to >> production successfully and Hive actually very rarely craps out. >> >> Whether the spark folks like what I say or not, yes my expectations are >> pretty high of Spark-SQL if I were to change the ways we are doing things at >> my workplace. >> Until that time, we are going to be hugely dependent on Impala and >> Hive(with SSD speeding up the shuffle stage , even MR jobs are not that slow >> now). >> >> I want to clarify for those of u who may be asking - why I am not using >> spark with Scala and insisting on using spark-sql ? >> >> - I have already pipelined data from enterprise tables to Hive >> - I am using CDH 5.3.3 (Cloudera starving developers version) >> - I have close to 300 tables defined in Hive external tables. >> - Data if on HDFS >> - On an average we have 150 columns per table >> - One an everyday basis , we do crazy amounts of ad-hoc joining of new and >> old tables in getting datasets ready for supervised ML >> - I thought that quite simply I can point Spark to the Hive meta and do >> queries as I do - in fact the existing queries would work as is unless I am >> using some esoteric Hive/Impala function >> >> Anyway, if there are some settings I can use and get spark-sql to run even >> on standalone mode that will be huge help. >> >> On the pre-production cluster I have spark on YARN but could never get it to >> run fairly complex queries and I have no answers from this group of the CDH >> groups. >> >> So my assumption is that its possibly not solved , else I have always got >> very quick answers and responses :-) to my questions on all CDH groups, >> Spark, Hive >> >> best regards >> >> sanjay >> >> >> >> From: Josh Rosen >> To: Sanjay Subramanian >> Cc: "user@spark.apache.org" >> Sent: Friday, June 12, 2015 7:15 AM >> Subject: Re: spark-sql from CLI --->EXCEPTION: java.lang.OutOfMemoryError: >> Java heap space >> >> It sounds like this might be caused by a memory configuration problem. In >> addition to looking at the executor memory, I'd also bump up the driver >> memory, since it appears that your shell is running out of memory when >> collecting a large query result. 
>> >> Sent from my phone >> >> >> >>> On Jun 11, 2015, at 8:43 AM, Sanjay Subramanian >>> wrote: >>> >>> hey guys >>> >>> Using Hive and Impala daily intensively. >>> Want to transition to spark-sql in CLI mode >>> >>> Currently in my sandbox I am using the Spark (standalone mode) in the CDH >>> distribution (starving developer version 5.3.3) >>> 3 datanode hadoop cluster >>> 32GB RAM per node >>> 8 cores per node >>> >>> spark >>> 1.2.0+cdh5.3.3+371 >>> >>> >>> I am testing some stuff on one view and getting memory errors >>> Possibly reason is default memory per executor showing on 18080 is >>> 512M >>> >>> These options when used to start the spark-sql CLI does not seem to have >>> any effect >>> --to
Re: spark-sql from CLI --->EXCEPTION: java.lang.OutOfMemoryError: Java heap space
hey guys, I tried the following settings as well. No luck.
--total-executor-cores 24 --executor-memory 4G
BTW on the same cluster, Impala absolutely kills it: same query, 9 seconds, no memory issues, no issues at all.
In fact I am pretty disappointed with Spark-SQL. I have worked with Hive during the 0.9.x stages and taken projects to production successfully, and Hive actually very rarely craps out.
Whether the spark folks like what I say or not, yes my expectations are pretty high of Spark-SQL if I were to change the ways we are doing things at my workplace. Until that time, we are going to be hugely dependent on Impala and Hive (with SSD speeding up the shuffle stage, even MR jobs are not that slow now).
I want to clarify for those of u who may be asking why I am not using Spark with Scala and insisting on using spark-sql:
- I have already pipelined data from enterprise tables to Hive
- I am using CDH 5.3.3 (Cloudera starving developers version)
- I have close to 300 tables defined in Hive external tables
- Data is on HDFS
- On an average we have 150 columns per table
- On an everyday basis, we do crazy amounts of ad-hoc joining of new and old tables in getting datasets ready for supervised ML
- I thought that quite simply I can point Spark to the Hive meta and do queries as I do; in fact the existing queries would work as is unless I am using some esoteric Hive/Impala function
Anyway, if there are some settings I can use and get spark-sql to run even in standalone mode, that will be a huge help.
On the pre-production cluster I have Spark on YARN but could never get it to run fairly complex queries, and I have no answers from this group or the CDH groups.
So my assumption is that it's possibly not solved; otherwise I have always got very quick answers and responses :-) to my questions on all CDH groups, Spark, Hive.
best regards sanjay

From: Josh Rosen
To: Sanjay Subramanian
Cc: "user@spark.apache.org"
Sent: Friday, June 12, 2015 7:15 AM
Subject: Re: spark-sql from CLI --->EXCEPTION: java.lang.OutOfMemoryError: Java heap space

It sounds like this might be caused by a memory configuration problem. In addition to looking at the executor memory, I'd also bump up the driver memory, since it appears that your shell is running out of memory when collecting a large query result.
Sent from my phone On Jun 11, 2015, at 8:43 AM, Sanjay Subramanian wrote: hey guys Using Hive and Impala daily intensively.Want to transition to spark-sql in CLI mode Currently in my sandbox I am using the Spark (standalone mode) in the CDH distribution (starving developer version 5.3.3) 3 datanode hadoop cluster32GB RAM per node8 cores per node | spark | 1.2.0+cdh5.3.3+371 | I am testing some stuff on one view and getting memory errorsPossibly reason is default memory per executor showing on 18080 is 512M These options when used to start the spark-sql CLI does not seem to have any effect --total-executor-cores 12 --executor-memory 4G /opt/cloudera/parcels/CDH/lib/spark/bin/spark-sql -e "select distinct isr,event_dt,age,age_cod,sex,year,quarter from aers.aers_demo_view" aers.aers_demo_view (7 million+ records)===isr bigint case idevent_dt bigint Event dateage double age of patientage_cod string days,months yearssex string M or Fyear intquarter int VIEW DEFINITIONCREATE VIEW `aers.aers_demo_view` AS SELECT `isr` AS `isr`, `event_dt` AS `event_dt`, `age` AS `age`, `age_cod` AS `age_cod`, `gndr_cod` AS `sex`, `year` AS `year`, `quarter` AS `quarter` FROM (SELECT `aers_demo_v1`.`isr`, `aers_demo_v1`.`event_dt`, `aers_demo_v1`.`age`, `aers_demo_v1`.`age_cod`, `aers_demo_v1`.`gndr_cod`, `aers_demo_v1`.`year`, `aers_demo_v1`.`quarter`FROM `aers`.`aers_demo_v1`UNION ALLSELECT `aers_demo_v2`.`isr`, `aers_demo_v2`.`event_dt`, `aers_demo_v2`.`age`, `aers_demo_v2`.`age_cod`, `aers_demo_v2`.`gndr_cod`, `aers_demo_v2`.`year`, `aers_demo_v2`.`quarter`FROM `aers`.`aers_demo_v2`UNION ALLSELECT `aers_demo_v3`.`isr`, `aers_demo_v3`.`event_dt`, `aers_demo_v3`.`age`, `aers_demo_v3`.`age_cod`, `aers_demo_v3`.`gndr_cod`, `aers_demo_v3`.`year`, `aers_demo_v3`.`quarter`FROM `aers`.`aers_demo_v3`UNION ALLSELECT `aers_demo_v4`.`isr`, `aers_demo_v4`.`event_dt`, `aers_demo_v4`.`age`, `aers_demo_v4`.`age_cod`, `aers_demo_v4`.`gndr_cod`, `aers_demo_v4`.`year`, `aers_demo_v4`.`quarter`FROM `aers`.`aers_demo_v4`UNION ALLSELECT `aers_demo_v5`.`primaryid` AS `ISR`, `aers_demo_v5`.`event_dt`, `aers_demo_v5`.`age`, `aers_demo_v5`.`age_cod`, `aers_demo_v5`.`gndr_cod`, `aers_demo_v5`.`year`, `aers_demo_v5`.`quarter`FROM `aers`.`aers_demo_v5`UNION ALLSELECT `aers_demo_v6`.`primaryid` AS `ISR`, `aers_demo_v6`.`event_dt`, `aers_demo_v6`.`age`, `aers_demo_v6`.`age_cod`, `aers_demo_v6`.`sex` AS `GNDR_COD`, `aers_demo_v6`.`year`,
spark-sql from CLI --->EXCEPTION: java.lang.OutOfMemoryError: Java heap space
hey guys
Using Hive and Impala daily intensively. Want to transition to spark-sql in CLI mode.
Currently in my sandbox I am using Spark (standalone mode) in the CDH distribution (starving developer version 5.3.3)
3 datanode hadoop cluster
32GB RAM per node
8 cores per node
| spark | 1.2.0+cdh5.3.3+371 |

I am testing some stuff on one view and getting memory errors. The possible reason is that the default memory per executor showing on 18080 is 512M.
These options, when used to start the spark-sql CLI, do not seem to have any effect:
--total-executor-cores 12 --executor-memory 4G

/opt/cloudera/parcels/CDH/lib/spark/bin/spark-sql -e "select distinct isr,event_dt,age,age_cod,sex,year,quarter from aers.aers_demo_view"

aers.aers_demo_view (7 million+ records)
===
isr bigint (case id)
event_dt bigint (event date)
age double (age of patient)
age_cod string (days, months, years)
sex string (M or F)
year int
quarter int

VIEW DEFINITION
CREATE VIEW `aers.aers_demo_view` AS SELECT `isr` AS `isr`, `event_dt` AS `event_dt`, `age` AS `age`, `age_cod` AS `age_cod`, `gndr_cod` AS `sex`, `year` AS `year`, `quarter` AS `quarter` FROM (
SELECT `aers_demo_v1`.`isr`, `aers_demo_v1`.`event_dt`, `aers_demo_v1`.`age`, `aers_demo_v1`.`age_cod`, `aers_demo_v1`.`gndr_cod`, `aers_demo_v1`.`year`, `aers_demo_v1`.`quarter` FROM `aers`.`aers_demo_v1`
UNION ALL
SELECT `aers_demo_v2`.`isr`, `aers_demo_v2`.`event_dt`, `aers_demo_v2`.`age`, `aers_demo_v2`.`age_cod`, `aers_demo_v2`.`gndr_cod`, `aers_demo_v2`.`year`, `aers_demo_v2`.`quarter` FROM `aers`.`aers_demo_v2`
UNION ALL
SELECT `aers_demo_v3`.`isr`, `aers_demo_v3`.`event_dt`, `aers_demo_v3`.`age`, `aers_demo_v3`.`age_cod`, `aers_demo_v3`.`gndr_cod`, `aers_demo_v3`.`year`, `aers_demo_v3`.`quarter` FROM `aers`.`aers_demo_v3`
UNION ALL
SELECT `aers_demo_v4`.`isr`, `aers_demo_v4`.`event_dt`, `aers_demo_v4`.`age`, `aers_demo_v4`.`age_cod`, `aers_demo_v4`.`gndr_cod`, `aers_demo_v4`.`year`, `aers_demo_v4`.`quarter` FROM `aers`.`aers_demo_v4`
UNION ALL
SELECT `aers_demo_v5`.`primaryid` AS `ISR`, `aers_demo_v5`.`event_dt`, `aers_demo_v5`.`age`, `aers_demo_v5`.`age_cod`, `aers_demo_v5`.`gndr_cod`, `aers_demo_v5`.`year`, `aers_demo_v5`.`quarter` FROM `aers`.`aers_demo_v5`
UNION ALL
SELECT `aers_demo_v6`.`primaryid` AS `ISR`, `aers_demo_v6`.`event_dt`, `aers_demo_v6`.`age`, `aers_demo_v6`.`age_cod`, `aers_demo_v6`.`sex` AS `GNDR_COD`, `aers_demo_v6`.`year`, `aers_demo_v6`.`quarter` FROM `aers`.`aers_demo_v6`
) `aers_demo_view`

15/06/11 08:36:36 WARN DefaultChannelPipeline: An exception was thrown by a user handler while handling an exception event ([id: 0x01b99855, /10.0.0.19:58117 => /10.0.0.19:52016] EXCEPTION: java.lang.OutOfMemoryError: Java heap space) java.lang.OutOfMemoryError: Java heap space at org.jboss.netty.buffer.HeapChannelBuffer.(HeapChannelBuffer.java:42) at org.jboss.netty.buffer.BigEndianHeapChannelBuffer.(BigEndianHeapChannelBuffer.java:34) at org.jboss.netty.buffer.ChannelBuffers.buffer(ChannelBuffers.java:134) at org.jboss.netty.buffer.HeapChannelBufferFactory.getBuffer(HeapChannelBufferFactory.java:68) at org.jboss.netty.buffer.AbstractChannelBufferFactory.getBuffer(AbstractChannelBufferFactory.java:48) at org.jboss.netty.handler.codec.frame.FrameDecoder.newCumulationBuffer(FrameDecoder.java:507) at org.jboss.netty.handler.codec.frame.FrameDecoder.updateCumulation(FrameDecoder.java:345) at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:312) at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268) at
org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255) at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88) at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:109) at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:312) at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:90) at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745)15/06/11 08:36:40 ERROR Utils: Uncaught exception in thread task-result-getter-0java.lang.OutOfMemoryError: GC overhead limit exceeded at java.lang.Long.valueOf(Long.java:577) at com.esotericsoftware.kryo.serializers.DefaultSerializers$LongSerializer.read(DefaultSerializers.java:113) at com.esotericsoftware.kryo.serializers.DefaultSerializers$LongSerializer.read(DefaultSerializers.java:103) at com.esotericsoftware.kryo.Kry
Cant figure out spark-sql errors - switching to Impala - sorry guys
Can't figure out the spark-sql errors - switching to Hive and Impala for now - sorry guys, no hard feelings.

From: Sanjay Subramanian
To: Sanjay Subramanian ; user
Sent: Saturday, May 30, 2015 1:52 PM
Subject: Re: spark-sql errors

Any ideas, guys? How to solve this?

From: Sanjay Subramanian
To: user
Sent: Friday, May 29, 2015 5:29 PM
Subject: spark-sql errors

https://groups.google.com/a/cloudera.org/forum/#!topic/cdh-user/6SqGuYemnbc
Re: spark-sql errors
Any ideas, guys? How to solve this?

From: Sanjay Subramanian
To: user
Sent: Friday, May 29, 2015 5:29 PM
Subject: spark-sql errors

https://groups.google.com/a/cloudera.org/forum/#!topic/cdh-user/6SqGuYemnbc
Re: Is anyone using Amazon EC2? (second attempt!)
I use spark on EC2 but it's a CDH 5.3.3 distribution (starving developer version) installed thru Cloudera Manager. Spark is configured to run on Yarn. Regards Sanjay Sent from my iPhone > On May 29, 2015, at 6:16 PM, roni wrote: > > Hi , > Any update on this? > I am not sure if the issue I am seeing is related .. > I have 8 slaves and when I created the cluster I specified ebs volume with > 100G. > I see on Ec2 8 volumes created and each attached to the corresponding slave. > But when I try to copy data on it , it complains that > /root/ephemeral-hdfs/bin/hadoop fs -cp /intersection > hdfs://ec2-54-149-112-136.us-west-2.compute.amazonaws.com:9010/ > > 2015-05-28 23:40:35,447 WARN hdfs.DFSClient (DFSOutputStream.java:run(562)) > - DataStreamer Exception > > org.apache.hadoop.ipc.RemoteException(java.io.IOException): File > /intersection/kmer150/commonGoodKmers/_temporary/_attempt_201504010056_0004_m_000428_3948/part-00428._COPYING_ > could only be replicated to 0 nodes instead of minReplication (=1). There > are 1 datanode(s) running and no node(s) are excluded in this operation. > > > > It shows only 1 datanode , but for ephermal-hdfs it shows 8 datanodes. > > Any thoughts? > > Thanks > > _R > > >> On Sat, May 23, 2015 at 7:24 AM, Joe Wass wrote: >> I used Spark on EC2 a while ago, but recent revisions seem to have broken >> the functionality. >> >> Is anyone actually using Spark on EC2 at the moment? >> >> The bug in question is: >> >> https://issues.apache.org/jira/browse/SPARK-5008 >> >> It makes it impossible to use persistent HDFS without a workround on each >> slave node. >> >> No-one seems to be interested in the bug, so I wonder if other people aren't >> actually having this problem. If this is the case, any suggestions? >> >> Joe >
spark-sql errors
https://groups.google.com/a/cloudera.org/forum/#!topic/cdh-user/6SqGuYemnbc
Re: Pointing SparkSQL to existing Hive Metadata with data file locations in HDFS
ok guys, finally figured out how to get it running. I have detailed the steps I did. Perhaps it's clear to all you folks; to me it was not :-)

Our Hadoop development environment
- 3 node development hadoop cluster
- Current version CDH 5.3.3
- Hive 0.13.1
- Spark 1.2.0 (standalone mode)
- node1 (worker1, master)
- node2 (worker2)
- node3 (worker3)
- Cloudera Manager to manage and update (using parcels)

Steps to get spark-sql running
- On every node (node1, node2, node3 above)
- sudo cp -avi /etc/hive/conf/hive-site.xml /etc/spark/conf
- Edit and add a line
- sudo vi /opt/cloudera/parcels/CDH/lib/spark/bin/compute-classpath.sh
- # added by sanjay for running Spark using hive metadata
- CLASSPATH="$CLASSPATH:/opt/cloudera/parcels/CDH/lib/hive/lib/*"
- Run spark SQL in CLI mode
- /opt/cloudera/parcels/CDH/lib/spark/bin/spark-sql
- Run spark SQL in async mode
- /opt/cloudera/parcels/CDH/lib/spark/bin/spark-sql -e "select * from band.beatles where upper(first_name) like '%GEORGE%' "
- Run spark SQL in "SQL File" mode
- /opt/cloudera/parcels/CDH/lib/spark/bin/spark-sql -f get_names.hql

From: Andrew Otto
To: Sanjay Subramanian
Cc: user
Sent: Thursday, May 28, 2015 7:26 AM
Subject: Re: Pointing SparkSQL to existing Hive Metadata with data file locations in HDFS

val sqlContext = new HiveContext(sc)
val schemaRdd = sqlContext.sql("some complex SQL")

It mostly works, but I have been having issues with tables that contain a large amount of data: https://issues.apache.org/jira/browse/SPARK-6910

On May 27, 2015, at 20:52, Sanjay Subramanian wrote:

hey guys, On the Hive/Hadoop ecosystem we are using Cloudera distribution CDH 5.2.x, and there are about 300+ Hive tables. The data is stored as text (moving slowly to Parquet) on HDFS. I want to use SparkSQL, point it to the Hive metadata and be able to define JOINs etc using a programming structure like this:

import org.apache.spark.sql.hive.HiveContext
val sqlContext = new HiveContext(sc)
val schemaRdd = sqlContext.sql("some complex SQL")

Is that the way to go? Some guidance will be great. thanks sanjay
Pointing SparkSQL to existing Hive Metadata with data file locations in HDFS
hey guys, On the Hive/Hadoop ecosystem we are using Cloudera distribution CDH 5.2.x, and there are about 300+ Hive tables. The data is stored as text (moving slowly to Parquet) on HDFS. I want to use SparkSQL, point it to the Hive metadata and be able to define JOINs etc using a programming structure like this:

import org.apache.spark.sql.hive.HiveContext
val sqlContext = new HiveContext(sc)
val schemaRdd = sqlContext.sql("some complex SQL")

Is that the way to go? Some guidance will be great. thanks sanjay
Re: MappedRDD signature
Thanks Sean, that works, and I started the join of this mappedRDD to another one I have. I have to internalize the use of map versus flatMap. Thinking in Map Reduce Java Hadoop code often blinds me :-)

From: Sean Owen
To: Sanjay Subramanian
Cc: Cheng Lian ; Jorge Lopez-Malla ; "user@spark.apache.org"
Sent: Wednesday, January 28, 2015 11:44 AM
Subject: Re: MappedRDD signature

I think it's clear if you format your function reasonably:

mjpJobOrderRDD.map(line => {
  val tokens = line.split("\t");
  if (tokens.length == 164 && tokens(23) != null) {
    (tokens(23),tokens(7))
  }
})

In some cases the function returns nothing, in some cases a tuple. The return type is therefore Any. If you just mean to output a result in some cases and not others, you must use flatMap + Some + None:

mjpJobOrderRDD.flatMap { line =>
  val tokens = line.split("\t")
  if (tokens.length == 164 && tokens(23) != null) {
    Some((tokens(23),tokens(7)))
  } else {
    None
  }
}

On Wed, Jan 28, 2015 at 7:37 PM, Sanjay Subramanian wrote:
> hey guys
>
> I am not following why this happens
>
> DATASET
> ===
> Tab separated values (164 columns)
>
> Spark command 1
>
> val mjpJobOrderRDD = sc.textFile("/data/cdr/cdr_mjp_joborder_raw")
> val mjpJobOrderColsPairedRDD = mjpJobOrderRDD.map(line => { val tokens = line.split("\t");(tokens(23),tokens(7))})
> mjpJobOrderColsPairedRDD: org.apache.spark.rdd.RDD[(String, String)] = MappedRDD[18] at map at :14
>
> Spark command 2
>
> val mjpJobOrderRDD = sc.textFile("/data/cdr/cdr_mjp_joborder_raw")
> scala> val mjpJobOrderColsPairedRDD = mjpJobOrderRDD.map(line => { val tokens = line.split("\t"); if (tokens.length == 164 && tokens(23) != null) {(tokens(23),tokens(7))} })
> mjpJobOrderColsPairedRDD: org.apache.spark.rdd.RDD[Any] = MappedRDD[19] at map at :14
>
> In the second case above, why does it say org.apache.spark.rdd.RDD[Any] and not org.apache.spark.rdd.RDD[(String, String)]
>
> thanks
>
> sanjay
>
MappedRDD signature
hey guys, I am not following why this happens.

DATASET
===
Tab separated values (164 columns)

Spark command 1
val mjpJobOrderRDD = sc.textFile("/data/cdr/cdr_mjp_joborder_raw")
val mjpJobOrderColsPairedRDD = mjpJobOrderRDD.map(line => { val tokens = line.split("\t");(tokens(23),tokens(7))})
mjpJobOrderColsPairedRDD: org.apache.spark.rdd.RDD[(String, String)] = MappedRDD[18] at map at :14

Spark command 2
val mjpJobOrderRDD = sc.textFile("/data/cdr/cdr_mjp_joborder_raw")
scala> val mjpJobOrderColsPairedRDD = mjpJobOrderRDD.map(line => { val tokens = line.split("\t"); if (tokens.length == 164 && tokens(23) != null) {(tokens(23),tokens(7))} })
mjpJobOrderColsPairedRDD: org.apache.spark.rdd.RDD[Any] = MappedRDD[19] at map at :14

In the second case above, why does it say org.apache.spark.rdd.RDD[Any] and not org.apache.spark.rdd.RDD[(String, String)]?

thanks sanjay
Re: FlatMapValues
cool, let me adapt that. thanks a ton regards sanjay

From: Sean Owen
To: Sanjay Subramanian
Cc: "user@spark.apache.org"
Sent: Monday, January 5, 2015 3:19 AM
Subject: Re: FlatMapValues

For the record, the solution I was suggesting was about like this:

inputRDD.flatMap { input =>
  val tokens = input.split(',')
  val id = tokens(0)
  val keyValuePairs = tokens.tail.grouped(2)
  val keys = keyValuePairs.map(_(0))
  keys.map(key => (id, key))
}

This is much more efficient.

On Wed, Dec 31, 2014 at 3:46 PM, Sean Owen wrote:
> From the clarification below, the problem is that you are calling
> flatMapValues, which is only available on an RDD of key-value tuples.
> Your map function returns a tuple in one case but a String in the
> other, so your RDD is a bunch of Any, which is not at all what you
> want. You need to return a tuple in both cases, which is what Kapil
> pointed out.
>
> However it's still not quite what you want. Your input is basically
> [key value1 value2 value3] so you want to flatMap that to (key,value1)
> (key,value2) (key,value3). flatMapValues does not come into play.
>
> On Wed, Dec 31, 2014 at 3:25 PM, Sanjay Subramanian wrote:
>> My understanding is as follows
>>
>> STEP 1 (This would create a pair RDD)
>> ===
>>
>> reacRdd.map(line => line.split(',')).map(fields => {
>>   if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
>>     (fields(0),(fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
>>   }
>>   else {
>>     ""
>>   }
>> })
>>
>> STEP 2
>> ===
>> Since previous step created a pair RDD, I thought flatMapValues method will
>> be applicable.
>> But the code does not even compile saying that flatMapValues is not
>> applicable to RDD :-(
>>
>> reacRdd.map(line => line.split(',')).map(fields => {
>>   if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
>>     (fields(0),(fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
>>   }
>>   else {
>>     ""
>>   }
>> }).flatMapValues(skus => skus.split('\t')).saveAsTextFile("/data/vaers/msfx/reac/" + outFile)
>>
>> SUMMARY
>> ===
>> when a dataset looks like the following
>>
>> 1,red,blue,green
>> 2,yellow,violet,pink
>>
>> I want to output the following and I am asking how do I do that? Perhaps my
>> code is 100% wrong. Please correct me and educate me :-)
>>
>> 1,red
>> 1,blue
>> 1,green
>> 2,yellow
>> 2,violet
>> 2,pink
Re: A spark newbie question
val sconf = new SparkConf().setMaster("local").setAppName("MedicalSideFx-CassandraLogsMessageTypeCount")
val sc = new SparkContext(sconf)
val inputDir = "/path/to/cassandralogs.txt"
sc.textFile(inputDir).map(line => line.replace("\"", "")).map(line => (line.split(' ')(0) + " " + line.split(' ')(2), 1)).reduceByKey((v1,v2) => v1+v2).collect().foreach(println)

If u want to save the file
==
val outDir = "/path/to/output/dir/cassandra_logs"
var outFile = outDir+"/"+"sparkout_" + System.currentTimeMillis
sc.textFile(inputDir).map(line => line.replace("\"", "")).map(line => (line.split(' ')(0) + " " + line.split(' ')(2), 1)).reduceByKey((v1,v2) => v1+v2).saveAsTextFile(outFile)

The code is here (not elegant :-) but works):
https://raw.githubusercontent.com/sanjaysubramanian/msfx_scala/master/src/main/scala/org/medicalsidefx/common/utils/CassandraLogsMessageTypeCount.scala

OUTPUT
===
(2014-06-27 PAUSE,1)
(2014-06-27 START,2)
(2014-06-27 STOP,1)
(2014-06-25 STOP,1)
(2014-06-27 RESTART,1)
(2014-06-27 REWIND,2)
(2014-06-25 START,3)
(2014-06-25 PAUSE,1)

hope this helps. Since u r new to Spark, it may help to learn using an IDE. I use IntelliJ and have many examples posted here: https://github.com/sanjaysubramanian/msfx_scala.git These are simple silly examples of my learning process :-) Plus IMHO, if u r planning on learning Spark, I would say YES to Scala and NO to Java. Yes it's a diff paradigm but being a Java and Hadoop programmer for many years, I am excited to learn Scala as the language and use Spark. It's exciting. regards sanjay

From: Aniket Bhatnagar
To: Dinesh Vallabhdas ; "user@spark.apache.org"
Sent: Sunday, January 4, 2015 11:07 AM
Subject: Re: A spark newbie question

Go through the Spark API documentation. Basically you have to do a group by (date, message_type) and then do a count.

On Sun, Jan 4, 2015, 9:58 PM Dinesh Vallabhdas wrote:

A spark cassandra newbie question. Thanks in advance for the help. I have a cassandra table with 2 columns, message_timestamp (timestamp) and message_type (text). The data is of the form
2014-06-25 12:01:39 "START"
2014-06-25 12:02:39 "START"
2014-06-25 12:02:39 "PAUSE"
2014-06-25 14:02:39 "STOP"
2014-06-25 15:02:39 "START"
2014-06-27 12:01:39 "START"
2014-06-27 11:03:39 "STOP"
2014-06-27 12:03:39 "REWIND"
2014-06-27 12:04:39 "RESTART"
2014-06-27 12:05:39 "PAUSE"
2014-06-27 13:03:39 "REWIND"
2014-06-27 14:03:39 "START"
I want to use spark (using java) to calculate counts of a message_type on a per day basis and store it back in cassandra in a new table with 3 columns (date, message_type, count). The result table should look like this
2014-06-25 START 3
2014-06-25 STOP 1
2014-06-25 PAUSE 1
2014-06-27 START 2
2014-06-27 STOP 1
2014-06-27 PAUSE 1
2014-06-27 REWIND 2
2014-06-27 RESTART 1
I'm not proficient in scala and would like to use java.
Re: Joining by values
so I changed the code to:

rdd1InvIndex.join(rdd2Pair).map(str => str._2).groupByKey().map(str => (str._1,str._2.toList)).collect().foreach(println)

Now it prints. Don't worry, I will work on this to not output as List(...), but I am hoping that the JOIN question that @Dilip asked is hopefully answered :-)

(2,List(1001,1000,1002,1003, 1004,1001,1006,1007))
(3,List(1011,1012,1013,1010, 1007,1009,1005,1008))
(1,List(1001,1000,1002,1003, 1011,1012,1013,1010, 1004,1001,1006,1007, 1007,1009,1005,1008))

From: Shixiong Zhu
To: Sanjay Subramanian
Cc: dcmovva ; "user@spark.apache.org"
Sent: Saturday, January 3, 2015 8:15 PM
Subject: Re: Joining by values

call `map(_.toList)` to convert `CompactBuffer` to `List`
Best Regards, Shixiong Zhu

2015-01-04 12:08 GMT+08:00 Sanjay Subramanian :

hi, Take a look at the code I wrote here: https://raw.githubusercontent.com/sanjaysubramanian/msfx_scala/master/src/main/scala/org/medicalsidefx/common/utils/PairRddJoin.scala

/*
rdd1.txt
1~4,5,6,7
2~4,5
3~6,7

rdd2.txt
4~1001,1000,1002,1003
5~1004,1001,1006,1007
6~1007,1009,1005,1008
7~1011,1012,1013,1010
*/
val sconf = new SparkConf().setMaster("local").setAppName("MedicalSideFx-PairRddJoin")
val sc = new SparkContext(sconf)
val rdd1 = "/path/to/rdd1.txt"
val rdd2 = "/path/to/rdd2.txt"
val rdd1InvIndex = sc.textFile(rdd1).map(x => (x.split('~')(0), x.split('~')(1))).flatMapValues(str => str.split(',')).map(str => (str._2, str._1))
val rdd2Pair = sc.textFile(rdd2).map(str => (str.split('~')(0), str.split('~')(1)))
rdd1InvIndex.join(rdd2Pair).map(str => str._2).groupByKey().collect().foreach(println)

This outputs the following. I think this may be essentially what u r looking for (I have to understand how to NOT print as CompactBuffer):
(2,CompactBuffer(1001,1000,1002,1003, 1004,1001,1006,1007))
(3,CompactBuffer(1011,1012,1013,1010, 1007,1009,1005,1008))
(1,CompactBuffer(1001,1000,1002,1003, 1011,1012,1013,1010, 1004,1001,1006,1007, 1007,1009,1005,1008))

From: Sanjay Subramanian
To: dcmovva ; "user@spark.apache.org"
Sent: Saturday, January 3, 2015 12:19 PM
Subject: Re: Joining by values

This is my design. Now let me try and code it in Spark.

rdd1.txt
=
1~4,5,6,7
2~4,5
3~6,7

rdd2.txt
4~1001,1000,1002,1003
5~1004,1001,1006,1007
6~1007,1009,1005,1008
7~1011,1012,1013,1010

TRANSFORM 1
===
map each value to key (like an inverted index)
4~1
5~1
6~1
7~1
5~2
4~2
6~3
7~3

TRANSFORM 2
===
Join keys in transform 1 and rdd2
4~1,1001,1000,1002,1003
4~2,1001,1000,1002,1003
5~1,1004,1001,1006,1007
5~2,1004,1001,1006,1007
6~1,1007,1009,1005,1008
6~3,1007,1009,1005,1008
7~1,1011,1012,1013,1010
7~3,1011,1012,1013,1010

TRANSFORM 3
===
Split key in transform 2 with "~" and keep key(1) i.e. 1,2,3
1~1001,1000,1002,1003
2~1001,1000,1002,1003
1~1004,1001,1006,1007
2~1004,1001,1006,1007
1~1007,1009,1005,1008
3~1007,1009,1005,1008
1~1011,1012,1013,1010
3~1011,1012,1013,1010

TRANSFORM 4
===
join by key
1~1001,1000,1002,1003,1004,1001,1006,1007,1007,1009,1005,1008,1011,1012,1013,1010
2~1001,1000,1002,1003,1004,1001,1006,1007
3~1007,1009,1005,1008,1011,1012,1013,1010

From: dcmovva
To: user@spark.apache.org
Sent: Saturday, January 3, 2015 10:10 AM
Subject: Joining by values

I have a two pair RDDs in spark like this
rdd1 =
(1 -> [4,5,6,7])
(2 -> [4,5])
(3 -> [6,7])
rdd2 =
(4 -> [1001,1000,1002,1003])
(5 -> [1004,1001,1006,1007])
(6 -> [1007,1009,1005,1008])
(7 -> [1011,1012,1013,1010])
I would like to combine them to look like this.
joinedRdd =
(1 -> [1000,1001,1002,1003,1004,1005,1006,1007,1008,1009,1010,1011,1012,1013])
(2 -> [1000,1001,1002,1003,1004,1006,1007])
(3 -> [1005,1007,1008,1009,1010,1011,1012,1013])
Can someone suggest me how to do this. Thanks Dilip

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Joining-by-values-tp20954.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Joining by values
hi, Take a look at the code I wrote here: https://raw.githubusercontent.com/sanjaysubramanian/msfx_scala/master/src/main/scala/org/medicalsidefx/common/utils/PairRddJoin.scala

/*
rdd1.txt
1~4,5,6,7
2~4,5
3~6,7

rdd2.txt
4~1001,1000,1002,1003
5~1004,1001,1006,1007
6~1007,1009,1005,1008
7~1011,1012,1013,1010
*/
val sconf = new SparkConf().setMaster("local").setAppName("MedicalSideFx-PairRddJoin")
val sc = new SparkContext(sconf)
val rdd1 = "/path/to/rdd1.txt"
val rdd2 = "/path/to/rdd2.txt"
val rdd1InvIndex = sc.textFile(rdd1).map(x => (x.split('~')(0), x.split('~')(1))).flatMapValues(str => str.split(',')).map(str => (str._2, str._1))
val rdd2Pair = sc.textFile(rdd2).map(str => (str.split('~')(0), str.split('~')(1)))
rdd1InvIndex.join(rdd2Pair).map(str => str._2).groupByKey().collect().foreach(println)

This outputs the following. I think this may be essentially what u r looking for (I have to understand how to NOT print as CompactBuffer):
(2,CompactBuffer(1001,1000,1002,1003, 1004,1001,1006,1007))
(3,CompactBuffer(1011,1012,1013,1010, 1007,1009,1005,1008))
(1,CompactBuffer(1001,1000,1002,1003, 1011,1012,1013,1010, 1004,1001,1006,1007, 1007,1009,1005,1008))

From: Sanjay Subramanian
To: dcmovva ; "user@spark.apache.org"
Sent: Saturday, January 3, 2015 12:19 PM
Subject: Re: Joining by values

This is my design. Now let me try and code it in Spark.

rdd1.txt
=
1~4,5,6,7
2~4,5
3~6,7

rdd2.txt
4~1001,1000,1002,1003
5~1004,1001,1006,1007
6~1007,1009,1005,1008
7~1011,1012,1013,1010

TRANSFORM 1
===
map each value to key (like an inverted index)
4~1
5~1
6~1
7~1
5~2
4~2
6~3
7~3

TRANSFORM 2
===
Join keys in transform 1 and rdd2
4~1,1001,1000,1002,1003
4~2,1001,1000,1002,1003
5~1,1004,1001,1006,1007
5~2,1004,1001,1006,1007
6~1,1007,1009,1005,1008
6~3,1007,1009,1005,1008
7~1,1011,1012,1013,1010
7~3,1011,1012,1013,1010

TRANSFORM 3
===
Split key in transform 2 with "~" and keep key(1) i.e. 1,2,3
1~1001,1000,1002,1003
2~1001,1000,1002,1003
1~1004,1001,1006,1007
2~1004,1001,1006,1007
1~1007,1009,1005,1008
3~1007,1009,1005,1008
1~1011,1012,1013,1010
3~1011,1012,1013,1010

TRANSFORM 4
===
join by key
1~1001,1000,1002,1003,1004,1001,1006,1007,1007,1009,1005,1008,1011,1012,1013,1010
2~1001,1000,1002,1003,1004,1001,1006,1007
3~1007,1009,1005,1008,1011,1012,1013,1010

From: dcmovva
To: user@spark.apache.org
Sent: Saturday, January 3, 2015 10:10 AM
Subject: Joining by values

I have a two pair RDDs in spark like this
rdd1 =
(1 -> [4,5,6,7])
(2 -> [4,5])
(3 -> [6,7])
rdd2 =
(4 -> [1001,1000,1002,1003])
(5 -> [1004,1001,1006,1007])
(6 -> [1007,1009,1005,1008])
(7 -> [1011,1012,1013,1010])
I would like to combine them to look like this.
joinedRdd =
(1 -> [1000,1001,1002,1003,1004,1005,1006,1007,1008,1009,1010,1011,1012,1013])
(2 -> [1000,1001,1002,1003,1004,1006,1007])
(3 -> [1005,1007,1008,1009,1010,1011,1012,1013])
Can someone suggest me how to do this. Thanks Dilip

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Joining-by-values-tp20954.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Joining by values
This is my design. Now let me try and code it in Spark.

rdd1.txt
=
1~4,5,6,7
2~4,5
3~6,7

rdd2.txt
4~1001,1000,1002,1003
5~1004,1001,1006,1007
6~1007,1009,1005,1008
7~1011,1012,1013,1010

TRANSFORM 1
===
map each value to key (like an inverted index)
4~1
5~1
6~1
7~1
5~2
4~2
6~3
7~3

TRANSFORM 2
===
Join keys in transform 1 and rdd2
4~1,1001,1000,1002,1003
4~2,1001,1000,1002,1003
5~1,1004,1001,1006,1007
5~2,1004,1001,1006,1007
6~1,1007,1009,1005,1008
6~3,1007,1009,1005,1008
7~1,1011,1012,1013,1010
7~3,1011,1012,1013,1010

TRANSFORM 3
===
Split key in transform 2 with "~" and keep key(1) i.e. 1,2,3
1~1001,1000,1002,1003
2~1001,1000,1002,1003
1~1004,1001,1006,1007
2~1004,1001,1006,1007
1~1007,1009,1005,1008
3~1007,1009,1005,1008
1~1011,1012,1013,1010
3~1011,1012,1013,1010

TRANSFORM 4
===
join by key
1~1001,1000,1002,1003,1004,1001,1006,1007,1007,1009,1005,1008,1011,1012,1013,1010
2~1001,1000,1002,1003,1004,1001,1006,1007
3~1007,1009,1005,1008,1011,1012,1013,1010

From: dcmovva
To: user@spark.apache.org
Sent: Saturday, January 3, 2015 10:10 AM
Subject: Joining by values

I have a two pair RDDs in spark like this
rdd1 =
(1 -> [4,5,6,7])
(2 -> [4,5])
(3 -> [6,7])
rdd2 =
(4 -> [1001,1000,1002,1003])
(5 -> [1004,1001,1006,1007])
(6 -> [1007,1009,1005,1008])
(7 -> [1011,1012,1013,1010])
I would like to combine them to look like this.
joinedRdd =
(1 -> [1000,1001,1002,1003,1004,1005,1006,1007,1008,1009,1010,1011,1012,1013])
(2 -> [1000,1001,1002,1003,1004,1006,1007])
(3 -> [1005,1007,1008,1009,1010,1011,1012,1013])
Can someone suggest me how to do this. Thanks Dilip

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Joining-by-values-tp20954.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: saveAsTextFile
@laila Based on the error u mentioned in the nabble link below, it seems like there are no permissions to write to HDFS, so this is possibly why saveAsTextFile is failing.

From: Pankaj Narang
To: user@spark.apache.org
Sent: Saturday, January 3, 2015 4:07 AM
Subject: Re: saveAsTextFile

If you can paste the code here I can certainly help. Also confirm the version of Spark you are using.
Regards Pankaj, Infoshore Software, India

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/saveAsTextFile-tp20951p20953.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: FlatMapValues
OK this is how I solved it. Not elegant at all, but it works and I need to move ahead at this time. Converting to a pair RDD is now not required.

reacRdd.map(line => line.split(',')).map(fields => {
  if (fields.length >= 10 && !fields(0).contains("VAERS_ID")) {
    ((fields(0)+","+fields(1)+"\t"+fields(0)+","+fields(3)+"\t"+fields(0)+","+fields(5)+"\t"+fields(0)+","+fields(7)+"\t"+fields(0)+","+fields(9)))
  }
  else {
    ("")
  }
}).flatMap(str => str.split('\t')).filter(line => line.toString.length() > 0).saveAsTextFile("/data/vaers/msfx/reac/" + outFile)

From: Sanjay Subramanian
To: Hitesh Khamesra
Cc: Kapil Malik ; Sean Owen ; "user@spark.apache.org"
Sent: Thursday, January 1, 2015 12:39 PM
Subject: Re: FlatMapValues

thanks let me try that out

From: Hitesh Khamesra
To: Sanjay Subramanian
Cc: Kapil Malik ; Sean Owen ; "user@spark.apache.org"
Sent: Thursday, January 1, 2015 9:46 AM
Subject: Re: FlatMapValues

How about this.. apply flatMap per line. And in that function, parse each line and return all the columns as per your need.

On Wed, Dec 31, 2014 at 10:16 AM, Sanjay Subramanian wrote:

hey guys, Some of u may care :-) but this is just to give u a background on where I am going with this. I have an iOS medical side effects app called MedicalSideFx. I built the entire underlying data layer aggregation using hadoop and currently the search is based on lucene. I am re-architecting the data layer by replacing hadoop with Spark and integrating FDA data, Canadian sidefx data and vaccines sidefx data.

@Kapil, sorry but flatMapValues is being reported as undefined. To give u a complete picture of the code (it's inside IntelliJ but that's only for testing; the real code runs on sparkshell on my cluster):
https://github.com/sanjaysubramanian/msfx_scala/blob/master/src/main/scala/org/medicalsidefx/common/utils/AersReacColumnExtractor.scala

If u were to assume the dataset as
025003,Delirium,8.10,Hypokinesia,8.10,Hypotonia,8.10
025005,Arthritis,8.10,Injection site oedema,8.10,Injection site reaction,8.10

This present version of the code, the flatMap works but only gives me values
Delirium
Hypokinesia
Hypotonia
Arthritis
Injection site oedema
Injection site reaction

What I need is
025003,Delirium
025003,Hypokinesia
025003,Hypotonia
025005,Arthritis
025005,Injection site oedema
025005,Injection site reaction

thanks sanjay

From: Kapil Malik
To: Sean Owen ; Sanjay Subramanian
Cc: "user@spark.apache.org"
Sent: Wednesday, December 31, 2014 9:35 AM
Subject: RE: FlatMapValues

Hi Sanjay,
Oh yes .. on flatMapValues, it's defined in PairRDDFunctions, and you need to import org.apache.spark.SparkContext._ to use them (http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions )
@Sean, yes indeed flatMap / flatMapValues both can be used.
Regards, Kapil

-Original Message-
From: Sean Owen [mailto:so...@cloudera.com]
Sent: 31 December 2014 21:16
To: Sanjay Subramanian
Cc: user@spark.apache.org
Subject: Re: FlatMapValues

From the clarification below, the problem is that you are calling flatMapValues, which is only available on an RDD of key-value tuples. Your map function returns a tuple in one case but a String in the other, so your RDD is a bunch of Any, which is not at all what you want. You need to return a tuple in both cases, which is what Kapil pointed out.
However it's still not quite what you want. Your input is basically [key value1 value2 value3] so you want to flatMap that to (key,value1) (key,value2) (key,value3). flatMapValues does not come into play.
On Wed, Dec 31, 2014 at 3:25 PM, Sanjay Subramanian wrote: > My understanding is as follows > > STEP 1 (This would create a pair RDD) > === > > reacRdd.map(line => line.split(',')).map(fields => { > if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) { > > (fields(0),(fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9))) > } > else { > "" > } > }) > > STEP 2 > === > Since previous step created a pair RDD, I thought flatMapValues method > will be applicable. > But the code does not even compile saying that flatMapValues is not > applicable to RDD :-( > > > reacRdd.map(line => line.split(',')).map(fields => { > if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) { > > (fields(0),(fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+f
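Sean's suggestion amounts to flatMapping each CSV line straight to "id,term" strings, which avoids both the tab-joined intermediate string and the empty-string sentinel. A sketch only, reusing the thread's reacRdd and outFile names and assuming reacRdd is an RDD[String] of the comma-separated lines shown above:

// Each input line looks like: 025003,Delirium,8.10,Hypokinesia,8.10,Hypotonia,8.10
// Emit one "id,term" line per term; skip the header row and short lines.
val idTermLines = reacRdd.flatMap { line =>
  val fields = line.split(',')
  if (fields.length >= 10 && !fields(0).contains("VAERS_ID")) {
    Seq(1, 3, 5, 7, 9).map(i => fields(0) + "," + fields(i))   // the terms sit at the odd indices
  } else {
    Seq.empty[String]
  }
}
idTermLines.saveAsTextFile("/data/vaers/msfx/reac/" + outFile)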
Re: FlatMapValues
thanks let me try that out From: Hitesh Khamesra To: Sanjay Subramanian Cc: Kapil Malik ; Sean Owen ; "user@spark.apache.org" Sent: Thursday, January 1, 2015 9:46 AM Subject: Re: FlatMapValues How about this..apply flatmap on per line. And in that function, parse each line and return all the colums as per your need. On Wed, Dec 31, 2014 at 10:16 AM, Sanjay Subramanian wrote: hey guys Some of u may care :-) but this is just give u a background with where I am going with this. I have an IOS medical side effects app called MedicalSideFx. I built the entire underlying data layer aggregation using hadoop and currently the search is based on lucene. I am re-architecting the data layer by replacing hadoop with Spark and integrating FDA data, Canadian sidefx data and vaccines sidefx data. @Kapil , sorry but flatMapValues is being reported as undefined To give u a complete picture of the code (its inside IntelliJ but thats only for testingthe real code runs on sparkshell on my cluster) https://github.com/sanjaysubramanian/msfx_scala/blob/master/src/main/scala/org/medicalsidefx/common/utils/AersReacColumnExtractor.scala If u were to assume dataset as 025003,Delirium,8.10,Hypokinesia,8.10,Hypotonia,8.10 025005,Arthritis,8.10,Injection site oedema,8.10,Injection site reaction,8.10 This present version of the code, the flatMap works but only gives me values DeliriumHypokinesiaHypotonia ArthritisInjection site oedemaInjection site reaction What I need is 025003,Delirium 025003,Hypokinesia025003,Hypotonia025005,Arthritis 025005,Injection site oedema025005,Injection site reaction thanks sanjay From: Kapil Malik To: Sean Owen ; Sanjay Subramanian Cc: "user@spark.apache.org" Sent: Wednesday, December 31, 2014 9:35 AM Subject: RE: FlatMapValues Hi Sanjay, Oh yes .. on flatMapValues, it's defined in PairRDDFunctions, and you need to import org.apache.spark.rdd.SparkContext._ to use them (http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions ) @Sean, yes indeed flatMap / flatMapValues both can be used. Regards, Kapil -Original Message- From: Sean Owen [mailto:so...@cloudera.com] Sent: 31 December 2014 21:16 To: Sanjay Subramanian Cc: user@spark.apache.org Subject: Re: FlatMapValues >From the clarification below, the problem is that you are calling >flatMapValues, which is only available on an RDD of key-value tuples. Your map function returns a tuple in one case but a String in the other, so your RDD is a bunch of Any, which is not at all what you want. You need to return a tuple in both cases, which is what Kapil pointed out. However it's still not quite what you want. Your input is basically [key value1 value2 value3] so you want to flatMap that to (key,value1) (key,value2) (key,value3). flatMapValues does not come into play. On Wed, Dec 31, 2014 at 3:25 PM, Sanjay Subramanian wrote: > My understanding is as follows > > STEP 1 (This would create a pair RDD) > === > > reacRdd.map(line => line.split(',')).map(fields => { > if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) { > > (fields(0),(fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9))) > } > else { > "" > } > }) > > STEP 2 > === > Since previous step created a pair RDD, I thought flatMapValues method > will be applicable. 
> But the code does not even compile saying that flatMapValues is not > applicable to RDD :-( > > > reacRdd.map(line => line.split(',')).map(fields => { > if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) { > > (fields(0),(fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9))) > } > else { > "" > } > }).flatMapValues(skus => > skus.split('\t')).saveAsTextFile("/data/vaers/msfx/reac/" + outFile) > > > SUMMARY > === > when a dataset looks like the following > > 1,red,blue,green > 2,yellow,violet,pink > > I want to output the following and I am asking how do I do that ? > Perhaps my code is 100% wrong. Please correct me and educate me :-) > > 1,red > 1,blue > 1,green > 2,yellow > 2,violet > 2,pink
Re: FlatMapValues
hey guys Some of u may care :-) but this is just give u a background with where I am going with this. I have an IOS medical side effects app called MedicalSideFx. I built the entire underlying data layer aggregation using hadoop and currently the search is based on lucene. I am re-architecting the data layer by replacing hadoop with Spark and integrating FDA data, Canadian sidefx data and vaccines sidefx data. @Kapil , sorry but flatMapValues is being reported as undefined To give u a complete picture of the code (its inside IntelliJ but thats only for testingthe real code runs on sparkshell on my cluster) https://github.com/sanjaysubramanian/msfx_scala/blob/master/src/main/scala/org/medicalsidefx/common/utils/AersReacColumnExtractor.scala If u were to assume dataset as 025003,Delirium,8.10,Hypokinesia,8.10,Hypotonia,8.10 025005,Arthritis,8.10,Injection site oedema,8.10,Injection site reaction,8.10 This present version of the code, the flatMap works but only gives me values DeliriumHypokinesiaHypotonia ArthritisInjection site oedemaInjection site reaction What I need is 025003,Delirium 025003,Hypokinesia025003,Hypotonia025005,Arthritis 025005,Injection site oedema025005,Injection site reaction thanks sanjay From: Kapil Malik To: Sean Owen ; Sanjay Subramanian Cc: "user@spark.apache.org" Sent: Wednesday, December 31, 2014 9:35 AM Subject: RE: FlatMapValues Hi Sanjay, Oh yes .. on flatMapValues, it's defined in PairRDDFunctions, and you need to import org.apache.spark.rdd.SparkContext._ to use them (http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions ) @Sean, yes indeed flatMap / flatMapValues both can be used. Regards, Kapil -Original Message- From: Sean Owen [mailto:so...@cloudera.com] Sent: 31 December 2014 21:16 To: Sanjay Subramanian Cc: user@spark.apache.org Subject: Re: FlatMapValues >From the clarification below, the problem is that you are calling >flatMapValues, which is only available on an RDD of key-value tuples. Your map function returns a tuple in one case but a String in the other, so your RDD is a bunch of Any, which is not at all what you want. You need to return a tuple in both cases, which is what Kapil pointed out. However it's still not quite what you want. Your input is basically [key value1 value2 value3] so you want to flatMap that to (key,value1) (key,value2) (key,value3). flatMapValues does not come into play. On Wed, Dec 31, 2014 at 3:25 PM, Sanjay Subramanian wrote: > My understanding is as follows > > STEP 1 (This would create a pair RDD) > === > > reacRdd.map(line => line.split(',')).map(fields => { > if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) { > > (fields(0),(fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9))) > } > else { > "" > } > }) > > STEP 2 > === > Since previous step created a pair RDD, I thought flatMapValues method > will be applicable. > But the code does not even compile saying that flatMapValues is not > applicable to RDD :-( > > > reacRdd.map(line => line.split(',')).map(fields => { > if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) { > > (fields(0),(fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9))) > } > else { > "" > } > }).flatMapValues(skus => > skus.split('\t')).saveAsTextFile("/data/vaers/msfx/reac/" + outFile) > > > SUMMARY > === > when a dataset looks like the following > > 1,red,blue,green > 2,yellow,violet,pink > > I want to output the following and I am asking how do I do that ? > Perhaps my code is 100% wrong. 
Please correct me and educate me :-) > > 1,red > 1,blue > 1,green > 2,yellow > 2,violet > 2,pink
Re: FlatMapValues
Hey Kapil, Fernando Thanks for your mail. [1] Fernando, if I don't use an "if" logic inside the "map" then if I have lines of input data that have less fields than I am expecting I get ArrayOutOfBounds exception. so the "if" is to safeguard against that. [2] Kapil, I am sorry I did not clarify. Yes my code "DID NOT" compile saying that flatMapValues is not defined. In fact when I used your snippet , the code still does not compile Error:(36, 57) value flatMapValues is not a member of org.apache.spark.rdd.RDD[(String, String)] }).filter(pair => pair._1.length() > 0).flatMapValues(skus => skus.split('\t')).saveAsTextFile("/data/vaers/msfx/reac/" + outFile) ^ My pom.xml looks like this org.apache.spark spark-core_2.10 1.2.0 org.apache.spark spark-sql_2.10 1.2.0 [3] To summarize all I want is to convert SUMMARY===when a dataset looks like the following 1,red,blue,green2,yellow,violet,pink I want to output the following and currently not able to 1,red1,blue1,green2,yellow2,violet2,pink thanks regards sanjay From: Fernando O. To: Kapil Malik Cc: Sanjay Subramanian ; "user@spark.apache.org" Sent: Wednesday, December 31, 2014 6:06 AM Subject: Re: FlatMapValues Hi Sanjay, Doing an if inside a Map sounds like a bad idea, it seems like you actually want to filter and then apply map On Wed, Dec 31, 2014 at 9:54 AM, Kapil Malik wrote: Hi Sanjay, I tried running your code on spark shell piece by piece – // Setupval line1 = “025126,Chills,8.10,Injection site oedema,8.10,Injection site reaction,8.10,Malaise,8.10,Myalgia,8.10”val line2 = “025127,Chills,8.10,Injection site oedema,8.10,Injection site reaction,8.10,Malaise,8.10,Myalgia,8.10”val lines = Array[String](line1, line2) val r1 = sc.parallelize(lines, 2)// r1 is the original RDD[String] to begin with val r2 = r1.map(line => line.split(','))// RDD[Array[String]] – so far, so goodval r3 = r2.map(fields => { if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) { (fields(0),(fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))// Returns a pair (String, String), good } else { ""// Returns a String, bad } })// RDD[Serializable] – PROBLEM I was not even able to apply flatMapValues since the filtered RDD passed to it is RDD[Serializable] and not a pair RDD. I am surprised how your code compiled correctly. The following changes in your snippet make it work as intended - reacRdd.map(line => line.split(',')).map(fields => { if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) { (fields(0),(fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9))) } else { ("","") } }).filter(pair => pair._1.length() > 0).flatMapValues(skus => skus.split('\t')).saveAsTextFile("/data/vaers/msfx/reac/" + outFile) Please note that this too saves lines like (025126,Chills),i.e. with opening and closing brackets ( and ). If you want to get rid of them, better do another map operation to map pair to String. 
Kapil From: Sanjay Subramanian [mailto:sanjaysubraman...@yahoo.com.INVALID] Sent: 31 December 2014 13:42 Cc: user@spark.apache.org Subject: FlatMapValues hey guys My dataset is like this 025126,Chills,8.10,Injection site oedema,8.10,Injection site reaction,8.10,Malaise,8.10,Myalgia,8.10 Intended output is ==025126,Chills025126,Injection site oedema025126,Injection site reaction025126,Malaise025126,Myalgia My code is as follows but the flatMapValues does not work even after I have created the pair RDD.reacRdd.map(line => line.split(',')).map(fields => { if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) { (fields(0),(fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9))) } else { "" } }).filter(line => line.toString.length() > 0).flatMapValues(skus => skus.split('\t')).saveAsTextFile("/data/vaers/msfx/reac/" + outFile) thanks sanjay
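Pulling Kapil's points together into one runnable shape: the SparkContext._ import is what brings flatMapValues into scope on Spark 1.x, the filter drops the ("","") sentinel rows, and a final map turns each (id, term) tuple into a plain "id,term" line so the saved output has no surrounding brackets. A sketch only, reusing the thread's reacRdd and outFile names:

import org.apache.spark.SparkContext._   // pair-RDD methods such as flatMapValues (needed on Spark 1.x)

reacRdd
  .map(_.split(','))
  .map { fields =>
    if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
      (fields(0), Seq(1, 3, 5, 7, 9).map(fields(_)).mkString("\t"))   // (id, tab-joined terms)
    } else {
      ("", "")                                                        // sentinel for bad rows, removed below
    }
  }
  .filter { case (id, _) => id.nonEmpty }
  .flatMapValues(_.split('\t'))                                       // one (id, term) pair per term
  .map { case (id, term) => id + "," + term }                         // avoid the "(id,term)" tuple formatting
  .saveAsTextFile("/data/vaers/msfx/reac/" + outFile)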
FlatMapValues
hey guys My dataset is like this

025126,Chills,8.10,Injection site oedema,8.10,Injection site reaction,8.10,Malaise,8.10,Myalgia,8.10

Intended output is
==
025126,Chills
025126,Injection site oedema
025126,Injection site reaction
025126,Malaise
025126,Myalgia

My code is as follows but the flatMapValues does not work even after I have created the pair RDD.

reacRdd.map(line => line.split(',')).map(fields => {
  if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
    (fields(0),(fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
  } else {
    ""
  }
}).filter(line => line.toString.length() > 0).flatMapValues(skus => skus.split('\t')).saveAsTextFile("/data/vaers/msfx/reac/" + outFile)

thanks sanjay
Re: How to identify erroneous input record ?
Although not elegantly, I got the output via my code, but I totally agree on the parsing 5 times (thats really bad). Will add your suggested logic and check it out. I have a "long" way to the finish line. I am re-architecting my entire hadoop code and getting it onto spark. Check out what I do at www.medicalsidefx.org Primarily an iPhone app but underlying is Lucene, Hadoop and hopefully soon in 2015 - Spark :-) From: Sean Owen To: Sanjay Subramanian Cc: "user@spark.apache.org" Sent: Wednesday, December 24, 2014 8:56 AM Subject: Re: How to identify erroneous input record ? I don't believe that works since your map function does not return a value for lines shorter than 13 tokens. You should use flatMap and Some/None. (You probably want to not parse the string 5 times too.)

val demoRddFilterMap = demoRddFilter.flatMap { line =>
  val tokens = line.split('$')
  if (tokens.length >= 13) {
    val parsed = tokens(0) + "~" + tokens(5) + "~" + tokens(11) + "~" + tokens(12)
    Some(parsed)
  } else {
    None
  }
}

On Wed, Dec 24, 2014 at 4:35 PM, Sanjay Subramanian wrote: > DOH Looks like I did not have enough coffee before I asked this :-) > I added the if statement... > > var demoRddFilter = demoRdd.filter(line => > !line.contains("ISR$CASE$I_F_COD$FOLL_SEQ") || > !line.contains("primaryid$caseid$caseversion")) > var demoRddFilterMap = demoRddFilter.map(line => { > if (line.split('$').length >= 13){ > line.split('$')(0) + "~" + line.split('$')(5) + "~" + > line.split('$')(11) + "~" + line.split('$')(12) > } > }) > > From: Sanjay Subramanian > To: "user@spark.apache.org" > Sent: Wednesday, December 24, 2014 8:28 AM > Subject: How to identify erroneous input record ? > > hey guys > > One of my input records has a problem that makes the code fail. > > var demoRddFilter = demoRdd.filter(line => > !line.contains("ISR$CASE$I_F_COD$FOLL_SEQ") || > !line.contains("primaryid$caseid$caseversion")) > > var demoRddFilterMap = demoRddFilter.map(line => line.split('$')(0) + "~" + > line.split('$')(5) + "~" + line.split('$')(11) + "~" + line.split('$')(12)) > > demoRddFilterMap.saveAsTextFile("/data/aers/msfx/demo/" + outFile) > > > This is possibly happening because perhaps one input record may not have 13 > fields. > > If this were Hadoop mapper code, I have 2 ways to solve this > > 1. test the number of fields of each line before applying the map function > > 2. enclose the mapping function in a try catch block so that the mapping > function only fails for the erroneous record > > How do I implement 1. or 2. in the Spark code ? > > Thanks > > sanjay
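For the try/catch variant Sanjay asked about (option 2), a flatMap over scala.util.Try gives the same skip-the-bad-record behaviour without counting fields up front. A sketch under the thread's assumptions (demoRddFilter is an RDD[String] of '$'-delimited lines); records whose parse throws are silently dropped, so in practice you may also want to count or log them:

import scala.util.Try

val demoRddFilterMap = demoRddFilter.flatMap { line =>
  Try {
    val tokens = line.split('$')
    // A short record throws ArrayIndexOutOfBoundsException, which Try captures as a Failure.
    tokens(0) + "~" + tokens(5) + "~" + tokens(11) + "~" + tokens(12)
  }.toOption                                   // Success -> Some(parsed), Failure -> None
}
demoRddFilterMap.saveAsTextFile("/data/aers/msfx/demo/" + outFile)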
Re: How to identify erroneous input record ?
DOH Looks like I did not have enough coffee before I asked this :-) I added the if statement...

var demoRddFilter = demoRdd.filter(line => !line.contains("ISR$CASE$I_F_COD$FOLL_SEQ") || !line.contains("primaryid$caseid$caseversion"))
var demoRddFilterMap = demoRddFilter.map(line => {
  if (line.split('$').length >= 13){
    line.split('$')(0) + "~" + line.split('$')(5) + "~" + line.split('$')(11) + "~" + line.split('$')(12)
  }
})

From: Sanjay Subramanian To: "user@spark.apache.org" Sent: Wednesday, December 24, 2014 8:28 AM Subject: How to identify erroneous input record ? hey guys One of my input records has a problem that makes the code fail.

var demoRddFilter = demoRdd.filter(line => !line.contains("ISR$CASE$I_F_COD$FOLL_SEQ") || !line.contains("primaryid$caseid$caseversion"))
var demoRddFilterMap = demoRddFilter.map(line => line.split('$')(0) + "~" + line.split('$')(5) + "~" + line.split('$')(11) + "~" + line.split('$')(12))
demoRddFilterMap.saveAsTextFile("/data/aers/msfx/demo/" + outFile)

This is possibly happening because perhaps one input record may not have 13 fields. If this were Hadoop mapper code, I have 2 ways to solve this
1. test the number of fields of each line before applying the map function
2. enclose the mapping function in a try catch block so that the mapping function only fails for the erroneous record
How do I implement 1. or 2. in the Spark code ? Thanks sanjay
How to identify erroneous input record ?
hey guys One of my input records has a problem that makes the code fail.

var demoRddFilter = demoRdd.filter(line => !line.contains("ISR$CASE$I_F_COD$FOLL_SEQ") || !line.contains("primaryid$caseid$caseversion"))
var demoRddFilterMap = demoRddFilter.map(line => line.split('$')(0) + "~" + line.split('$')(5) + "~" + line.split('$')(11) + "~" + line.split('$')(12))
demoRddFilterMap.saveAsTextFile("/data/aers/msfx/demo/" + outFile)

This is possibly happening because perhaps one input record may not have 13 fields. If this were Hadoop mapper code, I have 2 ways to solve this
1. test the number of fields of each line before applying the map function
2. enclose the mapping function in a try catch block so that the mapping function only fails for the erroneous record
How do I implement 1. or 2. in the Spark code ? Thanks sanjay
Re: Spark or MR, Scala or Java?
Thanks a ton Ashishsanjay From: Ashish Rangole To: Sanjay Subramanian Cc: Krishna Sankar ; Sean Owen ; Guillermo Ortiz ; user Sent: Sunday, November 23, 2014 11:03 AM Subject: Re: Spark or MR, Scala or Java? This being a very broad topic, a discussion can quickly get subjective. I'll try not to deviate from my experiences and observations to keep this thread useful to those looking for answers. I have used Hadoop MR (with Hive, MR Java apis, Cascading and Scalding) as well as Spark (since v 0.6) in Scala. I learnt Scala for using Spark. My observations are below. Spark and Hadoop MR:1. There doesn't have to be a dichotomy between Hadoop ecosystem and Spark since Spark is a part of it. 2. Spark or Hadoop MR, there is no getting away from learning how partitioning, input splits, and shuffle process work. In order to optimize performance, troubleshoot and design software one must know these. I recommend reading first 6-7 chapters of "Hadoop The definitive Guide" book to develop initial understanding. Indeed knowing a couple of divide and conquer algorithms is a pre-requisite and I assume everyone on this mailing list is very familiar :) 3. Having used a lot of different APIs and layers of abstraction for Hadoop MR, my experience progressing from MR Java API --> Cascading --> Scalding is that each new API looks "simpler" than the previous one. However, Spark API and abstraction has been simplest. Not only for me but those who I have seen start with Hadoop MR or Spark first. It is easiest to get started and become productive with Spark with the exception of Hive for those who are already familiar with SQL. Spark's ease of use is critical for teams starting out with Big Data. 4. It is also extremely simple to chain multi-stage jobs in Spark, you do it without even realizing by operating over RDDs. In Hadoop MR, one has to handle it explicitly. 5. Spark has built-in support for graph algorithms (including Bulk Synchronous Parallel processing BSP algorithms e.g. Pregel), Machine Learning and Stream processing. In Hadoop MR you need a separate library/Framework for each and it is non-trivial to combine multiple of these in the same application. This is huge! 6. In Spark one does have to learn how to configure the memory and other parameters of their cluster. Just to be clear, similar parameters exist in MR as well (e.g. shuffle memory parameters) but you don't *have* to learn about tuning them until you have jobs with larger data size jobs. In Spark you learn this by reading the configuration and tuning documentation followed by experimentation. This is an area of Spark where things can be better. Java or Scala : I knew Java already yet I learnt Scala when I came across Spark. As others have said, you can get started with a little bit of Scala and learn more as you progress. Once you have started using Scala for a few weeks you would want to stay with it instead of going back to Java. Scala is arguably more elegant and less verbose than Java which translates into higher developer productivity and more maintainable code. Myth: Spark is for in-memory processing *only*. This is a common beginner misunderstanding. Sanjay: Spark uses Hadoop API for performing I/O from file systems (local, HDFS, S3 etc). Therefore you can use the same Hadoop InputFormat and RecordReader with Spark that you use with Hadoop for your multi-line record format. See SparkContext APIs. Just like Hadoop, you will need to make sure that your files are split at record boundaries. Hope this is helpful. 
On Sun, Nov 23, 2014 at 8:35 AM, Sanjay Subramanian wrote: I am a newbie as well to Spark. Been Hadoop/Hive/Oozie programming extensively before this. I use Hadoop(Java MR code)/Hive/Impala/Presto on a daily basis. To get me jumpstarted into Spark I started this gitHub where there is "IntelliJ-ready-To-run" code (simple examples of jon, sparksql etc) and I will keep adding to that. I dont know scala and I am learning that too to help me use Spark better.https://github.com/sanjaysubramanian/msfx_scala.git Philosophically speaking its possibly not a good idea to take an either/or approach to technology...Like its never going to be either RDBMS or NOSQL (If the Cassandra behind FB shows 100 fewer likes instead of 1000 on you Photo a day for some reason u may not be as upset...but if the Oracle/Db2 systems behind Wells Fargo show $100 LESS in your account due to an database error, you will be PANIC-ing). So its the same case with Spark or Hadoop. I can speak for myself. I have a usecase for processing old logs that are multiline (i.e. they have a [begin_timestamp_logid] and [end_timestamp_logid] and have many lines in between. In Java Hadoop I created custom RecordReaders to solve this. I still dont know how to do this in Spark. Till that time I am possibly gonna run the Hadoop code within Oozie in production. Also my
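On Ashish's point that Spark reuses Hadoop's input machinery: SparkContext exposes hadoopFile / newAPIHadoopFile, so an existing custom InputFormat/RecordReader for multi-line records can be plugged in unchanged. A sketch, assuming an existing SparkContext sc; the path is illustrative, and the stock TextInputFormat below would be swapped for the custom multi-line format the Java MR job already uses:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

// Swap TextInputFormat for a custom InputFormat that groups lines between
// [begin_timestamp_logid] and [end_timestamp_logid]; the Spark side stays the same.
val records = sc.newAPIHadoopFile(
  "/data/logs/old",
  classOf[TextInputFormat],
  classOf[LongWritable],
  classOf[Text])
val recordStrings = records.map { case (_, value) => value.toString }
recordStrings.take(5).foreach(println)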
Re: Spark or MR, Scala or Java?
I am a newbie as well to Spark. Been Hadoop/Hive/Oozie programming extensively before this. I use Hadoop(Java MR code)/Hive/Impala/Presto on a daily basis. To get me jumpstarted into Spark I started this gitHub where there is "IntelliJ-ready-To-run" code (simple examples of jon, sparksql etc) and I will keep adding to that. I dont know scala and I am learning that too to help me use Spark better.https://github.com/sanjaysubramanian/msfx_scala.git Philosophically speaking its possibly not a good idea to take an either/or approach to technology...Like its never going to be either RDBMS or NOSQL (If the Cassandra behind FB shows 100 fewer likes instead of 1000 on you Photo a day for some reason u may not be as upset...but if the Oracle/Db2 systems behind Wells Fargo show $100 LESS in your account due to an database error, you will be PANIC-ing). So its the same case with Spark or Hadoop. I can speak for myself. I have a usecase for processing old logs that are multiline (i.e. they have a [begin_timestamp_logid] and [end_timestamp_logid] and have many lines in between. In Java Hadoop I created custom RecordReaders to solve this. I still dont know how to do this in Spark. Till that time I am possibly gonna run the Hadoop code within Oozie in production. Also my current task is evangelizing Big Data at my company. So the tech people I can educate with Hadoop and Spark and they would learn that but not the business intelligence analysts. They love SQL so I have to educate them using Hive , Presto, Impala...so the question is what is your task or tasks ? Sorry , a long non technical answer to your question... Make sense ? sanjay From: Krishna Sankar To: Sean Owen Cc: Guillermo Ortiz ; user Sent: Saturday, November 22, 2014 4:53 PM Subject: Re: Spark or MR, Scala or Java? Adding to already interesting answers: - "Is there any case where MR is better than Spark? I don't know what cases I should be used Spark by MR. When is MR faster than Spark?" - Many. MR would be better (am not saying faster ;o)) for - Very large dataset, - Multistage map-reduce flows, - Complex map-reduce semantics - Spark is definitely better for the classic iterative,interactive workloads. - Spark is very effective for implementing the concepts of in-memory datasets & real time analytics - Take a look at the Lambda architecture - Also checkout how Ooyala is using Spark in multiple layers & configurations. They also have MR in many places - In our case, we found Spark very effective for ELT - we would have used MR earlier - "I know Java, is it worth it to learn Scala for programming to Spark or it's okay just with Java?" - Java will work fine. Especially when Java 8 becomes the norm, we will get back some of the elegance - I, personally, like Scala & Python lot better than Java. Scala is a lot more elegant, but compilations, IDE integration et al are still clunky - One word of caution - stick with one language as much as possible-shuffling between Java & Scala is not fun Cheers & HTH On Sat, Nov 22, 2014 at 8:26 AM, Sean Owen wrote: MapReduce is simpler and narrower, which also means it is generally lighter weight, with less to know and configure, and runs more predictably. If you have a job that is truly just a few maps, with maybe one reduce, MR will likely be more efficient. Until recently its shuffle has been more developed and offers some semantics the Spark shuffle does not.I suppose it integrates with tools like Oozie, that Spark does not. I suggest learning enough Scala to use Spark in Scala. 
The amount you need to know is not large.(Mahout MR based implementations do not run on Spark and will not. They have been removed instead.)On Nov 22, 2014 3:36 PM, "Guillermo Ortiz" wrote: Hello, I'm a newbie with Spark but I've been working with Hadoop for a while. I have two questions. Is there any case where MR is better than Spark? I don't know what cases I should be used Spark by MR. When is MR faster than Spark? The other question is, I know Java, is it worth it to learn Scala for programming to Spark or it's okay just with Java? I have done a little piece of code with Java because I feel more confident with it,, but I seems that I'm missed something - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Extracting values from a Collecion
I could not iterate thru the set but changed the code to get what I was looking for(Not elegant but gets me going) package org.medicalsidefx.common.utils import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.SparkContext._ import scala.collection.mutable.ArrayBuffer /** * Created by sansub01 on 11/19/14. */ object TwoWayJoin2 { def main(args: Array[String]) { if (args.length < 2) { System.err.println("Usage: TwoWayJoinCount") System.exit(12) } val sconf = new SparkConf().setMaster("local").setAppName("MedicalSideFx-TwoWayJoin") val sc = new SparkContext(sconf) val file1 = args(0) val file2 = args(1) val file1Rdd = sc.textFile(file1).map(x => (x.split(",")(0), x.split(",")(1))) val file2Rdd = sc.textFile(file2).map(x => (x.split(",")(0), x.split(",")(1))).reduceByKey((v1,v2) => v1+"|"+v2) file1Rdd.collect().foreach(println) file2Rdd.collect().foreach(println) file1Rdd.join(file2Rdd).collect().foreach( e => println(e.toString.replace("(","").replace(")",""))) } } From: Jey Kottalam To: Sanjay Subramanian Cc: Arun Ahuja ; Andrew Ash ; user Sent: Friday, November 21, 2014 10:07 PM Subject: Extracting values from a Collecion Hi Sanjay, These are instances of the standard Scala collection type "Set", and its documentation can be found by googling the phrase "scala set". Hope that helps, -Jey On Fri, Nov 21, 2014 at 10:41 AM, Sanjay Subramanian wrote: > hey guys > > names.txt > = > 1,paul > 2,john > 3,george > 4,ringo > > > songs.txt > = > 1,Yesterday > 2,Julia > 3,While My Guitar Gently Weeps > 4,With a Little Help From My Friends > 1,Michelle > 2,Nowhere Man > 3,Norwegian Wood > 4,Octopus's Garden > > What I want to do is real simple > > Desired Output > == > (4,(With a Little Help From My Friends, Octopus's Garden)) > (2,(Julia, Nowhere Man)) > (3,(While My Guitar Gently Weeps, Norwegian Wood)) > (1,(Yesterday, Michelle)) > > > My Code > === > val file1Rdd = > sc.textFile("/Users/sansub01/mycode/data/songs/names.txt").map(x => > (x.split(",")(0), x.split(",")(1))) > val file2Rdd = > sc.textFile("/Users/sansub01/mycode/data/songs/songs.txt").map(x => > (x.split(",")(0), x.split(",")(1))) > val file2RddGrp = file2Rdd.groupByKey() > file2Rdd.groupByKey().mapValues(names => > names.toSet).collect().foreach(println) > > Result > === > (4,Set(With a Little Help From My Friends, Octopus's Garden)) > (2,Set(Julia, Nowhere Man)) > (3,Set(While My Guitar Gently Weeps, Norwegian Wood)) > (1,Set(Yesterday, Michelle)) > > > How can I extract values from the Set ? > > Thanks > > sanjay >
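A cleaner alternative to stripping brackets with replace("(", ""): pattern-match the joined values and format them explicitly. The Set produced by mapValues(_.toSet) is an ordinary Scala collection, so mkString (or any foreach/map over it) is all that is needed to pull its values out. A sketch under the thread's assumptions (the names.txt/songs.txt files shown, an existing SparkContext sc):

import org.apache.spark.SparkContext._   // pair-RDD operations (groupByKey, mapValues, join) on Spark 1.x

val file1Rdd = sc.textFile("/Users/sansub01/mycode/data/songs/names.txt").map(x => (x.split(",")(0), x.split(",")(1)))
val file2Rdd = sc.textFile("/Users/sansub01/mycode/data/songs/songs.txt").map(x => (x.split(",")(0), x.split(",")(1)))
val joined = file1Rdd.join(file2Rdd.groupByKey().mapValues(_.toSet))

val lines = joined.map { case (id, (name, songs)) =>
  id + "," + name + "," + songs.mkString("|")   // songs is a plain Scala Set[String]
}
lines.collect().foreach(println)   // e.g. 4,ringo,With a Little Help From My Friends|Octopus's Garden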
Re: Extracting values from a Collecion
Thanks Jeyregardssanjay From: Jey Kottalam To: Sanjay Subramanian Cc: Arun Ahuja ; Andrew Ash ; user Sent: Friday, November 21, 2014 10:07 PM Subject: Extracting values from a Collecion Hi Sanjay, These are instances of the standard Scala collection type "Set", and its documentation can be found by googling the phrase "scala set". Hope that helps, -Jey On Fri, Nov 21, 2014 at 10:41 AM, Sanjay Subramanian wrote: > hey guys > > names.txt > = > 1,paul > 2,john > 3,george > 4,ringo > > > songs.txt > = > 1,Yesterday > 2,Julia > 3,While My Guitar Gently Weeps > 4,With a Little Help From My Friends > 1,Michelle > 2,Nowhere Man > 3,Norwegian Wood > 4,Octopus's Garden > > What I want to do is real simple > > Desired Output > == > (4,(With a Little Help From My Friends, Octopus's Garden)) > (2,(Julia, Nowhere Man)) > (3,(While My Guitar Gently Weeps, Norwegian Wood)) > (1,(Yesterday, Michelle)) > > > My Code > === > val file1Rdd = > sc.textFile("/Users/sansub01/mycode/data/songs/names.txt").map(x => > (x.split(",")(0), x.split(",")(1))) > val file2Rdd = > sc.textFile("/Users/sansub01/mycode/data/songs/songs.txt").map(x => > (x.split(",")(0), x.split(",")(1))) > val file2RddGrp = file2Rdd.groupByKey() > file2Rdd.groupByKey().mapValues(names => > names.toSet).collect().foreach(println) > > Result > === > (4,Set(With a Little Help From My Friends, Octopus's Garden)) > (2,Set(Julia, Nowhere Man)) > (3,Set(While My Guitar Gently Weeps, Norwegian Wood)) > (1,Set(Yesterday, Michelle)) > > > How can I extract values from the Set ? > > Thanks > > sanjay >
Re: Extracting values from a Collecion
I am sorry, the last line in the code is

file1Rdd.join(file2RddGrp.mapValues(names => names.toSet)).collect().foreach(println)

so My Code
===
val file1Rdd = sc.textFile("/Users/sansub01/mycode/data/songs/names.txt").map(x => (x.split(",")(0), x.split(",")(1)))
val file2Rdd = sc.textFile("/Users/sansub01/mycode/data/songs/songs.txt").map(x => (x.split(",")(0), x.split(",")(1)))
val file2RddGrp = file2Rdd.groupByKey()
file1Rdd.join(file2RddGrp.mapValues(names => names.toSet)).collect().foreach(println)

Result
===
(4,(ringo,Set(With a Little Help From My Friends, Octopus's Garden)))
(2,(john,Set(Julia, Nowhere Man)))
(3,(george,Set(While My Guitar Gently Weeps, Norwegian Wood)))
(1,(paul,Set(Yesterday, Michelle)))

Again the question is how do I extract values from the Set ? thanks sanjay

From: Sanjay Subramanian To: Arun Ahuja ; Andrew Ash Cc: user Sent: Friday, November 21, 2014 10:41 AM Subject: Extracting values from a Collecion hey guys

names.txt
=
1,paul
2,john
3,george
4,ringo

songs.txt
=
1,Yesterday
2,Julia
3,While My Guitar Gently Weeps
4,With a Little Help From My Friends
1,Michelle
2,Nowhere Man
3,Norwegian Wood
4,Octopus's Garden

What I want to do is real simple

Desired Output
==
(4,(With a Little Help From My Friends, Octopus's Garden))
(2,(Julia, Nowhere Man))
(3,(While My Guitar Gently Weeps, Norwegian Wood))
(1,(Yesterday, Michelle))

My Code
===
val file1Rdd = sc.textFile("/Users/sansub01/mycode/data/songs/names.txt").map(x => (x.split(",")(0), x.split(",")(1)))
val file2Rdd = sc.textFile("/Users/sansub01/mycode/data/songs/songs.txt").map(x => (x.split(",")(0), x.split(",")(1)))
val file2RddGrp = file2Rdd.groupByKey()
file2Rdd.groupByKey().mapValues(names => names.toSet).collect().foreach(println)

Result
===
(4,Set(With a Little Help From My Friends, Octopus's Garden))
(2,Set(Julia, Nowhere Man))
(3,Set(While My Guitar Gently Weeps, Norwegian Wood))
(1,Set(Yesterday, Michelle))

How can I extract values from the Set ? Thanks sanjay
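Another way to get at the Set values is to flatten them, producing one output row per (id, name, song); an illustrative continuation of the code above, reusing file1Rdd and file2RddGrp (and the SparkContext._ import from the earlier sketch):

val perSong = file1Rdd.join(file2RddGrp.mapValues(_.toSet)).flatMap { case (id, (name, songs)) =>
  songs.map(song => id + "," + name + "," + song)   // each Set element becomes its own row
}
perSong.collect().foreach(println)   // e.g. 1,paul,Yesterday and 1,paul,Michelle as separate lines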
Extracting values from a Collecion
hey guys names.txt= 1,paul2,john3,george4,ringo songs.txt= 1,Yesterday2,Julia3,While My Guitar Gently Weeps4,With a Little Help From My Friends1,Michelle2,Nowhere Man3,Norwegian Wood4,Octopus's Garden What I want to do is real simple Desired Output ==(4,(With a Little Help From My Friends, Octopus's Garden))(2,(Julia, Nowhere Man))(3,(While My Guitar Gently Weeps, Norwegian Wood))(1,(Yesterday, Michelle)) My Code===val file1Rdd = sc.textFile("/Users/sansub01/mycode/data/songs/names.txt").map(x => (x.split(",")(0), x.split(",")(1)))val file2Rdd = sc.textFile("/Users/sansub01/mycode/data/songs/songs.txt").map(x => (x.split(",")(0), x.split(",")(1)))val file2RddGrp = file2Rdd.groupByKey()file2Rdd.groupByKey().mapValues(names => names.toSet).collect().foreach(println) Result===(4,Set(With a Little Help From My Friends, Octopus's Garden))(2,Set(Julia, Nowhere Man))(3,Set(While My Guitar Gently Weeps, Norwegian Wood))(1,Set(Yesterday, Michelle)) How can I extract values from the Set ? Thanks sanjay
Re: Code works in Spark-Shell but Fails inside IntelliJ
Not using SBT...I have been creating and adapting various Spark Scala examples and put it here and all u have to do is git clone and import as maven project into IntelliJhttps://github.com/sanjaysubramanian/msfx_scala.git Sidenote , IMHO, IDEs encourage the "new to Spark/Scala developers" to quickly test , experiment and debug code. From: Jay Vyas To: Sanjay Subramanian Cc: "user@spark.apache.org" Sent: Thursday, November 20, 2014 4:53 PM Subject: Re: Code works in Spark-Shell but Fails inside IntelliJ This seems pretty standard: your IntelliJ classpath isn't matched to the correct ones that are used in spark shell Are you using the SBT plugin? If not how are you putting deps into IntelliJ? On Nov 20, 2014, at 7:35 PM, Sanjay Subramanian wrote: hey guys I am at AmpCamp 2014 at UCB right now :-) Funny Issue... This code works in Spark-Shell but throws a funny exception in IntelliJ CODE val sqlContext = new org.apache.spark.sql.SQLContext(sc)sqlContext.setConf("spark.sql.parquet.binaryAsString", "true")val wikiData = sqlContext.parquetFile("/Users/sansub01/mycode/knowledge/spark_ampcamp_2014/data/wiki_parquet")wikiData.registerTempTable("wikiData")sqlContext.sql("SELECT username, COUNT(*) AS cnt FROM wikiData WHERE username <> '' GROUP BY username ORDER BY cnt DESC LIMIT 10").collect().foreach(println) RESULTS[Waacstats,2003][Cydebot,949][BattyBot,939][Yobot,890][Addbot,853][Monkbot,668][ChrisGualtieri,438][RjwilmsiBot,387][OccultZone,377][ClueBot NG,353] INTELLIJ CODE=object ParquetSql { def main(args: Array[String]) { val sconf = new SparkConf().setMaster("local").setAppName("MedicalSideFx-NamesFoodSql") val sc = new SparkContext(sconf) val sqlContext = new org.apache.spark.sql.SQLContext(sc) sqlContext.setConf("spark.sql.parquet.binaryAsString", "true") val wikiData = sqlContext.parquetFile("/Users/sansub01/mycode/knowledge/spark_ampcamp_2014/data/wiki_parquet") wikiData.registerTempTable("wikiData") val results = sqlContext.sql("SELECT username, COUNT(*) AS cnt FROM wikiData WHERE username <> '' GROUP BY username ORDER BY cnt DESC LIMIT 10") results.collect().foreach(println) } } INTELLIJ ERROR==Exception in thread "main" java.lang.IncompatibleClassChangeError: Found interface org.apache.spark.serializer.Serializer, but class was expected at org.apache.spark.sql.parquet.ParquetFilters$.serializeFilterExpressions(ParquetFilters.scala:244) at org.apache.spark.sql.parquet.ParquetTableScan.execute(ParquetTableOperations.scala:109) at org.apache.spark.sql.execution.Filter.execute(basicOperators.scala:57) at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1.apply(Aggregate.scala:151) at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1.apply(Aggregate.scala:127) at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:46) at org.apache.spark.sql.execution.Aggregate.execute(Aggregate.scala:126) at org.apache.spark.sql.execution.Exchange$$anonfun$execute$1.apply(Exchange.scala:48) at org.apache.spark.sql.execution.Exchange$$anonfun$execute$1.apply(Exchange.scala:45) at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:46) at org.apache.spark.sql.execution.Exchange.execute(Exchange.scala:44) at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1.apply(Aggregate.scala:151) at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1.apply(Aggregate.scala:127) at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:46) at org.apache.spark.sql.execution.Aggregate.execute(Aggregate.scala:126) at 
org.apache.spark.sql.execution.TakeOrdered.executeCollect(basicOperators.scala:171) at org.apache.spark.sql.SchemaRDD.collect(SchemaRDD.scala:438) at org.medicalsidefx.common.utils.ParquetSql$.main(ParquetSql.scala:18) at org.medicalsidefx.common.utils.ParquetSql.main(ParquetSql.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
Re: Code works in Spark-Shell but Fails inside IntelliJ
Awesome that was it...Hit me with with a hockey stick :-) unmatched Spark Core (1.0.0) and SparkSql (1.1.1) versionsCorrected that to 1.1.0 on both org.apache.spark spark-core_2.10 1.0.0 org.apache.spark spark-sql_2.10 1.1.0 From: Michael Armbrust To: Sanjay Subramanian Cc: "user@spark.apache.org" Sent: Thursday, November 20, 2014 4:49 PM Subject: Re: Code works in Spark-Shell but Fails inside IntelliJ Looks like intelij might be trying to load the wrong version of spark? On Thu, Nov 20, 2014 at 4:35 PM, Sanjay Subramanian wrote: hey guys I am at AmpCamp 2014 at UCB right now :-) Funny Issue... This code works in Spark-Shell but throws a funny exception in IntelliJ CODE val sqlContext = new org.apache.spark.sql.SQLContext(sc)sqlContext.setConf("spark.sql.parquet.binaryAsString", "true")val wikiData = sqlContext.parquetFile("/Users/sansub01/mycode/knowledge/spark_ampcamp_2014/data/wiki_parquet")wikiData.registerTempTable("wikiData")sqlContext.sql("SELECT username, COUNT(*) AS cnt FROM wikiData WHERE username <> '' GROUP BY username ORDER BY cnt DESC LIMIT 10").collect().foreach(println) RESULTS[Waacstats,2003][Cydebot,949][BattyBot,939][Yobot,890][Addbot,853][Monkbot,668][ChrisGualtieri,438][RjwilmsiBot,387][OccultZone,377][ClueBot NG,353] INTELLIJ CODE=object ParquetSql { def main(args: Array[String]) { val sconf = new SparkConf().setMaster("local").setAppName("MedicalSideFx-NamesFoodSql") val sc = new SparkContext(sconf) val sqlContext = new org.apache.spark.sql.SQLContext(sc) sqlContext.setConf("spark.sql.parquet.binaryAsString", "true") val wikiData = sqlContext.parquetFile("/Users/sansub01/mycode/knowledge/spark_ampcamp_2014/data/wiki_parquet") wikiData.registerTempTable("wikiData") val results = sqlContext.sql("SELECT username, COUNT(*) AS cnt FROM wikiData WHERE username <> '' GROUP BY username ORDER BY cnt DESC LIMIT 10") results.collect().foreach(println) } } INTELLIJ ERROR==Exception in thread "main" java.lang.IncompatibleClassChangeError: Found interface org.apache.spark.serializer.Serializer, but class was expected at org.apache.spark.sql.parquet.ParquetFilters$.serializeFilterExpressions(ParquetFilters.scala:244) at org.apache.spark.sql.parquet.ParquetTableScan.execute(ParquetTableOperations.scala:109) at org.apache.spark.sql.execution.Filter.execute(basicOperators.scala:57) at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1.apply(Aggregate.scala:151) at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1.apply(Aggregate.scala:127) at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:46) at org.apache.spark.sql.execution.Aggregate.execute(Aggregate.scala:126) at org.apache.spark.sql.execution.Exchange$$anonfun$execute$1.apply(Exchange.scala:48) at org.apache.spark.sql.execution.Exchange$$anonfun$execute$1.apply(Exchange.scala:45) at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:46) at org.apache.spark.sql.execution.Exchange.execute(Exchange.scala:44) at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1.apply(Aggregate.scala:151) at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1.apply(Aggregate.scala:127) at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:46) at org.apache.spark.sql.execution.Aggregate.execute(Aggregate.scala:126) at org.apache.spark.sql.execution.TakeOrdered.executeCollect(basicOperators.scala:171) at org.apache.spark.sql.SchemaRDD.collect(SchemaRDD.scala:438) at org.medicalsidefx.common.utils.ParquetSql$.main(ParquetSql.scala:18) at 
org.medicalsidefx.common.utils.ParquetSql.main(ParquetSql.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
Code works in Spark-Shell but Fails inside IntelliJ
hey guys I am at AmpCamp 2014 at UCB right now :-) Funny Issue... This code works in Spark-Shell but throws a funny exception in IntelliJ CODE val sqlContext = new org.apache.spark.sql.SQLContext(sc)sqlContext.setConf("spark.sql.parquet.binaryAsString", "true")val wikiData = sqlContext.parquetFile("/Users/sansub01/mycode/knowledge/spark_ampcamp_2014/data/wiki_parquet")wikiData.registerTempTable("wikiData")sqlContext.sql("SELECT username, COUNT(*) AS cnt FROM wikiData WHERE username <> '' GROUP BY username ORDER BY cnt DESC LIMIT 10").collect().foreach(println) RESULTS[Waacstats,2003][Cydebot,949][BattyBot,939][Yobot,890][Addbot,853][Monkbot,668][ChrisGualtieri,438][RjwilmsiBot,387][OccultZone,377][ClueBot NG,353] INTELLIJ CODE=object ParquetSql { def main(args: Array[String]) { val sconf = new SparkConf().setMaster("local").setAppName("MedicalSideFx-NamesFoodSql") val sc = new SparkContext(sconf) val sqlContext = new org.apache.spark.sql.SQLContext(sc) sqlContext.setConf("spark.sql.parquet.binaryAsString", "true") val wikiData = sqlContext.parquetFile("/Users/sansub01/mycode/knowledge/spark_ampcamp_2014/data/wiki_parquet") wikiData.registerTempTable("wikiData") val results = sqlContext.sql("SELECT username, COUNT(*) AS cnt FROM wikiData WHERE username <> '' GROUP BY username ORDER BY cnt DESC LIMIT 10") results.collect().foreach(println) } } INTELLIJ ERROR==Exception in thread "main" java.lang.IncompatibleClassChangeError: Found interface org.apache.spark.serializer.Serializer, but class was expected at org.apache.spark.sql.parquet.ParquetFilters$.serializeFilterExpressions(ParquetFilters.scala:244) at org.apache.spark.sql.parquet.ParquetTableScan.execute(ParquetTableOperations.scala:109) at org.apache.spark.sql.execution.Filter.execute(basicOperators.scala:57) at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1.apply(Aggregate.scala:151) at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1.apply(Aggregate.scala:127) at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:46) at org.apache.spark.sql.execution.Aggregate.execute(Aggregate.scala:126) at org.apache.spark.sql.execution.Exchange$$anonfun$execute$1.apply(Exchange.scala:48) at org.apache.spark.sql.execution.Exchange$$anonfun$execute$1.apply(Exchange.scala:45) at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:46) at org.apache.spark.sql.execution.Exchange.execute(Exchange.scala:44) at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1.apply(Aggregate.scala:151) at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1.apply(Aggregate.scala:127) at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:46) at org.apache.spark.sql.execution.Aggregate.execute(Aggregate.scala:126) at org.apache.spark.sql.execution.TakeOrdered.executeCollect(basicOperators.scala:171) at org.apache.spark.sql.SchemaRDD.collect(SchemaRDD.scala:438) at org.medicalsidefx.common.utils.ParquetSql$.main(ParquetSql.scala:18) at org.medicalsidefx.common.utils.ParquetSql.main(ParquetSql.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
Cant start spark-shell in CDH Spark Standalone 1.1.0+cdh5.2.0+56
hey guys Anyone using CDH Spark Standalone? I installed Spark standalone thru Cloudera Manager.

$ spark-shell --total-executor-cores 8
/opt/cloudera/parcels/CDH-5.2.0-1.cdh5.2.0.p0.36/bin/../lib/spark/bin/spark-shell: line 44: /opt/cloudera/parcels/CDH-5.2.0-1.cdh5.2.0.p0.36/lib/spark/bin/utils.sh: No such file or directory

It all used to work in the 5.1.x version of CDH. sanjay
Re: Spark inside Eclipse
So some progress but still errors object WordCount { def main(args: Array[String]) { if (args.length < 1) { System.err.println("Usage: WordCount ") System.exit(1) } val conf = new SparkConf().setMaster("local").setAppName(s"Whatever") val sc = new SparkContext(conf); val file = args(0) val counts = sc.textFile(file). flatMap(line => line.split("\\W")). map(word => (word,1)). reduceByKey((v1,v2) => v1+v2) counts.take(10).foreach(println) }} The errors I am getting are 14/10/03 18:09:17 INFO spark.SecurityManager: Changing view acls to: sansub0114/10/03 18:09:17 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(sansub01)Exception in thread "main" java.lang.NoClassDefFoundError: scala/collection/GenTraversableOnce$class at akka.util.Collections$EmptyImmutableSeq$.(Collections.scala:15) at akka.util.Collections$EmptyImmutableSeq$.(Collections.scala) at akka.japi.Util$.immutableSeq(JavaAPI.scala:209) at akka.actor.ActorSystem$Settings.(ActorSystem.scala:150) at akka.actor.ActorSystemImpl.(ActorSystem.scala:470) at akka.actor.ActorSystem$.apply(ActorSystem.scala:111) at akka.actor.ActorSystem$.apply(ActorSystem.scala:104) at org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:104) at org.apache.spark.SparkEnv$.create(SparkEnv.scala:152) at org.apache.spark.SparkContext.(SparkContext.scala:202) at com.roberthalf.common.utils.WordCount$.main(WordCount.scala:14) at com.roberthalf.common.utils.WordCount.main(WordCount.scala)Caused by: java.lang.ClassNotFoundException: scala.collection.GenTraversableOnce$class at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) ... 12 more I am gonna keep working to solve this. Meanwhile if u can provide some guidance that would be cool sanjay From: Daniel Siegmann To: Ashish Jain Cc: Sanjay Subramanian ; "user@spark.apache.org" Sent: Thursday, October 2, 2014 6:52 AM Subject: Re: Spark inside Eclipse You don't need to do anything special to run in local mode from within Eclipse. Just create a simple SparkConf and create a SparkContext from that. I have unit tests which execute on a local SparkContext, and they work from inside Eclipse as well as SBT. val conf = new SparkConf().setMaster("local").setAppName(s"Whatever") val sc = new SparkContext(sparkConf) Keep in mind you can only have one local SparkContext at a time, otherwise you will get some weird errors. If you have tests running sequentially, make sure to close the SparkContext in your tear down method. If tests run in parallel you'll need to share the SparkContext between tests. For unit testing, you can make use of SparkContext.parallelize to set up your test inputs and RDD.collect to retrieve the outputs. On Wed, Oct 1, 2014 at 7:43 PM, Ashish Jain wrote: Hello Sanjay,This can be done, and is a very effective way to debug.1) Compile and package your project to get a fat jar 2) In your SparkConf use setJars and give location of this jar. 
Also set your master here as local in SparkConf 3) Use this SparkConf when creating JavaSparkContext 4) Debug your program like you would any normal program.Hope this helps.Thanks AshishOn Oct 1, 2014 4:35 PM, "Sanjay Subramanian" wrote: hey guys Is there a way to run Spark in local mode from within Eclipse.I am running Eclipse Kepler on a Macbook Pro with MavericksLike one can run hadoop map/reduce applications from within Eclipse and debug and learn. thanks sanjay -- Daniel Siegmann, Software Developer Velos Accelerating Machine Learning 440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001 E: daniel.siegm...@velos.io W: www.velos.io
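Daniel's advice boils down to a few lines. A minimal local-mode sketch (nothing Eclipse-specific is needed), assuming the Scala and Spark versions on the Eclipse classpath match the ones the project was built against, since a NoClassDefFoundError like the GenTraversableOnce one above is the usual symptom of a Scala library mismatch on the classpath:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._   // pair-RDD operations such as reduceByKey on Spark 1.x

object LocalWordCountCheck {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("LocalWordCountCheck")
    val sc = new SparkContext(conf)
    try {
      // parallelize/collect keep the example self-contained, as Daniel suggests for tests.
      val counts = sc.parallelize(Seq("a b", "b c"))
        .flatMap(_.split("\\W+"))
        .map(word => (word, 1))
        .reduceByKey(_ + _)
        .collect()
      counts.foreach(println)   // (a,1), (b,2), (c,1) in some order
    } finally {
      sc.stop()   // only one local SparkContext at a time
    }
  }
}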
Re: Spark inside Eclipse
cool thanks will set this up and report back how things wentregardssanjay From: Daniel Siegmann To: Ashish Jain Cc: Sanjay Subramanian ; "user@spark.apache.org" Sent: Thursday, October 2, 2014 6:52 AM Subject: Re: Spark inside Eclipse You don't need to do anything special to run in local mode from within Eclipse. Just create a simple SparkConf and create a SparkContext from that. I have unit tests which execute on a local SparkContext, and they work from inside Eclipse as well as SBT. val conf = new SparkConf().setMaster("local").setAppName(s"Whatever") val sc = new SparkContext(sparkConf) Keep in mind you can only have one local SparkContext at a time, otherwise you will get some weird errors. If you have tests running sequentially, make sure to close the SparkContext in your tear down method. If tests run in parallel you'll need to share the SparkContext between tests. For unit testing, you can make use of SparkContext.parallelize to set up your test inputs and RDD.collect to retrieve the outputs. On Wed, Oct 1, 2014 at 7:43 PM, Ashish Jain wrote: Hello Sanjay,This can be done, and is a very effective way to debug.1) Compile and package your project to get a fat jar 2) In your SparkConf use setJars and give location of this jar. Also set your master here as local in SparkConf 3) Use this SparkConf when creating JavaSparkContext 4) Debug your program like you would any normal program.Hope this helps.Thanks AshishOn Oct 1, 2014 4:35 PM, "Sanjay Subramanian" wrote: hey guys Is there a way to run Spark in local mode from within Eclipse.I am running Eclipse Kepler on a Macbook Pro with MavericksLike one can run hadoop map/reduce applications from within Eclipse and debug and learn. thanks sanjay -- Daniel Siegmann, Software Developer Velos Accelerating Machine Learning 440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001 E: daniel.siegm...@velos.io W: www.velos.io
Re: Multiple spark shell sessions
Awesome thanks a TON. It works There is a clash in the UI port initially but looks like it creates a second UI port at 4041 for the second user wanting to use the spark-shell 14/10/01 17:34:38 INFO JettyUtils: Failed to create UI at port, 4040. Trying again.14/10/01 17:34:38 INFO JettyUtils: Error was: Failure(java.net.BindException: Address already in use)14/10/01 17:34:38 INFO SparkUI: Started SparkUI at http://hadoop02:4041 sanjay From: Matei Zaharia To: Sanjay Subramanian Cc: "user@spark.apache.org" Sent: Wednesday, October 1, 2014 5:19 PM Subject: Re: Multiple spark shell sessions You need to set --total-executor-cores to limit how many total cores it grabs on the cluster. --executor-cores is just for each individual executor, but it will try to launch many of them. Matei On Oct 1, 2014, at 4:29 PM, Sanjay Subramanian wrote: hey guys I am using spark 1.0.0+cdh5.1.0+41 When two users try to run "spark-shell" , the first guy's spark-shell shows active in the 18080 Web UI but the second user shows WAITING and the shell has a bunch of errors but does go the spark-shell and "sc.master" seems to point to the correct master. I tried controlling the number of cores in the "spark-shell" command --executor-cores 8 Does not work thanks sanjay
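When the job is submitted from code rather than spark-shell, the same cap can be set programmatically: spark.cores.max is the standalone-mode setting behind --total-executor-cores. A sketch, with the master URL and core count purely illustrative:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("spark://hadoop02:7077")    // illustrative standalone master URL
  .setAppName("SharedClusterJob")
  .set("spark.cores.max", "8")           // cap total cores so other users' shells can also get executors
val sc = new SparkContext(conf)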
Spark inside Eclipse
hey guys Is there a way to run Spark in local mode from within Eclipse? I am running Eclipse Kepler on a MacBook Pro with Mavericks. Like one can run hadoop map/reduce applications from within Eclipse and debug and learn. thanks sanjay
Multiple spark shell sessions
hey guys I am using spark 1.0.0+cdh5.1.0+41. When two users try to run "spark-shell", the first user's spark-shell shows ACTIVE in the 18080 Web UI but the second user's shows WAITING; the shell prints a bunch of errors but does get to the spark-shell prompt, and "sc.master" seems to point to the correct master. I tried controlling the number of cores in the "spark-shell" command with --executor-cores 8. Does not work. thanks sanjay