You need to use mapPartitions (or foreachPartition) and instantiate your client inside each partition, since MongoClient is not serializable by the pickle library. Something like:
def mapper(iter):
    db = MongoClient()['spark_test_db']
    collec = db['programs']
    for val in iter:
        asc = val.encode('ascii', 'ignore')
        json = convertToJSON(asc, indexMap)
        yield collec.insert(json)

def convertToJSON(string, indexMap):
    values = string.strip().split(",")
    json = {}
    for i in range(len(values)):
        json[indexMap[i]] = values[i]
    return json

doc_ids = data.mapPartitions(mapper)
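If you don't need the inserted document ids back as an RDD, foreachPartition does the same thing as an action instead of a transformation. A rough sketch along the same lines (assuming your PySpark version exposes RDD.foreachPartition; "inserter" is just an illustrative name, and convertToJSON, indexMap and data are the same as above):

def inserter(iter):
    # the client is created inside the partition, so nothing has to be pickled
    db = MongoClient()['spark_test_db']
    collec = db['programs']
    for val in iter:
        asc = val.encode('ascii', 'ignore')
        collec.insert(convertToJSON(asc, indexMap))

data.foreachPartition(inserter)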
On Mon, May 19, 2014 at 8:00 AM, Samarth Mailinglist <mailinglistsama...@gmail.com> wrote:

> db = MongoClient()['spark_test_db']
> collec = db['programs']
>
> def mapper(val):
>     asc = val.encode('ascii', 'ignore')
>     json = convertToJSON(asc, indexMap)
>     collec.insert(json)  # this is not working
>
> def convertToJSON(string, indexMap):
>     values = string.strip().split(",")
>     json = {}
>     for i in range(len(values)):
>         json[indexMap[i]] = values[i]
>     return json
>
> jsons = data.map(mapper)
>
> The last line does the mapping. I am very new to Spark; can you explain what explicit serialization, etc. means in the context of Spark? The error I am getting:
>
> Traceback (most recent call last):
>   File "<stdin>", line 1, in <module>
>   File "/usr/local/spark-0.9.1/python/pyspark/rdd.py", line 712, in saveAsTextFile
>     keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path)
>   File "/usr/local/spark-0.9.1/python/pyspark/rdd.py", line 1178, in _jrdd
>     pickled_command = CloudPickleSerializer().dumps(command)
>   File "/usr/local/spark-0.9.1/python/pyspark/serializers.py", line 275, in dumps
>     def dumps(self, obj): return cloudpickle.dumps(obj, 2)
>   File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 801, in dumps
>     cp.dump(obj)
>   File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 140, in dump
>     return pickle.Pickler.dump(self, obj)
>   File "/usr/lib/python2.7/pickle.py", line 224, in dump
>     self.save(obj)
>   File "/usr/lib/python2.7/pickle.py", line 286, in save
>     f(self, obj) # Call unbound method with explicit self
>   File "/usr/lib/python2.7/pickle.py", line 548, in save_tuple
>     save(element)
>   File "/usr/lib/python2.7/pickle.py", line 286, in save
>     f(self, obj) # Call unbound method with explicit self
>   File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 259, in save_function
>     self.save_function_tuple(obj, [themodule])
>   File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 316, in save_function_tuple
>     save(closure)
>   File "/usr/lib/python2.7/pickle.py", line 286, in save
>     f(self, obj) # Call unbound method with explicit self
>   File "/usr/lib/python2.7/pickle.py", line 600, in save_list
>     self._batch_appends(iter(obj))
>   File "/usr/lib/python2.7/pickle.py", line 633, in _batch_appends
>     save(x)
>   File "/usr/lib/python2.7/pickle.py", line 286, in save
>     f(self, obj) # Call unbound method with explicit self
>   File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 259, in save_function
>     self.save_function_tuple(obj, [themodule])
>   File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 316, in save_function_tuple
>     save(closure)
>   File "/usr/lib/python2.7/pickle.py", line 286, in save
>     f(self, obj) # Call unbound method with explicit self
>   File "/usr/lib/python2.7/pickle.py", line 600, in save_list
>     self._batch_appends(iter(obj))
>   File "/usr/lib/python2.7/pickle.py", line 636, in _batch_appends
>     save(tmp[0])
>   File "/usr/lib/python2.7/pickle.py", line 286, in save
>     f(self, obj) # Call unbound method with explicit self
>   File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 254, in save_function
>     self.save_function_tuple(obj, modList)
>   File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 314, in save_function_tuple
>     save(f_globals)
>   File "/usr/lib/python2.7/pickle.py", line 286, in save
>     f(self, obj) # Call unbound method with explicit self
>   File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 181, in save_dict
>     pickle.Pickler.save_dict(self, obj)
>   File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
>     self._batch_setitems(obj.iteritems())
>   File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
>     save(v)
>   File "/usr/lib/python2.7/pickle.py", line 306, in save
>     rv = reduce(self.proto)
>   File "/usr/local/lib/python2.7/dist-packages/pymongo/collection.py", line 1489, in __call__
>     self.__name.split(".")[-1])
> TypeError: 'Collection' object is not callable. If you meant to call the '__getnewargs__' method on a 'Collection' object it is failing because no such method exists.
>
>
> On Sat, May 17, 2014 at 9:30 PM, Mayur Rustagi <mayur.rust...@gmail.com> wrote:
>
>> Ideally you would pass the mongo client object along with your data in the mapper (Python will try to serialize your mongo client, but explicit is better)...
>> If the client is serializable, then all should end well. If not, you are better off using mapPartitions, initializing the driver in each partition and loading that partition's data there. There is a similar discussion on the list from the past.
>>
>> Regards,
>> Mayur
>>
>> Mayur Rustagi
>> Ph: +1 (760) 203 3257
>> http://www.sigmoidanalytics.com
>> @mayur_rustagi <https://twitter.com/mayur_rustagi>
>>
>>
>> On Sat, May 17, 2014 at 8:58 PM, Nicholas Chammas <nicholas.cham...@gmail.com> wrote:
>>
>>> Where's your driver code (the code interacting with the RDDs)? Are you getting serialization errors?
>>>
>>> On Saturday, May 17, 2014, Samarth Mailinglist <mailinglistsama...@gmail.com> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I am trying to store the results of a reduce into mongo.
>>>> I want to share the variable "collection" in the mappers.
>>>>
>>>> Here's what I have so far (I'm using pymongo):
>>>>
>>>> db = MongoClient()['spark_test_db']
>>>> collec = db['programs']
>>>>
>>>> def mapper(val):
>>>>     asc = val.encode('ascii', 'ignore')
>>>>     json = convertToJSON(asc, indexMap)
>>>>     collec.insert(json)  # this is not working
>>>>
>>>> def convertToJSON(string, indexMap):
>>>>     values = string.strip().split(",")
>>>>     json = {}
>>>>     for i in range(len(values)):
>>>>         json[indexMap[i]] = values[i]
>>>>     return json
>>>>
>>>> How do I do this?