[ https://issues.apache.org/jira/browse/PARQUET-2160?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17690886#comment-17690886 ]
ASF GitHub Bot commented on PARQUET-2160:
-----------------------------------------

pan3793 commented on PR #982:
URL: https://github.com/apache/parquet-mr/pull/982#issuecomment-1435989086

   I also encountered this memory leak when migrating data from parquet/snappy to parquet/zstd: Spark executors kept occupying an unreasonable amount of off-heap memory and ran a high risk of being killed by the YARN NodeManager. This patch solves the problem. It's a critical issue in my case; I hope upstream publishes a patched release so that we don't need to maintain an internal version.


> Close decompression stream to free off-heap memory in time
> ----------------------------------------------------------
>
>                 Key: PARQUET-2160
>                 URL: https://issues.apache.org/jira/browse/PARQUET-2160
>             Project: Parquet
>          Issue Type: Bug
>          Components: parquet-format
>    Affects Versions: 1.12.3
>        Environment: Spark 3.1.2 + Iceberg 0.12 + Parquet 1.12.3 + zstd-jni 1.4.9.1 + glibc
>            Reporter: Yujiang Zhong
>            Priority: Blocker
>
> The decompressed stream in HeapBytesDecompressor$decompress currently relies on JVM GC to be closed. When reading Parquet in the zstd compressed format, I sometimes ran into OOMs caused by high off-heap usage. I think the reason is that GC is not timely, which causes off-heap memory fragmentation; I had to set a lower MALLOC_TRIM_THRESHOLD_ to make glibc give memory back to the system quickly. There is a [thread|https://apache-iceberg.slack.com/archives/C025PH0G1D4/p1650928750269869?thread_ts=1650927062.590789&cid=C025PH0G1D4] about this zstd Parquet issue in the Iceberg community Slack: several people hit the same problem.
> I think we can use ByteArrayBytesInput as the decompressed bytes input and close the decompressed stream promptly to solve this problem:
> {code:java}
> InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor);
> decompressed = BytesInput.from(is, uncompressedSize); {code}
> ->
> {code:java}
> InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor);
> decompressed = BytesInput.copy(BytesInput.from(is, uncompressedSize));
> is.close(); {code}
> After I made this change to decompress, I found that off-heap memory usage was significantly reduced (for the same query on the same data).
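A note on why the copy in the proposed change matters: BytesInput.from(is, uncompressedSize) is a lazy view that only drains the stream when the BytesInput is eventually consumed, whereas BytesInput.copy(...) materializes the bytes on-heap immediately, which is what makes the early close() safe. A minimal standalone sketch of that difference, assuming parquet-mr's org.apache.parquet.bytes.BytesInput is on the classpath (the class name LazyVsEagerDemo is made up for illustration):

{code:java}
import java.io.ByteArrayInputStream;
import java.io.IOException;

import org.apache.parquet.bytes.BytesInput;

public class LazyVsEagerDemo {
  public static void main(String[] args) throws IOException {
    byte[] payload = "hello parquet".getBytes();

    // Lazy view: nothing is read yet, so the underlying stream must stay
    // open (and keep its resources alive) until the BytesInput is consumed.
    BytesInput lazy = BytesInput.from(new ByteArrayInputStream(payload), payload.length);

    // Eager copy: drains the stream into an on-heap byte array right now,
    // after which the stream can be closed immediately.
    BytesInput eager = BytesInput.copy(lazy);

    System.out.println(eager.size()); // prints 13
  }
}
{code}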
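For context, this is roughly what the patched decompress path could look like using try-with-resources instead of an explicit is.close(). This is a minimal sketch, not the actual parquet-mr source: HeapBytesDecompressorSketch and its fields are hypothetical stand-ins for CodecFactory's HeapBytesDecompressor, and the null handling is an assumption.

{code:java}
import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.parquet.bytes.BytesInput;

// Hypothetical stand-in for parquet-mr's CodecFactory.HeapBytesDecompressor.
class HeapBytesDecompressorSketch {
  private final CompressionCodec codec;     // e.g. a zstd codec; null means uncompressed
  private final Decompressor decompressor;  // pooled Hadoop decompressor, may be null

  HeapBytesDecompressorSketch(CompressionCodec codec, Decompressor decompressor) {
    this.codec = codec;
    this.decompressor = decompressor;
  }

  BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException {
    if (codec == null) {
      return bytes; // nothing to decompress
    }
    if (decompressor != null) {
      decompressor.reset();
    }
    // try-with-resources releases the codec stream (and any native zstd
    // context behind it) as soon as the bytes are copied, instead of
    // waiting for a finalizer to run at some later GC.
    try (InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor)) {
      // copy() materializes the decompressed bytes on-heap, so closing the
      // stream here cannot invalidate the returned BytesInput.
      return BytesInput.copy(BytesInput.from(is, uncompressedSize));
    }
  }
}
{code}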