This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new b6d8f428e29 Iceberg Add Files (#37701)
b6d8f428e29 is described below

commit b6d8f428e2966468abfababc03440f82a1cb8e59
Author: Ahmed Abualsaud <[email protected]>
AuthorDate: Wed Mar 25 10:44:45 2026 -0400

    Iceberg Add Files (#37701)
    
    * add files transform and schematransform
    
    * minor fixes
    
    * add tests
    
    * create table if needed; determine partition spec from metrics
    
    * spotless
    
    * add batch route as well
    
    * spotless
    
    * extract SchemaTransform logic out
    
    * add integration tests
    
    * spotless
    
    * spotless
    
    * fix deps
    
    * add a few more tests; add error handling output
    
    * clarify comments
    
    * add comment
---
 .../IO_Iceberg_Integration_Tests.json              |   2 +-
 sdks/java/io/iceberg/build.gradle                  |   7 +-
 .../org/apache/beam/sdk/io/iceberg/AddFiles.java   | 671 +++++++++++++++++++++
 .../iceberg/AddFilesSchemaTransformProvider.java   | 190 ++++++
 .../apache/beam/sdk/io/iceberg/PartitionUtils.java |   5 +-
 .../org/apache/beam/sdk/io/iceberg/ReadUtils.java  |  30 +
 .../org/apache/beam/sdk/io/iceberg/AddFilesIT.java | 536 ++++++++++++++++
 .../apache/beam/sdk/io/iceberg/AddFilesTest.java   | 655 ++++++++++++++++++++
 .../org/apache/beam/sdk/io/parquet/ParquetIO.java  |   4 +-
 9 files changed, 2095 insertions(+), 5 deletions(-)

diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json 
b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
index 34a6e02150e..b73af5e61a4 100644
--- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json
+++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
@@ -1,4 +1,4 @@
 {
     "comment": "Modify this file in a trivial way to cause this test suite to 
run.",
-    "modification": 4
+    "modification": 1
 }
diff --git a/sdks/java/io/iceberg/build.gradle 
b/sdks/java/io/iceberg/build.gradle
index a1f352d0530..bbd55fee2fc 100644
--- a/sdks/java/io/iceberg/build.gradle
+++ b/sdks/java/io/iceberg/build.gradle
@@ -51,6 +51,8 @@ dependencies {
     implementation library.java.joda_time
     implementation "org.apache.parquet:parquet-column:$parquet_version"
     implementation "org.apache.parquet:parquet-hadoop:$parquet_version"
+    implementation "org.apache.parquet:parquet-common:$parquet_version"
+    implementation project(":sdks:java:io:parquet")
     implementation "org.apache.orc:orc-core:$orc_version"
     implementation "org.apache.iceberg:iceberg-core:$iceberg_version"
     implementation "org.apache.iceberg:iceberg-api:$iceberg_version"
@@ -74,11 +76,13 @@ dependencies {
     testImplementation library.java.bigdataoss_gcsio
     testImplementation library.java.bigdataoss_util_hadoop
     testImplementation "org.apache.parquet:parquet-avro:$parquet_version"
-    testImplementation "org.apache.parquet:parquet-common:$parquet_version"
     testImplementation "org.apache.iceberg:iceberg-data:$iceberg_version"
     testImplementation project(path: ":sdks:java:core", configuration: 
"shadowTest")
     testImplementation 
project(":sdks:java:extensions:google-cloud-platform-core")
     testImplementation library.java.junit
+    testImplementation library.java.hamcrest
+    testImplementation project(path: ":sdks:java:extensions:avro")
+    testImplementation 'org.awaitility:awaitility:4.2.0'
 
     // Hive catalog test dependencies
     testImplementation project(path: ":sdks:java:io:iceberg:hive")
@@ -95,6 +99,7 @@ dependencies {
     testImplementation project(path: ":sdks:java:io:iceberg:bqms", 
configuration: "shadow")
     testImplementation project(":sdks:java:io:google-cloud-platform")
     testImplementation library.java.google_api_services_bigquery
+    testImplementation 'com.google.cloud:google-cloud-storage'
 
     testImplementation library.java.google_auth_library_oauth2_http
     testRuntimeOnly library.java.slf4j_jdk14
diff --git 
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java
 
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java
new file mode 100644
index 00000000000..4a164700099
--- /dev/null
+++ 
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java
@@ -0,0 +1,671 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg;
+
+import static 
org.apache.beam.sdk.io.iceberg.AddFiles.ConvertToDataFile.DATA_FILES;
+import static org.apache.beam.sdk.io.iceberg.AddFiles.ConvertToDataFile.ERRORS;
+import static org.apache.beam.sdk.metrics.Metrics.counter;
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+import static org.apache.beam.sdk.values.PCollection.IsBounded.BOUNDED;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SeekableByteChannel;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.Semaphore;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.io.Compression;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.io.parquet.ParquetIO.ReadFiles.BeamParquetInputFile;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.schemas.SchemaRegistry;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.GroupIntoBatches;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.hash.Hasher;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.hash.Hashing;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.mapping.MappingUtil;
+import org.apache.iceberg.mapping.NameMapping;
+import org.apache.iceberg.orc.OrcMetrics;
+import org.apache.iceberg.parquet.ParquetSchemaUtil;
+import org.apache.iceberg.parquet.ParquetUtil;
+import org.apache.iceberg.transforms.Transform;
+import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.MessageType;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A transform that takes in a stream of file paths, converts them to Iceberg 
{@link DataFile}s with
+ * partition metadata and metrics, then commits them to an Iceberg {@link 
Table}.
+ */
+public class AddFiles extends PTransform<PCollection<String>, 
PCollectionRowTuple> {
+  static final String OUTPUT_TAG = "snapshots";
+  static final String ERROR_TAG = "errors";
+  private static final Duration DEFAULT_TRIGGER_INTERVAL = 
Duration.standardMinutes(10);
+  private static final Counter numFilesAdded = counter(AddFiles.class, 
"numFilesAdded");
+  private static final Counter numErrorFiles = counter(AddFiles.class, 
"numErrorFiles");
+  private static final Logger LOG = LoggerFactory.getLogger(AddFiles.class);
+  private static final int DEFAULT_FILES_TRIGGER = 1_000;
+  static final Schema ERROR_SCHEMA =
+      Schema.builder().addStringField("file").addStringField("error").build();
+  private final IcebergCatalogConfig catalogConfig;
+  private final String tableIdentifier;
+  private final Duration intervalTrigger;
+  private final int numFilesTrigger;
+  private final @Nullable String locationPrefix;
+  private final @Nullable List<String> partitionFields;
+  private final @Nullable Map<String, String> tableProps;
+
+  public AddFiles(
+      IcebergCatalogConfig catalogConfig,
+      String tableIdentifier,
+      @Nullable String locationPrefix,
+      @Nullable List<String> partitionFields,
+      @Nullable Map<String, String> tableProps,
+      @Nullable Integer numFilesTrigger,
+      @Nullable Duration intervalTrigger) {
+    this.catalogConfig = catalogConfig;
+    this.tableIdentifier = tableIdentifier;
+    this.partitionFields = partitionFields;
+    this.tableProps = tableProps;
+    this.intervalTrigger = intervalTrigger != null ? intervalTrigger : 
DEFAULT_TRIGGER_INTERVAL;
+    this.numFilesTrigger = numFilesTrigger != null ? numFilesTrigger : 
DEFAULT_FILES_TRIGGER;
+    this.locationPrefix = locationPrefix;
+  }
+
+  @Override
+  public PCollectionRowTuple expand(PCollection<String> input) {
+    LOG.info(
+        "AddFiles configured to commit after accumulating {} files, or after 
{} seconds.",
+        numFilesTrigger,
+        intervalTrigger.getStandardSeconds());
+    if (!Strings.isNullOrEmpty(locationPrefix)) {
+      LOG.info(
+          "AddFiles configured to build partition metadata after the prefix: 
'{}'", locationPrefix);
+    }
+
+    PCollectionTuple dataFiles =
+        input.apply(
+            "ConvertToDataFiles",
+            ParDo.of(
+                    new ConvertToDataFile(
+                        catalogConfig,
+                        tableIdentifier,
+                        locationPrefix,
+                        partitionFields,
+                        tableProps))
+                .withOutputTags(DATA_FILES, TupleTagList.of(ERRORS)));
+    SchemaCoder<SerializableDataFile> sdfSchema;
+    try {
+      sdfSchema = 
SchemaRegistry.createDefault().getSchemaCoder(SerializableDataFile.class);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+
+    PCollection<KV<Void, SerializableDataFile>> keyedFiles =
+        dataFiles
+            .get(DATA_FILES)
+            .setCoder(sdfSchema)
+            .apply("AddStaticKey", WithKeys.of((Void) null));
+
+    PCollection<KV<Void, Iterable<SerializableDataFile>>> groupedFiles =
+        keyedFiles.isBounded().equals(BOUNDED)
+            ? keyedFiles.apply(GroupByKey.create())
+            : keyedFiles.apply(
+                GroupIntoBatches.<Void, 
SerializableDataFile>ofSize(numFilesTrigger)
+                    .withMaxBufferingDuration(intervalTrigger));
+
+    PCollection<Row> snapshots =
+        groupedFiles
+            .apply(
+                "CommitFilesToIceberg",
+                ParDo.of(new CommitFilesDoFn(catalogConfig, tableIdentifier)))
+            .setRowSchema(SnapshotInfo.getSchema());
+
+    return PCollectionRowTuple.of(
+        OUTPUT_TAG, snapshots, ERROR_TAG, 
dataFiles.get(ERRORS).setRowSchema(ERROR_SCHEMA));
+  }
+
+  static class ConvertToDataFile extends DoFn<String, SerializableDataFile> {
+    private final IcebergCatalogConfig catalogConfig;
+    private final String identifier;
+    public static final TupleTag<Row> ERRORS = new TupleTag<>();
+    public static final TupleTag<SerializableDataFile> DATA_FILES = new 
TupleTag<>();
+    private final @Nullable String prefix;
+    private final @Nullable List<String> partitionFields;
+    private final @Nullable Map<String, String> tableProps;
+    private transient @MonotonicNonNull Table table;
+    // Limit open readers to avoid blowing up memory on one worker
+    private static final int MAX_READERS = 10;
+    private static final Semaphore ACTIVE_READERS = new Semaphore(MAX_READERS);
+
+    public ConvertToDataFile(
+        IcebergCatalogConfig catalogConfig,
+        String identifier,
+        @Nullable String prefix,
+        @Nullable List<String> partitionFields,
+        @Nullable Map<String, String> tableProps) {
+      this.catalogConfig = catalogConfig;
+      this.identifier = identifier;
+      this.prefix = prefix;
+      this.partitionFields = partitionFields;
+      this.tableProps = tableProps;
+    }
+
+    static final String PREFIX_ERROR = "File path did not start with the 
specified prefix";
+    private static final String UNKNOWN_FORMAT_ERROR = "Could not determine 
the file's format";
+    static final String UNKNOWN_PARTITION_ERROR = "Could not determine the 
file's partition: ";
+
+    @ProcessElement
+    public void process(@Element String filePath, MultiOutputReceiver output)
+        throws IOException, InterruptedException {
+      FileFormat format;
+      try {
+        format = inferFormat(filePath);
+      } catch (UnknownFormatException e) {
+        output
+            .get(ERRORS)
+            .output(Row.withSchema(ERROR_SCHEMA).addValues(filePath, 
UNKNOWN_FORMAT_ERROR).build());
+        numErrorFiles.inc();
+        return;
+      }
+
+      if (table == null) {
+        table = getOrCreateTable(getSchema(filePath, format));
+      }
+
+      // Check if the file path contains the provided prefix
+      if (table.spec().isPartitioned()
+          && !Strings.isNullOrEmpty(prefix)
+          && !filePath.startsWith(checkStateNotNull(prefix))) {
+        output
+            .get(ERRORS)
+            .output(Row.withSchema(ERROR_SCHEMA).addValues(filePath, 
PREFIX_ERROR).build());
+        numErrorFiles.inc();
+        return;
+      }
+
+      InputFile inputFile = table.io().newInputFile(filePath);
+
+      Metrics metrics =
+          getFileMetrics(
+              inputFile, format, MetricsConfig.forTable(table), 
MappingUtil.create(table.schema()));
+
+      // Figure out which partition this DataFile should go to
+      String partitionPath;
+      if (table.spec().isUnpartitioned()) {
+        partitionPath = "";
+      } else if (!Strings.isNullOrEmpty(prefix)) {
+        // option 1: use directory structure to determine partition
+        // Note: we don't validate the DataFile content here
+        partitionPath = getPartitionFromFilePath(filePath);
+      } else {
+        try {
+          // option 2: examine DataFile min/max statistics to determine 
partition
+          partitionPath = getPartitionFromMetrics(metrics, inputFile, table);
+        } catch (UnknownPartitionException e) {
+          output
+              .get(ERRORS)
+              .output(
+                  Row.withSchema(ERROR_SCHEMA)
+                      .addValues(filePath, UNKNOWN_PARTITION_ERROR + 
e.getMessage())
+                      .build());
+          numErrorFiles.inc();
+          return;
+        }
+      }
+
+      DataFile df =
+          DataFiles.builder(table.spec())
+              .withPath(filePath)
+              .withFormat(format)
+              .withMetrics(metrics)
+              .withFileSizeInBytes(inputFile.getLength())
+              .withPartitionPath(partitionPath)
+              .build();
+
+      output.get(DATA_FILES).output(SerializableDataFile.from(df, 
partitionPath));
+    }
+
+    static <W, T> T transformValue(Transform<W, T> transform, Type type, 
ByteBuffer bytes) {
+      return transform.bind(type).apply(Conversions.fromByteBuffer(type, 
bytes));
+    }
+
+    private static <W, T> T transformValue(Transform<W, T> transform, Type 
type, Object value) {
+      return transform.bind(type).apply((W) value);
+    }
+
+    private Table getOrCreateTable(org.apache.iceberg.Schema schema) {
+      PartitionSpec spec = PartitionUtils.toPartitionSpec(partitionFields, 
schema);
+      try {
+        return tableProps == null
+            ? 
catalogConfig.catalog().createTable(TableIdentifier.parse(identifier), schema, 
spec)
+            : catalogConfig
+                .catalog()
+                .createTable(TableIdentifier.parse(identifier), schema, spec, 
tableProps);
+      } catch (AlreadyExistsException e) { // if table already exists, just 
load it
+        return 
catalogConfig.catalog().loadTable(TableIdentifier.parse(identifier));
+      }
+    }
+
+    /**
+     * We don't have a table yet, so we don't know which FileIO to use to read 
these files. Instead,
+     * we use Beam's FileSystem utilities to read the file and extract its 
schema to create the
+     * table
+     */
+    private static org.apache.iceberg.Schema getSchema(String filePath, 
FileFormat format)
+        throws IOException {
+      Preconditions.checkArgument(
+          format.equals(FileFormat.PARQUET), "Table creation is only supported 
for Parquet files.");
+      try (ParquetFileReader reader = 
ParquetFileReader.open(getParquetInputFile(filePath))) {
+        MessageType messageType = 
reader.getFooter().getFileMetaData().getSchema();
+        return ParquetSchemaUtil.convert(messageType);
+      }
+    }
+
+    private String getPartitionFromFilePath(String filePath) {
+      if (checkStateNotNull(table).spec().isUnpartitioned()) {
+        return "";
+      }
+      String partitionPath = 
filePath.substring(checkStateNotNull(prefix).length());
+      int lastSlashIndex = partitionPath.lastIndexOf('/');
+
+      return lastSlashIndex > 0 ? partitionPath.substring(0, lastSlashIndex) : 
"";
+    }
+
+    /**
+     * Examines the min/max values of each partition column to determine the 
destination partition.
+     *
+     * <p>If the transformed min/max values are not equal for any given 
column, we won't be able to
+     * determine the partition. We also cannot fall back to a "null" 
partition, because that will
+     * also get skipped by most queries.
+     *
+     * <p>The Bucket partition transform is an exceptional case because it is 
not monotonic, meaning
+     * it's not enough to just compare the min and max values. There may be a 
middle value somewhere
+     * that gets hashed to a different value. For this transform, we'll need 
to read all the values
+     * in the column ensure they all get transformed to the same partition 
value.
+     *
+     * <p>In these cases, we output the DataFile to the DLQ, because assigning 
an incorrect
+     * partition may lead to it being incorrectly ignored by downstream 
queries.
+     */
+    static String getPartitionFromMetrics(Metrics metrics, InputFile 
inputFile, Table table)
+        throws UnknownPartitionException, IOException, InterruptedException {
+      List<PartitionField> fields = table.spec().fields();
+      List<Integer> sourceIds =
+          
fields.stream().map(PartitionField::sourceId).collect(Collectors.toList());
+      Metrics partitionMetrics;
+      // Check if metrics already includes partition columns (configured by 
table properties):
+      if (metrics.lowerBounds().keySet().containsAll(sourceIds)
+          && metrics.upperBounds().keySet().containsAll(sourceIds)) {
+        partitionMetrics = metrics;
+      } else {
+        // Otherwise, recollect metrics and ensure it includes all partition 
fields.
+        // Note: we don't attach these additional metrics to the DataFile 
because we can't assume
+        // that's in the user's best interest.
+        // Some tables are very wide and users may not want to store excessive 
metadata.
+        List<String> sourceNames =
+            fields.stream()
+                .map(pf -> table.schema().findColumnName(pf.sourceId()))
+                .collect(Collectors.toList());
+        Map<String, String> configProps =
+            sourceNames.stream()
+                .collect(Collectors.toMap(s -> 
"write.metadata.metrics.column." + s, s -> "full"));
+        MetricsConfig configWithPartitionFields = 
MetricsConfig.fromProperties(configProps);
+        partitionMetrics =
+            getFileMetrics(
+                inputFile,
+                inferFormat(inputFile.location()),
+                configWithPartitionFields,
+                MappingUtil.create(table.schema()));
+      }
+
+      PartitionKey pk = new PartitionKey(table.spec(), table.schema());
+
+      HashMap<Integer, PartitionField> bucketPartitions = new HashMap<>();
+      for (int i = 0; i < fields.size(); i++) {
+        PartitionField field = fields.get(i);
+        Transform<?, ?> transform = field.transform();
+        if (transform.toString().contains("bucket[")) {
+          bucketPartitions.put(i, field);
+        }
+      }
+
+      // first, read only metadata for the non-bucket partition types
+      for (int i = 0; i < fields.size(); i++) {
+        PartitionField field = fields.get(i);
+        // skip bucket partitions (we will process them below)
+        if (bucketPartitions.containsKey(i)) {
+          continue;
+        }
+        Type type = table.schema().findType(field.sourceId());
+        Transform<?, ?> transform = field.transform();
+
+        // Make a best effort estimate by comparing the lower and upper 
transformed values.
+        // If the transformed values are equal, assume that the DataFile's 
data safely
+        // aligns with the same partition.
+        ByteBuffer lowerBytes = 
partitionMetrics.lowerBounds().get(field.sourceId());
+        ByteBuffer upperBytes = 
partitionMetrics.upperBounds().get(field.sourceId());
+        if (lowerBytes == null && upperBytes == null) {
+          continue;
+        } else if (lowerBytes == null || upperBytes == null) {
+          throw new UnknownPartitionException(
+              "Only one of the min/max was was null, for field "
+                  + table.schema().findColumnName(field.sourceId()));
+        }
+        Object lowerTransformedValue = transformValue(transform, type, 
lowerBytes);
+        Object upperTransformedValue = transformValue(transform, type, 
upperBytes);
+
+        if (!Objects.deepEquals(lowerTransformedValue, upperTransformedValue)) 
{
+          // The DataFile contains values that align to different partitions, 
so we cannot
+          // safely determine a partition.
+          throw new UnknownPartitionException(
+              "Min and max transformed values were not equal, for column: " + 
field.name());
+        }
+
+        pk.set(i, lowerTransformedValue);
+      }
+
+      // bucket transform needs extra processing (see java doc above)
+      if (!bucketPartitions.isEmpty()) {
+        // Optimize by only reading bucket-transformed columns into memory
+        org.apache.iceberg.Schema bucketCols =
+            TypeUtil.select(
+                table.schema(),
+                bucketPartitions.values().stream()
+                    .map(PartitionField::sourceId)
+                    .collect(Collectors.toSet()));
+
+        // Keep one instance of transformed value per column. Use this to 
compare against each
+        // record's transformed value.
+        // Values in the same columns must yield the same transformed value, 
otherwise we cannot
+        // determine a partition
+        // from this file.
+        Map<Integer, Object> transformedValues = new HashMap<>();
+
+        // Do a one-time read of the file and compare all bucket-transformed 
columns
+        ACTIVE_READERS.acquire();
+        try (CloseableIterable<Record> reader = 
ReadUtils.createReader(inputFile, bucketCols)) {
+          for (Record record : reader) {
+            for (Map.Entry<Integer, PartitionField> entry : 
bucketPartitions.entrySet()) {
+              int partitionIndex = entry.getKey();
+              PartitionField partitionField = entry.getValue();
+              Transform<?, ?> transform = partitionField.transform();
+              Types.NestedField field = 
table.schema().findField(partitionField.sourceId());
+              Object value = record.getField(field.name());
+
+              // set initial transformed value for this column
+              @Nullable Object transformedValue = 
transformedValues.get(partitionIndex);
+              Object currentTransformedValue = transformValue(transform, 
field.type(), value);
+              if (transformedValue == null) {
+                transformedValues.put(partitionIndex, 
checkStateNotNull(currentTransformedValue));
+                continue;
+              }
+
+              if (!Objects.deepEquals(currentTransformedValue, 
transformedValue)) {
+                throw new UnknownPartitionException(
+                    "Found records with conflicting transformed values, for 
column: "
+                        + field.name());
+              }
+            }
+          }
+        } finally {
+          ACTIVE_READERS.release();
+        }
+
+        for (Map.Entry<Integer, Object> partitionCol : 
transformedValues.entrySet()) {
+          pk.set(partitionCol.getKey(), partitionCol.getValue());
+        }
+      }
+      return pk.toPath();
+    }
+  }
+
+  /**
+   * A stateful {@link DoFn} that commits batches of files to an Iceberg table.
+   *
+   * <p>Addresses two primary concerns:
+   *
+   * <ul>
+   *   <li><b>Concurrency:</b> Being stateful on a dummy {@code Void} key 
forces the runner to
+   *       process batches sequentially, preventing concurrent commit 
conflicts on the Iceberg
+   *       table.
+   *   <li><b>Idempotency:</b> Prevents duplicate commits during bundle 
failures by calculating a
+   *       deterministic hash for the file set. This ID is stored in the 
Iceberg {@code Snapshot}
+   *       summary, under the key {@code "beam.add-files-commit-id"}. Before 
committing, the DoFn
+   *       travereses backwards through recent snapshots to check if the 
current batch's ID is
+   *       already present.
+   * </ul>
+   *
+   * <p>Outputs the resulting Iceberg {@link Snapshot} information.
+   */
+  static class CommitFilesDoFn extends DoFn<KV<Void, 
Iterable<SerializableDataFile>>, Row> {
+    private final IcebergCatalogConfig catalogConfig;
+    private final String identifier;
+    private transient @MonotonicNonNull Table table = null;
+    private static final String COMMIT_ID_KEY = "beam.add-files-commit-id";
+
+    @StateId("lastCommitTimestamp")
+    private final StateSpec<ValueState<Long>> lastCommitTimestamp =
+        StateSpecs.value(VarLongCoder.of());
+
+    public CommitFilesDoFn(IcebergCatalogConfig catalogConfig, String 
identifier) {
+      this.catalogConfig = catalogConfig;
+      this.identifier = identifier;
+    }
+
+    @StartBundle
+    public void start() {
+      if (table == null) {
+        table = 
catalogConfig.catalog().loadTable(TableIdentifier.parse(identifier));
+      }
+    }
+
+    @ProcessElement
+    public void process(
+        @Element KV<Void, Iterable<SerializableDataFile>> files,
+        @AlwaysFetched @StateId("lastCommitTimestamp") ValueState<Long> 
lastCommitTimestamp,
+        OutputReceiver<Row> output) {
+      String commitId = commitHash(files.getValue());
+      Table table = checkStateNotNull(this.table);
+      table.refresh();
+
+      if (shouldSkip(commitId, lastCommitTimestamp.read())) {
+        return;
+      }
+
+      int numFiles = 0;
+      AppendFiles appendFiles = table.newFastAppend();
+      for (SerializableDataFile file : files.getValue()) {
+        DataFile df = file.createDataFile(table.specs());
+        appendFiles.appendFile(df);
+        numFiles++;
+      }
+      appendFiles.set(COMMIT_ID_KEY, commitId);
+      LOG.info("Committing {} files, with commit ID: {}", numFiles, commitId);
+      appendFiles.commit();
+
+      Snapshot snapshot = table.currentSnapshot();
+      output.output(SnapshotInfo.fromSnapshot(snapshot).toRow());
+      lastCommitTimestamp.write(snapshot.timestampMillis());
+      numFilesAdded.inc(numFiles);
+    }
+
+    private String commitHash(Iterable<SerializableDataFile> files) {
+      Hasher hasher = Hashing.sha256().newHasher();
+
+      // Extract, sort, and hash to ensure deterministic output
+      List<String> paths = new ArrayList<>();
+      for (SerializableDataFile file : files) {
+        paths.add(file.getPath());
+      }
+      Collections.sort(paths);
+
+      for (String path : paths) {
+        hasher.putString(path, StandardCharsets.UTF_8);
+      }
+      return hasher.hash().toString();
+    }
+
+    /**
+     * Performs a look-back through Iceberg table history to determine if this 
specific batch of
+     * files has already been successfully committed.
+     */
+    private boolean shouldSkip(String commitUID, @Nullable Long 
lastCommitTimestamp) {
+      if (lastCommitTimestamp == null) {
+        return false;
+      }
+      Table table = checkStateNotNull(this.table);
+
+      // check past snapshots to see if they contain the commit ID
+      @Nullable Snapshot current = table.currentSnapshot();
+      while (current != null && current.timestampMillis() > 
lastCommitTimestamp) {
+        Map<String, String> summary = current.summary();
+        if (summary != null && commitUID.equals(summary.get(COMMIT_ID_KEY))) {
+          return true; // commit already happened, we should skip
+        }
+        if (current.parentId() == null) {
+          break;
+        }
+        current = table.snapshot(current.parentId());
+      }
+
+      return false;
+    }
+  }
+
+  @SuppressWarnings("argument")
+  public static Metrics getFileMetrics(
+      InputFile file, FileFormat format, MetricsConfig config, NameMapping 
mapping)
+      throws IOException {
+    switch (format) {
+      case PARQUET:
+        try (ParquetFileReader reader =
+            ParquetFileReader.open(getParquetInputFile(file.location()))) {
+          ParquetMetadata footer = reader.getFooter();
+          MessageType originalMessageType = 
footer.getFileMetaData().getSchema();
+          if (!ParquetSchemaUtil.hasIds(originalMessageType)) {
+            footer = getFooterWithTypeIds(originalMessageType, footer, 
mapping);
+          }
+
+          return ParquetUtil.footerMetrics(footer, Stream.empty(), config, 
mapping);
+        }
+      case ORC:
+        return OrcMetrics.fromInputFile(file, config, mapping);
+      case AVRO:
+        return new Metrics(Avro.rowCount(file), null, null, null, null);
+      default:
+        throw new UnsupportedOperationException("Unsupported format: " + 
format);
+    }
+  }
+
+  /** Tries to infer other file formats. Defaults to Parquet. */
+  public static FileFormat inferFormat(String path) {
+    String lowerPath = path.toLowerCase();
+
+    if (lowerPath.endsWith(".parquet") || lowerPath.endsWith(".pqt")) {
+      return FileFormat.PARQUET;
+    } else if (lowerPath.endsWith(".orc")) {
+      return FileFormat.ORC;
+    } else if (lowerPath.endsWith(".avro")) {
+      return FileFormat.AVRO;
+    } else {
+      throw new UnknownFormatException();
+    }
+  }
+
+  static ParquetMetadata getFooterWithTypeIds(
+      MessageType originalMessageType, ParquetMetadata footer, NameMapping 
mapping) {
+    originalMessageType = 
ParquetSchemaUtil.applyNameMapping(originalMessageType, mapping);
+    FileMetaData oldFileMeta = footer.getFileMetaData();
+    FileMetaData newFileMeta =
+        new FileMetaData(
+            originalMessageType, oldFileMeta.getKeyValueMetaData(), 
oldFileMeta.getCreatedBy());
+    return new ParquetMetadata(newFileMeta, footer.getBlocks());
+  }
+
+  static org.apache.parquet.io.InputFile getParquetInputFile(String filePath) 
throws IOException {
+    ResourceId resourceId =
+        
Iterables.getOnlyElement(FileSystems.match(filePath).metadata()).resourceId();
+    Compression compression = 
Compression.detect(checkStateNotNull(resourceId.getFilename()));
+    SeekableByteChannel channel =
+        (SeekableByteChannel) 
compression.readDecompressed(FileSystems.open(resourceId));
+    return new BeamParquetInputFile(channel);
+  }
+
+  static class UnknownFormatException extends IllegalArgumentException {}
+
+  static class UnknownPartitionException extends IllegalStateException {
+    UnknownPartitionException(String msg) {
+      super(msg);
+    }
+  }
+}
diff --git 
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFilesSchemaTransformProvider.java
 
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFilesSchemaTransformProvider.java
new file mode 100644
index 00000000000..a04853c8ad9
--- /dev/null
+++ 
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFilesSchemaTransformProvider.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg;
+
+import static org.apache.beam.sdk.io.iceberg.AddFiles.ERROR_TAG;
+import static org.apache.beam.sdk.io.iceberg.AddFiles.OUTPUT_TAG;
+import static 
org.apache.beam.sdk.io.iceberg.AddFilesSchemaTransformProvider.Configuration;
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+
+import com.google.auto.service.AutoService;
+import com.google.auto.value.AutoValue;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.providers.ErrorHandling;
+import org.apache.beam.sdk.transforms.Filter;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+
+@AutoService(SchemaTransformProvider.class)
+public class AddFilesSchemaTransformProvider extends 
TypedSchemaTransformProvider<Configuration> {
+  @Override
+  public AddFilesSchemaTransform from(Configuration configuration) {
+    return new AddFilesSchemaTransform(configuration);
+  }
+
+  @Override
+  public String identifier() {
+    return "beam:schematransform:iceberg_add_files:v1";
+  }
+
+  @DefaultSchema(AutoValueSchema.class)
+  @AutoValue
+  public abstract static class Configuration {
+    public static Builder builder() {
+      return new 
AutoValue_AddFilesSchemaTransformProvider_Configuration.Builder();
+    }
+
+    @SchemaFieldDescription("A fully-qualified table identifier.")
+    public abstract String getTable();
+
+    @SchemaFieldDescription("Properties used to set up the Iceberg catalog.")
+    public abstract @Nullable Map<String, String> getCatalogProperties();
+
+    @SchemaFieldDescription("Properties passed to the Hadoop ")
+    public abstract @Nullable Map<String, String> getConfigProperties();
+
+    @SchemaFieldDescription(
+        "For a streaming pipeline, sets the frequency at which incoming files 
are appended. Defaults to 600 (10 minutes). "
+            + "A commit is triggered when either this or append batch size is 
reached.")
+    public abstract @Nullable Integer getTriggeringFrequencySeconds();
+
+    @SchemaFieldDescription(
+        "For a streaming pipeline, sets the desired number of appended files 
per commit. Defaults to 100,000 files. "
+            + "A commit is triggered when either this or append triggering 
interval is reached.")
+    public abstract @Nullable Integer getAppendBatchSize();
+
+    @SchemaFieldDescription(
+        "The prefix shared among all partitions. For example, a data file may 
have the following"
+            + " location:%n"
+            + 
"'gs://bucket/namespace/table/data/id=13/name=beam/data_file.parquet'%n%n"
+            + "The provided prefix should go up until the partition 
information:%n"
+            + "'gs://bucket/namespace/table/data/'.%n"
+            + "If not provided, will try determining each DataFile's partition 
from its metrics metadata.")
+    public abstract @Nullable String getLocationPrefix();
+
+    @SchemaFieldDescription(
+        "Fields used to create a partition spec that is applied when tables 
are created. For a field 'foo', "
+            + "the available partition transforms are:\n\n"
+            + "- `foo`\n"
+            + "- `truncate(foo, N)`\n"
+            + "- `bucket(foo, N)`\n"
+            + "- `hour(foo)`\n"
+            + "- `day(foo)`\n"
+            + "- `month(foo)`\n"
+            + "- `year(foo)`\n"
+            + "- `void(foo)`\n\n"
+            + "For more information on partition transforms, please visit 
https://iceberg.apache.org/spec/#partition-transforms.";)
+    public abstract @Nullable List<String> getPartitionFields();
+
+    @SchemaFieldDescription(
+        "Iceberg table properties to be set on the table when it is created.\n"
+            + "For more information on table properties,"
+            + " please visit 
https://iceberg.apache.org/docs/latest/configuration/#table-properties.";)
+    public abstract @Nullable Map<String, String> getTableProperties();
+
+    @SchemaFieldDescription("This option specifies whether and where to output 
unwritable rows.")
+    public abstract @Nullable ErrorHandling getErrorHandling();
+
+    @AutoValue.Builder
+    public abstract static class Builder {
+      public abstract Builder setTable(String table);
+
+      public abstract Builder setCatalogProperties(Map<String, String> 
catalogProperties);
+
+      public abstract Builder setConfigProperties(Map<String, String> 
confProperties);
+
+      public abstract Builder setTriggeringFrequencySeconds(Integer 
triggeringFrequencySeconds);
+
+      public abstract Builder setAppendBatchSize(Integer size);
+
+      public abstract Builder setLocationPrefix(String prefix);
+
+      public abstract Builder setPartitionFields(List<String> fields);
+
+      public abstract Builder setTableProperties(Map<String, String> props);
+
+      public abstract Builder setErrorHandling(ErrorHandling errorHandling);
+
+      public abstract Configuration build();
+    }
+
+    public IcebergCatalogConfig getIcebergCatalog() {
+      return IcebergCatalogConfig.builder()
+          .setCatalogProperties(getCatalogProperties())
+          .setConfigProperties(getConfigProperties())
+          .build();
+    }
+  }
+
+  public static class AddFilesSchemaTransform extends SchemaTransform {
+    private final Configuration configuration;
+
+    public AddFilesSchemaTransform(Configuration configuration) {
+      this.configuration = configuration;
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      Schema inputSchema = input.getSinglePCollection().getSchema();
+      Preconditions.checkState(
+          inputSchema.getFieldCount() == 1
+              && 
inputSchema.getField(0).getType().getTypeName().equals(Schema.TypeName.STRING),
+          "Incoming Row Schema must contain only one field of type String. 
Instead, got schema: %s",
+          inputSchema);
+
+      @Nullable Integer frequency = 
configuration.getTriggeringFrequencySeconds();
+
+      PCollectionRowTuple result =
+          input
+              .getSinglePCollection()
+              .apply("Filter empty paths", Filter.by(row -> row.getString(0) 
!= null))
+              .apply(
+                  "ExtractPaths",
+                  MapElements.into(TypeDescriptors.strings())
+                      .via(row -> checkStateNotNull(row.getString(0))))
+              .apply(
+                  new AddFiles(
+                      configuration.getIcebergCatalog(),
+                      configuration.getTable(),
+                      configuration.getLocationPrefix(),
+                      configuration.getPartitionFields(),
+                      configuration.getTableProperties(),
+                      configuration.getAppendBatchSize(),
+                      frequency != null ? Duration.standardSeconds(frequency) 
: null));
+
+      PCollectionRowTuple output = PCollectionRowTuple.of("snapshots", 
result.get(OUTPUT_TAG));
+      ErrorHandling errorHandling = configuration.getErrorHandling();
+      if (errorHandling != null) {
+        output = output.and(errorHandling.getOutput(), result.get(ERROR_TAG));
+      }
+      return output;
+    }
+  }
+}
diff --git 
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PartitionUtils.java
 
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PartitionUtils.java
index 2b3117f8bf8..805cc067294 100644
--- 
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PartitionUtils.java
+++ 
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PartitionUtils.java
@@ -68,10 +68,13 @@ class PartitionUtils {
 
   static PartitionSpec toPartitionSpec(
       @Nullable List<String> fields, org.apache.beam.sdk.schemas.Schema 
beamSchema) {
+    return toPartitionSpec(fields, 
IcebergUtils.beamSchemaToIcebergSchema(beamSchema));
+  }
+
+  static PartitionSpec toPartitionSpec(@Nullable List<String> fields, Schema 
schema) {
     if (fields == null) {
       return PartitionSpec.unpartitioned();
     }
-    Schema schema = IcebergUtils.beamSchemaToIcebergSchema(beamSchema);
     PartitionSpec.Builder builder = PartitionSpec.builderFor(schema);
 
     for (String field : fields) {
diff --git 
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadUtils.java
 
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadUtils.java
index a4d95ca249b..e7f50882f43 100644
--- 
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadUtils.java
+++ 
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadUtils.java
@@ -46,9 +46,11 @@ import org.apache.iceberg.encryption.EncryptedFiles;
 import org.apache.iceberg.encryption.EncryptedInputFile;
 import org.apache.iceberg.expressions.Evaluator;
 import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.hadoop.HadoopInputFile;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.mapping.MappingUtil;
 import org.apache.iceberg.mapping.NameMapping;
 import org.apache.iceberg.mapping.NameMappingParser;
 import org.apache.iceberg.parquet.ParquetReader;
@@ -112,6 +114,34 @@ public class ReadUtils {
         true);
   }
 
+  static ParquetReader<Record> createReader(InputFile inputFile, Schema 
schema) {
+    ParquetReadOptions.Builder optionsBuilder;
+    if (inputFile instanceof HadoopInputFile) {
+      // remove read properties already set that may conflict with this read
+      Configuration conf = new Configuration(((HadoopInputFile) 
inputFile).getConf());
+      for (String property : READ_PROPERTIES_TO_REMOVE) {
+        conf.unset(property);
+      }
+      optionsBuilder = HadoopReadOptions.builder(conf);
+    } else {
+      optionsBuilder = ParquetReadOptions.builder();
+    }
+    optionsBuilder =
+        optionsBuilder
+            .withRange(0, inputFile.getLength())
+            .withMaxAllocationInBytes(MAX_FILE_BUFFER_SIZE);
+
+    return new ParquetReader<>(
+        inputFile,
+        schema,
+        optionsBuilder.build(),
+        fileSchema -> GenericParquetReaders.buildReader(schema, fileSchema),
+        MappingUtil.create(schema),
+        Expressions.alwaysTrue(),
+        false,
+        true);
+  }
+
   static Map<Integer, ?> constantsMap(
       FileScanTask task,
       BiFunction<Type, Object, Object> converter,
diff --git 
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/AddFilesIT.java
 
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/AddFilesIT.java
new file mode 100644
index 00000000000..a67707e3dbb
--- /dev/null
+++ 
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/AddFilesIT.java
@@ -0,0 +1,536 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg;
+
+import static java.lang.String.format;
+import static org.apache.beam.sdk.io.FileIO.Write.defaultNaming;
+import static 
org.apache.beam.sdk.io.iceberg.IcebergUtils.beamSchemaToIcebergSchema;
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+import static org.apache.beam.sdk.values.TypeDescriptors.strings;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import com.google.api.services.storage.model.StorageObject;
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.Notification;
+import com.google.cloud.storage.NotificationInfo;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageException;
+import com.google.cloud.storage.StorageOptions;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.runners.direct.DirectOptions;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
+import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
+import org.apache.beam.sdk.extensions.gcp.util.GcsUtil;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubGrpcClient;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
+import org.apache.beam.sdk.io.gcp.pubsub.TestPubsubOptions;
+import org.apache.beam.sdk.io.parquet.ParquetIO;
+import org.apache.beam.sdk.managed.Managed;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.Deduplicate;
+import org.apache.beam.sdk.transforms.Filter;
+import org.apache.beam.sdk.transforms.JsonToRow;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import org.apache.hadoop.util.Lists;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.rest.RESTCatalog;
+import org.apache.iceberg.util.Pair;
+import org.awaitility.Awaitility;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Integration tests for {@link AddFiles}, using Pubsub notifications to pass 
newly created file
+ * paths downstream to add to an Iceberg table.
+ */
+public class AddFilesIT {
+  private static final Logger LOG = LoggerFactory.getLogger(AddFilesIT.class);
+
+  private static final String WAREHOUSE = "gs://managed-iceberg-biglake-its";
+  private static final String PROJECT =
+      TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject();
+  @Rule public TestName testName = new TestName();
+  private final String notificationsTopic =
+      format(
+          "projects/%s/topics/%s-%s-%s",
+          PROJECT,
+          AddFilesIT.class.getSimpleName(),
+          testName.getMethodName(),
+          System.currentTimeMillis());
+  private static final Schema NOTIFICATION_SCHEMA =
+      Schema.builder()
+          .addStringField("bucket")
+          .addStringField("name")
+          .addStringField("kind")
+          .build();
+  private static final Map<String, String> BIGLAKE_PROPS =
+      Map.of(
+          "type", "rest",
+          "uri", "https://biglake.googleapis.com/iceberg/v1/restcatalog";,
+          "warehouse", WAREHOUSE,
+          "header.x-goog-user-project", PROJECT,
+          "rest.auth.type", "google",
+          "io-impl", "org.apache.iceberg.gcp.gcs.GCSFileIO",
+          "rest-metrics-reporting-enabled", "false");
+  private Storage storage;
+  private PubsubClient pubsub;
+  private Notification notification;
+  private final String namespace = getClass().getSimpleName();
+  private String srcTableName;
+  private String destTableName;
+  private TableIdentifier srcTableId;
+  private TableIdentifier destTableId;
+  private long salt;
+  private String dirName;
+  private static final Schema ROW_SCHEMA =
+      
Schema.builder().addInt64Field("id").addStringField("name").addInt32Field("age").build();
+  private static final List<String> PARTITION_FIELDS =
+      Arrays.asList("bucket(id, 16)", "truncate(name, 8)", "age");
+  private static final PartitionSpec SPEC =
+      PartitionUtils.toPartitionSpec(PARTITION_FIELDS, ROW_SCHEMA);
+  private static final Map<String, String> TABLE_PROPS = 
ImmutableMap.of("foo", "bar");
+  private static final List<Row> TEST_ROWS =
+      IntStream.range(0, 20)
+          .mapToObj(
+              i -> Row.withSchema(ROW_SCHEMA).addValues((long) i, "name_" + i, 
i + 30).build())
+          .collect(Collectors.toList());
+  private final RESTCatalog catalog = new RESTCatalog();
+
+  @Before
+  public void setup() throws IOException {
+    storage = StorageOptions.newBuilder().build().getService();
+    pubsub =
+        PubsubGrpcClient.FACTORY.newClient(
+            null, null, 
TestPipeline.testingPipelineOptions().as(TestPubsubOptions.class));
+
+    pubsub.createTopic(PubsubClient.topicPathFromPath(notificationsTopic));
+
+    NotificationInfo notificationInfo =
+        NotificationInfo.newBuilder(notificationsTopic)
+            .setEventTypes(NotificationInfo.EventType.OBJECT_FINALIZE)
+            .setPayloadFormat(NotificationInfo.PayloadFormat.JSON_API_V1)
+            .build();
+    try {
+      notification = storage.createNotification(WAREHOUSE.replace("gs://", 
""), notificationInfo);
+    } catch (StorageException e) {
+      if (e.getMessage().contains("Too many overlapping notifications")) {
+        List<Notification> existing = 
storage.listNotifications(WAREHOUSE.replace("gs://", ""));
+        LOG.warn(
+            "Too many notifications on bucket {}: {}. Deleting existing 
notifications to make room: {}",
+            WAREHOUSE,
+            e,
+            existing.stream()
+                .map(NotificationInfo::getNotificationId)
+                .collect(Collectors.toList()));
+        existing.forEach(
+            n -> storage.deleteNotification(WAREHOUSE.replace("gs://", ""), 
n.getNotificationId()));
+
+        // try creating it again
+        notification = storage.createNotification(WAREHOUSE.replace("gs://", 
""), notificationInfo);
+      } else {
+        throw e;
+      }
+    }
+
+    salt = System.currentTimeMillis();
+    dirName = format("%s-%s/%s", getClass().getSimpleName(), salt, 
testName.getMethodName());
+    srcTableName = "src_" + testName.getMethodName() + "_" + salt;
+    destTableName = "dest_" + testName.getMethodName() + "_" + salt;
+    srcTableId = TableIdentifier.of(namespace, srcTableName);
+    destTableId = TableIdentifier.of(namespace, destTableName);
+
+    catalog.initialize("test_catalog", BIGLAKE_PROPS);
+    cleanupCatalog();
+    catalog.createNamespace(Namespace.of(namespace));
+  }
+
+  private void cleanupCatalog() {
+    Namespace ns = Namespace.of(namespace);
+    if (catalog.namespaceExists(ns)) {
+      catalog.listTables(ns).forEach(catalog::dropTable);
+      catalog.dropNamespace(ns);
+    }
+  }
+
+  @After
+  public void cleanup() {
+    try {
+      pubsub.deleteTopic(PubsubClient.topicPathFromPath(notificationsTopic));
+      pubsub.close();
+    } catch (Exception e) {
+      LOG.warn("Failed to clean up PubSub", e);
+    }
+
+    try {
+      storage.deleteNotification(WAREHOUSE.replace("gs://", ""), 
notification.getNotificationId());
+      storage.close();
+    } catch (Exception e) {
+      LOG.warn("Failed to clean up GCS notifications", e);
+    }
+
+    try {
+      cleanupCatalog();
+    } catch (Exception e) {
+      LOG.warn("Failed to clean up Iceberg catalog", e);
+    }
+
+    try {
+      Iterable<Blob> blobs =
+          storage
+              .list(WAREHOUSE.replace("gs://", ""), 
Storage.BlobListOption.prefix(dirName))
+              .getValues();
+      blobs.forEach(b -> storage.delete(b.getBlobId()));
+    } catch (Exception e) {
+      LOG.warn("Failed to clean up GCS bucket", e);
+    }
+  }
+
+  @Test
+  public void testStreamingImportFromExistingIcebergTable()
+      throws IOException, InterruptedException {
+    // first create a source iceberg table
+    catalog.createTable(srcTableId, beamSchemaToIcebergSchema(ROW_SCHEMA), 
SPEC);
+
+    String filter = format("%s/%s/data/", namespace, srcTableName);
+
+    // build AddFiles pipeline and let it run in the background
+    PipelineResult addFilesPipeline = startAddFilesListener(filter);
+
+    // before writing, confirm the destination table still does not exist
+    assertFalse(catalog.tableExists(destTableId));
+
+    // write some rows to the source table
+    LOG.info("Writing records to the source table");
+    Pipeline q = Pipeline.create();
+    q.apply(Create.of(TEST_ROWS))
+        .setRowSchema(ROW_SCHEMA)
+        .apply(
+            Managed.write(Managed.ICEBERG)
+                .withConfig(
+                    ImmutableMap.of(
+                        "table", srcTableId.toString(), "catalog_properties", 
BIGLAKE_PROPS)));
+    q.run().waitUntilFinish();
+
+    // check that the destination table has been created
+    Awaitility.await()
+        .atMost(java.time.Duration.ofMinutes(5))
+        .pollInterval(java.time.Duration.ofSeconds(10))
+        .until(() -> catalog.tableExists(destTableId));
+    LOG.info("Destination table has been created");
+
+    LOG.info("Checking if all source files have been registered in the 
destination table");
+    Awaitility.await()
+        .atMost(java.time.Duration.ofMinutes(2))
+        .pollInterval(java.time.Duration.ofSeconds(5))
+        .until(() -> checkTableFiles() != null);
+    LOG.info("Destination table has registered all source files.");
+    Pair<Map<String, DataFile>, Map<String, DataFile>> srcAndDestfiles =
+        checkStateNotNull(checkTableFiles());
+
+    for (Map.Entry<String, DataFile> srcFile : 
srcAndDestfiles.first().entrySet()) {
+      String location = srcFile.getKey();
+      DataFile destFile =
+          checkStateNotNull(
+              srcAndDestfiles.second().get(location),
+              "Source file '%s' was not registered in the destination table",
+              location);
+
+      // check that partition metadata was preserved
+      assertEquals(destFile.partition(), srcFile.getValue().partition());
+    }
+
+    // safe to cancel the AddFiles pipeline now
+    LOG.info("Canceling AddFiles listener.");
+    addFilesPipeline.cancel();
+
+    // check all records are there
+    checkRecordsInDestinationTable();
+  }
+
+  /**
+   * Fetch all added files in both tables. Return null if some files have not 
yet propagated to
+   * destination table
+   */
+  private @Nullable Pair<Map<String, DataFile>, Map<String, DataFile>> 
checkTableFiles() {
+    Table destTable = catalog.loadTable(destTableId);
+    Table srcTable = catalog.loadTable(srcTableId);
+
+    Map<String, DataFile> srcFiles = new HashMap<>();
+    Map<String, DataFile> destFiles = new HashMap<>();
+    for (Snapshot snapshot : srcTable.snapshots()) {
+      snapshot.addedDataFiles(srcTable.io()).forEach(df -> 
srcFiles.put(df.location(), df));
+    }
+    for (Snapshot snapshot : destTable.snapshots()) {
+      snapshot.addedDataFiles(destTable.io()).forEach(df -> 
destFiles.put(df.location(), df));
+    }
+
+    LOG.info(
+        "Number of source files: {}, Number of registered destination files: 
{}",
+        srcFiles.size(),
+        destFiles.size());
+
+    if (srcFiles.size() != destFiles.size()) {
+      Set<String> onlyInSrc = new HashSet<>(srcFiles.keySet());
+      onlyInSrc.removeAll(destFiles.keySet());
+      LOG.info("Missing source files: {}", onlyInSrc);
+      return null;
+    }
+
+    return Pair.of(srcFiles, destFiles);
+  }
+
+  @Test
+  public void testStreamingParquetImport()
+      throws InterruptedException, TimeoutException, IOException {
+    // start with a table that does not exist
+
+    String parquetDir = format("%s/%s/", WAREHOUSE, dirName);
+    String tempDir = format("%s/%s-tmp/", WAREHOUSE, dirName);
+
+    // let the add files pipeline run in the background
+    PipelineResult addFilesPipeline = startAddFilesListener(dirName);
+
+    // before writing, confirm the destination table still does not exist
+    assertFalse(catalog.tableExists(destTableId));
+
+    // write some parquet files
+    LOG.info("Writing records to the parquet dir");
+    Pipeline q = Pipeline.create();
+    org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(ROW_SCHEMA);
+    q.apply(Create.of(TEST_ROWS))
+        .setRowSchema(ROW_SCHEMA)
+        .apply(
+            MapElements.into(TypeDescriptor.of(GenericRecord.class))
+                .via(AvroUtils.getRowToGenericRecordFunction(avroSchema)))
+        .setCoder(AvroCoder.of(avroSchema))
+        .apply(
+            FileIO.<String, GenericRecord>writeDynamic()
+                .by(
+                    record ->
+                        format("%s-%s-%s", record.get("id"), 
record.get("name"), record.get("age")))
+                .via(ParquetIO.sink(avroSchema))
+                .withNaming(name -> defaultNaming(name, ".parquet"))
+                .withTempDirectory(tempDir)
+                .to(parquetDir)
+                .withDestinationCoder(StringUtf8Coder.of()));
+    q.run().waitUntilFinish();
+
+    GcsUtil gcsUtil = 
TestPipeline.testingPipelineOptions().as(GcsOptions.class).getGcsUtil();
+
+    Iterable<StorageObject> objects =
+        gcsUtil.listObjects(WAREHOUSE.replace("gs://", ""), dirName, 
null).getItems();
+    List<String> writtenFilePaths =
+        Lists.newArrayList(objects).stream()
+            .map(o -> format("gs://%s/%s", o.getBucket(), o.getName()))
+            .collect(Collectors.toList());
+    LOG.info("Written file paths: {}", writtenFilePaths);
+
+    // check that the destination table has been created
+    Awaitility.await()
+        .atMost(java.time.Duration.ofMinutes(1))
+        .pollInterval(java.time.Duration.ofSeconds(5))
+        .until(() -> catalog.tableExists(destTableId));
+    LOG.info("Destination table has been created");
+
+    LOG.info("Checking if all source files have been registered in the 
destination table");
+    Awaitility.await()
+        .atMost(java.time.Duration.ofMinutes(2))
+        .pollInterval(java.time.Duration.ofSeconds(5))
+        .until(() -> checkTableHasRegisteredParquetFiles(writtenFilePaths));
+    LOG.info(
+        "Destination table has registered all source files ({} files).", 
writtenFilePaths.size());
+
+    // safe to cancel the AddFiles pipeline now
+    LOG.info("Canceling AddFiles listener.");
+    addFilesPipeline.cancel();
+
+    // check all records are there
+    checkRecordsInDestinationTable();
+  }
+
+  @Test
+  public void testBatchParquetImport() throws IOException {
+    // start with a table that does not exist
+
+    String parquetDir = format("%s/%s/", WAREHOUSE, dirName);
+    String tempDir = format("%s/%s-tmp/", WAREHOUSE, dirName);
+
+    // write some parquet files
+    LOG.info("Writing records to the parquet dir");
+    Pipeline q = Pipeline.create();
+    org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(ROW_SCHEMA);
+    q.apply(Create.of(TEST_ROWS))
+        .setRowSchema(ROW_SCHEMA)
+        .apply(
+            MapElements.into(TypeDescriptor.of(GenericRecord.class))
+                .via(AvroUtils.getRowToGenericRecordFunction(avroSchema)))
+        .setCoder(AvroCoder.of(avroSchema))
+        .apply(
+            FileIO.<String, GenericRecord>writeDynamic()
+                .by(
+                    record ->
+                        format("%s-%s-%s", record.get("id"), 
record.get("name"), record.get("age")))
+                .via(ParquetIO.sink(avroSchema))
+                .withNaming(name -> defaultNaming(name, ".parquet"))
+                .withTempDirectory(tempDir)
+                .to(parquetDir)
+                .withDestinationCoder(StringUtf8Coder.of()));
+    q.run().waitUntilFinish();
+
+    GcsUtil gcsUtil = 
TestPipeline.testingPipelineOptions().as(GcsOptions.class).getGcsUtil();
+
+    Iterable<StorageObject> objects =
+        gcsUtil.listObjects(WAREHOUSE.replace("gs://", ""), dirName, 
null).getItems();
+    List<String> writtenFilePaths =
+        Lists.newArrayList(objects).stream()
+            .map(o -> format("gs://%s/%s", o.getBucket(), o.getName()))
+            .collect(Collectors.toList());
+    LOG.info("Written file paths: {}", writtenFilePaths);
+
+    // before adding, confirm the destination table still does not exist
+    assertFalse(catalog.tableExists(destTableId));
+
+    // run batch AddFiles
+    Pipeline p = Pipeline.create();
+    PCollectionRowTuple tuple =
+        p.apply(Create.of(writtenFilePaths))
+            .apply(
+                new AddFiles(
+                    
IcebergCatalogConfig.builder().setCatalogProperties(BIGLAKE_PROPS).build(),
+                    namespace + "." + destTableName,
+                    null,
+                    PARTITION_FIELDS,
+                    TABLE_PROPS,
+                    10,
+                    Duration.standardSeconds(10)));
+    PAssert.that(tuple.get("errors")).empty();
+    p.run().waitUntilFinish();
+
+    // check that the destination table has been created
+    assertTrue(catalog.tableExists(destTableId));
+    LOG.info("Destination table has been created");
+
+    LOG.info("Checking if all source files have been registered in the 
destination table");
+    assertTrue(checkTableHasRegisteredParquetFiles(writtenFilePaths));
+    LOG.info(
+        "Destination table has registered all source files ({} files).", 
writtenFilePaths.size());
+
+    // check all records are there
+    checkRecordsInDestinationTable();
+  }
+
+  private void checkRecordsInDestinationTable() {
+    Pipeline s = Pipeline.create();
+    PCollection<Row> destRows =
+        s.apply(
+                Managed.read(Managed.ICEBERG)
+                    .withConfig(
+                        ImmutableMap.of(
+                            "table", destTableId.toString(), 
"catalog_properties", BIGLAKE_PROPS)))
+            .getSinglePCollection();
+    PAssert.that(destRows).containsInAnyOrder(TEST_ROWS);
+    s.run().waitUntilFinish();
+  }
+
+  private boolean checkTableHasRegisteredParquetFiles(List<String> 
parquetFiles) {
+    Table destTable = catalog.loadTable(destTableId);
+
+    int numRegisteredFiles = 0;
+    for (Snapshot snapshot : destTable.snapshots()) {
+      numRegisteredFiles += 
Iterables.size(snapshot.addedDataFiles(destTable.io()));
+    }
+    LOG.info(
+        "Number of source files: {}, Number of registered destination files: 
{}",
+        parquetFiles.size(),
+        numRegisteredFiles);
+    return numRegisteredFiles == parquetFiles.size();
+  }
+
+  private PipelineResult startAddFilesListener(String filter) throws 
InterruptedException {
+    DirectOptions options = 
TestPipeline.testingPipelineOptions().as(DirectOptions.class);
+    options.setBlockOnRun(false);
+    Pipeline p = Pipeline.create(options);
+
+    PCollectionRowTuple tuple =
+        p.apply(PubsubIO.readStrings().fromTopic(notificationsTopic))
+            .apply(JsonToRow.withSchema(NOTIFICATION_SCHEMA))
+            .apply(Filter.by(row -> row.getString("name").contains(filter)))
+            .apply(
+                MapElements.into(strings())
+                    .via(
+                        row ->
+                            format("gs://%s/%s", row.getString("bucket"), 
row.getString("name"))))
+            .apply(Deduplicate.values())
+            .apply(
+                new AddFiles(
+                    
IcebergCatalogConfig.builder().setCatalogProperties(BIGLAKE_PROPS).build(),
+                    namespace + "." + destTableName,
+                    null,
+                    PARTITION_FIELDS,
+                    TABLE_PROPS,
+                    10,
+                    Duration.standardSeconds(10)));
+    PAssert.that(tuple.get("errors")).empty();
+    PipelineResult result = p.run();
+
+    LOG.info(
+        "Started running the AddFiles listener pipeline. Waiting for 10s to 
allow it enough time to setup");
+    Thread.sleep(10_000);
+    return result;
+  }
+}
diff --git 
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/AddFilesTest.java
 
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/AddFilesTest.java
new file mode 100644
index 00000000000..287b9140e00
--- /dev/null
+++ 
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/AddFilesTest.java
@@ -0,0 +1,655 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg;
+
+import static 
org.apache.beam.sdk.io.iceberg.AddFiles.ConvertToDataFile.PREFIX_ERROR;
+import static 
org.apache.beam.sdk.io.iceberg.AddFiles.ConvertToDataFile.UNKNOWN_PARTITION_ERROR;
+import static 
org.apache.beam.sdk.io.iceberg.AddFiles.ConvertToDataFile.getPartitionFromMetrics;
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.hasEntry;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestStream;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.PartitionData;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.io.DataWriter;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.mapping.MappingUtil;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.SerializableFunction;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class AddFilesTest {
+  @Rule public TemporaryFolder temp = new TemporaryFolder();
+  private String root;
+  @Rule public TestPipeline pipeline = TestPipeline.create();
+
+  private HadoopCatalog catalog;
+  private TableIdentifier tableId;
+  private final org.apache.iceberg.Schema icebergSchema =
+      new org.apache.iceberg.Schema(
+          Types.NestedField.required(1, "id", Types.IntegerType.get()),
+          Types.NestedField.required(2, "name", Types.StringType.get()),
+          Types.NestedField.required(3, "age", Types.IntegerType.get()));
+  private final List<String> partitionFields = Arrays.asList("age", 
"truncate(name, 3)");
+  private final PartitionSpec spec = 
PartitionUtils.toPartitionSpec(partitionFields, icebergSchema);
+  private final PartitionKey wrapper = new PartitionKey(spec, icebergSchema);
+  private final Map<String, String> tableProps =
+      ImmutableMap.of("write.metadata.metrics.default", "full", "foo", "bar");
+  private IcebergCatalogConfig catalogConfig;
+  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+  @Rule public TestName testName = new TestName();
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  @Rule
+  public transient TestDataWarehouse warehouse = new 
TestDataWarehouse(TEMPORARY_FOLDER, "default");
+
+  @Before
+  public void setup() throws Exception {
+    // Root for existing data files:
+    root = temp.getRoot().getAbsolutePath() + "/";
+
+    // Set up a local Hadoop Catalog
+    catalog = new HadoopCatalog(new Configuration(), warehouse.location);
+    tableId = TableIdentifier.of("default", testName.getMethodName());
+
+    catalogConfig =
+        IcebergCatalogConfig.builder()
+            .setCatalogProperties(
+                ImmutableMap.of("type", "hadoop", "warehouse", 
warehouse.location))
+            .build();
+  }
+
+  @Test
+  public void testAddPartitionedFiles() throws Exception {
+    testAddFilesWithPartitionPath(true);
+  }
+
+  @Test
+  public void testAddUnPartitionedFiles() throws Exception {
+    testAddFilesWithPartitionPath(false);
+  }
+
+  public void testAddFilesWithPartitionPath(boolean isPartitioned) throws 
Exception {
+    // 1. Generate two local Parquet file.
+    // Include Hive-like partition path if testing partition case
+    String partitionPath1 = isPartitioned ? "age=20/name_trunc=Mar/" : "";
+    String file1 = root + partitionPath1 + "data1.parquet";
+    wrapper.wrap(record(-1, "Mar", 20));
+    DataWriter<Record> writer = createWriter(file1, isPartitioned ? 
wrapper.copy() : null);
+    writer.write(record(1, "Mark", 20));
+    writer.write(record(2, "Martin", 20));
+    writer.close();
+
+    String partitionPath2 = isPartitioned ? "age=25/name_trunc=Sam/" : "";
+    String file2 = root + partitionPath2 + "data2.parquet";
+    wrapper.wrap(record(-1, "Sam", 25));
+    DataWriter<Record> writer2 = createWriter(file2, isPartitioned ? 
wrapper.copy() : null);
+    writer2.write(record(3, "Samantha", 25));
+    writer2.write(record(4, "Sammy", 25));
+    writer2.close();
+
+    // 2. Setup the input PCollection
+    PCollection<String> inputFiles = pipeline.apply("Create Input", 
Create.of(file1, file2));
+
+    // 3. Apply the transform (Trigger aggressively for testing)
+    PCollectionRowTuple output =
+        inputFiles.apply(
+            new AddFiles(
+                catalogConfig,
+                tableId.toString(),
+                isPartitioned ? root : null,
+                isPartitioned ? partitionFields : null,
+                tableProps,
+                2, // trigger at 2 files
+                Duration.standardSeconds(10)));
+
+    // 4. Validate PCollection Outputs
+    PAssert.that(output.get("errors")).empty();
+
+    // 5. Run the pipeline
+    pipeline.run().waitUntilFinish();
+
+    // 6. Validate the Iceberg Table was created with the correct spec and 
properties
+    Table table = catalog.loadTable(tableId);
+    tableProps.forEach((key, value) -> assertThat(table.properties(), 
hasEntry(key, value)));
+    assertEquals(isPartitioned ? spec : PartitionSpec.unpartitioned(), 
table.spec());
+
+    // Check that we have exactly 1 snapshot with 2 files
+    assertEquals(1, Iterables.size(table.snapshots()));
+
+    List<DataFile> addedFiles =
+        Lists.newArrayList(table.currentSnapshot().addedDataFiles(table.io()));
+    assertEquals(2, addedFiles.size());
+
+    // Verify file paths
+    assertTrue(addedFiles.stream().anyMatch(df -> 
df.location().contains("data1.parquet")));
+    assertTrue(addedFiles.stream().anyMatch(df -> 
df.location().contains("data2.parquet")));
+
+    // check metrics metadata is preserved
+    DataFile writtenDf1 = writer.toDataFile();
+    DataFile writtenDf2 = writer2.toDataFile();
+    DataFile addedDf1 =
+        Iterables.getOnlyElement(
+            addedFiles.stream()
+                .filter(df -> df.location().contains("data1.parquet"))
+                .collect(Collectors.toList()));
+    DataFile addedDf2 =
+        Iterables.getOnlyElement(
+            addedFiles.stream()
+                .filter(df -> df.location().contains("data2.parquet"))
+                .collect(Collectors.toList()));
+
+    assertEquals(writtenDf1.lowerBounds(), addedDf1.lowerBounds());
+    assertEquals(writtenDf1.upperBounds(), addedDf1.upperBounds());
+    assertEquals(writtenDf2.lowerBounds(), addedDf2.lowerBounds());
+    assertEquals(writtenDf2.upperBounds(), addedDf2.upperBounds());
+
+    // check partition metadata is preserved
+    assertEquals(writtenDf1.partition(), addedDf1.partition());
+    assertEquals(writtenDf2.partition(), addedDf2.partition());
+  }
+
+  @Test
+  public void testAddFilesWithPartitionFromMetrics() throws IOException {
+    // 1. Generate local Parquet files with no directory structure.
+    String file1 = root + "data1.parquet";
+    DataWriter<Record> writer = createWriter(file1);
+    writer.write(record(1, "Mark", 20));
+    writer.write(record(2, "Martin", 20));
+    writer.close();
+    PartitionData expectedPartition1 = new PartitionData(spec.partitionType());
+    expectedPartition1.set(0, 20);
+    expectedPartition1.set(1, "Mar");
+
+    String file2 = root + "data2.parquet";
+    DataWriter<Record> writer2 = createWriter(file2);
+    writer2.write(record(3, "Samantha", 25));
+    writer2.write(record(4, "Sammy", 25));
+    writer2.close();
+    PartitionData expectedPartition2 = new PartitionData(spec.partitionType());
+    expectedPartition2.set(0, 25);
+    expectedPartition2.set(1, "Sam");
+
+    // Also create a "bad" DataFile, containing values that correspond to 
different partitions
+    // This file should get output to the DLQ, because we cannot determine its 
partition
+    String file3 = root + "data3.parquet";
+    DataWriter<Record> writer3 = createWriter(file3);
+    writer3.write(record(5, "Johnny", 25));
+    writer3.write(record(6, "Yaseen", 32));
+    writer3.close();
+
+    // 2. Setup the input PCollection
+    PCollection<String> inputFiles = pipeline.apply("Create Input", 
Create.of(file1, file2, file3));
+
+    // 3. Apply the transform (Trigger aggressively for testing)
+    PCollectionRowTuple output =
+        inputFiles.apply(
+            new AddFiles(
+                catalogConfig,
+                tableId.toString(),
+                null, // no prefix, so determine partition from DF metrics
+                partitionFields,
+                tableProps,
+                2, // trigger at 2 files
+                Duration.standardSeconds(10)));
+
+    // 4. There should be an error for File3, because its partition could not 
be determined
+    PAssert.that(output.get("errors"))
+        .satisfies(
+            errorRows -> {
+              Row errorRow = Iterables.getOnlyElement(errorRows);
+              checkState(
+                  errorRow.getSchema().equals(AddFiles.ERROR_SCHEMA)
+                      && file3.equals(errorRow.getString(0))
+                      && checkStateNotNull(errorRow.getString(1))
+                          .startsWith(UNKNOWN_PARTITION_ERROR));
+              return null;
+            });
+
+    // 5. Run the pipeline
+    pipeline.run().waitUntilFinish();
+
+    // 6. Validate the Iceberg Table was created with the correct spec and 
properties
+    Table table = catalog.loadTable(tableId);
+    tableProps.forEach((key, value) -> assertThat(table.properties(), 
hasEntry(key, value)));
+    assertEquals(spec, table.spec());
+
+    // Check that we have exactly 1 snapshot with 2 files
+    assertEquals(1, Iterables.size(table.snapshots()));
+
+    List<DataFile> addedFiles =
+        Lists.newArrayList(table.currentSnapshot().addedDataFiles(table.io()));
+    assertEquals(2, addedFiles.size());
+
+    // Verify file paths
+    assertTrue(addedFiles.stream().anyMatch(df -> 
df.location().contains("data1.parquet")));
+    assertTrue(addedFiles.stream().anyMatch(df -> 
df.location().contains("data2.parquet")));
+
+    // check metrics metadata is preserved
+    DataFile writtenDf1 = writer.toDataFile();
+    DataFile writtenDf2 = writer2.toDataFile();
+    DataFile addedDf1 =
+        Iterables.getOnlyElement(
+            addedFiles.stream()
+                .filter(df -> df.location().contains("data1.parquet"))
+                .collect(Collectors.toList()));
+    DataFile addedDf2 =
+        Iterables.getOnlyElement(
+            addedFiles.stream()
+                .filter(df -> df.location().contains("data2.parquet"))
+                .collect(Collectors.toList()));
+
+    assertEquals(writtenDf1.lowerBounds(), addedDf1.lowerBounds());
+    assertEquals(writtenDf1.upperBounds(), addedDf1.upperBounds());
+    assertEquals(writtenDf2.lowerBounds(), addedDf2.lowerBounds());
+    assertEquals(writtenDf2.upperBounds(), addedDf2.upperBounds());
+
+    // check partition metadata is preserved
+    assertEquals(expectedPartition1, addedDf1.partition());
+    assertEquals(expectedPartition2, addedDf2.partition());
+  }
+
+  @Test
+  public void testStreamingAdds() throws IOException {
+    List<String> paths = new ArrayList<>();
+    for (int i = 0; i < 100; i++) {
+      String file = String.format("%sdata_%s.parquet", root, i);
+      DataWriter<Record> writer = createWriter(file);
+      writer.write(record(1, "SomeName", 30));
+      writer.close();
+      paths.add(file);
+    }
+
+    PCollection<String> files =
+        pipeline.apply(
+            TestStream.create(StringUtf8Coder.of())
+                .addElements(
+                    paths.get(0),
+                    paths.subList(1, 15).toArray(new String[] {})) // should 
commit twice
+                .advanceProcessingTime(Duration.standardSeconds(10))
+                .addElements(
+                    paths.get(15),
+                    paths.subList(16, 40).toArray(new String[] {})) // should 
commit 3 times
+                .advanceProcessingTime(Duration.standardSeconds(10))
+                .addElements(
+                    paths.get(40),
+                    paths.subList(41, 45).toArray(new String[] {})) // should 
commit once
+                .advanceWatermarkToInfinity());
+
+    files.apply(
+        new AddFiles(
+            catalogConfig,
+            tableId.toString(),
+            null,
+            null,
+            null,
+            10, // trigger at 10 files
+            Duration.standardSeconds(5)));
+    pipeline.run().waitUntilFinish();
+
+    Table table = catalog.loadTable(tableId);
+
+    List<Snapshot> snapshots = Lists.newArrayList(table.snapshots());
+    snapshots.sort(Comparator.comparingLong(Snapshot::timestampMillis));
+
+    assertEquals(6, snapshots.size());
+    assertEquals(10, 
Iterables.size(snapshots.get(0).addedDataFiles(table.io())));
+    assertEquals(5, 
Iterables.size(snapshots.get(1).addedDataFiles(table.io())));
+    assertEquals(10, 
Iterables.size(snapshots.get(2).addedDataFiles(table.io())));
+    assertEquals(10, 
Iterables.size(snapshots.get(3).addedDataFiles(table.io())));
+    assertEquals(5, 
Iterables.size(snapshots.get(4).addedDataFiles(table.io())));
+    assertEquals(5, 
Iterables.size(snapshots.get(5).addedDataFiles(table.io())));
+  }
+
+  @Test
+  public void testUnknownFormatErrors() throws Exception {
+    catalog.createTable(tableId, icebergSchema);
+    // Create a dummy text file (unsupported extension)
+    File txtFile = temp.newFile("unsupported.txt");
+    txtFile.createNewFile();
+
+    PCollection<String> inputFiles =
+        pipeline.apply("Create Input", Create.of(txtFile.getAbsolutePath()));
+
+    AddFiles addFiles = new AddFiles(catalogConfig, tableId.toString(), null, 
null, null, 1, null);
+    PCollectionRowTuple outputTuple = inputFiles.apply(addFiles);
+
+    // Validate the file ended up in the errors PCollection with the correct 
schema
+    PAssert.that(outputTuple.get("errors"))
+        .containsInAnyOrder(
+            Row.withSchema(AddFiles.ERROR_SCHEMA)
+                .addValues(txtFile.getAbsolutePath(), "Could not determine the 
file's format")
+                .build());
+
+    pipeline.run().waitUntilFinish();
+  }
+
+  @Test
+  public void testPartitionPrefixErrors() throws Exception {
+    // Drop unpartitioned table and create a partitioned one
+    catalog.dropTable(tableId);
+    PartitionSpec spec = 
PartitionSpec.builderFor(icebergSchema).identity("name").build();
+    catalog.createTable(tableId, icebergSchema, spec);
+
+    String file1 = root + "data1.parquet";
+    wrapper.wrap(record(-1, "And", 30));
+    DataWriter<Record> writer = createWriter(file1, wrapper.copy());
+    writer.write(record(1, "Andrew", 30));
+    writer.close();
+
+    PCollection<String> inputFiles = pipeline.apply("Create Input", 
Create.of(file1));
+
+    // Notice locationPrefix is "some/prefix/" but the absolute path doesn't 
start with it
+    AddFiles addFiles =
+        new AddFiles(catalogConfig, tableId.toString(), "some/prefix/", null, 
null, 1, null);
+    PCollectionRowTuple outputTuple = inputFiles.apply(addFiles);
+
+    PAssert.that(outputTuple.get("errors"))
+        .containsInAnyOrder(
+            Row.withSchema(AddFiles.ERROR_SCHEMA).addValues(file1, 
PREFIX_ERROR).build());
+
+    pipeline.run().waitUntilFinish();
+  }
+
+  @Test
+  public void testRecognizesBucketPartitionMismatch() throws IOException {
+    catalog.dropTable(tableId);
+
+    String file1 = root + "data1.parquet";
+    wrapper.wrap(record(-1, "And", 30));
+    DataWriter<Record> writer = createWriter(file1, wrapper.copy());
+    writer.write(record(1, "Andrew", 30));
+    writer.write(record(5, "Sally", 30));
+    writer.write(record(10, "Ahmed", 30));
+    writer.close();
+
+    // 1 (min) and 10 (max) will transform to bucket=0
+    // 5 (some middle value) transforms to bucket=1
+    // To prove this transform value mapping^, below is a sanity check.
+    // We should recognize that we cannot assign a partition to such a file, 
and pass it to DLQ.
+    List<String> partitionFields = Arrays.asList("bucket(id, 2)", "age");
+    PartitionSpec spec = PartitionUtils.toPartitionSpec(partitionFields, 
icebergSchema);
+    PartitionField bucketPartition = spec.fields().get(0);
+    assertEquals("id_bucket", bucketPartition.name());
+    assertTrue(bucketPartition.transform().toString().contains("bucket["));
+    SerializableFunction<Long, Integer> transformFunc =
+        (SerializableFunction<Long, Integer>)
+            bucketPartition.transform().bind(Types.LongType.get());
+    assertEquals(0, (int) transformFunc.apply(1L));
+    assertEquals(1, (int) transformFunc.apply(5L));
+    assertEquals(0, (int) transformFunc.apply(10L));
+
+    AddFiles addFiles =
+        new AddFiles(catalogConfig, tableId.toString(), null, partitionFields, 
null, 1, null);
+    PCollection<String> inputFiles = pipeline.apply("Create Input", 
Create.of(file1));
+    PCollectionRowTuple outputTuple = inputFiles.apply(addFiles);
+
+    PAssert.that(outputTuple.get("errors"))
+        .containsInAnyOrder(
+            Row.withSchema(AddFiles.ERROR_SCHEMA)
+                .addValues(
+                    file1,
+                    UNKNOWN_PARTITION_ERROR
+                        + "Found records with conflicting transformed values, 
for column: id")
+                .build());
+    pipeline.run().waitUntilFinish();
+  }
+
+  @Test
+  public void testGetPartitionFromMetrics() throws IOException, 
InterruptedException {
+    PartitionSpec partitionSpec =
+        PartitionSpec.builderFor(icebergSchema)
+            .bucket("id", 2)
+            .truncate("name", 4)
+            .identity("age")
+            .build();
+
+    List<PartitionTestCase> testCases =
+        Arrays.asList(
+            PartitionTestCase.of(
+                root + "data_1.parquet",
+                record(1, "aaaa", 10),
+                Arrays.asList(
+                    record(1, "aaaa123", 10),
+                    record(10, "aaaa789", 10),
+                    record(100, "aaaa456", 10)),
+                Arrays.asList(1, CharBuffer.wrap("aaaa123"), 10),
+                Arrays.asList(100, CharBuffer.wrap("aaaa789"), 10),
+                "id_bucket=0/name_trunc=aaaa/age=10"),
+            PartitionTestCase.of(
+                root + "data_2.parquet",
+                record(1, "bbbb", 30),
+                Arrays.asList(
+                    record(5, "bbbb789", 30),
+                    record(55, "bbbb456", 30),
+                    record(500, "bbbb123", 30)),
+                Arrays.asList(5, CharBuffer.wrap("bbbb123"), 30),
+                Arrays.asList(500, CharBuffer.wrap("bbbb789"), 30),
+                "id_bucket=1/name_trunc=bbbb/age=30"));
+
+    PartitionKey pk = new PartitionKey(partitionSpec, icebergSchema);
+    MetricsConfig metricsConfig = MetricsConfig.fromProperties(tableProps);
+    Table table = catalog.createTable(tableId, icebergSchema, partitionSpec);
+
+    for (PartitionTestCase caze : testCases) {
+      List<Record> records = caze.records;
+      String fileName = caze.fileName;
+      pk.wrap(caze.partition);
+      DataWriter<Record> writer = createWriter(fileName, pk.copy());
+
+      for (Record record : records) {
+        writer.write(record);
+      }
+      writer.close();
+      InputFile file = table.io().newInputFile(fileName);
+
+      Metrics metrics =
+          AddFiles.getFileMetrics(
+              file, FileFormat.PARQUET, metricsConfig, 
MappingUtil.create(icebergSchema));
+      for (int i = 0; i < partitionSpec.fields().size(); i++) {
+        PartitionField partitionField = partitionSpec.fields().get(i);
+        Types.NestedField field = 
icebergSchema.findField(partitionField.sourceId());
+        ByteBuffer lowerBytes = metrics.lowerBounds().get(field.fieldId());
+        ByteBuffer upperBytes = metrics.upperBounds().get(field.fieldId());
+
+        Object lower = Conversions.fromByteBuffer(field.type(), lowerBytes);
+        Object upper = Conversions.fromByteBuffer(field.type(), upperBytes);
+
+        assertEquals(caze.expectedLower.get(i), lower);
+        assertEquals(caze.expectedUpper.get(i), upper);
+      }
+
+      String partitionPath = getPartitionFromMetrics(metrics, file, table);
+      assertEquals(caze.expectedPartition, partitionPath);
+    }
+  }
+
+  @Test
+  public void testThrowPartitionMismatchError() throws IOException, 
InterruptedException {
+    PartitionSpec partitionSpec =
+        PartitionSpec.builderFor(icebergSchema)
+            .bucket("id", 2)
+            .truncate("name", 4)
+            .identity("age")
+            .build();
+
+    List<PartitionTestCase> testCases =
+        Arrays.asList(
+            PartitionTestCase.of(
+                root + "data_1.parquet",
+                record(1, "aaaa", 10),
+                Arrays.asList(
+                    record(1, "aaaa123", 10), record(10, "abab", 10), 
record(100, "aaaa789", 10)),
+                Arrays.asList(1, CharBuffer.wrap("aaaa123"), 10),
+                Arrays.asList(100, CharBuffer.wrap("abab"), 10),
+                "error"),
+            PartitionTestCase.of(
+                root + "data_2.parquet",
+                record(1, "bbbb", 30),
+                Arrays.asList(
+                    record(5, "bbbb", 30), record(55, "bbbb", 30), record(500, 
"bbbb", 50)),
+                Arrays.asList(5, CharBuffer.wrap("bbbb"), 30),
+                Arrays.asList(500, CharBuffer.wrap("bbbb"), 50),
+                "error"));
+
+    PartitionKey pk = new PartitionKey(partitionSpec, icebergSchema);
+    MetricsConfig metricsConfig = MetricsConfig.fromProperties(tableProps);
+    Table table = catalog.createTable(tableId, icebergSchema, partitionSpec);
+
+    for (PartitionTestCase caze : testCases) {
+      List<Record> records = caze.records;
+      String fileName = caze.fileName;
+      pk.wrap(caze.partition);
+      DataWriter<Record> writer = createWriter(fileName, pk.copy());
+
+      for (Record record : records) {
+        writer.write(record);
+      }
+      writer.close();
+      InputFile file = table.io().newInputFile(fileName);
+
+      Metrics metrics =
+          AddFiles.getFileMetrics(
+              file, FileFormat.PARQUET, metricsConfig, 
MappingUtil.create(icebergSchema));
+      // check that lower/upper stats are still fetched correctly
+      for (int i = 0; i < partitionSpec.fields().size(); i++) {
+        PartitionField partitionField = partitionSpec.fields().get(i);
+        Types.NestedField field = 
icebergSchema.findField(partitionField.sourceId());
+        ByteBuffer lowerBytes = metrics.lowerBounds().get(field.fieldId());
+        ByteBuffer upperBytes = metrics.upperBounds().get(field.fieldId());
+
+        Object lower = Conversions.fromByteBuffer(field.type(), lowerBytes);
+        Object upper = Conversions.fromByteBuffer(field.type(), upperBytes);
+
+        assertEquals(caze.expectedLower.get(i), lower);
+        assertEquals(caze.expectedUpper.get(i), upper);
+      }
+
+      assertThrows(
+          AddFiles.UnknownPartitionException.class,
+          () -> getPartitionFromMetrics(metrics, file, table));
+    }
+  }
+
+  static class PartitionTestCase {
+    String fileName;
+    StructLike partition;
+    List<Record> records;
+    List<Object> expectedLower;
+    List<Object> expectedUpper;
+    String expectedPartition;
+
+    PartitionTestCase(
+        String fileName,
+        StructLike partition,
+        List<Record> records,
+        List<Object> expectedLower,
+        List<Object> expectedUpper,
+        String expectedPartition) {
+      this.fileName = fileName;
+      this.partition = partition;
+      this.records = records;
+      this.expectedLower = expectedLower;
+      this.expectedUpper = expectedUpper;
+      this.expectedPartition = expectedPartition;
+    }
+
+    static PartitionTestCase of(
+        String fileName,
+        StructLike partition,
+        List<Record> records,
+        List<Object> expectedLower,
+        List<Object> expectedUpper,
+        String expectedPartition) {
+      return new PartitionTestCase(
+          fileName, partition, records, expectedLower, expectedUpper, 
expectedPartition);
+    }
+  }
+
+  private DataWriter<Record> createWriter(String file) throws IOException {
+    return createWriter(file, null);
+  }
+
+  private DataWriter<Record> createWriter(String file, @Nullable StructLike 
partition)
+      throws IOException {
+    return Parquet.writeData(Files.localOutput(file))
+        .schema(icebergSchema)
+        .withSpec(partition != null ? spec : PartitionSpec.unpartitioned())
+        .withPartition(partition)
+        .createWriterFunc(GenericParquetWriter::create)
+        .build();
+  }
+
+  private Record record(int id, String name, int age) {
+    return GenericRecord.create(icebergSchema).copy("id", id, "name", name, 
"age", age);
+  }
+}
diff --git 
a/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
 
b/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
index e6e4e27d74f..d365cd5f7c7 100644
--- 
a/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
+++ 
b/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
@@ -971,10 +971,10 @@ public class ParquetIO {
       }
     }
 
-    private static class BeamParquetInputFile implements InputFile {
+    public static class BeamParquetInputFile implements InputFile {
       private final SeekableByteChannel seekableByteChannel;
 
-      BeamParquetInputFile(SeekableByteChannel seekableByteChannel) {
+      public BeamParquetInputFile(SeekableByteChannel seekableByteChannel) {
         this.seekableByteChannel = seekableByteChannel;
       }
 


Reply via email to