This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 90ee29992522 [SPARK-48518][CORE] Make LZF compression be able to run 
in parallel
90ee29992522 is described below

commit 90ee299925220fa564c90e1f688a0d13ba0ac79d
Author: Kent Yao <y...@apache.org>
AuthorDate: Tue Jun 4 18:58:33 2024 +0800

    [SPARK-48518][CORE] Make LZF compression be able to run in parallel
    
    ### What changes were proposed in this pull request?
    
    This PR introduces a config that switches LZF compression to parallel mode 
by using PLZFOutputStream.
    
    FYI, https://github.com/ning/compress?tab=readme-ov-file#parallel-processing
    
    ### Why are the changes needed?
    
    Improve performance
    
    ```
    [info] OpenJDK 64-Bit Server VM 17.0.10+0 on Mac OS X 14.5
    [info] Apple M2 Max
    [info] Compress large objects:                        Best Time(ms)   Avg 
Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
    [info] 
-----------------------------------------------------------------------------------------------------------------------------
    [info] Compression 1024 array values in 7 threads                12         
    13           1          0.1       11788.2       1.0X
    [info] Compression 1024 array values single-threaded             23         
    23           0          0.0       22512.7       0.5X
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
    benchmark
    ### Was this patch authored or co-authored using generative AI tooling?
    no
    
    Closes #46858 from yaooqinn/SPARK-48518.
    
    Authored-by: Kent Yao <y...@apache.org>
    Signed-off-by: Kent Yao <y...@apache.org>
---
 core/benchmarks/LZFBenchmark-jdk21-results.txt     | 19 +++++
 core/benchmarks/LZFBenchmark-results.txt           | 19 +++++
 .../org/apache/spark/internal/config/package.scala |  7 ++
 .../org/apache/spark/io/CompressionCodec.scala     |  8 +-
 .../scala/org/apache/spark/io/LZFBenchmark.scala   | 93 ++++++++++++++++++++++
 docs/configuration.md                              |  8 ++
 6 files changed, 153 insertions(+), 1 deletion(-)

diff --git a/core/benchmarks/LZFBenchmark-jdk21-results.txt 
b/core/benchmarks/LZFBenchmark-jdk21-results.txt
new file mode 100644
index 000000000000..e1566f201a1f
--- /dev/null
+++ b/core/benchmarks/LZFBenchmark-jdk21-results.txt
@@ -0,0 +1,19 @@
+================================================================================================
+Benchmark LZFCompressionCodec
+================================================================================================
+
+OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1021-azure
+AMD EPYC 7763 64-Core Processor
+Compress small objects:                           Best Time(ms)   Avg Time(ms) 
  Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+--------------------------------------------------------------------------------------------------------------------------------
+Compression 256000000 int values in parallel                598            600 
          2        428.2           2.3       1.0X
+Compression 256000000 int values single-threaded            568            570 
          2        451.0           2.2       1.1X
+
+OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1021-azure
+AMD EPYC 7763 64-Core Processor
+Compress large objects:                        Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+-----------------------------------------------------------------------------------------------------------------------------
+Compression 1024 array values in 1 threads                39             45    
       5          0.0       38475.4       1.0X
+Compression 1024 array values single-threaded             32             33    
       1          0.0       31154.5       1.2X
+
+
diff --git a/core/benchmarks/LZFBenchmark-results.txt 
b/core/benchmarks/LZFBenchmark-results.txt
new file mode 100644
index 000000000000..facc67f9cf4a
--- /dev/null
+++ b/core/benchmarks/LZFBenchmark-results.txt
@@ -0,0 +1,19 @@
+================================================================================================
+Benchmark LZFCompressionCodec
+================================================================================================
+
+OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Linux 6.5.0-1021-azure
+AMD EPYC 7763 64-Core Processor
+Compress small objects:                           Best Time(ms)   Avg Time(ms) 
  Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+--------------------------------------------------------------------------------------------------------------------------------
+Compression 256000000 int values in parallel                602            612 
          6        425.1           2.4       1.0X
+Compression 256000000 int values single-threaded            610            617 
          5        419.8           2.4       1.0X
+
+OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Linux 6.5.0-1021-azure
+AMD EPYC 7763 64-Core Processor
+Compress large objects:                        Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+-----------------------------------------------------------------------------------------------------------------------------
+Compression 1024 array values in 1 threads                35             43    
       6          0.0       33806.8       1.0X
+Compression 1024 array values single-threaded             32             32    
       0          0.0       30990.4       1.1X
+
+
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala 
b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index dc3edfaa8613..a7268c640991 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -2031,6 +2031,13 @@ package object config {
       .intConf
       .createWithDefault(1)
 
+  private[spark] val IO_COMPRESSION_LZF_PARALLEL =
+    ConfigBuilder("spark.io.compression.lzf.parallel.enabled")
+      .doc("When true, LZF compression will use multiple threads to compress 
data in parallel.")
+      .version("4.0.0")
+      .booleanConf
+      .createWithDefault(false)
+
   private[spark] val IO_WARNING_LARGEFILETHRESHOLD =
     ConfigBuilder("spark.io.warning.largeFileThreshold")
       .internal()
diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala 
b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
index 07e694b6c5b0..7d5a86d1a81d 100644
--- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
+++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -22,6 +22,7 @@ import java.util.Locale
 
 import com.github.luben.zstd.{NoPool, RecyclingBufferPool, 
ZstdInputStreamNoFinalizer, ZstdOutputStreamNoFinalizer}
 import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
+import com.ning.compress.lzf.parallel.PLZFOutputStream
 import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream, LZ4Factory}
 import net.jpountz.xxhash.XXHashFactory
 import org.xerial.snappy.{Snappy, SnappyInputStream, SnappyOutputStream}
@@ -170,9 +171,14 @@ class LZ4CompressionCodec(conf: SparkConf) extends 
CompressionCodec {
  */
 @DeveloperApi
 class LZFCompressionCodec(conf: SparkConf) extends CompressionCodec {
+  private val parallelCompression = conf.get(IO_COMPRESSION_LZF_PARALLEL)
 
   override def compressedOutputStream(s: OutputStream): OutputStream = {
-    new LZFOutputStream(s).setFinishBlockOnFlush(true)
+    if (parallelCompression) {
+      new PLZFOutputStream(s)
+    } else {
+      new LZFOutputStream(s).setFinishBlockOnFlush(true)
+    }
   }
 
   override def compressedInputStream(s: InputStream): InputStream = new 
LZFInputStream(s)
diff --git a/core/src/test/scala/org/apache/spark/io/LZFBenchmark.scala 
b/core/src/test/scala/org/apache/spark/io/LZFBenchmark.scala
new file mode 100644
index 000000000000..1934bd516970
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/io/LZFBenchmark.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.io
+
+import java.io.{ByteArrayOutputStream, ObjectOutputStream}
+import java.lang.management.ManagementFactory
+
+import org.apache.spark.SparkConf
+import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
+import org.apache.spark.internal.config.IO_COMPRESSION_LZF_PARALLEL
+
/**
 * Benchmark for LZF codec performance.
 * {{{
 *   To run this benchmark:
 *   1. without sbt: bin/spark-submit --class <this class> <spark core test jar>
 *   2. build/sbt "core/Test/runMain <this class>"
 *   3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "core/Test/runMain <this class>"
 *      Results will be written to "benchmarks/LZFBenchmark-results.txt".
 * }}}
 */
object LZFBenchmark extends BenchmarkBase {

  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
    runBenchmark("Benchmark LZFCompressionCodec") {
      compressSmallObjects()
      compressLargeObjects()
    }
  }

  /**
   * Benchmarks many single-byte writes through the LZF codec, comparing
   * parallel (PLZFOutputStream) against single-threaded compression.
   */
  private def compressSmallObjects(): Unit = {
    val N = 256_000_000
    val benchmark = new Benchmark("Compress small objects", N, output = output)
    Seq(true, false).foreach { parallel =>
      val conf = new SparkConf(false).set(IO_COMPRESSION_LZF_PARALLEL, parallel)
      val condition = if (parallel) "in parallel" else "single-threaded"
      benchmark.addCase(s"Compression $N int values $condition") { _ =>
        val os = new LZFCompressionCodec(conf).compressedOutputStream(new ByteArrayOutputStream())
        // Write exactly N values so the case label is accurate.
        // (`1 until N` would write only N - 1 values.)
        for (i <- 0 until N) {
          os.write(i)
        }
        os.close()
      }
    }
    benchmark.run()
  }

  /**
   * Benchmarks serializing a few large byte arrays through the LZF codec,
   * comparing parallel against single-threaded compression.
   */
  private def compressLargeObjects(): Unit = {
    val N = 1024
    val data: Array[Byte] = (1 until 128 * 1024 * 1024).map(_.toByte).toArray
    val benchmark = new Benchmark("Compress large objects", N, output = output)

    // Mirrors com.ning.compress.lzf.parallel.PLZFOutputStream.getNThreads so the
    // case label can report the thread count PLZF is expected to use.
    // NOTE(review): this is only an estimate — PLZF recomputes it from the
    // current load average when the stream is created, so the two may differ.
    def getNThreads: Int = {
      var nThreads = Runtime.getRuntime.availableProcessors
      val jmx = ManagementFactory.getOperatingSystemMXBean
      if (jmx != null)  {
        val loadAverage = jmx.getSystemLoadAverage.toInt
        if (nThreads > 1 && loadAverage >= 1)  nThreads = Math.max(1, nThreads - loadAverage)
      }
      nThreads
    }
    Seq(true, false).foreach { parallel =>
      val conf = new SparkConf(false).set(IO_COMPRESSION_LZF_PARALLEL, parallel)
      val condition = if (parallel) s"in $getNThreads threads" else "single-threaded"
      benchmark.addCase(s"Compression $N array values $condition") { _ =>
        val baos = new ByteArrayOutputStream()
        val zcos = new LZFCompressionCodec(conf).compressedOutputStream(baos)
        val oos = new ObjectOutputStream(zcos)
        1 to N foreach { _ =>
          oos.writeObject(data)
        }
        oos.close()
      }
    }
    benchmark.run()
  }
}
diff --git a/docs/configuration.md b/docs/configuration.md
index 409f1f521eb5..23443cab2eac 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1895,6 +1895,14 @@ Apart from these, the following properties are also 
available, and may be useful
   </td>
   <td>4.0.0</td>
 </tr>
+<tr>
+  <td><code>spark.io.compression.lzf.parallel.enabled</code></td>
+  <td>false</td>
+  <td>
+    When true, LZF compression will use multiple threads to compress data in 
parallel.
+  </td>
+  <td>4.0.0</td>
+</tr>
 <tr>
   <td><code>spark.kryo.classesToRegister</code></td>
   <td>(none)</td>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to