Re: [Python][Arrow IPC] Identify if RecordBatchReader is actually loading data or not

2024-06-07 Thread Weston Pace
If you are using the IPC reader to read memory mapped files then the data
will not be read into memory until you access the data.  I'm not really
sure why 15GB files would act differently than 300GB files.

> since IPC identifies when a data source is clearly too big for memory and
doesn't try to load it to disk until the data is requested
> So, any idea on how I can identify when the stream reader is deciding to
load in-memory or out-of-memory batches?

The IPC reader does not have any special logic to detect when a data source
is clearly too big for memory.  If you read (and try to access) all
contents of a file that is bigger than memory then you will run out of
memory and crash (or start swapping).

On Fri, Jun 7, 2024 at 1:05 PM Kevin Crouse  wrote:

> Hey folks,
>
> I've been using arrow/pyarrow and am writing a multithreaded loader for
> arbitrary IPC files (RecordBatchStreamReader based on a MemoryMappedFile)
> that processes batches to calculate custom statistics without loading into
> memory - and I'm running into issues with IPC outsmarting me.
>
> I want to know how I can identify when StreaReader is actually reading the
> full batch from disk and when it's just lazily pulling the batch
> metadata and actually reading the batch. I know in Arrow C++ you can
> specify cache options and turn lazy off, but that isn't in the pyarrow
> interface (and I'm not sure that I would want it anyway) - I'm more
> interested in checking what the stream reader is doing rather than
> manipulating how it's doing it.
>
> Example: since IPC identifies when a data source is clearly too big for
> memory and doesn't try to load it to disk until the data is requested, I'm
> running into situations where:
> - For 15GB files, each iteration of the stream reader loads the batch from
> disk, which is a slow and IO intensive process, and the full naive
> iteration of just looping over the reader takes a minute or so - but since
> the stream reader locks on each batch fetch, it's effectively the minimum
> bound for calculating the statistics.
> - For 300GB files, each iteration of the stream reader just grabs the
> metadata and a data summary, so the full iteration of the file takes 3s.
>
> So, I have an algorithm that is extremely efficient for the 15GB, not-lazy
> context, because it balances the IO dependency of the batch reader with the
> various CPU-intensive workers.
>
> For the 300GB, this is sub-optimal because now the workers have become IO
> limited and are fighting with each other for the lock. I have a different
> algorithm that works better in that scenario.
>
> So, any idea on how I can identify when the stream reader is deciding to
> load in-memory or out-of-memory batches?
>
> I'm also already binding some to the C++ interface, so if it's not in
> pyarrow but in the C++ interface, I can work with that too.
>
> Thanks,
>
> Kevin
>
>


Re: [Python][C++] Chunked Storage of N-dim arrays

2024-04-23 Thread Weston Pace
I've worked quite a bit with tensor data recently and `arrow.Tensor` (or
just the underlying FixedSizeList) has worked well for me for an in-memory
representation.

> If you compress it, you have no means to decompress individual chunks,
from what I can tell from prototyping within Python.

Correct.  This is more of a storage concern and less of an in-memory
concern correct?  Or are you hoping to have these tensors compressed in
memory?

> You also cannot attach metadata to it.

Can you create a table with however many columns of metadata you want and
one tensor column?

> I do have an associated Table as each spectrum has metadata

Yes, this is how I would expect metadata to be represented.

> but if I split up the spectra to one per row, I end up with 10s of
millions of individual `numpy.ndarray` objects which causes a lot of
performance issues.

I'm not sure I understand what you are after here.  Normally I would
process such a table in batches.  Then, I would process that batch in
rows.  For each row I would convert to `numpy.ndarray`, do whatever
calculation I need to do, and then convert back to Tensor.  After
generating a batch of tensors I yield the batch and move on to the next
batch.  This kind of streaming execution should avoid having 10s of
millions of `numpy.ndarray` objects.

> I took a look at breaking up the array into a list of RecordBatch and
`RecordBatchStreamReader` doesn't seem to allow you to read only selected
indices, so no real chunking support.

What does your access pattern look like?  If you are planning on processing
all of the data at once you can do:

```
for batch in my_reader:
  for row in range(batch.num_rows):
...
```

However, if you need to jump in and modify a small subset of the total data
then that isn't going to work so well.


On Tue, Apr 23, 2024 at 10:45 AM Robert McLeod  wrote:

> Hi everyone,
>
> For a project I'm working on I've picked Arrow as the library and either
> Feather or Parquet as our storage format for our tabular data. However, I
> also have some hyperspectral data to serialize and I'd prefer not to add
> another big dependency if I can avoid it so I've been trying to make
> something in Arrow work for my application. Typically our
> hyperspectral data is [N, 4096]-shaped, where N is in the tens of millions.
>
> Initially I looked at `arrow.Tensor` via the IPC module but it seems a bit
> limited. You can memory-map it, if it's uncompressed. If you compress it,
> you have no means to decompress individual chunks, from what I can tell
> from prototyping within Python. You also cannot attach metadata to it.
>
> I do have an associated Table as each spectrum has metadata, but if I
> split up the spectra to one per row, I end up with 10s of millions of
> individual `numpy.ndarray` objects which causes a lot of performance
> issues. The data is contiguous, but I would have to write some C-extension
> to slice and view the data (which would be a pain to manage the reference
> counting) and there's still no means to partially load the data.
>
> I could create a Table with one column per chunk and one cell per column.
> This is clunky.
>
> I took a look at breaking up the array into a list of RecordBatch and
> `RecordBatchStreamReader` doesn't seem to allow you to read only selected
> indices, so no real chunking support.
>
> Or is there some other lightweight (not HDF5), cloud-friendly solution
> that I should be looking at?
>
> Sincerely,
> Robert
>
> --
> Robert McLeod
> robbmcl...@gmail.com
> robert.mcl...@hitachi-hightech.com
>
>


Re: rows reshuffled on join

2024-04-16 Thread Weston Pace
> Can someone confirm it?

I can confirm that the current join implementation will potentially reorder
input.  The larger the input the more likely the chance of reordering.

> I think that ordering is only guaranteed if it has been sorted.

Close enough probably.  I think there is an implicit order (the order of
the defined by the files in the dataset and the rows in those files, or the
original order when the input is in memory) that will be respected if there
are no joins or aggregates.

On Tue, Apr 16, 2024 at 8:19 AM Aldrin  wrote:

> I think that ordering is only guaranteed if it has been sorted.
>
> Sent from Proton Mail  for iOS
>
>
> On Tue, Apr 16, 2024 at 08:12, Jacek Pliszka  > wrote:
>
> Hi!
>
> I just hit a very strange behaviour.
>
> I am joining two tables with "left outer" join.
>
> Naively I would expect that the output rows will match the order of the
> left table.
>
> But sometimes the order of rows is different ...
>
> Can someone confirm it?
>
> I would expect this would be mentioned in the docs.
>
> I am using 12.0.1 due to Python 3.7 dependency.
>
> Best Regards,
>
> Jacek Pliszka
>
>
>
>


Re: [C++] Recommended way to extract values from scalars

2024-02-22 Thread Weston Pace
>> ultimately, these do end up being loops at the lower levels (unless
there's some hardware support, eg SIMD/GPU etc).

> Even if you don't write explicit SIMD, (1) the compiler might
> vectorize the loop for you, and (2) the superscalar nature of modern
> CPUs means loops with less branches and memory indirections will run
> faster.

Probably getting into the weeds at this point but my concern was less
branch/simd/etc. and more that `GetScalar` requires a heap allocation.

On Thu, Feb 22, 2024 at 10:55 AM Felipe Oliveira Carvalho <
felipe...@gmail.com> wrote:

> > these do end up being loops at the lower levels
>
> Even if you don't write explicit SIMD, (1) the compiler might
> vectorize the loop for you, and (2) the superscalar nature of modern
> CPUs means loops with less branches and memory indirections will run
> faster.
>
> > Now I just need to figure out the best way to do this over multiple
> columns (row-wise).
>
> You can usually turn loops that go row-by-row into loops that go
> column-by-column by maintaining selection vectors or bitmaps that you
> can use as masks to operations on the remaining columns.
>
> On Thu, Feb 22, 2024 at 1:39 PM Blair Azzopardi  wrote:
> >
> > Thanks @Weston and @Felipe. This information has been very helpful and
> thank you for the examples too. I completely agree with vectorizing
> computations; although, ultimately, these do end up being loops at the
> lower levels (unless there's some hardware support, eg SIMD/GPU etc).
> >
> > @Weston, I managed to iterate over my chunked array as you suggested
> (found some useful examples under the test cases) i.e
> >
> > std::vector values;
> > for (auto elem :
> arrow::stl::Iterate(*chunked_array)) {
> > if (elem.has_value()) {
> > values.push_back(*elem);
> > }
> > }
> >
> > @Felipe, I had to adjust your snippet somewhat to get it to work
> (perhaps the API is in flux). Eventually I did something like this:
> >
> > for (auto  : chunked_array->chunks()) {
> > auto  = chunk->data();
> > arrow::ArraySpan array_span(*data);
> > auto len = array_span.buffers[1].size /
> static_cast(sizeof(double));
> > auto raw_values = array_span.GetSpan(1, len);
> > // able to inspect (double)*(raw_values.data_ + N)
> > }
> >
> > Now I just need to figure out the best way to do this over multiple
> columns (row-wise).
> >
> > Thanks again!
> >
> >
> > On Tue, 20 Feb 2024 at 19:51, Felipe Oliveira Carvalho <
> felipe...@gmail.com> wrote:
> >>
> >> In a Vectorized querying system, scalars and conditionals should be
> >> avoided at all costs. That's why it's called "vectorized" — it's about
> >> the vectors and not the scalars.
> >>
> >> Arrow Arrays (AKA "vectors" in other systems) are the unit of data you
> >> mainly deal with. Data abstraction (in the OOP sense) isn't possible
> >> while also keeping performance — classes like Scalar and DoubleScalar
> >> are not supposed to be instantiated for every scalar in an array when
> >> you're looping. The disadvantage is that your loop now depends on the
> >> type of the array you're dealing with (no data abstraction based on
> >> virtual dispatching).
> >>
> >> > Also, is there an efficient way to loop through a slice perhaps by
> incrementing a pointer?
> >>
> >> That's the right path. Given a ChunkedArray, this what you can do:
> >>
> >> auto  = chunked_array->type();
> >> assert(dt->id() == Type::DOUBLE);
> >> for (auto  : chunked_array->chunks()) {
> >>// each chunk is an arrow::Array
> >>ArrayData  = chunk->data();
> >>util::span raw_values = data.GetSpan(1); // 1
> >> is the data buffer
> >>// ^ all the scalars of the chunk ara tightly packed here
> >>// 64 bits for every double even if it's logically NULL
> >> }
> >>
> >> If data.IsNull(i), the value of raw_values[i] is undefined, depending
> >> on what you're doing with the raw_values, you don't have to care.
> >> Compute functions commonly have two different loops: one that handles
> >> nulls and a faster one (without checks in the loop body) that you can
> >> use when data.GetNullCount()==0.
> >>
> >> Another trick is to compute on all the values and carry the same
> >> validity-bitmap to the result. Possible when the operation is based on
> >> each value independently of the

Re: [C++] Recommended way to extract values from scalars

2024-02-19 Thread Weston Pace
There is no advantage to using a Datum here.  The Datum class is mainly
intended for representing something that might be a Scalar or might be an
Array.

> Also, is there an efficient way to loop through a slice perhaps by
incrementing a pointer?

You will want to cast the Array and avoid Scalar instances entirely.  For
example, if you know there are no nulls in your data then you can use
methods like `DoubleArray::raw_values` which will give you a `double*`.
Since it is a chunked array you would also have to deal with indexing and
iterating the chunks.

There are also some iterator utility classes like
`arrow::stl::ChunkedArrayIterator` which can be easier to use.

On Mon, Feb 19, 2024 at 3:54 AM Blair Azzopardi  wrote:

> On 2nd thoughts, the 2nd method could also be done in a single line.
>
> auto low3 =
> arrow::Datum(st_s_low.ValueOrDie()).scalar_as().value;
>
> That said, I'm still keen to hear if there's an advantage to using Datum
> or without; and on my 2nd question regarding efficiently looping through a
> slice's values.
>
> On Mon, 19 Feb 2024 at 09:24, Blair Azzopardi  wrote:
>
>> Hi
>>
>> I'm trying to figure out the optimal way for extracting scalar values
>> from a table; I've found two ways, using a dynamic cast or using Datum and
>> cast. Is one better than the other? The advantage of the dynamic cast,
>> seems at least, to be a one liner.
>>
>> auto c_val1 = table.GetColumnByName("Val1");
>> auto st_c_val1 = s_low->GetScalar(0);
>> if (st_c_val1.ok()) {
>>
>> // method 1 - via dyn cast
>> auto val1 =
>> std::dynamic_pointer_cast(st_c_val1.ValueOrDie())->value;
>>
>> // method 2 - via Datum & cast
>> arrow::Datum val(st_c_val1.ValueOrDie());
>> auto val1 = val.scalar_as().value;
>> }
>>
>> Also, is there an efficient way to loop through a slice perhaps by
>> incrementing a pointer? I know a chunked array might mean that the
>> underlying data isn't stored contiguously so perhaps this is tricky to do.
>> I imagine the compute functions might do this. Otherwise, it feels each
>> access to a value in memory requires calls to several functions
>> (GetScalar/ok/ValueOrDie etc).
>>
>> Thanks in advance
>> Blair
>>
>


Re: Usage of Azure filesystem with fsspec and adlfs and pyarrow to download a list of blobs (parquets) concurrently with columns pruning and rows filtering

2023-12-04 Thread Weston Pace
 getting only the blobs which potentially have some data that we are
>> interested in (so inside the 1-hour granularity). The filters of pyarrow or
>> post-downloading are needed if we require a finer granularity below the 1
>> hour or with ranges below it.
>>
>
> I am not sure you do -  the filters the way you wrote them are applied
> after the file is downloaded, so you download files for all hours but read
> only for 8:00-8:30 - you can download only the files for 8th hour.
> If the half an hour is what you do - this will mean 24 times fewer files
> to read.
>
>
>> Another suggestion would be merging the files - either yourself or using
>> something like Delta Tables and vacuum/compact methods.
>>
>> We will assess this type of refactoring in the future, I believe. Can I
>> do something with the current state of having many small files?
>>
>>
> Others may give you better answers as I am not up to date but some time
> ago I used Azure SDK directly and downloaded small files to memory buffer:
>
> content = container.download_blob(path).readall() # container is
> azure.storage.blob.ContainerClient
> buffer = pa.BufferReader(content)
> pyarrow.parquet.read_table(buffer, columns=columns,filter=..)
> Then pa.concatenate_tables and combine_chunks
>
> I know it defeats the purpose of using adlfs and you have to parallelize
> it yourself but it worked well for me.
>
> BR,
>
> Jacek
>
>
>>
>> pt., 1 gru 2023 o 15:46 Weston Pace  napisał(a):
>>
>> Those files are quite small.  For every single file pyarrow is going to
>> need to read the metadata, determine which columns to read (column
>> filtering), determine if any of the rows need to be read (using row
>> filtering) and then actually issue the read.  If you combined all those
>> files into one file then I would expect better performance.  The ability to
>> read a single file in parallel is not going to be important here (each file
>> is very small).  However, you will want to make sure it is reading multiple
>> files at once.  I would expect that it is doing so but this would be a good
>> thing to verify if you can.
>>
>>
>>
>> One quick test you can always try is to run your script twice, at the
>> same time.  If the total runtime is significantly faster than running the
>> script twice, one after the other, then you can confirm that there are
>> unexploited resources on the system.
>>
>>
>>
>> It also looks like your data is partitioned by time and your filters are
>> time based filters.  You might want to investigate dataset partitioning as
>> that should be able to help.
>>
>>
>>
>> On Fri, Dec 1, 2023 at 6:32 AM Luca Maurelli 
>> wrote:
>>
>> I’m new to these libraries so bear with me, I am learning a lot these
>> days.
>>
>>
>>
>> I started using fsspec and adlfs with the idea of switching between a
>> cloud storage to a local storage with little effort. I read that adlfs
>> makes use of the Azure Blob Storage Python SDK which supports the use of
>> async/await pattern to implement concurrent IO.
>>
>> The Python SDK also exposes the max_concurrency argument in the
>> download_blob function, for instance, to enable the download of a single
>> blob with a thread pool (note: the single blob, I believe the use case here
>> is that if the blob is very big you can split the download in parallel with
>> this argument).
>>
>>
>>
>> Now I wish to use adlfs with pyarrow/pandas to download a list of blobs
>> (parquet) by exploiting the async methods of the Python SDK. Not knowing
>> the libraries and their integration, I hope this is already taken care of,
>> so I tried to code the following snippet:
>>
>>
>>
>> import pandas as pd
>>
>> import pyarrow.parquet as pq
>>
>> import adlfs
>>
>> import time
>>
>> CONNECTION_STRING = "my_connection_string"
>>
>> CONTAINER = "raw"
>>
>> FILEPATHS = [
>>
>> f"az://{CONTAINER}/2023/11/{str(day).zfill(2)}/{str(hour).zfill(2)}
>> /file.parquet"
>>
>> for day in range(1, 31)
>>
>> for hour in range(24)
>>
>> ]
>>
>> fs = adlfs.AzureBlobFileSystem(connection_string=CONNECTION_STRING)
>>
>> FILTERS = [
>>
>> [
>>
>> ("my_index", ">=", pd.Timestamp("2023-11-08 08:00:00")),
>>
>> ("my_index", "<=", pd.Timestamp("2023-11-08 08:30:00")),
>>
&

Re: Usage of Azure filesystem with fsspec and adlfs and pyarrow to download a list of blobs (parquets) concurrently with columns pruning and rows filtering

2023-12-01 Thread Weston Pace
Those files are quite small.  For every single file pyarrow is going to
need to read the metadata, determine which columns to read (column
filtering), determine if any of the rows need to be read (using row
filtering) and then actually issue the read.  If you combined all those
files into one file then I would expect better performance.  The ability to
read a single file in parallel is not going to be important here (each file
is very small).  However, you will want to make sure it is reading multiple
files at once.  I would expect that it is doing so but this would be a good
thing to verify if you can.

One quick test you can always try is to run your script twice, at the same
time.  If the total runtime is significantly faster than running the script
twice, one after the other, then you can confirm that there are unexploited
resources on the system.

It also looks like your data is partitioned by time and your filters are
time based filters.  You might want to investigate dataset partitioning as
that should be able to help.

On Fri, Dec 1, 2023 at 6:32 AM Luca Maurelli  wrote:

> I’m new to these libraries so bear with me, I am learning a lot these days.
>
>
>
> I started using fsspec and adlfs with the idea of switching between a
> cloud storage to a local storage with little effort. I read that adlfs
> makes use of the Azure Blob Storage Python SDK which supports the use of
> async/await pattern to implement concurrent IO.
>
> The Python SDK also exposes the max_concurrency argument in the
> download_blob function, for instance, to enable the download of a single
> blob with a thread pool (note: the single blob, I believe the use case here
> is that if the blob is very big you can split the download in parallel with
> this argument).
>
>
>
> Now I wish to use adlfs with pyarrow/pandas to download a list of blobs
> (parquet) by exploiting the async methods of the Python SDK. Not knowing
> the libraries and their integration, I hope this is already taken care of,
> so I tried to code the following snippet:
>
>
>
> import pandas as pd
>
> import pyarrow.parquet as pq
>
> import adlfs
>
> import time
>
> CONNECTION_STRING = "my_connection_string"
>
> CONTAINER = "raw"
>
> FILEPATHS = [
>
> f"az://{CONTAINER}/2023/11/{str(day).zfill(2)}/{str(hour).zfill(2)}
> /file.parquet"
>
> for day in range(1, 31)
>
> for hour in range(24)
>
> ]
>
> fs = adlfs.AzureBlobFileSystem(connection_string=CONNECTION_STRING)
>
> FILTERS = [
>
> [
>
> ("my_index", ">=", pd.Timestamp("2023-11-08 08:00:00")),
>
> ("my_index", "<=", pd.Timestamp("2023-11-08 08:30:00")),
>
> ]
>
> ]
>
> COLUMNS = ["col1", "col2", "col3"]
>
> start_time = time.time()
>
> dataset = pq.ParquetDataset(
>
> path_or_paths=FILEPATHS,
>
> filters=FILTERS,
>
> filesystem=fs,
>
> )
>
> elapsed_time = time.time() - start_time
>
> print(f"Elapsed time for ParquetDataset: {elapsed_time:.6f} seconds")
>
> start_time = time.time()
>
> df = dataset.read_pandas(
>
> columns=COLUMNS
>
> ).to_pandas()
>
> elapsed_time = time.time() - start_time
>
> print(f"Elapsed time for read_pandas: {elapsed_time:.6f} seconds")
>
> Each blob has around 3600 rows and 95 columns. It tries to download 720
> blobs in total. The final dataframe is 236404 rows x 95 columns with no
> columns/rows filtering.
>
> If I enforce the columns pruning, it has 236404 rows x 3 columns (CASE 1).
> If I also enforce the rows filtering, it has 1544 rows x 95 columns (CASE
> 2).
>
>
>
> The timing of the cases is as follows:
>
>1.
>
> # Elapsed time for ParquetDataset: 0.886232 seconds
>
> # Elapsed time for read_pandas: 146.798920 seconds
>
>1.
>
> # Elapsed time for ParquetDataset: 0.298594 seconds
>
> # Elapsed time for read_pandas: 203.801083 seconds
>
>
>
> I was expecting the case 1 to be faster since from the timestamp only the
> first blob should be actually downloaded and read (AFAIK parquet is smart
> and it makes use of schema/metadata for the rows/columns filtering).
>
> I also was expecting case 2 to be faster in general: this is just a
> feeling (maybe I was expecting more from concurrent/parallel IO?).
>
>
>
> My question: Can I do something better w.r.t performances here? The
> parquet files are really smalls compared to other online examples of
> dealing with parquet files. Maybe I can tweak some pyarrow arguments?
>
>
>
> Thank you,
>
> Luca
>
>
> Luca Maurelli
>
> Data Scientist
> *__*
>
> [image: Camozzi Digital s.r.l.]
> Camozzi Digital s.r.l.
> Via Cassala 52
> 25126 Brescia (BS)
> ITALY
> *Phone:*
> *Fax:*
> *Mobile:*
> *Email:*  lmaure...@camozzi.com
> *Website:*  www.camozzidigital.com
> Questo messaggio di posta elettronica, comprensivo di eventuali allegati,
> è ad uso esclusivo di colui al quale è indirizzato e potrebbe contenere
> informazioni riservate. Nel caso in cui abbiate ricevuto questa
> comunicazione per errore, Vi invitiamo cortesemente a darcene notizia -
> contattando il mittente 

Re: [C++] [Arrow IPC] Efficient Multiple Reads

2023-11-08 Thread Weston Pace
You are correct that there is no existing capability to create an IPC
reader with precomputed metadata.  I don't think anyone would be opposed to
this feature, it just hasn't been a priority.

If you wanted to avoid changing arrow then you could create your own
implementation of `RandomAccessFile` which is partially backed by an
in-memory buffer and fetches from file when the reads go out of the
buffered range.  However, I'm not sure that I/O is the culprit.  Are you
reading from a local file?  If so, then the future reads would probably
already be cached by the OS (unless maybe you are under memory pressure).

Perhaps it is the CPU cost of processing the metadata that is slowing down
your reads.  If that is the case then I think a code change is inevitable.


On Wed, Nov 8, 2023 at 6:43 AM Ishbir Singh  wrote:

> Thanks for the info, Aldrin. I forgot to mention that the subsets of
> columns chosen is dynamic. Basically, I have a web server serving columns
> from the file. It wouldn’t make sense to rewrite the files for each query.
>
> I’m just looking for the easiest way to read the metadata as a buffer so I
> can pass it to the function below because I believe that should accomplish
> what I want.
>
> Thanks,
> Ishbir Singh
>
> W dniu wt., 7.11.2023 o 20:56 Aldrin  napisał(a):
>
>> In a probably too short answer, I think you want to do one of the
>> following:
>>
>> - write a single feather file with many batches
>> - write many feather files but using the dataset API to hopefully have
>> arrow do some multi-file optimization for you (and hopefully still have
>> multiple batches per file)
>> - write the schema in one file (or as few files as there are schemas) and
>> write many (N) recordbatches to fewer files (M) using the stream interface
>> (instead of file)
>>
>> I do the 3rd one and I do it because I made assumptions about data
>> accesses but I have not validated those assumptions. The main assumption
>> being that writing a RecordBatch with the stream API is not rewriting the
>> schema each time (or having equivalent amplification on the read side).
>>
>> Let me know if there's any approach you want more info on and I can
>> follow up or maybe someone else can chime in/correct me.
>>
>> Sent from Proton Mail  for iOS
>>
>>
>> On Tue, Nov 7, 2023 at 10:27, Ishbir Singh > > wrote:
>>
>> Apologies if this is the wrong place for this, but I'm looking to
>> repeatedly select a subset of columns from a wide feather file (which has
>> ~200k columns). What I find is that if I use RecordBatchReader::Open with
>> the requisite arguments asking it to select the particular columns, it
>> reads the schema over and over (once per Open call). Now that is to be
>> expected as there doesn't seem to be a way to pass a pre-existing schema.
>>
>> However, in my use case, I want the smaller queries to be fast and can't
>> have it re-parse the schema for every call. The input file thus has to be a
>> io::RandomAccesssFile. Looking at arrow/ipc/reader.h, the only method that
>> can serve this purpose seems to be:
>>
>> Result> ReadRecordBatch(
>> const Buffer& metadata, const std::shared_ptr& schema,
>> const DictionaryMemo* dictionary_memo, const IpcReadOptions& options,
>> io::RandomAccessFile* file);
>>
>> How do I efficiently read the file once to get the schema and metadata in
>> this case? My file does not have any dictionaries. Am I thinking about this
>> incorrectly?
>>
>> Would appreciate any pointers.
>>
>> Thanks,
>> Ishbir Singh
>>
>>


Re: Arrow with indirection?

2023-09-14 Thread Weston Pace
I'm not entirely sure what kinds of operations you need.

Arrow arrays all (with the exception of RLE) support constant time (O(1))
random access.  So generally if you want to keep a pointer to a particular
element or a row of data then that is ok.

On the other hand, you mentioned sorting.  One thing that is a little
challenging in arrow is swapping two rows of data.  It's very possible, and
still the same algorithmic complexity (O(# columns)) as a row based format
but it is not as memory efficient. Because you are doing a separate memory
swap for each array.

This is why arrow compute libraries will sometimes convert to a row based
format for certain operations.

On Thu, Sep 14, 2023, 8:21 AM Andrew Bell  wrote:

> Hi,
>
> We have a data structure that stores points in a point cloud (X, Y, Z,
> attributes) and we have been approached about replacing the current memory
> store with Arrow. The issue is that the current data store also has a set
> of pointers (indirection) that allows for things like subsetting and
> sorting while keeping the data in place. All data is accessed through the
> indirection table. What people typically want is to export one or more of
> these data sets specified by the pointers.
>
> My understanding is that Arrow doesn't support such a scheme as the point
> of the structure is to allow SIMD and other optimizations gained by
> processing contiguous data. Am I missing something in my reading of the
> Arrow docs? Does anyone have thoughts/recommendations, or is Arrow just not
> a good fit for this kind of thing?
>
> Thanks,
>
> --
> Andrew Bell
> andrew.bell...@gmail.com
>


Re: acero speed versus numpy

2023-08-22 Thread Weston Pace
How many rows match your timestamp criteria? In other words, how many rows
are you applying the function to?  If there is an earlier exact match
filter on a timestamp that only matches 1 (or a few rows) then I are you
sure the expression evaluation (and not the filtering) is the costly spot?

> But I expected that Acero would need to only visit columnar values once

I'm not sure what this means.  There has been very little work on
optimizing expression evaluation (most, if any, optimization work has
focused on optimizing the individual kernels themselves).  Acero will not
"fuse" the kernel and has no expression optimization.  If your expression
has 20 compute calls in it then Acero will make 20 passes over the data.
In fact, these won't even be in-place (e.g. 20 arrays will be initialized).

> Should I instead think of Acero as mainly about working on very large
datasets?

Yes.  At the moment I would expect that pyarrow compute kernels are more or
less as fast as the numpy variants (there are some exceptions, string
functions tend to be faster, some are slower, no one has done an exhaustive
survey).  Running these through Acero should have some overhead and give
you the ability to run on larger-than-memory data.  There is potential for
Acero to implement some clever tricks (like those I described earlier)
which might make it faster (instead of adding overhead).  However, I do not
know if anyone is working on these.

On Mon, Aug 21, 2023 at 6:35 PM Chak-Pong Chung 
wrote:

> Could you provide a script with which people can reproduce the problem for
> the performance comparison? That way we can take a closer look.
>
> On Mon, Aug 21, 2023 at 8:42 PM Spencer Nelson  wrote:
>
>> I'd like some help calibrating my expectations regarding acero
>> performance. I'm finding that some pretty naive numpy is about 10x faster
>> than acero for my use case.
>>
>> I'm working with a table with 13,000,000 values. The values are angular
>> positions on the sky and times. I'd like to filter to a specific one of the
>> times, and to values within a calculated great-circle distance on the sky.
>>
>> I've implemented the Vincenty formula (
>> https://en.wikipedia.org/wiki/Great-circle_distance) for this:
>>
>> ```
>> def pc_angular_separation(lon1, lat1, lon2, lat2):
>>  sdlon = pc.sin(pc.subtract(lon2, lon1))
>>  cdlon = pc.cos(pc.subtract(lon2, lon1))
>>  slat1 = pc.sin(lat1)
>>  slat2 = pc.sin(lat2)
>>  clat1 = pc.cos(lat1)
>>  clat2 = pc.cos(lat2)
>>
>>  num1 = pc.multiply(clat2, sdlon)
>>  num2 = pc.subtract(pc.multiply(slat2, clat1),
>> pc.multiply(pc.multiply(clat2, slat1), cdlon))
>>  denominator = pc.add(pc.multiply(slat2, slat1),
>> pc.multiply(pc.multiply(clat2, clat1), cdlon))
>>  hypot = pc.sqrt(pc.add(pc.multiply(num1, num1), pc.multiply(num2,
>> num2)))
>>  return pc.atan2(hypot, denominator)
>> ```
>>
>> The resulting pyarrow.compute.Expression is fairly monstrous:
>>
>> > atan2(sqrt(add(multiply(multiply(cos(Dec_deg), sin(subtract(RA_deg,
>> 168.9776949652776))), multiply(cos(Dec_deg), sin(subtract(RA_deg,
>> 168.9776949652776, multiply(subtract(multiply(sin(Dec_deg),
>> -0.9304510671785976), multiply(multiply(cos(Dec_deg), 0.3664161726591893),
>> cos(subtract(RA_deg, 168.9776949652776, subtract(multiply(sin(Dec_deg),
>> -0.9304510671785976), multiply(multiply(cos(Dec_deg), 0.3664161726591893),
>> cos(subtract(RA_deg, 168.9776949652776))), add(multiply(sin(Dec_deg),
>> 0.3664161726591893), multiply(multiply(cos(Dec_deg), -0.9304510671785976),
>> cos(subtract(RA_deg, 168.9776949652776)>
>>
>> Then my Acero graph is very simple. Just a table source node, then a
>> filter node on the timestamp (for exact match), and then another filter
>> node for a computed value of that expression under a threshold.
>>
>> For 13 million observations, this takes about 15ms on my laptop using
>> Acero.
>>
>> But the same computation done with totally naive numpy is about 3ms.
>>
>> The numpy version has no fanciness, just calling numpy trigonometric
>> functions and materializing all the intermediate results like you might
>> imagine, then eventually coming up with a boolean mask over everything and
>> calling `table.filter(mask)`.
>>
>> So finally, my question: is this about what I should expect? I know Acero
>> has an advantage that it *would* work if my data were larger than fits
>> in memory, which is not true of my numpy approach. But I expected that
>> Acero would need to only visit columnar values once, so it should be able
>> to outpace the numpy approach. Should I instead think of Acero as mainly
>> about working on very large datasets?
>>
>> -Spencer
>>
>
>
> --
> Regards,
> Chak-Pong
>


Re: [python] Diffing 2 large tables

2023-07-07 Thread Weston Pace
Arrow is a great in-memory representation to use for this kind of problem.
However, keep in mind that Arrow, the format, does not actually include
compute capabilities.  Much of your question is asking about things that
would normally be considered "compute operations" and the capabilities and
techniques are going to differ depending on which tools you are using (even
if all of the tools are using Arrow as the format).

> Have I used Arrow in the best possible way here?  Or are there better
ways to approach this problem?

This seems like a pretty reasonable approach.

>for idx, val in enumerate(table.column(key)):
>ptn = val.as_py() % nways
>partitions[ptn].append(idx)

For performance, you almost never want to do any kind of "per-row"
operation in python.  Ideally that will always be done using efficient
compute functions.  Regrettably, there is not yet a compute function for
modulo (for some reason this particular function seems to be cursed :)  If
you're ok with requiring that nways be a power of two you can use
bit_wise_and.

mask = nways - 1
partitions = pc.bit_wise_and(table.column(key), mask)

> result = []
> for idx, indexes in enumerate(partitions):
> mask = [False for i in range(0, len(table))]
> for i in indexes:
> mask[i] = True
> result.append(table.filter(mask))

Same thing here.  This for-loop is per-row and you want to try and avoid
that.  For example...

result = []
for ptn in nways:
  mask = pc.equal(partitions, ptn)
  result.append(table.filter(mask))

I don't know what compute.diff is.  So I don't know if it's implemented
efficiently or in python or not.

> I can see I have plenty of spare CPU capacity when this code is running
so I'd
> like to be able to diff all the partitions concurrently.  Does Arrow
provide any
> support for true multithreading so I can get around Python's limited
threading capabilities?

Yes, through Acero, but "Acero cannot process a join of this magnitude"
means that you probably cannot use this across partitions.  That being
said, all Arrow compute functions release the GIL when they are running.
So you should be able to use python threading and still get multithreading
benefits if you follow my above advice and move the critical sections into
compute functions.



On Fri, Jul 7, 2023 at 5:55 AM Adrian Mowat  wrote:

> Hi Jacek,
>
> Yes, I would be able to re-write the extracts from Postgres.  Is there an
> easy way to partition the results of a SQL query or would I need to write
> something?
>
> Many thanks
>
> Adrian
>
> On Fri, 7 Jul 2023 at 12:53, Jacek Pliszka 
> wrote:
>
>> Hi!
>>
>> If you have any influence over how data is dumped from postgres  - my
>> suggestion is to have it already partitioned then.
>>
>> This would make parallelization much easier,
>>
>> BR
>>
>> Jacek
>>
>> pt., 7 lip 2023 o 12:21 Adrian Mowat  napisał(a):
>> >
>> > Hi,
>> >
>> > TL;DR: newbie question.  I have an pyarrow program that uses
>> pyarrow.compute.diff to compare two 10M row tables.  In order to handle the
>> large volume, I partition my data into subsets by primary key and diff each
>> one individually.  It works but I don't think it performs as well as it
>> could and I'm wondering if there are better ways to solve this problem
>> >
>> > Full question:
>> >
>> > I'm new to Arrow and I'm running a proof of concept on using it to find
>> differences between 2 large data sets.  My use case is that we have a job
>> that dumps some postgres tables to S3 as JSON every night and I want to run
>> a batch job that compares one day's data and the next so we can send the
>> change sets to downstream systems in other parts of the company.  Some of
>> the tables have over 30M rows of data (and growing) so I need something
>> that is performant and can handle a large volume of data.
>> >
>> > For my proof of concept, I downloaded 10M rows of one of the tables
>> from 1 day and then another 10M from the next day.  I then made separate
>> "small" (100,000 rows) and "medium" (1M rows) sized subsets for
>> development.  If this exercise is successful and we decide to go ahead with
>> a project using Arrow, it will probably be written in Ruby because that's
>> the main language at my company.  I used Python for the proof of concept
>> because it seems better documented and more widely used so it should be
>> easier to find documentation etc.
>> >
>> > My first attempt was to load the data into Arrow tables and the use
>> compute.diff to find the differences.  This worked find for the small and
>> medium sized data sets but when I ran it against the "large" (10M) row data
>> my program failed with an error:
>> >
>> > There are more than 2^32 bytes of key data.  Acero cannot process a
>> join of this magnitude
>> >
>> > This is obviously a problem so I decided to partition the data in
>> subsets based on the primary key like this:
>> >
>> > def partition_by_key(table, key, nways):
>> > partitions = [[] for i in range(0, nways)]
>> >
>> > # 

Re: [C++][Acero] can Acero support distributed computation?

2023-07-07 Thread Weston Pace
Note that there are a few aggregates that cannot be distributed (e.g.
median).  Currently, the way aggregates are implemented in Acero, we
distribute across threads, and so we don't actually have an implementation
for these aggregates anyways so probably a bit of a moot point (although
there is some work being done to support those types aggregates in certain
workloads).

> Since acero's aggregate kernels already maintain such
> intermediate status internally, I wonder if it is possible
> to have some APIs in aggregate kernels to retrieve these
> intermediate status to enable such use scenarios. Thanks.

One potential challenge is that there is no guarantee the intermediate
state is representable as Arrow data.  However, I don't know of any that
are not at the moment.

One possible API would be to add an "output_intermediate" option to the
AggregateNodeOptions.  If set to true then, instead of Finalize, we call
some other aggregate method FinalizeIntermediate.  This would require
changes to the aggregate kernels (they will need to add this new method).
They will need to convert their state into an exec batch / scalar.  I don't
think this is a ton of work though so it could be doable if someone were
motivated enough.

> 2) a materialized view that stores the intermediate status
> for aggregation, so that partial aggregated results (the
> intermediate status of aggregation) is stored in materialized
> view on disk, it will be faster when reading the materialized
> view since only the `finalize` computation is needed to get the results

I'm not sure I understand the value in storing the intermediate result as a
materialized view instead of just storing the finalized computation.

--

Note that, in addition to aggregation, sorting is another operation that
typically needs a special node when used in a distributed solution.
Typically each partition-worker sorts with a regular sort node.  Then the
final stage uses an "order preserving merge" to read in the data from the
various inputs.  This is sometimes a dedicated node and sometimes a
consideration of the "exchange/capture" node depending on the engine.

On Fri, Jul 7, 2023 at 1:19 AM Jiangtao Peng  wrote:

> Hi Sasha,
>
> So far we have two use scenarios that may need the intermediate status of
> aggregate kernels during its consumption:
> 1) a shuffle-free single stage distributed query engine. We have our data
> partitioned and stored in multiple nodes, and would like to create a query
> plan with multiple fragments and retrieves partitioned data from all these
> nodes in parallel for better performance. Data shuffling is non trivial to
> implement for us, and we are looking for an approach that is simpler to
> implement. For aggregation query, one way to do it seems to: split the
> aggregation into pre-aggregation and finalize/combine two steps. For
> pre-aggregation, the aggregation operator only consumes the data and stores
> the intermediate results internally. For the `finalize/combine` step, it
> combines multiple partitioned intermediate results as the final result.
>
> 2) a materialized view that stores the intermediate status for
> aggregation, so that partial aggregated results (the intermediate status of
> aggregation) is stored in materialized view on disk, it will be faster when
> reading the materialized view since only the `finalize` computation is
> needed to get the results
>
> Although for some aggregation kernel such as `avg`, we could use two
> existing aggregate kernels (sum/count) to manually maintain such
> intermediate status, but it requires developers to understand how these
> kernels are implemented internally, which is probably not easy and new
> aggregate kernels may be added in the future.
>
> Since acero's aggregate kernels already maintain such intermediate status
> internally, I wonder if it is possible to have some APIs in aggregate
> kernels to retrieve these intermediate status to enable such use scenarios.
> Thanks.
>
>
>
> Jiangtao
>
>
>
> *From: *Sasha Krassovsky 
> *Date: *Friday, July 7, 2023 at 2:21 PM
> *To: *user@arrow.apache.org 
> *Subject: *Re: [C++][Acero] can Acero support distributed computation?
>
> Yes, what you’ve said is correct for Mean. But my point earlier is that
> there should only be a few of such special cases. A simple case would be
> e.g. Max, where Aggregate outputs Max and then merge outputs Max(Max).
>
>
>
> Sasha
>
>
>
> 6 июля 2023 г., в 23:13, Jiangtao Peng  написал(а):
>
> 
>
> Sorry for my unclear expression.
>
>
>
> Take mean aggregation as example, does Aggregate "output" sum and count
> value, and Accumulate will "input" sum and count value, then "merge"
> sum(sum)/sum(count) as "output"?
>
> My point is how to implement Pre-Aggregation and Post-Aggregation using
> Acero.
>
>
>
> Best,
>
> Jiangtao
>
>
>
>
>
> *From: *Sasha Krassovsky 
> *Date: *Friday, July 7, 2023 at 1:25 PM
> *To: *user@arrow.apache.org 
> *Subject: *Re: [C++][Acero] can Acero support distributed computation?
>
> 

Re: [Java][Parquet] Bulk Read Performance

2023-07-03 Thread Weston Pace
For reference, these are the scripts I used.

## To generate data
```
import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq

NUM_COLUMNS = 61
NUM_ROWS = 2048
NUM_FILES = 4096
DATA_DIR = "/home/pace/dev/data/small_files"

rng = np.random.default_rng()

for file_index in range(NUM_FILES):
arrays = []
names = []
for column_index in range(NUM_COLUMNS):
arrays.append(rng.random(NUM_ROWS))
names.append(f"col{column_index}")
table = pa.Table.from_arrays(arrays, names=names)
pq.write_table(table, f"{DATA_DIR}/file_{file_index}.parquet")
```

## To then read the data
```
import time

import pyarrow.dataset as ds

DATA_DIR = "/home/pace/dev/data/small_files"

my_dataset = ds.dataset(DATA_DIR)
start = time.time()
my_table = my_dataset.to_table()
end = time.time()

print(f"Loaded a table with {my_table.num_rows} rows and
{my_table.num_columns} columns in {end - start} seconds")
```

On Mon, Jul 3, 2023 at 7:54 AM Weston Pace  wrote:

> Those files are a bit smaller than ideal but not small enough that I would
> expect anything like the performance you are getting.
>
> I can't speak for the Java implementation, I know very little about it.
> However, I have generated some test data that matches your description and
> loaded it with python.  It took ~2 seconds to read all 4096 files.  This is
> on a pretty standard desktop with 8 cores (16 threads).  In fact, even if I
> restrict things to a single core, it only takes about 9 seconds.
>
> So no, this is not the performance I would expect.  I don't think the fix
> is simply a matter of optimizing certain parameters.  We are missing
> something.
>
> Have you tried the "Query Data Content For Directory" example?[1]  Would
> you be able to generate some kind of profiling or flame chart?
>
> [1] https://arrow.apache.org/cookbook/java/dataset.html#id11
>
> On Sun, Jul 2, 2023 at 1:13 PM Paulo Motta 
> wrote:
>
>> Each file has 1-2MB with 1 row group each, around 2000 rows per file and
>> 61 columns - a total of 7697842 rows. Is this performance expected for this
>> dataset or is there any suggestion to optimize?
>>
>> Thanks!
>>
>> On Sat, Jul 1, 2023 at 10:44 PM Weston Pace 
>> wrote:
>>
>>> What size are the row groups in your parquet files?  How many columns
>>> and rows in the files?
>>>
>>> On Sat, Jul 1, 2023, 6:08 PM Paulo Motta 
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I'm trying to read 4096 parquet files with a total size of 6GB using
>>>> this cookbook:
>>>> https://arrow.apache.org/cookbook/java/dataset.html#query-parquet-file
>>>>
>>>> I'm using 100 threads, each thread processing one file at a time on a
>>>> 72 core machine with 32GB heap. The files are pre-loaded in memory.
>>>>
>>>> However it's taking about 10 minutes to process these 4096 files with a
>>>> total size of only 6GB and the process seems to be cpu-bound.
>>>>
>>>> Is this expected read performance for parquet files or am I
>>>> doing something wrong? Any help or tips would be appreciated.
>>>>
>>>> Thanks,
>>>>
>>>> Paulo
>>>>
>>>


Re: [Java][Parquet] Bulk Read Performance

2023-07-03 Thread Weston Pace
Those files are a bit smaller than ideal but not small enough that I would
expect anything like the performance you are getting.

I can't speak for the Java implementation, I know very little about it.
However, I have generated some test data that matches your description and
loaded it with python.  It took ~2 seconds to read all 4096 files.  This is
on a pretty standard desktop with 8 cores (16 threads).  In fact, even if I
restrict things to a single core, it only takes about 9 seconds.

So no, this is not the performance I would expect.  I don't think the fix
is simply a matter of optimizing certain parameters.  We are missing
something.

Have you tried the "Query Data Content For Directory" example?[1]  Would
you be able to generate some kind of profiling or flame chart?

[1] https://arrow.apache.org/cookbook/java/dataset.html#id11

On Sun, Jul 2, 2023 at 1:13 PM Paulo Motta  wrote:

> Each file has 1-2MB with 1 row group each, around 2000 rows per file and
> 61 columns - a total of 7697842 rows. Is this performance expected for this
> dataset or is there any suggestion to optimize?
>
> Thanks!
>
> On Sat, Jul 1, 2023 at 10:44 PM Weston Pace  wrote:
>
>> What size are the row groups in your parquet files?  How many columns and
>> rows in the files?
>>
>> On Sat, Jul 1, 2023, 6:08 PM Paulo Motta 
>> wrote:
>>
>>> Hi,
>>>
>>> I'm trying to read 4096 parquet files with a total size of 6GB using
>>> this cookbook:
>>> https://arrow.apache.org/cookbook/java/dataset.html#query-parquet-file
>>>
>>> I'm using 100 threads, each thread processing one file at a time on a 72
>>> core machine with 32GB heap. The files are pre-loaded in memory.
>>>
>>> However it's taking about 10 minutes to process these 4096 files with a
>>> total size of only 6GB and the process seems to be cpu-bound.
>>>
>>> Is this expected read performance for parquet files or am I
>>> doing something wrong? Any help or tips would be appreciated.
>>>
>>> Thanks,
>>>
>>> Paulo
>>>
>>


Re: [Java][Parquet] Bulk Read Performance

2023-07-01 Thread Weston Pace
What size are the row groups in your parquet files?  How many columns and
rows in the files?

On Sat, Jul 1, 2023, 6:08 PM Paulo Motta  wrote:

> Hi,
>
> I'm trying to read 4096 parquet files with a total size of 6GB using this
> cookbook:
> https://arrow.apache.org/cookbook/java/dataset.html#query-parquet-file
>
> I'm using 100 threads, each thread processing one file at a time on a 72
> core machine with 32GB heap. The files are pre-loaded in memory.
>
> However it's taking about 10 minutes to process these 4096 files with a
> total size of only 6GB and the process seems to be cpu-bound.
>
> Is this expected read performance for parquet files or am I
> doing something wrong? Any help or tips would be appreciated.
>
> Thanks,
>
> Paulo
>


Re: InputSplit support for parquet reads using Arrow

2023-06-07 Thread Weston Pace
I'm not familiar with the "parquet block size".  However, you can use row
groups to accomplish this task.  You could write a single 10GB file with 5
row groups.  Then, when reading, the arrow readers allow you to specify
which row groups you would like to read.

On Wed, Jun 7, 2023 at 6:13 AM Sanskar Modi  wrote:

> Hi everyone,
>
> We have a use case where we're writing a parquet file to a remote server
> and we want to read this parquet file using arrow. But we want multiple
> hosts to read splits of the parquet file based on parquet block size.
>
> Ex: If the parquet file size is 10 GB, we want 5 hosts to read a 2 GB
> split of the parquet file. This is possible if we read via native
> ParquetReader but from documentation, it is not clear if arrow readers
> support this. Can someone help with this?
>
> Regards,
> Sanskar Modi
>


Re: [Arrow IPC] memory mapping of compressed file / lazy reading

2023-05-26 Thread Weston Pace
Thanks for the clarification, I understand your use case better now.  You
are right that memory mapping can be used in this way you describe.

> why does it decompresses the data here ? For me it is doing a unnecessary
copy
> by transforming a compressed record batch into a uncompressed record
batch.
> Could not a table (or a record batch) be composed of array with
compressed buffer ?

I think what you are describing is possible with the Arrow format.  You
could wait to do decompression until the buffer is first accessed.
However, as you have noticed, it is not the way the Arrow-C++ library is
currently implemented.

You can achieve something similar by not loading the column until you
actually need it.  First, load an empty table (0 columns) from the Arrow
file (so you can get the schema and length) and create a lazy dataframe.
Then, when the column is accessed, you can create another Arrow reader and
read just that column.

So I suspect this is simply a design decision.  It should be possible to
build what you are describing but it is not how things are currently
implemented.  One caution I would have with memory mapping in this way is
that the disk load becomes something of a hidden cost that is harder to
reason about.  It's also more difficult to manage how much memory you are
currently using because it depends on the user's access patterns.  Maybe
deleting an array frees up ram and maybe it doesn't?

On Thu, May 25, 2023 at 2:18 PM Frédéric MASSON 
wrote:

> Hi,
>
> Thank you very much for your answer, I am sorry if some sentences are
> confusing.
> I did not know about the kernel space/user space and that memory mapping
> I/O was more general than just file memory mapping. I got a better
> understanding now.
> So I looked a bit deeper inside memory mapping (
> https://en.wikipedia.org/wiki/Mmap and
> https://www.ibm.com/docs/en/aix/7.2?topic=memory-understanding-mapping).
>
> I agree the term "lazy" can be ambiguous. I was talking about "lazy
> reading" (which is memory mapping a file for me) and not "lazy computing."
>
> > What is it that you would like to achieve with Arrow?
>
> The ideal use case would be having a Pandas DataFrame object with data
> directly on the file and not in memory : a DataFrame but with a backend
> that only actually read the necessary data in the file and only when needed.
> I have found that memory mapping a file is a very efficient way to open a
> file and to access its data partially. For me it is an essential part of a
> the "zero-copy" strategy. Especially with the last-gen SSD.
> Since Arrow allows to share memory between libraries without copy I
> thought I would be able to have a pandas dataFrame without actually loading
> the data.
>
> And I actually found a way to have a Pandas dataframe with all the data
> memory mapped from the disk with the "types_mapper=pd.ArrowDtype" argument
> :
>
> with pa.memory_map('table.arrow', 'rb') as source:
> df =
> pa.ipc.open_file(source).read_all().to_pandas(types_mapper=pd.ArrowDtype)
>
>
> To go further I wanted to do the same with a compressed arrow file. But I
> did not success. When reading batch records, the Arrow library *always 
> *uncompress
> the buffers inside the batch record.
>
> I looked a bit into the Arrow sources files on github and I think it comes
> from the function "LoadRecordBatchSubset" (
> https://github.com/apache/arrow/blob/2d32efeedad88743dd635ff562c65e072cfb44f7/cpp/src/arrow/ipc/reader.cc#L522
> ).
> This function just "move" all the columns unless their data are compressed
> in which case it decompresses the data and thus loads it into memory (with
> DecompressBuffers function).
>
> So my question is  : why does it decompresses the data here ? For me it is
> doing a unnecessary copy by transforming a compressed record batch into a
> uncompressed record batch.
> Could not a table (or a record batch) be composed of array with compressed
> buffer ?
>
> Thank you again,
>
> Fred
>
>
> On 22/05/2023 at 22:00, Weston Pace wrote:
>
> Well, I suppose there are cases where you can map a file with memory
> mapped I/O and then, if you are careful not to touch those buffers, they
> might not be loaded into memory.  However, that is a very difficult thing
> to achieve.  For example, when reading a file we need to access the
> metadata that is scattered throughout the file.  This will trigger that
> data to be loaded.  The OS will then also typically load some amount of
> memory ahead of the data you requested. Also, it's very easy to trigger
> some kind of scan through the data (e.g. to count how many nulls there are)
> which might cause that data to be loaded.
>
> But I was inaccurate in my earlier s

Re: [Arrow IPC] memory mapping of compressed file / lazy reading

2023-05-22 Thread Weston Pace
Well, I suppose there are cases where you can map a file with memory mapped
I/O and then, if you are careful not to touch those buffers, they might not
be loaded into memory.  However, that is a very difficult thing to
achieve.  For example, when reading a file we need to access the metadata
that is scattered throughout the file.  This will trigger that data to be
loaded.  The OS will then also typically load some amount of memory ahead
of the data you requested. Also, it's very easy to trigger some kind of
scan through the data (e.g. to count how many nulls there are) which might
cause that data to be loaded.

But I was inaccurate in my earlier statement, it is possible to use memory
mapped I/O alone to achieve some kinds of lazy loading.  I suppose that is
why read_table gets faster in your benchmark (I missed that).  It will
still need to read some data (all of the metadata for example) from disk.
I guess I am a little surprised by 4.6s but we could dig into that.

Also, compression will force the data to be loaded because of the way
read_table works.

I think most current users achieve lazy loading by selectively loading the
data they need and not by loading the entire table with memory mapping and
avoiding access to the data they don't need.

On Mon, May 22, 2023 at 12:51 PM Weston Pace  wrote:

> I'm a little bit confused on the benchmark.  The benchmark is labeled
> "open file" and yet "read_table" will read the entire file into memory.  I
> don't think your other benchmarks are doing this (e.g. they are not reading
> data into memory).
>
> As for the questions on memory mapping, I have a few answers below, but I
> will give a general answer here.  Memory mapped I/O will, at best, save you
> from one memcpy of the data from kernel space to user space.  Memory
> mapping is not the same as a "lazy dataframe".  If you ask Arrow to read a
> file then it will always load that file off of the disk and into memory.
> This is true if you used memory mapped I/O or not.  If you ask it to load a
> single column, then it will not load the entire file, but instead load a
> single column.  There are many other libraries that add "lazy dataframe"
> capabilities on top of Arrow files.
>
> What is it that you would like to achieve with Arrow?
>
> > According to the benchmark, the fonction to_pandas is loading all the
> data into memory.
> Do you agree or did I miss something ?
>
> Yes.  to_pandas will load the entire file into memory.
>
> > When you open an Arrow IPC file with memory mapping and add a column,
> does it write the column on disk ?
>
> If you open any existing file with memory mapping it's generally assumed
> it will be read only.  In theory, you could memory map a larger space, and
> then write into it over time, but none of the core Arrow utilities are
> going to do anything like that.
>
> > When opening a compressed Arrow IPC file, what does memory mapping means
> ? What is the difference with opening the same file without memory mapping ?
>
> This means that you will be able to avoid a memcpy of the compressed bytes
> from kernel space to user space.
>
> On Sun, May 21, 2023 at 10:32 AM Frédéric MASSON <
> masson-frede...@hotmail.fr> wrote:
>
>> Hello everyone,
>>
>> For several years I have been working with HDF5 files to store/load
>> information and pandas as in-memory representation to analyze them.
>> Globally the data can be of variable sizes (from a few MB to 10GB). I use
>> the dataframes inside interactive tools (with a GUI, where the data access
>> is quite random) and non-interactive tools (scripts), everything is in
>> Python but the files could be opened in other languages. The typical use
>> case is to get only some columns of the file, doing some operations on them
>> and plot the result. Since the files are quite big, data compression is
>> quite important for me to save disk-space. However writing duration is not
>> very important.
>> Of course, for the big files I faced the same performances issues as a
>> lot of people :
>> 1. Access some columns with a row oriented file is quite inefficient
>> 2. loading 10GB of data into memory is long, generally not necessary and
>> can be larger than RAM capacity on some machines.
>>
>> In order to face this issues, I came to a simple conclusion :
>> 1. The memory should be column oriented
>> 2. The in-memory layout should be the same as the on-disk memory. I am
>> very interested in memory mapping since it allows me access files very
>> quickly (there is no loading time) and open file larger than memory.
>>
>> The solution I implemented is quite simple
>> 1. I compress the data inside a HDF5 dataset with vertica

Re: [Arrow IPC] memory mapping of compressed file / lazy reading

2023-05-22 Thread Weston Pace
I'm a little bit confused on the benchmark.  The benchmark is labeled "open
file" and yet "read_table" will read the entire file into memory.  I don't
think your other benchmarks are doing this (e.g. they are not reading data
into memory).

As for the questions on memory mapping, I have a few answers below, but I
will give a general answer here.  Memory mapped I/O will, at best, save you
from one memcpy of the data from kernel space to user space.  Memory
mapping is not the same as a "lazy dataframe".  If you ask Arrow to read a
file then it will always load that file off of the disk and into memory.
This is true if you used memory mapped I/O or not.  If you ask it to load a
single column, then it will not load the entire file, but instead load a
single column.  There are many other libraries that add "lazy dataframe"
capabilities on top of Arrow files.

What is it that you would like to achieve with Arrow?

> According to the benchmark, the fonction to_pandas is loading all the
data into memory.
Do you agree or did I miss something ?

Yes.  to_pandas will load the entire file into memory.

> When you open an Arrow IPC file with memory mapping and add a column,
does it write the column on disk ?

If you open any existing file with memory mapping it's generally assumed it
will be read only.  In theory, you could memory map a larger space, and
then write into it over time, but none of the core Arrow utilities are
going to do anything like that.

> When opening a compressed Arrow IPC file, what does memory mapping means
? What is the difference with opening the same file without memory mapping ?

This means that you will be able to avoid a memcpy of the compressed bytes
from kernel space to user space.

On Sun, May 21, 2023 at 10:32 AM Frédéric MASSON 
wrote:

> Hello everyone,
>
> For several years I have been working with HDF5 files to store/load
> information and pandas as in-memory representation to analyze them.
> Globally the data can be of variable sizes (from a few MB to 10GB). I use
> the dataframes inside interactive tools (with a GUI, where the data access
> is quite random) and non-interactive tools (scripts), everything is in
> Python but the files could be opened in other languages. The typical use
> case is to get only some columns of the file, doing some operations on them
> and plot the result. Since the files are quite big, data compression is
> quite important for me to save disk-space. However writing duration is not
> very important.
> Of course, for the big files I faced the same performances issues as a lot
> of people :
> 1. Access some columns with a row oriented file is quite inefficient
> 2. loading 10GB of data into memory is long, generally not necessary and
> can be larger than RAM capacity on some machines.
>
> In order to face this issues, I came to a simple conclusion :
> 1. The memory should be column oriented
> 2. The in-memory layout should be the same as the on-disk memory. I am
> very interested in memory mapping since it allows me access files very
> quickly (there is no loading time) and open file larger than memory.
>
> The solution I implemented is quite simple
> 1. I compress the data inside a HDF5 dataset with vertical chunks (nrows x
> 1) with the Blosc compressor (not Blosc2). HDF5 is a great container for
> data, that allow to chunk data with the shape the user want. Vertical chunk
> allows to decompress each column individually without decompressing the
> others. Inside the file, the columns names are stored inside the
> user-defined metadata of the dataset.
> 2. With h5py I just open the HDF5 file and manipulate the h5py dataset
> object without reading its content. This way, I am doing a "memory-map" of
> a compressed file (or a "lazy" access I guess). When I access to a column,
> then the h5py actually reads and decompress the data on-the-fly but is
> totally transparent for me. This is not a zero-copy mechanism but I can
> access the data copying only the interesting data.
>
> The main goal with this "solution" is to reduce the time when a user opens
> a file and to reduce a lot the RAM usage.
>
> In order to access the columns with their names I made a small python
> library with a class that redefines the access operators. It is not a very
> handy library and right now I am considering transforming this class into a
> Pandas ExtensionArray. I am not sure but I think it would allow me to use
> the pandas dataframe class on a h5py dataset instead of a numpy array.
>
> I am also considering using Apache Arrow instead. That is why, recently I
> have been busy reading the Arrow documentation, the format specification
> and some blog articles. I must say that this library seems wonderful, I
> particularly love the fact that it tackle the problem of copying data and
> it is available in several languages. The zero-copy policy is exactly what
> I am looking for ! I also like the general format allowing to have columns
> of different types, nested columns and 

Re: [C++][Parquet] Best practice to write duplicated strings / enums into parquet

2023-05-22 Thread Weston Pace
Arrow can also represent dictionary encoding.  If you like StringBuilder
then there is also a StringDictionaryBuilder which should be more or less
compatible:

TEST(TestStringDictionaryBuilder, Basic) {
  // Build the dictionary Array
  StringDictionaryBuilder builder;
  ASSERT_OK(builder.Append("RED"));
  ASSERT_OK(builder.Append("GREEN"));
  ASSERT_OK(builder.Append("RED"));

  std::shared_ptr result;
  ASSERT_OK(builder.Finish());

  // Build expected data
  auto ex_dict = ArrayFromJSON(utf8(), "[\"RED\", \"GREEN\"]");
  auto dtype = dictionary(int8(), utf8());
  auto int_array = ArrayFromJSON(int8(), "[0, 1, 0]");
  DictionaryArray expected(dtype, int_array, ex_dict);

  ASSERT_TRUE(expected.Equals(result));
}

If your encoding is standard (e.g. you must always represent "RED" with 1
and "GREEN" with 0) then you can use InsertMemoValues to establish your
encoding first:

TEST(TestStringDictionaryBuilder, Basic) {
  auto values = ArrayFromJSON(utf8(), R"(["GREEN", "RED"])");

  // Build the dictionary Array
  StringDictionaryBuilder builder;
  ASSERT_OK(builder.InsertMemoValues(*values));
  ASSERT_OK(builder.Append("RED"));
  ASSERT_OK(builder.Append("GREEN"));
  ASSERT_OK(builder.Append("RED"));

  std::shared_ptr result;
  ASSERT_OK(builder.Finish());

  // Build expected data
  auto ex_dict = ArrayFromJSON(utf8(), "[\"GREEN\", \"RED\"]");
  auto dtype = dictionary(int8(), utf8());
  auto int_array = ArrayFromJSON(int8(), "[1, 0, 1]");
  DictionaryArray expected(dtype, int_array, ex_dict);

  ASSERT_TRUE(expected.Equals(result));
}

On Mon, May 22, 2023 at 8:53 AM Haocheng Liu  wrote:

> Hi,
>
> I have a use case which can be simplified as there are {0-> "RED",
> 1->"GREEN":1, 2->"BLUE", etc} and I need to write them hundreds of millions
> of times. In each row,  there may be tens of int -> string maps. When user
> read the data, they want to see "RED", "GREED" and "BLUE" rather than some
> unclear int.
>
> According to the doc
> ,
> dictionary encoding is enabled by default so there are two possible
> solutions:
>
> 1. Write strings via a stringBuilder and let Arrow do the encoding under
> the hood.
> 2. Write enums(int) and provide the encoding in metadata(?).
>
> Option 2 sounds preferred to me as it does not require expensive string
> comparison and possible string copy. Can folks please guide on  if my
> understanding is correct. If so, how to provide the int->string mapping in
> metadata? If not, what's the best practice here?
>
> Thanks in advance.
>
> Regards,
> Haocheng Liu
>
>
>


Re: Efficient storage of metadata

2023-05-19 Thread Weston Pace
Spencer's answer is good.  I'll just add one minor clarification.  A
MapArray is what you want, but it is a List> and not
just Struct.  This allows you to have many rows of metadata
key/pairs for each row of your structured data.

On Thu, May 18, 2023 at 5:41 PM Spencer Nelson  wrote:

> You could use a struct with two fields: a string key, and a string value
> (or union-typed value if you want multiple legal types).
>
> That’s what the pyarrow MapArray type is, under the hood; if you’re in
> Python then using map would be simple and convenient.
>
> On Thu, May 18, 2023 at 5:07 PM Robert McLeod 
> wrote:
>
>> What is considered best practice for storage of fairly large quantities
>> of meta-data within Arrow? For the sake of simplicity I'll define metadata
>> as key-value pairs, in this case stored in a 2-column SQL table, but it
>> equally could be
>> ZjQcmQRYFpfptBannerStart
>> This Message Is From an Untrusted Sender
>> You have not previously corresponded with this sender.
>> See https://itconnect.uw.edu/email-tags for additional information.
>> Please contact the UW-IT Service Center, h...@uw.edu 206.221.5000, for
>> assistance.
>>
>> ZjQcmQRYFpfptBannerEnd
>> What is considered best practice for storage of fairly large quantities
>> of meta-data within Arrow? For the sake of simplicity I'll define metadata
>> as key-value pairs, in this case stored in a 2-column SQL table, but it
>> equally could be a Python dict or similar.
>>
>> Arrow has a dictionary class but it doesn't appear to permit the type of
>> the value to be mutable. Since it seems to be some sort of std map I
>> discounted it as a solution, since I have int, float, and string values.
>>
>> My current thought would be to serialize the data to JSON and write it
>> into a `ListArray` with bytes type, with one element in the ListArray for
>> every JSON blob I have. It feels a bit kludge but it does work. Is this a
>> reasonable approach, or is there a better solution?
>>
>> Robert
>>
>>
>> --
>> Robert McLeod
>> robbmcl...@gmail.com
>> robert.mcl...@hitachi-hightech.com
>>
>>


Re: [C++][Parquet] Extend RcordBatchReader to write parquet from streaming data

2023-05-19 Thread Weston Pace
> Can anyone guide what's the best practice here and if my
below understandings are correct:

Here's an example:
https://gist.github.com/westonpace/6f7fdbdc0399501418101851d75091c4

> I receive streaming data via a callback function which gives me data row
by row. To my best knowledge, Subclassing RecordBatchReader is preferred?

RecordBatchReader is pull-based.  A friendly push-based source node for
Acero would be a good idea, but doesn't exist.  You can make one using a
PushGenerator but that might be complicated.  In the meantime, subclassing
RecordBatchReader is probably simplest.

> Should I batch a fixed number rows in some in memory data structure
first, then flush them to acero?

Yes, for performance.

> Then how could acero know it's time to push data in ReadNext

 function?

ReadNext is pull based.  Acero will continually call ReadNext.  You should
block until sufficient data is available.

> I have a question on how to use the Acero push model to write streaming
data as hive partitioning Parquet in a single thread program

The example I gave will use one thread in addition to the main thread.
Doing something without any additional threads at all would be possible,
but would probably require knowing more about the structure of your program.

On Thu, May 18, 2023 at 3:27 PM Haocheng Liu  wrote:

> Hi,
>
> I have a question on how to use the Acero push model to write streaming
> data as hive partitioning Parquet in a single thread program. Can anyone
> guide what's the best practice here and if my below understandings are
> correct:
>
>- I receive streaming data via a callback function which gives me data
>row by row. To my best knowledge, Subclassing RecordBatchReader is
>preferred?
>- Should I batch a fixed number rows in some in memory data structure
>first, then flush them to acero? Then how could acero know it's time to
>push data in ReadNext
>
> 
> function?
>
> I'm not clear on how to connect a call back function from streaming data
> with Aecro push model. Any suggestions will be appreciated.
>
>
> Thanks.
>
> Best,
> Haocheng
>


Re: pyArrow calling into user C++ code

2023-05-17 Thread Weston Pace
The page that Aldrin linked is possible but it requires that you use the
same toolchain and version as pyarrow.  I would probably advise using the C
data API first.  By using the C data API you don't have to couple yourself
so tightly with the pyarrow build.  For example, your C++ extension can pin
itself to Arrow version 5 and people using pyarrow 11 will still be able to
use your extension without problems.

Since this question comes up fairly often I decided to create a quick
minimal example of what this might look like.  The example creates a C++
python module using pybind11.  The C++ code relies on Arrow-C++ and
interoperates with pyarrow.  You would not need to use Arrow-C++ and could
use nanoarrow or you can copy the C data API headers directly into your
project.  The example can be found at [1].

[1]: https://github.com/westonpace/arrow-cdata-example

On Tue, May 16, 2023 at 9:07 AM Aldrin  wrote:

> You can definitely use C++! I will see if I can find an example, but in
> the meantime there's also this page in the docs [1].
>
> [1]: https://arrow.apache.org/docs/python/integration/extending.html
>
> Sent from Proton Mail for iOS
>
>
> On Tue, May 16, 2023 at 06:32, Hinko Kocevar  > wrote:
>
> Hi,
>
> I'm trying to understand if it is possible to have a C/C++ code (homebrew
> code) integrated into arrow such that a user of pyArrow would be able to
> utilize the homebrew functions (from python script).
>
> The idea is to pass an arrow array/table (or numpy array?) to the external
> code, let it work on the input(s) to produce an arrow output array and
> return it to the user. Again, the choice of programming language for user
> is Python. I've noticed c data interface and c stream interface as well as
> user compute functions in the docs. It is not clear to me if any of those
> support my use case and further more how do I get to utilize that in Python
> once implemented in C++.
>
> For example, something like https://numpy.org/doc/stable/user/c-info.html
> is what I would be after.
>
> Can this be done in (py)arrow, or should I just do it in numpy ?
>
> Thank you,
> Hinko
>
>


Re: [C++] arrow compute and tbb

2023-04-22 Thread Weston Pace
I'm not entirely sure what your use case is.  If you're running compute
functions on giant arrays then you will not get any parallelism.  Compute
functions do not, themselves, exploit parallelism.  Instead we normally
achieve parallelism in Acero by running batches through the compute
functions in parallel.  So, if you have a very large table, you would get
some parallelism by creating a plan with a table_source and a project
node.  The table source will peel off batches of size 1Mi (by default) and
run your expressions in parallel on those batches.  You could also do this
parallel slicing and function execution yourself without Acero.

However, you mention `sort_indices`, neither of which is a scalar function
(and thus can't be used in the project node), and not something that can
easily be computed with a divide-and-conquer fork join.  There is an
order_by node in Acero but it isn't terribly optimized.  It simply collects
all batches into one large chunked array and runs the sort_indices compute
function.  Parallel sorting is very much possible so there is definitely
room for improvement here.

 * We could make the sort_indices kernel itself take advantage of
parallelism.  This would be a little unique (kernel functions don't
traditionally take advantage of parallelism) but now that we are on C++17
this could be as simple as using STL's parallel stable_sort.
 * Another approach to modifying the sort_indices kernel could be to modify
the ChunkedArraySorter.  When sorting chunked arrays we sort each chunk
individually and then merge them.  The sorting of individual chunks is an
easily parallelizable thing but we do this serially today.
 * Or we could leave the sort_indices kernel alone and modify the order_by
node to sort in parallel and then merge instead of relying on a chunked
array compute kernel.

You also mention `take`.  This is another non-scalar function.  However, if
you have a large list of indices, you can probably implement a fork-join on
top of this yourself.  Then again, if I recall correctly, you are applying
many filters.  So it may be possible that you are only using `take`
indirectly.

On Fri, Apr 21, 2023 at 7:18 PM Surya Kiran Gullapalli <
suryakiran.gullapa...@gmail.com> wrote:

> Hi,
> Thanks for the reply and the suggestion on custom executor. I'll take a
> look at it.
>
> I was profiling my application (windows 10) which is dealing with millions
> of rows (roughly 10M) of data and I found some places where only a few cpus
> were engaged and the rest of them were sitting idle. On investigating
> further, I found out the time spent was in arrow compute calls (take,
> sort_indices etc). In fact out of 12 cores, only one core was doing work.
> The CPU thread count was showing 24 (12 * 2). So I'm trying to find out if
> there's any way I can improve the performance as my application is already
> using TBB for some tasks.
>
> Any pointers in this direction would be greatly appreciated.
>
> Thanks,
> Surya
>
> On Sat, Apr 22, 2023 at 2:27 AM Weston Pace  wrote:
>
>> No, there's no build-time configuration settings to enable TBB
>> specifically.
>>
>> You can, at runtime, specify a custom executor to use for most
>> operations.  We use one thread pool for CPU tasks and one for I/O tasks.
>> You could replace either or both with a TBB-based executor.
>>
>> For example, the method for creating a CSV file is defined as:
>>
>> ```
>> static Future> MakeAsync(
>>   io::IOContext io_context, std::shared_ptr input,
>>   arrow::internal::Executor* cpu_executor, const ReadOptions&, const
>> ParseOptions&,
>>   const ConvertOptions&);
>> ```
>>
>> The `cpu_executor` property specifies which thread pool to use for CPU
>> tasks.  The I/O executor is a part of the `io_context`.
>>
>> The executor interface is pretty straightforward.  Hiding the utility
>> functions it is...
>>
>> ```
>> class ARROW_EXPORT Executor {
>> public:
>>   virtual int GetCapacity() = 0;
>> protected:
>>   virtual Status SpawnReal(TaskHints hints, FnOnce task,
>> StopToken, StopCallback&&) = 0;
>> };
>> ```
>>
>> It shouldn't be too much work to create a custom implementation based on
>> TBB.  Out of curiosity, what is the motivation for using TBB?
>>
>> -Weston
>>
>> On Fri, Apr 21, 2023 at 11:04 AM Surya Kiran Gullapalli <
>> suryakiran.gullapa...@gmail.com> wrote:
>>
>>> Hello,
>>> I'm curious to know if c++ sdk of arrow compute functions can use tbb
>>> parallelization underneath ?
>>> The documentation mentions that arrow uses a threadpool for
>>> parallelization. Does compute functions also use threadpool and parallelize
>>> computation ?
>>>
>>> Looking at the .so file created I do not see tbb library as a dependency
>>> for arrow library.
>>>
>>> Is there a configuration variable during build which can activate this ?
>>>
>>> Thanks,
>>> Surya
>>>
>>


Re: [C++] arrow compute and tbb

2023-04-21 Thread Weston Pace
No, there's no build-time configuration settings to enable TBB specifically.

You can, at runtime, specify a custom executor to use for most operations.
We use one thread pool for CPU tasks and one for I/O tasks.  You could
replace either or both with a TBB-based executor.

For example, the method for creating a CSV file is defined as:

```
static Future> MakeAsync(
  io::IOContext io_context, std::shared_ptr input,
  arrow::internal::Executor* cpu_executor, const ReadOptions&, const
ParseOptions&,
  const ConvertOptions&);
```

The `cpu_executor` property specifies which thread pool to use for CPU
tasks.  The I/O executor is a part of the `io_context`.

The executor interface is pretty straightforward.  Hiding the utility
functions it is...

```
class ARROW_EXPORT Executor {
public:
  virtual int GetCapacity() = 0;
protected:
  virtual Status SpawnReal(TaskHints hints, FnOnce task, StopToken,
StopCallback&&) = 0;
};
```

It shouldn't be too much work to create a custom implementation based on
TBB.  Out of curiosity, what is the motivation for using TBB?

-Weston

On Fri, Apr 21, 2023 at 11:04 AM Surya Kiran Gullapalli <
suryakiran.gullapa...@gmail.com> wrote:

> Hello,
> I'm curious to know if c++ sdk of arrow compute functions can use tbb
> parallelization underneath ?
> The documentation mentions that arrow uses a threadpool for
> parallelization. Does compute functions also use threadpool and parallelize
> computation ?
>
> Looking at the .so file created I do not see tbb library as a dependency
> for arrow library.
>
> Is there a configuration variable during build which can activate this ?
>
> Thanks,
> Surya
>


Re: [C++] Filter string array vs dictionary encoded array

2023-04-13 Thread Weston Pace
It's unlikely there will be much benefit in using dictionary encoding.  How
are you doing the filtering?  It sounds like you might have many filters.
`arrow::compute::Filter` will not get great performance if you want to
apply many filters.  No one has added support for selection vectors but
that would probably be the fastest way to apply many filters against the
same array.  Ideally you could avoid any kind of allocation between each
filter pass.  Although, if the filters were highly selective you might want
to use a list of indices instead of a selection vector.  However, this has
also not been implemented.

On Wed, Apr 12, 2023 at 7:57 PM Surya Kiran Gullapalli <
suryakiran.gullapa...@gmail.com> wrote:

> Hi,
> I’m trying to run filter based on multiple columns (decided at run time)
> in a table. There are more than a million  of filters I have to run and
> even though I’m getting results it was taking humongous amount of time to
> complete the operation. I’m using Acero filter node to get the filtered
> table
>
> I’d like to know if there’ll be a performance improvement if I use
> dictionary encoded arrays instead of string arrays?
>
> Also, any other pointers to speed up the operation
>
> Thanks,
> Surya
>
>
>


[C++] Acero refactoring

2023-04-01 Thread Weston Pace
Over a series of PRs (with [1] being the overall PR with most of the
discussion) we have done some internal refactoring of the streaming
execution engine (Acero) to put it into its own namespace and library.
This was a significant move of files and there are two breaking changes
(the affected code was labeled experimental but we have tried to avoid
breaking changes here so far) that I wanted to cover in more detail.  This
is also why I've included the user@ mailing list as I know there are some
users that are using the ExecPlan today.

The overall gist of the change is covered in this diagram [2] and there is
now an additional libarrow_acero shared library.

The breaking changes are:

 * A number of files that used to reside in src/arrow/compute/exec are now
located in src/arrow/acero
 * Some of the symbols in the arrow::compute namespace (those defined in
the above files) have now moved to the arrow::acero namespace

We have no doubt broken something in this change.  I will be focusing on
nightly test failures over the next few weeks as we approach the release.
Please feel free to ping me (@westonpace) on any issues that seem to be
caused by this.

Lastly, I want to thank ildipo and icexelloss for doing the bulk of this
work and many others (Will, Joris, Kou, Jacob, and more I probably missed)
for their review and suggestions.

[1] https://github.com/apache/arrow/issues/15280
[2]
https://docs.google.com/document/d/1oCGl3_QJUcNAk1qnjlBvoZsWKbrTsCT87QozqbCK6sQ/edit?usp=sharing


Re: s3 partitioned data - delayed partition scan

2023-03-29 Thread Weston Pace
I suspect this is a combination of [1] and [2]/[3].

We do not currently allow you to specify a filter during discovery.  We
could, and that should allow us to reduce the amount of reading we need to
do.

Also, when no filter is supplied, we can be more efficient with our usage
of S3.

[1] https://github.com/apache/arrow/issues/31174
[2] https://github.com/apache/arrow/issues/34213
[3] https://github.com/apache/arrow/issues/25019

On Wed, Mar 29, 2023 at 1:17 AM Oxlade, Dan 
wrote:

> Hi all,
>
>
>
> I’m fairly new to arrow.
>
>
>
> I’m trying to create an Arrow Flight service that reads data from and s3
> bucket. On the face of it that appears to be quite simple. Unfortunately I
> have a very large bucket with 1000’s of files across partitions.
>
>
>
> I’m trying the following in python:
>
>
>
> dataset = ds.dataset(
>
>f“{bucket}/{partition_root}/”,
>
>filesystem=s3fs,
>
>partitioning=my_partitioning_def,
>
> )
>
> batches = dataset.to_batches(
>
>filter=my_filter_which_would_select_a_tiny_subset_of_files
>
> )
>
>
>
> From my testing it seems as though the s3 bucket is scanned at the first
> step, this is extremely inefficient in my use-case. Is there a way to delay
> the scan until the filter is applied? This could reduce the scan of many
> 1000’s of objects to a single object in s3.
>
>
>
> Hopefully that make sense.
>
>
>
> Thanks
>
> Dan
>
>
> T. Rowe Price (including T. Rowe Price Group, Inc. and its affiliates) and
> its associates do not provide legal or tax advice.  Any tax-related
> discussion contained in this e-mail, including any attachments, is not
> intended or written to be used, and cannot be used, for the purpose of (i)
> avoiding any tax penalties or (ii) promoting, marketing, or recommending to
> any other party any transaction or matter addressed herein.  Please consult
> your independent legal counsel and/or professional tax advisor regarding
> any legal or tax issues raised in this e-mail.
>
> The contents of this e-mail and any attachments are intended solely for
> the use of the named addressee(s) and may contain confidential and/or
> privileged information. Any unauthorized use, copying, disclosure, or
> distribution of the contents of this e-mail is strictly prohibited by the
> sender and may be unlawful. If you are not the intended recipient, please
> notify the sender immediately and delete this e-mail.
>


Re: [C++] How can I configure s3 filestystem to use my endpoint?

2023-03-24 Thread Weston Pace
No.  There is no environment variable for setting the endpoint URL.

In addition to the FileSystemFromUri method you can also construct the
filesystem directly:

```
arrow::fs::S3Options options = arrow::fs::S3Options::Defaults();
options.endpoint_override = "localhost:9000";
Result filesystem =
arrow::fs::S3FileSystem::Make(options);
```

On Wed, Mar 22, 2023 at 9:48 PM Алексей Рябов  wrote:

> Hello, I want to understand the proper way of configuring S3FileSystem for
> it to be able reading from my local MinioServer. Question is about
> specifying endpoint_url.
>
> I can set access keys through environment
> variables AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, but the only way to
> specify endpoint url is to set it in bucket location uri through endpoint:
>
> s3://mvp/foo/bar/?endpoint_override=localhost:9000=http
>
> and then using arrow::fs::FileSystemFromUri method.
>
> Is there any other way of specifying endpoint url similar to access keys?
> Please advise.
>


Re: [Python] S3 and user-defined object metadata

2023-03-20 Thread Weston Pace
Thow following headers are supported:

ACL*
Cache-Control
Content-Type
Content-Language
Expires

*ACL must be one of the "canned ACL" strings[1]

> In the docs of open_append_stream() it reads 'Unsupported metadata keys
will be ignored.'
> What  does 'unsupported' mean here - unsupported by PyArrow or
unsupported by the S3 implementation?

The primary point here was "not all filesystems support all metadata".  It
could be either unsupported by S3 or unsupported by pyarrow.  It would
depend on the specific value.

> It would be nice to add such user-defined object metadata using PyArrow.

This seems like a reasonable request.  Feel free to open an issue on [2]
although I think this ask may be covered by [3] which seems to be a similar
issue.  Are you interested in creating a PR?

[1]
https://docs.aws.amazon.com/AmazonS3/latest/userguide/acl-overview.html#canned-acl
[2] https://github.com/apache/arrow
[3] https://github.com/apache/arrow/issues/32083

On Fri, Mar 17, 2023 at 10:41 AM elveshoern32 via user <
user@arrow.apache.org> wrote:

> Hi,
>
> according to the docs
> https://arrow.apache.org/docs/python/generated/pyarrow.fs.S3FileSystem.html 
> it's
> possible to define metadata for files in S3 object storage using the
> 'default_metadata' argument of pyarrow.fs.S3FileSystem().
> Experimentally this works only for certain keys:
> Some standard keys like 'Content-Type', 'Content-Language' or 'Expires' do
> work, but others like 'ACL', 'foo' or 'x-amz-meta-thud' don't.
> It seems that Arrow filters for only a few keys and throws away any
> further.
>
> In the docs of open_append_stream() it reads 'Unsupported metadata keys
> will be ignored.'
> What  does 'unsupported' mean here - unsupported by PyArrow or unsupported
> by the S3 implementation?
> How can Arrow know which keys are supported by the implementation (and,
> yes, there are more implementations available beyond the original AWS S3)?
>
> According to the AWS docs
> https://docs.aws.amazon.com/AmazonS3/latest/userguide/UsingMetadata.html it
> should be possible to use 'User-defined object metadata' with keys that
> start with 'x-amz-meta-'.
> It would be nice to add such user-defined object metadata using PyArrow.
> (Yes, using other tools user-defined object metadata work like charm for
> me, so it's not a limitation in S3.)
>
> Am I missing anything?
>
> Regards,
> elveshoern32
>
>
>


Re: [C++][C Data][Table]ChunkedArray and Table

2023-03-09 Thread Weston Pace
> I think you're not releasing memory correctly; namely, you're missing
calls to `release`

If you call `arrow::ImportRecordBatch` it will take ownership of calling
release on the ArrowArray and so you should not call release yourself.
There is a variant of this method that receives ArrowSchema instead of
std::shared_ptr and that variant will also take care of calling
release on the schema.  So the only time that release() needs to be called
is in the else branch.

The ArrowSchema is getting unconditionally passed to `arrow::ImportSchema`
which will take care of calling release().  So you should not call release
on the ArrowSchema.

> I have to have it like this:
> Otherwise there will be a seg fault in c++ code. It’s maybe a requirement
for DuckDB.

>From DuckDb's perspective it shouldn't matter (and it should be impossible
for DuckDb to know) how you are allocating the ArrowSchema / ArrowArray.
So this is very odd.

Is this code something runnable that you can share?
Can you run the code in gdb and show the stack trace of the segmentation
fault?

On Wed, Mar 8, 2023 at 10:57 PM Kimmo Linna  wrote:

> Hi,
>
> I have to have it like this:
> ArrowSchema *arrow_schema = new ArrowSchema;
> ArrowArray *arrow_array = new ArrowArray;
>
> Otherwise there will be a seg fault in c++ code. It’s maybe a requirement
> for DuckDB.
>
> Best regards,
>
> Kimmo
> --
> Kimmo Linna
> Nihtisalontie 3 as 1
> 02630  ESPOO
> kimmo.li...@gmail.com
> +358 40 590 1074
>
>
>
> On 8. Mar 2023, at 18.10, Weston Pace  wrote:
>
> I agree with Aldrin.  `arrow_schema` has type ArrowSchema*. You are
> passing in `_schema` which has type `ArrowSchema**` but you are
> casting it to `duckdb_arrow_schema*` so it at least compiles.  You can also
> simplify things by getting rid of some of this allocation.
>
> ```
> ArrowSchema arrow_schema;
> duckdb_query_arrow_schema(arrow_result, (duckdb_arrow_schema
> *)_schema);
> auto schema = arrow::ImportSchema(_schema);
> auto output_stream = arrow::io::BufferOutputStream::Create();
> auto batch_writer = arrow::ipc::MakeStreamWriter(*output_stream,
> *schema);
> while (true)
> {
>   ArrowArray arrow_array;
>   duckdb_query_arrow_array(arrow_result, (duckdb_arrow_array
> *)_array);
>   if (arrow_array.length > 0)
>   {
> (*batch_writer)->WriteRecordBatch((const arrow::RecordBatch
> &)**arrow::ImportRecordBatch(_array, *schema));
>   }
>   else
>   {
> arrow_array.release(_array);
> break;
>   }
> }
> auto buffer = (*output_stream)->Finish();
> ```
>
>
> On Tue, Mar 7, 2023 at 11:26 AM Aldrin  wrote:
>
>> I might be misreading, but do you need to get the address of your object
>> pointers, `*arrow_schema` and `*arrow_array`? If they're already pointers,
>> I'm not sure you need to get their address and cast that to a pointer.
>> otherwise, I don't see anything else that stands out as problematic to me.
>>
>> You could try to check the Result objects so that you get better error
>> information before the segfault occurs; as it stands now you're blindly
>> dereferencing. The other thing that might be helpful is that I think you
>> don't have to call `new` every time. In all of these instances you are
>> allocating memory for the objects, but you could just create them directly
>> and then pass references to them. For example:
>>
>>ArrowSchema arrow_schema;
>>duckdb_query_arrow_schema(arrow_result, _schema);
>>...
>>
>> Since ArrowSchema seems to be default constructible, I believe the
>> declaration alone will do the default construction (`ArrowSchema
>> arrow_schema`). But, my cpp knowledge comes and goes, so apologies if those
>> suggestions aren't accurate.
>>
>> Aldrin Montana
>> Computer Science PhD Student
>> UC Santa Cruz
>>
>>
>> On Sun, Mar 5, 2023 at 8:56 AM Kimmo Linna  wrote:
>>
>>> Hi,
>>>
>>> I noticed that my previous code worked only for one ArrowArray from
>>> DuckDB. I change a code this but this is still working only for one
>>> ArrowArray. The second “loop” causes a segmentation fault. Could someone
>>> guide me a bit? What should I change to get this work?
>>>
>>>
>>> ArrowSchema *arrow_schema = new ArrowSchema();
>>> duckdb_query_arrow_schema(arrow_result, (duckdb_arrow_schema *)&
>>> arrow_schema);
>>> auto schema = arrow::ImportSchema(arrow_schema);
>>> auto output_stream = arrow::io::BufferOutputStream::Create();
>>> auto batch_writer = arrow::ipc::MakeStreamWriter(*output_stream, *schema

Re: [C++][C Data][Table]ChunkedArray and Table

2023-03-08 Thread Weston Pace
I agree with Aldrin.  `arrow_schema` has type ArrowSchema*. You are passing
in `_schema` which has type `ArrowSchema**` but you are casting it to
`duckdb_arrow_schema*` so it at least compiles.  You can also simplify
things by getting rid of some of this allocation.

```
ArrowSchema arrow_schema;
duckdb_query_arrow_schema(arrow_result, (duckdb_arrow_schema
*)_schema);
auto schema = arrow::ImportSchema(_schema);
auto output_stream = arrow::io::BufferOutputStream::Create();
auto batch_writer = arrow::ipc::MakeStreamWriter(*output_stream,
*schema);
while (true)
{
  ArrowArray arrow_array;
  duckdb_query_arrow_array(arrow_result, (duckdb_arrow_array
*)_array);
  if (arrow_array.length > 0)
  {
(*batch_writer)->WriteRecordBatch((const arrow::RecordBatch
&)**arrow::ImportRecordBatch(_array, *schema));
  }
  else
  {
arrow_array.release(_array);
break;
  }
}
auto buffer = (*output_stream)->Finish();
```


On Tue, Mar 7, 2023 at 11:26 AM Aldrin  wrote:

> I might be misreading, but do you need to get the address of your object
> pointers, `*arrow_schema` and `*arrow_array`? If they're already pointers,
> I'm not sure you need to get their address and cast that to a pointer.
> otherwise, I don't see anything else that stands out as problematic to me.
>
> You could try to check the Result objects so that you get better error
> information before the segfault occurs; as it stands now you're blindly
> dereferencing. The other thing that might be helpful is that I think you
> don't have to call `new` every time. In all of these instances you are
> allocating memory for the objects, but you could just create them directly
> and then pass references to them. For example:
>
>ArrowSchema arrow_schema;
>duckdb_query_arrow_schema(arrow_result, _schema);
>...
>
> Since ArrowSchema seems to be default constructible, I believe the
> declaration alone will do the default construction (`ArrowSchema
> arrow_schema`). But, my cpp knowledge comes and goes, so apologies if those
> suggestions aren't accurate.
>
> Aldrin Montana
> Computer Science PhD Student
> UC Santa Cruz
>
>
> On Sun, Mar 5, 2023 at 8:56 AM Kimmo Linna  wrote:
>
>> Hi,
>>
>> I noticed that my previous code worked only for one ArrowArray from
>> DuckDB. I change a code this but this is still working only for one
>> ArrowArray. The second “loop” causes a segmentation fault. Could someone
>> guide me a bit? What should I change to get this work?
>>
>>
>> ArrowSchema *arrow_schema = new ArrowSchema();
>> duckdb_query_arrow_schema(arrow_result, (duckdb_arrow_schema *)&
>> arrow_schema);
>> auto schema = arrow::ImportSchema(arrow_schema);
>> auto output_stream = arrow::io::BufferOutputStream::Create();
>> auto batch_writer = arrow::ipc::MakeStreamWriter(*output_stream, *schema
>> );
>> while (true) {
>> ArrowArray *arrow_array = new ArrowArray();
>> duckdb_query_arrow_array(arrow_result, (duckdb_arrow_array *)_array
>> );
>> if( arrow_array->length > 0) {
>> (*batch_writer)->WriteRecordBatch((const arrow::RecordBatch&) **arrow::
>> ImportRecordBatch(arrow_array,*schema));
>> delete arrow_array;
>> }else{
>> break;
>> }
>> }
>> auto buffer = (*output_stream)->Finish();
>> Best regards,
>>
>> Kimmo
>>
>> --
>> Kimmo Linna
>> Nihtisalontie 3 as 1
>> 02630  ESPOO
>> kimmo.li...@gmail.com
>> +358 40 590 1074
>>
>>
>>
>>


Re: [C++][C Data][JavaScript]RecordBatch buffer to JavaScript in Bun

2023-03-02 Thread Weston Pace
> I think that the main thing is that is It possible use RecordBatchReader
in Javascript without IPC stream.

It is possible but it won't be easy.

Looking at your code I assume arrow_schema is an instance of ArrowSchema in
the C data interface and arrow_array is an instance of ArrowArray in the C
data interface. These are defined in the C data interface[1] which is meant
to be a stable C ABI.  Everything in these structures is either a number, a
pointer (pointers to data, pointers to functions, and pointers to structs),
or a null terminated string.  So these structures should be able to marshal
across bun:ffi.  You would want to turn the numbers into JS numbers, the
null terminated strings into JS strings, the function pointers into JS
functions[2], and the data pointers into ArrayBuffer[3].

Once you've done this you should be able to assemble these various pieces
into an ArrowJS Schema[4] or an ArrowJS RecordBatch[5].

At the end of the day I'd expect most of the work to be JS work and not
much C++ work.  However, it would require a pretty good familiarity with
ArrowJS to know the proper way to assemble these different pieces (e.g. the
ArrayBuffer data buffers would need to be wrapped into things like
Int8Array or Int32Array based on the type of the array).

[1]
https://arrow.apache.org/docs/format/CDataInterface.html#structure-definitions
[2] https://bun.sh/docs/api/ffi#function-pointers
[3]
https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/ArrayBuffer
[4] https://github.com/apache/arrow/blob/main/js/src/schema.ts
[5] https://github.com/apache/arrow/blob/main/js/src/recordbatch.ts

On Thu, Mar 2, 2023 at 11:19 AM Kimmo Linna  wrote:

> Hi Weston,
>
> I’m willing but hardly capable. I’m just a copy cat with C/C++. I think
> that the main thing is that is It possible use RecordBatchReader in
> Javascript without IPC stream. I haven’t find a way but that doesn’t tell
> much. Bun is capable to read directly from buffer if the buffer is null
> terminated or the size is know. I tried to use TotalBufferSize with
> RecordBatch from ImportRecordBatch but It didn’t work.
>
> K.
> --
> Kimmo Linna
> Nihtisalontie 3 as 1
> 02630  ESPOO
> kimmo.li...@gmail.com
> +358 40 590 1074
>
>
>
> On 2. Mar 2023, at 21.05, Weston Pace  wrote:
>
> I believe you would need a Javascript version of the C data interface.
> This should be doable with bun:ffi but I'm not aware of anyone that has
> done this before.  I also wonder if there is a way to create a C data
> interface based on TypedArray that would be usable in both bun and node.
> I'm also not really up to speed on what arrow-js has in terms of
> capabilities so it is possible it exists and I just didn't know.  Is it
> something you are interested in contributing?
>
> On Wed, Mar 1, 2023 at 10:41 PM Kimmo Linna  wrote:
>
>> Hi,
>>
>> I will get ArrowSchema and ArrowArray directly from DuckDB. I want to
>> transfer the RecordBatch to Bun with bun::ffi. At the moment my procedure
>> is the following:
>> auto schema = arrow::ImportSchema(arrow_schema);
>> auto batch = arrow::ImportRecordBatch(arrow_array, *schema);
>> auto output_stream = arrow::io::BufferOutputStream::Create();
>> auto batch_writer = arrow::ipc::MakeStreamWriter(*output_stream, *schema
>> );
>> auto status = (*batch_writer)->WriteRecordBatch(**batch);
>> auto buffer = (*output_stream)->Finish();
>> (*out).address = (void *)(*buffer)->address();
>> (*out).size = (*buffer)->size();
>>
>> And then I will read the buffer in Bun with toArrayBuffer and
>> RecordBatchReader like this:
>> return RecordBatchReader.from(
>> toArrayBuffer(
>> dab.dab_ipc_address(ipc), 0, Number(dab.dab_ipc_size(ipc))
>> )).readAll()[0];
>>
>> I just wonder Is there a way to read RecordBatch directly from
>> RecordBatch which is done by ImportRecordBatch or can I do this without
>> OutputStream at all?
>>
>> Best regards,
>>
>> Kimmo
>>
>> --
>> Kimmo Linna
>> Nihtisalontie 3 as 1
>> 02630  ESPOO
>> kimmo.li...@gmail.com
>> +358 40 590 1074
>>
>>
>>
>>
>


Re: [C++] How to write parquet file in hive style using parquet::StreamingWriter

2023-03-02 Thread Weston Pace
If you are working in C++ there are a few interfaces you might be
interested in.

The simplest high level API for this would be to use Acero and create a
write node.  This is what pyarrow uses (though a little indirectly at the
moment).  There is a brief example here[1].  I'd be happy to answer
specific questions too.  The input to Acero needs to be a stream of record
batches.  You could wrap your bespoke reader in a RecordBatchReader and
then use "record_batch_reader_source".  Putting it all together you would
get something that looks like (in pseudocode):

```
BespokeReader reader = OpenBespokeReaderForGiantFile(...);
RecordBatchReader rb_reader = reader.ToRecordBatchReader();
RecordBatchReaderSourceNodeOptions source_options{rb_reader};
// This is where you specify which columns to partition on and specify
// that you want to use hive-style partitioning
FileSystemDatasetWriteOptions write_options = CreateWriteOptions(...);
WriteNodeOptions write_node_options(write_options);
Declaration plan = Declaration::Sequence({
  {"record_batch_reader_source", source_options},
  {"write", write_node_options}
});
Status final_result = DeclarationToStatus(plan);
```

Note, the above is assuming you are using Arrow 11.0.0.  The dataset
writer[2] is probably the core component for writing data to a dataset.  So
if you want to bypass Acero you could use it directly.  However, the
partitioning logic happens in the write node (and not the dataset writer)
today so you would need to duplicate that logic.

[1]
https://github.com/apache/arrow/blob/main/cpp/examples/arrow/execution_plan_documentation_examples.cc#L647
[2]
https://github.com/apache/arrow/blob/main/cpp/src/arrow/dataset/dataset_writer.h

On Wed, Mar 1, 2023 at 2:57 PM Haocheng Liu  wrote:

> Hi Arrow community,
>
> Hope this email finds you well. I'm working on a project to convert
> a bespoke format into parquet format, where each file contains time series
> data and can be tens of gigabytes large on a daily basis.
>
> I've successfully created a binary with parquet::StreamingWriter to
> convert the file to one big parquet file.
> Next I would like to 1) break it into small files - let's say 1 hour per
> sub file - and 2) store them in a hive-style manner in *C++*. From the 
> official
> docs  I
> failed to find related information. Can folks please guide where the docs
> are or if it's doable right now in C++?
>
> Best regards
> Haocheng Liu
>
>
> --
> Best regards
>


Re: [C++][C Data][JavaScript]RecordBatch buffer to JavaScript in Bun

2023-03-02 Thread Weston Pace
I believe you would need a Javascript version of the C data interface.
This should be doable with bun:ffi but I'm not aware of anyone that has
done this before.  I also wonder if there is a way to create a C data
interface based on TypedArray that would be usable in both bun and node.
I'm also not really up to speed on what arrow-js has in terms of
capabilities so it is possible it exists and I just didn't know.  Is it
something you are interested in contributing?

On Wed, Mar 1, 2023 at 10:41 PM Kimmo Linna  wrote:

> Hi,
>
> I will get ArrowSchema and ArrowArray directly from DuckDB. I want to
> transfer the RecordBatch to Bun with bun::ffi. At the moment my procedure
> is the following:
> auto schema = arrow::ImportSchema(arrow_schema);
> auto batch = arrow::ImportRecordBatch(arrow_array, *schema);
> auto output_stream = arrow::io::BufferOutputStream::Create();
> auto batch_writer = arrow::ipc::MakeStreamWriter(*output_stream, *schema);
> auto status = (*batch_writer)->WriteRecordBatch(**batch);
> auto buffer = (*output_stream)->Finish();
> (*out).address = (void *)(*buffer)->address();
> (*out).size = (*buffer)->size();
>
> And then I will read the buffer in Bun with toArrayBuffer and
> RecordBatchReader like this:
> return RecordBatchReader.from(
> toArrayBuffer(
> dab.dab_ipc_address(ipc), 0, Number(dab.dab_ipc_size(ipc))
> )).readAll()[0];
>
> I just wonder Is there a way to read RecordBatch directly from RecordBatch
> which is done by ImportRecordBatch or can I do this without OutputStream at
> all?
>
> Best regards,
>
> Kimmo
>
> --
> Kimmo Linna
> Nihtisalontie 3 as 1
> 02630  ESPOO
> kimmo.li...@gmail.com
> +358 40 590 1074
>
>
>
>


Re: [python] Using Arrow for storing compressable python dictionaries

2022-11-23 Thread Weston Pace
> Question to others - is there option to read rows from 1000th to
> 2000th in current parquet interface?

I'm pretty sure the answer today is "no".  In theory, however, you
should be able to narrow down which pages to load using a skip.  I
believe there is some active work in this area[1].

> There is head in dataset.Scanner but slice is not there.

The head in the scanner today is a sort of "best effort stop reading
when we have enough data" and cannot accomplish any kind of skipping.
I am working on adding support in the scanner for skipping record
batches based on a limit & offset but that isn't ready yet.

[1] https://issues.apache.org/jira/browse/PARQUET-2210

On Wed, Nov 23, 2022 at 11:18 AM Jacek Pliszka  wrote:
>
> Actually you provided too little information to tell.
>
> Will you store data locally or over the network?
>
> Do you want to optimize for speed or data size?
>
> Locally stored memory mapped arrow IPC should be fast - you may want
> to test it against HDF5 and since it is memory mapped - slicing should
> work great.
>
> On the other hand, parquet file should have decent compression if you
> want to save on storage and/or bandwidth.
> But I do not know how to slice it efficiently in your case.
>
> Question to others - is there option to read rows from 1000th to
> 2000th in current parquet interface?
> There is head in dataset.Scanner but slice is not there.
>
> BR,
>
> Jacek
>
> śr., 23 lis 2022 o 18:20 Ramón Casero Cañas  napisał(a):
> >
> > Hi Jacek,
> >
> > Thanks for your reply, but it looks like that would be a complicated 
> > workaround. I have been looking some more, and it looks like hdf5 would be 
> > a good file format for this problem.
> >
> > It naturally supports slicing like fp['field1'][1000:5000], provides 
> > chunking and compression, new arrays can be appended... Maybe Arrow is just 
> > not the right tool for this specific problem.
> >
> > Kind regards,
> >
> > Ramon.
> >
> >
> > On Wed, 23 Nov 2022 at 15:54, Jacek Pliszka  wrote:
> >>
> >> Hi!
> >>
> >> I am not sure if this would solve your problem:
> >>
> >> pa.concat_tables([pa.Table.from_pydict({'v': v}).append_column('f',
> >> [len(v)*[f]]) for f, v in x.items()])
> >>
> >> pyarrow.Table
> >> v: double
> >> f: string
> >> 
> >> v: [[0.2,0.2,0.2,0.1,0.2,0,0.8,0.7],[0.3,0.5,0.1],[0.9,nan,nan,0.1,0.5]]
> >> f: 
> >> [["field1","field1","field1","field1","field1","field1","field1","field1"],["field2","field2","field2"],["field3","field3","field3","field3","field3"]]
> >>
> >> f column should compress very well or you can make it dictionary from the 
> >> start.
> >>
> >> To get back you can do couple things, take from pc.equal, to_batches, 
> >> groupby
> >>
> >> BR
> >>
> >> Jacek
> >>
> >>
> >>
> >> śr., 23 lis 2022 o 13:12 Ramón Casero Cañas  napisał(a):
> >> >
> >> > Hi,
> >> >
> >> > I'm trying to figure out whether pyArrow could efficiently store and 
> >> > slice large python dictionaries that contain numpy arrays of variable 
> >> > length, e.g.
> >> >
> >> > x = {
> >> > 'field1': [0.2, 0.2, 0.2, 0.1, 0.2, 0.0, 0.8, 0.7],
> >> > 'field2': [0.3, 0.5, 0.1],
> >> > 'field3': [0.9, NaN, NaN, 0.1, 0.5]
> >> > }
> >> >
> >> > Arrow seems to be designed for Tables, but I was wondering whether 
> >> > there's a way to do this (probably not with a Table or RecordBatch 
> >> > because those require the same lengths).
> >> >
> >> > The vector in each dictionary key would have in the order of 1e4 - 1e9 
> >> > elements. There are some NaN gaps in the data (which would go well with 
> >> > Arrow's null elements, I guess), but especially, many repeated values 
> >> > that makes the data quite compressible.
> >> >
> >> > Apart from writing that data to disk quickly and with compression, then 
> >> > I need to slice it efficiently, e.g.
> >> >
> >> > fp = open('file', 'r')
> >> > v = fp['field1'][1000:5000]
> >> >
> >> > Is this something that can be done with pyArrow?
> >> >
> >> > Kind regards,
> >> >
> >> > Ramon.


Re: [python] Using Arrow for storing compressable python dictionaries

2022-11-23 Thread Weston Pace
You could store it a List column:

```
>>> x = pa.array([[1.2, 2.3], [3.4]])
>>> x

[
  [
1.2,
2.3
  ],
  [
3.4
  ]
]
>>> x[0]

>>> x[0][1]

>>> x[0].values.slice(0, 1)

[
  1.2
]
```

This will be stored in parquet as LIST and should give you reasonable
compression (though I have not personally tested it).

Slicing is O(1) once it is loaded in memory.

On Wed, Nov 23, 2022 at 9:20 AM Ramón Casero Cañas  wrote:
>
> Hi Jacek,
>
> Thanks for your reply, but it looks like that would be a complicated 
> workaround. I have been looking some more, and it looks like hdf5 would be a 
> good file format for this problem.
>
> It naturally supports slicing like fp['field1'][1000:5000], provides chunking 
> and compression, new arrays can be appended... Maybe Arrow is just not the 
> right tool for this specific problem.
>
> Kind regards,
>
> Ramon.
>
>
> On Wed, 23 Nov 2022 at 15:54, Jacek Pliszka  wrote:
>>
>> Hi!
>>
>> I am not sure if this would solve your problem:
>>
>> pa.concat_tables([pa.Table.from_pydict({'v': v}).append_column('f',
>> [len(v)*[f]]) for f, v in x.items()])
>>
>> pyarrow.Table
>> v: double
>> f: string
>> 
>> v: [[0.2,0.2,0.2,0.1,0.2,0,0.8,0.7],[0.3,0.5,0.1],[0.9,nan,nan,0.1,0.5]]
>> f: 
>> [["field1","field1","field1","field1","field1","field1","field1","field1"],["field2","field2","field2"],["field3","field3","field3","field3","field3"]]
>>
>> f column should compress very well or you can make it dictionary from the 
>> start.
>>
>> To get back you can do couple things, take from pc.equal, to_batches, groupby
>>
>> BR
>>
>> Jacek
>>
>>
>>
>> śr., 23 lis 2022 o 13:12 Ramón Casero Cañas  napisał(a):
>> >
>> > Hi,
>> >
>> > I'm trying to figure out whether pyArrow could efficiently store and slice 
>> > large python dictionaries that contain numpy arrays of variable length, 
>> > e.g.
>> >
>> > x = {
>> > 'field1': [0.2, 0.2, 0.2, 0.1, 0.2, 0.0, 0.8, 0.7],
>> > 'field2': [0.3, 0.5, 0.1],
>> > 'field3': [0.9, NaN, NaN, 0.1, 0.5]
>> > }
>> >
>> > Arrow seems to be designed for Tables, but I was wondering whether there's 
>> > a way to do this (probably not with a Table or RecordBatch because those 
>> > require the same lengths).
>> >
>> > The vector in each dictionary key would have in the order of 1e4 - 1e9 
>> > elements. There are some NaN gaps in the data (which would go well with 
>> > Arrow's null elements, I guess), but especially, many repeated values that 
>> > makes the data quite compressible.
>> >
>> > Apart from writing that data to disk quickly and with compression, then I 
>> > need to slice it efficiently, e.g.
>> >
>> > fp = open('file', 'r')
>> > v = fp['field1'][1000:5000]
>> >
>> > Is this something that can be done with pyArrow?
>> >
>> > Kind regards,
>> >
>> > Ramon.


Re: Questions about working with large multi-file zipped CSV data

2022-11-09 Thread Weston Pace
> 2. Some of the files end up being larger than memory when unzipped. In this 
> case I’m using the file size to switch over and use open_csv instead of 
> read_csv. Is there any plan for open_csv to be multithreaded in a future 
> release (didn’t see anything on Jira, but I’m not great at searching on it)?

There is a PR in progress[1] which will add parallel reads to
`open_csv` when provided with a random access file (e.g. not just an
input stream).  This is important when reading from S3 but perhaps not
as big of a deal when reading from a local disk (which usually doesn't
support a ton of parallelism).  The streaming CSV reader's parsing is
also not very parallel and could be improved (I would presume to get
pretty close to read_csv performance).  However, I don't know anyone
currently working on this.

> If I go with this approach, will the dataset to batches read be 
> single-threaded (reading csv format) like open_csv? That is obviously not an 
> issue for large files I would have had to use open_csv for anyway, but if the 
> eventual dataset API read is single threaded, I might still want to use 
> read_csv and process columns post read for smaller datasets.

dataset.to_batches is built on top of the streaming CSV reader (e.g.
open_csv).  However, any compute work done by datasets (e.g.
dictionary encoding, joins, casts, etc.) will be done in parallel.

[1] https://github.com/apache/arrow/pull/14269

On Wed, Nov 9, 2022 at 3:21 PM Ryan Kuhns  wrote:
>
> Hi Everyone,
>
> Adam’s reply got me thinking about using the dataset API to overcome the 
> problem I was facing in my third question. It seems like I could use the 
> column projection to provide a mapping of from strings to integer lookup 
> values. Then similar to the writing large amounts of data example 
> (https://arrow.Apache.org/docs/Python/dataset.html) I can pass the dataset to 
> write_dataset and never have everything in memory.
>
> If I go with this approach, will the dataset to batches read be 
> single-threaded (reading csv format) like open_csv? That is obviously not an 
> issue for large files I would have had to use open_csv for anyway, but if the 
> eventual dataset API read is single threaded, I might still want to use 
> read_csv and process columns post read for smaller datasets.
>
> Thanks,
>
> Ryan
>
> On Nov 9, 2022, at 4:07 PM, Ryan Kuhns  wrote:
>
> 
> Adam,
>
> Thanks for pointing me to that. The fsspec approach looks like it will be 
> helpful and the code snippet give me a good starting point.
>
> -Ryan
>
> On Nov 9, 2022, at 2:42 PM, Kirby, Adam  wrote:
>
> 
> Hi Ryan,
>
> For your first question of a ZIP of multiple CSVs, I've had good luck [2] 
> combining fsspec [1] with pyarrow dataset to process ZIPs of multiple CSVs. 
> fsspec allows you to manage how much RAM you use on the read side with a few 
> different cache configs.
>
> In case helpful, I sent a python snippet earlier. [3]
>
> [1] 
> https://filesystem-spec.readthedocs.io/en/latest/_modules/fsspec/implementations/zip.html
>
> [2] The idea was proposed by emkornfi...@gmail.com on this list and proved 
> very helpful.
>
> [3] https://www.mail-archive.com/user@arrow.apache.org/msg02176.html
>
>
> On Wed, Nov 9, 2022, 12:15 PM Ryan Kuhns  wrote:
>>
>> Hi Everyone,
>>
>> I’m using pyarrow to read, process, store and analyze some large files 
>> (~460GB zipped on 400+ files updated quarterly).
>>
>> I’ve have a couple thoughts/questions come up as I have worked through the 
>> process. First two questions are mainly informational, wanting to confirm 
>> what I’ve inferred from existing docs.
>>
>> 1. I know pyarrow has functionality to uncompress a zipped file with a 
>> single CSV in it, but in my case I have 3 files in the zip. I’m currently 
>> using Python’s zipfile to find and open the file I want in the zip and then 
>> I am reading it with pyarrow.read_csv. I wanted to confirm there isn’t 
>> pyarrow functionality that might be able to tell me the files in the zip and 
>> let me select the one to unzip and read.
>>
>> 2. Some of the files end up being larger than memory when unzipped. In this 
>> case I’m using the file size to switch over and use open_csv instead of 
>> read_csv. Is there any plan for open_csv to be multithreaded in a future 
>> release (didn’t see anything on Jira, but I’m not great at searching on it)?
>>
>> 3. My data has lots of columns that are dimensions (with low cardinality) 
>> with longish string values and a large number of rows. Since I have files 
>> getting close to or above my available memory when unzipped, I need to be as 
>> memory efficient as possible. Converting these to dictionaries via 
>> ConvertOptions helps with the in-memory size. But then I get errors when 
>> looking to join tables together later (due to functionality to unify 
>> dictionaries not being implemented yet). Is that something that will be 
>> added? How about the ability to provide a user dictionary that should be 
>> used in the encoding (as 

Re: [C++] [Windows] Building arrow minimal build sample on Windows

2022-11-08 Thread Weston Pace
I took a look at your code and I don't see anything wrong with it.  I
was able to compile it and run it on my Ubuntu desktop and didn't get
any errors (other than the expected error if the CSV file wasn't
placed where it could be found).  So I suspect this may be related to
your setup steps or Windows.  I'll try to run your example with
Windows soon.

On Tue, Nov 8, 2022 at 1:17 PM Raghavendra Prasad  wrote:
>
> Hi,
>
> Just checking if anyone can point me in the right direction here.
>
> Thanks
> Prasad
>
> On Mon, 7 Nov 2022 at 2:30 pm Raghavendra Prasad  wrote:
>>
>> Hi,
>>
>> Thanks for responding.  Attached are the files & below are the steps I took 
>> to build & run it.  Hopefully my issue can be spotted.
>>
>> Downloaded & installed arrow-cpp 9.0.0 using miniconda
>> Added C:\Users\\Miniconda3\Library\bin to path.  (or else it 
>> complains arrow.dll is not found)
>> Downloaded arrow source code & switched to the maint-9.0.0 branch  (to get 
>> hold of the  conda_env_cpp.txt)
>> Launched Visual Studio 2019 x64 Developer tools command prompt
>> Created a new conda environment using conda_env_cpp.txt from maint-9.0.0 
>> branch:conda create -y -n arrow-9-test 
>> --file=C:\Repos\arrow\ci\conda_env_cpp.txt
>> Activated the new environment
>> Navigated to miminal_build source folder
>> Ran:  cmake -G "Visual Studio 16 2019" CMakeLists.txt.  It ran successfully 
>> as below
>> -- Selecting Windows SDK version 10.0.19041.0 to target Windows 10.0.19044.
>> -- The C compiler identification is MSVC 19.29.30143.0
>> -- The CXX compiler identification is MSVC 19.29.30143.0
>> -- Detecting C compiler ABI info
>> -- Detecting C compiler ABI info - done
>> -- Check for working C compiler: C:/Program Files (x86)/Microsoft Visual 
>> Studio/2019/Professional/VC/Tools/MSVC/14.29.30133/bin/Hostx64/x64/cl.exe - 
>> skipped
>> -- Detecting C compile features
>> -- Detecting C compile features - done
>> -- Detecting CXX compiler ABI info
>> -- Detecting CXX compiler ABI info - done
>> -- Check for working CXX compiler: C:/Program Files (x86)/Microsoft Visual 
>> Studio/2019/Professional/VC/Tools/MSVC/14.29.30133/bin/Hostx64/x64/cl.exe - 
>> skipped
>> -- Detecting CXX compile features
>> -- Detecting CXX compile features - done
>> -- Arrow version: 9.0.0
>> -- Arrow SO version: 900.0.0
>> -- Configuring done
>> -- Generating done
>> -- Build files have been written to: C:/Temp/arrow/minimal_build
>> Ran: cmake --build .  It ran successfully as below
>> Microsoft (R) Build Engine version 16.11.2+f32259642 for .NET Framework
>> Copyright (C) Microsoft Corporation. All rights reserved.
>>
>>   Checking Build System
>>   Building Custom Rule C:/Temp/arrow/minimal_build/CMakeLists.txt
>>   example.cc
>>   arrow-example.vcxproj -> 
>> C:\Temp\arrow\minimal_build\Debug\arrow-example.exe
>>   Building Custom Rule C:/Temp/arrow/minimal_build/CMakeLists.txt
>> Placed test.csv in the Debug folder
>> Ran: Debug\arrow-example.exe.  Failed as below.
>> (arrow-9-test) c:\Temp\arrow\minimal_build>Debug\arrow-example.exe
>> * Reading CSV file 
>> 'C:\Repos\arrow\cpp\examples\minimal_build\Debug\test.csv' into table
>>  ☺  ♫▼║♫ ┤  ═!©☺L═!T
>>
>> Regards
>>
>> Prasad
>>
>>
>> On Mon, Nov 7, 2022 at 2:06 AM Weston Pace  wrote:
>>>
>>> That's pretty odd.  Are you able to share a full copy of your test program?
>>>
>>> On Sun, Nov 6, 2022 at 3:46 AM Raghavendra Prasad  
>>> wrote:
>>> >
>>> > Hi,
>>> >
>>> > Thanks for the quick reply.   res.status().ToString() results in an 
>>> > access violation:
>>> >
>>> > Exception thrown at 0x7FF68F936C3C in arrow-example.exe: 0xC005: 
>>> > Access violation reading location 0x.
>>> >
>>> > Regards
>>> > Prasad
>>> >
>>> >
>>> > On Mon, Nov 7, 2022 at 12:43 AM Weston Pace  wrote:
>>> >>
>>> >> Can you try res.status().ToString() ?
>>> >>
>>> >> On Sun, Nov 6, 2022, 5:31 AM Raghavendra Prasad  
>>> >> wrote:
>>> >>>
>>> >>> Hi,
>>> >>>
>>> >>> Hope you are all well!
>>> >>>
>>> >>> I am struggling to get the minimal sample app to work & am hoping for 
>>> >>> tips to move forward
>>>

Re: [C++] [Windows] Building arrow minimal build sample on Windows

2022-11-06 Thread Weston Pace
That's pretty odd.  Are you able to share a full copy of your test program?

On Sun, Nov 6, 2022 at 3:46 AM Raghavendra Prasad  wrote:
>
> Hi,
>
> Thanks for the quick reply.   res.status().ToString() results in an access 
> violation:
>
> Exception thrown at 0x7FF68F936C3C in arrow-example.exe: 0xC005: 
> Access violation reading location 0x.
>
> Regards
> Prasad
>
>
> On Mon, Nov 7, 2022 at 12:43 AM Weston Pace  wrote:
>>
>> Can you try res.status().ToString() ?
>>
>> On Sun, Nov 6, 2022, 5:31 AM Raghavendra Prasad  wrote:
>>>
>>> Hi,
>>>
>>> Hope you are all well!
>>>
>>> I am struggling to get the minimal sample app to work & am hoping for tips 
>>> to move forward
>>>
>>> Setup: Windows 10, Visual Studio 2019, Arrow 9.0.0 installed via miniconda.
>>>
>>> Thanks to the previous tip, I can now successfully build the arrow 
>>> minimal_build example.   However when I try to run the sample it always 
>>> fails at the arrow::io::ReadableFile::Open(csv_filename) step.   My code 
>>> snippet is below & trying to debug, the returned status code does not seem 
>>> useful.  Appreciate any guidance to understand what is wrong.
>>>
>>>   const char* csv_filename = 
>>> "C:\\Repos\\arrow\\cpp\\examples\\minimal_build\\Debug\\test.csv";
>>>   auto res = arrow::io::ReadableFile::Open(csv_filename);  // full path to 
>>> existing file
>>>   if (!res.ok()) {
>>> std::cout << res.status().IsIOError();  // returns 0
>>> std::cout << res.status().IsUnknownError();  // returns 0
>>> std::cout << res.status().detail(); // returns 00
>>> std::cout << res.status().CodeAsString();   // causes app to crash with 
>>> access violation
>>> std::cout << res.status().message(); // returns gibberish
>>>   }
>>>
>>> Regards
>>> Prasad
>>>
>>>
>>> On Fri, Nov 4, 2022 at 7:06 PM Raghavendra Prasad  
>>> wrote:
>>>>
>>>> Thank you and have a great weekend
>>>>
>>>> On Fri, 4 Nov 2022 at 6:25 pm Sutou Kouhei  wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> > Just wondering if you know when arrow-cpp v10.0.0 packages will be
>>>>> > available via conda mechanism?
>>>>>
>>>>> This is work in progress. Please watch this pull request:
>>>>> https://github.com/conda-forge/arrow-cpp-feedstock/pull/866
>>>>>
>>>>>
>>>>> Thanks,
>>>>> --
>>>>> kou
>>>>>
>>>>> In 
>>>>>   "Re: [C++] [Windows] Building arrow minimal build sample on Windows" on 
>>>>> Fri, 4 Nov 2022 18:14:01 +1100,
>>>>>   Raghavendra Prasad  wrote:
>>>>>
>>>>> > Hi kou,
>>>>> >
>>>>> > Thanks for the quick reply, that seems to have worked & I can build fine
>>>>> > now!   I have run into other issues, but at least I can progress now.
>>>>> >
>>>>> > Just wondering if you know when arrow-cpp v10.0.0 packages will be
>>>>> > available via conda mechanism?
>>>>> >
>>>>> > Regards
>>>>> > Prasad
>>>>> >
>>>>> >
>>>>> > On Fri, Nov 4, 2022 at 3:43 PM Sutou Kouhei  wrote:
>>>>> >
>>>>> >> Hi,
>>>>> >>
>>>>> >> Could you use "arrow_shared" instead of
>>>>> >> "Arrow::arrow_shared" instead? "Arrow::arrow_shared" is
>>>>> >> available since Apache Arrow 10.0.0.
>>>>> >>
>>>>> >> FYI: "arrow_shared" is still available with Apache Arrow
>>>>> >> 10.0.0 to keep backward compatibility.
>>>>> >>
>>>>> >>
>>>>> >> Thanks,
>>>>> >> --
>>>>> >> kou
>>>>> >>
>>>>> >> In 
>>>>> >>   "[C++] [Windows] Building arrow minimal build sample on Windows" on 
>>>>> >> Fri,
>>>>> >> 4 Nov 2022 09:03:44 +1100,
>>>>> >>   Raghavendra Prasad  wrote:
>>>>> >>
>>>>> >> >

Re: [C++] [Windows] Building arrow minimal build sample on Windows

2022-11-06 Thread Weston Pace
Can you try res.status().ToString() ?

On Sun, Nov 6, 2022, 5:31 AM Raghavendra Prasad  wrote:

> Hi,
>
> Hope you are all well!
>
> I am struggling to get the minimal sample app to work & am hoping for tips
> to move forward
>
> *Setup*: Windows 10, Visual Studio 2019, Arrow 9.0.0 installed via
> miniconda.
>
> Thanks to the previous tip, I can now successfully build the arrow
> minimal_build
>  
> example.
>  However when I try to run the sample it always fails at
> the arrow::io::ReadableFile::Open(csv_filename) step.   My code snippet is
> below & trying to debug, the returned status code does not seem useful.
> Appreciate any guidance to understand what is wrong.
>
>   const char* csv_filename =
> "C:\\Repos\\arrow\\cpp\\examples\\minimal_build\\Debug\\test.csv";
>   auto res = arrow::io::ReadableFile::Open(csv_filename);  // full path to
> existing file
>   if (!res.ok()) {
> std::cout << res.status().IsIOError();  // returns 0
> std::cout << res.status().IsUnknownError();  // returns 0
> std::cout << res.status().detail(); // returns 00
> std::cout << res.status().CodeAsString();   // causes app to crash
> with access violation
> std::cout << res.status().message(); // returns gibberish
>   }
>
> Regards
> Prasad
>
>
> On Fri, Nov 4, 2022 at 7:06 PM Raghavendra Prasad 
> wrote:
>
>> Thank you and have a great weekend
>>
>> On Fri, 4 Nov 2022 at 6:25 pm Sutou Kouhei  wrote:
>>
>>> Hi,
>>>
>>> > Just wondering if you know when arrow-cpp v10.0.0 packages will be
>>> > available via conda mechanism?
>>>
>>> This is work in progress. Please watch this pull request:
>>> https://github.com/conda-forge/arrow-cpp-feedstock/pull/866
>>>
>>>
>>> Thanks,
>>> --
>>> kou
>>>
>>> In 
>>>   "Re: [C++] [Windows] Building arrow minimal build sample on Windows"
>>> on Fri, 4 Nov 2022 18:14:01 +1100,
>>>   Raghavendra Prasad  wrote:
>>>
>>> > Hi kou,
>>> >
>>> > Thanks for the quick reply, that seems to have worked & I can build
>>> fine
>>> > now!   I have run into other issues, but at least I can progress now.
>>> >
>>> > Just wondering if you know when arrow-cpp v10.0.0 packages will be
>>> > available via conda mechanism?
>>> >
>>> > Regards
>>> > Prasad
>>> >
>>> >
>>> > On Fri, Nov 4, 2022 at 3:43 PM Sutou Kouhei 
>>> wrote:
>>> >
>>> >> Hi,
>>> >>
>>> >> Could you use "arrow_shared" instead of
>>> >> "Arrow::arrow_shared" instead? "Arrow::arrow_shared" is
>>> >> available since Apache Arrow 10.0.0.
>>> >>
>>> >> FYI: "arrow_shared" is still available with Apache Arrow
>>> >> 10.0.0 to keep backward compatibility.
>>> >>
>>> >>
>>> >> Thanks,
>>> >> --
>>> >> kou
>>> >>
>>> >> In <
>>> ca+203qvpmgltghzz9jrusb3hng81gabnz_52-6_8s48satn...@mail.gmail.com>
>>> >>   "[C++] [Windows] Building arrow minimal build sample on Windows" on
>>> Fri,
>>> >> 4 Nov 2022 09:03:44 +1100,
>>> >>   Raghavendra Prasad  wrote:
>>> >>
>>> >> > Hello everyone,
>>> >> >
>>> >> > I am exploring usage of Apache Arrow specifically usage form Visual
>>> >> Studio
>>> >> > (VS2019) compiled C++ programs on my Windows 10 machine.
>>> >> >
>>> >> > I have Visual Studio 2019 installed already.   I wanted to simply
>>> use
>>> >> pre-build
>>> >> > binaries, so I installed Arrow 9.0.0 using miniconda:  conda install
>>> >> > arrow-cpp=9.0.* -c conda-forge.  (9.0.0 was the latest package I
>>> can find
>>> >> > there).   The install was successful.
>>> >> >
>>> >> > I now wanted to build the arrow minimal_build example & am failing
>>> at
>>> >> multiple
>>> >> > attempts.  Will gratefully accept any guidance to get this working!
>>> >> >
>>> >> > C:\Repos\arrow\cpp\examples\minimal_build> cmake CMakeLists.txt
>>> >> > which immediately failed with:
>>> >> >
>>> >> > C:\Repos\arrow\cpp\examples\minimal_build>cmake CMakeLists.txt
>>> >> > -- Selecting Windows SDK version 10.0.19041.0 to target Windows
>>> >> 10.0.19044.
>>> >> > -- Arrow version: 9.0.0
>>> >> > -- Arrow SO version: 900.0.0
>>> >> > -- Configuring done
>>> >> > CMake Error at CMakeLists.txt:40 (add_executable):
>>> >> >   Target "arrow-example" links to target "Arrow::arrow_shared" but
>>> the
>>> >> target
>>> >> >   was not found.  Perhaps a find_package() call is missing for an
>>> >> IMPORTED
>>> >> >   target, or an ALIAS target is missing?
>>> >> >
>>> >> > I next activated arrow-dev as per Developing on Windows & ran the
>>> same
>>> >> command.
>>> >> >
>>> >> > conda create -y -n arrow-dev --file=ci\conda_env_cpp.txt  ==>
>>> successful
>>> >> > conda activate arrow-dev ==> successful
>>> >> > (arrow-dev) C:\Repos\arrow\cpp\examples\minimal_build>cmake
>>> >> cmakelists.txt  ==>
>>> >> > failed
>>> >> > -- Selecting Windows SDK version 10.0.19041.0 to target Windows
>>> >> 10.0.19044.
>>> >> > -- The C compiler identification is MSVC 19.29.30143.0
>>> >> > -- The CXX compiler identification is MSVC 19.29.30143.0
>>> >> > -- Detecting C compiler ABI info
>>> >> > -- 

Re: is Scanner Filter support expression cloumn_name != NULL?

2022-10-24 Thread Weston Pace
To check for null you can use the `is_null` function:

```
import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.dataset as ds

tab = pa.Table.from_pydict({"x": [1, 2, 3, None], "y": ["a", "b", "c",
"d"]})
filtered = ds.dataset(tab).to_table(filter=pc.is_null(pc.field("x")))
print(filtered)
```

Does that help?

On Mon, Oct 24, 2022 at 8:02 AM 1057445597 <1057445...@qq.com> wrote:

> my Filter Expression:
> expression->ToString() get this result:(predict_model != null[string])
> That's how I got this expression:
>
> auto null_expr = arrow::compute::Expression(MakeNullScalar(arrow::utf8
> ()));
> call(not_equal(field_ref("predict_model"), null_expr))
>
> I then use this expression to filter, but end up with an empty batch
>
> if (!dataset()->filter_.empty()) {
> auto scanner_builder =
> arrow::dataset::ScannerBuilder::FromRecordBatchReader(
> batch_reader);
> scanner_builder->Filter(dataset()->filter_expr_);
> auto scanner_result = scanner_builder->Finish();
> if (!scanner_result.ok()) {
> res = errors::Internal(scanner_result.status().ToString());
> break;
> }
> auto scanner = scanner_result.ValueOrDie();
> auto batch_reader_result = scanner->ToRecordBatchReader();
> if (!batch_reader_result.ok()) {
> res = errors::Internal(batch_reader_result.status().ToString());
> break;
> }
> batch_reader = batch_reader_result.ValueOrDie();
> }
>
> arrow_status = batch_reader->ReadNext();
>
> batch == nullptr
>
> Is there any other way to filter out things that are not null?
>
>
>
> --
> 1057445597
> 1057445...@qq.com
>
> 
>
>


Re: Arrow IPC files - number of rows

2022-10-20 Thread Weston Pace
It's not in the footer metadata but each record batch should have its
own metadata the batch's metadata should contain the # of rows.  So
you should be able to do it without reading any data.  In pyarrow,
this *should* be what count_rows is doing but it has been a while
since I've really dove into that code and I may be remembering
incorrectly.

Can you use a MessageReader[1]?  I have not used it myself.  I don't
actually know if it will read the buffer data as well or just the
metadata.

[1] 
https://arrow.apache.org/docs/python/generated/pyarrow.ipc.MessageReader.html#pyarrow.ipc.MessageReader

On Thu, Oct 20, 2022 at 9:14 AM Quentin Lhoest  wrote:
>
> Hi everyone ! I was wondering:
> What is the most efficient way to know the number of rows in dataset of Arrow 
> IPC files ?
>
> I expected each file to have the number of rows as metadata in the footer, 
> but it doesn’t seem to be the case. Therefore I need to call count_rows() 
> which is less efficient than reading metadata.
>
> Maybe the number of row can be written as custom_metadata in the footer, but 
> the writing/reading custom_metadata functions don’t seem to be exposed in 
> python - if I’m not mistaken.
>
> Thanks in advance :)
>
> --
> Quentin


Re: [Python] - Dataset API - What's happening under the hood?

2022-09-20 Thread Weston Pace
ro copy with the IPC format.

Process A memory maps a file.
Process A populates that region of memory with a table it generates in some way.
Process A sends a control signal to process B that the table is ready.
Process B memory maps the same file (we know it will be in the kernel
page cache because we just used this RAM to write to it).
Process B operates on the data in some way.

On Tue, Sep 20, 2022 at 4:46 PM Nikhil Makan  wrote:
>
> Hi Weston, thanks for the response!
>
> > I would say that this is always a problem.  In the datasets API the
> goal is to maximize the resource usage within a single process.  Now,
> it may be a known or expected problem :)
>
> The dataset API still makes use of multiple cores though correct?
> How does this then relate to the filesystems interface and native support for 
> HDFS, GCFS and S3. Do these exhibit the same issue? Further to this are per 
> my earlier discussions on this thread we are unable to do partial reads of a 
> blob in Azure, I wanted to know if that is possible with any of the other 
> three that have native support. i.e. can we filter the data downloaded from 
> these instead of downloading everything and then filtering?
>
> > I think the benefits of memory mapping are rather subtle and often
> misleading.  Datasets can make use of memory mapping for local
> filesystems.  Doing so will, at best, have a slight performance
> benefit (avoiding a memcpy) but would most likely decrease performance
> (by introducing I/O where it is not expected) and it will have no
> effect whatsoever on the amount of RAM used.
>
> I don't think I quite follow this. Happy to be pointed to some documentation 
> to read more on this by the way. I thought the basic idea behind memory 
> mapping is that the data structure has the same representation on disk as it 
> does in memory therefore allowing it to not consume additional memory when 
> reading it, which is typical with normal I/O operations with reading files. 
> So would the dataset API process multiple files potentially quicker without 
> memory mapping. Also correct me if I am wrong, but memory mapping is related 
> to the ipc format only, formats such as parquet cannot take advantage of this?
>
> Kind regards
> Nikhil Makan
>
>
> On Tue, Sep 20, 2022 at 5:12 AM Weston Pace  wrote:
>>
>> Sorry for the slow reply.
>>
>> > This could be something on the Azure side but I find I am being 
>> > bottlenecked on the download speed and have noticed if I spin up multiple 
>> > Python sessions (or in my case interactive windows) I can increase my 
>> > throughput. Hence I can download each year of the taxinyc dataset in 
>> > separate interactive windows and increase my bandwidth consumed.
>>
>> I would say that this is always a problem.  In the datasets API the
>> goal is to maximize the resource usage within a single process.  Now,
>> it may be a known or expected problem :)
>>
>> > Does the Dataset API make use of memory mapping? Do I have the correct 
>> > understanding that memory mapping is only intended for dealing with large 
>> > data stored on a local file system. Where as data stored on a cloud file 
>> > system in the feather format effectively cannot be memory mapped?
>>
>> I think the benefits of memory mapping are rather subtle and often
>> misleading.  Datasets can make use of memory mapping for local
>> filesystems.  Doing so will, at best, have a slight performance
>> benefit (avoiding a memcpy) but would most likely decrease performance
>> (by introducing I/O where it is not expected) and it will have no
>> effect whatsoever on the amount of RAM used.
>>
>> > This works as well as noted previosuly, so I assume the python operators 
>> > are mapped across similar to what happens when you use the operators 
>> > against a numpy or pandas series it just executes a np.multiply or pd. 
>> > multiply in the background.
>>
>> Yes.  However the functions that get mapped can sometimes be
>> surprising.  Specifically, logical operations map to the _kleene
>> variation and arithmetic maps to the _checked variation.  You can find
>> the implementation at [1].  For multiplication this boils down to:
>>
>> ```
>> @staticmethod
>> cdef Expression _expr_or_scalar(object expr):
>> if isinstance(expr, Expression):
>> return ( expr)
>> return ( Expression._scalar(expr))
>>
>> ...
>>
>> def __mul__(Expression self, other):
>> other = Expression._expr_or_scalar(other)
>> return Expression._call("multiply_checked", [self, other])
>> ```
>>
>&g

Re: [c++][compute]Is there any other way to use Join besides Acero?

2022-09-20 Thread Weston Pace
Thanks for the detailed reproducer.  I've added a few notes on the JIRA
that I hope will help.

On Tue, Sep 20, 2022, 5:10 AM 1057445597 <1057445...@qq.com> wrote:

> I re-uploaded a copy of the code that can be compiled and run in
> join_test.zip, including cmakelists.txt, the test data files and the Python
> code that generated the test files. There is also Python code to view the
> data files. You will need to compile Arrow 9.0 yourself.
>
> --
> 1057445597
> 1057445...@qq.com
>
> 
>
>
>
> -- 原始邮件 --
> *发件人:* "user" <1057445...@qq.com>;
> *发送时间:* 2022年9月15日(星期四) 晚上10:27
> *收件人:* "user";
> *主题:* 回复: [c++][compute]Is there any other way to use Join besides Acero?
>
> this jira
>
> https://issues.apache.org/jira/browse/ARROW-17740
> --
> 1057445597
> 1057445...@qq.com
>
> 
>
>
>
> -- 原始邮件 --
> *发件人:* "user" ;
> *发送时间:* 2022年9月15日(星期四) 中午12:15
> *收件人:* "user";
> *主题:* Re: [c++][compute]Is there any other way to use Join besides Acero?
>
> Within Arrow-C++ that is the only way I am aware of.  You might be able to
> use DuckDb.  It should be able to scan parquet files.
>
> Is this the same program that you shared before?  Were you able to figure
> out threading?  Can you create a JIRA with some sample input files and a
> reproducible example?
>
> On Wed, Sep 14, 2022 at 5:14 PM 1057445597 <1057445...@qq.com> wrote:
>
>> Acero performs poorly, and coredump occurs frequently!
>>
>> In the scenario I'm working on, I'll read one Parquet file and then
>> several other Parquet files. These files will have the same column name
>> (UUID). I need to join (by UUID), project (remove UUID), and filter (some
>> custom filtering) the results of the two reads. I found that Acero could
>> only be used to do join, but when I tested it, Acero performance was very
>> poor and very unstable, coredump often happened. Is there another way? Or
>> just another way to do a join!
>>
>>
>> --
>> 1057445597
>> 1057445...@qq.com
>>
>> 
>>
>>
>


Re: [compute] limit push-down and nested types

2022-09-20 Thread Weston Pace
> However, I'm wondering if there's a better path to integrating that more 
> "natively" into Arrow. Happy to make contributions if that's an option.

I'm not quite sure I understand where Arrow integration comes into
play here.  Would that scanner use Arrow internally?  Or would you
only convert the output to Arrow?

If you're planning on using Arrow's scanner internally then I think
there are improvements that could be made to add support for partially
reading a dataset.  There are a few tentative explorations here (e.g.
I think marsupialtail has been looking at slicing fragments and there
is some existing Spark work with slicing fragments).

If you're only converting the output to Arrow then I don't think there
is any special interoperability concerns.

What does `scanner` return?  Does it return a table?  Or an iterator?

> it's hard to express nested field references (i have to use a non-public 
> pc.Expression._nested_field and convert list-of-struct to struct-of-list)

There was some work recently to add better support for nested field references:

```
>>> import pyarrow as pa
>>> import pyarrow.dataset as ds

>>> points = [{'x': 7, 'y': 10}, {'x': 12, 'y': 44}]
>>> table = pa.Table.from_pydict('points': points)
>>> ds.write_dataset(table, '/tmp/my_dataset', format='parquet')

>>> ds.dataset('/tmp/my_dataset').to_table(columns={'x': ds.field('points', 
>>> 'x')}, filter=(ds.field('points', 'y') > 20))
pyarrow.Table
x: int64

x: [[12]]
```

> the compute operations are not implemented for List arrays.

What sorts of operations would you like to see on list arrays?  Can
you give an example?

On Tue, Sep 20, 2022 at 10:13 AM Chang She  wrote:
>
> Hi there,
>
> We're creating a new columnar data format for computer vision with Arrow 
> integration as a first class citizen (github.com/eto-ai/lance). It 
> significantly outperforms parquet in a variety of computer vision workloads.
>
> Question 1:
>
> Because vision data tends to be large-ish blobs, we want to be very careful 
> about how much data is being retrieved. So we want to be able to push-down 
> limit/offset when it's appropriate to support data exploration queries (e.g., 
> "show me page 2 of N images that meet these filtering criteria"). For now 
> we've implemented our own Dataset/Scanner subclass to support these extra 
> options.
>
> Example:
>
> ```python
> lance.dataset(uri).scanner(limit=10, offset=20)
> ```
>
> And the idea is that it would only retrieve those rows from disk.
>
> However, I'm wondering if there's a better path to integrating that more 
> "natively" into Arrow. Happy to make contributions if that's an option.
>
>
> Question 2:
>
> In computer vision we're often dealing with deeply nested data types (e.g., 
> for object detection annotations that has a list of labels, bounding boxes, 
> polygons, etc for each image). Lance supports efficient filtering scans on 
> these list-of-struct columns, but a) it's hard to express nested field 
> references (i have to use a non-public pc.Expression._nested_field and 
> convert list-of-struct to struct-of-list), and b) the compute operations are 
> not implemented for List arrays.
>
> Any guidance/thoughts on how y'all are thinking about efficient compute on 
> nested data?
>
> Thanks!
>
> Chang
>


Re: [Python] - Dataset API - What's happening under the hood?

2022-09-19 Thread Weston Pace
Sorry for the slow reply.

> This could be something on the Azure side but I find I am being bottlenecked 
> on the download speed and have noticed if I spin up multiple Python sessions 
> (or in my case interactive windows) I can increase my throughput. Hence I can 
> download each year of the taxinyc dataset in separate interactive windows and 
> increase my bandwidth consumed.

I would say that this is always a problem.  In the datasets API the
goal is to maximize the resource usage within a single process.  Now,
it may be a known or expected problem :)

> Does the Dataset API make use of memory mapping? Do I have the correct 
> understanding that memory mapping is only intended for dealing with large 
> data stored on a local file system. Where as data stored on a cloud file 
> system in the feather format effectively cannot be memory mapped?

I think the benefits of memory mapping are rather subtle and often
misleading.  Datasets can make use of memory mapping for local
filesystems.  Doing so will, at best, have a slight performance
benefit (avoiding a memcpy) but would most likely decrease performance
(by introducing I/O where it is not expected) and it will have no
effect whatsoever on the amount of RAM used.

> This works as well as noted previosuly, so I assume the python operators are 
> mapped across similar to what happens when you use the operators against a 
> numpy or pandas series it just executes a np.multiply or pd. multiply in the 
> background.

Yes.  However the functions that get mapped can sometimes be
surprising.  Specifically, logical operations map to the _kleene
variation and arithmetic maps to the _checked variation.  You can find
the implementation at [1].  For multiplication this boils down to:

```
@staticmethod
cdef Expression _expr_or_scalar(object expr):
if isinstance(expr, Expression):
return ( expr)
return ( Expression._scalar(expr))

...

def __mul__(Expression self, other):
other = Expression._expr_or_scalar(other)
return Expression._call("multiply_checked", [self, other])
```


On Mon, Sep 19, 2022 at 12:52 AM Jacek Pliszka  wrote:
>
> Re 2.   In Python Azure SDK there is logic for partial blob read:
>
> https://learn.microsoft.com/en-us/python/api/azure-storage-blob/azure.storage.blob.blobclient?view=azure-python#azure-storage-blob-blobclient-query-blob
>
> However I was unable to use it as it does not support parquet files
> with decimal columns and these are the ones I have.
>
> BR
>
> J
>
> pt., 16 wrz 2022 o 02:26 Aldrin  napisał(a):
> >
> > For Question 2:
> > At a glance, I don't see anything in adlfs or azure that is able to do 
> > partial reads of a blob. If you're using block blobs, then likely you would 
> > want to store blocks of your file as separate blocks of a blob, and then 
> > you can do partial data transfers that way. I could be misunderstanding the 
> > SDKs or how Azure stores data, but my guess is that a whole blob is 
> > retrieved and then the local file is able to support partial, block-based 
> > reads as you expect from local filesystems. You may be able to double check 
> > how much data is being retrieved by looking at where adlfs is mounting your 
> > blob storage.
> >
> > For Question 3:
> > you can memory map remote files, it's just that every page fault will be 
> > even more expensive than for local files. I am not sure how to tell the 
> > dataset API to do memory mapping, and I'm not sure how well that would work 
> > over adlfs.
> >
> > For Question 4:
> > Can you try using `pc.scalar(1000)` as shown in the first code excerpt in 
> > [1]:
> >
> > >> x, y = pa.scalar(7.8), pa.scalar(9.3)
> > >> pc.multiply(x, y)
> > 
> >
> > [1]: 
> > https://arrow.apache.org/docs/python/compute.html#standard-compute-functions
> >
> > Aldrin Montana
> > Computer Science PhD Student
> > UC Santa Cruz
> >
> >
> > On Thu, Sep 8, 2022 at 8:26 PM Nikhil Makan  wrote:
> >>
> >> Hi There,
> >>
> >> I have been experimenting with Tabular Datasets for data that can be 
> >> larger than memory and had a few questions related to what's going on 
> >> under the hood and how to work with it (I understand it is still 
> >> experimental).
> >>
> >> Question 1: Reading Data from Azure Blob Storage
> >> Now I know the filesystems don't fully support this yet, but there is an 
> >> fsspec compatible library (adlfs) which is shown in the file system 
> >> example which I have used. Example below with the nyc taxi dataset, where 
> >> I am pulling the whole dataset through and writing to disk to the feather 
> >> format.
> >>
> >> import adlfs
> >> import pyarrow.dataset as ds
> >>
> >> fs = adlfs.AzureBlobFileSystem(account_name='azureopendatastorage')
> >>
> >> dataset = ds.dataset('nyctlc/green/', filesystem=fs, format='parquet')
> >>
> >> scanner = dataset.scanner()
> >> ds.write_dataset(scanner, f'taxinyc/green/feather/', format='feather')
> >>
> >> This could be something on the Azure side but I find I am being 
> >> bottlenecked on 

Re: [c++][compute]Is there any other way to use Join besides Acero?

2022-09-14 Thread Weston Pace
Within Arrow-C++ that is the only way I am aware of.  You might be able to
use DuckDb.  It should be able to scan parquet files.

Is this the same program that you shared before?  Were you able to figure
out threading?  Can you create a JIRA with some sample input files and a
reproducible example?

On Wed, Sep 14, 2022 at 5:14 PM 1057445597 <1057445...@qq.com> wrote:

> Acero performs poorly, and coredump occurs frequently!
>
> In the scenario I'm working on, I'll read one Parquet file and then
> several other Parquet files. These files will have the same column name
> (UUID). I need to join (by UUID), project (remove UUID), and filter (some
> custom filtering) the results of the two reads. I found that Acero could
> only be used to do join, but when I tested it, Acero performance was very
> poor and very unstable, coredump often happened. Is there another way? Or
> just another way to do a join!
>
>
> --
> 1057445597
> 1057445...@qq.com
>
> 
>
>


Re: [C++][Python] Recommend way to just read several rows from Parquet

2022-09-06 Thread Weston Pace
Setting the batch size will not have too much of an impact on the
amount of memory used.  That is mostly controlled by I/O readahead
(e.g. how many record batches to read at once).  The readahead
settings are not currently exposed to pyarrow although a PR was
recently merged[1] that should make this available in 10.0.0.

OOM when reading a single 2GB parquet file seems kind of extreme.  How
much RAM is available on the system?  Do you know if the parquet file
has some very compressive encodings (e.g. dictionary encoding with
long strings or run length encoding with long runs)?

> I am confused and what to know is the underlying behavior in C++ Arrow 
> Parquet reader, when setting batch_size to be small?

Basically the readahead tries to keep some number of rows in flight.
If the batches are small then it tries to run lots of rows at once.
If the batches are large then it will only run a few rows at once.  So
yes, extremely small batches will incur a lot of overhead, both in
terms of RAM and compute.

> My end goal is to sample just a few rows (~5 rows) from any Parquet file, to 
> estimate in-memory data size of the whole file, based on sampled rows.

I'm not sure 5 rows will be enough for this.  However, one option
might be to just read in a single row group (assuming the file has
multiple row groups).

One last idea might be to disable pre-buffering.  Pre-buffering is
currently using too much RAM on file reads[2].  You could also try
setting use_legacy_dataset to True.  The legacy reader isn't quite so
aggressive with readahead and might use less RAM.  However, I still
don't think you'll be able to do better than reading a single row
group.

[1] https://github.com/apache/arrow/pull/13799
[2] https://issues.apache.org/jira/browse/ARROW-17599

On Fri, Sep 2, 2022 at 8:32 AM Cheng Su  wrote:
>
> Hello,
>
> I am using PyArrow, and encountering an OOM issue when reading the Parquet 
> file. My end goal is to sample just a few rows (~5 rows) from any Parquet 
> file, to estimate in-memory data size of the whole file, based on sampled 
> rows.
>
> We tried the following approaches:
> * `to_batches(batch_size=5)` - 
> https://arrow.apache.org/docs/python/generated/pyarrow.dataset.FileSystemDataset.html#pyarrow.dataset.FileSystemDataset.to_batches
> * `head(num_rows=5, batch_size=5)` - 
> https://arrow.apache.org/docs/python/generated/pyarrow.dataset.Dataset.html#pyarrow.dataset.Dataset.head
>
> But with both approaches, we encountered OOM issues when just reading 5 rows 
> several times from ~2GB Parquet file.Then we tried 
> `to_batches(batch_size=10)`, and it works fine without OOM issue.
>
> I am confused and what to know is the underlying behavior in C++ Arrow 
> Parquet reader, when setting batch_size to be small? I guess there might be 
> some exponential overhead associated with batch_size when its value is small.
>
> Thanks,
> Cheng Su


Re: [C++] How often does Parquet StreamWriter flush to disk?

2022-08-26 Thread Weston Pace
> Does that align with your understanding? If so, then wouldn't the 
> MaxRowGroupSize affect memory usage when writing?

Not really.  I would expect the writer to write a column to disk as
soon as it has accumulated enough data to fill a data page (if not
sooner).  I'm not sure why it would need to buffer up an entire row
group before it starts writing to disk.

> Would an int32 rowgroup have less data pages than an int64 rowgroup?

Yes, each column could have a different number of data pages.

P.S.

As I've recently been reminded, a write is only pushing data from the
process' RSS space into the kernel's page cache.  So if you are
writing a lot of data quickly you may see the system's available RAM
decrease because the page cache is filling up (even though the
process' RSS space remains small).

On Fri, Aug 26, 2022 at 7:48 AM Arun Joseph  wrote:
>
> Hi Weston,
>
> From my understanding, if I'm writing out multiple Gbs of data to disk via 
> StreamWriter, and if the MaxRowGroupSize is defaulted to 512MB, my 
> hypothetical memory usage should be (assuming a single writer):
>
> ~= buffer[read data size] + ~MaxRowGroupSize + SUM(# of RowGroups * 
> SizeOf(Row Group Metadata))
>
> Does that align with your understanding? If so, then wouldn't the 
> MaxRowGroupSize affect memory usage when writing?
>
> Also, while not directly related to what we've been discussing, your 
> explanation of the data pages did raise another question. Since column data 
> types can be different data types, and data pages are a fixed size (e.g. 
> default 1MB), how do mixed-size tables work w.r.t written data pages? Would 
> an int32 rowgroup have less data pages than an int64 rowgroup?
>
> Thank You,
> Arun
>
> On Fri, Aug 26, 2022 at 10:35 AM Weston Pace  wrote:
>>
>> If your goal is to save memory when writing then I wouldn't expect the
>> MaxRowGroupSize to have much effect actually.  However, I have not
>> really studied the parquet writer in depth, so this is theoretical
>> based on the format.
>>
>> Columns in a parquet file are written in row groups, which has a
>> length (# of rows) that all the column chunks in the row group match
>> (i.e. if the row group has a length of 1Mi rows then each column chunk
>> will have 1Mi items).  However, each column chunk is written as a
>> series of data pages.  Data pages are indivisible, so a writer may
>> need to accumulate an entire page's worth of data to persist it to the
>> disk (although, if using a streaming compression algorithm, perhaps
>> this is not required).  Even if this is required a data page is
>> usually quite small.  I believe Arrow defaults a data page to 1MiB.
>> So, at most, I would expect a writer to have to accumulate ~
>> data_pagesize * # columns of RAM.
>>
>> However, I believe the writer also accumulates all row group metadata
>> in memory as well.  I could be wrong on this (perhaps it was the
>> reader) and I don't recall if this is strictly needed (e.g. to
>> populate the footer) or if it is more of a convenience.  This metadata
>> should generally be pretty small but if you shrink the row group size
>> significantly then you might actually see more RAM usage by the
>> parquet writer.
>>
>> If your goal is to save memory when reading then the row group size
>> might matter, depending on how you read the parquet file.  For
>> example, the most common way to read a parquet file in C++-arrow and
>> pyarrow is to read an entire row group all at once.  There are no
>> utilities in pyarrow to read part of a row group or individual data
>> pages, I'm not sure if there are any in C++-arrow or not.  There could
>> be, and I would very much like to see such readers exist someday.  I
>> believe parquet-mr (the Java parquet reader) supports this.  As a
>> result, if a reader doing streaming processing has to read in an
>> entire row group's worth of data, then the row group size will play a
>> large role in how much RAM that streaming reader requires.
>>
>> On Fri, Aug 26, 2022 at 7:05 AM Arun Joseph  wrote:
>> >
>> > Hi Weston,
>> >
>> > Thank you for the clarification! The default 512MB, and the slightly 
>> > smaller writes align with what I've been seeing and after using 
>> > SetMaxRowGroupSize to change the MaxRowGroupSize, I am seeing the expected 
>> > behavior with smaller values.
>> >
>> > In terms of the implications of setting a smaller value for the 
>> > MaxRowGroupSize, is it mainly the increased number of syscalls required to 
>> > persist to disk, or is there anything else that would be a side effect?
>> >
>> > I am pa

Re: [C++] How often does Parquet StreamWriter flush to disk?

2022-08-26 Thread Weston Pace
If your goal is to save memory when writing then I wouldn't expect the
MaxRowGroupSize to have much effect actually.  However, I have not
really studied the parquet writer in depth, so this is theoretical
based on the format.

Columns in a parquet file are written in row groups, which has a
length (# of rows) that all the column chunks in the row group match
(i.e. if the row group has a length of 1Mi rows then each column chunk
will have 1Mi items).  However, each column chunk is written as a
series of data pages.  Data pages are indivisible, so a writer may
need to accumulate an entire page's worth of data to persist it to the
disk (although, if using a streaming compression algorithm, perhaps
this is not required).  Even if this is required a data page is
usually quite small.  I believe Arrow defaults a data page to 1MiB.
So, at most, I would expect a writer to have to accumulate ~
data_pagesize * # columns of RAM.

However, I believe the writer also accumulates all row group metadata
in memory as well.  I could be wrong on this (perhaps it was the
reader) and I don't recall if this is strictly needed (e.g. to
populate the footer) or if it is more of a convenience.  This metadata
should generally be pretty small but if you shrink the row group size
significantly then you might actually see more RAM usage by the
parquet writer.

If your goal is to save memory when reading then the row group size
might matter, depending on how you read the parquet file.  For
example, the most common way to read a parquet file in C++-arrow and
pyarrow is to read an entire row group all at once.  There are no
utilities in pyarrow to read part of a row group or individual data
pages, I'm not sure if there are any in C++-arrow or not.  There could
be, and I would very much like to see such readers exist someday.  I
believe parquet-mr (the Java parquet reader) supports this.  As a
result, if a reader doing streaming processing has to read in an
entire row group's worth of data, then the row group size will play a
large role in how much RAM that streaming reader requires.

On Fri, Aug 26, 2022 at 7:05 AM Arun Joseph  wrote:
>
> Hi Weston,
>
> Thank you for the clarification! The default 512MB, and the slightly smaller 
> writes align with what I've been seeing and after using SetMaxRowGroupSize to 
> change the MaxRowGroupSize, I am seeing the expected behavior with smaller 
> values.
>
> In terms of the implications of setting a smaller value for the 
> MaxRowGroupSize, is it mainly the increased number of syscalls required to 
> persist to disk, or is there anything else that would be a side effect?
>
> I am particularly interested in keeping my memory usage down, so I'm trying 
> to get a better sense of the memory "landscape" that parquet utilizes. Once 
> the row group is persisted to disk, the space that the row group previously 
> utilized in memory should be freed for use once more right?
>
> Thank You,
> Arun Joseph
>
> On Fri, Aug 26, 2022 at 9:36 AM Weston Pace  wrote:
>>
>> The constant DEFAULT_MAX_ROW_GROUP_LENGTH is for
>> parquet::WriterProperties::max_row_group_length and the unit here is #
>> of rows.  This is used by parquet::ParquetFileWriter.  The
>> parquet::StreamWriter class wraps an instance of a file writer and
>> adds the property MaxRowGroupSize.  This units for MaxRowGroupSize is
>> indeed bytes.
>>
>> The max_row_group_length property is only applied when calling
>> ParquetFileWriter::WriteTable.  The stream writer operates at a lower
>> level and never calls this method.  So the stream writer should never
>> be affected by the max_row_group_length property.
>>
>> One thing to keep in mind is that MaxRowGroupSize is an estimate only.
>> With certain encodings it can be rather difficult to know ahead of
>> time how many bytes you will end up writing unless you separate the
>> encoding step from the write step (which would require an extra memcpy
>> I think).  In practice I think the estimators are conservative so you
>> will usually end up with something slightly smaller than 512MB.  If it
>> is significantly smaller you may need to investigate how effective
>> your encodings are and see if that is the cause.
>>
>> On Fri, Aug 26, 2022 at 4:51 AM Arun Joseph  wrote:
>> >
>> > Hi all,
>> >
>> > My understanding of the StreamWriter class is that it would persist Row 
>> > Groups to disk once they exceed a certain size. In the documentation, it 
>> > seems like this size is 512MB, but if I look at 
>> > arrow/include/parquet/properties.h, the DEFAULT_MAX_ROW_GROUP_LENGTH seems 
>> > to be 64MB. Is this reset to 512MB elsewhere? My parquet version is
>> >
>> > #define CREATED_BY_VERSION "parquet-cpp-arrow version 9.0.0-SNAPSHOT
>> >
>> > Thank You,
>> > Arun Joseph
>
>
>
> --
> Arun Joseph
>


Re: [C++] How often does Parquet StreamWriter flush to disk?

2022-08-26 Thread Weston Pace
The constant DEFAULT_MAX_ROW_GROUP_LENGTH is for
parquet::WriterProperties::max_row_group_length and the unit here is #
of rows.  This is used by parquet::ParquetFileWriter.  The
parquet::StreamWriter class wraps an instance of a file writer and
adds the property MaxRowGroupSize.  This units for MaxRowGroupSize is
indeed bytes.

The max_row_group_length property is only applied when calling
ParquetFileWriter::WriteTable.  The stream writer operates at a lower
level and never calls this method.  So the stream writer should never
be affected by the max_row_group_length property.

One thing to keep in mind is that MaxRowGroupSize is an estimate only.
With certain encodings it can be rather difficult to know ahead of
time how many bytes you will end up writing unless you separate the
encoding step from the write step (which would require an extra memcpy
I think).  In practice I think the estimators are conservative so you
will usually end up with something slightly smaller than 512MB.  If it
is significantly smaller you may need to investigate how effective
your encodings are and see if that is the cause.

On Fri, Aug 26, 2022 at 4:51 AM Arun Joseph  wrote:
>
> Hi all,
>
> My understanding of the StreamWriter class is that it would persist Row 
> Groups to disk once they exceed a certain size. In the documentation, it 
> seems like this size is 512MB, but if I look at 
> arrow/include/parquet/properties.h, the DEFAULT_MAX_ROW_GROUP_LENGTH seems to 
> be 64MB. Is this reset to 512MB elsewhere? My parquet version is
>
> #define CREATED_BY_VERSION "parquet-cpp-arrow version 9.0.0-SNAPSHOT
>
> Thank You,
> Arun Joseph


Re: value_counts after group_by

2022-08-26 Thread Weston Pace
I'm happy to spread the word.  The thanks here go to Eduardo Ponce,
Aldrin Montana, and the various reviewers who have all worked hard to
create this doc.

On Fri, Aug 26, 2022 at 6:05 AM Suresh V  wrote:
>
> Hi Weston
>
> Thanks a lot for the response. I tried the list approach a while back to get 
> the group keys in this fashion and run parallel computation at group level 
> and the performance penalty for the dataset of 50m rows was way too high(2s 
> vs 8s).
>
> Thanks a lot for the awesome initiative of teaching people how to create new 
> kernels. This PR is what I was looking for and helps alleviate the learning 
> curve.
>
> Thanks
>
>
>
> On Thu, Aug 25, 2022, 8:45 PM Weston Pace  wrote:
>>
>> > Is there a way to get value_counts of a given column after doing table 
>> > group_by?
>>
>> Is your goal to group by some key and then get the value counts of an
>> entirely different non-key column?  If so, then no, not today, at
>> least not directly.  The only group by node we have is a hash-group-by
>> and this can only accept "hash aggregate functions".  These are
>> defined in [1] and value_counts does not have a "hash aggregate"
>> variant but it does seem like it would make sense.
>>
>> Indirectly, you can use the "list" aggregate function as a sort of escape 
>> hatch:
>>
>> ```
>> import pyarrow as pa
>> import pyarrow.compute as pc
>>
>> tab = pa.Table.from_pydict({
>> 'state': ['Washington', 'Washington', 'Colorado', 'Colorado', 
>> 'Colorado'],
>> 'city': ['Seattle', 'Seattle', 'Denver', 'Colorado Springs', 'Denver'],
>> 'temp': [70, 75, 83, 89, 94]
>> })
>>
>> grouped = pa.TableGroupBy(tab, 'state').aggregate([('city', 'list')])
>> print(grouped)
>>
>> # pyarrow.Table
>> # city_list: list
>> #   child 0, item: string
>> # state: string
>> # 
>> # city_list: [[["Seattle","Seattle"],["Denver","Colorado Springs","Denver"]]]
>> # state: [["Washington","Colorado"]]
>> ```
>>
>> You could then use a for-loop to walk through each cell of city_list
>> and run value_counts on that array.
>>
>> > If its not possible, can you please point me the relevant cpp/python files 
>> > I need to modify for this to work?
>>
>> You would need to create a "hash aggregate function" for value_counts
>> (it would presumably be called hash_value_counts to match the existing
>> pattern).  The starting point for understanding such functions would
>> probably be [2].  Each hash-aggregate kernel consists of 5 different
>> functions (init, resize, consume, merge, and finalize) that you will
>> need to provide.  You can use any of the other hash_* functions as
>> examples for how you might implement these.  Basically, these
>> functions take in a column of values and a column of ids and they
>> update some kind of running state (one per thread).  At the end of the
>> stream the various thread states are merged together and the finalize
>> function turns this final state into an output array.
>>
>> Work is underway on a guide to help with authoring new kernel
>> functions.  The current PR for this guide can be found at [3].
>>
>> [1] 
>> https://arrow.apache.org/docs/cpp/compute.html#grouped-aggregations-group-by
>> [2] 
>> https://github.com/apache/arrow/blob/master/cpp/src/arrow/compute/kernel.h#L678
>> [3] https://github.com/apache/arrow/pull/13933
>>
>> On Thu, Aug 25, 2022 at 10:26 AM Suresh V  wrote:
>> >
>> > Hi,
>> >
>> > Is there a way to get value_counts of a given column after doing table 
>> > group_by?
>> >
>> > If its not possible, can you please point me the relevant cpp/python files 
>> > I need to modify for this to work?
>> >
>> > Thanks
>> >
>> >


Re: value_counts after group_by

2022-08-25 Thread Weston Pace
> Is there a way to get value_counts of a given column after doing table 
> group_by?

Is your goal to group by some key and then get the value counts of an
entirely different non-key column?  If so, then no, not today, at
least not directly.  The only group by node we have is a hash-group-by
and this can only accept "hash aggregate functions".  These are
defined in [1] and value_counts does not have a "hash aggregate"
variant but it does seem like it would make sense.

Indirectly, you can use the "list" aggregate function as a sort of escape hatch:

```
import pyarrow as pa
import pyarrow.compute as pc

tab = pa.Table.from_pydict({
'state': ['Washington', 'Washington', 'Colorado', 'Colorado', 'Colorado'],
'city': ['Seattle', 'Seattle', 'Denver', 'Colorado Springs', 'Denver'],
'temp': [70, 75, 83, 89, 94]
})

grouped = pa.TableGroupBy(tab, 'state').aggregate([('city', 'list')])
print(grouped)

# pyarrow.Table
# city_list: list
#   child 0, item: string
# state: string
# 
# city_list: [[["Seattle","Seattle"],["Denver","Colorado Springs","Denver"]]]
# state: [["Washington","Colorado"]]
```

You could then use a for-loop to walk through each cell of city_list
and run value_counts on that array.

> If its not possible, can you please point me the relevant cpp/python files I 
> need to modify for this to work?

You would need to create a "hash aggregate function" for value_counts
(it would presumably be called hash_value_counts to match the existing
pattern).  The starting point for understanding such functions would
probably be [2].  Each hash-aggregate kernel consists of 5 different
functions (init, resize, consume, merge, and finalize) that you will
need to provide.  You can use any of the other hash_* functions as
examples for how you might implement these.  Basically, these
functions take in a column of values and a column of ids and they
update some kind of running state (one per thread).  At the end of the
stream the various thread states are merged together and the finalize
function turns this final state into an output array.

Work is underway on a guide to help with authoring new kernel
functions.  The current PR for this guide can be found at [3].

[1] https://arrow.apache.org/docs/cpp/compute.html#grouped-aggregations-group-by
[2] 
https://github.com/apache/arrow/blob/master/cpp/src/arrow/compute/kernel.h#L678
[3] https://github.com/apache/arrow/pull/13933

On Thu, Aug 25, 2022 at 10:26 AM Suresh V  wrote:
>
> Hi,
>
> Is there a way to get value_counts of a given column after doing table 
> group_by?
>
> If its not possible, can you please point me the relevant cpp/python files I 
> need to modify for this to work?
>
> Thanks
>
>


Re: Ask some questions about Acero

2022-08-22 Thread Weston Pace
Well, all code has potential bugs :).  We generally do not lower our
quality standards for experimental code.  So it should still be unit
tested, reviewed, etc.  The term is primarily meant to indicate that the
interface could be modified later.  Acero, in particular, has quite a few
tests in place.  There are some known bugs, the most critical of which is
probably a potential crash-on-shutdown bug, which is being worked on.

On Mon, Aug 22, 2022 at 7:56 AM Aldrin  wrote:

> I think it means both
>
>
> Aldrin Montana
> Computer Science PhD Student
> UC Santa Cruz
>
>
> On Mon, Aug 22, 2022 at 2:53 AM 1057445597 <1057445...@qq.com> wrote:
>
>> I noticed that “Acero is experimental and a stable API is not yet
>> guaranteed”
>>
>> Does this mean that Acero's interface could be modified later, or are
>> there potential bugs in the functionality?
>>
>>
>> --
>> 1057445597
>> 1057445...@qq.com
>>
>> 
>>
>>
>


Re: why these two expression not equal

2022-08-18 Thread Weston Pace
I think it is a case of "and" vs. "and_kleene".  Also note that `ToString`
has special cases for the _kleene variants which is why you see "(a and b)"
vs. "and(a, b)".

`and_(field_ref("a"), field_ref("b"))` yields `call("and_kleene",
{field_ref("a"), field_ref("b")})`

On Thu, Aug 18, 2022 at 12:01 AM 1057445597 <1057445...@qq.com> wrote:

> EXPECT_EQ(and_(field_ref("a"), field_ref("b")),
> call("and", {field_ref("a"), field_ref("b")}));
>
>
> Expected equality of these values: and_(field_ref("a"), field_ref("b"))
> Which is: (a and b) call("and", {field_ref("a"), field_ref("b")}) Which is:
> and(a, b)
>
> --
> 1057445597
> 1057445...@qq.com
>
> 
>
>


Re: Issue filtering partitioned Parquet files on partition keys using PyArrow

2022-08-05 Thread Weston Pace
Oh!  I didn't know that.  Thanks.  I think this suggests even more that the
problem is with the discovery taking too long and not with the filter being
applied but I suppose we should benchmark both at some point.

On Thu, Aug 4, 2022 at 5:01 PM David Li  wrote:

> FWIW, we _should_ already perform the "subtree" filtering (see
> subtree_internal.h [1]) so either it's not the bottleneck or the
> optimization is not as effective as we would like. Or possibly we need to
> maintain the files as the tree in the first place instead of trying to
> recover the structure later [2].
>
> [1]:
> https://github.com/apache/arrow/blob/master/cpp/src/arrow/compute/exec/subtree_internal.h
> [2]:
> https://github.com/apache/arrow/blob/60a1919527003a55a39785633bdbbbeef412c362/cpp/src/arrow/dataset/file_base.cc#L207-L268
>
> On Thu, Aug 4, 2022, at 17:30, Weston Pace wrote:
>
> Awesome.
>
> # Partitioning (src/arrow/dataset/partition.h)
>
> The first spot to look at might be to understand the Partitioning class.
> A Partitioning (e.g. hive partitioning, directory partitioning, filename
> partitioning) has two main methods that convert between a path (e.g.
> "/state_code=11/city_code=106/chunk-0.parquet") and an expression (e.g.
> field_ref("state_code") == 11 && field_ref("city_code") == 106).
>
> virtual Result Parse(const std::string& path) const =
> 0;
> virtual Result Format(const compute::Expression&
> expr) const = 0;
>
> We use expressions instead of something simpler like a dictionary of
> key/value pairs.  I believe the intention was to leave the door opening for
> unique partitionings that might map something like "temp=medium" to an
> expression like "30 < field_ref("temp") < 60" but in practice these
> expressions are always a collection of equality expressions and'd together.
>
> One thing I'm fairly certain should work, but we might want to verify (and
> potentially add unit tests), is that hive & directory partitioning can
> correctly convert directories (e.g. "/state_code=11") into expressions and
> that we aren't relying on full paths.
>
> # Discovery (src/arrow/dataset/discovery.h)
>
> The next class is DatasetFactory and, specifically, we are probably
> interested in FileSystemDatasetFactory.  The FileSystemDatasetFactory class
> scans a directory, discovering files.  The FileSystemDatasetFactory can
> also discover a partitioning.  This is all usually transparent in pyarrow
> but there are bindings and you can use this class explicitly:
>
> import pyarrow.dataset as ds
> import pyarrow.fs as fs
> local_fs = fs.LocalFileSystem()
> format = ds.ParquetFileFormat()
> opts =
> ds.FileSystemFactoryOptions(partitioning=ds.HivePartitioning.discover())
> factory = ds.FileSystemDatasetFactory(local_fs,
> fs.FileSelector('/tmp/my_dataset', recursive=True), format, opts)
> sch = factory.inspect()
> my_dataset = factory.finish(sch)
> print(my_dataset.partitioning.schema)
> # state_code: int32
> # city_code: int32
>
> Although, in practice, we usually would write something much shorter:
>
> import pyarrow.dataset as ds
> my_dataset = ds.dataset('/tmp/my_dataset', partitioning='hive') # This one
> line will expand to everything above
>
> If, however, we want to apply a filter while we are discovering the
> dataset, then we cannot rely on dataset discovery to also discover our
> partition.  We will have to specify it manually (or you could run dataset
> discovery once, discover the partitioning, and then run it many times using
> this discovered partition).  So I think, in the end, we want to be able to
> support something like this:
>
> import pyarrow.dataset as ds
> partitioning = ds.HivePartitioning(pa.schema([pa.field('state_code',
> pa.int32()), pa.field('city_code', pa.int32())]))
> filter = (ds.field('state_code') == 31) & (ds.field('city_code') == 6200)
> ds = ds.dataset('/tmp/my_dataset', partitioning=partitioning,
> filter=filter)
>
> This will require making changes to FileSystemDatasetFactory.  We will
> want to add "filter" to FileSystemFactoryOptions...
>
> struct FileSystemFactoryOptions {
>
>   PartitioningOrFactory partitioning{Partitioning::Default()};
>   std::string partition_base_dir;
>   bool exclude_invalid_files = false;
>   std::vector selector_ignore_prefixes = {".", "_",};
>   // `compute::literal(true)` is often used as a "default" value for a
> filter
>   compute::Expression filter = compute::literal(true);
> };
>
> * If a filter is specified (is not literal(true)) then partitioning MUST
> be a partitioning (and not a partitioning factory).
>
>

Re: Issue filtering partitioned Parquet files on partition keys using PyArrow

2022-08-04 Thread Weston Pace
te_code in range(100):
for city_code in range(100):
pq_dir = os.path.join(tempdir, f'state_code={state_code}',
f'city_code={city_code}')
os.makedirs(pq_dir)
pq_path = os.path.join(pq_dir, 'chunk-0.parquet')
pq.write_table(simple_table, pq_path)

start = time.time()
partitioning = ds.HivePartitioning(pa.schema([pa.field('state_code',
pa.int32()), pa.field('city_code', pa.int32())]))
my_dataset = ds.dataset(tempdir, partitioning=partitioning)
end = time.time()
print(f'Elapsed: {end - start}s')

# Approach 2, faster filtering

There is an entirely different approach that could be taken which wouldn't
speed up the discovery at all, but should speed up the filtering.  In this
approach you could modify FileSystemDataset so that, instead of storing a
flat list of FileFragment objects, it stored a tree of FileFragment
objects.  Each node of the tree would have its own partitioning
expression.  Then, GetFragments(predicate) could walk the tree (in DFS
order), skipping entire nodes that fail the predicate.

On Thu, Aug 4, 2022 at 9:54 AM Tomaz Maia Suller 
wrote:

> Weston, I'm interested in following up.
>
>
> --
> *De:* Weston Pace 
> *Enviado:* quinta-feira, 4 de agosto de 2022 12:15
> *Para:* user@arrow.apache.org 
> *Assunto:* Re: Issue filtering partitioned Parquet files on partition
> keys using PyArrow
>
> Você não costuma receber emails de weston.p...@gmail.com. Saiba por que
> isso é importante <https://aka.ms/LearnAboutSenderIdentification>
> *[EXTERNAL EMAIL]*
>
> There is a lot of room for improvement here.  In the datasets API the call
> that you have described (read_parquet) is broken into two steps:
>
>  * dataset discovery
>
> During dataset discovery we don't use any partition filter.  The goal is
> to create the "total dataset" of all the files.  So in your case this means
> listing out all 150,120 directories.  For every file we discover we capture
> a partition expression for this file.  This is probably where the bulk of
> time is being spent (listing the directories).
>
>  * dataset read
>
> During the dataset read we apply the partition filter.  So we are going to
> iterate through all ~150k files and compare the filter expression with the
> previously captured partition expression, eliminating files that don't
> match.  In this phase we don't have any idea of the original directory
> structure.  So instead of performing 27 top-level comparisons + 5560
> second-level comparisons we end up having to calculate all 150k comparisons.
>
> Both of these steps are considerably longer than they need to be.  If I
> were to guess I would guess that a majority of the time is spent in that
> first step but I don't think the time spent in that second step is
> negligible.
>
> One fairly straightforward solution would be to allow the partition filter
> to be used during dataset discovery.  This would yield a much smaller
> dataset so step 2 would be much faster but it could also allow the
> discovery process to skip entire directories.  If anyone is interested in
> working on a fix for this I'd be happy to point them at the files that will
> need to be changed and go into a more detailed discussion of potential
> solutions.
>
>
> On Thu, Aug 4, 2022 at 5:53 AM Tomaz Maia Suller 
> wrote:
>
> Hi David,
>
> I wonder if the problem with the attachments has to do with the files not
> having extensions... I'm trying to send them with .prof this time.
>
> Anyway:
>
>1. I'm writing to a local filesystem; I've mounted a NFTS partition
>which is on a HDD. Since the dataset is only ~1.5 GB, I'll try to move it
>to the SSD I have available and see if I get lower access times.
>2. I'm using trying to use ParquetDataset; though I'm using it
>directly most of the time, i.e. I'm using Pandas which then itself uses (if
>I understood it correctly) ParquetDataset.
>
> I've tried accessing with both the legacy and new versions of the API,
> according to that use_legacy_dataset parameter. The legacy API is
> significantly faster, with access time of about 1 second, though still
> ridiculously slow compared to accessing the path straight away.
>
> If the attachments still don't work for some reason, I'll write up what I
> ran:
>
> >>> pq_command_new = "pq.ParquetDataset('.', filters=[('state_code', '==',
> 31), ('city_code', '==', 6200)], use_legacy_dataset=False)"
> >>> pq_command_old = "pq.ParquetDataset('.', filters=[('state_code', '==',
> 31), ('city_code', '==', 6200)], use_legacy_dataset=True)"
> >>> pq_baseline = "pq.ParquetDataset('./state_code=31/city_code=6200')"
>
> >>> cPro

Re: Issue filtering partitioned Parquet files on partition keys using PyArrow

2022-08-04 Thread Weston Pace
There is a lot of room for improvement here.  In the datasets API the call
that you have described (read_parquet) is broken into two steps:

 * dataset discovery

During dataset discovery we don't use any partition filter.  The goal is to
create the "total dataset" of all the files.  So in your case this means
listing out all 150,120 directories.  For every file we discover we capture
a partition expression for this file.  This is probably where the bulk of
time is being spent (listing the directories).

 * dataset read

During the dataset read we apply the partition filter.  So we are going to
iterate through all ~150k files and compare the filter expression with the
previously captured partition expression, eliminating files that don't
match.  In this phase we don't have any idea of the original directory
structure.  So instead of performing 27 top-level comparisons + 5560
second-level comparisons we end up having to calculate all 150k comparisons.

Both of these steps are considerably longer than they need to be.  If I
were to guess I would guess that a majority of the time is spent in that
first step but I don't think the time spent in that second step is
negligible.

One fairly straightforward solution would be to allow the partition filter
to be used during dataset discovery.  This would yield a much smaller
dataset so step 2 would be much faster but it could also allow the
discovery process to skip entire directories.  If anyone is interested in
working on a fix for this I'd be happy to point them at the files that will
need to be changed and go into a more detailed discussion of potential
solutions.


On Thu, Aug 4, 2022 at 5:53 AM Tomaz Maia Suller 
wrote:

> Hi David,
>
> I wonder if the problem with the attachments has to do with the files not
> having extensions... I'm trying to send them with .prof this time.
>
> Anyway:
>
>1. I'm writing to a local filesystem; I've mounted a NFTS partition
>which is on a HDD. Since the dataset is only ~1.5 GB, I'll try to move it
>to the SSD I have available and see if I get lower access times.
>2. I'm using trying to use ParquetDataset; though I'm using it
>directly most of the time, i.e. I'm using Pandas which then itself uses (if
>I understood it correctly) ParquetDataset.
>
> I've tried accessing with both the legacy and new versions of the API,
> according to that use_legacy_dataset parameter. The legacy API is
> significantly faster, with access time of about 1 second, though still
> ridiculously slow compared to accessing the path straight away.
>
> If the attachments still don't work for some reason, I'll write up what I
> ran:
>
> >>> pq_command_new = "pq.ParquetDataset('.', filters=[('state_code', '==',
> 31), ('city_code', '==', 6200)], use_legacy_dataset=False)"
> >>> pq_command_old = "pq.ParquetDataset('.', filters=[('state_code', '==',
> 31), ('city_code', '==', 6200)], use_legacy_dataset=True)"
> >>> pq_baseline = "pq.ParquetDataset('./state_code=31/city_code=6200')"
>
> >>> cProfile.run(pq_command_new, '/tmp/pq_legacy_false.prof')
> This took about 17 seconds.
>
> >>> cProfile.run(pq_command_old, '/tmp/pq_legacy_true.prof')
> This took about 1 second.
>
> >>> cProfile.run(pq_baseline, '/tmp/pq_legacy_true.prof')
> This took 0.0075 second.
>
> These runs were all after the first run after booting up the computer,
> which took over 500 seconds as I've said.
>
> I'm starting to think I should send this to the development mailing list
> rather than the user one, since the obvious solution is specifying the
> paths directly rather than trying to use the API.
> --
> *De:* Lee, David 
> *Enviado:* quarta-feira, 3 de agosto de 2022 19:49
> *Para:* user@arrow.apache.org 
> *Assunto:* RE: Issue filtering partitioned Parquet files on partition
> keys using PyArrow
>
> Você não costuma receber emails de david@blackrock.com. Saiba por que
> isso é importante 
> *[EXTERNAL EMAIL]*
>
> The attachments didn’t come through properly..
>
>
>
> I’ve got additional questions.
>
>
>
>1. What filesystem are these files stored on? I’ve seen issues using
>S3 if HEAD operations aren’t prioritized. I’m assuming that without HEAD
>operations you can’t effectively scan a parquet file’s footer and reading
>the entire file isn’t efficient.
>
>
>
> *available (eventual consistency for HEAD operations)* Behaves the same
> as the “read-after-new-write” consistency level, but only provides eventual
> consistency for HEAD operations. Offers higher availability for HEAD
> operations than “read-after-new-write” if Storage Nodes are unavailable.
> Differs from AWS S3 consistency guarantees for HEAD operations only.
>
>
> 2.Are you using pyarrow.parquet.ParquetDataset or pyarrow.dataset?
>
> https://arrow.apache.org/docs/python/parquet.html
>
> *Note*
>
> The ParquetDataset is being reimplemented based on the new generic Dataset
> API (see the Tabular Datasets

Re: [C++][arrow::compute::Expression]Is there a way to construct arrow::compute::Expression from the evaluated Expression string?

2022-07-18 Thread Weston Pace
No.  There is no text format for expressions or related parser.  I believe
the Substrait project still would like to invent a text format someday and
expressions would need to be a part of that so I think there is some
interest.

In the meantime a PR here (or at the Substrait project to align on spec)
would be welcome.

As a workaround there are C++ utilities for creating expressions:

call("subtract", {field_ref("a210"), call("multiply", {call("divide",
{field_ref("a210"), literal(203)}), literal(203)})})

There is also Isthmus which can go from SQL -> Substrait and then you we
could pretty easily expose Substrait expression parsing as its own
standalone thing (returning arrow::compute::Expression)

On Mon, Jul 18, 2022 at 3:52 AM 1057445597 <1057445...@qq.com> wrote:

> I noticed that the class arrow::compute::Expression have a ToString member
> function, is there good way from a string mathematical Expression to
> construct a arrow::compute::Expression?
>
> (subtract(a210, multiply(divide(a210, 203), 203)) == 0)
>
> --
> 1057445597
> 1057445...@qq.com
>
> 
>
>


Re: [Python] Cast decimal to string

2022-07-11 Thread Weston Pace
I've added [1].  I agree, it should be a fairly easy fix, but requires
understanding where all the casting code lives.
arrow/compute/kernels/scalar_cast_string.cc would be a good place to
start if anyone is interested.  We have decimal->string methods in
arrow/util/decimal.h which can be used.
[1] https://issues.apache.org/jira/browse/ARROW-17042

On Mon, Jul 11, 2022 at 6:58 AM Wes McKinney  wrote:
>
> Would someone like to open a Jira issue about this? This seems like an
> easy rough edge to fix
>
> On Wed, Jul 6, 2022 at 12:44 PM Weston Pace  wrote:
> >
> > If precision is not important you can cast the column to float64 first.
> >
> > >>> x = pa.array([1, 2, 3], type=pa.decimal128(6, 1))
> > >>> x.cast(pa.float64()).cast(pa.string())
> > 
> > [
> >   "1",
> >   "2",
> >   "3"
> > ]
> >
> > If precision is important you could use python or pandas to do the
> > conversion to string.
> >
> > >>> pa.array([str(v) for v in x.to_pylist()])
> > 
> > [
> >   "1.0",
> >   "2.0",
> >   "3.0"
> > ]
> >
> > On Wed, Jul 6, 2022 at 5:45 AM François Pacull
> >  wrote:
> > >
> > > Dear Arrow team and users, I have a simple question regarding the decimal 
> > > data type with pyarrow. I am trying to cast a table with decimal columns 
> > > to string, or to write it to a csv file. In both cases I get the error 
> > > message:
> > >
> > > pyarrow.lib.ArrowNotImplementedError: Unsupported cast from 
> > > decimal128(18, 9) to utf8 using function cast_string
> > >
> > > I understand that is not implemented yet, but is there by chance a way to 
> > > get around this?
> > > Thanks, François.
> > >
> > > PS: I am using Python : 3.9.13 & pyarrow : 8.0.0
> > > Here is a code snippet:
> > >
> > > import decimal
> > >
> > > import pyarrow as pa
> > > import pyarrow.compute as pc
> > > import pyarrow.csv
> > >
> > > PREC, SCAL = 18, 9  # decimal precision & scale
> > >
> > > context = decimal.getcontext()
> > > context.prec = PREC
> > > ref_decimal = decimal.Decimal('0.123456789')
> > >
> > > float_numbers = [0.1, 654.5, 4.65742]
> > > decimal_numbers = [
> > > decimal.Decimal(str(f)).quantize(ref_decimal) for f in float_numbers
> > > ]
> > >
> > > pa_arr_dec = pa.array(
> > > decimal_numbers, type=pa.decimal128(precision=PREC, scale=SCAL)
> > > )
> > > pa_arr_str = pc.cast(pa_arr_dec, pa.string())
> > >
> > >
> > >   Traceback (most recent call last):
> > > File "/home/francois/Workspace/.../scripts/pyarrow_decimal.py", line 
> > > 21, in 
> > >   pa_arr_str = pc.cast(pa_arr_dec, pa.string())
> > > File 
> > > "/home/francois/miniconda3/envs/tableau2/lib/python3.9/site-packages/pyarrow/compute.py",
> > >  line 376, in cast
> > >   return call_function("cast", [arr], options)
> > > File "pyarrow/_compute.pyx", line 542, in 
> > > pyarrow._compute.call_function
> > > File "pyarrow/_compute.pyx", line 341, in 
> > > pyarrow._compute.Function.call
> > > File "pyarrow/error.pxi", line 144, in 
> > > pyarrow.lib.pyarrow_internal_check_status
> > > File "pyarrow/error.pxi", line 121, in pyarrow.lib.check_status
> > >   pyarrow.lib.ArrowNotImplementedError: Unsupported cast from 
> > > decimal128(18, 9) to utf8 using function cast_string


Re: [Python] Cast decimal to string

2022-07-06 Thread Weston Pace
If precision is not important you can cast the column to float64 first.

>>> x = pa.array([1, 2, 3], type=pa.decimal128(6, 1))
>>> x.cast(pa.float64()).cast(pa.string())

[
  "1",
  "2",
  "3"
]

If precision is important you could use python or pandas to do the
conversion to string.

>>> pa.array([str(v) for v in x.to_pylist()])

[
  "1.0",
  "2.0",
  "3.0"
]

On Wed, Jul 6, 2022 at 5:45 AM François Pacull
 wrote:
>
> Dear Arrow team and users, I have a simple question regarding the decimal 
> data type with pyarrow. I am trying to cast a table with decimal columns to 
> string, or to write it to a csv file. In both cases I get the error message:
>
> pyarrow.lib.ArrowNotImplementedError: Unsupported cast from 
> decimal128(18, 9) to utf8 using function cast_string
>
> I understand that is not implemented yet, but is there by chance a way to get 
> around this?
> Thanks, François.
>
> PS: I am using Python : 3.9.13 & pyarrow : 8.0.0
> Here is a code snippet:
>
> import decimal
>
> import pyarrow as pa
> import pyarrow.compute as pc
> import pyarrow.csv
>
> PREC, SCAL = 18, 9  # decimal precision & scale
>
> context = decimal.getcontext()
> context.prec = PREC
> ref_decimal = decimal.Decimal('0.123456789')
>
> float_numbers = [0.1, 654.5, 4.65742]
> decimal_numbers = [
> decimal.Decimal(str(f)).quantize(ref_decimal) for f in float_numbers
> ]
>
> pa_arr_dec = pa.array(
> decimal_numbers, type=pa.decimal128(precision=PREC, scale=SCAL)
> )
> pa_arr_str = pc.cast(pa_arr_dec, pa.string())
>
>
>   Traceback (most recent call last):
> File "/home/francois/Workspace/.../scripts/pyarrow_decimal.py", line 21, 
> in 
>   pa_arr_str = pc.cast(pa_arr_dec, pa.string())
> File 
> "/home/francois/miniconda3/envs/tableau2/lib/python3.9/site-packages/pyarrow/compute.py",
>  line 376, in cast
>   return call_function("cast", [arr], options)
> File "pyarrow/_compute.pyx", line 542, in pyarrow._compute.call_function
> File "pyarrow/_compute.pyx", line 341, in pyarrow._compute.Function.call
> File "pyarrow/error.pxi", line 144, in 
> pyarrow.lib.pyarrow_internal_check_status
> File "pyarrow/error.pxi", line 121, in pyarrow.lib.check_status
>   pyarrow.lib.ArrowNotImplementedError: Unsupported cast from decimal128(18, 
> 9) to utf8 using function cast_string


Re: Arrow FunctionRegsitry usage in Python

2022-06-23 Thread Weston Pace
> I was wondering if it is possible to add a C++ Function to the Compute 
> FunctionRegistry and then use the functions in python.

Yes.  This is partly handled automatically I believe when functions
are added to the C++ function registry.  There are generally two ways
to call the function.  For example, consider the "take" function:

```
>>> import pyarrow as pa
>>> import pyarrow.compute as pc
>>> arr = pa.array([1, 2, 3])
>>> idxs = pa.array([1, 2])
>>> arr.take(idxs)

[
  2,
  3
]
>>> pc.call_function('take', [arr, idxs])

[
  2,
  3
]
```

The first method (`arr.take(idxs)`) is shorthand for the second.  I
believe these convenience shorthands have to be manually added to
pyarrow for each new function.  The second method (`pc.call_function`)
is more generic and I don't think you need to do anything special to
call a function in this way once it has been added to the function
registry.

> Would be great if you could provide examples of such usage.

Some of these shorthand methods are shown in the cookbook[1].
However, I don't see anything geared towards creating new functions
and calling them.  Can you create a JIRA issue (or, even better, a PR)
describing what you'd like to see?

> Also are all functions added to the FunctionRegistry only callable using the 
> GetFunction API with the function name as string ? Would like to know if 
> there is a way to just do arrow::compute::FuncA where FuncA is the newly 
> added function

The C++ API is similar to python.  Some functions have been manually
given a shortcut.  For example, the Take function has a shortcut
here[2].  However, this does not happen automatically and not all
functions have shortcuts.

[1] https://arrow.apache.org/cookbook/py/data.html
[2] 
https://github.com/apache/arrow/blob/86915807af6fe10f44bc881e57b2f425f97c56c7/cpp/src/arrow/compute/api_vector.h#L352

On Tue, Jun 21, 2022 at 6:31 PM Murali S  wrote:
>
> Hi ,
>
> I was wondering if it is possible to add a C++ Function to the Compute 
> FunctionRegistry and then use the functions in python. Would be great if you 
> could provide examples of such usage.
> Also are all functions added to the FunctionRegistry only callable using the 
> GetFunction API with the function name as string ? Would like to know if 
> there is a way to just do arrow::compute::FuncA where FuncA is the newly 
> added function
>
> Thanks in advance
> Mura
>
>


Re: Arrow compute/dataset design doc missing

2022-06-09 Thread Weston Pace
> Hi, do you guys know which functions support vectorized SIMD in arrow compute?

I don't know that anyone has done a fully systematic analysis of which
kernels support and do not support SIMD at the moment.  The kernels
are still in flux.  There is an active effort to reduce overhead[1]
which is the top priority as this could possibly have more impact on
performance than SIMD when running expressions involving multiple
kernels across multiple threads.

> I only found very little functions support vectorized SIMD:
> ● bloom filter: avx2 ● key compare: avx2 ● key hash: avx2 ● key map: avx2
>
> Does scalar operation support vectorized SIMD?

A lack of explicit vectorization instructions does not mean a lack of
SIMD support.  For many kernels we expect modern compilers to be smart
enough to automatically implement vectorization as long as the data is
provided in a vectorized fashion (e.g. columnar) and the kernel is
simple enough.  For more complex kernels there are options such as
xsimd but this hasn't yet been very thoroughly explored.  At the
moment I'm not aware of anyone writing explicitly vectorized kernels
as this tends to be rather hardware specific and have a small return
on investment.  Instead, we benchmark regularly and have
micro-optimized certain critical sections (e.g. some of the hash
stuff).

> I tested with numpy and found arrow is ten times slower:

That result you posted appears to be 3.5x slower.  You might want to
double check and ensure that Arrow was compiled with the appropriate
architecture (the cmake files are generally good at figuring this out)
but I wouldn't be too surprised if this was the case.  Some of this
might be unavoidable.  For example, does numpy support null values (I
don't know for sure but I seem to recall it does not)?  Some of this
might be an inefficiency or overhead problem in Arrow-C++.  It is
possible that the add kernel is not being vectorized correctly by the
compiler but I don't think those numbers alone are enough proof of
that.

Performance can be quite tricky.  It is important for us but Arrow's
compute functionality is still relatively new compared to numpy and
work on performance is balanced with work on features.

[1] https://lists.apache.org/thread/rh10ykcolt0gxydhgv4vxk2m7ktwx5mh

On Wed, Jun 8, 2022 at 11:08 PM Shawn Yang  wrote:
>
> Hi, do you guys know which functions support vectorized SIMD in arrow 
> compute? After a quick look as arrow compute cpp code, I only found very 
> little functions support vectorized SIMD:
> ● bloom filter: avx2 ● key compare: avx2 ● key hash: avx2 ● key map: avx2
>
> Does scalar operation support vectorized SIMD?
>
> I tested with numpy and found arrow is ten times slower:
>
> def test_multiply(rows=500):
> a = pa.array(list(range(rows, 0, -1)))
> b = pa.array(list(range(rows)))
> import pyarrow.compute as pc
>
> print("arrow multiply took", timeit.timeit(
> lambda: pc.multiply(a, b), number=3))
> a = np.array(list(range(rows, 0, -1)))
> b = np.array(list(range(rows)))
> print("numpy multiply took", timeit.timeit(
> lambda: a * b, number=3))
> # arrow multiply took 0.1482605700015
> # numpy multiply took 0.0404705130071
>
>
> On Wed, May 25, 2022 at 10:09 PM Shawn Yang  wrote:
>>
>> I see, the key for multiple loop is to ensure the data can be hold in l2 
>> cache, so that later
>> calculation can process this batch without reading from the main memory, and 
>> we can record the exec stats for every batch , and do better local task 
>> scheduling based on those stats.  Thanks a lot. Morsels is new to me, very 
>> interesting ideas
>> Sent from my iPhone
>>
>> > On May 25, 2022, at 7:23 AM, Weston Pace  wrote:
>> >
>> > There are a few levels of loops.  Two at the moment and three in the
>> > future.  Some are fused and some are not.  What we have right now is
>> > early stages, is not ideal, and there are people investigating and
>> > working on improvements.  I can speak a little bit about where we want
>> > to go.  An example may be helpful.
>> >
>> > For example, given a filter "x < 100 && x > 0" we have something like
>> > (this is an approximation of the work done by
>> > arrow::compute::ExecuteScalarExpression and not actual code):
>> >
>> > ```
>> > for batch_of_128k_rows in data:
>> >auto lt_one_hundred = less_than(batch_of_128k_rows, 100)
>> >auto gt_zero = greater_than(batch_of_128k_rows, 0)
>> >auto filter_pred = and(lt_one_hundred, gt_zero)
>> >consume(filter(batch_of_128k_rows, filter_pred))
>> > ```
>> >
>> > There are two big things 

Re: [Python] Pyarrow Computation inplace?

2022-05-31 Thread Weston Pace
I'd be more interested in some kind of buffer / array pool plus the
ability to specify an output buffer for a kernel function.  I think it
would achieve the same goal (avoiding allocation) with more
flexibility (e.g. you wouldn't have to overwrite your input buffer).

At the moment though I wonder if this is a concern.  Jemalloc should
do some level of memory reuse.  Is there a specific performance issue
you are encountering?

On Tue, May 31, 2022 at 11:45 AM Wes McKinney  wrote:
>
> *In principle*, it would be possible to provide mutable output buffers
> for a kernel's execution, so that input and output buffers could be
> the same (essentially exposing the lower-level kernel execution
> interface that underlies arrow::compute::CallFunction). But this would
> be a fair amount of development work to achieve. If there are others
> interested in exploring an implementation, we could create a Jira
> issue.
>
> On Sun, May 29, 2022 at 3:04 PM Micah Kornfield  wrote:
> >
> > I think even in cython this might be difficult as Array data structures are 
> > generally considered immutable, so this is inherently unsafe, and requires 
> > doing with care.
> >
> > On Sun, May 29, 2022 at 11:21 AM Cedric Yau  wrote:
> >>
> >> Suppose I have an array with 1MM integers and I add 1 to them with 
> >> pyarrow.compute.add.  It looks like a new array is assigned.
> >>
> >> Is there a way to do this inplace?  It looks like a new array is 
> >> allocated.  Would cython be required at this point?
> >>
> >> ```
> >> import pyarrow as pa
> >> import pyarrow.compute as pc
> >>
> >> a = pa.array(range(100))
> >> print(id(a))
> >> a = pc.add(a,1)
> >> print(id(a))
> >>
> >> # output
> >> # 139634974909024
> >> # 139633492705920
> >> ```
> >>
> >> Thanks,
> >> Cedric


Re: cpp Windows 32 build (re-subscribed with my work email)

2022-05-26 Thread Weston Pace
Arrow C++ depends on a number of 3rd party libraries.  Often this is
invisible because, by default, Arrow will do this:

 * Check to see if the library is installed on the system
 * If it is not installed on the system, download and build the library

So for gflags it appears gflags was not installed by the system.
Later, Arrow attempted to build it from source, and it did build
something:

> -- Building gflags from source
> -- Added static library dependency gflags::gflags_static: 
> C:/Users/avertleyb/git/arrow/cpp/build/gflags_ep-prefix/src/gflags_ep/lib/gflags_static.lib

However, this library does not appear to be getting picked up by the
linker.  My guess is that Arrow's "build gflags from source" routine
(build_gflags in ThirdPartyToolchain.cmake) is not passing the correct
flags to build a 32-bit version of gflags.  It would probably be good
to confirm this (I'm not a build expert and would not be surprised if
I'm wrong).

Assuming this is the case, the options at this point would be to fix
the build_gflags function or to build gflags yourself and install it
somewhere that Arrow's build can pick it up.

You could probably file a JIRA on this point but there has not been
much motivation in the past to fix win32 issues.

On Thu, May 26, 2022 at 7:12 AM Arkadiy Vertleyb (BLOOMBERG/ 120 PARK)
 wrote:
>
> Sure it's possible since I don't even know what it is... I probably don't 
> have a 64bit version either.
>
> Is this some kind of pre-requsite?
>
> Thanks,
> Arkadiy
>
> From: user@arrow.apache.org At: 05/26/22 13:07:06 UTC-4:00
> To: Arkadiy Vertleyb (BLOOMBERG/ 120 PARK ) , user@arrow.apache.org
> Subject: Re: cpp Windows 32 build (re-subscribed with my work email)
>
> It appears that it cannot find gflags and that is causing the build to
> fail. Is it possible you do not have a 32bit version of gflags
> installed?
>
> > CMake Warning at cmake_modules/FindgflagsAlt.cmake:25 (find_package):
> > By not providing "Findgflags.cmake" in CMAKE_MODULE_PATH this project has
> > asked CMake to find a package configuration file provided by "gflags", but
> > CMake did not find one.
> >
> > Could not find a package configuration file provided by "gflags" (requested
> > version 2.1.0) with any of the following names:
> >
> > gflagsConfig.cmake
> > gflags-config.cmake
> >
> > Add the installation prefix of "gflags" to CMAKE_PREFIX_PATH or set
> > "gflags_DIR" to a directory containing one of the above files. If "gflags"
> > provides a separate development package or SDK, be sure it has been
> > installed.
> > Call Stack (most recent call first):
> > cmake_modules/ThirdpartyToolchain.cmake:243 (find_package)
> > cmake_modules/ThirdpartyToolchain.cmake:1399 (resolve_dependency)
> > CMakeLists.txt:567 (include)
> > ...
> > LINK : fatal error LNK1181: cannot open input file
> 'C:\Users\avertleyb\git\arrow\cpp\build\gflags_ep-prefix\src\gflags_ep\lib\gflag
> s_static.lib'
> [C:\Users\avertleyb\git\arrow\cpp\build\src\arrow\arrow_bundled_dependencies.vcx
> proj]
> > C:\Program Files (x86)\Microsoft Visual
> Studio\2019\Professional\MSBuild\Microsoft\VC\v160\Microsoft.CppCommon.targets(2
> 41,5): error MSB8066: Custom build for
> 'C:\Users\avertleyb\git\arrow\cpp\build\CMakeFiles\b033194e6d32d6a2595cc88c82
> >
> 72e4b2\arrow_bundled_dependencies.lib.rule;C:\Users\avertleyb\git\arrow\cpp\buil
> d\CMakeFiles\672df30e18a621ddf9c15292835268fd\arrow_bundled_dependencies.rule'
> exited with code 1181.
>
> On Thu, May 26, 2022 at 5:13 AM Arkadiy Vertleyb (BLOOMBERG/ 120 PARK)
>  wrote:
> >
> > Hello Kou.
> >
> > Thanks for answering my original question.
> >
> > The reason I need 32 bit is that our product supports both, so I have to be
> able to do both.
> >
> > Here is what I got during the compile:
> >
> > C:\Users\avertleyb\git\arrow\cpp\build>cmake .. -G "Visual Studio 16 2019" 
> > -A
> Win32 -DARROW_BUILD_TESTS=ON
> > -- Building using CMake version: 3.20.21032501-MSVC_2
> > -- Selecting Windows SDK version 10.0.19041.0 to target Windows 10.0.19042.
> > -- The C compiler identification is MSVC 19.29.30143.0
> > -- The CXX compiler identification is MSVC 19.29.30143.0
> > -- Detecting C compiler ABI info
> > -- Detecting C compiler ABI info - done
> > -- Check for working C compiler: C:/Program Files (x86)/Microsoft Visual
> Studio/2019/Professional/VC/Tools/MSVC/14.29.30133/bin/Hostx64/x86/cl.exe -
> skipped
> > -- Detecting C compile features
> > -- Detecting C compile features - done
> > -- Detecting CXX compiler ABI info
> > -- Detecting CXX compiler ABI info - done
> > -- Check for working CXX compiler: C:/Program Files (x86)/Microsoft Visual
> Studio/2019/Professional/VC/Tools/MSVC/14.29.30133/bin/Hostx64/x86/cl.exe -
> skipped
> > -- Detecting CXX compile features
> > -- Detecting CXX compile features - done
> > -- Arrow version: 9.0.0 (full: '9.0.0-SNAPSHOT')
> > -- Arrow SO version: 900 (full: 900.0.0)
> > -- clang-tidy 12 not found
> > -- clang-format 12 not found
> > -- Could NOT find ClangTools (missing: 

Re: cpp Windows 32 build (re-subscribed with my work email)

2022-05-26 Thread Weston Pace
It appears that it cannot find gflags and that is causing the build to
fail.  Is it possible you do not have a 32bit version of gflags
installed?

> CMake Warning at cmake_modules/FindgflagsAlt.cmake:25 (find_package):
> By not providing "Findgflags.cmake" in CMAKE_MODULE_PATH this project has
> asked CMake to find a package configuration file provided by "gflags", but
> CMake did not find one.
>
> Could not find a package configuration file provided by "gflags" (requested
> version 2.1.0) with any of the following names:
>
> gflagsConfig.cmake
> gflags-config.cmake
>
> Add the installation prefix of "gflags" to CMAKE_PREFIX_PATH or set
> "gflags_DIR" to a directory containing one of the above files. If "gflags"
> provides a separate development package or SDK, be sure it has been
> installed.
> Call Stack (most recent call first):
> cmake_modules/ThirdpartyToolchain.cmake:243 (find_package)
> cmake_modules/ThirdpartyToolchain.cmake:1399 (resolve_dependency)
> CMakeLists.txt:567 (include)
> ...
> LINK : fatal error LNK1181: cannot open input file 
> 'C:\Users\avertleyb\git\arrow\cpp\build\gflags_ep-prefix\src\gflags_ep\lib\gflags_static.lib'
>  
> [C:\Users\avertleyb\git\arrow\cpp\build\src\arrow\arrow_bundled_dependencies.vcxproj]
> C:\Program Files (x86)\Microsoft Visual 
> Studio\2019\Professional\MSBuild\Microsoft\VC\v160\Microsoft.CppCommon.targets(241,5):
>  error MSB8066: Custom build for 
> 'C:\Users\avertleyb\git\arrow\cpp\build\CMakeFiles\b033194e6d32d6a2595cc88c82
> 72e4b2\arrow_bundled_dependencies.lib.rule;C:\Users\avertleyb\git\arrow\cpp\build\CMakeFiles\672df30e18a621ddf9c15292835268fd\arrow_bundled_dependencies.rule'
>  exited with code 1181.

On Thu, May 26, 2022 at 5:13 AM Arkadiy Vertleyb (BLOOMBERG/ 120 PARK)
 wrote:
>
> Hello Kou.
>
> Thanks for answering my original question.
>
> The reason I need 32 bit is that our product supports both, so I have to be 
> able to do both.
>
> Here is what I got during the compile:
>
> C:\Users\avertleyb\git\arrow\cpp\build>cmake .. -G "Visual Studio 16 2019" -A 
> Win32 -DARROW_BUILD_TESTS=ON
> -- Building using CMake version: 3.20.21032501-MSVC_2
> -- Selecting Windows SDK version 10.0.19041.0 to target Windows 10.0.19042.
> -- The C compiler identification is MSVC 19.29.30143.0
> -- The CXX compiler identification is MSVC 19.29.30143.0
> -- Detecting C compiler ABI info
> -- Detecting C compiler ABI info - done
> -- Check for working C compiler: C:/Program Files (x86)/Microsoft Visual 
> Studio/2019/Professional/VC/Tools/MSVC/14.29.30133/bin/Hostx64/x86/cl.exe - 
> skipped
> -- Detecting C compile features
> -- Detecting C compile features - done
> -- Detecting CXX compiler ABI info
> -- Detecting CXX compiler ABI info - done
> -- Check for working CXX compiler: C:/Program Files (x86)/Microsoft Visual 
> Studio/2019/Professional/VC/Tools/MSVC/14.29.30133/bin/Hostx64/x86/cl.exe - 
> skipped
> -- Detecting CXX compile features
> -- Detecting CXX compile features - done
> -- Arrow version: 9.0.0 (full: '9.0.0-SNAPSHOT')
> -- Arrow SO version: 900 (full: 900.0.0)
> -- clang-tidy 12 not found
> -- clang-format 12 not found
> -- Could NOT find ClangTools (missing: CLANG_FORMAT_BIN CLANG_TIDY_BIN)
> -- infer not found
> -- Could NOT find Python3 (missing: Python3_EXECUTABLE Interpreter)
> -- Found cpplint executable at 
> C:/Users/avertleyb/git/arrow/cpp/build-support/cpplint.py
> -- System processor: AMD64
> -- Performing Test CXX_SUPPORTS_AVX2
> -- Performing Test CXX_SUPPORTS_AVX2 - Success
> -- Performing Test CXX_SUPPORTS_AVX512
> -- Performing Test CXX_SUPPORTS_AVX512 - Success
> -- Arrow build warning level: PRODUCTION
> Configured for RELEASE build (set with cmake 
> -DCMAKE_BUILD_TYPE={release,debug,...})
> -- Build Type: RELEASE
> -- Performing Test CXX_LINKER_SUPPORTS_VERSION_SCRIPT
> -- Performing Test CXX_LINKER_SUPPORTS_VERSION_SCRIPT - Failed
> -- Using AUTO approach to find dependencies
> -- ARROW_ABSL_BUILD_VERSION: 20210324.2
> -- ARROW_ABSL_BUILD_SHA256_CHECKSUM: 
> 59b862f50e710277f8ede96f083a5bb8d7c9595376146838b9580be90374ee1f
> -- ARROW_AWSSDK_BUILD_VERSION: 1.8.133
> -- ARROW_AWSSDK_BUILD_SHA256_CHECKSUM: 
> d6c495bc06be5e21dac716571305d77437e7cfd62a2226b8fe48d9ab5785a8d6
> -- ARROW_AWS_CHECKSUMS_BUILD_VERSION: v0.1.12
> -- ARROW_AWS_CHECKSUMS_BUILD_SHA256_CHECKSUM: 
> 394723034b81cc7cd528401775bc7aca2b12c7471c92350c80a0e2fb9d2909fe
> -- ARROW_AWS_C_COMMON_BUILD_VERSION: v0.6.9
> -- ARROW_AWS_C_COMMON_BUILD_SHA256_CHECKSUM: 
> 928a3e36f24d1ee46f9eec360ec5cebfe8b9b8994fe39d4fa74ff51aebb12717
> -- ARROW_AWS_C_EVENT_STREAM_BUILD_VERSION: v0.1.5
> -- ARROW_AWS_C_EVENT_STREAM_BUILD_SHA256_CHECKSUM: 
> f1b423a487b5d6dca118bfc0d0c6cc596dc476b282258a3228e73a8f730422d4
> -- ARROW_BOOST_BUILD_VERSION: 1.75.0
> -- ARROW_BOOST_BUILD_SHA256_CHECKSUM: 
> 267e04a7c0bfe85daf796dedc789c3a27a76707e1c968f0a2a87bb96331e2b61
> -- ARROW_BROTLI_BUILD_VERSION: v1.0.9
> -- ARROW_BROTLI_BUILD_SHA256_CHECKSUM: 
> 

Re: Arrow compute/dataset design doc missing

2022-05-24 Thread Weston Pace
There are a few levels of loops.  Two at the moment and three in the
future.  Some are fused and some are not.  What we have right now is
early stages, is not ideal, and there are people investigating and
working on improvements.  I can speak a little bit about where we want
to go.  An example may be helpful.

For example, given a filter "x < 100 && x > 0" we have something like
(this is an approximation of the work done by
arrow::compute::ExecuteScalarExpression and not actual code):

```
for batch_of_128k_rows in data:
auto lt_one_hundred = less_than(batch_of_128k_rows, 100)
auto gt_zero = greater_than(batch_of_128k_rows, 0)
auto filter_pred = and(lt_one_hundred, gt_zero)
consume(filter(batch_of_128k_rows, filter_pred))
```

There are two big things we need to fix here.  First,
`batch_of_128k_rows` is meant to be some percentage of one thread's
portion of the L3 cache.  This is a good unit of parallelism but it is
not ideal for processing because we'd rather use the L2 cache since we
are making three passes across `batch_of_128k_rows`.  Second, each of
those `auto ... =` lines is allocating new memory.  This is not ideal
because we'd like to avoid excess allocation if possible.

To solve the first problem we are moving towards the "morsel/batch"
model[1].  This means we have two "batch" sizes.  The outer batch
(ironically, the morsel) is the largest and is the one used for
determining parallelism.  The inner batch should be smaller (size
based on L2).

To solve the second problem a number of solutions have been proposed
(thread-local buffer pools, thread-local buffer stacks, etc.) and we
will hopefully adopt one at some point.  So the above code snippet
would hopefully become something like:

```
thread_local auto lt_one_hundred = allocate_array(l2_sized_batch_size, bool)
thread_local auto gt_zero = allocate_array(l2_sized_batch_size, bool)
thread_local auto filter_pred = allocate_array(l2_sized_batch_size, bool)
for batch_of_128k_rows in data:
for l2_sized_batch in batch_of_128k_rows:
less_than(l2_sized_batch, 100, _one_hundred)
greater_than(l2_sized_batch, 0, _zero)
and(lt_one_hundred, gt_zero, _pred)
consume(filter(l2_sized_batch, filter_pred))
```

There is still a fair amount of work to do before we get here but I
hope this gives you some idea of the direction we are headed.

[1] https://db.in.tum.de/~leis/papers/morsels.pdf

On Tue, May 24, 2022 at 6:27 AM Shawn Yang  wrote:
>
> Hi Ion, thank you for your reply which recaps  the history of arrow compute. 
> Those links are very valuable for me to understand arrow compute internal. I 
> took a quick for those documents and will take a deeper into those later. I 
> have another question, does arrow compute supports loop fusion, which execute 
> multiple vectorized operand in one loop? This is very common in dataframe 
> comouting, our engine can extract those expressions into a dag/tree. If arrow 
> computer support loop fusion,the performance would be very promising
>
> Sent from my iPhone
>
> > On May 24, 2022, at 4:49 AM, Ian Cook  wrote:
> >
> > Hi Shawn,
> >
> > In March of 2021, when major work on the C++ query execution machinery
> > in Arrow was beginning, Wes sent a message [1] to the dev list and
> > linked to a doc [2] with some details about the planned design. A few
> > months later Neal sent an update [3] about this work. However those
> > documents are now somewhat out of date. More recently, Wes shared
> > another update [4] and linked to a doc [5] regarding task execution /
> > control flow / scheduling. However I think the best source of
> > information is the doc you linked to. The query execution work has
> > proceeded organically with many contributors, and efforts to document
> > the overall design in sufficient detail have not kept pace.
> >
> > Regarding benchmarks: There has been extensive work done using
> > Conbench [6] as part of the Arrow CI infrastructure to benchmark
> > commits, for purposes of avoiding / identifying performance
> > regressions and measuring efforts to improve performance. However I am
> > not aware of any efforts to produce and publicly share benchmarks for
> > the purpose of comparing performance vs. other query engines.
> >
> > There is a proposal [7] to give the name "Acero" to the Arrow C++
> > compute engine, so in the future you will likely see it referred to by
> > that name. I think that having a clearer name for this will motivate
> > more efforts to write and share more about it.
> >
> > Ian
> >
> > [1] https://lists.apache.org/thread/n632pmjnb85o49lyxy45f7sgh4cshoc0
> > [2] 
> > https://docs.google.com/document/d/1AyTdLU-RxA-Gsb9EsYnrQrmqPMOYMfPlWwxRi1Is1tQ/
> > [3] https://lists.apache.org/thread/3pmb592zmonz86nmmbjcw08j5tcrfzm1
> > [4] https://lists.apache.org/thread/ltllzpt1r2ch06mv1ngfgdl7wv2tm8xc
> > [5] 
> > https://docs.google.com/document/d/1216CUQZ7u4acZvC2jX7juqqQCXtdXMellk3lRrgP_WY/
> > [6] https://conbench.ursa.dev
> > [7] 

Re: Can arrow::dataset::Scanner¶ skip a certain number of rows?

2022-05-18 Thread Weston Pace
We do not have the option to do this today.  However, it is something
we could do a better job of as long as we aren't reading CSV.

Aldrin's workaround is pretty solid, especially if you are reading
parquet and have a row_index column.  Parquet statistics filtering
should ensure we are only reading the needed row groups.

We will need to implement something similar for [1] and it seems we
should have a general JIRA for "paging support (start_index & count)"
for datasets but I couldn't find one with a quick search.

[1] https://issues.apache.org/jira/browse/ARROW-15589

On Tue, May 17, 2022 at 10:09 AM Aldrin  wrote:
>
> I think batches are all or nothing as far as reading/deserializing. However, 
> you can manage a slice of that batch instead of the whole batch in the  the batch> portion. That is, if you have 2 batches with 10 rows each, and you 
> want to skip rows [10, 15) (0-indexed, inclusive of 10, exclusive of 15), 
> then you can track the first batch in a vector (or handle directly), then in 
> the 2nd batch you can use `Slice(5)` [1] to track rows [15, 20).
>
> Some other approaches might include using the `Take` compute function on a 
> "super" table or on the particular batch [2], or putting a "row index" column 
> in your data and using that as a filter, e.g.:
>
> ```
> #include 
> #include 
> #include 
>
> // for arrow expressions
> using arrow::compute::greater_equal;
> using arrow::compute::and_;
> using arrow::compute::literal;
> using arrow::compute::field_ref;
>
> // exclude rows [10, 15) (include 10, exclude 15, 0-indexed)
> Expression filter_rowstokeep = and_({
>  less (field_ref(FieldRef("row_index")), literal(10))
> ,greater_equal(field_ref(FieldRef("row_index")), literal(15))
> })
>
> // construct scanner builder as usual
> ...
> scanner_builder->project()
>
> // bind filter to scanner builder
> scanner_builder->Filter(filter_rowstokeep)
>
> // finish and execute as usual
> scanner = scanner_builder->finish()
> ...
> ```
>
> The above code sample is adapted and simplified from what I do in [3], which 
> you can refer to if you'd like.
>
> Finally, you can also construct a new table with the row_index column and 
> then filter that instead, which I think could be fairly efficient, but I 
> haven't played with the API enough to know the most efficient way. I also 
> suspect it might be slightly annoying with the existing interface. Either:
> (a) dataset -> table -> table with extra column -> dataset -> scanner builder 
> with filter as above -> scanner -> fiinish
> (b) table -> table with extra column -> dataset -> scanner builder with 
> filter as above -> scanner -> finish
>
> The difference between (a) and (b) above being how you initially read the 
> data from S3 into memory (either as a dataset, leveraging the dataset 
> framework, or as tables, probably managing the reads a bit more manually).
>
>
> < references -->
>
> [1]: 
> https://arrow.apache.org/docs/cpp/api/table.html#_CPPv4NK5arrow11RecordBatch5SliceE7int64_t
> [2]: https://arrow.apache.org/docs/cpp/compute.html#selections
> [3]: 
> https://github.com/drin/cookbooks/blob/mainline/arrow/projection/project_from_dataset.cpp#L137
>
> Aldrin Montana
> Computer Science PhD Student
> UC Santa Cruz
>
>
> On Tue, May 17, 2022 at 9:33 AM 1057445597 <1057445...@qq.com> wrote:
>>
>> Can arrow skip a certain number of lines when reading data? I want to do 
>> distributed training, read data through arrow, my code is as follows
>>
>> dataset = getDatasetFromS3()
>> scanner_builder = dataset->NewScan()
>> scanner_builder->project()
>> scanner = scanner_builder->finish()
>> batch_reader = scanner->ToBatchReader()
>> current_batch_ = batch_reader->ReadNext()
>> deal the batch 。。。
>>
>> Can I skip a certain number of lines before calling ReadNext()? Or is there 
>> a skip() interface or an offset() interface?
>>
>>


Re: Using the js lib with AWS Athena

2022-05-06 Thread Weston Pace
Can you serialize the schema by creating an IPC file with zero record
batches?  I apologize, but I do not know the JS API as well.  Maybe
you can create a table from just a schema (or a schema and a set of
empty arrays) and then turn that into an IPC file?  This shouldn't add
too much overhead.

On Thu, May 5, 2022 at 8:23 AM Howard Engelhart
 wrote:
>
> I'm looking to implement an Athena federated query custom connector using the 
> arrow js lib.  I'm getting stuck on figuring out how to encode a Schema 
> properly for the Athena GetTableResponse.  I have found an example using 
> python that does something like this.. (paraphrasing...)
>
> import pyarrow as pa
> .
>return {
> "@type": "GetTableResponse",
> "catalogName": self.catalogName,
> "tableName": {'schemaName': self.databaseName, 'tableName': 
> self.tableName},
> "schema": {"schema": 
> base64.b64encode(pa.schema(args...).serialize().slice(4)).decode("utf-8")},
> "partitionColumns": self.partitions,
> "requestType": self.request_type
> }
> What i'm looking for is the js equivalent of
> pa.schema(args...).serialize()
>
> Is there one?  If not, could someone point me in the right direction of how 
> to code up something similar?


Re: Compute expression using pc.call_function not working as expected

2022-04-21 Thread Weston Pace
Awesome.  I've created ARROW-16275[1] to track this.

Also, I discovered that, starting with 8.0.0, we have support for
expressing nested references in python so you can write:

dataset.to_table(filter=ds.field("values", "one") < 200)

[1] https://issues.apache.org/jira/browse/ARROW-16275

On Thu, Apr 21, 2022 at 6:44 AM Micah Kornfield  wrote:
>>
>> If parquet stores statistics for each column of a struct array (don't know 
>> offhand if they do) then we should create a JIRA to expose this.
>
>
> It does store statistics per-leaf column.
>
> On Wed, Apr 20, 2022 at 3:34 PM Weston Pace  wrote:
>>
>> No and no.  This filter will not be used for predicate pushdown now or in 
>> 8.0.0.  It could possibly come after 8.0.0.  If parquet stores statistics 
>> for each column of a struct array (don't know offhand if they do) then we 
>> should create a JIRA to expose this.
>>
>> On Wed, Apr 20, 2022, 11:01 AM Partha Dutta  wrote:
>>>
>>> That works! Thanks. Do you know off hand if this filter would be used in a 
>>> predicate pushdown for a parquet dataset? Or would it be possibly coming in 
>>> version 8.0.0?
>>>
>>> On Wed, Apr 20, 2022 at 3:49 PM Weston Pace  wrote:
>>>>
>>>> The second argument to `call_function` should be a list (the args to
>>>> the function).  Since `arr3` is iterable it is interpreting it as a
>>>> list of args and trying to treat each row as an argument to your call
>>>> (this is the reason it thinks you have 3 arguments).  This should
>>>> work:
>>>>
>>>> pc.call_function("struct_field", [arr3], 
>>>> pc.StructFieldOptions(indices=[0]))
>>>>
>>>> Unfortunately, that evaluates the function immediately.  If you want
>>>> to create an expression then you need some way to create a call and I
>>>> don't actually know how to do that.  I can do something a little
>>>> hackish:
>>>>
>>>> table = pa.Table.from_pydict({'values': arr3})
>>>> dataset = ds.dataset(table)
>>>> sf_call = ds.field('')._call('struct_field', [ds.field('values')],
>>>> pc.StructFieldOptions(indices=[0]))
>>>> dataset.to_table(filter=sf_call < 200)
>>>>
>>>> However, I suspect there is probably a better way to create a call
>>>> object than `ds.field('')._call(...)`
>>>>
>>>> On Wed, Apr 20, 2022 at 3:09 AM Partha Dutta  
>>>> wrote:
>>>> >
>>>> > I'm trying to use the compute function struct_field in order to create 
>>>> > an expression for dataset filtering. But running into an error. This is 
>>>> > the code snippet:
>>>> >
>>>> > arr1 = pa.array([100, 200, 300])
>>>> > arr2 = pa.array([400, 500, 600])
>>>> > arr3 = pa.StructArray.from_arrays([arr1, arr2], ["one", "two"])
>>>> > e = pc.call_function("struct_field", arr3, 
>>>> > pc.StructFieldOptions(indices=[0])) > 200
>>>> > Traceback (most recent call last):
>>>> >   File "", line 1, in 
>>>> >   File "pyarrow/_compute.pyx", line 531, in 
>>>> > pyarrow._compute.call_function
>>>> >   File "pyarrow/_compute.pyx", line 330, in 
>>>> > pyarrow._compute.Function.call
>>>> >   File "pyarrow/error.pxi", line 143, in 
>>>> > pyarrow.lib.pyarrow_internal_check_status
>>>> >   File "pyarrow/error.pxi", line 99, in pyarrow.lib.check_status
>>>> > pyarrow.lib.ArrowInvalid: Function 'struct_field' accepts 1 arguments 
>>>> > but attempted to look up kernel(s) with 3
>>>> >
>>>> > If I try to exclude the options, I get
>>>> > pyarrow.lib.ArrowInvalid: Function 'struct_field' cannot be called 
>>>> > without options
>>>> >
>>>> > Any advice? I am using pyarrow 7.0.0
>>>> > --
>>>> > Partha Dutta
>>>> > partha.du...@gmail.com
>>>
>>>
>>>
>>> --
>>> Partha Dutta
>>> partha.du...@gmail.com


Re: Questions on Dictionary Array types.

2022-04-20 Thread Weston Pace
> However I cannot figure out any easy way to get the mapping
> used to create the dictionary array (vals) easily from the table. Can
> you please let me know the easiest way?

A dictionary is going to be associated with an array and not a table.
So you first need to get the array from the table.  Tables are made of
columns and each column is made of chunks and each chunk is an array.
Each chunk could have a different mapping, so that is something you
may need to deal with at some point depending on your goal.

The table you are creating in your example has one column and that
column has one chunk so we can get to the mapping with:

tab.column(0).chunks[0].dictionary

And we can get to the indices with:

tab.column(0).chunks[0].indices

> Also since this is effectively a string array which is dictionary
> encoded, is there any way to use string compute kernels like
> starts_with etc. Right now I am aware of two methods and they are not
> straightforward.

Regrettably, I don't think we have kernels in place for string
functions on dictionary arrays.  At least, that is my reading of [1].
So the two workarounds you have are may be the best there is at the
moment.

[1] https://issues.apache.org/jira/browse/ARROW-14068

On Wed, Apr 20, 2022 at 10:00 AM Suresh V  wrote:
>
> Hi .. I created a pyarrow table from a dictionary array as shown
> below. However I cannot figure out any easy way to get the mapping
> used to create the dictionary array (vals) easily from the table. Can
> you please let me know the easiest way? Other than the ones which
> involve pyarrow.compute/conversion to pandas as they are expensive
> operations for large datasets.
>
> import pyarrow as pa
> import pyarrow.compute as pc
> import numpy as np
>
> vals = ['aa', 'ab', 'ac', 'ba', 'bb', 'bc']
> int_vals = [3, 4, 3, 0, 2, 0, 1, 5, 0, 0]
> x = pa.DictionaryArray.from_arrays(pa.array(int_vals), vals)
> tab = pa.Table.from_arrays([x], names=['x'])
>
> Also since this is effectively a string array which is dictionary
> encoded, is there any way to use string compute kernels like
> starts_with etc. Right now I am aware of two methods and they are not
> straightforward.
>
> approach 1:
> Cast to string and then run string kernel
> expr = pc.starts_with(pc.field("x").cast(pa.string()), "a")
> ds.Scanner.from_batches(tab.to_batches(), schema=tab.schema,
> columns={'x': pc.field('x')}, filter=expr).to_table()
>
> approach 2:
> filter using the corresponding indices assuming we have access to the 
> dictionary
> filter_ = np.where(pc.starts_with(x.dictionary, "a"))[0]
> pc.is_in(x.indices, filter_)
>
> Approach 2 is better/faster .. but I am not able to figure out how to
> get the dictionary/indices assuming we start from a table read from
> parquet/feather.
>
> Thanks


Re: Compute expression using pc.call_function not working as expected

2022-04-20 Thread Weston Pace
No and no.  This filter will not be used for predicate pushdown now or in
8.0.0.  It could possibly come after 8.0.0.  If parquet stores statistics
for each column of a struct array (don't know offhand if they do) then we
should create a JIRA to expose this.

On Wed, Apr 20, 2022, 11:01 AM Partha Dutta  wrote:

> That works! Thanks. Do you know off hand if this filter would be used in a
> predicate pushdown for a parquet dataset? Or would it be possibly coming in
> version 8.0.0?
>
> On Wed, Apr 20, 2022 at 3:49 PM Weston Pace  wrote:
>
>> The second argument to `call_function` should be a list (the args to
>> the function).  Since `arr3` is iterable it is interpreting it as a
>> list of args and trying to treat each row as an argument to your call
>> (this is the reason it thinks you have 3 arguments).  This should
>> work:
>>
>> pc.call_function("struct_field", [arr3],
>> pc.StructFieldOptions(indices=[0]))
>>
>> Unfortunately, that evaluates the function immediately.  If you want
>> to create an expression then you need some way to create a call and I
>> don't actually know how to do that.  I can do something a little
>> hackish:
>>
>> table = pa.Table.from_pydict({'values': arr3})
>> dataset = ds.dataset(table)
>> sf_call = ds.field('')._call('struct_field', [ds.field('values')],
>> pc.StructFieldOptions(indices=[0]))
>> dataset.to_table(filter=sf_call < 200)
>>
>> However, I suspect there is probably a better way to create a call
>> object than `ds.field('')._call(...)`
>>
>> On Wed, Apr 20, 2022 at 3:09 AM Partha Dutta 
>> wrote:
>> >
>> > I'm trying to use the compute function struct_field in order to create
>> an expression for dataset filtering. But running into an error. This is the
>> code snippet:
>> >
>> > arr1 = pa.array([100, 200, 300])
>> > arr2 = pa.array([400, 500, 600])
>> > arr3 = pa.StructArray.from_arrays([arr1, arr2], ["one", "two"])
>> > e = pc.call_function("struct_field", arr3,
>> pc.StructFieldOptions(indices=[0])) > 200
>> > Traceback (most recent call last):
>> >   File "", line 1, in 
>> >   File "pyarrow/_compute.pyx", line 531, in
>> pyarrow._compute.call_function
>> >   File "pyarrow/_compute.pyx", line 330, in
>> pyarrow._compute.Function.call
>> >   File "pyarrow/error.pxi", line 143, in
>> pyarrow.lib.pyarrow_internal_check_status
>> >   File "pyarrow/error.pxi", line 99, in pyarrow.lib.check_status
>> > pyarrow.lib.ArrowInvalid: Function 'struct_field' accepts 1 arguments
>> but attempted to look up kernel(s) with 3
>> >
>> > If I try to exclude the options, I get
>> > pyarrow.lib.ArrowInvalid: Function 'struct_field' cannot be called
>> without options
>> >
>> > Any advice? I am using pyarrow 7.0.0
>> > --
>> > Partha Dutta
>> > partha.du...@gmail.com
>>
>
>
> --
> Partha Dutta
> partha.du...@gmail.com
>


Re: Compute expression using pc.call_function not working as expected

2022-04-20 Thread Weston Pace
The second argument to `call_function` should be a list (the args to
the function).  Since `arr3` is iterable it is interpreting it as a
list of args and trying to treat each row as an argument to your call
(this is the reason it thinks you have 3 arguments).  This should
work:

pc.call_function("struct_field", [arr3], pc.StructFieldOptions(indices=[0]))

Unfortunately, that evaluates the function immediately.  If you want
to create an expression then you need some way to create a call and I
don't actually know how to do that.  I can do something a little
hackish:

table = pa.Table.from_pydict({'values': arr3})
dataset = ds.dataset(table)
sf_call = ds.field('')._call('struct_field', [ds.field('values')],
pc.StructFieldOptions(indices=[0]))
dataset.to_table(filter=sf_call < 200)

However, I suspect there is probably a better way to create a call
object than `ds.field('')._call(...)`

On Wed, Apr 20, 2022 at 3:09 AM Partha Dutta  wrote:
>
> I'm trying to use the compute function struct_field in order to create an 
> expression for dataset filtering. But running into an error. This is the code 
> snippet:
>
> arr1 = pa.array([100, 200, 300])
> arr2 = pa.array([400, 500, 600])
> arr3 = pa.StructArray.from_arrays([arr1, arr2], ["one", "two"])
> e = pc.call_function("struct_field", arr3, 
> pc.StructFieldOptions(indices=[0])) > 200
> Traceback (most recent call last):
>   File "", line 1, in 
>   File "pyarrow/_compute.pyx", line 531, in pyarrow._compute.call_function
>   File "pyarrow/_compute.pyx", line 330, in pyarrow._compute.Function.call
>   File "pyarrow/error.pxi", line 143, in 
> pyarrow.lib.pyarrow_internal_check_status
>   File "pyarrow/error.pxi", line 99, in pyarrow.lib.check_status
> pyarrow.lib.ArrowInvalid: Function 'struct_field' accepts 1 arguments but 
> attempted to look up kernel(s) with 3
>
> If I try to exclude the options, I get
> pyarrow.lib.ArrowInvalid: Function 'struct_field' cannot be called without 
> options
>
> Any advice? I am using pyarrow 7.0.0
> --
> Partha Dutta
> partha.du...@gmail.com


Re: [Parquet][C++, Python]Parallelism of reading Parquet

2022-04-18 Thread Weston Pace
> Thanks. A followup question on pre buffering. When the caching layer
> caches all the ranges, will they all issue requests to S3
> simultaneously to saturate S3 bandwidth? Or there is also a max of
> parallelism downloading or pipelining technique?

All requests should be issued simultaneously to the I/O thread pool.
The I/O thread pool has a max parallelism (defaults to 8).  This is
configurable.  More info at [1].

> This question may be a little dev detail. Will PreBuffer(aka Cache in
> the caching layer) be effective if only there is WhenBuffered(aka
> WaitFor in caching layer) being called afterwards? I notice that in
> old dataset implementation, only PreBuffer is called but no one wait
> for it.

This logic is governed by ReadRangeCache in src/arrow/io/caching.h.
What follows is my (potentially incorrect) understanding.  There are
two versions, a lazy and an eager version.

The eager version will start the reads as soon as the ranges are
requested.  This results in faster reads and more likely parallel
reads but ranges can only be coalesced within a call to Cache (this
function takes in a vector of ranges).  In this case there is no need
to call Wait/WaitFor to get parallelism.

The lazy version will only start the read when it receives a call to
Read/Wait/WaitFor.  This version allows for ranges to be coalesced
across different calls to Cache.  If a user calls Read without calling
Wait/WaitFor then they will get coalesced ranges but they will not get
parallel reads (unless multiple reads are required to satisfy the
single call to Read because it is larger than the max allowed range
that defaults to 64MiB).

[1] https://arrow.apache.org/docs/dev/cpp/threading.html#thread-pools

On Wed, Apr 13, 2022 at 7:58 PM Xinyu Zeng  wrote:
>
> This question may be a little dev detail. Will PreBuffer(aka Cache in
> the caching layer) be effective if only there is WhenBuffered(aka
> WaitFor in caching layer) being called afterwards? I notice that in
> old dataset implementation, only PreBuffer is called but no one wait
> for it.
>
> On Thu, Apr 14, 2022 at 10:16 AM Xinyu Zeng  wrote:
> >
> > Thanks. A followup question on pre buffering. When the caching layer
> > caches all the ranges, will they all issue requests to S3
> > simultaneously to saturate S3 bandwidth? Or there is also a max of
> > parallelism downloading or pipelining technique?
> >
> > On Thu, Apr 14, 2022 at 4:51 AM Weston Pace  wrote:
> > >
> > > Yes, that matches my understanding as well.  I think, when
> > > pre-buffering is enabled, you might get parallel reads even if you are
> > > using ParquetFile/read_table but I would have to check.  I agree that
> > > it would be a good idea to add some documentation to all the readers
> > > going over our parallelism at a high level.  I created [1] and will
> > > try to update this when I get a chance.
> > >
> > > > I was also wondering how pre_buffer works. Will coalescing ColumnChunk
> > > > ranges hurt parallelism? Or you can still parallelly read a huge range
> > > > after coalescing? To me, coalescing and parallel reading seem like a
> > > > tradeoff on S3?
> > >
> > > It's possible but I think there is a rather small range of files/reads
> > > that would be affected by this.  The coalescing will only close holes
> > > smaller than 8KiB and will only coalesce up to 64MiB.  Generally files
> > > are either larger than 64MiB or there are many files (in which case
> > > the I/O from a single file doesn't really need to be parallel).
> > > Furthermore, if we are not reading all of the columns then the gaps
> > > between columns are larger than 8KiB.
> > >
> > > We did benchmark pre buffering on S3 and, if I remember correctly, the
> > > pre buffering option had a very beneficial impact when running in S3.
> > > AWS recommends reads in the 8MB/16MB range and without pre-buffering I
> > > think our reads are too small to be effective.
> > >
> > > [1] https://issues.apache.org/jira/browse/ARROW-16194
> > >
> > > On Wed, Apr 13, 2022 at 3:16 AM Xinyu Zeng  wrote:
> > > >
> > > > I want to make sure a few of my understanding is correct in this
> > > > thread. There are two ways to read a parquet file in C++, either
> > > > through ParquetFile/read_table, or through ParquetDataset. For the
> > > > former, the parallelism is per column because read_table simply passes
> > > > all row groups indices to DecodeRowGroups in reader.cc, and there is
> > > > no row group level parallelism. For the latter, the parallelism is per
> > > > column and per ro

Re: [C++] Null indices and byte lengths of string columns

2022-04-18 Thread Weston Pace
>From a pure metadata-only perspective you should be able to get the
size of the column and possibly a null count (for parquet files where
statistics are stored).  However, you will not be able to get the
indices of the nulls.

The null count and column size are going to come from the parquet
metadata and you will need to use the parquet APIs to get this
information.  In pyarrow this would be:

```
>>> pq.ParquetFile('/tmp/foo.parquet').metadata.row_group(0).column(0).statistics.null_count
1
>>> pq.ParquetFile('/tmp/foo.parquet').metadata.row_group(0).column(0).total_compressed_size
122
>>> pq.ParquetFile('/tmp/foo.parquet').metadata.row_group(0).column(0).total_uncompressed_size
119
```

In the C++ API you will want to look at `parquet::ParquetFileReader::metadata`

On Mon, Apr 18, 2022 at 6:18 AM McDonald, Ben  wrote:
>
> It seems that these options require reading into `ArrayData`. I have been 
> using `ReadBatch` to read directly into a malloced C buffer to avoid having 
> to create the additional copy, which is why I was hoping there would be a way 
> to get this from the file metadata or some operation on the file rather than 
> from the data that has already been read into an Arrow data structure.
>
>
>
> So, the only way that I could do this today would be to read into an 
> `ArrayData` and then call an `arrow::compute` function? There is no way to 
> get the info from the file?
>
>
>
> Best,
>
> Ben McDonald
>
>
>
> From: Niranda Perera 
> Date: Friday, April 15, 2022 at 5:43 PM
> To: user@arrow.apache.org 
> Subject: Re: [C++] Null indices and byte lengths of string columns
>
> Hi Ben,
>
>
>
> I believe you could use arrow::compute for this.
>
>
>
> On Fri, Apr 15, 2022 at 6:28 PM McDonald, Ben  wrote:
>
> Hello,
>
>
>
> I have been writing some code to read Parquet files and it would be useful if 
> there was an easy way to get the number of bytes in a string column as well 
> as the null indices of that column. I would have expected this to be 
> available in metadata somewhere, but I have not seen any way to query that 
> from the API and don’t see anything like this using `parquet-tools` to 
> inspect the files.
>
>
>
> Is there any way to get the null indices of a Parquet string column besides 
> reading the whole file and manually checking for nulls?
>
> There is an internal method for this [1]. But unfortunately I don't this is 
> exposed to the outside. One possible solution is, calling compute::is_null 
> and pass the result to compute::indices_nonzero.
>
>
>
>
>
> Is there any way to get the byte lengths of string columns without reading 
> each string and summing the number of bytes of each string?
>
> Do you want the non-null byte length?
>
> If not, you can simply take the offsets int64 buffer from ArrayData and take 
> the last value. That would be the full bytesize of the string array.
>
> If yes, I believe you can achieve this by using VisitArrayDataInline/ 
> VisitNullBitmapInline methods [2].
>
>
>
>
>
> Thank you.
>
>
>
> Best,
>
> Ben McDonald
>
>
>
> [1] 
> https://github.com/apache/arrow/blob/d36b2b3392ed78b294b565c3bd3f32eb6675b23a/cpp/src/arrow/compute/api_vector.h#L226
>
> [2] 
> https://github.com/apache/arrow/blob/d36b2b3392ed78b294b565c3bd3f32eb6675b23a/cpp/src/arrow/visit_data_inline.h#L224
>
>
> --
>
> Niranda Perera
> https://niranda.dev/
>
> @n1r44
>
>


Re: [Parquet][C++, Python]Parallelism of reading Parquet

2022-04-13 Thread Weston Pace
Yes, that matches my understanding as well.  I think, when
pre-buffering is enabled, you might get parallel reads even if you are
using ParquetFile/read_table but I would have to check.  I agree that
it would be a good idea to add some documentation to all the readers
going over our parallelism at a high level.  I created [1] and will
try to update this when I get a chance.

> I was also wondering how pre_buffer works. Will coalescing ColumnChunk
> ranges hurt parallelism? Or you can still parallelly read a huge range
> after coalescing? To me, coalescing and parallel reading seem like a
> tradeoff on S3?

It's possible but I think there is a rather small range of files/reads
that would be affected by this.  The coalescing will only close holes
smaller than 8KiB and will only coalesce up to 64MiB.  Generally files
are either larger than 64MiB or there are many files (in which case
the I/O from a single file doesn't really need to be parallel).
Furthermore, if we are not reading all of the columns then the gaps
between columns are larger than 8KiB.

We did benchmark pre buffering on S3 and, if I remember correctly, the
pre buffering option had a very beneficial impact when running in S3.
AWS recommends reads in the 8MB/16MB range and without pre-buffering I
think our reads are too small to be effective.

[1] https://issues.apache.org/jira/browse/ARROW-16194

On Wed, Apr 13, 2022 at 3:16 AM Xinyu Zeng  wrote:
>
> I want to make sure a few of my understanding is correct in this
> thread. There are two ways to read a parquet file in C++, either
> through ParquetFile/read_table, or through ParquetDataset. For the
> former, the parallelism is per column because read_table simply passes
> all row groups indices to DecodeRowGroups in reader.cc, and there is
> no row group level parallelism. For the latter, the parallelism is per
> column and per row group, which is a ColumnChunk, according to
> RowGroupGenerator in file_parquet.cc. The difference between the
> former and the latter is also differentiated by use_legacy_dataset in
> Python. If my understanding is correct, I think this difference may be
> better explained in doc to avoid confusion. I have to crush the code
> to understand.
>
> I was also wondering how pre_buffer works. Will coalescing ColumnChunk
> ranges hurt parallelism? Or you can still parallelly read a huge range
> after coalescing? To me, coalescing and parallel reading seem like a
> tradeoff on S3?
>
> Thanks in advance


Re: Nested PyArrow scalar

2022-03-31 Thread Weston Pace
I think that would be a reasonable JIRA request.  In addition I think
it would also make sense for the `pa.array` function to recognize
scalars:

>>> pa.array([1])

[
  1
]
>>> pa.array([pa.scalar(1)])
# --ERR--

On Wed, Mar 30, 2022 at 7:01 PM Wenlei Xie  wrote:
>
> Hi,
>
> When play around PyArrow scalar, I found it seems to expect the input as a 
> "pure Python object", e.g. it cannot be a Python list of arrow scalar (such 
> as `[ pa.scalar(1) ]`:
>
> ```
> >>> import pyarrow as pa
> >>> pa.__version__
> '7.0.0'
>
> >>> pa.scalar([1])
> 
>
> >>> pa.scalar([pa.scalar(1)])
> Traceback (most recent call last):
>   File "", line 1, in 
>   File "pyarrow/scalar.pxi", line 1040, in pyarrow.lib.scalar
>   File "pyarrow/error.pxi", line 143, in 
> pyarrow.lib.pyarrow_internal_check_status
>   File "pyarrow/error.pxi", line 99, in pyarrow.lib.check_status
> pyarrow.lib.ArrowInvalid: Could not convert  with 
> type pyarrow.lib.Int64Scalar: did not recognize Python value type when 
> inferring an Arrow data type
> ```
> I am wondering if it's going to be supported in the future, or there is any 
> consideration not support that?
>
> Thanks!
>
> --
> Best Regards,
> Wenlei Xie
>
> Email: wenlei@gmail.com


Re: Evaluate expressions on a pyarrow table in-memory

2022-03-30 Thread Weston Pace
Yes and no :)  Disclaimer: this answer is a little out of my
wheelhouse as I've learned relational algebra through doing and so my
formal theory may be off.  Anyone is welcome to correct me.  Also,
this answer turned into a bit of ramble and is a bit scattershot.  You
may already be very familiar with some of these concepts.  I'm not
trying to be patronizing but I do tend to ramble.

TL;DR: Pyarrow has some "relational algebra interfaces" today but not
a lot of "dataframe interfaces".  What you're asking for is a little
bit more of a "dataframe" type question and you will probably need to
go beyond pyarrow to get exactly what you are asking for.  That being
said, pyarrow has a lot of the primitives that can solve parts and
pieces of your problem.

In relational algebra there is a special class of functions called
"scalar functions".  These are functions that create a single value
for each row.  For example, "less than", "addition", and "to upper
case".  A scalar expression is then an expression that consists only
of scalar functions.  Projections in dataset scanners can only contain
scalar expressions.  In pyarrow you can scan an in-memory table and
apply a scalar expression:

```
import pyarrow as pa
import pyarrow.dataset as ds

arr = pa.array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
tab = pa.Table.from_arrays([arr], names=["x"])
expr = (ds.field("x") > 5) & (ds.field("x") < 8)
# This is a little bit unwieldy, we could maybe investigate a better
API for this kind of thing
ds.Scanner.from_batches(tab.to_batches(), schema=tab.schema,
columns={'y': expr3}).to_table()
# pyarrow.Table
# y: bool
# 
# y: [[false,false,false,false,false,true,true,false,false,false]]
```

However, the functions that you are listing (sum, avg) are not scalar
functions.  They do, however, fit a special class of functions known
as "aggregates" (functions that consume values from multiple rows to
create a single output).  Aggregates in relational algebra are used in
a "group by" node.  In pyarrow 7 we added the ability to run group_by
functions very easily on in-memory tables but it looks like it
requires at least one key column so that isn't going to help us here.

Both of these features are powered by the pyarrow compute functions.
These are slightly lower level compute primitives.  We can use these
here to get some of the values you want:

```
# Continuing from above example
pc.sum(tab.column("x"))
# 
pc.mean(tab.column("x"))
# 
```

But...while this does give you the capability to run functions, this
doesn't give you the capability to run "expressions".

Running expressions that contain both aggregate and scalar functions
is a little trickier than it may seem.  This is often done by creating
a relational algebra query from the expression.  For example, consider
the expression `sum(a * b)/sum(a)`.

We can create the query (this is totally shorthand pseudocode, the
Substrait text format isn't ready yet)

SCAN_TABLE(table) ->
  PROJECT({"a*b": multiply(field("a"), field("b"))}) ->
  GROUP_BY(keys=[], aggregates=[("a*b", "sum"), ("a", "sum")]) ->
  PROJECT({"sum(a*b)/sum(a)": divide(field("sum(a*b)"),field("sum(a)"))})

So if you wanted to create such a query plan then you could express it
in Substrait, which is a spec for expression query plans, and use our
"very new and not quite ready yet" Substrait consumer API to process
that query plan.  So if your goal is purely "how do I express a series
of compute operations as a string" then I will point out that SQL is a
very standard answer for that question and the Substrait text format
will be a new way to answer that question.

---

Ok, now everything I've written has been exclusive to pyarrow.  If I
step back and look at your original question "how can I evaluate a
dataframe-like expression against an Arrow table" I think the answer
is going to lie outside of pyarrow (some cool functions like table
group by are being added but I'm not aware of any developers whose
goal is to make a full fledged dataframe API inside of pyarrow).
Fortunately, the whole point of Arrow is to make it super easy for
libraries to add new functionality to your data.  There may be some
library out there that does this already, there are libraries out
there that solve similar problems like "how do I run this SQL query
against Arrow data", and there are probably libraries in development
to tackle this.  Unfortunately, I'm not knowledgeable nor qualified
enough to speak on any of these.

On Wed, Mar 30, 2022 at 1:16 AM Suresh V  wrote:
>
> Hi,
>
> Is there a way to evaluate mathematical expressions against columns of a 
> pyarrow table which is in memory already similar to how projections work for 
> dataset scanners?
>
> The goal is to have specify a bunch of strings like sum(a * b)/sum(a), or 
> avg(a[:10]) etc. Convert these into expressions and run against the table.
>
> Thanks
>
>


Re: Documentation of concurrency of the compute API?

2022-03-23 Thread Weston Pace
y table" is the time of just running the function 
>>>>>>>> on
>>>>>>>> the table concatenated from each slice."
>>>>>>>> So, I assume you are originally using a `vector>
>>>>>>>> slices`. For the former case, you are passing each slice to
>>>>>>>> `MeanAggr::Accumulate`, and for the latter case, you are calling
>>>>>>>> arrow::Concatenate(slices) and passing the result as a single table?
>>>>>>>>
>>>>>>>> On Thu, Mar 10, 2022 at 7:41 PM Aldrin  wrote:
>>>>>>>>
>>>>>>>>> Oh, but the short answer is that I'm using: Add, Subtract, Divide,
>>>>>>>>> Multiply, Power, and Absolute. Sometimes with both inputs being
>>>>>>>>> ChunkedArrays, sometimes with 1 input being a ChunkedArray and the 
>>>>>>>>> other
>>>>>>>>> being a scalar.
>>>>>>>>>
>>>>>>>>> Aldrin Montana
>>>>>>>>> Computer Science PhD Student
>>>>>>>>> UC Santa Cruz
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Mar 10, 2022 at 4:38 PM Aldrin  wrote:
>>>>>>>>>
>>>>>>>>>> Hi Niranda!
>>>>>>>>>>
>>>>>>>>>> Sure thing, I've linked to my code. [1] is essentially the
>>>>>>>>>> function being called, and [2] is an example of a wrapper function 
>>>>>>>>>> (more in
>>>>>>>>>> that file) I wrote to reduce boilerplate (to make [1] more 
>>>>>>>>>> readable). But,
>>>>>>>>>> now that I look at [2] again, which I wrote before I really knew 
>>>>>>>>>> much about
>>>>>>>>>> smart pointers, I wonder if some of what I benchmarked is overhead 
>>>>>>>>>> from
>>>>>>>>>> misusing C++ structures?
>>>>>>>>>>
>>>>>>>>>> Thanks!
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> [1]:
>>>>>>>>>> https://gitlab.com/skyhookdm/skytether-singlecell/-/blob/58839eb921c53d17ac32129be6af214ae4b58a13/src/cpp/processing/statops.cpp#L96
>>>>>>>>>> [2]:
>>>>>>>>>> https://gitlab.com/skyhookdm/skytether-singlecell/-/blob/58839eb921c53d17ac32129be6af214ae4b58a13/src/cpp/processing/numops.cpp#L18
>>>>>>>>>>
>>>>>>>>>> Aldrin Montana
>>>>>>>>>> Computer Science PhD Student
>>>>>>>>>> UC Santa Cruz
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Mar 10, 2022 at 4:30 PM Niranda Perera <
>>>>>>>>>> niranda.per...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Aldrin,
>>>>>>>>>>>
>>>>>>>>>>> It would be helpful to know what sort of compute operators you
>>>>>>>>>>> are using.
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Mar 10, 2022, 19:12 Aldrin  wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I will work on a reproducible example.
>>>>>>>>>>>>
>>>>>>>>>>>> As a sneak peek, what I was seeing was the following (pasted in
>>>>>>>>>>>> gmail, see [1] for markdown version):
>>>>>>>>>>>>
>>>>>>>>>>>> Table ID Columns Rows Rows (slice) Slice count Time (ms)
>>>>>>>>>>>> total; by slice Time (ms)
>>>>>>>>>>>> total; by table
>>>>>>>>>>>> E-GEOD-100618 415 20631 299 69 644.065 410
>>>>>>>>>>>> E-GEOD-76312 2152 27120 48 565 25607.927 2953
>>>>>>>>>>>> E-GEOD-106540 2145 24480 45 544 25193.507 3088
>>>>>>>>>>>>
>>>>>>>>>>>> Where "b

Re: pyarrow segfault on exit due to aws-sdk-cpp bug (which is fixed in later versions)

2022-03-15 Thread Weston Pace
Procedurally, you should create a JIRA ticket with your request and
once the fix is complete and merged into master you can request a
release on the mailing list.  For an example, see [1].

> Is there a way to request that pyarrow/arrow gets built with the fixed 
> version of the AWS SDK for the builds published to pypi in the next week or 
> two or is this unrealistic?

It is possible that the JIRA is addressed in the next week or two,
especially if you are able to provide a pull request and help address
feedback on the PR and work to get it merged.  Note that we actually
target version 1.8.133 specifically to avoid a crash on shutdown bug
in AWS.  More details in [2].  Are you certain that you are installing
from pip?

It is probably unrealistic to expect that a release with this fix will
happen in the next week or two.  Releasing the python libraries
requires a considerable amount of manual work and we haven't generally
done this to rush fixes (typically patch releases have been to address
significant regressions or blocking issues).  The next major release
of the python/C++/R/java implementations will be sometime around April
and we are getting close enough to that date that I don't think there
would be much enthusiasm to undertake a patch release.

> If so, is the next best option for us to git clone the source and build with 
> the fixed version of the AWS SDK?

That is one option.  If your fix is merged then another option could
be to use the nightly wheels.  This process is described in more
detail at [3].

[1] https://lists.apache.org/thread/4dyhqjcdf066bz6fw2w2l86yfbh4h452
[2] https://issues.apache.org/jira/browse/ARROW-15141
[3] 
https://arrow.apache.org/docs/python/install.html#installing-nightly-packages

On Tue, Mar 15, 2022 at 7:07 AM Joseph Smith  wrote:
>
> We run our tests with some flags on linux that help discovery illegal memory 
> access (e.g. MALLOC_PERTURB_=90). Any test that imports pyarrow is 
> segfaulting because of an issue discovered and fixed in the AWS SDK. It is 
> fixed as of 1.9.214 but arrow is currently built with version 1.8.133 (as per 
> https://github.com/apache/arrow/blob/ea3480033e57947ae59e7862ed35b8bf3335ea9f/cpp/thirdparty/versions.txt).
>
> My question is more procedural than technical. Is there a way to request that 
> pyarrow/arrow gets built with the fixed version of the AWS SDK for the builds 
> published to pypi in the next week or two or is this unrealistic? If so, is 
> the next best option for us to git clone the source and build with the fixed 
> version of the AWS SDK?
>
> Thanks in advance for your advice,
> -Joe
>
> Technical details:
> - This occurs because of an incorrect resource handling that segfaults on 
> shutdown in the AWS SDK
> - pyarrow's fs module calls arrow::fs::InitializeS3() on import which sets up 
> the conditions to segfault on exit (depending on how the memory allocations 
> all work out, which is why this is easier to reproduce with MALLOC_PETURB_)
> - This fix for this is 
> (https://github.com/aws/aws-sdk-cpp/commit/a2512bd02addd77515430ac74d7ee5f37343ec99)
> - The first release tag I see for this commit in aws-sdk-cpp is in version 
> 1.9.214
>
>
>
>
>
>
>
> Sent via Superhuman
>


Re: Documentation of concurrency of the compute API?

2022-03-10 Thread Weston Pace
As far as I know (and my knowledge here may be dated) the compute
kernels themselves do not do any concurrency.  There are certainly
compute kernels that could benefit from concurrency in this manner
(many kernels naively so) and I think things are setup so that, if we
decide to tackle this feature, we could do so in a systematic way
(instead of writing something for each kernel).

I believe that kernels, if given a unique kernel context, should be thread safe.

The streaming compute engine, on the other hand, does support
concurrency.  It is mostly driven by the scanner at the moment (e.g.
each batch we fetch from the scanner gets a fresh thread task for
running through the execution plan) but there is some intra-node
concurrency in the hash join and (I think) the hash aggregate nodes.
This has been sufficient to saturate cores on the benchmarks we run.
I know there is ongoing interest in understanding and improving our
concurrency here.

The scanner supports concurrency.  It will typically fetch multiple
files at once and, for each file, it will fetch multiple batches at
once (assuming the file has more than one batch).

> I see a large difference between the total time to apply compute functions to 
> a single table (concatenated from many small tables) compared to applying 
> compute functions to each sub-table in the composition.

Which one is better?  Can you share a reproducible example?

On Thu, Mar 10, 2022 at 12:01 PM Aldrin  wrote:
>
> Hello!
>
> I'm wondering if there's any documentation that describes the 
> concurrency/parallelism architecture for the compute API. I'd also be 
> interested if there are recommended approaches for seeing performance of 
> threads used by Arrow--should I try to check a processor ID and infer 
> performance or are there particular tools that the community uses?
>
> Specifically, I am wondering if the concurrency is going to be different when 
> using a ChunkedArray as an input compared to an Array or for ChunkedArrays 
> with various chunk sizes (1 chunk vs tens or hundreds). I see a large 
> difference between the total time to apply compute functions to a single 
> table (concatenated from many small tables) compared to applying compute 
> functions to each sub-table in the composition. I'm trying to figure out 
> where that difference may come from and I'm wondering if it's related to 
> parallelism within Arrow.
>
> I tried using the github issues and JIRA issues (e.g.  [1]) as a way to 
> sleuth the info, but I couldn't find anything. The pyarrow API seems to have 
> functions I could try and use to figure it out (cpu_count and set_cpu_count), 
> but that seems like a vague road.
>
> [1]: https://issues.apache.org/jira/browse/ARROW-12726
>
>
> Thank you!
>
> Aldrin Montana
> Computer Science PhD Student
> UC Santa Cruz


Re: [Python][Parquet]pq.ParquetFile.read faster than pq.read_table?

2022-03-08 Thread Weston Pace
The issue was a combination of python & C++ so it isn't something we'd
see in the micro benchmarks.  In the macro benchmarks this regression
actually did show up pretty clearly [1] but I didn't notice it in the
PR comment that conbench made.  Jonathan Keane raised [2] on the
conbench repo which would consider more salient reporting of
regressions.  We may also consider reviewing some of the largest
outstanding regressions as we approach a release or as a part of the
RC process.

[1] 
https://conbench.ursa.dev/compare/runs/c4d5e65d088243259e5198f4c0e219c9...5a1c693586c74471b7c8ba775005db54/
[2] https://github.com/conbench/conbench/issues/307

On Tue, Mar 8, 2022 at 9:05 AM Wes McKinney  wrote:
>
> Since this isn't the first time this specific issue has happened in a
> major release, is there a way that a test or benchmark regression
> check could be introduced to prevent this category of problem in the
> future?
>
> On Thu, Feb 24, 2022 at 9:48 PM Weston Pace  wrote:
> >
> > Thanks for reporting this.  It seems a regression crept into 7.0.0
> > that accidentally disabled parallel column decoding when
> > pyarrow.parquet.read_table is called with a single file.  I have filed
> > [1] and should have a fix for it before the next release.  As a
> > workaround you can use the datasets API directly, this is already what
> > pyarrow.parquet.read_table is using under the hood when
> > use_legacy_dataset=False.  Or you can continue using
> > use_legacy_dataset=True.
> >
> > import pyarrow.dataset as ds
> > table = ds.dataset('file.parquet', format='parquet').to_table()
> >
> > [1] https://issues.apache.org/jira/browse/ARROW-15784
> >
> > On Wed, Feb 23, 2022 at 10:59 PM Shawn Zeng  wrote:
> > >
> > > I am using a public benchmark. The origin file is 
> > > https://homepages.cwi.nl/~boncz/PublicBIbenchmark/Generico/Generico_1.csv.bz2
> > >  . I used pyarrow version 7.0.0 and pq.write_table api to write the csv 
> > > file as a parquet file, with compression=snappy and use_dictionary=true. 
> > > The data has ~20M rows and 43 columns. So there is only one row group 
> > > with row_group_size=64M as default. The OS is Ubuntu 20.04 and the file 
> > > is on local disk.
> > >
> > > Weston Pace  于2022年2月24日周四 16:45写道:
> > >>
> > >> That doesn't really solve it but just confirms that the problem is the 
> > >> newer datasets logic.  I need more information to really know what is 
> > >> going on as this still seems like a problem.
> > >>
> > >> How many row groups and how many columns does your file have?  Or do you 
> > >> have a sample parquet file that shows this issue?
> > >>
> > >> On Wed, Feb 23, 2022, 10:34 PM Shawn Zeng  wrote:
> > >>>
> > >>> use_legacy_dataset=True fixes the problem. Could you explain a little 
> > >>> about the reason? Thanks!
> > >>>
> > >>> Weston Pace  于2022年2月24日周四 13:44写道:
> > >>>>
> > >>>> What version of pyarrow are you using?  What's your OS?  Is the file 
> > >>>> on a local disk or S3?  How many row groups are in your file?
> > >>>>
> > >>>> A difference of that much is not expected.  However, they do use 
> > >>>> different infrastructure under the hood.  Do you also get the faster 
> > >>>> performance with pq.read_table(use_legacy_dataset=True) as well.
> > >>>>
> > >>>> On Wed, Feb 23, 2022, 7:07 PM Shawn Zeng  wrote:
> > >>>>>
> > >>>>> Hi all, I found that for the same parquet file, using 
> > >>>>> pq.ParquetFile(file_name).read() takes 6s while 
> > >>>>> pq.read_table(file_name) takes 17s. How do those two apis differ? I 
> > >>>>> thought they use the same internals but it seems not. The parquet 
> > >>>>> file is 865MB, snappy compression and enable dictionary. All other 
> > >>>>> settings are default, writing with pyarrow.


Re: Windows build of Arrow Flight and Flight SQL

2022-03-06 Thread Weston Pace
Thanks, sorry I missed that.  I agree, my interpretation of that error
is that it seems to be pulling in the release version of re2.lib.  Do
you know how your build is resolving re2.lib?  Is it coming from vcpkg
or is it perhaps coming from a system installed location somewhere?
Or is it falling back to a from-source build?  Apologies if the
question is nonsensical, I don't work with vcpkg or Windows much.

On Sun, Mar 6, 2022 at 9:55 AM James Duong  wrote:
>
> Hi Weston,
>
> The build log is attached in the email Alex sent out originally.
>
> Just had a glance and saw errors such as:
> re2.lib(regexp.cc.obj) : error LNK2038: mismatch detected for 
> 'RuntimeLibrary': value 'MTd_StaticDebug' doesn't match value 
> 'MDd_DynamicDebug' in array_base.obj
>
> Which usually indicate MSVC runtime mismatches (looks like one project is 
> building against the Static Debug runtime while another is using the Dynamic 
> Debug runtime)
>
>
> On Sun., Mar. 6, 2022, 11:46 Weston Pace,  wrote:
>>
>> You may need to share some specific error messages or a build log.
>>
>> Or, ideally, setup some kind of CI environment that mirrors your
>> environment and reproduce your failure.  Github has runners that have
>> support for visual studio 2022 and I believe there are actions to
>> install vcpkg.  You could create a fork of the arrow repo, add a new
>> CI job, reproduce the error, and then share the error with the mailing
>> list.  This will make it a lot easier for others to debug the root
>> cause and opens up the possibility of later adding that as a regular
>> nightly job.
>>
>> On Fri, Mar 4, 2022 at 1:47 PM David Li  wrote:
>> >
>> > Not too sure but perhaps try a Release build, Debug builds have been a 
>> > problem for others, e.g. see ARROW-15298 [1]
>> >
>> > I've personally not gotten a debug build to work before.
>> >
>> > [1]: https://issues.apache.org/jira/browse/ARROW-15298
>> >
>> > -David
>> >
>> > On Fri, Mar 4, 2022, at 17:55, Alex McRae (CW) wrote:
>> >
>> > Hi all, happy friday!
>> >
>> > I am having issues building arrow with flight and flightsql on a fresh 
>> > install of windows.
>> >
>> > My configuration uses VCPKG, static build, and visual studio 2022 
>> > generation. The generation is successful however running cmake --build 
>> > ./build --config Debug fails. With a bunch of different errors stemming 
>> > from winsock2.h, which I understand comes from the Windows SDK, and 
>> > resolve issues in the DLL. I have attached a log below.
>> >
>> > I run these commands in the cpp/build directory of the arrow repository. 
>> > As outlined https://arrow.apache.org/docs/developers/cpp/windows.html
>> >
>> > cmake .. -DARROW_FLIGHT=ON -DARROW_FLIGHT_SQL=ON 
>> > -DCMAKE_TOOLCHAIN_FILE=C:/tools/vcpkg/scripts/buildsystems/vcpkg.cmake 
>> > -DARROW_DEPENDENCY_SOURCE=VCPKG -DARROW_BUILD_STATIC=ON 
>> > -DVCPKG_TARGET_TRIPLET=x64-windows-static -A x64 
>> > -DARROW_DEPENDENCY_USE_SHARED=OFF
>> >
>> > cmake --build . --config Debug
>> >
>> > Let me know what you think,
>> >
>> > Alex McRae
>> >
>> >
>> > Attachments:
>> >
>> > log.txt
>> >
>> >


Re: Groupby performance between pyarrow and duckdb

2022-03-06 Thread Weston Pace
I don't think you need to do anything differently.  This looks like
the correct way to run this.  The arrow streaming engine is pretty new
and probably has some good low hanging performance improvements that
could be made.  I think most of the dedicated performance work has
been on either optimizing kernels or optimizing scanning from disk
(which isn't being measured here).

I notice you mention "scaling well", are you trying this query on
larger scale factors as well?  The lineitems table at SF1 is pretty
small (~6million rows if I recall) and I'd have to look into it more
but I would guess pyarrow is scanning the table in 1M chunks so you
won't see much parallelism beyond 6 cores.  If I had to guess I might
point to that as the root cause.

Another possibility is the suggestion I had at [1].  I would guess
that pyarrow is using a scanner source node instead of the table
source node (since the table source node was created after the pyarrow
group_by/aggregate functionality) and that adds a bit of overhead when
scanning something already in memory.

Beyond those two guesses I'm sure we are doing something less
efficient than we could.  This test case is helpful, I will jot it
down to look into further when I get some spare cycles.  In the
meantime I'd encourage you to dig in further.  A quick run through
perf could probably tell you if one approach or the other is utilizing
more cores (I'd first crank up the # of runs so the query execution
time is much greater than the setup time).

[1] https://github.com/duckdb/duckdb/issues/3138

On Sat, Mar 5, 2022 at 9:11 AM H G  wrote:
>
> Pyarrow is not scaling well when compared with duckdb. Is there something 
> that needs to be done differently?
>
> Minimal example:
>
> import pyarrow.parquet as pq
> lineitem = pq.read_table('lineitemsf1.snappy.parquet')
> con = duckdb.connect()
>
> %timeit lineitem.group_by("l_returnflag").aggregate([("l_extendedprice", 
> "sum")])
> ungrouped_aggregate = '''SELECT SUM(l_extendedprice) FROM lineitem GROUP BY 
> l_returnflag'''
>
> %timeit con.execute(ungrouped_aggregate).fetch_arrow_table()
>
> Results
> %timeit lineitem.group_by("l_returnflag").aggregate([("l_extendedprice", 
> "sum")])
> 207 ms ± 9.31 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
>
> %timeit con.execute(ungrouped_aggregate).fetch_arrow_table()
> 71.8 ms ± 2.31 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


Re: Windows build of Arrow Flight and Flight SQL

2022-03-06 Thread Weston Pace
You may need to share some specific error messages or a build log.

Or, ideally, setup some kind of CI environment that mirrors your
environment and reproduce your failure.  Github has runners that have
support for visual studio 2022 and I believe there are actions to
install vcpkg.  You could create a fork of the arrow repo, add a new
CI job, reproduce the error, and then share the error with the mailing
list.  This will make it a lot easier for others to debug the root
cause and opens up the possibility of later adding that as a regular
nightly job.

On Fri, Mar 4, 2022 at 1:47 PM David Li  wrote:
>
> Not too sure but perhaps try a Release build, Debug builds have been a 
> problem for others, e.g. see ARROW-15298 [1]
>
> I've personally not gotten a debug build to work before.
>
> [1]: https://issues.apache.org/jira/browse/ARROW-15298
>
> -David
>
> On Fri, Mar 4, 2022, at 17:55, Alex McRae (CW) wrote:
>
> Hi all, happy friday!
>
> I am having issues building arrow with flight and flightsql on a fresh 
> install of windows.
>
> My configuration uses VCPKG, static build, and visual studio 2022 generation. 
> The generation is successful however running cmake --build ./build --config 
> Debug fails. With a bunch of different errors stemming from winsock2.h, which 
> I understand comes from the Windows SDK, and resolve issues in the DLL. I 
> have attached a log below.
>
> I run these commands in the cpp/build directory of the arrow repository. As 
> outlined https://arrow.apache.org/docs/developers/cpp/windows.html
>
> cmake .. -DARROW_FLIGHT=ON -DARROW_FLIGHT_SQL=ON 
> -DCMAKE_TOOLCHAIN_FILE=C:/tools/vcpkg/scripts/buildsystems/vcpkg.cmake 
> -DARROW_DEPENDENCY_SOURCE=VCPKG -DARROW_BUILD_STATIC=ON 
> -DVCPKG_TARGET_TRIPLET=x64-windows-static -A x64 
> -DARROW_DEPENDENCY_USE_SHARED=OFF
>
> cmake --build . --config Debug
>
> Let me know what you think,
>
> Alex McRae
>
>
> Attachments:
>
> log.txt
>
>


Re: C++ version of Arrow slower than Python version

2022-03-01 Thread Weston Pace
Does setting UseAsync on the C++ end make a difference?  It's possible we
switched the default to async in python in 6.0.0 but not in C++.

On Tue, Mar 1, 2022, 11:35 Niranda Perera  wrote:

> Oh, I forgot to mention, had to fix LD_LIBRARY_PATH when running the c++
> executable.
> LD_LIBRARY_PATH=$CONDA_PREFIX/lib:$LD_LIBRARY_PATH ./dataset_bench
>
> On Tue, Mar 1, 2022 at 4:34 PM Niranda Perera 
> wrote:
>
>> @Jayeet,
>>
>> I ran your example in my desktop, and I don't see any timing issues
>> there. I used conda to install pyarrow==6.0.0
>> I used the following command
>> g++ -O3 -std=c++11 dataset_bench.cc -I"$CONDA_PREFIX"/include
>> -L"$CONDA_PREFIX"/lib -larrow -larrow_dataset -lparquet -o dataset_bench
>>
>> And I had to del the objects in the python file, because it was getting
>> killed due to OOM.
>> ```
>> ...
>> for i in range(10):
>> s = time.time()
>> dataset_ = ds.dataset("/home/niranda/flight_dataset",
>> format="parquet")
>> table = dataset_.to_table(use_threads=False)
>> e = time.time()
>> print(e - s)
>>
>> del table
>> del dataset_
>> gc.collect()
>> ```
>>
>> For me c++ takes around ~21s and python ~22s which is expected.
>>
>>
>> On Tue, Mar 1, 2022 at 2:19 PM Jayjeet Chakraborty <
>> jayjeetchakrabort...@gmail.com> wrote:
>>
>>> Hi Sasha,
>>>
>>> Thanks a lot for replying. I tried -O2 earlier but it didn't work. I
>>> tried it again (when compiling with PyArrow SO files) and unfortunately, it
>>> didn't improve the results.
>>>
>>> On Tue, Mar 1, 2022 at 11:14 AM Sasha Krassovsky <
>>> krassovskysa...@gmail.com> wrote:
>>>
 Hi Jayjeet,
 I noticed that you're not compiling dataset_bench with optimizations
 enabled. I'm not sure how much it will help, but it may be worth adding
 `-O2` to your g++ invocation.

 Sasha Krassovsky

 On Tue, Mar 1, 2022 at 11:11 AM Jayjeet Chakraborty <
 jayjeetchakrabort...@gmail.com> wrote:

> Hi Niranda, David,
>
> I ran my benchmarks again with the PyArrow .SO libraries which should
> be optimized. PyArrow version was 6.0.1 installed from pip. Here are my 
> new
> results [1]. Numbers didn't quite seem to improve. You can check my build
> config in the Makefile [2]. I created a README [3] to make it easy for you
> to reproduce on your end. Thanks.
>
> [1]
> https://github.com/JayjeetAtGithub/arrow-flight-benchmark/tree/main/dataset_bench/optimized
> [2]
> https://github.com/JayjeetAtGithub/arrow-flight-benchmark/blob/main/dataset_bench/Makefile
> [3]
> https://github.com/JayjeetAtGithub/arrow-flight-benchmark/blob/main/dataset_bench/README.md
>
> On Tue, Mar 1, 2022 at 10:04 AM Niranda Perera <
> niranda.per...@gmail.com> wrote:
>
>> Hi Jayeet,
>>
>> Could you try building your cpp project against the arrow.so in
>> pyarrow installation? It should be in the lib directory in your python
>> environment.
>>
>> Best
>>
>> On Tue, Mar 1, 2022 at 12:46 PM Jayjeet Chakraborty <
>> jayjeetchakrabort...@gmail.com> wrote:
>>
>>> Thanks for your reply, David.
>>>
>>> 1) I used PyArrow 6.0.1 for both C++ and Python.
>>> 2) The dataset was deployed using this [1] script.
>>> 3) For C++, Arrow was built from source in release mode. You can see
>>> the CMake config here [2].
>>>
>>> I think I need to test once with Arrow C++ installed from packages
>>> instead of me building it. That might be the issue.
>>>
>>> [1]
>>> https://github.com/JayjeetAtGithub/arrow-flight-benchmark/blob/main/common/deploy_data.sh
>>> [2]
>>> https://github.com/JayjeetAtGithub/arrow-flight-benchmark/tree/main/cpp
>>>
>>> Best,
>>> Jayjeet
>>>
>>>
>>>
>>>
>>> On Tue, Mar 1, 2022 at 5:04 AM David Li  wrote:
>>>
 Hi Jayjeet,

 That's odd since the Python API is just wrapping the C++ API, so
 they should be identical if everything is configured the same. (So is 
 the
 Java API, incidentally.) That's effectively what the SO question is 
 saying.

 What versions of PyArrow and Arrow are you using? Just to check the
 obvious things, was Arrow compiled with optimizations? And if we want 
 to
 replicate this, is it possible to get the dataset?

 -David

 On Tue, Mar 1, 2022, at 01:52, Jayjeet Chakraborty wrote:

 Hi Arrow community,

 I was working on a class project for benchmarking Apache Arrow
 Dataset API in different programming languages. I found out that for 
 some
 reason the C++ API example is slower than the Python API example. I 
 ran my
 benchmarks on a 5 GB dataset consisting of 300 16MB parquet files. I 
 tried
 my best to cross verify if all the parameters 

Re: [Python][Parquet]pq.ParquetFile.read faster than pq.read_table?

2022-02-24 Thread Weston Pace
Thanks for reporting this.  It seems a regression crept into 7.0.0
that accidentally disabled parallel column decoding when
pyarrow.parquet.read_table is called with a single file.  I have filed
[1] and should have a fix for it before the next release.  As a
workaround you can use the datasets API directly, this is already what
pyarrow.parquet.read_table is using under the hood when
use_legacy_dataset=False.  Or you can continue using
use_legacy_dataset=True.

import pyarrow.dataset as ds
table = ds.dataset('file.parquet', format='parquet').to_table()

[1] https://issues.apache.org/jira/browse/ARROW-15784

On Wed, Feb 23, 2022 at 10:59 PM Shawn Zeng  wrote:
>
> I am using a public benchmark. The origin file is 
> https://homepages.cwi.nl/~boncz/PublicBIbenchmark/Generico/Generico_1.csv.bz2 
> . I used pyarrow version 7.0.0 and pq.write_table api to write the csv file 
> as a parquet file, with compression=snappy and use_dictionary=true. The data 
> has ~20M rows and 43 columns. So there is only one row group with 
> row_group_size=64M as default. The OS is Ubuntu 20.04 and the file is on 
> local disk.
>
> Weston Pace  于2022年2月24日周四 16:45写道:
>>
>> That doesn't really solve it but just confirms that the problem is the newer 
>> datasets logic.  I need more information to really know what is going on as 
>> this still seems like a problem.
>>
>> How many row groups and how many columns does your file have?  Or do you 
>> have a sample parquet file that shows this issue?
>>
>> On Wed, Feb 23, 2022, 10:34 PM Shawn Zeng  wrote:
>>>
>>> use_legacy_dataset=True fixes the problem. Could you explain a little about 
>>> the reason? Thanks!
>>>
>>> Weston Pace  于2022年2月24日周四 13:44写道:
>>>>
>>>> What version of pyarrow are you using?  What's your OS?  Is the file on a 
>>>> local disk or S3?  How many row groups are in your file?
>>>>
>>>> A difference of that much is not expected.  However, they do use different 
>>>> infrastructure under the hood.  Do you also get the faster performance 
>>>> with pq.read_table(use_legacy_dataset=True) as well.
>>>>
>>>> On Wed, Feb 23, 2022, 7:07 PM Shawn Zeng  wrote:
>>>>>
>>>>> Hi all, I found that for the same parquet file, using 
>>>>> pq.ParquetFile(file_name).read() takes 6s while pq.read_table(file_name) 
>>>>> takes 17s. How do those two apis differ? I thought they use the same 
>>>>> internals but it seems not. The parquet file is 865MB, snappy compression 
>>>>> and enable dictionary. All other settings are default, writing with 
>>>>> pyarrow.


Re: [Python][Parquet]pq.ParquetFile.read faster than pq.read_table?

2022-02-24 Thread Weston Pace
That doesn't really solve it but just confirms that the problem is the
newer datasets logic.  I need more information to really know what is going
on as this still seems like a problem.

How many row groups and how many columns does your file have?  Or do you
have a sample parquet file that shows this issue?

On Wed, Feb 23, 2022, 10:34 PM Shawn Zeng  wrote:

> use_legacy_dataset=True fixes the problem. Could you explain a little
> about the reason? Thanks!
>
> Weston Pace  于2022年2月24日周四 13:44写道:
>
>> What version of pyarrow are you using?  What's your OS?  Is the file on a
>> local disk or S3?  How many row groups are in your file?
>>
>> A difference of that much is not expected.  However, they do use
>> different infrastructure under the hood.  Do you also get the faster
>> performance with pq.read_table(use_legacy_dataset=True) as well.
>>
>> On Wed, Feb 23, 2022, 7:07 PM Shawn Zeng  wrote:
>>
>>> Hi all, I found that for the same parquet file,
>>> using pq.ParquetFile(file_name).read() takes 6s while
>>> pq.read_table(file_name) takes 17s. How do those two apis differ? I thought
>>> they use the same internals but it seems not. The parquet file is 865MB,
>>> snappy compression and enable dictionary. All other settings are default,
>>> writing with pyarrow.
>>>
>>


Re: [Python][Parquet]pq.ParquetFile.read faster than pq.read_table?

2022-02-23 Thread Weston Pace
What version of pyarrow are you using?  What's your OS?  Is the file on a
local disk or S3?  How many row groups are in your file?

A difference of that much is not expected.  However, they do use different
infrastructure under the hood.  Do you also get the faster performance with
pq.read_table(use_legacy_dataset=True) as well.

On Wed, Feb 23, 2022, 7:07 PM Shawn Zeng  wrote:

> Hi all, I found that for the same parquet file,
> using pq.ParquetFile(file_name).read() takes 6s while
> pq.read_table(file_name) takes 17s. How do those two apis differ? I thought
> they use the same internals but it seems not. The parquet file is 865MB,
> snappy compression and enable dictionary. All other settings are default,
> writing with pyarrow.
>


Re: FIleNotFound Error on root directory with fsspec partitioned dataset

2022-02-23 Thread Weston Pace
I'm pretty sure GCS is similar to S3 in that there is no such thing as
a "directory".  Instead a directory is often emulated by an empty
file.  Note that the single file being detected is hires-sonde/ (with
a trailing slash).  I'm pretty sure this is the convention for
creating mock directories.  I'm guessing, if there were multiple
files, we would work ok because we just skip the empty files.

So perhaps this is a problem unique to gcsfs/fsspec and trying to read
an "empty directory".

You might also try the GCS filesystem (released with 7.0.0) instead of
going through fsspec.

On Wed, Feb 23, 2022 at 2:23 AM Joris Van den Bossche
 wrote:
>
>
> On Mon, 21 Feb 2022 at 00:04, Kelton Halbert  wrote:
>>
>> Hello,
>>
>> I’ve been learning and working with PyArrow recently for a project to store 
>> some atmospheric science data as part of a partitioned dataset, and recently 
>> the dataset class with the  fsspec/gcsfs filesystem has started producing a 
>> new error.
>
>
> Hi Kelton,
>
> One more question: you say that this started producing a new error, so I 
> suppose this worked a while ago? Do you know if you updated some packages (eg 
> gcsfs or fsspec) since then? Or something else that might have changed?
>
> Joris
>


Re: [Python][Parquet]Why the default row_group_size is 64M?

2022-02-22 Thread Weston Pace
These are all great points.  A few notes from my own experiments
(mostly confirming what others have said):

 1) 1M rows is the minimum safe size for row groups on HDD (and
perhaps a hair too low in some situations) if you are doing any kind
of column selection (i.e. projection pushdown).  As that number gets
lower the ratio of skips to reads increases to the point where it
starts to look too much like "random read" for the HDD and performance
suffers.
 2) 64M rows is too high for two (preventable) reasons in the C++
datasets API.  The first reason is that the datasets API does not
currently support sub-row-group streaming reads (e.g. we can only read
one row group at a time from the disk).  Large row groups leads to too
much initial latency (have to read an entire row group before we start
processing) and too much RAM usage.
 3) 64M rows is also typically too high for the C++ datasets API
because, as Micah pointed out, we don't yet have support in the
datasets API for page-level column indices.  This means that
statistics-based filtering is done at the row-group level and very
coarse grained.  The bigger the block the less likely it is that a
filter will eclipse the entire block.

Points 2 & 3 above are (I'm fairly certain) entirely fixable.  I've
found reasonable performance with 1M rows per row group and so I
haven't personally been as highly motivated to fix the latter two
issues but they are somewhat high up on my personal priority list.  If
anyone has time to devote to working on these issues I would be happy
to help someone get started.  Ideally, if we can fix those two issues,
then something like what Micah described (one row group per file) is
fine and we can help shield users from frustrating parameter tuning.

I have a draft of a partial fix for 2 at [1][2].  I expect I should be
able to get back to it before the 8.0.0 release.  I couldn't find an
issue for the more complete fix (scanning at page-resolution instead
of row-group-resolution) so I created [3].

A good read for the third point is at [4].  I couldn't find a JIRA
issue for this from a quick search but I feel that we probably have
some issues somewhere.

[1] https://github.com/apache/arrow/pull/12228
[2] https://issues.apache.org/jira/browse/ARROW-14510
[3] https://issues.apache.org/jira/browse/ARROW-15759
[4] 
https://blog.cloudera.com/speeding-up-select-queries-with-parquet-page-indexes/

On Tue, Feb 22, 2022 at 5:07 PM Shawn Zeng  wrote:
>
> Hi, thank you for your reply. The confusion comes from what Micah mentions: 
> the row_group_size in pyarrow is 64M rows, instead of 64MB. In that case it 
> does not align will Hadoop block size unless you have only 1B per row. So in 
> most case the row group will be very large than 64MB. I think this parameter 
> uses num of rows instead of size already brings confusion when I read the doc 
> and change the parameter.
>
> I dont understand clearly why the current thinking is to have 1 row-group per 
> file? Could you explain more?
>
> Micah Kornfield  于2022年2月23日周三 03:52写道:
>>>
>>> What is the reason for this? Do you plan to change the default?
>>
>>
>> I think there is some confusion, I do believe this is the number of rows but 
>> I'd guess it was set to 64M because it wasn't carefully adapted from 
>> parquet-mr which I would guess uses byte size and therefore it aligns well 
>> with the HDFS block size.
>>
>> I don't recall seeing any open issues to change it.  It looks like for 
>> datasets [1] the default is 1 million, so maybe we should try to align these 
>> two.  I don't have a strong opinion here, but my impression is that the 
>> current thinking is to generally have 1 row-group per file, and eliminate 
>> entire files.  For sub-file pruning, I think column indexes are a better 
>> solution but they have not been implemented in pyarrow yet.
>>
>> [1] 
>> https://arrow.apache.org/docs/python/generated/pyarrow.dataset.write_dataset.html#pyarrow.dataset.write_dataset
>>
>> On Tue, Feb 22, 2022 at 4:50 AM Marnix van den Broek 
>>  wrote:
>>>
>>> hi Shawn,
>>>
>>> I expect this is the default because Parquet comes from the Hadoop 
>>> ecosystem, and the Hadoop block size is usually set to 64MB. Why would you 
>>> need a different default? You can set it to the size that fits your use 
>>> case best, right?
>>>
>>> Marnix
>>>
>>>
>>>
>>> On Tue, Feb 22, 2022 at 1:42 PM Shawn Zeng  wrote:

 For a clarification, I am referring to pyarrow.parquet.write_table

 Shawn Zeng  于2022年2月22日周二 20:40写道:
>
> Hi,
>
> The default row_group_size is really large, which means a large table 
> smaller than 64M rows will not get the benefits of row group level 
> statistics. What is the reason for this? Do you plan to change the 
> default?
>
> Thanks,
> Shawn


Re: [Python] Implementing own Filesystem Subclass in PyArrow v3.0.0

2022-02-10 Thread Weston Pace
> 3. Given our limited API sets, what would you recommend?

The filesystem interface is already rather minimal.  We generally
don't put a function in there if we aren't using it somewhere.  That
being said, you can often get away with a mock implementation.  From a
quick rundown:

GetFileInfo/OpenInputStream/OpenOutputStream/OpenInputFile - These are
used almost everywhere
CreateDir/DeleteDir/DeleteDirContents - These are used when writing
datasets (and so you will need it if you want to write partitioned
parquet)
DeleteFile/Move/CopyFile - I think these may only be used in our unit
tests, you could maybe get by without them

> - CRUD bucket
> - CRUD objects
> Currently, there is no support for streaming or working with any type of file 
> handle. I've already looked into how s3fs.cc was implemented but was not sure 
> I could apply it in my situation.

Some thoughts:

 * Do you support empty directories?

This is a tricky one.  We do rely on empty directories in some of our
datasets APIs.  For example, we CreateDir and then put files in it.
There is some discussion on [1] about how we might emulate this in GCS
but I don't know what exactly got implemented.

 * No support for streaming?

Does this mean you need to download an entire file at a time (e.g. you
can't stream the file or do a partial read of the file)?  In this case
you can mock it by downloading the file and then wrapping it with
arrow::io::BufferReader.  That provides the input stream and readable
file interfaces on top of an in-memory buffer.  You can also probably
use arrow::io::BufferedOutputStream to collect all writes in memory
and then override the Close method to actually persist the write.
This being said, you will of course use considerably more memory than
you need to.  So you'll need to make sure your files are small enough
to fit into memory.

[1] https://issues.apache.org/jira/browse/ARROW-1231

On Thu, Feb 10, 2022 at 1:31 AM Joris Van den Bossche
 wrote:
>
> HI Jae,
>
> Mainly providing an answer on your first question:
>
> On Thu, 10 Feb 2022 at 05:06, Jae Lee  wrote:
>>
>> Hi Team,
>>
>> I would like to implement a custom subclass of pyarrow.filesystem.FileSystem 
>> (or perhaps pyarrow.fs.FileSystem) and was hoping to leverage the full 
>> potential of what pyarrow provides with parquet files - partitioning, 
>> filter, etc. The underneath storage is cloud-based and not S3 compatible. 
>> Our API only provides support for
>> - CRUD bucket
>> - CRUD objects
>> Currently, there is no support for streaming or working with any type of 
>> file handle. I've already looked into how s3fs.cc was implemented but was 
>> not sure I could apply it in my situation.
>>
>> Questions:
>> 1. What Filesystem class do I need to implement to take full advantage of 
>> what arrow provides in terms of dealing with parquet files? 
>> (pyarrow.filesystem.FileSystem vs pyarrow.fs.FileSystem)
>
>
> The pyarrow.filesystem module is deprecated, so you should look at pyarrow.fs 
> FileSystems. Those filesystems are mostly implemented in C++ and can't be 
> directly subclassed in Python (only in C++), but there is a dedicated 
> mechanism to implement a FileSystem in Python, using the PyFileSystem class 
> and the FileSystemHandler class (see 
> https://arrow.apache.org/docs/python/api/filesystems.html#filesystem-implementations).
> You would need to implement your own FileSystemHandler, and then you can 
> create a filesystem object that will be recognized by pyarrow functions with 
> `fs = PyFileSystem(my_handler)`.
>
> We don't really have documentation about this (apart from the API docs for 
> FileSystemHandler), but it might probably be best to look at an example. And 
> we have an actual use case of this in our own code base to wrap 
> fsspec-compatible python filesystems that can be used as example: see 
> https://github.com/apache/arrow/blob/c0bae8daea2ace51c64f6db38cfb3d04c5bed657/python/pyarrow/fs.py#L254-L406
>
>>
>> 2. Is there any example of implementation of cloud-based non-s3 compatible 
>> filesystem?
>
>
> I am not aware of one in Python (in C++, we now also have a Google Cloud 
> Storage filesystem, but I suppose that has an extensive API). The Python 
> fsspec package (which can be used in pyarrow through the above mentioned 
> handler) implements some filesystems for "cloud" storage (eg for http, ftp), 
> but I am not familiar with the implementation details.
>
>>
>> 3. Given our limited API sets, what would you recommend?
>>
>> Initially, I was thinking to download the entire parquet file/directory to a 
>> local file system and provide a handle but was curious if there would be an 
>> any better way to handle this.
>>
>> Thank you in advance!
>> Jae


Re: Multi-threaded reads via ParquetFile API

2022-02-03 Thread Weston Pace
If you are aiming to speed up processing / consumption of batches then
you can use a queue.  For example:

# Once per client
pqf = pq.ParquetFile(f, memory_map=True)
# maxsize will control how much you all buffer in RAM
queue = queue.Queue(maxsize=32)
start_workers(queue)
for batch in pqf.iter_batches(self.rows_per_batch, use_pandas_metadata=True):
  queue.put(batch)
queue.join()
stop_workers()

# Each worker would do
while not stopped:
next_batch = queue.get()
process_batch(next_batch)
queue.task_done()

On Thu, Feb 3, 2022 at 8:03 AM Cindy McMullen  wrote:
>
> We can't use Beam to parallelize multiple file reads, b/c GraphDataLoader is 
> specific to the model being trained.  So multiple Beam processes can't share 
> the same model (until we move into DGL distributed mode later this year).
>
> We're trying to optimize throughput of the GraphDataLoader consume/process 
> these Parquet files.
>
> On Thu, Feb 3, 2022 at 11:01 AM Cindy McMullen  wrote:
>>
>> The use case is for a GraphDataLoader to run w/ multiple threads.  
>> GraphDataLoade invokes its DGLDataset, which loads these Parquet files to 
>> convert into DGL-compatible graph objects.
>>
>>
>> On Thu, Feb 3, 2022 at 10:00 AM Micah Kornfield  
>> wrote:
>>>
>>> Hi Cindy,

 I'd like to ingest  batches within a Parquet file in parallel.
>>>
>>> What is the motivation here?  Is it speeding up Parquet reading or 
>>> processing after the fact?
>>>
>>>
>>> Side note, the size of your row groups seems quite small (it might be right 
>>> for your specific use-case).
>>>
>>> Cheers,
>>> Micah
>>>
>>> On Thu, Feb 3, 2022 at 8:01 AM Cindy McMullen  wrote:

 Maybe -- will give it a try.  Thanks for the suggestion.

 On Thu, Feb 3, 2022 at 7:56 AM Partha Dutta  wrote:
>
> There is a parameter to iter_batches where you can pass in the row_group 
> number, or a list of row groups. Would this help to read the Parquet file 
> in parallel?
>
> On Thu, Feb 3, 2022 at 8:31 AM Cindy McMullen  
> wrote:
>>
>> Hi -
>>
>> I'd like to ingest  batches within a Parquet file in parallel.  The 
>> client (DGLDataset) needs to be thread-safe.  What's the best API for me 
>> to use  to do so?
>>
>> Here's the metadata for one example file:
>>
>>   
>>   created_by: parquet-mr version 1.12.0 (build 
>> db75a6815f2ba1d1ee89d1a90aeb296f1f3a8f20)
>>   num_columns: 4
>>   num_rows: 100
>>   num_row_groups: 9997
>>   format_version: 1.0
>>   serialized_size: 17824741
>>
>> I want the consumption of batches to be distributed among multiple 
>> workers.  I'm currently trying something like this:
>>
>> # Once per client
>>
>> pqf = pq.ParquetFile(f, memory_map=True)
>>
>>
>> # Ideally, each worker can do this, but ParquetFile.iter_batches is not 
>> thread-safe.  This makes intuitive sense.
>> pq_batches = pqf.iter_batches(self.rows_per_batch, 
>> use_pandas_metadata=True)
>>
>>
>>
>> My workaround is to buffer these ParquetFile batches into DataFrame [], 
>> but this is memory-intensive, so will not scale to multiple of these 
>> input files.
>>
>> What's a better PyArrow pattern to use so I can distribute batches to my 
>> workers in a thread-safe manner?
>>
>> Thanks --
>>
>>
>>
>>
>>
>
>
> --
> Partha Dutta
> partha.du...@gmail.com


Re: pyarrow write_dataset Illegal Instruction

2022-01-27 Thread Weston Pace
ndardArgs.cmake:137 
> (message):
>   Could NOT find Parquet (missing: PARQUET_INCLUDE_DIR PARQUET_LIB_DIR
>   PARQUET_SO_VERSION)
> Call Stack (most recent call first):
>   /usr/share/cmake-3.13/Modules/FindPackageHandleStandardArgs.cmake:378 
> (_FPHSA_FAILURE_MESSAGE)
>   cmake_modules/FindParquet.cmake:115 (find_package_handle_standard_args)
>   CMakeLists.txt:447 (find_package)
>
>
> -- Configuring incomplete, errors occurred!
> See also 
> "~/build_arrow/arrow/python/build/temp.linux-x86_64-3.7/CMakeFiles/CMakeOutput.log".
> error: command '/usr/bin/cmake' failed with exit code 1
>
> On Tue, Jan 25, 2022 at 12:27 AM Weston Pace  wrote:
>>
>> Your problem is probably old hardware, specifically an older CPU.  Pip 
>> builds rely on popcnt (which I think is SSE4.1?)
>>
>> I'm pretty sure you are right that you can compile from source and be ok.  
>> It's a performance / portability tradeoff that has to be made when packaging 
>> prebuilt binaries.
>>
>> On Mon, Jan 24, 2022, 6:18 PM Chris Nyland  wrote:
>>>
>>> Hello,
>>>
>>> I was just taking a look at pyarrow in my off hours. I was trying to write 
>>> a partitioned data set based on the birthdays example in the pyarrow cook 
>>> book. However when I run the script I get no data written and a "Illegal 
>>> Instruction" message prints to screen, no exception is raised. I installed 
>>> the pyarrow manylinux x86_64 version 6.0.1 wheel via pip for Python 3.7 
>>> using a virtual environment. I suspect that if I build pyarrow myself it 
>>> would work, it doesn't look too terribly difficult, but it is still kind of 
>>> a drag since I was looking to make some quick progress on an off hours 
>>> project.
>>>
>>> If anyone has any ideas on what else it would be I would like to try it 
>>> before building the library myself. Also is this a pretty typical issue to 
>>> run into? At work I primarily do Python on Windows and really haven't had 
>>> any build issues there since the Python 2.7 days.
>>>
>>> Thanks
>>>
>>> Chris


Re: pyarrow write_dataset Illegal Instruction

2022-01-24 Thread Weston Pace
Your problem is probably old hardware, specifically an older CPU.  Pip
builds rely on popcnt (which I think is SSE4.1?)

I'm pretty sure you are right that you can compile from source and be ok.
It's a performance / portability tradeoff that has to be made when
packaging prebuilt binaries.

On Mon, Jan 24, 2022, 6:18 PM Chris Nyland  wrote:

> Hello,
>
> I was just taking a look at pyarrow in my off hours. I was trying to write
> a partitioned data set based on the birthdays example in the pyarrow cook
> book. However when I run the script I get no data written and a "Illegal
> Instruction" message prints to screen, no exception is raised. I installed
> the pyarrow manylinux x86_64 version 6.0.1 wheel via pip for Python 3.7
> using a virtual environment. I suspect that if I build pyarrow myself it
> would work, it doesn't look too terribly difficult, but it is still kind of
> a drag since I was looking to make some quick progress on an off hours
> project.
>
> If anyone has any ideas on what else it would be I would like to try it
> before building the library myself. Also is this a pretty typical issue to
> run into? At work I primarily do Python on Windows and really haven't had
> any build issues there since the Python 2.7 days.
>
> Thanks
>
> Chris
>


Re: [Python] add a new column to a table during dataset consolidation

2022-01-24 Thread Weston Pace
. You are looking for a row-wise 
> mean, isn't it! I don't think there's an API for that pyarrow.compute. Sorry 
> my bad.
> You could call `add` for each column and manually create the mean (this would 
> be a vectorized operation column-wise. But this would create 2 additional 
> length-sized memory allocations at least AFAIU, because arrow doesn't have 
> mutable methods).
> I wasn't aware that pyarrow API didnt have an add_column method (sorry 
> again!). It's available in C++ API. But for that also, you could simply 
> create a list with the existing columns.
> Following would be my suggestion (not tested). But I agree, this is not as 
> pretty as the pandas solution! :-)
> ```
> def calc_mean(batch, cols):
>res = batch[cols[0]]
>
>   if len(cols) == 1:
>   return res
>
>for c in cols[1:]:
>  res = pa.compute.add(sum, batch[c])
>
>   return pa.compute.divide(res, len(cols))
>
> ...
>
> for batch in scanner.to_batches():
> new_cols = batch.columns
> new_cols.append(calc_mean(batch, cols))
>
> new_batch = pa.record_batch(data=new_cols,
>schema=batch.schema.append(pa.field('mean', pa.float64(
> ...
> ```
>
>
>
> On Mon, Jan 24, 2022 at 9:11 AM Antonino Ingargiola  
> wrote:
>>
>> Hi Niranda,
>>
>> On Mon, Jan 24, 2022 at 2:41 PM Niranda Perera  
>> wrote:
>>>
>>> Did you try using `pyarrow.compute` options? Inside that batch iterator 
>>> loop you can call the compute mean function and then call the add_column 
>>> method for record batches.
>>
>>
>> I cannot find how to pass multiple columns to be aggregated to 
>> pyarrow.compute functions. As far as I understand pyarrow.compute functions 
>> only accept a single 1D pyarrow.array as input. Maybe you had something else 
>> in mind.
>>
>> Besides, I don't see any add_column or append_column method for 
>> pyarrow.RecordBatch[1]
>>
>> [1] https://arrow.apache.org/docs/python/generated/pyarrow.RecordBatch.html
>>
>> The only solution I see is calling the compute function for each row of the 
>> RecordBatch (transforming each row to a pyarrow.array somehow). But this 
>> would be quite inefficient. On the contrary, pandas can compute the 
>> aggregation across columns in a vectorized way (at the additional cost of 
>> pyarrow <-> pandas roundtrip conversion).
>>
>>> In the latest arrow code base might have support for 'projection', that 
>>> could do this without having to iterate through record batches. @Weston 
>>> Pace WDYT?
>>
>>
>> If this is possible it would be great!
>>
>> Best,
>> Antonio
>
>
>
> --
> Niranda Perera
> https://niranda.dev/
> @n1r44
>


Re: [PyArrow] DictionaryArray isDelta Support

2022-01-14 Thread Weston Pace
ve a CPU cost, this code path is required
>   //  for the IPC file format)
>   continue;
> }
> ```
>
> If either of these equality conditions trigger, then we continue on to
> check
> the next dictionary.
>
> If it hasn't been previously output then we might either be able to write a
> replacement or a delta.
>
> Currently the code throws an error in both of these cases if we're writing
> to
> the IPC file format - this goes against the spec.
>
> If the current dictionary is "just" an extension of the previous
> dictionary and
> deltas are enabled then it marks where the new starting position is.
>
> Given this, it creates a payload - delta or not - and writes it, ensuring
> to
> update the stats data structure.
>
> ```
>   IpcPayload payload;
>   if (delta_start) {
> RETURN_NOT_OK(GetDictionaryPayload(dictionary_id,
> /*is_delta=*/true,
>dictionary->Slice(delta_start),
> options_,
>));
>   } else {
> RETURN_NOT_OK(
> GetDictionaryPayload(dictionary_id, dictionary, options_,
> ));
>   }
>   RETURN_NOT_OK(WritePayload(payload));
>   ++stats_.num_dictionary_batches;
>   if (dictionary_exists) {
> if (delta_start) {
>   ++stats_.num_dictionary_deltas;
> } else {
>   ++stats_.num_replaced_dictionaries;
> }
>   }
> ```
>
> <https://github.com/apache/arrow/blob/91e3ac53e2e21736ce6291d73fc37da6fa21259d/cpp/src/arrow/ipc/writer.cc#L1060>
> arrow/writer.cc at 91e3ac53e2e21736ce6291d73fc37da6fa21259d · apache/arrow
> <https://github.com/apache/arrow/blob/91e3ac53e2e21736ce6291d73fc37da6fa21259d/cpp/src/arrow/ipc/writer.cc#L1060>
> Apache Arrow is a multi-language toolbox for accelerated data interchange
> and in-memory processing - arrow/writer.cc at
> 91e3ac53e2e21736ce6291d73fc37da6fa21259d · apache/arrow
> github.com
> **
>
> --
> *From:* Weston Pace 
> *Sent:* 27 July 2021 19:52
> *To:* user@arrow.apache.org 
> *Subject:* Re: [PyArrow] DictionaryArray isDelta Support
>
> I'm able to verify what Sam is seeing. It appears Arrow does not
> support dictionary deltas in the file format. However, from my
> reading of the spec it does indeed seem proper deltas should be
> allowed. A small patch[1] allowed me to write delta dictionaries in
> the file format but then reading fails at [2] which seems more
> explicit that dictionary deltas of any kind were not originally
> supported with the file format. I think the fix for read will
> probably be a bit more involved and require some good testing. I've
> opened ARROW-13467 for further discussion.
>
> [1]
> https://github.com/apache/arrow/commit/54df8581a9d825664785fc406950204e345a5b3b
> [2]
> https://github.com/apache/arrow/blob/81ff679c47754692224f655dab32cc0936bb5f55/cpp/src/arrow/ipc/reader.cc#L1062
> [3] https://issues.apache.org/jira/browse/ARROW-13467
>
> On Sun, Jul 25, 2021 at 10:12 PM Sam Davis 
> wrote:
> >
> > Hi Wes,
> >
> > Yes, that is exactly it. For the file format, the spec dictates that it
> should be possible to output deltas but currently this is not possible. An
> `ArrowInvalid` error is thrown.
> >
> > Example code:
> >
> > ```
> > import pandas as pd
> > import pyarrow as pa
> >
> > print(pa.__version__)
> >
> > schema = pa.schema([
> > ("foo", pa.dictionary(pa.int16(), pa.string()))
> > ])
> >
> > pd1 = pd.DataFrame({"foo": pd.Categorical(["a"], categories=["a", "b"])})
> > b1 = pa.RecordBatch.from_pandas(pd1, schema=schema)
> >
> > pd2 = pd.DataFrame({"foo": pd.Categorical(["a", ""],
> categories=["a", "b", ""])})
> > b2 = pa.RecordBatch.from_pandas(pd2, schema=schema)
> >
> > options = pa.ipc.IpcWriteOptions(emit_dictionary_deltas=True)
> >
> > with pa.ipc.new_file("/tmp/sdavis_tmp.arrow", schema=schema,
> options=options) as writer:
> > writer.write(b1)
> >
> > writer.write(b2)
> > ```
> >
> > Best,
> >
> > Sam
> > 
> > From: Wes McKinney 
> > Sent: 24 July 2021 01:43
> > To: user@arrow.apache.org 
> > Subject: Re: [PyArrow] DictionaryArray isDelta Support
> >
> > If I'm interpreting you correctly, the issue is that every dictionary
> > must be a prefix of a common dictionary for the delta logic to work.
> > So if the first bat

Re: [Python] Does write_file() finish when returning?

2022-01-06 Thread Weston Pace
I'm guessing you mean write_table?  Assuming you are passing a
filename / string (and not an open output stream) to write_table I
would expect that any files opened during the call have been closed
before the call returns.

Pedantically, this is not quite the same thing as "finished writing on
disk" but more accurately, "finished writing to the OS".  A power
outage shortly after a call to write_table completes could lead to
partial loss of a file.

However, this should not matter for your case if I am understanding
your problem statement in that reddit post.  As long as you open that
file handle to read after you have finished the call to write_table
you should see all of the contents immediately.

There is always the opportunity for bugs but many of our unit tests
write files and then turn around and immediately read them and we
don't typically have trouble here.  I'm assuming your reader & writer
are on the same thread & process?  If you open a reader it's possible
your read task is running while your write task is running and then no
guarantees would be made.

On Thu, Jan 6, 2022 at 12:47 PM Brandon Chinn  wrote:
>
> When `pyarrow.parquet.write_file()` returns, is the parquet file finished 
> writing on disk, or is it still writing?
>
> Context: 
> https://www.reddit.com/r/learnpython/comments/rxmq43/help_with_python_file_flakily_not_returning_full/hrj99tq/?context=3
>
> Thanks!
> Brandon Chinn


Re: [Python] weird memory usage issue when reading a parquet file.

2022-01-03 Thread Weston Pace
Wes' theory seems sound.  Perhaps the easiest way to test that theory would
be to put a five second sleep after the to_table call and before you run
show_mem.  In theory 1s is long enough but 5s is nice to remove any doubt.

If there is a filter (that cannot be serviced by parquet row group
statistics) there will be more total allocation.  This is because we first
need to read in the full row group and then we need to filter it which is a
copy operation to a (hopefully) smaller sized row group.

The filtering should happen after the column pruning but if the filter is
referencing any columns that are not included in the final result then we
will need to load in those additional columns, use them for the filter, and
then drop them.  This is another way you might end up with more total
allocation if you use a filter.

-Weston

On Mon, Jan 3, 2022 at 3:10 AM Wes McKinney  wrote:

> By default we use jemalloc as our memory allocator which empirically has
> been seen to yield better application performance. jemalloc does not
> release memory to the operating system right away, this can be altered by
> using a different default allocator (for example, the system allocator may
> return memory to the OS right away):
>
>
> https://arrow.apache.org/docs/cpp/memory.html#overriding-the-default-memory-pool
>
> I expect that the reason that psutil-reported allocated memory is higher
> in the last case is because some temporary allocations made during the
> filtering process are raising the "high water mark". I believe can see what
> is reported as the peak memory allocation by looking at
> pyarrow.default_memory_pool().max_memory()
>
> On Mon, Dec 20, 2021 at 5:10 AM Yp Xie  wrote:
>
>> Hi guys,
>>
>> I'm getting this weird memory usage info when I tried to start using
>> pyarrow to read a parquet file.
>>
>> I wrote a simple script to show how much memory is consumed after each
>> step.
>> the result is illustrated in the table:
>>
>> row number pa.total_allocated_bytes memory usage by psutil
>> without filters 5131100 177M 323M
>> with field filter 57340 2041K 323M
>> with column pruning 5131100 48M 154M
>> with both field filter and column pruning 57340 567K 204M
>>
>> the weird part is: the total memory usage when I apply both field filter
>> and column pruning is *larger* than only column pruning applied.
>>
>> I don't know how that happened, do you guys know the reason for this?
>>
>> thanks.
>>
>> env info:
>>
>> platform: Linux-5.4.0-91-generic-x86_64-with-glibc2.10
>> distro info: ('Ubuntu', '20.04', 'focal')
>> pyarrow: 6.0.1
>>
>>
>> script code:
>>
>> import pyarrow as pa
>> import psutil
>> import os
>> import pyarrow.dataset as ds
>>
>> pid = os.getpid()
>>
>> def show_mem(action: str) -> None:
>> mem = psutil.Process(pid).memory_info()[0] >> 20
>> print(f"*** memory usage after {action} **")
>> print(f"*   {mem}M*")
>> print(f"**")
>>
>> dataset = ds.dataset("tmp/uber.parquet", format="parquet")
>> show_mem("read dataset")
>> projection = {
>> "Dispatching_base_num": ds.field("Dispatching_base_num")
>> }
>> filter = ds.field("locationID") == 100
>> table = dataset.to_table(
>> filter=filter,
>> columns=projection
>> )
>> print(f"table row number: {table.num_rows}")
>> print(f"total bytes: {pa.total_allocated_bytes() >> 10}K")
>> show_mem("dataset.to_table")
>>
>


  1   2   >