Spark Job Opportunities!
Hi Spark Community, I'm a hands-on Apache Flink/Spark software engineer with 13+ years of Data Engineering experience, looking for job opportunities in India, the USA (with H1B transfer), or the UK (with Tier 2 sponsorship). Please point me in the right direction if you come across such opportunities. Thanks, Sri Tummala
Job Opportunities in India or UK with Tier 2 Sponsorship - Spark Expert
Hi Spark Community, I'm a seasoned Data Engineering professional with 13+ years of experience and expertise in Apache Spark, particularly in Structured Streaming. I'm looking for job opportunities in India or the UK that offer Tier 2 sponsorship. If anyone knows of openings or can connect me with potential employers, please reach out. Thanks, Sri Tummala
Re: [Spark on Kubernetes]: Seeking Guidance on Handling Persistent Executor Failures
Dear Mich,

Thank you for your detailed response and the suggested approach to handling retry logic. I appreciate you taking the time to outline the method of embedding custom retry mechanisms directly into the application code.

While wrapping the main logic of the Spark job in a loop to control the number of retries is technically sound and offers a workaround, it may not be the most efficient or maintainable solution for organizations running a large number of Spark jobs. Modifying each application to include custom retry logic is a significant undertaking: it introduces variability in how retries are handled across different jobs and requires additional testing and maintenance.

Ideally, operational concerns like retry behavior in response to infrastructure failures should be decoupled from the business logic of Spark applications. This separation allows data engineers and scientists to focus on the application logic without needing to implement and test infrastructure resilience mechanisms.

Thank you again for your time and assistance.

Best regards,
Sri Potluri

On Mon, Feb 19, 2024 at 5:03 PM Mich Talebzadeh wrote:
> Went through your issue with the code running on k8s
>
> When an executor of a Spark application fails, the system attempts to maintain the desired level of parallelism by automatically recreating a new executor to replace the failed one. While this behavior is beneficial for transient errors, ensuring that the application continues to run, it becomes problematic in cases where the failure is due to a persistent issue (such as misconfiguration, inaccessible external resources, or incompatible environment settings). In such scenarios, the application enters a loop, continuously trying to recreate executors, which leads to resource wastage and complicates application management.
>
> Well, fault tolerance is built in, especially in a k8s cluster. You can implement your own logic to control the retry attempts. You can do this by wrapping the main logic of your Spark job in a loop and controlling the number of retries. If a persistent issue is detected, you can choose to stop the job. Today is the third time that looping control has come up :)
>
> Take this code
>
> import time
>
> max_retries = 5
> retries = 0
> while retries < max_retries:
>     try:
>         # Your Spark job logic here
>         ...
>     except Exception as e:
>         # Log the exception
>         print(f"Exception in Spark job: {str(e)}")
>         # Increment the retry count
>         retries += 1
>         # Sleep before the next attempt
>         time.sleep(60)
>     else:
>         # Break out of the loop if the job completes successfully
>         break
>
> HTH
>
> Mich Talebzadeh,
> Dad | Technologist | Solutions Architect | Engineer
> London
> United Kingdom
>
> view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
> https://en.everybodywiki.com/Mich_Talebzadeh
>
> *Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed. It is essential to note that, as with any advice, "one test result is worth one-thousand expert opinions" (Wernher von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>).
>
> On Mon, 19 Feb 2024 at 19:21, Mich Talebzadeh wrote:
>> Not that I am aware of any configuration parameter in Spark classic to limit executor creation. Because of fault tolerance Spark will try to recreate failed executors. Not really that familiar with the Spark operator for k8s. There may be something there.
>> Have you considered custom monitoring and handling within Spark itself, using max_retries = 5 etc.?
>>
>> HTH
>>
>> Mich Talebzadeh,
>> Dad | Technologist | Solutions Architect | Engineer
>> London
>> United Kingdom
>>
>> view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>> https://en.everybodywiki.com/Mich_Talebzadeh
>>
>> *Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed. It is essential to note that, as with any advice, "one test result is worth one-thousand expert opinions" (Wernher von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>).
>>
>> On Mon, 19 Feb 2024 at 18:34, Sri Potluri wrote:
>>> Hello Spark Community,
>>>
>>> I am currently leveraging Spark on Kubernetes, managed by the Spark Operator, for running various Spark applications ...
[Spark on Kubernetes]: Seeking Guidance on Handling Persistent Executor Failures
Hello Spark Community,

I am currently leveraging Spark on Kubernetes, managed by the Spark Operator, for running various Spark applications. While the system generally works well, I've encountered a challenge related to how Spark applications handle executor failures, specifically in scenarios where executors enter an error state due to persistent issues.

*Problem Description*
When an executor of a Spark application fails, the system attempts to maintain the desired level of parallelism by automatically recreating a new executor to replace the failed one. While this behavior is beneficial for transient errors, ensuring that the application continues to run, it becomes problematic in cases where the failure is due to a persistent issue (such as misconfiguration, inaccessible external resources, or incompatible environment settings). In such scenarios, the application enters a loop, continuously trying to recreate executors, which leads to resource wastage and complicates application management.

*Desired Behavior*
Ideally, I would like to have a mechanism to limit the number of retries for executor recreation. If the system fails to successfully create an executor more than a specified number of times (e.g., 5 attempts), the entire Spark application should fail and stop trying to recreate the executor. This behavior would help in efficiently managing resources and avoiding prolonged failure states.

*Questions for the Community*
1. Is there an existing configuration or method within Spark or the Spark Operator to limit executor recreation attempts and fail the job after reaching a threshold?
2. Has anyone else encountered similar challenges and found workarounds or solutions that could be applied in this context?

*Additional Context*
I have explored Spark's task and stage retry configurations (`spark.task.maxFailures`, `spark.stage.maxConsecutiveAttempts`), but these do not directly address the issue of limiting executor creation retries. Implementing a custom monitoring solution to track executor failures and manually stop the application is a potential workaround, but it would be preferable to have a more integrated solution.

I appreciate any guidance, insights, or feedback you can provide on this matter. Thank you for your time and support.

Best regards,
Sri P
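[To illustrate the custom-monitoring workaround mentioned in the message above: a minimal sketch, not a tested solution, of a driver-side SparkListener that counts executor removals and stops the application past a threshold. The class name and threshold are assumptions for illustration; a real version would inspect event.reason to ignore benign removals such as dynamic-allocation scale-downs.]

import java.util.concurrent.atomic.AtomicInteger

import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorRemoved}

// Sketch: count executor removals on the driver and fail fast once a
// threshold is crossed, instead of letting k8s recreate executors forever.
class ExecutorFailureLimiter(sc: SparkContext, maxFailures: Int) extends SparkListener {
  private val failures = new AtomicInteger(0)

  override def onExecutorRemoved(event: SparkListenerExecutorRemoved): Unit = {
    val count = failures.incrementAndGet()
    println(s"Executor ${event.executorId} removed (${event.reason}); failure $count of $maxFailures")
    if (count >= maxFailures) {
      sc.stop()  // stop the whole application rather than loop on recreation
    }
  }
}

// Registration at the top of the driver program:
// sc.addSparkListener(new ExecutorFailureLimiter(sc, maxFailures = 5))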
India Scala & Big Data Job Referral
Hi Community, I was laid off from Apple in February 2023, which led to my relocation from the USA due to immigration issues related to my H1B visa. I have over 12 years of experience as a consultant in Big Data, Spark, Scala, Python, and Flink. Despite my move to India, I haven't secured a job yet. I am seeking referrals within product firms (preferably non-consulting) in India that work with Flink, Spark, Scala, Big Data, or in the fields of ML & AI. Can someone assist me with this? Thanks Sri
Spark Scala Contract Opportunity @USA
Hi All, Is anyone looking for a Spark/Scala contract role inside the USA? A company called Maxonic has an open Spark/Scala contract position (100% remote) inside the USA. If anyone is interested, please send your CV to kali.tumm...@gmail.com. Thanks & Regards Sri Tummala
Big Data Contract Roles ?
Hi Flink Users / Spark Users, Is anyone hiring for contract (corp-to-corp) Big Data Spark/Scala or Flink/Scala roles? Thanks Sri
unsubscribe
Sent from Mail for Windows 10
Re: sql to spark scala rdd
Makes sense, thanks.

Thanks
Sri

Sent from my iPhone

> On 2 Aug 2016, at 03:27, Jacek Laskowski wrote:
>
> Congrats!
>
> Whenever I was doing foreach(println) in the past, I'm doing .toDF.show these days. Give it a shot and you'll experience the feeling yourself! :)
>
> Pozdrawiam,
> Jacek Laskowski
>
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
> On Tue, Aug 2, 2016 at 4:25 AM, sri hari kali charan Tummala wrote:
>> Hi All,
>>
>> The code below calculates the cumulative sum (running sum) and moving average using Scala RDD-style programming. I was using the wrong function (sliding); use scanLeft instead.
>>
>> sc.textFile("C:\\Users\\kalit_000\\Desktop\\Hadoop_IMP_DOC\\spark\\data.txt")
>>   .map(x => x.split("\\~"))
>>   .map(x => (x(0), x(1), x(2).toDouble))
>>   .groupBy(_._1)
>>   .mapValues { x =>
>>     x.toList.sortBy(_._2).zip(Stream from 1).scanLeft(("", "", 0.0, 0.0, 0.0, 0.0)) { (a, b) =>
>>       (b._1._1, b._1._2, b._1._3, (b._1._3.toDouble + a._3.toDouble), (b._1._3.toDouble + a._3.toDouble) / b._2, b._2)
>>     }.tail
>>   }
>>   .flatMapValues(x => x.sortBy(_._1))
>>   .foreach(println)
>>
>> Input Data:-
>>
>> Headers:-
>> Key,Date,balance
>>
>> 786~20160710~234
>> 786~20160709~-128
>> 786~20160711~-457
>> 987~20160812~456
>> 987~20160812~567
>>
>> Output Data:-
>>
>> Column Headers:-
>> key, (key, Date, balance, daily balance, running average, row_number based on key)
>>
>> (786,(786,20160709,-128.0,-128.0,-128.0,1.0))
>> (786,(786,20160710,234.0,106.0,53.0,2.0))
>> (786,(786,20160711,-457.0,-223.0,-74.33,3.0))
>> (987,(987,20160812,567.0,1023.0,511.5,2.0))
>> (987,(987,20160812,456.0,456.0,456.0,1.0))
>>
>> Reference:-
>>
>> https://bzhangusc.wordpress.com/2014/06/21/calculate-running-sums/
>>
>> Thanks
>> Sri
>>
>>> On Mon, Aug 1, 2016 at 12:07 AM, Sri wrote:
>>>
>>> Hi,
>>>
>>> I solved it using Spark SQL, which uses the similar window functions mentioned below; for my own knowledge I am trying to solve it using the Scala RDD API, which I am unable to do. What function in Scala supports a window like SQL's UNBOUNDED PRECEDING AND CURRENT ROW? Is it sliding?
>>>
>>> Thanks
>>> Sri
>>>
>>> Sent from my iPhone
>>>
>>> On 31 Jul 2016, at 23:16, Mich Talebzadeh wrote:
>>>
>>> hi
>>>
>>> You mentioned:
>>>
>>> I already solved it using DF and spark sql ...
>>>
>>> Are you referring to this code, which is a classic analytics query:
>>>
>>> SELECT DATE,balance,
>>> SUM(balance) OVER (ORDER BY DATE ROWS BETWEEN UNBOUNDED PRECEDING
>>> AND
>>> CURRENT ROW) daily_balance
>>> FROM table
>>>
>>> So how did you solve it using DF in the first place?
>>>
>>> HTH
>>>
>>> Dr Mich Talebzadeh
>>>
>>> LinkedIn https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>> Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.
>>>
>>>> On 1 August 2016 at 07:04, Sri wrote:
>>>>
>>>> Hi,
>>>>
>>>> Just wondering how Spark SQL works behind the scenes: does it not convert SQL to some Scala RDD? Or Scala?
>>>>
>>>> How to write the below SQL in Scala or a Scala RDD?
>>>>
>>>> SELECT DATE,balance,
>>>> SUM(balance) OVER (ORDER BY DATE ROWS BETWEEN UNBOUNDED PRECEDING
>>>> AND
>>>> CURRENT ROW) daily_balance
>>>> FROM table
Re: sql to spark scala rdd
Hi All,

The code below calculates the cumulative sum (running sum) and moving average using Scala RDD-style programming. I was using the wrong function (sliding); use scanLeft instead.

sc.textFile("C:\\Users\\kalit_000\\Desktop\\Hadoop_IMP_DOC\\spark\\data.txt")
  .map(x => x.split("\\~"))
  .map(x => (x(0), x(1), x(2).toDouble))
  .groupBy(_._1)
  .mapValues { x =>
    x.toList.sortBy(_._2).zip(Stream from 1).scanLeft(("", "", 0.0, 0.0, 0.0, 0.0)) { (a, b) =>
      (b._1._1, b._1._2, b._1._3, (b._1._3.toDouble + a._3.toDouble), (b._1._3.toDouble + a._3.toDouble) / b._2, b._2)
    }.tail
  }
  .flatMapValues(x => x.sortBy(_._1))
  .foreach(println)

Input Data:-

Headers:-
Key,Date,balance

786~20160710~234
786~20160709~-128
786~20160711~-457
987~20160812~456
987~20160812~567

Output Data:-

Column Headers:-
key, (key, Date, balance, daily balance, running average, row_number based on key)

(786,(786,20160709,-128.0,-128.0,-128.0,1.0))
(786,(786,20160710,234.0,106.0,53.0,2.0))
(786,(786,20160711,-457.0,-223.0,-74.33,3.0))
(987,(987,20160812,567.0,1023.0,511.5,2.0))
(987,(987,20160812,456.0,456.0,456.0,1.0))

Reference:-

https://bzhangusc.wordpress.com/2014/06/21/calculate-running-sums/

Thanks
Sri

On Mon, Aug 1, 2016 at 12:07 AM, Sri wrote:
> Hi,
>
> I solved it using Spark SQL, which uses the similar window functions mentioned below; for my own knowledge I am trying to solve it using the Scala RDD API, which I am unable to do.
> What function in Scala supports a window like SQL's UNBOUNDED PRECEDING AND CURRENT ROW? Is it sliding?
>
> Thanks
> Sri
>
> Sent from my iPhone
>
> On 31 Jul 2016, at 23:16, Mich Talebzadeh wrote:
>
> hi
>
> You mentioned:
>
> I already solved it using DF and spark sql ...
>
> Are you referring to this code, which is a classic analytics query:
>
> SELECT DATE,balance,
> SUM(balance) OVER (ORDER BY DATE ROWS BETWEEN UNBOUNDED PRECEDING
> AND
> CURRENT ROW) daily_balance
> FROM table
>
> So how did you solve it using DF in the first place?
>
> HTH
>
> Dr Mich Talebzadeh
>
> LinkedIn https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
> http://talebzadehmich.wordpress.com
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.
>
> On 1 August 2016 at 07:04, Sri wrote:
>
>> Hi,
>>
>> Just wondering how Spark SQL works behind the scenes: does it not convert SQL to some Scala RDD? Or Scala?
>>
>> How to write the below SQL in Scala or a Scala RDD?
>>
>> SELECT DATE,balance,
>> SUM(balance) OVER (ORDER BY DATE ROWS BETWEEN UNBOUNDED PRECEDING
>> AND
>> CURRENT ROW) daily_balance
>> FROM table
>>
>> Thanks
>> Sri
>> Sent from my iPhone
>>
>> On 31 Jul 2016, at 13:21, Jacek Laskowski wrote:
>>
>> Hi,
>>
>> Impossible - see http://www.scala-lang.org/api/current/index.html#scala.collection.Seq@sliding(size:Int,step:Int):Iterator[Repr].
>>
>> I tried to show you why you ended up with "non-empty iterator" after println. You should really start with http://www.scala-lang.org/documentation/
>>
>> Pozdrawiam,
>> Jacek Laskowski
>>
>> https://medium.com/@jaceklaskowski/
>> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
>> Follow me at https://twitter.com/jaceklaskowski
>>
>> On Sun, Jul 31, 2016 at 8:49 PM, sri hari kali charan Tummala wrote:
>>
>> Tuple
>>
>> [Lscala.Tuple2;@65e4cb84
>>
>> On Sun, Jul 31, 2016 at 1:00 AM, Jacek Laskowski wrote:
>>
>> Hi,
>>
>> What's the result type of sliding(2,1)?
>>
>> Pozdrawiam,
>> Jacek Laskowski
>>
>> https://medium.com/@jaceklaskowski/
>> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
>> Follow me at https://twitter.com/jaceklaskowski
>>
>> On Sun, Jul 31, 2016 at 9:23 AM, sri hari kali charan Tummala wrote:
>>
>> tried this, no luck; what is the non-empty iterator here?
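[To make the scanLeft idea in the message above easier to follow outside the mailing-list pipeline, here is a minimal self-contained sketch on plain Scala collections; the sample rows are made up and the 4-tuple accumulator is a simplification of the poster's 6-tuple, not their exact code.]

// Per-key running sum carried through scanLeft's accumulator.
val rows = Seq(("786", "20160709", -128.0), ("786", "20160710", 234.0), ("786", "20160711", -457.0))

val runningSums = rows
  .groupBy(_._1)                         // group by account key
  .mapValues { group =>
    group.sortBy(_._2)                   // order by date within the key
      .scanLeft(("", "", 0.0, 0.0)) {    // seed: empty row with cumulative 0.0
        case ((_, _, _, cum), (key, date, bal)) =>
          (key, date, bal, cum + bal)    // carry the cumulative sum forward
      }
      .tail                              // drop the seed element
  }

runningSums.foreach(println)
// (786,List((786,20160709,-128.0,-128.0), (786,20160710,234.0,106.0), (786,20160711,-457.0,-351.0)))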
Re: sql to spark scala rdd
Hi,

I solved it using Spark SQL, which uses the similar window functions mentioned below; for my own knowledge I am trying to solve it using the Scala RDD API, which I am unable to do. What function in Scala supports a window like SQL's UNBOUNDED PRECEDING AND CURRENT ROW? Is it sliding?

Thanks
Sri

Sent from my iPhone

> On 31 Jul 2016, at 23:16, Mich Talebzadeh wrote:
>
> hi
>
> You mentioned:
>
> I already solved it using DF and spark sql ...
>
> Are you referring to this code, which is a classic analytics query:
>
>>>>>>>>> SELECT DATE,balance,
>>>>>>>>> SUM(balance) OVER (ORDER BY DATE ROWS BETWEEN UNBOUNDED PRECEDING
>>>>>>>>> AND
>>>>>>>>> CURRENT ROW) daily_balance
>>>>>>>>> FROM table
>
> So how did you solve it using DF in the first place?
>
> HTH
>
> Dr Mich Talebzadeh
>
> LinkedIn https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
> http://talebzadehmich.wordpress.com
>
> Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.
>
>> On 1 August 2016 at 07:04, Sri wrote:
>> Hi,
>>
>> Just wondering how Spark SQL works behind the scenes: does it not convert SQL to some Scala RDD? Or Scala?
>>
>> How to write the below SQL in Scala or a Scala RDD?
>>
>>>>>>>>>> SELECT DATE,balance,
>>>>>>>>>> SUM(balance) OVER (ORDER BY DATE ROWS BETWEEN UNBOUNDED PRECEDING
>>>>>>>>>> AND
>>>>>>>>>> CURRENT ROW) daily_balance
>>>>>>>>>> FROM table
>>
>> Thanks
>> Sri
>> Sent from my iPhone
>>
>>> On 31 Jul 2016, at 13:21, Jacek Laskowski wrote:
>>>
>>> Hi,
>>>
>>> Impossible - see http://www.scala-lang.org/api/current/index.html#scala.collection.Seq@sliding(size:Int,step:Int):Iterator[Repr].
>>>
>>> I tried to show you why you ended up with "non-empty iterator" after println. You should really start with http://www.scala-lang.org/documentation/
>>>
>>> Pozdrawiam,
>>> Jacek Laskowski
>>>
>>> https://medium.com/@jaceklaskowski/
>>> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
>>> Follow me at https://twitter.com/jaceklaskowski
>>>
>>> On Sun, Jul 31, 2016 at 8:49 PM, sri hari kali charan Tummala wrote:
>>>> Tuple
>>>>
>>>> [Lscala.Tuple2;@65e4cb84
>>>>
>>>>> On Sun, Jul 31, 2016 at 1:00 AM, Jacek Laskowski wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> What's the result type of sliding(2,1)?
>>>>>
>>>>> Pozdrawiam,
>>>>> Jacek Laskowski
>>>>>
>>>>> https://medium.com/@jaceklaskowski/
>>>>> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
>>>>> Follow me at https://twitter.com/jaceklaskowski
>>>>>
>>>>> On Sun, Jul 31, 2016 at 9:23 AM, sri hari kali charan Tummala wrote:
>>>>>> tried this, no luck; what is the non-empty iterator here?
>>>>>>
>>>>>> OP:-
>>>>>> (-987,non-empty iterator)
>>>>>> (-987,non-empty iterator)
>>>>>> (-987,non-empty iterator)
>>>>>> (-987,non-empty iterator)
>>>>>> (-987,non-empty iterator)
>>>>>>
>>>>>> sc.textFile(file).keyBy(x => x.split("\\~")(0))
>>>>>>   .map(x => x._2.split("\\~"))
>>>>>>   .map(x => (x(0), x(2)))
>>>>>>   .map { case (key, value) => (key, value.toArray.toSeq.sliding(2, 1).map(x => x.sum / x.size)) }
>>>>>>   .foreach(println)
>>>>>>
>>>>>> On Sun, Jul 31, 2016 at 12:03 AM, sri hari kali charan Tummala wrote:
Re: sql to spark scala rdd
Hi,

Just wondering how Spark SQL works behind the scenes: does it not convert SQL to some Scala RDD? Or Scala?

How to write the below SQL in Scala or a Scala RDD?

>>>>>>>> SELECT DATE,balance,
>>>>>>>> SUM(balance) OVER (ORDER BY DATE ROWS BETWEEN UNBOUNDED PRECEDING
>>>>>>>> AND
>>>>>>>> CURRENT ROW) daily_balance
>>>>>>>> FROM table

Thanks
Sri
Sent from my iPhone

> On 31 Jul 2016, at 13:21, Jacek Laskowski wrote:
>
> Hi,
>
> Impossible - see http://www.scala-lang.org/api/current/index.html#scala.collection.Seq@sliding(size:Int,step:Int):Iterator[Repr].
>
> I tried to show you why you ended up with "non-empty iterator" after println. You should really start with http://www.scala-lang.org/documentation/
>
> Pozdrawiam,
> Jacek Laskowski
>
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
> On Sun, Jul 31, 2016 at 8:49 PM, sri hari kali charan Tummala wrote:
>> Tuple
>>
>> [Lscala.Tuple2;@65e4cb84
>>
>>> On Sun, Jul 31, 2016 at 1:00 AM, Jacek Laskowski wrote:
>>>
>>> Hi,
>>>
>>> What's the result type of sliding(2,1)?
>>>
>>> Pozdrawiam,
>>> Jacek Laskowski
>>>
>>> https://medium.com/@jaceklaskowski/
>>> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
>>> Follow me at https://twitter.com/jaceklaskowski
>>>
>>> On Sun, Jul 31, 2016 at 9:23 AM, sri hari kali charan Tummala wrote:
>>>> tried this, no luck; what is the non-empty iterator here?
>>>>
>>>> OP:-
>>>> (-987,non-empty iterator)
>>>> (-987,non-empty iterator)
>>>> (-987,non-empty iterator)
>>>> (-987,non-empty iterator)
>>>> (-987,non-empty iterator)
>>>>
>>>> sc.textFile(file).keyBy(x => x.split("\\~")(0))
>>>>   .map(x => x._2.split("\\~"))
>>>>   .map(x => (x(0), x(2)))
>>>>   .map { case (key, value) => (key, value.toArray.toSeq.sliding(2, 1).map(x => x.sum / x.size)) }
>>>>   .foreach(println)
>>>>
>>>> On Sun, Jul 31, 2016 at 12:03 AM, sri hari kali charan Tummala wrote:
>>>>>
>>>>> Hi All,
>>>>>
>>>>> I managed to write it using the sliding function, but can I get the key in my output as well?
>>>>>
>>>>> sc.textFile(file).keyBy(x => x.split("\\~")(0))
>>>>>   .map(x => x._2.split("\\~"))
>>>>>   .map(x => x(2).toDouble).toArray().sliding(2,1).map(x => (x, x.size)).foreach(println)
>>>>>
>>>>> at the moment my output:-
>>>>>
>>>>> 75.0
>>>>> -25.0
>>>>> 50.0
>>>>> -50.0
>>>>> -100.0
>>>>>
>>>>> I want it with the key; how do I get the moving-average output based on key?
>>>>>
>>>>> 987,75.0
>>>>> 987,-25
>>>>> 987,50.0
>>>>>
>>>>> Thanks
>>>>> Sri
>>>>>
>>>>> On Sat, Jul 30, 2016 at 11:40 AM, sri hari kali charan Tummala wrote:
>>>>>>
>>>>>> For my own knowledge, just wondering how to write it in Scala or as a Spark RDD.
>>>>>>
>>>>>> Thanks
>>>>>> Sri
>>>>>>
>>>>>> On Sat, Jul 30, 2016 at 11:24 AM, Jacek Laskowski wrote:
>>>>>>>
>>>>>>> Why?
>>>>>>>
>>>>>>> Pozdrawiam,
>>>>>>> Jacek Laskowski
>>>>>>>
>>>>>>> https://medium.com/@jaceklaskowski/
>>>>>>> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
>>>>>>> Follow me at https://twitter.com/jaceklaskowski
>>>>>>>
>>>>>>> On Sat, Jul 30, 2016 at 4:42 AM, kali.tumm...@gmail.com wrote:
>>>>>>>> Hi All,
>>>>>>>>
>>>>>>>> I managed to write the business requirement in spark-sql and hive; I am still learning Scala. How would the SQL below be written using a Spark RDD, not Spark data frames?
>>>>>>>>
>>>>>>>> SELECT DATE,balance,
>>>>>>>> SUM(balance) OVER (ORDER BY DATE ROWS BETWEEN UNBOUNDED PRECEDING
>>>>>>>> AND
>>>>>>>> CURRENT ROW) daily_balance
>>>>>>>> FROM table
>>>>>>>>
>>>>>>>> --
>>>>>>>> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/sql-to-spark-scala-rdd-tp27433.html
>>>>>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>>>>>>
>>>>>>>> -
>>>>>>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>>>>
>>>>>> --
>>>>>> Thanks & Regards
>>>>>> Sri Tummala
>>>>>
>>>>> --
>>>>> Thanks & Regards
>>>>> Sri Tummala
>>>>
>>>> --
>>>> Thanks & Regards
>>>> Sri Tummala
>>
>> --
>> Thanks & Regards
>> Sri Tummala
Re: sql to spark scala rdd
val test = sc.textFile(file).keyBy(x => x.split("\\~")(0))
  .map(x => x._2.split("\\~"))
  .map(x => (x(0), x(1), x(2)))
  .map { case (account, datevalue, amount) => ((account, datevalue), amount.toDouble) }
  .mapValues(x => x)
  .toArray
  .sliding(2, 1)
  .map(x => (x(0)._1, x(1)._2, x.foldLeft(0.0)(_ + _._2 / x.size), x.foldLeft(0.0)(_ + _._2)))
  .foreach(println)

On Sun, Jul 31, 2016 at 12:15 PM, sri hari kali charan Tummala <kali.tumm...@gmail.com> wrote:
> Hi All,
>
> I already solved it using DF and Spark SQL; I was wondering how to solve it with a Scala RDD. I just got the answer and need to check my results against Spark SQL. Thanks all for your time.
>
> I am trying to solve a moving average using a Scala RDD group by key.
>
> input:-
> -987~20150728~100
> -987~20150729~50
> -987~20150730~-100
> -987~20150804~200
> -987~20150807~-300
> -987~20150916~100
>
> val test = sc.textFile(file).keyBy(x => x.split("\\~")(0))
>   .map(x => x._2.split("\\~"))
>   .map(x => (x(0), x(1), x(2)))
>   .map { case (account, datevalue, amount) => ((account, datevalue), amount.toDouble) }
>   .mapValues(x => x)
>   .toArray
>   .sliding(2, 1)
>   .map(x => (x(0)._1, x(1)._2, x.foldLeft(0.0)(_ + _._2 / x.size), x.foldLeft(0.0)(_ + _._2)))
>   .foreach(println)
>
> Op:-
>
> accountkey,date,balance_of_account, daily_average, sum_base_on_window
>
> ((-987,20150728),50.0,75.0,150.0)
> ((-987,20150729),-100.0,-25.0,-50.0)
> ((-987,20150730),200.0,50.0,100.0)
> ((-987,20150804),-300.0,-50.0,-100.0)
> ((-987,20150807),100.0,-100.0,-200.0)
>
> The book below was written for Hadoop MapReduce; it has a solution for moving average, but it's in Java.
>
> https://www.safaribooksonline.com/library/view/data-algorithms/9781491906170/ch06.html
>
> Sql:-
>
> SELECT DATE,balance,
> SUM(balance) OVER (ORDER BY DATE ROWS BETWEEN UNBOUNDED PRECEDING AND
> CURRENT ROW) daily_balance
> FROM table
>
> Thanks
> Sri
>
> On Sun, Jul 31, 2016 at 11:54 AM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>
>> Check also this <https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html>
>>
>> HTH
>>
>> Dr Mich Talebzadeh
>>
>> LinkedIn https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>
>> http://talebzadehmich.wordpress.com
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.
>>
>> On 31 July 2016 at 19:49, sri hari kali charan Tummala <kali.tumm...@gmail.com> wrote:
>>
>>> Tuple
>>>
>>> [Lscala.Tuple2;@65e4cb84
>>>
>>> On Sun, Jul 31, 2016 at 1:00 AM, Jacek Laskowski wrote:
>>>
>>>> Hi,
>>>>
>>>> What's the result type of sliding(2,1)?
>>>>
>>>> Pozdrawiam,
>>>> Jacek Laskowski
>>>>
>>>> https://medium.com/@jaceklaskowski/
>>>> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
>>>> Follow me at https://twitter.com/jaceklaskowski
>>>>
>>>> On Sun, Jul 31, 2016 at 9:23 AM, sri hari kali charan Tummala wrote:
>>>> > tried this, no luck; what is the non-empty iterator here?
>>>> >
>>>> > OP:-
>>>> > (-987,non-empty iterator)
>>>> > (-987,non-empty iterator)
>>>> > (-987,non-empty iterator)
>>>> > (-987,non-empty iterator)
>>>> > (-987,non-empty iterator)
>>>> >
>>>> > sc.textFile(file).keyBy(x => x.split("\\~")(0))
>>>> >   .map(x => x._2.split("\\~"))
>>>> >   .map(x => (x(0), x(2)))
>>>> >   .map { case (key, value) => (key, value.toArray.toSeq.sliding(2, 1).map(x => x.sum / x.size)) }
>>>> >   .foreach(println)
>>>> >
>>>> > On Sun, Jul 31, 2016 at 12:03 AM, sri hari kali charan Tummala wrote:
>>>> >>
>>>> >> Hi
Re: sql to spark scala rdd
Hi All,

I already solved it using DF and Spark SQL; I was wondering how to solve it with a Scala RDD. I just got the answer and need to check my results against Spark SQL. Thanks all for your time.

I am trying to solve a moving average using a Scala RDD group by key.

input:-
-987~20150728~100
-987~20150729~50
-987~20150730~-100
-987~20150804~200
-987~20150807~-300
-987~20150916~100

val test = sc.textFile(file).keyBy(x => x.split("\\~")(0))
  .map(x => x._2.split("\\~"))
  .map(x => (x(0), x(1), x(2)))
  .map { case (account, datevalue, amount) => ((account, datevalue), amount.toDouble) }
  .mapValues(x => x)
  .toArray
  .sliding(2, 1)
  .map(x => (x(0)._1, x(1)._2, x.foldLeft(0.0)(_ + _._2 / x.size), x.foldLeft(0.0)(_ + _._2)))
  .foreach(println)

Op:-

accountkey,date,balance_of_account, daily_average, sum_base_on_window

((-987,20150728),50.0,75.0,150.0)
((-987,20150729),-100.0,-25.0,-50.0)
((-987,20150730),200.0,50.0,100.0)
((-987,20150804),-300.0,-50.0,-100.0)
((-987,20150807),100.0,-100.0,-200.0)

The book below was written for Hadoop MapReduce; it has a solution for moving average, but it's in Java.

https://www.safaribooksonline.com/library/view/data-algorithms/9781491906170/ch06.html

Sql:-

SELECT DATE,balance,
SUM(balance) OVER (ORDER BY DATE ROWS BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW) daily_balance
FROM table

Thanks
Sri

On Sun, Jul 31, 2016 at 11:54 AM, Mich Talebzadeh wrote:
> Check also this <https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html>
>
> HTH
>
> Dr Mich Talebzadeh
>
> LinkedIn https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
> http://talebzadehmich.wordpress.com
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.
>
> On 31 July 2016 at 19:49, sri hari kali charan Tummala <kali.tumm...@gmail.com> wrote:
>
>> Tuple
>>
>> [Lscala.Tuple2;@65e4cb84
>>
>> On Sun, Jul 31, 2016 at 1:00 AM, Jacek Laskowski wrote:
>>
>>> Hi,
>>>
>>> What's the result type of sliding(2,1)?
>>>
>>> Pozdrawiam,
>>> Jacek Laskowski
>>>
>>> https://medium.com/@jaceklaskowski/
>>> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
>>> Follow me at https://twitter.com/jaceklaskowski
>>>
>>> On Sun, Jul 31, 2016 at 9:23 AM, sri hari kali charan Tummala wrote:
>>> > tried this, no luck; what is the non-empty iterator here?
>>> >
>>> > OP:-
>>> > (-987,non-empty iterator)
>>> > (-987,non-empty iterator)
>>> > (-987,non-empty iterator)
>>> > (-987,non-empty iterator)
>>> > (-987,non-empty iterator)
>>> >
>>> > sc.textFile(file).keyBy(x => x.split("\\~")(0))
>>> >   .map(x => x._2.split("\\~"))
>>> >   .map(x => (x(0), x(2)))
>>> >   .map { case (key, value) => (key, value.toArray.toSeq.sliding(2, 1).map(x => x.sum / x.size)) }
>>> >   .foreach(println)
>>> >
>>> > On Sun, Jul 31, 2016 at 12:03 AM, sri hari kali charan Tummala wrote:
>>> >>
>>> >> Hi All,
>>> >>
>>> >> I managed to write it using the sliding function, but can I get the key in my output as well?
>>> >>
>>> >> sc.textFile(file).keyBy(x => x.split("\\~")(0))
>>> >>   .map(x => x._2.split("\\~"))
>>> >>   .map(x => x(2).toDouble).toArray().sliding(2,1).map(x => (x, x.size)).foreach(println)
>>> >>
>>> >> at the moment my output:-
>>> >>
>>> >> 75.0
>>> >> -25.0
>>> >> 50.0
>>> >> -50.0
>>> >> -100.0
>>> >>
>>> >> I want it with the key; how do I get the moving-average output based on key?
>>> >>
>>> >> 987,75.0
>>> >> 987,-25
>>> >> 987,50.0
>>> >>
>>> >> Thanks
>>> >> Sri
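[Since this thread contrasts the RDD solution with the DataFrame one, here is a hedged sketch of what the DataFrame/window-function equivalent of the quoted SQL could look like. The DataFrame `df`, its column names, and the Window.unboundedPreceding/currentRow constants (Spark 2.1+) are assumptions, not the poster's actual code.]

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.sum

// Running sum per account, ordered by date, mirroring
// SUM(balance) OVER (ORDER BY date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
// with an added partition per account key.
val w = Window
  .partitionBy("account")
  .orderBy("date")
  .rowsBetween(Window.unboundedPreceding, Window.currentRow)

val withRunning = df.withColumn("daily_balance", sum("balance").over(w))
withRunning.show()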
Re: sql to spark scala rdd
Tuple

[Lscala.Tuple2;@65e4cb84

On Sun, Jul 31, 2016 at 1:00 AM, Jacek Laskowski wrote:
> Hi,
>
> What's the result type of sliding(2,1)?
>
> Pozdrawiam,
> Jacek Laskowski
>
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
> On Sun, Jul 31, 2016 at 9:23 AM, sri hari kali charan Tummala wrote:
> > tried this, no luck; what is the non-empty iterator here?
> >
> > OP:-
> > (-987,non-empty iterator)
> > (-987,non-empty iterator)
> > (-987,non-empty iterator)
> > (-987,non-empty iterator)
> > (-987,non-empty iterator)
> >
> > sc.textFile(file).keyBy(x => x.split("\\~")(0))
> >   .map(x => x._2.split("\\~"))
> >   .map(x => (x(0), x(2)))
> >   .map { case (key, value) => (key, value.toArray.toSeq.sliding(2, 1).map(x => x.sum / x.size)) }
> >   .foreach(println)
> >
> > On Sun, Jul 31, 2016 at 12:03 AM, sri hari kali charan Tummala wrote:
> >>
> >> Hi All,
> >>
> >> I managed to write it using the sliding function, but can I get the key in my output as well?
> >>
> >> sc.textFile(file).keyBy(x => x.split("\\~")(0))
> >>   .map(x => x._2.split("\\~"))
> >>   .map(x => x(2).toDouble).toArray().sliding(2,1).map(x => (x, x.size)).foreach(println)
> >>
> >> at the moment my output:-
> >>
> >> 75.0
> >> -25.0
> >> 50.0
> >> -50.0
> >> -100.0
> >>
> >> I want it with the key; how do I get the moving-average output based on key?
> >>
> >> 987,75.0
> >> 987,-25
> >> 987,50.0
> >>
> >> Thanks
> >> Sri
> >>
> >> On Sat, Jul 30, 2016 at 11:40 AM, sri hari kali charan Tummala wrote:
> >>>
> >>> For my own knowledge, just wondering how to write it in Scala or as a Spark RDD.
> >>>
> >>> Thanks
> >>> Sri
> >>>
> >>> On Sat, Jul 30, 2016 at 11:24 AM, Jacek Laskowski wrote:
> >>>>
> >>>> Why?
> >>>>
> >>>> Pozdrawiam,
> >>>> Jacek Laskowski
> >>>>
> >>>> https://medium.com/@jaceklaskowski/
> >>>> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
> >>>> Follow me at https://twitter.com/jaceklaskowski
> >>>>
> >>>> On Sat, Jul 30, 2016 at 4:42 AM, kali.tumm...@gmail.com wrote:
> >>>> > Hi All,
> >>>> >
> >>>> > I managed to write the business requirement in spark-sql and hive; I am still learning Scala. How would the SQL below be written using a Spark RDD, not Spark data frames?
> >>>> >
> >>>> > SELECT DATE,balance,
> >>>> > SUM(balance) OVER (ORDER BY DATE ROWS BETWEEN UNBOUNDED PRECEDING AND
> >>>> > CURRENT ROW) daily_balance
> >>>> > FROM table
> >>>> >
> >>>> > --
> >>>> > View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/sql-to-spark-scala-rdd-tp27433.html
> >>>> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >>>> >
> >>>> > -
> >>>> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >>>
> >>> --
> >>> Thanks & Regards
> >>> Sri Tummala
> >>
> >> --
> >> Thanks & Regards
> >> Sri Tummala
> >
> > --
> > Thanks & Regards
> > Sri Tummala

--
Thanks & Regards
Sri Tummala
Re: sql to spark scala rdd
tried this, no luck; what is the non-empty iterator here?

OP:-
(-987,non-empty iterator)
(-987,non-empty iterator)
(-987,non-empty iterator)
(-987,non-empty iterator)
(-987,non-empty iterator)

sc.textFile(file).keyBy(x => x.split("\\~")(0))
  .map(x => x._2.split("\\~"))
  .map(x => (x(0), x(2)))
  .map { case (key, value) => (key, value.toArray.toSeq.sliding(2, 1).map(x => x.sum / x.size)) }
  .foreach(println)

On Sun, Jul 31, 2016 at 12:03 AM, sri hari kali charan Tummala <kali.tumm...@gmail.com> wrote:
> Hi All,
>
> I managed to write it using the sliding function, but can I get the key in my output as well?
>
> sc.textFile(file).keyBy(x => x.split("\\~")(0))
>   .map(x => x._2.split("\\~"))
>   .map(x => x(2).toDouble).toArray().sliding(2,1).map(x => (x, x.size)).foreach(println)
>
> at the moment my output:-
>
> 75.0
> -25.0
> 50.0
> -50.0
> -100.0
>
> I want it with the key; how do I get the moving-average output based on key?
>
> 987,75.0
> 987,-25
> 987,50.0
>
> Thanks
> Sri
>
> On Sat, Jul 30, 2016 at 11:40 AM, sri hari kali charan Tummala <kali.tumm...@gmail.com> wrote:
>
>> For my own knowledge, just wondering how to write it in Scala or as a Spark RDD.
>>
>> Thanks
>> Sri
>>
>> On Sat, Jul 30, 2016 at 11:24 AM, Jacek Laskowski wrote:
>>
>>> Why?
>>>
>>> Pozdrawiam,
>>> Jacek Laskowski
>>>
>>> https://medium.com/@jaceklaskowski/
>>> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
>>> Follow me at https://twitter.com/jaceklaskowski
>>>
>>> On Sat, Jul 30, 2016 at 4:42 AM, kali.tumm...@gmail.com wrote:
>>> > Hi All,
>>> >
>>> > I managed to write the business requirement in spark-sql and hive; I am still learning Scala. How would the SQL below be written using a Spark RDD, not Spark data frames?
>>> >
>>> > SELECT DATE,balance,
>>> > SUM(balance) OVER (ORDER BY DATE ROWS BETWEEN UNBOUNDED PRECEDING AND
>>> > CURRENT ROW) daily_balance
>>> > FROM table
>>> >
>>> > --
>>> > View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/sql-to-spark-scala-rdd-tp27433.html
>>> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>> >
>>> > - 
>>> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>> --
>> Thanks & Regards
>> Sri Tummala
>
> --
> Thanks & Regards
> Sri Tummala

--
Thanks & Regards
Sri Tummala
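[For reference, the "non-empty iterator" in the output above is just Iterator.toString: sliding returns a lazy Iterator, which println does not materialize. A tiny sketch with made-up numbers showing how forcing it exposes the values:]

// sliding(2,1) returns an Iterator; printing it shows "non-empty iterator"
// because Iterator.toString never evaluates the elements. Force it with toList.
val balances = Seq(100.0, 50.0, -100.0)
val movingAvg = balances.sliding(2, 1).map(w => w.sum / w.size).toList
println(movingAvg)  // List(75.0, -25.0)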
Re: sql to spark scala rdd
Hi All,

I managed to write it using the sliding function, but can I get the key in my output as well?

sc.textFile(file).keyBy(x => x.split("\\~")(0))
  .map(x => x._2.split("\\~"))
  .map(x => x(2).toDouble).toArray().sliding(2,1).map(x => (x, x.size)).foreach(println)

at the moment my output:-

75.0
-25.0
50.0
-50.0
-100.0

I want it with the key; how do I get the moving-average output based on key?

987,75.0
987,-25
987,50.0

Thanks
Sri

On Sat, Jul 30, 2016 at 11:40 AM, sri hari kali charan Tummala <kali.tumm...@gmail.com> wrote:
> For my own knowledge, just wondering how to write it in Scala or as a Spark RDD.
>
> Thanks
> Sri
>
> On Sat, Jul 30, 2016 at 11:24 AM, Jacek Laskowski wrote:
>
>> Why?
>>
>> Pozdrawiam,
>> Jacek Laskowski
>>
>> https://medium.com/@jaceklaskowski/
>> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
>> Follow me at https://twitter.com/jaceklaskowski
>>
>> On Sat, Jul 30, 2016 at 4:42 AM, kali.tumm...@gmail.com wrote:
>> > Hi All,
>> >
>> > I managed to write the business requirement in spark-sql and hive; I am still learning Scala. How would the SQL below be written using a Spark RDD, not Spark data frames?
>> >
>> > SELECT DATE,balance,
>> > SUM(balance) OVER (ORDER BY DATE ROWS BETWEEN UNBOUNDED PRECEDING AND
>> > CURRENT ROW) daily_balance
>> > FROM table
>> >
>> > --
>> > View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/sql-to-spark-scala-rdd-tp27433.html
>> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> >
>> > -
>> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
> --
> Thanks & Regards
> Sri Tummala

--
Thanks & Regards
Sri Tummala
Re: sql to spark scala rdd
For my own knowledge, just wondering how to write it in Scala or as a Spark RDD.

Thanks
Sri

On Sat, Jul 30, 2016 at 11:24 AM, Jacek Laskowski wrote:
> Why?
>
> Pozdrawiam,
> Jacek Laskowski
>
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
> On Sat, Jul 30, 2016 at 4:42 AM, kali.tumm...@gmail.com wrote:
> > Hi All,
> >
> > I managed to write the business requirement in spark-sql and hive; I am still learning Scala. How would the SQL below be written using a Spark RDD, not Spark data frames?
> >
> > SELECT DATE,balance,
> > SUM(balance) OVER (ORDER BY DATE ROWS BETWEEN UNBOUNDED PRECEDING AND
> > CURRENT ROW) daily_balance
> > FROM table
> >
> > --
> > View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/sql-to-spark-scala-rdd-tp27433.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> > -----
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org

--
Thanks & Regards
Sri Tummala
Re: spark local dir to HDFS ?
Thanks, makes sense. Can anyone answer the question below?

http://apache-spark-user-list.1001560.n3.nabble.com/spark-parquet-too-many-small-files-td27264.html

Thanks
Sri

On Tue, Jul 5, 2016 at 8:15 PM, Saisai Shao wrote:
> It does not work to configure local dirs on HDFS. Local dirs are mainly used for data spill and shuffle data persistence; it is not suitable to use hdfs. If you hit a capacity problem, you could configure multiple dirs located on different mounted disks.
>
> On Wed, Jul 6, 2016 at 9:05 AM, Sri wrote:
>
>> Hi,
>>
>> It's a space issue; we are currently using /tmp, and at the moment we don't have any mounted location set up yet.
>>
>> Thanks
>> Sri
>>
>> Sent from my iPhone
>>
>> On 5 Jul 2016, at 17:22, Jeff Zhang wrote:
>>
>> Any reason why you want to set this on hdfs ?
>>
>> On Tue, Jul 5, 2016 at 3:47 PM, kali.tumm...@gmail.com <kali.tumm...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> can I set spark.local.dir to an HDFS location instead of the /tmp folder ?
>>>
>>> I tried setting the temp folder to HDFS but it didn't work; can spark.local.dir write to HDFS ?
>>>
>>> .set("spark.local.dir","hdfs://namednode/spark_tmp/")
>>>
>>> 16/07/05 15:35:47 ERROR DiskBlockManager: Failed to create local dir in hdfs://namenode/spark_tmp/. Ignoring this directory.
>>> java.io.IOException: Failed to create a temp directory (under hdfs://namenode/spark_tmp/) after 10 attempts!
>>>
>>> Thanks
>>> Sri
>>>
>>> --
>>> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-local-dir-to-HDFS-tp27291.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>> --
>> Best Regards
>>
>> Jeff Zhang

--
Thanks & Regards
Sri Tummala
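[To illustrate the multiple-local-dirs suggestion quoted above, a small sketch; the mount points are made up:]

import org.apache.spark.SparkConf

// Sketch: spark.local.dir accepts a comma-separated list of directories, so
// spill and shuffle data can be spread over several mounted disks instead of /tmp.
val conf = new SparkConf()
  .setAppName("shuffle-spill-dirs")
  .set("spark.local.dir", "/mnt/disk1/spark-tmp,/mnt/disk2/spark-tmp")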
Re: spark local dir to HDFS ?
Hi,

It's a space issue; we are currently using /tmp, and at the moment we don't have any mounted location set up yet.

Thanks
Sri

Sent from my iPhone

> On 5 Jul 2016, at 17:22, Jeff Zhang wrote:
>
> Any reason why you want to set this on hdfs ?
>
>> On Tue, Jul 5, 2016 at 3:47 PM, kali.tumm...@gmail.com wrote:
>> Hi All,
>>
>> can I set spark.local.dir to an HDFS location instead of the /tmp folder ?
>>
>> I tried setting the temp folder to HDFS but it didn't work; can spark.local.dir write to HDFS ?
>>
>> .set("spark.local.dir","hdfs://namednode/spark_tmp/")
>>
>> 16/07/05 15:35:47 ERROR DiskBlockManager: Failed to create local dir in hdfs://namenode/spark_tmp/. Ignoring this directory.
>> java.io.IOException: Failed to create a temp directory (under hdfs://namenode/spark_tmp/) after 10 attempts!
>>
>> Thanks
>> Sri
>>
>> --
>> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-local-dir-to-HDFS-tp27291.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
> --
> Best Regards
>
> Jeff Zhang
Re: spark parquet too many small files ?
Hi Takeshi,

I can't use coalesce in the spark-sql shell, right? I know we can use coalesce in a Spark Scala application, but in this project we are not building a jar or using Python; we are just executing Hive queries in the spark-sql shell and submitting in yarn-client mode.

Example:-

spark-sql --verbose --queue default --name wchargeback_event.sparksql.kali --master yarn-client --driver-memory 15g --executor-memory 15g --num-executors 10 --executor-cores 2 -f /x/home/pp_dt_fin_batch/users/srtummala/run-spark/sql/wtr_full.sql --conf "spark.yarn.executor.memoryOverhead=8000" --conf "spark.sql.shuffle.partitions=50" --conf "spark.kryoserializer.buffer.max.mb=5g" --conf "spark.driver.maxResultSize=20g" --conf "spark.storage.memoryFraction=0.8" --conf "spark.hadoopConfiguration=2560" --conf "spark.dynamicAllocation.enabled=false" --conf "spark.shuffle.service.enabled=false" --conf "spark.executor.instances=10"

Thanks
Sri

On Sat, Jul 2, 2016 at 2:53 AM, Takeshi Yamamuro wrote:
> Please also see https://issues.apache.org/jira/browse/SPARK-16188.
>
> // maropu
>
> On Fri, Jul 1, 2016 at 7:39 PM, kali.tumm...@gmail.com <kali.tumm...@gmail.com> wrote:
>
>> I found the jira for the issue; will there be a fix in future, or no fix?
>>
>> https://issues.apache.org/jira/browse/SPARK-6221
>>
>> --
>> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-parquet-too-many-small-files-tp27264p27267.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> ---------
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
> --
> ---
> Takeshi Yamamuro

--
Thanks & Regards
Sri Tummala
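[For completeness, in a Scala application, rather than the spark-sql shell, the usual way to cut down the number of parquet output files is to coalesce before the write. A hedged sketch for the Spark 1.6 era; the sqlContext binding, query, and output path are placeholders:]

// Sketch: reduce the number of parquet output files by coalescing partitions
// before the write. In the 1.6 spark-shell, sqlContext is provided; the table
// name and path below are made up.
val result = sqlContext.sql("SELECT * FROM some_hive_table")
result.coalesce(10)                  // 10 output files instead of one per partition
  .write.mode("overwrite")
  .parquet("/user/srtummala/output/wtr_full")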
Re: set spark 1.6 with Hive 0.14 ?
Thanks Ted. I know how to do that in spark-shell, but can we set the same in the spark-sql shell?

If I don't set a Hive context, from my understanding Spark is using its own SQL and date functions, right? Like, for example, interval?

Thanks
Sri

Sent from my iPhone

> On 21 May 2016, at 08:19, Ted Yu wrote:
>
> In spark-shell:
>
> scala> import org.apache.spark.sql.hive.HiveContext
> import org.apache.spark.sql.hive.HiveContext
>
> scala> var hc: HiveContext = new HiveContext(sc)
>
> FYI
>
>> On Sat, May 21, 2016 at 8:11 AM, Sri wrote:
>> Hi,
>>
>> You mean the hive-site.xml file, right? I did place hive-site.xml in the Spark conf, but I am not sure how certain Spark date functions like interval are still working.
>> Hive 0.14 doesn't have the interval function, so how is Spark managing to do that?
>> Does Spark have its own date functions? I am using the spark-sql shell, for your information.
>>
>> Can I set hive context.sql in the spark-sql shell, as we do in a traditional Spark Scala application?
>>
>> Thanks
>> Sri
>>
>> Sent from my iPhone
>>
>>> On 21 May 2016, at 02:24, Mich Talebzadeh wrote:
>>>
>>> So you want to use hive version 0.14 when using Spark 1.6?
>>>
>>> Go to directory $SPARK_HOME/conf and create a softlink to the hive-site.xml file
>>>
>>> cd $SPARK_HOME
>>> hduser@rhes564: /usr/lib/spark-1.6.1-bin-hadoop2.6> cd conf
>>> hduser@rhes564: /usr/lib/spark-1.6.1-bin-hadoop2.6/conf> ls -ltr
>>>
>>> lrwxrwxrwx 1 hduser hadoop 32 May 3 17:48 hive-site.xml -> /usr/lib/hive/conf/hive-site.xml
>>>
>>> You can see the softlink in mine. Just create one as below
>>>
>>> ln -s /usr/lib/hive/conf/hive-site.xml hive-site.xml
>>>
>>> That should work
>>>
>>> HTH
>>>
>>> Dr Mich Talebzadeh
>>>
>>> LinkedIn https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>> On 21 May 2016 at 00:57, kali.tumm...@gmail.com wrote:
>>>> Hi All,
>>>>
>>>> Is there a way to ask spark and spark-sql to use Hive version 0.14 instead of the inbuilt hive 1.2.1?
>>>>
>>>> I am testing spark-sql locally by downloading spark 1.6 from the internet; I want to execute my hive queries in spark sql using hive version 0.14. Can I go back to the previous version just for a simple test?
>>>>
>>>> Please share the steps involved.
>>>>
>>>> Thanks
>>>> Sri
>>>>
>>>> --
>>>> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/set-spark-1-6-with-Hive-0-14-tp26989.html
>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>>
>>>> -
>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>> For additional commands, e-mail: user-h...@spark.apache.org
Re: set spark 1.6 with Hive 0.14 ?
Hi,

You mean the hive-site.xml file, right? I did place hive-site.xml in the Spark conf, but I am not sure how certain Spark date functions like interval are still working.
Hive 0.14 doesn't have the interval function, so how is Spark managing to do that?
Does Spark have its own date functions? I am using the spark-sql shell, for your information.

Can I set hive context.sql in the spark-sql shell, as we do in a traditional Spark Scala application?

Thanks
Sri

Sent from my iPhone

> On 21 May 2016, at 02:24, Mich Talebzadeh wrote:
>
> So you want to use hive version 0.14 when using Spark 1.6?
>
> Go to directory $SPARK_HOME/conf and create a softlink to the hive-site.xml file
>
> cd $SPARK_HOME
> hduser@rhes564: /usr/lib/spark-1.6.1-bin-hadoop2.6> cd conf
> hduser@rhes564: /usr/lib/spark-1.6.1-bin-hadoop2.6/conf> ls -ltr
>
> lrwxrwxrwx 1 hduser hadoop 32 May 3 17:48 hive-site.xml -> /usr/lib/hive/conf/hive-site.xml
>
> You can see the softlink in mine. Just create one as below
>
> ln -s /usr/lib/hive/conf/hive-site.xml hive-site.xml
>
> That should work
>
> HTH
>
> Dr Mich Talebzadeh
>
> LinkedIn https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
> http://talebzadehmich.wordpress.com
>
>> On 21 May 2016 at 00:57, kali.tumm...@gmail.com wrote:
>> Hi All,
>>
>> Is there a way to ask spark and spark-sql to use Hive version 0.14 instead of the inbuilt hive 1.2.1?
>>
>> I am testing spark-sql locally by downloading spark 1.6 from the internet; I want to execute my hive queries in spark sql using hive version 0.14. Can I go back to the previous version just for a simple test?
>>
>> Please share the steps involved.
>>
>> Thanks
>> Sri
>>
>> --
>> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/set-spark-1-6-with-Hive-0-14-tp26989.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
Re: how to run latest version of spark in old version of spark in cloudera cluster ?
Thank you very much; well documented.

Thanks
Sri

On Wed, Jan 27, 2016 at 8:46 PM, Deenar Toraskar wrote:
> Sri
>
> Look at the instructions here. They are for 1.5.1, but should also work for 1.6.
>
> https://www.linkedin.com/pulse/running-spark-151-cdh-deenar-toraskar-cfa?trk=hp-feed-article-title-publish&trkSplashRedir=true&forceNoSplash=true
>
> Deenar
>
> On 27 January 2016 at 20:16, Koert Kuipers wrote:
>
>> you need to build spark 1.6 for your hadoop distro, and put that on the proxy node and configure it correctly to find your cluster (hdfs and yarn). then use the spark-submit script for that spark 1.6 version to launch your application on yarn
>>
>> On Wed, Jan 27, 2016 at 3:11 PM, sri hari kali charan Tummala <kali.tumm...@gmail.com> wrote:
>>
>>> Hi Koert,
>>>
>>> I am submitting my code (Spark jar) using spark-submit on a proxy node. I checked the version of the cluster and node and it says 1.2; I didn't really understand what you mean.
>>>
>>> Can I ask YARN to use a different version of Spark? Or should I override the SPARK_HOME variables to point at the Spark 1.6 jar?
>>>
>>> Thanks
>>> Sri
>>>
>>> On Wed, Jan 27, 2016 at 7:45 PM, Koert Kuipers wrote:
>>>
>>>> If you have yarn you can just launch your spark 1.6 job from a single machine with spark 1.6 available on it and ignore the version of spark (1.2) that is installed
>>>> On Jan 27, 2016 11:29, "kali.tumm...@gmail.com" wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> Just realized the cloudera version of spark on my cluster is 1.2; the jar which I built using maven is version 1.6, which is causing the issue.
>>>>>
>>>>> Is there a way to run spark version 1.6 on a 1.2 version of spark?
>>>>>
>>>>> Thanks
>>>>> Sri
>>>>>
>>>>> --
>>>>> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/how-to-run-latest-version-of-spark-in-old-version-of-spark-in-cloudera-cluster-tp26087.html
>>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>>>
>>>>> -
>>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>> --
>>> Thanks & Regards
>>> Sri Tummala

--
Thanks & Regards
Sri Tummala
Re: how to run latest version of spark in old version of spark in cloudera cluster ?
Hi Koert,

I am submitting my code (Spark jar) using spark-submit on a proxy node. I checked the version of the cluster and node and it says 1.2; I didn't really understand what you mean.

Can I ask YARN to use a different version of Spark? Or should I override the SPARK_HOME variables to point at the Spark 1.6 jar?

Thanks
Sri

On Wed, Jan 27, 2016 at 7:45 PM, Koert Kuipers wrote:
> If you have yarn you can just launch your spark 1.6 job from a single machine with spark 1.6 available on it and ignore the version of spark (1.2) that is installed
> On Jan 27, 2016 11:29, "kali.tumm...@gmail.com" wrote:
>
>> Hi All,
>>
>> Just realized the cloudera version of spark on my cluster is 1.2; the jar which I built using maven is version 1.6, which is causing the issue.
>>
>> Is there a way to run spark version 1.6 on a 1.2 version of spark?
>>
>> Thanks
>> Sri
>>
>> --
>> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/how-to-run-latest-version-of-spark-in-old-version-of-spark-in-cloudera-cluster-tp26087.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org

--
Thanks & Regards
Sri Tummala
Re: org.netezza.error.NzSQLException: ERROR: Invalid datatype - TEXT
Thanks Ted, the trick worked; this feature needs to be committed in the next Spark release.

Thanks
Sri

Sent from my iPhone

> On 26 Jan 2016, at 15:49, Ted Yu wrote:
>
> Please take a look at getJDBCType() in:
> sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
>
> You can register a dialect for Netezza as shown in
> sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
>
> Cheers
>
>> On Tue, Jan 26, 2016 at 7:26 AM, kali.tumm...@gmail.com wrote:
>> Hi All,
>>
>> I am using the Spark JDBC DataFrame writer to store data into Netezza. I think Spark is trying to create the table using the data type TEXT for string columns, and Netezza doesn't support the TEXT data type.
>>
>> How do I override the Spark method to use VARCHAR instead of the TEXT data type?
>>
>> val sourcedfmode = sourcedf.persist(StorageLevel.MEMORY_AND_DISK).write.mode("overwrite")
>> sourcedfmode.jdbc(TargetDBinfo.url, TargetDBinfo.table, targetprops)
>>
>> Thanks
>>
>> --
>> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/org-netezza-error-NzSQLException-ERROR-Invalid-datatype-TEXT-tp26072.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
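[Following Ted's pointer above, a hedged sketch of what such a registered dialect could look like; the object name, the jdbc:netezza URL prefix check, and the VARCHAR(255) width are assumptions for illustration, not the committed Spark code:]

import java.sql.Types

import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects, JdbcType}
import org.apache.spark.sql.types.{DataType, StringType}

// Sketch of a custom JDBC dialect: map StringType to VARCHAR instead of TEXT.
object NetezzaDialect extends JdbcDialect {
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:netezza")

  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
    case StringType => Some(JdbcType("VARCHAR(255)", Types.VARCHAR))
    case _          => None  // fall back to Spark's defaults for other types
  }
}

// Register once before writing through the JDBC data source:
JdbcDialects.registerDialect(NetezzaDialect)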
Re: spark 1.6 Issue
Hi Mark,

I made the changes to the VM options in the Edit Configurations section in IntelliJ for the main method and the ScalaTest class, which worked OK when I executed them individually, but while running maven install to create the jar file the test case is failing.

Can I add the VM options to the Spark conf set in the ScalaTest class, in a hard-coded way?

Thanks
Sri

Sent from my iPhone

> On 6 Jan 2016, at 17:43, Mark Hamstra wrote:
>
> It's not a bug, but a larger heap is required with the new UnifiedMemoryManager:
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala#L172
>
>> On Wed, Jan 6, 2016 at 6:35 AM, kali.tumm...@gmail.com wrote:
>> Hi All,
>>
>> I am running my app in IntelliJ IDEA (locally); my config is local[*]. The code worked OK with Spark 1.5, but when I upgraded to 1.6 I am having the issue below.
>>
>> Is this a bug in 1.6? When I change back to 1.5 it works OK without any error. Do I need to pass executor memory while running locally in Spark 1.6?
>>
>> Exception in thread "main" java.lang.IllegalArgumentException: System memory 259522560 must be at least 4.718592E8. Please use a larger heap size.
>>
>> Thanks
>> Sri
>>
>> --
>> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-1-6-Issue-tp25893.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
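[One hard-coded possibility, offered as a sketch rather than a confirmed fix: the UnifiedMemoryManager consults the internal spark.testing.memory setting before falling back to the JVM heap size, so a test's SparkConf can raise it above the minimum without IDE VM options. The 512 MB figure is an assumption that simply clears the ~4.7e8-byte check:]

import org.apache.spark.SparkConf

// Sketch: raise the memory the UnifiedMemoryManager sees from inside the test
// itself, instead of passing -Xmx through IDE or Maven VM options.
val conf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("unit-test")
  .set("spark.testing.memory", (512 * 1024 * 1024).toString)  // 512 MB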
Re: how to turn off spark streaming gracefully ?
Hi Cody, KafkaUtils.createRDD totally makes sense; now I can run my Spark job once every 15 minutes, extract the data out of Kafka, and stop. I rely on the Kafka offsets for incremental data, am I right? So no duplicate data will be returned. Thanks Sri

On Fri, Dec 18, 2015 at 2:41 PM, Cody Koeninger wrote:
> If you're really doing a daily batch job, have you considered just using
> KafkaUtils.createRDD rather than a streaming job?
>
> On Fri, Dec 18, 2015 at 5:04 AM, kali.tumm...@gmail.com <
> kali.tumm...@gmail.com> wrote:
>
>> Hi All,
>>
>> Imagine I have production Spark Streaming Kafka (direct connection)
>> subscriber and publisher jobs running, which publish and subscribe
>> (receive) data from a Kafka topic, and I save one day's worth of data
>> using dstream.slice to a Cassandra daily table (so I create the daily
>> table before running the Spark Streaming job).
>>
>> My question: if all the above code runs under some scheduler like Autosys,
>> how should I tell the Spark publisher to stop publishing at end of day,
>> and the Spark subscriber to stop receiving, without killing the jobs?
>> If I kill them, my Autosys scheduler turns red saying the job failed, etc.
>> Is there a way to stop both subscriber and publisher without killing or
>> terminating the code?
>>
>> Thanks
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/how-to-turn-off-spark-streaming-gracefully-tp25734.html

-- Thanks & Regards Sri Tummala
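A minimal sketch of Cody's batch approach with the Spark 1.x direct Kafka API (broker, topic, and offsets are hypothetical):

  import kafka.serializer.StringDecoder
  import org.apache.spark.SparkContext
  import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

  def readBatch(sc: SparkContext, from: Long, until: Long) = {
    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
    // One range per topic-partition; offsets [from, until) are read exactly once.
    val ranges = Array(OffsetRange("events", 0, from, until))
    KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
      sc, kafkaParams, ranges).map(_._2)  // keep just the message values
  }

On the "no duplicates" question: duplicates are avoided only because the offset bookkeeping is explicit; the job must persist the untilOffset it finished with and use it as the next run's fromOffset.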
Re: Release date for spark 1.6?
thanks Sean and Ted, I will wait for 1.6 to be out. Happy Christmas to all! Thanks Sri

On Sat, Dec 12, 2015 at 12:18 PM, Ted Yu wrote:
> Please take a look at SPARK-9078 which allows jdbc dialects to override
> the query for checking table existence.
>
> On Dec 12, 2015, at 7:12 PM, sri hari kali charan Tummala <
> kali.tumm...@gmail.com> wrote:
>
> Hi Michael, Ted,
>
> https://github.com/apache/spark/blob/branch-1.5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala#L48
>
> In the present Spark version there is a bug at line 48: checking whether a
> table exists in a database using LIMIT doesn't work for all databases,
> SQL Server for example.
>
> The best way to check whether a table exists in any database is to use
> select * from table where 1=2; or select 1 from table where 1=2; this is
> supported by all the databases.
>
> Can this change be implemented in Spark 1.6? It makes the
> write.mode("append") bug go away.
>
> def tableExists(conn: Connection, table: String): Boolean = {
>   // Somewhat hacky, but there isn't a good way to identify whether a table exists for all
>   // SQL database systems, considering "table" could also include the database name.
>   Try(conn.prepareStatement(s"SELECT 1 FROM $table LIMIT 1").executeQuery().next()).isSuccess
> }
>
> Solution:
> def tableExists(conn: Connection, table: String): Boolean = {
>   // Somewhat hacky, but there isn't a good way to identify whether a table exists for all
>   // SQL database systems, considering "table" could also include the database name.
>   Try(conn.prepareStatement(s"SELECT 1 FROM $table where 1=2").executeQuery().next()).isSuccess
> }
>
> Thanks
> Sri
>
> On Wed, Dec 9, 2015 at 10:30 PM, Michael Armbrust wrote:
>
>> The release date is "as soon as possible". In order to make an Apache
>> release we must present a release candidate and have 72 hours of voting by
>> the PMC. As soon as there are no known bugs, the vote will pass and 1.6
>> will be released.
>>
>> In the meantime, I'd love support from the community testing the most
>> recent release candidate.
>>
>> On Wed, Dec 9, 2015 at 2:19 PM, Sri wrote:
>>
>>> Hi Ted,
>>>
>>> Thanks for the info, but there is no particular release date; from my
>>> understanding the package is in testing and no release date has been
>>> mentioned.
>>>
>>> Thanks
>>> Sri
>>>
>>> Sent from my iPhone
>>>
>>> > On 9 Dec 2015, at 21:38, Ted Yu wrote:
>>> >
>>> > See this thread:
>>> >
>>> > http://search-hadoop.com/m/q3RTtBMZpK7lEFB1/Spark+1.6.0+RC&subj=Re+VOTE+Release+Apache+Spark+1+6+0+RC1+
>>> >
>>> >> On Dec 9, 2015, at 1:20 PM, "kali.tumm...@gmail.com" <
>>> kali.tumm...@gmail.com> wrote:
>>> >>
>>> >> Hi All,
>>> >>
>>> >> does anyone know the exact release date for Spark 1.6?
>>> >>
>>> >> Thanks
>>> >> Sri
>>> >>
>>> >> --
>>> >> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Release-data-for-spark-1-6-tp25654.html
>>
> --
> Thanks & Regards
> Sri Tummala

-- Thanks & Regards Sri Tummala
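Ted's pointer to SPARK-9078 means that from 1.6 a dialect can override the existence check itself instead of patching JdbcUtils; a minimal sketch (the custom dialect and URL prefix are assumptions for illustration):

  import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}

  case object SqlServerExistsDialect extends JdbcDialect {
    override def canHandle(url: String): Boolean = url.startsWith("jdbc:sqlserver")
    // The same "impossible WHERE clause" trick proposed above, applied per dialect.
    override def getTableExistsQuery(table: String): String =
      s"SELECT 1 FROM $table WHERE 1=0"
  }
  JdbcDialects.registerDialect(SqlServerExistsDialect)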
Re: spark data frame write.mode("append") bug
Hi All,

https://github.com/apache/spark/blob/branch-1.5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala#L48

In the present Spark version there is a bug at line 48: checking whether a table exists in a database using LIMIT doesn't work for all databases, SQL Server for example.

The best way to check whether a table exists in any database is to use select * from table where 1=2; or select 1 from table where 1=2; this is supported by all the databases.

Can this change be implemented in Spark 1.6? It makes the write.mode("append") bug go away.

def tableExists(conn: Connection, table: String): Boolean = {
  // Somewhat hacky, but there isn't a good way to identify whether a table exists for all
  // SQL database systems, considering "table" could also include the database name.
  Try(conn.prepareStatement(s"SELECT 1 FROM $table LIMIT 1").executeQuery().next()).isSuccess
}

Solution:
def tableExists(conn: Connection, table: String): Boolean = {
  // Somewhat hacky, but there isn't a good way to identify whether a table exists for all
  // SQL database systems, considering "table" could also include the database name.
  Try(conn.prepareStatement(s"SELECT 1 FROM $table where 1=2").executeQuery().next()).isSuccess
}

Thanks

On Wed, Dec 9, 2015 at 4:24 PM, Seongduk Cheon wrote:
> Not for sure, but I think it is a bug as of 1.5.
>
> Spark uses the LIMIT keyword to check whether a table exists:
>
> https://github.com/apache/spark/blob/branch-1.5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala#L48
>
> If your database does not support the LIMIT keyword, such as SQL Server,
> Spark tries to create the table:
>
> https://github.com/apache/spark/blob/branch-1.5/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala#L272-L275
>
> This issue has already been fixed and it will be released in 1.6:
> https://issues.apache.org/jira/browse/SPARK-9078
>
> --
> Cheon
>
> 2015-12-09 22:54 GMT+09:00 kali.tumm...@gmail.com:
>
>> Hi Spark Contributors,
>>
>> I am trying to append data to a target table using df.write.mode("append")
>> functionality, but Spark is throwing a "table already exists" exception.
>>
>> Is there a fix scheduled for a later Spark release? I am using Spark 1.5.
>>
>> val sourcedfmode=sourcedf.write.mode("append")
>> sourcedfmode.jdbc(TargetDBinfo.url,TargetDBinfo.table,targetprops)
>>
>> Full code:
>> https://github.com/kali786516/ScalaDB/blob/master/src/main/java/com/kali/db/SaprkSourceToTargetBulkLoad.scala
>>
>> Spring config file:
>> https://github.com/kali786516/ScalaDB/blob/master/src/main/resources/SourceToTargetBulkLoad.xml
>>
>> Thanks
>> Sri
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/spark-data-frame-write-mode-append-bug-tp25650.html

-- Thanks & Regards Sri Tummala
Re: Release date for spark 1.6?
Hi Michael, Ted,

https://github.com/apache/spark/blob/branch-1.5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala#L48

In the present Spark version there is a bug at line 48: checking whether a table exists in a database using LIMIT doesn't work for all databases, SQL Server for example.

The best way to check whether a table exists in any database is to use select * from table where 1=2; or select 1 from table where 1=2; this is supported by all the databases.

Can this change be implemented in Spark 1.6? It makes the write.mode("append") bug go away.

def tableExists(conn: Connection, table: String): Boolean = {
  // Somewhat hacky, but there isn't a good way to identify whether a table exists for all
  // SQL database systems, considering "table" could also include the database name.
  Try(conn.prepareStatement(s"SELECT 1 FROM $table LIMIT 1").executeQuery().next()).isSuccess
}

Solution:
def tableExists(conn: Connection, table: String): Boolean = {
  // Somewhat hacky, but there isn't a good way to identify whether a table exists for all
  // SQL database systems, considering "table" could also include the database name.
  Try(conn.prepareStatement(s"SELECT 1 FROM $table where 1=2").executeQuery().next()).isSuccess
}

Thanks
Sri

On Wed, Dec 9, 2015 at 10:30 PM, Michael Armbrust wrote:
> The release date is "as soon as possible". In order to make an Apache
> release we must present a release candidate and have 72 hours of voting by
> the PMC. As soon as there are no known bugs, the vote will pass and 1.6
> will be released.
>
> In the meantime, I'd love support from the community testing the most
> recent release candidate.
>
> On Wed, Dec 9, 2015 at 2:19 PM, Sri wrote:
>
>> Hi Ted,
>>
>> Thanks for the info, but there is no particular release date; from my
>> understanding the package is in testing and no release date has been
>> mentioned.
>>
>> Thanks
>> Sri
>>
>> Sent from my iPhone
>>
>> > On 9 Dec 2015, at 21:38, Ted Yu wrote:
>> >
>> > See this thread:
>> >
>> > http://search-hadoop.com/m/q3RTtBMZpK7lEFB1/Spark+1.6.0+RC&subj=Re+VOTE+Release+Apache+Spark+1+6+0+RC1+
>> >
>> >> On Dec 9, 2015, at 1:20 PM, "kali.tumm...@gmail.com" <
>> kali.tumm...@gmail.com> wrote:
>> >>
>> >> Hi All,
>> >>
>> >> does anyone know the exact release date for Spark 1.6?
>> >>
>> >> Thanks
>> >> Sri
>> >>
>> >> --
>> >> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Release-data-for-spark-1-6-tp25654.html

-- Thanks & Regards Sri Tummala
Re: Release date for spark 1.6?
Hi Ted, Thanks for the info, but there is no particular release date; from my understanding the package is in testing and no release date has been mentioned. Thanks Sri Sent from my iPhone

> On 9 Dec 2015, at 21:38, Ted Yu wrote:
>
> See this thread:
>
> http://search-hadoop.com/m/q3RTtBMZpK7lEFB1/Spark+1.6.0+RC&subj=Re+VOTE+Release+Apache+Spark+1+6+0+RC1+
>
>> On Dec 9, 2015, at 1:20 PM, "kali.tumm...@gmail.com" wrote:
>>
>> Hi All,
>>
>> does anyone know the exact release date for Spark 1.6?
>>
>> Thanks
>> Sri
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Release-data-for-spark-1-6-tp25654.html
Re: spark sql current time stamp function ?
Hi Ted, It gave an exception; am I following the right approach?

val test=sqlContext.sql("select *, monotonicallyIncreasingId() from kali")

On Mon, Dec 7, 2015 at 4:52 PM, Ted Yu wrote:
> Have you tried using monotonicallyIncreasingId ?
>
> Cheers
>
> On Mon, Dec 7, 2015 at 7:56 AM, Sri wrote:
>
>> Thanks, I found the right function: current_timestamp().
>>
>> Different question:
>> Is there a row_number() function in Spark SQL? Not in the DataFrame API,
>> just Spark SQL?
>>
>> Thanks
>> Sri
>>
>> Sent from my iPhone
>>
>> On 7 Dec 2015, at 15:49, Ted Yu wrote:
>>
>> Does unix_timestamp() satisfy your needs ?
>> See sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
>>
>> On Mon, Dec 7, 2015 at 6:54 AM, kali.tumm...@gmail.com <
>> kali.tumm...@gmail.com> wrote:
>>
>>> I found a way out.
>>>
>>> import java.text.SimpleDateFormat
>>> import java.util.Date
>>>
>>> val format = new SimpleDateFormat("yyyy-M-dd hh:mm:ss")
>>>
>>> val testsql=sqlContext.sql("select
>>> column1,column2,column3,column4,column5
>>> ,'%s' as TIME_STAMP from TestTable limit 10".format(format.format(new
>>> Date())))
>>>
>>> Thanks
>>> Sri
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/spark-sql-current-time-stamp-function-tp25620p25621.html

-- Thanks & Regards Sri Tummala
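The exception is expected: monotonicallyIncreasingId is the camel-case Scala function in org.apache.spark.sql.functions, not a SQL function name, so it cannot be called inside sqlContext.sql(...) like that. A minimal sketch of the DataFrame route (the "kali" table follows the thread; the column name is arbitrary):

  import org.apache.spark.sql.functions.monotonicallyIncreasingId

  // 64-bit id, unique across rows but not consecutive: the partition id sits in
  // the upper bits and a per-partition counter in the lower bits.
  val withId = sqlContext.table("kali").withColumn("id", monotonicallyIncreasingId())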
Re: spark sql current time stamp function ?
Thanks, I found the right function: current_timestamp().

Different question:
Is there a row_number() function in Spark SQL? Not in the DataFrame API, just Spark SQL?

Thanks Sri Sent from my iPhone

> On 7 Dec 2015, at 15:49, Ted Yu wrote:
>
> Does unix_timestamp() satisfy your needs ?
> See sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
>
>> On Mon, Dec 7, 2015 at 6:54 AM, kali.tumm...@gmail.com wrote:
>> I found a way out.
>>
>> import java.text.SimpleDateFormat
>> import java.util.Date
>>
>> val format = new SimpleDateFormat("yyyy-M-dd hh:mm:ss")
>>
>> val testsql=sqlContext.sql("select column1,column2,column3,column4,column5
>> ,'%s' as TIME_STAMP from TestTable limit 10".format(format.format(new
>> Date())))
>>
>> Thanks
>> Sri
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/spark-sql-current-time-stamp-function-tp25620p25621.html
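On the row_number() question: window functions, including row_number(), do work in Spark SQL in the 1.5/1.6 line, but they generally require a HiveContext rather than a plain SQLContext. A minimal sketch (table and column names are hypothetical):

  // Assumes sqlContext is a HiveContext in Spark 1.5/1.6.
  val ranked = sqlContext.sql(
    """SELECT column1, column2,
              ROW_NUMBER() OVER (PARTITION BY column1 ORDER BY column2) AS rn
       FROM TestTable""")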
Re: Pass spark partition explicitly ?
Hi Richard, Thanks. So my takeaway from your discussion is that if we want to pass partition values explicitly, it has to be written inside the code. Thanks Sri

On Sun, Oct 18, 2015 at 7:05 PM, Richard Eggert wrote:
> If you want to override the default partitioning behavior, you have to do
> so in your code where you create each RDD. Different RDDs usually have
> different numbers of partitions (except when one RDD is directly derived
> from another without shuffling) because they usually have different sizes,
> so it wouldn't make sense to have some sort of "global" notion of how many
> partitions to create. You could, if you wanted, pass partition counts in
> as command line options to your application and use those values in your
> code that creates the RDDs, of course.
>
> Rich
> On Oct 18, 2015 1:57 PM, "kali.tumm...@gmail.com" wrote:
>
>> Hi All,
>>
>> Can I pass the number of partitions to all the RDDs explicitly while
>> submitting the Spark job, or do I need to mention it in my Spark code
>> itself?
>>
>> Thanks
>> Sri
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Pass-spark-partition-explicitly-tp25113.html

-- Thanks & Regards Sri Tummala
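Richard's command-line suggestion in sketch form (object name and paths are hypothetical, and nothing stops each RDD from getting its own argument instead of one shared count):

  import org.apache.spark.{SparkConf, SparkContext}

  object PartitionedJob {
    def main(args: Array[String]): Unit = {
      val numPartitions = args(0).toInt  // passed after the jar in spark-submit
      val sc = new SparkContext(new SparkConf().setAppName("PartitionedJob"))
      val counts = sc.textFile("/data/input", numPartitions)  // read-side partitions
        .flatMap(_.split(" "))
        .map((_, 1))
        .reduceByKey(_ + _, numPartitions)                    // shuffle-side partitions
      counts.saveAsTextFile("/data/output")
    }
  }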
Re: Create hashmap using two RDD's
Thanks Richard, will give it a try tomorrow... Thanks Sri Sent from my iPhone

> On 10 Oct 2015, at 19:15, Richard Eggert wrote:
>
> You should be able to achieve what you're looking for by using foldByKey to
> find the latest record for each key. If you're relying on the order of
> elements within the file to determine which ones are the "latest" (rather
> than sorting by some field within the file itself), call zipWithIndex first
> to give each element a numeric index that you can use for comparisons.
>
> For example (type annotations are unnecessary but included for clarity):
>
> val parsedRecords: RDD[(Key, Value)] = ???
> val indexedRecords: RDD[(Key, (Value, Long))] =
>   parsedRecords.zipWithIndex map { case ((k, v), n) => k -> (v, n) }
> val latestRecords: RDD[(Key, Value)] = indexedRecords.foldByKey(null) { (a, b) =>
>   (a, b) match {
>     case (null, _) => b
>     case ((_, an), (_, bn)) if an < bn => b
>     case _ => a
>   }
> } mapValues { case (v, _) => v }
>
> You can then write "latestRecords" out to a file however you like. Note that
> I would recommend using string interpolation or the CSV output format (for
> DataFrames) over the string replacement you are currently using to format
> your output.
>
>> On Sat, Oct 10, 2015 at 1:11 PM, Kali wrote:
>> Hi Richard,
>>
>> The requirement is to get the latest records using a key; I think a hash
>> map is a good choice for this task.
>> As of now the data comes from a third party and we are not sure which the
>> latest record is, so a hash map was chosen.
>> If there is anything better than a hash map please let me know.
>>
>> Thanks
>> Sri
>>
>> Sent from my iPad
>>
>>> On 10 Oct 2015, at 17:10, Richard Eggert wrote:
>>>
>>> Do you need the HashMap for anything else besides writing out to a file?
>>> If not, there is really no need to create one at all. You could just keep
>>> everything as RDDs.
>>>
>>>> On Oct 10, 2015 11:31 AM, "kali.tumm...@gmail.com" wrote:
>>>> Got it... created the hash map and saved it to a file; please follow the
>>>> steps below.
>>>>
>>>> val QuoteRDD = quotefile.map(x => x.split("\\|")).
>>>>   filter(line => line(0).contains("1017")).
>>>>   map(x => ((x(5) + x(4)), (x(5), x(4), x(1),
>>>>     if (x(15) == "B")
>>>>       ({if (x(25) == "") x(9) else x(25)},
>>>>        {if (x(37) == "") x(11) else x(37)})
>>>>     else if (x(15) == "C")
>>>>       ({if (x(24) == "") x(9) else x(24)},
>>>>        {if (x(30) == "") x(11) else x(30)})
>>>>     else if (x(15) == "A")
>>>>       (x(9), x(11))
>>>>   )))
>>>>
>>>> val QuoteHashMap = QuoteRDD.collect().toMap
>>>> val test = QuoteHashMap.values.toSeq
>>>> val test2 = sc.parallelize(test.map(x =>
>>>>   x.toString.replace("(", "").replace(")", "")))
>>>> test2.saveAsTextFile("C:\\Users\\kalit_000\\Desktop\\mkdata\\test.txt")
>>>> test2.collect().foreach(println)
>>>>
>>>> --
>>>> View this message in context:
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Create-hashmap-using-two-RDD-s-tp24996p25014.html
>
> --
> Rich
Re: Spark Ingestion into Relational DB
Hi, Thanks for the reply. Let me explain our scenario a little bit more. Currently we have multiple data feeds arriving as files from different systems. We run batch jobs to extract the data from the files, normalize it, match it against the Oracle database, and finally consolidate the cleansed data in Oracle. I am evaluating whether, rather than running batch jobs, I can run Spark Streaming over the data files and write the cleansed data into the Oracle database. Once the data is consolidated in Oracle, it serves as the source of truth for external users. Regards, Sri Eswari.

On Mon, Sep 21, 2015 at 10:55 PM, Jörn Franke wrote:
> You do not need Hadoop. However, you should think about using it. If you
> use Spark to load data directly into Oracle then your database might see
> unexpected load once a Spark node fails. Additionally, the Oracle database,
> if it is not based on local disk, may have a storage bottleneck.
> Furthermore, Spark standalone has no resource management mechanism for
> supporting different SLAs; you may need YARN (Hadoop) for that. Finally,
> using the Oracle database for storing all the data may be an expensive
> exercise. What I have seen often is that Hadoop is used for storing all the
> data and managing the resources. Spark can be used for machine learning
> over this data, and the Oracle database (or any relational datastore, NoSQL
> database, or in-memory DB) is used to serve the data to a lot of users.
> This is also the basic idea behind the lambda architecture.
>
> Le mar. 22 sept. 2015 à 7:13, Sri a écrit :
>
>> Hi,
>>
>> We have a use case where we get the data from different systems and
>> finally the data will be consolidated into an Oracle database. Is Spark a
>> valid use case for this scenario? Currently we also don't have any big
>> data component. In case we go with Spark to ingest data, does it require
>> Hadoop?
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Ingestion-into-Relational-DB-tp24761.html
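A minimal sketch of the flow being evaluated (directory, schema, batch interval, and connection details are all hypothetical): watch a landing directory, normalize each micro-batch, and append it to Oracle over JDBC so Oracle stays the source of truth. The matching step against existing Oracle data is elided; it could be a JDBC read joined against each batch.

  import java.util.Properties
  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.sql.SQLContext
  import org.apache.spark.streaming.{Seconds, StreamingContext}

  case class Feed(id: String, amount: Double)

  val sc = new SparkContext(new SparkConf().setAppName("FeedIngest"))
  val sqlContext = new SQLContext(sc)
  import sqlContext.implicits._

  val ssc = new StreamingContext(sc, Seconds(300))
  ssc.textFileStream("hdfs:///landing/feeds").foreachRDD { rdd =>
    if (!rdd.isEmpty()) {
      val cleansed = rdd.map(_.split('|'))
        .filter(_.length == 2)                    // drop malformed rows (normalization step)
        .map(f => Feed(f(0).trim, f(1).toDouble))
        .toDF()
      val props = new Properties()
      props.setProperty("user", "etl_user")       // placeholder credentials
      props.setProperty("password", "********")
      cleansed.write.mode("append")
        .jdbc("jdbc:oracle:thin:@dbhost:1521:ORCL", "CLEANSED_FEEDS", props)
    }
  }
  ssc.start()
  ssc.awaitTermination()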
Spark Ingestion into Relational DB
Hi, We have a use case where we get the data from different systems and finally the data will be consolidated into an Oracle database. Is Spark a valid use case for this scenario? Currently we also don't have any big data component. In case we go with Spark to ingest data, does it require Hadoop?

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Ingestion-into-Relational-DB-tp24761.html