This is an automated email from the ASF dual-hosted git repository.
paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-nanoarrow.git
The following commit(s) were added to refs/heads/main by this push:
new 86a0b07a feat: Decode dictionary batches (#858)
86a0b07a is described below
commit 86a0b07af924071eeaace806911b02d3c220f6a3
Author: Dewey Dunnington <[email protected]>
AuthorDate: Mon Apr 6 20:15:22 2026 -0500
feat: Decode dictionary batches (#858)
After https://github.com/apache/arrow-nanoarrow/pull/856 we can know
which nodes in an ArrowSchema are dictionary encoded and what their IDs
are. This PR implements the `ArrowIpcDictionaries` tracker and the
actual decode step.
Doesn't quite solve #845 because we need one more step (threading the
`ArrowIpcDictionaries` through the decode), which involves solving some
problems with cloning arrays and with array views/validation.
---
src/nanoarrow/ipc/decoder.c | 376 +++++++++++++++++++++++++++++++++++++-
src/nanoarrow/ipc/decoder_test.cc | 149 ++++++++++++++-
src/nanoarrow/ipc/ipc_hpp_test.cc | 27 +++
src/nanoarrow/nanoarrow_ipc.h | 75 +++++++-
src/nanoarrow/nanoarrow_ipc.hpp | 22 +++
5 files changed, 627 insertions(+), 22 deletions(-)
diff --git a/src/nanoarrow/ipc/decoder.c b/src/nanoarrow/ipc/decoder.c
index 400d483e..520219ed 100644
--- a/src/nanoarrow/ipc/decoder.c
+++ b/src/nanoarrow/ipc/decoder.c
@@ -57,6 +57,8 @@
#define NANOARROW_IPC_NO_DICTIONARY_ID INT64_MIN
+// Shorthand for flatbuffers identifiers in the org_apache_arrow_flatbuf namespace,
+// e.g. ns(RecordBatch_table_t)
+#define ns(x) FLATBUFFERS_WRAP_NAMESPACE(org_apache_arrow_flatbuf, x)
+
// Internal representation of a parsed "Field" from flatbuffers. This
// represents a field in a depth-first walk of column arrays and their
// children.
@@ -285,10 +287,10 @@ void ArrowIpcDictionaryEncodingsInit(
}
ArrowErrorCode ArrowIpcDictionaryEncodingsAppend(
- struct ArrowIpcDictionaryEncodings* dictionaries,
+ struct ArrowIpcDictionaryEncodings* dictionary_encodings,
struct ArrowIpcDictionaryEncoding encoding) {
- NANOARROW_DCHECK(dictionaries != NULL);
- NANOARROW_RETURN_NOT_OK(ArrowBufferAppend(&dictionaries->encodings,
&encoding,
+ NANOARROW_DCHECK(dictionary_encodings != NULL);
+  // The encoding struct is copied by value; the schema it points to is not copied.
+ NANOARROW_RETURN_NOT_OK(ArrowBufferAppend(&dictionary_encodings->encodings,
&encoding,
sizeof(struct
ArrowIpcDictionaryEncoding)));
return NANOARROW_OK;
}
@@ -312,12 +314,302 @@ const struct ArrowIpcDictionaryEncoding*
ArrowIpcDictionaryEncodingsFind(
return NULL;
}
+// Linear search over the encodings; when several encodings share an id, the
+// earliest one in the list is returned. Returns NULL if none matches.
+const struct ArrowIpcDictionaryEncoding* ArrowIpcDictionaryEncodingsFindById(
+    const struct ArrowIpcDictionaryEncodings* dictionary_encodings, int64_t id) {
+  NANOARROW_DCHECK(dictionary_encodings != NULL);
+  const struct ArrowIpcDictionaryEncoding* encodings =
+      (const struct ArrowIpcDictionaryEncoding*)dictionary_encodings->encodings.data;
+  const int64_t n_encodings = dictionary_encodings->encodings.size_bytes /
+                              sizeof(struct ArrowIpcDictionaryEncoding);
+
+  int64_t i = 0;
+  while (i < n_encodings && encodings[i].id != id) {
+    i++;
+  }
+
+  return i < n_encodings ? encodings + i : NULL;
+}
+
+// Returns 1 if id is among the first num_ids int64_t values stored in ids_buffer,
+// 0 otherwise. ids_buffer must hold at least num_ids values.
+static int ArrowIpcIdsListContains(const struct ArrowBuffer* ids_buffer, int64_t num_ids,
+                                   int64_t id) {
+  const int64_t* ids = (const int64_t*)ids_buffer->data;
+  int found = 0;
+  for (int64_t i = 0; i < num_ids && !found; i++) {
+    found = ids[i] == id;
+  }
+
+  return found;
+}
+
+// Collect each distinct dictionary id (in order of first appearance) into a new
+// buffer of int64_t, which is then moved into out. out is overwritten rather than
+// appended to; on failure out is left untouched.
+ArrowErrorCode ArrowIpcDictionaryEncodingsUniqueIds(
+    const struct ArrowIpcDictionaryEncodings* dictionary_encodings,
+    struct ArrowBuffer* out) {
+  NANOARROW_DCHECK(dictionary_encodings != NULL);
+  const struct ArrowIpcDictionaryEncoding* encodings =
+      (const struct ArrowIpcDictionaryEncoding*)dictionary_encodings->encodings.data;
+  const int64_t n_encodings = dictionary_encodings->encodings.size_bytes /
+                              sizeof(struct ArrowIpcDictionaryEncoding);
+
+  struct ArrowBuffer ids;
+  ArrowBufferInit(&ids);
+  int64_t n_ids = 0;
+
+  for (int64_t i = 0; i < n_encodings; i++) {
+    const int64_t candidate = encodings[i].id;
+    if (ArrowIpcIdsListContains(&ids, n_ids, candidate)) {
+      continue;
+    }
+
+    ArrowErrorCode code = ArrowBufferAppendInt64(&ids, candidate);
+    if (code != NANOARROW_OK) {
+      ArrowBufferReset(&ids);
+      return code;
+    }
+
+    n_ids++;
+  }
+
+  ArrowBufferMove(&ids, out);
+  return NANOARROW_OK;
+}
+
void ArrowIpcDictionaryEncodingsReset(
struct ArrowIpcDictionaryEncodings* dictionary_encodings) {
NANOARROW_DCHECK(dictionary_encodings != NULL);
+  // Frees only the encodings array; schemas referenced by the encodings are not released.
ArrowBufferReset(&dictionary_encodings->encodings);
}
+/// \brief Internal member of an ArrowIpcDictionaries
+///
+/// Stores a dictionary-specific decoder and the current value of the dictionary.
+/// current_value.release is NULL until a DictionaryBatch for this id has been
+/// decoded.
+struct ArrowIpcDictionary {
+  /// Dictionary identifier (NANOARROW_IPC_NO_DICTIONARY_ID until fully initialized)
+ int64_t id;
+  /// Decoder whose schema is a one-column struct of the dictionary value type; it
+  /// decodes the RecordBatch embedded in each DictionaryBatch for this id
+ struct ArrowIpcDecoder decoder;
+  /// Most recently decoded dictionary value (owned by this struct)
+ struct ArrowArray current_value;
+};
+
+// Put dictionary into a known state: no id yet, a fresh decoder, and no value
+// (current_value.release == NULL).
+static ArrowErrorCode ArrowIpcDictionaryInit(struct ArrowIpcDictionary* dictionary) {
+  dictionary->id = NANOARROW_IPC_NO_DICTIONARY_ID;
+
+  ArrowErrorCode status = ArrowIpcDecoderInit(&dictionary->decoder);
+  if (status != NANOARROW_OK) {
+    return status;
+  }
+
+  memset(&dictionary->current_value, 0, sizeof(struct ArrowArray));
+  return NANOARROW_OK;
+}
+
+// Non-delta update: drop any existing value, then take ownership of value via
+// ArrowArrayMove(). Never fails.
+static ArrowErrorCode ArrowIpcDictionaryReplace(struct ArrowIpcDictionary* dictionary,
+                                                struct ArrowArray* value,
+                                                struct ArrowError* error) {
+  NANOARROW_UNUSED(error);
+
+  struct ArrowArray* slot = &dictionary->current_value;
+  if (slot->release != NULL) {
+    ArrowArrayRelease(slot);
+  }
+
+  ArrowArrayMove(value, slot);
+  return NANOARROW_OK;
+}
+
+// Delta update: only possible while no value exists yet, since concatenating onto
+// an existing value is not implemented. On ENOTSUP, value is not consumed.
+static ArrowErrorCode ArrowIpcDictionaryAppend(struct ArrowIpcDictionary* dictionary,
+                                               struct ArrowArray* value,
+                                               struct ArrowError* error) {
+  if (dictionary->current_value.release == NULL) {
+    ArrowArrayMove(value, &dictionary->current_value);
+    return NANOARROW_OK;
+  }
+
+  ArrowErrorSet(error, "Dictionary concatenation is not yet supported");
+  return ENOTSUP;
+}
+
+// Free the decoder and, if one has been decoded, the current value.
+static void ArrowIpcDictionaryReset(struct ArrowIpcDictionary* dictionary) {
+  struct ArrowArray* value = &dictionary->current_value;
+  ArrowIpcDecoderReset(&dictionary->decoder);
+  if (value->release == NULL) {
+    return;
+  }
+
+  ArrowArrayRelease(value);
+}
+
+// Backing storage for ArrowIpcDictionaries::private_data
+struct ArrowIpcDictionariesPrivate {
+  // ArrowMalloc()-ed array with one entry per unique dictionary id (NULL if none)
+ struct ArrowIpcDictionary* dictionaries;
+  // Number of entries in dictionaries
+ int64_t num_dictionaries;
+};
+
+// Release callback that frees nothing. ArrowIpcDictionariesInitDictionaries() uses it
+// for a stack-allocated wrapper schema whose only child is borrowed from an encoding.
+static void ArrowIpcDictionariesNoOpSchemaRelease(struct ArrowSchema* schema) {
+ NANOARROW_UNUSED(schema);
+}
+
+// Initialize private_data->dictionaries[i] for each id in unique_ids_buffer. Each
+// entry gets its own decoder whose schema is a struct with exactly one child: the
+// value type of the dictionary (encoding->schema->dictionary).
+//
+// On return (success or failure), *num_initialized_decoders_out is the number of
+// leading entries that the caller must clean up with ArrowIpcDictionaryReset().
+static ArrowErrorCode ArrowIpcDictionariesInitDictionaries(
+    struct ArrowIpcDictionariesPrivate* private_data,
+    const struct ArrowIpcDictionaryEncodings* dictionary_encodings,
+    const struct ArrowBuffer* unique_ids_buffer, int64_t* num_initialized_decoders_out,
+    struct ArrowError* error) {
+  ArrowErrorCode result;
+  const int64_t* unique_ids = (const int64_t*)unique_ids_buffer->data;
+
+  // To set the schema of each ArrowIpcDictionary's decoder, we need to construct a
+  // const ArrowSchema* that points to a struct with exactly one child that is the
+  // value type. To support nested dictionaries that may exist within
+  // ArrowIpcDictionaryEncodings, we can't deep copy the value type because it would
+  // invalidate pointer references within dictionary_encodings (i.e., nested
+  // dictionaries would fail to resolve when we call
+  // ArrowIpcDecoderSetSchemaWithDictionaries()). To make this work, we create an
+  // ArrowSchema with a no-op release to pass to
+  // ArrowIpcDecoderSetSchemaWithDictionaries(). This works because that function
+  // accepts a const value (i.e., never releases the schema argument).
+  struct ArrowSchema decoder_schema;
+  memset(&decoder_schema, 0, sizeof(struct ArrowSchema));
+  decoder_schema.name = "";
+  decoder_schema.format = "+s";
+  decoder_schema.n_children = 1;
+  decoder_schema.release = &ArrowIpcDictionariesNoOpSchemaRelease;
+
+  for (int64_t i = 0; i < private_data->num_dictionaries; i++) {
+    struct ArrowIpcDictionary* dictionary = private_data->dictionaries + i;
+    result = ArrowIpcDictionaryInit(dictionary);
+    if (result != NANOARROW_OK) {
+      ArrowErrorSet(error, "Internal error (failed to initialize dictionary)");
+      *num_initialized_decoders_out = i;
+      return result;
+    }
+
+    const struct ArrowIpcDictionaryEncoding* encoding =
+        ArrowIpcDictionaryEncodingsFindById(dictionary_encodings, unique_ids[i]);
+    if (encoding == NULL) {
+      ArrowErrorSet(error,
+                    "Internal error (dictionary with id not present in encodings list)");
+      *num_initialized_decoders_out = i + 1;
+      return EINVAL;
+    }
+
+    // Without a schema carrying the dictionary value type, the wrapper struct below
+    // would have a NULL child; report that instead of passing it on.
+    if (encoding->schema == NULL || encoding->schema->dictionary == NULL) {
+      ArrowErrorSet(error,
+                    "Dictionary with ID %" PRId64
+                    " does not reference a schema with a dictionary value type",
+                    unique_ids[i]);
+      *num_initialized_decoders_out = i + 1;
+      return EINVAL;
+    }
+
+    // The decoder's schema will be a struct with one column (the value type of
+    // the dictionary-encoded field)
+    decoder_schema.children = (struct ArrowSchema**)&encoding->schema->dictionary;
+
+    result = ArrowIpcDecoderSetSchemaWithDictionaries(
+        &dictionary->decoder, &decoder_schema, dictionary_encodings, error);
+    if (result != NANOARROW_OK) {
+      *num_initialized_decoders_out = i + 1;
+      return result;
+    }
+
+    dictionary->id = unique_ids[i];
+  }
+
+  *num_initialized_decoders_out = private_data->num_dictionaries;
+  return NANOARROW_OK;
+}
+
+// Build one ArrowIpcDictionary (decoder + current value) per unique dictionary id in
+// dictionary_encodings. dictionaries is zeroed first, so on any failure its
+// private_data stays NULL and everything allocated here has been freed again.
+ArrowErrorCode ArrowIpcDictionariesInit(
+    struct ArrowIpcDictionaries* dictionaries,
+    const struct ArrowIpcDictionaryEncodings* dictionary_encodings,
+    struct ArrowError* error) {
+  NANOARROW_DCHECK(dictionaries != NULL);
+  NANOARROW_DCHECK(dictionary_encodings != NULL);
+
+  memset(dictionaries, 0, sizeof(struct ArrowIpcDictionaries));
+
+  struct ArrowIpcDictionariesPrivate* state =
+      (struct ArrowIpcDictionariesPrivate*)ArrowMalloc(
+          sizeof(struct ArrowIpcDictionariesPrivate));
+  if (state == NULL) {
+    ArrowErrorSet(error, "Failed to allocate ArrowIpcDictionariesPrivate");
+    return ENOMEM;
+  }
+  state->dictionaries = NULL;
+  state->num_dictionaries = 0;
+
+  // Everything the cleanup path may touch is put into a releasable state up front.
+  int64_t num_ready = 0;
+  struct ArrowBuffer unique_ids;
+  ArrowBufferInit(&unique_ids);
+
+  // Several dictionary-encoded fields may share one dictionary, so there may be far
+  // fewer unique ids than encodings.
+  ArrowErrorCode status =
+      ArrowIpcDictionaryEncodingsUniqueIds(dictionary_encodings, &unique_ids);
+  if (status != NANOARROW_OK) {
+    ArrowErrorSet(error, "Failed to extract unique identifiers");
+    goto cleanup;
+  }
+
+  // unique_ids is a buffer of int64_t
+  state->num_dictionaries = unique_ids.size_bytes / sizeof(int64_t);
+  if (state->num_dictionaries > 0) {
+    state->dictionaries = (struct ArrowIpcDictionary*)ArrowMalloc(
+        state->num_dictionaries * sizeof(struct ArrowIpcDictionary));
+    if (state->dictionaries == NULL) {
+      ArrowErrorSet(error, "Failed to allocate ArrowIpcDictionary array");
+      status = ENOMEM;
+      goto cleanup;
+    }
+  }
+
+  status = ArrowIpcDictionariesInitDictionaries(state, dictionary_encodings, &unique_ids,
+                                                &num_ready, error);
+  if (status == NANOARROW_OK) {
+    ArrowBufferReset(&unique_ids);
+    dictionaries->private_data = state;
+    return NANOARROW_OK;
+  }
+
+cleanup:
+  for (int64_t i = 0; i < num_ready; i++) {
+    ArrowIpcDictionaryReset(state->dictionaries + i);
+  }
+  ArrowFree(state->dictionaries);
+  ArrowBufferReset(&unique_ids);
+  ArrowFree(state);
+  return status;
+}
+
+// Returns the ArrowIpcDictionary tracking id, or NULL if no such dictionary exists.
+static struct ArrowIpcDictionary* ArrowIpcDictionariesFindById(
+    struct ArrowIpcDictionaries* dictionaries, int64_t id) {
+  NANOARROW_DCHECK(dictionaries != NULL);
+
+  const struct ArrowIpcDictionariesPrivate* state =
+      (const struct ArrowIpcDictionariesPrivate*)dictionaries->private_data;
+  struct ArrowIpcDictionary* match = NULL;
+  for (int64_t i = 0; match == NULL && i < state->num_dictionaries; i++) {
+    if (state->dictionaries[i].id == id) {
+      match = &state->dictionaries[i];
+    }
+  }
+
+  return match;
+}
+
+// Look up the current value of dictionary id. The returned array is owned by
+// dictionaries: it is replaced when a later non-delta DictionaryBatch for the same id
+// is decoded and released by ArrowIpcDictionariesReset().
+ArrowErrorCode ArrowIpcDictionariesFindCurrentValue(
+    struct ArrowIpcDictionaries* dictionaries, int64_t id, const struct ArrowArray** out,
+    struct ArrowError* error) {
+  NANOARROW_DCHECK(dictionaries != NULL);
+  NANOARROW_DCHECK(out != NULL);
+
+  struct ArrowIpcDictionary* dictionary = ArrowIpcDictionariesFindById(dictionaries, id);
+  if (dictionary == NULL) {
+    ArrowErrorSet(error,
+                  "Can't find value for dictionary with ID %" PRId64
+                  " (dictionary definition not found)",
+                  id);
+    return EINVAL;
+  }
+
+  // Until a DictionaryBatch has been decoded for this id, current_value is a zeroed
+  // (released) ArrowArray; don't hand that out as if it were data.
+  if (dictionary->current_value.release == NULL) {
+    ArrowErrorSet(error,
+                  "Can't find value for dictionary with ID %" PRId64
+                  " (no DictionaryBatch has been decoded for this ID)",
+                  id);
+    return EINVAL;
+  }
+
+  *out = &dictionary->current_value;
+  return NANOARROW_OK;
+}
+
+// Release every per-dictionary decoder and value, then the private storage itself.
+// Does nothing when dictionaries->private_data is NULL. ArrowIpcDictionariesInit()
+// zeroes the struct before it can fail, and this function clears private_data, so
+// calling it after a failed init or more than once is safe.
+void ArrowIpcDictionariesReset(struct ArrowIpcDictionaries* dictionaries) {
+  NANOARROW_DCHECK(dictionaries != NULL);
+  struct ArrowIpcDictionariesPrivate* private_data =
+      (struct ArrowIpcDictionariesPrivate*)dictionaries->private_data;
+  if (private_data == NULL) {
+    return;
+  }
+
+  for (int64_t i = 0; i < private_data->num_dictionaries; i++) {
+    ArrowIpcDictionaryReset(private_data->dictionaries + i);
+  }
+
+  ArrowFree(private_data->dictionaries);
+  ArrowFree(private_data);
+  dictionaries->private_data = NULL;
+}
+
ArrowErrorCode ArrowIpcDecoderInit(struct ArrowIpcDecoder* decoder) {
memset(decoder, 0, sizeof(struct ArrowIpcDecoder));
struct ArrowIpcDecoderPrivate* private_data =
@@ -397,8 +689,6 @@ static inline int32_t ArrowIpcReadInt32LE(struct
ArrowBufferView* data, int swap
return value;
}
-#define ns(x) FLATBUFFERS_WRAP_NAMESPACE(org_apache_arrow_flatbuf, x)
-
static int ArrowIpcDecoderSetMetadata(struct ArrowSchema* schema,
ns(KeyValue_vec_t) kv_vec,
struct ArrowError* error) {
@@ -1788,7 +2078,14 @@ struct ArrowIpcBufferFactory {
/// should be made.
struct ArrowIpcDecompressor* decompressor;
- /// \brief Caller-defined private data to be used in the callback.
+ /// \brief Buffer factory provided buffer length
+ ///
+ /// Rather than use the body length declared by the flatbuffer message, this
is the
+ /// length that should be used to check the bounds of a buffer source (i.e.,
this is
+ /// the actual available length as opposed to the theoretical length).
+ int64_t buffer_length;
+
+ /// \brief User-defined private data to be used in the callback.
///
/// Usually this would be a description of where the body has been read into
memory or
/// information required to do so.
@@ -1878,6 +2175,7 @@ static struct ArrowIpcBufferFactory
ArrowIpcBufferFactoryFromView(
struct ArrowIpcBufferFactory out;
out.make_buffer = &ArrowIpcMakeBufferFromView;
out.decompressor = NULL;
+ out.buffer_length = buffer_view->size_bytes;
out.private_data = buffer_view;
return out;
}
@@ -1918,6 +2216,7 @@ static struct ArrowIpcBufferFactory
ArrowIpcBufferFactoryFromShared(
struct ArrowIpcBufferFactory out;
out.make_buffer = &ArrowIpcMakeBufferFromShared;
out.decompressor = NULL;
+ out.buffer_length = shared->private_src.size_bytes;
out.private_data = shared;
return out;
}
@@ -2052,7 +2351,6 @@ struct ArrowIpcArraySetter {
int64_t field_i;
ns(Buffer_vec_t) buffers;
int64_t buffer_i;
- int64_t body_size_bytes;
struct ArrowIpcBufferSource src;
struct ArrowIpcBufferFactory factory;
enum ArrowIpcMetadataVersion version;
@@ -2071,11 +2369,11 @@ static int ArrowIpcDecoderMakeBuffer(struct
ArrowIpcArraySetter* setter, int64_t
// Check that this buffer fits within the body
int64_t buffer_start = offset;
int64_t buffer_end = buffer_start + length;
- if (buffer_start < 0 || buffer_end > setter->body_size_bytes) {
+ if (buffer_start < 0 || buffer_end > setter->factory.buffer_length) {
ArrowErrorSet(error,
"Buffer requires body offsets [%" PRId64 "..%" PRId64
") but body has size %" PRId64,
- buffer_start, buffer_end, setter->body_size_bytes);
+ buffer_start, buffer_end, setter->factory.buffer_length);
return EINVAL;
}
@@ -2261,7 +2559,6 @@ static ArrowErrorCode
ArrowIpcDecoderDecodeArrayViewInternal(
setter.field_i = field_i;
setter.buffers = ns(RecordBatch_buffers(batch));
setter.buffer_i = root->buffer_offset - 1;
- setter.body_size_bytes = decoder->body_size_bytes;
setter.factory = factory;
setter.src.codec = decoder->codec;
setter.src.swap_endian = ArrowIpcDecoderNeedsSwapEndian(decoder);
@@ -2375,3 +2672,62 @@ ArrowErrorCode ArrowIpcDecoderDecodeArrayFromShared(
ArrowArrayMove(&temp, out);
return NANOARROW_OK;
}
+
+// Decode the body of the DictionaryBatch message that decoder just decoded and store
+// the result as the current value of the matching dictionary in dictionaries. The
+// dictionary's own decoder (whose schema wraps the value type in a one-column
+// struct) does the actual array decoding.
+NANOARROW_DLL ArrowErrorCode ArrowIpcDecoderDecodeDictionary(
+    struct ArrowIpcDecoder* decoder, struct ArrowIpcSharedBuffer* shared,
+    enum ArrowValidationLevel validation_level, struct ArrowIpcDictionaries* dictionaries,
+    struct ArrowError* error) {
+  NANOARROW_DCHECK(decoder != NULL);
+  NANOARROW_DCHECK(shared != NULL);
+  NANOARROW_DCHECK(dictionaries != NULL);
+
+  struct ArrowIpcDecoderPrivate* private_data =
+      (struct ArrowIpcDecoderPrivate*)decoder->private_data;
+
+  if (decoder->message_type != NANOARROW_IPC_MESSAGE_TYPE_DICTIONARY_BATCH) {
+    ArrowErrorSet(error, "decoder did not just decode a DictionaryBatch message");
+    return EINVAL;
+  }
+
+  struct ArrowIpcDictionary* dictionary =
+      ArrowIpcDictionariesFindById(dictionaries, decoder->dictionary->id);
+  if (dictionary == NULL) {
+    ArrowErrorSet(error,
+                  "Can't decode DictionaryBatch with ID %" PRId64
+                  " (dictionary definition not found)",
+                  decoder->dictionary->id);
+    return EINVAL;
+  }
+
+  ns(DictionaryBatch_table_t) dictionary_batch =
+      (ns(DictionaryBatch_table_t))private_data->last_message;
+  ns(RecordBatch_table_t) record_batch = ns(DictionaryBatch_data(dictionary_batch));
+  if (record_batch == NULL) {
+    // The data table is optional in the flatbuffer; don't decode from a NULL table
+    ArrowErrorSet(error, "DictionaryBatch with ID %" PRId64 " has no data",
+                  decoder->dictionary->id);
+    return EINVAL;
+  }
+
+  // The dictionary's decoder never decodes a message header itself: point it at the
+  // embedded RecordBatch and copy the header fields that describe how this message's
+  // body was written (so that, e.g., a compressed body is not read as uncompressed).
+  // NOTE(review): endianness and the decompressor are not propagated here; confirm
+  // whether the dictionary decoder needs them for big-endian or compressed streams.
+  struct ArrowIpcDecoderPrivate* dictionary_decoder_private_data =
+      (struct ArrowIpcDecoderPrivate*)dictionary->decoder.private_data;
+  dictionary->decoder.message_type = NANOARROW_IPC_MESSAGE_TYPE_RECORD_BATCH;
+  dictionary->decoder.metadata_version = decoder->metadata_version;
+  dictionary->decoder.codec = decoder->codec;
+  dictionary->decoder.body_size_bytes = decoder->body_size_bytes;
+  dictionary_decoder_private_data->last_message = record_batch;
+
+  // Field 0 is the single (value type) child of the dictionary decoder's schema.
+  // TODO: provide ArrowIpcDecoderDecodeArrayInternalWithDictionaries to handle nested
+  // dictionaries
+  struct ArrowArray tmp;
+  NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderDecodeArrayFromShared(
+      &dictionary->decoder, shared, 0, &tmp, validation_level, error));
+
+  ArrowErrorCode result;
+  if (decoder->dictionary->is_delta) {
+    result = ArrowIpcDictionaryAppend(dictionary, &tmp, error);
+  } else {
+    result = ArrowIpcDictionaryReplace(dictionary, &tmp, error);
+  }
+
+  if (result != NANOARROW_OK) {
+    // tmp was not consumed (e.g., unsupported delta concatenation)
+    ArrowArrayRelease(&tmp);
+    return result;
+  }
+
+  return NANOARROW_OK;
+}
diff --git a/src/nanoarrow/ipc/decoder_test.cc
b/src/nanoarrow/ipc/decoder_test.cc
index 5f958367..b6aeddb7 100644
--- a/src/nanoarrow/ipc/decoder_test.cc
+++ b/src/nanoarrow/ipc/decoder_test.cc
@@ -32,6 +32,7 @@
// For bswap32()
#include "flatcc/portable/pendian.h"
+#include "nanoarrow/nanoarrow_gtest_util.hpp"
#include "nanoarrow/nanoarrow_ipc.hpp"
#if defined(NANOARROW_BUILD_TESTS_WITH_ARROW)
@@ -566,7 +567,7 @@ TEST(NanoarrowIpcTest,
NanoarrowIpcDecodeSimpleRecordBatchErrors) {
ArrowArrayRelease(&array);
// Field extract should fail if body is too small
- decoder.body_size_bytes = 0;
+ body.size_bytes = 0;
EXPECT_EQ(ArrowIpcDecoderDecodeArray(&decoder, body, 0, &array,
NANOARROW_VALIDATION_LEVEL_FULL,
&error),
EINVAL);
@@ -623,6 +624,9 @@ TEST(NanoarrowIpcTest, NanoarrowIpcDecodeDictionarySchema) {
ASSERT_EQ(encoding->id, 0);
ASSERT_EQ(encoding->kind, NANOARROW_IPC_DICTIONARY_KIND_DENSE_ARRAY);
+ // The dictionary encodings should fail to locate anything except the
decoded ID
+ ASSERT_EQ(ArrowIpcDictionaryEncodingsFindById(&dictionary_encodings, 100),
nullptr);
+
// If we try to set the schema without the dictionaries, we should get an
error
ASSERT_EQ(ArrowIpcDecoderSetSchema(&decoder, &schema, &error), EINVAL);
ASSERT_STREQ(error.message,
@@ -642,16 +646,93 @@ TEST(NanoarrowIpcTest,
NanoarrowIpcDecodeDictionarySchema) {
ArrowIpcDecoderReset(&decoder);
}
-TEST(NanoarrowIpcTest, NanoarrowIpcDecodeDictionaryBatch) {
+TEST(NanoarrowIpcTest, NanoarrowIpcDictionaryEncodingsUniqueIds) {
+  // Create dictionary encodings in which at least one identifier is duplicated
+ struct ArrowIpcDictionaryEncodings dictionary_encodings;
+ ArrowIpcDictionaryEncodingsInit(&dictionary_encodings);
+
+ struct ArrowIpcDictionaryEncoding encoding;
+ encoding.id = 42;
+ encoding.kind = NANOARROW_IPC_DICTIONARY_KIND_DENSE_ARRAY;
+ encoding.schema = nullptr;
+
+ struct ArrowIpcDictionaryEncoding encoding2;
+ encoding2.id = 142;
+ encoding2.kind = NANOARROW_IPC_DICTIONARY_KIND_DENSE_ARRAY;
+ encoding2.schema = nullptr;
+
+  // Append each encoding twice to simulate several fields sharing one dictionary
+ ASSERT_EQ(ArrowIpcDictionaryEncodingsAppend(&dictionary_encodings, encoding),
+ NANOARROW_OK);
+ ASSERT_EQ(ArrowIpcDictionaryEncodingsAppend(&dictionary_encodings,
encoding2),
+ NANOARROW_OK);
+ ASSERT_EQ(ArrowIpcDictionaryEncodingsAppend(&dictionary_encodings, encoding),
+ NANOARROW_OK);
+ ASSERT_EQ(ArrowIpcDictionaryEncodingsAppend(&dictionary_encodings,
encoding2),
+ NANOARROW_OK);
+
+ struct ArrowBuffer unique_ids;
+ ArrowBufferInit(&unique_ids);
+ ASSERT_EQ(ArrowIpcDictionaryEncodingsUniqueIds(&dictionary_encodings,
&unique_ids),
+ NANOARROW_OK);
+
+  // 4 encodings (IDs: 42, 142, 42, 142) should produce 2 unique IDs, in order of
+  // first appearance
+ ASSERT_EQ(unique_ids.size_bytes, 2 * sizeof(int64_t));
+ const int64_t* ids = reinterpret_cast<const int64_t*>(unique_ids.data);
+ EXPECT_EQ(ids[0], 42);
+ EXPECT_EQ(ids[1], 142);
+
+ ArrowBufferReset(&unique_ids);
+ ArrowIpcDictionaryEncodingsReset(&dictionary_encodings);
+}
+
+TEST(NanoarrowIpcTest, NanoarrowIpcDecodeDictionaryBatchDecode) {
+ using namespace nanoarrow::literals;
+
+ struct ArrowIpcDictionaryEncodings dictionary_encodings;
+ struct ArrowSchema schema;
+ struct ArrowIpcDictionaries dictionaries;
struct ArrowIpcDecoder decoder;
struct ArrowError error;
+ // Make a dictionary encoded schema that matches that of the dictionary
example batch
+ ArrowSchemaInit(&schema);
+ ASSERT_EQ(ArrowSchemaSetTypeStruct(&schema, 1), NANOARROW_OK);
+ ASSERT_EQ(ArrowSchemaSetType(schema.children[0], NANOARROW_TYPE_INT32),
NANOARROW_OK);
+ ASSERT_EQ(ArrowSchemaAllocateDictionary(schema.children[0]), NANOARROW_OK);
+ ASSERT_EQ(
+ ArrowSchemaInitFromType(schema.children[0]->dictionary,
NANOARROW_TYPE_STRING),
+ NANOARROW_OK);
+
+ // Initialize the dictionary encodings with the single dictionary
+ ArrowIpcDictionaryEncodingsInit(&dictionary_encodings);
+ struct ArrowIpcDictionaryEncoding encoding;
+ encoding.id = 0;
+ encoding.kind = NANOARROW_IPC_DICTIONARY_KIND_DENSE_ARRAY;
+ encoding.schema = schema.children[0];
+ ASSERT_EQ(ArrowIpcDictionaryEncodingsAppend(&dictionary_encodings, encoding),
+ NANOARROW_OK);
+
+ // Initialize the dictionaires with the encodings
+ ASSERT_EQ(ArrowIpcDictionariesInit(&dictionaries, &dictionary_encodings,
&error),
+ NANOARROW_OK)
+ << error.message;
+
+ // Check that we can't decode a dictionary batch if we haven't read a
dictionary batch
+ // message
+ ASSERT_EQ(ArrowIpcDecoderInit(&decoder), NANOARROW_OK);
+ struct ArrowIpcSharedBuffer shared;
+ ASSERT_EQ(
+ ArrowIpcDecoderDecodeDictionary(&decoder, &shared,
NANOARROW_VALIDATION_LEVEL_FULL,
+ &dictionaries, &error),
+ EINVAL);
+ ASSERT_STREQ(error.message, "decoder did not just decode a DictionaryBatch
message");
+
+ // Decode a dictionary batch and inspect metadata
struct ArrowBufferView data;
data.data.as_uint8 = kDictionaryBatch;
data.size_bytes = sizeof(kDictionaryBatch);
- ASSERT_EQ(ArrowIpcDecoderInit(&decoder), NANOARROW_OK);
-
EXPECT_EQ(ArrowIpcDecoderDecodeHeader(&decoder, data, &error), NANOARROW_OK);
ASSERT_EQ(decoder.message_type, NANOARROW_IPC_MESSAGE_TYPE_DICTIONARY_BATCH);
@@ -659,9 +740,65 @@ TEST(NanoarrowIpcTest, NanoarrowIpcDecodeDictionaryBatch) {
EXPECT_EQ(decoder.dictionary->id, 0);
EXPECT_FALSE(decoder.dictionary->is_delta);
- // TODO: Access RecordBatch content
- // https://github.com/apache/arrow-nanoarrow/issues/845
+ // Decode the dictionary batch
+ data.data.as_uint8 += decoder.header_size_bytes;
+ data.size_bytes = decoder.body_size_bytes;
+ struct ArrowBuffer body;
+ ArrowBufferInit(&body);
+ ASSERT_EQ(ArrowBufferAppendBufferView(&body, data), NANOARROW_OK);
+
+ ASSERT_EQ(ArrowIpcSharedBufferInit(&shared, &body), NANOARROW_OK);
+ ASSERT_EQ(
+ ArrowIpcDecoderDecodeDictionary(&decoder, &shared,
NANOARROW_VALIDATION_LEVEL_FULL,
+ &dictionaries, &error),
+ NANOARROW_OK);
+
+ // If we find the current value of the dictionary we should get the correct
array
+ const struct ArrowArray* dictionary_value;
+ ASSERT_EQ(
+ ArrowIpcDictionariesFindCurrentValue(&dictionaries, 0,
&dictionary_value, &error),
+ NANOARROW_OK);
+
+ struct ArrowArrayView array_view;
+ ArrowArrayViewInitFromType(&array_view, NANOARROW_TYPE_STRING);
+ ASSERT_EQ(ArrowArrayViewSetArray(&array_view, dictionary_value, &error),
NANOARROW_OK);
+
+ ASSERT_EQ(array_view.length, 3);
+ ASSERT_EQ(ArrowArrayViewGetStringUnsafe(&array_view, 0), "zero"_asv);
+ ASSERT_EQ(ArrowArrayViewGetStringUnsafe(&array_view, 1), "one"_asv);
+ ASSERT_EQ(ArrowArrayViewGetStringUnsafe(&array_view, 2), "two"_asv);
+ // If we try to find the current value of a different dictionary we should
get an error
+ ASSERT_EQ(ArrowIpcDictionariesFindCurrentValue(&dictionaries, 9999,
&dictionary_value,
+ &error),
+ EINVAL);
+
+ // If we try to decode the dictionary again it should succeed (because the
dictionary
+ // is in replacement mode)
+ ASSERT_EQ(
+ ArrowIpcDecoderDecodeDictionary(&decoder, &shared,
NANOARROW_VALIDATION_LEVEL_FULL,
+ &dictionaries, &error),
+ NANOARROW_OK);
+ ASSERT_EQ(ArrowArrayViewSetArray(&array_view, dictionary_value, &error),
NANOARROW_OK);
+
+ ASSERT_EQ(array_view.length, 3);
+ ASSERT_EQ(ArrowArrayViewGetStringUnsafe(&array_view, 0), "zero"_asv);
+ ASSERT_EQ(ArrowArrayViewGetStringUnsafe(&array_view, 1), "one"_asv);
+ ASSERT_EQ(ArrowArrayViewGetStringUnsafe(&array_view, 2), "two"_asv);
+
+ // If we try to decode a delta dictionary, we should fail with a reasonable
message
+ const_cast<struct ArrowIpcDictionaryBatch*>(decoder.dictionary)->is_delta =
1;
+ ASSERT_EQ(
+ ArrowIpcDecoderDecodeDictionary(&decoder, &shared,
NANOARROW_VALIDATION_LEVEL_FULL,
+ &dictionaries, &error),
+ ENOTSUP);
+ ASSERT_STREQ(error.message, "Dictionary concatenation is not yet supported");
+
+ ArrowArrayViewReset(&array_view);
+ ArrowIpcSharedBufferReset(&shared);
+ ArrowIpcDictionariesReset(&dictionaries);
+ ArrowIpcDictionaryEncodingsReset(&dictionary_encodings);
+ ArrowSchemaRelease(&schema);
ArrowIpcDecoderReset(&decoder);
}
diff --git a/src/nanoarrow/ipc/ipc_hpp_test.cc
b/src/nanoarrow/ipc/ipc_hpp_test.cc
index b0a84090..0ff57ded 100644
--- a/src/nanoarrow/ipc/ipc_hpp_test.cc
+++ b/src/nanoarrow/ipc/ipc_hpp_test.cc
@@ -101,3 +101,30 @@ TEST(NanoarrowIpcHppTest,
NanoarrowIpcHppTestUniqueDictionaryEncodings) {
dictionary_encodings->encodings.data, //
NOLINT(clang-analyzer-cplusplus.Move)
nullptr);
}
+
+TEST(NanoarrowIpcHppTest, NanoarrowIpcHppTestUniqueDictionaries) {
+  // A schema whose dictionary value type is string, registered below as dictionary ID 1
+ nanoarrow::UniqueSchema schema;
+ ArrowSchemaInit(schema.get());
+ ASSERT_EQ(ArrowSchemaAllocateDictionary(schema.get()), NANOARROW_OK);
+ ASSERT_EQ(ArrowSchemaInitFromType(schema->dictionary, NANOARROW_TYPE_STRING),
+ NANOARROW_OK);
+
+ nanoarrow::ipc::UniqueDictionaryEncodings dictionary_encodings;
+ ASSERT_EQ(ArrowIpcDictionaryEncodingsAppend(
+ dictionary_encodings.get(),
+ {schema.get(), 1, NANOARROW_IPC_DICTIONARY_KIND_DENSE_ARRAY}),
+ NANOARROW_OK);
+
+ nanoarrow::ipc::UniqueDictionaries dictionaries;
+ struct ArrowError error;
+ ASSERT_EQ(
+ ArrowIpcDictionariesInit(dictionaries.get(), dictionary_encodings.get(),
&error),
+ NANOARROW_OK)
+ << error.message;
+  // A successful init populates private_data
+ ASSERT_NE(dictionaries->private_data, nullptr);
+
+  // Moving transfers private_data to the destination and leaves the source empty
+ nanoarrow::ipc::UniqueDictionaries dictionaries2 = std::move(dictionaries);
+ EXPECT_NE(dictionaries2->private_data, nullptr);
+ EXPECT_EQ(dictionaries->private_data, //
NOLINT(clang-analyzer-cplusplus.Move)
+ nullptr);
+}
diff --git a/src/nanoarrow/nanoarrow_ipc.h b/src/nanoarrow/nanoarrow_ipc.h
index ba94e5b5..e2b114cd 100644
--- a/src/nanoarrow/nanoarrow_ipc.h
+++ b/src/nanoarrow/nanoarrow_ipc.h
@@ -115,8 +115,20 @@
NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDictionaryEncodingsAppend)
#define ArrowIpcDictionaryEncodingsFind \
NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDictionaryEncodingsFind)
+#define ArrowIpcDictionaryEncodingsFindById \
+ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDictionaryEncodingsFindById)
+#define ArrowIpcDictionaryEncodingsUniqueIds \
+ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDictionaryEncodingsUniqueIds)
#define ArrowIpcDictionaryEncodingsReset \
NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDictionaryEncodingsReset)
+#define ArrowIpcDictionariesInit \
+ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDictionariesInit)
+#define ArrowIpcDictionariesFindCurrentValue \
+ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDictionariesFindCurrentValue)
+#define ArrowIpcDictionariesReset \
+ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDictionariesReset)
+#define ArrowIpcDecoderDecodeDictionary \
+ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDecoderDecodeDictionary)
#endif
@@ -224,24 +236,70 @@ struct ArrowIpcDictionaryEncodings {
/// \brief Initialize an ArrowIpcDictionaryEncodings list
NANOARROW_DLL void ArrowIpcDictionaryEncodingsInit(
- struct ArrowIpcDictionaryEncodings* dictionaries);
+ struct ArrowIpcDictionaryEncodings* dictionary_encodings);
/// \brief Append a given ArrowIpcDictionaryEncoding to this list
+///
+/// The encoding is copied into the list by value; the schema it points to is not
+/// copied and must remain valid for as long as the list is used.
-NANOARROW_DLL ArrowErrorCode
-ArrowIpcDictionaryEncodingsAppend(struct ArrowIpcDictionaryEncodings*
dictionaries,
- struct ArrowIpcDictionaryEncoding encoding);
+NANOARROW_DLL ArrowErrorCode ArrowIpcDictionaryEncodingsAppend(
+ struct ArrowIpcDictionaryEncodings* dictionary_encodings,
+ struct ArrowIpcDictionaryEncoding encoding);
/// \brief Resolve a ArrowIpcDictionaryEncoding for a given dictionary encoded
field
///
/// Returns NULL if the pointed to schema does not match any of the pointed to
/// schemas contained in this list.
NANOARROW_DLL const struct ArrowIpcDictionaryEncoding*
ArrowIpcDictionaryEncodingsFind(
- const struct ArrowIpcDictionaryEncodings* dictionaries,
+ const struct ArrowIpcDictionaryEncodings* dictionary_encodings,
const struct ArrowSchema* schema);
+/// \brief Resolve the first ArrowIpcDictionaryEncoding for a given identifier
+///
+/// Note that there may be multiple dictionary-encoded fields with the same
+/// identifier in a given schema; however, all of them should have an identical
+/// value and index data type.
+///
+/// Returns NULL if id does not refer to any of the encodings in this list.
+NANOARROW_DLL const struct ArrowIpcDictionaryEncoding*
+ArrowIpcDictionaryEncodingsFindById(
+ const struct ArrowIpcDictionaryEncodings* dictionary_encodings, int64_t
id);
+
+/// \brief Collect the unique int64_t identifiers of the encodings in this list
+///
+/// Walks all dictionary encodings in this list and writes each distinct identifier
+/// once, in order of first appearance, as a buffer of int64_t. The result is moved
+/// into out, replacing (not appending to) its previous contents, so out should be
+/// uninitialized or empty. On failure, out is not modified.
+///
+/// This is primarily for internal use to initialize an ArrowIpcDictionaries but
+/// is exposed for testing purposes.
+NANOARROW_DLL ArrowErrorCode ArrowIpcDictionaryEncodingsUniqueIds(
+ const struct ArrowIpcDictionaryEncodings* dictionary_encodings,
+ struct ArrowBuffer* out);
+
/// \brief Release an encodings list and associated resources
+///
+/// Only the list's own storage is freed; the schemas referenced by its encodings
+/// are not released.
NANOARROW_DLL void ArrowIpcDictionaryEncodingsReset(
- struct ArrowIpcDictionaryEncodings* dictionaries);
+ struct ArrowIpcDictionaryEncodings* dictionary_encodings);
+
+/// \brief Dictionaries and their current values
+///
+/// This structure is the basis for decoding dictionary batches and decoding
+/// record batch messages where some columns are dictionary-encoded. It stores a
+/// dictionary-specific ArrowIpcDecoder for each unique dictionary identifier and
+/// the last value decoded for it. private_data is NULL when nothing is held (after
+/// a failed ArrowIpcDictionariesInit() or after ArrowIpcDictionariesReset()).
+struct ArrowIpcDictionaries {
+ void* private_data;
+};
+
+/// \brief Initialize an ArrowIpcDictionaries
+///
+/// Initialize the structure with one dictionary per unique identifier in
+/// dictionary_encodings. On failure, an error code is returned and
+/// dictionaries->private_data is left NULL.
+///
+/// NOTE(review): the per-dictionary decoders are configured from schemas referenced
+/// by dictionary_encodings; presumably those must outlive dictionaries -- confirm.
+NANOARROW_DLL ArrowErrorCode
+ArrowIpcDictionariesInit(struct ArrowIpcDictionaries* dictionaries,
+ const struct ArrowIpcDictionaryEncodings*
dictionary_encodings,
+ struct ArrowError* error);
+
+/// \brief Find the current value of a dictionary with the given ID as an ArrowArray
+///
+/// On success, *out points to an array owned by dictionaries; it is replaced when a
+/// later non-delta DictionaryBatch for the same ID is decoded and released by
+/// ArrowIpcDictionariesReset(). Returns EINVAL if no dictionary with this ID exists.
+NANOARROW_DLL ArrowErrorCode ArrowIpcDictionariesFindCurrentValue(
+ struct ArrowIpcDictionaries* dictionaries, int64_t id, const struct
ArrowArray** out,
+ struct ArrowError* error);
+
+/// \brief Release resources associated with an ArrowIpcDictionaries
+///
+/// Releases every per-dictionary decoder and current value, then sets
+/// dictionaries->private_data to NULL.
+NANOARROW_DLL void ArrowIpcDictionariesReset(struct ArrowIpcDictionaries*
dictionaries);
/// \brief Checks the nanoarrow runtime to make sure the run/build versions
match
NANOARROW_DLL ArrowErrorCode ArrowIpcCheckRuntime(struct ArrowError* error);
@@ -574,6 +632,11 @@ NANOARROW_DLL ArrowErrorCode
ArrowIpcDecoderDecodeArrayFromShared(
struct ArrowArray* out, enum ArrowValidationLevel validation_level,
struct ArrowError* error);
+/// \brief Decode the DictionaryBatch message last decoded by decoder into dictionaries
+///
+/// Call after ArrowIpcDecoderDecodeHeader() has decoded a DictionaryBatch message,
+/// passing the message body in shared. The decoded array becomes the current value
+/// of the dictionary whose ID matches the batch, replacing any previous value; a
+/// delta batch is only accepted if no value has been decoded yet for that ID.
+///
+/// Returns EINVAL if decoder did not just decode a DictionaryBatch or if the batch's
+/// dictionary ID is not tracked by dictionaries, and ENOTSUP for a delta batch that
+/// would require concatenating onto an existing value.
+NANOARROW_DLL ArrowErrorCode ArrowIpcDecoderDecodeDictionary(
+    struct ArrowIpcDecoder* decoder, struct ArrowIpcSharedBuffer* shared,
+    enum ArrowValidationLevel validation_level, struct ArrowIpcDictionaries* dictionaries,
+    struct ArrowError* error);
+
/// \brief An user-extensible input data source
struct ArrowIpcInputStream {
/// \brief Read up to buf_size_bytes from stream into buf
diff --git a/src/nanoarrow/nanoarrow_ipc.hpp b/src/nanoarrow/nanoarrow_ipc.hpp
index 84f1d90b..604a2147 100644
--- a/src/nanoarrow/nanoarrow_ipc.hpp
+++ b/src/nanoarrow/nanoarrow_ipc.hpp
@@ -74,6 +74,25 @@ inline void release_pointer(struct
ArrowIpcDictionaryEncodings* data) {
ArrowIpcDictionaryEncodingsReset(data);
}
+template <>
+inline void init_pointer(struct ArrowIpcDictionaries* data) {
+  // An empty wrapper owns nothing
+  data->private_data = nullptr;
+}
+
+template <>
+inline void move_pointer(struct ArrowIpcDictionaries* src,
+                         struct ArrowIpcDictionaries* dst) {
+  // Transfer ownership: dst takes src's state and src is left empty
+  *dst = *src;
+  src->private_data = nullptr;
+}
+
+template <>
+inline void release_pointer(struct ArrowIpcDictionaries* data) {
+  // Empty (default-constructed or moved-from) wrappers have nothing to release
+  if (data->private_data == nullptr) {
+    return;
+  }
+
+  ArrowIpcDictionariesReset(data);
+}
+
template <>
inline void init_pointer(struct ArrowIpcFooter* data) {
ArrowIpcFooterInit(data);
@@ -206,6 +225,9 @@ using UniqueFooter = internal::Unique<struct
ArrowIpcFooter>;
/// \brief Class wrapping a unique struct ArrowIpcDictionaryEncodings
using UniqueDictionaryEncodings = internal::Unique<struct
ArrowIpcDictionaryEncodings>;
+/// \brief Class wrapping a unique struct ArrowIpcDictionaries
+using UniqueDictionaries = internal::Unique<struct ArrowIpcDictionaries>;
+
/// \brief Class wrapping a unique struct ArrowIpcEncoder
using UniqueEncoder = internal::Unique<struct ArrowIpcEncoder>;