szehon-ho commented on code in PR #7326:
URL: https://github.com/apache/iceberg/pull/7326#discussion_r1239124685


##########
gradle.properties:
##########
@@ -20,7 +20,7 @@ systemProp.defaultFlinkVersions=1.17
 systemProp.knownFlinkVersions=1.15,1.16,1.17
 systemProp.defaultHiveVersions=2
 systemProp.knownHiveVersions=2,3
-systemProp.defaultSparkVersions=3.4
+systemProp.defaultSparkVersions=3.4,3.3

Review Comment:
   Unnecessary change



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java:
##########
@@ -101,6 +103,7 @@ public class CreateChangelogViewProcedure extends BaseProcedure {
         COMPUTE_UPDATES_PARAM,
         REMOVE_CARRYOVERS_PARAM,
         IDENTIFIER_COLUMNS_PARAM,
+        NET_CHANGES,

Review Comment:
   @flyrain yea, so my suggestion is to make RemoveCarryOverMode a three-way enum and deprecate the RemoveCarryOver boolean.
   
   Currently, with three boolean configs, the user gets 8 possibilities. With a three-way enum (none, net-changes, per-snapshot), the user gets 6 possibilities, the extra one being compute_update with no remove_carryover, which we can validate against.
   
   wdyt?
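A rough sketch of what that three-way mode could look like. The class name, enum constants, and validation method below are hypothetical illustrations, not the actual PR code:

```java
// Hypothetical sketch of the suggested three-way mode; names are illustrative.
class CarryoverModeSketch {
  enum RemoveCarryoverMode {
    NONE,          // keep carry-over rows as-is
    PER_SNAPSHOT,  // remove carry-overs within each snapshot
    NET_CHANGES    // compute net changes across all snapshots
  }

  // The one invalid combination: computing updates requires carry-over removal.
  static void validate(boolean computeUpdates, RemoveCarryoverMode mode) {
    if (computeUpdates && mode == RemoveCarryoverMode.NONE) {
      throw new IllegalArgumentException(
          "Cannot compute updates without removing carry-over rows");
    }
  }

  public static void main(String[] args) {
    validate(true, RemoveCarryoverMode.NET_CHANGES); // valid combination, no error
    try {
      validate(true, RemoveCarryoverMode.NONE);
      throw new AssertionError("expected validation to fail");
    } catch (IllegalArgumentException expected) {
      System.out.println("rejected compute_updates with mode NONE");
    }
  }
}
```

Rejecting the one invalid combination up front is what leaves the 6 enum/boolean combinations well-defined.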



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/RemoveNetCarryoverIterator.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import java.util.Iterator;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * This class computes the net changes across multiple snapshots. It takes a row iterator, and
+ * assumes the following:
+ *
+ * <ul>
+ *   <li>The row iterator is partitioned by all columns.
+ *   <li>The row iterator is sorted by all columns, change order, and change type. The change order
+ *       has a 1-to-1 mapping to snapshot id.
+ * </ul>
+ */
+public class RemoveNetCarryoverIterator extends RemoveCarryoverIterator {
+
+  private final int[] indicesToIdentifySameRow;
+
+  private Row cachedNextRow = null;
+  private Row cachedRow = null;
+  private long cachedRowCount = 0;
+
+  protected RemoveNetCarryoverIterator(Iterator<Row> rowIterator, StructType rowType) {
+    super(rowIterator, rowType);
+    this.indicesToIdentifySameRow = generateIndicesToIdentifySameRow();
+  }
+
+  @Override
+  public boolean hasNext() {
+    if (cachedRowCount > 0) {
+      return true;
+    }
+
+    if (cachedNextRow != null) {
+      return true;
+    }
+
+    return rowIterator().hasNext();
+  }
+
+  @Override
+  public Row next() {
+    // if there are cached rows, return one of them from the beginning
+    if (cachedRowCount > 0) {
+      cachedRowCount--;
+      return cachedRow;
+    }
+
+    Row currentRow = getCurrentRow();
+
+    // return it directly if the current row is the last row
+    if (!rowIterator().hasNext()) {
+      return currentRow;
+    }
+
+    Row nextRow = rowIterator().next();
+
+    cachedRow = currentRow;
+    cachedRowCount = 1;
+
+    // pull rows from the iterator until two consecutive rows are different
+    while (isSameRecord(currentRow, nextRow)) {
+      if (oppositeChangeType(currentRow, nextRow)) {
+        // two rows with opposite change types means no net changes
+        cachedRowCount--;
+        nextRow = null;
+      } else {
+        // two rows with same change types means potential net changes
+        nextRow = null;
+        cachedRowCount++;
+      }
+
+      // stop pulling rows if there are no more rows or the next row is different
+      if (cachedRowCount <= 0 || !rowIterator().hasNext()) {
+        break;
+      }
+
+      nextRow = rowIterator().next();
+    }
+
+    // if they are different rows, hit the boundary, cache the next row
+    cachedNextRow = nextRow;
+    return null;
+  }
+
+  private Row getCurrentRow() {
+    Row currentRow;
+    if (cachedNextRow != null) {
+      currentRow = cachedNextRow;
+      cachedNextRow = null;
+    } else {
+      currentRow = rowIterator().next();
+    }
+    return currentRow;
+  }
+
+  private boolean oppositeChangeType(Row currentRow, Row nextRow) {
+    return (nextRow.getString(changeTypeIndex()).equals(INSERT)
+            && currentRow.getString(changeTypeIndex()).equals(DELETE))
+        || (nextRow.getString(changeTypeIndex()).equals(DELETE)
+            && currentRow.getString(changeTypeIndex()).equals(INSERT));
+  }
+
+  private int[] generateIndicesToIdentifySameRow() {
+    int changeOrdinalIndex = rowType().fieldIndex(MetadataColumns.CHANGE_ORDINAL.name());

Review Comment:
   Can't we share the code by doing the same thing and making a set to identify the metadata column indices?
   ```
     private int[] generateIndicesToIdentifySameRow() {
       Set<Integer> metadataColumnIndices = Sets.newHashSet(
           rowType().fieldIndex(MetadataColumns.CHANGE_ORDINAL.name()),
           rowType().fieldIndex(MetadataColumns.COMMIT_SNAPSHOT_ID.name()),
           changeTypeIndex());
       return generateIndicesToIdentifySameRow(metadataColumnIndices);
     }
   
     private int[] generateIndicesToIdentifySameRow(Set<Integer> metadataColumnIndices) {
       int[] indices = new int[rowType().size() - metadataColumnIndices.size()];
   
       // iterate over all columns so data columns after a metadata column are kept
       for (int i = 0, j = 0; i < rowType().size(); i++) {
         if (!metadataColumnIndices.contains(i)) {
           indices[j] = i;
           j++;
         }
       }
       return indices;
     }
   ```
   
   From RemoveCarryoverIterator, the set will be only changeTypeIndex? Let me know if I missed something.
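To sanity-check the index-filtering loop, here is a standalone version of the same logic with a hypothetical 5-column layout. The class name and column positions are made up for illustration; only the filtering idea is taken from the suggestion above:

```java
// Standalone sketch of the shared index-generation logic proposed above.
import java.util.HashSet;
import java.util.Set;

class SameRowIndicesSketch {
  // Returns the indices of non-metadata columns, used to compare rows for equality.
  static int[] indicesExcluding(int columnCount, Set<Integer> metadataColumnIndices) {
    int[] indices = new int[columnCount - metadataColumnIndices.size()];
    // iterate over all columns (not indices.length) so no data column is skipped
    for (int i = 0, j = 0; i < columnCount; i++) {
      if (!metadataColumnIndices.contains(i)) {
        indices[j] = i;
        j++;
      }
    }
    return indices;
  }

  public static void main(String[] args) {
    // 5 columns; suppose the metadata columns sit at indices 2, 3, and 4
    Set<Integer> meta = new HashSet<>();
    meta.add(2);
    meta.add(3);
    meta.add(4);
    int[] indices = indicesExcluding(5, meta);
    System.out.println(java.util.Arrays.toString(indices)); // [0, 1]
  }
}
```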



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/RemoveNetCarryoverIterator.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import java.util.Iterator;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * This class computes the net changes across multiple snapshots. It takes a row iterator, and
+ * assumes the following:
+ *
+ * <ul>
+ *   <li>The row iterator is partitioned by all columns.
+ *   <li>The row iterator is sorted by all columns, change order, and change type. The change order
+ *       has a 1-to-1 mapping to snapshot id.
+ * </ul>
+ */
+public class RemoveNetCarryoverIterator extends RemoveCarryoverIterator {
+
+  private final int[] indicesToIdentifySameRow;
+
+  private Row cachedNextRow = null;
+  private Row cachedRow = null;
+  private long cachedRowCount = 0;
+
+  protected RemoveNetCarryoverIterator(Iterator<Row> rowIterator, StructType rowType) {
+    super(rowIterator, rowType);
+    this.indicesToIdentifySameRow = generateIndicesToIdentifySameRow();
+  }
+
+  @Override
+  public boolean hasNext() {
+    if (cachedRowCount > 0) {
+      return true;
+    }
+
+    if (cachedNextRow != null) {
+      return true;
+    }
+
+    return rowIterator().hasNext();
+  }
+
+  @Override
+  public Row next() {
+    // if there are cached rows, return one of them from the beginning
+    if (cachedRowCount > 0) {
+      cachedRowCount--;
+      return cachedRow;
+    }
+
+    Row currentRow = getCurrentRow();
+
+    // return it directly if the current row is the last row
+    if (!rowIterator().hasNext()) {
+      return currentRow;
+    }
+
+    Row nextRow = rowIterator().next();

Review Comment:
   Maybe not clear:
   
   ```
   
     @Override
     public Row next() {
       // if there are cached rows, return one of them from the beginning
       if (cachedRowCount > 0) {
         cachedRowCount--;
         return cachedRow;
       }
   
       this.cachedRow = getCurrentRow();
   
       // return it directly if the current row is the last row
       if (!rowIterator().hasNext()) {
         return cachedRow;
       }
   
       this.cachedNextRow = rowIterator().next();
       cachedRowCount = 1;
   
       // pull rows from the iterator until two consecutive rows are different
       while (isSameRecord(cachedRow, cachedNextRow)) {
         if (oppositeChangeType(cachedRow, cachedNextRow)) {
           // two rows with opposite change types means no net changes, remove both
           cachedRowCount--;
         } else {
           // two rows with the same change type means a potential net change
           cachedRowCount++;
         }
   
         // stop pulling rows if there are no more rows or the next row is different
         if (cachedRowCount <= 0 || !rowIterator().hasNext()) {
           this.cachedNextRow = null;
           break;
         }
   
         this.cachedNextRow = rowIterator().next();
       }
   
       return null;
     }
   ```
   
   I think it will work to remove 'currentRow', but I am not sure whether removing 'nextRow' as well is too aggressive. Please check if I made a mistake.
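As a sanity check on the counting logic, here is a simplified standalone model of the loop: each subsequent row in a same-record group is compared against the first row's change type, decrementing the cached count on a mismatch and incrementing on a match. The class and method names are hypothetical, rows are reduced to just their change type, and this model stops at the first cancellation rather than emulating the full iterator:

```java
// Simplified standalone model of the net-change counting in next() above.
import java.util.List;

class NetChangeCountSketch {
  // Given the change types of consecutive rows belonging to the SAME record,
  // return how many copies of the first row would survive as net changes.
  static int survivingCopies(List<String> changeTypes) {
    int count = 1; // the first row of the group is cached with count 1
    String first = changeTypes.get(0);
    for (int i = 1; i < changeTypes.size(); i++) {
      if (changeTypes.get(i).equals(first)) {
        count++; // same change type: a potential net change
      } else {
        count--; // opposite change type: the pair cancels out
      }
      if (count <= 0) {
        break; // no net changes left for this record
      }
    }
    return Math.max(count, 0);
  }

  public static void main(String[] args) {
    System.out.println(survivingCopies(List.of("DELETE", "INSERT"))); // 0: pair cancels
    System.out.println(survivingCopies(List.of("INSERT", "INSERT"))); // 2: both survive
  }
}
```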



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/RemoveNetCarryoverIterator.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import java.util.Iterator;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * This class computes the net changes across multiple snapshots. It takes a row iterator, and
+ * assumes the following:
+ *
+ * <ul>
+ *   <li>The row iterator is partitioned by all columns.
+ *   <li>The row iterator is sorted by all columns, change order, and change type. The change order
+ *       has a 1-to-1 mapping to snapshot id.
+ * </ul>
+ */
+public class RemoveNetCarryoverIterator extends RemoveCarryoverIterator {
+
+  private final int[] indicesToIdentifySameRow;
+
+  private Row cachedNextRow = null;
+  private Row cachedRow = null;
+  private long cachedRowCount = 0;
+
+  protected RemoveNetCarryoverIterator(Iterator<Row> rowIterator, StructType rowType) {
+    super(rowIterator, rowType);
+    this.indicesToIdentifySameRow = generateIndicesToIdentifySameRow();
+  }
+
+  @Override
+  public boolean hasNext() {
+    if (cachedRowCount > 0) {
+      return true;
+    }
+
+    if (cachedNextRow != null) {
+      return true;
+    }
+
+    return rowIterator().hasNext();
+  }
+
+  @Override
+  public Row next() {
+    // if there are cached rows, return one of them from the beginning
+    if (cachedRowCount > 0) {
+      cachedRowCount--;
+      return cachedRow;
+    }
+
+    Row currentRow = getCurrentRow();
+
+    // return it directly if the current row is the last row
+    if (!rowIterator().hasNext()) {
+      return currentRow;
+    }
+
+    Row nextRow = rowIterator().next();
+
+    cachedRow = currentRow;
+    cachedRowCount = 1;
+
+    // pull rows from the iterator until two consecutive rows are different
+    while (isSameRecord(currentRow, nextRow)) {
+      if (oppositeChangeType(currentRow, nextRow)) {
+        // two rows with opposite change types means no net changes
+        cachedRowCount--;
+        nextRow = null;
+      } else {
+        // two rows with same change types means potential net changes
+        nextRow = null;

Review Comment:
   I mean, let's move it out of the if/else. Even IntelliJ suggests "Common part can be extracted from 'if'".



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/RemoveCarryoverIterator.java:
##########
@@ -151,13 +153,21 @@ private int[] generateIndicesToIdentifySameRow(int columnSize) {
     return indices;
   }
 
-  private boolean isSameRecord(Row currentRow, Row nextRow) {
-    for (int idx : indicesToIdentifySameRow) {
+  protected boolean isSameRecord(Row currentRow, Row nextRow) {

Review Comment:
   Code-wise, I think having RemoveNetCarryoverIterator extend RemoveCarryoverIterator is a bit harder to read. So I was thinking we could introduce an extra base class; then it is easier to see which methods differ and which are shared. Right now it is a bit trickier to see that. Not sure what you think.
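A rough sketch of the suggested base-class split. The names here are illustrative, not the actual Iceberg code, and rows are modeled as Object[] instead of Spark's Row to keep the sketch self-contained:

```java
// Hypothetical sketch: shared row comparison lives in an abstract base class,
// and each carry-over removal strategy extends it with its own traversal.
import java.util.Iterator;
import java.util.Objects;

abstract class BaseCarryoverIterator implements Iterator<Object[]> {
  private final Iterator<Object[]> rows;
  private final int[] indicesToIdentifySameRow;

  protected BaseCarryoverIterator(Iterator<Object[]> rows, int[] indicesToIdentifySameRow) {
    this.rows = rows;
    this.indicesToIdentifySameRow = indicesToIdentifySameRow;
  }

  protected Iterator<Object[]> rowIterator() {
    return rows;
  }

  // shared by both the per-snapshot and the net-changes removal strategies
  protected boolean isSameRecord(Object[] currentRow, Object[] nextRow) {
    for (int idx : indicesToIdentifySameRow) {
      if (!Objects.equals(currentRow[idx], nextRow[idx])) {
        return false;
      }
    }
    return true;
  }
}

// Minimal concrete subclass, only to make the sketch instantiable.
class PassThroughCarryoverIterator extends BaseCarryoverIterator {
  PassThroughCarryoverIterator(Iterator<Object[]> rows, int[] indices) {
    super(rows, indices);
  }

  @Override
  public boolean hasNext() {
    return rowIterator().hasNext();
  }

  @Override
  public Object[] next() {
    return rowIterator().next();
  }
}
```

With this split, the two removal strategies would be siblings rather than parent and child, so a reader can tell at a glance which methods are shared and which are strategy-specific.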



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

