This is an automated email from the ASF dual-hosted git repository. yao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 90ee29992522 [SPARK-48518][CORE] Make LZF compression be able to run in parallel 90ee29992522 is described below commit 90ee299925220fa564c90e1f688a0d13ba0ac79d Author: Kent Yao <y...@apache.org> AuthorDate: Tue Jun 4 18:58:33 2024 +0800 [SPARK-48518][CORE] Make LZF compression be able to run in parallel ### What changes were proposed in this pull request? This PR introduced a config that turns on LZF compression to parallel mode via using PLZFOutputStream. FYI, https://github.com/ning/compress?tab=readme-ov-file#parallel-processing ### Why are the changes needed? Improve performance ``` [info] OpenJDK 64-Bit Server VM 17.0.10+0 on Mac OS X 14.5 [info] Apple M2 Max [info] Compress large objects: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative [info] ----------------------------------------------------------------------------------------------------------------------------- [info] Compression 1024 array values in 7 threads 12 13 1 0.1 11788.2 1.0X [info] Compression 1024 array values single-threaded 23 23 0 0.0 22512.7 0.5X ``` ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? benchmark ### Was this patch authored or co-authored using generative AI tooling? no Closes #46858 from yaooqinn/SPARK-48518. 
Authored-by: Kent Yao <y...@apache.org> Signed-off-by: Kent Yao <y...@apache.org> --- core/benchmarks/LZFBenchmark-jdk21-results.txt | 19 +++++ core/benchmarks/LZFBenchmark-results.txt | 19 +++++ .../org/apache/spark/internal/config/package.scala | 7 ++ .../org/apache/spark/io/CompressionCodec.scala | 8 +- .../scala/org/apache/spark/io/LZFBenchmark.scala | 93 ++++++++++++++++++++++ docs/configuration.md | 8 ++ 6 files changed, 153 insertions(+), 1 deletion(-) diff --git a/core/benchmarks/LZFBenchmark-jdk21-results.txt b/core/benchmarks/LZFBenchmark-jdk21-results.txt new file mode 100644 index 000000000000..e1566f201a1f --- /dev/null +++ b/core/benchmarks/LZFBenchmark-jdk21-results.txt @@ -0,0 +1,19 @@ +================================================================================================ +Benchmark LZFCompressionCodec +================================================================================================ + +OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1021-azure +AMD EPYC 7763 64-Core Processor +Compress small objects: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +-------------------------------------------------------------------------------------------------------------------------------- +Compression 256000000 int values in parallel 598 600 2 428.2 2.3 1.0X +Compression 256000000 int values single-threaded 568 570 2 451.0 2.2 1.1X + +OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1021-azure +AMD EPYC 7763 64-Core Processor +Compress large objects: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +----------------------------------------------------------------------------------------------------------------------------- +Compression 1024 array values in 1 threads 39 45 5 0.0 38475.4 1.0X +Compression 1024 array values single-threaded 32 33 1 0.0 31154.5 1.2X + + diff --git a/core/benchmarks/LZFBenchmark-results.txt b/core/benchmarks/LZFBenchmark-results.txt new file mode 100644 index 
000000000000..facc67f9cf4a --- /dev/null +++ b/core/benchmarks/LZFBenchmark-results.txt @@ -0,0 +1,19 @@ +================================================================================================ +Benchmark LZFCompressionCodec +================================================================================================ + +OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Linux 6.5.0-1021-azure +AMD EPYC 7763 64-Core Processor +Compress small objects: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +-------------------------------------------------------------------------------------------------------------------------------- +Compression 256000000 int values in parallel 602 612 6 425.1 2.4 1.0X +Compression 256000000 int values single-threaded 610 617 5 419.8 2.4 1.0X + +OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Linux 6.5.0-1021-azure +AMD EPYC 7763 64-Core Processor +Compress large objects: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +----------------------------------------------------------------------------------------------------------------------------- +Compression 1024 array values in 1 threads 35 43 6 0.0 33806.8 1.0X +Compression 1024 array values single-threaded 32 32 0 0.0 30990.4 1.1X + + diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index dc3edfaa8613..a7268c640991 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -2031,6 +2031,13 @@ package object config { .intConf .createWithDefault(1) + private[spark] val IO_COMPRESSION_LZF_PARALLEL = + ConfigBuilder("spark.io.compression.lzf.parallel.enabled") + .doc("When true, LZF compression will use multiple threads to compress data in parallel.") + .version("4.0.0") + .booleanConf + .createWithDefault(false) + private[spark] val IO_WARNING_LARGEFILETHRESHOLD 
= ConfigBuilder("spark.io.warning.largeFileThreshold") .internal() diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala index 07e694b6c5b0..7d5a86d1a81d 100644 --- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala +++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala @@ -22,6 +22,7 @@ import java.util.Locale import com.github.luben.zstd.{NoPool, RecyclingBufferPool, ZstdInputStreamNoFinalizer, ZstdOutputStreamNoFinalizer} import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream} +import com.ning.compress.lzf.parallel.PLZFOutputStream import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream, LZ4Factory} import net.jpountz.xxhash.XXHashFactory import org.xerial.snappy.{Snappy, SnappyInputStream, SnappyOutputStream} @@ -170,9 +171,14 @@ class LZ4CompressionCodec(conf: SparkConf) extends CompressionCodec { */ @DeveloperApi class LZFCompressionCodec(conf: SparkConf) extends CompressionCodec { + private val parallelCompression = conf.get(IO_COMPRESSION_LZF_PARALLEL) override def compressedOutputStream(s: OutputStream): OutputStream = { - new LZFOutputStream(s).setFinishBlockOnFlush(true) + if (parallelCompression) { + new PLZFOutputStream(s) + } else { + new LZFOutputStream(s).setFinishBlockOnFlush(true) + } } override def compressedInputStream(s: InputStream): InputStream = new LZFInputStream(s) diff --git a/core/src/test/scala/org/apache/spark/io/LZFBenchmark.scala b/core/src/test/scala/org/apache/spark/io/LZFBenchmark.scala new file mode 100644 index 000000000000..1934bd516970 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/io/LZFBenchmark.scala @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.io + +import java.io.{ByteArrayOutputStream, ObjectOutputStream} +import java.lang.management.ManagementFactory + +import org.apache.spark.SparkConf +import org.apache.spark.benchmark.{Benchmark, BenchmarkBase} +import org.apache.spark.internal.config.IO_COMPRESSION_LZF_PARALLEL + +/** + * Benchmark for LZF codec performance. + * {{{ + * To run this benchmark: + * 1. without sbt: bin/spark-submit --class <this class> <spark core test jar> + * 2. build/sbt "core/Test/runMain <this class>" + * 3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "core/Test/runMain <this class>" + * Results will be written to "benchmarks/LZFBenchmark-results.txt". 
+ * }}} + */ +object LZFBenchmark extends BenchmarkBase { + + override def runBenchmarkSuite(mainArgs: Array[String]): Unit = { + runBenchmark("Benchmark LZFCompressionCodec") { + compressSmallObjects() + compressLargeObjects() + } + } + + private def compressSmallObjects(): Unit = { + val N = 256_000_000 + val benchmark = new Benchmark("Compress small objects", N, output = output) + Seq(true, false).foreach { parallel => + val conf = new SparkConf(false).set(IO_COMPRESSION_LZF_PARALLEL, parallel) + val condition = if (parallel) "in parallel" else "single-threaded" + benchmark.addCase(s"Compression $N int values $condition") { _ => + val os = new LZFCompressionCodec(conf).compressedOutputStream(new ByteArrayOutputStream()) + for (i <- 1 until N) { + os.write(i) + } + os.close() + } + } + benchmark.run() + } + + private def compressLargeObjects(): Unit = { + val N = 1024 + val data: Array[Byte] = (1 until 128 * 1024 * 1024).map(_.toByte).toArray + val benchmark = new Benchmark(s"Compress large objects", N, output = output) + + // com.ning.compress.lzf.parallel.PLZFOutputStream.getNThreads + def getNThreads: Int = { + var nThreads = Runtime.getRuntime.availableProcessors + val jmx = ManagementFactory.getOperatingSystemMXBean + if (jmx != null) { + val loadAverage = jmx.getSystemLoadAverage.toInt + if (nThreads > 1 && loadAverage >= 1) nThreads = Math.max(1, nThreads - loadAverage) + } + nThreads + } + Seq(true, false).foreach { parallel => + val conf = new SparkConf(false).set(IO_COMPRESSION_LZF_PARALLEL, parallel) + val condition = if (parallel) s"in $getNThreads threads" else "single-threaded" + benchmark.addCase(s"Compression $N array values $condition") { _ => + val baos = new ByteArrayOutputStream() + val zcos = new LZFCompressionCodec(conf).compressedOutputStream(baos) + val oos = new ObjectOutputStream(zcos) + 1 to N foreach { _ => + oos.writeObject(data) + } + oos.close() + } + } + benchmark.run() + } +} diff --git a/docs/configuration.md 
b/docs/configuration.md index 409f1f521eb5..23443cab2eac 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1895,6 +1895,14 @@ Apart from these, the following properties are also available, and may be useful </td> <td>4.0.0</td> </tr> +<tr> + <td><code>spark.io.compression.lzf.parallel.enabled</code></td> + <td>false</td> + <td> + When true, LZF compression will use multiple threads to compress data in parallel. + </td> + <td>4.0.0</td> +</tr> <tr> <td><code>spark.kryo.classesToRegister</code></td> <td>(none)</td> --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org