This is an automated email from the ASF dual-hosted git repository.
ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 40951024bfe Iceberg AddFiles -- handle FileNotFound exceptions (#37952)
40951024bfe is described below
commit 40951024bfec7f4196a35a7da1c58502901a9075
Author: Ahmed Abualsaud <[email protected]>
AuthorDate: Wed Mar 25 15:36:08 2026 -0400
Iceberg AddFiles -- handle FileNotFound exceptions (#37952)
* handle filenotfound exceptions
* trigger ITs
---
.../IO_Iceberg_Integration_Tests.json | 2 +-
.../org/apache/beam/sdk/io/iceberg/AddFiles.java | 61 +++++++++++++++++-----
.../apache/beam/sdk/io/iceberg/AddFilesTest.java | 27 +++++++++-
3 files changed, 74 insertions(+), 16 deletions(-)
diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
index b73af5e61a4..7ab7bcd9a9c 100644
--- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json
+++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to
run.",
- "modification": 1
+ "modification": 2
}
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java
index 4a164700099..e250536382e 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java
@@ -23,6 +23,7 @@ import static org.apache.beam.sdk.metrics.Metrics.counter;
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
import static org.apache.beam.sdk.values.PCollection.IsBounded.BOUNDED;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SeekableByteChannel;
@@ -81,6 +82,7 @@ import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.mapping.MappingUtil;
@@ -240,7 +242,18 @@ public class AddFiles extends
PTransform<PCollection<String>, PCollectionRowTupl
}
if (table == null) {
- table = getOrCreateTable(getSchema(filePath, format));
+ try {
+ table = getOrCreateTable(filePath, format);
+ } catch (FileNotFoundException e) {
+ output
+ .get(ERRORS)
+ .output(
+ Row.withSchema(ERROR_SCHEMA)
+ .addValues(filePath, checkStateNotNull(e.getMessage()))
+ .build());
+ numErrorFiles.inc();
+ return;
+ }
}
// Check if the file path contains the provided prefix
@@ -256,9 +269,24 @@ public class AddFiles extends
PTransform<PCollection<String>, PCollectionRowTupl
InputFile inputFile = table.io().newInputFile(filePath);
- Metrics metrics =
- getFileMetrics(
- inputFile, format, MetricsConfig.forTable(table),
MappingUtil.create(table.schema()));
+ Metrics metrics;
+ try {
+ metrics =
+ getFileMetrics(
+ inputFile,
+ format,
+ MetricsConfig.forTable(table),
+ MappingUtil.create(table.schema()));
+ } catch (FileNotFoundException e) {
+ output
+ .get(ERRORS)
+ .output(
+ Row.withSchema(ERROR_SCHEMA)
+ .addValues(filePath, checkStateNotNull(e.getMessage()))
+ .build());
+ numErrorFiles.inc();
+ return;
+ }
// Figure out which partition this DataFile should go to
String partitionPath;
@@ -304,16 +332,23 @@ public class AddFiles extends
PTransform<PCollection<String>, PCollectionRowTupl
return transform.bind(type).apply((W) value);
}
- private Table getOrCreateTable(org.apache.iceberg.Schema schema) {
- PartitionSpec spec = PartitionUtils.toPartitionSpec(partitionFields,
schema);
+ private Table getOrCreateTable(String filePath, FileFormat format) throws
IOException {
+ TableIdentifier tableId = TableIdentifier.parse(identifier);
try {
- return tableProps == null
- ?
catalogConfig.catalog().createTable(TableIdentifier.parse(identifier), schema,
spec)
- : catalogConfig
- .catalog()
- .createTable(TableIdentifier.parse(identifier), schema, spec,
tableProps);
- } catch (AlreadyExistsException e) { // if table already exists, just
load it
- return
catalogConfig.catalog().loadTable(TableIdentifier.parse(identifier));
+ return catalogConfig.catalog().loadTable(tableId);
+ } catch (NoSuchTableException e) {
+ try {
+ org.apache.iceberg.Schema schema = getSchema(filePath, format);
+ PartitionSpec spec = PartitionUtils.toPartitionSpec(partitionFields,
schema);
+
+ return tableProps == null
+ ?
catalogConfig.catalog().createTable(TableIdentifier.parse(identifier), schema,
spec)
+ : catalogConfig
+ .catalog()
+ .createTable(TableIdentifier.parse(identifier), schema,
spec, tableProps);
+ } catch (AlreadyExistsException e2) { // if table already exists, just
load it
+ return
catalogConfig.catalog().loadTable(TableIdentifier.parse(identifier));
+ }
}
}
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/AddFilesTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/AddFilesTest.java
index 287b9140e00..56ba36919e5 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/AddFilesTest.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/AddFilesTest.java
@@ -23,6 +23,7 @@ import static
org.apache.beam.sdk.io.iceberg.AddFiles.ConvertToDataFile.getParti
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.hasEntry;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
@@ -422,8 +423,6 @@ public class AddFilesTest {
@Test
public void testRecognizesBucketPartitionMismatch() throws IOException {
- catalog.dropTable(tableId);
-
String file1 = root + "data1.parquet";
wrapper.wrap(record(-1, "And", 30));
DataWriter<Record> writer = createWriter(file1, wrapper.copy());
@@ -464,6 +463,30 @@ public class AddFilesTest {
pipeline.run().waitUntilFinish();
}
+ @Test
+ public void testCatchFileNotFoundException() throws IOException {
+ String file = root + "non-existent.parquet";
+
+ PCollectionRowTuple outputTuple =
+ pipeline
+ .apply("Create Input", Create.of(file))
+ .apply(new AddFiles(catalogConfig, tableId.toString(), null, null,
null, 1, null));
+
+ PAssert.that(outputTuple.get("errors"))
+ .satisfies(
+ rows -> {
+ Row error = Iterables.getOnlyElement(rows);
+ String errorFile = error.getString("file");
+ String message = error.getString("error");
+
+ assertEquals(file, errorFile);
+ assertThat(message, containsString("No files found"));
+ assertThat(message, containsString(errorFile));
+ return null;
+ });
+ pipeline.run().waitUntilFinish();
+ }
+
@Test
public void testGetPartitionFromMetrics() throws IOException,
InterruptedException {
PartitionSpec partitionSpec =