In your original version, the parser object lives on the driver and is
referenced by the function, so it has to be serialized along with the
closure. That fails because CSVParser is not serializable. Instead, you
want to create the object locally on each of the remote machines.

In your third version you are holding the parser in a static member of
a class, namely your Scala object. The closure only refers to
CSVParserPlus statically; it does not capture it, so the parser is never
serialized. When you call the parse method, you're calling it on the
instance of the CSVParserPlus class that was loaded on the remote
worker, and that copy initializes and creates its own parser.
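To make the difference concrete, here is a small plain-Scala sketch that needs no Spark. It Java-serializes two closures, which is roughly (not exactly) what happens when Spark ships a task closure with its default serializer. `SemicolonParser`, `ParserHolder` and `ClosureDemo` are hypothetical names; `SemicolonParser` is only a stand-in for the non-serializable CSVParser.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for CSVParser: deliberately NOT Serializable.
class SemicolonParser {
  def parseLine(line: String): Array[String] = line.split(';')
}

// Like CSVParserPlus: one parser per JVM, created the first time
// the object is accessed in that JVM.
object ParserHolder {
  val parser = new SemicolonParser
}

object ClosureDemo {
  // Tries to Java-serialize f, approximating how a closure is shipped.
  def isSerializable(f: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(f)
      true
    } catch {
      case _: NotSerializableException => false
    }

  // Like the first version: the closure captures a parser instance,
  // so the parser would have to be serialized with the function.
  def capturingClosure(): String => String = {
    val parser = new SemicolonParser
    line => parser.parseLine(line)(1)
  }

  // Like the third version: the closure only names the singleton and
  // captures nothing; each JVM resolves ParserHolder on its own.
  def singletonClosure(): String => String =
    line => ParserHolder.parser.parseLine(line)(1)

  def main(args: Array[String]): Unit = {
    println(isSerializable(capturingClosure())) // false
    println(isSerializable(singletonClosure())) // true
  }
}
```

Nothing about the parser travels with the second closure, which is why the singleton version works; on each worker the object is initialized once, the first time a task touches it.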

A possibly more compact solution is to use mapPartitions and create the
parser once at the start of each partition. This avoids the static /
singleton pattern, and the parser is still created only once per
partition rather than once per record.
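A minimal sketch of the mapPartitions approach, assuming Spark's Scala RDD API and the OpenCSV `CSVParser` from your example. The object name `CsvMapPartitions`, the helper `parsePartition`, and reading the input path from `args(0)` are illustrative assumptions, not part of your original code.

```scala
import au.com.bytecode.opencsv.CSVParser
import org.apache.spark.SparkContext

object CsvMapPartitions {
  // Parses one partition's lines with a single parser that is created
  // inside the task (on the worker), so it is never serialized.
  def parsePartition(lines: Iterator[String]): Iterator[(String, String)] = {
    val parser = new CSVParser(';')
    lines.map { line =>
      val fields = parser.parseLine(line)
      (fields(1), fields(4))
    }
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[2]", "App")
    // Input path from the command line (an assumption of this sketch).
    val heyRDD = sc.textFile(args(0))
    // The lambda captures nothing; the parser is built per partition.
    val heyMap = heyRDD.mapPartitions(lines => parsePartition(lines))
    heyMap.take(5).foreach(println)
    sc.stop()
  }
}
```

Since `lines.map` is lazy, the parser is used while the task consumes the returned iterator, which still happens inside that task on the worker.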

On Thu, Sep 4, 2014 at 2:29 PM, Andrianasolo Fanilo
<fanilo.andrianas...@worldline.com> wrote:
> Hello Spark fellows :)
>
> I’m a new user of Spark and Scala and have been using both for 6 months
> without too many problems.
>
> Here I’m looking for best practices for using non-serializable classes
> inside closures. I’m using Spark 0.9.0-incubating with Hadoop 2.2.
>
> Suppose I am using the OpenCSV parser to parse an input file. Inside my
> main:
>
> import org.apache.spark.SparkContext
> import au.com.bytecode.opencsv.CSVParser
>
> val sc = new SparkContext("local[2]", "App")
> val heyRDD = sc.textFile("…")
>
> val csvparser = new CSVParser(';')
> val heyMap = heyRDD.map { line =>
>   val temp = csvparser.parseLine(line)
>   (temp(1), temp(4))
> }
>
> This gives me a java.io.NotSerializableException:
> au.com.bytecode.opencsv.CSVParser, which seems reasonable.
>
> From here I can see three solutions:
>
> 1/ Extending CSVParser with Serializable, which adds a lot of
> boilerplate code if you ask me
>
> 2/ Using Kryo serialization (I still need to define a serializer)
>
> 3/ Creating an object that holds an instance of the class I want to use,
> typically:
>
> object CSVParserPlus {
>   val csvParser = new CSVParser(';')
>
>   def parse(line: String) = {
>     csvParser.parseLine(line)
>   }
> }
>
> val heyMap = heyRDD.map { line =>
>   val temp = CSVParserPlus.parse(line)
>   (temp(1), temp(4))
> }
>
> The third solution works and I don’t understand why, so I was wondering
> how the closure system inside Spark manages to serialize an object that
> holds a non-serializable instance. How does that work? Does it hurt
> performance? Is it a good solution? How do you deal with this problem?
>
> Any input would be greatly appreciated.
>
> Best regards,
> Fanilo