Thank you @Jacek Pliszka<mailto:jacek.plis...@gmail.com> for you feedback! 
Below my answers:

From: Jacek Pliszka <jacek.plis...@gmail.com>
Sent: venerdì 1 dicembre 2023 17:35
To: user@arrow.apache.org
Subject: Re: Usage of Azure filesystem with fsspec and adlfs and pyarrow to 
download a list of blobs (parquets) concurrently with columns pruning and rows 
filtering

Hi!

These files seem to be below 4MB which is the default Azure block. Possibly 
they are all read in full. Does someone know if in the approach below the blob 
is read only once from Azure even if multiple reads are called?
Yes, they are below 4MB each. I don’t know the answer and the implications 
about it so I can’t directly answer your question, sadly. If someone doesn’t 
mind elaborating more about it, I would be interested.

Is there correlation between "my_index" and the filenames you could possibly 
use to avoid reading all of them?
The index is a timestamp with type datetime64[ns] (from numpy/pandas/pyarrow 
AFAIK). The filenames are generated with the following pattern 
“year/month/day/hour/asset_name” so with time-granularity of 1 hour. Now we 
generate the list of blobs to download handling the logic of getting only the 
blobs which potentially have some data that we are interested in (so inside the 
1-hour granularity). The filters of pyarrow or post-downloading are needed if 
we require a finer granularity below the 1 hour or with ranges below it.

Another suggestion would be merging the files - either yourself or using 
something like Delta Tables and vacuum/compact methods.
We will assess this type of refactoring in the future, I believe. Can I do 
something with the current state of having many small files?

Best Regards,

Jacek

Thank you,
Luca Maurelli

pt., 1 gru 2023 o 15:46 Weston Pace 
<weston.p...@gmail.com<mailto:weston.p...@gmail.com>> napisał(a):
Those files are quite small.  For every single file pyarrow is going to need to 
read the metadata, determine which columns to read (column filtering), 
determine if any of the rows need to be read (using row filtering) and then 
actually issue the read.  If you combined all those files into one file then I 
would expect better performance.  The ability to read a single file in parallel 
is not going to be important here (each file is very small).  However, you will 
want to make sure it is reading multiple files at once.  I would expect that it 
is doing so but this would be a good thing to verify if you can.

One quick test you can always try is to run your script twice, at the same 
time.  If the total runtime is significantly faster than running the script 
twice, one after the other, then you can confirm that there are unexploited 
resources on the system.

It also looks like your data is partitioned by time and your filters are time 
based filters.  You might want to investigate dataset partitioning as that 
should be able to help.

On Fri, Dec 1, 2023 at 6:32 AM Luca Maurelli 
<lmaure...@camozzi.com<mailto:lmaure...@camozzi.com>> wrote:
I’m new to these libraries so bear with me, I am learning a lot these days.

I started using fsspec and adlfs with the idea of switching between a cloud 
storage to a local storage with little effort. I read that adlfs makes use of 
the Azure Blob Storage Python SDK which supports the use of async/await pattern 
to implement concurrent IO.
The Python SDK also exposes the max_concurrency argument in the download_blob 
function, for instance, to enable the download of a single blob with a thread 
pool (note: the single blob, I believe the use case here is that if the blob is 
very big you can split the download in parallel with this argument).

Now I wish to use adlfs with pyarrow/pandas to download a list of blobs 
(parquet) by exploiting the async methods of the Python SDK. Not knowing the 
libraries and their integration, I hope this is already taken care of, so I 
tried to code the following snippet:

import pandas as pd
import pyarrow.parquet as pq
import adlfs
import time
CONNECTION_STRING = "my_connection_string"
CONTAINER = "raw"
FILEPATHS = [
    
f"az://{CONTAINER}/2023/11/{str(day).zfill(2)}/{str(hour).zfill(2)}/file.parquet"
    for day in range(1, 31)
    for hour in range(24)
]
fs = adlfs.AzureBlobFileSystem(connection_string=CONNECTION_STRING)
FILTERS = [
    [
        ("my_index", ">=", pd.Timestamp("2023-11-08 08:00:00")),
        ("my_index", "<=", pd.Timestamp("2023-11-08 08:30:00")),
    ]
]
COLUMNS = ["col1", "col2", "col3"]
start_time = time.time()
dataset = pq.ParquetDataset(
    path_or_paths=FILEPATHS,
    filters=FILTERS,
    filesystem=fs,
)
elapsed_time = time.time() - start_time
print(f"Elapsed time for ParquetDataset: {elapsed_time:.6f} seconds")
start_time = time.time()
df = dataset.read_pandas(
    columns=COLUMNS
).to_pandas()
elapsed_time = time.time() - start_time
print(f"Elapsed time for read_pandas: {elapsed_time:.6f} seconds")
Each blob has around 3600 rows and 95 columns. It tries to download 720 blobs 
in total. The final dataframe is 236404 rows x 95 columns with no columns/rows 
filtering.
If I enforce the columns pruning, it has 236404 rows x 3 columns (CASE 1). If I 
also enforce the rows filtering, it has 1544 rows x 95 columns (CASE 2).

The timing of the cases is as follows:

  1.
# Elapsed time for ParquetDataset: 0.886232 seconds
# Elapsed time for read_pandas: 146.798920 seconds

  1.
# Elapsed time for ParquetDataset: 0.298594 seconds
# Elapsed time for read_pandas: 203.801083 seconds

I was expecting the case 1 to be faster since from the timestamp only the first 
blob should be actually downloaded and read (AFAIK parquet is smart and it 
makes use of schema/metadata for the rows/columns filtering).
I also was expecting case 2 to be faster in general: this is just a feeling 
(maybe I was expecting more from concurrent/parallel IO?).

My question: Can I do something better w.r.t performances here? The parquet 
files are really smalls compared to other online examples of dealing with 
parquet files. Maybe I can tweak some pyarrow arguments?

Thank you,
Luca

Luca Maurelli

Data Scientist
__________________________

[Camozzi Digital s.r.l.]
Camozzi Digital s.r.l.
Via Cassala 52
25126 Brescia (BS)
ITALY
Phone:
Fax:
Mobile:
Email:
lmaure...@camozzi.com<mailto:lmaure...@camozzi.com>
Website:
www.camozzidigital.com<http://www.camozzidigital.com/>
Questo messaggio di posta elettronica, comprensivo di eventuali allegati, è ad 
uso esclusivo di colui al quale è indirizzato e potrebbe contenere informazioni 
riservate. Nel caso in cui abbiate ricevuto questa comunicazione per errore, Vi 
invitiamo cortesemente a darcene notizia - contattando il mittente o 
telefonando al numero (+ 39) 030.37921 - ed a distruggere e cancellare il 
messaggio dal Vostro computer. This e-mail message, including any attachments, 
is for the exclusive use of the person to whom it is addressed and may contain 
confidential information. In case you are not the intended recipient, we kindly 
ask you to inform us - by contacting the sender or by calling the number (+ 39) 
030.37921 - and to destroy and delete the message from your computer.

Reply via email to