This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-nanoarrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 85cc528  ipc: Update dist/ for commit 
62cffa6a4e182235a82f3ea9f8715258b7b2ca56
85cc528 is described below

commit 85cc52853d3b395e14caddce74a9f66a8d32a1d2
Author: GitHub Actions <[email protected]>
AuthorDate: Wed Mar 22 20:13:57 2023 +0000

    ipc: Update dist/ for commit 62cffa6a4e182235a82f3ea9f8715258b7b2ca56
---
 dist/nanoarrow_ipc.c | 445 +++++++++++++++++++++++++++++++++++++++++++++++++--
 dist/nanoarrow_ipc.h |  62 +++++++
 2 files changed, 497 insertions(+), 10 deletions(-)

diff --git a/dist/nanoarrow_ipc.c b/dist/nanoarrow_ipc.c
index f76f4e9..d1d09ae 100644
--- a/dist/nanoarrow_ipc.c
+++ b/dist/nanoarrow_ipc.c
@@ -21072,21 +21072,22 @@ static inline int ArrowIpcDecoderCheckHeader(struct 
ArrowIpcDecoder* decoder,
   }
 
   int swap_endian = private_data->system_endianness == 
NANOARROW_IPC_ENDIANNESS_BIG;
-  *message_size_bytes = ArrowIpcReadInt32LE(data_mut, swap_endian);
-  if ((*message_size_bytes) < 0) {
+  int32_t header_body_size_bytes = ArrowIpcReadInt32LE(data_mut, swap_endian);
+  *message_size_bytes = header_body_size_bytes + (2 * sizeof(int32_t));
+  if (header_body_size_bytes < 0) {
     ArrowErrorSet(
         error, "Expected message body size > 0 but found message body size of 
%ld bytes",
-        (long)(*message_size_bytes));
+        (long)header_body_size_bytes);
     return EINVAL;
-  } else if ((*message_size_bytes) > data_mut->size_bytes) {
+  } else if (header_body_size_bytes > data_mut->size_bytes) {
     ArrowErrorSet(error,
                   "Expected 0 <= message body size <= %ld bytes but found 
message "
                   "body size of %ld bytes",
-                  (long)data_mut->size_bytes, (long)(*message_size_bytes));
+                  (long)data_mut->size_bytes, (long)header_body_size_bytes);
     return ESPIPE;
   }
 
-  if (*message_size_bytes == 0) {
+  if (header_body_size_bytes == 0) {
     ArrowErrorSet(error, "End of Arrow stream");
     return ENODATA;
   }
@@ -21103,7 +21104,6 @@ ArrowErrorCode ArrowIpcDecoderPeekHeader(struct 
ArrowIpcDecoder* decoder,
   ArrowIpcDecoderResetHeaderInfo(decoder);
   NANOARROW_RETURN_NOT_OK(
       ArrowIpcDecoderCheckHeader(decoder, &data, &decoder->header_size_bytes, 
error));
-  decoder->header_size_bytes += 2 * sizeof(int32_t);
   return NANOARROW_OK;
 }
 
@@ -21118,14 +21118,14 @@ ArrowErrorCode ArrowIpcDecoderVerifyHeader(struct 
ArrowIpcDecoder* decoder,
       ArrowIpcDecoderCheckHeader(decoder, &data, &decoder->header_size_bytes, 
error));
 
   // Run flatbuffers verification
-  if (ns(Message_verify_as_root(data.data.as_uint8, 
decoder->header_size_bytes)) !=
+  if (ns(Message_verify_as_root(data.data.as_uint8,
+                                decoder->header_size_bytes - (2 * 
sizeof(int32_t)))) !=
       flatcc_verify_ok) {
     ArrowErrorSet(error, "Message flatbuffer verification failed");
     return EINVAL;
   }
 
   // Read some basic information from the message
-  decoder->header_size_bytes += 2 * sizeof(int32_t);
   ns(Message_table_t) message = ns(Message_as_root(data.data.as_uint8));
   decoder->metadata_version = ns(Message_version(message));
   decoder->message_type = ns(Message_header_type(message));
@@ -21144,7 +21144,6 @@ ArrowErrorCode ArrowIpcDecoderDecodeHeader(struct 
ArrowIpcDecoder* decoder,
   ArrowIpcDecoderResetHeaderInfo(decoder);
   NANOARROW_RETURN_NOT_OK(
       ArrowIpcDecoderCheckHeader(decoder, &data, &decoder->header_size_bytes, 
error));
-  decoder->header_size_bytes += 2 * sizeof(int32_t);
 
   ns(Message_table_t) message = ns(Message_as_root(data.data.as_uint8));
   if (!message) {
@@ -21317,6 +21316,7 @@ ArrowErrorCode ArrowIpcDecoderSetEndianness(struct 
ArrowIpcDecoder* decoder,
     case NANOARROW_IPC_ENDIANNESS_LITTLE:
     case NANOARROW_IPC_ENDIANNESS_BIG:
       private_data->endianness = endianness;
+      return NANOARROW_OK;
     default:
       return EINVAL;
   }
@@ -21483,3 +21483,428 @@ ArrowErrorCode ArrowIpcDecoderDecodeArray(struct 
ArrowIpcDecoder* decoder,
   ArrowArrayMove(&temp, out);
   return NANOARROW_OK;
 }
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <errno.h>
+#include <stdio.h>
+#include <string.h>
+
+#include "nanoarrow.h"
+#include "nanoarrow_ipc.h"
+
+// Transfer ownership of a stream: dst receives a shallow copy of every member
+// of src, after which src is marked as released by clearing its release
+// callback.
+void ArrowIpcInputStreamMove(struct ArrowIpcInputStream* src,
+                             struct ArrowIpcInputStream* dst) {
+  *dst = *src;
+  src->release = NULL;
+}
+
+// State behind an ArrowIpcInputStream that reads from an in-memory ArrowBuffer.
+struct ArrowIpcInputStreamBufferPrivate {
+  // Source bytes; populated via ArrowBufferMove() in ArrowIpcInputStreamInitBuffer()
+  struct ArrowBuffer input;
+  // Offset into input at which the next read starts; advanced by every read
+  int64_t cursor_bytes;
+};
+
+// read() callback for buffer-backed streams: copies up to buf_size_bytes from the
+// current cursor position into buf, reports the number of bytes copied through
+// size_read_out, and advances the cursor by that amount. Once the cursor has
+// reached the end of the buffer, zero bytes are reported. The error argument is
+// not used.
+static ArrowErrorCode ArrowIpcInputStreamBufferRead(struct ArrowIpcInputStream* stream,
+                                                    uint8_t* buf, int64_t buf_size_bytes,
+                                                    int64_t* size_read_out,
+                                                    struct ArrowError* error) {
+  int64_t n_copied = 0;
+
+  if (buf_size_bytes != 0) {
+    struct ArrowIpcInputStreamBufferPrivate* state =
+        (struct ArrowIpcInputStreamBufferPrivate*)stream->private_data;
+    int64_t n_available = state->input.size_bytes - state->cursor_bytes;
+
+    // Never hand out more than what is left in the buffer
+    n_copied = (n_available > buf_size_bytes) ? buf_size_bytes : n_available;
+    if (n_copied > 0) {
+      memcpy(buf, state->input.data + state->cursor_bytes, n_copied);
+    }
+
+    state->cursor_bytes += n_copied;
+  }
+
+  *size_read_out = n_copied;
+  return NANOARROW_OK;
+}
+
+// release() callback for buffer-backed streams: resets the owned buffer, frees
+// the private state, and marks the stream as released.
+static void ArrowIpcInputStreamBufferRelease(struct ArrowIpcInputStream* stream) {
+  struct ArrowIpcInputStreamBufferPrivate* state =
+      (struct ArrowIpcInputStreamBufferPrivate*)stream->private_data;
+
+  ArrowBufferReset(&state->input);
+  ArrowFree(state);
+
+  stream->release = NULL;
+}
+
+// Initialize stream so that it reads the bytes currently held by input. The
+// contents of input are moved into freshly allocated private state; ENOMEM is
+// returned (and input is left untouched) if that allocation fails.
+ArrowErrorCode ArrowIpcInputStreamInitBuffer(struct ArrowIpcInputStream* stream,
+                                             struct ArrowBuffer* input) {
+  struct ArrowIpcInputStreamBufferPrivate* state =
+      (struct ArrowIpcInputStreamBufferPrivate*)ArrowMalloc(sizeof(*state));
+  if (state == NULL) {
+    return ENOMEM;
+  }
+
+  // Reading starts at the first byte of the buffer we now own
+  state->cursor_bytes = 0;
+  ArrowBufferMove(input, &state->input);
+
+  stream->private_data = state;
+  stream->read = &ArrowIpcInputStreamBufferRead;
+  stream->release = &ArrowIpcInputStreamBufferRelease;
+  return NANOARROW_OK;
+}
+
+// State behind an ArrowIpcInputStream that reads from a C FILE*.
+struct ArrowIpcInputStreamFilePrivate {
+  // File to read from; set to NULL after the read callback closes it successfully
+  FILE* file_ptr;
+  // Nonzero once a read returned fewer bytes than requested; later reads report 0 bytes
+  int stream_finished;
+  // Nonzero if this stream is responsible for calling fclose() on file_ptr
+  int close_on_release;
+};
+
+// release() callback for file-backed streams: closes the file if this stream
+// owns it and it is still open, frees the private state, and marks the stream
+// as released.
+static void ArrowIpcInputStreamFileRelease(struct ArrowIpcInputStream* stream) {
+  struct ArrowIpcInputStreamFilePrivate* state =
+      (struct ArrowIpcInputStreamFilePrivate*)stream->private_data;
+  FILE* handle = state->file_ptr;
+
+  // A NULL handle means the read callback already closed the file
+  if (state->close_on_release && handle != NULL) {
+    fclose(handle);
+  }
+
+  ArrowFree(state);
+  stream->release = NULL;
+}
+
+// read() callback for file-backed streams.
+//
+// Reads up to buf_size_bytes from the file into buf with fread() and reports the
+// number of bytes read through size_read_out. A short read (end-of-file or I/O
+// error) marks the stream as finished: every later call reports zero bytes, and
+// the file is closed immediately if the stream owns it. Returns EIO (with a
+// message in error) if the short read was caused by an I/O error, otherwise
+// NANOARROW_OK.
+static ArrowErrorCode ArrowIpcInputStreamFileRead(struct ArrowIpcInputStream* stream,
+                                                  uint8_t* buf, int64_t buf_size_bytes,
+                                                  int64_t* size_read_out,
+                                                  struct ArrowError* error) {
+  struct ArrowIpcInputStreamFilePrivate* private_data =
+      (struct ArrowIpcInputStreamFilePrivate*)stream->private_data;
+
+  if (private_data->stream_finished) {
+    *size_read_out = 0;
+    return NANOARROW_OK;
+  }
+
+  // Do the read
+  int64_t bytes_read = (int64_t)fread(buf, 1, buf_size_bytes, private_data->file_ptr);
+  *size_read_out = bytes_read;
+
+  if (bytes_read != buf_size_bytes) {
+    private_data->stream_finished = 1;
+
+    // Inspect error (must happen before the file is closed)
+    int has_error = !feof(private_data->file_ptr) && ferror(private_data->file_ptr);
+
+    // Try to close the file now. The FILE* must not be used again after fclose()
+    // whether or not the call succeeded, so always forget it; otherwise the
+    // release callback would call fclose() on it a second time.
+    if (private_data->close_on_release) {
+      (void)fclose(private_data->file_ptr);
+      private_data->file_ptr = NULL;
+    }
+
+    // Maybe return error
+    if (has_error) {
+      ArrowErrorSet(error, "ArrowIpcInputStreamFile IO error");
+      return EIO;
+    }
+  }
+
+  return NANOARROW_OK;
+}
+
+// Initialize stream so that it reads from the C FILE* passed as file_ptr.
+//
+// If close_on_release is nonzero the stream takes responsibility for calling
+// fclose() on the file. Returns EINVAL if file_ptr is NULL (there is nothing to
+// read from and fread() must not be handed a NULL stream), ENOMEM if the
+// private state cannot be allocated, and NANOARROW_OK otherwise.
+ArrowErrorCode ArrowIpcInputStreamInitFile(struct ArrowIpcInputStream* stream,
+                                           void* file_ptr, int close_on_release) {
+  if (file_ptr == NULL) {
+    return EINVAL;
+  }
+
+  struct ArrowIpcInputStreamFilePrivate* private_data =
+      (struct ArrowIpcInputStreamFilePrivate*)ArrowMalloc(
+          sizeof(struct ArrowIpcInputStreamFilePrivate));
+  if (private_data == NULL) {
+    return ENOMEM;
+  }
+
+  private_data->file_ptr = (FILE*)file_ptr;
+  private_data->close_on_release = close_on_release;
+  private_data->stream_finished = 0;
+
+  stream->read = &ArrowIpcInputStreamFileRead;
+  stream->release = &ArrowIpcInputStreamFileRelease;
+  stream->private_data = private_data;
+  return NANOARROW_OK;
+}
+
+// State behind the ArrowArrayStream created by ArrowIpcArrayStreamReaderInit().
+struct ArrowIpcArrayStreamReaderPrivate {
+  // Byte source; moved in at init and released at end of stream or on release
+  struct ArrowIpcInputStream input;
+  // Decoder used to verify and decode every message read from input
+  struct ArrowIpcDecoder decoder;
+  // Schema decoded from the first message; release is NULL until it has been read
+  struct ArrowSchema out_schema;
+  // Field to extract, or -1 for all fields (currently the only accepted value)
+  int64_t field_index;
+  // Scratch storage holding the bytes of the most recently read message header
+  struct ArrowBuffer header;
+  // Scratch storage holding the bytes of the most recently read message body
+  struct ArrowBuffer body;
+  // Last error; its message is what get_last_error() returns
+  struct ArrowError error;
+};
+
+// release() callback of the array stream: releases everything the reader owns
+// (input stream, decoder, cached schema, scratch buffers), frees the private
+// state, and marks the stream as released.
+static void ArrowIpcArrayStreamReaderRelease(struct ArrowArrayStream* stream) {
+  struct ArrowIpcArrayStreamReaderPrivate* reader =
+      (struct ArrowIpcArrayStreamReaderPrivate*)stream->private_data;
+
+  // The input may already have been released when the end of the stream was hit
+  if (reader->input.release != NULL) {
+    reader->input.release(&reader->input);
+  }
+
+  ArrowIpcDecoderReset(&reader->decoder);
+
+  // The schema only exists if the Schema message was read successfully
+  if (reader->out_schema.release != NULL) {
+    reader->out_schema.release(&reader->out_schema);
+  }
+
+  ArrowBufferReset(&reader->body);
+  ArrowBufferReset(&reader->header);
+
+  ArrowFree(reader);
+  stream->release = NULL;
+}
+
+// Read the next message header from the input into private_data->header and
+// verify it with the decoder. The header is only fully decoded if its type
+// matches message_type; callers are expected to check decoder.message_type and
+// report a mismatch themselves.
+//
+// Returns ENODATA if no bytes are left on the input or if the decoder reports
+// the end of the stream, EINVAL/ESPIPE (or whatever the input or decoder
+// returned) for truncated or invalid input, and NANOARROW_OK otherwise. A
+// message describing the failure is left in private_data->error.
+static int ArrowIpcArrayStreamReaderNextHeader(
+    struct ArrowIpcArrayStreamReaderPrivate* private_data,
+    enum ArrowIpcMessageType message_type) {
+  private_data->header.size_bytes = 0;
+  int64_t bytes_read = 0;
+
+  // Read 8 bytes (continuation + header size in bytes)
+  NANOARROW_RETURN_NOT_OK(ArrowBufferReserve(&private_data->header, 8));
+  NANOARROW_RETURN_NOT_OK(private_data->input.read(&private_data->input,
+                                                   private_data->header.data, 8,
+                                                   &bytes_read, &private_data->error));
+  private_data->header.size_bytes += bytes_read;
+
+  if (bytes_read == 0) {
+    // The caller might not use this error message (e.g., if the end of the stream
+    // is one of the valid outcomes) but we set the error anyway in case it gets
+    // propagated higher (e.g., if the stream is empty and there's no schema message)
+    ArrowErrorSet(&private_data->error, "No data available on stream");
+    return ENODATA;
+  } else if (bytes_read != 8) {
+    ArrowErrorSet(&private_data->error,
+                  "Expected at least 8 bytes in remainder of stream");
+    return EINVAL;
+  }
+
+  struct ArrowBufferView input_view;
+  input_view.data.data = private_data->header.data;
+  input_view.size_bytes = private_data->header.size_bytes;
+
+  // Use PeekHeader to fill in decoder.header_size_bytes. Only the 8-byte prefix
+  // is available at this point, so the decoder is expected to report ESPIPE
+  // ("header does not fit in the provided data") for any non-empty header; that
+  // outcome is fine because the size has been filled in by then. Everything
+  // else (ENODATA for end of stream, EINVAL for a malformed prefix, ...) must
+  // stop here instead of being used to size the next read.
+  int result =
+      ArrowIpcDecoderPeekHeader(&private_data->decoder, input_view, &private_data->error);
+  if (result != NANOARROW_OK && result != ESPIPE) {
+    return result;
+  }
+
+  // Read the header bytes
+  int64_t expected_header_bytes = private_data->decoder.header_size_bytes - 8;
+  NANOARROW_RETURN_NOT_OK(
+      ArrowBufferReserve(&private_data->header, expected_header_bytes));
+  NANOARROW_RETURN_NOT_OK(
+      private_data->input.read(&private_data->input, private_data->header.data + 8,
+                               expected_header_bytes, &bytes_read, &private_data->error));
+  private_data->header.size_bytes += bytes_read;
+
+  // A short read means the stream was truncated in the middle of a header
+  if (bytes_read != expected_header_bytes) {
+    ArrowErrorSet(&private_data->error,
+                  "Expected to be able to read %ld bytes for message header but got %ld",
+                  (long)expected_header_bytes, (long)bytes_read);
+    return ESPIPE;
+  }
+
+  // Verify + decode the header
+  input_view.data.data = private_data->header.data;
+  input_view.size_bytes = private_data->header.size_bytes;
+  NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderVerifyHeader(&private_data->decoder, input_view,
+                                                      &private_data->error));
+
+  // Don't decode the message if it's of the wrong type (because the error message
+  // is better communicated by the caller)
+  if (private_data->decoder.message_type != message_type) {
+    return NANOARROW_OK;
+  }
+
+  NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderDecodeHeader(&private_data->decoder, input_view,
+                                                      &private_data->error));
+  return NANOARROW_OK;
+}
+
+// Read the body of the message whose header was decoded last into
+// private_data->body. The number of bytes to read is decoder.body_size_bytes.
+//
+// Returns ESPIPE (with a message in private_data->error) if the input ends
+// before the whole body could be read, so that a truncated stream is reported
+// here rather than being handed to the array decoder.
+static int ArrowIpcArrayStreamReaderNextBody(
+    struct ArrowIpcArrayStreamReaderPrivate* private_data) {
+  int64_t bytes_read = 0;
+  int64_t bytes_to_read = private_data->decoder.body_size_bytes;
+
+  // Read the body bytes
+  private_data->body.size_bytes = 0;
+  NANOARROW_RETURN_NOT_OK(ArrowBufferReserve(&private_data->body, bytes_to_read));
+  NANOARROW_RETURN_NOT_OK(private_data->input.read(&private_data->input,
+                                                   private_data->body.data, bytes_to_read,
+                                                   &bytes_read, &private_data->error));
+  private_data->body.size_bytes += bytes_read;
+
+  if (bytes_read != bytes_to_read) {
+    ArrowErrorSet(&private_data->error,
+                  "Expected to be able to read %ld bytes for message body but got %ld",
+                  (long)bytes_to_read, (long)bytes_read);
+    return ESPIPE;
+  }
+
+  return NANOARROW_OK;
+}
+
+// Make sure the Schema message at the start of the input has been consumed.
+//
+// Does nothing if out_schema is already populated. Otherwise reads the first
+// message, rejects it unless it is a Schema that uses only supported features,
+// decodes it, registers it with the decoder for the record batches that
+// follow, and caches it in out_schema.
+static int ArrowIpcArrayStreamReaderReadSchemaIfNeeded(
+    struct ArrowIpcArrayStreamReaderPrivate* private_data) {
+  struct ArrowIpcDecoder* decoder = &private_data->decoder;
+  struct ArrowError* error = &private_data->error;
+
+  // A live out_schema means the Schema message was already read
+  if (private_data->out_schema.release != NULL) {
+    return NANOARROW_OK;
+  }
+
+  NANOARROW_RETURN_NOT_OK(ArrowIpcArrayStreamReaderNextHeader(
+      private_data, NANOARROW_IPC_MESSAGE_TYPE_SCHEMA));
+
+  // The stream has to start with a schema...
+  if (decoder->message_type != NANOARROW_IPC_MESSAGE_TYPE_SCHEMA) {
+    ArrowErrorSet(error, "Unexpected message type at start of input (expected Schema)");
+    return EINVAL;
+  }
+
+  // ...that does not rely on features we cannot handle
+  if (decoder->feature_flags & NANOARROW_IPC_FEATURE_COMPRESSED_BODY) {
+    ArrowErrorSet(error, "This stream uses unsupported feature COMPRESSED_BODY");
+    return EINVAL;
+  }
+
+  if (decoder->feature_flags & NANOARROW_IPC_FEATURE_DICTIONARY_REPLACEMENT) {
+    ArrowErrorSet(error, "This stream uses unsupported feature DICTIONARY_REPLACEMENT");
+    return EINVAL;
+  }
+
+  // Tell the decoder which byte order the message bodies use
+  NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderSetEndianness(decoder, decoder->endianness));
+
+  struct ArrowSchema decoded_schema;
+  NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderDecodeSchema(decoder, &decoded_schema, error));
+
+  // Extracting a single field is not implemented yet
+  if (private_data->field_index != -1) {
+    decoded_schema.release(&decoded_schema);
+    ArrowErrorSet(error, "Field index != -1 is not yet supported");
+    return ENOTSUP;
+  }
+
+  // Register the schema with the decoder; keep it only if that worked
+  int status = ArrowIpcDecoderSetSchema(decoder, &decoded_schema, error);
+  if (status == NANOARROW_OK) {
+    ArrowSchemaMove(&decoded_schema, &private_data->out_schema);
+  } else {
+    decoded_schema.release(&decoded_schema);
+  }
+
+  return status;
+}
+
+// get_schema() callback: reads the Schema message if that has not happened yet
+// and hands the caller a deep copy of the cached schema.
+static int ArrowIpcArrayStreamReaderGetSchema(struct ArrowArrayStream* stream,
+                                              struct ArrowSchema* out) {
+  struct ArrowIpcArrayStreamReaderPrivate* reader =
+      (struct ArrowIpcArrayStreamReaderPrivate*)stream->private_data;
+
+  // Clear any message left over from a previous call
+  reader->error.message[0] = '\0';
+
+  int status = ArrowIpcArrayStreamReaderReadSchemaIfNeeded(reader);
+  if (status != NANOARROW_OK) {
+    return status;
+  }
+
+  return ArrowSchemaDeepCopy(&reader->out_schema, out);
+}
+
+// get_next() callback: decode the next RecordBatch message into out.
+//
+// The end of the stream is signalled by returning NANOARROW_OK with
+// out->release set to NULL; the input stream is released at that point, and
+// every later call takes the early exit at the top. Any other failure while
+// reading or decoding is returned as-is, with its message available through
+// get_last_error().
+static int ArrowIpcArrayStreamReaderGetNext(struct ArrowArrayStream* stream,
+                                            struct ArrowArray* out) {
+  struct ArrowIpcArrayStreamReaderPrivate* private_data =
+      (struct ArrowIpcArrayStreamReaderPrivate*)stream->private_data;
+  // Check if we are all done
+  if (private_data->input.release == NULL) {
+    out->release = NULL;
+    return NANOARROW_OK;
+  }
+
+  private_data->error.message[0] = '\0';
+  NANOARROW_RETURN_NOT_OK(ArrowIpcArrayStreamReaderReadSchemaIfNeeded(private_data));
+
+  // Read + decode the next header
+  int result = ArrowIpcArrayStreamReaderNextHeader(
+      private_data, NANOARROW_IPC_MESSAGE_TYPE_RECORD_BATCH);
+  if (result == ENODATA) {
+    // If the stream is finished, release the input
+    private_data->input.release(&private_data->input);
+    out->release = NULL;
+    return NANOARROW_OK;
+  } else if (result != NANOARROW_OK) {
+    // The header could not be read or decoded: report that failure instead of
+    // acting on whatever state the decoder was left in
+    return result;
+  }
+
+  // Make sure we have a RecordBatch message
+  if (private_data->decoder.message_type != NANOARROW_IPC_MESSAGE_TYPE_RECORD_BATCH) {
+    ArrowErrorSet(&private_data->error, "Unexpected message type (expected RecordBatch)");
+    return EINVAL;
+  }
+
+  // Read in the body
+  NANOARROW_RETURN_NOT_OK(ArrowIpcArrayStreamReaderNextBody(private_data));
+
+  struct ArrowBufferView body_view;
+  body_view.data.data = private_data->body.data;
+  body_view.size_bytes = private_data->body.size_bytes;
+
+  NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderDecodeArray(&private_data->decoder, body_view,
+                                                     private_data->field_index, out,
+                                                     &private_data->error));
+
+  return NANOARROW_OK;
+}
+
+// get_last_error() callback: exposes the message stored in the reader's
+// ArrowError. The returned pointer refers to storage owned by the reader.
+static const char* ArrowIpcArrayStreamReaderGetLastError(
+    struct ArrowArrayStream* stream) {
+  return ((struct ArrowIpcArrayStreamReaderPrivate*)stream->private_data)->error.message;
+}
+
+// Initialize out as an ArrowArrayStream that decodes Arrow IPC messages read
+// from input_stream.
+//
+// options may be NULL, in which case field_index defaults to -1. On success
+// input_stream is moved into the reader (its release callback is set to NULL)
+// and out must eventually be released by the caller. On failure (ENOMEM, or
+// whatever ArrowIpcDecoderInit() returned) input_stream is left untouched.
+ArrowErrorCode ArrowIpcArrayStreamReaderInit(
+    struct ArrowArrayStream* out, struct ArrowIpcInputStream* input_stream,
+    struct ArrowIpcArrayStreamReaderOptions* options) {
+  struct ArrowIpcArrayStreamReaderPrivate* private_data =
+      (struct ArrowIpcArrayStreamReaderPrivate*)ArrowMalloc(
+          sizeof(struct ArrowIpcArrayStreamReaderPrivate));
+  if (private_data == NULL) {
+    return ENOMEM;
+  }
+
+  int result = ArrowIpcDecoderInit(&private_data->decoder);
+  if (result != NANOARROW_OK) {
+    ArrowFree(private_data);
+    return result;
+  }
+
+  ArrowBufferInit(&private_data->header);
+  ArrowBufferInit(&private_data->body);
+  private_data->out_schema.release = NULL;
+  // The private data is not zeroed on allocation, so start with an empty error
+  // message; get_last_error() may be called before get_schema()/get_next()
+  private_data->error.message[0] = '\0';
+  ArrowIpcInputStreamMove(input_stream, &private_data->input);
+
+  if (options != NULL) {
+    private_data->field_index = options->field_index;
+  } else {
+    private_data->field_index = -1;
+  }
+
+  out->private_data = private_data;
+  out->get_schema = &ArrowIpcArrayStreamReaderGetSchema;
+  out->get_next = &ArrowIpcArrayStreamReaderGetNext;
+  out->get_last_error = &ArrowIpcArrayStreamReaderGetLastError;
+  out->release = &ArrowIpcArrayStreamReaderRelease;
+
+  return NANOARROW_OK;
+}
diff --git a/dist/nanoarrow_ipc.h b/dist/nanoarrow_ipc.h
index 155150e..071b299 100644
--- a/dist/nanoarrow_ipc.h
+++ b/dist/nanoarrow_ipc.h
@@ -39,6 +39,14 @@
   NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDecoderSetSchema)
 #define ArrowIpcDecoderSetEndianness \
   NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDecoderSetEndianness)
+#define ArrowIpcInputStreamInitBuffer \
+  NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcInputStreamInitBuffer)
+#define ArrowIpcInputStreamInitFile \
+  NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcInputStreamInitFile)
+#define ArrowIpcInputStreamMove \
+  NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcInputStreamMove)
+#define ArrowIpcArrayStreamReaderInit \
+  NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcArrayStreamReaderInit)
 
 #endif
 
@@ -219,6 +227,60 @@ ArrowErrorCode ArrowIpcDecoderDecodeArray(struct 
ArrowIpcDecoder* decoder,
                                           struct ArrowArray* out,
                                           struct ArrowError* error);
 
+/// \brief A user-extensible input data source
+struct ArrowIpcInputStream {
+  /// \brief Read up to buf_size_bytes from stream into buf
+  ///
+  /// The actual number of bytes read is placed in the value pointed to by
+  /// size_read_out. Returns NANOARROW_OK on success.
+  ArrowErrorCode (*read)(struct ArrowIpcInputStream* stream, uint8_t* buf,
+                         int64_t buf_size_bytes, int64_t* size_read_out,
+                         struct ArrowError* error);
+
+  /// \brief Release the stream and any resources it may be holding
+  ///
+  /// Release callback implementations must set the release member to NULL.
+  /// Callers must check that the release callback is not NULL before calling
+  /// read() or release().
+  void (*release)(struct ArrowIpcInputStream* stream);
+
+  /// \brief Private implementation-defined data
+  void* private_data;
+};
+
+/// \brief Transfer ownership of an ArrowIpcInputStream
+///
+/// The members of src are copied to dst and src->release is set to NULL to
+/// mark src as released.
+void ArrowIpcInputStreamMove(struct ArrowIpcInputStream* src,
+                             struct ArrowIpcInputStream* dst);
+
+/// \brief Create an input stream from an ArrowBuffer
+///
+/// On success the contents of input are moved into the stream, which resets
+/// them when it is released. Returns ENOMEM if the stream's private data
+/// cannot be allocated.
+ArrowErrorCode ArrowIpcInputStreamInitBuffer(struct ArrowIpcInputStream* 
stream,
+                                             struct ArrowBuffer* input);
+
+/// \brief Create an input stream from a C FILE* pointer
+///
+/// file_ptr is cast to FILE* and read with fread(). If close_on_release is
+/// nonzero, the stream calls fclose() on it: either as soon as a read returns
+/// fewer bytes than requested or, failing that, when the stream is released.
+///
+/// Note that the ArrowIpcInputStream has no mechanism to communicate an error
+/// if file_ptr fails to close. If this behaviour is needed, pass false to
+/// close_on_release and handle closing the file independently from stream.
+ArrowErrorCode ArrowIpcInputStreamInitFile(struct ArrowIpcInputStream* stream,
+                                           void* file_ptr, int 
close_on_release);
+
+/// \brief Options for ArrowIpcArrayStreamReaderInit()
+struct ArrowIpcArrayStreamReaderOptions {
+  /// \brief The field index to extract. Defaults to -1 (i.e., read all fields).
+  ///
+  /// Values other than -1 are not yet supported: reading the schema fails
+  /// with ENOTSUP when one is requested.
+  int64_t field_index;
+};
+
+/// \brief Initialize an ArrowArrayStream from an input stream of bytes
+///
+/// The stream of bytes must begin with a Schema message and be followed by
+/// zero or more RecordBatch messages as described in the Arrow IPC stream
+/// format specification. Returns NANOARROW_OK on success. If NANOARROW_OK
+/// is returned, the ArrowArrayStream takes ownership of input_stream and
+/// the caller is responsible for releasing out.
+///
+/// options may be NULL, in which case the default field_index of -1 is used.
+/// Returns ENOMEM if the reader's private data cannot be allocated.
+ArrowErrorCode ArrowIpcArrayStreamReaderInit(
+    struct ArrowArrayStream* out, struct ArrowIpcInputStream* input_stream,
+    struct ArrowIpcArrayStreamReaderOptions* options);
+
 #ifdef __cplusplus
 }
 #endif

Reply via email to