This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 16c7f1a0bb GH-14932: [Python] Add python bindings for JSON streaming 
reader (#45084)
16c7f1a0bb is described below

commit 16c7f1a0bbcfad20eba7c63bc86d3da784d1db34
Author: Xuchen Pan <[email protected]>
AuthorDate: Thu Feb 6 21:47:52 2025 +0800

    GH-14932: [Python] Add python bindings for JSON streaming reader (#45084)
    
    ### Rationale for this change
    
    Arrow C++ has a JSON streaming reader that is not exposed in the 
Python interface.
    
    ### What changes are included in this PR?
    
    This PR is based on #33761. It adds the `open_json` function, which opens a 
streaming reader for a JSON file.
    
    ### Are these changes tested?
    
    Yes
    
    ### Are there any user-facing changes?
    
    Yes. A new `open_json` function has been added to the Python interface, 
located at `pyarrow.json.open_json`; its parameters are the same as those of 
`pyarrow.json.read_json`.
    
    * GitHub Issue: #14932
    
    Lead-authored-by: pxc <[email protected]>
    Co-authored-by: Akshay Subramanian <[email protected]>
    Signed-off-by: Antoine Pitrou <[email protected]>
---
 docs/source/python/api/formats.rst   |   1 +
 docs/source/python/json.rst          |  12 ++
 python/pyarrow/_csv.pyx              |   2 +-
 python/pyarrow/_json.pyx             |  78 ++++++++-
 python/pyarrow/includes/libarrow.pxd |   7 +
 python/pyarrow/json.py               |   2 +-
 python/pyarrow/tests/test_csv.py     |   2 +-
 python/pyarrow/tests/test_json.py    | 330 +++++++++++++++++++++++++++++++----
 8 files changed, 396 insertions(+), 38 deletions(-)

diff --git a/docs/source/python/api/formats.rst 
b/docs/source/python/api/formats.rst
index 86e2585ac2..a4cf3bbcdd 100644
--- a/docs/source/python/api/formats.rst
+++ b/docs/source/python/api/formats.rst
@@ -66,6 +66,7 @@ JSON Files
 
    ReadOptions
    ParseOptions
+   open_json
    read_json
 
 .. _api.parquet:
diff --git a/docs/source/python/json.rst b/docs/source/python/json.rst
index eff6135d89..277b8e1349 100644
--- a/docs/source/python/json.rst
+++ b/docs/source/python/json.rst
@@ -115,3 +115,15 @@ and pass it to :func:`read_json`.  For example, you can 
pass an explicit
 
 Similarly, you can choose performance settings by passing a
 :class:`ReadOptions` instance to :func:`read_json`.
+
+
+Incremental reading
+-------------------
+
+For memory-constrained environments, it is also possible to read a JSON file
+one batch at a time, using :func:`open_json`.
+
+In this case, type inference is done on the first block and types are frozen 
afterwards.
+To make sure the right data types are inferred, either set
+:attr:`ReadOptions.block_size` to a large enough value, or use
+:attr:`ParseOptions.explicit_schema` to set the desired data types explicitly.
diff --git a/python/pyarrow/_csv.pyx b/python/pyarrow/_csv.pyx
index 508488c0c3..e53c6d1847 100644
--- a/python/pyarrow/_csv.pyx
+++ b/python/pyarrow/_csv.pyx
@@ -1295,7 +1295,7 @@ def open_csv(input_file, read_options=None, 
parse_options=None,
         Options for converting CSV data
         (see pyarrow.csv.ConvertOptions constructor for defaults)
     memory_pool : MemoryPool, optional
-        Pool to allocate Table memory from
+        Pool to allocate RecordBatch memory from
 
     Returns
     -------
diff --git a/python/pyarrow/_json.pyx b/python/pyarrow/_json.pyx
index d36dad67ab..c023baeec1 100644
--- a/python/pyarrow/_json.pyx
+++ b/python/pyarrow/_json.pyx
@@ -21,7 +21,9 @@
 
 from pyarrow.includes.common cimport *
 from pyarrow.includes.libarrow cimport *
-from pyarrow.lib cimport (_Weakrefable, MemoryPool,
+
+from pyarrow.lib cimport (_Weakrefable, Schema,
+                          RecordBatchReader, MemoryPool,
                           maybe_unbox_memory_pool,
                           get_input_stream, pyarrow_wrap_table,
                           pyarrow_wrap_schema, pyarrow_unwrap_schema)
@@ -266,6 +268,38 @@ cdef _get_parse_options(ParseOptions parse_options, 
CJSONParseOptions* out):
         out[0] = parse_options.options
 
 
+cdef class JSONStreamingReader(RecordBatchReader):
+    """An object that reads record batches incrementally from a JSON file.
+
+    Should not be instantiated directly by user code.
+    """
+    cdef readonly:
+        Schema schema
+
+    def __init__(self):
+        raise TypeError(f"Do not call {self.__class__.__name__}'s "
+                        "constructor directly, "
+                        "use pyarrow.json.open_json() instead.")
+
+    cdef _open(self, shared_ptr[CInputStream] stream,
+               CJSONReadOptions c_read_options,
+               CJSONParseOptions c_parse_options,
+               MemoryPool memory_pool):
+        cdef:
+            shared_ptr[CSchema] c_schema
+            CIOContext io_context
+
+        io_context = CIOContext(maybe_unbox_memory_pool(memory_pool))
+
+        with nogil:
+            self.reader = <shared_ptr[CRecordBatchReader]> GetResultValue(
+                CJSONStreamingReader.Make(stream, move(c_read_options),
+                                          move(c_parse_options), io_context))
+            c_schema = self.reader.get().schema()
+
+        self.schema = pyarrow_wrap_schema(c_schema)
+
+
 def read_json(input_file, read_options=None, parse_options=None,
               MemoryPool memory_pool=None):
     """
@@ -308,3 +342,45 @@ def read_json(input_file, read_options=None, 
parse_options=None,
         table = GetResultValue(reader.get().Read())
 
     return pyarrow_wrap_table(table)
+
+
+def open_json(input_file, read_options=None, parse_options=None,
+              MemoryPool memory_pool=None):
+    """
+    Open a streaming reader of JSON data.
+
+    Reading using this function is always single-threaded.
+
+    Parameters
+    ----------
+    input_file : string, path or file-like object
+        The location of JSON data.  If a string or path, and if it ends
+        with a recognized compressed file extension (e.g. ".gz" or ".bz2"),
+        the data is automatically decompressed when reading.
+    read_options : pyarrow.json.ReadOptions, optional
+        Options for the JSON reader (see pyarrow.json.ReadOptions constructor
+        for defaults)
+    parse_options : pyarrow.json.ParseOptions, optional
+        Options for the JSON parser
+        (see pyarrow.json.ParseOptions constructor for defaults)
+    memory_pool : MemoryPool, optional
+        Pool to allocate RecordBatch memory from
+
+    Returns
+    -------
+    :class:`pyarrow.json.JSONStreamingReader`
+    """
+    cdef:
+        shared_ptr[CInputStream] stream
+        CJSONReadOptions c_read_options
+        CJSONParseOptions c_parse_options
+        JSONStreamingReader reader
+
+    _get_reader(input_file, &stream)
+    _get_read_options(read_options, &c_read_options)
+    _get_parse_options(parse_options, &c_parse_options)
+
+    reader = JSONStreamingReader.__new__(JSONStreamingReader)
+    reader._open(stream, move(c_read_options), move(c_parse_options),
+                 memory_pool)
+    return reader
diff --git a/python/pyarrow/includes/libarrow.pxd 
b/python/pyarrow/includes/libarrow.pxd
index 8361dcbd2a..d4e34e0a84 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -2176,6 +2176,13 @@ cdef extern from "arrow/json/reader.h" namespace 
"arrow::json" nogil:
 
         CResult[shared_ptr[CTable]] Read()
 
+    cdef cppclass CJSONStreamingReader" arrow::json::StreamingReader"(
+            CRecordBatchReader):
+        @staticmethod
+        CResult[shared_ptr[CJSONStreamingReader]] Make(
+            shared_ptr[CInputStream],
+            CJSONReadOptions, CJSONParseOptions, CIOContext)
+
 
 cdef extern from "arrow/util/thread_pool.h" namespace "arrow::internal" nogil:
 
diff --git a/python/pyarrow/json.py b/python/pyarrow/json.py
index a864f5d998..24e6046135 100644
--- a/python/pyarrow/json.py
+++ b/python/pyarrow/json.py
@@ -16,4 +16,4 @@
 # under the License.
 
 
-from pyarrow._json import ReadOptions, ParseOptions, read_json  # noqa
+from pyarrow._json import ReadOptions, ParseOptions, read_json, open_json  # 
noqa
diff --git a/python/pyarrow/tests/test_csv.py b/python/pyarrow/tests/test_csv.py
index 6a36b41daf..239ae55f2f 100644
--- a/python/pyarrow/tests/test_csv.py
+++ b/python/pyarrow/tests/test_csv.py
@@ -387,7 +387,7 @@ class BaseTestCSV(abc.ABC):
         """
         :param b: bytes to be parsed
         :param kwargs: arguments passed on to open the csv file
-        :return: b parsed as a single RecordBatch
+        :return: b parsed as a single Table
         """
         raise NotImplementedError
 
diff --git a/python/pyarrow/tests/test_json.py 
b/python/pyarrow/tests/test_json.py
index 978c92307a..c3f9fe333b 100644
--- a/python/pyarrow/tests/test_json.py
+++ b/python/pyarrow/tests/test_json.py
@@ -15,6 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import abc
 from collections import OrderedDict
 from decimal import Decimal
 import io
@@ -30,7 +31,7 @@ except ImportError:
 import pytest
 
 import pyarrow as pa
-from pyarrow.json import read_json, ReadOptions, ParseOptions
+from pyarrow.json import read_json, open_json, ReadOptions, ParseOptions
 
 
 def generate_col_names():
@@ -111,26 +112,20 @@ def test_parse_options(pickle_module):
                                  unexpected_field_behavior="ignore")
 
 
-class BaseTestJSONRead:
-
+class BaseTestJSON(abc.ABC):
+    @abc.abstractmethod
     def read_bytes(self, b, **kwargs):
-        return self.read_json(pa.py_buffer(b), **kwargs)
+        """
+        :param b: bytes to be parsed
+        :param kwargs: arguments passed on to open the json file
+        :return: b parsed as a single Table
+        """
+        raise NotImplementedError
 
     def check_names(self, table, names):
         assert table.num_columns == len(names)
         assert [c.name for c in table.columns] == names
 
-    def test_file_object(self):
-        data = b'{"a": 1, "b": 2}\n'
-        expected_data = {'a': [1], 'b': [2]}
-        bio = io.BytesIO(data)
-        table = self.read_json(bio)
-        assert table.to_pydict() == expected_data
-        # Text files not allowed
-        sio = io.StringIO(data.decode())
-        with pytest.raises(TypeError):
-            self.read_json(sio)
-
     def test_block_sizes(self):
         rows = b'{"a": 1}\n{"a": 2}\n{"a": 3}'
         read_options = ReadOptions()
@@ -229,25 +224,6 @@ class BaseTestJSONRead:
         assert table.num_columns == 0
         assert table.num_rows == 2
 
-    def test_reconcile_across_blocks(self):
-        # ARROW-12065: reconciling inferred types across blocks
-        first_row = b'{                               }\n'
-        read_options = ReadOptions(block_size=len(first_row))
-        for next_rows, expected_pylist in [
-            (b'{"a": 0}', [None, 0]),
-            (b'{"a": []}', [None, []]),
-            (b'{"a": []}\n{"a": [[1]]}', [None, [], [[1]]]),
-            (b'{"a": {}}', [None, {}]),
-            (b'{"a": {}}\n{"a": {"b": {"c": 1}}}',
-             [None, {"b": None}, {"b": {"c": 1}}]),
-        ]:
-            table = self.read_bytes(first_row + next_rows,
-                                    read_options=read_options)
-            expected = {"a": expected_pylist}
-            assert table.to_pydict() == expected
-            # Check that the issue was exercised
-            assert table.column("a").num_chunks > 1
-
     def test_explicit_schema_decimal(self):
         rows = (b'{"a": 1}\n'
                 b'{"a": 1.45}\n'
@@ -339,6 +315,281 @@ class BaseTestJSONRead:
                         assert table.to_pydict() == expected.to_pydict()
 
 
+class BaseTestJSONRead(BaseTestJSON):
+
+    def read_bytes(self, b, **kwargs):
+        return self.read_json(pa.py_buffer(b), **kwargs)
+
+    def test_file_object(self):
+        data = b'{"a": 1, "b": 2}\n'
+        expected_data = {'a': [1], 'b': [2]}
+        bio = io.BytesIO(data)
+        table = self.read_json(bio)
+        assert table.to_pydict() == expected_data
+        # Text files not allowed
+        sio = io.StringIO(data.decode())
+        with pytest.raises(TypeError):
+            self.read_json(sio)
+
+    def test_reconcile_across_blocks(self):
+        # ARROW-12065: reconciling inferred types across blocks
+        first_row = b'{                               }\n'
+        read_options = ReadOptions(block_size=len(first_row))
+        for next_rows, expected_pylist in [
+            (b'{"a": 0}', [None, 0]),
+            (b'{"a": []}', [None, []]),
+            (b'{"a": []}\n{"a": [[1]]}', [None, [], [[1]]]),
+            (b'{"a": {}}', [None, {}]),
+            (b'{"a": {}}\n{"a": {"b": {"c": 1}}}',
+             [None, {"b": None}, {"b": {"c": 1}}]),
+        ]:
+            table = self.read_bytes(first_row + next_rows,
+                                    read_options=read_options)
+            expected = {"a": expected_pylist}
+            assert table.to_pydict() == expected
+            # Check that the issue was exercised
+            assert table.column("a").num_chunks > 1
+
+
+class BaseTestStreamingJSONRead(BaseTestJSON):
+    def open_json(self, json, *args, **kwargs):
+        """
+        Reads the JSON file into memory using pyarrow's open_json
+        json The JSON bytes
+        args Positional arguments to be forwarded to pyarrow's open_json
+        kwargs Keyword arguments to be forwarded to pyarrow's open_json
+        """
+        read_options = kwargs.setdefault('read_options', ReadOptions())
+        read_options.use_threads = self.use_threads
+        return open_json(json, *args, **kwargs)
+
+    def open_bytes(self, b, **kwargs):
+        return self.open_json(pa.py_buffer(b), **kwargs)
+
+    def check_reader(self, reader, expected_schema, expected_data):
+        assert reader.schema == expected_schema
+        batches = list(reader)
+        assert len(batches) == len(expected_data)
+        for batch, expected_batch in zip(batches, expected_data):
+            batch.validate(full=True)
+            assert batch.schema == expected_schema
+            assert batch.to_pydict() == expected_batch
+
+    def read_bytes(self, b, **kwargs):
+        return self.open_bytes(b, **kwargs).read_all()
+
+    def test_file_object(self):
+        data = b'{"a": 1, "b": 2}\n'
+        expected_data = {'a': [1], 'b': [2]}
+        bio = io.BytesIO(data)
+        reader = self.open_json(bio)
+        expected_schema = pa.schema([('a', pa.int64()),
+                                     ('b', pa.int64())])
+        self.check_reader(reader, expected_schema, [expected_data])
+
+    def test_bad_first_chunk(self):
+        bad_first_chunk = b'{"i": 0            }\n{"i": 1}'
+        read_options = ReadOptions()
+        read_options.block_size = 3
+        with pytest.raises(
+            pa.ArrowInvalid,
+            match="straddling object straddles two block boundaries*"
+        ):
+            self.open_bytes(bad_first_chunk, read_options=read_options)
+
+    def test_bad_middle_chunk(self):
+        bad_middle_chunk = b'{"i": 0}\n{"i":     1}\n{"i": 2}'
+        read_options = ReadOptions()
+        read_options.block_size = 10
+        expected_schema = pa.schema([('i', pa.int64())])
+
+        reader = self.open_bytes(bad_middle_chunk, read_options=read_options)
+        assert reader.schema == expected_schema
+        assert reader.read_next_batch().to_pydict() == {
+            'i': [0]
+        }
+        with pytest.raises(
+            pa.ArrowInvalid,
+            match="straddling object straddles two block boundaries*"
+        ):
+            reader.read_next_batch()
+
+        with pytest.raises(StopIteration):
+            reader.read_next_batch()
+
+    def test_bad_first_parse(self):
+        bad_first_block = b'{"n": }\n{"n": 10000}'
+        read_options = ReadOptions()
+        read_options.block_size = 16
+        with pytest.raises(pa.ArrowInvalid,
+                           match="JSON parse error: Invalid value.*"):
+            self.open_bytes(bad_first_block, read_options=read_options)
+
+    def test_bad_middle_parse_after_empty(self):
+        bad_first_block = b'{            }{"n": }\n{"n": 10000}'
+        read_options = ReadOptions()
+        read_options.block_size = 16
+        with pytest.raises(pa.ArrowInvalid,
+                           match="JSON parse error: Invalid value.*"):
+            self.open_bytes(bad_first_block, read_options=read_options)
+
+    def test_bad_middle_parse(self):
+        bad_middle_chunk = b'{"n": 1000}\n{"n": 200 00}\n{"n": 3000}'
+        read_options = ReadOptions()
+        read_options.block_size = 10
+        expected_schema = pa.schema([('n', pa.int64())])
+
+        reader = self.open_bytes(bad_middle_chunk, read_options=read_options)
+        assert reader.schema == expected_schema
+        assert reader.read_next_batch().to_pydict() == {
+            'n': [1000]
+        }
+        with pytest.raises(
+            pa.ArrowInvalid,
+            match="JSON parse error:\
+ Missing a comma or '}' after an object member*"
+        ):
+            reader.read_next_batch()
+
+        with pytest.raises(StopIteration):
+            reader.read_next_batch()
+
+    def test_non_linewise_chunker_first_block(self):
+        bad_middle_chunk = b'{"n": 0}{1}\n{"n": 2}'
+        read_options = ReadOptions(block_size=10)
+        parse_options = ParseOptions(newlines_in_values=True)
+        expected_schema = pa.schema([('n', pa.int64())])
+
+        reader = self.open_bytes(
+            bad_middle_chunk,
+            read_options=read_options,
+            parse_options=parse_options)
+        assert reader.schema == expected_schema
+        assert reader.read_next_batch().to_pydict() == {
+            'n': [0]
+        }
+        with pytest.raises(pa.ArrowInvalid,
+                           match="JSON parse error *"):
+            reader.read_next_batch()
+
+        with pytest.raises(StopIteration):
+            reader.read_next_batch()
+
+    def test_non_linewise_chunker_bad_first_block(self):
+        bad_middle_chunk = b'{"n": 0}{1}\n{"n": 2}'
+        read_options = ReadOptions(block_size=10)
+        parse_options = ParseOptions(newlines_in_values=True)
+        expected_schema = pa.schema([('n', pa.int64())])
+
+        reader = self.open_bytes(
+            bad_middle_chunk,
+            read_options=read_options,
+            parse_options=parse_options)
+        assert reader.schema == expected_schema
+        assert reader.read_next_batch().to_pydict() == {
+            'n': [0]
+        }
+        with pytest.raises(pa.ArrowInvalid,
+                           match="JSON parse error *"):
+            reader.read_next_batch()
+
+        with pytest.raises(StopIteration):
+            reader.read_next_batch()
+
+    def test_non_linewise_chunker_bad_middle_block(self):
+        bad_middle_chunk = b'{"n": 0}\n{"n":    1}\n{}"n":2}\n{"n": 3}'
+        read_options = ReadOptions(block_size=10)
+        parse_options = ParseOptions(newlines_in_values=True)
+        expected_schema = pa.schema([('n', pa.int64())])
+
+        reader = self.open_bytes(
+            bad_middle_chunk,
+            read_options=read_options,
+            parse_options=parse_options)
+        assert reader.schema == expected_schema
+        assert reader.read_next_batch().to_pydict() == {
+            'n': [0]
+        }
+        assert reader.read_next_batch().to_pydict() == {
+            'n': [1]
+        }
+
+        with pytest.raises(pa.ArrowInvalid,
+                           match="JSON parse error *"):
+            reader.read_next_batch()
+
+        with pytest.raises(StopIteration):
+            reader.read_next_batch()
+
+    def test_ignore_leading_empty_blocks(self):
+        leading_empty_chunk = b'    \n{"b": true, "s": "foo"}'
+        explicit_schema = pa.schema([
+            ('b', pa.bool_()),
+            ('s', pa.utf8())
+        ])
+        read_options = ReadOptions(block_size=24)
+        parse_options = ParseOptions(explicit_schema=explicit_schema)
+        expected_data = {
+            'b': [True], 's': ["foo"]
+        }
+
+        reader = self.open_bytes(
+            leading_empty_chunk,
+            read_options=read_options,
+            parse_options=parse_options)
+        self.check_reader(reader, explicit_schema, [expected_data])
+
+    def test_inference(self):
+        rows = b'{"a": 0, "b": "foo"    }\n\
+        {"a": 1, "c": true  }\n{"a": 2, "d": 4.0}'
+        expected_schema = pa.schema([
+            ('a', pa.int64()),
+            ('b', pa.utf8())
+        ])
+        expected_data = {'a': [0], 'b': ["foo"]}
+
+        read_options = ReadOptions(block_size=32)
+        parse_options = ParseOptions(unexpected_field_behavior="infer")
+        reader = self.open_bytes(
+            rows,
+            read_options=read_options,
+            parse_options=parse_options)
+        assert reader.schema == expected_schema
+        assert reader.read_next_batch().to_pydict() == expected_data
+        with pytest.raises(pa.ArrowInvalid,
+                           match="JSON parse error: unexpected field"):
+            reader.read_next_batch()
+
+        expected_schema = pa.schema([
+            ('a', pa.int64()),
+            ('b', pa.utf8()),
+            ('c', pa.bool_()),
+        ])
+        expected_data = {'a': [0, 1], 'b': ["foo", None], 'c': [None, True]}
+        read_options = ReadOptions(block_size=64)
+        reader = self.open_bytes(rows, read_options=read_options,
+                                 parse_options=parse_options)
+        assert reader.schema == expected_schema
+        assert reader.read_next_batch().to_pydict() == expected_data
+        with pytest.raises(pa.ArrowInvalid,
+                           match="JSON parse error: unexpected field"):
+            reader.read_next_batch()
+
+        expected_schema = pa.schema([
+            ('a', pa.int64()),
+            ('b', pa.utf8()),
+            ('c', pa.bool_()),
+            ('d', pa.float64()),
+        ])
+        expected_data = {'a': [0, 1, 2], 'b': ["foo", None, None],
+                         'c': [None, True, None], 'd': [None, None, 4.0]}
+        read_options = ReadOptions(block_size=96)
+        reader = self.open_bytes(rows, read_options=read_options,
+                                 parse_options=parse_options)
+        assert reader.schema == expected_schema
+        assert reader.read_next_batch().to_pydict() == expected_data
+
+
 class TestSerialJSONRead(BaseTestJSONRead, unittest.TestCase):
 
     def read_json(self, *args, **kwargs):
@@ -357,3 +608,14 @@ class TestParallelJSONRead(BaseTestJSONRead, 
unittest.TestCase):
         table = read_json(*args, **kwargs)
         table.validate(full=True)
         return table
+
+
+class TestSerialStreamingJSONRead(BaseTestStreamingJSONRead, 
unittest.TestCase):
+
+    use_threads = False
+
+
[email protected]
+class TestThreadedStreamingJSONRead(BaseTestStreamingJSONRead, 
unittest.TestCase):
+
+    use_threads = True

Reply via email to