I faced the same problem and ended up using the same approach that Sean suggested https://github.com/AyasdiOpenSource/df/blob/master/src/main/scala/com/ayasdi/df/DF.scala#L313
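Sean's suggestion is to build the parser inside `mapPartitions`, so it is created on the executor instead of being shipped from the driver. Here is a minimal sketch of that pattern. It assumes Spark and OpenCSV (`au.com.bytecode.opencsv.CSVParser`) are on the classpath. The object name `PerPartitionParser`, the method `extractColumns` and the sample lines are hypothetical, and this is not the code from the linked file.

```scala
import au.com.bytecode.opencsv.CSVParser
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Hypothetical helper illustrating the "one parser per partition" pattern.
object PerPartitionParser {

  // Extracts columns 1 and 4 from ';'-separated lines. The CSVParser is
  // constructed inside the function given to mapPartitions, so it is created
  // on the executor, once per partition, and is never serialized.
  def extractColumns(lines: RDD[String]): RDD[(String, String)] =
    lines.mapPartitions { iter =>
      val parser = new CSVParser(';')
      // iter.map is lazy: the parser is used while Spark consumes the
      // iterator, still inside the same task.
      iter.map { line =>
        val fields = parser.parseLine(line)
        (fields(1), fields(4))
      }
    }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("App")
    val sc = new SparkContext(conf)
    try {
      // Two hypothetical input lines, split into two partitions.
      val lines = sc.parallelize(Seq("a;b;c;d;e", "f;g;h;i;j"), 2)
      extractColumns(lines).collect().foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```

Option 3 creates one parser per executor JVM. This version creates one parser per partition instead and does not need a static or singleton holder, which matches Sean's description.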
Option 3 also seems reasonable. It should create a CSVParser per executor.

On Thu, Sep 4, 2014 at 6:58 AM, Andrianasolo Fanilo <fanilo.andrianas...@worldline.com> wrote:
> Thank you for the quick answer, looks good to me.
>
> Though that brings me to another question. Suppose we want to open a connection to a database, an Elasticsearch cluster, etc.
>
> I now see two approaches:
> 1/ use .mapPartitions and set up the connection at the start of each partition, so I get a connection per partition
> 2/ use a singleton object, which loads a connection per executor, if my understanding is correct
>
> I would have used the second possibility, so I don't create a new connection each time a partition fails to compute for whatever reason. I also don't have a lot of connections in parallel, because I have only one connection per worker; with the first approach, if I have 200 partitions in parallel, that makes 200 connections.
> But in the second case, a partition could kill the connection on the worker during computation, and because that connection is shared by all tasks of the executor, all partitions would fail. Also, a single connection object would have to handle 200 partitions trying to output to Elasticsearch/database/etc., which may be bad performance-wise.
>
> I can't see a case where the second is preferable for now. Sadly, it doesn't seem I could use that singleton object to share data within an executor...
>
> Thanks for the input
> Fanilo
>
> -----Original Message-----
> From: Sean Owen [mailto:so...@cloudera.com]
> Sent: Thursday, September 4, 2014 15:36
> To: Andrianasolo Fanilo
> Cc: user@spark.apache.org
> Subject: Re: Object serialisation inside closures
>
> In your original version, the object is referenced by the function but it's on the driver, and so has to be serialized. This leads to an error since it's not serializable. Instead, you want to recreate the object locally on each of the remote machines.
>
> In your third version you are holding the parser in a static member of a class, in your Scala object. When you call the parse method, you're calling it on the instance of the CSVParserPlus class that was loaded on the remote worker. It loads and creates its own copy of the parser.
>
> A perhaps more compact solution is to use mapPartitions, and create the parser once at the start. This avoids needing this static / singleton pattern, but also means the parser is created only once per partition.
>
> On Thu, Sep 4, 2014 at 2:29 PM, Andrianasolo Fanilo <fanilo.andrianas...@worldline.com> wrote:
> > Hello Spark fellows :)
> >
> > I’m a new user of Spark and Scala and have been using both for 6 months without too many problems.
> >
> > Here I’m looking for best practices for using non-serializable classes inside closures. I’m using Spark-0.9.0-incubating here with Hadoop 2.2.
> >
> > Suppose I am using the OpenCSV parser to parse an input file. So inside my main:
> >
> > import au.com.bytecode.opencsv.CSVParser
> > import org.apache.spark.SparkContext
> >
> > val sc = new SparkContext("local[2]", "App")
> > val heyRDD = sc.textFile("…")
> >
> > val csvparser = new CSVParser(';')
> > val heyMap = heyRDD.map { line =>
> >   val temp = csvparser.parseLine(line)
> >   (temp(1), temp(4))
> > }
> >
> > This gives me a java.io.NotSerializableException: au.com.bytecode.opencsv.CSVParser, which seems reasonable.
> >
> > From here I could see 3 solutions:
> >
> > 1/ Extending CSVParser with Serializable, which adds a lot of boilerplate code if you ask me
> > 2/ Using Kryo serialization (still need to define a serializer)
> > 3/ Creating an object with an instance of the class I want to use, typically:
> >
> > object CSVParserPlus {
> >
> >   val csvParser = new CSVParser(';')
> >
> >   def parse(line: String): Array[String] = {
> >     csvParser.parseLine(line)
> >   }
> > }
> >
> > val heyMap = heyRDD.map { line =>
> >   val temp = CSVParserPlus.parse(line)
> >   (temp(1), temp(4))
> > }
> >
> > The third solution works, but I don’t understand why, so I was wondering how the closure system inside Spark works such that it can serialize an object holding a non-serializable instance. How does that work? Does it hinder performance? Is it a good solution? How do you manage this problem?
> >
> > Any input would be greatly appreciated.
> >
> > Best regards,
> >
> > Fanilo
> >
> > ________________________________
> >
> > This e-mail and the documents attached are confidential and intended solely for the addressee; it may also be privileged.
If you receive > > this e-mail in error, please notify the sender immediately and destroy > > it. As its integrity cannot be secured on the Internet, the Worldline > > liability cannot be triggered for the message content. Although the > > sender endeavours to maintain a computer virus-free network, the > > sender does not warrant that this transmission is virus-free and will > > not be liable for any damages resulting from any virus transmitted. > > > Ce message et les pièces jointes sont confidentiels et réservés à l'usage > exclusif de ses destinataires. Il peut également être protégé par le secret > professionnel. Si vous recevez ce message par erreur, merci d'en avertir > immédiatement l'expéditeur et de le détruire. L'intégrité du message ne > pouvant être assurée sur Internet, la responsabilité de Worldline ne pourra > être recherchée quant au contenu de ce message. Bien que les meilleurs > efforts soient faits pour maintenir cette transmission exempte de tout > virus, l'expéditeur ne donne aucune garantie à cet égard et sa > responsabilité ne saurait être recherchée pour tout dommage résultant d'un > virus transmis. > > This e-mail and the documents attached are confidential and intended > solely for the addressee; it may also be privileged. If you receive this > e-mail in error, please notify the sender immediately and destroy it. As > its integrity cannot be secured on the Internet, the Worldline liability > cannot be triggered for the message content. Although the sender endeavours > to maintain a computer virus-free network, the sender does not warrant that > this transmission is virus-free and will not be liable for any damages > resulting from any virus transmitted. >