This is an automated email from the ASF dual-hosted git repository. paleolimbot pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/arrow-nanoarrow.git
The following commit(s) were added to refs/heads/main by this push: new a3f0929 fix(extensions/nanoarrow_ipc): Fix crash and mixleading error messages resulting from corrupted streams (#289) a3f0929 is described below commit a3f09295ef964b9e7279dc4ddceec7759d972bc9 Author: Dewey Dunnington <de...@dunnington.ca> AuthorDate: Fri Aug 25 21:43:42 2023 -0300 fix(extensions/nanoarrow_ipc): Fix crash and mixleading error messages resulting from corrupted streams (#289) Closes #287. The reported crash occurred because the internal `ArrowIpcDecoderVerifyHeader()` incorrectly interpreted the return value of `ArrowIpcDecoderCheckHeader()`. Because the header checker returned an error code sometimes even if it succeeded, we had been ignoring the error in `ArrowIpcDecoderVerifyHeader()` which resulted in issuing commands like `memcpy(dst, src, -8)`. I believe that's undefined behaviour, leading to the intermittent nature of the crash. In adding tests for this kind of error, I also made some improvements to error messages along the way. ```bash # docker run --rm -it ghcr.io/apache/arrow-nanoarrow:ubuntu # git clone https://github.com/apache/arrow-nanoarrow.git /arrow-nanoarrow # or # docker run --rm -it -v$(pwd):/arrow-nanoarrow ghcr.io/apache/arrow-nanoarrow:ubuntu cd /arrow-nanoarrow/extensions/nanoarrow_ipc mkdir build && cd build cmake .. -DNANOARROW_IPC_BUILD_APPS=ON curl https://gist.githubusercontent.com/amoeba/b64fc94ba5224bafcb3734bd261181d5/raw/af4c93da7ce6affba74a80e1ba94ed9573e91be8/test_arrow_data | \ base64 -d > test_binary with_byte_removed() { BYTE_PLUS_ONE=$(($2 + 2)) cat $1 | head -c $2 cat $1 | tail -c "+$BYTE_PLUS_ONE" } cmake --build . echo "Errors:" > out.txt for i in {1..32951}; do echo "$i/32951" echo "$i/32951" >> out.txt with_byte_removed test_binary $i | ./dump_stream - 2>> out.txt done ``` --- .../src/nanoarrow/nanoarrow_ipc_decoder.c | 66 ++++++++++++++-------- .../src/nanoarrow/nanoarrow_ipc_decoder_test.cc | 66 ++++++++++++++++++---- .../src/nanoarrow/nanoarrow_ipc_reader.c | 17 +++--- .../src/nanoarrow/nanoarrow_ipc_reader_test.cc | 58 +++++++++++++++++++ 4 files changed, 164 insertions(+), 43 deletions(-) diff --git a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_decoder.c b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_decoder.c index 1f37aaa..2fac3c7 100644 --- a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_decoder.c +++ b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_decoder.c @@ -46,6 +46,10 @@ #include "nanoarrow_ipc.h" #include "nanoarrow_ipc_flatcc_generated.h" +// A more readable expression way to refer to the fact that there are 8 bytes +// at the beginning of every message header. +const static int64_t kMessageHeaderPrefixSize = 8; + // Internal representation of a parsed "Field" from flatbuffers. This // represents a field in a depth-first walk of column arrays and their // children. @@ -929,16 +933,17 @@ static inline void ArrowIpcDecoderResetHeaderInfo(struct ArrowIpcDecoder* decode private_data->last_message = NULL; } -// Returns NANOARROW_OK if data is large enough to read the message header, -// ESPIPE if reading more data might help, or EINVAL if the content is not valid -static inline int ArrowIpcDecoderCheckHeader(struct ArrowIpcDecoder* decoder, - struct ArrowBufferView* data_mut, - int32_t* message_size_bytes, - struct ArrowError* error) { +// Returns NANOARROW_OK if data is large enough to read the first 8 bytes +// of the message header, ESPIPE if reading more data might help, or EINVAL if the content +// is not valid. Advances the input ArrowBufferView by 8 bytes. +static inline int ArrowIpcDecoderReadHeaderPrefix(struct ArrowIpcDecoder* decoder, + struct ArrowBufferView* data_mut, + int32_t* message_size_bytes, + struct ArrowError* error) { struct ArrowIpcDecoderPrivate* private_data = (struct ArrowIpcDecoderPrivate*)decoder->private_data; - if (data_mut->size_bytes < 8) { + if (data_mut->size_bytes < kMessageHeaderPrefixSize) { ArrowErrorSet(error, "Expected data of at least 8 bytes but only %ld bytes remain", (long)data_mut->size_bytes); return ESPIPE; @@ -953,18 +958,12 @@ static inline int ArrowIpcDecoderCheckHeader(struct ArrowIpcDecoder* decoder, int swap_endian = private_data->system_endianness == NANOARROW_IPC_ENDIANNESS_BIG; int32_t header_body_size_bytes = ArrowIpcReadInt32LE(data_mut, swap_endian); - *message_size_bytes = header_body_size_bytes + (2 * sizeof(int32_t)); + *message_size_bytes = header_body_size_bytes + kMessageHeaderPrefixSize; if (header_body_size_bytes < 0) { ArrowErrorSet( error, "Expected message body size > 0 but found message body size of %ld bytes", (long)header_body_size_bytes); return EINVAL; - } else if (header_body_size_bytes > data_mut->size_bytes) { - ArrowErrorSet(error, - "Expected 0 <= message body size <= %ld bytes but found message " - "body size of %ld bytes", - (long)data_mut->size_bytes, (long)header_body_size_bytes); - return ESPIPE; } if (header_body_size_bytes == 0) { @@ -982,8 +981,8 @@ ArrowErrorCode ArrowIpcDecoderPeekHeader(struct ArrowIpcDecoder* decoder, (struct ArrowIpcDecoderPrivate*)decoder->private_data; ArrowIpcDecoderResetHeaderInfo(decoder); - NANOARROW_RETURN_NOT_OK( - ArrowIpcDecoderCheckHeader(decoder, &data, &decoder->header_size_bytes, error)); + NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderReadHeaderPrefix( + decoder, &data, &decoder->header_size_bytes, error)); return NANOARROW_OK; } @@ -994,13 +993,23 @@ ArrowErrorCode ArrowIpcDecoderVerifyHeader(struct ArrowIpcDecoder* decoder, (struct ArrowIpcDecoderPrivate*)decoder->private_data; ArrowIpcDecoderResetHeaderInfo(decoder); - NANOARROW_RETURN_NOT_OK( - ArrowIpcDecoderCheckHeader(decoder, &data, &decoder->header_size_bytes, error)); + NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderReadHeaderPrefix( + decoder, &data, &decoder->header_size_bytes, error)); + + // Check that data contains at least the entire header (return ESPIPE to signal + // that reading more data may help). + int64_t message_body_size = decoder->header_size_bytes - kMessageHeaderPrefixSize; + if (data.size_bytes < message_body_size) { + ArrowErrorSet(error, + "Expected >= %ld bytes of remaining data but found %ld bytes in buffer", + (long)message_body_size + kMessageHeaderPrefixSize, + (long)data.size_bytes + kMessageHeaderPrefixSize); + return ESPIPE; + } // Run flatbuffers verification - if (ns(Message_verify_as_root(data.data.as_uint8, - decoder->header_size_bytes - (2 * sizeof(int32_t)))) != - flatcc_verify_ok) { + if (ns(Message_verify_as_root(data.data.as_uint8, message_body_size) != + flatcc_verify_ok)) { ArrowErrorSet(error, "Message flatbuffer verification failed"); return EINVAL; } @@ -1022,8 +1031,19 @@ ArrowErrorCode ArrowIpcDecoderDecodeHeader(struct ArrowIpcDecoder* decoder, (struct ArrowIpcDecoderPrivate*)decoder->private_data; ArrowIpcDecoderResetHeaderInfo(decoder); - NANOARROW_RETURN_NOT_OK( - ArrowIpcDecoderCheckHeader(decoder, &data, &decoder->header_size_bytes, error)); + NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderReadHeaderPrefix( + decoder, &data, &decoder->header_size_bytes, error)); + + // Check that data contains at least the entire header (return ESPIPE to signal + // that reading more data may help). + int64_t message_body_size = decoder->header_size_bytes - kMessageHeaderPrefixSize; + if (data.size_bytes < message_body_size) { + ArrowErrorSet(error, + "Expected >= %ld bytes of remaining data but found %ld bytes in buffer", + (long)message_body_size + kMessageHeaderPrefixSize, + (long)data.size_bytes + kMessageHeaderPrefixSize); + return ESPIPE; + } ns(Message_table_t) message = ns(Message_as_root(data.data.as_uint8)); if (!message) { diff --git a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_decoder_test.cc b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_decoder_test.cc index 3f8653d..de1e3cb 100644 --- a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_decoder_test.cc +++ b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_decoder_test.cc @@ -127,12 +127,19 @@ TEST(NanoarrowIpcTest, NanoarrowIpcCheckHeader) { data.data.as_uint8 = kSimpleSchema; data.size_bytes = 1; + // For each error, check both Verify and Decode + ArrowIpcDecoderInit(&decoder); EXPECT_EQ(ArrowIpcDecoderVerifyHeader(&decoder, data, &error), ESPIPE); EXPECT_STREQ(error.message, "Expected data of at least 8 bytes but only 1 bytes remain"); + ArrowErrorInit(&error); + EXPECT_EQ(ArrowIpcDecoderDecodeHeader(&decoder, data, &error), ESPIPE); + EXPECT_STREQ(error.message, + "Expected data of at least 8 bytes but only 1 bytes remain"); + uint32_t eight_bad_bytes[] = {0, 0}; data.data.as_uint8 = reinterpret_cast<uint8_t*>(eight_bad_bytes); data.size_bytes = 8; @@ -140,23 +147,38 @@ TEST(NanoarrowIpcTest, NanoarrowIpcCheckHeader) { EXPECT_STREQ(error.message, "Expected 0xFFFFFFFF at start of message but found 0x00000000"); + ArrowErrorInit(&error); + EXPECT_EQ(ArrowIpcDecoderDecodeHeader(&decoder, data, &error), EINVAL); + EXPECT_STREQ(error.message, + "Expected 0xFFFFFFFF at start of message but found 0x00000000"); + eight_bad_bytes[0] = 0xFFFFFFFF; eight_bad_bytes[1] = negative_one_le; EXPECT_EQ(ArrowIpcDecoderVerifyHeader(&decoder, data, &error), EINVAL); EXPECT_STREQ(error.message, "Expected message body size > 0 but found message body size of -1 bytes"); + ArrowErrorInit(&error); + EXPECT_EQ(ArrowIpcDecoderDecodeHeader(&decoder, data, &error), EINVAL); + EXPECT_STREQ(error.message, + "Expected message body size > 0 but found message body size of -1 bytes"); + eight_bad_bytes[1] = one_le; EXPECT_EQ(ArrowIpcDecoderVerifyHeader(&decoder, data, &error), ESPIPE); - EXPECT_STREQ(error.message, - "Expected 0 <= message body size <= 0 bytes but found message body size " - "of 1 bytes"); + "Expected >= 9 bytes of remaining data but found 8 bytes in buffer"); + ArrowErrorInit(&error); + EXPECT_EQ(ArrowIpcDecoderDecodeHeader(&decoder, data, &error), ESPIPE); + EXPECT_STREQ(error.message, + "Expected >= 9 bytes of remaining data but found 8 bytes in buffer"); eight_bad_bytes[0] = 0xFFFFFFFF; eight_bad_bytes[1] = 0; EXPECT_EQ(ArrowIpcDecoderVerifyHeader(&decoder, data, &error), ENODATA); EXPECT_STREQ(error.message, "End of Arrow stream"); + ArrowErrorInit(&error); + EXPECT_EQ(ArrowIpcDecoderDecodeHeader(&decoder, data, &error), ENODATA); + EXPECT_STREQ(error.message, "End of Arrow stream"); ArrowIpcDecoderReset(&decoder); } @@ -191,15 +213,6 @@ TEST(NanoarrowIpcTest, NanoarrowIpcVerifySimpleSchema) { EXPECT_EQ(decoder.header_size_bytes, sizeof(kSimpleSchema)); EXPECT_EQ(decoder.body_size_bytes, 0); - uint8_t simple_schema_invalid[280]; - memcpy(simple_schema_invalid, kSimpleSchema, sizeof(simple_schema_invalid)); - memset(simple_schema_invalid + 8, 0xFF, sizeof(simple_schema_invalid) - 8); - - data.data.as_uint8 = simple_schema_invalid; - data.size_bytes = sizeof(kSimpleSchema); - EXPECT_EQ(ArrowIpcDecoderVerifyHeader(&decoder, data, &error), EINVAL); - EXPECT_STREQ(error.message, "Message flatbuffer verification failed"); - ArrowIpcDecoderReset(&decoder); } @@ -221,6 +234,35 @@ TEST(NanoarrowIpcTest, NanoarrowIpcVerifySimpleRecordBatch) { ArrowIpcDecoderReset(&decoder); } +TEST(NanoarrowIpcTest, NanoarrowIpcVerifyInvalid) { + struct ArrowIpcDecoder decoder; + struct ArrowError error; + + uint8_t simple_schema_invalid[sizeof(kSimpleSchema)]; + struct ArrowBufferView data; + data.data.as_uint8 = simple_schema_invalid; + data.size_bytes = sizeof(simple_schema_invalid); + + ArrowIpcDecoderInit(&decoder); + + // Create invalid data by removing bytes one at a time and ensuring an error code and + // a null-terminated error. After byte 265 this passes because the values being modified + // are parts of the flatbuffer that won't cause overrun. + for (int64_t i = 1; i < 265; i++) { + SCOPED_TRACE(i); + + memcpy(simple_schema_invalid, kSimpleSchema, i); + memcpy(simple_schema_invalid + i, kSimpleSchema + (i + 1), + (sizeof(simple_schema_invalid) - i)); + + ArrowErrorInit(&error); + ASSERT_NE(ArrowIpcDecoderVerifyHeader(&decoder, data, &error), NANOARROW_OK); + ASSERT_GT(strlen(error.message), 0); + } + + ArrowIpcDecoderReset(&decoder); +} + TEST(NanoarrowIpcTest, NanoarrowIpcDecodeSimpleSchema) { struct ArrowIpcDecoder decoder; struct ArrowError error; diff --git a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader.c b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader.c index 2d18900..812cb8c 100644 --- a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader.c +++ b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader.c @@ -226,11 +226,8 @@ static int ArrowIpcArrayStreamReaderNextHeader( input_view.size_bytes = private_data->header.size_bytes; // Use PeekHeader to fill in decoder.header_size_bytes - int result = - ArrowIpcDecoderPeekHeader(&private_data->decoder, input_view, &private_data->error); - if (result == ENODATA) { - return result; - } + NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderPeekHeader(&private_data->decoder, input_view, + &private_data->error)); // Read the header bytes int64_t expected_header_bytes = private_data->decoder.header_size_bytes - 8; @@ -348,7 +345,7 @@ static int ArrowIpcArrayStreamReaderGetNext(struct ArrowArrayStream* stream, struct ArrowArray* out) { struct ArrowIpcArrayStreamReaderPrivate* private_data = (struct ArrowIpcArrayStreamReaderPrivate*)stream->private_data; - private_data->error.message[0] = '\0'; + ArrowErrorInit(&private_data->error); NANOARROW_RETURN_NOT_OK(ArrowIpcArrayStreamReaderReadSchemaIfNeeded(private_data)); // Read + decode the next header @@ -359,6 +356,9 @@ static int ArrowIpcArrayStreamReaderGetNext(struct ArrowArrayStream* stream, // end of stream bytes were read. out->release = NULL; return NANOARROW_OK; + } else if (result != NANOARROW_OK) { + // Other error + return result; } // Make sure we have a RecordBatch message @@ -376,10 +376,11 @@ static int ArrowIpcArrayStreamReaderGetNext(struct ArrowArrayStream* stream, struct ArrowIpcSharedBuffer shared; NANOARROW_RETURN_NOT_OK_WITH_ERROR( ArrowIpcSharedBufferInit(&shared, &private_data->body), &private_data->error); - NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderDecodeArrayFromShared( + result = ArrowIpcDecoderDecodeArrayFromShared( &private_data->decoder, &shared, private_data->field_index, &tmp, - NANOARROW_VALIDATION_LEVEL_FULL, &private_data->error)); + NANOARROW_VALIDATION_LEVEL_FULL, &private_data->error); ArrowIpcSharedBufferReset(&shared); + NANOARROW_RETURN_NOT_OK(result); } else { struct ArrowBufferView body_view; body_view.data.data = private_data->body.data; diff --git a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader_test.cc b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader_test.cc index f3e4a5a..9ad918d 100644 --- a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader_test.cc +++ b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader_test.cc @@ -291,6 +291,64 @@ TEST(NanoarrowIpcReader, StreamReaderExpectedSchema) { stream.release(&stream); } +TEST(NanoarrowIpcTest, StreamReaderInvalidBuffer) { + struct ArrowBuffer input_buffer; + struct ArrowIpcInputStream input; + struct ArrowArrayStream stream; + struct ArrowSchema schema; + struct ArrowArray array; + + uint8_t simple_stream_invalid[sizeof(kSimpleSchema) + sizeof(kSimpleRecordBatch)]; + struct ArrowBufferView data; + data.data.as_uint8 = simple_stream_invalid; + + // Create invalid data by removing bytes one at a time and ensuring an error code and + // a null-terminated error. After byte 273/280 this passes because the bytes are just + // padding. + data.size_bytes = sizeof(kSimpleSchema); + for (int64_t i = 1; i < 273; i++) { + SCOPED_TRACE(i); + + memcpy(simple_stream_invalid, kSimpleSchema, i); + memcpy(simple_stream_invalid + i, kSimpleSchema + (i + 1), + (sizeof(kSimpleSchema) - i)); + + ArrowBufferInit(&input_buffer); + ASSERT_EQ(ArrowBufferAppendBufferView(&input_buffer, data), NANOARROW_OK); + ASSERT_EQ(ArrowIpcInputStreamInitBuffer(&input, &input_buffer), NANOARROW_OK); + ASSERT_EQ(ArrowIpcArrayStreamReaderInit(&stream, &input, nullptr), NANOARROW_OK); + + ASSERT_NE(stream.get_schema(&stream, &schema), NANOARROW_OK); + ASSERT_GT(strlen(stream.get_last_error(&stream)), 0); + + stream.release(&stream); + } + + // Do the same exercise removing bytes of the record batch message. + // Similarly, this succeeds if the byte removed is part of the padding at the end. + memcpy(simple_stream_invalid, kSimpleSchema, sizeof(kSimpleSchema)); + data.size_bytes = sizeof(simple_stream_invalid); + for (int64_t i = 1; i < 144; i++) { + SCOPED_TRACE(i); + + memcpy(simple_stream_invalid + sizeof(kSimpleSchema), kSimpleRecordBatch, i); + memcpy(simple_stream_invalid + sizeof(kSimpleSchema) + i, + kSimpleRecordBatch + (i + 1), (sizeof(kSimpleRecordBatch) - i)); + + ArrowBufferInit(&input_buffer); + ASSERT_EQ(ArrowBufferAppendBufferView(&input_buffer, data), NANOARROW_OK); + ASSERT_EQ(ArrowIpcInputStreamInitBuffer(&input, &input_buffer), NANOARROW_OK); + ASSERT_EQ(ArrowIpcArrayStreamReaderInit(&stream, &input, nullptr), NANOARROW_OK); + + ASSERT_EQ(stream.get_schema(&stream, &schema), NANOARROW_OK); + schema.release(&schema); + ASSERT_NE(stream.get_next(&stream, &array), NANOARROW_OK); + ASSERT_GT(strlen(stream.get_last_error(&stream)), 0); + + stream.release(&stream); + } +} + TEST(NanoarrowIpcReader, StreamReaderUnsupportedFieldIndex) { struct ArrowBuffer input_buffer; ArrowBufferInit(&input_buffer);