This is an automated email from the ASF dual-hosted git repository.
mbutrovich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 7ea1cd472 Add microbenchmark for IcebergScan operator serde roundtrip (#3296)
7ea1cd472 is described below
commit 7ea1cd47233658c805f352ffc769bd043e243cd2
Author: Andy Grove <[email protected]>
AuthorDate: Wed Jan 28 07:37:08 2026 -0700
Add microbenchmark for IcebergScan operator serde roundtrip (#3296)
This benchmark measures the performance of serializing Iceberg
FileScanTask objects to protobuf and deserializing them back, starting
from actual Iceberg Java objects rather than pre-constructed protobuf
messages.
The benchmark:
- Creates a real Iceberg table with a configurable number of partitions
- Extracts FileScanTask objects through query planning
- Benchmarks conversion from FileScanTask to Protobuf
- Benchmarks serialization to bytes and deserialization
Usage:
make benchmark-org.apache.spark.sql.benchmark.CometOperatorSerdeBenchmark -- 30000
Co-authored-by: Claude Opus 4.5 <[email protected]>
---
.../benchmark/CometOperatorSerdeBenchmark.scala | 311 +++++++++++++++++++++
1 file changed, 311 insertions(+)
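For context, the roundtrip being measured boils down to the following
(a minimal sketch based on the code added in this commit; scanExec
stands for the CometBatchScanExec reconstructed from the planned
query):

    // Convert the Iceberg scan (and its FileScanTasks) into a protobuf Operator
    val builder = OperatorOuterClass.Operator.newBuilder()
    val operatorOpt = CometIcebergNativeScan.convert(scanExec, builder)

    // Serialize to bytes, then parse back to complete the roundtrip
    operatorOpt.foreach { operator =>
      val bytes = operator.toByteArray
      OperatorOuterClass.Operator.parseFrom(bytes)
    }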
diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometOperatorSerdeBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometOperatorSerdeBenchmark.scala
new file mode 100644
index 000000000..036d526a4
--- /dev/null
+++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometOperatorSerdeBenchmark.scala
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.benchmark
+
+import java.io.File
+import java.nio.file.Files
+
+import org.apache.spark.benchmark.Benchmark
+import org.apache.spark.sql.comet.{CometBatchScanExec, CometIcebergNativeScanExec}
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
+
+import org.apache.comet.CometConf
+import org.apache.comet.serde.OperatorOuterClass
+import org.apache.comet.serde.operator.CometIcebergNativeScan
+
+/**
+ * Benchmark for operator serialization/deserialization roundtrip performance.
+ *
+ * This benchmark measures the time to serialize Iceberg FileScanTask objects to protobuf,
+ * starting from actual Iceberg Java objects rather than pre-constructed protobuf messages.
+ *
+ * To run this benchmark:
+ * {{{
+ * SPARK_GENERATE_BENCHMARK_FILES=1 make \
+ * benchmark-org.apache.spark.sql.benchmark.CometOperatorSerdeBenchmark
+ * }}}
+ *
+ * Results will be written to "spark/benchmarks/CometOperatorSerdeBenchmark-**results.txt".
+ */
+object CometOperatorSerdeBenchmark extends CometBenchmarkBase {
+
+ // Check if Iceberg is available in classpath
+ private def icebergAvailable: Boolean = {
+ try {
+ Class.forName("org.apache.iceberg.catalog.Catalog")
+ true
+ } catch {
+ case _: ClassNotFoundException => false
+ }
+ }
+
+ // Helper to create temp directory for Iceberg warehouse
+ private def withTempIcebergDir(f: File => Unit): Unit = {
+ val dir = Files.createTempDirectory("comet-serde-benchmark").toFile
+ try {
+ f(dir)
+ } finally {
+ def deleteRecursively(file: File): Unit = {
+ if (file.isDirectory) {
+ Option(file.listFiles()).foreach(_.foreach(deleteRecursively))
+ }
+ file.delete()
+ }
+ deleteRecursively(dir)
+ }
+ }
+
+ /**
+ * Extracts CometIcebergNativeScanExec from a query plan, unwrapping AQE if present.
+ */
+ private def extractIcebergNativeScanExec(
+ plan: SparkPlan): Option[CometIcebergNativeScanExec] = {
+ val unwrapped = plan match {
+ case aqe: AdaptiveSparkPlanExec => aqe.executedPlan
+ case other => other
+ }
+
+ def find(p: SparkPlan): Option[CometIcebergNativeScanExec] = {
+ p match {
+ case scan: CometIcebergNativeScanExec => Some(scan)
+ case _ => p.children.flatMap(find).headOption
+ }
+ }
+ find(unwrapped)
+ }
+
+ /**
+ * Reconstructs a CometBatchScanExec from CometIcebergNativeScanExec for benchmarking the
+ * conversion process.
+ */
+ private def reconstructBatchScanExec(
+ nativeScan: CometIcebergNativeScanExec): CometBatchScanExec = {
+ CometBatchScanExec(
+ wrapped = nativeScan.originalPlan,
+ runtimeFilters = Seq.empty,
+ nativeIcebergScanMetadata = Some(nativeScan.nativeIcebergScanMetadata))
+ }
+
+ /**
+ * Creates an Iceberg table with the specified number of partitions. Each partition contains one
+ * data file.
+ */
+ private def createPartitionedIcebergTable(
+ warehouseDir: File,
+ numPartitions: Int,
+ tableName: String = "serde_bench_table"): Unit = {
+ // Configure Hadoop catalog
+ spark.conf.set("spark.sql.catalog.bench_cat",
"org.apache.iceberg.spark.SparkCatalog")
+ spark.conf.set("spark.sql.catalog.bench_cat.type", "hadoop")
+ spark.conf.set("spark.sql.catalog.bench_cat.warehouse",
warehouseDir.getAbsolutePath)
+
+ val fullTableName = s"bench_cat.db.$tableName"
+
+ // Drop table if exists
+ spark.sql(s"DROP TABLE IF EXISTS $fullTableName")
+ spark.sql("CREATE NAMESPACE IF NOT EXISTS bench_cat.db")
+
+ // Create partitioned Iceberg table
+ spark.sql(s"""
+ CREATE TABLE $fullTableName (
+ id BIGINT,
+ name STRING,
+ value DOUBLE,
+ partition_col INT
+ ) USING iceberg
+ PARTITIONED BY (partition_col)
+ TBLPROPERTIES (
+ 'format-version'='2',
+ 'write.parquet.compression-codec' = 'snappy'
+ )
+ """)
+
+ // Insert data to create the specified number of partitions
+ // Use a range to create unique partition values
+ // scalastyle:off println
+ println(s"Creating Iceberg table with $numPartitions partitions...")
+ // scalastyle:on println
+
+ // Insert in batches to avoid memory issues
+ val batchSize = 1000
+ var partitionsCreated = 0
+
+ while (partitionsCreated < numPartitions) {
+ val batchEnd = math.min(partitionsCreated + batchSize, numPartitions)
+ val partitionRange = partitionsCreated until batchEnd
+
+ // Create DataFrame with partition data
+ import spark.implicits._
+ val df = partitionRange
+ .map { p =>
+ (p.toLong, s"name_$p", p * 1.5, p)
+ }
+ .toDF("id", "name", "value", "partition_col")
+
+ df.writeTo(fullTableName).append()
+ partitionsCreated = batchEnd
+
+ if (partitionsCreated % 5000 == 0 || partitionsCreated == numPartitions) {
+ // scalastyle:off println
+ println(s" Created $partitionsCreated / $numPartitions partitions")
+ // scalastyle:on println
+ }
+ }
+ }
+
+ /**
+ * Benchmarks the serialization of the IcebergScan operator from FileScanTask objects.
+ */
+ def icebergScanSerdeBenchmark(numPartitions: Int): Unit = {
+ if (!icebergAvailable) {
+ // scalastyle:off println
+ println("Iceberg not available in classpath, skipping benchmark")
+ // scalastyle:on println
+ return
+ }
+
+ withTempIcebergDir { warehouseDir =>
+ withSQLConf(
+ "spark.sql.catalog.bench_cat" ->
"org.apache.iceberg.spark.SparkCatalog",
+ "spark.sql.catalog.bench_cat.type" -> "hadoop",
+ "spark.sql.catalog.bench_cat.warehouse" ->
warehouseDir.getAbsolutePath,
+ CometConf.COMET_ENABLED.key -> "true",
+ CometConf.COMET_EXEC_ENABLED.key -> "true",
+ CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") {
+
+ // Create the partitioned table
+ createPartitionedIcebergTable(warehouseDir, numPartitions)
+
+ val fullTableName = "bench_cat.db.serde_bench_table"
+
+ // Plan a query to get the CometIcebergNativeScanExec with FileScanTasks
+ val df = spark.sql(s"SELECT * FROM $fullTableName")
+ val plan = df.queryExecution.executedPlan
+
+ val nativeScanOpt = extractIcebergNativeScanExec(plan)
+
+ nativeScanOpt match {
+ case Some(nativeScan) =>
+ // Get metadata and tasks
+ val metadata = nativeScan.nativeIcebergScanMetadata
+ val tasks = metadata.tasks
+ // scalastyle:off println
+ println(s"Found ${tasks.size()} FileScanTasks")
+ // scalastyle:on println
+
+ // Reconstruct CometBatchScanExec for conversion benchmarking
+ val scanExec = reconstructBatchScanExec(nativeScan)
+
+ // Benchmark the serialization
+ val iterations = 100
+ val benchmark = new Benchmark(
+ s"IcebergScan serde ($numPartitions partitions, ${tasks.size()}
tasks)",
+ iterations,
+ output = output)
+
+ // Benchmark: Convert FileScanTasks to protobuf (the convert() method)
+ benchmark.addCase("FileScanTask -> Protobuf (convert)") { _ =>
+ var i = 0
+ while (i < iterations) {
+ val builder = OperatorOuterClass.Operator.newBuilder()
+ CometIcebergNativeScan.convert(scanExec, builder)
+ i += 1
+ }
+ }
+
+ // Benchmark: Full roundtrip - convert to protobuf and serialize to bytes
+ benchmark.addCase("FileScanTask -> Protobuf -> bytes") { _ =>
+ var i = 0
+ while (i < iterations) {
+ val builder = OperatorOuterClass.Operator.newBuilder()
+ val operatorOpt = CometIcebergNativeScan.convert(scanExec, builder)
+ operatorOpt.foreach(_.toByteArray)
+ i += 1
+ }
+ }
+
+ // Get serialized bytes for deserialization benchmark
+ val builder = OperatorOuterClass.Operator.newBuilder()
+ val operatorOpt = CometIcebergNativeScan.convert(scanExec, builder)
+
+ operatorOpt match {
+ case Some(operator) =>
+ val serializedBytes = operator.toByteArray
+ val sizeKB = serializedBytes.length / 1024.0
+ val sizeMB = sizeKB / 1024.0
+
+ // scalastyle:off println
+ println(
+ s"Serialized IcebergScan size: ${f"$sizeKB%.1f"} KB
(${f"$sizeMB%.2f"} MB)")
+ // scalastyle:on println
+
+ // Benchmark: Deserialize from bytes
+ benchmark.addCase("bytes -> Protobuf (parseFrom)") { _ =>
+ var i = 0
+ while (i < iterations) {
+ OperatorOuterClass.Operator.parseFrom(serializedBytes)
+ i += 1
+ }
+ }
+
+ // Benchmark: Full roundtrip including deserialization
+ benchmark.addCase("Full roundtrip (convert + serialize +
deserialize)") { _ =>
+ var i = 0
+ while (i < iterations) {
+ val b = OperatorOuterClass.Operator.newBuilder()
+ val op = CometIcebergNativeScan.convert(scanExec, b)
+ op.foreach { o =>
+ val bytes = o.toByteArray
+ OperatorOuterClass.Operator.parseFrom(bytes)
+ }
+ i += 1
+ }
+ }
+
+ case None =>
+ // scalastyle:off println
+ println("WARNING: convert() returned None, cannot benchmark
serialization")
+ // scalastyle:on println
+ }
+
+ benchmark.run()
+
+ case None =>
+ // scalastyle:off println
+ println("WARNING: Could not find CometIcebergNativeScanExec in
query plan")
+ println(s"Plan:\n$plan")
+ // scalastyle:on println
+ }
+
+ // Cleanup
+ spark.sql(s"DROP TABLE IF EXISTS $fullTableName")
+ }
+ }
+ }
+
+ override def runCometBenchmark(args: Array[String]): Unit = {
+ val numPartitions = if (args.nonEmpty) args(0).toInt else 30000
+
+ runBenchmark("IcebergScan Operator Serde Benchmark") {
+ icebergScanSerdeBenchmark(numPartitions)
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]