File system is HDFS. Executors are 2 cores, 14GB RAM. But I don't think
either of these relate to the problem -- this is a memory allocation issue
on the driver side, and happens in an intermediate stage that has no HDFS
read/write.

On Fri, Nov 8, 2019 at 10:01 AM Spico Florin <spicoflo...@gmail.com> wrote:

> Hi!
> What file system are you using: EMRFS or HDFS?
> Also what memory are you using for the reducer ?
>
> On Thu, Nov 7, 2019 at 8:37 PM abeboparebop <abebopare...@gmail.com>
> wrote:
>
>> I ran into the same issue processing 20TB of data, with 200k tasks on both
>> the map and reduce sides. Reducing to 100k tasks each resolved the issue.
>> But this could/would be a major problem in cases where the data is bigger
>> or
>> the computation is heavier, since reducing the number of partitions may
>> not
>> be an option.
>>
>>
>> harelglik wrote
>> > I understand the error is because the number of partitions is very high,
>> > yet when processing 40 TB (and this number is expected to grow) this
>> > number
>> > seems reasonable:
>> > 40TB / 300,000 will result in partitions size of ~ 130MB (data should be
>> > evenly distributed).
>> >
>> > On Fri, Sep 7, 2018 at 6:28 PM Vadim Semenov &lt;
>>
>> > vadim@
>>
>> > &gt; wrote:
>> >
>> >> You have too many partitions, so when the driver is trying to gather
>> >> the status of all map outputs and send back to executors it chokes on
>> >> the size of the structure that needs to be GZipped, and since it's
>> >> bigger than 2GiB, it produces OOM.
>> >> On Fri, Sep 7, 2018 at 10:35 AM Harel Gliksman &lt;
>>
>> > harelglik@
>>
>> > &gt;
>> >> wrote:
>> >> >
>> >> > Hi,
>> >> >
>> >> > We are running a Spark (2.3.1) job on an EMR cluster with 500
>> >> r3.2xlarge
>> >> (60 GB, 8 vcores, 160 GB SSD ). Driver memory is set to 25GB.
>> >> >
>> >> > It processes ~40 TB of data using aggregateByKey in which we specify
>> >> numPartitions = 300,000.
>> >> > Map side tasks succeed, but reduce side tasks all fail.
>> >> >
>> >> > We notice the following driver error:
>> >> >
>> >> > 18/09/07 13:35:03 WARN Utils: Suppressing exception in finally: null
>> >> >
>> >> >  java.lang.OutOfMemoryError
>> >> >
>> >> > at
>> >>
>> java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
>> >> > at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
>> >> > at
>> >>
>> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
>> >> > at
>> java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
>> >> > at
>> >>
>> java.util.zip.DeflaterOutputStream.deflate(DeflaterOutputStream.java:253)
>> >> > at
>> >> java.util.zip.DeflaterOutputStream.write(DeflaterOutputStream.java:211)
>> >> > at java.util.zip.GZIPOutputStream.write(GZIPOutputStream.java:145)
>> >> > at
>> >>
>> java.io.ObjectOutputStream$BlockDataOutputStream.writeBlockHeader(ObjectOutputStream.java:1894)
>> >> > at
>> >>
>> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1875)
>> >> > at
>> >>
>> java.io.ObjectOutputStream$BlockDataOutputStream.flush(ObjectOutputStream.java:1822)
>> >> > at java.io.ObjectOutputStream.flush(ObjectOutputStream.java:719)
>> >> > at java.io.ObjectOutputStream.close(ObjectOutputStream.java:740)
>> >> > at
>> >>
>> org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$2.apply$mcV$sp(MapOutputTracker.scala:790)
>> >> > at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1389)
>> >> > at
>> >>
>> org.apache.spark.MapOutputTracker$.serializeMapStatuses(MapOutputTracker.scala:789)
>> >> > at
>> >>
>> org.apache.spark.ShuffleStatus.serializedMapStatus(MapOutputTracker.scala:174)
>> >> > at
>> >>
>> org.apache.spark.MapOutputTrackerMaster$MessageLoop.run(MapOutputTracker.scala:397)
>> >> > at
>> >>
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> >> > at
>> >>
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> >> > at java.lang.Thread.run(Thread.java:748)
>> >> > Exception in thread "map-output-dispatcher-0"
>> >> java.lang.OutOfMemoryError
>> >> > at
>> >>
>> java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
>> >> > at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
>> >> > at
>> >>
>> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
>> >> > at
>> java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
>> >> > at
>> >>
>> java.util.zip.DeflaterOutputStream.deflate(DeflaterOutputStream.java:253)
>> >> > at
>> >> java.util.zip.DeflaterOutputStream.write(DeflaterOutputStream.java:211)
>> >> > at java.util.zip.GZIPOutputStream.write(GZIPOutputStream.java:145)
>> >> > at
>> >>
>> java.io.ObjectOutputStream$BlockDataOutputStream.writeBlockHeader(ObjectOutputStream.java:1894)
>> >> > at
>> >>
>> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1875)
>> >> > at
>> >>
>> java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
>> >> > at
>> >> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
>> >> > at
>> java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
>> >> > at
>> >> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
>> >> > at
>> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>> >> > at
>> >>
>> org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply$mcV$sp(MapOutputTracker.scala:787)
>> >> > at
>> >>
>> org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply(MapOutputTracker.scala:786)
>> >> > at
>> >>
>> org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply(MapOutputTracker.scala:786)
>> >> > at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1380)
>> >> > at
>> >>
>> org.apache.spark.MapOutputTracker$.serializeMapStatuses(MapOutputTracker.scala:789)
>> >> > at
>> >>
>> org.apache.spark.ShuffleStatus.serializedMapStatus(MapOutputTracker.scala:174)
>> >> > at
>> >>
>> org.apache.spark.MapOutputTrackerMaster$MessageLoop.run(MapOutputTracker.scala:397)
>> >> > at
>> >>
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> >> > at
>> >>
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> >> > at java.lang.Thread.run(Thread.java:748)
>> >> > Suppressed: java.lang.OutOfMemoryError
>> >> > at
>> >>
>> java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
>> >> > at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
>> >> > at
>> >>
>> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
>> >> > at
>> java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
>> >> > at
>> >>
>> java.util.zip.DeflaterOutputStream.deflate(DeflaterOutputStream.java:253)
>> >> > at
>> >> java.util.zip.DeflaterOutputStream.write(DeflaterOutputStream.java:211)
>> >> > at java.util.zip.GZIPOutputStream.write(GZIPOutputStream.java:145)
>> >> > at
>> >>
>> java.io.ObjectOutputStream$BlockDataOutputStream.writeBlockHeader(ObjectOutputStream.java:1894)
>> >> > at
>> >>
>> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1875)
>> >> > at
>> >>
>> java.io.ObjectOutputStream$BlockDataOutputStream.flush(ObjectOutputStream.java:1822)
>> >> > at java.io.ObjectOutputStream.flush(ObjectOutputStream.java:719)
>> >> > at java.io.ObjectOutputStream.close(ObjectOutputStream.java:740)
>> >> > at
>> >>
>> org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$2.apply$mcV$sp(MapOutputTracker.scala:790)
>> >> > at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1389)
>> >> > ... 6 more
>> >> >
>> >> > We found this reference (
>> >> https://issues.apache.org/jira/browse/SPARK-1239) to a similar issue
>> that
>> >> was closed in 2016.
>> >> >
>> >> > Please advise,
>> >> >
>> >> > Harel.
>> >> >
>> >> >
>> >>
>> >>
>> >> --
>> >> Sent from my iPhone
>> >>
>>
>>
>>
>>
>>
>> --
>> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>>
>> ---------------------------------------------------------------------
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>

Reply via email to