This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 951663a41c ARROW-13763: [Python] Close files in ParquetFile & 
ParquetDatasetPiece (#13821)
951663a41c is described below

commit 951663a41c183c8fec5a4da9a8f9daf45ed85451
Author: Miles Granger <[email protected]>
AuthorDate: Wed Aug 17 15:39:57 2022 +0200

    ARROW-13763: [Python] Close files in ParquetFile & ParquetDatasetPiece 
(#13821)
    
    Will fix [ARROW-13763](https://issues.apache.org/jira/browse/ARROW-13763)
    
    A separate Jira issue will be made to address closing files in V2 
ParquetDataset, which needs to be handled in the C++ layer.
    
    Adds a context manager to `pq.ParquetFile` to close the input file, and ensures 
files read within `pq.ParquetDataset` and `pq.read_table` are closed.
    
    ```python
    
    # user opened file-like object will not be closed
    with open('file.parquet', 'rb') as f:
        with pq.ParquetFile(f) as p:
            table = p.read()
            assert not f.closed  # did not inadvertently close the open file
            assert not p.closed
        assert not f.closed      # parquet context exit didn't close it
        assert not p.closed      # references the input file status
    assert f.closed              # normal context exit close
    assert p.closed              # ...
    
    # path-like will be closed upon exit or `ParquetFile.close`
    with pq.ParquetFile('file.parquet') as p:
        table = p.read()
        assert not p.closed
    assert p.closed
    ```
    
    Authored-by: Miles Granger <[email protected]>
    Signed-off-by: Antoine Pitrou <[email protected]>
---
 python/pyarrow/_parquet.pyx                       | 19 +++++++--
 python/pyarrow/parquet/core.py                    | 25 +++++++++--
 python/pyarrow/tests/parquet/test_parquet_file.py | 52 +++++++++++++++++++++++
 3 files changed, 90 insertions(+), 6 deletions(-)

diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx
index 66ed7db997..c17e855aa5 100644
--- a/python/pyarrow/_parquet.pyx
+++ b/python/pyarrow/_parquet.pyx
@@ -1159,6 +1159,7 @@ cdef class ParquetReader(_Weakrefable):
         CMemoryPool* pool
         unique_ptr[FileReader] reader
         FileMetaData _metadata
+        shared_ptr[CRandomAccessFile] rd_handle
 
     cdef public:
         _column_idx_map
@@ -1175,7 +1176,6 @@ cdef class ParquetReader(_Weakrefable):
              thrift_string_size_limit=None,
              thrift_container_size_limit=None):
         cdef:
-            shared_ptr[CRandomAccessFile] rd_handle
             shared_ptr[CFileMetaData] c_metadata
             CReaderProperties properties = default_reader_properties()
             ArrowReaderProperties arrow_props = (
@@ -1221,10 +1221,10 @@ cdef class ParquetReader(_Weakrefable):
                 string_to_timeunit(coerce_int96_timestamp_unit))
 
         self.source = source
+        get_reader(source, use_memory_map, &self.rd_handle)
 
-        get_reader(source, use_memory_map, &rd_handle)
         with nogil:
-            check_status(builder.Open(rd_handle, properties, c_metadata))
+            check_status(builder.Open(self.rd_handle, properties, c_metadata))
 
         # Set up metadata
         with nogil:
@@ -1435,6 +1435,19 @@ cdef class ParquetReader(_Weakrefable):
                          .ReadColumn(column_index, &out))
         return pyarrow_wrap_chunked_array(out)
 
+    def close(self):
+        if not self.closed:
+            with nogil:
+                check_status(self.rd_handle.get().Close())
+
+    @property
+    def closed(self):
+        if self.rd_handle == NULL:
+            return True
+        with nogil:
+            closed = self.rd_handle.get().closed()
+        return closed
+
 
 cdef shared_ptr[WriterProperties] _create_writer_properties(
         use_dictionary=None,
diff --git a/python/pyarrow/parquet/core.py b/python/pyarrow/parquet/core.py
index d3a1451dcb..fd51829f50 100644
--- a/python/pyarrow/parquet/core.py
+++ b/python/pyarrow/parquet/core.py
@@ -294,9 +294,16 @@ class ParquetFile:
             thrift_string_size_limit=thrift_string_size_limit,
             thrift_container_size_limit=thrift_container_size_limit,
         )
+        self._close_source = getattr(source, 'closed', True)
         self.common_metadata = common_metadata
         self._nested_paths_by_prefix = self._build_nested_paths()
 
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *args, **kwargs):
+        self.close()
+
     def _build_nested_paths(self):
         paths = self.reader.column_paths
 
@@ -376,6 +383,14 @@ class ParquetFile:
         """
         return self.reader.num_row_groups
 
+    def close(self, force: bool = False):
+        if self._close_source or force:
+            self.reader.close()
+
+    @property
+    def closed(self) -> bool:
+        return self.reader.closed
+
     def read_row_group(self, i, columns=None, use_threads=True,
                        use_pandas_metadata=False):
         """
@@ -1129,8 +1144,8 @@ class ParquetDatasetPiece:
         -------
         metadata : FileMetaData
         """
-        f = self.open()
-        return f.metadata
+        with self.open() as parquet:
+            return parquet.metadata
 
     def open(self):
         """
@@ -1204,6 +1219,9 @@ class ParquetDatasetPiece:
                 arr = pa.DictionaryArray.from_arrays(indices, dictionary)
                 table = table.append_column(name, arr)
 
+        # To ParquetFile the source looked like it was already open, so won't
+        # actually close it without overriding.
+        reader.close(force=True)
         return table
 
 
@@ -1890,7 +1908,8 @@ Examples
         """
         tables = []
         for piece in self._pieces:
-            table = piece.read(columns=columns, use_threads=use_threads,
+            table = piece.read(columns=columns,
+                               use_threads=use_threads,
                                partitions=self._partitions,
                                use_pandas_metadata=use_pandas_metadata)
             tables.append(table)
diff --git a/python/pyarrow/tests/parquet/test_parquet_file.py 
b/python/pyarrow/tests/parquet/test_parquet_file.py
index 9b528b1859..4fb9335961 100644
--- a/python/pyarrow/tests/parquet/test_parquet_file.py
+++ b/python/pyarrow/tests/parquet/test_parquet_file.py
@@ -17,6 +17,7 @@
 
 import io
 import os
+from unittest import mock
 
 import pytest
 
@@ -277,3 +278,54 @@ def test_pre_buffer(pre_buffer):
     buf.seek(0)
     pf = pq.ParquetFile(buf, pre_buffer=pre_buffer)
     assert pf.read().num_rows == N
+
+
+def test_parquet_file_explicitly_closed(tempdir):
+    """
+    Unopened files should be closed explicitly after use,
+    and previously opened files should be left open.
+    Applies to read_table, ParquetDataset, and ParquetFile
+    """
+    # create test parquet file
+    fn = tempdir.joinpath('file.parquet')
+    table = pa.table({'col1': [0, 1], 'col2': [0, 1]})
+    pq.write_table(table, fn)
+
+    # read_table (legacy) with opened file (will leave open)
+    with open(fn, 'rb') as f:
+        pq.read_table(f, use_legacy_dataset=True)
+        assert not f.closed  # Didn't close it internally after read_table
+
+    # read_table (legacy) with unopened file (will close)
+    with mock.patch.object(pq.ParquetFile, "close") as mock_close:
+        pq.read_table(fn, use_legacy_dataset=True)
+        mock_close.assert_called()
+
+    # ParquetDataset test (legacy) with unopened file (will close)
+    with mock.patch.object(pq.ParquetFile, "close") as mock_close:
+        pq.ParquetDataset(fn, use_legacy_dataset=True).read()
+        mock_close.assert_called()
+
+    # ParquetDataset test (legacy) with opened file (will leave open)
+    with open(fn, 'rb') as f:
+        # ARROW-8075: support ParquetDataset from file-like, not just path-like
+        with pytest.raises(TypeError, match='not a path-like object'):
+            pq.ParquetDataset(f, use_legacy_dataset=True).read()
+            assert not f.closed
+
+    # ParquetFile with opened file (will leave open)
+    with open(fn, 'rb') as f:
+        with pq.ParquetFile(f) as p:
+            p.read()
+            assert not f.closed
+            assert not p.closed
+        assert not f.closed  # opened input file was not closed
+        assert not p.closed  # parquet file obj reports as not closed
+    assert f.closed
+    assert p.closed  # parquet file being closed reflects underlying file
+
+    # ParquetFile with unopened file (will close)
+    with pq.ParquetFile(fn) as p:
+        p.read()
+        assert not p.closed
+    assert p.closed  # parquet file obj reports as closed

Reply via email to the mailing list.