This is an automated email from the ASF dual-hosted git repository.

dweeks pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 177b63a  Basic Benchmarks for Iceberg Spark Data Source (#105)
177b63a is described below

commit 177b63a63face221278c6536835df8685811221a
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Thu Jun 13 01:18:52 2019 +0300

    Basic Benchmarks for Iceberg Spark Data Source (#105)
    
    * Basic benchmarks for Iceberg Spark Data Source
    
    * Add benchmarks for Parquet readers/writers & restructure code
    
    * Minor style fixes
---
 build.gradle                                       |  20 +-
 gradle.properties                                  |   2 +
 .../apache/iceberg/spark/SparkBenchmarkUtil.java   |  57 ++++++
 .../SparkParquetReadersFlatDataBenchmark.java      | 215 +++++++++++++++++++++
 .../SparkParquetReadersNestedDataBenchmark.java    | 215 +++++++++++++++++++++
 .../SparkParquetWritersFlatDataBenchmark.java      | 123 ++++++++++++
 .../SparkParquetWritersNestedDataBenchmark.java    | 122 ++++++++++++
 .../org/apache/iceberg/spark/source/Action.java    |  25 +++
 .../spark/source/IcebergSourceBenchmark.java       | 182 +++++++++++++++++
 .../source/IcebergSourceFlatDataBenchmark.java     |  58 ++++++
 .../source/IcebergSourceNestedDataBenchmark.java   |  57 ++++++
 ...cebergSourceFlatParquetDataFilterBenchmark.java | 122 ++++++++++++
 .../IcebergSourceFlatParquetDataReadBenchmark.java | 153 +++++++++++++++
 ...IcebergSourceFlatParquetDataWriteBenchmark.java |  90 +++++++++
 ...bergSourceNestedParquetDataFilterBenchmark.java | 121 ++++++++++++
 ...cebergSourceNestedParquetDataReadBenchmark.java | 154 +++++++++++++++
 ...ebergSourceNestedParquetDataWriteBenchmark.java |  90 +++++++++
 17 files changed, 1805 insertions(+), 1 deletion(-)

diff --git a/build.gradle b/build.gradle
index eb5ae3d..fd0e1e3 100644
--- a/build.gradle
+++ b/build.gradle
@@ -21,7 +21,8 @@ buildscript {
   repositories {
     jcenter()
     gradlePluginPortal()
-    maven { url  "http://palantir.bintray.com/releases"; }
+    maven { url "http://palantir.bintray.com/releases"; }
+    maven { url "https://plugins.gradle.org/m2/"; }
   }
   dependencies {
     classpath 'com.github.jengelman.gradle.plugins:shadow:5.0.0'
@@ -30,6 +31,7 @@ buildscript {
     classpath 'com.palantir.baseline:gradle-baseline-java:0.55.0'
     classpath 'com.diffplug.spotless:spotless-plugin-gradle:3.14.0'
     classpath 'gradle.plugin.org.inferred:gradle-processors:2.1.0'
+    classpath 'me.champeau.gradle:jmh-gradle-plugin:0.4.8'
   }
 }
 
@@ -85,6 +87,7 @@ subprojects {
     scalaVersion = '2.11'
     sparkVersion = '2.4.0'
     caffeineVersion = "2.7.0"
+    jmhVersion = '1.21'
   }
 
   sourceCompatibility = '1.8'
@@ -162,6 +165,21 @@ configure(baselineProjects) {
   }
 }
 
+def jmhProjects = [ project("iceberg-spark") ]
+
+configure(jmhProjects) {
+  apply plugin: 'me.champeau.gradle.jmh'
+
+  jmh {
+    jmhVersion = jmhVersion
+    failOnError = true
+    forceGC = true
+    includeTests = true
+    humanOutputFile = file(jmhOutputPath)
+    include = [jmhIncludeRegex]
+  }
+}
+
 project(':iceberg-api') {
   dependencies {
     testCompile "org.apache.avro:avro:$avroVersion"
diff --git a/gradle.properties b/gradle.properties
new file mode 100644
index 0000000..f2ff982
--- /dev/null
+++ b/gradle.properties
@@ -0,0 +1,2 @@
+jmhOutputPath=build/reports/jmh/human-readable-output.txt
+jmhIncludeRegex=.*
diff --git 
a/spark/src/jmh/java/org/apache/iceberg/spark/SparkBenchmarkUtil.java 
b/spark/src/jmh/java/org/apache/iceberg/spark/SparkBenchmarkUtil.java
new file mode 100644
index 0000000..6399bc7
--- /dev/null
+++ b/spark/src/jmh/java/org/apache/iceberg/spark/SparkBenchmarkUtil.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import com.google.common.collect.Lists;
+import java.util.List;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.catalyst.expressions.Attribute;
+import org.apache.spark.sql.catalyst.expressions.AttributeReference;
+import org.apache.spark.sql.catalyst.expressions.Expression;
+import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
+import org.apache.spark.sql.types.StructType;
+import scala.collection.JavaConverters;
+
+public class SparkBenchmarkUtil {
+
+  private SparkBenchmarkUtil() {}
+
+  public static UnsafeProjection projection(Schema expectedSchema, Schema 
actualSchema) {
+    StructType struct = SparkSchemaUtil.convert(actualSchema);
+
+    List<AttributeReference> refs = 
JavaConverters.seqAsJavaListConverter(struct.toAttributes()).asJava();
+    List<Attribute> attrs = 
Lists.newArrayListWithExpectedSize(struct.fields().length);
+    List<Expression> exprs = 
Lists.newArrayListWithExpectedSize(struct.fields().length);
+
+    for (AttributeReference ref : refs) {
+      attrs.add(ref.toAttribute());
+    }
+
+    for (Types.NestedField field : expectedSchema.columns()) {
+      int indexInIterSchema = struct.fieldIndex(field.name());
+      exprs.add(refs.get(indexInIterSchema));
+    }
+
+    return UnsafeProjection.create(
+        JavaConverters.asScalaBufferConverter(exprs).asScala().toSeq(),
+        JavaConverters.asScalaBufferConverter(attrs).asScala().toSeq());
+  }
+}
diff --git 
a/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersFlatDataBenchmark.java
 
b/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersFlatDataBenchmark.java
new file mode 100644
index 0000000..92fbdee
--- /dev/null
+++ 
b/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersFlatDataBenchmark.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.data.parquet;
+
+import com.google.common.collect.Iterables;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import org.apache.avro.generic.GenericData;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.common.DynMethods;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.spark.SparkBenchmarkUtil;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.data.RandomData;
+import org.apache.iceberg.spark.data.SparkParquetReaders;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
+import org.apache.spark.sql.execution.datasources.parquet.ParquetReadSupport;
+import org.apache.spark.sql.types.StructType;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+/**
+ * A benchmark that evaluates the performance of reading Parquet data with a 
flat schema using
+ * Iceberg and Spark Parquet readers.
+ *
+ * To run this benchmark:
+ * <code>
+ *   ./gradlew :iceberg-spark:jmh
+ *       -PjmhIncludeRegex=SparkParquetReadersFlatDataBenchmark
+ *       
-PjmhOutputPath=benchmark/spark-parquet-readers-flat-data-benchmark-result.txt
+ * </code>
+ */
+@Fork(1)
+@State(Scope.Benchmark)
+@Warmup(iterations = 3)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.SingleShotTime)
+public class SparkParquetReadersFlatDataBenchmark {
+
+  private static final DynMethods.UnboundMethod APPLY_PROJECTION = 
DynMethods.builder("apply")
+      .impl(UnsafeProjection.class, InternalRow.class)
+      .build();
+  private static final Schema SCHEMA = new Schema(
+      required(1, "longCol", Types.LongType.get()),
+      required(2, "intCol", Types.IntegerType.get()),
+      required(3, "floatCol", Types.FloatType.get()),
+      optional(4, "doubleCol", Types.DoubleType.get()),
+      optional(5, "decimalCol", Types.DecimalType.of(20, 5)),
+      optional(6, "dateCol", Types.DateType.get()),
+      optional(7, "timestampCol", Types.TimestampType.withZone()),
+      optional(8, "stringCol", Types.StringType.get()));
+  private static final Schema PROJECTED_SCHEMA = new Schema(
+      required(1, "longCol", Types.LongType.get()),
+      optional(5, "decimalCol", Types.DecimalType.of(20, 5)),
+      optional(8, "stringCol", Types.StringType.get()));
+  private static final int NUM_RECORDS = 10000000;
+  private File dataFile;
+
+  @Setup
+  public void setupBenchmark() throws IOException {
+    dataFile = File.createTempFile("parquet-flat-data-benchmark", ".parquet");
+    List<GenericData.Record> records = RandomData.generateList(SCHEMA, 
NUM_RECORDS, 0L);
+    try (FileAppender<GenericData.Record> writer = 
Parquet.write(Files.localOutput(dataFile))
+        .schema(SCHEMA)
+        .named("benchmark")
+        .build()) {
+      writer.addAll(records);
+    }
+  }
+
+  @TearDown
+  public void tearDownBenchmark() {
+    if (dataFile != null) {
+      dataFile.delete();
+    }
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void readUsingIcebergReader(Blackhole blackHole) throws IOException {
+    try (CloseableIterable<InternalRow> rows = 
Parquet.read(Files.localInput(dataFile))
+        .project(SCHEMA)
+        .createReaderFunc(type -> SparkParquetReaders.buildReader(SCHEMA, 
type))
+        .build()) {
+
+      for (InternalRow row : rows) {
+        blackHole.consume(row);
+      }
+    }
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void readUsingIcebergReaderUnsafe(Blackhole blackhole) throws 
IOException {
+    try (CloseableIterable<InternalRow> rows = 
Parquet.read(Files.localInput(dataFile))
+        .project(SCHEMA)
+        .createReaderFunc(type -> SparkParquetReaders.buildReader(SCHEMA, 
type))
+        .build()) {
+
+      Iterable<InternalRow> unsafeRows = Iterables.transform(
+          rows,
+          APPLY_PROJECTION.bind(SparkBenchmarkUtil.projection(SCHEMA, 
SCHEMA))::invoke);
+
+      for (InternalRow row : unsafeRows) {
+        blackhole.consume(row);
+      }
+    }
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void readUsingSparkReader(Blackhole blackhole) throws IOException {
+    StructType sparkSchema = SparkSchemaUtil.convert(SCHEMA);
+    try (CloseableIterable<InternalRow> rows = 
Parquet.read(Files.localInput(dataFile))
+        .project(SCHEMA)
+        .readSupport(new ParquetReadSupport())
+        .set("org.apache.spark.sql.parquet.row.requested_schema", 
sparkSchema.json())
+        .set("spark.sql.parquet.binaryAsString", "false")
+        .set("spark.sql.parquet.int96AsTimestamp", "false")
+        .callInit()
+        .build()) {
+
+      for (InternalRow row : rows) {
+        blackhole.consume(row);
+      }
+    }
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void readWithProjectionUsingIcebergReader(Blackhole blackhole) throws 
IOException {
+    try (CloseableIterable<InternalRow> rows = 
Parquet.read(Files.localInput(dataFile))
+        .project(PROJECTED_SCHEMA)
+        .createReaderFunc(type -> 
SparkParquetReaders.buildReader(PROJECTED_SCHEMA, type))
+        .build()) {
+
+      for (InternalRow row : rows) {
+        blackhole.consume(row);
+      }
+    }
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void readWithProjectionUsingIcebergReaderUnsafe(Blackhole blackhole) 
throws IOException {
+    try (CloseableIterable<InternalRow> rows = 
Parquet.read(Files.localInput(dataFile))
+        .project(PROJECTED_SCHEMA)
+        .createReaderFunc(type -> 
SparkParquetReaders.buildReader(PROJECTED_SCHEMA, type))
+        .build()) {
+
+      Iterable<InternalRow> unsafeRows = Iterables.transform(
+          rows,
+          
APPLY_PROJECTION.bind(SparkBenchmarkUtil.projection(PROJECTED_SCHEMA, 
PROJECTED_SCHEMA))::invoke);
+
+      for (InternalRow row : unsafeRows) {
+        blackhole.consume(row);
+      }
+    }
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void readWithProjectionUsingSparkReader(Blackhole blackhole) throws 
IOException {
+    StructType sparkSchema = SparkSchemaUtil.convert(PROJECTED_SCHEMA);
+    try (CloseableIterable<InternalRow> rows = 
Parquet.read(Files.localInput(dataFile))
+        .project(PROJECTED_SCHEMA)
+        .readSupport(new ParquetReadSupport())
+        .set("org.apache.spark.sql.parquet.row.requested_schema", 
sparkSchema.json())
+        .set("spark.sql.parquet.binaryAsString", "false")
+        .set("spark.sql.parquet.int96AsTimestamp", "false")
+        .callInit()
+        .build()) {
+
+      for (InternalRow row : rows) {
+        blackhole.consume(row);
+      }
+    }
+  }
+}
diff --git 
a/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersNestedDataBenchmark.java
 
b/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersNestedDataBenchmark.java
new file mode 100644
index 0000000..199d884
--- /dev/null
+++ 
b/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersNestedDataBenchmark.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.data.parquet;
+
+import com.google.common.collect.Iterables;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import org.apache.avro.generic.GenericData;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.common.DynMethods;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.spark.SparkBenchmarkUtil;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.data.RandomData;
+import org.apache.iceberg.spark.data.SparkParquetReaders;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
+import org.apache.spark.sql.execution.datasources.parquet.ParquetReadSupport;
+import org.apache.spark.sql.types.StructType;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+/**
+ * A benchmark that evaluates the performance of reading nested Parquet data 
using
+ * Iceberg and Spark Parquet readers.
+ *
+ * To run this benchmark:
+ * <code>
+ *   ./gradlew :iceberg-spark:jmh
+ *       -PjmhIncludeRegex=SparkParquetReadersNestedDataBenchmark
+ *       
-PjmhOutputPath=benchmark/spark-parquet-readers-nested-data-benchmark-result.txt
+ * </code>
+ */
+@Fork(1)
+@State(Scope.Benchmark)
+@Warmup(iterations = 3)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.SingleShotTime)
+public class SparkParquetReadersNestedDataBenchmark {
+
+  private static final DynMethods.UnboundMethod APPLY_PROJECTION = 
DynMethods.builder("apply")
+      .impl(UnsafeProjection.class, InternalRow.class)
+      .build();
+  private static final Schema SCHEMA = new Schema(
+      required(0, "id", Types.LongType.get()),
+      optional(4, "nested", Types.StructType.of(
+          required(1, "col1", Types.StringType.get()),
+          required(2, "col2", Types.DoubleType.get()),
+          required(3, "col3", Types.LongType.get())
+      ))
+  );
+  private static final Schema PROJECTED_SCHEMA = new Schema(
+      optional(4, "nested", Types.StructType.of(
+          required(1, "col1", Types.StringType.get())
+      ))
+  );
+  private static final int NUM_RECORDS = 10000000;
+  private File dataFile;
+
+  @Setup
+  public void setupBenchmark() throws IOException {
+    dataFile = File.createTempFile("parquet-nested-data-benchmark", 
".parquet");
+    List<GenericData.Record> records = RandomData.generateList(SCHEMA, 
NUM_RECORDS, 0L);
+    try (FileAppender<GenericData.Record> writer = 
Parquet.write(Files.localOutput(dataFile))
+        .schema(SCHEMA)
+        .named("benchmark")
+        .build()) {
+      writer.addAll(records);
+    }
+  }
+
+  @TearDown
+  public void tearDownBenchmark() {
+    if (dataFile != null) {
+      dataFile.delete();
+    }
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void readUsingIcebergReader(Blackhole blackhole) throws IOException {
+    try (CloseableIterable<InternalRow> rows = 
Parquet.read(Files.localInput(dataFile))
+        .project(SCHEMA)
+        .createReaderFunc(type -> SparkParquetReaders.buildReader(SCHEMA, 
type))
+        .build()) {
+
+      for (InternalRow row : rows) {
+        blackhole.consume(row);
+      }
+    }
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void readUsingIcebergReaderUnsafe(Blackhole blackhole) throws 
IOException {
+    try (CloseableIterable<InternalRow> rows = 
Parquet.read(Files.localInput(dataFile))
+        .project(SCHEMA)
+        .createReaderFunc(type -> SparkParquetReaders.buildReader(SCHEMA, 
type))
+        .build()) {
+
+      Iterable<InternalRow> unsafeRows = Iterables.transform(
+          rows,
+          APPLY_PROJECTION.bind(SparkBenchmarkUtil.projection(SCHEMA, 
SCHEMA))::invoke);
+
+      for (InternalRow row : unsafeRows) {
+        blackhole.consume(row);
+      }
+    }
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void readUsingSparkReader(Blackhole blackhole) throws IOException {
+    StructType sparkSchema = SparkSchemaUtil.convert(SCHEMA);
+    try (CloseableIterable<InternalRow> rows = 
Parquet.read(Files.localInput(dataFile))
+        .project(SCHEMA)
+        .readSupport(new ParquetReadSupport())
+        .set("org.apache.spark.sql.parquet.row.requested_schema", 
sparkSchema.json())
+        .set("spark.sql.parquet.binaryAsString", "false")
+        .set("spark.sql.parquet.int96AsTimestamp", "false")
+        .callInit()
+        .build()) {
+
+      for (InternalRow row : rows) {
+        blackhole.consume(row);
+      }
+    }
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void readWithProjectionUsingIcebergReader(Blackhole blackhole) throws 
IOException {
+    try (CloseableIterable<InternalRow> rows = 
Parquet.read(Files.localInput(dataFile))
+        .project(PROJECTED_SCHEMA)
+        .createReaderFunc(type -> 
SparkParquetReaders.buildReader(PROJECTED_SCHEMA, type))
+        .build()) {
+
+      for (InternalRow row : rows) {
+        blackhole.consume(row);
+      }
+    }
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void readWithProjectionUsingIcebergReaderUnsafe(Blackhole blackhole) 
throws IOException {
+    try (CloseableIterable<InternalRow> rows = 
Parquet.read(Files.localInput(dataFile))
+        .project(PROJECTED_SCHEMA)
+        .createReaderFunc(type -> 
SparkParquetReaders.buildReader(PROJECTED_SCHEMA, type))
+        .build()) {
+
+      Iterable<InternalRow> unsafeRows = Iterables.transform(
+          rows,
+          
APPLY_PROJECTION.bind(SparkBenchmarkUtil.projection(PROJECTED_SCHEMA, 
PROJECTED_SCHEMA))::invoke);
+
+      for (InternalRow row : unsafeRows) {
+        blackhole.consume(row);
+      }
+    }
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void readWithProjectionUsingSparkReader(Blackhole blackhole) throws 
IOException {
+    StructType sparkSchema = SparkSchemaUtil.convert(PROJECTED_SCHEMA);
+    try (CloseableIterable<InternalRow> rows = 
Parquet.read(Files.localInput(dataFile))
+        .project(PROJECTED_SCHEMA)
+        .readSupport(new ParquetReadSupport())
+        .set("org.apache.spark.sql.parquet.row.requested_schema", 
sparkSchema.json())
+        .set("spark.sql.parquet.binaryAsString", "false")
+        .set("spark.sql.parquet.int96AsTimestamp", "false")
+        .callInit()
+        .build()) {
+
+      for (InternalRow row : rows) {
+        blackhole.consume(row);
+      }
+    }
+  }
+}
diff --git 
a/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersFlatDataBenchmark.java
 
b/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersFlatDataBenchmark.java
new file mode 100644
index 0000000..8cb5b07
--- /dev/null
+++ 
b/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersFlatDataBenchmark.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.data.parquet;
+
+import java.io.File;
+import java.io.IOException;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.data.RandomData;
+import org.apache.iceberg.spark.data.SparkParquetWriters;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport;
+import org.apache.spark.sql.types.StructType;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+/**
+ * A benchmark that evaluates the performance of writing Parquet data with a 
flat schema using
+ * Iceberg and Spark Parquet writers.
+ *
+ * To run this benchmark:
+ * <code>
+ *   ./gradlew :iceberg-spark:jmh
+ *       -PjmhIncludeRegex=SparkParquetWritersFlatDataBenchmark
+ *       
-PjmhOutputPath=benchmark/spark-parquet-writers-flat-data-benchmark-result.txt
+ * </code>
+ */
+@Fork(1)
+@State(Scope.Benchmark)
+@Warmup(iterations = 3)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.SingleShotTime)
+public class SparkParquetWritersFlatDataBenchmark {
+
+  private static final Schema SCHEMA = new Schema(
+      required(1, "longCol", Types.LongType.get()),
+      required(2, "intCol", Types.IntegerType.get()),
+      required(3, "floatCol", Types.FloatType.get()),
+      optional(4, "doubleCol", Types.DoubleType.get()),
+      optional(5, "decimalCol", Types.DecimalType.of(20, 5)),
+      optional(6, "dateCol", Types.DateType.get()),
+      optional(7, "timestampCol", Types.TimestampType.withZone()),
+      optional(8, "stringCol", Types.StringType.get()));
+  private static final int NUM_RECORDS = 1000000;
+  private Iterable<InternalRow> rows;
+  private File dataFile;
+
+  @Setup
+  public void setupBenchmark() throws IOException {
+    rows = RandomData.generateSpark(SCHEMA, NUM_RECORDS, 0L);
+    dataFile = File.createTempFile("parquet-flat-data-benchmark", ".parquet");
+  }
+
+  @TearDown
+  public void tearDownBenchmark() {
+    if (dataFile != null) {
+      dataFile.delete();
+    }
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void writeUsingIcebergWriter() throws IOException {
+    try (FileAppender<InternalRow> writer = 
Parquet.write(Files.localOutput(dataFile))
+        .createWriterFunc(msgType -> SparkParquetWriters.buildWriter(SCHEMA, 
msgType))
+        .schema(SCHEMA)
+        .build()) {
+
+      writer.addAll(rows);
+    }
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void writeUsingSparkWriter() throws IOException {
+    StructType sparkSchema = SparkSchemaUtil.convert(SCHEMA);
+    try (FileAppender<InternalRow> writer = 
Parquet.write(Files.localOutput(dataFile))
+        .writeSupport(new ParquetWriteSupport())
+        .set("org.apache.spark.sql.parquet.row.attributes", sparkSchema.json())
+        .set("spark.sql.parquet.writeLegacyFormat", "false")
+        .set("spark.sql.parquet.binaryAsString", "false")
+        .set("spark.sql.parquet.int96AsTimestamp", "false")
+        .set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")
+        .schema(SCHEMA)
+        .build()) {
+
+      writer.addAll(rows);
+    }
+  }
+}
diff --git 
a/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersNestedDataBenchmark.java
 
b/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersNestedDataBenchmark.java
new file mode 100644
index 0000000..dd395f5
--- /dev/null
+++ 
b/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersNestedDataBenchmark.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.data.parquet;
+
+import java.io.File;
+import java.io.IOException;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.data.RandomData;
+import org.apache.iceberg.spark.data.SparkParquetWriters;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport;
+import org.apache.spark.sql.types.StructType;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+/**
+ * A benchmark that evaluates the performance of writing nested Parquet data 
using
+ * Iceberg and Spark Parquet writers.
+ *
+ * To run this benchmark:
+ * <code>
+ *   ./gradlew :iceberg-spark:jmh
+ *       -PjmhIncludeRegex=SparkParquetWritersNestedDataBenchmark
+ *       
-PjmhOutputPath=benchmark/spark-parquet-writers-nested-data-benchmark-result.txt
+ * </code>
+ */
+@Fork(1)
+@State(Scope.Benchmark)
+@Warmup(iterations = 3)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.SingleShotTime)
+public class SparkParquetWritersNestedDataBenchmark {
+
+  private static final Schema SCHEMA = new Schema(
+      required(0, "id", Types.LongType.get()),
+      optional(4, "nested", Types.StructType.of(
+          required(1, "col1", Types.StringType.get()),
+          required(2, "col2", Types.DoubleType.get()),
+          required(3, "col3", Types.LongType.get())
+      ))
+  );
+  private static final int NUM_RECORDS = 1000000;
+  private Iterable<InternalRow> rows;
+  private File dataFile;
+
+  @Setup
+  public void setupBenchmark() throws IOException {
+    rows = RandomData.generateSpark(SCHEMA, NUM_RECORDS, 0L);
+    dataFile = File.createTempFile("parquet-nested-data-benchmark", 
".parquet");
+  }
+
+  @TearDown
+  public void tearDownBenchmark() {
+    if (dataFile != null) {
+      dataFile.delete();
+    }
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void writeUsingIcebergWriter() throws IOException {
+    try (FileAppender<InternalRow> writer = 
Parquet.write(Files.localOutput(dataFile))
+        .createWriterFunc(msgType -> SparkParquetWriters.buildWriter(SCHEMA, 
msgType))
+        .schema(SCHEMA)
+        .build()) {
+
+      writer.addAll(rows);
+    }
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void writeUsingSparkWriter() throws IOException {
+    StructType sparkSchema = SparkSchemaUtil.convert(SCHEMA);
+    try (FileAppender<InternalRow> writer = 
Parquet.write(Files.localOutput(dataFile))
+        .writeSupport(new ParquetWriteSupport())
+        .set("org.apache.spark.sql.parquet.row.attributes", sparkSchema.json())
+        .set("spark.sql.parquet.writeLegacyFormat", "false")
+        .set("spark.sql.parquet.binaryAsString", "false")
+        .set("spark.sql.parquet.int96AsTimestamp", "false")
+        .set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")
+        .schema(SCHEMA)
+        .build()) {
+
+      writer.addAll(rows);
+    }
+  }
+}
diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/source/Action.java 
b/spark/src/jmh/java/org/apache/iceberg/spark/source/Action.java
new file mode 100644
index 0000000..1820a80
--- /dev/null
+++ b/spark/src/jmh/java/org/apache/iceberg/spark/source/Action.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+@FunctionalInterface
+public interface Action {
+  void invoke();
+}
diff --git 
a/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java
 
b/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java
new file mode 100644
index 0000000..1648ee7
--- /dev/null
+++ 
b/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.UpdateProperties;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.internal.SQLConf;
+import org.apache.spark.sql.types.StructType;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+import static org.apache.iceberg.TableProperties.WRITE_NEW_DATA_LOCATION;
+
+@Fork(1)
+@State(Scope.Benchmark)
+@Warmup(iterations = 3)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.SingleShotTime)
+public abstract class IcebergSourceBenchmark {
+
+  private final Configuration hadoopConf = initHadoopConf();
+  private final Table table = initTable();
+  private SparkSession spark;
+
+  protected abstract Configuration initHadoopConf();
+
+  protected final Configuration hadoopConf() {
+    return hadoopConf;
+  }
+
+  protected abstract Table initTable();
+
+  protected final Table table() {
+    return table;
+  }
+
+  protected final SparkSession spark() {
+    return spark;
+  }
+
+  protected String newTableLocation() {
+    String tmpDir = hadoopConf.get("hadoop.tmp.dir");
+    Path tablePath = new Path(tmpDir, "spark-iceberg-table-" + 
UUID.randomUUID());
+    return tablePath.toString();
+  }
+
+  protected String dataLocation() {
+    Map<String, String> properties = table.properties();
+    return properties.getOrDefault(WRITE_NEW_DATA_LOCATION, 
String.format("%s/data", table.location()));
+  }
+
+  protected void cleanupFiles() throws IOException {
+    try (FileSystem fileSystem = FileSystem.get(hadoopConf)) {
+      Path dataPath = new Path(dataLocation());
+      fileSystem.delete(dataPath, true);
+      Path tablePath = new Path(table.location());
+      fileSystem.delete(tablePath, true);
+    }
+  }
+
+  protected void setupSpark() {
+    spark = SparkSession.builder()
+        .config("spark.ui.enabled", false)
+        .master("local")
+        .getOrCreate();
+    Configuration sparkHadoopConf = spark.sparkContext().hadoopConfiguration();
+    hadoopConf.forEach(entry -> sparkHadoopConf.set(entry.getKey(), 
entry.getValue()));
+  }
+
+  protected void tearDownSpark() {
+    spark.stop();
+  }
+
+  protected void materialize(Dataset<?> ds) {
+    ds.queryExecution().toRdd().toJavaRDD().foreach(record -> { });
+  }
+
+  protected void appendAsFile(Dataset<Row> ds) {
+    // ensure the schema is precise (including nullability)
+    StructType sparkSchema = SparkSchemaUtil.convert(table.schema());
+    spark.createDataFrame(ds.rdd(), sparkSchema)
+        .coalesce(1)
+        .write()
+        .format("iceberg")
+        .mode(SaveMode.Append)
+        .save(table.location());
+  }
+
+  protected void withSQLConf(Map<String, String> conf, Action action) {
+    SQLConf sqlConf = SQLConf.get();
+
+    Map<String, String> currentConfValues = new HashMap<>();
+    conf.keySet().forEach(confKey -> {
+      if (sqlConf.contains(confKey)) {
+        String currentConfValue = sqlConf.getConfString(confKey);
+        currentConfValues.put(confKey, currentConfValue);
+      }
+    });
+
+    conf.forEach((confKey, confValue) -> {
+      if (SQLConf.staticConfKeys().contains(confKey)) {
+        throw new RuntimeException("Cannot modify the value of a static 
config: " + confKey);
+      }
+      sqlConf.setConfString(confKey, confValue);
+    });
+
+    try {
+      action.invoke();
+    } finally {
+      conf.forEach((confKey, confValue) -> {
+        if (currentConfValues.containsKey(confKey)) {
+          sqlConf.setConfString(confKey, currentConfValues.get(confKey));
+        } else {
+          sqlConf.unsetConf(confKey);
+        }
+      });
+    }
+  }
+
+  protected void withTableProperties(Map<String, String> props, Action action) 
{
+    Map<String, String> tableProps = table.properties();
+    Map<String, String> currentPropValues = new HashMap<>();
+    props.keySet().forEach(propKey -> {
+      if (tableProps.containsKey(propKey)) {
+        String currentPropValue = tableProps.get(propKey);
+        currentPropValues.put(propKey, currentPropValue);
+      }
+    });
+
+    UpdateProperties updateProperties = table.updateProperties();
+    props.forEach(updateProperties::set);
+    updateProperties.commit();
+
+    try {
+      action.invoke();
+    } finally {
+      UpdateProperties restoreProperties = table.updateProperties();
+      props.forEach((propKey, propValue) -> {
+        if (currentPropValues.containsKey(propKey)) {
+          restoreProperties.set(propKey, currentPropValues.get(propKey));
+        } else {
+          restoreProperties.remove(propKey);
+        }
+      });
+      restoreProperties.commit();
+    }
+  }
+}
diff --git 
a/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceFlatDataBenchmark.java
 
b/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceFlatDataBenchmark.java
new file mode 100644
index 0000000..72346a7
--- /dev/null
+++ 
b/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceFlatDataBenchmark.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.ConfigProperties;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.types.Types;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+public abstract class IcebergSourceFlatDataBenchmark extends 
IcebergSourceBenchmark {
+
+  @Override
+  protected Configuration initHadoopConf() {
+    Configuration conf = new Configuration();
+    conf.set(ConfigProperties.COMPRESS_METADATA, "true");
+    return conf;
+  }
+
+  @Override
+  protected final Table initTable() {
+    Schema schema = new Schema(
+        required(1, "longCol", Types.LongType.get()),
+        required(2, "intCol", Types.IntegerType.get()),
+        required(3, "floatCol", Types.FloatType.get()),
+        optional(4, "doubleCol", Types.DoubleType.get()),
+        optional(5, "decimalCol", Types.DecimalType.of(20, 5)),
+        optional(6, "dateCol", Types.DateType.get()),
+        optional(7, "timestampCol", Types.TimestampType.withZone()),
+        optional(8, "stringCol", Types.StringType.get()));
+    PartitionSpec partitionSpec = PartitionSpec.unpartitioned();
+    HadoopTables tables = new HadoopTables(hadoopConf());
+    return tables.create(schema, partitionSpec, Maps.newHashMap(), 
newTableLocation());
+  }
+}
diff --git 
a/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceNestedDataBenchmark.java
 
b/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceNestedDataBenchmark.java
new file mode 100644
index 0000000..bc54257
--- /dev/null
+++ 
b/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceNestedDataBenchmark.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.ConfigProperties;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.types.Types;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+public abstract class IcebergSourceNestedDataBenchmark extends 
IcebergSourceBenchmark {
+
+  @Override
+  protected Configuration initHadoopConf() {
+    Configuration conf = new Configuration();
+    conf.set(ConfigProperties.COMPRESS_METADATA, "true");
+    return conf;
+  }
+
+  @Override
+  protected final Table initTable() {
+    Schema schema = new Schema(
+        required(0, "id", Types.LongType.get()),
+        optional(4, "nested", Types.StructType.of(
+            required(1, "col1", Types.StringType.get()),
+            required(2, "col2", Types.DoubleType.get()),
+            required(3, "col3", Types.LongType.get())
+        ))
+    );
+    PartitionSpec partitionSpec = PartitionSpec.unpartitioned();
+    HadoopTables tables = new HadoopTables(hadoopConf());
+    return tables.create(schema, partitionSpec, Maps.newHashMap(), 
newTableLocation());
+  }
+}
diff --git 
a/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataFilterBenchmark.java
 
b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataFilterBenchmark.java
new file mode 100644
index 0000000..1c91645
--- /dev/null
+++ 
b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataFilterBenchmark.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source.parquet;
+
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.spark.source.IcebergSourceFlatDataBenchmark;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.internal.SQLConf;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST;
+import static org.apache.spark.sql.functions.current_date;
+import static org.apache.spark.sql.functions.date_add;
+import static org.apache.spark.sql.functions.expr;
+
+/**
+ * A benchmark that evaluates the file skipping capabilities in the Spark data 
source for Iceberg.
+ *
+ * This class uses a dataset with a flat schema, where the records are 
clustered according to the
+ * column used in the filter predicate.
+ *
+ * The performance is compared to the built-in file source in Spark.
+ *
+ * To run this benchmark:
+ * <code>
+ *   ./gradlew :iceberg-spark:jmh
+ *       -PjmhIncludeRegex=IcebergSourceFlatParquetDataFilterBenchmark
+ *       
-PjmhOutputPath=benchmark/iceberg-source-flat-parquet-data-filter-benchmark-result.txt
+ * </code>
+ */
+public class IcebergSourceFlatParquetDataFilterBenchmark extends 
IcebergSourceFlatDataBenchmark {
+
+  private static final String FILTER_COND = "dateCol == 
date_add(current_date(), 1)";
+  private static final int NUM_FILES = 500;
+  private static final int NUM_ROWS = 10000;
+
+  @Setup
+  public void setupBenchmark() {
+    setupSpark();
+    appendData();
+  }
+
+  @TearDown
+  public void tearDownBenchmark() throws IOException {
+    tearDownSpark();
+    cleanupFiles();
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void readWithFilterIceberg() {
+    Map<String, String> tableProperties = Maps.newHashMap();
+    tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 
1024));
+    withTableProperties(tableProperties, () -> {
+      String tableLocation = table().location();
+      Dataset<Row> df = 
spark().read().format("iceberg").load(tableLocation).filter(FILTER_COND);
+      materialize(df);
+    });
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void readWithFilterFileSourceVectorized() {
+    Map<String, String> conf = Maps.newHashMap();
+    conf.put(SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key(), "true");
+    conf.put(SQLConf.FILES_OPEN_COST_IN_BYTES().key(), Integer.toString(128 * 
1024 * 1024));
+    withSQLConf(conf, () -> {
+      Dataset<Row> df = 
spark().read().parquet(dataLocation()).filter(FILTER_COND);
+      materialize(df);
+    });
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void readWithFilterFileSourceNonVectorized() {
+    Map<String, String> conf = Maps.newHashMap();
+    conf.put(SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key(), "false");
+    conf.put(SQLConf.FILES_OPEN_COST_IN_BYTES().key(), Integer.toString(128 * 
1024 * 1024));
+    withSQLConf(conf, () -> {
+      Dataset<Row> df = 
spark().read().parquet(dataLocation()).filter(FILTER_COND);
+      materialize(df);
+    });
+  }
+
+  private void appendData() {
+    for (int fileNum = 1; fileNum < NUM_FILES; fileNum++) {
+      Dataset<Row> df = spark().range(NUM_ROWS)
+          .withColumnRenamed("id", "longCol")
+          .withColumn("intCol", expr("CAST(longCol AS INT)"))
+          .withColumn("floatCol", expr("CAST(longCol AS FLOAT)"))
+          .withColumn("doubleCol", expr("CAST(longCol AS DOUBLE)"))
+          .withColumn("decimalCol", expr("CAST(longCol AS DECIMAL(20, 5))"))
+          .withColumn("dateCol", date_add(current_date(), fileNum))
+          .withColumn("timestampCol", expr("TO_TIMESTAMP(dateCol)"))
+          .withColumn("stringCol", expr("CAST(dateCol AS STRING)"));
+      appendAsFile(df);
+    }
+  }
+}
diff --git 
a/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataReadBenchmark.java
 
b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataReadBenchmark.java
new file mode 100644
index 0000000..37a6336
--- /dev/null
+++ 
b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataReadBenchmark.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source.parquet;
+
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.spark.source.IcebergSourceFlatDataBenchmark;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.internal.SQLConf;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST;
+import static org.apache.spark.sql.functions.current_date;
+import static org.apache.spark.sql.functions.date_add;
+import static org.apache.spark.sql.functions.expr;
+
+/**
+ * A benchmark that evaluates the performance of reading Parquet data with a 
flat schema
+ * using Iceberg and the built-in file source in Spark.
+ *
+ * To run this benchmark:
+ * <code>
+ *   ./gradlew :iceberg-spark:jmh
+ *       -PjmhIncludeRegex=IcebergSourceFlatParquetDataReadBenchmark
+ *       
-PjmhOutputPath=benchmark/iceberg-source-flat-parquet-data-read-benchmark-result.txt
+ * </code>
+ */
+public class IcebergSourceFlatParquetDataReadBenchmark extends 
IcebergSourceFlatDataBenchmark {
+
+  private static final int NUM_FILES = 10;
+  private static final int NUM_ROWS = 1000000;
+
+  @Setup
+  public void setupBenchmark() {
+    setupSpark();
+    appendData();
+  }
+
+  @TearDown
+  public void tearDownBenchmark() throws IOException {
+    tearDownSpark();
+    cleanupFiles();
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void readIceberg() {
+    Map<String, String> tableProperties = Maps.newHashMap();
+    tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 
1024));
+    withTableProperties(tableProperties, () -> {
+      String tableLocation = table().location();
+      Dataset<Row> df = spark().read().format("iceberg").load(tableLocation);
+      materialize(df);
+    });
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void readFileSourceVectorized() {
+    Map<String, String> conf = Maps.newHashMap();
+    conf.put(SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key(), "true");
+    conf.put(SQLConf.FILES_OPEN_COST_IN_BYTES().key(), Integer.toString(128 * 
1024 * 1024));
+    withSQLConf(conf, () -> {
+      Dataset<Row> df = spark().read().parquet(dataLocation());
+      materialize(df);
+    });
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void readFileSourceNonVectorized() {
+    Map<String, String> conf = Maps.newHashMap();
+    conf.put(SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key(), "false");
+    conf.put(SQLConf.FILES_OPEN_COST_IN_BYTES().key(), Integer.toString(128 * 
1024 * 1024));
+    withSQLConf(conf, () -> {
+      Dataset<Row> df = spark().read().parquet(dataLocation());
+      materialize(df);
+    });
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void readWithProjectionIceberg() {
+    Map<String, String> tableProperties = Maps.newHashMap();
+    tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 
1024));
+    withTableProperties(tableProperties, () -> {
+      String tableLocation = table().location();
+      Dataset<Row> df = 
spark().read().format("iceberg").load(tableLocation).select("longCol");
+      materialize(df);
+    });
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void readWithProjectionFileSourceVectorized() {
+    Map<String, String> conf = Maps.newHashMap();
+    conf.put(SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key(), "true");
+    conf.put(SQLConf.FILES_OPEN_COST_IN_BYTES().key(), Integer.toString(128 * 
1024 * 1024));
+    withSQLConf(conf, () -> {
+      Dataset<Row> df = 
spark().read().parquet(dataLocation()).select("longCol");
+      materialize(df);
+    });
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void readWithProjectionFileSourceNonVectorized() {
+    Map<String, String> conf = Maps.newHashMap();
+    conf.put(SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key(), "false");
+    conf.put(SQLConf.FILES_OPEN_COST_IN_BYTES().key(), Integer.toString(128 * 
1024 * 1024));
+    withSQLConf(conf, () -> {
+      Dataset<Row> df = 
spark().read().parquet(dataLocation()).select("longCol");
+      materialize(df);
+    });
+  }
+
+  private void appendData() {
+    for (int fileNum = 1; fileNum <= NUM_FILES; fileNum++) {
+      Dataset<Row> df = spark().range(NUM_ROWS)
+          .withColumnRenamed("id", "longCol")
+          .withColumn("intCol", expr("CAST(longCol AS INT)"))
+          .withColumn("floatCol", expr("CAST(longCol AS FLOAT)"))
+          .withColumn("doubleCol", expr("CAST(longCol AS DOUBLE)"))
+          .withColumn("decimalCol", expr("CAST(longCol AS DECIMAL(20, 5))"))
+          .withColumn("dateCol", date_add(current_date(), fileNum))
+          .withColumn("timestampCol", expr("TO_TIMESTAMP(dateCol)"))
+          .withColumn("stringCol", expr("CAST(dateCol AS STRING)"));
+      appendAsFile(df);
+    }
+  }
+}
diff --git 
a/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataWriteBenchmark.java
 
b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataWriteBenchmark.java
new file mode 100644
index 0000000..ab62f53
--- /dev/null
+++ 
b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataWriteBenchmark.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source.parquet;
+
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.spark.source.IcebergSourceFlatDataBenchmark;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.internal.SQLConf;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+
+import static org.apache.spark.sql.functions.expr;
+
+/**
+ * A benchmark that evaluates the performance of writing Parquet data with a 
flat schema
+ * using Iceberg and the built-in file source in Spark.
+ *
+ * To run this benchmark:
+ * <code>
+ *   ./gradlew :iceberg-spark:jmh
+ *       -PjmhIncludeRegex=IcebergSourceFlatParquetDataWriteBenchmark
+ *       
-PjmhOutputPath=benchmark/iceberg-source-flat-parquet-data-write-benchmark-result.txt
+ * </code>
+ */
+public class IcebergSourceFlatParquetDataWriteBenchmark extends 
IcebergSourceFlatDataBenchmark {
+
+  private static final int NUM_ROWS = 5000000;
+
+  @Setup
+  public void setupBenchmark() {
+    setupSpark();
+  }
+
+  @TearDown
+  public void tearDownBenchmark() throws IOException {
+    tearDownSpark();
+    cleanupFiles();
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void writeIceberg() {
+    String tableLocation = table().location();
+    
benchmarkData().write().format("iceberg").mode(SaveMode.Append).save(tableLocation);
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void writeFileSource() {
+    Map<String, String> conf = Maps.newHashMap();
+    conf.put(SQLConf.PARQUET_COMPRESSION().key(), "gzip");
+    withSQLConf(conf, () -> 
benchmarkData().write().mode(SaveMode.Append).parquet(dataLocation()));
+  }
+
+  private Dataset<Row> benchmarkData() {
+    return spark().range(NUM_ROWS)
+        .withColumnRenamed("id", "longCol")
+        .withColumn("intCol", expr("CAST(longCol AS INT)"))
+        .withColumn("floatCol", expr("CAST(longCol AS FLOAT)"))
+        .withColumn("doubleCol", expr("CAST(longCol AS DOUBLE)"))
+        .withColumn("decimalCol", expr("CAST(longCol AS DECIMAL(20, 5))"))
+        .withColumn("dateCol", expr("DATE_ADD(CURRENT_DATE(), (longCol % 
20))"))
+        .withColumn("timestampCol", expr("TO_TIMESTAMP(dateCol)"))
+        .withColumn("stringCol", expr("CAST(dateCol AS STRING)"))
+        .coalesce(1);
+  }
+}
diff --git 
a/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceNestedParquetDataFilterBenchmark.java
 
b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceNestedParquetDataFilterBenchmark.java
new file mode 100644
index 0000000..c12f3ee
--- /dev/null
+++ 
b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceNestedParquetDataFilterBenchmark.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source.parquet;
+
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.spark.source.IcebergSourceNestedDataBenchmark;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.internal.SQLConf;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST;
+import static org.apache.spark.sql.functions.expr;
+import static org.apache.spark.sql.functions.lit;
+import static org.apache.spark.sql.functions.struct;
+
+/**
+ * A benchmark that evaluates the file skipping capabilities in the Spark data 
source for Iceberg.
+ *
+ * This class uses a dataset with nested data, where the records are clustered 
according to the
+ * column used in the filter predicate.
+ *
+ * The performance is compared to the built-in file source in Spark.
+ *
+ * To run this benchmark:
+ * <code>
+ *   ./gradlew :iceberg-spark:jmh
+ *       -PjmhIncludeRegex=IcebergSourceNestedParquetDataFilterBenchmark
+ *       
-PjmhOutputPath=benchmark/iceberg-source-nested-parquet-data-filter-benchmark-result.txt
+ * </code>
+ */
+public class IcebergSourceNestedParquetDataFilterBenchmark extends 
IcebergSourceNestedDataBenchmark {
+
+  private static final String FILTER_COND = "nested.col3 == 0";
+  private static final int NUM_FILES = 500;
+  private static final int NUM_ROWS = 10000;
+
+  @Setup
+  public void setupBenchmark() {
+    setupSpark();
+    appendData();
+  }
+
+  @TearDown
+  public void tearDownBenchmark() throws IOException {
+    tearDownSpark();
+    cleanupFiles();
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void readWithFilterIceberg() {
+    Map<String, String> tableProperties = Maps.newHashMap();
+    tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 
1024));
+    withTableProperties(tableProperties, () -> {
+      String tableLocation = table().location();
+      Dataset<Row> df = 
spark().read().format("iceberg").load(tableLocation).filter(FILTER_COND);
+      materialize(df);
+    });
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void readWithFilterFileSourceVectorized() {
+    Map<String, String> conf = Maps.newHashMap();
+    conf.put(SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key(), "true");
+    conf.put(SQLConf.FILES_OPEN_COST_IN_BYTES().key(), Integer.toString(128 * 
1024 * 1024));
+    withSQLConf(conf, () -> {
+      Dataset<Row> df = 
spark().read().parquet(dataLocation()).filter(FILTER_COND);
+      materialize(df);
+    });
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void readWithFilterFileSourceNonVectorized() {
+    Map<String, String> conf = Maps.newHashMap();
+    conf.put(SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key(), "false");
+    conf.put(SQLConf.FILES_OPEN_COST_IN_BYTES().key(), Integer.toString(128 * 
1024 * 1024));
+    withSQLConf(conf, () -> {
+      Dataset<Row> df = 
spark().read().parquet(dataLocation()).filter(FILTER_COND);
+      materialize(df);
+    });
+  }
+
+  private void appendData() {
+    for (int fileNum = 1; fileNum <= NUM_FILES; fileNum++) {
+      Dataset<Row> df = spark().range(NUM_ROWS)
+          .withColumn(
+              "nested",
+              struct(
+                  expr("CAST(id AS string) AS col1"),
+                  expr("CAST(id AS double) AS col2"),
+                  lit(fileNum).cast("long").as("col3")
+              ));
+      appendAsFile(df);
+    }
+  }
+}
diff --git 
a/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceNestedParquetDataReadBenchmark.java
 
b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceNestedParquetDataReadBenchmark.java
new file mode 100644
index 0000000..acba2e5
--- /dev/null
+++ 
b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceNestedParquetDataReadBenchmark.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source.parquet;
+
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.spark.source.IcebergSourceNestedDataBenchmark;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.internal.SQLConf;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST;
+import static org.apache.spark.sql.functions.expr;
+import static org.apache.spark.sql.functions.lit;
+import static org.apache.spark.sql.functions.struct;
+
+/**
+ * A benchmark that evaluates the performance of reading nested Parquet data 
using Iceberg
+ * and the built-in file source in Spark.
+ *
+ * To run this benchmark:
+ * <code>
+ *   ./gradlew :iceberg-spark:jmh
+ *       -PjmhIncludeRegex=IcebergSourceNestedParquetDataReadBenchmark
+ *       
-PjmhOutputPath=benchmark/iceberg-source-nested-parquet-data-read-benchmark-result.txt
+ * </code>
+ */
+public class IcebergSourceNestedParquetDataReadBenchmark extends 
IcebergSourceNestedDataBenchmark {
+
+  private static final int NUM_FILES = 10;
+  private static final int NUM_ROWS = 1000000;
+
+  @Setup
+  public void setupBenchmark() {
+    setupSpark();
+    appendData();
+  }
+
+  @TearDown
+  public void tearDownBenchmark() throws IOException {
+    tearDownSpark();
+    cleanupFiles();
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void readIceberg() {
+    Map<String, String> tableProperties = Maps.newHashMap();
+    tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 
1024));
+    withTableProperties(tableProperties, () -> {
+      String tableLocation = table().location();
+      Dataset<Row> df = spark().read().format("iceberg").load(tableLocation);
+      materialize(df);
+    });
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void readFileSourceVectorized() {
+    Map<String, String> conf = Maps.newHashMap();
+    conf.put(SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key(), "true");
+    conf.put(SQLConf.FILES_OPEN_COST_IN_BYTES().key(), Integer.toString(128 * 
1024 * 1024));
+    withSQLConf(conf, () -> {
+      Dataset<Row> df = spark().read().parquet(dataLocation());
+      materialize(df);
+    });
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void readFileSourceNonVectorized() {
+    Map<String, String> conf = Maps.newHashMap();
+    conf.put(SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key(), "false");
+    conf.put(SQLConf.FILES_OPEN_COST_IN_BYTES().key(), Integer.toString(128 * 
1024 * 1024));
+    withSQLConf(conf, () -> {
+      Dataset<Row> df = spark().read().parquet(dataLocation());
+      materialize(df);
+    });
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void readWithProjectionIceberg() {
+    Map<String, String> tableProperties = Maps.newHashMap();
+    tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 
1024));
+    withTableProperties(tableProperties, () -> {
+      String tableLocation = table().location();
+      Dataset<Row> df = 
spark().read().format("iceberg").load(tableLocation).selectExpr("nested.col3");
+      materialize(df);
+    });
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void readWithProjectionFileSourceVectorized() {
+    Map<String, String> conf = Maps.newHashMap();
+    conf.put(SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key(), "true");
+    conf.put(SQLConf.FILES_OPEN_COST_IN_BYTES().key(), Integer.toString(128 * 
1024 * 1024));
+    conf.put(SQLConf.NESTED_SCHEMA_PRUNING_ENABLED().key(), "true");
+    withSQLConf(conf, () -> {
+      Dataset<Row> df = 
spark().read().parquet(dataLocation()).selectExpr("nested.col3");
+      materialize(df);
+    });
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void readWithProjectionFileSourceNonVectorized() {
+    Map<String, String> conf = Maps.newHashMap();
+    conf.put(SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key(), "false");
+    conf.put(SQLConf.FILES_OPEN_COST_IN_BYTES().key(), Integer.toString(128 * 
1024 * 1024));
+    conf.put(SQLConf.NESTED_SCHEMA_PRUNING_ENABLED().key(), "true");
+    withSQLConf(conf, () -> {
+      Dataset<Row> df = 
spark().read().parquet(dataLocation()).selectExpr("nested.col3");
+      materialize(df);
+    });
+  }
+
+  private void appendData() {
+    for (int fileNum = 0; fileNum < NUM_FILES; fileNum++) {
+      Dataset<Row> df = spark().range(NUM_ROWS)
+          .withColumn(
+              "nested",
+              struct(
+                  expr("CAST(id AS string) AS col1"),
+                  expr("CAST(id AS double) AS col2"),
+                  lit(fileNum).cast("long").as("col3")
+              ));
+      appendAsFile(df);
+    }
+  }
+}
diff --git 
a/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceNestedParquetDataWriteBenchmark.java
 
b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceNestedParquetDataWriteBenchmark.java
new file mode 100644
index 0000000..0bd9861
--- /dev/null
+++ 
b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceNestedParquetDataWriteBenchmark.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source.parquet;
+
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.spark.source.IcebergSourceNestedDataBenchmark;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.internal.SQLConf;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+
+import static org.apache.spark.sql.functions.expr;
+import static org.apache.spark.sql.functions.struct;
+
+/**
+ * A benchmark that evaluates the performance of writing nested Parquet data 
using Iceberg
+ * and the built-in file source in Spark.
+ *
+ * To run this benchmark:
+ * <code>
+ *   ./gradlew :iceberg-spark:jmh
+ *       -PjmhIncludeRegex=IcebergSourceNestedParquetDataWriteBenchmark
+ *       
-PjmhOutputPath=benchmark/iceberg-source-nested-parquet-data-write-benchmark-result.txt
+ * </code>
+ */
+public class IcebergSourceNestedParquetDataWriteBenchmark extends 
IcebergSourceNestedDataBenchmark {
+
+  private static final int NUM_ROWS = 5000000;
+
+  @Setup
+  public void setupBenchmark() {
+    setupSpark();
+  }
+
+  @TearDown
+  public void tearDownBenchmark() throws IOException {
+    tearDownSpark();
+    cleanupFiles();
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void writeIceberg() {
+    String tableLocation = table().location();
+    
benchmarkData().write().format("iceberg").mode(SaveMode.Append).save(tableLocation);
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void writeFileSource() {
+    Map<String, String> conf = Maps.newHashMap();
+    conf.put(SQLConf.PARQUET_COMPRESSION().key(), "gzip");
+    withSQLConf(conf, () -> 
benchmarkData().write().mode(SaveMode.Append).parquet(dataLocation()));
+  }
+
+  private Dataset<Row> benchmarkData() {
+    return spark().range(NUM_ROWS)
+        .withColumn(
+            "nested",
+            struct(
+                expr("CAST(id AS string) AS col1"),
+                expr("CAST(id AS double) AS col2"),
+                expr("id AS col3")
+            ))
+        .coalesce(1);
+  }
+}

Reply via email to