mapleFU commented on code in PR #46372:
URL: https://github.com/apache/arrow/pull/46372#discussion_r2088188833


##########
cpp/src/parquet/variant.cc:
##########
@@ -0,0 +1,618 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/variant.h"
+
+#include <cstdint>
+#include <iostream>
+#include <string_view>
+
+#include "arrow/util/endian.h"
+#include "parquet/exception.h"
+
+namespace parquet::variant {
+
+VariantMetadata::VariantMetadata(std::string_view metadata) : 
metadata_(metadata) {
+  if (metadata.size() < 2) {
+    throw ParquetException("Invalid Variant metadata: too short: " +
+                           std::to_string(metadata.size()));
+  }
+}
+
+int8_t VariantMetadata::version() const {
+  return static_cast<int8_t>(metadata_[0]) & 0x0F;
+}
+
+bool VariantMetadata::sortedStrings() const { return (metadata_[0] & 0b10000) 
!= 0; }
+
+uint8_t VariantMetadata::offsetSize() const { return ((metadata_[0] >> 6) & 
0x3) + 1; }
+
+uint32_t VariantMetadata::dictionarySize() const {
+  uint8_t length = offsetSize();
+  if (length > 4) {
+    throw ParquetException("Invalid offset size: " + std::to_string(length));
+  }
+  if (length + 1 > metadata_.size()) {
+    throw ParquetException("Invalid Variant metadata: too short for dictionary 
size");
+  }
+  uint32_t dict_size = 0;
+  memcpy(&dict_size, metadata_.data() + 1, length);
+  dict_size = arrow::bit_util::FromLittleEndian(dict_size);
+  return dict_size;
+}
+
+std::string_view VariantMetadata::getMetadataKey(int32_t variantId) const {
+  uint32_t offset_size = offsetSize();
+  uint32_t dict_size = dictionarySize();
+
+  if (variantId < 0 || variantId >= static_cast<int32_t>(dict_size)) {
+    throw ParquetException("Invalid Variant metadata: variantId out of range");
+  }
+
+  if ((dict_size + 1) * offset_size > metadata_.size()) {
+    throw ParquetException("Invalid Variant metadata: offset out of range");
+  }
+
+  size_t offset_start_pos = 1 + offset_size + (variantId * offset_size);
+
+  uint32_t variant_offset = 0;
+  uint32_t variant_next_offset = 0;
+  memcpy(&variant_offset, metadata_.data() + offset_start_pos, offset_size);
+  variant_offset = arrow::bit_util::FromLittleEndian(variant_offset);
+  memcpy(&variant_next_offset, metadata_.data() + offset_start_pos + 
offset_size,
+         offset_size);
+  variant_next_offset = arrow::bit_util::FromLittleEndian(variant_next_offset);
+
+  uint32_t key_size = variant_next_offset - variant_offset;
+
+  size_t string_start = 1 + offset_size * (dict_size + 2) + variant_offset;
+  if (string_start + key_size > metadata_.size()) {
+    throw ParquetException("Invalid Variant metadata: string data out of 
range");
+  }
+  return {metadata_.data() + string_start, key_size};
+}
+
+VariantBasicType VariantValue::getBasicType() const {
+  if (value.empty()) {
+    throw ParquetException("Empty variant value");
+  }
+  return static_cast<VariantBasicType>(value[0] & BASIC_TYPE_MASK);
+}
+
+VariantType VariantValue::getType() const {
+  VariantBasicType basic_type = getBasicType();
+  // std::cout << "Variant first byte:" << static_cast<int>(value[0] >> 2) << 
", "
+  //           << static_cast<int>(value[0] && BASIC_TYPE_MASK) << '\n';
+  switch (basic_type) {
+    case VariantBasicType::Primitive: {
+      auto primitive_type = static_cast<VariantPrimitiveType>(value[0] >> 2);
+      switch (primitive_type) {
+        case VariantPrimitiveType::NullType:
+          return VariantType::VARIANT_NULL;
+        case VariantPrimitiveType::BooleanTrue:
+        case VariantPrimitiveType::BooleanFalse:
+          return VariantType::BOOLEAN;
+        case VariantPrimitiveType::Int8:
+          return VariantType::BYTE;
+        case VariantPrimitiveType::Int16:
+          return VariantType::SHORT;
+        case VariantPrimitiveType::Int32:
+          return VariantType::INT;
+        case VariantPrimitiveType::Int64:
+          return VariantType::LONG;
+        case VariantPrimitiveType::Double:
+          return VariantType::DOUBLE;
+        case VariantPrimitiveType::Decimal4:
+          return VariantType::DECIMAL4;
+        case VariantPrimitiveType::Decimal8:
+          return VariantType::DECIMAL8;
+        case VariantPrimitiveType::Decimal16:
+          return VariantType::DECIMAL16;
+        case VariantPrimitiveType::Date:
+          return VariantType::DATE;
+        case VariantPrimitiveType::Timestamp:
+          return VariantType::TIMESTAMP_TZ;
+        case VariantPrimitiveType::TimestampNTZ:
+          return VariantType::TIMESTAMP_NTZ;
+        case VariantPrimitiveType::Float:
+          return VariantType::FLOAT;
+        case VariantPrimitiveType::Binary:
+          return VariantType::BINARY;
+        case VariantPrimitiveType::String:
+          return VariantType::STRING;
+        case VariantPrimitiveType::TimeNTZ:
+          return VariantType::TIME;
+        case VariantPrimitiveType::TimestampTZ:
+          return VariantType::TIMESTAMP_NANOS_TZ;
+        case VariantPrimitiveType::TimestampNTZNanos:
+          return VariantType::TIMESTAMP_NANOS_NTZ;
+        case VariantPrimitiveType::Uuid:
+          return VariantType::UUID;
+        default:
+          throw ParquetException("Unknown primitive type: " +
+                                 
std::to_string(static_cast<int>(primitive_type)));
+      }
+    }
+    case VariantBasicType::ShortString:
+      return VariantType::STRING;
+    case VariantBasicType::Object:
+      return VariantType::OBJECT;
+    case VariantBasicType::Array:
+      return VariantType::ARRAY;
+    default:
+      throw ParquetException("Unknown basic type: " +
+                             std::to_string(static_cast<int>(basic_type)));
+  }
+}
+
+std::string VariantValue::typeDebugString() const {
+  VariantType type = getType();
+  switch (type) {
+    case VariantType::OBJECT:
+      return "OBJECT";
+    case VariantType::ARRAY:
+      return "ARRAY";
+    case VariantType::VARIANT_NULL:
+      return "NULL";
+    case VariantType::BOOLEAN:
+      return "BOOLEAN";
+    case VariantType::BYTE:
+      return "BYTE";
+    case VariantType::SHORT:
+      return "SHORT";
+    case VariantType::INT:
+      return "INT";
+    case VariantType::LONG:
+      return "LONG";
+    case VariantType::STRING:
+      return "STRING";
+    case VariantType::DOUBLE:
+      return "DOUBLE";
+    case VariantType::DECIMAL4:
+      return "DECIMAL4";
+    case VariantType::DECIMAL8:
+      return "DECIMAL8";
+    case VariantType::DECIMAL16:
+      return "DECIMAL16";
+    case VariantType::DATE:
+      return "DATE";
+    case VariantType::TIMESTAMP_TZ:
+      return "TIMESTAMP_TZ";
+    case VariantType::TIMESTAMP_NTZ:
+      return "TIMESTAMP_NTZ";
+    case VariantType::FLOAT:
+      return "FLOAT";
+    case VariantType::BINARY:
+      return "BINARY";
+    case VariantType::TIME:
+      return "TIME";
+    case VariantType::TIMESTAMP_NANOS_TZ:
+      return "TIMESTAMP_NANOS_TZ";
+    case VariantType::TIMESTAMP_NANOS_NTZ:
+      return "TIMESTAMP_NANOS_NTZ";
+    case VariantType::UUID:
+      return "UUID";
+    default:
+      return "UNKNOWN";
+  }
+}
+
+bool VariantValue::getBool() const {
+  if (getBasicType() != VariantBasicType::Primitive) {
+    throw ParquetException("Not a primitive type");
+  }
+
+  int8_t primitive_type = static_cast<int8_t>(value[0]) >> 2;
+  if (primitive_type == 
static_cast<int8_t>(VariantPrimitiveType::BooleanTrue)) {
+    return true;
+  }
+  if (primitive_type == 
static_cast<int8_t>(VariantPrimitiveType::BooleanFalse)) {
+    return false;
+  }
+
+  throw ParquetException("Not a variant primitive boolean type with primitive 
type: " +
+                         std::to_string(primitive_type));
+}
+
+template <typename PrimitiveType>
+PrimitiveType VariantValue::getPrimitiveVariantType(VariantPrimitiveType type) 
const {
+  if (getBasicType() != VariantBasicType::Primitive) {
+    throw ParquetException("Not a primitive type");
+  }
+
+  auto primitive_type = static_cast<VariantPrimitiveType>(value[0] >> 2);
+  if (primitive_type != VariantPrimitiveType::Int8) {
+    throw ParquetException("Not an correspond type");
+  }
+
+  if (value.size() < 1 + sizeof(PrimitiveType)) {
+    throw ParquetException("Invalid value: too short");
+  }
+
+  PrimitiveType decimal_value{};
+  memcpy(&decimal_value, value.data() + 1, sizeof(PrimitiveType));
+  return decimal_value;
+}
+
+int8_t VariantValue::getInt8() const {
+  return getPrimitiveVariantType<int8_t>(VariantPrimitiveType::Int8);
+}
+
+int16_t VariantValue::getInt16() const {
+  return getPrimitiveVariantType<int8_t>(VariantPrimitiveType::Int16);
+}
+
+int32_t VariantValue::getInt32() const {
+  return getPrimitiveVariantType<int8_t>(VariantPrimitiveType::Int32);
+}
+
+int64_t VariantValue::getInt64() const {
+  return getPrimitiveVariantType<int8_t>(VariantPrimitiveType::Int64);
+}
+
+float VariantValue::getFloat() const {
+  return getPrimitiveVariantType<float>(VariantPrimitiveType::Float);
+}
+
+double VariantValue::getDouble() const {
+  return getPrimitiveVariantType<float>(VariantPrimitiveType::Double);
+}
+
+std::string_view VariantValue::getPrimitiveBinaryType(VariantPrimitiveType 
type) const {
+  VariantBasicType basic_type = getBasicType();
+  if (basic_type != VariantBasicType::Primitive) {
+    throw ParquetException("Not a primitive type");
+  }
+  auto primitive_type = static_cast<VariantPrimitiveType>(value[0] >> 2);
+  if (primitive_type != VariantPrimitiveType::String) {
+    throw ParquetException("Not a string type");
+  }
+
+  if (value.size() < 5) {
+    throw ParquetException("Invalid string value: too short");
+  }
+
+  uint32_t length;
+  memcpy(&length, value.data() + 1, sizeof(uint32_t));
+  length = arrow::bit_util::FromLittleEndian(length);
+
+  if (value.size() < length + 5) {
+    throw ParquetException("Invalid string value: too short for specified 
length");
+  }
+
+  return {value.data() + 5, length};
+}
+
+std::string_view VariantValue::getString() const {
+  VariantBasicType basic_type = getBasicType();
+
+  if (basic_type == VariantBasicType::ShortString) {
+    uint8_t length = (value[0] >> 2) & MAX_SHORT_STR_SIZE_MASK;
+    if (value.size() < length + 1) {
+      throw ParquetException("Invalid short string: too short");
+    }
+    return {value.data() + 1, length};
+  }
+  if (basic_type == VariantBasicType::Primitive) {
+    // TODO(mwish): Should we validate utf8 here?
+    return getPrimitiveBinaryType(VariantPrimitiveType::String);
+  }
+
+  throw ParquetException("Not a primitive or short string type calls 
getString");
+}
+
+std::string_view VariantValue::getBinary() const {
+  return getPrimitiveBinaryType(VariantPrimitiveType::Binary);
+}
+
+template <typename DecimalType>
+DecimalValue<DecimalType> VariantValue::getPrimitiveDecimalType(
+    VariantPrimitiveType type) const {
+  using DecimalValueType = typename DecimalType::ValueType;
+  if (getBasicType() != VariantBasicType::Primitive) {
+    throw ParquetException("Not a primitive type");
+  }
+
+  auto primitive_type = static_cast<VariantPrimitiveType>(value[0] >> 2);
+  if (primitive_type != type) {
+    throw ParquetException("Not a decimal type");
+  }
+
+  if (value.size() < 2 + sizeof(DecimalValueType)) {
+    throw ParquetException("Invalid decimal value: too short");
+  }
+
+  uint8_t scale = value[1];
+  DecimalValueType decimal_value;
+  memcpy(&decimal_value, value.data() + 2, sizeof(DecimalValueType));
+  decimal_value = arrow::bit_util::FromLittleEndian(decimal_value);
+
+  return {scale, DecimalType(decimal_value)};
+}
+
+DecimalValue<::arrow::Decimal32> VariantValue::getDecimal4() const {
+  return 
getPrimitiveDecimalType<::arrow::Decimal32>(VariantPrimitiveType::Decimal4);
+}
+
+DecimalValue<::arrow::Decimal64> VariantValue::getDecimal8() const {
+  return 
getPrimitiveDecimalType<::arrow::Decimal64>(VariantPrimitiveType::Decimal8);
+}
+
+DecimalValue<::arrow::Decimal128> VariantValue::getDecimal16() const {
+  if (getBasicType() != VariantBasicType::Primitive) {
+    throw ParquetException("Not a primitive type");
+  }
+
+  auto primitive_type = static_cast<VariantPrimitiveType>(value[0] >> 2);
+  if (primitive_type != VariantPrimitiveType::Decimal16) {
+    throw ParquetException("Not a decimal16 type");
+  }
+
+  if (value.size() < 2 + sizeof(int64_t) * 2) {
+    throw ParquetException("Invalid decimal16 value: too short");
+  }
+
+  uint8_t scale = value[1];
+
+  // TODO(mwish): Do we have better way for this?
+  std::array<int64_t, 2> low_high_bits;
+  memcpy(&low_high_bits[0], value.data() + 2, sizeof(int64_t));
+  memcpy(&low_high_bits[1], value.data() + 10, sizeof(int64_t));
+  ::arrow::bit_util::little_endian::ToNative(low_high_bits);
+  return {scale, ::arrow::Decimal128(low_high_bits[1], low_high_bits[0])};
+}
+
+int64_t VariantValue::timeNTZ() const {
+  return getPrimitiveVariantType<int64_t>(VariantPrimitiveType::TimeNTZ);
+}
+
+int64_t VariantValue::getTimestamp() const {
+  return getPrimitiveVariantType<int64_t>(VariantPrimitiveType::Timestamp);
+}
+
+int64_t VariantValue::getTimestampNTZ() const {
+  return getPrimitiveVariantType<int64_t>(VariantPrimitiveType::TimestampNTZ);
+}
+
+const uint8_t* VariantValue::getUuid() const {
+  throw ParquetException("VariantValue::getUuid Not implemented");
+}
+
+std::string VariantValue::ObjectInfo::toDebugString() const {
+  std::stringstream ss;
+  ss << "ObjectInfo{"
+     << "num_elements=" << num_elements
+     << ", id_size=" << static_cast<int>(id_size)
+     << ", offset_size=" << static_cast<int>(offset_size)
+     << ", id_start_offset=" << id_start_offset
+     << ", offset_start_offset=" << offset_start_offset
+     << ", data_start_offset=" << data_start_offset
+     << "}";
+  return ss.str();
+}
+
+
+VariantValue::ObjectInfo VariantValue::getObjectInfo() const {
+  if (getBasicType() != VariantBasicType::Object) {
+    throw ParquetException("Not an object type");
+  }
+  uint8_t value_header = value[0] >> 2;
+  uint8_t field_offset_size = (value_header & 0b11) + 1;
+  uint8_t field_id_size = ((value_header >> 2) & 0b11) + 1;
+  bool is_large = ((value_header >> 4) & 0b1);
+  uint8_t num_elements_size = is_large ? 4 : 1;
+  if (value.size() < 1 + num_elements_size) {
+    throw ParquetException("Invalid object value: too short: " +
+                           std::to_string(value.size()) + " for at least " +
+                           std::to_string(1 + num_elements_size));
+  }
+  // parse num_elements
+  uint32_t num_elements = 0;
+  {
+    memcpy(&num_elements, value.data() + 1, num_elements_size);
+    num_elements = arrow::bit_util::FromLittleEndian(num_elements);
+  }
+  ObjectInfo info{};
+  info.num_elements = num_elements;
+  info.id_size = field_id_size;
+  info.offset_size = field_offset_size;
+  info.id_start_offset = 1 + num_elements_size;
+  info.offset_start_offset = info.id_start_offset + num_elements * 
field_id_size;
+  info.data_start_offset = info.offset_start_offset + (num_elements + 1) * 
field_offset_size;
+  // Check the boundary with the final offset
+  if (info.data_start_offset > value.size()) {
+    throw ParquetException("Invalid object value: data_start_offset=" +
+                                 std::to_string(info.data_start_offset) +
+                                 ", value_size=" + 
std::to_string(value.size()));
+  }
+  {
+    uint32_t final_offset = 0;

Review Comment:
   Should we defer this final-offset boundary check to a separate function?



##########
cpp/src/parquet/variant.h:
##########
@@ -0,0 +1,228 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <string_view>
+#include <vector>
+
+#include <arrow/util/decimal.h>
+
+namespace parquet::variant {
+
+// TODO(mwish): Should I use parquet::ByteArray rather than
+//  std::string_view?
+
+enum class VariantBasicType {
+  /// One of the primitive types
+  Primitive = 0,
+  /// A string with a length less than 64 bytes
+  ShortString = 1,
+  /// A collection of (string-key, variant-value) pairs
+  Object = 2,
+  /// An ordered sequence of variant values
+  Array = 3
+};
+
+std::string variantBasicTypeToString(VariantBasicType type);
+
+enum class VariantPrimitiveType : int8_t {
+  /// Equivalent Parquet Type: UNKNOWN
+  NullType = 0,
+  /// Equivalent Parquet Type: BOOLEAN
+  BooleanTrue = 1,
+  /// Equivalent Parquet Type: BOOLEAN
+  BooleanFalse = 2,
+  /// Equivalent Parquet Type: INT(8, signed)
+  Int8 = 3,
+  /// Equivalent Parquet Type: INT(16, signed)
+  Int16 = 4,
+  /// Equivalent Parquet Type: INT(32, signed)
+  Int32 = 5,
+  /// Equivalent Parquet Type: INT(64, signed)
+  Int64 = 6,
+  /// Equivalent Parquet Type: DOUBLE
+  Double = 7,
+  /// Equivalent Parquet Type: DECIMAL(precision, scale)
+  Decimal4 = 8,
+  /// Equivalent Parquet Type: DECIMAL(precision, scale)
+  Decimal8 = 9,
+  /// Equivalent Parquet Type: DECIMAL(precision, scale)
+  Decimal16 = 10,
+  /// Equivalent Parquet Type: DATE
+  Date = 11,
+  /// Equivalent Parquet Type: TIMESTAMP(isAdjustedToUTC=true, MICROS)
+  Timestamp = 12,
+  /// Equivalent Parquet Type: TIMESTAMP(isAdjustedToUTC=false, MICROS)
+  TimestampNTZ = 13,
+  /// Equivalent Parquet Type: FLOAT
+  Float = 14,
+  /// Equivalent Parquet Type: BINARY
+  Binary = 15,
+  /// Equivalent Parquet Type: STRING
+  String = 16,
+  /// Equivalent Parquet Type: TIME(isAdjustedToUTC=false, MICROS)
+  TimeNTZ = 17,
+  /// Equivalent Parquet Type: TIMESTAMP(isAdjustedToUTC=true, NANOS)
+  TimestampTZ = 18,  // Assuming TZ stands for TimeZone, and follows the 
document's
+                     // 'timestamp with time zone'
+  /// Equivalent Parquet Type: TIMESTAMP(isAdjustedToUTC=false, NANOS)
+  TimestampNTZNanos = 19,  // Differentiating from TimestampNTZ (MICROS)
+  /// Equivalent Parquet Type: UUID
+  Uuid = 20
+};
+
+std::string variantPrimitiveTypeToString(VariantPrimitiveType type);
+
+/// VariantType is from basic type and primitive type.
+enum class VariantType {
+  OBJECT,
+  ARRAY,
+  VARIANT_NULL,
+  BOOLEAN,
+  BYTE,
+  SHORT,
+  INT,
+  LONG,
+  STRING,
+  DOUBLE,
+  DECIMAL4,
+  DECIMAL8,
+  DECIMAL16,
+  DATE,
+  TIMESTAMP_TZ,
+  TIMESTAMP_NTZ,
+  FLOAT,
+  BINARY,
+  TIME,
+  TIMESTAMP_NANOS_TZ,
+  TIMESTAMP_NANOS_NTZ,
+  UUID
+};
+
+std::string variantTypeToString(VariantType type);
+
+class VariantMetadata {
+ public:
+  explicit VariantMetadata(std::string_view metadata);
+  /// \brief Get the variant metadata version. Currently, always 1.
+  int8_t version() const;
+  /// \brief Get the metadata key for a given variant field id.
+  std::string_view getMetadataKey(int32_t variantId) const;
+
+ private:
+  bool sortedStrings() const;
+  uint8_t offsetSize() const;
+  uint32_t dictionarySize() const;
+
+ private:
+  std::string_view metadata_;
+};
+
+template <typename DecimalType>
+struct DecimalValue {
+  uint8_t scale;
+  DecimalType value;
+};
+
+struct VariantValue {
+  VariantMetadata metadata;
+  std::string_view value;
+
+  VariantBasicType getBasicType() const;
+  VariantType getType() const;
+  std::string typeDebugString() const;
+
+  /// \defgroup ValueAccessors
+  /// @{
+
+  // Note: Null doesn't need visitor.
+  bool getBool() const;
+  int8_t getInt8() const;
+  int16_t getInt16() const;
+  int32_t getInt32() const;
+  int64_t getInt64() const;

Review Comment:
   Currently, `getInt64` only supports reading from an int64 value, which is too
strict for integers. I think we could also find a way to let `getInt64` read
"smaller types" such as int32, int16, and int8.



##########
cpp/src/parquet/variant.h:
##########
@@ -0,0 +1,228 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <string_view>
+#include <vector>
+
+#include <arrow/util/decimal.h>
+
+namespace parquet::variant {
+
+// TODO(mwish): Should I use parquet::ByteArray rather than
+//  std::string_view?
+
+enum class VariantBasicType {
+  /// One of the primitive types
+  Primitive = 0,
+  /// A string with a length less than 64 bytes
+  ShortString = 1,
+  /// A collection of (string-key, variant-value) pairs
+  Object = 2,
+  /// An ordered sequence of variant values
+  Array = 3
+};
+
+std::string variantBasicTypeToString(VariantBasicType type);
+
+enum class VariantPrimitiveType : int8_t {
+  /// Equivalent Parquet Type: UNKNOWN
+  NullType = 0,
+  /// Equivalent Parquet Type: BOOLEAN
+  BooleanTrue = 1,
+  /// Equivalent Parquet Type: BOOLEAN
+  BooleanFalse = 2,
+  /// Equivalent Parquet Type: INT(8, signed)
+  Int8 = 3,
+  /// Equivalent Parquet Type: INT(16, signed)
+  Int16 = 4,
+  /// Equivalent Parquet Type: INT(32, signed)
+  Int32 = 5,
+  /// Equivalent Parquet Type: INT(64, signed)
+  Int64 = 6,
+  /// Equivalent Parquet Type: DOUBLE
+  Double = 7,
+  /// Equivalent Parquet Type: DECIMAL(precision, scale)
+  Decimal4 = 8,
+  /// Equivalent Parquet Type: DECIMAL(precision, scale)
+  Decimal8 = 9,
+  /// Equivalent Parquet Type: DECIMAL(precision, scale)
+  Decimal16 = 10,
+  /// Equivalent Parquet Type: DATE
+  Date = 11,
+  /// Equivalent Parquet Type: TIMESTAMP(isAdjustedToUTC=true, MICROS)
+  Timestamp = 12,
+  /// Equivalent Parquet Type: TIMESTAMP(isAdjustedToUTC=false, MICROS)
+  TimestampNTZ = 13,
+  /// Equivalent Parquet Type: FLOAT
+  Float = 14,
+  /// Equivalent Parquet Type: BINARY
+  Binary = 15,
+  /// Equivalent Parquet Type: STRING
+  String = 16,
+  /// Equivalent Parquet Type: TIME(isAdjustedToUTC=false, MICROS)
+  TimeNTZ = 17,
+  /// Equivalent Parquet Type: TIMESTAMP(isAdjustedToUTC=true, NANOS)
+  TimestampTZ = 18,  // Assuming TZ stands for TimeZone, and follows the 
document's
+                     // 'timestamp with time zone'
+  /// Equivalent Parquet Type: TIMESTAMP(isAdjustedToUTC=false, NANOS)
+  TimestampNTZNanos = 19,  // Differentiating from TimestampNTZ (MICROS)
+  /// Equivalent Parquet Type: UUID
+  Uuid = 20
+};
+
+std::string variantPrimitiveTypeToString(VariantPrimitiveType type);
+
+/// VariantType is from basic type and primitive type.
+enum class VariantType {
+  OBJECT,
+  ARRAY,
+  VARIANT_NULL,
+  BOOLEAN,
+  BYTE,
+  SHORT,
+  INT,
+  LONG,
+  STRING,
+  DOUBLE,
+  DECIMAL4,
+  DECIMAL8,
+  DECIMAL16,
+  DATE,
+  TIMESTAMP_TZ,
+  TIMESTAMP_NTZ,
+  FLOAT,
+  BINARY,
+  TIME,
+  TIMESTAMP_NANOS_TZ,
+  TIMESTAMP_NANOS_NTZ,
+  UUID
+};
+
+std::string variantTypeToString(VariantType type);
+
+class VariantMetadata {
+ public:
+  explicit VariantMetadata(std::string_view metadata);
+  /// \brief Get the variant metadata version. Currently, always 1.
+  int8_t version() const;
+  /// \brief Get the metadata key for a given variant field id.
+  std::string_view getMetadataKey(int32_t variantId) const;
+
+ private:
+  bool sortedStrings() const;
+  uint8_t offsetSize() const;
+  uint32_t dictionarySize() const;
+
+ private:
+  std::string_view metadata_;
+};
+
+template <typename DecimalType>
+struct DecimalValue {
+  uint8_t scale;
+  DecimalType value;
+};
+
+struct VariantValue {
+  VariantMetadata metadata;
+  std::string_view value;
+
+  VariantBasicType getBasicType() const;
+  VariantType getType() const;
+  std::string typeDebugString() const;
+
+  /// \defgroup ValueAccessors
+  /// @{
+
+  // Note: Null doesn't need visitor.
+  bool getBool() const;
+  int8_t getInt8() const;
+  int16_t getInt16() const;
+  int32_t getInt32() const;
+  int64_t getInt64() const;
+  /// Include short_string optimization and primitive string type
+  std::string_view getString() const;

Review Comment:
   Currently, UTF-8 validity is not checked here.



##########
cpp/src/parquet/variant.cc:
##########
@@ -0,0 +1,744 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/variant.h"
+
+#include <cstdint>
+#include <iostream>
+#include <string_view>
+
+#include "arrow/util/endian.h"
+#include "parquet/exception.h"
+
+namespace parquet::variant {
+
+std::string variantBasicTypeToString(VariantBasicType type) {
+  switch (type) {
+    case VariantBasicType::Primitive:
+      return "Primitive";
+    case VariantBasicType::ShortString:
+      return "ShortString";
+    case VariantBasicType::Object:
+      return "Object";
+    case VariantBasicType::Array:
+      return "Array";
+    default:
+      return "Unknown";
+  }
+}
+
+std::string variantPrimitiveTypeToString(VariantPrimitiveType type) {
+  switch (type) {
+    case VariantPrimitiveType::NullType:
+      return "NullType";
+    case VariantPrimitiveType::BooleanTrue:
+      return "BooleanTrue";
+    case VariantPrimitiveType::BooleanFalse:
+      return "BooleanFalse";
+    case VariantPrimitiveType::Int8:
+      return "Int8";
+    case VariantPrimitiveType::Int16:
+      return "Int16";
+    case VariantPrimitiveType::Int32:
+      return "Int32";
+    case VariantPrimitiveType::Int64:
+      return "Int64";
+    case VariantPrimitiveType::Double:
+      return "Double";
+    case VariantPrimitiveType::Decimal4:
+      return "Decimal4";
+    case VariantPrimitiveType::Decimal8:
+      return "Decimal8";
+    case VariantPrimitiveType::Decimal16:
+      return "Decimal16";
+    case VariantPrimitiveType::Date:
+      return "Date";
+    case VariantPrimitiveType::Timestamp:
+      return "Timestamp";
+    case VariantPrimitiveType::TimestampNTZ:
+      return "TimestampNTZ";
+    case VariantPrimitiveType::Float:
+      return "Float";
+    case VariantPrimitiveType::Binary:
+      return "Binary";
+    case VariantPrimitiveType::String:
+      return "String";
+    case VariantPrimitiveType::TimeNTZ:
+      return "TimeNTZ";
+    case VariantPrimitiveType::TimestampTZ:
+      return "TimestampTZ";
+    case VariantPrimitiveType::TimestampNTZNanos:
+      return "TimestampNTZNanos";
+    case VariantPrimitiveType::Uuid:
+      return "Uuid";
+    default:
+      return "Unknown";
+  }
+}
+
+std::string variantTypeToString(VariantType type) {
+  switch (type) {
+    case VariantType::OBJECT:
+      return "OBJECT";
+    case VariantType::ARRAY:
+      return "ARRAY";
+    case VariantType::VARIANT_NULL:
+      return "NULL";
+    case VariantType::BOOLEAN:
+      return "BOOLEAN";
+    case VariantType::BYTE:
+      return "BYTE";
+    case VariantType::SHORT:
+      return "SHORT";
+    case VariantType::INT:
+      return "INT";
+    case VariantType::LONG:
+      return "LONG";
+    case VariantType::STRING:
+      return "STRING";
+    case VariantType::DOUBLE:
+      return "DOUBLE";
+    case VariantType::DECIMAL4:
+      return "DECIMAL4";
+    case VariantType::DECIMAL8:
+      return "DECIMAL8";
+    case VariantType::DECIMAL16:
+      return "DECIMAL16";
+    case VariantType::DATE:
+      return "DATE";
+    case VariantType::TIMESTAMP_TZ:
+      return "TIMESTAMP_TZ";
+    case VariantType::TIMESTAMP_NTZ:
+      return "TIMESTAMP_NTZ";
+    case VariantType::FLOAT:
+      return "FLOAT";
+    case VariantType::BINARY:
+      return "BINARY";
+    case VariantType::TIME:
+      return "TIME";
+    case VariantType::TIMESTAMP_NANOS_TZ:
+      return "TIMESTAMP_NANOS_TZ";
+    case VariantType::TIMESTAMP_NANOS_NTZ:
+      return "TIMESTAMP_NANOS_NTZ";
+    case VariantType::UUID:
+      return "UUID";
+    default:
+      return "UNKNOWN";
+  }
+}
+
+VariantMetadata::VariantMetadata(std::string_view metadata) : 
metadata_(metadata) {
+  if (metadata.size() < 2) {
+    throw ParquetException("Invalid Variant metadata: too short: " +
+                           std::to_string(metadata.size()));
+  }
+}
+
+int8_t VariantMetadata::version() const {
+  return static_cast<int8_t>(metadata_[0]) & 0x0F;
+}
+
+bool VariantMetadata::sortedStrings() const { return (metadata_[0] & 0b10000) 
!= 0; }
+
+uint8_t VariantMetadata::offsetSize() const { return ((metadata_[0] >> 6) & 
0x3) + 1; }
+
+uint32_t VariantMetadata::dictionarySize() const {
+  uint8_t length = offsetSize();
+  if (length > 4) {
+    throw ParquetException("Invalid offset size: " + std::to_string(length));
+  }
+  if (length + 1 > metadata_.size()) {
+    throw ParquetException("Invalid Variant metadata: too short for dictionary 
size");
+  }
+  uint32_t dict_size = 0;
+  memcpy(&dict_size, metadata_.data() + 1, length);
+  dict_size = arrow::bit_util::FromLittleEndian(dict_size);
+  return dict_size;
+}
+
+std::string_view VariantMetadata::getMetadataKey(int32_t variantId) const {
+  uint32_t offset_size = offsetSize();
+  uint32_t dict_size = dictionarySize();
+
+  if (variantId < 0 || variantId >= static_cast<int32_t>(dict_size)) {
+    throw ParquetException("Invalid Variant metadata: variantId out of range");
+  }
+
+  if ((dict_size + 1) * offset_size > metadata_.size()) {
+    throw ParquetException("Invalid Variant metadata: offset out of range");
+  }
+
+  size_t offset_start_pos = 1 + offset_size + (variantId * offset_size);
+
+  uint32_t variant_offset = 0;
+  uint32_t variant_next_offset = 0;
+  memcpy(&variant_offset, metadata_.data() + offset_start_pos, offset_size);
+  variant_offset = arrow::bit_util::FromLittleEndian(variant_offset);
+  memcpy(&variant_next_offset, metadata_.data() + offset_start_pos + 
offset_size,
+         offset_size);
+  variant_next_offset = arrow::bit_util::FromLittleEndian(variant_next_offset);
+
+  uint32_t key_size = variant_next_offset - variant_offset;
+
+  size_t string_start = 1 + offset_size * (dict_size + 2) + variant_offset;
+  if (string_start + key_size > metadata_.size()) {
+    throw ParquetException("Invalid Variant metadata: string data out of 
range");
+  }
+  return {metadata_.data() + string_start, key_size};
+}
+
+VariantBasicType VariantValue::getBasicType() const {
+  if (value.empty()) {
+    throw ParquetException("Empty variant value");
+  }
+  return static_cast<VariantBasicType>(value[0] & BASIC_TYPE_MASK);
+}
+
+VariantType VariantValue::getType() const {
+  VariantBasicType basic_type = getBasicType();
+  switch (basic_type) {
+    case VariantBasicType::Primitive: {
+      auto primitive_type = static_cast<VariantPrimitiveType>(value[0] >> 2);
+      switch (primitive_type) {
+        case VariantPrimitiveType::NullType:
+          return VariantType::VARIANT_NULL;
+        case VariantPrimitiveType::BooleanTrue:
+        case VariantPrimitiveType::BooleanFalse:
+          return VariantType::BOOLEAN;
+        case VariantPrimitiveType::Int8:
+          return VariantType::BYTE;
+        case VariantPrimitiveType::Int16:
+          return VariantType::SHORT;
+        case VariantPrimitiveType::Int32:
+          return VariantType::INT;
+        case VariantPrimitiveType::Int64:
+          return VariantType::LONG;
+        case VariantPrimitiveType::Double:
+          return VariantType::DOUBLE;
+        case VariantPrimitiveType::Decimal4:
+          return VariantType::DECIMAL4;
+        case VariantPrimitiveType::Decimal8:
+          return VariantType::DECIMAL8;
+        case VariantPrimitiveType::Decimal16:
+          return VariantType::DECIMAL16;
+        case VariantPrimitiveType::Date:
+          return VariantType::DATE;
+        case VariantPrimitiveType::Timestamp:
+          return VariantType::TIMESTAMP_TZ;
+        case VariantPrimitiveType::TimestampNTZ:
+          return VariantType::TIMESTAMP_NTZ;
+        case VariantPrimitiveType::Float:
+          return VariantType::FLOAT;
+        case VariantPrimitiveType::Binary:
+          return VariantType::BINARY;
+        case VariantPrimitiveType::String:
+          return VariantType::STRING;
+        case VariantPrimitiveType::TimeNTZ:
+          return VariantType::TIME;
+        case VariantPrimitiveType::TimestampTZ:
+          return VariantType::TIMESTAMP_NANOS_TZ;
+        case VariantPrimitiveType::TimestampNTZNanos:
+          return VariantType::TIMESTAMP_NANOS_NTZ;
+        case VariantPrimitiveType::Uuid:
+          return VariantType::UUID;
+        default:
+          throw ParquetException("Unknown primitive type: " +
+                                 
std::to_string(static_cast<int>(primitive_type)));
+      }
+    }
+    case VariantBasicType::ShortString:
+      return VariantType::STRING;
+    case VariantBasicType::Object:
+      return VariantType::OBJECT;
+    case VariantBasicType::Array:
+      return VariantType::ARRAY;
+    default:
+      throw ParquetException("Unknown basic type: " +
+                             std::to_string(static_cast<int>(basic_type)));
+  }
+}
+
+std::string VariantValue::typeDebugString() const {
+  VariantType type = getType();
+  switch (type) {
+    case VariantType::OBJECT:
+      return "OBJECT";
+    case VariantType::ARRAY:
+      return "ARRAY";
+    case VariantType::VARIANT_NULL:
+      return "NULL";
+    case VariantType::BOOLEAN:
+      return "BOOLEAN";
+    case VariantType::BYTE:
+      return "BYTE";
+    case VariantType::SHORT:
+      return "SHORT";
+    case VariantType::INT:
+      return "INT";
+    case VariantType::LONG:
+      return "LONG";
+    case VariantType::STRING:
+      return "STRING";
+    case VariantType::DOUBLE:
+      return "DOUBLE";
+    case VariantType::DECIMAL4:
+      return "DECIMAL4";
+    case VariantType::DECIMAL8:
+      return "DECIMAL8";
+    case VariantType::DECIMAL16:
+      return "DECIMAL16";
+    case VariantType::DATE:
+      return "DATE";
+    case VariantType::TIMESTAMP_TZ:
+      return "TIMESTAMP_TZ";
+    case VariantType::TIMESTAMP_NTZ:
+      return "TIMESTAMP_NTZ";
+    case VariantType::FLOAT:
+      return "FLOAT";
+    case VariantType::BINARY:
+      return "BINARY";
+    case VariantType::TIME:
+      return "TIME";
+    case VariantType::TIMESTAMP_NANOS_TZ:
+      return "TIMESTAMP_NANOS_TZ";
+    case VariantType::TIMESTAMP_NANOS_NTZ:
+      return "TIMESTAMP_NANOS_NTZ";
+    case VariantType::UUID:
+      return "UUID";
+    default:
+      return "UNKNOWN";
+  }
+}
+
+bool VariantValue::getBool() const {
+  if (getBasicType() != VariantBasicType::Primitive) {
+    throw ParquetException("Expected primitive type, but got: " +
+                           variantBasicTypeToString(getBasicType()));
+  }
+
+  int8_t primitive_type = static_cast<int8_t>(value[0]) >> 2;
+  if (primitive_type == 
static_cast<int8_t>(VariantPrimitiveType::BooleanTrue)) {
+    return true;
+  }
+  if (primitive_type == 
static_cast<int8_t>(VariantPrimitiveType::BooleanFalse)) {
+    return false;
+  }
+
+  throw ParquetException("Not a variant primitive boolean type with primitive 
type: " +
+                         std::to_string(primitive_type));
+}
+
+void VariantValue::checkBasicType(VariantBasicType type) const {
+  if (getBasicType() != type) {
+    throw ParquetException("Expected basic type: " + 
variantBasicTypeToString(type) +
+                           ", but got: " + 
variantBasicTypeToString(getBasicType()));
+  }
+}
+
+void VariantValue::checkPrimitiveType(VariantPrimitiveType type,
+                                      size_t size_required) const {
+  checkBasicType(VariantBasicType::Primitive);
+
+  auto primitive_type = static_cast<VariantPrimitiveType>(value[0] >> 2);
+  if (primitive_type != type) {
+    throw ParquetException(
+        "Expected primitive type: " + variantPrimitiveTypeToString(type) +
+        ", but got: " + variantPrimitiveTypeToString(primitive_type));
+  }
+
+  if (value.size() < size_required) {
+    throw ParquetException("Invalid value: too short, expected at least " +
+                           std::to_string(size_required) + " bytes for type " +
+                           variantPrimitiveTypeToString(type) +
+                           ", but got: " + std::to_string(value.size()) + " 
bytes");
+  }
+}
+
+template <typename PrimitiveType>
+PrimitiveType VariantValue::getPrimitiveType(VariantPrimitiveType type) const {
+  checkPrimitiveType(type, sizeof(PrimitiveType) + 1);
+
+  PrimitiveType primitive_value{};
+  memcpy(&primitive_value, value.data() + 1, sizeof(PrimitiveType));
+  // Here we should cast from Little endian.
+  primitive_value = ::arrow::bit_util::FromLittleEndian(primitive_value);
+  return primitive_value;
+}
+
+int8_t VariantValue::getInt8() const {
+  return getPrimitiveType<int8_t>(VariantPrimitiveType::Int8);
+}
+
+int16_t VariantValue::getInt16() const {
+  return getPrimitiveType<int16_t>(VariantPrimitiveType::Int16);
+}
+
+int32_t VariantValue::getInt32() const {
+  return getPrimitiveType<int32_t>(VariantPrimitiveType::Int32);
+}
+
+int64_t VariantValue::getInt64() const {
+  return getPrimitiveType<int64_t>(VariantPrimitiveType::Int64);
+}
+
+float VariantValue::getFloat() const {
+  return getPrimitiveType<float>(VariantPrimitiveType::Float);
+}
+
+double VariantValue::getDouble() const {
+  return getPrimitiveType<double>(VariantPrimitiveType::Double);
+}
+
+std::string_view VariantValue::getPrimitiveBinaryType(VariantPrimitiveType 
type) const {
+  checkPrimitiveType(type, /*size_required=*/5);
+
+  uint32_t length;
+  memcpy(&length, value.data() + 1, sizeof(uint32_t));
+  length = arrow::bit_util::FromLittleEndian(length);
+
+  if (value.size() < length + 5) {
+    throw ParquetException("Invalid string value: too short for specified 
length");
+  }
+
+  return {value.data() + 5, length};
+}
+
+std::string_view VariantValue::getString() const {
+  VariantBasicType basic_type = getBasicType();
+
+  if (basic_type == VariantBasicType::ShortString) {
+    uint8_t length = (value[0] >> 2) & MAX_SHORT_STR_SIZE_MASK;
+    if (value.size() < length + 1) {
+      throw ParquetException("Invalid short string: too short");
+    }
+    return {value.data() + 1, length};
+  }
+  if (basic_type == VariantBasicType::Primitive) {
+    // TODO(mwish): Should we validate utf8 here?
+    return getPrimitiveBinaryType(VariantPrimitiveType::String);
+  }
+
+  throw ParquetException("Not a primitive or short string type calls 
getString");
+}
+
+std::string_view VariantValue::getBinary() const {
+  return getPrimitiveBinaryType(VariantPrimitiveType::Binary);
+}
+
+template <typename DecimalType>
+DecimalValue<DecimalType> VariantValue::getPrimitiveDecimalType(
+    VariantPrimitiveType type) const {
+  using DecimalValueType = typename DecimalType::ValueType;
+  checkPrimitiveType(type, sizeof(DecimalValueType) + 2);
+
+  uint8_t scale = value[1];
+  DecimalValueType decimal_value;
+  memcpy(&decimal_value, value.data() + 2, sizeof(DecimalValueType));
+  decimal_value = arrow::bit_util::FromLittleEndian(decimal_value);
+
+  return {scale, DecimalType(decimal_value)};
+}
+
+DecimalValue<::arrow::Decimal32> VariantValue::getDecimal4() const {
+  return 
getPrimitiveDecimalType<::arrow::Decimal32>(VariantPrimitiveType::Decimal4);
+}
+
+DecimalValue<::arrow::Decimal64> VariantValue::getDecimal8() const {
+  return 
getPrimitiveDecimalType<::arrow::Decimal64>(VariantPrimitiveType::Decimal8);
+}
+
+DecimalValue<::arrow::Decimal128> VariantValue::getDecimal16() const {
+  checkPrimitiveType(VariantPrimitiveType::Decimal16,
+                     /*size_required=*/sizeof(int64_t) * 2 + 2);
+
+  uint8_t scale = value[1];
+
+  // TODO(mwish): Do we have better way for this?
+  std::array<int64_t, 2> low_high_bits;
+  memcpy(&low_high_bits[0], value.data() + 2, sizeof(int64_t));
+  memcpy(&low_high_bits[1], value.data() + 10, sizeof(int64_t));
+  ::arrow::bit_util::little_endian::ToNative(low_high_bits);
+  return {scale, ::arrow::Decimal128(low_high_bits[1], low_high_bits[0])};
+}
+
+int32_t VariantValue::getDate() const {
+  return getPrimitiveType<int32_t>(VariantPrimitiveType::Date);
+}
+
+int64_t VariantValue::getTimeNTZ() const {
+  return getPrimitiveType<int64_t>(VariantPrimitiveType::TimeNTZ);
+}
+
+int64_t VariantValue::getTimestamp() const {
+  return getPrimitiveType<int64_t>(VariantPrimitiveType::Timestamp);
+}
+
+int64_t VariantValue::getTimestampNTZ() const {
+  return getPrimitiveType<int64_t>(VariantPrimitiveType::TimestampNTZ);
+}
+
+std::array<uint8_t, 16> VariantValue::getUuid() const {
+  checkPrimitiveType(VariantPrimitiveType::Uuid, /*size_required=*/17);
+  std::array<uint8_t, 16> uuid_value;
+  memcpy(uuid_value.data(), value.data() + 1, sizeof(uuid_value));
+#if ARROW_LITTLE_ENDIAN
+  std::array<uint8_t, 16> uuid_value_le;
+  ::arrow::bit_util::ByteSwap(uuid_value_le.data(), uuid_value.data(), 
uuid_value.size());
+  return uuid_value_le;
+#else
+  return uuid_value;
+#endif
+}
+
+std::string VariantValue::ObjectInfo::toDebugString() const {
+  std::stringstream ss;
+  ss << "ObjectInfo{"
+     << "num_elements=" << num_elements
+     << ", id_size=" << static_cast<int>(id_size)
+     << ", offset_size=" << static_cast<int>(offset_size)
+     << ", id_start_offset=" << id_start_offset
+     << ", offset_start_offset=" << offset_start_offset
+     << ", data_start_offset=" << data_start_offset
+     << "}";
+  return ss.str();
+}
+
+
+VariantValue::ObjectInfo VariantValue::getObjectInfo() const {
+  checkBasicType(VariantBasicType::Object);
+  uint8_t value_header = value[0] >> 2;
+  uint8_t field_offset_size = (value_header & 0b11) + 1;
+  uint8_t field_id_size = ((value_header >> 2) & 0b11) + 1;
+  bool is_large = ((value_header >> 4) & 0b1);
+  uint8_t num_elements_size = is_large ? 4 : 1;
+  if (value.size() < 1 + num_elements_size) {
+    throw ParquetException("Invalid object value: too short: " +
+                           std::to_string(value.size()) + " for at least " +
+                           std::to_string(1 + num_elements_size));
+  }
+  // parse num_elements
+  uint32_t num_elements = 0;
+  {
+    memcpy(&num_elements, value.data() + 1, num_elements_size);
+    num_elements = arrow::bit_util::FromLittleEndian(num_elements);
+  }
+  ObjectInfo info{};
+  info.num_elements = num_elements;
+  info.id_size = field_id_size;
+  info.offset_size = field_offset_size;
+  info.id_start_offset = 1 + num_elements_size;
+  info.offset_start_offset = info.id_start_offset + num_elements * 
field_id_size;
+  info.data_start_offset = info.offset_start_offset + (num_elements + 1) * 
field_offset_size;
+  // Check the boundary with the final offset
+  if (info.data_start_offset > value.size()) {
+    throw ParquetException("Invalid object value: data_start_offset=" +
+                                 std::to_string(info.data_start_offset) +
+                                 ", value_size=" + 
std::to_string(value.size()));
+  }
+  {
+    uint32_t final_offset = 0;
+    memcpy(&final_offset,
+           value.data() + info.offset_start_offset + num_elements * 
field_offset_size,
+           field_offset_size);
+    // It could be less than value size since it could be a sub-object.
+    if (final_offset + info.data_start_offset > value.size()) {
+      throw ParquetException("Invalid object value: final_offset=" +
+                             std::to_string(final_offset) +
+                             ", data_start_offset=" + 
std::to_string(info.data_start_offset) +
+                             ", value_size=" + std::to_string(value.size()));
+    }
+  }
+  return info;
+}
+
+std::optional<VariantValue> VariantValue::getObjectValueByKey(
+    std::string_view key) const {
+  ObjectInfo info = getObjectInfo();
+
+  return getObjectValueByKey(key, info);
+}
+
+std::optional<VariantValue> VariantValue::getObjectValueByKey(
+    std::string_view key, const VariantValue::ObjectInfo& info) const {
+  // TODO(mwish): Currently we just linear search here. The best way here is:

Review Comment:
   Currently I just do a linear search. This could be optimized later, but I
don't want to do that in this PR (I will create a GitHub issue once everyone
agrees with this interface).



##########
cpp/src/parquet/variant.cc:
##########
@@ -0,0 +1,744 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/variant.h"
+
+#include <cstdint>
+#include <iostream>
+#include <string_view>
+
+#include "arrow/util/endian.h"
+#include "parquet/exception.h"
+
+namespace parquet::variant {
+
+std::string variantBasicTypeToString(VariantBasicType type) {
+  switch (type) {
+    case VariantBasicType::Primitive:
+      return "Primitive";
+    case VariantBasicType::ShortString:
+      return "ShortString";
+    case VariantBasicType::Object:
+      return "Object";
+    case VariantBasicType::Array:
+      return "Array";
+    default:
+      return "Unknown";
+  }
+}
+
+std::string variantPrimitiveTypeToString(VariantPrimitiveType type) {
+  switch (type) {
+    case VariantPrimitiveType::NullType:
+      return "NullType";
+    case VariantPrimitiveType::BooleanTrue:
+      return "BooleanTrue";
+    case VariantPrimitiveType::BooleanFalse:
+      return "BooleanFalse";
+    case VariantPrimitiveType::Int8:
+      return "Int8";
+    case VariantPrimitiveType::Int16:
+      return "Int16";
+    case VariantPrimitiveType::Int32:
+      return "Int32";
+    case VariantPrimitiveType::Int64:
+      return "Int64";
+    case VariantPrimitiveType::Double:
+      return "Double";
+    case VariantPrimitiveType::Decimal4:
+      return "Decimal4";
+    case VariantPrimitiveType::Decimal8:
+      return "Decimal8";
+    case VariantPrimitiveType::Decimal16:
+      return "Decimal16";
+    case VariantPrimitiveType::Date:
+      return "Date";
+    case VariantPrimitiveType::Timestamp:
+      return "Timestamp";
+    case VariantPrimitiveType::TimestampNTZ:
+      return "TimestampNTZ";
+    case VariantPrimitiveType::Float:
+      return "Float";
+    case VariantPrimitiveType::Binary:
+      return "Binary";
+    case VariantPrimitiveType::String:
+      return "String";
+    case VariantPrimitiveType::TimeNTZ:
+      return "TimeNTZ";
+    case VariantPrimitiveType::TimestampTZ:
+      return "TimestampTZ";
+    case VariantPrimitiveType::TimestampNTZNanos:
+      return "TimestampNTZNanos";
+    case VariantPrimitiveType::Uuid:
+      return "Uuid";
+    default:
+      return "Unknown";
+  }
+}
+
+std::string variantTypeToString(VariantType type) {
+  switch (type) {
+    case VariantType::OBJECT:
+      return "OBJECT";
+    case VariantType::ARRAY:
+      return "ARRAY";
+    case VariantType::VARIANT_NULL:
+      return "NULL";
+    case VariantType::BOOLEAN:
+      return "BOOLEAN";
+    case VariantType::BYTE:
+      return "BYTE";
+    case VariantType::SHORT:
+      return "SHORT";
+    case VariantType::INT:
+      return "INT";
+    case VariantType::LONG:
+      return "LONG";
+    case VariantType::STRING:
+      return "STRING";
+    case VariantType::DOUBLE:
+      return "DOUBLE";
+    case VariantType::DECIMAL4:
+      return "DECIMAL4";
+    case VariantType::DECIMAL8:
+      return "DECIMAL8";
+    case VariantType::DECIMAL16:
+      return "DECIMAL16";
+    case VariantType::DATE:
+      return "DATE";
+    case VariantType::TIMESTAMP_TZ:
+      return "TIMESTAMP_TZ";
+    case VariantType::TIMESTAMP_NTZ:
+      return "TIMESTAMP_NTZ";
+    case VariantType::FLOAT:
+      return "FLOAT";
+    case VariantType::BINARY:
+      return "BINARY";
+    case VariantType::TIME:
+      return "TIME";
+    case VariantType::TIMESTAMP_NANOS_TZ:
+      return "TIMESTAMP_NANOS_TZ";
+    case VariantType::TIMESTAMP_NANOS_NTZ:
+      return "TIMESTAMP_NANOS_NTZ";
+    case VariantType::UUID:
+      return "UUID";
+    default:
+      return "UNKNOWN";
+  }
+}
+
+VariantMetadata::VariantMetadata(std::string_view metadata) : 
metadata_(metadata) {
+  if (metadata.size() < 2) {
+    throw ParquetException("Invalid Variant metadata: too short: " +
+                           std::to_string(metadata.size()));
+  }
+}
+
+int8_t VariantMetadata::version() const {
+  return static_cast<int8_t>(metadata_[0]) & 0x0F;
+}
+
+bool VariantMetadata::sortedStrings() const { return (metadata_[0] & 0b10000) 
!= 0; }
+
+uint8_t VariantMetadata::offsetSize() const { return ((metadata_[0] >> 6) & 
0x3) + 1; }
+
+uint32_t VariantMetadata::dictionarySize() const {
+  uint8_t length = offsetSize();
+  if (length > 4) {
+    throw ParquetException("Invalid offset size: " + std::to_string(length));
+  }
+  if (length + 1 > metadata_.size()) {
+    throw ParquetException("Invalid Variant metadata: too short for dictionary 
size");
+  }
+  uint32_t dict_size = 0;
+  memcpy(&dict_size, metadata_.data() + 1, length);
+  dict_size = arrow::bit_util::FromLittleEndian(dict_size);
+  return dict_size;
+}
+
+std::string_view VariantMetadata::getMetadataKey(int32_t variantId) const {
+  uint32_t offset_size = offsetSize();
+  uint32_t dict_size = dictionarySize();
+
+  if (variantId < 0 || variantId >= static_cast<int32_t>(dict_size)) {
+    throw ParquetException("Invalid Variant metadata: variantId out of range");
+  }
+
+  if ((dict_size + 1) * offset_size > metadata_.size()) {
+    throw ParquetException("Invalid Variant metadata: offset out of range");
+  }
+
+  size_t offset_start_pos = 1 + offset_size + (variantId * offset_size);
+
+  uint32_t variant_offset = 0;
+  uint32_t variant_next_offset = 0;
+  memcpy(&variant_offset, metadata_.data() + offset_start_pos, offset_size);
+  variant_offset = arrow::bit_util::FromLittleEndian(variant_offset);
+  memcpy(&variant_next_offset, metadata_.data() + offset_start_pos + 
offset_size,
+         offset_size);
+  variant_next_offset = arrow::bit_util::FromLittleEndian(variant_next_offset);
+
+  uint32_t key_size = variant_next_offset - variant_offset;
+
+  size_t string_start = 1 + offset_size * (dict_size + 2) + variant_offset;
+  if (string_start + key_size > metadata_.size()) {
+    throw ParquetException("Invalid Variant metadata: string data out of 
range");
+  }
+  return {metadata_.data() + string_start, key_size};
+}
+
+VariantBasicType VariantValue::getBasicType() const {
+  if (value.empty()) {
+    throw ParquetException("Empty variant value");
+  }
+  return static_cast<VariantBasicType>(value[0] & BASIC_TYPE_MASK);
+}
+
+VariantType VariantValue::getType() const {
+  VariantBasicType basic_type = getBasicType();
+  switch (basic_type) {
+    case VariantBasicType::Primitive: {
+      auto primitive_type = static_cast<VariantPrimitiveType>(value[0] >> 2);
+      switch (primitive_type) {
+        case VariantPrimitiveType::NullType:
+          return VariantType::VARIANT_NULL;
+        case VariantPrimitiveType::BooleanTrue:
+        case VariantPrimitiveType::BooleanFalse:
+          return VariantType::BOOLEAN;
+        case VariantPrimitiveType::Int8:
+          return VariantType::BYTE;
+        case VariantPrimitiveType::Int16:
+          return VariantType::SHORT;
+        case VariantPrimitiveType::Int32:
+          return VariantType::INT;
+        case VariantPrimitiveType::Int64:
+          return VariantType::LONG;
+        case VariantPrimitiveType::Double:
+          return VariantType::DOUBLE;
+        case VariantPrimitiveType::Decimal4:
+          return VariantType::DECIMAL4;
+        case VariantPrimitiveType::Decimal8:
+          return VariantType::DECIMAL8;
+        case VariantPrimitiveType::Decimal16:
+          return VariantType::DECIMAL16;
+        case VariantPrimitiveType::Date:
+          return VariantType::DATE;
+        case VariantPrimitiveType::Timestamp:
+          return VariantType::TIMESTAMP_TZ;
+        case VariantPrimitiveType::TimestampNTZ:
+          return VariantType::TIMESTAMP_NTZ;
+        case VariantPrimitiveType::Float:
+          return VariantType::FLOAT;
+        case VariantPrimitiveType::Binary:
+          return VariantType::BINARY;
+        case VariantPrimitiveType::String:
+          return VariantType::STRING;
+        case VariantPrimitiveType::TimeNTZ:
+          return VariantType::TIME;
+        case VariantPrimitiveType::TimestampTZ:
+          return VariantType::TIMESTAMP_NANOS_TZ;
+        case VariantPrimitiveType::TimestampNTZNanos:
+          return VariantType::TIMESTAMP_NANOS_NTZ;
+        case VariantPrimitiveType::Uuid:
+          return VariantType::UUID;
+        default:
+          throw ParquetException("Unknown primitive type: " +
+                                 
std::to_string(static_cast<int>(primitive_type)));
+      }
+    }
+    case VariantBasicType::ShortString:
+      return VariantType::STRING;
+    case VariantBasicType::Object:
+      return VariantType::OBJECT;
+    case VariantBasicType::Array:
+      return VariantType::ARRAY;
+    default:
+      throw ParquetException("Unknown basic type: " +
+                             std::to_string(static_cast<int>(basic_type)));
+  }
+}
+
+std::string VariantValue::typeDebugString() const {
+  VariantType type = getType();
+  switch (type) {
+    case VariantType::OBJECT:
+      return "OBJECT";
+    case VariantType::ARRAY:
+      return "ARRAY";
+    case VariantType::VARIANT_NULL:
+      return "NULL";
+    case VariantType::BOOLEAN:
+      return "BOOLEAN";
+    case VariantType::BYTE:
+      return "BYTE";
+    case VariantType::SHORT:
+      return "SHORT";
+    case VariantType::INT:
+      return "INT";
+    case VariantType::LONG:
+      return "LONG";
+    case VariantType::STRING:
+      return "STRING";
+    case VariantType::DOUBLE:
+      return "DOUBLE";
+    case VariantType::DECIMAL4:
+      return "DECIMAL4";
+    case VariantType::DECIMAL8:
+      return "DECIMAL8";
+    case VariantType::DECIMAL16:
+      return "DECIMAL16";
+    case VariantType::DATE:
+      return "DATE";
+    case VariantType::TIMESTAMP_TZ:
+      return "TIMESTAMP_TZ";
+    case VariantType::TIMESTAMP_NTZ:
+      return "TIMESTAMP_NTZ";
+    case VariantType::FLOAT:
+      return "FLOAT";
+    case VariantType::BINARY:
+      return "BINARY";
+    case VariantType::TIME:
+      return "TIME";
+    case VariantType::TIMESTAMP_NANOS_TZ:
+      return "TIMESTAMP_NANOS_TZ";
+    case VariantType::TIMESTAMP_NANOS_NTZ:
+      return "TIMESTAMP_NANOS_NTZ";
+    case VariantType::UUID:
+      return "UUID";
+    default:
+      return "UNKNOWN";
+  }
+}
+
+// Decodes a primitive boolean value.
+//
+// Returns true for BooleanTrue and false for BooleanFalse.
+// Throws ParquetException if the basic type is not Primitive, or if the
+// primitive type id is neither of the two boolean ids.
+bool VariantValue::getBool() const {
+  if (getBasicType() != VariantBasicType::Primitive) {
+    throw ParquetException("Expected primitive type, but got: " +
+                           variantBasicTypeToString(getBasicType()));
+  }
+
+  // Shift the header byte as unsigned: a byte with the top bit set must not be
+  // sign-extended into a negative type id.
+  const int primitive_type = static_cast<uint8_t>(value[0]) >> 2;
+  if (primitive_type == static_cast<int>(VariantPrimitiveType::BooleanTrue)) {
+    return true;
+  }
+  if (primitive_type == static_cast<int>(VariantPrimitiveType::BooleanFalse)) {
+    return false;
+  }
+
+  throw ParquetException("Not a variant primitive boolean type with primitive type: " +
+                         std::to_string(primitive_type));
+}
+
+// Throws ParquetException unless getBasicType() equals `type`.
+void VariantValue::checkBasicType(VariantBasicType type) const {
+  const VariantBasicType actual = getBasicType();
+  if (actual == type) {
+    return;
+  }
+  throw ParquetException("Expected basic type: " + variantBasicTypeToString(type) +
+                         ", but got: " + variantBasicTypeToString(actual));
+}
+
+// Verifies that this value is a primitive of the given `type` and holds at least
+// `size_required` bytes (header byte included).
+//
+// Throws ParquetException if the basic type is not Primitive, if the primitive
+// type id differs from `type`, or if the value is shorter than `size_required`.
+void VariantValue::checkPrimitiveType(VariantPrimitiveType type,
+                                      size_t size_required) const {
+  checkBasicType(VariantBasicType::Primitive);
+
+  // Shift the header byte as unsigned so that a byte with the top bit set is not
+  // sign-extended before being interpreted as a type id.
+  const auto primitive_type =
+      static_cast<VariantPrimitiveType>(static_cast<uint8_t>(value[0]) >> 2);
+  if (primitive_type != type) {
+    throw ParquetException(
+        "Expected primitive type: " + variantPrimitiveTypeToString(type) +
+        ", but got: " + variantPrimitiveTypeToString(primitive_type));
+  }
+
+  if (value.size() < size_required) {
+    throw ParquetException("Invalid value: too short, expected at least " +
+                           std::to_string(size_required) + " bytes for type " +
+                           variantPrimitiveTypeToString(type) +
+                           ", but got: " + std::to_string(value.size()) + " bytes");
+  }
+}
+
+// Reads a fixed-width primitive payload that follows the one-byte header, after
+// checking the primitive type and the minimum size via checkPrimitiveType.
+template <typename PrimitiveType>
+PrimitiveType VariantValue::getPrimitiveType(VariantPrimitiveType type) const {
+  constexpr size_t kHeaderSize = 1;
+  checkPrimitiveType(type, kHeaderSize + sizeof(PrimitiveType));
+
+  PrimitiveType raw{};
+  memcpy(&raw, value.data() + kHeaderSize, sizeof(PrimitiveType));
+  // The payload is stored little-endian; convert it to host byte order.
+  return ::arrow::bit_util::FromLittleEndian(raw);
+}
+
+// Returns the payload of an Int8 primitive; type and size checks are performed
+// by getPrimitiveType.
+int8_t VariantValue::getInt8() const {
+  return getPrimitiveType<int8_t>(VariantPrimitiveType::Int8);
+}
+
+// Returns the payload of an Int16 primitive; type and size checks are performed
+// by getPrimitiveType.
+int16_t VariantValue::getInt16() const {
+  return getPrimitiveType<int16_t>(VariantPrimitiveType::Int16);
+}
+
+// Returns the payload of an Int32 primitive; type and size checks are performed
+// by getPrimitiveType.
+int32_t VariantValue::getInt32() const {
+  return getPrimitiveType<int32_t>(VariantPrimitiveType::Int32);
+}
+
+// Returns the payload of an Int64 primitive; type and size checks are performed
+// by getPrimitiveType.
+int64_t VariantValue::getInt64() const {
+  return getPrimitiveType<int64_t>(VariantPrimitiveType::Int64);
+}
+
+// Returns the payload of a Float primitive; type and size checks are performed
+// by getPrimitiveType. Other numeric primitive types are rejected.
+float VariantValue::getFloat() const {
+  return getPrimitiveType<float>(VariantPrimitiveType::Float);
+}
+
+// Returns the payload of a Double primitive; type and size checks are performed
+// by getPrimitiveType. Only the Double primitive type is accepted: a value
+// stored as Float or as an integer type is rejected.
+double VariantValue::getDouble() const {
+  return getPrimitiveType<double>(VariantPrimitiveType::Double);
+}
+
+// Returns a view over the bytes of a length-prefixed primitive: one header byte,
+// a 4-byte little-endian length, then the payload.
+//
+// Throws ParquetException if the value is not a primitive of `type`, or if it is
+// shorter than the prefix plus the declared payload length.
+std::string_view VariantValue::getPrimitiveBinaryType(VariantPrimitiveType type) const {
+  constexpr size_t kPrefixSize = 1 + sizeof(uint32_t);
+  checkPrimitiveType(type, /*size_required=*/kPrefixSize);
+
+  uint32_t length;
+  memcpy(&length, value.data() + 1, sizeof(uint32_t));
+  length = arrow::bit_util::FromLittleEndian(length);
+
+  // Compare against the remaining bytes rather than computing `length + 5`: that
+  // sum is evaluated in 32 bits and wraps around for lengths close to UINT32_MAX,
+  // which would let a corrupt length pass the check. checkPrimitiveType has
+  // already guaranteed value.size() >= kPrefixSize, so the subtraction is safe.
+  if (value.size() - kPrefixSize < length) {
+    throw ParquetException("Invalid string value: too short for specified length");
+  }
+
+  return {value.data() + kPrefixSize, length};
+}
+
+// Returns the string payload of either a short string (length encoded in the
+// header byte) or a primitive String (length-prefixed).
+//
+// Throws ParquetException for any other basic type, or if the value is shorter
+// than the encoded length.
+std::string_view VariantValue::getString() const {
+  switch (getBasicType()) {
+    case VariantBasicType::ShortString: {
+      const uint8_t short_length = (value[0] >> 2) & MAX_SHORT_STR_SIZE_MASK;
+      if (value.size() < short_length + 1) {
+        throw ParquetException("Invalid short string: too short");
+      }
+      return {value.data() + 1, short_length};
+    }
+    case VariantBasicType::Primitive:
+      // TODO(mwish): Should we validate utf8 here?
+      return getPrimitiveBinaryType(VariantPrimitiveType::String);
+    default:
+      break;
+  }
+
+  throw ParquetException("Not a primitive or short string type calls getString");
+}
+
+// Returns a view over the payload of a Binary primitive; the type check and the
+// length validation are performed by getPrimitiveBinaryType.
+std::string_view VariantValue::getBinary() const {
+  return getPrimitiveBinaryType(VariantPrimitiveType::Binary);
+}
+
+// Reads a decimal primitive laid out as: header byte, scale byte, then the
+// little-endian unscaled value of type DecimalType::ValueType.
+template <typename DecimalType>
+DecimalValue<DecimalType> VariantValue::getPrimitiveDecimalType(
+    VariantPrimitiveType type) const {
+  using Unscaled = typename DecimalType::ValueType;
+  checkPrimitiveType(type, 2 + sizeof(Unscaled));
+
+  Unscaled unscaled;
+  memcpy(&unscaled, value.data() + 2, sizeof(Unscaled));
+  const uint8_t scale = value[1];
+
+  return {scale, DecimalType(arrow::bit_util::FromLittleEndian(unscaled))};
+}
+
+// Returns the scale and unscaled value of a Decimal4 primitive as a Decimal32.
+DecimalValue<::arrow::Decimal32> VariantValue::getDecimal4() const {
+  return 
getPrimitiveDecimalType<::arrow::Decimal32>(VariantPrimitiveType::Decimal4);
+}
+
+// Returns the scale and unscaled value of a Decimal8 primitive as a Decimal64.
+DecimalValue<::arrow::Decimal64> VariantValue::getDecimal8() const {
+  return 
getPrimitiveDecimalType<::arrow::Decimal64>(VariantPrimitiveType::Decimal8);
+}
+
+// Returns the scale and unscaled value of a Decimal16 primitive as a Decimal128.
+//
+// Layout read here: header byte, scale byte, then two 8-byte words copied from
+// offsets 2 and 10; the first word is passed as the low bits and the second as
+// the high bits of the Decimal128.
+DecimalValue<::arrow::Decimal128> VariantValue::getDecimal16() const {
+  checkPrimitiveType(VariantPrimitiveType::Decimal16,
+                     /*size_required=*/sizeof(int64_t) * 2 + 2);
+
+  uint8_t scale = value[1];
+
+  // TODO(mwish): Do we have better way for this?
+  std::array<int64_t, 2> low_high_bits;
+  memcpy(&low_high_bits[0], value.data() + 2, sizeof(int64_t));
+  memcpy(&low_high_bits[1], value.data() + 10, sizeof(int64_t));
+  // NOTE(review): the return value of ToNative is discarded here. Confirm that it
+  // converts `low_high_bits` in place; if it returns the converted array instead,
+  // this call has no effect on big-endian hosts.
+  ::arrow::bit_util::little_endian::ToNative(low_high_bits);
+  return {scale, ::arrow::Decimal128(low_high_bits[1], low_high_bits[0])};
+}
+
+// Returns the raw 32-bit payload of a Date primitive; type and size checks are
+// performed by getPrimitiveType.
+int32_t VariantValue::getDate() const {
+  return getPrimitiveType<int32_t>(VariantPrimitiveType::Date);
+}
+
+// Returns the raw 64-bit payload of a TimeNTZ primitive; type and size checks
+// are performed by getPrimitiveType.
+int64_t VariantValue::getTimeNTZ() const {
+  return getPrimitiveType<int64_t>(VariantPrimitiveType::TimeNTZ);
+}
+
+// Returns the raw 64-bit payload of a Timestamp primitive; type and size checks
+// are performed by getPrimitiveType.
+int64_t VariantValue::getTimestamp() const {
+  return getPrimitiveType<int64_t>(VariantPrimitiveType::Timestamp);
+}
+
+// Returns the raw 64-bit payload of a TimestampNTZ primitive; type and size
+// checks are performed by getPrimitiveType.
+int64_t VariantValue::getTimestampNTZ() const {
+  return getPrimitiveType<int64_t>(VariantPrimitiveType::TimestampNTZ);
+}
+
+// Returns the 16 payload bytes of a Uuid primitive (the bytes following the
+// header byte).
+//
+// When ARROW_LITTLE_ENDIAN is set the bytes are passed through ByteSwap into a
+// second array and that array is returned; otherwise they are returned as copied.
+// NOTE(review): this makes the byte order of the returned array depend on the
+// host endianness -- confirm that this is intended for a plain byte array.
+std::array<uint8_t, 16> VariantValue::getUuid() const {
+  // 1 header byte + 16 UUID bytes.
+  checkPrimitiveType(VariantPrimitiveType::Uuid, /*size_required=*/17);
+  std::array<uint8_t, 16> uuid_value;
+  memcpy(uuid_value.data(), value.data() + 1, sizeof(uuid_value));
+#if ARROW_LITTLE_ENDIAN
+  std::array<uint8_t, 16> uuid_value_le;
+  ::arrow::bit_util::ByteSwap(uuid_value_le.data(), uuid_value.data(), 
uuid_value.size());
+  return uuid_value_le;
+#else
+  return uuid_value;
+#endif
+}
+
+// Formats every field of this ObjectInfo as "ObjectInfo{name=value, ...}".
+std::string VariantValue::ObjectInfo::toDebugString() const {
+  std::stringstream out;
+  out << "ObjectInfo{";
+  out << "num_elements=" << num_elements;
+  // The size fields are cast to int so they print as numbers.
+  out << ", id_size=" << static_cast<int>(id_size);
+  out << ", offset_size=" << static_cast<int>(offset_size);
+  out << ", id_start_offset=" << id_start_offset;
+  out << ", offset_start_offset=" << offset_start_offset;
+  out << ", data_start_offset=" << data_start_offset;
+  out << "}";
+  return out.str();
+}
+
+
+// Parses and validates the header of an object value.
+//
+// Layout after the header byte: num_elements (1 byte, or 4 bytes when the
+// is_large bit is set), num_elements field ids of id_size bytes each,
+// num_elements + 1 field offsets of offset_size bytes each, then the field data.
+//
+// Throws ParquetException if the basic type is not Object, or if any section
+// (including the data range given by the final offset) extends past the end of
+// `value`.
+VariantValue::ObjectInfo VariantValue::getObjectInfo() const {
+  checkBasicType(VariantBasicType::Object);
+  uint8_t value_header = value[0] >> 2;
+  uint8_t field_offset_size = (value_header & 0b11) + 1;
+  uint8_t field_id_size = ((value_header >> 2) & 0b11) + 1;
+  bool is_large = ((value_header >> 4) & 0b1);
+  uint8_t num_elements_size = is_large ? 4 : 1;
+  if (value.size() < 1 + num_elements_size) {
+    throw ParquetException("Invalid object value: too short: " +
+                           std::to_string(value.size()) + " for at least " +
+                           std::to_string(1 + num_elements_size));
+  }
+  // parse num_elements
+  uint32_t num_elements = 0;
+  memcpy(&num_elements, value.data() + 1, num_elements_size);
+  num_elements = arrow::bit_util::FromLittleEndian(num_elements);
+
+  // Compute the section offsets in 64 bits so that a huge num_elements cannot
+  // wrap around in 32-bit arithmetic and slip past the bounds checks below.
+  const uint64_t id_start = 1 + static_cast<uint64_t>(num_elements_size);
+  const uint64_t offset_start =
+      id_start + static_cast<uint64_t>(num_elements) * field_id_size;
+  const uint64_t data_start =
+      offset_start + (static_cast<uint64_t>(num_elements) + 1) * field_offset_size;
+  // Check the boundary with the final offset
+  if (data_start > value.size()) {
+    throw ParquetException("Invalid object value: data_start_offset=" +
+                           std::to_string(data_start) +
+                           ", value_size=" + std::to_string(value.size()));
+  }
+
+  // The final offset is the last entry of the offset table, i.e. the
+  // field_offset_size bytes that end right where the data section starts.
+  uint32_t final_offset = 0;
+  memcpy(&final_offset, value.data() + (data_start - field_offset_size),
+         field_offset_size);
+  // Offsets are stored little-endian like every other multi-byte field.
+  final_offset = arrow::bit_util::FromLittleEndian(final_offset);
+  // It could be less than value size since it could be a sub-object.
+  if (data_start + final_offset > value.size()) {
+    throw ParquetException("Invalid object value: final_offset=" +
+                           std::to_string(final_offset) +
+                           ", data_start_offset=" + std::to_string(data_start) +
+                           ", value_size=" + std::to_string(value.size()));
+  }
+
+  ObjectInfo info{};
+  info.num_elements = num_elements;
+  info.id_size = field_id_size;
+  info.offset_size = field_offset_size;
+  info.id_start_offset = static_cast<decltype(info.id_start_offset)>(id_start);
+  info.offset_start_offset =
+      static_cast<decltype(info.offset_start_offset)>(offset_start);
+  info.data_start_offset = static_cast<decltype(info.data_start_offset)>(data_start);
+  return info;
+}
+
+// Convenience overload: parses the object header with getObjectInfo() and
+// forwards to the overload that takes the parsed ObjectInfo.
+std::optional<VariantValue> VariantValue::getObjectValueByKey(
+    std::string_view key) const {
+  const ObjectInfo parsed_info = getObjectInfo();
+  return getObjectValueByKey(key, parsed_info);
+}
+
+// Returns the first field whose key equals `key`, scanning the fields in stored
+// order, or std::nullopt when no field matches.
+std::optional<VariantValue> VariantValue::getObjectValueByKey(
+    std::string_view key, const VariantValue::ObjectInfo& info) const {
+  // TODO(mwish): Currently we just linear search here. The best way here is:
+  //  1. check the num_elements
+  //  2.1. If the element number is less than 8(or other magic number), we can keep
+  //       current method.
+  //  2.2. If the element number is larger than 8, and metadata.sorted_strings is
+  //       true, we can first apply binary search on the metadata, and then binary
+  //       search the field id.
+
+  uint32_t position = 0;
+  while (position < info.num_elements) {
+    std::string_view candidate_key;
+    std::optional<VariantValue> candidate =
+        getObjectFieldByFieldId(position, &candidate_key);
+    if (candidate_key == key) {
+      return candidate;
+    }
+    ++position;
+  }
+
+  return std::nullopt;
+}
+
+// Returns the field stored at position `variantId` of this object and writes its
+// key to `*key`.
+//
+// `variantId` indexes the object's field-id and offset tables (0-based). The key
+// is looked up in the metadata using the field id stored at that position. The
+// returned value's bytes start at the field's offset and run to the end of this
+// value.
+//
+// Throws ParquetException if getObjectInfo() rejects this value, if `variantId`
+// is not less than num_elements, or if the field offset points past the end of
+// `value`.
+VariantValue VariantValue::getObjectFieldByFieldId(uint32_t variantId,
+                                                   std::string_view* key) const {
+  ObjectInfo info = getObjectInfo();
+
+  if (variantId >= info.num_elements) {
+    throw ParquetException("Field ID out of range: " + std::to_string(variantId) +
+                           " >= " + std::to_string(info.num_elements));
+  }
+
+  // Read the field ID
+  uint32_t field_id = 0;
+  memcpy(&field_id, value.data() + info.id_start_offset + variantId * info.id_size,
+         info.id_size);
+  field_id = arrow::bit_util::FromLittleEndian(field_id);
+
+  // Get the key from metadata
+  *key = metadata.getMetadataKey(static_cast<int32_t>(field_id));
+
+  // Read the field's offset into the data section. The following table entry is
+  // deliberately not read: it was previously copied but never used.
+  uint32_t offset = 0;
+  memcpy(&offset,
+         value.data() + info.offset_start_offset + variantId * info.offset_size,
+         info.offset_size);
+  offset = arrow::bit_util::FromLittleEndian(offset);
+
+  // Add in 64 bits so that a large offset cannot wrap around and pass the check.
+  const uint64_t field_start = static_cast<uint64_t>(info.data_start_offset) + offset;
+  if (field_start > value.size()) {
+    throw ParquetException("Invalid object field offsets: data_start_offset=" +
+                           std::to_string(info.data_start_offset) +
+                           ", offset=" + std::to_string(offset) +
+                           ", value_size=" + std::to_string(value.size()));
+  }
+
+  // Create a VariantValue for the field. Plain aggregate initialization is used
+  // because designated initializers are a C++20 feature.
+  return VariantValue{metadata, value.substr(static_cast<size_t>(field_start))};
+}
+
+VariantValue::ArrayInfo VariantValue::getArrayInfo() const {
+  checkBasicType(VariantBasicType::Array);
+  uint8_t value_header = value[0] >> 2;
+  uint8_t field_offset_size = (value_header & 0b11) + 1;
+  bool is_large = ((value_header >> 2) & 0b1);
+
+  // check the array header
+  uint8_t num_elements_size = is_large ? 4 : 1;
+  if (value.size() < 1 + num_elements_size) {
+    throw ParquetException(
+        "Invalid array value: too short: " + std::to_string(value.size()) +
+        " for at least " + std::to_string(1 + num_elements_size));
+  }
+
+  // parse num_elements
+  uint32_t num_elements = 0;
+  {
+    memcpy(&num_elements, value.data() + 1, num_elements_size);
+    num_elements = arrow::bit_util::FromLittleEndian(num_elements);
+  }
+
+  ArrayInfo info{};
+  info.num_elements = num_elements;
+  info.offset_size = field_offset_size;
+  info.offset_start_offset = 1 + num_elements_size;
+  info.data_start_offset =
+      info.offset_start_offset + (num_elements + 1) * field_offset_size;
+
+  // Boundary check
+  if (info.data_start_offset > value.size()) {
+    throw ParquetException("Invalid array value: data_start_offset=" +
+                           std::to_string(info.data_start_offset) +
+                           ", value_size=" + std::to_string(value.size()));
+  }
+
+  // Validate final offset is equal to the size of the value,
+  // it would work since even empty array would have an offset of 0.
+  {
+    uint32_t final_offset = 0;
+    memcpy(&final_offset,
+           value.data() + info.offset_start_offset + num_elements * 
field_offset_size,
+           field_offset_size);
+    final_offset = arrow::bit_util::FromLittleEndian(final_offset);
+
+    if (info.data_start_offset + final_offset > value.size()) {
+      throw ParquetException(
+          "Invalid array value: final_offset=" + std::to_string(final_offset) +
+          ", data_start_offset=" + std::to_string(info.data_start_offset) +
+          ", value_size=" + std::to_string(value.size()));
+    }
+  }
+
+  // checking the element is incremental.
+  // TODO(mwish): Remove this or encapsulate this range check to function

Review Comment:
   Should we move these checks into a separate function such as "Validate", or 
just perform them here?



##########
cpp/src/parquet/variant.h:
##########
@@ -0,0 +1,228 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <string_view>
+#include <vector>
+
+#include <arrow/util/decimal.h>
+
+namespace parquet::variant {
+
+// TODO(mwish): Should I use parquet::ByteArray rather than
+//  std::string_view?
+
+/// Top-level category of a variant value. The enumerator values are the numeric
+/// codes 0-3.
+enum class VariantBasicType {
+  /// One of the primitive types
+  Primitive = 0,
+  /// A string with a length less than 64 bytes
+  ShortString = 1,
+  /// A collection of (string-key, variant-value) pairs
+  Object = 2,
+  /// An ordered sequence of variant values
+  Array = 3
+};
+
+std::string variantBasicTypeToString(VariantBasicType type);
+
+/// Type id of a primitive variant value. The enumerator values are the numeric
+/// ids compared against the header byte shifted right by two bits.
+enum class VariantPrimitiveType : int8_t {
+  /// Equivalent Parquet Type: UNKNOWN
+  NullType = 0,
+  /// Equivalent Parquet Type: BOOLEAN
+  BooleanTrue = 1,
+  /// Equivalent Parquet Type: BOOLEAN
+  BooleanFalse = 2,
+  /// Equivalent Parquet Type: INT(8, signed)
+  Int8 = 3,
+  /// Equivalent Parquet Type: INT(16, signed)
+  Int16 = 4,
+  /// Equivalent Parquet Type: INT(32, signed)
+  Int32 = 5,
+  /// Equivalent Parquet Type: INT(64, signed)
+  Int64 = 6,
+  /// Equivalent Parquet Type: DOUBLE
+  Double = 7,
+  /// Equivalent Parquet Type: DECIMAL(precision, scale)
+  Decimal4 = 8,
+  /// Equivalent Parquet Type: DECIMAL(precision, scale)
+  Decimal8 = 9,
+  /// Equivalent Parquet Type: DECIMAL(precision, scale)
+  Decimal16 = 10,
+  /// Equivalent Parquet Type: DATE
+  Date = 11,
+  /// Equivalent Parquet Type: TIMESTAMP(isAdjustedToUTC=true, MICROS)
+  Timestamp = 12,
+  /// Equivalent Parquet Type: TIMESTAMP(isAdjustedToUTC=false, MICROS)
+  TimestampNTZ = 13,
+  /// Equivalent Parquet Type: FLOAT
+  Float = 14,
+  /// Equivalent Parquet Type: BINARY
+  Binary = 15,
+  /// Equivalent Parquet Type: STRING
+  String = 16,
+  /// Equivalent Parquet Type: TIME(isAdjustedToUTC=false, MICROS)
+  TimeNTZ = 17,
+  /// Equivalent Parquet Type: TIMESTAMP(isAdjustedToUTC=true, NANOS)
+  /// Note: this is the NANOS counterpart of Timestamp (12), which is MICROS.
+  TimestampTZ = 18,
+  /// Equivalent Parquet Type: TIMESTAMP(isAdjustedToUTC=false, NANOS)
+  /// Note: this is the NANOS counterpart of TimestampNTZ (13), which is MICROS.
+  TimestampNTZNanos = 19,
+  /// Equivalent Parquet Type: UUID
+  Uuid = 20
+};
+
+std::string variantPrimitiveTypeToString(VariantPrimitiveType type);
+
+/// VariantType is derived from the basic type and the primitive type of a value.
+/// It is the type reported by VariantValue::getType().
+enum class VariantType {
+  OBJECT,
+  ARRAY,
+  VARIANT_NULL,
+  BOOLEAN,
+  BYTE,
+  SHORT,
+  INT,
+  LONG,
+  STRING,
+  DOUBLE,
+  DECIMAL4,
+  DECIMAL8,
+  DECIMAL16,
+  DATE,
+  TIMESTAMP_TZ,
+  TIMESTAMP_NTZ,
+  FLOAT,
+  BINARY,
+  TIME,
+  TIMESTAMP_NANOS_TZ,
+  TIMESTAMP_NANOS_NTZ,
+  UUID
+};
+
+std::string variantTypeToString(VariantType type);
+
+/// \brief Accessor over the encoded variant metadata bytes.
+///
+/// The bytes are held as a std::string_view, so the underlying buffer is not
+/// owned by this object.
+class VariantMetadata {
+ public:
+  /// \brief Wrap the given metadata bytes.
+  ///
+  /// Throws ParquetException if `metadata` is shorter than 2 bytes.
+  explicit VariantMetadata(std::string_view metadata);
+  /// \brief Get the variant metadata version. Currently, always 1.
+  int8_t version() const;
+  /// \brief Get the metadata key for a given variant field id.
+  std::string_view getMetadataKey(int32_t variantId) const;
+
+ private:
+  /// Whether the sorted-strings flag (bit 4 of the first byte) is set.
+  bool sortedStrings() const;
+  /// Offset width in bytes, decoded from the top two bits of the first byte
+  /// plus one.
+  uint8_t offsetSize() const;
+  /// Number of entries in the metadata dictionary.
+  uint32_t dictionarySize() const;
+
+ private:
+  /// The encoded metadata bytes (not owned).
+  std::string_view metadata_;
+};
+
+/// A decimal read from a variant value: the scale byte together with the
+/// unscaled value held in `DecimalType`.
+template <typename DecimalType>
+struct DecimalValue {
+  /// Scale byte read from the value (the byte following the header byte).
+  uint8_t scale;
+  /// The unscaled decimal value.
+  DecimalType value;
+};
+
+struct VariantValue {
+  VariantMetadata metadata;
+  std::string_view value;
+
+  VariantBasicType getBasicType() const;
+  VariantType getType() const;
+  std::string typeDebugString() const;
+
+  /// \defgroup ValueAccessors
+  /// @{
+
+  // Note: Null doesn't need visitor.
+  bool getBool() const;
+  int8_t getInt8() const;
+  int16_t getInt16() const;
+  int32_t getInt32() const;
+  int64_t getInt64() const;
+  /// Include short_string optimization and primitive string type
+  std::string_view getString() const;
+  std::string_view getBinary() const;
+  float getFloat() const;
+  double getDouble() const;

Review Comment:
   Currently, `getDouble` only accepts values stored as the `Double` primitive 
type, which is too strict. Maybe we should find some way to allow `getDouble` to read other numeric types (e.g. `Float`) as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to