reduce, transform, combine

2014-05-04 Thread Manish Amde
I am currently using the RDD aggregate operation to reduce (fold) per
partition and then combine the per-partition results:
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U)
=> U): U
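
For context, a typical call to aggregate looks like the following (a minimal
sketch, assuming an existing SparkContext named sc); seqOp folds elements into
a per-partition accumulator and combOp merges the per-partition accumulators:

// Sketch: compute (sum, count) in a single pass with aggregate.
val rdd = sc.parallelize(1 to 100)
val (sum, count) = rdd.aggregate((0, 0))(
  (acc, x) => (acc._1 + x, acc._2 + 1),  // seqOp: fold within a partition
  (a, b) => (a._1 + b._1, a._2 + b._2))  // combOp: merge partition results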

I need to perform a transform operation after the seqOp and before the
combOp. The signature would look like
def foldTransformCombine[U: ClassTag](zeroReduceValue: V, zeroCombineValue:
U)(seqOp: (V, T) => V, transformOp: (V) => U, combOp: (U, U) => U): U

This is especially useful in the scenario where the transformOp is
expensive and should be performed once per partition before combining. Is
there a way to accomplish this with existing RDD operations? If yes, great;
but if not, should we consider adding such a general transformation to the
list of RDD operations?

-Manish


Re: reduce, transform, combine

2014-05-04 Thread DB Tsai
You could easily achieve this with mapPartitions. However, it seems that it
cannot be done using the aggregate type of operation. I can see that it's a
generally useful operation. For now, you could use mapPartitions.
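
For reference, a minimal sketch of that mapPartitions-based workaround (the
name and argument shapes mirror the foldTransformCombine proposal above; this
is illustrative, not an existing RDD method):

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

// Fold within each partition with seqOp, run the (expensive) transformOp
// exactly once per partition, then merge the transformed results with combOp.
def foldTransformCombine[T, V, U: ClassTag](rdd: RDD[T])
    (zeroReduceValue: V, zeroCombineValue: U)
    (seqOp: (V, T) => V, transformOp: V => U, combOp: (U, U) => U): U = {
  val perPartition: RDD[U] = rdd.mapPartitions { iter =>
    Iterator.single(transformOp(iter.foldLeft(zeroReduceValue)(seqOp)))
  }
  perPartition.fold(zeroCombineValue)(combOp)
}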


Sincerely,

DB Tsai
---
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai


On Sun, May 4, 2014 at 1:12 AM, Manish Amde manish...@gmail.com wrote:

 I am currently using the RDD aggregate operation to reduce (fold) per
 partition and then combine using the RDD aggregate operation.
 def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U)
 => U): U

 I need to perform a transform operation after the seqOp and before the
 combOp. The signature would look like
 def foldTransformCombine[U: ClassTag](zeroReduceValue: V, zeroCombineValue:
 U)(seqOp: (V, T) => V, transformOp: (V) => U, combOp: (U, U) => U): U

 This is especially useful in the scenario where the transformOp is
 expensive and should be performed once per partition before combining. Is
 there a way to accomplish this with existing RDD operations? If yes, great
 but if not, should we consider adding such a general transformation to the
 list of RDD operations?

 -Manish



Re: Mailing list

2014-05-04 Thread Nicolas Lalevée

On 4 May 2014, at 06:30, Matei Zaharia matei.zaha...@gmail.com wrote:

 Hi Nicolas,
 
 Good catches on these things.
 
 Your website seems a little bit incomplete. I have found this page [1] which 
 lists the two main mailing lists, users and dev. But I also see a reference to a 
 mailing list about issues, which tracked the Spark issues when they were 
 hosted at Atlassian. I guess it has moved? Where?
 And is there any mailing list for the commits?
 
 Good catch, this was an old link and I’ve fixed it now. I also added the one 
 for commits.
 
 Also, I found it weird that there is no page referencing the true source of 
 the code, the git repository at the ASF; I only found references to the git at 
 github.
 
 The GitHub repo is actually a mirror managed by the ASF, but the “git tag” 
 link at http://spark.apache.org/downloads.html also points to the source 
 repo. The problem is that our contribution process is through GitHub so it’s 
 easier to point people to something that they can use to contribute.
 
 I am also interested in your workflow, because Ant is moving from svn to git 
 and we're still a little bit in the grey about the workflow. I am thus 
 intrigued by how you work with github pull requests.
 
 Take a look at 
 https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark and 
 https://cwiki.apache.org/confluence/display/SPARK/Reviewing+and+Merging+Patches
  to see our contribution process. In a nutshell, it works as follows:
 
 - Anyone can make a patch by forking the GitHub repo and sending a pull 
 request (GitHub’s internal patch mechanism)
 - Committers review the patch and ask for changes; contributors can push 
 additional changes into their pull request to respond
 - When the patch looks good, we use a script to merge it into the source 
 Apache repo; this also squashes the changes into one commit, making the Git 
 history sane and facilitating reverts, cherry-picks into other branches, etc.

The script you're talking about, is it merge_spark_pr.py [1]?

 Note by the way that using GitHub is not at all necessary for using Git. We 
 happened to do our development on GitHub before moving to the ASF, and all 
 our developers were used to its interface, so we stuck with it. It definitely 
 beats attaching patches on JIRA but it may not be the first step you want to 
 take in moving to Git.

Your workflow is indeed interesting. I guess most Ant committers and 
potential contributors have experience with github too, so at some point we'll 
have to handle it. I'll discuss it with the Ant dev community.

Thank you Matei for the fix on the site and for the clear response.

Nicolas

[1] https://github.com/apache/spark/blob/master/dev/merge_spark_pr.py



Re: reduce, transform, combine

2014-05-04 Thread Manish Amde
Thanks, DB. I will work with mapPartitions for now.


Question to the community in general: should we consider adding such an 
operation to RDDs, especially as a developer API?

On Sun, May 4, 2014 at 1:41 AM, DB Tsai dbt...@stanford.edu wrote:

 You could easily achieve this by mapPartition. However, it seems that it
 can not be done by using aggregate type of operation. I can see that it's a
 general useful operation. For now, you could use mapPartition.
 Sincerely,
 DB Tsai
 ---
 My Blog: https://www.dbtsai.com
 LinkedIn: https://www.linkedin.com/in/dbtsai
 On Sun, May 4, 2014 at 1:12 AM, Manish Amde manish...@gmail.com wrote:
 I am currently using the RDD aggregate operation to reduce (fold) per
 partition and then combine using the RDD aggregate operation.
 def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U)
 => U): U

 I need to perform a transform operation after the seqOp and before the
 combOp. The signature would look like
 def foldTransformCombine[U: ClassTag](zeroReduceValue: V, zeroCombineValue:
 U)(seqOp: (V, T) => V, transformOp: (V) => U, combOp: (U, U) => U): U

 This is especially useful in the scenario where the transformOp is
 expensive and should be performed once per partition before combining. Is
 there a way to accomplish this with existing RDD operations? If yes, great
 but if not, should we consider adding such a general transformation to the
 list of RDD operations?

 -Manish


Re: Apache Spark running out of the spark shell

2014-05-04 Thread Nicolas Garneau
Hey AJ,

If you plan to launch your job on a cluster, consider using the spark-submit 
command.
Running this in Spark's home directory gives you help on how to use it:

$ ./bin/spark-submit

I haven't tried it yet, but judging by this post, it will be the preferred way 
to launch jobs:
http://apache-spark-user-list.1001560.n3.nabble.com/Running-a-spark-submit-compatible-app-in-spark-shell-td4905.html

Cheers

On 2014-05-04, at 13:35, Ajay Nair prodig...@gmail.com wrote:

 Thank you. I am trying this now
 
 
 
 --
 View this message in context: 
 http://apache-spark-developers-list.1001551.n3.nabble.com/Apache-Spark-running-out-of-the-spark-shell-tp6459p6472.html
 Sent from the Apache Spark Developers List mailing list archive at Nabble.com.
 

Nicolas Garneau
ngarn...@ngarneau.com



Re: bug using kryo as closure serializer

2014-05-04 Thread Reynold Xin
I added the config option to use the non-default serializer. However, at
the time, Kryo failed to serialize pretty much any closure, so that option
was never really used / recommended.

Since then the Scala ecosystem has developed, and some other projects are
starting to use Kryo to serialize more Scala data structures, so I wouldn't
be surprised if there is a way to work around this now. However, I don't
have enough time to look into it at this point. If you do, please do post
your findings. Thanks.
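
For context, the configuration under discussion is typically set on a
SparkConf; a minimal sketch (the master and app name are placeholders, and
spark.kryo.registrator points at the registrator class from the report below):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("local[2]")            // placeholder master
  .setAppName("kryo-closure-test")  // placeholder app name
  // Data serializer plus the non-default closure serializer being discussed:
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.closure.serializer", "org.apache.spark.serializer.KryoSerializer")
  // Registrator from the report; its jar must be on the executor classpath.
  .set("spark.kryo.registrator", "pickles.kryo.PicklesRegistrator")
val sc = new SparkContext(conf)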



On Sun, May 4, 2014 at 10:35 AM, Soren Macbeth so...@yieldbot.com wrote:

 apologies for the cross-list posts, but I've gotten zero response in the
 user list and I guess this list is probably more appropriate.

 According to the documentation, using the KryoSerializer for closures is
 supported. However, when I try to set `spark.closure.serializer` to
 `org.apache.spark.serializer.KryoSerializer` things fail pretty miserably.

 The first thing that happens is that it throws exceptions over and over
 that it cannot locate my registrator class, which is located in my assembly
 jar like so:

 14/05/04 12:03:20 ERROR serializer.KryoSerializer: Failed to run
 spark.kryo.registrator
 java.lang.ClassNotFoundException: pickles.kryo.PicklesRegistrator
 at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
 at java.lang.Class.forName0(Native Method)
 at java.lang.Class.forName(Class.java:270)
 at

 org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$2.apply(KryoSerializer.scala:63)
 at

 org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$2.apply(KryoSerializer.scala:61)
 at scala.Option.foreach(Option.scala:236)
 at
 org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:61)
 at

 org.apache.spark.serializer.KryoSerializerInstance.init(KryoSerializer.scala:116)
 at

 org.apache.spark.serializer.KryoSerializer.newInstance(KryoSerializer.scala:79)
 at

 org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:180)
 at

 org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
 at

 org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
 at java.security.AccessController.doPrivileged(Native Method)
 at javax.security.auth.Subject.doAs(Subject.java:415)
 at

 org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1438)
 at
 org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176)
 at

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:724)

 Now, I would expect it not to be able to find this class since it hasn't
 yet fetched my assembly jar to the executors. Once it does fetch my jar,
 those exceptions stop. Next, all the executor tasks die with the following
 exception:

 java.nio.ReadOnlyBufferException
 at java.nio.ByteBuffer.array(ByteBuffer.java:961)
 at

 org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:136)
 at

 org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:193)
 at

 org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
 at

 org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
 at java.security.AccessController.doPrivileged(Native Method)
 at javax.security.auth.Subject.doAs(Subject.java:415)
 at

 org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1438)
 at
 org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176)
 at

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:724)

 AFAIK, I'm not doing anything out of the ordinary, just turning on kryo and
 using the registrator mechanism to register a couple custom serializers.

 The reason I tried turning on kryo for closures in the first place is
 because of a different bug that I was hitting during fetching and
 deserializing of tasks from my executors, which I detailed here:


 http://apache-spark-user-list.1001560.n3.nabble.com/Crazy-Kryo-Exception-td5257.html

 Here's hoping someone on this list can help me track down what's happening, as
 I didn't get a single reply on the user list.
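
For reference, the spark.kryo.registrator mechanism mentioned above amounts to
a class shaped roughly like this (a sketch only; it assumes Clojure classes on
the classpath, and the actual serializers registered by PicklesRegistrator are
not shown in the thread):

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

// Sketch of a registrator in the shape referenced above. A real implementation
// would register custom serializers; a plain register() call falls back to
// Kryo's default FieldSerializer for the class.
class PicklesRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[clojure.lang.PersistentVector])
  }
}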



Re: bug using kryo as closure serializer

2014-05-04 Thread Mridul Muralidharan
On a slightly related note (apologies Soren for hijacking the thread),
Reynold, how much better is kryo from spark's usage point of view
compared to the default java serialization (in general, not for
closures)?
The numbers on the kryo site are interesting, but since you have played
the most with kryo in the context of spark (I think) - how do you rate it
along the lines of:

1) computational overhead compared to java serialization.
2) memory overhead.
3) generated byte[] size.


Particularly given the bugs Patrick and I had looked into in the past
around flush, etc., I was always skeptical about using kryo.
But given the pretty nasty issues with OOMs via java serialization we
are seeing, I wanted to know your thoughts on the use of kryo with spark.
(It will be slightly involved to ensure everything gets registered, but I
want to go down that path assuming I hear good things in the context of
spark.)

Thanks,
Mridul


On Mon, May 5, 2014 at 1:20 AM, Reynold Xin r...@databricks.com wrote:
 I added the config option to use the non-default serializer. However, at
 the time, Kryo fails serializing pretty much any closures so that option
 was never really used / recommended.

 Since then the Scala ecosystem has developed, and some other projects are
 starting to use Kryo to serialize more Scala data structures, so I wouldn't
 be surprised if there is a way to work around this now. However, I don't
 have enough time to look into it at this point. If you do, please do post
 your findings. Thanks.



 On Sun, May 4, 2014 at 10:35 AM, Soren Macbeth so...@yieldbot.com wrote:

 apologies for the cross-list posts, but I've gotten zero response in the
 user list and I guess this list is probably more appropriate.

 According to the documentation, using the KryoSerializer for closures is
 supported. However, when I try to set `spark.closure.serializer` to
 `org.apache.spark.serializer.KryoSerializer` things fail pretty miserably.

 The first thing that happens is that it throws exceptions over and over
 that it cannot locate my registrator class, which is located in my assembly
 jar like so:

 14/05/04 12:03:20 ERROR serializer.KryoSerializer: Failed to run
 spark.kryo.registrator
 java.lang.ClassNotFoundException: pickles.kryo.PicklesRegistrator
 at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
 at java.lang.Class.forName0(Native Method)
 at java.lang.Class.forName(Class.java:270)
 at

 org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$2.apply(KryoSerializer.scala:63)
 at

 org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$2.apply(KryoSerializer.scala:61)
 at scala.Option.foreach(Option.scala:236)
 at
 org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:61)
 at

 org.apache.spark.serializer.KryoSerializerInstance.init(KryoSerializer.scala:116)
 at

 org.apache.spark.serializer.KryoSerializer.newInstance(KryoSerializer.scala:79)
 at

 org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:180)
 at

 org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
 at

 org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
 at java.security.AccessController.doPrivileged(Native Method)
 at javax.security.auth.Subject.doAs(Subject.java:415)
 at

 org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1438)
 at
 org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176)
 at

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:724)

 Now, I would expect it not to be able to find this class since it hasn't
 yet fetched my assembly jar to the executors. Once it does fetch my jar,
 those exceptions stop. Next, all the executor tasks die with the following
 exception:

 java.nio.ReadOnlyBufferException
 at java.nio.ByteBuffer.array(ByteBuffer.java:961)
 at

 org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:136)
 at

 org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:193)
 at

 org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
 at

 org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
 at java.security.AccessController.doPrivileged(Native Method)
 at javax.security.auth.Subject.doAs(Subject.java:415)
 at

 org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1438)
 at
 

Re: Apache Spark running out of the spark shell

2014-05-04 Thread Ajay Nair
Now I got it to work ... well, almost. However, I needed to copy the project/
folder to the spark-standalone folder, as the package build was failing
because it could not find the build properties. After the copy the build was
successful. However, when I run it I get errors, but it still gives me the
output.

[error] 14/05/04 21:58:19 INFO spark.SparkContext: Job finished: count at
SimpleApp.scala:11, took 0.040651597 s
[error] 14/05/04 21:58:19 INFO scheduler.TaskSetManager: Finished TID 3 in
17 ms on localhost (progress: 2/2)
[info] Lines with a: 3, Lines with b: 2
[error] 14/05/04 21:58:19 INFO scheduler.TaskSchedulerImpl: Removed TaskSet
1.0, whose tasks have all completed, from pool 
[success] Total time: 5 s, completed May 4, 2014 9:58:20 PM


You can see the [info] line that contains the output. All the lines I get are
marked [error]; any reason why?

I have configured my EC2 machines as master and slave nodes, and I think this
code tries to run in local mode.



--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/Apache-Spark-running-out-of-the-spark-shell-tp6459p6478.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.


Re: bug using kryo as closure serializer

2014-05-04 Thread Soren Macbeth
Thanks for the reply!

Ok, if that's the case, I'd recommend a note to that effect in the docs at
least.

Just to give some more context here, I'm working on a Clojure DSL for Spark
called Flambo, which I plan to open source shortly. If I could I'd like to
focus on the initial bug that I hit.

Exception in thread main org.apache.spark.SparkException: Job aborted:
Exception while deserializing and fetching task:
com.esotericsoftware.kryo.KryoException:
java.lang.IllegalArgumentException: Can not set final
scala.collection.convert.Wrappers field
scala.collection.convert.Wrappers$SeqWrapper.$outer to
clojure.lang.PersistentVector
Serialization trace:
$outer (scala.collection.convert.Wrappers$SeqWrapper)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:604)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

This happens immediately after all the tasks of a reduce stage complete
successfully. Here is the function throwing the exception:

https://github.com/apache/spark/blob/4bc07eebbf5e2ea0c0b6f1642049515025d88d07/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala#L43

This is where I get lost. From googling around, it seems that scala is
trying to wrap the result of my task, which contains
clojure.lang.PersistentVector objects, in a scala collection, but I don't
know why it's doing that. I have a registered kryo serializer for
clojure.lang.PersistentVector.

Based on this line, it looks like it's trying to use the closure serializer,
yet the exception thrown is from com.esotericsoftware.kryo.KryoException:

https://github.com/apache/spark/blob/4bc07eebbf5e2ea0c0b6f1642049515025d88d07/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala#L39

Would storing my RDD as MEMORY_ONLY_SER prevent the closure serializer from
trying to deal with my clojure.lang.PersistentVector class?

Where do I go from here?


On Sun, May 4, 2014 at 12:50 PM, Reynold Xin r...@databricks.com wrote:

 I added the config option to use the non-default serializer. However, at
 the time, Kryo fails serializing pretty much any closures so that option
 was never really used / recommended.

 Since then the Scala ecosystem has developed, and some other projects are
 starting to use Kryo to serialize more Scala data structures, so I wouldn't
 be surprised if there is a way to work around this now. However, I don't
 have enough time to look into it at this point. If you do, please do post
 your findings. Thanks.



 On Sun, May 4, 2014 at 10:35 AM, Soren Macbeth so...@yieldbot.com wrote:

  apologies for the cross-list posts, but I've gotten zero response in the
  user list and I guess this list is probably more appropriate.
 
  According to the documentation, using the KryoSerializer for closures is
  supported. However, when I try to set `spark.closure.serializer` to
  `org.apache.spark.serializer.KryoSerializer` things fail pretty miserably.
 
  The first thing that happens is that it throws exceptions over and over
  that it cannot locate my registrator class, which is located in my
 assembly
  jar like so:
 
  14/05/04 12:03:20 ERROR serializer.KryoSerializer: Failed to run
  spark.kryo.registrator
  java.lang.ClassNotFoundException: pickles.kryo.PicklesRegistrator
  at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
  at 

Re: Apache Spark running out of the spark shell

2014-05-04 Thread Nicolas Garneau
Hey AJ,

I haven't tried to run on a cluster yet, only in local mode.
I'll try to get something running on a cluster soon and keep you posted.

Nicolas Garneau

 On May 4, 2014, at 6:23 PM, Ajay Nair prodig...@gmail.com wrote:
 
 Now I got it to work ... well, almost. However, I needed to copy the project/
 folder to the spark-standalone folder, as the package build was failing
 because it could not find the build properties. After the copy the build was
 successful. However, when I run it I get errors, but it still gives me the
 output.
 
 [error] 14/05/04 21:58:19 INFO spark.SparkContext: Job finished: count at
 SimpleApp.scala:11, took 0.040651597 s
 [error] 14/05/04 21:58:19 INFO scheduler.TaskSetManager: Finished TID 3 in
 17 ms on localhost (progress: 2/2)
 [info] Lines with a: 3, Lines with b: 2
 [error] 14/05/04 21:58:19 INFO scheduler.TaskSchedulerImpl: Removed TaskSet
 1.0, whose tasks have all completed, from pool 
 [success] Total time: 5 s, completed May 4, 2014 9:58:20 PM
 
 
 You can see the [info] line that contains the output. All the lines I get are
 marked [error]; any reason why?
 
 I have configured my EC2 machines as master and slave nodes, and I think this
 code tries to run in local mode.
 
 
 
 --
 View this message in context: 
 http://apache-spark-developers-list.1001551.n3.nabble.com/Apache-Spark-running-out-of-the-spark-shell-tp6459p6478.html
 Sent from the Apache Spark Developers List mailing list archive at Nabble.com.
 


Re: bug using kryo as closure serializer

2014-05-04 Thread Reynold Xin
Good idea. I submitted a pull request for the doc update here:
https://github.com/apache/spark/pull/642


On Sun, May 4, 2014 at 3:54 PM, Soren Macbeth so...@yieldbot.com wrote:

 Thanks for the reply!

 Ok, if that's the case, I'd recommend a note to that effect in the docs at
 least.

 Just to give some more context here, I'm working on a Clojure DSL for Spark
 called Flambo, which I plan to open source shortly. If I could I'd like to
 focus on the initial bug that I hit.

 Exception in thread main org.apache.spark.SparkException: Job aborted:
 Exception while deserializing and fetching task:
 com.esotericsoftware.kryo.KryoException:
 java.lang.IllegalArgumentException: Can not set final
 scala.collection.convert.Wrappers field
 scala.collection.convert.Wrappers$SeqWrapper.$outer to
 clojure.lang.PersistentVector
 Serialization trace:
 $outer (scala.collection.convert.Wrappers$SeqWrapper)
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020)
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018)
 at

 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at
 scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018)
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
 at scala.Option.foreach(Option.scala:236)
 at

 org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:604)
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
 at akka.actor.ActorCell.invoke(ActorCell.scala:456)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
 at akka.dispatch.Mailbox.run(Mailbox.scala:219)
 at

 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
 at
 scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at

 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at

 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

 This happens immediately after all the tasks of a reduce stage complete
 successfully. Here is the function throwing the exception:


 https://github.com/apache/spark/blob/4bc07eebbf5e2ea0c0b6f1642049515025d88d07/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala#L43

 This is where I get lost. From googling around, it seems that scala is
 trying to wrap the result of my task, which contain
 clojure.lang.PersistentVector objects in a scala collection, but I don't
 know why it's doing that. I have a registered kryo serializer for
 clojure.lang.PersistentVector.

 Based on this line, it looks like it's trying to use the closure serializer,
 yet the exception thrown is from com.esotericsoftware.kryo.KryoException:


 https://github.com/apache/spark/blob/4bc07eebbf5e2ea0c0b6f1642049515025d88d07/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala#L39

 Would storing my RDD as MEMORY_ONLY_SER prevent the closure serializer from
 trying to deal with my clojure.lang.PersistentVector class?

 Where do I go from here?


 On Sun, May 4, 2014 at 12:50 PM, Reynold Xin r...@databricks.com wrote:

  I added the config option to use the non-default serializer. However, at
  the time, Kryo fails serializing pretty much any closures so that option
  was never really used / recommended.
 
  Since then the Scala ecosystem has developed, and some other projects are
  starting to use Kryo to serialize more Scala data structures, so I
 wouldn't
  be surprised if there is a way to work around this now. However, I don't
  have enough time to look into it at this point. If you do, please do post
  your findings. Thanks.
 
 
 
  On Sun, May 4, 2014 at 10:35 AM, Soren Macbeth so...@yieldbot.com
 wrote:
 
   apologies for the cross-list posts, but I've gotten zero response in
 the
   user list and I guess this list is probably more appropriate.
  
   According to the documentation, using the KryoSerializer for closures
 is
   supported. However, when I try to set `spark.closure.serializer` to
   `org.apache.spark.serializer.KryoSerializer` things fail pretty
 miserably.
  
   The first thing that happens is that it throws exceptions over and over
   that it cannot locate my registrator class, which is located in my
  assembly
   

Re: bug using kryo as closure serializer

2014-05-04 Thread Reynold Xin
Thanks. Do you mind playing with chill-scala a little bit and seeing if it
actually works well for closures? One way to try it is to hard-code the
serializer to use Kryo with chill-scala and then run through all the unit
tests.

If it works well, we can incorporate that in the next release (probably not
1.0, but after that).
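
A rough sketch of that experiment, assuming a chill-scala dependency (class
names per com.twitter.chill): build the Kryo instance used for closures with
chill-scala's Scala-specific registrations applied.

import com.esotericsoftware.kryo.Kryo
import com.twitter.chill.AllScalaRegistrar

// Register chill-scala's serializers for common Scala types on a Kryo
// instance; a hard-coded closure serializer could then use this instance.
def newScalaAwareKryo(): Kryo = {
  val kryo = new Kryo()
  kryo.setRegistrationRequired(false) // closures reference classes we cannot enumerate up front
  new AllScalaRegistrar().apply(kryo) // adds serializers for common Scala collections, etc.
  kryo
}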


On Sun, May 4, 2014 at 9:08 PM, Soren Macbeth so...@yieldbot.com wrote:

 FWIW, it seems like it wouldn't be very difficult to integrate chill-scala,
 since you're already using chill-java, and you'd probably get kryo serialization
 of closures and all sorts of other scala stuff for free. All that would be
 needed would be to include the dependency and then update KryoSerializer to
 register the stuff in chill-scala.

 In that case, you could probably safely make kryo the default serializer,
 which I think would be desirable in general.


 On Sun, May 4, 2014 at 8:48 PM, Reynold Xin r...@databricks.com wrote:

  Good idea. I submitted a pull request for the doc update here:
  https://github.com/apache/spark/pull/642
 
 
  On Sun, May 4, 2014 at 3:54 PM, Soren Macbeth so...@yieldbot.com
 wrote:
 
   Thanks for the reply!
  
   Ok, if that's the case, I'd recommend a note to that effect in the docs
  at
   least.
  
   Just to give some more context here, I'm working on a Clojure DSL for
  Spark
   called Flambo, which I plan to open source shortly. If I could I'd like
  to
   focus on the initial bug that I hit.
  
   Exception in thread main org.apache.spark.SparkException: Job
 aborted:
   Exception while deserializing and fetching task:
   com.esotericsoftware.kryo.KryoException:
   java.lang.IllegalArgumentException: Can not set final
   scala.collection.convert.Wrappers field
   scala.collection.convert.Wrappers$SeqWrapper.$outer to
   clojure.lang.PersistentVector
   Serialization trace:
   $outer (scala.collection.convert.Wrappers$SeqWrapper)
   at
  
  
 
 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020)
   at
  
  
 
 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018)
   at
  
  
 
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
   at
   scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
   at org.apache.spark.scheduler.DAGScheduler.org
  
 $apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018)
   at
  
  
 
 org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
   at
  
  
 
 org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
   at scala.Option.foreach(Option.scala:236)
   at
  
  
 
 org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:604)
   at
  
  
 
 org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190)
   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
   at akka.actor.ActorCell.invoke(ActorCell.scala:456)
   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
   at akka.dispatch.Mailbox.run(Mailbox.scala:219)
   at
  
  
 
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
   at
   scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
   at
  
  
 
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
   at
  
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
   at
  
  
 
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
  
   This happens immediately after all the tasks of a reduce stage complete
   successfully. Here is the function throwing the exception:
  
  
  
 
 https://github.com/apache/spark/blob/4bc07eebbf5e2ea0c0b6f1642049515025d88d07/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala#L43
  
   This is where I get lost. From googling around, it seems that scala is
   trying to wrap the result of my task, which contain
   clojure.lang.PersistentVector objects in a scala collection, but I
 don't
   know why it's doing that. I have a registered kryo serializer for
   clojure.lang.PersistentVector.
  
   Based on this line, it looks like it's trying to use the closure
  serializer,
   yet the exception thrown is from
 com.esotericsoftware.kryo.KryoException:
  
  
  
 
 https://github.com/apache/spark/blob/4bc07eebbf5e2ea0c0b6f1642049515025d88d07/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala#L39
  
   Would storing my RDD as MEMORY_ONLY_SER prevent the closure serializer
  from
   trying to deal with my clojure.lang.PersistentVector class?
  
   Where do I go from here?
  
  
   On Sun, May 4, 2014 at 12:50 PM, Reynold Xin