This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 1e3b38eb73 Spark: backport #8656 and update docs (#9512)
1e3b38eb73 is described below
commit 1e3b38eb73804e7901e2382c6fdb0dd3d3a77eda
Author: Ajantha Bhat <[email protected]>
AuthorDate: Fri Jan 19 12:32:01 2024 +0530
Spark: backport #8656 and update docs (#9512)
---
docs/spark-procedures.md | 1 -
.../TestCreateChangelogViewProcedure.java | 72 ----------------------
.../procedures/CreateChangelogViewProcedure.java | 23 +------
.../TestCreateChangelogViewProcedure.java | 72 ----------------------
.../procedures/CreateChangelogViewProcedure.java | 23 +------
.../procedures/CreateChangelogViewProcedure.java | 4 +-
6 files changed, 8 insertions(+), 187 deletions(-)
diff --git a/docs/spark-procedures.md b/docs/spark-procedures.md
index 45a9f80ea6..ee25de8f33 100644
--- a/docs/spark-procedures.md
+++ b/docs/spark-procedures.md
@@ -770,7 +770,6 @@ Creates a view that contains the changes from a given table.
| `net_changes` | | boolean | Whether to output net changes (see below for more information). Defaults to false. |
| `compute_updates` | | boolean | Whether to compute pre/post update images (see below for more information). Defaults to false. |
| `identifier_columns` | | array<string> | The list of identifier columns to compute updates. If the argument `compute_updates` is set to true and `identifier_columns` are not provided, the table’s current identifier fields will be used. |
-| `remove_carryovers` | | boolean | Whether to remove carry-over rows (see below for more information). Defaults to true. Deprecated since 1.4.0, will be removed in 1.5.0; Please query `SparkChangelogTable` to view carry-over rows. |
Here is a list of commonly used Spark read options:
* `start-snapshot-id`: the exclusive start snapshot ID. If not provided, it reads from the table’s first snapshot inclusively.
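For reference, a minimal call sketch using the documented parameters (the catalog and table names below are hypothetical, not from this commit):

    -- Sketch: create a changelog view with pre/post update images computed on `id`.
    -- As of this change, carry-over rows are always removed from the result.
    CALL spark_catalog.system.create_changelog_view(
      table => 'db.tbl',
      compute_updates => true,
      identifier_columns => array('id')
    );

The procedure returns the name of the created view, which can then be queried like any other view.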
diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateChangelogViewProcedure.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateChangelogViewProcedure.java
index 015f2ad5fd..9aa4bd3d7c 100644
--- a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateChangelogViewProcedure.java
+++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateChangelogViewProcedure.java
@@ -186,41 +186,6 @@ public class TestCreateChangelogViewProcedure extends SparkExtensionsTestBase {
sql("select * from %s order by _change_ordinal, id", returns.get(0)[0]));
}
- @Test
- public void testWithCarryovers() {
- createTableWithTwoColumns();
- sql("INSERT INTO %s VALUES (1, 'a')", tableName);
- Table table = validationCatalog.loadTable(tableIdent);
- Snapshot snap0 = table.currentSnapshot();
-
- sql("INSERT INTO %s VALUES (2, 'b')", tableName);
- table.refresh();
- Snapshot snap1 = table.currentSnapshot();
-
- sql("INSERT OVERWRITE %s VALUES (-2, 'b'), (2, 'b'), (2, 'b')", tableName);
- table.refresh();
- Snapshot snap2 = table.currentSnapshot();
-
- List<Object[]> returns =
- sql(
- "CALL %s.system.create_changelog_view("
- + "remove_carryovers => false,"
- + "table => '%s')",
- catalogName, tableName, "cdc_view");
-
- String viewName = (String) returns.get(0)[0];
- assertEquals(
- "Rows should match",
- ImmutableList.of(
- row(1, "a", INSERT, 0, snap0.snapshotId()),
- row(2, "b", INSERT, 1, snap1.snapshotId()),
- row(-2, "b", INSERT, 2, snap2.snapshotId()),
- row(2, "b", DELETE, 2, snap2.snapshotId()),
- row(2, "b", INSERT, 2, snap2.snapshotId()),
- row(2, "b", INSERT, 2, snap2.snapshotId())),
- sql("select * from %s order by _change_ordinal, id, _change_type",
viewName));
- }
-
@Test
public void testUpdate() {
createTableWithTwoColumns();
@@ -474,41 +439,4 @@ public class TestCreateChangelogViewProcedure extends SparkExtensionsTestBase {
"CALL %s.system.create_changelog_view(table => '%s', identifier_columns => array('id'), net_changes => true)",
catalogName, tableName));
}
-
- @Test
- public void testNotRemoveCarryOvers() {
- createTableWithThreeColumns();
-
- sql("INSERT INTO %s VALUES (1, 'a', 12), (2, 'b', 11), (2, 'e', 12)",
tableName);
- Table table = validationCatalog.loadTable(tableIdent);
- Snapshot snap1 = table.currentSnapshot();
-
- // carry-over row (2, 'e', 12)
- sql("INSERT OVERWRITE %s VALUES (3, 'c', 13), (2, 'd', 11), (2, 'e', 12)",
tableName);
- table.refresh();
- Snapshot snap2 = table.currentSnapshot();
-
- List<Object[]> returns =
- sql(
- "CALL %s.system.create_changelog_view("
- + "remove_carryovers => false,"
- + "table => '%s')",
- catalogName, tableName);
-
- String viewName = (String) returns.get(0)[0];
-
- assertEquals(
- "Rows should match",
- ImmutableList.of(
- row(1, "a", 12, INSERT, 0, snap1.snapshotId()),
- row(2, "b", 11, INSERT, 0, snap1.snapshotId()),
- row(2, "e", 12, INSERT, 0, snap1.snapshotId()),
- row(2, "b", 11, DELETE, 1, snap2.snapshotId()),
- row(2, "d", 11, INSERT, 1, snap2.snapshotId()),
- // the following two rows are carry-over rows
- row(2, "e", 12, DELETE, 1, snap2.snapshotId()),
- row(2, "e", 12, INSERT, 1, snap2.snapshotId()),
- row(3, "c", 13, INSERT, 1, snap2.snapshotId())),
- sql("select * from %s order by _change_ordinal, id, data,
_change_type", viewName));
- }
}
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java
index 259254aa2d..44ec8e7193 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java
@@ -49,8 +49,8 @@ import org.apache.spark.unsafe.types.UTF8String;
/**
* A procedure that creates a view for changed rows.
*
- * <p>The procedure removes the carry-over rows by default. If you want to keep them, you can set
- * "remove_carryovers" to be false in the options.
+ * <p>The procedure always removes the carry-over rows. Please query {@link SparkChangelogTable}
+ * instead when carry-over rows are required.
*
* <p>The procedure doesn't compute the pre/post update images by default. If you want to compute
* them, you can set "compute_updates" to be true in the options.
@@ -91,18 +91,6 @@ public class CreateChangelogViewProcedure extends BaseProcedure {
ProcedureParameter.optional("options", STRING_MAP);
private static final ProcedureParameter COMPUTE_UPDATES_PARAM =
ProcedureParameter.optional("compute_updates", DataTypes.BooleanType);
-
- /**
- * Enable or disable the remove carry-over rows.
- *
- * @deprecated since 1.4.0, will be removed in 1.5.0; The procedure will always remove carry-over
- * rows. Please query {@link SparkChangelogTable} instead for the use cases doesn't remove
- * carry-over rows.
- */
- @Deprecated
- private static final ProcedureParameter REMOVE_CARRYOVERS_PARAM =
- ProcedureParameter.optional("remove_carryovers", DataTypes.BooleanType);
-
private static final ProcedureParameter IDENTIFIER_COLUMNS_PARAM =
ProcedureParameter.optional("identifier_columns", STRING_ARRAY);
private static final ProcedureParameter NET_CHANGES =
@@ -114,7 +102,6 @@ public class CreateChangelogViewProcedure extends BaseProcedure {
CHANGELOG_VIEW_PARAM,
OPTIONS_PARAM,
COMPUTE_UPDATES_PARAM,
- REMOVE_CARRYOVERS_PARAM,
IDENTIFIER_COLUMNS_PARAM,
NET_CHANGES,
};
@@ -163,7 +150,7 @@ public class CreateChangelogViewProcedure extends BaseProcedure {
if (shouldComputeUpdateImages(input)) {
Preconditions.checkArgument(!netChanges, "Not support net changes with update images");
df = computeUpdateImages(identifierColumns(input, tableIdent), df);
- } else if (shouldRemoveCarryoverRows(input)) {
+ } else {
df = removeCarryoverRows(df, netChanges);
}
@@ -195,10 +182,6 @@ public class CreateChangelogViewProcedure extends BaseProcedure {
return input.asBoolean(COMPUTE_UPDATES_PARAM, defaultValue);
}
- private boolean shouldRemoveCarryoverRows(ProcedureInput input) {
- return input.asBoolean(REMOVE_CARRYOVERS_PARAM, true);
- }
-
private Dataset<Row> removeCarryoverRows(Dataset<Row> df, boolean netChanges) {
Predicate<String> columnsToKeep;
if (netChanges) {
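Since the procedure no longer keeps carry-over rows under any option, the replacement path suggested by the new javadoc is querying the changelog table directly. A sketch, assuming the `changes` metadata table backed by SparkChangelogTable and a hypothetical table `db.tbl`:

    -- Sketch: read the raw changelog, carry-over rows included,
    -- instead of passing the removed remove_carryovers option.
    SELECT * FROM spark_catalog.db.tbl.changes
    ORDER BY _change_ordinal, _change_type;

As the removed tests illustrate, carry-over rows surface there as paired DELETE/INSERT records for the same values within one snapshot.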
diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateChangelogViewProcedure.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateChangelogViewProcedure.java
index 015f2ad5fd..9aa4bd3d7c 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateChangelogViewProcedure.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateChangelogViewProcedure.java
@@ -186,41 +186,6 @@ public class TestCreateChangelogViewProcedure extends SparkExtensionsTestBase {
sql("select * from %s order by _change_ordinal, id", returns.get(0)[0]));
}
- @Test
- public void testWithCarryovers() {
- createTableWithTwoColumns();
- sql("INSERT INTO %s VALUES (1, 'a')", tableName);
- Table table = validationCatalog.loadTable(tableIdent);
- Snapshot snap0 = table.currentSnapshot();
-
- sql("INSERT INTO %s VALUES (2, 'b')", tableName);
- table.refresh();
- Snapshot snap1 = table.currentSnapshot();
-
- sql("INSERT OVERWRITE %s VALUES (-2, 'b'), (2, 'b'), (2, 'b')", tableName);
- table.refresh();
- Snapshot snap2 = table.currentSnapshot();
-
- List<Object[]> returns =
- sql(
- "CALL %s.system.create_changelog_view("
- + "remove_carryovers => false,"
- + "table => '%s')",
- catalogName, tableName, "cdc_view");
-
- String viewName = (String) returns.get(0)[0];
- assertEquals(
- "Rows should match",
- ImmutableList.of(
- row(1, "a", INSERT, 0, snap0.snapshotId()),
- row(2, "b", INSERT, 1, snap1.snapshotId()),
- row(-2, "b", INSERT, 2, snap2.snapshotId()),
- row(2, "b", DELETE, 2, snap2.snapshotId()),
- row(2, "b", INSERT, 2, snap2.snapshotId()),
- row(2, "b", INSERT, 2, snap2.snapshotId())),
- sql("select * from %s order by _change_ordinal, id, _change_type",
viewName));
- }
-
@Test
public void testUpdate() {
createTableWithTwoColumns();
@@ -474,41 +439,4 @@ public class TestCreateChangelogViewProcedure extends SparkExtensionsTestBase {
"CALL %s.system.create_changelog_view(table => '%s', identifier_columns => array('id'), net_changes => true)",
catalogName, tableName));
}
-
- @Test
- public void testNotRemoveCarryOvers() {
- createTableWithThreeColumns();
-
- sql("INSERT INTO %s VALUES (1, 'a', 12), (2, 'b', 11), (2, 'e', 12)",
tableName);
- Table table = validationCatalog.loadTable(tableIdent);
- Snapshot snap1 = table.currentSnapshot();
-
- // carry-over row (2, 'e', 12)
- sql("INSERT OVERWRITE %s VALUES (3, 'c', 13), (2, 'd', 11), (2, 'e', 12)",
tableName);
- table.refresh();
- Snapshot snap2 = table.currentSnapshot();
-
- List<Object[]> returns =
- sql(
- "CALL %s.system.create_changelog_view("
- + "remove_carryovers => false,"
- + "table => '%s')",
- catalogName, tableName);
-
- String viewName = (String) returns.get(0)[0];
-
- assertEquals(
- "Rows should match",
- ImmutableList.of(
- row(1, "a", 12, INSERT, 0, snap1.snapshotId()),
- row(2, "b", 11, INSERT, 0, snap1.snapshotId()),
- row(2, "e", 12, INSERT, 0, snap1.snapshotId()),
- row(2, "b", 11, DELETE, 1, snap2.snapshotId()),
- row(2, "d", 11, INSERT, 1, snap2.snapshotId()),
- // the following two rows are carry-over rows
- row(2, "e", 12, DELETE, 1, snap2.snapshotId()),
- row(2, "e", 12, INSERT, 1, snap2.snapshotId()),
- row(3, "c", 13, INSERT, 1, snap2.snapshotId())),
- sql("select * from %s order by _change_ordinal, id, data,
_change_type", viewName));
- }
}
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java
index 259254aa2d..44ec8e7193 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java
@@ -49,8 +49,8 @@ import org.apache.spark.unsafe.types.UTF8String;
/**
* A procedure that creates a view for changed rows.
*
- * <p>The procedure removes the carry-over rows by default. If you want to keep them, you can set
- * "remove_carryovers" to be false in the options.
+ * <p>The procedure always removes the carry-over rows. Please query {@link SparkChangelogTable}
+ * instead when carry-over rows are required.
*
* <p>The procedure doesn't compute the pre/post update images by default. If you want to compute
* them, you can set "compute_updates" to be true in the options.
@@ -91,18 +91,6 @@ public class CreateChangelogViewProcedure extends BaseProcedure {
ProcedureParameter.optional("options", STRING_MAP);
private static final ProcedureParameter COMPUTE_UPDATES_PARAM =
ProcedureParameter.optional("compute_updates", DataTypes.BooleanType);
-
- /**
- * Enable or disable the remove carry-over rows.
- *
- * @deprecated since 1.4.0, will be removed in 1.5.0; The procedure will always remove carry-over
- * rows. Please query {@link SparkChangelogTable} instead for the use cases doesn't remove
- * carry-over rows.
- */
- @Deprecated
- private static final ProcedureParameter REMOVE_CARRYOVERS_PARAM =
- ProcedureParameter.optional("remove_carryovers", DataTypes.BooleanType);
-
private static final ProcedureParameter IDENTIFIER_COLUMNS_PARAM =
ProcedureParameter.optional("identifier_columns", STRING_ARRAY);
private static final ProcedureParameter NET_CHANGES =
@@ -114,7 +102,6 @@ public class CreateChangelogViewProcedure extends BaseProcedure {
CHANGELOG_VIEW_PARAM,
OPTIONS_PARAM,
COMPUTE_UPDATES_PARAM,
- REMOVE_CARRYOVERS_PARAM,
IDENTIFIER_COLUMNS_PARAM,
NET_CHANGES,
};
@@ -163,7 +150,7 @@ public class CreateChangelogViewProcedure extends BaseProcedure {
if (shouldComputeUpdateImages(input)) {
Preconditions.checkArgument(!netChanges, "Not support net changes with update images");
df = computeUpdateImages(identifierColumns(input, tableIdent), df);
- } else if (shouldRemoveCarryoverRows(input)) {
+ } else {
df = removeCarryoverRows(df, netChanges);
}
@@ -195,10 +182,6 @@ public class CreateChangelogViewProcedure extends BaseProcedure {
return input.asBoolean(COMPUTE_UPDATES_PARAM, defaultValue);
}
- private boolean shouldRemoveCarryoverRows(ProcedureInput input) {
- return input.asBoolean(REMOVE_CARRYOVERS_PARAM, true);
- }
-
private Dataset<Row> removeCarryoverRows(Dataset<Row> df, boolean netChanges) {
Predicate<String> columnsToKeep;
if (netChanges) {
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java
index d46e3cab0b..b4594d91c0 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java
@@ -49,8 +49,8 @@ import org.apache.spark.unsafe.types.UTF8String;
/**
* A procedure that creates a view for changed rows.
*
- * <p>The procedure removes the carry-over rows by default. If you want to keep them, you can set
- * "remove_carryovers" to be false in the options.
+ * <p>The procedure always removes the carry-over rows. Please query {@link SparkChangelogTable}
+ * instead when carry-over rows are required.
*
* <p>The procedure doesn't compute the pre/post update images by default. If you want to compute
* them, you can set "compute_updates" to be true in the options.