Lance Dacey created ARROW-15716:
-----------------------------------

             Summary: [Dataset][Python] Parse a list of fragment paths to gather filters
                 Key: ARROW-15716
                 URL: https://issues.apache.org/jira/browse/ARROW-15716
             Project: Apache Arrow
          Issue Type: Wish
    Affects Versions: 7.0.0
            Reporter: Lance Dacey
Is it possible for partitioning.parse() to be updated to accept a list of paths instead of just a single path?

I pass the .paths collected by file_visitor to downstream tasks so they can process the data that was just saved, but this can break if I later overwrite data with delete_matching in order to consolidate small files, since the original paths no longer exist. Here is the output of my current workaround, which turns the paths into filters instead of reading them directly:

{code:java}
# Fragments saved during write_dataset
['dev/dataset/fragments/date_id=20210813/data-0.parquet',
 'dev/dataset/fragments/date_id=20210114/data-2.parquet',
 'dev/dataset/fragments/date_id=20210114/data-1.parquet',
 'dev/dataset/fragments/date_id=20210114/data-0.parquet']

# Run partitioning.parse() on each fragment
[<pyarrow.compute.Expression (date_id == 20210813)>,
 <pyarrow.compute.Expression (date_id == 20210114)>,
 <pyarrow.compute.Expression (date_id == 20210114)>,
 <pyarrow.compute.Expression (date_id == 20210114)>]

# Format those expressions into a list of tuples
[('date_id', 'in', [20210114, 20210813])]

# Convert to an expression which is used as a filter in .to_table()
is_in(date_id, {value_set=int64:[
  20210114,
  20210813
], skip_nulls=false})
{code}

And here is how I build that filter from a list of .paths (perhaps there is a better way?):

{code:python}
partitioning = ds.HivePartitioning(partition_schema)
expressions = [partitioning.parse(path) for path in paths]

values = []
filters = []
for expression in expressions:
    partitions = ds._get_partition_keys(expression)
    if len(partitions) > 1:
        # Several partition keys: AND the equalities for this fragment
        element = [(k, "==", v) for k, v in partitions.items()]
        if element not in filters:
            filters.append(element)
    else:
        # Single partition key: collect the distinct values into one "in" filter
        for k, v in partitions.items():
            if v not in values:
                values.append(v)
            filters = [(k, "in", sorted(values))]

filt_exp = pa.parquet._filters_to_expression(filters)
dataset.to_table(filter=filt_exp)
{code}

My hope would be to do something like filt_exp = partitioning.parse(paths)
which would return a dataset expression.

--
This message was sent by Atlassian Jira
(v8.20.1#820001)
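For reference, the behavior I am wishing for can be emulated today on top of the existing single-path parse() by OR-ing the per-fragment expressions together. This is a minimal sketch, not a proposed implementation; parse_paths is a hypothetical name, and it relies on pyarrow dataset expressions supporting the | operator:

{code:python}
import functools
import operator

import pyarrow.dataset as ds


def parse_paths(partitioning, paths):
    # Hypothetical helper illustrating the wished-for API:
    # parse each fragment path individually, then OR the
    # resulting partition expressions into a single filter.
    expressions = [partitioning.parse(path) for path in paths]
    return functools.reduce(operator.or_, expressions)
{code}

The resulting expression can be passed straight to dataset.to_table(filter=...), although unlike the tuple-based approach above it does not deduplicate repeated partition values before combining them.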