Job Opportunities in India, UK, Australia, UAE, Singapore or USA

2024-11-12 Thread sri hari kali charan Tummala
Hello Spark Community,

As a seasoned Data Engineering professional with 12+ years of experience, I
specialize in Apache Spark, particularly in Structured Streaming. I'm
currently exploring job opportunities in India, the UK, Australia, UAE,
Singapore, and the USA.

If anyone is aware of openings or can connect me with potential employers,
I'd greatly appreciate it. Thank you!

Best regards,
Sri Tummala


Spark Job Opportunities !

2024-09-26 Thread sri hari kali charan Tummala
Hi Spark Community,

I'm a hands-on Apache Flink/Spark Software Engineer with 13+ years of Data
Engineering experience, looking for job opportunities in India, the USA (with
H-1B transfer), or the UK (with Tier 2 sponsorship). Please point me in the
right direction if you come across such opportunities.

Thanks,
Sri Tummala


Job Opportunities in India or UK with Tier 2 Sponsorship - Spark Expert

2024-08-26 Thread sri hari kali charan Tummala
Hi Spark Community,

I'm a seasoned Data Engineering professional with 13+ years of experience
and expertise in Apache Spark, particularly in Structured Streaming. I'm
looking for job opportunities in India or the UK that offer Tier 2
sponsorship. If anyone knows of openings or can connect me with potential
employers, please reach out.

Thanks,
Sri Tummala


India Scala & Big Data Job Referral

2023-12-21 Thread sri hari kali charan Tummala
Hi Community,

I was laid off from Apple in February 2023, which led to my relocation from
the USA due to immigration issues related to my H1B visa.


I have over 12 years of experience as a consultant in Big Data, Spark,
Scala, Python, and Flink.


Despite my move to India, I haven't secured a job yet. I am seeking
referrals within product firms (preferably non-consulting) in India that
work with Flink, Spark, Scala, Big Data, or in the fields of ML & AI. Can
someone assist me with this?

Thanks
Sri


Spark Scala Contract Opportunity @USA

2022-11-10 Thread sri hari kali charan Tummala
Hi All,

Is anyone looking for a Spark Scala contract role inside the USA? A company
called Maxonic has an open Spark Scala contract position (100% remote)
inside the USA. If anyone is interested, please send your CV to
kali.tumm...@gmail.com.

Thanks & Regards
Sri Tummala


Big Data Contract Roles ?

2022-09-14 Thread sri hari kali charan Tummala
Hi Flink Users/ Spark Users,

Is anyone hiring for contract (corp-to-corp) Big Data Spark Scala or Flink
Scala roles?


Thanks
Sri


Unsubscribe

2022-09-13 Thread Hari Kunapareddy



Spark MLlib: Should I call .cache before fitting a model?

2018-02-27 Thread Gevorg Hari
Imagine that I am training a Spark MLlib model as follows:

val trainingData = loadTrainingData(...)
val logisticRegression = new LogisticRegression()

trainingData.cache
val logisticRegressionModel = logisticRegression.fit(trainingData)

Does the call trainingData.cache improve performance at training time, or is
it not needed?

Does the .fit(...) method for an ML algorithm call cache/unpersist
internally?
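
For reference, here is a minimal sketch of the explicit caching pattern around
fit(); the input path and schema are assumptions, and whether a given algorithm
also persists data internally varies by algorithm and Spark version:

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.sql.SparkSession

// Sketch only: the parquet path is a placeholder; the data is assumed to
// already contain "label" and "features" columns.
val spark = SparkSession.builder().appName("cache-before-fit-sketch").getOrCreate()
val trainingData = spark.read.parquet("/path/to/training")

// Cache explicitly if the DataFrame is expensive to recompute and is iterated
// over several times during training; this is a conservative default rather
// than a hard requirement.
trainingData.cache()

val lr = new LogisticRegression()
val model = lr.fit(trainingData)

// Release the cached data once the model is fitted.
trainingData.unpersist()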


Spark MLlib Question - Online Scoring of PipelineModel

2018-01-05 Thread Gevorg Hari
Is Spark planning to support *online scoring* (without any Spark
dependencies) of a PipelineModel trained offline? Not being able to do so
is a huge barrier to entry for using Spark in production at my company...

For online support, I found this https://github.com/combust/mleap
Any feedback on production use of *MLeap*? Will this ever be integrated
into the main Spark project? When?

Thanks a lot!


Re: sql to spark scala rdd

2016-08-01 Thread sri hari kali charan Tummala
Hi All,

The code below calculates the cumulative sum (running sum) and moving average
using Scala RDD-style programming. I was using the wrong function (sliding);
scanLeft should be used instead.


sc.textFile("C:\\Users\\kalit_000\\Desktop\\Hadoop_IMP_DOC\\spark\\data.txt")
  .map(x => x.split("\\~"))
  .map(x => (x(0), x(1), x(2).toDouble))
  .groupBy(_._1)
  .mapValues { x =>
    x.toList.sortBy(_._2).zip(Stream from 1)
      .scanLeft(("", "", 0.0, 0.0, 0.0, 0.0)) { (a, b) =>
        (b._1._1, b._1._2, b._1._3,
          b._1._3.toDouble + a._3.toDouble,
          (b._1._3.toDouble + a._3.toDouble) / b._2,
          b._2.toDouble)
      }.tail
  }
  .flatMapValues(x => x.sortBy(_._1))
  .foreach(println)

Input Data:-

Headers:-
Key,Date,balance

786~20160710~234
786~20160709~-128
786~20160711~-457
987~20160812~456
987~20160812~567

Output Data:-

Column Headers:-
key, (key, Date, balance, daily balance, running average, row_number based on key)

(786,(786,20160709,-128.0,-128.0,-128.0,1.0))
(786,(786,20160710,234.0,106.0,53.0,2.0))
(786,(786,20160711,-457.0,-223.0,-74.33,3.0))

(987,(987,20160812,567.0,1023.0,511.5,2.0))
(987,(987,20160812,456.0,456.0,456.0,1.0))

Reference:-

https://bzhangusc.wordpress.com/2014/06/21/calculate-running-sums/
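
For comparison, here is a hedged sketch of the same running sum and running
average using the DataFrame window API (assumes Spark 2.1 or later for
Window.unboundedPreceding; the file path and column names follow the sample
data above and are otherwise assumptions):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{avg, row_number, sum}

val spark = SparkSession.builder().appName("running-sum-sketch").getOrCreate()
import spark.implicits._

// Parse the '~'-separated sample data into key, date and balance columns.
val df = spark.read.textFile("data.txt")
  .map { line => val a = line.split("~"); (a(0), a(1), a(2).toDouble) }
  .toDF("key", "date", "balance")

// ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, per key, ordered by date.
val running = Window.partitionBy("key").orderBy("date")
  .rowsBetween(Window.unboundedPreceding, Window.currentRow)
val ordering = Window.partitionBy("key").orderBy("date")

df.withColumn("running_balance", sum("balance").over(running))
  .withColumn("running_average", avg("balance").over(running))
  .withColumn("row_number", row_number().over(ordering))
  .show()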


Thanks
Sri


On Mon, Aug 1, 2016 at 12:07 AM, Sri  wrote:

> Hi ,
>
> I solved it using spark SQL which uses similar window functions mentioned
> below , for my own knowledge I am trying to solve using Scala RDD which I
> am unable to.
> What function in Scala supports window function like SQL unbounded
> preceding and current row ? Is it sliding ?
>
>
> Thanks
> Sri
>
> Sent from my iPhone
>
> On 31 Jul 2016, at 23:16, Mich Talebzadeh 
> wrote:
>
> hi
>
> You mentioned:
>
> I already solved it using DF and spark sql ...
>
> Are you referring to this code which is a classic analytics:
>
> SELECT DATE,balance,
>  SUM(balance) OVER (ORDER BY DATE ROWS BETWEEN UNBOUNDED PRECEDING
>
>  AND
>
>  CURRENT ROW) daily_balance
>
>  FROM  table
>
>
> So how did you solve it using DF in the first place?
>
>
> HTH
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn:
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 1 August 2016 at 07:04, Sri  wrote:
>
>> Hi ,
>>
>> Just wondering how spark SQL works behind the scenes does it not convert
>> SQL to some Scala RDD ? Or Scala ?
>>
>> How to write below SQL in Scala or Scala RDD
>>
>> SELECT DATE,balance,
>>
>> SUM(balance) OVER (ORDER BY DATE ROWS BETWEEN UNBOUNDED PRECEDING
>>
>> AND
>>
>> CURRENT ROW) daily_balance
>>
>> FROM  table
>>
>>
>> Thanks
>> Sri
>> Sent from my iPhone
>>
>> On 31 Jul 2016, at 13:21, Jacek Laskowski  wrote:
>>
>> Hi,
>>
>> Impossible - see
>>
>> http://www.scala-lang.org/api/current/index.html#scala.collection.Seq@sliding(size:Int,step:Int):Iterator[Repr]
>> .
>>
>> I tried to show you why you ended up with "non-empty iterator" after
>> println. You should really start with
>> http://www.scala-lang.org/documentation/
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> 
>> https://medium.com/@jaceklaskowski/
>> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
>> Follow me at https://twitter.com/jaceklaskowski
>>
>>
>> On Sun, Jul 31, 2016 at 8:49 PM, sri hari kali charan Tummala
>>  wrote:
>>
>> Tuple
>>
>>
>> [Lscala.Tuple2;@65e4cb84
>>
>>
>> On Sun, Jul 31, 2016 at 1:00 AM, Jacek Laskowski  wrote:
>>
>>
>> Hi,
>>
>>
>> What's the result type of sliding(2,1)?
>>
>>
>> Pozdrawiam,
>>
>> Jacek Laskowski
>>
>> 
>>
>> https://medium.com/@jaceklaskowski/
>>
>> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
>>
>> Follow me at https://twitter.com/jaceklaskowski
>>
>>
>>
>> On Sun, Jul 31, 2016 at 9:23 AM, sri hari kali charan Tummala
>>
>>  wrote:
>>
>> tried this no luck, wht is non-empty iterator here ?
>>
&

Re: sql to spark scala rdd

2016-07-31 Thread sri hari kali charan Tummala
val test = sc.textFile(file).keyBy(x => x.split("\\~")(0))
  .map(x => x._2.split("\\~"))
  .map(x => (x(0), x(1), x(2)))
  .map { case (account, datevalue, amount) => ((account, datevalue), amount.toDouble) }
  .mapValues(x => x)
  .toArray.sliding(2, 1)
  .map(x => (x(0)._1, x(1)._2, x.foldLeft(0.0)(_ + _._2 / x.size), x.foldLeft(0.0)(_ + _._2)))
  .foreach(println)


On Sun, Jul 31, 2016 at 12:15 PM, sri hari kali charan Tummala <
kali.tumm...@gmail.com> wrote:

> Hi All,
>
> I already solved it using DF and spark sql I was wondering how to solve in
> scala rdd, I just got the answer need to check my results compared to spark
> sql thanks all for your time.
>
> I am trying to solve moving average using scala RDD group by key.
>
>
> input:-
> -987~20150728~100
> -987~20150729~50
> -987~20150730~-100
> -987~20150804~200
> -987~20150807~-300
> -987~20150916~100
>
>
> val test=sc.textFile(file).keyBy(x => x.split("\\~") (0))
>   .map(x => x._2.split("\\~"))
>   .map(x => ((x(0),x(1),x(2
>   .map{case (account,datevalue,amount) => 
> ((account,datevalue),(amount.toDouble))}.mapValues(x => 
> x).toArray.sliding(2,1).map(x => (x(0)._1,x(1)._2,(x.foldLeft(0.0)(_ + 
> _._2/x.size)),x.foldLeft(0.0)(_ + _._2))).foreach(println)
>
> Op:-
>
> accountkey,date,balance_of_account, daily_average, sum_base_on_window
>
> ((-987,20150728),50.0,75.0,150.0)
> ((-987,20150729),-100.0,-25.0,-50.0)
> ((-987,20150730),200.0,50.0,100.0)
> ((-987,20150804),-300.0,-50.0,-100.0)
> ((-987,20150807),100.0,-100.0,-200.0)
>
>
> below book is written for Hadoop Mapreduce the book has solution for
> moving average but its in Java.
>
>
> https://www.safaribooksonline.com/library/view/data-algorithms/9781491906170/ch06.html
>
>
> Sql:-
>
>
> SELECT DATE,balance,
> SUM(balance) OVER (ORDER BY DATE ROWS BETWEEN UNBOUNDED PRECEDING AND
> CURRENT ROW) daily_balance
> FROM  table
>
> Thanks
> Sri
>
>
>
> On Sun, Jul 31, 2016 at 11:54 AM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> Check also this
>> <https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html>
>>
>> HTH
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn:
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>> On 31 July 2016 at 19:49, sri hari kali charan Tummala <
>> kali.tumm...@gmail.com> wrote:
>>
>>> Tuple
>>>
>>> [Lscala.Tuple2;@65e4cb84
>>>
>>> On Sun, Jul 31, 2016 at 1:00 AM, Jacek Laskowski 
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> What's the result type of sliding(2,1)?
>>>>
>>>> Pozdrawiam,
>>>> Jacek Laskowski
>>>> 
>>>> https://medium.com/@jaceklaskowski/
>>>> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
>>>> Follow me at https://twitter.com/jaceklaskowski
>>>>
>>>>
>>>> On Sun, Jul 31, 2016 at 9:23 AM, sri hari kali charan Tummala
>>>>  wrote:
>>>> > tried this no luck, wht is non-empty iterator here ?
>>>> >
>>>> > OP:-
>>>> > (-987,non-empty iterator)
>>>> > (-987,non-empty iterator)
>>>> > (-987,non-empty iterator)
>>>> > (-987,non-empty iterator)
>>>> > (-987,non-empty iterator)
>>>> >
>>>> >
>>>> > sc.textFile(file).keyBy(x => x.split("\\~") (0))
>>>> >   .map(x => x._2.split("\\~"))
>>>> >   .map(x => (x(0),x(2)))
>>>> > .map { case (key,value) =>
>>>> (key,value.toArray.toSeq.sliding(2,1).map(x
>>>> > => x.sum/x.size))}.foreach(println)
>>>> >
>>>> >
>>>> > On Sun, Jul 31, 2016 at 12:03 AM, sri hari kali charan Tummala
>>>> >  wrote:
>>>> >>
>>>> >> Hi

Re: sql to spark scala rdd

2016-07-31 Thread sri hari kali charan Tummala
Hi All,

I already solved it using DataFrames and Spark SQL; I was wondering how to
solve it with the Scala RDD API. I just got the answer and need to check my
results against the Spark SQL output. Thanks, all, for your time.

I am trying to compute a moving average using a Scala RDD grouped by key.


input:-
-987~20150728~100
-987~20150729~50
-987~20150730~-100
-987~20150804~200
-987~20150807~-300
-987~20150916~100


val test = sc.textFile(file).keyBy(x => x.split("\\~")(0))
  .map(x => x._2.split("\\~"))
  .map(x => (x(0), x(1), x(2)))
  .map { case (account, datevalue, amount) => ((account, datevalue), amount.toDouble) }
  .mapValues(x => x)
  .toArray.sliding(2, 1)
  .map(x => (x(0)._1, x(1)._2, x.foldLeft(0.0)(_ + _._2 / x.size), x.foldLeft(0.0)(_ + _._2)))
  .foreach(println)

Op:-

accountkey,date,balance_of_account, daily_average, sum_base_on_window

((-987,20150728),50.0,75.0,150.0)
((-987,20150729),-100.0,-25.0,-50.0)
((-987,20150730),200.0,50.0,100.0)
((-987,20150804),-300.0,-50.0,-100.0)
((-987,20150807),100.0,-100.0,-200.0)


The book below was written for Hadoop MapReduce; it has a solution for the
moving average, but it is in Java.

https://www.safaribooksonline.com/library/view/data-algorithms/9781491906170/ch06.html


Sql:-


SELECT DATE,balance,
SUM(balance) OVER (ORDER BY DATE ROWS BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW) daily_balance
FROM  table

Thanks
Sri



On Sun, Jul 31, 2016 at 11:54 AM, Mich Talebzadeh  wrote:

> Check also this
> <https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html>
>
> HTH
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn:
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 31 July 2016 at 19:49, sri hari kali charan Tummala <
> kali.tumm...@gmail.com> wrote:
>
>> Tuple
>>
>> [Lscala.Tuple2;@65e4cb84
>>
>> On Sun, Jul 31, 2016 at 1:00 AM, Jacek Laskowski  wrote:
>>
>>> Hi,
>>>
>>> What's the result type of sliding(2,1)?
>>>
>>> Pozdrawiam,
>>> Jacek Laskowski
>>> 
>>> https://medium.com/@jaceklaskowski/
>>> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
>>> Follow me at https://twitter.com/jaceklaskowski
>>>
>>>
>>> On Sun, Jul 31, 2016 at 9:23 AM, sri hari kali charan Tummala
>>>  wrote:
>>> > tried this no luck, wht is non-empty iterator here ?
>>> >
>>> > OP:-
>>> > (-987,non-empty iterator)
>>> > (-987,non-empty iterator)
>>> > (-987,non-empty iterator)
>>> > (-987,non-empty iterator)
>>> > (-987,non-empty iterator)
>>> >
>>> >
>>> > sc.textFile(file).keyBy(x => x.split("\\~") (0))
>>> >   .map(x => x._2.split("\\~"))
>>> >   .map(x => (x(0),x(2)))
>>> > .map { case (key,value) =>
>>> (key,value.toArray.toSeq.sliding(2,1).map(x
>>> > => x.sum/x.size))}.foreach(println)
>>> >
>>> >
>>> > On Sun, Jul 31, 2016 at 12:03 AM, sri hari kali charan Tummala
>>> >  wrote:
>>> >>
>>> >> Hi All,
>>> >>
>>> >> I managed to write using sliding function but can it get key as well
>>> in my
>>> >> output ?
>>> >>
>>> >> sc.textFile(file).keyBy(x => x.split("\\~") (0))
>>> >>   .map(x => x._2.split("\\~"))
>>> >>   .map(x => (x(2).toDouble)).toArray().sliding(2,1).map(x =>
>>> >> (x,x.size)).foreach(println)
>>> >>
>>> >>
>>> >> at the moment my output:-
>>> >>
>>> >> 75.0
>>> >> -25.0
>>> >> 50.0
>>> >> -50.0
>>> >> -100.0
>>> >>
>>> >> I want with key how to get moving average output based on key ?
>>> >>
>>> >>
>>> >> 987,75.0
>>> >> 987,-25
>>> >> 987,50.0
>>> >>
>>> >> Thanks
>>> >> Sri
>>> >>
>>> 

Re: sql to spark scala rdd

2016-07-31 Thread sri hari kali charan Tummala
Tuple

[Lscala.Tuple2;@65e4cb84

On Sun, Jul 31, 2016 at 1:00 AM, Jacek Laskowski  wrote:

> Hi,
>
> What's the result type of sliding(2,1)?
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
>
> On Sun, Jul 31, 2016 at 9:23 AM, sri hari kali charan Tummala
>  wrote:
> > tried this no luck, wht is non-empty iterator here ?
> >
> > OP:-
> > (-987,non-empty iterator)
> > (-987,non-empty iterator)
> > (-987,non-empty iterator)
> > (-987,non-empty iterator)
> > (-987,non-empty iterator)
> >
> >
> > sc.textFile(file).keyBy(x => x.split("\\~") (0))
> >   .map(x => x._2.split("\\~"))
> >   .map(x => (x(0),x(2)))
> > .map { case (key,value) =>
> (key,value.toArray.toSeq.sliding(2,1).map(x
> > => x.sum/x.size))}.foreach(println)
> >
> >
> > On Sun, Jul 31, 2016 at 12:03 AM, sri hari kali charan Tummala
> >  wrote:
> >>
> >> Hi All,
> >>
> >> I managed to write using sliding function but can it get key as well in
> my
> >> output ?
> >>
> >> sc.textFile(file).keyBy(x => x.split("\\~") (0))
> >>   .map(x => x._2.split("\\~"))
> >>   .map(x => (x(2).toDouble)).toArray().sliding(2,1).map(x =>
> >> (x,x.size)).foreach(println)
> >>
> >>
> >> at the moment my output:-
> >>
> >> 75.0
> >> -25.0
> >> 50.0
> >> -50.0
> >> -100.0
> >>
> >> I want with key how to get moving average output based on key ?
> >>
> >>
> >> 987,75.0
> >> 987,-25
> >> 987,50.0
> >>
> >> Thanks
> >> Sri
> >>
> >>
> >>
> >>
> >>
> >>
> >> On Sat, Jul 30, 2016 at 11:40 AM, sri hari kali charan Tummala
> >>  wrote:
> >>>
> >>> for knowledge just wondering how to write it up in scala or spark RDD.
> >>>
> >>> Thanks
> >>> Sri
> >>>
> >>> On Sat, Jul 30, 2016 at 11:24 AM, Jacek Laskowski 
> >>> wrote:
> >>>>
> >>>> Why?
> >>>>
> >>>> Pozdrawiam,
> >>>> Jacek Laskowski
> >>>> 
> >>>> https://medium.com/@jaceklaskowski/
> >>>> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
> >>>> Follow me at https://twitter.com/jaceklaskowski
> >>>>
> >>>>
> >>>> On Sat, Jul 30, 2016 at 4:42 AM, kali.tumm...@gmail.com
> >>>>  wrote:
> >>>> > Hi All,
> >>>> >
> >>>> > I managed to write business requirement in spark-sql and hive I am
> >>>> > still
> >>>> > learning scala how this below sql be written using spark RDD not
> spark
> >>>> > data
> >>>> > frames.
> >>>> >
> >>>> > SELECT DATE,balance,
> >>>> > SUM(balance) OVER (ORDER BY DATE ROWS BETWEEN UNBOUNDED PRECEDING
> AND
> >>>> > CURRENT ROW) daily_balance
> >>>> > FROM  table
> >>>> >
> >>>> >
> >>>> >
> >>>> >
> >>>> >
> >>>> > --
> >>>> > View this message in context:
> >>>> >
> http://apache-spark-user-list.1001560.n3.nabble.com/sql-to-spark-scala-rdd-tp27433.html
> >>>> > Sent from the Apache Spark User List mailing list archive at
> >>>> > Nabble.com.
> >>>> >
> >>>> >
> -
> >>>> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >>>> >
> >>>
> >>>
> >>>
> >>>
> >>> --
> >>> Thanks & Regards
> >>> Sri Tummala
> >>>
> >>
> >>
> >>
> >> --
> >> Thanks & Regards
> >> Sri Tummala
> >>
> >
> >
> >
> > --
> > Thanks & Regards
> > Sri Tummala
> >
>



-- 
Thanks & Regards
Sri Tummala


Re: sql to spark scala rdd

2016-07-31 Thread sri hari kali charan Tummala
Tried this, no luck. What is the non-empty iterator here?

OP:-
(-987,non-empty iterator)
(-987,non-empty iterator)
(-987,non-empty iterator)
(-987,non-empty iterator)
(-987,non-empty iterator)


sc.textFile(file).keyBy(x => x.split("\\~") (0))
  .map(x => x._2.split("\\~"))
  .map(x => (x(0),x(2)))
.map { case (key,value) =>
(key,value.toArray.toSeq.sliding(2,1).map(x =>
x.sum/x.size))}.foreach(println)


On Sun, Jul 31, 2016 at 12:03 AM, sri hari kali charan Tummala <
kali.tumm...@gmail.com> wrote:

> Hi All,
>
> I managed to write using sliding function but can it get key as well in my
> output ?
>
> sc.textFile(file).keyBy(x => x.split("\\~") (0))
>   .map(x => x._2.split("\\~"))
>   .map(x => (x(2).toDouble)).toArray().sliding(2,1).map(x => 
> (x,x.size)).foreach(println)
>
>
> at the moment my output:-
>
> 75.0
> -25.0
> 50.0
> -50.0
> -100.0
>
> I want with key how to get moving average output based on key ?
>
>
> 987,75.0
> 987,-25
> 987,50.0
>
> Thanks
> Sri
>
>
>
>
>
>
> On Sat, Jul 30, 2016 at 11:40 AM, sri hari kali charan Tummala <
> kali.tumm...@gmail.com> wrote:
>
>> for knowledge just wondering how to write it up in scala or spark RDD.
>>
>> Thanks
>> Sri
>>
>> On Sat, Jul 30, 2016 at 11:24 AM, Jacek Laskowski 
>> wrote:
>>
>>> Why?
>>>
>>> Pozdrawiam,
>>> Jacek Laskowski
>>> 
>>> https://medium.com/@jaceklaskowski/
>>> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
>>> Follow me at https://twitter.com/jaceklaskowski
>>>
>>>
>>> On Sat, Jul 30, 2016 at 4:42 AM, kali.tumm...@gmail.com
>>>  wrote:
>>> > Hi All,
>>> >
>>> > I managed to write business requirement in spark-sql and hive I am
>>> still
>>> > learning scala how this below sql be written using spark RDD not spark
>>> data
>>> > frames.
>>> >
>>> > SELECT DATE,balance,
>>> > SUM(balance) OVER (ORDER BY DATE ROWS BETWEEN UNBOUNDED PRECEDING AND
>>> > CURRENT ROW) daily_balance
>>> > FROM  table
>>> >
>>> >
>>> >
>>> >
>>> >
>>> > --
>>> > View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/sql-to-spark-scala-rdd-tp27433.html
>>> > Sent from the Apache Spark User List mailing list archive at
>>> Nabble.com.
>>> >
>>> > -
>>> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>> >
>>>
>>
>>
>>
>> --
>> Thanks & Regards
>> Sri Tummala
>>
>>
>
>
> --
> Thanks & Regards
> Sri Tummala
>
>


-- 
Thanks & Regards
Sri Tummala


Re: sql to spark scala rdd

2016-07-31 Thread sri hari kali charan Tummala
Hi All,

I managed to write it using the sliding function, but can I get the key in my
output as well?

sc.textFile(file).keyBy(x => x.split("\\~") (0))
  .map(x => x._2.split("\\~"))
  .map(x => (x(2).toDouble)).toArray().sliding(2,1).map(x =>
(x,x.size)).foreach(println)


at the moment my output:-

75.0
-25.0
50.0
-50.0
-100.0

I want the key included; how do I get the moving average output based on the key?


987,75.0
987,-25
987,50.0

Thanks
Sri






On Sat, Jul 30, 2016 at 11:40 AM, sri hari kali charan Tummala <
kali.tumm...@gmail.com> wrote:

> for knowledge just wondering how to write it up in scala or spark RDD.
>
> Thanks
> Sri
>
> On Sat, Jul 30, 2016 at 11:24 AM, Jacek Laskowski  wrote:
>
>> Why?
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> 
>> https://medium.com/@jaceklaskowski/
>> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
>> Follow me at https://twitter.com/jaceklaskowski
>>
>>
>> On Sat, Jul 30, 2016 at 4:42 AM, kali.tumm...@gmail.com
>>  wrote:
>> > Hi All,
>> >
>> > I managed to write business requirement in spark-sql and hive I am still
>> > learning scala how this below sql be written using spark RDD not spark
>> data
>> > frames.
>> >
>> > SELECT DATE,balance,
>> > SUM(balance) OVER (ORDER BY DATE ROWS BETWEEN UNBOUNDED PRECEDING AND
>> > CURRENT ROW) daily_balance
>> > FROM  table
>> >
>> >
>> >
>> >
>> >
>> > --
>> > View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/sql-to-spark-scala-rdd-tp27433.html
>> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> >
>> > -
>> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> >
>>
>
>
>
> --
> Thanks & Regards
> Sri Tummala
>
>


-- 
Thanks & Regards
Sri Tummala


Re: sql to spark scala rdd

2016-07-30 Thread sri hari kali charan Tummala
Just for knowledge, I'm wondering how to write it up in Scala or with Spark RDDs.

Thanks
Sri

On Sat, Jul 30, 2016 at 11:24 AM, Jacek Laskowski  wrote:

> Why?
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
>
> On Sat, Jul 30, 2016 at 4:42 AM, kali.tumm...@gmail.com
>  wrote:
> > Hi All,
> >
> > I managed to write business requirement in spark-sql and hive I am still
> > learning scala how this below sql be written using spark RDD not spark
> data
> > frames.
> >
> > SELECT DATE,balance,
> > SUM(balance) OVER (ORDER BY DATE ROWS BETWEEN UNBOUNDED PRECEDING AND
> > CURRENT ROW) daily_balance
> > FROM  table
> >
> >
> >
> >
> >
> > --
> > View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/sql-to-spark-scala-rdd-tp27433.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> > -
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >
>



-- 
Thanks & Regards
Sri Tummala


Re: spark local dir to HDFS ?

2016-07-05 Thread sri hari kali charan Tummala
Thanks, that makes sense. Can anyone answer the question below?

http://apache-spark-user-list.1001560.n3.nabble.com/spark-parquet-too-many-small-files-td27264.html

Thanks
Sri
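
As a hedged illustration of the advice quoted below (spill and shuffle data
need local disks, not HDFS), spark.local.dir accepts a comma-separated list of
directories on different mounted disks; the paths are placeholders:

import org.apache.spark.SparkConf

// Sketch only: point these at locally mounted disks with enough free space;
// spark.local.dir must reference local filesystem paths, not HDFS.
val conf = new SparkConf()
  .set("spark.local.dir", "/data1/spark_tmp,/data2/spark_tmp")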

On Tue, Jul 5, 2016 at 8:15 PM, Saisai Shao  wrote:

> It does not work to configure local dirs on HDFS. Local dirs are mainly
> used for data spill and shuffle data persistence, so HDFS is not suitable.
> If you hit a capacity problem, you can configure multiple dirs
> located on different mounted disks.
>
> On Wed, Jul 6, 2016 at 9:05 AM, Sri  wrote:
>
>> Hi ,
>>
>> Space issue  we are currently using /tmp and at the moment we don't have
>> any mounted location setup yet.
>>
>> Thanks
>> Sri
>>
>>
>> Sent from my iPhone
>>
>> On 5 Jul 2016, at 17:22, Jeff Zhang  wrote:
>>
>> Any reason why you want to set this on hdfs ?
>>
>> On Tue, Jul 5, 2016 at 3:47 PM, kali.tumm...@gmail.com <
>> kali.tumm...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> can I set spark.local.dir to HDFS location instead of /tmp folder ?
>>>
>>> I tried setting up temp folder to HDFS but it didn't worked can
>>> spark.local.dir write to HDFS ?
>>>
>>> .set("spark.local.dir","hdfs://namednode/spark_tmp/")
>>>
>>>
>>> 16/07/05 15:35:47 ERROR DiskBlockManager: Failed to create local dir in
>>> hdfs://namenode/spark_tmp/. Ignoring this directory.
>>> java.io.IOException: Failed to create a temp directory (under
>>> hdfs://namenode/spark_tmp/) after 10 attempts!
>>>
>>>
>>> Thanks
>>> Sri
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/spark-local-dir-to-HDFS-tp27291.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com
>>> .
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>>
>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>>
>


-- 
Thanks & Regards
Sri Tummala


Re: spark parquet too many small files ?

2016-07-02 Thread sri hari kali charan Tummala
Hi Takeshi,

I can't use coalesce in the spark-sql shell, right? I know we can use coalesce
in a Spark application written in Scala, but in this project we are not
building a jar or using Python; we are just executing a Hive query in the
spark-sql shell and submitting it in yarn-client mode.

Example:-
spark-sql --verbose --queue default --name wchargeback_event.sparksql.kali
--master yarn-client --driver-memory 15g --executor-memory 15g
--num-executors 10 --executor-cores 2 -f /x/home/pp_dt_fin_batch/users/
srtummala/run-spark/sql/wtr_full.sql --conf
"spark.yarn.executor.memoryOverhead=8000"
--conf "spark.sql.shuffle.partitions=50" --conf
"spark.kyroserializer.buffer.max.mb=5g" --conf "spark.driver.maxResultSize=20g"
--conf "spark.storage.memoryFraction=0.8" --conf
"spark.hadoopConfiguration=2560"
--conf "spark.dynamicAllocation.enabled=false$" --conf
"spark.shuffle.service.enabled=false" --conf "spark.executor.instances=10"

Thanks
Sri




On Sat, Jul 2, 2016 at 2:53 AM, Takeshi Yamamuro 
wrote:

> Please also see https://issues.apache.org/jira/browse/SPARK-16188.
>
> // maropu
>
> On Fri, Jul 1, 2016 at 7:39 PM, kali.tumm...@gmail.com <
> kali.tumm...@gmail.com> wrote:
>
>> I found the jira for the issue will there be a fix in future ? or no fix ?
>>
>> https://issues.apache.org/jira/browse/SPARK-6221
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/spark-parquet-too-many-small-files-tp27264p27267.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>
>
> --
> ---
> Takeshi Yamamuro
>



-- 
Thanks & Regards
Sri Tummala


HADOOP_HOME or hadoop.home.dir are not set

2016-03-21 Thread Hari Krishna Dara
I am using Spark 1.5.2 in yarn mode with Hadoop 2.6.0 (cdh5.4.2) and I am
consistently seeing the below exception in the map container logs for Spark
jobs (full stacktrace at the end of the message):

java.io.IOException: HADOOP_HOME or hadoop.home.dir are not set.
at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:304)
at org.apache.hadoop.util.Shell.<clinit>(Shell.java:329)
at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:79)
at org.apache.hadoop.yarn.conf.YarnConfiguration.<clinit>(YarnConfiguration.java:605)
at org.apache.spark.deploy.yarn.YarnSparkHadoopUtil.newConfiguration(YarnSparkHadoopUtil.scala:61)

This is the code that generates the above exception:

String home = System.getProperty("hadoop.home.dir");

// fall back to the system/user-global env variable
if (home == null) {
  home = System.getenv("HADOOP_HOME");
}

try {
  // couldn't find either setting for hadoop's home directory
  if (home == null) {
    throw new IOException("HADOOP_HOME or hadoop.home.dir are not set.");
  }

I have hadoop home set in multiple places, such as:
- in bin/yarn as a system property
- in libexec/hadoop-config.sh as environment variable
- in conf/spark-env.sh as environment variable

However, this doesn't get passed in to the container JVMs. In fact, that
is the case even with a plain YARN job. I took a simple WordCount
application and added a setup() method with the code below:

String homeDirProp = System.getProperty("hadoop.home.dir");
String homeDirEnv = System.getenv("HADOOP_HOME");
System.out.println("hadoop.home.dir=" + homeDirProp + " HADOOP_HOME=" + homeDirEnv);

and when I check the stdout of the containers, I see this:

hadoop.home.dir=null HADOOP_HOME=null

As it stands, the IOException doesn't immediately fail the job, but I am
trying to understand another issue with determining the proxy IP and want to
rule this out. Interestingly, there doesn't seem to be any way to pass a
system property or environment variable to map/reduce containers, so there is
no direct way to satisfy the Shell class, but it would be possible for some
other class to inject the system property as a workaround before it is
looked up by Shell.
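
On the Spark side, one hedged workaround sketch is to push the value into the
executor and AM JVMs through Spark configuration; the Hadoop install path
below is an assumption, and this does not cover plain MapReduce containers:

import org.apache.spark.SparkConf

// Sketch only: the Hadoop install path is a placeholder.
val hadoopHome = "/usr/lib/hadoop"
val conf = new SparkConf()
  // set the system property in executor JVMs before Shell is first loaded
  .set("spark.executor.extraJavaOptions", s"-Dhadoop.home.dir=$hadoopHome")
  // expose HADOOP_HOME as an environment variable to executors and the YARN AM
  .set("spark.executorEnv.HADOOP_HOME", hadoopHome)
  .set("spark.yarn.appMasterEnv.HADOOP_HOME", hadoopHome)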

Anyone else seen this issue? Could I be missing something here?

Thank you,
Hari

Full stack trace:

java.io.IOException: HADOOP_HOME or hadoop.home.dir are not set.
at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:304)
at org.apache.hadoop.util.Shell.<clinit>(Shell.java:329)
at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:79)
at org.apache.hadoop.yarn.conf.YarnConfiguration.<clinit>(YarnConfiguration.java:605)
at org.apache.spark.deploy.yarn.YarnSparkHadoopUtil.newConfiguration(YarnSparkHadoopUtil.scala:61)
at org.apache.spark.deploy.SparkHadoopUtil.<init>(SparkHadoopUtil.scala:52)
at org.apache.spark.deploy.yarn.YarnSparkHadoopUtil.<init>(YarnSparkHadoopUtil.scala:46)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
at java.lang.Class.newInstance(Class.java:442)
at org.apache.spark.deploy.SparkHadoopUtil$.liftedTree1$1(SparkHadoopUtil.scala:386)
at org.apache.spark.deploy.SparkHadoopUtil$.yarn$lzycompute(SparkHadoopUtil.scala:384)
at org.apache.spark.deploy.SparkHadoopUtil$.yarn(SparkHadoopUtil.scala:384)
at org.apache.spark.deploy.SparkHadoopUtil$.get(SparkHadoopUtil.scala:401)
at org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:149)
at org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:250)
at org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)


Re: how to run latest version of spark in old version of spark in cloudera cluster ?

2016-01-27 Thread sri hari kali charan Tummala
Thank you very much, well documented.

Thanks
Sri

On Wed, Jan 27, 2016 at 8:46 PM, Deenar Toraskar 
wrote:

> Sri
>
> Look at the instructions here. They are for 1.5.1, but should also work
> for 1.6
>
>
> https://www.linkedin.com/pulse/running-spark-151-cdh-deenar-toraskar-cfa?trk=hp-feed-article-title-publish&trkSplashRedir=true&forceNoSplash=true
>
> Deenar
>
>
> On 27 January 2016 at 20:16, Koert Kuipers  wrote:
>
>> you need to build spark 1.6 for your hadoop distro, and put that on the
>> proxy node and configure it correctly to find your cluster (hdfs and yarn).
>> then use the spark-submit script for that spark 1.6 version to launch your
>> application on yarn
>>
>> On Wed, Jan 27, 2016 at 3:11 PM, sri hari kali charan Tummala <
>> kali.tumm...@gmail.com> wrote:
>>
>>> Hi Koert,
>>>
>>> I am submitting my code (spark jar ) using spark-submit in proxy node ,
>>> I checked the version of the cluster and node its says 1.2 I dint really
>>> understand what you mean.
>>>
>>> can I ask yarn to use different version of spark ? or should I say
>>> override the spark_home variables to look at 1.6 spark jar ?
>>>
>>> Thanks
>>> Sri
>>>
>>> On Wed, Jan 27, 2016 at 7:45 PM, Koert Kuipers 
>>> wrote:
>>>
>>>> If you have yarn you can just launch your spark 1.6 job from a single
>>>> machine with spark 1.6 available on it and ignore the version of spark
>>>> (1.2) that is installed
>>>> On Jan 27, 2016 11:29, "kali.tumm...@gmail.com" 
>>>> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> Just realized cloudera version of spark on my cluster is 1.2, the jar
>>>>> which
>>>>> I built using maven is version 1.6 which is causing issue.
>>>>>
>>>>> Is there a way to run spark version 1.6 in 1.2 version of spark ?
>>>>>
>>>>> Thanks
>>>>> Sri
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> View this message in context:
>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/how-to-run-latest-version-of-spark-in-old-version-of-spark-in-cloudera-cluster-tp26087.html
>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>> Nabble.com.
>>>>>
>>>>> -
>>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>>
>>>>>
>>>
>>>
>>> --
>>> Thanks & Regards
>>> Sri Tummala
>>>
>>>
>>
>


-- 
Thanks & Regards
Sri Tummala


Re: how to run latest version of spark in old version of spark in cloudera cluster ?

2016-01-27 Thread sri hari kali charan Tummala
Hi Koert,

I am submitting my code (a Spark jar) using spark-submit on the proxy node.
I checked the version on the cluster and the node, and it says 1.2; I didn't
really understand what you mean.

Can I ask YARN to use a different version of Spark? Or should I override the
SPARK_HOME variable to point at the Spark 1.6 jar?

Thanks
Sri

On Wed, Jan 27, 2016 at 7:45 PM, Koert Kuipers  wrote:

> If you have yarn you can just launch your spark 1.6 job from a single
> machine with spark 1.6 available on it and ignore the version of spark
> (1.2) that is installed
> On Jan 27, 2016 11:29, "kali.tumm...@gmail.com" 
> wrote:
>
>> Hi All,
>>
>> Just realized cloudera version of spark on my cluster is 1.2, the jar
>> which
>> I built using maven is version 1.6 which is causing issue.
>>
>> Is there a way to run spark version 1.6 in 1.2 version of spark ?
>>
>> Thanks
>> Sri
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/how-to-run-latest-version-of-spark-in-old-version-of-spark-in-cloudera-cluster-tp26087.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>


-- 
Thanks & Regards
Sri Tummala


Re: how to turn off spark streaming gracefully ?

2015-12-18 Thread sri hari kali charan Tummala
Hi Cody,

KafkaUtils.createRDD totally makes sense now: I can run my Spark job once
every 15 minutes, extract data out of Kafka, and stop. I rely on Kafka offsets
for incremental data, am I right? So no duplicate data will be returned.


Thanks
Sri
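
A minimal sketch of the batch-extract approach with the direct (Kafka 0.8)
API; note that createRDD does not track offsets for you, so the job has to
persist the last untilOffset somewhere and pass the next range in explicitly.
The broker, topic, partition and offset bounds below are assumptions:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkContext
import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

// Sketch only: placeholders for broker list, topic and offsets.
def extractBatch(sc: SparkContext, fromOffset: Long, untilOffset: Long) = {
  val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
  val offsetRanges = Array(OffsetRange("events", 0, fromOffset, untilOffset))

  // Pulls exactly the requested offset range, once; no duplicates as long as
  // the stored fromOffset is advanced to untilOffset after a successful run.
  KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
    sc, kafkaParams, offsetRanges)
}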





On Fri, Dec 18, 2015 at 2:41 PM, Cody Koeninger  wrote:

> If you're really doing a daily batch job, have you considered just using
> KafkaUtils.createRDD rather than a streaming job?
>
> On Fri, Dec 18, 2015 at 5:04 AM, kali.tumm...@gmail.com <
> kali.tumm...@gmail.com> wrote:
>
>> Hi All,
>>
>> Imagine I have a Production spark streaming kafka (direct connection)
>> subscriber and publisher jobs running which publish and subscriber
>> (receive)
>> data from a kafka topic and I save one day's worth of data using
>> dstream.slice to Cassandra daily table (so I create daily table before
>> running spark streaming job).
>>
>> My question: if all the above code runs in some scheduler like Autosys, how
>> should I tell the Spark publisher to stop publishing at end of day, and the
>> Spark subscriber to stop receiving, without killing the jobs? If I kill
>> them, my Autosys scheduler turns red saying the job has failed, etc.
>> Is there a way to stop both subscriber and publisher without killing or
>> terminating the code?
>>
>> Thanks
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/how-to-turn-off-spark-streaming-gracefully-tp25734.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


-- 
Thanks & Regards
Sri Tummala


Re: Release data for spark 1.6?

2015-12-12 Thread sri hari kali charan Tummala
thanks Sean and Ted, I will wait for 1.6 to be out.

Happy Christmas to all !

Thanks
Sri

On Sat, Dec 12, 2015 at 12:18 PM, Ted Yu  wrote:

> Please take a look at SPARK-9078 which allows jdbc dialects to override
> the query for checking table existence.
>
> On Dec 12, 2015, at 7:12 PM, sri hari kali charan Tummala <
> kali.tumm...@gmail.com> wrote:
>
> Hi Michael, Ted,
>
>
> https://github.com/apache/spark/blob/branch-1.5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala#L48
>
> In Present spark version in line 48 there is a bug, to check whether table
> exists in a database using limit doesnt work for all databases sql server
> for example.
>
> best way to check whehter table exists in any database is to use, select *
> from table where 1=2;  or select 1 from table where 1=2; this supports all
> the databases.
>
> In spark 1.6 can this change be implemented, this lets
>  write.mode("append") bug to go away.
>
>
>
> def tableExists(conn: Connection, table: String): Boolean = { // Somewhat
> hacky, but there isn't a good way to identify whether a table exists for all 
> //
> SQL database systems, considering "table" could also include the database
> name. Try(conn.prepareStatement(s"SELECT 1 FROM $table LIMIT 1"
> ).executeQuery().next()).isSuccess }
>
> Solution:-
> def tableExists(conn: Connection, table: String): Boolean = { // Somewhat
> hacky, but there isn't a good way to identify whether a table exists for all 
> //
> SQL database systems, considering "table" could also include the database
> name. Try(conn.prepareStatement(s"SELECT 1 FROM $table where 1=2"
> ).executeQuery().next()).isSuccess }
>
>
>
> Thanks
> Sri
>
>
>
> On Wed, Dec 9, 2015 at 10:30 PM, Michael Armbrust 
> wrote:
>
>> The release date is "as soon as possible".  In order to make an Apache
>> release we must present a release candidate and have 72-hours of voting by
>> the PMC.  As soon as there are no known bugs, the vote will pass and 1.6
>> will be released.
>>
>> In the mean time, I'd love support from the community testing the most
>> recent release candidate.
>>
>> On Wed, Dec 9, 2015 at 2:19 PM, Sri  wrote:
>>
>>> Hi Ted,
>>>
>>> Thanks for the info , but there is no particular release date from my
>>> understanding the package is in testing there is no release date mentioned.
>>>
>>> Thanks
>>> Sri
>>>
>>>
>>>
>>> Sent from my iPhone
>>>
>>> > On 9 Dec 2015, at 21:38, Ted Yu  wrote:
>>> >
>>> > See this thread:
>>> >
>>> >
>>> http://search-hadoop.com/m/q3RTtBMZpK7lEFB1/Spark+1.6.0+RC&subj=Re+VOTE+Release+Apache+Spark+1+6+0+RC1+
>>> >
>>> >> On Dec 9, 2015, at 1:20 PM, "kali.tumm...@gmail.com" <
>>> kali.tumm...@gmail.com> wrote:
>>> >>
>>> >> Hi All,
>>> >>
>>> >> does anyone know exact release data for spark 1.6 ?
>>> >>
>>> >> Thanks
>>> >> Sri
>>> >>
>>> >>
>>> >>
>>> >> --
>>> >> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Release-data-for-spark-1-6-tp25654.html
>>> >> Sent from the Apache Spark User List mailing list archive at
>>> Nabble.com <http://nabble.com>.
>>> >>
>>> >> -
>>> >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> >> For additional commands, e-mail: user-h...@spark.apache.org
>>> >>
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>
>
> --
> Thanks & Regards
> Sri Tummala
>
>


-- 
Thanks & Regards
Sri Tummala


Re: spark data frame write.mode("append") bug

2015-12-12 Thread sri hari kali charan Tummala
Hi All,

https://github.com/apache/spark/blob/branch-1.5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala#L48

In the present Spark version, line 48 has a bug: checking whether a table
exists in a database using LIMIT doesn't work for all databases (SQL Server,
for example).

The best way to check whether a table exists in any database is to use
"select * from table where 1=2;" or "select 1 from table where 1=2;"; this is
supported by all databases.

Can this change be implemented in Spark 1.6? It would let the
write.mode("append") bug go away.



def tableExists(conn: Connection, table: String): Boolean = {

// Somewhat hacky, but there isn't a good way to identify whether a
table exists for all
// SQL database systems, considering "table" could also include the
database name.
Try(conn.prepareStatement(s"SELECT 1 FROM $table LIMIT
1").executeQuery().next()).isSuccess
  }

Solution:-
def tableExists(conn: Connection, table: String): Boolean = {

// Somewhat hacky, but there isn't a good way to identify whether a
table exists for all
// SQL database systems, considering "table" could also include the
database name.
Try(conn.prepareStatement(s"SELECT 1 FROM $table where
1=2").executeQuery().next()).isSuccess
  }



Thanks

On Wed, Dec 9, 2015 at 4:24 PM, Seongduk Cheon  wrote:

> Not for sure, but I think it is bug as of 1.5.
>
> Spark is using LIMIT keyword whether a table exists.
>
> https://github.com/apache/spark/blob/branch-1.5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala#L48
>
> If your database does not support LIMIT keyword such as SQL Server, spark
> try to create table
>
> https://github.com/apache/spark/blob/branch-1.5/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala#L272-L275
>
> This issue has already fixed and It will be released on 1.6
> https://issues.apache.org/jira/browse/SPARK-9078
>
>
> --
> Cheon
>
> 2015-12-09 22:54 GMT+09:00 kali.tumm...@gmail.com 
> :
>
>> Hi Spark Contributors,
>>
>> I am trying to append data  to target table using df.write.mode("append")
>> functionality but spark throwing up table already exists exception.
>>
>> Is there a fix scheduled in later spark release ?, I am using spark 1.5.
>>
>> val sourcedfmode=sourcedf.write.mode("append")
>> sourcedfmode.jdbc(TargetDBinfo.url,TargetDBinfo.table,targetprops)
>>
>> Full Code:-
>>
>> https://github.com/kali786516/ScalaDB/blob/master/src/main/java/com/kali/db/SaprkSourceToTargetBulkLoad.scala
>>
>> Spring Config File:-
>>
>> https://github.com/kali786516/ScalaDB/blob/master/src/main/resources/SourceToTargetBulkLoad.xml
>>
>>
>> Thanks
>> Sri
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/spark-data-frame-write-mode-append-bug-tp25650.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>
>


-- 
Thanks & Regards
Sri Tummala


Re: Release data for spark 1.6?

2015-12-12 Thread sri hari kali charan Tummala
Hi Michael, Ted,

https://github.com/apache/spark/blob/branch-1.5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala#L48

In the present Spark version, line 48 has a bug: checking whether a table
exists in a database using LIMIT doesn't work for all databases (SQL Server,
for example).

The best way to check whether a table exists in any database is to use
"select * from table where 1=2;" or "select 1 from table where 1=2;"; this is
supported by all databases.

Can this change be implemented in Spark 1.6? It would let the
write.mode("append") bug go away.



def tableExists(conn: Connection, table: String): Boolean = {
  // Somewhat hacky, but there isn't a good way to identify whether a table
  // exists for all SQL database systems, considering "table" could also
  // include the database name.
  Try(conn.prepareStatement(s"SELECT 1 FROM $table LIMIT 1").executeQuery().next()).isSuccess
}

Solution:-
def tableExists(conn: Connection, table: String): Boolean = {
  // Somewhat hacky, but there isn't a good way to identify whether a table
  // exists for all SQL database systems, considering "table" could also
  // include the database name.
  Try(conn.prepareStatement(s"SELECT 1 FROM $table where 1=2").executeQuery().next()).isSuccess
}



Thanks
Sri



On Wed, Dec 9, 2015 at 10:30 PM, Michael Armbrust 
wrote:

> The release date is "as soon as possible".  In order to make an Apache
> release we must present a release candidate and have 72-hours of voting by
> the PMC.  As soon as there are no known bugs, the vote will pass and 1.6
> will be released.
>
> In the mean time, I'd love support from the community testing the most
> recent release candidate.
>
> On Wed, Dec 9, 2015 at 2:19 PM, Sri  wrote:
>
>> Hi Ted,
>>
>> Thanks for the info , but there is no particular release date from my
>> understanding the package is in testing there is no release date mentioned.
>>
>> Thanks
>> Sri
>>
>>
>>
>> Sent from my iPhone
>>
>> > On 9 Dec 2015, at 21:38, Ted Yu  wrote:
>> >
>> > See this thread:
>> >
>> >
>> http://search-hadoop.com/m/q3RTtBMZpK7lEFB1/Spark+1.6.0+RC&subj=Re+VOTE+Release+Apache+Spark+1+6+0+RC1+
>> >
>> >> On Dec 9, 2015, at 1:20 PM, "kali.tumm...@gmail.com" <
>> kali.tumm...@gmail.com> wrote:
>> >>
>> >> Hi All,
>> >>
>> >> does anyone know exact release data for spark 1.6 ?
>> >>
>> >> Thanks
>> >> Sri
>> >>
>> >>
>> >>
>> >> --
>> >> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Release-data-for-spark-1-6-tp25654.html
>> >> Sent from the Apache Spark User List mailing list archive at
>> Nabble.com.
>> >>
>> >> -
>> >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> >> For additional commands, e-mail: user-h...@spark.apache.org
>> >>
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


-- 
Thanks & Regards
Sri Tummala


Re: spark sql current time stamp function ?

2015-12-07 Thread sri hari kali charan Tummala
Hi Ted,

It gave an exception; am I following the right approach?

val test=sqlContext.sql("select *,  monotonicallyIncreasingId()  from kali")
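
For reference, a hedged sketch of the two alternatives discussed in this
thread; the table and column names are placeholders, the snake_case function
name is the newer spelling (older releases used monotonicallyIncreasingId),
and row_number() as a SQL window function needs an ORDER BY (and a HiveContext
on Spark 1.x):

import org.apache.spark.sql.functions.monotonically_increasing_id

// DataFrame-side ID column: unique and increasing, but not consecutive.
val withId = sqlContext.table("kali").withColumn("id", monotonically_increasing_id())

// SQL-side row_number() window function; partition/order columns are placeholders.
val ranked = sqlContext.sql(
  """SELECT *, row_number() OVER (PARTITION BY key_col ORDER BY date_col) AS rn
    |FROM kali""".stripMargin)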


On Mon, Dec 7, 2015 at 4:52 PM, Ted Yu  wrote:

> Have you tried using monotonicallyIncreasingId ?
>
> Cheers
>
> On Mon, Dec 7, 2015 at 7:56 AM, Sri  wrote:
>
>> Thanks , I found the right function current_timestamp().
>>
>> different Question:-
>> Is there a row_number() function in spark SQL ? Not in Data frame just
>> spark SQL?
>>
>>
>> Thanks
>> Sri
>>
>> Sent from my iPhone
>>
>> On 7 Dec 2015, at 15:49, Ted Yu  wrote:
>>
>> Does unix_timestamp() satisfy your needs ?
>> See sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
>>
>> On Mon, Dec 7, 2015 at 6:54 AM, kali.tumm...@gmail.com <
>> kali.tumm...@gmail.com> wrote:
>>
>>> I found a way out.
>>>
>>> import java.text.SimpleDateFormat
>>> import java.util.Date;
>>>
>>> val format = new SimpleDateFormat("yyyy-M-dd hh:mm:ss")
>>>
>>> val testsql = sqlContext.sql("select column1,column2,column3,column4,column5,
>>> '%s' as TIME_STAMP from TestTable limit 10".format(format.format(new Date())))
>>>
>>>
>>> Thanks
>>> Sri
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/spark-sql-current-time-stamp-function-tp25620p25621.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>


-- 
Thanks & Regards
Sri Tummala


Re: Creating new Spark context when running in Secure YARN fails

2015-11-20 Thread Hari Shreedharan
Can you try this: https://github.com/apache/spark/pull/9875. I believe this
patch should fix the issue here.

Thanks,
Hari Shreedharan




> On Nov 11, 2015, at 1:59 PM, Ted Yu  wrote:
> 
> Please take a look at 
> yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala
>  where this config is described
> 
> Cheers
> 
> On Wed, Nov 11, 2015 at 1:45 PM, Michael V Le  wrote:
> It looks like my config does not have "spark.yarn.credentials.file".
> 
> I executed:
> sc._conf.getAll()
> 
> [(u'spark.ssl.keyStore', u'xxx.keystore'), (u'spark.eventLog.enabled', 
> u'true'), (u'spark.ssl.keyStorePassword', u'XXX'), (u'spark.yarn.principal', 
> u'XXX'), (u'spark.master', u'yarn-client'), (u'spark.ssl.keyPassword', 
> u'XXX'), (u'spark.authenticate.sasl.serverAlwaysEncrypt', u'true'), 
> (u'spark.ssl.trustStorePassword', u'XXX'), (u'spark.ssl.protocol', 
> u'TLSv1.2'), (u'spark.authenticate.enableSaslEncryption', u'true'), 
> (u'spark.app.name', u'PySparkShell'), 
> (u'spark.yarn.keytab', u'XXX.keytab'), (u'spark.yarn.historyServer.address', 
> u'xxx-001:18080'), (u'spark.rdd.compress', u'True'), (u'spark.eventLog.dir', 
> u'hdfs://xxx-001:9000/user/hadoop/sparklogs'), 
> (u'spark.ssl.enabledAlgorithms', 
> u'TLS_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_256_CBC_SHA'), 
> (u'spark.serializer.objectStreamReset', u'100'), 
> (u'spark.history.fs.logDirectory', 
> u'hdfs://xxx-001:9000/user/hadoop/sparklogs'), (u'spark.yarn.isPython', 
> u'true'), (u'spark.submit.deployMode', u'client'), (u'spark.ssl.enabled', 
> u'true'), (u'spark.authenticate', u'true'), (u'spark.ssl.trustStore', 
> u'xxx.truststore')]
> 
> I am not really familiar with "spark.yarn.credentials.file" and had thought 
> it was created automatically after communicating with YARN to get tokens.
> 
> Thanks,
> Mike
> 
> 
> Ted Yu ---11/11/2015 03:35:41 PM---I assume your config contains 
> "spark.yarn.credentials.file" - otherwise startExecutorDelegationToken
> 
> From: Ted Yu 
> To: Michael V Le/Watson/IBM@IBMUS
> Cc: user 
> Date: 11/11/2015 03:35 PM
> Subject: Re: Creating new Spark context when running in Secure YARN fails
> 
> 
> 
> 
> I assume your config contains "spark.yarn.credentials.file" - otherwise 
> startExecutorDelegationTokenRenewer(conf) call would be skipped.
> 
> On Wed, Nov 11, 2015 at 12:16 PM, Michael V Le  wrote:
> Hi Ted,
> 
> Thanks for reply.
> 
> I tried your patch but am having the same problem.
> 
> I ran:
> 
> ./bin/pyspark --master yarn-client
> 
> >> sc.stop()
> >> sc = SparkContext()
> 
> Same error dump as below.
> 
> Do I need to pass something to the new sparkcontext ?
> 
> Thanks,
> Mike
> 
> Ted Yu ---11/11/2015 01:55:02 PM---Looks like the delegation 
> token should be renewed. Mind trying the following ?
> 
> From: Ted Yu 
> To: Michael V Le/Watson/IBM@IBMUS
> Cc: user 
> Date: 11/11/2015 01:55 PM
> Subject: Re: Creating new Spark context when running in Secure YARN fails
> 
> 
> 
> 
> Looks like the delegation token should be renewed.
> 
> Mind trying the following ?
> 
> Thanks
> 
> diff --git 
> a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
>  b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerB
> index 20771f6..e3c4a5a 100644
> --- 
> a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
> +++ 
> b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
> @@ -53,6 +53,12 @@ private[spark] class YarnClientSchedulerBackend(
>  logDebug("ClientArguments called with: " + argsArrayBuf.mkString(" "))
>  val args = new ClientArguments(argsArrayBuf.toArray, conf)
>  totalExpectedExecutors = args.numExecutors
> +// SPARK-8851: In yarn-client mode, the AM still does the credentials 
> refresh. The driver
> +// re

Re: Pass spark partition explicitly ?

2015-10-18 Thread sri hari kali charan Tummala
Hi Richard,

Thanks so my take from your discussion is we want pass explicitly partition
values it have to be written inside the code.

Thanks
Sri

On Sun, Oct 18, 2015 at 7:05 PM, Richard Eggert 
wrote:

> If you want to override the default partitioning behavior,  you have to do
> so in your code where you create each RDD. Different RDDs usually have
> different numbers of partitions (except when one RDD is directly derived
> from another without shuffling) because they usually have different sizes,
> so it wouldn't make sense to have some sort of "global" notion of how many
> partitions to create.  You could,  if you wanted,  pass partition counts in
> as command line options to your application and use those values in your
> code that creates the RDDs, of course.
>
> Rich
> On Oct 18, 2015 1:57 PM, "kali.tumm...@gmail.com" 
> wrote:
>
>> Hi All,
>>
>> can I pass number of partitions to all the RDD explicitly while submitting
>> the spark Job or di=o I need to mention in my spark code itself ?
>>
>> Thanks
>> Sri
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Pass-spark-partition-explicitly-tp25113.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>


-- 
Thanks & Regards
Sri Tummala


Re: Monitoring tools for spark streaming

2015-09-28 Thread Hari Shreedharan
+1. The Streaming UI should give you more than enough information.




Thanks, Hari

On Mon, Sep 28, 2015 at 9:55 PM, Shixiong Zhu  wrote:

> Which version are you using? Could you take a look at the new Streaming UI
> in 1.4.0?
> Best Regards,
> Shixiong Zhu
> 2015-09-29 7:52 GMT+08:00 Siva :
>> Hi,
>>
>> Could someone recommend the monitoring tools for spark streaming?
>>
>> By extending StreamingListener we can dump the delay in processing of
>> batches and some alert messages.
>>
>> But are there any Web UI tools where we can monitor failures, see delays
>> in processing, error messages and setup alerts etc.
>>
>> Thanks
>>
>>

Re: Spark Streaming failing on YARN Cluster

2015-08-19 Thread Hari Shreedharan
e:/Users/abc/github/spark/python/lib/pyspark.zip -> hdfs://
>>>> 0.0.0.0:9000/user/abc/.sparkStaging/application_1439432662178_0019/pyspark.zip
>>>>
>>>> On Thu, Aug 13, 2015 at 2:50 PM, Ramkumar V  wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I have a cluster of 1 master and 2 slaves. I'm running a spark
>>>>> streaming in master and I want to utilize all nodes in my cluster. i had
>>>>> specified some parameters like driver memory and executor memory in my
>>>>> code. when i give --deploy-mode cluster --master yarn-cluster in my
>>>>> spark-submit, it gives the following error.
>>>>>
>>>>> Log link : *http://pastebin.com/kfyVWDGR
>>>>> <http://pastebin.com/kfyVWDGR>*
>>>>>
>>>>> How to fix this issue ? Please help me if i'm doing wrong.
>>>>>
>>>>>
>>>>> *Thanks*,
>>>>> Ramkumar V
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Best Regards
>>>>
>>>> Jeff Zhang
>>>>
>>>
>>>
>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>
>

-- 

Thanks,
Hari


Seeing message about receiver not being de-registered on invoking Streaming context stop

2015-04-06 Thread Hari Polisetty

My application is running Spark in local mode and I have a Spark Streaming
Listener as well as a Custom Receiver. When the receiver is done fetching all
documents, it invokes “stop” on itself.
I see the StreamingListener getting a callback on “onReceiverStopped”, where I
stop the streaming context.

However, I see the following message in my logs:

2015-04-06 16:41:51,193 WARN [Thread-66] 
com.amazon.grcs.gapanalysis.spark.streams.ElasticSearchResponseReceiver.onStop 
- Stopped receiver
2015-04-06 16:41:51,193 ERROR [sparkDriver-akka.actor.default-dispatcher-17] 
org.apache.spark.Logging$class.logError - Deregistered receiver for stream 0: 
AlHURLEY
2015-04-06 16:41:51,202 WARN [Executor task launch worker-2] 
org.apache.spark.Logging$class.logWarning - Stopped executor without error
2015-04-06 16:41:51,203 WARN [StreamingListenerBus] 
org.apache.spark.Logging$class.logWarning - All of the receivers have not 
deregistered, Map(0 -> 
ReceiverInfo(0,ElasticSearchResponseReceiver-0,null,false,localhost,HURLEY))

What am I missing or doing wrong?
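
For what it's worth, a minimal Scala sketch of the listener-driven shutdown described above; stopping the context from a separate thread (rather than inside the callback itself) is an assumption on my part to keep the listener bus from being blocked, and the class name is a placeholder.

    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerReceiverStopped}

    class StopOnReceiverStopped(ssc: StreamingContext) extends StreamingListener {
      override def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped): Unit = {
        // Stop asynchronously so the listener bus thread is not blocked by the shutdown.
        new Thread("streaming-context-stopper") {
          override def run(): Unit = ssc.stop(stopSparkContext = true, stopGracefully = true)
        }.start()
      }
    }

    // Register it before ssc.start(): ssc.addStreamingListener(new StopOnReceiverStopped(ssc))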

Re: How to restrict foreach on a streaming RDD only once upon receiver completion

2015-04-06 Thread Hari Polisetty
Thanks. I’ll look into it. But the JSON string I push via receiver goes through 
a series of transformations, before it ends up in the final RDD. I need to take 
care to ensure that this magic value propagates all the way down to the last 
one that I’m iterating on.

Currently, I’m calling “stop” from the receiver once it's done fetching all the 
records, and I have a StreamingListener to act on it via the “onReceiverStopped” 
hook, through which I’m stopping the streamingContext. It seems to be working, 
except that I see this message: "2015-04-06 16:41:48,002 WARN 
[StreamingListenerBus] org.apache.spark.Logging$class.logWarning - All of the 
receivers have not deregistered, Map(0 -> 
ReceiverInfo(0,ElasticSearchResponseReceiver-0,null,false,localhost,XYZ,))"

Is this not advised? BTW I’m running in local mode.
 

> On Apr 7, 2015, at 1:43 AM, Michael Malak wrote:
> 
> You could have your receiver send a "magic value" when it is done. I discuss 
> this Spark Streaming pattern in my presentation "Spark Gotchas and 
> Anti-Patterns". In the PDF version, it's slides 34-36.
> http://www.datascienceassn.org/content/2014-11-05-spark-gotchas-and-anti-patterns-julia-language
> 
> 
> YouTube version cued to that place: 
> http://www.youtube.com/watch?v=W5Uece_JmNs&t=23m18s
>  
> 
> From: Hari Polisetty 
> To: Tathagata Das 
> Cc: user@spark.apache.org 
> Sent: Monday, April 6, 2015 2:02 PM
> Subject: Re: How to restrict foreach on a streaming RDD only once upon 
> receiver completion
> 
> Yes, I’m using updateStateByKey and it works. But then I need to perform 
> further computation on this Stateful RDD (see code snippet below). I perform 
> forEach on the final RDD and get the top 10 records. I just don’t want the 
> foreach to be performed every time a new batch is received. Only when the 
> receiver is done fetching all the records.
> 
> My requirements are to programmatically invoke the E.S query (it varies by 
> usecase) , get all the records and apply certain transformations and get the 
> top 10 results based on certain criteria back into the driver program for 
> further processing. I’m able to apply the transformations on the batches of 
> records fetched from E.S  using streaming. So, I don’t need to wait for all 
> the records to be fetched. The RDD transformations are happening all the time 
> and the top k results are getting updated constantly until all the records 
> are fetched by the receiver. Is there any drawback with this approach?
> 
> Can you give more pointers on what you mean by creating a custom RDD that 
> reads from ElasticSearch? 
> 
> Here is the relevant portion of my Spark streaming code:
> 
>   // Create a custom streaming receiver to query for relevant data from E.S
>   JavaReceiverInputDStream<String> jsonStrings = ssc.receiverStream(
>       new ElasticSearchResponseReceiver(query…….));
> 
>   // Apply JSON Paths to extract specific value(s) from each record
>   JavaDStream<String> fieldVariations = jsonStrings.flatMap(
>       new FlatMapFunction<String, String>() {
>           private static final long serialVersionUID = 465237345751948L;
> 
>           @Override
>           public Iterable<String> call(String jsonString) {
>               List<String> r = JsonPath.read(jsonString,
>                       attributeDetail.getJsonPath());
>               return r;
>           }
>       });
> 
>   // Perform a stateful map reduce on each variation
>   JavaPairDStream<String, Integer> fieldVariationCounts = fieldVariations.mapToPair(
>       new PairFunction<String, String, Integer>() {
>           private static final long serialVersionUID = -1241276515559408238L;
> 
>           @Override
>           public Tuple2<String, Integer> call(String s) {
>               return new Tuple2<String, Integer>(s, 1);
>           }
>       }).updateStateByKey(
>       new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
>           private static final long serialVersionUID = 7598681835161199865L;
> 
>           public Optional<Integer> c

Re: How to restrict foreach on a streaming RDD only once upon receiver completion

2015-04-06 Thread Hari Polisetty
private static final long serialVersionUID = 2186144129973051920L;

public Void call(JavaPairRDD<String, Integer> rdd) {
    resultList.clear();
    for (Tuple2<String, Integer> t : rdd.take(MainDriver.NUMBER_OF_TOP_VARIATIONS)) {
        resultList.add(new Tuple3<Integer, String, Double>(t._2(), t._1(),
                (double) (100 * t._1()) / totalProcessed.value()));
    }
    return null;
}
}
);

> On Apr 7, 2015, at 1:14 AM, Tathagata Das  wrote:
> 
> So you want to sort based on the total count of the all the records received 
> through receiver? In that case, you have to combine all the counts using 
> updateStateByKey 
> (https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala)
>  
> But stepping back, if you want to get the final results at the end of the 
> receiving all the data (as opposed to continuously), why are you even using 
> streaming? You could create a custom RDD that reads from ElasticSearch and 
> then use it in a Spark program. I think that's more natural as your 
> application is more batch-like than streaming-like as you are using the 
> results in real-time.
> 
> TD
> 
> On Mon, Apr 6, 2015 at 12:31 PM, Hari Polisetty wrote:
> I have created a Custom Receiver to fetch records pertaining to a specific 
> query from Elastic Search and have implemented Streaming RDD transformations 
> to process the data generated by the receiver.
> 
> The final RDD is a sorted list of name value pairs and I want to read the top 
> 20 results programmatically rather than write to an external file.
> I use "foreach" on the RDD and take the top 20 values into a list. I see that 
> forEach is processed every time there is a new microbatch from the receiver.
> 
> However, I want the foreach computation to be done only once when the 
> receiver has finished fetching all the records from Elastic Search and before 
> the streaming context is killed so that I can populate the results into a 
> list and process it in my driver program.
> 
> Appreciate any guidance in this regard.
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 
> 
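As an aside, a minimal Scala sketch of the "magic value" idea suggested earlier in this thread: the receiver pushes a sentinel string once it has fetched everything, and an output operation watches for it and stops the context. The sentinel value, method names, and thread-based stop are assumptions, not anything from the thread.

    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.streaming.dstream.DStream

    object SentinelShutdown {
      val Sentinel = "__END_OF_RESULTS__"  // hypothetical marker stored by the receiver when done

      def watchForSentinel(jsonStrings: DStream[String], ssc: StreamingContext): Unit = {
        jsonStrings.foreachRDD { rdd =>
          // Once the sentinel shows up, the receiver has finished; stop asynchronously.
          if (rdd.filter(_ == Sentinel).count() > 0) {
            new Thread("sentinel-stopper") {
              override def run(): Unit = ssc.stop(stopSparkContext = false, stopGracefully = true)
            }.start()
          }
        }
      }
    }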



How to restrict foreach on a streaming RDD only once upon receiver completion

2015-04-06 Thread Hari Polisetty
I have created a Custom Receiver to fetch records pertaining to a specific 
query from Elastic Search and have implemented Streaming RDD transformations to 
process the data generated by the receiver. 

The final RDD is a sorted list of name value pairs and I want to read the top 
20 results programmatically rather than write to an external file.
I use "foreach" on the RDD and take the top 20 values into a list. I see that 
forEach is processed every time there is a new microbatch from the receiver.

However, I want the foreach computation to be done only once when the receiver 
has finished fetching all the records from Elastic Search and before the 
streaming context is killed so that I can populate the results into a list and 
process it in my driver program. 

Appreciate any guidance in this regard.
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Streaming with Flume or Kafka?

2014-11-19 Thread Hari Shreedharan
Btw, if you want to write to Spark Streaming from Flume -- there is a sink
(it is a part of Spark, not Flume). See Approach 2 here:
http://spark.apache.org/docs/latest/streaming-flume-integration.html



On Wed, Nov 19, 2014 at 12:41 PM, Hari Shreedharan <
hshreedha...@cloudera.com> wrote:

> As of now, you can feed Spark Streaming from both kafka and flume.
> Currently though there is no API to write data back to either of the two
> directly.
>
> I sent a PR which should eventually add something like this:
> https://github.com/harishreedharan/spark/blob/Kafka-output/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaOutputWriter.scala
> that would allow Spark Streaming to write back to Kafka. This will likely
> be reviewed and committed after 1.2.
>
> I would consider writing something similar to push data to Flume as well,
> if there is a sufficient use-case for it. I have seen people talk about
> writing back to kafka quite a bit - hence the above patch.
>
> Which one is better is up to your use-case and existing infrastructure and
> preference. Both would work as is, but writing back to Flume would usually
> be if you want to write to HDFS/HBase/Solr etc -- which you could write
> back directly from Spark Streaming (of course, there are benefits of
> writing back using Flume like the additional buffering etc Flume gives),
> but it is still possible to do so from Spark Streaming itself.
>
> But for Kafka, the usual use-case is a variety of custom applications
> reading the same data -- for which it makes a whole lot of sense to write
> back to Kafka. An example is to sanitize incoming data in Spark Streaming
> (from Flume or Kafka or something else) and make it available for a variety
> of apps via Kafka.
>
> Hope this helps!
>
> Hari
>
>
> On Wed, Nov 19, 2014 at 8:10 AM, Guillermo Ortiz 
> wrote:
>
>> Hi,
>>
>> I'm starting with Spark and I'm just trying to understand: if I want to
>> use Spark Streaming, should I feed it with Flume or Kafka? I think
>> there's not an official sink from Flume to Spark Streaming, and it seems
>> that Kafka fits better since it gives you readability.
>>
>> Could someone give a good scenario for each alternative? When would it
>> make sense to use Kafka and when Flume for Spark Streaming?
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>
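
Independent of the KafkaOutputWriter patch linked above, one common way to write results back to Kafka is a plain producer inside foreachPartition; the helper name, broker list, and topic below are placeholders, and the sketch assumes the Kafka Java producer (org.apache.kafka.clients.producer) is on the classpath.

    import java.util.Properties
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
    import org.apache.spark.streaming.dstream.DStream

    object KafkaWriteBack {
      def writeToKafka(out: DStream[String], brokers: String, topic: String): Unit = {
        out.foreachRDD { rdd =>
          rdd.foreachPartition { records =>
            // One producer per partition/task, so nothing has to be serialized from the driver.
            val props = new Properties()
            props.put("bootstrap.servers", brokers)
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
            val producer = new KafkaProducer[String, String](props)
            records.foreach(r => producer.send(new ProducerRecord[String, String](topic, r)))
            producer.close()
          }
        }
      }
    }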


Re: Spark Streaming with Flume or Kafka?

2014-11-19 Thread Hari Shreedharan
As of now, you can feed Spark Streaming from both kafka and flume.
Currently though there is no API to write data back to either of the two
directly.

I sent a PR which should eventually add something like this:
https://github.com/harishreedharan/spark/blob/Kafka-output/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaOutputWriter.scala
that would allow Spark Streaming to write back to Kafka. This will likely
be reviewed and committed after 1.2.

I would consider writing something similar to push data to Flume as well,
if there is a sufficient use-case for it. I have seen people talk about
writing back to kafka quite a bit - hence the above patch.

Which one is better is up to your use-case and existing infrastructure and
preference. Both would work as is, but writing back to Flume would usually
be if you want to write to HDFS/HBase/Solr etc -- which you could write
back directly from Spark Streaming (of course, there are benefits of
writing back using Flume like the additional buffering etc Flume gives),
but it is still possible to do so from Spark Streaming itself.

But for Kafka, the usual use-case is a variety of custom applications
reading the same data -- for which it makes a whole lot of sense to write
back to Kafka. An example is to sanitize incoming data in Spark Streaming
(from Flume or Kafka or something else) and make it available for a variety
of apps via Kafka.

Hope this helps!

Hari


On Wed, Nov 19, 2014 at 8:10 AM, Guillermo Ortiz 
wrote:

> Hi,
>
> I'm starting with Spark and I'm just trying to understand: if I want to
> use Spark Streaming, should I feed it with Flume or Kafka? I think
> there's not an official sink from Flume to Spark Streaming, and it seems
> that Kafka fits better since it gives you readability.
>
> Could someone give a good scenario for each alternative? When would it
> make sense to use Kafka and when Flume for Spark Streaming?
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Spark and Scala

2014-09-12 Thread Hari Shreedharan
No, Scala primitives remain primitives. Unless you create an RDD using one
of the many methods, you would not be able to access any of the RDD
methods. There is no automatic porting. Spark is an application as far as
Scala is concerned - there is no special compilation step (except, of course,
the usual Scala compilation, JIT compilation, etc.).
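
To make the distinction concrete, a small Scala sketch; the values and method body are placeholders: persist/unpersist exist on RDDs, not on plain Scala values.

    import org.apache.spark.SparkContext

    def example(sc: SparkContext): Unit = {
      val temp: Int = 42            // a plain Scala Int; it has no persist/unpersist
      // temp.unpersist()           // does not compile: "value unpersist is not a member of Int"

      val rdd = sc.parallelize(1 to 1000)  // an RDD, created explicitly from a Scala collection
      rdd.cache()                          // caching/unpersisting is an RDD-level operation
      println(rdd.sum())
      rdd.unpersist()
    }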

On Fri, Sep 12, 2014 at 8:04 PM, Deep Pradhan 
wrote:

> I know that unpersist is a method on RDD.
> But my confusion is that, when we port our Scala programs to Spark,
> doesn't everything change to RDDs?
>
> On Fri, Sep 12, 2014 at 10:16 PM, Nicholas Chammas <
> nicholas.cham...@gmail.com> wrote:
>
>> unpersist is a method on RDDs. RDDs are abstractions introduced by Spark.
>>
>> An Int is just a Scala Int. You can't call unpersist on Int in Scala, and
>> that doesn't change in Spark.
>>
>> On Fri, Sep 12, 2014 at 12:33 PM, Deep Pradhan > > wrote:
>>
>>> There is one thing that I am confused about.
>>> Spark has codes that have been implemented in Scala. Now, can we run any
>>> Scala code on the Spark framework? What will be the difference in the
>>> execution of the scala code in normal systems and on Spark?
>>> The reason for my question is the following:
>>> I had a variable
>>> *val temp = *
>>> This temp was being created inside the loop, so as to manually throw it
>>> out of the cache, every time the loop ends I was calling
>>> *temp.unpersist()*, this was returning an error saying that *value
>>> unpersist is not a method of Int*, which means that temp is an Int.
>>> Can some one explain to me why I was not able to call *unpersist* on
>>> *temp*?
>>>
>>> Thank You
>>>
>>
>>
>


Re: Spark Streaming on Yarn Input from Flume

2014-08-07 Thread Hari Shreedharan
Do you see anything suspicious in the logs? How did you run the application?


On Thu, Aug 7, 2014 at 10:02 PM, XiaoQinyu 
wrote:

> Hi~
>
> I run a Spark Streaming app to receive data from Flume events. When I run it in
> standalone mode, Spark Streaming can receive the Flume events normally. But if I
> run this app on YARN, no matter whether yarn-client or yarn-cluster, the app cannot
> receive data from Flume, and when I check netstat I find that the port which
> connects to Flume isn't being listened on.
>
> I am not sure whether running Spark Streaming on YARN affects the connection to Flume.
>
> And then I ran the FlumeEventCount example; when I run this example on
> YARN, it also cannot receive data from Flume.
>
> And I would be very pleased if someone can help me.
>
>
> XiaoQinyu
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-on-Yarn-Input-from-Flume-tp11755.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: store spark streaming dstream in hdfs or cassandra

2014-07-31 Thread Hari Shreedharan
Off the top of my head, you can use a ForEachDStream to which you pass 
in the code that writes to Hadoop, and then register that as an output 
stream, so the function you pass in is periodically executed and causes 
the data to be written to HDFS. If you are OK with the data being in 
text format, simply use the saveAsTextFiles method on the DStream.
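
A minimal Scala sketch of both options, assuming a DStream[String]; the output paths are placeholders.

    import org.apache.spark.streaming.dstream.DStream

    def persistStream(events: DStream[String]): Unit = {
      // Option 1: built-in text output; one directory is written per batch interval.
      events.saveAsTextFiles("hdfs:///user/example/streaming/out", "txt")

      // Option 2: register your own output operation and control the write yourself.
      events.foreachRDD { (rdd, time) =>
        rdd.saveAsTextFile(s"hdfs:///user/example/streaming/custom-${time.milliseconds}")
      }
    }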




salemi wrote:


Hi,

I was wondering what is the best way to store off DStreams in HDFS or
Cassandra.
Could somebody provide an example?

Thanks,
Ali



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/store-spark-streaming-dstream-in-hdfs-or-cassandra-tp11064.html

Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark and Flume integration - do I understand this correctly?

2014-07-29 Thread Hari Shreedharan

Hi,

Deploying spark with Flume is pretty simple. What you'd need to do is:

1. Start your Spark Flume DStream receiver on some machine using one of 
the FlumeUtils.createStream methods - where you need to specify the 
hostname and port of the worker node on which you want the Spark 
executor to run - say a.b.c.d:4585. This is where Spark will receive 
the data from Flume.


2. Once your application has started, start the Flume agent(s) which are 
going to be sending the data, with Avro sinks whose hostname is set to 
a.b.c.d and port set to 4585.


And you are done!
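
For completeness, a minimal Scala sketch of step 1 above, using one of the FlumeUtils.createStream variants; the application name, batch interval, and the map/print at the end are placeholders, and a.b.c.d:4585 is the example host/port from the text.

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.flume.FlumeUtils

    object FlumeReceiverApp {
      def main(args: Array[String]): Unit = {
        val ssc = new StreamingContext(new SparkConf().setAppName("FlumeReceiverApp"), Seconds(10))

        // Must be the worker host/port that the Flume Avro sink points at.
        val flumeStream = FlumeUtils.createStream(ssc, "a.b.c.d", 4585)

        // Decode the Avro event body and print a sample of each batch.
        flumeStream.map(event => new String(event.event.getBody.array())).print()

        ssc.start()
        ssc.awaitTermination()
      }
    }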

Tathagata Das wrote:


Hari, can you help?

TD

On Tue, Jul 29, 2014 at 12:13 PM, dapooley  wrote:


Hi,

I am trying to integrate Spark onto a Flume log sink and avro source. The
sink is on one machine (the application), and the source is on another. Log
events are being sent from the application server to the avro source server
(a log directory sink on the avro source prints to verify).

The aim is to get Spark to also receive the same events that the avro source
is getting. The steps, I believe, are:

1. install/start Spark master (on avro source machine).
2. write spark application, deploy (on avro source machine).
3. add spark application as a worker to the master.
4. have spark application configured to same port as avro source

Test setup is using 2 ubuntu VMs on a Windows host.

Flume configuration:

# application ##
## Tail application log file
# /var/lib/apache-flume-1.5.0-bin/bin/flume-ng agent -n cps -c conf -f
conf/flume-conf.properties
# http://flume.apache.org/FlumeUserGuide.html#exec-source
source_agent.sources = tomcat
source_agent.sources.tomcat.type = exec
source_agent.sources.tomcat.command = tail -F
/var/lib/tomcat/logs/application.log
source_agent.sources.tomcat.batchSize = 1
source_agent.sources.tomcat.channels = memoryChannel

# http://flume.apache.org/FlumeUserGuide.html#memory-channel
source_agent.channels = memoryChannel
source_agent.channels.memoryChannel.type = memory
source_agent.channels.memoryChannel.capacity = 100

## Send to Flume Collector on Analytics Node
# http://flume.apache.org/FlumeUserGuide.html#avro-sink
source_agent.sinks = avro_sink
source_agent.sinks.avro_sink.type = avro
source_agent.sinks.avro_sink.channel = memoryChannel
source_agent.sinks.avro_sink.hostname = 10.0.2.2
source_agent.sinks.avro_sink.port = 41414


## avro source ##
## Receive Flume events for Spark streaming

# http://flume.apache.org/FlumeUserGuide.html#memory-channel
agent1.channels = memoryChannel
agent1.channels.memoryChannel.type = memory
agent1.channels.memoryChannel.capacity = 100

## Flume Collector on Analytics Node
# http://flume.apache.org/FlumeUserGuide.html#avro-source
agent1.sources = avroSource
agent1.sources.avroSource.type = avro
agent1.sources.avroSource.channels = memoryChannel
agent1.sources.avroSource.bind = 0.0.0.0
agent1.sources.avroSource.port = 41414

#Sinks
agent1.sinks = localout

#http://flume.apache.org/FlumeUserGuide.html#file-roll-sink
agent1.sinks.localout.type = file_roll
agent1.sinks.localout.sink.directory = /home/vagrant/flume/logs
agent1.sinks.localout.sink.rollInterval = 0
agent1.sinks.localout.channel = memoryChannel

thank you in advance for any assistance,



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-and-Flume-integration-do-I-understand-this-correctly-tp10879.html

Sent from the Apache Spark User List mailing list archive at Nabble.com.




Tathagata Das
July 29, 2014 at 1:52 PM
Hari, can you help?

TD


--

Thanks,
Hari


Benchmarking Graphx

2014-05-17 Thread Hari
Hi, I want to do some benchmarking tests (run-time and memory) for one of the
GraphX examples, let's say PageRank, on my single-processor PC to start with.
a) Is there a way to get the total time taken for the execution from start
to finish?
b) The log4j properties need to be modified to turn off logging, but it's not
clear how.
c) How can this be extended to a cluster?
d) Also, how can I quantify the memory overhead if I added more functionality to
the execution?
e) Are there any scripts or reports generated?

I am new to GraphX and clusters; any help would be appreciated.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Benchmarking-Graphx-tp5965.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
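
A rough Scala sketch addressing (a) and (b) above: wall-clock timing around a PageRank run, with log4j quieted from code instead of via the properties file. The edge-list path, tolerance, and log levels are placeholders.

    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.graphx.GraphLoader

    object PageRankBenchmark {
      def main(args: Array[String]): Unit = {
        // (b) Turn down logging programmatically instead of editing conf/log4j.properties.
        Logger.getLogger("org").setLevel(Level.WARN)
        Logger.getLogger("akka").setLevel(Level.WARN)

        val sc = new SparkContext(new SparkConf().setAppName("PageRankBenchmark"))
        val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")  // placeholder edge list

        // (a) Wall-clock time from the start of the PageRank computation to a forced evaluation.
        val start = System.nanoTime()
        val ranks = graph.pageRank(0.0001).vertices
        ranks.count()  // action to force the computation before stopping the clock
        val elapsedSec = (System.nanoTime() - start) / 1e9
        println(f"PageRank took $elapsedSec%.2f s")

        sc.stop()
      }
    }

The same code runs unchanged on a cluster; only the master URL passed to spark-submit changes, which covers (c).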