szehon-ho commented on code in PR #7326:
URL: https://github.com/apache/iceberg/pull/7326#discussion_r1246948452
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/RemoveCarryoverIterator.java:
##########
@@ -48,14 +50,16 @@
*/
class RemoveCarryoverIterator extends ChangelogIterator {
private final int[] indicesToIdentifySameRow;
+ private final StructType rowType;
private Row cachedDeletedRow = null;
private long deletedRowCount = 0;
private Row cachedNextRecord = null;
RemoveCarryoverIterator(Iterator<Row> rowIterator, StructType rowType) {
super(rowIterator, rowType);
- this.indicesToIdentifySameRow = generateIndicesToIdentifySameRow(rowType.size());
+ this.rowType = rowType;
Review Comment:
Nit: since we already pass this variable to the superclass, can we just store it
in the superclass and read it through a superclass getter? (I think it's cleaner
if a subclass only has fields that only it knows about.)
It also looks like both subclasses do this.
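A minimal sketch of the suggested refactor, under simplifying assumptions: `BaseChangelogIterator` and `PassThroughIterator` are hypothetical names, and a `List<String>` of column names stands in for Spark's `StructType`. The superclass holds the only copy of the row type and exposes it through a protected `rowType()` accessor; the subclass keeps only state it derives itself (here `columnCount`, playing the role of `indicesToIdentifySameRow`).

```java
import java.util.Iterator;
import java.util.List;

// Hypothetical, simplified stand-in for ChangelogIterator.
// A List<String> of column names replaces Spark's StructType.
abstract class BaseChangelogIterator<R> implements Iterator<R> {
  private final Iterator<R> rowIterator;
  private final List<String> rowType; // the only stored copy of the row type

  protected BaseChangelogIterator(Iterator<R> rowIterator, List<String> rowType) {
    this.rowIterator = rowIterator;
    this.rowType = rowType;
  }

  protected Iterator<R> rowIterator() {
    return rowIterator;
  }

  // Subclasses read the row type through this accessor instead of keeping a copy.
  protected List<String> rowType() {
    return rowType;
  }
}

// Hypothetical subclass: it stores only state that it alone knows about.
class PassThroughIterator<R> extends BaseChangelogIterator<R> {
  private final int columnCount; // derived from the superclass's row type

  PassThroughIterator(Iterator<R> rowIterator, List<String> rowType) {
    super(rowIterator, rowType);
    this.columnCount = rowType().size();
  }

  int columnCount() {
    return columnCount;
  }

  @Override
  public boolean hasNext() {
    return rowIterator().hasNext();
  }

  @Override
  public R next() {
    return rowIterator().next();
  }
}
```

Because `super(...)` completes before the subclass constructor body runs, calling `rowType()` inside the subclass constructor is safe.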
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/RemoveCarryoverIterator.java:
##########
@@ -88,7 +92,7 @@ public Row next() {
}
// If the current row is a delete row, drain all identical delete rows
- if (currentRow.getString(changeTypeIndex()).equals(DELETE) && rowIterator().hasNext()) {
+ if (changeType(currentRow).equals(DELETE) && rowIterator().hasNext()) {
Review Comment:
Same comment about reversing the order of `equals` (I guess it's from a previous
change, but while we are here).
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/RemoveNetCarryoverIterator.java:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import java.util.Iterator;
+import java.util.Set;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * This class computes the net changes across multiple snapshots. It is different from {@link
+ * RemoveCarryoverIterator}, which only removes carry-over rows within a single snapshot. It takes a
+ * row iterator, and assumes the following:
+ *
+ * <ul>
+ * <li>The row iterator is partitioned by all columns.
+ * <li>The row iterator is sorted by all columns, change order, and change type. The change order
+ * is 1-to-1 mapping to snapshot id.
+ * </ul>
+ */
+public class RemoveNetCarryoverIterator extends ChangelogIterator {
+
+ private final int[] indicesToIdentifySameRow;
+ private final StructType rowType;
+
+ private Row cachedNextRow = null;
Review Comment:
Nit: rely on the Java default here; fields are initialized to `null`
automatically, so the explicit `= null` can be dropped.
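For reference, Java gives every field without an initializer a default value: `null` for object references and `0` for numeric primitives (JLS §4.12.5). The hypothetical class below mirrors the fields in this diff, with `Object` standing in for Spark's `Row`.

```java
// Hypothetical class mirroring the iterator fields; Object stands in for Row.
// No initializers: each field still starts with Java's default value.
class CarryoverState {
  Object cachedDeletedRow; // defaults to null, same as "= null"
  long deletedRowCount;    // defaults to 0L, same as "= 0"
  Object cachedNextRow;    // defaults to null, same as "= null"
}
```

This applies only to fields; local variables have no default and must be assigned before they are read.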
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/ComputeUpdateIterator.java:
##########
@@ -81,15 +81,13 @@ public Row next() {
// either a cached record which is not an UPDATE or the next record in the iterator.
Row currentRow = currentRow();
- if (currentRow.getString(changeTypeIndex()).equals(DELETE) && rowIterator().hasNext()) {
+ if (changeType(currentRow).equals(DELETE) && rowIterator().hasNext()) {
Review Comment:
Nit: can we reverse the order of `equals`, i.e. `DELETE.equals(changeType(currentRow))`,
to reduce the chance of an NPE? (Same for the other places in the method.)
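A small illustration of why the constant-first form is safer, assuming `DELETE` is a non-null `String` constant as in the diff; `ChangeTypes`, `isDelete`, and `isDeleteUnsafe` are hypothetical names. `String.equals(null)` returns `false`, whereas calling `equals` on a null receiver throws `NullPointerException`.

```java
// Hypothetical helper; DELETE mirrors the non-null change-type constant in the diff.
final class ChangeTypes {
  static final String DELETE = "DELETE";

  private ChangeTypes() {}

  // Receiver-first: throws NullPointerException when changeType is null.
  static boolean isDeleteUnsafe(String changeType) {
    return changeType.equals(DELETE);
  }

  // Constant-first: DELETE is never null and String.equals(null) is false, so no NPE.
  static boolean isDelete(String changeType) {
    return DELETE.equals(changeType);
  }
}
```

Applied to this line, the condition would read `DELETE.equals(changeType(currentRow)) && rowIterator().hasNext()`. When neither side is guaranteed non-null, `java.util.Objects.equals(a, b)` is a null-safe alternative.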
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]