This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.6 by this push:
     new f6aa1aff0 [CELEBORN-2272] Add LZ4TPCDSDataBenchmark
f6aa1aff0 is described below

commit f6aa1aff08dfc586ff3316b12f723ebc8382ac5c
Author: Cheng Pan <[email protected]>
AuthorDate: Mon Mar 2 20:01:28 2026 +0800

    [CELEBORN-2272] Add LZ4TPCDSDataBenchmark
    
    ### What changes were proposed in this pull request?
    
    Add LZ4TPCDSDataBenchmark, which uses TPC-DS data to measure compression/decompression performance.
    
    ### Why are the changes needed?
    
    Provide benchmark reports to measure performance changes when upgrading lz4-java.
    
    ### Does this PR resolve a correctness bug?
    
    No.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    See benchmark reports.
    
    Closes #3613 from pan3793/lz4-benchmark.
    
    Authored-by: Cheng Pan <[email protected]>
    Signed-off-by: SteNicholas <[email protected]>
    (cherry picked from commit 13ea40c3d086a483f9913628b80640591c223508)
    Signed-off-by: SteNicholas <[email protected]>
---
 .../LZ4TPCDSDataBenchmark-jdk17-results.txt        | 53 ++++++++++++
 .../client/compress/LZ4TPCDSDataBenchmark.scala    | 95 ++++++++++++++++++++++
 .../client/compress/TPCDSDataBenchmark.scala       | 66 +++++++++++++++
 3 files changed, 214 insertions(+)

diff --git a/client/benchmarks/LZ4TPCDSDataBenchmark-jdk17-results.txt b/client/benchmarks/LZ4TPCDSDataBenchmark-jdk17-results.txt
new file mode 100644
index 000000000..a6ce37aba
--- /dev/null
+++ b/client/benchmarks/LZ4TPCDSDataBenchmark-jdk17-results.txt
@@ -0,0 +1,53 @@
+================================================================================================
+Benchmark LZ4CompressionCodec
+================================================================================================
+
+OpenJDK 64-Bit Server VM 17.0.15+6-LTS on Linux 6.17.9-76061709-generic
+Intel(R) Core(TM) i5-9500 CPU @ 3.00GHz
+Compression:                               Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+-------------------------------------------------------------------------------------------------------------------------
+Compression with chunk size 65536 4 times           2131           2134           5          0.0   532707902.0       1.0X
+
+OpenJDK 64-Bit Server VM 17.0.15+6-LTS on Linux 6.17.9-76061709-generic
+Intel(R) Core(TM) i5-9500 CPU @ 3.00GHz
+Decompression:                               Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+---------------------------------------------------------------------------------------------------------------------------
+Decompression with chunk size 65536 4 times            536            541           9          0.0   133951799.0       1.0X
+
+OpenJDK 64-Bit Server VM 17.0.15+6-LTS on Linux 6.17.9-76061709-generic
+Intel(R) Core(TM) i5-9500 CPU @ 3.00GHz
+Compression:                                Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+--------------------------------------------------------------------------------------------------------------------------
+Compression with chunk size 262144 4 times           1754           1756           2          0.0   438523185.2       1.0X
+
+OpenJDK 64-Bit Server VM 17.0.15+6-LTS on Linux 6.17.9-76061709-generic
+Intel(R) Core(TM) i5-9500 CPU @ 3.00GHz
+Decompression:                                Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+----------------------------------------------------------------------------------------------------------------------------
+Decompression with chunk size 262144 4 times            436            439           4          0.0   109013659.0       1.0X
+
+OpenJDK 64-Bit Server VM 17.0.15+6-LTS on Linux 6.17.9-76061709-generic
+Intel(R) Core(TM) i5-9500 CPU @ 3.00GHz
+Compression:                                 Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+---------------------------------------------------------------------------------------------------------------------------
+Compression with chunk size 1048576 4 times           1774           1780           9          0.0   443426664.3       1.0X
+
+OpenJDK 64-Bit Server VM 17.0.15+6-LTS on Linux 6.17.9-76061709-generic
+Intel(R) Core(TM) i5-9500 CPU @ 3.00GHz
+Decompression:                                 Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+-----------------------------------------------------------------------------------------------------------------------------
+Decompression with chunk size 1048576 4 times            431            434           4          0.0   107823243.0       1.0X
+
+OpenJDK 64-Bit Server VM 17.0.15+6-LTS on Linux 6.17.9-76061709-generic
+Intel(R) Core(TM) i5-9500 CPU @ 3.00GHz
+Compression:                                 Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+---------------------------------------------------------------------------------------------------------------------------
+Compression with chunk size 4194304 4 times           1785           1791           8          0.0   446360006.8       1.0X
+
+OpenJDK 64-Bit Server VM 17.0.15+6-LTS on Linux 6.17.9-76061709-generic
+Intel(R) Core(TM) i5-9500 CPU @ 3.00GHz
+Decompression:                                 Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+-----------------------------------------------------------------------------------------------------------------------------
+Decompression with chunk size 4194304 4 times            436            438           1          0.0   109117732.3       1.0X
+
+
diff --git a/client/src/test/scala/org/apache/celeborn/client/compress/LZ4TPCDSDataBenchmark.scala b/client/src/test/scala/org/apache/celeborn/client/compress/LZ4TPCDSDataBenchmark.scala
new file mode 100644
index 000000000..dde347d2a
--- /dev/null
+++ b/client/src/test/scala/org/apache/celeborn/client/compress/LZ4TPCDSDataBenchmark.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.client.compress
+
+import java.util
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.celeborn.benchmark.Benchmark
+import org.apache.celeborn.common.CelebornConf
+
+/**
+ * Benchmark for LZ4 codec performance, using TPC-DS `catalog_sales.dat` (SF1) as input.
+ *
+ * For every chunk size, the input is split into consecutive chunks of at most that many bytes,
+ * and each benchmark case processes the whole input `N` times (compression, or decompression of
+ * the pre-compressed chunks).
+ *
+ * See [[TPCDSDataBenchmark]] for how to generate the input data.
+ *
+ * To run this benchmark
+ * {{{
+ *   1. build/sbt "celeborn-client/test:runMain <this class>"
+ *   2. generate result:
+ *      CELEBORN_GENERATE_BENCHMARK_FILES=1 build/sbt "celeborn-client/test:runMain <this class>"
+ *      Results will be written to "benchmarks/LZ4TPCDSDataBenchmark-jdk<java version>-results.txt",
+ *      e.g. "benchmarks/LZ4TPCDSDataBenchmark-jdk17-results.txt" when run on JDK 17.
+ * }}}
+ */
+object LZ4TPCDSDataBenchmark extends TPCDSDataBenchmark {
+
+  /** Chunk sizes, in bytes, that the input is split into: 64k, 256k, 1m and 4m. */
+  private val chunkSizes: Seq[Int] = Seq(64 * 1024, 256 * 1024, 1024 * 1024, 4 * 1024 * 1024)
+
+  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
+    prepareData()
+    runBenchmark("Benchmark LZ4CompressionCodec") {
+      chunkSizes.foreach { chunkSize =>
+        compressionBenchmark(chunkSize)
+        decompressionBenchmark(chunkSize)
+      }
+    }
+  }
+
+  /** Creates an [[Lz4Compressor]] using the default `CelebornConf.clientPushBufferMaxSize`. */
+  private def newLz4Compressor(): Lz4Compressor =
+    new Lz4Compressor(new CelebornConf().clientPushBufferMaxSize)
+
+  /**
+   * Calls `f(offset, length)` for consecutive chunks of at most `chunkSize` bytes that
+   * together cover `data` exactly once.
+   */
+  private def foreachChunk(chunkSize: Int)(f: (Int, Int) => Unit): Unit = {
+    // A non-positive chunk size would never advance `offset` and the loop would not terminate.
+    require(chunkSize > 0, s"chunkSize must be positive, but got $chunkSize")
+    var offset = 0
+    while (offset < data.length) {
+      val length = math.min(chunkSize, data.length - offset)
+      f(offset, length)
+      offset += length
+    }
+  }
+
+  /** Measures compressing the whole input, split into `chunkSize`-byte chunks, `N` times. */
+  private def compressionBenchmark(chunkSize: Int): Unit = {
+    val benchmark = new Benchmark("Compression", N, output = output)
+    val lz4Compressor = newLz4Compressor()
+    benchmark.addCase(s"Compression with chunk size $chunkSize $N times") { _ =>
+      // Exactly N passes (not N - 1): the case label promises N passes and the reported
+      // per-row time is the case time divided by N.
+      (0 until N).foreach { _ =>
+        foreachChunk(chunkSize) { (offset, length) =>
+          lz4Compressor.compress(data, offset, length)
+        }
+      }
+    }
+    benchmark.run()
+  }
+
+  /** Measures decompressing the whole input, pre-compressed in `chunkSize`-byte chunks, `N` times. */
+  private def decompressionBenchmark(chunkSize: Int): Unit = {
+    val benchmark = new Benchmark("Decompression", N, output = output)
+
+    // Compress the input once up front so that only decompression is timed.
+    val lz4Compressor = newLz4Compressor()
+    val compressedDataChunks = ArrayBuffer.empty[Array[Byte]]
+    foreachChunk(chunkSize) { (offset, length) =>
+      lz4Compressor.compress(data, offset, length)
+      // Keep only the bytes written for this chunk, not the compressor's whole buffer.
+      val compressedSize = lz4Compressor.getCompressedTotalSize
+      compressedDataChunks += util.Arrays.copyOf(lz4Compressor.getCompressedBuffer, compressedSize)
+    }
+
+    val lz4Decompressor = new Lz4Decompressor(Option.empty)
+    // Every chunk decompresses to at most `chunkSize` bytes, so one output buffer is reused.
+    val dst = new Array[Byte](chunkSize)
+    benchmark.addCase(s"Decompression with chunk size $chunkSize $N times") { _ =>
+      // Exactly N passes, matching the case label and the per-row divisor.
+      (0 until N).foreach { _ =>
+        compressedDataChunks.foreach { compressedChunk =>
+          lz4Decompressor.decompress(compressedChunk, dst, 0)
+        }
+      }
+    }
+    benchmark.run()
+  }
+}
diff --git a/client/src/test/scala/org/apache/celeborn/client/compress/TPCDSDataBenchmark.scala b/client/src/test/scala/org/apache/celeborn/client/compress/TPCDSDataBenchmark.scala
new file mode 100644
index 000000000..11f9625e1
--- /dev/null
+++ b/client/src/test/scala/org/apache/celeborn/client/compress/TPCDSDataBenchmark.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.client.compress
+
+import java.nio.file.{Files, Paths}
+
+import org.apache.celeborn.benchmark.BenchmarkBase
+
+/**
+ * Base class for benchmarks that use TPC-DS text data as input. [[prepareData]] loads
+ * `catalog_sales.dat` from the directory named by the `SPARK_TPCDS_DATA_TEXT` environment
+ * variable into [[data]].
+ *
+ * TPC-DS data preparation:
+ * <p>
+ * 1. Follow https://github.com/databricks/tpcds-kit.git to set up tpcds-kit
+ * <p>
+ * 2. Create a folder and export environment variable SPARK_TPCDS_DATA_TEXT
+ * {{{
+ * mkdir -p /path/of/tpcds-sf1-text
+ * export SPARK_TPCDS_DATA_TEXT=/path/of/tpcds-sf1-text
+ * }}}
+ * <p>
+ * 3. Generate TPC-DS (SF1) text data under SPARK_TPCDS_DATA_TEXT
+ * {{{
+ * tpcds-kit/tools/dsdgen \
+ *   -DISTRIBUTIONS tpcds-kit/tools/tpcds.idx \
+ *   -SCALE 1 \
+ *   -DIR $SPARK_TPCDS_DATA_TEXT
+ * }}}
+ */
+abstract class TPCDSDataBenchmark extends BenchmarkBase {
+
+  /** Repetition count for subclass benchmarks, e.g. how many passes over [[data]] a case makes. */
+  val N = 4
+
+  /** Raw bytes of `catalog_sales.dat`; set by [[prepareData]] and cleared in [[afterAll]]. */
+  var data: Array[Byte] = _
+
+  /**
+   * Reads `catalog_sales.dat` from the `SPARK_TPCDS_DATA_TEXT` directory into [[data]].
+   *
+   * @throws IllegalArgumentException if the environment variable is unset or empty, or if the
+   *                                  file is missing or not a regular file
+   */
+  protected def prepareData(): Unit = {
+    // Treat an empty value as unset; otherwise the file would be resolved against the
+    // current working directory.
+    val tpcDsDataDir = sys.env.get("SPARK_TPCDS_DATA_TEXT").filter(_.nonEmpty)
+    require(tpcDsDataDir.isDefined, "Can not find env var SPARK_TPCDS_DATA_TEXT")
+
+    val catalogSalesDatPath = Paths.get(tpcDsDataDir.get, "catalog_sales.dat")
+    // isRegularFile (rather than exists) also rejects a directory, which readAllBytes cannot read.
+    require(
+      Files.isRegularFile(catalogSalesDatPath),
+      s"File $catalogSalesDatPath does not exist or is not a regular file, " +
+        "please follow the instructions to generate the TPC-DS (SF1) text data first.")
+
+    // the size of TPCDS catalog_sales.dat (SF1) is about 283M
+    data = Files.readAllBytes(catalogSalesDatPath)
+  }
+
+  override def afterAll(): Unit = {
+    // Drop the reference so the large input array can be garbage collected.
+    data = null
+    super.afterAll()
+  }
+}

Reply via email to