Oh!  I didn't know that.  Thanks.  I think this suggests even more that the
problem is with the discovery taking too long and not with the filter being
applied, but I suppose we should benchmark both at some point.

On Thu, Aug 4, 2022 at 5:01 PM David Li <lidav...@apache.org> wrote:

> FWIW, we _should_ already perform the "subtree" filtering (see
> subtree_internal.h [1]) so either it's not the bottleneck or the
> optimization is not as effective as we would like. Or possibly we need to
> maintain the files as the tree in the first place instead of trying to
> recover the structure later [2].
>
> [1]:
> https://github.com/apache/arrow/blob/master/cpp/src/arrow/compute/exec/subtree_internal.h
> [2]:
> https://github.com/apache/arrow/blob/60a1919527003a55a39785633bdbbbeef412c362/cpp/src/arrow/dataset/file_base.cc#L207-L268
>
> On Thu, Aug 4, 2022, at 17:30, Weston Pace wrote:
>
> Awesome.
>
> # Partitioning (src/arrow/dataset/partition.h)
>
> The first spot to look at might be to understand the Partitioning class.
> A Partitioning (e.g. hive partitioning, directory partitioning, filename
> partitioning) has two main methods that convert between a path (e.g.
> "/state_code=11/city_code=106/chunk-0.parquet") and an expression (e.g.
> field_ref("state_code") == 11 && field_ref("city_code") == 106).
>
> virtual Result<compute::Expression> Parse(const std::string& path) const = 0;
> virtual Result<PartitionPathFormat> Format(const compute::Expression& expr) const = 0;
>
> We use expressions instead of something simpler like a dictionary of
> key/value pairs.  I believe the intention was to leave the door open for
> unique partitionings that might map something like "temp=medium" to an
> expression like "30 < field_ref("temp") < 60", but in practice these
> expressions are always a collection of equality expressions AND'd together.
>
> One thing I'm fairly certain should work, but we might want to verify (and
> potentially add unit tests), is that hive & directory partitioning can
> correctly convert directories (e.g. "/state_code=11") into expressions and
> that we aren't relying on full paths.
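>
> For example, a quick check along these lines (untested; assuming the
> Partitioning.parse binding behaves like its C++ counterpart) could be the
> seed of such a unit test:
>
> import pyarrow as pa
> import pyarrow.dataset as ds
>
> partitioning = ds.HivePartitioning(pa.schema([pa.field('state_code', pa.int32()),
>                                               pa.field('city_code', pa.int32())]))
> # A full partition directory: should parse to state_code == 11 and city_code == 106
> print(partitioning.parse('state_code=11/city_code=106'))
> # A single directory level: should parse to just state_code == 11
> print(partitioning.parse('state_code=11'))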
>
> # Discovery (src/arrow/dataset/discovery.h)
>
> The next class is DatasetFactory and, specifically, we are probably
> interested in FileSystemDatasetFactory.  The FileSystemDatasetFactory class
> scans a directory, discovering files.  The FileSystemDatasetFactory can
> also discover a partitioning.  This is all usually transparent in pyarrow
> but there are bindings and you can use this class explicitly:
>
> import pyarrow.dataset as ds
> import pyarrow.fs as fs
>
> local_fs = fs.LocalFileSystem()
> format = ds.ParquetFileFormat()
> opts = ds.FileSystemFactoryOptions(partitioning=ds.HivePartitioning.discover())
> factory = ds.FileSystemDatasetFactory(local_fs,
>                                       fs.FileSelector('/tmp/my_dataset', recursive=True),
>                                       format, opts)
> sch = factory.inspect()
> my_dataset = factory.finish(sch)
> print(my_dataset.partitioning.schema)
> # state_code: int32
> # city_code: int32
>
> Although, in practice, we usually would write something much shorter:
>
> import pyarrow.dataset as ds
>
> # This one line will expand to everything above
> my_dataset = ds.dataset('/tmp/my_dataset', partitioning='hive')
>
> If, however, we want to apply a filter while we are discovering the
> dataset, then we cannot rely on dataset discovery to also discover our
> partitioning.  We will have to specify it manually (or you could run dataset
> discovery once, discover the partitioning, and then run it many times using
> that discovered partitioning).  So I think, in the end, we want to be able to
> support something like this:
>
> import pyarrow as pa
> import pyarrow.dataset as ds
>
> partitioning = ds.HivePartitioning(pa.schema([pa.field('state_code', pa.int32()),
>                                               pa.field('city_code', pa.int32())]))
> filter = (ds.field('state_code') == 31) & (ds.field('city_code') == 6200)
> my_dataset = ds.dataset('/tmp/my_dataset', partitioning=partitioning, filter=filter)
>
> This will require making changes to FileSystemDatasetFactory.  We will
> want to add "filter" to FileSystemFactoryOptions...
>
> struct FileSystemFactoryOptions {
>   PartitioningOrFactory partitioning{Partitioning::Default()};
>   std::string partition_base_dir;
>   bool exclude_invalid_files = false;
>   std::vector<std::string> selector_ignore_prefixes = {".", "_",};
>   // `compute::literal(true)` is often used as a "default" value for a filter
>   compute::Expression filter = compute::literal(true);
> };
>
> * If a filter is specified (i.e. is not literal(true)) then the partitioning
> MUST be a concrete Partitioning (and not a PartitioningFactory); a small
> sketch of that check follows.
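>
> A minimal sketch of that check (in Python, with hypothetical names; the real
> check would live in the C++ factory) might look like:
>
> import pyarrow.dataset as ds
>
> def validate_factory_options(partitioning, filter=None):
>     # If a non-trivial filter was supplied we need a concrete Partitioning,
>     # because a PartitioningFactory only learns its schema during discovery,
>     # which is exactly the work the filter is meant to cut short.
>     if filter is not None and isinstance(partitioning, ds.PartitioningFactory):
>         raise TypeError('a filter requires an explicit Partitioning, '
>                         'not a PartitioningFactory')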
>
> Then we need to modify the discovery process itself to only list files
> that match the filter.  Unfortunately, that might end up being a little
> tricky.  The code right now is simply...
>
> ARROW_ASSIGN_OR_RAISE(auto files, filesystem->GetFileInfo(selector));
>
> So FileSystemDatasetFactory is not actually doing the recursive directory
> walking logic.  Instead it is just asking for all files matching a
> selector.  Maybe GetFileInfo could be extended to take an optional
> predicate which will be applied to directories before entering them.
> Although this might not be straightforward for remote filesystem
> implementations (e.g. S3, GCS) that don't actually "walk" a directory but
> instead do simple prefix matching.  Another option could be to extend
> Partitioning so that it could take in an expression and return a prefix.
> GetFileInfo could then be extended to only return files that match the
> given prefix.
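>
> Just to make the pruning idea concrete, here is a rough user-space sketch in
> Python (the helper name and the dict-based filter are made up for
> illustration; the real change would live in the C++ discovery/filesystem
> layer and use Expressions):
>
> import pyarrow.fs as fs
>
> def prune_walk(filesystem, base_dir, wanted):
>     """Walk one directory level at a time, skipping any directory whose
>     key=value segment contradicts `wanted` (a plain dict of partition
>     values)."""
>     selected = []
>     stack = [base_dir]
>     while stack:
>         directory = stack.pop()
>         for info in filesystem.get_file_info(fs.FileSelector(directory)):
>             if info.type == fs.FileType.Directory:
>                 key, _, value = info.base_name.partition('=')
>                 if key in wanted and value != str(wanted[key]):
>                     continue  # prune this whole subtree
>                 stack.append(info.path)
>             else:
>                 selected.append(info.path)
>     return selected
>
> # e.g. prune_walk(fs.LocalFileSystem(), '/tmp/my_dataset',
> #                 {'state_code': 31, 'city_code': 6200})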
>
> Finally, you'd want to create some kind of benchmark and/or unit tests to
> justify the change and prevent regression.
>
> A simple python benchmark (using today's code without a filter) could be:
>
> import os
> import tempfile
> import time
>
> import pyarrow as pa
> import pyarrow.dataset as ds
> import pyarrow.parquet as pq
>
> simple_table = pa.Table.from_pydict({'x': [1, 2, 3]})
>
> with tempfile.TemporaryDirectory() as tempdir:
>     for state_code in range(100):
>         for city_code in range(100):
>             pq_dir = os.path.join(tempdir, f'state_code={state_code}',
>                                   f'city_code={city_code}')
>             os.makedirs(pq_dir)
>             pq_path = os.path.join(pq_dir, 'chunk-0.parquet')
>             pq.write_table(simple_table, pq_path)
>
>     start = time.time()
>     partitioning = ds.HivePartitioning(pa.schema([pa.field('state_code', pa.int32()),
>                                                   pa.field('city_code', pa.int32())]))
>     my_dataset = ds.dataset(tempdir, partitioning=partitioning)
>     end = time.time()
>     print(f'Elapsed: {end - start}s')
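>
> To also get a rough number for how long the filter application itself takes
> (as opposed to discovery), the same script could be extended like this,
> assuming Dataset.get_fragments accepts a filter expression (values chosen to
> exist in the generated data):
>
>     # Continuing inside the same TemporaryDirectory block
>     flt = (ds.field('state_code') == 31) & (ds.field('city_code') == 62)
>     start = time.time()
>     fragments = list(my_dataset.get_fragments(filter=flt))
>     end = time.time()
>     print(f'Fragments matching filter: {len(fragments)}')
>     print(f'Filter elapsed: {end - start}s')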
>
> # Approach 2, faster filtering
>
> There is an entirely different approach that could be taken which wouldn't
> speed up the discovery at all, but should speed up the filtering.  In this
> approach you could modify FileSystemDataset so that, instead of storing a
> flat list of FileFragment objects, it stored a tree of FileFragment
> objects.  Each node of the tree would have its own partitioning
> expression.  Then, GetFragments(predicate) could walk the tree (in DFS
> order), skipping entire nodes that fail the predicate.
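>
> A toy sketch of that idea (plain Python with made-up names, using dicts of
> partition values instead of Expressions, just to show the DFS pruning):
>
> from dataclasses import dataclass, field
> from typing import Dict, List
>
> @dataclass
> class FragmentNode:
>     # Accumulated partition values for this directory level; leaves also
>     # carry the file paths that live directly under it.
>     partition: Dict[str, int] = field(default_factory=dict)
>     children: List['FragmentNode'] = field(default_factory=list)
>     files: List[str] = field(default_factory=list)
>
> def get_fragments(node, wanted):
>     # Skip the whole subtree as soon as a known partition value contradicts
>     # the requested values; unknown keys are treated as "could still match".
>     if any(k in node.partition and node.partition[k] != v
>            for k, v in wanted.items()):
>         return
>     yield from node.files
>     for child in node.children:
>         yield from get_fragments(child, wanted)
>
> # tree = FragmentNode(children=[
> #     FragmentNode({'state_code': 11}, children=[
> #         FragmentNode({'state_code': 11, 'city_code': 106},
> #                      files=['state_code=11/city_code=106/chunk-0.parquet'])]),
> #     FragmentNode({'state_code': 12}, children=[])])
> # list(get_fragments(tree, {'state_code': 11}))  # never visits state_code=12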
>
> On Thu, Aug 4, 2022 at 9:54 AM Tomaz Maia Suller <tmsul...@stefanini.com>
> wrote:
>
> Weston, I'm interested in following up.
>
>
>
> ------------------------------
>
> *From:* Weston Pace <weston.p...@gmail.com>
> *Sent:* Thursday, August 4, 2022 12:15
> *To:* user@arrow.apache.org <user@arrow.apache.org>
> *Subject:* Re: Issue filtering partitioned Parquet files on partition
> keys using PyArrow
>
>
> There is a lot of room for improvement here.  In the datasets API the call
> that you have described (read_parquet) is broken into two steps:
>
>  * dataset discovery
>
> During dataset discovery we don't use any partition filter.  The goal is
> to create the "total dataset" of all the files.  So in your case this means
> listing out all 150,120 directories.  For every file we discover we capture
> a partition expression for this file.  This is probably where the bulk of
> time is being spent (listing the directories).
>
>  * dataset read
>
> During the dataset read we apply the partition filter.  So we are going to
> iterate through all ~150k files and compare the filter expression with the
> previously captured partition expression, eliminating files that don't
> match.  In this phase we don't have any idea of the original directory
> structure.  So instead of performing 27 top-level comparisons + 5560
> second-level comparisons we end up having to calculate all 150k comparisons.
>
> Both of these steps are considerably longer than they need to be.  If I
> were to guess I would guess that a majority of the time is spent in that
> first step but I don't think the time spent in that second step is
> negligible.
>
> One fairly straightforward solution would be to allow the partition filter
> to be used during dataset discovery.  This would yield a much smaller
> dataset so step 2 would be much faster but it could also allow the
> discovery process to skip entire directories.  If anyone is interested in
> working on a fix for this I'd be happy to point them at the files that will
> need to be changed and go into a more detailed discussion of potential
> solutions.
>
>
> On Thu, Aug 4, 2022 at 5:53 AM Tomaz Maia Suller <tmsul...@stefanini.com>
> wrote:
>
> Hi David,
>
> I wonder if the problem with the attachments has to do with the files not
> having extensions... I'm trying to send them with .prof this time.
>
> Anyway:
>
>    1. I'm writing to a local filesystem; I've mounted an NTFS partition
>    which is on an HDD. Since the dataset is only ~1.5 GB, I'll try to move it
>    to the SSD I have available and see if I get lower access times.
>    2. I'm trying to use ParquetDataset, though not directly most of the
>    time, i.e. I'm using Pandas which then itself uses (if I understood it
>    correctly) ParquetDataset.
>
> I've tried accessing with both the legacy and new versions of the API,
> according to that use_legacy_dataset parameter. The legacy API is
> significantly faster, with an access time of about 1 second, though still
> ridiculously slow compared to accessing the path straight away.
>
> If the attachments still don't work for some reason, I'll write up what I
> ran:
>
>
> >>> pq_command_new = "pq.ParquetDataset('.', filters=[('state_code', '==', 31), ('city_code', '==', 6200)], use_legacy_dataset=False)"
> >>> pq_command_old = "pq.ParquetDataset('.', filters=[('state_code', '==', 31), ('city_code', '==', 6200)], use_legacy_dataset=True)"
> >>> pq_baseline = "pq.ParquetDataset('./state_code=31/city_code=6200')"
>
> >>> cProfile.run(pq_command_new, '/tmp/pq_legacy_false.prof')
> This took about 17 seconds.
>
> >>> cProfile.run(pq_command_old, '/tmp/pq_legacy_true.prof')
> This took about 1 second.
>
> >>> cProfile.run(pq_baseline, '/tmp/pq_legacy_true.prof')
> This took 0.0075 seconds.
>
> These runs all came after the first run after booting up the computer,
> which took over 500 seconds as I've said.
>
> I'm starting to think I should send this to the development mailing list
> rather than the user one, since the obvious solution is specifying the
> paths directly rather than trying to use the API.
>
> ------------------------------
>
> *From:* Lee, David <david....@blackrock.com>
> *Sent:* Wednesday, August 3, 2022 19:49
> *To:* user@arrow.apache.org <user@arrow.apache.org>
> *Subject:* RE: Issue filtering partitioned Parquet files on partition
> keys using PyArrow
>
>
>
> The attachments didn’t come through properly..
>
>
>
> I’ve got additional questions.
>
>
>
>    1. What filesystem are these files stored on? I’ve seen issues using
>    S3 if HEAD operations aren’t prioritized. I’m assuming that without HEAD
>    operations you can’t effectively scan a parquet file’s footer and reading
>    the entire file isn’t efficient.
>
>
>
> *available (eventual consistency for HEAD operations)* Behaves the same
> as the “read-after-new-write” consistency level, but only provides eventual
> consistency for HEAD operations. Offers higher availability for HEAD
> operations than “read-after-new-write” if Storage Nodes are unavailable.
> Differs from AWS S3 consistency guarantees for HEAD operations only.
>
>
> 2.    Are you using pyarrow.parquet.ParquetDataset or pyarrow.dataset?
>
> https://arrow.apache.org/docs/python/parquet.html
>
> *Note*
>
> The ParquetDataset is being reimplemented based on the new generic Dataset
> API (see the Tabular Datasets
> <https://arrow.apache.org/docs/python/dataset.html#dataset> docs for an
> overview). This is not yet the default, but can already be enabled by
> passing the use_legacy_dataset=False keyword to *ParquetDataset* or
> *read_table()*:
>
> pq.ParquetDataset('dataset_name/', use_legacy_dataset=False)
>
> Enabling this gives the following new features:
>
>    - Filtering on all columns (using row group statistics) instead of
>    only on the partition keys.
>    - More fine-grained partitioning: support for a directory partitioning
>    scheme in addition to the Hive-like partitioning (e.g. “/2019/11/15/”
>    instead of “/year=2019/month=11/day=15/”), and the ability to specify a
>    schema for the partition keys.
>    - General performance improvement and bug fixes.
>
> It also has the following changes in behaviour:
>
>    - The partition keys need to be explicitly included in the columns keyword
>    when you want to include them in the result while reading a subset of the
>    columns
>
> This new implementation is already enabled in read_table, and in the
> future, this will be turned on by default for ParquetDataset. The new
> implementation does not yet cover all existing ParquetDataset features
> (e.g. specifying the metadata, or the pieces property API). Feedback is
> very welcome.
>
>
>
>
>
> *From:* Tomaz Maia Suller <tmsul...@stefanini.com>
> *Sent:* Wednesday, August 3, 2022 2:54 PM
> *To:* user@arrow.apache.org
> *Subject:* Issue filtering partitioned Parquet files on partition keys
> using PyArrow
>
>
> Hi,
>
>
>
> I'm trying to load a dataset I created consisting of Parquet files
> partitioned on two columns, but reading from a single partition takes over
> 10 minutes on the first try and still over 15 seconds on any subsequent one,
> while specifying the path to the partition directly takes 50 milliseconds.
> Am I doing something wrong?
>
>
>
> The data is arranged in the following way:
>
> $ tree -d
> .
> ├── state_code=11
> │   ├── city_code=1005
> │   ├── city_code=106
> │   ├── city_code=1104
> │   ├── city_code=114
> │   ├── city_code=1203
> │   ├── city_code=122
> │   ├── city_code=130
> │   ├── city_code=1302
> ...
>
>
>
> There are 27 state codes and 5560 city codes, so 27 "first level"
> partitions and 5560 "second level" partitions in total. Each partition
> often contains only a few kBs worth of Parquet files and none is greater
> than ~5 MB. These files were written using PySpark and I have full control
> of how they're generated, in case you think there's a better way to arrange
> them. I chose this partitioning since I wish to analyse one city at a time;
> I have also experimented with having only a single level of partitioning
> with 5560 partitions, but didn't see any increase in performance.
>
>
>
> I'm using Pandas to read the files, and have tried using PyArrow directly
> as well. Regardless, I've profiled the reading of a single partition using
> cProfile, and the results clearly show PyArrow is taking the longest to
> run. I've attached the results of two runs I did using IPython: one right
> after rebooting my computer, which took well over 500 seconds; and one
> executed right after that, which took about 15 seconds, with the following
> command:
>
>
>
> >>> command = "pd.read_parquet('.', engine='pyarrow', filters=[('state_code', '==', 31), ('city_code', '==', 6200)])"
>
> >>> cProfile.run(command, '/path/to/stats')
>
>
>
> It was much better the second time around but still terrible compared to
> specifying the path manually, which took around 50 milliseconds according
> to %timeit.
>
>
>
> I have absolutely no idea why the filesystem scan is taking so long. I
> have seen this issue https://issues.apache.org/jira/browse/ARROW-11781
> related to the same problem, but it mentions there should be no performance
> issue as of July 2021, whereas I'm having problems right now.
>
>
>
> I think I'll stick to specifying the partitions to read "by hand", but I
> was really curious about whether I messed something up, or if (Py)Arrow
> really is inefficient in a task which seems so trivial at first sight.
>
>
>
> Thanks,
>
> Tomaz.
>
>
>
> P.S.: It's my first time sending an email to a mailing list, so I hope
> sending attachments is okay, and sorry if it isn't.