This is an automated email from the ASF dual-hosted git repository.

paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-nanoarrow.git


The following commit(s) were added to refs/heads/main by this push:
     new dd437a95 feat: Add ZSTD decompression support to IPC reader (#693)
dd437a95 is described below

commit dd437a958e2e7dd414478fd8eb805555597fc8cf
Author: Dewey Dunnington <[email protected]>
AuthorDate: Thu Dec 26 20:00:30 2024 -0600

    feat: Add ZSTD decompression support to IPC reader (#693)
    
    This PR implements ZSTD buffer decompression in the `ArrowIpcDecoder`
    and in the `ArrowArrayStreamReader` when built with
    `-DNANOARROW_IPC_WITH_ZSTD=ON`. It also allows a user to inject
    decompression support into the `ArrowIpcDecoder` if, for whatever reason,
    they don't have control over the build flags (or want to use a ZSTD
    implementation that has been made available to them in a different way).
    
    This doesn't implement multithreaded decompression, but it does allow a user
    to implement it by supplying a decompressor other than the default
    `ArrowIpcSerialDecompressor()`. A multithreaded decompressor could be added
    to the header-only C++ helpers if there is interest.
    
    A non-trivial example using the Python bindings (which were also wired up to
    support it):
    
    ```python
    import urllib.request
    import nanoarrow as na
    url = "https://github.com/geoarrow/geoarrow-data/releases/download/v0.1.0/ns-water-basin_point-wkb.arrow"
    
    # Work around the lack of Arrow IPC file format support: skip the 8-byte
    # file header so that the remainder can be read as a stream
    with urllib.request.urlopen(url) as f:
        f.read(8)
        print(na.ArrayStream.from_readable(f).read_all())
    #> nanoarrow.Array<non-nullable struct<OBJECTID: int64, FEAT_CODE: string, ...>[46]
    #> {'OBJECTID': 1, 'FEAT_CODE': 'WABA30', 'BASIN_NAME': '01EB000', 'RIVER': 'BAR...
    #> {'OBJECTID': 2, 'FEAT_CODE': 'WABA30', 'BASIN_NAME': '01EC000', 'RIVER': 'ROS...
    #> {'OBJECTID': 3, 'FEAT_CODE': 'WABA30', 'BASIN_NAME': '01EA000', 'RIVER': 'TUS...
    #> {'OBJECTID': 4, 'FEAT_CODE': 'WABA30', 'BASIN_NAME': '01DA000', 'RIVER': 'MET...
    #> {'OBJECTID': 5, 'FEAT_CODE': 'WABA30', 'BASIN_NAME': '01ED000', 'RIVER': 'MER...
    #> {'OBJECTID': 6, 'FEAT_CODE': 'WABA30', 'BASIN_NAME': '01EE000', 'RIVER': 'HER...
    #> {'OBJECTID': 7, 'FEAT_CODE': 'WABA30', 'BASIN_NAME': '01EG000', 'RIVER': 'GOL...
    #> {'OBJECTID': 8, 'FEAT_CODE': 'WABA30', 'BASIN_NAME': '01EF000', 'RIVER': 'LAH...
    #> {'OBJECTID': 9, 'FEAT_CODE': 'WABA30', 'BASIN_NAME': '01EJ000', 'RIVER': 'SAC...
    #> {'OBJECTID': 10, 'FEAT_CODE': 'WABA30', 'BASIN_NAME': '01EH000', 'RIVER': 'EA...
    #> ...and 36 more items
    ```
    
    This PR doesn't implement the R bindings (because adding a zstd
    dependency there is a can of worms better suited to another PR).
---
 .github/workflows/build-and-test-ipc.yaml     |   6 +-
 .github/workflows/build-and-test.yaml         |   4 +-
 CMakeLists.txt                                |  27 ++++-
 ci/scripts/build-arrow-cpp-minimal.sh         |   1 +
 ci/scripts/bundle.py                          |   1 +
 meson.build                                   |  12 +-
 meson.options                                 |   1 +
 python/meson.build                            |   3 +
 python/src/nanoarrow/meson.build              |   3 +-
 python/{meson.build => subprojects/zstd.wrap} |  33 ++----
 python/tests/test_ipc.py                      |  35 +++++-
 src/nanoarrow/ipc/codecs.c                    | 138 +++++++++++++++++++++++
 src/nanoarrow/ipc/codecs_test.cc              | 122 ++++++++++++++++++++
 src/nanoarrow/ipc/decoder.c                   | 156 ++++++++++++++++++++++----
 src/nanoarrow/ipc/decoder_test.cc             | 105 +++++++++++++++--
 src/nanoarrow/ipc/files_test.cc               |  67 +++++++++--
 src/nanoarrow/nanoarrow_ipc.h                 |  75 +++++++++++++
 src/nanoarrow/nanoarrow_ipc.hpp               |  22 ++++
 python/meson.build => subprojects/zstd.wrap   |  33 ++----
 19 files changed, 753 insertions(+), 91 deletions(-)

diff --git a/.github/workflows/build-and-test-ipc.yaml 
b/.github/workflows/build-and-test-ipc.yaml
index 6defa4bd..ee4332d8 100644
--- a/.github/workflows/build-and-test-ipc.yaml
+++ b/.github/workflows/build-and-test-ipc.yaml
@@ -43,7 +43,7 @@ jobs:
       fail-fast: false
       matrix:
         config:
-          - {label: default-build, cmake_args: "-DNANOARROW_BUILD_APPS=ON"}
+          - {label: default-build, cmake_args: "-DNANOARROW_BUILD_APPS=ON 
-DNANOARROW_IPC_WITH_ZSTD=ON"}
           - {label: default-noatomics, cmake_args: 
"-DCMAKE_C_FLAGS='-DNANOARROW_IPC_USE_STDATOMIC=0'"}
           - {label: namespaced-build, cmake_args: 
"-DNANOARROW_NAMESPACE=SomeUserNamespace"}
           - {label: bundled-build, cmake_args: "-DNANOARROW_BUNDLE=ON"}
@@ -72,13 +72,13 @@ jobs:
         with:
           path: arrow
           # Bump the number at the end of this line to force a new Arrow C++ 
build
-          key: arrow-${{ runner.os }}-${{ runner.arch }}-1
+          key: arrow-${{ runner.os }}-${{ runner.arch }}-3
 
       - name: Build Arrow C++
         if: steps.cache-arrow-build.outputs.cache-hit != 'true'
         shell: bash
         run: |
-          ci/scripts/build-arrow-cpp-minimal.sh 18.0.0 arrow
+          ci/scripts/build-arrow-cpp-minimal.sh 18.1.0 arrow
 
       - name: Build
         run: |
diff --git a/.github/workflows/build-and-test.yaml 
b/.github/workflows/build-and-test.yaml
index ab9b8e8e..8814e910 100644
--- a/.github/workflows/build-and-test.yaml
+++ b/.github/workflows/build-and-test.yaml
@@ -127,7 +127,9 @@ jobs:
 
   verify-meson:
     name: meson-build
-    runs-on: ubuntu-latest
+    # Workaround until https://github.com/apache/arrow-nanoarrow/issues/663 is 
solved
+    # (after which we can use ubuntu-latest)
+    runs-on: ubuntu-22.04
     steps:
       - uses: actions/checkout@v4
       - uses: actions/setup-python@v5
diff --git a/CMakeLists.txt b/CMakeLists.txt
index a24f7873..c77007d2 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -36,6 +36,8 @@ option(NANOARROW_FLATCC_ROOT_DIR "Root directory for flatcc 
include and lib dire
        OFF)
 option(NANOARROW_FLATCC_INCLUDE_DIR "Include directory for flatcc includes" 
OFF)
 option(NANOARROW_FLATCC_LIB_DIR "Library directory that contains 
libflatccrt.a" OFF)
+option(NANOARROW_IPC_WITH_ZSTD "Build nanoarrow with ZSTD compression support 
built in"
+       OFF)
 
 option(NANOARROW_DEVICE "Build device extension" OFF)
 option(NANOARROW_TESTING "Build testng extension" OFF)
@@ -205,15 +207,25 @@ if(NANOARROW_IPC)
                                      "${NANOARROW_FLATCC_INCLUDE_DIR}")
   endif()
 
+  if(NANOARROW_IPC_WITH_ZSTD)
+    find_package(zstd REQUIRED)
+    set(NANOARROW_IPC_EXTRA_FLAGS "-DNANOARROW_IPC_WITH_ZSTD")
+    set(NANOARROW_IPC_EXTRA_LIBS zstd::libzstd_static)
+  endif()
+
   if(NOT NANOARROW_BUNDLE)
     set(NANOARROW_IPC_BUILD_SOURCES
-        src/nanoarrow/ipc/decoder.c src/nanoarrow/ipc/encoder.c
-        src/nanoarrow/ipc/reader.c src/nanoarrow/ipc/writer.c)
+        src/nanoarrow/ipc/codecs.c
+        src/nanoarrow/ipc/decoder.c
+        src/nanoarrow/ipc/encoder.c
+        src/nanoarrow/ipc/reader.c
+        src/nanoarrow/ipc/writer.c)
   endif()
 
   add_library(nanoarrow_ipc ${NANOARROW_IPC_BUILD_SOURCES})
+  target_compile_definitions(nanoarrow_ipc PRIVATE 
${NANOARROW_IPC_EXTRA_FLAGS})
   target_link_libraries(nanoarrow_ipc
-                        PRIVATE flatccrt
+                        PRIVATE flatccrt ${NANOARROW_IPC_EXTRA_LIBS}
                         PUBLIC nanoarrow nanoarrow_coverage_config)
   target_include_directories(nanoarrow_ipc
                              PUBLIC 
$<BUILD_INTERFACE:${NANOARROW_BUILD_INCLUDE_DIR}>
@@ -574,6 +586,7 @@ if(NANOARROW_BUILD_TESTS)
     include(GoogleTest)
 
     foreach(name
+            codecs
             decoder
             encoder
             reader
@@ -587,6 +600,7 @@ if(NANOARROW_BUILD_TESTS)
                             nanoarrow
                             ${NANOARROW_ARROW_TARGET}
                             gtest_main
+                            gmock_main
                             nanoarrow_coverage_config)
 
       if(NOT (name MATCHES "_hpp_"))
@@ -595,7 +609,11 @@ if(NANOARROW_BUILD_TESTS)
 
       if(Arrow_FOUND)
         target_compile_definitions(nanoarrow_ipc_${name}_test
-                                   PRIVATE -DNANOARROW_BUILD_TESTS_WITH_ARROW)
+                                   PRIVATE -DNANOARROW_BUILD_TESTS_WITH_ARROW
+                                           ${NANOARROW_IPC_EXTRA_FLAGS})
+      else()
+        target_compile_definitions(nanoarrow_ipc_${name}_test
+                                   PRIVATE ${NANOARROW_IPC_EXTRA_FLAGS})
       endif()
 
       gtest_discover_tests(nanoarrow_ipc_${name}_test)
@@ -603,7 +621,6 @@ if(NANOARROW_BUILD_TESTS)
 
     target_link_libraries(nanoarrow_ipc_files_test nanoarrow_testing ZLIB::ZLIB
                           nanoarrow_coverage_config)
-    target_link_libraries(nanoarrow_ipc_decoder_test gmock_main)
   endif()
 
   if(NANOARROW_DEVICE)
diff --git a/ci/scripts/build-arrow-cpp-minimal.sh 
b/ci/scripts/build-arrow-cpp-minimal.sh
index ff0238be..d31acac2 100755
--- a/ci/scripts/build-arrow-cpp-minimal.sh
+++ b/ci/scripts/build-arrow-cpp-minimal.sh
@@ -53,6 +53,7 @@ cmake ../apache-arrow-${ARROW_CPP_VERSION}/cpp \
   -DARROW_JEMALLOC=OFF \
   -DARROW_SIMD_LEVEL=NONE \
   -DARROW_FILESYSTEM=OFF \
+  -DARROW_WITH_ZSTD=ON \
   -DCMAKE_INSTALL_PREFIX="${ARROW_CPP_INSTALL_DIR}"
 cmake --build . --parallel $(nproc)
 cmake --install . --prefix="${ARROW_CPP_INSTALL_DIR}" --config=Debug
diff --git a/ci/scripts/bundle.py b/ci/scripts/bundle.py
index 746fcd37..0136422e 100644
--- a/ci/scripts/bundle.py
+++ b/ci/scripts/bundle.py
@@ -209,6 +209,7 @@ def bundle_nanoarrow_ipc(
     nanoarrow_ipc_c = concatenate_content(
         [
             src_dir / "ipc" / "flatcc_generated.h",
+            src_dir / "ipc" / "codecs.c",
             src_dir / "ipc" / "decoder.c",
             src_dir / "ipc" / "encoder.c",
             src_dir / "ipc" / "reader.c",
diff --git a/meson.build b/meson.build
index 53dd824d..f61c4008 100644
--- a/meson.build
+++ b/meson.build
@@ -109,6 +109,14 @@ nanoarrow_dep = declare_dependency(
 
 if get_option('ipc')
     flatcc_dep = dependency('flatcc')
+    ipc_lib_deps = [nanoarrow_dep, flatcc_dep]
+    ipc_lib_c_args = []
+
+    if get_option('ipc_with_zstd')
+        zstd_dep = dependency('libzstd')
+        ipc_lib_deps += zstd_dep
+        ipc_lib_c_args += '-DNANOARROW_IPC_WITH_ZSTD'
+    endif
 
     install_headers(
         'src/nanoarrow/nanoarrow_ipc.h',
@@ -118,12 +126,14 @@ if get_option('ipc')
 
     nanoarrow_ipc_lib = library(
         'nanoarrow_ipc',
+        'src/nanoarrow/ipc/codecs.c',
         'src/nanoarrow/ipc/decoder.c',
         'src/nanoarrow/ipc/encoder.c',
         'src/nanoarrow/ipc/reader.c',
         'src/nanoarrow/ipc/writer.c',
-        dependencies: [nanoarrow_dep, flatcc_dep],
+        dependencies: ipc_lib_deps,
         install: true,
+        c_args: ipc_lib_c_args,
     )
     nanoarrow_ipc_dep = declare_dependency(
         include_directories: [incdir],
diff --git a/meson.options b/meson.options
index 93fec950..739088ac 100644
--- a/meson.options
+++ b/meson.options
@@ -20,6 +20,7 @@ option('tests_with_arrow', type: 'boolean', description: 
'Build tests with Arrow
 option('benchmarks', type: 'boolean', description: 'Build benchmarks', value: 
false)
 option('apps', type: 'boolean', description: 'Build utility applications', 
value: false)
 option('ipc', type: 'boolean', description: 'Build IPC libraries', value: 
false)
+option('ipc_with_zstd', type: 'boolean', description: 'Build IPC libraries 
with ZSTD compression support', value: false)
 option('integration_tests', type: 'boolean',
        description: 'Build cross-implementation Arrow integration tests',
        value: false)
diff --git a/python/meson.build b/python/meson.build
index 49fa67e4..d6e456ea 100644
--- a/python/meson.build
+++ b/python/meson.build
@@ -26,11 +26,14 @@ project(
         'warning_level=2',
         'c_std=c99',
         'default_library=static',
+        'force_fallback_for=zstd',
         # We need to set these options at the project default_option level
         # due to https://github.com/mesonbuild/meson/issues/6728
         'arrow-nanoarrow:ipc=true',
+        'arrow-nanoarrow:ipc_with_zstd=true',
         'arrow-nanoarrow:device=true',
         'arrow-nanoarrow:namespace=PythonPkg',
+        'zstd:bin_programs=false',
     ],
 )
 
diff --git a/python/src/nanoarrow/meson.build b/python/src/nanoarrow/meson.build
index 9b8c223b..1c38863c 100644
--- a/python/src/nanoarrow/meson.build
+++ b/python/src/nanoarrow/meson.build
@@ -16,6 +16,7 @@
 # under the License.
 
 flatcc_dep = dependency('flatcc')
+zstd_dep = dependency('libzstd')
 nanoarrow_proj = subproject('arrow-nanoarrow')
 nanoarrow_dep = nanoarrow_proj.get_variable('nanoarrow_dep')
 nanoarrow_ipc_dep = nanoarrow_proj.get_variable('nanoarrow_ipc_dep')
@@ -77,7 +78,7 @@ foreach cyf : cyfiles
     if stem in ['_array', '_device']
         cyfile_deps += [nanoarrow_device_dep]
     elif stem == '_ipc_lib'
-        cyfile_deps += [nanoarrow_ipc_dep, flatcc_dep]
+        cyfile_deps += [nanoarrow_ipc_dep, flatcc_dep, zstd_dep]
     endif
 
     py.extension_module(
diff --git a/python/meson.build b/python/subprojects/zstd.wrap
similarity index 53%
copy from python/meson.build
copy to python/subprojects/zstd.wrap
index 49fa67e4..330530c3 100644
--- a/python/meson.build
+++ b/python/subprojects/zstd.wrap
@@ -15,25 +15,16 @@
 # specific language governing permissions and limitations
 # under the License.
 
-project(
-    'nanoarrow',
-    'c',
-    'cython',
-    version: run_command(['src/nanoarrow/_version.py', '--print'], check: 
true).stdout().strip(),
-    license: 'Apache-2.0',
-    meson_version: '>=1.2.0',
-    default_options: [
-        'warning_level=2',
-        'c_std=c99',
-        'default_library=static',
-        # We need to set these options at the project default_option level
-        # due to https://github.com/mesonbuild/meson/issues/6728
-        'arrow-nanoarrow:ipc=true',
-        'arrow-nanoarrow:device=true',
-        'arrow-nanoarrow:namespace=PythonPkg',
-    ],
-)
+[wrap-file]
+directory = zstd-1.5.6
+source_url = 
https://github.com/facebook/zstd/releases/download/v1.5.6/zstd-1.5.6.tar.gz
+source_filename = zstd-1.5.6.tar.gz
+source_hash = 8c29e06cf42aacc1eafc4077ae2ec6c6fcb96a626157e0593d5e82a34fd403c1
+patch_filename = zstd_1.5.6-2_patch.zip
+patch_url = https://wrapdb.mesonbuild.com/v2/zstd_1.5.6-2/get_patch
+patch_hash = 3e67f7d2edf3c56e6450d4c0f5f3d5fe94799e3608e3795502da03f7dd51b28c
+source_fallback_url = 
https://github.com/mesonbuild/wrapdb/releases/download/zstd_1.5.6-2/zstd-1.5.6.tar.gz
+wrapdb_version = 1.5.6-2
 
-subdir('src/nanoarrow')
-
-meson.add_dist_script('python', meson.current_source_dir() / 
'generate_dist.py')
+[provide]
+libzstd = libzstd_dep
diff --git a/python/tests/test_ipc.py b/python/tests/test_ipc.py
index 73ca88eb..8b691637 100644
--- a/python/tests/test_ipc.py
+++ b/python/tests/test_ipc.py
@@ -24,7 +24,7 @@ import pytest
 from nanoarrow._utils import NanoarrowException
 
 import nanoarrow as na
-from nanoarrow.ipc import InputStream, StreamWriter
+from nanoarrow.ipc import _EXAMPLE_IPC_SCHEMA, InputStream, StreamWriter
 
 
 def test_ipc_stream_example():
@@ -92,6 +92,18 @@ def test_ipc_stream_from_url():
                 assert len(batches[0]) == 3
 
 
+def test_ipc_stream_compressed_example():
+    buf = io.BytesIO()
+    buf.write(_EXAMPLE_IPC_SCHEMA)
+    buf.write(COMPRESSED_BATCH)
+    buf.seek(0)
+
+    with InputStream.from_readable(buf) as inp:
+        array = na.Array(inp)
+        assert len(array) == 3
+        assert array.child(0).to_pylist() == [0, 1, 2]
+
+
 def test_ipc_stream_python_exception_on_read():
     class ExtraordinarilyInconvenientFile:
         def readinto(self, *args, **kwargs):
@@ -231,3 +243,24 @@ def test_writer_error_on_write():
     with pytest.raises(NanoarrowException):
         with StreamWriter.from_writable(io.BytesIO()) as writer:
             writer.write_stream(na.c_array([], na.int32()))
+
+
+# An encapsulated IPC record batch message for the _EXAMPLE_IPC_SCHEMA stream.
+# It begins with the IPC continuation marker (0xffffffff) and its body ends with
+# one compressed buffer: a little endian int64 uncompressed length (12) followed
+# by a ZSTD frame (magic bytes 0x28 0xb5 0x2f 0xfd) whose payload is the int32
+# values [0, 1, 2] (the same frame as kZstdCompressed012 in codecs_test.cc),
+# then padding.
+# fmt: off
+COMPRESSED_BATCH = bytearray([
+    0xff, 0xff, 0xff, 0xff, 0xa0, 0x00, 0x00, 0x00, 0x14, 0x00, 0x00, 0x00, 0x00, 0x00,
+    0x00, 0x00, 0x0c, 0x00, 0x18, 0x00, 0x06, 0x00, 0x05, 0x00, 0x08, 0x00, 0x0c, 0x00,
+    0x0c, 0x00, 0x00, 0x00, 0x00, 0x03, 0x04, 0x00, 0x1c, 0x00, 0x00, 0x00, 0x20, 0x00,
+    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0c, 0x00, 0x1e, 0x00,
+    0x10, 0x00, 0x04, 0x00, 0x08, 0x00, 0x0c, 0x00, 0x0c, 0x00, 0x00, 0x00, 0x50, 0x00,
+    0x00, 0x00, 0x24, 0x00, 0x00, 0x00, 0x18, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00,
+    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06, 0x00, 0x08, 0x00,
+    0x07, 0x00, 0x06, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x02, 0x00, 0x00, 0x00,
+    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x1d, 0x00, 0x00, 0x00,
+    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x03, 0x00,
+    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+    0x0c, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x28, 0xb5, 0x2f, 0xfd, 0x20, 0x0c,
+    0x61, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00,
+    0x00, 0x00, 0x00, 0x00
+])
+# fmt: on
diff --git a/src/nanoarrow/ipc/codecs.c b/src/nanoarrow/ipc/codecs.c
new file mode 100644
index 00000000..552674ce
--- /dev/null
+++ b/src/nanoarrow/ipc/codecs.c
@@ -0,0 +1,138 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <inttypes.h>
+
+#include "nanoarrow/nanoarrow_ipc.h"
+
+#if defined(NANOARROW_IPC_WITH_ZSTD)
+#include <zstd.h>
+
+// Decompress one ZSTD-compressed buffer from src into dst, which the caller has
+// allocated with exactly dst_size bytes. Returns EINVAL for negative sizes and EIO
+// if ZSTD reports an error or the output length differs from dst_size.
+static ArrowErrorCode ArrowIpcDecompressZstd(struct ArrowBufferView src, uint8_t* dst,
+                                             int64_t dst_size, struct ArrowError* error) {
+  // This function is handed out publicly via ArrowIpcGetZstdDecompressionFunction(),
+  // so guard against negative sizes: cast to size_t they would become enormous
+  // capacities and let ZSTD read or write past the ends of the caller's buffers.
+  if (src.size_bytes < 0 || dst_size < 0) {
+    ArrowErrorSet(error,
+                  "Expected buffer sizes >= 0 but got src size %" PRId64
+                  " and dst size %" PRId64,
+                  src.size_bytes, dst_size);
+    return EINVAL;
+  }
+
+  size_t code =
+      ZSTD_decompress(dst, (size_t)dst_size, src.data.data, (size_t)src.size_bytes);
+  if (ZSTD_isError(code)) {
+    ArrowErrorSet(error,
+                  "ZSTD_decompress([buffer with %" PRId64
+                  " bytes] -> [buffer with %" PRId64 " bytes]) failed with error '%s'",
+                  src.size_bytes, dst_size, ZSTD_getErrorName(code));
+    return EIO;
+  }
+
+  // A valid frame can still decode to fewer bytes than the caller expected
+  if (dst_size != (int64_t)code) {
+    ArrowErrorSet(error,
+                  "Expected decompressed size of %" PRId64 " bytes but got %" PRId64
+                  " bytes",
+                  dst_size, (int64_t)code);
+    return EIO;
+  }
+
+  return NANOARROW_OK;
+}
+#endif
+
+// Returns the built-in ZSTD decompression callback, or NULL when this library was
+// compiled without NANOARROW_IPC_WITH_ZSTD.
+ArrowIpcDecompressFunction ArrowIpcGetZstdDecompressionFunction(void) {
+  ArrowIpcDecompressFunction zstd_fn = NULL;
+#if defined(NANOARROW_IPC_WITH_ZSTD)
+  zstd_fn = &ArrowIpcDecompressZstd;
+#endif
+  return zstd_fn;
+}
+
+// State for the serial decompressor: one callback slot per compression type,
+// indexed by the enum ArrowIpcCompressionType value (ZSTD is the largest). A NULL
+// slot means that codec is unavailable.
+struct ArrowIpcSerialDecompressorPrivate {
+  ArrowIpcDecompressFunction
+      decompress_functions[NANOARROW_IPC_COMPRESSION_TYPE_ZSTD + 1];
+};
+
+// decompress_add() for the serial decompressor: looks up the callback registered
+// for compression_type and runs it immediately on the calling thread.
+static ArrowErrorCode ArrowIpcSerialDecompressorAdd(
+    struct ArrowIpcDecompressor* decompressor,
+    enum ArrowIpcCompressionType compression_type, struct ArrowBufferView src,
+    uint8_t* dst, int64_t dst_size, struct ArrowError* error) {
+  // Only real codecs have a slot in the function table
+  if (compression_type != NANOARROW_IPC_COMPRESSION_TYPE_ZSTD &&
+      compression_type != NANOARROW_IPC_COMPRESSION_TYPE_LZ4_FRAME) {
+    ArrowErrorSet(error, "Unknown decompression type with value %d",
+                  (int)compression_type);
+    return EINVAL;
+  }
+
+  struct ArrowIpcSerialDecompressorPrivate* state =
+      (struct ArrowIpcSerialDecompressorPrivate*)decompressor->private_data;
+  ArrowIpcDecompressFunction decompress_fn =
+      state->decompress_functions[compression_type];
+
+  if (decompress_fn == NULL) {
+    ArrowErrorSet(
+        error, "Compression type with value %d not supported by this build of nanoarrow",
+        (int)compression_type);
+    return ENOTSUP;
+  }
+
+  // The work completes before returning, so the callback's status is final
+  return decompress_fn(src, dst, dst_size, error);
+}
+
+// decompress_wait() for the serial decompressor. Every decompress_add() call has
+// already finished by the time it returns, so there is never anything to wait on.
+static ArrowErrorCode ArrowIpcSerialDecompressorWait(
+    struct ArrowIpcDecompressor* decompressor, int64_t timeout_ms,
+    struct ArrowError* error) {
+  NANOARROW_UNUSED(error);
+  NANOARROW_UNUSED(timeout_ms);
+  NANOARROW_UNUSED(decompressor);
+  return NANOARROW_OK;
+}
+
+// Frees the function table and marks the decompressor as released.
+static void ArrowIpcSerialDecompressorRelease(struct ArrowIpcDecompressor* decompressor) {
+  void* state = decompressor->private_data;
+  decompressor->release = NULL;
+  ArrowFree(state);
+}
+
+// Initialize decompressor as a serial (same-thread) decompressor with the built-in
+// ZSTD callback installed when available. On failure, decompressor->release is NULL.
+ArrowErrorCode ArrowIpcSerialDecompressor(struct ArrowIpcDecompressor* decompressor) {
+  // Allocate before publishing any callbacks. Setting release first would leave a
+  // decompressor that looks initialized (release != NULL) with NULL private_data
+  // after ENOMEM, and callers that test release (e.g., the decoder) would then use it.
+  struct ArrowIpcSerialDecompressorPrivate* private_data =
+      (struct ArrowIpcSerialDecompressorPrivate*)ArrowMalloc(
+          sizeof(struct ArrowIpcSerialDecompressorPrivate));
+  if (private_data == NULL) {
+    decompressor->release = NULL;
+    return ENOMEM;
+  }
+
+  // Every slot starts as "not supported"
+  memset(private_data, 0, sizeof(struct ArrowIpcSerialDecompressorPrivate));
+
+  decompressor->decompress_add = &ArrowIpcSerialDecompressorAdd;
+  decompressor->decompress_wait = &ArrowIpcSerialDecompressorWait;
+  decompressor->release = &ArrowIpcSerialDecompressorRelease;
+  decompressor->private_data = private_data;
+
+  // Cannot fail for ZSTD; installs NULL when built without NANOARROW_IPC_WITH_ZSTD
+  ArrowIpcSerialDecompressorSetFunction(decompressor, NANOARROW_IPC_COMPRESSION_TYPE_ZSTD,
+                                        ArrowIpcGetZstdDecompressionFunction());
+  return NANOARROW_OK;
+}
+
+// Install (or, with NULL, remove) the callback a serial decompressor uses for
+// compression_type. Returns EINVAL for values that are not a codec.
+ArrowErrorCode ArrowIpcSerialDecompressorSetFunction(
+    struct ArrowIpcDecompressor* decompressor,
+    enum ArrowIpcCompressionType compression_type,
+    ArrowIpcDecompressFunction decompress_function) {
+  int is_codec = compression_type == NANOARROW_IPC_COMPRESSION_TYPE_ZSTD ||
+                 compression_type == NANOARROW_IPC_COMPRESSION_TYPE_LZ4_FRAME;
+  if (!is_codec) {
+    return EINVAL;
+  }
+
+  struct ArrowIpcSerialDecompressorPrivate* state =
+      (struct ArrowIpcSerialDecompressorPrivate*)decompressor->private_data;
+  state->decompress_functions[compression_type] = decompress_function;
+  return NANOARROW_OK;
+}
diff --git a/src/nanoarrow/ipc/codecs_test.cc b/src/nanoarrow/ipc/codecs_test.cc
new file mode 100644
index 00000000..3c19f502
--- /dev/null
+++ b/src/nanoarrow/ipc/codecs_test.cc
@@ -0,0 +1,122 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstring>
+
+#include <gmock/gmock-matchers.h>
+#include <gtest/gtest.h>
+
+#include "nanoarrow/nanoarrow_ipc.hpp"
+
+// A single ZSTD frame whose content is the little endian int32s [0, 1, 2]
+const uint8_t kZstdCompressed012[] = {
+    0x28, 0xb5, 0x2f, 0xfd,  // frame magic number
+    0x20, 0x0c,              // frame header: single segment, content size 12
+    0x61, 0x00, 0x00,        // block header: last block, raw, 12 bytes
+    0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00};
+
+// The same int32s [0, 1, 2], uncompressed
+const uint8_t kZstdUncompressed012[] = {
+    0x00, 0x00, 0x00, 0x00,  // 0
+    0x01, 0x00, 0x00, 0x00,  // 1
+    0x02, 0x00, 0x00, 0x00   // 2
+};
+
+TEST(NanoarrowIpcTest, NanoarrowIpcZstdBuildMatchesRuntime) {
+  // The compile-time flag and the runtime accessor must agree
+  const bool runtime_has_zstd = ArrowIpcGetZstdDecompressionFunction() != nullptr;
+#if defined(NANOARROW_IPC_WITH_ZSTD)
+  ASSERT_TRUE(runtime_has_zstd);
+#else
+  ASSERT_FALSE(runtime_has_zstd);
+#endif
+}
+
+TEST(NanoarrowIpcTest, ZstdDecodeValidInput) {
+  ArrowIpcDecompressFunction decompress = ArrowIpcGetZstdDecompressionFunction();
+  if (decompress == nullptr) {
+    GTEST_SKIP() << "nanoarrow_ipc not built with NANOARROW_IPC_WITH_ZSTD";
+  }
+
+  struct ArrowError error {};
+
+  // An empty input decompresses to an empty output
+  EXPECT_EQ(decompress({{nullptr}, 0}, nullptr, 0, &error), NANOARROW_OK);
+
+  // A valid frame decompresses to exactly the expected bytes
+  uint8_t out[16] = {0};
+  ASSERT_EQ(decompress({{&kZstdCompressed012}, sizeof(kZstdCompressed012)}, out,
+                       sizeof(kZstdUncompressed012), &error),
+            NANOARROW_OK)
+      << error.message;
+  EXPECT_EQ(std::memcmp(out, kZstdUncompressed012, sizeof(kZstdUncompressed012)), 0);
+
+  // Expecting one byte more than the frame holds is reported as a size mismatch
+  ASSERT_EQ(decompress({{kZstdCompressed012}, sizeof(kZstdCompressed012)}, out,
+                       sizeof(kZstdUncompressed012) + 1, &error),
+            EIO);
+  EXPECT_STREQ(error.message, "Expected decompressed size of 13 bytes but got 12 bytes");
+}
+
+TEST(NanoarrowIpcTest, ZstdDecodeInvalidInput) {
+  ArrowIpcDecompressFunction decompress = ArrowIpcGetZstdDecompressionFunction();
+  if (decompress == nullptr) {
+    GTEST_SKIP() << "nanoarrow_ipc not built with NANOARROW_IPC_WITH_ZSTD";
+  }
+
+  // Five bytes that do not form a ZSTD frame
+  const char* not_zstd = "abcde";
+  struct ArrowError error {};
+  EXPECT_EQ(decompress({{not_zstd}, 5}, nullptr, 0, &error), EIO);
+  EXPECT_THAT(error.message,
+              ::testing::StartsWith("ZSTD_decompress([buffer with 5 bytes] -> [buffer "
+                                    "with 0 bytes]) failed with error"));
+}
+
+TEST(NanoarrowIpcTest, SerialDecompressor) {
+  struct ArrowError error {};
+  nanoarrow::ipc::UniqueDecompressor decompressor;
+  ASSERT_EQ(ArrowIpcSerialDecompressor(decompressor.get()), NANOARROW_OK);
+
+  // NONE is not a codec, so no function can be registered for it
+  ASSERT_EQ(ArrowIpcSerialDecompressorSetFunction(
+                decompressor.get(), NANOARROW_IPC_COMPRESSION_TYPE_NONE, nullptr),
+            EINVAL);
+
+  // All work happens inside decompress_add(), so waiting always succeeds at once
+  EXPECT_EQ(decompressor->decompress_wait(decompressor.get(), 0, &error), NANOARROW_OK);
+
+  // ZSTD either works (when built in) or is reported as unsupported
+  const bool have_zstd = ArrowIpcGetZstdDecompressionFunction() != nullptr;
+  uint8_t out[12] = {0};
+  if (have_zstd) {
+    EXPECT_EQ(decompressor->decompress_add(
+                  decompressor.get(), NANOARROW_IPC_COMPRESSION_TYPE_ZSTD,
+                  {{&kZstdCompressed012}, sizeof(kZstdCompressed012)}, out, sizeof(out),
+                  &error),
+              NANOARROW_OK);
+  } else {
+    EXPECT_EQ(decompressor->decompress_add(decompressor.get(),
+                                           NANOARROW_IPC_COMPRESSION_TYPE_ZSTD,
+                                           {{nullptr}, 0}, nullptr, 0, &error),
+              ENOTSUP);
+    EXPECT_STREQ(error.message,
+                 "Compression type with value 2 not supported by this build of nanoarrow");
+  }
+
+  // Clearing the ZSTD slot makes the codec unavailable regardless of build flags
+  ASSERT_EQ(ArrowIpcSerialDecompressorSetFunction(
+                decompressor.get(), NANOARROW_IPC_COMPRESSION_TYPE_ZSTD, nullptr),
+            NANOARROW_OK);
+  EXPECT_EQ(decompressor->decompress_add(decompressor.get(),
+                                         NANOARROW_IPC_COMPRESSION_TYPE_ZSTD,
+                                         {{nullptr}, 0}, nullptr, 0, &error),
+            ENOTSUP);
+  EXPECT_STREQ(error.message,
+               "Compression type with value 2 not supported by this build of nanoarrow");
+}
diff --git a/src/nanoarrow/ipc/decoder.c b/src/nanoarrow/ipc/decoder.c
index b74b4e0e..606bea81 100644
--- a/src/nanoarrow/ipc/decoder.c
+++ b/src/nanoarrow/ipc/decoder.c
@@ -97,6 +97,8 @@ struct ArrowIpcDecoderPrivate {
   const void* last_message;
   // Storage for a Footer
   struct ArrowIpcFooter footer;
+  // Decompressor for compression support
+  struct ArrowIpcDecompressor decompressor;
 };
 
 ArrowErrorCode ArrowIpcCheckRuntime(struct ArrowError* error) {
@@ -243,6 +245,28 @@ ArrowErrorCode ArrowIpcDecoderInit(struct ArrowIpcDecoder* 
decoder) {
   return NANOARROW_OK;
 }
 
+// Ensure the decoder has a decompressor: keep one that is already present (e.g.,
+// installed by ArrowIpcDecoderSetDecompressor()), otherwise create a serial one.
+static ArrowErrorCode ArrowIpcDecoderInitDecompressor(
+    struct ArrowIpcDecoderPrivate* private_data) {
+  if (private_data->decompressor.release != NULL) {
+    return NANOARROW_OK;
+  }
+
+  return ArrowIpcSerialDecompressor(&private_data->decompressor);
+}
+
+// Replace the decoder's decompressor with decompressor, taking ownership of it.
+// Any decompressor the decoder already holds is released first.
+ArrowErrorCode ArrowIpcDecoderSetDecompressor(struct ArrowIpcDecoder* decoder,
+                                              struct ArrowIpcDecompressor* decompressor) {
+  struct ArrowIpcDecoderPrivate* private_data =
+      (struct ArrowIpcDecoderPrivate*)decoder->private_data;
+
+  if (private_data->decompressor.release != NULL) {
+    private_data->decompressor.release(&private_data->decompressor);
+  }
+
+  // Move semantics: the decoder now releases this decompressor in
+  // ArrowIpcDecoderReset(). Mark the caller's copy as released so it is not released
+  // a second time (e.g., by a C++ owner such as nanoarrow::ipc::UniqueDecompressor).
+  memcpy(&private_data->decompressor, decompressor, sizeof(struct ArrowIpcDecompressor));
+  decompressor->release = NULL;
+  return NANOARROW_OK;
+}
+
 void ArrowIpcDecoderReset(struct ArrowIpcDecoder* decoder) {
   struct ArrowIpcDecoderPrivate* private_data =
       (struct ArrowIpcDecoderPrivate*)decoder->private_data;
@@ -263,6 +287,10 @@ void ArrowIpcDecoderReset(struct ArrowIpcDecoder* decoder) 
{
 
     ArrowIpcFooterReset(&private_data->footer);
 
+    if (private_data->decompressor.release != NULL) {
+      private_data->decompressor.release(&private_data->decompressor);
+    }
+
     ArrowFree(private_data);
     memset(decoder, 0, sizeof(struct ArrowIpcDecoder));
   }
@@ -1450,6 +1478,10 @@ struct ArrowIpcBufferFactory {
                                 struct ArrowBufferView* dst_view, struct 
ArrowBuffer* dst,
                                 struct ArrowError* error);
 
+  /// \brief Caller provided decompressor instance to which any decompression 
requests
+  /// should be made.
+  struct ArrowIpcDecompressor* decompressor;
+
   /// \brief Caller-defined private data to be used in the callback.
   ///
   /// Usually this would be a description of where the body has been read into 
memory or
@@ -1457,18 +1489,81 @@ struct ArrowIpcBufferFactory {
   void* private_data;
 };
 
+/// \brief Decompress a single body buffer when body compression is enabled
+///
+/// When body compression is enabled, buffers are prefixed with a little endian
+/// signed 64-bit integer holding the uncompressed length, where -1 means the
+/// remaining bytes were written without compression.
+///
+/// \param decompressor Receives the decompression task. It is only required when
+///   the buffer is actually compressed; if it is NULL in that case, EINVAL is
+///   returned.
+/// \param compression_type The codec declared by the message
+/// \param src The buffer, including its 8-byte length prefix
+/// \param dst Resized to the uncompressed length and used as the decompression
+///   target. Its contents are only valid after the decompressor's
+///   decompress_wait() callback returns NANOARROW_OK.
+/// \param needs_decompression Set to 1 if a decompression task writing into dst
+///   was submitted, or 0 if the bytes after the prefix can be used as-is.
+/// \param error Populated on failure
+static ArrowErrorCode ArrowIpcDecompressBufferFromView(
+    struct ArrowIpcDecompressor* decompressor,
+    enum ArrowIpcCompressionType compression_type, struct ArrowBufferView src,
+    struct ArrowBuffer* dst, int* needs_decompression, struct ArrowError* error) {
+  if (src.size_bytes < (int64_t)sizeof(int64_t)) {
+    ArrowErrorSet(
+        error,
+        "Buffer size must be >= sizeof(int64_t) when buffer compression is enabled");
+    return EINVAL;
+  }
+
+  // When body compression is enabled, buffers are prefixed with a little endian
+  // signed 64-bit integer that is the uncompressed body length.
+  int64_t uncompressed_size;
+  memcpy(&uncompressed_size, src.data.data, sizeof(int64_t));
+  if (ArrowIpcSystemEndianness() != NANOARROW_IPC_ENDIANNESS_LITTLE) {
+    uncompressed_size = (int64_t)bswap64(uncompressed_size);
+  }
+
+  // Sentinel for "body compression was enabled but this buffer is not compressed" is -1
+  if (uncompressed_size == -1) {
+    *needs_decompression = 0;
+    return NANOARROW_OK;
+  }
+
+  if (uncompressed_size < 0) {
+    ArrowErrorSet(error,
+                  "Decompressed buffer size must be -1 or >= 0 bytes but was %" PRId64,
+                  uncompressed_size);
+    return EINVAL;
+  }
+
+  // Buffer factories default to a NULL decompressor. Report a compressed buffer
+  // that reaches this point without one instead of dereferencing NULL below.
+  if (decompressor == NULL) {
+    ArrowErrorSet(error,
+                  "Compressed buffer encountered but no ArrowIpcDecompressor was "
+                  "provided");
+    return EINVAL;
+  }
+
+  // Prepare the source (skip the length prefix) and the destination
+  src.data.as_uint8 += sizeof(int64_t);
+  src.size_bytes -= sizeof(int64_t);
+  NANOARROW_RETURN_NOT_OK(ArrowBufferResize(dst, uncompressed_size, 0));
+
+  // Add the task to the decompressor (this may execute synchronously for some
+  // decompressors)
+  NANOARROW_RETURN_NOT_OK(decompressor->decompress_add(
+      decompressor, compression_type, src, dst->data, uncompressed_size, error));
+
+  // Pass on that we handled the decompression
+  *needs_decompression = 1;
+  return NANOARROW_OK;
+}
+
 static ArrowErrorCode ArrowIpcMakeBufferFromView(struct ArrowIpcBufferFactory* 
factory,
                                                  struct ArrowIpcBufferSource* 
src,
                                                  struct ArrowBufferView* 
dst_view,
                                                  struct ArrowBuffer* dst,
                                                  struct ArrowError* error) {
-  NANOARROW_UNUSED(factory);
-  NANOARROW_UNUSED(dst);
-  NANOARROW_UNUSED(error);
-
+  // Resolve `src` against the body view stored in factory->private_data.
+  // Uncompressed payloads are exposed as a view into the body (dst is not
+  // touched). Compressed payloads are decompressed into dst, and dst_view then
+  // points at dst's data.
   struct ArrowBufferView* body = (struct 
ArrowBufferView*)factory->private_data;
-  dst_view->data.as_uint8 = body->data.as_uint8 + src->body_offset_bytes;
-  dst_view->size_bytes = src->buffer_length_bytes;
+
+  struct ArrowBufferView src_view;
+  src_view.data.as_uint8 = body->data.as_uint8 + src->body_offset_bytes;
+  src_view.size_bytes = src->buffer_length_bytes;
+
+  int needs_decompression = 0;
+  // With body compression enabled, even buffers stored without compression
+  // carry an 8-byte uncompressed-length prefix that must be skipped.
+  int uncompressed_data_offset = 0;
+  if (src->codec != NANOARROW_IPC_COMPRESSION_TYPE_NONE) {
+    NANOARROW_RETURN_NOT_OK(ArrowIpcDecompressBufferFromView(
+        factory->decompressor, src->codec, src_view, dst, 
&needs_decompression, error));
+    uncompressed_data_offset += sizeof(int64_t);
+  }
+
+  if (!needs_decompression) {
+    *dst_view = src_view;
+    dst_view->data.as_uint8 += uncompressed_data_offset;
+    dst_view->size_bytes -= uncompressed_data_offset;
+  } else {
+    // The decompressor writes into dst. Its bytes are only valid once the
+    // caller has waited on the decompressor (decompress_wait()).
+    dst_view->data.data = dst->data;
+    dst_view->size_bytes = dst->size_bytes;
+  }
+
   return NANOARROW_OK;
 }
 
@@ -1476,6 +1571,7 @@ static struct ArrowIpcBufferFactory 
ArrowIpcBufferFactoryFromView(
     struct ArrowBufferView* buffer_view) {
   struct ArrowIpcBufferFactory out;
   out.make_buffer = &ArrowIpcMakeBufferFromView;
+  out.decompressor = NULL;
   out.private_data = buffer_view;
   return out;
 }
@@ -1485,14 +1581,27 @@ static ArrowErrorCode 
ArrowIpcMakeBufferFromShared(struct ArrowIpcBufferFactory*
                                                    struct ArrowBufferView* 
dst_view,
                                                    struct ArrowBuffer* dst,
                                                    struct ArrowError* error) {
-  NANOARROW_UNUSED(error);
-
   struct ArrowIpcSharedBuffer* shared =
       (struct ArrowIpcSharedBuffer*)factory->private_data;
-  ArrowBufferReset(dst);
-  ArrowIpcSharedBufferClone(shared, dst);
-  dst->data += src->body_offset_bytes;
-  dst->size_bytes = src->buffer_length_bytes;
+
+  int needs_decompression = 0;
+  int uncompressed_data_offset = 0;
+  if (src->codec != NANOARROW_IPC_COMPRESSION_TYPE_NONE) {
+    struct ArrowBufferView src_view;
+    src_view.data.as_uint8 = shared->private_src.data + src->body_offset_bytes;
+    src_view.size_bytes = src->buffer_length_bytes;
+    NANOARROW_RETURN_NOT_OK(ArrowIpcDecompressBufferFromView(
+        factory->decompressor, src->codec, src_view, dst, 
&needs_decompression, error));
+    uncompressed_data_offset += sizeof(int64_t);
+  }
+
+  if (!needs_decompression) {
+    ArrowBufferReset(dst);
+    ArrowIpcSharedBufferClone(shared, dst);
+    dst->data += src->body_offset_bytes + uncompressed_data_offset;
+    dst->size_bytes = src->buffer_length_bytes - uncompressed_data_offset;
+  }
+
   dst_view->data.data = dst->data;
   dst_view->size_bytes = dst->size_bytes;
   return NANOARROW_OK;
@@ -1502,6 +1611,7 @@ static struct ArrowIpcBufferFactory 
ArrowIpcBufferFactoryFromShared(
     struct ArrowIpcSharedBuffer* shared) {
   struct ArrowIpcBufferFactory out;
   out.make_buffer = &ArrowIpcMakeBufferFromShared;
+  out.decompressor = NULL;
   out.private_data = shared;
   return out;
 }
@@ -1663,14 +1773,6 @@ static int ArrowIpcDecoderMakeBuffer(struct 
ArrowIpcArraySetter* setter, int64_t
     return EINVAL;
   }
 
-  // If the ArrowIpcBufferFactory is made public, these should get moved 
(since then a
-  // user could inject support for either one). More likely, by the time that 
happens,
-  // this library will be able to support some of these features.
-  if (setter->src.codec != NANOARROW_IPC_COMPRESSION_TYPE_NONE) {
-    ArrowErrorSet(error, "The nanoarrow_ipc extension does not support 
compression");
-    return ENOTSUP;
-  }
-
   setter->src.body_offset_bytes = offset;
   setter->src.buffer_length_bytes = length;
   NANOARROW_RETURN_NOT_OK(
@@ -1856,6 +1958,13 @@ static ArrowErrorCode 
ArrowIpcDecoderDecodeArrayViewInternal(
   setter.src.swap_endian = ArrowIpcDecoderNeedsSwapEndian(decoder);
   setter.version = decoder->metadata_version;
 
+  // If we are going to need a decompressor here, ensure the default one is
+  // initialized.
+  if (setter.src.codec != NANOARROW_IPC_COMPRESSION_TYPE_NONE) {
+    NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderInitDecompressor(private_data));
+    setter.factory.decompressor = &private_data->decompressor;
+  }
+
   // The flatbuffers FieldNode doesn't count the root struct so we have to 
loop over the
   // children ourselves
   if (field_i == -1) {
@@ -1873,6 +1982,13 @@ static ArrowErrorCode 
ArrowIpcDecoderDecodeArrayViewInternal(
         ArrowIpcDecoderWalkSetArrayView(&setter, root->array_view, 
root->array, error));
   }
 
+  // If we decoded a compressed message, wait for any pending decompression 
tasks to
+  // complete. The default compressor already performed the decompression
+  if (setter.factory.decompressor != NULL) {
+    NANOARROW_RETURN_NOT_OK(setter.factory.decompressor->decompress_wait(
+        setter.factory.decompressor, -1, error));
+  }
+
   *out_view = root->array_view;
   return NANOARROW_OK;
 }
diff --git a/src/nanoarrow/ipc/decoder_test.cc 
b/src/nanoarrow/ipc/decoder_test.cc
index f59e52c5..b3856656 100644
--- a/src/nanoarrow/ipc/decoder_test.cc
+++ b/src/nanoarrow/ipc/decoder_test.cc
@@ -24,6 +24,7 @@
 #include <arrow/util/key_value_metadata.h>
 #endif
 #include <gmock/gmock-matchers.h>
+#include <gtest/gtest-spi.h>
 #include <gtest/gtest.h>
 
 // For bswap32()
@@ -53,6 +54,7 @@ struct ArrowIpcDecoderPrivate {
   int64_t n_buffers;
   const void* last_message;
   struct ArrowIpcFooter footer;
+  struct ArrowIpcDecompressor decompressor;
 };
 }
 
@@ -105,6 +107,40 @@ static uint8_t kSimpleRecordBatch[] = {
     0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 
0x03, 0x00,
     0x00, 0x00, 0x00, 0x00, 0x00, 0x00};
 
+// A complete record batch message (header followed by body) for a struct with a
+// single int32 column whose body buffers use ZSTD compression (note the ZSTD
+// frame magic 0x28 0xb5 0x2f 0xfd after the 8-byte length prefix).
+// NanoarrowIpcDecodeCompressedRecordBatch expects it to decode to {0, 1, 2}.
+static uint8_t kSimpleRecordBatchCompressed[] = {
+    0xff, 0xff, 0xff, 0xff, 0xa0, 0x00, 0x00, 0x00, 0x14, 0x00, 0x00, 0x00, 
0x00, 0x00,
+    0x00, 0x00, 0x0c, 0x00, 0x18, 0x00, 0x06, 0x00, 0x05, 0x00, 0x08, 0x00, 
0x0c, 0x00,
+    0x0c, 0x00, 0x00, 0x00, 0x00, 0x03, 0x04, 0x00, 0x1c, 0x00, 0x00, 0x00, 
0x20, 0x00,
+    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0c, 0x00, 
0x1e, 0x00,
+    0x10, 0x00, 0x04, 0x00, 0x08, 0x00, 0x0c, 0x00, 0x0c, 0x00, 0x00, 0x00, 
0x50, 0x00,
+    0x00, 0x00, 0x24, 0x00, 0x00, 0x00, 0x18, 0x00, 0x00, 0x00, 0x03, 0x00, 
0x00, 0x00,
+    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06, 0x00, 
0x08, 0x00,
+    0x07, 0x00, 0x06, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x02, 0x00, 
0x00, 0x00,
+    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 
0x00, 0x00,
+    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x1d, 0x00, 
0x00, 0x00,
+    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 
0x03, 0x00,
+    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 
0x00, 0x00,
+    0x0c, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x28, 0xb5, 0x2f, 0xfd, 
0x20, 0x0c,
+    0x61, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02, 
0x00, 0x00,
+    0x00, 0x00, 0x00, 0x00};
+
+// Same layout as kSimpleRecordBatchCompressed (body compression enabled), but the
+// data buffer's length prefix is -1 (eight 0xff bytes), meaning its bytes are
+// stored uncompressed. NanoarrowIpcDecodeUncompressibleRecordBatch expects it to
+// decode to {0, 1, 2}.
+static uint8_t kSimpleRecordBatchUncompressible[] = {
+    0xff, 0xff, 0xff, 0xff, 0xa0, 0x00, 0x00, 0x00, 0x14, 0x00, 0x00, 0x00, 
0x00, 0x00,
+    0x00, 0x00, 0x0c, 0x00, 0x18, 0x00, 0x06, 0x00, 0x05, 0x00, 0x08, 0x00, 
0x0c, 0x00,
+    0x0c, 0x00, 0x00, 0x00, 0x00, 0x03, 0x04, 0x00, 0x1c, 0x00, 0x00, 0x00, 
0x20, 0x00,
+    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0c, 0x00, 
0x1e, 0x00,
+    0x10, 0x00, 0x04, 0x00, 0x08, 0x00, 0x0c, 0x00, 0x0c, 0x00, 0x00, 0x00, 
0x50, 0x00,
+    0x00, 0x00, 0x24, 0x00, 0x00, 0x00, 0x18, 0x00, 0x00, 0x00, 0x03, 0x00, 
0x00, 0x00,
+    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06, 0x00, 
0x08, 0x00,
+    0x07, 0x00, 0x06, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x02, 0x00, 
0x00, 0x00,
+    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 
0x00, 0x00,
+    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x1d, 0x00, 
0x00, 0x00,
+    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 
0x03, 0x00,
+    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 
0x00, 0x00,
+    0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0x00, 0x00, 0x00, 0x00, 
0x01, 0x00,
+    0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 
0x00, 0x00,
+    0x00, 0x00, 0x00, 0x00};
+
 TEST(NanoarrowIpcTest, NanoarrowIpcCheckHeader) {
   struct ArrowIpcDecoder decoder;
   struct ArrowError error;
@@ -318,7 +354,66 @@ TEST(NanoarrowIpcTest, NanoarrowIpcDecodeSimpleSchema) {
   ArrowIpcDecoderReset(&decoder);
 }
 
+/// Decode `batch` (a complete record batch message: header followed by body) using
+/// a struct<int32> schema and check that its first column equals `values`.
+///
+/// Uses fatal assertions: wrap calls in ASSERT_NO_FATAL_FAILURE() (or
+/// EXPECT_FATAL_FAILURE() when a failure is the expected outcome).
+void TestDecodeInt32Batch(const uint8_t* batch, size_t batch_len,
+                          const std::vector<int32_t>& values) {
+  nanoarrow::ipc::UniqueDecoder decoder;
+  nanoarrow::UniqueSchema schema;
+
+  // Initialized so that error.message is always safe to stream into a failure
+  struct ArrowError error;
+  ArrowErrorInit(&error);
+  struct ArrowArrayView* array_view;
+
+  ArrowSchemaInit(schema.get());
+  ASSERT_EQ(ArrowSchemaSetTypeStruct(schema.get(), 1), NANOARROW_OK);
+  ASSERT_EQ(ArrowSchemaSetType(schema->children[0], NANOARROW_TYPE_INT32), NANOARROW_OK);
+
+  struct ArrowBufferView data;
+  data.data.as_uint8 = batch;
+  data.size_bytes = static_cast<int64_t>(batch_len);
+
+  ASSERT_EQ(ArrowIpcDecoderInit(decoder.get()), NANOARROW_OK);
+  ASSERT_EQ(ArrowIpcDecoderSetSchema(decoder.get(), schema.get(), &error), NANOARROW_OK)
+      << error.message;
+
+  ASSERT_EQ(ArrowIpcDecoderDecodeHeader(decoder.get(), data, &error), NANOARROW_OK)
+      << error.message;
+
+  // The body immediately follows the header within `batch`
+  struct ArrowBufferView body;
+  body.data.as_uint8 = batch + decoder->header_size_bytes;
+  body.size_bytes = decoder->body_size_bytes;
+
+  // Field index 0 selects the first child of the root struct
+  ASSERT_EQ(ArrowIpcDecoderDecodeArrayView(decoder.get(), body, 0, &array_view, &error),
+            NANOARROW_OK)
+      << error.message;
+
+  // Compare as int64_t to avoid a signed/unsigned comparison with size()
+  ASSERT_EQ(array_view->length, static_cast<int64_t>(values.size()));
+  for (size_t i = 0; i < values.size(); i++) {
+    SCOPED_TRACE(std::string("Array index ") + std::to_string(i));
+    EXPECT_EQ(ArrowArrayViewGetIntUnsafe(array_view, static_cast<int64_t>(i)), values[i]);
+  }
+}
+
 TEST(NanoarrowIpcTest, NanoarrowIpcDecodeSimpleRecordBatch) {
+  // kSimpleRecordBatch should decode to a single int32 column [1, 2, 3]
+  const std::vector<int32_t> expected_values = {1, 2, 3};
+  ASSERT_NO_FATAL_FAILURE(TestDecodeInt32Batch(
+      kSimpleRecordBatch, sizeof(kSimpleRecordBatch), expected_values));
+}
+
+TEST(NanoarrowIpcTest, NanoarrowIpcDecodeCompressedRecordBatch) {
+  // With ZSTD support built in, the batch must decode. Without it, decoding must
+  // fail with a message naming the unsupported compression type.
+  const bool have_zstd = ArrowIpcGetZstdDecompressionFunction() != nullptr;
+  if (have_zstd) {
+    ASSERT_NO_FATAL_FAILURE(TestDecodeInt32Batch(
+        kSimpleRecordBatchCompressed, sizeof(kSimpleRecordBatchCompressed), {0, 1, 2}));
+    return;
+  }
+
+  EXPECT_FATAL_FAILURE(
+      TestDecodeInt32Batch(kSimpleRecordBatchCompressed,
+                           sizeof(kSimpleRecordBatchCompressed), {0, 1, 2}),
+      "Compression type with value 2 not supported by this build of nanoarrow");
+}
+
+TEST(NanoarrowIpcTest, NanoarrowIpcDecodeUncompressibleRecordBatch) {
+  // Body compression is enabled, but the data buffer carries the -1 "stored
+  // uncompressed" prefix, so this runs regardless of ZSTD availability.
+  const std::vector<int32_t> expected_values = {0, 1, 2};
+  ASSERT_NO_FATAL_FAILURE(TestDecodeInt32Batch(kSimpleRecordBatchUncompressible,
+                                               sizeof(kSimpleRecordBatchUncompressible),
+                                               expected_values));
+}
+
+TEST(NanoarrowIpcTest, NanoarrowIpcDecodeSimpleRecordBatchErrors) {
   struct ArrowIpcDecoder decoder;
   struct ArrowError error;
   struct ArrowSchema schema;
@@ -387,14 +482,6 @@ TEST(NanoarrowIpcTest, 
NanoarrowIpcDecodeSimpleRecordBatch) {
 
   ArrowArrayRelease(&array);
 
-  // Field extract should fail if compression was set
-  decoder.codec = NANOARROW_IPC_COMPRESSION_TYPE_ZSTD;
-  EXPECT_EQ(ArrowIpcDecoderDecodeArray(&decoder, body, 0, &array,
-                                       NANOARROW_VALIDATION_LEVEL_FULL, 
&error),
-            ENOTSUP);
-  EXPECT_STREQ(error.message, "The nanoarrow_ipc extension does not support 
compression");
-  decoder.codec = NANOARROW_IPC_COMPRESSION_TYPE_NONE;
-
   // Field extract should fail if body is too small
   decoder.body_size_bytes = 0;
   EXPECT_EQ(ArrowIpcDecoderDecodeArray(&decoder, body, 0, &array,
@@ -508,7 +595,7 @@ TEST_P(ArrowTypeParameterizedTestFixture, 
NanoarrowIpcArrowTypeRoundtrip) {
 #endif
 
 std::string ArrowSchemaMetadataToString(const char* metadata) {
-  struct ArrowMetadataReader reader {};
+  struct ArrowMetadataReader reader;
   auto st = ArrowMetadataReaderInit(&reader, metadata);
   EXPECT_EQ(st, NANOARROW_OK);
 
diff --git a/src/nanoarrow/ipc/files_test.cc b/src/nanoarrow/ipc/files_test.cc
index cb505ce2..5d39ec41 100644
--- a/src/nanoarrow/ipc/files_test.cc
+++ b/src/nanoarrow/ipc/files_test.cc
@@ -33,8 +33,6 @@
 #include "nanoarrow/nanoarrow_ipc.hpp"
 #include "nanoarrow/nanoarrow_testing.hpp"
 
-#include "flatcc/portable/pendian_detect.h"
-
 using namespace arrow;
 
 // Helpers for reporting Arrow C++ Result failures
@@ -85,7 +83,7 @@ class TestFile {
   }
 
   std::string CheckJSONGzFile() {
-    size_t dot_pos = path_.find('.');
+    // Replace the final extension of path_ (e.g. ".stream") with ".json.gz".
+    // rfind() keeps dots that appear earlier in the path (e.g. in a
+    // "2.0.0-compression/" directory). If path_ has no '.', dot_pos is npos and
+    // the whole path is kept.
+    size_t dot_pos = path_.rfind('.');
     return path_.substr(0, dot_pos) + std::string(".json.gz");
   }
 
@@ -401,12 +399,12 @@ ArrowErrorCode InitArrowTestingPath(std::ostream& 
builder, ArrowError* error) {
   return NANOARROW_OK;
 }
 
-class TestFileFixture : public ::testing::TestWithParam<TestFile> {
+// Fixture for files that are checked in both native and swapped byte order
+// (see the NativeEndian/SwapEndian tests instantiated with this fixture).
+class TestEndianFileFixture : public ::testing::TestWithParam<TestFile> {
  protected:
   TestFile test_file;
 };
 
-TEST_P(TestFileFixture, NanoarrowIpcTestFileNativeEndian) {
+TEST_P(TestEndianFileFixture, NanoarrowIpcTestFileNativeEndian) {
   std::stringstream dir_builder;
   ArrowError error;
   ArrowErrorInit(&error);
@@ -423,7 +421,7 @@ TEST_P(TestFileFixture, NanoarrowIpcTestFileNativeEndian) {
   param.TestEqualsArrowCpp(dir_builder.str());
 }
 
-TEST_P(TestFileFixture, NanoarrowIpcTestFileSwapEndian) {
+TEST_P(TestEndianFileFixture, NanoarrowIpcTestFileSwapEndian) {
   std::stringstream dir_builder;
   ArrowError error;
   ArrowErrorInit(&error);
@@ -440,7 +438,7 @@ TEST_P(TestFileFixture, NanoarrowIpcTestFileSwapEndian) {
   param.TestEqualsArrowCpp(dir_builder.str());
 }
 
-TEST_P(TestFileFixture, NanoarrowIpcTestFileCheckJSON) {
+TEST_P(TestEndianFileFixture, NanoarrowIpcTestFileCheckJSON) {
   std::stringstream dir_builder;
   ArrowError error;
   ArrowErrorInit(&error);
@@ -455,7 +453,7 @@ TEST_P(TestFileFixture, NanoarrowIpcTestFileCheckJSON) {
 }
 
 INSTANTIATE_TEST_SUITE_P(
-    NanoarrowIpcTest, TestFileFixture,
+    NanoarrowIpcTest, TestEndianFileFixture,
     ::testing::Values(
         // Files in data/arrow-ipc-stream/integration/1.0.0-(little|big)endian/
         // should read without error and the data should match Arrow C++'s 
read.
@@ -495,4 +493,57 @@ INSTANTIATE_TEST_SUITE_P(
             "Schema message field with DictionaryEncoding not supported")
         // Comment to keep last line from wrapping
         ));
+
+// Files not related to endianness (i.e., only need testing once)
+// Parameterized fixture for integration files that are checked a single time
+struct TestFileFixture : ::testing::TestWithParam<TestFile> {
+ protected:
+  TestFile test_file;
+};
+
+TEST_P(TestFileFixture, NanoarrowIpcTestFileEqualsArrowCpp) {
+  // Compare nanoarrow's read of the file with Arrow C++ (TestFile::TestEqualsArrowCpp)
+  ArrowError error;
+  ArrowErrorInit(&error);
+
+  std::stringstream dir_builder;
+  const ArrowErrorCode path_status = InitArrowTestingPath(dir_builder, &error);
+  if (path_status != NANOARROW_OK) {
+    GTEST_SKIP() << error.message;
+  }
+
+  dir_builder << "/data/arrow-ipc-stream/integration/";
+  TestFile param = GetParam();
+  param.TestEqualsArrowCpp(dir_builder.str());
+}
+
+TEST_P(TestFileFixture, NanoarrowIpcTestFileIPCCheckJSON) {
+  // Check the IPC file against its JSON counterpart (TestFile::TestIPCCheckJSON)
+  ArrowError error;
+  ArrowErrorInit(&error);
+
+  std::stringstream dir_builder;
+  if (InitArrowTestingPath(dir_builder, &error) == NANOARROW_OK) {
+    dir_builder << "/data/arrow-ipc-stream/integration/";
+    TestFile param = GetParam();
+    param.TestIPCCheckJSON(dir_builder.str());
+  } else {
+    GTEST_SKIP() << error.message;
+  }
+}
+
+// Integration files (relative to data/arrow-ipc-stream/integration/) that only
+// need to be tested once. The list is built in a function rather than with
+// preprocessor conditionals inside the ::testing::Values(...) macro arguments:
+// directives inside macro arguments are undefined behavior in standard C++ and
+// are not portable across compilers. The parameter order is unchanged, so the
+// generated test names are unchanged too.
+static std::vector<TestFile> OtherIntegrationTestFiles() {
+  std::vector<TestFile> files;
+
+  // Compressed files can only be read when built with ZSTD support
+#if defined(NANOARROW_IPC_WITH_ZSTD)
+  files.push_back(TestFile::OK("2.0.0-compression/generated_uncompressible_zstd.stream"));
+  files.push_back(TestFile::OK("2.0.0-compression/generated_zstd.stream"));
+#endif
+
+  files.push_back(TestFile::OK("0.17.1/generated_union.stream"));
+  files.push_back(TestFile::OK("0.14.1/generated_datetime.stream"));
+  files.push_back(TestFile::OK("0.14.1/generated_decimal.stream"));
+  files.push_back(TestFile::OK("0.14.1/generated_interval.stream"));
+  files.push_back(TestFile::OK("0.14.1/generated_map.stream"));
+  files.push_back(TestFile::OK("0.14.1/generated_nested.stream"));
+  files.push_back(TestFile::OK("0.14.1/generated_primitive.stream"));
+  files.push_back(TestFile::OK("0.14.1/generated_primitive_no_batches.stream"));
+  files.push_back(TestFile::OK("0.14.1/generated_primitive_zerolength.stream"));
+  return files;
+}
+
+INSTANTIATE_TEST_SUITE_P(NanoarrowIpcTest, TestFileFixture,
+                         ::testing::ValuesIn(OtherIntegrationTestFiles()));
+
 #endif
diff --git a/src/nanoarrow/nanoarrow_ipc.h b/src/nanoarrow/nanoarrow_ipc.h
index fcf5ef88..4cb4c05e 100644
--- a/src/nanoarrow/nanoarrow_ipc.h
+++ b/src/nanoarrow/nanoarrow_ipc.h
@@ -29,8 +29,16 @@
   NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcSharedBufferInit)
 #define ArrowIpcSharedBufferReset \
   NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcSharedBufferReset)
+#define ArrowIpcGetZstdDecompressionFunction \
+  NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcGetZstdDecompressionFunction)
+#define ArrowIpcSerialDecompressor \
+  NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcSerialDecompressor)
+#define ArrowIpcSerialDecompressorSetFunction \
+  NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcSerialDecompressorSetFunction)
 #define ArrowIpcDecoderInit NANOARROW_SYMBOL(NANOARROW_NAMESPACE, 
ArrowIpcDecoderInit)
 #define ArrowIpcDecoderReset NANOARROW_SYMBOL(NANOARROW_NAMESPACE, 
ArrowIpcDecoderReset)
+#define ArrowIpcDecoderSetDecompressor \
+  NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDecoderSetDecompressor)
 #define ArrowIpcDecoderPeekHeader \
   NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDecoderPeekHeader)
 #define ArrowIpcDecoderVerifyHeader \
@@ -194,6 +202,69 @@ void ArrowIpcSharedBufferReset(struct 
ArrowIpcSharedBuffer* shared);
 /// the resulting arrays must not be passed to other threads to be released.
 int ArrowIpcSharedBufferIsThreadSafe(void);
 
+/// \brief A user-extensible decompressor
+///
+/// The ArrowIpcDecompressor is the object the ArrowIpcDecoder uses to decompress
+/// message body buffers. Because queuing work (decompress_add) and waiting for it
+/// (decompress_wait) are separate callbacks, it can be backed by a multithreaded
+/// implementation. This is not required, and the default implementation
+/// (ArrowIpcSerialDecompressor()) does not do so. An implementation of a
+/// decompressor may support more than one ArrowIpcCompressionType.
+struct ArrowIpcDecompressor {
+  /// \brief Queue a buffer for decompression
+  ///
+  /// Decompress src (compressed with compression_type) into the dst_size bytes
+  /// starting at dst. The contents of dst after a call to decompress_add are
+  /// undefined until the next call to decompress_wait returns NANOARROW_OK.
+  ArrowErrorCode (*decompress_add)(struct ArrowIpcDecompressor* decompressor,
+                                   enum ArrowIpcCompressionType 
compression_type,
+                                   struct ArrowBufferView src, uint8_t* dst,
+                                   int64_t dst_size, struct ArrowError* error);
+
+  /// \brief Wait for any unfinished calls to decompress_add to complete
+  ///
+  /// Returns NANOARROW_OK if all pending calls completed, or ETIMEDOUT if not all
+  /// remaining calls completed within timeout_ms. The decoder calls this with
+  /// timeout_ms = -1 when it needs every pending task to finish.
+  ArrowErrorCode (*decompress_wait)(struct ArrowIpcDecompressor* decompressor,
+                                    int64_t timeout_ms, struct ArrowError* 
error);
+
+  /// \brief Release the decompressor and any resources it may be holding
+  ///
+  /// Release callback implementations must set the release member to NULL.
+  /// Callers must check that the release callback is not NULL before calling
+  /// decompress_add(), decompress_wait(), or release().
+  void (*release)(struct ArrowIpcDecompressor* decompressor);
+
+  /// \brief Implementation-specific opaque data
+  void* private_data;
+};
+
+/// \brief A self-contained decompression function
+///
+/// Decompresses src into the dst_size bytes starting at dst. Returns NANOARROW_OK
+/// on success, or an error code with error populated on failure.
+///
+/// For the most common compression type, ZSTD, a function with this signature is
+/// sufficient to capture the type of decompression that Arrow IPC requires (i.e.,
+/// decompression where the uncompressed size was recorded). For other compression
+/// types, it may be more efficient to implement a full ArrowIpcDecompressor, which
+/// allows for persistent state/allocations between decodes.
+typedef ArrowErrorCode (*ArrowIpcDecompressFunction)(struct ArrowBufferView 
src,
+                                                     uint8_t* dst, int64_t 
dst_size,
+                                                     struct ArrowError* error);
+
+/// \brief Get the decompression function for ZSTD
+///
+/// The result will be NULL if nanoarrow was not built with NANOARROW_IPC_WITH_ZSTD.
+/// This can therefore also be used to check at runtime whether ZSTD decompression
+/// is available.
+ArrowIpcDecompressFunction ArrowIpcGetZstdDecompressionFunction(void);
+
+/// \brief An ArrowIpcDecompressor implementation that performs decompression in serial
+///
+/// Initializes decompressor as the default decompressor used by the
+/// ArrowIpcDecoder. Decompression is performed within decompress_add(), so there
+/// is no outstanding work for decompress_wait() to wait for. The caller owns the
+/// result and must release it, or hand it to ArrowIpcDecoderSetDecompressor().
+ArrowErrorCode ArrowIpcSerialDecompressor(struct ArrowIpcDecompressor* 
decompressor);
+
+/// \brief Override the ArrowIpcDecompressFunction used for a specific compression type
+///
+/// This may be used to inject support for a particular type of decompression when
+/// used with a build of nanoarrow with unknown or minimal capabilities (e.g., one
+/// built without NANOARROW_IPC_WITH_ZSTD).
+///
+/// NOTE(review): decompressor is presumably expected to have been initialized with
+/// ArrowIpcSerialDecompressor(); confirm the behavior for other implementations.
+ArrowErrorCode ArrowIpcSerialDecompressorSetFunction(
+    struct ArrowIpcDecompressor* decompressor,
+    enum ArrowIpcCompressionType compression_type,
+    ArrowIpcDecompressFunction decompress_function);
+
 /// \brief Decoder for Arrow IPC messages
 ///
 /// This structure is intended to be allocated by the caller,
@@ -244,6 +315,10 @@ ArrowErrorCode ArrowIpcDecoderInit(struct ArrowIpcDecoder* 
decoder);
 /// \brief Release all resources attached to a decoder
 void ArrowIpcDecoderReset(struct ArrowIpcDecoder* decoder);
 
+/// \brief Set the decompressor implementation used by this decoder
+///
+/// The decoder takes ownership of decompressor and the caller must not release
+/// it. The decoder releases any decompressor it previously held, and releases
+/// this one when it is reset or another decompressor is set. If no decompressor
+/// is set, a default serial decompressor is created the first time a compressed
+/// message is decoded.
+ArrowErrorCode ArrowIpcDecoderSetDecompressor(struct ArrowIpcDecoder* decoder,
+                                              struct ArrowIpcDecompressor* 
decompressor);
+
 /// \brief Peek at a message header
 ///
 /// The first 8 bytes of an Arrow IPC message are 0xFFFFFFFF followed by the 
size
diff --git a/src/nanoarrow/nanoarrow_ipc.hpp b/src/nanoarrow/nanoarrow_ipc.hpp
index e1b90cf1..ce2c03dc 100644
--- a/src/nanoarrow/nanoarrow_ipc.hpp
+++ b/src/nanoarrow/nanoarrow_ipc.hpp
@@ -73,6 +73,25 @@ inline void release_pointer(struct ArrowIpcEncoder* data) {
   ArrowIpcEncoderReset(data);
 }
 
+// An empty ArrowIpcDecompressor has release == NULL: release_pointer() and the C
+// contract both use this to mean "nothing to release". release must therefore be
+// cleared here. Otherwise, destroying a default-constructed UniqueDecompressor
+// calls an indeterminate function pointer.
+template <>
+inline void init_pointer(struct ArrowIpcDecompressor* data) {
+  data->release = nullptr;
+  data->private_data = nullptr;
+}
+
+// Move the decompressor into dst and mark src as released
+template <>
+inline void move_pointer(struct ArrowIpcDecompressor* src,
+                         struct ArrowIpcDecompressor* dst) {
+  memcpy(dst, src, sizeof(struct ArrowIpcDecompressor));
+  src->release = nullptr;
+}
+
+// Release the decompressor unless it is empty or already released
+template <>
+inline void release_pointer(struct ArrowIpcDecompressor* data) {
+  if (data->release != nullptr) {
+    data->release(data);
+  }
+}
+
 template <>
 inline void init_pointer(struct ArrowIpcInputStream* data) {
   data->release = nullptr;
@@ -150,6 +169,9 @@ using UniqueFooter = internal::Unique<struct 
ArrowIpcFooter>;
 /// \brief Class wrapping a unique struct ArrowIpcEncoder
 using UniqueEncoder = internal::Unique<struct ArrowIpcEncoder>;
 
+/// \brief Class wrapping a unique struct ArrowIpcDecompressor
+///
+/// Ownership follows the init_pointer/move_pointer/release_pointer
+/// specializations for ArrowIpcDecompressor: moving marks the source as released,
+/// and release is only called when the release callback is non-NULL.
+using UniqueDecompressor = internal::Unique<struct ArrowIpcDecompressor>;
+
 /// \brief Class wrapping a unique struct ArrowIpcInputStream
 using UniqueInputStream = internal::Unique<struct ArrowIpcInputStream>;
 
diff --git a/python/meson.build b/subprojects/zstd.wrap
similarity index 53%
copy from python/meson.build
copy to subprojects/zstd.wrap
index 49fa67e4..330530c3 100644
--- a/python/meson.build
+++ b/subprojects/zstd.wrap
@@ -15,25 +15,16 @@
 # specific language governing permissions and limitations
 # under the License.
 
-project(
-    'nanoarrow',
-    'c',
-    'cython',
-    version: run_command(['src/nanoarrow/_version.py', '--print'], check: 
true).stdout().strip(),
-    license: 'Apache-2.0',
-    meson_version: '>=1.2.0',
-    default_options: [
-        'warning_level=2',
-        'c_std=c99',
-        'default_library=static',
-        # We need to set these options at the project default_option level
-        # due to https://github.com/mesonbuild/meson/issues/6728
-        'arrow-nanoarrow:ipc=true',
-        'arrow-nanoarrow:device=true',
-        'arrow-nanoarrow:namespace=PythonPkg',
-    ],
-)
+[wrap-file]
+directory = zstd-1.5.6
+source_url = 
https://github.com/facebook/zstd/releases/download/v1.5.6/zstd-1.5.6.tar.gz
+source_filename = zstd-1.5.6.tar.gz
+source_hash = 8c29e06cf42aacc1eafc4077ae2ec6c6fcb96a626157e0593d5e82a34fd403c1
+patch_filename = zstd_1.5.6-2_patch.zip
+patch_url = https://wrapdb.mesonbuild.com/v2/zstd_1.5.6-2/get_patch
+patch_hash = 3e67f7d2edf3c56e6450d4c0f5f3d5fe94799e3608e3795502da03f7dd51b28c
+source_fallback_url = 
https://github.com/mesonbuild/wrapdb/releases/download/zstd_1.5.6-2/zstd-1.5.6.tar.gz
+wrapdb_version = 1.5.6-2
 
-subdir('src/nanoarrow')
-
-meson.add_dist_script('python', meson.current_source_dir() / 
'generate_dist.py')
+[provide]
+libzstd = libzstd_dep

Reply via email to