reduce, transform, combine
I am currently using the RDD aggregate operation to reduce (fold) per partition and then combine the per-partition results:

def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U

I need to perform a transform operation after the seqOp and before the combOp. The signature would look like:

def foldTransformCombine[V, U: ClassTag](zeroReduceValue: V, zeroCombineValue: U)(seqOp: (V, T) => V, transformOp: V => U, combOp: (U, U) => U): U

This is especially useful when the transformOp is expensive and should be performed once per partition before combining. Is there a way to accomplish this with existing RDD operations? If yes, great; if not, should we consider adding such a general operation to the list of RDD operations?

-Manish
Re: reduce, transform, combine
You could easily achieve this with mapPartitions. However, it does not seem possible with the aggregate type of operation. I can see that it's a generally useful operation; for now, you could use mapPartitions.

Sincerely,

DB Tsai
---
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai

On Sun, May 4, 2014 at 1:12 AM, Manish Amde manish...@gmail.com wrote: I am currently using the RDD aggregate operation to reduce (fold) per partition and then combine the per-partition results.
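To make the mapPartitions suggestion concrete, here is a minimal sketch of the fold-transform-combine pattern built from existing RDD operations (e.g. pasted into spark-shell); the helper name, type parameters, and the use of fold for the final combine are illustrative assumptions, not an existing or proposed Spark API.

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

// Fold each partition, apply the (expensive) transform exactly once per
// partition, then combine the per-partition results.
def foldTransformCombine[T, V, U: ClassTag](
    rdd: RDD[T],
    zeroReduceValue: V,
    zeroCombineValue: U)(
    seqOp: (V, T) => V,
    transformOp: V => U,
    combOp: (U, U) => U): U = {
  val perPartition: RDD[U] = rdd.mapPartitions { iter =>
    Iterator(transformOp(iter.foldLeft(zeroReduceValue)(seqOp)))
  }
  perPartition.fold(zeroCombineValue)(combOp)
}

As with aggregate, this assumes combOp is associative and commutative and that zeroCombineValue is an identity for it.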
Re: Mailing list
On May 4, 2014, at 06:30, Matei Zaharia matei.zaha...@gmail.com wrote:

Hi Nicolas,

Good catches on these things.

Your website seems a little bit incomplete. I found this page [1], which lists the two main mailing lists, users and dev. But I see a reference to a mailing list about issues, which tracked the Spark issues when they were hosted at Atlassian. I guess it has moved? Where? And is there any mailing list for the commits?

Good catch, this was an old link and I've fixed it now. I also added the one for commits.

Also, I found it weird that there is no page referencing the true source repository, the git at the ASF; I only found references to the git at GitHub.

The GitHub repo is actually a mirror managed by the ASF, but the "git tag" link at http://spark.apache.org/downloads.html also points to the source repo. The problem is that our contribution process is through GitHub, so it's easier to point people to something that they can use to contribute.

I am also interested in your workflow, because Ant is moving from svn to git and we're still a little bit in the dark about the workflow. I am thus intrigued by how you work with GitHub pull requests.

Take a look at https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark and https://cwiki.apache.org/confluence/display/SPARK/Reviewing+and+Merging+Patches to see our contribution process. In a nutshell, it works as follows:
- Anyone can make a patch by forking the GitHub repo and sending a pull request (GitHub's internal patch mechanism)
- Committers review the patch and ask for changes; contributors can push additional changes into their pull request to respond
- When the patch looks good, we use a script to merge it into the source Apache repo; this also squashes the changes into one commit, making the Git history sane and facilitating reverts, cherry-picks into other branches, etc.

The script you're talking about, is it merge_spark_pr.py [1]?

Note by the way that using GitHub is not at all necessary for using Git. We happened to do our development on GitHub before moving to the ASF, and all our developers were used to its interface, so we stuck with it. It definitely beats attaching patches on JIRA, but it may not be the first step you want to take in moving to Git.

Your workflow is indeed interesting. I guess most Ant committers and potential contributors have experience with GitHub too, so at some point we'll have to handle it. I'll discuss it with the Ant dev community.

Thank you, Matei, for the fix on the site and for the clear response.

Nicolas

[1] https://github.com/apache/spark/blob/master/dev/merge_spark_pr.py
Re: reduce, transform, combine
Thanks DB. I will work with mapPartitions for now.

Question to the community in general: should we consider adding such an operation to RDDs, especially as a developer API?

On Sun, May 4, 2014 at 1:41 AM, DB Tsai dbt...@stanford.edu wrote: You could easily achieve this with mapPartitions. However, it does not seem possible with the aggregate type of operation.
Re: Apache Spark running out of the spark shell
Hey AJ,

If you plan to launch your job on a cluster, consider using the spark-submit command. Running this in Spark's home directory prints help on how to use it:

$ ./bin/spark-submit

I haven't tried it yet, but judging by this post, it will be the preferred way to launch jobs: http://apache-spark-user-list.1001560.n3.nabble.com/Running-a-spark-submit-compatible-app-in-spark-shell-td4905.html

Cheers

On 2014-05-04, at 13:35, Ajay Nair prodig...@gmail.com wrote: Thank you. I am trying this now

Nicolas Garneau
ngarn...@ngarneau.com
Re: bug using kryo as closure serializer
I added the config option to use the non-default serializer. However, at the time, Kryo failed to serialize pretty much any closure, so that option was never really used / recommended. Since then the Scala ecosystem has developed, and some other projects are starting to use Kryo to serialize more Scala data structures, so I wouldn't be surprised if there is a way to work around this now. However, I don't have enough time to look into it at this point. If you do, please do post your findings. Thanks.

On Sun, May 4, 2014 at 10:35 AM, Soren Macbeth so...@yieldbot.com wrote:

Apologies for the cross-list posts, but I've gotten zero response on the user list and I guess this list is probably more appropriate.

According to the documentation, using the KryoSerializer for closures is supported. However, when I try to set `spark.closure.serializer` to `org.apache.spark.serializer.KryoSerializer`, things fail pretty miserably. The first thing that happens is that it throws exceptions over and over saying that it cannot locate my registrator class, which is located in my assembly jar, like so:

14/05/04 12:03:20 ERROR serializer.KryoSerializer: Failed to run spark.kryo.registrator
java.lang.ClassNotFoundException: pickles.kryo.PicklesRegistrator
        at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:270)
        at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$2.apply(KryoSerializer.scala:63)
        at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$2.apply(KryoSerializer.scala:61)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:61)
        at org.apache.spark.serializer.KryoSerializerInstance.init(KryoSerializer.scala:116)
        at org.apache.spark.serializer.KryoSerializer.newInstance(KryoSerializer.scala:79)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:180)
        at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
        at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1438)
        at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:724)

Now, I would expect it not to be able to find this class since it hasn't yet fetched my assembly jar to the executors. Once it does fetch my jar, those exceptions stop.

Next, all the executor tasks die with the following exception:

java.nio.ReadOnlyBufferException
        at java.nio.ByteBuffer.array(ByteBuffer.java:961)
        at org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:136)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:193)
        at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
        at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1438)
        at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:724)

AFAIK, I'm not doing anything out of the ordinary, just turning on Kryo and using the registrator mechanism to register a couple of custom serializers. The reason I tried turning on Kryo for closures in the first place is a different bug that I was hitting while fetching and deserializing tasks from my executors, which I detailed here: http://apache-spark-user-list.1001560.n3.nabble.com/Crazy-Kryo-Exception-td5257.html

Here's hoping someone on this list can help me track down what's happening, as I didn't get a single reply on the user list.
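For readers unfamiliar with the registrator mechanism mentioned above, a minimal spark-shell-style sketch of how a registrator is typically defined and wired up is below; the class name and the registered classes are illustrative assumptions. The operational point, as noted in the message, is that Spark loads the registrator by name on each executor, so it must be inside the application/assembly jar shipped to them.

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoRegistrator

// Hypothetical registrator; the registered classes are just examples.
class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[scala.collection.mutable.ArrayBuffer[_]])
    kryo.register(classOf[Array[String]])
  }
}

// Wiring it up; "MyRegistrator" would normally be a fully qualified class name.
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "MyRegistrator")
// The setting under discussion in this thread:
// conf.set("spark.closure.serializer", "org.apache.spark.serializer.KryoSerializer")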
Re: bug using kryo as closure serializer
On a slightly related note (apologies Soren for hijacking the thread), Reynold, how much better is Kryo from Spark's usage point of view compared to the default Java serialization (in general, not for closures)? The numbers on the Kryo site are interesting, but since you have played the most with Kryo in the context of Spark (I think), how do you rate it along the lines of:
1) computational overhead compared to Java serialization,
2) memory overhead,
3) generated byte[] size.

Particularly given the bugs Patrick and I had looked into in the past around flush, etc., I was always skeptical about using Kryo. But given the pretty nasty issues with OOMs via Java serialization we are seeing, I wanted to know your thoughts on using Kryo with Spark. (It will be slightly involved to ensure everything gets registered, but I want to go down that path, assuming I hear good things in the context of Spark.)

Thanks,
Mridul

On Mon, May 5, 2014 at 1:20 AM, Reynold Xin r...@databricks.com wrote: I added the config option to use the non-default serializer. However, at the time, Kryo failed to serialize pretty much any closure, so that option was never really used / recommended.
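One rough way to get a first-order answer to points (1) and (3) above for your own record types is to serialize a representative object with both mechanisms and compare sizes (and wall-clock time, if desired). The sketch below is a standalone illustration, not Spark code; the sample data is an assumption, and Spark's own Kryo path additionally goes through its KryoSerializer and registrator configuration.

import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.Output

object SerializationSizeCheck {
  // Size of the object under default Java serialization.
  def javaBytes(obj: AnyRef): Int = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(obj)
    oos.close()
    bos.size()
  }

  // Size of the object under plain Kryo (no Spark registrators involved).
  def kryoBytes(obj: AnyRef): Int = {
    val kryo = new Kryo()
    val bos = new ByteArrayOutputStream()
    val out = new Output(bos)
    kryo.writeClassAndObject(out, obj)
    out.close()
    bos.size()
  }

  def main(args: Array[String]): Unit = {
    val sample: Array[String] = (1 to 10000).map(i => s"value-$i").toArray
    println("java: " + javaBytes(sample) + " bytes, kryo: " + kryoBytes(sample) + " bytes")
  }
}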
Re: Apache Spark running out of the spark shell
Now I got it to work .. well, almost. However, I needed to copy the project/ folder to the spark-standalone folder because the package build was failing; it could not find build properties. After the copy, the build was successful. However, when I run it I get errors, but it still gives me the output:

[error] 14/05/04 21:58:19 INFO spark.SparkContext: Job finished: count at SimpleApp.scala:11, took 0.040651597 s
[error] 14/05/04 21:58:19 INFO scheduler.TaskSetManager: Finished TID 3 in 17 ms on localhost (progress: 2/2)
[info] Lines with a: 3, Lines with b: 2
[error] 14/05/04 21:58:19 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
[success] Total time: 5 s, completed May 4, 2014 9:58:20 PM

You can see the [info] line that contains the output. All the other lines are marked [error]; any reason why? I have configured master and slave nodes on my EC2 machines, and I think this code tries to run in local mode.
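For context, the output above matches the standard Spark quick-start line-count example; a minimal sketch of what such a SimpleApp might look like (the file path and master URL are assumptions) is:

import org.apache.spark.{SparkConf, SparkContext}

object SimpleApp {
  def main(args: Array[String]): Unit = {
    // Any local text file will do; README.md is just an example.
    val logFile = "README.md"
    val conf = new SparkConf().setAppName("Simple App").setMaster("local")
    val sc = new SparkContext(conf)
    val logData = sc.textFile(logFile).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println(s"Lines with a: $numAs, Lines with b: $numBs")
    sc.stop()
  }
}

Running it with sbt produces output interleaved with Spark's log lines, much like the transcript above.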
Re: bug using kryo as closure serializer
Thanks for the reply!

OK, if that's the case, I'd recommend a note to that effect in the docs at least. Just to give some more context here, I'm working on a Clojure DSL for Spark called Flambo, which I plan to open source shortly.

If I could, I'd like to focus on the initial bug that I hit:

Exception in thread main org.apache.spark.SparkException: Job aborted: Exception while deserializing and fetching task: com.esotericsoftware.kryo.KryoException: java.lang.IllegalArgumentException: Can not set final scala.collection.convert.Wrappers field scala.collection.convert.Wrappers$SeqWrapper.$outer to clojure.lang.PersistentVector
Serialization trace:
$outer (scala.collection.convert.Wrappers$SeqWrapper)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:604)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

This happens immediately after all the tasks of a reduce stage complete successfully. Here is the function throwing the exception: https://github.com/apache/spark/blob/4bc07eebbf5e2ea0c0b6f1642049515025d88d07/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala#L43

This is where I get lost. From googling around, it seems that Scala is trying to wrap the result of my task, which contains clojure.lang.PersistentVector objects, in a Scala collection, but I don't know why it's doing that. I have a registered Kryo serializer for clojure.lang.PersistentVector. Based on this line, it looks like it's trying to use the closure serializer, yet the exception thrown is a com.esotericsoftware.kryo.KryoException: https://github.com/apache/spark/blob/4bc07eebbf5e2ea0c0b6f1642049515025d88d07/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala#L39

Would storing my RDD as MEMORY_ONLY_SER prevent the closure serializer from trying to deal with my clojure.lang.PersistentVector class? Where do I go from here?

On Sun, May 4, 2014 at 12:50 PM, Reynold Xin r...@databricks.com wrote: I added the config option to use the non-default serializer.
Re: Apache Spark running out of the spark shell
Hey AJ,

I haven't tried to run it on a cluster yet, only in local mode. I'll try to get something running on a cluster soon and keep you posted.

Nicolas Garneau

On May 4, 2014, at 6:23 PM, Ajay Nair prodig...@gmail.com wrote: Now I got it to work .. well, almost. However, I needed to copy the project/ folder to the spark-standalone folder because the package build was failing; it could not find build properties.
Re: bug using kryo as closure serializer
Good idea. I submitted a pull request for the doc update here: https://github.com/apache/spark/pull/642

On Sun, May 4, 2014 at 3:54 PM, Soren Macbeth so...@yieldbot.com wrote: Thanks for the reply! OK, if that's the case, I'd recommend a note to that effect in the docs at least.
Re: bug using kryo as closure serializer
Thanks. Do you mind playing with chill-scala a little bit and seeing if it actually works well for closures? One way to try is to hard-code the serializer to use Kryo with chill-scala, and then run through all the unit tests. If it works well, we can incorporate that in the next release (probably not 1.0, but after that).

On Sun, May 4, 2014 at 9:08 PM, Soren Macbeth so...@yieldbot.com wrote:

FWIW, it seems like it wouldn't be very difficult to integrate chill-scala, since you're already using chill-java, and you would probably get Kryo serialization of closures and all sorts of other Scala stuff for free. All that would be needed would be to include the dependency and then update KryoSerializer to register the stuff in chill-scala. In that case, you could probably safely make Kryo the default serializer, which I think would be desirable in general.

On Sun, May 4, 2014 at 8:48 PM, Reynold Xin r...@databricks.com wrote: Good idea. I submitted a pull request for the doc update here: https://github.com/apache/spark/pull/642
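To make the suggested experiment concrete, here is a minimal standalone sketch of building a Kryo instance through chill-scala's ScalaKryoInstantiator and round-tripping a closure; the test closure and the expectation that it round-trips are assumptions for illustration, and wiring this into Spark's KryoSerializer and its test suite would be the real experiment.

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import com.esotericsoftware.kryo.io.{Input, Output}
import com.twitter.chill.ScalaKryoInstantiator

object ChillClosureCheck {
  def main(args: Array[String]): Unit = {
    // Kryo configured with chill-scala's registrations for Scala types.
    val kryo = (new ScalaKryoInstantiator).newKryo()

    // A closure capturing local state, similar to what Spark ships to executors.
    val base = 10
    val closure: Int => Int = x => x + base

    val bos = new ByteArrayOutputStream()
    val out = new Output(bos)
    kryo.writeClassAndObject(out, closure)
    out.close()

    val in = new Input(new ByteArrayInputStream(bos.toByteArray))
    val back = kryo.readClassAndObject(in).asInstanceOf[Int => Int]
    println(back(5)) // 15 if the closure round-trips correctly
  }
}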