Re: Spark 1.6.0: substring on df.select

2016-05-11 Thread Raghavendra Pandey
You can create a column with the count of "/" characters, take the maximum of
that count, and then create that many columns for every row, filling with nulls
where a row has fewer segments.
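If you only need the last segment (the method name), here is a minimal sketch for Spark 1.6 DataFrames, assuming the column is called col1 as in your mail: split on "/" and take the last element with a small UDF.

import org.apache.spark.sql.functions.{col, udf}

// "/client/service/version/method".split("/").last == "method"
val lastSegment = udf((s: String) => if (s == null) null else s.split("/").last)
val withMethod = df.withColumn("method", lastSegment(col("col1")))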

Raghav
On 11 May 2016 20:37, "Bharathi Raja"  wrote:

Hi,



I have a dataframe column col1 with values something like
“/client/service/version/method”. The number of “/” separators is not constant.

Could you please help me to extract all methods from the column col1?



In Pig I used SUBSTRING with LAST_INDEX_OF(“/”).



Thanks in advance.

Regards,

Raja


Re: Not able to pass 3rd party jars to mesos executors

2016-05-11 Thread Raghavendra Pandey
On 11 May 2016 02:13, "gpatcham"  wrote:

>

> Hi All,
>
> I'm using the --jars option in spark-submit to ship 3rd-party jars, but I
> don't see that they are actually passed to the Mesos slaves. I'm getting
> class-not-found exceptions.
>
> This is how I'm using --jars option
>
> --jars hdfs://namenode:8082/user/path/to/jar
>
> Am I missing something here or what's the correct  way to do ?
>
> Thanks
>
>
>
> --
> View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/Not-able-pass-3rd-party-jars-to-mesos-executors-tp26918.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>


Re: Not able to pass 3rd party jars to mesos executors

2016-05-11 Thread Raghavendra Pandey
By any chance, are you using docker to execute?
On 11 May 2016 21:16, "Raghavendra Pandey" 
wrote:

> On 11 May 2016 02:13, "gpatcham"  wrote:
>
> >
>
> > Hi All,
> >
> > I'm using --jars option in spark-submit to send 3rd party jars . But I
> don't
> > see they are actually passed to mesos slaves. Getting Noclass found
> > exceptions.
> >
> > This is how I'm using --jars option
> >
> > --jars hdfs://namenode:8082/user/path/to/jar
> >
> > Am I missing something here or what's the correct  way to do ?
> >
> > Thanks
> >
> >
> >
> > --
> > View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Not-able-pass-3rd-party-jars-to-mesos-executors-tp26918.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> > -
> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> > For additional commands, e-mail: user-h...@spark.apache.org
> >
>


Re: Not able to pass 3rd party jars to mesos executors

2016-05-11 Thread Raghavendra Pandey
You have kept the 3rd-party jars on HDFS. I don't think the Mesos executors, as
of today, can download jars from HDFS. Can you try a shared directory instead?
The application jar itself is downloaded by the executors through the HTTP server.
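A minimal sketch of the workaround (paths, class and jar names are illustrative): put the jar on storage every slave can read, e.g. a shared mount, and reference it with a file: URI.

spark-submit --master mesos://<master>:5050 \
  --jars file:///shared/libs/thirdparty.jar \
  --class com.example.Main app.jar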

-Raghav
On 12 May 2016 00:04, "Giri P"  wrote:

> Yes..They are  reachable. Application jar which I send as argument is at
> same location as third party jar. Application jar is getting uploaded.
>
> On Wed, May 11, 2016 at 10:51 AM, lalit sharma 
> wrote:
>
>> Point to note as per docs as well :
>>
>> *Note that jars or python files that are passed to spark-submit should be
>> URIs reachable by Mesos slaves, as the Spark driver doesn’t automatically
>> upload local jars.**http://spark.apache.org/docs/latest/running-on-mesos.html
>> <http://spark.apache.org/docs/latest/running-on-mesos.html> *
>>
>> On Wed, May 11, 2016 at 10:05 PM, Giri P  wrote:
>>
>>> I'm not using docker
>>>
>>> On Wed, May 11, 2016 at 8:47 AM, Raghavendra Pandey <
>>> raghavendra.pan...@gmail.com> wrote:
>>>
>>>> By any chance, are you using docker to execute?
>>>> On 11 May 2016 21:16, "Raghavendra Pandey" <
>>>> raghavendra.pan...@gmail.com> wrote:
>>>>
>>>>> On 11 May 2016 02:13, "gpatcham"  wrote:
>>>>>
>>>>> >
>>>>>
>>>>> > Hi All,
>>>>> >
>>>>> > I'm using --jars option in spark-submit to send 3rd party jars . But
>>>>> I don't
>>>>> > see they are actually passed to mesos slaves. Getting Noclass found
>>>>> > exceptions.
>>>>> >
>>>>> > This is how I'm using --jars option
>>>>> >
>>>>> > --jars hdfs://namenode:8082/user/path/to/jar
>>>>> >
>>>>> > Am I missing something here or what's the correct  way to do ?
>>>>> >
>>>>> > Thanks
>>>>> >
>>>>> >
>>>>> >
>>>>> > --
>>>>> > View this message in context:
>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Not-able-pass-3rd-party-jars-to-mesos-executors-tp26918.html
>>>>> > Sent from the Apache Spark User List mailing list archive at
>>>>> Nabble.com.
>>>>> >
>>>>> > -
>>>>> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>>> > For additional commands, e-mail: user-h...@spark.apache.org
>>>>> >
>>>>>
>>>>
>>>
>>
>


Re: Executor memory requirement for reduceByKey

2016-05-17 Thread Raghavendra Pandey
Even though it does not sound intuitive, reduceByKey expects all the values
for a particular key within a partition to be loaded into memory. So once you
increase the number of partitions, you should be able to run the jobs.
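A minimal sketch (the RDD name and partition count are illustrative): you can pass a higher partition count straight into reduceByKey instead of repartitioning first.

// pairs: RDD[(String, Long)]; more reduce partitions means fewer values per key per partition
val reduced = pairs.reduceByKey(_ + _, 400)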


Re: My notes on Spark Performance & Tuning Guide

2016-05-17 Thread Raghavendra Pandey
Could you please send it to me as well?

Thanks
Raghav
On 12 May 2016 20:02, "Tom Ellis"  wrote:

> I would like to also Mich, please send it through, thanks!
>
> On Thu, 12 May 2016 at 15:14 Alonso Isidoro  wrote:
>
>> Me too, send me the guide.
>>
>> Enviado desde mi iPhone
>>
>> El 12 may 2016, a las 12:11, Ashok Kumar > > escribió:
>>
>> Hi Dr Mich,
>>
>> I will be very keen to have a look at it and review if possible.
>>
>> Please forward me a copy
>>
>> Thanking you warmly
>>
>>
>> On Thursday, 12 May 2016, 11:08, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>
>> Hi All,
>>
>>
>> Following the threads in the Spark forum, I decided to write up notes on the
>> configuration of Spark, including allocation of resources, configuration of the
>> driver, executors and threads, execution of Spark apps, and general
>> troubleshooting, taking into account the allocation of resources for Spark
>> applications and the OS tools at our disposal.
>>
>> Since the most widespread configuration I notice is "Spark
>> Standalone Mode", I have decided to write these notes starting with
>> Standalone and later moving on to YARN:
>>
>>
>>- *Standalone *– a simple cluster manager included with Spark that
>>makes it easy to set up a cluster.
>>- *YARN* – the resource manager in Hadoop 2.
>>
>>
>> I would appreciate it if anyone interested in reading and commenting would get
>> in touch with me directly on mich.talebza...@gmail.com so I can send them the
>> write-up for review and comments.
>>
>> Just to be clear this is not meant to be any commercial proposition or
>> anything like that. As I seem to get involved with members troubleshooting
>> issues and threads on this topic, I thought it is worthwhile writing a note
>> about it to summarise the findings for the benefit of the community.
>>
>> Regards.
>>
>> Dr Mich Talebzadeh
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>>


Re: [Spark 1.4.1] StructField for date column in CSV file while creating custom schema

2015-12-29 Thread Raghavendra Pandey
You can use DateType...
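A minimal sketch for dd/MM/yyyy values like the ones below, assuming the spark-csv package (column names are illustrative): declare the column as StringType in the custom schema and convert it to DateType with a small UDF.

import java.sql.Date
import java.text.SimpleDateFormat
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.types._

// read the date column as a string first
val schema = StructType(Seq(
  StructField("id", StringType, true),
  StructField("txn_date", StringType, true)))

// "25/11/2014"-style strings to java.sql.Date, which maps to Spark's DateType
val toDate = udf { (s: String) =>
  if (s == null || s.isEmpty) null
  else new Date(new SimpleDateFormat("dd/MM/yyyy").parse(s).getTime)
}

val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .schema(schema)
  .load("input.csv")
  .withColumn("txn_date", toDate(col("txn_date")))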
On Dec 29, 2015 9:02 AM, "Divya Gehlot"  wrote:

> Hi,
> I am a newbie to Spark;
> my apologies for such a naive question.
> I am using Spark 1.4.1 and writing code in Scala. I have input data as a
> CSV file which I am parsing using the spark-csv package. I am creating a custom
> schema to process the CSV file.
> Now my query is: which datatype, or rather which StructField, should I use for
> the date column of my CSV file?
> I am using HiveContext and have a requirement to create a Hive table after
> processing the CSV file.
> For example, my date column in the CSV file looks like
>
> 25/11/2014 20/9/2015 25/10/2015 31/10/2012 25/9/2013 25/11/2012 20/10/2013
> 25/10/2011
>
>


Re: how to flatten the dataframe

2016-03-06 Thread Raghavendra Pandey
Can you just use select and pass all the leaf-level columns into it?
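A minimal sketch (only a few of the fare fields shown): keep the top-level columns you need and alias the nested ones to flat names; the full list can also be generated programmatically from df.schema.

import org.apache.spark.sql.functions.col

val flat = df.select(
  col("airline"),
  col("origin"),
  col("destination"),
  col("fare.adultbasefare").as("fare_adultbasefare"),
  col("fare.totalfare").as("fare_totalfare"),
  col("fare.totalcommission").as("fare_totalcommission"))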

Raghav
On Mar 6, 2016 4:40 PM, "shubham@celebal"  wrote:

> root
>  |-- adultbasefare: long (nullable = true)
>  |-- adultcommission: long (nullable = true)
>  |-- adultservicetax: long (nullable = true)
>  |-- adultsurcharge: long (nullable = true)
>  |-- airline: string (nullable = true)
>  |-- arrdate: string (nullable = true)
>  |-- arrtime: string (nullable = true)
>  |-- cafecommission: long (nullable = true)
>  |-- carrierid: string (nullable = true)
>  |-- class: string (nullable = true)
>  |-- depdate: string (nullable = true)
>  |-- deptime: string (nullable = true)
>  |-- destination: string (nullable = true)
>  |-- discount: long (nullable = true)
>  |-- duration: string (nullable = true)
>  |-- fare: struct (nullable = true)
>  ||-- A: long (nullable = true)
>  ||-- C: long (nullable = true)
>  ||-- I: long (nullable = true)
>  ||-- adultairlinetxncharge: long (nullable = true)
>  ||-- adultairporttax: long (nullable = true)
>  ||-- adultbasefare: long (nullable = true)
>  ||-- adultcommission: double (nullable = true)
>  ||-- adultsurcharge: long (nullable = true)
>  ||-- adulttotalfare: long (nullable = true)
>  ||-- childairlinetxncharge: long (nullable = true)
>  ||-- childairporttax: long (nullable = true)
>  ||-- childbasefare: long (nullable = true)
>  ||-- childcommission: double (nullable = true)
>  ||-- childsurcharge: long (nullable = true)
>  ||-- childtotalfare: long (nullable = true)
>  ||-- discount: long (nullable = true)
>  ||-- infantairlinetxncharge: long (nullable = true)
>  ||-- infantairporttax: long (nullable = true)
>  ||-- infantbasefare: long (nullable = true)
>  ||-- infantcommission: long (nullable = true)
>  ||-- infantsurcharge: long (nullable = true)
>  ||-- infanttotalfare: long (nullable = true)
>  ||-- servicetax: long (nullable = true)
>  ||-- totalbasefare: long (nullable = true)
>  ||-- totalcommission: double (nullable = true)
>  ||-- totalfare: long (nullable = true)
>  ||-- totalsurcharge: long (nullable = true)
>  ||-- transactionfee: long (nullable = true)
>  |-- farebasis: string (nullable = true)
>  |-- farerule: string (nullable = true)
>  |-- flightcode: string (nullable = true)
>  |-- flightno: string (nullable = true)
>  |-- k: string (nullable = true)
>  |-- onwardflights: array (nullable = true)
>  ||-- element: string (containsNull = true)
>  |-- origin: string (nullable = true)
>  |-- promocode: string (nullable = true)
>  |-- promodiscount: long (nullable = true)
>  |-- promotionText: string (nullable = true)
>  |-- stops: string (nullable = true)
>  |-- tickettype: string (nullable = true)
>  |-- totalbasefare: long (nullable = true)
>  |-- totalcommission: long (nullable = true)
>  |-- totalfare: long (nullable = true)
>  |-- totalpriceamount: long (nullable = true)
>  |-- totalsurcharge: long (nullable = true)
>  |-- transactionfee: long (nullable = true)
>  |-- viacharges: long (nullable = true)
>  |-- warnings: string (nullable = true)
>
>
>
> Now I want to flatten it so that the fare field will be removed and
> everything will be flattened.
>
> For this I used explode, but I am getting an error:
>
> org.apache.spark.sql.AnalysisException: cannot resolve 'explode(fare)' due
> to data type mismatch: input to function explode should be array or map
> type, not StructType(StructField(A,LongType,true),
> StructField(C,LongType,true), StructField(I,LongType,true),
> StructField(adultairlinetxncharge,LongType,true),
> StructField(adultairporttax,LongType,true),
> StructField(adultbasefare,LongType,true),
> StructField(adultcommission,DoubleType,true),
> StructField(adultsurcharge,LongType,true),
> StructField(adulttotalfare,LongType,true),
> StructField(childairlinetxncharge,LongType,true),
> StructField(childairporttax,LongType,true),
> StructField(childbasefare,LongType,true),
> StructField(childcommission,DoubleType,true),
> StructField(childsurcharge,LongType,true),
> StructField(childtotalfare,LongType,true),
> StructField(discount,LongType,true),
> StructField(infantairlinetxncharge,LongType,true),
> StructField(infantairporttax,LongType,true),
> StructField(infantbasefare,LongType,true),
> StructField(infantcommission,LongType,true),
> StructField(infantsurcharge,LongType,true),
> StructField(infanttotalfare,LongType,true),
> StructField(servicetax,LongType,true),
> StructField(totalbasefare,LongType,true),
> StructField(totalcommission,DoubleType,true),
> StructField(totalfare,LongType,true),
> StructField(totalsurcharge,LongType,true),
> StructField(transactionfee,LongType,true));
>
> If not explode, how can I flatten it? Your help will be appreciated. Thanks
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/how-to-flatten-the-dataframe-tp26411.html
> Sent from the Apache Spark User List mailing list archive 

Re: Will multiple filters on the same RDD optimized to one filter?

2015-07-16 Thread Raghavendra Pandey
If you cache the rdd, it will save some operations. But anyway, filter is a
lazy operation, and what actually runs depends on what you do later on with
rdd1 and rdd2...
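A minimal sketch of the caching suggestion applied to your example:

val rdd = input.map(_.value).cache()  // computed once, then reused from memory by both branches
val f1 = rdd.filter(_ == 1)
val f2 = rdd.filter(_ == 2)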

Raghavendra
On Jul 16, 2015 1:33 PM, "Bin Wang"  wrote:

> If I write code like this:
>
> val rdd = input.map(_.value)
> val f1 = rdd.filter(_ == 1)
> val f2 = rdd.filter(_ == 2)
> ...
>
> Then the DAG of the execution may be this:
>
>  -> Filter -> ...
> Map
>  -> Filter -> ...
>
> But the two filters is operated on the same RDD, which means it could be
> done by just scan the RDD once. Does spark have this kind optimization for
> now?
>


Re: Will multiple filters on the same RDD optimized to one filter?

2015-07-16 Thread Raghavendra Pandey
Depending on what you do with them, they will get computed separately,
because you may have a long DAG in each branch. So Spark tries to run all the
transformation functions of a branch together rather than trying to optimize
things across branches.
On Jul 16, 2015 1:40 PM, "Bin Wang"  wrote:

> What if I would use both rdd1 and rdd2 later?
>
> Raghavendra Pandey wrote on Thu, 16 Jul 2015 at 16:08:
>
>> If you cache rdd it will save some operations. But anyway filter is a
>> lazy operation. And it runs based on what you will do later on with rdd1
>> and rdd2...
>>
>> Raghavendra
>> On Jul 16, 2015 1:33 PM, "Bin Wang"  wrote:
>>
>>> If I write code like this:
>>>
>>> val rdd = input.map(_.value)
>>> val f1 = rdd.filter(_ == 1)
>>> val f2 = rdd.filter(_ == 2)
>>> ...
>>>
>>> Then the DAG of the execution may be this:
>>>
>>>  -> Filter -> ...
>>> Map
>>>  -> Filter -> ...
>>>
>>> But the two filters is operated on the same RDD, which means it could be
>>> done by just scan the RDD once. Does spark have this kind optimization for
>>> now?
>>>
>>


Re: BroadCast on Interval ( eg every 10 min )

2015-07-17 Thread Raghavendra Pandey
Broadcast variables are immutable. Anyway, how are you getting the data
that you want to broadcast at regular intervals?
On Jul 16, 2015 9:33 PM, "Ashish Soni"  wrote:

> Hi All ,
> How can I broadcast a data change to all the executors every 10 min or
> 1 min?
>
> Ashish
>


Create StructType column in data frame

2015-07-27 Thread Raghavendra Pandey
Hello,

I would like to add a column of StructType to DataFrame.
What would be the best way to do it? Not sure if it is possible using
withColumn. A possible way is to convert the dataframe into an RDD[Row], add
the struct, and then convert it back to a dataframe. But that seems like
overkill.

Please note that I don't know the StructType beforehand and I am creating
it based on some configuration, so using case classes is out of the picture.

Thanks.


Re: DataFrame column structure change

2015-08-08 Thread Raghavendra Pandey
You can use the struct function of the org.apache.spark.sql.functions object to
combine two columns into a struct column.
Something like:
val nestedCol = struct(df("d"), df("e"))
df.select(df("a"), df("b"), df("c"), nestedCol.as("newCol"))
On Aug 7, 2015 3:14 PM, "Rishabh Bhardwaj"  wrote:

> I am doing it by creating a new data frame out of the fields to be nested
> and then join with the original DF.
> Looking for some optimized solution here.
>
> On Fri, Aug 7, 2015 at 2:06 PM, Rishabh Bhardwaj 
> wrote:
>
>> Hi all,
>>
>> I want to have some nesting structure from the existing columns of
>> the dataframe.
>> For that, I am trying to transform a DF in the following way, but couldn't
>> do it.
>>
>> scala> df.printSchema
>> root
>>  |-- a: string (nullable = true)
>>  |-- b: string (nullable = true)
>>  |-- c: string (nullable = true)
>>  |-- d: string (nullable = true)
>>  |-- e: string (nullable = true)
>>  |-- f: string (nullable = true)
>>
>> *To*
>>
>> scala> newDF.printSchema
>> root
>>  |-- a: string (nullable = true)
>>  |-- b: string (nullable = true)
>>  |-- c: string (nullable = true)
>>  |-- newCol: struct (nullable = true)
>>  ||-- d: string (nullable = true)
>>  ||-- e: string (nullable = true)
>>
>>
>> help me.
>>
>> Regards,
>> Rishabh.
>>
>
>


Spark SQL jobs and their partitions

2015-08-08 Thread Raghavendra Pandey
I have complex transformation requirements that I am implementing using
DataFrames. They involve a lot of joins, also with a Cassandra table.
I was wondering how I can debug the jobs and stages queued by Spark SQL the
way I can for RDDs.

In one case, Spark SQL creates more than 1.7 million (17 lakh) tasks for 2 GB of
data, even though I have set the SQL shuffle partitions to 32.

Raghav


Re: How to specify column type when saving DataFrame as parquet file?

2015-08-14 Thread Raghavendra Pandey
I think you can try the DataFrame creation API that takes an RDD[Row] and a
StructType...
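A minimal sketch (file paths are illustrative): rebuild the schema with LongType for column C and recreate the DataFrame from the underlying RDD[Row].

import org.apache.spark.sql.types._

val df = sqlContext.read.json("input.json")

// swap the type of column "C" to LongType, keep everything else as-is
val newSchema = StructType(df.schema.map {
  case StructField("C", _, nullable, meta) => StructField("C", LongType, nullable, meta)
  case other => other
})

// safe here because all values of C are null, so no row conversion is needed
val fixed = sqlContext.createDataFrame(df.rdd, newSchema)
fixed.write.parquet("output.parquet")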
On Aug 11, 2015 4:28 PM, "Jyun-Fan Tsai"  wrote:

> Hi all,
> I'm using Spark 1.4.1.  I create a DataFrame from json file.  There is
> a column C that all values are null in the json file.  I found that
> the datatype of column C in the created DataFrame is string.  However,
> I would like to specify the column as Long when saving it as parquet
> file.  What should I do to specify the column type when saving parquet
> file?
>
> Thank you,
> Jyun-Fan Tsai
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Left outer joining big data set with small lookups

2015-08-14 Thread Raghavendra Pandey
In Spark 1.4 there is a parameter to control that
(spark.sql.autoBroadcastJoinThreshold). Its default value is 10 MB. You also
need to cache your dataframe so that its size is known to the planner.
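A minimal sketch of the idea (the threshold and column name are illustrative):

// tables whose estimated size is below this threshold (in bytes) get broadcast automatically
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", (100 * 1024 * 1024).toString)

// cache the lookup so the planner knows its size, then join
lookupDF.cache().count()
val joined = bigDF.join(lookupDF, bigDF("key") === lookupDF("key"), "left_outer")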
On Aug 14, 2015 7:09 PM, "VIJAYAKUMAR JAWAHARLAL" 
wrote:

> Hi
>
> I am facing a huge performance problem when I am trying to left-outer-join a
> very big data set (~140GB) with a bunch of small lookups [star schema type].
> I am using data frames in Spark SQL. It looks like the data is shuffled and
> skewed when that join happens. Is there any way to improve the performance of
> such a join in Spark?
>
> How can I hint optimizer to go with replicated join etc., to avoid
> shuffle? Would it help to create broadcast variables on small lookups?  If
> I create broadcast variables, how can I convert them into data frame and
> use them in sparksql type of join?
>
> Thanks
> Vijay
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Spark Sql behaves strangely with tables with a lot of partitions

2015-08-21 Thread Raghavendra Pandey
Did you try with Hadoop version 2.7.1? It is known that s3a, which is
available in 2.7, works really well with Parquet. They fixed a lot of issues
related to metadata reading there...
On Aug 21, 2015 11:24 PM, "Jerrick Hoang"  wrote:

> @Cheng, Hao : Physical plans show that it got stuck on scanning S3!
>
> (table is partitioned by date_prefix and hour)
> explain select count(*) from test_table where date_prefix='20150819' and
> hour='00';
>
> TungstenAggregate(key=[], value=[(count(1),mode=Final,isDistinct=false)]
>  TungstenExchange SinglePartition
>   TungstenAggregate(key=[],
> value=[(count(1),mode=Partial,isDistinct=false)]
>Scan ParquetRelation[ ..  ]
>
> Why does spark have to scan all partitions when the query only concerns
> with 1 partitions? Doesn't it defeat the purpose of partitioning?
>
> Thanks!
>
> On Thu, Aug 20, 2015 at 4:12 PM, Philip Weaver 
> wrote:
>
>> I hadn't heard of spark.sql.sources.partitionDiscovery.enabled before,
>> and I couldn't find much information about it online. What does it mean
>> exactly to disable it? Are there any negative consequences to disabling it?
>>
>> On Wed, Aug 19, 2015 at 10:53 PM, Cheng, Hao  wrote:
>>
>>> Can you make some more profiling? I am wondering if the driver is busy
>>> with scanning the HDFS / S3.
>>>
>>> Like jstack 
>>>
>>>
>>>
>>> And also, it’s will be great if you can paste the physical plan for the
>>> simple query.
>>>
>>>
>>>
>>> *From:* Jerrick Hoang [mailto:jerrickho...@gmail.com]
>>> *Sent:* Thursday, August 20, 2015 1:46 PM
>>> *To:* Cheng, Hao
>>> *Cc:* Philip Weaver; user
>>> *Subject:* Re: Spark Sql behaves strangely with tables with a lot of
>>> partitions
>>>
>>>
>>>
>>> I cloned from TOT after 1.5.0 cut off. I noticed there were a couple of
>>> CLs trying to speed up spark sql with tables with a huge number of
>>> partitions, I've made sure that those CLs are included but it's still very
>>> slow
>>>
>>>
>>>
>>> On Wed, Aug 19, 2015 at 10:43 PM, Cheng, Hao 
>>> wrote:
>>>
>>> Yes, you can try set the spark.sql.sources.partitionDiscovery.enabled to
>>> false.
>>>
>>>
>>>
>>> BTW, which version are you using?
>>>
>>>
>>>
>>> Hao
>>>
>>>
>>>
>>> *From:* Jerrick Hoang [mailto:jerrickho...@gmail.com]
>>> *Sent:* Thursday, August 20, 2015 12:16 PM
>>> *To:* Philip Weaver
>>> *Cc:* user
>>> *Subject:* Re: Spark Sql behaves strangely with tables with a lot of
>>> partitions
>>>
>>>
>>>
>>> I guess the question is why does spark have to do partition discovery
>>> with all partitions when the query only needs to look at one partition? Is
>>> there a conf flag to turn this off?
>>>
>>>
>>>
>>> On Wed, Aug 19, 2015 at 9:02 PM, Philip Weaver 
>>> wrote:
>>>
>>> I've had the same problem. It turns out that Spark (specifically
>>> parquet) is very slow at partition discovery. It got better in 1.5 (not yet
>>> released), but was still unacceptably slow. Sadly, we ended up reading
>>> parquet files manually in Python (via C++) and had to abandon Spark SQL
>>> because of this problem.
>>>
>>>
>>>
>>> On Wed, Aug 19, 2015 at 7:51 PM, Jerrick Hoang 
>>> wrote:
>>>
>>> Hi all,
>>>
>>>
>>>
>>> I did a simple experiment with Spark SQL. I created a partitioned
>>> parquet table with only one partition (date=20140701). A simple `select
>>> count(*) from table where date=20140701` would run very fast (0.1 seconds).
>>> However, as I added more partitions the query takes longer and longer. When
>>> I added about 10,000 partitions, the query took way too long. I feel like
>>> querying for a single partition should not be affected by having more
>>> partitions. Is this a known behaviour? What does spark try to do here?
>>>
>>>
>>>
>>> Thanks,
>>>
>>> Jerrick
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>
>>
>


Re: Aggregate to array (or 'slice by key') with DataFrames

2015-08-21 Thread Raghavendra Pandey
Impact,
You can group the data by key and then aggregate on the timestamp, taking the
minimum to select the oldest row per key.
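A minimal sketch with DataFrames (column names are illustrative; ties on the timestamp will return more than one row per key):

import org.apache.spark.sql.functions.min

val oldest = df.groupBy(df("key")).agg(min(df("ts")).as("min_ts"))
val oldestRows = df
  .join(oldest, df("key") === oldest("key") && df("ts") === oldest("min_ts"))
  .select(df("key"), df("ts"), df("value"))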
On Aug 21, 2015 11:15 PM, "Impact"  wrote:

> I am also looking for a way to achieve the reducebykey functionality on
> data
> frames. In my case I need to select one particular row (the oldest, based
> on
> a timestamp column value) by key.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Aggregate-to-array-or-slice-by-key-with-DataFrames-tp23636p24399.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: How to list all dataframes and RDDs available in current session?

2015-08-21 Thread Raghavendra Pandey
You can get the list of all the persisted RDDs using the Spark context...
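A minimal sketch:

// Map[Int, RDD[_]] keyed by RDD id, covering every RDD marked as persistent
sc.getPersistentRDDs.foreach { case (id, rdd) =>
  println(s"$id -> ${rdd.name} [${rdd.getStorageLevel}]")
}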
On Aug 21, 2015 12:06 AM, "Rishitesh Mishra" 
wrote:

> I am not sure if you can view all RDDs in a session. Tables are maintained
> in a catalogue, hence it's easier. However, you can see the DAG
> representation, which lists all the RDDs in a job, in the Spark UI.
> On 20 Aug 2015 22:34, "Dhaval Patel"  wrote:
>
>> Apologies
>>
>> I accidentally included Spark User DL on BCC. The actual email message is
>> below.
>> =
>>
>>
>> Hi:
>>
>> I have been working on few example using zeppelin.
>>
>> I have been trying to find a command that would list all
>> *dataframes/RDDs* that has been created in current session. Anyone knows if
>> there is any such commands available?
>>
>> Something similar to SparkSQL to list all temp tables :
>>   show tables;
>>
>> Thanks,
>> Dhaval
>>
>>
>>
>> On Thu, Aug 20, 2015 at 12:49 PM, Dhaval Patel 
>> wrote:
>>
>>> Hi:
>>>
>>> I have been working on few example using zeppelin.
>>>
>>> I have been trying to find a command that would list all
>>> *dataframes/RDDs* that has been created in current session. Anyone knows if
>>> there is any such commands available?
>>>
>>> Something similar to SparkSQL to list all temp tables :
>>>   show tables;
>>>
>>> Thanks,
>>> Dhaval
>>>
>>
>>


Re: How to set environment of worker applications

2015-08-23 Thread Raghavendra Pandey
I think the only way to pass environment variables to the worker nodes is to
write them in the spark-env.sh file on each worker node.

On Sun, Aug 23, 2015 at 8:16 PM, Hemant Bhanawat 
wrote:

> Check for spark.driver.extraJavaOptions and
> spark.executor.extraJavaOptions in the following article. I think you can
> use -D to pass system vars:
>
> spark.apache.org/docs/latest/configuration.html#runtime-environment
> Hi,
>
> I am starting a spark streaming job in standalone mode with spark-submit.
>
> Is there a way to make the UNIX environment variables with which
> spark-submit is started available to the processes started on the worker
> nodes?
>
> Jan
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: How to set environment of worker applications

2015-08-24 Thread Raghavendra Pandey
System properties and environment variables are two different things. One
can use spark.executor.extraJavaOptions to pass system properties and
spark-env.sh to pass environment variables.
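A minimal sketch of both (the names are illustrative):

# system property, read in the executor with System.getProperty("myprop")
spark-submit --conf "spark.executor.extraJavaOptions=-Dmyprop=xxx" ...

# environment variable, read with sys.env("MYENVVAR"); goes into conf/spark-env.sh on each worker
export MYENVVAR=xxx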

-raghav

On Mon, Aug 24, 2015 at 1:00 PM, Hemant Bhanawat 
wrote:

> That's surprising. Passing the environment variables using
> spark.executor.extraJavaOptions=-Dmyenvvar=xxx to the executor and then
> fetching them using System.getProperty("myenvvar") has worked for me.
>
> What is the error that you guys got?
>
> On Mon, Aug 24, 2015 at 12:10 AM, Sathish Kumaran Vairavelu <
> vsathishkuma...@gmail.com> wrote:
>
>> spark-env.sh works for me in Spark 1.4 but not
>> spark.executor.extraJavaOptions.
>>
>> On Sun, Aug 23, 2015 at 11:27 AM Raghavendra Pandey <
>> raghavendra.pan...@gmail.com> wrote:
>>
>>> I think the only way to pass on environment variables to worker node is
>>> to write it in spark-env.sh file on each worker node.
>>>
>>> On Sun, Aug 23, 2015 at 8:16 PM, Hemant Bhanawat 
>>> wrote:
>>>
>>>> Check for spark.driver.extraJavaOptions and
>>>> spark.executor.extraJavaOptions in the following article. I think you can
>>>> use -D to pass system vars:
>>>>
>>>> spark.apache.org/docs/latest/configuration.html#runtime-environment
>>>> Hi,
>>>>
>>>> I am starting a spark streaming job in standalone mode with
>>>> spark-submit.
>>>>
>>>> Is there a way to make the UNIX environment variables with which
>>>> spark-submit is started available to the processes started on the worker
>>>> nodes?
>>>>
>>>> Jan
>>>> -
>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>
>>>>
>>>
>


Re: org.apache.spark.shuffle.FetchFailedException

2015-08-24 Thread Raghavendra Pandey
Did you try increasing the SQL shuffle partitions?
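A minimal sketch (the value is illustrative): from the spark-sql terminal you can raise the shuffle partition count before running the query.

SET spark.sql.shuffle.partitions=2000;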

On Tue, Aug 25, 2015 at 11:06 AM, kundan kumar 
wrote:

> I am running this query on a data size of 4 billion rows and
> getting org.apache.spark.shuffle.FetchFailedException error.
>
> select adid,position,userid,price
> from (
> select adid,position,userid,price,
> dense_rank() OVER (PARTITION BY adlocationid ORDER BY price DESC) as rank
> FROM trainInfo) as tmp
> WHERE rank <= 2
>
>
> I have attached the error logs from spark-sql terminal.
>
> Please suggest what is the reason for these kind of errors and how can I
> resolve them.
>
>
> Regards,
> Kundan
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>


Re: Array Out OF Bound Exception

2015-08-29 Thread Raghavendra Pandey
So either you have an empty line at the end, or when you use String.split you
don't specify -1 as the second parameter...
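A minimal sketch of the split behaviour:

"A1| B1|".split("\\|")      // Array("A1", " B1"): the trailing empty field is dropped
"A1| B1|".split("\\|", -1)  // Array("A1", " B1", ""): three fields, as expected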
On Aug 29, 2015 1:18 PM, "Akhil Das"  wrote:

> I suspect in the last scenario you are having an empty new line at the
> last line. If you put a try..catch you'd definitely know.
>
> Thanks
> Best Regards
>
> On Tue, Aug 25, 2015 at 2:53 AM, Michael Armbrust 
> wrote:
>
>> This top line here is indicating that the exception is being throw from
>> your code (i.e. code written in the console).
>>
>> at
>>> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(:40)
>>
>>
>> Check to make sure that you are properly handling data that has less
>> columns than you would expect.
>>
>>
>>
>> On Mon, Aug 24, 2015 at 12:41 PM, SAHA, DEBOBROTA  wrote:
>>
>>> Hi ,
>>>
>>>
>>>
>>> I am using SPARK 1.4 and I am getting an array out of bound Exception
>>> when I am trying to read from a registered table in SPARK.
>>>
>>>
>>>
>>> For example If I have 3 different text files with the content as below:
>>>
>>>
>>>
>>> *Scenario 1*:
>>>
>>> A1|B1|C1
>>>
>>> A2|B2|C2
>>>
>>>
>>>
>>> *Scenario 2*:
>>>
>>> A1| |C1
>>>
>>> A2| |C2
>>>
>>>
>>>
>>> *Scenario 3*:
>>>
>>> A1| B1|
>>>
>>> A2| B2|
>>>
>>>
>>>
>>> So for Scenario 1 and 2 it’s working fine but for Scenario 3 I am
>>> getting the following error:
>>>
>>>
>>>
>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task
>>> 0 in stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in stage
>>> 3.0 (TID 4, localhost): java.lang.ArrayIndexOutOfBoundsException: 2
>>>
>>> at
>>> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(:40)
>>>
>>> at
>>> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(:38)
>>>
>>> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>>>
>>> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>>>
>>> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>>>
>>> at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
>>>
>>> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>
>>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>
>>> at
>>> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>>>
>>> at
>>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>>>
>>> at
>>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>>>
>>> at scala.collection.TraversableOnce$class.to
>>> (TraversableOnce.scala:273)
>>>
>>> at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>>>
>>> at
>>> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>>>
>>> at
>>> scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>>>
>>> at
>>> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>>>
>>> at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>>>
>>> at
>>> org.apache.spark.sql.execution.SparkPlan$$anonfun$3.apply(SparkPlan.scala:143)
>>>
>>> at
>>> org.apache.spark.sql.execution.SparkPlan$$anonfun$3.apply(SparkPlan.scala:143)
>>>
>>> at
>>> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
>>>
>>> at
>>> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
>>>
>>> at
>>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>>>
>>> at org.apache.spark.scheduler.Task.run(Task.scala:70)
>>>
>>> at
>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>>
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>
>>> at java.lang.Thread.run(Thread.java:745)
>>>
>>>
>>>
>>> Driver stacktrace:
>>>
>>> at org.apache.spark.scheduler.DAGScheduler.org
>>> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273)
>>>
>>> at
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264)
>>>
>>> at
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263)
>>>
>>> at
>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>
>>> at
>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>
>>> at
>>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263)
>>>
>>> at
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
>>>
>>> at
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetF

Re: Alternative to Large Broadcast Variables

2015-08-29 Thread Raghavendra Pandey
We are using Cassandra for a similar kind of problem and it works well... You
need to take care of the race condition between updating the store and looking
it up...
On Aug 29, 2015 1:31 AM, "Ted Yu"  wrote:

> +1 on Jason's suggestion.
>
> bq. this large variable is broadcast many times during the lifetime
>
> Please consider making this large variable more granular. Meaning, reduce
> the amount of data transferred between the key value store and your app
> during update.
>
> Cheers
>
> On Fri, Aug 28, 2015 at 12:44 PM, Jason  wrote:
>
>> You could try using an external key value store (like HBase, Redis) and
>> perform lookups/updates inside of your mappers (you'd need to create the
>> connection within a mapPartitions code block to avoid the connection
>> setup/teardown overhead)?
>>
>> I haven't done this myself though, so I'm just throwing the idea out
>> there.
>>
>> On Fri, Aug 28, 2015 at 3:39 AM Hemminger Jeff  wrote:
>>
>>> Hi,
>>>
>>> I am working on a Spark application that is using of a large (~3G)
>>> broadcast variable as a lookup table. The application refines the data in
>>> this lookup table in an iterative manner. So this large variable is
>>> broadcast many times during the lifetime of the application process.
>>>
>>> From what I have observed perhaps 60% of the execution time is spent
>>> waiting for the variable to broadcast in each iteration. My reading of a
>>> Spark performance article[1] suggests that the time spent broadcasting will
>>> increase with the number of nodes I add.
>>>
>>> My question for the group - what would you suggest as an alternative to
>>> broadcasting a large variable like this?
>>>
>>> One approach I have considered is segmenting my RDD and adding a copy of
>>> the lookup table for each X number of values to process. So, for example,
>>> if I have a list of 1 million entries to process (eg, RDD[Entry]), I could
>>> split this into segments of 100K entries, with a copy of the lookup table,
>>> and make that an RDD[(Lookup, Array[Entry]).
>>>
>>> Another solution I am looking at it is making the lookup table an RDD
>>> instead of a broadcast variable. Perhaps I could use an IndexedRDD[2] to
>>> improve performance. One issue with this approach is that I would have to
>>> rewrite my application code to use two RDDs so that I do not reference the
>>> lookup RDD in the from within the closure of another RDD.
>>>
>>> Any other recommendations?
>>>
>>> Jeff
>>>
>>>
>>> [1]
>>> http://www.cs.berkeley.edu/~agearh/cs267.sp10/files/mosharaf-spark-bc-report-spring10.pdf
>>>
>>> [2]https://github.com/amplab/spark-indexedrdd
>>>
>>
>


Re: Spark Version upgrade issue: Exception in thread "main" java.lang.NoSuchMethodError

2015-08-29 Thread Raghavendra Pandey
Looks like your application's Jackson version and Spark's Jackson package are at different versions.

Raghav
On Aug 28, 2015 4:01 PM, "Manohar753" 
wrote:

> Hi Team,
> I upgraded spark older versions to 1.4.1 after maven build i tried to ran
> my
> simple application but it failed and giving the below stacktrace.
>
> Exception in thread "main" java.lang.NoSuchMethodError:
>
> com.fasterxml.jackson.databind.introspect.POJOPropertyBuilder.addField(Lcom/fasterxml/jackson/databind/introspect/AnnotatedField;Lcom/fasterxml/jackson/databind/PropertyName;ZZZ)V
> at
> com.fasterxml.jackson.module.scala.introspect.ScalaPropertiesCollector.com
> $fasterxml$jackson$module$scala$introspect$ScalaPropertiesCollector$$_addField(ScalaPropertiesCollector.scala:109)
>
> i checked all the forex jackson versions but no luck
>
>
>
> Any help on this if some body already faced this issue.
>
> Thanks  in adcance.
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Version-upgrade-isue-Exception-in-thread-main-java-lang-NoSuchMethodError-tp24488.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: FlatMap Explanation

2015-09-02 Thread Raghavendra Pandey
flatMap is just like map, but it flattens out the Seq output of the
closure...
In your case, you call the "to" function, which returns a list:

a.to(b) returns List(a, ..., b)

So rdd.flatMap(x => x.to(3)) will take each element and return the range from
that element up to 3.
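A minimal sketch of the difference:

val rdd = sc.parallelize(Seq(1, 2, 3, 3))
rdd.map(x => x.to(3)).collect()      // ranges: (1,2,3), (2,3), (3), (3)
rdd.flatMap(x => x.to(3)).collect()  // Array(1, 2, 3, 2, 3, 3, 3)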
On Sep 3, 2015 7:36 AM, "Ashish Soni"  wrote:

> Hi ,
>
> Can some one please explain the output of the flat map
> data in RDD as below
> {1, 2, 3, 3}
>
> rdd.flatMap(x => x.to(3))
>
> output as below
>
> {1, 2, 3, 2, 3, 3, 3}
> i am not able to understand how the output came as above.
>
> Thanks,
>
>


Re: Parquet partitioning for unique identifier

2015-09-02 Thread Raghavendra Pandey
Did you specify a partitioning column while saving the data?
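A minimal sketch of the hash-bucket idea mentioned below (column names, bucket count and path are illustrative):

import org.apache.spark.sql.functions.udf

// derive a bounded bucket from the unique id and partition the output on it
val bucket = udf((id: String) => ((id.hashCode % 256) + 256) % 256)
df.withColumn("bucket", bucket(df("id")))
  .write
  .partitionBy("bucket")
  .parquet("/path/output.parquet")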
On Sep 3, 2015 5:41 AM, "Kohki Nishio"  wrote:

> Hello experts,
>
> I have a huge json file (> 40G) and trying to use Parquet as a file
> format. Each entry has a unique identifier but other than that, it doesn't
> have 'well balanced value' column to partition it. Right now it just throws
> OOM and couldn't figure out what to do with it.
>
> It would be ideal if I could provide a partitioner based on the unique
> identifier value like computing its hash value or something.  One of the
> option would be to produce a hash value and add it as a separate column,
> but it doesn't sound right to me. Is there any other ways I can try ?
>
> Regards,
> --
> Kohki Nishio
>


Re: Unable to run Group BY on Large File

2015-09-02 Thread Raghavendra Pandey
You can increase the number of partitions and try...
On Sep 3, 2015 5:33 AM, "Silvio Fiorito" 
wrote:

> Unfortunately, groupBy is not the most efficient operation. What is it
> you’re trying to do? It may be possible with one of the other *byKey
> transformations.
>
> From: "SAHA, DEBOBROTA"
> Date: Wednesday, September 2, 2015 at 7:46 PM
> To: "'user@spark.apache.org'"
> Subject: Unbale to run Group BY on Large File
>
> Hi ,
>
>
>
> I am getting below error while I am trying to select data using SPARK SQL
> from a RDD table.
>
>
>
> java.lang.OutOfMemoryError: GC overhead limit exceeded
>
> "Spark Context Cleaner" java.lang.InterruptedException
>
>
>
>
>
> The file or table size is around 113 GB and I am running SPARK 1.4 on a
> standalone cluster. Tried to extend the heap size but extending to 64GB
> also didn’t help.
>
>
>
> I would really appreciate any help on this.
>
>
>
> Thanks,
>
> Debobrota
>


Re: about mr-style merge sort

2015-09-10 Thread Raghavendra Pandey
In MR jobs, the output is sorted only within a reducer. That can be better
emulated by sorting each partition of the RDD rather than doing a total sort
of the RDD.
In RDD.mapPartitions you can sort the data within one partition and try...
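A minimal sketch (names are illustrative), assuming a pair RDD whose key type has an Ordering, e.g. the HBase row key as a String; repartitionAndSortWithinPartitions on pair RDDs is another way to get the shuffle plus the per-partition sort in one pass:

// sort each partition locally instead of doing a global sortByKey
val partitionSorted = aggregated.mapPartitions(
  iter => iter.toSeq.sortBy(_._1).iterator,
  preservesPartitioning = true)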
On Sep 11, 2015 7:36 AM, "周千昊"  wrote:

> Hi, all
>  Can anyone give some tips about this issue?
>
> 周千昊 wrote on Tue, 8 Sep 2015 at 16:46:
>
>> Hi, community
>>  I have an application which I try to migrate from MR to Spark.
>>  It will do some calculations from Hive and output to hfile which
>> will be bulk load to HBase Table, details as follow:
>>
>>  Rdd input = getSourceInputFromHive()
>>  Rdd> mapSideResult =
>> input.glom().mapPartitions(/*some calculation*/)
>>  // PS: the result in each partition has already been sorted
>> according to the lexicographical order during the calculation
>>  mapSideResult.reduceByKey(/*some
>> aggregations*/).sortByKey(/**/).map(/*transform Tuple2 to
>> Tuple2*/).saveAsNewAPIHadoopFile(/*write
>> to hfile*/)
>>
>>   *Here is the problem, as in MR, in the reducer side, the mapper
>> output has already been sorted, so that it is a merge sort which makes
>> writing to hfile is sequential and fast.*
>> *  However in Spark, the output of reduceByKey phase has been
>> shuffled, so I have to sort the rdd in order to write hfile which makes it
>> slower 2x running on Spark than on MR.*
>> *  I am wondering that, if there is anything I can leverage has the
>> same effect as MR. I happen to see a JIRA
>> ticket https://issues.apache.org/jira/browse/SPARK-2926
>> . Is it related to what I
>> am looking for?*
>>
> --
> Best Regard
> ZhouQianhao
>


Re: Spark based Kafka Producer

2015-09-10 Thread Raghavendra Pandey
What is the value of the spark master conf? By default it is local, which means
only one thread can run, and that is why your job is stuck.
Specify local[*] to make the thread pool equal to the number of cores...
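A minimal sketch, if you are running locally:

val sparkConf = new SparkConf()
  .setAppName(applicationName)
  .setMaster("local[*]")  // streaming needs at least 2 threads: receiver plus processing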

Raghav
On Sep 11, 2015 6:06 AM, "Atul Kulkarni"  wrote:

> Hi Folks,
>
> Below is the code I have for a Spark-based Kafka producer, to take advantage
> of multiple executors reading files in parallel on my cluster, but I am
> stuck: the program is not making any progress.
>
> Below is my scrubbed code:
>
> val sparkConf = new SparkConf().setAppName(applicationName)
> val ssc = new StreamingContext(sparkConf, Seconds(2))
>
> val producerObj = ssc.sparkContext.broadcast(KafkaSink(kafkaProperties))
>
> val zipFileDStreams = ssc.textFileStream(inputFiles)
> zipFileDStreams.foreachRDD {
>   rdd =>
> rdd.foreachPartition(
>   partition => {
> partition.foreach{
>   case (logLineText) =>
> println(logLineText)
> producerObj.value.send(topics, logLineText)
> }
>   }
> )
> }
>
> ssc.start()
> ssc.awaitTermination()
>
> ssc.stop()
>
> The code for KafkaSink is as follows.
>
> class KafkaSink(createProducer: () => KafkaProducer[Array[Byte], 
> Array[Byte]]) extends Serializable {
>
>   lazy val producer = createProducer()
>   val logParser = new LogParser()
>
>   def send(topic: String, value: String): Unit = {
>
> val logLineBytes = 
> Bytes.toBytes(logParser.avroEvent(value.split("\t")).toString)
> producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, 
> logLineBytes))
>   }
> }
>
> object KafkaSink {
>   def apply(config: Properties): KafkaSink = {
>
> val f = () => {
>   val producer = new KafkaProducer[Array[Byte], Array[Byte]](config, 
> null, null)
>
>   sys.addShutdownHook {
> producer.close()
>   }
>   producer
> }
>
> new KafkaSink(f)
>   }
> }
>
> Disclaimer: it is based on the code inspired by
> http://allegro.tech/spark-kafka-integration.html.
>
> The job just sits there I cannot see any Job Stages being created.
> Something I want to mention - I am trying to read gzipped files from HDFS
> - could it be that the Streaming context is not able to read *.gz files?
>
>
> I am not sure what more details I can provide to help explain my problem.
>
>
> --
> Regards,
> Atul Kulkarni
>


Re: Spark based Kafka Producer

2015-09-11 Thread Raghavendra Pandey
You can pass the number of executors via the command-line option
--num-executors. You need more than 2 executors to make Spark Streaming
work.

For more details on command line option, please go through
http://spark.apache.org/docs/latest/running-on-yarn.html.
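A minimal sketch of the submit command (the numbers are illustrative):

spark-submit --master yarn-cluster --num-executors 4 --executor-cores 2 --executor-memory 4g ...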


On Fri, Sep 11, 2015 at 10:52 AM, Atul Kulkarni 
wrote:

> I am submitting the job with yarn-cluster mode.
>
> spark-submit --master yarn-cluster ...
>
> On Thu, Sep 10, 2015 at 7:50 PM, Raghavendra Pandey <
> raghavendra.pan...@gmail.com> wrote:
>
>> What is the value of spark master conf.. By default it is local, that
>> means only one thread can run and that is why your job is stuck.
>> Specify it local[*], to make thread pool equal to number of cores...
>>
>> Raghav
>> On Sep 11, 2015 6:06 AM, "Atul Kulkarni"  wrote:
>>
>>> Hi Folks,
>>>
>>> Below is the code  have for Spark based Kafka Producer to take advantage
>>> of multiple executors reading files in parallel on my cluster but I am
>>> stuck at The program not making any progress.
>>>
>>> Below is my scrubbed code:
>>>
>>> val sparkConf = new SparkConf().setAppName(applicationName)
>>> val ssc = new StreamingContext(sparkConf, Seconds(2))
>>>
>>> val producerObj = ssc.sparkContext.broadcast(KafkaSink(kafkaProperties))
>>>
>>> val zipFileDStreams = ssc.textFileStream(inputFiles)
>>> zipFileDStreams.foreachRDD {
>>>   rdd =>
>>> rdd.foreachPartition(
>>>   partition => {
>>> partition.foreach{
>>>   case (logLineText) =>
>>> println(logLineText)
>>> producerObj.value.send(topics, logLineText)
>>> }
>>>   }
>>> )
>>> }
>>>
>>> ssc.start()
>>> ssc.awaitTermination()
>>>
>>> ssc.stop()
>>>
>>> The code for KafkaSink is as follows.
>>>
>>> class KafkaSink(createProducer: () => KafkaProducer[Array[Byte], 
>>> Array[Byte]]) extends Serializable {
>>>
>>>   lazy val producer = createProducer()
>>>   val logParser = new LogParser()
>>>
>>>   def send(topic: String, value: String): Unit = {
>>>
>>> val logLineBytes = 
>>> Bytes.toBytes(logParser.avroEvent(value.split("\t")).toString)
>>> producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, 
>>> logLineBytes))
>>>   }
>>> }
>>>
>>> object KafkaSink {
>>>   def apply(config: Properties): KafkaSink = {
>>>
>>> val f = () => {
>>>   val producer = new KafkaProducer[Array[Byte], Array[Byte]](config, 
>>> null, null)
>>>
>>>   sys.addShutdownHook {
>>> producer.close()
>>>   }
>>>   producer
>>> }
>>>
>>> new KafkaSink(f)
>>>   }
>>> }
>>>
>>> Disclaimer: it is based on the code inspired by
>>> http://allegro.tech/spark-kafka-integration.html.
>>>
>>> The job just sits there I cannot see any Job Stages being created.
>>> Something I want to mention - I I am trying to read gzipped files from HDFS
>>> - could it be that Streaming context is not able to read *.gz files?
>>>
>>>
>>> I am not sure what more details I can provide to help explain my problem.
>>>
>>>
>>> --
>>> Regards,
>>> Atul Kulkarni
>>>
>>
>
>
> --
> Regards,
> Atul Kulkarni
>


Re: Catalog, SessionCatalog and ExternalCatalog in spark 2.0

2016-09-03 Thread Raghavendra Pandey
Kapil -- I am afraid you need to plug in your own SessionCatalog, as the
ResolveRelations class depends on it. To keep up with the consistent design
you may also like to implement ExternalCatalog.
You can also look at plugging in your own Analyzer class to give yourself more
flexibility. Ultimately that is where all relations get resolved from the
SessionCatalog.

On Sat, Sep 3, 2016 at 5:49 PM, Kapil Malik 
wrote:

> Hi all,
>
> I have a Spark SQL 1.6 application in production which does following on
> executing sqlContext.sql(...) -
> 1. Identify the table-name mentioned in query
> 2. Use an external database to decide where's the data located, in which
> format (parquet or csv or jdbc) etc.
> 3. Load the dataframe
> 4. Register it as temp table (for future calls to this table)
>
> This is achieved by extending HiveContext, and correspondingly
> HiveCatalog. I have my own implementation of trait "Catalog", which
> over-rides the "lookupRelation" method to do the magic behind the scenes.
>
> However, in spark 2.0, I can see following -
> SessionCatalog - which contains lookupRelation method, but doesn't have
> any interface / abstract class to it.
> ExternalCatalog - which deals with CatalogTable instead of Df /
> LogicalPlan.
> Catalog - which also doesn't expose any method to lookup Df / LogicalPlan.
>
> So apparently it looks like I need to extend SessionCatalog only.
> However, just wanted to get a feedback on if there's a better /
> recommended approach to achieve this.
>
>
> Thanks and regards,
>
>
> Kapil Malik
> *Sr. Principal Engineer | Data Platform, Technology*
> M: +91 8800836581 | T: 0124-433 | EXT: 20910
> ASF Centre A | 1st Floor | Udyog Vihar Phase IV |
> Gurgaon | Haryana | India
>


Re: Importing large file with SparkContext.textFile

2016-09-03 Thread Raghavendra Pandey
If your file format is splittable, say TSV, CSV, etc., it will be distributed
across all executors.
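A minimal sketch: you can also ask for a minimum number of partitions when reading.

// splittable input is divided into partitions and processed by many executors in parallel
val lines = sc.textFile("somefile.csv", minPartitions = 64)
println(lines.partitions.length)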

On Sat, Sep 3, 2016 at 3:38 PM, Somasundaram Sekar <
somasundar.se...@tigeranalytics.com> wrote:

> Hi All,
>
>
>
> Would like to gain some understanding on the questions listed below,
>
>
>
> 1.   When processing a large file with Apache Spark, with, say,
> sc.textFile("somefile.xml"), does it split it for parallel processing
> across executors or, will it be processed as a single chunk in a single
> executor?
>
> 2.   When using dataframes, with implicit XMLContext from Databricks
> is there any optimization prebuilt for such large file processing?
>
>
>
> Please help!!!
>
>
>
> http://stackoverflow.com/questions/39305310/does-spark-
> process-large-file-in-the-single-worker
>
>
>
> Regards,
>
> Somasundaram S
>


Re: how to pass trustStore path into pyspark ?

2016-09-03 Thread Raghavendra Pandey
Did you try passing them in spark-env.sh?
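If spark-env.sh alone does not do it, another common option (a sketch; the path is illustrative) is to pass the standard JVM truststore properties through the extra Java options:

# conf/spark-defaults.conf (or pass each line via --conf on the pyspark command line)
spark.driver.extraJavaOptions   -Djavax.net.ssl.trustStore=/path/to/truststore.jks
spark.executor.extraJavaOptions -Djavax.net.ssl.trustStore=/path/to/truststore.jks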

On Sat, Sep 3, 2016 at 2:42 AM, Eric Ho  wrote:

> I'm trying to pass a trustStore pathname into pyspark.
> What env variable and/or config file or script I need to change to do this
> ?
> I've tried setting JAVA_OPTS env var but to no avail...
> any pointer much appreciated...  thx
>
> --
>
> -eric ho
>
>


Re: Passing Custom App Id for consumption in History Server

2016-09-03 Thread Raghavendra Pandey
The default implementation uses the current time in milliseconds. For Mesos it
is the framework id. If you are using Mesos, you can assume that the framework
id used to register your app is the same as the app id.
As you said, you have a system application to schedule Spark jobs, so you can
keep track of the framework ids submitted by your application and use that
same information.

On Fri, Sep 2, 2016 at 6:29 PM, Amit Shanker 
wrote:

> Currently Spark sets current time in Milliseconds as the app Id. Is there
> a way one can pass in the app id to the spark job, so that it uses this
> provided app id instead of generating one using time?
>
> Lets take the following scenario : I have a system application which
> schedules spark jobs, and records the metadata for that job (say job
> params, cores, etc). In this system application, I want to link every job
> with its corresponding UI (history server). The only way I can do this is
> if I have the app Id of that job stored in this system application. And the
> only way one can get the app Id is by using the
> SparkContext.getApplicationId() function - which needs to be run from
> inside the job. So, this make it difficult to convey this piece of
> information from spark to a system outside spark.
>
> Thanks,
> Amit Shanker
>


Re: Access HDFS within Spark Map Operation

2016-09-13 Thread Raghavendra Pandey
How large is your first text file? The idea is that you read the first text
file and, if it is not large, collect all the lines on the driver, then read a
text file for each line and union all the RDDs.
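A minimal sketch (paths are illustrative), assuming each line of the first file is an HDFS path:

// collect the (small) list of paths on the driver
val paths = sc.textFile("hdfs:///data/records.txt").collect()

// build one RDD per path and union them into a single RDD
val all = sc.union(paths.map(p => sc.textFile(p)))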

On 13 Sep 2016 11:39 p.m., "Saliya Ekanayake"  wrote:

> Just wonder if this is possible with Spark?
>
> On Mon, Sep 12, 2016 at 12:14 AM, Saliya Ekanayake 
> wrote:
>
>> Hi,
>>
>> I've got a text file where each line is a record. For each record, I need
>> to process a file in HDFS.
>>
>> So if I represent these records as an RDD and invoke a map() operation on
>> them how can I access the HDFS within that map()? Do I have to create a
>> Spark context within map() or is there a better solution to that?
>>
>> Thank you,
>> Saliya
>>
>>
>>
>> --
>> Saliya Ekanayake
>> Ph.D. Candidate | Research Assistant
>> School of Informatics and Computing | Digital Science Center
>> Indiana University, Bloomington
>>
>>
>
>
> --
> Saliya Ekanayake
> Ph.D. Candidate | Research Assistant
> School of Informatics and Computing | Digital Science Center
> Indiana University, Bloomington
>
>


Re: How to compile Spark with customized Hadoop?

2015-10-10 Thread Raghavendra Pandey
There is a "without Hadoop" build of Spark. You can use that to link against
any custom Hadoop version.

Raghav
On Oct 10, 2015 5:34 PM, "Steve Loughran"  wrote:

>
> During development, I'd recommend giving Hadoop a version ending with
> -SNAPSHOT, and building spark with maven, as mvn knows to refresh the
> snapshot every day.
>
> you can do this in hadoop with
>
> mvn versions:set 2.7.0.stevel-SNAPSHOT
>
> if you are working on hadoop branch-2 or trunk direct, they come with
> -SNAPSHOT anyway, but unless you build hadoop every morning, you may find
> maven pulls in the latest nightly builds from the apache snapshot
> repository, which will cause chaos and confusion. This is also why you must
> never have maven build which spans midnight in your time zone.
>
>
> On 9 Oct 2015, at 22:31, Matei Zaharia  wrote:
>
> You can publish your version of Hadoop to your Maven cache with mvn
> publish (just give it a different version number, e.g. 2.7.0a) and then
> pass that as the Hadoop version to Spark's build (see
> http://spark.apache.org/docs/latest/building-spark.html).
>
> Matei
>
> On Oct 9, 2015, at 3:10 PM, Dogtail L  wrote:
>
> Hi all,
>
> I have modified Hadoop source code, and I want to compile Spark with my
> modified Hadoop. Do you know how to do that? Great thanks!
>
>
>
>


Re: Spark Master Dying saying TimeoutException

2015-10-14 Thread Raghavendra Pandey
I fixed these timeout errors by retrying...
On Oct 15, 2015 3:41 AM, "Kartik Mathur"  wrote:

> Hi,
>
> I have some nightly jobs which runs every night but dies sometimes because
> of unresponsive master , spark master logs says -
>
> Not seeing much else there , what could possible cause an exception like
> this.
>
> *Exception in thread "main" java.util.concurrent.TimeoutException: Futures
> timed out after [1 milliseconds]*
>
> at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>
> at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>
> at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>
> at
> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>
> at scala.concurrent.Await$.result(package.scala:107)
>
> at akka.remote.Remoting.start(Remoting.scala:180)
>
> at
> akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:184)
>
> at akka.actor.ActorSystemImpl.liftedTree2$1(ActorSystem.scala:618)
>
> at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:615)
>
> at akka.actor.ActorSystemImpl._start(ActorSystem.scala:615)
>
> at akka.actor.ActorSystemImpl.start(ActorSystem.scala:632)
>
> at akka.actor.ActorSystem$.apply(ActorSystem.scala:141)
>
> 2015-10-14 05:43:04 ERROR Remoting:65 - Remoting error: [Startup timed
> out] [
>
> akka.remote.RemoteTransportException: Startup timed out
>
> at
> akka.remote.Remoting.akka$remote$Remoting$$notifyError(Remoting.scala:136)
>
> at akka.remote.Remoting.start(Remoting.scala:198)
>
> at
> akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:184)
>
> at akka.actor.ActorSystemImpl.liftedTree2$1(ActorSystem.scala:618)
>
> at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:615)
>
> at akka.actor.ActorSystemImpl._start(ActorSystem.scala:615)
>
> at akka.actor.ActorSystemImpl.start(ActorSystem.scala:632)
>
> at akka.actor.ActorSystem$.apply(ActorSystem.scala:141)
>
> at akka.actor.ActorSystem$.apply(ActorSystem.scala:118)
>
> at
> org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:122)
>
> at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:55)
>
> at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:54)
>
> at
> org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1837)
>
> at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
>
> at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1828)
>
> at org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:57)
>
> at
> org.apache.spark.deploy.master.Master$.startSystemAndActor(Master.scala:906)
>
> at org.apache.spark.deploy.master.Master$.main(Master.scala:869)
>
> at org.apache.spark.deploy.master.Master.main(Master.scala)
>
> Caused by: java.util.concurrent.TimeoutException: Futures timed out after
> [1 milliseconds]
>
> at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>
> at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>
> at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>
> at
> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>
> at scala.concurrent.Await$.result(package.scala:107)
>
> at akka.remote.Remoting.start(Remoting.scala:180)
>
> ... 17 more
>
>
>


Re: Problem installing Sparck on Windows 8

2015-10-14 Thread Raghavendra Pandey
Looks like you are facing an IPv6 issue. Can you try turning the preferIPv4
property on?
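
For example (a sketch, not verified on Windows 8 specifically), the flag can be
passed to the driver JVM when launching the shell:

# force the JVM to prefer IPv4 over IPv6
spark-shell --driver-java-options "-Djava.net.preferIPv4Stack=true"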
On Oct 15, 2015 2:10 AM, "Steve Loughran"  wrote:

>
> On 14 Oct 2015, at 20:56, Marco Mistroni  wrote:
>
>
> 15/10/14 20:52:35 WARN : Your hostname, MarcoLaptop resolves to a
> loopback/non-r
> eachable address: fe80:0:0:0:c5ed:a66d:9d95:5caa%wlan2, but we couldn't
> find any
>  external IP address!
> java.lang.RuntimeException: java.lang.RuntimeException: The root scratch
> dir: /t
> mp/hive on HDFS should be writable. Current permissions are: -
> at
> org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.jav
> a:522)
> at
> org.apache.spark.sql.hive.client.ClientWrapper.(ClientWrapper.s
> cala:171)
> at
> org.apache.spark.sql.hive.HiveContext.executionHive$lzycompute(HiveCo
>
>
> now, that I haven't seen. Looks like it thinks the permissions are wrong,
> doesn't it?
>


Re: s3a file system and spark deployment mode

2015-10-15 Thread Raghavendra Pandey
You can use the Spark 1.5.1 "without Hadoop" build together with Hadoop 2.7.1.
Hadoop 2.7.1 is more mature for s3a access. You also need to add the hadoop
tools dir to the hadoop classpath...

Raghav
On Oct 16, 2015 1:09 AM, "Scott Reynolds"  wrote:

> We do not use EMR. This is deployed on Amazon VMs
>
> We build Spark with Hadoop-2.6.0 but that does not include the s3a
> filesystem nor the Amazon AWS SDK
>
> On Thu, Oct 15, 2015 at 12:26 PM, Spark Newbie 
> wrote:
>
>> Are you using EMR?
>> You can install Hadoop-2.6.0 along with Spark-1.5.1 in your EMR cluster.
>> And that brings s3a jars to the worker nodes and it becomes available to
>> your application.
>>
>> On Thu, Oct 15, 2015 at 11:04 AM, Scott Reynolds 
>> wrote:
>>
>>> List,
>>>
>>> Right now we build our spark jobs with the s3a hadoop client. We do this
>>> because our machines are only allowed to use IAM access to the s3 store. We
>>> can build our jars with the s3a filesystem and the aws sdk just fine and
>>> this jars run great in *client mode*.
>>>
>>> We would like to move from client mode to cluster mode as that will
>>> allow us to be more resilient to driver failure. In order to do this either:
>>> 1. the jar file has to be on worker's local disk
>>> 2. the jar file is in shared storage (s3a)
>>>
>>> We would like to put the jar file in s3 storage, but when we give the
>>> jar path as s3a://.., the worker node doesn't have the hadoop s3a and
>>> aws sdk in its classpath / uber jar.
>>>
>>> Other then building spark with those two dependencies, what other
>>> options do I have ? We are using 1.5.1 so SPARK_CLASSPATH is no longer a
>>> thing.
>>>
>>> Need to get s3a access to both the master (so that we can log spark
>>> event log to s3) and to the worker processes (driver, executor).
>>>
>>> Looking for ideas before just adding the dependencies to our spark build
>>> and calling it a day.
>>>
>>
>>
>


Re: Complex transformation on a dataframe column

2015-10-17 Thread Raghavendra Pandey
Here is a quick code sample I can come up with (assuming spark-shell, so sc and
sqlContext already exist):

import org.apache.spark.sql.functions.udf
import sqlContext.implicits._

case class Input(ID: String, Name: String, PhoneNumber: String, Address: String)

val df = sc.parallelize(Seq(
  Input("1", "raghav", "0123456789", "houseNo:StreetNo:City:State:Zip"))).toDF()

// the UDF can hold arbitrarily complex formatting logic
val formatAddress = udf { (s: String) => s.split(":").mkString("-") }

// add the formatted value as an extra column
val outputDF = df.withColumn("FormattedAddress", formatAddress(df("Address")))


-Raghav

On Thu, Oct 15, 2015 at 10:34 PM, Hao Wang  wrote:

> Hi,
>
> I have searched around but could not find a satisfying answer to this
> question: what is the best way to do a complex transformation on a
> dataframe column?
>
> For example, I have a dataframe with the following schema and a function
> that has pretty complex logic to format addresses. I would like to use the
> function to format each address and store the output as an additional
> column in the dataframe. What is the best way to do it? Use Dataframe.map?
> Define a UDF? Some code example would be appreciated.
>
> Input dataframe:
> root
>  |-- ID: string (nullable = true)
>  |-- Name: string (nullable = true)
>  |-- PhoneNumber: string (nullable = true)
>  |-- Address: string (nullable = true)
>
> Output dataframe:
> root
>  |-- ID: string (nullable = true)
>  |-- Name: string (nullable = true)
>  |-- PhoneNumber: string (nullable = true)
>  |-- Address: string (nullable = true)
>  |-- FormattedAddress: string (nullable = true)
>
> The function for format addresses:
> def formatAddress(address: String): String
>
>
> Best regards,
> Hao Wang
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: repartition vs partitionby

2015-10-17 Thread Raghavendra Pandey
You can use the coalesce function if you want to reduce the number of
partitions; it minimizes the data shuffle.
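
A minimal sketch of the two options (pairRdd and the partition counts are placeholders):

// full shuffle: redistributes the data evenly across 200 partitions
val balanced = pairRdd.repartition(200)

// narrow dependency: merges existing partitions down to 50, avoiding a full shuffle
val merged = pairRdd.coalesce(50)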

-Raghav

On Sat, Oct 17, 2015 at 1:02 PM, shahid qadri 
wrote:

> Hi folks
>
> I need to reparation large set of data around(300G) as i see some portions
> have large data(data skew)
>
> i have pairRDDs [({},{}),({},{}),({},{})]
>
> what is the best way to solve the the problem
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: s3a file system and spark deployment mode

2015-10-17 Thread Raghavendra Pandey
You can add classpath info in hadoop env file...

Add the following line to your $HADOOP_HOME/etc/hadoop/hadoop-env.sh:

export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$HADOOP_HOME/share/hadoop/tools/lib/*

Add the following line to $SPARK_HOME/conf/spark-env.sh:

export SPARK_DIST_CLASSPATH=$($HADOOP_HOME/bin/hadoop --config $HADOOP_HOME/etc/hadoop classpath)


This is how you set up Hadoop 2.7.1 with the Spark 1.5.1 "without Hadoop"
build. This will also put the necessary jars on your classpath to access s3a.

Also, please note that you need to set the fs.s3a.access.key
and fs.s3a.secret.key properties in your core-site.xml, rather
than fs.s3a.awsSecretAccessKey and fs.s3a.awsAccessKeyId as mentioned in
the docs.
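
If you would rather not edit core-site.xml, the same s3a properties can also be
set on the Hadoop configuration from the application (a sketch; the key values
are placeholders):

sc.hadoopConfiguration.set("fs.s3a.access.key", "YOUR_ACCESS_KEY")
sc.hadoopConfiguration.set("fs.s3a.secret.key", "YOUR_SECRET_KEY")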

Good luck
-Raghav

On Fri, Oct 16, 2015 at 9:07 PM, Scott Reynolds 
wrote:

> hmm I tried using --jars and that got passed to MasterArguments and that
> doesn't work :-(
>
>
> https://github.com/apache/spark/blob/branch-1.5/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala
>
> Same with Worker:
> https://github.com/apache/spark/blob/branch-1.5/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
>
> Both Master and Worker have to start with these two jars because
> a.) the Master has to serve the event log in s3
> b.) the Worker runs the Driver and has to download the jar from s3
>
> And yes I am using these deps:
>
> 
> 
> org.apache.hadoop
> hadoop-aws
> 2.7.1
> 
>
> 
> com.amazonaws
> aws-java-sdk
> 1.7.4
> 
>
> I think I have settled on just modifying the java command line that starts
> up the worker and master. Just seems easier. Currently launching them with
> spark-class bash script
>
> /mnt/services/spark/bin/spark-class org.apache.spark.deploy.master.Master \
> --ip `hostname -i` --port 7077 --webui-port 8080
>
> If all else fails I will update the spark pom and and include it in the
> shaded spark jar.
>
> On Fri, Oct 16, 2015 at 2:25 AM, Steve Loughran 
> wrote:
>
>>
>> > On 15 Oct 2015, at 19:04, Scott Reynolds  wrote:
>> >
>> > List,
>> >
>> > Right now we build our spark jobs with the s3a hadoop client. We do
>> this because our machines are only allowed to use IAM access to the s3
>> store. We can build our jars with the s3a filesystem and the aws sdk just
>> fine and this jars run great in *client mode*.
>> >
>> > We would like to move from client mode to cluster mode as that will
>> allow us to be more resilient to driver failure. In order to do this either:
>> > 1. the jar file has to be on worker's local disk
>> > 2. the jar file is in shared storage (s3a)
>> >
>> > We would like to put the jar file in s3 storage, but when we give the
>> jar path as s3a://.., the worker node doesn't have the hadoop s3a and
>> aws sdk in its classpath / uber jar.
>> >
>> > Other then building spark with those two dependencies, what other
>> options do I have ? We are using 1.5.1 so SPARK_CLASSPATH is no longer a
>> thing.
>> >
>> > Need to get s3a access to both the master (so that we can log spark
>> event log to s3) and to the worker processes (driver, executor).
>> >
>> > Looking for ideas before just adding the dependencies to our spark
>> build and calling it a day.
>>
>>
>> you can use --jars to add these, e.g
>>
>> -jars hadoop-aws.jar,aws-java-sdk-s3
>>
>>
>> as others have warned, you need Hadoop 2.7.1 for s3a to work proplery
>>
>
>


Re: REST api to avoid spark context creation

2015-10-18 Thread Raghavendra Pandey
You may like to look at spark job server.
https://github.com/spark-jobserver/spark-jobserver

Raghavendra


Re: Save to paquet files failed

2015-10-22 Thread Raghavendra Pandey
Can you increase the number of partitions and try... Also, I don't think you
need to cache the DataFrames before saving them... You can do away with that as well...
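
A rough Scala sketch of the suggestion (ratingDF and outputPath stand in for
the objects in the code below; the partition count is a guess and should be tuned):

// drop the cache() calls and spread the write across more, smaller tasks
ratingDF.repartition(16)
  .write.format("parquet")
  .save(outputPath)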

Raghav
On Oct 23, 2015 7:45 AM, "Ram VISWANADHA" 
wrote:

> Hi ,
> I am trying to load 931MB file into an RDD, then create a DataFrame and
> store the data in a Parquet file. The save method of Parquet file is
> hanging. I have set the timeout to 1800 but still the system fails to
> respond and hangs. I can’t spot any errors in my code. Can someone help me?
> Thanks in advance.
>
> Environment
>
>1. OS X 10.10.5 with 8G RAM
>2. JDK 1.8.0_60
>
> Code
>
> final SQLContext sqlContext = new SQLContext(jsc);
> //convert user viewing history to ratings (hash user_id to int)
> JavaRDD ratingJavaRDD = createMappedRatingsRDD(jsc);
> //for testing with 2d_full.txt data
> //JavaRDD ratingJavaRDD = createMappedRatingRDDFromFile(jsc);
> JavaRDD ratingRowsRDD = ratingJavaRDD.map(new GenericRowFromRating());
> ratingRowsRDD.cache();
>
> //This line saves the files correctly
>
> ratingJavaRDD.saveAsTextFile("file:///Users/r.viswanadha/Documents/workspace/rec-spark-java-poc/output/ratings_rdd");
>
> final DataFrame ratingDF = sqlContext.createDataFrame(ratingRowsRDD,
> getStructTypeForRating());
> ratingDF.registerTempTable("rating_db");
> ratingDF.show();
> ratingDF.cache();
>
> //this line hangs
>
> ratingDF.write().format("parquet").save(
> "file:///Users/r.viswanadha/Documents/workspace/rec-spark-java-poc/output/ratings.parquet"
> );
>
>
> wks-195:rec-spark-java-poc r.viswanadha$ ls -lah
> /Users/r.viswanadha/Documents/workspace/rec-spark-java-poc/output/ratings_rdd/part-*
>
> -rw-r--r--  1 r.viswanadha  staff   785K Oct 22 18:55
> /Users/r.viswanadha/Documents/workspace/rec-spark-java-poc/output/ratings_rdd/part-0
>
> -rw-r--r--  1 r.viswanadha  staff   790K Oct 22 18:55
> /Users/r.viswanadha/Documents/workspace/rec-spark-java-poc/output/ratings_rdd/part-1
>
> -rw-r--r--  1 r.viswanadha  staff   786K Oct 22 18:55
> /Users/r.viswanadha/Documents/workspace/rec-spark-java-poc/output/ratings_rdd/part-2
>
> -rw-r--r--  1 r.viswanadha  staff   796K Oct 22 18:55
> /Users/r.viswanadha/Documents/workspace/rec-spark-java-poc/output/ratings_rdd/part-3
>
> -rw-r--r--  1 r.viswanadha  staff   791K Oct 22 18:55
> /Users/r.viswanadha/Documents/workspace/rec-spark-java-poc/output/ratings_rdd/part-4
>
> wks-195:rec-spark-java-poc r.viswanadha$ ls -lah
> /Users/r.viswanadha/Documents/workspace/rec-spark-java-poc/output/ratings.parquet/_temporary/0/
>
> The only thing that is saved is the temporary part file
>
> wks-195:rec-spark-java-poc r.viswanadha$ ls -lah
> /Users/r.viswanadha/Documents/workspace/rec-spark-java-poc/output/ratings.parquet/_temporary/0/task_201510221857_0007_m_00/
>
> total 336
>
> drwxr-xr-x  4 r.viswanadha  staff   136B Oct 22 18:57 .
>
> drwxr-xr-x  4 r.viswanadha  staff   136B Oct 22 18:57 ..
>
> -rw-r--r--  1 r.viswanadha  staff   1.3K Oct 22 18:57
> .part-r-0-65562f67-357c-4645-8075-13b733a71ee5.gz.parquet.crc
>
> -rw-r--r--  1 r.viswanadha  staff   163K Oct 22 18:57
> part-r-0-65562f67-357c-4645-8075-13b733a71ee5.gz.parquet
>
>
> Active Stages (1) Stage Id Description Submitted Duration Tasks:
> Succeeded/Total Input Output Shuffle Read Shuffle Write 7 (kill)
> save at
> Recommender.java:549 
> +details
> 
>
> 2015/10/22 18:57:15 17 min
> 1/5
> 9.4 MB
> Best Regards,
> Ram
>
>


Nested ifs in sparksql

2017-01-10 Thread Raghavendra Pandey
I have around 41 levels of nested if-else in Spark SQL. I have programmed
it using the APIs on DataFrame, but it takes too much time.
Is there anything I can do to improve the time here?


Re: Nested ifs in sparksql

2017-01-11 Thread Raghavendra Pandey
I am not using case when; it is mostly IF. By slow, I mean 6 min even for
10 records with 41 levels of nested ifs.
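
A minimal sketch of the UDF approach suggested below (df, the column names and
the rule logic are made up; the point is that all the branches live inside one
ordinary Scala function instead of 41 generated expressions):

import org.apache.spark.sql.functions.udf

// collapse the nested branches into plain Scala code evaluated per row
val classify = udf { (a: Int, b: Int) =>
  if (a > 10) {
    if (b > 5) "high" else "medium"
  } else "low"
}

val result = df.withColumn("bucket", classify(df("a"), df("b")))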

On Jan 11, 2017 3:31 PM, "Georg Heiler"  wrote:

> I was using the dataframe api not sql. The main problem was that too much
> code was generated.
> Using a UDF turned out to be quicker as well.
> Olivier Girardot  schrieb am Di. 10.
> Jan. 2017 um 21:54:
>
>> Are you using the "case when" functions ? what do you mean by slow ? can
>> you share a snippet ?
>>
>>
>>
>> On Tue, Jan 10, 2017 8:15 PM, Georg Heiler georg.kf.hei...@gmail.com
>> wrote:
>>
>> Maybe you can create an UDF?
>>
>> Raghavendra Pandey  schrieb am Di., 10.
>> Jan. 2017 um 20:04 Uhr:
>>
>> I have of around 41 level of nested if else in spark sql. I have
>> programmed it using apis on dataframe. But it takes too much time.
>> Is there anything I can do to improve on time here?
>>
>>
>>
>> *Olivier Girardot* | Associé
>> o.girar...@lateral-thoughts.com
>> +33 6 24 09 17 94
>>
>


Re: How do I dynamically add nodes to spark standalone cluster and be able to discover them?

2017-01-25 Thread Raghavendra Pandey
When you start a slave, you pass the address of the master as a parameter. The
slave will then contact the master and register itself.
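
For example, in standalone mode (the host name is a placeholder):

# run on the new node; it registers itself with the master
./sbin/start-slave.sh spark://master-host:7077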

On Jan 25, 2017 4:12 AM, "kant kodali"  wrote:

> Hi,
>
> How do I dynamically add nodes to spark standalone cluster and be able to
> discover them? Does Zookeeper do service discovery? What is the standard
> tool for these things?
>
> Thanks,
> kant
>


Re: Spark Streaming: Async action scheduling inside foreachRDD

2017-08-04 Thread Raghavendra Pandey
Did you try SparkContext.addSparkListener?



On Aug 3, 2017 1:54 AM, "Andrii Biletskyi"
 wrote:

> Hi all,
>
> What is the correct way to schedule multiple jobs inside foreachRDD method
> and importantly await on result to ensure those jobs have completed
> successfully?
> E.g.:
>
> kafkaDStream.foreachRDD{ rdd =>
> val rdd1 = rdd.map(...)
> val rdd2 = rdd1.map(...)
>
> val job1Future = Future{
> rdd1.saveToCassandra(...)
> }
>
> val job2Future = Future{
> rdd1.foreachPartition( iter => /* save to Kafka */)
> }
>
>   Await.result(
>   Future.sequence(job1Future, job2Future),
>   Duration.Inf)
>
>
>// commit Kafka offsets
> }
>
> In this code I'm scheduling two actions in futures and awaiting them. I
> need to be sure when I commit Kafka offsets at the end of the batch
> processing that job1 and job2 have actually executed successfully. Does
> given approach provide these guarantees? I.e. in case one of the jobs fails
> the entire batch will be marked as failed too?
>
>
> Thanks,
> Andrii
>


Multiple queries on same stream

2017-08-08 Thread Raghavendra Pandey
I am using structured streaming to evaluate multiple rules on the same running
stream.
I have two options to do that. One is to use foreach and evaluate all the
rules on each row.
The other option is to express the rules in the Spark SQL DSL and run multiple
queries.
I was wondering if option 1 will result in better performance, even though I
only get Catalyst optimization in option 2.

Thanks
Raghav


Kafka version support

2017-11-29 Thread Raghavendra Pandey
Just wondering if anyone has tried spark structured streaming kafka
connector (2.2) with Kafka 0.11 or Kafka 1.0 version

Thanks
Raghav


Re: [Structured Streaming][Kafka] For a Kafka topic with 3 partitions, how does the parallelism work ?

2018-04-21 Thread Raghavendra Pandey
Yes as long as there are 3 cores available on your local machine.

On Fri, Apr 20, 2018 at 10:56 AM karthikjay  wrote:

> I have the following code to read data from Kafka topic using the
> structured
> streaming. The topic has 3 partitions:
>
>  val spark = SparkSession
>   .builder
>   .appName("TestPartition")
>   .master("local[*]")
>   .getOrCreate()
>
> import spark.implicits._
>
> val dataFrame = spark
>   .readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers",
> "1.2.3.184:9092,1.2.3.185:9092,1.2.3.186:9092")
>   .option("subscribe", "partition_test")
>   .option("failOnDataLoss", "false")
>   .load()
>   .selectExpr("CAST(value AS STRING)")
>
> My understanding is that Spark will launch 3 Kafka consumers (for 3
> partitions) and these 3 consumers will be running on the worker nodes. Is
> my
> understanding right ?
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: StorageLevel.MEMORY_AND_DISK_SER

2015-07-01 Thread Raghavendra Pandey
I think the persist API is internal to the RDD, whereas the write API is for
saving content on disk.
RDD persist will dump your object bytes serialized on the disk. If you want to
change that behavior, you need to override the serialization of the class that
you are storing in the RDD.
 On Jul 1, 2015 8:50 PM, "ÐΞ€ρ@Ҝ (๏̯͡๏)"  wrote:

> This is my write API. how do i integrate it here.
>
>
>  protected def writeOutputRecords(detailRecords:
> RDD[(AvroKey[DetailOutputRecord], NullWritable)], outputDir: String) {
> val writeJob = new Job()
> val schema = SchemaUtil.outputSchema(_detail)
> AvroJob.setOutputKeySchema(writeJob, schema)
> val outputRecords = detailRecords.coalesce(100)
> outputRecords.saveAsNewAPIHadoopFile(outputDir,
>   classOf[AvroKey[GenericRecord]],
>   classOf[org.apache.hadoop.io.NullWritable],
>   classOf[AvroKeyOutputFormat[GenericRecord]],
>   writeJob.getConfiguration)
>   }
>
> On Wed, Jul 1, 2015 at 8:11 AM, Koert Kuipers  wrote:
>
>> rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)
>>
>> On Wed, Jul 1, 2015 at 11:01 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) 
>> wrote:
>>
>>> How do i persist an RDD using StorageLevel.MEMORY_AND_DISK_SER ?
>>>
>>>
>>> --
>>> Deepak
>>>
>>>
>>
>
>
> --
> Deepak
>
>


Re: Can a Spark Driver Program be a REST Service by itself?

2015-07-01 Thread Raghavendra Pandey
I am using the Spark driver as a REST service. I used spray.io to make my app
a REST server.

I think this is a good design for applications that you want to keep in
long-running mode.
On Jul 1, 2015 6:28 PM, "Arush Kharbanda" 
wrote:

> You can try using Spark Jobserver
>
> https://github.com/spark-jobserver/spark-jobserver
>
> On Wed, Jul 1, 2015 at 4:32 PM, Spark Enthusiast  > wrote:
>
>> Folks,
>>
>> My Use case is as follows:
>>
>> My Driver program will be aggregating a bunch of Event Streams and acting
>> on it. The Action on the aggregated events is configurable and can change
>> dynamically.
>>
>> One way I can think of is to run the Spark Driver as a Service where a
>> config push can be caught via an API that the Driver exports.
>> Can I have a Spark Driver Program run as a REST Service by itself? Is
>> this a common use case?
>> Is there a better way to solve my problem?
>>
>> Thanks
>>
>
>
>
> --
>
> [image: Sigmoid Analytics] 
>
> *Arush Kharbanda* || Technical Teamlead
>
> ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
>


Re: Issue with parquet write after join (Spark 1.4.0)

2015-07-01 Thread Raghavendra Pandey
By any chance, are you using a time field in your df? Time fields are known
to be notorious in RDD conversion.
On Jul 1, 2015 6:13 PM, "Pooja Jain"  wrote:

> Join is happening successfully as I am able to do count() after the join.
>
> Error is coming only while trying to write in parquet format on hdfs.
>
> Thanks,
> Pooja.
>
> On Wed, Jul 1, 2015 at 1:06 PM, Akhil Das 
> wrote:
>
>> It says:
>>
>> Caused by: java.net.ConnectException: Connection refused: slave2/...:54845
>>
>> Could you look in the executor logs (stderr on slave2) and see what made
>> it shut down? Since you are doing a join there's a high possibility of OOM
>> etc.
>>
>>
>> Thanks
>> Best Regards
>>
>> On Wed, Jul 1, 2015 at 10:20 AM, Pooja Jain 
>> wrote:
>>
>>> Hi,
>>>
>>> We are using Spark 1.4.0 on hadoop using yarn-cluster mode via
>>> spark-submit. We are facing parquet write issue after doing dataframe joins
>>>
>>> We have a full data set and then an incremental data. We are reading
>>> them as dataframes, joining them, and then writing the data to the hdfs
>>> system in parquet format. We are getting the timeout error on the last
>>> partition.
>>>
>>> But if we do a count on the joined data it is working - which gives us
>>> the confidence that join is happening properly. Only in case of writing to
>>> the hdfs it is timing out.
>>>
>>> Code flow:
>>>
>>> // join two data frames - dfBase and dfIncr on primaryKey
>>> val joinedDF = dfBase.join(dfIncr, dfBase(primaryKey) === 
>>> dfIncr(primaryKey), "outer")
>>>
>>> // applying a reduce function on each row.
>>> val mergedDF = joinedDF.map(x =>
>>>   reduceFunc(x)
>>> )
>>>
>>> //converting back to dataframe
>>> val newdf = Spark.getSqlContext().createDataFrame(mergedDF, dfSchema)
>>>
>>> //writing to parquet file
>>> newdf.write.parquet(hdfsfilepath)
>>>
>>> Getting following exception:
>>>
>>> 15/06/30 22:47:04 WARN spark.HeartbeatReceiver: Removing executor 26 with 
>>> no recent heartbeats: 255766 ms exceeds timeout 24 ms
>>> 15/06/30 22:47:04 ERROR cluster.YarnClusterScheduler: Lost executor 26 on 
>>> slave2: Executor heartbeat timed out after 255766 ms
>>> 15/06/30 22:47:04 INFO scheduler.TaskSetManager: Re-queueing tasks for 26 
>>> from TaskSet 7.0
>>> 15/06/30 22:47:04 WARN scheduler.TaskSetManager: Lost task 6.0 in stage 7.0 
>>> (TID 216, slave2): ExecutorLostFailure (executor 26 lost)
>>> 15/06/30 22:47:04 INFO scheduler.TaskSetManager: Starting task 6.1 in stage 
>>> 7.0 (TID 310, slave2, PROCESS_LOCAL, 1910 bytes)
>>> 15/06/30 22:47:04 INFO scheduler.DAGScheduler: Executor lost: 26 (epoch 3)
>>> 15/06/30 22:47:04 INFO cluster.YarnClusterSchedulerBackend: Requesting to 
>>> kill executor(s) 26
>>> 15/06/30 22:47:04 INFO storage.BlockManagerMasterEndpoint: Trying to remove 
>>> executor 26 from BlockManagerMaster.
>>> 15/06/30 22:47:04 INFO storage.BlockManagerMasterEndpoint: Removing block 
>>> manager BlockManagerId(26, slave2, 54845)
>>> 15/06/30 22:47:04 INFO storage.BlockManagerMaster: Removed 26 successfully 
>>> in removeExecutor
>>> 15/06/30 22:47:04 INFO yarn.YarnAllocator: Driver requested a total number 
>>> of 26 executor(s).
>>> 15/06/30 22:47:04 INFO scheduler.ShuffleMapStage: ShuffleMapStage 6 is now 
>>> unavailable on executor 26 (193/200, false)
>>> 15/06/30 22:47:04 INFO yarn.ApplicationMaster$AMEndpoint: Driver requested 
>>> to kill executor(s) 26.
>>> 15/06/30 22:47:06 INFO yarn.ApplicationMaster$AMEndpoint: Driver terminated 
>>> or disconnected! Shutting down. slave2:51849
>>> 15/06/30 22:47:06 ERROR cluster.YarnClusterScheduler: Lost executor 26 on 
>>> slave2: remote Rpc client disassociated
>>> 15/06/30 22:47:06 INFO scheduler.TaskSetManager: Re-queueing tasks for 26 
>>> from TaskSet 7.0
>>> 15/06/30 22:47:06 INFO scheduler.DAGScheduler: Executor lost: 26 (epoch 5)
>>> 15/06/30 22:47:06 INFO storage.BlockManagerMasterEndpoint: Trying to remove 
>>> executor 26 from BlockManagerMaster.
>>> 15/06/30 22:47:06 INFO storage.BlockManagerMaster: Removed 26 successfully 
>>> in removeExecutor
>>> 15/06/30 22:47:06 WARN remote.ReliableDeliverySupervisor: Association with 
>>> remote system [akka.tcp://sparkExecutor@slave2:51849] has failed, address 
>>> is now gated for [5000] ms. Reason is: [Disassociated].
>>> 15/06/30 22:47:06 INFO yarn.ApplicationMaster$AMEndpoint: Driver terminated 
>>> or disconnected! Shutting down. slave2:51849
>>> 15/06/30 22:47:21 WARN scheduler.TaskSetManager: Lost task 6.1 in stage 7.0 
>>> (TID 310, slave2): org.apache.spark.SparkException: Task failed while 
>>> writing rows.
>>> at 
>>> org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:161)
>>> at 
>>> org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132)
>>> at 
>>> org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132)
>>> at org.apache.spark.scheduler.ResultTask.runT

Re: StorageLevel.MEMORY_AND_DISK_SER

2015-07-01 Thread Raghavendra Pandey
So do you want to change the behavior of the persist API, or write the RDD to
disk...
On Jul 1, 2015 9:13 PM, "ÐΞ€ρ@Ҝ (๏̯͡๏)"  wrote:

> I think i want to use persist then and write my intermediate RDDs to
> disk+mem.
>
> On Wed, Jul 1, 2015 at 8:28 AM, Raghavendra Pandey <
> raghavendra.pan...@gmail.com> wrote:
>
>> I think persist api is internal to rdd whereas write api is for saving
>> content on dist.
>> Rdd persist will dump your obj bytes serialized on the disk.. If you
>> wanna change that behavior you need to override the class serialization
>> that your are storing in rdd..
>>  On Jul 1, 2015 8:50 PM, "ÐΞ€ρ@Ҝ (๏̯͡๏)"  wrote:
>>
>>> This is my write API. how do i integrate it here.
>>>
>>>
>>>  protected def writeOutputRecords(detailRecords:
>>> RDD[(AvroKey[DetailOutputRecord], NullWritable)], outputDir: String) {
>>> val writeJob = new Job()
>>> val schema = SchemaUtil.outputSchema(_detail)
>>> AvroJob.setOutputKeySchema(writeJob, schema)
>>> val outputRecords = detailRecords.coalesce(100)
>>> outputRecords.saveAsNewAPIHadoopFile(outputDir,
>>>   classOf[AvroKey[GenericRecord]],
>>>   classOf[org.apache.hadoop.io.NullWritable],
>>>   classOf[AvroKeyOutputFormat[GenericRecord]],
>>>   writeJob.getConfiguration)
>>>   }
>>>
>>> On Wed, Jul 1, 2015 at 8:11 AM, Koert Kuipers  wrote:
>>>
>>>> rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)
>>>>
>>>> On Wed, Jul 1, 2015 at 11:01 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) 
>>>> wrote:
>>>>
>>>>> How do i persist an RDD using StorageLevel.MEMORY_AND_DISK_SER ?
>>>>>
>>>>>
>>>>> --
>>>>> Deepak
>>>>>
>>>>>
>>>>
>>>
>>>
>>> --
>>> Deepak
>>>
>>>
>
>
> --
> Deepak
>
>


Re: StorageLevel.MEMORY_AND_DISK_SER

2015-07-01 Thread Raghavendra Pandey
For that you need to change the serialize and deserialize behavior of your
class.
Preferably, you can use Kryo serializers and override the behavior.
For details you can look at
https://github.com/EsotericSoftware/kryo/blob/master/README.md
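
A minimal sketch (the record class comes from the snippet below; the
registrator and serializer names are placeholders):

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoRegistrator

class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    // DetailOutputRecordSerializer is a placeholder for your own
    // com.esotericsoftware.kryo.Serializer implementation
    kryo.register(classOf[DetailOutputRecord], new DetailOutputRecordSerializer())
  }
}

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "MyRegistrator")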
On Jul 1, 2015 9:26 PM, "ÐΞ€ρ@Ҝ (๏̯͡๏)"  wrote:

i original assumed that persisting is similar to writing. But its not.
Hence i want to change the behavior of intermediate persists.

On Wed, Jul 1, 2015 at 8:46 AM, Raghavendra Pandey <
raghavendra.pan...@gmail.com> wrote:

> So do you want to change the behavior of persist api or write the rdd on
> disk...
> On Jul 1, 2015 9:13 PM, "ÐΞ€ρ@Ҝ (๏̯͡๏)"  wrote:
>
>> I think i want to use persist then and write my intermediate RDDs to
>> disk+mem.
>>
>> On Wed, Jul 1, 2015 at 8:28 AM, Raghavendra Pandey <
>> raghavendra.pan...@gmail.com> wrote:
>>
>>> I think persist api is internal to rdd whereas write api is for saving
>>> content on dist.
>>> Rdd persist will dump your obj bytes serialized on the disk.. If you
>>> wanna change that behavior you need to override the class serialization
>>> that your are storing in rdd..
>>>  On Jul 1, 2015 8:50 PM, "ÐΞ€ρ@Ҝ (๏̯͡๏)"  wrote:
>>>
>>>> This is my write API. how do i integrate it here.
>>>>
>>>>
>>>>  protected def writeOutputRecords(detailRecords:
>>>> RDD[(AvroKey[DetailOutputRecord], NullWritable)], outputDir: String) {
>>>> val writeJob = new Job()
>>>> val schema = SchemaUtil.outputSchema(_detail)
>>>> AvroJob.setOutputKeySchema(writeJob, schema)
>>>> val outputRecords = detailRecords.coalesce(100)
>>>> outputRecords.saveAsNewAPIHadoopFile(outputDir,
>>>>   classOf[AvroKey[GenericRecord]],
>>>>   classOf[org.apache.hadoop.io.NullWritable],
>>>>   classOf[AvroKeyOutputFormat[GenericRecord]],
>>>>   writeJob.getConfiguration)
>>>>   }
>>>>
>>>> On Wed, Jul 1, 2015 at 8:11 AM, Koert Kuipers 
>>>> wrote:
>>>>
>>>>> rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)
>>>>>
>>>>> On Wed, Jul 1, 2015 at 11:01 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) 
>>>>> wrote:
>>>>>
>>>>>> How do i persist an RDD using StorageLevel.MEMORY_AND_DISK_SER ?
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Deepak
>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Deepak
>>>>
>>>>
>>
>>
>> --
>> Deepak
>>
>>


-- 
Deepak


Re: DataFrame Filter Inside Another Data Frame Map

2015-07-01 Thread Raghavendra Pandey
You cannot refer to one RDD inside another RDD's map function...
The RDD object is not serializable. Whatever objects you use inside the map
function should be serializable, as they get transferred to the executor nodes.
On Jul 2, 2015 6:13 AM, "Ashish Soni"  wrote:

> Hi All  ,
>
> I am not sure what is the wrong with below code as it give below error
> when i access inside the map but it works outside
>
> JavaRDD rdd2 = rdd.map(new Function() {
>
> @Override
> public Charge call(Charge ch) throws Exception {
>
>
>* DataFrame df = accountRdd.filter("login=test");*
>
> return ch;
> }
>
> });
>
> 5/07/01 20:38:08 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
> java.lang.NullPointerException
> at org.apache.spark.sql.DataFrame.(DataFrame.scala:129)
> at org.apache.spark.sql.DataFrame.org
> $apache$spark$sql$DataFrame$$logicalPlanToDataFrame(DataFrame.scala:154)
>


Re: BroadCast Multiple DataFrame ( JDBC Tables )

2015-07-01 Thread Raghavendra Pandey
I am not sure if you can broadcast a data frame without collecting it on the
driver...
On Jul 1, 2015 11:45 PM, "Ashish Soni"  wrote:

> Hi ,
>
> I need to load 10 tables in memory and have them available to all the
> workers , Please let me me know what is the best way to do broadcast them
>
> sc.broadcast(df)  allow only one
>
> Thanks,
>
>
>


Re: Spark SQL and Streaming - How to execute JDBC Query only once

2015-07-02 Thread Raghavendra Pandey
This will not work, i.e. using a data frame inside a map function.
However, you can try to create the df separately and cache it...
Then you can join your event stream with this df.
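
A rough sketch of that idea (jdbcOptions, eventStream, and the column names are
placeholders):

import sqlContext.implicits._

// load the lookup table once and cache it, instead of hitting the DB per record
val users = sqlContext.read.format("jdbc").options(jdbcOptions).load().cache()

eventStream.foreachRDD { rdd =>
  // assume each incoming line is "loginName,email,address,city"
  val events = rdd.map(_.split(",")).map(f => (f(0), f(1), f(2), f(3)))
    .toDF("loginName", "email", "address", "city")
  // enrich the events with the cached table rather than querying per row
  val enriched = events.join(users, events("loginName") === users("login_nm"))
  enriched.show()
}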
On Jul 2, 2015 6:11 PM, "Ashish Soni"  wrote:

> Hi All  ,
>
> I have and Stream of Event coming in and i want to fetch some additional
> data from the database based on the values in the incoming data , For Eg
> below is the data coming in
>
> loginName
> Email
> address
> city
>
> Now for each login name i need to go to oracle database and get the userId
> from the database *but i do not want to hit the database again and again
> instead i want to load the complete table in memory and then find the user
> id based on the incoming data*
>
> JavaRDD rdd =
> sc.textFile("/home/spark/workspace/data.csv").map(new Function String>() {
> @Override
> public Charge call(String s) {
> String str[] = s.split(",");
> *//How to load the complete table in memory and use it as
> when i do outside the loop i get stage failure error *
> *   DataFrame dbRdd =
> sqlContext.read().format("jdbc").options(options).load();*
>
> System.out.println(dbRdd.filter("ogin_nm='"+str[0]+"'").count());
>
>   return str[0];
> }
> });
>
>
> How i can achieve this , Please suggest
>
> Thanks
>


Re: DataFrame Filter Inside Another Data Frame Map

2015-07-02 Thread Raghavendra Pandey
You can collect the dataframe as an array and then create a map out of it...
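
A minimal sketch (accountDF stands for the small lookup DataFrame; the column
positions are assumptions):

// pull the small lookup DataFrame to the driver and turn it into a Map
val accountMap: Map[String, String] = accountDF.collect()
  .map(row => row.getString(0) -> row.getString(1))
  .toMap

// broadcast it once so map/filter functions on the executors can reuse it
val accountMapBC = sc.broadcast(accountMap)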
On Jul 2, 2015 9:23 AM,  wrote:

> Any example how can i return a Hashmap from data frame ?
>
> Thanks ,
> Ashish
>
> On Jul 1, 2015, at 11:34 PM, Holden Karau  wrote:
>
> Collecting it as a regular (Java/scala/Python) map. You can also broadcast
> the map if your going to use it multiple times.
>
> On Wednesday, July 1, 2015, Ashish Soni  wrote:
>
>> Thanks , So if i load some static data from database and then i need to
>> use than in my map function to filter records what will be the best way to
>> do it,
>>
>> Ashish
>>
>> On Wed, Jul 1, 2015 at 10:45 PM, Raghavendra Pandey <
>> raghavendra.pan...@gmail.com> wrote:
>>
>>> You cannot refer to one rdd inside another rdd.map function...
>>> Rdd object is not serialiable. Whatever objects you use inside map
>>> function  should be serializable as they get transferred to executor nodes.
>>> On Jul 2, 2015 6:13 AM, "Ashish Soni"  wrote:
>>>
>>>> Hi All  ,
>>>>
>>>> I am not sure what is the wrong with below code as it give below error
>>>> when i access inside the map but it works outside
>>>>
>>>> JavaRDD rdd2 = rdd.map(new Function() {
>>>>
>>>>
>>>> @Override
>>>> public Charge call(Charge ch) throws Exception {
>>>>
>>>>
>>>>* DataFrame df = accountRdd.filter("login=test");*
>>>>
>>>> return ch;
>>>> }
>>>>
>>>> });
>>>>
>>>> 5/07/01 20:38:08 ERROR Executor: Exception in task 0.0 in stage 0.0
>>>> (TID 0)
>>>> java.lang.NullPointerException
>>>> at org.apache.spark.sql.DataFrame.(DataFrame.scala:129)
>>>> at org.apache.spark.sql.DataFrame.org
>>>> $apache$spark$sql$DataFrame$$logicalPlanToDataFrame(DataFrame.scala:154)
>>>>
>>>
>>
>
> --
> Cell : 425-233-8271
> Twitter: https://twitter.com/holdenkarau
> Linked In: https://www.linkedin.com/in/holdenkarau
>
>


Re: Filter on Grouped Data

2015-07-03 Thread Raghavendra Pandey
Why don't you apply the filter first, and then group the data and run the
aggregations?
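
For the example below, something along these lines (the receiver address is a
placeholder, and treating "To" as a plain string is an assumption):

import org.apache.spark.sql.functions.desc

// keep only the mails addressed to the receiver of interest,
// then group by sender and rank by mail count
val topSenders = df
  .filter(df("To").contains("receiver@example.com"))
  .groupBy("From")
  .count()
  .orderBy(desc("count"))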
On Jul 3, 2015 1:29 PM, "Megha Sridhar- Cynepia" 
wrote:

> Hi,
>
>
> I have a Spark DataFrame object, which when trimmed, looks like,
>
>
>
> FromTo  SubjectMessage-ID
> karen@xyz.com['vance.me...@enron.com', SEC Inquiry
> <19952575.1075858>
>  'jeannie.mandel...@enron.com',
>  'mary.cl...@enron.com',
>  'sarah.pal...@enron.com']
>
>
>
> elyn.hug...@xyz.com['dennis.ve...@enron.com',Revised
> documents<33499184.1075858>
>  'gina.tay...@enron.com',
>  'kelly.kimbe...@enron.com']
> .
> .
> .
>
>
> I have run a groupBy("From") on the above dataFrame and obtained a
> GroupedData object as a result. I need to apply a filter on the grouped
> data (for instance, getting the sender who sent maximum number of the mails
> that were addressed to a particular receiver in the "To" list).
> Is there a way to accomplish this by applying filter on grouped data?
>
>
> Thanks,
> Megha
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Streaming: updating broadcast variables

2015-07-03 Thread Raghavendra Pandey
You cannot update the broadcast variable. It won't get reflected on the
workers.
On Jul 3, 2015 12:18 PM, "James Cole"  wrote:

> Hi all,
>
> I'm filtering a DStream using a function. I need to be able to change this
> function while the application is running (I'm polling a service to see if
> a user has changed their filtering). The filter function is a
> transformation and runs on the workers, so that's where the updates need to
> go. I'm not sure of the best way to do this.
>
> Initially broadcasting seemed like the way to go: the filter is actually
> quite large. But I don't think I can update something I've broadcasted.
> I've tried unpersisting and re-creating the broadcast variable but it
> became obvious this wasn't updating the reference on the worker. So am I
> correct in thinking I can't use broadcasted variables for this purpose?
>
> The next option seems to be: stopping the JavaStreamingContext, creating a
> new one from the SparkContext, updating the filter function, and
> re-creating the DStreams (I'm using direct streams from Kafka).
>
> If I re-created the JavaStreamingContext would the accumulators (which are
> created from the SparkContext) keep working? (Obviously I'm going to try
> this soon)
>
> In summary:
>
> 1) Can broadcasted variables be updated?
>
> 2) Is there a better way than re-creating the JavaStreamingContext and
> DStreams?
>
> Thanks,
>
> James
>
>


Re: Optimizations

2015-07-03 Thread Raghavendra Pandey
This is the basic design of Spark: it runs these actions in different
stages...
Not sure you can achieve what you are looking for.
On Jul 3, 2015 12:43 PM, "Marius Danciu"  wrote:

> Hi all,
>
> If I have something like:
>
> rdd.join(...).mapPartitionToPair(...)
>
> It looks like mapPartitionToPair runs in a different stage then join. Is
> there a way to piggyback this computation inside the join stage ? ... such
> that each result partition after join is passed to
> the mapPartitionToPair function, all running in the same state without any
> other costs.
>
> Best,
> Marius
>


Re: Are Spark Streaming RDDs always processed in order?

2015-07-03 Thread Raghavendra Pandey
I don't think you can expect any ordering guarantee except for the records
within one partition.
 On Jul 4, 2015 7:43 AM, "khaledh"  wrote:

> I'm writing a Spark Streaming application that uses RabbitMQ to consume
> events. One feature of RabbitMQ that I intend to make use of is bulk ack of
> messages, i.e. no need to ack one-by-one, but only ack the last event in a
> batch and that would ack the entire batch.
>
> Before I commit to doing so, I'd like to know if Spark Streaming always
> processes RDDs in the same order they arrive in, i.e. if RDD1 arrives
> before
> RDD2, is it true that RDD2 will never be scheduled/processed before RDD1 is
> finished?
>
> This is crucial to the ack logic, since if RDD2 can be potentially
> processed
> while RDD1 is still being processed, then if I ack the the last event in
> RDD2 that would also ack all events in RDD1, even though they may have not
> been completely processed yet.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Are-Spark-Streaming-RDDs-always-processed-in-order-tp23616.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Can Spark 1.1.0 save checkpoint to HDFS 2.5.1?

2014-12-19 Thread Raghavendra Pandey
It seems there is a Hadoop 1 dependency somewhere on the classpath.

On Fri, Dec 19, 2014, 21:24 Sean Owen  wrote:

> Yes, but your error indicates that your application is actually using
> Hadoop 1.x of some kind. Check your dependencies, especially
> hadoop-client.
>
> On Fri, Dec 19, 2014 at 2:11 PM, Haopu Wang  wrote:
> > I’m using Spark 1.1.0 built for HDFS 2.4.
> >
> > My application enables check-point (to HDFS 2.5.1) and it can build. But
> > when I run it, I get below error:
> >
> >
> >
> > Exception in thread "main" org.apache.hadoop.ipc.RemoteException:
> Server IPC
> > version 9 cannot communicate with client version 4
> >
> > at org.apache.hadoop.ipc.Client.call(Client.java:1070)
> >
> > at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:225)
> >
> > at com.sun.proxy.$Proxy6.getProtocolVersion(Unknown Source)
> >
> > at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:396)
> >
> > at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:379)
> >
> > at
> > org.apache.hadoop.hdfs.DFSClient.createRPCNamenode(DFSClient.java:119)
> >
> > at org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:238)
> >
> > at org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:203)
> >
> > at
> > org.apache.hadoop.hdfs.DistributedFileSystem.initialize(
> DistributedFileSystem.java:89)
> >
> > at
> > org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:1386)
> >
> > at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:66)
> >
> > at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1404)
> >
> > at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:254)
> >
> > at org.apache.hadoop.fs.Path.getFileSystem(Path.java:187)
> >
> > at
> > org.apache.spark.streaming.StreamingContext.checkpoint(
> StreamingContext.scala:201)
> >
> >
> >
> > Does that mean I have to use HDFS 2.4 to save check-point? Thank you!
> >
> >
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Spark app performance

2014-12-30 Thread Raghavendra Pandey
I have a Spark app that involves a series of mapPartition operations and then
a keyBy operation. I have measured the time inside the mapPartition function
blocks. These blocks take trivial time. Still, the application takes way too
much time, and even the Spark UI shows that much time.
So I was wondering where the time goes and how I can reduce it.

Thanks
Raghavendra


Re: FlatMapValues

2014-12-31 Thread Raghavendra Pandey
Why don't you push "\n" instead of "\t" in your first transformation [
(fields(0),(fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"
+fields(9)))] and then do saveAsTextFile?
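
A sketch of an equivalent way to get the intended one-pair-per-line output
(field positions and outFile are taken from the original snippet):

reacRdd.map(_.split(','))
  .filter(f => f.length >= 11 && !f(0).contains("VAERS_ID"))
  // emit one "id,reaction" line per reaction column
  .flatMap(f => Seq(1, 3, 5, 7, 9).map(i => f(0) + "," + f(i)))
  .saveAsTextFile("/data/vaers/msfx/reac/" + outFile)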

-Raghavendra

On Wed Dec 31 2014 at 1:42:55 PM Sanjay Subramanian
 wrote:

> hey guys
>
> My dataset is like this
>
> 025126,Chills,8.10,Injection site oedema,8.10,Injection site
> reaction,8.10,Malaise,8.10,Myalgia,8.10
>
> Intended output is
> ==
> 025126,Chills
> 025126,Injection site oedema
> 025126,Injection site reaction
> 025126,Malaise
> 025126,Myalgia
>
> My code is as follows but the flatMapValues does not work even after I have 
> created the pair RDD.
>
> 
>
> reacRdd.map(line => line.split(',')).map(fields => {
>   if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
> 
> (fields(0),(fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
>   }
>   else {
> ""
>   }
>   }).filter(line => line.toString.length() > 0).flatMapValues(skus => 
> skus.split('\t')).saveAsTextFile("/data/vaers/msfx/reac/" + outFile)
>
> 
>
>
> thanks
>
> sanjay
>


Re: Spark app performance

2015-01-01 Thread Raghavendra Pandey
I have seen that link. I am using an RDD of byte arrays and Kryo serialization.
Inside mapPartition, when I measure the time, it is never more than 1 ms, whereas
the total time taken by the application is around 30 min. The codebase has a lot
of dependencies. I am trying to come up with a simple version where I can
reproduce this problem.
Also, the GC timings reported by the Spark UI are always in the range of 3~4% of
the total time.

On Thu, Jan 1, 2015, 14:05 Akhil Das  wrote:

> Would be great if you can share the piece of code happening inside your
> mapPartition, I'm assuming you are creating/handling a lot of Complex
> objects and hence it slows down the performance. Here's a link
> <http://spark.apache.org/docs/latest/tuning.html> to performance tuning
> if you haven't seen it already.
>
> Thanks
> Best Regards
>
> On Wed, Dec 31, 2014 at 8:45 AM, Raghavendra Pandey <
> raghavendra.pan...@gmail.com> wrote:
>
>> I have a spark app that involves series of mapPartition operations and
>> then a keyBy operation. I have measured the time inside mapPartition
>> function block. These blocks take trivial time. Still the application takes
>> way too much time and even sparkUI shows that much time.
>> So i was wondering where does it take time and how can I reduce this.
>>
>> Thanks
>> Raghavendra
>>
>
>


Re: How to merge a RDD of RDDs into one uber RDD

2015-01-07 Thread Raghavendra Pandey
You can also use the join function of RDD. This is actually a kind of append
function that adds up all the RDDs and creates one uber RDD.

On Wed, Jan 7, 2015, 14:30 rkgurram  wrote:

> Thank you for the response, sure will try that out.
>
> Currently I changed my code such that the first map "files.map" to
> "files.flatMap", which I guess will do similar what you are saying, it
> gives
> me a List[] of elements (in this case LabeledPoints, I could also do RDDs)
> which I then turned into a mega RDD. The current problem seems to be gone,
> I
> no longer get the NPE but further down I am getting a indexOutOfBounds, so
> trying to figure out if the original problem is manifesting itself as a new
> one.
>
>
> Regards
> -Ravi
>
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/How-to-merge-a-RDD-of-RDDs-into-one-
> uber-RDD-tp20986p21012.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: How to merge a RDD of RDDs into one uber RDD

2015-01-07 Thread Raghavendra Pandey
Yup, I meant union only.
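
A minimal sketch (assuming `paths` is a collection of input paths):

// build one RDD per file, then union them into a single RDD without shuffling
val rdds = paths.map(p => sc.textFile(p))
val uberRdd = sc.union(rdds)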

On Wed, Jan 7, 2015, 16:19 Sean Owen  wrote:

> I think you mean union(). Yes, you could also simply make an RDD for each
> file, and use SparkContext.union() to put them together.
>
> On Wed, Jan 7, 2015 at 9:51 AM, Raghavendra Pandey <
> raghavendra.pan...@gmail.com> wrote:
>
>> You can also use join function of rdd. This is actually kind of append
>> funtion that add up all the rdds and create one uber rdd.
>>
>> On Wed, Jan 7, 2015, 14:30 rkgurram  wrote:
>>
>>> Thank you for the response, sure will try that out.
>>>
>>> Currently I changed my code such that the first map "files.map" to
>>> "files.flatMap", which I guess will do similar what you are saying, it
>>> gives
>>> me a List[] of elements (in this case LabeledPoints, I could also do
>>> RDDs)
>>> which I then turned into a mega RDD. The current problem seems to be
>>> gone, I
>>> no longer get the NPE but further down I am getting a indexOutOfBounds,
>>> so
>>> trying to figure out if the original problem is manifesting itself as a
>>> new
>>> one.
>>>
>>>
>>> Regards
>>> -Ravi
>>>
>>>
>>>
>>>
>>> --
>>> View this message in context: http://apache-spark-user-list.
>>> 1001560.n3.nabble.com/How-to-merge-a-RDD-of-RDDs-into-one-
>>> uber-RDD-tp20986p21012.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>


Re: Spark SQL: Storing AVRO Schema in Parquet

2015-01-08 Thread Raghavendra Pandey
I have a similar kind of requirement, where I want to push Avro data into
Parquet. But it seems you have to do it on your own. There is the parquet-mr
project that uses Hadoop to do so. I am trying to write a Spark job to do a
similar kind of thing.

On Fri, Jan 9, 2015 at 3:20 AM, Jerry Lam  wrote:

> Hi spark users,
>
> I'm using spark SQL to create parquet files on HDFS. I would like to store
> the avro schema into the parquet meta so that non spark sql applications
> can marshall the data without avro schema using the avro parquet reader.
> Currently, schemaRDD.saveAsParquetFile does not allow to do that. Is there
> another API that allows me to do this?
>
> Best Regards,
>
> Jerry
>


Re: Failed to save RDD as text file to local file system

2015-01-08 Thread Raghavendra Pandey
Can you check permissions etc as I am able to run
r.saveAsTextFile("file:///home/cloudera/tmp/out1")
successfully on my machine..

On Fri, Jan 9, 2015 at 10:25 AM, NingjunWang 
wrote:

> I try to save RDD as text file to local file system (Linux) but it does not
> work
>
> Launch spark-shell and run the following
>
> val r = sc.parallelize(Array("a", "b", "c"))
> r.saveAsTextFile("file:///home/cloudera/tmp/out1")
>
>
> IOException: Mkdirs failed to create
>
> file:/home/cloudera/tmp/out1/_temporary/0/_temporary/attempt_201501082027_0003_m_00_47
> (exists=false, cwd=file:/var/run/spark/work/app-20150108201046-0021/0)
> at
> org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:442)
> at
> org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:428)
> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:908)
> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:801)
> at
>
> org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:123)
> at
> org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:90)
> at
>
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1056)
> at
>
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1047)
> at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
> at org.apache.spark.scheduler.Task.run(Task.scala:56)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
> at
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
>
>
> I also try with 4 slash but still get the same error
> r.saveAsTextFile("file:home/cloudera/tmp/out1")
>
> Please advise
>
> Ningjun
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Failed-to-save-RDD-as-text-file-to-local-file-system-tp21050.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Spark SQL: Storing AVRO Schema in Parquet

2015-01-08 Thread Raghavendra Pandey
I came across this: http://zenfractal.com/2013/08/21/a-powerful-big-data-trio/.
You can take a look.

On Fri Jan 09 2015 at 12:08:49 PM Raghavendra Pandey <
raghavendra.pan...@gmail.com> wrote:

> I have the similar kind of requirement where I want to push avro data into
> parquet. But it seems you have to do it on your own. There is parquet-mr
> project that uses hadoop to do so. I am trying to write a spark job to do
> similar kind of thing.
>
> On Fri, Jan 9, 2015 at 3:20 AM, Jerry Lam  wrote:
>
>> Hi spark users,
>>
>> I'm using spark SQL to create parquet files on HDFS. I would like to
>> store the avro schema into the parquet meta so that non spark sql
>> applications can marshall the data without avro schema using the avro
>> parquet reader. Currently, schemaRDD.saveAsParquetFile does not allow to do
>> that. Is there another API that allows me to do this?
>>
>> Best Regards,
>>
>> Jerry
>>
>
>


Re: Cleaning up spark.local.dir automatically

2015-01-09 Thread Raghavendra Pandey
You may like to look at the spark.cleaner.ttl configuration, which is infinite
by default. Spark uses that setting to delete temp files and metadata from time to time.
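
For example, in conf/spark-defaults.conf (the value is in seconds and just an
example):

# forget and clean up generated metadata/temp data older than one hour
spark.cleaner.ttl   3600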

On Fri Jan 09 2015 at 8:34:10 PM  wrote:

>  Hi,
>
>
>
> Is there a way of automatically cleaning up the spark.local.dir after a
> job has been run? I have noticed a large number of temporary files have
> been stored here and are not cleaned up. The only solution I can think of
> is to run some sort of cron job to delete files older than a few days. I am
> currently using a mixture of standalone and YARN spark builds.
>
>
>
> Thanks,
>
> Michael
>
>
>
> This e-mail (including any attachments) is private and confidential, may
> contain proprietary or privileged information and is intended for the named
> recipient(s) only. Unintended recipients are strictly prohibited from
> taking action on the basis of information in this e-mail and must contact
> the sender immediately, delete this e-mail (and all attachments) and
> destroy any hard copies. Nomura will not accept responsibility or liability
> for the accuracy or completeness of, or the presence of any virus or
> disabling code in, this e-mail. If verification is sought please request a
> hard copy. Any reference to the terms of executed transactions should be
> treated as preliminary only and subject to formal written confirmation by
> Nomura. Nomura reserves the right to retain, monitor and intercept e-mail
> communications through its networks (subject to and in accordance with
> applicable laws). No confidentiality or privilege is waived or lost by
> Nomura by any mistransmission of this e-mail. Any reference to "Nomura" is
> a reference to any entity in the Nomura Holdings, Inc. group. Please read
> our Electronic Communications Legal Notice which forms part of this e-mail:
> http://www.Nomura.com/email_disclaimer.htm
>


Re: Web Service + Spark

2015-01-11 Thread Raghavendra Pandey
You can take a look at http://zeppelin.incubator.apache.org. It is a
notebook with a graphical visualization designer.

On Sun, Jan 11, 2015, 01:45 Cui Lin  wrote:

>  Thanks, Gaurav and Corey,
>
>  Probably I didn’t make myself clear. I am looking for best Spark
> practice similar to Shiny for R, the analysis/visualziation results can be
> easily published to web server and shown from web browser. Or any dashboard
> for Spark?
>
>  Best regards,
>
>  Cui Lin
>
>   From: gtinside 
> Date: Friday, January 9, 2015 at 7:45 PM
> To: Corey Nolet 
> Cc: Cui Lin , "user@spark.apache.org" <
> user@spark.apache.org>
> Subject: Re: Web Service + Spark
>
>   You can also look at Spark Job Server
> https://github.com/spark-jobserver/spark-jobserver
>
> - Gaurav
>
> On Jan 9, 2015, at 10:25 PM, Corey Nolet  wrote:
>
>   Cui Lin,
>
>  The solution largely depends on how you want your services deployed
> (Java web container, Spray framework, etc...) and if you are using a
> cluster manager like Yarn or Mesos vs. just firing up your own executors
> and master.
>
>  I recently worked on an example for deploying Spark services inside of
> Jetty using Yarn as the cluster manager. It forced me to learn how Spark
> wires up the dependencies/classpaths. If it helps, the example that
> resulted from my tinkering is located at [1].
>
>
>  [1] https://github.com/calrissian/spark-jetty-server
>
> On Fri, Jan 9, 2015 at 9:33 PM, Cui Lin  wrote:
>
>>  Hello, All,
>>
>>  What’s the best practice on deploying/publishing spark-based scientific
>> applications into a web service? Similar to Shiny on R.
>>  Thanks!
>>
>>  Best regards,
>>
>>  Cui Lin
>>
>
>


Re: Spark SQL: Storing AVRO Schema in Parquet

2015-01-11 Thread Raghavendra Pandey
I think the AvroWriteSupport class already saves the Avro schema as part of the
Parquet metadata. You can consider using parquet-mr
<https://github.com/Parquet/parquet-mr> directly.
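
A rough sketch of the parquet-mr route (the package name is from the older
parquet-mr releases and may differ in your version; `schema`, `recordsRdd` and
`outputPath` are placeholders):

import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.mapreduce.Job
import parquet.avro.AvroParquetOutputFormat

val job = new Job(sc.hadoopConfiguration)
// AvroWriteSupport stores this Avro schema in the Parquet footer metadata
AvroParquetOutputFormat.setSchema(job, schema)

recordsRdd
  .map(record => (null, record))  // the output format expects (Void, record) pairs
  .saveAsNewAPIHadoopFile(
    outputPath,
    classOf[Void],
    classOf[GenericRecord],
    classOf[AvroParquetOutputFormat],
    job.getConfiguration)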

Raghavendra

On Fri, Jan 9, 2015 at 10:32 PM, Jerry Lam  wrote:

> Hi Raghavendra,
>
> This makes a lot of sense. Thank you.
> The problem is that I'm using Spark SQL right now to generate the parquet
> file.
>
> What I think I need to do is to use Spark directly and transform all rows
> from SchemaRDD to avro objects and supply it to use saveAsNewAPIHadoopFile
> (from the PairRDD). From there, I can supply the avro schema to parquet via
> AvroParquetOutputFormat.
>
> It is not difficult just not as simple as I would like because SchemaRDD
> can write to Parquet file using its schema and if I can supply the avro
> schema to parquet, it save me the transformation step for avro objects.
>
> I'm thinking of overriding the saveAsParquetFile method to allows me to
> persist the avro schema inside parquet. Is this possible at all?
>
> Best Regards,
>
> Jerry
>
>
> On Fri, Jan 9, 2015 at 2:05 AM, Raghavendra Pandey <
> raghavendra.pan...@gmail.com> wrote:
>
>> I cam across this
>> http://zenfractal.com/2013/08/21/a-powerful-big-data-trio/. You can take
>> a look.
>>
>>
>> On Fri Jan 09 2015 at 12:08:49 PM Raghavendra Pandey <
>> raghavendra.pan...@gmail.com> wrote:
>>
>>> I have the similar kind of requirement where I want to push avro data
>>> into parquet. But it seems you have to do it on your own. There
>>> is parquet-mr project that uses hadoop to do so. I am trying to write a
>>> spark job to do similar kind of thing.
>>>
>>> On Fri, Jan 9, 2015 at 3:20 AM, Jerry Lam  wrote:
>>>
>>>> Hi spark users,
>>>>
>>>> I'm using spark SQL to create parquet files on HDFS. I would like to
>>>> store the avro schema into the parquet meta so that non spark sql
>>>> applications can marshall the data without avro schema using the avro
>>>> parquet reader. Currently, schemaRDD.saveAsParquetFile does not allow to do
>>>> that. Is there another API that allows me to do this?
>>>>
>>>> Best Regards,
>>>>
>>>> Jerry
>>>>
>>>
>>>
>


Re: Save RDD with partition information

2015-01-13 Thread Raghavendra Pandey
I believe the default hash partitioner logic in Spark will send all records with
the same key to the same partition, and hence the same machine.
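
For example, to make the partitioning explicit rather than relying on the
default (hourlyRdd and the partition count are placeholders):

import org.apache.spark.HashPartitioner

// the same key always maps to the same partition index, as long as every
// hourly RDD uses the same partitioner and partition count
val partitioned = hourlyRdd.partitionBy(new HashPartitioner(200)).persist()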

On Wed, Jan 14, 2015, 03:03 Puneet Kapoor  wrote:

> Hi,
>
> I have a usecase where in I have hourly spark job which creates hourly
> RDDs, which are partitioned by keys.
>
> At the end of the day I need to access all of these RDDs and combine the
> Key/Value pairs over the day.
>
> If there is a key K1 in RDD0 (1st hour of day), RDD1 ... RDD23(last hour
> of the day); we need to combine all the values of this K1 using some logic.
>
> What I want to do is to avoid the shuffling at the end of the day since
> the data in huge ~ hundreds of GB.
>
> Questions
> ---
> 1.) Is there a way that i can persist hourly RDDs with partition
> information and then while reading back the RDDs the partition information
> is restored.
> 2.) Can i ensure that partitioning is similar for different hours. Like if
> K1 goes to container_X, it would go to the same container in the next hour
> and so on.
>
> Regards
> Puneet
>
>


Re: ALS.trainImplicit running out of mem when using higher rank

2015-01-18 Thread Raghavendra Pandey
If you are running Spark in local mode, executor parameters are not used
because there is no separate executor -- everything runs inside the driver JVM.
You should set the corresponding driver parameters instead.

On Mon, Jan 19, 2015, 00:21 Sean Owen  wrote:

> OK. Are you sure the executor has the memory you think? "-Xmx24g" in
> its command line? It may be that for some reason your job is reserving
> an exceptionally large amount of non-heap memory. I am not sure that's
> to be expected with the ALS job though. Even if the settings work,
> considering using the explicit command line configuration.
>
> On Sat, Jan 17, 2015 at 12:49 PM, Antony Mayi 
> wrote:
> > the values are for sure applied as expected - confirmed using the spark
> UI
> > environment page...
> >
> > it comes from my defaults configured using
> > 'spark.yarn.executor.memoryOverhead=8192' (yes, now increased even
> more) in
> > /etc/spark/conf/spark-defaults.conf and 'export
> SPARK_EXECUTOR_MEMORY=24G'
> > in /etc/spark/conf/spark-env.sh
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: How to get the master URL at runtime inside driver program?

2015-01-19 Thread Raghavendra Pandey
If you pass the Spark master URL to spark-submit, you don't need to set it
again on the SparkConf object. You can create the SparkConf without this
property, or for that matter without any other property that you already pass
to spark-submit.
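
A minimal sketch (the app name is illustrative):

import org.apache.spark.{SparkConf, SparkContext}

// Let spark-submit supply --master; don't call setMaster() in code.
val conf = new SparkConf().setAppName("my-app")
val sc = new SparkContext(conf)

// If the driver really needs the resolved master URL at runtime:
val masterUrl = sc.master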

On Sun, Jan 18, 2015 at 7:38 AM, guxiaobo1982  wrote:

> Hi,
>
> Driver programs submitted by the spark-submit script will get the runtime
> spark master URL, but how it get the URL inside the main method when
> creating the SparkConf object?
>
> Regards,
>
>


Re: Is there a way to delete hdfs file/directory using spark API?

2015-01-21 Thread Raghavendra Pandey
You can use the Hadoop client API to remove files:
https://hadoop.apache.org/docs/current/api/org/apache/hadoop/fs/FileSystem.html#delete(org.apache.hadoop.fs.Path,
boolean). I don't think Spark has any wrapper around the Hadoop FileSystem APIs.
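
For example, something along these lines from the driver (the path is illustrative;
the second argument enables recursive delete):

import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(sc.hadoopConfiguration)
val deleted = fs.delete(new Path("/user/someuser/some/output"), true)   // true = recursive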

On Thu, Jan 22, 2015 at 12:15 PM, LinQili  wrote:

> Hi, all
> I wonder how to delete hdfs file/directory using spark API?
>


Re: Cheapest way to materialize an RDD?

2015-02-02 Thread Raghavendra Pandey
You can also do something like:

rdd.sparkContext.runJob(rdd, (iter: Iterator[T]) => {
  while (iter.hasNext) iter.next()
})

On Sat, Jan 31, 2015 at 5:24 AM, Sean Owen  wrote:

> Yeah, from an unscientific test, it looks like the time to cache the
> blocks still dominates. Saving the count is probably a win, but not
> big. Well, maybe good to know.
>
> On Fri, Jan 30, 2015 at 10:47 PM, Stephen Boesch 
> wrote:
> > Theoretically your approach would require less overhead - i.e. a collect
> on
> > the driver is not required as the last step.  But maybe the difference is
> > small and that particular path may or may not have been properly
> optimized
> > vs the count(). Do you have a biggish data set to compare the timings?
> >
> > 2015-01-30 14:42 GMT-08:00 Sean Owen :
> >>
> >> So far, the canonical way to materialize an RDD just to make sure it's
> >> cached is to call count(). That's fine but incurs the overhead of
> >> actually counting the elements.
> >>
> >> However, rdd.foreachPartition(p => None) for example also seems to
> >> cause the RDD to be materialized, and is a no-op. Is that a better way
> >> to do it or am I not thinking of why it's insufficient?
> >>
> >> -
> >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> >> For additional commands, e-mail: user-h...@spark.apache.org
> >>
> >
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Shuffle read/write issue in spark 1.2

2015-02-05 Thread Raghavendra Pandey
I have observed the same issue as well.

On Fri, Feb 6, 2015 at 12:19 AM, Praveen Garg 
wrote:

>  Hi,
>
>  While moving from spark 1.1 to spark 1.2, we are facing an issue where
> Shuffle read/write has been increased significantly. We also tried running
> the job by rolling back to spark 1.1 configuration where we set
> spark.shuffle.manager to hash and spark.shuffle.blockTransferService to
> nio. It did improve the performance a bit but it was still much worse than
> spark 1.1. The scenario seems similar to the bug raised sometime back
> https://issues.apache.org/jira/browse/SPARK-5081.
> Has anyone come across any similar issue? Please tell us if any
> configuration change can help.
>
>  Regards, Praveen
>
>


Re: Executor lost with too many temp files

2015-02-25 Thread Raghavendra Pandey
Can you try increasing the open file limit (ulimit -n) on your machines?

On Mon, Feb 23, 2015 at 10:55 PM, Marius Soutier  wrote:

> Hi Sameer,
>
> I’m still using Spark 1.1.1, I think the default is hash shuffle. No
> external shuffle service.
>
> We are processing gzipped JSON files, the partitions are the amount of
> input files. In my current data set we have ~850 files that amount to 60 GB
> (so ~600 GB uncompressed). We have 5 workers with 8 cores and 48 GB RAM
> each. We extract five different groups of data from this to filter, clean
> and denormalize (i.e. join) it for easier downstream processing.
>
> By the way this code does not seem to complete at all without using
> coalesce() at a low number, 5 or 10 work great. Everything above that make
> it very likely it will crash, even on smaller datasets (~300 files). But
> I’m not sure if this is related to the above issue.
>
>
> On 23.02.2015, at 18:15, Sameer Farooqui  wrote:
>
> Hi Marius,
>
> Are you using the sort or hash shuffle?
>
> Also, do you have the external shuffle service enabled (so that the Worker
> JVM or NodeManager can still serve the map spill files after an Executor
> crashes)?
>
> How many partitions are in your RDDs before and after the problematic
> shuffle operation?
>
>
>
> On Monday, February 23, 2015, Marius Soutier  wrote:
>
>> Hi guys,
>>
>> I keep running into a strange problem where my jobs start to fail with
>> the dreaded "Resubmitted (resubmitted due to lost executor)” because of
>> having too many temp files from previous runs.
>>
>> Both /var/run and /spill have enough disk space left, but after a given
>> amount of jobs have run, following jobs will struggle with completion.
>> There are a lot of failures without any exception message, only the above
>> mentioned lost executor. As soon as I clear out /var/run/spark/work/ and
>> the spill disk, everything goes back to normal.
>>
>> Thanks for any hint,
>> - Marius
>>
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: Using 'fair' scheduler mode

2015-03-31 Thread Raghavendra Pandey
I am facing the same issue. FAIR and FIFO are behaving in the same way.

On Wed, Apr 1, 2015 at 1:49 AM, asadrao  wrote:

> Hi, I am using the Spark ‘fair’ scheduler mode. I have noticed that if the
> first query is a very expensive query (ex: ‘select *’ on a really big data
> set) than any subsequent query seem to get blocked. I would have expected
> the second query to run in parallel since I am using the ‘fair’ scheduler
> mode not the ‘fifo’. I am submitting the query through thrift server.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Using-fair-scheduler-mode-tp22328.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Spark SQL: How to get the hierarchical element with SQL?

2014-12-08 Thread Raghavendra Pandey
Yeah, the dot notation works. It works even for arrays. But I am not sure
if it can handle complex hierarchies.
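
For illustration, a hedged sketch against a hypothetical nested schema (the table and
column names are made up, and exact behaviour may depend on the Spark SQL version):

// Hypothetical schema:
//  root
//   |-- client: struct
//   |    |-- id: integer
//   |    |-- name: string
//   |-- filterIp: array of string
val names = sqlContext.sql("SELECT client.name FROM tb")   // dot notation into a struct field
val ips   = sqlContext.sql("SELECT filterIp FROM tb")      // the array column as a whole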

On Mon Dec 08 2014 at 11:55:19 AM Cheng Lian  wrote:

>  You may access it via something like SELECT filterIp.element FROM tb,
> just like Hive. Or if you’re using Spark SQL DSL, you can use
> tb.select("filterIp.element".attr).
>
> On 12/8/14 1:08 PM, Xuelin Cao wrote:
>
>
>  Hi,
>
>  I'm generating a Spark SQL table from an offline Json file.
>
>  The difficulty is, in the original json file, there is a
> hierarchical structure. And, as a result, this is what I get:
>
>  scala> tb.printSchema
> root
>  |-- budget: double (nullable = true)
> * |-- filterIp: array (nullable = true)*
> * ||-- element: string (containsNull = false)*
>  |-- status: integer (nullable = true)
>  |-- third_party: integer (nullable = true)
>  |-- userId: integer (nullable = true)
>
>  As you may have noticed, the table schema is with a hierarchical
> structure ("element" field is a sub-field under the "filterIp" field).
> Then, my question is, how do I access the "element" field with SQL?
>
>
>
>


Re: Locking for shared RDDs

2014-12-08 Thread Raghavendra Pandey
You don't need to worry about locks, because each partition of an RDD is
processed exclusively by a single task/worker at a time. You can use the
accumulator variables that Spark provides to aggregate state updates.
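
A minimal sketch under the Spark 1.x API (the RDD and the statistic are illustrative):

// Each task updates its own local copy of the accumulator; Spark merges the
// partial results on the driver, so no explicit locking is needed.
val processedCount = sc.accumulator(0L)

someRdd.foreach { record =>
  processedCount += 1L
}

println(processedCount.value)   // .value should only be read on the driver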

On Mon Dec 08 2014 at 8:14:28 PM aditya.athalye 
wrote:

> I am relatively new to Spark. I am planning to use Spark Streaming for my
> OLAP use case, but I would like to know how RDDs are shared between
> multiple
> workers.
> If I need to constantly compute some stats on the streaming data,
> presumably
> shared state would have to updated serially by different spark workers. Is
> this managed by Spark automatically or does the application need to ensure
> distributed locks are acquired?
>
> Thanks
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Locking-for-shared-RDDs-tp20578.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>