You may be able to meet this use case using the tabular datasets[1]
feature of pyarrow.  A few thoughts:

1. The easiest way to get an "append" workflow with
pyarrow.dataset.write_dataset is to use a unique basename_template for
each write_dataset operation. A UUID works well here (see the example below).
2. As you mentioned, if your writes generate a bunch of small files,
you will want to periodically compact your partitions (a compaction
sketch follows the example below).
3. Reads should not happen at the same time as writes or else you risk
reading partial / incomplete files.

Example:

import pyarrow as pa
import pyarrow.dataset as ds

import tempfile
from glob import glob
from uuid import uuid4

tab1 = pa.Table.from_pydict({'partition': [1, 1, 2, 2], 'value': [1, 2, 3, 4]})
tab2 = pa.Table.from_pydict({'partition': [1, 1, 2, 2], 'value': [5, 6, 7, 8]})

with tempfile.TemporaryDirectory() as dataset_dir:
    # Each write uses a unique basename_template, so the second write
    # appends new files instead of overwriting the first write's files.
    ds.write_dataset(tab1, dataset_dir, format='parquet',
                     partitioning=['partition'],
                     partitioning_flavor='hive',
                     existing_data_behavior='overwrite_or_ignore',
                     basename_template=f'{uuid4()}-{{i}}.parquet')
    ds.write_dataset(tab2, dataset_dir, format='parquet',
                     partitioning=['partition'],
                     partitioning_flavor='hive',
                     existing_data_behavior='overwrite_or_ignore',
                     basename_template=f'{uuid4()}-{{i}}.parquet')

    # List the files created in each partition directory.
    print('\n'.join(glob(f'{dataset_dir}/**/*')))

    # Pass partitioning='hive' on read so the partition column is
    # reconstructed from the directory names.
    dataset = ds.dataset(dataset_dir, partitioning='hive')

    print(dataset.to_table().to_pandas())
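
Regarding (2), here is a minimal compaction sketch (the directory names
are placeholders, and it assumes no writer is running while the
compaction happens): read the fragmented dataset back and rewrite it
with one consolidated file per partition, as you describe in your PS.

import pyarrow.dataset as ds

# Placeholder paths; adjust to your layout.
dataset_dir = 'parquet_dataset'
compacted_dir = 'parquet_dataset_compacted'

# Passing the Dataset object directly lets write_dataset stream batches
# instead of materializing the whole table in memory.
fragmented = ds.dataset(dataset_dir, format='parquet', partitioning='hive')

ds.write_dataset(fragmented, compacted_dir, format='parquet',
                 partitioning=['partition'],
                 partitioning_flavor='hive',
                 # replace any previously compacted files per partition
                 existing_data_behavior='delete_matching',
                 basename_template='compacted-{i}.parquet')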

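Also, since you mention the data may end up on s3: both write_dataset
and ds.dataset accept a filesystem argument, so the same append pattern
works against an object store. A rough sketch (the bucket name, prefix,
and region below are placeholders):

import pyarrow as pa
import pyarrow.dataset as ds
from pyarrow import fs
from uuid import uuid4

s3 = fs.S3FileSystem(region='us-east-1')  # placeholder region

new_batch = pa.Table.from_pydict({'partition': [1, 2], 'value': [9, 10]})

ds.write_dataset(new_batch, 'my-bucket/parquet_dataset',  # placeholder path
                 filesystem=s3,
                 format='parquet',
                 partitioning=['partition'],
                 partitioning_flavor='hive',
                 existing_data_behavior='overwrite_or_ignore',
                 basename_template=f'{uuid4()}-{{i}}.parquet')
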
[1] https://arrow.apache.org/docs/python/dataset.html

On Tue, Dec 14, 2021 at 12:17 AM Antonino Ingargiola <[email protected]> wrote:
>
> Hi arrow community,
>
> I just subscribed to this mailing list. First, let me thank all the 
> contributors for this great project!
>
> I have a question about which pyarrow API to use for a specific use case. I 
> need to update/append data to a large partitioned parquet dataset using 
> pyarrow. I receive the data in small batches that are transformed into small 
> pandas dataframes. All the new dataframes have the same schema. The data can 
> be saved locally or on a cloud object store (s3).
>
> When I receive a new batch, I need to update the parquet dataset with the new 
> rows in the pandas dataframe. Essentially, I need to save additional 
> xyz.parquet files in the appropriate partition subfolders, without removing 
> or overwriting pre-existing .parquet files in the same partition folder.
>
> My goal is to end up with a dataset like this:
>
> parquet_dataset/
>     partition=1/
>         a.parquet
>         b.parquet
>         c.parquet
>     partition=2/
>         a.parquet
>         b.parquet
>
> where each individual parquet file contains a single batch of data (actually 
> a single batch may be split across 2 or more partitions).
>
> Is there a preferred API to achieve this continuous update in pyarrow?
>
> I can implement all this logic manually but, ideally, I would like to defer 
> to pyarrow the task of splitting the input dataframe into partitions and 
> saving each chunk in the appropriate subfolder, generating a filename that 
> will not conflict with existing files. Is this possible with the current 
> pyarrow API?
>
> PS: I understand that this fragmentation is not ideal for reading/querying, 
> but it lets me handle the update process quickly. In any case, I periodically 
> save a consolidated copy of the dataset with one file per partition to 
> improve read performance.
>
> Thanks in advance,
> Antonio
>