This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8deb950f09c [SPARK-43150][SQL] Remove workaround for PARQUET-2160
8deb950f09c is described below

commit 8deb950f09cd4e5224ed2e727245b5386e1dafdc
Author: Cheng Pan <cheng...@apache.org>
AuthorDate: Fri Apr 14 21:50:37 2023 -0700

    [SPARK-43150][SQL] Remove workaround for PARQUET-2160
    
    ### What changes were proposed in this pull request?
    
    Remove the workaround (SPARK-41952) for 
[PARQUET-2160](https://issues.apache.org/jira/browse/PARQUET-2160).
    
    ### Why are the changes needed?
    
    [SPARK-42926](https://issues.apache.org/jira/browse/SPARK-42926) upgraded 
Parquet to 1.13.0, which includes the fix for 
[PARQUET-2160](https://issues.apache.org/jira/browse/PARQUET-2160), so the 
workaround added in [SPARK-41952](https://issues.apache.org/jira/browse/SPARK-41952) is no longer needed.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Passes the existing GitHub Actions (GA) CI checks.
    
    Closes #40802 from pan3793/SPARK-43150.
    
    Authored-by: Cheng Pan <cheng...@apache.org>
    Signed-off-by: Chao Sun <sunc...@apple.com>
---
 .../datasources/parquet/ParquetCodecFactory.java   | 112 ---------------------
 .../parquet/SpecificParquetRecordReaderBase.java   |   2 -
 2 files changed, 114 deletions(-)

diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetCodecFactory.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetCodecFactory.java
deleted file mode 100644
index 2edbdc70da2..00000000000
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetCodecFactory.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources.parquet;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.compress.CodecPool;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.Decompressor;
-import org.apache.parquet.bytes.BytesInput;
-import org.apache.parquet.hadoop.CodecFactory;
-import org.apache.parquet.hadoop.codec.ZstandardCodec;
-import org.apache.parquet.hadoop.metadata.CompressionCodecName;
-
-/**
- * This class implements a codec factory that is used when reading from 
Parquet. It adds a
- * workaround for memory issues encountered when reading from zstd-compressed 
files. For
- * details, see <a 
href="https://issues.apache.org/jira/browse/PARQUET-2160">PARQUET-2160</a>
- *
- * TODO: Remove this workaround after upgrading Parquet which include 
PARQUET-2160.
- */
-public class ParquetCodecFactory extends CodecFactory {
-
-  public ParquetCodecFactory(Configuration configuration, int pageSize) {
-    super(configuration, pageSize);
-  }
-
-  /**
-   * Copied and modified from CodecFactory.HeapBytesDecompressor
-   */
-  @SuppressWarnings("deprecation")
-  class HeapBytesDecompressor extends BytesDecompressor {
-
-    private final CompressionCodec codec;
-    private final Decompressor decompressor;
-
-    HeapBytesDecompressor(CompressionCodecName codecName) {
-      this.codec = getCodec(codecName);
-      if (codec != null) {
-        decompressor = CodecPool.getDecompressor(codec);
-      } else {
-        decompressor = null;
-      }
-    }
-
-    @Override
-    public BytesInput decompress(BytesInput bytes, int uncompressedSize) 
throws IOException {
-      final BytesInput decompressed;
-      if (codec != null) {
-        if (decompressor != null) {
-          decompressor.reset();
-        }
-        InputStream is = codec.createInputStream(bytes.toInputStream(), 
decompressor);
-
-        if (codec instanceof ZstandardCodec) {
-          // We need to explicitly close the ZstdDecompressorStream here to 
release the resources
-          // it holds to avoid off-heap memory fragmentation issue, see 
PARQUET-2160.
-          // This change will load the decompressor stream into heap a little 
earlier, since the
-          // problem it solves only happens in the ZSTD codec, so this 
modification is only made
-          // for ZSTD streams.
-          decompressed = BytesInput.copy(BytesInput.from(is, 
uncompressedSize));
-          is.close();
-        } else {
-          decompressed = BytesInput.from(is, uncompressedSize);
-        }
-      } else {
-        decompressed = bytes;
-      }
-      return decompressed;
-    }
-
-    @Override
-    public void decompress(
-        ByteBuffer input, int compressedSize, ByteBuffer output, int 
uncompressedSize)
-        throws IOException {
-      ByteBuffer decompressed =
-          decompress(BytesInput.from(input), uncompressedSize).toByteBuffer();
-      output.put(decompressed);
-    }
-
-    @Override
-    public void release() {
-      if (decompressor != null) {
-        CodecPool.returnDecompressor(decompressor);
-      }
-    }
-  }
-
-  @Override
-  @SuppressWarnings("deprecation")
-  protected BytesDecompressor createDecompressor(CompressionCodecName 
codecName) {
-    return new HeapBytesDecompressor(codecName);
-  }
-}
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
index 678b287a5e3..8cefa589c0e 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
@@ -96,7 +96,6 @@ public abstract class SpecificParquetRecordReaderBase<T> 
extends RecordReader<Vo
     ParquetReadOptions options = HadoopReadOptions
       .builder(configuration, file)
       .withRange(split.getStart(), split.getStart() + split.getLength())
-      .withCodecFactory(new ParquetCodecFactory(configuration, 0))
       .build();
     ParquetFileReader fileReader = new ParquetFileReader(
         HadoopInputFile.fromPath(file, configuration), options);
@@ -160,7 +159,6 @@ public abstract class SpecificParquetRecordReaderBase<T> 
extends RecordReader<Vo
     ParquetReadOptions options = HadoopReadOptions
       .builder(config, file)
       .withRange(0, length)
-      .withCodecFactory(new ParquetCodecFactory(config, 0))
       .build();
     ParquetFileReader fileReader = ParquetFileReader.open(
       HadoopInputFile.fromPath(file, config), options);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to