This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 17198be0313 fix: prevent NoSuchElementException in AppendFilesToTables (#37217)
17198be0313 is described below

commit 17198be0313932021a9fca621e2273ab7cef380d
Author: liferoad <[email protected]>
AuthorDate: Fri Jan 16 05:31:14 2026 -0500

    fix: prevent NoSuchElementException in AppendFilesToTables (#37217)
---
 .../beam/sdk/io/iceberg/AppendFilesToTables.java   | 33 +++++---
 .../beam/sdk/io/iceberg/SerializableDataFile.java  |  2 +-
 .../sdk/io/iceberg/AppendFilesToTablesTest.java    | 98 ++++++++++++++++++++++
 3 files changed, 121 insertions(+), 12 deletions(-)
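
Context for the fix below: the crash came from assuming the table's latest
snapshot always added at least one data file. After a delete-only commit (the
case exercised by the new test), Snapshot#addedDataFiles returns an empty
iterable, so the old unguarded iterator().next() call threw
java.util.NoSuchElementException. A minimal sketch of the guarded lookup,
assuming Iceberg 1.7+ (where ContentFile#location() exists); the helper name
is illustrative, not Beam's exact code:

    import java.util.Iterator;
    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.Table;

    class SnapshotPaths {
      // Returns the path of one data file added by the latest commit, or null
      // when that commit (e.g. a delete-only commit) added no data files.
      static String sampleCommittedPath(Table table) {
        Iterable<DataFile> added = table.currentSnapshot().addedDataFiles(table.io());
        Iterator<DataFile> it = added.iterator();
        // Calling it.next() without this hasNext() guard is exactly what threw
        // NoSuchElementException before this fix.
        return it.hasNext() ? it.next().location() : null;
      }
    }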

diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java
index 1789932d69a..db95c670385 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java
@@ -135,7 +135,8 @@ class AppendFilesToTables
       }
 
       // vast majority of the time, we will simply append data files.
-      // in the rare case we get a batch that contains multiple partition specs, we will group
+      // in the rare case we get a batch that contains multiple partition specs, we
+      // will group
       // data into manifest files and append.
       // note: either way, we must use a single commit operation for atomicity.
       if (containsMultiplePartitionSpecs(fileWriteResults)) {
@@ -163,11 +164,14 @@ class AppendFilesToTables
       update.commit();
     }
 
-    // When a user updates their table partition spec during runtime, we can end up with
-    // a batch of files where some are written with the old spec and some are written with the new
+    // When a user updates their table partition spec during runtime, we can end up
+    // with
+    // a batch of files where some are written with the old spec and some are
+    // written with the new
     // spec.
     // A table commit is limited to a single partition spec.
-    // To handle this, we create a manifest file for each partition spec, and group data files
+    // To handle this, we create a manifest file for each partition spec, and group
+    // data files
     // accordingly.
     // Afterward, we append all manifests using a single commit operation.
     private void appendManifestFiles(Table table, Iterable<FileWriteResult> fileWriteResults)
@@ -211,14 +215,18 @@ class AppendFilesToTables
       return ManifestFiles.write(spec, io.newOutputFile(location));
     }
 
-    // If the process call fails immediately after a successful commit, it gets retried with
+    // If the process call fails immediately after a successful commit, it gets
+    // retried with
     // the same data, possibly leading to data duplication.
-    // To mitigate, we skip the current batch of files if it matches the most recently committed
+    // To mitigate, we skip the current batch of files if it matches the most
+    // recently committed
     // batch.
     //
-    // TODO(ahmedabu98): This does not cover concurrent writes from other pipelines, where the
-    //  "last successful snapshot" might reflect commits from other sources. Ideally, we would make
-    //  this stateful, but that is update incompatible.
+    // TODO(ahmedabu98): This does not cover concurrent writes from other pipelines,
+    // where the
+    // "last successful snapshot" might reflect commits from other sources. Ideally,
+    // we would make
+    // this stateful, but that is update incompatible.
     // TODO(ahmedabu98): add load test pipelines with intentional periodic crashing
     private boolean shouldSkip(Table table, Iterable<FileWriteResult> fileWriteResults) {
       if (table.currentSnapshot() == null) {
@@ -231,8 +239,11 @@ class AppendFilesToTables
       // Check if the current batch is identical to the most recently committed batch.
       // Upstream GBK means we always get the same batch of files on retry,
       // so a single overlapping file means the whole batch is identical.
-      String sampleCommittedDataFilePath =
-          table.currentSnapshot().addedDataFiles(table.io()).iterator().next().path().toString();
+      Iterable<DataFile> addedDataFiles = table.currentSnapshot().addedDataFiles(table.io());
+      if (!addedDataFiles.iterator().hasNext()) {
+        return false;
+      }
+      String sampleCommittedDataFilePath = addedDataFiles.iterator().next().location().toString();
       for (FileWriteResult result : fileWriteResults) {
         if (result.getSerializableDataFile().getPath().equals(sampleCommittedDataFilePath)) {
           return true;
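
The comments in this hunk describe the multi-spec path: group data files by
the partition spec they were written with, write one manifest per spec, and
append all manifests in a single atomic commit. A hedged sketch of that idea
against the Iceberg API (structure and names are illustrative, not Beam's
actual implementation):

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.iceberg.AppendFiles;
    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.ManifestFiles;
    import org.apache.iceberg.ManifestWriter;
    import org.apache.iceberg.PartitionSpec;
    import org.apache.iceberg.Table;

    class MultiSpecAppend {
      static void appendAcrossSpecs(Table table, Iterable<DataFile> files) throws IOException {
        // One manifest writer per partition spec id seen in the batch.
        Map<Integer, ManifestWriter<DataFile>> writers = new HashMap<>();
        for (DataFile file : files) {
          // Group each data file under the spec it was written with.
          PartitionSpec spec = table.specs().get(file.specId());
          writers
              .computeIfAbsent(
                  file.specId(),
                  id ->
                      ManifestFiles.write(
                          spec,
                          table.io().newOutputFile(
                              table.locationProvider().newDataLocation("manifest-" + id + ".avro"))))
              .add(file);
        }
        // A single commit keeps the append atomic across all specs.
        AppendFiles append = table.newAppend();
        for (ManifestWriter<DataFile> writer : writers.values()) {
          writer.close();
          append.appendManifest(writer.toManifestFile());
        }
        append.commit();
      }
    }

As a side note on the committed guard above: it calls
addedDataFiles(...).iterator() twice; holding a single Iterator in a local
would avoid scanning the added files a second time, though behavior is the
same either way.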
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SerializableDataFile.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SerializableDataFile.java
index 5c994c3e565..f54cef16c15 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SerializableDataFile.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SerializableDataFile.java
@@ -141,7 +141,7 @@ abstract class SerializableDataFile {
   static SerializableDataFile from(DataFile f, String partitionPath) {
 
     return SerializableDataFile.builder()
-        .setPath(f.path().toString())
+        .setPath(f.location().toString())
         .setFileFormat(f.format().toString())
         .setRecordCount(f.recordCount())
         .setFileSizeInBytes(f.fileSizeInBytes())
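
The one-line change above tracks an Iceberg API migration: to the best of my
knowledge, ContentFile#path() (returning CharSequence) is deprecated in recent
Iceberg releases (1.7+) in favor of ContentFile#location() (returning String),
so the trailing .toString() on the new call is effectively a no-op kept for
symmetry. A quick illustration, assuming a DataFile named f:

    String viaPath = f.path().toString(); // deprecated accessor, CharSequence
    String viaLocation = f.location();    // preferred accessor, already a String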
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTablesTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTablesTest.java
new file mode 100644
index 00000000000..c4709256b4d
--- /dev/null
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTablesTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.DeleteFiles;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.IcebergGenerics;
+import org.apache.iceberg.data.Record;
+import org.hamcrest.Matchers;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class AppendFilesToTablesTest implements Serializable {
+
+  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+  @Rule
+  public transient TestDataWarehouse warehouse = new TestDataWarehouse(TEMPORARY_FOLDER, "default");
+
+  @Test
+  public void testAppendAfterDelete() throws Exception {
+    TableIdentifier tableId =
+        TableIdentifier.of("default", "table" + Long.toString(UUID.randomUUID().hashCode(), 16));
+
+    Map<String, String> catalogProps =
+        ImmutableMap.<String, String>builder()
+            .put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
+            .put("warehouse", warehouse.location)
+            .build();
+
+    IcebergCatalogConfig catalog =
+        IcebergCatalogConfig.builder()
+            .setCatalogName("name")
+            .setCatalogProperties(catalogProps)
+            .build();
+
+    // 1. Create table and write some data using first pipeline
+    Pipeline p1 = Pipeline.create(PipelineOptionsFactory.create());
+    p1.apply("Records To Add", Create.of(TestFixtures.asRows(TestFixtures.FILE1SNAPSHOT1)))
+        .setRowSchema(IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA))
+        .apply("Append To Table", IcebergIO.writeRows(catalog).to(tableId));
+
+    p1.run().waitUntilFinish();
+
+    // 2. Delete the data
+    Table table = warehouse.loadTable(tableId);
+    DeleteFiles delete = table.newDelete();
+    // Delete all data files in the current snapshot
+    table.currentSnapshot().addedDataFiles(table.io()).forEach(delete::deleteFile);
+    delete.commit();
+
+    // 3. Write more data using a fresh second pipeline
+    Pipeline p2 = Pipeline.create(PipelineOptionsFactory.create());
+    p2.apply("More Records To Add", Create.of(TestFixtures.asRows(TestFixtures.FILE1SNAPSHOT2)))
+        .setRowSchema(IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA))
+        .apply("Append More To Table", IcebergIO.writeRows(catalog).to(tableId));
+
+    p2.run().waitUntilFinish();
+
+    // Verify data - after delete and append, only FILE1SNAPSHOT2 should be present
+    table.refresh();
+    List<Record> writtenRecords = ImmutableList.copyOf(IcebergGenerics.read(table).build());
+    assertThat(writtenRecords, Matchers.containsInAnyOrder(TestFixtures.FILE1SNAPSHOT2.toArray()));
+  }
+}
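
To run just this new test locally, a Gradle filter along these lines should
work (module path inferred from the file's location in the Beam repo):

    ./gradlew :sdks:java:io:iceberg:test --tests 'org.apache.beam.sdk.io.iceberg.AppendFilesToTablesTest'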
