Hi all, there are two examples below: one throws "Task not serializable" when executed in the Spark shell, and the other one is OK. I am very puzzled. Can anyone explain what the difference between these two pieces of code is, and why the other one is OK?
// ---------------------------------------------------------------------------
// 1. The one which throws "Task not serializable":
// ---------------------------------------------------------------------------
import org.apache.spark._
import SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.broadcast._

// `sc` is the SparkContext provided by the Spark shell.
@transient val ssc = new StreamingContext(sc, Seconds(5))
val lines = ssc.textFileStream("/a.log")

// Keeps only lines that contain " ERROR" or start with "Spark".
val testFun = (line:String) => {
  if ((line.contains(" ERROR")) || (line.startsWith("Spark"))){
    true
  } else{
    false
  }
}

// Both patterns are compiled once and shipped as broadcast variables.
val p_date_bc = sc.broadcast("^\\w+ \\w+ \\d+ \\d{2}:\\d{2}:\\d{2} \\d{4}".r)
val p_ORA_bc = sc.broadcast("^ORA-\\d+.+".r)

// Walks one partition line by line. A line matching the ORA pattern is emitted
// as "<last date seen> | <ORA match>"; otherwise, a line matching the date
// pattern becomes the new "last date seen". Results are prepended, so the
// returned iterator yields them in reverse order of appearance.
val A = (iter: Iterator[String],data_bc:Broadcast[scala.util.matching.Regex],ORA_bc:Broadcast[scala.util.matching.Regex]) => {
  val p_date = data_bc.value
  val p_ORA = ORA_bc.value
  var res = List[String]()
  var lasttime = ""
  while (iter.hasNext) {
    val line = iter.next.toString
    val currentcode = p_ORA findFirstIn line getOrElse null
    if (currentcode != null){
      res ::= lasttime + " | " + currentcode
    }else{
      val currentdate = p_date findFirstIn line getOrElse null
      if (currentdate != null){
        lasttime = currentdate
      }
    }
  }
  res.iterator
}

// NOTE(review): unlike example 2, the function passed to mapPartitions here is
// a new lambda that refers to the shell-level vals A, p_date_bc and p_ORA_bc.
// Presumably that makes the closure capture the enclosing REPL line object
// (and, through it, non-serializable state such as ssc / lines) -- confirm
// against the full stack trace of the exception.
val cdd = lines.filter(testFun).mapPartitions(x => A(x,p_date_bc,p_ORA_bc)) //org.apache.spark.SparkException: Task not serializable

// ---------------------------------------------------------------------------
// 2. The other one is OK:
// ---------------------------------------------------------------------------
import org.apache.spark._
import SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.broadcast._

val ssc = new StreamingContext(sc, Seconds(5))
val lines = ssc.textFileStream("/a.log")

// Keeps only lines that contain " ERROR" or start with "Spark".
val testFun = (line:String) => {
  if ((line.contains(" ERROR")) || (line.startsWith("Spark"))){
    true
  } else{
    false
  }
}

// Same loop as in example 1, but the patterns are written inline (and so are
// recompiled for every line) and the function refers to nothing outside its
// own parameter.
// NOTE(review): compared with example 1 the two patterns are swapped here --
// `currentcode` is matched with the date pattern and `currentdate` with the
// ORA pattern. Left exactly as posted; confirm whether this was intended.
val A = (iter: Iterator[String]) => {
  var res = List[String]()
  var lasttime = ""
  while (iter.hasNext) {
    val line = iter.next.toString
    val currentcode = "^\\w+ \\w+ \\d+ \\d{2}:\\d{2}:\\d{2} \\d{4}".r.findFirstIn(line).getOrElse(null)
    if (currentcode != null){
      res ::= lasttime + " | " + currentcode
    }else{
      val currentdate = "^ORA-\\d+.+".r.findFirstIn(line).getOrElse(null)
      if (currentdate != null){
        lasttime = currentdate
      }
    }
  }
  res.iterator
}

// The function value A is passed directly; no extra lambda wraps it.
val cdd= lines.filter(testFun).mapPartitions(A)