You have to create input1Pair inside the foreachRDD function. Based on your
original code, one solution would look like the following:

val input2Pair = input2.map(x => (x._1, x))
input2Pair.cache()

streamData.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    val input1Pair = rdd.map(x => (x._1, x))
    val joinData = input1Pair.leftOuterJoin(input2Pair)
    val result = joinData.mapValues {
      case (v, Some(a)) => 1L
      case (v, None)    => 0L
    }.reduceByKey(_ + _).filter(_._2 > 1)

    // process the result RDD here
  }
}
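
If you also need to branch on whether any key matched more than one row in
input2 (the if/else in your original question), you can do that check on the
joined RDD inside the same foreachRDD, so that no new DStream transformations
are created after the context has started. A rough sketch along those lines;
processMultiMatch and processSingleMatch are only placeholders for your own
logic:

streamData.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    val input1Pair = rdd.map(x => (x._1, x))
    val joined = input1Pair.leftOuterJoin(input2Pair)

    // count how many rows of input2 each key of this batch matched
    val counts = joined.mapValues {
      case (_, Some(_)) => 1L
      case (_, None)    => 0L
    }.reduceByKey(_ + _)

    // true if at least one key matched more than one row in input2
    val hasMultiMatch = !counts.filter(_._2 > 1).isEmpty()

    if (hasMultiMatch) {
      // processMultiMatch(joined)   // placeholder: "multiple rows match" branch
    } else {
      // processSingleMatch(joined)  // placeholder: "single row matches" branch
    }
  }
}

Because the count and your branch both reuse the joined RDD, calling
joined.cache() before the isEmpty check may save some recomputation.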

2016-06-21 0:03 GMT+07:00 Praseetha <prasikris...@gmail.com>:

> Thanks a lot for the response.
> input1Pair is a DStream. I tried the code snippet below:
>
>     result.foreachRDD{ externalRDD =>
>        if(!externalRDD.isEmpty()){
>          val ss = input1Pair.transform{ rdd => input2Pair.leftOuterJoin(rdd) }
>        }else{
>          val ss = input1Pair.transform{ rdd => input2Pair.leftOuterJoin(rdd) }
>        }
>      }
>
> I'm getting the following exception:
> java.lang.IllegalStateException: Adding new inputs, transformations, and
> output operations after starting a context is not supported
>   at org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:220)
>   at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:64)
>   at org.apache.spark.streaming.dstream.TransformedDStream.<init>(TransformedDStream.scala:25)
>   at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2.apply(DStream.scala:670)
>   at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2.apply(DStream.scala:661)
>
> I don't think we can perform transformations, from inside foreachRDD, on
> DStreams that are defined outside it.
> My requirement is to figure out whether the DStream 'result' is empty or
> not and, based on that, perform some operation on the input1Pair DStream
> and the input2Pair RDD.
>
>
> On Mon, Jun 20, 2016 at 7:05 PM, nguyen duc tuan <newvalu...@gmail.com>
> wrote:
>
>> Hi Praseetha,
>> To check whether the DStream is empty or not, using the isEmpty method is
>> correct. I think the problem here is the call to
>> input1Pair.leftOuterJoin(input2Pair).
>> I guess the input1Pair RDD comes from the transformation above. You should
>> do it on the DStream instead; in this case, apply the transformation to
>> the x variable instead.
>> If you use the input2Pair RDD a lot, you can consider caching it for
>> better performance.
>>
>> 2016-06-20 19:30 GMT+07:00 Praseetha <prasikris...@gmail.com>:
>>
>>>
>>> Hi Experts,
>>>
>>> I have 2 inputs, where the first input is a stream (say input1) and the
>>> second one is a batch (say input2). I want to figure out whether the keys
>>> in the first input match a single row or more than one row in the second
>>> input. The further transformations/logic depend on the number of matching
>>> rows, i.e. whether a single row matches or multiple rows match (for at
>>> least one key in the first input):
>>>
>>> if(single row matches){
>>>      // do some transformation
>>> }else{
>>>      // do some transformation
>>> }
>>>
>>> Code that I tried so far:
>>>
>>> val input1Pair = streamData.map(x => (x._1, x))
>>> val input2Pair = input2.map(x => (x._1, x))
>>> val joinData = input1Pair.transform{ x => input2Pair.leftOuterJoin(x)}
>>> val result = joinData.mapValues{
>>>     case(v, Some(a)) => 1L
>>>     case(v, None) => 0L
>>>  }.reduceByKey(_ + _).filter(_._2 > 1)
>>>
>>> I have done the above coding. When I do result.print, it prints nothing
>>> if all the keys match only one row in input2. Given that the DStream may
>>> contain multiple RDDs, I'm not sure how to figure out whether the DStream
>>> is empty or not.
>>>
>>> I tried using foreachRDD, but the streaming app stops abruptly.
>>>
>>> Inside foreachRDD I was performing transformations with other RDDs, like:
>>>
>>> result.foreachRDD{ x =>
>>>   if(x.isEmpty){
>>>     val out = input1Pair.leftOuterJoin(input2Pair)
>>>   }else{
>>>     val out = input1Pair.rightOuterJoin(input2Pair)
>>>   }
>>> }
>>>
>>> Can you please suggest a solution?
>>>
>>>
>>> Regds,
>>> --Praseetha
>>>
>>
>>
>
