Did you try this?
*val out=lines.filter(xx={*
val y=xx
val x=broadcastVar.value
var flag:Boolean=false
for(a-x)
{
if(y.contains(a))
flag=true
}
flag
}
*})*
Thanks
Best Regards
On Wed, Jul 15, 2015 at 8:10 PM, Naveen Dabas naveen.u...@ymail.com wrote:
I am using the below code and using kryo serializer .when i run this code
i got this error : Task not serializable in commented line
2) how broadcast variables are treated in exceotu.are they local variables
or can be used in any function defined as global variables.
object StreamingLogInput {
def main(args: Array[String]) {
val master = args(0)
val conf = new SparkConf().setAppName(StreamingLogInput)
// Create a StreamingContext with a 1 second batch size
val sc = new SparkContext(conf)
val lines=sc.parallelize(List(eoore is test,testing is error
report))
//val ssc = new StreamingContext(sc, Seconds(30))
//val lines = ssc.socketTextStream(localhost, )
val filter=sc.textFile(/user/nadabas/filters/fltr)
val filarr=filter.collect().toArray
val broadcastVar = sc.broadcast(filarr)
// val
out=lines.transform{rdd=rdd.filter(x=fil(broadcastVar.value,x))}
*val out=lines.filter(x=fil(broadcastVar.value,x)) //error is coming*
out.collect()
}
def fil(x1:Array[String],y1:String)={
val y=y1
// val x=broadcastVar.value
val x=x1
var flag:Boolean=false
for(a-x)
{
if(y.contains(a))
flag=true
}
flag
}
}