Yes, initialization on each turn is hard. You seem to be using Python. Another risky thing you can try is to serialize the MongoClient object with a serializer (e.g. pickle or cloudpickle on the Python side; Kryo is the JVM equivalent) and pass it on to the mappers; then in each mapper you just have to deserialize it and use it directly. This may or may not work for you, depending on the internals of the MongoDB client.
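[Editor's note: a quick way to see whether that approach can work at all is to try a pickle round-trip on the driver before shipping anything to the mappers. The helper below is an illustrative sketch, not from the thread; `is_picklable` is a made-up name.]

```python
import pickle

def is_picklable(obj):
    """Return True if obj survives a pickle round-trip."""
    try:
        pickle.loads(pickle.dumps(obj))
        return True
    except Exception:
        return False

# Plain data passes; objects holding live resources or dynamic
# attribute magic (like pymongo clients) generally do not.
print(is_picklable({"a": 1}))       # True
print(is_picklable(lambda x: x))    # False: lambdas can't be pickled
```

If the client fails this check on the driver, it will fail the same way inside Spark's closure serializer, which is what motivates the per-partition approach discussed later in the thread.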
Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi <https://twitter.com/mayur_rustagi>

On Wed, Jun 4, 2014 at 10:27 PM, Samarth Mailinglist <mailinglistsama...@gmail.com> wrote:

> Thanks a lot, and sorry for the really late reply! (I didn't have my laptop.)
>
> This is working, but it's dreadfully slow and seems not to run in parallel?
>
> On Mon, May 19, 2014 at 2:54 PM, Nick Pentreath <nick.pentre...@gmail.com> wrote:
>
>> You need to use mapPartitions (or foreachPartition) to instantiate your
>> client in each partition, as it is not serializable by the pickle library.
>> Something like:
>>
>>     def mapper(iter):
>>         db = MongoClient()['spark_test_db']
>>         collec = db['programs']
>>         for val in iter:
>>             asc = val.encode('ascii', 'ignore')
>>             json = convertToJSON(asc, indexMap)
>>             yield collec.insert(json)
>>
>>     def convertToJSON(string, indexMap):
>>         values = string.strip().split(",")
>>         json = {}
>>         for i in range(len(values)):
>>             json[indexMap[i]] = values[i]
>>         return json
>>
>>     doc_ids = data.mapPartitions(mapper)
>>
>> On Mon, May 19, 2014 at 8:00 AM, Samarth Mailinglist <mailinglistsama...@gmail.com> wrote:
>>
>>>     db = MongoClient()['spark_test_db']
>>>     collec = db['programs']
>>>
>>>     def mapper(val):
>>>         asc = val.encode('ascii', 'ignore')
>>>         json = convertToJSON(asc, indexMap)
>>>         collec.insert(json)  # this is not working
>>>
>>>     def convertToJSON(string, indexMap):
>>>         values = string.strip().split(",")
>>>         json = {}
>>>         for i in range(len(values)):
>>>             json[indexMap[i]] = values[i]
>>>         return json
>>>
>>>     jsons = data.map(mapper)
>>>
>>> The last line does the mapping. I am very new to Spark; can you explain
>>> what explicit serialization, etc. is in the context of Spark?
>>> The error I am getting:
>>>
>>>     Traceback (most recent call last):
>>>       File "<stdin>", line 1, in <module>
>>>       File "/usr/local/spark-0.9.1/python/pyspark/rdd.py", line 712, in saveAsTextFile
>>>         keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path)
>>>       File "/usr/local/spark-0.9.1/python/pyspark/rdd.py", line 1178, in _jrdd
>>>         pickled_command = CloudPickleSerializer().dumps(command)
>>>       File "/usr/local/spark-0.9.1/python/pyspark/serializers.py", line 275, in dumps
>>>         def dumps(self, obj): return cloudpickle.dumps(obj, 2)
>>>       File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 801, in dumps
>>>         cp.dump(obj)
>>>       File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 140, in dump
>>>         return pickle.Pickler.dump(self, obj)
>>>       File "/usr/lib/python2.7/pickle.py", line 224, in dump
>>>         self.save(obj)
>>>       File "/usr/lib/python2.7/pickle.py", line 286, in save
>>>         f(self, obj)  # Call unbound method with explicit self
>>>       File "/usr/lib/python2.7/pickle.py", line 548, in save_tuple
>>>         save(element)
>>>       File "/usr/lib/python2.7/pickle.py", line 286, in save
>>>         f(self, obj)  # Call unbound method with explicit self
>>>       File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 259, in save_function
>>>         self.save_function_tuple(obj, [themodule])
>>>       File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 316, in save_function_tuple
>>>         save(closure)
>>>       File "/usr/lib/python2.7/pickle.py", line 286, in save
>>>         f(self, obj)  # Call unbound method with explicit self
>>>       File "/usr/lib/python2.7/pickle.py", line 600, in save_list
>>>         self._batch_appends(iter(obj))
>>>       File "/usr/lib/python2.7/pickle.py", line 633, in _batch_appends
>>>         save(x)
>>>       File "/usr/lib/python2.7/pickle.py", line 286, in save
>>>         f(self, obj)  # Call unbound method with explicit self
>>>       File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 259, in save_function
>>>         self.save_function_tuple(obj, [themodule])
>>>       File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 316, in save_function_tuple
>>>         save(closure)
>>>       File "/usr/lib/python2.7/pickle.py", line 286, in save
>>>         f(self, obj)  # Call unbound method with explicit self
>>>       File "/usr/lib/python2.7/pickle.py", line 600, in save_list
>>>         self._batch_appends(iter(obj))
>>>       File "/usr/lib/python2.7/pickle.py", line 636, in _batch_appends
>>>         save(tmp[0])
>>>       File "/usr/lib/python2.7/pickle.py", line 286, in save
>>>         f(self, obj)  # Call unbound method with explicit self
>>>       File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 254, in save_function
>>>         self.save_function_tuple(obj, modList)
>>>       File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 314, in save_function_tuple
>>>         save(f_globals)
>>>       File "/usr/lib/python2.7/pickle.py", line 286, in save
>>>         f(self, obj)  # Call unbound method with explicit self
>>>       File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 181, in save_dict
>>>         pickle.Pickler.save_dict(self, obj)
>>>       File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
>>>         self._batch_setitems(obj.iteritems())
>>>       File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
>>>         save(v)
>>>       File "/usr/lib/python2.7/pickle.py", line 306, in save
>>>         rv = reduce(self.proto)
>>>       File "/usr/local/lib/python2.7/dist-packages/pymongo/collection.py", line 1489, in __call__
>>>         self.__name.split(".")[-1])
>>>     TypeError: 'Collection' object is not callable. If you meant to call the '__getnewargs__' method on a 'Collection' object it is failing because no such method exists.
>>>
>>> On Sat, May 17, 2014 at 9:30 PM, Mayur Rustagi <mayur.rust...@gmail.com> wrote:
>>>
>>>> You have to ideally pass the MongoClient object along with your data in
>>>> the mapper (Python should try to serialize your MongoClient, but explicit
>>>> is better). If the client is serializable, then all should end well;
>>>> if not, then you are better off using mapPartitions and initializing the
>>>> client in each partition, loading that partition's data there. There is a
>>>> similar discussion in the list in the past.
>>>> Regards,
>>>> Mayur
>>>>
>>>> Mayur Rustagi
>>>> Ph: +1 (760) 203 3257
>>>> http://www.sigmoidanalytics.com
>>>> @mayur_rustagi <https://twitter.com/mayur_rustagi>
>>>>
>>>> On Sat, May 17, 2014 at 8:58 PM, Nicholas Chammas <nicholas.cham...@gmail.com> wrote:
>>>>
>>>>> Where's your driver code (the code interacting with the RDDs)? Are you
>>>>> getting serialization errors?
>>>>>
>>>>> On Saturday, May 17, 2014, Samarth Mailinglist <mailinglistsama...@gmail.com> wrote:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> I am trying to store the results of a reduce into Mongo.
>>>>>> I want to share the variable "collec" in the mappers.
>>>>>>
>>>>>> Here's what I have so far (I'm using pymongo):
>>>>>>
>>>>>>     db = MongoClient()['spark_test_db']
>>>>>>     collec = db['programs']
>>>>>>
>>>>>>     def mapper(val):
>>>>>>         asc = val.encode('ascii', 'ignore')
>>>>>>         json = convertToJSON(asc, indexMap)
>>>>>>         collec.insert(json)  # this is not working
>>>>>>
>>>>>>     def convertToJSON(string, indexMap):
>>>>>>         values = string.strip().split(",")
>>>>>>         json = {}
>>>>>>         for i in range(len(values)):
>>>>>>             json[indexMap[i]] = values[i]
>>>>>>         return json
>>>>>>
>>>>>> How do I do this?
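[Editor's note: the `'Collection' object is not callable` in the traceback comes from pymongo's dynamic attribute access. `Collection.__getattr__` returns a sub-collection for any attribute name, so when pickle probes the object for optional hooks like `__getnewargs__`, it gets back a Collection and tries to call it. A toy class with the same `__getattr__` behaviour reproduces the failure; `DynamicAttrs` is an illustrative name, not pymongo code.]

```python
import pickle

class DynamicAttrs(object):
    """Mimics pymongo's Collection: any missing attribute lookup
    returns another instance instead of raising AttributeError."""
    def __getattr__(self, name):
        return DynamicAttrs()

try:
    # pickle looks up __getnewargs_ex__/__getnewargs__ on the instance,
    # gets a DynamicAttrs back, and tries to call it.
    pickle.dumps(DynamicAttrs(), protocol=2)
except TypeError as e:
    print("pickle failed:", e)
```

This is why the error surfaces only when Spark's closure serializer (cloudpickle) tries to ship the globals of `mapper`, which include `collec`.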
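[Editor's note: on the slowness Samarth reports — one `insert()` round-trip per document is the usual culprit. With the per-partition client pattern from Nick's reply, documents can be grouped into batches and written with one bulk call per batch (`collection.insert` accepted a list in pymongo 2.x; current pymongo uses `insert_many`). A pymongo-free sketch of the batching helper; `chunked` and the batch size 500 are illustrative choices.]

```python
def chunked(iterable, size):
    """Yield lists of up to `size` items from any iterable."""
    batch = []
    for item in iterable:
        batch.append(item)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:
        yield batch  # final partial batch

# Inside mapper(iter), per partition, one would then do roughly:
#     collec = MongoClient()['spark_test_db']['programs']
#     for batch in chunked(jsons, 500):
#         collec.insert(batch)   # one round-trip per 500 docs
print(list(chunked(range(5), 2)))  # [[0, 1], [2, 3], [4]]
```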
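[Editor's note: the `convertToJSON` helper in the thread depends on neither Spark nor Mongo, so it can be sanity-checked on its own; the `indexMap` below is a made-up example mapping CSV column positions to field names.]

```python
def convertToJSON(string, indexMap):
    # Split one CSV line and key each value by its column name.
    values = string.strip().split(",")
    json = {}
    for i in range(len(values)):
        json[indexMap[i]] = values[i]
    return json

indexMap = ["name", "year", "rating"]
print(convertToJSON("WarGames,1983,7.1\n", indexMap))
# {'name': 'WarGames', 'year': '1983', 'rating': '7.1'}
```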