Porting R code to SparkR

2015-11-11 Thread Sanjay Subramanian
Hi guys
This is possibly going to sound like a vague, stupid question but I have a 
problem to solve and I need help. So any which way I go is only up :-) 
I have a bunch of R scripts (I am not a R expert) and we are currently 
evaluating how to translate these R scripts to SparkR data frame syntax. The 
goal is to use the Spark R parallel-ization
As an example we are using say Corpus, tm_map , DocumentTermMatrix from the 
library("tm")
How do we translate these to SparkR syntax ?
Any pointers would be helpful.
thanks
sanjay 
  


Re: Is it possible Running SparkR on 2 nodes without HDFS

2015-11-10 Thread Sanjay Subramanian
Cool, thanks. I have a CDH 5.4.8 (Cloudera Starving Developers Version) cluster with 1 NN 
and 4 DN, and Spark is running but it's 1.3.x. I want to leverage this HDFS/Hive 
cluster for SparkR because we do all data munging here and produce datasets for ML.
I am thinking of the following idea:
1. Add 2 datanodes to the existing HDFS cluster thru Cloudera Manager
2. Don't add any Spark service to these two new nodes
3. Download and install the latest 1.5.1 Spark on these two datanodes
4. Download and install R on these 2 datanodes
5. Configure Spark as 1 master and 1 slave on one node. On the second node, configure a slave
will report back if this works !
thanks
sanjay   From: shenLiu 
 To: Sanjay Subramanian ; User 
 
 Sent: Monday, November 9, 2015 10:23 PM
 Subject: RE: Is it possible Running SparkR on 2 nodes without HDFS
   
Hi Sanjay,
It's better to use HDFS. Otherwise you should have copies of the CSV file on 
all worker nodes at the same path.
regards
Shawn
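
To make that concrete, here is a minimal Scala sketch of the two options (the SparkR equivalent uses read.df, as shown in the quoted message below). It assumes sc is the shell's SparkContext, that spark-csv is on the classpath via --packages com.databricks:spark-csv_2.10:1.2.0, and the hdfs:// path is made up for illustration:

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)

// Option 1 (preferred): put the file on HDFS so every executor can reach it
// (this hdfs:// location is an assumption, not the poster's actual path)
val ldsHdfs = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .load("hdfs:///data/leads/all_adleads_cleaned_commas_in_quotes_good_file.csv")

// Option 2: keep the file:// URI, but then the identical file must exist at
// exactly this path on every worker node
val ldsLocal = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .load("file:///mnt/local/1024gbxvdf1/all_adleads_cleaned_commas_in_quotes_good_file.csv")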



Date: Tue, 10 Nov 2015 02:06:16 +
From: sanjaysubraman...@yahoo.com.INVALID
To: user@spark.apache.org
Subject: Is it possible Running SparkR on 2 nodes without HDFS

hey guys
I have a 2 node SparkR (1 master, 1 slave) cluster on AWS using 
spark-1.5.1-bin-without-hadoop.tgz.
Running the SparkR job on the master node:
/opt/spark-1.5.1-bin-hadoop2.6/bin/sparkR --master  
spark://ip-xx-ppp-vv-ddd:7077 --packages com.databricks:spark-csv_2.10:1.2.0  
--executor-cores 16 --num-executors 8 --executor-memory 8G --driver-memory 8g   
myRprogram.R

  org.apache.spark.SparkException: Job aborted due to stage failure: Task 17 in 
stage 1.0 failed 4 times, most recent failure: Lost task 17.3 in stage 1.0 (TID 
103, xx.ff.rr.tt): java.io.FileNotFoundException: File 
file:/mnt/local/1024gbxvdf1/all_adleads_cleaned_commas_in_quotes_good_file.csv 
does not exist at 
org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:534)
 at 
org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:747)
 at 
org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:524)
 at 
org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:409) 
at 
org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.(ChecksumFileSystem.java:140)
 at org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSystem.java:341) 
at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:766) at 
org.apache.hadoop.mapred.LineRecordReader.(LineRecord




myRprogram.R
library(SparkR)
sc <- sparkR.init(appName="SparkR-CancerData-example")
sqlContext <- sparkRSQL.init(sc)
lds <- read.df(sqlContext,
  "file:///mnt/local/1024gbxvdf1/all_adleads_cleaned_commas_in_quotes_good_file.csv",
  "com.databricks.spark.csv", header="true")
sink("file:///mnt/local/1024gbxvdf1/leads_new_data_analyis.txt")
summary(lds)

This used to run when we had a single node SparkR installation
regards
sanjay

 

  

Is it possible Running SparkR on 2 nodes without HDFS

2015-11-09 Thread Sanjay Subramanian
hey guys
I have a 2 node SparkR (1 master, 1 slave) cluster on AWS using 
spark-1.5.1-bin-without-hadoop.tgz.
Running the SparkR job on the master node:
/opt/spark-1.5.1-bin-hadoop2.6/bin/sparkR --master  
spark://ip-xx-ppp-vv-ddd:7077 --packages com.databricks:spark-csv_2.10:1.2.0  
--executor-cores 16 --num-executors 8 --executor-memory 8G --driver-memory 8g   
myRprogram.R

  org.apache.spark.SparkException: Job aborted due to stage failure: Task 17 in 
stage 1.0 failed 4 times, most recent failure: Lost task 17.3 in stage 1.0 (TID 
103, xx.ff.rr.tt): java.io.FileNotFoundException: File 
file:/mnt/local/1024gbxvdf1/all_adleads_cleaned_commas_in_quotes_good_file.csv 
does not exist at 
org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:534)
 at 
org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:747)
 at 
org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:524)
 at 
org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:409) 
at 
org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.(ChecksumFileSystem.java:140)
 at org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSystem.java:341) 
at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:766) at 
org.apache.hadoop.mapred.LineRecordReader.(LineRecord




myRprogram.R
library(SparkR)
sc <- sparkR.init(appName="SparkR-CancerData-example")
sqlContext <- sparkRSQL.init(sc)
lds <- read.df(sqlContext,
  "file:///mnt/local/1024gbxvdf1/all_adleads_cleaned_commas_in_quotes_good_file.csv",
  "com.databricks.spark.csv", header="true")
sink("file:///mnt/local/1024gbxvdf1/leads_new_data_analyis.txt")
summary(lds)

This used to run when we had a single node SparkR installation
regards
sanjay



Re: Spark-sql versus Impala versus Hive

2015-06-19 Thread Sanjay Subramanian
Hi guys
I am using CDH 5.3.3 and that comes with Hive 0.13.1 and Spark 1.2
So to answer your question, it's not Tez (that I believe comes with HortonWorks).
This Hive query was run with Hive defaults.
I used additional Hive params just now to improve the timings:

SET mapreduce.job.reduces=16;
SET mapreduce.tasktracker.map.tasks.maximum=24;
SET mapreduce.tasktracker.reduce.tasks.maximum=16;
SET mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;
SET mapreduce.map.output.compress=true;

Now Time taken: 140.139 seconds, Fetched: 29597 row(s) (surprisingly close to 
spark-sql now LOL. Time to tweak spark-sql now)
EARLIER RESULTS
Hive – 326.021 seconds, Fetched: 29597 row(s)
Impala – Fetched 27625 row(s) in 17.02s
spark-sql – Time taken: 120.236 seconds 

I don't have the bandwidth to manage individual components on the cluster :-) 
since I am solo doing all this and delivering ML solutions to production LOL. So 
I depend on a distribution such as CDH. The downside is that one is always a couple 
of versions behind.
Thanks for your questions.
regards
sanjay
  From: Michael Armbrust 
 To: user  
 Sent: Thursday, June 18, 2015 3:25 PM
 Subject: Re: Spark-sql versus Impala versus Hive
   
I would also love to see a more recent version of Spark SQL.  There have been a 
lot of performance improvements between 1.2 and 1.4 :)


On Thu, Jun 18, 2015 at 3:18 PM, Steve Nunez  wrote:

Interesting. What where the Hive settings? Specifically it would be useful to 
know if this was Hive on Tez.
- Steve
From: Sanjay Subramanian
Reply-To: Sanjay Subramanian
Date: Thursday, June 18, 2015 at 11:08
To: "user@spark.apache.org"
Subject: Spark-sql versus Impala versus Hive


I just published results of my findings here:
https://bigdatalatte.wordpress.com/2015/06/18/spark-sql-versus-impala-versus-hive/







  

Spark-sql versus Impala versus Hive

2015-06-18 Thread Sanjay Subramanian
I just published results of my findings here:
https://bigdatalatte.wordpress.com/2015/06/18/spark-sql-versus-impala-versus-hive/




Re: spark-sql from CLI --->EXCEPTION: java.lang.OutOfMemoryError: Java heap space

2015-06-17 Thread Sanjay Subramanian
ok solved. Looks like breathing the spark-summit SFO air for 3 days helped 
a lot !
Piping the 7 million records to local disk still runs out of memory. So I piped 
the results into another Hive table. I can live with that :-)

/opt/cloudera/parcels/CDH/lib/spark/bin/spark-sql -e "use aers; create table unique_aers_demo as select distinct isr,event_dt,age,age_cod,sex,year,quarter from aers.aers_demo_view" --driver-memory 4G --total-executor-cores 12 --executor-memory 4G

thanks

  From: Sanjay Subramanian 
 To: "user@spark.apache.org"  
 Sent: Thursday, June 11, 2015 8:43 AM
 Subject: spark-sql from CLI --->EXCEPTION: java.lang.OutOfMemoryError: Java 
heap space
   
hey guys
Using Hive and Impala daily, intensively. Want to transition to spark-sql in CLI 
mode.
Currently in my sandbox I am using Spark (standalone mode) in the CDH 
distribution (starving developer version 5.3.3):
3 datanode hadoop cluster
32GB RAM per node
8 cores per node



| spark | 1.2.0+cdh5.3.3+371 |



I am testing some stuff on one view and getting memory errors. A possible reason is 
that the default memory per executor showing on 18080 is 512M.

These options, when used to start the spark-sql CLI, do not seem to have any 
effect: --total-executor-cores 12 --executor-memory 4G



/opt/cloudera/parcels/CDH/lib/spark/bin/spark-sql -e  "select distinct 
isr,event_dt,age,age_cod,sex,year,quarter from aers.aers_demo_view"

aers.aers_demo_view (7 million+ records)
===
isr       bigint   case id
event_dt  bigint   Event date
age       double   age of patient
age_cod   string   days, months, years
sex       string   M or F
year      int
quarter   int

VIEW DEFINITION
CREATE VIEW `aers.aers_demo_view` AS
SELECT `isr` AS `isr`, `event_dt` AS `event_dt`, `age` AS `age`, `age_cod` AS `age_cod`,
       `gndr_cod` AS `sex`, `year` AS `year`, `quarter` AS `quarter`
FROM (
  SELECT `aers_demo_v1`.`isr`, `aers_demo_v1`.`event_dt`, `aers_demo_v1`.`age`, `aers_demo_v1`.`age_cod`,
         `aers_demo_v1`.`gndr_cod`, `aers_demo_v1`.`year`, `aers_demo_v1`.`quarter`
  FROM `aers`.`aers_demo_v1`
  UNION ALL
  SELECT `aers_demo_v2`.`isr`, `aers_demo_v2`.`event_dt`, `aers_demo_v2`.`age`, `aers_demo_v2`.`age_cod`,
         `aers_demo_v2`.`gndr_cod`, `aers_demo_v2`.`year`, `aers_demo_v2`.`quarter`
  FROM `aers`.`aers_demo_v2`
  UNION ALL
  SELECT `aers_demo_v3`.`isr`, `aers_demo_v3`.`event_dt`, `aers_demo_v3`.`age`, `aers_demo_v3`.`age_cod`,
         `aers_demo_v3`.`gndr_cod`, `aers_demo_v3`.`year`, `aers_demo_v3`.`quarter`
  FROM `aers`.`aers_demo_v3`
  UNION ALL
  SELECT `aers_demo_v4`.`isr`, `aers_demo_v4`.`event_dt`, `aers_demo_v4`.`age`, `aers_demo_v4`.`age_cod`,
         `aers_demo_v4`.`gndr_cod`, `aers_demo_v4`.`year`, `aers_demo_v4`.`quarter`
  FROM `aers`.`aers_demo_v4`
  UNION ALL
  SELECT `aers_demo_v5`.`primaryid` AS `ISR`, `aers_demo_v5`.`event_dt`, `aers_demo_v5`.`age`, `aers_demo_v5`.`age_cod`,
         `aers_demo_v5`.`gndr_cod`, `aers_demo_v5`.`year`, `aers_demo_v5`.`quarter`
  FROM `aers`.`aers_demo_v5`
  UNION ALL
  SELECT `aers_demo_v6`.`primaryid` AS `ISR`, `aers_demo_v6`.`event_dt`, `aers_demo_v6`.`age`, `aers_demo_v6`.`age_cod`,
         `aers_demo_v6`.`sex` AS `GNDR_COD`, `aers_demo_v6`.`year`, `aers_demo_v6`.`quarter`
  FROM `aers`.`aers_demo_v6`
) `aers_demo_view`






15/06/11 08:36:36 WARN DefaultChannelPipeline: An exception was thrown by a 
user handler while handling an exception event ([id: 0x01b99855, 
/10.0.0.19:58117 => /10.0.0.19:52016] EXCEPTION: java.lang.OutOfMemoryError: 
Java heap space)java.lang.OutOfMemoryError: Java heap space        at 
org.jboss.netty.buffer.HeapChannelBuffer.(HeapChannelBuffer.java:42)      
  at 
org.jboss.netty.buffer.BigEndianHeapChannelBuffer.(BigEndianHeapChannelBuffer.java:34)
        at 
org.jboss.netty.buffer.ChannelBuffers.buffer(ChannelBuffers.java:134)        at 
org.jboss.netty.buffer.HeapChannelBufferFactory.getBuffer(HeapChannelBufferFactory.java:68)
        at 
org.jboss.netty.buffer.AbstractChannelBufferFactory.getBuffer(AbstractChannelBufferFactory.java:48)
        at 
org.jboss.netty.handler.codec.frame.FrameDecoder.newCumulationBuffer(FrameDecoder.java:507)
        at 
org.jboss.netty.handler.codec.frame.FrameDecoder.updateCumulation(FrameDecoder.java:345)
        at 
org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:312)
        at 
org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)        
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)      
  at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)       
 at 
org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:109)
        at 
org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:312)
        at 
org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:90)
        at org.jboss.netty.channel.socket.

spark-sql CLI options does not work --master yarn --deploy-mode client

2015-06-16 Thread Sanjay Subramanian
hey guys 
I have CDH 5.3.3 with Spark 1.2.0 (on Yarn)
This does not work:

/opt/cloudera/parcels/CDH/lib/spark/bin/spark-sql --deploy-mode client --master yarn --driver-memory 1g -e "select j.person_id, p.first_name, p.last_name, count(*) from (select person_id from cdr.cdr_mjp_joborder where person_id is not null) j join (select person_id, first_name, last_name from cdr.cdr_mjp_people where lower(last_name) like '%subramanian%') p on j.person_id = p.person_id GROUP BY j.person_id, p.first_name, p.last_name"

This works, but only one executor is used:

/opt/cloudera/parcels/CDH/lib/spark/bin/spark-sql --driver-memory 1g -e "select j.person_id, p.first_name, p.last_name, count(*) from (select person_id from cdr.cdr_mjp_joborder where person_id is not null) j join (select person_id, first_name, last_name from cdr.cdr_mjp_people where lower(last_name) like '%subramanian%') p on j.person_id = p.person_id GROUP BY j.person_id, p.first_name, p.last_name"
Any thoughts ?

I found a related link but I don't understand the language:
http://blog.csdn.net/freedomboy319/article/details/46332009

thanks
sanjay


ERRORS
Error: JAVA_HOME is not set and could not be found.
15/06/16 18:17:19 WARN Holder: java.lang.ClassNotFoundException: 
org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter at 
java.net.URLClassLoader$1.run(URLClassLoader.java:202) at 
java.security.AccessController.doPrivileged(Native Method) at 
java.net.URLClassLoader.findClass(URLClassLoader.java:190) at 
java.lang.ClassLoader.loadClass(ClassLoader.java:306) at 
java.lang.ClassLoader.loadClass(ClassLoader.java:247) at 
org.eclipse.jetty.util.Loader.loadClass(Loader.java:100) at 
org.eclipse.jetty.util.Loader.loadClass(Loader.java:79) at 
org.eclipse.jetty.servlet.Holder.doStart(Holder.java:107) at 
org.eclipse.jetty.servlet.FilterHolder.doStart(FilterHolder.java:90) at 
org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64)
 at 
org.eclipse.jetty.servlet.ServletHandler.initialize(ServletHandler.java:768) at 
org.eclipse.jetty.servlet.ServletHandler.updateMappings(ServletHandler.java:1357)
 at 
org.eclipse.jetty.servlet.ServletHandler.setFilterMappings(ServletHandler.java:1393)
 at 
org.eclipse.jetty.servlet.ServletHandler.addFilterMapping(ServletHandler.java:1113)
 at 
org.eclipse.jetty.servlet.ServletHandler.addFilterWithMapping(ServletHandler.java:979)
 at 
org.eclipse.jetty.servlet.ServletContextHandler.addFilter(ServletContextHandler.java:332)
 at 
org.apache.spark.ui.JettyUtils$$anonfun$addFilters$1$$anonfun$apply$6.apply(JettyUtils.scala:163)
 at 
org.apache.spark.ui.JettyUtils$$anonfun$addFilters$1$$anonfun$apply$6.apply(JettyUtils.scala:163)
 at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at 
org.apache.spark.ui.JettyUtils$$anonfun$addFilters$1.apply(JettyUtils.scala:163)
 at 
org.apache.spark.ui.JettyUtils$$anonfun$addFilters$1.apply(JettyUtils.scala:141)
 at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at 
org.apache.spark.ui.JettyUtils$.addFilters(JettyUtils.scala:141) at 
org.apache.spark.scheduler.cluster.YarnSchedulerBackend$$anonfun$org$apache$spark$scheduler$cluster$YarnSchedulerBackend$$addWebUIFilter$3.apply(YarnSchedulerBackend.scala:90)
 at 
org.apache.spark.scheduler.cluster.YarnSchedulerBackend$$anonfun$org$apache$spark$scheduler$cluster$YarnSchedulerBackend$$addWebUIFilter$3.apply(YarnSchedulerBackend.scala:90)
 at scala.Option.foreach(Option.scala:236) at 
org.apache.spark.scheduler.cluster.YarnSchedulerBackend.org$apache$spark$scheduler$cluster$YarnSchedulerBackend$$addWebUIFilter(YarnSchedulerBackend.scala:90)
 at 
org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerActor$$anonfun$receive$1.applyOrElse(YarnSchedulerBackend.scala:129)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at 
akka.actor.ActorCell.invoke(ActorCell.scala:456) at 
akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at 
akka.dispatch.Mailbox.run(Mailbox.scala:219) at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
 at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)15/06/16
 18:17:19 WARN AbstractLifeCycle: FAILED 
org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter-1c7ab89d: 
javax.servlet.UnavailableException: 
org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilterjavax.servlet.UnavailableException:
 org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter at 
org.eclipse.jetty.servlet.Holder.doStart(Holder.java:114) at 
org.eclipse.jetty.servlet.

HDFS not supported by databricks cloud :-(

2015-06-16 Thread Sanjay Subramanian
hey guys
After day one at the spark-summit SFO, I realized sadly that (indeed) HDFS is 
not supported by Databricks cloud. My speed bottleneck is to transfer ~1TB of 
snapshot HDFS data (250+ external Hive tables) to S3 :-(
I want to use Databricks cloud but this to me is a starting disabler. The hard 
road for me will be (as I believe EVERYTHING is possible. The impossible just 
takes longer):
- transfer all HDFS data to S3
- our org does not permit AWS server-side encryption, so I have to figure out if AWS KMS-encrypted S3 files can be read by Hive/Impala/Spark
- modify all table locations in the metadata to S3
- modify all scripts to point and write to S3 instead of
Any ideas / thoughts will be helpful.
Till I can get the above figured out, I am going ahead and working hard to 
make spark-sql the main workhorse for creating datasets (right now it's Hive and 
Impala).

thanks
regards
sanjay


Re: spark-sql from CLI --->EXCEPTION: java.lang.OutOfMemoryError: Java heap space

2015-06-16 Thread Sanjay Subramanian
Hi Josh

It was great meeting u in person at the spark-summit SFO yesterday.
Thanks for discussing potential solutions to the problem.
I verified that 2 hive gateway nodes had not been configured correctly. My bad.
I added hive-site.xml to the spark Conf directories for these 2 additional hive 
gateway nodes. 

Plus I increased the driver-memory parameter to 1gb. That solved the memory 
issue. 

So good news is I can get spark-SQL running in standalone mode (on a CDH 5.3.3 
with spark 1.2 on YARN)

Not so good news is that the following params have no effect

--master yarn --deploy-mode client

So the spark-SQL query runs with only ONE executor :-(

I am planning on bugging u for 5-10 minutes at the Spark office hours :-) and 
hopefully we can solve this. 

Thanks 
Best regards 
Sanjay 

Sent from my iPhone

> On Jun 13, 2015, at 5:38 PM, Josh Rosen  wrote:
> 
> Try using Spark 1.4.0 with SQL code generation turned on; this should make a 
> huge difference.
> 
>> On Sat, Jun 13, 2015 at 5:08 PM, Sanjay Subramanian 
>>  wrote:
>> hey guys
>> 
>> I tried the following settings as well. No luck
>> 
>> --total-executor-cores 24 --executor-memory 4G
>> 
>> 
>> BTW on the same cluster , impala absolutely kills it. same query 9 seconds. 
>> no memory issues. no issues.
>> 
>> In fact I am pretty disappointed with Spark-SQL.
>> I have worked with Hive during the 0.9.x stages and taken projects to 
>> production successfully and Hive actually very rarely craps out.
>> 
>> Whether the spark folks like what I say or not, yes my expectations are 
>> pretty high of Spark-SQL if I were to change the ways we are doing things at 
>> my workplace.
>> Until that time, we are going to be hugely dependent on Impala and  
>> Hive(with SSD speeding up the shuffle stage , even MR jobs are not that slow 
>> now).
>> 
>> I want to clarify for those of u who may be asking - why I am not using 
>> spark with Scala and insisting on using spark-sql ?
>> 
>> - I have already pipelined data from enterprise tables to Hive
>> - I am using CDH 5.3.3 (Cloudera starving developers version)
>> - I have close to 300 tables defined in Hive external tables.
>> - Data if on HDFS
>> - On an average we have 150 columns per table
>> - One an everyday basis , we do crazy amounts of ad-hoc joining of new and 
>> old tables in getting datasets ready for supervised ML
>> - I thought that quite simply I can point Spark to the Hive meta and do 
>> queries as I do - in fact the existing queries would work as is unless I am 
>> using some esoteric Hive/Impala function
>> 
>> Anyway, if there are some settings I can use and get spark-sql to run even 
>> on standalone mode that will be huge help.
>> 
>> On the pre-production cluster I have spark on YARN but could never get it to 
>> run fairly complex queries and I have no answers from this group of the CDH 
>> groups.
>> 
>> So my assumption is that its possibly not solved , else I have always got 
>> very quick answers and responses :-) to my questions on all CDH groups, 
>> Spark, Hive
>> 
>> best regards
>> 
>> sanjay
>> 
>>  
>> 
>> From: Josh Rosen 
>> To: Sanjay Subramanian  
>> Cc: "user@spark.apache.org"  
>> Sent: Friday, June 12, 2015 7:15 AM
>> Subject: Re: spark-sql from CLI --->EXCEPTION: java.lang.OutOfMemoryError: 
>> Java heap space
>> 
>> It sounds like this might be caused by a memory configuration problem.  In 
>> addition to looking at the executor memory, I'd also bump up the driver 
>> memory, since it appears that your shell is running out of memory when 
>> collecting a large query result.
>> 
>> Sent from my phone
>> 
>> 
>> 
>>> On Jun 11, 2015, at 8:43 AM, Sanjay Subramanian 
>>>  wrote:
>>> 
>>> hey guys
>>> 
>>> Using Hive and Impala daily intensively.
>>> Want to transition to spark-sql in CLI mode
>>> 
>>> Currently in my sandbox I am using the Spark (standalone mode) in the CDH 
>>> distribution (starving developer version 5.3.3)
>>> 3 datanode hadoop cluster
>>> 32GB RAM per node
>>> 8 cores per node
>>> 
>>> spark   
>>> 1.2.0+cdh5.3.3+371
>>> 
>>> 
>>> I am testing some stuff on one view and getting memory errors
>>> Possibly reason is default memory per executor showing on 18080 is 
>>> 512M
>>> 
>>> These options when used to start the spark-sql CLI does not seem to have 
>>> any effect 
>>> --to

Re: spark-sql from CLI --->EXCEPTION: java.lang.OutOfMemoryError: Java heap space

2015-06-13 Thread Sanjay Subramanian
hey guys
I tried the following settings as well. No luck
--total-executor-cores 24 --executor-memory 4G

BTW on the same cluster, Impala absolutely kills it. Same query: 9 seconds. No 
memory issues. No issues.
In fact I am pretty disappointed with Spark-SQL. I have worked with Hive during 
the 0.9.x stages and taken projects to production successfully, and Hive 
actually very rarely craps out.
Whether the Spark folks like what I say or not, yes my expectations are pretty 
high of Spark-SQL if I were to change the ways we are doing things at my 
workplace. Until that time, we are going to be hugely dependent on Impala and 
Hive (with SSD speeding up the shuffle stage, even MR jobs are not that slow 
now).
I want to clarify, for those of u who may be asking, why I am not using Spark 
with Scala and am insisting on using spark-sql:
- I have already pipelined data from enterprise tables to Hive
- I am using CDH 5.3.3 (Cloudera starving developers version)
- I have close to 300 tables defined as Hive external tables
- Data is on HDFS
- On an average we have 150 columns per table
- On an everyday basis, we do crazy amounts of ad-hoc joining of new and old tables in getting datasets ready for supervised ML
- I thought that quite simply I can point Spark to the Hive meta and do queries as I do - in fact the existing queries would work as is unless I am using some esoteric Hive/Impala function
Anyway, if there are some settings I can use and get spark-sql to run even on 
standalone mode that will be huge help.
On the pre-production cluster I have Spark on YARN but could never get it to 
run fairly complex queries, and I have no answers from this group or the CDH 
groups.
So my assumption is that it's possibly not solved, else I would have always got very 
quick answers and responses :-) to my questions on all the CDH groups, Spark, Hive.
best regards
sanjay
 
  From: Josh Rosen 
 To: Sanjay Subramanian  
Cc: "user@spark.apache.org"  
 Sent: Friday, June 12, 2015 7:15 AM
 Subject: Re: spark-sql from CLI --->EXCEPTION: java.lang.OutOfMemoryError: 
Java heap space
   
It sounds like this might be caused by a memory configuration problem.  In 
addition to looking at the executor memory, I'd also bump up the driver memory, 
since it appears that your shell is running out of memory when collecting a 
large query result.

Sent from my phone


On Jun 11, 2015, at 8:43 AM, Sanjay Subramanian 
 wrote:


hey guys
Using Hive and Impala daily, intensively. Want to transition to spark-sql in CLI 
mode.
Currently in my sandbox I am using Spark (standalone mode) in the CDH 
distribution (starving developer version 5.3.3):
3 datanode hadoop cluster
32GB RAM per node
8 cores per node

| spark | 1.2.0+cdh5.3.3+371 |



I am testing some stuff on one view and getting memory errors. A possible reason is 
that the default memory per executor showing on 18080 is 512M.

These options, when used to start the spark-sql CLI, do not seem to have any 
effect: --total-executor-cores 12 --executor-memory 4G



/opt/cloudera/parcels/CDH/lib/spark/bin/spark-sql -e  "select distinct 
isr,event_dt,age,age_cod,sex,year,quarter from aers.aers_demo_view"

aers.aers_demo_view (7 million+ records)
===
isr       bigint   case id
event_dt  bigint   Event date
age       double   age of patient
age_cod   string   days, months, years
sex       string   M or F
year      int
quarter   int

VIEW DEFINITION
CREATE VIEW `aers.aers_demo_view` AS
SELECT `isr` AS `isr`, `event_dt` AS `event_dt`, `age` AS `age`, `age_cod` AS `age_cod`,
       `gndr_cod` AS `sex`, `year` AS `year`, `quarter` AS `quarter`
FROM (
  SELECT `aers_demo_v1`.`isr`, `aers_demo_v1`.`event_dt`, `aers_demo_v1`.`age`, `aers_demo_v1`.`age_cod`,
         `aers_demo_v1`.`gndr_cod`, `aers_demo_v1`.`year`, `aers_demo_v1`.`quarter`
  FROM `aers`.`aers_demo_v1`
  UNION ALL
  SELECT `aers_demo_v2`.`isr`, `aers_demo_v2`.`event_dt`, `aers_demo_v2`.`age`, `aers_demo_v2`.`age_cod`,
         `aers_demo_v2`.`gndr_cod`, `aers_demo_v2`.`year`, `aers_demo_v2`.`quarter`
  FROM `aers`.`aers_demo_v2`
  UNION ALL
  SELECT `aers_demo_v3`.`isr`, `aers_demo_v3`.`event_dt`, `aers_demo_v3`.`age`, `aers_demo_v3`.`age_cod`,
         `aers_demo_v3`.`gndr_cod`, `aers_demo_v3`.`year`, `aers_demo_v3`.`quarter`
  FROM `aers`.`aers_demo_v3`
  UNION ALL
  SELECT `aers_demo_v4`.`isr`, `aers_demo_v4`.`event_dt`, `aers_demo_v4`.`age`, `aers_demo_v4`.`age_cod`,
         `aers_demo_v4`.`gndr_cod`, `aers_demo_v4`.`year`, `aers_demo_v4`.`quarter`
  FROM `aers`.`aers_demo_v4`
  UNION ALL
  SELECT `aers_demo_v5`.`primaryid` AS `ISR`, `aers_demo_v5`.`event_dt`, `aers_demo_v5`.`age`, `aers_demo_v5`.`age_cod`,
         `aers_demo_v5`.`gndr_cod`, `aers_demo_v5`.`year`, `aers_demo_v5`.`quarter`
  FROM `aers`.`aers_demo_v5`
  UNION ALL
  SELECT `aers_demo_v6`.`primaryid` AS `ISR`, `aers_demo_v6`.`event_dt`, `aers_demo_v6`.`age`, `aers_demo_v6`.`age_cod`,
         `aers_demo_v6`.`sex` AS `GNDR_COD`, `aers_demo_v6`.`year`,

spark-sql from CLI --->EXCEPTION: java.lang.OutOfMemoryError: Java heap space

2015-06-11 Thread Sanjay Subramanian
hey guys
Using Hive and Impala daily, intensively. Want to transition to spark-sql in CLI 
mode.
Currently in my sandbox I am using Spark (standalone mode) in the CDH 
distribution (starving developer version 5.3.3):
3 datanode hadoop cluster
32GB RAM per node
8 cores per node

| spark | 1.2.0+cdh5.3.3+371 |



I am testing some stuff on one view and getting memory errors. A possible reason is 
that the default memory per executor showing on 18080 is 512M.

These options, when used to start the spark-sql CLI, do not seem to have any 
effect: --total-executor-cores 12 --executor-memory 4G



/opt/cloudera/parcels/CDH/lib/spark/bin/spark-sql -e  "select distinct 
isr,event_dt,age,age_cod,sex,year,quarter from aers.aers_demo_view"

aers.aers_demo_view (7 million+ records)
===
isr       bigint   case id
event_dt  bigint   Event date
age       double   age of patient
age_cod   string   days, months, years
sex       string   M or F
year      int
quarter   int

VIEW DEFINITION
CREATE VIEW `aers.aers_demo_view` AS
SELECT `isr` AS `isr`, `event_dt` AS `event_dt`, `age` AS `age`, `age_cod` AS `age_cod`,
       `gndr_cod` AS `sex`, `year` AS `year`, `quarter` AS `quarter`
FROM (
  SELECT `aers_demo_v1`.`isr`, `aers_demo_v1`.`event_dt`, `aers_demo_v1`.`age`, `aers_demo_v1`.`age_cod`,
         `aers_demo_v1`.`gndr_cod`, `aers_demo_v1`.`year`, `aers_demo_v1`.`quarter`
  FROM `aers`.`aers_demo_v1`
  UNION ALL
  SELECT `aers_demo_v2`.`isr`, `aers_demo_v2`.`event_dt`, `aers_demo_v2`.`age`, `aers_demo_v2`.`age_cod`,
         `aers_demo_v2`.`gndr_cod`, `aers_demo_v2`.`year`, `aers_demo_v2`.`quarter`
  FROM `aers`.`aers_demo_v2`
  UNION ALL
  SELECT `aers_demo_v3`.`isr`, `aers_demo_v3`.`event_dt`, `aers_demo_v3`.`age`, `aers_demo_v3`.`age_cod`,
         `aers_demo_v3`.`gndr_cod`, `aers_demo_v3`.`year`, `aers_demo_v3`.`quarter`
  FROM `aers`.`aers_demo_v3`
  UNION ALL
  SELECT `aers_demo_v4`.`isr`, `aers_demo_v4`.`event_dt`, `aers_demo_v4`.`age`, `aers_demo_v4`.`age_cod`,
         `aers_demo_v4`.`gndr_cod`, `aers_demo_v4`.`year`, `aers_demo_v4`.`quarter`
  FROM `aers`.`aers_demo_v4`
  UNION ALL
  SELECT `aers_demo_v5`.`primaryid` AS `ISR`, `aers_demo_v5`.`event_dt`, `aers_demo_v5`.`age`, `aers_demo_v5`.`age_cod`,
         `aers_demo_v5`.`gndr_cod`, `aers_demo_v5`.`year`, `aers_demo_v5`.`quarter`
  FROM `aers`.`aers_demo_v5`
  UNION ALL
  SELECT `aers_demo_v6`.`primaryid` AS `ISR`, `aers_demo_v6`.`event_dt`, `aers_demo_v6`.`age`, `aers_demo_v6`.`age_cod`,
         `aers_demo_v6`.`sex` AS `GNDR_COD`, `aers_demo_v6`.`year`, `aers_demo_v6`.`quarter`
  FROM `aers`.`aers_demo_v6`
) `aers_demo_view`






15/06/11 08:36:36 WARN DefaultChannelPipeline: An exception was thrown by a 
user handler while handling an exception event ([id: 0x01b99855, 
/10.0.0.19:58117 => /10.0.0.19:52016] EXCEPTION: java.lang.OutOfMemoryError: 
Java heap space)java.lang.OutOfMemoryError: Java heap space        at 
org.jboss.netty.buffer.HeapChannelBuffer.(HeapChannelBuffer.java:42)      
  at 
org.jboss.netty.buffer.BigEndianHeapChannelBuffer.(BigEndianHeapChannelBuffer.java:34)
        at 
org.jboss.netty.buffer.ChannelBuffers.buffer(ChannelBuffers.java:134)        at 
org.jboss.netty.buffer.HeapChannelBufferFactory.getBuffer(HeapChannelBufferFactory.java:68)
        at 
org.jboss.netty.buffer.AbstractChannelBufferFactory.getBuffer(AbstractChannelBufferFactory.java:48)
        at 
org.jboss.netty.handler.codec.frame.FrameDecoder.newCumulationBuffer(FrameDecoder.java:507)
        at 
org.jboss.netty.handler.codec.frame.FrameDecoder.updateCumulation(FrameDecoder.java:345)
        at 
org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:312)
        at 
org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)        
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)      
  at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)       
 at 
org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:109)
        at 
org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:312)
        at 
org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:90)
        at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178) 
       at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
       at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
       at java.lang.Thread.run(Thread.java:745)15/06/11 08:36:40 ERROR Utils: 
Uncaught exception in thread task-result-getter-0java.lang.OutOfMemoryError: GC 
overhead limit exceeded        at java.lang.Long.valueOf(Long.java:577)        
at 
com.esotericsoftware.kryo.serializers.DefaultSerializers$LongSerializer.read(DefaultSerializers.java:113)
        at 
com.esotericsoftware.kryo.serializers.DefaultSerializers$LongSerializer.read(DefaultSerializers.java:103)
        at com.esotericsoftware.kryo.Kry

Cant figure out spark-sql errors - switching to Impala - sorry guys

2015-06-02 Thread Sanjay Subramanian
Can't figure out spark-sql errors - switching to Hive and Impala for now - sorry 
guys, no hard feelings.
  From: Sanjay Subramanian 
 To: Sanjay Subramanian ; user 
 
 Sent: Saturday, May 30, 2015 1:52 PM
 Subject: Re: spark-sql errors
   
any ideas guys ? how to solve this ?
 

 From: Sanjay Subramanian 
 To: user  
 Sent: Friday, May 29, 2015 5:29 PM
 Subject: spark-sql errors
   
https://groups.google.com/a/cloudera.org/forum/#!topic/cdh-user/6SqGuYemnbc


 


   

  

Re: spark-sql errors

2015-05-30 Thread Sanjay Subramanian
any ideas guys ? how to solve this ?
  From: Sanjay Subramanian 
 To: user  
 Sent: Friday, May 29, 2015 5:29 PM
 Subject: spark-sql errors
   
https://groups.google.com/a/cloudera.org/forum/#!topic/cdh-user/6SqGuYemnbc


 


   

Re: Is anyone using Amazon EC2? (second attempt!)

2015-05-29 Thread Sanjay Subramanian
I use spark on EC2 but it's a CDH 5.3.3 distribution (starving developer 
version) installed thru Cloudera Manager. Spark is configured to run on Yarn. 

Regards
Sanjay

Sent from my iPhone

> On May 29, 2015, at 6:16 PM, roni  wrote:
> 
> Hi ,
> Any update on this? 
> I am not sure if the issue I am seeing is related ..
> I have 8 slaves and when I created the cluster I specified ebs volume with 
> 100G.
> I see on Ec2 8 volumes created and each attached to the corresponding slave.
> But when I try to copy data on it , it complains that 
> /root/ephemeral-hdfs/bin/hadoop fs -cp /intersection 
> hdfs://ec2-54-149-112-136.us-west-2.compute.amazonaws.com:9010/
> 
> 2015-05-28 23:40:35,447 WARN  hdfs.DFSClient (DFSOutputStream.java:run(562)) 
> - DataStreamer Exception
> 
> org.apache.hadoop.ipc.RemoteException(java.io.IOException): File 
> /intersection/kmer150/commonGoodKmers/_temporary/_attempt_201504010056_0004_m_000428_3948/part-00428._COPYING_
>  could only be replicated to 0 nodes instead of minReplication (=1).  There 
> are 1 datanode(s) running and no node(s) are excluded in this operation.
> 
> 
> 
> It shows only 1 datanode , but for ephermal-hdfs it shows 8 datanodes.
> 
> Any thoughts?
> 
> Thanks
> 
> _R
> 
> 
>> On Sat, May 23, 2015 at 7:24 AM, Joe Wass  wrote:
>> I used Spark on EC2 a while ago, but recent revisions seem to have broken 
>> the functionality.
>> 
>> Is anyone actually using Spark on EC2 at the moment?
>> 
>> The bug in question is:
>> 
>> https://issues.apache.org/jira/browse/SPARK-5008
>> 
>> It makes it impossible to use persistent HDFS without a workround on each 
>> slave node.
>> 
>> No-one seems to be interested in the bug, so I wonder if other people aren't 
>> actually having this problem. If this is the case, any suggestions? 
>> 
>> Joe
> 


spark-sql errors

2015-05-29 Thread Sanjay Subramanian
https://groups.google.com/a/cloudera.org/forum/#!topic/cdh-user/6SqGuYemnbc
 


Re: Pointing SparkSQL to existing Hive Metadata with data file locations in HDFS

2015-05-28 Thread Sanjay Subramanian
ok guys, finally figured out how to get it running. I have detailed out the 
steps I did. Perhaps it's clear to all you folks. To me it was not :-)
Our Hadoop development environment
   - 3 node development hadoop cluster
   - Current version CDH 5.3.3
   - Hive 0.13.1
   - Spark 1.2.0 (standalone mode)  
  - node1 (worker1, master)
  - node2 (worker2)
  - node3 (worker3)

   - Cloudera Manager to manage and update (using parcels)
Steps to get spark-sql running
   - On every node (node1, node2, node3 above)
  - sudo cp -avi /etc/hive/conf/hive-site.xml /etc/spark/conf

   - Edit and add a line  
  - sudo vi /opt/cloudera/parcels/CDH/lib/spark/bin/compute-classpath.sh
 
 - # added by sanjay for running Spark using hive metadata
 - CLASSPATH="$CLASSPATH:/opt/cloudera/parcels/CDH/lib/hive/lib/*"


   - Run spark SQL in CLI mode  
  - /opt/cloudera/parcels/CDH/lib/spark/bin/spark-sql

   - Run spark SQL in async mode  
  - /opt/cloudera/parcels/CDH/lib/spark/bin/spark-sql -e "select * from 
band.beatles where upper(first_name) like '%GEORGE%' "

   - Run spark SQL in "SQL File" mode  
  - /opt/cloudera/parcels/CDH/lib/spark/bin/spark-sql -f   get_names.hql


  From: Andrew Otto 
 To: Sanjay Subramanian  
Cc: user  
 Sent: Thursday, May 28, 2015 7:26 AM
 Subject: Re: Pointing SparkSQL to existing Hive Metadata with data file 
locations in HDFS
   

val sqlContext = new HiveContext(sc)
val schemaRdd = sqlContext.sql("some complex SQL")

It mostly works, but we have been having issues with tables that contain a large 
amount of data:
https://issues.apache.org/jira/browse/SPARK-6910




On May 27, 2015, at 20:52, Sanjay Subramanian 
 wrote:
hey guys
On the Hive/Hadoop ecosystem we have, using Cloudera distribution CDH 5.2.x, 
there are about 300+ Hive tables. The data is stored as text (moving slowly to 
Parquet) on HDFS. I want to use SparkSQL, point to the Hive metadata, and be 
able to define JOINs etc. using a programming structure like this:

import org.apache.spark.sql.hive.HiveContext
val sqlContext = new HiveContext(sc)
val schemaRdd = sqlContext.sql("some complex SQL")

Is that the way to go ? Some guidance will be great.
thanks
sanjay 






  

Pointing SparkSQL to existing Hive Metadata with data file locations in HDFS

2015-05-27 Thread Sanjay Subramanian
hey guys
On the Hive/Hadoop ecosystem we have, using Cloudera distribution CDH 5.2.x, 
there are about 300+ Hive tables. The data is stored as text (moving slowly to 
Parquet) on HDFS. I want to use SparkSQL, point to the Hive metadata, and be 
able to define JOINs etc. using a programming structure like this:

import org.apache.spark.sql.hive.HiveContext
val sqlContext = new HiveContext(sc)
val schemaRdd = sqlContext.sql("some complex SQL")
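
For instance, against two of the Hive tables that come up elsewhere in this archive, that structure would look roughly like the sketch below (untested, for illustration only; the table and column names are borrowed from another thread here):

import org.apache.spark.sql.hive.HiveContext

val sqlContext = new HiveContext(sc)
// table/column names reused from the cdr examples in this archive
val joined = sqlContext.sql("""
  SELECT j.person_id, p.first_name, p.last_name, count(*) AS cnt
  FROM cdr.cdr_mjp_joborder j
  JOIN cdr.cdr_mjp_people p ON j.person_id = p.person_id
  GROUP BY j.person_id, p.first_name, p.last_name
""")
joined.take(20).foreach(println)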

Is that the way to go ? Some guidance will be great.
thanks
sanjay 




Re: MappedRDD signature

2015-01-28 Thread Sanjay Subramanian
Thanks Sean, that works, and I started the join of this mappedRDD to another one 
I have. I have to internalize the use of map versus flatMap. Thinking in Map Reduce 
Java Hadoop code often blinds me :-)
  From: Sean Owen 
 To: Sanjay Subramanian  
Cc: Cheng Lian ; Jorge Lopez-Malla 
; "user@spark.apache.org"  
 Sent: Wednesday, January 28, 2015 11:44 AM
 Subject: Re: MappedRDD signature
   
I think it's clear if you format your function reasonably:

mjpJobOrderRDD.map(line => {
  val tokens = line.split("\t");
  if (tokens.length == 164 && tokens(23) != null) {
    (tokens(23),tokens(7))
  }
})

In some cases the function returns nothing, in some cases a tuple. The
return type is therefore Any. If you just mean to output a result in
some cases and not others, you must use flatMap + Some + None:

mjpJobOrderRDD.flatMap { line =>
  val tokens = line.split("\t")
  if (tokens.length == 164 && tokens(23) != null) {
    Some((tokens(23),tokens(7)))
  } else {
    None
  }
}
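
A tiny throwaway illustration of the type difference, on made-up data in the shell (a sketch; the names and sample strings are invented):

// map with an if that has no else unifies (String, String) and Unit into Any
val lines = sc.parallelize(Seq("a\tb", "c"))

val asAny = lines.map { l =>
  val t = l.split("\t")
  if (t.length == 2) (t(0), t(1))
}   // inferred as org.apache.spark.rdd.RDD[Any]

// flatMap with Some/None drops the non-matching lines and keeps the tuple type
val asPairs = lines.flatMap { l =>
  val t = l.split("\t")
  if (t.length == 2) Some((t(0), t(1))) else None
}   // inferred as org.apache.spark.rdd.RDD[(String, String)]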



On Wed, Jan 28, 2015 at 7:37 PM, Sanjay Subramanian
 wrote:
> hey guys
>
> I am not following why this happens
>
> DATASET
> ===
> Tab separated values (164 columns)
>
> Spark command 1
> 
> val mjpJobOrderRDD = sc.textFile("/data/cdr/cdr_mjp_joborder_raw")
> val mjpJobOrderColsPairedRDD = mjpJobOrderRDD.map(line => { val tokens =
> line.split("\t");(tokens(23),tokens(7))})
> mjpJobOrderColsPairedRDD: org.apache.spark.rdd.RDD[(String, String)] =
> MappedRDD[18] at map at :14
>
>
> Spark command 2
> 
> val mjpJobOrderRDD = sc.textFile("/data/cdr/cdr_mjp_joborder_raw")
> scala> val mjpJobOrderColsPairedRDD = mjpJobOrderRDD.map(line => { val
> tokens = line.split("\t"); if (tokens.length == 164 && tokens(23) != null)
> {(tokens(23),tokens(7))} })
> mjpJobOrderColsPairedRDD: org.apache.spark.rdd.RDD[Any] = MappedRDD[19] at
> map at :14
>
>
> In the second case above , why does it say org.apache.spark.rdd.RDD[Any] and
> not org.apache.spark.rdd.RDD[(String, String)]
>
>
> thanks
>
> sanjay
>


  

MappedRDD signature

2015-01-28 Thread Sanjay Subramanian
hey guys 
I am not following why this happens
DATASET
===
Tab separated values (164 columns)

Spark command 1
===
val mjpJobOrderRDD = sc.textFile("/data/cdr/cdr_mjp_joborder_raw")
val mjpJobOrderColsPairedRDD = mjpJobOrderRDD.map(line => { val tokens = line.split("\t"); (tokens(23), tokens(7)) })
mjpJobOrderColsPairedRDD: org.apache.spark.rdd.RDD[(String, String)] = MappedRDD[18] at map at <console>:14

Spark command 2
===
val mjpJobOrderRDD = sc.textFile("/data/cdr/cdr_mjp_joborder_raw")
scala> val mjpJobOrderColsPairedRDD = mjpJobOrderRDD.map(line => { val tokens = line.split("\t"); if (tokens.length == 164 && tokens(23) != null) { (tokens(23), tokens(7)) } })
mjpJobOrderColsPairedRDD: org.apache.spark.rdd.RDD[Any] = MappedRDD[19] at map at <console>:14

In the second case above , why does it say org.apache.spark.rdd.RDD[Any] and 
not org.apache.spark.rdd.RDD[(String, String)]

thanks
sanjay


Re: FlatMapValues

2015-01-05 Thread Sanjay Subramanian
cool, let me adapt that. thanks a ton
regards
sanjay
  From: Sean Owen 
 To: Sanjay Subramanian  
Cc: "user@spark.apache.org"  
 Sent: Monday, January 5, 2015 3:19 AM
 Subject: Re: FlatMapValues
   
For the record, the solution I was suggesting was about like this:

inputRDD.flatMap { input =>
  val tokens = input.split(',')
  val id = tokens(0)
  val keyValuePairs = tokens.tail.grouped(2)
  val keys = keyValuePairs.map(_(0))
  keys.map(key => (id, key))
}

This is much more efficient.
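
For example, a quick sketch run against the two sample AERS lines quoted further down this thread (collect() is only for demonstration on a tiny dataset):

val inputRDD = sc.parallelize(Seq(
  "025003,Delirium,8.10,Hypokinesia,8.10,Hypotonia,8.10",
  "025005,Arthritis,8.10,Injection site oedema,8.10,Injection site reaction,8.10"))

inputRDD.flatMap { input =>
  val tokens = input.split(',')
  val id = tokens(0)                          // the VAERS_ID column
  val keyValuePairs = tokens.tail.grouped(2)  // group the remaining fields two at a time
  val keys = keyValuePairs.map(_(0))          // keep only the first field of each pair
  keys.map(key => (id, key))
}.collect().foreach(println)

// (025003,Delirium)
// (025003,Hypokinesia)
// (025003,Hypotonia)
// (025005,Arthritis)
// (025005,Injection site oedema)
// (025005,Injection site reaction)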



On Wed, Dec 31, 2014 at 3:46 PM, Sean Owen  wrote:
> From the clarification below, the problem is that you are calling
> flatMapValues, which is only available on an RDD of key-value tuples.
> Your map function returns a tuple in one case but a String in the
> other, so your RDD is a bunch of Any, which is not at all what you
> want. You need to return a tuple in both cases, which is what Kapil
> pointed out.
>
> However it's still not quite what you want. Your input is basically
> [key value1 value2 value3] so you want to flatMap that to (key,value1)
> (key,value2) (key,value3). flatMapValues does not come into play.
>
> On Wed, Dec 31, 2014 at 3:25 PM, Sanjay Subramanian
>  wrote:
>> My understanding is as follows
>>
>> STEP 1 (This would create a pair RDD)
>> ===
>>
>> reacRdd.map(line => line.split(',')).map(fields => {
>>  if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
>>
>> (fields(0),(fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
>>  }
>>  else {
>>    ""
>>  }
>>  })
>>
>> STEP 2
>> ===
>> Since previous step created a pair RDD, I thought flatMapValues method will
>> be applicable.
>> But the code does not even compile saying that flatMapValues is not
>> applicable to RDD :-(
>>
>>
>> reacRdd.map(line => line.split(',')).map(fields => {
>>  if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
>>
>> (fields(0),(fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
>>  }
>>  else {
>>    ""
>>  }
>>  }).flatMapValues(skus =>
>> skus.split('\t')).saveAsTextFile("/data/vaers/msfx/reac/" + outFile)
>>
>>
>> SUMMARY
>> ===
>> when a dataset looks like the following
>>
>> 1,red,blue,green
>> 2,yellow,violet,pink
>>
>> I want to output the following and I am asking how do I do that ? Perhaps my
>> code is 100% wrong. Please correct me and educate me :-)
>>
>> 1,red
>> 1,blue
>> 1,green
>> 2,yellow
>> 2,violet
>> 2,pink


  

Re: A spark newbie question

2015-01-04 Thread Sanjay Subramanian
val sconf = new SparkConf().setMaster("local").setAppName("MedicalSideFx-CassandraLogsMessageTypeCount")
val sc = new SparkContext(sconf)
val inputDir = "/path/to/cassandralogs.txt"

sc.textFile(inputDir).map(line => line.replace("\"", "")).map(line =>
  (line.split(' ')(0) + " " + line.split(' ')(2), 1)).reduceByKey((v1,v2) =>
  v1+v2).collect().foreach(println)

If u want to save the file
==
val outDir = "/path/to/output/dir/cassandra_logs"
var outFile = outDir + "/" + "sparkout_" + System.currentTimeMillis

sc.textFile(inputDir).map(line => line.replace("\"", "")).map(line =>
  (line.split(' ')(0) + " " + line.split(' ')(2), 1)).reduceByKey((v1,v2) =>
  v1+v2).saveAsTextFile(outFile)

The code is here (not elegant :-) but works):
https://raw.githubusercontent.com/sanjaysubramanian/msfx_scala/master/src/main/scala/org/medicalsidefx/common/utils/CassandraLogsMessageTypeCount.scala

OUTPUT
===
(2014-06-27 PAUSE,1)
(2014-06-27 START,2)
(2014-06-27 STOP,1)
(2014-06-25 STOP,1)
(2014-06-27 RESTART,1)
(2014-06-27 REWIND,2)
(2014-06-25 START,3)
(2014-06-25 PAUSE,1)
hope this helps. 
Since u r new to Spark, it may help to learn using an IDE. I use IntelliJ and 
have many examples posted here:
https://github.com/sanjaysubramanian/msfx_scala.git
These are simple silly examples of my learning process :-)
Plus IMHO, if u r planning on learning Spark, I would say YES to Scala and NO 
to Java. Yes it's a diff paradigm but being a Java and Hadoop programmer for 
many years, I am excited to learn Scala as the language and use Spark. It's 
exciting.
regards
sanjay
  From: Aniket Bhatnagar 
 To: Dinesh Vallabhdas ; "user@spark.apache.org" 
 
 Sent: Sunday, January 4, 2015 11:07 AM
 Subject: Re: A spark newbie question
   
Go through spark API documentation. Basically you have to do group by (date, 
message_type) and then do a count. 


On Sun, Jan 4, 2015, 9:58 PM Dinesh Vallabhdas  
wrote:

A spark cassandra newbie question. Thanks in advance for the help. I have a 
cassandra table with 2 columns, message_timestamp (timestamp) and 
message_type (text). The data is of the form:
2014-06-25 12:01:39 "START"
2014-06-25 12:02:39 "START"
2014-06-25 12:02:39 "PAUSE"
2014-06-25 14:02:39 "STOP"
2014-06-25 15:02:39 "START"
2014-06-27 12:01:39 "START"
2014-06-27 11:03:39 "STOP"
2014-06-27 12:03:39 "REWIND"
2014-06-27 12:04:39 "RESTART"
2014-06-27 12:05:39 "PAUSE"
2014-06-27 13:03:39 "REWIND"
2014-06-27 14:03:39 "START"
I want to use spark (using java) to calculate counts of a message_type on a per 
day basis and store it back in cassandra in a new table with 3 columns 
(date, message_type, count). The result table should look like this:
2014-06-25 START 3
2014-06-25 STOP 1
2014-06-25 PAUSE 1
2014-06-27 START 2
2014-06-27 STOP 1
2014-06-27 PAUSE 1
2014-06-27 REWIND 2
2014-06-27 RESTART 1
I'm not proficient in scala and would like to use java.




  

Re: Joining by values

2015-01-03 Thread Sanjay Subramanian
so I changed the code to:

rdd1InvIndex.join(rdd2Pair).map(str => str._2).groupByKey().map(str => (str._1, str._2.toList)).collect().foreach(println)

Now it prints. Don't worry, I will work on this to not output as List(...), but I 
am hoping that the JOIN question that @Dilip asked is answered :-)

(2,List(1001,1000,1002,1003, 1004,1001,1006,1007))
(3,List(1011,1012,1013,1010, 1007,1009,1005,1008))
(1,List(1001,1000,1002,1003, 1011,1012,1013,1010, 1004,1001,1006,1007, 1007,1009,1005,1008))
  From: Shixiong Zhu 
 To: Sanjay Subramanian  
Cc: dcmovva ; "user@spark.apache.org" 
 
 Sent: Saturday, January 3, 2015 8:15 PM
 Subject: Re: Joining by values
   
call `map(_.toList)` to convert `CompactBuffer` to `List`
Best Regards,Shixiong Zhu
2015-01-04 12:08 GMT+08:00 Sanjay Subramanian 
:



hi, take a look at the code I wrote here:
https://raw.githubusercontent.com/sanjaysubramanian/msfx_scala/master/src/main/scala/org/medicalsidefx/common/utils/PairRddJoin.scala

/*rdd1.txt

1~4,5,6,7
2~4,5
3~6,7

rdd2.txt

4~1001,1000,1002,1003
5~1004,1001,1006,1007
6~1007,1009,1005,1008
7~1011,1012,1013,1010

*/
val sconf = new 
SparkConf().setMaster("local").setAppName("MedicalSideFx-PairRddJoin")
val sc = new SparkContext(sconf)


val rdd1 = "/path/to/rdd1.txt"
val rdd2 = "/path/to/rdd2.txt"

val rdd1InvIndex = sc.textFile(rdd1).map(x => (x.split('~')(0), 
x.split('~')(1))).flatMapValues(str => str.split(',')).map(str => (str._2, 
str._1))
val rdd2Pair = sc.textFile(rdd2).map(str => (str.split('~')(0), 
str.split('~')(1)))
rdd1InvIndex.join(rdd2Pair).map(str => 
str._2).groupByKey().collect().foreach(println)

This outputs the following. I think this may be essentially what u r looking 
for (I have to understand how to NOT print as CompactBuffer):
(2,CompactBuffer(1001,1000,1002,1003, 1004,1001,1006,1007))
(3,CompactBuffer(1011,1012,1013,1010, 1007,1009,1005,1008))
(1,CompactBuffer(1001,1000,1002,1003, 1011,1012,1013,1010, 1004,1001,1006,1007, 
1007,1009,1005,1008))

  From: Sanjay Subramanian 
 To: dcmovva ; "user@spark.apache.org" 
 
 Sent: Saturday, January 3, 2015 12:19 PM
 Subject: Re: Joining by values
   
This is my design. Now let me try and code it in Spark.
rdd1.txt
===
1~4,5,6,7
2~4,5
3~6,7

rdd2.txt
===
4~1001,1000,1002,1003
5~1004,1001,1006,1007
6~1007,1009,1005,1008
7~1011,1012,1013,1010

TRANSFORM 1
===
map each value to key (like an inverted index)
4~1
5~1
6~1
7~1
5~2
4~2
6~3
7~3

TRANSFORM 2
===
Join keys in transform 1 and rdd2
4~1,1001,1000,1002,1003
4~2,1001,1000,1002,1003
5~1,1004,1001,1006,1007
5~2,1004,1001,1006,1007
6~1,1007,1009,1005,1008
6~3,1007,1009,1005,1008
7~1,1011,1012,1013,1010
7~3,1011,1012,1013,1010

TRANSFORM 3
===
Split key in transform 2 with "~" and keep key(1) i.e. 1,2,3
1~1001,1000,1002,1003
2~1001,1000,1002,1003
1~1004,1001,1006,1007
2~1004,1001,1006,1007
1~1007,1009,1005,1008
3~1007,1009,1005,1008
1~1011,1012,1013,1010
3~1011,1012,1013,1010

TRANSFORM 4
===
join by key
1~1001,1000,1002,1003,1004,1001,1006,1007,1007,1009,1005,1008,1011,1012,1013,1010
2~1001,1000,1002,1003,1004,1001,1006,1007
3~1007,1009,1005,1008,1011,1012,1013,1010

 

 From: dcmovva 
 To: user@spark.apache.org 
 Sent: Saturday, January 3, 2015 10:10 AM
 Subject: Joining by values
   
I have a two pair RDDs in spark like this

rdd1 = (1 -> [4,5,6,7])
  (2 -> [4,5])
  (3 -> [6,7])


rdd2 = (4 -> [1001,1000,1002,1003])
  (5 -> [1004,1001,1006,1007])
  (6 -> [1007,1009,1005,1008])
  (7 -> [1011,1012,1013,1010])
I would like to combine them to look like this.

joinedRdd = (1 ->
[1000,1001,1002,1003,1004,1005,1006,1007,1008,1009,1010,1011,1012,1013])
        (2 -> [1000,1001,1002,1003,1004,1006,1007])
        (3 -> [1005,1007,1008,1009,1010,1011,1012,1013])


Can someone suggest me how to do this.

Thanks Dilip



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Joining-by-values-tp20954.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



   

   



  

Re: Joining by values

2015-01-03 Thread Sanjay Subramanian
hi, take a look at the code I wrote here:
https://raw.githubusercontent.com/sanjaysubramanian/msfx_scala/master/src/main/scala/org/medicalsidefx/common/utils/PairRddJoin.scala

/*rdd1.txt

1~4,5,6,7
2~4,5
3~6,7

rdd2.txt

4~1001,1000,1002,1003
5~1004,1001,1006,1007
6~1007,1009,1005,1008
7~1011,1012,1013,1010

*/
val sconf = new 
SparkConf().setMaster("local").setAppName("MedicalSideFx-PairRddJoin")
val sc = new SparkContext(sconf)


val rdd1 = "/path/to/rdd1.txt"
val rdd2 = "/path/to/rdd2.txt"

val rdd1InvIndex = sc.textFile(rdd1).map(x => (x.split('~')(0), 
x.split('~')(1))).flatMapValues(str => str.split(',')).map(str => (str._2, 
str._1))
val rdd2Pair = sc.textFile(rdd2).map(str => (str.split('~')(0), 
str.split('~')(1)))
rdd1InvIndex.join(rdd2Pair).map(str => 
str._2).groupByKey().collect().foreach(println)

This outputs the following. I think this may be essentially what u r looking 
for (I have to understand how to NOT print as CompactBuffer):
(2,CompactBuffer(1001,1000,1002,1003, 1004,1001,1006,1007))
(3,CompactBuffer(1011,1012,1013,1010, 1007,1009,1005,1008))
(1,CompactBuffer(1001,1000,1002,1003, 1011,1012,1013,1010, 1004,1001,1006,1007, 
1007,1009,1005,1008))

  From: Sanjay Subramanian 
 To: dcmovva ; "user@spark.apache.org" 
 
 Sent: Saturday, January 3, 2015 12:19 PM
 Subject: Re: Joining by values
   
This is my design. Now let me try and code it in Spark.
rdd1.txt
===
1~4,5,6,7
2~4,5
3~6,7

rdd2.txt
===
4~1001,1000,1002,1003
5~1004,1001,1006,1007
6~1007,1009,1005,1008
7~1011,1012,1013,1010

TRANSFORM 1
===
map each value to key (like an inverted index)
4~1
5~1
6~1
7~1
5~2
4~2
6~3
7~3

TRANSFORM 2
===
Join keys in transform 1 and rdd2
4~1,1001,1000,1002,1003
4~2,1001,1000,1002,1003
5~1,1004,1001,1006,1007
5~2,1004,1001,1006,1007
6~1,1007,1009,1005,1008
6~3,1007,1009,1005,1008
7~1,1011,1012,1013,1010
7~3,1011,1012,1013,1010

TRANSFORM 3
===
Split key in transform 2 with "~" and keep key(1) i.e. 1,2,3
1~1001,1000,1002,1003
2~1001,1000,1002,1003
1~1004,1001,1006,1007
2~1004,1001,1006,1007
1~1007,1009,1005,1008
3~1007,1009,1005,1008
1~1011,1012,1013,1010
3~1011,1012,1013,1010

TRANSFORM 4
===
join by key
1~1001,1000,1002,1003,1004,1001,1006,1007,1007,1009,1005,1008,1011,1012,1013,1010
2~1001,1000,1002,1003,1004,1001,1006,1007
3~1007,1009,1005,1008,1011,1012,1013,1010

 

 From: dcmovva 
 To: user@spark.apache.org 
 Sent: Saturday, January 3, 2015 10:10 AM
 Subject: Joining by values
   
I have a two pair RDDs in spark like this

rdd1 = (1 -> [4,5,6,7])
  (2 -> [4,5])
  (3 -> [6,7])


rdd2 = (4 -> [1001,1000,1002,1003])
  (5 -> [1004,1001,1006,1007])
  (6 -> [1007,1009,1005,1008])
  (7 -> [1011,1012,1013,1010])
I would like to combine them to look like this.

joinedRdd = (1 ->
[1000,1001,1002,1003,1004,1005,1006,1007,1008,1009,1010,1011,1012,1013])
        (2 -> [1000,1001,1002,1003,1004,1006,1007])
        (3 -> [1005,1007,1008,1009,1010,1011,1012,1013])


Can someone suggest me how to do this.

Thanks Dilip



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Joining-by-values-tp20954.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



   

  

Re: Joining by values

2015-01-03 Thread Sanjay Subramanian
This is my design. Now let me try and code it in Spark.
rdd1.txt
===
1~4,5,6,7
2~4,5
3~6,7

rdd2.txt
===
4~1001,1000,1002,1003
5~1004,1001,1006,1007
6~1007,1009,1005,1008
7~1011,1012,1013,1010

TRANSFORM 1
===
map each value to key (like an inverted index)
4~1
5~1
6~1
7~1
5~2
4~2
6~3
7~3

TRANSFORM 2
===
Join keys in transform 1 and rdd2
4~1,1001,1000,1002,1003
4~2,1001,1000,1002,1003
5~1,1004,1001,1006,1007
5~2,1004,1001,1006,1007
6~1,1007,1009,1005,1008
6~3,1007,1009,1005,1008
7~1,1011,1012,1013,1010
7~3,1011,1012,1013,1010

TRANSFORM 3
===
Split key in transform 2 with "~" and keep key(1) i.e. 1,2,3
1~1001,1000,1002,1003
2~1001,1000,1002,1003
1~1004,1001,1006,1007
2~1004,1001,1006,1007
1~1007,1009,1005,1008
3~1007,1009,1005,1008
1~1011,1012,1013,1010
3~1011,1012,1013,1010

TRANSFORM 4
===
join by key
1~1001,1000,1002,1003,1004,1001,1006,1007,1007,1009,1005,1008,1011,1012,1013,1010
2~1001,1000,1002,1003,1004,1001,1006,1007
3~1007,1009,1005,1008,1011,1012,1013,1010

  From: dcmovva 
 To: user@spark.apache.org 
 Sent: Saturday, January 3, 2015 10:10 AM
 Subject: Joining by values
   
I have a two pair RDDs in spark like this

rdd1 = (1 -> [4,5,6,7])
  (2 -> [4,5])
  (3 -> [6,7])


rdd2 = (4 -> [1001,1000,1002,1003])
  (5 -> [1004,1001,1006,1007])
  (6 -> [1007,1009,1005,1008])
  (7 -> [1011,1012,1013,1010])
I would like to combine them to look like this.

joinedRdd = (1 ->
[1000,1001,1002,1003,1004,1005,1006,1007,1008,1009,1010,1011,1012,1013])
        (2 -> [1000,1001,1002,1003,1004,1006,1007])
        (3 -> [1005,1007,1008,1009,1010,1011,1012,1013])


Can someone suggest me how to do this.

Thanks Dilip



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Joining-by-values-tp20954.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



  

Re: saveAsTextFile

2015-01-03 Thread Sanjay Subramanian

@laila
Based on the error u mentioned in the nabble link below, it seems like there are no permissions to write to HDFS. So this is possibly why saveAsTextFile is failing.

  From: Pankaj Narang 
 To: user@spark.apache.org 
 Sent: Saturday, January 3, 2015 4:07 AM
 Subject: Re: saveAsTextFile
   
If you can paste the code here I can certainly help.

Also confirm the version of spark you are using

Regards
Pankaj 
Infoshore Software 
India



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/saveAsTextFile-tp20951p20953.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



   

Re: FlatMapValues

2015-01-02 Thread Sanjay Subramanian
OK this is how I solved it. Not elegant at all but it works and I need to move ahead at this time. Converting to a pair RDD is now not required.
reacRdd.map(line => line.split(',')).map(fields => {
  if (fields.length >= 10 && !fields(0).contains("VAERS_ID")) {
    (fields(0)+","+fields(1)+"\t"+fields(0)+","+fields(3)+"\t"+fields(0)+","+fields(5)+"\t"+fields(0)+","+fields(7)+"\t"+fields(0)+","+fields(9))
  }
  else {
    ""
  }
}).flatMap(str => str.split('\t')).filter(line => line.toString.length() > 0).saveAsTextFile("/data/vaers/msfx/reac/" + outFile)
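For reference, a slightly cleaner sketch of the same idea (assuming, as above, that reacRdd is an RDD[String] of comma-separated lines): emit the id,value strings straight out of one flatMap instead of building a tab-joined string and re-splitting it.

reacRdd.map(_.split(',')).flatMap { fields =>
  if (fields.length >= 10 && !fields(0).contains("VAERS_ID"))
    // pair the ID (field 0) with each of the alternate reaction columns
    Seq(1, 3, 5, 7, 9).map(i => fields(0) + "," + fields(i))
  else
    Seq.empty[String]   // header or short line: contribute nothing
}.saveAsTextFile("/data/vaers/msfx/reac/" + outFile)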

  From: Sanjay Subramanian 
 To: Hitesh Khamesra  
Cc: Kapil Malik ; Sean Owen ; 
"user@spark.apache.org"  
 Sent: Thursday, January 1, 2015 12:39 PM
 Subject: Re: FlatMapValues
   
thanks let me try that out
 

 From: Hitesh Khamesra 
 To: Sanjay Subramanian  
Cc: Kapil Malik ; Sean Owen ; 
"user@spark.apache.org"  
 Sent: Thursday, January 1, 2015 9:46 AM
 Subject: Re: FlatMapValues
   
How about this: apply flatMap per line. And in that function, parse each line and return all the columns as per your need.


On Wed, Dec 31, 2014 at 10:16 AM, Sanjay Subramanian 
 wrote:

hey guys
Some of u may care :-) but this is just to give u a background on where I am going with this. I have an iOS medical side effects app called MedicalSideFx. I built the entire underlying data layer aggregation using hadoop and currently the search is based on lucene. I am re-architecting the data layer by replacing hadoop with Spark and integrating FDA data, Canadian sidefx data and vaccines sidefx data.

@Kapil , sorry but flatMapValues is being reported as undefined
To give u a complete picture of the code (its inside IntelliJ but thats only for testing...the real code runs on spark-shell on my cluster)
https://github.com/sanjaysubramanian/msfx_scala/blob/master/src/main/scala/org/medicalsidefx/common/utils/AersReacColumnExtractor.scala

If u were to assume dataset as
025003,Delirium,8.10,Hypokinesia,8.10,Hypotonia,8.10
025005,Arthritis,8.10,Injection site oedema,8.10,Injection site reaction,8.10

This present version of the code, the flatMap works but only gives me values
Delirium
Hypokinesia
Hypotonia
Arthritis
Injection site oedema
Injection site reaction

What I need is
025003,Delirium
025003,Hypokinesia
025003,Hypotonia
025005,Arthritis
025005,Injection site oedema
025005,Injection site reaction

thanks
sanjay
  From: Kapil Malik 
 To: Sean Owen ; Sanjay Subramanian 
 
Cc: "user@spark.apache.org"  
 Sent: Wednesday, December 31, 2014 9:35 AM
 Subject: RE: FlatMapValues
   
Hi Sanjay,

Oh yes .. on flatMapValues, it's defined in PairRDDFunctions, and you need to 
import org.apache.spark.SparkContext._ to use them 
(http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions
 )

@Sean, yes indeed flatMap / flatMapValues both can be used.

Regards,

Kapil 



-Original Message-
From: Sean Owen [mailto:so...@cloudera.com] 
Sent: 31 December 2014 21:16
To: Sanjay Subramanian
Cc: user@spark.apache.org
Subject: Re: FlatMapValues

From the clarification below, the problem is that you are calling flatMapValues, which is only available on an RDD of key-value tuples.
Your map function returns a tuple in one case but a String in the other, so 
your RDD is a bunch of Any, which is not at all what you want. You need to 
return a tuple in both cases, which is what Kapil pointed out.

However it's still not quite what you want. Your input is basically [key value1 
value2 value3] so you want to flatMap that to (key,value1)
(key,value2) (key,value3). flatMapValues does not come into play.

On Wed, Dec 31, 2014 at 3:25 PM, Sanjay Subramanian 
 wrote:
> My understanding is as follows
>
> STEP 1 (This would create a pair RDD)
> ===
>
> reacRdd.map(line => line.split(',')).map(fields => {
>  if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
>
> (fields(0),(fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
>  }
>  else {
>    ""
>  }
>  })
>
> STEP 2
> ===
> Since previous step created a pair RDD, I thought flatMapValues method 
> will be applicable.
> But the code does not even compile saying that flatMapValues is not 
> applicable to RDD :-(
>
>
> reacRdd.map(line => line.split(',')).map(fields => {
>  if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
>
> (fields(0),(fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+f

Re: FlatMapValues

2015-01-01 Thread Sanjay Subramanian
thanks let me try that out
  From: Hitesh Khamesra 
 To: Sanjay Subramanian  
Cc: Kapil Malik ; Sean Owen ; 
"user@spark.apache.org"  
 Sent: Thursday, January 1, 2015 9:46 AM
 Subject: Re: FlatMapValues
   
How about this: apply flatMap per line. And in that function, parse each line and return all the columns as per your need.


On Wed, Dec 31, 2014 at 10:16 AM, Sanjay Subramanian 
 wrote:

hey guys
Some of u may care :-) but this is just to give u a background on where I am going with this. I have an iOS medical side effects app called MedicalSideFx. I built the entire underlying data layer aggregation using hadoop and currently the search is based on lucene. I am re-architecting the data layer by replacing hadoop with Spark and integrating FDA data, Canadian sidefx data and vaccines sidefx data.

@Kapil , sorry but flatMapValues is being reported as undefined
To give u a complete picture of the code (its inside IntelliJ but thats only for testing...the real code runs on spark-shell on my cluster)
https://github.com/sanjaysubramanian/msfx_scala/blob/master/src/main/scala/org/medicalsidefx/common/utils/AersReacColumnExtractor.scala

If u were to assume dataset as
025003,Delirium,8.10,Hypokinesia,8.10,Hypotonia,8.10
025005,Arthritis,8.10,Injection site oedema,8.10,Injection site reaction,8.10

This present version of the code, the flatMap works but only gives me values
Delirium
Hypokinesia
Hypotonia
Arthritis
Injection site oedema
Injection site reaction

What I need is
025003,Delirium
025003,Hypokinesia
025003,Hypotonia
025005,Arthritis
025005,Injection site oedema
025005,Injection site reaction

thanks
sanjay
  From: Kapil Malik 
 To: Sean Owen ; Sanjay Subramanian 
 
Cc: "user@spark.apache.org"  
 Sent: Wednesday, December 31, 2014 9:35 AM
 Subject: RE: FlatMapValues
   
Hi Sanjay,

Oh yes .. on flatMapValues, it's defined in PairRDDFunctions, and you need to 
import org.apache.spark.SparkContext._ to use them 
(http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions
 )

@Sean, yes indeed flatMap / flatMapValues both can be used.

Regards,

Kapil 



-Original Message-
From: Sean Owen [mailto:so...@cloudera.com] 
Sent: 31 December 2014 21:16
To: Sanjay Subramanian
Cc: user@spark.apache.org
Subject: Re: FlatMapValues

From the clarification below, the problem is that you are calling flatMapValues, which is only available on an RDD of key-value tuples.
Your map function returns a tuple in one case but a String in the other, so 
your RDD is a bunch of Any, which is not at all what you want. You need to 
return a tuple in both cases, which is what Kapil pointed out.

However it's still not quite what you want. Your input is basically [key value1 
value2 value3] so you want to flatMap that to (key,value1)
(key,value2) (key,value3). flatMapValues does not come into play.

On Wed, Dec 31, 2014 at 3:25 PM, Sanjay Subramanian 
 wrote:
> My understanding is as follows
>
> STEP 1 (This would create a pair RDD)
> ===
>
> reacRdd.map(line => line.split(',')).map(fields => {
>  if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
>
> (fields(0),(fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
>  }
>  else {
>    ""
>  }
>  })
>
> STEP 2
> ===
> Since previous step created a pair RDD, I thought flatMapValues method 
> will be applicable.
> But the code does not even compile saying that flatMapValues is not 
> applicable to RDD :-(
>
>
> reacRdd.map(line => line.split(',')).map(fields => {
>  if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
>
> (fields(0),(fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
>  }
>  else {
>    ""
>  }
>  }).flatMapValues(skus =>
> skus.split('\t')).saveAsTextFile("/data/vaers/msfx/reac/" + outFile)
>
>
> SUMMARY
> ===
> when a dataset looks like the following
>
> 1,red,blue,green
> 2,yellow,violet,pink
>
> I want to output the following and I am asking how do I do that ? 
> Perhaps my code is 100% wrong. Please correct me and educate me :-)
>
> 1,red
> 1,blue
> 1,green
> 2,yellow
> 2,violet
> 2,pink

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



   



  

Re: FlatMapValues

2014-12-31 Thread Sanjay Subramanian
hey guys
Some of u may care :-) but this is just to give u a background on where I am going with this. I have an iOS medical side effects app called MedicalSideFx. I built the entire underlying data layer aggregation using hadoop and currently the search is based on lucene. I am re-architecting the data layer by replacing hadoop with Spark and integrating FDA data, Canadian sidefx data and vaccines sidefx data.

@Kapil , sorry but flatMapValues is being reported as undefined
To give u a complete picture of the code (its inside IntelliJ but thats only for testing...the real code runs on spark-shell on my cluster)
https://github.com/sanjaysubramanian/msfx_scala/blob/master/src/main/scala/org/medicalsidefx/common/utils/AersReacColumnExtractor.scala

If u were to assume dataset as
025003,Delirium,8.10,Hypokinesia,8.10,Hypotonia,8.10
025005,Arthritis,8.10,Injection site oedema,8.10,Injection site reaction,8.10

This present version of the code, the flatMap works but only gives me values
Delirium
Hypokinesia
Hypotonia
Arthritis
Injection site oedema
Injection site reaction

What I need is
025003,Delirium
025003,Hypokinesia
025003,Hypotonia
025005,Arthritis
025005,Injection site oedema
025005,Injection site reaction

thanks
sanjay
  From: Kapil Malik 
 To: Sean Owen ; Sanjay Subramanian 
 
Cc: "user@spark.apache.org"  
 Sent: Wednesday, December 31, 2014 9:35 AM
 Subject: RE: FlatMapValues
   
Hi Sanjay,

Oh yes .. on flatMapValues, it's defined in PairRDDFunctions, and you need to 
import org.apache.spark.SparkContext._ to use them 
(http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions
 )

@Sean, yes indeed flatMap / flatMapValues both can be used.

Regards,

Kapil 



-Original Message-
From: Sean Owen [mailto:so...@cloudera.com] 
Sent: 31 December 2014 21:16
To: Sanjay Subramanian
Cc: user@spark.apache.org
Subject: Re: FlatMapValues

From the clarification below, the problem is that you are calling flatMapValues, which is only available on an RDD of key-value tuples.
Your map function returns a tuple in one case but a String in the other, so 
your RDD is a bunch of Any, which is not at all what you want. You need to 
return a tuple in both cases, which is what Kapil pointed out.

However it's still not quite what you want. Your input is basically [key value1 
value2 value3] so you want to flatMap that to (key,value1)
(key,value2) (key,value3). flatMapValues does not come into play.

On Wed, Dec 31, 2014 at 3:25 PM, Sanjay Subramanian 
 wrote:
> My understanding is as follows
>
> STEP 1 (This would create a pair RDD)
> ===
>
> reacRdd.map(line => line.split(',')).map(fields => {
>  if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
>
> (fields(0),(fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
>  }
>  else {
>    ""
>  }
>  })
>
> STEP 2
> ===
> Since previous step created a pair RDD, I thought flatMapValues method 
> will be applicable.
> But the code does not even compile saying that flatMapValues is not 
> applicable to RDD :-(
>
>
> reacRdd.map(line => line.split(',')).map(fields => {
>  if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
>
> (fields(0),(fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
>  }
>  else {
>    ""
>  }
>  }).flatMapValues(skus =>
> skus.split('\t')).saveAsTextFile("/data/vaers/msfx/reac/" + outFile)
>
>
> SUMMARY
> ===
> when a dataset looks like the following
>
> 1,red,blue,green
> 2,yellow,violet,pink
>
> I want to output the following and I am asking how do I do that ? 
> Perhaps my code is 100% wrong. Please correct me and educate me :-)
>
> 1,red
> 1,blue
> 1,green
> 2,yellow
> 2,violet
> 2,pink

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



  

Re: FlatMapValues

2014-12-31 Thread Sanjay Subramanian
Hey Kapil, Fernando
Thanks for your mail.
[1] Fernando, if I don't use an "if" logic inside the "map" then if I have lines of input data that have fewer fields than I am expecting I get an ArrayIndexOutOfBounds exception. So the "if" is to safeguard against that. 
[2] Kapil, I am sorry I did not clarify. Yes my code "DID NOT" compile saying 
that flatMapValues is not defined.
In fact when I used your snippet , the code still does not compile

Error:(36, 57) value flatMapValues is not a member of org.apache.spark.rdd.RDD[(String, String)]
    }).filter(pair => pair._1.length() > 0).flatMapValues(skus => skus.split('\t')).saveAsTextFile("/data/vaers/msfx/reac/" + outFile)
                                             ^

My pom.xml looks like this

<dependency>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-core_2.10</artifactId>
   <version>1.2.0</version>
</dependency>
<dependency>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-sql_2.10</artifactId>
   <version>1.2.0</version>
</dependency>

[3] To summarize all I want is to convert

SUMMARY
===
when a dataset looks like the following
1,red,blue,green
2,yellow,violet,pink

I want to output the following and currently not able to
1,red
1,blue
1,green
2,yellow
2,violet
2,pink

thanks

regards
sanjay

  From: Fernando O. 
 To: Kapil Malik  
Cc: Sanjay Subramanian ; "user@spark.apache.org" 
 
 Sent: Wednesday, December 31, 2014 6:06 AM
 Subject: Re: FlatMapValues
   
Hi Sanjay,
Doing an if inside a Map sounds like a bad idea, it seems like you actually 
want to filter and then apply map



On Wed, Dec 31, 2014 at 9:54 AM, Kapil Malik  wrote:

Hi Sanjay,

I tried running your code on spark shell piece by piece –

// Setup
val line1 = "025126,Chills,8.10,Injection site oedema,8.10,Injection site reaction,8.10,Malaise,8.10,Myalgia,8.10"
val line2 = "025127,Chills,8.10,Injection site oedema,8.10,Injection site reaction,8.10,Malaise,8.10,Myalgia,8.10"
val lines = Array[String](line1, line2)
val r1 = sc.parallelize(lines, 2) // r1 is the original RDD[String] to begin with

val r2 = r1.map(line => line.split(',')) // RDD[Array[String]] – so far, so good

val r3 = r2.map(fields => {
  if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
    (fields(0),(fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9))) // Returns a pair (String, String), good
  }
  else {
    "" // Returns a String, bad
  }
}) // RDD[Serializable] – PROBLEM

I was not even able to apply flatMapValues since the filtered RDD passed to it is RDD[Serializable] and not a pair RDD. I am surprised how your code compiled correctly.

The following changes in your snippet make it work as intended -

reacRdd.map(line => line.split(',')).map(fields => {
  if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
    (fields(0),(fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
  }
  else {
    ("","")
  }
  }).filter(pair => pair._1.length() > 0).flatMapValues(skus => skus.split('\t')).saveAsTextFile("/data/vaers/msfx/reac/" + outFile)

Please note that this too saves lines like (025126,Chills), i.e. with opening and closing brackets ( and ). If you want to get rid of them, better do another map operation to map pair to String.

Kapil

From: Sanjay Subramanian [mailto:sanjaysubraman...@yahoo.com.INVALID]
Sent: 31 December 2014 13:42
Cc: user@spark.apache.org
Subject: FlatMapValues

hey guys

My dataset is like this

025126,Chills,8.10,Injection site oedema,8.10,Injection site reaction,8.10,Malaise,8.10,Myalgia,8.10

Intended output is
==
025126,Chills
025126,Injection site oedema
025126,Injection site reaction
025126,Malaise
025126,Myalgia

My code is as follows but the flatMapValues does not work even after I have created the pair RDD.

reacRdd.map(line => line.split(',')).map(fields => {
  if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
    (fields(0),(fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
  }
  else {
    ""
  }
  }).filter(line => line.toString.length() > 0).flatMapValues(skus => skus.split('\t')).saveAsTextFile("/data/vaers/msfx/reac/" + outFile)

thanks
sanjay
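Following up on Kapil's note above about the (025126,Chills) brackets, a minimal sketch (same reacRdd and outFile as in the snippets above) that renders each pair as a plain id,value line before saving:

import org.apache.spark.SparkContext._   // brings in flatMapValues and friends on Spark 1.x

val pairs = reacRdd.map(line => line.split(',')).map(fields => {
    if (fields.length >= 11 && !fields(0).contains("VAERS_ID"))
      (fields(0), fields(1) + "\t" + fields(3) + "\t" + fields(5) + "\t" + fields(7) + "\t" + fields(9))
    else
      ("", "")
  }).filter(pair => pair._1.length() > 0)
    .flatMapValues(skus => skus.split('\t'))

// formatting the tuple ourselves avoids the "( ... )" that saveAsTextFile prints for tuples
pairs.map { case (id, value) => id + "," + value }
  .saveAsTextFile("/data/vaers/msfx/reac/" + outFile)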



  

FlatMapValues

2014-12-31 Thread Sanjay Subramanian
hey guys 
My dataset is like this 
025126,Chills,8.10,Injection site oedema,8.10,Injection site 
reaction,8.10,Malaise,8.10,Myalgia,8.10

Intended output is
==
025126,Chills
025126,Injection site oedema
025126,Injection site reaction
025126,Malaise
025126,Myalgia

My code is as follows but the flatMapValues does not work even after I have created the pair RDD.

reacRdd.map(line => line.split(',')).map(fields => {
  if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
    (fields(0),(fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
  }
  else {
    ""
  }
  }).filter(line => line.toString.length() > 0).flatMapValues(skus => skus.split('\t')).saveAsTextFile("/data/vaers/msfx/reac/" + outFile)
thanks
sanjay

Re: How to identify erroneous input record ?

2014-12-24 Thread Sanjay Subramanian
Although not elegantly I got the output via my code but totally agree on the parsing 5 times (thats really bad). Will add your suggested logic and check it out. I have a "long" way to the finish line. I am re-architecting my entire hadoop code and getting it onto spark.
Check out what I do at www.medicalsidefx.org
Primarily an iPhone app but underlying is Lucene, Hadoop and hopefully soon in 2015 - Spark :-)  
  From: Sean Owen 
 To: Sanjay Subramanian  
Cc: "user@spark.apache.org"  
 Sent: Wednesday, December 24, 2014 8:56 AM
 Subject: Re: How to identify erroneous input record ?
   
I don't believe that works since your map function does not return a
value for lines shorter than 13 tokens. You should use flatMap and
Some/None. (You probably want to not parse the string 5 times too.)

val demoRddFilterMap = demoRddFilter.flatMap { line =>
  val tokens = line.split('$')
  if (tokens.length >= 13) {
    val parsed = tokens(0) + "~" + tokens(5) + "~" + tokens(11) + "~"
+ tokens(12)
    Some(parsed)
  } else {
    None
  }
}
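And a sketch of option 2 from the original question (try/catch semantics via scala.util.Try, so only the malformed lines are dropped; demoRddFilter and the '$'-delimited layout are as in the thread):

import scala.util.Try

val demoRddFilterMapTry = demoRddFilter.flatMap { line =>
  // Try(...) turns any parse failure (e.g. fewer than 13 fields) into a Failure,
  // toOption turns that into None, and flatMap then silently drops the bad record
  Try {
    val tokens = line.split('$')
    tokens(0) + "~" + tokens(5) + "~" + tokens(11) + "~" + tokens(12)
  }.toOption
}

The flatMap + Some/None version above is usually preferable, since it only skips records that are genuinely too short, whereas the Try version also hides any other unexpected exception.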



On Wed, Dec 24, 2014 at 4:35 PM, Sanjay Subramanian
 wrote:
> DOH Looks like I did not have enough coffee before I asked this :-)
> I added the if statement...
>
> var demoRddFilter = demoRdd.filter(line =>
> !line.contains("ISR$CASE$I_F_COD$FOLL_SEQ") ||
> !line.contains("primaryid$caseid$caseversion"))
> var demoRddFilterMap = demoRddFilter.map(line => {
>  if (line.split('$').length >= 13){
>    line.split('$')(0) + "~" + line.split('$')(5) + "~" +
> line.split('$')(11) + "~" + line.split('$')(12)
>  }
> })
>
>
> 
> From: Sanjay Subramanian 
> To: "user@spark.apache.org" 
> Sent: Wednesday, December 24, 2014 8:28 AM
> Subject: How to identify erroneous input record ?
>
> hey guys
>
> One of my input records has an problem that makes the code fail.
>
> var demoRddFilter = demoRdd.filter(line =>
> !line.contains("ISR$CASE$I_F_COD$FOLL_SEQ") ||
> !line.contains("primaryid$caseid$caseversion"))
>
> var demoRddFilterMap = demoRddFilter.map(line => line.split('$')(0) + "~" +
> line.split('$')(5) + "~" + line.split('$')(11) + "~" + line.split('$')(12))
>
> demoRddFilterMap.saveAsTextFile("/data/aers/msfx/demo/" + outFile)
>
>
> This is possibly happening because perhaps one input record may not have 13
> fields.
>
> If this were Hadoop mapper code , I have 2 ways to solve this
>
> 1. test the number of fields of each line before applying the map function
>
> 2. enclose the mapping function in a try catch block so that the mapping
> function only fails for the erroneous record
>
> How do I implement 1. or 2. in the Spark code ?
>
> Thanks
>
>
> sanjay
>
>
>
>
>
>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



  

Re: How to identify erroneous input record ?

2014-12-24 Thread Sanjay Subramanian
DOH Looks like I did not have enough coffee before I asked this :-) I added the if statement...

var demoRddFilter = demoRdd.filter(line =>
  !line.contains("ISR$CASE$I_F_COD$FOLL_SEQ") ||
  !line.contains("primaryid$caseid$caseversion"))
var demoRddFilterMap = demoRddFilter.map(line => {
  if (line.split('$').length >= 13){
line.split('$')(0) + "~" + line.split('$')(5) + "~" + line.split('$')(11) + 
"~" + line.split('$')(12)
  }
})

  From: Sanjay Subramanian 
 To: "user@spark.apache.org"  
 Sent: Wednesday, December 24, 2014 8:28 AM
 Subject: How to identify erroneous input record ?
   
hey guys 
One of my input records has an problem that makes the code fail.
var demoRddFilter = demoRdd.filter(line => 
!line.contains("ISR$CASE$I_F_COD$FOLL_SEQ") || 
!line.contains("primaryid$caseid$caseversion"))

var demoRddFilterMap = demoRddFilter.map(line => line.split('$')(0) + "~" + line.split('$')(5) + "~" + line.split('$')(11) + "~" + line.split('$')(12))

demoRddFilterMap.saveAsTextFile("/data/aers/msfx/demo/" + outFile)

This is possibly happening because perhaps one input record may not have 13 fields.
If this were Hadoop mapper code , I have 2 ways to solve this
1. test the number of fields of each line before applying the map function
2. enclose the mapping function in a try catch block so that the mapping function only fails for the erroneous record
How do I implement 1. or 2. in the Spark code ?
Thanks
sanjay


  

How to identify erroneous input record ?

2014-12-24 Thread Sanjay Subramanian
hey guys 
One of my input records has an problem that makes the code fail.
var demoRddFilter = demoRdd.filter(line => 
!line.contains("ISR$CASE$I_F_COD$FOLL_SEQ") || 
!line.contains("primaryid$caseid$caseversion"))

var demoRddFilterMap = demoRddFilter.map(line => line.split('$')(0) + "~" + line.split('$')(5) + "~" + line.split('$')(11) + "~" + line.split('$')(12))

demoRddFilterMap.saveAsTextFile("/data/aers/msfx/demo/" + outFile)

This is possibly happening because perhaps one input record may not have 13 fields.
If this were Hadoop mapper code , I have 2 ways to solve this
1. test the number of fields of each line before applying the map function
2. enclose the mapping function in a try catch block so that the mapping function only fails for the erroneous record
How do I implement 1. or 2. in the Spark code ?
Thanks
sanjay

Re: Spark or MR, Scala or Java?

2014-11-23 Thread Sanjay Subramanian
Thanks a ton Ashish
sanjay
  From: Ashish Rangole 
 To: Sanjay Subramanian  
Cc: Krishna Sankar ; Sean Owen ; 
Guillermo Ortiz ; user  
 Sent: Sunday, November 23, 2014 11:03 AM
 Subject: Re: Spark or MR, Scala or Java?
   
This being a very broad topic, a discussion can quickly get subjective. I'll 
try not to deviate from my experiences and observations to keep this thread 
useful to those looking for answers.
I have used Hadoop MR (with Hive, MR Java apis, Cascading and Scalding) as well 
as Spark (since v 0.6) in Scala. I learnt Scala for using Spark. My 
observations are below.
Spark and Hadoop MR:
1. There doesn't have to be a dichotomy between Hadoop ecosystem and Spark since Spark is a part of it.
2. Spark or Hadoop MR, there is no getting away from learning how partitioning, 
input splits, and shuffle process work. In order to optimize performance, 
troubleshoot and design software one must know these. I recommend reading first 
6-7 chapters of "Hadoop The definitive Guide" book to develop initial 
understanding. Indeed knowing a couple of divide and conquer algorithms is a 
pre-requisite and I assume everyone on this mailing list is very familiar :)
3. Having used a lot of different APIs and layers of abstraction for Hadoop MR, 
my experience progressing from MR Java API --> Cascading --> Scalding is that 
each new API looks "simpler" than the previous one. However, Spark API and 
abstraction has been simplest. Not only for me but those who I have seen start 
with Hadoop MR or Spark first. It is easiest to get started and become 
productive with Spark with the exception of Hive for those who are already 
familiar with SQL. Spark's ease of use is critical for teams starting out with 
Big Data.
4. It is also extremely simple to chain multi-stage jobs in Spark, you do it 
without even realizing by operating over RDDs. In Hadoop MR, one has to handle 
it explicitly.
5. Spark has built-in support for graph algorithms (including Bulk Synchronous 
Parallel processing BSP algorithms e.g. Pregel), Machine Learning and Stream 
processing. In Hadoop MR you need a separate library/Framework for each and it 
is non-trivial to combine multiple of these in the same application. This is 
huge!
6. In Spark one does have to learn how to configure the memory and other 
parameters of their cluster. Just to be clear, similar parameters exist in MR 
as well (e.g. shuffle memory parameters) but you don't *have* to learn about 
tuning them until you have jobs with larger data size jobs. In Spark you learn 
this by reading the configuration and tuning documentation followed by 
experimentation. This is an area of Spark where things can be better.
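To make point 6 concrete, a tiny sketch of what that tuning looks like in code (the keys are Spark 1.x configuration names, the values are purely illustrative, not recommendations):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("tuning-example")
  .set("spark.executor.memory", "4g")             // heap per executor
  .set("spark.shuffle.memoryFraction", "0.3")     // fraction of heap used for shuffle buffers
  .set("spark.storage.memoryFraction", "0.5")     // fraction of heap used for cached RDDs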
Java or Scala : I knew Java already yet I learnt Scala when I came across 
Spark. As others have said, you can get started with a little bit of Scala and 
learn more as you progress. Once you have started using Scala for a few weeks 
you would want to stay with it instead of going back to Java. Scala is arguably 
more elegant and less verbose than Java which translates into higher developer 
productivity and more maintainable code.
Myth: Spark is for in-memory processing *only*. This is a common beginner 
misunderstanding.
Sanjay: Spark uses Hadoop API for performing I/O from file systems (local, 
HDFS, S3 etc). Therefore you can use the same Hadoop InputFormat and 
RecordReader with Spark that you use with Hadoop for your multi-line record 
format. See SparkContext APIs. Just like Hadoop, you will need to make sure 
that your files are split at record boundaries.
Hope this is helpful.
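On the multi-line record point above, a hedged sketch of reusing an existing Hadoop InputFormat from Spark (MultiLineInputFormat is a hypothetical stand-in for the custom format/RecordReader Sanjay already has for the [begin]/[end] log records; LongWritable/Text are assumed key/value types):

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("multiline-records"))

// MultiLineInputFormat is the (hypothetical) custom mapreduce InputFormat described above;
// Spark hands each (key, value) pair from its RecordReader straight into the RDD
val records = sc.newAPIHadoopFile[LongWritable, Text, MultiLineInputFormat]("/path/to/old/logs")

val recordTexts = records.map { case (_, value) => value.toString }   // one whole multi-line record per element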



On Sun, Nov 23, 2014 at 8:35 AM, Sanjay Subramanian 
 wrote:

I am a newbie as well to Spark. Been Hadoop/Hive/Oozie programming extensively 
before this. I use Hadoop(Java MR code)/Hive/Impala/Presto on a daily basis.
To get me jumpstarted into Spark I started this gitHub where there is "IntelliJ-ready-To-run" code (simple examples of join, sparksql etc) and I will keep adding to that. I dont know scala and I am learning that too to help me use Spark better.
https://github.com/sanjaysubramanian/msfx_scala.git

Philosophically speaking its possibly not a good idea to take an either/or 
approach to technology...Like its never going to be either RDBMS or NOSQL (If 
the Cassandra behind FB shows 100 fewer likes instead of 1000 on you Photo a 
day for some reason u may not be as upset...but if the Oracle/Db2 systems 
behind Wells Fargo show $100 LESS in your account due to an database error, you 
will be PANIC-ing).

So its the same case with Spark or Hadoop. I can speak for myself. I have a 
usecase for processing old logs that are multiline (i.e. they have a 
[begin_timestamp_logid] and [end_timestamp_logid] and have many lines in  
between. In Java Hadoop I created custom RecordReaders to solve this. I still 
dont know how to do this in Spark. Till that time I am possibly gonna run the 
Hadoop code within Oozie in production. 
Also my

Re: Spark or MR, Scala or Java?

2014-11-23 Thread Sanjay Subramanian
I am a newbie as well to Spark. Been Hadoop/Hive/Oozie programming extensively 
before this. I use Hadoop(Java MR code)/Hive/Impala/Presto on a daily basis.
To get me jumpstarted into Spark I started this gitHub where there is "IntelliJ-ready-To-run" code (simple examples of join, sparksql etc) and I will keep adding to that. I dont know scala and I am learning that too to help me use Spark better.
https://github.com/sanjaysubramanian/msfx_scala.git

Philosophically speaking its possibly not a good idea to take an either/or 
approach to technology...Like its never going to be either RDBMS or NOSQL (If 
the Cassandra behind FB shows 100 fewer likes instead of 1000 on you Photo a 
day for some reason u may not be as upset...but if the Oracle/Db2 systems 
behind Wells Fargo show $100 LESS in your account due to an database error, you 
will be PANIC-ing).

So its the same case with Spark or Hadoop. I can speak for myself. I have a 
usecase for processing old logs that are multiline (i.e. they have a 
[begin_timestamp_logid] and [end_timestamp_logid] and have many lines in  
between. In Java Hadoop I created custom RecordReaders to solve this. I still 
dont know how to do this in Spark. Till that time I am possibly gonna run the 
Hadoop code within Oozie in production. 
Also my current task is evangelizing Big Data at my company. So the tech people 
I can educate with Hadoop and Spark and they would learn that but not the 
business intelligence analysts. They love SQL so I have to educate them using 
Hive , Presto, Impala...so the question is what is your task or tasks ?

Sorry , a long non technical answer to your question...
Make sense ?
sanjay     
  From: Krishna Sankar 
 To: Sean Owen  
Cc: Guillermo Ortiz ; user  
 Sent: Saturday, November 22, 2014 4:53 PM
 Subject: Re: Spark or MR, Scala or Java?
   
Adding to already interesting answers:   
   - "Is there any case where MR is better than Spark? I don't know what cases 
I should be used Spark by MR. When is MR faster than Spark?"   

   
   - Many. MR would be better (am not saying faster ;o)) for 
   
   - Very large dataset,
   - Multistage map-reduce flows,
   - Complex map-reduce semantics
   
   - Spark is definitely better for the classic iterative,interactive workloads.
   - Spark is very effective for implementing the concepts of in-memory 
datasets & real time analytics 
   
   - Take a look at the Lambda architecture
   
   - Also checkout how Ooyala is using Spark in multiple layers & 
configurations. They also have MR in many places
   - In our case, we found Spark very effective for ELT - we would have used MR 
earlier
   
   -  "I know Java, is it worth it to learn Scala for programming to Spark or 
it's okay just with Java?"   

   
   - Java will work fine. Especially when Java 8 becomes the norm, we will get 
back some of the elegance
   - I, personally, like Scala & Python lot better than Java. Scala is a lot 
more elegant, but compilations, IDE integration et al are still clunky
   - One word of caution - stick with one language as much as 
possible-shuffling between Java & Scala is not fun
Cheers & HTH


On Sat, Nov 22, 2014 at 8:26 AM, Sean Owen  wrote:

MapReduce is simpler and narrower, which also means it is generally lighter weight, with less to know and configure, and runs more predictably. If you have a job that is truly just a few maps, with maybe one reduce, MR will likely be more efficient. Until recently its shuffle has been more developed and offers some semantics the Spark shuffle does not. I suppose it integrates with tools like Oozie, that Spark does not. I suggest learning enough Scala to use Spark in Scala. The amount you need to know is not large. (Mahout MR based implementations do not run on Spark and will not. They have been removed instead.)

On Nov 22, 2014 3:36 PM, "Guillermo Ortiz"  wrote:

Hello,

I'm a newbie with Spark but I've been working with Hadoop for a while.
I have two questions.

Is there any case where MR is better than Spark? I don't know what
cases I should be used Spark by MR. When is MR faster than Spark?

The other question is, I know Java, is it worth it to learn Scala for
programming to Spark or it's okay just with Java? I have done a little
piece of code with Java because I feel more confident with it, but it
seems that I'm missing something

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org






  

Re: Extracting values from a Collecion

2014-11-22 Thread Sanjay Subramanian
I could not iterate thru the set but changed the code to get what I was looking for (Not elegant but gets me going)
package org.medicalsidefx.common.utils

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._

import scala.collection.mutable.ArrayBuffer

/**
 * Created by sansub01 on 11/19/14.
 */
object TwoWayJoin2 {
  def main(args: Array[String]) {
if (args.length < 2) {
  System.err.println("Usage: TwoWayJoinCount")
  System.exit(12)
}

val sconf = new 
SparkConf().setMaster("local").setAppName("MedicalSideFx-TwoWayJoin")

val sc = new SparkContext(sconf)

val file1 = args(0)
val file2 = args(1)

val file1Rdd = sc.textFile(file1).map(x => (x.split(",")(0), 
x.split(",")(1)))
val file2Rdd = sc.textFile(file2).map(x => (x.split(",")(0), 
x.split(",")(1))).reduceByKey((v1,v2) => v1+"|"+v2)

file1Rdd.collect().foreach(println)
file2Rdd.collect().foreach(println)

file1Rdd.join(file2Rdd).collect().foreach( e => 
println(e.toString.replace("(","").replace(")","")))

  }
}
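For reference, a sketch of the Set-based route without the string-concat workaround (same sc, file1 and file2 as in the snippet above): the grouped values come back as an ordinary Scala Set, so map / mkString / foreach pull the values out directly.

val namesRdd = sc.textFile(file1).map(x => (x.split(",")(0), x.split(",")(1)))
val songsRdd = sc.textFile(file2).map(x => (x.split(",")(0), x.split(",")(1)))

namesRdd.join(songsRdd.groupByKey().mapValues(_.toSet))
  .collect()
  .foreach { case (id, (name, songs)) =>
    // songs is a plain Set[String]; mkString flattens its values into one line
    println(id + "," + name + "," + songs.mkString("|"))
  }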

  From: Jey Kottalam 
 To: Sanjay Subramanian  
Cc: Arun Ahuja ; Andrew Ash ; user 
 
 Sent: Friday, November 21, 2014 10:07 PM
 Subject: Extracting values from a Collecion
   
Hi Sanjay,

These are instances of the standard Scala collection type "Set", and its 
documentation can be found by googling the phrase "scala set".

Hope that helps,
-Jey



On Fri, Nov 21, 2014 at 10:41 AM, Sanjay Subramanian 
 wrote:
> hey guys
>
> names.txt
> =
> 1,paul
> 2,john
> 3,george
> 4,ringo
>
>
> songs.txt
> =
> 1,Yesterday
> 2,Julia
> 3,While My Guitar Gently Weeps
> 4,With a Little Help From My Friends
> 1,Michelle
> 2,Nowhere Man
> 3,Norwegian Wood
> 4,Octopus's Garden
>
> What I want to do is real simple
>
> Desired Output
> ==
> (4,(With a Little Help From My Friends, Octopus's Garden))
> (2,(Julia, Nowhere Man))
> (3,(While My Guitar Gently Weeps, Norwegian Wood))
> (1,(Yesterday, Michelle))
>
>
> My Code
> ===
> val file1Rdd =
> sc.textFile("/Users/sansub01/mycode/data/songs/names.txt").map(x =>
> (x.split(",")(0), x.split(",")(1)))
> val file2Rdd =
> sc.textFile("/Users/sansub01/mycode/data/songs/songs.txt").map(x =>
> (x.split(",")(0), x.split(",")(1)))
> val file2RddGrp = file2Rdd.groupByKey()
> file2Rdd.groupByKey().mapValues(names =>
> names.toSet).collect().foreach(println)
>
> Result
> ===
> (4,Set(With a Little Help From My Friends, Octopus's Garden))
> (2,Set(Julia, Nowhere Man))
> (3,Set(While My Guitar Gently Weeps, Norwegian Wood))
> (1,Set(Yesterday, Michelle))
>
>
> How can I extract values from the Set ?
>
> Thanks
>
> sanjay
>



  

Re: Extracting values from a Collecion

2014-11-22 Thread Sanjay Subramanian
Thanks Jey
regards
sanjay
  From: Jey Kottalam 
 To: Sanjay Subramanian  
Cc: Arun Ahuja ; Andrew Ash ; user 
 
 Sent: Friday, November 21, 2014 10:07 PM
 Subject: Extracting values from a Collecion
   
Hi Sanjay,

These are instances of the standard Scala collection type "Set", and its 
documentation can be found by googling the phrase "scala set".

Hope that helps,
-Jey



On Fri, Nov 21, 2014 at 10:41 AM, Sanjay Subramanian 
 wrote:
> hey guys
>
> names.txt
> =
> 1,paul
> 2,john
> 3,george
> 4,ringo
>
>
> songs.txt
> =
> 1,Yesterday
> 2,Julia
> 3,While My Guitar Gently Weeps
> 4,With a Little Help From My Friends
> 1,Michelle
> 2,Nowhere Man
> 3,Norwegian Wood
> 4,Octopus's Garden
>
> What I want to do is real simple
>
> Desired Output
> ==
> (4,(With a Little Help From My Friends, Octopus's Garden))
> (2,(Julia, Nowhere Man))
> (3,(While My Guitar Gently Weeps, Norwegian Wood))
> (1,(Yesterday, Michelle))
>
>
> My Code
> ===
> val file1Rdd =
> sc.textFile("/Users/sansub01/mycode/data/songs/names.txt").map(x =>
> (x.split(",")(0), x.split(",")(1)))
> val file2Rdd =
> sc.textFile("/Users/sansub01/mycode/data/songs/songs.txt").map(x =>
> (x.split(",")(0), x.split(",")(1)))
> val file2RddGrp = file2Rdd.groupByKey()
> file2Rdd.groupByKey().mapValues(names =>
> names.toSet).collect().foreach(println)
>
> Result
> ===
> (4,Set(With a Little Help From My Friends, Octopus's Garden))
> (2,Set(Julia, Nowhere Man))
> (3,Set(While My Guitar Gently Weeps, Norwegian Wood))
> (1,Set(Yesterday, Michelle))
>
>
> How can I extract values from the Set ?
>
> Thanks
>
> sanjay
>



  

Re: Extracting values from a Collecion

2014-11-21 Thread Sanjay Subramanian
I am sorry the last line in the code is

file1Rdd.join(file2RddGrp.mapValues(names => names.toSet)).collect().foreach(println)

so

My Code
===
val file1Rdd = sc.textFile("/Users/sansub01/mycode/data/songs/names.txt").map(x => (x.split(",")(0), x.split(",")(1)))
val file2Rdd = sc.textFile("/Users/sansub01/mycode/data/songs/songs.txt").map(x => (x.split(",")(0), x.split(",")(1)))
val file2RddGrp = file2Rdd.groupByKey()
file1Rdd.join(file2RddGrp.mapValues(names => names.toSet)).collect().foreach(println)

Result
===
(4,(ringo,Set(With a Little Help From My Friends, Octopus's Garden)))
(2,(john,Set(Julia, Nowhere Man)))
(3,(george,Set(While My Guitar Gently Weeps, Norwegian Wood)))
(1,(paul,Set(Yesterday, Michelle)))

Again the question is how do I extract values from the Set ?
thanks
sanjay

  From: Sanjay Subramanian 
 To: Arun Ahuja ; Andrew Ash  
Cc: user  
 Sent: Friday, November 21, 2014 10:41 AM
 Subject: Extracting values from a Collecion
   
hey guys
names.txt
=
1,paul
2,john
3,george
4,ringo

songs.txt
=
1,Yesterday
2,Julia
3,While My Guitar Gently Weeps
4,With a Little Help From My Friends
1,Michelle
2,Nowhere Man
3,Norwegian Wood
4,Octopus's Garden

What I want to do is real simple

Desired Output
==
(4,(With a Little Help From My Friends, Octopus's Garden))
(2,(Julia, Nowhere Man))
(3,(While My Guitar Gently Weeps, Norwegian Wood))
(1,(Yesterday, Michelle))

My Code
===
val file1Rdd = sc.textFile("/Users/sansub01/mycode/data/songs/names.txt").map(x => (x.split(",")(0), x.split(",")(1)))
val file2Rdd = sc.textFile("/Users/sansub01/mycode/data/songs/songs.txt").map(x => (x.split(",")(0), x.split(",")(1)))
val file2RddGrp = file2Rdd.groupByKey()
file2Rdd.groupByKey().mapValues(names => names.toSet).collect().foreach(println)

Result
===
(4,Set(With a Little Help From My Friends, Octopus's Garden))
(2,Set(Julia, Nowhere Man))
(3,Set(While My Guitar Gently Weeps, Norwegian Wood))
(1,Set(Yesterday, Michelle))

How can I extract values from the Set ?


Thanks
sanjay


  

Extracting values from a Collecion

2014-11-21 Thread Sanjay Subramanian
hey guys
names.txt
=
1,paul
2,john
3,george
4,ringo

songs.txt
=
1,Yesterday
2,Julia
3,While My Guitar Gently Weeps
4,With a Little Help From My Friends
1,Michelle
2,Nowhere Man
3,Norwegian Wood
4,Octopus's Garden

What I want to do is real simple

Desired Output
==
(4,(With a Little Help From My Friends, Octopus's Garden))
(2,(Julia, Nowhere Man))
(3,(While My Guitar Gently Weeps, Norwegian Wood))
(1,(Yesterday, Michelle))

My Code
===
val file1Rdd = sc.textFile("/Users/sansub01/mycode/data/songs/names.txt").map(x => (x.split(",")(0), x.split(",")(1)))
val file2Rdd = sc.textFile("/Users/sansub01/mycode/data/songs/songs.txt").map(x => (x.split(",")(0), x.split(",")(1)))
val file2RddGrp = file2Rdd.groupByKey()
file2Rdd.groupByKey().mapValues(names => names.toSet).collect().foreach(println)

Result
===
(4,Set(With a Little Help From My Friends, Octopus's Garden))
(2,Set(Julia, Nowhere Man))
(3,Set(While My Guitar Gently Weeps, Norwegian Wood))
(1,Set(Yesterday, Michelle))

How can I extract values from the Set ?
Thanks
sanjay


Re: Code works in Spark-Shell but Fails inside IntelliJ

2014-11-20 Thread Sanjay Subramanian
Not using SBT...I have been creating and adapting various Spark Scala examples and put it here and all u have to do is git clone and import as maven project into IntelliJ
https://github.com/sanjaysubramanian/msfx_scala.git

Sidenote , IMHO, IDEs encourage the "new to Spark/Scala developers" to quickly 
test , experiment and debug code.
  From: Jay Vyas 
 To: Sanjay Subramanian  
Cc: "user@spark.apache.org"  
 Sent: Thursday, November 20, 2014 4:53 PM
 Subject: Re: Code works in Spark-Shell but Fails inside IntelliJ
   
This seems pretty standard: your IntelliJ classpath isn't matched to the 
correct ones that are used in spark shell
Are you using the SBT plugin? If not how are you putting deps into IntelliJ?



On Nov 20, 2014, at 7:35 PM, Sanjay Subramanian 
 wrote:


hey guys
I am at AmpCamp 2014 at UCB right now :-) 
Funny Issue...
This code works in Spark-Shell but throws a funny exception in IntelliJ
CODE
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
sqlContext.setConf("spark.sql.parquet.binaryAsString", "true")
val wikiData = sqlContext.parquetFile("/Users/sansub01/mycode/knowledge/spark_ampcamp_2014/data/wiki_parquet")
wikiData.registerTempTable("wikiData")
sqlContext.sql("SELECT username, COUNT(*) AS cnt FROM wikiData WHERE username <> '' GROUP BY username ORDER BY cnt DESC LIMIT 10").collect().foreach(println)

RESULTS
[Waacstats,2003]
[Cydebot,949]
[BattyBot,939]
[Yobot,890]
[Addbot,853]
[Monkbot,668]
[ChrisGualtieri,438]
[RjwilmsiBot,387]
[OccultZone,377]
[ClueBot NG,353]

INTELLIJ CODE=object ParquetSql {
  def main(args: Array[String]) {

val sconf = new 
SparkConf().setMaster("local").setAppName("MedicalSideFx-NamesFoodSql")
val sc = new SparkContext(sconf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
sqlContext.setConf("spark.sql.parquet.binaryAsString", "true")
val wikiData = 
sqlContext.parquetFile("/Users/sansub01/mycode/knowledge/spark_ampcamp_2014/data/wiki_parquet")
wikiData.registerTempTable("wikiData")
val results = sqlContext.sql("SELECT username, COUNT(*) AS cnt FROM 
wikiData WHERE username <> '' GROUP BY username ORDER BY cnt DESC LIMIT 10")
results.collect().foreach(println)
  }

}

INTELLIJ ERROR==Exception in thread "main" 
java.lang.IncompatibleClassChangeError: Found interface 
org.apache.spark.serializer.Serializer, but class was expected at 
org.apache.spark.sql.parquet.ParquetFilters$.serializeFilterExpressions(ParquetFilters.scala:244)
 at 
org.apache.spark.sql.parquet.ParquetTableScan.execute(ParquetTableOperations.scala:109)
 at org.apache.spark.sql.execution.Filter.execute(basicOperators.scala:57) at 
org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1.apply(Aggregate.scala:151)
 at 
org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1.apply(Aggregate.scala:127)
 at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:46) 
at org.apache.spark.sql.execution.Aggregate.execute(Aggregate.scala:126) at 
org.apache.spark.sql.execution.Exchange$$anonfun$execute$1.apply(Exchange.scala:48)
 at 
org.apache.spark.sql.execution.Exchange$$anonfun$execute$1.apply(Exchange.scala:45)
 at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:46) 
at org.apache.spark.sql.execution.Exchange.execute(Exchange.scala:44) at 
org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1.apply(Aggregate.scala:151)
 at 
org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1.apply(Aggregate.scala:127)
 at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:46) 
at org.apache.spark.sql.execution.Aggregate.execute(Aggregate.scala:126) at 
org.apache.spark.sql.execution.TakeOrdered.executeCollect(basicOperators.scala:171)
 at org.apache.spark.sql.SchemaRDD.collect(SchemaRDD.scala:438) at 
org.medicalsidefx.common.utils.ParquetSql$.main(ParquetSql.scala:18) at 
org.medicalsidefx.common.utils.ParquetSql.main(ParquetSql.scala) at 
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606) at 
com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)


 


  

Re: Code works in Spark-Shell but Fails inside IntelliJ

2014-11-20 Thread Sanjay Subramanian
Awesome that was it...Hit me with a hockey stick :-)
unmatched Spark Core (1.0.0) and SparkSql (1.1.1) versions. Corrected that to 1.1.0 on both

<dependency>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-core_2.10</artifactId>
   <version>1.0.0</version>
</dependency>
<dependency>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-sql_2.10</artifactId>
   <version>1.1.0</version>
</dependency>

  From: Michael Armbrust 
 To: Sanjay Subramanian  
Cc: "user@spark.apache.org"  
 Sent: Thursday, November 20, 2014 4:49 PM
 Subject: Re: Code works in Spark-Shell but Fails inside IntelliJ
   
Looks like IntelliJ might be trying to load the wrong version of spark?


On Thu, Nov 20, 2014 at 4:35 PM, Sanjay Subramanian 
 wrote:

hey guys
I am at AmpCamp 2014 at UCB right now :-) 
Funny Issue...
This code works in Spark-Shell but throws a funny exception in IntelliJ
CODE
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
sqlContext.setConf("spark.sql.parquet.binaryAsString", "true")
val wikiData = sqlContext.parquetFile("/Users/sansub01/mycode/knowledge/spark_ampcamp_2014/data/wiki_parquet")
wikiData.registerTempTable("wikiData")
sqlContext.sql("SELECT username, COUNT(*) AS cnt FROM wikiData WHERE username <> '' GROUP BY username ORDER BY cnt DESC LIMIT 10").collect().foreach(println)

RESULTS
[Waacstats,2003]
[Cydebot,949]
[BattyBot,939]
[Yobot,890]
[Addbot,853]
[Monkbot,668]
[ChrisGualtieri,438]
[RjwilmsiBot,387]
[OccultZone,377]
[ClueBot NG,353]

INTELLIJ CODE=object ParquetSql {
  def main(args: Array[String]) {

val sconf = new 
SparkConf().setMaster("local").setAppName("MedicalSideFx-NamesFoodSql")
val sc = new SparkContext(sconf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
sqlContext.setConf("spark.sql.parquet.binaryAsString", "true")
val wikiData = 
sqlContext.parquetFile("/Users/sansub01/mycode/knowledge/spark_ampcamp_2014/data/wiki_parquet")
wikiData.registerTempTable("wikiData")
val results = sqlContext.sql("SELECT username, COUNT(*) AS cnt FROM 
wikiData WHERE username <> '' GROUP BY username ORDER BY cnt DESC LIMIT 10")
results.collect().foreach(println)
  }

}

INTELLIJ ERROR==Exception in thread "main" 
java.lang.IncompatibleClassChangeError: Found interface 
org.apache.spark.serializer.Serializer, but class was expected at 
org.apache.spark.sql.parquet.ParquetFilters$.serializeFilterExpressions(ParquetFilters.scala:244)
 at 
org.apache.spark.sql.parquet.ParquetTableScan.execute(ParquetTableOperations.scala:109)
 at org.apache.spark.sql.execution.Filter.execute(basicOperators.scala:57) at 
org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1.apply(Aggregate.scala:151)
 at 
org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1.apply(Aggregate.scala:127)
 at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:46) 
at org.apache.spark.sql.execution.Aggregate.execute(Aggregate.scala:126) at 
org.apache.spark.sql.execution.Exchange$$anonfun$execute$1.apply(Exchange.scala:48)
 at 
org.apache.spark.sql.execution.Exchange$$anonfun$execute$1.apply(Exchange.scala:45)
 at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:46) 
at org.apache.spark.sql.execution.Exchange.execute(Exchange.scala:44) at 
org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1.apply(Aggregate.scala:151)
 at 
org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1.apply(Aggregate.scala:127)
 at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:46) 
at org.apache.spark.sql.execution.Aggregate.execute(Aggregate.scala:126) at 
org.apache.spark.sql.execution.TakeOrdered.executeCollect(basicOperators.scala:171)
 at org.apache.spark.sql.SchemaRDD.collect(SchemaRDD.scala:438) at 
org.medicalsidefx.common.utils.ParquetSql$.main(ParquetSql.scala:18) at 
org.medicalsidefx.common.utils.ParquetSql.main(ParquetSql.scala) at 
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606) at 
com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)


 



  

Code works in Spark-Shell but Fails inside IntelliJ

2014-11-20 Thread Sanjay Subramanian
hey guys
I am at AmpCamp 2014 at UCB right now :-) 
Funny Issue...
This code works in Spark-Shell but throws a funny exception in IntelliJ
CODE
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
sqlContext.setConf("spark.sql.parquet.binaryAsString", "true")
val wikiData = sqlContext.parquetFile("/Users/sansub01/mycode/knowledge/spark_ampcamp_2014/data/wiki_parquet")
wikiData.registerTempTable("wikiData")
sqlContext.sql("SELECT username, COUNT(*) AS cnt FROM wikiData WHERE username <> '' GROUP BY username ORDER BY cnt DESC LIMIT 10").collect().foreach(println)

RESULTS
[Waacstats,2003]
[Cydebot,949]
[BattyBot,939]
[Yobot,890]
[Addbot,853]
[Monkbot,668]
[ChrisGualtieri,438]
[RjwilmsiBot,387]
[OccultZone,377]
[ClueBot NG,353]

INTELLIJ CODE=object ParquetSql {
  def main(args: Array[String]) {

val sconf = new 
SparkConf().setMaster("local").setAppName("MedicalSideFx-NamesFoodSql")
val sc = new SparkContext(sconf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
sqlContext.setConf("spark.sql.parquet.binaryAsString", "true")
val wikiData = 
sqlContext.parquetFile("/Users/sansub01/mycode/knowledge/spark_ampcamp_2014/data/wiki_parquet")
wikiData.registerTempTable("wikiData")
val results = sqlContext.sql("SELECT username, COUNT(*) AS cnt FROM 
wikiData WHERE username <> '' GROUP BY username ORDER BY cnt DESC LIMIT 10")
results.collect().foreach(println)
  }

}

INTELLIJ ERROR==Exception in thread "main" 
java.lang.IncompatibleClassChangeError: Found interface 
org.apache.spark.serializer.Serializer, but class was expected at 
org.apache.spark.sql.parquet.ParquetFilters$.serializeFilterExpressions(ParquetFilters.scala:244)
 at 
org.apache.spark.sql.parquet.ParquetTableScan.execute(ParquetTableOperations.scala:109)
 at org.apache.spark.sql.execution.Filter.execute(basicOperators.scala:57) at 
org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1.apply(Aggregate.scala:151)
 at 
org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1.apply(Aggregate.scala:127)
 at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:46) 
at org.apache.spark.sql.execution.Aggregate.execute(Aggregate.scala:126) at 
org.apache.spark.sql.execution.Exchange$$anonfun$execute$1.apply(Exchange.scala:48)
 at 
org.apache.spark.sql.execution.Exchange$$anonfun$execute$1.apply(Exchange.scala:45)
 at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:46) 
at org.apache.spark.sql.execution.Exchange.execute(Exchange.scala:44) at 
org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1.apply(Aggregate.scala:151)
 at 
org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1.apply(Aggregate.scala:127)
 at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:46) 
at org.apache.spark.sql.execution.Aggregate.execute(Aggregate.scala:126) at 
org.apache.spark.sql.execution.TakeOrdered.executeCollect(basicOperators.scala:171)
 at org.apache.spark.sql.SchemaRDD.collect(SchemaRDD.scala:438) at 
org.medicalsidefx.common.utils.ParquetSql$.main(ParquetSql.scala:18) at 
org.medicalsidefx.common.utils.ParquetSql.main(ParquetSql.scala) at 
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606) at 
com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)




Cant start spark-shell in CDH Spark Standalone 1.1.0+cdh5.2.0+56

2014-10-27 Thread Sanjay Subramanian
hey guys
Anyone using CDH Spark Standalone?
I installed Spark standalone thru Cloudera Manager
$ spark-shell --total-executor-cores 8
/opt/cloudera/parcels/CDH-5.2.0-1.cdh5.2.0.p0.36/bin/../lib/spark/bin/spark-shell:
 line 44: 
/opt/cloudera/parcels/CDH-5.2.0-1.cdh5.2.0.p0.36/lib/spark/bin/utils.sh: No 
such file or directory 
It used to all work in the 5.1.x version of CDH
sanjay

Re: Spark inside Eclipse

2014-10-03 Thread Sanjay Subramanian
So some progress but still errors 

object WordCount {
  def main(args: Array[String]) {
    if (args.length < 1) {
      System.err.println("Usage: WordCount ")
      System.exit(1)
    }
    val conf = new SparkConf().setMaster("local").setAppName(s"Whatever")
    val sc = new SparkContext(conf);

    val file = args(0)
    val counts = sc.textFile(file).
      flatMap(line => line.split("\\W")).
      map(word => (word,1)).
      reduceByKey((v1,v2) => v1+v2)

    counts.take(10).foreach(println)
  }
}

The errors I am getting are:

14/10/03 18:09:17 INFO spark.SecurityManager: Changing view acls to: sansub01
14/10/03 18:09:17 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(sansub01)
Exception in thread "main" java.lang.NoClassDefFoundError: scala/collection/GenTraversableOnce$class
 at akka.util.Collections$EmptyImmutableSeq$.(Collections.scala:15)
 at akka.util.Collections$EmptyImmutableSeq$.(Collections.scala)
 at akka.japi.Util$.immutableSeq(JavaAPI.scala:209)
 at akka.actor.ActorSystem$Settings.(ActorSystem.scala:150)
 at akka.actor.ActorSystemImpl.(ActorSystem.scala:470)
 at akka.actor.ActorSystem$.apply(ActorSystem.scala:111)
 at akka.actor.ActorSystem$.apply(ActorSystem.scala:104)
 at org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:104)
 at org.apache.spark.SparkEnv$.create(SparkEnv.scala:152)
 at org.apache.spark.SparkContext.(SparkContext.scala:202)
 at com.roberthalf.common.utils.WordCount$.main(WordCount.scala:14)
 at com.roberthalf.common.utils.WordCount.main(WordCount.scala)
Caused by: java.lang.ClassNotFoundException: scala.collection.GenTraversableOnce$class
 at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
 at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
 ... 12 more
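That NoClassDefFoundError on scala/collection/GenTraversableOnce$class usually points to a Scala version mismatch on the Eclipse classpath: the Spark 1.x artifacts used here are built for Scala 2.10, so the scala-library the project compiles and runs against should be 2.10.x as well. A minimal build sketch under that assumption (sbt shown; the project name and Spark version below are placeholders):

name := "wordcount"

scalaVersion := "2.10.4"

// spark-core is published as spark-core_2.10, so %% resolves it against the 2.10 scala-library above
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.1.0"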
I am gonna keep working to solve this. Meanwhile, if you can provide some guidance, that would be cool.
sanjay

 From: Daniel Siegmann 
 To: Ashish Jain 
 Cc: Sanjay Subramanian ; "user@spark.apache.org" 
 Sent: Thursday, October 2, 2014 6:52 AM
 Subject: Re: Spark inside Eclipse

You don't need to do anything special to run in local mode from within Eclipse. 
Just create a simple SparkConf and create a SparkContext from that. I have unit 
tests which execute on a local SparkContext, and they work from inside Eclipse 
as well as SBT.

val conf = new SparkConf().setMaster("local").setAppName(s"Whatever")
val sc = new SparkContext(conf)

Keep in mind you can only have one local SparkContext at a time, otherwise you 
will get some weird errors. If you have tests running sequentially, make sure 
to close the SparkContext in your tear down method. If tests run in parallel 
you'll need to share the SparkContext between tests.

For unit testing, you can make use of SparkContext.parallelize to set up your 
test inputs and RDD.collect to retrieve the outputs.
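A minimal sketch of that pattern, assuming ScalaTest is on the test classpath (the suite name and test data are made up):

import org.apache.spark.SparkContext._
import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.{BeforeAndAfterAll, FunSuite}

class WordCountSuite extends FunSuite with BeforeAndAfterAll {

  private var sc: SparkContext = _

  override def beforeAll(): Unit = {
    // One local SparkContext shared by all tests in this suite
    val conf = new SparkConf().setMaster("local").setAppName("WordCountSuite")
    sc = new SparkContext(conf)
  }

  override def afterAll(): Unit = {
    // Stop the context in tear down so the next suite can create its own
    if (sc != null) sc.stop()
  }

  test("counts words") {
    // parallelize sets up the test input, collect retrieves the output
    val counts = sc.parallelize(Seq("a b", "b c c"))
      .flatMap(_.split("\\W"))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      .collect()
      .toMap
    assert(counts("c") === 2)
  }
}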




On Wed, Oct 1, 2014 at 7:43 PM, Ashish Jain  wrote:

Hello Sanjay,
This can be done, and is a very effective way to debug.
1) Compile and package your project to get a fat jar
2) In your SparkConf use setJars and give the location of this jar. Also set your master here as local in SparkConf
3) Use this SparkConf when creating JavaSparkContext (a quick sketch follows below)
4) Debug your program like you would any normal program.
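A minimal sketch of steps 2 and 3 in Scala (the jar path and app name are placeholders; with the Java API the same SparkConf would be passed to a JavaSparkContext):

import org.apache.spark.{SparkConf, SparkContext}

// Step 2: run against the local master and ship the fat jar built in step 1
val conf = new SparkConf()
  .setMaster("local")
  .setAppName("EclipseDebug")                         // placeholder app name
  .setJars(Seq("/path/to/your-project-assembly.jar")) // placeholder path to the fat jar

// Step 3: create the context from this SparkConf
val sc = new SparkContext(conf)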
Hope this helps. Thanks
Ashish

On Oct 1, 2014 4:35 PM, "Sanjay Subramanian"  wrote:

hey guys
Is there a way to run Spark in local mode from within Eclipse? I am running
Eclipse Kepler on a Macbook Pro with Mavericks. I'd like to debug and learn the
way one can with hadoop map/reduce applications run from within Eclipse.
thanks
sanjay    




-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io

  

Re: Spark inside Eclipse

2014-10-03 Thread Sanjay Subramanian
cool thanks, will set this up and report back how things went
regards
sanjay

Re: Multiple spark shell sessions

2014-10-01 Thread Sanjay Subramanian
Awesome, thanks a TON. It works.
There is a clash in the UI port initially, but it looks like it creates a second UI at port 4041 for the second user wanting to use the spark-shell:

14/10/01 17:34:38 INFO JettyUtils: Failed to create UI at port, 4040. Trying again.
14/10/01 17:34:38 INFO JettyUtils: Error was: Failure(java.net.BindException: Address already in use)
14/10/01 17:34:38 INFO SparkUI: Started SparkUI at http://hadoop02:4041

sanjay
  From: Matei Zaharia 
 To: Sanjay Subramanian  
Cc: "user@spark.apache.org"  
 Sent: Wednesday, October 1, 2014 5:19 PM
 Subject: Re: Multiple spark shell sessions
   
You need to set --total-executor-cores to limit how many total cores it grabs 
on the cluster. --executor-cores is just for each individual executor, but it 
will try to launch many of them.
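For example (the master URL is a placeholder), something along these lines caps each shell at 8 cores of the cluster so a second user's shell is not left WAITING:

spark-shell --master spark://hadoop02:7077 --total-executor-cores 8

The same cap can also be set through the spark.cores.max configuration property.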
Matei


On Oct 1, 2014, at 4:29 PM, Sanjay Subramanian 
 wrote:

hey guys

I am using  spark 1.0.0+cdh5.1.0+41
When two users try to run "spark-shell", the first user's spark-shell shows
active in the 18080 Web UI but the second user's shows WAITING; the shell
has a bunch of errors but does get to the spark-shell, and "sc.master" seems to
point to the correct master.

I tried controlling the number of cores in the "spark-shell" command
--executor-cores 8
Does not work

thanks

sanjay 
   

   



  

Spark inside Eclipse

2014-10-01 Thread Sanjay Subramanian
hey guys
Is there a way to run Spark in local mode from within Eclipse? I am running
Eclipse Kepler on a Macbook Pro with Mavericks. I'd like to debug and learn the
way one can with hadoop map/reduce applications run from within Eclipse.
thanks
sanjay   

Multiple spark shell sessions

2014-10-01 Thread Sanjay Subramanian
hey guys

I am using  spark 1.0.0+cdh5.1.0+41
When two users try to run "spark-shell", the first user's spark-shell shows
active in the 18080 Web UI but the second user's shows WAITING; the shell
has a bunch of errors but does get to the spark-shell, and "sc.master" seems to
point to the correct master.

I tried controlling the number of cores in the "spark-shell" command
--executor-cores 8
Does not work

thanks

sanjay