Jonathan Kulzick created ARROW-2891:
---------------------------------------

             Summary: Preserve schema in write_to_dataset
                 Key: ARROW-2891
                 URL: https://issues.apache.org/jira/browse/ARROW-2891
             Project: Apache Arrow
          Issue Type: Bug
          Components: Python
    Affects Versions: 0.9.0
            Reporter: Jonathan Kulzick


When using `pyarrow.parquet.write_to_dataset` with `partition_cols` set, the 
schema of the `table` passed into the function is not enforced when iterating 
over the `subgroup` to create the `subtable`. See 
[here](https://github.com/apache/arrow/blob/master/python/pyarrow/parquet.py#L1146).

Since pandas is used to generate the subtables, there is a risk that some 
specificity is lost from the original `table.schema` because of the data types 
pandas supports and the internal type conversions it performs. It would be 
ideal if a `subschema` were derived from `table.schema` (minus the partition 
columns, which end up in the directory names) and passed to `Table` when 
instantiating each `subtable`, so that the user's original schema is enforced. 
A rough sketch of this idea follows.
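
A minimal sketch of what this could look like inside `write_to_dataset` (the 
helper name is hypothetical and the surrounding code is simplified; `subgroup`, 
`partition_cols` and `preserve_index` follow the names used in the current 
implementation):

```
import pyarrow as pa

def _subtable_with_schema(table, subgroup, partition_cols, preserve_index):
    # hypothetical helper: derive a subschema from the caller's original
    # schema, dropping the partition columns (they are encoded in the
    # directory names rather than stored in the data files)
    subschema = pa.schema([f for f in table.schema
                           if f.name not in partition_cols])
    # passing the subschema forces the pandas-produced subgroup back to the
    # declared types instead of whatever pandas happened to infer
    return pa.Table.from_pandas(subgroup, schema=subschema,
                                preserve_index=preserve_index)
```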

Here is a simple example of where we are running into issues while trying to 
preserve a valid schema. This use case is more likely to occur when working 
with sparse data sets.

```
>>> from io import StringIO
>>> import pandas as pd
>>> import numpy as np
>>> import pyarrow as pa
>>> import pyarrow.parquet as pq

# in csv col2 has no NaNs and in csv_nan col2 only has NaNs
>>> csv = StringIO('"1","10","100"')
>>> csv_nan = StringIO('"2","","200"')

# read in col2 as a float since pandas does not support NaNs in ints
>>> pd_dtype = {'col1': np.int32, 'col2': np.float32, 'col3': np.int32}
>>> df = pd.read_csv(csv, header=None, names=['col1', 'col2', 'col3'],
...                  dtype=pd_dtype)
>>> df_nan = pd.read_csv(csv_nan, header=None, names=['col1', 'col2', 'col3'],
...                      dtype=pd_dtype)

# verify both dfs and their dtypes
>>> df
   col1  col2  col3
0     1  10.0   100

>>> df.dtypes
col1      int32
col2    float32
col3      int32
dtype: object

>>> df_nan
   col1  col2  col3
0     2   NaN   200

>>> df_nan.dtypes
col1      int32
col2    float32
col3      int32
dtype: object

# define col2 as an int32 since pyarrow does support nulls in int columns
# we want to preserve the original schema we started with and not
# upcast just because we're using pandas to go from csv to pyarrow
>>> schema = pa.schema([pa.field('col1', type=pa.int32()),
...                     pa.field('col2', type=pa.int32()),
...                     pa.field('col3', type=pa.int32())])

# verify schema
>>> schema
col1: int32
col2: int32
col3: int32

# create tables
>>> table = pa.Table.from_pandas(df, schema=schema, preserve_index=False)
>>> table_nan = pa.Table.from_pandas(df_nan, schema=schema,
...                                  preserve_index=False)

# verify table schemas and metadata
# col2 has pandas_type int32 and numpy_type float32 in both tables
>>> table
pyarrow.Table
col1: int32
col2: int32
col3: int32
metadata
--------
{b'pandas': b'{"index_columns": [], "column_indexes": [], "columns": [{"name":'
 b' "col1", "field_name": "col1", "pandas_type": "int32", "numpy_ty'
 b'pe": "int32", "metadata": null}, {"name": "col2", "field_name": '
 b'"col2", "pandas_type": "int32", "numpy_type": "float32", "metada'
 b'ta": null}, {"name": "col3", "field_name": "col3", "pandas_type"'
 b': "int32", "numpy_type": "int32", "metadata": null}], "pandas_ve'
 b'rsion": "0.22.0"}'}

>>> table_nan
pyarrow.Table
col1: int32
col2: int32
col3: int32
metadata
--------
{b'pandas': b'{"index_columns": [], "column_indexes": [], "columns": [{"name":'
 b' "col1", "field_name": "col1", "pandas_type": "int32", "numpy_ty'
 b'pe": "int32", "metadata": null}, {"name": "col2", "field_name": '
 b'"col2", "pandas_type": "int32", "numpy_type": "float32", "metada'
 b'ta": null}, {"name": "col3", "field_name": "col3", "pandas_type"'
 b': "int32", "numpy_type": "int32", "metadata": null}], "pandas_ve'
 b'rsion": "0.22.0"}'}

# write both tables to local filesystem
>>> pq.write_to_dataset(table, '/Users/jkulzick/pyarrow_example',
...                     partition_cols=['col1'],
...                     preserve_index=False)
>>> pq.write_to_dataset(table_nan, '/Users/jkulzick/pyarrow_example',
...                     partition_cols=['col1'],
...                     preserve_index=False)
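
# note: write_to_dataset converts the table back to a pandas DataFrame and
# splits it with groupby on the partition columns, rebuilding each subtable
# without the original schema, so pandas dtype inference decides the types
# written to disk (see the link above)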

# read parquet files into a ParquetDataset to validate the schemas
# the metadata and schemas for both files differ from their original tables:
# table now has pandas_type int32 and numpy_type int32 (was float32) for col2
# table_nan now has pandas_type float64 (was int32) and numpy_type float64
# (was float32) for col2
>>> ds = pq.ParquetDataset('/Users/jkulzick/pyarrow_example')
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Users/jkulzick/miniconda3/envs/bowerbird/lib/python3.6/site-packages/pyarrow/parquet.py", line 745, in __init__
    self.validate_schemas()
  File "/Users/jkulzick/miniconda3/envs/bowerbird/lib/python3.6/site-packages/pyarrow/parquet.py", line 775, in validate_schemas
    dataset_schema))
ValueError: Schema in partition[col1=1] /Users/jkulzick/pyarrow_example/col1=2/b7b42ce9de6a46a786a5361c42d28731.parquet was different.
col2: double
col3: int32
metadata
--------
{b'pandas': b'{"index_columns": [], "column_indexes": [], "columns": [{"name":'
 b' "col2", "field_name": "col2", "pandas_type": "float64", "numpy_'
 b'type": "float64", "metadata": null}, {"name": "col3", "field_nam'
 b'e": "col3", "pandas_type": "int32", "numpy_type": "int32", "meta'
 b'data": null}], "pandas_version": "0.22.0"}'}

vs

col2: int32
col3: int32
metadata
--------
{b'pandas': b'{"index_columns": [], "column_indexes": [], "columns": [{"name":'
 b' "col2", "field_name": "col2", "pandas_type": "int32", "numpy_ty'
 b'pe": "int32", "metadata": null}, {"name": "col3", "field_name": '
 b'"col3", "pandas_type": "int32", "numpy_type": "int32", "metadata'
 b'": null}], "pandas_version": "0.22.0"}'}
```
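
Until `write_to_dataset` preserves the schema itself, a possible caller-side 
workaround is to partition the DataFrame manually and apply the original 
schema to each piece before writing. The helper below is a rough sketch under 
that assumption (the function name, output file name and use of 
`pq.write_table` are my own, not part of the pyarrow API being discussed):

```
import os
import pyarrow as pa
import pyarrow.parquet as pq

def write_partitioned_with_schema(df, root, partition_col, schema):
    # hypothetical helper: reproduce write_to_dataset's directory layout
    # (root/col=value/...) while enforcing the caller's schema on every piece
    file_schema = pa.schema([f for f in schema if f.name != partition_col])
    for key, subgroup in df.groupby(partition_col):
        subdir = os.path.join(root, '{}={}'.format(partition_col, key))
        os.makedirs(subdir, exist_ok=True)
        subtable = pa.Table.from_pandas(
            subgroup.drop(columns=[partition_col]),
            schema=file_schema, preserve_index=False)
        pq.write_table(subtable, os.path.join(subdir, 'part-0.parquet'))
```

With the example above, calling
`write_partitioned_with_schema(pd.concat([df, df_nan]), '/Users/jkulzick/pyarrow_example', 'col1', schema)`
should keep col2 as int32 in both partitions, since `Table.from_pandas` is 
given the intended schema directly.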


