Hi Matt,

This is because `arrow::compute::IsIn` is being called on each batch
materialized by the datasets API, so the internal kernel state is set
up and torn down for every batch. That means a large hash table is
built and destroyed many times, rather than created only once as
pandas does.

https://github.com/apache/arrow/blob/master/cpp/src/arrow/dataset/filter.cc#L1223

This is definitely a performance problem that must be fixed at some point.

https://issues.apache.org/jira/browse/ARROW-10097
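To make the cost concrete, here is a rough plain-Python sketch of the difference (this stands in for the Arrow C++ kernel internals, and the function names are made up for illustration — a Python set plays the role of the kernel's hash table):

```python
# Per-batch rebuild vs. one-time build of the lookup structure, which is
# the gist of the kernel-state issue described above.

def isin_rebuild_per_batch(batches, values):
    # Mimics the current behavior: the hash table (here a Python set)
    # is reconstructed for every materialized batch.
    out = []
    for batch in batches:
        value_set = set(values)  # rebuilt on every batch
        out.append([x in value_set for x in batch])
    return out

def isin_build_once(batches, values):
    # Mimics the desired behavior: build the lookup structure once
    # and reuse it across all batches.
    value_set = set(values)  # built a single time
    return [[x in value_set for x in batch] for batch in batches]

batches = [list(range(i, i + 4)) for i in range(0, 12, 4)]
values = [1, 5, 9]
# Both strategies give identical results; only the setup cost differs,
# and with a value set of many millions that setup dominates.
assert isin_rebuild_per_batch(batches, values) == isin_build_once(batches, values)
```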

Thanks,
Wes

On Fri, Sep 25, 2020 at 10:49 AM Matthew Corley <[email protected]> wrote:
>
> I don't want to cloud this discussion, but I do want to mention that when I 
> tested isin filtering with a high cardinality (on the order of many millions) 
> set on a parquet dataset with a single rowgroup (so filtering should have to 
> happen after data load), it performed much worse in terms of runtime and peak 
> memory utilization than waiting to do the filtering after converting the 
> Table to a pandas DataFrame.  This surprised me given all the copying that 
> has to occur in the latter case.
>
> I don't have my exact experiment lying around anymore to give concrete 
> numbers, but it might be worth investigating while the isin filter code is 
> under consideration.
>
> On Fri, Sep 25, 2020 at 6:55 AM Josh Mayer <[email protected]> wrote:
>>
>> Thanks Joris, that info is very helpful. A few follow-up questions. You 
>> mention that:
>>
>> > ... it actually needs to parse the statistics of all row groups of all 
>> > files to determine which can be skipped ...
>>
>> Is that something that is only done once (and perhaps stored inside a 
>> dataset object in some optimized form) or performed on every to_table call?
>>
>> In the case that I am creating a dataset from a common metadata file, is 
>> it possible to attach manual partitioning information (field expressions 
>> attached to each file), similar to how it is done in the manual dataset 
>> creation case 
>> (https://arrow.apache.org/docs/python/dataset.html#manual-specification-of-the-dataset)?
>>
>> Josh
>>
>> On Fri, Sep 25, 2020 at 8:34 AM Joris Van den Bossche 
>> <[email protected]> wrote:
>>>
>>> Using a small toy example, the "isin" filter is indeed not working for 
>>> filtering row groups:
>>>
>>> >>> import numpy as np
>>> >>> import pyarrow as pa
>>> >>> import pyarrow.parquet as pq
>>> >>> import pyarrow.dataset as ds
>>> >>> table = pa.table({"name": np.repeat(["a", "b", "c", "d"], 5),
>>> ...                   "value": np.arange(20)})
>>> >>> pq.write_table(table, "test_filter_string.parquet", row_group_size=5)
>>> >>> dataset = ds.dataset("test_filter_string.parquet")
>>> >>> dataset = ds.dataset("test_filter_string.parquet")
>>> # get the single file fragment (dataset consists of one file)
>>> >>> fragment = list(dataset.get_fragments())[0]
>>> >>> fragment.ensure_complete_metadata()
>>>
>>> # check that we do have statistics for our row groups
>>> >>> fragment.row_groups[0].statistics
>>> {'name': {'min': 'a', 'max': 'a'}, 'value': {'min': 0, 'max': 4}}
>>>
>>> # I created the file such that there are 4 row groups (each with a unique 
>>> value in the name column)
>>> >>> fragment.split_by_row_group()
>>> [<pyarrow._dataset.ParquetFileFragment at 0x7ff783939810>,
>>>  <pyarrow._dataset.ParquetFileFragment at 0x7ff783728cd8>,
>>>  <pyarrow._dataset.ParquetFileFragment at 0x7ff78376c9c0>,
>>>  <pyarrow._dataset.ParquetFileFragment at 0x7ff7835efd68>]
>>>
>>> # simple equality filter works as expected -> only single row group left
>>> >>> filter = ds.field("name") == "a"
>>> >>> fragment.split_by_row_group(filter)
>>> [<pyarrow._dataset.ParquetFileFragment at 0x7ff783662738>]
>>>
>>> # isin filter does not work
>>> >>> filter = ds.field("name").isin(["a", "b"])
>>> >>> fragment.split_by_row_group(filter)
>>> [<pyarrow._dataset.ParquetFileFragment at 0x7ff7837f46f0>,
>>>  <pyarrow._dataset.ParquetFileFragment at 0x7ff783627a98>,
>>>  <pyarrow._dataset.ParquetFileFragment at 0x7ff783581b70>,
>>>  <pyarrow._dataset.ParquetFileFragment at 0x7ff7835fb780>]
>>>
>>> Filtering with "isin" on partition columns, on the other hand, works 
>>> fine. I opened https://issues.apache.org/jira/browse/ARROW-10091 to 
>>> track this as a possible enhancement.
>>> Now, to explain why partitions are the "easier" case: the partition 
>>> information gets translated into an equality expression (with your 
>>> example, something like "name == 'a' "), while the statistics give a 
>>> range expression built from the min/max, such as 
>>> "(name >= 'a') & (name <= 'a')". Comparing an "isin" expression like 
>>> "name in ['a', 'b']" against an equality is relatively trivial; for the 
>>> min/max range expression, we would also need to handle the special case 
>>> where min and max are equal.
>>>
>>> Joris
>>>
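The row-group check Joris describes can be sketched in plain Python (this is a hypothetical illustration of the idea, not pyarrow's actual expression-simplification code; the function name and data are invented): a row group can only be skipped when no value in the "isin" set falls inside its [min, max] range.

```python
# Hypothetical row-group pruning against min/max statistics for an
# "isin" filter. A row group may contain matches if any requested
# value lies within its statistics range.

def may_contain(stats_min, stats_max, value_set):
    return any(stats_min <= v <= stats_max for v in value_set)

# Row groups as in the toy example above: each holds a single distinct
# name, so min == max for every group.
row_groups = [("a", "a"), ("b", "b"), ("c", "c"), ("d", "d")]
keep = [rg for rg in row_groups if may_contain(rg[0], rg[1], {"a", "b"})]
assert keep == [("a", "a"), ("b", "b")]
```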
>>> On Fri, 25 Sep 2020 at 14:06, Joris Van den Bossche 
>>> <[email protected]> wrote:
>>>>
>>>> Hi Josh,
>>>>
>>>> Thanks for the question!
>>>>
>>>> In general, filtering on partition columns will be faster than filtering 
>>>> on actual data columns using row group statistics. For partition-based 
>>>> filtering, the scanner can skip full files based on the information from 
>>>> the file path (in your example case, there are 11 files, for which it can 
>>>> select 4 of them to actually read), while for row-group-based filtering, 
>>>> it actually needs to parse the statistics of all row groups of all files 
>>>> to determine which can be skipped, which is typically more information to 
>>>> process compared to the file paths.
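A rough plain-Python sketch of why partition filtering is cheap (the path layout and helper are made up for illustration; this is not pyarrow's implementation): the value is recovered from the file path alone, so whole files are skipped without ever opening them.

```python
# Prune files by the partition key embedded in their paths, e.g.
# ".../name=a/part-0.parquet", in the hive-style "key=value" scheme.

def partition_value(path, key="name"):
    # Extract e.g. "a" from a path segment like "name=a".
    for segment in path.split("/"):
        if segment.startswith(key + "="):
            return segment.split("=", 1)[1]
    return None

paths = [f"data/name={n}/part-0.parquet" for n in "abcd"]
# Only the matching files are ever read; no statistics parsing needed.
selected = [p for p in paths if partition_value(p) in {"a", "b"}]
assert selected == ["data/name=a/part-0.parquet", "data/name=b/part-0.parquet"]
```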
>>>>
>>>> That said, there are some oddities I noticed:
>>>>
>>>> - As I mentioned, I expect partition-based filtering to be faster, but 
>>>> not that much faster (certainly in a case with a limited number of 
>>>> files, the overhead of parsing / filtering row groups should be really 
>>>> minimal)
>>>> - Inspecting the result a bit, it seems that for the first dataset 
>>>> (without partitioning) it's not actually applying the filter correctly. 
>>>> The min/max for the name column are included in the row group statistics, 
>>>> but the isin filter didn't actually filter them out. Something to 
>>>> investigate, but that certainly explains the difference in performance 
>>>> (it's actually reading all data, and only filtering after reading, not 
>>>> skipping some parts before reading)
>>>> - In your case, the partitioning has the same name as one of the actual 
>>>> columns in the data files. I am not sure this corner case of duplicate 
>>>> fields is tested very well, or how the filtering will behave.
>>>>
>>>> Joris
>>>>
>>>> On Thu, 24 Sep 2020 at 21:02, Josh Mayer <[email protected]> wrote:
>>>>>
>>>>> I am comparing two datasets with a filter on a string column (that is 
>>>>> also a partition column). I create the dataset from a common metadata 
>>>>> file. In the first case I omit the partitioning information whereas in 
>>>>> the second I include it. I would expect the performance to be similar 
>>>>> since the column statistics should be able to identify the same row 
>>>>> groups as the partitioning. However, I'm seeing the first case run almost 
>>>>> 3x slower. Is this expected?
>>>>>
>>>>> An example is here (I'm running on linux, python 3.8, pyarrow 1.0.1):
>>>>>
>>>>> https://gist.github.com/josham/5d7cf52f9ea60b1b2bbef1e768ea992f
