Hi Antonio,

Have you tried the `pyarrow.compute` functions? Inside that batch iterator
loop you can compute the mean with `pyarrow.compute` and then call `add_column`
on each record batch to append the result.
The latest Arrow code base might have support for 'projection', which could
do this without having to iterate through the record batches. @Weston Pace
<[email protected]> WDYT?
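
Something like the sketch below is roughly what I mean (untested; it reuses the
in_arrow_path/out_arrow_path variables from your script, and batches_with_mean is
just a helper name I made up). Since I'm not sure which pyarrow version you are
on, it rebuilds each batch with RecordBatch.from_arrays instead of relying on
add_column, and it hands the batch iterator to a single write_dataset call
together with an explicit schema:

import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.dataset as ds
from uuid import uuid4

dataset = ds.dataset(in_arrow_path, partitioning='hive')
scanner = dataset.scanner()

def batches_with_mean(scanner):
    for batch in scanner.to_batches():
        # element-wise mean of 'a' and 'b' using pyarrow.compute kernels
        mean = pc.divide(
            pc.cast(pc.add(batch.column('a'), batch.column('b')), pa.float64()),
            2.0)
        # rebuild the batch with the extra 'mean' column appended
        yield pa.RecordBatch.from_arrays(
            batch.columns + [mean],
            names=batch.schema.names + ['mean'])

new_schema = scanner.projected_schema.append(pa.field('mean', pa.float64()))
ds.write_dataset(
    batches_with_mean(scanner), out_arrow_path,
    schema=new_schema,
    format='parquet',
    partitioning=['partition'],
    partitioning_flavor='hive',
    existing_data_behavior='delete_matching',
    basename_template=f'{uuid4()}-{{i}}.parquet')

Writing everything in one write_dataset call (it accepts an iterable of record
batches as long as schema= is given) also avoids the per-batch delete_matching
writes in your current loop.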


On Mon, Jan 24, 2022 at 7:39 AM Antonino Ingargiola <[email protected]>
wrote:

> Hi list,
>
> I am looking for a way to add a new column to an existing table that is
> computed as the sum/mean of other columns. From the docs, I understand
> that pyarrow compute functions operate on arrays (i.e. columns), but I
> cannot tell whether it is possible to aggregate across columns in some way.
>
> In addition, I use the scanner API to load and re-save a parquet dataset to
> consolidate the small files in each partition, following the example from
> the docs [1]. But now I would like to extend the consolidation step by adding
> new columns before saving the new dataset.
>
> Here is an example of what I want to achieve. In this case I am using
> pandas and scanner.to_batches():
>
> import pyarrow.dataset as ds
> import pyarrow as pa
> from glob import glob
> from uuid import uuid4
>
> # Create the input dataset
> data_dict = {'partition': [1, 1, 2, 2],
>              'a': [1, 2, 3, 4],
>              'b': [2, 4, 6, 8]}
> table = pa.Table.from_pydict(data_dict)
>
> in_arrow_path = 'example_input'
> out_arrow_path = 'example_output'
>
> ds.write_dataset(table, in_arrow_path, format='parquet',
>                  partitioning=['partition'],
>                  partitioning_flavor='hive',
>                  existing_data_behavior='delete_matching',
>                  basename_template=f'{uuid4()}-{{i}}')
>
> print('\n'.join(glob(f'{in_arrow_path}/**/*')))
> dataset = ds.dataset(in_arrow_path, partitioning='hive')
> print(dataset.to_table().to_pandas())
>
> # Re-save the input dataset adding a new column ("consolidation")
> scanner = dataset.scanner()
> cols = ['a', 'b']
>
> for batch in scanner.to_batches():
>     df = batch.to_pandas()
>     df['mean'] = df[cols].mean(axis=1)
>     new_batch = pa.RecordBatch.from_pandas(df)
>     ds.write_dataset(
>         new_batch, out_arrow_path,
>         format='parquet',
>         partitioning=['partition'],
>         partitioning_flavor='hive',
>         existing_data_behavior='delete_matching',
>         basename_template=f'{uuid4()}-{{i}}.parquet'
>     )
>
> print('\n'.join(glob(f'{out_arrow_path}/**/*')))
> new_dataset = ds.dataset(out_arrow_path, partitioning='hive')
> print(new_dataset.to_table().to_pandas())
>
> So the questions are:
>
> 1. Can I add the new column (mean) directly in pyarrow?
> 2. Do I need to write a loop batch by batch as above or is there a way to
> apply the transformation through the scanner API like in [1]?
>
> Thanks for any advice.
>
> Antonio
>
> [1]
> https://arrow.apache.org/docs/python/dataset.html#writing-large-amounts-of-data
>
>

-- 
Niranda Perera
https://niranda.dev/
@n1r44 <https://twitter.com/N1R44>
