[ 
https://issues.apache.org/jira/browse/ARROW-571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16215131#comment-16215131
 ] 

ASF GitHub Bot commented on ARROW-571:
--------------------------------------

wesm closed pull request #1218: ARROW-571: [Python] Add unit test for 
incremental Parquet file building, improve docs
URL: https://github.com/apache/arrow/pull/1218
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/python/doc/source/api.rst b/python/doc/source/api.rst
index 26ccb98ed..6bceba3c6 100644
--- a/python/doc/source/api.rst
+++ b/python/doc/source/api.rst
@@ -312,6 +312,7 @@ Apache Parquet
 
    ParquetDataset
    ParquetFile
+   ParquetWriter
    read_table
    read_metadata
    read_pandas
diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index 1584b849a..b6a7b1244 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -202,17 +202,47 @@ def _sanitize_table(table, new_schema, flavor):
         return table
 
 
+_parquet_writer_arg_docs = """version : {"1.0", "2.0"}, default "1.0"
+    The Parquet format version, defaults to 1.0
+use_dictionary : bool or list
+    Specify if we should use dictionary encoding in general or only for
+    some columns.
+use_deprecated_int96_timestamps : boolean, default None
+    Write nanosecond resolution timestamps to INT96 Parquet
+    format. Defaults to False unless enabled by flavor argument
+coerce_timestamps : string, default None
+    Cast timestamps a particular resolution.
+    Valid values: {None, 'ms', 'us'}
+compression : str or dict
+    Specify the compression codec, either on a general basis or per-column.
+flavor : {'spark'}, default None
+    Sanitize schema or set other compatibility options for compatibility"""
+
+
 class ParquetWriter(object):
-    """
 
-    Parameters
-    ----------
-    where
-    schema
-    flavor : {'spark', ...}
-        Set options for compatibility with a particular reader
-    """
-    def __init__(self, where, schema, flavor=None, **options):
+    __doc__ = """
+Class for incrementally building a Parquet file for Arrow tables
+
+Parameters
+----------
+where : path or file-like object
+schema : arrow Schema
+{0}
+""".format(_parquet_writer_arg_docs)
+
+    def __init__(self, where, schema, flavor=None,
+                 version='1.0',
+                 use_dictionary=True,
+                 compression='snappy',
+                 use_deprecated_int96_timestamps=None, **options):
+        if use_deprecated_int96_timestamps is None:
+            # Use int96 timestamps for Spark
+            if flavor is not None and 'spark' in flavor:
+                use_deprecated_int96_timestamps = True
+            else:
+                use_deprecated_int96_timestamps = False
+
         self.flavor = flavor
         if flavor is not None:
             schema, self.schema_changed = _sanitize_schema(schema, flavor)
@@ -220,15 +250,29 @@ def __init__(self, where, schema, flavor=None, **options):
             self.schema_changed = False
 
         self.schema = schema
-        self.writer = _parquet.ParquetWriter(where, schema, **options)
+        self.writer = _parquet.ParquetWriter(
+            where, schema,
+            version=version,
+            compression=compression,
+            use_dictionary=use_dictionary,
+            use_deprecated_int96_timestamps=use_deprecated_int96_timestamps,
+            **options)
+        self.is_open = True
+
+    def __del__(self):
+        if self.is_open:
+            self.close()
 
     def write_table(self, table, row_group_size=None):
         if self.schema_changed:
             table = _sanitize_table(table, self.schema, self.flavor)
+        assert self.is_open
         self.writer.write_table(table, row_group_size=row_group_size)
 
     def close(self):
-        self.writer.close()
+        if self.is_open:
+            self.writer.close()
+            self.is_open = False
 
 
 def _get_pandas_index_columns(keyvalues):
@@ -857,52 +901,19 @@ def write_table(table, where, row_group_size=None, version='1.0',
                 use_deprecated_int96_timestamps=None,
                 coerce_timestamps=None,
                 flavor=None, **kwargs):
-    """
-    Write a Table to Parquet format
-
-    Parameters
-    ----------
-    table : pyarrow.Table
-    where: string or pyarrow.io.NativeFile
-    row_group_size : int, default None
-        The maximum number of rows in each Parquet RowGroup. As a default,
-        we will write a single RowGroup per file.
-    version : {"1.0", "2.0"}, default "1.0"
-        The Parquet format version, defaults to 1.0
-    use_dictionary : bool or list
-        Specify if we should use dictionary encoding in general or only for
-        some columns.
-    use_deprecated_int96_timestamps : boolean, default None
-        Write nanosecond resolution timestamps to INT96 Parquet
-        format. Defaults to False unless enabled by flavor argument
-    coerce_timestamps : string, default None
-        Cast timestamps a particular resolution.
-        Valid values: {None, 'ms', 'us'}
-    compression : str or dict
-        Specify the compression codec, either on a general basis or per-column.
-    flavor : {'spark'}, default None
-        Sanitize schema or set other compatibility options for compatibility
-    """
-    row_group_size = kwargs.get('chunk_size', row_group_size)
-
-    if use_deprecated_int96_timestamps is None:
-        # Use int96 timestamps for Spark
-        if flavor is not None and 'spark' in flavor:
-            use_deprecated_int96_timestamps = True
-        else:
-            use_deprecated_int96_timestamps = False
-
-    options = dict(
-        use_dictionary=use_dictionary,
-        compression=compression,
-        version=version,
-        use_deprecated_int96_timestamps=use_deprecated_int96_timestamps,
-        coerce_timestamps=coerce_timestamps)
+    row_group_size = kwargs.pop('chunk_size', row_group_size)
 
     writer = None
     try:
-        writer = ParquetWriter(where, table.schema, flavor=flavor,
-                               **options)
+        writer = ParquetWriter(
+            where, table.schema,
+            version=version,
+            flavor=flavor,
+            use_dictionary=use_dictionary,
+            coerce_timestamps=coerce_timestamps,
+            compression=compression,
+            use_deprecated_int96_timestamps=use_deprecated_int96_timestamps,
+            **kwargs)
         writer.write_table(table, row_group_size=row_group_size)
     except:
         if writer is not None:
@@ -917,6 +928,17 @@ def write_table(table, where, row_group_size=None, version='1.0',
         writer.close()
 
 
+write_table.__doc__ = """
+Write a Table to Parquet format
+
+Parameters
+----------
+table : pyarrow.Table
+where: string or pyarrow.io.NativeFile
+{0}
+""".format(_parquet_writer_arg_docs)
+
+
 def write_to_dataset(table, root_path, partition_cols=None,
                      filesystem=None, preserve_index=True, **kwargs):
     """
@@ -1013,12 +1035,10 @@ def write_metadata(schema, where, version='1.0',
         Cast timestamps a particular resolution.
         Valid values: {None, 'ms', 'us'}
     """
-    options = dict(
-        version=version,
+    writer = ParquetWriter(
+        where, schema, version=version,
         use_deprecated_int96_timestamps=use_deprecated_int96_timestamps,
-        coerce_timestamps=coerce_timestamps
-    )
-    writer = ParquetWriter(where, schema, **options)
+        coerce_timestamps=coerce_timestamps)
     writer.close()
 
 
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
index deb4b3f35..09184cc05 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -301,6 +301,35 @@ def test_pandas_parquet_native_file_roundtrip(tmpdir):
 
 
 @parquet
+def test_parquet_incremental_file_build(tmpdir):
+    import pyarrow.parquet as pq
+
+    df = _test_dataframe(100)
+    df['unique_id'] = 0
+
+    arrow_table = pa.Table.from_pandas(df, preserve_index=False)
+    out = pa.BufferOutputStream()
+
+    writer = pq.ParquetWriter(out, arrow_table.schema, version='2.0')
+
+    frames = []
+    for i in range(10):
+        df['unique_id'] = i
+        arrow_table = pa.Table.from_pandas(df, preserve_index=False)
+        writer.write_table(arrow_table)
+
+        frames.append(df.copy())
+
+    writer.close()
+
+    buf = out.get_result()
+    result = _read_table(pa.BufferReader(buf))
+
+    expected = pd.concat(frames, ignore_index=True)
+    tm.assert_frame_equal(result.to_pandas(), expected)
+
+
+@parquet
 def test_read_pandas_column_subset(tmpdir):
     import pyarrow.parquet as pq
 
diff --git a/python/pyarrow/tests/test_serialization.py b/python/pyarrow/tests/test_serialization.py
index 9321ebc34..67798ac31 100644
--- a/python/pyarrow/tests/test_serialization.py
+++ b/python/pyarrow/tests/test_serialization.py
@@ -266,30 +266,36 @@ def test_default_dict_serialization(large_memory_map):
 
 def test_numpy_serialization(large_memory_map):
     with pa.memory_map(large_memory_map, mode="r+") as mmap:
-        for t in ["bool", "int8", "uint8", "int16", "uint16", "int32", "uint32",
-                  "float16", "float32", "float64"]:
+        for t in ["bool", "int8", "uint8", "int16", "uint16", "int32",
+                  "uint32", "float16", "float32", "float64"]:
             obj = np.random.randint(0, 10, size=(100, 100)).astype(t)
             serialization_roundtrip(obj, mmap)
 
 
 def test_datetime_serialization(large_memory_map):
-    data = [# Principia Mathematica published
-            datetime.datetime(year=1687, month=7, day=5),
-            # Some random date
-            datetime.datetime(year=1911, month=6, day=3, hour=4,
-                              minute=55, second=44),
-            # End of WWI
-            datetime.datetime(year=1918, month=11, day=11),
-            # Beginning of UNIX time
-            datetime.datetime(year=1970, month=1, day=1),
-            # The Berlin wall falls
-            datetime.datetime(year=1989, month=11, day=9),
-            # Another random date
-            datetime.datetime(year=2011, month=6, day=3, hour=4,
-                              minute=0, second=3),
-            # Another random date
-            datetime.datetime(year=1970, month=1, day=3, hour=4,
-                              minute=0, second=0)]
+    data = [
+        #  Principia Mathematica published
+        datetime.datetime(year=1687, month=7, day=5),
+
+        # Some random date
+        datetime.datetime(year=1911, month=6, day=3, hour=4,
+                          minute=55, second=44),
+        # End of WWI
+        datetime.datetime(year=1918, month=11, day=11),
+
+        # Beginning of UNIX time
+        datetime.datetime(year=1970, month=1, day=1),
+
+        # The Berlin wall falls
+        datetime.datetime(year=1989, month=11, day=9),
+
+        # Another random date
+        datetime.datetime(year=2011, month=6, day=3, hour=4,
+                          minute=0, second=3),
+        # Another random date
+        datetime.datetime(year=1970, month=1, day=3, hour=4,
+                          minute=0, second=0)
+    ]
     with pa.memory_map(large_memory_map, mode="r+") as mmap:
         for d in data:
             serialization_roundtrip(d, mmap)


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> [Python] Add APIs to build Parquet files incrementally from Arrow tables
> ------------------------------------------------------------------------
>
>                 Key: ARROW-571
>                 URL: https://issues.apache.org/jira/browse/ARROW-571
>             Project: Apache Arrow
>          Issue Type: New Feature
>          Components: Python
>            Reporter: Wes McKinney
>            Assignee: Wes McKinney
>              Labels: pull-request-available
>             Fix For: 0.8.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to