Thanks a lot, and sorry for the really late reply (I didn't have my laptop)! This is working, but it's dreadfully slow and doesn't seem to run in parallel?
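One likely reason for the slowness: collec.insert(json) inside the loop makes a separate round trip to Mongo for every record, and the job can only run as wide as the number of partitions in data. Below is a rough, untested sketch of batching the writes per partition; it assumes the convertToJSON/indexMap from the thread that follows, plus a pymongo new enough to have insert_many (on the 2.x series, collec.insert(docs) also accepts a list).

    from pymongo import MongoClient

    def bulk_mapper(iter):
        # Build the client inside the function so nothing unpicklable is
        # captured in the closure that gets shipped to the workers.
        collec = MongoClient()['spark_test_db']['programs']
        docs = [convertToJSON(val.encode('ascii', 'ignore'), indexMap)
                for val in iter]
        if docs:
            collec.insert_many(docs)  # one bulk write per partition
        yield len(docs)

    # An action is still needed to trigger the job; each partition yields
    # the number of documents it wrote.
    inserted = sum(data.mapPartitions(bulk_mapper).collect())

Creating the client inside the function keeps the unpicklable MongoClient off the closure, which is the same point made in the reply below.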
On Mon, May 19, 2014 at 2:54 PM, Nick Pentreath <nick.pentre...@gmail.com> wrote:

> You need to use mapPartitions (or foreachPartition) to instantiate your
> client in each partition, as it is not serializable by the pickle library.
> Something like:
>
>     def mapper(iter):
>         db = MongoClient()['spark_test_db']
>         collec = db['programs']
>         for val in iter:
>             asc = val.encode('ascii', 'ignore')
>             json = convertToJSON(asc, indexMap)
>             yield collec.insert(json)
>
>     def convertToJSON(string, indexMap):
>         values = string.strip().split(",")
>         json = {}
>         for i in range(len(values)):
>             json[indexMap[i]] = values[i]
>         return json
>
>     doc_ids = data.mapPartitions(mapper)
>
>
> On Mon, May 19, 2014 at 8:00 AM, Samarth Mailinglist <mailinglistsama...@gmail.com> wrote:
>
>>     db = MongoClient()['spark_test_db']
>>     collec = db['programs']
>>
>>     def mapper(val):
>>         asc = val.encode('ascii', 'ignore')
>>         json = convertToJSON(asc, indexMap)
>>         collec.insert(json)  # this is not working
>>
>>     def convertToJSON(string, indexMap):
>>         values = string.strip().split(",")
>>         json = {}
>>         for i in range(len(values)):
>>             json[indexMap[i]] = values[i]
>>         return json
>>
>>     jsons = data.map(mapper)
>>
>> The last line does the mapping. I am very new to Spark; can you explain
>> what explicit serialization, etc. means in the context of Spark? The error
>> I am getting:
>>
>>     Traceback (most recent call last):
>>       File "<stdin>", line 1, in <module>
>>       File "/usr/local/spark-0.9.1/python/pyspark/rdd.py", line 712, in saveAsTextFile
>>         keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path)
>>       File "/usr/local/spark-0.9.1/python/pyspark/rdd.py", line 1178, in _jrdd
>>         pickled_command = CloudPickleSerializer().dumps(command)
>>       File "/usr/local/spark-0.9.1/python/pyspark/serializers.py", line 275, in dumps
>>         def dumps(self, obj): return cloudpickle.dumps(obj, 2)
>>       File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 801, in dumps
>>         cp.dump(obj)
>>       File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 140, in dump
>>         return pickle.Pickler.dump(self, obj)
>>       File "/usr/lib/python2.7/pickle.py", line 224, in dump
>>         self.save(obj)
>>       File "/usr/lib/python2.7/pickle.py", line 286, in save
>>         f(self, obj) # Call unbound method with explicit self
>>       File "/usr/lib/python2.7/pickle.py", line 548, in save_tuple
>>         save(element)
>>       File "/usr/lib/python2.7/pickle.py", line 286, in save
>>         f(self, obj) # Call unbound method with explicit self
>>       File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 259, in save_function
>>         self.save_function_tuple(obj, [themodule])
>>       File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 316, in save_function_tuple
>>         save(closure)
>>       File "/usr/lib/python2.7/pickle.py", line 286, in save
>>         f(self, obj) # Call unbound method with explicit self
>>       File "/usr/lib/python2.7/pickle.py", line 600, in save_list
>>         self._batch_appends(iter(obj))
>>       File "/usr/lib/python2.7/pickle.py", line 633, in _batch_appends
>>         save(x)
>>       File "/usr/lib/python2.7/pickle.py", line 286, in save
>>         f(self, obj) # Call unbound method with explicit self
>>       File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 259, in save_function
>>         self.save_function_tuple(obj, [themodule])
>>       File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 316, in save_function_tuple
>>         save(closure)
>>       File "/usr/lib/python2.7/pickle.py", line 286, in save
>>         f(self, obj) # Call unbound method with explicit self
>>       File "/usr/lib/python2.7/pickle.py", line 600, in save_list
>>         self._batch_appends(iter(obj))
>>       File "/usr/lib/python2.7/pickle.py", line 636, in _batch_appends
>>         save(tmp[0])
>>       File "/usr/lib/python2.7/pickle.py", line 286, in save
>>         f(self, obj) # Call unbound method with explicit self
>>       File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 254, in save_function
>>         self.save_function_tuple(obj, modList)
>>       File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 314, in save_function_tuple
>>         save(f_globals)
>>       File "/usr/lib/python2.7/pickle.py", line 286, in save
>>         f(self, obj) # Call unbound method with explicit self
>>       File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 181, in save_dict
>>         pickle.Pickler.save_dict(self, obj)
>>       File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
>>         self._batch_setitems(obj.iteritems())
>>       File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
>>         save(v)
>>       File "/usr/lib/python2.7/pickle.py", line 306, in save
>>         rv = reduce(self.proto)
>>       File "/usr/local/lib/python2.7/dist-packages/pymongo/collection.py", line 1489, in __call__
>>         self.__name.split(".")[-1])
>>     TypeError: 'Collection' object is not callable. If you meant to call the
>>     '__getnewargs__' method on a 'Collection' object it is failing because no
>>     such method exists.
>>
>>
>> On Sat, May 17, 2014 at 9:30 PM, Mayur Rustagi <mayur.rust...@gmail.com> wrote:
>>
>>> Ideally you have to pass the MongoClient object along with your data in
>>> the mapper (Python should try to serialize your MongoClient, but explicit
>>> is better)....
>>> If the client is serializable, then all should end well. If not, then you
>>> are better off using mapPartitions and initializing the driver in each
>>> partition and loading the data of each partition. There is a similar
>>> discussion on the list from the past.
>>> Regards,
>>> Mayur
>>>
>>> Mayur Rustagi
>>> Ph: +1 (760) 203 3257
>>> http://www.sigmoidanalytics.com
>>> @mayur_rustagi <https://twitter.com/mayur_rustagi>
>>>
>>>
>>> On Sat, May 17, 2014 at 8:58 PM, Nicholas Chammas <nicholas.cham...@gmail.com> wrote:
>>>
>>>> Where's your driver code (the code interacting with the RDDs)? Are you
>>>> getting serialization errors?
>>>>
>>>> On Saturday, May 17, 2014, Samarth Mailinglist <mailinglistsama...@gmail.com> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I am trying to store the results of a reduce into Mongo.
>>>>> I want to share the variable "collection" in the mappers.
>>>>>
>>>>> Here's what I have so far (I'm using pymongo):
>>>>>
>>>>>     db = MongoClient()['spark_test_db']
>>>>>     collec = db['programs']
>>>>>
>>>>>     def mapper(val):
>>>>>         asc = val.encode('ascii', 'ignore')
>>>>>         json = convertToJSON(asc, indexMap)
>>>>>         collec.insert(json)  # this is not working
>>>>>
>>>>>     def convertToJSON(string, indexMap):
>>>>>         values = string.strip().split(",")
>>>>>         json = {}
>>>>>         for i in range(len(values)):
>>>>>             json[indexMap[i]] = values[i]
>>>>>         return json
>>>>>
>>>>> How do I do this?
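For context on the TypeError at the bottom of the traceback above: it is raised while cloudpickle tries to serialize the closure around mapper, and it can be reproduced without Spark at all. A minimal sketch, assuming a local mongod is running and the same database/collection names as in the thread:

    import pickle
    from pymongo import MongoClient

    collec = MongoClient()['spark_test_db']['programs']

    try:
        pickle.dumps(collec, 2)  # protocol 2, same as cloudpickle.dumps(obj, 2)
    except TypeError as e:
        # pymongo's Collection turns unknown attribute lookups into
        # sub-collections, so pickle's probe for __getnewargs__ ends up
        # calling a Collection object, which raises this error instead of
        # a plain AttributeError.
        print(e)

That is why the working version builds the client inside mapper(iter): each partition opens its own connection on the worker instead of trying to ship one from the driver.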