You have to create input1Pair inside the foreachRDD function. Following your original code, one solution would look like this:
val input2Pair = input2.map(x => (x._1, x))
input2Pair.cache()
streamData.foreachRDD{ rdd =>
  if(!rdd.isEmpty()){
    val input1Pair = rdd.map(x => (x._1, x))
    val joinData = input1Pair.leftOuterJoin(input2Pair)
    val result = joinData.mapValues{
      case(v, Some(a)) => 1L
      case(v, None) => 0
    }.reduceByKey(_ + _).filter(_._2 > 1)
    // process the result RDD
  }
}

2016-06-21 0:03 GMT+07:00 Praseetha <prasikris...@gmail.com>:

> Thanks a lot for the response.
> input1Pair is a DStream. I tried with the code snippet below:
>
> result.foreachRDD{ externalRDD =>
>   if(!externalRDD.isEmpty()){
>     val ss = input1Pair.transform{ rdd => input2Pair.leftOuterJoin(rdd) }
>   }else{
>     val ss = input1Pair.transform{ rdd => input2Pair.leftOuterJoin(rdd) }
>   }
> }
>
> I'm getting the following exception:
>
> java.lang.IllegalStateException: Adding new inputs, transformations, and
> output operations after starting a context is not supported
>   at org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:220)
>   at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:64)
>   at org.apache.spark.streaming.dstream.TransformedDStream.<init>(TransformedDStream.scala:25)
>   at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2.apply(DStream.scala:670)
>   at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2.apply(DStream.scala:661)
>
> I don't think we can perform transformations on RDDs that are outside of
> foreachRDD.
> My requirement is to figure out whether the DStream 'result' is empty or
> not and, based on that, perform some operation on the input1Pair DStream
> and the input2Pair RDD.
>
>
> On Mon, Jun 20, 2016 at 7:05 PM, nguyen duc tuan <newvalu...@gmail.com>
> wrote:
>
>> Hi Praseetha,
>> In order to check if a DStream is empty or not, using the isEmpty method
>> is correct. I think the problem here is calling
>> input1Pair.leftOuterJoin(input2Pair).
>> I guess the input1Pair RDD comes from the transformation above. You
>> should do the join on the DStream instead, i.e. do the transformation
>> with the x variable instead.
>> If you use the input2Pair RDD a lot, you can consider caching it for
>> better performance.
>>
>> 2016-06-20 19:30 GMT+07:00 Praseetha <prasikris...@gmail.com>:
>>
>>>
>>> Hi Experts,
>>>
>>> I have 2 inputs, where the first input is a stream (say input1) and the
>>> second one is a batch (say input2). I want to figure out if the keys in
>>> the first input match a single row or more than one row in the second
>>> input. The further transformations/logic depend on the number of rows
>>> matching, whether a single row matches or multiple rows match (for at
>>> least one key in the first input).
>>>
>>> if(single row matches){
>>>   // do some transformation
>>> }else{
>>>   // do some transformation
>>> }
>>>
>>> Code that I have tried so far:
>>>
>>> val input1Pair = streamData.map(x => (x._1, x))
>>> val input2Pair = input2.map(x => (x._1, x))
>>> val joinData = input1Pair.transform{ x => input2Pair.leftOuterJoin(x) }
>>> val result = joinData.mapValues{
>>>   case(v, Some(a)) => 1L
>>>   case(v, None) => 0
>>> }.reduceByKey(_ + _).filter(_._2 > 1)
>>>
>>> I have done the above coding. When I do result.print, it prints nothing
>>> if all the keys match only one row in input2. Given that the DStream may
>>> have multiple RDDs, I am not sure how to figure out whether the DStream
>>> is empty or not.
>>>
>>> I tried using foreachRDD, but the streaming app stops abruptly.
>>>
>>> Inside foreachRDD I was performing transformations with other RDDs,
>>> like:
>>>
>>> result.foreachRDD{ x =>
>>>   if(x.isEmpty){
>>>     val out = input1Pair.leftOuterJoin(input2Pair)
>>>   }else{
>>>     val out = input1Pair.rightOuterJoin(input2Pair)
>>>   }
>>> }
>>>
>>> Can you please suggest.
>>>
>>>
>>> Regds,
>>> --Praseetha
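
For reference, below is a minimal self-contained sketch of the pattern suggested at the top of this mail. The sources (socketTextStream on localhost:9999, a textFile path) and the comma-split key extraction are only placeholders for your real input1/input2; what matters is that all DStream operations are declared before the context starts, and the per-batch branching happens on plain RDDs inside foreachRDD.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object JoinSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("JoinSketch").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(10))
    val sc = ssc.sparkContext

    // Batch side: key it once and cache it, since it is reused in every batch.
    val input2 = sc.textFile("/path/to/batch/input")          // placeholder path
      .map(line => (line.split(",")(0), line))
    val input2Pair = input2.map(x => (x._1, x))
    input2Pair.cache()

    // Stream side: socketTextStream is only a stand-in for the real source.
    val streamData = ssc.socketTextStream("localhost", 9999)
      .map(line => (line.split(",")(0), line))

    // The branching below works on plain RDDs, so no new DStream is created
    // after the context has started (which is what caused the exception).
    streamData.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        val input1Pair = rdd.map(x => (x._1, x))
        // Count how many rows of input2 each streaming key matched.
        val counts = input1Pair
          .leftOuterJoin(input2Pair)
          .mapValues {
            case (_, Some(_)) => 1L
            case (_, None)    => 0L
          }
          .reduceByKey(_ + _)

        if (counts.filter(_._2 > 1).isEmpty()) {
          // every key matched at most one row in input2
        } else {
          // at least one key matched more than one row in input2
        }
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}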