This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new b52fbee  [SPARK-36669][SQL] Add Lz4 wrappers for Hadoop Lz4 codec
b52fbee is described below

commit b52fbeee2d3b8a048a89f1ca8994d4b62bad92e3
Author: Liang-Chi Hsieh <vii...@gmail.com>
AuthorDate: Thu Sep 9 09:31:00 2021 -0700

    [SPARK-36669][SQL] Add Lz4 wrappers for Hadoop Lz4 codec
    
    ### What changes were proposed in this pull request?
    
    This patch proposes to add a few LZ4 wrapper classes for Parquet Lz4 
compression output that uses Hadoop Lz4 codec.
    
    ### Why are the changes needed?
    
    Currently we use Hadop 3.3.1's shaded client libraries. Lz4 is a provided 
dependency in Hadoop Common 3.3.1 for Lz4Codec. But it isn't excluded from 
relocation in these libraries. So to use lz4 as Parquet codec, we will hit the 
exception even we include lz4 as dependency.
    
    ```
    [info]   Cause: java.lang.NoClassDefFoundError: 
org/apache/hadoop/shaded/net/jpountz/lz4/LZ4Factory
    [info]   at 
org.apache.hadoop.io.compress.lz4.Lz4Compressor.<init>(Lz4Compressor.java:66)
    [info]   at 
org.apache.hadoop.io.compress.Lz4Codec.createCompressor(Lz4Codec.java:119)
    [info]   at 
org.apache.hadoop.io.compress.CodecPool.getCompressor(CodecPool.java:152)
    [info]   at 
org.apache.hadoop.io.compress.CodecPool.getCompressor(CodecPool.java:168)
    ```
    
    Before the issue is fixed at Hadoop new release, we can add a few wrapper 
classes for Lz4 codec.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Modified test.
    
    Closes #33940 from viirya/lz4-wrappers.
    
    Authored-by: Liang-Chi Hsieh <vii...@gmail.com>
    Signed-off-by: Liang-Chi Hsieh <vii...@gmail.com>
    (cherry picked from commit 6bcf3301915b296205180f0aae721e8dade5b7e7)
    Signed-off-by: Liang-Chi Hsieh <vii...@gmail.com>
---
 .../shaded/net/jpountz/lz4/LZ4Compressor.java      | 37 ++++++++++++++++
 .../hadoop/shaded/net/jpountz/lz4/LZ4Factory.java  | 49 ++++++++++++++++++++++
 .../net/jpountz/lz4/LZ4SafeDecompressor.java       | 36 ++++++++++++++++
 .../test/scala/org/apache/spark/FileSuite.scala    |  8 ++--
 .../datasources/FileSourceCodecSuite.scala         |  5 +--
 5 files changed, 128 insertions(+), 7 deletions(-)

diff --git 
a/core/src/main/java/org/apache/hadoop/shaded/net/jpountz/lz4/LZ4Compressor.java
 
b/core/src/main/java/org/apache/hadoop/shaded/net/jpountz/lz4/LZ4Compressor.java
new file mode 100644
index 0000000..092ed59
--- /dev/null
+++ 
b/core/src/main/java/org/apache/hadoop/shaded/net/jpountz/lz4/LZ4Compressor.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.shaded.net.jpountz.lz4;
+
+/**
+ * TODO(SPARK-36679): A temporary workaround for SPARK-36669. We should remove 
this after
+ * Hadoop 3.3.2 release which fixes the LZ4 relocation in shaded Hadoop client 
libraries.
+ * This does not need implement all net.jpountz.lz4.LZ4Compressor API, just 
the ones used
+ * by Hadoop Lz4Compressor.
+ */
+public final class LZ4Compressor {
+
+  private net.jpountz.lz4.LZ4Compressor lz4Compressor;
+
+  public LZ4Compressor(net.jpountz.lz4.LZ4Compressor lz4Compressor) {
+    this.lz4Compressor = lz4Compressor;
+  }
+
+  public void compress(java.nio.ByteBuffer src, java.nio.ByteBuffer dest) {
+    lz4Compressor.compress(src, dest);
+  }
+}
diff --git 
a/core/src/main/java/org/apache/hadoop/shaded/net/jpountz/lz4/LZ4Factory.java 
b/core/src/main/java/org/apache/hadoop/shaded/net/jpountz/lz4/LZ4Factory.java
new file mode 100644
index 0000000..61829b2
--- /dev/null
+++ 
b/core/src/main/java/org/apache/hadoop/shaded/net/jpountz/lz4/LZ4Factory.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.shaded.net.jpountz.lz4;
+
+/**
+ * TODO(SPARK-36679): A temporary workaround for SPARK-36669. We should remove 
this after
+ * Hadoop 3.3.2 release which fixes the LZ4 relocation in shaded Hadoop client 
libraries.
+ * This does not need implement all net.jpountz.lz4.LZ4Factory API, just the 
ones used by
+ * Hadoop Lz4Compressor.
+ */
+public final class LZ4Factory {
+
+  private net.jpountz.lz4.LZ4Factory lz4Factory;
+
+  public LZ4Factory(net.jpountz.lz4.LZ4Factory lz4Factory) {
+    this.lz4Factory = lz4Factory;
+  }
+
+  public static LZ4Factory fastestInstance() {
+    return new LZ4Factory(net.jpountz.lz4.LZ4Factory.fastestInstance());
+  }
+
+  public LZ4Compressor highCompressor() {
+    return new LZ4Compressor(lz4Factory.highCompressor());
+  }
+
+  public LZ4Compressor fastCompressor() {
+    return new LZ4Compressor(lz4Factory.fastCompressor());
+  }
+
+  public LZ4SafeDecompressor safeDecompressor() {
+    return new LZ4SafeDecompressor(lz4Factory.safeDecompressor());
+  }
+}
diff --git 
a/core/src/main/java/org/apache/hadoop/shaded/net/jpountz/lz4/LZ4SafeDecompressor.java
 
b/core/src/main/java/org/apache/hadoop/shaded/net/jpountz/lz4/LZ4SafeDecompressor.java
new file mode 100644
index 0000000..cd3dd6f
--- /dev/null
+++ 
b/core/src/main/java/org/apache/hadoop/shaded/net/jpountz/lz4/LZ4SafeDecompressor.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.shaded.net.jpountz.lz4;
+
+/**
+ * TODO(SPARK-36679): A temporary workaround for SPARK-36669. We should remove 
this after
+ * Hadoop 3.3.2 release which fixes the LZ4 relocation in shaded Hadoop client 
libraries.
+ * This does not need implement all net.jpountz.lz4.LZ4SafeDecompressor API, 
just the ones
+ * used by Hadoop Lz4Decompressor.
+ */
+public final class LZ4SafeDecompressor {
+  private net.jpountz.lz4.LZ4SafeDecompressor lz4Decompressor;
+
+  public LZ4SafeDecompressor(net.jpountz.lz4.LZ4SafeDecompressor 
lz4Decompressor) {
+    this.lz4Decompressor = lz4Decompressor;
+  }
+
+  public void decompress(java.nio.ByteBuffer src, java.nio.ByteBuffer dest) {
+    lz4Decompressor.decompress(src, dest);
+  }
+}
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala 
b/core/src/test/scala/org/apache/spark/FileSuite.scala
index 67a9764..1202284 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -28,7 +28,7 @@ import com.google.common.io.Files
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io._
-import org.apache.hadoop.io.compress.{BZip2Codec, CompressionCodec, 
DefaultCodec}
+import org.apache.hadoop.io.compress.{BZip2Codec, CompressionCodec, 
DefaultCodec, Lz4Codec}
 import org.apache.hadoop.mapred.{FileAlreadyExistsException, FileSplit, 
JobConf, TextInputFormat, TextOutputFormat}
 import org.apache.hadoop.mapreduce.Job
 import org.apache.hadoop.mapreduce.lib.input.{FileSplit => NewFileSplit, 
TextInputFormat => NewTextInputFormat}
@@ -136,9 +136,9 @@ class FileSuite extends SparkFunSuite with 
LocalSparkContext {
   }
 
   // Hadoop "gzip" and "zstd" codecs require native library installed for 
sequence files
-  // "snappy" and "lz4" codecs do not work due to SPARK-36669 and SPARK-36681.
-  Seq((new DefaultCodec(), "default"), (new BZip2Codec(), "bzip2")).foreach {
-    case (codec, codecName) =>
+  // "snappy" codec does not work due to SPARK-36681.
+  Seq((new DefaultCodec(), "default"), (new BZip2Codec(), "bzip2"), (new 
Lz4Codec(), "lz4"))
+    .foreach { case (codec, codecName) =>
       runSequenceFileCodecTest(codec, codecName)
   }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCodecSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCodecSuite.scala
index 92b887e..3c226d6 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCodecSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCodecSuite.scala
@@ -56,13 +56,12 @@ class ParquetCodecSuite extends FileSourceCodecSuite {
   override def format: String = "parquet"
   override val codecConfigName: String = SQLConf.PARQUET_COMPRESSION.key
   // Exclude "lzo" because it is GPL-licenced so not included in Hadoop.
-  // TODO(SPARK-36669): "lz4" codec fails due to HADOOP-17891.
   override protected def availableCodecs: Seq[String] =
     if (System.getProperty("os.arch") == "aarch64") {
       // Exclude "brotli" due to PARQUET-1975.
-      Seq("none", "uncompressed", "snappy", "gzip", "zstd")
+      Seq("none", "uncompressed", "snappy", "lz4", "gzip", "zstd")
     } else {
-      Seq("none", "uncompressed", "snappy", "gzip", "brotli", "zstd")
+      Seq("none", "uncompressed", "snappy", "lz4", "gzip", "brotli", "zstd")
     }
 }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to