I have seen that the problem is with the Geohash class, which cannot be pickled... but in groupByKey I use another custom class and there is no problem.
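In case it is useful to anyone hitting the same thing, here is a minimal sketch of one common workaround, assuming Geohash can be moved out of the driver script into its own module (hypothetically named geohash_util.py here) and shipped to the executors with sc.addPyFile("geohash_util.py") or the --py-files option. Importing inside the function means cloudpickle serializes a reference to the module instead of trying to pickle the class itself:

    def convert_rdd_to_object(record):
        # Imported on the worker at call time; cloudpickle captures only
        # the import, not the (unpicklable) Geohash class object.
        # "geohash_util" is a hypothetical module name for illustration.
        from geohash_util import Geohash

        key, result_object = record
        mac, timestamp = key.split("_")
        result_object["mac"] = mac
        result_object["timestamp"] = timestamp
        result_object["geohash"] = str(
            Geohash().encode(result_object["latitude"],
                             result_object["longitude"]))
        return (mac, result_object)

This should help because cloudpickle pickles classes defined in the driver script (__main__) by value, while classes imported from a module available on the workers' PYTHONPATH are pickled by reference.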
2015-11-06 13:44 GMT+01:00 Iker Perez de Albeniz <iker.pe...@fon.com>:

> Hi All,
>
> I am new to this list. Before sending this mail I searched the archive,
> but I have not found a solution for my problem.
>
> I am using Spark to process user locations based on RSSI. My Spark script
> looks like this:
>
>     text_files = sc.textFile(','.join(files[startime]))
>
>     result = text_files.flatMap(
>         lambda x: s3_files_to_records(x, startime)  # Load from JSON-converted CSV logs
>     ).filter(
>         lambda x: x['mac_time']                     # Filter empty fusids
>     ).map(
>         lambda x: (x['mac_time'], x)                # Generate K-V store
>     ).combineByKey(
>         preprocess_create_combinator,
>         preprocess_merge_value,
>         preprocess_merge_combiners
>     )
>
> Using combineByKey I bilaterate/trilaterate to get device positions,
> so if I do
>
>     result.collect()
>
> I get something like this:
>
>     [
>         (u'B4:3A:28:2A:AF:84_1444299600', {'latitude': 43.348926, 'iters': 1,
>          'longitude': -3.011294, 'error': 388.82514299844314}),
>         (u'30:A8:DB:9F:A0:35_1444299600', {'latitude': 43.348926, 'iters': 2,
>          'longitude': -3.011294, 'error': 61.62463221959518}),
>         (u'00:18:DE:94:F2:DF_1444299600', {'latitude': 43.348679, 'iters': 1,
>          'longitude': -3.010883, 'error': 436.2689859408533}),
>         (u'98:03:D8:65:E5:94_1444299600', {'latitude': 43.348722, 'iters': 1,
>          'longitude': -3.011031, 'error': 346.54077346735033}),
>         (u'68:DF:DD:EF:ED:21_1444299600', {'latitude': 43.348722, 'iters': 1,
>          'longitude': -3.011031, 'error': 436.2689859408533})
>     ]
>
> I want to convert each RDD record back to an object to start another
> map/filter/map/combineByKey process from that result, so I have created
> a new function that converts every record to a new tuple:
>
>     def convert_rdd_to_object(rdd):
>         geohash = Geohash()
>
>         result_object = rdd[1]
>         result_object["mac"] = rdd[0].split("_")[0]
>         result_object["timestamp"] = rdd[0].split("_")[1]
>         result_object["geohash"] = str(
>             geohash.encode(result_object["latitude"], result_object["longitude"]))
>         return (result_object["mac"], result_object)
>
> The idea was to get something like this:
>
>     [
>         (u'B4:3A:28:2A:AF:84', {'latitude': 43.348926, 'iters': 1,
>          'longitude': -3.011294, 'error': 388.82514299844314, 'mac':
>          'B4:3A:28:2A:AF:84', 'timestamp': 1444299600, 'geohash': rwqerqwerw}),
>         (u'30:A8:DB:9F:A0:35', {'latitude': 43.348926, 'iters': 2,
>          'longitude': -3.011294, 'error': 61.62463221959518, 'mac':
>          '30:A8:DB:9F:A0:35', 'timestamp': 1444299600, 'geohash': rwqerqwerw}),
>         (u'00:18:DE:94:F2:DF', {'latitude': 43.348679, 'iters': 1,
>          'longitude': -3.010883, 'error': 436.2689859408533, 'mac':
>          '00:18:DE:94:F2:DF', 'timestamp': 1444299600, 'geohash': rwqerqwerw}),
>         (u'98:03:D8:65:E5:94', {'latitude': 43.348722, 'iters': 1,
>          'longitude': -3.011031, 'error': 346.54077346735033, 'mac':
>          '98:03:D8:65:E5:94', 'timestamp': 1444299600, 'geohash': rwqerqwerw}),
>         (u'68:DF:DD:EF:ED:21', {'latitude': 43.348722, 'iters': 1,
>          'longitude': -3.011031, 'error': 436.2689859408533, 'mac':
>          '68:DF:DD:EF:ED:21', 'timestamp': 1444299600, 'geohash': rwqerqwerw})
>     ]
>
> So my attempt was to add another map at the end:
>
>     result = text_files.flatMap(
>         lambda x: s3_files_to_records(x, startime)  # Load from JSON-converted CSV logs
>     ).filter(
>         lambda x: x['mac_time']                     # Filter empty fusids
>     ).map(
>         lambda x: (x['mac_time'], x)                # Generate K-V store
>     ).combineByKey(
>         preprocess_create_combinator,
>         preprocess_merge_value,
>         preprocess_merge_combiners
>     ).map(
>         lambda x: convert_rdd_to_object(x)
>     )
>
>     print result.collect()
>
> but I am getting this error:
>
>     Traceback (most recent call last):
>       File "/home/iker/Workspace/scripts/test_bilaterate_2.py", line 758, in <module>
>         main()
>       File "/home/iker/Workspace/scripts/test_bilaterate_2.py", line 754, in main
>         spark_pipeline("/home/iker/Workspace/scripts/1443901500CSV", "/home/iker/Workspace/scripts/")
>       File "/home/iker/Workspace/scripts/test_bilaterate_2.py", line 707, in spark_pipeline
>         print result.collect()
>       File "/home/iker/Workspace/spark-1.4.1-bin-hadoop2.4/python/lib/pyspark.zip/pyspark/rdd.py", line 757, in collect
>       File "/home/iker/Workspace/spark-1.4.1-bin-hadoop2.4/python/lib/pyspark.zip/pyspark/rdd.py", line 2363, in _jrdd
>       File "/home/iker/Workspace//spark-1.4.1-bin-hadoop2.4/python/lib/pyspark.zip/pyspark/rdd.py", line 2283, in _prepare_for_python_RDD
>       File "/home/iker/Workspace/spark-1.4.1-bin-hadoop2.4/python/lib/pyspark.zip/pyspark/serializers.py", line 427, in dumps
>       File "/home/iker/Workspace/spark-1.4.1-bin-hadoop2.4/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 622, in dumps
>       File "/home/iker/Workspace/spark-1.4.1-bin-hadoop2.4/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 107, in dump
>       File "/usr/lib/python2.7/pickle.py", line 224, in dump
>         self.save(obj)
>       File "/usr/lib/python2.7/pickle.py", line 286, in save
>         f(self, obj)  # Call unbound method with explicit self
>       File "/usr/lib/python2.7/pickle.py", line 562, in save_tuple
>         save(element)
>       File "/usr/lib/python2.7/pickle.py", line 286, in save
>         f(self, obj)  # Call unbound method with explicit self
>       File "/home/iker/Workspace//spark-1.4.1-bin-hadoop2.4/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 199, in save_function
>       File "/home/iker/Workspace//spark-1.4.1-bin-hadoop2.4/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 236, in save_function_tuple
>
> I do not know if the problem is that I am not understanding how Spark
> works, or something else, but I do not see how to make it work and
> continue map/filter/reducing the data in several concatenated steps.
>
> Regards,
>
> Iker Perez de Albeniz

--
Iker Perez de Albeniz