Re: Confused by groupByKey() and the default partitioner

2014-07-13 Thread Guanhua Yan
Thanks, Aaron. I replaced groupByKey with reduceByKey along with some list
concatenation operations, and found that the performance became even worse.
So groupByKey is not that bad in my code.

Best regards,
- Guanhua
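
A minimal PySpark sketch of the two approaches being compared above; the RDD
and variable names (pairs, grouped, concatenated) are hypothetical, not code
from this thread:

  # groupByKey shuffles the values once and materializes them per key
  grouped = pairs.groupByKey()

  # reduceByKey with list concatenation rebuilds a list on every merge,
  # so each merge costs time proportional to the list lengths, which can
  # easily end up slower than a single groupByKey
  concatenated = pairs.map(lambda kv: (kv[0], [kv[1]])) \
                      .reduceByKey(lambda a, b: a + b)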



From:  Aaron Davidson ilike...@gmail.com
Reply-To:  user@spark.apache.org
Date:  Sat, 12 Jul 2014 16:32:22 -0700
To:  user@spark.apache.org
Subject:  Re: Confused by groupByKey() and the default partitioner

Yes, groupByKey() does partition by the hash of the key unless you specify a
custom Partitioner.
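
A minimal PySpark sketch of both the default behavior and an explicit hash
partitioner, which also touches question (2) below; the partition counts and
the custom function are illustrative only, and the partitionFunc keyword is
assumed to follow the PySpark API of these versions:

  x = sc.parallelize([("a", 1), ("b", 4), ("a", 10), ("c", 7)], 2)

  # Default: groupByKey hash-partitions the keys into numPartitions buckets
  grouped = x.groupByKey(2)

  # Explicit hash partitioning of the pair RDD itself; partitionBy also
  # accepts a custom partition function applied to each key
  hashed = x.partitionBy(2)
  custom = x.partitionBy(2, partitionFunc=lambda k: hash(k))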

(1) If you were to use groupByKey() when the data was already partitioned
correctly, the data would indeed not be shuffled. Here is the associated
code; you'll see that it simply checks whether the Partitioner the groupBy()
is looking for is equal to the Partitioner of the pre-existing RDD:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L89
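
A hedged sketch of that pre-partitioned case in PySpark, assuming the Python
API applies the same partitioner-equality check as the Scala code linked
above (this may depend on the Spark version); cache() is only there to keep
the partitioned data around between jobs:

  prepartitioned = x.partitionBy(2).cache()
  # Same partitioner and partition count as the existing RDD, so the
  # grouping can be done locally without a second shuffle
  groups = prepartitioned.groupByKey(2)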

By the way, I should warn you that groupByKey() is not a recommended
operation if you can avoid it, as it has non-obvious performance issues when
running on data of any serious size.
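
Where the grouping only feeds an aggregation, the usual alternative is to
push the aggregation into the shuffle; a minimal sketch, with a sum as an
example reduction:

  # Combines values map-side before shuffling, unlike groupByKey
  sums = x.reduceByKey(lambda a, b: a + b)
  # e.g. [("a", 11), ("b", 4), ("c", 7)]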


On Sat, Jul 12, 2014 at 12:20 PM, Guanhua Yan gh...@lanl.gov wrote:
 Hi:
 
 I have trouble understanding the default partitioner (hash) in Spark. Suppose
 that an RDD with two partitions is created as follows:
 x = sc.parallelize([("a", 1), ("b", 4), ("a", 10), ("c", 7)], 2)
 Does Spark partition x based on the hash of the key (e.g., "a", "b", "c") by
 default?
 (1) Assuming this is correct, if I further use the groupByKey primitive,
 x.groupByKey(), all the records sharing the same key should be located in the
 same partition. Then it's not necessary to shuffle the data records around, as
 all the grouping operations can be done locally.
 (2) If it's not true, how could I specify a partitioner simply based on the
 hashing of the key (in Python)?
 Thank you,
 - Guanhua





Confused by groupByKey() and the default partitioner

2014-07-12 Thread Guanhua Yan
Hi:

I have trouble understanding the default partitioner (hash) in Spark.
Suppose that an RDD with two partitions is created as follows:
x = sc.parallelize([("a", 1), ("b", 4), ("a", 10), ("c", 7)], 2)
Does Spark partition x based on the hash of the key (e.g., "a", "b", "c") by
default?
(1) Assuming this is correct, if I further use the groupByKey primitive,
x.groupByKey(), all the records sharing the same key should be located in
the same partition. Then it's not necessary to shuffle the data records
around, as all the grouping operations can be done locally.
(2) If it's not true, how could I specify a partitioner simply based on the
hashing of the key (in Python)?
Thank you,
- Guanhua




Re: java.lang.StackOverflowError when calling count()

2014-05-13 Thread Guanhua Yan
Thanks, Xiangrui. After some debugging, it turned out that the problem
resulted from a bug in my code. But it's good to know that a long lineage
could also lead to this problem. I will also try checkpointing to see
whether the performance can be improved.

Best regards,
- Guanhua

On 5/13/14 12:10 AM, Xiangrui Meng men...@gmail.com wrote:

You have a long lineage that causes the StackOverflowError. Try
rdd.checkpoint() and rdd.count() every 20~30 iterations.
checkpoint() can cut the lineage. -Xiangrui
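
A minimal sketch of how that suggestion could look in an iterative PySpark
loop; the checkpoint directory, initial_rdd, step() and num_rounds are
hypothetical placeholders, not code from this thread:

  sc.setCheckpointDir("/tmp/spark-checkpoints")  # hypothetical directory
  rdd = initial_rdd                              # hypothetical starting RDD
  for i in range(num_rounds):
      rdd = step(rdd)           # one iteration of the algorithm
      if i % 25 == 0:
          rdd.checkpoint()      # mark the RDD so its lineage gets cut
          rdd.count()           # an action forces evaluation, writing the checkpoint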

On Mon, May 12, 2014 at 3:42 PM, Guanhua Yan gh...@lanl.gov wrote:
 Dear Sparkers:

 I am using the Python API of Spark 0.9.0 to implement an iterative
 algorithm. I got some errors, shown at the end of this email. It seems that
 they are due to a Java StackOverflowError. The same error has been
 reproduced on a Mac desktop and a Linux workstation, both running the same
 version of Spark.

 The same line of code works correctly after quite a few iterations. At the
 line of the error, rdd__new.count() could be 0. (In some previous rounds,
 this was also 0 without any problem.)

 Any thoughts on this?

 Thank you very much,
 - Guanhua

 

java.lang.StackOverflowError when calling count()

2014-05-12 Thread Guanhua Yan
Dear Sparkers:

I am using the Python API of Spark 0.9.0 to implement an iterative
algorithm. I got some errors, shown at the end of this email. It seems that
they are due to a Java StackOverflowError. The same error has been
reproduced on a Mac desktop and a Linux workstation, both running the same
version of Spark.

The same line of code works correctly after quite a few iterations. At the
line of the error, rdd__new.count() could be 0. (In some previous rounds,
this was also 0 without any problem.)

Any thoughts on this?

Thank you very much,
- Guanhua



CODE: print "round", round, rdd__new.count()

  File "/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py", line 542, in count
14/05/12 16:20:28 INFO TaskSetManager: Loss was due to java.lang.StackOverflowError [duplicate 1]
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
14/05/12 16:20:28 ERROR TaskSetManager: Task 8419.0:0 failed 1 times; aborting job
  File "/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py", line 533, in sum
14/05/12 16:20:28 INFO TaskSchedulerImpl: Ignoring update with state FAILED from TID 1774 because its task set is gone
    return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
  File "/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py", line 499, in reduce
    vals = self.mapPartitions(func).collect()
  File "/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py", line 463, in collect
    bytesInJava = self._jrdd.collect().iterator()
  File "/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py", line 537, in __call__
  File "/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o4317.collect.
: org.apache.spark.SparkException: Job aborted: Task 8419.0:1 failed 1 times (most recent failure: Exception failure: java.lang.StackOverflowError)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

==
The stack overflow error is shown as follows:
==

14/05/12 16:20:28 ERROR Executor: Exception in task ID 1774
java.lang.StackOverflowError
at java.util.zip.Inflater.inflate(Inflater.java:259)
at java.util.zip.InflaterInputStream.read(InflaterInputStream.java:152)
at java.util.zip.GZIPInputStream.read(GZIPInputStream.java:116)
at java.io.ObjectInputStream$PeekInputStream.read(ObjectInputStream.java:2310)
at java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2323)
at java.io.ObjectInputStream$BlockDataInputStream.readInt(ObjectInputStream.java:2818)
at java.io.ObjectInputStream.readHandle(ObjectInputStream.java:1452)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1511)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at 

Python Spark on YARN

2014-04-29 Thread Guanhua Yan
Hi all:

Is it possible to develop Spark programs in Python and run them on YARN?
The Python SparkContext class doesn't seem to offer such an option.

Thank you,
- Guanhua

===
Guanhua Yan, Ph.D.
Information Sciences Group (CCS-3)
Los Alamos National Laboratory
Tel: +1-505-667-0176
Email: gh...@lanl.gov
Web: http://ghyan.weebly.com/
===




Re: Python Spark on YARN

2014-04-29 Thread Guanhua Yan
Thanks, Matei. Will take a look at it.

Best regards,
Guanhua

From:  Matei Zaharia matei.zaha...@gmail.com
Reply-To:  user@spark.apache.org
Date:  Tue, 29 Apr 2014 14:19:30 -0700
To:  user@spark.apache.org
Subject:  Re: Python Spark on YARN

This will be possible in 1.0 after this pull request:
https://github.com/apache/spark/pull/30

Matei

On Apr 29, 2014, at 9:51 AM, Guanhua Yan gh...@lanl.gov wrote:

 Hi all:
 
 Is it possible to develop Spark programs in Python and run them on YARN?
 The Python SparkContext class doesn't seem to offer such an option.
 
 Thank you,
 - Guanhua
 
 ===
 Guanhua Yan, Ph.D.
 Information Sciences Group (CCS-3)
 Los Alamos National Laboratory
 Tel: +1-505-667-0176
 Email: gh...@lanl.gov
 Web: http://ghyan.weebly.com/
 ===