This is an automated email from the ASF dual-hosted git repository.
apitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 16c7f1a0bb GH-14932: [Python] Add python bindings for JSON streaming
reader (#45084)
16c7f1a0bb is described below
commit 16c7f1a0bbcfad20eba7c63bc86d3da784d1db34
Author: Xuchen Pan <[email protected]>
AuthorDate: Thu Feb 6 21:47:52 2025 +0800
GH-14932: [Python] Add python bindings for JSON streaming reader (#45084)
### Rationale for this change
The C++ Arrow library has a JSON streaming reader which is not exposed in the
Python interface.
### What changes are included in this PR?
This PR is based on #33761. It adds the `open_json` method to open a
streaming reader for a JSON file.
### Are these changes tested?
Yes
### Are there any user-facing changes?
Yes. A new `open_json` method has been added to the Python interface,
located at `pyarrow.json.open_json`, and its parameters are the same as
those of `pyarrow.json.read_json`.
* GitHub Issue: #14932
Lead-authored-by: pxc <[email protected]>
Co-authored-by: Akshay Subramanian <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
---
docs/source/python/api/formats.rst | 1 +
docs/source/python/json.rst | 12 ++
python/pyarrow/_csv.pyx | 2 +-
python/pyarrow/_json.pyx | 78 ++++++++-
python/pyarrow/includes/libarrow.pxd | 7 +
python/pyarrow/json.py | 2 +-
python/pyarrow/tests/test_csv.py | 2 +-
python/pyarrow/tests/test_json.py | 330 +++++++++++++++++++++++++++++++----
8 files changed, 396 insertions(+), 38 deletions(-)
diff --git a/docs/source/python/api/formats.rst
b/docs/source/python/api/formats.rst
index 86e2585ac2..a4cf3bbcdd 100644
--- a/docs/source/python/api/formats.rst
+++ b/docs/source/python/api/formats.rst
@@ -66,6 +66,7 @@ JSON Files
ReadOptions
ParseOptions
+ open_json
read_json
.. _api.parquet:
diff --git a/docs/source/python/json.rst b/docs/source/python/json.rst
index eff6135d89..277b8e1349 100644
--- a/docs/source/python/json.rst
+++ b/docs/source/python/json.rst
@@ -115,3 +115,15 @@ and pass it to :func:`read_json`. For example, you can
pass an explicit
Similarly, you can choose performance settings by passing a
:class:`ReadOptions` instance to :func:`read_json`.
+
+
+Incremental reading
+-------------------
+
+For memory-constrained environments, it is also possible to read a JSON file
+one batch at a time, using :func:`open_json`.
+
+In this case, type inference is done on the first block and types are frozen afterwards.
+To make sure the right data types are inferred, either set
+:attr:`ReadOptions.block_size` to a large enough value, or use
+:attr:`ParseOptions.explicit_schema` to set the desired data types explicitly.
diff --git a/python/pyarrow/_csv.pyx b/python/pyarrow/_csv.pyx
index 508488c0c3..e53c6d1847 100644
--- a/python/pyarrow/_csv.pyx
+++ b/python/pyarrow/_csv.pyx
@@ -1295,7 +1295,7 @@ def open_csv(input_file, read_options=None,
parse_options=None,
Options for converting CSV data
(see pyarrow.csv.ConvertOptions constructor for defaults)
memory_pool : MemoryPool, optional
- Pool to allocate Table memory from
+ Pool to allocate RecordBatch memory from
Returns
-------
diff --git a/python/pyarrow/_json.pyx b/python/pyarrow/_json.pyx
index d36dad67ab..c023baeec1 100644
--- a/python/pyarrow/_json.pyx
+++ b/python/pyarrow/_json.pyx
@@ -21,7 +21,9 @@
from pyarrow.includes.common cimport *
from pyarrow.includes.libarrow cimport *
-from pyarrow.lib cimport (_Weakrefable, MemoryPool,
+
+from pyarrow.lib cimport (_Weakrefable, Schema,
+ RecordBatchReader, MemoryPool,
maybe_unbox_memory_pool,
get_input_stream, pyarrow_wrap_table,
pyarrow_wrap_schema, pyarrow_unwrap_schema)
@@ -266,6 +268,38 @@ cdef _get_parse_options(ParseOptions parse_options,
CJSONParseOptions* out):
out[0] = parse_options.options
+cdef class JSONStreamingReader(RecordBatchReader):
+ """An object that reads record batches incrementally from a JSON file.
+
+ Should not be instantiated directly by user code.
+ """
+ cdef readonly:
+ Schema schema
+
+ def __init__(self):
+ raise TypeError(f"Do not call {self.__class__.__name__}'s "
+ "constructor directly, "
+ "use pyarrow.json.open_json() instead.")
+
+ cdef _open(self, shared_ptr[CInputStream] stream,
+ CJSONReadOptions c_read_options,
+ CJSONParseOptions c_parse_options,
+ MemoryPool memory_pool):
+ cdef:
+ shared_ptr[CSchema] c_schema
+ CIOContext io_context
+
+ io_context = CIOContext(maybe_unbox_memory_pool(memory_pool))
+
+ with nogil:
+ self.reader = <shared_ptr[CRecordBatchReader]> GetResultValue(
+ CJSONStreamingReader.Make(stream, move(c_read_options),
+ move(c_parse_options), io_context))
+ c_schema = self.reader.get().schema()
+
+ self.schema = pyarrow_wrap_schema(c_schema)
+
+
def read_json(input_file, read_options=None, parse_options=None,
MemoryPool memory_pool=None):
"""
@@ -308,3 +342,45 @@ def read_json(input_file, read_options=None,
parse_options=None,
table = GetResultValue(reader.get().Read())
return pyarrow_wrap_table(table)
+
+
+def open_json(input_file, read_options=None, parse_options=None,
+ MemoryPool memory_pool=None):
+ """
+ Open a streaming reader of JSON data.
+
+ Reading using this function is always single-threaded.
+
+ Parameters
+ ----------
+ input_file : string, path or file-like object
+ The location of JSON data. If a string or path, and if it ends
+ with a recognized compressed file extension (e.g. ".gz" or ".bz2"),
+ the data is automatically decompressed when reading.
+ read_options : pyarrow.json.ReadOptions, optional
+ Options for the JSON reader (see pyarrow.json.ReadOptions constructor
+ for defaults)
+ parse_options : pyarrow.json.ParseOptions, optional
+ Options for the JSON parser
+ (see pyarrow.json.ParseOptions constructor for defaults)
+ memory_pool : MemoryPool, optional
+ Pool to allocate RecordBatch memory from
+
+ Returns
+ -------
+ :class:`pyarrow.json.JSONStreamingReader`
+ """
+ cdef:
+ shared_ptr[CInputStream] stream
+ CJSONReadOptions c_read_options
+ CJSONParseOptions c_parse_options
+ JSONStreamingReader reader
+
+ _get_reader(input_file, &stream)
+ _get_read_options(read_options, &c_read_options)
+ _get_parse_options(parse_options, &c_parse_options)
+
+ reader = JSONStreamingReader.__new__(JSONStreamingReader)
+ reader._open(stream, move(c_read_options), move(c_parse_options),
+ memory_pool)
+ return reader
diff --git a/python/pyarrow/includes/libarrow.pxd
b/python/pyarrow/includes/libarrow.pxd
index 8361dcbd2a..d4e34e0a84 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -2176,6 +2176,13 @@ cdef extern from "arrow/json/reader.h" namespace
"arrow::json" nogil:
CResult[shared_ptr[CTable]] Read()
+ cdef cppclass CJSONStreamingReader" arrow::json::StreamingReader"(
+ CRecordBatchReader):
+ @staticmethod
+ CResult[shared_ptr[CJSONStreamingReader]] Make(
+ shared_ptr[CInputStream],
+ CJSONReadOptions, CJSONParseOptions, CIOContext)
+
cdef extern from "arrow/util/thread_pool.h" namespace "arrow::internal" nogil:
diff --git a/python/pyarrow/json.py b/python/pyarrow/json.py
index a864f5d998..24e6046135 100644
--- a/python/pyarrow/json.py
+++ b/python/pyarrow/json.py
@@ -16,4 +16,4 @@
# under the License.
-from pyarrow._json import ReadOptions, ParseOptions, read_json # noqa
+from pyarrow._json import ReadOptions, ParseOptions, read_json, open_json #
noqa
diff --git a/python/pyarrow/tests/test_csv.py b/python/pyarrow/tests/test_csv.py
index 6a36b41daf..239ae55f2f 100644
--- a/python/pyarrow/tests/test_csv.py
+++ b/python/pyarrow/tests/test_csv.py
@@ -387,7 +387,7 @@ class BaseTestCSV(abc.ABC):
"""
:param b: bytes to be parsed
:param kwargs: arguments passed on to open the csv file
- :return: b parsed as a single RecordBatch
+ :return: b parsed as a single Table
"""
raise NotImplementedError
diff --git a/python/pyarrow/tests/test_json.py
b/python/pyarrow/tests/test_json.py
index 978c92307a..c3f9fe333b 100644
--- a/python/pyarrow/tests/test_json.py
+++ b/python/pyarrow/tests/test_json.py
@@ -15,6 +15,7 @@
# specific language governing permissions and limitations
# under the License.
+import abc
from collections import OrderedDict
from decimal import Decimal
import io
@@ -30,7 +31,7 @@ except ImportError:
import pytest
import pyarrow as pa
-from pyarrow.json import read_json, ReadOptions, ParseOptions
+from pyarrow.json import read_json, open_json, ReadOptions, ParseOptions
def generate_col_names():
@@ -111,26 +112,20 @@ def test_parse_options(pickle_module):
unexpected_field_behavior="ignore")
-class BaseTestJSONRead:
-
+class BaseTestJSON(abc.ABC):
+ @abc.abstractmethod
def read_bytes(self, b, **kwargs):
- return self.read_json(pa.py_buffer(b), **kwargs)
+ """
+ :param b: bytes to be parsed
+ :param kwargs: arguments passed on to open the json file
+ :return: b parsed as a single Table
+ """
+ raise NotImplementedError
def check_names(self, table, names):
assert table.num_columns == len(names)
assert [c.name for c in table.columns] == names
- def test_file_object(self):
- data = b'{"a": 1, "b": 2}\n'
- expected_data = {'a': [1], 'b': [2]}
- bio = io.BytesIO(data)
- table = self.read_json(bio)
- assert table.to_pydict() == expected_data
- # Text files not allowed
- sio = io.StringIO(data.decode())
- with pytest.raises(TypeError):
- self.read_json(sio)
-
def test_block_sizes(self):
rows = b'{"a": 1}\n{"a": 2}\n{"a": 3}'
read_options = ReadOptions()
@@ -229,25 +224,6 @@ class BaseTestJSONRead:
assert table.num_columns == 0
assert table.num_rows == 2
- def test_reconcile_across_blocks(self):
- # ARROW-12065: reconciling inferred types across blocks
- first_row = b'{ }\n'
- read_options = ReadOptions(block_size=len(first_row))
- for next_rows, expected_pylist in [
- (b'{"a": 0}', [None, 0]),
- (b'{"a": []}', [None, []]),
- (b'{"a": []}\n{"a": [[1]]}', [None, [], [[1]]]),
- (b'{"a": {}}', [None, {}]),
- (b'{"a": {}}\n{"a": {"b": {"c": 1}}}',
- [None, {"b": None}, {"b": {"c": 1}}]),
- ]:
- table = self.read_bytes(first_row + next_rows,
- read_options=read_options)
- expected = {"a": expected_pylist}
- assert table.to_pydict() == expected
- # Check that the issue was exercised
- assert table.column("a").num_chunks > 1
-
def test_explicit_schema_decimal(self):
rows = (b'{"a": 1}\n'
b'{"a": 1.45}\n'
@@ -339,6 +315,281 @@ class BaseTestJSONRead:
assert table.to_pydict() == expected.to_pydict()
+class BaseTestJSONRead(BaseTestJSON):
+
+ def read_bytes(self, b, **kwargs):
+ return self.read_json(pa.py_buffer(b), **kwargs)
+
+ def test_file_object(self):
+ data = b'{"a": 1, "b": 2}\n'
+ expected_data = {'a': [1], 'b': [2]}
+ bio = io.BytesIO(data)
+ table = self.read_json(bio)
+ assert table.to_pydict() == expected_data
+ # Text files not allowed
+ sio = io.StringIO(data.decode())
+ with pytest.raises(TypeError):
+ self.read_json(sio)
+
+ def test_reconcile_across_blocks(self):
+ # ARROW-12065: reconciling inferred types across blocks
+ first_row = b'{ }\n'
+ read_options = ReadOptions(block_size=len(first_row))
+ for next_rows, expected_pylist in [
+ (b'{"a": 0}', [None, 0]),
+ (b'{"a": []}', [None, []]),
+ (b'{"a": []}\n{"a": [[1]]}', [None, [], [[1]]]),
+ (b'{"a": {}}', [None, {}]),
+ (b'{"a": {}}\n{"a": {"b": {"c": 1}}}',
+ [None, {"b": None}, {"b": {"c": 1}}]),
+ ]:
+ table = self.read_bytes(first_row + next_rows,
+ read_options=read_options)
+ expected = {"a": expected_pylist}
+ assert table.to_pydict() == expected
+ # Check that the issue was exercised
+ assert table.column("a").num_chunks > 1
+
+
+class BaseTestStreamingJSONRead(BaseTestJSON):
+ def open_json(self, json, *args, **kwargs):
+ """
+ Reads the JSON file into memory using pyarrow's open_json
+ json The JSON bytes
+ args Positional arguments to be forwarded to pyarrow's open_json
+ kwargs Keyword arguments to be forwarded to pyarrow's open_json
+ """
+ read_options = kwargs.setdefault('read_options', ReadOptions())
+ read_options.use_threads = self.use_threads
+ return open_json(json, *args, **kwargs)
+
+ def open_bytes(self, b, **kwargs):
+ return self.open_json(pa.py_buffer(b), **kwargs)
+
+ def check_reader(self, reader, expected_schema, expected_data):
+ assert reader.schema == expected_schema
+ batches = list(reader)
+ assert len(batches) == len(expected_data)
+ for batch, expected_batch in zip(batches, expected_data):
+ batch.validate(full=True)
+ assert batch.schema == expected_schema
+ assert batch.to_pydict() == expected_batch
+
+ def read_bytes(self, b, **kwargs):
+ return self.open_bytes(b, **kwargs).read_all()
+
+ def test_file_object(self):
+ data = b'{"a": 1, "b": 2}\n'
+ expected_data = {'a': [1], 'b': [2]}
+ bio = io.BytesIO(data)
+ reader = self.open_json(bio)
+ expected_schema = pa.schema([('a', pa.int64()),
+ ('b', pa.int64())])
+ self.check_reader(reader, expected_schema, [expected_data])
+
+ def test_bad_first_chunk(self):
+ bad_first_chunk = b'{"i": 0 }\n{"i": 1}'
+ read_options = ReadOptions()
+ read_options.block_size = 3
+ with pytest.raises(
+ pa.ArrowInvalid,
+ match="straddling object straddles two block boundaries*"
+ ):
+ self.open_bytes(bad_first_chunk, read_options=read_options)
+
+ def test_bad_middle_chunk(self):
+ bad_middle_chunk = b'{"i": 0}\n{"i": 1}\n{"i": 2}'
+ read_options = ReadOptions()
+ read_options.block_size = 10
+ expected_schema = pa.schema([('i', pa.int64())])
+
+ reader = self.open_bytes(bad_middle_chunk, read_options=read_options)
+ assert reader.schema == expected_schema
+ assert reader.read_next_batch().to_pydict() == {
+ 'i': [0]
+ }
+ with pytest.raises(
+ pa.ArrowInvalid,
+ match="straddling object straddles two block boundaries*"
+ ):
+ reader.read_next_batch()
+
+ with pytest.raises(StopIteration):
+ reader.read_next_batch()
+
+ def test_bad_first_parse(self):
+ bad_first_block = b'{"n": }\n{"n": 10000}'
+ read_options = ReadOptions()
+ read_options.block_size = 16
+ with pytest.raises(pa.ArrowInvalid,
+ match="JSON parse error: Invalid value.*"):
+ self.open_bytes(bad_first_block, read_options=read_options)
+
+ def test_bad_middle_parse_after_empty(self):
+ bad_first_block = b'{ }{"n": }\n{"n": 10000}'
+ read_options = ReadOptions()
+ read_options.block_size = 16
+ with pytest.raises(pa.ArrowInvalid,
+ match="JSON parse error: Invalid value.*"):
+ self.open_bytes(bad_first_block, read_options=read_options)
+
+ def test_bad_middle_parse(self):
+ bad_middle_chunk = b'{"n": 1000}\n{"n": 200 00}\n{"n": 3000}'
+ read_options = ReadOptions()
+ read_options.block_size = 10
+ expected_schema = pa.schema([('n', pa.int64())])
+
+ reader = self.open_bytes(bad_middle_chunk, read_options=read_options)
+ assert reader.schema == expected_schema
+ assert reader.read_next_batch().to_pydict() == {
+ 'n': [1000]
+ }
+ with pytest.raises(
+ pa.ArrowInvalid,
+ match="JSON parse error:\
+ Missing a comma or '}' after an object member*"
+ ):
+ reader.read_next_batch()
+
+ with pytest.raises(StopIteration):
+ reader.read_next_batch()
+
+ def test_non_linewise_chunker_first_block(self):
+ bad_middle_chunk = b'{"n": 0}{1}\n{"n": 2}'
+ read_options = ReadOptions(block_size=10)
+ parse_options = ParseOptions(newlines_in_values=True)
+ expected_schema = pa.schema([('n', pa.int64())])
+
+ reader = self.open_bytes(
+ bad_middle_chunk,
+ read_options=read_options,
+ parse_options=parse_options)
+ assert reader.schema == expected_schema
+ assert reader.read_next_batch().to_pydict() == {
+ 'n': [0]
+ }
+ with pytest.raises(pa.ArrowInvalid,
+ match="JSON parse error *"):
+ reader.read_next_batch()
+
+ with pytest.raises(StopIteration):
+ reader.read_next_batch()
+
+ def test_non_linewise_chunker_bad_first_block(self):
+ bad_middle_chunk = b'{"n": 0}{1}\n{"n": 2}'
+ read_options = ReadOptions(block_size=10)
+ parse_options = ParseOptions(newlines_in_values=True)
+ expected_schema = pa.schema([('n', pa.int64())])
+
+ reader = self.open_bytes(
+ bad_middle_chunk,
+ read_options=read_options,
+ parse_options=parse_options)
+ assert reader.schema == expected_schema
+ assert reader.read_next_batch().to_pydict() == {
+ 'n': [0]
+ }
+ with pytest.raises(pa.ArrowInvalid,
+ match="JSON parse error *"):
+ reader.read_next_batch()
+
+ with pytest.raises(StopIteration):
+ reader.read_next_batch()
+
+ def test_non_linewise_chunker_bad_middle_block(self):
+ bad_middle_chunk = b'{"n": 0}\n{"n": 1}\n{}"n":2}\n{"n": 3}'
+ read_options = ReadOptions(block_size=10)
+ parse_options = ParseOptions(newlines_in_values=True)
+ expected_schema = pa.schema([('n', pa.int64())])
+
+ reader = self.open_bytes(
+ bad_middle_chunk,
+ read_options=read_options,
+ parse_options=parse_options)
+ assert reader.schema == expected_schema
+ assert reader.read_next_batch().to_pydict() == {
+ 'n': [0]
+ }
+ assert reader.read_next_batch().to_pydict() == {
+ 'n': [1]
+ }
+
+ with pytest.raises(pa.ArrowInvalid,
+ match="JSON parse error *"):
+ reader.read_next_batch()
+
+ with pytest.raises(StopIteration):
+ reader.read_next_batch()
+
+ def test_ignore_leading_empty_blocks(self):
+ leading_empty_chunk = b' \n{"b": true, "s": "foo"}'
+ explicit_schema = pa.schema([
+ ('b', pa.bool_()),
+ ('s', pa.utf8())
+ ])
+ read_options = ReadOptions(block_size=24)
+ parse_options = ParseOptions(explicit_schema=explicit_schema)
+ expected_data = {
+ 'b': [True], 's': ["foo"]
+ }
+
+ reader = self.open_bytes(
+ leading_empty_chunk,
+ read_options=read_options,
+ parse_options=parse_options)
+ self.check_reader(reader, explicit_schema, [expected_data])
+
+ def test_inference(self):
+ rows = b'{"a": 0, "b": "foo" }\n\
+ {"a": 1, "c": true }\n{"a": 2, "d": 4.0}'
+ expected_schema = pa.schema([
+ ('a', pa.int64()),
+ ('b', pa.utf8())
+ ])
+ expected_data = {'a': [0], 'b': ["foo"]}
+
+ read_options = ReadOptions(block_size=32)
+ parse_options = ParseOptions(unexpected_field_behavior="infer")
+ reader = self.open_bytes(
+ rows,
+ read_options=read_options,
+ parse_options=parse_options)
+ assert reader.schema == expected_schema
+ assert reader.read_next_batch().to_pydict() == expected_data
+ with pytest.raises(pa.ArrowInvalid,
+ match="JSON parse error: unexpected field"):
+ reader.read_next_batch()
+
+ expected_schema = pa.schema([
+ ('a', pa.int64()),
+ ('b', pa.utf8()),
+ ('c', pa.bool_()),
+ ])
+ expected_data = {'a': [0, 1], 'b': ["foo", None], 'c': [None, True]}
+ read_options = ReadOptions(block_size=64)
+ reader = self.open_bytes(rows, read_options=read_options,
+ parse_options=parse_options)
+ assert reader.schema == expected_schema
+ assert reader.read_next_batch().to_pydict() == expected_data
+ with pytest.raises(pa.ArrowInvalid,
+ match="JSON parse error: unexpected field"):
+ reader.read_next_batch()
+
+ expected_schema = pa.schema([
+ ('a', pa.int64()),
+ ('b', pa.utf8()),
+ ('c', pa.bool_()),
+ ('d', pa.float64()),
+ ])
+ expected_data = {'a': [0, 1, 2], 'b': ["foo", None, None],
+ 'c': [None, True, None], 'd': [None, None, 4.0]}
+ read_options = ReadOptions(block_size=96)
+ reader = self.open_bytes(rows, read_options=read_options,
+ parse_options=parse_options)
+ assert reader.schema == expected_schema
+ assert reader.read_next_batch().to_pydict() == expected_data
+
+
class TestSerialJSONRead(BaseTestJSONRead, unittest.TestCase):
def read_json(self, *args, **kwargs):
@@ -357,3 +608,14 @@ class TestParallelJSONRead(BaseTestJSONRead,
unittest.TestCase):
table = read_json(*args, **kwargs)
table.validate(full=True)
return table
+
+
+class TestSerialStreamingJSONRead(BaseTestStreamingJSONRead,
unittest.TestCase):
+
+ use_threads = False
+
+
[email protected]
+class TestThreadedStreamingJSONRead(BaseTestStreamingJSONRead,
unittest.TestCase):
+
+ use_threads = True