This is an automated email from the ASF dual-hosted git repository.
paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-nanoarrow.git
The following commit(s) were added to refs/heads/main by this push:
new 62cffa6 feat(extensions/nanoarrow_ipc): Add single-threaded stream
reader (#164)
62cffa6 is described below
commit 62cffa6a4e182235a82f3ea9f8715258b7b2ca56
Author: Dewey Dunnington <[email protected]>
AuthorDate: Wed Mar 22 16:05:58 2023 -0400
feat(extensions/nanoarrow_ipc): Add single-threaded stream reader (#164)
Higher level runtimes may be able to use the `ArrowIpcDecoder` (or more
than one) and handle IO/parallelization using tools that are difficult
to provide from C; however, for testing we do need the ability to read
streams in their entirety. This PR provides a tool to do that based on
an arbitrary bytes input.
This reduces the overhead of coordinating the various steps required to
decode a stream to:
```c
struct ArrowIpcInputStream input;
ArrowIpcInputStreamInit<Buffer|File|Custom Implementation>(&input, ...);
struct ArrowArrayStream stream;
ArrowIpcArrayStreamReaderInit(&stream, &input, NULL);
struct ArrowSchema schema;
stream.get_schema(&stream, &schema);
struct ArrowArray array;
while (1) {
stream.get_next(&stream, &array);
if (array.release != NULL) {
array.release(&array);
} else {
break;
}
}
schema.release(&schema);
stream.release(&stream);
```
There is also a utility to read an entire stream from file or stdin:
```bash
$ ./dump_stream big.arrows
Read Schema <0.000122 seconds>
struct
state: string
geometry: geoarrow.polygon{list}
rings: list
vertices: geoarrow.point{fixed_size_list(2)}
xy: double
Read 23548499 rows in 1 batch(es) <1.623532 seconds>
```
---
.github/workflows/build-and-test-ipc.yaml | 11 +-
extensions/nanoarrow_ipc/CMakeLists.txt | 24 +-
extensions/nanoarrow_ipc/src/apps/dump_stream.c | 138 +++++++
.../nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h | 62 +++
.../{nanoarrow_ipc.c => nanoarrow_ipc_decoder.c} | 20 +-
...w_ipc_test.cc => nanoarrow_ipc_decoder_test.cc} | 0
.../src/nanoarrow/nanoarrow_ipc_reader.c | 425 +++++++++++++++++++++
.../src/nanoarrow/nanoarrow_ipc_reader_test.cc | 314 +++++++++++++++
8 files changed, 977 insertions(+), 17 deletions(-)
diff --git a/.github/workflows/build-and-test-ipc.yaml
b/.github/workflows/build-and-test-ipc.yaml
index 5c5b780..af2d543 100644
--- a/.github/workflows/build-and-test-ipc.yaml
+++ b/.github/workflows/build-and-test-ipc.yaml
@@ -41,7 +41,7 @@ jobs:
fail-fast: false
matrix:
config:
- - {label: default-build, cmake_args: ""}
+ - {label: default-build, cmake_args: "-DNANOARROW_IPC_BUILD_APPS=ON"}
- {label: namespaced-build, cmake_args:
"-DNANOARROW_NAMESPACE=SomeUserNamespace"}
- {label: bundled-build, cmake_args: "-DNANOARROW_IPC_BUNDLE=ON"}
@@ -101,6 +101,15 @@ jobs:
cd build
ctest -T test --output-on-failure .
+ # Smoke-test the dump_stream utility on the default build. The first three
+ # invocations (no argument, a nonexistent path, invalid.arrows) are
+ # presumably expected to exit non-zero; `|| true` discards their exit status
+ # so that they exercise the error paths without failing the step.
+ - name: Test dump_stream
+ if: matrix.config.label == 'default-build'
+ run: |
+ $SUBDIR/build/dump_stream || true
+ $SUBDIR/build/dump_stream this_is_not_a_file || true
+ $SUBDIR/build/dump_stream examples/cmake-ipc/invalid.arrows || true
+ $SUBDIR/build/dump_stream examples/cmake-ipc/schema-valid.arrows
+ cat examples/cmake-ipc/schema-valid.arrows |
$SUBDIR/build/dump_stream -
+
- name: Run tests with valgrind
if: matrix.config.label == 'default-build'
run: |
diff --git a/extensions/nanoarrow_ipc/CMakeLists.txt
b/extensions/nanoarrow_ipc/CMakeLists.txt
index 38517f7..8830b35 100644
--- a/extensions/nanoarrow_ipc/CMakeLists.txt
+++ b/extensions/nanoarrow_ipc/CMakeLists.txt
@@ -22,6 +22,7 @@ include(FetchContent)
project(nanoarrow_ipc)
option(NANOARROW_IPC_BUILD_TESTS "Build tests" OFF)
+option(NANOARROW_IPC_BUILD_APPS "Build utility applications" OFF)
option(NANOARROW_IPC_BUNDLE "Create bundled nanoarrow_ipc.h and
nanoarrow_ipc.c" OFF)
option(NANOARROW_IPC_FLATCC_ROOT_DIR "Root directory for flatcc include and
lib directories" OFF)
option(NANOARROW_IPC_FLATCC_INCLUDE_DIR "Include directory for flatcc
includes" OFF)
@@ -91,11 +92,13 @@ if (NANOARROW_IPC_BUNDLE)
file(READ src/nanoarrow/nanoarrow_ipc.h SRC_FILE_CONTENTS)
file(WRITE ${NANOARROW_IPC_H_TEMP} "${SRC_FILE_CONTENTS}")
- # combine flatcc-generated headers and nanoarrow_ipc.c
+ # combine flatcc-generated headers and nanoarrow_ipc sources
set(NANOARROW_IPC_C_TEMP
${CMAKE_BINARY_DIR}/amalgamation/nanoarrow/nanoarrow_ipc.c)
file(READ src/nanoarrow/nanoarrow_ipc_flatcc_generated.h SRC_FILE_CONTENTS)
file(WRITE ${NANOARROW_IPC_C_TEMP} "${SRC_FILE_CONTENTS}")
- file(READ src/nanoarrow/nanoarrow_ipc.c SRC_FILE_CONTENTS)
+ file(READ src/nanoarrow/nanoarrow_ipc_decoder.c SRC_FILE_CONTENTS)
+ file(APPEND ${NANOARROW_IPC_C_TEMP} "${SRC_FILE_CONTENTS}")
+ file(READ src/nanoarrow/nanoarrow_ipc_reader.c SRC_FILE_CONTENTS)
file(APPEND ${NANOARROW_IPC_C_TEMP} "${SRC_FILE_CONTENTS}")
# remove the include for the generated files in the bundled version
@@ -143,7 +146,9 @@ if (NANOARROW_IPC_BUNDLE)
install(DIRECTORY thirdparty/flatcc/include/flatcc DESTINATION ".")
else()
# This is a normal CMake build that builds + installs some includes and a
static lib
- add_library(nanoarrow_ipc src/nanoarrow/nanoarrow_ipc.c)
+ add_library(nanoarrow_ipc
+ src/nanoarrow/nanoarrow_ipc_decoder.c
+ src/nanoarrow/nanoarrow_ipc_reader.c)
target_link_libraries(nanoarrow_ipc PRIVATE flatccrt)
target_include_directories(nanoarrow_ipc PUBLIC
@@ -185,7 +190,8 @@ if (NANOARROW_IPC_BUILD_TESTS)
enable_testing()
- add_executable(nanoarrow_ipc_test src/nanoarrow/nanoarrow_ipc_test.cc)
+ add_executable(nanoarrow_ipc_decoder_test
src/nanoarrow/nanoarrow_ipc_decoder_test.cc)
+ add_executable(nanoarrow_ipc_reader_test
src/nanoarrow/nanoarrow_ipc_reader_test.cc)
if(NANOARROW_IPC_CODE_COVERAGE)
target_compile_options(ipc_coverage_config INTERFACE -O0 -g --coverage)
@@ -193,8 +199,14 @@ if (NANOARROW_IPC_BUILD_TESTS)
target_link_libraries(nanoarrow_ipc PRIVATE ipc_coverage_config)
endif()
- target_link_libraries(nanoarrow_ipc_test nanoarrow_ipc nanoarrow
arrow_shared gtest_main)
+ target_link_libraries(nanoarrow_ipc_decoder_test nanoarrow_ipc nanoarrow
arrow_shared gtest_main)
+ target_link_libraries(nanoarrow_ipc_reader_test nanoarrow_ipc nanoarrow
gtest_main)
include(GoogleTest)
- gtest_discover_tests(nanoarrow_ipc_test)
+  # Register the test cases of every test executable built above with CTest;
+  # an executable that is not listed here is compiled but never run by ctest.
+  gtest_discover_tests(nanoarrow_ipc_decoder_test)
+  gtest_discover_tests(nanoarrow_ipc_reader_test)
+endif()
+
+# Optional command-line utilities (enabled with -DNANOARROW_IPC_BUILD_APPS=ON)
+if (NANOARROW_IPC_BUILD_APPS)
+  add_executable(dump_stream src/apps/dump_stream.c)
+  target_link_libraries(dump_stream PRIVATE nanoarrow_ipc nanoarrow)
endif()
diff --git a/extensions/nanoarrow_ipc/src/apps/dump_stream.c
b/extensions/nanoarrow_ipc/src/apps/dump_stream.c
new file mode 100644
index 0000000..1a6b323
--- /dev/null
+++ b/extensions/nanoarrow_ipc/src/apps/dump_stream.c
@@ -0,0 +1,138 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "nanoarrow/nanoarrow_ipc.h"
+
+#include <stdio.h>
+#include <string.h>
+#include <time.h>
+
+// Print one line describing schema to stdout, preceded by one indent unit per
+// level, then recurse into each child with level + 1.
+//
+// The line is "<name>: <description>" when the schema has a name and just
+// "<description>" otherwise. buf (of size buf_size) is caller-provided scratch
+// space that receives the ArrowSchemaToString() output; it is overwritten for
+// every node visited.
+void dump_schema_to_stdout(struct ArrowSchema* schema, int level, char* buf,
+                           int buf_size) {
+  // The returned character count is not needed: buf is printed as a
+  // null-terminated string below
+  (void)ArrowSchemaToString(schema, buf, buf_size, 0);
+
+  for (int i = 0; i < level; i++) {
+    fprintf(stdout, " ");
+  }
+
+  if (schema->name == NULL) {
+    fprintf(stdout, "%s\n", buf);
+  } else {
+    fprintf(stdout, "%s: %s\n", schema->name, buf);
+  }
+
+  for (int64_t i = 0; i < schema->n_children; i++) {
+    dump_schema_to_stdout(schema->children[i], level + 1, buf, buf_size);
+  }
+}
+
+// Command-line entry point.
+//
+// Reads an Arrow IPC stream from the file named by argv[1] (or from stdin when
+// argv[1] is "-"), prints the schema to stdout and then the number of rows and
+// batches read, each with the time measured by clock(). Returns 0 on success
+// and 1 on any failure, after writing a diagnostic to stderr.
+int main(int argc, char* argv[]) {
+  // Parse arguments
+  if (argc != 2) {
+    fprintf(stderr, "Usage: dump_stream FILENAME (or - for stdin)\n");
+    return 1;
+  }
+
+  // Set up the input stream
+  FILE* file_ptr;
+  if (strcmp(argv[1], "-") == 0) {
+    // Reopen stdin in binary mode
+    file_ptr = freopen(NULL, "rb", stdin);
+  } else {
+    file_ptr = fopen(argv[1], "rb");
+  }
+
+  if (file_ptr == NULL) {
+    fprintf(stderr, "Failed to open input '%s'\n", argv[1]);
+    return 1;
+  }
+
+  // close_on_release is 0: this function keeps ownership of file_ptr and
+  // closes it itself on every exit path below
+  struct ArrowIpcInputStream input;
+  int result = ArrowIpcInputStreamInitFile(&input, file_ptr, 0);
+  if (result != NANOARROW_OK) {
+    fprintf(stderr, "ArrowIpcInputStreamInitFile() failed\n");
+    fclose(file_ptr);
+    return 1;
+  }
+
+  struct ArrowArrayStream stream;
+  result = ArrowIpcArrayStreamReaderInit(&stream, &input, NULL);
+  if (result != NANOARROW_OK) {
+    fprintf(stderr, "ArrowIpcArrayStreamReaderInit() failed\n");
+    // The reader only takes ownership of input on success, so it must be
+    // released here
+    if (input.release != NULL) {
+      input.release(&input);
+    }
+    fclose(file_ptr);
+    return 1;
+  }
+
+  clock_t begin = clock();
+
+  struct ArrowSchema schema;
+  result = stream.get_schema(&stream, &schema);
+  if (result != NANOARROW_OK) {
+    const char* message = stream.get_last_error(&stream);
+    if (message == NULL) {
+      message = "";
+    }
+
+    fprintf(stderr, "stream.get_schema() returned %d with error '%s'\n", result,
+            message);
+    stream.release(&stream);
+    fclose(file_ptr);
+    return 1;
+  }
+
+  clock_t end = clock();
+  double elapsed = (end - begin) / ((double)CLOCKS_PER_SEC);
+  fprintf(stdout, "Read Schema <%.06f seconds>\n", elapsed);
+
+  // Scratch space shared by every level of the recursive schema printer
+  char schema_tmp[8096];
+  memset(schema_tmp, 0, sizeof(schema_tmp));
+  dump_schema_to_stdout(&schema, 0, schema_tmp, sizeof(schema_tmp));
+  schema.release(&schema);
+
+  struct ArrowArray array;
+  array.release = NULL;
+
+  int64_t batch_count = 0;
+  int64_t row_count = 0;
+  begin = clock();
+
+  // Pull batches until the stream signals its end with a released array
+  while (1) {
+    result = stream.get_next(&stream, &array);
+    if (result != NANOARROW_OK) {
+      const char* message = stream.get_last_error(&stream);
+      if (message == NULL) {
+        message = "";
+      }
+
+      fprintf(stderr, "stream.get_next() returned %d with error '%s'\n", result,
+              message);
+      stream.release(&stream);
+      fclose(file_ptr);
+      return 1;
+    }
+
+    if (array.release != NULL) {
+      row_count += array.length;
+      batch_count++;
+      array.release(&array);
+    } else {
+      break;
+    }
+  }
+
+  end = clock();
+  elapsed = (end - begin) / ((double)CLOCKS_PER_SEC);
+  fprintf(stdout, "Read %ld rows in %ld batch(es) <%.06f seconds>\n", (long)row_count,
+          (long)batch_count, elapsed);
+
+  stream.release(&stream);
+  fclose(file_ptr);
+  return 0;
+}
diff --git a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h
b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h
index 155150e..071b299 100644
--- a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h
+++ b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h
@@ -39,6 +39,14 @@
NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDecoderSetSchema)
#define ArrowIpcDecoderSetEndianness \
NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDecoderSetEndianness)
+#define ArrowIpcInputStreamInitBuffer \
+ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcInputStreamInitBuffer)
+#define ArrowIpcInputStreamInitFile \
+ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcInputStreamInitFile)
+#define ArrowIpcInputStreamMove \
+ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcInputStreamMove)
+#define ArrowIpcArrayStreamReaderInit \
+ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcArrayStreamReaderInit)
#endif
@@ -219,6 +227,60 @@ ArrowErrorCode ArrowIpcDecoderDecodeArray(struct
ArrowIpcDecoder* decoder,
struct ArrowArray* out,
struct ArrowError* error);
+/// \brief A user-extensible input data source
+///
+/// Implementations populate the read and release callbacks and may use
+/// private_data to hold whatever state those callbacks need.
+struct ArrowIpcInputStream {
+ /// \brief Read up to buf_size_bytes from stream into buf
+ ///
+ /// The actual number of bytes read is placed in the value pointed to by
+ /// size_read_out. Returns NANOARROW_OK on success. The stream reader in
+ /// this library treats a read of 0 bytes at the start of a message as the
+ /// end of the input.
+ ArrowErrorCode (*read)(struct ArrowIpcInputStream* stream, uint8_t* buf,
+ int64_t buf_size_bytes, int64_t* size_read_out,
+ struct ArrowError* error);
+
+ /// \brief Release the stream and any resources it may be holding
+ ///
+ /// Release callback implementations must set the release member to NULL.
+ /// Callers must check that the release callback is not NULL before calling
+ /// read() or release().
+ void (*release)(struct ArrowIpcInputStream* stream);
+
+ /// \brief Private implementation-defined data
+ void* private_data;
+};
+
+/// \brief Transfer ownership of an ArrowIpcInputStream
+///
+/// The content of src is copied into dst and src->release is set to NULL,
+/// which marks src as released.
+void ArrowIpcInputStreamMove(struct ArrowIpcInputStream* src,
+ struct ArrowIpcInputStream* dst);
+
+/// \brief Create an input stream from an ArrowBuffer
+///
+/// On success the content of input is moved into stream (via
+/// ArrowBufferMove()) and is reset when stream is released. Returns ENOMEM
+/// if the stream's private data cannot be allocated and NANOARROW_OK
+/// otherwise.
+ArrowErrorCode ArrowIpcInputStreamInitBuffer(struct ArrowIpcInputStream*
stream,
+ struct ArrowBuffer* input);
+
+/// \brief Create an input stream from a C FILE* pointer
+///
+/// file_ptr is declared as void* but is used as a FILE*. If close_on_release
+/// is non-zero, the stream calls fclose() on file_ptr itself (when a read
+/// returns fewer bytes than requested or when the stream is released).
+///
+/// Note that the ArrowIpcInputStream has no mechanism to communicate an error
+/// if file_ptr fails to close. If this behaviour is needed, pass 0 as
+/// close_on_release and handle closing the file independently from stream.
+/// Returns ENOMEM if the stream's private data cannot be allocated and
+/// NANOARROW_OK otherwise.
+ArrowErrorCode ArrowIpcInputStreamInitFile(struct ArrowIpcInputStream* stream,
+ void* file_ptr, int
close_on_release);
+
+/// \brief Options for ArrowIpcArrayStreamReaderInit()
+struct ArrowIpcArrayStreamReaderOptions {
+ /// \brief The field index to extract. Defaults to -1 (i.e., read all
+ /// fields). The reader currently reports ENOTSUP for any other value when
+ /// the schema is read.
+ int64_t field_index;
+};
+
+/// \brief Initialize an ArrowArrayStream from an input stream of bytes
+///
+/// The stream of bytes must begin with a Schema message and be followed by
+/// zero or more RecordBatch messages as described in the Arrow IPC stream
+/// format specification. Returns NANOARROW_OK on success. If NANOARROW_OK
+/// is returned, the ArrowArrayStream takes ownership of input_stream and
+/// the caller is responsible for releasing out.
+///
+/// options may be NULL, in which case the default field_index of -1 is used.
+/// No bytes are read from input_stream by this function; reading happens in
+/// the get_schema() and get_next() callbacks of out.
+ArrowErrorCode ArrowIpcArrayStreamReaderInit(
+ struct ArrowArrayStream* out, struct ArrowIpcInputStream* input_stream,
+ struct ArrowIpcArrayStreamReaderOptions* options);
+
#ifdef __cplusplus
}
#endif
diff --git a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.c
b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_decoder.c
similarity index 98%
rename from extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.c
rename to extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_decoder.c
index 91011b8..e9c673e 100644
--- a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.c
+++ b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_decoder.c
@@ -801,21 +801,22 @@ static inline int ArrowIpcDecoderCheckHeader(struct
ArrowIpcDecoder* decoder,
}
int swap_endian = private_data->system_endianness ==
NANOARROW_IPC_ENDIANNESS_BIG;
- *message_size_bytes = ArrowIpcReadInt32LE(data_mut, swap_endian);
- if ((*message_size_bytes) < 0) {
+ int32_t header_body_size_bytes = ArrowIpcReadInt32LE(data_mut, swap_endian);
+ *message_size_bytes = header_body_size_bytes + (2 * sizeof(int32_t));
+ if (header_body_size_bytes < 0) {
ArrowErrorSet(
error, "Expected message body size > 0 but found message body size of
%ld bytes",
- (long)(*message_size_bytes));
+ (long)header_body_size_bytes);
return EINVAL;
- } else if ((*message_size_bytes) > data_mut->size_bytes) {
+ } else if (header_body_size_bytes > data_mut->size_bytes) {
ArrowErrorSet(error,
"Expected 0 <= message body size <= %ld bytes but found
message "
"body size of %ld bytes",
- (long)data_mut->size_bytes, (long)(*message_size_bytes));
+ (long)data_mut->size_bytes, (long)header_body_size_bytes);
return ESPIPE;
}
- if (*message_size_bytes == 0) {
+ if (header_body_size_bytes == 0) {
ArrowErrorSet(error, "End of Arrow stream");
return ENODATA;
}
@@ -832,7 +833,6 @@ ArrowErrorCode ArrowIpcDecoderPeekHeader(struct
ArrowIpcDecoder* decoder,
ArrowIpcDecoderResetHeaderInfo(decoder);
NANOARROW_RETURN_NOT_OK(
ArrowIpcDecoderCheckHeader(decoder, &data, &decoder->header_size_bytes,
error));
- decoder->header_size_bytes += 2 * sizeof(int32_t);
return NANOARROW_OK;
}
@@ -847,14 +847,14 @@ ArrowErrorCode ArrowIpcDecoderVerifyHeader(struct
ArrowIpcDecoder* decoder,
ArrowIpcDecoderCheckHeader(decoder, &data, &decoder->header_size_bytes,
error));
// Run flatbuffers verification
- if (ns(Message_verify_as_root(data.data.as_uint8,
decoder->header_size_bytes)) !=
+ if (ns(Message_verify_as_root(data.data.as_uint8,
+ decoder->header_size_bytes - (2 *
sizeof(int32_t)))) !=
flatcc_verify_ok) {
ArrowErrorSet(error, "Message flatbuffer verification failed");
return EINVAL;
}
// Read some basic information from the message
- decoder->header_size_bytes += 2 * sizeof(int32_t);
ns(Message_table_t) message = ns(Message_as_root(data.data.as_uint8));
decoder->metadata_version = ns(Message_version(message));
decoder->message_type = ns(Message_header_type(message));
@@ -873,7 +873,6 @@ ArrowErrorCode ArrowIpcDecoderDecodeHeader(struct
ArrowIpcDecoder* decoder,
ArrowIpcDecoderResetHeaderInfo(decoder);
NANOARROW_RETURN_NOT_OK(
ArrowIpcDecoderCheckHeader(decoder, &data, &decoder->header_size_bytes,
error));
- decoder->header_size_bytes += 2 * sizeof(int32_t);
ns(Message_table_t) message = ns(Message_as_root(data.data.as_uint8));
if (!message) {
@@ -1046,6 +1045,7 @@ ArrowErrorCode ArrowIpcDecoderSetEndianness(struct
ArrowIpcDecoder* decoder,
case NANOARROW_IPC_ENDIANNESS_LITTLE:
case NANOARROW_IPC_ENDIANNESS_BIG:
private_data->endianness = endianness;
+ return NANOARROW_OK;
default:
return EINVAL;
}
diff --git a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_test.cc
b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_decoder_test.cc
similarity index 100%
rename from extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_test.cc
rename to extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_decoder_test.cc
diff --git a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader.c
b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader.c
new file mode 100644
index 0000000..f813cab
--- /dev/null
+++ b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader.c
@@ -0,0 +1,425 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <errno.h>
+#include <stdio.h>
+#include <string.h>
+
+#include "nanoarrow.h"
+#include "nanoarrow_ipc.h"
+
+// Transfer ownership of src to dst: dst receives a copy of every member of
+// src, after which src is marked as released.
+void ArrowIpcInputStreamMove(struct ArrowIpcInputStream* src,
+                             struct ArrowIpcInputStream* dst) {
+  *dst = *src;
+  src->release = NULL;
+}
+
+// Private data for the buffer-backed ArrowIpcInputStream
+struct ArrowIpcInputStreamBufferPrivate {
+ // The bytes to serve; moved out of the caller's buffer by
+ // ArrowIpcInputStreamInitBuffer() and reset when the stream is released
+ struct ArrowBuffer input;
+ // Offset into input of the next byte that read() will hand out
+ int64_t cursor_bytes;
+};
+
+// read() callback for buffer-backed input streams.
+//
+// Copies up to buf_size_bytes from the current cursor position into buf,
+// advances the cursor by the number of bytes copied, and reports that number
+// through size_read_out. Always returns NANOARROW_OK; error is not used.
+static ArrowErrorCode ArrowIpcInputStreamBufferRead(struct ArrowIpcInputStream* stream,
+                                                    uint8_t* buf, int64_t buf_size_bytes,
+                                                    int64_t* size_read_out,
+                                                    struct ArrowError* error) {
+  // A zero-length request never touches the buffer
+  if (buf_size_bytes == 0) {
+    *size_read_out = 0;
+    return NANOARROW_OK;
+  }
+
+  struct ArrowIpcInputStreamBufferPrivate* impl =
+      (struct ArrowIpcInputStreamBufferPrivate*)stream->private_data;
+
+  // Serve whichever is smaller: the request or what is left in the buffer
+  const int64_t n_available = impl->input.size_bytes - impl->cursor_bytes;
+  const int64_t n_copy = (n_available > buf_size_bytes) ? buf_size_bytes : n_available;
+
+  if (n_copy > 0) {
+    memcpy(buf, impl->input.data + impl->cursor_bytes, n_copy);
+  }
+
+  impl->cursor_bytes += n_copy;
+  *size_read_out = n_copy;
+  return NANOARROW_OK;
+}
+
+// release() callback for buffer-backed input streams: resets the owned
+// buffer, frees the private data, and marks the stream as released.
+static void ArrowIpcInputStreamBufferRelease(struct ArrowIpcInputStream* stream) {
+  struct ArrowIpcInputStreamBufferPrivate* impl =
+      (struct ArrowIpcInputStreamBufferPrivate*)stream->private_data;
+
+  ArrowBufferReset(&impl->input);
+  ArrowFree(impl);
+
+  stream->release = NULL;
+}
+
+// Populate stream so that it serves the bytes of input.
+//
+// The content of input is moved into newly allocated private data, and reading
+// starts at offset 0. Returns ENOMEM if the allocation fails (input is left
+// untouched in that case) and NANOARROW_OK otherwise.
+ArrowErrorCode ArrowIpcInputStreamInitBuffer(struct ArrowIpcInputStream* stream,
+                                             struct ArrowBuffer* input) {
+  struct ArrowIpcInputStreamBufferPrivate* impl;
+  impl = (struct ArrowIpcInputStreamBufferPrivate*)ArrowMalloc(
+      sizeof(struct ArrowIpcInputStreamBufferPrivate));
+  if (impl == NULL) {
+    return ENOMEM;
+  }
+
+  impl->cursor_bytes = 0;
+  ArrowBufferMove(input, &impl->input);
+
+  stream->private_data = impl;
+  stream->read = &ArrowIpcInputStreamBufferRead;
+  stream->release = &ArrowIpcInputStreamBufferRelease;
+  return NANOARROW_OK;
+}
+
+// Private data for the FILE*-backed ArrowIpcInputStream
+struct ArrowIpcInputStreamFilePrivate {
+ // The file to read from; may be set to NULL by read() once the stream has
+ // closed it
+ FILE* file_ptr;
+ // Non-zero after a read() returned fewer bytes than requested; later reads
+ // then report 0 bytes without touching file_ptr
+ int stream_finished;
+ // Non-zero if the stream is responsible for calling fclose() on file_ptr
+ int close_on_release;
+};
+
+// release() callback for FILE*-backed input streams: closes the file if the
+// stream owns it and it is still open, frees the private data, and marks the
+// stream as released.
+static void ArrowIpcInputStreamFileRelease(struct ArrowIpcInputStream* stream) {
+  struct ArrowIpcInputStreamFilePrivate* impl =
+      (struct ArrowIpcInputStreamFilePrivate*)stream->private_data;
+
+  // Only a file that the stream owns is eligible to be closed here
+  FILE* owned_file = impl->close_on_release ? impl->file_ptr : NULL;
+  if (owned_file != NULL) {
+    fclose(owned_file);
+  }
+
+  ArrowFree(impl);
+  stream->release = NULL;
+}
+
+// read() callback for FILE*-backed input streams.
+//
+// Attempts to fread() buf_size_bytes into buf and reports the number of bytes
+// actually read through size_read_out. A short read marks the stream as
+// finished: every later call reports 0 bytes. If the stream owns the file
+// (close_on_release), the file is closed as soon as the short read happens.
+//
+// Returns NANOARROW_OK, or EIO (with a message in error) if the short read
+// was caused by an I/O error rather than by reaching the end of the file.
+static ArrowErrorCode ArrowIpcInputStreamFileRead(struct ArrowIpcInputStream* stream,
+                                                  uint8_t* buf, int64_t buf_size_bytes,
+                                                  int64_t* size_read_out,
+                                                  struct ArrowError* error) {
+  struct ArrowIpcInputStreamFilePrivate* private_data =
+      (struct ArrowIpcInputStreamFilePrivate*)stream->private_data;
+
+  if (private_data->stream_finished) {
+    *size_read_out = 0;
+    return NANOARROW_OK;
+  }
+
+  // Do the read
+  int64_t bytes_read = (int64_t)fread(buf, 1, buf_size_bytes, private_data->file_ptr);
+  *size_read_out = bytes_read;
+
+  if (bytes_read != buf_size_bytes) {
+    private_data->stream_finished = 1;
+
+    // Inspect error (must happen before the file is closed)
+    int has_error = !feof(private_data->file_ptr) && ferror(private_data->file_ptr);
+
+    // Close the file now. Whether or not fclose() succeeds, the FILE* must not
+    // be used again, so always forget it: this keeps release() from calling
+    // fclose() on it a second time.
+    if (private_data->close_on_release) {
+      fclose(private_data->file_ptr);
+      private_data->file_ptr = NULL;
+    }
+
+    // Maybe return error
+    if (has_error) {
+      ArrowErrorSet(error, "ArrowIpcInputStreamFile IO error");
+      return EIO;
+    }
+  }
+
+  return NANOARROW_OK;
+}
+
+// Populate stream so that it reads from file_ptr (which is used as a FILE*).
+//
+// close_on_release is stored as given and decides whether the stream calls
+// fclose() on the file. Returns ENOMEM if the private data cannot be
+// allocated and NANOARROW_OK otherwise.
+ArrowErrorCode ArrowIpcInputStreamInitFile(struct ArrowIpcInputStream* stream,
+                                           void* file_ptr, int close_on_release) {
+  struct ArrowIpcInputStreamFilePrivate* impl;
+  impl = (struct ArrowIpcInputStreamFilePrivate*)ArrowMalloc(
+      sizeof(struct ArrowIpcInputStreamFilePrivate));
+  if (impl == NULL) {
+    return ENOMEM;
+  }
+
+  impl->stream_finished = 0;
+  impl->close_on_release = close_on_release;
+  impl->file_ptr = (FILE*)file_ptr;
+
+  stream->private_data = impl;
+  stream->read = &ArrowIpcInputStreamFileRead;
+  stream->release = &ArrowIpcInputStreamFileRelease;
+  return NANOARROW_OK;
+}
+
+// Private data for the ArrowArrayStream set up by
+// ArrowIpcArrayStreamReaderInit()
+struct ArrowIpcArrayStreamReaderPrivate {
+ // Source of bytes; its release callback is NULL once the end of the stream
+ // has been reached and the input has been released
+ struct ArrowIpcInputStream input;
+ // Decoder used for message headers, the schema, and record batches
+ struct ArrowIpcDecoder decoder;
+ // Schema decoded from the first message; release is NULL until it is read
+ struct ArrowSchema out_schema;
+ // Field index taken from the options (-1 when no options were given)
+ int64_t field_index;
+ // Scratch buffer holding the bytes of the most recently read message header
+ struct ArrowBuffer header;
+ // Scratch buffer holding the bytes of the most recently read message body
+ struct ArrowBuffer body;
+ // Last error; its message is what get_last_error() returns
+ struct ArrowError error;
+};
+
+// release() callback of the reader stream: releases the input (if it is still
+// held), resets the decoder, releases the cached schema (if any), resets the
+// scratch buffers, frees the private data, and marks stream as released.
+static void ArrowIpcArrayStreamReaderRelease(struct ArrowArrayStream* stream) {
+  struct ArrowIpcArrayStreamReaderPrivate* reader =
+      (struct ArrowIpcArrayStreamReaderPrivate*)stream->private_data;
+  struct ArrowIpcInputStream* input = &reader->input;
+  struct ArrowSchema* cached_schema = &reader->out_schema;
+
+  if (input->release != NULL) {
+    input->release(input);
+  }
+
+  ArrowIpcDecoderReset(&reader->decoder);
+
+  if (cached_schema->release != NULL) {
+    cached_schema->release(cached_schema);
+  }
+
+  ArrowBufferReset(&reader->header);
+  ArrowBufferReset(&reader->body);
+
+  ArrowFree(reader);
+  stream->release = NULL;
+}
+
+// Read the next message header from the input into private_data->header and
+// verify it; additionally decode it if it is of type message_type.
+//
+// Returns ENODATA if no bytes are available or if PeekHeader reports ENODATA,
+// EINVAL if between 1 and 7 bytes are available, an error from
+// the input's read() callback or the decoder, or NANOARROW_OK. NANOARROW_OK is
+// also returned (without decoding) when the message is not of type
+// message_type, so callers must check decoder.message_type themselves.
+static int ArrowIpcArrayStreamReaderNextHeader(
+ struct ArrowIpcArrayStreamReaderPrivate* private_data,
+ enum ArrowIpcMessageType message_type) {
+ private_data->header.size_bytes = 0;
+ int64_t bytes_read = 0;
+
+ // Read 8 bytes (continuation + header size in bytes)
+ NANOARROW_RETURN_NOT_OK(ArrowBufferReserve(&private_data->header, 8));
+ NANOARROW_RETURN_NOT_OK(private_data->input.read(&private_data->input,
+ private_data->header.data,
8,
+ &bytes_read,
&private_data->error));
+ private_data->header.size_bytes += bytes_read;
+
+ if (bytes_read == 0) {
+ // The caller might not use this error message (e.g., if the end of the
+ // stream is one of the valid outcomes) but we set the error anyway in
+ // case it gets propagated higher (e.g., if the stream is empty and
+ // there's no schema message)
+ ArrowErrorSet(&private_data->error, "No data available on stream");
+ return ENODATA;
+ } else if (bytes_read != 8) {
+ ArrowErrorSet(&private_data->error,
+ "Expected at least 8 bytes in remainder of stream");
+ return EINVAL;
+ }
+
+ struct ArrowBufferView input_view;
+ input_view.data.data = private_data->header.data;
+ input_view.size_bytes = private_data->header.size_bytes;
+
+ // Use PeekHeader to fill in decoder.header_size_bytes
+ // NOTE(review): only ENODATA is acted on here; any other result is dropped,
+ // presumably because PeekHeader is expected to complain that only the
+ // first 8 bytes were supplied. Confirm that a genuinely invalid prefix is
+ // always caught by VerifyHeader below.
+ int result =
+ ArrowIpcDecoderPeekHeader(&private_data->decoder, input_view,
&private_data->error);
+ if (result == ENODATA) {
+ return result;
+ }
+
+ // Read the header bytes (everything after the 8 bytes already read)
+ int64_t expected_header_bytes = private_data->decoder.header_size_bytes - 8;
+ NANOARROW_RETURN_NOT_OK(
+ ArrowBufferReserve(&private_data->header, expected_header_bytes));
+ NANOARROW_RETURN_NOT_OK(
+ private_data->input.read(&private_data->input, private_data->header.data
+ 8,
+ expected_header_bytes, &bytes_read,
&private_data->error));
+ private_data->header.size_bytes += bytes_read;
+
+ // Verify + decode the header
+ input_view.data.data = private_data->header.data;
+ input_view.size_bytes = private_data->header.size_bytes;
+ NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderVerifyHeader(&private_data->decoder,
input_view,
+ &private_data->error));
+
+ // Don't decode the message if it's of the wrong type (because the error
+ // message is better communicated by the caller)
+ if (private_data->decoder.message_type != message_type) {
+ return NANOARROW_OK;
+ }
+
+ NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderDecodeHeader(&private_data->decoder,
input_view,
+ &private_data->error));
+ return NANOARROW_OK;
+}
+
+// Read the body of the message whose header was most recently decoded into
+// private_data->body.
+//
+// The number of bytes to read is decoder.body_size_bytes. Returns an error
+// from ArrowBufferReserve() or the input's read() callback, ESPIPE (with a
+// message in private_data->error) if the input ends before the whole body
+// could be read, or NANOARROW_OK.
+static int ArrowIpcArrayStreamReaderNextBody(
+    struct ArrowIpcArrayStreamReaderPrivate* private_data) {
+  // Initialized so that a read() that fails to set it cannot leak garbage
+  int64_t bytes_read = 0;
+  int64_t bytes_to_read = private_data->decoder.body_size_bytes;
+
+  // Read the body bytes
+  private_data->body.size_bytes = 0;
+  NANOARROW_RETURN_NOT_OK(ArrowBufferReserve(&private_data->body, bytes_to_read));
+  NANOARROW_RETURN_NOT_OK(private_data->input.read(&private_data->input,
+                                                   private_data->body.data, bytes_to_read,
+                                                   &bytes_read, &private_data->error));
+  private_data->body.size_bytes += bytes_read;
+
+  // A truncated body must not be handed to the decoder as if it were complete
+  if (bytes_read != bytes_to_read) {
+    ArrowErrorSet(&private_data->error,
+                  "Expected to be able to read %ld bytes for message body but got %ld",
+                  (long)bytes_to_read, (long)bytes_read);
+    return ESPIPE;
+  }
+
+  return NANOARROW_OK;
+}
+
+// Read and decode the Schema message at the start of the input unless that has
+// already been done (i.e., out_schema is populated).
+//
+// On success the decoded schema has been handed to the decoder via
+// ArrowIpcDecoderSetSchema() and is stored in private_data->out_schema. On
+// failure an error code is returned; for failures detected in this function a
+// message is stored in private_data->error.
+static int ArrowIpcArrayStreamReaderReadSchemaIfNeeded(
+    struct ArrowIpcArrayStreamReaderPrivate* private_data) {
+  struct ArrowIpcDecoder* decoder = &private_data->decoder;
+  struct ArrowError* error = &private_data->error;
+
+  // Nothing to do if an earlier call already read the schema
+  if (private_data->out_schema.release != NULL) {
+    return NANOARROW_OK;
+  }
+
+  NANOARROW_RETURN_NOT_OK(ArrowIpcArrayStreamReaderNextHeader(
+      private_data, NANOARROW_IPC_MESSAGE_TYPE_SCHEMA));
+
+  // The first message of the stream has to be a Schema
+  if (decoder->message_type != NANOARROW_IPC_MESSAGE_TYPE_SCHEMA) {
+    ArrowErrorSet(error, "Unexpected message type at start of input (expected Schema)");
+    return EINVAL;
+  }
+
+  // Reject streams that declare features this reader cannot handle
+  if (decoder->feature_flags & NANOARROW_IPC_FEATURE_COMPRESSED_BODY) {
+    ArrowErrorSet(error, "This stream uses unsupported feature COMPRESSED_BODY");
+    return EINVAL;
+  }
+
+  if (decoder->feature_flags & NANOARROW_IPC_FEATURE_DICTIONARY_REPLACEMENT) {
+    ArrowErrorSet(error, "This stream uses unsupported feature DICTIONARY_REPLACEMENT");
+    return EINVAL;
+  }
+
+  // Tell the decoder which endianness the forthcoming buffers use
+  NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderSetEndianness(decoder, decoder->endianness));
+
+  struct ArrowSchema decoded;
+  NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderDecodeSchema(decoder, &decoded, error));
+
+  // Extracting a single field is not implemented yet: only -1 is accepted
+  if (private_data->field_index != -1) {
+    decoded.release(&decoded);
+    ArrowErrorSet(error, "Field index != -1 is not yet supported");
+    return ENOTSUP;
+  }
+
+  // Tell the decoder which schema the forthcoming messages use; keep the
+  // schema only if the decoder accepted it
+  int result = ArrowIpcDecoderSetSchema(decoder, &decoded, error);
+  if (result == NANOARROW_OK) {
+    ArrowSchemaMove(&decoded, &private_data->out_schema);
+    return NANOARROW_OK;
+  }
+
+  decoded.release(&decoded);
+  return result;
+}
+
+// get_schema() callback of the reader stream: reads the schema from the input
+// if that has not happened yet and places a deep copy of it in out.
+static int ArrowIpcArrayStreamReaderGetSchema(struct ArrowArrayStream* stream,
+                                              struct ArrowSchema* out) {
+  struct ArrowIpcArrayStreamReaderPrivate* reader =
+      (struct ArrowIpcArrayStreamReaderPrivate*)stream->private_data;
+
+  // Discard any message left over from a previous call
+  reader->error.message[0] = '\0';
+
+  NANOARROW_RETURN_NOT_OK(ArrowIpcArrayStreamReaderReadSchemaIfNeeded(reader));
+
+  // The caller owns its own copy; the cached schema stays with the reader
+  return ArrowSchemaDeepCopy(&reader->out_schema, out);
+}
+
+// get_next() callback of the reader stream: decodes the next RecordBatch
+// message from the input into out.
+//
+// The end of the stream is signalled by returning NANOARROW_OK with
+// out->release set to NULL; the input is released the first time that
+// happens and every later call reports the end of the stream again. Any
+// failure to read or decode a message is returned as an error code with a
+// message available through get_last_error().
+static int ArrowIpcArrayStreamReaderGetNext(struct ArrowArrayStream* stream,
+                                            struct ArrowArray* out) {
+  struct ArrowIpcArrayStreamReaderPrivate* private_data =
+      (struct ArrowIpcArrayStreamReaderPrivate*)stream->private_data;
+  // Check if we are all done
+  if (private_data->input.release == NULL) {
+    out->release = NULL;
+    return NANOARROW_OK;
+  }
+
+  private_data->error.message[0] = '\0';
+  NANOARROW_RETURN_NOT_OK(ArrowIpcArrayStreamReaderReadSchemaIfNeeded(private_data));
+
+  // Read + decode the next header
+  int result = ArrowIpcArrayStreamReaderNextHeader(
+      private_data, NANOARROW_IPC_MESSAGE_TYPE_RECORD_BATCH);
+  if (result == ENODATA) {
+    // If the stream is finished, release the input
+    private_data->input.release(&private_data->input);
+    out->release = NULL;
+    return NANOARROW_OK;
+  }
+
+  // Any other failure must stop here: decoder.message_type and
+  // decoder.body_size_bytes may still describe the previous message, so
+  // carrying on could read and decode a body that does not belong to a
+  // valid header
+  NANOARROW_RETURN_NOT_OK(result);
+
+  // Make sure we have a RecordBatch message
+  if (private_data->decoder.message_type != NANOARROW_IPC_MESSAGE_TYPE_RECORD_BATCH) {
+    ArrowErrorSet(&private_data->error, "Unexpected message type (expected RecordBatch)");
+    return EINVAL;
+  }
+
+  // Read in the body
+  NANOARROW_RETURN_NOT_OK(ArrowIpcArrayStreamReaderNextBody(private_data));
+
+  struct ArrowBufferView body_view;
+  body_view.data.data = private_data->body.data;
+  body_view.size_bytes = private_data->body.size_bytes;
+
+  NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderDecodeArray(&private_data->decoder, body_view,
+                                                     private_data->field_index, out,
+                                                     &private_data->error));
+
+  return NANOARROW_OK;
+}
+
+// get_last_error() callback of the reader stream: returns the message stored
+// in the reader's ArrowError.
+static const char* ArrowIpcArrayStreamReaderGetLastError(
+    struct ArrowArrayStream* stream) {
+  const struct ArrowIpcArrayStreamReaderPrivate* reader =
+      (const struct ArrowIpcArrayStreamReaderPrivate*)stream->private_data;
+  return reader->error.message;
+}
+
+// Initialize out as an ArrowArrayStream that decodes the IPC messages read
+// from input_stream.
+//
+// options may be NULL, in which case field_index defaults to -1. Returns
+// ENOMEM if the private data cannot be allocated, or the error code from
+// ArrowIpcDecoderInit() if the decoder cannot be initialized; in both cases
+// input_stream and out are left untouched. On success ownership of
+// input_stream is moved into out and NANOARROW_OK is returned. No bytes are
+// read from the input here.
+ArrowErrorCode ArrowIpcArrayStreamReaderInit(
+    struct ArrowArrayStream* out, struct ArrowIpcInputStream* input_stream,
+    struct ArrowIpcArrayStreamReaderOptions* options) {
+  struct ArrowIpcArrayStreamReaderPrivate* private_data =
+      (struct ArrowIpcArrayStreamReaderPrivate*)ArrowMalloc(
+          sizeof(struct ArrowIpcArrayStreamReaderPrivate));
+  if (private_data == NULL) {
+    return ENOMEM;
+  }
+
+  int result = ArrowIpcDecoderInit(&private_data->decoder);
+  if (result != NANOARROW_OK) {
+    ArrowFree(private_data);
+    return result;
+  }
+
+  ArrowBufferInit(&private_data->header);
+  ArrowBufferInit(&private_data->body);
+  private_data->out_schema.release = NULL;
+
+  // Start with an empty error message so that get_last_error() returns a
+  // valid string even before the first call to get_schema() or get_next()
+  private_data->error.message[0] = '\0';
+
+  ArrowIpcInputStreamMove(input_stream, &private_data->input);
+
+  if (options != NULL) {
+    private_data->field_index = options->field_index;
+  } else {
+    private_data->field_index = -1;
+  }
+
+  out->private_data = private_data;
+  out->get_schema = &ArrowIpcArrayStreamReaderGetSchema;
+  out->get_next = &ArrowIpcArrayStreamReaderGetNext;
+  out->get_last_error = &ArrowIpcArrayStreamReaderGetLastError;
+  out->release = &ArrowIpcArrayStreamReaderRelease;
+
+  return NANOARROW_OK;
+}
diff --git
a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader_test.cc
b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader_test.cc
new file mode 100644
index 0000000..126aaf3
--- /dev/null
+++ b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader_test.cc
@@ -0,0 +1,314 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include <stdio.h>
+
+#include "nanoarrow_ipc.h"
+
+// Raw bytes fed to the reader as the first message in the stream-reader tests
+// below. After consuming them, get_schema() is expected to report a schema whose
+// format is "+s". Presumably an encapsulated Arrow IPC Schema message (it starts
+// with the 0xff 0xff 0xff 0xff prefix) -- confirm against the generator used.
+static uint8_t kSimpleSchema[] = {
+ 0xff, 0xff, 0xff, 0xff, 0x10, 0x01, 0x00, 0x00, 0x10, 0x00, 0x00, 0x00,
0x00, 0x00,
+ 0x0a, 0x00, 0x0e, 0x00, 0x06, 0x00, 0x05, 0x00, 0x08, 0x00, 0x0a, 0x00,
0x00, 0x00,
+ 0x00, 0x01, 0x04, 0x00, 0x10, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0a, 0x00,
0x0c, 0x00,
+ 0x00, 0x00, 0x04, 0x00, 0x08, 0x00, 0x0a, 0x00, 0x00, 0x00, 0x3c, 0x00,
0x00, 0x00,
+ 0x04, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00,
0x84, 0xff,
+ 0xff, 0xff, 0x18, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x0a, 0x00,
0x00, 0x00,
+ 0x73, 0x6f, 0x6d, 0x65, 0x5f, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x00, 0x00,
0x08, 0x00,
+ 0x00, 0x00, 0x73, 0x6f, 0x6d, 0x65, 0x5f, 0x6b, 0x65, 0x79, 0x00, 0x00,
0x00, 0x00,
+ 0x01, 0x00, 0x00, 0x00, 0x18, 0x00, 0x00, 0x00, 0x00, 0x00, 0x12, 0x00,
0x18, 0x00,
+ 0x08, 0x00, 0x06, 0x00, 0x07, 0x00, 0x0c, 0x00, 0x00, 0x00, 0x10, 0x00,
0x14, 0x00,
+ 0x12, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x02, 0x14, 0x00, 0x00, 0x00,
0x70, 0x00,
+ 0x00, 0x00, 0x08, 0x00, 0x00, 0x00, 0x18, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00,
+ 0x08, 0x00, 0x00, 0x00, 0x73, 0x6f, 0x6d, 0x65, 0x5f, 0x63, 0x6f, 0x6c,
0x00, 0x00,
+ 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x0c, 0x00, 0x00, 0x00, 0x08, 0x00,
0x0c, 0x00,
+ 0x04, 0x00, 0x08, 0x00, 0x08, 0x00, 0x00, 0x00, 0x20, 0x00, 0x00, 0x00,
0x04, 0x00,
+ 0x00, 0x00, 0x10, 0x00, 0x00, 0x00, 0x73, 0x6f, 0x6d, 0x65, 0x5f, 0x76,
0x61, 0x6c,
+ 0x75, 0x65, 0x5f, 0x66, 0x69, 0x65, 0x6c, 0x64, 0x00, 0x00, 0x00, 0x00,
0x0e, 0x00,
+ 0x00, 0x00, 0x73, 0x6f, 0x6d, 0x65, 0x5f, 0x6b, 0x65, 0x79, 0x5f, 0x66,
0x69, 0x65,
+ 0x6c, 0x64, 0x00, 0x00, 0x08, 0x00, 0x0c, 0x00, 0x08, 0x00, 0x07, 0x00,
0x08, 0x00,
+ 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x20, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00};
+
+// Raw bytes appended after kSimpleSchema in the stream-reader tests below. When
+// read after the schema, get_next() is expected to yield an array of length 3.
+// Presumably an encapsulated Arrow IPC RecordBatch message followed by its body
+// -- confirm against the generator used.
+static uint8_t kSimpleRecordBatch[] = {
+ 0xff, 0xff, 0xff, 0xff, 0x88, 0x00, 0x00, 0x00, 0x14, 0x00, 0x00, 0x00,
0x00, 0x00,
+ 0x00, 0x00, 0x0c, 0x00, 0x16, 0x00, 0x06, 0x00, 0x05, 0x00, 0x08, 0x00,
0x0c, 0x00,
+ 0x0c, 0x00, 0x00, 0x00, 0x00, 0x03, 0x04, 0x00, 0x18, 0x00, 0x00, 0x00,
0x10, 0x00,
+ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0a, 0x00, 0x18, 0x00,
0x0c, 0x00,
+ 0x04, 0x00, 0x08, 0x00, 0x0a, 0x00, 0x00, 0x00, 0x3c, 0x00, 0x00, 0x00,
0x10, 0x00,
+ 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00,
+ 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00,
+ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00,
+ 0x0c, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x01, 0x00,
+ 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00,
+ 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00,
0x03, 0x00,
+ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00};
+
+// Eight bytes appended after the record batch in StreamReaderBasicWithEndOfStream;
+// the following get_next() call is expected to return an array with a null
+// release callback. Presumably the IPC end-of-stream marker (0xffffffff prefix
+// followed by a zero length) -- confirm against the IPC format documentation.
+static uint8_t kEndOfStream[] = {0xff, 0xff, 0xff, 0xff, 0x00, 0x00, 0x00,
0x00};
+
+// Exercises the buffer-backed ArrowIpcInputStream: five bytes are requested two
+// at a time and the reported sizes (2, 2, 1, then 0 and 0) are verified together
+// with the bytes written to the destination.
+TEST(NanoarrowIpcReader, InputStreamBuffer) {
+  uint8_t source_bytes[] = {0x01, 0x02, 0x03, 0x04, 0x05};
+  struct ArrowBuffer source;
+  ArrowBufferInit(&source);
+  ASSERT_EQ(ArrowBufferAppend(&source, source_bytes, sizeof(source_bytes)), NANOARROW_OK);
+
+  // The destination starts as all 0xff so untouched bytes stay recognizable
+  uint8_t dest[] = {0xff, 0xff, 0xff, 0xff, 0xff};
+  int64_t n_read;
+  struct ArrowIpcInputStream stream;
+
+  ASSERT_EQ(ArrowIpcInputStreamInitBuffer(&stream, &source), NANOARROW_OK);
+  // After initialization the source buffer is expected to have a null data pointer
+  EXPECT_EQ(source.data, nullptr);
+
+  // First request: bytes 1-2
+  uint8_t after_first[] = {0x01, 0x02, 0xff, 0xff, 0xff};
+  EXPECT_EQ(stream.read(&stream, dest, 2, &n_read, nullptr), NANOARROW_OK);
+  EXPECT_EQ(n_read, 2);
+  EXPECT_EQ(memcmp(dest, after_first, sizeof(dest)), 0);
+
+  // Second request: bytes 3-4
+  uint8_t after_second[] = {0x01, 0x02, 0x03, 0x04, 0xff};
+  EXPECT_EQ(stream.read(&stream, dest + 2, 2, &n_read, nullptr), NANOARROW_OK);
+  EXPECT_EQ(n_read, 2);
+  EXPECT_EQ(memcmp(dest, after_second, sizeof(dest)), 0);
+
+  // Third request asks for two bytes but only one is left
+  uint8_t after_third[] = {0x01, 0x02, 0x03, 0x04, 0x05};
+  EXPECT_EQ(stream.read(&stream, dest + 4, 2, &n_read, nullptr), NANOARROW_OK);
+  EXPECT_EQ(n_read, 1);
+  EXPECT_EQ(memcmp(dest, after_third, sizeof(dest)), 0);
+
+  // Requests made once the data is exhausted succeed and report zero bytes
+  EXPECT_EQ(stream.read(&stream, nullptr, 2, &n_read, nullptr), NANOARROW_OK);
+  EXPECT_EQ(n_read, 0);
+
+  EXPECT_EQ(stream.read(&stream, nullptr, 0, &n_read, nullptr), NANOARROW_OK);
+  EXPECT_EQ(n_read, 0);
+
+  stream.release(&stream);
+}
+
+// Exercises the FILE-backed ArrowIpcInputStream with the same read pattern as the
+// buffer-backed test: five bytes written to a temporary file are requested two at
+// a time and the reported sizes (2, 2, 1, then 0 and 0) are verified.
+TEST(NanoarrowIpcReader, InputStreamFile) {
+  uint8_t source_bytes[] = {0x01, 0x02, 0x03, 0x04, 0x05};
+  FILE* tmp = tmpfile();
+  ASSERT_NE(tmp, nullptr);
+  ASSERT_EQ(fwrite(source_bytes, 1, sizeof(source_bytes), tmp), sizeof(source_bytes));
+  // Rewind so that reading starts at the first byte just written
+  fseek(tmp, 0, SEEK_SET);
+
+  // The destination starts as all 0xff so untouched bytes stay recognizable
+  uint8_t dest[] = {0xff, 0xff, 0xff, 0xff, 0xff};
+  int64_t n_read;
+  struct ArrowIpcInputStream stream;
+
+  // NOTE(review): the final argument (1) presumably asks the stream to close the
+  // FILE on release, since this test never calls fclose() -- confirm in the header.
+  ASSERT_EQ(ArrowIpcInputStreamInitFile(&stream, tmp, 1), NANOARROW_OK);
+
+  // First request: bytes 1-2
+  uint8_t after_first[] = {0x01, 0x02, 0xff, 0xff, 0xff};
+  EXPECT_EQ(stream.read(&stream, dest, 2, &n_read, nullptr), NANOARROW_OK);
+  EXPECT_EQ(n_read, 2);
+  EXPECT_EQ(memcmp(dest, after_first, sizeof(dest)), 0);
+
+  // Second request: bytes 3-4
+  uint8_t after_second[] = {0x01, 0x02, 0x03, 0x04, 0xff};
+  EXPECT_EQ(stream.read(&stream, dest + 2, 2, &n_read, nullptr), NANOARROW_OK);
+  EXPECT_EQ(n_read, 2);
+  EXPECT_EQ(memcmp(dest, after_second, sizeof(dest)), 0);
+
+  // Third request asks for two bytes but only one is left
+  uint8_t after_third[] = {0x01, 0x02, 0x03, 0x04, 0x05};
+  EXPECT_EQ(stream.read(&stream, dest + 4, 2, &n_read, nullptr), NANOARROW_OK);
+  EXPECT_EQ(n_read, 1);
+  EXPECT_EQ(memcmp(dest, after_third, sizeof(dest)), 0);
+
+  // Requests made once the data is exhausted succeed and report zero bytes
+  EXPECT_EQ(stream.read(&stream, nullptr, 2, &n_read, nullptr), NANOARROW_OK);
+  EXPECT_EQ(n_read, 0);
+
+  EXPECT_EQ(stream.read(&stream, nullptr, 0, &n_read, nullptr), NANOARROW_OK);
+  EXPECT_EQ(n_read, 0);
+
+  stream.release(&stream);
+}
+
+// Reads a schema message followed by a single record batch (with no explicit
+// end-of-stream marker) and checks that get_next() keeps returning a released
+// array once the input is exhausted.
+TEST(NanoarrowIpcReader, StreamReaderBasic) {
+  struct ArrowBuffer ipc_bytes;
+  ArrowBufferInit(&ipc_bytes);
+  ASSERT_EQ(ArrowBufferAppend(&ipc_bytes, kSimpleSchema, sizeof(kSimpleSchema)),
+            NANOARROW_OK);
+  ASSERT_EQ(ArrowBufferAppend(&ipc_bytes, kSimpleRecordBatch, sizeof(kSimpleRecordBatch)),
+            NANOARROW_OK);
+
+  struct ArrowIpcInputStream source;
+  ASSERT_EQ(ArrowIpcInputStreamInitBuffer(&source, &ipc_bytes), NANOARROW_OK);
+
+  struct ArrowArrayStream reader;
+  ASSERT_EQ(ArrowIpcArrayStreamReaderInit(&reader, &source, nullptr), NANOARROW_OK);
+
+  struct ArrowSchema schema;
+  ASSERT_EQ(reader.get_schema(&reader, &schema), NANOARROW_OK);
+  EXPECT_STREQ(schema.format, "+s");
+  schema.release(&schema);
+
+  struct ArrowArray batch;
+  ASSERT_EQ(reader.get_next(&reader, &batch), NANOARROW_OK);
+  EXPECT_EQ(batch.length, 3);
+  batch.release(&batch);
+
+  // Past the last batch, every call must succeed and leave release unset
+  for (int attempt = 0; attempt < 2; attempt++) {
+    ASSERT_EQ(reader.get_next(&reader, &batch), NANOARROW_OK);
+    EXPECT_EQ(batch.release, nullptr);
+  }
+
+  reader.release(&reader);
+}
+
+// Same as StreamReaderBasic, except that the input ends with the kEndOfStream
+// bytes; get_next() after the only batch must return a released array.
+TEST(NanoarrowIpcReader, StreamReaderBasicWithEndOfStream) {
+  struct ArrowBuffer ipc_bytes;
+  ArrowBufferInit(&ipc_bytes);
+  ASSERT_EQ(ArrowBufferAppend(&ipc_bytes, kSimpleSchema, sizeof(kSimpleSchema)),
+            NANOARROW_OK);
+  ASSERT_EQ(ArrowBufferAppend(&ipc_bytes, kSimpleRecordBatch, sizeof(kSimpleRecordBatch)),
+            NANOARROW_OK);
+  ASSERT_EQ(ArrowBufferAppend(&ipc_bytes, kEndOfStream, sizeof(kEndOfStream)),
+            NANOARROW_OK);
+
+  struct ArrowIpcInputStream source;
+  ASSERT_EQ(ArrowIpcInputStreamInitBuffer(&source, &ipc_bytes), NANOARROW_OK);
+
+  struct ArrowArrayStream reader;
+  ASSERT_EQ(ArrowIpcArrayStreamReaderInit(&reader, &source, nullptr), NANOARROW_OK);
+
+  struct ArrowSchema schema;
+  ASSERT_EQ(reader.get_schema(&reader, &schema), NANOARROW_OK);
+  EXPECT_STREQ(schema.format, "+s");
+  schema.release(&schema);
+
+  struct ArrowArray batch;
+  ASSERT_EQ(reader.get_next(&reader, &batch), NANOARROW_OK);
+  EXPECT_EQ(batch.length, 3);
+  batch.release(&batch);
+
+  // The end-of-stream bytes are reported as a successful call with no array
+  ASSERT_EQ(reader.get_next(&reader, &batch), NANOARROW_OK);
+  EXPECT_EQ(batch.release, nullptr);
+
+  reader.release(&reader);
+}
+
+// Feeds two schema messages back to back: the first is consumed by get_schema(),
+// and get_next() must then fail with EINVAL because the second message is not a
+// record batch.
+TEST(NanoarrowIpcReader, StreamReaderExpectedRecordBatch) {
+  struct ArrowBuffer ipc_bytes;
+  ArrowBufferInit(&ipc_bytes);
+  // The schema bytes are appended twice on purpose
+  for (int copy = 0; copy < 2; copy++) {
+    ASSERT_EQ(ArrowBufferAppend(&ipc_bytes, kSimpleSchema, sizeof(kSimpleSchema)),
+              NANOARROW_OK);
+  }
+
+  struct ArrowIpcInputStream source;
+  ASSERT_EQ(ArrowIpcInputStreamInitBuffer(&source, &ipc_bytes), NANOARROW_OK);
+
+  struct ArrowArrayStream reader;
+  ASSERT_EQ(ArrowIpcArrayStreamReaderInit(&reader, &source, nullptr), NANOARROW_OK);
+
+  struct ArrowSchema schema;
+  ASSERT_EQ(reader.get_schema(&reader, &schema), NANOARROW_OK);
+  EXPECT_STREQ(schema.format, "+s");
+  schema.release(&schema);
+
+  struct ArrowArray batch;
+  ASSERT_EQ(reader.get_next(&reader, &batch), EINVAL);
+  EXPECT_STREQ(reader.get_last_error(&reader),
+               "Unexpected message type (expected RecordBatch)");
+
+  reader.release(&reader);
+}
+
+// Starts the input with a record batch message instead of a schema message and
+// checks that get_schema() fails with EINVAL and a descriptive error.
+TEST(NanoarrowIpcReader, StreamReaderExpectedSchema) {
+  struct ArrowBuffer input_buffer;
+  ArrowBufferInit(&input_buffer);
+  // The size must be that of the array being copied: using sizeof(kSimpleSchema)
+  // here would read past the end of the (smaller) kSimpleRecordBatch array.
+  ASSERT_EQ(
+      ArrowBufferAppend(&input_buffer, kSimpleRecordBatch, sizeof(kSimpleRecordBatch)),
+      NANOARROW_OK);
+
+  struct ArrowIpcInputStream input;
+  ASSERT_EQ(ArrowIpcInputStreamInitBuffer(&input, &input_buffer), NANOARROW_OK);
+
+  struct ArrowArrayStream stream;
+  ASSERT_EQ(ArrowIpcArrayStreamReaderInit(&stream, &input, nullptr), NANOARROW_OK);
+
+  struct ArrowSchema schema;
+  ASSERT_EQ(stream.get_schema(&stream, &schema), EINVAL);
+  EXPECT_STREQ(stream.get_last_error(&stream),
+               "Unexpected message type at start of input (expected Schema)");
+
+  stream.release(&stream);
+}
+
+// Requests field_index 0 through the reader options and checks that get_schema()
+// rejects it with ENOTSUP and a descriptive error.
+TEST(NanoarrowIpcReader, StreamReaderUnsupportedFieldIndex) {
+  struct ArrowBuffer input_buffer;
+  ArrowBufferInit(&input_buffer);
+  ASSERT_EQ(ArrowBufferAppend(&input_buffer, kSimpleSchema, sizeof(kSimpleSchema)),
+            NANOARROW_OK);
+  ASSERT_EQ(
+      ArrowBufferAppend(&input_buffer, kSimpleRecordBatch, sizeof(kSimpleRecordBatch)),
+      NANOARROW_OK);
+
+  struct ArrowIpcInputStream input;
+  ASSERT_EQ(ArrowIpcInputStreamInitBuffer(&input, &input_buffer), NANOARROW_OK);
+
+  struct ArrowArrayStream stream;
+  struct ArrowIpcArrayStreamReaderOptions options;
+  options.field_index = 0;
+  ASSERT_EQ(ArrowIpcArrayStreamReaderInit(&stream, &input, &options), NANOARROW_OK);
+
+  struct ArrowSchema schema;
+  ASSERT_EQ(stream.get_schema(&stream, &schema), ENOTSUP);
+  // The expected message is a single-line literal; it must not be split across
+  // source lines, which would leave an unterminated string.
+  EXPECT_STREQ(stream.get_last_error(&stream), "Field index != -1 is not yet supported");
+
+  stream.release(&stream);
+}
+
+// Wraps a buffer that never had anything appended to it and checks that
+// get_schema() fails with ENODATA.
+TEST(NanoarrowIpcReader, StreamReaderEmptyInput) {
+  struct ArrowBuffer no_bytes;
+  ArrowBufferInit(&no_bytes);
+
+  struct ArrowIpcInputStream source;
+  ASSERT_EQ(ArrowIpcInputStreamInitBuffer(&source, &no_bytes), NANOARROW_OK);
+
+  struct ArrowArrayStream reader;
+  ASSERT_EQ(ArrowIpcArrayStreamReaderInit(&reader, &source, nullptr), NANOARROW_OK);
+
+  struct ArrowSchema unused_schema;
+  ASSERT_EQ(reader.get_schema(&reader, &unused_schema), ENODATA);
+  EXPECT_STREQ(reader.get_last_error(&reader), "No data available on stream");
+
+  reader.release(&reader);
+}
+
+// Provides a single byte of input and checks that get_schema() fails with EINVAL,
+// reporting that at least 8 bytes were expected.
+TEST(NanoarrowIpcReader, StreamReaderIncompletePrefix) {
+  struct ArrowBuffer one_byte;
+  ArrowBufferInit(&one_byte);
+  ASSERT_EQ(ArrowBufferAppendUInt8(&one_byte, 0x00), NANOARROW_OK);
+
+  struct ArrowIpcInputStream source;
+  ASSERT_EQ(ArrowIpcInputStreamInitBuffer(&source, &one_byte), NANOARROW_OK);
+
+  struct ArrowArrayStream reader;
+  ASSERT_EQ(ArrowIpcArrayStreamReaderInit(&reader, &source, nullptr), NANOARROW_OK);
+
+  struct ArrowSchema unused_schema;
+  ASSERT_EQ(reader.get_schema(&reader, &unused_schema), EINVAL);
+  EXPECT_STREQ(reader.get_last_error(&reader),
+               "Expected at least 8 bytes in remainder of stream");
+
+  reader.release(&reader);
+}