[ 
https://issues.apache.org/jira/browse/SPARK-1687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14076849#comment-14076849
 ] 

Davies Liu commented on SPARK-1687:
-----------------------------------

Dill is implemented in pure Python, so its performance will be similar to 
pickle but much slower than cPickle, which is the default serializer we use. 
So we cannot switch the default serializer to Dill.

We could provide a customized namedtuple (one that can be serialized by 
cPickle) and replace the one in collections with it.

I will send a PR if this makes sense.

> Support NamedTuples in RDDs
> ---------------------------
>
>                 Key: SPARK-1687
>                 URL: https://issues.apache.org/jira/browse/SPARK-1687
>             Project: Spark
>          Issue Type: Improvement
>          Components: PySpark
>    Affects Versions: 1.0.0
>         Environment: Spark version 1.0.0-SNAPSHOT
> Python 2.7.5
>            Reporter: Pat McDonough
>            Assignee: Kan Zhang
>
> Add Support for NamedTuples in RDDs. Some sample code is below, followed by 
> the current error that comes from it.
> Based on a quick conversation with [~ahirreddy], 
> [Dill|https://github.com/uqfoundation/dill] might be a good solution here.
> {code}
> In [26]: from collections import namedtuple
> ...
> In [33]: Person = namedtuple('Person', 'id firstName lastName')
> In [34]: jon = Person(1, "Jon", "Doe")
> In [35]: jane = Person(2, "Jane", "Doe")
> In [36]: theDoes = sc.parallelize((jon, jane))
> In [37]: theDoes.collect()
> Out[37]: 
> [Person(id=1, firstName='Jon', lastName='Doe'),
>  Person(id=2, firstName='Jane', lastName='Doe')]
> In [38]: theDoes.count()
> PySpark worker failed with exception:
> PySpark worker failed with exception:
> Traceback (most recent call last):
>   File "/Users/pat/Projects/spark/python/pyspark/worker.py", line 77, in main
>     serializer.dump_stream(func(split_index, iterator), outfile)
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 1373, in 
> pipeline_func
>     return func(split, prev_func(split, iterator))
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 1373, in 
> pipeline_func
>     return func(split, prev_func(split, iterator))
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 283, in func
>     def func(s, iterator): return f(iterator)
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 708, in 
> <lambda>
>     return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 708, in 
> <genexpr>
>     return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
>   File "/Users/pat/Projects/spark/python/pyspark/serializers.py", line 129, 
> in load_stream
>     yield self._read_with_length(stream)
>   File "/Users/pat/Projects/spark/python/pyspark/serializers.py", line 146, 
> in _read_with_length
>     return self.loads(obj)
> AttributeError: 'module' object has no attribute 'Person'
> Traceback (most recent call last):
>   File "/Users/pat/Projects/spark/python/pyspark/worker.py", line 77, in main
>     serializer.dump_stream(func(split_index, iterator), outfile)
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 1373, in 
> pipeline_func
>     return func(split, prev_func(split, iterator))
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 1373, in 
> pipeline_func
>     return func(split, prev_func(split, iterator))
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 283, in func
>     def func(s, iterator): return f(iterator)
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 708, in 
> <lambda>
>     return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 708, in 
> <genexpr>
>     return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
>   File "/Users/pat/Projects/spark/python/pyspark/serializers.py", line 129, 
> in load_stream
>     yield self._read_with_length(stream)
>   File "/Users/pat/Projects/spark/python/pyspark/serializers.py", line 146, 
> in _read_with_length
>     return self.loads(obj)
> AttributeError: 'module' object has no attribute 'Person'
> 14/04/30 14:43:53 ERROR Executor: Exception in task ID 23
> org.apache.spark.api.python.PythonException: Traceback (most recent call 
> last):
>   File "/Users/pat/Projects/spark/python/pyspark/worker.py", line 77, in main
>     serializer.dump_stream(func(split_index, iterator), outfile)
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 1373, in 
> pipeline_func
>     return func(split, prev_func(split, iterator))
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 1373, in 
> pipeline_func
>     return func(split, prev_func(split, iterator))
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 283, in func
>     def func(s, iterator): return f(iterator)
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 708, in 
> <lambda>
>     return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 708, in 
> <genexpr>
>     return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
>   File "/Users/pat/Projects/spark/python/pyspark/serializers.py", line 129, 
> in load_stream
>     yield self._read_with_length(stream)
>   File "/Users/pat/Projects/spark/python/pyspark/serializers.py", line 146, 
> in _read_with_length
>     return self.loads(obj)
> AttributeError: 'module' object has no attribute 'Person'
>       at 
> org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:190)
>       at 
> org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:214)
>       at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:151)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
>       at org.apache.spark.scheduler.Task.run(Task.scala:51)
>       at 
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:210)
>       at 
> org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:46)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:175)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>       at java.lang.Thread.run(Thread.java:744)
> 14/04/30 14:43:53 ERROR Executor: Exception in task ID 21
> org.apache.spark.api.python.PythonException: Traceback (most recent call 
> last):
>   File "/Users/pat/Projects/spark/python/pyspark/worker.py", line 77, in main
>     serializer.dump_stream(func(split_index, iterator), outfile)
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 1373, in 
> pipeline_func
>     return func(split, prev_func(split, iterator))
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 1373, in 
> pipeline_func
>     return func(split, prev_func(split, iterator))
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 283, in func
>     def func(s, iterator): return f(iterator)
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 708, in 
> <lambda>
>     return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 708, in 
> <genexpr>
>     return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
>   File "/Users/pat/Projects/spark/python/pyspark/serializers.py", line 129, 
> in load_stream
>     yield self._read_with_length(stream)
>   File "/Users/pat/Projects/spark/python/pyspark/serializers.py", line 146, 
> in _read_with_length
>     return self.loads(obj)
> AttributeError: 'module' object has no attribute 'Person'
>       at 
> org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:190)
>       at 
> org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:214)
>       at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:151)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
>       at org.apache.spark.scheduler.Task.run(Task.scala:51)
>       at 
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:210)
>       at 
> org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:46)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:175)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>       at java.lang.Thread.run(Thread.java:744)
> 14/04/30 14:43:53 ERROR TaskSetManager: Task 5.0:3 failed 1 times; aborting 
> job
> ---------------------------------------------------------------------------
> Py4JJavaError                             Traceback (most recent call last)
> <ipython-input-38-8689b264fa46> in <module>()
> ----> 1 theDoes.count()
> /Users/pat/Projects/spark/python/pyspark/rdd.pyc in count(self)
>     706         3
>     707         """
> --> 708         return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
>     709 
>     710     def stats(self):
> /Users/pat/Projects/spark/python/pyspark/rdd.pyc in sum(self)
>     697         6.0
>     698         """
> --> 699         return self.mapPartitions(lambda x: 
> [sum(x)]).reduce(operator.add)
>     700 
>     701     def count(self):
> /Users/pat/Projects/spark/python/pyspark/rdd.pyc in reduce(self, f)
>     617             if acc is not None:
>     618                 yield acc
> --> 619         vals = self.mapPartitions(func).collect()
>     620         return reduce(f, vals)
>     621 
> /Users/pat/Projects/spark/python/pyspark/rdd.pyc in collect(self)
>     581         """
>     582         with _JavaStackTrace(self.context) as st:
> --> 583           bytesInJava = self._jrdd.collect().iterator()
>     584         return list(self._collect_iterator_through_file(bytesInJava))
>     585 
> /Users/pat/Projects/spark/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py 
> in __call__(self, *args)
>     535         answer = self.gateway_client.send_command(command)
>     536         return_value = get_return_value(answer, self.gateway_client,
> --> 537                 self.target_id, self.name)
>     538 
>     539         for temp_arg in temp_args:
> /Users/pat/Projects/spark/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py in 
> get_return_value(answer, gateway_client, target_id, name)
>     298                 raise Py4JJavaError(
>     299                     'An error occurred while calling {0}{1}{2}.\n'.
> --> 300                     format(target_id, '.', name), value)
>     301             else:
>     302                 raise Py4JError(
> Py4JJavaError: An error occurred while calling o53.collect.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 
> 5.0:3 failed 1 times, most recent failure: Exception failure in TID 23 on 
> host localhost: org.apache.spark.api.python.PythonException: Traceback (most 
> recent call last):
>   File "/Users/pat/Projects/spark/python/pyspark/worker.py", line 77, in main
>     serializer.dump_stream(func(split_index, iterator), outfile)
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 1373, in 
> pipeline_func
>     return func(split, prev_func(split, iterator))
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 1373, in 
> pipeline_func
>     return func(split, prev_func(split, iterator))
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 283, in func
>     def func(s, iterator): return f(iterator)
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 708, in 
> <lambda>
>     return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 708, in 
> <genexpr>
>     return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
>   File "/Users/pat/Projects/spark/python/pyspark/serializers.py", line 129, 
> in load_stream
>     yield self._read_with_length(stream)
>   File "/Users/pat/Projects/spark/python/pyspark/serializers.py", line 146, 
> in _read_with_length
>     return self.loads(obj)
> AttributeError: 'module' object has no attribute 'Person'
>         
> org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:190)
>         
> org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:214)
>         org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:151)
>         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>         org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
>         org.apache.spark.scheduler.Task.run(Task.scala:51)
>         
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:210)
>         
> org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:46)
>         org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:175)
>         
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         java.lang.Thread.run(Thread.java:744)
> Driver stacktrace:
>       at 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
>       at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
>       at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
>       at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>       at 
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
>       at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
>       at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
>       at scala.Option.foreach(Option.scala:236)
>       at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
>       at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
>       at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>       at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>       at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>       at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>       at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>       at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>       at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>       at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>       at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {code}



--
This message was sent by Atlassian JIRA
(v6.2#6252)
