This is an automated email from the ASF dual-hosted git repository.

russellspitzer pushed a commit to branch 0.13.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/0.13.x by this push:
     new 0168ec31c Spark: Backport CommitStateUnknownException handling for RewriteManifestSparkAction (#4850) (#4854)
0168ec31c is described below

commit 0168ec31c3f097d5c20ab877fda103b871b6da12
Author: Eduard Tudenhöfner <[email protected]>
AuthorDate: Tue May 24 20:50:24 2022 +0200

    Spark: Backport CommitStateUnknownException handling for RewriteManifestSparkAction (#4850) (#4854)
    
    
    Co-authored-by: Prashant Singh <[email protected]>
---
 .../actions/BaseRewriteManifestsSparkAction.java   |  4 ++
 .../spark/actions/TestRewriteManifestsAction.java  | 69 ++++++++++++++++++++++
 .../actions/BaseRewriteManifestsSparkAction.java   |  4 ++
 .../spark/actions/TestRewriteManifestsAction.java  | 69 ++++++++++++++++++++++
 .../actions/BaseRewriteManifestsSparkAction.java   |  4 ++
 .../spark/actions/TestRewriteManifestsAction.java  | 69 ++++++++++++++++++++++
 6 files changed, 219 insertions(+)

diff --git a/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java b/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java
index 05a845102..f887fedd4 100644
--- a/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java
+++ b/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java
@@ -40,6 +40,7 @@ import org.apache.iceberg.TableOperations;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.actions.BaseRewriteManifestsActionResult;
 import org.apache.iceberg.actions.RewriteManifests;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.OutputFile;
@@ -295,6 +296,9 @@ public class BaseRewriteManifestsSparkAction
         // delete new manifests as they were rewritten before the commit
         deleteFiles(Iterables.transform(addedManifests, ManifestFile::path));
       }
+    } catch (CommitStateUnknownException commitStateUnknownException) {
+      // don't clean up added manifest files, because they may have been successfully committed.
+      throw commitStateUnknownException;
     } catch (Exception e) {
       // delete all new manifests because the rewrite failed
       deleteFiles(Iterables.transform(addedManifests, ManifestFile::path));
diff --git a/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java b/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
index ec5c54d21..40adb7d4c 100644
--- a/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
+++ b/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
@@ -31,6 +32,7 @@ import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.actions.RewriteManifests;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -52,6 +54,9 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
 
 @RunWith(Parameterized.class)
 public class TestRewriteManifestsAction extends SparkTestBase {
@@ -157,6 +162,70 @@ public class TestRewriteManifestsAction extends SparkTestBase {
     Assert.assertEquals("Rows must match", expectedRecords, actualRecords);
   }
 
+  @Test
+  public void testRewriteManifestsWithCommitStateUnknownException() {
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Map<String, String> options = Maps.newHashMap();
+    options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled);
+    Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
+
+    List<ThreeColumnRecord> records1 = Lists.newArrayList(
+        new ThreeColumnRecord(1, null, "AAAA"),
+        new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB")
+    );
+    writeRecords(records1);
+
+    List<ThreeColumnRecord> records2 = Lists.newArrayList(
+        new ThreeColumnRecord(2, "CCCCCCCCCC", "CCCC"),
+        new ThreeColumnRecord(2, "DDDDDDDDDD", "DDDD")
+    );
+    writeRecords(records2);
+
+    table.refresh();
+
+    List<ManifestFile> manifests = table.currentSnapshot().allManifests();
+    Assert.assertEquals("Should have 2 manifests before rewrite", 2, manifests.size());
+
+    SparkActions actions = SparkActions.get();
+
+    // create a spy which would throw a CommitStateUnknownException after successful commit.
+    org.apache.iceberg.RewriteManifests newRewriteManifests = table.rewriteManifests();
+    org.apache.iceberg.RewriteManifests spyNewRewriteManifests = spy(newRewriteManifests);
+    doAnswer(invocation -> {
+      newRewriteManifests.commit();
+      throw new CommitStateUnknownException(new RuntimeException("Datacenter on Fire"));
+    }).when(spyNewRewriteManifests).commit();
+
+    Table spyTable = spy(table);
+    when(spyTable.rewriteManifests()).thenReturn(spyNewRewriteManifests);
+
+    AssertHelpers.assertThrowsCause("Should throw a Commit State Unknown Exception",
+        RuntimeException.class,
+        "Datacenter on Fire",
+        () -> actions.rewriteManifests(spyTable).rewriteIf(manifest -> true).execute());
+
+    table.refresh();
+
+    // table should reflect the changes, since the commit was successful
+    List<ManifestFile> newManifests = table.currentSnapshot().allManifests();
+    Assert.assertEquals("Should have 1 manifests after rewrite", 1, newManifests.size());
+
+    Assert.assertEquals(4, (long) newManifests.get(0).existingFilesCount());
+    Assert.assertFalse(newManifests.get(0).hasAddedFiles());
+    Assert.assertFalse(newManifests.get(0).hasDeletedFiles());
+
+    List<ThreeColumnRecord> expectedRecords = Lists.newArrayList();
+    expectedRecords.addAll(records1);
+    expectedRecords.addAll(records2);
+
+    Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
+    List<ThreeColumnRecord> actualRecords = resultDF.sort("c1", "c2")
+        .as(Encoders.bean(ThreeColumnRecord.class))
+        .collectAsList();
+
+    Assert.assertEquals("Rows must match", expectedRecords, actualRecords);
+  }
+
   @Test
   public void testRewriteSmallManifestsPartitionedTable() {
     PartitionSpec spec = PartitionSpec.builderFor(SCHEMA)
diff --git a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java
index 05a845102..f887fedd4 100644
--- a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java
+++ b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java
@@ -40,6 +40,7 @@ import org.apache.iceberg.TableOperations;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.actions.BaseRewriteManifestsActionResult;
 import org.apache.iceberg.actions.RewriteManifests;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.OutputFile;
@@ -295,6 +296,9 @@ public class BaseRewriteManifestsSparkAction
         // delete new manifests as they were rewritten before the commit
         deleteFiles(Iterables.transform(addedManifests, ManifestFile::path));
       }
+    } catch (CommitStateUnknownException commitStateUnknownException) {
+      // don't clean up added manifest files, because they may have been successfully committed.
+      throw commitStateUnknownException;
     } catch (Exception e) {
       // delete all new manifests because the rewrite failed
       deleteFiles(Iterables.transform(addedManifests, ManifestFile::path));
diff --git a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
index ec5c54d21..40adb7d4c 100644
--- a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
+++ b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
@@ -31,6 +32,7 @@ import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.actions.RewriteManifests;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -52,6 +54,9 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
 
 @RunWith(Parameterized.class)
 public class TestRewriteManifestsAction extends SparkTestBase {
@@ -157,6 +162,70 @@ public class TestRewriteManifestsAction extends SparkTestBase {
     Assert.assertEquals("Rows must match", expectedRecords, actualRecords);
   }
 
+  @Test
+  public void testRewriteManifestsWithCommitStateUnknownException() {
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Map<String, String> options = Maps.newHashMap();
+    options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled);
+    Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
+
+    List<ThreeColumnRecord> records1 = Lists.newArrayList(
+        new ThreeColumnRecord(1, null, "AAAA"),
+        new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB")
+    );
+    writeRecords(records1);
+
+    List<ThreeColumnRecord> records2 = Lists.newArrayList(
+        new ThreeColumnRecord(2, "CCCCCCCCCC", "CCCC"),
+        new ThreeColumnRecord(2, "DDDDDDDDDD", "DDDD")
+    );
+    writeRecords(records2);
+
+    table.refresh();
+
+    List<ManifestFile> manifests = table.currentSnapshot().allManifests();
+    Assert.assertEquals("Should have 2 manifests before rewrite", 2, manifests.size());
+
+    SparkActions actions = SparkActions.get();
+
+    // create a spy which would throw a CommitStateUnknownException after successful commit.
+    org.apache.iceberg.RewriteManifests newRewriteManifests = table.rewriteManifests();
+    org.apache.iceberg.RewriteManifests spyNewRewriteManifests = spy(newRewriteManifests);
+    doAnswer(invocation -> {
+      newRewriteManifests.commit();
+      throw new CommitStateUnknownException(new RuntimeException("Datacenter on Fire"));
+    }).when(spyNewRewriteManifests).commit();
+
+    Table spyTable = spy(table);
+    when(spyTable.rewriteManifests()).thenReturn(spyNewRewriteManifests);
+
+    AssertHelpers.assertThrowsCause("Should throw a Commit State Unknown Exception",
+        RuntimeException.class,
+        "Datacenter on Fire",
+        () -> actions.rewriteManifests(spyTable).rewriteIf(manifest -> true).execute());
+
+    table.refresh();
+
+    // table should reflect the changes, since the commit was successful
+    List<ManifestFile> newManifests = table.currentSnapshot().allManifests();
+    Assert.assertEquals("Should have 1 manifests after rewrite", 1, newManifests.size());
+
+    Assert.assertEquals(4, (long) newManifests.get(0).existingFilesCount());
+    Assert.assertFalse(newManifests.get(0).hasAddedFiles());
+    Assert.assertFalse(newManifests.get(0).hasDeletedFiles());
+
+    List<ThreeColumnRecord> expectedRecords = Lists.newArrayList();
+    expectedRecords.addAll(records1);
+    expectedRecords.addAll(records2);
+
+    Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
+    List<ThreeColumnRecord> actualRecords = resultDF.sort("c1", "c2")
+        .as(Encoders.bean(ThreeColumnRecord.class))
+        .collectAsList();
+
+    Assert.assertEquals("Rows must match", expectedRecords, actualRecords);
+  }
+
   @Test
   public void testRewriteSmallManifestsPartitionedTable() {
     PartitionSpec spec = PartitionSpec.builderFor(SCHEMA)
diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java
index 05a845102..f887fedd4 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java
@@ -40,6 +40,7 @@ import org.apache.iceberg.TableOperations;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.actions.BaseRewriteManifestsActionResult;
 import org.apache.iceberg.actions.RewriteManifests;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.OutputFile;
@@ -295,6 +296,9 @@ public class BaseRewriteManifestsSparkAction
         // delete new manifests as they were rewritten before the commit
         deleteFiles(Iterables.transform(addedManifests, ManifestFile::path));
       }
+    } catch (CommitStateUnknownException commitStateUnknownException) {
+      // don't clean up added manifest files, because they may have been successfully committed.
+      throw commitStateUnknownException;
     } catch (Exception e) {
       // delete all new manifests because the rewrite failed
       deleteFiles(Iterables.transform(addedManifests, ManifestFile::path));
diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
index ec5c54d21..40adb7d4c 100644
--- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
+++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
@@ -31,6 +32,7 @@ import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.actions.RewriteManifests;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -52,6 +54,9 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
 
 @RunWith(Parameterized.class)
 public class TestRewriteManifestsAction extends SparkTestBase {
@@ -157,6 +162,70 @@ public class TestRewriteManifestsAction extends SparkTestBase {
     Assert.assertEquals("Rows must match", expectedRecords, actualRecords);
   }
 
+  @Test
+  public void testRewriteManifestsWithCommitStateUnknownException() {
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Map<String, String> options = Maps.newHashMap();
+    options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled);
+    Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
+
+    List<ThreeColumnRecord> records1 = Lists.newArrayList(
+        new ThreeColumnRecord(1, null, "AAAA"),
+        new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB")
+    );
+    writeRecords(records1);
+
+    List<ThreeColumnRecord> records2 = Lists.newArrayList(
+        new ThreeColumnRecord(2, "CCCCCCCCCC", "CCCC"),
+        new ThreeColumnRecord(2, "DDDDDDDDDD", "DDDD")
+    );
+    writeRecords(records2);
+
+    table.refresh();
+
+    List<ManifestFile> manifests = table.currentSnapshot().allManifests();
+    Assert.assertEquals("Should have 2 manifests before rewrite", 2, manifests.size());
+
+    SparkActions actions = SparkActions.get();
+
+    // create a spy which would throw a CommitStateUnknownException after successful commit.
+    org.apache.iceberg.RewriteManifests newRewriteManifests = table.rewriteManifests();
+    org.apache.iceberg.RewriteManifests spyNewRewriteManifests = spy(newRewriteManifests);
+    doAnswer(invocation -> {
+      newRewriteManifests.commit();
+      throw new CommitStateUnknownException(new RuntimeException("Datacenter on Fire"));
+    }).when(spyNewRewriteManifests).commit();
+
+    Table spyTable = spy(table);
+    when(spyTable.rewriteManifests()).thenReturn(spyNewRewriteManifests);
+
+    AssertHelpers.assertThrowsCause("Should throw a Commit State Unknown Exception",
+        RuntimeException.class,
+        "Datacenter on Fire",
+        () -> actions.rewriteManifests(spyTable).rewriteIf(manifest -> true).execute());
+
+    table.refresh();
+
+    // table should reflect the changes, since the commit was successful
+    List<ManifestFile> newManifests = table.currentSnapshot().allManifests();
+    Assert.assertEquals("Should have 1 manifests after rewrite", 1, newManifests.size());
+
+    Assert.assertEquals(4, (long) newManifests.get(0).existingFilesCount());
+    Assert.assertFalse(newManifests.get(0).hasAddedFiles());
+    Assert.assertFalse(newManifests.get(0).hasDeletedFiles());
+
+    List<ThreeColumnRecord> expectedRecords = Lists.newArrayList();
+    expectedRecords.addAll(records1);
+    expectedRecords.addAll(records2);
+
+    Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
+    List<ThreeColumnRecord> actualRecords = resultDF.sort("c1", "c2")
+        .as(Encoders.bean(ThreeColumnRecord.class))
+        .collectAsList();
+
+    Assert.assertEquals("Rows must match", expectedRecords, actualRecords);
+  }
+
   @Test
   public void testRewriteSmallManifestsPartitionedTable() {
     PartitionSpec spec = PartitionSpec.builderFor(SCHEMA)

Reply via email to