Hi Matt,

This is because `arrow::compute::IsIn` is being called on each batch
materialized by the datasets API, so the internal kernel state is being set
up and torn down for every batch. This means that a large hash table is
being built and torn down many times, rather than being created only once
as with pandas.
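The effect can be illustrated with a rough pure-Python analogy (the value set, batch sizes, and the `set` standing in for the kernel's hash table are all made up for illustration; this is not the Arrow code path itself):

```python
import time

value_set = list(range(1_000_000))  # large "isin" value set
batches = [list(range(i, i + 1_000)) for i in range(0, 10_000, 1_000)]

# Per-batch setup: roughly what the dataset filter does today.
start = time.perf_counter()
hits_per_batch = 0
for batch in batches:
    lookup = set(value_set)  # hash table rebuilt for every batch
    hits_per_batch += sum(v in lookup for v in batch)
per_batch_time = time.perf_counter() - start

# One-time setup: roughly what the pandas path does.
start = time.perf_counter()
lookup = set(value_set)  # hash table built once
hits_once = sum(v in lookup for batch in batches for v in batch)
once_time = time.perf_counter() - start

# Both approaches find the same matches; only the setup cost differs.
assert hits_per_batch == hits_once == 10_000
```

The per-batch variant pays the set-construction cost once per batch, so its runtime grows with the number of batches rather than with the size of the data, which is the behavior the linked JIRA describes.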
https://github.com/apache/arrow/blob/master/cpp/src/arrow/dataset/filter.cc#L1223

This is definitely a performance problem that must be fixed at some point:
https://issues.apache.org/jira/browse/ARROW-10097

Thanks,
Wes

On Fri, Sep 25, 2020 at 10:49 AM Matthew Corley <[email protected]> wrote:
>
> I don't want to cloud this discussion, but I do want to mention that when I
> tested isin filtering with a high-cardinality (on the order of many millions)
> set on a Parquet dataset with a single row group (so filtering should have to
> happen after data load), it performed much worse in terms of runtime and peak
> memory utilization than waiting to do the filtering after converting the
> Table to a pandas DataFrame. This surprised me given all the copying that
> has to occur in the latter case.
>
> I don't have my exact experiment lying around anymore to give concrete
> numbers, but it might be worth investigating while the isin filter code is
> under consideration.
>
> On Fri, Sep 25, 2020 at 6:55 AM Josh Mayer <[email protected]> wrote:
>>
>> Thanks Joris, that info is very helpful. A few follow-up questions. You
>> mention that:
>>
>> > ... it actually needs to parse the statistics of all row groups of all
>> > files to determine which can be skipped ...
>>
>> Is that something that is done only once (and perhaps stored inside a
>> dataset object in some optimized form), or is it performed on every
>> to_table call?
>>
>> In the case that I am creating a dataset from a common metadata file, is it
>> possible to attach manual partitioning information (using field expressions
>> for each file), similar to how it is done in the manual dataset creation
>> case
>> (https://arrow.apache.org/docs/python/dataset.html#manual-specification-of-the-dataset)?
>>
>> Josh
>>
>> On Fri, Sep 25, 2020 at 8:34 AM Joris Van den Bossche
>> <[email protected]> wrote:
>>>
>>> Using a small toy example, the "isin" filter is indeed not working for
>>> filtering row groups:
>>>
>>> >>> table = pa.table({"name": np.repeat(["a", "b", "c", "d"], 5),
>>> ...                   "value": np.arange(20)})
>>> >>> pq.write_table(table, "test_filter_string.parquet", row_group_size=5)
>>> >>> dataset = ds.dataset("test_filter_string.parquet")
>>> >>> # get the single file fragment (dataset consists of one file)
>>> >>> fragment = list(dataset.get_fragments())[0]
>>> >>> fragment.ensure_complete_metadata()
>>> >>> # check that we do have statistics for our row groups
>>> >>> fragment.row_groups[0].statistics
>>> {'name': {'min': 'a', 'max': 'a'}, 'value': {'min': 0, 'max': 4}}
>>>
>>> >>> # I created the file such that there are 4 row groups
>>> >>> # (each with a unique value in the name column)
>>> >>> fragment.split_by_row_group()
>>> [<pyarrow._dataset.ParquetFileFragment at 0x7ff783939810>,
>>>  <pyarrow._dataset.ParquetFileFragment at 0x7ff783728cd8>,
>>>  <pyarrow._dataset.ParquetFileFragment at 0x7ff78376c9c0>,
>>>  <pyarrow._dataset.ParquetFileFragment at 0x7ff7835efd68>]
>>>
>>> >>> # simple equality filter works as expected -> only a single row group left
>>> >>> filter = ds.field("name") == "a"
>>> >>> fragment.split_by_row_group(filter)
>>> [<pyarrow._dataset.ParquetFileFragment at 0x7ff783662738>]
>>>
>>> >>> # isin filter does not work
>>> >>> filter = ds.field("name").isin(["a", "b"])
>>> >>> fragment.split_by_row_group(filter)
>>> [<pyarrow._dataset.ParquetFileFragment at 0x7ff7837f46f0>,
>>>  <pyarrow._dataset.ParquetFileFragment at 0x7ff783627a98>,
>>>  <pyarrow._dataset.ParquetFileFragment at 0x7ff783581b70>,
>>>  <pyarrow._dataset.ParquetFileFragment at 0x7ff7835fb780>]
>>>
>>> While filtering with "isin" on partition columns is working fine. I opened
>>> https://issues.apache.org/jira/browse/ARROW-10091 to track this as a
>>> possible enhancement.
>>> Now, to explain why partitions are the "easier" case: the partition
>>> information gets translated into an equality expression (with your
>>> example, an expression like "name == 'a'"), while the statistics give a
>>> greater-than/less-than expression derived from the min/max, such as
>>> "(name >= 'a') & (name <= 'a')". So for the equality case it is more
>>> trivial to compare this with an "isin" expression like
>>> "name in ['a', 'b']" (for the min/max expression, we would also need to
>>> check the special case where min and max are equal).
>>>
>>> Joris
>>>
>>> On Fri, 25 Sep 2020 at 14:06, Joris Van den Bossche
>>> <[email protected]> wrote:
>>>>
>>>> Hi Josh,
>>>>
>>>> Thanks for the question!
>>>>
>>>> In general, filtering on partition columns will be faster than filtering
>>>> on actual data columns using row group statistics. For partition-based
>>>> filtering, the scanner can skip full files based on the information in
>>>> the file path (in your example there are 11 files, of which it can
>>>> select the 4 to actually read), while for row-group-based filtering it
>>>> actually needs to parse the statistics of all row groups of all files
>>>> to determine which can be skipped, which is typically more information
>>>> to process than the file paths.
>>>>
>>>> That said, there are some oddities I noticed:
>>>>
>>>> - As I mentioned, I expect partition-based filtering to be faster, but
>>>>   not that much faster (certainly in a case with a limited number of
>>>>   files, the overhead of parsing / filtering row groups should be
>>>>   really minimal).
>>>> - Inspecting the result a bit, it seems that for the first dataset
>>>>   (without partitioning) it's not actually applying the filter
>>>>   correctly. The min/max for the name column are included in the row
>>>>   group statistics, but the isin filter didn't actually filter them out.
>>>>   Something to investigate, but that certainly explains the difference
>>>>   in performance (it's actually reading all data, and only filtering
>>>>   after reading, not skipping some parts before reading).
>>>> - In your case, the partitioning has the same name as one of the actual
>>>>   columns in the data files. I am not sure this corner case of
>>>>   duplicate fields is tested very well, or how the filtering will work.
>>>>
>>>> Joris
>>>>
>>>> On Thu, 24 Sep 2020 at 21:02, Josh Mayer <[email protected]> wrote:
>>>>>
>>>>> I am comparing two datasets with a filter on a string column (that is
>>>>> also a partition column). I create the dataset from a common metadata
>>>>> file. In the first case I omit the partitioning information, whereas
>>>>> in the second I include it. I would expect the performance to be
>>>>> similar, since the column statistics should be able to identify the
>>>>> same row groups as the partitioning. However, I'm seeing the first
>>>>> case run almost 3x slower. Is this expected?
>>>>>
>>>>> An example is here (I'm running on Linux, Python 3.8, pyarrow 1.0.1):
>>>>>
>>>>> https://gist.github.com/josham/5d7cf52f9ea60b1b2bbef1e768ea992f
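The row-group pruning Joris describes (comparing an "isin" value set against each row group's min/max statistics, including the special case where min equals max) can be sketched in pure Python. `may_contain` and the statistics list here are illustrative stand-ins, not Arrow APIs:

```python
# Hypothetical sketch (not the actual Arrow C++ implementation) of pruning
# row groups for an "isin" filter: a row group can only match if some value
# in the set can fall within the column's [min, max] statistics.
def may_contain(stat_min, stat_max, value_set):
    return any(stat_min <= v <= stat_max for v in value_set)

# Statistics as in the toy example: four row groups, one distinct name each,
# so min == max for every row group.
row_group_stats = [("a", "a"), ("b", "b"), ("c", "c"), ("d", "d")]

kept = [i for i, (lo, hi) in enumerate(row_group_stats)
        if may_contain(lo, hi, ["a", "b"])]
assert kept == [0, 1]  # only the "a" and "b" row groups need to be read
```

This check is conservative: statistics can only prove a row group *cannot* match, so surviving row groups are still filtered row-by-row after reading.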
