This is an automated email from the ASF dual-hosted git repository.
paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-nanoarrow.git
The following commit(s) were added to refs/heads/main by this push:
new 5bb90b15 feat: Actually decode dictionary arrays (#861)
5bb90b15 is described below
commit 5bb90b15f04281ba71fb78f11957bd5d3c34f907
Author: Dewey Dunnington <[email protected]>
AuthorDate: Fri Apr 17 21:51:09 2026 -0500
feat: Actually decode dictionary arrays (#861)
This PR builds on the previous steps to actually output decoded dictionary
arrays (including from the ArrowArrayStream reader). This also lets us wire
dictionary support into all of the tests.
The main follow-up is that this PR currently deep-copies the dictionary
for every batch that arrives, which negates much of the benefit of dictionary
encoding. Fixing this is a fairly self-contained change that I'll make
separately: https://github.com/apache/arrow-nanoarrow/pull/864
While I was here, I also added dictionary index validation; it is fairly
compact compared to the rest of this change.
Closes #845.
---
src/nanoarrow/common/array.c | 20 ++-
src/nanoarrow/common/array_test.cc | 54 +++++++
src/nanoarrow/ipc/decoder.c | 297 +++++++++++++++++++++++--------------
src/nanoarrow/ipc/decoder_test.cc | 241 ++++++++++++++++++++++++------
src/nanoarrow/ipc/files_test.cc | 61 +++++---
src/nanoarrow/ipc/reader.c | 153 +++++++++++++------
src/nanoarrow/ipc/reader_test.cc | 163 +++++++++++++++++++-
src/nanoarrow/nanoarrow_ipc.h | 76 +++++++++-
8 files changed, 844 insertions(+), 221 deletions(-)
diff --git a/src/nanoarrow/common/array.c b/src/nanoarrow/common/array.c
index 2f0f824c..ec7bdb66 100644
--- a/src/nanoarrow/common/array.c
+++ b/src/nanoarrow/common/array.c
@@ -1513,10 +1513,26 @@ static int ArrowArrayViewValidateFull(struct
ArrowArrayView* array_view,
NANOARROW_RETURN_NOT_OK(ArrowArrayViewValidateFull(array_view->children[i],
error));
}
- // Dictionary validation not implemented
+ // Dictionary index validation
if (array_view->dictionary != NULL) {
NANOARROW_RETURN_NOT_OK(ArrowArrayViewValidateFull(array_view->dictionary,
error));
- // TODO: validate the indices
+
+ // Validate that all non-null indices are within the dictionary bounds
+ int64_t dictionary_length = array_view->dictionary->length;
+ for (int64_t i = 0; i < array_view->length; i++) {
+ if (ArrowArrayViewIsNull(array_view, i)) {
+ continue;
+ }
+
+ int64_t index = ArrowArrayViewGetIntUnsafe(array_view, i);
+ if (index < 0 || index >= dictionary_length) {
+ ArrowErrorSet(error,
+ "[%" PRId64 "] Expected dictionary index >= 0 and < %"
PRId64
+ " but found value %" PRId64,
+ i, dictionary_length, index);
+ return EINVAL;
+ }
+ }
}
return NANOARROW_OK;
diff --git a/src/nanoarrow/common/array_test.cc
b/src/nanoarrow/common/array_test.cc
index 35a93787..b50e7266 100644
--- a/src/nanoarrow/common/array_test.cc
+++ b/src/nanoarrow/common/array_test.cc
@@ -144,6 +144,60 @@ TEST(ArrayTest, ArrayTestAllocateDictionary) {
ArrowArrayRelease(&array);
}
+TEST(ArrayTest, ArrayTestValidateDictionaryIndices) {
+ struct ArrowArray array;
+ struct ArrowSchema schema;
+ struct ArrowArrayView array_view;
+ struct ArrowError error;
+
+ // Create a schema for dictionary-encoded int32 with string dictionary
+ ASSERT_EQ(ArrowSchemaInitFromType(&schema, NANOARROW_TYPE_INT32),
NANOARROW_OK);
+ ASSERT_EQ(ArrowSchemaAllocateDictionary(&schema), NANOARROW_OK);
+ ASSERT_EQ(ArrowSchemaInitFromType(schema.dictionary, NANOARROW_TYPE_STRING),
+ NANOARROW_OK);
+
+ // Initialize array_view from schema
+ ASSERT_EQ(ArrowArrayViewInitFromSchema(&array_view, &schema, &error),
NANOARROW_OK);
+
+ // Create a dictionary-encoded int32 array with a string dictionary
+ ASSERT_EQ(ArrowArrayInitFromSchema(&array, &schema, &error), NANOARROW_OK);
+
+ // Build the array with dictionary values: ["zero", "one"] and indices [0,
1, 0]
+ ASSERT_EQ(ArrowArrayStartAppending(&array), NANOARROW_OK);
+ ASSERT_EQ(ArrowArrayAppendString(array.dictionary, "zero"_asv),
NANOARROW_OK);
+ ASSERT_EQ(ArrowArrayAppendString(array.dictionary, "one"_asv), NANOARROW_OK);
+ ASSERT_EQ(ArrowArrayAppendInt(&array, 0), NANOARROW_OK);
+ ASSERT_EQ(ArrowArrayAppendInt(&array, 1), NANOARROW_OK);
+ ASSERT_EQ(ArrowArrayAppendInt(&array, 0), NANOARROW_OK);
+ ASSERT_EQ(ArrowArrayFinishBuildingDefault(&array, &error), NANOARROW_OK);
+
+ // Valid indices should pass validation
+ ASSERT_EQ(ArrowArrayViewSetArray(&array_view, &array, &error), NANOARROW_OK);
+ EXPECT_EQ(ArrowArrayViewValidate(&array_view,
NANOARROW_VALIDATION_LEVEL_FULL, &error),
+ NANOARROW_OK);
+
+ // Now modify index to be out of bounds (index 2 when dictionary has length
2)
+ int32_t* indices = reinterpret_cast<int32_t*>(ArrowArrayBuffer(&array,
1)->data);
+ indices[1] = 2; // Out of bounds (valid range is 0-1)
+ ASSERT_EQ(ArrowArrayViewSetArray(&array_view, &array, &error), NANOARROW_OK);
+ EXPECT_EQ(ArrowArrayViewValidate(&array_view,
NANOARROW_VALIDATION_LEVEL_FULL, &error),
+ EINVAL);
+ EXPECT_STREQ(error.message,
+ "[1] Expected dictionary index >= 0 and < 2 but found value 2");
+
+ // Test negative index
+ indices[1] = -1;
+ ASSERT_EQ(ArrowArrayViewSetArray(&array_view, &array, &error), NANOARROW_OK);
+ EXPECT_EQ(ArrowArrayViewValidate(&array_view,
NANOARROW_VALIDATION_LEVEL_FULL, &error),
+ EINVAL);
+ EXPECT_STREQ(error.message,
+ "[1] Expected dictionary index >= 0 and < 2 but found value
-1");
+
+ ArrowArrayViewReset(&array_view);
+ ArrowSchemaRelease(&schema);
+ ArrowArrayRelease(&array);
+}
+
TEST(ArrayTest, ArrayTestInitFromSchema) {
struct ArrowArray array;
struct ArrowSchema schema;
diff --git a/src/nanoarrow/ipc/decoder.c b/src/nanoarrow/ipc/decoder.c
index 520219ed..4b0239c1 100644
--- a/src/nanoarrow/ipc/decoder.c
+++ b/src/nanoarrow/ipc/decoder.c
@@ -295,6 +295,44 @@ ArrowErrorCode ArrowIpcDictionaryEncodingsAppend(
return NANOARROW_OK;
}
+static ArrowErrorCode ArrowIpcDictionaryEncodingsAppendSchemaInternal(
+ struct ArrowIpcDictionaryEncodings* dictionary_encodings,
+ const struct ArrowSchema* schema, int64_t* next_id) {
+ if (schema->dictionary != NULL) {
+ struct ArrowIpcDictionaryEncoding encoding;
+ encoding.schema = schema;
+ encoding.id = (*next_id)++;
+ encoding.kind = NANOARROW_IPC_DICTIONARY_KIND_DENSE_ARRAY;
+ NANOARROW_RETURN_NOT_OK(
+ ArrowIpcDictionaryEncodingsAppend(dictionary_encodings, encoding));
+ }
+
+ for (int64_t i = 0; i < schema->n_children; i++) {
+ NANOARROW_DCHECK(schema->children != NULL && schema->children[i] != NULL);
+ NANOARROW_RETURN_NOT_OK(ArrowIpcDictionaryEncodingsAppendSchemaInternal(
+ dictionary_encodings, schema->children[i], next_id));
+ }
+
+ if (schema->dictionary != NULL) {
+ NANOARROW_RETURN_NOT_OK(ArrowIpcDictionaryEncodingsAppendSchemaInternal(
+ dictionary_encodings, schema->dictionary, next_id));
+ }
+
+ return NANOARROW_OK;
+}
+
+ArrowErrorCode ArrowIpcDictionaryEncodingsAppendSchema(
+ struct ArrowIpcDictionaryEncodings* dictionary_encodings,
+ const struct ArrowSchema* schema) {
+ NANOARROW_DCHECK(dictionary_encodings != NULL);
+
+ int64_t next_id = 0;
+ NANOARROW_RETURN_NOT_OK(ArrowIpcDictionaryEncodingsAppendSchemaInternal(
+ dictionary_encodings, schema, &next_id));
+
+ return NANOARROW_OK;
+}
+
const struct ArrowIpcDictionaryEncoding* ArrowIpcDictionaryEncodingsFind(
const struct ArrowIpcDictionaryEncodings* dictionary_encodings,
const struct ArrowSchema* schema) {
@@ -414,12 +452,13 @@ static ArrowErrorCode ArrowIpcDictionaryReplace(struct
ArrowIpcDictionary* dicti
static ArrowErrorCode ArrowIpcDictionaryAppend(struct ArrowIpcDictionary*
dictionary,
struct ArrowArray* value,
struct ArrowError* error) {
- if (dictionary->current_value.release != NULL) {
+ if (dictionary->current_value.release != NULL &&
+ dictionary->current_value.length != 0) {
ArrowErrorSet(error, "Dictionary concatenation is not yet supported");
return ENOTSUP;
}
- ArrowArrayMove(value, &dictionary->current_value);
+ NANOARROW_RETURN_NOT_OK(ArrowIpcDictionaryReplace(dictionary, value, error));
return NANOARROW_OK;
}
@@ -492,7 +531,24 @@ static ArrowErrorCode ArrowIpcDictionariesInitDictionaries(
return result;
}
+ // Set the ID
dictionary->id = unique_ids[i];
+
+ // Set the initial array value to a valid array with zero length. This is
+ // needed because empty and/or all null columns may not have a dictionary
+ // message emitted before a record batch arrives.
+ result = ArrowArrayInitFromSchema(&dictionary->current_value,
+ encoding->schema->dictionary, error);
+ if (result != NANOARROW_OK) {
+ *num_initialized_decoders_out = i + 1;
+ return result;
+ }
+
+ result = ArrowArrayFinishBuildingDefault(&dictionary->current_value,
error);
+ if (result != NANOARROW_OK) {
+ *num_initialized_decoders_out = i + 1;
+ return result;
+ }
}
*num_initialized_decoders_out = private_data->num_dictionaries;
@@ -1203,78 +1259,15 @@ static int ArrowIpcDecoderSetType(struct ArrowSchema*
schema, ns(Field_table_t)
}
}
-// A fun corner case when decoding dictionaries: the extension metadata lives
with
-// the dictionary (i.e., the non-index type); however, the field metadata still
-// needs to exist on the field.
-static int ArrowIpcMoveNonExtensionFieldMetadataBackToFieldIfNeeded(
- struct ArrowSchema* schema) {
+// When decoding dictionaries, we move the value type to the schema->dictionary
+// member, but the field metadata must be moved back to the field because IPC
+// has no concept of dictionary (value type) metadata, not even extension metadata.
+// https://github.com/apache/arrow/issues/49704
+static int ArrowIpcMoveDictionaryMetadataBackToField(struct ArrowSchema*
schema) {
NANOARROW_DCHECK(schema->dictionary != NULL);
- struct ArrowMetadataReader reader;
- NANOARROW_RETURN_NOT_OK(ArrowMetadataReaderInit(&reader,
schema->dictionary->metadata));
-
- // For the most common case (no metadata), nothing needs to be done here
- if (reader.remaining_keys == 0) {
- return NANOARROW_OK;
- }
-
- struct ArrowBuffer field_metadata;
- struct ArrowBuffer extension_metadata;
- NANOARROW_RETURN_NOT_OK(ArrowMetadataBuilderInit(&field_metadata, NULL));
- ArrowErrorCode result = ArrowMetadataBuilderInit(&extension_metadata, NULL);
- if (result != NANOARROW_OK) {
- ArrowBufferReset(&field_metadata);
- return result;
- }
-
- const struct ArrowStringView extension_name_key =
ArrowCharView("ARROW:extension:name");
- const struct ArrowStringView extension_metadata_key =
- ArrowCharView("ARROW:extension:metadata");
-
- struct ArrowStringView key;
- struct ArrowStringView value;
- while (reader.remaining_keys > 0) {
- result = ArrowMetadataReaderRead(&reader, &key, &value);
- if (result != NANOARROW_OK) {
- ArrowBufferReset(&field_metadata);
- ArrowBufferReset(&extension_metadata);
- return result;
- }
-
- int key_is_extension_name =
- key.size_bytes == extension_name_key.size_bytes &&
- strncmp(key.data, extension_name_key.data, key.size_bytes) == 0;
- int key_is_extension_metadata =
- key.size_bytes == extension_metadata_key.size_bytes &&
- strncmp(key.data, extension_metadata_key.data, key.size_bytes) == 0;
- if (!key_is_extension_name && !key_is_extension_metadata) {
- result = ArrowMetadataBuilderAppend(&field_metadata, key, value);
- if (result != NANOARROW_OK) {
- ArrowBufferReset(&field_metadata);
- ArrowBufferReset(&extension_metadata);
- return result;
- }
- } else {
- result = ArrowMetadataBuilderAppend(&extension_metadata, key, value);
- if (result != NANOARROW_OK) {
- ArrowBufferReset(&field_metadata);
- ArrowBufferReset(&extension_metadata);
- return result;
- }
- }
- }
-
- result = ArrowSchemaSetMetadata(schema, (char*)field_metadata.data);
- if (result != NANOARROW_OK) {
- ArrowBufferReset(&field_metadata);
- ArrowBufferReset(&extension_metadata);
- return result;
- }
-
- result = ArrowSchemaSetMetadata(schema->dictionary,
(char*)extension_metadata.data);
- ArrowBufferReset(&field_metadata);
- ArrowBufferReset(&extension_metadata);
-
- return result;
+ NANOARROW_RETURN_NOT_OK(ArrowSchemaSetMetadata(schema,
schema->dictionary->metadata));
+ NANOARROW_RETURN_NOT_OK(ArrowSchemaSetMetadata(schema->dictionary, NULL));
+ return NANOARROW_OK;
}
static int ArrowIpcSetDictionaryEncoding(
@@ -1313,10 +1306,9 @@ static int ArrowIpcSetDictionaryEncoding(
schema->flags |= ARROW_FLAG_DICTIONARY_ORDERED;
}
- // Field metadata should stay with the field; however, we need the extension
metadata
- // to stay with the dictionary.
- NANOARROW_RETURN_NOT_OK_WITH_ERROR(
- ArrowIpcMoveNonExtensionFieldMetadataBackToFieldIfNeeded(schema), error);
+ // Sort out field metadata between the schema and the dictionary member
+
NANOARROW_RETURN_NOT_OK_WITH_ERROR(ArrowIpcMoveDictionaryMetadataBackToField(schema),
+ error);
// Track the identifier if we have a dictionaries object in which to track it
if (dictionaries != NULL) {
@@ -2354,6 +2346,7 @@ struct ArrowIpcArraySetter {
struct ArrowIpcBufferSource src;
struct ArrowIpcBufferFactory factory;
enum ArrowIpcMetadataVersion version;
+ struct ArrowIpcDictionaries* dictionaries;
};
static int ArrowIpcDecoderMakeBuffer(struct ArrowIpcArraySetter* setter,
int64_t offset,
@@ -2420,6 +2413,12 @@ static int ArrowIpcDecoderWalkGetArray(struct
ArrowArrayView* array_view,
array_view->children[i], array->children[i], out->children[i], error));
}
+ if (array_view->dictionary != NULL) {
+ // TODO: this currently copies the array for every output.
+ NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderWalkGetArray(
+ array_view->dictionary, array->dictionary, out->dictionary, error));
+ }
+
return NANOARROW_OK;
}
@@ -2430,10 +2429,32 @@ static int ArrowIpcDecoderWalkSetArrayView(struct
ArrowIpcDecoder* decoder,
struct ArrowError* error) {
struct ArrowIpcDecoderPrivate* private_data =
(struct ArrowIpcDecoderPrivate*)decoder->private_data;
- struct ArrowIpcField* ipc_field = private_data->fields + setter->field_i;
+
+ // setter->field_i indexes the flatbuffer FieldNode vector (which excludes
the root
+ // struct), but private_data->fields includes the root at index 0, so add 1.
+ struct ArrowIpcField* ipc_field = private_data->fields + setter->field_i + 1;
if (ipc_field->dictionary_id != NANOARROW_IPC_NO_DICTIONARY_ID) {
- ArrowErrorSet(error, "Decoding a dictionary-encoding field is not
supported");
- return ENOTSUP;
+ if (setter->dictionaries == NULL) {
+ ArrowErrorSet(
+ error, "Can't decode a dictionary-encoded field without
ArrowIpcDictionaries");
+ return ENOTSUP;
+ }
+
+ const struct ArrowArray* dictionary;
+ NANOARROW_RETURN_NOT_OK(ArrowIpcDictionariesFindCurrentValue(
+ setter->dictionaries, ipc_field->dictionary_id, &dictionary, error));
+
+ if (dictionary->release == NULL) {
+ ArrowErrorSet(error, "Dictionary with ID %" PRId64 " is marked as
released",
+ ipc_field->dictionary_id);
+ return EINVAL;
+ }
+
+ // Set the dictionary array view from the value. We may be able to skip
this
+ // if we can somehow detect that the dictionary hasn't changed since the
last
+ // decode.
+ NANOARROW_RETURN_NOT_OK(
+ ArrowArrayViewSetArray(array_view->dictionary, dictionary, error));
}
ns(FieldNode_struct_t) field =
@@ -2538,7 +2559,8 @@ static ArrowErrorCode ArrowIpcDecoderDecodeArrayInternal(
static ArrowErrorCode ArrowIpcDecoderDecodeArrayViewInternal(
struct ArrowIpcDecoder* decoder, struct ArrowIpcBufferFactory factory,
- int64_t field_i, struct ArrowArrayView** out_view, struct ArrowError*
error) {
+ int64_t field_i, struct ArrowIpcDictionaries* dictionaries,
+ struct ArrowArrayView** out_view, struct ArrowError* error) {
struct ArrowIpcDecoderPrivate* private_data =
(struct ArrowIpcDecoderPrivate*)decoder->private_data;
@@ -2563,6 +2585,7 @@ static ArrowErrorCode
ArrowIpcDecoderDecodeArrayViewInternal(
setter.src.codec = decoder->codec;
setter.src.swap_endian = ArrowIpcDecoderNeedsSwapEndian(decoder);
setter.version = decoder->metadata_version;
+ setter.dictionaries = dictionaries;
// If we are going to need a decompressor here, ensure the default one is
// initialized.
@@ -2600,10 +2623,10 @@ static ArrowErrorCode
ArrowIpcDecoderDecodeArrayViewInternal(
return NANOARROW_OK;
}
-ArrowErrorCode ArrowIpcDecoderDecodeArrayView(struct ArrowIpcDecoder* decoder,
- struct ArrowBufferView body,
int64_t i,
- struct ArrowArrayView** out,
- struct ArrowError* error) {
+NANOARROW_DLL ArrowErrorCode ArrowIpcDecoderDecodeArrayViewWithDictionaries(
+ struct ArrowIpcDecoder* decoder, struct ArrowBufferView body, int64_t i,
+ struct ArrowIpcDictionaries* dictionaries, struct ArrowArrayView** out,
+ struct ArrowError* error) {
struct ArrowIpcDecoderPrivate* private_data =
(struct ArrowIpcDecoderPrivate*)decoder->private_data;
if (private_data->last_message == NULL ||
@@ -2613,14 +2636,21 @@ ArrowErrorCode ArrowIpcDecoderDecodeArrayView(struct
ArrowIpcDecoder* decoder,
}
return ArrowIpcDecoderDecodeArrayViewInternal(
- decoder, ArrowIpcBufferFactoryFromView(&body), i, out, error);
+ decoder, ArrowIpcBufferFactoryFromView(&body), i, dictionaries, out,
error);
}
-ArrowErrorCode ArrowIpcDecoderDecodeArray(struct ArrowIpcDecoder* decoder,
- struct ArrowBufferView body, int64_t
i,
- struct ArrowArray* out,
- enum ArrowValidationLevel
validation_level,
- struct ArrowError* error) {
+ArrowErrorCode ArrowIpcDecoderDecodeArrayView(struct ArrowIpcDecoder* decoder,
+ struct ArrowBufferView body,
int64_t i,
+ struct ArrowArrayView** out,
+ struct ArrowError* error) {
+ return ArrowIpcDecoderDecodeArrayViewWithDictionaries(decoder, body, i,
NULL, out,
+ error);
+}
+
+NANOARROW_DLL ArrowErrorCode ArrowIpcDecoderDecodeArrayWithDictionaries(
+ struct ArrowIpcDecoder* decoder, struct ArrowBufferView body, int64_t i,
+ struct ArrowIpcDictionaries* dictionaries, struct ArrowArray* out,
+ enum ArrowValidationLevel validation_level, struct ArrowError* error) {
struct ArrowIpcDecoderPrivate* private_data =
(struct ArrowIpcDecoderPrivate*)decoder->private_data;
if (private_data->last_message == NULL ||
@@ -2631,7 +2661,8 @@ ArrowErrorCode ArrowIpcDecoderDecodeArray(struct
ArrowIpcDecoder* decoder,
struct ArrowArrayView* array_view;
NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderDecodeArrayViewInternal(
- decoder, ArrowIpcBufferFactoryFromView(&body), i, &array_view, error));
+ decoder, ArrowIpcBufferFactoryFromView(&body), i, dictionaries,
&array_view,
+ error));
NANOARROW_RETURN_NOT_OK(ArrowArrayViewValidate(array_view, validation_level,
error));
@@ -2649,13 +2680,23 @@ ArrowErrorCode ArrowIpcDecoderDecodeArray(struct
ArrowIpcDecoder* decoder,
return NANOARROW_OK;
}
-ArrowErrorCode ArrowIpcDecoderDecodeArrayFromShared(
+ArrowErrorCode ArrowIpcDecoderDecodeArray(struct ArrowIpcDecoder* decoder,
+ struct ArrowBufferView body, int64_t
i,
+ struct ArrowArray* out,
+ enum ArrowValidationLevel
validation_level,
+ struct ArrowError* error) {
+ return ArrowIpcDecoderDecodeArrayWithDictionaries(decoder, body, i, NULL,
out,
+ validation_level, error);
+}
+
+ArrowErrorCode ArrowIpcDecoderDecodeArrayFromSharedWithDictionaries(
struct ArrowIpcDecoder* decoder, struct ArrowIpcSharedBuffer* body,
int64_t i,
- struct ArrowArray* out, enum ArrowValidationLevel validation_level,
- struct ArrowError* error) {
+ struct ArrowIpcDictionaries* dictionaries, struct ArrowArray* out,
+ enum ArrowValidationLevel validation_level, struct ArrowError* error) {
struct ArrowArrayView* array_view;
NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderDecodeArrayViewInternal(
- decoder, ArrowIpcBufferFactoryFromShared(body), i, &array_view, error));
+ decoder, ArrowIpcBufferFactoryFromShared(body), i, dictionaries,
&array_view,
+ error));
NANOARROW_RETURN_NOT_OK(ArrowArrayViewValidate(array_view, validation_level,
error));
@@ -2673,14 +2714,18 @@ ArrowErrorCode ArrowIpcDecoderDecodeArrayFromShared(
return NANOARROW_OK;
}
-NANOARROW_DLL ArrowErrorCode ArrowIpcDecoderDecodeDictionary(
- struct ArrowIpcDecoder* decoder, struct ArrowIpcSharedBuffer* shared,
- enum ArrowValidationLevel validation_level, struct ArrowIpcDictionaries*
dictionaries,
+ArrowErrorCode ArrowIpcDecoderDecodeArrayFromShared(
+ struct ArrowIpcDecoder* decoder, struct ArrowIpcSharedBuffer* body,
int64_t i,
+ struct ArrowArray* out, enum ArrowValidationLevel validation_level,
struct ArrowError* error) {
- NANOARROW_DCHECK(decoder != NULL);
- NANOARROW_DCHECK(shared != NULL);
- NANOARROW_DCHECK(dictionaries != NULL);
+ return ArrowIpcDecoderDecodeArrayFromSharedWithDictionaries(decoder, body,
i, NULL, out,
+
validation_level, error);
+}
+static ArrowErrorCode ArrowIpcDecoderDecodeDictionaryInternal(
+ struct ArrowIpcDecoder* decoder, struct ArrowIpcBufferFactory factory,
+ enum ArrowValidationLevel validation_level, struct ArrowIpcDictionaries*
dictionaries,
+ struct ArrowError* error) {
struct ArrowIpcDecoderPrivate* private_data =
(struct ArrowIpcDecoderPrivate*)decoder->private_data;
@@ -2709,15 +2754,26 @@ NANOARROW_DLL ArrowErrorCode
ArrowIpcDecoderDecodeDictionary(
(struct ArrowIpcDecoderPrivate*)dictionary->decoder.private_data;
dictionary->decoder.message_type = NANOARROW_IPC_MESSAGE_TYPE_RECORD_BATCH;
dictionary_decoder_private_data->last_message = record_batch;
+ // Transfer the endianness setting so that buffers are byte-swapped if needed
+ dictionary_decoder_private_data->endianness = private_data->endianness;
- struct ArrowArray tmp;
+ struct ArrowArrayView* array_view;
+ NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderDecodeArrayViewInternal(
+ &dictionary->decoder, factory, 0, dictionaries, &array_view, error));
- // TODO: provide ArrowIpcDecoderDecodeArrayInternalWithDictionaries to
handle nested
- // dictionaries
- NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderDecodeArrayFromShared(
- &dictionary->decoder, shared, 0, &tmp, validation_level, error));
+ NANOARROW_RETURN_NOT_OK(ArrowArrayViewValidate(array_view, validation_level,
error));
+
+ struct ArrowArray tmp;
+ tmp.release = NULL;
+ int result = ArrowIpcDecoderDecodeArrayInternal(&dictionary->decoder, 0,
&tmp,
+ validation_level, error);
+ if (result != NANOARROW_OK && tmp.release != NULL) {
+ ArrowArrayRelease(&tmp);
+ return result;
+ } else if (result != NANOARROW_OK) {
+ return result;
+ }
- ArrowErrorCode result;
if (decoder->dictionary->is_delta) {
result = ArrowIpcDictionaryAppend(dictionary, &tmp, error);
} else {
@@ -2731,3 +2787,28 @@ NANOARROW_DLL ArrowErrorCode
ArrowIpcDecoderDecodeDictionary(
return NANOARROW_OK;
}
+
+NANOARROW_DLL ArrowErrorCode ArrowIpcDecoderDecodeDictionary(
+ struct ArrowIpcDecoder* decoder, struct ArrowBufferView body,
+ enum ArrowValidationLevel validation_level, struct ArrowIpcDictionaries*
dictionaries,
+ struct ArrowError* error) {
+ NANOARROW_DCHECK(decoder != NULL);
+ NANOARROW_DCHECK(dictionaries != NULL);
+
+ return ArrowIpcDecoderDecodeDictionaryInternal(decoder,
+
ArrowIpcBufferFactoryFromView(&body),
+ validation_level,
dictionaries, error);
+}
+
+NANOARROW_DLL ArrowErrorCode ArrowIpcDecoderDecodeDictionaryFromShared(
+ struct ArrowIpcDecoder* decoder, struct ArrowIpcSharedBuffer* shared,
+ enum ArrowValidationLevel validation_level, struct ArrowIpcDictionaries*
dictionaries,
+ struct ArrowError* error) {
+ NANOARROW_DCHECK(decoder != NULL);
+ NANOARROW_DCHECK(shared != NULL);
+ NANOARROW_DCHECK(dictionaries != NULL);
+
+ return ArrowIpcDecoderDecodeDictionaryInternal(decoder,
+
ArrowIpcBufferFactoryFromShared(shared),
+ validation_level,
dictionaries, error);
+}
diff --git a/src/nanoarrow/ipc/decoder_test.cc
b/src/nanoarrow/ipc/decoder_test.cc
index b6aeddb7..2143b610 100644
--- a/src/nanoarrow/ipc/decoder_test.cc
+++ b/src/nanoarrow/ipc/decoder_test.cc
@@ -208,6 +208,19 @@ alignas(8) static uint8_t kDictionaryBatch[] = {
0x6f, 0x6e, 0x65, 0x74, 0x77, 0x6f, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
};
+alignas(8) static uint8_t kDictionaryRecordBatch[] = {
+ 0xff, 0xff, 0xff, 0xff, 0x88, 0x00, 0x00, 0x00, 0x14, 0x00, 0x00, 0x00,
0x00, 0x00,
+ 0x00, 0x00, 0x0c, 0x00, 0x16, 0x00, 0x06, 0x00, 0x05, 0x00, 0x08, 0x00,
0x0c, 0x00,
+ 0x0c, 0x00, 0x00, 0x00, 0x00, 0x03, 0x04, 0x00, 0x18, 0x00, 0x00, 0x00,
0x08, 0x00,
+ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0a, 0x00, 0x18, 0x00,
0x0c, 0x00,
+ 0x04, 0x00, 0x08, 0x00, 0x0a, 0x00, 0x00, 0x00, 0x3c, 0x00, 0x00, 0x00,
0x10, 0x00,
+ 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00,
+ 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00,
+ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00,
+ 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x01, 0x00,
+ 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00,
+ 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00};
+
TEST(NanoarrowIpcTest, NanoarrowIpcCheckHeader) {
struct ArrowIpcDecoder decoder;
struct ArrowError error;
@@ -698,7 +711,7 @@ TEST(NanoarrowIpcTest,
NanoarrowIpcDecodeDictionaryBatchDecode) {
// Make a dictionary encoded schema that matches that of the dictionary
example batch
ArrowSchemaInit(&schema);
ASSERT_EQ(ArrowSchemaSetTypeStruct(&schema, 1), NANOARROW_OK);
- ASSERT_EQ(ArrowSchemaSetType(schema.children[0], NANOARROW_TYPE_INT32),
NANOARROW_OK);
+ ASSERT_EQ(ArrowSchemaSetType(schema.children[0], NANOARROW_TYPE_INT8),
NANOARROW_OK);
ASSERT_EQ(ArrowSchemaAllocateDictionary(schema.children[0]), NANOARROW_OK);
ASSERT_EQ(
ArrowSchemaInitFromType(schema.children[0]->dictionary,
NANOARROW_TYPE_STRING),
@@ -721,11 +734,12 @@ TEST(NanoarrowIpcTest,
NanoarrowIpcDecodeDictionaryBatchDecode) {
// Check that we can't decode a dictionary batch if we haven't read a
dictionary batch
// message
ASSERT_EQ(ArrowIpcDecoderInit(&decoder), NANOARROW_OK);
- struct ArrowIpcSharedBuffer shared;
- ASSERT_EQ(
- ArrowIpcDecoderDecodeDictionary(&decoder, &shared,
NANOARROW_VALIDATION_LEVEL_FULL,
- &dictionaries, &error),
- EINVAL);
+ struct ArrowBufferView body;
+ body.data.data = nullptr;
+ body.size_bytes = 0;
+ ASSERT_EQ(ArrowIpcDecoderDecodeDictionary(
+ &decoder, body, NANOARROW_VALIDATION_LEVEL_FULL,
&dictionaries, &error),
+ EINVAL);
ASSERT_STREQ(error.message, "decoder did not just decode a DictionaryBatch
message");
// Decode a dictionary batch and inspect metadata
@@ -741,17 +755,12 @@ TEST(NanoarrowIpcTest,
NanoarrowIpcDecodeDictionaryBatchDecode) {
EXPECT_FALSE(decoder.dictionary->is_delta);
// Decode the dictionary batch
- data.data.as_uint8 += decoder.header_size_bytes;
- data.size_bytes = decoder.body_size_bytes;
- struct ArrowBuffer body;
- ArrowBufferInit(&body);
- ASSERT_EQ(ArrowBufferAppendBufferView(&body, data), NANOARROW_OK);
+ body.data.as_uint8 = data.data.as_uint8 + decoder.header_size_bytes;
+ body.size_bytes = decoder.body_size_bytes;
- ASSERT_EQ(ArrowIpcSharedBufferInit(&shared, &body), NANOARROW_OK);
- ASSERT_EQ(
- ArrowIpcDecoderDecodeDictionary(&decoder, &shared,
NANOARROW_VALIDATION_LEVEL_FULL,
- &dictionaries, &error),
- NANOARROW_OK);
+ ASSERT_EQ(ArrowIpcDecoderDecodeDictionary(
+ &decoder, body, NANOARROW_VALIDATION_LEVEL_FULL,
&dictionaries, &error),
+ NANOARROW_OK);
// If we find the current value of the dictionary we should get the correct
array
const struct ArrowArray* dictionary_value;
@@ -775,10 +784,9 @@ TEST(NanoarrowIpcTest,
NanoarrowIpcDecodeDictionaryBatchDecode) {
// If we try to decode the dictionary again it should succeed (because the
dictionary
// is in replacement mode)
- ASSERT_EQ(
- ArrowIpcDecoderDecodeDictionary(&decoder, &shared,
NANOARROW_VALIDATION_LEVEL_FULL,
- &dictionaries, &error),
- NANOARROW_OK);
+ ASSERT_EQ(ArrowIpcDecoderDecodeDictionary(
+ &decoder, body, NANOARROW_VALIDATION_LEVEL_FULL,
&dictionaries, &error),
+ NANOARROW_OK);
ASSERT_EQ(ArrowArrayViewSetArray(&array_view, dictionary_value, &error),
NANOARROW_OK);
ASSERT_EQ(array_view.length, 3);
@@ -788,14 +796,75 @@ TEST(NanoarrowIpcTest,
NanoarrowIpcDecodeDictionaryBatchDecode) {
// If we try to decode a delta dictionary, we should fail with a reasonable
message
const_cast<struct ArrowIpcDictionaryBatch*>(decoder.dictionary)->is_delta =
1;
- ASSERT_EQ(
- ArrowIpcDecoderDecodeDictionary(&decoder, &shared,
NANOARROW_VALIDATION_LEVEL_FULL,
- &dictionaries, &error),
- ENOTSUP);
+ ASSERT_EQ(ArrowIpcDecoderDecodeDictionary(
+ &decoder, body, NANOARROW_VALIDATION_LEVEL_FULL,
&dictionaries, &error),
+ ENOTSUP);
ASSERT_STREQ(error.message, "Dictionary concatenation is not yet supported");
+ // After all of this, we should be able to actually decode a RecordBatch
+ ASSERT_EQ(ArrowIpcDecoderSetSchemaWithDictionaries(&decoder, &schema,
+ &dictionary_encodings,
&error),
+ NANOARROW_OK);
+ data.data.data = kDictionaryRecordBatch;
+ data.size_bytes = sizeof(kDictionaryRecordBatch);
+ ASSERT_EQ(ArrowIpcDecoderDecodeHeader(&decoder, data, &error), NANOARROW_OK);
+ data.data.as_uint8 += decoder.header_size_bytes;
+ data.size_bytes -= decoder.header_size_bytes;
+
+ // Decode the entire batch and check the dictionary
+ struct ArrowArrayView* batch_view;
+ ASSERT_EQ(ArrowIpcDecoderDecodeArrayViewWithDictionaries(
+ &decoder, data, -1, &dictionaries, &batch_view, &error),
+ NANOARROW_OK)
+ << error.message;
+
+ ASSERT_NE(batch_view->children[0]->dictionary, nullptr);
+ ASSERT_EQ(batch_view->children[0]->dictionary->length, 3);
+ ASSERT_EQ(ArrowArrayViewGetStringUnsafe(batch_view->children[0]->dictionary,
0),
+ "zero"_asv);
+
+ // Decode the specific column and check the dictionary
+ struct ArrowArrayView* column_view;
+ ASSERT_EQ(ArrowIpcDecoderDecodeArrayViewWithDictionaries(
+ &decoder, data, 0, &dictionaries, &column_view, &error),
+ NANOARROW_OK)
+ << error.message;
+
+ ASSERT_NE(column_view->dictionary, nullptr);
+ ASSERT_EQ(column_view->dictionary->length, 3);
+ ASSERT_EQ(ArrowArrayViewGetStringUnsafe(column_view->dictionary, 0),
"zero"_asv);
+
+ // Decode the array from the ArrowBufferView
+ struct ArrowArray batch;
+ ASSERT_EQ(ArrowIpcDecoderDecodeArrayWithDictionaries(
+ &decoder, data, -1, &dictionaries, &batch,
+ NANOARROW_VALIDATION_LEVEL_FULL, &error),
+ NANOARROW_OK)
+ << error.message;
+ ASSERT_NE(batch.children[0]->dictionary, nullptr);
+ ASSERT_EQ(batch.children[0]->dictionary->length, 3);
+ ArrowArrayRelease(&batch);
+
+ // Decode the array from a shared buffer
+ struct ArrowBuffer record_batch_body;
+ ArrowBufferInit(&record_batch_body);
+ ASSERT_EQ(ArrowBufferAppendBufferView(&record_batch_body, data),
NANOARROW_OK);
+
+ struct ArrowIpcSharedBuffer record_batch_shared;
+ ASSERT_EQ(ArrowIpcSharedBufferInit(&record_batch_shared, &record_batch_body),
+ NANOARROW_OK);
+
+ ASSERT_EQ(ArrowIpcDecoderDecodeArrayFromSharedWithDictionaries(
+ &decoder, &record_batch_shared, -1, &dictionaries, &batch,
+ NANOARROW_VALIDATION_LEVEL_FULL, &error),
+ NANOARROW_OK)
+ << error.message;
+ ASSERT_NE(batch.children[0]->dictionary, nullptr);
+ ASSERT_EQ(batch.children[0]->dictionary->length, 3);
+ ArrowArrayRelease(&batch);
+ ArrowIpcSharedBufferReset(&record_batch_shared);
+
ArrowArrayViewReset(&array_view);
- ArrowIpcSharedBufferReset(&shared);
ArrowIpcDictionariesReset(&dictionaries);
ArrowIpcDictionaryEncodingsReset(&dictionary_encodings);
ArrowSchemaRelease(&schema);
@@ -1140,7 +1209,14 @@ std::string ArrowSchemaToString(const struct
ArrowSchema* schema) {
#if defined(NANOARROW_BUILD_TESTS_WITH_ARROW)
TEST_P(ArrowTypeParameterizedTestFixture, NanoarrowIpcNanoarrowTypeRoundtrip) {
if (GetParam()->id() == arrow::Type::DICTIONARY) {
- GTEST_SKIP() << "Dictionary array decode is not yet supported";
+ GTEST_SKIP() << "Dictionary array encode is not yet supported";
+ }
+
+ if (GetParam()->id() == arrow::Type::EXTENSION &&
+
std::static_pointer_cast<arrow::ExtensionType>(GetParam())->storage_type()->id()
==
+ arrow::Type::DICTIONARY) {
+ GTEST_SKIP()
+ << "nanoarrow encoder cannot yet encode extension types with
dictionary storage";
}
nanoarrow::UniqueSchema schema;
@@ -1182,16 +1258,20 @@ TEST_P(ArrowTypeParameterizedTestFixture,
NanoarrowIpcNanoarrowTypeRoundtrip) {
#if defined(NANOARROW_BUILD_TESTS_WITH_ARROW)
TEST_P(ArrowTypeParameterizedTestFixture, NanoarrowIpcArrowArrayRoundtrip) {
- if (GetParam()->id() == arrow::Type::DICTIONARY) {
- GTEST_SKIP() << "Dictionary array decode is not yet supported";
+ const std::shared_ptr<arrow::DataType>& data_type = GetParam();
+
+ if (data_type->id() == arrow::Type::DICTIONARY &&
+
std::static_pointer_cast<arrow::DictionaryType>(data_type)->value_type()->id()
==
+ Type::EXTENSION) {
+ GTEST_SKIP()
+ << "Arrow C++ MakeEmpty() doesn't support dictionary with extension
value types";
}
- const std::shared_ptr<arrow::DataType>& data_type = GetParam();
std::shared_ptr<arrow::Schema> dummy_schema =
arrow::schema({arrow::field("dummy_name", data_type)});
auto maybe_empty = arrow::RecordBatch::MakeEmpty(dummy_schema);
- ASSERT_TRUE(maybe_empty.ok());
+ ASSERT_TRUE(maybe_empty.ok()) << maybe_empty.status();
auto empty = maybe_empty.ValueUnsafe();
auto maybe_nulls_array = arrow::MakeArrayOfNull(data_type, 3);
@@ -1206,10 +1286,24 @@ TEST_P(ArrowTypeParameterizedTestFixture,
NanoarrowIpcArrowArrayRoundtrip) {
struct ArrowBufferView buffer_view;
struct ArrowArray array;
- // Initialize the decoder
+ // Initialize the schema
ASSERT_TRUE(arrow::ExportSchema(*dummy_schema, &schema).ok());
+
+ // Initialize the dictionaries
+ struct ArrowIpcDictionaryEncodings dictionary_encodings;
+ struct ArrowIpcDictionaries dictionaries;
+ ArrowIpcDictionaryEncodingsInit(&dictionary_encodings);
+ ASSERT_EQ(ArrowIpcDictionaryEncodingsAppendSchema(&dictionary_encodings,
&schema),
+ NANOARROW_OK);
+ ASSERT_EQ(ArrowIpcDictionariesInit(&dictionaries, &dictionary_encodings,
nullptr),
+ NANOARROW_OK);
+
+ // Initialize the decoder
ArrowIpcDecoderInit(&decoder);
- ASSERT_EQ(ArrowIpcDecoderSetSchema(&decoder, &schema, nullptr),
NANOARROW_OK);
+ ASSERT_EQ(ArrowIpcDecoderSetSchemaWithDictionaries(&decoder, &schema,
+ &dictionary_encodings,
nullptr),
+ NANOARROW_OK);
+ ArrowIpcDictionaryEncodingsReset(&dictionary_encodings);
// Check the empty array
auto maybe_serialized = arrow::ipc::SerializeRecordBatch(*empty, options);
@@ -1220,14 +1314,22 @@ TEST_P(ArrowTypeParameterizedTestFixture,
NanoarrowIpcArrowArrayRoundtrip) {
ASSERT_EQ(ArrowIpcDecoderDecodeHeader(&decoder, buffer_view, nullptr),
NANOARROW_OK);
buffer_view.data.as_uint8 += decoder.header_size_bytes;
buffer_view.size_bytes -= decoder.header_size_bytes;
- ASSERT_EQ(ArrowIpcDecoderDecodeArray(&decoder, buffer_view, -1, &array,
- NANOARROW_VALIDATION_LEVEL_FULL,
nullptr),
+ ASSERT_EQ(ArrowIpcDecoderDecodeArrayWithDictionaries(
+ &decoder, buffer_view, -1, &dictionaries, &array,
+ NANOARROW_VALIDATION_LEVEL_FULL, nullptr),
NANOARROW_OK);
auto maybe_batch = arrow::ImportRecordBatch(&array, dummy_schema);
ASSERT_TRUE(maybe_batch.ok());
EXPECT_EQ(maybe_batch.ValueUnsafe()->ToString(), empty->ToString());
- EXPECT_TRUE(maybe_batch.ValueUnsafe()->Equals(*empty));
+
+ // Arrow C++ MakeEmpty() loses the ordered=1 flag for dictionary types.
+ // https://github.com/apache/arrow/issues/49674
+ // So for ordered dictionaries, we only check ToString() equality for empty
batches.
+ if (data_type->id() != arrow::Type::DICTIONARY ||
+ !std::static_pointer_cast<arrow::DictionaryType>(data_type)->ordered()) {
+ EXPECT_TRUE(maybe_batch.ValueUnsafe()->Equals(*empty)) <<
empty->ToString();
+ }
// Check the array with 3 null values
maybe_serialized = arrow::ipc::SerializeRecordBatch(*nulls, options);
@@ -1238,8 +1340,9 @@ TEST_P(ArrowTypeParameterizedTestFixture,
NanoarrowIpcArrowArrayRoundtrip) {
ASSERT_EQ(ArrowIpcDecoderDecodeHeader(&decoder, buffer_view, nullptr),
NANOARROW_OK);
buffer_view.data.as_uint8 += decoder.header_size_bytes;
buffer_view.size_bytes -= decoder.header_size_bytes;
- ASSERT_EQ(ArrowIpcDecoderDecodeArray(&decoder, buffer_view, -1, &array,
- NANOARROW_VALIDATION_LEVEL_FULL,
nullptr),
+ ASSERT_EQ(ArrowIpcDecoderDecodeArrayWithDictionaries(
+ &decoder, buffer_view, -1, &dictionaries, &array,
+ NANOARROW_VALIDATION_LEVEL_FULL, nullptr),
NANOARROW_OK);
maybe_batch = arrow::ImportRecordBatch(&array, dummy_schema);
@@ -1248,6 +1351,7 @@ TEST_P(ArrowTypeParameterizedTestFixture,
NanoarrowIpcArrowArrayRoundtrip) {
EXPECT_TRUE(maybe_batch.ValueUnsafe()->Equals(*nulls));
ArrowSchemaRelease(&schema);
+ ArrowIpcDictionariesReset(&dictionaries);
ArrowIpcDecoderReset(&decoder);
}
#endif
@@ -1281,6 +1385,13 @@ TEST_P(ArrowTypeParameterizedTestFixture,
NanoarrowIpcNanoarrowArrayRoundtrip) {
GTEST_SKIP() << "nanoarrow encoder cannot yet encode dictionaries";
}
+ if (GetParam()->id() == arrow::Type::EXTENSION &&
+
std::static_pointer_cast<arrow::ExtensionType>(GetParam())->storage_type()->id()
==
+ arrow::Type::DICTIONARY) {
+ GTEST_SKIP()
+ << "nanoarrow encoder cannot yet encode extension types with
dictionary storage";
+ }
+
struct ArrowError error;
nanoarrow::UniqueSchema schema;
ASSERT_TRUE(
@@ -1334,6 +1445,43 @@ TEST_P(ArrowTypeParameterizedTestFixture,
NanoarrowIpcNanoarrowArrayRoundtrip) {
}
}
+// Extension type with dictionary storage for testing
+class DictExtensionType : public ExtensionType {
+ public:
+ explicit DictExtensionType() : ExtensionType(dictionary(int32(), utf8())) {}
+
+ std::string extension_name() const override { return "dict-extension"; }
+
+ bool ExtensionEquals(const ExtensionType& other) const override {
+ return other.extension_name() == extension_name();
+ }
+
+ std::shared_ptr<Array> MakeArray(std::shared_ptr<ArrayData> data) const
override {
+ return std::make_shared<ExtensionArray>(data);
+ }
+
+ Result<std::shared_ptr<DataType>> Deserialize(
+ std::shared_ptr<DataType> storage_type,
+ const std::string& serialized) const override {
+ return std::make_shared<DictExtensionType>();
+ }
+
+ std::string Serialize() const override { return ""; }
+};
+
+std::shared_ptr<DataType> dict_extension() {
+ static bool registered = false;
+ auto type = std::make_shared<DictExtensionType>();
+ if (!registered) {
+ auto status = RegisterExtensionType(type);
+ if (!status.ok() && !status.IsKeyError()) {
+ status.Abort();
+ }
+ registered = true;
+ }
+ return type;
+}
+
INSTANTIATE_TEST_SUITE_P(
NanoarrowIpcTest, ArrowTypeParameterizedTestFixture,
::testing::Values(
@@ -1383,8 +1531,12 @@ INSTANTIATE_TEST_SUITE_P(
arrow::dictionary(arrow::int32(), arrow::utf8(), true),
// Extension type
arrow::extension::uuid(),
- // Dictionary-encoded extension
- arrow::dictionary(arrow::int32(), arrow::extension::uuid())));
+ // Extension type with dictionary as the storage type
+ dict_extension()
+ // Dictionary-encoded extension is not supported in IPC
+ // https://github.com/apache/arrow/issues/49704
+ // arrow::dictionary(arrow::int32(), arrow::extension::uuid()))
+ ));
class ArrowSchemaParameterizedTestFixture
: public ::testing::TestWithParam<std::shared_ptr<arrow::Schema>> {
@@ -1545,11 +1697,12 @@ INSTANTIATE_TEST_SUITE_P(
// Dictionary with field metadata
arrow::schema({arrow::field(
"some_name", arrow::dictionary(arrow::int32(), arrow::utf8()),
- arrow::KeyValueMetadata::Make({"key1", "key2"}, {"value1",
"value2"}))}),
- // Dictionary with field metadata
- arrow::schema({arrow::field(
- "some_name", arrow::dictionary(arrow::int32(),
arrow::extension::uuid()),
- arrow::KeyValueMetadata::Make({"key1", "key2"}, {"value1",
"value2"}))})));
+ arrow::KeyValueMetadata::Make({"key1", "key2"}, {"value1",
"value2"}))})
+ // Dictionary with extension storage and field metadata is not
supported in IPC
+ // arrow::schema({arrow::field(
+ // "some_name", arrow::dictionary(arrow::int32(),
arrow::extension::uuid()),
+ // arrow::KeyValueMetadata::Make({"key1", "key2"}, {"value1",
"value2"}))})
+ ));
class ArrowTypeIdParameterizedTestFixture
: public ::testing::TestWithParam<enum ArrowType> {
diff --git a/src/nanoarrow/ipc/files_test.cc b/src/nanoarrow/ipc/files_test.cc
index 6a1d7c78..6b70a0c7 100644
--- a/src/nanoarrow/ipc/files_test.cc
+++ b/src/nanoarrow/ipc/files_test.cc
@@ -24,6 +24,7 @@
#if defined(NANOARROW_BUILD_TESTS_WITH_ARROW)
#include <arrow/buffer.h>
#include <arrow/c/bridge.h>
+#include <arrow/extension_type.h>
#include <arrow/io/api.h>
#include <arrow/ipc/api.h>
#include <arrow/table.h>
@@ -51,10 +52,12 @@ using namespace arrow;
// would read.
class TestFile {
public:
- TestFile(std::string path, int expected_return_code, std::string
expected_error_message)
+ TestFile(std::string path, int expected_return_code, std::string
expected_error_message,
+ bool write_supported = true)
: path_(path),
expected_return_code_(expected_return_code),
- expected_error_message_(expected_error_message) {}
+ expected_error_message_(expected_error_message),
+ write_supported_(write_supported) {}
TestFile(std::string path) : TestFile(path, NANOARROW_OK, "") {}
@@ -62,6 +65,10 @@ class TestFile {
static TestFile OK(std::string path) { return TestFile(path); }
+ static TestFile ReadOnly(std::string path) {
+ return TestFile(path, NANOARROW_OK, "", false);
+ }
+
static TestFile Err(int code, std::string path, std::string message =
"__any__") {
return TestFile(path, code, message);
}
@@ -228,7 +235,8 @@ class TestFile {
return ArrowIpcWriterWriteArrayView(writer.get(), nullptr, error);
}
- void TestEqualsArrowCpp(const std::string& dir_prefix) {
+ void TestEqualsArrowCpp(const std::string& dir_prefix,
+ bool check_write_roundtrip = true) {
std::stringstream path_builder;
path_builder << dir_prefix << "/" << path_;
@@ -251,11 +259,15 @@ class TestFile {
GTEST_FAIL() << MakeError(NANOARROW_OK, "");
}
- // Write back to a buffer using nanoarrow
+ // Write back to a buffer using nanoarrow if supported. We do this here
+ // because we need to move the arrays into the comparison for the Arrow C++
+ // read.
nanoarrow::UniqueBuffer roundtripped;
- ASSERT_EQ(WriteNanoarrowStream(schema, arrays, roundtripped.get(), &error),
- NANOARROW_OK)
- << error.message;
+ if (write_supported_) {
+ ASSERT_EQ(WriteNanoarrowStream(schema, arrays, roundtripped.get(),
&error),
+ NANOARROW_OK)
+ << error.message;
+ }
// Read the same file with Arrow C++
auto maybe_table_arrow =
ReadTable(io::ReadableFile::Open(path_builder.str()));
@@ -266,6 +278,11 @@ class TestFile {
maybe_table_arrow.ValueUnsafe());
}
+ // For types that aren't supported by the writer yet
+ if (!write_supported_) {
+ return;
+ }
+
auto maybe_table_roundtripped =
ReadTable(BufferInputStream(roundtripped.get()));
{
SCOPED_TRACE("Read the roundtripped buffer using Arrow C++");
@@ -348,6 +365,7 @@ class TestFile {
// Use testing utility to compare
nanoarrow::testing::TestingJSONComparison comparison;
+ comparison.set_compare_metadata_order(false);
ASSERT_EQ(comparison.CompareArrayStream(ipc_stream.get(),
json_stream.get(), &error),
NANOARROW_OK)
<< error.message;
@@ -378,6 +396,7 @@ class TestFile {
std::string path_;
int expected_return_code_;
std::string expected_error_message_;
+ bool write_supported_;
};
// For better testing output
@@ -404,7 +423,13 @@ class TestEndianFileFixture : public
::testing::TestWithParam<TestFile> {
TestFile test_file;
};
+bool EnsureUuidIsNotRegistered() {
+ return arrow::UnregisterExtensionType("arrow.uuid").ok();
+}
+
TEST_P(TestEndianFileFixture, NanoarrowIpcTestFileNativeEndian) {
+ EnsureUuidIsNotRegistered();
+
std::stringstream dir_builder;
ArrowError error;
ArrowErrorInit(&error);
@@ -422,6 +447,8 @@ TEST_P(TestEndianFileFixture,
NanoarrowIpcTestFileNativeEndian) {
}
TEST_P(TestEndianFileFixture, NanoarrowIpcTestFileSwapEndian) {
+ EnsureUuidIsNotRegistered();
+
std::stringstream dir_builder;
ArrowError error;
ArrowErrorInit(&error);
@@ -439,6 +466,8 @@ TEST_P(TestEndianFileFixture,
NanoarrowIpcTestFileSwapEndian) {
}
TEST_P(TestEndianFileFixture, NanoarrowIpcTestFileCheckJSON) {
+ EnsureUuidIsNotRegistered();
+
std::stringstream dir_builder;
ArrowError error;
ArrowErrorInit(&error);
@@ -477,20 +506,10 @@ INSTANTIATE_TEST_SUITE_P(
TestFile::OK("generated_primitive.stream"),
TestFile::OK("generated_recursive_nested.stream"),
TestFile::OK("generated_union.stream"),
-
- // Files with features that are not yet supported (Dictionary encoding)
- TestFile::NotSupported(
- "generated_dictionary_unsigned.stream",
- "Found valid dictionary batch but dictionary encoding is not yet
supported"),
- TestFile::NotSupported(
- "generated_dictionary.stream",
- "Found valid dictionary batch but dictionary encoding is not yet
supported"),
- TestFile::NotSupported(
- "generated_nested_dictionary.stream",
- "Found valid dictionary batch but dictionary encoding is not yet
supported"),
- TestFile::NotSupported(
- "generated_extension.stream",
- "Found valid dictionary batch but dictionary encoding is not yet
supported")
+ TestFile::ReadOnly("generated_dictionary_unsigned.stream"),
+ TestFile::ReadOnly("generated_dictionary.stream"),
+ TestFile::ReadOnly("generated_nested_dictionary.stream"),
+ TestFile::ReadOnly("generated_extension.stream")
// Comment to keep last line from wrapping
));
diff --git a/src/nanoarrow/ipc/reader.c b/src/nanoarrow/ipc/reader.c
index c70b1448..89750824 100644
--- a/src/nanoarrow/ipc/reader.c
+++ b/src/nanoarrow/ipc/reader.c
@@ -191,6 +191,7 @@ struct ArrowIpcArrayStreamReaderPrivate {
struct ArrowBuffer header;
struct ArrowBuffer body;
int32_t expected_header_prefix_size;
+ struct ArrowIpcDictionaries dictionaries;
struct ArrowError error;
};
@@ -211,13 +212,16 @@ static void ArrowIpcArrayStreamReaderRelease(struct
ArrowArrayStream* stream) {
ArrowBufferReset(&private_data->header);
ArrowBufferReset(&private_data->body);
+ if (private_data->dictionaries.private_data != NULL) {
+ ArrowIpcDictionariesReset(&private_data->dictionaries);
+ }
+
ArrowFree(private_data);
stream->release = NULL;
}
static int ArrowIpcArrayStreamReaderNextHeader(
- struct ArrowIpcArrayStreamReaderPrivate* private_data,
- enum ArrowIpcMessageType message_type) {
+ struct ArrowIpcArrayStreamReaderPrivate* private_data, int
schema_expected) {
private_data->header.size_bytes = 0;
int64_t bytes_read = 0;
@@ -332,7 +336,10 @@ static int ArrowIpcArrayStreamReaderNextHeader(
// Don't decode the message if it's of the wrong type (because the error
message
// is better communicated by the caller)
- if (private_data->decoder.message_type != message_type) {
+ if ((schema_expected &&
+ private_data->decoder.message_type !=
NANOARROW_IPC_MESSAGE_TYPE_SCHEMA) ||
+ (!schema_expected &&
+ private_data->decoder.message_type ==
NANOARROW_IPC_MESSAGE_TYPE_SCHEMA)) {
return NANOARROW_OK;
}
@@ -372,8 +379,7 @@ static int ArrowIpcArrayStreamReaderReadSchemaIfNeeded(
return NANOARROW_OK;
}
- NANOARROW_RETURN_NOT_OK(ArrowIpcArrayStreamReaderNextHeader(
- private_data, NANOARROW_IPC_MESSAGE_TYPE_SCHEMA));
+ NANOARROW_RETURN_NOT_OK(ArrowIpcArrayStreamReaderNextHeader(private_data,
1));
// Error if this isn't a schema message
if (private_data->decoder.message_type != NANOARROW_IPC_MESSAGE_TYPE_SCHEMA)
{
@@ -415,11 +421,21 @@ static int ArrowIpcArrayStreamReaderReadSchemaIfNeeded(
return ENOTSUP;
}
+ // Initialize dictionary decoders
+ int result = ArrowIpcDictionariesInit(&private_data->dictionaries,
+ &dictionary_encodings,
&private_data->error);
+ if (result != NANOARROW_OK) {
+ ArrowIpcDictionaryEncodingsReset(&dictionary_encodings);
+ ArrowSchemaRelease(&tmp);
+ return result;
+ }
+
// Notify the decoder of the schema for forthcoming messages
- int result = ArrowIpcDecoderSetSchemaWithDictionaries(
+ result = ArrowIpcDecoderSetSchemaWithDictionaries(
&private_data->decoder, &tmp, &dictionary_encodings,
&private_data->error);
ArrowIpcDictionaryEncodingsReset(&dictionary_encodings);
if (result != NANOARROW_OK) {
+ ArrowIpcDictionariesReset(&private_data->dictionaries);
ArrowSchemaRelease(&tmp);
return result;
}
@@ -437,19 +453,72 @@ static int ArrowIpcArrayStreamReaderGetSchema(struct
ArrowArrayStream* stream,
return ArrowSchemaDeepCopy(&private_data->out_schema, out);
}
-static int ArrowIpcArrayStreamReaderGetNext(struct ArrowArrayStream* stream,
- struct ArrowArray* out) {
- struct ArrowIpcArrayStreamReaderPrivate* private_data =
- (struct ArrowIpcArrayStreamReaderPrivate*)stream->private_data;
- ArrowErrorInit(&private_data->error);
-
NANOARROW_RETURN_NOT_OK(ArrowIpcArrayStreamReaderReadSchemaIfNeeded(private_data));
+static int ArrowIpcArrayStreamReaderProcessRecordBatch(
+ struct ArrowIpcArrayStreamReaderPrivate* private_data, struct ArrowArray*
out) {
+ // Read in the body
+ NANOARROW_RETURN_NOT_OK(ArrowIpcArrayStreamReaderNextBody(private_data));
+
+ if (private_data->use_shared_buffers) {
+ struct ArrowIpcSharedBuffer shared;
+ NANOARROW_RETURN_NOT_OK_WITH_ERROR(
+ ArrowIpcSharedBufferInit(&shared, &private_data->body),
&private_data->error);
+ int result = ArrowIpcDecoderDecodeArrayFromSharedWithDictionaries(
+ &private_data->decoder, &shared, private_data->field_index,
+ &private_data->dictionaries, out, NANOARROW_VALIDATION_LEVEL_FULL,
+ &private_data->error);
+ ArrowIpcSharedBufferReset(&shared);
+ NANOARROW_RETURN_NOT_OK(result);
+ } else {
+ struct ArrowBufferView body_view;
+ body_view.data.data = private_data->body.data;
+ body_view.size_bytes = private_data->body.size_bytes;
+
+ NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderDecodeArrayWithDictionaries(
+ &private_data->decoder, body_view, private_data->field_index,
+ &private_data->dictionaries, out, NANOARROW_VALIDATION_LEVEL_FULL,
+ &private_data->error));
+ }
+
+ return NANOARROW_OK;
+}
+
+static int ArrowIpcArrayStreamReaderProcessDictionary(
+ struct ArrowIpcArrayStreamReaderPrivate* private_data) {
+ // Read in the body
+ NANOARROW_RETURN_NOT_OK(ArrowIpcArrayStreamReaderNextBody(private_data));
+
+ if (private_data->use_shared_buffers) {
+ // Decode the dictionary
+ struct ArrowIpcSharedBuffer shared;
+ NANOARROW_RETURN_NOT_OK_WITH_ERROR(
+ ArrowIpcSharedBufferInit(&shared, &private_data->body),
&private_data->error);
+ int result = ArrowIpcDecoderDecodeDictionaryFromShared(
+ &private_data->decoder, &shared, NANOARROW_VALIDATION_LEVEL_FULL,
+ &private_data->dictionaries, &private_data->error);
+ ArrowIpcSharedBufferReset(&shared);
+ NANOARROW_RETURN_NOT_OK(result);
+ } else {
+ struct ArrowBufferView body_view;
+ body_view.data.data = private_data->body.data;
+ body_view.size_bytes = private_data->body.size_bytes;
+ NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderDecodeDictionary(
+ &private_data->decoder, body_view, NANOARROW_VALIDATION_LEVEL_FULL,
+ &private_data->dictionaries, &private_data->error));
+ }
+ return NANOARROW_OK;
+}
+
+static int ArrowIpcArrayStreamReaderProcessMessage(
+ struct ArrowIpcArrayStreamReaderPrivate* private_data,
+ enum ArrowIpcMessageType* message_type, struct ArrowArray* out) {
// Read + decode the next header
- int result = ArrowIpcArrayStreamReaderNextHeader(
- private_data, NANOARROW_IPC_MESSAGE_TYPE_RECORD_BATCH);
+ int result = ArrowIpcArrayStreamReaderNextHeader(private_data, 0);
if (result == ENODATA) {
// Stream is finished either because there is no input or because
- // end of stream bytes were read.
+ // end of stream bytes were read. Read this as a RecordBatch in the
+ // sense that we populate out->release to NULL and return OK.
+ *message_type = NANOARROW_IPC_MESSAGE_TYPE_RECORD_BATCH;
out->release = NULL;
return NANOARROW_OK;
} else if (result != NANOARROW_OK) {
@@ -457,44 +526,43 @@ static int ArrowIpcArrayStreamReaderGetNext(struct
ArrowArrayStream* stream,
return result;
}
- // Make sure we have a RecordBatch message
+ // Make sure we have a RecordBatch message or DictionaryBatch message
switch (private_data->decoder.message_type) {
case NANOARROW_IPC_MESSAGE_TYPE_RECORD_BATCH:
- break;
+ *message_type = NANOARROW_IPC_MESSAGE_TYPE_RECORD_BATCH;
+ return ArrowIpcArrayStreamReaderProcessRecordBatch(private_data, out);
case NANOARROW_IPC_MESSAGE_TYPE_DICTIONARY_BATCH:
- ArrowErrorSet(
- &private_data->error,
- "Found valid dictionary batch but dictionary encoding is not yet
supported");
- return ENOTSUP;
+ *message_type = NANOARROW_IPC_MESSAGE_TYPE_DICTIONARY_BATCH;
+ return ArrowIpcArrayStreamReaderProcessDictionary(private_data);
default:
ArrowErrorSet(&private_data->error,
- "Unexpected message type (expected RecordBatch)");
+ "Unexpected message type (expected RecordBatch or
DictionaryBatch)");
return EINVAL;
}
+}
- // Read in the body
- NANOARROW_RETURN_NOT_OK(ArrowIpcArrayStreamReaderNextBody(private_data));
+static int ArrowIpcArrayStreamReaderGetNext(struct ArrowArrayStream* stream,
+ struct ArrowArray* out) {
+ struct ArrowIpcArrayStreamReaderPrivate* private_data =
+ (struct ArrowIpcArrayStreamReaderPrivate*)stream->private_data;
+ ArrowErrorInit(&private_data->error);
+
NANOARROW_RETURN_NOT_OK(ArrowIpcArrayStreamReaderReadSchemaIfNeeded(private_data));
+ enum ArrowIpcMessageType message_type;
struct ArrowArray tmp;
+ tmp.release = NULL;
+
+ do {
+ int result =
+ ArrowIpcArrayStreamReaderProcessMessage(private_data, &message_type,
&tmp);
+ if (result != NANOARROW_OK) {
+ if (tmp.release != NULL) {
+ ArrowArrayRelease(&tmp);
+ }
- if (private_data->use_shared_buffers) {
- struct ArrowIpcSharedBuffer shared;
- NANOARROW_RETURN_NOT_OK_WITH_ERROR(
- ArrowIpcSharedBufferInit(&shared, &private_data->body),
&private_data->error);
- result = ArrowIpcDecoderDecodeArrayFromShared(
- &private_data->decoder, &shared, private_data->field_index, &tmp,
- NANOARROW_VALIDATION_LEVEL_FULL, &private_data->error);
- ArrowIpcSharedBufferReset(&shared);
- NANOARROW_RETURN_NOT_OK(result);
- } else {
- struct ArrowBufferView body_view;
- body_view.data.data = private_data->body.data;
- body_view.size_bytes = private_data->body.size_bytes;
-
- NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderDecodeArray(
- &private_data->decoder, body_view, private_data->field_index, &tmp,
- NANOARROW_VALIDATION_LEVEL_FULL, &private_data->error));
- }
+ return result;
+ }
+ } while (message_type != NANOARROW_IPC_MESSAGE_TYPE_RECORD_BATCH);
ArrowArrayMove(&tmp, out);
return NANOARROW_OK;
@@ -528,6 +596,7 @@ ArrowErrorCode ArrowIpcArrayStreamReaderInit(
private_data->out_schema.release = NULL;
ArrowIpcInputStreamMove(input_stream, &private_data->input);
private_data->expected_header_prefix_size = kExpectedHeaderPrefixSizeNotSet;
+ private_data->dictionaries.private_data = NULL;
if (options != NULL) {
private_data->field_index = options->field_index;
diff --git a/src/nanoarrow/ipc/reader_test.cc b/src/nanoarrow/ipc/reader_test.cc
index 8139503a..d257e111 100644
--- a/src/nanoarrow/ipc/reader_test.cc
+++ b/src/nanoarrow/ipc/reader_test.cc
@@ -57,6 +57,64 @@ static uint8_t kSimpleRecordBatch[] = {
0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00,
0x03, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00};
+alignas(8) static uint8_t kDictionarySchema[] = {
+ 0xff, 0xff, 0xff, 0xff, 0x50, 0x01, 0x00, 0x00, 0x10, 0x00, 0x00, 0x00,
0x00, 0x00,
+ 0x0a, 0x00, 0x0e, 0x00, 0x06, 0x00, 0x05, 0x00, 0x08, 0x00, 0x0a, 0x00,
0x00, 0x00,
+ 0x00, 0x01, 0x04, 0x00, 0x10, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0a, 0x00,
0x0c, 0x00,
+ 0x00, 0x00, 0x04, 0x00, 0x08, 0x00, 0x0a, 0x00, 0x00, 0x00, 0xb0, 0x00,
0x00, 0x00,
+ 0x04, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x0c, 0x00, 0x00, 0x00,
0x08, 0x00,
+ 0x0c, 0x00, 0x04, 0x00, 0x08, 0x00, 0x08, 0x00, 0x00, 0x00, 0x8c, 0x00,
0x00, 0x00,
+ 0x04, 0x00, 0x00, 0x00, 0x7e, 0x00, 0x00, 0x00, 0x41, 0x0a, 0x33, 0x0a,
0x32, 0x36,
+ 0x33, 0x31, 0x37, 0x30, 0x0a, 0x31, 0x39, 0x37, 0x38, 0x38, 0x38, 0x0a,
0x35, 0x0a,
+ 0x55, 0x54, 0x46, 0x2d, 0x38, 0x0a, 0x35, 0x33, 0x31, 0x0a, 0x31, 0x0a,
0x35, 0x33,
+ 0x31, 0x0a, 0x31, 0x0a, 0x32, 0x35, 0x34, 0x0a, 0x31, 0x30, 0x32, 0x36,
0x0a, 0x31,
+ 0x0a, 0x32, 0x36, 0x32, 0x31, 0x35, 0x33, 0x0a, 0x35, 0x0a, 0x6e, 0x61,
0x6d, 0x65,
+ 0x73, 0x0a, 0x31, 0x36, 0x0a, 0x31, 0x0a, 0x32, 0x36, 0x32, 0x31, 0x35,
0x33, 0x0a,
+ 0x38, 0x0a, 0x73, 0x6f, 0x6d, 0x65, 0x5f, 0x63, 0x6f, 0x6c, 0x0a, 0x32,
0x35, 0x34,
+ 0x0a, 0x31, 0x30, 0x32, 0x36, 0x0a, 0x35, 0x31, 0x31, 0x0a, 0x31, 0x36,
0x0a, 0x31,
+ 0x0a, 0x32, 0x36, 0x32, 0x31, 0x35, 0x33, 0x0a, 0x37, 0x0a, 0x63, 0x6f,
0x6c, 0x75,
+ 0x6d, 0x6e, 0x73, 0x0a, 0x32, 0x35, 0x34, 0x0a, 0x00, 0x00, 0x01, 0x00,
0x00, 0x00,
+ 0x72, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x14, 0x00, 0x00, 0x00,
0x10, 0x00,
+ 0x18, 0x00, 0x08, 0x00, 0x06, 0x00, 0x07, 0x00, 0x0c, 0x00, 0x10, 0x00,
0x14, 0x00,
+ 0x10, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x05, 0x14, 0x00, 0x00, 0x00,
0x48, 0x00,
+ 0x00, 0x00, 0x24, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00,
+ 0x08, 0x00, 0x00, 0x00, 0x73, 0x6f, 0x6d, 0x65, 0x5f, 0x63, 0x6f, 0x6c,
0x00, 0x00,
+ 0x00, 0x00, 0x08, 0x00, 0x08, 0x00, 0x00, 0x00, 0x04, 0x00, 0x08, 0x00,
0x00, 0x00,
+ 0x0c, 0x00, 0x00, 0x00, 0x08, 0x00, 0x0c, 0x00, 0x08, 0x00, 0x07, 0x00,
0x08, 0x00,
+ 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x08, 0x00, 0x00, 0x00, 0x04, 0x00,
0x04, 0x00,
+ 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00};
+
+alignas(8) static uint8_t kDictionaryBatch[] = {
+ 0xff, 0xff, 0xff, 0xff, 0xa8, 0x00, 0x00, 0x00, 0x14, 0x00, 0x00, 0x00,
0x00, 0x00,
+ 0x00, 0x00, 0x0c, 0x00, 0x14, 0x00, 0x06, 0x00, 0x05, 0x00, 0x08, 0x00,
0x0c, 0x00,
+ 0x0c, 0x00, 0x00, 0x00, 0x00, 0x02, 0x04, 0x00, 0x14, 0x00, 0x00, 0x00,
0x20, 0x00,
+ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x08, 0x00, 0x0a, 0x00, 0x00, 0x00,
0x04, 0x00,
+ 0x08, 0x00, 0x00, 0x00, 0x10, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0a, 0x00,
0x18, 0x00,
+ 0x0c, 0x00, 0x04, 0x00, 0x08, 0x00, 0x0a, 0x00, 0x00, 0x00, 0x4c, 0x00,
0x00, 0x00,
+ 0x10, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00,
+ 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00,
+ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00,
+ 0x00, 0x00, 0x10, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x10, 0x00,
0x00, 0x00,
+ 0x00, 0x00, 0x00, 0x00, 0x0a, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00,
+ 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00,
+ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x04, 0x00,
+ 0x00, 0x00, 0x07, 0x00, 0x00, 0x00, 0x0a, 0x00, 0x00, 0x00, 0x7a, 0x65,
0x72, 0x6f,
+ 0x6f, 0x6e, 0x65, 0x74, 0x77, 0x6f, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+};
+
+alignas(8) static uint8_t kDictionaryRecordBatch[] = {
+ 0xff, 0xff, 0xff, 0xff, 0x88, 0x00, 0x00, 0x00, 0x14, 0x00, 0x00, 0x00,
0x00, 0x00,
+ 0x00, 0x00, 0x0c, 0x00, 0x16, 0x00, 0x06, 0x00, 0x05, 0x00, 0x08, 0x00,
0x0c, 0x00,
+ 0x0c, 0x00, 0x00, 0x00, 0x00, 0x03, 0x04, 0x00, 0x18, 0x00, 0x00, 0x00,
0x08, 0x00,
+ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0a, 0x00, 0x18, 0x00,
0x0c, 0x00,
+ 0x04, 0x00, 0x08, 0x00, 0x0a, 0x00, 0x00, 0x00, 0x3c, 0x00, 0x00, 0x00,
0x10, 0x00,
+ 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00,
+ 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00,
+ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00,
+ 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x01, 0x00,
+ 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00,
+ 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00};
+
static uint8_t kEndOfStream[] = {0xff, 0xff, 0xff, 0xff, 0x00, 0x00, 0x00,
0x00};
TEST(NanoarrowIpcReader, InputStreamBuffer) {
@@ -335,7 +393,8 @@ TEST(NanoarrowIpcReader, StreamReaderExpectedRecordBatch) {
struct ArrowArray array;
struct ArrowError error;
ASSERT_EQ(ArrowArrayStreamGetNext(&stream, &array, &error), EINVAL);
- EXPECT_STREQ(error.message, "Unexpected message type (expected
RecordBatch)");
+ EXPECT_STREQ(error.message,
+ "Unexpected message type (expected RecordBatch or
DictionaryBatch)");
ArrowArrayStreamRelease(&stream);
}
@@ -482,3 +541,105 @@ TEST(NanoarrowIpcReader, StreamReaderIncompletePrefix) {
ArrowArrayStreamRelease(&stream);
}
+
+TEST(NanoarrowIpcReader, StreamReaderDictionary) {
+ struct ArrowBuffer input_buffer;
+ ArrowBufferInit(&input_buffer);
+ ASSERT_EQ(
+ ArrowBufferAppend(&input_buffer, kDictionarySchema,
sizeof(kDictionarySchema)),
+ NANOARROW_OK);
+ ASSERT_EQ(ArrowBufferAppend(&input_buffer, kDictionaryBatch,
sizeof(kDictionaryBatch)),
+ NANOARROW_OK);
+ ASSERT_EQ(ArrowBufferAppend(&input_buffer, kDictionaryRecordBatch,
+ sizeof(kDictionaryRecordBatch)),
+ NANOARROW_OK);
+
+ struct ArrowIpcInputStream input;
+ ASSERT_EQ(ArrowIpcInputStreamInitBuffer(&input, &input_buffer),
NANOARROW_OK);
+
+ struct ArrowArrayStream stream;
+ ASSERT_EQ(ArrowIpcArrayStreamReaderInit(&stream, &input, nullptr),
NANOARROW_OK);
+
+ struct ArrowSchema schema;
+ ASSERT_EQ(ArrowArrayStreamGetSchema(&stream, &schema, nullptr),
NANOARROW_OK);
+ EXPECT_STREQ(schema.format, "+s");
+ ASSERT_EQ(schema.n_children, 1);
+ // Dictionary-encoded field with int8 indices
+ EXPECT_STREQ(schema.children[0]->format, "c");
+ ASSERT_NE(schema.children[0]->dictionary, nullptr);
+ // Dictionary values are utf8 strings
+ EXPECT_STREQ(schema.children[0]->dictionary->format, "u");
+ ArrowSchemaRelease(&schema);
+
+ struct ArrowArray array;
+ ASSERT_EQ(ArrowArrayStreamGetNext(&stream, &array, nullptr), NANOARROW_OK);
+ EXPECT_EQ(array.length, 3);
+ ASSERT_EQ(array.n_children, 1);
+ // The child should have a dictionary
+ ASSERT_NE(array.children[0]->dictionary, nullptr);
+ EXPECT_EQ(array.children[0]->dictionary->length, 3);
+ ArrowArrayRelease(&array);
+
+ ASSERT_EQ(ArrowArrayStreamGetNext(&stream, &array, nullptr), NANOARROW_OK);
+ EXPECT_EQ(array.release, nullptr);
+
+ ArrowArrayStreamRelease(&stream);
+}
+
+TEST(NanoarrowIpcReader, StreamReaderDictionaryBatchWithoutDictionarySchema) {
+ // Send a dictionary batch when the schema has no dictionaries
+ struct ArrowBuffer input_buffer;
+ ArrowBufferInit(&input_buffer);
+ ASSERT_EQ(ArrowBufferAppend(&input_buffer, kSimpleSchema,
sizeof(kSimpleSchema)),
+ NANOARROW_OK);
+ ASSERT_EQ(ArrowBufferAppend(&input_buffer, kDictionaryBatch,
sizeof(kDictionaryBatch)),
+ NANOARROW_OK);
+
+ struct ArrowIpcInputStream input;
+ ASSERT_EQ(ArrowIpcInputStreamInitBuffer(&input, &input_buffer),
NANOARROW_OK);
+
+ struct ArrowArrayStream stream;
+ ASSERT_EQ(ArrowIpcArrayStreamReaderInit(&stream, &input, nullptr),
NANOARROW_OK);
+
+ struct ArrowSchema schema;
+ ASSERT_EQ(ArrowArrayStreamGetSchema(&stream, &schema, nullptr),
NANOARROW_OK);
+ ArrowSchemaRelease(&schema);
+
+ struct ArrowArray array;
+ struct ArrowError error;
+ ASSERT_NE(ArrowArrayStreamGetNext(&stream, &array, &error), NANOARROW_OK);
+ ASSERT_GT(strlen(ArrowArrayStreamGetLastError(&stream)), 0);
+
+ ArrowArrayStreamRelease(&stream);
+}
+
+TEST(NanoarrowIpcReader, StreamReaderRecordBatchWithoutDictionaryBatch) {
+ // Send a record batch referencing a dictionary before the dictionary values
arrive
+ struct ArrowBuffer input_buffer;
+ ArrowBufferInit(&input_buffer);
+ ASSERT_EQ(
+ ArrowBufferAppend(&input_buffer, kDictionarySchema,
sizeof(kDictionarySchema)),
+ NANOARROW_OK);
+ // Skip the dictionary batch and go straight to the record batch
+ ASSERT_EQ(ArrowBufferAppend(&input_buffer, kDictionaryRecordBatch,
+ sizeof(kDictionaryRecordBatch)),
+ NANOARROW_OK);
+
+ struct ArrowIpcInputStream input;
+ ASSERT_EQ(ArrowIpcInputStreamInitBuffer(&input, &input_buffer),
NANOARROW_OK);
+
+ struct ArrowArrayStream stream;
+ ASSERT_EQ(ArrowIpcArrayStreamReaderInit(&stream, &input, nullptr),
NANOARROW_OK);
+
+ struct ArrowSchema schema;
+ ASSERT_EQ(ArrowArrayStreamGetSchema(&stream, &schema, nullptr),
NANOARROW_OK);
+ ArrowSchemaRelease(&schema);
+
+ struct ArrowArray array;
+ struct ArrowError error;
+ // Should error because dictionary values were never provided
+ ASSERT_EQ(ArrowArrayStreamGetNext(&stream, &array, &error), EINVAL);
+ ASSERT_GT(strlen(ArrowArrayStreamGetLastError(&stream)), 0);
+
+ ArrowArrayStreamRelease(&stream);
+}
diff --git a/src/nanoarrow/nanoarrow_ipc.h b/src/nanoarrow/nanoarrow_ipc.h
index e2b114cd..f4c8ef2c 100644
--- a/src/nanoarrow/nanoarrow_ipc.h
+++ b/src/nanoarrow/nanoarrow_ipc.h
@@ -53,10 +53,17 @@
NANOARROW_SYMBOL(NANOARROW_NAMESPACE,
ArrowIpcDecoderDecodeSchemaWithDictionaries)
#define ArrowIpcDecoderDecodeArrayView \
NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDecoderDecodeArrayView)
+#define ArrowIpcDecoderDecodeArrayViewWithDictionaries \
+ NANOARROW_SYMBOL(NANOARROW_NAMESPACE,
ArrowIpcDecoderDecodeArrayViewWithDictionaries)
#define ArrowIpcDecoderDecodeArray \
NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDecoderDecodeArray)
+#define ArrowIpcDecoderDecodeArrayWithDictionaries \
+ NANOARROW_SYMBOL(NANOARROW_NAMESPACE,
ArrowIpcDecoderDecodeArrayWithDictionaries)
#define ArrowIpcDecoderDecodeArrayFromShared \
NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDecoderDecodeArrayFromShared)
+#define ArrowIpcDecoderDecodeArrayFromSharedWithDictionaries \
+ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, \
+ ArrowIpcDecoderDecodeArrayFromSharedWithDictionaries)
#define ArrowIpcDecoderSetSchema \
NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDecoderSetSchema)
#define ArrowIpcDecoderSetSchemaWithDictionaries \
@@ -113,6 +120,8 @@
NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDictionaryEncodingsInit)
#define ArrowIpcDictionaryEncodingsAppend \
NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDictionaryEncodingsAppend)
+#define ArrowIpcDictionaryEncodingsAppendSchema \
+ NANOARROW_SYMBOL(NANOARROW_NAMESPACE,
ArrowIpcDictionaryEncodingsAppendSchema)
#define ArrowIpcDictionaryEncodingsFind \
NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDictionaryEncodingsFind)
#define ArrowIpcDictionaryEncodingsFindById \
@@ -129,6 +138,8 @@
NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDictionariesReset)
#define ArrowIpcDecoderDecodeDictionary \
NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDecoderDecodeDictionary)
+#define ArrowIpcDecoderDecodeDictionaryFromShared \
+ NANOARROW_SYMBOL(NANOARROW_NAMESPACE,
ArrowIpcDecoderDecodeDictionaryFromShared)
#endif
@@ -243,6 +254,12 @@ NANOARROW_DLL ArrowErrorCode
ArrowIpcDictionaryEncodingsAppend(
struct ArrowIpcDictionaryEncodings* dictionary_encodings,
struct ArrowIpcDictionaryEncoding encoding);
+/// \brief Append an encoding for every dictionary-encoded field in schema, with
+/// dictionary ids assigned in depth-first recursive order starting at 0
+NANOARROW_DLL ArrowErrorCode ArrowIpcDictionaryEncodingsAppendSchema(
+ struct ArrowIpcDictionaryEncodings* dictionary_encodings,
+ const struct ArrowSchema* schema);
+
/// \brief Resolve a ArrowIpcDictionaryEncoding for a given dictionary encoded
field
///
/// Returns NULL if the pointed to schema does not match any of the pointed to
@@ -588,7 +605,7 @@ NANOARROW_DLL ArrowErrorCode
ArrowIpcDecoderSetSchemaWithDictionaries(
NANOARROW_DLL ArrowErrorCode ArrowIpcDecoderSetEndianness(
struct ArrowIpcDecoder* decoder, enum ArrowIpcEndianness endianness);
-/// \brief Decode an ArrowArrayView
+/// \brief Decode an ArrowArrayView with dictionary decoding support
///
/// After a successful call to ArrowIpcDecoderDecodeHeader(), deserialize the
content
/// of body into an internally-managed ArrowArrayView and return it. Note that
field index
@@ -600,11 +617,23 @@ NANOARROW_DLL ArrowErrorCode ArrowIpcDecoderSetEndianness(
/// will not perform any heap allocations; however, the buffers referred to by
the
/// returned ArrowArrayView are only valid as long as the buffer referred to
by body stays
/// valid.
+NANOARROW_DLL ArrowErrorCode ArrowIpcDecoderDecodeArrayViewWithDictionaries(
+ struct ArrowIpcDecoder* decoder, struct ArrowBufferView body, int64_t i,
+ struct ArrowIpcDictionaries* dictionaries, struct ArrowArrayView** out,
+ struct ArrowError* error);
+
+/// \brief Decode an ArrowArrayView without dictionary decoding
+///
+/// After a successful call to ArrowIpcDecoderDecodeHeader(), deserialize the
content
+/// of body into an internally-managed ArrowArrayView and return it.
+///
+/// This is equivalent to ArrowIpcDecoderDecodeArrayViewWithDictionaries()
passing
+/// dictionaries as NULL.
NANOARROW_DLL ArrowErrorCode ArrowIpcDecoderDecodeArrayView(
struct ArrowIpcDecoder* decoder, struct ArrowBufferView body, int64_t i,
struct ArrowArrayView** out, struct ArrowError* error);
-/// \brief Decode an ArrowArray
+/// \brief Decode an ArrowArray with dictionary decoding support
///
/// After a successful call to ArrowIpcDecoderDecodeHeader(), assemble an
ArrowArray given
/// a message body and a field index. Note that field index does not equate to
column
@@ -615,24 +644,65 @@ NANOARROW_DLL ArrowErrorCode
ArrowIpcDecoderDecodeArrayView(
/// Returns EINVAL if the decoder did not just decode a record batch message,
ENOTSUP
/// if the message uses features not supported by this library, or or
NANOARROW_OK
/// otherwise.
+NANOARROW_DLL ArrowErrorCode ArrowIpcDecoderDecodeArrayWithDictionaries(
+ struct ArrowIpcDecoder* decoder, struct ArrowBufferView body, int64_t i,
+ struct ArrowIpcDictionaries* dictionaries, struct ArrowArray* out,
+ enum ArrowValidationLevel validation_level, struct ArrowError* error);
+
+/// \brief Decode an ArrowArray without dictionary decoding support
+///
+/// After a successful call to ArrowIpcDecoderDecodeHeader(), assemble an
ArrowArray given
+/// a message body and a field index.
+///
+/// This is equivalent to calling ArrowIpcDecoderDecodeArrayWithDictionaries()
passing
+/// dictionaries as NULL.
NANOARROW_DLL ArrowErrorCode ArrowIpcDecoderDecodeArray(
struct ArrowIpcDecoder* decoder, struct ArrowBufferView body, int64_t i,
struct ArrowArray* out, enum ArrowValidationLevel validation_level,
struct ArrowError* error);
-/// \brief Decode an ArrowArray from an owned buffer
+/// \brief Decode an ArrowArray from an owned buffer with dictionary decoding
support
///
/// This implementation takes advantage of the fact that it can avoid copying
individual
/// buffers. In all cases the caller must ArrowIpcSharedBufferReset() body
after one or
/// more calls to ArrowIpcDecoderDecodeArrayFromShared(). If
/// ArrowIpcSharedBufferIsThreadSafe() returns 0, out must not be released by
another
/// thread.
+NANOARROW_DLL ArrowErrorCode
ArrowIpcDecoderDecodeArrayFromSharedWithDictionaries(
+ struct ArrowIpcDecoder* decoder, struct ArrowIpcSharedBuffer* shared,
int64_t i,
+ struct ArrowIpcDictionaries* dictionaries, struct ArrowArray* out,
+ enum ArrowValidationLevel validation_level, struct ArrowError* error);
+
+/// \brief Decode an ArrowArray from an owned buffer
+///
+/// Equivalent to calling
ArrowIpcDecoderDecodeArrayFromSharedWithDictionaries() with
+/// dictionaries as NULL.
NANOARROW_DLL ArrowErrorCode ArrowIpcDecoderDecodeArrayFromShared(
struct ArrowIpcDecoder* decoder, struct ArrowIpcSharedBuffer* shared,
int64_t i,
struct ArrowArray* out, enum ArrowValidationLevel validation_level,
struct ArrowError* error);
+/// \brief Decode an ArrowArray from a dictionary batch into the given
+/// ArrowIpcDictionaries
+///
+/// After a successful call to ArrowIpcDecoderDecodeHeader(), assemble an ArrowArray from
+/// body and place it into out for use when decoding subsequent messages. Note that other
+/// dictionaries already in out may be used during decoding if this stream contains nested
+/// dictionaries. The decoded value may be obtained with
+/// ArrowIpcDictionariesFindCurrentValue().
NANOARROW_DLL ArrowErrorCode ArrowIpcDecoderDecodeDictionary(
+ struct ArrowIpcDecoder* decoder, struct ArrowBufferView body,
+ enum ArrowValidationLevel validation_level, struct ArrowIpcDictionaries*
out,
+ struct ArrowError* error);
+
+/// \brief Decode an ArrowArray from a dictionary batch from an owned buffer
+///
+/// This implementation takes advantage of the fact that it can avoid copying
individual
+/// buffers. In all cases the caller must ArrowIpcSharedBufferReset() shared after one or
+/// more calls to ArrowIpcDecoderDecodeDictionaryFromShared(). If
+/// ArrowIpcSharedBufferIsThreadSafe() returns 0, no batches decoded using out
may
+/// be released from another thread.
+NANOARROW_DLL ArrowErrorCode ArrowIpcDecoderDecodeDictionaryFromShared(
struct ArrowIpcDecoder* decoder, struct ArrowIpcSharedBuffer* shared,
enum ArrowValidationLevel validation_level, struct ArrowIpcDictionaries*
out,
struct ArrowError* error);