I am using the below code and using kryo serializer .when i run this code i 
got this error : Task not serializable in commented line2) how broadcast 
variables are treated in exceotu.are they local variables or can be used in any 
function defined as global variables.
object StreamingLogInput {  def main(args: Array[String]) {    val master = 
args(0)    val conf = new SparkConf().setAppName("StreamingLogInput")    // 
Create a StreamingContext with a 1 second batch size        val sc = new 
SparkContext(conf)    val lines=sc.parallelize(List("eoore is test","testing is 
error report"))    //val ssc = new StreamingContext(sc, Seconds(30))    //val 
lines = ssc.socketTextStream("localhost", 7777)    val 
filter=sc.textFile("/user/nadabas/filters/fltr")    val 
filarr=filter.collect().toArray    val broadcastVar = sc.broadcast(filarr)      
  // val out=lines.transform{rdd=>rdd.filter(x=>fil(broadcastVar.value,x))}    
val out=lines.filter(x=>fil(broadcastVar.value,x))  //error is coming        
out.collect()      }  def fil(x1:Array[String],y1:String)={    val y=y1 // val 
x=broadcastVar.value    val x=x1  var flag:Boolean=false     for(a<-x)  {    
if(y.contains(a))    flag=true    }    flag    }   }

   

Reply via email to