aokolnychyi commented on code in PR #9251: URL: https://github.com/apache/iceberg/pull/9251#discussion_r1426529884
##########
core/src/main/java/org/apache/iceberg/DeleteFileIndex.java:
##########
@@ -474,68 +420,64 @@ private Collection<DeleteFile> loadDeleteFiles() {
   DeleteFileIndex build() {
     Iterable<DeleteFile> files = deleteFiles != null ? filterDeleteFiles() : loadDeleteFiles();
-    boolean useColumnStatsFiltering = false;
+    EqualityDeletes globalDeletes = new EqualityDeletes();
+    PartitionMap<EqualityDeletes> eqDeletesByPartition = PartitionMap.create(specsById);
+    PartitionMap<PositionDeletes> posDeletesByPartition = PartitionMap.create(specsById);
+    CharSequenceMap<PositionDeletes> posDeletesByPath = CharSequenceMap.create();
-    // build a map from (specId, partition) to delete file entries
-    Map<Integer, StructLikeWrapper> wrappersBySpecId = Maps.newHashMap();
-    ListMultimap<Pair<Integer, StructLikeWrapper>, IndexedDeleteFile> deleteFilesByPartition =
-        Multimaps.newListMultimap(Maps.newHashMap(), Lists::newArrayList);
     for (DeleteFile file : files) {
-      int specId = file.specId();
-      PartitionSpec spec = specsById.get(specId);
-      StructLikeWrapper wrapper =
-          wrappersBySpecId
-              .computeIfAbsent(specId, id -> StructLikeWrapper.forType(spec.partitionType()))
-              .copyFor(file.partition());
-      IndexedDeleteFile indexedFile = new IndexedDeleteFile(spec, file);
-      deleteFilesByPartition.put(Pair.of(specId, wrapper), indexedFile);
-
-      if (!useColumnStatsFiltering) {
-        useColumnStatsFiltering = indexedFile.hasLowerAndUpperBounds();
+      switch (file.content()) {
+        case POSITION_DELETES:
+          PositionDeletes posGroup = findGroup(posDeletesByPath, posDeletesByPartition, file);
+          posGroup.add(file);
+          break;
+
+        case EQUALITY_DELETES:
+          PartitionSpec spec = specsById.get(file.specId());
+          EqualityDeleteFile eqFile = new EqualityDeleteFile(spec, file);
+          EqualityDeletes eqGroup = findGroup(globalDeletes, eqDeletesByPartition, eqFile);
+          eqGroup.add(eqFile);
+          break;
+
+        default:
+          throw new UnsupportedOperationException("Unsupported content: " + file.content());
       }
       ScanMetricsUtil.indexedDeleteFile(scanMetrics, file);
     }
-    // sort the entries in each map value by sequence number and split into sequence numbers and
-    // delete files lists
-    Map<Pair<Integer, StructLikeWrapper>, DeleteFileGroup> sortedDeletesByPartition =
-        Maps.newHashMap();
-    // also, separate out equality deletes in an unpartitioned spec that should be applied
-    // globally
-    DeleteFileGroup globalDeletes = null;
-    for (Pair<Integer, StructLikeWrapper> partition : deleteFilesByPartition.keySet()) {

Review Comment:
   Instead of doing two passes and always indexing/sorting everything, I switched to one pass and indexing on demand. We will load all files but index only those partitions that are affected by the query.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org