This is an automated email from the ASF dual-hosted git repository. sunchao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 1688a8768fb [SPARK-41952][SQL] Fix Parquet zstd off-heap memory leak as a workaround for PARQUET-2160 1688a8768fb is described below commit 1688a8768fb34060548f8790e77f645027f65db2 Author: Cheng Pan <cheng...@apache.org> AuthorDate: Mon Feb 20 09:40:44 2023 -0800 [SPARK-41952][SQL] Fix Parquet zstd off-heap memory leak as a workaround for PARQUET-2160 ### What changes were proposed in this pull request? SPARK-41952 was raised a while ago, but unfortunately the Parquet community has not published the patched version yet. As a workaround, we can fix the issue on the Spark side first. We encountered this memory issue when migrating data from parquet/snappy to parquet/zstd: Spark executors always occupied an unreasonable amount of off-heap memory and had a high risk of being killed by NM. See more discussions at https://github.com/apache/parquet-mr/pull/982 and https://github.com/apache/iceberg/pull/5681 ### Why are the changes needed? The issue has been fixed in the Parquet community by [PARQUET-2160](https://issues.apache.org/jira/browse/PARQUET-2160), but the patched version is not available yet. ### Does this PR introduce _any_ user-facing change? Yes, it's a bug fix. ### How was this patch tested? The existing UTs should cover the correctness check. I also verified this patch by scanning a large parquet/zstd table. ``` spark-shell --executor-cores 4 --executor-memory 6g --conf spark.executor.memoryOverhead=2g ``` ``` spark.sql("select sum(hash(*)) from parquet_zstd_table ").show(false) ``` - before this patch All executors get killed by NM quickly. ``` ERROR YarnScheduler: Lost executor 1 on hadoop-xxxx.****.org: Container killed by YARN for exceeding physical memory limits. 8.2 GB of 8 GB physical memory used. Consider boosting spark.executor.memoryOverhead. 
``` <img width="1872" alt="image" src="https://user-images.githubusercontent.com/26535726/220031678-e9060244-5586-4f0c-8fe7-55bb4e20a580.png"> - after this patch Query runs well, no executor gets killed. <img width="1881" alt="image" src="https://user-images.githubusercontent.com/26535726/220031917-4fe38c07-b38f-49c6-a982-2091a6c2a8ed.png"> Closes #40091 from pan3793/SPARK-41952. Authored-by: Cheng Pan <cheng...@apache.org> Signed-off-by: Chao Sun <sunc...@apple.com> --- .../datasources/parquet/ParquetCodecFactory.java | 112 +++++++++++++++++++++ .../parquet/SpecificParquetRecordReaderBase.java | 2 + 2 files changed, 114 insertions(+) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetCodecFactory.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetCodecFactory.java new file mode 100644 index 00000000000..2edbdc70da2 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetCodecFactory.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources.parquet; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.compress.CodecPool; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.Decompressor; +import org.apache.parquet.bytes.BytesInput; +import org.apache.parquet.hadoop.CodecFactory; +import org.apache.parquet.hadoop.codec.ZstandardCodec; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; + +/** + * This class implements a codec factory that is used when reading from Parquet. It adds a + * workaround for memory issues encountered when reading from zstd-compressed files. For + * details, see <a href="https://issues.apache.org/jira/browse/PARQUET-2160">PARQUET-2160</a> + * + * TODO: Remove this workaround after upgrading Parquet which include PARQUET-2160. + */ +public class ParquetCodecFactory extends CodecFactory { + + public ParquetCodecFactory(Configuration configuration, int pageSize) { + super(configuration, pageSize); + } + + /** + * Copied and modified from CodecFactory.HeapBytesDecompressor + */ + @SuppressWarnings("deprecation") + class HeapBytesDecompressor extends BytesDecompressor { + + private final CompressionCodec codec; + private final Decompressor decompressor; + + HeapBytesDecompressor(CompressionCodecName codecName) { + this.codec = getCodec(codecName); + if (codec != null) { + decompressor = CodecPool.getDecompressor(codec); + } else { + decompressor = null; + } + } + + @Override + public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException { + final BytesInput decompressed; + if (codec != null) { + if (decompressor != null) { + decompressor.reset(); + } + InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor); + + if (codec instanceof ZstandardCodec) { + // We need to explicitly close the ZstdDecompressorStream 
here to release the resources + // it holds to avoid off-heap memory fragmentation issue, see PARQUET-2160. + // This change will load the decompressor stream into heap a little earlier, since the + // problem it solves only happens in the ZSTD codec, so this modification is only made + // for ZSTD streams. + decompressed = BytesInput.copy(BytesInput.from(is, uncompressedSize)); + is.close(); + } else { + decompressed = BytesInput.from(is, uncompressedSize); + } + } else { + decompressed = bytes; + } + return decompressed; + } + + @Override + public void decompress( + ByteBuffer input, int compressedSize, ByteBuffer output, int uncompressedSize) + throws IOException { + ByteBuffer decompressed = + decompress(BytesInput.from(input), uncompressedSize).toByteBuffer(); + output.put(decompressed); + } + + @Override + public void release() { + if (decompressor != null) { + CodecPool.returnDecompressor(decompressor); + } + } + } + + @Override + @SuppressWarnings("deprecation") + protected BytesDecompressor createDecompressor(CompressionCodecName codecName) { + return new HeapBytesDecompressor(codecName); + } +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java index 8cefa589c0e..678b287a5e3 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java @@ -96,6 +96,7 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo ParquetReadOptions options = HadoopReadOptions .builder(configuration, file) .withRange(split.getStart(), split.getStart() + split.getLength()) + .withCodecFactory(new ParquetCodecFactory(configuration, 0)) .build(); ParquetFileReader fileReader = new 
ParquetFileReader( HadoopInputFile.fromPath(file, configuration), options); @@ -159,6 +160,7 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo ParquetReadOptions options = HadoopReadOptions .builder(config, file) .withRange(0, length) + .withCodecFactory(new ParquetCodecFactory(config, 0)) .build(); ParquetFileReader fileReader = ParquetFileReader.open( HadoopInputFile.fromPath(file, config), options); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org