This is an automated email from the ASF dual-hosted git repository.
paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-nanoarrow.git
The following commit(s) were added to refs/heads/main by this push:
new dd437a95 feat: Add ZSTD decompression support to IPC reader (#693)
dd437a95 is described below
commit dd437a958e2e7dd414478fd8eb805555597fc8cf
Author: Dewey Dunnington <[email protected]>
AuthorDate: Thu Dec 26 20:00:30 2024 -0600
feat: Add ZSTD decompression support to IPC reader (#693)
This PR implements ZSTD buffer decompression in the `ArrowIpcDecoder`
and in the `ArrowArrayStreamReader` when built with
`-DNANOARROW_IPC_WITH_ZSTD=ON`. It also allows a user to inject
decompression support into the `ArrowIpcDecoder` if, for whatever reason,
they don't have control over the build flags (or want to use a ZSTD that
has been made available to them in a different way).
This doesn't implement multithreaded decompression but does allow a user
to implement it by not using the default `ArrowIpcSerialDecompressor()`.
This could be included in header-only C++ if there is some interest.
A non-trivial example using the Python bindings (which were also wired up
to support it):
```python
import urllib.request
import nanoarrow as na
url =
"https://github.com/geoarrow/geoarrow-data/releases/download/v0.1.0/ns-water-basin_point-wkb.arrow"
# Work around the 'no arrow file support'
with urllib.request.urlopen(url) as f:
f.read(8)
print(na.ArrayStream.from_readable(f).read_all())
#> nanoarrow.Array<non-nullable struct<OBJECTID: int64, FEAT_CODE: string,
...>[46]
#> {'OBJECTID': 1, 'FEAT_CODE': 'WABA30', 'BASIN_NAME': '01EB000', 'RIVER':
'BAR...
#> {'OBJECTID': 2, 'FEAT_CODE': 'WABA30', 'BASIN_NAME': '01EC000', 'RIVER':
'ROS...
#> {'OBJECTID': 3, 'FEAT_CODE': 'WABA30', 'BASIN_NAME': '01EA000', 'RIVER':
'TUS...
#> {'OBJECTID': 4, 'FEAT_CODE': 'WABA30', 'BASIN_NAME': '01DA000', 'RIVER':
'MET...
#> {'OBJECTID': 5, 'FEAT_CODE': 'WABA30', 'BASIN_NAME': '01ED000', 'RIVER':
'MER...
#> {'OBJECTID': 6, 'FEAT_CODE': 'WABA30', 'BASIN_NAME': '01EE000', 'RIVER':
'HER...
#> {'OBJECTID': 7, 'FEAT_CODE': 'WABA30', 'BASIN_NAME': '01EG000', 'RIVER':
'GOL...
#> {'OBJECTID': 8, 'FEAT_CODE': 'WABA30', 'BASIN_NAME': '01EF000', 'RIVER':
'LAH...
#> {'OBJECTID': 9, 'FEAT_CODE': 'WABA30', 'BASIN_NAME': '01EJ000', 'RIVER':
'SAC...
#> {'OBJECTID': 10, 'FEAT_CODE': 'WABA30', 'BASIN_NAME': '01EH000',
'RIVER': 'EA...
#> ...and 36 more items
```
This PR doesn't implement the R bindings (because adding a zstd
dependency there is a can of worms better suited to another PR).
---
.github/workflows/build-and-test-ipc.yaml | 6 +-
.github/workflows/build-and-test.yaml | 4 +-
CMakeLists.txt | 27 ++++-
ci/scripts/build-arrow-cpp-minimal.sh | 1 +
ci/scripts/bundle.py | 1 +
meson.build | 12 +-
meson.options | 1 +
python/meson.build | 3 +
python/src/nanoarrow/meson.build | 3 +-
python/{meson.build => subprojects/zstd.wrap} | 33 ++----
python/tests/test_ipc.py | 35 +++++-
src/nanoarrow/ipc/codecs.c | 138 +++++++++++++++++++++++
src/nanoarrow/ipc/codecs_test.cc | 122 ++++++++++++++++++++
src/nanoarrow/ipc/decoder.c | 156 ++++++++++++++++++++++----
src/nanoarrow/ipc/decoder_test.cc | 105 +++++++++++++++--
src/nanoarrow/ipc/files_test.cc | 67 +++++++++--
src/nanoarrow/nanoarrow_ipc.h | 75 +++++++++++++
src/nanoarrow/nanoarrow_ipc.hpp | 22 ++++
python/meson.build => subprojects/zstd.wrap | 33 ++----
19 files changed, 753 insertions(+), 91 deletions(-)
diff --git a/.github/workflows/build-and-test-ipc.yaml
b/.github/workflows/build-and-test-ipc.yaml
index 6defa4bd..ee4332d8 100644
--- a/.github/workflows/build-and-test-ipc.yaml
+++ b/.github/workflows/build-and-test-ipc.yaml
@@ -43,7 +43,7 @@ jobs:
fail-fast: false
matrix:
config:
- - {label: default-build, cmake_args: "-DNANOARROW_BUILD_APPS=ON"}
+ - {label: default-build, cmake_args: "-DNANOARROW_BUILD_APPS=ON
-DNANOARROW_IPC_WITH_ZSTD=ON"}
- {label: default-noatomics, cmake_args:
"-DCMAKE_C_FLAGS='-DNANOARROW_IPC_USE_STDATOMIC=0'"}
- {label: namespaced-build, cmake_args:
"-DNANOARROW_NAMESPACE=SomeUserNamespace"}
- {label: bundled-build, cmake_args: "-DNANOARROW_BUNDLE=ON"}
@@ -72,13 +72,13 @@ jobs:
with:
path: arrow
# Bump the number at the end of this line to force a new Arrow C++
build
- key: arrow-${{ runner.os }}-${{ runner.arch }}-1
+ key: arrow-${{ runner.os }}-${{ runner.arch }}-3
- name: Build Arrow C++
if: steps.cache-arrow-build.outputs.cache-hit != 'true'
shell: bash
run: |
- ci/scripts/build-arrow-cpp-minimal.sh 18.0.0 arrow
+ ci/scripts/build-arrow-cpp-minimal.sh 18.1.0 arrow
- name: Build
run: |
diff --git a/.github/workflows/build-and-test.yaml
b/.github/workflows/build-and-test.yaml
index ab9b8e8e..8814e910 100644
--- a/.github/workflows/build-and-test.yaml
+++ b/.github/workflows/build-and-test.yaml
@@ -127,7 +127,9 @@ jobs:
verify-meson:
name: meson-build
- runs-on: ubuntu-latest
+ # Workaround until https://github.com/apache/arrow-nanoarrow/issues/663 is
solved
+ # (after which we can use ubuntu-latest)
+ runs-on: ubuntu-22.04
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
diff --git a/CMakeLists.txt b/CMakeLists.txt
index a24f7873..c77007d2 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -36,6 +36,8 @@ option(NANOARROW_FLATCC_ROOT_DIR "Root directory for flatcc
include and lib dire
OFF)
option(NANOARROW_FLATCC_INCLUDE_DIR "Include directory for flatcc includes"
OFF)
option(NANOARROW_FLATCC_LIB_DIR "Library directory that contains
libflatccrt.a" OFF)
+option(NANOARROW_IPC_WITH_ZSTD "Build nanoarrow with ZSTD compression support
built in"
+ OFF)
option(NANOARROW_DEVICE "Build device extension" OFF)
option(NANOARROW_TESTING "Build testng extension" OFF)
@@ -205,15 +207,25 @@ if(NANOARROW_IPC)
"${NANOARROW_FLATCC_INCLUDE_DIR}")
endif()
+ if(NANOARROW_IPC_WITH_ZSTD)
+ find_package(zstd REQUIRED)
+ set(NANOARROW_IPC_EXTRA_FLAGS "-DNANOARROW_IPC_WITH_ZSTD")
+ set(NANOARROW_IPC_EXTRA_LIBS zstd::libzstd_static)
+ endif()
+
if(NOT NANOARROW_BUNDLE)
set(NANOARROW_IPC_BUILD_SOURCES
- src/nanoarrow/ipc/decoder.c src/nanoarrow/ipc/encoder.c
- src/nanoarrow/ipc/reader.c src/nanoarrow/ipc/writer.c)
+ src/nanoarrow/ipc/codecs.c
+ src/nanoarrow/ipc/decoder.c
+ src/nanoarrow/ipc/encoder.c
+ src/nanoarrow/ipc/reader.c
+ src/nanoarrow/ipc/writer.c)
endif()
add_library(nanoarrow_ipc ${NANOARROW_IPC_BUILD_SOURCES})
+ target_compile_definitions(nanoarrow_ipc PRIVATE
${NANOARROW_IPC_EXTRA_FLAGS})
target_link_libraries(nanoarrow_ipc
- PRIVATE flatccrt
+ PRIVATE flatccrt ${NANOARROW_IPC_EXTRA_LIBS}
PUBLIC nanoarrow nanoarrow_coverage_config)
target_include_directories(nanoarrow_ipc
PUBLIC
$<BUILD_INTERFACE:${NANOARROW_BUILD_INCLUDE_DIR}>
@@ -574,6 +586,7 @@ if(NANOARROW_BUILD_TESTS)
include(GoogleTest)
foreach(name
+ codecs
decoder
encoder
reader
@@ -587,6 +600,7 @@ if(NANOARROW_BUILD_TESTS)
nanoarrow
${NANOARROW_ARROW_TARGET}
gtest_main
+ gmock_main
nanoarrow_coverage_config)
if(NOT (name MATCHES "_hpp_"))
@@ -595,7 +609,11 @@ if(NANOARROW_BUILD_TESTS)
if(Arrow_FOUND)
target_compile_definitions(nanoarrow_ipc_${name}_test
- PRIVATE -DNANOARROW_BUILD_TESTS_WITH_ARROW)
+ PRIVATE -DNANOARROW_BUILD_TESTS_WITH_ARROW
+ ${NANOARROW_IPC_EXTRA_FLAGS})
+ else()
+ target_compile_definitions(nanoarrow_ipc_${name}_test
+ PRIVATE ${NANOARROW_IPC_EXTRA_FLAGS})
endif()
gtest_discover_tests(nanoarrow_ipc_${name}_test)
@@ -603,7 +621,6 @@ if(NANOARROW_BUILD_TESTS)
target_link_libraries(nanoarrow_ipc_files_test nanoarrow_testing ZLIB::ZLIB
nanoarrow_coverage_config)
- target_link_libraries(nanoarrow_ipc_decoder_test gmock_main)
endif()
if(NANOARROW_DEVICE)
diff --git a/ci/scripts/build-arrow-cpp-minimal.sh
b/ci/scripts/build-arrow-cpp-minimal.sh
index ff0238be..d31acac2 100755
--- a/ci/scripts/build-arrow-cpp-minimal.sh
+++ b/ci/scripts/build-arrow-cpp-minimal.sh
@@ -53,6 +53,7 @@ cmake ../apache-arrow-${ARROW_CPP_VERSION}/cpp \
-DARROW_JEMALLOC=OFF \
-DARROW_SIMD_LEVEL=NONE \
-DARROW_FILESYSTEM=OFF \
+ -DARROW_WITH_ZSTD=ON \
-DCMAKE_INSTALL_PREFIX="${ARROW_CPP_INSTALL_DIR}"
cmake --build . --parallel $(nproc)
cmake --install . --prefix="${ARROW_CPP_INSTALL_DIR}" --config=Debug
diff --git a/ci/scripts/bundle.py b/ci/scripts/bundle.py
index 746fcd37..0136422e 100644
--- a/ci/scripts/bundle.py
+++ b/ci/scripts/bundle.py
@@ -209,6 +209,7 @@ def bundle_nanoarrow_ipc(
nanoarrow_ipc_c = concatenate_content(
[
src_dir / "ipc" / "flatcc_generated.h",
+ src_dir / "ipc" / "codecs.c",
src_dir / "ipc" / "decoder.c",
src_dir / "ipc" / "encoder.c",
src_dir / "ipc" / "reader.c",
diff --git a/meson.build b/meson.build
index 53dd824d..f61c4008 100644
--- a/meson.build
+++ b/meson.build
@@ -109,6 +109,14 @@ nanoarrow_dep = declare_dependency(
if get_option('ipc')
flatcc_dep = dependency('flatcc')
+ ipc_lib_deps = [nanoarrow_dep, flatcc_dep]
+ ipc_lib_c_args = []
+
+ if get_option('ipc_with_zstd')
+ zstd_dep = dependency('libzstd')
+ ipc_lib_deps += zstd_dep
+ ipc_lib_c_args += '-DNANOARROW_IPC_WITH_ZSTD'
+ endif
install_headers(
'src/nanoarrow/nanoarrow_ipc.h',
@@ -118,12 +126,14 @@ if get_option('ipc')
nanoarrow_ipc_lib = library(
'nanoarrow_ipc',
+ 'src/nanoarrow/ipc/codecs.c',
'src/nanoarrow/ipc/decoder.c',
'src/nanoarrow/ipc/encoder.c',
'src/nanoarrow/ipc/reader.c',
'src/nanoarrow/ipc/writer.c',
- dependencies: [nanoarrow_dep, flatcc_dep],
+ dependencies: ipc_lib_deps,
install: true,
+ c_args: ipc_lib_c_args,
)
nanoarrow_ipc_dep = declare_dependency(
include_directories: [incdir],
diff --git a/meson.options b/meson.options
index 93fec950..739088ac 100644
--- a/meson.options
+++ b/meson.options
@@ -20,6 +20,7 @@ option('tests_with_arrow', type: 'boolean', description:
'Build tests with Arrow
option('benchmarks', type: 'boolean', description: 'Build benchmarks', value:
false)
option('apps', type: 'boolean', description: 'Build utility applications',
value: false)
option('ipc', type: 'boolean', description: 'Build IPC libraries', value:
false)
+option('ipc_with_zstd', type: 'boolean', description: 'Build IPC libraries
with ZSTD compression support', value: false)
option('integration_tests', type: 'boolean',
description: 'Build cross-implementation Arrow integration tests',
value: false)
diff --git a/python/meson.build b/python/meson.build
index 49fa67e4..d6e456ea 100644
--- a/python/meson.build
+++ b/python/meson.build
@@ -26,11 +26,14 @@ project(
'warning_level=2',
'c_std=c99',
'default_library=static',
+ 'force_fallback_for=zstd',
# We need to set these options at the project default_option level
# due to https://github.com/mesonbuild/meson/issues/6728
'arrow-nanoarrow:ipc=true',
+ 'arrow-nanoarrow:ipc_with_zstd=true',
'arrow-nanoarrow:device=true',
'arrow-nanoarrow:namespace=PythonPkg',
+ 'zstd:bin_programs=false',
],
)
diff --git a/python/src/nanoarrow/meson.build b/python/src/nanoarrow/meson.build
index 9b8c223b..1c38863c 100644
--- a/python/src/nanoarrow/meson.build
+++ b/python/src/nanoarrow/meson.build
@@ -16,6 +16,7 @@
# under the License.
flatcc_dep = dependency('flatcc')
+zstd_dep = dependency('libzstd')
nanoarrow_proj = subproject('arrow-nanoarrow')
nanoarrow_dep = nanoarrow_proj.get_variable('nanoarrow_dep')
nanoarrow_ipc_dep = nanoarrow_proj.get_variable('nanoarrow_ipc_dep')
@@ -77,7 +78,7 @@ foreach cyf : cyfiles
if stem in ['_array', '_device']
cyfile_deps += [nanoarrow_device_dep]
elif stem == '_ipc_lib'
- cyfile_deps += [nanoarrow_ipc_dep, flatcc_dep]
+ cyfile_deps += [nanoarrow_ipc_dep, flatcc_dep, zstd_dep]
endif
py.extension_module(
diff --git a/python/meson.build b/python/subprojects/zstd.wrap
similarity index 53%
copy from python/meson.build
copy to python/subprojects/zstd.wrap
index 49fa67e4..330530c3 100644
--- a/python/meson.build
+++ b/python/subprojects/zstd.wrap
@@ -15,25 +15,16 @@
# specific language governing permissions and limitations
# under the License.
-project(
- 'nanoarrow',
- 'c',
- 'cython',
- version: run_command(['src/nanoarrow/_version.py', '--print'], check:
true).stdout().strip(),
- license: 'Apache-2.0',
- meson_version: '>=1.2.0',
- default_options: [
- 'warning_level=2',
- 'c_std=c99',
- 'default_library=static',
- # We need to set these options at the project default_option level
- # due to https://github.com/mesonbuild/meson/issues/6728
- 'arrow-nanoarrow:ipc=true',
- 'arrow-nanoarrow:device=true',
- 'arrow-nanoarrow:namespace=PythonPkg',
- ],
-)
+[wrap-file]
+directory = zstd-1.5.6
+source_url =
https://github.com/facebook/zstd/releases/download/v1.5.6/zstd-1.5.6.tar.gz
+source_filename = zstd-1.5.6.tar.gz
+source_hash = 8c29e06cf42aacc1eafc4077ae2ec6c6fcb96a626157e0593d5e82a34fd403c1
+patch_filename = zstd_1.5.6-2_patch.zip
+patch_url = https://wrapdb.mesonbuild.com/v2/zstd_1.5.6-2/get_patch
+patch_hash = 3e67f7d2edf3c56e6450d4c0f5f3d5fe94799e3608e3795502da03f7dd51b28c
+source_fallback_url =
https://github.com/mesonbuild/wrapdb/releases/download/zstd_1.5.6-2/zstd-1.5.6.tar.gz
+wrapdb_version = 1.5.6-2
-subdir('src/nanoarrow')
-
-meson.add_dist_script('python', meson.current_source_dir() /
'generate_dist.py')
+[provide]
+libzstd = libzstd_dep
diff --git a/python/tests/test_ipc.py b/python/tests/test_ipc.py
index 73ca88eb..8b691637 100644
--- a/python/tests/test_ipc.py
+++ b/python/tests/test_ipc.py
@@ -24,7 +24,7 @@ import pytest
from nanoarrow._utils import NanoarrowException
import nanoarrow as na
-from nanoarrow.ipc import InputStream, StreamWriter
+from nanoarrow.ipc import _EXAMPLE_IPC_SCHEMA, InputStream, StreamWriter
def test_ipc_stream_example():
@@ -92,6 +92,18 @@ def test_ipc_stream_from_url():
assert len(batches[0]) == 3
+def test_ipc_stream_compressed_example():
+ buf = io.BytesIO()
+ buf.write(_EXAMPLE_IPC_SCHEMA)
+ buf.write(COMPRESSED_BATCH)
+ buf.seek(0)
+
+ with InputStream.from_readable(buf) as inp:
+ array = na.Array(inp)
+ assert len(array) == 3
+ assert array.child(0).to_pylist() == [0, 1, 2]
+
+
def test_ipc_stream_python_exception_on_read():
class ExtraordinarilyInconvenientFile:
def readinto(self, *args, **kwargs):
@@ -231,3 +243,24 @@ def test_writer_error_on_write():
with pytest.raises(NanoarrowException):
with StreamWriter.from_writable(io.BytesIO()) as writer:
writer.write_stream(na.c_array([], na.int32()))
+
+
+# fmt: off
+COMPRESSED_BATCH = bytearray([
+ 0xff, 0xff, 0xff, 0xff, 0xa0, 0x00, 0x00, 0x00, 0x14, 0x00, 0x00, 0x00,
0x00, 0x00,
+ 0x00, 0x00, 0x0c, 0x00, 0x18, 0x00, 0x06, 0x00, 0x05, 0x00, 0x08, 0x00,
0x0c, 0x00,
+ 0x0c, 0x00, 0x00, 0x00, 0x00, 0x03, 0x04, 0x00, 0x1c, 0x00, 0x00, 0x00,
0x20, 0x00,
+ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0c, 0x00,
0x1e, 0x00,
+ 0x10, 0x00, 0x04, 0x00, 0x08, 0x00, 0x0c, 0x00, 0x0c, 0x00, 0x00, 0x00,
0x50, 0x00,
+ 0x00, 0x00, 0x24, 0x00, 0x00, 0x00, 0x18, 0x00, 0x00, 0x00, 0x03, 0x00,
0x00, 0x00,
+ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06, 0x00,
0x08, 0x00,
+ 0x07, 0x00, 0x06, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x02, 0x00,
0x00, 0x00,
+ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00,
+ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x1d, 0x00,
0x00, 0x00,
+ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00,
0x03, 0x00,
+ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00,
+ 0x0c, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x28, 0xb5, 0x2f, 0xfd,
0x20, 0x0c,
+ 0x61, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02,
0x00, 0x00,
+ 0x00, 0x00, 0x00, 0x00
+])
+# fmt: on
diff --git a/src/nanoarrow/ipc/codecs.c b/src/nanoarrow/ipc/codecs.c
new file mode 100644
index 00000000..552674ce
--- /dev/null
+++ b/src/nanoarrow/ipc/codecs.c
@@ -0,0 +1,138 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <inttypes.h>
+
+#include "nanoarrow/nanoarrow_ipc.h"
+
+#if defined(NANOARROW_IPC_WITH_ZSTD)
+#include <zstd.h>
+
+static ArrowErrorCode ArrowIpcDecompressZstd(struct ArrowBufferView src,
uint8_t* dst,
+ int64_t dst_size, struct
ArrowError* error) {
+ size_t code =
+ ZSTD_decompress((void*)dst, (size_t)dst_size, src.data.data,
src.size_bytes);
+ if (ZSTD_isError(code)) {
+ ArrowErrorSet(error,
+ "ZSTD_decompress([buffer with %" PRId64
+ " bytes] -> [buffer with %" PRId64 " bytes]) failed with
error '%s'",
+ src.size_bytes, dst_size, ZSTD_getErrorName(code));
+ return EIO;
+ }
+
+ if (dst_size != (int64_t)code) {
+ ArrowErrorSet(error,
+ "Expected decompressed size of %" PRId64 " bytes but got %"
PRId64
+ " bytes",
+ dst_size, (int64_t)code);
+ return EIO;
+ }
+
+ return NANOARROW_OK;
+}
+#endif
+
+ArrowIpcDecompressFunction ArrowIpcGetZstdDecompressionFunction(void) {
+#if defined(NANOARROW_IPC_WITH_ZSTD)
+ return &ArrowIpcDecompressZstd;
+#else
+ return NULL;
+#endif
+}
+
+struct ArrowIpcSerialDecompressorPrivate {
+ ArrowIpcDecompressFunction decompress_functions[3];
+};
+
+static ArrowErrorCode ArrowIpcSerialDecompressorAdd(
+ struct ArrowIpcDecompressor* decompressor,
+ enum ArrowIpcCompressionType compression_type, struct ArrowBufferView src,
+ uint8_t* dst, int64_t dst_size, struct ArrowError* error) {
+ struct ArrowIpcSerialDecompressorPrivate* private_data =
+ (struct ArrowIpcSerialDecompressorPrivate*)decompressor->private_data;
+
+ ArrowIpcDecompressFunction fn = NULL;
+ switch (compression_type) {
+ case NANOARROW_IPC_COMPRESSION_TYPE_ZSTD:
+ case NANOARROW_IPC_COMPRESSION_TYPE_LZ4_FRAME:
+ fn = private_data->decompress_functions[compression_type];
+ break;
+ default:
+ ArrowErrorSet(error, "Unknown decompression type with value %d",
+ (int)compression_type);
+ return EINVAL;
+ }
+
+ if (fn == NULL) {
+ ArrowErrorSet(
+ error, "Compression type with value %d not supported by this build of
nanoarrow",
+ (int)compression_type);
+ return ENOTSUP;
+ }
+
+ NANOARROW_RETURN_NOT_OK(fn(src, dst, dst_size, error));
+ return NANOARROW_OK;
+}
+
+static ArrowErrorCode ArrowIpcSerialDecompressorWait(
+ struct ArrowIpcDecompressor* decompressor, int64_t timeout_ms,
+ struct ArrowError* error) {
+ NANOARROW_UNUSED(decompressor);
+ NANOARROW_UNUSED(timeout_ms);
+ NANOARROW_UNUSED(error);
+ return NANOARROW_OK;
+}
+
+static void ArrowIpcSerialDecompressorRelease(struct ArrowIpcDecompressor*
decompressor) {
+ ArrowFree(decompressor->private_data);
+ decompressor->release = NULL;
+}
+
+ArrowErrorCode ArrowIpcSerialDecompressor(struct ArrowIpcDecompressor*
decompressor) {
+ decompressor->decompress_add = &ArrowIpcSerialDecompressorAdd;
+ decompressor->decompress_wait = &ArrowIpcSerialDecompressorWait;
+ decompressor->release = &ArrowIpcSerialDecompressorRelease;
+ decompressor->private_data =
+ ArrowMalloc(sizeof(struct ArrowIpcSerialDecompressorPrivate));
+ if (decompressor->private_data == NULL) {
+ return ENOMEM;
+ }
+
+ memset(decompressor->private_data, 0, sizeof(struct
ArrowIpcSerialDecompressorPrivate));
+ ArrowIpcSerialDecompressorSetFunction(decompressor,
NANOARROW_IPC_COMPRESSION_TYPE_ZSTD,
+
ArrowIpcGetZstdDecompressionFunction());
+ return NANOARROW_OK;
+}
+
+ArrowErrorCode ArrowIpcSerialDecompressorSetFunction(
+ struct ArrowIpcDecompressor* decompressor,
+ enum ArrowIpcCompressionType compression_type,
+ ArrowIpcDecompressFunction decompress_function) {
+ struct ArrowIpcSerialDecompressorPrivate* private_data =
+ (struct ArrowIpcSerialDecompressorPrivate*)decompressor->private_data;
+
+ switch (compression_type) {
+ case NANOARROW_IPC_COMPRESSION_TYPE_ZSTD:
+ case NANOARROW_IPC_COMPRESSION_TYPE_LZ4_FRAME:
+ break;
+ default:
+ return EINVAL;
+ }
+
+ private_data->decompress_functions[compression_type] = decompress_function;
+ return NANOARROW_OK;
+}
diff --git a/src/nanoarrow/ipc/codecs_test.cc b/src/nanoarrow/ipc/codecs_test.cc
new file mode 100644
index 00000000..3c19f502
--- /dev/null
+++ b/src/nanoarrow/ipc/codecs_test.cc
@@ -0,0 +1,122 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstring>
+
+#include <gmock/gmock-matchers.h>
+#include <gtest/gtest.h>
+
+#include "nanoarrow/nanoarrow_ipc.hpp"
+
+// ZSTD compressed little endian int32s [0, 1, 2]
+const uint8_t kZstdCompressed012[] = {0x28, 0xb5, 0x2f, 0xfd, 0x20, 0x0c, 0x61,
+ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01,
+ 0x00, 0x00, 0x00, 0x02, 0x00, 0x00,
0x00};
+const uint8_t kZstdUncompressed012[] = {0x00, 0x00, 0x00, 0x00, 0x01, 0x00,
+ 0x00, 0x00, 0x02, 0x00, 0x00, 0x00};
+
+TEST(NanoarrowIpcTest, NanoarrowIpcZstdBuildMatchesRuntime) {
+#if defined(NANOARROW_IPC_WITH_ZSTD)
+ ASSERT_NE(ArrowIpcGetZstdDecompressionFunction(), nullptr);
+#else
+ ASSERT_EQ(ArrowIpcGetZstdDecompressionFunction(), nullptr);
+#endif
+}
+
+TEST(NanoarrowIpcTest, ZstdDecodeValidInput) {
+ auto decompress = ArrowIpcGetZstdDecompressionFunction();
+ if (!decompress) {
+ GTEST_SKIP() << "nanoarrow_ipc not built with NANOARROW_IPC_WITH_ZSTD";
+ }
+
+ // Empty->empty seems to work
+ struct ArrowError error {};
+ EXPECT_EQ(decompress({{nullptr}, 0}, nullptr, 0, &error), NANOARROW_OK);
+
+ // Check a decompress of a valid compressed buffer
+ uint8_t out[16];
+ std::memset(out, 0, sizeof(out));
+ ASSERT_EQ(decompress({{&kZstdCompressed012}, sizeof(kZstdCompressed012)},
out,
+ sizeof(kZstdUncompressed012), &error),
+ NANOARROW_OK)
+ << error.message;
+ EXPECT_TRUE(std::memcmp(out, kZstdUncompressed012,
sizeof(kZstdUncompressed012)) == 0);
+
+ ASSERT_EQ(decompress({{kZstdCompressed012}, sizeof(kZstdCompressed012)}, out,
+ sizeof(kZstdUncompressed012) + 1, &error),
+ EIO);
+ EXPECT_STREQ(error.message, "Expected decompressed size of 13 bytes but got
12 bytes");
+}
+
+TEST(NanoarrowIpcTest, ZstdDecodeInvalidInput) {
+ auto decompress = ArrowIpcGetZstdDecompressionFunction();
+ if (!decompress) {
+ GTEST_SKIP() << "nanoarrow_ipc not built with NANOARROW_IPC_WITH_ZSTD";
+ }
+
+ struct ArrowError error {};
+ const char* bad_data = "abcde";
+ EXPECT_EQ(decompress({{bad_data}, 5}, nullptr, 0, &error), EIO);
+ EXPECT_THAT(error.message,
+ ::testing::StartsWith("ZSTD_decompress([buffer with 5 bytes] ->
[buffer "
+ "with 0 bytes]) failed with error"));
+}
+
+TEST(NanoarrowIpcTest, SerialDecompressor) {
+ struct ArrowError error {};
+ nanoarrow::ipc::UniqueDecompressor decompressor;
+
+ ASSERT_EQ(ArrowIpcSerialDecompressor(decompressor.get()), NANOARROW_OK);
+
+ // Check the function setter error
+ ASSERT_EQ(ArrowIpcSerialDecompressorSetFunction(
+ decompressor.get(), NANOARROW_IPC_COMPRESSION_TYPE_NONE,
nullptr),
+ EINVAL);
+
+ // The serial decompressor never waits and always succeeds when requested to
+ EXPECT_EQ(decompressor->decompress_wait(decompressor.get(), 0, &error),
NANOARROW_OK);
+
+ // Check a decompress for a supported codec if we have one (or for an error
if we don't)
+ uint8_t out[12];
+ std::memset(out, 0, sizeof(out));
+ if (ArrowIpcGetZstdDecompressionFunction() != nullptr) {
+ EXPECT_EQ(decompressor->decompress_add(
+ decompressor.get(), NANOARROW_IPC_COMPRESSION_TYPE_ZSTD,
+ {{&kZstdCompressed012}, sizeof(kZstdCompressed012)}, out,
sizeof(out),
+ &error),
+ NANOARROW_OK);
+ } else {
+ EXPECT_EQ(decompressor->decompress_add(decompressor.get(),
+ NANOARROW_IPC_COMPRESSION_TYPE_ZSTD,
+ {{nullptr}, 0}, nullptr, 0, &error),
+ ENOTSUP);
+ EXPECT_STREQ(
+ error.message,
+ "Compression type with value 2 not supported by this build of
nanoarrow");
+ }
+
+ // Either way, if we explicitly remove support for a codec, we should get an
error
+ ASSERT_EQ(ArrowIpcSerialDecompressorSetFunction(
+ decompressor.get(), NANOARROW_IPC_COMPRESSION_TYPE_ZSTD,
nullptr),
+ NANOARROW_OK);
+ EXPECT_EQ(decompressor->decompress_add(decompressor.get(),
+ NANOARROW_IPC_COMPRESSION_TYPE_ZSTD,
+ {{nullptr}, 0}, nullptr, 0, &error),
+ ENOTSUP);
+ EXPECT_STREQ(error.message,
+ "Compression type with value 2 not supported by this build of
nanoarrow");
+}
diff --git a/src/nanoarrow/ipc/decoder.c b/src/nanoarrow/ipc/decoder.c
index b74b4e0e..606bea81 100644
--- a/src/nanoarrow/ipc/decoder.c
+++ b/src/nanoarrow/ipc/decoder.c
@@ -97,6 +97,8 @@ struct ArrowIpcDecoderPrivate {
const void* last_message;
// Storage for a Footer
struct ArrowIpcFooter footer;
+ // Decompressor for compression support
+ struct ArrowIpcDecompressor decompressor;
};
ArrowErrorCode ArrowIpcCheckRuntime(struct ArrowError* error) {
@@ -243,6 +245,28 @@ ArrowErrorCode ArrowIpcDecoderInit(struct ArrowIpcDecoder*
decoder) {
return NANOARROW_OK;
}
+static ArrowErrorCode ArrowIpcDecoderInitDecompressor(
+ struct ArrowIpcDecoderPrivate* private_data) {
+ if (private_data->decompressor.release == NULL) {
+
NANOARROW_RETURN_NOT_OK(ArrowIpcSerialDecompressor(&private_data->decompressor));
+ }
+
+ return NANOARROW_OK;
+}
+
+ArrowErrorCode ArrowIpcDecoderSetDecompressor(struct ArrowIpcDecoder* decoder,
+ struct ArrowIpcDecompressor*
decompressor) {
+ struct ArrowIpcDecoderPrivate* private_data =
+ (struct ArrowIpcDecoderPrivate*)decoder->private_data;
+
+ if (private_data->decompressor.release != NULL) {
+ private_data->decompressor.release(&private_data->decompressor);
+ }
+
+ memcpy(&private_data->decompressor, decompressor, sizeof(struct
ArrowIpcDecompressor));
+ return NANOARROW_OK;
+}
+
void ArrowIpcDecoderReset(struct ArrowIpcDecoder* decoder) {
struct ArrowIpcDecoderPrivate* private_data =
(struct ArrowIpcDecoderPrivate*)decoder->private_data;
@@ -263,6 +287,10 @@ void ArrowIpcDecoderReset(struct ArrowIpcDecoder* decoder)
{
ArrowIpcFooterReset(&private_data->footer);
+ if (private_data->decompressor.release != NULL) {
+ private_data->decompressor.release(&private_data->decompressor);
+ }
+
ArrowFree(private_data);
memset(decoder, 0, sizeof(struct ArrowIpcDecoder));
}
@@ -1450,6 +1478,10 @@ struct ArrowIpcBufferFactory {
struct ArrowBufferView* dst_view, struct
ArrowBuffer* dst,
struct ArrowError* error);
+ /// \brief Caller provided decompressor instance to which any decompression
requests
+ /// should be made.
+ struct ArrowIpcDecompressor* decompressor;
+
/// \brief Caller-defined private data to be used in the callback.
///
/// Usually this would be a description of where the body has been read into
memory or
@@ -1457,18 +1489,81 @@ struct ArrowIpcBufferFactory {
void* private_data;
};
+static ArrowErrorCode ArrowIpcDecompressBufferFromView(
+ struct ArrowIpcDecompressor* decompressor,
+ enum ArrowIpcCompressionType compression_type, struct ArrowBufferView src,
+ struct ArrowBuffer* dst, int* needs_decompression, struct ArrowError*
error) {
+ if (src.size_bytes < (int64_t)sizeof(int64_t)) {
+ ArrowErrorSet(
+ error,
+ "Buffer size must be >= sizeof(int64_t) when buffer compression is
enabled");
+ return EINVAL;
+ }
+
+ // When body compression is enabled, buffers are prefixed with a little
endian
+ // signed 64-bit integer that is the uncompressed body length.
+ int64_t uncompressed_size;
+ memcpy(&uncompressed_size, src.data.data, sizeof(int64_t));
+ if (ArrowIpcSystemEndianness() != NANOARROW_IPC_ENDIANNESS_LITTLE) {
+ uncompressed_size = (int64_t)bswap64(uncompressed_size);
+ }
+
+ // Sentinel for "body compression was enabled but this buffer is not
compressed" is -1
+ if (uncompressed_size == -1) {
+ *needs_decompression = 0;
+ return NANOARROW_OK;
+ }
+
+ if (uncompressed_size < 0) {
+ ArrowErrorSet(error,
+ "Decompressed buffer size must be -1 or >= 0 bytes but was
%" PRId64,
+ uncompressed_size);
+ return EINVAL;
+ }
+
+ // Prepare the source and destination
+ src.data.as_uint8 += sizeof(int64_t);
+ src.size_bytes -= sizeof(int64_t);
+ NANOARROW_RETURN_NOT_OK(ArrowBufferResize(dst, uncompressed_size, 0));
+
+ // Add the task to the decompressor (this may execute synchronously for some
+ // decompressors)
+ NANOARROW_RETURN_NOT_OK(decompressor->decompress_add(
+ decompressor, compression_type, src, dst->data, uncompressed_size,
error));
+
+ // Pass on that we handled the decompression
+ *needs_decompression = 1;
+ return NANOARROW_OK;
+}
+
static ArrowErrorCode ArrowIpcMakeBufferFromView(struct ArrowIpcBufferFactory*
factory,
struct ArrowIpcBufferSource*
src,
struct ArrowBufferView*
dst_view,
struct ArrowBuffer* dst,
struct ArrowError* error) {
- NANOARROW_UNUSED(factory);
- NANOARROW_UNUSED(dst);
- NANOARROW_UNUSED(error);
-
struct ArrowBufferView* body = (struct
ArrowBufferView*)factory->private_data;
- dst_view->data.as_uint8 = body->data.as_uint8 + src->body_offset_bytes;
- dst_view->size_bytes = src->buffer_length_bytes;
+
+ struct ArrowBufferView src_view;
+ src_view.data.as_uint8 = body->data.as_uint8 + src->body_offset_bytes;
+ src_view.size_bytes = src->buffer_length_bytes;
+
+ int needs_decompression = 0;
+ int uncompressed_data_offset = 0;
+ if (src->codec != NANOARROW_IPC_COMPRESSION_TYPE_NONE) {
+ NANOARROW_RETURN_NOT_OK(ArrowIpcDecompressBufferFromView(
+ factory->decompressor, src->codec, src_view, dst,
&needs_decompression, error));
+ uncompressed_data_offset += sizeof(int64_t);
+ }
+
+ if (!needs_decompression) {
+ *dst_view = src_view;
+ dst_view->data.as_uint8 += uncompressed_data_offset;
+ dst_view->size_bytes -= uncompressed_data_offset;
+ } else {
+ dst_view->data.data = dst->data;
+ dst_view->size_bytes = dst->size_bytes;
+ }
+
return NANOARROW_OK;
}
@@ -1476,6 +1571,7 @@ static struct ArrowIpcBufferFactory
ArrowIpcBufferFactoryFromView(
struct ArrowBufferView* buffer_view) {
struct ArrowIpcBufferFactory out;
out.make_buffer = &ArrowIpcMakeBufferFromView;
+ out.decompressor = NULL;
out.private_data = buffer_view;
return out;
}
@@ -1485,14 +1581,27 @@ static ArrowErrorCode
ArrowIpcMakeBufferFromShared(struct ArrowIpcBufferFactory*
struct ArrowBufferView*
dst_view,
struct ArrowBuffer* dst,
struct ArrowError* error) {
- NANOARROW_UNUSED(error);
-
struct ArrowIpcSharedBuffer* shared =
(struct ArrowIpcSharedBuffer*)factory->private_data;
- ArrowBufferReset(dst);
- ArrowIpcSharedBufferClone(shared, dst);
- dst->data += src->body_offset_bytes;
- dst->size_bytes = src->buffer_length_bytes;
+
+ int needs_decompression = 0;
+ int uncompressed_data_offset = 0;
+ if (src->codec != NANOARROW_IPC_COMPRESSION_TYPE_NONE) {
+ struct ArrowBufferView src_view;
+ src_view.data.as_uint8 = shared->private_src.data + src->body_offset_bytes;
+ src_view.size_bytes = src->buffer_length_bytes;
+ NANOARROW_RETURN_NOT_OK(ArrowIpcDecompressBufferFromView(
+ factory->decompressor, src->codec, src_view, dst,
&needs_decompression, error));
+ uncompressed_data_offset += sizeof(int64_t);
+ }
+
+ if (!needs_decompression) {
+ ArrowBufferReset(dst);
+ ArrowIpcSharedBufferClone(shared, dst);
+ dst->data += src->body_offset_bytes + uncompressed_data_offset;
+ dst->size_bytes = src->buffer_length_bytes - uncompressed_data_offset;
+ }
+
dst_view->data.data = dst->data;
dst_view->size_bytes = dst->size_bytes;
return NANOARROW_OK;
@@ -1502,6 +1611,7 @@ static struct ArrowIpcBufferFactory
ArrowIpcBufferFactoryFromShared(
struct ArrowIpcSharedBuffer* shared) {
struct ArrowIpcBufferFactory out;
out.make_buffer = &ArrowIpcMakeBufferFromShared;
+ out.decompressor = NULL;
out.private_data = shared;
return out;
}
@@ -1663,14 +1773,6 @@ static int ArrowIpcDecoderMakeBuffer(struct
ArrowIpcArraySetter* setter, int64_t
return EINVAL;
}
- // If the ArrowIpcBufferFactory is made public, these should get moved
(since then a
- // user could inject support for either one). More likely, by the time that
happens,
- // this library will be able to support some of these features.
- if (setter->src.codec != NANOARROW_IPC_COMPRESSION_TYPE_NONE) {
- ArrowErrorSet(error, "The nanoarrow_ipc extension does not support
compression");
- return ENOTSUP;
- }
-
setter->src.body_offset_bytes = offset;
setter->src.buffer_length_bytes = length;
NANOARROW_RETURN_NOT_OK(
@@ -1856,6 +1958,13 @@ static ArrowErrorCode
ArrowIpcDecoderDecodeArrayViewInternal(
setter.src.swap_endian = ArrowIpcDecoderNeedsSwapEndian(decoder);
setter.version = decoder->metadata_version;
+ // If we are going to need a decompressor here, ensure the default one is
+ // initialized.
+ if (setter.src.codec != NANOARROW_IPC_COMPRESSION_TYPE_NONE) {
+ NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderInitDecompressor(private_data));
+ setter.factory.decompressor = &private_data->decompressor;
+ }
+
// The flatbuffers FieldNode doesn't count the root struct so we have to
loop over the
// children ourselves
if (field_i == -1) {
@@ -1873,6 +1982,13 @@ static ArrowErrorCode
ArrowIpcDecoderDecodeArrayViewInternal(
ArrowIpcDecoderWalkSetArrayView(&setter, root->array_view,
root->array, error));
}
+ // If we decoded a compressed message, wait for any pending decompression
tasks to
+ // complete. The default decompressor already performed the decompression
+ if (setter.factory.decompressor != NULL) {
+ NANOARROW_RETURN_NOT_OK(setter.factory.decompressor->decompress_wait(
+ setter.factory.decompressor, -1, error));
+ }
+
*out_view = root->array_view;
return NANOARROW_OK;
}
diff --git a/src/nanoarrow/ipc/decoder_test.cc
b/src/nanoarrow/ipc/decoder_test.cc
index f59e52c5..b3856656 100644
--- a/src/nanoarrow/ipc/decoder_test.cc
+++ b/src/nanoarrow/ipc/decoder_test.cc
@@ -24,6 +24,7 @@
#include <arrow/util/key_value_metadata.h>
#endif
#include <gmock/gmock-matchers.h>
+#include <gtest/gtest-spi.h>
#include <gtest/gtest.h>
// For bswap32()
@@ -53,6 +54,7 @@ struct ArrowIpcDecoderPrivate {
int64_t n_buffers;
const void* last_message;
struct ArrowIpcFooter footer;
+ struct ArrowIpcDecompressor decompressor;
};
}
@@ -105,6 +107,40 @@ static uint8_t kSimpleRecordBatch[] = {
0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00,
0x03, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00};
+static uint8_t kSimpleRecordBatchCompressed[] = {
+ 0xff, 0xff, 0xff, 0xff, 0xa0, 0x00, 0x00, 0x00, 0x14, 0x00, 0x00, 0x00,
0x00, 0x00,
+ 0x00, 0x00, 0x0c, 0x00, 0x18, 0x00, 0x06, 0x00, 0x05, 0x00, 0x08, 0x00,
0x0c, 0x00,
+ 0x0c, 0x00, 0x00, 0x00, 0x00, 0x03, 0x04, 0x00, 0x1c, 0x00, 0x00, 0x00,
0x20, 0x00,
+ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0c, 0x00,
0x1e, 0x00,
+ 0x10, 0x00, 0x04, 0x00, 0x08, 0x00, 0x0c, 0x00, 0x0c, 0x00, 0x00, 0x00,
0x50, 0x00,
+ 0x00, 0x00, 0x24, 0x00, 0x00, 0x00, 0x18, 0x00, 0x00, 0x00, 0x03, 0x00,
0x00, 0x00,
+ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06, 0x00,
0x08, 0x00,
+ 0x07, 0x00, 0x06, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x02, 0x00,
0x00, 0x00,
+ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00,
+ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x1d, 0x00,
0x00, 0x00,
+ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00,
0x03, 0x00,
+ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00,
+ 0x0c, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x28, 0xb5, 0x2f, 0xfd,
0x20, 0x0c,
+ 0x61, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02,
0x00, 0x00,
+ 0x00, 0x00, 0x00, 0x00};
+
+static uint8_t kSimpleRecordBatchUncompressible[] = {
+ 0xff, 0xff, 0xff, 0xff, 0xa0, 0x00, 0x00, 0x00, 0x14, 0x00, 0x00, 0x00,
0x00, 0x00,
+ 0x00, 0x00, 0x0c, 0x00, 0x18, 0x00, 0x06, 0x00, 0x05, 0x00, 0x08, 0x00,
0x0c, 0x00,
+ 0x0c, 0x00, 0x00, 0x00, 0x00, 0x03, 0x04, 0x00, 0x1c, 0x00, 0x00, 0x00,
0x20, 0x00,
+ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0c, 0x00,
0x1e, 0x00,
+ 0x10, 0x00, 0x04, 0x00, 0x08, 0x00, 0x0c, 0x00, 0x0c, 0x00, 0x00, 0x00,
0x50, 0x00,
+ 0x00, 0x00, 0x24, 0x00, 0x00, 0x00, 0x18, 0x00, 0x00, 0x00, 0x03, 0x00,
0x00, 0x00,
+ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06, 0x00,
0x08, 0x00,
+ 0x07, 0x00, 0x06, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x02, 0x00,
0x00, 0x00,
+ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00,
+ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x1d, 0x00,
0x00, 0x00,
+ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00,
0x03, 0x00,
+ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00,
+ 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0x00, 0x00, 0x00, 0x00,
0x01, 0x00,
+ 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00,
+ 0x00, 0x00, 0x00, 0x00};
+
TEST(NanoarrowIpcTest, NanoarrowIpcCheckHeader) {
struct ArrowIpcDecoder decoder;
struct ArrowError error;
@@ -318,7 +354,66 @@ TEST(NanoarrowIpcTest, NanoarrowIpcDecodeSimpleSchema) {
ArrowIpcDecoderReset(&decoder);
}
+void TestDecodeInt32Batch(const uint8_t* batch, size_t batch_len,
+ const std::vector<int32_t> values) {
+ nanoarrow::ipc::UniqueDecoder decoder;
+ nanoarrow::UniqueSchema schema;
+
+ struct ArrowError error;
+ struct ArrowArrayView* array_view;
+
+ ArrowSchemaInit(schema.get());
+ ASSERT_EQ(ArrowSchemaSetTypeStruct(schema.get(), 1), NANOARROW_OK);
+ ASSERT_EQ(ArrowSchemaSetType(schema->children[0], NANOARROW_TYPE_INT32),
NANOARROW_OK);
+
+ struct ArrowBufferView data;
+ data.data.as_uint8 = batch;
+ data.size_bytes = static_cast<int64_t>(batch_len);
+
+ ASSERT_EQ(ArrowIpcDecoderInit(decoder.get()), NANOARROW_OK);
+ ASSERT_EQ(ArrowIpcDecoderSetSchema(decoder.get(), schema.get(), &error),
NANOARROW_OK);
+
+ ASSERT_EQ(ArrowIpcDecoderDecodeHeader(decoder.get(), data, &error),
NANOARROW_OK);
+ struct ArrowBufferView body;
+ body.data.as_uint8 = batch + decoder->header_size_bytes;
+ body.size_bytes = decoder->body_size_bytes;
+
+ ASSERT_EQ(ArrowIpcDecoderDecodeArrayView(decoder.get(), body, 0,
&array_view, &error),
+ NANOARROW_OK)
+ << error.message;
+ ASSERT_EQ(array_view->length, values.size());
+ int64_t index = 0;
+ for (const auto value : values) {
+ SCOPED_TRACE(std::string("Array index ") + std::to_string(index));
+ EXPECT_EQ(ArrowArrayViewGetIntUnsafe(array_view, index), value);
+ index++;
+ }
+}
+
TEST(NanoarrowIpcTest, NanoarrowIpcDecodeSimpleRecordBatch) {
+ ASSERT_NO_FATAL_FAILURE(
+ TestDecodeInt32Batch(kSimpleRecordBatch, sizeof(kSimpleRecordBatch), {1,
2, 3}));
+}
+
+TEST(NanoarrowIpcTest, NanoarrowIpcDecodeCompressedRecordBatch) {
+ if (ArrowIpcGetZstdDecompressionFunction() == nullptr) {
+ EXPECT_FATAL_FAILURE(
+ TestDecodeInt32Batch(kSimpleRecordBatchCompressed,
+ sizeof(kSimpleRecordBatchCompressed), {0, 1, 2}),
+ "Compression type with value 2 not supported by this build of
nanoarrow");
+ } else {
+ ASSERT_NO_FATAL_FAILURE(TestDecodeInt32Batch(
+ kSimpleRecordBatchCompressed, sizeof(kSimpleRecordBatchCompressed),
{0, 1, 2}));
+ }
+}
+
+TEST(NanoarrowIpcTest, NanoarrowIpcDecodeUncompressibleRecordBatch) {
+
ASSERT_NO_FATAL_FAILURE(TestDecodeInt32Batch(kSimpleRecordBatchUncompressible,
+
sizeof(kSimpleRecordBatchUncompressible),
+ {0, 1, 2}));
+}
+
+TEST(NanoarrowIpcTest, NanoarrowIpcDecodeSimpleRecordBatchErrors) {
struct ArrowIpcDecoder decoder;
struct ArrowError error;
struct ArrowSchema schema;
@@ -387,14 +482,6 @@ TEST(NanoarrowIpcTest,
NanoarrowIpcDecodeSimpleRecordBatch) {
ArrowArrayRelease(&array);
- // Field extract should fail if compression was set
- decoder.codec = NANOARROW_IPC_COMPRESSION_TYPE_ZSTD;
- EXPECT_EQ(ArrowIpcDecoderDecodeArray(&decoder, body, 0, &array,
- NANOARROW_VALIDATION_LEVEL_FULL,
&error),
- ENOTSUP);
- EXPECT_STREQ(error.message, "The nanoarrow_ipc extension does not support
compression");
- decoder.codec = NANOARROW_IPC_COMPRESSION_TYPE_NONE;
-
// Field extract should fail if body is too small
decoder.body_size_bytes = 0;
EXPECT_EQ(ArrowIpcDecoderDecodeArray(&decoder, body, 0, &array,
@@ -508,7 +595,7 @@ TEST_P(ArrowTypeParameterizedTestFixture,
NanoarrowIpcArrowTypeRoundtrip) {
#endif
std::string ArrowSchemaMetadataToString(const char* metadata) {
- struct ArrowMetadataReader reader {};
+ struct ArrowMetadataReader reader;
auto st = ArrowMetadataReaderInit(&reader, metadata);
EXPECT_EQ(st, NANOARROW_OK);
diff --git a/src/nanoarrow/ipc/files_test.cc b/src/nanoarrow/ipc/files_test.cc
index cb505ce2..5d39ec41 100644
--- a/src/nanoarrow/ipc/files_test.cc
+++ b/src/nanoarrow/ipc/files_test.cc
@@ -33,8 +33,6 @@
#include "nanoarrow/nanoarrow_ipc.hpp"
#include "nanoarrow/nanoarrow_testing.hpp"
-#include "flatcc/portable/pendian_detect.h"
-
using namespace arrow;
// Helpers for reporting Arrow C++ Result failures
@@ -85,7 +83,7 @@ class TestFile {
}
std::string CheckJSONGzFile() {
- size_t dot_pos = path_.find('.');
+ size_t dot_pos = path_.rfind('.');
return path_.substr(0, dot_pos) + std::string(".json.gz");
}
@@ -401,12 +399,12 @@ ArrowErrorCode InitArrowTestingPath(std::ostream&
builder, ArrowError* error) {
return NANOARROW_OK;
}
-class TestFileFixture : public ::testing::TestWithParam<TestFile> {
+class TestEndianFileFixture : public ::testing::TestWithParam<TestFile> {
protected:
TestFile test_file;
};
-TEST_P(TestFileFixture, NanoarrowIpcTestFileNativeEndian) {
+TEST_P(TestEndianFileFixture, NanoarrowIpcTestFileNativeEndian) {
std::stringstream dir_builder;
ArrowError error;
ArrowErrorInit(&error);
@@ -423,7 +421,7 @@ TEST_P(TestFileFixture, NanoarrowIpcTestFileNativeEndian) {
param.TestEqualsArrowCpp(dir_builder.str());
}
-TEST_P(TestFileFixture, NanoarrowIpcTestFileSwapEndian) {
+TEST_P(TestEndianFileFixture, NanoarrowIpcTestFileSwapEndian) {
std::stringstream dir_builder;
ArrowError error;
ArrowErrorInit(&error);
@@ -440,7 +438,7 @@ TEST_P(TestFileFixture, NanoarrowIpcTestFileSwapEndian) {
param.TestEqualsArrowCpp(dir_builder.str());
}
-TEST_P(TestFileFixture, NanoarrowIpcTestFileCheckJSON) {
+TEST_P(TestEndianFileFixture, NanoarrowIpcTestFileCheckJSON) {
std::stringstream dir_builder;
ArrowError error;
ArrowErrorInit(&error);
@@ -455,7 +453,7 @@ TEST_P(TestFileFixture, NanoarrowIpcTestFileCheckJSON) {
}
INSTANTIATE_TEST_SUITE_P(
- NanoarrowIpcTest, TestFileFixture,
+ NanoarrowIpcTest, TestEndianFileFixture,
::testing::Values(
// Files in data/arrow-ipc-stream/integration/1.0.0-(little|big)endian/
// should read without error and the data should match Arrow C++'s
read.
@@ -495,4 +493,57 @@ INSTANTIATE_TEST_SUITE_P(
"Schema message field with DictionaryEncoding not supported")
// Comment to keep last line from wrapping
));
+
+// Files not related to endianness (i.e., only need testing once)
+class TestFileFixture : public ::testing::TestWithParam<TestFile> {
+ protected:
+ TestFile test_file;
+};
+
+TEST_P(TestFileFixture, NanoarrowIpcTestFileEqualsArrowCpp) {
+ std::stringstream dir_builder;
+ ArrowError error;
+ ArrowErrorInit(&error);
+ if (InitArrowTestingPath(dir_builder, &error) != NANOARROW_OK) {
+ GTEST_SKIP() << error.message;
+ }
+
+ dir_builder << "/data/arrow-ipc-stream/integration/";
+ TestFile param = GetParam();
+ param.TestEqualsArrowCpp(dir_builder.str());
+}
+
+TEST_P(TestFileFixture, NanoarrowIpcTestFileIPCCheckJSON) {
+ std::stringstream dir_builder;
+ ArrowError error;
+ ArrowErrorInit(&error);
+ if (InitArrowTestingPath(dir_builder, &error) != NANOARROW_OK) {
+ GTEST_SKIP() << error.message;
+ }
+
+ dir_builder << "/data/arrow-ipc-stream/integration/";
+ TestFile param = GetParam();
+ param.TestIPCCheckJSON(dir_builder.str());
+}
+
+INSTANTIATE_TEST_SUITE_P(
+ NanoarrowIpcTest, TestFileFixture,
+ ::testing::Values(
+// Testing of other files
+#if defined(NANOARROW_IPC_WITH_ZSTD)
+ TestFile::OK("2.0.0-compression/generated_uncompressible_zstd.stream"),
+ TestFile::OK("2.0.0-compression/generated_zstd.stream"),
+#endif
+ TestFile::OK("0.17.1/generated_union.stream"),
+ TestFile::OK("0.14.1/generated_datetime.stream"),
+ TestFile::OK("0.14.1/generated_decimal.stream"),
+ TestFile::OK("0.14.1/generated_interval.stream"),
+ TestFile::OK("0.14.1/generated_map.stream"),
+ TestFile::OK("0.14.1/generated_nested.stream"),
+ TestFile::OK("0.14.1/generated_primitive.stream"),
+ TestFile::OK("0.14.1/generated_primitive_no_batches.stream"),
+ TestFile::OK("0.14.1/generated_primitive_zerolength.stream")
+ // Comment to keep line from wrapping
+ ));
+
#endif
diff --git a/src/nanoarrow/nanoarrow_ipc.h b/src/nanoarrow/nanoarrow_ipc.h
index fcf5ef88..4cb4c05e 100644
--- a/src/nanoarrow/nanoarrow_ipc.h
+++ b/src/nanoarrow/nanoarrow_ipc.h
@@ -29,8 +29,16 @@
NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcSharedBufferInit)
#define ArrowIpcSharedBufferReset \
NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcSharedBufferReset)
+#define ArrowIpcGetZstdDecompressionFunction \
+ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcGetZstdDecompressionFunction)
+#define ArrowIpcSerialDecompressor \
+ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcSerialDecompressor)
+#define ArrowIpcSerialDecompressorSetFunction \
+ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcSerialDecompressorSetFunction)
#define ArrowIpcDecoderInit NANOARROW_SYMBOL(NANOARROW_NAMESPACE,
ArrowIpcDecoderInit)
#define ArrowIpcDecoderReset NANOARROW_SYMBOL(NANOARROW_NAMESPACE,
ArrowIpcDecoderReset)
+#define ArrowIpcDecoderSetDecompressor \
+ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDecoderSetDecompressor)
#define ArrowIpcDecoderPeekHeader \
NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDecoderPeekHeader)
#define ArrowIpcDecoderVerifyHeader \
@@ -194,6 +202,69 @@ void ArrowIpcSharedBufferReset(struct
ArrowIpcSharedBuffer* shared);
/// the resulting arrays must not be passed to other threads to be released.
int ArrowIpcSharedBufferIsThreadSafe(void);
+/// \brief A user-extensible decompressor
+///
+/// The ArrowIpcDecompressor is the underlying object that enables
decompression in the
+/// ArrowIpcDecoder. Its structure allows it to be backed by a multithreaded
+/// implementation; however, this is not required and the default
implementation does not
+/// implement this. An implementation of a decompressor may support more than
one
+/// ArrowIpcCompressionType.
+struct ArrowIpcDecompressor {
+ /// \brief Queue a buffer for decompression
+ ///
+ /// The contents of dst (of size dst_size) after a call to decompress_add
+ /// are undefined until the next call to decompress_wait returns
NANOARROW_OK.
+ ArrowErrorCode (*decompress_add)(struct ArrowIpcDecompressor* decompressor,
+ enum ArrowIpcCompressionType
compression_type,
+ struct ArrowBufferView src, uint8_t* dst,
+ int64_t dst_size, struct ArrowError* error);
+
+ /// \brief Wait for any unfinished calls to decompress_add to complete
+ ///
+ /// Returns NANOARROW_OK if all pending calls completed. Returns ETIMEDOUT
+ /// if not all remaining calls completed.
+ ArrowErrorCode (*decompress_wait)(struct ArrowIpcDecompressor* decompressor,
+ int64_t timeout_ms, struct ArrowError*
error);
+
+ /// \brief Release the decompressor and any resources it may be holding
+ ///
+ /// Release callback implementations must set the release member to NULL.
+ /// Callers must check that the release callback is not NULL before calling
+ /// decompress_add(), decompress_wait(), or release().
+ void (*release)(struct ArrowIpcDecompressor* decompressor);
+
+ /// \brief Implementation-specific opaque data
+ void* private_data;
+};
+
+/// \brief A self-contained decompression function
+///
+/// For the most common compression type, ZSTD, this function is sufficient to
+/// capture the type of decompression that Arrow IPC requires (i.e.,
decompression
+/// where the uncompressed size was recorded). For other compression types, it
+/// may be more efficient to implement a full ArrowIpcDecompressor, which
allows
+/// for persistent state/allocations between decodes.
+typedef ArrowErrorCode (*ArrowIpcDecompressFunction)(struct ArrowBufferView
src,
+ uint8_t* dst, int64_t
dst_size,
+ struct ArrowError* error);
+
+/// \brief Get the decompression function for ZSTD
+///
+/// The result will be NULL if nanoarrow was not built with
NANOARROW_IPC_WITH_ZSTD.
+ArrowIpcDecompressFunction ArrowIpcGetZstdDecompressionFunction(void);
+
+/// \brief An ArrowIpcDecompressor implementation that performs decompression
in serial
+ArrowErrorCode ArrowIpcSerialDecompressor(struct ArrowIpcDecompressor*
decompressor);
+
+/// \brief Override the ArrowIpcDecompressFunction used for a specific
compression type
+///
+/// This may be used to inject support for a particular type of decompression
if used
+/// with a version of nanoarrow with unknown or minimal capabilities.
+ArrowErrorCode ArrowIpcSerialDecompressorSetFunction(
+ struct ArrowIpcDecompressor* decompressor,
+ enum ArrowIpcCompressionType compression_type,
+ ArrowIpcDecompressFunction decompress_function);
+
/// \brief Decoder for Arrow IPC messages
///
/// This structure is intended to be allocated by the caller,
@@ -244,6 +315,10 @@ ArrowErrorCode ArrowIpcDecoderInit(struct ArrowIpcDecoder*
decoder);
/// \brief Release all resources attached to a decoder
void ArrowIpcDecoderReset(struct ArrowIpcDecoder* decoder);
+/// \brief Set the decompressor implementation used by this decoder
+ArrowErrorCode ArrowIpcDecoderSetDecompressor(struct ArrowIpcDecoder* decoder,
+ struct ArrowIpcDecompressor*
decompressor);
+
/// \brief Peek at a message header
///
/// The first 8 bytes of an Arrow IPC message are 0xFFFFFFFF followed by the
size
diff --git a/src/nanoarrow/nanoarrow_ipc.hpp b/src/nanoarrow/nanoarrow_ipc.hpp
index e1b90cf1..ce2c03dc 100644
--- a/src/nanoarrow/nanoarrow_ipc.hpp
+++ b/src/nanoarrow/nanoarrow_ipc.hpp
@@ -73,6 +73,25 @@ inline void release_pointer(struct ArrowIpcEncoder* data) {
ArrowIpcEncoderReset(data);
}
+/// Put a struct ArrowIpcDecompressor into the "released" state.
+///
+/// release_pointer() decides whether to invoke the release callback by testing
+/// the release member, so that is the member that must be cleared here;
+/// otherwise a default-constructed wrapper would call through an indeterminate
+/// function pointer when it is destroyed.
+template <>
+inline void init_pointer(struct ArrowIpcDecompressor* data) {
+ data->release = nullptr;
+ data->private_data = nullptr;
+}
+
+template <>
+inline void move_pointer(struct ArrowIpcDecompressor* src,
+ struct ArrowIpcDecompressor* dst) {
+ memcpy(dst, src, sizeof(struct ArrowIpcDecompressor));
+ src->release = nullptr;
+}
+
+template <>
+inline void release_pointer(struct ArrowIpcDecompressor* data) {
+ if (data->release != nullptr) {
+ data->release(data);
+ }
+}
+
template <>
inline void init_pointer(struct ArrowIpcInputStream* data) {
data->release = nullptr;
@@ -150,6 +169,9 @@ using UniqueFooter = internal::Unique<struct
ArrowIpcFooter>;
/// \brief Class wrapping a unique struct ArrowIpcEncoder
using UniqueEncoder = internal::Unique<struct ArrowIpcEncoder>;
+/// \brief Class wrapping a unique struct ArrowIpcDecompressor
+using UniqueDecompressor = internal::Unique<struct ArrowIpcDecompressor>;
+
/// \brief Class wrapping a unique struct ArrowIpcInputStream
using UniqueInputStream = internal::Unique<struct ArrowIpcInputStream>;
diff --git a/python/meson.build b/subprojects/zstd.wrap
similarity index 53%
copy from python/meson.build
copy to subprojects/zstd.wrap
index 49fa67e4..330530c3 100644
--- a/python/meson.build
+++ b/subprojects/zstd.wrap
@@ -15,25 +15,16 @@
# specific language governing permissions and limitations
# under the License.
-project(
- 'nanoarrow',
- 'c',
- 'cython',
- version: run_command(['src/nanoarrow/_version.py', '--print'], check:
true).stdout().strip(),
- license: 'Apache-2.0',
- meson_version: '>=1.2.0',
- default_options: [
- 'warning_level=2',
- 'c_std=c99',
- 'default_library=static',
- # We need to set these options at the project default_option level
- # due to https://github.com/mesonbuild/meson/issues/6728
- 'arrow-nanoarrow:ipc=true',
- 'arrow-nanoarrow:device=true',
- 'arrow-nanoarrow:namespace=PythonPkg',
- ],
-)
+[wrap-file]
+directory = zstd-1.5.6
+source_url =
https://github.com/facebook/zstd/releases/download/v1.5.6/zstd-1.5.6.tar.gz
+source_filename = zstd-1.5.6.tar.gz
+source_hash = 8c29e06cf42aacc1eafc4077ae2ec6c6fcb96a626157e0593d5e82a34fd403c1
+patch_filename = zstd_1.5.6-2_patch.zip
+patch_url = https://wrapdb.mesonbuild.com/v2/zstd_1.5.6-2/get_patch
+patch_hash = 3e67f7d2edf3c56e6450d4c0f5f3d5fe94799e3608e3795502da03f7dd51b28c
+source_fallback_url =
https://github.com/mesonbuild/wrapdb/releases/download/zstd_1.5.6-2/zstd-1.5.6.tar.gz
+wrapdb_version = 1.5.6-2
-subdir('src/nanoarrow')
-
-meson.add_dist_script('python', meson.current_source_dir() /
'generate_dist.py')
+[provide]
+libzstd = libzstd_dep