[ https://issues.apache.org/jira/browse/SPARK-3333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14117853#comment-14117853 ]
Nicholas Chammas commented on SPARK-3333:
-----------------------------------------

It looks like the default number of reducers does indeed explain most of the performance difference here, but there is still a significant difference even after controlling for this variable. I have 2 identical EC2 clusters as described in this JIRA issue, one on 1.0.2 and one on 1.1.0-rc3. This time I ran the following PySpark code:

{code}
a = sc.parallelize(["Nick", "John", "Bob"])
a = a.repartition(24000)
a.keyBy(lambda x: len(x)).reduceByKey(lambda x,y: x + y, sc.defaultParallelism).take(1)
{code}

Here are my results for 3 runs on each cluster:

||1.0.2||1.1.0-rc3||
| 95s | 343s |
| 89s | 336s |
| 95s | 334s |

So manually setting the number of reducers to a smaller value does help a lot, but there is still a 3-4x slowdown. Can anyone else replicate this result?

> Large number of partitions causes OOM
> -------------------------------------
>
>                 Key: SPARK-3333
>                 URL: https://issues.apache.org/jira/browse/SPARK-3333
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 1.1.0
>         Environment: EC2; 1 master; 1 slave; {{m3.xlarge}} instances
>            Reporter: Nicholas Chammas
>
> Here's a repro for PySpark:
>
> {code}
> a = sc.parallelize(["Nick", "John", "Bob"])
> a = a.repartition(24000)
> a.keyBy(lambda x: len(x)).reduceByKey(lambda x,y: x + y).take(1)
> {code}
>
> This code runs fine on 1.0.2. It returns the following result in just over a minute:
>
> {code}
> [(4, 'NickJohn')]
> {code}
>
> However, when I try this with 1.1.0-rc3 on an identically-sized cluster, it runs for a very, very long time (at least 45 min) and then fails with {{java.lang.OutOfMemoryError: Java heap space}}.
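The second argument to {{reduceByKey}} in the comment above is the number of reduce partitions: keys are hash-partitioned into that many buckets, and each bucket becomes one reduce task, which is why 24000 of them carries so much more per-task overhead than {{sc.defaultParallelism}}. A minimal plain-Python sketch of that partitioning behavior (no Spark required; {{reduce_by_key}} is a hypothetical stand-in, not Spark's actual shuffle implementation):

```python
from collections import defaultdict

def reduce_by_key(pairs, func, num_partitions):
    """Mimic reduceByKey(func, numPartitions): hash-partition keys
    into num_partitions buckets, then reduce the values per key."""
    buckets = [defaultdict(list) for _ in range(num_partitions)]
    for k, v in pairs:
        buckets[hash(k) % num_partitions][k].append(v)
    out = []
    for bucket in buckets:          # one "reduce task" per bucket
        for k, values in bucket.items():
            acc = values[0]
            for v in values[1:]:
                acc = func(acc, v)
            out.append((k, acc))
    return out

# Same data as the repro: key each name by its length, concatenate values.
pairs = [(len(x), x) for x in ["Nick", "John", "Bob"]]
print(reduce_by_key(pairs, lambda x, y: x + y, 8))
```

The reduced result is identical regardless of {{num_partitions}}; only the number of (mostly empty) buckets walked per job changes, which is the overhead the timings above are measuring.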
> Here is a stack trace taken from a run on 1.1.0-rc2:
>
> {code}
> >>> a = sc.parallelize(["Nick", "John", "Bob"])
> >>> a = a.repartition(24000)
> >>> a.keyBy(lambda x: len(x)).reduceByKey(lambda x,y: x + y).take(1)
> 14/08/29 21:53:40 WARN BlockManagerMasterActor: Removing BlockManager BlockManagerId(0, ip-10-138-29-167.ec2.internal, 46252, 0) with no recent heart beats: 175143ms exceeds 45000ms
> 14/08/29 21:53:50 WARN BlockManagerMasterActor: Removing BlockManager BlockManagerId(10, ip-10-138-18-106.ec2.internal, 33711, 0) with no recent heart beats: 175359ms exceeds 45000ms
> 14/08/29 21:54:02 WARN BlockManagerMasterActor: Removing BlockManager BlockManagerId(19, ip-10-139-36-207.ec2.internal, 52208, 0) with no recent heart beats: 173061ms exceeds 45000ms
> 14/08/29 21:54:13 WARN BlockManagerMasterActor: Removing BlockManager BlockManagerId(5, ip-10-73-142-70.ec2.internal, 56162, 0) with no recent heart beats: 176816ms exceeds 45000ms
> 14/08/29 21:54:22 WARN BlockManagerMasterActor: Removing BlockManager BlockManagerId(7, ip-10-236-145-200.ec2.internal, 40959, 0) with no recent heart beats: 182241ms exceeds 45000ms
> 14/08/29 21:54:40 WARN BlockManagerMasterActor: Removing BlockManager BlockManagerId(4, ip-10-139-1-195.ec2.internal, 49221, 0) with no recent heart beats: 178406ms exceeds 45000ms
> 14/08/29 21:54:41 ERROR Utils: Uncaught exception in thread Result resolver thread-3
> java.lang.OutOfMemoryError: Java heap space
>         at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:296)
>         at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.read(DefaultArraySerializers.java:35)
>         at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.read(DefaultArraySerializers.java:18)
>         at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:699)
>         at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:611)
>         at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>         at org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:162)
>         at org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:79)
>         at org.apache.spark.scheduler.TaskSetManager.handleSuccessfulTask(TaskSetManager.scala:514)
>         at org.apache.spark.scheduler.TaskSchedulerImpl.handleSuccessfulTask(TaskSchedulerImpl.scala:355)
>         at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:68)
>         at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:47)
>         at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:47)
>         at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1311)
>         at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:46)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
> Exception in thread "Result resolver thread-3" 14/08/29 21:56:26 ERROR SendingConnection: Exception while reading SendingConnection to ConnectionManagerId(ip-10-73-142-223.ec2.internal,54014)
> java.nio.channels.ClosedChannelException
>         at sun.nio.ch.SocketChannelImpl.ensureReadOpen(SocketChannelImpl.java:252)
>         at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:295)
>         at org.apache.spark.network.SendingConnection.read(Connection.scala:390)
>         at org.apache.spark.network.ConnectionManager$$anon$7.run(ConnectionManager.scala:199)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
> java.lang.OutOfMemoryError: Java heap space
>         at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:296)
>         at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.read(DefaultArraySerializers.java:35)
>         at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.read(DefaultArraySerializers.java:18)
>         at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:699)
>         at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:611)
>         at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>         at org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:162)
>         at org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:79)
>         at org.apache.spark.scheduler.TaskSetManager.handleSuccessfulTask(TaskSetManager.scala:514)
>         at org.apache.spark.scheduler.TaskSchedulerImpl.handleSuccessfulTask(TaskSchedulerImpl.scala:355)
>         at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:68)
>         at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:47)
>         at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:47)
>         at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1311)
>         at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:46)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
> 14/08/29 21:54:43 WARN BlockManagerMasterActor: Removing BlockManager BlockManagerId(6, ip-10-137-1-139.ec2.internal, 42539, 0) with no recent heart beats: 183978ms exceeds 45000ms
> 14/08/29 21:57:42 ERROR ConnectionManager: Corresponding SendingConnection to ConnectionManagerId(ip-10-138-9-33.ec2.internal,41924) not found
> 14/08/29 21:57:51 WARN BlockManagerMasterActor: Removing BlockManager BlockManagerId(11, ip-10-236-181-116.ec2.internal, 46847, 0) with no recent heart beats: 178629ms exceeds 45000ms
> 14/08/29 21:57:43 ERROR ConnectionManager: Corresponding SendingConnection to ConnectionManagerId(ip-10-137-1-139.ec2.internal,42539) not found
> 14/08/29 21:57:54 ERROR SendingConnection: Exception while reading SendingConnection to ConnectionManagerId(ip-10-141-136-168.ec2.internal,42960)
> java.nio.channels.ClosedChannelException
>         at sun.nio.ch.SocketChannelImpl.ensureReadOpen(SocketChannelImpl.java:252)
>         at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:295)
>         at org.apache.spark.network.SendingConnection.read(Connection.scala:390)
>         at org.apache.spark.network.ConnectionManager$$anon$7.run(ConnectionManager.scala:199)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org