I have seen that the problem is the Geohash class, which cannot be
pickled. But in groupByKey I use another custom class and there is no
problem...
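
If that is the cause, a workaround that usually helps in PySpark is to
defer the import to inside the mapped function, so each worker resolves
the class itself and cloudpickle never has to serialize the class object
from the driver's closure. A minimal sketch, assuming the class is
importable as "from geohash import Geohash" (adjust the import to wherever
the class actually lives):

def convert_rdd_to_object(record):
    # Importing here keeps Geohash out of the pickled closure; each
    # executor loads the class from its own copy of the module instead.
    from geohash import Geohash  # assumed import path
    geohash = Geohash()

    key, result_object = record
    result_object["mac"], result_object["timestamp"] = key.split("_")
    result_object["geohash"] = str(geohash.encode(result_object["latitude"],
                                                  result_object["longitude"]))
    return (result_object["mac"], result_object)

If constructing a Geohash per record turns out to be expensive, the same
deferred import works inside a mapPartitions function, so the object is
built once per partition instead of once per record.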

2015-11-06 13:44 GMT+01:00 Iker Perez de Albeniz <iker.pe...@fon.com>:

> Hi All,
>
> I am new to this list. Before sending this mail I searched the archive,
> but I did not find a solution to my problem.
>
> I am using Spark to process user locations based on RSSI. My Spark script
> looks like this:
>
> text_files = sc.textFile(','.join(files[startime]))
>
> result = text_files.flatMap(
>     lambda x: s3_files_to_records(x, startime)  # load the JSON-converted CSV logs
> ).filter(
>     lambda x: x['mac_time']  # drop records with an empty mac_time key
> ).map(
>     lambda x: (x['mac_time'], x)  # build K-V pairs keyed by mac_time
> ).combineByKey(
>     preprocess_create_combinator,
>     preprocess_merge_value,
>     preprocess_merge_combiners
> )
>
> Using combineByKey I bilaterate/trilaterate to get device positions.
>
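> For reference, the three callbacks follow combineByKey's standard
> contract: create a combiner from the first value seen for a key, fold
> further values into it, and merge partial combiners across partitions. A
> purely illustrative sketch (these are not my real preprocess_* functions,
> which do the trilateration math):
>
> def preprocess_create_combinator(value):
>     return [value]            # first record for this key starts a combiner
>
> def preprocess_merge_value(combiner, value):
>     combiner.append(value)    # fold another record into the combiner
>     return combiner
>
> def preprocess_merge_combiners(left, right):
>     return left + right       # merge partial combiners from two partitions
>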
> So if I do a
>
>         result.collect()
>
> I get something like this:
>
> [
>     (u'B4:3A:28:2A:AF:84_1444299600', {'latitude': 43.348926, 'iters': 1,
> 'longitude': -3.011294, 'error': 388.82514299844314}),
>     (u'30:A8:DB:9F:A0:35_1444299600', {'latitude': 43.348926, 'iters': 2,
> 'longitude': -3.011294, 'error': 61.62463221959518}),
>     (u'00:18:DE:94:F2:DF_1444299600', {'latitude': 43.348679, 'iters': 1,
> 'longitude': -3.010883, 'error': 436.2689859408533}),
>     (u'98:03:D8:65:E5:94_1444299600', {'latitude': 43.348722, 'iters': 1,
> 'longitude': -3.011031, 'error': 346.54077346735033}),
>     (u'68:DF:DD:EF:ED:21_1444299600', {'latitude': 43.348722, 'iters': 1,
> 'longitude': -3.011031, 'error': 436.2689859408533})
> ]
>
> I want to convert each RDD record back into an object so I can start
> another map/filter/map/combineByKey pass from that result. For that I have
> created a new function that converts every record into a new tuple (a
> sketch of the intended second pass follows the expected output below):
>
> def convert_rdd_to_object(rdd):
>     geohash = Geohash()
>
>     result_object = rdd[1]
>     result_object["mac"] = rdd[0].split("_")[0]
>     result_object["timestamp"] = rdd[0].split("_")[1]
>     result_object["geohash"] = str(geohash.encode(result_object["latitude"],
>                                                   result_object["longitude"]))
>     return (result_object["mac"], result_object)
>
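> The function itself can be exercised locally on a single collected record,
> outside any Spark closure (illustrative call; output abbreviated):
>
> sample = (u'B4:3A:28:2A:AF:84_1444299600',
>           {'latitude': 43.348926, 'iters': 1,
>            'longitude': -3.011294, 'error': 388.82514299844314})
> print convert_rdd_to_object(sample)
> # (u'B4:3A:28:2A:AF:84', {'latitude': 43.348926, 'iters': 1,
> #  'longitude': -3.011294, 'error': 388.82514299844314,
> #  'mac': u'B4:3A:28:2A:AF:84', 'timestamp': u'1444299600', 'geohash': '...'})
>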
> The idea was to get something like this:
>
> [
>     (u'B4:3A:28:2A:AF:84', {'latitude': 43.348926, 'iters': 1,
>      'longitude': -3.011294, 'error': 388.82514299844314,
>      'mac': 'B4:3A:28:2A:AF:84', 'timestamp': 1444299600,
>      'geohash': 'rwqerqwerw'}),
>     (u'30:A8:DB:9F:A0:35', {'latitude': 43.348926, 'iters': 2,
>      'longitude': -3.011294, 'error': 61.62463221959518,
>      'mac': '30:A8:DB:9F:A0:35', 'timestamp': 1444299600,
>      'geohash': 'rwqerqwerw'}),
>     (u'00:18:DE:94:F2:DF', {'latitude': 43.348679, 'iters': 1,
>      'longitude': -3.010883, 'error': 436.2689859408533,
>      'mac': '00:18:DE:94:F2:DF', 'timestamp': 1444299600,
>      'geohash': 'rwqerqwerw'}),
>     (u'98:03:D8:65:E5:94', {'latitude': 43.348722, 'iters': 1,
>      'longitude': -3.011031, 'error': 346.54077346735033,
>      'mac': '98:03:D8:65:E5:94', 'timestamp': 1444299600,
>      'geohash': 'rwqerqwerw'}),
>     (u'68:DF:DD:EF:ED:21', {'latitude': 43.348722, 'iters': 1,
>      'longitude': -3.011031, 'error': 436.2689859408533,
>      'mac': '68:DF:DD:EF:ED:21', 'timestamp': 1444299600,
>      'geohash': 'rwqerqwerw'})
> ]
>
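> The second pass I have in mind would chain straight off that result; a
> purely illustrative sketch (the second_* callbacks are placeholders I have
> not written yet, and the error threshold is only an example):
>
> per_device = result.map(
>     lambda x: convert_rdd_to_object(x)  # re-key each record by MAC
> ).filter(
>     lambda x: x[1]['error'] < 400.0     # e.g. drop low-confidence positions
> ).combineByKey(
>     second_create_combinator,           # placeholder
>     second_merge_value,                 # placeholder
>     second_merge_combiners              # placeholder
> )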
>
> So my attempt was to add another map at the end:
>
> result = text_files.flatMap(
>     lambda x: s3_files_to_records(x, startime)  # load the JSON-converted CSV logs
> ).filter(
>     lambda x: x['mac_time']  # drop records with an empty mac_time key
> ).map(
>     lambda x: (x['mac_time'], x)  # build K-V pairs keyed by mac_time
> ).combineByKey(
>     preprocess_create_combinator,
>     preprocess_merge_value,
>     preprocess_merge_combiners
> ).map(
>     lambda x: convert_rdd_to_object(x)  # re-key each result by MAC
> )
>
> print result.collect()
>
> But I am getting this error:
>
> Traceback (most recent call last):
>   File "/home/iker/Workspace/scripts/test_bilaterate_2.py", line 758, in <module>
>     main()
>   File "/home/iker/Workspace/scripts/test_bilaterate_2.py", line 754, in main
>     spark_pipeline("/home/iker/Workspace/scripts/1443901500CSV", "/home/iker/Workspace/scripts/")
>   File "/home/iker/Workspace/scripts/test_bilaterate_2.py", line 707, in spark_pipeline
>     print result.collect()
>   File "/home/iker/Workspace/spark-1.4.1-bin-hadoop2.4/python/lib/pyspark.zip/pyspark/rdd.py", line 757, in collect
>   File "/home/iker/Workspace/spark-1.4.1-bin-hadoop2.4/python/lib/pyspark.zip/pyspark/rdd.py", line 2363, in _jrdd
>   File "/home/iker/Workspace//spark-1.4.1-bin-hadoop2.4/python/lib/pyspark.zip/pyspark/rdd.py", line 2283, in _prepare_for_python_RDD
>   File "/home/iker/Workspace/spark-1.4.1-bin-hadoop2.4/python/lib/pyspark.zip/pyspark/serializers.py", line 427, in dumps
>   File "/home/iker/Workspace/spark-1.4.1-bin-hadoop2.4/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 622, in dumps
>   File "/home/iker/Workspace/spark-1.4.1-bin-hadoop2.4/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 107, in dump
>   File "/usr/lib/python2.7/pickle.py", line 224, in dump
>     self.save(obj)
>   File "/usr/lib/python2.7/pickle.py", line 286, in save
>     f(self, obj) # Call unbound method with explicit self
>   File "/usr/lib/python2.7/pickle.py", line 562, in save_tuple
>     save(element)
>   File "/usr/lib/python2.7/pickle.py", line 286, in save
>     f(self, obj) # Call unbound method with explicit self
>   File "/home/iker/Workspace//spark-1.4.1-bin-hadoop2.4/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 199, in save_function
>   File "/home/iker/Workspace//spark-1.4.1-bin-hadoop2.4/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 236, in save_function_tuple
>
>
> I do not know whether the problem is that I am misunderstanding how Spark
> works or something else, but I do not see how to make this work so I can
> keep map/filter/reducing the data in several concatenated steps.
>
>
> Regards,
>
> --
> Iker Perez de Albeniz
> Senior R&D Engineer / Technical Lead, Fon <http://www.fon.com/>
> +34 946545843 | Skype: iker.perez.fon
>
>


-- 
Iker Perez de Albeniz
Senior R&D Engineer / Technical Lead, Fon <http://www.fon.com/>
+34 946545843 | Skype: iker.perez.fon
