This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 3b646ad4c2 GH-37537: [Integration][C++] Add C Data Interface integration testing (#37769)
3b646ad4c2 is described below

commit 3b646ad4c2b826fe08b31d19e6435f73650bcb5e
Author: Antoine Pitrou <[email protected]>
AuthorDate: Tue Sep 19 16:41:31 2023 +0200

    GH-37537: [Integration][C++] Add C Data Interface integration testing (#37769)
    
    ### Rationale for this change
    
    Currently there are no systematic integration tests between implementations of the C Data Interface, only a couple ad-hoc tests.
    
    ### What changes are included in this PR?
    
    1. Add Archery infrastructure for integration testing of the C Data Interface
    2. Add implementation of this interface for Arrow C++
    
    ### Are these changes tested?
    
    Yes, by construction.
    
    ### Are there any user-facing changes?
    
    No.
    * Closes: #37537
    
    Lead-authored-by: Antoine Pitrou <[email protected]>
    Co-authored-by: Antoine Pitrou <[email protected]>
    Co-authored-by: Will Jones <[email protected]>
    Signed-off-by: Antoine Pitrou <[email protected]>
---
 ci/scripts/integration_arrow.sh                    |   6 +-
 cpp/src/arrow/CMakeLists.txt                       |   6 +-
 .../integration/c_data_integration_internal.cc     | 145 +++++++++++
 .../integration/c_data_integration_internal.h      |  48 ++++
 cpp/src/arrow/integration/json_integration.cc      |   7 +-
 cpp/src/arrow/symbols.map                          |   1 +
 dev/archery/archery/cli.py                         |  21 +-
 dev/archery/archery/integration/cdata.py           | 107 +++++++++
 dev/archery/archery/integration/datagen.py         | 110 ++++++---
 dev/archery/archery/integration/runner.py          | 267 ++++++++++++++++-----
 dev/archery/archery/integration/scenario.py        |   7 +-
 dev/archery/archery/integration/tester.py          | 183 +++++++++++++-
 dev/archery/archery/integration/tester_cpp.py      | 114 ++++++++-
 dev/archery/archery/integration/util.py            |   4 +-
 dev/archery/setup.py                               |  11 +-
 15 files changed, 913 insertions(+), 124 deletions(-)

diff --git a/ci/scripts/integration_arrow.sh b/ci/scripts/integration_arrow.sh
index 30cbb2d637..a165f8027b 100755
--- a/ci/scripts/integration_arrow.sh
+++ b/ci/scripts/integration_arrow.sh
@@ -22,10 +22,12 @@ set -ex
 arrow_dir=${1}
 gold_dir=$arrow_dir/testing/data/arrow-ipc-stream/integration
 
-pip install -e $arrow_dir/dev/archery
+pip install -e $arrow_dir/dev/archery[integration]
 
 # Rust can be enabled by exporting ARCHERY_INTEGRATION_WITH_RUST=1
-archery integration \
+time archery integration \
+    --run-c-data \
+    --run-ipc \
     --run-flight \
     --with-cpp=1 \
     --with-csharp=1 \
diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt
index f474d0c517..9a61170115 100644
--- a/cpp/src/arrow/CMakeLists.txt
+++ b/cpp/src/arrow/CMakeLists.txt
@@ -383,7 +383,11 @@ endif()
 #
 
 if(ARROW_BUILD_INTEGRATION OR ARROW_BUILD_TESTS)
-  list(APPEND ARROW_SRCS integration/json_integration.cc integration/json_internal.cc)
+  list(APPEND
+       ARROW_SRCS
+       integration/c_data_integration_internal.cc
+       integration/json_integration.cc
+       integration/json_internal.cc)
 endif()
 
 if(ARROW_CSV)
diff --git a/cpp/src/arrow/integration/c_data_integration_internal.cc 
b/cpp/src/arrow/integration/c_data_integration_internal.cc
new file mode 100644
index 0000000000..79e09eaf91
--- /dev/null
+++ b/cpp/src/arrow/integration/c_data_integration_internal.cc
@@ -0,0 +1,145 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/integration/c_data_integration_internal.h"
+
+#include <sstream>
+#include <utility>
+
+#include "arrow/c/bridge.h"
+#include "arrow/integration/json_integration.h"
+#include "arrow/io/file.h"
+#include "arrow/memory_pool.h"
+#include "arrow/pretty_print.h"
+#include "arrow/record_batch.h"
+#include "arrow/result.h"
+#include "arrow/status.h"
+#include "arrow/type.h"
+#include "arrow/type_fwd.h"
+#include "arrow/util/logging.h"
+
+namespace arrow::internal::integration {
+namespace {
+
+template <typename Func>
+const char* StatusToErrorString(Func&& func) {
+  static std::string error;
+
+  Status st = func();
+  if (st.ok()) {
+    return nullptr;
+  }
+  error = st.ToString();
+  ARROW_CHECK_GT(error.length(), 0);
+  return error.c_str();
+}
+
+Result<std::shared_ptr<Schema>> ReadSchemaFromJson(const std::string& 
json_path,
+                                                   MemoryPool* pool) {
+  ARROW_ASSIGN_OR_RAISE(auto file, io::ReadableFile::Open(json_path, pool));
+  ARROW_ASSIGN_OR_RAISE(auto reader, IntegrationJsonReader::Open(pool, file));
+  return reader->schema();
+}
+
+Result<std::shared_ptr<RecordBatch>> ReadBatchFromJson(const std::string& 
json_path,
+                                                       int num_batch, 
MemoryPool* pool) {
+  ARROW_ASSIGN_OR_RAISE(auto file, io::ReadableFile::Open(json_path, pool));
+  ARROW_ASSIGN_OR_RAISE(auto reader, IntegrationJsonReader::Open(pool, file));
+  return reader->ReadRecordBatch(num_batch);
+}
+
+// XXX ideally, we should allow use of a custom memory pool in the C bridge API,
+// but that requires non-trivial refactor
+
+Status ExportSchemaFromJson(std::string json_path, ArrowSchema* out) {
+  auto pool = default_memory_pool();
+  ARROW_ASSIGN_OR_RAISE(auto schema, ReadSchemaFromJson(json_path, pool));
+  return ExportSchema(*schema, out);
+}
+
+Status ImportSchemaAndCompareToJson(std::string json_path, ArrowSchema* 
c_schema) {
+  auto pool = default_memory_pool();
+  ARROW_ASSIGN_OR_RAISE(auto json_schema, ReadSchemaFromJson(json_path, pool));
+  ARROW_ASSIGN_OR_RAISE(auto imported_schema, ImportSchema(c_schema));
+  if (!imported_schema->Equals(json_schema, /*check_metadata=*/true)) {
+    return Status::Invalid("Schemas are different:", "\n- Json Schema: ", 
*json_schema,
+                           "\n- Imported Schema: ", *imported_schema);
+  }
+  return Status::OK();
+}
+
+Status ExportBatchFromJson(std::string json_path, int num_batch, ArrowArray* 
out) {
+  auto pool = default_memory_pool();
+  ARROW_ASSIGN_OR_RAISE(auto batch, ReadBatchFromJson(json_path, num_batch, 
pool));
+  return ExportRecordBatch(*batch, out);
+}
+
+Status ImportBatchAndCompareToJson(std::string json_path, int num_batch,
+                                   ArrowArray* c_batch) {
+  auto pool = default_memory_pool();
+  ARROW_ASSIGN_OR_RAISE(auto batch, ReadBatchFromJson(json_path, num_batch, 
pool));
+  ARROW_ASSIGN_OR_RAISE(auto imported_batch, ImportRecordBatch(c_batch, 
batch->schema()));
+  RETURN_NOT_OK(imported_batch->ValidateFull());
+  if (!imported_batch->Equals(*batch, /*check_metadata=*/true)) {
+    std::stringstream pp_expected;
+    std::stringstream pp_actual;
+    PrettyPrintOptions options(/*indent=*/2);
+    options.window = 50;
+    ARROW_CHECK_OK(PrettyPrint(*batch, options, &pp_expected));
+    ARROW_CHECK_OK(PrettyPrint(*imported_batch, options, &pp_actual));
+    return Status::Invalid("Record Batches are different:", "\n- Json Batch: ",
+                           pp_expected.str(), "\n- Imported Batch: ", 
pp_actual.str());
+  }
+  return Status::OK();
+}
+
+}  // namespace
+}  // namespace arrow::internal::integration
+
+const char* ArrowCpp_CDataIntegration_ExportSchemaFromJson(const char* 
json_path,
+                                                           ArrowSchema* out) {
+  using namespace arrow::internal::integration;  // NOLINT(build/namespaces)
+  return StatusToErrorString([=]() { return ExportSchemaFromJson(json_path, 
out); });
+}
+
+const char* ArrowCpp_CDataIntegration_ImportSchemaAndCompareToJson(const char* 
json_path,
+                                                                   
ArrowSchema* schema) {
+  using namespace arrow::internal::integration;  // NOLINT(build/namespaces)
+  return StatusToErrorString(
+      [=]() { return ImportSchemaAndCompareToJson(json_path, schema); });
+}
+
+const char* ArrowCpp_CDataIntegration_ExportBatchFromJson(const char* 
json_path,
+                                                          int num_batch,
+                                                          ArrowArray* out) {
+  using namespace arrow::internal::integration;  // NOLINT(build/namespaces)
+  return StatusToErrorString(
+      [=]() { return ExportBatchFromJson(json_path, num_batch, out); });
+}
+
+const char* ArrowCpp_CDataIntegration_ImportBatchAndCompareToJson(const char* 
json_path,
+                                                                  int 
num_batch,
+                                                                  ArrowArray* 
batch) {
+  using namespace arrow::internal::integration;  // NOLINT(build/namespaces)
+  return StatusToErrorString(
+      [=]() { return ImportBatchAndCompareToJson(json_path, num_batch, batch); 
});
+}
+
+int64_t ArrowCpp_BytesAllocated() {
+  auto pool = arrow::default_memory_pool();
+  return pool->bytes_allocated();
+}
diff --git a/cpp/src/arrow/integration/c_data_integration_internal.h 
b/cpp/src/arrow/integration/c_data_integration_internal.h
new file mode 100644
index 0000000000..0a62363dff
--- /dev/null
+++ b/cpp/src/arrow/integration/c_data_integration_internal.h
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/c/abi.h"
+#include "arrow/util/visibility.h"
+
+// This file only serves as documentation for the C Data Interface integration
+// entrypoints. The actual functions are called by Archery through DLL symbol lookup.
+
+extern "C" {
+
+ARROW_EXPORT
+const char* ArrowCpp_CDataIntegration_ExportSchemaFromJson(const char* 
json_path,
+                                                           ArrowSchema* out);
+
+ARROW_EXPORT
+const char* ArrowCpp_CDataIntegration_ImportSchemaAndCompareToJson(const char* 
json_path,
+                                                                   
ArrowSchema* schema);
+
+ARROW_EXPORT
+const char* ArrowCpp_CDataIntegration_ExportBatchFromJson(const char* 
json_path,
+                                                          int num_batch, 
ArrowArray* out);
+
+ARROW_EXPORT
+const char* ArrowCpp_CDataIntegration_ImportBatchAndCompareToJson(const char* 
json_path,
+                                                                  int 
num_batch,
+                                                                  ArrowArray* 
batch);
+
+ARROW_EXPORT
+int64_t ArrowCpp_BytesAllocated();
+
+}  // extern "C"
diff --git a/cpp/src/arrow/integration/json_integration.cc 
b/cpp/src/arrow/integration/json_integration.cc
index 178abe5e8b..590f6eddd7 100644
--- a/cpp/src/arrow/integration/json_integration.cc
+++ b/cpp/src/arrow/integration/json_integration.cc
@@ -144,10 +144,9 @@ class IntegrationJsonReader::Impl {
   }
 
   Result<std::shared_ptr<RecordBatch>> ReadRecordBatch(int i) {
-    DCHECK_GE(i, 0) << "i out of bounds";
-    DCHECK_LT(i, static_cast<int>(record_batches_->GetArray().Size()))
-        << "i out of bounds";
-
+    if (i < 0 || i >= static_cast<int>(record_batches_->GetArray().Size())) {
+      return Status::IndexError("record batch index ", i, " out of bounds");
+    }
     return json::ReadRecordBatch(record_batches_->GetArray()[i], schema_,
                                  &dictionary_memo_, pool_);
   }
diff --git a/cpp/src/arrow/symbols.map b/cpp/src/arrow/symbols.map
index 9ef0e404bc..0144e61165 100644
--- a/cpp/src/arrow/symbols.map
+++ b/cpp/src/arrow/symbols.map
@@ -32,6 +32,7 @@
     };
     # Also export C-level helpers
     arrow_*;
+    Arrow*;
     # ARROW-14771: export Protobuf symbol table
     descriptor_table_Flight_2eproto;
     descriptor_table_FlightSql_2eproto;
diff --git a/dev/archery/archery/cli.py b/dev/archery/archery/cli.py
index 70f865cc2f..7a3b45f978 100644
--- a/dev/archery/archery/cli.py
+++ b/dev/archery/archery/cli.py
@@ -723,8 +723,12 @@ def _set_default(opt, default):
               envvar="ARCHERY_INTEGRATION_WITH_RUST")
 @click.option('--write_generated_json', default="",
               help='Generate test JSON to indicated path')
+@click.option('--run-ipc', is_flag=True, default=False,
+              help='Run IPC integration tests')
 @click.option('--run-flight', is_flag=True, default=False,
               help='Run Flight integration tests')
+@click.option('--run-c-data', is_flag=True, default=False,
+              help='Run C Data Interface integration tests')
 @click.option('--debug', is_flag=True, default=False,
               help='Run executables in debug mode as relevant')
 @click.option('--serial', is_flag=True, default=False,
@@ -753,15 +757,19 @@ def integration(with_all=False, random_seed=12345, 
**args):
     gen_path = args['write_generated_json']
 
     languages = ['cpp', 'csharp', 'java', 'js', 'go', 'rust']
+    formats = ['ipc', 'flight', 'c_data']
 
     enabled_languages = 0
     for lang in languages:
-        param = 'with_{}'.format(lang)
+        param = f'with_{lang}'
         if with_all:
             args[param] = with_all
+        enabled_languages += args[param]
 
-        if args[param]:
-            enabled_languages += 1
+    enabled_formats = 0
+    for fmt in formats:
+        param = f'run_{fmt}'
+        enabled_formats += args[param]
 
     if gen_path:
         # XXX See GH-37575: this option is only used by the JS test suite
@@ -769,8 +777,13 @@ def integration(with_all=False, random_seed=12345, **args):
         os.makedirs(gen_path, exist_ok=True)
         write_js_test_json(gen_path)
     else:
+        if enabled_formats == 0:
+            raise click.UsageError(
+                "Need to enable at least one format to test "
+                "(IPC, Flight, C Data Interface); try --help")
         if enabled_languages == 0:
-            raise Exception("Must enable at least 1 language to test")
+            raise click.UsageError(
+                "Need to enable at least one language to test; try --help")
         run_all_tests(**args)
 
 
diff --git a/dev/archery/archery/integration/cdata.py 
b/dev/archery/archery/integration/cdata.py
new file mode 100644
index 0000000000..c201f5f867
--- /dev/null
+++ b/dev/archery/archery/integration/cdata.py
@@ -0,0 +1,107 @@
+# licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import cffi
+from contextlib import contextmanager
+import functools
+
+from .tester import CDataExporter, CDataImporter
+
+
+_c_data_decls = """
+    struct ArrowSchema {
+      // Array type description
+      const char* format;
+      const char* name;
+      const char* metadata;
+      int64_t flags;
+      int64_t n_children;
+      struct ArrowSchema** children;
+      struct ArrowSchema* dictionary;
+
+      // Release callback
+      void (*release)(struct ArrowSchema*);
+      // Opaque producer-specific data
+      void* private_data;
+    };
+
+    struct ArrowArray {
+      // Array data description
+      int64_t length;
+      int64_t null_count;
+      int64_t offset;
+      int64_t n_buffers;
+      int64_t n_children;
+      const void** buffers;
+      struct ArrowArray** children;
+      struct ArrowArray* dictionary;
+
+      // Release callback
+      void (*release)(struct ArrowArray*);
+      // Opaque producer-specific data
+      void* private_data;
+    };
+
+    struct ArrowArrayStream {
+      int (*get_schema)(struct ArrowArrayStream*, struct ArrowSchema* out);
+      int (*get_next)(struct ArrowArrayStream*, struct ArrowArray* out);
+
+      const char* (*get_last_error)(struct ArrowArrayStream*);
+
+      // Release callback
+      void (*release)(struct ArrowArrayStream*);
+      // Opaque producer-specific data
+      void* private_data;
+    };
+    """
+
+
+@functools.lru_cache
+def ffi() -> cffi.FFI:
+    """
+    Return a FFI object supporting C Data Interface types.
+    """
+    ffi = cffi.FFI()
+    ffi.cdef(_c_data_decls)
+    return ffi
+
+
+@contextmanager
+def check_memory_released(exporter: CDataExporter, importer: CDataImporter):
+    """
+    A context manager for memory release checks.
+
+    The context manager arranges cooperation between the exporter and importer
+    to try and release memory at the end of the enclosed block.
+
+    However, if either the exporter or importer doesn't support deterministic
+    memory release, no memory check is performed.
+    """
+    do_check = (exporter.supports_releasing_memory and
+                importer.supports_releasing_memory)
+    if do_check:
+        before = exporter.record_allocation_state()
+    yield
+    # We don't use a `finally` clause: if the enclosed block raised an
+    # exception, no need to add another one.
+    if do_check:
+        ok = exporter.compare_allocation_state(before, importer.gc_until)
+        if not ok:
+            after = exporter.record_allocation_state()
+            raise RuntimeError(
+                f"Memory was not released correctly after roundtrip: "
+                f"before = {before}, after = {after} (should have been equal)")
diff --git a/dev/archery/archery/integration/datagen.py 
b/dev/archery/archery/integration/datagen.py
index f924c8a73c..53f7ba58bf 100644
--- a/dev/archery/archery/integration/datagen.py
+++ b/dev/archery/archery/integration/datagen.py
@@ -25,6 +25,7 @@ import tempfile
 import numpy as np
 
 from .util import frombytes, tobytes, random_bytes, random_utf8
+from .util import SKIP_C_SCHEMA, SKIP_C_ARRAY
 
 
 def metadata_key_values(pairs):
@@ -1224,15 +1225,16 @@ class RecordBatch(object):
 class File(object):
 
     def __init__(self, name, schema, batches, dictionaries=None,
-                 skip=None, path=None, quirks=None):
+                 skip_testers=None, path=None, quirks=None):
         self.name = name
         self.schema = schema
         self.dictionaries = dictionaries or []
         self.batches = batches
-        self.skip = set()
+        self.skipped_testers = set()
+        self.skipped_formats = {}
         self.path = path
-        if skip:
-            self.skip.update(skip)
+        if skip_testers:
+            self.skipped_testers.update(skip_testers)
         # For tracking flags like whether to validate decimal values
         # fit into the given precision (ARROW-13558).
         self.quirks = set()
@@ -1258,14 +1260,39 @@ class File(object):
             f.write(json.dumps(self.get_json(), indent=2).encode('utf-8'))
         self.path = path
 
-    def skip_category(self, category):
-        """Skip this test for the given category.
+    def skip_tester(self, tester):
+        """Skip this test for the given tester (such as 'C#').
+        """
+        self.skipped_testers.add(tester)
+        return self
 
-        Category should be SKIP_ARROW or SKIP_FLIGHT.
+    def skip_format(self, format, tester='all'):
+        """Skip this test for the given format, and optionally tester.
         """
-        self.skip.add(category)
+        self.skipped_formats.setdefault(format, set()).add(tester)
         return self
 
+    def add_skips_from(self, other_file):
+        """Add skips from another File object.
+        """
+        self.skipped_testers.update(other_file.skipped_testers)
+        for format, testers in other_file.skipped_formats.items():
+            self.skipped_formats.setdefault(format, set()).update(testers)
+
+    def should_skip(self, tester, format):
+        """Whether this (tester, format) combination should be skipped.
+        """
+        if tester in self.skipped_testers:
+            return True
+        testers = self.skipped_formats.get(format, ())
+        return 'all' in testers or tester in testers
+
+    @property
+    def num_batches(self):
+        """The number of record batches in this file.
+        """
+        return len(self.batches)
+
 
 def get_field(name, type_, **kwargs):
     if type_ == 'binary':
@@ -1295,8 +1322,8 @@ def get_field(name, type_, **kwargs):
         raise TypeError(dtype)
 
 
-def _generate_file(name, fields, batch_sizes, dictionaries=None, skip=None,
-                   metadata=None):
+def _generate_file(name, fields, batch_sizes, *,
+                   dictionaries=None, metadata=None):
     schema = Schema(fields, metadata=metadata)
     batches = []
     for size in batch_sizes:
@@ -1307,7 +1334,7 @@ def _generate_file(name, fields, batch_sizes, 
dictionaries=None, skip=None,
 
         batches.append(RecordBatch(size, columns))
 
-    return File(name, schema, batches, dictionaries, skip=skip)
+    return File(name, schema, batches, dictionaries)
 
 
 def generate_custom_metadata_case():
@@ -1666,8 +1693,8 @@ def get_generated_json_files(tempdir=None):
         generate_primitive_case([0, 0, 0], name='primitive_zerolength'),
 
         generate_primitive_large_offsets_case([17, 20])
-        .skip_category('C#')
-        .skip_category('JS'),
+        .skip_tester('C#')
+        .skip_tester('JS'),
 
         generate_null_case([10, 0]),
 
@@ -1676,66 +1703,71 @@ def get_generated_json_files(tempdir=None):
         generate_decimal128_case(),
 
         generate_decimal256_case()
-        .skip_category('JS'),
+        .skip_tester('JS'),
 
         generate_datetime_case(),
 
         generate_duration_case()
-        .skip_category('C#')
-        .skip_category('JS'),  # TODO(ARROW-5239): Intervals + JS
+        .skip_tester('C#')
+        .skip_tester('JS'),  # TODO(ARROW-5239): Intervals + JS
 
         generate_interval_case()
-        .skip_category('C#')
-        .skip_category('JS'),  # TODO(ARROW-5239): Intervals + JS
+        .skip_tester('C#')
+        .skip_tester('JS'),  # TODO(ARROW-5239): Intervals + JS
 
         generate_month_day_nano_interval_case()
-        .skip_category('C#')
-        .skip_category('JS'),
+        .skip_tester('C#')
+        .skip_tester('JS'),
 
         generate_map_case()
-        .skip_category('C#'),
+        .skip_tester('C#'),
 
         generate_non_canonical_map_case()
-        .skip_category('C#')
-        .skip_category('Java'),   # TODO(ARROW-8715)
+        .skip_tester('C#')
+        .skip_tester('Java')  # TODO(ARROW-8715)
+        # Canonical map names are restored on import, so the schemas are unequal
+        .skip_format(SKIP_C_SCHEMA, 'C++'),
 
         generate_nested_case(),
 
         generate_recursive_nested_case(),
 
         generate_nested_large_offsets_case()
-        .skip_category('C#')
-        .skip_category('JS'),
+        .skip_tester('C#')
+        .skip_tester('JS'),
 
         generate_unions_case()
-        .skip_category('C#'),
+        .skip_tester('C#'),
 
         generate_custom_metadata_case()
-        .skip_category('C#'),
+        .skip_tester('C#'),
 
         generate_duplicate_fieldnames_case()
-        .skip_category('C#')
-        .skip_category('JS'),
+        .skip_tester('C#')
+        .skip_tester('JS'),
 
         generate_dictionary_case()
-        .skip_category('C#'),
+        .skip_tester('C#'),
 
         generate_dictionary_unsigned_case()
-        .skip_category('C#')
-        .skip_category('Java'),  # TODO(ARROW-9377)
+        .skip_tester('C#')
+        .skip_tester('Java'),  # TODO(ARROW-9377)
 
         generate_nested_dictionary_case()
-        .skip_category('C#')
-        .skip_category('Java'),  # TODO(ARROW-7779)
+        .skip_tester('C#')
+        .skip_tester('Java'),  # TODO(ARROW-7779)
 
         generate_run_end_encoded_case()
-        .skip_category('C#')
-        .skip_category('Java')
-        .skip_category('JS')
-        .skip_category('Rust'),
+        .skip_tester('C#')
+        .skip_tester('Java')
+        .skip_tester('JS')
+        .skip_tester('Rust'),
 
         generate_extension_case()
-        .skip_category('C#'),
+        .skip_tester('C#')
+        # TODO: ensure the extension is registered in the C++ entrypoint
+        .skip_format(SKIP_C_SCHEMA, 'C++')
+        .skip_format(SKIP_C_ARRAY, 'C++'),
     ]
 
     generated_paths = []
diff --git a/dev/archery/archery/integration/runner.py 
b/dev/archery/archery/integration/runner.py
index 0ee9ab814e..2fd1d2d7f0 100644
--- a/dev/archery/archery/integration/runner.py
+++ b/dev/archery/archery/integration/runner.py
@@ -25,17 +25,19 @@ import os
 import sys
 import tempfile
 import traceback
-from typing import Callable, List
+from typing import Callable, List, Optional
 
+from . import cdata
 from .scenario import Scenario
-from .tester import Tester
-from .tester_cpp import CPPTester
+from .tester import Tester, CDataExporter, CDataImporter
+from .tester_cpp import CppTester
 from .tester_go import GoTester
 from .tester_rust import RustTester
 from .tester_java import JavaTester
 from .tester_js import JSTester
 from .tester_csharp import CSharpTester
-from .util import guid, SKIP_ARROW, SKIP_FLIGHT, printer
+from .util import guid, printer
+from .util import SKIP_C_ARRAY, SKIP_C_SCHEMA, SKIP_FLIGHT, SKIP_IPC
 from ..utils.source import ARROW_ROOT_DEFAULT
 from . import datagen
 
@@ -76,7 +78,7 @@ class IntegrationRunner(object):
             self.json_files = [json_file for json_file in self.json_files
                                if self.match in json_file.name]
 
-    def run(self):
+    def run_ipc(self):
         """
         Run Arrow IPC integration tests for the matrix of enabled
         implementations.
@@ -84,23 +86,24 @@ class IntegrationRunner(object):
         for producer, consumer in itertools.product(
                 filter(lambda t: t.PRODUCER, self.testers),
                 filter(lambda t: t.CONSUMER, self.testers)):
-            self._compare_implementations(
+            self._compare_ipc_implementations(
                 producer, consumer, self._produce_consume,
                 self.json_files)
         if self.gold_dirs:
             for gold_dir, consumer in itertools.product(
                     self.gold_dirs,
                     filter(lambda t: t.CONSUMER, self.testers)):
-                log('\n\n\n\n')
+                log('\n')
                 log('******************************************************')
                 log('Tests against golden files in {}'.format(gold_dir))
                 log('******************************************************')
 
                 def run_gold(_, consumer, test_case: datagen.File):
                     return self._run_gold(gold_dir, consumer, test_case)
-                self._compare_implementations(
+                self._compare_ipc_implementations(
                     consumer, consumer, run_gold,
                     self._gold_tests(gold_dir))
+        log('\n')
 
     def run_flight(self):
         """
@@ -112,6 +115,18 @@ class IntegrationRunner(object):
                          self.testers)
         for server, client in itertools.product(servers, clients):
             self._compare_flight_implementations(server, client)
+        log('\n')
+
+    def run_c_data(self):
+        """
+        Run Arrow C Data interface integration tests for the matrix of
+        enabled implementations.
+        """
+        for producer, consumer in itertools.product(
+                filter(lambda t: t.C_DATA_EXPORTER, self.testers),
+                filter(lambda t: t.C_DATA_IMPORTER, self.testers)):
+            self._compare_c_data_implementations(producer, consumer)
+        log('\n')
 
     def _gold_tests(self, gold_dir):
         prefix = os.path.basename(os.path.normpath(gold_dir))
@@ -125,28 +140,31 @@ class IntegrationRunner(object):
                 with open(out_path, "wb") as out:
                     out.write(i.read())
 
+            # Find the generated file with the same name as this gold file
             try:
-                skip = next(f for f in self.json_files
-                            if f.name == name).skip
+                equiv_json_file = next(f for f in self.json_files
+                                       if f.name == name)
             except StopIteration:
-                skip = set()
+                equiv_json_file = None
+
+            skip_testers = set()
             if name == 'union' and prefix == '0.17.1':
-                skip.add("Java")
-                skip.add("JS")
+                skip_testers.add("Java")
+                skip_testers.add("JS")
             if prefix == '1.0.0-bigendian' or prefix == '1.0.0-littleendian':
-                skip.add("C#")
-                skip.add("Java")
-                skip.add("JS")
-                skip.add("Rust")
+                skip_testers.add("C#")
+                skip_testers.add("Java")
+                skip_testers.add("JS")
+                skip_testers.add("Rust")
             if prefix == '2.0.0-compression':
-                skip.add("C#")
-                skip.add("JS")
+                skip_testers.add("C#")
+                skip_testers.add("JS")
 
             # See https://github.com/apache/arrow/pull/9822 for how to
             # disable specific compression type tests.
 
             if prefix == '4.0.0-shareddict':
-                skip.add("C#")
+                skip_testers.add("C#")
 
             quirks = set()
             if prefix in {'0.14.1', '0.17.1',
@@ -157,12 +175,18 @@ class IntegrationRunner(object):
                 quirks.add("no_date64_validate")
                 quirks.add("no_times_validate")
 
-            yield datagen.File(name, None, None, skip=skip, path=out_path,
-                               quirks=quirks)
+            json_file = datagen.File(name, schema=None, batches=None,
+                                     path=out_path,
+                                     skip_testers=skip_testers,
+                                     quirks=quirks)
+            if equiv_json_file is not None:
+                json_file.add_skips_from(equiv_json_file)
+            yield json_file
 
     def _run_test_cases(self,
                         case_runner: Callable[[datagen.File], Outcome],
-                        test_cases: List[datagen.File]) -> None:
+                        test_cases: List[datagen.File],
+                        *, serial: Optional[bool] = None) -> None:
         """
         Populate self.failures with the outcomes of the
         ``case_runner`` ran against ``test_cases``
@@ -171,10 +195,13 @@ class IntegrationRunner(object):
             with printer.cork():
                 return case_runner(test_case)
 
+        if serial is None:
+            serial = self.serial
+
         if self.failures and self.stop_on_error:
             return
 
-        if self.serial:
+        if serial:
             for outcome in map(case_wrapper, test_cases):
                 if outcome.failure is not None:
                     self.failures.append(outcome.failure)
@@ -189,7 +216,7 @@ class IntegrationRunner(object):
                         if self.stop_on_error:
                             break
 
-    def _compare_implementations(
+    def _compare_ipc_implementations(
         self,
         producer: Tester,
         consumer: Tester,
@@ -221,22 +248,17 @@ class IntegrationRunner(object):
         outcome = Outcome()
 
         json_path = test_case.path
-        log('==========================================================')
+        log('=' * 70)
         log('Testing file {0}'.format(json_path))
-        log('==========================================================')
-
-        if producer.name in test_case.skip:
-            log('-- Skipping test because producer {0} does '
-                'not support'.format(producer.name))
-            outcome.skipped = True
 
-        elif consumer.name in test_case.skip:
-            log('-- Skipping test because consumer {0} does '
-                'not support'.format(consumer.name))
+        if test_case.should_skip(producer.name, SKIP_IPC):
+            log(f'-- Skipping test because producer {producer.name} does '
+                f'not support IPC')
             outcome.skipped = True
 
-        elif SKIP_ARROW in test_case.skip:
-            log('-- Skipping test')
+        elif test_case.should_skip(consumer.name, SKIP_IPC):
+            log(f'-- Skipping test because consumer {consumer.name} does '
+                f'not support IPC')
             outcome.skipped = True
 
         else:
@@ -247,6 +269,8 @@ class IntegrationRunner(object):
                 outcome.failure = Failure(test_case, producer, consumer,
                                           sys.exc_info())
 
+        log('=' * 70)
+
         return outcome
 
     def _produce_consume(self,
@@ -344,22 +368,17 @@ class IntegrationRunner(object):
         """
         outcome = Outcome()
 
-        log('=' * 58)
+        log('=' * 70)
         log('Testing file {0}'.format(test_case.name))
-        log('=' * 58)
-
-        if producer.name in test_case.skip:
-            log('-- Skipping test because producer {0} does '
-                'not support'.format(producer.name))
-            outcome.skipped = True
 
-        elif consumer.name in test_case.skip:
-            log('-- Skipping test because consumer {0} does '
-                'not support'.format(consumer.name))
+        if test_case.should_skip(producer.name, SKIP_FLIGHT):
+            log(f'-- Skipping test because producer {producer.name} does '
+                f'not support Flight')
             outcome.skipped = True
 
-        elif SKIP_FLIGHT in test_case.skip:
-            log('-- Skipping test')
+        elif test_case.should_skip(consumer.name, SKIP_FLIGHT):
+            log(f'-- Skipping test because consumer {consumer.name} does '
+                f'not support Flight')
             outcome.skipped = True
 
         else:
@@ -380,6 +399,125 @@ class IntegrationRunner(object):
                 outcome.failure = Failure(test_case, producer, consumer,
                                           sys.exc_info())
 
+        log('=' * 70)
+
+        return outcome
+
+    def _compare_c_data_implementations(
+        self,
+        producer: Tester,
+        consumer: Tester
+    ):
+        log('##########################################################')
+        log(f'C Data Interface: '
+            f'{producer.name} exporting, {consumer.name} importing')
+        log('##########################################################')
+
+        # Serial execution is required for proper memory accounting
+        serial = True
+
+        exporter = producer.make_c_data_exporter()
+        importer = consumer.make_c_data_importer()
+
+        case_runner = partial(self._run_c_schema_test_case, producer, consumer,
+                              exporter, importer)
+        self._run_test_cases(case_runner, self.json_files, serial=serial)
+
+        case_runner = partial(self._run_c_array_test_cases, producer, consumer,
+                              exporter, importer)
+        self._run_test_cases(case_runner, self.json_files, serial=serial)
+
+    def _run_c_schema_test_case(self,
+                                producer: Tester, consumer: Tester,
+                                exporter: CDataExporter,
+                                importer: CDataImporter,
+                                test_case: datagen.File) -> Outcome:
+        """
+        Run one C ArrowSchema test case.
+        """
+        outcome = Outcome()
+
+        def do_run():
+            json_path = test_case.path
+            ffi = cdata.ffi()
+            c_schema_ptr = ffi.new("struct ArrowSchema*")
+            with cdata.check_memory_released(exporter, importer):
+                exporter.export_schema_from_json(json_path, c_schema_ptr)
+                importer.import_schema_and_compare_to_json(json_path, c_schema_ptr)
+
+        log('=' * 70)
+        log(f'Testing C ArrowSchema from file {test_case.name!r}')
+
+        if test_case.should_skip(producer.name, SKIP_C_SCHEMA):
+            log(f'-- Skipping test because producer {producer.name} does '
+                f'not support C ArrowSchema')
+            outcome.skipped = True
+
+        elif test_case.should_skip(consumer.name, SKIP_C_SCHEMA):
+            log(f'-- Skipping test because consumer {consumer.name} does '
+                f'not support C ArrowSchema')
+            outcome.skipped = True
+
+        else:
+            try:
+                do_run()
+            except Exception:
+                traceback.print_exc(file=printer.stdout)
+                outcome.failure = Failure(test_case, producer, consumer,
+                                          sys.exc_info())
+
+        log('=' * 70)
+
+        return outcome
+
+    def _run_c_array_test_cases(self,
+                                producer: Tester, consumer: Tester,
+                                exporter: CDataExporter,
+                                importer: CDataImporter,
+                                test_case: datagen.File) -> Outcome:
+        """
+        Run one set of C ArrowArray test cases.
+        """
+        outcome = Outcome()
+
+        def do_run():
+            json_path = test_case.path
+            ffi = cdata.ffi()
+            c_array_ptr = ffi.new("struct ArrowArray*")
+            for num_batch in range(test_case.num_batches):
+                log(f'... with record batch #{num_batch}')
+                with cdata.check_memory_released(exporter, importer):
+                    exporter.export_batch_from_json(json_path,
+                                                    num_batch,
+                                                    c_array_ptr)
+                    importer.import_batch_and_compare_to_json(json_path,
+                                                              num_batch,
+                                                              c_array_ptr)
+
+        log('=' * 70)
+        log(f'Testing C ArrowArray '
+            f'from file {test_case.name!r}')
+
+        if test_case.should_skip(producer.name, SKIP_C_ARRAY):
+            log(f'-- Skipping test because producer {producer.name} does '
+                f'not support C ArrowArray')
+            outcome.skipped = True
+
+        elif test_case.should_skip(consumer.name, SKIP_C_ARRAY):
+            log(f'-- Skipping test because consumer {consumer.name} does '
+                f'not support C ArrowArray')
+            outcome.skipped = True
+
+        else:
+            try:
+                do_run()
+            except Exception:
+                traceback.print_exc(file=printer.stdout)
+                outcome.failure = Failure(test_case, producer, consumer,
+                                          sys.exc_info())
+
+        log('=' * 70)
+
         return outcome
 
 
@@ -387,7 +525,7 @@ def get_static_json_files():
     glob_pattern = os.path.join(ARROW_ROOT_DEFAULT,
                                 'integration', 'data', '*.json')
     return [
-        datagen.File(name=os.path.basename(p), path=p, skip=set(),
+        datagen.File(name=os.path.basename(p), path=p,
                      schema=None, batches=None)
         for p in glob.glob(glob_pattern)
     ]
@@ -395,13 +533,14 @@ def get_static_json_files():
 
 def run_all_tests(with_cpp=True, with_java=True, with_js=True,
                   with_csharp=True, with_go=True, with_rust=False,
-                  run_flight=False, tempdir=None, **kwargs):
+                  run_ipc=False, run_flight=False, run_c_data=False,
+                  tempdir=None, **kwargs):
     tempdir = tempdir or tempfile.mkdtemp(prefix='arrow-integration-')
 
     testers: List[Tester] = []
 
     if with_cpp:
-        testers.append(CPPTester(**kwargs))
+        testers.append(CppTester(**kwargs))
 
     if with_java:
         testers.append(JavaTester(**kwargs))
@@ -434,54 +573,57 @@ def run_all_tests(with_cpp=True, with_java=True, with_js=True,
         Scenario(
             "ordered",
             description="Ensure FlightInfo.ordered is supported.",
-            skip={"JS", "C#", "Rust"},
+            skip_testers={"JS", "C#", "Rust"},
         ),
         Scenario(
             "expiration_time:do_get",
             description=("Ensure FlightEndpoint.expiration_time with "
                          "DoGet is working as expected."),
-            skip={"JS", "C#", "Rust"},
+            skip_testers={"JS", "C#", "Rust"},
         ),
         Scenario(
             "expiration_time:list_actions",
             description=("Ensure FlightEndpoint.expiration_time related "
                          "pre-defined actions is working with ListActions "
                          "as expected."),
-            skip={"JS", "C#", "Rust"},
+            skip_testers={"JS", "C#", "Rust"},
         ),
         Scenario(
             "expiration_time:cancel_flight_info",
             description=("Ensure FlightEndpoint.expiration_time and "
                          "CancelFlightInfo are working as expected."),
-            skip={"JS", "C#", "Rust"},
+            skip_testers={"JS", "C#", "Rust"},
         ),
         Scenario(
             "expiration_time:renew_flight_endpoint",
             description=("Ensure FlightEndpoint.expiration_time and "
                          "RenewFlightEndpoint are working as expected."),
-            skip={"JS", "C#", "Rust"},
+            skip_testers={"JS", "C#", "Rust"},
         ),
         Scenario(
             "poll_flight_info",
             description="Ensure PollFlightInfo is supported.",
-            skip={"JS", "C#", "Rust"}
+            skip_testers={"JS", "C#", "Rust"}
         ),
         Scenario(
             "flight_sql",
             description="Ensure Flight SQL protocol is working as expected.",
-            skip={"Rust"}
+            skip_testers={"Rust"}
         ),
         Scenario(
             "flight_sql:extension",
             description="Ensure Flight SQL extensions work as expected.",
-            skip={"Rust"}
+            skip_testers={"Rust"}
         ),
     ]
 
     runner = IntegrationRunner(json_files, flight_scenarios, testers, **kwargs)
-    runner.run()
+    if run_ipc:
+        runner.run_ipc()
     if run_flight:
         runner.run_flight()
+    if run_c_data:
+        runner.run_c_data()
 
     fail_count = 0
     if runner.failures:
@@ -492,7 +634,8 @@ def run_all_tests(with_cpp=True, with_java=True, with_js=True,
             log(test_case.name, producer.name, "producing, ",
                 consumer.name, "consuming")
             if exc_info:
-                traceback.print_exception(*exc_info)
+                exc_type, exc_value, exc_tb = exc_info
+                log(f'{exc_type}: {exc_value}')
             log()
 
     log(fail_count, "failures")
diff --git a/dev/archery/archery/integration/scenario.py b/dev/archery/archery/integration/scenario.py
index 1fcbca64e6..89c64452e5 100644
--- a/dev/archery/archery/integration/scenario.py
+++ b/dev/archery/archery/integration/scenario.py
@@ -23,7 +23,10 @@ class Scenario:
     Does not correspond to a particular IPC JSON file.
     """
 
-    def __init__(self, name, description, skip=None):
+    def __init__(self, name, description, skip_testers=None):
         self.name = name
         self.description = description
-        self.skip = skip or set()
+        self.skipped_testers = skip_testers or set()
+
+    def should_skip(self, tester, format):
+        return tester in self.skipped_testers
diff --git a/dev/archery/archery/integration/tester.py b/dev/archery/archery/integration/tester.py
index 54bfe621ef..6a3061992d 100644
--- a/dev/archery/archery/integration/tester.py
+++ b/dev/archery/archery/integration/tester.py
@@ -17,12 +17,181 @@
 
 # Base class for language-specific integration test harnesses
 
+from abc import ABC, abstractmethod
+import os
 import subprocess
+import typing
 
 from .util import log
 
 
-class Tester(object):
+_Predicate = typing.Callable[[], bool]
+
+
+class CDataExporter(ABC):
+
+    @abstractmethod
+    def export_schema_from_json(self, json_path: os.PathLike,
+                                c_schema_ptr: object):
+        """
+        Read a JSON integration file and export its schema.
+
+        Parameters
+        ----------
+        json_path : Path
+            Path to the JSON file
+        c_schema_ptr : cffi pointer value
+            Pointer to the ``ArrowSchema`` struct to export to.
+        """
+
+    @abstractmethod
+    def export_batch_from_json(self, json_path: os.PathLike,
+                               num_batch: int,
+                               c_array_ptr: object):
+        """
+        Read a JSON integration file and export one of its batches.
+
+        Parameters
+        ----------
+        json_path : Path
+            Path to the JSON file
+        num_batch : int
+            Number of the record batch in the JSON file
+        c_array_ptr : cffi pointer value
+            Pointer to the ``ArrowArray`` struct to export to.
+        """
+
+    @property
+    @abstractmethod
+    def supports_releasing_memory(self) -> bool:
+        """
+        Whether the implementation is able to release memory deterministically.
+
+        Here, "release memory" means that, after the `release` callback of
+        a C Data Interface export is called, `compare_allocation_state` is
+        able to trigger the deallocation of the memory underlying the export
+        (for example buffer data).
+
+        If false, then `record_allocation_state` and `compare_allocation_state`
+        are allowed to raise NotImplementedError.
+        """
+
+    def record_allocation_state(self) -> object:
+        """
+        Record the current memory allocation state.
+
+        Returns
+        -------
+        state : object
+            Opaque object representing the allocation state,
+            for example the number of allocated bytes.
+        """
+        raise NotImplementedError
+
+    def compare_allocation_state(self, recorded: object,
+                                 gc_until: typing.Callable[[_Predicate], bool]
+                                 ) -> bool:
+        """
+        Compare the current memory allocation state with the recorded one.
+
+        Parameters
+        ----------
+        recorded : object
+            The previous allocation state returned by
+            `record_allocation_state()`
+        gc_until : callable
+            A callable itself accepting a callable predicate, and
+            returning a boolean.
+            `gc_until` should try to release memory until the predicate
+            becomes true, or until it decides to give up. The final value
+            of the predicate should be returned.
+            `gc_until` is typically provided by the C Data Interface importer.
+
+        Returns
+        -------
+        success : bool
+            Whether memory allocation state finally reached its previously
+            recorded value.
+        """
+        raise NotImplementedError
+
+
+class CDataImporter(ABC):
+
+    @abstractmethod
+    def import_schema_and_compare_to_json(self, json_path: os.PathLike,
+                                          c_schema_ptr: object):
+        """
+        Import schema and compare it to the schema of a JSON integration file.
+
+        An error is raised if importing fails or the schemas differ.
+
+        Parameters
+        ----------
+        json_path : Path
+            The path to the JSON file
+        c_schema_ptr : cffi pointer value
+            Pointer to the ``ArrowSchema`` struct to import from.
+        """
+
+    @abstractmethod
+    def import_batch_and_compare_to_json(self, json_path: os.PathLike,
+                                         num_batch: int,
+                                         c_array_ptr: object):
+        """
+        Import record batch and compare it to one of the batches
+        from a JSON integration file.
+
+        The schema used for importing the record batch is the one from
+        the JSON file.
+
+        An error is raised if importing fails or the batches differ.
+
+        Parameters
+        ----------
+        json_path : Path
+            The path to the JSON file
+        num_batch : int
+            Number of the record batch in the JSON file
+        c_array_ptr : cffi pointer value
+            Pointer to the ``ArrowArray`` struct to import from.
+        """
+
+    @property
+    @abstractmethod
+    def supports_releasing_memory(self) -> bool:
+        """
+        Whether the implementation is able to release memory deterministically.
+
+        Here, "release memory" means calling the `release` callback of
+        a C Data Interface export (which should then trigger a deallocation
+        mechanism on the exporter).
+
+        If false, then `gc_until` is allowed to raise NotImplementedError.
+        """
+
+    def gc_until(self, predicate: _Predicate):
+        """
+        Try to release memory until the predicate becomes true, or fail.
+
+        Depending on the CDataImporter implementation, this may for example
+        try once, or run a garbage collector a given number of times, or
+        any other implementation-specific strategy for releasing memory.
+
+        The running time should be kept reasonable and compatible with
+        execution of multiple C Data integration tests.
+
+        This should not raise if `supports_releasing_memory` is true.
+
+        Returns
+        -------
+        success : bool
+            The final value of the predicate.
+        """
+        raise NotImplementedError
+
+
+class Tester:
     """
     The interface to declare a tester to run integration tests against.
     """
@@ -34,8 +203,12 @@ class Tester(object):
     FLIGHT_SERVER = False
     # whether the language supports receiving Flight
     FLIGHT_CLIENT = False
+    # whether the language supports the C Data Interface as an exporter
+    C_DATA_EXPORTER = False
+    # whether the language supports the C Data Interface as an importer
+    C_DATA_IMPORTER = False
 
-    # the name shown in the logs
+    # the name used for skipping and shown in the logs
     name = "unknown"
 
     def __init__(self, debug=False, **args):
@@ -85,3 +258,9 @@ class Tester(object):
 
     def flight_request(self, port, json_path=None, scenario_name=None):
         raise NotImplementedError
+
+    def make_c_data_exporter(self) -> CDataExporter:
+        raise NotImplementedError
+
+    def make_c_data_importer(self) -> CDataImporter:
+        raise NotImplementedError
diff --git a/dev/archery/archery/integration/tester_cpp.py b/dev/archery/archery/integration/tester_cpp.py
index 52cc565dc0..9ddc3c4800 100644
--- a/dev/archery/archery/integration/tester_cpp.py
+++ b/dev/archery/archery/integration/tester_cpp.py
@@ -16,10 +16,12 @@
 # under the License.
 
 import contextlib
+import functools
 import os
 import subprocess
 
-from .tester import Tester
+from . import cdata
+from .tester import Tester, CDataExporter, CDataImporter
 from .util import run_cmd, log
 from ..utils.source import ARROW_ROOT_DEFAULT
 
@@ -39,12 +41,19 @@ _FLIGHT_CLIENT_CMD = [
     "localhost",
 ]
 
+_dll_suffix = ".dll" if os.name == "nt" else ".so"
 
-class CPPTester(Tester):
+_DLL_PATH = _EXE_PATH
+_ARROW_DLL = os.path.join(_DLL_PATH, "libarrow" + _dll_suffix)
+
+
+class CppTester(Tester):
     PRODUCER = True
     CONSUMER = True
     FLIGHT_SERVER = True
     FLIGHT_CLIENT = True
+    C_DATA_EXPORTER = True
+    C_DATA_IMPORTER = True
 
     name = 'C++'
 
@@ -133,3 +142,104 @@ class CPPTester(Tester):
         if self.debug:
             log(' '.join(cmd))
         run_cmd(cmd)
+
+    def make_c_data_exporter(self):
+        return CppCDataExporter(self.debug, self.args)
+
+    def make_c_data_importer(self):
+        return CppCDataImporter(self.debug, self.args)
+
+
+_cpp_c_data_entrypoints = """
+    const char* ArrowCpp_CDataIntegration_ExportSchemaFromJson(
+        const char* json_path, struct ArrowSchema* out);
+    const char* ArrowCpp_CDataIntegration_ImportSchemaAndCompareToJson(
+        const char* json_path, struct ArrowSchema* schema);
+
+    const char* ArrowCpp_CDataIntegration_ExportBatchFromJson(
+        const char* json_path, int num_batch, struct ArrowArray* out);
+    const char* ArrowCpp_CDataIntegration_ImportBatchAndCompareToJson(
+        const char* json_path, int num_batch, struct ArrowArray* batch);
+
+    int64_t ArrowCpp_BytesAllocated();
+    """
+
+
+@functools.lru_cache
+def _load_ffi(ffi, lib_path=_ARROW_DLL):
+    ffi.cdef(_cpp_c_data_entrypoints)
+    dll = ffi.dlopen(lib_path)
+    dll.ArrowCpp_CDataIntegration_ExportSchemaFromJson
+    return dll
+
+
+class _CDataBase:
+
+    def __init__(self, debug, args):
+        self.debug = debug
+        self.args = args
+        self.ffi = cdata.ffi()
+        self.dll = _load_ffi(self.ffi)
+
+    def _check_c_error(self, c_error):
+        """
+        Check a `const char*` error return from an integration entrypoint.
+
+        A null means success, a non-empty string is an error message.
+        The string is statically allocated on the C++ side.
+        """
+        assert self.ffi.typeof(c_error) is self.ffi.typeof("const char*")
+        if c_error != self.ffi.NULL:
+            error = self.ffi.string(c_error).decode('utf8',
+                                                    errors='replace')
+            raise RuntimeError(
+                f"C++ C Data Integration call failed: {error}")
+
+
+class CppCDataExporter(CDataExporter, _CDataBase):
+
+    def export_schema_from_json(self, json_path, c_schema_ptr):
+        c_error = self.dll.ArrowCpp_CDataIntegration_ExportSchemaFromJson(
+            str(json_path).encode(), c_schema_ptr)
+        self._check_c_error(c_error)
+
+    def export_batch_from_json(self, json_path, num_batch, c_array_ptr):
+        c_error = self.dll.ArrowCpp_CDataIntegration_ExportBatchFromJson(
+            str(json_path).encode(), num_batch, c_array_ptr)
+        self._check_c_error(c_error)
+
+    @property
+    def supports_releasing_memory(self):
+        return True
+
+    def record_allocation_state(self):
+        return self.dll.ArrowCpp_BytesAllocated()
+
+    def compare_allocation_state(self, recorded, gc_until):
+        def pred():
+            # No GC on our side, so just compare allocation state
+            return self.record_allocation_state() == recorded
+
+        return gc_until(pred)
+
+
+class CppCDataImporter(CDataImporter, _CDataBase):
+
+    def import_schema_and_compare_to_json(self, json_path, c_schema_ptr):
+        c_error = self.dll.ArrowCpp_CDataIntegration_ImportSchemaAndCompareToJson(
+            str(json_path).encode(), c_schema_ptr)
+        self._check_c_error(c_error)
+
+    def import_batch_and_compare_to_json(self, json_path, num_batch,
+                                         c_array_ptr):
+        c_error = self.dll.ArrowCpp_CDataIntegration_ImportBatchAndCompareToJson(
+            str(json_path).encode(), num_batch, c_array_ptr)
+        self._check_c_error(c_error)
+
+    @property
+    def supports_releasing_memory(self):
+        return True
+
+    def gc_until(self, predicate):
+        # No GC on our side, so can evaluate predicate immediately
+        return predicate()
diff --git a/dev/archery/archery/integration/util.py b/dev/archery/archery/integration/util.py
index 80ba30052e..afef7d5eb1 100644
--- a/dev/archery/archery/integration/util.py
+++ b/dev/archery/archery/integration/util.py
@@ -32,8 +32,10 @@ def guid():
 
 
 # SKIP categories
-SKIP_ARROW = 'arrow'
+SKIP_C_ARRAY = 'c_array'
+SKIP_C_SCHEMA = 'c_schema'
 SKIP_FLIGHT = 'flight'
+SKIP_IPC = 'ipc'
 
 
 class _Printer:
diff --git a/dev/archery/setup.py b/dev/archery/setup.py
index 627e576fb6..08e41225f6 100755
--- a/dev/archery/setup.py
+++ b/dev/archery/setup.py
@@ -28,16 +28,17 @@ if sys.version_info < (3, 8):
 jinja_req = 'jinja2>=2.11'
 
 extras = {
-    'lint': ['numpydoc==1.1.0', 'autopep8', 'flake8==6.1.0', 'cython-lint',
-             'cmake_format==0.6.13'],
     'benchmark': ['pandas'],
-    'docker': ['ruamel.yaml', 'python-dotenv'],
-    'release': ['pygithub', jinja_req, 'jira', 'semver', 'gitpython'],
     'crossbow': ['github3.py', jinja_req, 'pygit2>=1.6.0', 'requests',
                  'ruamel.yaml', 'setuptools_scm'],
     'crossbow-upload': ['github3.py', jinja_req, 'ruamel.yaml',
                         'setuptools_scm'],
-    'numpydoc': ['numpydoc==1.1.0']
+    'docker': ['ruamel.yaml', 'python-dotenv'],
+    'integration': ['cffi'],
+    'lint': ['numpydoc==1.1.0', 'autopep8', 'flake8==6.1.0', 'cython-lint',
+             'cmake_format==0.6.13'],
+    'numpydoc': ['numpydoc==1.1.0'],
+    'release': ['pygithub', jinja_req, 'jira', 'semver', 'gitpython'],
 }
 extras['bot'] = extras['crossbow'] + ['pygithub', 'jira']
 extras['all'] = list(set(functools.reduce(operator.add, extras.values())))

Reply via email to