This is an automated email from the ASF dual-hosted git repository.

leekei pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new f0e17a4  feat: Add array data type support (#433)
f0e17a4 is described below

commit f0e17a4ab4d5c6f65d980043762c57d66eb57f90
Author: Kaiqi Dong <[email protected]>
AuthorDate: Sat Mar 28 21:20:24 2026 +0100

    feat: Add array data type support (#433)
    
    * add array data type support
    
    * add docs
    
    * address comments
    
    * avoid intermediate record batch in get-array
    
    * Address comments
    
    * address review comments
    
    * address comments and improve doc
    
    * address comments
    
    * add todo
    
    * remove todo from reference.md to code
---
 bindings/cpp/src/types.rs                          |   6 +
 crates/fluss/src/record/arrow.rs                   |  65 ++
 crates/fluss/src/row/binary/binary_writer.rs       |  10 +-
 crates/fluss/src/row/binary_array.rs               | 848 +++++++++++++++++++++
 crates/fluss/src/row/column.rs                     | 463 ++++++++++-
 .../src/row/compacted/compacted_key_writer.rs      |  11 +
 crates/fluss/src/row/compacted/compacted_row.rs    | 202 ++++-
 .../src/row/compacted/compacted_row_reader.rs      | 204 +++--
 .../src/row/compacted/compacted_row_writer.rs      |   4 +
 crates/fluss/src/row/datum.rs                      | 122 ++-
 .../fluss/src/row/encode/compacted_key_encoder.rs  | 160 +++-
 crates/fluss/src/row/field_getter.rs               |  55 +-
 crates/fluss/src/row/mod.rs                        |  14 +
 website/docs/user-guide/rust/api-reference.md      |  13 +
 website/docs/user-guide/rust/data-types.md         |  24 +
 15 files changed, 2078 insertions(+), 123 deletions(-)

diff --git a/bindings/cpp/src/types.rs b/bindings/cpp/src/types.rs
index 3c0c6f7..f7aabe9 100644
--- a/bindings/cpp/src/types.rs
+++ b/bindings/cpp/src/types.rs
@@ -351,6 +351,9 @@ pub fn resolve_row_types(
             Datum::Time(t) => Datum::Time(*t),
             Datum::TimestampNtz(ts) => Datum::TimestampNtz(*ts),
             Datum::TimestampLtz(ts) => Datum::TimestampLtz(*ts),
+            // TODO: C++ bindings need proper CXX wrapper types for FlussArray
+            // before C++ users can construct or inspect array values through 
FFI.
+            Datum::Array(a) => Datum::Array(a.clone()),
         };
         out.set_field(idx, resolved);
     }
@@ -408,6 +411,9 @@ pub fn compacted_row_to_owned(
             fcore::metadata::DataType::Binary(dt) => {
                 Datum::Blob(Cow::Owned(row.get_binary(i, 
dt.length())?.to_vec()))
             }
+            // TODO: C++ bindings need proper CXX wrapper types for FlussArray
+            // before C++ users can construct or inspect array values through 
FFI.
+            fcore::metadata::DataType::Array(_) => 
Datum::Array(row.get_array(i)?),
             other => return Err(anyhow!("Unsupported data type for column {i}: 
{other:?}")),
         };
 
diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs
index d8ba6d9..7dd745b 100644
--- a/crates/fluss/src/record/arrow.rs
+++ b/crates/fluss/src/record/arrow.rs
@@ -1091,6 +1091,71 @@ pub fn to_arrow_type(fluss_type: &DataType) -> 
Result<ArrowDataType> {
     })
 }
 
+/// Converts an Arrow data type back to a Fluss `DataType`.
+/// Used for reading array elements from Arrow ListArray back into Fluss types.
+pub(crate) fn from_arrow_type(arrow_type: &ArrowDataType) -> Result<DataType> {
+    use crate::metadata::DataTypes;
+
+    Ok(match arrow_type {
+        ArrowDataType::Boolean => DataTypes::boolean(),
+        ArrowDataType::Int8 => DataTypes::tinyint(),
+        ArrowDataType::Int16 => DataTypes::smallint(),
+        ArrowDataType::Int32 => DataTypes::int(),
+        ArrowDataType::Int64 => DataTypes::bigint(),
+        ArrowDataType::Float32 => DataTypes::float(),
+        ArrowDataType::Float64 => DataTypes::double(),
+        ArrowDataType::Utf8 => DataTypes::string(),
+        ArrowDataType::Binary => DataTypes::bytes(),
+        ArrowDataType::Date32 => DataTypes::date(),
+        ArrowDataType::FixedSizeBinary(len) => {
+            if *len < 0 {
+                return Err(Error::IllegalArgument {
+                    message: format!("FixedSizeBinary length must be >= 0, got 
{len}"),
+                });
+            }
+            DataTypes::binary(*len as usize)
+        }
+        ArrowDataType::Decimal128(p, s) => {
+            if *s < 0 {
+                return Err(Error::IllegalArgument {
+                    message: format!("Decimal scale must be >= 0, got {s}"),
+                });
+            }
+            DataTypes::decimal(*p as u32, *s as u32)
+        }
+        ArrowDataType::Time32(arrow_schema::TimeUnit::Second) => 
DataTypes::time_with_precision(0),
+        ArrowDataType::Time32(arrow_schema::TimeUnit::Millisecond) => {
+            DataTypes::time_with_precision(3)
+        }
+        ArrowDataType::Time64(arrow_schema::TimeUnit::Microsecond) => {
+            DataTypes::time_with_precision(6)
+        }
+        ArrowDataType::Time64(arrow_schema::TimeUnit::Nanosecond) => {
+            DataTypes::time_with_precision(9)
+        }
+        ArrowDataType::Timestamp(unit, tz) => {
+            let precision = match unit {
+                arrow_schema::TimeUnit::Second => 0,
+                arrow_schema::TimeUnit::Millisecond => 3,
+                arrow_schema::TimeUnit::Microsecond => 6,
+                arrow_schema::TimeUnit::Nanosecond => 9,
+            };
+
+            if tz.is_some() {
+                DataTypes::timestamp_ltz_with_precision(precision)
+            } else {
+                DataTypes::timestamp_with_precision(precision)
+            }
+        }
+        ArrowDataType::List(field) => 
DataTypes::array(from_arrow_type(field.data_type())?),
+        other => {
+            return Err(Error::IllegalArgument {
+                message: format!("Cannot convert Arrow type to Fluss type: 
{other:?}"),
+            });
+        }
+    })
+}
+
 #[derive(Clone)]
 pub struct ReadContext {
     target_schema: SchemaRef,
diff --git a/crates/fluss/src/row/binary/binary_writer.rs 
b/crates/fluss/src/row/binary/binary_writer.rs
index af2765c..f51a6e8 100644
--- a/crates/fluss/src/row/binary/binary_writer.rs
+++ b/crates/fluss/src/row/binary/binary_writer.rs
@@ -67,8 +67,7 @@ pub trait BinaryWriter {
 
     fn write_timestamp_ltz(&mut self, value: &crate::row::datum::TimestampLtz, 
precision: u32);
 
-    // TODO InternalArray, ArraySerializer
-    // fn write_array(&mut self, pos: i32, value: i64);
+    fn write_array(&mut self, value: &[u8]);
 
     // TODO Row serializer
     // fn write_row(&mut self, pos: i32, value: &InternalRow);
@@ -136,7 +135,8 @@ pub enum InnerValueWriter {
     Time(u32),         // precision (not used in wire format, but kept for 
consistency)
     TimestampNtz(u32), // precision
     TimestampLtz(u32), // precision
-                       // TODO Array, Row
+    Array,
+    // TODO Row
 }
 
 /// Accessor for writing the fields/elements of a binary writer during 
runtime, the
@@ -175,6 +175,7 @@ impl InnerValueWriter {
                 // Validation is done at TimestampLTzType construction time
                 Ok(InnerValueWriter::TimestampLtz(t.precision()))
             }
+            DataType::Array(_) => Ok(InnerValueWriter::Array),
             _ => unimplemented!(
                 "ValueWriter for DataType {:?} is currently not implemented",
                 data_type
@@ -237,6 +238,9 @@ impl InnerValueWriter {
             (InnerValueWriter::TimestampLtz(p), Datum::TimestampLtz(ts)) => {
                 writer.write_timestamp_ltz(ts, *p);
             }
+            (InnerValueWriter::Array, Datum::Array(arr)) => {
+                writer.write_array(arr.as_bytes());
+            }
             _ => {
                 return Err(IllegalArgument {
                     message: format!("{self:?} used to write value {value:?}"),
diff --git a/crates/fluss/src/row/binary_array.rs 
b/crates/fluss/src/row/binary_array.rs
new file mode 100644
index 0000000..9008bc5
--- /dev/null
+++ b/crates/fluss/src/row/binary_array.rs
@@ -0,0 +1,848 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Binary array format matching Java's `BinaryArray.java` layout.
+//!
+//! Binary layout:
+//! ```text
+//! [size(4B)] + [null bits (4-byte word aligned)] + [fixed-length part] + 
[variable-length part]
+//! ```
+//!
+//! Java reference: `BinaryArray.java`, `BinaryArrayWriter.java`
+
+use crate::error::Error::IllegalArgument;
+use crate::error::Result;
+use crate::metadata::DataType;
+use crate::row::Decimal;
+use crate::row::datum::{Date, Time, TimestampLtz, TimestampNtz};
+use bytes::Bytes;
+use serde::Serialize;
+use std::fmt;
+use std::hash::{Hash, Hasher};
+
+const MAX_FIX_PART_DATA_SIZE: usize = 7;
+const HIGHEST_FIRST_BIT: u64 = 0x80_u64 << 56;
+const HIGHEST_SECOND_TO_EIGHTH_BIT: u64 = 0x7F_u64 << 56;
+
+/// Calculates the header size in bytes: 4 (for element count) + null bits 
(4-byte word aligned).
+/// Matches Java's `BinaryArray.calculateHeaderInBytes(numFields)`.
+pub fn calculate_header_in_bytes(num_elements: usize) -> usize {
+    4 + num_elements.div_ceil(32) * 4
+}
+
+/// Calculates the fixed-length part size per element for a given data type.
+/// Matches Java's `BinaryArray.calculateFixLengthPartSize(DataType)`.
+pub fn calculate_fix_length_part_size(element_type: &DataType) -> usize {
+    match element_type {
+        DataType::Boolean(_) | DataType::TinyInt(_) => 1,
+        DataType::SmallInt(_) => 2,
+        DataType::Int(_) | DataType::Float(_) | DataType::Date(_) | 
DataType::Time(_) => 4,
+        DataType::BigInt(_)
+        | DataType::Double(_)
+        | DataType::Char(_)
+        | DataType::String(_)
+        | DataType::Binary(_)
+        | DataType::Bytes(_)
+        | DataType::Decimal(_)
+        | DataType::Timestamp(_)
+        | DataType::TimestampLTz(_)
+        | DataType::Array(_)
+        | DataType::Map(_)
+        | DataType::Row(_) => 8,
+    }
+}
+
+/// Rounds a byte count up to the nearest 8-byte word boundary.
+/// Matches Java's `roundNumberOfBytesToNearestWord`.
+fn round_to_nearest_word(num_bytes: usize) -> usize {
+    (num_bytes + 7) & !7
+}
+
+/// A Fluss binary array, wire-compatible with Java's `BinaryArray`.
+///
+/// Stores elements in a flat byte buffer with a header (element count + null 
bitmap)
+/// followed by fixed-length slots and an optional variable-length section.
+///
+/// Uses `Bytes` internally so cloning is O(1) reference-counted.
+// TODO: FlussArray currently exposes only fallible getters. Infallible
+// fast-path variants may be added later as non-breaking extensions.
+#[derive(Clone)]
+pub struct FlussArray {
+    data: Bytes,
+    size: usize,
+    element_offset: usize,
+}
+
+impl fmt::Debug for FlussArray {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("FlussArray")
+            .field("size", &self.size)
+            .field("data_len", &self.data.len())
+            .finish()
+    }
+}
+
+impl fmt::Display for FlussArray {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "FlussArray[size={}]", self.size)
+    }
+}
+
+impl PartialEq for FlussArray {
+    fn eq(&self, other: &Self) -> bool {
+        self.data == other.data
+    }
+}
+
+impl Eq for FlussArray {}
+
+impl PartialOrd for FlussArray {
+    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+impl Ord for FlussArray {
+    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
+        self.data.cmp(&other.data)
+    }
+}
+
+impl Hash for FlussArray {
+    fn hash<H: Hasher>(&self, state: &mut H) {
+        self.data.hash(state);
+    }
+}
+
+impl Serialize for FlussArray {
+    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, 
S::Error>
+    where
+        S: serde::Serializer,
+    {
+        serializer.serialize_bytes(&self.data)
+    }
+}
+
+impl FlussArray {
+    /// Validates the raw bytes and computes derived fields (size, 
element_offset).
+    fn validate(data: &[u8]) -> Result<(usize, usize)> {
+        if data.len() < 4 {
+            return Err(IllegalArgument {
+                message: format!(
+                    "FlussArray data too short: need at least 4 bytes, got {}",
+                    data.len()
+                ),
+            });
+        }
+        let raw_size = i32::from_le_bytes(data[0..4].try_into().unwrap());
+        if raw_size < 0 {
+            return Err(IllegalArgument {
+                message: format!("FlussArray size must be non-negative, got 
{raw_size}"),
+            });
+        }
+        let size = raw_size as usize;
+        let element_offset = calculate_header_in_bytes(size);
+        if element_offset > data.len() {
+            return Err(IllegalArgument {
+                message: format!(
+                    "FlussArray header exceeds payload: header={}, payload={}",
+                    element_offset,
+                    data.len()
+                ),
+            });
+        }
+        Ok((size, element_offset))
+    }
+
+    /// Creates a FlussArray from a byte slice (copies data).
+    pub fn from_bytes(data: &[u8]) -> Result<Self> {
+        let (size, element_offset) = Self::validate(data)?;
+        Ok(FlussArray {
+            data: Bytes::copy_from_slice(data),
+            size,
+            element_offset,
+        })
+    }
+
+    /// Creates a FlussArray from an owned `Vec<u8>` without copying.
+    pub fn from_vec(data: Vec<u8>) -> Result<Self> {
+        let (size, element_offset) = Self::validate(&data)?;
+        Ok(FlussArray {
+            data: Bytes::from(data),
+            size,
+            element_offset,
+        })
+    }
+
+    /// Creates a FlussArray from owned bytes without copying.
+    fn from_owned_bytes(data: Bytes) -> Result<Self> {
+        let (size, element_offset) = Self::validate(&data)?;
+        Ok(FlussArray {
+            data,
+            size,
+            element_offset,
+        })
+    }
+
+    /// Returns the number of elements.
+    pub fn size(&self) -> usize {
+        self.size
+    }
+
+    /// Returns the raw bytes of this array (the complete binary 
representation).
+    pub fn as_bytes(&self) -> &[u8] {
+        &self.data
+    }
+
+    /// Returns true if the element at position `pos` is null.
+    pub fn is_null_at(&self, pos: usize) -> bool {
+        let byte_index = pos >> 3;
+        let bit = pos & 7;
+        (self.data[4 + byte_index] & (1u8 << bit)) != 0
+    }
+
+    fn checked_slice(&self, start: usize, len: usize, context: &str) -> 
Result<&[u8]> {
+        let end = start.checked_add(len).ok_or_else(|| IllegalArgument {
+            message: format!("Overflow while reading {context}: start={start}, 
len={len}"),
+        })?;
+        if end > self.data.len() {
+            return Err(IllegalArgument {
+                message: format!(
+                    "Out-of-bounds while reading {context}: start={start}, 
len={len}, payload={}",
+                    self.data.len()
+                ),
+            });
+        }
+        Ok(&self.data[start..end])
+    }
+
+    fn checked_element_offset(
+        &self,
+        pos: usize,
+        element_size: usize,
+        context: &str,
+    ) -> Result<usize> {
+        if pos >= self.size {
+            return Err(IllegalArgument {
+                message: format!(
+                    "Array element index out of bounds while reading 
{context}: pos={pos}, size={}",
+                    self.size
+                ),
+            });
+        }
+        let rel = pos.checked_mul(element_size).ok_or_else(|| IllegalArgument {
+            message: format!(
+                "Overflow while calculating array element offset for 
{context}: pos={pos}, element_size={element_size}"
+            ),
+        })?;
+        self.element_offset
+            .checked_add(rel)
+            .ok_or_else(|| IllegalArgument {
+                message: format!(
+                    "Overflow while adding base offset for {context}: base={}, 
rel={rel}",
+                    self.element_offset
+                ),
+            })
+    }
+
+    fn read_fixed_bytes(&self, pos: usize, len: usize, context: &str) -> 
Result<&[u8]> {
+        let offset = self.checked_element_offset(pos, len, context)?;
+        self.checked_slice(offset, len, context)
+    }
+
+    fn read_i16(&self, pos: usize, context: &str) -> Result<i16> {
+        let bytes = self.read_fixed_bytes(pos, 2, context)?;
+        Ok(i16::from_le_bytes([bytes[0], bytes[1]]))
+    }
+
+    fn read_i32(&self, pos: usize, context: &str) -> Result<i32> {
+        let bytes = self.read_fixed_bytes(pos, 4, context)?;
+        Ok(i32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]))
+    }
+
+    fn read_i64(&self, pos: usize, context: &str) -> Result<i64> {
+        let bytes = self.read_fixed_bytes(pos, 8, context)?;
+        let mut buf = [0_u8; 8];
+        buf.copy_from_slice(bytes);
+        Ok(i64::from_le_bytes(buf))
+    }
+
+    fn read_i64_at_offset(&self, offset: usize, context: &str) -> Result<i64> {
+        let bytes = self.checked_slice(offset, 8, context)?;
+        let mut buf = [0_u8; 8];
+        buf.copy_from_slice(bytes);
+        Ok(i64::from_le_bytes(buf))
+    }
+
+    fn read_var_len_span(&self, pos: usize) -> Result<(usize, usize)> {
+        let field_offset = self.checked_element_offset(pos, 8, 
"variable-length array element")?;
+        let packed = self.read_i64(pos, "variable-length array element")? as 
u64;
+        let mark = packed & HIGHEST_FIRST_BIT;
+
+        if mark == 0 {
+            let offset = (packed >> 32) as usize;
+            let len = (packed & 0xFFFF_FFFF) as usize;
+            let _ = self.checked_slice(offset, len, "variable-length array 
element")?;
+            Ok((offset, len))
+        } else {
+            let len = ((packed & HIGHEST_SECOND_TO_EIGHTH_BIT) >> 56) as usize;
+            if len > MAX_FIX_PART_DATA_SIZE {
+                return Err(IllegalArgument {
+                    message: format!(
+                        "Inline array element length must be <= 
{MAX_FIX_PART_DATA_SIZE}, got {len}"
+                    ),
+                });
+            }
+            // Java stores inline bytes in the 8-byte slot itself.
+            // On little-endian, bytes start at field_offset; on big-endian 
they start at +1.
+            let start = if cfg!(target_endian = "little") {
+                field_offset
+            } else {
+                field_offset + 1
+            };
+            let _ = self.checked_slice(start, len, "inline array element")?;
+            Ok((start, len))
+        }
+    }
+
+    fn read_var_len_bytes(&self, pos: usize) -> Result<&[u8]> {
+        let (start, len) = self.read_var_len_span(pos)?;
+        Ok(&self.data[start..start + len])
+    }
+
+    pub fn get_boolean(&self, pos: usize) -> Result<bool> {
+        let bytes = self.read_fixed_bytes(pos, 1, "boolean array element")?;
+        Ok(bytes[0] != 0)
+    }
+
+    pub fn get_byte(&self, pos: usize) -> Result<i8> {
+        let bytes = self.read_fixed_bytes(pos, 1, "byte array element")?;
+        Ok(bytes[0] as i8)
+    }
+
+    pub fn get_short(&self, pos: usize) -> Result<i16> {
+        self.read_i16(pos, "short array element")
+    }
+
+    pub fn get_int(&self, pos: usize) -> Result<i32> {
+        self.read_i32(pos, "int array element")
+    }
+
+    pub fn get_long(&self, pos: usize) -> Result<i64> {
+        self.read_i64(pos, "long array element")
+    }
+
+    pub fn get_float(&self, pos: usize) -> Result<f32> {
+        let bits = self.read_i32(pos, "float array element")? as u32;
+        Ok(f32::from_bits(bits))
+    }
+
+    pub fn get_double(&self, pos: usize) -> Result<f64> {
+        let bits = self.read_i64(pos, "double array element")? as u64;
+        Ok(f64::from_bits(bits))
+    }
+
+    /// Reads the offset_and_size packed long for variable-length elements.
+    fn get_offset_and_size(&self, pos: usize) -> Result<(usize, usize)> {
+        let packed = self.get_long(pos)? as u64;
+        let offset = (packed >> 32) as usize;
+        let size = (packed & 0xFFFF_FFFF) as usize;
+        Ok((offset, size))
+    }
+
+    pub fn get_string(&self, pos: usize) -> Result<&str> {
+        let bytes = self.read_var_len_bytes(pos)?;
+        std::str::from_utf8(bytes).map_err(|e| IllegalArgument {
+            message: format!("Invalid UTF-8 in array element at position 
{pos}: {e}"),
+        })
+    }
+
+    pub fn get_binary(&self, pos: usize) -> Result<&[u8]> {
+        self.read_var_len_bytes(pos)
+    }
+
+    pub fn get_decimal(&self, pos: usize, precision: u32, scale: u32) -> 
Result<Decimal> {
+        if Decimal::is_compact_precision(precision) {
+            let unscaled = self.get_long(pos)?;
+            Decimal::from_unscaled_long(unscaled, precision, scale)
+        } else {
+            let (offset, size) = self.get_offset_and_size(pos)?;
+            let bytes = self.checked_slice(offset, size, "decimal bytes")?;
+            Decimal::from_unscaled_bytes(bytes, precision, scale)
+        }
+    }
+
+    pub fn get_date(&self, pos: usize) -> Result<Date> {
+        Ok(Date::new(self.get_int(pos)?))
+    }
+
+    pub fn get_time(&self, pos: usize) -> Result<Time> {
+        Ok(Time::new(self.get_int(pos)?))
+    }
+
+    pub fn get_timestamp_ntz(&self, pos: usize, precision: u32) -> 
Result<TimestampNtz> {
+        if TimestampNtz::is_compact(precision) {
+            Ok(TimestampNtz::new(self.get_long(pos)?))
+        } else {
+            let (offset, nanos_of_millis) = self.get_offset_and_size(pos)?;
+            let millis = self.read_i64_at_offset(offset, "timestamp ntz 
millis")?;
+            TimestampNtz::from_millis_nanos(millis, nanos_of_millis as i32)
+        }
+    }
+
+    pub fn get_timestamp_ltz(&self, pos: usize, precision: u32) -> 
Result<TimestampLtz> {
+        if TimestampLtz::is_compact(precision) {
+            Ok(TimestampLtz::new(self.get_long(pos)?))
+        } else {
+            let (offset, nanos_of_millis) = self.get_offset_and_size(pos)?;
+            let millis = self.read_i64_at_offset(offset, "timestamp ltz 
millis")?;
+            TimestampLtz::from_millis_nanos(millis, nanos_of_millis as i32)
+        }
+    }
+
+    pub fn get_array(&self, pos: usize) -> Result<FlussArray> {
+        let (start, len) = self.read_var_len_span(pos)?;
+        FlussArray::from_owned_bytes(self.data.slice(start..start + len))
+    }
+}
+
+/// Writer for building a `FlussArray` element by element.
+/// Matches Java's `BinaryArrayWriter`.
+pub struct FlussArrayWriter {
+    data: Vec<u8>,
+    null_bits_offset: usize,
+    element_offset: usize,
+    element_size: usize,
+    cursor: usize,
+    num_elements: usize,
+}
+
+impl FlussArrayWriter {
+    /// Creates a new writer for an array with `num_elements` elements of the 
given element type.
+    pub fn new(num_elements: usize, element_type: &DataType) -> Self {
+        let element_size = calculate_fix_length_part_size(element_type);
+        Self::with_element_size(num_elements, element_size)
+    }
+
+    /// Creates a new writer with an explicit element size (in bytes).
+    pub fn with_element_size(num_elements: usize, element_size: usize) -> Self 
{
+        let header_in_bytes = calculate_header_in_bytes(num_elements);
+        let fixed_size = round_to_nearest_word(header_in_bytes + element_size 
* num_elements);
+        let mut data = vec![0u8; fixed_size];
+
+        // Java's MemorySegment.putInt() stores little-endian.
+        data[0..4].copy_from_slice(&(num_elements as i32).to_le_bytes());
+
+        FlussArrayWriter {
+            data,
+            null_bits_offset: 4,
+            element_offset: header_in_bytes,
+            element_size,
+            cursor: fixed_size,
+            num_elements,
+        }
+    }
+
+    fn get_element_offset(&self, pos: usize) -> usize {
+        self.element_offset + self.element_size * pos
+    }
+
+    /// Sets the null bit for the element at position `pos`.
+    pub fn set_null_at(&mut self, pos: usize) {
+        let byte_index = pos >> 3;
+        let bit = pos & 7;
+        self.data[self.null_bits_offset + byte_index] |= 1u8 << bit;
+    }
+
+    pub fn write_boolean(&mut self, pos: usize, value: bool) {
+        let offset = self.get_element_offset(pos);
+        self.data[offset] = if value { 1 } else { 0 };
+    }
+
+    pub fn write_byte(&mut self, pos: usize, value: i8) {
+        let offset = self.get_element_offset(pos);
+        self.data[offset] = value as u8;
+    }
+
+    pub fn write_short(&mut self, pos: usize, value: i16) {
+        let offset = self.get_element_offset(pos);
+        self.data[offset..offset + 2].copy_from_slice(&value.to_le_bytes());
+    }
+
+    pub fn write_int(&mut self, pos: usize, value: i32) {
+        let offset = self.get_element_offset(pos);
+        self.data[offset..offset + 4].copy_from_slice(&value.to_le_bytes());
+    }
+
+    pub fn write_long(&mut self, pos: usize, value: i64) {
+        let offset = self.get_element_offset(pos);
+        self.data[offset..offset + 8].copy_from_slice(&value.to_le_bytes());
+    }
+
+    pub fn write_float(&mut self, pos: usize, value: f32) {
+        let offset = self.get_element_offset(pos);
+        self.data[offset..offset + 4].copy_from_slice(&value.to_le_bytes());
+    }
+
+    pub fn write_double(&mut self, pos: usize, value: f64) {
+        let offset = self.get_element_offset(pos);
+        self.data[offset..offset + 8].copy_from_slice(&value.to_le_bytes());
+    }
+
+    /// Writes variable-length bytes to the variable part and stores 
offset+size in the fixed slot.
+    fn write_bytes_to_var_len_part(&mut self, pos: usize, bytes: &[u8]) {
+        let rounded = round_to_nearest_word(bytes.len());
+        let var_offset = self.cursor;
+        self.data.resize(self.data.len() + rounded, 0);
+        self.data[var_offset..var_offset + bytes.len()].copy_from_slice(bytes);
+        self.set_offset_and_size(pos, var_offset, bytes.len());
+        self.cursor += rounded;
+    }
+
+    fn set_offset_and_size(&mut self, pos: usize, offset: usize, size: usize) {
+        let packed = ((offset as i64) << 32) | (size as i64);
+        self.write_long(pos, packed);
+    }
+
+    fn write_bytes_to_fix_len_part(&mut self, pos: usize, bytes: &[u8]) {
+        let len = bytes.len();
+        debug_assert!(len <= MAX_FIX_PART_DATA_SIZE);
+        let first_byte = (len as u64) | 0x80;
+        let mut seven_bytes = 0_u64;
+        if cfg!(target_endian = "little") {
+            for (i, b) in bytes.iter().enumerate() {
+                seven_bytes |= ((*b as u64) & 0xFF) << (i * 8);
+            }
+        } else {
+            for (i, b) in bytes.iter().enumerate() {
+                seven_bytes |= ((*b as u64) & 0xFF) << ((6 - i) * 8);
+            }
+        }
+        let packed = ((first_byte << 56) | seven_bytes) as i64;
+        self.write_long(pos, packed);
+    }
+
+    pub fn write_string(&mut self, pos: usize, value: &str) {
+        let bytes = value.as_bytes();
+        if bytes.len() <= MAX_FIX_PART_DATA_SIZE {
+            self.write_bytes_to_fix_len_part(pos, bytes);
+        } else {
+            self.write_bytes_to_var_len_part(pos, bytes);
+        }
+    }
+
+    pub fn write_binary_bytes(&mut self, pos: usize, value: &[u8]) {
+        if value.len() <= MAX_FIX_PART_DATA_SIZE {
+            self.write_bytes_to_fix_len_part(pos, value);
+        } else {
+            self.write_bytes_to_var_len_part(pos, value);
+        }
+    }
+
+    pub fn write_decimal(&mut self, pos: usize, value: &Decimal, precision: 
u32) {
+        if Decimal::is_compact_precision(precision) {
+            self.write_long(
+                pos,
+                value
+                    .to_unscaled_long()
+                    .expect("Decimal should fit in i64 for compact precision"),
+            );
+        } else {
+            let bytes = value.to_unscaled_bytes();
+            self.write_bytes_to_var_len_part(pos, &bytes);
+        }
+    }
+
+    pub fn write_date(&mut self, pos: usize, value: Date) {
+        self.write_int(pos, value.get_inner());
+    }
+
+    pub fn write_time(&mut self, pos: usize, value: Time) {
+        self.write_int(pos, value.get_inner());
+    }
+
+    pub fn write_timestamp_ntz(&mut self, pos: usize, value: &TimestampNtz, 
precision: u32) {
+        if TimestampNtz::is_compact(precision) {
+            self.write_long(pos, value.get_millisecond());
+        } else {
+            let millis_bytes = value.get_millisecond().to_le_bytes();
+            let var_offset = self.cursor;
+            let rounded = round_to_nearest_word(8);
+            self.data.resize(self.data.len() + rounded, 0);
+            self.data[var_offset..var_offset + 
8].copy_from_slice(&millis_bytes);
+            self.set_offset_and_size(pos, var_offset, 
value.get_nano_of_millisecond() as usize);
+            self.cursor += rounded;
+        }
+    }
+
+    pub fn write_timestamp_ltz(&mut self, pos: usize, value: &TimestampLtz, 
precision: u32) {
+        if TimestampLtz::is_compact(precision) {
+            self.write_long(pos, value.get_epoch_millisecond());
+        } else {
+            let millis_bytes = value.get_epoch_millisecond().to_le_bytes();
+            let var_offset = self.cursor;
+            let rounded = round_to_nearest_word(8);
+            self.data.resize(self.data.len() + rounded, 0);
+            self.data[var_offset..var_offset + 
8].copy_from_slice(&millis_bytes);
+            self.set_offset_and_size(pos, var_offset, 
value.get_nano_of_millisecond() as usize);
+            self.cursor += rounded;
+        }
+    }
+
+    /// Writes a nested FlussArray into this array at position `pos`.
+    pub fn write_array(&mut self, pos: usize, value: &FlussArray) {
+        self.write_bytes_to_var_len_part(pos, value.as_bytes());
+    }
+
+    /// Finalizes the writer and returns the completed FlussArray.
+    pub fn complete(self) -> Result<FlussArray> {
+        let mut data = self.data;
+        data.truncate(self.cursor);
+        FlussArray::from_vec(data)
+    }
+
+    /// Returns the number of elements this writer was initialized with.
+    pub fn num_elements(&self) -> usize {
+        self.num_elements
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::metadata::DataTypes;
+
+    #[test]
+    fn test_header_calculation() {
+        assert_eq!(calculate_header_in_bytes(0), 4);
+        assert_eq!(calculate_header_in_bytes(1), 8);
+        assert_eq!(calculate_header_in_bytes(31), 8);
+        assert_eq!(calculate_header_in_bytes(32), 8);
+        assert_eq!(calculate_header_in_bytes(33), 12);
+        assert_eq!(calculate_header_in_bytes(64), 12);
+        assert_eq!(calculate_header_in_bytes(65), 16);
+    }
+
+    #[test]
+    fn test_fix_length_part_size() {
+        assert_eq!(calculate_fix_length_part_size(&DataTypes::boolean()), 1);
+        assert_eq!(calculate_fix_length_part_size(&DataTypes::tinyint()), 1);
+        assert_eq!(calculate_fix_length_part_size(&DataTypes::smallint()), 2);
+        assert_eq!(calculate_fix_length_part_size(&DataTypes::int()), 4);
+        assert_eq!(calculate_fix_length_part_size(&DataTypes::bigint()), 8);
+        assert_eq!(calculate_fix_length_part_size(&DataTypes::float()), 4);
+        assert_eq!(calculate_fix_length_part_size(&DataTypes::double()), 8);
+        assert_eq!(calculate_fix_length_part_size(&DataTypes::string()), 8);
+        assert_eq!(
+            
calculate_fix_length_part_size(&DataTypes::array(DataTypes::int())),
+            8
+        );
+    }
+
+    #[test]
+    fn test_round_trip_int_array() {
+        let elem_type = DataTypes::int();
+        let mut writer = FlussArrayWriter::new(3, &elem_type);
+        writer.write_int(0, 10);
+        writer.write_int(1, 20);
+        writer.write_int(2, 30);
+        let array = writer.complete().unwrap();
+
+        assert_eq!(array.size(), 3);
+        assert!(!array.is_null_at(0));
+        assert_eq!(array.get_int(0).unwrap(), 10);
+        assert_eq!(array.get_int(1).unwrap(), 20);
+        assert_eq!(array.get_int(2).unwrap(), 30);
+    }
+
+    #[test]
+    fn test_round_trip_with_nulls() {
+        let elem_type = DataTypes::int();
+        let mut writer = FlussArrayWriter::new(3, &elem_type);
+        writer.write_int(0, 1);
+        writer.set_null_at(1);
+        writer.write_int(2, 3);
+        let array = writer.complete().unwrap();
+
+        assert_eq!(array.size(), 3);
+        assert!(!array.is_null_at(0));
+        assert!(array.is_null_at(1));
+        assert!(!array.is_null_at(2));
+        assert_eq!(array.get_int(0).unwrap(), 1);
+        assert_eq!(array.get_int(2).unwrap(), 3);
+    }
+
+    #[test]
+    fn test_round_trip_string_array() {
+        let elem_type = DataTypes::string();
+        let mut writer = FlussArrayWriter::new(3, &elem_type);
+        writer.write_string(0, "hello");
+        writer.write_string(1, "world");
+        writer.write_string(2, "!");
+        let array = writer.complete().unwrap();
+
+        assert_eq!(array.size(), 3);
+        assert_eq!(array.get_string(0).unwrap(), "hello");
+        assert_eq!(array.get_string(1).unwrap(), "world");
+        assert_eq!(array.get_string(2).unwrap(), "!");
+    }
+
+    #[test]
+    fn test_java_inline_short_string_decoding() {
+        // Manually construct a Java-style inline-encoded short string ("abc").
+        // Slot payload is a little-endian u64: the top byte holds `len | 0x80`
+        // and the low seven bytes hold the string bytes.
+        let mut data = vec![0_u8; 16];
+        data[0..4].copy_from_slice(&(1_i32).to_le_bytes());
+        // null bits remain 0
+        let first_byte = (3_u64 | 0x80) << 56;
+        let seven_bytes = (b'a' as u64) | ((b'b' as u64) << 8) | ((b'c' as 
u64) << 16);
+        let packed = first_byte | seven_bytes;
+        data[8..16].copy_from_slice(&packed.to_le_bytes());
+
+        let arr = FlussArray::from_bytes(&data).unwrap();
+        assert_eq!(arr.size(), 1);
+        assert_eq!(arr.get_string(0).unwrap(), "abc");
+    }
+
+    #[test]
+    fn test_java_inline_short_binary_decoding() {
+        let elem_type = DataTypes::bytes();
+        let mut writer = FlussArrayWriter::new(1, &elem_type);
+        writer.write_binary_bytes(0, b"abc");
+        let arr = writer.complete().unwrap();
+        assert_eq!(arr.get_binary(0).unwrap(), b"abc");
+    }
+
+    #[test]
+    fn test_round_trip_empty_array() {
+        let elem_type = DataTypes::int();
+        let writer = FlussArrayWriter::new(0, &elem_type);
+        let array = writer.complete().unwrap();
+        assert_eq!(array.size(), 0);
+    }
+
+    #[test]
+    fn test_round_trip_boolean_array() {
+        let elem_type = DataTypes::boolean();
+        let mut writer = FlussArrayWriter::new(3, &elem_type);
+        writer.write_boolean(0, true);
+        writer.write_boolean(1, false);
+        writer.write_boolean(2, true);
+        let array = writer.complete().unwrap();
+
+        assert_eq!(array.size(), 3);
+        assert!(array.get_boolean(0).unwrap());
+        assert!(!array.get_boolean(1).unwrap());
+        assert!(array.get_boolean(2).unwrap());
+    }
+
+    #[test]
+    fn test_round_trip_long_array() {
+        let elem_type = DataTypes::bigint();
+        let mut writer = FlussArrayWriter::new(2, &elem_type);
+        writer.write_long(0, i64::MAX);
+        writer.write_long(1, i64::MIN);
+        let array = writer.complete().unwrap();
+
+        assert_eq!(array.get_long(0).unwrap(), i64::MAX);
+        assert_eq!(array.get_long(1).unwrap(), i64::MIN);
+    }
+
+    #[test]
+    fn test_round_trip_double_array() {
+        let elem_type = DataTypes::double();
+        let mut writer = FlussArrayWriter::new(2, &elem_type);
+        writer.write_double(0, 1.23);
+        writer.write_double(1, -4.56);
+        let array = writer.complete().unwrap();
+
+        assert_eq!(array.get_double(0).unwrap(), 1.23);
+        assert_eq!(array.get_double(1).unwrap(), -4.56);
+    }
+
+    #[test]
+    fn test_round_trip_nested_array() {
+        let inner_type = DataTypes::int();
+        let outer_type = DataTypes::array(DataTypes::int());
+
+        // Build inner array [1, 2]
+        let mut inner_writer = FlussArrayWriter::new(2, &inner_type);
+        inner_writer.write_int(0, 1);
+        inner_writer.write_int(1, 2);
+        let inner_array = inner_writer.complete().unwrap();
+
+        // Build outer array containing the inner array
+        let mut outer_writer = FlussArrayWriter::new(1, &outer_type);
+        outer_writer.write_array(0, &inner_array);
+        let outer_array = outer_writer.complete().unwrap();
+
+        assert_eq!(outer_array.size(), 1);
+        let nested = outer_array.get_array(0).unwrap();
+        assert_eq!(nested.size(), 2);
+        assert_eq!(nested.get_int(0).unwrap(), 1);
+        assert_eq!(nested.get_int(1).unwrap(), 2);
+    }
+
+    #[test]
+    fn test_primitive_getter_out_of_bounds_returns_error() {
+        let elem_type = DataTypes::int();
+        let mut writer = FlussArrayWriter::new(1, &elem_type);
+        writer.write_int(0, 10);
+        let array = writer.complete().unwrap();
+
+        let err = array.get_int(1).unwrap_err();
+        assert!(
+            err.to_string().contains("out of bounds"),
+            "unexpected error: {err}"
+        );
+    }
+
+    #[test]
+    fn test_primitive_getter_on_malformed_payload_returns_error() {
+        // Size says 1, but payload only contains header (no element bytes).
+        let mut data = vec![0_u8; 8];
+        data[0..4].copy_from_slice(&(1_i32).to_le_bytes());
+        let arr = FlussArray::from_bytes(&data).unwrap();
+
+        let err = arr.get_int(0).unwrap_err();
+        assert!(
+            err.to_string().contains("Out-of-bounds"),
+            "unexpected error: {err}"
+        );
+    }
+
+    #[test]
+    fn test_binary_layout_matches_java() {
+        // Verify exact byte layout for a simple [1, 2, 3] int array
+        let elem_type = DataTypes::int();
+        let mut writer = FlussArrayWriter::new(3, &elem_type);
+        writer.write_int(0, 1);
+        writer.write_int(1, 2);
+        writer.write_int(2, 3);
+        let array = writer.complete().unwrap();
+        let bytes = array.as_bytes();
+
+        // size = 3 at offset 0 (4 bytes, little-endian per Java 
MemorySegment.putInt)
+        assert_eq!(i32::from_le_bytes(bytes[0..4].try_into().unwrap()), 3);
+        // null bits: 4 bytes starting at offset 4, should be all zeros
+        assert_eq!(&bytes[4..8], &[0, 0, 0, 0]);
+        // elements start at offset 8 (header = 4 + 4), each 4 bytes 
(little-endian)
+        assert_eq!(i32::from_le_bytes(bytes[8..12].try_into().unwrap()), 1);
+        assert_eq!(i32::from_le_bytes(bytes[12..16].try_into().unwrap()), 2);
+        assert_eq!(i32::from_le_bytes(bytes[16..20].try_into().unwrap()), 3);
+    }
+}
diff --git a/crates/fluss/src/row/column.rs b/crates/fluss/src/row/column.rs
index c07fe97..4a3e708 100644
--- a/crates/fluss/src/row/column.rs
+++ b/crates/fluss/src/row/column.rs
@@ -19,7 +19,10 @@ use crate::error::Error::IllegalArgument;
 use crate::error::Result;
 use crate::row::InternalRow;
 use crate::row::datum::{Date, Time, TimestampLtz, TimestampNtz};
-use arrow::array::{Array, AsArray, BinaryArray, RecordBatch, StringArray};
+use arrow::array::{
+    Array, AsArray, BinaryArray, BooleanArray, FixedSizeBinaryArray, 
ListArray, RecordBatch,
+    StringArray,
+};
 use arrow::datatypes::{
     DataType as ArrowDataType, Date32Type, Decimal128Type, Float32Type, 
Float64Type, Int8Type,
     Int16Type, Int32Type, Int64Type, Time32MillisecondType, Time32SecondType,
@@ -407,17 +410,379 @@ impl InternalRow for ColumnarRow {
             })?
             .value(self.row_id))
     }
+
+    fn get_array(&self, pos: usize) -> Result<crate::row::FlussArray> {
+        use crate::record::from_arrow_type;
+        use crate::row::binary_array::FlussArrayWriter;
+
+        let column = self.column(pos)?;
+        let list_array =
+            column
+                .as_any()
+                .downcast_ref::<ListArray>()
+                .ok_or_else(|| IllegalArgument {
+                    message: format!("expected List array at position {pos}"),
+                })?;
+
+        let values = list_array.value(self.row_id);
+        let element_fluss_type = from_arrow_type(values.data_type())?;
+        let mut writer = FlussArrayWriter::new(values.len(), 
&element_fluss_type);
+
+        write_arrow_values_to_fluss_array(&*values, &element_fluss_type, &mut 
writer)?;
+        writer.complete()
+    }
+}
+
+/// Downcast to a primitive Arrow array type, then loop with null checks 
calling a writer method.
+macro_rules! write_primitive_elements {
+    ($values:expr, $arrow_type:ty, $element_type:expr, $writer:expr, 
$write_method:ident) => {{
+        let arr = $values
+            .as_primitive_opt::<$arrow_type>()
+            .ok_or_else(|| IllegalArgument {
+                message: format!(
+                    "Expected {} for {:?} element",
+                    stringify!($arrow_type),
+                    $element_type
+                ),
+            })?;
+        for i in 0..arr.len() {
+            if arr.is_null(i) {
+                $writer.set_null_at(i);
+            } else {
+                $writer.$write_method(i, arr.value(i));
+            }
+        }
+    }};
+}
+
+/// Downcast via `downcast_ref`, then loop with null checks calling a writer 
method.
+macro_rules! write_downcast_elements {
+    ($values:expr, $array_type:ty, $element_type:expr, $writer:expr, 
$write_method:ident) => {{
+        let arr = $values
+            .as_any()
+            .downcast_ref::<$array_type>()
+            .ok_or_else(|| IllegalArgument {
+                message: format!(
+                    "Expected {} for {:?} element",
+                    stringify!($array_type),
+                    $element_type
+                ),
+            })?;
+        for i in 0..arr.len() {
+            if arr.is_null(i) {
+                $writer.set_null_at(i);
+            } else {
+                $writer.$write_method(i, arr.value(i));
+            }
+        }
+    }};
+}
+
+/// Converts all elements of an Arrow array into a `FlussArrayWriter`, 
downcasting
+/// the Arrow array once per call rather than per element.
+fn write_arrow_values_to_fluss_array(
+    values: &dyn Array,
+    element_type: &crate::metadata::DataType,
+    writer: &mut crate::row::binary_array::FlussArrayWriter,
+) -> Result<()> {
+    use crate::metadata::DataType;
+    use crate::record::from_arrow_type;
+    use crate::row::binary_array::FlussArrayWriter;
+
+    let len = values.len();
+
+    match element_type {
+        DataType::Boolean(_) => {
+            write_downcast_elements!(values, BooleanArray, element_type, 
writer, write_boolean)
+        }
+        DataType::TinyInt(_) => {
+            write_primitive_elements!(values, Int8Type, element_type, writer, 
write_byte)
+        }
+        DataType::SmallInt(_) => {
+            write_primitive_elements!(values, Int16Type, element_type, writer, 
write_short)
+        }
+        DataType::Int(_) => {
+            write_primitive_elements!(values, Int32Type, element_type, writer, 
write_int)
+        }
+        DataType::BigInt(_) => {
+            write_primitive_elements!(values, Int64Type, element_type, writer, 
write_long)
+        }
+        DataType::Float(_) => {
+            write_primitive_elements!(values, Float32Type, element_type, 
writer, write_float)
+        }
+        DataType::Double(_) => {
+            write_primitive_elements!(values, Float64Type, element_type, 
writer, write_double)
+        }
+        DataType::Char(_) | DataType::String(_) => {
+            write_downcast_elements!(values, StringArray, element_type, 
writer, write_string)
+        }
+        DataType::Binary(_) => {
+            write_downcast_elements!(
+                values,
+                FixedSizeBinaryArray,
+                element_type,
+                writer,
+                write_binary_bytes
+            )
+        }
+        DataType::Bytes(_) => {
+            write_downcast_elements!(
+                values,
+                BinaryArray,
+                element_type,
+                writer,
+                write_binary_bytes
+            )
+        }
+        DataType::Decimal(dt) => {
+            let arr =
+                values
+                    .as_primitive_opt::<Decimal128Type>()
+                    .ok_or_else(|| IllegalArgument {
+                        message: format!("Expected Decimal128Array for 
{element_type:?} element"),
+                    })?;
+            let arrow_scale = match values.data_type() {
+                ArrowDataType::Decimal128(_p, s) => *s as i64,
+                other => {
+                    return Err(IllegalArgument {
+                        message: format!(
+                            "Expected Decimal128 data type for 
{element_type:?} element, got {other:?}"
+                        ),
+                    });
+                }
+            };
+            let precision = dt.precision();
+            let scale = dt.scale();
+            for i in 0..len {
+                if arr.is_null(i) {
+                    writer.set_null_at(i);
+                } else {
+                    let d = crate::row::Decimal::from_arrow_decimal128(
+                        arr.value(i),
+                        arrow_scale,
+                        precision,
+                        scale,
+                    )?;
+                    writer.write_decimal(i, &d, precision);
+                }
+            }
+        }
+        DataType::Date(_) => {
+            let arr = values
+                .as_primitive_opt::<Date32Type>()
+                .ok_or_else(|| IllegalArgument {
+                    message: format!("Expected Date32Array for 
{element_type:?} element"),
+                })?;
+            for i in 0..len {
+                if arr.is_null(i) {
+                    writer.set_null_at(i);
+                } else {
+                    writer.write_date(i, Date::new(arr.value(i)));
+                }
+            }
+        }
+        DataType::Time(_) => {
+            write_time_elements(values, element_type, writer)?;
+        }
+        DataType::Timestamp(ts_type) => {
+            write_timestamp_elements(
+                values,
+                element_type,
+                writer,
+                ts_type.precision(),
+                TimestampNtz::new,
+                TimestampNtz::from_millis_nanos,
+                |w, i, ts, p| w.write_timestamp_ntz(i, &ts, p),
+            )?;
+        }
+        DataType::TimestampLTz(ts_type) => {
+            write_timestamp_elements(
+                values,
+                element_type,
+                writer,
+                ts_type.precision(),
+                TimestampLtz::new,
+                TimestampLtz::from_millis_nanos,
+                |w, i, ts, p| w.write_timestamp_ltz(i, &ts, p),
+            )?;
+        }
+        DataType::Array(_) => {
+            let list_arr =
+                values
+                    .as_any()
+                    .downcast_ref::<ListArray>()
+                    .ok_or_else(|| IllegalArgument {
+                        message: format!("Expected ListArray for 
{element_type:?} element"),
+                    })?;
+            let nested_element_type = from_arrow_type(&list_arr.value_type())?;
+            for i in 0..len {
+                if list_arr.is_null(i) {
+                    writer.set_null_at(i);
+                } else {
+                    let nested_values = list_arr.value(i);
+                    let mut nested_writer =
+                        FlussArrayWriter::new(nested_values.len(), 
&nested_element_type);
+                    write_arrow_values_to_fluss_array(
+                        &*nested_values,
+                        &nested_element_type,
+                        &mut nested_writer,
+                    )?;
+                    let nested_array = nested_writer.complete()?;
+                    writer.write_array(i, &nested_array);
+                }
+            }
+        }
+        _ => {
+            return Err(IllegalArgument {
+                message: format!(
+                    "Unsupported element type for Arrow → FlussArray 
conversion: {element_type:?}"
+                ),
+            });
+        }
+    }
+    Ok(())
+}
+
+fn write_time_elements(
+    values: &dyn Array,
+    element_type: &crate::metadata::DataType,
+    writer: &mut crate::row::binary_array::FlussArrayWriter,
+) -> Result<()> {
+    macro_rules! process_time {
+        ($arrow_type:ty, $to_millis:expr) => {{
+            let arr = values
+                .as_primitive_opt::<$arrow_type>()
+                .ok_or_else(|| IllegalArgument {
+                    message: format!(
+                        "Expected {} for {:?} element",
+                        stringify!($arrow_type),
+                        element_type
+                    ),
+                })?;
+            for i in 0..arr.len() {
+                if arr.is_null(i) {
+                    writer.set_null_at(i);
+                } else {
+                    let to_millis_fn = $to_millis;
+                    writer.write_time(i, 
Time::new(to_millis_fn(arr.value(i))));
+                }
+            }
+        }};
+    }
+
+    match values.data_type() {
+        ArrowDataType::Time32(TimeUnit::Second) => {
+            process_time!(Time32SecondType, |v: i32| v * 1000);
+        }
+        ArrowDataType::Time32(TimeUnit::Millisecond) => {
+            process_time!(Time32MillisecondType, |v: i32| v);
+        }
+        ArrowDataType::Time64(TimeUnit::Microsecond) => {
+            process_time!(Time64MicrosecondType, |v: i64| (v / 1000) as i32);
+        }
+        ArrowDataType::Time64(TimeUnit::Nanosecond) => {
+            process_time!(Time64NanosecondType, |v: i64| (v / 1_000_000) as 
i32);
+        }
+        other => {
+            return Err(IllegalArgument {
+                message: format!(
+                    "Expected Time column for {element_type:?} element, got 
{other:?}"
+                ),
+            });
+        }
+    }
+    Ok(())
+}
+
+fn convert_timestamp_raw(raw: i64, unit: &TimeUnit) -> (i64, i32) {
+    match unit {
+        TimeUnit::Second => (raw * 1000, 0),
+        TimeUnit::Millisecond => (raw, 0),
+        TimeUnit::Microsecond => {
+            let millis = raw.div_euclid(1000);
+            let nanos = (raw.rem_euclid(1000) * 1000) as i32;
+            (millis, nanos)
+        }
+        TimeUnit::Nanosecond => {
+            let millis = raw.div_euclid(1_000_000);
+            let nanos = raw.rem_euclid(1_000_000) as i32;
+            (millis, nanos)
+        }
+    }
+}
+
+fn write_timestamp_elements<T>(
+    values: &dyn Array,
+    element_type: &crate::metadata::DataType,
+    writer: &mut crate::row::binary_array::FlussArrayWriter,
+    precision: u32,
+    construct_compact: impl Fn(i64) -> T,
+    construct_with_nanos: impl Fn(i64, i32) -> Result<T>,
+    write_fn: impl Fn(&mut crate::row::binary_array::FlussArrayWriter, usize, 
T, u32),
+) -> Result<()> {
+    let unit = match values.data_type() {
+        ArrowDataType::Timestamp(unit, _) => unit,
+        other => {
+            return Err(IllegalArgument {
+                message: format!(
+                    "Expected Timestamp column for {element_type:?} element, 
got {other:?}"
+                ),
+            });
+        }
+    };
+
+    macro_rules! process_ts {
+        ($arrow_type:ty) => {{
+            let arr = values
+                .as_primitive_opt::<$arrow_type>()
+                .ok_or_else(|| IllegalArgument {
+                    message: format!(
+                        "Expected {} for {:?} element",
+                        stringify!($arrow_type),
+                        element_type
+                    ),
+                })?;
+            for i in 0..arr.len() {
+                if arr.is_null(i) {
+                    writer.set_null_at(i);
+                    continue;
+                }
+                let (millis, nanos) = convert_timestamp_raw(arr.value(i), 
unit);
+                let ts = if nanos == 0 {
+                    construct_compact(millis)
+                } else {
+                    construct_with_nanos(millis, nanos)?
+                };
+                write_fn(writer, i, ts, precision);
+            }
+        }};
+    }
+
+    match unit {
+        TimeUnit::Second => process_ts!(TimestampSecondType),
+        TimeUnit::Millisecond => process_ts!(TimestampMillisecondType),
+        TimeUnit::Microsecond => process_ts!(TimestampMicrosecondType),
+        TimeUnit::Nanosecond => process_ts!(TimestampNanosecondType),
+    }
+    Ok(())
 }
 
 #[cfg(test)]
 mod tests {
     use super::*;
     use arrow::array::{
-        BinaryArray, BooleanArray, Decimal128Array, Float32Array, 
Float64Array, Int8Array,
-        Int16Array, Int32Array, Int64Array, StringArray,
+        ArrayRef, BinaryArray, BooleanArray, Decimal128Array, Float32Array, 
Float64Array,
+        Int8Array, Int16Array, Int32Array, Int32Builder, Int64Array, 
ListBuilder, StringArray,
+        UInt32Builder,
     };
     use arrow::datatypes::{DataType, Field, Schema};
 
+    fn single_column_row(array: ArrayRef) -> ColumnarRow {
+        let batch =
+            RecordBatch::try_from_iter(vec![("arr", array)]).expect("record 
batch with one column");
+        ColumnarRow::new(Arc::new(batch))
+    }
+
     #[test]
     fn columnar_row_reads_values() {
         let schema = Arc::new(Schema::new(vec![
@@ -533,4 +898,96 @@ mod tests {
             .unwrap()
         );
     }
+
+    #[test]
+    fn columnar_row_get_array_int_roundtrip() {
+        let mut builder = ListBuilder::new(Int32Builder::new());
+        builder.values().append_value(1);
+        builder.values().append_value(2);
+        builder.values().append_value(3);
+        builder.append(true);
+        let array = Arc::new(builder.finish()) as ArrayRef;
+
+        let row = single_column_row(array);
+        let arr = row.get_array(0).unwrap();
+        assert_eq!(arr.size(), 3);
+        assert_eq!(arr.get_int(0).unwrap(), 1);
+        assert_eq!(arr.get_int(1).unwrap(), 2);
+        assert_eq!(arr.get_int(2).unwrap(), 3);
+    }
+
+    #[test]
+    fn columnar_row_get_array_with_nulls() {
+        let mut builder = ListBuilder::new(Int32Builder::new());
+        builder.values().append_value(1);
+        builder.values().append_null();
+        builder.values().append_value(3);
+        builder.append(true);
+        let array = Arc::new(builder.finish()) as ArrayRef;
+
+        let row = single_column_row(array);
+        let arr = row.get_array(0).unwrap();
+        assert_eq!(arr.size(), 3);
+        assert_eq!(arr.get_int(0).unwrap(), 1);
+        assert!(arr.is_null_at(1));
+        assert_eq!(arr.get_int(2).unwrap(), 3);
+    }
+
+    #[test]
+    fn columnar_row_get_array_nested_array() {
+        let mut outer = 
ListBuilder::new(ListBuilder::new(Int32Builder::new()));
+
+        // first nested array: [1, 2]
+        outer.values().values().append_value(1);
+        outer.values().values().append_value(2);
+        outer.values().append(true);
+
+        // second nested array: [99]
+        outer.values().values().append_value(99);
+        outer.values().append(true);
+
+        // one row containing two nested arrays
+        outer.append(true);
+        let array = Arc::new(outer.finish()) as ArrayRef;
+
+        let row = single_column_row(array);
+        let arr = row.get_array(0).unwrap();
+        assert_eq!(arr.size(), 2);
+
+        let nested0 = arr.get_array(0).unwrap();
+        assert_eq!(nested0.size(), 2);
+        assert_eq!(nested0.get_int(0).unwrap(), 1);
+        assert_eq!(nested0.get_int(1).unwrap(), 2);
+
+        let nested1 = arr.get_array(1).unwrap();
+        assert_eq!(nested1.size(), 1);
+        assert_eq!(nested1.get_int(0).unwrap(), 99);
+    }
+
+    #[test]
+    fn columnar_row_get_array_non_list_column_returns_error() {
+        let array = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef;
+        let row = single_column_row(array);
+        let err = row.get_array(0).unwrap_err();
+        assert!(
+            err.to_string().contains("expected List array"),
+            "unexpected error: {err}"
+        );
+    }
+
+    #[test]
+    fn columnar_row_get_array_unsupported_element_type_returns_error() {
+        let mut builder = ListBuilder::new(UInt32Builder::new());
+        builder.values().append_value(7);
+        builder.append(true);
+        let array = Arc::new(builder.finish()) as ArrayRef;
+
+        let row = single_column_row(array);
+        let err = row.get_array(0).unwrap_err();
+        assert!(
+            err.to_string()
+                .contains("Cannot convert Arrow type to Fluss type"),
+            "unexpected error: {err}"
+        );
+    }
 }
diff --git a/crates/fluss/src/row/compacted/compacted_key_writer.rs 
b/crates/fluss/src/row/compacted/compacted_key_writer.rs
index 339e366..c694065 100644
--- a/crates/fluss/src/row/compacted/compacted_key_writer.rs
+++ b/crates/fluss/src/row/compacted/compacted_key_writer.rs
@@ -47,6 +47,15 @@ impl CompactedKeyWriter {
     }
 
     pub fn create_value_writer(field_type: &DataType) -> Result<ValueWriter> {
+        // Match Java's CompactedKeyEncoder: Array key columns can be
+        // encoded, but Map/Row are not supported by ValueWriter, so they
+        // are rejected eagerly here. The server independently rejects
+        // unsupported key types at table-creation time.
+        if matches!(field_type, DataType::Map(_) | DataType::Row(_)) {
+            return Err(crate::error::Error::IllegalArgument {
+                message: format!("Cannot use {field_type:?} as a key column 
type"),
+            });
+        }
         ValueWriter::create_value_writer(field_type, 
Some(&BinaryRowFormat::Compacted))
     }
 
@@ -101,6 +110,8 @@ impl BinaryWriter for CompactedKeyWriter {
             fn write_timestamp_ntz(&mut self, value: 
&crate::row::datum::TimestampNtz, precision: u32);
 
             fn write_timestamp_ltz(&mut self, value: 
&crate::row::datum::TimestampLtz, precision: u32);
+
+            fn write_array(&mut self, value: &[u8]);
         }
     }
 
diff --git a/crates/fluss/src/row/compacted/compacted_row.rs 
b/crates/fluss/src/row/compacted/compacted_row.rs
index 918ebdf..267ae13 100644
--- a/crates/fluss/src/row/compacted/compacted_row.rs
+++ b/crates/fluss/src/row/compacted/compacted_row.rs
@@ -68,9 +68,16 @@ impl<'a> CompactedRow<'a> {
         self.size_in_bytes
     }
 
-    fn decoded_row(&self) -> &GenericRow<'_> {
-        self.decoded_row
-            .get_or_init(|| self.deserializer.deserialize(&self.reader))
+    fn decoded_row(&self) -> Result<&GenericRow<'_>> {
+        if let Some(row) = self.decoded_row.get() {
+            return Ok(row);
+        }
+
+        // `OnceLock::get_or_try_init` is still unstable on our toolchain.
+        // Keep the same semantics by performing the fallible decode first,
+        // then atomically installing it via `get_or_init`.
+        let decoded = self.deserializer.deserialize(&self.reader)?;
+        Ok(self.decoded_row.get_or_init(|| decoded))
     }
 
     pub fn as_bytes(&self) -> &[u8] {
@@ -97,67 +104,71 @@ impl<'a> InternalRow for CompactedRow<'a> {
     }
 
     fn get_boolean(&self, pos: usize) -> Result<bool> {
-        self.decoded_row().get_boolean(pos)
+        self.decoded_row()?.get_boolean(pos)
     }
 
     fn get_byte(&self, pos: usize) -> Result<i8> {
-        self.decoded_row().get_byte(pos)
+        self.decoded_row()?.get_byte(pos)
     }
 
     fn get_short(&self, pos: usize) -> Result<i16> {
-        self.decoded_row().get_short(pos)
+        self.decoded_row()?.get_short(pos)
     }
 
     fn get_int(&self, pos: usize) -> Result<i32> {
-        self.decoded_row().get_int(pos)
+        self.decoded_row()?.get_int(pos)
     }
 
     fn get_long(&self, pos: usize) -> Result<i64> {
-        self.decoded_row().get_long(pos)
+        self.decoded_row()?.get_long(pos)
     }
 
     fn get_float(&self, pos: usize) -> Result<f32> {
-        self.decoded_row().get_float(pos)
+        self.decoded_row()?.get_float(pos)
     }
 
     fn get_double(&self, pos: usize) -> Result<f64> {
-        self.decoded_row().get_double(pos)
+        self.decoded_row()?.get_double(pos)
     }
 
     fn get_char(&self, pos: usize, length: usize) -> Result<&str> {
-        self.decoded_row().get_char(pos, length)
+        self.decoded_row()?.get_char(pos, length)
     }
 
     fn get_string(&self, pos: usize) -> Result<&str> {
-        self.decoded_row().get_string(pos)
+        self.decoded_row()?.get_string(pos)
     }
 
     fn get_decimal(&self, pos: usize, precision: usize, scale: usize) -> 
Result<Decimal> {
-        self.decoded_row().get_decimal(pos, precision, scale)
+        self.decoded_row()?.get_decimal(pos, precision, scale)
     }
 
     fn get_date(&self, pos: usize) -> Result<Date> {
-        self.decoded_row().get_date(pos)
+        self.decoded_row()?.get_date(pos)
     }
 
     fn get_time(&self, pos: usize) -> Result<Time> {
-        self.decoded_row().get_time(pos)
+        self.decoded_row()?.get_time(pos)
     }
 
     fn get_timestamp_ntz(&self, pos: usize, precision: u32) -> 
Result<TimestampNtz> {
-        self.decoded_row().get_timestamp_ntz(pos, precision)
+        self.decoded_row()?.get_timestamp_ntz(pos, precision)
     }
 
     fn get_timestamp_ltz(&self, pos: usize, precision: u32) -> 
Result<TimestampLtz> {
-        self.decoded_row().get_timestamp_ltz(pos, precision)
+        self.decoded_row()?.get_timestamp_ltz(pos, precision)
     }
 
     fn get_binary(&self, pos: usize, length: usize) -> Result<&[u8]> {
-        self.decoded_row().get_binary(pos, length)
+        self.decoded_row()?.get_binary(pos, length)
     }
 
     fn get_bytes(&self, pos: usize) -> Result<&[u8]> {
-        self.decoded_row().get_bytes(pos)
+        self.decoded_row()?.get_bytes(pos)
+    }
+
+    fn get_array(&self, pos: usize) -> Result<crate::row::FlussArray> {
+        self.decoded_row()?.get_array(pos)
     }
 
     fn as_encoded_bytes(&self, write_format: WriteFormat) -> Option<&[u8]> {
@@ -327,4 +338,157 @@ mod tests {
             999999999999999999i64
         );
     }
+
+    #[test]
+    fn test_compacted_row_int_array() {
+        use crate::metadata::DataTypes;
+        use crate::row::binary_array::FlussArrayWriter;
+
+        let row_type =
+            RowType::with_data_types(vec![DataTypes::int(), DataTypes::array(DataTypes::int())]);
+
+        let mut writer = CompactedRowWriter::new(row_type.fields().len());
+        writer.write_int(42);
+
+        let elem_type = DataTypes::int();
+        let mut arr_writer = FlussArrayWriter::new(3, &elem_type);
+        arr_writer.write_int(0, 1);
+        arr_writer.write_int(1, 2);
+        arr_writer.write_int(2, 3);
+        let arr = arr_writer.complete().unwrap();
+        writer.write_array(arr.as_bytes());
+
+        let bytes = writer.to_bytes();
+        let row = CompactedRow::from_bytes(&row_type, bytes.as_ref());
+
+        assert_eq!(row.get_int(0).unwrap(), 42);
+        let read_arr = row.get_array(1).unwrap();
+        assert_eq!(read_arr.size(), 3);
+        assert_eq!(read_arr.get_int(0).unwrap(), 1);
+        assert_eq!(read_arr.get_int(1).unwrap(), 2);
+        assert_eq!(read_arr.get_int(2).unwrap(), 3);
+    }
+
+    #[test]
+    fn test_compacted_row_string_array() {
+        use crate::metadata::DataTypes;
+        use crate::row::binary_array::FlussArrayWriter;
+
+        let row_type = RowType::with_data_types(vec![DataTypes::array(DataTypes::string())]);
+
+        let mut writer = CompactedRowWriter::new(row_type.fields().len());
+
+        let elem_type = DataTypes::string();
+        let mut arr_writer = FlussArrayWriter::new(3, &elem_type);
+        arr_writer.write_string(0, "hello");
+        arr_writer.write_string(1, "fluss");
+        arr_writer.write_string(2, "rust");
+        let arr = arr_writer.complete().unwrap();
+        writer.write_array(arr.as_bytes());
+
+        let bytes = writer.to_bytes();
+        let row = CompactedRow::from_bytes(&row_type, bytes.as_ref());
+
+        let read_arr = row.get_array(0).unwrap();
+        assert_eq!(read_arr.size(), 3);
+        assert_eq!(read_arr.get_string(0).unwrap(), "hello");
+        assert_eq!(read_arr.get_string(1).unwrap(), "fluss");
+        assert_eq!(read_arr.get_string(2).unwrap(), "rust");
+    }
+
+    #[test]
+    fn test_compacted_row_array_with_nulls() {
+        use crate::metadata::DataTypes;
+        use crate::row::binary_array::FlussArrayWriter;
+
+        let row_type = RowType::with_data_types(vec![DataTypes::array(DataTypes::int())]);
+
+        let mut writer = CompactedRowWriter::new(row_type.fields().len());
+
+        let elem_type = DataTypes::int();
+        let mut arr_writer = FlussArrayWriter::new(3, &elem_type);
+        arr_writer.write_int(0, 10);
+        arr_writer.set_null_at(1);
+        arr_writer.write_int(2, 30);
+        let arr = arr_writer.complete().unwrap();
+        writer.write_array(arr.as_bytes());
+
+        let bytes = writer.to_bytes();
+        let row = CompactedRow::from_bytes(&row_type, bytes.as_ref());
+
+        let read_arr = row.get_array(0).unwrap();
+        assert_eq!(read_arr.size(), 3);
+        assert!(!read_arr.is_null_at(0));
+        assert_eq!(read_arr.get_int(0).unwrap(), 10);
+        assert!(read_arr.is_null_at(1));
+        assert!(!read_arr.is_null_at(2));
+        assert_eq!(read_arr.get_int(2).unwrap(), 30);
+    }
+
+    #[test]
+    fn test_compacted_row_empty_array() {
+        use crate::metadata::DataTypes;
+        use crate::row::binary_array::FlussArrayWriter;
+
+        let row_type = RowType::with_data_types(vec![DataTypes::array(DataTypes::int())]);
+
+        let mut writer = CompactedRowWriter::new(row_type.fields().len());
+
+        let elem_type = DataTypes::int();
+        let arr_writer = FlussArrayWriter::new(0, &elem_type);
+        let arr = arr_writer.complete().unwrap();
+        writer.write_array(arr.as_bytes());
+
+        let bytes = writer.to_bytes();
+        let row = CompactedRow::from_bytes(&row_type, bytes.as_ref());
+
+        let read_arr = row.get_array(0).unwrap();
+        assert_eq!(read_arr.size(), 0);
+    }
+
+    #[test]
+    fn test_compacted_row_nested_array() {
+        use crate::metadata::DataTypes;
+        use crate::row::binary_array::FlussArrayWriter;
+
+        let row_type =
+            RowType::with_data_types(vec![DataTypes::array(DataTypes::array(DataTypes::int()))]);
+
+        let mut writer = CompactedRowWriter::new(row_type.fields().len());
+
+        // Build inner arrays
+        let inner_type = DataTypes::int();
+        let mut inner1 = FlussArrayWriter::new(2, &inner_type);
+        inner1.write_int(0, 1);
+        inner1.write_int(1, 2);
+        let inner1_arr = inner1.complete().unwrap();
+
+        let mut inner2 = FlussArrayWriter::new(1, &inner_type);
+        inner2.write_int(0, 99);
+        let inner2_arr = inner2.complete().unwrap();
+
+        // Build outer array
+        let outer_type = DataTypes::array(DataTypes::int());
+        let mut outer_writer = FlussArrayWriter::new(2, &outer_type);
+        outer_writer.write_array(0, &inner1_arr);
+        outer_writer.write_array(1, &inner2_arr);
+        let outer_arr = outer_writer.complete().unwrap();
+
+        writer.write_array(outer_arr.as_bytes());
+
+        let bytes = writer.to_bytes();
+        let row = CompactedRow::from_bytes(&row_type, bytes.as_ref());
+
+        let read_outer = row.get_array(0).unwrap();
+        assert_eq!(read_outer.size(), 2);
+
+        let nested1 = read_outer.get_array(0).unwrap();
+        assert_eq!(nested1.size(), 2);
+        assert_eq!(nested1.get_int(0).unwrap(), 1);
+        assert_eq!(nested1.get_int(1).unwrap(), 2);
+
+        let nested2 = read_outer.get_array(1).unwrap();
+        assert_eq!(nested2.size(), 1);
+        assert_eq!(nested2.get_int(0).unwrap(), 99);
+    }
 }
diff --git a/crates/fluss/src/row/compacted/compacted_row_reader.rs b/crates/fluss/src/row/compacted/compacted_row_reader.rs
index 00e53aa..4ae442f 100644
--- a/crates/fluss/src/row/compacted/compacted_row_reader.rs
+++ b/crates/fluss/src/row/compacted/compacted_row_reader.rs
@@ -18,6 +18,7 @@
 use crate::metadata::RowType;
 use crate::row::compacted::compacted_row::calculate_bit_set_width_in_bytes;
 use crate::{
+    error::{Error::IllegalArgument, Result},
     metadata::DataType,
     row::{Datum, Decimal, GenericRow, compacted::compacted_row_writer::CompactedRowWriter},
     util::varint::{read_unsigned_varint_at, read_unsigned_varint_u64_at},
@@ -49,7 +50,7 @@ impl<'a> CompactedRowDeserializer<'a> {
         self.row_type.as_ref()
     }
 
-    pub fn deserialize(&self, reader: &CompactedRowReader<'a>) -> GenericRow<'a> {
+    pub fn deserialize(&self, reader: &CompactedRowReader<'a>) -> Result<GenericRow<'a>> {
         let mut row = GenericRow::new(self.row_type.fields().len());
         let mut cursor = reader.initial_position();
         for (col_pos, data_field) in self.row_type.fields().iter().enumerate() {
@@ -60,41 +61,41 @@ impl<'a> CompactedRowDeserializer<'a> {
             }
             let (datum, next_cursor) = match dtype {
                 DataType::Boolean(_) => {
-                    let (val, next) = reader.read_boolean(cursor);
+                    let (val, next) = reader.read_boolean(cursor)?;
                     (Datum::Bool(val), next)
                 }
                 DataType::TinyInt(_) => {
-                    let (val, next) = reader.read_byte(cursor);
+                    let (val, next) = reader.read_byte(cursor)?;
                     (Datum::Int8(val as i8), next)
                 }
                 DataType::SmallInt(_) => {
-                    let (val, next) = reader.read_short(cursor);
+                    let (val, next) = reader.read_short(cursor)?;
                     (Datum::Int16(val), next)
                 }
                 DataType::Int(_) => {
-                    let (val, next) = reader.read_int(cursor);
+                    let (val, next) = reader.read_int(cursor)?;
                     (Datum::Int32(val), next)
                 }
                 DataType::BigInt(_) => {
-                    let (val, next) = reader.read_long(cursor);
+                    let (val, next) = reader.read_long(cursor)?;
                     (Datum::Int64(val), next)
                 }
                 DataType::Float(_) => {
-                    let (val, next) = reader.read_float(cursor);
+                    let (val, next) = reader.read_float(cursor)?;
                     (Datum::Float32(val.into()), next)
                 }
                 DataType::Double(_) => {
-                    let (val, next) = reader.read_double(cursor);
+                    let (val, next) = reader.read_double(cursor)?;
                     (Datum::Float64(val.into()), next)
                 }
                 // TODO: use read_char(length) in the future, but need to keep compatibility
                 DataType::Char(_) | DataType::String(_) => {
-                    let (val, next) = reader.read_string(cursor);
+                    let (val, next) = reader.read_string(cursor)?;
                     (Datum::String(val.into()), next)
                 }
                 // TODO: use read_binary(length) in the future, but need to keep compatibility
                 DataType::Bytes(_) | DataType::Binary(_) => {
-                    let (val, next) = reader.read_bytes(cursor);
+                    let (val, next) = reader.read_bytes(cursor)?;
                     (Datum::Blob(val.into()), next)
                 }
                 DataType::Decimal(decimal_type) => {
@@ -102,42 +103,57 @@ impl<'a> CompactedRowDeserializer<'a> {
                     let scale = decimal_type.scale();
                     if Decimal::is_compact_precision(precision) {
                         // Compact: stored as i64
-                        let (val, next) = reader.read_long(cursor);
-                        let decimal = Decimal::from_unscaled_long(val, precision, scale)
-                            .expect("Failed to create decimal from unscaled long");
+                        let (val, next) = reader.read_long(cursor)?;
+                        let decimal =
+                            Decimal::from_unscaled_long(val, precision, scale).map_err(|e| {
+                                IllegalArgument {
+                                    message: format!(
+                                        "Failed to create decimal from unscaled long: {e}"
+                                    ),
+                                }
+                            })?;
                         (Datum::Decimal(decimal), next)
                     } else {
                         // Non-compact: stored as minimal big-endian bytes
-                        let (bytes, next) = reader.read_bytes(cursor);
+                        let (bytes, next) = reader.read_bytes(cursor)?;
                         let decimal = Decimal::from_unscaled_bytes(bytes, precision, scale)
-                            .expect("Failed to create decimal from unscaled bytes");
+                            .map_err(|e| IllegalArgument {
+                                message: format!(
+                                    "Failed to create decimal from unscaled bytes: {e}"
+                                ),
+                            })?;
                         (Datum::Decimal(decimal), next)
                     }
                 }
                 DataType::Date(_) => {
-                    let (val, next) = reader.read_int(cursor);
+                    let (val, next) = reader.read_int(cursor)?;
                     (Datum::Date(crate::row::datum::Date::new(val)), next)
                 }
                 DataType::Time(_) => {
-                    let (val, next) = reader.read_int(cursor);
+                    let (val, next) = reader.read_int(cursor)?;
                     (Datum::Time(crate::row::datum::Time::new(val)), next)
                 }
                 DataType::Timestamp(timestamp_type) => {
                     let precision = timestamp_type.precision();
                     if crate::row::datum::TimestampNtz::is_compact(precision) {
                         // Compact: only milliseconds
-                        let (millis, next) = reader.read_long(cursor);
+                        let (millis, next) = reader.read_long(cursor)?;
                         (
                             Datum::TimestampNtz(crate::row::datum::TimestampNtz::new(millis)),
                             next,
                         )
                     } else {
                         // Non-compact: milliseconds + nanos
-                        let (millis, mid) = reader.read_long(cursor);
-                        let (nanos, next) = reader.read_int(mid);
-                        let timestamp =
-                            crate::row::datum::TimestampNtz::from_millis_nanos(millis, nanos)
-                                .expect("Invalid nano_of_millisecond value in compacted row");
+                        let (millis, mid) = reader.read_long(cursor)?;
+                        let (nanos, next) = reader.read_int(mid)?;
+                        let timestamp = crate::row::datum::TimestampNtz::from_millis_nanos(
+                            millis, nanos,
+                        )
+                        .map_err(|e| IllegalArgument {
+                            message: format!(
+                                "Invalid nano_of_millisecond value in compacted row timestamp: {e}"
+                            ),
+                        })?;
                         (Datum::TimestampNtz(timestamp), next)
                     }
                 }
@@ -145,29 +161,44 @@ impl<'a> CompactedRowDeserializer<'a> {
                     let precision = timestamp_ltz_type.precision();
                     if crate::row::datum::TimestampLtz::is_compact(precision) {
                         // Compact: only epoch milliseconds
-                        let (epoch_millis, next) = reader.read_long(cursor);
+                        let (epoch_millis, next) = reader.read_long(cursor)?;
                         (
                             Datum::TimestampLtz(crate::row::datum::TimestampLtz::new(epoch_millis)),
                             next,
                         )
                     } else {
                         // Non-compact: epoch milliseconds + nanos
-                        let (epoch_millis, mid) = reader.read_long(cursor);
-                        let (nanos, next) = reader.read_int(mid);
-                        let timestamp_ltz =
-                            crate::row::datum::TimestampLtz::from_millis_nanos(epoch_millis, nanos)
-                                .expect("Invalid nano_of_millisecond value in compacted row");
+                        let (epoch_millis, mid) = reader.read_long(cursor)?;
+                        let (nanos, next) = reader.read_int(mid)?;
+                        let timestamp_ltz = crate::row::datum::TimestampLtz::from_millis_nanos(
+                            epoch_millis,
+                            nanos,
+                        )
+                        .map_err(|e| IllegalArgument {
+                            message: format!(
+                                "Invalid nano_of_millisecond value in compacted row timestamp_ltz: {e}"
+                            ),
+                        })?;
                         (Datum::TimestampLtz(timestamp_ltz), next)
                     }
                 }
+                DataType::Array(_) => {
+                    let (bytes, next) = reader.read_bytes(cursor)?;
+                    let array = crate::row::binary_array::FlussArray::from_bytes(bytes)?;
+                    (Datum::Array(array), next)
+                }
                 _ => {
-                    panic!("Unsupported DataType in CompactedRowDeserializer: {dtype:?}");
+                    return Err(IllegalArgument {
+                        message: format!(
+                            "Unsupported DataType in CompactedRowDeserializer: {dtype:?}"
+                        ),
+                    });
                 }
             };
             cursor = next_cursor;
             row.set_field(col_pos, datum);
         }
-        row
+        Ok(row)
     }
 }
 
@@ -202,6 +233,21 @@ impl<'a> CompactedRowReader<'a> {
         self.offset + self.header_size_in_bytes
     }
 
+    fn checked_pos(&self, pos: usize, width: usize, context: &str) -> Result<usize> {
+        let next = pos.checked_add(width).ok_or_else(|| IllegalArgument {
+            message: format!("Overflow while reading {context}: pos={pos}, width={width}"),
+        })?;
+        if next > self.limit {
+            return Err(IllegalArgument {
+                message: format!(
+                    "Out-of-bounds while reading {context}: pos={pos}, width={width}, limit={}",
+                    self.limit
+                ),
+            });
+        }
+        Ok(next)
+    }
+
     pub fn is_null_at(&self, col_pos: usize) -> bool {
         let byte_index = col_pos >> 3;
         let bit = col_pos & 7;
@@ -210,79 +256,73 @@ impl<'a> CompactedRowReader<'a> {
         (self.segment[idx] & (1u8 << bit)) != 0
     }
 
-    pub fn read_boolean(&self, pos: usize) -> (bool, usize) {
-        let (val, next) = self.read_byte(pos);
-        (val != 0, next)
+    pub fn read_boolean(&self, pos: usize) -> Result<(bool, usize)> {
+        let (val, next) = self.read_byte(pos)?;
+        Ok((val != 0, next))
     }
 
-    pub fn read_byte(&self, pos: usize) -> (u8, usize) {
-        debug_assert!(pos < self.limit);
-        (self.segment[pos], pos + 1)
+    pub fn read_byte(&self, pos: usize) -> Result<(u8, usize)> {
+        let next = self.checked_pos(pos, 1, "byte")?;
+        Ok((self.segment[pos], next))
     }
 
-    pub fn read_short(&self, pos: usize) -> (i16, usize) {
-        let next_pos = pos + 2;
-        debug_assert!(next_pos <= self.limit);
-        let bytes_slice = &self.segment[pos..pos + 2];
-        let val = i16::from_ne_bytes(
-            bytes_slice
-                .try_into()
-                .expect("Slice must be exactly 2 bytes long"),
-        );
-        (val, next_pos)
+    pub fn read_short(&self, pos: usize) -> Result<(i16, usize)> {
+        let next_pos = self.checked_pos(pos, 2, "short")?;
+        let mut arr = [0u8; 2];
+        arr.copy_from_slice(&self.segment[pos..next_pos]);
+        Ok((i16::from_ne_bytes(arr), next_pos))
     }
 
-    pub fn read_int(&self, pos: usize) -> (i32, usize) {
+    pub fn read_int(&self, pos: usize) -> Result<(i32, usize)> {
         match read_unsigned_varint_at(self.segment, pos, CompactedRowWriter::MAX_INT_SIZE) {
-            Ok((value, next_pos)) => (value as i32, next_pos),
-            Err(_) => panic!("Invalid VarInt32 input stream."),
+            Ok((value, next_pos)) => Ok((value as i32, next_pos)),
+            Err(e) => Err(IllegalArgument {
+                message: format!("Invalid VarInt32 input stream at pos {pos}: {e}"),
+            }),
         }
     }
 
-    pub fn read_long(&self, pos: usize) -> (i64, usize) {
+    pub fn read_long(&self, pos: usize) -> Result<(i64, usize)> {
         match read_unsigned_varint_u64_at(self.segment, pos, CompactedRowWriter::MAX_LONG_SIZE) {
-            Ok((value, next_pos)) => (value as i64, next_pos),
-            Err(_) => panic!("Invalid VarInt64 input stream."),
+            Ok((value, next_pos)) => Ok((value as i64, next_pos)),
+            Err(e) => Err(IllegalArgument {
+                message: format!("Invalid VarInt64 input stream at pos {pos}: {e}"),
+            }),
         }
     }
 
-    pub fn read_float(&self, pos: usize) -> (f32, usize) {
-        let next_pos = pos + 4;
-        debug_assert!(next_pos <= self.limit);
-        let val = f32::from_ne_bytes(
-            self.segment[pos..pos + 4]
-                .try_into()
-                .expect("Slice must be exactly 4 bytes long"),
-        );
-        (val, next_pos)
+    pub fn read_float(&self, pos: usize) -> Result<(f32, usize)> {
+        let next_pos = self.checked_pos(pos, 4, "float")?;
+        let mut arr = [0u8; 4];
+        arr.copy_from_slice(&self.segment[pos..next_pos]);
+        Ok((f32::from_ne_bytes(arr), next_pos))
     }
 
-    pub fn read_double(&self, pos: usize) -> (f64, usize) {
-        let next_pos = pos + 8;
-        debug_assert!(next_pos <= self.limit);
-        let val = f64::from_ne_bytes(
-            self.segment[pos..pos + 8]
-                .try_into()
-                .expect("Slice must be exactly 8 bytes long"),
-        );
-        (val, next_pos)
+    pub fn read_double(&self, pos: usize) -> Result<(f64, usize)> {
+        let next_pos = self.checked_pos(pos, 8, "double")?;
+        let mut arr = [0u8; 8];
+        arr.copy_from_slice(&self.segment[pos..next_pos]);
+        Ok((f64::from_ne_bytes(arr), next_pos))
     }
 
-    pub fn read_binary(&self, pos: usize) -> (&'a [u8], usize) {
+    pub fn read_binary(&self, pos: usize) -> Result<(&'a [u8], usize)> {
         self.read_bytes(pos)
     }
 
-    pub fn read_bytes(&self, pos: usize) -> (&'a [u8], usize) {
-        let (len, data_pos) = self.read_int(pos);
-        let len = len as usize;
-        let next_pos = data_pos + len;
-        debug_assert!(next_pos <= self.limit);
-        (&self.segment[data_pos..next_pos], next_pos)
+    pub fn read_bytes(&self, pos: usize) -> Result<(&'a [u8], usize)> {
+        let (len, data_pos) = self.read_int(pos)?;
+        let len = usize::try_from(len).map_err(|_| IllegalArgument {
+            message: format!("Negative length while reading bytes at pos {pos}: {len}"),
+        })?;
+        let next_pos = self.checked_pos(data_pos, len, "bytes payload")?;
+        Ok((&self.segment[data_pos..next_pos], next_pos))
     }
 
-    pub fn read_string(&self, pos: usize) -> (&'a str, usize) {
-        let (bytes, next_pos) = self.read_bytes(pos);
-        let s = from_utf8(bytes).expect("Invalid UTF-8 when reading string");
-        (s, next_pos)
+    pub fn read_string(&self, pos: usize) -> Result<(&'a str, usize)> {
+        let (bytes, next_pos) = self.read_bytes(pos)?;
+        let s = from_utf8(bytes).map_err(|e| IllegalArgument {
+            message: format!("Invalid UTF-8 when reading string at pos {pos}: {e}"),
+        })?;
+        Ok((s, next_pos))
     }
 }
diff --git a/crates/fluss/src/row/compacted/compacted_row_writer.rs b/crates/fluss/src/row/compacted/compacted_row_writer.rs
index ac0100e..3627174 100644
--- a/crates/fluss/src/row/compacted/compacted_row_writer.rs
+++ b/crates/fluss/src/row/compacted/compacted_row_writer.rs
@@ -165,6 +165,10 @@ impl BinaryWriter for CompactedRowWriter {
         self.write_bytes(&bytes[..length.min(bytes.len())])
     }
 
+    fn write_array(&mut self, value: &[u8]) {
+        self.write_bytes(value)
+    }
+
     fn complete(&mut self) {
         // do nothing
     }
diff --git a/crates/fluss/src/row/datum.rs b/crates/fluss/src/row/datum.rs
index 9b2e80a..2f1d183 100644
--- a/crates/fluss/src/row/datum.rs
+++ b/crates/fluss/src/row/datum.rs
@@ -18,12 +18,14 @@
 use crate::error::Error::RowConvertError;
 use crate::error::Result;
 use crate::row::Decimal;
+use crate::row::binary_array::FlussArray;
 use arrow::array::{
     ArrayBuilder, BinaryBuilder, BooleanBuilder, Date32Builder, Decimal128Builder,
     FixedSizeBinaryBuilder, Float32Builder, Float64Builder, Int8Builder, Int16Builder,
-    Int32Builder, Int64Builder, StringBuilder, Time32MillisecondBuilder, Time32SecondBuilder,
-    Time64MicrosecondBuilder, Time64NanosecondBuilder, TimestampMicrosecondBuilder,
-    TimestampMillisecondBuilder, TimestampNanosecondBuilder, TimestampSecondBuilder,
+    Int32Builder, Int64Builder, ListBuilder, StringBuilder, Time32MillisecondBuilder,
+    Time32SecondBuilder, Time64MicrosecondBuilder, Time64NanosecondBuilder,
+    TimestampMicrosecondBuilder, TimestampMillisecondBuilder, TimestampNanosecondBuilder,
+    TimestampSecondBuilder,
 };
 use arrow::datatypes as arrow_schema;
 use arrow::error::ArrowError;
@@ -68,6 +70,8 @@ pub enum Datum<'a> {
     TimestampNtz(TimestampNtz),
     #[display("{0}")]
     TimestampLtz(TimestampLtz),
+    #[display("{0}")]
+    Array(FlussArray),
 }
 
 impl Datum<'_> {
@@ -123,6 +127,13 @@ impl Datum<'_> {
             _ => panic!("not a timestamp ltz: {self:?}"),
         }
     }
+
+    pub fn as_array(&self) -> &FlussArray {
+        match self {
+            Self::Array(a) => a,
+            _ => panic!("not an array: {self:?}"),
+        }
+    }
 }
 
 // ----------- implement from
@@ -388,6 +399,13 @@ impl<'a> From<TimestampLtz> for Datum<'a> {
     }
 }
 
+impl<'a> From<FlussArray> for Datum<'a> {
+    #[inline]
+    fn from(arr: FlussArray) -> Datum<'a> {
+        Datum::Array(arr)
+    }
+}
+
 pub trait ToArrow {
     fn append_to(
         &self,
@@ -494,6 +512,89 @@ impl AppendResult for std::result::Result<(), ArrowError> {
     }
 }
 
+fn append_fluss_array_to_list_builder(
+    arr: &FlussArray,
+    builder: &mut dyn ArrayBuilder,
+    data_type: &arrow_schema::DataType,
+) -> Result<()> {
+    use crate::record::from_arrow_type;
+
+    let list_builder = builder
+        .as_any_mut()
+        .downcast_mut::<ListBuilder<Box<dyn ArrayBuilder>>>()
+        .ok_or_else(|| RowConvertError {
+            message: "Builder type mismatch for Array: expected ListBuilder".to_string(),
+        })?;
+
+    let element_arrow_type = match data_type {
+        arrow_schema::DataType::List(field) => field.data_type().clone(),
+        _ => {
+            return Err(RowConvertError {
+                message: format!("Expected List Arrow type for Array datum, got: {data_type:?}"),
+            });
+        }
+    };
+
+    let element_fluss_type = from_arrow_type(&element_arrow_type)?;
+    let values_builder = list_builder.values();
+
+    for i in 0..arr.size() {
+        if arr.is_null_at(i) {
+            // TODO: Datum::Null triggers a chain of downcast attempts in append_to.
+            // For sparse arrays with many nulls, call append_null directly on the
+            // typed inner builder to avoid the overhead.
+            let null_datum = Datum::Null;
+            null_datum.append_to(values_builder, &element_arrow_type)?;
+        } else {
+            let datum = read_datum_from_fluss_array(arr, i, &element_fluss_type)?;
+            datum.append_to(values_builder, &element_arrow_type)?;
+        }
+    }
+    list_builder.append(true);
+    Ok(())
+}
+
+fn read_datum_from_fluss_array<'a>(
+    arr: &FlussArray,
+    pos: usize,
+    element_type: &crate::metadata::DataType,
+) -> Result<Datum<'a>> {
+    use crate::metadata::DataType;
+
+    Ok(match element_type {
+        DataType::Boolean(_) => Datum::Bool(arr.get_boolean(pos)?),
+        DataType::TinyInt(_) => Datum::Int8(arr.get_byte(pos)?),
+        DataType::SmallInt(_) => Datum::Int16(arr.get_short(pos)?),
+        DataType::Int(_) => Datum::Int32(arr.get_int(pos)?),
+        DataType::BigInt(_) => Datum::Int64(arr.get_long(pos)?),
+        DataType::Float(_) => Datum::Float32(arr.get_float(pos)?.into()),
+        DataType::Double(_) => Datum::Float64(arr.get_double(pos)?.into()),
+        DataType::Char(_) | DataType::String(_) => {
+            Datum::String(Cow::Owned(arr.get_string(pos)?.to_string()))
+        }
+        DataType::Binary(_) | DataType::Bytes(_) => {
+            Datum::Blob(Cow::Owned(arr.get_binary(pos)?.to_vec()))
+        }
+        DataType::Decimal(dt) => {
+            Datum::Decimal(arr.get_decimal(pos, dt.precision(), dt.scale())?)
+        }
+        DataType::Date(_) => Datum::Date(arr.get_date(pos)?),
+        DataType::Time(_) => Datum::Time(arr.get_time(pos)?),
+        DataType::Timestamp(t) => Datum::TimestampNtz(arr.get_timestamp_ntz(pos, t.precision())?),
+        DataType::TimestampLTz(t) => {
+            Datum::TimestampLtz(arr.get_timestamp_ltz(pos, t.precision())?)
+        }
+        DataType::Array(_) => Datum::Array(arr.get_array(pos)?),
+        _ => {
+            return Err(RowConvertError {
+                message: format!(
+                    "Unsupported element type for FlussArray → Arrow conversion: {element_type:?}"
+                ),
+            });
+        }
+    })
+}
+
 impl Datum<'_> {
     pub fn append_to(
         &self,
@@ -540,6 +641,18 @@ impl Datum<'_> {
                 append_null_to_arrow!(TimestampMillisecondBuilder);
                 append_null_to_arrow!(TimestampMicrosecondBuilder);
                 append_null_to_arrow!(TimestampNanosecondBuilder);
+                if let arrow_schema::DataType::List(_) = data_type {
+                    let b = builder
+                        .as_any_mut()
+                        .downcast_mut::<ListBuilder<Box<dyn ArrayBuilder>>>()
+                        .ok_or_else(|| RowConvertError {
+                            message:
+                                "Expected ListBuilder<Box<dyn ArrayBuilder>> for List Arrow type"
+                                    .to_string(),
+                        })?;
+                    b.append_null();
+                    return Ok(());
+                }
             }
             Datum::Bool(v) => append_value_to_arrow!(BooleanBuilder, *v),
             Datum::Int8(v) => append_value_to_arrow!(Int8Builder, *v),
@@ -742,6 +855,9 @@ impl Datum<'_> {
                     message: "Builder type mismatch for TimestampLtz".to_string(),
                 });
             }
+            Datum::Array(arr) => {
+                return append_fluss_array_to_list_builder(arr, builder, data_type);
+            }
         }
 
         Err(RowConvertError {
diff --git a/crates/fluss/src/row/encode/compacted_key_encoder.rs b/crates/fluss/src/row/encode/compacted_key_encoder.rs
index d201450..877b3ec 100644
--- a/crates/fluss/src/row/encode/compacted_key_encoder.rs
+++ b/crates/fluss/src/row/encode/compacted_key_encoder.rs
@@ -64,8 +64,12 @@ impl CompactedKeyEncoder {
 
         for pos in &encode_field_pos {
             let data_type = row_type.fields().get(*pos).unwrap().data_type();
-            field_getters.push(FieldGetter::create(data_type, *pos));
-            field_encoders.push(CompactedKeyWriter::create_value_writer(data_type)?);
+            // Validate key type support first, so unsupported types return a
+            // typed error instead of panicking in FieldGetter::create.
+            let field_encoder = CompactedKeyWriter::create_value_writer(data_type)?;
+            let field_getter = FieldGetter::create(data_type, *pos);
+            field_getters.push(field_getter);
+            field_encoders.push(field_encoder);
         }
 
         Ok(CompactedKeyEncoder {
@@ -82,18 +86,19 @@ impl KeyEncoder for CompactedKeyEncoder {
         self.compacted_encoder.reset();
 
         // iterate all the fields of the row, and encode each field
-        for (pos, field_getter) in self.field_getters.iter().enumerate() {
+        for (pos, (field_getter, field_encoder)) in self
+            .field_getters
+            .iter()
+            .zip(self.field_encoders.iter())
+            .enumerate()
+        {
             match &field_getter.get_field(row)? {
                 Datum::Null => {
                     return Err(IllegalArgument {
                         message: format!("Cannot encode key with null value at position: {pos:?}"),
                     });
                 }
-                value => self.field_encoders.get(pos).unwrap().write_value(
-                    &mut self.compacted_encoder,
-                    pos,
-                    value,
-                )?,
+                value => field_encoder.write_value(&mut self.compacted_encoder, pos, value)?,
             }
         }
 
@@ -105,8 +110,55 @@ impl KeyEncoder for CompactedKeyEncoder {
 mod tests {
     use super::*;
     use crate::metadata::DataTypes;
+    use crate::row::binary_array::FlussArrayWriter;
     use crate::row::{Datum, GenericRow};
 
+    fn build_int_array(values: &[i32]) -> crate::row::FlussArray {
+        let mut w = FlussArrayWriter::new(values.len(), &DataTypes::int());
+        for (i, v) in values.iter().enumerate() {
+            w.write_int(i, *v);
+        }
+        w.complete().unwrap()
+    }
+
+    fn build_nullable_int_array(values: &[Option<i32>]) -> 
crate::row::FlussArray {
+        let mut w = FlussArrayWriter::new(values.len(), &DataTypes::int());
+        for (i, v) in values.iter().enumerate() {
+            match v {
+                Some(value) => w.write_int(i, *value),
+                None => w.set_null_at(i),
+            }
+        }
+        w.complete().unwrap()
+    }
+
+    fn build_float_array(values: &[f32]) -> crate::row::FlussArray {
+        let mut w = FlussArrayWriter::new(values.len(), 
&DataTypes::float().as_non_nullable());
+        for (i, v) in values.iter().enumerate() {
+            w.write_float(i, *v);
+        }
+        w.complete().unwrap()
+    }
+
+    fn build_nested_string_array() -> crate::row::FlussArray {
+        let mut inner_1 = FlussArrayWriter::new(3, &DataTypes::string());
+        inner_1.write_string(0, "a");
+        inner_1.set_null_at(1);
+        inner_1.write_string(2, "c");
+        let inner_1 = inner_1.complete().unwrap();
+
+        let mut inner_2 = FlussArrayWriter::new(2, &DataTypes::string());
+        inner_2.write_string(0, "hello");
+        inner_2.write_string(1, "world");
+        let inner_2 = inner_2.complete().unwrap();
+
+        let mut outer = FlussArrayWriter::new(3, 
&DataTypes::array(DataTypes::string()));
+        outer.write_array(0, &inner_1);
+        outer.set_null_at(1);
+        outer.write_array(2, &inner_2);
+        outer.complete().unwrap()
+    }
+
     pub fn for_test_row_type(row_type: &RowType) -> CompactedKeyEncoder {
         CompactedKeyEncoder::new(row_type, 
(0..row_type.fields().len()).collect())
             .expect("CompactedKeyEncoder initialization failed")
@@ -237,6 +289,51 @@ mod tests {
         );
     }
 
+    #[test]
+    fn test_array_type_allowed_as_key() {
+        // Java's CompactedKeyEncoder allows Array as a key column type
+        // (the server rejects unsupported key types at table-creation time).
+        let row_type =
+            RowType::with_data_types(vec![DataTypes::int(), 
DataTypes::array(DataTypes::int())]);
+        let mut encoder = CompactedKeyEncoder::new(&row_type, vec![0, 
1]).unwrap();
+
+        let row_a = GenericRow::from_data(vec![
+            Datum::Int32(42),
+            Datum::Array(build_int_array(&[10, 20])),
+        ]);
+        let row_b = GenericRow::from_data(vec![
+            Datum::Int32(42),
+            Datum::Array(build_int_array(&[10, 30])),
+        ]);
+
+        let encoded_a = encoder.encode_key(&row_a).unwrap();
+        let encoded_b = encoder.encode_key(&row_b).unwrap();
+
+        assert!(!encoded_a.is_empty());
+        assert_ne!(
+            encoded_a.iter().as_slice(),
+            encoded_b.iter().as_slice(),
+            "Array key payload should affect compacted key encoding"
+        );
+    }
+
+    #[test]
+    fn test_map_type_rejected_as_key() {
+        let row_type = RowType::with_data_types(vec![
+            DataTypes::int(),
+            DataTypes::map(DataTypes::int(), DataTypes::string()),
+        ]);
+        match CompactedKeyEncoder::new(&row_type, vec![0, 1]) {
+            Ok(_) => panic!("Expected error when using Map as key type"),
+            Err(err) => {
+                assert!(
+                    err.to_string().contains("Cannot use"),
+                    "Expected 'Cannot use' error, got: {err}"
+                );
+            }
+        }
+    }
+
     #[test]
     fn test_all_data_types_java_compatible() {
         // Test encoding compatibility with Java using reference from:
@@ -263,9 +360,11 @@ mod tests {
             DataType::Timestamp(TimestampType::with_nullable(false, 
5).unwrap()), // TIMESTAMP(5)
             DataType::TimestampLTz(TimestampLTzType::with_nullable(false, 
1).unwrap()), // TIMESTAMP_LTZ(1)
             DataType::TimestampLTz(TimestampLTzType::with_nullable(false, 
5).unwrap()), // TIMESTAMP_LTZ(5)
-                                                                               
         // TODO: Add support for ARRAY type
-                                                                               
         // TODO: Add support for MAP type
-                                                                               
         // TODO: Add support for ROW type
+            DataTypes::array(DataTypes::int()), // ARRAY<INT>
+            DataTypes::array(DataTypes::float().as_non_nullable()), // 
ARRAY<FLOAT NOT NULL>
+            DataTypes::array(DataTypes::array(DataTypes::string())), // 
ARRAY<ARRAY<STRING>>
+                                                // TODO: Add support for MAP 
type
+                                                // TODO: Add support for ROW 
type
         ]);
 
         // Exact values from Java's IndexedRowTest.genRecordForAllTypes()
@@ -296,6 +395,26 @@ mod tests {
             
Datum::TimestampNtz(crate::row::datum::TimestampNtz::new(1698235273182)), // 
TIMESTAMP(5)
             
Datum::TimestampLtz(crate::row::datum::TimestampLtz::new(1698235273182)), // 
TIMESTAMP_LTZ(1)
             
Datum::TimestampLtz(crate::row::datum::TimestampLtz::new(1698235273182)), // 
TIMESTAMP_LTZ(5)
+            Datum::Array(build_nullable_int_array(&[
+                Some(1),
+                Some(2),
+                Some(3),
+                Some(4),
+                Some(5),
+                Some(-11),
+                None,
+                Some(444),
+                Some(102234),
+            ])), // ARRAY<INT>: GenericArray.of(1, 2, 3, 4, 5, -11, null, 444, 
102234)
+            Datum::Array(build_float_array(&[
+                0.1_f32,
+                1.1_f32,
+                -0.5_f32,
+                6.6_f32,
+                f32::MAX,
+                f32::from_bits(1),
+            ])), // ARRAY<FLOAT NOT NULL>: GenericArray.of(0.1f, 1.1f, -0.5f, 
6.6f, MAX, MIN)
+            Datum::Array(build_nested_string_array()), // ARRAY<ARRAY<STRING>>
         ]);
 
         // Expected bytes from Java's encoded_key.hex reference file
@@ -339,6 +458,25 @@ mod tests {
             0xDE, 0x9F, 0xD7, 0xB5, 0xB6, 0x31,
             // TIMESTAMP_LTZ(5): 1698235273182
             0xDE, 0x9F, 0xD7, 0xB5, 0xB6, 0x31, 0x00,
+            // ARRAY<INT>: GenericArray.of(1, 2, 3, 4, 5, -11, null, 444, 
102234)
+            0x30, 0x09, 0x00, 0x00, 0x00, 0x40, 0x00, 0x00, 0x00, 0x01, 0x00, 
0x00,
+            0x00, 0x02, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x04, 0x00, 
0x00,
+            0x00, 0x05, 0x00, 0x00, 0x00, 0xF5, 0xFF, 0xFF, 0xFF, 0x00, 0x00, 
0x00,
+            0x00, 0xBC, 0x01, 0x00, 0x00, 0x5A, 0x8F, 0x01, 0x00, 0x00, 0x00, 
0x00,
+            0x00,
+            // ARRAY<FLOAT NOT NULL>: GenericArray.of(0.1f, 1.1f, -0.5f, 6.6f, 
MAX, MIN)
+            0x20, 0x06, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xCD, 0xCC, 
0xCC,
+            0x3D, 0xCD, 0xCC, 0x8C, 0x3F, 0x00, 0x00, 0x00, 0xBF, 0x33, 0x33, 
0xD3,
+            0x40, 0xFF, 0xFF, 0x7F, 0x7F, 0x01, 0x00, 0x00, 0x00,
+            // ARRAY<ARRAY<STRING>>
+            0x58, 0x03, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x20, 0x00, 
0x00,
+            0x00, 0x20, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 
0x00,
+            0x00, 0x18, 0x00, 0x00, 0x00, 0x40, 0x00, 0x00, 0x00, 0x03, 0x00, 
0x00,
+            0x00, 0x02, 0x00, 0x00, 0x00, 0x61, 0x00, 0x00, 0x00, 0x00, 0x00, 
0x00,
+            0x81, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x63, 0x00, 
0x00,
+            0x00, 0x00, 0x00, 0x00, 0x81, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 
0x00,
+            0x00, 0x68, 0x65, 0x6C, 0x6C, 0x6F, 0x00, 0x00, 0x85, 0x77, 0x6F, 
0x72,
+            0x6C, 0x64, 0x00, 0x00, 0x85,
         ];
 
         let mut encoder = for_test_row_type(&row_type);
diff --git a/crates/fluss/src/row/field_getter.rs 
b/crates/fluss/src/row/field_getter.rs
index d6b9fc9..69e0860 100644
--- a/crates/fluss/src/row/field_getter.rs
+++ b/crates/fluss/src/row/field_getter.rs
@@ -82,6 +82,8 @@ impl FieldGetter {
                 pos,
                 precision: t.precision(),
             },
+            // TODO: add Map and Row variants when get_map/get_row are 
available in InternalRow.
+            DataType::Array(_) => InnerFieldGetter::Array { pos },
             _ => unimplemented!("DataType {:?} is currently unimplemented", 
data_type),
         };
 
@@ -149,6 +151,9 @@ pub enum InnerFieldGetter {
         pos: usize,
         precision: u32,
     },
+    Array {
+        pos: usize,
+    },
 }
 
 impl InnerFieldGetter {
@@ -177,7 +182,9 @@ impl InnerFieldGetter {
             }
             InnerFieldGetter::TimestampLtz { pos, precision } => {
                 Datum::TimestampLtz(row.get_timestamp_ltz(*pos, *precision)?)
-            } //TODO Array, Map, Row
+            }
+            // TODO: add Map and Row field getter support once their binary 
forms are implemented.
+            InnerFieldGetter::Array { pos } => 
Datum::Array(row.get_array(*pos)?),
         })
     }
 
@@ -198,7 +205,51 @@ impl InnerFieldGetter {
             | Self::Date { pos }
             | Self::Time { pos }
             | Self::Timestamp { pos, .. }
-            | Self::TimestampLtz { pos, .. } => *pos,
+            | Self::TimestampLtz { pos, .. }
+            | Self::Array { pos } => *pos,
         }
     }
 }
+
#[cfg(test)]
mod tests {
    use super::*;
    use crate::metadata::DataTypes;
    use crate::row::GenericRow;
    use crate::row::binary_array::FlussArrayWriter;

    /// An ARRAY<INT> column read back through `FieldGetter` must yield the
    /// same elements that were written.
    #[test]
    fn test_field_getter_array() {
        let elem_type = DataTypes::int();
        let mut arr_writer = FlussArrayWriter::new(2, &elem_type);
        arr_writer.write_int(0, 10);
        arr_writer.write_int(1, 20);
        let arr = arr_writer.complete().unwrap();

        let mut row = GenericRow::new(2);
        row.set_field(0, Datum::Int32(42));
        // `arr` is not used after this point, so move it into the row
        // directly instead of cloning it.
        row.set_field(1, Datum::Array(arr));

        let getter = FieldGetter::create(&DataTypes::array(DataTypes::int()), 1);
        let datum = getter.get_field(&row).unwrap();

        match datum {
            Datum::Array(a) => {
                assert_eq!(a.size(), 2);
                assert_eq!(a.get_int(0).unwrap(), 10);
                assert_eq!(a.get_int(1).unwrap(), 20);
            }
            _ => panic!("Expected Array datum"),
        }
    }

    /// A null value in a nullable ARRAY column must surface as `Datum::Null`.
    #[test]
    fn test_field_getter_nullable_array() {
        let row = GenericRow::from_data(vec![Datum::Null]);

        let data_type = DataTypes::array(DataTypes::int());
        let getter = FieldGetter::create(&data_type, 0);
        let datum = getter.get_field(&row).unwrap();
        assert!(datum.is_null());
    }
}
diff --git a/crates/fluss/src/row/mod.rs b/crates/fluss/src/row/mod.rs
index ef99ba2..359a9a5 100644
--- a/crates/fluss/src/row/mod.rs
+++ b/crates/fluss/src/row/mod.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+pub mod binary_array;
 mod column;
 
 pub(crate) mod datum;
@@ -28,6 +29,7 @@ pub mod field_getter;
 mod row_decoder;
 
 use crate::client::WriteFormat;
+pub use binary_array::FlussArray;
 use bytes::Bytes;
 pub use column::*;
 pub use compacted::CompactedRow;
@@ -119,6 +121,9 @@ pub trait InternalRow: Send + Sync {
     /// Returns the binary value at the given position
     fn get_bytes(&self, pos: usize) -> Result<&[u8]>;
 
+    /// Returns the array value at the given position
+    fn get_array(&self, pos: usize) -> Result<FlussArray>;
+
     /// Returns encoded bytes if already encoded
     fn as_encoded_bytes(&self, _write_format: WriteFormat) -> Option<&[u8]> {
         None
@@ -274,6 +279,15 @@ impl<'a> InternalRow for GenericRow<'a> {
             }),
         }
     }
+
    // Typed getter for ARRAY columns: clones out the owned array payload.
    // Any non-array datum at `pos` — including Datum::Null — falls through
    // to the error arm, mirroring the other typed getters on GenericRow.
    fn get_array(&self, pos: usize) -> Result<FlussArray> {
        match self.get_value(pos)? {
            Datum::Array(a) => Ok(a.clone()),
            other => Err(IllegalArgument {
                message: format!("type mismatch at position {pos}: expected Array, got {other:?}"),
            }),
        }
    }
 }
 
 impl<'a> GenericRow<'a> {
diff --git a/website/docs/user-guide/rust/api-reference.md 
b/website/docs/user-guide/rust/api-reference.md
index a4befa5..15a62c1 100644
--- a/website/docs/user-guide/rust/api-reference.md
+++ b/website/docs/user-guide/rust/api-reference.md
@@ -407,6 +407,19 @@ Implements the `InternalRow` trait (see below).
 | `fn get_bytes(&self, idx: usize) -> Result<&[u8]>`                           
          | Get bytes value                         |
 | `fn get_binary(&self, idx: usize, length: usize) -> Result<&[u8]>`           
          | Get fixed-length binary value           |
 | `fn get_char(&self, idx: usize, length: usize) -> Result<&str>`              
          | Get fixed-length char value             |
+| `fn get_array(&self, idx: usize) -> Result<FlussArray>`                      
          | Get array value                         |
+
+## `FlussArray`
+
+`FlussArray` is the Rust row representation for `ARRAY` values. You usually 
obtain it from `InternalRow::get_array()`.
+
+| Method | Description |
+|--------|-------------|
+| `fn size(&self) -> usize` | Number of elements in the array |
+| `fn is_null_at(&self, pos: usize) -> bool` | Check whether an element is 
null |
+| `fn as_bytes(&self) -> &[u8]` | Get encoded bytes of the array |
+
+Element getters mirror `InternalRow` typed getters and return `Result<T>`. For 
example, use `get_int()`, `get_long()`, and `get_double()` for primitive 
elements, and `get_string()`, `get_binary()`, `get_decimal()`, 
`get_timestamp_ntz()`, `get_timestamp_ltz()`, and `get_array()` for 
variable-length or nested elements.
 
 ## `ChangeType`
 
diff --git a/website/docs/user-guide/rust/data-types.md 
b/website/docs/user-guide/rust/data-types.md
index 143fe34..63b7fa6 100644
--- a/website/docs/user-guide/rust/data-types.md
+++ b/website/docs/user-guide/rust/data-types.md
@@ -21,6 +21,7 @@ sidebar_position: 3
 | `TIMESTAMP_LTZ` | `TimestampLtz` | `get_timestamp_ltz(idx, precision)`  | 
`set_field(idx, TimestampLtz)` |
 | `BYTES`         | `&[u8]`        | `get_bytes()`                        | 
`set_field(idx, &[u8])`        |
 | `BINARY(n)`     | `&[u8]`        | `get_binary(idx, length)`            | 
`set_field(idx, &[u8])`        |
+| `ARRAY<T>`      | `FlussArray`   | `get_array()`                        | 
`set_field(idx, FlussArray)`   |
 
 ## Constructing Special Types
 
@@ -59,6 +60,29 @@ let data: Vec<Datum> = vec![1i32.into(), "hello".into(), 
Datum::Null];
 let row = GenericRow::from_data(data);
 ```
 
+## Arrays
+
+Use `DataTypes::array(element_type)` in schema definitions. At runtime, read 
arrays with `row.get_array(idx)?`.
+
+To construct array values for writes, build a `FlussArray` and wrap it with 
`Datum::Array`:
+
+```rust
+use fluss::metadata::DataTypes;
+use fluss::row::binary_array::FlussArrayWriter;
+use fluss::row::{Datum, GenericRow};
+
+let mut writer = FlussArrayWriter::new(3, &DataTypes::int());
+writer.write_int(0, 10);
+writer.write_int(1, 20);
+writer.set_null_at(2);
+let arr = writer.complete()?;
+
+let mut row = GenericRow::new(1);
+row.set_field(0, Datum::Array(arr));
+```
+
+`ARRAY` is supported for row values and nested row fields. For key encoding, 
Rust follows Java parity: `ARRAY` can be encoded by the compacted key encoder, 
while table-level key constraints are validated by the server (which may reject 
unsupported key types).
+
 ## Reading Row Data
 
 ```rust

Reply via email to