szehon-ho commented on code in PR #7326:
URL: https://github.com/apache/iceberg/pull/7326#discussion_r1232504121
##########
spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateChangelogViewProcedure.java:
##########
@@ -411,6 +413,69 @@ public void testRemoveCarryOversWithoutUpdatedRows() {
sql("select * from %s order by _change_ordinal, id, data", viewName));
}
+ @Test
+ public void testNetChangesWithRemoveCarryOvers() {
+ // partitioned by id
+ createTableWith3Columns();
Review Comment:
I think it's not for this PR, but it's probably better to call it 'ThreeColumns', as it's easier to read and more consistent with other test classes and methods in Iceberg:
> In scientific and technical writing, the prevailing style is to write out numbers under ten.
Ref: https://www.grammarly.com/blog/when-to-spell-out-numbers/
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/RemoveNetCarryoverIterator.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import java.util.Iterator;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * This class computes the net changes across multiple snapshots. It takes a row iterator, and
+ * assumes the following:
+ *
+ * <ul>
+ * <li>The row iterator is partitioned by all columns.
+ * <li>The row iterator is sorted by all columns, change order, and change type. The change order
+ * is a 1-to-1 mapping to snapshot id.
+ * </ul>
+ */
+public class RemoveNetCarryoverIterator extends RemoveCarryoverIterator {
+
+ private final int[] indicesToIdentifySameRow;
+
+ private Row cachedNextRow = null;
+ private Row cachedRow = null;
+ private long cachedRowCount = 0;
+
+ protected RemoveNetCarryoverIterator(Iterator<Row> rowIterator, StructType rowType) {
+ super(rowIterator, rowType);
+ this.indicesToIdentifySameRow = generateIndicesToIdentifySameRow();
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (cachedRowCount > 0) {
+ return true;
+ }
+
+ if (cachedNextRow != null) {
+ return true;
+ }
+
+ return rowIterator().hasNext();
+ }
+
+ @Override
+ public Row next() {
+ // if there are cached rows, return one of them from the beginning
+ if (cachedRowCount > 0) {
+ cachedRowCount--;
+ return cachedRow;
+ }
+
+ Row currentRow = getCurrentRow();
+
+ // return it directly if the current row is the last row
+ if (!rowIterator().hasNext()) {
+ return currentRow;
+ }
+
+ Row nextRow = rowIterator().next();
Review Comment:
Not 100% sure, but instead of 'cachedNextRow', can we just have 'nextRow' as the member variable, and then do 'this.nextRow = rowIterator().next()'? I think 'cachedNextRow' is only used in getCurrentRow(), which is above this code, so the behavior should be the same?
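Rough sketch of what I mean (untested, essentially just renaming the field):

  private Row nextRow = null;

  private Row getCurrentRow() {
    Row currentRow;
    if (nextRow != null) {
      // consume the row cached by the previous next() call
      currentRow = nextRow;
      this.nextRow = null;
    } else {
      currentRow = rowIterator().next();
    }
    return currentRow;
  }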
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java:
##########
@@ -228,22 +247,41 @@ private Dataset<Row> applyChangelogIterator(Dataset<Row> df, Column[] repartitio
RowEncoder.apply(schema));
}
- private Dataset<Row> applyCarryoverRemoveIterator(Dataset<Row> df, Column[] repartitionSpec) {
- Column[] sortSpec = sortSpec(df, repartitionSpec);
+ private Dataset<Row> applyCarryoverRemoveIterator(
+ Dataset<Row> df, Column[] repartitionSpec, boolean netChanges) {
+ Column[] sortSpec = sortSpec(df, repartitionSpec, netChanges);
StructType schema = df.schema();
return df.repartition(repartitionSpec)
.sortWithinPartitions(sortSpec)
.mapPartitions(
(MapPartitionsFunction<Row, Row>)
- rowIterator -> ChangelogIterator.removeCarryovers(rowIterator, schema),
+ rowIterator ->
+ netChanges
+ ? ChangelogIterator.removeNetCarryovers(rowIterator, schema)
+ : ChangelogIterator.removeCarryovers(rowIterator, schema),
RowEncoder.apply(schema));
}
- private static Column[] sortSpec(Dataset<Row> df, Column[] repartitionSpec) {
- Column[] sortSpec = new Column[repartitionSpec.length + 1];
- System.arraycopy(repartitionSpec, 0, sortSpec, 0, repartitionSpec.length);
+ private static Column[] sortSpec(Dataset<Row> df, Column[] repartitionSpec, boolean netChanges) {
+ Column[] sortSpec;
+
+ if (netChanges) {
+ Column changeOrdinal = df.col(MetadataColumns.CHANGE_ORDINAL.name());
+ Preconditions.checkState(
Review Comment:
Is this necessary? Just asking, as we don't do it for CHANGE_TYPE.
##########
spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateChangelogViewProcedure.java:
##########
@@ -411,6 +413,69 @@ public void testRemoveCarryOversWithoutUpdatedRows() {
sql("select * from %s order by _change_ordinal, id, data", viewName));
}
+ @Test
+ public void testNetChangesWithRemoveCarryOvers() {
+ // partitioned by id
+ createTableWith3Columns();
+
+ // insert rows: (1, 'a', 12) (2, 'b', 11) (2, 'e', 12)
+ sql("INSERT INTO %s VALUES (1, 'a', 12), (2, 'b', 11), (2, 'e', 12)", tableName);
+ Table table = validationCatalog.loadTable(tableIdent);
+ Snapshot snap1 = table.currentSnapshot();
+
+ // delete rows: (2, 'b', 11) (2, 'e', 12), insert rows: (3, 'c', 13) (2, 'd', 11) (2, 'e', 12)
Review Comment:
Minor: do you think it's easier to read if the insert is on a new line?
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/ChangelogIterator.java:
##########
@@ -79,6 +79,11 @@ public static Iterator<Row> removeCarryovers(Iterator<Row> rowIterator, StructTy
return Iterators.filter(changelogIterator, Objects::nonNull);
}
+ public static Iterator<Row> removeNetCarryovers(Iterator<Row> rowIterator, StructType rowType) {
Review Comment:
Minor nit: should we make it 'removeNetCarryover' to be consistent with the iterator name? If the previous method has already been released, feel free to ignore.
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/RemoveNetCarryoverIterator.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import java.util.Iterator;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * This class computes the net changes across multiple snapshots. It takes a row iterator, and
+ * assumes the following:
+ *
+ * <ul>
+ * <li>The row iterator is partitioned by all columns.
+ * <li>The row iterator is sorted by all columns, change order, and change type. The change order
+ * is a 1-to-1 mapping to snapshot id.
+ * </ul>
+ */
+public class RemoveNetCarryoverIterator extends RemoveCarryoverIterator {
+
+ private final int[] indicesToIdentifySameRow;
+
+ private Row cachedNextRow = null;
+ private Row cachedRow = null;
+ private long cachedRowCount = 0;
+
+ protected RemoveNetCarryoverIterator(Iterator<Row> rowIterator, StructType rowType) {
+ super(rowIterator, rowType);
+ this.indicesToIdentifySameRow = generateIndicesToIdentifySameRow();
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (cachedRowCount > 0) {
+ return true;
+ }
+
+ if (cachedNextRow != null) {
+ return true;
+ }
+
+ return rowIterator().hasNext();
+ }
+
+ @Override
+ public Row next() {
+ // if there are cached rows, return one of them from the beginning
+ if (cachedRowCount > 0) {
+ cachedRowCount--;
+ return cachedRow;
+ }
+
+ Row currentRow = getCurrentRow();
+
+ // return it directly if the current row is the last row
+ if (!rowIterator().hasNext()) {
+ return currentRow;
+ }
+
+ Row nextRow = rowIterator().next();
+
+ cachedRow = currentRow;
+ cachedRowCount = 1;
+
+ // pull rows from the iterator until two consecutive rows are different
+ while (isSameRecord(currentRow, nextRow)) {
+ if (oppositeChangeType(currentRow, nextRow)) {
+ // two rows with opposite change types means no net changes
+ cachedRowCount--;
+ nextRow = null;
+ } else {
+ // two rows with same change types means potential net changes
+ nextRow = null;
+ cachedRowCount++;
+ }
+
+ // stop pulling rows if there are no more rows or the next row is different
+ if (cachedRowCount <= 0 || !rowIterator().hasNext()) {
+ break;
+ }
+
+ nextRow = rowIterator().next();
+ }
+
+ // if they are different rows, hit the boundary, cache the next row
+ cachedNextRow = nextRow;
+ return null;
+ }
+
+ private Row getCurrentRow() {
+ Row currentRow;
+ if (cachedNextRow != null) {
+ currentRow = cachedNextRow;
+ cachedNextRow = null;
+ } else {
+ currentRow = rowIterator().next();
+ }
+ return currentRow;
+ }
+
+ private boolean oppositeChangeType(Row currentRow, Row nextRow) {
+ return (nextRow.getString(changeTypeIndex()).equals(INSERT)
Review Comment:
Minor: how about a helper method:
  private String changeType(Row row) {
    return row.getString(changeTypeIndex());
  }
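Then oppositeChangeType() could read something like:

  private boolean oppositeChangeType(Row currentRow, Row nextRow) {
    return (changeType(nextRow).equals(INSERT) && changeType(currentRow).equals(DELETE))
        || (changeType(nextRow).equals(DELETE) && changeType(currentRow).equals(INSERT));
  }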
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/RemoveNetCarryoverIterator.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import java.util.Iterator;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * This class computes the net changes across multiple snapshots. It takes a row iterator, and
+ * assumes the following:
+ *
+ * <ul>
+ * <li>The row iterator is partitioned by all columns.
+ * <li>The row iterator is sorted by all columns, change order, and change type. The change order
+ * is a 1-to-1 mapping to snapshot id.
+ * </ul>
+ */
+public class RemoveNetCarryoverIterator extends RemoveCarryoverIterator {
+
+ private final int[] indicesToIdentifySameRow;
+
+ private Row cachedNextRow = null;
+ private Row cachedRow = null;
+ private long cachedRowCount = 0;
+
+ protected RemoveNetCarryoverIterator(Iterator<Row> rowIterator, StructType rowType) {
+ super(rowIterator, rowType);
+ this.indicesToIdentifySameRow = generateIndicesToIdentifySameRow();
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (cachedRowCount > 0) {
+ return true;
+ }
+
+ if (cachedNextRow != null) {
+ return true;
+ }
+
+ return rowIterator().hasNext();
+ }
+
+ @Override
+ public Row next() {
+ // if there are cached rows, return one of them from the beginning
+ if (cachedRowCount > 0) {
+ cachedRowCount--;
+ return cachedRow;
+ }
+
+ Row currentRow = getCurrentRow();
+
+ // return it directly if the current row is the last row
+ if (!rowIterator().hasNext()) {
+ return currentRow;
+ }
+
+ Row nextRow = rowIterator().next();
+
+ cachedRow = currentRow;
+ cachedRowCount = 1;
+
+ // pull rows from the iterator until two consecutive rows are different
+ while (isSameRecord(currentRow, nextRow)) {
+ if (oppositeChangeType(currentRow, nextRow)) {
+ // two rows with opposite change types means no net changes
+ cachedRowCount--;
+ nextRow = null;
+ } else {
+ // two rows with same change types means potential net changes
+ nextRow = null;
Review Comment:
Maybe I'm missing something, but why do we need to nullify nextRow here? And in both the if and else cases?
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/RemoveCarryoverIterator.java:
##########
@@ -151,13 +153,21 @@ private int[] generateIndicesToIdentifySameRow(int columnSize) {
return indices;
}
- private boolean isSameRecord(Row currentRow, Row nextRow) {
- for (int idx : indicesToIdentifySameRow) {
+ protected boolean isSameRecord(Row currentRow, Row nextRow) {
Review Comment:
Can we move this to the base class?
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/RemoveNetCarryoverIterator.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import java.util.Iterator;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * This class computes the net changes across multiple snapshots. It takes a row iterator, and
+ * assumes the following:
+ *
+ * <ul>
+ * <li>The row iterator is partitioned by all columns.
+ * <li>The row iterator is sorted by all columns, change order, and change type. The change order
+ * is a 1-to-1 mapping to snapshot id.
+ * </ul>
+ */
+public class RemoveNetCarryoverIterator extends RemoveCarryoverIterator {
+
+ private final int[] indicesToIdentifySameRow;
+
+ private Row cachedNextRow = null;
+ private Row cachedRow = null;
+ private long cachedRowCount = 0;
+
+ protected RemoveNetCarryoverIterator(Iterator<Row> rowIterator, StructType rowType) {
+ super(rowIterator, rowType);
+ this.indicesToIdentifySameRow = generateIndicesToIdentifySameRow();
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (cachedRowCount > 0) {
+ return true;
+ }
+
+ if (cachedNextRow != null) {
+ return true;
+ }
+
+ return rowIterator().hasNext();
+ }
+
+ @Override
+ public Row next() {
+ // if there are cached rows, return one of them from the beginning
+ if (cachedRowCount > 0) {
+ cachedRowCount--;
+ return cachedRow;
+ }
+
+ Row currentRow = getCurrentRow();
+
+ // return it directly if the current row is the last row
+ if (!rowIterator().hasNext()) {
+ return currentRow;
+ }
+
+ Row nextRow = rowIterator().next();
+
+ cachedRow = currentRow;
+ cachedRowCount = 1;
+
+ // pull rows from the iterator until two consecutive rows are different
+ while (isSameRecord(currentRow, nextRow)) {
+ if (oppositeChangeType(currentRow, nextRow)) {
+ // two rows with opposite change types means no net changes
+ cachedRowCount--;
+ nextRow = null;
+ } else {
+ // two rows with same change types means potential net changes
+ nextRow = null;
+ cachedRowCount++;
+ }
+
+ // stop pulling rows if there are no more rows or the next row is different
+ if (cachedRowCount <= 0 || !rowIterator().hasNext()) {
+ break;
+ }
+
+ nextRow = rowIterator().next();
+ }
+
+ // if they are different rows, hit the boundary, cache the next row
+ cachedNextRow = nextRow;
+ return null;
+ }
+
+ private Row getCurrentRow() {
+ Row currentRow;
+ if (cachedNextRow != null) {
+ currentRow = cachedNextRow;
+ cachedNextRow = null;
+ } else {
+ currentRow = rowIterator().next();
+ }
+ return currentRow;
+ }
+
+ private boolean oppositeChangeType(Row currentRow, Row nextRow) {
+ return (nextRow.getString(changeTypeIndex()).equals(INSERT)
+ && currentRow.getString(changeTypeIndex()).equals(DELETE))
+ || (nextRow.getString(changeTypeIndex()).equals(DELETE)
+ && currentRow.getString(changeTypeIndex()).equals(INSERT));
+ }
+
+ private int[] generateIndicesToIdentifySameRow() {
+ int changeOrdinalIndex = rowType().fieldIndex(MetadataColumns.CHANGE_ORDINAL.name());
Review Comment:
I'm a bit lost as to why this method is not the same as in RemoveCarryoverIterator. Are 'changeOrdinalIndex' and 'snapshotIdIndex' not in the rows of the other iterator?
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/RemoveNetCarryoverIterator.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import java.util.Iterator;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * This class computes the net changes across multiple snapshots. It takes a row iterator, and
+ * assumes the following:
+ *
+ * <ul>
+ * <li>The row iterator is partitioned by all columns.
+ * <li>The row iterator is sorted by all columns, change order, and change type. The change order
+ * is a 1-to-1 mapping to snapshot id.
+ * </ul>
+ */
+public class RemoveNetCarryoverIterator extends RemoveCarryoverIterator {
+
+ private final int[] indicesToIdentifySameRow;
+
+ private Row cachedNextRow = null;
+ private Row cachedRow = null;
+ private long cachedRowCount = 0;
+
+ protected RemoveNetCarryoverIterator(Iterator<Row> rowIterator, StructType rowType) {
+ super(rowIterator, rowType);
+ this.indicesToIdentifySameRow = generateIndicesToIdentifySameRow();
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (cachedRowCount > 0) {
+ return true;
+ }
+
+ if (cachedNextRow != null) {
+ return true;
+ }
+
+ return rowIterator().hasNext();
+ }
+
+ @Override
+ public Row next() {
+ // if there are cached rows, return one of them from the beginning
+ if (cachedRowCount > 0) {
+ cachedRowCount--;
+ return cachedRow;
+ }
+
+ Row currentRow = getCurrentRow();
Review Comment:
Same for 'cachedRow': can we just have currentRow as a member variable and do 'this.currentRow = getCurrentRow();'?
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/RemoveNetCarryoverIterator.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import java.util.Iterator;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * This class computes the net changes across multiple snapshots. It takes a row iterator, and
Review Comment:
Thanks for the comment. Can we add an explanation of why we need this in addition to RemoveCarryoverIterator? Something like 'Unlike RemoveCarryoverIterator, this iterator...'
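For example, something along the lines of: "Unlike {@link RemoveCarryoverIterator}, which removes carry-over rows within a single snapshot, this iterator also cancels out pairs of rows with opposite change types across snapshots, so only the net changes over the whole set of snapshots are returned." Just a suggestion, wording up to you.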
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/RemoveCarryoverIterator.java:
##########
@@ -151,13 +153,21 @@ private int[] generateIndicesToIdentifySameRow(int columnSize) {
return indices;
}
- private boolean isSameRecord(Row currentRow, Row nextRow) {
- for (int idx : indicesToIdentifySameRow) {
+ protected boolean isSameRecord(Row currentRow, Row nextRow) {
+ for (int idx : indicesToIdentifySameRow()) {
if (isDifferentValue(currentRow, nextRow, idx)) {
return false;
}
}
return true;
}
+
+ protected StructType rowType() {
Review Comment:
Can we move these up to the base class?
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java:
##########
@@ -228,22 +247,41 @@ private Dataset<Row> applyChangelogIterator(Dataset<Row> df, Column[] repartitio
RowEncoder.apply(schema));
}
- private Dataset<Row> applyCarryoverRemoveIterator(Dataset<Row> df, Column[] repartitionSpec) {
- Column[] sortSpec = sortSpec(df, repartitionSpec);
+ private Dataset<Row> applyCarryoverRemoveIterator(
+ Dataset<Row> df, Column[] repartitionSpec, boolean netChanges) {
+ Column[] sortSpec = sortSpec(df, repartitionSpec, netChanges);
StructType schema = df.schema();
return df.repartition(repartitionSpec)
.sortWithinPartitions(sortSpec)
.mapPartitions(
(MapPartitionsFunction<Row, Row>)
- rowIterator -> ChangelogIterator.removeCarryovers(rowIterator, schema),
+ rowIterator ->
+ netChanges
+ ? ChangelogIterator.removeNetCarryovers(rowIterator, schema)
+ : ChangelogIterator.removeCarryovers(rowIterator, schema),
RowEncoder.apply(schema));
}
- private static Column[] sortSpec(Dataset<Row> df, Column[] repartitionSpec) {
- Column[] sortSpec = new Column[repartitionSpec.length + 1];
- System.arraycopy(repartitionSpec, 0, sortSpec, 0, repartitionSpec.length);
+ private static Column[] sortSpec(Dataset<Row> df, Column[] repartitionSpec, boolean netChanges) {
+ Column[] sortSpec;
+
+ if (netChanges) {
Review Comment:
It looks a bit hard to read; what do you think of something like:
  Column[] generatedColumns =
      netChanges ? new Column[] {changeOrdinalCol, changeTypeCol} : new Column[] {changeTypeCol};
  Column[] sortSpec = new Column[repartitionSpec.length + generatedColumns.length];
  System.arraycopy(repartitionSpec, 0, sortSpec, 0, repartitionSpec.length);
  System.arraycopy(generatedColumns, 0, sortSpec, repartitionSpec.length, generatedColumns.length);
Maybe we can shorten the variable names to make it shorter.
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java:
##########
@@ -101,6 +103,7 @@ public class CreateChangelogViewProcedure extends BaseProcedure {
COMPUTE_UPDATES_PARAM,
REMOVE_CARRYOVERS_PARAM,
IDENTIFIER_COLUMNS_PARAM,
+ NET_CHANGES,
Review Comment:
Would remove_carryover be better as a three-way enum: 'keep', 'remove' (per snapshot), and 'remove_net'? Right now there are 3 knobs, so 8 combos vs. 6? Or did I misunderstand?
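e.g. a hypothetical sketch (names made up):

  public enum CarryoverMode {
    KEEP,       // keep carryover rows as-is
    REMOVE,     // remove carryovers within each snapshot (current default)
    REMOVE_NET  // remove carryovers across all snapshots (net changes)
  }

with a single 'carryover_mode' parameter replacing the two booleans.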
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java:
##########
@@ -179,13 +187,24 @@ private boolean shouldRemoveCarryoverRows(ProcedureInput input) {
return input.asBoolean(REMOVE_CARRYOVERS_PARAM, true);
}
- private Dataset<Row> removeCarryoverRows(Dataset<Row> df) {
+ private Dataset<Row> removeCarryoverRows(Dataset<Row> df, boolean netChanges) {
+ Predicate<String> columnsToRemove;
+ if (netChanges) {
+ columnsToRemove =
+ column ->
Review Comment:
I think @RussellSpitzer means defining a set containing CHANGE_TYPE, CHANGE_ORDINAL, and COMMIT_SNAPSHOT_ID outside of this method?
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java:
##########
@@ -179,13 +187,24 @@ private boolean shouldRemoveCarryoverRows(ProcedureInput input) {
return input.asBoolean(REMOVE_CARRYOVERS_PARAM, true);
}
- private Dataset<Row> removeCarryoverRows(Dataset<Row> df) {
+ private Dataset<Row> removeCarryoverRows(Dataset<Row> df, boolean netChanges) {
+ Predicate<String> columnsToRemove;
+ if (netChanges) {
+ columnsToRemove =
+ column ->
+ column.equals(MetadataColumns.CHANGE_TYPE.name())
+ || column.equals(MetadataColumns.CHANGE_ORDINAL.name())
+ || column.equals(MetadataColumns.COMMIT_SNAPSHOT_ID.name());
+ } else {
+ columnsToRemove = column -> column.equals(MetadataColumns.CHANGE_TYPE.name());
+ }
+
Column[] repartitionSpec =
Arrays.stream(df.columns())
- .filter(c -> !c.equals(MetadataColumns.CHANGE_TYPE.name()))
+ .filter(columnsToRemove.negate())
Review Comment:
Yeah, if we use a set above, we can just negate contains()?
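i.e. an untested sketch (set name made up; assuming the relocated guava ImmutableSet used elsewhere in the module):

  private static final Set<String> NET_CHANGES_HIDDEN_COLUMNS =
      ImmutableSet.of(
          MetadataColumns.CHANGE_TYPE.name(),
          MetadataColumns.CHANGE_ORDINAL.name(),
          MetadataColumns.COMMIT_SNAPSHOT_ID.name());

and then in removeCarryoverRows():

  .filter(column -> !NET_CHANGES_HIDDEN_COLUMNS.contains(column))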