hi ,all

there two examples one is throw Task not serializable when execute in spark 
shell,the other one is ok,i am very puzzled,can anyone give what's different 
about this two code and why the other is ok

1.The one which throw Task not serializable :

import org.apache.spark._
import SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.broadcast._



@transient val ssc = new StreamingContext(sc, Seconds(5))
val lines = ssc.textFileStream("/a.log")





val testFun = (line:String) => {
            if ((line.contains(" ERROR")) || (line.startsWith("Spark"))){
                true
            }
            else{
                false
            }
        }

val p_date_bc = sc.broadcast("^\\w+ \\w<file://\\w>+ \\d<file://\\d>+ 
\\d{2}:\\d{2}:\\d{2<file://\\d{2}:\\d{2}:\\d{2>} \\d{4}".r<file://\\d{4}".r>)
val p_ORA_bc = sc.broadcast("^ORA-\\d+.+".r)
val A = (iter: 
Iterator[String],data_bc:Broadcast[scala.util.matching.Regex],ORA_bc:Broadcast[scala.util.matching.Regex])
 => {
            val p_date = data_bc.value
            val p_ORA = ORA_bc.value
            var res = List[String]()
            var lasttime = ""

            while (iter.hasNext) {
                val line = iter.next.toString
                val currentcode = p_ORA findFirstIn line getOrElse null
                if (currentcode != null){
                    res ::= lasttime + " | " + currentcode
                }else{
                    val currentdate = p_date findFirstIn line getOrElse null
                    if (currentdate != null){
                        lasttime = currentdate
                    }
                }
            }
            res.iterator
        }

val cdd = lines.filter(testFun).mapPartitions(x => A(x,p_date_bc,p_ORA_bc))  
//org.apache.spark.SparkException: Task not serializable



2.The other one is ok:



import org.apache.spark._
import SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.broadcast._



val ssc = new StreamingContext(sc, Seconds(5))
val lines = ssc.textFileStream("/a.log")





val testFun = (line:String) => {
            if ((line.contains(" ERROR")) || (line.startsWith("Spark"))){
                true
            }
            else{
                false
            }
        }



val A = (iter: Iterator[String]) => {

            var res = List[String]()
            var lasttime = ""
            while (iter.hasNext) {
                val line = iter.next.toString
                val currentcode = "^\\w+ \\w<file://\\w>+ \\d<file://\\d>+ 
\\d{2}:\\d{2}:\\d{2<file://\\d{2}:\\d{2}:\\d{2>} 
\\d{4}".r.findFirstIn(line).getOrElse(null<file://\\d{4}".r.findFirstIn(line).getOrElse(null>)
                if (currentcode != null){
                    res ::= lasttime + " | " + currentcode
                }else{
                 val currentdate = 
"^ORA-\\d+.+".r.findFirstIn(line).getOrElse(null)
                    if (currentdate != null){
                        lasttime = currentdate
                    }
                }
            }
            res.iterator
        }



val cdd= lines.filter(testFun).mapPartitions(A)








Reply via email to