> You are looking for a row-wise mean, isn't it! I don't think there's an API 
> for that pyarrow.compute.

Right, I don't think this is in there today either.  The C++ compute
infrastructure itself can create functions that run on record batches
(instead of just arrays).  An example of this is drop_null (which will
drop each row if any value in that row is null).  So it should be
possible for you to create a kernel that does this but, today, you
would have to write the kernel in C++.

> I wasn't aware that pyarrow API didnt have an add_column method (sorry 
> again!).

Until this is fixed you can work around this by wrapping the batch
with a table.  It's a metadata-only (zero-copy) operation so shouldn't
take much time:

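    import pyarrow as pa

    def add_column_to_batch(batch, arr, name):
        # Wrapping a batch in a table is metadata-only; no data is copied
        tab = pa.Table.from_batches([batch])
        field = pa.field(name, arr.type)
        new_tab = tab.append_column(field, arr)
        # A table built from a single batch converts back to a single batch
        return new_tab.to_batches()[0]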

> Do I need to write a loop batch by batch as above or is there a way to apply 
> the transformation through the scanner API like in [1]?

The write_dataset method (and, I think, most places that accept a
scanner) can accept an iterable of batches, although you need to
supply a schema in that case.  Using Python generators you can quickly
create an iterable of batches from a scanner.  So for your example it
would look something like:

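    from uuid import uuid4

    import pyarrow as pa
    import pyarrow.dataset as ds

    # The output schema is the input schema plus the new 'mean' column
    schema = table.schema
    schema = schema.append(pa.field('mean', pa.float64()))

    def transform_scanner(scanner):
        for batch in scanner.to_batches():
            df = batch.to_pandas()
            df['mean'] = df[cols].mean(axis=1)
            # Pass the schema so each batch matches the one given to
            # write_dataset below
            new_batch = pa.RecordBatch.from_pandas(
                df, schema=schema, preserve_index=False)
            yield new_batch

    ds.write_dataset(
        transform_scanner(scanner), out_arrow_path,
        schema=schema,
        format='parquet',
        partitioning=['partition'],
        partitioning_flavor='hive',
        existing_data_behavior='delete_matching',
        basename_template=f'{uuid4()}-{{i}}.parquet'
    )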

It is probably a good idea to do this because, by calling
write_dataset once, the dataset writer will be able to group small
batches (which are often created as a result of partitioning) into
larger row groups before it writes them to disk.

One more thing I would suggest is to select the columns you want
before you convert to pandas.  Not every data type is zero-copy into
pandas.  So if you have a string column for example you will be paying
the cost of converting that to pandas for no reason.  That would look
like...

    def transform_scanner(scanner):
        for batch in scanner.to_batches():
            # 'select' is another method that hasn't been added to
            # pa.RecordBatch yet so we'll wrap in a table again
            table = pa.Table.from_batches([batch])
            table = table.select(cols)
            df = table.to_pandas()
            df['mean'] = df.mean(axis=1)
            # Only convert the new array back to arrow
            mean_arr = pa.Array.from_pandas(df['mean'])
            new_batch = add_column_to_batch(batch, mean_arr, 'mean')
            yield new_batch

> In the latest arrow code base might have support for 'projection'

This was very recently added (thanks Joris!) in [1].  I'm not sure if
that is part of the current 7.0.0 RC or not.  As you pointed out,
projection alone won't give you a row-wise mean because there is no
compute function for that.  But if we had one, it would look something
like this (using add as a stand-in):

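    import pyarrow.compute as pc
    import pyarrow.dataset as ds

    # Build the new column as an expression over existing fields
    add_val = pc.add(ds.field('a'), ds.field('b'))
    scanner = dataset.scanner(columns={
        'a': ds.field('a'),
        'b': ds.field('b'),
        'c': add_val
    })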

# In the future

There is some ongoing work that is related to what you are doing here.

## Tensors

When you have a lot of columns with the same data type and you want to
operate on them row-wise, tensors (matrices) come to mind.  I'm not
sure if this would be applicable to your use case or not.  For your
example this would mean treating the columns b, c as a 1x2 tensor so
you have a single column "bc" of tensors.

That being said, while there is some support for encoding and decoding
tensors into arrow, there are not yet any compute functions for
tensors.  So until that happens I don't know that you would be much
better off.

## Python UDFs

At some point I expect we will have support for Python UDFs, which
would basically be a slightly altered version of what we did above
with the transform_scanner function, where we are still letting
pandas do the heavy lifting.  At the end of the day this wouldn't be a
lot different than what you are doing today (in terms of performance)
but we wouldn't have to rely on write_dataset accepting an iterable of
batches (because your scanner output would already be what you want).

[1] https://issues.apache.org/jira/browse/ARROW-12060

On Mon, Jan 24, 2022 at 6:19 AM Niranda Perera <niranda.per...@gmail.com> wrote:
>
> Hi Antonio,
> Sorry I think I misunderstood your question. You are looking for a row-wise 
> mean, isn't it! I don't think there's an API for that pyarrow.compute. Sorry 
> my bad.
> You could call `add` for each column and manually create the mean (this would 
> be a vectorized operation column-wise. But this would create 2 additional 
> length-sized memory allocations at least AFAIU, because arrow doesn't have 
> mutable methods).
> I wasn't aware that pyarrow API didnt have an add_column method (sorry 
> again!). It's available in C++ API. But for that also, you could simply 
> create a list with the existing columns.
> Following would be my suggestion (not tested). But I agree, this is not as 
> pretty as the pandas solution! :-)
> ```
> def calc_mean(batch, cols):
>     res = batch[cols[0]]
>
>     if len(cols) == 1:
>         return res
>
>     # Accumulate the column-wise sum, then divide by the column count
>     for c in cols[1:]:
>         res = pa.compute.add(res, batch[c])
>
>     return pa.compute.divide(res, len(cols))
>
> ...
>
> for batch in scanner.to_batches():
>     new_cols = batch.columns
>     new_cols.append(calc_mean(batch, cols))
>
>     new_batch = pa.record_batch(data=new_cols,
>        schema=batch.schema.append(pa.field('mean', pa.float64())))
>     ...
> ```
>
>
>
> On Mon, Jan 24, 2022 at 9:11 AM Antonino Ingargiola <trite...@gmail.com> 
> wrote:
>>
>> Hi Niranda,
>>
>> On Mon, Jan 24, 2022 at 2:41 PM Niranda Perera <niranda.per...@gmail.com> 
>> wrote:
>>>
>>> Did you try using `pyarrow.compute` options? Inside that batch iterator 
>>> loop you can call the compute mean function and then call the add_column 
>>> method for record batches.
>>
>>
>> I cannot find how to pass multiple columns to be aggregated to 
>> pyarrow.compute functions. As far as I understand pyarrow.compute functions 
>> only accept a single 1D pyarrow.array as input. Maybe you had something else 
>> in mind.
>>
>> Besides, I don't see any add_column or append_column method for 
>> pyarrow.RecordBatch[1]
>>
>> [1] https://arrow.apache.org/docs/python/generated/pyarrow.RecordBatch.html
>>
>> The only solution I see is calling the compute function for each row of the 
>> RecordBatch (transforming each row to a pyarrow.array somehow). But this 
>> would be quite inefficient. On the contrary, pandas can compute the 
>> aggregation across columns in a vectorized way (at the additional cost of 
>> pyarrow <-> pandas roundtrip conversion).
>>
>>> In the latest arrow code base might have support for 'projection', that 
>>> could do this without having to iterate through record batches. @Weston 
>>> Pace WDYT?
>>
>>
>> If this is possible it would be great!
>>
>> Best,
>> Antonio
>
>
>
> --
> Niranda Perera
> https://niranda.dev/
> @n1r44
>
