This is an automated email from the ASF dual-hosted git repository.
ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 17198be0313 fix: prevent NoSuchElementException in AppendFilesToTables (#37217)
17198be0313 is described below
commit 17198be0313932021a9fca621e2273ab7cef380d
Author: liferoad <[email protected]>
AuthorDate: Fri Jan 16 05:31:14 2026 -0500
fix: prevent NoSuchElementException in AppendFilesToTables (#37217)
---
.../beam/sdk/io/iceberg/AppendFilesToTables.java | 33 +++++---
.../beam/sdk/io/iceberg/SerializableDataFile.java | 2 +-
.../sdk/io/iceberg/AppendFilesToTablesTest.java | 98 ++++++++++++++++++++++
3 files changed, 121 insertions(+), 12 deletions(-)
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java
index 1789932d69a..db95c670385 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java
@@ -135,7 +135,8 @@ class AppendFilesToTables
}
// vast majority of the time, we will simply append data files.
- // in the rare case we get a batch that contains multiple partition specs, we will group
+ // in the rare case we get a batch that contains multiple partition specs, we
+ // will group
// data into manifest files and append.
// note: either way, we must use a single commit operation for atomicity.
if (containsMultiplePartitionSpecs(fileWriteResults)) {
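For context, the fast path referenced in the comment boils down to Iceberg's AppendFiles API. A minimal sketch of that pattern (the class and method names here are invented for illustration; this is not the transform's actual code):

    import org.apache.iceberg.AppendFiles;
    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.Table;

    class AppendBatchSketch {
      // Appends every data file in a batch through one Iceberg commit,
      // so the whole batch becomes visible atomically.
      static void appendAll(Table table, Iterable<DataFile> dataFiles) {
        AppendFiles update = table.newAppend();
        for (DataFile file : dataFiles) {
          update.appendFile(file);
        }
        update.commit();
      }
    }
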
@@ -163,11 +164,14 @@ class AppendFilesToTables
update.commit();
}
- // When a user updates their table partition spec during runtime, we can end up with
- // a batch of files where some are written with the old spec and some are written with the new
+ // When a user updates their table partition spec during runtime, we can end up
+ // with
+ // a batch of files where some are written with the old spec and some are
+ // written with the new
// spec.
// A table commit is limited to a single partition spec.
- // To handle this, we create a manifest file for each partition spec, and group data files
+ // To handle this, we create a manifest file for each partition spec, and group
+ // data files
// accordingly.
// Afterward, we append all manifests using a single commit operation.
private void appendManifestFiles(Table table, Iterable<FileWriteResult> fileWriteResults)
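The manifest-based path this comment describes can be sketched roughly as follows, grouping files by their spec ID before committing. This is an illustrative approximation under assumed names (the class name and the manifest file location scheme are invented here), not the method's actual body:

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.UUID;
    import org.apache.iceberg.AppendFiles;
    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.ManifestFiles;
    import org.apache.iceberg.ManifestWriter;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.io.OutputFile;

    class AppendMultiSpecSketch {
      // Groups data files by the partition spec they were written with, writes
      // one manifest per spec, then appends all manifests in a single commit.
      static void appendAcrossSpecs(Table table, Iterable<DataFile> dataFiles) throws IOException {
        Map<Integer, List<DataFile>> bySpecId = new HashMap<>();
        for (DataFile file : dataFiles) {
          bySpecId.computeIfAbsent(file.specId(), id -> new ArrayList<>()).add(file);
        }
        AppendFiles update = table.newAppend();
        for (Map.Entry<Integer, List<DataFile>> entry : bySpecId.entrySet()) {
          // illustrative manifest location; the real transform derives its own path
          OutputFile out = table.io().newOutputFile(
              table.location() + "/metadata/" + UUID.randomUUID() + ".avro");
          ManifestWriter<DataFile> writer = ManifestFiles.write(table.specs().get(entry.getKey()), out);
          try {
            entry.getValue().forEach(writer::add);
          } finally {
            writer.close();
          }
          update.appendManifest(writer.toManifestFile());
        }
        update.commit(); // still one commit, preserving atomicity across specs
      }
    }
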
@@ -211,14 +215,18 @@ class AppendFilesToTables
return ManifestFiles.write(spec, io.newOutputFile(location));
}
- // If the process call fails immediately after a successful commit, it gets retried with
+ // If the process call fails immediately after a successful commit, it gets
+ // retried with
// the same data, possibly leading to data duplication.
- // To mitigate, we skip the current batch of files if it matches the most recently committed
+ // To mitigate, we skip the current batch of files if it matches the most
+ // recently committed
// batch.
//
- // TODO(ahmedabu98): This does not cover concurrent writes from other pipelines, where the
- // "last successful snapshot" might reflect commits from other sources. Ideally, we would make
- // this stateful, but that is update incompatible.
+ // TODO(ahmedabu98): This does not cover concurrent writes from other pipelines,
+ // where the
+ // "last successful snapshot" might reflect commits from other sources. Ideally,
+ // we would make
+ // this stateful, but that is update incompatible.
// TODO(ahmedabu98): add load test pipelines with intentional periodic crashing
private boolean shouldSkip(Table table, Iterable<FileWriteResult> fileWriteResults) {
if (table.currentSnapshot() == null) {
@@ -231,8 +239,11 @@ class AppendFilesToTables
// Check if the current batch is identical to the most recently committed batch.
// Upstream GBK means we always get the same batch of files on retry,
// so a single overlapping file means the whole batch is identical.
- String sampleCommittedDataFilePath =
- table.currentSnapshot().addedDataFiles(table.io()).iterator().next().path().toString();
+ Iterable<DataFile> addedDataFiles = table.currentSnapshot().addedDataFiles(table.io());
+ if (!addedDataFiles.iterator().hasNext()) {
+ return false;
+ }
+ String sampleCommittedDataFilePath = addedDataFiles.iterator().next().location().toString();
for (FileWriteResult result : fileWriteResults) {
if (result.getSerializableDataFile().getPath().equals(sampleCommittedDataFilePath)) {
return true;
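The guarded lookup above is the heart of the fix: when the latest snapshot added no data files (for example, after a delete-only commit), the old unguarded iterator().next() threw NoSuchElementException. A minimal sketch of the defensive pattern, with a hypothetical helper name:

    import java.util.Iterator;
    import org.apache.iceberg.DataFile;

    class FirstAddedFileSketch {
      // Returns the first added file's location, or null when the iterable is
      // empty, instead of letting next() throw NoSuchElementException.
      static String firstLocationOrNull(Iterable<DataFile> addedDataFiles) {
        Iterator<DataFile> it = addedDataFiles.iterator();
        return it.hasNext() ? it.next().location() : null;
      }
    }
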
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SerializableDataFile.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SerializableDataFile.java
index 5c994c3e565..f54cef16c15 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SerializableDataFile.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SerializableDataFile.java
@@ -141,7 +141,7 @@ abstract class SerializableDataFile {
static SerializableDataFile from(DataFile f, String partitionPath) {
return SerializableDataFile.builder()
- .setPath(f.path().toString())
+ .setPath(f.location().toString())
.setFileFormat(f.format().toString())
.setRecordCount(f.recordCount())
.setFileSizeInBytes(f.fileSizeInBytes())
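The one-line change above moves off ContentFile#path(), which returns a CharSequence and is deprecated in recent Iceberg releases, to location(), which returns a String; it also keeps the stored path consistent with the location()-based comparison in shouldSkip. Roughly, under that assumption:

    import org.apache.iceberg.DataFile;

    class LocationSketch {
      // Both expressions yield the same file URI for a given DataFile `f`;
      // location() simply avoids the deprecated CharSequence-returning accessor.
      static String pathOf(DataFile f) {
        // before: f.path().toString()
        return f.location();
      }
    }
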
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTablesTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTablesTest.java
new file mode 100644
index 00000000000..c4709256b4d
--- /dev/null
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTablesTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.DeleteFiles;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.IcebergGenerics;
+import org.apache.iceberg.data.Record;
+import org.hamcrest.Matchers;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class AppendFilesToTablesTest implements Serializable {
+
+ @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+ @Rule
+ public transient TestDataWarehouse warehouse = new TestDataWarehouse(TEMPORARY_FOLDER, "default");
+
+ @Test
+ public void testAppendAfterDelete() throws Exception {
+ TableIdentifier tableId =
+ TableIdentifier.of("default", "table" + Long.toString(UUID.randomUUID().hashCode(), 16));
+
+ Map<String, String> catalogProps =
+ ImmutableMap.<String, String>builder()
+ .put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
+ .put("warehouse", warehouse.location)
+ .build();
+
+ IcebergCatalogConfig catalog =
+ IcebergCatalogConfig.builder()
+ .setCatalogName("name")
+ .setCatalogProperties(catalogProps)
+ .build();
+
+ // 1. Create table and write some data using first pipeline
+ Pipeline p1 = Pipeline.create(PipelineOptionsFactory.create());
+ p1.apply("Records To Add", Create.of(TestFixtures.asRows(TestFixtures.FILE1SNAPSHOT1)))
+ .setRowSchema(IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA))
+ .apply("Append To Table", IcebergIO.writeRows(catalog).to(tableId));
+
+ p1.run().waitUntilFinish();
+
+ // 2. Delete the data
+ Table table = warehouse.loadTable(tableId);
+ DeleteFiles delete = table.newDelete();
+ // Delete all data files in the current snapshot
+ table.currentSnapshot().addedDataFiles(table.io()).forEach(delete::deleteFile);
+ delete.commit();
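+ // Note: this delete-only commit produces a snapshot with no added data files,
+ // which is exactly the state that used to make shouldSkip throw
+ // NoSuchElementException on the next append.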
+
+ // 3. Write more data using a fresh second pipeline
+ Pipeline p2 = Pipeline.create(PipelineOptionsFactory.create());
+ p2.apply("More Records To Add", Create.of(TestFixtures.asRows(TestFixtures.FILE1SNAPSHOT2)))
+ .setRowSchema(IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA))
+ .apply("Append More To Table", IcebergIO.writeRows(catalog).to(tableId));
+
+ p2.run().waitUntilFinish();
+
+ // Verify data - after delete and append, only FILE1SNAPSHOT2 should be present
+ table.refresh();
+ List<Record> writtenRecords = ImmutableList.copyOf(IcebergGenerics.read(table).build());
+ assertThat(writtenRecords, Matchers.containsInAnyOrder(TestFixtures.FILE1SNAPSHOT2.toArray()));
+ }
+}