This is an automated email from the ASF dual-hosted git repository.
dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new aec0b1136e5 HIVE-28843: Iceberg iterators refactor (Denys Kuzmenko, reviewed by Ayush Saxena, Sourabh Badhya)
aec0b1136e5 is described below
commit aec0b1136e5bc838587f3896d9dda98d94104591
Author: Denys Kuzmenko <[email protected]>
AuthorDate: Wed Apr 16 16:51:15 2025 +0300
HIVE-28843: Iceberg iterators refactor (Denys Kuzmenko, reviewed by Ayush Saxena, Sourabh Badhya)
Closes #5549
---
.../iceberg/mr/hive/vector/HiveDeleteFilter.java | 136 ++++++++++-----------
.../iceberg/mr/mapreduce/IcebergRecordReader.java | 23 ++--
2 files changed, 77 insertions(+), 82 deletions(-)
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveDeleteFilter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveDeleteFilter.java
index 8ed185e7996..73ea486a339 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveDeleteFilter.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveDeleteFilter.java
@@ -22,6 +22,7 @@
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.NoSuchElementException;
+import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.iceberg.FileScanTask;
@@ -85,105 +86,98 @@ protected InputFile getInputFile(String location) {
*/
public CloseableIterable<HiveBatchContext>
filterBatch(CloseableIterable<HiveBatchContext> batches) {
+ CloseableIterator<HiveBatchContext> iterator = new
DeleteFilterBatchIterator(batches);
+
+ return new CloseableIterable<HiveBatchContext>() {
+
+ @Override
+ public CloseableIterator<HiveBatchContext> iterator() {
+ return iterator;
+ }
+
+ @Override
+ public void close() throws IOException {
+ iterator.close();
+ }
+ };
+ }
+
+ // VRB iterator with the delete filter
+ private class DeleteFilterBatchIterator implements
CloseableIterator<HiveBatchContext> {
+
// Delete filter pipeline setup logic:
// A HiveRow iterable (deleteInputIterable) is provided as input iterable
for the DeleteFilter.
// The content in deleteInputIterable is provided by row iterators from
the incoming VRBs i.e. on the arrival of
// a new batch the underlying iterator gets swapped.
- SwappableHiveRowIterable deleteInputIterable = new
SwappableHiveRowIterable();
+ private final SwappableHiveRowIterable deleteInputIterable;
// Output iterable of DeleteFilter, and its iterator
- CloseableIterable<HiveRow> deleteOutputIterable =
filter(deleteInputIterable);
- CloseableIterator<HiveRow> deleteOutputIterator =
deleteOutputIterable.iterator();
+ private final CloseableIterable<HiveRow> deleteOutputIterable;
- return new CloseableIterable<HiveBatchContext>() {
+ private final CloseableIterator<HiveBatchContext> srcIterator;
- @Override
- public CloseableIterator<HiveBatchContext> iterator() {
+ DeleteFilterBatchIterator(CloseableIterable<HiveBatchContext> batches) {
+ deleteInputIterable = new SwappableHiveRowIterable();
+ deleteOutputIterable = filter(deleteInputIterable);
+ srcIterator = batches.iterator();
+ }
- CloseableIterator<HiveBatchContext> srcIterator = batches.iterator();
+ @Override
+ public boolean hasNext() {
+ return srcIterator.hasNext();
+ }
- return new CloseableIterator<HiveBatchContext>() {
+ @Override
+ public HiveBatchContext next() {
+ try {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ HiveBatchContext batchContext = srcIterator.next();
+ VectorizedRowBatch batch = batchContext.getBatch();
- @Override
- public boolean hasNext() {
- return srcIterator.hasNext();
- }
+ int oldSize = batch.size;
+ int newSize = 0;
- @Override
- public HiveBatchContext next() {
- try {
- if (!hasNext()) {
- throw new NoSuchElementException();
- }
- HiveBatchContext currentBatchContext = srcIterator.next();
- deleteInputIterable.currentRowIterator =
currentBatchContext.rowIterator();
- VectorizedRowBatch batch = currentBatchContext.getBatch();
-
- int oldSize = batch.size;
- int newSize = 0;
-
- // Apply delete filtering and adjust the selected array so that
undeleted row indices are filled with it.
- while (deleteOutputIterator.hasNext()) {
- HiveRow row = deleteOutputIterator.next();
- if (!row.isDeleted()) {
- batch.selected[newSize++] = row.physicalBatchIndex();
- }
- }
-
- if (newSize < oldSize) {
- batch.size = newSize;
- batch.selectedInUse = true;
- }
- return currentBatchContext;
- } catch (IOException e) {
- throw new UncheckedIOException(e);
- }
- }
+ try (CloseableIterator<HiveRow> rowIterator =
batchContext.rowIterator()) {
+ deleteInputIterable.currentRowIterator = rowIterator;
- @Override
- public void close() throws IOException {
- srcIterator.close();
+ // Apply delete filtering and adjust the selected array so that
undeleted row indices are filled with it.
+ for (HiveRow row : deleteOutputIterable) {
+ if (!row.isDeleted()) {
+ batch.selected[newSize++] = row.physicalBatchIndex();
+ }
}
- };
+ }
+ if (newSize < oldSize) {
+ batch.size = newSize;
+ batch.selectedInUse = true;
+ }
+ return batchContext;
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
}
+ }
- @Override
- public void close() throws IOException {
- batches.close();
- }
- };
+ @Override
+ public void close() throws IOException {
+ IOUtils.close(srcIterator, deleteOutputIterable);
+ }
}
// HiveRow iterable that wraps an interchangeable source HiveRow iterable
- static class SwappableHiveRowIterable implements CloseableIterable<HiveRow> {
+ private static class SwappableHiveRowIterable implements
CloseableIterable<HiveRow> {
private CloseableIterator<HiveRow> currentRowIterator;
@Override
public CloseableIterator<HiveRow> iterator() {
-
- return new CloseableIterator<HiveRow>() {
-
- @Override
- public boolean hasNext() {
- return currentRowIterator.hasNext();
- }
-
- @Override
- public HiveRow next() {
- return currentRowIterator.next();
- }
-
- @Override
- public void close() throws IOException {
- currentRowIterator.close();
- }
- };
+ return currentRowIterator;
}
@Override
public void close() throws IOException {
- currentRowIterator.close();
+ IOUtils.close(currentRowIterator);
}
}
}
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergRecordReader.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergRecordReader.java
index 7208a5f4076..dbaf5421968 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergRecordReader.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergRecordReader.java
@@ -21,6 +21,7 @@
import java.io.IOException;
import java.util.Collections;
+import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -63,7 +64,6 @@
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
@@ -91,7 +91,7 @@ public final class IcebergRecordReader<T> extends AbstractIcebergRecordReader<T>
}
}
- private Iterable<FileScanTask> tasks;
+ private Iterator<FileScanTask> tasks;
private CloseableIterator<T> currentIterator;
private T current;
@@ -99,14 +99,13 @@ public final class IcebergRecordReader<T> extends AbstractIcebergRecordReader<T>
public void initialize(InputSplit split, TaskAttemptContext newContext) {
// For now IcebergInputFormat does its own split planning and does not
accept FileSplit instances
super.initialize(split, newContext);
- ScanTaskGroup<FileScanTask> task = ((IcebergSplit) split).taskGroup();
- this.tasks = task.tasks();
+ ScanTaskGroup<FileScanTask> taskGroup = ((IcebergSplit) split).taskGroup();
+ this.tasks = taskGroup.tasks().iterator();
this.currentIterator = nextTask();
}
private CloseableIterator<T> nextTask() {
- CloseableIterator<T> closeableIterator = CloseableIterable.concat(
- Iterables.transform(tasks, task -> open(task,
expectedSchema))).iterator();
+ CloseableIterator<T> closeableIterator = open(tasks.next(),
expectedSchema).iterator();
if (!isFetchVirtualColumns() || Utilities.getIsVectorized(conf)) {
return closeableIterator;
}
@@ -116,13 +115,15 @@ private CloseableIterator<T> nextTask() {
@Override
public boolean nextKeyValue() throws IOException {
- if (currentIterator.hasNext()) {
- current = currentIterator.next();
- return true;
- } else {
+ while (!currentIterator.hasNext()) {
currentIterator.close();
- return false;
+ if (!tasks.hasNext()) {
+ return false;
+ }
+ currentIterator = nextTask();
}
+ current = currentIterator.next();
+ return true;
}
@Override