>
> The ability to read a single file in parallel is not going to be important
> here (each file is very small).  However, you will want to make sure it
> is reading multiple files at once.  I would expect that it is doing so but
> this would be a good thing to verify if you can.
>
> One quick test you can always try is to run your script twice, at the same
> time.  If the total runtime is significantly faster than running the
> script twice, one after the other, then you can confirm that there are
> unexploited resources on the system.
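
A throwaway way to run that comparison from Python (the script name below is
just a placeholder for whatever you are benchmarking):

import subprocess
import time

# Two runs at the same time: if this is much faster than two sequential runs,
# a single run is leaving network/CPU resources idle.
t0 = time.time()
procs = [subprocess.Popen(["python", "my_script.py"]) for _ in range(2)]
for p in procs:
    p.wait()
print("two at once:", time.time() - t0)

# Two runs one after the other, for comparison.
t0 = time.time()
for _ in range(2):
    subprocess.run(["python", "my_script.py"])
print("one after the other:", time.time() - t0)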
>
>    1. This was my concern, indeed. I can run the tests as you mentioned
>    to assess the use of resources and see if they are downloaded in parallel.
>    2. However, I am also interested in the correct use of the pyarrow
>    library to verify the concurrency of multiple IO operations. Does pyarrow
>    implement this logic, or is it derived from the fsspec integration? I’m
>    asking this to know whether I should ask here or the fsspec developers
>    instead.
>
Pyarrow should apply some parallelism itself.  If there are multiple files
in the dataset it may read multiple files at once.  For each file, if there
are multiple row groups, it may read multiple row groups at once.  For each
row group, if you are fetching multiple columns, it may fetch individual
columns in parallel.  Pyarrow does not expect the filesystem to provide any
parallelism on its own and would not expect any fsspec methods to be
internally parallel (though it does not prevent this).
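
As a rough way to check or tune that parallelism (the numbers below are
arbitrary examples, not recommendations):

import pyarrow as pa

# The IO thread pool bounds how many reads (e.g. small remote files) are in
# flight at once; the CPU pool bounds decode parallelism.
print(pa.io_thread_count(), pa.cpu_count())

# Raising the IO pool can help workloads dominated by many small remote reads.
pa.set_io_thread_count(32)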

> It also looks like your data is partitioned by time and your filters are
> time based filters.  You might want to investigate dataset partitioning
> as that should be able to help.
>
>    1. My data are time-series information where the timestamp of the
>    value is used as indexing. The columns are different time-series related to
>    the same asset (the blob file) and they are grouped with a granularity of 1
>    hour now.
>
> Do you believe partitioning with such small files might actually be
> beneficial? I am worried that the overhead of the partitioning would be
> higher than the pros, but it is just my personal feeling about it.
>

>    1. My other question is if I can do something by tweaking pyarrow
>    arguments or options for my use case of small files without the refactor of
>    point 1).
>
>
I agree, partitioning the files even smaller is probably not a good idea.
In fact, you may even want to partition by day instead of by hour (feel
free to experiment and benchmark).  Your sample query targets a 30-minute
window, so I wanted to ensure you are only accessing a single file (since
only a single file should correspond to that 30-minute window, if I
understand correctly).

In your example you are creating a dataset with (30 * 24) files in it.
However, you are not providing any partitioning information to pyarrow
(pyarrow does not know that part of the filename is the date).  This means
that pyarrow is reading and searching all 720 files when it should only
need to search a single file
(az://{CONTAINER}/2023/11/08/08/file.parquet).  By providing partitioning
information to pyarrow you should be able to get a considerable speedup.

Take a look at
https://arrow.apache.org/docs/python/dataset.html#reading-partitioned-data
for more explanation.
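
As a minimal, untested sketch of that idea (reusing the CONTAINER, fs and
COLUMNS names from your snippet; the partition field names and types are
assumptions about your path layout):

import pandas as pd
import pyarrow as pa
import pyarrow.dataset as ds

# Directory (non-hive) partitioning describing the year/month/day/hour layout.
partitioning = ds.partitioning(
    pa.schema([
        ("year", pa.int16()),
        ("month", pa.int8()),
        ("day", pa.int8()),
        ("hour", pa.int8()),
    ])
)

dataset = ds.dataset(
    f"az://{CONTAINER}",      # base directory of the partitioned tree
    format="parquet",
    filesystem=fs,            # the adlfs filesystem from your snippet
    partitioning=partitioning,
)

# Filters on partition fields prune whole directories; only the surviving
# file(s) are opened, and the timestamp filter is applied to their rows.
table = dataset.to_table(
    columns=COLUMNS,
    filter=(
        (ds.field("year") == 2023) & (ds.field("month") == 11)
        & (ds.field("day") == 8) & (ds.field("hour") == 8)
        & (ds.field("my_index") >= pd.Timestamp("2023-11-08 08:00:00"))
        & (ds.field("my_index") <= pd.Timestamp("2023-11-08 08:30:00"))
    ),
)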

On Mon, Dec 4, 2023 at 7:53 AM Jacek Pliszka <[email protected]>
wrote:

>
>
> On Mon, 4 Dec 2023 at 14:41, Luca Maurelli <[email protected]> wrote:
>
>> Thank you @Jacek Pliszka <[email protected]> for your feedback!
>> Below my answers:
>>
>>
>>
>> *From:* Jacek Pliszka <[email protected]>
>> *Sent:* Friday, 1 December 2023 17:35
>> *To:* [email protected]
>> *Subject:* Re: Usage of Azure filesystem with fsspec and adlfs and
>> pyarrow to download a list of blobs (parquets) concurrently with columns
>> pruning and rows filtering
>>
>>
>>
>> Hi!
>>
>>
>>
>> These files seem to be below 4 MB, which is the default Azure block size.
>> Possibly they are all read in full. Does someone know whether, in the
>> approach below, the blob is read from Azure only once even if multiple
>> reads are issued?
>>
>> Yes, they are below 4 MB each. I don’t know the answer or its
>> implications, so I can’t directly answer your question, sadly. If someone
>> doesn’t mind elaborating on it, I would be interested.
>>
>>
>>
>> Is there correlation between "my_index" and the filenames you could
>> possibly use to avoid reading all of them?
>>
>> The index is a timestamp with type datetime64[ns] (from
>> numpy/pandas/pyarrow AFAIK). The filenames are generated with the pattern
>> “year/month/day/hour/asset_name”, i.e. with a time granularity of 1 hour.
>> Right now we generate the list of blobs to download ourselves, handling the
>> logic of selecting only the blobs that could contain data we are interested
>> in (so at the 1-hour granularity). The pyarrow filters, or post-download
>> filtering, are only needed when we require a granularity finer than 1 hour
>> or ranges within it.
>>
>
> I am not sure you do - the filters, the way you wrote them, are applied
> after the file is downloaded, so you download files for all hours but read
> only the 8:00-8:30 window - you could download only the files for the 8th
> hour. If half an hour is typical for you, this would mean 24 times fewer
> files to read (see the sketch below).
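
As an illustration of that point (variable names reuse the earlier snippet,
the window is the one from the example query), the blob list can be derived
from the query window itself:

import pandas as pd

# Only the hours that can contain matching rows are listed for download.
start = pd.Timestamp("2023-11-08 08:00:00")
end = pd.Timestamp("2023-11-08 08:30:00")

FILEPATHS = [
    f"az://{CONTAINER}/{ts:%Y/%m/%d/%H}/file.parquet"
    for ts in pd.date_range(start.floor("h"), end, freq="h")
]
# -> ["az://raw/2023/11/08/08/file.parquet"]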
>
>
>> Another suggestion would be merging the files - either yourself or using
>> something like Delta Tables and vacuum/compact methods.
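
For the "merge them yourself" option, a rough sketch with plain pyarrow
(paths and names reuse the earlier snippet; one compacted file per day is an
arbitrary choice):

import pyarrow as pa
import pyarrow.parquet as pq

# Compact one day's 24 hourly blobs into a single parquet file.
day_paths = [
    f"az://{CONTAINER}/2023/11/08/{hour:02d}/file.parquet" for hour in range(24)
]
tables = [pq.read_table(p, filesystem=fs) for p in day_paths]
merged = pa.concat_tables(tables).combine_chunks()
pq.write_table(
    merged, f"az://{CONTAINER}/compacted/2023/11/08/file.parquet", filesystem=fs
)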
>>
>> We will assess this type of refactoring in the future, I believe. Can I
>> do something with the current state of having many small files?
>>
>>
> Others may give you better answers as I am not up to date but some time
> ago I used Azure SDK directly and downloaded small files to memory buffer:
>
> # container is an azure.storage.blob.ContainerClient
> content = container.download_blob(path).readall()
> buffer = pa.BufferReader(content)
> pyarrow.parquet.read_table(buffer, columns=columns, filters=...)
> Then pa.concat_tables and combine_chunks
>
> I know it defeats the purpose of using adlfs and you have to parallelize
> it yourself but it worked well for me.
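
A minimal sketch of that approach with a thread pool (connection string,
container, columns and filters are placeholders reusing the names from the
original snippet; note that download_blob takes container-relative paths, not
az:// URLs):

import pyarrow as pa
import pyarrow.parquet as pq
from concurrent.futures import ThreadPoolExecutor
from azure.storage.blob import ContainerClient

container = ContainerClient.from_connection_string(CONNECTION_STRING, CONTAINER)
BLOB_PATHS = [f"2023/11/08/{hour:02d}/file.parquet" for hour in range(24)]

def read_one(path):
    # Download the whole (small) blob into memory, then parse it with pyarrow.
    content = container.download_blob(path).readall()
    return pq.read_table(pa.BufferReader(content), columns=COLUMNS, filters=FILTERS)

with ThreadPoolExecutor(max_workers=16) as pool:
    tables = list(pool.map(read_one, BLOB_PATHS))

table = pa.concat_tables(tables).combine_chunks()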
>
> BR,
>
> Jacek
>
>
>>
>> On Fri, 1 Dec 2023 at 15:46, Weston Pace <[email protected]> wrote:
>>
>> Those files are quite small.  For every single file pyarrow is going to
>> need to read the metadata, determine which columns to read (column
>> filtering), determine if any of the rows need to be read (using row
>> filtering) and then actually issue the read.  If you combined all those
>> files into one file then I would expect better performance.  The ability to
>> read a single file in parallel is not going to be important here (each file
>> is very small).  However, you will want to make sure it is reading multiple
>> files at once.  I would expect that it is doing so but this would be a good
>> thing to verify if you can.
>>
>>
>>
>> One quick test you can always try is to run your script twice, at the
>> same time.  If the total runtime is significantly faster than running the
>> script twice, one after the other, then you can confirm that there are
>> unexploited resources on the system.
>>
>>
>>
>> It also looks like your data is partitioned by time and your filters are
>> time based filters.  You might want to investigate dataset partitioning as
>> that should be able to help.
>>
>>
>>
>> On Fri, Dec 1, 2023 at 6:32 AM Luca Maurelli <[email protected]>
>> wrote:
>>
>> I’m new to these libraries so bear with me, I am learning a lot these
>> days.
>>
>>
>>
>> I started using fsspec and adlfs with the idea of switching between cloud
>> storage and local storage with little effort. I read that adlfs makes use
>> of the Azure Blob Storage Python SDK, which supports the async/await
>> pattern to implement concurrent IO.
>>
>> The Python SDK also exposes the max_concurrency argument in the
>> download_blob function, for instance, to enable downloading a single blob
>> with a thread pool (note: a single blob; I believe the use case is that,
>> if the blob is very big, you can split its download in parallel with this
>> argument).
>>
>>
>>
>> Now I wish to use adlfs with pyarrow/pandas to download a list of blobs
>> (parquet) by exploiting the async methods of the Python SDK. Not knowing
>> the libraries and their integration, I hope this is already taken care of,
>> so I tried to code the following snippet:
>>
>>
>>
>> import pandas as pd
>>
>> import pyarrow.parquet as pq
>>
>> import adlfs
>>
>> import time
>>
>> CONNECTION_STRING = "my_connection_string"
>>
>> CONTAINER = "raw"
>>
>> FILEPATHS = [
>>
>>     f"az://{CONTAINER}/2023/11/{str(day).zfill(2)}/{str(hour).zfill(2)}
>> /file.parquet"
>>
>>     for day in range(1, 31)
>>
>>     for hour in range(24)
>>
>> ]
>>
>> fs = adlfs.AzureBlobFileSystem(connection_string=CONNECTION_STRING)
>>
>> FILTERS = [
>>
>>     [
>>
>>         ("my_index", ">=", pd.Timestamp("2023-11-08 08:00:00")),
>>
>>         ("my_index", "<=", pd.Timestamp("2023-11-08 08:30:00")),
>>
>>     ]
>>
>> ]
>>
>> COLUMNS = ["col1", "col2", "col3"]
>>
>> start_time = time.time()
>>
>> dataset = pq.ParquetDataset(
>>
>>     path_or_paths=FILEPATHS,
>>
>>     filters=FILTERS,
>>
>>     filesystem=fs,
>>
>> )
>>
>> elapsed_time = time.time() - start_time
>>
>> print(f"Elapsed time for ParquetDataset: {elapsed_time:.6f} seconds")
>>
>> start_time = time.time()
>>
>> df = dataset.read_pandas(
>>
>>     columns=COLUMNS
>>
>> ).to_pandas()
>>
>> elapsed_time = time.time() - start_time
>>
>> print(f"Elapsed time for read_pandas: {elapsed_time:.6f} seconds")
>>
>> Each blob has around 3600 rows and 95 columns. It tries to download 720
>> blobs in total. The final dataframe is 236404 rows x 95 columns with no
>> columns/rows filtering.
>>
>> If I enforce the columns pruning, it has 236404 rows x 3 columns (CASE
>> 1). If I also enforce the rows filtering, it has 1544 rows x 95 columns
>> (CASE 2).
>>
>>
>>
>> The timing of the cases is as follows:
>>
>>    1. # Elapsed time for ParquetDataset: 0.886232 seconds
>>       # Elapsed time for read_pandas: 146.798920 seconds
>>
>>    2. # Elapsed time for ParquetDataset: 0.298594 seconds
>>       # Elapsed time for read_pandas: 203.801083 seconds
>>
>>
>>
>> I was expecting case 1 to be faster, since from the timestamp filter only
>> the first blob should actually be downloaded and read (AFAIK Parquet is
>> smart and makes use of schema/metadata for row/column filtering).
>>
>> I was also expecting case 2 to be faster in general: this is just a
>> feeling (maybe I was expecting more from concurrent/parallel IO?).
>>
>>
>>
>> My question: can I do something better with respect to performance here?
>> The parquet files are really small compared to other online examples of
>> dealing with parquet files. Maybe I can tweak some pyarrow arguments?
>>
>>
>>
>> Thank you,
>>
>> Luca
>>
>>
>>
