sungwy commented on code in PR #2255:
URL: https://github.com/apache/iceberg-python/pull/2255#discussion_r2274999411
##########
pyiceberg/io/pyarrow.py:
##########
@@ -1528,22 +1556,64 @@ def _task_to_record_batches(
         yield result_batch
 
 
-def _read_all_delete_files(io: FileIO, tasks: Iterable[FileScanTask]) -> Dict[str, List[ChunkedArray]]:
-    deletes_per_file: Dict[str, List[ChunkedArray]] = {}
-    unique_deletes = set(itertools.chain.from_iterable([task.delete_files for task in tasks]))
-    if len(unique_deletes) > 0:
+def _read_all_delete_files(io: FileIO, tasks: Iterable[FileScanTask]) -> Union[Dict[str, pa.ChunkedArray], pa.Table]:
+    deletes_per_file: Dict[str, List[Union[pa.ChunkedArray, pa.Table]]] = {}
+
+    positional_deletes = {
+        df
+        for task in tasks
+        for df in task.delete_files
+        if df.content == DataFileContent.POSITION_DELETES and df.file_format != FileFormat.PUFFIN
+    }
+    if positional_deletes:
         executor = ExecutorFactory.get_or_create()
         deletes_per_files: Iterator[Dict[str, ChunkedArray]] = executor.map(
             lambda args: _read_deletes(*args),
-            [(io, delete_file) for delete_file in unique_deletes],
+            [(io, delete_file) for delete_file in positional_deletes],
         )
         for delete in deletes_per_files:
             for file, arr in delete.items():
                 if file in deletes_per_file:
                     deletes_per_file[file].append(arr)
                 else:
                     deletes_per_file[file] = [arr]
+
+    deletion_vectors = {
+        df
+        for task in tasks
+        for df in task.delete_files
+        if df.content == DataFileContent.POSITION_DELETES and df.file_format == FileFormat.PUFFIN
+    }
+    if deletion_vectors:
+        executor = ExecutorFactory.get_or_create()
+        dv_results: Iterator[Dict[str, ChunkedArray]] = executor.map(
+            lambda args: _read_deletes(*args),
+            [(io, delete_file) for delete_file in deletion_vectors],
+        )
+        for delete in dv_results:
+            for file, arr in delete.items():
+                # Deletion vectors replace all position deletes for a file
+                deletes_per_file[file] = [arr]
+
+    equality_delete_tasks = []
+    for task in tasks:
+        equality_deletes = [df for df in task.delete_files if df.content == DataFileContent.EQUALITY_DELETES]
+        if equality_deletes:
+            for delete_file in equality_deletes:
+                # Pair each data file with its associated equality delete file
+                equality_delete_tasks.append((task.file.file_path, delete_file))
+
+    if equality_delete_tasks:
+        executor = ExecutorFactory.get_or_create()
+        # Process equality delete tasks in parallel, like position deletes
+        equality_delete_results = executor.map(
+            lambda args: (args[0], _read_deletes(io, args[1])),
+            equality_delete_tasks,
+        )

Review Comment:
   We are already getting a subset of the files that have equality deletes, so it would make sense to use a dedicated function to read them rather than routing everything through the convoluted `_read_deletes`.
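   For illustration, a dedicated reader might look roughly like the sketch below. This is a minimal sketch, not part of this PR: the name `_read_equality_deletes` is hypothetical, and it assumes equality delete files are Parquet and that `pyarrow.parquet.read_table` can consume the stream returned by `io.new_input(...).open()`.

   ```python
   import pyarrow as pa
   import pyarrow.parquet as pq

   from pyiceberg.io import FileIO
   from pyiceberg.manifest import DataFile, FileFormat


   def _read_equality_deletes(io: FileIO, delete_file: DataFile) -> pa.Table:
       """Read a single equality delete file into a pa.Table.

       Hypothetical helper: callers have already filtered for
       DataFileContent.EQUALITY_DELETES, so no branching on delete
       content or file format is needed here, unlike _read_deletes.
       """
       if delete_file.file_format != FileFormat.PARQUET:
           raise ValueError(f"Unsupported equality delete file format: {delete_file.file_format}")
       with io.new_input(delete_file.file_path).open() as f:
           # The columns of an equality delete file hold the equality field
           # values; keeping them as a Table lets the caller build an
           # anti-join predicate against the matching data file.
           return pq.read_table(f)
   ```

   The call site above would then map `_read_equality_deletes` over `equality_delete_tasks` directly instead of sending every delete type through `_read_deletes`.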