I faced the same problem and ended up using the same approach that Sean
suggested
https://github.com/AyasdiOpenSource/df/blob/master/src/main/scala/com/ayasdi/df/DF.scala#L313
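
For reference, here is a minimal sketch of the mapPartitions approach, reusing the names from the code quoted below. The object name, the parsePartition helper and the input path taken from args(0) are my own placeholders and are not taken from the original code or from the linked file. It assumes Spark and OpenCSV are on the classpath.

```scala
import au.com.bytecode.opencsv.CSVParser
import org.apache.spark.SparkContext

object PerPartitionParser {

  // Runs on the worker: the parser is constructed there, once per partition,
  // so it never has to be serialized on the driver.
  def parsePartition(lines: Iterator[String]): Iterator[(String, String)] = {
    val parser = new CSVParser(';')
    lines.map { line =>
      val temp = parser.parseLine(line)
      (temp(1), temp(4))
    }
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[2]", "App")
    // Assumption: the input path is passed as the first program argument.
    val heyRDD = sc.textFile(args(0))

    // Hand each partition's iterator to parsePartition.
    val heyMap = heyRDD.mapPartitions(lines => parsePartition(lines))

    heyMap.take(5).foreach(println)
    sc.stop()
  }
}
```

The closure passed to mapPartitions only refers to a method on a top-level object, so nothing non-serializable is captured.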

Option 3 also seems reasonable. It should create a CSVParser per executor.
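
To illustrate why, here is a small sketch that runs without Spark. SemicolonParser is a made-up stand-in for a non-serializable class such as CSVParser, and the instance counter exists only for the demonstration. The point it tries to show: a top-level Scala object is initialized once per JVM, and code that calls CSVParserPlus.parse refers to the object by name instead of carrying a copy of it.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for a non-serializable class such as CSVParser.
class SemicolonParser {
  // Count constructions so the demo can show how often this runs.
  SemicolonParser.created.incrementAndGet()

  def parseLine(line: String): Array[String] = line.split(";", -1)
}

object SemicolonParser {
  val created = new AtomicInteger(0)
}

// Same shape as option 3: the parser lives in a singleton object.
object CSVParserPlus {
  // Initialized the first time the object is touched in a given JVM.
  val csvParser = new SemicolonParser

  def parse(line: String): Array[String] = csvParser.parseLine(line)
}

object SingletonDemo {
  def main(args: Array[String]): Unit = {
    val lines = Seq("a;b;c;d;e", "f;g;h;i;j")
    val pairs = lines.map { line =>
      val temp = CSVParserPlus.parse(line)
      (temp(1), temp(4))
    }
    println(pairs)
    // The parser was constructed once, however many lines were parsed.
    println(SemicolonParser.created.get)
  }
}
```

One caveat: tasks running concurrently in the same executor would share that single instance, so this is only safe if the wrapped class tolerates concurrent use. I have not checked that for CSVParser.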


On Thu, Sep 4, 2014 at 6:58 AM, Andrianasolo Fanilo <
fanilo.andrianas...@worldline.com> wrote:

> Thank you for the quick answer, it looks good to me.
>
> That brings me to another question, though. Suppose we want to open a
> connection to a database, Elasticsearch, etc.
>
> I see two ways to do this:
> 1/ use .mapPartitions and set up the connection at the start of each
> partition, so I get one connection per partition
> 2/ use a singleton object, which loads one connection per executor if my
> understanding is correct
>
> I would have gone with the second option: no new connection is created
> each time a partition fails and has to be recomputed, and few connections
> are open in parallel because there is only one per worker. With the first
> option, 200 partitions running in parallel means 200 connections.
> But with the second option, one partition could kill the connection on the
> worker during computation, and because that connection is shared by all
> tasks of the executor, all partitions would fail. Also, a single connection
> object would have to serve 200 partitions trying to write to
> Elasticsearch/database/etc., which may be bad performance-wise.
>
> So for now I can't see a case where the second option is preferable. Sadly,
> it doesn't seem I could use that singleton object to share data within an
> executor...
>
> Thanks for the input
> Fanilo
>
>
> -----Original Message-----
> From: Sean Owen [mailto:so...@cloudera.com]
> Sent: Thursday, September 4, 2014 15:36
> To: Andrianasolo Fanilo
> Cc: user@spark.apache.org
> Subject: Re: Object serialisation inside closures
>
> In your original version, the object is referenced by the function but
> it's on the driver, and so has to be serialized. This leads to an error
> since it's not serializable. Instead, you want to recreate the object
> locally on each of the remote machines.
>
> In your third version you are holding the parser in a static member of a
> class, in your Scala object. When you call the parse method, you're calling
> it on the instance of the CSVParserPlus class that was loaded on the remote
> worker. It loads and creates its own copy of the parser.
>
> A possibly more compact solution is to use mapPartitions and create the
> parser once at the start of each partition. This avoids the static /
> singleton pattern, while the parser is still created only once per partition.
>
> On Thu, Sep 4, 2014 at 2:29 PM, Andrianasolo Fanilo <
> fanilo.andrianas...@worldline.com> wrote:
> > Hello Spark fellows :)
> >
> >
> >
> > I’m a new user of Spark and Scala and have been using both for 6
> > months without too many problems.
> >
> > Here I’m looking for best practices for using non-serializable classes
> > inside closures. I’m using Spark-0.9.0-incubating here with Hadoop 2.2.
> >
> > Suppose I am using the OpenCSV parser to parse an input file. So inside my
> > main:
> >
> > val sc = new SparkContext("local[2]", "App")
> > val heyRDD = sc.textFile("…")
> >
> > val csvparser = new CSVParser(';')
> > val heyMap = heyRDD.map { line =>
> >   val temp = csvparser.parseLine(line)
> >   (temp(1), temp(4))
> > }
> >
> > This gives me a java.io.NotSerializableException:
> > au.com.bytecode.opencsv.CSVParser, which seems reasonable.
> >
> > From here I can see 3 solutions:
> >
> > 1/ Extending CSVParser to make it Serializable, which adds a lot of
> > boilerplate code if you ask me
> >
> > 2/ Using Kryo serialization (still need to define a serializer)
> >
> > 3/ Creating an object that holds an instance of the class I want to use,
> > typically:
> >
> > object CSVParserPlus {
> >   val csvParser = new CSVParser(';')
> >
> >   def parse(line: String) = {
> >     csvParser.parseLine(line)
> >   }
> > }
> >
> > val heyMap = heyRDD.map { line =>
> >   val temp = CSVParserPlus.parse(line)
> >   (temp(1), temp(4))
> > }
> >
> > The third solution works and I don’t understand how, so I was wondering
> > how the closure system inside Spark manages to serialize an object that
> > holds a non-serializable instance. How does that work? Does it hinder
> > performance? Is it a good solution? How do you handle this problem?
> >
> >
> >
> > Any input would be greatly appreciated
> >
> >
> >
> > Best regards,
> >
> > Fanilo
> >
> >
> > ________________________________
> >
> > This e-mail and the documents attached are confidential and intended
> > solely for the addressee; it may also be privileged. If you receive
> > this e-mail in error, please notify the sender immediately and destroy
> > it. As its integrity cannot be secured on the Internet, the Worldline
> > liability cannot be triggered for the message content. Although the
> > sender endeavours to maintain a computer virus-free network, the
> > sender does not warrant that this transmission is virus-free and will
> > not be liable for any damages resulting from any virus transmitted.
