This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 8deb950f09c [SPARK-43150][SQL] Remove workaround for PARQUET-2160
8deb950f09c is described below

commit 8deb950f09cd4e5224ed2e727245b5386e1dafdc
Author: Cheng Pan <cheng...@apache.org>
AuthorDate: Fri Apr 14 21:50:37 2023 -0700

    [SPARK-43150][SQL] Remove workaround for PARQUET-2160

    ### What changes were proposed in this pull request?

    Remove workaround (SPARK-41952) for [PARQUET-2160](https://issues.apache.org/jira/browse/PARQUET-2160).

    ### Why are the changes needed?

    [SPARK-42926](https://issues.apache.org/jira/browse/SPARK-42926) upgraded Parquet to 1.13.0, which includes [PARQUET-2160](https://issues.apache.org/jira/browse/PARQUET-2160). So we no longer need [SPARK-41952](https://issues.apache.org/jira/browse/SPARK-41952).

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Pass GA.

    Closes #40802 from pan3793/SPARK-43150.

    Authored-by: Cheng Pan <cheng...@apache.org>
    Signed-off-by: Chao Sun <sunc...@apple.com>
---
 .../datasources/parquet/ParquetCodecFactory.java  | 112 ---------------------
 .../parquet/SpecificParquetRecordReaderBase.java  |   2 -
 2 files changed, 114 deletions(-)

diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetCodecFactory.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetCodecFactory.java
deleted file mode 100644
index 2edbdc70da2..00000000000
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetCodecFactory.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources.parquet;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.compress.CodecPool;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.Decompressor;
-import org.apache.parquet.bytes.BytesInput;
-import org.apache.parquet.hadoop.CodecFactory;
-import org.apache.parquet.hadoop.codec.ZstandardCodec;
-import org.apache.parquet.hadoop.metadata.CompressionCodecName;
-
-/**
- * This class implements a codec factory that is used when reading from Parquet. It adds a
- * workaround for memory issues encountered when reading from zstd-compressed files. For
- * details, see <a href="https://issues.apache.org/jira/browse/PARQUET-2160">PARQUET-2160</a>
- *
- * TODO: Remove this workaround after upgrading Parquet which include PARQUET-2160.
- */
-public class ParquetCodecFactory extends CodecFactory {
-
-  public ParquetCodecFactory(Configuration configuration, int pageSize) {
-    super(configuration, pageSize);
-  }
-
-  /**
-   * Copied and modified from CodecFactory.HeapBytesDecompressor
-   */
-  @SuppressWarnings("deprecation")
-  class HeapBytesDecompressor extends BytesDecompressor {
-
-    private final CompressionCodec codec;
-    private final Decompressor decompressor;
-
-    HeapBytesDecompressor(CompressionCodecName codecName) {
-      this.codec = getCodec(codecName);
-      if (codec != null) {
-        decompressor = CodecPool.getDecompressor(codec);
-      } else {
-        decompressor = null;
-      }
-    }
-
-    @Override
-    public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException {
-      final BytesInput decompressed;
-      if (codec != null) {
-        if (decompressor != null) {
-          decompressor.reset();
-        }
-        InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor);
-
-        if (codec instanceof ZstandardCodec) {
-          // We need to explicitly close the ZstdDecompressorStream here to release the resources
-          // it holds to avoid off-heap memory fragmentation issue, see PARQUET-2160.
-          // This change will load the decompressor stream into heap a little earlier, since the
-          // problem it solves only happens in the ZSTD codec, so this modification is only made
-          // for ZSTD streams.
-          decompressed = BytesInput.copy(BytesInput.from(is, uncompressedSize));
-          is.close();
-        } else {
-          decompressed = BytesInput.from(is, uncompressedSize);
-        }
-      } else {
-        decompressed = bytes;
-      }
-      return decompressed;
-    }
-
-    @Override
-    public void decompress(
-        ByteBuffer input, int compressedSize, ByteBuffer output, int uncompressedSize)
-        throws IOException {
-      ByteBuffer decompressed =
-          decompress(BytesInput.from(input), uncompressedSize).toByteBuffer();
-      output.put(decompressed);
-    }
-
-    @Override
-    public void release() {
-      if (decompressor != null) {
-        CodecPool.returnDecompressor(decompressor);
-      }
-    }
-  }
-
-  @Override
-  @SuppressWarnings("deprecation")
-  protected BytesDecompressor createDecompressor(CompressionCodecName codecName) {
-    return new HeapBytesDecompressor(codecName);
-  }
-}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
index 678b287a5e3..8cefa589c0e 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
@@ -96,7 +96,6 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
     ParquetReadOptions options = HadoopReadOptions
       .builder(configuration, file)
       .withRange(split.getStart(), split.getStart() + split.getLength())
-      .withCodecFactory(new ParquetCodecFactory(configuration, 0))
       .build();
     ParquetFileReader fileReader = new ParquetFileReader(
       HadoopInputFile.fromPath(file, configuration), options);
@@ -160,7 +159,6 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
     ParquetReadOptions options = HadoopReadOptions
       .builder(config, file)
      .withRange(0, length)
-      .withCodecFactory(new ParquetCodecFactory(config, 0))
       .build();
     ParquetFileReader fileReader = ParquetFileReader.open(
       HadoopInputFile.fromPath(file, config), options);
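As a rough illustration of the reader setup after this patch: no custom codec factory is passed, so Parquet's built-in CodecFactory (which in 1.13.0 already carries the PARQUET-2160 fix) handles page decompression, zstd included. This is a minimal sketch mirroring the context lines above, not Spark code; the file path, split range, and class name are placeholders.

// Illustrative only: open a Parquet file with the default codec factory,
// the same way SpecificParquetRecordReaderBase does after this change.
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.HadoopReadOptions;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;

public class DefaultCodecFactoryExample {
  public static void main(String[] args) throws IOException {
    Configuration configuration = new Configuration();
    Path file = new Path(args[0]);   // placeholder: path to some Parquet file
    long start = 0L;                 // placeholder split boundaries
    long length = Long.MAX_VALUE;

    ParquetReadOptions options = HadoopReadOptions
      .builder(configuration, file)
      .withRange(start, start + length)
      // no .withCodecFactory(...): Parquet's default factory is used for all codecs
      .build();

    try (ParquetFileReader fileReader = new ParquetFileReader(
        HadoopInputFile.fromPath(file, configuration), options)) {
      System.out.println("row groups: " + fileReader.getRowGroups().size());
    }
  }
}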
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org