This is an automated email from the ASF dual-hosted git repository.
apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 951663a41c ARROW-13763: [Python] Close files in ParquetFile & ParquetDatasetPiece (#13821)
951663a41c is described below
commit 951663a41c183c8fec5a4da9a8f9daf45ed85451
Author: Miles Granger <[email protected]>
AuthorDate: Wed Aug 17 15:39:57 2022 +0200
ARROW-13763: [Python] Close files in ParquetFile & ParquetDatasetPiece (#13821)
Will fix [ARROW-13763](https://issues.apache.org/jira/browse/ARROW-13763)
A separate Jira issue will be made to address closing files in V2
ParquetDataset, which needs to be handled in the C++ layer.
Adds a context manager to `pq.ParquetFile` that closes the input file, and ensures
that files opened internally by `pq.ParquetDataset` and `pq.read_table` are closed after reading.
```python
# user-opened file-like object will not be closed
with open('file.parquet', 'rb') as f:
    with pq.ParquetFile(f) as p:
        table = p.read()
        assert not f.closed  # did not inadvertently close the open file
        assert not p.closed
    assert not f.closed  # ParquetFile context exit didn't close it
    assert not p.closed  # reflects the input file's status
assert f.closed  # normal context exit closes the file
assert p.closed  # ...and the ParquetFile reports it as closed

# path-like will be closed upon exit or `ParquetFile.close`
with pq.ParquetFile('file.parquet') as p:
    table = p.read()
    assert not p.closed
assert p.closed
```
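A minimal sketch (not taken from this commit's examples), assuming the same `file.parquet` as above: the new `force` flag on `ParquetFile.close`, which `ParquetDatasetPiece.read` uses internally, overrides the "leave user-opened sources alone" behaviour.

```python
import pyarrow.parquet as pq

# Sketch only: force-closing a ParquetFile built from a user-opened file object.
with open('file.parquet', 'rb') as f:
    p = pq.ParquetFile(f)
    table = p.read()
    p.close()                # no-op: the user-opened source is left alone
    assert not p.closed
    p.close(force=True)      # force=True closes the underlying reader anyway
    assert p.closed
```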
Authored-by: Miles Granger <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
---
python/pyarrow/_parquet.pyx | 19 +++++++--
python/pyarrow/parquet/core.py | 25 +++++++++--
python/pyarrow/tests/parquet/test_parquet_file.py | 52 +++++++++++++++++++++++
3 files changed, 90 insertions(+), 6 deletions(-)
diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx
index 66ed7db997..c17e855aa5 100644
--- a/python/pyarrow/_parquet.pyx
+++ b/python/pyarrow/_parquet.pyx
@@ -1159,6 +1159,7 @@ cdef class ParquetReader(_Weakrefable):
         CMemoryPool* pool
         unique_ptr[FileReader] reader
         FileMetaData _metadata
+        shared_ptr[CRandomAccessFile] rd_handle
 
     cdef public:
         _column_idx_map
@@ -1175,7 +1176,6 @@ cdef class ParquetReader(_Weakrefable):
              thrift_string_size_limit=None,
              thrift_container_size_limit=None):
         cdef:
-            shared_ptr[CRandomAccessFile] rd_handle
             shared_ptr[CFileMetaData] c_metadata
             CReaderProperties properties = default_reader_properties()
             ArrowReaderProperties arrow_props = (
@@ -1221,10 +1221,10 @@ cdef class ParquetReader(_Weakrefable):
                 string_to_timeunit(coerce_int96_timestamp_unit))
 
         self.source = source
-        get_reader(source, use_memory_map, &rd_handle)
+        get_reader(source, use_memory_map, &self.rd_handle)
 
         with nogil:
-            check_status(builder.Open(rd_handle, properties, c_metadata))
+            check_status(builder.Open(self.rd_handle, properties, c_metadata))
 
         # Set up metadata
         with nogil:
@@ -1435,6 +1435,19 @@ cdef class ParquetReader(_Weakrefable):
                          .ReadColumn(column_index, &out))
         return pyarrow_wrap_chunked_array(out)
 
+    def close(self):
+        if not self.closed:
+            with nogil:
+                check_status(self.rd_handle.get().Close())
+
+    @property
+    def closed(self):
+        if self.rd_handle == NULL:
+            return True
+        with nogil:
+            closed = self.rd_handle.get().closed()
+        return closed
+
 
 cdef shared_ptr[WriterProperties] _create_writer_properties(
         use_dictionary=None,
diff --git a/python/pyarrow/parquet/core.py b/python/pyarrow/parquet/core.py
index d3a1451dcb..fd51829f50 100644
--- a/python/pyarrow/parquet/core.py
+++ b/python/pyarrow/parquet/core.py
@@ -294,9 +294,16 @@ class ParquetFile:
             thrift_string_size_limit=thrift_string_size_limit,
             thrift_container_size_limit=thrift_container_size_limit,
         )
+        self._close_source = getattr(source, 'closed', True)
         self.common_metadata = common_metadata
         self._nested_paths_by_prefix = self._build_nested_paths()
 
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *args, **kwargs):
+        self.close()
+
     def _build_nested_paths(self):
         paths = self.reader.column_paths
@@ -376,6 +383,14 @@ class ParquetFile:
"""
return self.reader.num_row_groups
+ def close(self, force: bool = False):
+ if self._close_source or force:
+ self.reader.close()
+
+ @property
+ def closed(self) -> bool:
+ return self.reader.closed
+
def read_row_group(self, i, columns=None, use_threads=True,
use_pandas_metadata=False):
"""
@@ -1129,8 +1144,8 @@ class ParquetDatasetPiece:
         -------
         metadata : FileMetaData
         """
-        f = self.open()
-        return f.metadata
+        with self.open() as parquet:
+            return parquet.metadata
 
     def open(self):
         """
@@ -1204,6 +1219,9 @@ class ParquetDatasetPiece:
                 arr = pa.DictionaryArray.from_arrays(indices, dictionary)
                 table = table.append_column(name, arr)
 
+        # To ParquetFile the source looked like it was already open, so won't
+        # actually close it without overriding.
+        reader.close(force=True)
 
         return table
@@ -1890,7 +1908,8 @@ Examples
"""
tables = []
for piece in self._pieces:
- table = piece.read(columns=columns, use_threads=use_threads,
+ table = piece.read(columns=columns,
+ use_threads=use_threads,
partitions=self._partitions,
use_pandas_metadata=use_pandas_metadata)
tables.append(table)
diff --git a/python/pyarrow/tests/parquet/test_parquet_file.py b/python/pyarrow/tests/parquet/test_parquet_file.py
index 9b528b1859..4fb9335961 100644
--- a/python/pyarrow/tests/parquet/test_parquet_file.py
+++ b/python/pyarrow/tests/parquet/test_parquet_file.py
@@ -17,6 +17,7 @@
 import io
 import os
+from unittest import mock
 
 import pytest
@@ -277,3 +278,54 @@ def test_pre_buffer(pre_buffer):
     buf.seek(0)
     pf = pq.ParquetFile(buf, pre_buffer=pre_buffer)
     assert pf.read().num_rows == N
+
+
+def test_parquet_file_explicitly_closed(tempdir):
+    """
+    Unopened files should be closed explicitly after use,
+    and previously opened files should be left open.
+    Applies to read_table, ParquetDataset, and ParquetFile
+    """
+    # create test parquet file
+    fn = tempdir.joinpath('file.parquet')
+    table = pa.table({'col1': [0, 1], 'col2': [0, 1]})
+    pq.write_table(table, fn)
+
+    # read_table (legacy) with opened file (will leave open)
+    with open(fn, 'rb') as f:
+        pq.read_table(f, use_legacy_dataset=True)
+        assert not f.closed  # Didn't close it internally after read_table
+
+    # read_table (legacy) with unopened file (will close)
+    with mock.patch.object(pq.ParquetFile, "close") as mock_close:
+        pq.read_table(fn, use_legacy_dataset=True)
+        mock_close.assert_called()
+
+    # ParquetDataset test (legacy) with unopened file (will close)
+    with mock.patch.object(pq.ParquetFile, "close") as mock_close:
+        pq.ParquetDataset(fn, use_legacy_dataset=True).read()
+        mock_close.assert_called()
+
+    # ParquetDataset test (legacy) with opened file (will leave open)
+    with open(fn, 'rb') as f:
+        # ARROW-8075: support ParquetDataset from file-like, not just path-like
+        with pytest.raises(TypeError, match='not a path-like object'):
+            pq.ParquetDataset(f, use_legacy_dataset=True).read()
+        assert not f.closed
+
+    # ParquetFile with opened file (will leave open)
+    with open(fn, 'rb') as f:
+        with pq.ParquetFile(f) as p:
+            p.read()
+            assert not f.closed
+            assert not p.closed
+        assert not f.closed  # opened input file was not closed
+        assert not p.closed  # parquet file obj reports as not closed
+    assert f.closed
+    assert p.closed  # parquet file being closed reflects underlying file
+
+    # ParquetFile with unopened file (will close)
+    with pq.ParquetFile(fn) as p:
+        p.read()
+        assert not p.closed
+    assert p.closed  # parquet file obj reports as closed