> You are looking for a row-wise mean, isn't it! I don't think there's an API
> for that in pyarrow.compute.
Right, I don't think this is in there today either.  The C++ compute
infrastructure itself can create functions that run on record batches
(instead of just arrays).  An example of this is drop_null (which will drop
each row if any value in that row is null).  So it should be possible for
you to create a kernel that does this but, today, you would have to write
the kernel in C++.

> I wasn't aware that the pyarrow API didn't have an add_column method
> (sorry again!).

Until this is fixed you can work around it by wrapping the batch with a
table.  It's a metadata-only (zero-copy) operation so it shouldn't take
much time:

def add_column_to_batch(batch, arr, name):
    tab = pa.Table.from_batches([batch])
    field = pa.field(name, arr.type)
    new_tab = tab.append_column(field, arr)
    return new_tab.to_batches()[0]

> Do I need to write a loop batch by batch as above or is there a way to
> apply the transformation through the scanner API like in [1]?

The write_dataset method (and most places that accept a scanner I think)
can accept an iterable of batches (although you need to supply a schema in
that case).  Using python generators you can quickly create an iterable of
batches from a scanner.  So for your example it would look something like:

schema = table.schema
schema = schema.append(pa.field('mean', pa.float64()))

def transform_scanner(scanner):
    for batch in scanner.to_batches():
        df = batch.to_pandas()
        df['mean'] = df[cols].mean(axis=1)
        new_batch = pa.RecordBatch.from_pandas(df)
        yield new_batch

ds.write_dataset(
    transform_scanner(scanner),
    out_arrow_path,
    schema=schema,
    format='parquet',
    partitioning=['partition'],
    partitioning_flavor='hive',
    existing_data_behavior='delete_matching',
    basename_template=f'{uuid4()}-{{i}}.parquet')

It is probably a good idea to do this because, by calling write_dataset
once, the dataset writer will be able to group small batches (which are
often created as a result of partitioning) into larger row groups before it
writes them to disk.
One more thing I would suggest is to select the columns you want before you
convert to pandas.  Not every data type is zero-copy into pandas.  So if
you have a string column, for example, you will be paying the cost of
converting it to pandas for no reason.  That would look like...

def transform_scanner(scanner):
    for batch in scanner.to_batches():
        # 'select' is another method that hasn't been added to
        # pa.RecordBatch yet so we'll wrap in a table again
        table = pa.Table.from_batches([batch])
        table = table.select(cols)
        df = table.to_pandas()
        df['mean'] = df.mean(axis=1)
        # Only convert the new array back to arrow
        mean_arr = pa.Array.from_pandas(df['mean'])
        new_batch = add_column_to_batch(batch, mean_arr, 'mean')
        yield new_batch

> In the latest arrow code base might have support for 'projection'

This was very recently added (thanks Joris!) in [1].  I'm not sure if that
is part of the current 7.0.0 RC or not.  This won't work, as you pointed
out, because there is no compute function for row-wise mean.  But if we had
one it would look something like:

add_val = pc.add(ds.field('a'), ds.field('b'))
scanner = dataset.scanner(columns = {
    'a': ds.field('a'),
    'b': ds.field('b'),
    'c': add_val
})

# In the future

There is some ongoing work that is related to what you are doing here.

## Tensors

When you have a lot of columns with the same data type and you want to
operate on them rowwise it brings tensors (matrices) to mind.  I'm not sure
if this would be applicable to your use case or not.  For your example this
would mean treating the columns b, c as a 1x2 tensor so you have a single
column "bc" of tensors.

That being said, while there is some support for encoding and decoding
tensors into arrow, there are not yet any compute functions for tensors.
So until that happens I don't know that you would be much better off.
## Python UDFs

At some point I expect we will have support for using python UDFs, which
would basically be a slightly altered version of what we did above with the
transform_scanner generator, where we are still letting pandas do the heavy
lifting.  At the end of the day this wouldn't be a lot different from what
you are doing today (in terms of performance) but we wouldn't have to rely
on write_dataset accepting an iterable of batches (because your scanner
output would already be what you want).

[1] https://issues.apache.org/jira/browse/ARROW-12060

On Mon, Jan 24, 2022 at 6:19 AM Niranda Perera <niranda.per...@gmail.com>
wrote:
>
> Hi Antonio,
> Sorry I think I misunderstood your question. You are looking for a
> row-wise mean, isn't it! I don't think there's an API for that in
> pyarrow.compute. Sorry my bad.
> You could call `add` for each column and manually create the mean (this
> would be a vectorized operation column-wise. But this would create 2
> additional length-sized memory allocations at least AFAIU, because arrow
> doesn't have mutable methods).
> I wasn't aware that the pyarrow API didn't have an add_column method
> (sorry again!). It's available in the C++ API. But for that also, you
> could simply create a list with the existing columns.
> Following would be my suggestion (not tested). But I agree, this is not as
> pretty as the pandas solution! :-)
> ```
> def calc_mean(batch, cols):
>     res = batch[cols[0]]
>
>     if len(cols) == 1:
>         return res
>
>     for c in cols[1:]:
>         res = pa.compute.add(res, batch[c])
>
>     return pa.compute.divide(res, len(cols))
>
> ...
>
> for batch in scanner.to_batches():
>     new_cols = batch.columns
>     new_cols.append(calc_mean(batch, cols))
>
>     new_batch = pa.record_batch(data=new_cols,
>         schema=batch.schema.append(pa.field('mean', pa.float64())))
> ...
> ```
>
> On Mon, Jan 24, 2022 at 9:11 AM Antonino Ingargiola <trite...@gmail.com>
> wrote:
>>
>> Hi Niranda,
>>
>> On Mon, Jan 24, 2022 at 2:41 PM Niranda Perera
>> <niranda.per...@gmail.com> wrote:
>>>
>>> Did you try using `pyarrow.compute` options? Inside that batch iterator
>>> loop you can call the compute mean function and then call the
>>> add_column method for record batches.
>>
>> I cannot find how to pass multiple columns to be aggregated to
>> pyarrow.compute functions. As far as I understand, pyarrow.compute
>> functions only accept a single 1D pyarrow.array as input. Maybe you had
>> something else in mind.
>>
>> Besides, I don't see any add_column or append_column method for
>> pyarrow.RecordBatch [1]
>>
>> [1] https://arrow.apache.org/docs/python/generated/pyarrow.RecordBatch.html
>>
>> The only solution I see is calling the compute function for each row of
>> the RecordBatch (transforming each row to a pyarrow.array somehow). But
>> this would be quite inefficient. On the contrary, pandas can compute the
>> aggregation across columns in a vectorized way (at the additional cost
>> of the pyarrow <-> pandas roundtrip conversion).
>>
>>> In the latest arrow code base might have support for 'projection', that
>>> could do this without having to iterate through record batches. @Weston
>>> Pace WDYT?
>>
>> If this is possible it would be great!
>>
>> Best,
>> Antonio
>
> --
> Niranda Perera
> https://niranda.dev/
> @n1r44