Thanks for reporting this issue.

Do you happen to have a heap dump from when the OOM happens? That would help
us identify which part is responsible for the huge memory usage.
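
If you don't have one yet, one common way to capture one from the Spark
executors (a sketch assuming you launch via spark-submit; the dump path is
just an example) is to add the standard HotSpot flags to the executor JVM
options:

  --conf "spark.executor.extraJavaOptions=-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp"

Each executor then writes a java_pid<pid>.hprof file to that directory when
it runs out of memory, which you can open in a tool such as Eclipse MAT.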

On Thu, Jan 14, 2021 at 8:35 AM Yuhong Cheng <[email protected]>
wrote:

> Hi Beam,
>
>  We are running a Beam pipeline on Spark that looks roughly like this:
>
> pipeline
>     .apply(AvroIO
>         .readGenericRecords(schema)
>         .from(/* a directory */))
>     .apply(ParDo.of(
>         new DoFn<GenericRecord, SomeType>() {
>           @DoFn.ProcessElement
>           public void process(ProcessContext c) {
>             // convert to another type
>           }
>         }))
>     .setCoder(...)
>     .apply(/* write to a file */);
>
> It ran well before, but after we upgraded to Beam 2.26 the memory used
> by the job increased a lot and we hit the GC overhead limit exception:
>
> Container exited with a non-zero exit code 52. Error file: prelaunch.err.
>
> 13-01-2021 12:25:32 PST  INFO - Last 4096 bytes of prelaunch.err :
> 13-01-2021 12:25:32 PST  INFO - Last 4096 bytes of stderr :
> 13-01-2021 12:25:32 PST  INFO - e.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
> 13-01-2021 12:25:32 PST  INFO - at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
> 13-01-2021 12:25:32 PST  INFO - at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
> 13-01-2021 12:25:32 PST  INFO - at org.apache.spark.scheduler.Task.run(Task.scala:109)
> 13-01-2021 12:25:32 PST  INFO - at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:429)
> 13-01-2021 12:25:32 PST  INFO - at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 13-01-2021 12:25:32 PST  INFO - at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 13-01-2021 12:25:32 PST  INFO - at java.lang.Thread.run(Thread.java:748)
> 13-01-2021 12:25:32 PST  INFO - Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
> 13-01-2021 12:25:32 PST  INFO - at java.util.HashMap.newNode(HashMap.java:1747)
> 13-01-2021 12:25:32 PST  INFO - at java.util.HashMap.putVal(HashMap.java:642)
> 13-01-2021 12:25:32 PST  INFO - at java.util.HashMap.put(HashMap.java:612)
> 13-01-2021 12:25:32 PST  INFO - at org.apache.avro.generic.GenericDatumReader.addToMap(GenericDatumReader.java:275)
> 13-01-2021 12:25:32 PST  INFO - at org.apache.avro.generic.GenericDatumReader.readMap(GenericDatumReader.java:256)
> 13-01-2021 12:25:32 PST  INFO - at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154)
> 13-01-2021 12:25:32 PST  INFO - at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155)
> 13-01-2021 12:25:32 PST  INFO - at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
> 13-01-2021 12:25:32 PST  INFO - at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
> 13-01-2021 12:25:32 PST  INFO - at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
> 13-01-2021 12:25:32 PST  INFO - at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)
> 13-01-2021 12:25:32 PST  INFO - at org.apache.beam.sdk.io.AvroSource$AvroBlock.readNextRecord(AvroSource.java:647)
> 13-01-2021 12:25:32 PST  INFO - at org.apache.beam.sdk.io.BlockBasedSource$BlockBasedReader.readNextRecord(BlockBasedSource.java:212)
> 13-01-2021 12:25:32 PST  INFO - at org.apache.beam.sdk.io.FileBasedSource$FileBasedReader.advanceImpl(FileBasedSource.java:487)
> 13-01-2021 12:25:32 PST  INFO - at org.apache.beam.sdk.io.OffsetBasedSource$OffsetBasedReader.advance(OffsetBasedSource.java:258)
> 13-01-2021 12:25:32 PST  INFO - at org.apache.beam.sdk.io.Read$BoundedSourceAsSDFWrapperFn$BoundedSourceAsSDFRestrictionTracker.tryClaim(Read.java:347)
> 13-01-2021 12:25:32 PST  INFO - at org.apache.beam.sdk.io.Read$BoundedSourceAsSDFWrapperFn$BoundedSourceAsSDFRestrictionTracker.tryClaim(Read.java:312)
> 13-01-2021 12:25:32 PST  INFO - at org.apache.beam.sdk.fn.splittabledofn.RestrictionTrackers$RestrictionTrackerObserver.tryClaim(RestrictionTrackers.java:59)
> 13-01-2021 12:25:32 PST  INFO - at org.apache.beam.sdk.io.Read$BoundedSourceAsSDFWrapperFn.processElement(Read.java:298)
> 13-01-2021 12:25:32 PST  INFO - at org.apache.beam.sdk.io.Read$BoundedSourceAsSDFWrapperFn$DoFnInvoker.invokeProcessElement(Unknown Source)
> 13-01-2021 12:25:32 PST  INFO - at org.apache.beam.runners.core.construction.SplittableParDoNaiveBounded$NaiveProcessFn.process(SplittableParDoNaiveBounded.java:309)
> 13-01-2021 12:25:32 PST  INFO - at org.apache.beam.runners.core.construction.SplittableParDoNaiveBounded$NaiveProcessFn$DoFnInvoker.invokeProcessElement(Unknown Source)
> 13-01-2021 12:25:32 PST  INFO - at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:232)
> 13-01-2021 12:25:32 PST  INFO - at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:188)
> 13-01-2021 12:25:32 PST  INFO - at org.apache.beam.runners.spark.translation.DoFnRunnerWithMetrics.processElement(DoFnRunnerWithMetrics.java:65)
> 13-01-2021 12:25:32 PST  INFO - at org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.computeNext(SparkProcessContext.java:140)
> 13-01-2021 12:25:32 PST  INFO - at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:141)
> 13-01-2021 12:25:32 PST  INFO - at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:136)
> 13-01-2021 12:25:32 PST  INFO - at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:42)
> 13-01-2021 12:25:32 PST  INFO - at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
> 13-01-2021 12:25:32 PST  INFO - at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> 13-01-2021 12:25:32 PST  INFO - at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:30)
>
> Before upgrading the Beam version, the peak JVM memory usage was less
> than 1 GB. After upgrading, the peak JVM memory usage can be around 6 GB,
> which is too large for such a simple job.
>
> We temporarily fixed this problem by setting
> --experiments=use_deprecated_read, and we suspect the splittable DoFn
> read path is related to this huge memory usage.
>
> So we want to report this issue to see whether there is a better fix
> for the huge memory increase.
>
>
> Thanks,
>
> Yuhong
>
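
One more note on the workaround you mention: besides passing
--experiments=use_deprecated_read on the command line, the experiment can
also be set programmatically before the pipeline is constructed. A minimal
sketch using the standard Beam options wiring (the driver class name and
the elided pipeline body are just placeholders):

import java.util.Arrays;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class DeprecatedReadWorkaround { // hypothetical driver class
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    // Fall back to the pre-splittable-DoFn translation of bounded reads.
    options.as(ExperimentalOptions.class)
        .setExperiments(Arrays.asList("use_deprecated_read"));
    Pipeline pipeline = Pipeline.create(options);
    // ... build the same AvroIO read / ParDo / write pipeline as above ...
    pipeline.run().waitUntilFinish();
  }
}

That said, it is only a workaround, so the heap dump mentioned above would
still help us track down the regression.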
