This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new ea936e3506 GH-37910: [Java][Integration] Implement C Data Interface 
integration testing (#38248)
ea936e3506 is described below

commit ea936e3506e5b408ff39a2ef762ab5fa7aba72ae
Author: Antoine Pitrou <[email protected]>
AuthorDate: Thu Oct 19 14:00:28 2023 +0200

    GH-37910: [Java][Integration] Implement C Data Interface integration 
testing (#38248)
    
    
    
    ### Rationale for this change
    
    ### What changes are included in this PR?
    
    ### Are these changes tested?
    
    ### Are there any user-facing changes?
    
    * Closes: #37910
    
    Authored-by: Antoine Pitrou <[email protected]>
    Signed-off-by: Antoine Pitrou <[email protected]>
---
 ci/scripts/integration_arrow.sh                    |   4 +-
 dev/archery/archery/integration/cdata.py           |  18 ++-
 dev/archery/archery/integration/datagen.py         |   1 -
 dev/archery/archery/integration/runner.py          |  23 +--
 dev/archery/archery/integration/tester.py          | 104 ++++++------
 dev/archery/archery/integration/tester_cpp.py      |  11 --
 dev/archery/archery/integration/tester_csharp.py   |  19 ++-
 dev/archery/archery/integration/tester_go.py       |  18 +--
 dev/archery/archery/integration/tester_java.py     | 177 ++++++++++++++++++++-
 docker-compose.yml                                 |  11 +-
 .../apache/arrow/c/BufferImportTypeVisitor.java    |   4 +-
 .../c/src/main/java/org/apache/arrow/c/Format.java |   4 +
 .../java/org/apache/arrow/c/SchemaImporter.java    |   2 +-
 .../java/org/apache/arrow/c/DictionaryTest.java    |   4 +-
 .../test/java/org/apache/arrow/c/StreamTest.java   |   2 +-
 .../java/org/apache/arrow/vector/NullVector.java   |   1 +
 .../arrow/vector/compare/RangeEqualsVisitor.java   |   6 +-
 .../vector/dictionary/DictionaryProvider.java      |  29 +++-
 .../apache/arrow/vector/ipc/JsonFileReader.java    |  38 +++--
 .../arrow/vector/ipc/message/ArrowRecordBatch.java |   4 +-
 .../org/apache/arrow/vector/util/Validator.java    |  26 +++
 21 files changed, 372 insertions(+), 134 deletions(-)

diff --git a/ci/scripts/integration_arrow.sh b/ci/scripts/integration_arrow.sh
index 289d376a4d..2861b1c09d 100755
--- a/ci/scripts/integration_arrow.sh
+++ b/ci/scripts/integration_arrow.sh
@@ -23,8 +23,8 @@ arrow_dir=${1}
 gold_dir=$arrow_dir/testing/data/arrow-ipc-stream/integration
 
 pip install -e $arrow_dir/dev/archery[integration]
-# For C# C Data Interface testing
-pip install pythonnet
+# For C Data Interface testing
+pip install jpype1 pythonnet
 
 # Get more detailed context on crashes
 export PYTHONFAULTHANDLER=1
diff --git a/dev/archery/archery/integration/cdata.py 
b/dev/archery/archery/integration/cdata.py
index c201f5f867..8e5550fcdb 100644
--- a/dev/archery/archery/integration/cdata.py
+++ b/dev/archery/archery/integration/cdata.py
@@ -80,6 +80,15 @@ def ffi() -> cffi.FFI:
     return ffi
 
 
+def _release_memory_steps(exporter: CDataExporter, importer: CDataImporter):
+    yield
+    for i in range(max(exporter.required_gc_runs, importer.required_gc_runs)):
+        importer.run_gc()
+        yield
+        exporter.run_gc()
+        yield
+
+
 @contextmanager
 def check_memory_released(exporter: CDataExporter, importer: CDataImporter):
     """
@@ -96,12 +105,13 @@ def check_memory_released(exporter: CDataExporter, 
importer: CDataImporter):
     if do_check:
         before = exporter.record_allocation_state()
     yield
-    # We don't use a `finally` clause: if the enclosed block raised an
-    # exception, no need to add another one.
+    # Only check for memory state if `yield` didn't raise.
     if do_check:
-        ok = exporter.compare_allocation_state(before, importer.gc_until)
-        if not ok:
+        for _ in _release_memory_steps(exporter, importer):
             after = exporter.record_allocation_state()
+            if after == before:
+                break
+        if after != before:
             raise RuntimeError(
                 f"Memory was not released correctly after roundtrip: "
                 f"before = {before}, after = {after} (should have been equal)")
diff --git a/dev/archery/archery/integration/datagen.py 
b/dev/archery/archery/integration/datagen.py
index f229012366..7635cfd98f 100644
--- a/dev/archery/archery/integration/datagen.py
+++ b/dev/archery/archery/integration/datagen.py
@@ -1722,7 +1722,6 @@ def generate_dictionary_unsigned_case():
 
     # TODO: JavaScript does not support uint64 dictionary indices, so disabled
     # for now
-
     # dict3 = Dictionary(3, StringField('dictionary3'), size=5, name='DICT3')
     fields = [
         DictionaryField('f0', get_field('', 'uint8'), dict0),
diff --git a/dev/archery/archery/integration/runner.py 
b/dev/archery/archery/integration/runner.py
index eb2e26951c..841633f94c 100644
--- a/dev/archery/archery/integration/runner.py
+++ b/dev/archery/archery/integration/runner.py
@@ -421,17 +421,18 @@ class IntegrationRunner(object):
         # Serial execution is required for proper memory accounting
         serial = True
 
-        exporter = producer.make_c_data_exporter()
-        importer = consumer.make_c_data_importer()
-
-        case_runner = partial(self._run_c_schema_test_case, producer, consumer,
-                              exporter, importer)
-        self._run_test_cases(case_runner, self.json_files, serial=serial)
-
-        if producer.C_DATA_ARRAY_EXPORTER and consumer.C_DATA_ARRAY_IMPORTER:
-            case_runner = partial(self._run_c_array_test_cases, producer, 
consumer,
-                                  exporter, importer)
-            self._run_test_cases(case_runner, self.json_files, serial=serial)
+        with producer.make_c_data_exporter() as exporter:
+            with consumer.make_c_data_importer() as importer:
+                case_runner = partial(self._run_c_schema_test_case,
+                                      producer, consumer,
+                                      exporter, importer)
+                self._run_test_cases(case_runner, self.json_files, 
serial=serial)
+
+                if producer.C_DATA_ARRAY_EXPORTER and 
consumer.C_DATA_ARRAY_IMPORTER:
+                    case_runner = partial(self._run_c_array_test_cases,
+                                          producer, consumer,
+                                          exporter, importer)
+                    self._run_test_cases(case_runner, self.json_files, 
serial=serial)
 
     def _run_c_schema_test_case(self,
                                 producer: Tester, consumer: Tester,
diff --git a/dev/archery/archery/integration/tester.py 
b/dev/archery/archery/integration/tester.py
index 6cde20e61b..eadb953a61 100644
--- a/dev/archery/archery/integration/tester.py
+++ b/dev/archery/archery/integration/tester.py
@@ -68,52 +68,52 @@ class CDataExporter(ABC):
         Whether the implementation is able to release memory deterministically.
 
         Here, "release memory" means that, after the `release` callback of
-        a C Data Interface export is called, `compare_allocation_state` is
-        able to trigger the deallocation of the memory underlying the export
-        (for example buffer data).
+        a C Data Interface export is called, `run_gc` is able to trigger
+        the deallocation of the memory underlying the export (such as buffer 
data).
 
-        If false, then `record_allocation_state` and `compare_allocation_state`
-        are allowed to raise NotImplementedError.
+        If false, then `record_allocation_state` is allowed to raise
+        NotImplementedError.
         """
 
     def record_allocation_state(self) -> object:
         """
-        Record the current memory allocation state.
+        Return the current memory allocation state.
 
         Returns
         -------
         state : object
-            Opaque object representing the allocation state,
-            for example the number of allocated bytes.
+            Equality-comparable object representing the allocation state,
+            for example the number of allocated or exported bytes.
         """
         raise NotImplementedError
 
-    def compare_allocation_state(self, recorded: object,
-                                 gc_until: typing.Callable[[_Predicate], bool]
-                                 ) -> bool:
+    def run_gc(self):
         """
-        Compare the current memory allocation state with the recorded one.
+        Run the GC if necessary.
 
-        Parameters
-        ----------
-        recorded : object
-            The previous allocation state returned by
-            `record_allocation_state()`
-        gc_until : callable
-            A callable itself accepting a callable predicate, and
-            returning a boolean.
-            `gc_until` should try to release memory until the predicate
-            becomes true, or until it decides to give up. The final value
-            of the predicate should be returned.
-            `gc_until` is typically provided by the C Data Interface importer.
+        This should ensure that any temporary objects and data created by
+        previous exporter calls are collected.
+        """
 
-        Returns
-        -------
-        success : bool
-            Whether memory allocation state finally reached its previously
-            recorded value.
+    @property
+    def required_gc_runs(self):
         """
-        raise NotImplementedError
+        The maximum number of calls to `run_gc` that need to be issued to
+        ensure proper deallocation. Some implementations may require this
+        to be greater than one.
+        """
+        return 1
+
+    def close(self):
+        """
+        Final cleanup after usage.
+        """
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *exc):
+        self.close()
 
 
 class CDataImporter(ABC):
@@ -163,32 +163,40 @@ class CDataImporter(ABC):
         """
         Whether the implementation is able to release memory deterministically.
 
-        Here, "release memory" means calling the `release` callback of
-        a C Data Interface export (which should then trigger a deallocation
-        mechanism on the exporter).
+        Here, "release memory" means `run_gc()` is able to trigger the
+        `release` callback of a C Data Interface export (which would then
+        induce a deallocation mechanism on the exporter).
+        """
 
-        If false, then `gc_until` is allowed to raise NotImplementedError.
+    def run_gc(self):
         """
+        Run the GC if necessary.
 
-    def gc_until(self, predicate: _Predicate):
+        This should ensure that any imported data has its release callback 
called.
         """
-        Try to release memory until the predicate becomes true, or fail.
 
-        Depending on the CDataImporter implementation, this may for example
-        try once, or run a garbage collector a given number of times, or
-        any other implementation-specific strategy for releasing memory.
+    @property
+    def required_gc_runs(self):
+        """
+        The maximum number of calls to `run_gc` that need to be issued to
+        ensure release callbacks are triggered. Some implementations may
+        require this to be greater than one.
+        """
+        return 1
 
-        The running time should be kept reasonable and compatible with
-        execution of multiple C Data integration tests.
+    def close(self):
+        """
+        Final cleanup after usage.
+        """
 
-        This should not raise if `supports_releasing_memory` is true.
+    def __enter__(self):
+        return self
 
-        Returns
-        -------
-        success : bool
-            The final value of the predicate.
-        """
-        raise NotImplementedError
+    def __exit__(self, *exc):
+        # Make sure any exported data is released.
+        for i in range(self.required_gc_runs):
+            self.run_gc()
+        self.close()
 
 
 class Tester:
diff --git a/dev/archery/archery/integration/tester_cpp.py 
b/dev/archery/archery/integration/tester_cpp.py
index 866fc225d2..658e713301 100644
--- a/dev/archery/archery/integration/tester_cpp.py
+++ b/dev/archery/archery/integration/tester_cpp.py
@@ -223,13 +223,6 @@ class CppCDataExporter(CDataExporter, _CDataBase):
     def record_allocation_state(self):
         return self.dll.ArrowCpp_BytesAllocated()
 
-    def compare_allocation_state(self, recorded, gc_until):
-        def pred():
-            # No GC on our side, so just compare allocation state
-            return self.record_allocation_state() == recorded
-
-        return gc_until(pred)
-
 
 class CppCDataImporter(CDataImporter, _CDataBase):
 
@@ -247,7 +240,3 @@ class CppCDataImporter(CDataImporter, _CDataBase):
     @property
     def supports_releasing_memory(self):
         return True
-
-    def gc_until(self, predicate):
-        # No GC on our side, so can evaluate predicate immediately
-        return predicate()
diff --git a/dev/archery/archery/integration/tester_csharp.py 
b/dev/archery/archery/integration/tester_csharp.py
index 83b07495f9..7dca525673 100644
--- a/dev/archery/archery/integration/tester_csharp.py
+++ b/dev/archery/archery/integration/tester_csharp.py
@@ -16,7 +16,6 @@
 # under the License.
 
 from contextlib import contextmanager
-import gc
 import os
 
 from . import cdata
@@ -82,6 +81,10 @@ class _CDataBase:
         schema = jf.Schema.ToArrow()
         return schema, jf.Batches[num_batch].ToArrow(schema)
 
+    def _run_gc(self):
+        from Apache.Arrow.IntegrationTest import CDataInterface
+        CDataInterface.RunGC()
+
 
 class CSharpCDataExporter(CDataExporter, _CDataBase):
 
@@ -105,6 +108,9 @@ class CSharpCDataExporter(CDataExporter, _CDataBase):
         # XXX the C# GC doesn't give reliable allocation measurements
         return False
 
+    def run_gc(self):
+        self._run_gc()
+
 
 class CSharpCDataImporter(CDataImporter, _CDataBase):
 
@@ -134,15 +140,8 @@ class CSharpCDataImporter(CDataImporter, _CDataBase):
     def supports_releasing_memory(self):
         return True
 
-    def gc_until(self, predicate):
-        from Apache.Arrow.IntegrationTest import CDataInterface
-        for i in range(3):
-            if predicate():
-                return True
-            # Collect any C# objects hanging around through Python
-            gc.collect()
-            CDataInterface.RunGC()
-        return predicate()
+    def run_gc(self):
+        self._run_gc()
 
 
 class CSharpTester(Tester):
diff --git a/dev/archery/archery/integration/tester_go.py 
b/dev/archery/archery/integration/tester_go.py
index b7af233f5d..2b3dc3a1be 100644
--- a/dev/archery/archery/integration/tester_go.py
+++ b/dev/archery/archery/integration/tester_go.py
@@ -200,9 +200,6 @@ class _CDataBase:
             finally:
                 self.dll.ArrowGo_FreeError(go_error)
 
-    def _run_gc(self):
-        self.dll.ArrowGo_RunGC()
-
 
 class GoCDataExporter(CDataExporter, _CDataBase):
     # Note: the Arrow Go C Data export functions expect their output
@@ -225,14 +222,10 @@ class GoCDataExporter(CDataExporter, _CDataBase):
         return True
 
     def record_allocation_state(self):
-        self._run_gc()
         return self.dll.ArrowGo_BytesAllocated()
 
-    def compare_allocation_state(self, recorded, gc_until):
-        def pred():
-            return self.record_allocation_state() == recorded
-
-        return gc_until(pred)
+    # Note: no need to call the Go GC anywhere thanks to Arrow Go's
+    # explicit refcounting.
 
 
 class GoCDataImporter(CDataImporter, _CDataBase):
@@ -252,10 +245,3 @@ class GoCDataImporter(CDataImporter, _CDataBase):
     @property
     def supports_releasing_memory(self):
         return True
-
-    def gc_until(self, predicate):
-        for i in range(10):
-            if predicate():
-                return True
-            self._run_gc()
-        return False
diff --git a/dev/archery/archery/integration/tester_java.py 
b/dev/archery/archery/integration/tester_java.py
index 45855079eb..5684798d79 100644
--- a/dev/archery/archery/integration/tester_java.py
+++ b/dev/archery/archery/integration/tester_java.py
@@ -16,10 +16,12 @@
 # under the License.
 
 import contextlib
+import functools
 import os
 import subprocess
 
-from .tester import Tester
+from . import cdata
+from .tester import Tester, CDataExporter, CDataImporter
 from .util import run_cmd, log
 from ..utils.source import ARROW_ROOT_DEFAULT
 
@@ -32,6 +34,8 @@ def load_version_from_pom():
     return version_tag.text
 
 
+# XXX Should we add "-Darrow.memory.debug.allocator=true"? It adds a couple
+# minutes to total CPU usage of the integration test suite.
 _JAVA_OPTS = [
     "-Dio.netty.tryReflectionSetAccessible=true",
     "-Darrow.struct.conflict.policy=CONFLICT_APPEND",
@@ -42,18 +46,25 @@ _ARROW_TOOLS_JAR = os.environ.get(
     "ARROW_JAVA_INTEGRATION_JAR",
     os.path.join(
         ARROW_ROOT_DEFAULT,
-        "java/tools/target/arrow-tools-{}-"
-        "jar-with-dependencies.jar".format(_arrow_version),
-    ),
+        "java/tools/target",
+        f"arrow-tools-{_arrow_version}-jar-with-dependencies.jar"
+    )
+)
+_ARROW_C_DATA_JAR = os.environ.get(
+    "ARROW_C_DATA_JAVA_INTEGRATION_JAR",
+    os.path.join(
+        ARROW_ROOT_DEFAULT,
+        "java/c/target",
+        f"arrow-c-data-{_arrow_version}.jar"
+    )
 )
 _ARROW_FLIGHT_JAR = os.environ.get(
     "ARROW_FLIGHT_JAVA_INTEGRATION_JAR",
     os.path.join(
         ARROW_ROOT_DEFAULT,
-        "java/flight/flight-integration-tests/target/"
-        "flight-integration-tests-{}-jar-with-dependencies.jar".format(
-            _arrow_version),
-    ),
+        "java/flight/flight-integration-tests/target",
+        f"flight-integration-tests-{_arrow_version}-jar-with-dependencies.jar"
+    )
 )
 _ARROW_FLIGHT_SERVER = (
     "org.apache.arrow.flight.integration.tests.IntegrationTestServer"
@@ -63,11 +74,155 @@ _ARROW_FLIGHT_CLIENT = (
 )
 
 
[email protected]_cache
+def setup_jpype():
+    import jpype
+    jar_path = f"{_ARROW_TOOLS_JAR}:{_ARROW_C_DATA_JAR}"
+    # XXX Didn't manage to tone down the logging level here (DEBUG -> INFO)
+    jpype.startJVM(jpype.getDefaultJVMPath(),
+                   "-Djava.class.path=" + jar_path, *_JAVA_OPTS)
+
+
+class _CDataBase:
+
+    def __init__(self, debug, args):
+        import jpype
+        self.debug = debug
+        self.args = args
+        self.ffi = cdata.ffi()
+        setup_jpype()
+        # JPype pointers to java.io, org.apache.arrow...
+        self.java_io = jpype.JPackage("java").io
+        self.java_arrow = jpype.JPackage("org").apache.arrow
+        self.java_allocator = self._make_java_allocator()
+
+    def _pointer_to_int(self, c_ptr):
+        return int(self.ffi.cast('uintptr_t', c_ptr))
+
+    def _wrap_c_schema_ptr(self, c_schema_ptr):
+        return self.java_arrow.c.ArrowSchema.wrap(
+            self._pointer_to_int(c_schema_ptr))
+
+    def _wrap_c_array_ptr(self, c_array_ptr):
+        return self.java_arrow.c.ArrowArray.wrap(
+            self._pointer_to_int(c_array_ptr))
+
+    def _make_java_allocator(self):
+        # Return a new allocator
+        return self.java_arrow.memory.RootAllocator()
+
+    def _assert_schemas_equal(self, expected, actual):
+        # XXX This is fragile for dictionaries, as Schema.equals compares
+        # dictionary ids.
+        self.java_arrow.vector.util.Validator.compareSchemas(
+            expected, actual)
+
+    def _assert_batches_equal(self, expected, actual):
+        self.java_arrow.vector.util.Validator.compareVectorSchemaRoot(
+            expected, actual)
+
+    def _assert_dict_providers_equal(self, expected, actual):
+        self.java_arrow.vector.util.Validator.compareDictionaryProviders(
+            expected, actual)
+
+    # Note: no need to call the Java GC anywhere thanks to AutoCloseable
+
+
+class JavaCDataExporter(CDataExporter, _CDataBase):
+
+    def export_schema_from_json(self, json_path, c_schema_ptr):
+        json_file = self.java_io.File(json_path)
+        with self.java_arrow.vector.ipc.JsonFileReader(
+                json_file, self.java_allocator) as json_reader:
+            schema = json_reader.start()
+            dict_provider = json_reader
+            self.java_arrow.c.Data.exportSchema(
+                self.java_allocator, schema, dict_provider,
+                self._wrap_c_schema_ptr(c_schema_ptr)
+            )
+
+    def export_batch_from_json(self, json_path, num_batch, c_array_ptr):
+        json_file = self.java_io.File(json_path)
+        with self.java_arrow.vector.ipc.JsonFileReader(
+                json_file, self.java_allocator) as json_reader:
+            json_reader.start()
+            if num_batch > 0:
+                actually_skipped = json_reader.skip(num_batch)
+                assert actually_skipped == num_batch
+            with json_reader.read() as batch:
+                dict_provider = json_reader
+                self.java_arrow.c.Data.exportVectorSchemaRoot(
+                    self.java_allocator, batch, dict_provider,
+                    self._wrap_c_array_ptr(c_array_ptr))
+
+    @property
+    def supports_releasing_memory(self):
+        return True
+
+    def record_allocation_state(self):
+        return self.java_allocator.getAllocatedMemory()
+
+    def close(self):
+        self.java_allocator.close()
+
+
+class JavaCDataImporter(CDataImporter, _CDataBase):
+
+    def import_schema_and_compare_to_json(self, json_path, c_schema_ptr):
+        json_file = self.java_io.File(json_path)
+        with self.java_arrow.vector.ipc.JsonFileReader(
+                json_file, self.java_allocator) as json_reader:
+            json_schema = json_reader.start()
+            with self.java_arrow.c.CDataDictionaryProvider() as dict_provider:
+                imported_schema = self.java_arrow.c.Data.importSchema(
+                    self.java_allocator,
+                    self._wrap_c_schema_ptr(c_schema_ptr),
+                    dict_provider)
+                self._assert_schemas_equal(json_schema, imported_schema)
+
+    def import_batch_and_compare_to_json(self, json_path, num_batch,
+                                         c_array_ptr):
+        json_file = self.java_io.File(json_path)
+        with self.java_arrow.vector.ipc.JsonFileReader(
+                json_file, self.java_allocator) as json_reader:
+            schema = json_reader.start()
+            if num_batch > 0:
+                actually_skipped = json_reader.skip(num_batch)
+                assert actually_skipped == num_batch
+            with json_reader.read() as batch:
+                with self.java_arrow.vector.VectorSchemaRoot.create(
+                        schema, self.java_allocator) as imported_batch:
+                    # We need to pass a dict provider primed with dictionary 
ids
+                    # matching those in the schema, hence an empty
+                    # CDataDictionaryProvider would not work here.
+                    dict_provider = (self.java_arrow.vector.dictionary
+                                     
.DictionaryProvider.MapDictionaryProvider())
+                    dict_provider.copyStructureFrom(json_reader, 
self.java_allocator)
+                    with dict_provider:
+                        self.java_arrow.c.Data.importIntoVectorSchemaRoot(
+                            self.java_allocator,
+                            self._wrap_c_array_ptr(c_array_ptr),
+                            imported_batch, dict_provider)
+                        self._assert_batches_equal(batch, imported_batch)
+                        self._assert_dict_providers_equal(json_reader, 
dict_provider)
+
+    @property
+    def supports_releasing_memory(self):
+        return True
+
+    def close(self):
+        self.java_allocator.close()
+
+
 class JavaTester(Tester):
     PRODUCER = True
     CONSUMER = True
     FLIGHT_SERVER = True
     FLIGHT_CLIENT = True
+    C_DATA_SCHEMA_EXPORTER = True
+    C_DATA_SCHEMA_IMPORTER = True
+    C_DATA_ARRAY_EXPORTER = True
+    C_DATA_ARRAY_IMPORTER = True
 
     name = 'Java'
 
@@ -186,3 +341,9 @@ class JavaTester(Tester):
         finally:
             server.kill()
             server.wait(5)
+
+    def make_c_data_exporter(self):
+        return JavaCDataExporter(self.debug, self.args)
+
+    def make_c_data_importer(self):
+        return JavaCDataImporter(self.debug, self.args)
diff --git a/docker-compose.yml b/docker-compose.yml
index 0e5034346e..e54c609e54 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -1730,16 +1730,21 @@ services:
     volumes: *conda-volumes
     environment:
       <<: [*common, *ccache]
-      # tell archery where the arrow binaries are located
+      ARCHERY_INTEGRATION_WITH_RUST: 0
+      # Tell Archery where the arrow C++ binaries are located
       ARROW_CPP_EXE_PATH: /build/cpp/debug
       ARROW_GO_INTEGRATION: 1
-      ARCHERY_INTEGRATION_WITH_RUST: 0
+      ARROW_JAVA_CDATA: "ON"
+      JAVA_JNI_CMAKE_ARGS: >-
+        -DARROW_JAVA_JNI_ENABLE_DEFAULT=OFF
+        -DARROW_JAVA_JNI_ENABLE_C=ON
     command:
       ["/arrow/ci/scripts/rust_build.sh /arrow /build &&
         /arrow/ci/scripts/cpp_build.sh /arrow /build &&
         /arrow/ci/scripts/csharp_build.sh /arrow /build &&
         /arrow/ci/scripts/go_build.sh /arrow &&
-        /arrow/ci/scripts/java_build.sh /arrow /build &&
+        /arrow/ci/scripts/java_jni_build.sh /arrow $${ARROW_HOME} /build 
/tmp/dist/java/$$(arch) &&
+        /arrow/ci/scripts/java_build.sh /arrow /build /tmp/dist/java &&
         /arrow/ci/scripts/js_build.sh /arrow /build &&
         /arrow/ci/scripts/integration_arrow.sh /arrow /build"]
 
diff --git 
a/java/c/src/main/java/org/apache/arrow/c/BufferImportTypeVisitor.java 
b/java/c/src/main/java/org/apache/arrow/c/BufferImportTypeVisitor.java
index 7408bf7113..cd2a464f4f 100644
--- a/java/c/src/main/java/org/apache/arrow/c/BufferImportTypeVisitor.java
+++ b/java/c/src/main/java/org/apache/arrow/c/BufferImportTypeVisitor.java
@@ -165,9 +165,9 @@ class BufferImportTypeVisitor implements 
ArrowType.ArrowTypeVisitor<List<ArrowBu
         return Collections.singletonList(importFixedBytes(type, 0, 
UnionVector.TYPE_WIDTH));
       case Dense:
         return Arrays.asList(importFixedBytes(type, 0, 
DenseUnionVector.TYPE_WIDTH),
-            importFixedBytes(type, 0, DenseUnionVector.OFFSET_WIDTH));
+            importFixedBytes(type, 1, DenseUnionVector.OFFSET_WIDTH));
       default:
-        throw new UnsupportedOperationException("Importing buffers for type: " 
+ type);
+        throw new UnsupportedOperationException("Importing buffers for union 
type: " + type);
     }
   }
 
diff --git a/java/c/src/main/java/org/apache/arrow/c/Format.java 
b/java/c/src/main/java/org/apache/arrow/c/Format.java
index 315d3caad7..2875e46f74 100644
--- a/java/c/src/main/java/org/apache/arrow/c/Format.java
+++ b/java/c/src/main/java/org/apache/arrow/c/Format.java
@@ -138,6 +138,8 @@ final class Format {
             return "tiD";
           case YEAR_MONTH:
             return "tiM";
+          case MONTH_DAY_NANO:
+            return "tin";
           default:
             throw new UnsupportedOperationException(
                 String.format("Interval type with unit %s is unsupported", 
type.getUnit()));
@@ -277,6 +279,8 @@ final class Format {
         return new ArrowType.Interval(IntervalUnit.YEAR_MONTH);
       case "tiD":
         return new ArrowType.Interval(IntervalUnit.DAY_TIME);
+      case "tin":
+        return new ArrowType.Interval(IntervalUnit.MONTH_DAY_NANO);
       case "+l":
         return new ArrowType.List();
       case "+L":
diff --git a/java/c/src/main/java/org/apache/arrow/c/SchemaImporter.java 
b/java/c/src/main/java/org/apache/arrow/c/SchemaImporter.java
index 21d88f6cd4..09a6afafa0 100644
--- a/java/c/src/main/java/org/apache/arrow/c/SchemaImporter.java
+++ b/java/c/src/main/java/org/apache/arrow/c/SchemaImporter.java
@@ -44,7 +44,7 @@ final class SchemaImporter {
   private static final Logger logger = 
LoggerFactory.getLogger(SchemaImporter.class);
 
   private static final int MAX_IMPORT_RECURSION_LEVEL = 64;
-  private long nextDictionaryID = 1L;
+  private long nextDictionaryID = 0L;
 
   private final BufferAllocator allocator;
 
diff --git a/java/c/src/test/java/org/apache/arrow/c/DictionaryTest.java 
b/java/c/src/test/java/org/apache/arrow/c/DictionaryTest.java
index 3f793f836d..9dcb262af4 100644
--- a/java/c/src/test/java/org/apache/arrow/c/DictionaryTest.java
+++ b/java/c/src/test/java/org/apache/arrow/c/DictionaryTest.java
@@ -100,7 +100,7 @@ public class DictionaryTest {
     dictVector.setSafe(2, "cc".getBytes());
     dictVector.setValueCount(3);
 
-    Dictionary dictionary = new Dictionary(dictVector, new 
DictionaryEncoding(1L, false, /* indexType= */null));
+    Dictionary dictionary = new Dictionary(dictVector, new 
DictionaryEncoding(0L, false, /* indexType= */null));
     provider.put(dictionary);
 
     // create vector and encode it
@@ -169,7 +169,7 @@ public class DictionaryTest {
       dictVector.setSafe(3, "dd".getBytes());
       dictVector.setSafe(4, "ee".getBytes());
       dictVector.setValueCount(5);
-      Dictionary dictionary = new Dictionary(dictVector, new 
DictionaryEncoding(1L, false, /* indexType= */null));
+      Dictionary dictionary = new Dictionary(dictVector, new 
DictionaryEncoding(0L, false, /* indexType= */null));
       provider.put(dictionary);
 
       Schema schema = new Schema(Collections.singletonList(vector.getField()));
diff --git a/java/c/src/test/java/org/apache/arrow/c/StreamTest.java 
b/java/c/src/test/java/org/apache/arrow/c/StreamTest.java
index 06401687a5..68d4fc2a81 100644
--- a/java/c/src/test/java/org/apache/arrow/c/StreamTest.java
+++ b/java/c/src/test/java/org/apache/arrow/c/StreamTest.java
@@ -135,7 +135,7 @@ final class StreamTest {
   @Test
   public void roundtripDictionary() throws Exception {
     final ArrowType.Int indexType = new ArrowType.Int(32, true);
-    final DictionaryEncoding encoding = new DictionaryEncoding(1L, false, 
indexType);
+    final DictionaryEncoding encoding = new DictionaryEncoding(0L, false, 
indexType);
     final Schema schema = new Schema(Collections.singletonList(
         new Field("dict", new FieldType(/*nullable=*/true, indexType, 
encoding), Collections.emptyList())));
     final List<ArrowRecordBatch> batches = new ArrayList<>();
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/NullVector.java 
b/java/vector/src/main/java/org/apache/arrow/vector/NullVector.java
index 6e4c2764bd..1badf4b4ca 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/NullVector.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/NullVector.java
@@ -192,6 +192,7 @@ public class NullVector implements FieldVector {
   @Override
   public void loadFieldBuffers(ArrowFieldNode fieldNode, List<ArrowBuf> 
ownBuffers) {
     Preconditions.checkArgument(ownBuffers.isEmpty(), "Null vector has no 
buffers");
+    valueCount = fieldNode.getLength();
   }
 
   @Override
diff --git 
a/java/vector/src/main/java/org/apache/arrow/vector/compare/RangeEqualsVisitor.java
 
b/java/vector/src/main/java/org/apache/arrow/vector/compare/RangeEqualsVisitor.java
index 698ddac466..5323ddda83 100644
--- 
a/java/vector/src/main/java/org/apache/arrow/vector/compare/RangeEqualsVisitor.java
+++ 
b/java/vector/src/main/java/org/apache/arrow/vector/compare/RangeEqualsVisitor.java
@@ -121,9 +121,11 @@ public class RangeEqualsVisitor implements 
VectorVisitor<Boolean, Range> {
         "rightStart %s must be non negative.", range.getRightStart());
 
     Preconditions.checkArgument(range.getRightStart() + range.getLength() <= 
right.getValueCount(),
-        "(rightStart + length) %s out of range[0, %s].", 0, 
right.getValueCount());
+        "(rightStart + length) %s out of range[0, %s].",
+        range.getRightStart() + range.getLength(), right.getValueCount());
     Preconditions.checkArgument(range.getLeftStart() + range.getLength() <= 
left.getValueCount(),
-        "(leftStart + length) %s out of range[0, %s].", 0, 
left.getValueCount());
+        "(leftStart + length) %s out of range[0, %s].",
+        range.getLeftStart() + range.getLength(), left.getValueCount());
 
     return left.accept(this, range);
   }
diff --git 
a/java/vector/src/main/java/org/apache/arrow/vector/dictionary/DictionaryProvider.java
 
b/java/vector/src/main/java/org/apache/arrow/vector/dictionary/DictionaryProvider.java
index 76e1eb9f66..f64c32be0f 100644
--- 
a/java/vector/src/main/java/org/apache/arrow/vector/dictionary/DictionaryProvider.java
+++ 
b/java/vector/src/main/java/org/apache/arrow/vector/dictionary/DictionaryProvider.java
@@ -21,6 +21,9 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.util.VisibleForTesting;
+
 /**
  * A manager for association of dictionary IDs to their corresponding {@link 
Dictionary}.
  */
@@ -35,7 +38,7 @@ public interface DictionaryProvider {
   /**
    * Implementation of {@link DictionaryProvider} that is backed by a hash-map.
    */
-  class MapDictionaryProvider implements DictionaryProvider {
+  class MapDictionaryProvider implements AutoCloseable, DictionaryProvider {
 
     private final Map<Long, Dictionary> map;
 
@@ -49,6 +52,23 @@ public interface DictionaryProvider {
       }
     }
 
+    /**
+     * Initialize the map structure from another provider, but with empty 
vectors.
+     *
+     * @param other the {@link DictionaryProvider} to copy the ids and fields 
from
+     * @param allocator allocator to create the empty vectors
+     */
+    // This is currently called using JPype by the integration tests.
+    @VisibleForTesting
+    public void copyStructureFrom(DictionaryProvider other, BufferAllocator 
allocator) {
+      for (Long id : other.getDictionaryIds()) {
+        Dictionary otherDict = other.lookup(id);
+        Dictionary newDict = new 
Dictionary(otherDict.getVector().getField().createVector(allocator),
+                otherDict.getEncoding());
+        put(newDict);
+      }
+    }
+
     public void put(Dictionary dictionary) {
       map.put(dictionary.getEncoding().getId(), dictionary);
     }
@@ -62,5 +82,12 @@ public interface DictionaryProvider {
     public Dictionary lookup(long id) {
       return map.get(id);
     }
+
+    @Override
+    public void close() {
+      for (Dictionary dictionary : map.values()) {
+        dictionary.getVector().close();
+      }
+    }
   }
 }
diff --git 
a/java/vector/src/main/java/org/apache/arrow/vector/ipc/JsonFileReader.java 
b/java/vector/src/main/java/org/apache/arrow/vector/ipc/JsonFileReader.java
index 742daeef25..0c23a664f6 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/JsonFileReader.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/JsonFileReader.java
@@ -237,6 +237,28 @@ public class JsonFileReader implements AutoCloseable, 
DictionaryProvider {
     }
   }
 
+  /**
+   * Skips a number of record batches in the file.
+   *
+   * @param numBatches the number of batches to skip
+   * @return the actual number of skipped batches.
+   */
+  // This is currently called using JPype by the integration tests.
+  public int skip(int numBatches) throws IOException {
+    for (int i = 0; i < numBatches; ++i) {
+      JsonToken t = parser.nextToken();
+      if (t == START_OBJECT) {
+        parser.skipChildren();
+        assert parser.getCurrentToken() == END_OBJECT;
+      } else if (t == END_ARRAY) {
+        return i;
+      } else {
+        throw new IllegalArgumentException("Invalid token: " + t);
+      }
+    }
+    return numBatches;
+  }
+
   private abstract class BufferReader {
     protected abstract ArrowBuf read(BufferAllocator allocator, int count) 
throws IOException;
 
@@ -692,7 +714,8 @@ public class JsonFileReader implements AutoCloseable, 
DictionaryProvider {
   }
 
   private void readFromJsonIntoVector(Field field, FieldVector vector) throws 
JsonParseException, IOException {
-    TypeLayout typeLayout = TypeLayout.getTypeLayout(field.getType());
+    ArrowType type = field.getType();
+    TypeLayout typeLayout = TypeLayout.getTypeLayout(type);
     List<BufferType> vectorTypes = typeLayout.getBufferTypes();
     ArrowBuf[] vectorBuffers = new ArrowBuf[vectorTypes.size()];
     /*
@@ -728,21 +751,18 @@ public class JsonFileReader implements AutoCloseable, 
DictionaryProvider {
         BufferType bufferType = vectorTypes.get(v);
         nextFieldIs(bufferType.getName());
         int innerBufferValueCount = valueCount;
-        if (bufferType.equals(OFFSET) && 
!field.getType().getTypeID().equals(ArrowType.ArrowTypeID.Union)) {
-          /* offset buffer has 1 additional value capacity */
+        if (bufferType.equals(OFFSET) && !(type instanceof ArrowType.Union)) {
+          /* offset buffer has 1 additional value capacity except for dense 
unions */
           innerBufferValueCount = valueCount + 1;
         }
 
         vectorBuffers[v] = readIntoBuffer(allocator, bufferType, 
vector.getMinorType(), innerBufferValueCount);
       }
 
-      if (vectorBuffers.length == 0) {
-        readToken(END_OBJECT);
-        return;
-      }
-
       int nullCount = 0;
-      if (!(vector.getField().getFieldType().getType() instanceof 
ArrowType.Union)) {
+      if (type instanceof ArrowType.Null) {
+        nullCount = valueCount;
+      } else if (!(type instanceof ArrowType.Union)) {
         nullCount = BitVectorHelper.getNullCount(vectorBuffers[0], valueCount);
       }
       final ArrowFieldNode fieldNode = new ArrowFieldNode(valueCount, 
nullCount);
diff --git 
a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java
 
b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java
index 83a8ece0bf..f81d049a92 100644
--- 
a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java
+++ 
b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java
@@ -112,8 +112,8 @@ public class ArrowRecordBatch implements ArrowMessage {
       }
       long size = arrowBuf.readableBytes();
       arrowBuffers.add(new ArrowBuffer(offset, size));
-      if (LOGGER.isDebugEnabled()) {
-        LOGGER.debug("Buffer in RecordBatch at {}, length: {}", offset, size);
+      if (LOGGER.isTraceEnabled()) {
+        LOGGER.trace("Buffer in RecordBatch at {}, length: {}", offset, size);
       }
       offset += size;
       if (alignBuffers) { // align on 8 byte boundaries
diff --git 
a/java/vector/src/main/java/org/apache/arrow/vector/util/Validator.java 
b/java/vector/src/main/java/org/apache/arrow/vector/util/Validator.java
index 741972b4ad..0c9ad1e275 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/util/Validator.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/util/Validator.java
@@ -17,6 +17,7 @@
 
 package org.apache.arrow.vector.util;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Objects;
@@ -85,6 +86,31 @@ public class Validator {
     }
   }
 
+  /**
+   * Validate two dictionary providers are equal in structure and contents.
+   */
+  public static void compareDictionaryProviders(
+          DictionaryProvider provider1,
+          DictionaryProvider provider2) {
+    List<Long> ids1 = new ArrayList(provider1.getDictionaryIds());
+    List<Long> ids2 = new ArrayList(provider2.getDictionaryIds());
+    java.util.Collections.sort(ids1);
+    java.util.Collections.sort(ids2);
+    if (!ids1.equals(ids2)) {
+      throw new IllegalArgumentException("Different ids in dictionary 
providers:\n" +
+              ids1 + "\n" + ids2);
+    }
+    for (long id : ids1) {
+      Dictionary dict1 = provider1.lookup(id);
+      Dictionary dict2 = provider2.lookup(id);
+      try {
+        compareFieldVectors(dict1.getVector(), dict2.getVector());
+      } catch (IllegalArgumentException e) {
+        throw new IllegalArgumentException("Different dictionaries:\n" + dict1 
+ "\n" + dict2, e);
+      }
+    }
+  }
+
   /**
    * Validate two arrow vectorSchemaRoot are equal.
    *


Reply via email to