misiek1984 commented on code in PR #50121:
URL: https://github.com/apache/arrow/pull/50121#discussion_r3441556547


##########
cpp/src/arrow/extension/variant_internal.h:
##########
@@ -0,0 +1,347 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <string_view>
+#include <vector>
+
+#include "arrow/result.h"
+#include "arrow/status.h"
+#include "arrow/util/visibility.h"
+
+namespace arrow::extension::variant_internal {
+
+/// \file variant_internal.h
+/// \brief Utilities for Variant binary encoding/decoding.
+///
+/// Implements parsing logic per the Variant Encoding Spec:
+/// https://github.com/apache/parquet-format/blob/master/VariantEncoding.md
+///
+/// The "internal" in the filename refers to the binary encoding internals
+/// of the Variant type, not the visibility of this header. This header is
+/// installed and provides the public C++ API for working with Variant
+/// binary data (independent of the VariantExtensionType in parquet_variant.h).
+
+// ---------------------------------------------------------------------------
+// Constants
+// ---------------------------------------------------------------------------
+
+/// Variant encoding spec version 1.
+constexpr uint8_t kVariantVersion = 1;
+
+/// Maximum nesting depth for recursive value decoding.
+/// Prevents stack overflow on deeply nested (possibly malicious) input.
+constexpr int32_t kMaxNestingDepth = 128;
+
+// ---------------------------------------------------------------------------
+// Enumerations
+// ---------------------------------------------------------------------------
+
+/// \brief Basic type codes from bits 0-1 of the value header byte.
+///
+/// Variant Encoding Spec §3: "Value encoding"

Review Comment:
   nit: The current version of the spec does not have sections numbered §3 or §3.1. 
I would just link to the section with the tables instead: 
https://github.com/apache/parquet-format/blob/master/VariantEncoding.md#encoding-types



##########
cpp/src/arrow/extension/variant_internal.h:
##########
@@ -0,0 +1,347 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <string_view>
+#include <vector>
+
+#include "arrow/result.h"
+#include "arrow/status.h"
+#include "arrow/util/visibility.h"
+
+namespace arrow::extension::variant_internal {
+
+/// \file variant_internal.h
+/// \brief Utilities for Variant binary encoding/decoding.
+///
+/// Implements parsing logic per the Variant Encoding Spec:
+/// https://github.com/apache/parquet-format/blob/master/VariantEncoding.md
+///
+/// The "internal" in the filename refers to the binary encoding internals
+/// of the Variant type, not the visibility of this header. This header is
+/// installed and provides the public C++ API for working with Variant
+/// binary data (independent of the VariantExtensionType in parquet_variant.h).
+
+// ---------------------------------------------------------------------------
+// Constants
+// ---------------------------------------------------------------------------
+
+/// Variant encoding spec version 1.
+constexpr uint8_t kVariantVersion = 1;
+
+/// Maximum nesting depth for recursive value decoding.
+/// Prevents stack overflow on deeply nested (possibly malicious) input.
+constexpr int32_t kMaxNestingDepth = 128;
+
+// ---------------------------------------------------------------------------
+// Enumerations
+// ---------------------------------------------------------------------------
+
+/// \brief Basic type codes from bits 0-1 of the value header byte.
+///
+/// Variant Encoding Spec §3: "Value encoding"
+enum class BasicType : uint8_t {
+  kPrimitive = 0,
+  kShortString = 1,
+  kObject = 2,
+  kArray = 3,
+};
+
+/// \brief Primitive type codes from bits 2-7 when basic_type == kPrimitive.
+///
+/// Variant Encoding Spec §3.1: "Primitive types"
+enum class PrimitiveType : uint8_t {
+  kNull = 0,
+  kTrue = 1,
+  kFalse = 2,
+  kInt8 = 3,
+  kInt16 = 4,
+  kInt32 = 5,
+  kInt64 = 6,
+  kDouble = 7,
+  kDecimal4 = 8,
+  kDecimal8 = 9,
+  kDecimal16 = 10,
+  kDate = 11,
+  kTimestampMicros = 12,
+  kTimestampMicrosNTZ = 13,
+  kFloat = 14,
+  kBinary = 15,
+  kString = 16,
+  kTimeNTZ = 17,
+  kTimestampNanos = 18,
+  kTimestampNanosNTZ = 19,
+  kUUID = 20,
+};
+
+// ---------------------------------------------------------------------------
+// Metadata
+// ---------------------------------------------------------------------------
+
+/// \brief Parsed variant metadata (string dictionary).
+///
+/// The metadata buffer contains a header byte followed by a dictionary of
+/// interned strings. String views reference the raw buffer and are valid
+/// only as long as the underlying buffer is alive.
+struct ARROW_EXPORT VariantMetadata {
+  /// Spec version (must be kVariantVersion).
+  uint8_t version = 0;
+
+  /// Whether the dictionary strings are sorted lexicographically.
+  bool is_sorted = false;
+
+  /// Number of bytes used for each offset (1, 2, 3, or 4).
+  int32_t offset_size = 0;
+
+  /// Dictionary of interned strings. Views into the raw metadata buffer.
+  std::vector<std::string_view> strings;
+};
+
+/// \brief Decode a variant metadata buffer.
+///
+/// Parses the header byte and string dictionary from the raw metadata
+/// buffer. The returned VariantMetadata contains string_views that
+/// reference the input buffer directly (zero-copy).
+///
+/// \param[in] data Pointer to the metadata buffer (must not be null)
+/// \param[in] length Length of the metadata buffer in bytes
+/// \return Parsed VariantMetadata on success, Status::Invalid on
+///         malformed input
+///
+/// \note The input buffer must outlive the returned VariantMetadata.
+ARROW_EXPORT Result<VariantMetadata> DecodeMetadata(const uint8_t* data, int64_t length);
+
+// ---------------------------------------------------------------------------
+// Value header utilities
+// ---------------------------------------------------------------------------
+
+/// \brief Extract the basic type from a value header byte.
+///
+/// \param[in] header The first byte of a variant value
+/// \return The BasicType (bits 0-1)
+inline BasicType GetBasicType(uint8_t header) {
+  return static_cast<BasicType>(header & 0x03);
+}
+
+/// \brief Extract the primitive type from a value header byte.
+///
+/// Only valid when GetBasicType(header) == BasicType::kPrimitive.
+///
+/// \param[in] header The first byte of a variant value
+/// \return The PrimitiveType (bits 2-7)
+inline PrimitiveType GetPrimitiveType(uint8_t header) {
+  return static_cast<PrimitiveType>((header >> 2) & 0x3F);
+}
+
+/// \brief Get the byte size of a primitive value (excluding header).
+///
+/// \param[in] primitive_type The primitive type code
+/// \return Number of bytes for the value payload, or -1 for
+///         variable-length types (Binary, String)
+ARROW_EXPORT int32_t PrimitiveValueSize(PrimitiveType primitive_type);
+
+// ---------------------------------------------------------------------------
+// Value decoding
+// ---------------------------------------------------------------------------
+
+/// \brief Visitor interface for variant value decoding.
+///
+/// Implement this interface to receive callbacks during variant value
+/// traversal. The visitor pattern avoids materializing a tree of objects,
+/// which is important when scanning millions of rows.
+///
+/// All methods return Status::OK() to continue traversal, or any error
+/// Status to abort.
+///
+/// \note String values passed to String() and FieldName() are raw bytes from
+///       the variant buffer without UTF-8 validation. Per spec, all strings
+///       must be valid UTF-8, but validation is the responsibility of a
+///       higher-level consumer (e.g., when materializing to Arrow StringArray).
+class ARROW_EXPORT VariantVisitor {
+ public:
+  virtual ~VariantVisitor() = default;
+
+  /// @name Primitive value callbacks
+  /// @{
+  virtual Status Null() = 0;
+  virtual Status Bool(bool value) = 0;
+  virtual Status Int8(int8_t value) = 0;
+  virtual Status Int16(int16_t value) = 0;
+  virtual Status Int32(int32_t value) = 0;
+  virtual Status Int64(int64_t value) = 0;
+  virtual Status Float(float value) = 0;
+  virtual Status Double(double value) = 0;
+  virtual Status Decimal4(const uint8_t* bytes, int32_t scale) = 0;
+  virtual Status Decimal8(const uint8_t* bytes, int32_t scale) = 0;
+  virtual Status Decimal16(const uint8_t* bytes, int32_t scale) = 0;
+  virtual Status Date(int32_t days_since_epoch) = 0;
+  virtual Status TimestampMicros(int64_t micros_since_epoch) = 0;
+  virtual Status TimestampMicrosNTZ(int64_t micros_since_epoch) = 0;
+  virtual Status String(std::string_view value) = 0;
+  virtual Status Binary(std::string_view value) = 0;
+  virtual Status TimeNTZ(int64_t micros_since_midnight) = 0;
+  virtual Status TimestampNanos(int64_t nanos_since_epoch) = 0;
+  virtual Status TimestampNanosNTZ(int64_t nanos_since_epoch) = 0;
+  virtual Status UUID(const uint8_t* bytes) = 0;
+  /// @}
+
+  /// @name Container callbacks
+  /// @{
+
+  /// \brief Called at the start of an object with the number of fields.
+  virtual Status StartObject(int32_t num_fields) = 0;
+
+  /// \brief Called for each object field name, before the field value.
+  virtual Status FieldName(std::string_view name) = 0;
+
+  /// \brief Called after all fields of an object have been visited.
+  virtual Status EndObject() = 0;
+
+  /// \brief Called at the start of an array with the number of elements.
+  virtual Status StartArray(int32_t num_elements) = 0;
+
+  /// \brief Called after all elements of an array have been visited.
+  virtual Status EndArray() = 0;
+  /// @}
+};
+
+/// \brief Decode a variant value buffer using a visitor.
+///
+/// Recursively traverses the variant value, calling the appropriate
+/// visitor methods for each element. Objects and arrays trigger
+/// Start/End pairs with nested visits for their contents.
+///
+/// \param[in] metadata Parsed metadata (for resolving string dictionary)
+/// \param[in] data Pointer to the value buffer
+/// \param[in] length Length of the value buffer in bytes
+/// \param[in] visitor Callback interface for decoded values
+/// \return Status::OK on success, Status::Invalid on malformed input
+///
+/// \note The data buffer must remain valid for the duration of the call.
+ARROW_EXPORT Status DecodeVariantValue(const VariantMetadata& metadata,
+                                       const uint8_t* data, int64_t length,
+                                       VariantVisitor* visitor);
+
+/// \brief Get the basic type of a variant value without full decoding.
+///
+/// \param[in] data Pointer to the value buffer
+/// \param[in] length Length of the value buffer in bytes
+/// \return The BasicType of the value, or Status::Invalid if the
+///         buffer is empty
+ARROW_EXPORT Result<BasicType> GetValueBasicType(const uint8_t* data, int64_t length);
+
+/// \brief Get the number of fields in a variant object.
+///
+/// \param[in] data Pointer to the value buffer (must start with an object)
+/// \param[in] length Length of the value buffer in bytes
+/// \return The number of fields, or Status::Invalid if not an object
+ARROW_EXPORT Result<int32_t> GetObjectFieldCount(const uint8_t* data, int64_t length);
+
+/// \brief Get the number of elements in a variant array.
+///
+/// \param[in] data Pointer to the value buffer (must start with an array)
+/// \param[in] length Length of the value buffer in bytes
+/// \return The number of elements, or Status::Invalid if not an array
+ARROW_EXPORT Result<int32_t> GetArrayElementCount(const uint8_t* data, int64_t length);
+
+// ---------------------------------------------------------------------------
+// Value size computation
+// ---------------------------------------------------------------------------
+
+/// \brief Compute the total byte size of a variant value (header + data).
+///
+/// Determines how many bytes a variant value occupies by examining
+/// its header and (for containers/variable-length types) reading
+/// size information. Does NOT recursively validate the contents.
+///
+/// \param[in] data Pointer to the start of a variant value
+/// \param[in] length Maximum bytes available
+/// \return Total byte count of the value, or Status::Invalid if truncated
+ARROW_EXPORT Result<int64_t> ValueSize(const uint8_t* data, int64_t length);
+
+// ---------------------------------------------------------------------------
+// Random access utilities
+// ---------------------------------------------------------------------------
+
+/// \brief Find an object field by name and return the offset/size of its value.
+///
+/// Searches the field IDs in the object, resolving each against the
+/// metadata dictionary. Per spec, field IDs are in lexicographic order
+/// of their corresponding key names, enabling binary search for large
+/// objects (>=32 fields). For smaller objects, linear scan is used.

Review Comment:
   How was the 32 threshold determined?



##########
cpp/src/arrow/extension/variant_internal.cc:
##########
@@ -0,0 +1,1020 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/extension/variant_internal.h"
+
+#include <cstring>
+
+#include "arrow/util/endian.h"
+#include "arrow/util/logging_internal.h"
+
+namespace arrow::extension::variant_internal {
+
+namespace {
+
+// ---------------------------------------------------------------------------
+// Helpers for reading little-endian integers of variable size (1-4 bytes)
+// ---------------------------------------------------------------------------
+
+/// \brief Read an unsigned integer of 1-4 bytes in little-endian order.
+///
+/// On big-endian platforms, FromLittleEndian byte-swaps the full 32-bit
+/// word after memcpy; the mask then discards any bytes beyond num_bytes.
+///
+/// \param[in] data Pointer to the bytes (must have at least num_bytes valid)
+/// \param[in] num_bytes Number of bytes to read (1, 2, 3, or 4)
+/// \return The decoded unsigned integer value
+inline uint32_t ReadUnsignedLE(const uint8_t* data, int32_t num_bytes) {
+  uint32_t result = 0;
+  std::memcpy(&result, data, num_bytes);
+  result = bit_util::FromLittleEndian(result);
+  if (num_bytes < 4) {
+    result &= (static_cast<uint32_t>(1) << (num_bytes * 8)) - 1;
+  }
+  return result;
+}
+
+/// \brief Validate that an offset array is monotonically non-decreasing
+///        and within the buffer bounds.
+Status ValidateOffsets(const std::vector<uint32_t>& offsets, int64_t data_length) {
+  for (size_t i = 1; i < offsets.size(); ++i) {
+    if (offsets[i] < offsets[i - 1]) {
+      return Status::Invalid(
+          "Variant metadata: string offsets are not monotonically "
+          "non-decreasing at index ",
+          i);
+    }
+  }
+  if (!offsets.empty() && offsets.back() > static_cast<uint32_t>(data_length)) {
+    return Status::Invalid("Variant metadata: last string offset ", offsets.back(),
+                           " exceeds data length ", data_length);
+  }
+  return Status::OK();
+}
+
+// ---------------------------------------------------------------------------
+// Value decoding helpers
+// ---------------------------------------------------------------------------
+
+/// \brief Decode a single variant value at the given offset and invoke
+///        the visitor. Returns the number of bytes consumed.
+///
+/// This is the core recursive function.
+Status DecodeValueAt(const VariantMetadata& metadata, const uint8_t* data, int64_t length,

Review Comment:
   I think this function should be public. Let's assume I want to read the 
value of a specific nested field from a Variant using a path (e.g., 
field_1.field_2.field_3).
   
   My current understanding is that I would first need to call 
`FindObjectField` to locate "field_1". If it exists, I then have to find 
"field_2", and finally "field_3". However, I have to implement the last 
step—reading the actual value—on my own because `DecodeValueAt` is not public, 
and `DecodeVariantValue` only allows for decoding the entire Variant.



##########
cpp/src/arrow/extension/variant_internal_test.cc:
##########
@@ -0,0 +1,2128 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/extension/variant_internal.h"
+#include "arrow/extension/variant_test_util.h"
+
+#include <cmath>
+#include <cstring>
+#include <limits>
+#include <string>
+#include <vector>
+
+#include "arrow/testing/gtest_util.h"
+
+namespace arrow::extension::variant_internal {
+
+// ===========================================================================
+// Test helpers
+// ===========================================================================
+
+/// \brief Build a metadata buffer from a list of strings.
+///
+/// Uses offset_size=1, version=1, sorted flag as specified.
+std::vector<uint8_t> BuildMetadataBuffer(const std::vector<std::string>& strings,
+                                         bool sorted = false, int32_t offset_size = 1) {
+  std::vector<uint8_t> buffer;
+
+  // Header byte: version=1, sorted flag, offset_size
+  uint8_t header = kVariantVersion;
+  if (sorted) {
+    header |= (1 << 4);
+  }
+  header |= static_cast<uint8_t>((offset_size - 1) << 6);
+  buffer.push_back(header);
+
+  // Dictionary size
+  auto dict_size = static_cast<uint32_t>(strings.size());
+  for (int32_t b = 0; b < offset_size; ++b) {
+    buffer.push_back(static_cast<uint8_t>((dict_size >> (b * 8)) & 0xFF));
+  }
+
+  // Compute string offsets
+  std::vector<uint32_t> offsets(dict_size + 1);
+  offsets[0] = 0;
+  for (uint32_t i = 0; i < dict_size; ++i) {
+    offsets[i + 1] = offsets[i] + static_cast<uint32_t>(strings[i].size());
+  }
+
+  // Write offsets
+  for (uint32_t i = 0; i <= dict_size; ++i) {
+    for (int32_t b = 0; b < offset_size; ++b) {
+      buffer.push_back(static_cast<uint8_t>((offsets[i] >> (b * 8)) & 0xFF));
+    }
+  }
+
+  // Write string data
+  for (const auto& s : strings) {
+    buffer.insert(buffer.end(), s.begin(), s.end());
+  }
+
+  return buffer;
+}
+
+/// \brief Build a primitive value header byte.
+uint8_t PrimitiveHeader(PrimitiveType type) {
+  return static_cast<uint8_t>(BasicType::kPrimitive) | (static_cast<uint8_t>(type) << 2);
+}
+
+/// \brief Build a short string value buffer.
+std::vector<uint8_t> BuildShortString(const std::string& s) {
+  std::vector<uint8_t> buffer;
+  auto len = static_cast<uint8_t>(s.size());
+  uint8_t header = static_cast<uint8_t>(BasicType::kShortString) | (len << 2);
+  buffer.push_back(header);
+  buffer.insert(buffer.end(), s.begin(), s.end());
+  return buffer;
+}
+
+/// \brief Build an object value buffer.
+///
+/// \param field_ids Dictionary indices for each field name
+/// \param field_values Serialized variant values for each field
+/// \param field_id_size Bytes per field ID (1-4)
+/// \param field_offset_size Bytes per offset (1-4)
+std::vector<uint8_t> BuildObject(const std::vector<uint32_t>& field_ids,
+                                 const std::vector<std::vector<uint8_t>>& field_values,
+                                 int32_t field_id_size = 1,
+                                 int32_t field_offset_size = 1) {
+  auto num_fields = static_cast<uint32_t>(field_ids.size());
+  bool is_large = (num_fields > 255);
+
+  std::vector<uint8_t> buffer;
+
+  // Header per spec: basic_type=2 in bits 0-1,
+  //   bits 2-3: field_offset_size-1
+  //   bits 4-5: field_id_size-1
+  //   bit 6: is_large
+  uint8_t header = static_cast<uint8_t>(BasicType::kObject);
+  header |= static_cast<uint8_t>((field_offset_size - 1) << 2);
+  header |= static_cast<uint8_t>((field_id_size - 1) << 4);
+  if (is_large) {
+    header |= (1 << 6);
+  }
+  buffer.push_back(header);
+
+  // num_fields: 1 byte or 4 bytes depending on is_large
+  int32_t num_fields_size = is_large ? 4 : 1;
+  for (int32_t b = 0; b < num_fields_size; ++b) {
+    buffer.push_back(static_cast<uint8_t>((num_fields >> (b * 8)) & 0xFF));
+  }
+
+  // field_ids
+  for (auto fid : field_ids) {
+    for (int32_t b = 0; b < field_id_size; ++b) {
+      buffer.push_back(static_cast<uint8_t>((fid >> (b * 8)) & 0xFF));
+    }
+  }
+
+  // Compute offsets
+  std::vector<uint32_t> offsets(num_fields + 1);
+  offsets[0] = 0;
+  for (uint32_t i = 0; i < num_fields; ++i) {
+    offsets[i + 1] = offsets[i] + static_cast<uint32_t>(field_values[i].size());
+  }
+
+  // Write offsets
+  for (uint32_t i = 0; i <= num_fields; ++i) {
+    for (int32_t b = 0; b < field_offset_size; ++b) {
+      buffer.push_back(static_cast<uint8_t>((offsets[i] >> (b * 8)) & 0xFF));
+    }
+  }
+
+  // Write field value data
+  for (const auto& fv : field_values) {
+    buffer.insert(buffer.end(), fv.begin(), fv.end());
+  }
+
+  return buffer;
+}
+
+/// \brief Build an array value buffer.
+///
+/// \param elements Serialized variant values for each element
+/// \param field_offset_size Bytes per offset (1-4)
+std::vector<uint8_t> BuildArray(const std::vector<std::vector<uint8_t>>& elements,
+                                int32_t field_offset_size = 1) {
+  auto num_elements = static_cast<uint32_t>(elements.size());
+  bool is_large = (num_elements > 255);
+
+  std::vector<uint8_t> buffer;
+
+  // Header per spec: basic_type=3 in bits 0-1,
+  //   bits 2-3: field_offset_size-1
+  //   bit 4: is_large
+  uint8_t header = static_cast<uint8_t>(BasicType::kArray);
+  header |= static_cast<uint8_t>((field_offset_size - 1) << 2);
+  if (is_large) {
+    header |= (1 << 4);
+  }
+  buffer.push_back(header);
+
+  // num_elements: 1 byte or 4 bytes depending on is_large
+  int32_t num_elements_size = is_large ? 4 : 1;
+  for (int32_t b = 0; b < num_elements_size; ++b) {
+    buffer.push_back(static_cast<uint8_t>((num_elements >> (b * 8)) & 0xFF));
+  }
+
+  // Compute offsets
+  std::vector<uint32_t> offsets(num_elements + 1);
+  offsets[0] = 0;
+  for (uint32_t i = 0; i < num_elements; ++i) {
+    offsets[i + 1] = offsets[i] + static_cast<uint32_t>(elements[i].size());
+  }
+
+  // Write offsets
+  for (uint32_t i = 0; i <= num_elements; ++i) {
+    for (int32_t b = 0; b < field_offset_size; ++b) {
+      buffer.push_back(static_cast<uint8_t>((offsets[i] >> (b * 8)) & 0xFF));
+    }
+  }
+
+  // Write element data
+  for (const auto& elem : elements) {
+    buffer.insert(buffer.end(), elem.begin(), elem.end());
+  }
+
+  return buffer;
+}
+
+// ===========================================================================
+// Metadata decoding tests
+// ===========================================================================
+
+class VariantMetadataTest : public ::testing::Test {};
+
+TEST_F(VariantMetadataTest, EmptyDictionary) {
+  auto buf = BuildMetadataBuffer({});
+  ASSERT_OK_AND_ASSIGN(auto metadata, DecodeMetadata(buf.data(), buf.size()));
+  ASSERT_EQ(metadata.version, 1);
+  ASSERT_FALSE(metadata.is_sorted);
+  ASSERT_EQ(metadata.offset_size, 1);
+  ASSERT_EQ(metadata.strings.size(), 0);
+}
+
+TEST_F(VariantMetadataTest, SingleString) {
+  auto buf = BuildMetadataBuffer({"hello"});
+  ASSERT_OK_AND_ASSIGN(auto metadata, DecodeMetadata(buf.data(), buf.size()));
+  ASSERT_EQ(metadata.strings.size(), 1);
+  ASSERT_EQ(metadata.strings[0], "hello");
+}
+
+TEST_F(VariantMetadataTest, MultipleStrings) {
+  auto buf = BuildMetadataBuffer({"name", "age", "scores"});
+  ASSERT_OK_AND_ASSIGN(auto metadata, DecodeMetadata(buf.data(), buf.size()));
+  ASSERT_EQ(metadata.strings.size(), 3);
+  ASSERT_EQ(metadata.strings[0], "name");
+  ASSERT_EQ(metadata.strings[1], "age");
+  ASSERT_EQ(metadata.strings[2], "scores");
+}
+
+TEST_F(VariantMetadataTest, SortedFlag) {
+  auto buf = BuildMetadataBuffer({"age", "name", "score"}, true);
+  ASSERT_OK_AND_ASSIGN(auto metadata, DecodeMetadata(buf.data(), buf.size()));
+  ASSERT_TRUE(metadata.is_sorted);
+}
+
+TEST_F(VariantMetadataTest, OffsetSize2) {
+  auto buf = BuildMetadataBuffer({"key1", "key2"}, false, 2);
+  ASSERT_OK_AND_ASSIGN(auto metadata, DecodeMetadata(buf.data(), buf.size()));
+  ASSERT_EQ(metadata.offset_size, 2);
+  ASSERT_EQ(metadata.strings.size(), 2);
+  ASSERT_EQ(metadata.strings[0], "key1");
+  ASSERT_EQ(metadata.strings[1], "key2");
+}
+
+TEST_F(VariantMetadataTest, OffsetSize4) {
+  auto buf = BuildMetadataBuffer({"a", "bb", "ccc"}, false, 4);
+  ASSERT_OK_AND_ASSIGN(auto metadata, DecodeMetadata(buf.data(), buf.size()));
+  ASSERT_EQ(metadata.offset_size, 4);
+  ASSERT_EQ(metadata.strings.size(), 3);
+  ASSERT_EQ(metadata.strings[0], "a");
+  ASSERT_EQ(metadata.strings[1], "bb");
+  ASSERT_EQ(metadata.strings[2], "ccc");
+}
+
+TEST_F(VariantMetadataTest, EmptyStrings) {
+  auto buf = BuildMetadataBuffer({"", "nonempty", ""});
+  ASSERT_OK_AND_ASSIGN(auto metadata, DecodeMetadata(buf.data(), buf.size()));
+  ASSERT_EQ(metadata.strings.size(), 3);
+  ASSERT_EQ(metadata.strings[0], "");
+  ASSERT_EQ(metadata.strings[1], "nonempty");
+  ASSERT_EQ(metadata.strings[2], "");
+}
+
+// Error cases
+
+TEST_F(VariantMetadataTest, NullBuffer) {
+  ASSERT_RAISES(Invalid, DecodeMetadata(nullptr, 0));
+}
+
+TEST_F(VariantMetadataTest, EmptyBuffer) {
+  uint8_t data = 0;
+  ASSERT_RAISES(Invalid, DecodeMetadata(&data, 0));
+}
+
+TEST_F(VariantMetadataTest, UnsupportedVersion) {
+  // Version 2 (unsupported)
+  uint8_t data[] = {0x02, 0x00};
+  ASSERT_RAISES(Invalid, DecodeMetadata(data, sizeof(data)));
+}
+
+TEST_F(VariantMetadataTest, TruncatedDictionarySize) {
+  // Header says offset_size=2 (bits 6-7 = 01), but only 1 byte follows
+  uint8_t data[] = {0x41, 0x00};  // version=1, offset_size=2
+  ASSERT_RAISES(Invalid, DecodeMetadata(data, sizeof(data)));
+}
+
+TEST_F(VariantMetadataTest, TruncatedStringOffsets) {
+  // Claims dict_size=5 but buffer is too short for offsets
+  uint8_t data[] = {0x01, 0x05, 0x00};
+  ASSERT_RAISES(Invalid, DecodeMetadata(data, sizeof(data)));
+}
+
+TEST_F(VariantMetadataTest, OffsetSize3) {
+  auto buf = BuildMetadataBuffer({"foo", "bar"}, false, 3);
+  ASSERT_OK_AND_ASSIGN(auto metadata, DecodeMetadata(buf.data(), buf.size()));
+  ASSERT_EQ(metadata.offset_size, 3);
+  ASSERT_EQ(metadata.strings.size(), 2);
+  ASSERT_EQ(metadata.strings[0], "foo");
+  ASSERT_EQ(metadata.strings[1], "bar");
+}
+
+TEST_F(VariantMetadataTest, ReservedBit5Set) {
+  // Header with bit 5 set: 0x21 = version=1, bit5=1
+  uint8_t data[] = {0x21, 0x00, 0x00};
+  ASSERT_RAISES(Invalid, DecodeMetadata(data, sizeof(data)));
+}
+
+TEST_F(VariantMetadataTest, NonMonotonicStringOffsets) {
+  // Manually construct metadata where string offsets are NOT monotonically
+  // non-decreasing. ValidateOffsets should reject this.
+  // Header: version=1, offset_size=1
+  // dict_size=2, offsets=[0, 5, 3] — 3 < 5, non-monotonic
+  // String data: "helloabc" (8 bytes, but offsets claim 3 as last)
+  uint8_t data[] = {
+      0x01,              // header: version=1, offset_size=1
+      0x02,              // dict_size = 2
+      0x00, 0x05, 0x03,  // offsets: [0, 5, 3] — non-monotonic
+      'h', 'e', 'l', 'l', 'o', 'a', 'b', 'c'};
+  ASSERT_RAISES(Invalid, DecodeMetadata(data, sizeof(data)));
+}
+
+// ===========================================================================
+// Primitive value decoding tests
+// ===========================================================================
+
+class VariantPrimitiveTest : public ::testing::Test {
+ protected:
+  VariantMetadata empty_metadata_;
+
+  void SetUp() override {
+    empty_metadata_.version = 1;
+    empty_metadata_.is_sorted = false;
+    empty_metadata_.offset_size = 1;
+  }
+};
+
+TEST_F(VariantPrimitiveTest, DecodeNull) {
+  uint8_t data[] = {PrimitiveHeader(PrimitiveType::kNull)};
+  RecordingVisitor visitor;
+  ASSERT_OK(DecodeVariantValue(empty_metadata_, data, sizeof(data), &visitor));
+  ASSERT_EQ(visitor.events.size(), 1);
+  ASSERT_EQ(visitor.events[0], "Null");
+}
+
+TEST_F(VariantPrimitiveTest, DecodeTrue) {
+  uint8_t data[] = {PrimitiveHeader(PrimitiveType::kTrue)};
+  RecordingVisitor visitor;
+  ASSERT_OK(DecodeVariantValue(empty_metadata_, data, sizeof(data), &visitor));
+  ASSERT_EQ(visitor.events.size(), 1);
+  ASSERT_EQ(visitor.events[0], "Bool(true)");
+}
+
+TEST_F(VariantPrimitiveTest, DecodeFalse) {
+  uint8_t data[] = {PrimitiveHeader(PrimitiveType::kFalse)};
+  RecordingVisitor visitor;
+  ASSERT_OK(DecodeVariantValue(empty_metadata_, data, sizeof(data), &visitor));
+  ASSERT_EQ(visitor.events.size(), 1);
+  ASSERT_EQ(visitor.events[0], "Bool(false)");
+}
+
+TEST_F(VariantPrimitiveTest, DecodeInt8) {
+  uint8_t data[] = {PrimitiveHeader(PrimitiveType::kInt8), 0x2A};
+  RecordingVisitor visitor;
+  ASSERT_OK(DecodeVariantValue(empty_metadata_, data, sizeof(data), &visitor));
+  ASSERT_EQ(visitor.events[0], "Int8(42)");
+}
+
+TEST_F(VariantPrimitiveTest, DecodeInt8Negative) {
+  uint8_t data[] = {PrimitiveHeader(PrimitiveType::kInt8), 0xD6};
+  RecordingVisitor visitor;
+  ASSERT_OK(DecodeVariantValue(empty_metadata_, data, sizeof(data), &visitor));
+  ASSERT_EQ(visitor.events[0], "Int8(-42)");
+}
+
+TEST_F(VariantPrimitiveTest, DecodeInt16) {
+  // 300 = 0x012C in little-endian: 0x2C, 0x01
+  uint8_t data[] = {PrimitiveHeader(PrimitiveType::kInt16), 0x2C, 0x01};
+  RecordingVisitor visitor;
+  ASSERT_OK(DecodeVariantValue(empty_metadata_, data, sizeof(data), &visitor));
+  ASSERT_EQ(visitor.events[0], "Int16(300)");
+}
+
+TEST_F(VariantPrimitiveTest, DecodeInt32) {
+  // 100000 = 0x000186A0 in LE: A0 86 01 00
+  uint8_t data[] = {PrimitiveHeader(PrimitiveType::kInt32), 0xA0, 0x86, 0x01, 
0x00};
+  RecordingVisitor visitor;
+  ASSERT_OK(DecodeVariantValue(empty_metadata_, data, sizeof(data), &visitor));
+  ASSERT_EQ(visitor.events[0], "Int32(100000)");
+}
+
+TEST_F(VariantPrimitiveTest, DecodeInt32Max) {
+  int32_t val = std::numeric_limits<int32_t>::max();
+  uint8_t data[5];
+  data[0] = PrimitiveHeader(PrimitiveType::kInt32);
+  std::memcpy(data + 1, &val, 4);
+  RecordingVisitor visitor;
+  ASSERT_OK(DecodeVariantValue(empty_metadata_, data, sizeof(data), &visitor));
+  ASSERT_EQ(visitor.events[0], "Int32(" + std::to_string(val) + ")");
+}
+
+TEST_F(VariantPrimitiveTest, DecodeInt64) {
+  int64_t val = 1234567890123LL;
+  uint8_t data[9];
+  data[0] = PrimitiveHeader(PrimitiveType::kInt64);
+  std::memcpy(data + 1, &val, 8);
+  RecordingVisitor visitor;
+  ASSERT_OK(DecodeVariantValue(empty_metadata_, data, sizeof(data), &visitor));
+  ASSERT_EQ(visitor.events[0], "Int64(" + std::to_string(val) + ")");
+}
+
+TEST_F(VariantPrimitiveTest, DecodeFloat) {
+  float val = 3.14f;
+  uint8_t data[5];
+  data[0] = PrimitiveHeader(PrimitiveType::kFloat);
+  std::memcpy(data + 1, &val, 4);
+  RecordingVisitor visitor;
+  ASSERT_OK(DecodeVariantValue(empty_metadata_, data, sizeof(data), &visitor));
+  // Float string representation may vary; just check it starts with Float(
+  ASSERT_TRUE(visitor.events[0].find("Float(") == 0);
+}
+
+TEST_F(VariantPrimitiveTest, DecodeDouble) {
+  double val = 2.718281828459045;
+  uint8_t data[9];
+  data[0] = PrimitiveHeader(PrimitiveType::kDouble);
+  std::memcpy(data + 1, &val, 8);
+  RecordingVisitor visitor;
+  ASSERT_OK(DecodeVariantValue(empty_metadata_, data, sizeof(data), &visitor));
+  ASSERT_TRUE(visitor.events[0].find("Double(") == 0);
+}
+
+TEST_F(VariantPrimitiveTest, DecodeDate) {
+  // Days since epoch: 19000 (approximately 2022-01-01)
+  int32_t days = 19000;
+  uint8_t data[5];
+  data[0] = PrimitiveHeader(PrimitiveType::kDate);
+  std::memcpy(data + 1, &days, 4);
+  RecordingVisitor visitor;
+  ASSERT_OK(DecodeVariantValue(empty_metadata_, data, sizeof(data), &visitor));
+  ASSERT_EQ(visitor.events[0], "Date(19000)");
+}
+
+TEST_F(VariantPrimitiveTest, DecodeTimestampMicros) {
+  int64_t micros = 1654041600000000LL;  // some timestamp
+  uint8_t data[9];
+  data[0] = PrimitiveHeader(PrimitiveType::kTimestampMicros);
+  std::memcpy(data + 1, &micros, 8);
+  RecordingVisitor visitor;
+  ASSERT_OK(DecodeVariantValue(empty_metadata_, data, sizeof(data), &visitor));
+  ASSERT_EQ(visitor.events[0], "TimestampMicros(" + std::to_string(micros) + 
")");
+}
+
+TEST_F(VariantPrimitiveTest, DecodeTimestampMicrosNTZ) {
+  int64_t micros = 1654041600000000LL;
+  uint8_t data[9];
+  data[0] = PrimitiveHeader(PrimitiveType::kTimestampMicrosNTZ);
+  std::memcpy(data + 1, &micros, 8);
+  RecordingVisitor visitor;
+  ASSERT_OK(DecodeVariantValue(empty_metadata_, data, sizeof(data), &visitor));
+  ASSERT_EQ(visitor.events[0], "TimestampMicrosNTZ(" + std::to_string(micros) 
+ ")");
+}
+
+TEST_F(VariantPrimitiveTest, DecodeDecimal4) {
+  // Spec layout: 1 byte scale, then 4 bytes LE unscaled value
+  uint8_t data[6];
+  data[0] = PrimitiveHeader(PrimitiveType::kDecimal4);
+  data[1] = 2;  // scale = 2
+  int32_t val = 12345;
+  std::memcpy(data + 2, &val, 4);  // unscaled value
+  RecordingVisitor visitor;
+  ASSERT_OK(DecodeVariantValue(empty_metadata_, data, sizeof(data), &visitor));
+  ASSERT_EQ(visitor.events[0], "Decimal4(scale=2)");
+}
+
+TEST_F(VariantPrimitiveTest, DecodeDecimal4MaxScale) {
+  // Scale at maximum per spec: 38
+  uint8_t data[6];
+  data[0] = PrimitiveHeader(PrimitiveType::kDecimal4);
+  data[1] = 38;  // scale = 38 (maximum per spec)
+  int32_t val = 12345;
+  std::memcpy(data + 2, &val, 4);
+  RecordingVisitor visitor;
+  ASSERT_OK(DecodeVariantValue(empty_metadata_, data, sizeof(data), &visitor));
+  ASSERT_EQ(visitor.events[0], "Decimal4(scale=38)");
+}
+
+TEST_F(VariantPrimitiveTest, DecodeDecimal8) {
+  // Spec layout: 1 byte scale, then 8 bytes LE unscaled value
+  uint8_t data[10];
+  data[0] = PrimitiveHeader(PrimitiveType::kDecimal8);
+  data[1] = 5;  // scale = 5
+  int64_t val = 123456789012345LL;
+  std::memcpy(data + 2, &val, 8);
+  RecordingVisitor visitor;
+  ASSERT_OK(DecodeVariantValue(empty_metadata_, data, sizeof(data), &visitor));
+  ASSERT_EQ(visitor.events[0], "Decimal8(scale=5)");
+}
+
+TEST_F(VariantPrimitiveTest, DecodeDecimal16) {
+  // Spec layout: 1 byte scale, then 16 bytes LE unscaled value
+  uint8_t data[18];
+  data[0] = PrimitiveHeader(PrimitiveType::kDecimal16);
+  data[1] = 10;  // scale = 10
+  std::memset(data + 2, 0, 16);
+  data[2] = 0x01;  // low byte = 1
+  RecordingVisitor visitor;
+  ASSERT_OK(DecodeVariantValue(empty_metadata_, data, sizeof(data), &visitor));
+  ASSERT_EQ(visitor.events[0], "Decimal16(scale=10)");
+}
+
+TEST_F(VariantPrimitiveTest, DecodeLongString) {
+  // Long string: primitive type kString with 4-byte length prefix
+  std::string test_str = "hello world, this is a long string";
+  auto str_len = static_cast<uint32_t>(test_str.size());
+
+  std::vector<uint8_t> data;
+  data.push_back(PrimitiveHeader(PrimitiveType::kString));
+  // 4-byte little-endian length
+  for (int b = 0; b < 4; ++b) {
+    data.push_back(static_cast<uint8_t>((str_len >> (b * 8)) & 0xFF));
+  }
+  data.insert(data.end(), test_str.begin(), test_str.end());
+
+  RecordingVisitor visitor;
+  ASSERT_OK(DecodeVariantValue(empty_metadata_, data.data(),
+                               static_cast<int64_t>(data.size()), &visitor));
+  ASSERT_EQ(visitor.events[0], "String(\"hello world, this is a long 
string\")");
+}
+
+TEST_F(VariantPrimitiveTest, DecodeBinary) {
+  std::vector<uint8_t> bin_bytes = {0x00, 0x01, 0x02, 0x03};
+  auto bin_len = static_cast<uint32_t>(bin_bytes.size());
+
+  std::vector<uint8_t> data;
+  data.push_back(PrimitiveHeader(PrimitiveType::kBinary));
+  for (int b = 0; b < 4; ++b) {
+    data.push_back(static_cast<uint8_t>((bin_len >> (b * 8)) & 0xFF));
+  }
+  data.insert(data.end(), bin_bytes.begin(), bin_bytes.end());
+
+  RecordingVisitor visitor;
+  ASSERT_OK(DecodeVariantValue(empty_metadata_, data.data(),
+                               static_cast<int64_t>(data.size()), &visitor));
+  ASSERT_EQ(visitor.events[0], "Binary(len=4)");
+}
+
+// Truncation errors
+
+TEST_F(VariantPrimitiveTest, TruncatedInt32) {
+  // Only 2 bytes after header, but Int32 needs 4
+  uint8_t data[] = {PrimitiveHeader(PrimitiveType::kInt32), 0x00, 0x00};
+  RecordingVisitor visitor;
+  ASSERT_RAISES(Invalid,
+                DecodeVariantValue(empty_metadata_, data, sizeof(data), 
&visitor));
+}
+
+TEST_F(VariantPrimitiveTest, EmptyValueBuffer) {
+  RecordingVisitor visitor;
+  ASSERT_RAISES(Invalid, DecodeVariantValue(empty_metadata_, nullptr, 0, 
&visitor));
+}
+
+// ===========================================================================
+// Short string tests
+// ===========================================================================
+
+class VariantShortStringTest : public ::testing::Test {
+ protected:
+  VariantMetadata empty_metadata_;
+
+  void SetUp() override {
+    empty_metadata_.version = 1;
+    empty_metadata_.is_sorted = false;
+    empty_metadata_.offset_size = 1;
+  }
+};
+
+TEST_F(VariantShortStringTest, EmptyShortString) {
+  auto data = BuildShortString("");
+  RecordingVisitor visitor;
+  ASSERT_OK(DecodeVariantValue(empty_metadata_, data.data(),
+                               static_cast<int64_t>(data.size()), &visitor));
+  ASSERT_EQ(visitor.events[0], "String(\"\")");
+}
+
+TEST_F(VariantShortStringTest, SimpleShortString) {
+  auto data = BuildShortString("hi");
+  RecordingVisitor visitor;
+  ASSERT_OK(DecodeVariantValue(empty_metadata_, data.data(),
+                               static_cast<int64_t>(data.size()), &visitor));
+  ASSERT_EQ(visitor.events[0], "String(\"hi\")");
+}
+
+TEST_F(VariantShortStringTest, MaxLengthShortString) {
+  // Maximum short string is 63 bytes
+  std::string max_str(63, 'x');
+  auto data = BuildShortString(max_str);
+  RecordingVisitor visitor;
+  ASSERT_OK(DecodeVariantValue(empty_metadata_, data.data(),
+                               static_cast<int64_t>(data.size()), &visitor));
+  ASSERT_EQ(visitor.events[0], "String(\"" + max_str + "\")");
+}
+
+TEST_F(VariantShortStringTest, TruncatedShortString) {
+  // Header says length=10 but buffer only has 3 bytes total
+  uint8_t data[] = {static_cast<uint8_t>(BasicType::kShortString) | (10 << 2), 
'a', 'b'};
+  RecordingVisitor visitor;
+  ASSERT_RAISES(Invalid,
+                DecodeVariantValue(empty_metadata_, data, sizeof(data), 
&visitor));
+}
+
+// ===========================================================================
+// Object decoding tests
+// ===========================================================================
+
+class VariantObjectTest : public ::testing::Test {
+ protected:
+  VariantMetadata metadata_;
+
+  void SetUp() override {
+    metadata_.version = 1;
+    metadata_.is_sorted = false;
+    metadata_.offset_size = 1;
+    metadata_.strings = {"name", "age", "scores"};
+  }
+};
+
+TEST_F(VariantObjectTest, EmptyObject) {
+  auto data = BuildObject({}, {});
+  RecordingVisitor visitor;
+  ASSERT_OK(DecodeVariantValue(metadata_, data.data(), 
static_cast<int64_t>(data.size()),
+                               &visitor));
+  ASSERT_EQ(visitor.events.size(), 2);
+  ASSERT_EQ(visitor.events[0], "StartObject(0)");
+  ASSERT_EQ(visitor.events[1], "EndObject");
+}
+
+TEST_F(VariantObjectTest, SingleField) {
+  // Object with one field: name -> "Alice" (short string)
+  auto value = BuildShortString("Alice");
+  auto data = BuildObject({0}, {value});
+
+  RecordingVisitor visitor;
+  ASSERT_OK(DecodeVariantValue(metadata_, data.data(), 
static_cast<int64_t>(data.size()),
+                               &visitor));
+  ASSERT_EQ(visitor.events.size(), 4);
+  ASSERT_EQ(visitor.events[0], "StartObject(1)");
+  ASSERT_EQ(visitor.events[1], "FieldName(\"name\")");
+  ASSERT_EQ(visitor.events[2], "String(\"Alice\")");
+  ASSERT_EQ(visitor.events[3], "EndObject");
+}
+
+TEST_F(VariantObjectTest, MultipleFields) {
+  // Object: {name: "Bob", age: 30}
+  auto name_val = BuildShortString("Bob");
+  // age: Int32(30)
+  std::vector<uint8_t> age_val = {PrimitiveHeader(PrimitiveType::kInt32), 30, 
0, 0, 0};
+
+  auto data = BuildObject({0, 1}, {name_val, age_val});
+
+  RecordingVisitor visitor;
+  ASSERT_OK(DecodeVariantValue(metadata_, data.data(), 
static_cast<int64_t>(data.size()),
+                               &visitor));
+  ASSERT_EQ(visitor.events.size(), 6);
+  ASSERT_EQ(visitor.events[0], "StartObject(2)");
+  ASSERT_EQ(visitor.events[1], "FieldName(\"name\")");
+  ASSERT_EQ(visitor.events[2], "String(\"Bob\")");
+  ASSERT_EQ(visitor.events[3], "FieldName(\"age\")");
+  ASSERT_EQ(visitor.events[4], "Int32(30)");
+  ASSERT_EQ(visitor.events[5], "EndObject");
+}
+
+TEST_F(VariantObjectTest, InvalidFieldId) {
+  // field_id=99 exceeds dictionary size of 3
+  auto value = BuildShortString("oops");
+  auto data = BuildObject({99}, {value});
+
+  RecordingVisitor visitor;
+  ASSERT_RAISES(Invalid, DecodeVariantValue(metadata_, data.data(),
+                                            static_cast<int64_t>(data.size()), 
&visitor));
+}
+
+TEST_F(VariantObjectTest, ThreeByteOffsetSize) {
+  // Exercises value decoding with 3-byte field_offset_size and field_id_size.
+  // Object with 2 fields: {name: "test", age: 42}
+  auto name_val = BuildShortString("test");
+  std::vector<uint8_t> age_val = {PrimitiveHeader(PrimitiveType::kInt32), 42, 
0, 0, 0};
+  auto data = BuildObject({0, 1}, {name_val, age_val},
+                          /*field_id_size=*/3, /*field_offset_size=*/3);
+
+  RecordingVisitor visitor;
+  ASSERT_OK(DecodeVariantValue(metadata_, data.data(), 
static_cast<int64_t>(data.size()),
+                               &visitor));
+  ASSERT_EQ(visitor.events.size(), 6);
+  ASSERT_EQ(visitor.events[0], "StartObject(2)");
+  ASSERT_EQ(visitor.events[1], "FieldName(\"name\")");
+  ASSERT_EQ(visitor.events[2], "String(\"test\")");
+  ASSERT_EQ(visitor.events[3], "FieldName(\"age\")");
+  ASSERT_EQ(visitor.events[4], "Int32(42)");
+  ASSERT_EQ(visitor.events[5], "EndObject");
+}
+
+// ===========================================================================
+// Array decoding tests
+// ===========================================================================
+
+class VariantArrayTest : public ::testing::Test {
+ protected:
+  VariantMetadata empty_metadata_;
+
+  void SetUp() override {
+    empty_metadata_.version = 1;
+    empty_metadata_.is_sorted = false;
+    empty_metadata_.offset_size = 1;
+  }
+};
+
+TEST_F(VariantArrayTest, EmptyArray) {
+  auto data = BuildArray({});
+  RecordingVisitor visitor;
+  ASSERT_OK(DecodeVariantValue(empty_metadata_, data.data(),
+                               static_cast<int64_t>(data.size()), &visitor));
+  ASSERT_EQ(visitor.events.size(), 2);
+  ASSERT_EQ(visitor.events[0], "StartArray(0)");
+  ASSERT_EQ(visitor.events[1], "EndArray");
+}
+
+TEST_F(VariantArrayTest, SingleElement) {
+  std::vector<uint8_t> elem = {PrimitiveHeader(PrimitiveType::kInt32), 42, 0, 
0, 0};
+  auto data = BuildArray({elem});
+
+  RecordingVisitor visitor;
+  ASSERT_OK(DecodeVariantValue(empty_metadata_, data.data(),
+                               static_cast<int64_t>(data.size()), &visitor));
+  ASSERT_EQ(visitor.events.size(), 3);
+  ASSERT_EQ(visitor.events[0], "StartArray(1)");
+  ASSERT_EQ(visitor.events[1], "Int32(42)");
+  ASSERT_EQ(visitor.events[2], "EndArray");
+}
+
+TEST_F(VariantArrayTest, HeterogeneousElements) {
+  // Array with mixed types: [42, "hello", true]
+  std::vector<uint8_t> int_elem = {PrimitiveHeader(PrimitiveType::kInt32), 42, 
0, 0, 0};
+  auto str_elem = BuildShortString("hello");
+  std::vector<uint8_t> bool_elem = {PrimitiveHeader(PrimitiveType::kTrue)};
+
+  auto data = BuildArray({int_elem, str_elem, bool_elem});
+
+  RecordingVisitor visitor;
+  ASSERT_OK(DecodeVariantValue(empty_metadata_, data.data(),
+                               static_cast<int64_t>(data.size()), &visitor));
+  ASSERT_EQ(visitor.events.size(), 5);
+  ASSERT_EQ(visitor.events[0], "StartArray(3)");
+  ASSERT_EQ(visitor.events[1], "Int32(42)");
+  ASSERT_EQ(visitor.events[2], "String(\"hello\")");
+  ASSERT_EQ(visitor.events[3], "Bool(true)");
+  ASSERT_EQ(visitor.events[4], "EndArray");
+}
+
+TEST_F(VariantArrayTest, LargeArrayIsLargeFlag) {
+  // Build an array with 256 elements to exercise is_large=true (4-byte
+  // num_elements). Each element is a Null primitive (1 byte each).
+  // Use field_offset_size=2 since total data (256 bytes) exceeds 1-byte max.
+  std::vector<std::vector<uint8_t>> elements;
+  elements.reserve(256);
+  for (int i = 0; i < 256; ++i) {
+    elements.push_back({PrimitiveHeader(PrimitiveType::kNull)});
+  }
+  auto data = BuildArray(elements, /*field_offset_size=*/2);
+
+  RecordingVisitor visitor;
+  ASSERT_OK(DecodeVariantValue(empty_metadata_, data.data(),
+                               static_cast<int64_t>(data.size()), &visitor));
+  // StartArray(256) + 256 Nulls + EndArray = 258 events
+  ASSERT_EQ(visitor.events.size(), 258);
+  ASSERT_EQ(visitor.events[0], "StartArray(256)");
+  ASSERT_EQ(visitor.events[1], "Null");
+  ASSERT_EQ(visitor.events[256], "Null");
+  ASSERT_EQ(visitor.events[257], "EndArray");
+}
+
+// ===========================================================================
+// Nested structure tests
+// ===========================================================================
+
+class VariantNestedTest : public ::testing::Test {
+ protected:
+  VariantMetadata metadata_;
+
+  void SetUp() override {
+    metadata_.version = 1;
+    metadata_.is_sorted = false;
+    metadata_.offset_size = 1;
+    metadata_.strings = {"name", "scores", "inner"};
+  }
+};
+
+TEST_F(VariantNestedTest, ObjectWithNestedArray) {
+  // {name: "Alice", scores: [95, 87]}
+  auto name_val = BuildShortString("Alice");
+
+  // scores array: [Int32(95), Int32(87)]
+  std::vector<uint8_t> score1 = {PrimitiveHeader(PrimitiveType::kInt32), 95, 
0, 0, 0};
+  std::vector<uint8_t> score2 = {PrimitiveHeader(PrimitiveType::kInt32), 87, 
0, 0, 0};
+  auto scores_val = BuildArray({score1, score2});
+
+  auto data = BuildObject({0, 1}, {name_val, scores_val});
+
+  RecordingVisitor visitor;
+  ASSERT_OK(DecodeVariantValue(metadata_, data.data(), 
static_cast<int64_t>(data.size()),
+                               &visitor));
+
+  // Expected events:
+  // StartObject(2), FieldName("name"), String("Alice"),
+  // FieldName("scores"), StartArray(2), Int32(95), Int32(87), EndArray,
+  // EndObject
+  ASSERT_EQ(visitor.events.size(), 9);
+  ASSERT_EQ(visitor.events[0], "StartObject(2)");
+  ASSERT_EQ(visitor.events[1], "FieldName(\"name\")");
+  ASSERT_EQ(visitor.events[2], "String(\"Alice\")");
+  ASSERT_EQ(visitor.events[3], "FieldName(\"scores\")");
+  ASSERT_EQ(visitor.events[4], "StartArray(2)");
+  ASSERT_EQ(visitor.events[5], "Int32(95)");
+  ASSERT_EQ(visitor.events[6], "Int32(87)");
+  ASSERT_EQ(visitor.events[7], "EndArray");
+  ASSERT_EQ(visitor.events[8], "EndObject");
+}
+
+TEST_F(VariantNestedTest, NestedObjects) {
+  // {inner: {name: "deep"}}
+  auto deep_name = BuildShortString("deep");
+  auto inner_obj = BuildObject({0}, {deep_name});
+  auto data = BuildObject({2}, {inner_obj});
+
+  RecordingVisitor visitor;
+  ASSERT_OK(DecodeVariantValue(metadata_, data.data(), 
static_cast<int64_t>(data.size()),
+                               &visitor));
+
+  ASSERT_EQ(visitor.events.size(), 7);
+  ASSERT_EQ(visitor.events[0], "StartObject(1)");
+  ASSERT_EQ(visitor.events[1], "FieldName(\"inner\")");
+  ASSERT_EQ(visitor.events[2], "StartObject(1)");
+  ASSERT_EQ(visitor.events[3], "FieldName(\"name\")");
+  ASSERT_EQ(visitor.events[4], "String(\"deep\")");
+  ASSERT_EQ(visitor.events[5], "EndObject");
+  ASSERT_EQ(visitor.events[6], "EndObject");
+}
+
+TEST_F(VariantNestedTest, ArrayOfObjects) {
+  // [{name: "a"}, {name: "b"}]
+  auto val_a = BuildShortString("a");
+  auto obj_a = BuildObject({0}, {val_a});
+
+  auto val_b = BuildShortString("b");
+  auto obj_b = BuildObject({0}, {val_b});
+
+  auto data = BuildArray({obj_a, obj_b});
+
+  RecordingVisitor visitor;
+  ASSERT_OK(DecodeVariantValue(metadata_, data.data(), 
static_cast<int64_t>(data.size()),
+                               &visitor));
+
+  ASSERT_EQ(visitor.events.size(), 10);
+  ASSERT_EQ(visitor.events[0], "StartArray(2)");
+  ASSERT_EQ(visitor.events[1], "StartObject(1)");
+  ASSERT_EQ(visitor.events[2], "FieldName(\"name\")");
+  ASSERT_EQ(visitor.events[3], "String(\"a\")");
+  ASSERT_EQ(visitor.events[4], "EndObject");
+  ASSERT_EQ(visitor.events[5], "StartObject(1)");
+  ASSERT_EQ(visitor.events[6], "FieldName(\"name\")");
+  ASSERT_EQ(visitor.events[7], "String(\"b\")");
+  ASSERT_EQ(visitor.events[8], "EndObject");
+  ASSERT_EQ(visitor.events[9], "EndArray");
+}
+
+// ===========================================================================
+// Recursion depth limit test
+// ===========================================================================
+
+class VariantDepthTest : public ::testing::Test {
+ protected:
+  VariantMetadata metadata_;
+
+  void SetUp() override {
+    metadata_.version = 1;
+    metadata_.is_sorted = false;
+    metadata_.offset_size = 1;
+    metadata_.strings = {"x"};
+  }
+};
+
+TEST_F(VariantDepthTest, ExceedsMaxNestingDepth) {
+  // Build a deeply nested array: [[[[...]]]]
+  // Each level wraps the inner in a 1-element array with offset_size=2
+  // to allow buffers larger than 255 bytes.
+  std::vector<uint8_t> inner = {PrimitiveHeader(PrimitiveType::kNull)};
+
+  // Wrap 130 times (exceeds kMaxNestingDepth=128)
+  for (int i = 0; i < 130; ++i) {
+    inner = BuildArray({inner}, /*field_offset_size=*/2);
+  }
+
+  RecordingVisitor visitor;
+  ASSERT_RAISES(Invalid,
+                DecodeVariantValue(metadata_, inner.data(),
+                                   static_cast<int64_t>(inner.size()), 
&visitor));
+}
+
+TEST_F(VariantDepthTest, AtMaxNestingDepthSucceeds) {
+  // Build 50 levels of nesting — well within kMaxNestingDepth=128
+  // and within offset_size=1 limits (each level adds ~4 bytes).
+  std::vector<uint8_t> inner = {PrimitiveHeader(PrimitiveType::kNull)};
+
+  for (int i = 0; i < 50; ++i) {
+    inner = BuildArray({inner});
+  }
+
+  RecordingVisitor visitor;
+  ASSERT_OK(DecodeVariantValue(metadata_, inner.data(),
+                               static_cast<int64_t>(inner.size()), &visitor));
+}
+
+// ===========================================================================
+// Utility function tests
+// ===========================================================================
+
+class VariantUtilTest : public ::testing::Test {};
+
+TEST_F(VariantUtilTest, GetValueBasicTypePrimitive) {
+  uint8_t data[] = {PrimitiveHeader(PrimitiveType::kInt32), 0, 0, 0, 0};
+  ASSERT_OK_AND_ASSIGN(auto bt, GetValueBasicType(data, sizeof(data)));
+  ASSERT_EQ(bt, BasicType::kPrimitive);
+}
+
+TEST_F(VariantUtilTest, GetValueBasicTypeShortString) {
+  auto data = BuildShortString("test");
+  ASSERT_OK_AND_ASSIGN(auto bt,
+                       GetValueBasicType(data.data(), 
static_cast<int64_t>(data.size())));
+  ASSERT_EQ(bt, BasicType::kShortString);
+}
+
+TEST_F(VariantUtilTest, GetValueBasicTypeObject) {
+  VariantMetadata meta;
+  meta.version = 1;
+  meta.strings = {"key"};
+  auto val = BuildShortString("val");
+  auto data = BuildObject({0}, {val});
+  ASSERT_OK_AND_ASSIGN(auto bt,
+                       GetValueBasicType(data.data(), 
static_cast<int64_t>(data.size())));
+  ASSERT_EQ(bt, BasicType::kObject);
+}
+
+TEST_F(VariantUtilTest, GetValueBasicTypeArray) {
+  auto data = BuildArray({});
+  ASSERT_OK_AND_ASSIGN(auto bt,
+                       GetValueBasicType(data.data(), 
static_cast<int64_t>(data.size())));
+  ASSERT_EQ(bt, BasicType::kArray);
+}
+
+TEST_F(VariantUtilTest, GetValueBasicTypeEmptyBuffer) {
+  ASSERT_RAISES(Invalid, GetValueBasicType(nullptr, 0));
+}
+
+TEST_F(VariantUtilTest, GetObjectFieldCount) {
+  VariantMetadata meta;
+  meta.version = 1;
+  meta.strings = {"a", "b", "c"};
+  auto v1 = BuildShortString("x");
+  auto v2 = BuildShortString("y");
+  auto data = BuildObject({0, 1}, {v1, v2});
+  ASSERT_OK_AND_ASSIGN(
+      auto count, GetObjectFieldCount(data.data(), 
static_cast<int64_t>(data.size())));
+  ASSERT_EQ(count, 2);
+}
+
+TEST_F(VariantUtilTest, GetArrayElementCount) {
+  std::vector<uint8_t> e1 = {PrimitiveHeader(PrimitiveType::kNull)};
+  std::vector<uint8_t> e2 = {PrimitiveHeader(PrimitiveType::kTrue)};
+  std::vector<uint8_t> e3 = {PrimitiveHeader(PrimitiveType::kFalse)};
+  auto data = BuildArray({e1, e2, e3});
+  ASSERT_OK_AND_ASSIGN(
+      auto count, GetArrayElementCount(data.data(), 
static_cast<int64_t>(data.size())));
+  ASSERT_EQ(count, 3);
+}
+
+TEST_F(VariantUtilTest, PrimitiveValueSizes) {
+  ASSERT_EQ(PrimitiveValueSize(PrimitiveType::kNull), 0);
+  ASSERT_EQ(PrimitiveValueSize(PrimitiveType::kTrue), 0);
+  ASSERT_EQ(PrimitiveValueSize(PrimitiveType::kFalse), 0);
+  ASSERT_EQ(PrimitiveValueSize(PrimitiveType::kInt8), 1);
+  ASSERT_EQ(PrimitiveValueSize(PrimitiveType::kInt16), 2);
+  ASSERT_EQ(PrimitiveValueSize(PrimitiveType::kInt32), 4);
+  ASSERT_EQ(PrimitiveValueSize(PrimitiveType::kInt64), 8);
+  ASSERT_EQ(PrimitiveValueSize(PrimitiveType::kFloat), 4);
+  ASSERT_EQ(PrimitiveValueSize(PrimitiveType::kDouble), 8);
+  ASSERT_EQ(PrimitiveValueSize(PrimitiveType::kDate), 4);
+  ASSERT_EQ(PrimitiveValueSize(PrimitiveType::kTimestampMicros), 8);
+  ASSERT_EQ(PrimitiveValueSize(PrimitiveType::kTimestampMicrosNTZ), 8);
+  ASSERT_EQ(PrimitiveValueSize(PrimitiveType::kTimeNTZ), 8);
+  ASSERT_EQ(PrimitiveValueSize(PrimitiveType::kTimestampNanos), 8);
+  ASSERT_EQ(PrimitiveValueSize(PrimitiveType::kTimestampNanosNTZ), 8);
+  ASSERT_EQ(PrimitiveValueSize(PrimitiveType::kUUID), 16);
+  ASSERT_EQ(PrimitiveValueSize(PrimitiveType::kDecimal4), 5);
+  ASSERT_EQ(PrimitiveValueSize(PrimitiveType::kDecimal8), 9);
+  ASSERT_EQ(PrimitiveValueSize(PrimitiveType::kDecimal16), 17);
+  ASSERT_EQ(PrimitiveValueSize(PrimitiveType::kBinary), -1);
+  ASSERT_EQ(PrimitiveValueSize(PrimitiveType::kString), -1);
+}
+
+// ===========================================================================
+// Integration: Metadata + Value decoding together
+// ===========================================================================
+
+class VariantIntegrationTest : public ::testing::Test {};
+
+TEST_F(VariantIntegrationTest, FullRoundTrip) {

Review Comment:
   I would also add more tests demonstrating how to use all these new functions 
together. For example, let's assume we have the following Variant:
   
   ```
   {
     "name": "Alice",
     "age": 30,
     "addresses": {
       "postal": {
         "country": "USA",
         "city": "New York"
       },
       "billing": {
         "country": "USA",
         "city": "Chicago"
       }
     }
   }
   ```
   If we want to find the city for the postal address, we would first need to 
use `FindObjectField` to find "addresses", then "postal", and finally "city". 
After that, we would read the value of the "city" field.



##########
cpp/src/arrow/extension/variant_internal.h:
##########
@@ -0,0 +1,347 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <string_view>
+#include <vector>
+
+#include "arrow/result.h"
+#include "arrow/status.h"
+#include "arrow/util/visibility.h"
+
+namespace arrow::extension::variant_internal {
+
+/// \file variant_internal.h
+/// \brief Utilities for Variant binary encoding/decoding.
+///
+/// Implements parsing logic per the Variant Encoding Spec:
+/// https://github.com/apache/parquet-format/blob/master/VariantEncoding.md
+///
+/// The "internal" in the filename refers to the binary encoding internals

Review Comment:
   I don't have a strong opinion here, but instead of explaining in a comment
   what "internal" means, it might be better to rename the file, e.g. to
   `variant_binary_encoding` or `variant_internal_encoding`.



##########
cpp/src/arrow/extension/variant_internal.h:
##########
@@ -0,0 +1,347 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <string_view>
+#include <vector>
+
+#include "arrow/result.h"
+#include "arrow/status.h"
+#include "arrow/util/visibility.h"
+
+namespace arrow::extension::variant_internal {
+
+/// \file variant_internal.h
+/// \brief Utilities for Variant binary encoding/decoding.
+///
+/// Implements parsing logic per the Variant Encoding Spec:
+/// https://github.com/apache/parquet-format/blob/master/VariantEncoding.md
+///
+/// The "internal" in the filename refers to the binary encoding internals
+/// of the Variant type, not the visibility of this header. This header is
+/// installed and provides the public C++ API for working with Variant
+/// binary data (independent of the VariantExtensionType in parquet_variant.h).
+
+// ---------------------------------------------------------------------------
+// Constants
+// ---------------------------------------------------------------------------
+
+/// Variant encoding spec version 1.
+constexpr uint8_t kVariantVersion = 1;
+
+/// Maximum nesting depth for recursive value decoding.
+/// Prevents stack overflow on deeply nested (possibly malicious) input.
+constexpr int32_t kMaxNestingDepth = 128;
+
+// ---------------------------------------------------------------------------
+// Enumerations
+// ---------------------------------------------------------------------------
+
+/// \brief Basic type codes from bits 0-1 of the value header byte.
+///
+/// See: https://github.com/apache/parquet-format/blob/master/VariantEncoding.md#encoding-types
+enum class BasicType : uint8_t {
+  kPrimitive = 0,
+  kShortString = 1,
+  kObject = 2,
+  kArray = 3,
+};
+
+/// \brief Primitive type codes from bits 2-7 when basic_type == kPrimitive.
+///
+/// See: https://github.com/apache/parquet-format/blob/master/VariantEncoding.md#encoding-types
+enum class PrimitiveType : uint8_t {
+  kNull = 0,
+  kTrue = 1,
+  kFalse = 2,
+  kInt8 = 3,
+  kInt16 = 4,
+  kInt32 = 5,
+  kInt64 = 6,
+  kDouble = 7,
+  kDecimal4 = 8,
+  kDecimal8 = 9,
+  kDecimal16 = 10,
+  kDate = 11,
+  kTimestampMicros = 12,
+  kTimestampMicrosNTZ = 13,
+  kFloat = 14,
+  kBinary = 15,
+  kString = 16,
+  kTimeNTZ = 17,
+  kTimestampNanos = 18,
+  kTimestampNanosNTZ = 19,
+  kUUID = 20,
+};
+
+// ---------------------------------------------------------------------------
+// Metadata
+// ---------------------------------------------------------------------------
+
+/// \brief Parsed variant metadata (string dictionary).
+///
+/// The metadata buffer contains a header byte followed by a dictionary of
+/// interned strings. String views reference the raw buffer and are valid
+/// only as long as the underlying buffer is alive.
+struct ARROW_EXPORT VariantMetadata {
+  /// Spec version (must be kVariantVersion).
+  uint8_t version = 0;
+
+  /// Whether the dictionary strings are sorted lexicographically.
+  bool is_sorted = false;
+
+  /// Number of bytes used for each offset (1, 2, 3, or 4).
+  int32_t offset_size = 0;
+
+  /// Dictionary of interned strings. Views into the raw metadata buffer.
+  std::vector<std::string_view> strings;
+};
+
+/// \brief Decode a variant metadata buffer.
+///
+/// Parses the header byte and string dictionary from the raw metadata
+/// buffer. The returned VariantMetadata contains string_views that
+/// reference the input buffer directly (zero-copy).
+///
+/// \param[in] data Pointer to the metadata buffer (must not be null)
+/// \param[in] length Length of the metadata buffer in bytes
+/// \return Parsed VariantMetadata on success, Status::Invalid on
+///         malformed input
+///
+/// \note The input buffer must outlive the returned VariantMetadata.
+ARROW_EXPORT Result<VariantMetadata> DecodeMetadata(const uint8_t* data, int64_t length);
+
+// ---------------------------------------------------------------------------
+// Value header utilities
+// ---------------------------------------------------------------------------
+
+/// \brief Extract the basic type from a value header byte.
+///
+/// \param[in] header The first byte of a variant value
+/// \return The BasicType (bits 0-1)
+inline BasicType GetBasicType(uint8_t header) {
+  return static_cast<BasicType>(header & 0x03);
+}
+
+/// \brief Extract the primitive type from a value header byte.
+///
+/// Only valid when GetBasicType(header) == BasicType::kPrimitive.
+///
+/// \param[in] header The first byte of a variant value
+/// \return The PrimitiveType (bits 2-7)
+inline PrimitiveType GetPrimitiveType(uint8_t header) {
+  return static_cast<PrimitiveType>((header >> 2) & 0x3F);
+}
+
+/// \brief Get the byte size of a primitive value (excluding header).
+///
+/// \param[in] primitive_type The primitive type code
+/// \return Number of bytes for the value payload, or -1 for
+///         variable-length types (Binary, String)
+ARROW_EXPORT int32_t PrimitiveValueSize(PrimitiveType primitive_type);
+
+// ---------------------------------------------------------------------------
+// Value decoding
+// ---------------------------------------------------------------------------
+
+/// \brief Visitor interface for variant value decoding.
+///
+/// Implement this interface to receive callbacks during variant value
+/// traversal. The visitor pattern avoids materializing a tree of objects,
+/// which is important when scanning millions of rows.
+///
+/// All methods return Status::OK() to continue traversal, or any error
+/// Status to abort.
+///
+/// \note String values passed to String() and FieldName() are raw bytes from
+///       the variant buffer without UTF-8 validation. Per spec, all strings
+///       must be valid UTF-8, but validation is the responsibility of a
+///       higher-level consumer (e.g., when materializing to Arrow StringArray).
+class ARROW_EXPORT VariantVisitor {
+ public:
+  virtual ~VariantVisitor() = default;
+
+  /// @name Primitive value callbacks
+  /// @{
+  virtual Status Null() = 0;
+  virtual Status Bool(bool value) = 0;
+  virtual Status Int8(int8_t value) = 0;
+  virtual Status Int16(int16_t value) = 0;
+  virtual Status Int32(int32_t value) = 0;
+  virtual Status Int64(int64_t value) = 0;
+  virtual Status Float(float value) = 0;
+  virtual Status Double(double value) = 0;
+  virtual Status Decimal4(const uint8_t* bytes, int32_t scale) = 0;
+  virtual Status Decimal8(const uint8_t* bytes, int32_t scale) = 0;
+  virtual Status Decimal16(const uint8_t* bytes, int32_t scale) = 0;
+  virtual Status Date(int32_t days_since_epoch) = 0;
+  virtual Status TimestampMicros(int64_t micros_since_epoch) = 0;
+  virtual Status TimestampMicrosNTZ(int64_t micros_since_epoch) = 0;
+  virtual Status String(std::string_view value) = 0;
+  virtual Status Binary(std::string_view value) = 0;
+  virtual Status TimeNTZ(int64_t micros_since_midnight) = 0;
+  virtual Status TimestampNanos(int64_t nanos_since_epoch) = 0;
+  virtual Status TimestampNanosNTZ(int64_t nanos_since_epoch) = 0;
+  virtual Status UUID(const uint8_t* bytes) = 0;
+  /// @}
+
+  /// @name Container callbacks
+  /// @{
+
+  /// \brief Called at the start of an object with the number of fields.
+  virtual Status StartObject(int32_t num_fields) = 0;
+
+  /// \brief Called for each object field name, before the field value.
+  virtual Status FieldName(std::string_view name) = 0;
+
+  /// \brief Called after all fields of an object have been visited.
+  virtual Status EndObject() = 0;
+
+  /// \brief Called at the start of an array with the number of elements.
+  virtual Status StartArray(int32_t num_elements) = 0;
+
+  /// \brief Called after all elements of an array have been visited.
+  virtual Status EndArray() = 0;
+  /// @}
+};
+
+/// \brief Decode a variant value buffer using a visitor.
+///
+/// Recursively traverses the variant value, calling the appropriate
+/// visitor methods for each element. Objects and arrays trigger
+/// Start/End pairs with nested visits for their contents.
+///
+/// \param[in] metadata Parsed metadata (for resolving string dictionary)
+/// \param[in] data Pointer to the value buffer
+/// \param[in] length Length of the value buffer in bytes
+/// \param[in] visitor Callback interface for decoded values
+/// \return Status::OK on success, Status::Invalid on malformed input
+///
+/// \note The data buffer must remain valid for the duration of the call.
+ARROW_EXPORT Status DecodeVariantValue(const VariantMetadata& metadata,

Review Comment:
   Do you have a plan to also support reading/decoding shredded variants?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to