To get past this, you can move the mapper creation down into the closure.
It's then created on the worker node, so it doesn't need to be serialized.


import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
// (in newer jackson-module-scala versions, ScalaObjectMapper lives
// directly under com.fasterxml.jackson.module.scala)

// Parse each record into a specific case class. We use flatMap to handle
// errors by returning an empty collection (None) if we encounter an issue
// and a one-element collection (Some(_)) if everything is ok.
val result = input.flatMap(record => {
  try {
    // The mapper is created inside the closure, so it is instantiated
    // on the worker and never needs to be serialized.
    val mapper = new ObjectMapper with ScalaObjectMapper
    mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
    mapper.registerModule(DefaultScalaModule)
    Some(mapper.readValue(record, classOf[Company]))
  } catch {
    case e: Exception => None
  }
})

result.map { company =>
  // The mapper has to live inside this closure too, now that the one
  // above is scoped to the flatMap.
  val mapper = new ObjectMapper with ScalaObjectMapper
  mapper.registerModule(DefaultScalaModule)
  mapper.writeValueAsString(company)
}.saveAsTextFile(outputFile)
} 
}


BUT for more efficiency, look into creating the mapper inside a
*mapPartitions* call instead: it is still created on the worker node,
but only once per partition rather than once per row as above.
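
A minimal sketch of that, assuming the same Company case class, input
RDD, and Jackson imports as above:

val result = input.mapPartitions { records =>
  // One mapper per partition: still created on the worker, never serialized.
  val mapper = new ObjectMapper with ScalaObjectMapper
  mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
  mapper.registerModule(DefaultScalaModule)
  records.flatMap { record =>
    try {
      Some(mapper.readValue(record, classOf[Company]))
    } catch {
      case e: Exception => None
    }
  }
}

The same trick works on the write side: wrap the writeValueAsString calls
in a mapPartitions as well, so the serializing mapper is also built once
per partition.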



