I understand the error is caused by the very high number of partitions, yet when processing 40 TB (and this number is expected to grow), 300,000 seems reasonable: 40 TB / 300,000 gives a partition size of ~130 MB (assuming the data is evenly distributed).
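As a quick sanity check of that arithmetic (a sketch using decimal TB/MB, not part of the original thread):

```python
# Sanity check of the partition-size estimate: 40 TB over 300,000 partitions.
total_bytes = 40 * 10**12        # ~40 TB of input (decimal units)
num_partitions = 300_000         # numPartitions passed to aggregateByKey

partition_mb = total_bytes / num_partitions / 10**6
print(f"~{partition_mb:.0f} MB per partition")  # ~133 MB, i.e. roughly 130 MB
```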
On Fri, Sep 7, 2018 at 6:28 PM Vadim Semenov <va...@datadoghq.com> wrote:
> You have too many partitions, so when the driver is trying to gather
> the status of all map outputs and send back to executors it chokes on
> the size of the structure that needs to be GZipped, and since it's
> bigger than 2GiB, it produces OOM.
>
> On Fri, Sep 7, 2018 at 10:35 AM Harel Gliksman <harelg...@gmail.com> wrote:
> >
> > Hi,
> >
> > We are running a Spark (2.3.1) job on an EMR cluster with 500 r3.2xlarge machines (60 GB RAM, 8 vcores, 160 GB SSD each). Driver memory is set to 25 GB.
> >
> > It processes ~40 TB of data using aggregateByKey, in which we specify numPartitions = 300,000.
> > Map-side tasks succeed, but reduce-side tasks all fail.
> >
> > We notice the following driver error:
> >
> > 18/09/07 13:35:03 WARN Utils: Suppressing exception in finally: null
> > java.lang.OutOfMemoryError
> >     at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
> >     at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
> >     at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
> >     at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
> >     at java.util.zip.DeflaterOutputStream.deflate(DeflaterOutputStream.java:253)
> >     at java.util.zip.DeflaterOutputStream.write(DeflaterOutputStream.java:211)
> >     at java.util.zip.GZIPOutputStream.write(GZIPOutputStream.java:145)
> >     at java.io.ObjectOutputStream$BlockDataOutputStream.writeBlockHeader(ObjectOutputStream.java:1894)
> >     at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1875)
> >     at java.io.ObjectOutputStream$BlockDataOutputStream.flush(ObjectOutputStream.java:1822)
> >     at java.io.ObjectOutputStream.flush(ObjectOutputStream.java:719)
> >     at java.io.ObjectOutputStream.close(ObjectOutputStream.java:740)
> >     at org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$2.apply$mcV$sp(MapOutputTracker.scala:790)
> >     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1389)
> >     at org.apache.spark.MapOutputTracker$.serializeMapStatuses(MapOutputTracker.scala:789)
> >     at org.apache.spark.ShuffleStatus.serializedMapStatus(MapOutputTracker.scala:174)
> >     at org.apache.spark.MapOutputTrackerMaster$MessageLoop.run(MapOutputTracker.scala:397)
> >     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> >     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> >     at java.lang.Thread.run(Thread.java:748)
> > Exception in thread "map-output-dispatcher-0" java.lang.OutOfMemoryError
> >     at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
> >     at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
> >     at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
> >     at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
> >     at java.util.zip.DeflaterOutputStream.deflate(DeflaterOutputStream.java:253)
> >     at java.util.zip.DeflaterOutputStream.write(DeflaterOutputStream.java:211)
> >     at java.util.zip.GZIPOutputStream.write(GZIPOutputStream.java:145)
> >     at java.io.ObjectOutputStream$BlockDataOutputStream.writeBlockHeader(ObjectOutputStream.java:1894)
> >     at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1875)
> >     at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
> >     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
> >     at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
> >     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
> >     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> >     at org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply$mcV$sp(MapOutputTracker.scala:787)
> >     at org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply(MapOutputTracker.scala:786)
> >     at org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply(MapOutputTracker.scala:786)
> >     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1380)
> >     at org.apache.spark.MapOutputTracker$.serializeMapStatuses(MapOutputTracker.scala:789)
> >     at org.apache.spark.ShuffleStatus.serializedMapStatus(MapOutputTracker.scala:174)
> >     at org.apache.spark.MapOutputTrackerMaster$MessageLoop.run(MapOutputTracker.scala:397)
> >     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> >     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> >     at java.lang.Thread.run(Thread.java:748)
> >     Suppressed: java.lang.OutOfMemoryError
> >         at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
> >         at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
> >         at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
> >         at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
> >         at java.util.zip.DeflaterOutputStream.deflate(DeflaterOutputStream.java:253)
> >         at java.util.zip.DeflaterOutputStream.write(DeflaterOutputStream.java:211)
> >         at java.util.zip.GZIPOutputStream.write(GZIPOutputStream.java:145)
> >         at java.io.ObjectOutputStream$BlockDataOutputStream.writeBlockHeader(ObjectOutputStream.java:1894)
> >         at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1875)
> >         at java.io.ObjectOutputStream$BlockDataOutputStream.flush(ObjectOutputStream.java:1822)
> >         at java.io.ObjectOutputStream.flush(ObjectOutputStream.java:719)
> >         at java.io.ObjectOutputStream.close(ObjectOutputStream.java:740)
> >         at org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$2.apply$mcV$sp(MapOutputTracker.scala:790)
> >         at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1389)
> >         ... 6 more
> >
> > We found this reference (https://issues.apache.org/jira/browse/SPARK-1239) to a similar issue that was closed in 2016.
> >
> > Please advise,
> >
> > Harel.
>
> --
> Sent from my iPhone
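The 2 GiB ceiling Vadim describes is easy to hit at this scale. A rough back-of-envelope sketch (my own assumptions, not from the thread: ~128 MB input splits, and on the order of one byte of map-output status per (map task, reduce partition) pair before GZip compression):

```python
# Rough estimate of the map-output status structure the driver must serialize.
# Assumptions (not from the thread): ~128 MB map-side input splits, and roughly
# one byte of status per (map task, reduce partition) pair before compression.
total_bytes = 40 * 10**12                    # ~40 TB of input
split_bytes = 128 * 10**6                    # assumed map split size
num_reduce_partitions = 300_000              # numPartitions from the job

num_map_tasks = total_bytes // split_bytes   # ~312,500 map tasks
status_bytes = num_map_tasks * num_reduce_partitions

two_gib = 2 * 1024**3
print(f"~{status_bytes / 10**9:.0f} GB uncompressed vs a 2 GiB limit")
# ~94 GB: even a strong compression ratio leaves it far above 2 GiB.
```

Under these assumptions the structure scales linearly with the reduce-partition count, which is consistent with Vadim's point that cutting numPartitions is the direct way to shrink it.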