This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-nanoarrow.git
The following commit(s) were added to refs/heads/main by this push:
new 85cc528 ipc: Update dist/ for commit
62cffa6a4e182235a82f3ea9f8715258b7b2ca56
85cc528 is described below
commit 85cc52853d3b395e14caddce74a9f66a8d32a1d2
Author: GitHub Actions <[email protected]>
AuthorDate: Wed Mar 22 20:13:57 2023 +0000
ipc: Update dist/ for commit 62cffa6a4e182235a82f3ea9f8715258b7b2ca56
---
dist/nanoarrow_ipc.c | 445 +++++++++++++++++++++++++++++++++++++++++++++++++--
dist/nanoarrow_ipc.h | 62 +++++++
2 files changed, 497 insertions(+), 10 deletions(-)
diff --git a/dist/nanoarrow_ipc.c b/dist/nanoarrow_ipc.c
index f76f4e9..d1d09ae 100644
--- a/dist/nanoarrow_ipc.c
+++ b/dist/nanoarrow_ipc.c
@@ -21072,21 +21072,22 @@ static inline int ArrowIpcDecoderCheckHeader(struct
ArrowIpcDecoder* decoder,
}
int swap_endian = private_data->system_endianness ==
NANOARROW_IPC_ENDIANNESS_BIG;
- *message_size_bytes = ArrowIpcReadInt32LE(data_mut, swap_endian);
- if ((*message_size_bytes) < 0) {
+ int32_t header_body_size_bytes = ArrowIpcReadInt32LE(data_mut, swap_endian);
+ *message_size_bytes = header_body_size_bytes + (2 * sizeof(int32_t));
+ if (header_body_size_bytes < 0) {
ArrowErrorSet(
error, "Expected message body size > 0 but found message body size of
%ld bytes",
- (long)(*message_size_bytes));
+ (long)header_body_size_bytes);
return EINVAL;
- } else if ((*message_size_bytes) > data_mut->size_bytes) {
+ } else if (header_body_size_bytes > data_mut->size_bytes) {
ArrowErrorSet(error,
"Expected 0 <= message body size <= %ld bytes but found
message "
"body size of %ld bytes",
- (long)data_mut->size_bytes, (long)(*message_size_bytes));
+ (long)data_mut->size_bytes, (long)header_body_size_bytes);
return ESPIPE;
}
- if (*message_size_bytes == 0) {
+ if (header_body_size_bytes == 0) {
ArrowErrorSet(error, "End of Arrow stream");
return ENODATA;
}
@@ -21103,7 +21104,6 @@ ArrowErrorCode ArrowIpcDecoderPeekHeader(struct
ArrowIpcDecoder* decoder,
ArrowIpcDecoderResetHeaderInfo(decoder);
NANOARROW_RETURN_NOT_OK(
ArrowIpcDecoderCheckHeader(decoder, &data, &decoder->header_size_bytes,
error));
- decoder->header_size_bytes += 2 * sizeof(int32_t);
return NANOARROW_OK;
}
@@ -21118,14 +21118,14 @@ ArrowErrorCode ArrowIpcDecoderVerifyHeader(struct
ArrowIpcDecoder* decoder,
ArrowIpcDecoderCheckHeader(decoder, &data, &decoder->header_size_bytes,
error));
// Run flatbuffers verification
- if (ns(Message_verify_as_root(data.data.as_uint8,
decoder->header_size_bytes)) !=
+ if (ns(Message_verify_as_root(data.data.as_uint8,
+ decoder->header_size_bytes - (2 *
sizeof(int32_t)))) !=
flatcc_verify_ok) {
ArrowErrorSet(error, "Message flatbuffer verification failed");
return EINVAL;
}
// Read some basic information from the message
- decoder->header_size_bytes += 2 * sizeof(int32_t);
ns(Message_table_t) message = ns(Message_as_root(data.data.as_uint8));
decoder->metadata_version = ns(Message_version(message));
decoder->message_type = ns(Message_header_type(message));
@@ -21144,7 +21144,6 @@ ArrowErrorCode ArrowIpcDecoderDecodeHeader(struct
ArrowIpcDecoder* decoder,
ArrowIpcDecoderResetHeaderInfo(decoder);
NANOARROW_RETURN_NOT_OK(
ArrowIpcDecoderCheckHeader(decoder, &data, &decoder->header_size_bytes,
error));
- decoder->header_size_bytes += 2 * sizeof(int32_t);
ns(Message_table_t) message = ns(Message_as_root(data.data.as_uint8));
if (!message) {
@@ -21317,6 +21316,7 @@ ArrowErrorCode ArrowIpcDecoderSetEndianness(struct
ArrowIpcDecoder* decoder,
case NANOARROW_IPC_ENDIANNESS_LITTLE:
case NANOARROW_IPC_ENDIANNESS_BIG:
private_data->endianness = endianness;
+ return NANOARROW_OK;
default:
return EINVAL;
}
@@ -21483,3 +21483,428 @@ ArrowErrorCode ArrowIpcDecoderDecodeArray(struct
ArrowIpcDecoder* decoder,
ArrowArrayMove(&temp, out);
return NANOARROW_OK;
}
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <errno.h>
+#include <stdio.h>
+#include <string.h>
+
+#include "nanoarrow.h"
+#include "nanoarrow_ipc.h"
+
+// Hands the contents of src over to dst. After the call dst holds the read
+// and release callbacks and the private data that src had, and src is marked
+// as released (release == NULL) so that it is not released a second time.
+void ArrowIpcInputStreamMove(struct ArrowIpcInputStream* src,
+                             struct ArrowIpcInputStream* dst) {
+  *dst = *src;
+  src->release = NULL;
+}
+
+struct ArrowIpcInputStreamBufferPrivate {
+ struct ArrowBuffer input;  // Bytes served by the stream; reset by the release callback
+ int64_t cursor_bytes;  // Offset into input of the next byte to hand out
+};
+
+// Read callback for buffer-backed input streams. Copies up to buf_size_bytes
+// from the wrapped buffer into buf, starting at the current cursor, and
+// advances the cursor by the number of bytes copied. That count is reported
+// through size_read_out. The error argument is not used.
+static ArrowErrorCode ArrowIpcInputStreamBufferRead(struct ArrowIpcInputStream* stream,
+                                                    uint8_t* buf, int64_t buf_size_bytes,
+                                                    int64_t* size_read_out,
+                                                    struct ArrowError* error) {
+  if (buf_size_bytes == 0) {
+    *size_read_out = 0;
+    return NANOARROW_OK;
+  }
+
+  struct ArrowIpcInputStreamBufferPrivate* state =
+      (struct ArrowIpcInputStreamBufferPrivate*)stream->private_data;
+
+  // Serve the smaller of what was asked for and what is left
+  const int64_t available = state->input.size_bytes - state->cursor_bytes;
+  const int64_t n_bytes = (available > buf_size_bytes) ? buf_size_bytes : available;
+
+  if (n_bytes > 0) {
+    memcpy(buf, state->input.data + state->cursor_bytes, n_bytes);
+  }
+
+  state->cursor_bytes += n_bytes;
+  *size_read_out = n_bytes;
+  return NANOARROW_OK;
+}
+
+// Release callback for buffer-backed input streams: resets the wrapped
+// buffer, frees the private data and marks the stream as released.
+static void ArrowIpcInputStreamBufferRelease(struct ArrowIpcInputStream* stream) {
+  struct ArrowIpcInputStreamBufferPrivate* state =
+      (struct ArrowIpcInputStreamBufferPrivate*)stream->private_data;
+
+  ArrowBufferReset(&state->input);
+  ArrowFree(state);
+
+  stream->release = NULL;
+}
+
+// Wraps an ArrowBuffer in an ArrowIpcInputStream. The contents of input are
+// moved into private data attached to the stream and reading starts at offset
+// zero. Returns ENOMEM (before touching input or stream) if the private data
+// cannot be allocated.
+ArrowErrorCode ArrowIpcInputStreamInitBuffer(struct ArrowIpcInputStream* stream,
+                                             struct ArrowBuffer* input) {
+  struct ArrowIpcInputStreamBufferPrivate* state =
+      (struct ArrowIpcInputStreamBufferPrivate*)ArrowMalloc(sizeof(*state));
+  if (state == NULL) {
+    return ENOMEM;
+  }
+
+  ArrowBufferMove(input, &state->input);
+  state->cursor_bytes = 0;
+
+  stream->private_data = state;
+  stream->read = &ArrowIpcInputStreamBufferRead;
+  stream->release = &ArrowIpcInputStreamBufferRelease;
+  return NANOARROW_OK;
+}
+
+struct ArrowIpcInputStreamFilePrivate {
+ FILE* file_ptr;  // File being read; may be set to NULL once the reader has closed it
+ int stream_finished;  // Set to 1 after a short read; later reads report 0 bytes
+ int close_on_release;  // Non-zero if this stream is responsible for calling fclose()
+};
+
+// Release callback for FILE*-backed input streams. Closes the file if the
+// stream was asked to and the file pointer is still set, then frees the
+// private data and marks the stream as released. The result of fclose() is
+// not reported.
+static void ArrowIpcInputStreamFileRelease(struct ArrowIpcInputStream* stream) {
+  struct ArrowIpcInputStreamFilePrivate* state =
+      (struct ArrowIpcInputStreamFilePrivate*)stream->private_data;
+
+  const int should_close = state->file_ptr != NULL && state->close_on_release;
+  if (should_close) {
+    fclose(state->file_ptr);
+  }
+
+  ArrowFree(state);
+  stream->release = NULL;
+}
+
+// Read callback for FILE*-backed input streams.
+//
+// Reads up to buf_size_bytes from the file into buf and reports the number of
+// bytes read through size_read_out. A short read marks the stream as finished:
+// every later call reports 0 bytes and NANOARROW_OK. When the stream owns the
+// file (close_on_release), the file is closed as soon as the short read is
+// observed.
+//
+// Returns EIO (and sets error) if the short read was caused by an I/O error
+// rather than end-of-file; NANOARROW_OK otherwise.
+static ArrowErrorCode ArrowIpcInputStreamFileRead(struct ArrowIpcInputStream* stream,
+                                                  uint8_t* buf, int64_t buf_size_bytes,
+                                                  int64_t* size_read_out,
+                                                  struct ArrowError* error) {
+  struct ArrowIpcInputStreamFilePrivate* private_data =
+      (struct ArrowIpcInputStreamFilePrivate*)stream->private_data;
+
+  if (private_data->stream_finished) {
+    *size_read_out = 0;
+    return NANOARROW_OK;
+  }
+
+  // Do the read
+  int64_t bytes_read = (int64_t)fread(buf, 1, buf_size_bytes, private_data->file_ptr);
+  *size_read_out = bytes_read;
+
+  if (bytes_read == buf_size_bytes) {
+    return NANOARROW_OK;
+  }
+
+  // Short read: either end-of-file or an I/O error. Nothing more will be read.
+  private_data->stream_finished = 1;
+
+  // Inspect the error state before closing (the FILE* is unusable afterwards)
+  int has_error = !feof(private_data->file_ptr) && ferror(private_data->file_ptr);
+
+  // Try to close the file now. The FILE* is invalid after fclose() whether or
+  // not the call succeeds, so always forget it; otherwise the release callback
+  // would call fclose() on it a second time.
+  if (private_data->close_on_release) {
+    (void)fclose(private_data->file_ptr);
+    private_data->file_ptr = NULL;
+  }
+
+  // Maybe return error
+  if (has_error) {
+    ArrowErrorSet(error, "ArrowIpcInputStreamFile IO error");
+    return EIO;
+  }
+
+  return NANOARROW_OK;
+}
+
+// Wraps a C FILE* (passed as void*) in an ArrowIpcInputStream. When
+// close_on_release is non-zero the stream closes the file itself; otherwise
+// the caller keeps responsibility for closing it. Returns ENOMEM if the
+// private data cannot be allocated.
+ArrowErrorCode ArrowIpcInputStreamInitFile(struct ArrowIpcInputStream* stream,
+                                           void* file_ptr, int close_on_release) {
+  struct ArrowIpcInputStreamFilePrivate* state =
+      (struct ArrowIpcInputStreamFilePrivate*)ArrowMalloc(sizeof(*state));
+  if (state == NULL) {
+    return ENOMEM;
+  }
+
+  state->stream_finished = 0;
+  state->close_on_release = close_on_release;
+  state->file_ptr = (FILE*)file_ptr;
+
+  stream->private_data = state;
+  stream->read = &ArrowIpcInputStreamFileRead;
+  stream->release = &ArrowIpcInputStreamFileRelease;
+  return NANOARROW_OK;
+}
+
+struct ArrowIpcArrayStreamReaderPrivate {
+ struct ArrowIpcInputStream input;  // Source of bytes; released at end of stream or with the reader
+ struct ArrowIpcDecoder decoder;  // Decodes message headers and record batch bodies
+ struct ArrowSchema out_schema;  // Schema read from the first message; release == NULL until then
+ int64_t field_index;  // Taken from options; only -1 is currently accepted when reading
+ struct ArrowBuffer header;  // Scratch space for the most recently read message header
+ struct ArrowBuffer body;  // Scratch space for the most recently read message body
+ struct ArrowError error;  // Last error; its message is what get_last_error returns
+};
+
+// Release callback of the ArrowArrayStream: releases the input stream (if it
+// is still held), resets the decoder, releases the cached schema (if any),
+// resets the scratch buffers and frees the private data.
+static void ArrowIpcArrayStreamReaderRelease(struct ArrowArrayStream* stream) {
+  struct ArrowIpcArrayStreamReaderPrivate* reader =
+      (struct ArrowIpcArrayStreamReaderPrivate*)stream->private_data;
+
+  struct ArrowIpcInputStream* input = &reader->input;
+  if (input->release != NULL) {
+    input->release(input);
+  }
+
+  ArrowIpcDecoderReset(&reader->decoder);
+
+  struct ArrowSchema* schema = &reader->out_schema;
+  if (schema->release != NULL) {
+    schema->release(schema);
+  }
+
+  ArrowBufferReset(&reader->header);
+  ArrowBufferReset(&reader->body);
+
+  ArrowFree(reader);
+  stream->release = NULL;
+}
+
+// Reads the next message header from the input into private_data->header and
+// verifies it. The header is additionally decoded when the message is of the
+// requested message_type; otherwise NANOARROW_OK is returned after verification
+// and the caller is expected to inspect decoder.message_type.
+//
+// Returns ENODATA when no bytes are available or the decoder reports the end
+// of the stream, EINVAL when fewer than 8 prefix bytes are available or the
+// prefix is malformed, and otherwise whatever the input stream or the decoder
+// reports. Error details are written to private_data->error.
+static int ArrowIpcArrayStreamReaderNextHeader(
+    struct ArrowIpcArrayStreamReaderPrivate* private_data,
+    enum ArrowIpcMessageType message_type) {
+  // Continuation token (int32) + header size in bytes (int32)
+  const int64_t prefix_size_bytes = 8;
+
+  private_data->header.size_bytes = 0;
+  int64_t bytes_read = 0;
+
+  // Read 8 bytes (continuation + header size in bytes)
+  NANOARROW_RETURN_NOT_OK(ArrowBufferReserve(&private_data->header, prefix_size_bytes));
+  NANOARROW_RETURN_NOT_OK(private_data->input.read(
+      &private_data->input, private_data->header.data, prefix_size_bytes, &bytes_read,
+      &private_data->error));
+  private_data->header.size_bytes += bytes_read;
+
+  if (bytes_read == 0) {
+    // The caller might not use this error message (e.g., if the end of the stream
+    // is one of the valid outcomes) but we set the error anyway in case it gets
+    // propagated higher (e.g., if the stream is empty and there's no schema message)
+    ArrowErrorSet(&private_data->error, "No data available on stream");
+    return ENODATA;
+  } else if (bytes_read != prefix_size_bytes) {
+    ArrowErrorSet(&private_data->error,
+                  "Expected at least 8 bytes in remainder of stream");
+    return EINVAL;
+  }
+
+  struct ArrowBufferView input_view;
+  input_view.data.data = private_data->header.data;
+  input_view.size_bytes = private_data->header.size_bytes;
+
+  // Use PeekHeader to fill in decoder.header_size_bytes. Only the prefix has
+  // been read so far, so ESPIPE ("more bytes needed") is the expected outcome.
+  // Any other failure (end of stream, malformed prefix) means that
+  // header_size_bytes cannot be trusted and must not be used to size a read.
+  int result =
+      ArrowIpcDecoderPeekHeader(&private_data->decoder, input_view, &private_data->error);
+  if (result != NANOARROW_OK && result != ESPIPE) {
+    return result;
+  }
+
+  // Discard the "more bytes needed" message from the expected ESPIPE
+  private_data->error.message[0] = '\0';
+
+  // Read the header bytes
+  int64_t expected_header_bytes =
+      private_data->decoder.header_size_bytes - prefix_size_bytes;
+  if (expected_header_bytes < 0) {
+    ArrowErrorSet(&private_data->error, "Invalid message header size");
+    return EINVAL;
+  }
+
+  NANOARROW_RETURN_NOT_OK(
+      ArrowBufferReserve(&private_data->header, expected_header_bytes));
+  NANOARROW_RETURN_NOT_OK(private_data->input.read(
+      &private_data->input, private_data->header.data + prefix_size_bytes,
+      expected_header_bytes, &bytes_read, &private_data->error));
+  private_data->header.size_bytes += bytes_read;
+
+  // Verify + decode the header (a truncated header is rejected by the decoder's
+  // own size check)
+  input_view.data.data = private_data->header.data;
+  input_view.size_bytes = private_data->header.size_bytes;
+  NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderVerifyHeader(&private_data->decoder, input_view,
+                                                      &private_data->error));
+
+  // Don't decode the message if it's of the wrong type (because the error message
+  // is better communicated by the caller)
+  if (private_data->decoder.message_type != message_type) {
+    return NANOARROW_OK;
+  }
+
+  NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderDecodeHeader(&private_data->decoder, input_view,
+                                                      &private_data->error));
+  return NANOARROW_OK;
+}
+
+// Reads the body of the message whose header was just decoded into
+// private_data->body. The number of bytes requested is
+// decoder.body_size_bytes.
+//
+// Returns ESPIPE (and sets private_data->error) if the input ends before the
+// whole body could be read, or whatever the buffer reservation or the input
+// stream reports on failure.
+static int ArrowIpcArrayStreamReaderNextBody(
+    struct ArrowIpcArrayStreamReaderPrivate* private_data) {
+  int64_t bytes_read = 0;
+  int64_t bytes_to_read = private_data->decoder.body_size_bytes;
+
+  // Read the body bytes
+  private_data->body.size_bytes = 0;
+  NANOARROW_RETURN_NOT_OK(ArrowBufferReserve(&private_data->body, bytes_to_read));
+  NANOARROW_RETURN_NOT_OK(private_data->input.read(&private_data->input,
+                                                   private_data->body.data, bytes_to_read,
+                                                   &bytes_read, &private_data->error));
+  private_data->body.size_bytes += bytes_read;
+
+  // A truncated body must not be handed to the decoder as if it were complete
+  if (bytes_read != bytes_to_read) {
+    ArrowErrorSet(&private_data->error,
+                  "Expected to be able to read %ld bytes for message body but got %ld",
+                  (long)bytes_to_read, (long)bytes_read);
+    return ESPIPE;
+  }
+
+  return NANOARROW_OK;
+}
+
+// Reads and decodes the Schema message at the start of the input unless a
+// schema has already been cached in private_data->out_schema. On success the
+// decoder has been told about the stream's endianness and schema.
+//
+// Returns EINVAL if the first message is not a Schema message or the stream
+// uses an unsupported feature, ENOTSUP if field_index is not -1, and
+// otherwise whatever reading the header or the decoder reports.
+static int ArrowIpcArrayStreamReaderReadSchemaIfNeeded(
+    struct ArrowIpcArrayStreamReaderPrivate* private_data) {
+  struct ArrowIpcDecoder* decoder = &private_data->decoder;
+  struct ArrowError* error = &private_data->error;
+
+  // Nothing to do once a schema has been cached
+  if (private_data->out_schema.release != NULL) {
+    return NANOARROW_OK;
+  }
+
+  NANOARROW_RETURN_NOT_OK(ArrowIpcArrayStreamReaderNextHeader(
+      private_data, NANOARROW_IPC_MESSAGE_TYPE_SCHEMA));
+
+  // The stream has to open with a Schema message
+  if (decoder->message_type != NANOARROW_IPC_MESSAGE_TYPE_SCHEMA) {
+    ArrowErrorSet(error, "Unexpected message type at start of input (expected Schema)");
+    return EINVAL;
+  }
+
+  // Reject streams that rely on features this reader does not implement
+  if (decoder->feature_flags & NANOARROW_IPC_FEATURE_COMPRESSED_BODY) {
+    ArrowErrorSet(error, "This stream uses unsupported feature COMPRESSED_BODY");
+    return EINVAL;
+  }
+
+  if (decoder->feature_flags & NANOARROW_IPC_FEATURE_DICTIONARY_REPLACEMENT) {
+    ArrowErrorSet(error, "This stream uses unsupported feature DICTIONARY_REPLACEMENT");
+    return EINVAL;
+  }
+
+  // Tell the decoder which endianness the buffers use
+  NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderSetEndianness(decoder, decoder->endianness));
+
+  struct ArrowSchema decoded;
+  NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderDecodeSchema(decoder, &decoded, error));
+
+  // Reading a single field is not implemented yet
+  if (private_data->field_index != -1) {
+    decoded.release(&decoded);
+    ArrowErrorSet(error, "Field index != -1 is not yet supported");
+    return ENOTSUP;
+  }
+
+  // Tell the decoder which schema the forthcoming messages use
+  int status = ArrowIpcDecoderSetSchema(decoder, &decoded, error);
+  if (status != NANOARROW_OK) {
+    decoded.release(&decoded);
+    return status;
+  }
+
+  ArrowSchemaMove(&decoded, &private_data->out_schema);
+  return NANOARROW_OK;
+}
+
+static int ArrowIpcArrayStreamReaderGetSchema(struct ArrowArrayStream* stream,
+ struct ArrowSchema* out) {
+ struct ArrowIpcArrayStreamReaderPrivate* private_data =
+ (struct ArrowIpcArrayStreamReaderPrivate*)stream->private_data;
+ private_data->error.message[0] = '\0';
+
NANOARROW_RETURN_NOT_OK(ArrowIpcArrayStreamReaderReadSchemaIfNeeded(private_data));
+ return ArrowSchemaDeepCopy(&private_data->out_schema, out);
+}
+
+// get_next callback of the ArrowArrayStream: reads the next RecordBatch
+// message from the input and decodes it into out.
+//
+// The end of the stream is signalled by returning NANOARROW_OK with
+// out->release set to NULL; the input stream is released at that point and
+// every later call reports the end of the stream again. Returns EINVAL if the
+// next message is not a RecordBatch, and otherwise whatever reading the
+// schema, header or body or decoding the array reports.
+static int ArrowIpcArrayStreamReaderGetNext(struct ArrowArrayStream* stream,
+                                            struct ArrowArray* out) {
+  struct ArrowIpcArrayStreamReaderPrivate* private_data =
+      (struct ArrowIpcArrayStreamReaderPrivate*)stream->private_data;
+  // Check if we are all done
+  if (private_data->input.release == NULL) {
+    out->release = NULL;
+    return NANOARROW_OK;
+  }
+
+  private_data->error.message[0] = '\0';
+  NANOARROW_RETURN_NOT_OK(ArrowIpcArrayStreamReaderReadSchemaIfNeeded(private_data));
+
+  // Read + decode the next header
+  int result = ArrowIpcArrayStreamReaderNextHeader(
+      private_data, NANOARROW_IPC_MESSAGE_TYPE_RECORD_BATCH);
+  if (result == ENODATA) {
+    // If the stream is finished, release the input
+    private_data->input.release(&private_data->input);
+    out->release = NULL;
+    return NANOARROW_OK;
+  }
+
+  // Any other failure (allocation, I/O, malformed header) must reach the
+  // caller: the decoder state may be stale and must not be used to read a body
+  NANOARROW_RETURN_NOT_OK(result);
+
+  // Make sure we have a RecordBatch message
+  if (private_data->decoder.message_type != NANOARROW_IPC_MESSAGE_TYPE_RECORD_BATCH) {
+    ArrowErrorSet(&private_data->error, "Unexpected message type (expected RecordBatch)");
+    return EINVAL;
+  }
+
+  // Read in the body
+  NANOARROW_RETURN_NOT_OK(ArrowIpcArrayStreamReaderNextBody(private_data));
+
+  struct ArrowBufferView body_view;
+  body_view.data.data = private_data->body.data;
+  body_view.size_bytes = private_data->body.size_bytes;
+
+  NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderDecodeArray(&private_data->decoder, body_view,
+                                                     private_data->field_index, out,
+                                                     &private_data->error));
+
+  return NANOARROW_OK;
+}
+
+// get_last_error callback of the ArrowArrayStream: returns the message stored
+// in the reader's error member.
+static const char* ArrowIpcArrayStreamReaderGetLastError(
+    struct ArrowArrayStream* stream) {
+  struct ArrowIpcArrayStreamReaderPrivate* reader =
+      (struct ArrowIpcArrayStreamReaderPrivate*)stream->private_data;
+  const char* last_message = reader->error.message;
+  return last_message;
+}
+
+// Initializes out as an ArrowArrayStream that reads Arrow IPC messages from
+// input_stream.
+//
+// On success the contents of input_stream are moved into the reader and out
+// must eventually be released by the caller. options may be NULL, in which
+// case field_index is set to -1. Returns ENOMEM if the reader state cannot be
+// allocated, or the result of ArrowIpcDecoderInit() if that fails; in both
+// cases input_stream and out are left untouched.
+ArrowErrorCode ArrowIpcArrayStreamReaderInit(
+    struct ArrowArrayStream* out, struct ArrowIpcInputStream* input_stream,
+    struct ArrowIpcArrayStreamReaderOptions* options) {
+  struct ArrowIpcArrayStreamReaderPrivate* private_data =
+      (struct ArrowIpcArrayStreamReaderPrivate*)ArrowMalloc(
+          sizeof(struct ArrowIpcArrayStreamReaderPrivate));
+  if (private_data == NULL) {
+    return ENOMEM;
+  }
+
+  int result = ArrowIpcDecoderInit(&private_data->decoder);
+  if (result != NANOARROW_OK) {
+    ArrowFree(private_data);
+    return result;
+  }
+
+  ArrowBufferInit(&private_data->header);
+  ArrowBufferInit(&private_data->body);
+  private_data->out_schema.release = NULL;
+  // Start with an empty error so that get_last_error() returns a valid string
+  // even when it is called before get_schema() or get_next()
+  private_data->error.message[0] = '\0';
+  ArrowIpcInputStreamMove(input_stream, &private_data->input);
+
+  if (options != NULL) {
+    private_data->field_index = options->field_index;
+  } else {
+    private_data->field_index = -1;
+  }
+
+  out->private_data = private_data;
+  out->get_schema = &ArrowIpcArrayStreamReaderGetSchema;
+  out->get_next = &ArrowIpcArrayStreamReaderGetNext;
+  out->get_last_error = &ArrowIpcArrayStreamReaderGetLastError;
+  out->release = &ArrowIpcArrayStreamReaderRelease;
+
+  return NANOARROW_OK;
+}
diff --git a/dist/nanoarrow_ipc.h b/dist/nanoarrow_ipc.h
index 155150e..071b299 100644
--- a/dist/nanoarrow_ipc.h
+++ b/dist/nanoarrow_ipc.h
@@ -39,6 +39,14 @@
NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDecoderSetSchema)
#define ArrowIpcDecoderSetEndianness \
NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDecoderSetEndianness)
+#define ArrowIpcInputStreamInitBuffer \
+ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcInputStreamInitBuffer)
+#define ArrowIpcInputStreamInitFile \
+ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcInputStreamInitFile)
+#define ArrowIpcInputStreamMove \
+ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcInputStreamMove)
+#define ArrowIpcArrayStreamReaderInit \
+ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcArrayStreamReaderInit)
#endif
@@ -219,6 +227,60 @@ ArrowErrorCode ArrowIpcDecoderDecodeArray(struct
ArrowIpcDecoder* decoder,
struct ArrowArray* out,
struct ArrowError* error);
+/// \brief A user-extensible input data source
+struct ArrowIpcInputStream {
+ /// \brief Read up to buf_size_bytes from stream into buf
+ ///
+ /// The actual number of bytes read is placed in the value pointed to by
+ /// size_read_out. Returns NANOARROW_OK on success.
+ ArrowErrorCode (*read)(struct ArrowIpcInputStream* stream, uint8_t* buf,
+ int64_t buf_size_bytes, int64_t* size_read_out,
+ struct ArrowError* error);
+
+ /// \brief Release the stream and any resources it may be holding
+ ///
+ /// Release callback implementations must set the release member to NULL.
+ /// Callers must check that the release callback is not NULL before calling
+ /// read() or release().
+ void (*release)(struct ArrowIpcInputStream* stream);
+
+ /// \brief Private implementation-defined data
+ void* private_data;
+};
+
+/// \brief Transfer ownership of an ArrowIpcInputStream from src to dst (src->release becomes NULL)
+void ArrowIpcInputStreamMove(struct ArrowIpcInputStream* src,
+ struct ArrowIpcInputStream* dst);
+
+/// \brief Create an input stream from an ArrowBuffer (the contents of input are moved into stream)
+ArrowErrorCode ArrowIpcInputStreamInitBuffer(struct ArrowIpcInputStream*
stream,
+ struct ArrowBuffer* input);
+
+/// \brief Create an input stream from a C FILE* pointer (passed as void*)
+///
+/// Note that the ArrowIpcInputStream has no mechanism to communicate an error
+/// if file_ptr fails to close. If this behaviour is needed, pass 0 (false) as
+/// close_on_release and handle closing the file independently from stream.
+ArrowErrorCode ArrowIpcInputStreamInitFile(struct ArrowIpcInputStream* stream,
+ void* file_ptr, int
close_on_release);
+
+/// \brief Options for ArrowIpcArrayStreamReaderInit()
+struct ArrowIpcArrayStreamReaderOptions {
+  /// \brief The field index to extract. Defaults to -1 (i.e., read all fields); other values are not yet supported.
+ int64_t field_index;
+};
+
+/// \brief Initialize an ArrowArrayStream from an input stream of bytes
+///
+/// The stream of bytes must begin with a Schema message and be followed by
+/// zero or more RecordBatch messages as described in the Arrow IPC stream
+/// format specification. Returns NANOARROW_OK on success, in which case the
+/// ArrowArrayStream takes ownership of input_stream and the caller is
+/// responsible for releasing out. If options is NULL, field_index is -1.
+ArrowErrorCode ArrowIpcArrayStreamReaderInit(
+ struct ArrowArrayStream* out, struct ArrowIpcInputStream* input_stream,
+ struct ArrowIpcArrayStreamReaderOptions* options);
+
#ifdef __cplusplus
}
#endif