Hi, 

I'm getting an exception using Jackson with Spark and Scala.
I'm using Scala case classes to represent the JSON I'm trying to
parse.

It's caused by:

Caused by: java.io.NotSerializableException: com.fasterxml.jackson.module.scala.modifiers.SetTypeModifier$

I'm using a modification of a sample from Learning Spark (the O'Reilly
book).
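
For reference, each input line is a single JSON record shaped roughly like
this (the values below are made up for illustration; the field names match
the case classes in the code):

{"name": "Acme Ltd", "vat_number": "GB123456789", "offices": [{"name": "HQ", "addresses": [{"city": "London", "country": "UK", "post_code": "EC1A 1BB"}]}]}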

Any help would be greatly appreciated, as I'm currently blocked on this.



package com.elevate.companies


import org.apache.spark._
import com.fasterxml.jackson.annotation.JsonProperty
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.DeserializationFeature

case class Address(
    city: Option[String] = None,
    country: Option[String] = None,
    @JsonProperty("post_code") postCode: Option[String] = None
)

case class Office(
    name: Option[String] = None,
    addresses: List[Address]
)

case class Company(
    @JsonProperty("name") name: Option[String] = None,
    @JsonProperty("vat_number") vatNumber: Option[String] = None,
    offices: Option[List[Office]] = None
)


object ParseCompanyInformationJson {

  def main(args: Array[String]): Unit = {
    val inputFile = "/data/companies.json"
    val outputFile = "/data/companies-output.json"

    val conf = new SparkConf().setMaster("local").setAppName("Company Information Json")
    val sc = new SparkContext(conf)
    val input = sc.textFile(inputFile)

    val mapper = new ObjectMapper with ScalaObjectMapper
    mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
    mapper.registerModule(DefaultScalaModule)

    // Parse it into a specific case class. We use flatMap to handle errors
    // by returning an empty list (None) if we encounter an issue and a
    // list with one element if everything is ok (Some(_)).
    val result = input.flatMap(record => {
      try {
        Some(mapper.readValue(record, classOf[Company]))
      } catch {
        case e: Exception => None
      }
    })

    result.map(mapper.writeValueAsString(_)).saveAsTextFile(outputFile)
  }
}

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1435)
    at org.apache.spark.rdd.RDD.flatMap(RDD.scala:278)
    at com.elevate.companies.ParseCompanyInformationJson$.main(ParseCompanyInformationJson.scala:26)
    at com.elevate.companies.ParseCompanyInformationJson.main(ParseCompanyInformationJson.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: com.fasterxml.jackson.module.scala.modifiers.SetTypeModifier$
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
    at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
    ... 12 more
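
From reading around, it looks like the mapper is being captured by the
flatMap closure and serialized with the task, which drags the Jackson Scala
module in with it. One workaround I've seen suggested (sketch below, not yet
verified on my end) is to build the mapper inside mapPartitions, so each
executor creates its own instance and nothing Jackson-related ever goes
through the closure serializer:

val result = input.mapPartitions { records =>
  // Created here, per partition on the executor, so the mapper is
  // never part of the serialized task closure.
  val mapper = new ObjectMapper with ScalaObjectMapper
  mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
  mapper.registerModule(DefaultScalaModule)
  records.flatMap { record =>
    try {
      Some(mapper.readValue(record, classOf[Company]))
    } catch {
      case e: Exception => None
    }
  }
}

(The output side would presumably need the same treatment, since
result.map(mapper.writeValueAsString(_)) captures the driver-side mapper
again.) Is that the right way to go about it, or is there a cleaner fix?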


