Did you try this?

*val out=lines.filter(xx=>{*

val y=xx
  val x=broadcastVar.value

  var flag:Boolean=false
     for(a<-x)
  {
    if(y.contains(a))
    flag=true
    }
    flag
    }


*})*


Thanks
Best Regards

On Wed, Jul 15, 2015 at 8:10 PM, Naveen Dabas <[email protected]> wrote:

>
>
>
>
>
> I am using the below code and using kryo serializer .when i run this code
> i got this error : Task not serializable in commented line
> 2) how broadcast variables are treated in exceotu.are they local variables
> or can be used in any function defined as global variables.
>
> object StreamingLogInput {
>   def main(args: Array[String]) {
>     val master = args(0)
>     val conf = new SparkConf().setAppName("StreamingLogInput")
>     // Create a StreamingContext with a 1 second batch size
>
>     val sc = new SparkContext(conf)
>     val lines=sc.parallelize(List("eoore is test","testing is error
> report"))
>     //val ssc = new StreamingContext(sc, Seconds(30))
>     //val lines = ssc.socketTextStream("localhost", 7777)
>     val filter=sc.textFile("/user/nadabas/filters/fltr")
>     val filarr=filter.collect().toArray
>     val broadcastVar = sc.broadcast(filarr)
>
>     // val
> out=lines.transform{rdd=>rdd.filter(x=>fil(broadcastVar.value,x))}
>
> *val out=lines.filter(x=>fil(broadcastVar.value,x))  //error is coming*
>
>     out.collect()
>
>   }
>   def fil(x1:Array[String],y1:String)={
>     val y=y1
>  // val x=broadcastVar.value
>     val x=x1
>   var flag:Boolean=false
>      for(a<-x)
>   {
>     if(y.contains(a))
>     flag=true
>     }
>     flag
>     }
>    }
>
>
>

Reply via email to