This is an automated email from the ASF dual-hosted git repository.

etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 1e3b38eb73 Spark: backport #8656 and update docs (#9512)
1e3b38eb73 is described below

commit 1e3b38eb73804e7901e2382c6fdb0dd3d3a77eda
Author: Ajantha Bhat <[email protected]>
AuthorDate: Fri Jan 19 12:32:01 2024 +0530

    Spark: backport #8656 and update docs (#9512)
---
 docs/spark-procedures.md                           |  1 -
 .../TestCreateChangelogViewProcedure.java          | 72 ----------------------
 .../procedures/CreateChangelogViewProcedure.java   | 23 +------
 .../TestCreateChangelogViewProcedure.java          | 72 ----------------------
 .../procedures/CreateChangelogViewProcedure.java   | 23 +------
 .../procedures/CreateChangelogViewProcedure.java   |  4 +-
 6 files changed, 8 insertions(+), 187 deletions(-)

diff --git a/docs/spark-procedures.md b/docs/spark-procedures.md
index 45a9f80ea6..ee25de8f33 100644
--- a/docs/spark-procedures.md
+++ b/docs/spark-procedures.md
@@ -770,7 +770,6 @@ Creates a view that contains the changes from a given table.
 | `net_changes`        |           | boolean             | Whether to output net changes (see below for more information). Defaults to false. |
 | `compute_updates`    |           | boolean             | Whether to compute pre/post update images (see below for more information). Defaults to false. |
 | `identifier_columns` |           | array<string>       | The list of identifier columns to compute updates. If the argument `compute_updates` is set to true and `identifier_columns` are not provided, the table’s current identifier fields will be used. |
-| `remove_carryovers`  |           | boolean             | Whether to remove carry-over rows (see below for more information). Defaults to true. Deprecated since 1.4.0, will be removed in 1.5.0; Please query `SparkChangelogTable` to view carry-over rows. |
 
 Here is a list of commonly used Spark read options:
 * `start-snapshot-id`: the exclusive start snapshot ID. If not provided, it reads from the table’s first snapshot inclusively.
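
[Editor's illustrative sketch, not part of this commit: a minimal example of calling the procedure after the parameter removal. The catalog name `spark_catalog`, the table `db.tbl`, and the `db.tbl.changes` metadata-table read are placeholder assumptions for a Spark session already configured with the Iceberg runtime and SQL extensions.]

import org.apache.spark.sql.SparkSession;

public class ChangelogViewExample {
  public static void main(String[] args) {
    // Assumes Iceberg's Spark runtime and SQL extensions are configured.
    SparkSession spark = SparkSession.builder().getOrCreate();

    // remove_carryovers is gone from the parameter list; the resulting view
    // never contains carry-over rows.
    spark.sql("CALL spark_catalog.system.create_changelog_view(table => 'db.tbl')");

    // When carry-over rows are needed, query the changelog metadata table
    // (backed by SparkChangelogTable) directly instead of the procedure.
    spark.sql("SELECT * FROM db.tbl.changes").show();
  }
}
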
diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateChangelogViewProcedure.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateChangelogViewProcedure.java
index 015f2ad5fd..9aa4bd3d7c 100644
--- a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateChangelogViewProcedure.java
+++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateChangelogViewProcedure.java
@@ -186,41 +186,6 @@ public class TestCreateChangelogViewProcedure extends SparkExtensionsTestBase {
         sql("select * from %s order by _change_ordinal, id", returns.get(0)[0]));
   }
 
-  @Test
-  public void testWithCarryovers() {
-    createTableWithTwoColumns();
-    sql("INSERT INTO %s VALUES (1, 'a')", tableName);
-    Table table = validationCatalog.loadTable(tableIdent);
-    Snapshot snap0 = table.currentSnapshot();
-
-    sql("INSERT INTO %s VALUES (2, 'b')", tableName);
-    table.refresh();
-    Snapshot snap1 = table.currentSnapshot();
-
-    sql("INSERT OVERWRITE %s VALUES (-2, 'b'), (2, 'b'), (2, 'b')", tableName);
-    table.refresh();
-    Snapshot snap2 = table.currentSnapshot();
-
-    List<Object[]> returns =
-        sql(
-            "CALL %s.system.create_changelog_view("
-                + "remove_carryovers => false,"
-                + "table => '%s')",
-            catalogName, tableName, "cdc_view");
-
-    String viewName = (String) returns.get(0)[0];
-    assertEquals(
-        "Rows should match",
-        ImmutableList.of(
-            row(1, "a", INSERT, 0, snap0.snapshotId()),
-            row(2, "b", INSERT, 1, snap1.snapshotId()),
-            row(-2, "b", INSERT, 2, snap2.snapshotId()),
-            row(2, "b", DELETE, 2, snap2.snapshotId()),
-            row(2, "b", INSERT, 2, snap2.snapshotId()),
-            row(2, "b", INSERT, 2, snap2.snapshotId())),
-        sql("select * from %s order by _change_ordinal, id, _change_type", 
viewName));
-  }
-
   @Test
   public void testUpdate() {
     createTableWithTwoColumns();
@@ -474,41 +439,4 @@ public class TestCreateChangelogViewProcedure extends SparkExtensionsTestBase {
                 "CALL %s.system.create_changelog_view(table => '%s', identifier_columns => array('id'), net_changes => true)",
                 catalogName, tableName));
   }
-
-  @Test
-  public void testNotRemoveCarryOvers() {
-    createTableWithThreeColumns();
-
-    sql("INSERT INTO %s VALUES (1, 'a', 12), (2, 'b', 11), (2, 'e', 12)", 
tableName);
-    Table table = validationCatalog.loadTable(tableIdent);
-    Snapshot snap1 = table.currentSnapshot();
-
-    // carry-over row (2, 'e', 12)
-    sql("INSERT OVERWRITE %s VALUES (3, 'c', 13), (2, 'd', 11), (2, 'e', 12)", 
tableName);
-    table.refresh();
-    Snapshot snap2 = table.currentSnapshot();
-
-    List<Object[]> returns =
-        sql(
-            "CALL %s.system.create_changelog_view("
-                + "remove_carryovers => false,"
-                + "table => '%s')",
-            catalogName, tableName);
-
-    String viewName = (String) returns.get(0)[0];
-
-    assertEquals(
-        "Rows should match",
-        ImmutableList.of(
-            row(1, "a", 12, INSERT, 0, snap1.snapshotId()),
-            row(2, "b", 11, INSERT, 0, snap1.snapshotId()),
-            row(2, "e", 12, INSERT, 0, snap1.snapshotId()),
-            row(2, "b", 11, DELETE, 1, snap2.snapshotId()),
-            row(2, "d", 11, INSERT, 1, snap2.snapshotId()),
-            // the following two rows are carry-over rows
-            row(2, "e", 12, DELETE, 1, snap2.snapshotId()),
-            row(2, "e", 12, INSERT, 1, snap2.snapshotId()),
-            row(3, "c", 13, INSERT, 1, snap2.snapshotId())),
-        sql("select * from %s order by _change_ordinal, id, data, 
_change_type", viewName));
-  }
 }
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java
index 259254aa2d..44ec8e7193 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java
@@ -49,8 +49,8 @@ import org.apache.spark.unsafe.types.UTF8String;
 /**
  * A procedure that creates a view for changed rows.
  *
- * <p>The procedure removes the carry-over rows by default. If you want to keep them, you can set
- * "remove_carryovers" to be false in the options.
+ * <p>The procedure always removes the carry-over rows. Please query {@link SparkChangelogTable}
+ * instead when carry-over rows are required.
  *
  * <p>The procedure doesn't compute the pre/post update images by default. If you want to compute
  * them, you can set "compute_updates" to be true in the options.
@@ -91,18 +91,6 @@ public class CreateChangelogViewProcedure extends BaseProcedure {
       ProcedureParameter.optional("options", STRING_MAP);
   private static final ProcedureParameter COMPUTE_UPDATES_PARAM =
       ProcedureParameter.optional("compute_updates", DataTypes.BooleanType);
-
-  /**
-   * Enable or disable the remove carry-over rows.
-   *
-   * @deprecated since 1.4.0, will be removed in 1.5.0; The procedure will always remove carry-over
-   *     rows. Please query {@link SparkChangelogTable} instead for the use cases doesn't remove
-   *     carry-over rows.
-   */
-  @Deprecated
-  private static final ProcedureParameter REMOVE_CARRYOVERS_PARAM =
-      ProcedureParameter.optional("remove_carryovers", DataTypes.BooleanType);
-
   private static final ProcedureParameter IDENTIFIER_COLUMNS_PARAM =
       ProcedureParameter.optional("identifier_columns", STRING_ARRAY);
   private static final ProcedureParameter NET_CHANGES =
@@ -114,7 +102,6 @@ public class CreateChangelogViewProcedure extends BaseProcedure {
         CHANGELOG_VIEW_PARAM,
         OPTIONS_PARAM,
         COMPUTE_UPDATES_PARAM,
-        REMOVE_CARRYOVERS_PARAM,
         IDENTIFIER_COLUMNS_PARAM,
         NET_CHANGES,
       };
@@ -163,7 +150,7 @@ public class CreateChangelogViewProcedure extends BaseProcedure {
     if (shouldComputeUpdateImages(input)) {
       Preconditions.checkArgument(!netChanges, "Not support net changes with update images");
       df = computeUpdateImages(identifierColumns(input, tableIdent), df);
-    } else if (shouldRemoveCarryoverRows(input)) {
+    } else {
       df = removeCarryoverRows(df, netChanges);
     }
 
@@ -195,10 +182,6 @@ public class CreateChangelogViewProcedure extends BaseProcedure {
     return input.asBoolean(COMPUTE_UPDATES_PARAM, defaultValue);
   }
 
-  private boolean shouldRemoveCarryoverRows(ProcedureInput input) {
-    return input.asBoolean(REMOVE_CARRYOVERS_PARAM, true);
-  }
-
   private Dataset<Row> removeCarryoverRows(Dataset<Row> df, boolean netChanges) {
     Predicate<String> columnsToKeep;
     if (netChanges) {
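
[Editor's illustrative sketch, not part of this commit: what carry-over elimination means, as demonstrated by the removed tests above. Within a single snapshot, a DELETE and an INSERT of an identical row cancel each other. This is a conceptual model only, not Iceberg's Dataset-based implementation; the record types and the counting approach are invented for the example.]

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CarryoverSketch {
  // A changelog entry: the row values plus the snapshot that produced it.
  record RowKey(int id, String data, long snapshotId) {}

  record Change(RowKey row, String changeType) {}

  // Count INSERTs as +1 and DELETEs as -1 per identical row and snapshot.
  // Matched pairs (the carry-overs) cancel out; only the surplus survives.
  static List<Change> removeCarryovers(List<Change> changes) {
    Map<RowKey, Integer> net = new HashMap<>();
    for (Change c : changes) {
      net.merge(c.row(), "INSERT".equals(c.changeType()) ? 1 : -1, Integer::sum);
    }

    List<Change> result = new ArrayList<>();
    net.forEach((row, count) -> {
      String type = count > 0 ? "INSERT" : "DELETE";
      for (int i = 0; i < Math.abs(count); i++) {
        result.add(new Change(row, type));
      }
    });
    return result;
  }
}

[Applied to the data in the removed testNotRemoveCarryOvers, the DELETE and INSERT of (2, 'e', 12) in the second snapshot cancel out; that is exactly the pair the test used to assert on, and the procedure now always performs this cancellation.]
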
diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateChangelogViewProcedure.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateChangelogViewProcedure.java
index 015f2ad5fd..9aa4bd3d7c 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateChangelogViewProcedure.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateChangelogViewProcedure.java
@@ -186,41 +186,6 @@ public class TestCreateChangelogViewProcedure extends SparkExtensionsTestBase {
         sql("select * from %s order by _change_ordinal, id", returns.get(0)[0]));
   }
 
-  @Test
-  public void testWithCarryovers() {
-    createTableWithTwoColumns();
-    sql("INSERT INTO %s VALUES (1, 'a')", tableName);
-    Table table = validationCatalog.loadTable(tableIdent);
-    Snapshot snap0 = table.currentSnapshot();
-
-    sql("INSERT INTO %s VALUES (2, 'b')", tableName);
-    table.refresh();
-    Snapshot snap1 = table.currentSnapshot();
-
-    sql("INSERT OVERWRITE %s VALUES (-2, 'b'), (2, 'b'), (2, 'b')", tableName);
-    table.refresh();
-    Snapshot snap2 = table.currentSnapshot();
-
-    List<Object[]> returns =
-        sql(
-            "CALL %s.system.create_changelog_view("
-                + "remove_carryovers => false,"
-                + "table => '%s')",
-            catalogName, tableName, "cdc_view");
-
-    String viewName = (String) returns.get(0)[0];
-    assertEquals(
-        "Rows should match",
-        ImmutableList.of(
-            row(1, "a", INSERT, 0, snap0.snapshotId()),
-            row(2, "b", INSERT, 1, snap1.snapshotId()),
-            row(-2, "b", INSERT, 2, snap2.snapshotId()),
-            row(2, "b", DELETE, 2, snap2.snapshotId()),
-            row(2, "b", INSERT, 2, snap2.snapshotId()),
-            row(2, "b", INSERT, 2, snap2.snapshotId())),
-        sql("select * from %s order by _change_ordinal, id, _change_type", 
viewName));
-  }
-
   @Test
   public void testUpdate() {
     createTableWithTwoColumns();
@@ -474,41 +439,4 @@ public class TestCreateChangelogViewProcedure extends SparkExtensionsTestBase {
                 "CALL %s.system.create_changelog_view(table => '%s', identifier_columns => array('id'), net_changes => true)",
                 catalogName, tableName));
   }
-
-  @Test
-  public void testNotRemoveCarryOvers() {
-    createTableWithThreeColumns();
-
-    sql("INSERT INTO %s VALUES (1, 'a', 12), (2, 'b', 11), (2, 'e', 12)", 
tableName);
-    Table table = validationCatalog.loadTable(tableIdent);
-    Snapshot snap1 = table.currentSnapshot();
-
-    // carry-over row (2, 'e', 12)
-    sql("INSERT OVERWRITE %s VALUES (3, 'c', 13), (2, 'd', 11), (2, 'e', 12)", 
tableName);
-    table.refresh();
-    Snapshot snap2 = table.currentSnapshot();
-
-    List<Object[]> returns =
-        sql(
-            "CALL %s.system.create_changelog_view("
-                + "remove_carryovers => false,"
-                + "table => '%s')",
-            catalogName, tableName);
-
-    String viewName = (String) returns.get(0)[0];
-
-    assertEquals(
-        "Rows should match",
-        ImmutableList.of(
-            row(1, "a", 12, INSERT, 0, snap1.snapshotId()),
-            row(2, "b", 11, INSERT, 0, snap1.snapshotId()),
-            row(2, "e", 12, INSERT, 0, snap1.snapshotId()),
-            row(2, "b", 11, DELETE, 1, snap2.snapshotId()),
-            row(2, "d", 11, INSERT, 1, snap2.snapshotId()),
-            // the following two rows are carry-over rows
-            row(2, "e", 12, DELETE, 1, snap2.snapshotId()),
-            row(2, "e", 12, INSERT, 1, snap2.snapshotId()),
-            row(3, "c", 13, INSERT, 1, snap2.snapshotId())),
-        sql("select * from %s order by _change_ordinal, id, data, 
_change_type", viewName));
-  }
 }
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java
index 259254aa2d..44ec8e7193 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java
@@ -49,8 +49,8 @@ import org.apache.spark.unsafe.types.UTF8String;
 /**
  * A procedure that creates a view for changed rows.
  *
- * <p>The procedure removes the carry-over rows by default. If you want to keep them, you can set
- * "remove_carryovers" to be false in the options.
+ * <p>The procedure always removes the carry-over rows. Please query {@link SparkChangelogTable}
+ * instead when carry-over rows are required.
  *
  * <p>The procedure doesn't compute the pre/post update images by default. If you want to compute
  * them, you can set "compute_updates" to be true in the options.
@@ -91,18 +91,6 @@ public class CreateChangelogViewProcedure extends BaseProcedure {
       ProcedureParameter.optional("options", STRING_MAP);
   private static final ProcedureParameter COMPUTE_UPDATES_PARAM =
       ProcedureParameter.optional("compute_updates", DataTypes.BooleanType);
-
-  /**
-   * Enable or disable the remove carry-over rows.
-   *
-   * @deprecated since 1.4.0, will be removed in 1.5.0; The procedure will always remove carry-over
-   *     rows. Please query {@link SparkChangelogTable} instead for the use cases doesn't remove
-   *     carry-over rows.
-   */
-  @Deprecated
-  private static final ProcedureParameter REMOVE_CARRYOVERS_PARAM =
-      ProcedureParameter.optional("remove_carryovers", DataTypes.BooleanType);
-
   private static final ProcedureParameter IDENTIFIER_COLUMNS_PARAM =
       ProcedureParameter.optional("identifier_columns", STRING_ARRAY);
   private static final ProcedureParameter NET_CHANGES =
@@ -114,7 +102,6 @@ public class CreateChangelogViewProcedure extends BaseProcedure {
         CHANGELOG_VIEW_PARAM,
         OPTIONS_PARAM,
         COMPUTE_UPDATES_PARAM,
-        REMOVE_CARRYOVERS_PARAM,
         IDENTIFIER_COLUMNS_PARAM,
         NET_CHANGES,
       };
@@ -163,7 +150,7 @@ public class CreateChangelogViewProcedure extends BaseProcedure {
     if (shouldComputeUpdateImages(input)) {
       Preconditions.checkArgument(!netChanges, "Not support net changes with update images");
       df = computeUpdateImages(identifierColumns(input, tableIdent), df);
-    } else if (shouldRemoveCarryoverRows(input)) {
+    } else {
       df = removeCarryoverRows(df, netChanges);
     }
 
@@ -195,10 +182,6 @@ public class CreateChangelogViewProcedure extends BaseProcedure {
     return input.asBoolean(COMPUTE_UPDATES_PARAM, defaultValue);
   }
 
-  private boolean shouldRemoveCarryoverRows(ProcedureInput input) {
-    return input.asBoolean(REMOVE_CARRYOVERS_PARAM, true);
-  }
-
   private Dataset<Row> removeCarryoverRows(Dataset<Row> df, boolean netChanges) {
     Predicate<String> columnsToKeep;
     if (netChanges) {
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java
index d46e3cab0b..b4594d91c0 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java
@@ -49,8 +49,8 @@ import org.apache.spark.unsafe.types.UTF8String;
 /**
  * A procedure that creates a view for changed rows.
  *
- * <p>The procedure removes the carry-over rows by default. If you want to keep them, you can set
- * "remove_carryovers" to be false in the options.
+ * <p>The procedure always removes the carry-over rows. Please query {@link SparkChangelogTable}
+ * instead when carry-over rows are required.
  *
  * <p>The procedure doesn't compute the pre/post update images by default. If you want to compute
  * them, you can set "compute_updates" to be true in the options.
