Hello, `itertools.groupby` is evaluated lazily and the `g`s in your code are generators not lists. This might cause your problem. Casting everything to lists might help here, e.g.:
grp2 = [(k, list(g)) for k,g in groupby(grp1, lambda e: e[1])] HTH Eike 2016-08-05 7:31 GMT+02:00 林家銘 <robin890...@gmail.com>: > Hi > I wrote a map function to aggregate data in a partition, and this function > using itertools.groupby for more than twice, then there comes the pickle > error . > > Here is what I do > > ===Driver Code=== > pair_count = df.mapPartitions(lambda iterable: pair_func_cnt(iterable)) > pair_count.collection() > > ===Map Function === > def pair_func_cnt(iterable): > from itertools import groupby > > ls = [[1,2,3],[1,2,5],[1,3,5],[2,4,6]] > grp1 = [(k,g) for k,g in groupby(ls, lambda e: e[0])] > grp2 = [(k,g) for k,g in groupby(grp1, lambda e: e[1])] > return iter(grp2) > > ===Error Message=== > > Caused by: org.apache.spark.api.python.PythonException: Traceback (most > recent call last): > File > "/opt/zeppelin-0.6.0-bin-netinst/interpreter/spark/pyspark/pyspark.zip/pyspark/worker.py", > line 111, in main > process() > File > "/opt/zeppelin-0.6.0-bin-netinst/interpreter/spark/pyspark/pyspark.zip/pyspark/worker.py", > line 106, in process > serializer.dump_stream(func(split_index, iterator), outfile) > File > "/opt/zeppelin-0.6.0-bin-netinst/interpreter/spark/pyspark/pyspark.zip/pyspark/serializers.py", > line 267, in dump_stream > bytes = self.serializer.dumps(vs) > File > "/opt/zeppelin-0.6.0-bin-netinst/interpreter/spark/pyspark/pyspark.zip/pyspark/serializers.py", > line 415, in dumps > return pickle.dumps(obj, protocol)PicklingError: Can't pickle <type > 'itertools._grouper'>: attribute lookup itertools._grouper failed > at > org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166) > at > org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207) > at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125) > at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) > at org.apache.spark.scheduler.Task.run(Task.scala:89) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > ... 1 more > >