This is an automated email from the ASF dual-hosted git repository.

dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new aec0b1136e5 HIVE-28843: Iceberg iterators refactor (Denys Kuzmenko, reviewed by Ayush Saxena, Sourabh Badhya)
aec0b1136e5 is described below

commit aec0b1136e5bc838587f3896d9dda98d94104591
Author: Denys Kuzmenko <[email protected]>
AuthorDate: Wed Apr 16 16:51:15 2025 +0300

    HIVE-28843: Iceberg iterators refactor (Denys Kuzmenko, reviewed by Ayush Saxena, Sourabh Badhya)
    
    Closes #5549
---
 .../iceberg/mr/hive/vector/HiveDeleteFilter.java   | 136 ++++++++++-----------
 .../iceberg/mr/mapreduce/IcebergRecordReader.java  |  23 ++--
 2 files changed, 77 insertions(+), 82 deletions(-)

diff --git 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveDeleteFilter.java
 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveDeleteFilter.java
index 8ed185e7996..73ea486a339 100644
--- 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveDeleteFilter.java
+++ 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveDeleteFilter.java
@@ -22,6 +22,7 @@
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.NoSuchElementException;
+import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.iceberg.FileScanTask;
@@ -85,105 +86,98 @@ protected InputFile getInputFile(String location) {
    */
   public CloseableIterable<HiveBatchContext> 
filterBatch(CloseableIterable<HiveBatchContext> batches) {
 
+    CloseableIterator<HiveBatchContext> iterator = new 
DeleteFilterBatchIterator(batches);
+
+    return new CloseableIterable<HiveBatchContext>() {
+
+      @Override
+      public CloseableIterator<HiveBatchContext> iterator() {
+        return iterator;
+      }
+
+      @Override
+      public void close() throws IOException {
+        iterator.close();
+      }
+    };
+  }
+
+  // VRB iterator with the delete filter
+  private class DeleteFilterBatchIterator implements 
CloseableIterator<HiveBatchContext> {
+
     // Delete filter pipeline setup logic:
     // A HiveRow iterable (deleteInputIterable) is provided as input iterable 
for the DeleteFilter.
     // The content in deleteInputIterable is provided by row iterators from 
the incoming VRBs i.e. on the arrival of
     // a new batch the underlying iterator gets swapped.
-    SwappableHiveRowIterable deleteInputIterable = new 
SwappableHiveRowIterable();
+    private final SwappableHiveRowIterable deleteInputIterable;
 
     // Output iterable of DeleteFilter, and its iterator
-    CloseableIterable<HiveRow> deleteOutputIterable = 
filter(deleteInputIterable);
-    CloseableIterator<HiveRow> deleteOutputIterator = 
deleteOutputIterable.iterator();
+    private final CloseableIterable<HiveRow> deleteOutputIterable;
 
-    return new CloseableIterable<HiveBatchContext>() {
+    private final CloseableIterator<HiveBatchContext> srcIterator;
 
-      @Override
-      public CloseableIterator<HiveBatchContext> iterator() {
+    DeleteFilterBatchIterator(CloseableIterable<HiveBatchContext> batches) {
+      deleteInputIterable = new SwappableHiveRowIterable();
+      deleteOutputIterable = filter(deleteInputIterable);
+      srcIterator = batches.iterator();
+    }
 
-        CloseableIterator<HiveBatchContext> srcIterator = batches.iterator();
+    @Override
+    public boolean hasNext() {
+      return srcIterator.hasNext();
+    }
 
-        return new CloseableIterator<HiveBatchContext>() {
+    @Override
+    public HiveBatchContext next() {
+      try {
+        if (!hasNext()) {
+          throw new NoSuchElementException();
+        }
+        HiveBatchContext batchContext = srcIterator.next();
+        VectorizedRowBatch batch = batchContext.getBatch();
 
-          @Override
-          public boolean hasNext() {
-            return srcIterator.hasNext();
-          }
+        int oldSize = batch.size;
+        int newSize = 0;
 
-          @Override
-          public HiveBatchContext next() {
-            try {
-              if (!hasNext()) {
-                throw new NoSuchElementException();
-              }
-              HiveBatchContext currentBatchContext = srcIterator.next();
-              deleteInputIterable.currentRowIterator = 
currentBatchContext.rowIterator();
-              VectorizedRowBatch batch = currentBatchContext.getBatch();
-
-              int oldSize = batch.size;
-              int newSize = 0;
-
-              // Apply delete filtering and adjust the selected array so that 
undeleted row indices are filled with it.
-              while (deleteOutputIterator.hasNext()) {
-                HiveRow row = deleteOutputIterator.next();
-                if (!row.isDeleted()) {
-                  batch.selected[newSize++] = row.physicalBatchIndex();
-                }
-              }
-
-              if (newSize < oldSize) {
-                batch.size = newSize;
-                batch.selectedInUse = true;
-              }
-              return currentBatchContext;
-            } catch (IOException e) {
-              throw new UncheckedIOException(e);
-            }
-          }
+        try (CloseableIterator<HiveRow> rowIterator = 
batchContext.rowIterator()) {
+          deleteInputIterable.currentRowIterator = rowIterator;
 
-          @Override
-          public void close() throws IOException {
-            srcIterator.close();
+          // Apply delete filtering and adjust the selected array so that 
undeleted row indices are filled with it.
+          for (HiveRow row : deleteOutputIterable) {
+            if (!row.isDeleted()) {
+              batch.selected[newSize++] = row.physicalBatchIndex();
+            }
           }
-        };
+        }
+        if (newSize < oldSize) {
+          batch.size = newSize;
+          batch.selectedInUse = true;
+        }
+        return batchContext;
+      } catch (IOException e) {
+        throw new UncheckedIOException(e);
       }
+    }
 
-      @Override
-      public void close() throws IOException {
-        batches.close();
-      }
-    };
+    @Override
+    public void close() throws IOException {
+      IOUtils.close(srcIterator, deleteOutputIterable);
+    }
   }
 
   // HiveRow iterable that wraps an interchangeable source HiveRow iterable
-  static class SwappableHiveRowIterable implements CloseableIterable<HiveRow> {
+  private static class SwappableHiveRowIterable implements 
CloseableIterable<HiveRow> {
 
     private CloseableIterator<HiveRow> currentRowIterator;
 
     @Override
     public CloseableIterator<HiveRow> iterator() {
-
-      return new CloseableIterator<HiveRow>() {
-
-        @Override
-        public boolean hasNext() {
-          return currentRowIterator.hasNext();
-        }
-
-        @Override
-        public HiveRow next() {
-          return currentRowIterator.next();
-        }
-
-        @Override
-        public void close() throws IOException {
-          currentRowIterator.close();
-        }
-      };
+      return currentRowIterator;
     }
 
     @Override
     public void close() throws IOException {
-      currentRowIterator.close();
+      IOUtils.close(currentRowIterator);
     }
   }
 }
diff --git 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergRecordReader.java
 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergRecordReader.java
index 7208a5f4076..dbaf5421968 100644
--- 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergRecordReader.java
+++ 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergRecordReader.java
@@ -21,6 +21,7 @@
 
 import java.io.IOException;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -63,7 +64,6 @@
 import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.types.Types;
@@ -91,7 +91,7 @@ public final class IcebergRecordReader<T> extends 
AbstractIcebergRecordReader<T>
     }
   }
 
-  private Iterable<FileScanTask> tasks;
+  private Iterator<FileScanTask> tasks;
   private CloseableIterator<T> currentIterator;
   private T current;
 
@@ -99,14 +99,13 @@ public final class IcebergRecordReader<T> extends 
AbstractIcebergRecordReader<T>
   public void initialize(InputSplit split, TaskAttemptContext newContext) {
     // For now IcebergInputFormat does its own split planning and does not 
accept FileSplit instances
     super.initialize(split, newContext);
-    ScanTaskGroup<FileScanTask> task = ((IcebergSplit) split).taskGroup();
-    this.tasks = task.tasks();
+    ScanTaskGroup<FileScanTask> taskGroup = ((IcebergSplit) split).taskGroup();
+    this.tasks = taskGroup.tasks().iterator();
     this.currentIterator = nextTask();
   }
 
   private CloseableIterator<T> nextTask() {
-    CloseableIterator<T> closeableIterator = CloseableIterable.concat(
-        Iterables.transform(tasks, task -> open(task, 
expectedSchema))).iterator();
+    CloseableIterator<T> closeableIterator = open(tasks.next(), 
expectedSchema).iterator();
     if (!isFetchVirtualColumns() || Utilities.getIsVectorized(conf)) {
       return closeableIterator;
     }
@@ -116,13 +115,15 @@ private CloseableIterator<T> nextTask() {
 
   @Override
   public boolean nextKeyValue() throws IOException {
-    if (currentIterator.hasNext()) {
-      current = currentIterator.next();
-      return true;
-    } else {
+    while (!currentIterator.hasNext()) {
       currentIterator.close();
-      return false;
+      if (!tasks.hasNext()) {
+        return false;
+      }
+      currentIterator = nextTask();
     }
+    current = currentIterator.next();
+    return true;
   }
 
   @Override

Reply via email to