This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new a2656872c Core: Defer reading Avro metadata until ManifestFile is read (#5206)
a2656872c is described below
commit a2656872c2d3967d279d9f90c35a72da506aa8d6
Author: Ryan Blue <[email protected]>
AuthorDate: Wed Jul 6 07:01:38 2022 -0700
Core: Defer reading Avro metadata until ManifestFile is read (#5206)
---
.../java/org/apache/iceberg/ManifestGroup.java | 57 ++++++++++++++--------
1 file changed, 37 insertions(+), 20 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/ManifestGroup.java b/core/src/main/java/org/apache/iceberg/ManifestGroup.java
index 1120bdfb3..37b103d74 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestGroup.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestGroup.java
@@ -21,6 +21,7 @@ package org.apache.iceberg;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
+import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -34,6 +35,7 @@ import org.apache.iceberg.expressions.ManifestEvaluator;
import org.apache.iceberg.expressions.Projections;
import org.apache.iceberg.expressions.ResidualEvaluator;
import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -241,30 +243,45 @@ class ManifestGroup {
return Iterables.transform(
matchingManifests,
- manifest -> {
- ManifestReader<DataFile> reader = ManifestFiles.read(manifest, io,
specsById)
- .filterRows(dataFilter)
- .filterPartitions(partitionFilter)
- .caseSensitive(caseSensitive)
- .select(columns);
-
- CloseableIterable<ManifestEntry<DataFile>> entries =
reader.entries();
- if (ignoreDeleted) {
- entries = reader.liveEntries();
- }
+ manifest -> new CloseableIterable<T>() {
+ private CloseableIterable<T> iterable;
- if (ignoreExisting) {
- entries = CloseableIterable.filter(entries,
- entry -> entry.status() != ManifestEntry.Status.EXISTING);
- }
+ @Override
+ public CloseableIterator<T> iterator() {
+ ManifestReader<DataFile> reader = ManifestFiles.read(manifest, io,
specsById)
+ .filterRows(dataFilter)
+ .filterPartitions(partitionFilter)
+ .caseSensitive(caseSensitive)
+ .select(columns);
+
+ CloseableIterable<ManifestEntry<DataFile>> entries =
reader.entries();
+ if (ignoreDeleted) {
+ entries = reader.liveEntries();
+ }
+
+ if (ignoreExisting) {
+ entries = CloseableIterable.filter(entries,
+ entry -> entry.status() != ManifestEntry.Status.EXISTING);
+ }
- if (evaluator != null) {
- entries = CloseableIterable.filter(entries,
- entry -> evaluator.eval((GenericDataFile) entry.file()));
+ if (evaluator != null) {
+ entries = CloseableIterable.filter(entries,
+ entry -> evaluator.eval((GenericDataFile) entry.file()));
+ }
+
+ entries = CloseableIterable.filter(entries,
manifestEntryPredicate);
+
+ iterable = entryFn.apply(manifest, entries);
+
+ return iterable.iterator();
}
- entries = CloseableIterable.filter(entries, manifestEntryPredicate);
- return entryFn.apply(manifest, entries);
+ @Override
+ public void close() throws IOException {
+ if (iterable != null) {
+ iterable.close();
+ }
+ }
});
}
}