Re: Confused by groupByKey() and the default partitioner
Thanks, Aaron. I replaced groupByKey with reduceByKey along with some list-concatenation operations, and found that the performance became even worse. So groupByKey is not that bad in my code.

Best regards,
- Guanhua

From: Aaron Davidson ilike...@gmail.com
Reply-To: user@spark.apache.org
Date: Sat, 12 Jul 2014 16:32:22 -0700
To: user@spark.apache.org
Subject: Re: Confused by groupByKey() and the default partitioner

Yes, groupByKey() does partition by the hash of the key unless you specify a custom Partitioner.

(1) If you were to use groupByKey() when the data was already partitioned correctly, the data would indeed not be shuffled. Here is the associated code; you'll see that it simply checks whether the Partitioner that groupBy() is looking for is equal to the Partitioner of the pre-existing RDD:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L89

By the way, I should warn you that groupByKey() is not a recommended operation if you can avoid it, as it has non-obvious performance issues when running with serious data.

On Sat, Jul 12, 2014 at 12:20 PM, Guanhua Yan gh...@lanl.gov wrote:

Hi: I have trouble understanding the default partitioner (hash) in Spark. Suppose that an RDD with two partitions is created as follows:

x = sc.parallelize([("a", 1), ("b", 4), ("a", 10), ("c", 7)], 2)

Does Spark partition x based on the hash of the key (e.g., "a", "b", "c") by default?

(1) Assuming this is correct, if I further use the groupByKey primitive, x.groupByKey(), all the records sharing the same key should be located in the same partition, and it would not be necessary to shuffle the data records around, as all the grouping operations could be done locally.

(2) If it's not true, how could I specify a partitioner based simply on the hashing of the key (in Python)?

Thank you,
- Guanhua
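For concreteness, here is a minimal PySpark sketch of the two approaches compared in this exchange. The local SparkContext and all names are illustrative, not taken from the original code:

    from pyspark import SparkContext

    sc = SparkContext("local[2]", "groupbykey-demo")  # illustrative context
    x = sc.parallelize([("a", 1), ("b", 4), ("a", 10), ("c", 7)], 2)

    # Option 1: groupByKey shuffles every (key, value) pair once and
    # materializes all values for each key in one partition.
    grouped = x.groupByKey().mapValues(list)

    # Option 2: reduceByKey with list concatenation, as described above.
    # It can combine map-side, but wrapping each value in a list and
    # repeatedly concatenating allocates many intermediate lists, which
    # may explain why it can end up slower than a single groupByKey shuffle.
    concatenated = x.mapValues(lambda v: [v]).reduceByKey(lambda a, b: a + b)

    print(sorted(grouped.collect()))       # [('a', [1, 10]), ('b', [4]), ('c', [7])]
    print(sorted(concatenated.collect()))  # same key -> list-of-values pairs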
Confused by groupByKey() and the default partitioner
Hi: I have trouble understanding the default partitioner (hash) in Spark. Suppose that an RDD with two partitions is created as follows:

x = sc.parallelize([("a", 1), ("b", 4), ("a", 10), ("c", 7)], 2)

Does Spark partition x based on the hash of the key (e.g., "a", "b", "c") by default?

(1) Assuming this is correct, if I further use the groupByKey primitive, x.groupByKey(), all the records sharing the same key should be located in the same partition, and it would not be necessary to shuffle the data records around, as all the grouping operations could be done locally.

(2) If it's not true, how could I specify a partitioner based simply on the hashing of the key (in Python)?

Thank you,
- Guanhua
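For question (2), a minimal sketch assuming PySpark's partitionBy, which hash-partitions by key unless a different partition function is passed in (the variable names here are illustrative):

    # Hash-partition x explicitly into 2 partitions. This is also the
    # default behavior when no partitionFunc is given.
    by_hash = x.partitionBy(2)

    # The same call with the hash function spelled out explicitly.
    by_custom = x.partitionBy(2, partitionFunc=hash)

    # glom() gathers each partition into a list, so you can inspect
    # which keys landed in which partition.
    print(by_hash.glom().collect())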
Re: java.lang.StackOverflowError when calling count()
Thanks, Xiangrui. After some debugging, it turns out that the problem resulted from a bug in my own code. But it's good to know that a long lineage can also lead to this problem. I will also try checkpointing to see whether the performance can be improved.

Best regards,
- Guanhua

On 5/13/14 12:10 AM, Xiangrui Meng men...@gmail.com wrote:

You have a long lineage that causes the StackOverflowError. Try rdd.checkpoint() followed by rdd.count() every 20~30 iterations. checkpoint() can cut the lineage.

-Xiangrui

On Mon, May 12, 2014 at 3:42 PM, Guanhua Yan gh...@lanl.gov wrote:

Dear Sparkers:

I am using the Python API of Spark 0.9.0 to implement an iterative algorithm, and I am getting the errors shown at the end of this email. They seem to be due to a Java StackOverflowError. The same error has been reproduced on a Mac desktop and a Linux workstation, both running the same version of Spark. The failing line of code works correctly for quite a few iterations before the error appears. At the failing line, rdd__new.count() could be 0 (in some previous rounds, it was also 0 without any problem).

Any thoughts on this?

Thank you very much,
- Guanhua

CODE: print "round", round, rdd__new.count()

Traceback (interleaved with executor log output):

File /home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py, line 542, in count
14/05/12 16:20:28 INFO TaskSetManager: Loss was due to java.lang.StackOverflowError [duplicate 1]
return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
14/05/12 16:20:28 ERROR TaskSetManager: Task 8419.0:0 failed 1 times; aborting job
File /home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py, line 533, in sum
14/05/12 16:20:28 INFO TaskSchedulerImpl: Ignoring update with state FAILED from TID 1774 because its task set is gone
return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
File /home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py, line 499, in reduce
vals = self.mapPartitions(func).collect()
File /home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py, line 463, in collect
bytesInJava = self._jrdd.collect().iterator()
File /home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py, line 537, in __call__
File /home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py, line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o4317.collect.
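A minimal sketch of the periodic-checkpoint pattern suggested above. The checkpoint directory, iteration count, and cadence are illustrative; note that checkpointing requires setting a checkpoint directory first:

    from pyspark import SparkContext

    sc = SparkContext("local[2]", "checkpoint-demo")
    sc.setCheckpointDir("/tmp/spark-checkpoints")  # illustrative path

    rdd = sc.parallelize(range(1000), 4)
    for i in range(100):
        rdd = rdd.map(lambda x: x + 1)  # each iteration extends the lineage
        if (i + 1) % 25 == 0:           # every 20~30 iterations, as suggested
            rdd.checkpoint()            # mark the RDD for checkpointing
            rdd.count()                 # an action forces the checkpoint,
                                        # truncating the lineage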
: org.apache.spark.SparkException: Job aborted: Task 8419.0:1 failed 1 times (most recent failure: Exception failure: java.lang.StackOverflowError)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

== The stack overflow error is shown as follows: ==

14/05/12 16:20:28 ERROR Executor: Exception in task ID 1774
java.lang.StackOverflowError
at java.util.zip.Inflater.inflate(Inflater.java:259)
at java.util.zip.InflaterInputStream.read(InflaterInputStream.java:152)
at java.util.zip.GZIPInputStream.read(GZIPInputStream.java:116)
at java.io.ObjectInputStream$PeekInputStream.read(ObjectInputStream.java:2310)
at java.io.ObjectInputStream
java.lang.StackOverflowError when calling count()
Dear Sparkers:

I am using the Python API of Spark 0.9.0 to implement an iterative algorithm, and I am getting the errors shown at the end of this email. They seem to be due to a Java StackOverflowError. The same error has been reproduced on a Mac desktop and a Linux workstation, both running the same version of Spark. The failing line of code works correctly for quite a few iterations before the error appears. At the failing line, rdd__new.count() could be 0 (in some previous rounds, it was also 0 without any problem).

Any thoughts on this?

Thank you very much,
- Guanhua

CODE: print "round", round, rdd__new.count()

Traceback (interleaved with executor log output):

File /home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py, line 542, in count
14/05/12 16:20:28 INFO TaskSetManager: Loss was due to java.lang.StackOverflowError [duplicate 1]
return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
14/05/12 16:20:28 ERROR TaskSetManager: Task 8419.0:0 failed 1 times; aborting job
File /home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py, line 533, in sum
14/05/12 16:20:28 INFO TaskSchedulerImpl: Ignoring update with state FAILED from TID 1774 because its task set is gone
return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
File /home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py, line 499, in reduce
vals = self.mapPartitions(func).collect()
File /home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py, line 463, in collect
bytesInJava = self._jrdd.collect().iterator()
File /home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py, line 537, in __call__
File /home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py, line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o4317.collect.
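One way to see the lineage growing in such a loop is rdd.toDebugString(), available on PySpark RDDs in later releases. This is a diagnostic sketch with illustrative names, not the original code, and it may not apply to 0.9.0:

    # Diagnostic sketch: watch the lineage grow across iterations.
    rdd = sc.parallelize(range(10))
    for i in range(50):
        rdd = rdd.map(lambda x: x + 1)

    # Prints ~50 chained stages. Recursively deserializing such a deep
    # object graph on the executors is what eventually triggers the
    # java.lang.StackOverflowError seen in the trace below.
    print(rdd.toDebugString())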
: org.apache.spark.SparkException: Job aborted: Task 8419.0:1 failed 1 times (most recent failure: Exception failure: java.lang.StackOverflowError)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

== The stack overflow error is shown as follows: ==

14/05/12 16:20:28 ERROR Executor: Exception in task ID 1774
java.lang.StackOverflowError
at java.util.zip.Inflater.inflate(Inflater.java:259)
at java.util.zip.InflaterInputStream.read(InflaterInputStream.java:152)
at java.util.zip.GZIPInputStream.read(GZIPInputStream.java:116)
at java.io.ObjectInputStream$PeekInputStream.read(ObjectInputStream.java:2310)
at java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2323)
at java.io.ObjectInputStream$BlockDataInputStream.readInt(ObjectInputStream.java:2818)
at java.io.ObjectInputStream.readHandle(ObjectInputStream.java:1452)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1511)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at
Python Spark on YARN
Hi all:

Is it possible to develop Spark programs in Python and run them on YARN? The Python SparkContext class doesn't seem to offer such an option.

Thank you,
- Guanhua

===
Guanhua Yan, Ph.D.
Information Sciences Group (CCS-3)
Los Alamos National Laboratory
Tel: +1-505-667-0176
Email: gh...@lanl.gov
Web: http://ghyan.weebly.com/
===
Re: Python Spark on YARN
Thanks, Matei. Will take a look at it.

Best regards,
Guanhua

From: Matei Zaharia matei.zaha...@gmail.com
Reply-To: user@spark.apache.org
Date: Tue, 29 Apr 2014 14:19:30 -0700
To: user@spark.apache.org
Subject: Re: Python Spark on YARN

This will be possible in 1.0 after this pull request: https://github.com/apache/spark/pull/30

Matei

On Apr 29, 2014, at 9:51 AM, Guanhua Yan gh...@lanl.gov wrote:

Hi all:

Is it possible to develop Spark programs in Python and run them on YARN? The Python SparkContext class doesn't seem to offer such an option.

Thank you,
- Guanhua

===
Guanhua Yan, Ph.D.
Information Sciences Group (CCS-3)
Los Alamos National Laboratory
Tel: +1-505-667-0176
Email: gh...@lanl.gov
Web: http://ghyan.weebly.com/
===
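For readers finding this thread later: once that pull request shipped in Spark 1.0, a PySpark application could be submitted to YARN in client mode. A hedged sketch; the file name, app name, and submit command are illustrative, and HADOOP_CONF_DIR must point at the cluster's Hadoop configuration:

    # my_app.py -- a minimal PySpark application (names are illustrative).
    # With Spark 1.0+, submit it to YARN in client mode, e.g.:
    #   ./bin/spark-submit --master yarn-client my_app.py
    from pyspark import SparkConf, SparkContext

    conf = SparkConf().setAppName("yarn-demo")
    sc = SparkContext(conf=conf)
    print(sc.parallelize(range(100)).sum())
    sc.stop()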