Thanks a lot, sorry for the really late reply! (Didn't have my laptop)

This works, but it's dreadfully slow and doesn't seem to run in parallel.
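
One possible cause, offered as a guess rather than a diagnosis: the quoted mapper below does one round trip to MongoDB per record, and mapPartitions only runs as many tasks at once as the RDD has partitions. The sketch below batches the inserts within each partition. It assumes indexMap is a list of field names in column order and that the collection object offers insert_many (pymongo 3 and later; on pymongo 2.x, insert also accepts a list of documents). The names make_partition_writer, batches and FakeCollection are made up for this illustration, and FakeCollection only stands in for a real collection so the code runs without a database.

```python
from itertools import islice


def convert_to_json(line, index_map):
    """Turn one comma-separated line into a dict keyed by the names in index_map.

    Assumes index_map is a list of field names in column order. Unlike the
    original loop, zip silently drops values beyond len(index_map).
    """
    values = line.strip().split(",")
    return dict(zip(index_map, values))


def batches(iterator, size):
    """Yield lists of up to `size` items taken from any iterable."""
    iterator = iter(iterator)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


def make_partition_writer(collection_factory, index_map, batch_size=1000):
    """Build a function that can be passed to rdd.mapPartitions.

    collection_factory is called on the worker, once per partition, so the
    client object itself never has to be pickled.
    """
    def write_partition(lines):
        collection = collection_factory()
        for batch in batches(lines, batch_size):
            docs = [convert_to_json(line, index_map) for line in batch]
            collection.insert_many(docs)  # one round trip per batch
            yield len(docs)
    return write_partition


class FakeCollection:
    """Hypothetical stand-in for a pymongo collection, for local testing only."""

    def __init__(self):
        self.docs = []
        self.calls = 0

    def insert_many(self, docs):
        self.calls += 1
        self.docs.extend(docs)


# Hypothetical usage on a cluster (not run here):
#
# def mongo_collection():
#     from pymongo import MongoClient
#     return MongoClient()['spark_test_db']['programs']
#
# counts = data.mapPartitions(make_partition_writer(mongo_collection, indexMap))
# total = counts.sum()  # mapPartitions is lazy; an action triggers the writes
```

The partition count bounds how many of these writers run at once, so if the input ends up in only one or two partitions, raising that number (for example with repartition, in PySpark versions that provide it) may matter as much as the batching.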


On Mon, May 19, 2014 at 2:54 PM, Nick Pentreath <nick.pentre...@gmail.com>
wrote:

> You need to use mapPartitions (or foreachPartition) to instantiate your
> client in each partition as it is not serializable by the pickle library.
> Something like
>
> from pymongo import MongoClient
>
> def mapper(iter):
>     # Create the client on the worker, once per partition
>     db = MongoClient()['spark_test_db']
>     collec = db['programs']
>     for val in iter:
>         asc = val.encode('ascii','ignore')
>         json = convertToJSON(asc, indexMap)
>         yield collec.insert(json)
>
>
>
> def convertToJSON(string, indexMap):
>     values = string.strip().split(",")
>     json = {}
>     for i in range(len(values)):
>         json[indexMap[i]] = values[i]
>     return json
>
> doc_ids = data.mapPartitions(mapper)
>
>
>
>
> On Mon, May 19, 2014 at 8:00 AM, Samarth Mailinglist <
> mailinglistsama...@gmail.com> wrote:
>
>> db = MongoClient()['spark_test_db']
>> collec = db['programs']
>>
>> def mapper(val):
>>     asc = val.encode('ascii','ignore')
>>     json = convertToJSON(asc, indexMap)
>>     collec.insert(json) # this is not working
>>
>> def convertToJSON(string, indexMap):
>>     values = string.strip().split(",")
>>     json = {}
>>     for i in range(len(values)):
>>         json[indexMap[i]] = values[i]
>>     return json
>>
>> jsons = data.map(mapper)
>>
>>
>>
>> The last line does the mapping. I am very new to Spark, can you explain
>> what explicit serialization, etc is in the context of spark? The error I
>> am getting:
>>
>> Traceback (most recent call last):
>>   File "<stdin>", line 1, in <module>
>>   File "/usr/local/spark-0.9.1/python/pyspark/rdd.py", line 712, in saveAsTextFile
>>     keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path)
>>   File "/usr/local/spark-0.9.1/python/pyspark/rdd.py", line 1178, in _jrdd
>>     pickled_command = CloudPickleSerializer().dumps(command)
>>   File "/usr/local/spark-0.9.1/python/pyspark/serializers.py", line 275, in dumps
>>     def dumps(self, obj): return cloudpickle.dumps(obj, 2)
>>   File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 801, in dumps
>>     cp.dump(obj)
>>   File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 140, in dump
>>     return pickle.Pickler.dump(self, obj)
>>   File "/usr/lib/python2.7/pickle.py", line 224, in dump
>>     self.save(obj)
>>   File "/usr/lib/python2.7/pickle.py", line 286, in save
>>     f(self, obj) # Call unbound method with explicit self
>>   File "/usr/lib/python2.7/pickle.py", line 548, in save_tuple
>>     save(element)
>>   File "/usr/lib/python2.7/pickle.py", line 286, in save
>>     f(self, obj) # Call unbound method with explicit self
>>   File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 259, in save_function
>>     self.save_function_tuple(obj, [themodule])
>>   File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 316, in save_function_tuple
>>     save(closure)
>>   File "/usr/lib/python2.7/pickle.py", line 286, in save
>>     f(self, obj) # Call unbound method with explicit self
>>   File "/usr/lib/python2.7/pickle.py", line 600, in save_list
>>     self._batch_appends(iter(obj))
>>   File "/usr/lib/python2.7/pickle.py", line 633, in _batch_appends
>>     save(x)
>>   File "/usr/lib/python2.7/pickle.py", line 286, in save
>>     f(self, obj) # Call unbound method with explicit self
>>   File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 259, in save_function
>>     self.save_function_tuple(obj, [themodule])
>>   File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 316, in save_function_tuple
>>     save(closure)
>>   File "/usr/lib/python2.7/pickle.py", line 286, in save
>>     f(self, obj) # Call unbound method with explicit self
>>   File "/usr/lib/python2.7/pickle.py", line 600, in save_list
>>     self._batch_appends(iter(obj))
>>   File "/usr/lib/python2.7/pickle.py", line 636, in _batch_appends
>>     save(tmp[0])
>>   File "/usr/lib/python2.7/pickle.py", line 286, in save
>>     f(self, obj) # Call unbound method with explicit self
>>   File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 254, in save_function
>>     self.save_function_tuple(obj, modList)
>>   File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 314, in save_function_tuple
>>     save(f_globals)
>>   File "/usr/lib/python2.7/pickle.py", line 286, in save
>>     f(self, obj) # Call unbound method with explicit self
>>   File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 181, in save_dict
>>     pickle.Pickler.save_dict(self, obj)
>>   File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
>>     self._batch_setitems(obj.iteritems())
>>   File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
>>     save(v)
>>   File "/usr/lib/python2.7/pickle.py", line 306, in save
>>     rv = reduce(self.proto)
>>   File "/usr/local/lib/python2.7/dist-packages/pymongo/collection.py", line 1489, in __call__
>>     self.__name.split(".")[-1])
>> TypeError: 'Collection' object is not callable. If you meant to call the
>> '__getnewargs__' method on a 'Collection' object it is failing because no
>> such method exists.
>>
>>
>> On Sat, May 17, 2014 at 9:30 PM, Mayur Rustagi <mayur.rust...@gmail.com>
>> wrote:
>>
>>> Ideally, you should pass the MongoClient object along with your data in
>>> the mapper (Python should try to serialize your MongoClient, but explicit
>>> is better).
>>> If the client is serializable then all should end well. If not, you are
>>> better off using mapPartitions, initializing the driver in each iteration
>>> and loading the data of each partition. There was a similar discussion on
>>> the list in the past.
>>> Regards
>>> Mayur
>>>
>>> Mayur Rustagi
>>> Ph: +1 (760) 203 3257
>>> http://www.sigmoidanalytics.com
>>>  @mayur_rustagi <https://twitter.com/mayur_rustagi>
>>>
>>>
>>>
>>> On Sat, May 17, 2014 at 8:58 PM, Nicholas Chammas <
>>> nicholas.cham...@gmail.com> wrote:
>>>
>>>> Where's your driver code (the code interacting with the RDDs)? Are you
>>>> getting serialization errors?
>>>>
>>>> On Saturday, May 17, 2014, Samarth Mailinglist <mailinglistsama...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I am trying to store the results of a reduce into mongo.
>>>>> I want to share the variable "collection" in the mappers.
>>>>>
>>>>>
>>>>> Here's what I have so far (I'm using pymongo)
>>>>>
>>>>> db = MongoClient()['spark_test_db']
>>>>> collec = db['programs']
>>>>>
>>>>> def mapper(val):
>>>>>     asc = val.encode('ascii','ignore')
>>>>>     json = convertToJSON(asc, indexMap)
>>>>>     collec.insert(json) # this is not working
>>>>>
>>>>> def convertToJSON(string, indexMap):
>>>>>     values = string.strip().split(",")
>>>>>     json = {}
>>>>>     for i in range(len(values)):
>>>>>         json[indexMap[i]] = values[i]
>>>>>     return json
>>>>>
>>>>> How do I do this?
>>>>>
>>>>
>>>
>>
>
