This is an automated email from the ASF dual-hosted git repository.
apitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 3b646ad4c2 GH-37537: [Integration][C++] Add C Data Interface
integration testing (#37769)
3b646ad4c2 is described below
commit 3b646ad4c2b826fe08b31d19e6435f73650bcb5e
Author: Antoine Pitrou <[email protected]>
AuthorDate: Tue Sep 19 16:41:31 2023 +0200
GH-37537: [Integration][C++] Add C Data Interface integration testing
(#37769)
### Rationale for this change
Currently there are no systematic integration tests between implementations
of the C Data Interface, only a couple ad-hoc tests.
### What changes are included in this PR?
1. Add Archery infrastructure for integration testing of the C Data
Interface
2. Add implementation of this interface for Arrow C++
### Are these changes tested?
Yes, by construction.
### Are there any user-facing changes?
No.
* Closes: #37537
Lead-authored-by: Antoine Pitrou <[email protected]>
Co-authored-by: Antoine Pitrou <[email protected]>
Co-authored-by: Will Jones <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
---
ci/scripts/integration_arrow.sh | 6 +-
cpp/src/arrow/CMakeLists.txt | 6 +-
.../integration/c_data_integration_internal.cc | 145 +++++++++++
.../integration/c_data_integration_internal.h | 48 ++++
cpp/src/arrow/integration/json_integration.cc | 7 +-
cpp/src/arrow/symbols.map | 1 +
dev/archery/archery/cli.py | 21 +-
dev/archery/archery/integration/cdata.py | 107 +++++++++
dev/archery/archery/integration/datagen.py | 110 ++++++---
dev/archery/archery/integration/runner.py | 267 ++++++++++++++++-----
dev/archery/archery/integration/scenario.py | 7 +-
dev/archery/archery/integration/tester.py | 183 +++++++++++++-
dev/archery/archery/integration/tester_cpp.py | 114 ++++++++-
dev/archery/archery/integration/util.py | 4 +-
dev/archery/setup.py | 11 +-
15 files changed, 913 insertions(+), 124 deletions(-)
diff --git a/ci/scripts/integration_arrow.sh b/ci/scripts/integration_arrow.sh
index 30cbb2d637..a165f8027b 100755
--- a/ci/scripts/integration_arrow.sh
+++ b/ci/scripts/integration_arrow.sh
@@ -22,10 +22,12 @@ set -ex
arrow_dir=${1}
gold_dir=$arrow_dir/testing/data/arrow-ipc-stream/integration
-pip install -e $arrow_dir/dev/archery
+pip install -e $arrow_dir/dev/archery[integration]
# Rust can be enabled by exporting ARCHERY_INTEGRATION_WITH_RUST=1
-archery integration \
+time archery integration \
+ --run-c-data \
+ --run-ipc \
--run-flight \
--with-cpp=1 \
--with-csharp=1 \
diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt
index f474d0c517..9a61170115 100644
--- a/cpp/src/arrow/CMakeLists.txt
+++ b/cpp/src/arrow/CMakeLists.txt
@@ -383,7 +383,11 @@ endif()
#
if(ARROW_BUILD_INTEGRATION OR ARROW_BUILD_TESTS)
- list(APPEND ARROW_SRCS integration/json_integration.cc
integration/json_internal.cc)
+ list(APPEND
+ ARROW_SRCS
+ integration/c_data_integration_internal.cc
+ integration/json_integration.cc
+ integration/json_internal.cc)
endif()
if(ARROW_CSV)
diff --git a/cpp/src/arrow/integration/c_data_integration_internal.cc
b/cpp/src/arrow/integration/c_data_integration_internal.cc
new file mode 100644
index 0000000000..79e09eaf91
--- /dev/null
+++ b/cpp/src/arrow/integration/c_data_integration_internal.cc
@@ -0,0 +1,145 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/integration/c_data_integration_internal.h"
+
+#include <sstream>
+#include <utility>
+
+#include "arrow/c/bridge.h"
+#include "arrow/integration/json_integration.h"
+#include "arrow/io/file.h"
+#include "arrow/memory_pool.h"
+#include "arrow/pretty_print.h"
+#include "arrow/record_batch.h"
+#include "arrow/result.h"
+#include "arrow/status.h"
+#include "arrow/type.h"
+#include "arrow/type_fwd.h"
+#include "arrow/util/logging.h"
+
+namespace arrow::internal::integration {
+namespace {
+
+template <typename Func>
+const char* StatusToErrorString(Func&& func) {
+ static std::string error;
+
+ Status st = func();
+ if (st.ok()) {
+ return nullptr;
+ }
+ error = st.ToString();
+ ARROW_CHECK_GT(error.length(), 0);
+ return error.c_str();
+}
+
+Result<std::shared_ptr<Schema>> ReadSchemaFromJson(const std::string&
json_path,
+ MemoryPool* pool) {
+ ARROW_ASSIGN_OR_RAISE(auto file, io::ReadableFile::Open(json_path, pool));
+ ARROW_ASSIGN_OR_RAISE(auto reader, IntegrationJsonReader::Open(pool, file));
+ return reader->schema();
+}
+
+Result<std::shared_ptr<RecordBatch>> ReadBatchFromJson(const std::string&
json_path,
+ int num_batch,
MemoryPool* pool) {
+ ARROW_ASSIGN_OR_RAISE(auto file, io::ReadableFile::Open(json_path, pool));
+ ARROW_ASSIGN_OR_RAISE(auto reader, IntegrationJsonReader::Open(pool, file));
+ return reader->ReadRecordBatch(num_batch);
+}
+
+// XXX ideally, we should allow use of a custom memory pool in the C bridge
API,
+// but that requires non-trivial refactor
+
+Status ExportSchemaFromJson(std::string json_path, ArrowSchema* out) {
+ auto pool = default_memory_pool();
+ ARROW_ASSIGN_OR_RAISE(auto schema, ReadSchemaFromJson(json_path, pool));
+ return ExportSchema(*schema, out);
+}
+
+Status ImportSchemaAndCompareToJson(std::string json_path, ArrowSchema*
c_schema) {
+ auto pool = default_memory_pool();
+ ARROW_ASSIGN_OR_RAISE(auto json_schema, ReadSchemaFromJson(json_path, pool));
+ ARROW_ASSIGN_OR_RAISE(auto imported_schema, ImportSchema(c_schema));
+ if (!imported_schema->Equals(json_schema, /*check_metadata=*/true)) {
+ return Status::Invalid("Schemas are different:", "\n- Json Schema: ",
*json_schema,
+ "\n- Imported Schema: ", *imported_schema);
+ }
+ return Status::OK();
+}
+
+Status ExportBatchFromJson(std::string json_path, int num_batch, ArrowArray*
out) {
+ auto pool = default_memory_pool();
+ ARROW_ASSIGN_OR_RAISE(auto batch, ReadBatchFromJson(json_path, num_batch,
pool));
+ return ExportRecordBatch(*batch, out);
+}
+
+Status ImportBatchAndCompareToJson(std::string json_path, int num_batch,
+ ArrowArray* c_batch) {
+ auto pool = default_memory_pool();
+ ARROW_ASSIGN_OR_RAISE(auto batch, ReadBatchFromJson(json_path, num_batch,
pool));
+ ARROW_ASSIGN_OR_RAISE(auto imported_batch, ImportRecordBatch(c_batch,
batch->schema()));
+ RETURN_NOT_OK(imported_batch->ValidateFull());
+ if (!imported_batch->Equals(*batch, /*check_metadata=*/true)) {
+ std::stringstream pp_expected;
+ std::stringstream pp_actual;
+ PrettyPrintOptions options(/*indent=*/2);
+ options.window = 50;
+ ARROW_CHECK_OK(PrettyPrint(*batch, options, &pp_expected));
+ ARROW_CHECK_OK(PrettyPrint(*imported_batch, options, &pp_actual));
+ return Status::Invalid("Record Batches are different:", "\n- Json Batch: ",
+ pp_expected.str(), "\n- Imported Batch: ",
pp_actual.str());
+ }
+ return Status::OK();
+}
+
+} // namespace
+} // namespace arrow::internal::integration
+
+const char* ArrowCpp_CDataIntegration_ExportSchemaFromJson(const char*
json_path,
+ ArrowSchema* out) {
+ using namespace arrow::internal::integration; // NOLINT(build/namespaces)
+ return StatusToErrorString([=]() { return ExportSchemaFromJson(json_path,
out); });
+}
+
+const char* ArrowCpp_CDataIntegration_ImportSchemaAndCompareToJson(const char*
json_path,
+
ArrowSchema* schema) {
+ using namespace arrow::internal::integration; // NOLINT(build/namespaces)
+ return StatusToErrorString(
+ [=]() { return ImportSchemaAndCompareToJson(json_path, schema); });
+}
+
+const char* ArrowCpp_CDataIntegration_ExportBatchFromJson(const char*
json_path,
+ int num_batch,
+ ArrowArray* out) {
+ using namespace arrow::internal::integration; // NOLINT(build/namespaces)
+ return StatusToErrorString(
+ [=]() { return ExportBatchFromJson(json_path, num_batch, out); });
+}
+
+const char* ArrowCpp_CDataIntegration_ImportBatchAndCompareToJson(const char*
json_path,
+ int
num_batch,
+ ArrowArray*
batch) {
+ using namespace arrow::internal::integration; // NOLINT(build/namespaces)
+ return StatusToErrorString(
+ [=]() { return ImportBatchAndCompareToJson(json_path, num_batch, batch);
});
+}
+
+int64_t ArrowCpp_BytesAllocated() {
+ auto pool = arrow::default_memory_pool();
+ return pool->bytes_allocated();
+}
diff --git a/cpp/src/arrow/integration/c_data_integration_internal.h
b/cpp/src/arrow/integration/c_data_integration_internal.h
new file mode 100644
index 0000000000..0a62363dff
--- /dev/null
+++ b/cpp/src/arrow/integration/c_data_integration_internal.h
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/c/abi.h"
+#include "arrow/util/visibility.h"
+
+// This file only serves as documentation for the C Data Interface integration
+// entrypoints. The actual functions are called by Archery through DLL symbol
lookup.
+
+extern "C" {
+
+ARROW_EXPORT
+const char* ArrowCpp_CDataIntegration_ExportSchemaFromJson(const char*
json_path,
+ ArrowSchema* out);
+
+ARROW_EXPORT
+const char* ArrowCpp_CDataIntegration_ImportSchemaAndCompareToJson(const char*
json_path,
+
ArrowSchema* schema);
+
+ARROW_EXPORT
+const char* ArrowCpp_CDataIntegration_ExportBatchFromJson(const char*
json_path,
+ int num_batch,
ArrowArray* out);
+
+ARROW_EXPORT
+const char* ArrowCpp_CDataIntegration_ImportBatchAndCompareToJson(const char*
json_path,
+ int
num_batch,
+ ArrowArray*
batch);
+
+ARROW_EXPORT
+int64_t ArrowCpp_BytesAllocated();
+
+} // extern "C"
diff --git a/cpp/src/arrow/integration/json_integration.cc
b/cpp/src/arrow/integration/json_integration.cc
index 178abe5e8b..590f6eddd7 100644
--- a/cpp/src/arrow/integration/json_integration.cc
+++ b/cpp/src/arrow/integration/json_integration.cc
@@ -144,10 +144,9 @@ class IntegrationJsonReader::Impl {
}
Result<std::shared_ptr<RecordBatch>> ReadRecordBatch(int i) {
- DCHECK_GE(i, 0) << "i out of bounds";
- DCHECK_LT(i, static_cast<int>(record_batches_->GetArray().Size()))
- << "i out of bounds";
-
+ if (i < 0 || i >= static_cast<int>(record_batches_->GetArray().Size())) {
+ return Status::IndexError("record batch index ", i, " out of bounds");
+ }
return json::ReadRecordBatch(record_batches_->GetArray()[i], schema_,
&dictionary_memo_, pool_);
}
diff --git a/cpp/src/arrow/symbols.map b/cpp/src/arrow/symbols.map
index 9ef0e404bc..0144e61165 100644
--- a/cpp/src/arrow/symbols.map
+++ b/cpp/src/arrow/symbols.map
@@ -32,6 +32,7 @@
};
# Also export C-level helpers
arrow_*;
+ Arrow*;
# ARROW-14771: export Protobuf symbol table
descriptor_table_Flight_2eproto;
descriptor_table_FlightSql_2eproto;
diff --git a/dev/archery/archery/cli.py b/dev/archery/archery/cli.py
index 70f865cc2f..7a3b45f978 100644
--- a/dev/archery/archery/cli.py
+++ b/dev/archery/archery/cli.py
@@ -723,8 +723,12 @@ def _set_default(opt, default):
envvar="ARCHERY_INTEGRATION_WITH_RUST")
@click.option('--write_generated_json', default="",
help='Generate test JSON to indicated path')
[email protected]('--run-ipc', is_flag=True, default=False,
+ help='Run IPC integration tests')
@click.option('--run-flight', is_flag=True, default=False,
help='Run Flight integration tests')
[email protected]('--run-c-data', is_flag=True, default=False,
+ help='Run C Data Interface integration tests')
@click.option('--debug', is_flag=True, default=False,
help='Run executables in debug mode as relevant')
@click.option('--serial', is_flag=True, default=False,
@@ -753,15 +757,19 @@ def integration(with_all=False, random_seed=12345,
**args):
gen_path = args['write_generated_json']
languages = ['cpp', 'csharp', 'java', 'js', 'go', 'rust']
+ formats = ['ipc', 'flight', 'c_data']
enabled_languages = 0
for lang in languages:
- param = 'with_{}'.format(lang)
+ param = f'with_{lang}'
if with_all:
args[param] = with_all
+ enabled_languages += args[param]
- if args[param]:
- enabled_languages += 1
+ enabled_formats = 0
+ for fmt in formats:
+ param = f'run_{fmt}'
+ enabled_formats += args[param]
if gen_path:
# XXX See GH-37575: this option is only used by the JS test suite
@@ -769,8 +777,13 @@ def integration(with_all=False, random_seed=12345, **args):
os.makedirs(gen_path, exist_ok=True)
write_js_test_json(gen_path)
else:
+ if enabled_formats == 0:
+ raise click.UsageError(
+ "Need to enable at least one format to test "
+ "(IPC, Flight, C Data Interface); try --help")
if enabled_languages == 0:
- raise Exception("Must enable at least 1 language to test")
+ raise click.UsageError(
+ "Need to enable at least one language to test; try --help")
run_all_tests(**args)
diff --git a/dev/archery/archery/integration/cdata.py
b/dev/archery/archery/integration/cdata.py
new file mode 100644
index 0000000000..c201f5f867
--- /dev/null
+++ b/dev/archery/archery/integration/cdata.py
@@ -0,0 +1,107 @@
+# licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import cffi
+from contextlib import contextmanager
+import functools
+
+from .tester import CDataExporter, CDataImporter
+
+
+_c_data_decls = """
+ struct ArrowSchema {
+ // Array type description
+ const char* format;
+ const char* name;
+ const char* metadata;
+ int64_t flags;
+ int64_t n_children;
+ struct ArrowSchema** children;
+ struct ArrowSchema* dictionary;
+
+ // Release callback
+ void (*release)(struct ArrowSchema*);
+ // Opaque producer-specific data
+ void* private_data;
+ };
+
+ struct ArrowArray {
+ // Array data description
+ int64_t length;
+ int64_t null_count;
+ int64_t offset;
+ int64_t n_buffers;
+ int64_t n_children;
+ const void** buffers;
+ struct ArrowArray** children;
+ struct ArrowArray* dictionary;
+
+ // Release callback
+ void (*release)(struct ArrowArray*);
+ // Opaque producer-specific data
+ void* private_data;
+ };
+
+ struct ArrowArrayStream {
+ int (*get_schema)(struct ArrowArrayStream*, struct ArrowSchema* out);
+ int (*get_next)(struct ArrowArrayStream*, struct ArrowArray* out);
+
+ const char* (*get_last_error)(struct ArrowArrayStream*);
+
+ // Release callback
+ void (*release)(struct ArrowArrayStream*);
+ // Opaque producer-specific data
+ void* private_data;
+ };
+ """
+
+
[email protected]_cache
+def ffi() -> cffi.FFI:
+ """
+ Return a FFI object supporting C Data Interface types.
+ """
+ ffi = cffi.FFI()
+ ffi.cdef(_c_data_decls)
+ return ffi
+
+
+@contextmanager
+def check_memory_released(exporter: CDataExporter, importer: CDataImporter):
+ """
+ A context manager for memory release checks.
+
+ The context manager arranges cooperation between the exporter and importer
+ to try and release memory at the end of the enclosed block.
+
+ However, if either the exporter or importer doesn't support deterministic
+ memory release, no memory check is performed.
+ """
+ do_check = (exporter.supports_releasing_memory and
+ importer.supports_releasing_memory)
+ if do_check:
+ before = exporter.record_allocation_state()
+ yield
+ # We don't use a `finally` clause: if the enclosed block raised an
+ # exception, no need to add another one.
+ if do_check:
+ ok = exporter.compare_allocation_state(before, importer.gc_until)
+ if not ok:
+ after = exporter.record_allocation_state()
+ raise RuntimeError(
+ f"Memory was not released correctly after roundtrip: "
+ f"before = {before}, after = {after} (should have been equal)")
diff --git a/dev/archery/archery/integration/datagen.py
b/dev/archery/archery/integration/datagen.py
index f924c8a73c..53f7ba58bf 100644
--- a/dev/archery/archery/integration/datagen.py
+++ b/dev/archery/archery/integration/datagen.py
@@ -25,6 +25,7 @@ import tempfile
import numpy as np
from .util import frombytes, tobytes, random_bytes, random_utf8
+from .util import SKIP_C_SCHEMA, SKIP_C_ARRAY
def metadata_key_values(pairs):
@@ -1224,15 +1225,16 @@ class RecordBatch(object):
class File(object):
def __init__(self, name, schema, batches, dictionaries=None,
- skip=None, path=None, quirks=None):
+ skip_testers=None, path=None, quirks=None):
self.name = name
self.schema = schema
self.dictionaries = dictionaries or []
self.batches = batches
- self.skip = set()
+ self.skipped_testers = set()
+ self.skipped_formats = {}
self.path = path
- if skip:
- self.skip.update(skip)
+ if skip_testers:
+ self.skipped_testers.update(skip_testers)
# For tracking flags like whether to validate decimal values
# fit into the given precision (ARROW-13558).
self.quirks = set()
@@ -1258,14 +1260,39 @@ class File(object):
f.write(json.dumps(self.get_json(), indent=2).encode('utf-8'))
self.path = path
- def skip_category(self, category):
- """Skip this test for the given category.
+ def skip_tester(self, tester):
+ """Skip this test for the given tester (such as 'C#').
+ """
+ self.skipped_testers.add(tester)
+ return self
- Category should be SKIP_ARROW or SKIP_FLIGHT.
+ def skip_format(self, format, tester='all'):
+ """Skip this test for the given format, and optionally tester.
"""
- self.skip.add(category)
+ self.skipped_formats.setdefault(format, set()).add(tester)
return self
+ def add_skips_from(self, other_file):
+ """Add skips from another File object.
+ """
+ self.skipped_testers.update(other_file.skipped_testers)
+ for format, testers in other_file.skipped_formats.items():
+ self.skipped_formats.setdefault(format, set()).update(testers)
+
+ def should_skip(self, tester, format):
+ """Whether this (tester, format) combination should be skipped.
+ """
+ if tester in self.skipped_testers:
+ return True
+ testers = self.skipped_formats.get(format, ())
+ return 'all' in testers or tester in testers
+
+ @property
+ def num_batches(self):
+ """The number of record batches in this file.
+ """
+ return len(self.batches)
+
def get_field(name, type_, **kwargs):
if type_ == 'binary':
@@ -1295,8 +1322,8 @@ def get_field(name, type_, **kwargs):
raise TypeError(dtype)
-def _generate_file(name, fields, batch_sizes, dictionaries=None, skip=None,
- metadata=None):
+def _generate_file(name, fields, batch_sizes, *,
+ dictionaries=None, metadata=None):
schema = Schema(fields, metadata=metadata)
batches = []
for size in batch_sizes:
@@ -1307,7 +1334,7 @@ def _generate_file(name, fields, batch_sizes,
dictionaries=None, skip=None,
batches.append(RecordBatch(size, columns))
- return File(name, schema, batches, dictionaries, skip=skip)
+ return File(name, schema, batches, dictionaries)
def generate_custom_metadata_case():
@@ -1666,8 +1693,8 @@ def get_generated_json_files(tempdir=None):
generate_primitive_case([0, 0, 0], name='primitive_zerolength'),
generate_primitive_large_offsets_case([17, 20])
- .skip_category('C#')
- .skip_category('JS'),
+ .skip_tester('C#')
+ .skip_tester('JS'),
generate_null_case([10, 0]),
@@ -1676,66 +1703,71 @@ def get_generated_json_files(tempdir=None):
generate_decimal128_case(),
generate_decimal256_case()
- .skip_category('JS'),
+ .skip_tester('JS'),
generate_datetime_case(),
generate_duration_case()
- .skip_category('C#')
- .skip_category('JS'), # TODO(ARROW-5239): Intervals + JS
+ .skip_tester('C#')
+ .skip_tester('JS'), # TODO(ARROW-5239): Intervals + JS
generate_interval_case()
- .skip_category('C#')
- .skip_category('JS'), # TODO(ARROW-5239): Intervals + JS
+ .skip_tester('C#')
+ .skip_tester('JS'), # TODO(ARROW-5239): Intervals + JS
generate_month_day_nano_interval_case()
- .skip_category('C#')
- .skip_category('JS'),
+ .skip_tester('C#')
+ .skip_tester('JS'),
generate_map_case()
- .skip_category('C#'),
+ .skip_tester('C#'),
generate_non_canonical_map_case()
- .skip_category('C#')
- .skip_category('Java'), # TODO(ARROW-8715)
+ .skip_tester('C#')
+ .skip_tester('Java') # TODO(ARROW-8715)
+ # Canonical map names are restored on import, so the schemas are
unequal
+ .skip_format(SKIP_C_SCHEMA, 'C++'),
generate_nested_case(),
generate_recursive_nested_case(),
generate_nested_large_offsets_case()
- .skip_category('C#')
- .skip_category('JS'),
+ .skip_tester('C#')
+ .skip_tester('JS'),
generate_unions_case()
- .skip_category('C#'),
+ .skip_tester('C#'),
generate_custom_metadata_case()
- .skip_category('C#'),
+ .skip_tester('C#'),
generate_duplicate_fieldnames_case()
- .skip_category('C#')
- .skip_category('JS'),
+ .skip_tester('C#')
+ .skip_tester('JS'),
generate_dictionary_case()
- .skip_category('C#'),
+ .skip_tester('C#'),
generate_dictionary_unsigned_case()
- .skip_category('C#')
- .skip_category('Java'), # TODO(ARROW-9377)
+ .skip_tester('C#')
+ .skip_tester('Java'), # TODO(ARROW-9377)
generate_nested_dictionary_case()
- .skip_category('C#')
- .skip_category('Java'), # TODO(ARROW-7779)
+ .skip_tester('C#')
+ .skip_tester('Java'), # TODO(ARROW-7779)
generate_run_end_encoded_case()
- .skip_category('C#')
- .skip_category('Java')
- .skip_category('JS')
- .skip_category('Rust'),
+ .skip_tester('C#')
+ .skip_tester('Java')
+ .skip_tester('JS')
+ .skip_tester('Rust'),
generate_extension_case()
- .skip_category('C#'),
+ .skip_tester('C#')
+ # TODO: ensure the extension is registered in the C++ entrypoint
+ .skip_format(SKIP_C_SCHEMA, 'C++')
+ .skip_format(SKIP_C_ARRAY, 'C++'),
]
generated_paths = []
diff --git a/dev/archery/archery/integration/runner.py
b/dev/archery/archery/integration/runner.py
index 0ee9ab814e..2fd1d2d7f0 100644
--- a/dev/archery/archery/integration/runner.py
+++ b/dev/archery/archery/integration/runner.py
@@ -25,17 +25,19 @@ import os
import sys
import tempfile
import traceback
-from typing import Callable, List
+from typing import Callable, List, Optional
+from . import cdata
from .scenario import Scenario
-from .tester import Tester
-from .tester_cpp import CPPTester
+from .tester import Tester, CDataExporter, CDataImporter
+from .tester_cpp import CppTester
from .tester_go import GoTester
from .tester_rust import RustTester
from .tester_java import JavaTester
from .tester_js import JSTester
from .tester_csharp import CSharpTester
-from .util import guid, SKIP_ARROW, SKIP_FLIGHT, printer
+from .util import guid, printer
+from .util import SKIP_C_ARRAY, SKIP_C_SCHEMA, SKIP_FLIGHT, SKIP_IPC
from ..utils.source import ARROW_ROOT_DEFAULT
from . import datagen
@@ -76,7 +78,7 @@ class IntegrationRunner(object):
self.json_files = [json_file for json_file in self.json_files
if self.match in json_file.name]
- def run(self):
+ def run_ipc(self):
"""
Run Arrow IPC integration tests for the matrix of enabled
implementations.
@@ -84,23 +86,24 @@ class IntegrationRunner(object):
for producer, consumer in itertools.product(
filter(lambda t: t.PRODUCER, self.testers),
filter(lambda t: t.CONSUMER, self.testers)):
- self._compare_implementations(
+ self._compare_ipc_implementations(
producer, consumer, self._produce_consume,
self.json_files)
if self.gold_dirs:
for gold_dir, consumer in itertools.product(
self.gold_dirs,
filter(lambda t: t.CONSUMER, self.testers)):
- log('\n\n\n\n')
+ log('\n')
log('******************************************************')
log('Tests against golden files in {}'.format(gold_dir))
log('******************************************************')
def run_gold(_, consumer, test_case: datagen.File):
return self._run_gold(gold_dir, consumer, test_case)
- self._compare_implementations(
+ self._compare_ipc_implementations(
consumer, consumer, run_gold,
self._gold_tests(gold_dir))
+ log('\n')
def run_flight(self):
"""
@@ -112,6 +115,18 @@ class IntegrationRunner(object):
self.testers)
for server, client in itertools.product(servers, clients):
self._compare_flight_implementations(server, client)
+ log('\n')
+
+ def run_c_data(self):
+ """
+ Run Arrow C Data interface integration tests for the matrix of
+ enabled implementations.
+ """
+ for producer, consumer in itertools.product(
+ filter(lambda t: t.C_DATA_EXPORTER, self.testers),
+ filter(lambda t: t.C_DATA_IMPORTER, self.testers)):
+ self._compare_c_data_implementations(producer, consumer)
+ log('\n')
def _gold_tests(self, gold_dir):
prefix = os.path.basename(os.path.normpath(gold_dir))
@@ -125,28 +140,31 @@ class IntegrationRunner(object):
with open(out_path, "wb") as out:
out.write(i.read())
+ # Find the generated file with the same name as this gold file
try:
- skip = next(f for f in self.json_files
- if f.name == name).skip
+ equiv_json_file = next(f for f in self.json_files
+ if f.name == name)
except StopIteration:
- skip = set()
+ equiv_json_file = None
+
+ skip_testers = set()
if name == 'union' and prefix == '0.17.1':
- skip.add("Java")
- skip.add("JS")
+ skip_testers.add("Java")
+ skip_testers.add("JS")
if prefix == '1.0.0-bigendian' or prefix == '1.0.0-littleendian':
- skip.add("C#")
- skip.add("Java")
- skip.add("JS")
- skip.add("Rust")
+ skip_testers.add("C#")
+ skip_testers.add("Java")
+ skip_testers.add("JS")
+ skip_testers.add("Rust")
if prefix == '2.0.0-compression':
- skip.add("C#")
- skip.add("JS")
+ skip_testers.add("C#")
+ skip_testers.add("JS")
# See https://github.com/apache/arrow/pull/9822 for how to
# disable specific compression type tests.
if prefix == '4.0.0-shareddict':
- skip.add("C#")
+ skip_testers.add("C#")
quirks = set()
if prefix in {'0.14.1', '0.17.1',
@@ -157,12 +175,18 @@ class IntegrationRunner(object):
quirks.add("no_date64_validate")
quirks.add("no_times_validate")
- yield datagen.File(name, None, None, skip=skip, path=out_path,
- quirks=quirks)
+ json_file = datagen.File(name, schema=None, batches=None,
+ path=out_path,
+ skip_testers=skip_testers,
+ quirks=quirks)
+ if equiv_json_file is not None:
+ json_file.add_skips_from(equiv_json_file)
+ yield json_file
def _run_test_cases(self,
case_runner: Callable[[datagen.File], Outcome],
- test_cases: List[datagen.File]) -> None:
+ test_cases: List[datagen.File],
+ *, serial: Optional[bool] = None) -> None:
"""
Populate self.failures with the outcomes of the
``case_runner`` ran against ``test_cases``
@@ -171,10 +195,13 @@ class IntegrationRunner(object):
with printer.cork():
return case_runner(test_case)
+ if serial is None:
+ serial = self.serial
+
if self.failures and self.stop_on_error:
return
- if self.serial:
+ if serial:
for outcome in map(case_wrapper, test_cases):
if outcome.failure is not None:
self.failures.append(outcome.failure)
@@ -189,7 +216,7 @@ class IntegrationRunner(object):
if self.stop_on_error:
break
- def _compare_implementations(
+ def _compare_ipc_implementations(
self,
producer: Tester,
consumer: Tester,
@@ -221,22 +248,17 @@ class IntegrationRunner(object):
outcome = Outcome()
json_path = test_case.path
- log('==========================================================')
+ log('=' * 70)
log('Testing file {0}'.format(json_path))
- log('==========================================================')
-
- if producer.name in test_case.skip:
- log('-- Skipping test because producer {0} does '
- 'not support'.format(producer.name))
- outcome.skipped = True
- elif consumer.name in test_case.skip:
- log('-- Skipping test because consumer {0} does '
- 'not support'.format(consumer.name))
+ if test_case.should_skip(producer.name, SKIP_IPC):
+ log(f'-- Skipping test because producer {producer.name} does '
+ f'not support IPC')
outcome.skipped = True
- elif SKIP_ARROW in test_case.skip:
- log('-- Skipping test')
+ elif test_case.should_skip(consumer.name, SKIP_IPC):
+ log(f'-- Skipping test because consumer {consumer.name} does '
+ f'not support IPC')
outcome.skipped = True
else:
@@ -247,6 +269,8 @@ class IntegrationRunner(object):
outcome.failure = Failure(test_case, producer, consumer,
sys.exc_info())
+ log('=' * 70)
+
return outcome
def _produce_consume(self,
@@ -344,22 +368,17 @@ class IntegrationRunner(object):
"""
outcome = Outcome()
- log('=' * 58)
+ log('=' * 70)
log('Testing file {0}'.format(test_case.name))
- log('=' * 58)
-
- if producer.name in test_case.skip:
- log('-- Skipping test because producer {0} does '
- 'not support'.format(producer.name))
- outcome.skipped = True
- elif consumer.name in test_case.skip:
- log('-- Skipping test because consumer {0} does '
- 'not support'.format(consumer.name))
+ if test_case.should_skip(producer.name, SKIP_FLIGHT):
+ log(f'-- Skipping test because producer {producer.name} does '
+ f'not support Flight')
outcome.skipped = True
- elif SKIP_FLIGHT in test_case.skip:
- log('-- Skipping test')
+ elif test_case.should_skip(consumer.name, SKIP_FLIGHT):
+ log(f'-- Skipping test because consumer {consumer.name} does '
+ f'not support Flight')
outcome.skipped = True
else:
@@ -380,6 +399,125 @@ class IntegrationRunner(object):
outcome.failure = Failure(test_case, producer, consumer,
sys.exc_info())
+ log('=' * 70)
+
+ return outcome
+
+ def _compare_c_data_implementations(
+ self,
+ producer: Tester,
+ consumer: Tester
+ ):
+ log('##########################################################')
+ log(f'C Data Interface: '
+ f'{producer.name} exporting, {consumer.name} importing')
+ log('##########################################################')
+
+ # Serial execution is required for proper memory accounting
+ serial = True
+
+ exporter = producer.make_c_data_exporter()
+ importer = consumer.make_c_data_importer()
+
+ case_runner = partial(self._run_c_schema_test_case, producer, consumer,
+ exporter, importer)
+ self._run_test_cases(case_runner, self.json_files, serial=serial)
+
+ case_runner = partial(self._run_c_array_test_cases, producer, consumer,
+ exporter, importer)
+ self._run_test_cases(case_runner, self.json_files, serial=serial)
+
+ def _run_c_schema_test_case(self,
+ producer: Tester, consumer: Tester,
+ exporter: CDataExporter,
+ importer: CDataImporter,
+ test_case: datagen.File) -> Outcome:
+ """
+ Run one C ArrowSchema test case.
+ """
+ outcome = Outcome()
+
+ def do_run():
+ json_path = test_case.path
+ ffi = cdata.ffi()
+ c_schema_ptr = ffi.new("struct ArrowSchema*")
+ with cdata.check_memory_released(exporter, importer):
+ exporter.export_schema_from_json(json_path, c_schema_ptr)
+ importer.import_schema_and_compare_to_json(json_path, c_schema_ptr)
+
+ log('=' * 70)
+ log(f'Testing C ArrowSchema from file {test_case.name!r}')
+
+ if test_case.should_skip(producer.name, SKIP_C_SCHEMA):
+ log(f'-- Skipping test because producer {producer.name} does '
+ f'not support C ArrowSchema')
+ outcome.skipped = True
+
+ elif test_case.should_skip(consumer.name, SKIP_C_SCHEMA):
+ log(f'-- Skipping test because consumer {consumer.name} does '
+ f'not support C ArrowSchema')
+ outcome.skipped = True
+
+ else:
+ try:
+ do_run()
+ except Exception:
+ traceback.print_exc(file=printer.stdout)
+ outcome.failure = Failure(test_case, producer, consumer,
+ sys.exc_info())
+
+ log('=' * 70)
+
+ return outcome
+
+ def _run_c_array_test_cases(self,
+ producer: Tester, consumer: Tester,
+ exporter: CDataExporter,
+ importer: CDataImporter,
+ test_case: datagen.File) -> Outcome:
+ """
+ Run one set of C ArrowArray test cases.
+ """
+ outcome = Outcome()
+
+ def do_run():
+ json_path = test_case.path
+ ffi = cdata.ffi()
+ c_array_ptr = ffi.new("struct ArrowArray*")
+ for num_batch in range(test_case.num_batches):
+ log(f'... with record batch #{num_batch}')
+ with cdata.check_memory_released(exporter, importer):
+ exporter.export_batch_from_json(json_path,
+ num_batch,
+ c_array_ptr)
+ importer.import_batch_and_compare_to_json(json_path,
+ num_batch,
+ c_array_ptr)
+
+ log('=' * 70)
+ log(f'Testing C ArrowArray '
+ f'from file {test_case.name!r}')
+
+ if test_case.should_skip(producer.name, SKIP_C_ARRAY):
+ log(f'-- Skipping test because producer {producer.name} does '
+ f'not support C ArrowArray')
+ outcome.skipped = True
+
+ elif test_case.should_skip(consumer.name, SKIP_C_ARRAY):
+ log(f'-- Skipping test because consumer {consumer.name} does '
+ f'not support C ArrowArray')
+ outcome.skipped = True
+
+ else:
+ try:
+ do_run()
+ except Exception:
+ traceback.print_exc(file=printer.stdout)
+ outcome.failure = Failure(test_case, producer, consumer,
+ sys.exc_info())
+
+ log('=' * 70)
+
return outcome
@@ -387,7 +525,7 @@ def get_static_json_files():
glob_pattern = os.path.join(ARROW_ROOT_DEFAULT,
'integration', 'data', '*.json')
return [
- datagen.File(name=os.path.basename(p), path=p, skip=set(),
+ datagen.File(name=os.path.basename(p), path=p,
schema=None, batches=None)
for p in glob.glob(glob_pattern)
]
@@ -395,13 +533,14 @@ def get_static_json_files():
def run_all_tests(with_cpp=True, with_java=True, with_js=True,
with_csharp=True, with_go=True, with_rust=False,
- run_flight=False, tempdir=None, **kwargs):
+ run_ipc=False, run_flight=False, run_c_data=False,
+ tempdir=None, **kwargs):
tempdir = tempdir or tempfile.mkdtemp(prefix='arrow-integration-')
testers: List[Tester] = []
if with_cpp:
- testers.append(CPPTester(**kwargs))
+ testers.append(CppTester(**kwargs))
if with_java:
testers.append(JavaTester(**kwargs))
@@ -434,54 +573,57 @@ def run_all_tests(with_cpp=True, with_java=True,
with_js=True,
Scenario(
"ordered",
description="Ensure FlightInfo.ordered is supported.",
- skip={"JS", "C#", "Rust"},
+ skip_testers={"JS", "C#", "Rust"},
),
Scenario(
"expiration_time:do_get",
description=("Ensure FlightEndpoint.expiration_time with "
"DoGet is working as expected."),
- skip={"JS", "C#", "Rust"},
+ skip_testers={"JS", "C#", "Rust"},
),
Scenario(
"expiration_time:list_actions",
description=("Ensure FlightEndpoint.expiration_time related "
"pre-defined actions is working with ListActions "
"as expected."),
- skip={"JS", "C#", "Rust"},
+ skip_testers={"JS", "C#", "Rust"},
),
Scenario(
"expiration_time:cancel_flight_info",
description=("Ensure FlightEndpoint.expiration_time and "
"CancelFlightInfo are working as expected."),
- skip={"JS", "C#", "Rust"},
+ skip_testers={"JS", "C#", "Rust"},
),
Scenario(
"expiration_time:renew_flight_endpoint",
description=("Ensure FlightEndpoint.expiration_time and "
"RenewFlightEndpoint are working as expected."),
- skip={"JS", "C#", "Rust"},
+ skip_testers={"JS", "C#", "Rust"},
),
Scenario(
"poll_flight_info",
description="Ensure PollFlightInfo is supported.",
- skip={"JS", "C#", "Rust"}
+ skip_testers={"JS", "C#", "Rust"}
),
Scenario(
"flight_sql",
description="Ensure Flight SQL protocol is working as expected.",
- skip={"Rust"}
+ skip_testers={"Rust"}
),
Scenario(
"flight_sql:extension",
description="Ensure Flight SQL extensions work as expected.",
- skip={"Rust"}
+ skip_testers={"Rust"}
),
]
runner = IntegrationRunner(json_files, flight_scenarios, testers, **kwargs)
- runner.run()
+ if run_ipc:
+ runner.run_ipc()
if run_flight:
runner.run_flight()
+ if run_c_data:
+ runner.run_c_data()
fail_count = 0
if runner.failures:
@@ -492,7 +634,8 @@ def run_all_tests(with_cpp=True, with_java=True,
with_js=True,
log(test_case.name, producer.name, "producing, ",
consumer.name, "consuming")
if exc_info:
- traceback.print_exception(*exc_info)
+ exc_type, exc_value, exc_tb = exc_info
+ log(f'{exc_type}: {exc_value}')
log()
log(fail_count, "failures")
diff --git a/dev/archery/archery/integration/scenario.py
b/dev/archery/archery/integration/scenario.py
index 1fcbca64e6..89c64452e5 100644
--- a/dev/archery/archery/integration/scenario.py
+++ b/dev/archery/archery/integration/scenario.py
@@ -23,7 +23,10 @@ class Scenario:
Does not correspond to a particular IPC JSON file.
"""
- def __init__(self, name, description, skip=None):
+ def __init__(self, name, description, skip_testers=None):
self.name = name
self.description = description
- self.skip = skip or set()
+ self.skipped_testers = skip_testers or set()
+
+ def should_skip(self, tester, format):
+ return tester in self.skipped_testers
diff --git a/dev/archery/archery/integration/tester.py
b/dev/archery/archery/integration/tester.py
index 54bfe621ef..6a3061992d 100644
--- a/dev/archery/archery/integration/tester.py
+++ b/dev/archery/archery/integration/tester.py
@@ -17,12 +17,181 @@
# Base class for language-specific integration test harnesses
+from abc import ABC, abstractmethod
+import os
import subprocess
+import typing
from .util import log
-class Tester(object):
+_Predicate = typing.Callable[[], bool]
+
+
+class CDataExporter(ABC):
+
+ @abstractmethod
+ def export_schema_from_json(self, json_path: os.PathLike,
+ c_schema_ptr: object):
+ """
+ Read a JSON integration file and export its schema.
+
+ Parameters
+ ----------
+ json_path : Path
+ Path to the JSON file
+ c_schema_ptr : cffi pointer value
+ Pointer to the ``ArrowSchema`` struct to export to.
+ """
+
+ @abstractmethod
+ def export_batch_from_json(self, json_path: os.PathLike,
+ num_batch: int,
+ c_array_ptr: object):
+ """
+ Read a JSON integration file and export one of its batches.
+
+ Parameters
+ ----------
+ json_path : Path
+ Path to the JSON file
+ num_batch : int
+ Number of the record batch in the JSON file
+ c_schema_ptr : cffi pointer value
+ Pointer to the ``ArrowArray`` struct to export to.
+ """
+
+ @property
+ @abstractmethod
+ def supports_releasing_memory(self) -> bool:
+ """
+ Whether the implementation is able to release memory deterministically.
+
+ Here, "release memory" means that, after the `release` callback of
+ a C Data Interface export is called, `compare_allocation_state` is
+ able to trigger the deallocation of the memory underlying the export
+ (for example buffer data).
+
+ If false, then `record_allocation_state` and `compare_allocation_state`
+ are allowed to raise NotImplementedError.
+ """
+
+ def record_allocation_state(self) -> object:
+ """
+ Record the current memory allocation state.
+
+ Returns
+ -------
+ state : object
+ Opaque object representing the allocation state,
+ for example the number of allocated bytes.
+ """
+ raise NotImplementedError
+
+ def compare_allocation_state(self, recorded: object,
+ gc_until: typing.Callable[[_Predicate], bool]
+ ) -> bool:
+ """
+ Compare the current memory allocation state with the recorded one.
+
+ Parameters
+ ----------
+ recorded : object
+ The previous allocation state returned by
+ `record_allocation_state()`
+ gc_until : callable
+ A callable itself accepting a callable predicate, and
+ returning a boolean.
+ `gc_until` should try to release memory until the predicate
+ becomes true, or until it decides to give up. The final value
+ of the predicate should be returned.
+ `gc_until` is typically provided by the C Data Interface importer.
+
+ Returns
+ -------
+ success : bool
+ Whether memory allocation state finally reached its previously
+ recorded value.
+ """
+ raise NotImplementedError
+
+
+class CDataImporter(ABC):
+
+ @abstractmethod
+ def import_schema_and_compare_to_json(self, json_path: os.PathLike,
+ c_schema_ptr: object):
+ """
+ Import schema and compare it to the schema of a JSON integration file.
+
+ An error is raised if importing fails or the schemas differ.
+
+ Parameters
+ ----------
+ json_path : Path
+ The path to the JSON file
+ c_schema_ptr : cffi pointer value
+ Pointer to the ``ArrowSchema`` struct to import from.
+ """
+
+ @abstractmethod
+ def import_batch_and_compare_to_json(self, json_path: os.PathLike,
+ num_batch: int,
+ c_array_ptr: object):
+ """
+ Import record batch and compare it to one of the batches
+ from a JSON integration file.
+
+ The schema used for importing the record batch is the one from
+ the JSON file.
+
+ An error is raised if importing fails or the batches differ.
+
+ Parameters
+ ----------
+ json_path : Path
+ The path to the JSON file
+ num_batch : int
+ Number of the record batch in the JSON file
+ c_array_ptr : cffi pointer value
+ Pointer to the ``ArrowArray`` struct to import from.
+ """
+
+ @property
+ @abstractmethod
+ def supports_releasing_memory(self) -> bool:
+ """
+ Whether the implementation is able to release memory deterministically.
+
+ Here, "release memory" means calling the `release` callback of
+ a C Data Interface export (which should then trigger a deallocation
+ mechanism on the exporter).
+
+ If false, then `gc_until` is allowed to raise NotImplementedError.
+ """
+
+ def gc_until(self, predicate: _Predicate):
+ """
+ Try to release memory until the predicate becomes true, or fail.
+
+ Depending on the CDataImporter implementation, this may for example
+ try once, or run a garbage collector a given number of times, or
+ any other implementation-specific strategy for releasing memory.
+
+ The running time should be kept reasonable and compatible with
+ execution of multiple C Data integration tests.
+
+ This should not raise if `supports_releasing_memory` is true.
+
+ Returns
+ -------
+ success : bool
+ The final value of the predicate.
+ """
+ raise NotImplementedError
+
+
+class Tester:
"""
The interface to declare a tester to run integration tests against.
"""
@@ -34,8 +203,12 @@ class Tester(object):
FLIGHT_SERVER = False
# whether the language supports receiving Flight
FLIGHT_CLIENT = False
+ # whether the language supports the C Data Interface as an exporter
+ C_DATA_EXPORTER = False
+ # whether the language supports the C Data Interface as an importer
+ C_DATA_IMPORTER = False
- # the name shown in the logs
+ # the name used for skipping and shown in the logs
name = "unknown"
def __init__(self, debug=False, **args):
@@ -85,3 +258,9 @@ class Tester(object):
def flight_request(self, port, json_path=None, scenario_name=None):
raise NotImplementedError
+
+ def make_c_data_exporter(self) -> CDataExporter:
+ raise NotImplementedError
+
+ def make_c_data_importer(self) -> CDataImporter:
+ raise NotImplementedError
diff --git a/dev/archery/archery/integration/tester_cpp.py
b/dev/archery/archery/integration/tester_cpp.py
index 52cc565dc0..9ddc3c4800 100644
--- a/dev/archery/archery/integration/tester_cpp.py
+++ b/dev/archery/archery/integration/tester_cpp.py
@@ -16,10 +16,12 @@
# under the License.
import contextlib
+import functools
import os
import subprocess
-from .tester import Tester
+from . import cdata
+from .tester import Tester, CDataExporter, CDataImporter
from .util import run_cmd, log
from ..utils.source import ARROW_ROOT_DEFAULT
@@ -39,12 +41,19 @@ _FLIGHT_CLIENT_CMD = [
"localhost",
]
+_dll_suffix = ".dll" if os.name == "nt" else ".so"
-class CPPTester(Tester):
+_DLL_PATH = _EXE_PATH
+_ARROW_DLL = os.path.join(_DLL_PATH, "libarrow" + _dll_suffix)
+
+
+class CppTester(Tester):
PRODUCER = True
CONSUMER = True
FLIGHT_SERVER = True
FLIGHT_CLIENT = True
+ C_DATA_EXPORTER = True
+ C_DATA_IMPORTER = True
name = 'C++'
@@ -133,3 +142,104 @@ class CPPTester(Tester):
if self.debug:
log(' '.join(cmd))
run_cmd(cmd)
+
+ def make_c_data_exporter(self):
+ return CppCDataExporter(self.debug, self.args)
+
+ def make_c_data_importer(self):
+ return CppCDataImporter(self.debug, self.args)
+
+
+_cpp_c_data_entrypoints = """
+ const char* ArrowCpp_CDataIntegration_ExportSchemaFromJson(
+ const char* json_path, struct ArrowSchema* out);
+ const char* ArrowCpp_CDataIntegration_ImportSchemaAndCompareToJson(
+ const char* json_path, struct ArrowSchema* schema);
+
+ const char* ArrowCpp_CDataIntegration_ExportBatchFromJson(
+ const char* json_path, int num_batch, struct ArrowArray* out);
+ const char* ArrowCpp_CDataIntegration_ImportBatchAndCompareToJson(
+ const char* json_path, int num_batch, struct ArrowArray* batch);
+
+ int64_t ArrowCpp_BytesAllocated();
+ """
+
+
+@functools.lru_cache
+def _load_ffi(ffi, lib_path=_ARROW_DLL):
+ ffi.cdef(_cpp_c_data_entrypoints)
+ dll = ffi.dlopen(lib_path)
+ dll.ArrowCpp_CDataIntegration_ExportSchemaFromJson
+ return dll
+
+
+class _CDataBase:
+
+ def __init__(self, debug, args):
+ self.debug = debug
+ self.args = args
+ self.ffi = cdata.ffi()
+ self.dll = _load_ffi(self.ffi)
+
+ def _check_c_error(self, c_error):
+ """
+ Check a `const char*` error return from an integration entrypoint.
+
+ A null means success, a non-empty string is an error message.
+ The string is statically allocated on the C++ side.
+ """
+ assert self.ffi.typeof(c_error) is self.ffi.typeof("const char*")
+ if c_error != self.ffi.NULL:
+ error = self.ffi.string(c_error).decode('utf8',
+ errors='replace')
+ raise RuntimeError(
+ f"C++ C Data Integration call failed: {error}")
+
+
+class CppCDataExporter(CDataExporter, _CDataBase):
+
+ def export_schema_from_json(self, json_path, c_schema_ptr):
+ c_error = self.dll.ArrowCpp_CDataIntegration_ExportSchemaFromJson(
+ str(json_path).encode(), c_schema_ptr)
+ self._check_c_error(c_error)
+
+ def export_batch_from_json(self, json_path, num_batch, c_array_ptr):
+ c_error = self.dll.ArrowCpp_CDataIntegration_ExportBatchFromJson(
+ str(json_path).encode(), num_batch, c_array_ptr)
+ self._check_c_error(c_error)
+
+ @property
+ def supports_releasing_memory(self):
+ return True
+
+ def record_allocation_state(self):
+ return self.dll.ArrowCpp_BytesAllocated()
+
+ def compare_allocation_state(self, recorded, gc_until):
+ def pred():
+ # No GC on our side, so just compare allocation state
+ return self.record_allocation_state() == recorded
+
+ return gc_until(pred)
+
+
+class CppCDataImporter(CDataImporter, _CDataBase):
+
+ def import_schema_and_compare_to_json(self, json_path, c_schema_ptr):
+ c_error = self.dll.ArrowCpp_CDataIntegration_ImportSchemaAndCompareToJson(
+ str(json_path).encode(), c_schema_ptr)
+ self._check_c_error(c_error)
+
+ def import_batch_and_compare_to_json(self, json_path, num_batch,
+ c_array_ptr):
+ c_error = self.dll.ArrowCpp_CDataIntegration_ImportBatchAndCompareToJson(
+ str(json_path).encode(), num_batch, c_array_ptr)
+ self._check_c_error(c_error)
+
+ @property
+ def supports_releasing_memory(self):
+ return True
+
+ def gc_until(self, predicate):
+ # No GC on our side, so can evaluate predicate immediately
+ return predicate()
diff --git a/dev/archery/archery/integration/util.py
b/dev/archery/archery/integration/util.py
index 80ba30052e..afef7d5eb1 100644
--- a/dev/archery/archery/integration/util.py
+++ b/dev/archery/archery/integration/util.py
@@ -32,8 +32,10 @@ def guid():
# SKIP categories
-SKIP_ARROW = 'arrow'
+SKIP_C_ARRAY = 'c_array'
+SKIP_C_SCHEMA = 'c_schema'
SKIP_FLIGHT = 'flight'
+SKIP_IPC = 'ipc'
class _Printer:
diff --git a/dev/archery/setup.py b/dev/archery/setup.py
index 627e576fb6..08e41225f6 100755
--- a/dev/archery/setup.py
+++ b/dev/archery/setup.py
@@ -28,16 +28,17 @@ if sys.version_info < (3, 8):
jinja_req = 'jinja2>=2.11'
extras = {
- 'lint': ['numpydoc==1.1.0', 'autopep8', 'flake8==6.1.0', 'cython-lint',
- 'cmake_format==0.6.13'],
'benchmark': ['pandas'],
- 'docker': ['ruamel.yaml', 'python-dotenv'],
- 'release': ['pygithub', jinja_req, 'jira', 'semver', 'gitpython'],
'crossbow': ['github3.py', jinja_req, 'pygit2>=1.6.0', 'requests',
'ruamel.yaml', 'setuptools_scm'],
'crossbow-upload': ['github3.py', jinja_req, 'ruamel.yaml',
'setuptools_scm'],
- 'numpydoc': ['numpydoc==1.1.0']
+ 'docker': ['ruamel.yaml', 'python-dotenv'],
+ 'integration': ['cffi'],
+ 'lint': ['numpydoc==1.1.0', 'autopep8', 'flake8==6.1.0', 'cython-lint',
+ 'cmake_format==0.6.13'],
+ 'numpydoc': ['numpydoc==1.1.0'],
+ 'release': ['pygithub', jinja_req, 'jira', 'semver', 'gitpython'],
}
extras['bot'] = extras['crossbow'] + ['pygithub', 'jira']
extras['all'] = list(set(functools.reduce(operator.add, extras.values())))