Thanks a lot, sorry for the really late reply! (Didn't have my laptop)
This is working, but it's dreadfully slow and doesn't seem to run in parallel?
On Mon, May 19, 2014 at 2:54 PM, Nick Pentreath nick.pentre...@gmail.com
wrote:
You need to use mapPartitions (or foreachPartition) to instantiate your
client in each partition, since the MongoClient object is not serializable
by the pickle library. Something like:
def mapper(iter):
    from pymongo import MongoClient  # construct the client on the worker, once per partition
    db = MongoClient()['spark_test_db']
    collec = db['programs']
    for val in iter:
        asc = val.encode('ascii', 'ignore')
        json = convertToJSON(asc, indexMap)
        yield collec.insert(json)

def convertToJSON(string, indexMap):
    values = string.strip().split(',')
    json = {}
    for i in range(len(values)):
        json[indexMap[i]] = values[i]
    return json

doc_ids = data.mapPartitions(mapper)
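Note that mapPartitions is a lazy transformation, so nothing is actually
written to Mongo until an action forces evaluation, e.g.:

doc_ids.count()  # any action works; this forces the inserts to run on the workers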
On Mon, May 19, 2014 at 8:00 AM, Samarth Mailinglist
mailinglistsama...@gmail.com wrote:
from pymongo import MongoClient

db = MongoClient()['spark_test_db']
collec = db['programs']

def mapper(val):
    asc = val.encode('ascii', 'ignore')
    json = convertToJSON(asc, indexMap)
    collec.insert(json)  # this is not working

def convertToJSON(string, indexMap):
    values = string.strip().split(',')
    json = {}
    for i in range(len(values)):
        json[indexMap[i]] = values[i]
    return json

jsons = data.map(mapper)
The last line does the mapping. I am very new to Spark; can you explain
what explicit serialization etc. means in the context of Spark? The error I
am getting:
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/spark-0.9.1/python/pyspark/rdd.py", line 712, in saveAsTextFile
    keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path)
  File "/usr/local/spark-0.9.1/python/pyspark/rdd.py", line 1178, in _jrdd
    pickled_command = CloudPickleSerializer().dumps(command)
  File "/usr/local/spark-0.9.1/python/pyspark/serializers.py", line 275, in dumps
    def dumps(self, obj): return cloudpickle.dumps(obj, 2)
  File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 801, in dumps
    cp.dump(obj)
  File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 140, in dump
    return pickle.Pickler.dump(self, obj)
  File "/usr/lib/python2.7/pickle.py", line 224, in dump
    self.save(obj)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 548, in save_tuple
    save(element)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 259, in save_function
    self.save_function_tuple(obj, [themodule])
  File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 316, in save_function_tuple
    save(closure)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 600, in save_list
    self._batch_appends(iter(obj))
  File "/usr/lib/python2.7/pickle.py", line 633, in _batch_appends
    save(x)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 259, in save_function
    self.save_function_tuple(obj, [themodule])
  File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 316, in save_function_tuple
    save(closure)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 600, in save_list
    self._batch_appends(iter(obj))
  File "/usr/lib/python2.7/pickle.py", line 636, in _batch_appends
    save(tmp[0])
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 254, in save_function
    self.save_function_tuple(obj, modList)
  File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 314, in save_function_tuple
    save(f_globals)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 181, in save_dict
    pickle.Pickler.save_dict(self, obj)
  File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
    save(v)
  File "/usr/lib/python2.7/pickle.py", line 306, in save
    rv = reduce(self.proto)
  File "/usr/local/lib/python2.7/dist-packages/pymongo/collection.py", line 1489, in __call__
    self.__name.split(".")[-1])
TypeError: 'Collection' object is not callable. If you meant to call the '__getnewargs__' method on a 'Collection' object it is failing because no such method exists.
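For reference, the root cause can be reproduced outside Spark entirely: a
pymongo Collection cannot be pickled at all (a minimal sketch, assuming a
local mongod and the pymongo 2.x version shown in the traceback):

import pickle
from pymongo import MongoClient

collec = MongoClient()['spark_test_db']['programs']
pickle.dumps(collec, 2)  # raises the same TypeError: 'Collection' object is not callable...

Spark pickles the whole closure of mapper, and since collec is referenced
inside it, the same failure surfaces there.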
On Sat, May 17, 2014 at 9:30 PM