This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 40951024bfe Iceberg AddFiles -- handle FileNotFound exceptions (#37952)
40951024bfe is described below

commit 40951024bfec7f4196a35a7da1c58502901a9075
Author: Ahmed Abualsaud <[email protected]>
AuthorDate: Wed Mar 25 15:36:08 2026 -0400

    Iceberg AddFiles -- handle FileNotFound exceptions (#37952)
    
    * handle filenotfound exceptions
    
    * trigger ITs
---
 .../IO_Iceberg_Integration_Tests.json              |  2 +-
 .../org/apache/beam/sdk/io/iceberg/AddFiles.java   | 61 +++++++++++++++++-----
 .../apache/beam/sdk/io/iceberg/AddFilesTest.java   | 27 +++++++++-
 3 files changed, 74 insertions(+), 16 deletions(-)

diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
index b73af5e61a4..7ab7bcd9a9c 100644
--- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json
+++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
@@ -1,4 +1,4 @@
 {
     "comment": "Modify this file in a trivial way to cause this test suite to 
run.",
-    "modification": 1
+    "modification": 2
 }
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java
index 4a164700099..e250536382e 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java
@@ -23,6 +23,7 @@ import static org.apache.beam.sdk.metrics.Metrics.counter;
 import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
 import static org.apache.beam.sdk.values.PCollection.IsBounded.BOUNDED;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.SeekableByteChannel;
@@ -81,6 +82,7 @@ import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.mapping.MappingUtil;
@@ -240,7 +242,18 @@ public class AddFiles extends PTransform<PCollection<String>, PCollectionRowTupl
       }
 
       if (table == null) {
-        table = getOrCreateTable(getSchema(filePath, format));
+        try {
+          table = getOrCreateTable(filePath, format);
+        } catch (FileNotFoundException e) {
+          output
+              .get(ERRORS)
+              .output(
+                  Row.withSchema(ERROR_SCHEMA)
+                      .addValues(filePath, checkStateNotNull(e.getMessage()))
+                      .build());
+          numErrorFiles.inc();
+          return;
+        }
       }
 
       // Check if the file path contains the provided prefix
@@ -256,9 +269,24 @@ public class AddFiles extends PTransform<PCollection<String>, PCollectionRowTupl
 
       InputFile inputFile = table.io().newInputFile(filePath);
 
-      Metrics metrics =
-          getFileMetrics(
-              inputFile, format, MetricsConfig.forTable(table), 
MappingUtil.create(table.schema()));
+      Metrics metrics;
+      try {
+        metrics =
+            getFileMetrics(
+                inputFile,
+                format,
+                MetricsConfig.forTable(table),
+                MappingUtil.create(table.schema()));
+      } catch (FileNotFoundException e) {
+        output
+            .get(ERRORS)
+            .output(
+                Row.withSchema(ERROR_SCHEMA)
+                    .addValues(filePath, checkStateNotNull(e.getMessage()))
+                    .build());
+        numErrorFiles.inc();
+        return;
+      }
 
       // Figure out which partition this DataFile should go to
       String partitionPath;
@@ -304,16 +332,23 @@ public class AddFiles extends PTransform<PCollection<String>, PCollectionRowTupl
       return transform.bind(type).apply((W) value);
     }
 
-    private Table getOrCreateTable(org.apache.iceberg.Schema schema) {
-      PartitionSpec spec = PartitionUtils.toPartitionSpec(partitionFields, 
schema);
+    private Table getOrCreateTable(String filePath, FileFormat format) throws 
IOException {
+      TableIdentifier tableId = TableIdentifier.parse(identifier);
       try {
-        return tableProps == null
-            ? 
catalogConfig.catalog().createTable(TableIdentifier.parse(identifier), schema, 
spec)
-            : catalogConfig
-                .catalog()
-                .createTable(TableIdentifier.parse(identifier), schema, spec, 
tableProps);
-      } catch (AlreadyExistsException e) { // if table already exists, just 
load it
-        return 
catalogConfig.catalog().loadTable(TableIdentifier.parse(identifier));
+        return catalogConfig.catalog().loadTable(tableId);
+      } catch (NoSuchTableException e) {
+        try {
+          org.apache.iceberg.Schema schema = getSchema(filePath, format);
+          PartitionSpec spec = PartitionUtils.toPartitionSpec(partitionFields, 
schema);
+
+          return tableProps == null
+              ? 
catalogConfig.catalog().createTable(TableIdentifier.parse(identifier), schema, 
spec)
+              : catalogConfig
+                  .catalog()
+                  .createTable(TableIdentifier.parse(identifier), schema, 
spec, tableProps);
+        } catch (AlreadyExistsException e2) { // if table already exists, just 
load it
+          return 
catalogConfig.catalog().loadTable(TableIdentifier.parse(identifier));
+        }
       }
     }
 
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/AddFilesTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/AddFilesTest.java
index 287b9140e00..56ba36919e5 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/AddFilesTest.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/AddFilesTest.java
@@ -23,6 +23,7 @@ import static org.apache.beam.sdk.io.iceberg.AddFiles.ConvertToDataFile.getParti
 import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
 import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.hasEntry;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThrows;
@@ -422,8 +423,6 @@ public class AddFilesTest {
 
   @Test
   public void testRecognizesBucketPartitionMismatch() throws IOException {
-    catalog.dropTable(tableId);
-
     String file1 = root + "data1.parquet";
     wrapper.wrap(record(-1, "And", 30));
     DataWriter<Record> writer = createWriter(file1, wrapper.copy());
@@ -464,6 +463,30 @@ public class AddFilesTest {
     pipeline.run().waitUntilFinish();
   }
 
+  @Test
+  public void testCatchFileNotFoundException() throws IOException {
+    String file = root + "non-existent.parquet";
+
+    PCollectionRowTuple outputTuple =
+        pipeline
+            .apply("Create Input", Create.of(file))
+            .apply(new AddFiles(catalogConfig, tableId.toString(), null, null, 
null, 1, null));
+
+    PAssert.that(outputTuple.get("errors"))
+        .satisfies(
+            rows -> {
+              Row error = Iterables.getOnlyElement(rows);
+              String errorFile = error.getString("file");
+              String message = error.getString("error");
+
+              assertEquals(file, errorFile);
+              assertThat(message, containsString("No files found"));
+              assertThat(message, containsString(errorFile));
+              return null;
+            });
+    pipeline.run().waitUntilFinish();
+  }
+
   @Test
   public void testGetPartitionFromMetrics() throws IOException, 
InterruptedException {
     PartitionSpec partitionSpec =

Reply via email to