This is an automated email from the ASF dual-hosted git repository.
leekei pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new f0e17a4 feat: Add array data type support (#433)
f0e17a4 is described below
commit f0e17a4ab4d5c6f65d980043762c57d66eb57f90
Author: Kaiqi Dong <[email protected]>
AuthorDate: Sat Mar 28 21:20:24 2026 +0100
feat: Add array data type support (#433)
* add array data type support
* add docs
* address comments
* avoid intermediate record batch in get-array
* Address comments
* address review comments
* address comments and improve doc
* address comments
* add todo
* remove todo from reference.md to code
---
bindings/cpp/src/types.rs | 6 +
crates/fluss/src/record/arrow.rs | 65 ++
crates/fluss/src/row/binary/binary_writer.rs | 10 +-
crates/fluss/src/row/binary_array.rs | 848 +++++++++++++++++++++
crates/fluss/src/row/column.rs | 463 ++++++++++-
.../src/row/compacted/compacted_key_writer.rs | 11 +
crates/fluss/src/row/compacted/compacted_row.rs | 202 ++++-
.../src/row/compacted/compacted_row_reader.rs | 204 +++--
.../src/row/compacted/compacted_row_writer.rs | 4 +
crates/fluss/src/row/datum.rs | 122 ++-
.../fluss/src/row/encode/compacted_key_encoder.rs | 160 +++-
crates/fluss/src/row/field_getter.rs | 55 +-
crates/fluss/src/row/mod.rs | 14 +
website/docs/user-guide/rust/api-reference.md | 13 +
website/docs/user-guide/rust/data-types.md | 24 +
15 files changed, 2078 insertions(+), 123 deletions(-)
diff --git a/bindings/cpp/src/types.rs b/bindings/cpp/src/types.rs
index 3c0c6f7..f7aabe9 100644
--- a/bindings/cpp/src/types.rs
+++ b/bindings/cpp/src/types.rs
@@ -351,6 +351,9 @@ pub fn resolve_row_types(
Datum::Time(t) => Datum::Time(*t),
Datum::TimestampNtz(ts) => Datum::TimestampNtz(*ts),
Datum::TimestampLtz(ts) => Datum::TimestampLtz(*ts),
+ // TODO: C++ bindings need proper CXX wrapper types for FlussArray
+ // before C++ users can construct or inspect array values through FFI.
+ Datum::Array(a) => Datum::Array(a.clone()),
};
out.set_field(idx, resolved);
}
@@ -408,6 +411,9 @@ pub fn compacted_row_to_owned(
fcore::metadata::DataType::Binary(dt) => {
Datum::Blob(Cow::Owned(row.get_binary(i,
dt.length())?.to_vec()))
}
+ // TODO: C++ bindings need proper CXX wrapper types for FlussArray
+ // before C++ users can construct or inspect array values through FFI.
+ fcore::metadata::DataType::Array(_) => Datum::Array(row.get_array(i)?),
other => return Err(anyhow!("Unsupported data type for column {i}: {other:?}")),
};
diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs
index d8ba6d9..7dd745b 100644
--- a/crates/fluss/src/record/arrow.rs
+++ b/crates/fluss/src/record/arrow.rs
@@ -1091,6 +1091,71 @@ pub fn to_arrow_type(fluss_type: &DataType) -> Result<ArrowDataType> {
})
}
+/// Converts an Arrow data type back to a Fluss `DataType`.
+/// Used for reading array elements from Arrow ListArray back into Fluss types.
+pub(crate) fn from_arrow_type(arrow_type: &ArrowDataType) -> Result<DataType> {
+ use crate::metadata::DataTypes;
+
+ Ok(match arrow_type {
+ ArrowDataType::Boolean => DataTypes::boolean(),
+ ArrowDataType::Int8 => DataTypes::tinyint(),
+ ArrowDataType::Int16 => DataTypes::smallint(),
+ ArrowDataType::Int32 => DataTypes::int(),
+ ArrowDataType::Int64 => DataTypes::bigint(),
+ ArrowDataType::Float32 => DataTypes::float(),
+ ArrowDataType::Float64 => DataTypes::double(),
+ ArrowDataType::Utf8 => DataTypes::string(),
+ ArrowDataType::Binary => DataTypes::bytes(),
+ ArrowDataType::Date32 => DataTypes::date(),
+ ArrowDataType::FixedSizeBinary(len) => {
+ if *len < 0 {
+ return Err(Error::IllegalArgument {
+ message: format!("FixedSizeBinary length must be >= 0, got {len}"),
+ });
+ }
+ DataTypes::binary(*len as usize)
+ }
+ ArrowDataType::Decimal128(p, s) => {
+ if *s < 0 {
+ return Err(Error::IllegalArgument {
+ message: format!("Decimal scale must be >= 0, got {s}"),
+ });
+ }
+ DataTypes::decimal(*p as u32, *s as u32)
+ }
+ ArrowDataType::Time32(arrow_schema::TimeUnit::Second) => DataTypes::time_with_precision(0),
+ ArrowDataType::Time32(arrow_schema::TimeUnit::Millisecond) => {
+ DataTypes::time_with_precision(3)
+ }
+ ArrowDataType::Time64(arrow_schema::TimeUnit::Microsecond) => {
+ DataTypes::time_with_precision(6)
+ }
+ ArrowDataType::Time64(arrow_schema::TimeUnit::Nanosecond) => {
+ DataTypes::time_with_precision(9)
+ }
+ ArrowDataType::Timestamp(unit, tz) => {
+ let precision = match unit {
+ arrow_schema::TimeUnit::Second => 0,
+ arrow_schema::TimeUnit::Millisecond => 3,
+ arrow_schema::TimeUnit::Microsecond => 6,
+ arrow_schema::TimeUnit::Nanosecond => 9,
+ };
+
+ if tz.is_some() {
+ DataTypes::timestamp_ltz_with_precision(precision)
+ } else {
+ DataTypes::timestamp_with_precision(precision)
+ }
+ }
+ ArrowDataType::List(field) => DataTypes::array(from_arrow_type(field.data_type())?),
+ other => {
+ return Err(Error::IllegalArgument {
+ message: format!("Cannot convert Arrow type to Fluss type: {other:?}"),
+ });
+ }
+ })
+}
+
#[derive(Clone)]
pub struct ReadContext {
target_schema: SchemaRef,
diff --git a/crates/fluss/src/row/binary/binary_writer.rs b/crates/fluss/src/row/binary/binary_writer.rs
index af2765c..f51a6e8 100644
--- a/crates/fluss/src/row/binary/binary_writer.rs
+++ b/crates/fluss/src/row/binary/binary_writer.rs
@@ -67,8 +67,7 @@ pub trait BinaryWriter {
fn write_timestamp_ltz(&mut self, value: &crate::row::datum::TimestampLtz, precision: u32);
- // TODO InternalArray, ArraySerializer
- // fn write_array(&mut self, pos: i32, value: i64);
+ fn write_array(&mut self, value: &[u8]);
// TODO Row serializer
// fn write_row(&mut self, pos: i32, value: &InternalRow);
@@ -136,7 +135,8 @@ pub enum InnerValueWriter {
Time(u32), // precision (not used in wire format, but kept for
consistency)
TimestampNtz(u32), // precision
TimestampLtz(u32), // precision
- // TODO Array, Row
+ Array,
+ // TODO Row
}
/// Accessor for writing the fields/elements of a binary writer during
runtime, the
@@ -175,6 +175,7 @@ impl InnerValueWriter {
// Validation is done at TimestampLTzType construction time
Ok(InnerValueWriter::TimestampLtz(t.precision()))
}
+ DataType::Array(_) => Ok(InnerValueWriter::Array),
_ => unimplemented!(
"ValueWriter for DataType {:?} is currently not implemented",
data_type
@@ -237,6 +238,9 @@ impl InnerValueWriter {
(InnerValueWriter::TimestampLtz(p), Datum::TimestampLtz(ts)) => {
writer.write_timestamp_ltz(ts, *p);
}
+ (InnerValueWriter::Array, Datum::Array(arr)) => {
+ writer.write_array(arr.as_bytes());
+ }
_ => {
return Err(IllegalArgument {
message: format!("{self:?} used to write value {value:?}"),
diff --git a/crates/fluss/src/row/binary_array.rs b/crates/fluss/src/row/binary_array.rs
new file mode 100644
index 0000000..9008bc5
--- /dev/null
+++ b/crates/fluss/src/row/binary_array.rs
@@ -0,0 +1,848 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Binary array format matching Java's `BinaryArray.java` layout.
+//!
+//! Binary layout:
+//! ```text
+//! [size(4B)] + [null bits (4-byte word aligned)] + [fixed-length part] + [variable-length part]
+//! ```
+//!
+//! Java reference: `BinaryArray.java`, `BinaryArrayWriter.java`
+
+use crate::error::Error::IllegalArgument;
+use crate::error::Result;
+use crate::metadata::DataType;
+use crate::row::Decimal;
+use crate::row::datum::{Date, Time, TimestampLtz, TimestampNtz};
+use bytes::Bytes;
+use serde::Serialize;
+use std::fmt;
+use std::hash::{Hash, Hasher};
+
+const MAX_FIX_PART_DATA_SIZE: usize = 7;
+const HIGHEST_FIRST_BIT: u64 = 0x80_u64 << 56;
+const HIGHEST_SECOND_TO_EIGHTH_BIT: u64 = 0x7F_u64 << 56;
+
+/// Calculates the header size in bytes: 4 (for element count) + null bits
(4-byte word aligned).
+/// Matches Java's `BinaryArray.calculateHeaderInBytes(numFields)`.
+pub fn calculate_header_in_bytes(num_elements: usize) -> usize {
+ 4 + num_elements.div_ceil(32) * 4
+}
+
+/// Calculates the fixed-length part size per element for a given data type.
+/// Matches Java's `BinaryArray.calculateFixLengthPartSize(DataType)`.
+pub fn calculate_fix_length_part_size(element_type: &DataType) -> usize {
+ match element_type {
+ DataType::Boolean(_) | DataType::TinyInt(_) => 1,
+ DataType::SmallInt(_) => 2,
+ DataType::Int(_) | DataType::Float(_) | DataType::Date(_) |
DataType::Time(_) => 4,
+ DataType::BigInt(_)
+ | DataType::Double(_)
+ | DataType::Char(_)
+ | DataType::String(_)
+ | DataType::Binary(_)
+ | DataType::Bytes(_)
+ | DataType::Decimal(_)
+ | DataType::Timestamp(_)
+ | DataType::TimestampLTz(_)
+ | DataType::Array(_)
+ | DataType::Map(_)
+ | DataType::Row(_) => 8,
+ }
+}
+
+/// Rounds a byte count up to the nearest 8-byte word boundary.
+/// Matches Java's `roundNumberOfBytesToNearestWord`.
+fn round_to_nearest_word(num_bytes: usize) -> usize {
+ (num_bytes + 7) & !7
+}
+
+/// A Fluss binary array, wire-compatible with Java's `BinaryArray`.
+///
+/// Stores elements in a flat byte buffer with a header (element count + null
bitmap)
+/// followed by fixed-length slots and an optional variable-length section.
+///
+/// Uses `Bytes` internally so cloning is O(1) reference-counted.
+// TODO: FlussArray currently exposes only fallible getters. Infallible
+// fast-path variants may be added later as non-breaking extensions.
+#[derive(Clone)]
+pub struct FlussArray {
+ data: Bytes,
+ size: usize,
+ element_offset: usize,
+}
+
+impl fmt::Debug for FlussArray {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("FlussArray")
+ .field("size", &self.size)
+ .field("data_len", &self.data.len())
+ .finish()
+ }
+}
+
+impl fmt::Display for FlussArray {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(f, "FlussArray[size={}]", self.size)
+ }
+}
+
+impl PartialEq for FlussArray {
+ fn eq(&self, other: &Self) -> bool {
+ self.data == other.data
+ }
+}
+
+impl Eq for FlussArray {}
+
+impl PartialOrd for FlussArray {
+ fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+ Some(self.cmp(other))
+ }
+}
+
+impl Ord for FlussArray {
+ fn cmp(&self, other: &Self) -> std::cmp::Ordering {
+ self.data.cmp(&other.data)
+ }
+}
+
+impl Hash for FlussArray {
+ fn hash<H: Hasher>(&self, state: &mut H) {
+ self.data.hash(state);
+ }
+}
+
+impl Serialize for FlussArray {
+ fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok,
S::Error>
+ where
+ S: serde::Serializer,
+ {
+ serializer.serialize_bytes(&self.data)
+ }
+}
+
+impl FlussArray {
+ /// Validates the raw bytes and computes derived fields (size,
element_offset).
+ fn validate(data: &[u8]) -> Result<(usize, usize)> {
+ if data.len() < 4 {
+ return Err(IllegalArgument {
+ message: format!(
+ "FlussArray data too short: need at least 4 bytes, got {}",
+ data.len()
+ ),
+ });
+ }
+ let raw_size = i32::from_le_bytes(data[0..4].try_into().unwrap());
+ if raw_size < 0 {
+ return Err(IllegalArgument {
+ message: format!("FlussArray size must be non-negative, got
{raw_size}"),
+ });
+ }
+ let size = raw_size as usize;
+ let element_offset = calculate_header_in_bytes(size);
+ if element_offset > data.len() {
+ return Err(IllegalArgument {
+ message: format!(
+ "FlussArray header exceeds payload: header={}, payload={}",
+ element_offset,
+ data.len()
+ ),
+ });
+ }
+ Ok((size, element_offset))
+ }
+
+ /// Creates a FlussArray from a byte slice (copies data).
+ pub fn from_bytes(data: &[u8]) -> Result<Self> {
+ let (size, element_offset) = Self::validate(data)?;
+ Ok(FlussArray {
+ data: Bytes::copy_from_slice(data),
+ size,
+ element_offset,
+ })
+ }
+
+ /// Creates a FlussArray from an owned `Vec<u8>` without copying.
+ pub fn from_vec(data: Vec<u8>) -> Result<Self> {
+ let (size, element_offset) = Self::validate(&data)?;
+ Ok(FlussArray {
+ data: Bytes::from(data),
+ size,
+ element_offset,
+ })
+ }
+
+ /// Creates a FlussArray from owned bytes without copying.
+ fn from_owned_bytes(data: Bytes) -> Result<Self> {
+ let (size, element_offset) = Self::validate(&data)?;
+ Ok(FlussArray {
+ data,
+ size,
+ element_offset,
+ })
+ }
+
+ /// Returns the number of elements.
+ pub fn size(&self) -> usize {
+ self.size
+ }
+
+ /// Returns the raw bytes of this array (the complete binary
representation).
+ pub fn as_bytes(&self) -> &[u8] {
+ &self.data
+ }
+
+ /// Returns true if the element at position `pos` is null.
+ pub fn is_null_at(&self, pos: usize) -> bool {
+ let byte_index = pos >> 3;
+ let bit = pos & 7;
+ (self.data[4 + byte_index] & (1u8 << bit)) != 0
+ }
+
+ fn checked_slice(&self, start: usize, len: usize, context: &str) -> Result<&[u8]> {
+ let end = start.checked_add(len).ok_or_else(|| IllegalArgument {
+ message: format!("Overflow while reading {context}: start={start},
len={len}"),
+ })?;
+ if end > self.data.len() {
+ return Err(IllegalArgument {
+ message: format!(
+ "Out-of-bounds while reading {context}: start={start},
len={len}, payload={}",
+ self.data.len()
+ ),
+ });
+ }
+ Ok(&self.data[start..end])
+ }
+
+ fn checked_element_offset(
+ &self,
+ pos: usize,
+ element_size: usize,
+ context: &str,
+ ) -> Result<usize> {
+ if pos >= self.size {
+ return Err(IllegalArgument {
+ message: format!(
+ "Array element index out of bounds while reading
{context}: pos={pos}, size={}",
+ self.size
+ ),
+ });
+ }
+ let rel = pos.checked_mul(element_size).ok_or_else(|| IllegalArgument {
+ message: format!(
+ "Overflow while calculating array element offset for
{context}: pos={pos}, element_size={element_size}"
+ ),
+ })?;
+ self.element_offset
+ .checked_add(rel)
+ .ok_or_else(|| IllegalArgument {
+ message: format!(
+ "Overflow while adding base offset for {context}: base={},
rel={rel}",
+ self.element_offset
+ ),
+ })
+ }
+
+ fn read_fixed_bytes(&self, pos: usize, len: usize, context: &str) ->
Result<&[u8]> {
+ let offset = self.checked_element_offset(pos, len, context)?;
+ self.checked_slice(offset, len, context)
+ }
+
+ fn read_i16(&self, pos: usize, context: &str) -> Result<i16> {
+ let bytes = self.read_fixed_bytes(pos, 2, context)?;
+ Ok(i16::from_le_bytes([bytes[0], bytes[1]]))
+ }
+
+ fn read_i32(&self, pos: usize, context: &str) -> Result<i32> {
+ let bytes = self.read_fixed_bytes(pos, 4, context)?;
+ Ok(i32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]))
+ }
+
+ fn read_i64(&self, pos: usize, context: &str) -> Result<i64> {
+ let bytes = self.read_fixed_bytes(pos, 8, context)?;
+ let mut buf = [0_u8; 8];
+ buf.copy_from_slice(bytes);
+ Ok(i64::from_le_bytes(buf))
+ }
+
+ fn read_i64_at_offset(&self, offset: usize, context: &str) -> Result<i64> {
+ let bytes = self.checked_slice(offset, 8, context)?;
+ let mut buf = [0_u8; 8];
+ buf.copy_from_slice(bytes);
+ Ok(i64::from_le_bytes(buf))
+ }
+
+ fn read_var_len_span(&self, pos: usize) -> Result<(usize, usize)> {
+ let field_offset = self.checked_element_offset(pos, 8,
"variable-length array element")?;
+ let packed = self.read_i64(pos, "variable-length array element")? as
u64;
+ let mark = packed & HIGHEST_FIRST_BIT;
+
+ if mark == 0 {
+ let offset = (packed >> 32) as usize;
+ let len = (packed & 0xFFFF_FFFF) as usize;
+ let _ = self.checked_slice(offset, len, "variable-length array
element")?;
+ Ok((offset, len))
+ } else {
+ let len = ((packed & HIGHEST_SECOND_TO_EIGHTH_BIT) >> 56) as usize;
+ if len > MAX_FIX_PART_DATA_SIZE {
+ return Err(IllegalArgument {
+ message: format!(
+ "Inline array element length must be <=
{MAX_FIX_PART_DATA_SIZE}, got {len}"
+ ),
+ });
+ }
+ // Java stores inline bytes in the 8-byte slot itself.
+ // On little-endian, bytes start at field_offset; on big-endian
they start at +1.
+ let start = if cfg!(target_endian = "little") {
+ field_offset
+ } else {
+ field_offset + 1
+ };
+ let _ = self.checked_slice(start, len, "inline array element")?;
+ Ok((start, len))
+ }
+ }
+
+ fn read_var_len_bytes(&self, pos: usize) -> Result<&[u8]> {
+ let (start, len) = self.read_var_len_span(pos)?;
+ Ok(&self.data[start..start + len])
+ }
+
+ pub fn get_boolean(&self, pos: usize) -> Result<bool> {
+ let bytes = self.read_fixed_bytes(pos, 1, "boolean array element")?;
+ Ok(bytes[0] != 0)
+ }
+
+ pub fn get_byte(&self, pos: usize) -> Result<i8> {
+ let bytes = self.read_fixed_bytes(pos, 1, "byte array element")?;
+ Ok(bytes[0] as i8)
+ }
+
+ pub fn get_short(&self, pos: usize) -> Result<i16> {
+ self.read_i16(pos, "short array element")
+ }
+
+ pub fn get_int(&self, pos: usize) -> Result<i32> {
+ self.read_i32(pos, "int array element")
+ }
+
+ pub fn get_long(&self, pos: usize) -> Result<i64> {
+ self.read_i64(pos, "long array element")
+ }
+
+ pub fn get_float(&self, pos: usize) -> Result<f32> {
+ let bits = self.read_i32(pos, "float array element")? as u32;
+ Ok(f32::from_bits(bits))
+ }
+
+ pub fn get_double(&self, pos: usize) -> Result<f64> {
+ let bits = self.read_i64(pos, "double array element")? as u64;
+ Ok(f64::from_bits(bits))
+ }
+
+ /// Reads the offset_and_size packed long for variable-length elements.
+ fn get_offset_and_size(&self, pos: usize) -> Result<(usize, usize)> {
+ let packed = self.get_long(pos)? as u64;
+ let offset = (packed >> 32) as usize;
+ let size = (packed & 0xFFFF_FFFF) as usize;
+ Ok((offset, size))
+ }
+
+ pub fn get_string(&self, pos: usize) -> Result<&str> {
+ let bytes = self.read_var_len_bytes(pos)?;
+ std::str::from_utf8(bytes).map_err(|e| IllegalArgument {
+ message: format!("Invalid UTF-8 in array element at position
{pos}: {e}"),
+ })
+ }
+
+ pub fn get_binary(&self, pos: usize) -> Result<&[u8]> {
+ self.read_var_len_bytes(pos)
+ }
+
+ pub fn get_decimal(&self, pos: usize, precision: u32, scale: u32) ->
Result<Decimal> {
+ if Decimal::is_compact_precision(precision) {
+ let unscaled = self.get_long(pos)?;
+ Decimal::from_unscaled_long(unscaled, precision, scale)
+ } else {
+ let (offset, size) = self.get_offset_and_size(pos)?;
+ let bytes = self.checked_slice(offset, size, "decimal bytes")?;
+ Decimal::from_unscaled_bytes(bytes, precision, scale)
+ }
+ }
+
+ pub fn get_date(&self, pos: usize) -> Result<Date> {
+ Ok(Date::new(self.get_int(pos)?))
+ }
+
+ pub fn get_time(&self, pos: usize) -> Result<Time> {
+ Ok(Time::new(self.get_int(pos)?))
+ }
+
+ pub fn get_timestamp_ntz(&self, pos: usize, precision: u32) ->
Result<TimestampNtz> {
+ if TimestampNtz::is_compact(precision) {
+ Ok(TimestampNtz::new(self.get_long(pos)?))
+ } else {
+ let (offset, nanos_of_millis) = self.get_offset_and_size(pos)?;
+ let millis = self.read_i64_at_offset(offset, "timestamp ntz
millis")?;
+ TimestampNtz::from_millis_nanos(millis, nanos_of_millis as i32)
+ }
+ }
+
+ pub fn get_timestamp_ltz(&self, pos: usize, precision: u32) ->
Result<TimestampLtz> {
+ if TimestampLtz::is_compact(precision) {
+ Ok(TimestampLtz::new(self.get_long(pos)?))
+ } else {
+ let (offset, nanos_of_millis) = self.get_offset_and_size(pos)?;
+ let millis = self.read_i64_at_offset(offset, "timestamp ltz
millis")?;
+ TimestampLtz::from_millis_nanos(millis, nanos_of_millis as i32)
+ }
+ }
+
+ pub fn get_array(&self, pos: usize) -> Result<FlussArray> {
+ let (start, len) = self.read_var_len_span(pos)?;
+ FlussArray::from_owned_bytes(self.data.slice(start..start + len))
+ }
+}
+
+/// Writer for building a `FlussArray` element by element.
+/// Matches Java's `BinaryArrayWriter`.
+pub struct FlussArrayWriter {
+ data: Vec<u8>,
+ null_bits_offset: usize,
+ element_offset: usize,
+ element_size: usize,
+ cursor: usize,
+ num_elements: usize,
+}
+
+impl FlussArrayWriter {
+ /// Creates a new writer for an array with `num_elements` elements of the
given element type.
+ pub fn new(num_elements: usize, element_type: &DataType) -> Self {
+ let element_size = calculate_fix_length_part_size(element_type);
+ Self::with_element_size(num_elements, element_size)
+ }
+
+ /// Creates a new writer with an explicit element size (in bytes).
+ pub fn with_element_size(num_elements: usize, element_size: usize) -> Self
{
+ let header_in_bytes = calculate_header_in_bytes(num_elements);
+ let fixed_size = round_to_nearest_word(header_in_bytes + element_size
* num_elements);
+ let mut data = vec![0u8; fixed_size];
+
+ // Java's MemorySegment.putInt() stores little-endian.
+ data[0..4].copy_from_slice(&(num_elements as i32).to_le_bytes());
+
+ FlussArrayWriter {
+ data,
+ null_bits_offset: 4,
+ element_offset: header_in_bytes,
+ element_size,
+ cursor: fixed_size,
+ num_elements,
+ }
+ }
+
+ fn get_element_offset(&self, pos: usize) -> usize {
+ self.element_offset + self.element_size * pos
+ }
+
+ /// Sets the null bit for the element at position `pos`.
+ pub fn set_null_at(&mut self, pos: usize) {
+ let byte_index = pos >> 3;
+ let bit = pos & 7;
+ self.data[self.null_bits_offset + byte_index] |= 1u8 << bit;
+ }
+
+ pub fn write_boolean(&mut self, pos: usize, value: bool) {
+ let offset = self.get_element_offset(pos);
+ self.data[offset] = if value { 1 } else { 0 };
+ }
+
+ pub fn write_byte(&mut self, pos: usize, value: i8) {
+ let offset = self.get_element_offset(pos);
+ self.data[offset] = value as u8;
+ }
+
+ pub fn write_short(&mut self, pos: usize, value: i16) {
+ let offset = self.get_element_offset(pos);
+ self.data[offset..offset + 2].copy_from_slice(&value.to_le_bytes());
+ }
+
+ pub fn write_int(&mut self, pos: usize, value: i32) {
+ let offset = self.get_element_offset(pos);
+ self.data[offset..offset + 4].copy_from_slice(&value.to_le_bytes());
+ }
+
+ pub fn write_long(&mut self, pos: usize, value: i64) {
+ let offset = self.get_element_offset(pos);
+ self.data[offset..offset + 8].copy_from_slice(&value.to_le_bytes());
+ }
+
+ pub fn write_float(&mut self, pos: usize, value: f32) {
+ let offset = self.get_element_offset(pos);
+ self.data[offset..offset + 4].copy_from_slice(&value.to_le_bytes());
+ }
+
+ pub fn write_double(&mut self, pos: usize, value: f64) {
+ let offset = self.get_element_offset(pos);
+ self.data[offset..offset + 8].copy_from_slice(&value.to_le_bytes());
+ }
+
+ /// Writes variable-length bytes to the variable part and stores
offset+size in the fixed slot.
+ fn write_bytes_to_var_len_part(&mut self, pos: usize, bytes: &[u8]) {
+ let rounded = round_to_nearest_word(bytes.len());
+ let var_offset = self.cursor;
+ self.data.resize(self.data.len() + rounded, 0);
+ self.data[var_offset..var_offset + bytes.len()].copy_from_slice(bytes);
+ self.set_offset_and_size(pos, var_offset, bytes.len());
+ self.cursor += rounded;
+ }
+
+ fn set_offset_and_size(&mut self, pos: usize, offset: usize, size: usize) {
+ let packed = ((offset as i64) << 32) | (size as i64);
+ self.write_long(pos, packed);
+ }
+
+ fn write_bytes_to_fix_len_part(&mut self, pos: usize, bytes: &[u8]) {
+ let len = bytes.len();
+ debug_assert!(len <= MAX_FIX_PART_DATA_SIZE);
+ let first_byte = (len as u64) | 0x80;
+ let mut seven_bytes = 0_u64;
+ if cfg!(target_endian = "little") {
+ for (i, b) in bytes.iter().enumerate() {
+ seven_bytes |= ((*b as u64) & 0xFF) << (i * 8);
+ }
+ } else {
+ for (i, b) in bytes.iter().enumerate() {
+ seven_bytes |= ((*b as u64) & 0xFF) << ((6 - i) * 8);
+ }
+ }
+ let packed = ((first_byte << 56) | seven_bytes) as i64;
+ self.write_long(pos, packed);
+ }
+
+ pub fn write_string(&mut self, pos: usize, value: &str) {
+ let bytes = value.as_bytes();
+ if bytes.len() <= MAX_FIX_PART_DATA_SIZE {
+ self.write_bytes_to_fix_len_part(pos, bytes);
+ } else {
+ self.write_bytes_to_var_len_part(pos, bytes);
+ }
+ }
+
+ pub fn write_binary_bytes(&mut self, pos: usize, value: &[u8]) {
+ if value.len() <= MAX_FIX_PART_DATA_SIZE {
+ self.write_bytes_to_fix_len_part(pos, value);
+ } else {
+ self.write_bytes_to_var_len_part(pos, value);
+ }
+ }
+
+ pub fn write_decimal(&mut self, pos: usize, value: &Decimal, precision:
u32) {
+ if Decimal::is_compact_precision(precision) {
+ self.write_long(
+ pos,
+ value
+ .to_unscaled_long()
+ .expect("Decimal should fit in i64 for compact precision"),
+ );
+ } else {
+ let bytes = value.to_unscaled_bytes();
+ self.write_bytes_to_var_len_part(pos, &bytes);
+ }
+ }
+
+ pub fn write_date(&mut self, pos: usize, value: Date) {
+ self.write_int(pos, value.get_inner());
+ }
+
+ pub fn write_time(&mut self, pos: usize, value: Time) {
+ self.write_int(pos, value.get_inner());
+ }
+
+ pub fn write_timestamp_ntz(&mut self, pos: usize, value: &TimestampNtz,
precision: u32) {
+ if TimestampNtz::is_compact(precision) {
+ self.write_long(pos, value.get_millisecond());
+ } else {
+ let millis_bytes = value.get_millisecond().to_le_bytes();
+ let var_offset = self.cursor;
+ let rounded = round_to_nearest_word(8);
+ self.data.resize(self.data.len() + rounded, 0);
+ self.data[var_offset..var_offset +
8].copy_from_slice(&millis_bytes);
+ self.set_offset_and_size(pos, var_offset,
value.get_nano_of_millisecond() as usize);
+ self.cursor += rounded;
+ }
+ }
+
+ pub fn write_timestamp_ltz(&mut self, pos: usize, value: &TimestampLtz,
precision: u32) {
+ if TimestampLtz::is_compact(precision) {
+ self.write_long(pos, value.get_epoch_millisecond());
+ } else {
+ let millis_bytes = value.get_epoch_millisecond().to_le_bytes();
+ let var_offset = self.cursor;
+ let rounded = round_to_nearest_word(8);
+ self.data.resize(self.data.len() + rounded, 0);
+ self.data[var_offset..var_offset +
8].copy_from_slice(&millis_bytes);
+ self.set_offset_and_size(pos, var_offset,
value.get_nano_of_millisecond() as usize);
+ self.cursor += rounded;
+ }
+ }
+
+ /// Writes a nested FlussArray into this array at position `pos`.
+ pub fn write_array(&mut self, pos: usize, value: &FlussArray) {
+ self.write_bytes_to_var_len_part(pos, value.as_bytes());
+ }
+
+ /// Finalizes the writer and returns the completed FlussArray.
+ pub fn complete(self) -> Result<FlussArray> {
+ let mut data = self.data;
+ data.truncate(self.cursor);
+ FlussArray::from_vec(data)
+ }
+
+ /// Returns the number of elements this writer was initialized with.
+ pub fn num_elements(&self) -> usize {
+ self.num_elements
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::metadata::DataTypes;
+
+ #[test]
+ fn test_header_calculation() {
+ assert_eq!(calculate_header_in_bytes(0), 4);
+ assert_eq!(calculate_header_in_bytes(1), 8);
+ assert_eq!(calculate_header_in_bytes(31), 8);
+ assert_eq!(calculate_header_in_bytes(32), 8);
+ assert_eq!(calculate_header_in_bytes(33), 12);
+ assert_eq!(calculate_header_in_bytes(64), 12);
+ assert_eq!(calculate_header_in_bytes(65), 16);
+ }
+
+ #[test]
+ fn test_fix_length_part_size() {
+ assert_eq!(calculate_fix_length_part_size(&DataTypes::boolean()), 1);
+ assert_eq!(calculate_fix_length_part_size(&DataTypes::tinyint()), 1);
+ assert_eq!(calculate_fix_length_part_size(&DataTypes::smallint()), 2);
+ assert_eq!(calculate_fix_length_part_size(&DataTypes::int()), 4);
+ assert_eq!(calculate_fix_length_part_size(&DataTypes::bigint()), 8);
+ assert_eq!(calculate_fix_length_part_size(&DataTypes::float()), 4);
+ assert_eq!(calculate_fix_length_part_size(&DataTypes::double()), 8);
+ assert_eq!(calculate_fix_length_part_size(&DataTypes::string()), 8);
+ assert_eq!(
+
calculate_fix_length_part_size(&DataTypes::array(DataTypes::int())),
+ 8
+ );
+ }
+
+ #[test]
+ fn test_round_trip_int_array() {
+ let elem_type = DataTypes::int();
+ let mut writer = FlussArrayWriter::new(3, &elem_type);
+ writer.write_int(0, 10);
+ writer.write_int(1, 20);
+ writer.write_int(2, 30);
+ let array = writer.complete().unwrap();
+
+ assert_eq!(array.size(), 3);
+ assert!(!array.is_null_at(0));
+ assert_eq!(array.get_int(0).unwrap(), 10);
+ assert_eq!(array.get_int(1).unwrap(), 20);
+ assert_eq!(array.get_int(2).unwrap(), 30);
+ }
+
+ #[test]
+ fn test_round_trip_with_nulls() {
+ let elem_type = DataTypes::int();
+ let mut writer = FlussArrayWriter::new(3, &elem_type);
+ writer.write_int(0, 1);
+ writer.set_null_at(1);
+ writer.write_int(2, 3);
+ let array = writer.complete().unwrap();
+
+ assert_eq!(array.size(), 3);
+ assert!(!array.is_null_at(0));
+ assert!(array.is_null_at(1));
+ assert!(!array.is_null_at(2));
+ assert_eq!(array.get_int(0).unwrap(), 1);
+ assert_eq!(array.get_int(2).unwrap(), 3);
+ }
+
+ #[test]
+ fn test_round_trip_string_array() {
+ let elem_type = DataTypes::string();
+ let mut writer = FlussArrayWriter::new(3, &elem_type);
+ writer.write_string(0, "hello");
+ writer.write_string(1, "world");
+ writer.write_string(2, "!");
+ let array = writer.complete().unwrap();
+
+ assert_eq!(array.size(), 3);
+ assert_eq!(array.get_string(0).unwrap(), "hello");
+ assert_eq!(array.get_string(1).unwrap(), "world");
+ assert_eq!(array.get_string(2).unwrap(), "!");
+ }
+
+ #[test]
+ fn test_java_inline_short_string_decoding() {
+ // Manually construct Java-style inline encoded short string ("abc")
+ // slot payload: [len|0x80 in top byte] + [bytes in low 7 bytes on
little-endian]
+ let mut data = vec![0_u8; 16];
+ data[0..4].copy_from_slice(&(1_i32).to_le_bytes());
+ // null bits remain 0
+ let first_byte = (3_u64 | 0x80) << 56;
+ let seven_bytes = (b'a' as u64) | ((b'b' as u64) << 8) | ((b'c' as
u64) << 16);
+ let packed = first_byte | seven_bytes;
+ data[8..16].copy_from_slice(&packed.to_le_bytes());
+
+ let arr = FlussArray::from_bytes(&data).unwrap();
+ assert_eq!(arr.size(), 1);
+ assert_eq!(arr.get_string(0).unwrap(), "abc");
+ }
+
+ #[test]
+ fn test_java_inline_short_binary_decoding() {
+ let elem_type = DataTypes::bytes();
+ let mut writer = FlussArrayWriter::new(1, &elem_type);
+ writer.write_binary_bytes(0, b"abc");
+ let arr = writer.complete().unwrap();
+ assert_eq!(arr.get_binary(0).unwrap(), b"abc");
+ }
+
+ #[test]
+ fn test_round_trip_empty_array() {
+ let elem_type = DataTypes::int();
+ let writer = FlussArrayWriter::new(0, &elem_type);
+ let array = writer.complete().unwrap();
+ assert_eq!(array.size(), 0);
+ }
+
+ #[test]
+ fn test_round_trip_boolean_array() {
+ let elem_type = DataTypes::boolean();
+ let mut writer = FlussArrayWriter::new(3, &elem_type);
+ writer.write_boolean(0, true);
+ writer.write_boolean(1, false);
+ writer.write_boolean(2, true);
+ let array = writer.complete().unwrap();
+
+ assert_eq!(array.size(), 3);
+ assert!(array.get_boolean(0).unwrap());
+ assert!(!array.get_boolean(1).unwrap());
+ assert!(array.get_boolean(2).unwrap());
+ }
+
+ #[test]
+ fn test_round_trip_long_array() {
+ let elem_type = DataTypes::bigint();
+ let mut writer = FlussArrayWriter::new(2, &elem_type);
+ writer.write_long(0, i64::MAX);
+ writer.write_long(1, i64::MIN);
+ let array = writer.complete().unwrap();
+
+ assert_eq!(array.get_long(0).unwrap(), i64::MAX);
+ assert_eq!(array.get_long(1).unwrap(), i64::MIN);
+ }
+
+ #[test]
+ fn test_round_trip_double_array() {
+ let elem_type = DataTypes::double();
+ let mut writer = FlussArrayWriter::new(2, &elem_type);
+ writer.write_double(0, 1.23);
+ writer.write_double(1, -4.56);
+ let array = writer.complete().unwrap();
+
+ assert_eq!(array.get_double(0).unwrap(), 1.23);
+ assert_eq!(array.get_double(1).unwrap(), -4.56);
+ }
+
+ #[test]
+ fn test_round_trip_nested_array() {
+ let inner_type = DataTypes::int();
+ let outer_type = DataTypes::array(DataTypes::int());
+
+ // Build inner array [1, 2]
+ let mut inner_writer = FlussArrayWriter::new(2, &inner_type);
+ inner_writer.write_int(0, 1);
+ inner_writer.write_int(1, 2);
+ let inner_array = inner_writer.complete().unwrap();
+
+ // Build outer array containing the inner array
+ let mut outer_writer = FlussArrayWriter::new(1, &outer_type);
+ outer_writer.write_array(0, &inner_array);
+ let outer_array = outer_writer.complete().unwrap();
+
+ assert_eq!(outer_array.size(), 1);
+ let nested = outer_array.get_array(0).unwrap();
+ assert_eq!(nested.size(), 2);
+ assert_eq!(nested.get_int(0).unwrap(), 1);
+ assert_eq!(nested.get_int(1).unwrap(), 2);
+ }
+
+ #[test]
+ fn test_primitive_getter_out_of_bounds_returns_error() {
+ let elem_type = DataTypes::int();
+ let mut writer = FlussArrayWriter::new(1, &elem_type);
+ writer.write_int(0, 10);
+ let array = writer.complete().unwrap();
+
+ let err = array.get_int(1).unwrap_err();
+ assert!(
+ err.to_string().contains("out of bounds"),
+ "unexpected error: {err}"
+ );
+ }
+
+ #[test]
+ fn test_primitive_getter_on_malformed_payload_returns_error() {
+ // Size says 1, but payload only contains header (no element bytes).
+ let mut data = vec![0_u8; 8];
+ data[0..4].copy_from_slice(&(1_i32).to_le_bytes());
+ let arr = FlussArray::from_bytes(&data).unwrap();
+
+ let err = arr.get_int(0).unwrap_err();
+ assert!(
+ err.to_string().contains("Out-of-bounds"),
+ "unexpected error: {err}"
+ );
+ }
+
+ #[test]
+ fn test_binary_layout_matches_java() {
+ // Verify exact byte layout for a simple [1, 2, 3] int array
+ let elem_type = DataTypes::int();
+ let mut writer = FlussArrayWriter::new(3, &elem_type);
+ writer.write_int(0, 1);
+ writer.write_int(1, 2);
+ writer.write_int(2, 3);
+ let array = writer.complete().unwrap();
+ let bytes = array.as_bytes();
+
+ // size = 3 at offset 0 (4 bytes, little-endian per Java
MemorySegment.putInt)
+ assert_eq!(i32::from_le_bytes(bytes[0..4].try_into().unwrap()), 3);
+ // null bits: 4 bytes starting at offset 4, should be all zeros
+ assert_eq!(&bytes[4..8], &[0, 0, 0, 0]);
+ // elements start at offset 8 (header = 4 + 4), each 4 bytes
(little-endian)
+ assert_eq!(i32::from_le_bytes(bytes[8..12].try_into().unwrap()), 1);
+ assert_eq!(i32::from_le_bytes(bytes[12..16].try_into().unwrap()), 2);
+ assert_eq!(i32::from_le_bytes(bytes[16..20].try_into().unwrap()), 3);
+ }
+}
diff --git a/crates/fluss/src/row/column.rs b/crates/fluss/src/row/column.rs
index c07fe97..4a3e708 100644
--- a/crates/fluss/src/row/column.rs
+++ b/crates/fluss/src/row/column.rs
@@ -19,7 +19,10 @@ use crate::error::Error::IllegalArgument;
use crate::error::Result;
use crate::row::InternalRow;
use crate::row::datum::{Date, Time, TimestampLtz, TimestampNtz};
-use arrow::array::{Array, AsArray, BinaryArray, RecordBatch, StringArray};
+use arrow::array::{
+ Array, AsArray, BinaryArray, BooleanArray, FixedSizeBinaryArray, ListArray, RecordBatch,
+ StringArray,
+};
use arrow::datatypes::{
DataType as ArrowDataType, Date32Type, Decimal128Type, Float32Type,
Float64Type, Int8Type,
Int16Type, Int32Type, Int64Type, Time32MillisecondType, Time32SecondType,
@@ -407,17 +410,379 @@ impl InternalRow for ColumnarRow {
})?
.value(self.row_id))
}
+
+ fn get_array(&self, pos: usize) -> Result<crate::row::FlussArray> {
+ use crate::record::from_arrow_type;
+ use crate::row::binary_array::FlussArrayWriter;
+
+ let column = self.column(pos)?;
+ let list_array =
+ column
+ .as_any()
+ .downcast_ref::<ListArray>()
+ .ok_or_else(|| IllegalArgument {
+ message: format!("expected List array at position {pos}"),
+ })?;
+
+ let values = list_array.value(self.row_id);
+ let element_fluss_type = from_arrow_type(values.data_type())?;
+ let mut writer = FlussArrayWriter::new(values.len(), &element_fluss_type);
+
+ write_arrow_values_to_fluss_array(&*values, &element_fluss_type, &mut writer)?;
+ writer.complete()
+ }
+}
+
+/// Downcast to a primitive Arrow array type, then loop with null checks calling a writer method.
+macro_rules! write_primitive_elements {
+ ($values:expr, $arrow_type:ty, $element_type:expr, $writer:expr, $write_method:ident) => {{
+ let arr = $values
+ .as_primitive_opt::<$arrow_type>()
+ .ok_or_else(|| IllegalArgument {
+ message: format!(
+ "Expected {} for {:?} element",
+ stringify!($arrow_type),
+ $element_type
+ ),
+ })?;
+ for i in 0..arr.len() {
+ if arr.is_null(i) {
+ $writer.set_null_at(i);
+ } else {
+ $writer.$write_method(i, arr.value(i));
+ }
+ }
+ }};
+}
+
+/// Downcast via `downcast_ref`, then loop with null checks calling a writer method.
+macro_rules! write_downcast_elements {
+ ($values:expr, $array_type:ty, $element_type:expr, $writer:expr, $write_method:ident) => {{
+ let arr = $values
+ .as_any()
+ .downcast_ref::<$array_type>()
+ .ok_or_else(|| IllegalArgument {
+ message: format!(
+ "Expected {} for {:?} element",
+ stringify!($array_type),
+ $element_type
+ ),
+ })?;
+ for i in 0..arr.len() {
+ if arr.is_null(i) {
+ $writer.set_null_at(i);
+ } else {
+ $writer.$write_method(i, arr.value(i));
+ }
+ }
+ }};
+}
+
+/// Converts all elements of an Arrow array into a `FlussArrayWriter`, downcasting
+/// the Arrow array once per call rather than per element.
+fn write_arrow_values_to_fluss_array(
+ values: &dyn Array,
+ element_type: &crate::metadata::DataType,
+ writer: &mut crate::row::binary_array::FlussArrayWriter,
+) -> Result<()> {
+ use crate::metadata::DataType;
+ use crate::record::from_arrow_type;
+ use crate::row::binary_array::FlussArrayWriter;
+
+ let len = values.len();
+
+ match element_type {
+ DataType::Boolean(_) => {
+ write_downcast_elements!(values, BooleanArray, element_type, writer, write_boolean)
+ }
+ DataType::TinyInt(_) => {
+ write_primitive_elements!(values, Int8Type, element_type, writer,
write_byte)
+ }
+ DataType::SmallInt(_) => {
+ write_primitive_elements!(values, Int16Type, element_type, writer,
write_short)
+ }
+ DataType::Int(_) => {
+ write_primitive_elements!(values, Int32Type, element_type, writer,
write_int)
+ }
+ DataType::BigInt(_) => {
+ write_primitive_elements!(values, Int64Type, element_type, writer,
write_long)
+ }
+ DataType::Float(_) => {
+ write_primitive_elements!(values, Float32Type, element_type,
writer, write_float)
+ }
+ DataType::Double(_) => {
+ write_primitive_elements!(values, Float64Type, element_type,
writer, write_double)
+ }
+ DataType::Char(_) | DataType::String(_) => {
+ write_downcast_elements!(values, StringArray, element_type,
writer, write_string)
+ }
+ DataType::Binary(_) => {
+ write_downcast_elements!(
+ values,
+ FixedSizeBinaryArray,
+ element_type,
+ writer,
+ write_binary_bytes
+ )
+ }
+ DataType::Bytes(_) => {
+ write_downcast_elements!(
+ values,
+ BinaryArray,
+ element_type,
+ writer,
+ write_binary_bytes
+ )
+ }
+ DataType::Decimal(dt) => {
+ let arr =
+ values
+ .as_primitive_opt::<Decimal128Type>()
+ .ok_or_else(|| IllegalArgument {
+ message: format!("Expected Decimal128Array for
{element_type:?} element"),
+ })?;
+ let arrow_scale = match values.data_type() {
+ ArrowDataType::Decimal128(_p, s) => *s as i64,
+ other => {
+ return Err(IllegalArgument {
+ message: format!(
+ "Expected Decimal128 data type for
{element_type:?} element, got {other:?}"
+ ),
+ });
+ }
+ };
+ let precision = dt.precision();
+ let scale = dt.scale();
+ for i in 0..len {
+ if arr.is_null(i) {
+ writer.set_null_at(i);
+ } else {
+ let d = crate::row::Decimal::from_arrow_decimal128(
+ arr.value(i),
+ arrow_scale,
+ precision,
+ scale,
+ )?;
+ writer.write_decimal(i, &d, precision);
+ }
+ }
+ }
+ DataType::Date(_) => {
+ let arr = values
+ .as_primitive_opt::<Date32Type>()
+ .ok_or_else(|| IllegalArgument {
+ message: format!("Expected Date32Array for
{element_type:?} element"),
+ })?;
+ for i in 0..len {
+ if arr.is_null(i) {
+ writer.set_null_at(i);
+ } else {
+ writer.write_date(i, Date::new(arr.value(i)));
+ }
+ }
+ }
+ DataType::Time(_) => {
+ write_time_elements(values, element_type, writer)?;
+ }
+ DataType::Timestamp(ts_type) => {
+ write_timestamp_elements(
+ values,
+ element_type,
+ writer,
+ ts_type.precision(),
+ TimestampNtz::new,
+ TimestampNtz::from_millis_nanos,
+ |w, i, ts, p| w.write_timestamp_ntz(i, &ts, p),
+ )?;
+ }
+ DataType::TimestampLTz(ts_type) => {
+ write_timestamp_elements(
+ values,
+ element_type,
+ writer,
+ ts_type.precision(),
+ TimestampLtz::new,
+ TimestampLtz::from_millis_nanos,
+ |w, i, ts, p| w.write_timestamp_ltz(i, &ts, p),
+ )?;
+ }
+ DataType::Array(_) => {
+ let list_arr =
+ values
+ .as_any()
+ .downcast_ref::<ListArray>()
+ .ok_or_else(|| IllegalArgument {
+ message: format!("Expected ListArray for
{element_type:?} element"),
+ })?;
+ let nested_element_type = from_arrow_type(&list_arr.value_type())?;
+ for i in 0..len {
+ if list_arr.is_null(i) {
+ writer.set_null_at(i);
+ } else {
+ let nested_values = list_arr.value(i);
+ let mut nested_writer =
+ FlussArrayWriter::new(nested_values.len(),
&nested_element_type);
+ write_arrow_values_to_fluss_array(
+ &*nested_values,
+ &nested_element_type,
+ &mut nested_writer,
+ )?;
+ let nested_array = nested_writer.complete()?;
+ writer.write_array(i, &nested_array);
+ }
+ }
+ }
+ _ => {
+ return Err(IllegalArgument {
+ message: format!(
+ "Unsupported element type for Arrow → FlussArray conversion: {element_type:?}"
+ ),
+ });
+ }
+ }
+ Ok(())
+}
+
+fn write_time_elements(
+ values: &dyn Array,
+ element_type: &crate::metadata::DataType,
+ writer: &mut crate::row::binary_array::FlussArrayWriter,
+) -> Result<()> {
+ macro_rules! process_time {
+ ($arrow_type:ty, $to_millis:expr) => {{
+ let arr = values
+ .as_primitive_opt::<$arrow_type>()
+ .ok_or_else(|| IllegalArgument {
+ message: format!(
+ "Expected {} for {:?} element",
+ stringify!($arrow_type),
+ element_type
+ ),
+ })?;
+ for i in 0..arr.len() {
+ if arr.is_null(i) {
+ writer.set_null_at(i);
+ } else {
+ let to_millis_fn = $to_millis;
+ writer.write_time(i,
Time::new(to_millis_fn(arr.value(i))));
+ }
+ }
+ }};
+ }
+
+ match values.data_type() {
+ ArrowDataType::Time32(TimeUnit::Second) => {
+ process_time!(Time32SecondType, |v: i32| v * 1000);
+ }
+ ArrowDataType::Time32(TimeUnit::Millisecond) => {
+ process_time!(Time32MillisecondType, |v: i32| v);
+ }
+ ArrowDataType::Time64(TimeUnit::Microsecond) => {
+ process_time!(Time64MicrosecondType, |v: i64| (v / 1000) as i32);
+ }
+ ArrowDataType::Time64(TimeUnit::Nanosecond) => {
+ process_time!(Time64NanosecondType, |v: i64| (v / 1_000_000) as
i32);
+ }
+ other => {
+ return Err(IllegalArgument {
+ message: format!(
+ "Expected Time column for {element_type:?} element, got
{other:?}"
+ ),
+ });
+ }
+ }
+ Ok(())
+}
+
+fn convert_timestamp_raw(raw: i64, unit: &TimeUnit) -> (i64, i32) {
+ match unit {
+ TimeUnit::Second => (raw * 1000, 0),
+ TimeUnit::Millisecond => (raw, 0),
+ TimeUnit::Microsecond => {
+ let millis = raw.div_euclid(1000);
+ let nanos = (raw.rem_euclid(1000) * 1000) as i32;
+ (millis, nanos)
+ }
+ TimeUnit::Nanosecond => {
+ let millis = raw.div_euclid(1_000_000);
+ let nanos = raw.rem_euclid(1_000_000) as i32;
+ (millis, nanos)
+ }
+ }
+}
+
+fn write_timestamp_elements<T>(
+ values: &dyn Array,
+ element_type: &crate::metadata::DataType,
+ writer: &mut crate::row::binary_array::FlussArrayWriter,
+ precision: u32,
+ construct_compact: impl Fn(i64) -> T,
+ construct_with_nanos: impl Fn(i64, i32) -> Result<T>,
+ write_fn: impl Fn(&mut crate::row::binary_array::FlussArrayWriter, usize,
T, u32),
+) -> Result<()> {
+ let unit = match values.data_type() {
+ ArrowDataType::Timestamp(unit, _) => unit,
+ other => {
+ return Err(IllegalArgument {
+ message: format!(
+ "Expected Timestamp column for {element_type:?} element,
got {other:?}"
+ ),
+ });
+ }
+ };
+
+ macro_rules! process_ts {
+ ($arrow_type:ty) => {{
+ let arr = values
+ .as_primitive_opt::<$arrow_type>()
+ .ok_or_else(|| IllegalArgument {
+ message: format!(
+ "Expected {} for {:?} element",
+ stringify!($arrow_type),
+ element_type
+ ),
+ })?;
+ for i in 0..arr.len() {
+ if arr.is_null(i) {
+ writer.set_null_at(i);
+ continue;
+ }
+ let (millis, nanos) = convert_timestamp_raw(arr.value(i),
unit);
+ let ts = if nanos == 0 {
+ construct_compact(millis)
+ } else {
+ construct_with_nanos(millis, nanos)?
+ };
+ write_fn(writer, i, ts, precision);
+ }
+ }};
+ }
+
+ match unit {
+ TimeUnit::Second => process_ts!(TimestampSecondType),
+ TimeUnit::Millisecond => process_ts!(TimestampMillisecondType),
+ TimeUnit::Microsecond => process_ts!(TimestampMicrosecondType),
+ TimeUnit::Nanosecond => process_ts!(TimestampNanosecondType),
+ }
+ Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use arrow::array::{
- BinaryArray, BooleanArray, Decimal128Array, Float32Array,
Float64Array, Int8Array,
- Int16Array, Int32Array, Int64Array, StringArray,
+ ArrayRef, BinaryArray, BooleanArray, Decimal128Array, Float32Array,
Float64Array,
+ Int8Array, Int16Array, Int32Array, Int32Builder, Int64Array,
ListBuilder, StringArray,
+ UInt32Builder,
};
use arrow::datatypes::{DataType, Field, Schema};
+ fn single_column_row(array: ArrayRef) -> ColumnarRow {
+ let batch =
+ RecordBatch::try_from_iter(vec![("arr", array)]).expect("record batch with one column");
+ ColumnarRow::new(Arc::new(batch))
+ }
+
#[test]
fn columnar_row_reads_values() {
let schema = Arc::new(Schema::new(vec![
@@ -533,4 +898,96 @@ mod tests {
.unwrap()
);
}
+
+ #[test]
+ fn columnar_row_get_array_int_roundtrip() {
+ let mut builder = ListBuilder::new(Int32Builder::new());
+ builder.values().append_value(1);
+ builder.values().append_value(2);
+ builder.values().append_value(3);
+ builder.append(true);
+ let array = Arc::new(builder.finish()) as ArrayRef;
+
+ let row = single_column_row(array);
+ let arr = row.get_array(0).unwrap();
+ assert_eq!(arr.size(), 3);
+ assert_eq!(arr.get_int(0).unwrap(), 1);
+ assert_eq!(arr.get_int(1).unwrap(), 2);
+ assert_eq!(arr.get_int(2).unwrap(), 3);
+ }
+
+ #[test]
+ fn columnar_row_get_array_with_nulls() {
+ let mut builder = ListBuilder::new(Int32Builder::new());
+ builder.values().append_value(1);
+ builder.values().append_null();
+ builder.values().append_value(3);
+ builder.append(true);
+ let array = Arc::new(builder.finish()) as ArrayRef;
+
+ let row = single_column_row(array);
+ let arr = row.get_array(0).unwrap();
+ assert_eq!(arr.size(), 3);
+ assert_eq!(arr.get_int(0).unwrap(), 1);
+ assert!(arr.is_null_at(1));
+ assert_eq!(arr.get_int(2).unwrap(), 3);
+ }
+
+ #[test]
+ fn columnar_row_get_array_nested_array() {
+ let mut outer =
ListBuilder::new(ListBuilder::new(Int32Builder::new()));
+
+ // first nested array: [1, 2]
+ outer.values().values().append_value(1);
+ outer.values().values().append_value(2);
+ outer.values().append(true);
+
+ // second nested array: [99]
+ outer.values().values().append_value(99);
+ outer.values().append(true);
+
+ // one row containing two nested arrays
+ outer.append(true);
+ let array = Arc::new(outer.finish()) as ArrayRef;
+
+ let row = single_column_row(array);
+ let arr = row.get_array(0).unwrap();
+ assert_eq!(arr.size(), 2);
+
+ let nested0 = arr.get_array(0).unwrap();
+ assert_eq!(nested0.size(), 2);
+ assert_eq!(nested0.get_int(0).unwrap(), 1);
+ assert_eq!(nested0.get_int(1).unwrap(), 2);
+
+ let nested1 = arr.get_array(1).unwrap();
+ assert_eq!(nested1.size(), 1);
+ assert_eq!(nested1.get_int(0).unwrap(), 99);
+ }
+
+ #[test]
+ fn columnar_row_get_array_non_list_column_returns_error() {
+ let array = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef;
+ let row = single_column_row(array);
+ let err = row.get_array(0).unwrap_err();
+ assert!(
+ err.to_string().contains("expected List array"),
+ "unexpected error: {err}"
+ );
+ }
+
+ #[test]
+ fn columnar_row_get_array_unsupported_element_type_returns_error() {
+ let mut builder = ListBuilder::new(UInt32Builder::new());
+ builder.values().append_value(7);
+ builder.append(true);
+ let array = Arc::new(builder.finish()) as ArrayRef;
+
+ let row = single_column_row(array);
+ let err = row.get_array(0).unwrap_err();
+ assert!(
+ err.to_string()
+ .contains("Cannot convert Arrow type to Fluss type"),
+ "unexpected error: {err}"
+ );
+ }
}
diff --git a/crates/fluss/src/row/compacted/compacted_key_writer.rs
b/crates/fluss/src/row/compacted/compacted_key_writer.rs
index 339e366..c694065 100644
--- a/crates/fluss/src/row/compacted/compacted_key_writer.rs
+++ b/crates/fluss/src/row/compacted/compacted_key_writer.rs
@@ -47,6 +47,15 @@ impl CompactedKeyWriter {
}
pub fn create_value_writer(field_type: &DataType) -> Result<ValueWriter> {
+ // Java's CompactedKeyEncoder allows encoding Array types (Map/Row
+ // are not yet supported by ValueWriter). The server rejects
+ // unsupported key types at table-creation time, so encoding is
+ // allowed here to match Java parity.
+ if matches!(field_type, DataType::Map(_) | DataType::Row(_)) {
+ return Err(crate::error::Error::IllegalArgument {
+ message: format!("Cannot use {field_type:?} as a key column type"),
+ });
+ }
ValueWriter::create_value_writer(field_type,
Some(&BinaryRowFormat::Compacted))
}
@@ -101,6 +110,8 @@ impl BinaryWriter for CompactedKeyWriter {
 fn write_timestamp_ntz(&mut self, value: &crate::row::datum::TimestampNtz, precision: u32);
 fn write_timestamp_ltz(&mut self, value: &crate::row::datum::TimestampLtz, precision: u32);
+
+ fn write_array(&mut self, value: &[u8]);
}
}
diff --git a/crates/fluss/src/row/compacted/compacted_row.rs
b/crates/fluss/src/row/compacted/compacted_row.rs
index 918ebdf..267ae13 100644
--- a/crates/fluss/src/row/compacted/compacted_row.rs
+++ b/crates/fluss/src/row/compacted/compacted_row.rs
@@ -68,9 +68,16 @@ impl<'a> CompactedRow<'a> {
self.size_in_bytes
}
- fn decoded_row(&self) -> &GenericRow<'_> {
- self.decoded_row
- .get_or_init(|| self.deserializer.deserialize(&self.reader))
+ fn decoded_row(&self) -> Result<&GenericRow<'_>> {
+ if let Some(row) = self.decoded_row.get() {
+ return Ok(row);
+ }
+
+ // `OnceLock::get_or_try_init` is still unstable on our toolchain.
+ // Keep the same semantics by performing the fallible decode first,
+ // then atomically installing it via `get_or_init`.
+ let decoded = self.deserializer.deserialize(&self.reader)?;
+ Ok(self.decoded_row.get_or_init(|| decoded))
}
pub fn as_bytes(&self) -> &[u8] {
@@ -97,67 +104,71 @@ impl<'a> InternalRow for CompactedRow<'a> {
}
fn get_boolean(&self, pos: usize) -> Result<bool> {
- self.decoded_row().get_boolean(pos)
+ self.decoded_row()?.get_boolean(pos)
}
fn get_byte(&self, pos: usize) -> Result<i8> {
- self.decoded_row().get_byte(pos)
+ self.decoded_row()?.get_byte(pos)
}
fn get_short(&self, pos: usize) -> Result<i16> {
- self.decoded_row().get_short(pos)
+ self.decoded_row()?.get_short(pos)
}
fn get_int(&self, pos: usize) -> Result<i32> {
- self.decoded_row().get_int(pos)
+ self.decoded_row()?.get_int(pos)
}
fn get_long(&self, pos: usize) -> Result<i64> {
- self.decoded_row().get_long(pos)
+ self.decoded_row()?.get_long(pos)
}
fn get_float(&self, pos: usize) -> Result<f32> {
- self.decoded_row().get_float(pos)
+ self.decoded_row()?.get_float(pos)
}
fn get_double(&self, pos: usize) -> Result<f64> {
- self.decoded_row().get_double(pos)
+ self.decoded_row()?.get_double(pos)
}
fn get_char(&self, pos: usize, length: usize) -> Result<&str> {
- self.decoded_row().get_char(pos, length)
+ self.decoded_row()?.get_char(pos, length)
}
fn get_string(&self, pos: usize) -> Result<&str> {
- self.decoded_row().get_string(pos)
+ self.decoded_row()?.get_string(pos)
}
fn get_decimal(&self, pos: usize, precision: usize, scale: usize) ->
Result<Decimal> {
- self.decoded_row().get_decimal(pos, precision, scale)
+ self.decoded_row()?.get_decimal(pos, precision, scale)
}
fn get_date(&self, pos: usize) -> Result<Date> {
- self.decoded_row().get_date(pos)
+ self.decoded_row()?.get_date(pos)
}
fn get_time(&self, pos: usize) -> Result<Time> {
- self.decoded_row().get_time(pos)
+ self.decoded_row()?.get_time(pos)
}
fn get_timestamp_ntz(&self, pos: usize, precision: u32) ->
Result<TimestampNtz> {
- self.decoded_row().get_timestamp_ntz(pos, precision)
+ self.decoded_row()?.get_timestamp_ntz(pos, precision)
}
fn get_timestamp_ltz(&self, pos: usize, precision: u32) ->
Result<TimestampLtz> {
- self.decoded_row().get_timestamp_ltz(pos, precision)
+ self.decoded_row()?.get_timestamp_ltz(pos, precision)
}
fn get_binary(&self, pos: usize, length: usize) -> Result<&[u8]> {
- self.decoded_row().get_binary(pos, length)
+ self.decoded_row()?.get_binary(pos, length)
}
fn get_bytes(&self, pos: usize) -> Result<&[u8]> {
- self.decoded_row().get_bytes(pos)
+ self.decoded_row()?.get_bytes(pos)
+ }
+
+ fn get_array(&self, pos: usize) -> Result<crate::row::FlussArray> {
+ self.decoded_row()?.get_array(pos)
}
fn as_encoded_bytes(&self, write_format: WriteFormat) -> Option<&[u8]> {
@@ -327,4 +338,157 @@ mod tests {
999999999999999999i64
);
}
+
+ #[test]
+ fn test_compacted_row_int_array() {
+ use crate::metadata::DataTypes;
+ use crate::row::binary_array::FlussArrayWriter;
+
+ let row_type =
+ RowType::with_data_types(vec![DataTypes::int(), DataTypes::array(DataTypes::int())]);
+
+ let mut writer = CompactedRowWriter::new(row_type.fields().len());
+ writer.write_int(42);
+
+ let elem_type = DataTypes::int();
+ let mut arr_writer = FlussArrayWriter::new(3, &elem_type);
+ arr_writer.write_int(0, 1);
+ arr_writer.write_int(1, 2);
+ arr_writer.write_int(2, 3);
+ let arr = arr_writer.complete().unwrap();
+ writer.write_array(arr.as_bytes());
+
+ let bytes = writer.to_bytes();
+ let row = CompactedRow::from_bytes(&row_type, bytes.as_ref());
+
+ assert_eq!(row.get_int(0).unwrap(), 42);
+ let read_arr = row.get_array(1).unwrap();
+ assert_eq!(read_arr.size(), 3);
+ assert_eq!(read_arr.get_int(0).unwrap(), 1);
+ assert_eq!(read_arr.get_int(1).unwrap(), 2);
+ assert_eq!(read_arr.get_int(2).unwrap(), 3);
+ }
+
+ #[test]
+ fn test_compacted_row_string_array() {
+ use crate::metadata::DataTypes;
+ use crate::row::binary_array::FlussArrayWriter;
+
+ let row_type =
RowType::with_data_types(vec![DataTypes::array(DataTypes::string())]);
+
+ let mut writer = CompactedRowWriter::new(row_type.fields().len());
+
+ let elem_type = DataTypes::string();
+ let mut arr_writer = FlussArrayWriter::new(3, &elem_type);
+ arr_writer.write_string(0, "hello");
+ arr_writer.write_string(1, "fluss");
+ arr_writer.write_string(2, "rust");
+ let arr = arr_writer.complete().unwrap();
+ writer.write_array(arr.as_bytes());
+
+ let bytes = writer.to_bytes();
+ let row = CompactedRow::from_bytes(&row_type, bytes.as_ref());
+
+ let read_arr = row.get_array(0).unwrap();
+ assert_eq!(read_arr.size(), 3);
+ assert_eq!(read_arr.get_string(0).unwrap(), "hello");
+ assert_eq!(read_arr.get_string(1).unwrap(), "fluss");
+ assert_eq!(read_arr.get_string(2).unwrap(), "rust");
+ }
+
+ #[test]
+ fn test_compacted_row_array_with_nulls() {
+ use crate::metadata::DataTypes;
+ use crate::row::binary_array::FlussArrayWriter;
+
+ let row_type =
RowType::with_data_types(vec![DataTypes::array(DataTypes::int())]);
+
+ let mut writer = CompactedRowWriter::new(row_type.fields().len());
+
+ let elem_type = DataTypes::int();
+ let mut arr_writer = FlussArrayWriter::new(3, &elem_type);
+ arr_writer.write_int(0, 10);
+ arr_writer.set_null_at(1);
+ arr_writer.write_int(2, 30);
+ let arr = arr_writer.complete().unwrap();
+ writer.write_array(arr.as_bytes());
+
+ let bytes = writer.to_bytes();
+ let row = CompactedRow::from_bytes(&row_type, bytes.as_ref());
+
+ let read_arr = row.get_array(0).unwrap();
+ assert_eq!(read_arr.size(), 3);
+ assert!(!read_arr.is_null_at(0));
+ assert_eq!(read_arr.get_int(0).unwrap(), 10);
+ assert!(read_arr.is_null_at(1));
+ assert!(!read_arr.is_null_at(2));
+ assert_eq!(read_arr.get_int(2).unwrap(), 30);
+ }
+
+ #[test]
+ fn test_compacted_row_empty_array() {
+ use crate::metadata::DataTypes;
+ use crate::row::binary_array::FlussArrayWriter;
+
+ let row_type =
RowType::with_data_types(vec![DataTypes::array(DataTypes::int())]);
+
+ let mut writer = CompactedRowWriter::new(row_type.fields().len());
+
+ let elem_type = DataTypes::int();
+ let arr_writer = FlussArrayWriter::new(0, &elem_type);
+ let arr = arr_writer.complete().unwrap();
+ writer.write_array(arr.as_bytes());
+
+ let bytes = writer.to_bytes();
+ let row = CompactedRow::from_bytes(&row_type, bytes.as_ref());
+
+ let read_arr = row.get_array(0).unwrap();
+ assert_eq!(read_arr.size(), 0);
+ }
+
+ #[test]
+ fn test_compacted_row_nested_array() {
+ use crate::metadata::DataTypes;
+ use crate::row::binary_array::FlussArrayWriter;
+
+ let row_type =
+
RowType::with_data_types(vec![DataTypes::array(DataTypes::array(DataTypes::int()))]);
+
+ let mut writer = CompactedRowWriter::new(row_type.fields().len());
+
+ // Build inner arrays
+ let inner_type = DataTypes::int();
+ let mut inner1 = FlussArrayWriter::new(2, &inner_type);
+ inner1.write_int(0, 1);
+ inner1.write_int(1, 2);
+ let inner1_arr = inner1.complete().unwrap();
+
+ let mut inner2 = FlussArrayWriter::new(1, &inner_type);
+ inner2.write_int(0, 99);
+ let inner2_arr = inner2.complete().unwrap();
+
+ // Build outer array
+ let outer_type = DataTypes::array(DataTypes::int());
+ let mut outer_writer = FlussArrayWriter::new(2, &outer_type);
+ outer_writer.write_array(0, &inner1_arr);
+ outer_writer.write_array(1, &inner2_arr);
+ let outer_arr = outer_writer.complete().unwrap();
+
+ writer.write_array(outer_arr.as_bytes());
+
+ let bytes = writer.to_bytes();
+ let row = CompactedRow::from_bytes(&row_type, bytes.as_ref());
+
+ let read_outer = row.get_array(0).unwrap();
+ assert_eq!(read_outer.size(), 2);
+
+ let nested1 = read_outer.get_array(0).unwrap();
+ assert_eq!(nested1.size(), 2);
+ assert_eq!(nested1.get_int(0).unwrap(), 1);
+ assert_eq!(nested1.get_int(1).unwrap(), 2);
+
+ let nested2 = read_outer.get_array(1).unwrap();
+ assert_eq!(nested2.size(), 1);
+ assert_eq!(nested2.get_int(0).unwrap(), 99);
+ }
}
diff --git a/crates/fluss/src/row/compacted/compacted_row_reader.rs
b/crates/fluss/src/row/compacted/compacted_row_reader.rs
index 00e53aa..4ae442f 100644
--- a/crates/fluss/src/row/compacted/compacted_row_reader.rs
+++ b/crates/fluss/src/row/compacted/compacted_row_reader.rs
@@ -18,6 +18,7 @@
use crate::metadata::RowType;
use crate::row::compacted::compacted_row::calculate_bit_set_width_in_bytes;
use crate::{
+ error::{Error::IllegalArgument, Result},
metadata::DataType,
row::{Datum, Decimal, GenericRow,
compacted::compacted_row_writer::CompactedRowWriter},
util::varint::{read_unsigned_varint_at, read_unsigned_varint_u64_at},
@@ -49,7 +50,7 @@ impl<'a> CompactedRowDeserializer<'a> {
self.row_type.as_ref()
}
- pub fn deserialize(&self, reader: &CompactedRowReader<'a>) -> GenericRow<'a> {
+ pub fn deserialize(&self, reader: &CompactedRowReader<'a>) -> Result<GenericRow<'a>> {
let mut row = GenericRow::new(self.row_type.fields().len());
let mut cursor = reader.initial_position();
for (col_pos, data_field) in self.row_type.fields().iter().enumerate()
{
@@ -60,41 +61,41 @@ impl<'a> CompactedRowDeserializer<'a> {
}
let (datum, next_cursor) = match dtype {
DataType::Boolean(_) => {
- let (val, next) = reader.read_boolean(cursor);
+ let (val, next) = reader.read_boolean(cursor)?;
(Datum::Bool(val), next)
}
DataType::TinyInt(_) => {
- let (val, next) = reader.read_byte(cursor);
+ let (val, next) = reader.read_byte(cursor)?;
(Datum::Int8(val as i8), next)
}
DataType::SmallInt(_) => {
- let (val, next) = reader.read_short(cursor);
+ let (val, next) = reader.read_short(cursor)?;
(Datum::Int16(val), next)
}
DataType::Int(_) => {
- let (val, next) = reader.read_int(cursor);
+ let (val, next) = reader.read_int(cursor)?;
(Datum::Int32(val), next)
}
DataType::BigInt(_) => {
- let (val, next) = reader.read_long(cursor);
+ let (val, next) = reader.read_long(cursor)?;
(Datum::Int64(val), next)
}
DataType::Float(_) => {
- let (val, next) = reader.read_float(cursor);
+ let (val, next) = reader.read_float(cursor)?;
(Datum::Float32(val.into()), next)
}
DataType::Double(_) => {
- let (val, next) = reader.read_double(cursor);
+ let (val, next) = reader.read_double(cursor)?;
(Datum::Float64(val.into()), next)
}
// TODO: use read_char(length) in the future, but need to keep
compatibility
DataType::Char(_) | DataType::String(_) => {
- let (val, next) = reader.read_string(cursor);
+ let (val, next) = reader.read_string(cursor)?;
(Datum::String(val.into()), next)
}
// TODO: use read_binary(length) in the future, but need to
keep compatibility
DataType::Bytes(_) | DataType::Binary(_) => {
- let (val, next) = reader.read_bytes(cursor);
+ let (val, next) = reader.read_bytes(cursor)?;
(Datum::Blob(val.into()), next)
}
DataType::Decimal(decimal_type) => {
@@ -102,42 +103,57 @@ impl<'a> CompactedRowDeserializer<'a> {
let scale = decimal_type.scale();
if Decimal::is_compact_precision(precision) {
// Compact: stored as i64
- let (val, next) = reader.read_long(cursor);
- let decimal = Decimal::from_unscaled_long(val,
precision, scale)
- .expect("Failed to create decimal from unscaled
long");
+ let (val, next) = reader.read_long(cursor)?;
+ let decimal =
+ Decimal::from_unscaled_long(val, precision,
scale).map_err(|e| {
+ IllegalArgument {
+ message: format!(
+ "Failed to create decimal from
unscaled long: {e}"
+ ),
+ }
+ })?;
(Datum::Decimal(decimal), next)
} else {
// Non-compact: stored as minimal big-endian bytes
- let (bytes, next) = reader.read_bytes(cursor);
+ let (bytes, next) = reader.read_bytes(cursor)?;
let decimal = Decimal::from_unscaled_bytes(bytes,
precision, scale)
- .expect("Failed to create decimal from unscaled
bytes");
+ .map_err(|e| IllegalArgument {
+ message: format!(
+ "Failed to create decimal from unscaled
bytes: {e}"
+ ),
+ })?;
(Datum::Decimal(decimal), next)
}
}
DataType::Date(_) => {
- let (val, next) = reader.read_int(cursor);
+ let (val, next) = reader.read_int(cursor)?;
(Datum::Date(crate::row::datum::Date::new(val)), next)
}
DataType::Time(_) => {
- let (val, next) = reader.read_int(cursor);
+ let (val, next) = reader.read_int(cursor)?;
(Datum::Time(crate::row::datum::Time::new(val)), next)
}
DataType::Timestamp(timestamp_type) => {
let precision = timestamp_type.precision();
if crate::row::datum::TimestampNtz::is_compact(precision) {
// Compact: only milliseconds
- let (millis, next) = reader.read_long(cursor);
+ let (millis, next) = reader.read_long(cursor)?;
(
Datum::TimestampNtz(crate::row::datum::TimestampNtz::new(millis)),
next,
)
} else {
// Non-compact: milliseconds + nanos
- let (millis, mid) = reader.read_long(cursor);
- let (nanos, next) = reader.read_int(mid);
- let timestamp =
-
crate::row::datum::TimestampNtz::from_millis_nanos(millis, nanos)
- .expect("Invalid nano_of_millisecond value in
compacted row");
+ let (millis, mid) = reader.read_long(cursor)?;
+ let (nanos, next) = reader.read_int(mid)?;
+ let timestamp =
crate::row::datum::TimestampNtz::from_millis_nanos(
+ millis, nanos,
+ )
+ .map_err(|e| IllegalArgument {
+ message: format!(
+ "Invalid nano_of_millisecond value in
compacted row timestamp: {e}"
+ ),
+ })?;
(Datum::TimestampNtz(timestamp), next)
}
}
@@ -145,29 +161,44 @@ impl<'a> CompactedRowDeserializer<'a> {
let precision = timestamp_ltz_type.precision();
if crate::row::datum::TimestampLtz::is_compact(precision) {
// Compact: only epoch milliseconds
- let (epoch_millis, next) = reader.read_long(cursor);
+ let (epoch_millis, next) = reader.read_long(cursor)?;
(
Datum::TimestampLtz(crate::row::datum::TimestampLtz::new(epoch_millis)),
next,
)
} else {
// Non-compact: epoch milliseconds + nanos
- let (epoch_millis, mid) = reader.read_long(cursor);
- let (nanos, next) = reader.read_int(mid);
- let timestamp_ltz =
-
crate::row::datum::TimestampLtz::from_millis_nanos(epoch_millis, nanos)
- .expect("Invalid nano_of_millisecond value in
compacted row");
+ let (epoch_millis, mid) = reader.read_long(cursor)?;
+ let (nanos, next) = reader.read_int(mid)?;
+ let timestamp_ltz =
crate::row::datum::TimestampLtz::from_millis_nanos(
+ epoch_millis,
+ nanos,
+ )
+ .map_err(|e| IllegalArgument {
+ message: format!(
+ "Invalid nano_of_millisecond value in
compacted row timestamp_ltz: {e}"
+ ),
+ })?;
(Datum::TimestampLtz(timestamp_ltz), next)
}
}
+ DataType::Array(_) => {
+ let (bytes, next) = reader.read_bytes(cursor)?;
+ let array =
crate::row::binary_array::FlussArray::from_bytes(bytes)?;
+ (Datum::Array(array), next)
+ }
_ => {
- panic!("Unsupported DataType in CompactedRowDeserializer:
{dtype:?}");
+ return Err(IllegalArgument {
+ message: format!(
+ "Unsupported DataType in CompactedRowDeserializer:
{dtype:?}"
+ ),
+ });
}
};
cursor = next_cursor;
row.set_field(col_pos, datum);
}
- row
+ Ok(row)
}
}
@@ -202,6 +233,21 @@ impl<'a> CompactedRowReader<'a> {
self.offset + self.header_size_in_bytes
}
+ fn checked_pos(&self, pos: usize, width: usize, context: &str) ->
Result<usize> {
+ let next = pos.checked_add(width).ok_or_else(|| IllegalArgument {
+ message: format!("Overflow while reading {context}: pos={pos},
width={width}"),
+ })?;
+ if next > self.limit {
+ return Err(IllegalArgument {
+ message: format!(
+ "Out-of-bounds while reading {context}: pos={pos},
width={width}, limit={}",
+ self.limit
+ ),
+ });
+ }
+ Ok(next)
+ }
+
pub fn is_null_at(&self, col_pos: usize) -> bool {
let byte_index = col_pos >> 3;
let bit = col_pos & 7;
@@ -210,79 +256,73 @@ impl<'a> CompactedRowReader<'a> {
(self.segment[idx] & (1u8 << bit)) != 0
}
- pub fn read_boolean(&self, pos: usize) -> (bool, usize) {
- let (val, next) = self.read_byte(pos);
- (val != 0, next)
+ pub fn read_boolean(&self, pos: usize) -> Result<(bool, usize)> {
+ let (val, next) = self.read_byte(pos)?;
+ Ok((val != 0, next))
}
- pub fn read_byte(&self, pos: usize) -> (u8, usize) {
- debug_assert!(pos < self.limit);
- (self.segment[pos], pos + 1)
+ pub fn read_byte(&self, pos: usize) -> Result<(u8, usize)> {
+ let next = self.checked_pos(pos, 1, "byte")?;
+ Ok((self.segment[pos], next))
}
- pub fn read_short(&self, pos: usize) -> (i16, usize) {
- let next_pos = pos + 2;
- debug_assert!(next_pos <= self.limit);
- let bytes_slice = &self.segment[pos..pos + 2];
- let val = i16::from_ne_bytes(
- bytes_slice
- .try_into()
- .expect("Slice must be exactly 2 bytes long"),
- );
- (val, next_pos)
+ pub fn read_short(&self, pos: usize) -> Result<(i16, usize)> {
+ let next_pos = self.checked_pos(pos, 2, "short")?;
+ let mut arr = [0u8; 2];
+ arr.copy_from_slice(&self.segment[pos..next_pos]);
+ Ok((i16::from_ne_bytes(arr), next_pos))
}
- pub fn read_int(&self, pos: usize) -> (i32, usize) {
+ pub fn read_int(&self, pos: usize) -> Result<(i32, usize)> {
match read_unsigned_varint_at(self.segment, pos,
CompactedRowWriter::MAX_INT_SIZE) {
- Ok((value, next_pos)) => (value as i32, next_pos),
- Err(_) => panic!("Invalid VarInt32 input stream."),
+ Ok((value, next_pos)) => Ok((value as i32, next_pos)),
+ Err(e) => Err(IllegalArgument {
+ message: format!("Invalid VarInt32 input stream at pos {pos}:
{e}"),
+ }),
}
}
- pub fn read_long(&self, pos: usize) -> (i64, usize) {
+ pub fn read_long(&self, pos: usize) -> Result<(i64, usize)> {
match read_unsigned_varint_u64_at(self.segment, pos,
CompactedRowWriter::MAX_LONG_SIZE) {
- Ok((value, next_pos)) => (value as i64, next_pos),
- Err(_) => panic!("Invalid VarInt64 input stream."),
+ Ok((value, next_pos)) => Ok((value as i64, next_pos)),
+ Err(e) => Err(IllegalArgument {
+ message: format!("Invalid VarInt64 input stream at pos {pos}:
{e}"),
+ }),
}
}
- pub fn read_float(&self, pos: usize) -> (f32, usize) {
- let next_pos = pos + 4;
- debug_assert!(next_pos <= self.limit);
- let val = f32::from_ne_bytes(
- self.segment[pos..pos + 4]
- .try_into()
- .expect("Slice must be exactly 4 bytes long"),
- );
- (val, next_pos)
+ pub fn read_float(&self, pos: usize) -> Result<(f32, usize)> {
+ let next_pos = self.checked_pos(pos, 4, "float")?;
+ let mut arr = [0u8; 4];
+ arr.copy_from_slice(&self.segment[pos..next_pos]);
+ Ok((f32::from_ne_bytes(arr), next_pos))
}
- pub fn read_double(&self, pos: usize) -> (f64, usize) {
- let next_pos = pos + 8;
- debug_assert!(next_pos <= self.limit);
- let val = f64::from_ne_bytes(
- self.segment[pos..pos + 8]
- .try_into()
- .expect("Slice must be exactly 8 bytes long"),
- );
- (val, next_pos)
+ pub fn read_double(&self, pos: usize) -> Result<(f64, usize)> {
+ let next_pos = self.checked_pos(pos, 8, "double")?;
+ let mut arr = [0u8; 8];
+ arr.copy_from_slice(&self.segment[pos..next_pos]);
+ Ok((f64::from_ne_bytes(arr), next_pos))
}
- pub fn read_binary(&self, pos: usize) -> (&'a [u8], usize) {
+ pub fn read_binary(&self, pos: usize) -> Result<(&'a [u8], usize)> {
self.read_bytes(pos)
}
- pub fn read_bytes(&self, pos: usize) -> (&'a [u8], usize) {
- let (len, data_pos) = self.read_int(pos);
- let len = len as usize;
- let next_pos = data_pos + len;
- debug_assert!(next_pos <= self.limit);
- (&self.segment[data_pos..next_pos], next_pos)
+ pub fn read_bytes(&self, pos: usize) -> Result<(&'a [u8], usize)> {
+ let (len, data_pos) = self.read_int(pos)?;
+ let len = usize::try_from(len).map_err(|_| IllegalArgument {
+ message: format!("Negative length while reading bytes at pos
{pos}: {len}"),
+ })?;
+ let next_pos = self.checked_pos(data_pos, len, "bytes payload")?;
+ Ok((&self.segment[data_pos..next_pos], next_pos))
}
- pub fn read_string(&self, pos: usize) -> (&'a str, usize) {
- let (bytes, next_pos) = self.read_bytes(pos);
- let s = from_utf8(bytes).expect("Invalid UTF-8 when reading string");
- (s, next_pos)
+ pub fn read_string(&self, pos: usize) -> Result<(&'a str, usize)> {
+ let (bytes, next_pos) = self.read_bytes(pos)?;
+ let s = from_utf8(bytes).map_err(|e| IllegalArgument {
+ message: format!("Invalid UTF-8 when reading string at pos {pos}:
{e}"),
+ })?;
+ Ok((s, next_pos))
}
}
diff --git a/crates/fluss/src/row/compacted/compacted_row_writer.rs
b/crates/fluss/src/row/compacted/compacted_row_writer.rs
index ac0100e..3627174 100644
--- a/crates/fluss/src/row/compacted/compacted_row_writer.rs
+++ b/crates/fluss/src/row/compacted/compacted_row_writer.rs
@@ -165,6 +165,10 @@ impl BinaryWriter for CompactedRowWriter {
self.write_bytes(&bytes[..length.min(bytes.len())])
}
+ fn write_array(&mut self, value: &[u8]) {
+ self.write_bytes(value)
+ }
+
fn complete(&mut self) {
// do nothing
}
diff --git a/crates/fluss/src/row/datum.rs b/crates/fluss/src/row/datum.rs
index 9b2e80a..2f1d183 100644
--- a/crates/fluss/src/row/datum.rs
+++ b/crates/fluss/src/row/datum.rs
@@ -18,12 +18,14 @@
use crate::error::Error::RowConvertError;
use crate::error::Result;
use crate::row::Decimal;
+use crate::row::binary_array::FlussArray;
use arrow::array::{
ArrayBuilder, BinaryBuilder, BooleanBuilder, Date32Builder,
Decimal128Builder,
FixedSizeBinaryBuilder, Float32Builder, Float64Builder, Int8Builder,
Int16Builder,
- Int32Builder, Int64Builder, StringBuilder, Time32MillisecondBuilder,
Time32SecondBuilder,
- Time64MicrosecondBuilder, Time64NanosecondBuilder,
TimestampMicrosecondBuilder,
- TimestampMillisecondBuilder, TimestampNanosecondBuilder,
TimestampSecondBuilder,
+ Int32Builder, Int64Builder, ListBuilder, StringBuilder,
Time32MillisecondBuilder,
+ Time32SecondBuilder, Time64MicrosecondBuilder, Time64NanosecondBuilder,
+ TimestampMicrosecondBuilder, TimestampMillisecondBuilder,
TimestampNanosecondBuilder,
+ TimestampSecondBuilder,
};
use arrow::datatypes as arrow_schema;
use arrow::error::ArrowError;
@@ -68,6 +70,8 @@ pub enum Datum<'a> {
TimestampNtz(TimestampNtz),
#[display("{0}")]
TimestampLtz(TimestampLtz),
+ #[display("{0}")]
+ Array(FlussArray),
}
impl Datum<'_> {
@@ -123,6 +127,13 @@ impl Datum<'_> {
_ => panic!("not a timestamp ltz: {self:?}"),
}
}
+
+ pub fn as_array(&self) -> &FlussArray {
+ match self {
+ Self::Array(a) => a,
+ _ => panic!("not an array: {self:?}"),
+ }
+ }
}
// ----------- implement from
@@ -388,6 +399,13 @@ impl<'a> From<TimestampLtz> for Datum<'a> {
}
}
+impl<'a> From<FlussArray> for Datum<'a> {
+ #[inline]
+ fn from(arr: FlussArray) -> Datum<'a> {
+ Datum::Array(arr)
+ }
+}
+
pub trait ToArrow {
fn append_to(
&self,
@@ -494,6 +512,89 @@ impl AppendResult for std::result::Result<(), ArrowError> {
}
}
+fn append_fluss_array_to_list_builder(
+ arr: &FlussArray,
+ builder: &mut dyn ArrayBuilder,
+ data_type: &arrow_schema::DataType,
+) -> Result<()> {
+ use crate::record::from_arrow_type;
+
+ let list_builder = builder
+ .as_any_mut()
+ .downcast_mut::<ListBuilder<Box<dyn ArrayBuilder>>>()
+ .ok_or_else(|| RowConvertError {
+ message: "Builder type mismatch for Array: expected
ListBuilder".to_string(),
+ })?;
+
+ let element_arrow_type = match data_type {
+ arrow_schema::DataType::List(field) => field.data_type().clone(),
+ _ => {
+ return Err(RowConvertError {
+ message: format!("Expected List Arrow type for Array datum,
got: {data_type:?}"),
+ });
+ }
+ };
+
+ let element_fluss_type = from_arrow_type(&element_arrow_type)?;
+ let values_builder = list_builder.values();
+
+ for i in 0..arr.size() {
+ if arr.is_null_at(i) {
+ // TODO: Datum::Null triggers a chain of downcast attempts in
append_to.
+ // For sparse arrays with many nulls, call append_null directly on
the
+ // typed inner builder to avoid the overhead.
+ let null_datum = Datum::Null;
+ null_datum.append_to(values_builder, &element_arrow_type)?;
+ } else {
+ let datum = read_datum_from_fluss_array(arr, i,
&element_fluss_type)?;
+ datum.append_to(values_builder, &element_arrow_type)?;
+ }
+ }
+ list_builder.append(true);
+ Ok(())
+}
+
+fn read_datum_from_fluss_array<'a>(
+ arr: &FlussArray,
+ pos: usize,
+ element_type: &crate::metadata::DataType,
+) -> Result<Datum<'a>> {
+ use crate::metadata::DataType;
+
+ Ok(match element_type {
+ DataType::Boolean(_) => Datum::Bool(arr.get_boolean(pos)?),
+ DataType::TinyInt(_) => Datum::Int8(arr.get_byte(pos)?),
+ DataType::SmallInt(_) => Datum::Int16(arr.get_short(pos)?),
+ DataType::Int(_) => Datum::Int32(arr.get_int(pos)?),
+ DataType::BigInt(_) => Datum::Int64(arr.get_long(pos)?),
+ DataType::Float(_) => Datum::Float32(arr.get_float(pos)?.into()),
+ DataType::Double(_) => Datum::Float64(arr.get_double(pos)?.into()),
+ DataType::Char(_) | DataType::String(_) => {
+ Datum::String(Cow::Owned(arr.get_string(pos)?.to_string()))
+ }
+ DataType::Binary(_) | DataType::Bytes(_) => {
+ Datum::Blob(Cow::Owned(arr.get_binary(pos)?.to_vec()))
+ }
+ DataType::Decimal(dt) => {
+ Datum::Decimal(arr.get_decimal(pos, dt.precision(), dt.scale())?)
+ }
+ DataType::Date(_) => Datum::Date(arr.get_date(pos)?),
+ DataType::Time(_) => Datum::Time(arr.get_time(pos)?),
+ DataType::Timestamp(t) =>
Datum::TimestampNtz(arr.get_timestamp_ntz(pos, t.precision())?),
+ DataType::TimestampLTz(t) => {
+ Datum::TimestampLtz(arr.get_timestamp_ltz(pos, t.precision())?)
+ }
+ DataType::Array(_) => Datum::Array(arr.get_array(pos)?),
+ _ => {
+ return Err(RowConvertError {
+ message: format!(
+ "Unsupported element type for FlussArray → Arrow
conversion: {element_type:?}"
+ ),
+ });
+ }
+ })
+}
+
impl Datum<'_> {
pub fn append_to(
&self,
@@ -540,6 +641,18 @@ impl Datum<'_> {
append_null_to_arrow!(TimestampMillisecondBuilder);
append_null_to_arrow!(TimestampMicrosecondBuilder);
append_null_to_arrow!(TimestampNanosecondBuilder);
+ if let arrow_schema::DataType::List(_) = data_type {
+ let b = builder
+ .as_any_mut()
+ .downcast_mut::<ListBuilder<Box<dyn ArrayBuilder>>>()
+ .ok_or_else(|| RowConvertError {
+ message:
+ "Expected ListBuilder<Box<dyn ArrayBuilder>>
for List Arrow type"
+ .to_string(),
+ })?;
+ b.append_null();
+ return Ok(());
+ }
}
Datum::Bool(v) => append_value_to_arrow!(BooleanBuilder, *v),
Datum::Int8(v) => append_value_to_arrow!(Int8Builder, *v),
@@ -742,6 +855,9 @@ impl Datum<'_> {
message: "Builder type mismatch for
TimestampLtz".to_string(),
});
}
+ Datum::Array(arr) => {
+ return append_fluss_array_to_list_builder(arr, builder,
data_type);
+ }
}
Err(RowConvertError {
diff --git a/crates/fluss/src/row/encode/compacted_key_encoder.rs
b/crates/fluss/src/row/encode/compacted_key_encoder.rs
index d201450..877b3ec 100644
--- a/crates/fluss/src/row/encode/compacted_key_encoder.rs
+++ b/crates/fluss/src/row/encode/compacted_key_encoder.rs
@@ -64,8 +64,12 @@ impl CompactedKeyEncoder {
for pos in &encode_field_pos {
let data_type = row_type.fields().get(*pos).unwrap().data_type();
- field_getters.push(FieldGetter::create(data_type, *pos));
-
field_encoders.push(CompactedKeyWriter::create_value_writer(data_type)?);
+ // Validate key type support first, so unsupported types return a
+ // typed error instead of panicking in FieldGetter::create.
+ let field_encoder =
CompactedKeyWriter::create_value_writer(data_type)?;
+ let field_getter = FieldGetter::create(data_type, *pos);
+ field_getters.push(field_getter);
+ field_encoders.push(field_encoder);
}
Ok(CompactedKeyEncoder {
@@ -82,18 +86,19 @@ impl KeyEncoder for CompactedKeyEncoder {
self.compacted_encoder.reset();
// iterate all the fields of the row, and encode each field
- for (pos, field_getter) in self.field_getters.iter().enumerate() {
+ for (pos, (field_getter, field_encoder)) in self
+ .field_getters
+ .iter()
+ .zip(self.field_encoders.iter())
+ .enumerate()
+ {
match &field_getter.get_field(row)? {
Datum::Null => {
return Err(IllegalArgument {
message: format!("Cannot encode key with null value at
position: {pos:?}"),
});
}
- value => self.field_encoders.get(pos).unwrap().write_value(
- &mut self.compacted_encoder,
- pos,
- value,
- )?,
+ value => field_encoder.write_value(&mut
self.compacted_encoder, pos, value)?,
}
}
@@ -105,8 +110,55 @@ impl KeyEncoder for CompactedKeyEncoder {
mod tests {
use super::*;
use crate::metadata::DataTypes;
+ use crate::row::binary_array::FlussArrayWriter;
use crate::row::{Datum, GenericRow};
+ fn build_int_array(values: &[i32]) -> crate::row::FlussArray {
+ let mut w = FlussArrayWriter::new(values.len(), &DataTypes::int());
+ for (i, v) in values.iter().enumerate() {
+ w.write_int(i, *v);
+ }
+ w.complete().unwrap()
+ }
+
+ fn build_nullable_int_array(values: &[Option<i32>]) ->
crate::row::FlussArray {
+ let mut w = FlussArrayWriter::new(values.len(), &DataTypes::int());
+ for (i, v) in values.iter().enumerate() {
+ match v {
+ Some(value) => w.write_int(i, *value),
+ None => w.set_null_at(i),
+ }
+ }
+ w.complete().unwrap()
+ }
+
+ fn build_float_array(values: &[f32]) -> crate::row::FlussArray {
+ let mut w = FlussArrayWriter::new(values.len(),
&DataTypes::float().as_non_nullable());
+ for (i, v) in values.iter().enumerate() {
+ w.write_float(i, *v);
+ }
+ w.complete().unwrap()
+ }
+
+ fn build_nested_string_array() -> crate::row::FlussArray {
+ let mut inner_1 = FlussArrayWriter::new(3, &DataTypes::string());
+ inner_1.write_string(0, "a");
+ inner_1.set_null_at(1);
+ inner_1.write_string(2, "c");
+ let inner_1 = inner_1.complete().unwrap();
+
+ let mut inner_2 = FlussArrayWriter::new(2, &DataTypes::string());
+ inner_2.write_string(0, "hello");
+ inner_2.write_string(1, "world");
+ let inner_2 = inner_2.complete().unwrap();
+
+ let mut outer = FlussArrayWriter::new(3,
&DataTypes::array(DataTypes::string()));
+ outer.write_array(0, &inner_1);
+ outer.set_null_at(1);
+ outer.write_array(2, &inner_2);
+ outer.complete().unwrap()
+ }
+
pub fn for_test_row_type(row_type: &RowType) -> CompactedKeyEncoder {
CompactedKeyEncoder::new(row_type,
(0..row_type.fields().len()).collect())
.expect("CompactedKeyEncoder initialization failed")
@@ -237,6 +289,51 @@ mod tests {
);
}
+ #[test]
+ fn test_array_type_allowed_as_key() {
+ // Java's CompactedKeyEncoder allows Array as a key column type
+ // (the server rejects unsupported key types at table-creation time).
+ let row_type =
+ RowType::with_data_types(vec![DataTypes::int(),
DataTypes::array(DataTypes::int())]);
+ let mut encoder = CompactedKeyEncoder::new(&row_type, vec![0,
1]).unwrap();
+
+ let row_a = GenericRow::from_data(vec![
+ Datum::Int32(42),
+ Datum::Array(build_int_array(&[10, 20])),
+ ]);
+ let row_b = GenericRow::from_data(vec![
+ Datum::Int32(42),
+ Datum::Array(build_int_array(&[10, 30])),
+ ]);
+
+ let encoded_a = encoder.encode_key(&row_a).unwrap();
+ let encoded_b = encoder.encode_key(&row_b).unwrap();
+
+ assert!(!encoded_a.is_empty());
+ assert_ne!(
+ encoded_a.iter().as_slice(),
+ encoded_b.iter().as_slice(),
+ "Array key payload should affect compacted key encoding"
+ );
+ }
+
+ #[test]
+ fn test_map_type_rejected_as_key() {
+ let row_type = RowType::with_data_types(vec![
+ DataTypes::int(),
+ DataTypes::map(DataTypes::int(), DataTypes::string()),
+ ]);
+ match CompactedKeyEncoder::new(&row_type, vec![0, 1]) {
+ Ok(_) => panic!("Expected error when using Map as key type"),
+ Err(err) => {
+ assert!(
+ err.to_string().contains("Cannot use"),
+ "Expected 'Cannot use' error, got: {err}"
+ );
+ }
+ }
+ }
+
#[test]
fn test_all_data_types_java_compatible() {
// Test encoding compatibility with Java using reference from:
@@ -263,9 +360,11 @@ mod tests {
DataType::Timestamp(TimestampType::with_nullable(false,
5).unwrap()), // TIMESTAMP(5)
DataType::TimestampLTz(TimestampLTzType::with_nullable(false,
1).unwrap()), // TIMESTAMP_LTZ(1)
DataType::TimestampLTz(TimestampLTzType::with_nullable(false,
5).unwrap()), // TIMESTAMP_LTZ(5)
-
// TODO: Add support for ARRAY type
-
// TODO: Add support for MAP type
-
// TODO: Add support for ROW type
+ DataTypes::array(DataTypes::int()), // ARRAY<INT>
+ DataTypes::array(DataTypes::float().as_non_nullable()), //
ARRAY<FLOAT NOT NULL>
+ DataTypes::array(DataTypes::array(DataTypes::string())), //
ARRAY<ARRAY<STRING>>
+ // TODO: Add support for MAP
type
+ // TODO: Add support for ROW
type
]);
// Exact values from Java's IndexedRowTest.genRecordForAllTypes()
@@ -296,6 +395,26 @@ mod tests {
Datum::TimestampNtz(crate::row::datum::TimestampNtz::new(1698235273182)), //
TIMESTAMP(5)
Datum::TimestampLtz(crate::row::datum::TimestampLtz::new(1698235273182)), //
TIMESTAMP_LTZ(1)
Datum::TimestampLtz(crate::row::datum::TimestampLtz::new(1698235273182)), //
TIMESTAMP_LTZ(5)
+ Datum::Array(build_nullable_int_array(&[
+ Some(1),
+ Some(2),
+ Some(3),
+ Some(4),
+ Some(5),
+ Some(-11),
+ None,
+ Some(444),
+ Some(102234),
+ ])), // ARRAY<INT>: GenericArray.of(1, 2, 3, 4, 5, -11, null, 444,
102234)
+ Datum::Array(build_float_array(&[
+ 0.1_f32,
+ 1.1_f32,
+ -0.5_f32,
+ 6.6_f32,
+ f32::MAX,
+ f32::from_bits(1),
+ ])), // ARRAY<FLOAT NOT NULL>: GenericArray.of(0.1f, 1.1f, -0.5f,
6.6f, MAX, MIN)
+ Datum::Array(build_nested_string_array()), // ARRAY<ARRAY<STRING>>
]);
// Expected bytes from Java's encoded_key.hex reference file
@@ -339,6 +458,25 @@ mod tests {
0xDE, 0x9F, 0xD7, 0xB5, 0xB6, 0x31,
// TIMESTAMP_LTZ(5): 1698235273182
0xDE, 0x9F, 0xD7, 0xB5, 0xB6, 0x31, 0x00,
+ // ARRAY<INT>: GenericArray.of(1, 2, 3, 4, 5, -11, null, 444,
102234)
+ 0x30, 0x09, 0x00, 0x00, 0x00, 0x40, 0x00, 0x00, 0x00, 0x01, 0x00,
0x00,
+ 0x00, 0x02, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x04, 0x00,
0x00,
+ 0x00, 0x05, 0x00, 0x00, 0x00, 0xF5, 0xFF, 0xFF, 0xFF, 0x00, 0x00,
0x00,
+ 0x00, 0xBC, 0x01, 0x00, 0x00, 0x5A, 0x8F, 0x01, 0x00, 0x00, 0x00,
0x00,
+ 0x00,
+ // ARRAY<FLOAT NOT NULL>: GenericArray.of(0.1f, 1.1f, -0.5f, 6.6f,
MAX, MIN)
+ 0x20, 0x06, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xCD, 0xCC,
0xCC,
+ 0x3D, 0xCD, 0xCC, 0x8C, 0x3F, 0x00, 0x00, 0x00, 0xBF, 0x33, 0x33,
0xD3,
+ 0x40, 0xFF, 0xFF, 0x7F, 0x7F, 0x01, 0x00, 0x00, 0x00,
+ // ARRAY<ARRAY<STRING>>
+ 0x58, 0x03, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x20, 0x00,
0x00,
+ 0x00, 0x20, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00,
+ 0x00, 0x18, 0x00, 0x00, 0x00, 0x40, 0x00, 0x00, 0x00, 0x03, 0x00,
0x00,
+ 0x00, 0x02, 0x00, 0x00, 0x00, 0x61, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00,
+ 0x81, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x63, 0x00,
0x00,
+ 0x00, 0x00, 0x00, 0x00, 0x81, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00,
+ 0x00, 0x68, 0x65, 0x6C, 0x6C, 0x6F, 0x00, 0x00, 0x85, 0x77, 0x6F,
0x72,
+ 0x6C, 0x64, 0x00, 0x00, 0x85,
];
let mut encoder = for_test_row_type(&row_type);
diff --git a/crates/fluss/src/row/field_getter.rs
b/crates/fluss/src/row/field_getter.rs
index d6b9fc9..69e0860 100644
--- a/crates/fluss/src/row/field_getter.rs
+++ b/crates/fluss/src/row/field_getter.rs
@@ -82,6 +82,8 @@ impl FieldGetter {
pos,
precision: t.precision(),
},
+ // TODO: add Map and Row variants when get_map/get_row are
available in InternalRow.
+ DataType::Array(_) => InnerFieldGetter::Array { pos },
_ => unimplemented!("DataType {:?} is currently unimplemented",
data_type),
};
@@ -149,6 +151,9 @@ pub enum InnerFieldGetter {
pos: usize,
precision: u32,
},
+ Array {
+ pos: usize,
+ },
}
impl InnerFieldGetter {
@@ -177,7 +182,9 @@ impl InnerFieldGetter {
}
InnerFieldGetter::TimestampLtz { pos, precision } => {
Datum::TimestampLtz(row.get_timestamp_ltz(*pos, *precision)?)
- } //TODO Array, Map, Row
+ }
+ // TODO: add Map and Row field getter support once their binary
forms are implemented.
+ InnerFieldGetter::Array { pos } =>
Datum::Array(row.get_array(*pos)?),
})
}
@@ -198,7 +205,51 @@ impl InnerFieldGetter {
| Self::Date { pos }
| Self::Time { pos }
| Self::Timestamp { pos, .. }
- | Self::TimestampLtz { pos, .. } => *pos,
+ | Self::TimestampLtz { pos, .. }
+ | Self::Array { pos } => *pos,
}
}
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::metadata::DataTypes;
+ use crate::row::GenericRow;
+ use crate::row::binary_array::FlussArrayWriter;
+
+ #[test]
+ fn test_field_getter_array() {
+ let elem_type = DataTypes::int();
+ let mut arr_writer = FlussArrayWriter::new(2, &elem_type);
+ arr_writer.write_int(0, 10);
+ arr_writer.write_int(1, 20);
+ let arr = arr_writer.complete().unwrap();
+
+ let mut row = GenericRow::new(2);
+ row.set_field(0, Datum::Int32(42));
+ row.set_field(1, Datum::Array(arr.clone()));
+
+ let getter = FieldGetter::create(&DataTypes::array(DataTypes::int()),
1);
+ let datum = getter.get_field(&row).unwrap();
+
+ match datum {
+ Datum::Array(a) => {
+ assert_eq!(a.size(), 2);
+ assert_eq!(a.get_int(0).unwrap(), 10);
+ assert_eq!(a.get_int(1).unwrap(), 20);
+ }
+ _ => panic!("Expected Array datum"),
+ }
+ }
+
+ #[test]
+ fn test_field_getter_nullable_array() {
+ let row = GenericRow::from_data(vec![Datum::Null]);
+
+ let data_type = DataTypes::array(DataTypes::int());
+ let getter = FieldGetter::create(&data_type, 0);
+ let datum = getter.get_field(&row).unwrap();
+ assert!(datum.is_null());
+ }
+}
diff --git a/crates/fluss/src/row/mod.rs b/crates/fluss/src/row/mod.rs
index ef99ba2..359a9a5 100644
--- a/crates/fluss/src/row/mod.rs
+++ b/crates/fluss/src/row/mod.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+pub mod binary_array;
mod column;
pub(crate) mod datum;
@@ -28,6 +29,7 @@ pub mod field_getter;
mod row_decoder;
use crate::client::WriteFormat;
+pub use binary_array::FlussArray;
use bytes::Bytes;
pub use column::*;
pub use compacted::CompactedRow;
@@ -119,6 +121,9 @@ pub trait InternalRow: Send + Sync {
/// Returns the binary value at the given position
fn get_bytes(&self, pos: usize) -> Result<&[u8]>;
+ /// Returns the array value at the given position
+ fn get_array(&self, pos: usize) -> Result<FlussArray>;
+
/// Returns encoded bytes if already encoded
fn as_encoded_bytes(&self, _write_format: WriteFormat) -> Option<&[u8]> {
None
@@ -274,6 +279,15 @@ impl<'a> InternalRow for GenericRow<'a> {
}),
}
}
+
+ fn get_array(&self, pos: usize) -> Result<FlussArray> {
+ match self.get_value(pos)? {
+ Datum::Array(a) => Ok(a.clone()),
+ other => Err(IllegalArgument {
+ message: format!("type mismatch at position {pos}: expected
Array, got {other:?}"),
+ }),
+ }
+ }
}
impl<'a> GenericRow<'a> {
diff --git a/website/docs/user-guide/rust/api-reference.md
b/website/docs/user-guide/rust/api-reference.md
index a4befa5..15a62c1 100644
--- a/website/docs/user-guide/rust/api-reference.md
+++ b/website/docs/user-guide/rust/api-reference.md
@@ -407,6 +407,19 @@ Implements the `InternalRow` trait (see below).
| `fn get_bytes(&self, idx: usize) -> Result<&[u8]>`
| Get bytes value |
| `fn get_binary(&self, idx: usize, length: usize) -> Result<&[u8]>`
| Get fixed-length binary value |
| `fn get_char(&self, idx: usize, length: usize) -> Result<&str>`
| Get fixed-length char value |
+| `fn get_array(&self, idx: usize) -> Result<FlussArray>`
| Get array value |
+
+## `FlussArray`
+
+`FlussArray` is the Rust row representation for `ARRAY` values. You usually
obtain it from `InternalRow::get_array()`.
+
+| Method | Description |
+|--------|-------------|
+| `fn size(&self) -> usize` | Number of elements in the array |
+| `fn is_null_at(&self, pos: usize) -> bool` | Check whether an element is
null |
+| `fn as_bytes(&self) -> &[u8]` | Get encoded bytes of the array |
+
+Element getters mirror `InternalRow` typed getters and return `Result<T>`. For
example, use `get_int()`, `get_long()`, and `get_double()` for primitive
elements, and `get_string()`, `get_binary()`, `get_decimal()`,
`get_timestamp_ntz()`, `get_timestamp_ltz()`, and `get_array()` for
variable-length or nested elements.
## `ChangeType`
diff --git a/website/docs/user-guide/rust/data-types.md
b/website/docs/user-guide/rust/data-types.md
index 143fe34..63b7fa6 100644
--- a/website/docs/user-guide/rust/data-types.md
+++ b/website/docs/user-guide/rust/data-types.md
@@ -21,6 +21,7 @@ sidebar_position: 3
| `TIMESTAMP_LTZ` | `TimestampLtz` | `get_timestamp_ltz(idx, precision)` |
`set_field(idx, TimestampLtz)` |
| `BYTES` | `&[u8]` | `get_bytes()` |
`set_field(idx, &[u8])` |
| `BINARY(n)` | `&[u8]` | `get_binary(idx, length)` |
`set_field(idx, &[u8])` |
+| `ARRAY<T>` | `FlussArray` | `get_array()` |
`set_field(idx, FlussArray)` |
## Constructing Special Types
@@ -59,6 +60,29 @@ let data: Vec<Datum> = vec![1i32.into(), "hello".into(),
Datum::Null];
let row = GenericRow::from_data(data);
```
+## Arrays
+
+Use `DataTypes::array(element_type)` in schema definitions. At runtime, read
arrays with `row.get_array(idx)?`.
+
+To construct array values for writes, build a `FlussArray` and wrap it with
`Datum::Array`:
+
+```rust
+use fluss::metadata::DataTypes;
+use fluss::row::binary_array::FlussArrayWriter;
+use fluss::row::{Datum, GenericRow};
+
+let mut writer = FlussArrayWriter::new(3, &DataTypes::int());
+writer.write_int(0, 10);
+writer.write_int(1, 20);
+writer.set_null_at(2);
+let arr = writer.complete()?;
+
+let mut row = GenericRow::new(1);
+row.set_field(0, Datum::Array(arr));
+```
+
+`ARRAY` is supported for row values and nested row fields. For key encoding,
Rust follows Java parity: `ARRAY` can be encoded by the compacted key encoder,
while table-level key constraints are validated by the server (which may reject
unsupported key types).
+
## Reading Row Data
```rust