Sorry for the slow reply.

> This could be something on the Azure side but I find I am being bottlenecked 
> on the download speed and have noticed if I spin up multiple Python sessions 
> (or in my case interactive windows) I can increase my throughput. Hence I can 
> download each year of the taxinyc dataset in separate interactive windows and 
> increase my bandwidth consumed.

I would say that this is always a problem.  The goal of the datasets
API is to maximize resource usage within a single process, so needing
several processes to saturate the network means something is falling
short.  That said, it may be a known or expected problem :)

> Does the Dataset API make use of memory mapping? Do I have the correct 
> understanding that memory mapping is only intended for dealing with large 
> data stored on a local file system. Where as data stored on a cloud file 
> system in the feather format effectively cannot be memory mapped?

I think the benefits of memory mapping are rather subtle and often
misleading.  Datasets can make use of memory mapping for local
filesystems.  Doing so will, at best, have a slight performance
benefit (avoiding a memcpy) but would most likely decrease performance
(by introducing I/O where it is not expected) and it will have no
effect whatsoever on the amount of RAM used.

> This works as well as noted previously, so I assume the python operators are 
> mapped across similar to what happens when you use the operators against a 
> numpy or pandas series it just executes a np.multiply or pd.multiply in the 
> background.

Yes.  However the functions that get mapped can sometimes be
surprising.  Specifically, logical operations map to the _kleene
variation and arithmetic maps to the _checked variation.  You can find
the implementation at [1].  For multiplication this boils down to:

```
@staticmethod
cdef Expression _expr_or_scalar(object expr):
    if isinstance(expr, Expression):
        return (<Expression> expr)
    return (<Expression> Expression._scalar(expr))

...

def __mul__(Expression self, other):
    other = Expression._expr_or_scalar(other)
    return Expression._call("multiply_checked", [self, other])
```


On Mon, Sep 19, 2022 at 12:52 AM Jacek Pliszka <jacek.plis...@gmail.com> wrote:
>
> Re 2.   In Python Azure SDK there is logic for partial blob read:
>
> https://learn.microsoft.com/en-us/python/api/azure-storage-blob/azure.storage.blob.blobclient?view=azure-python#azure-storage-blob-blobclient-query-blob
>
> However I was unable to use it as it does not support parquet files
> with decimal columns and these are the ones I have.
>
> BR
>
> J
>
> On Fri, Sep 16, 2022 at 02:26 Aldrin <akmon...@ucsc.edu> wrote:
> >
> > For Question 2:
> > At a glance, I don't see anything in adlfs or azure that is able to do 
> > partial reads of a blob. If you're using block blobs, then likely you would 
> > want to store blocks of your file as separate blocks of a blob, and then 
> > you can do partial data transfers that way. I could be misunderstanding the 
> > SDKs or how Azure stores data, but my guess is that a whole blob is 
> > retrieved and then the local file is able to support partial, block-based 
> > reads as you expect from local filesystems. You may be able to double check 
> > how much data is being retrieved by looking at where adlfs is mounting your 
> > blob storage.
> >
> > For Question 3:
> > you can memory map remote files, it's just that every page fault will be 
> > even more expensive than for local files. I am not sure how to tell the 
> > dataset API to do memory mapping, and I'm not sure how well that would work 
> > over adlfs.
> >
> > For Question 4:
> > Can you try using `pc.scalar(1000)` as shown in the first code excerpt in 
> > [1]:
> >
> > >> x, y = pa.scalar(7.8), pa.scalar(9.3)
> > >> pc.multiply(x, y)
> > <pyarrow.DoubleScalar: 72.54>
> >
> > [1]: 
> > https://arrow.apache.org/docs/python/compute.html#standard-compute-functions
> >
> > Aldrin Montana
> > Computer Science PhD Student
> > UC Santa Cruz
> >
> >
> > On Thu, Sep 8, 2022 at 8:26 PM Nikhil Makan <nikhilmaka...@gmail.com> wrote:
> >>
> >> Hi There,
> >>
> >> I have been experimenting with Tabular Datasets for data that can be 
> >> larger than memory and had a few questions related to what's going on 
> >> under the hood and how to work with it (I understand it is still 
> >> experimental).
> >>
> >> Question 1: Reading Data from Azure Blob Storage
> >> Now I know the filesystems don't fully support this yet, but there is an 
> >> fsspec compatible library (adlfs) which is shown in the file system 
> >> example which I have used. Example below with the nyc taxi dataset, where 
> >> I am pulling the whole dataset through and writing to disk to the feather 
> >> format.
> >>
> >> import adlfs
> >> import pyarrow.dataset as ds
> >>
> >> fs = adlfs.AzureBlobFileSystem(account_name='azureopendatastorage')
> >>
> >> dataset = ds.dataset('nyctlc/green/', filesystem=fs, format='parquet')
> >>
> >> scanner = dataset.scanner()
> >> ds.write_dataset(scanner, f'taxinyc/green/feather/', format='feather')
> >>
> >> This could be something on the Azure side but I find I am being 
> >> bottlenecked on the download speed and have noticed if I spin up multiple 
> >> Python sessions (or in my case interactive windows) I can increase my 
> >> throughput. Hence I can download each year of the taxinyc dataset in 
> >> separate interactive windows and increase my bandwidth consumed. The 
> >> tabular dataset documentation notes 'optionally parallel reading.' Do you 
> >> know how I can control this? Or perhaps control the number of concurrent 
> >> connections. Or has this got nothing to do with the arrow and sits purely 
> >> on the Azure side? I have increased the io thread count from the default 8 
> >> to 16 and saw no difference, but could still spin up more interactive 
> >> windows to maximise bandwidth.
> >>
> >> Question 2: Reading Filtered Data from Azure Blob Storage
> >> Unfortunately I don't quite have a repeatable example here. However using 
> >> the same data above, only this time I have each year as a feather file 
> >> instead of a parquet file. I have uploaded this to my own Azure blob 
> >> storage account.
> >> I am trying to read a subset of this data from the blob storage by 
> >> selecting columns and filtering the data. The final result should be a 
> >> dataframe that takes up around 240 mb of memory (I have tested this by 
> >> working with the data locally). However when I run this by connecting to 
> >> the Azure blob storage it takes over an hour to run and it's clear it's 
> >> downloading a lot more data than I would have thought. Given the file 
> >> formats are feather that supports random access I would have thought I 
> >> would only have to download the 240 mb?
> >>
> >> Is there more going on in the background? Perhaps I am using this 
> >> incorrectly?
> >>
> >> import adlfs
> >> import pyarrow.dataset as ds
> >>
> >> connection_string = ''
> >> fs = adlfs.AzureBlobFileSystem(connection_string=connection_string,)
> >>
> >> ds_f = ds.dataset("taxinyc/green/feather/", format='feather')
> >>
> >> df = (
> >>     ds_f
> >>     .scanner(
> >>         columns={ # Selections and Projections
> >>             'passengerCount': ds.field(('passengerCount'))*1000,
> >>             'tripDistance': ds.field(('tripDistance'))
> >>         },
> >>         filter=(ds.field('vendorID') == 1)
> >>     )
> >>     .to_table()
> >>     .to_pandas()
> >> )
> >>
> >> df.info()
> >>
> >> Question 3: How is memory mapping being applied?
> >> Does the Dataset API make use of memory mapping? Do I have the correct 
> >> understanding that memory mapping is only intended for dealing with large 
> >> data stored on a local file system. Where as data stored on a cloud file 
> >> system in the feather format effectively cannot be memory mapped?
> >>
> >> Question 4: Projections
> >> I noticed in the scanner function when projecting a column I am unable to 
> >> use any compute functions (I get a Type Error: only other expressions 
> >> allowed as arguments) yet I am able to multiply this using standard python 
> >> arithmetic.
> >>
> >> 'passengerCount': ds.field(('passengerCount'))*1000,
> >>
> >> 'passengerCount': pc.multiply(ds.field(('passengerCount')),1000),
> >>
> >> Is this correct or am I to process this using an iterator via record batch 
> >> to do this out of core? Is it actually even doing it out of core with " 
> >> *1000 ".
> >>
> >> Thanks for your help in advance. I have been following the Arrow project 
> >> for the last two years but have only recently decided to dive into it in 
> >> depth to explore it for various use cases. I am particularly interested in 
> >> the out-of-core data processing and the interaction with cloud storages to 
> >> retrieve only a selection of data from feather files. Hopefully at some 
> >> point when I have enough knowledge I can contribute to this amazing 
> >> project.
> >>
> >> Kind regards
> >> Nikhil Makan
