[ https://issues.apache.org/jira/browse/SPARK-1687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14076849#comment-14076849 ]
Davies Liu commented on SPARK-1687:
-----------------------------------

Dill is implemented in pure Python, so its performance is similar to pickle's, but much slower than cPickle, which is the serializer we use by default. So we cannot switch the default serializer to Dill. Instead, we could provide a customized namedtuple (one that can be serialized by cPickle) and replace the one in collections with it. I will send a PR if that makes sense.

> Support NamedTuples in RDDs
> ---------------------------
>
> Key: SPARK-1687
> URL: https://issues.apache.org/jira/browse/SPARK-1687
> Project: Spark
> Issue Type: Improvement
> Components: PySpark
> Affects Versions: 1.0.0
> Environment: Spark version 1.0.0-SNAPSHOT
> Python 2.7.5
> Reporter: Pat McDonough
> Assignee: Kan Zhang
>
> Add support for NamedTuples in RDDs. Some sample code is below, followed by
> the current error that comes from it.
> Based on a quick conversation with [~ahirreddy],
> [Dill|https://github.com/uqfoundation/dill] might be a good solution here.
> {code}
> In [26]: from collections import namedtuple
> ...
> In [33]: Person = namedtuple('Person', 'id firstName lastName')
> In [34]: jon = Person(1, "Jon", "Doe")
> In [35]: jane = Person(2, "Jane", "Doe")
> In [36]: theDoes = sc.parallelize((jon, jane))
> In [37]: theDoes.collect()
> Out[37]:
> [Person(id=1, firstName='Jon', lastName='Doe'),
>  Person(id=2, firstName='Jane', lastName='Doe')]
> In [38]: theDoes.count()
> PySpark worker failed with exception:
> PySpark worker failed with exception:
> Traceback (most recent call last):
>   File "/Users/pat/Projects/spark/python/pyspark/worker.py", line 77, in main
>     serializer.dump_stream(func(split_index, iterator), outfile)
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 1373, in pipeline_func
>     return func(split, prev_func(split, iterator))
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 1373, in pipeline_func
>     return func(split, prev_func(split, iterator))
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 283, in func
>     def func(s, iterator): return f(iterator)
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 708, in <lambda>
>     return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 708, in <genexpr>
>     return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
>   File "/Users/pat/Projects/spark/python/pyspark/serializers.py", line 129, in load_stream
>     yield self._read_with_length(stream)
>   File "/Users/pat/Projects/spark/python/pyspark/serializers.py", line 146, in _read_with_length
>     return self.loads(obj)
> AttributeError: 'module' object has no attribute 'Person'
> Traceback (most recent call last):
>   File "/Users/pat/Projects/spark/python/pyspark/worker.py", line 77, in main
>     serializer.dump_stream(func(split_index, iterator), outfile)
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 1373, in pipeline_func
>     return func(split, prev_func(split, iterator))
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 1373, in pipeline_func
>     return func(split, prev_func(split, iterator))
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 283, in func
>     def func(s, iterator): return f(iterator)
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 708, in <lambda>
>     return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 708, in <genexpr>
>     return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
>   File "/Users/pat/Projects/spark/python/pyspark/serializers.py", line 129, in load_stream
>     yield self._read_with_length(stream)
>   File "/Users/pat/Projects/spark/python/pyspark/serializers.py", line 146, in _read_with_length
>     return self.loads(obj)
> AttributeError: 'module' object has no attribute 'Person'
> 14/04/30 14:43:53 ERROR Executor: Exception in task ID 23
> org.apache.spark.api.python.PythonException: Traceback (most recent call last):
>   File "/Users/pat/Projects/spark/python/pyspark/worker.py", line 77, in main
>     serializer.dump_stream(func(split_index, iterator), outfile)
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 1373, in pipeline_func
>     return func(split, prev_func(split, iterator))
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 1373, in pipeline_func
>     return func(split, prev_func(split, iterator))
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 283, in func
>     def func(s, iterator): return f(iterator)
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 708, in <lambda>
>     return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 708, in <genexpr>
>     return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
>   File "/Users/pat/Projects/spark/python/pyspark/serializers.py", line 129, in load_stream
>     yield self._read_with_length(stream)
>   File "/Users/pat/Projects/spark/python/pyspark/serializers.py", line 146, in _read_with_length
>     return self.loads(obj)
> AttributeError: 'module' object has no attribute 'Person'
>         at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:190)
>         at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:214)
>         at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:151)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
>         at org.apache.spark.scheduler.Task.run(Task.scala:51)
>         at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:210)
>         at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:46)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:175)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:744)
> 14/04/30 14:43:53 ERROR Executor: Exception in task ID 21
> org.apache.spark.api.python.PythonException: Traceback (most recent call last):
>   File "/Users/pat/Projects/spark/python/pyspark/worker.py", line 77, in main
>     serializer.dump_stream(func(split_index, iterator), outfile)
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 1373, in pipeline_func
>     return func(split, prev_func(split, iterator))
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 1373, in pipeline_func
>     return func(split, prev_func(split, iterator))
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 283, in func
>     def func(s, iterator): return f(iterator)
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 708, in <lambda>
>     return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 708, in <genexpr>
>     return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
>   File "/Users/pat/Projects/spark/python/pyspark/serializers.py", line 129, in load_stream
>     yield self._read_with_length(stream)
>   File "/Users/pat/Projects/spark/python/pyspark/serializers.py", line 146, in _read_with_length
>     return self.loads(obj)
> AttributeError: 'module' object has no attribute 'Person'
>         at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:190)
>         at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:214)
>         at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:151)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
>         at org.apache.spark.scheduler.Task.run(Task.scala:51)
>         at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:210)
>         at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:46)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:175)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:744)
> 14/04/30 14:43:53 ERROR TaskSetManager: Task 5.0:3 failed 1 times; aborting job
> ---------------------------------------------------------------------------
> Py4JJavaError                             Traceback (most recent call last)
> <ipython-input-38-8689b264fa46> in <module>()
> ----> 1 theDoes.count()
> /Users/pat/Projects/spark/python/pyspark/rdd.pyc in count(self)
>     706         3
>     707         """
> --> 708         return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
>     709
>     710     def stats(self):
> /Users/pat/Projects/spark/python/pyspark/rdd.pyc in sum(self)
>     697         6.0
>     698         """
> --> 699         return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
>     700
>     701     def count(self):
> /Users/pat/Projects/spark/python/pyspark/rdd.pyc in reduce(self, f)
>     617             if acc is not None:
>     618                 yield acc
> --> 619         vals = self.mapPartitions(func).collect()
>     620         return reduce(f, vals)
>     621
> /Users/pat/Projects/spark/python/pyspark/rdd.pyc in collect(self)
>     581         """
>     582         with _JavaStackTrace(self.context) as st:
> --> 583             bytesInJava = self._jrdd.collect().iterator()
>     584         return list(self._collect_iterator_through_file(bytesInJava))
>     585
> /Users/pat/Projects/spark/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
>     535         answer = self.gateway_client.send_command(command)
>     536         return_value = get_return_value(answer, self.gateway_client,
> --> 537                 self.target_id, self.name)
>     538
>     539         for temp_arg in temp_args:
> /Users/pat/Projects/spark/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
>     298                 raise Py4JJavaError(
>     299                     'An error occurred while calling {0}{1}{2}.\n'.
> --> 300                     format(target_id, '.', name), value)
>     301             else:
>     302                 raise Py4JError(
> Py4JJavaError: An error occurred while calling o53.collect.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 5.0:3 failed 1 times, most recent failure: Exception failure in TID 23 on host localhost: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
>   File "/Users/pat/Projects/spark/python/pyspark/worker.py", line 77, in main
>     serializer.dump_stream(func(split_index, iterator), outfile)
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 1373, in pipeline_func
>     return func(split, prev_func(split, iterator))
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 1373, in pipeline_func
>     return func(split, prev_func(split, iterator))
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 283, in func
>     def func(s, iterator): return f(iterator)
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 708, in <lambda>
>     return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 708, in <genexpr>
>     return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
>   File "/Users/pat/Projects/spark/python/pyspark/serializers.py", line 129, in load_stream
>     yield self._read_with_length(stream)
>   File "/Users/pat/Projects/spark/python/pyspark/serializers.py", line 146, in _read_with_length
>     return self.loads(obj)
> AttributeError: 'module' object has no attribute 'Person'
>         org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:190)
>         org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:214)
>         org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:151)
>         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>         org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
>         org.apache.spark.scheduler.Task.run(Task.scala:51)
>         org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:210)
>         org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:46)
>         org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:175)
>         java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         java.lang.Thread.run(Thread.java:744)
> Driver stacktrace:
>         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>         at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
>         at scala.Option.foreach(Option.scala:236)
>         at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
>         at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>         at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>         at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>         at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {code}

--
This message was sent by Atlassian JIRA
(v6.2#6252)
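The customized namedtuple that Davies Liu proposes above can be sketched as follows. This is only an illustration of the idea, not PySpark's actual implementation; the names `picklable_namedtuple` and `_restore` are invented here. The root cause of the `AttributeError: 'module' object has no attribute 'Person'` is that pickle serializes classes by reference (module name plus class name), and the worker process has no module containing `Person`. The sketch instead gives the class a `__reduce__` that ships the class name, field names, and values, so the class can be recreated on the deserializing side:

```python
import pickle
from collections import namedtuple


def _restore(name, fields, values):
    # Runs on the deserializing side: recreate the namedtuple class
    # from its name and field list, then rebuild the instance.
    cls = namedtuple(name, fields)
    return cls(*values)


def picklable_namedtuple(name, fields):
    # Like collections.namedtuple, but instances carry enough information
    # to be unpickled in a process that has never seen the class.
    cls = namedtuple(name, fields)
    cls.__reduce__ = lambda self: (
        _restore,
        (self.__class__.__name__, self._fields, tuple(self)),
    )
    return cls


# Mirrors the session from the bug report:
Person = picklable_namedtuple('Person', 'id firstName lastName')
jon = Person(1, "Jon", "Doe")
restored = pickle.loads(pickle.dumps(jon))
```

Note that in a real deployment the restore function itself still has to live in a module importable on the workers, and to make this transparent to user code, `collections.namedtuple` would be replaced with the customized factory, as the comment suggests.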