I am using the below code and using kryo serializer .when i run this code i
got this error : Task not serializable in commented line2) how broadcast
variables are treated in exceotu.are they local variables or can be used in any
function defined as global variables.
object StreamingLogInput { def main(args: Array[String]) { val master =
args(0) val conf = new SparkConf().setAppName("StreamingLogInput") //
Create a StreamingContext with a 1 second batch size val sc = new
SparkContext(conf) val lines=sc.parallelize(List("eoore is test","testing is
error report")) //val ssc = new StreamingContext(sc, Seconds(30)) //val
lines = ssc.socketTextStream("localhost", 7777) val
filter=sc.textFile("/user/nadabas/filters/fltr") val
filarr=filter.collect().toArray val broadcastVar = sc.broadcast(filarr)
// val out=lines.transform{rdd=>rdd.filter(x=>fil(broadcastVar.value,x))}
val out=lines.filter(x=>fil(broadcastVar.value,x)) //error is coming
out.collect() } def fil(x1:Array[String],y1:String)={ val y=y1 // val
x=broadcastVar.value val x=x1 var flag:Boolean=false for(a<-x) {
if(y.contains(a)) flag=true } flag } }