Oh! I didn't know that. Thanks. I think this suggests even more that the problem is with the discovery taking too long rather than with the filter being applied, but I suppose we should benchmark both at some point.
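Something like the quick timing sketch below might be enough to separate the two phases, since discovery happens inside ds.dataset() and the partition filter is only applied once we ask for fragments. (Untested; it assumes the /tmp/my_dataset hive layout from the examples further down the thread.)

import time

import pyarrow.dataset as ds

# Phase 1: discovery (lists every directory/file, no filter applied)
start = time.time()
dataset = ds.dataset('/tmp/my_dataset', partitioning='hive')
discovered = time.time()

# Phase 2: filtering (compares the filter against each fragment's
# previously captured partition expression)
filt = (ds.field('state_code') == 31) & (ds.field('city_code') == 6200)
fragments = list(dataset.get_fragments(filter=filt))
filtered = time.time()

print(f'discovery: {discovered - start:.3f}s')
print(f'filtering: {filtered - discovered:.3f}s ({len(fragments)} fragments)')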
On Thu, Aug 4, 2022 at 5:01 PM David Li <lidav...@apache.org> wrote:
>
> FWIW, we _should_ already perform the "subtree" filtering (see subtree_internal.h [1]) so either it's not the bottleneck or the optimization is not as effective as we would like. Or possibly we need to maintain the files as a tree in the first place instead of trying to recover the structure later [2].
>
> [1]: https://github.com/apache/arrow/blob/master/cpp/src/arrow/compute/exec/subtree_internal.h
> [2]: https://github.com/apache/arrow/blob/60a1919527003a55a39785633bdbbbeef412c362/cpp/src/arrow/dataset/file_base.cc#L207-L268
>
> On Thu, Aug 4, 2022, at 17:30, Weston Pace wrote:
>
> Awesome.
>
> # Partitioning (src/arrow/dataset/partition.h)
>
> The first spot to look at might be the Partitioning class. A Partitioning (e.g. hive partitioning, directory partitioning, filename partitioning) has two main methods, which convert between a path (e.g. "/state_code=11/city_code=106/chunk-0.parquet") and an expression (e.g. field_ref("state_code") == 11 && field_ref("city_code") == 106):
>
> virtual Result<compute::Expression> Parse(const std::string& path) const = 0;
> virtual Result<PartitionPathFormat> Format(const compute::Expression& expr) const = 0;
>
> We use expressions instead of something simpler like a dictionary of key/value pairs. I believe the intention was to leave the door open for unusual partitionings that might map something like "temp=medium" to an expression like "30 < field_ref("temp") < 60", but in practice these expressions are always a collection of equality expressions and'd together.
>
> One thing I'm fairly certain should work, but we might want to verify (and potentially add unit tests for), is that hive & directory partitioning can correctly convert bare directories (e.g. "/state_code=11") into expressions and that we aren't relying on full paths.
>
> # Discovery (src/arrow/dataset/discovery.h)
>
> The next class is DatasetFactory and, specifically, we are probably interested in FileSystemDatasetFactory. The FileSystemDatasetFactory class scans a directory, discovering files. It can also discover a partitioning. This is all usually transparent in pyarrow, but there are bindings and you can use this class explicitly:
>
> import pyarrow.dataset as ds
> import pyarrow.fs as fs
>
> local_fs = fs.LocalFileSystem()
> format = ds.ParquetFileFormat()
> opts = ds.FileSystemFactoryOptions(partitioning=ds.HivePartitioning.discover())
> factory = ds.FileSystemDatasetFactory(local_fs, fs.FileSelector('/tmp/my_dataset', recursive=True), format, opts)
> sch = factory.inspect()
> my_dataset = factory.finish(sch)
> print(my_dataset.partitioning.schema)
> # state_code: int32
> # city_code: int32
>
> Although, in practice, we would usually write something much shorter:
>
> import pyarrow.dataset as ds
>
> my_dataset = ds.dataset('/tmp/my_dataset', partitioning='hive')  # This one line expands to everything above
>
> If, however, we want to apply a filter while we are discovering the dataset, then we cannot rely on dataset discovery to also discover our partitioning. We will have to specify it manually (or you could run dataset discovery once, discover the partitioning, and then run it many times using that discovered partitioning).
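(Side note on the unit-test suggestion above: that check is easy to poke at from Python. A rough, untested sketch; I'm assuming here that the Python Partitioning.parse binding accepts a directory-only path, which is exactly the thing Weston says we should verify.)

import pyarrow as pa
import pyarrow.dataset as ds

schema = pa.schema([pa.field('state_code', pa.int32()),
                    pa.field('city_code', pa.int32())])
partitioning = ds.HivePartitioning(schema)

# A full partition path should produce both equality expressions and'd together...
print(partitioning.parse('state_code=11/city_code=106'))
# expected: ((state_code == 11) and (city_code == 106))

# ...and a bare top-level directory should produce just the first one.
print(partitioning.parse('state_code=11'))
# expected: (state_code == 11)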
> So I think, in the end, we want to be able to support something like this:
>
> import pyarrow as pa
> import pyarrow.dataset as ds
>
> partitioning = ds.HivePartitioning(pa.schema([pa.field('state_code', pa.int32()), pa.field('city_code', pa.int32())]))
> filter = (ds.field('state_code') == 31) & (ds.field('city_code') == 6200)
> my_dataset = ds.dataset('/tmp/my_dataset', partitioning=partitioning, filter=filter)
>
> This will require making changes to FileSystemDatasetFactory. We will want to add "filter" to FileSystemFactoryOptions...
>
> struct FileSystemFactoryOptions {
>   PartitioningOrFactory partitioning{Partitioning::Default()};
>   std::string partition_base_dir;
>   bool exclude_invalid_files = false;
>   std::vector<std::string> selector_ignore_prefixes = {".", "_",};
>   // `compute::literal(true)` is often used as a "default" value for a filter
>   compute::Expression filter = compute::literal(true);
> };
>
> * If a filter is specified (i.e. it is not literal(true)) then partitioning MUST be a partitioning (and not a partitioning factory).
>
> Then we need to modify the discovery process itself to only list files that match the filter. Unfortunately, that might end up being a little tricky. The code right now is simply...
>
> ARROW_ASSIGN_OR_RAISE(auto files, filesystem->GetFileInfo(selector));
>
> So FileSystemDatasetFactory is not actually doing the recursive directory-walking logic itself. Instead it is just asking for all files matching a selector. Maybe GetFileInfo could be extended to take an optional predicate which would be applied to directories before entering them, although this might not be straightforward for remote filesystem implementations (e.g. S3, GCS) that don't actually "walk" a directory but instead do simple prefix matching. Another option could be to extend Partitioning so that it could take in an expression and return a prefix. GetFileInfo could then be extended to only return files that match the given prefix.
>
> Finally, you'd want to create some kind of benchmark and/or unit tests to justify the change and prevent regressions. A simple Python benchmark (using today's code, without a filter) could be:
>
> import os
> import tempfile
> import time
>
> import pyarrow as pa
> import pyarrow.dataset as ds
> import pyarrow.parquet as pq
>
> simple_table = pa.Table.from_pydict({'x': [1, 2, 3]})
>
> with tempfile.TemporaryDirectory() as tempdir:
>     for state_code in range(100):
>         for city_code in range(100):
>             pq_dir = os.path.join(tempdir, f'state_code={state_code}', f'city_code={city_code}')
>             os.makedirs(pq_dir)
>             pq_path = os.path.join(pq_dir, 'chunk-0.parquet')
>             pq.write_table(simple_table, pq_path)
>
>     start = time.time()
>     partitioning = ds.HivePartitioning(pa.schema([pa.field('state_code', pa.int32()), pa.field('city_code', pa.int32())]))
>     my_dataset = ds.dataset(tempdir, partitioning=partitioning)
>     end = time.time()
>     print(f'Elapsed: {end - start}s')
>
> # Approach 2, faster filtering
>
> There is an entirely different approach that could be taken which wouldn't speed up the discovery at all, but should speed up the filtering. In this approach you could modify FileSystemDataset so that, instead of storing a flat list of FileFragment objects, it stored a tree of FileFragment objects. Each node of the tree would have its own partitioning expression. Then, GetFragments(predicate) could walk the tree (in DFS order), skipping entire nodes that fail the predicate.
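That tree walk is easy to picture with a toy model. Here's a rough, purely illustrative sketch of approach 2 in Python; FragmentNode, iter_matching_fragments, and the dict-of-equalities stand-in for partition expressions are all made-up names, not Arrow API (the real thing would be C++ and use expression simplification):

from dataclasses import dataclass, field

@dataclass
class FragmentNode:
    # Partition keys fixed at this directory level, e.g. {'state_code': 11}.
    # (In Arrow these are compute Expressions; plain dicts keep the sketch runnable.)
    keys: dict
    children: list = field(default_factory=list)   # child FragmentNode objects
    fragments: list = field(default_factory=list)  # file fragments at this node

def iter_matching_fragments(node, predicate):
    # Yield fragments in DFS order, skipping subtrees that contradict the predicate.
    for key, value in node.keys.items():
        if key in predicate and predicate[key] != value:
            return  # prune: nothing under this node can match
    yield from node.fragments
    for child in node.children:
        yield from iter_matching_fragments(child, predicate)

# Example: two cities under one state, filter pinned to one of them
root = FragmentNode({}, children=[
    FragmentNode({'state_code': 11}, children=[
        FragmentNode({'city_code': 106}, fragments=['s11/c106/chunk-0.parquet']),
        FragmentNode({'city_code': 114}, fragments=['s11/c114/chunk-0.parquet']),
    ]),
])
print(list(iter_matching_fragments(root, {'state_code': 11, 'city_code': 106})))
# ['s11/c106/chunk-0.parquet']

On the layout discussed in this thread, a filter pinned on state_code would let the walk discard 26 of the 27 state subtrees outright instead of testing all ~150k leaf expressions.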
> On Thu, Aug 4, 2022 at 9:54 AM Tomaz Maia Suller <tmsul...@stefanini.com> wrote:
>
> Weston, I'm interested in following up.
>
> ------------------------------
> *From:* Weston Pace <weston.p...@gmail.com>
> *Sent:* Thursday, August 4, 2022 12:15
> *To:* user@arrow.apache.org <user@arrow.apache.org>
> *Subject:* Re: Issue filtering partitioned Parquet files on partition keys using PyArrow
>
> There is a lot of room for improvement here. In the datasets API the call that you have described (read_parquet) is broken into two steps:
>
> * dataset discovery
>
> During dataset discovery we don't use any partition filter. The goal is to create the "total dataset" of all the files. So in your case this means listing out all 150,120 directories. For every file we discover we capture a partition expression for that file. This is probably where the bulk of the time is being spent (listing the directories).
>
> * dataset read
>
> During the dataset read we apply the partition filter. So we are going to iterate through all ~150k files and compare the filter expression with the previously captured partition expression, eliminating files that don't match. In this phase we don't have any idea of the original directory structure, so instead of performing 27 top-level comparisons + 5560 second-level comparisons we end up having to calculate all 150k comparisons.
>
> Both of these steps take considerably longer than they need to. If I had to guess, I would say that a majority of the time is spent in that first step, but I don't think the time spent in the second step is negligible.
>
> One fairly straightforward solution would be to allow the partition filter to be used during dataset discovery. This would yield a much smaller dataset, so step 2 would be much faster, but it could also allow the discovery process to skip entire directories. If anyone is interested in working on a fix for this I'd be happy to point them at the files that will need to be changed and go into a more detailed discussion of potential solutions.
>
> On Thu, Aug 4, 2022 at 5:53 AM Tomaz Maia Suller <tmsul...@stefanini.com> wrote:
>
> Hi David,
>
> I wonder if the problem with the attachments has to do with the files not having extensions... I'm trying to send them with .prof this time.
>
> Anyway:
>
> 1. I'm writing to a local filesystem; I've mounted an NTFS partition which is on an HDD. Since the dataset is only ~1.5 GB, I'll try to move it to the SSD I have available and see if I get lower access times.
> 2. I'm trying to use ParquetDataset, though I'm not using it directly most of the time; i.e. I'm using Pandas, which then itself uses (if I understood it correctly) ParquetDataset.
>
> I've tried accessing with both the legacy and new versions of the API, according to that use_legacy_dataset parameter. The legacy API is significantly faster, with an access time of about 1 second, though still ridiculously slow compared to accessing the path straight away.
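(Jumping in here: until a discovery-time filter exists, the "skip entire directories" idea above can be approximated in user space by walking the tree yourself and only handing the surviving files to the dataset API. A rough, untested sketch; it assumes hive-style paths and equality-only filters, prune_walk is a made-up helper, and recovering partition columns from an explicit file list may need extra care:)

import os

import pyarrow.dataset as ds
import pyarrow.fs as fs

def prune_walk(filesystem, base_dir, wanted):
    # Collect data files, skipping any hive-style directory whose
    # key=value segment contradicts `wanted` (a dict of required values).
    selected = []
    stack = [base_dir]
    while stack:
        current = stack.pop()
        for info in filesystem.get_file_info(fs.FileSelector(current)):
            if info.type == fs.FileType.Directory:
                name = os.path.basename(info.path)
                key, sep, value = name.partition('=')
                if sep and key in wanted and str(wanted[key]) != value:
                    continue  # prune this entire subtree
                stack.append(info.path)
            else:
                selected.append(info.path)
    return selected

files = prune_walk(fs.LocalFileSystem(), '/tmp/my_dataset',
                   {'state_code': 31, 'city_code': 6200})
my_dataset = ds.dataset(files, partitioning='hive')

With both keys pinned, this visits only the directories on the matching branch instead of listing all 150,120.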
> If the attachments still don't work for some reason, I'll write up what I ran:
>
> >>> pq_command_new = "pq.ParquetDataset('.', filters=[('state_code', '==', 31), ('city_code', '==', 6200)], use_legacy_dataset=False)"
> >>> pq_command_old = "pq.ParquetDataset('.', filters=[('state_code', '==', 31), ('city_code', '==', 6200)], use_legacy_dataset=True)"
> >>> pq_baseline = "pq.ParquetDataset('./state_code=31/city_code=6200')"
>
> >>> cProfile.run(pq_command_new, '/tmp/pq_legacy_false.prof')
> This took about 17 seconds.
>
> >>> cProfile.run(pq_command_old, '/tmp/pq_legacy_true.prof')
> This took about 1 second.
>
> >>> cProfile.run(pq_baseline, '/tmp/pq_legacy_true.prof')
> This took 0.0075 seconds.
>
> These runs all came after the first run after booting up the computer, which, as I said, took over 500 seconds.
>
> I'm starting to think I should send this to the development mailing list rather than the user one, since the obvious solution is specifying the paths directly rather than trying to use the API.
>
> ------------------------------
> *From:* Lee, David <david....@blackrock.com>
> *Sent:* Wednesday, August 3, 2022 19:49
> *To:* user@arrow.apache.org <user@arrow.apache.org>
> *Subject:* RE: Issue filtering partitioned Parquet files on partition keys using PyArrow
>
> The attachments didn't come through properly.
>
> I've got additional questions.
>
> 1. What filesystem are these files stored on? I've seen issues using S3 if HEAD operations aren't prioritized. I'm assuming that without HEAD operations you can't efficiently scan a Parquet file's footer, and reading the entire file isn't efficient.
>
> *available (eventual consistency for HEAD operations)* Behaves the same as the "read-after-new-write" consistency level, but only provides eventual consistency for HEAD operations. Offers higher availability for HEAD operations than "read-after-new-write" if Storage Nodes are unavailable. Differs from AWS S3 consistency guarantees for HEAD operations only.
>
> 2. Are you using pyarrow.parquet.ParquetDataset or pyarrow.dataset?
>
> https://arrow.apache.org/docs/python/parquet.html
>
> *Note*
>
> The ParquetDataset is being reimplemented based on the new generic Dataset API (see the Tabular Datasets <https://arrow.apache.org/docs/python/dataset.html#dataset> docs for an overview). This is not yet the default, but can already be enabled by passing the use_legacy_dataset=False keyword to *ParquetDataset* or *read_table()*:
>
> pq.ParquetDataset('dataset_name/', use_legacy_dataset=*False*)
>
> Enabling this gives the following new features:
>
> - Filtering on all columns (using row group statistics) instead of only on the partition keys.
> - More fine-grained partitioning: support for a directory partitioning scheme in addition to the Hive-like partitioning (e.g. "/2019/11/15/" instead of "/year=2019/month=11/day=15/"), and the ability to specify a schema for the partition keys.
> - General performance improvements and bug fixes.
> It also has the following changes in behaviour:
>
> - The partition keys need to be explicitly included in the columns keyword when you want to include them in the result while reading a subset of the columns.
>
> This new implementation is already enabled in read_table, and in the future, this will be turned on by default for ParquetDataset. The new implementation does not yet cover all existing ParquetDataset features (e.g. specifying the metadata, or the pieces property API). Feedback is very welcome.
>
> *From:* Tomaz Maia Suller <tmsul...@stefanini.com>
> *Sent:* Wednesday, August 3, 2022 2:54 PM
> *To:* user@arrow.apache.org
> *Subject:* Issue filtering partitioned Parquet files on partition keys using PyArrow
>
> Hi,
>
> I'm trying to load a dataset I created consisting of Parquet files partitioned on two columns, but reading from a single partition takes over 10 minutes on the first try and still over 15 seconds on any subsequent one, while specifying the path to the partition directly takes 50 milliseconds. Am I doing something wrong?
>
> The data is arranged in the following way:
>
> $ tree -d
> .
> ├── state_code=11
> │   ├── city_code=1005
> │   ├── city_code=106
> │   ├── city_code=1104
> │   ├── city_code=114
> │   ├── city_code=1203
> │   ├── city_code=122
> │   ├── city_code=130
> │   ├── city_code=1302
> ...
>
> There are 27 state codes and 5560 city codes, so 27 "first level" partitions and 5560 "second level" partitions in total. Each partition often contains only a few kBs' worth of Parquet files and none is greater than ~5 MB. These files were written using PySpark and I have full control over how they're generated, in case you think there's a better way to arrange them. I chose this partitioning since I wish to analyse one city at a time; I have also experimented with a single-level partitioning with 5560 partitions, but didn't see any increase in performance.
>
> I'm using Pandas to read the files, and have tried using PyArrow directly as well. Regardless, I've profiled the reading of a single partition using cProfile, and the results clearly show PyArrow is taking the longest to run. I've attached the results of two runs I did using IPython: one right after rebooting my computer, which took well over 500 seconds, and one executed right after that, which took about 15 seconds, with the following command:
>
> >>> command = "pd.read_parquet('.', engine='pyarrow', filters=[('state_code', '==', 31), ('city_code', '==', 6200)])"
> >>> cProfile.run(command, '/path/to/stats')
>
> It was much better the second time around but still terrible compared to specifying the path manually, which took around 50 milliseconds according to %timeit.
>
> I have absolutely no idea why the filesystem scan is taking so long. I have seen the issue https://issues.apache.org/jira/browse/ARROW-11781, which relates to the same problem, but it mentions there should be no performance issues as of July 2021, whereas I'm having problems right now.
> I think I'll stick to specifying the partitions to read "by hand", but I was really curious about whether I messed something up, or if (Py)Arrow really is inefficient at a task which seems so trivial at first sight.
>
> Thanks,
> Tomaz.
>
> P.S.: It's my first time sending an email to a mailing list, so I hope sending attachments is okay, and sorry if it isn't.
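For what it's worth, the "specify the paths by hand" workaround can keep the partition columns if you point the factory at a subdirectory while telling it where the dataset root is. A small, untested sketch (assuming the example layout lives under /tmp/my_dataset, as in the examples above):

import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.fs as fs

schema = pa.schema([pa.field('state_code', pa.int32()),
                    pa.field('city_code', pa.int32())])
opts = ds.FileSystemFactoryOptions(
    partitioning=ds.HivePartitioning(schema),
    # Paths are stripped of this prefix before the partition keys are
    # parsed, so state_code/city_code should still come out of the path.
    partition_base_dir='/tmp/my_dataset')
selector = fs.FileSelector('/tmp/my_dataset/state_code=31/city_code=6200',
                           recursive=True)
factory = ds.FileSystemDatasetFactory(fs.LocalFileSystem(), selector,
                                      ds.ParquetFileFormat(), opts)
my_dataset = factory.finish()
print(my_dataset.to_table().column_names)  # should include state_code, city_code

Since this only lists the one directory, it should be in the same ballpark as the ~50 ms direct read while still behaving like a filtered dataset.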