Thanks for reporting this issue. Do you happen to have a heap dump from when the OOM happens? That might help us identify which part causes the huge memory usage.
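If you don't have one yet, the standard JVM flags -XX:+HeapDumpOnOutOfMemoryError and -XX:HeapDumpPath can capture one automatically. A minimal sketch, assuming you launch via spark-submit (the dump path here is just illustrative):

    spark-submit \
      --conf "spark.executor.extraJavaOptions=-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/heapdump.hprof" \
      ... your usual arguments ...

When an executor hits the OOM, the JVM writes an .hprof file to that path, which you can then inspect with a tool such as Eclipse MAT.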
On Thu, Jan 14, 2021 at 8:35 AM Yuhong Cheng <[email protected]> wrote:

> Hi Beam,
>
> We are running a Beam pipeline on Spark. The pipeline is mainly like this:
>
>     pipeline
>         .apply(AvroIO
>             .readGenericRecords(schema)
>             .from(/* a directory */))
>         .apply(ParDo.of(
>             new DoFn<GenericRecord, SomeType>() {
>               @DoFn.ProcessElement
>               public void process(ProcessContext c) {
>                 // convert to another type
>               }
>             }))
>         .setCoder(..)
>         .apply(/* write to a file */);
>
> It ran well before, but after we upgraded to Beam 2.26 the memory used by
> the job increased a lot and we hit the GC overhead limit exception:
>
>     Container exited with a non-zero exit code 52. Error file: prelaunch.err.
>     Last 4096 bytes of stderr (13-01-2021 12:25:32 PST):
>
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>     at org.apache.spark.scheduler.Task.run(Task.scala:109)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:429)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
>     Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
>     at java.util.HashMap.newNode(HashMap.java:1747)
>     at java.util.HashMap.putVal(HashMap.java:642)
>     at java.util.HashMap.put(HashMap.java:612)
>     at org.apache.avro.generic.GenericDatumReader.addToMap(GenericDatumReader.java:275)
>     at org.apache.avro.generic.GenericDatumReader.readMap(GenericDatumReader.java:256)
>     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154)
>     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155)
>     at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
>     at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
>     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
>     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)
>     at org.apache.beam.sdk.io.AvroSource$AvroBlock.readNextRecord(AvroSource.java:647)
>     at org.apache.beam.sdk.io.BlockBasedSource$BlockBasedReader.readNextRecord(BlockBasedSource.java:212)
>     at org.apache.beam.sdk.io.FileBasedSource$FileBasedReader.advanceImpl(FileBasedSource.java:487)
>     at org.apache.beam.sdk.io.OffsetBasedSource$OffsetBasedReader.advance(OffsetBasedSource.java:258)
>     at org.apache.beam.sdk.io.Read$BoundedSourceAsSDFWrapperFn$BoundedSourceAsSDFRestrictionTracker.tryClaim(Read.java:347)
>     at org.apache.beam.sdk.io.Read$BoundedSourceAsSDFWrapperFn$BoundedSourceAsSDFRestrictionTracker.tryClaim(Read.java:312)
>     at org.apache.beam.sdk.fn.splittabledofn.RestrictionTrackers$RestrictionTrackerObserver.tryClaim(RestrictionTrackers.java:59)
>     at org.apache.beam.sdk.io.Read$BoundedSourceAsSDFWrapperFn.processElement(Read.java:298)
>     at org.apache.beam.sdk.io.Read$BoundedSourceAsSDFWrapperFn$DoFnInvoker.invokeProcessElement(Unknown Source)
>     at org.apache.beam.runners.core.construction.SplittableParDoNaiveBounded$NaiveProcessFn.process(SplittableParDoNaiveBounded.java:309)
>     at org.apache.beam.runners.core.construction.SplittableParDoNaiveBounded$NaiveProcessFn$DoFnInvoker.invokeProcessElement(Unknown Source)
>     at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:232)
>     at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:188)
>     at org.apache.beam.runners.spark.translation.DoFnRunnerWithMetrics.processElement(DoFnRunnerWithMetrics.java:65)
>     at org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.computeNext(SparkProcessContext.java:140)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:141)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:136)
>     at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:42)
>     at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>     at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:30)
>
> Before upgrading the Beam version, the peak JVM memory usage was less than
> 1 GB. After upgrading, the peak JVM memory usage can be around 6 GB, which
> is too large for such a simple job.
>
> We temporarily fixed this problem by setting
> --experiments=use_deprecated_read, and we suspect the `SplittableDoFn`
> wrapping of the read is related to the huge memory usage.
>
> So we want to report this issue to see whether there is a better fix for
> the memory increase.
>
> Thanks,
> Yuhong
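For anyone else hitting this: the workaround mentioned above can also be set programmatically instead of on the command line. A minimal sketch, assuming the usual PipelineOptionsFactory setup (the class name and the commented-out pipeline body are illustrative; only the --experiments=use_deprecated_read flag comes from this thread):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class AvroReadWorkaround {
      public static void main(String[] args) {
        // Parsing the flag this way is equivalent to passing
        // --experiments=use_deprecated_read on the command line; it tells the
        // runner to use the old primitive Read translation instead of the
        // SplittableDoFn-based wrapper introduced in recent releases.
        PipelineOptions options =
            PipelineOptionsFactory.fromArgs("--experiments=use_deprecated_read")
                .withValidation()
                .create();
        Pipeline pipeline = Pipeline.create(options);
        // ... build the AvroIO read + ParDo + write pipeline as before ...
        pipeline.run().waitUntilFinish();
      }
    }

This is only a mitigation, of course; the underlying memory growth in the SDF wrapping still deserves investigation.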
