This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 13ea40c3d [CELEBORN-2272] Add LZ4TPCDSDataBenchmark
13ea40c3d is described below
commit 13ea40c3d086a483f9913628b80640591c223508
Author: Cheng Pan <[email protected]>
AuthorDate: Mon Mar 2 20:01:28 2026 +0800
[CELEBORN-2272] Add LZ4TPCDSDataBenchmark
### What changes were proposed in this pull request?
Add LZ4TPCDSDataBenchmark, which uses TPC-DS data to measure LZ4
compression/decompression performance.
### Why are the changes needed?
Provide benchmark reports so that performance changes can be measured when
upgrading lz4-java.
### Does this PR resolve a correctness bug?
No.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
See benchmark reports.
Closes #3613 from pan3793/lz4-benchmark.
Authored-by: Cheng Pan <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
---
.../LZ4TPCDSDataBenchmark-jdk17-results.txt | 53 ++++++++++++
.../client/compress/LZ4TPCDSDataBenchmark.scala | 95 ++++++++++++++++++++++
.../client/compress/TPCDSDataBenchmark.scala | 66 +++++++++++++++
3 files changed, 214 insertions(+)
diff --git a/client/benchmarks/LZ4TPCDSDataBenchmark-jdk17-results.txt
b/client/benchmarks/LZ4TPCDSDataBenchmark-jdk17-results.txt
new file mode 100644
index 000000000..a6ce37aba
--- /dev/null
+++ b/client/benchmarks/LZ4TPCDSDataBenchmark-jdk17-results.txt
@@ -0,0 +1,53 @@
+================================================================================================
+Benchmark LZ4CompressionCodec
+================================================================================================
+
+OpenJDK 64-Bit Server VM 17.0.15+6-LTS on Linux 6.17.9-76061709-generic
+Intel(R) Core(TM) i5-9500 CPU @ 3.00GHz
+Compression: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
+-------------------------------------------------------------------------------------------------------------------------
+Compression with chunk size 65536 4 times 2131 2134
5 0.0 532707902.0 1.0X
+
+OpenJDK 64-Bit Server VM 17.0.15+6-LTS on Linux 6.17.9-76061709-generic
+Intel(R) Core(TM) i5-9500 CPU @ 3.00GHz
+Decompression: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
+---------------------------------------------------------------------------------------------------------------------------
+Decompression with chunk size 65536 4 times 536 541
9 0.0 133951799.0 1.0X
+
+OpenJDK 64-Bit Server VM 17.0.15+6-LTS on Linux 6.17.9-76061709-generic
+Intel(R) Core(TM) i5-9500 CPU @ 3.00GHz
+Compression: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
+--------------------------------------------------------------------------------------------------------------------------
+Compression with chunk size 262144 4 times 1754 1756
2 0.0 438523185.2 1.0X
+
+OpenJDK 64-Bit Server VM 17.0.15+6-LTS on Linux 6.17.9-76061709-generic
+Intel(R) Core(TM) i5-9500 CPU @ 3.00GHz
+Decompression: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
+----------------------------------------------------------------------------------------------------------------------------
+Decompression with chunk size 262144 4 times 436 439
4 0.0 109013659.0 1.0X
+
+OpenJDK 64-Bit Server VM 17.0.15+6-LTS on Linux 6.17.9-76061709-generic
+Intel(R) Core(TM) i5-9500 CPU @ 3.00GHz
+Compression: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
+---------------------------------------------------------------------------------------------------------------------------
+Compression with chunk size 1048576 4 times 1774 1780
9 0.0 443426664.3 1.0X
+
+OpenJDK 64-Bit Server VM 17.0.15+6-LTS on Linux 6.17.9-76061709-generic
+Intel(R) Core(TM) i5-9500 CPU @ 3.00GHz
+Decompression: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
+-----------------------------------------------------------------------------------------------------------------------------
+Decompression with chunk size 1048576 4 times 431 434
4 0.0 107823243.0 1.0X
+
+OpenJDK 64-Bit Server VM 17.0.15+6-LTS on Linux 6.17.9-76061709-generic
+Intel(R) Core(TM) i5-9500 CPU @ 3.00GHz
+Compression: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
+---------------------------------------------------------------------------------------------------------------------------
+Compression with chunk size 4194304 4 times 1785 1791
8 0.0 446360006.8 1.0X
+
+OpenJDK 64-Bit Server VM 17.0.15+6-LTS on Linux 6.17.9-76061709-generic
+Intel(R) Core(TM) i5-9500 CPU @ 3.00GHz
+Decompression: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
+-----------------------------------------------------------------------------------------------------------------------------
+Decompression with chunk size 4194304 4 times 436 438
1 0.0 109117732.3 1.0X
+
+
diff --git
a/client/src/test/scala/org/apache/celeborn/client/compress/LZ4TPCDSDataBenchmark.scala
b/client/src/test/scala/org/apache/celeborn/client/compress/LZ4TPCDSDataBenchmark.scala
new file mode 100644
index 000000000..dde347d2a
--- /dev/null
+++
b/client/src/test/scala/org/apache/celeborn/client/compress/LZ4TPCDSDataBenchmark.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.client.compress
+
+import java.util
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.celeborn.benchmark.Benchmark
+import org.apache.celeborn.common.CelebornConf
+
+/**
+ * Benchmark for LZ4 codec performance, measured on TPC-DS text data.
+ * To run this benchmark
+ * {{{
+ *   1. build/sbt "celeborn-client/test:runMain <this class>"
+ *   2. generate result:
+ *      CELEBORN_GENERATE_BENCHMARK_FILES=1 build/sbt "celeborn-client/test:runMain <this class>"
+ *      Results will be written to "benchmarks/LZ4TPCDSDataBenchmark-results.txt".
+ *      NOTE(review): the generated file name may carry a JDK suffix, e.g.
+ *      "LZ4TPCDSDataBenchmark-jdk17-results.txt" - confirm against BenchmarkBase.
+ * }}}
+ */
+object LZ4TPCDSDataBenchmark extends TPCDSDataBenchmark {
+
+  /**
+   * Calls `prepareData()` and then, for each chunk size (64k, 256k, 1m, 4m), runs the
+   * compression benchmark followed by the decompression benchmark.
+   *
+   * @param mainArgs command line arguments; not read by this suite
+   */
+  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
+    prepareData()
+    runBenchmark("Benchmark LZ4CompressionCodec") {
+      Seq(
+        64 * 1024 /* 64k */,
+        256 * 1024 /* 256k */,
+        1024 * 1024 /* 1m */,
+        4 * 1024 * 1024 /* 4m */ ).foreach { chunkSize: Int =>
+        compressionBenchmark(chunkSize)
+        decompressionBenchmark(chunkSize)
+      }
+    }
+  }
+
+  /**
+   * Times compressing all of `data`, handed to the compressor in slices of at most
+   * `chunkSize` bytes, with the full pass repeated `N` times inside the measured case.
+   *
+   * @param chunkSize maximum number of bytes passed to a single `compress` call
+   */
+  private def compressionBenchmark(chunkSize: Int): Unit = {
+    val benchmark = new Benchmark("Compression", N, output = output)
+    val blockSize = new CelebornConf().clientPushBufferMaxSize
+    val lz4Compressor = new Lz4Compressor(blockSize)
+    benchmark.addCase(s"Compression with chunk size $chunkSize $N times") { _ =>
+      // `1 to N` is inclusive, so the pass really runs N times, matching the case name and
+      // the N given to Benchmark above. `1 until N` would only run N - 1 passes.
+      (1 to N).foreach { _ =>
+        var offset = 0
+        while (offset < data.length) {
+          // The last slice may be shorter than chunkSize.
+          val length = math.min(chunkSize, data.length - offset)
+          lz4Compressor.compress(data, offset, length)
+          offset += length
+        }
+      }
+    }
+    benchmark.run()
+  }
+
+  /**
+   * Times decompressing every chunk of `data`, with the full pass repeated `N` times inside
+   * the measured case. The chunks are compressed once up front, before the case is added,
+   * so the compression work is not part of the measured case body.
+   *
+   * @param chunkSize maximum number of uncompressed bytes per chunk; also the size of the
+   *                  destination buffer that every chunk is decompressed into
+   */
+  private def decompressionBenchmark(chunkSize: Int): Unit = {
+    val benchmark = new Benchmark("Decompression", N, output = output)
+    val blockSize = new CelebornConf().clientPushBufferMaxSize
+    val lz4Compressor = new Lz4Compressor(blockSize)
+    val compressedDataChunks = ArrayBuffer.empty[Array[Byte]]
+    var offset = 0
+    while (offset < data.length) {
+      val length = math.min(chunkSize, data.length - offset)
+      lz4Compressor.compress(data, offset, length)
+      val compressed = lz4Compressor.getCompressedBuffer
+      // Keep a private copy of each compressed chunk.
+      // NOTE(review): presumably the compressor reuses its buffer across calls - confirm.
+      compressedDataChunks += util.Arrays.copyOf(compressed, compressed.length)
+      offset += length
+    }
+
+    val lz4Decompressor = new Lz4Decompressor(Option.empty)
+    // One destination buffer is shared by all chunks; the decompressed bytes are discarded.
+    val dst = new Array[Byte](chunkSize)
+    benchmark.addCase(s"Decompression with chunk size $chunkSize $N times") { _ =>
+      // Inclusive range for the same reason as in compressionBenchmark: N passes, not N - 1.
+      (1 to N).foreach { _ =>
+        compressedDataChunks.foreach { compressedChunk =>
+          lz4Decompressor.decompress(compressedChunk, dst, 0)
+        }
+      }
+    }
+    benchmark.run()
+  }
+}
diff --git
a/client/src/test/scala/org/apache/celeborn/client/compress/TPCDSDataBenchmark.scala
b/client/src/test/scala/org/apache/celeborn/client/compress/TPCDSDataBenchmark.scala
new file mode 100644
index 000000000..11f9625e1
--- /dev/null
+++
b/client/src/test/scala/org/apache/celeborn/client/compress/TPCDSDataBenchmark.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.client.compress
+
+import java.nio.file.{Files, Paths}
+
+import org.apache.celeborn.benchmark.BenchmarkBase
+
+/**
+ * Base class for benchmarks that run against the TPC-DS `catalog_sales.dat` text file.
+ * <p>
+ * How to prepare the TPC-DS data:
+ * <p>
+ * 1. Follow https://github.com/databricks/tpcds-kit.git to set up tpcds-kit
+ * <p>
+ * 2. Create a folder and export environment variable SPARK_TPCDS_DATA_TEXT
+ * {{{
+ *   mkdir -p /path/of/tpcds-sf1-text
+ *   export SPARK_TPCDS_DATA_TEXT=/path/of/tpcds-sf1-text
+ * }}}
+ * <p>
+ * 3. Generate TPC-DS (SF1) text data under SPARK_TPCDS_DATA_TEXT
+ * {{{
+ *   tpcds-kit/tools/dsdgen \
+ *     -DISTRIBUTIONS tpcds-kit/tools/tpcds.idx \
+ *     -SCALE 1 \
+ *     -DIR $SPARK_TPCDS_DATA_TEXT
+ * }}}
+ */
+abstract class TPCDSDataBenchmark extends BenchmarkBase {
+
+  /** Repetition count shared by the benchmarks built on this class. */
+  val N: Int = 4
+
+  /** Raw bytes of `catalog_sales.dat`; unset until `prepareData()` has run. */
+  var data: Array[Byte] = _
+
+  /**
+   * Reads `catalog_sales.dat` from the directory named by the SPARK_TPCDS_DATA_TEXT
+   * environment variable into `data`.
+   *
+   * Fails a `require` check when the variable is not set or the file does not exist.
+   */
+  protected def prepareData(): Unit = {
+    val dataDirFromEnv = sys.env.get("SPARK_TPCDS_DATA_TEXT")
+    require(dataDirFromEnv.isDefined, "Can not find env var SPARK_TPCDS_DATA_TEXT")
+
+    dataDirFromEnv.foreach { dataDir =>
+      val salesFile = Paths.get(dataDir, "catalog_sales.dat")
+      require(
+        Files.exists(salesFile),
+        s"File $salesFile does not exists, " +
+          s"please follow instruction to generate the TPC-DS (SF1) text data first.")
+
+      // catalog_sales.dat at SF1 is about 283M and is read into memory in one go
+      data = Files.readAllBytes(salesFile)
+    }
+  }
+
+  /** Clears the reference to the loaded bytes once the benchmark has finished. */
+  override def afterAll(): Unit = {
+    data = null
+  }
+}