This is an automated email from the ASF dual-hosted git repository.

amoghj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 70b7aa534b API, Core, Spark: Change behavior of fastForward/replace to create the from branch if it does not exist (#9196)
70b7aa534b is described below

commit 70b7aa534b2c79ccd7b6c0e0fd1be980772bb20a
Author: Amogh Jahagirdar <[email protected]>
AuthorDate: Mon Jan 22 14:56:43 2024 -0800

    API, Core, Spark: Change behavior of fastForward/replace to create the from branch if it does not exist (#9196)
---
 .../java/org/apache/iceberg/ManageSnapshots.java   |  6 ++-
 .../iceberg/UpdateSnapshotReferencesOperation.java |  6 ++-
 .../org/apache/iceberg/TestSnapshotManager.java    | 35 ++++++++------
 .../extensions/TestFastForwardBranchProcedure.java | 54 +++++++++++++---------
 .../procedures/FastForwardBranchProcedure.java     | 17 +++----
 5 files changed, 68 insertions(+), 50 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/ManageSnapshots.java b/api/src/main/java/org/apache/iceberg/ManageSnapshots.java
index 986bbb6f58..12cd5021fa 100644
--- a/api/src/main/java/org/apache/iceberg/ManageSnapshots.java
+++ b/api/src/main/java/org/apache/iceberg/ManageSnapshots.java
@@ -164,7 +164,8 @@ public interface ManageSnapshots extends PendingUpdate<Snapshot> {
 
   /**
    * Replaces the {@code from} branch to point to the {@code to} snapshot. The {@code to} will
-   * remain unchanged, and {@code from} branch will retain its retention properties.
+   * remain unchanged, and {@code from} branch will retain its retention properties. If the {@code
+   * from} branch does not exist, it will be created with default retention properties.
    *
    * @param from Branch to replace
    * @param to The branch {@code from} should be replaced with
@@ -175,7 +176,8 @@ public interface ManageSnapshots extends PendingUpdate<Snapshot> {
   /**
    * Performs a fast-forward of {@code from} up to the {@code to} snapshot if {@code from} is an
    * ancestor of {@code to}. The {@code to} will remain unchanged, and {@code from} will retain its
-   * retention properties.
+   * retention properties. If the {@code from} branch does not exist, it will be created with
+   * default retention properties.
    *
    * @param from Branch to fast-forward
    * @param to Ref for the {@code from} branch to be fast forwarded to
diff --git a/core/src/main/java/org/apache/iceberg/UpdateSnapshotReferencesOperation.java b/core/src/main/java/org/apache/iceberg/UpdateSnapshotReferencesOperation.java
index 2c3c6c1f7e..9d15bf0ee2 100644
--- a/core/src/main/java/org/apache/iceberg/UpdateSnapshotReferencesOperation.java
+++ b/core/src/main/java/org/apache/iceberg/UpdateSnapshotReferencesOperation.java
@@ -120,9 +120,11 @@ class UpdateSnapshotReferencesOperation implements PendingUpdate<Map<String, Sna
     Preconditions.checkNotNull(to, "Destination ref cannot be null");
     SnapshotRef branchToUpdate = updatedRefs.get(from);
     SnapshotRef toRef = updatedRefs.get(to);
-    Preconditions.checkArgument(
-        branchToUpdate != null, "Branch to update does not exist: %s", from);
     Preconditions.checkArgument(toRef != null, "Ref does not exist: %s", to);
+    if (branchToUpdate == null) {
+      return createBranch(from, toRef.snapshotId());
+    }
+
     Preconditions.checkArgument(branchToUpdate.isBranch(), "Ref %s is a tag not a branch", from);
 
     // Nothing to replace
diff --git a/core/src/test/java/org/apache/iceberg/TestSnapshotManager.java b/core/src/test/java/org/apache/iceberg/TestSnapshotManager.java
index d561d697d3..fd22ae24d0 100644
--- a/core/src/test/java/org/apache/iceberg/TestSnapshotManager.java
+++ b/core/src/test/java/org/apache/iceberg/TestSnapshotManager.java
@@ -408,14 +408,6 @@ public class TestSnapshotManager extends TableTestBase {
         table.ops().refresh().ref("branch1").snapshotId(), secondSnapshot.snapshotId());
   }
 
-  @Test
-  public void testReplaceBranchNonExistingBranchToUpdateFails() {
-    Assertions.assertThatThrownBy(
-            () -> table.manageSnapshots().replaceBranch("non-existing", "other-branch").commit())
-        .isInstanceOf(IllegalArgumentException.class)
-        .hasMessage("Branch to update does not exist: non-existing");
-  }
-
   @Test
   public void testReplaceBranchNonExistingToBranchFails() {
     table.newAppend().appendFile(FILE_A).commit();
@@ -428,12 +420,27 @@ public class TestSnapshotManager extends TableTestBase {
   }
 
   @Test
-  public void testFastForwardBranchNonExistingFromBranchFails() {
-    Assertions.assertThatThrownBy(
-            () ->
-                table.manageSnapshots().fastForwardBranch("non-existing", "other-branch").commit())
-        .isInstanceOf(IllegalArgumentException.class)
-        .hasMessage("Branch to update does not exist: non-existing");
+  public void testFastForwardBranchNonExistingFromBranchCreatesTheBranch() {
+    table.newAppend().appendFile(FILE_A).commit();
+    long snapshotId = table.currentSnapshot().snapshotId();
+    table.manageSnapshots().createBranch("branch1", snapshotId).commit();
+    table.manageSnapshots().fastForwardBranch("new-branch", "branch1").commit();
+
+    Assertions.assertThat(table.ops().current().ref("new-branch").isBranch()).isTrue();
+    Assertions.assertThat(table.ops().current().ref("new-branch").snapshotId())
+        .isEqualTo(snapshotId);
+  }
+
+  @Test
+  public void testReplaceBranchNonExistingFromBranchCreatesTheBranch() {
+    table.newAppend().appendFile(FILE_A).commit();
+    long snapshotId = table.currentSnapshot().snapshotId();
+    table.manageSnapshots().createBranch("branch1", snapshotId).commit();
+    table.manageSnapshots().replaceBranch("new-branch", "branch1").commit();
+
+    Assertions.assertThat(table.ops().current().ref("new-branch").isBranch()).isTrue();
+    Assertions.assertThat(table.ops().current().ref("new-branch").snapshotId())
+        .isEqualTo(snapshotId);
   }
 
   @Test
diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestFastForwardBranchProcedure.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestFastForwardBranchProcedure.java
index 99bc862485..0c99c3e07f 100644
--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestFastForwardBranchProcedure.java
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestFastForwardBranchProcedure.java
@@ -190,21 +190,8 @@ public class TestFastForwardBranchProcedure extends SparkExtensionsTestBase {
   }
 
   @Test
-  public void testFastForwardNonExistingBranchCases() {
+  public void testFastForwardNonExistingToRefFails() {
     sql("CREATE TABLE %s (id int NOT NULL, data string) USING iceberg", tableName);
-
-    Table table = validationCatalog.loadTable(tableIdent);
-    table.refresh();
-
-    assertThatThrownBy(
-            () ->
-                sql(
-                    "CALL %s.system.fast_forward(table => '%s', branch => '%s', to => '%s')",
-                    catalogName, tableIdent, "non_existing_branch", SnapshotRef.MAIN_BRANCH))
-        .isInstanceOf(IllegalArgumentException.class)
-        .hasMessage("Branch to fast-forward does not exist: non_existing_branch");
-
-    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
     assertThatThrownBy(
             () ->
                 sql(
@@ -237,14 +224,37 @@ public class TestFastForwardBranchProcedure extends SparkExtensionsTestBase {
     sql("INSERT INTO TABLE %s VALUES (3, 'c')", tableNameWithBranch2);
     table.refresh();
     Snapshot branch2Snapshot = table.snapshot(branch2);
+    assertThat(
+            sql(
+                "CALL %s.system.fast_forward('%s', '%s', '%s')",
+                catalogName, tableIdent, branch1, branch2))
+        .containsExactly(row(branch1, branch1Snapshot.snapshotId(), branch2Snapshot.snapshotId()));
+  }
 
-    List<Object[]> output =
-        sql(
-            "CALL %s.system.fast_forward('%s', '%s', '%s')",
-            catalogName, tableIdent, branch1, branch2);
-    List<Object> outputRow = Arrays.stream(output.get(0)).collect(Collectors.toList());
-    assertThat(outputRow.get(0)).isEqualTo(branch1);
-    assertThat(outputRow.get(1)).isEqualTo(branch1Snapshot.snapshotId());
-    assertThat(outputRow.get(2)).isEqualTo(branch2Snapshot.snapshotId());
+  @Test
+  public void testFastForwardNonExistingFromMainCreatesBranch() {
+    sql("CREATE TABLE %s (id int NOT NULL, data string) USING iceberg", tableName);
+    String branch1 = "branch1";
+    sql("ALTER TABLE %s CREATE BRANCH %s", tableName, branch1);
+    String branchIdentifier = String.format("%s.branch_%s", tableName, branch1);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", branchIdentifier);
+    sql("INSERT INTO TABLE %s VALUES (2, 'b')", branchIdentifier);
+    Table table = validationCatalog.loadTable(tableIdent);
+    table.refresh();
+    Snapshot branch1Snapshot = table.snapshot(branch1);
+
+    assertThat(
+            sql(
+                "CALL %s.system.fast_forward('%s', '%s', '%s')",
+                catalogName, tableIdent, SnapshotRef.MAIN_BRANCH, branch1))
+        .containsExactly(row(SnapshotRef.MAIN_BRANCH, null, branch1Snapshot.snapshotId()));
+
+    // Ensure the same behavior for non-main branches
+    String branch2 = "branch2";
+    assertThat(
+            sql(
+                "CALL %s.system.fast_forward('%s', '%s', '%s')",
+                catalogName, tableIdent, branch2, branch1))
+        .containsExactly(row(branch2, null, branch1Snapshot.snapshotId()));
   }
 }
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/FastForwardBranchProcedure.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/FastForwardBranchProcedure.java
index 83908f284b..11ea5d44c9 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/FastForwardBranchProcedure.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/FastForwardBranchProcedure.java
@@ -18,8 +18,6 @@
  */
 package org.apache.iceberg.spark.procedures;
 
-import org.apache.iceberg.Snapshot;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.connector.catalog.Identifier;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
@@ -73,19 +71,18 @@ public class FastForwardBranchProcedure extends BaseProcedure {
   @Override
   public InternalRow[] call(InternalRow args) {
     Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
-    String source = args.getString(1);
-    String target = args.getString(2);
+    String from = args.getString(1);
+    String to = args.getString(2);
 
     return modifyIcebergTable(
         tableIdent,
         table -> {
-          Snapshot currentSnapshot = table.snapshot(source);
-          Preconditions.checkArgument(
-              currentSnapshot != null, "Branch to fast-forward does not exist: %s", source);
-          table.manageSnapshots().fastForwardBranch(source, target).commit();
-          long latest = table.snapshot(source).snapshotId();
+          Long snapshotBefore =
+              table.snapshot(from) != null ? table.snapshot(from).snapshotId() : null;
+          table.manageSnapshots().fastForwardBranch(from, to).commit();
+          long snapshotAfter = table.snapshot(from).snapshotId();
           InternalRow outputRow =
-              newInternalRow(UTF8String.fromString(source), currentSnapshot.snapshotId(), latest);
+              newInternalRow(UTF8String.fromString(from), snapshotBefore, snapshotAfter);
           return new InternalRow[] {outputRow};
         });
   }

Reply via email to