HiveContext.getOrCreate not accessible
Hi All,

I have a streaming app, and when I try invoking HiveContext.getOrCreate it errors out with the following statement:

    object HiveContext in package hive cannot be accessed in package org.apache.spark.sql.hive

I require a HiveContext instead of a SQLContext for my application, and creating a new HiveContext every time would not be a feasible solution.

Here is my code snippet:

    object sampleStreamingApp {

      def createStreamingContext(checkpointDirectory: String): StreamingContext = {
        val conf = new SparkConf().setAppName("sampleStreaming")
        val sc = new SparkContext(conf)
        val ssc = new StreamingContext(sc, Milliseconds(5000))
        ssc.checkpoint(checkpointDirectory)
        val smDStream = ssc.textFileStream("/user/hdpuser/data")
        val smSplitted = smDStream.map(x => x.split(";")).map(x => Row.fromSeq(x))
        smSplitted.foreachRDD { rdd =>
          val sqlContext = HiveContext.getOrCreate(rdd.sparkContext)
          import sqlContext.implicits._
        }
        ssc
      }

      def main(args: Array[String]) {
        val checkpointDirectory = "hdfs://localhost:8020/user/dfml/checkpointing/AAA"
        val ssc = StreamingContext.getActiveOrCreate(checkpointDirectory,
          () => createStreamingContext(checkpointDirectory))
        ssc.start()
        ssc.awaitTermination()
      }
    }

Any help would be appreciated.

Regds,
--Praseetha
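[Editor's note, not from the thread] Since HiveContext.getOrCreate is not publicly accessible in some Spark 1.x releases, a common workaround is to keep a lazily initialized singleton in your own code and call that from foreachRDD. Below is a minimal sketch of the pattern; ExpensiveContext stands in for HiveContext so the snippet compiles without Spark on the classpath, and all names here are illustrative:

```scala
// Sketch of a lazily-initialized singleton, the usual substitute when a
// context class's own getOrCreate is package-private. ExpensiveContext is
// a stand-in for HiveContext (hypothetical name, not a Spark class).
class ExpensiveContext(val tag: String)

object ContextSingleton {
  @transient private var instance: ExpensiveContext = _

  // Returns the existing instance, creating it only on the first call.
  def getOrCreate(tag: String): ExpensiveContext = synchronized {
    if (instance == null) instance = new ExpensiveContext(tag)
    instance
  }
}
```

Inside foreachRDD you would call the singleton's getOrCreate with the RDD's SparkContext, so every batch reuses one HiveContext instead of constructing a new one.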
Re: Unable to compare SparkSQL Date columns
Hi Mich,

Even I'm getting similar output. The dates that are passed as input are different from the ones in the output. Since it's an inner join, the expected result is:

[2015-12-31,2015-12-31,1,105]
[2016-01-27,2016-01-27,5,101]

Thanks & Regds,
--Praseetha

On Tue, Sep 13, 2016 at 11:21 PM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

> Hi Praseetha,
>
> This is how I have written this.
>
>     case class TestDate(id: String, loginTime: java.sql.Date)
>     val formate = new SimpleDateFormat("YYYY-MM-DD")
>     val TestDateData = sc.parallelize(List(
>       ("1", new java.sql.Date(formate.parse("2016-01-31").getTime)),
>       ("2", new java.sql.Date(formate.parse("2106-01-30").getTime)),
>       ("3", new java.sql.Date(formate.parse("2016-01-29").getTime)),
>       ("4", new java.sql.Date(formate.parse("2016-01-28").getTime)),
>       ("5", new java.sql.Date(formate.parse("2016-01-27").getTime))
>     ))
>     val firstPair = TestDateData.map(x => new TestDate(x._1, x._2))
>     val fp = firstPair.toDF
>
>     case class TestDate2(id2: String, loginTime2: java.sql.Date)
>     val TestDateData1 = sc.parallelize(List(
>       ("101", new java.sql.Date(formate.parse("2016-01-27").getTime)),
>       ("102", new java.sql.Date(formate.parse("2016-01-26").getTime)),
>       ("103", new java.sql.Date(formate.parse("2016-01-25").getTime)),
>       ("104", new java.sql.Date(formate.parse("2016-01-24").getTime)),
>       ("105", new java.sql.Date(formate.parse("2016-01-31").getTime))
>     ))
>     val secondPair = TestDateData1.map(x => new TestDate2(x._1, x._2))
>     val sp = secondPair.toDF
>
>     val rs = fp.join(sp, fp("loginTime") === sp("loginTime2"), "inner")
>       .select('loginTime, 'loginTime2, 'id, 'id2).show
>
> This is what I get:
>
> [2015-12-27,2015-12-27,1,101]
> [2015-12-27,2015-12-27,1,102]
> [2015-12-27,2015-12-27,1,103]
> [2015-12-27,2015-12-27,1,104]
> [2015-12-27,2015-12-27,1,105]
> [2015-12-27,2015-12-27,3,101]
> [2015-12-27,2015-12-27,3,102]
> [2015-12-27,2015-12-27,3,103]
> [2015-12-27,2015-12-27,3,104]
> [2015-12-27,2015-12-27,3,105]
> [2015-12-27,2015-12-27,4,101]
> [2015-12-27,2015-12-27,4,102]
> [2015-12-27,2015-12-27,4,103]
> [2015-12-27,2015-12-27,4,104]
> [2015-12-27,2015-12-27,4,105]
> [2015-12-27,2015-12-27,5,101]
> [2015-12-27,2015-12-27,5,102]
> [2015-12-27,2015-12-27,5,103]
> [2015-12-27,2015-12-27,5,104]
> [2015-12-27,2015-12-27,5,105]
> rs: Unit = ()
>
> Is this what you are expecting?
>
> HTH
>
> Dr Mich Talebzadeh
>
> LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
> http://talebzadehmich.wordpress.com
>
> Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.
>
> On 13 September 2016 at 16:46, Praseetha <prasikris...@gmail.com> wrote:
>
>> Hi Mich,
>>
>>     val formate = new SimpleDateFormat("YYYY-MM-DD")
>>
>> Thanks & Regds,
>> --Praseetha
>>
>> On Tue, Sep 13, 2016 at 8:50 PM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>
>>> Hi Praseetha.
>>>
>>>     :32: error: not found: value formate
>>>     Error occurred in an application involving default arguments.
>>>       ("1", new java.sql.Date(formate.parse("2016-01-31").getTime)),
>>>
>>> What is that formate?
>>>
>>> Thanks
>>>
>>> Dr Mich Talebzadeh
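[Editor's note, not from the thread] The SimpleDateFormat pattern used above is the likely culprit: uppercase YYYY means week-year and uppercase DD means day-of-year, not year and day-of-month, which would explain why every input collapses to the same date in the output; the intended pattern is "yyyy-MM-dd". A small runnable sketch of the day-of-year half of the pitfall:

```scala
import java.text.SimpleDateFormat

object DatePatternPitfall {
  // Parse with the given pattern, then print with the unambiguous
  // "yyyy-MM-dd" pattern to see which calendar day was actually resolved.
  def roundTrip(pattern: String, s: String): String = {
    val printer = new SimpleDateFormat("yyyy-MM-dd")
    printer.format(new SimpleDateFormat(pattern).parse(s))
  }
}
```

With "yyyy-MM-dd", roundTrip returns the input unchanged; with "yyyy-MM-DD", parsing "2016-02-15" sets day-of-year 15, so the resolved date is no longer February 15th (typically it lands in January).
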
Re: Unable to compare SparkSQL Date columns
Hi Mich,

Thanks a lot for your reply. Here is the sample:

    case class TestDate(id: String, loginTime: java.sql.Date)
    val formate = new SimpleDateFormat("YYYY-MM-DD")
    val TestDateData = sc.parallelize(List(
      ("1", new java.sql.Date(formate.parse("2016-01-31").getTime)),
      ("2", new java.sql.Date(formate.parse("2106-01-30").getTime)),
      ("3", new java.sql.Date(formate.parse("2016-01-29").getTime)),
      ("4", new java.sql.Date(formate.parse("2016-01-28").getTime)),
      ("5", new java.sql.Date(formate.parse("2016-01-27").getTime))
    ))
    val firstPair = TestDateData.map(x => new TestDate(x._1, x._2))

    val TestDateData1 = sc.parallelize(List(
      ("101", new java.sql.Date(formate.parse("2016-01-27").getTime)),
      ("102", new java.sql.Date(formate.parse("2016-01-26").getTime)),
      ("103", new java.sql.Date(formate.parse("2016-01-25").getTime)),
      ("104", new java.sql.Date(formate.parse("2016-01-24").getTime)),
      ("105", new java.sql.Date(formate.parse("2016-01-31").getTime))
    ))
    val secondPair = TestDateData1.map(x => new TestDate(x._1, x._2))

    firstPair.toDF.registerTempTable("firstTable")
    secondPair.toDF.registerTempTable("secondTable")
    val res = sqlContext.sql("select * from firstTable INNER JOIN secondTable on firstTable.loginTime = secondTable.loginTime")

I also tried the following query:

    sqlContext.sql("select loginTime from firstTable")

Even this query gives the wrong dates.

Regds,
--Praseetha

On Tue, Sep 13, 2016 at 6:33 PM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

> Can you send the rdds that just create those two dates?
>
> HTH
>
> Dr Mich Talebzadeh
>
> On 13 September 2016 at 13:54, Praseetha <prasikris...@gmail.com> wrote:
>
>> Hi All,
>>
>> I have a case class in Scala:
>>
>>     case class TestDate(id: String, loginTime: java.sql.Date)
>>
>> I created 2 RDDs of type TestDate and wanted to do an inner join on the two RDDs where the values of the loginTime column are equal. Please find the code snippet below:
>>
>>     firstRDD.toDF.registerTempTable("firstTable")
>>     secondRDD.toDF.registerTempTable("secondTable")
>>     val res = sqlContext.sql("select * from firstTable INNER JOIN secondTable on to_date(firstTable.loginTime) = to_date(secondTable.loginTime)")
>>
>> I'm not getting any exception, but I'm not getting the correct answer either. It does a cartesian and some random dates are generated in the result.
>>
>> Regds,
>> --Praseetha
Unable to compare SparkSQL Date columns
Hi All,

I have a case class in Scala:

    case class TestDate(id: String, loginTime: java.sql.Date)

I created 2 RDDs of type TestDate and wanted to do an inner join on the two RDDs where the values of the loginTime column are equal. Please find the code snippet below:

    firstRDD.toDF.registerTempTable("firstTable")
    secondRDD.toDF.registerTempTable("secondTable")
    val res = sqlContext.sql("select * from firstTable INNER JOIN secondTable on to_date(firstTable.loginTime) = to_date(secondTable.loginTime)")

I'm not getting any exception, but I'm not getting the correct answer either. It does a cartesian and some random dates are generated in the result.

Regds,
--Praseetha
Re: Verifying if DStream is empty
Thanks a lot for the response. input1Pair is a DStream. I tried the code snippet below:

    result.foreachRDD { externalRDD =>
      if (!externalRDD.isEmpty()) {
        val ss = input1Pair.transform { rdd => input2Pair.leftOuterJoin(rdd) }
      } else {
        val ss = input1Pair.transform { rdd => input2Pair.leftOuterJoin(rdd) }
      }
    }

I'm getting the following exception:

    java.lang.IllegalStateException: Adding new inputs, transformations, and output operations after starting a context is not supported
      at org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:220)
      at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:64)
      at org.apache.spark.streaming.dstream.TransformedDStream.<init>(TransformedDStream.scala:25)
      at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2.apply(DStream.scala:670)
      at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2.apply(DStream.scala:661)

I don't think we can perform DStream transformations from inside foreachRDD. My requirement is to figure out whether the DStream 'result' is empty or not and, based on that, perform some operation on the input1Pair DStream and the input2Pair RDD.

On Mon, Jun 20, 2016 at 7:05 PM, nguyen duc tuan <newvalu...@gmail.com> wrote:

> Hi Praseetha,
>
> In order to check if a DStream is empty or not, using the isEmpty method is correct. I think the problem here is calling input1Pair.leftOuterJoin(input2Pair). I guess the input1Pair rdd comes from the above transformation; you should do it on the DStream instead. In this case, do any transformation with the x variable instead. If you use the input2Pair rdd a lot, you can consider caching it for better performance.
>
> 2016-06-20 19:30 GMT+07:00 Praseetha <prasikris...@gmail.com>:
>
>> Hi Experts,
>>
>> I have 2 inputs, where the first input is a stream (say input1) and the second one is a batch (say input2). I want to figure out if the keys in the first input match a single row or more than one row in the second input. The further transformations/logic depend on the number of rows matching, whether a single row matches or multiple rows match (for at least one key in the first input):
>>
>>     if (single row matches) {
>>       // do some transformation
>>     } else {
>>       // do some transformation
>>     }
>>
>> Code that I tried so far:
>>
>>     val input1Pair = streamData.map(x => (x._1, x))
>>     val input2Pair = input2.map(x => (x._1, x))
>>     val joinData = input1Pair.transform { x => input2Pair.leftOuterJoin(x) }
>>     val result = joinData.mapValues {
>>       case (v, Some(a)) => 1L
>>       case (v, None) => 0L
>>     }.reduceByKey(_ + _).filter(_._2 > 1)
>>
>> I have done the above coding. When I do result.print, it prints nothing if all the keys match only one row in input2. Given that the DStream may have multiple RDDs, I'm not sure how to figure out whether the DStream is empty or not.
>>
>> I tried using foreachRDD, but the streaming app stops abruptly. Inside foreachRDD I was performing transformations with other RDDs, like:
>>
>>     result.foreachRDD { x =>
>>       if (x.isEmpty) {
>>         val out = input1Pair.leftOuterJoin(input2Pair)
>>       } else {
>>         val out = input1Pair.rightOuterJoin(input2Pair)
>>       }
>>     }
>>
>> Can you please suggest.
>>
>> Regds,
>> --Praseetha
Verifying if DStream is empty
Hi Experts,

I have 2 inputs, where the first input is a stream (say input1) and the second one is a batch (say input2). I want to figure out if the keys in the first input match a single row or more than one row in the second input. The further transformations/logic depend on the number of rows matching, whether a single row matches or multiple rows match (for at least one key in the first input):

    if (single row matches) {
      // do some transformation
    } else {
      // do some transformation
    }

Code that I tried so far:

    val input1Pair = streamData.map(x => (x._1, x))
    val input2Pair = input2.map(x => (x._1, x))
    val joinData = input1Pair.transform { x => input2Pair.leftOuterJoin(x) }
    val result = joinData.mapValues {
      case (v, Some(a)) => 1L
      case (v, None) => 0L
    }.reduceByKey(_ + _).filter(_._2 > 1)

I have done the above coding. When I do result.print, it prints nothing if all the keys match only one row in input2. Given that the DStream may have multiple RDDs, I'm not sure how to figure out whether the DStream is empty or not.

I tried using foreachRDD, but the streaming app stops abruptly. Inside foreachRDD I was performing transformations with other RDDs, like:

    result.foreachRDD { x =>
      if (x.isEmpty) {
        val out = input1Pair.leftOuterJoin(input2Pair)
      } else {
        val out = input1Pair.rightOuterJoin(input2Pair)
      }
    }

Can you please suggest.

Regds,
--Praseetha
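[Editor's note, not from the thread] The counting idea in the snippet above (join, map each match to 1, sum per key, keep keys with more than one match) can be illustrated with plain Scala collections; no Spark here, and the names are made up for the sketch. It simplifies the original by assuming each stream key appears once per batch:

```scala
object MatchCount {
  // For each streamed key, count how many batch rows share that key,
  // and keep only the keys that match more than one row (multi-match case).
  def multiMatchKeys(stream: Seq[(String, String)],
                     batch: Seq[(String, String)]): Map[String, Int] = {
    val batchByKey = batch.groupBy(_._1)
    stream.map { case (k, _) =>
      k -> batchByKey.getOrElse(k, Seq.empty).size // 0 when no match
    }.toMap.filter(_._2 > 1)
  }
}
```

An empty result means every stream key matched at most one batch row, which mirrors the "result DStream is empty" condition being tested in the thread.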