This is an automated email from the ASF dual-hosted git repository.

paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-nanoarrow.git


The following commit(s) were added to refs/heads/main by this push:
     new a3f0929  fix(extensions/nanoarrow_ipc): Fix crash and misleading error 
messages resulting from corrupted streams (#289)
a3f0929 is described below

commit a3f09295ef964b9e7279dc4ddceec7759d972bc9
Author: Dewey Dunnington <de...@dunnington.ca>
AuthorDate: Fri Aug 25 21:43:42 2023 -0300

    fix(extensions/nanoarrow_ipc): Fix crash and misleading error messages 
resulting from corrupted streams (#289)
    
    Closes #287.
    
    The reported crash occurred because the internal
    `ArrowIpcDecoderVerifyHeader()` incorrectly interpreted the return value
    of `ArrowIpcDecoderCheckHeader()`. Because the header checker returned
    an error code sometimes even if it succeeded, we had been ignoring the
    error in `ArrowIpcDecoderVerifyHeader()` which resulted in issuing
    commands like `memcpy(dst, src, -8)`. I believe that's undefined
    behaviour, leading to the intermittent nature of the crash.
    
    In adding tests for this kind of error, I also made some improvements to
    error messages along the way.
    
    ```bash
    # docker run --rm -it ghcr.io/apache/arrow-nanoarrow:ubuntu
    # git clone https://github.com/apache/arrow-nanoarrow.git /arrow-nanoarrow
    # or
    # docker run --rm -it -v$(pwd):/arrow-nanoarrow ghcr.io/apache/arrow-nanoarrow:ubuntu
    
    cd /arrow-nanoarrow/extensions/nanoarrow_ipc
    mkdir build && cd build
    cmake .. -DNANOARROW_IPC_BUILD_APPS=ON
    
    curl https://gist.githubusercontent.com/amoeba/b64fc94ba5224bafcb3734bd261181d5/raw/af4c93da7ce6affba74a80e1ba94ed9573e91be8/test_arrow_data | \
        base64 -d > test_binary
    
    with_byte_removed() {
        BYTE_PLUS_ONE=$(($2 + 2))
        cat $1 | head -c $2
        cat $1 | tail -c "+$BYTE_PLUS_ONE"
    }
    
    cmake --build .
    
    echo "Errors:" > out.txt
    for i in {1..32951}; do
      echo "$i/32951"
      echo "$i/32951" >> out.txt
      with_byte_removed test_binary $i | ./dump_stream - 2>> out.txt
    done
    ```
---
 .../src/nanoarrow/nanoarrow_ipc_decoder.c          | 66 ++++++++++++++--------
 .../src/nanoarrow/nanoarrow_ipc_decoder_test.cc    | 66 ++++++++++++++++++----
 .../src/nanoarrow/nanoarrow_ipc_reader.c           | 17 +++---
 .../src/nanoarrow/nanoarrow_ipc_reader_test.cc     | 58 +++++++++++++++++++
 4 files changed, 164 insertions(+), 43 deletions(-)

diff --git a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_decoder.c 
b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_decoder.c
index 1f37aaa..2fac3c7 100644
--- a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_decoder.c
+++ b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_decoder.c
@@ -46,6 +46,10 @@
 #include "nanoarrow_ipc.h"
 #include "nanoarrow_ipc_flatcc_generated.h"
 
+// A more readable way to refer to the fact that there are 8 bytes
+// at the beginning of every message header.
+const static int64_t kMessageHeaderPrefixSize = 8;
+
 // Internal representation of a parsed "Field" from flatbuffers. This
 // represents a field in a depth-first walk of column arrays and their
 // children.
@@ -929,16 +933,17 @@ static inline void ArrowIpcDecoderResetHeaderInfo(struct 
ArrowIpcDecoder* decode
   private_data->last_message = NULL;
 }
 
-// Returns NANOARROW_OK if data is large enough to read the message header,
-// ESPIPE if reading more data might help, or EINVAL if the content is not 
valid
-static inline int ArrowIpcDecoderCheckHeader(struct ArrowIpcDecoder* decoder,
-                                             struct ArrowBufferView* data_mut,
-                                             int32_t* message_size_bytes,
-                                             struct ArrowError* error) {
+// Returns NANOARROW_OK if data is large enough to read the first 8 bytes
+// of the message header, ESPIPE if reading more data might help, or EINVAL if 
the content
+// is not valid. Advances the input ArrowBufferView by 8 bytes.
+static inline int ArrowIpcDecoderReadHeaderPrefix(struct ArrowIpcDecoder* 
decoder,
+                                                  struct ArrowBufferView* 
data_mut,
+                                                  int32_t* message_size_bytes,
+                                                  struct ArrowError* error) {
   struct ArrowIpcDecoderPrivate* private_data =
       (struct ArrowIpcDecoderPrivate*)decoder->private_data;
 
-  if (data_mut->size_bytes < 8) {
+  if (data_mut->size_bytes < kMessageHeaderPrefixSize) {
     ArrowErrorSet(error, "Expected data of at least 8 bytes but only %ld bytes 
remain",
                   (long)data_mut->size_bytes);
     return ESPIPE;
@@ -953,18 +958,12 @@ static inline int ArrowIpcDecoderCheckHeader(struct 
ArrowIpcDecoder* decoder,
 
   int swap_endian = private_data->system_endianness == 
NANOARROW_IPC_ENDIANNESS_BIG;
   int32_t header_body_size_bytes = ArrowIpcReadInt32LE(data_mut, swap_endian);
-  *message_size_bytes = header_body_size_bytes + (2 * sizeof(int32_t));
+  *message_size_bytes = header_body_size_bytes + kMessageHeaderPrefixSize;
   if (header_body_size_bytes < 0) {
     ArrowErrorSet(
         error, "Expected message body size > 0 but found message body size of 
%ld bytes",
         (long)header_body_size_bytes);
     return EINVAL;
-  } else if (header_body_size_bytes > data_mut->size_bytes) {
-    ArrowErrorSet(error,
-                  "Expected 0 <= message body size <= %ld bytes but found 
message "
-                  "body size of %ld bytes",
-                  (long)data_mut->size_bytes, (long)header_body_size_bytes);
-    return ESPIPE;
   }
 
   if (header_body_size_bytes == 0) {
@@ -982,8 +981,8 @@ ArrowErrorCode ArrowIpcDecoderPeekHeader(struct 
ArrowIpcDecoder* decoder,
       (struct ArrowIpcDecoderPrivate*)decoder->private_data;
 
   ArrowIpcDecoderResetHeaderInfo(decoder);
-  NANOARROW_RETURN_NOT_OK(
-      ArrowIpcDecoderCheckHeader(decoder, &data, &decoder->header_size_bytes, 
error));
+  NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderReadHeaderPrefix(
+      decoder, &data, &decoder->header_size_bytes, error));
   return NANOARROW_OK;
 }
 
@@ -994,13 +993,23 @@ ArrowErrorCode ArrowIpcDecoderVerifyHeader(struct 
ArrowIpcDecoder* decoder,
       (struct ArrowIpcDecoderPrivate*)decoder->private_data;
 
   ArrowIpcDecoderResetHeaderInfo(decoder);
-  NANOARROW_RETURN_NOT_OK(
-      ArrowIpcDecoderCheckHeader(decoder, &data, &decoder->header_size_bytes, 
error));
+  NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderReadHeaderPrefix(
+      decoder, &data, &decoder->header_size_bytes, error));
+
+  // Check that data contains at least the entire header (return ESPIPE to 
signal
+  // that reading more data may help).
+  int64_t message_body_size = decoder->header_size_bytes - 
kMessageHeaderPrefixSize;
+  if (data.size_bytes < message_body_size) {
+    ArrowErrorSet(error,
+                  "Expected >= %ld bytes of remaining data but found %ld bytes 
in buffer",
+                  (long)message_body_size + kMessageHeaderPrefixSize,
+                  (long)data.size_bytes + kMessageHeaderPrefixSize);
+    return ESPIPE;
+  }
 
   // Run flatbuffers verification
-  if (ns(Message_verify_as_root(data.data.as_uint8,
-                                decoder->header_size_bytes - (2 * 
sizeof(int32_t)))) !=
-      flatcc_verify_ok) {
+  if (ns(Message_verify_as_root(data.data.as_uint8, message_body_size) !=
+         flatcc_verify_ok)) {
     ArrowErrorSet(error, "Message flatbuffer verification failed");
     return EINVAL;
   }
@@ -1022,8 +1031,19 @@ ArrowErrorCode ArrowIpcDecoderDecodeHeader(struct 
ArrowIpcDecoder* decoder,
       (struct ArrowIpcDecoderPrivate*)decoder->private_data;
 
   ArrowIpcDecoderResetHeaderInfo(decoder);
-  NANOARROW_RETURN_NOT_OK(
-      ArrowIpcDecoderCheckHeader(decoder, &data, &decoder->header_size_bytes, 
error));
+  NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderReadHeaderPrefix(
+      decoder, &data, &decoder->header_size_bytes, error));
+
+  // Check that data contains at least the entire header (return ESPIPE to 
signal
+  // that reading more data may help).
+  int64_t message_body_size = decoder->header_size_bytes - 
kMessageHeaderPrefixSize;
+  if (data.size_bytes < message_body_size) {
+    ArrowErrorSet(error,
+                  "Expected >= %ld bytes of remaining data but found %ld bytes 
in buffer",
+                  (long)message_body_size + kMessageHeaderPrefixSize,
+                  (long)data.size_bytes + kMessageHeaderPrefixSize);
+    return ESPIPE;
+  }
 
   ns(Message_table_t) message = ns(Message_as_root(data.data.as_uint8));
   if (!message) {
diff --git 
a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_decoder_test.cc 
b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_decoder_test.cc
index 3f8653d..de1e3cb 100644
--- a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_decoder_test.cc
+++ b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_decoder_test.cc
@@ -127,12 +127,19 @@ TEST(NanoarrowIpcTest, NanoarrowIpcCheckHeader) {
   data.data.as_uint8 = kSimpleSchema;
   data.size_bytes = 1;
 
+  // For each error, check both Verify and Decode
+
   ArrowIpcDecoderInit(&decoder);
 
   EXPECT_EQ(ArrowIpcDecoderVerifyHeader(&decoder, data, &error), ESPIPE);
   EXPECT_STREQ(error.message,
                "Expected data of at least 8 bytes but only 1 bytes remain");
 
+  ArrowErrorInit(&error);
+  EXPECT_EQ(ArrowIpcDecoderDecodeHeader(&decoder, data, &error), ESPIPE);
+  EXPECT_STREQ(error.message,
+               "Expected data of at least 8 bytes but only 1 bytes remain");
+
   uint32_t eight_bad_bytes[] = {0, 0};
   data.data.as_uint8 = reinterpret_cast<uint8_t*>(eight_bad_bytes);
   data.size_bytes = 8;
@@ -140,23 +147,38 @@ TEST(NanoarrowIpcTest, NanoarrowIpcCheckHeader) {
   EXPECT_STREQ(error.message,
                "Expected 0xFFFFFFFF at start of message but found 0x00000000");
 
+  ArrowErrorInit(&error);
+  EXPECT_EQ(ArrowIpcDecoderDecodeHeader(&decoder, data, &error), EINVAL);
+  EXPECT_STREQ(error.message,
+               "Expected 0xFFFFFFFF at start of message but found 0x00000000");
+
   eight_bad_bytes[0] = 0xFFFFFFFF;
   eight_bad_bytes[1] = negative_one_le;
   EXPECT_EQ(ArrowIpcDecoderVerifyHeader(&decoder, data, &error), EINVAL);
   EXPECT_STREQ(error.message,
                "Expected message body size > 0 but found message body size of 
-1 bytes");
 
+  ArrowErrorInit(&error);
+  EXPECT_EQ(ArrowIpcDecoderDecodeHeader(&decoder, data, &error), EINVAL);
+  EXPECT_STREQ(error.message,
+               "Expected message body size > 0 but found message body size of 
-1 bytes");
+
   eight_bad_bytes[1] = one_le;
   EXPECT_EQ(ArrowIpcDecoderVerifyHeader(&decoder, data, &error), ESPIPE);
-
   EXPECT_STREQ(error.message,
-               "Expected 0 <= message body size <= 0 bytes but found message 
body size "
-               "of 1 bytes");
+               "Expected >= 9 bytes of remaining data but found 8 bytes in 
buffer");
+  ArrowErrorInit(&error);
+  EXPECT_EQ(ArrowIpcDecoderDecodeHeader(&decoder, data, &error), ESPIPE);
+  EXPECT_STREQ(error.message,
+               "Expected >= 9 bytes of remaining data but found 8 bytes in 
buffer");
 
   eight_bad_bytes[0] = 0xFFFFFFFF;
   eight_bad_bytes[1] = 0;
   EXPECT_EQ(ArrowIpcDecoderVerifyHeader(&decoder, data, &error), ENODATA);
   EXPECT_STREQ(error.message, "End of Arrow stream");
+  ArrowErrorInit(&error);
+  EXPECT_EQ(ArrowIpcDecoderDecodeHeader(&decoder, data, &error), ENODATA);
+  EXPECT_STREQ(error.message, "End of Arrow stream");
 
   ArrowIpcDecoderReset(&decoder);
 }
@@ -191,15 +213,6 @@ TEST(NanoarrowIpcTest, NanoarrowIpcVerifySimpleSchema) {
   EXPECT_EQ(decoder.header_size_bytes, sizeof(kSimpleSchema));
   EXPECT_EQ(decoder.body_size_bytes, 0);
 
-  uint8_t simple_schema_invalid[280];
-  memcpy(simple_schema_invalid, kSimpleSchema, sizeof(simple_schema_invalid));
-  memset(simple_schema_invalid + 8, 0xFF, sizeof(simple_schema_invalid) - 8);
-
-  data.data.as_uint8 = simple_schema_invalid;
-  data.size_bytes = sizeof(kSimpleSchema);
-  EXPECT_EQ(ArrowIpcDecoderVerifyHeader(&decoder, data, &error), EINVAL);
-  EXPECT_STREQ(error.message, "Message flatbuffer verification failed");
-
   ArrowIpcDecoderReset(&decoder);
 }
 
@@ -221,6 +234,35 @@ TEST(NanoarrowIpcTest, 
NanoarrowIpcVerifySimpleRecordBatch) {
   ArrowIpcDecoderReset(&decoder);
 }
 
+TEST(NanoarrowIpcTest, NanoarrowIpcVerifyInvalid) {
+  struct ArrowIpcDecoder decoder;
+  struct ArrowError error;
+
+  uint8_t simple_schema_invalid[sizeof(kSimpleSchema)];
+  struct ArrowBufferView data;
+  data.data.as_uint8 = simple_schema_invalid;
+  data.size_bytes = sizeof(simple_schema_invalid);
+
+  ArrowIpcDecoderInit(&decoder);
+
+  // Create invalid data by removing bytes one at a time and ensuring an error 
code and
+  // a null-terminated error. After byte 265 this passes because the values 
being modified
+  // are parts of the flatbuffer that won't cause overrun.
+  for (int64_t i = 1; i < 265; i++) {
+    SCOPED_TRACE(i);
+
+    memcpy(simple_schema_invalid, kSimpleSchema, i);
+    memcpy(simple_schema_invalid + i, kSimpleSchema + (i + 1),
+           (sizeof(simple_schema_invalid) - i));
+
+    ArrowErrorInit(&error);
+    ASSERT_NE(ArrowIpcDecoderVerifyHeader(&decoder, data, &error), 
NANOARROW_OK);
+    ASSERT_GT(strlen(error.message), 0);
+  }
+
+  ArrowIpcDecoderReset(&decoder);
+}
+
 TEST(NanoarrowIpcTest, NanoarrowIpcDecodeSimpleSchema) {
   struct ArrowIpcDecoder decoder;
   struct ArrowError error;
diff --git a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader.c 
b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader.c
index 2d18900..812cb8c 100644
--- a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader.c
+++ b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader.c
@@ -226,11 +226,8 @@ static int ArrowIpcArrayStreamReaderNextHeader(
   input_view.size_bytes = private_data->header.size_bytes;
 
   // Use PeekHeader to fill in decoder.header_size_bytes
-  int result =
-      ArrowIpcDecoderPeekHeader(&private_data->decoder, input_view, 
&private_data->error);
-  if (result == ENODATA) {
-    return result;
-  }
+  NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderPeekHeader(&private_data->decoder, 
input_view,
+                                                    &private_data->error));
 
   // Read the header bytes
   int64_t expected_header_bytes = private_data->decoder.header_size_bytes - 8;
@@ -348,7 +345,7 @@ static int ArrowIpcArrayStreamReaderGetNext(struct 
ArrowArrayStream* stream,
                                             struct ArrowArray* out) {
   struct ArrowIpcArrayStreamReaderPrivate* private_data =
       (struct ArrowIpcArrayStreamReaderPrivate*)stream->private_data;
-  private_data->error.message[0] = '\0';
+  ArrowErrorInit(&private_data->error);
   
NANOARROW_RETURN_NOT_OK(ArrowIpcArrayStreamReaderReadSchemaIfNeeded(private_data));
 
   // Read + decode the next header
@@ -359,6 +356,9 @@ static int ArrowIpcArrayStreamReaderGetNext(struct 
ArrowArrayStream* stream,
     // end of stream bytes were read.
     out->release = NULL;
     return NANOARROW_OK;
+  } else if (result != NANOARROW_OK) {
+    // Other error
+    return result;
   }
 
   // Make sure we have a RecordBatch message
@@ -376,10 +376,11 @@ static int ArrowIpcArrayStreamReaderGetNext(struct 
ArrowArrayStream* stream,
     struct ArrowIpcSharedBuffer shared;
     NANOARROW_RETURN_NOT_OK_WITH_ERROR(
         ArrowIpcSharedBufferInit(&shared, &private_data->body), 
&private_data->error);
-    NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderDecodeArrayFromShared(
+    result = ArrowIpcDecoderDecodeArrayFromShared(
         &private_data->decoder, &shared, private_data->field_index, &tmp,
-        NANOARROW_VALIDATION_LEVEL_FULL, &private_data->error));
+        NANOARROW_VALIDATION_LEVEL_FULL, &private_data->error);
     ArrowIpcSharedBufferReset(&shared);
+    NANOARROW_RETURN_NOT_OK(result);
   } else {
     struct ArrowBufferView body_view;
     body_view.data.data = private_data->body.data;
diff --git 
a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader_test.cc 
b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader_test.cc
index f3e4a5a..9ad918d 100644
--- a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader_test.cc
+++ b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader_test.cc
@@ -291,6 +291,64 @@ TEST(NanoarrowIpcReader, StreamReaderExpectedSchema) {
   stream.release(&stream);
 }
 
+TEST(NanoarrowIpcTest, StreamReaderInvalidBuffer) {
+  struct ArrowBuffer input_buffer;
+  struct ArrowIpcInputStream input;
+  struct ArrowArrayStream stream;
+  struct ArrowSchema schema;
+  struct ArrowArray array;
+
+  uint8_t simple_stream_invalid[sizeof(kSimpleSchema) + 
sizeof(kSimpleRecordBatch)];
+  struct ArrowBufferView data;
+  data.data.as_uint8 = simple_stream_invalid;
+
+  // Create invalid data by removing bytes one at a time and ensuring an error 
code and
+  // a null-terminated error. After byte 273/280 this passes because the bytes 
are just
+  // padding.
+  data.size_bytes = sizeof(kSimpleSchema);
+  for (int64_t i = 1; i < 273; i++) {
+    SCOPED_TRACE(i);
+
+    memcpy(simple_stream_invalid, kSimpleSchema, i);
+    memcpy(simple_stream_invalid + i, kSimpleSchema + (i + 1),
+           (sizeof(kSimpleSchema) - i));
+
+    ArrowBufferInit(&input_buffer);
+    ASSERT_EQ(ArrowBufferAppendBufferView(&input_buffer, data), NANOARROW_OK);
+    ASSERT_EQ(ArrowIpcInputStreamInitBuffer(&input, &input_buffer), 
NANOARROW_OK);
+    ASSERT_EQ(ArrowIpcArrayStreamReaderInit(&stream, &input, nullptr), 
NANOARROW_OK);
+
+    ASSERT_NE(stream.get_schema(&stream, &schema), NANOARROW_OK);
+    ASSERT_GT(strlen(stream.get_last_error(&stream)), 0);
+
+    stream.release(&stream);
+  }
+
+  // Do the same exercise removing bytes of the record batch message.
+  // Similarly, this succeeds if the byte removed is part of the padding at 
the end.
+  memcpy(simple_stream_invalid, kSimpleSchema, sizeof(kSimpleSchema));
+  data.size_bytes = sizeof(simple_stream_invalid);
+  for (int64_t i = 1; i < 144; i++) {
+    SCOPED_TRACE(i);
+
+    memcpy(simple_stream_invalid + sizeof(kSimpleSchema), kSimpleRecordBatch, 
i);
+    memcpy(simple_stream_invalid + sizeof(kSimpleSchema) + i,
+           kSimpleRecordBatch + (i + 1), (sizeof(kSimpleRecordBatch) - i));
+
+    ArrowBufferInit(&input_buffer);
+    ASSERT_EQ(ArrowBufferAppendBufferView(&input_buffer, data), NANOARROW_OK);
+    ASSERT_EQ(ArrowIpcInputStreamInitBuffer(&input, &input_buffer), 
NANOARROW_OK);
+    ASSERT_EQ(ArrowIpcArrayStreamReaderInit(&stream, &input, nullptr), 
NANOARROW_OK);
+
+    ASSERT_EQ(stream.get_schema(&stream, &schema), NANOARROW_OK);
+    schema.release(&schema);
+    ASSERT_NE(stream.get_next(&stream, &array), NANOARROW_OK);
+    ASSERT_GT(strlen(stream.get_last_error(&stream)), 0);
+
+    stream.release(&stream);
+  }
+}
+
 TEST(NanoarrowIpcReader, StreamReaderUnsupportedFieldIndex) {
   struct ArrowBuffer input_buffer;
   ArrowBufferInit(&input_buffer);

Reply via email to