This is an automated email from the ASF dual-hosted git repository.

ipolyzos pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 2f8cadad4 [fluss-client] Support Complex Data Types on the Java Typed 
API (Map and Array) (#2814)
2f8cadad4 is described below

commit 2f8cadad425a95533be79a8d7f36356993ef7b9f
Author: nhuantho <[email protected]>
AuthorDate: Wed Mar 11 19:58:39 2026 +0700

    [fluss-client] Support Complex Data Types on the Java Typed API (Map and 
Array) (#2814)
    
    * [fluss-client] Add pojo array to fluss array
    
    * [fluss-client] Add pojo map to fluss map
    
    * [fluss-client] Create shared utilities for POJO type and Fluss type
    
    * [fluss-client] Create shared utilities for Fluss type to Pojo type
    
    * [fluss-client] Add Fluss array to Pojo array
    
    * [fluss-client] Add Fluss map to Pojo map
    
    * [ci] Fix Compile Java 8
    
    * Revert _partial_config.mdx
    
    * [fluss-client] Hanle primitive arrays
    
    * [fluss-client] Refactor PojoMapToFlussMap to follow the same conversion 
logic as PojoArrayToFlussArray
    
    * [fluss-client] Change validateCompatibility logic for MAP and ARRAY types
    
    * small fixes and add tests to FlussTypedClient
    
    * fix checkstyle
    
    * fix checkstyle
    
    ---------
    
    Co-authored-by: ipolyzos <[email protected]>
---
 .../fluss/client/converter/ConverterCommons.java   |  32 +-
 .../client/converter/FlussArrayToPojoArray.java    | 184 +++++++++
 .../fluss/client/converter/FlussMapToPojoMap.java  |  69 ++++
 .../converter/FlussTypeToPojoTypeConverter.java    | 122 ++++++
 .../client/converter/PojoArrayToFlussArray.java    |  99 +++++
 .../fluss/client/converter/PojoMapToFlussMap.java  |  58 +++
 .../fluss/client/converter/PojoToRowConverter.java | 213 ++--------
 ...rter.java => PojoTypeToFlussTypeConverter.java} | 236 +++++------
 .../fluss/client/converter/RowToPojoConverter.java | 179 +++------
 .../client/converter/ConvertersTestFixtures.java   |  29 +-
 .../converter/FlussArrayToPojoArrayTest.java       | 431 ++++++++++++++++++++
 .../converter/PojoArrayToFlussArrayTest.java       | 437 +++++++++++++++++++++
 .../client/converter/PojoToRowConverterTest.java   |  67 +++-
 .../client/converter/RowToPojoConverterTest.java   | 107 ++++-
 .../fluss/client/table/FlussTypedClientITCase.java | 337 ++++++++++++++++
 15 files changed, 2124 insertions(+), 476 deletions(-)

diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/converter/ConverterCommons.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/converter/ConverterCommons.java
index aeb36419a..dbc883f36 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/converter/ConverterCommons.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/converter/ConverterCommons.java
@@ -24,14 +24,13 @@ import org.apache.fluss.types.RowType;
 
 import java.math.BigDecimal;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.EnumMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import static 
org.apache.fluss.client.converter.RowToPojoConverter.charLengthExceptionMessage;
-
 /**
  * Internal shared utilities for POJO and Fluss InternalRow conversions.
  *
@@ -112,8 +111,29 @@ final class ConverterCommons {
     }
 
     static void validateCompatibility(DataType fieldType, PojoType.Property 
prop) {
-        Set<Class<?>> supported = SUPPORTED_TYPES.get(fieldType.getTypeRoot());
+        DataTypeRoot typeRoot = fieldType.getTypeRoot();
         Class<?> actual = prop.type;
+        if (typeRoot == DataTypeRoot.ARRAY) {
+            if (!actual.isArray() && 
!Collection.class.isAssignableFrom(actual)) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "Field '%s' must be an array or Collection for 
ARRAY type, got %s",
+                                prop.name, actual.getName()));
+            }
+            return;
+        }
+
+        if (typeRoot == DataTypeRoot.MAP) {
+            if (!Map.class.isAssignableFrom(actual)) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "Field '%s' must be a Map for MAP type, got 
%s",
+                                prop.name, actual.getName()));
+            }
+            return;
+        }
+
+        Set<Class<?>> supported = SUPPORTED_TYPES.get(fieldType.getTypeRoot());
         if (supported == null) {
             throw new UnsupportedOperationException(
                     String.format(
@@ -128,6 +148,12 @@ final class ConverterCommons {
         }
     }
 
+    public static String charLengthExceptionMessage(String fieldName, int 
length) {
+        return String.format(
+                "Field %s expects exactly one character for CHAR type, got 
length %d.",
+                fieldName, length);
+    }
+
     static BinaryString toBinaryStringForText(Object v, String fieldName, 
DataTypeRoot root) {
         final String s = String.valueOf(v);
         if (root == DataTypeRoot.CHAR && s.length() != 1) {
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/converter/FlussArrayToPojoArray.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/converter/FlussArrayToPojoArray.java
new file mode 100644
index 000000000..cc9efd31f
--- /dev/null
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/converter/FlussArrayToPojoArray.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.converter;
+
+import org.apache.fluss.row.Decimal;
+import org.apache.fluss.row.InternalArray;
+import org.apache.fluss.types.ArrayType;
+import org.apache.fluss.types.DataType;
+import org.apache.fluss.types.DataTypeChecks;
+import org.apache.fluss.types.DecimalType;
+import org.apache.fluss.types.MapType;
+
+import java.lang.reflect.Array;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+
+/** Adapter class for converting Fluss InternalArray to Pojo Array. */
+public class FlussArrayToPojoArray {
+    private final InternalArray flussArray;
+    private final String fieldName;
+    private final Class<?> pojoType;
+    /** Pre-compiled per-element converter; avoids repeating the type-switch 
on every element. */
+    private final ElementConverter elementConverter;
+
+    public FlussArrayToPojoArray(
+            InternalArray flussArray, DataType elementType, String fieldName, 
Class<?> pojoType) {
+        this.flussArray = flussArray;
+        this.fieldName = fieldName;
+        this.pojoType = pojoType != null ? pojoType : Object.class;
+        this.elementConverter = buildElementConverter(elementType, fieldName, 
this.pojoType);
+    }
+
+    public Object convertArray() {
+        if (flussArray == null) {
+            return null;
+        }
+
+        int size = flussArray.size();
+        Object result = Array.newInstance(pojoType, size);
+        for (int i = 0; i < size; i++) {
+            Object element = convertElementValue(i);
+            if (element == null && pojoType.isPrimitive()) {
+                throw new NullPointerException(
+                        String.format(
+                                "Field '%s': cannot store null into a 
primitive array of type %s[]."
+                                        + " Use a boxed type (e.g. Integer[]) 
or ensure all elements are non-null.",
+                                fieldName, pojoType.getName()));
+            }
+            Array.set(result, i, element);
+        }
+        return result;
+    }
+
+    /** Converts the Fluss array to a {@link List}, preserving null elements. 
*/
+    List<Object> convertList() {
+        if (flussArray == null) {
+            return null;
+        }
+        int size = flussArray.size();
+        List<Object> result = new ArrayList<>(size);
+        for (int i = 0; i < size; i++) {
+            result.add(convertElementValue(i));
+        }
+        return result;
+    }
+
+    Object convertElementValue(int index) {
+        if (flussArray.isNullAt(index)) {
+            return null;
+        }
+        return elementConverter.convert(flussArray, index);
+    }
+
+    /**
+     * Resolves the element type once and returns a pre-compiled converter. 
This avoids repeating
+     * the type dispatch (getTypeRoot() switch + casts) on every array element.
+     */
+    private static ElementConverter buildElementConverter(
+            DataType elementType, String fieldName, Class<?> pojoType) {
+        switch (elementType.getTypeRoot()) {
+            case BOOLEAN:
+                return InternalArray::getBoolean;
+            case TINYINT:
+                return InternalArray::getByte;
+            case SMALLINT:
+                return InternalArray::getShort;
+            case INTEGER:
+                return InternalArray::getInt;
+            case BIGINT:
+                return InternalArray::getLong;
+            case FLOAT:
+                return InternalArray::getFloat;
+            case DOUBLE:
+                return InternalArray::getDouble;
+            case CHAR:
+            case STRING:
+                // Default to String when the POJO element type is unspecified 
(Object[])
+                final Class<?> textTarget = (pojoType == Object.class) ? 
String.class : pojoType;
+                return (arr, i) ->
+                        FlussTypeToPojoTypeConverter.convertTextValue(
+                                elementType, fieldName, textTarget, 
arr.getString(i));
+            case BINARY:
+            case BYTES:
+                return InternalArray::getBytes;
+            case DECIMAL:
+                {
+                    final DecimalType decimalType = (DecimalType) elementType;
+                    final int precision = decimalType.getPrecision();
+                    final int scale = decimalType.getScale();
+                    return (arr, i) -> {
+                        Decimal d = arr.getDecimal(i, precision, scale);
+                        return 
FlussTypeToPojoTypeConverter.convertDecimalValue(d);
+                    };
+                }
+            case DATE:
+                return (arr, i) -> 
FlussTypeToPojoTypeConverter.convertDateValue(arr.getInt(i));
+            case TIME_WITHOUT_TIME_ZONE:
+                return (arr, i) -> 
FlussTypeToPojoTypeConverter.convertTimeValue(arr.getInt(i));
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                {
+                    final int precision = 
DataTypeChecks.getPrecision(elementType);
+                    return (arr, i) ->
+                            
FlussTypeToPojoTypeConverter.convertTimestampNtzValue(
+                                    arr.getTimestampNtz(i, precision));
+                }
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                {
+                    final int precision = 
DataTypeChecks.getPrecision(elementType);
+                    // Default to Instant when the POJO element type is 
unspecified (Object[])
+                    final Class<?> tsTarget = (pojoType == Object.class) ? 
Instant.class : pojoType;
+                    return (arr, i) ->
+                            
FlussTypeToPojoTypeConverter.convertTimestampLtzValue(
+                                    arr.getTimestampLtz(i, precision), 
fieldName, tsTarget);
+                }
+            case ARRAY:
+                {
+                    final ArrayType nestedArrayType = (ArrayType) elementType;
+                    final Class<?> componentType = pojoType.getComponentType();
+                    return (arr, i) -> {
+                        InternalArray innerArray = arr.getArray(i);
+                        return innerArray == null
+                                ? null
+                                : new FlussArrayToPojoArray(
+                                                innerArray,
+                                                
nestedArrayType.getElementType(),
+                                                fieldName,
+                                                componentType)
+                                        .convertArray();
+                    };
+                }
+            case MAP:
+                return (arr, i) ->
+                        new FlussMapToPojoMap(arr.getMap(i), (MapType) 
elementType, fieldName)
+                                .convertMap();
+            default:
+                throw new UnsupportedOperationException(
+                        String.format(
+                                "Unsupported field type %s for field %s.",
+                                elementType.getTypeRoot(), fieldName));
+        }
+    }
+
+    @FunctionalInterface
+    private interface ElementConverter {
+        Object convert(InternalArray array, int index);
+    }
+}
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/converter/FlussMapToPojoMap.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/converter/FlussMapToPojoMap.java
new file mode 100644
index 000000000..d1a7474aa
--- /dev/null
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/converter/FlussMapToPojoMap.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.converter;
+
+import org.apache.fluss.row.InternalMap;
+import org.apache.fluss.types.MapType;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Adapter class for converting Fluss InternalMap to Pojo map. */
+public class FlussMapToPojoMap {
+    private final InternalMap flussMap;
+    private final MapType mapType;
+    private final String fieldName;
+
+    public FlussMapToPojoMap(InternalMap flussMap, MapType mapType, String 
fieldName) {
+        this.flussMap = flussMap;
+        this.mapType = mapType;
+        this.fieldName = fieldName;
+    }
+
+    public Object convertMap() {
+        if (flussMap == null) {
+            return null;
+        }
+
+        List<Object> keys =
+                new FlussArrayToPojoArray(
+                                flussMap.keyArray(), mapType.getKeyType(), 
fieldName, Object.class)
+                        .convertList();
+
+        if (keys == null || keys.isEmpty()) {
+            return new HashMap<>();
+        }
+
+        List<Object> values =
+                new FlussArrayToPojoArray(
+                                flussMap.valueArray(),
+                                mapType.getValueType(),
+                                fieldName,
+                                Object.class)
+                        .convertList();
+
+        // Build the result map
+        Map<Object, Object> result = new HashMap<>(keys.size() * 2);
+        for (int i = 0; i < keys.size(); i++) {
+            result.put(keys.get(i), values == null ? null : values.get(i));
+        }
+        return result;
+    }
+}
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/converter/FlussTypeToPojoTypeConverter.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/converter/FlussTypeToPojoTypeConverter.java
new file mode 100644
index 000000000..2b693f4e3
--- /dev/null
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/converter/FlussTypeToPojoTypeConverter.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.converter;
+
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.Decimal;
+import org.apache.fluss.row.TimestampLtz;
+import org.apache.fluss.row.TimestampNtz;
+import org.apache.fluss.types.DataType;
+import org.apache.fluss.types.DataTypeRoot;
+
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+
+/** Shared utilities for Fluss type and Pojo type. */
+public class FlussTypeToPojoTypeConverter {
+    /**
+     * Converts a text value (CHAR/STRING) read from an InternalRow into the 
target Java type
+     * declared by the POJO property.
+     *
+     * <p>Supported target types are String and Character. For CHAR columns, 
this enforces that the
+     * value has exactly one character. For Character targets, empty strings 
are rejected.
+     *
+     * @param fieldType Fluss column DataType (must be CHAR or STRING)
+     * @param fieldName The field name
+     * @param pojoType The pojo type
+     * @param s The BinaryString read from the row
+     * @return Converted Java value (String or Character)
+     * @throws IllegalArgumentException if the target type is unsupported or 
constraints are
+     *     violated
+     */
+    static Object convertTextValue(
+            DataType fieldType, String fieldName, Class<?> pojoType, 
BinaryString s) {
+        if (s == null) {
+            return null;
+        }
+
+        String v = s.toString();
+        if (pojoType == String.class) {
+            if (fieldType.getTypeRoot() == DataTypeRoot.CHAR && v.length() != 
1) {
+                throw new IllegalArgumentException(
+                        ConverterCommons.charLengthExceptionMessage(fieldName, 
v.length()));
+            }
+            return v;
+        } else if (pojoType == Character.class) {
+            if (v.isEmpty()) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "Field %s expects Character, but the string 
value is empty.",
+                                fieldName));
+            }
+            if (fieldType.getTypeRoot() == DataTypeRoot.CHAR && v.length() != 
1) {
+                throw new IllegalArgumentException(
+                        ConverterCommons.charLengthExceptionMessage(fieldName, 
v.length()));
+            }
+            return v.charAt(0);
+        }
+        throw new IllegalArgumentException(
+                String.format(
+                        "Field %s is not a String or Character. Cannot convert 
from string.",
+                        fieldName));
+    }
+
+    /**
+     * Converts a DECIMAL value from an InternalRow into a BigDecimal using 
the column's precision
+     * and scale. The row position is assumed non-null (caller checks), so 
this never returns null.
+     */
+    static BigDecimal convertDecimalValue(Decimal d) {
+        return d.toBigDecimal();
+    }
+
+    /** Converts a DATE value stored as int days since epoch to a LocalDate. */
+    static LocalDate convertDateValue(int daysSinceEpoch) {
+        return LocalDate.ofEpochDay(daysSinceEpoch);
+    }
+
+    /** Converts a TIME_WITHOUT_TIME_ZONE value stored as int millis of day to 
a LocalTime. */
+    static LocalTime convertTimeValue(int millisOfDay) {
+        return LocalTime.ofNanoOfDay(millisOfDay * 1_000_000L);
+    }
+
+    /** Converts a TIMESTAMP_WITHOUT_TIME_ZONE value to a LocalDateTime 
honoring precision. */
+    static Object convertTimestampNtzValue(TimestampNtz t) {
+        return t.toLocalDateTime();
+    }
+
+    /**
+     * Converts a TIMESTAMP_WITH_LOCAL_TIME_ZONE value to either Instant or 
OffsetDateTime in UTC,
+     * depending on the target POJO property type.
+     */
+    static Object convertTimestampLtzValue(TimestampLtz t, String fieldName, 
Class<?> pojoType) {
+        if (pojoType == Instant.class) {
+            return t.toInstant();
+        } else if (pojoType == OffsetDateTime.class) {
+            return OffsetDateTime.ofInstant(t.toInstant(), ZoneOffset.UTC);
+        }
+        throw new IllegalArgumentException(
+                String.format(
+                        "Field %s is not an Instant or OffsetDateTime. Cannot 
convert from TimestampData.",
+                        fieldName));
+    }
+}
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/converter/PojoArrayToFlussArray.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/converter/PojoArrayToFlussArray.java
new file mode 100644
index 000000000..2b27ddccf
--- /dev/null
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/converter/PojoArrayToFlussArray.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.converter;
+
+import org.apache.fluss.row.GenericArray;
+import org.apache.fluss.types.ArrayType;
+import org.apache.fluss.types.DataType;
+
+import java.util.Collection;
+
+import static 
org.apache.fluss.client.converter.PojoTypeToFlussTypeConverter.convertElementValue;
+
+/** Adapter class for converting Pojo Array to Fluss InternalArray. */
+public class PojoArrayToFlussArray {
+    private final Object obj;
+    private final DataType fieldType;
+    private final String fieldName;
+
+    public PojoArrayToFlussArray(Object obj, DataType fieldType, String 
fieldName) {
+        this.obj = obj;
+        this.fieldType = fieldType;
+        this.fieldName = fieldName;
+    }
+
+    public GenericArray convertArray() {
+        if (obj == null) {
+            return null;
+        }
+
+        ArrayType arrayType = (ArrayType) fieldType;
+        DataType elementType = arrayType.getElementType();
+
+        // Handle primitive arrays
+        if (obj instanceof boolean[]) {
+            return new GenericArray((boolean[]) obj);
+        } else if (obj instanceof long[]) {
+            return new GenericArray((long[]) obj);
+        } else if (obj instanceof double[]) {
+            return new GenericArray((double[]) obj);
+        } else if (obj instanceof float[]) {
+            return new GenericArray((float[]) obj);
+        } else if (obj instanceof short[]) {
+            return new GenericArray((short[]) obj);
+        } else if (obj instanceof byte[]) {
+            return new GenericArray((byte[]) obj);
+        } else if (obj instanceof int[]) {
+            return new GenericArray((int[]) obj);
+        }
+        // Handle boxed wrapper arrays
+        else if (obj instanceof Boolean[]) {
+            return new GenericArray((Boolean[]) obj);
+        } else if (obj instanceof Long[]) {
+            return new GenericArray((Long[]) obj);
+        } else if (obj instanceof Double[]) {
+            return new GenericArray((Double[]) obj);
+        } else if (obj instanceof Float[]) {
+            return new GenericArray((Float[]) obj);
+        } else if (obj instanceof Short[]) {
+            return new GenericArray((Short[]) obj);
+        } else if (obj instanceof Byte[]) {
+            return new GenericArray((Byte[]) obj);
+        }
+
+        // Handle Object[] and java.util.Collection
+        Object[] elements;
+        if (obj instanceof Object[]) {
+            elements = (Object[]) obj;
+        } else if (obj instanceof Collection) {
+            elements = ((Collection<?>) obj).toArray();
+        } else {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "Field %s has unsupported array type: %s. Expected 
array or Collection.",
+                            fieldName, obj.getClass().getName()));
+        }
+
+        Object[] converted = new Object[elements.length];
+        for (int i = 0; i < elements.length; i++) {
+            converted[i] = convertElementValue(elements[i], elementType, 
fieldName);
+        }
+        return new GenericArray(converted);
+    }
+}
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/converter/PojoMapToFlussMap.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/converter/PojoMapToFlussMap.java
new file mode 100644
index 000000000..a3bdf8917
--- /dev/null
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/converter/PojoMapToFlussMap.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.converter;
+
+import org.apache.fluss.row.GenericMap;
+import org.apache.fluss.types.MapType;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static 
org.apache.fluss.client.converter.PojoTypeToFlussTypeConverter.convertElementValue;
+
+/** Adapter class for converting Pojo Map to Fluss InternalMap. */
+public class PojoMapToFlussMap {
+    private final Map<?, ?> pojoMap;
+    private final MapType mapType;
+    private final String fieldName;
+
+    public PojoMapToFlussMap(Map<?, ?> pojoMap, MapType mapType, String 
fieldName) {
+        this.pojoMap = pojoMap;
+        this.mapType = mapType;
+        this.fieldName = fieldName;
+    }
+
+    public GenericMap convertMap() {
+        if (pojoMap == null) {
+            return null;
+        }
+
+        Map<Object, Object> converted = new HashMap<>(pojoMap.size() * 2);
+        for (Map.Entry<?, ?> entry : pojoMap.entrySet()) {
+            Object convertedKey =
+                    convertElementValue(entry.getKey(), mapType.getKeyType(), 
fieldName);
+            Object convertedValue =
+                    convertElementValue(entry.getValue(), 
mapType.getValueType(), fieldName);
+            converted.put(convertedKey, convertedValue);
+        }
+
+        // Build the result map
+        return new GenericMap(converted);
+    }
+}
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/converter/PojoToRowConverter.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/converter/PojoToRowConverter.java
index 0f4f6d8a4..14e46634a 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/converter/PojoToRowConverter.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/converter/PojoToRowConverter.java
@@ -17,26 +17,17 @@
 
 package org.apache.fluss.client.converter;
 
-import org.apache.fluss.row.BinaryString;
-import org.apache.fluss.row.Decimal;
 import org.apache.fluss.row.GenericRow;
-import org.apache.fluss.row.TimestampLtz;
-import org.apache.fluss.row.TimestampNtz;
 import org.apache.fluss.types.DataType;
 import org.apache.fluss.types.DataTypeChecks;
 import org.apache.fluss.types.DecimalType;
+import org.apache.fluss.types.MapType;
 import org.apache.fluss.types.RowType;
 
 import javax.annotation.Nullable;
 
-import java.math.BigDecimal;
-import java.math.RoundingMode;
-import java.time.Instant;
-import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.time.LocalTime;
-import java.time.OffsetDateTime;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Converter for writer path: converts POJO instances to Fluss InternalRow 
according to a (possibly
@@ -130,23 +121,42 @@ public final class PojoToRowConverter<T> {
                 return prop::read;
             case CHAR:
             case STRING:
-                return (obj) -> convertTextValue(fieldType, prop, 
prop.read(obj));
+                return (obj) ->
+                        PojoTypeToFlussTypeConverter.convertTextValue(
+                                fieldType, prop.name, prop.read(obj));
             case DECIMAL:
-                return (obj) -> convertDecimalValue((DecimalType) fieldType, 
prop, prop.read(obj));
+                return (obj) ->
+                        PojoTypeToFlussTypeConverter.convertDecimalValue(
+                                (DecimalType) fieldType, prop.name, 
prop.read(obj));
             case DATE:
-                return (obj) -> convertDateValue(prop, prop.read(obj));
+                return (obj) ->
+                        
PojoTypeToFlussTypeConverter.convertDateValue(prop.name, prop.read(obj));
             case TIME_WITHOUT_TIME_ZONE:
-                return (obj) -> convertTimeValue(prop, prop.read(obj));
+                return (obj) ->
+                        
PojoTypeToFlussTypeConverter.convertTimeValue(prop.name, prop.read(obj));
             case TIMESTAMP_WITHOUT_TIME_ZONE:
                 {
                     final int precision = 
DataTypeChecks.getPrecision(fieldType);
-                    return (obj) -> convertTimestampNtzValue(precision, prop, 
prop.read(obj));
+                    return (obj) ->
+                            
PojoTypeToFlussTypeConverter.convertTimestampNtzValue(
+                                    precision, prop.name, prop.read(obj));
                 }
             case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
                 {
                     final int precision = 
DataTypeChecks.getPrecision(fieldType);
-                    return (obj) -> convertTimestampLtzValue(precision, prop, 
prop.read(obj));
+                    return (obj) ->
+                            
PojoTypeToFlussTypeConverter.convertTimestampLtzValue(
+                                    precision, prop.name, prop.read(obj));
                 }
+            case ARRAY:
+                return (obj) ->
+                        new PojoArrayToFlussArray(prop.read(obj), fieldType, 
prop.name)
+                                .convertArray();
+            case MAP:
+                return (obj) ->
+                        new PojoMapToFlussMap(
+                                        (Map<?, ?>) prop.read(obj), (MapType) 
fieldType, prop.name)
+                                .convertMap();
             default:
                 throw new UnsupportedOperationException(
                         String.format(
@@ -155,175 +165,6 @@ public final class PojoToRowConverter<T> {
         }
     }
 
-    /**
-     * Converts a text value (String or Character) from a POJO property to 
Fluss BinaryString.
-     *
-     * <p>For CHAR columns, enforces that the text has exactly one character. 
Nulls are passed
-     * through.
-     */
-    private static @Nullable BinaryString convertTextValue(
-            DataType fieldType, PojoType.Property prop, @Nullable Object v) {
-        if (v == null) {
-            return null;
-        }
-        return ConverterCommons.toBinaryStringForText(v, prop.name, 
fieldType.getTypeRoot());
-    }
-
-    /** Converts a BigDecimal POJO property to Fluss Decimal respecting 
precision and scale. */
-    private static @Nullable Decimal convertDecimalValue(
-            DecimalType decimalType, PojoType.Property prop, @Nullable Object 
v) {
-        if (v == null) {
-            return null;
-        }
-        if (!(v instanceof BigDecimal)) {
-            throw new IllegalArgumentException(
-                    String.format(
-                            "Field %s is not a BigDecimal. Cannot convert to 
Decimal.", prop.name));
-        }
-        final int precision = decimalType.getPrecision();
-        final int scale = decimalType.getScale();
-
-        // Scale with a deterministic rounding mode to avoid 
ArithmeticException when rounding is
-        // needed.
-        BigDecimal bd = (BigDecimal) v;
-        BigDecimal scaled = bd.setScale(scale, RoundingMode.HALF_UP);
-
-        if (scaled.precision() > precision) {
-            throw new IllegalArgumentException(
-                    String.format(
-                            "Decimal value for field %s exceeds precision %d 
after scaling to %d: %s",
-                            prop.name, precision, scale, scaled));
-        }
-
-        return Decimal.fromBigDecimal(scaled, precision, scale);
-    }
-
-    /** Converts a LocalDate POJO property to number of days since epoch. */
-    private static @Nullable Integer convertDateValue(PojoType.Property prop, 
@Nullable Object v) {
-        if (v == null) {
-            return null;
-        }
-        if (!(v instanceof LocalDate)) {
-            throw new IllegalArgumentException(
-                    String.format(
-                            "Field %s is not a LocalDate. Cannot convert to 
int days.", prop.name));
-        }
-        return (int) ((LocalDate) v).toEpochDay();
-    }
-
-    /** Converts a LocalTime POJO property to milliseconds of day. */
-    private static @Nullable Integer convertTimeValue(PojoType.Property prop, 
@Nullable Object v) {
-        if (v == null) {
-            return null;
-        }
-        if (!(v instanceof LocalTime)) {
-            throw new IllegalArgumentException(
-                    String.format(
-                            "Field %s is not a LocalTime. Cannot convert to 
int millis.",
-                            prop.name));
-        }
-        LocalTime t = (LocalTime) v;
-        return (int) (t.toNanoOfDay() / 1_000_000);
-    }
-
-    /**
-     * Converts a LocalDateTime POJO property to Fluss TimestampNtz, 
respecting the specified
-     * precision.
-     *
-     * @param precision the timestamp precision (0-9)
-     * @param prop the POJO property metadata
-     * @param v the value to convert
-     * @return TimestampNtz with precision applied, or null if v is null
-     */
-    private static @Nullable TimestampNtz convertTimestampNtzValue(
-            int precision, PojoType.Property prop, @Nullable Object v) {
-        if (v == null) {
-            return null;
-        }
-        if (!(v instanceof LocalDateTime)) {
-            throw new IllegalArgumentException(
-                    String.format(
-                            "Field %s is not a LocalDateTime. Cannot convert 
to TimestampNtz.",
-                            prop.name));
-        }
-        LocalDateTime ldt = (LocalDateTime) v;
-        LocalDateTime truncated = truncateToTimestampPrecision(ldt, precision);
-        return TimestampNtz.fromLocalDateTime(truncated);
-    }
-
-    /**
-     * Converts an Instant or OffsetDateTime POJO property to Fluss 
TimestampLtz (UTC based),
-     * respecting the specified precision.
-     *
-     * @param precision the timestamp precision (0-9)
-     * @param prop the POJO property metadata
-     * @param v the value to convert
-     * @return TimestampLtz with precision applied, or null if v is null
-     */
-    private static @Nullable TimestampLtz convertTimestampLtzValue(
-            int precision, PojoType.Property prop, @Nullable Object v) {
-        if (v == null) {
-            return null;
-        }
-        Instant instant;
-        if (v instanceof Instant) {
-            instant = (Instant) v;
-        } else if (v instanceof OffsetDateTime) {
-            instant = ((OffsetDateTime) v).toInstant();
-        } else {
-            throw new IllegalArgumentException(
-                    String.format(
-                            "Field %s is not an Instant or OffsetDateTime. 
Cannot convert to TimestampLtz.",
-                            prop.name));
-        }
-        Instant truncated = truncateToTimestampPrecision(instant, precision);
-        return TimestampLtz.fromInstant(truncated);
-    }
-
-    /**
-     * Truncates a LocalDateTime to the specified timestamp precision.
-     *
-     * @param ldt the LocalDateTime to truncate
-     * @param precision the precision (0-9)
-     * @return truncated LocalDateTime
-     */
-    private static LocalDateTime truncateToTimestampPrecision(LocalDateTime 
ldt, int precision) {
-        if (precision >= 9) {
-            return ldt;
-        }
-        int nanos = ldt.getNano();
-        int truncatedNanos = truncateNanos(nanos, precision);
-        return ldt.withNano(truncatedNanos);
-    }
-
-    /**
-     * Truncates an Instant to the specified timestamp precision.
-     *
-     * @param instant the Instant to truncate
-     * @param precision the precision (0-9)
-     * @return truncated Instant
-     */
-    private static Instant truncateToTimestampPrecision(Instant instant, int 
precision) {
-        if (precision >= 9) {
-            return instant;
-        }
-        int nanos = instant.getNano();
-        int truncatedNanos = truncateNanos(nanos, precision);
-        return Instant.ofEpochSecond(instant.getEpochSecond(), truncatedNanos);
-    }
-
-    /**
-     * Truncates nanoseconds to the specified precision.
-     *
-     * @param nanos the nanoseconds value (0-999,999,999)
-     * @param precision the precision (0-9)
-     * @return truncated nanoseconds
-     */
-    private static int truncateNanos(int nanos, int precision) {
-        int divisor = (int) Math.pow(10, 9 - precision);
-        return (nanos / divisor) * divisor;
-    }
-
     private interface FieldToRow {
         Object readAndConvert(Object pojo) throws Exception;
     }
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/converter/PojoToRowConverter.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/converter/PojoTypeToFlussTypeConverter.java
similarity index 55%
copy from 
fluss-client/src/main/java/org/apache/fluss/client/converter/PojoToRowConverter.java
copy to 
fluss-client/src/main/java/org/apache/fluss/client/converter/PojoTypeToFlussTypeConverter.java
index 0f4f6d8a4..3c8ce1bfd 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/converter/PojoToRowConverter.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/converter/PojoTypeToFlussTypeConverter.java
@@ -1,12 +1,13 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,13 +20,12 @@ package org.apache.fluss.client.converter;
 
 import org.apache.fluss.row.BinaryString;
 import org.apache.fluss.row.Decimal;
-import org.apache.fluss.row.GenericRow;
 import org.apache.fluss.row.TimestampLtz;
 import org.apache.fluss.row.TimestampNtz;
 import org.apache.fluss.types.DataType;
 import org.apache.fluss.types.DataTypeChecks;
 import org.apache.fluss.types.DecimalType;
-import org.apache.fluss.types.RowType;
+import org.apache.fluss.types.MapType;
 
 import javax.annotation.Nullable;
 
@@ -36,149 +36,34 @@ import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
 import java.time.OffsetDateTime;
-import java.util.List;
-
-/**
- * Converter for writer path: converts POJO instances to Fluss InternalRow 
according to a (possibly
- * projected) RowType. Validation is done against the full table schema.
- */
-public final class PojoToRowConverter<T> {
-
-    private final PojoType<T> pojoType;
-    private final RowType tableSchema;
-    private final RowType projection;
-    private final List<String> projectionFieldNames;
-    private final FieldToRow[] fieldConverters; // index corresponds to 
projection position
-
-    private PojoToRowConverter(PojoType<T> pojoType, RowType tableSchema, 
RowType projection) {
-        this.pojoType = pojoType;
-        this.tableSchema = tableSchema;
-        this.projection = projection;
-        this.projectionFieldNames = projection.getFieldNames();
-        // For writer path, allow POJO to be a superset of the projection. It 
must contain all
-        // projected fields.
-        ConverterCommons.validatePojoMatchesProjection(pojoType, projection);
-        ConverterCommons.validateProjectionSubset(projection, tableSchema);
-        this.fieldConverters = createFieldConverters();
-    }
-
-    public static <T> PojoToRowConverter<T> of(
-            Class<T> pojoClass, RowType tableSchema, RowType projection) {
-        return new PojoToRowConverter<>(PojoType.of(pojoClass), tableSchema, 
projection);
-    }
-
-    public GenericRow toRow(@Nullable T pojo) {
-        if (pojo == null) {
-            return null;
-        }
-        GenericRow row = new GenericRow(projection.getFieldCount());
-        for (int i = 0; i < fieldConverters.length; i++) {
-            Object v;
-            try {
-                v = fieldConverters[i].readAndConvert(pojo);
-            } catch (RuntimeException re) {
-                throw re;
-            } catch (Exception e) {
-                throw new IllegalStateException(
-                        "Failed to access field '"
-                                + projectionFieldNames.get(i)
-                                + "' from POJO "
-                                + pojoType.getPojoClass().getName(),
-                        e);
-            }
-            row.setField(i, v);
-        }
-        return row;
-    }
-
-    private FieldToRow[] createFieldConverters() {
-        FieldToRow[] arr = new FieldToRow[projection.getFieldCount()];
-        for (int i = 0; i < projection.getFieldCount(); i++) {
-            String fieldName = projectionFieldNames.get(i);
-            DataType fieldType = projection.getTypeAt(i);
-            PojoType.Property prop = requireProperty(fieldName);
-            ConverterCommons.validateCompatibility(fieldType, prop);
-            arr[i] = createFieldConverter(prop, fieldType);
-        }
-        return arr;
-    }
-
-    private PojoType.Property requireProperty(String fieldName) {
-        PojoType.Property p = pojoType.getProperty(fieldName);
-        if (p == null) {
-            throw new IllegalArgumentException(
-                    "Field '"
-                            + fieldName
-                            + "' not found in POJO class "
-                            + pojoType.getPojoClass().getName()
-                            + ".");
-        }
-        return p;
-    }
-
-    private static FieldToRow createFieldConverter(PojoType.Property prop, 
DataType fieldType) {
-        switch (fieldType.getTypeRoot()) {
-            case BOOLEAN:
-            case TINYINT:
-            case SMALLINT:
-            case INTEGER:
-            case BIGINT:
-            case FLOAT:
-            case DOUBLE:
-            case BINARY:
-            case BYTES:
-                return prop::read;
-            case CHAR:
-            case STRING:
-                return (obj) -> convertTextValue(fieldType, prop, 
prop.read(obj));
-            case DECIMAL:
-                return (obj) -> convertDecimalValue((DecimalType) fieldType, 
prop, prop.read(obj));
-            case DATE:
-                return (obj) -> convertDateValue(prop, prop.read(obj));
-            case TIME_WITHOUT_TIME_ZONE:
-                return (obj) -> convertTimeValue(prop, prop.read(obj));
-            case TIMESTAMP_WITHOUT_TIME_ZONE:
-                {
-                    final int precision = 
DataTypeChecks.getPrecision(fieldType);
-                    return (obj) -> convertTimestampNtzValue(precision, prop, 
prop.read(obj));
-                }
-            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
-                {
-                    final int precision = 
DataTypeChecks.getPrecision(fieldType);
-                    return (obj) -> convertTimestampLtzValue(precision, prop, 
prop.read(obj));
-                }
-            default:
-                throw new UnsupportedOperationException(
-                        String.format(
-                                "Unsupported field type %s for field %s.",
-                                fieldType.getTypeRoot(), prop.name));
-        }
-    }
+import java.util.Map;
 
+/** Shared utilities for POJO type and Fluss type. */
+public class PojoTypeToFlussTypeConverter {
     /**
      * Converts a text value (String or Character) from a POJO property to 
Fluss BinaryString.
      *
      * <p>For CHAR columns, enforces that the text has exactly one character. 
Nulls are passed
      * through.
      */
-    private static @Nullable BinaryString convertTextValue(
-            DataType fieldType, PojoType.Property prop, @Nullable Object v) {
+    static @Nullable BinaryString convertTextValue(
+            DataType fieldType, String fieldName, @Nullable Object v) {
         if (v == null) {
             return null;
         }
-        return ConverterCommons.toBinaryStringForText(v, prop.name, 
fieldType.getTypeRoot());
+        return ConverterCommons.toBinaryStringForText(v, fieldName, 
fieldType.getTypeRoot());
     }
 
     /** Converts a BigDecimal POJO property to Fluss Decimal respecting 
precision and scale. */
-    private static @Nullable Decimal convertDecimalValue(
-            DecimalType decimalType, PojoType.Property prop, @Nullable Object 
v) {
+    static @Nullable Decimal convertDecimalValue(
+            DecimalType decimalType, String fieldName, @Nullable Object v) {
         if (v == null) {
             return null;
         }
         if (!(v instanceof BigDecimal)) {
             throw new IllegalArgumentException(
                     String.format(
-                            "Field %s is not a BigDecimal. Cannot convert to 
Decimal.", prop.name));
+                            "Field %s is not a BigDecimal. Cannot convert to 
Decimal.", fieldName));
         }
         final int precision = decimalType.getPrecision();
         final int scale = decimalType.getScale();
@@ -192,27 +77,27 @@ public final class PojoToRowConverter<T> {
             throw new IllegalArgumentException(
                     String.format(
                             "Decimal value for field %s exceeds precision %d 
after scaling to %d: %s",
-                            prop.name, precision, scale, scaled));
+                            fieldName, precision, scale, scaled));
         }
 
         return Decimal.fromBigDecimal(scaled, precision, scale);
     }
 
     /** Converts a LocalDate POJO property to number of days since epoch. */
-    private static @Nullable Integer convertDateValue(PojoType.Property prop, 
@Nullable Object v) {
+    static @Nullable Integer convertDateValue(String fieldName, @Nullable 
Object v) {
         if (v == null) {
             return null;
         }
         if (!(v instanceof LocalDate)) {
             throw new IllegalArgumentException(
                     String.format(
-                            "Field %s is not a LocalDate. Cannot convert to 
int days.", prop.name));
+                            "Field %s is not a LocalDate. Cannot convert to 
int days.", fieldName));
         }
         return (int) ((LocalDate) v).toEpochDay();
     }
 
     /** Converts a LocalTime POJO property to milliseconds of day. */
-    private static @Nullable Integer convertTimeValue(PojoType.Property prop, 
@Nullable Object v) {
+    static @Nullable Integer convertTimeValue(String fieldName, @Nullable 
Object v) {
         if (v == null) {
             return null;
         }
@@ -220,7 +105,7 @@ public final class PojoToRowConverter<T> {
             throw new IllegalArgumentException(
                     String.format(
                             "Field %s is not a LocalTime. Cannot convert to 
int millis.",
-                            prop.name));
+                            fieldName));
         }
         LocalTime t = (LocalTime) v;
         return (int) (t.toNanoOfDay() / 1_000_000);
@@ -231,12 +116,12 @@ public final class PojoToRowConverter<T> {
      * precision.
      *
      * @param precision the timestamp precision (0-9)
-     * @param prop the POJO property metadata
+     * @param fieldName the field name
      * @param v the value to convert
      * @return TimestampNtz with precision applied, or null if v is null
      */
-    private static @Nullable TimestampNtz convertTimestampNtzValue(
-            int precision, PojoType.Property prop, @Nullable Object v) {
+    static @Nullable TimestampNtz convertTimestampNtzValue(
+            int precision, String fieldName, @Nullable Object v) {
         if (v == null) {
             return null;
         }
@@ -244,7 +129,7 @@ public final class PojoToRowConverter<T> {
             throw new IllegalArgumentException(
                     String.format(
                             "Field %s is not a LocalDateTime. Cannot convert 
to TimestampNtz.",
-                            prop.name));
+                            fieldName));
         }
         LocalDateTime ldt = (LocalDateTime) v;
         LocalDateTime truncated = truncateToTimestampPrecision(ldt, precision);
@@ -256,12 +141,12 @@ public final class PojoToRowConverter<T> {
      * respecting the specified precision.
      *
      * @param precision the timestamp precision (0-9)
-     * @param prop the POJO property metadata
+     * @param fieldName the field name
      * @param v the value to convert
      * @return TimestampLtz with precision applied, or null if v is null
      */
-    private static @Nullable TimestampLtz convertTimestampLtzValue(
-            int precision, PojoType.Property prop, @Nullable Object v) {
+    static @Nullable TimestampLtz convertTimestampLtzValue(
+            int precision, String fieldName, @Nullable Object v) {
         if (v == null) {
             return null;
         }
@@ -274,7 +159,7 @@ public final class PojoToRowConverter<T> {
             throw new IllegalArgumentException(
                     String.format(
                             "Field %s is not an Instant or OffsetDateTime. 
Cannot convert to TimestampLtz.",
-                            prop.name));
+                            fieldName));
         }
         Instant truncated = truncateToTimestampPrecision(instant, precision);
         return TimestampLtz.fromInstant(truncated);
@@ -324,7 +209,62 @@ public final class PojoToRowConverter<T> {
         return (nanos / divisor) * divisor;
     }
 
-    private interface FieldToRow {
-        Object readAndConvert(Object pojo) throws Exception;
+    /**
+     * Converts a Pojo element to Fluss internal type.
+     *
+     * @param obj the pojo
+     * @param elementType the data type of Fluss
+     * @param fieldName the field name
+     */
+    static @Nullable Object convertElementValue(
+            @Nullable Object obj, DataType elementType, String fieldName) {
+        if (obj == null) {
+            return null;
+        }
+
+        switch (elementType.getTypeRoot()) {
+            case BOOLEAN:
+            case TINYINT:
+            case SMALLINT:
+            case INTEGER:
+            case BIGINT:
+            case FLOAT:
+            case DOUBLE:
+            case BINARY:
+            case BYTES:
+                return obj;
+            case CHAR:
+            case STRING:
+                return 
PojoTypeToFlussTypeConverter.convertTextValue(elementType, fieldName, obj);
+            case DECIMAL:
+                return PojoTypeToFlussTypeConverter.convertDecimalValue(
+                        (DecimalType) elementType, fieldName, obj);
+            case DATE:
+                return 
PojoTypeToFlussTypeConverter.convertDateValue(fieldName, obj);
+            case TIME_WITHOUT_TIME_ZONE:
+                return 
PojoTypeToFlussTypeConverter.convertTimeValue(fieldName, obj);
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                {
+                    final int precision = 
DataTypeChecks.getPrecision(elementType);
+                    return 
PojoTypeToFlussTypeConverter.convertTimestampNtzValue(
+                            precision, fieldName, obj);
+                }
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                {
+                    final int precision = 
DataTypeChecks.getPrecision(elementType);
+                    return 
PojoTypeToFlussTypeConverter.convertTimestampLtzValue(
+                            precision, fieldName, obj);
+                }
+            case ARRAY:
+                return new PojoArrayToFlussArray(obj, elementType, 
fieldName).convertArray();
+            case MAP:
+                return new PojoMapToFlussMap((Map<?, ?>) obj, (MapType) 
elementType, fieldName)
+                        .convertMap();
+            default:
+                throw new UnsupportedOperationException(
+                        String.format(
+                                "Unsupported field type %s for field %s.",
+                                elementType.getTypeRoot(), fieldName));
+        }
     }
 }
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/converter/RowToPojoConverter.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/converter/RowToPojoConverter.java
index a396285f5..aa7b0d315 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/converter/RowToPojoConverter.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/converter/RowToPojoConverter.java
@@ -17,26 +17,19 @@
 
 package org.apache.fluss.client.converter;
 
-import org.apache.fluss.row.BinaryString;
-import org.apache.fluss.row.Decimal;
+import org.apache.fluss.row.InternalArray;
 import org.apache.fluss.row.InternalRow;
-import org.apache.fluss.row.TimestampLtz;
-import org.apache.fluss.row.TimestampNtz;
+import org.apache.fluss.types.ArrayType;
 import org.apache.fluss.types.DataType;
 import org.apache.fluss.types.DataTypeChecks;
-import org.apache.fluss.types.DataTypeRoot;
 import org.apache.fluss.types.DecimalType;
+import org.apache.fluss.types.MapType;
 import org.apache.fluss.types.RowType;
 
 import javax.annotation.Nullable;
 
 import java.lang.reflect.InvocationTargetException;
-import java.math.BigDecimal;
-import java.time.Instant;
-import java.time.LocalDate;
-import java.time.LocalTime;
-import java.time.OffsetDateTime;
-import java.time.ZoneOffset;
+import java.util.Collection;
 import java.util.List;
 
 /**
@@ -50,6 +43,7 @@ public final class RowToPojoConverter<T> {
     private final RowType projection;
     private final List<String> projectionFieldNames;
     private final RowToField[] rowReaders;
+    private final PojoType.Property[] rowProps;
 
     private RowToPojoConverter(PojoType<T> pojoType, RowType tableSchema, 
RowType projection) {
         this.pojoType = pojoType;
@@ -58,7 +52,10 @@ public final class RowToPojoConverter<T> {
         this.projectionFieldNames = projection.getFieldNames();
         ConverterCommons.validatePojoMatchesTable(pojoType, tableSchema);
         ConverterCommons.validateProjectionSubset(projection, tableSchema);
-        this.rowReaders = createRowReaders();
+        int fieldCount = projection.getFieldCount();
+        this.rowReaders = new RowToField[fieldCount];
+        this.rowProps = new PojoType.Property[fieldCount];
+        createRowReaders();
     }
 
     public static <T> RowToPojoConverter<T> of(
@@ -75,9 +72,8 @@ public final class RowToPojoConverter<T> {
             for (int i = 0; i < rowReaders.length; i++) {
                 if (!row.isNullAt(i)) {
                     Object v = rowReaders[i].convert(row, i);
-                    PojoType.Property prop = 
pojoType.getProperty(projectionFieldNames.get(i));
                     if (v != null) {
-                        prop.write(pojo, v);
+                        rowProps[i].write(pojo, v);
                     }
                 }
             }
@@ -96,16 +92,15 @@ public final class RowToPojoConverter<T> {
         }
     }
 
-    private RowToField[] createRowReaders() {
-        RowToField[] arr = new RowToField[projection.getFieldCount()];
-        for (int i = 0; i < projection.getFieldCount(); i++) {
+    private void createRowReaders() {
+        for (int i = 0; i < rowReaders.length; i++) {
             String name = projectionFieldNames.get(i);
             DataType type = projection.getTypeAt(i);
             PojoType.Property prop = requireProperty(name);
             ConverterCommons.validateCompatibility(type, prop);
-            arr[i] = createRowReader(type, prop);
+            rowReaders[i] = createRowReader(type, prop);
+            rowProps[i] = prop;
         }
-        return arr;
     }
 
     private PojoType.Property requireProperty(String fieldName) {
@@ -139,26 +134,68 @@ public final class RowToPojoConverter<T> {
                 return InternalRow::getDouble;
             case CHAR:
             case STRING:
-                return (row, pos) -> convertTextValue(fieldType, prop, 
row.getString(pos));
+                return (row, pos) ->
+                        FlussTypeToPojoTypeConverter.convertTextValue(
+                                fieldType, prop.name, prop.type, 
row.getString(pos));
             case BINARY:
             case BYTES:
                 return InternalRow::getBytes;
             case DECIMAL:
-                return (row, pos) -> convertDecimalValue((DecimalType) 
fieldType, row, pos);
+                DecimalType decimalType = (DecimalType) fieldType;
+                return (row, pos) ->
+                        FlussTypeToPojoTypeConverter.convertDecimalValue(
+                                row.getDecimal(
+                                        pos, decimalType.getPrecision(), 
decimalType.getScale()));
             case DATE:
-                return RowToPojoConverter::convertDateValue;
+                return (row, pos) -> 
FlussTypeToPojoTypeConverter.convertDateValue(row.getInt(pos));
             case TIME_WITHOUT_TIME_ZONE:
-                return RowToPojoConverter::convertTimeValue;
+                return (row, pos) -> 
FlussTypeToPojoTypeConverter.convertTimeValue(row.getInt(pos));
             case TIMESTAMP_WITHOUT_TIME_ZONE:
                 {
                     final int precision = 
DataTypeChecks.getPrecision(fieldType);
-                    return (row, pos) -> convertTimestampNtzValue(precision, 
row, pos);
+                    return (row, pos) ->
+                            
FlussTypeToPojoTypeConverter.convertTimestampNtzValue(
+                                    row.getTimestampNtz(pos, precision));
                 }
             case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
                 {
                     final int precision = 
DataTypeChecks.getPrecision(fieldType);
-                    return (row, pos) -> convertTimestampLtzValue(precision, 
prop, row, pos);
+                    return (row, pos) ->
+                            
FlussTypeToPojoTypeConverter.convertTimestampLtzValue(
+                                    row.getTimestampLtz(pos, precision), 
prop.name, prop.type);
                 }
+            case ARRAY:
+                ArrayType arrayType = (ArrayType) fieldType;
+                if (Collection.class.isAssignableFrom(prop.type)) {
+                    // POJO field is a List / Collection — deserialize as 
ArrayList<Object>
+                    return (row, pos) -> {
+                        InternalArray array = row.getArray(pos);
+                        return array == null
+                                ? null
+                                : new FlussArrayToPojoArray(
+                                                array,
+                                                arrayType.getElementType(),
+                                                prop.name,
+                                                Object.class)
+                                        .convertList();
+                    };
+                }
+                final Class<?> componentType = prop.type.getComponentType();
+                return (row, pos) -> {
+                    InternalArray array = row.getArray(pos);
+                    return array == null
+                            ? null
+                            : new FlussArrayToPojoArray(
+                                            array,
+                                            arrayType.getElementType(),
+                                            prop.name,
+                                            componentType)
+                                    .convertArray();
+                };
+            case MAP:
+                return (row, pos) ->
+                        new FlussMapToPojoMap(row.getMap(pos), (MapType) 
fieldType, prop.name)
+                                .convertMap();
             default:
                 throw new UnsupportedOperationException(
                         String.format(
@@ -167,98 +204,6 @@ public final class RowToPojoConverter<T> {
         }
     }
 
-    /**
-     * Converts a text value (CHAR/STRING) read from an InternalRow into the 
target Java type
-     * declared by the POJO property.
-     *
-     * <p>Supported target types are String and Character. For CHAR columns, 
this enforces that the
-     * value has exactly one character. For Character targets, empty strings 
are rejected.
-     *
-     * @param fieldType Fluss column DataType (must be CHAR or STRING)
-     * @param prop The target POJO property (used for type and error messages)
-     * @param s The BinaryString read from the row
-     * @return Converted Java value (String or Character)
-     * @throws IllegalArgumentException if the target type is unsupported or 
constraints are
-     *     violated
-     */
-    private static Object convertTextValue(
-            DataType fieldType, PojoType.Property prop, BinaryString s) {
-        String v = s.toString();
-        if (prop.type == String.class) {
-            if (fieldType.getTypeRoot() == DataTypeRoot.CHAR && v.length() != 
1) {
-                throw new IllegalArgumentException(
-                        charLengthExceptionMessage(prop.name, v.length()));
-            }
-            return v;
-        } else if (prop.type == Character.class) {
-            if (v.isEmpty()) {
-                throw new IllegalArgumentException(
-                        String.format(
-                                "Field %s expects Character, but the string 
value is empty.",
-                                prop.name));
-            }
-            if (fieldType.getTypeRoot() == DataTypeRoot.CHAR && v.length() != 
1) {
-                throw new IllegalArgumentException(
-                        charLengthExceptionMessage(prop.name, v.length()));
-            }
-            return v.charAt(0);
-        }
-        throw new IllegalArgumentException(
-                String.format(
-                        "Field %s is not a String or Character. Cannot convert 
from string.",
-                        prop.name));
-    }
-
-    public static String charLengthExceptionMessage(String fieldName, int 
length) {
-        return String.format(
-                "Field %s expects exactly one character for CHAR type, got 
length %d.",
-                fieldName, length);
-    }
-
-    /**
-     * Converts a DECIMAL value from an InternalRow into a BigDecimal using 
the column's precision
-     * and scale. The row position is assumed non-null (caller checks), so 
this never returns null.
-     */
-    private static BigDecimal convertDecimalValue(
-            DecimalType decimalType, InternalRow row, int pos) {
-        Decimal d = row.getDecimal(pos, decimalType.getPrecision(), 
decimalType.getScale());
-        return d.toBigDecimal();
-    }
-
-    /** Converts a DATE value stored as int days since epoch to a LocalDate. */
-    private static LocalDate convertDateValue(InternalRow row, int pos) {
-        return LocalDate.ofEpochDay(row.getInt(pos));
-    }
-
-    /** Converts a TIME_WITHOUT_TIME_ZONE value stored as int millis of day to 
a LocalTime. */
-    private static LocalTime convertTimeValue(InternalRow row, int pos) {
-        return LocalTime.ofNanoOfDay(row.getInt(pos) * 1_000_000L);
-    }
-
-    /** Converts a TIMESTAMP_WITHOUT_TIME_ZONE value to a LocalDateTime 
honoring precision. */
-    private static Object convertTimestampNtzValue(int precision, InternalRow 
row, int pos) {
-        TimestampNtz t = row.getTimestampNtz(pos, precision);
-        return t.toLocalDateTime();
-    }
-
-    /**
-     * Converts a TIMESTAMP_WITH_LOCAL_TIME_ZONE value to either Instant or 
OffsetDateTime in UTC,
-     * depending on the target POJO property type.
-     */
-    private static Object convertTimestampLtzValue(
-            int precision, PojoType.Property prop, InternalRow row, int pos) {
-        TimestampLtz t = row.getTimestampLtz(pos, precision);
-        if (prop.type == Instant.class) {
-            return t.toInstant();
-        } else if (prop.type == OffsetDateTime.class) {
-            return OffsetDateTime.ofInstant(t.toInstant(), ZoneOffset.UTC);
-        }
-        throw new IllegalArgumentException(
-                String.format(
-                        "Field %s is not an Instant or OffsetDateTime. Cannot 
convert from TimestampData.",
-                        prop.name));
-    }
-
     private interface RowToField {
         Object convert(InternalRow row, int pos) throws Exception;
     }
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/converter/ConvertersTestFixtures.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/converter/ConvertersTestFixtures.java
index 8863b9f23..8fecb7b38 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/converter/ConvertersTestFixtures.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/converter/ConvertersTestFixtures.java
@@ -28,6 +28,7 @@ import java.time.LocalTime;
 import java.time.OffsetDateTime;
 import java.time.ZoneOffset;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 
@@ -53,10 +54,13 @@ public final class ConvertersTestFixtures {
                 .field("timestampField", DataTypes.TIMESTAMP())
                 .field("timestampLtzField", DataTypes.TIMESTAMP_LTZ())
                 .field("offsetDateTimeField", DataTypes.TIMESTAMP_LTZ())
+                .field("arrayField", DataTypes.ARRAY(DataTypes.INT()))
+                .field("mapField", DataTypes.MAP(DataTypes.STRING(), 
DataTypes.INT()))
                 .build();
     }
 
     // ----------------------- Helper POJOs -----------------------
+
     /** Test POJO used for end-to-end converter tests. */
     public static class TestPojo {
         public Boolean booleanField;
@@ -74,6 +78,8 @@ public final class ConvertersTestFixtures {
         public LocalDateTime timestampField;
         public Instant timestampLtzField;
         public OffsetDateTime offsetDateTimeField;
+        public Integer[] arrayField;
+        public Map<String, Integer> mapField;
 
         public TestPojo() {}
 
@@ -92,7 +98,9 @@ public final class ConvertersTestFixtures {
                 LocalTime timeField,
                 LocalDateTime timestampField,
                 Instant timestampLtzField,
-                OffsetDateTime offsetDateTimeField) {
+                OffsetDateTime offsetDateTimeField,
+                Integer[] arrayField,
+                Map<String, Integer> mapField) {
             this.booleanField = booleanField;
             this.byteField = byteField;
             this.shortField = shortField;
@@ -108,6 +116,8 @@ public final class ConvertersTestFixtures {
             this.timestampField = timestampField;
             this.timestampLtzField = timestampLtzField;
             this.offsetDateTimeField = offsetDateTimeField;
+            this.arrayField = arrayField;
+            this.mapField = mapField;
         }
 
         public static TestPojo sample() {
@@ -126,7 +136,14 @@ public final class ConvertersTestFixtures {
                     LocalTime.of(15, 1, 30),
                     LocalDateTime.of(2025, 7, 23, 15, 1, 30),
                     Instant.parse("2025-07-23T15:01:30Z"),
-                    OffsetDateTime.of(2025, 7, 23, 15, 1, 30, 0, 
ZoneOffset.UTC));
+                    OffsetDateTime.of(2025, 7, 23, 15, 1, 30, 0, 
ZoneOffset.UTC),
+                    new Integer[] {1, 2},
+                    new HashMap<String, Integer>() {
+                        {
+                            put("test_1", 1);
+                            put("test_2", 2);
+                        }
+                    });
         }
 
         @Override
@@ -152,7 +169,9 @@ public final class ConvertersTestFixtures {
                     && Objects.equals(timeField, testPojo.timeField)
                     && Objects.equals(timestampField, testPojo.timestampField)
                     && Objects.equals(timestampLtzField, 
testPojo.timestampLtzField)
-                    && Objects.equals(offsetDateTimeField, 
testPojo.offsetDateTimeField);
+                    && Objects.equals(offsetDateTimeField, 
testPojo.offsetDateTimeField)
+                    && Arrays.equals(arrayField, testPojo.arrayField)
+                    && Objects.equals(mapField, testPojo.mapField);
         }
 
         @Override
@@ -172,8 +191,10 @@ public final class ConvertersTestFixtures {
                             timeField,
                             timestampField,
                             timestampLtzField,
-                            offsetDateTimeField);
+                            offsetDateTimeField,
+                            mapField);
             result = 31 * result + Arrays.hashCode(bytesField);
+            result = 31 * result + Arrays.hashCode(arrayField);
             return result;
         }
     }
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/converter/FlussArrayToPojoArrayTest.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/converter/FlussArrayToPojoArrayTest.java
new file mode 100644
index 000000000..c241a966f
--- /dev/null
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/converter/FlussArrayToPojoArrayTest.java
@@ -0,0 +1,431 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.converter;
+
+import org.apache.fluss.row.GenericRow;
+import org.apache.fluss.types.DataTypes;
+import org.apache.fluss.types.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link FlussArrayToPojoArray}. */
+public class FlussArrayToPojoArrayTest {
+    @Test
+    public void testArrayWithAllTypes() {
+        RowType table =
+                RowType.builder()
+                        .field("booleanArray", 
DataTypes.ARRAY(DataTypes.BOOLEAN()))
+                        .field("byteArray", 
DataTypes.ARRAY(DataTypes.TINYINT()))
+                        .field("shortArray", 
DataTypes.ARRAY(DataTypes.SMALLINT()))
+                        .field("intArray", DataTypes.ARRAY(DataTypes.INT()))
+                        .field("longArray", 
DataTypes.ARRAY(DataTypes.BIGINT()))
+                        .field("floatArray", 
DataTypes.ARRAY(DataTypes.FLOAT()))
+                        .field("doubleArray", 
DataTypes.ARRAY(DataTypes.DOUBLE()))
+                        .field("stringArray", 
DataTypes.ARRAY(DataTypes.STRING()))
+                        .field("decimalArray", 
DataTypes.ARRAY(DataTypes.DECIMAL(10, 2)))
+                        .field("dateArray", DataTypes.ARRAY(DataTypes.DATE()))
+                        .field("timeArray", DataTypes.ARRAY(DataTypes.TIME()))
+                        .field("timestampArray", 
DataTypes.ARRAY(DataTypes.TIMESTAMP(3)))
+                        .field("timestampLtzArray", 
DataTypes.ARRAY(DataTypes.TIMESTAMP_LTZ(3)))
+                        .field("nestedIntArray", 
DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.INT())))
+                        .field(
+                                "mapArray",
+                                
DataTypes.ARRAY(DataTypes.MAP(DataTypes.STRING(), DataTypes.INT())))
+                        .build();
+
+        PojoToRowConverter<ArrayPojo> writer = 
PojoToRowConverter.of(ArrayPojo.class, table, table);
+        RowToPojoConverter<ArrayPojo> reader = 
RowToPojoConverter.of(ArrayPojo.class, table, table);
+
+        ArrayPojo pojo = ArrayPojo.sample();
+
+        // POJO -> Row -> POJO
+        GenericRow row = writer.toRow(pojo);
+        ArrayPojo back = reader.fromRow(row);
+
+        // Verify boolean array
+        Object[] boolArray = back.booleanArray;
+        assertThat(boolArray.length).isEqualTo(2);
+        assertThat(boolArray).isEqualTo(new Boolean[] {true, false});
+
+        // Verify byte array
+        Object[] byteArray = back.byteArray;
+        assertThat(byteArray.length).isEqualTo(2);
+        assertThat(byteArray).isEqualTo(new Byte[] {1, 2});
+
+        // Verify short array
+        Object[] shortArray = back.shortArray;
+        assertThat(shortArray.length).isEqualTo(2);
+        assertThat(shortArray).isEqualTo(new Short[] {100, 200});
+
+        // Verify int array
+        Object[] intArray = back.intArray;
+        assertThat(intArray.length).isEqualTo(2);
+        assertThat(intArray).isEqualTo(new Integer[] {1000, 2000});
+
+        // Verify long array
+        Object[] longArray = back.longArray;
+        assertThat(longArray.length).isEqualTo(2);
+        assertThat(longArray).isEqualTo(new Long[] {10000L, 20000L});
+
+        // Verify float array
+        Object[] floatArray = back.floatArray;
+        assertThat(floatArray.length).isEqualTo(2);
+        assertThat(floatArray).isEqualTo(new Float[] {1.1f, 2.2f});
+
+        // Verify double array
+        Object[] doubleArray = back.doubleArray;
+        assertThat(doubleArray.length).isEqualTo(2);
+        assertThat(doubleArray).isEqualTo(new Double[] {1.11, 2.22});
+
+        // Verify string array
+        Object[] stringArray = back.stringArray;
+        assertThat(stringArray.length).isEqualTo(2);
+        assertThat(stringArray).isEqualTo(new String[] {"hello", "world"});
+
+        // Verify decimal array
+        Object[] decimalArray = back.decimalArray;
+        assertThat(decimalArray.length).isEqualTo(2);
+        assertThat(decimalArray)
+                .isEqualTo(new BigDecimal[] {new BigDecimal("123.45"), new 
BigDecimal("678.90")});
+
+        // Verify date array (days since epoch)
+        Object[] dateArray = back.dateArray;
+        assertThat(dateArray.length).isEqualTo(2);
+        assertThat(dateArray)
+                .isEqualTo(new LocalDate[] {LocalDate.of(2025, 1, 1), 
LocalDate.of(2025, 12, 31)});
+
+        // Verify time array (millis of day)
+        Object[] timeArray = back.timeArray;
+        assertThat(timeArray.length).isEqualTo(2);
+        assertThat(timeArray)
+                .isEqualTo(new LocalTime[] {LocalTime.MIDNIGHT, 
LocalTime.of(12, 30, 0)});
+
+        // Verify timestamp array
+        Object[] timestampArray = back.timestampArray;
+        assertThat(timestampArray.length).isEqualTo(2);
+        assertThat(timestampArray)
+                .isEqualTo(
+                        new LocalDateTime[] {
+                            LocalDateTime.of(2025, 7, 23, 15, 0, 0),
+                            LocalDateTime.of(2025, 12, 31, 23, 59, 59)
+                        });
+
+        // Verify timestampLtz array
+        Object[] timestampLtzArray = back.timestampLtzArray;
+        assertThat(timestampLtzArray.length).isEqualTo(2);
+        assertThat(timestampLtzArray)
+                .isEqualTo(
+                        new Instant[] {
+                            Instant.parse("2025-01-01T00:00:00Z"),
+                            Instant.parse("2025-12-31T23:59:59Z")
+                        });
+
+        // Verify nested array (array<array<int>>)
+        Object[][] nestedIntArray = back.nestedIntArray;
+        assertThat(nestedIntArray.length).isEqualTo(2);
+        assertThat(back.nestedIntArray)
+                .isEqualTo(
+                        new Integer[][] {
+                            {1, 2},
+                            {3, 4, 5}
+                        });
+        // Verify map array (array<map<string, int>>)
+        Object[] mapArray = back.mapArray;
+        assertThat(mapArray.length).isEqualTo(2);
+        assertThat(back.mapArray)
+                .isEqualTo(
+                        new HashMap[] {
+                            new HashMap<String, Integer>() {
+                                {
+                                    put("test_1", 1);
+                                    put("test_2", 2);
+                                }
+                            },
+                            new HashMap<String, Integer>() {
+                                {
+                                    put("test_3", 3);
+                                    put("test_4", 4);
+                                }
+                            }
+                        });
+    }
+
+    @Test
+    public void testArrayWithNullElements() {
+        RowType table =
+                RowType.builder()
+                        .field("stringArray", 
DataTypes.ARRAY(DataTypes.STRING()))
+                        .field("intObjectArray", 
DataTypes.ARRAY(DataTypes.INT()))
+                        .build();
+
+        PojoToRowConverter<NullableArrayPojo> writer =
+                PojoToRowConverter.of(NullableArrayPojo.class, table, table);
+        RowToPojoConverter<NullableArrayPojo> reader =
+                RowToPojoConverter.of(NullableArrayPojo.class, table, table);
+
+        NullableArrayPojo pojo = new NullableArrayPojo();
+        pojo.stringArray = new String[] {"hello", null, "world"};
+        pojo.intObjectArray = new Integer[] {1, null, 3};
+
+        // POJO -> Row -> POJO
+        GenericRow row = writer.toRow(pojo);
+        NullableArrayPojo back = reader.fromRow(row);
+
+        Object[] stringArray = back.stringArray;
+        assertThat(stringArray.length).isEqualTo(3);
+        assertThat(stringArray).isEqualTo(new String[] {"hello", null, 
"world"});
+
+        Object[] intArray = back.intObjectArray;
+        assertThat(intArray.length).isEqualTo(3);
+        assertThat(intArray).isEqualTo(new Integer[] {1, null, 3});
+    }
+
+    @Test
+    public void testNullArrayField() {
+        RowType table =
+                RowType.builder().field("intArray", 
DataTypes.ARRAY(DataTypes.INT())).build();
+
+        PojoToRowConverter<SimpleArrayPojo> writer =
+                PojoToRowConverter.of(SimpleArrayPojo.class, table, table);
+        RowToPojoConverter<SimpleArrayPojo> reader =
+                RowToPojoConverter.of(SimpleArrayPojo.class, table, table);
+
+        SimpleArrayPojo pojo = new SimpleArrayPojo();
+        pojo.intArray = null;
+
+        // POJO -> Row -> POJO
+        GenericRow row = writer.toRow(pojo);
+        SimpleArrayPojo back = reader.fromRow(row);
+        assertThat(back.intArray).isEqualTo(null);
+    }
+
+    @Test
+    public void testListFieldsOnReadPath() {
+        // POJO fields declared as List<T> should deserialize as 
ArrayList<Object>
+        RowType table =
+                RowType.builder()
+                        .field("intList", DataTypes.ARRAY(DataTypes.INT()))
+                        .field("strList", DataTypes.ARRAY(DataTypes.STRING()))
+                        .field("nullableList", 
DataTypes.ARRAY(DataTypes.INT()))
+                        .build();
+
+        PojoToRowConverter<ListFieldPojo> writer =
+                PojoToRowConverter.of(ListFieldPojo.class, table, table);
+        RowToPojoConverter<ListFieldPojo> reader =
+                RowToPojoConverter.of(ListFieldPojo.class, table, table);
+
+        ListFieldPojo pojo = new ListFieldPojo();
+        pojo.intList = Arrays.asList(1, 2, 3);
+        pojo.strList = Arrays.asList("x", "y");
+        pojo.nullableList = Arrays.asList(10, null, 30);
+
+        GenericRow row = writer.toRow(pojo);
+        ListFieldPojo back = reader.fromRow(row);
+
+        assertThat(back.intList).isInstanceOf(List.class).containsExactly(1, 
2, 3);
+        assertThat(back.strList).isInstanceOf(List.class).containsExactly("x", 
"y");
+        
assertThat(back.nullableList).isInstanceOf(List.class).containsExactly(10, 
null, 30);
+    }
+
+    /** POJO with List fields for read-path List deserialization tests. */
+    public static class ListFieldPojo {
+        public List<Integer> intList;
+        public List<String> strList;
+        public List<Integer> nullableList;
+
+        public ListFieldPojo() {}
+    }
+
+    @Test
+    public void testTypedArrayFieldsOnReadPath() {
+        // Verify that typed POJO array fields (String[], Character[], 
OffsetDateTime[]) are
+        // handled correctly on the read path — not just Object[].
+        RowType table =
+                RowType.builder()
+                        .field("stringArray", 
DataTypes.ARRAY(DataTypes.STRING()))
+                        .field("charArray", 
DataTypes.ARRAY(DataTypes.STRING()))
+                        .field("intArray", DataTypes.ARRAY(DataTypes.INT()))
+                        .field("timestampLtzArray", 
DataTypes.ARRAY(DataTypes.TIMESTAMP_LTZ(3)))
+                        .build();
+
+        PojoToRowConverter<TypedArrayPojo> writer =
+                PojoToRowConverter.of(TypedArrayPojo.class, table, table);
+        RowToPojoConverter<TypedArrayPojo> reader =
+                RowToPojoConverter.of(TypedArrayPojo.class, table, table);
+
+        TypedArrayPojo pojo = new TypedArrayPojo();
+        pojo.stringArray = new String[] {"hello", "world"};
+        pojo.charArray = new Character[] {'A', 'B'};
+        pojo.intArray = new Integer[] {10, 20};
+        pojo.timestampLtzArray =
+                new OffsetDateTime[] {
+                    OffsetDateTime.of(2025, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC),
+                    OffsetDateTime.of(2025, 6, 15, 12, 0, 0, 0, ZoneOffset.UTC)
+                };
+
+        GenericRow row = writer.toRow(pojo);
+        TypedArrayPojo back = reader.fromRow(row);
+
+        assertThat((String[]) back.stringArray).isEqualTo(new String[] 
{"hello", "world"});
+        assertThat((Character[]) back.charArray).isEqualTo(new Character[] 
{'A', 'B'});
+        assertThat((Integer[]) back.intArray).isEqualTo(new Integer[] {10, 
20});
+        assertThat(((OffsetDateTime[]) back.timestampLtzArray)[0])
+                .isEqualTo(OffsetDateTime.of(2025, 1, 1, 0, 0, 0, 0, 
ZoneOffset.UTC));
+        assertThat(((OffsetDateTime[]) back.timestampLtzArray)[1])
+                .isEqualTo(OffsetDateTime.of(2025, 6, 15, 12, 0, 0, 0, 
ZoneOffset.UTC));
+    }
+
+    @Test
+    public void testNullElementInPrimitiveArrayThrows() {
+        // A Fluss ARRAY<INT> with a null element cannot be read into an int[] 
POJO field.
+        // The converter must throw a clear NullPointerException rather than a 
cryptic one.
+        RowType table =
+                RowType.builder().field("intObjectArray", 
DataTypes.ARRAY(DataTypes.INT())).build();
+
+        PojoToRowConverter<NullableArrayPojo> writer =
+                PojoToRowConverter.of(NullableArrayPojo.class, table, table);
+        RowToPojoConverter<PrimitiveIntArrayPojo> reader =
+                RowToPojoConverter.of(PrimitiveIntArrayPojo.class, table, 
table);
+
+        NullableArrayPojo pojo = new NullableArrayPojo();
+        pojo.stringArray = null; // unused — only intObjectArray is in 
projection
+        pojo.intObjectArray = new Integer[] {1, null, 3};
+
+        GenericRow row = writer.toRow(pojo);
+        assertThatThrownBy(() -> reader.fromRow(row))
+                .isInstanceOf(NullPointerException.class)
+                .hasMessageContaining("primitive array")
+                .hasMessageContaining("intObjectArray");
+    }
+
+    /** POJO with typed (non-Object[]) array fields for read-path 
type-fidelity tests. */
+    public static class TypedArrayPojo {
+        public String[] stringArray;
+        public Character[] charArray;
+        public Integer[] intArray;
+        public OffsetDateTime[] timestampLtzArray;
+
+        public TypedArrayPojo() {}
+    }
+
+    /** POJO with primitive int[] for null-element guard test. */
+    public static class PrimitiveIntArrayPojo {
+        public int[] intObjectArray;
+
+        public PrimitiveIntArrayPojo() {}
+    }
+
+    /** POJO for testing all array types. */
+    @SuppressWarnings("unchecked")
+    public static class ArrayPojo {
+        public Object[] booleanArray;
+        public Object[] byteArray;
+        public Object[] shortArray;
+        public Object[] intArray;
+        public Object[] longArray;
+        public Object[] floatArray;
+        public Object[] doubleArray;
+        public Object[] stringArray;
+        public Object[] decimalArray;
+        public Object[] dateArray;
+        public Object[] timeArray;
+        public Object[] timestampArray;
+        public Object[] timestampLtzArray;
+        public Object[][] nestedIntArray;
+        public Map<Object, Object>[] mapArray;
+
+        public ArrayPojo() {}
+
+        public static ArrayPojo sample() {
+            ArrayPojo pojo = new ArrayPojo();
+            pojo.booleanArray = new Boolean[] {true, false};
+            pojo.byteArray = new Byte[] {1, 2};
+            pojo.shortArray = new Short[] {100, 200};
+            pojo.intArray = new Integer[] {1000, 2000};
+            pojo.longArray = new Long[] {10000L, 20000L};
+            pojo.floatArray = new Float[] {1.1f, 2.2f};
+            pojo.doubleArray = new Double[] {1.11, 2.22};
+            pojo.stringArray = new String[] {"hello", "world"};
+            pojo.decimalArray =
+                    new BigDecimal[] {new BigDecimal("123.45"), new 
BigDecimal("678.90")};
+            pojo.dateArray = new LocalDate[] {LocalDate.of(2025, 1, 1), 
LocalDate.of(2025, 12, 31)};
+            pojo.timeArray = new LocalTime[] {LocalTime.MIDNIGHT, 
LocalTime.of(12, 30, 0)};
+            pojo.timestampArray =
+                    new LocalDateTime[] {
+                        LocalDateTime.of(2025, 7, 23, 15, 0, 0),
+                        LocalDateTime.of(2025, 12, 31, 23, 59, 59)
+                    };
+            pojo.timestampLtzArray =
+                    new Instant[] {
+                        Instant.parse("2025-01-01T00:00:00Z"), 
Instant.parse("2025-12-31T23:59:59Z")
+                    };
+            pojo.nestedIntArray =
+                    new Integer[][] {
+                        {1, 2},
+                        {3, 4, 5}
+                    };
+            pojo.mapArray =
+                    new HashMap[] {
+                        new HashMap<Object, Object>() {
+                            {
+                                put("test_1", 1);
+                                put("test_2", 2);
+                            }
+                        },
+                        new HashMap<Object, Object>() {
+                            {
+                                put("test_3", 3);
+                                put("test_4", 4);
+                            }
+                        }
+                    };
+            return pojo;
+        }
+    }
+
+    /** POJO for testing arrays with null elements. */
+    public static class NullableArrayPojo {
+        public Object[] stringArray;
+        public Object[] intObjectArray;
+
+        public NullableArrayPojo() {}
+    }
+
+    /** Simple POJO for testing null array field. */
+    public static class SimpleArrayPojo {
+        public Object[] intArray;
+
+        public SimpleArrayPojo() {}
+    }
+}
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/converter/PojoArrayToFlussArrayTest.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/converter/PojoArrayToFlussArrayTest.java
new file mode 100644
index 000000000..2c21e7222
--- /dev/null
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/converter/PojoArrayToFlussArrayTest.java
@@ -0,0 +1,437 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.converter;
+
+import org.apache.fluss.row.GenericRow;
+import org.apache.fluss.row.InternalArray;
+import org.apache.fluss.row.InternalMap;
+import org.apache.fluss.types.DataTypes;
+import org.apache.fluss.types.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link PojoArrayToFlussArray}. */
+public class PojoArrayToFlussArrayTest {
+    @Test
+    public void testArrayWithAllTypes() {
+        // Schema with all array types
+        RowType table =
+                RowType.builder()
+                        .field("primitiveBooleanArray", 
DataTypes.ARRAY(DataTypes.BOOLEAN()))
+                        .field("primitiveByteArray", 
DataTypes.ARRAY(DataTypes.TINYINT()))
+                        .field("primitiveShortArray", 
DataTypes.ARRAY(DataTypes.SMALLINT()))
+                        .field("primitiveIntArray", 
DataTypes.ARRAY(DataTypes.INT()))
+                        .field("primitiveLongArray", 
DataTypes.ARRAY(DataTypes.BIGINT()))
+                        .field("primitiveFloatArray", 
DataTypes.ARRAY(DataTypes.FLOAT()))
+                        .field("primitiveDoubleArray", 
DataTypes.ARRAY(DataTypes.DOUBLE()))
+                        .field("booleanArray", 
DataTypes.ARRAY(DataTypes.BOOLEAN()))
+                        .field("byteArray", 
DataTypes.ARRAY(DataTypes.TINYINT()))
+                        .field("shortArray", 
DataTypes.ARRAY(DataTypes.SMALLINT()))
+                        .field("intArray", DataTypes.ARRAY(DataTypes.INT()))
+                        .field("longArray", 
DataTypes.ARRAY(DataTypes.BIGINT()))
+                        .field("floatArray", 
DataTypes.ARRAY(DataTypes.FLOAT()))
+                        .field("doubleArray", 
DataTypes.ARRAY(DataTypes.DOUBLE()))
+                        .field("stringArray", 
DataTypes.ARRAY(DataTypes.STRING()))
+                        .field("decimalArray", 
DataTypes.ARRAY(DataTypes.DECIMAL(10, 2)))
+                        .field("dateArray", DataTypes.ARRAY(DataTypes.DATE()))
+                        .field("timeArray", DataTypes.ARRAY(DataTypes.TIME()))
+                        .field("timestampArray", 
DataTypes.ARRAY(DataTypes.TIMESTAMP(3)))
+                        .field("timestampLtzArray", 
DataTypes.ARRAY(DataTypes.TIMESTAMP_LTZ(3)))
+                        .field("nestedIntArray", 
DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.INT())))
+                        .field(
+                                "mapArray",
+                                
DataTypes.ARRAY(DataTypes.MAP(DataTypes.STRING(), DataTypes.INT())))
+                        .build();
+
+        PojoToRowConverter<ArrayPojo> writer = 
PojoToRowConverter.of(ArrayPojo.class, table, table);
+
+        ArrayPojo pojo = ArrayPojo.sample();
+        GenericRow row = writer.toRow(pojo);
+
+        // Verify primitive boolean array
+        InternalArray primitiveBoolArray = row.getArray(0);
+        assertThat(primitiveBoolArray.size()).isEqualTo(2);
+        assertThat(primitiveBoolArray.getBoolean(0)).isTrue();
+        assertThat(primitiveBoolArray.getBoolean(1)).isFalse();
+
+        // Verify primitive byte array
+        InternalArray primitiveByteArray = row.getArray(1);
+        assertThat(primitiveByteArray.size()).isEqualTo(2);
+        assertThat(primitiveByteArray.getByte(0)).isEqualTo((byte) 1);
+        assertThat(primitiveByteArray.getByte(1)).isEqualTo((byte) 2);
+
+        // Verify primitive short array
+        InternalArray primitiveShortArray = row.getArray(2);
+        assertThat(primitiveShortArray.size()).isEqualTo(2);
+        assertThat(primitiveShortArray.getShort(0)).isEqualTo((short) 100);
+        assertThat(primitiveShortArray.getShort(1)).isEqualTo((short) 200);
+
+        // Verify primitive int array
+        InternalArray primitiveIntArray = row.getArray(3);
+        assertThat(primitiveIntArray.size()).isEqualTo(2);
+        assertThat(primitiveIntArray.getInt(0)).isEqualTo(1000);
+        assertThat(primitiveIntArray.getInt(1)).isEqualTo(2000);
+
+        // Verify primitive long array
+        InternalArray primitiveLongArray = row.getArray(4);
+        assertThat(primitiveLongArray.size()).isEqualTo(2);
+        assertThat(primitiveLongArray.getLong(0)).isEqualTo(10000L);
+        assertThat(primitiveLongArray.getLong(1)).isEqualTo(20000L);
+
+        // Verify primitive float array
+        InternalArray primitiveFloatArray = row.getArray(5);
+        assertThat(primitiveFloatArray.size()).isEqualTo(2);
+        assertThat(primitiveFloatArray.getFloat(0)).isEqualTo(1.1f);
+        assertThat(primitiveFloatArray.getFloat(1)).isEqualTo(2.2f);
+
+        // Verify primitive double array
+        InternalArray primitiveDoubleArray = row.getArray(6);
+        assertThat(primitiveDoubleArray.size()).isEqualTo(2);
+        assertThat(primitiveDoubleArray.getDouble(0)).isEqualTo(1.11);
+        assertThat(primitiveDoubleArray.getDouble(1)).isEqualTo(2.22);
+
+        // Verify boolean array
+        InternalArray boolArray = row.getArray(7);
+        assertThat(boolArray.size()).isEqualTo(2);
+        assertThat(boolArray.getBoolean(0)).isTrue();
+        assertThat(boolArray.getBoolean(1)).isFalse();
+
+        // Verify byte array
+        InternalArray byteArray = row.getArray(8);
+        assertThat(byteArray.size()).isEqualTo(2);
+        assertThat(byteArray.getByte(0)).isEqualTo((byte) 1);
+        assertThat(byteArray.getByte(1)).isEqualTo((byte) 2);
+
+        // Verify short array
+        InternalArray shortArray = row.getArray(9);
+        assertThat(shortArray.size()).isEqualTo(2);
+        assertThat(shortArray.getShort(0)).isEqualTo((short) 100);
+        assertThat(shortArray.getShort(1)).isEqualTo((short) 200);
+
+        // Verify int array
+        InternalArray intArray = row.getArray(10);
+        assertThat(intArray.size()).isEqualTo(2);
+        assertThat(intArray.getInt(0)).isEqualTo(1000);
+        assertThat(intArray.getInt(1)).isEqualTo(2000);
+
+        // Verify long array
+        InternalArray longArray = row.getArray(11);
+        assertThat(longArray.size()).isEqualTo(2);
+        assertThat(longArray.getLong(0)).isEqualTo(10000L);
+        assertThat(longArray.getLong(1)).isEqualTo(20000L);
+
+        // Verify float array
+        InternalArray floatArray = row.getArray(12);
+        assertThat(floatArray.size()).isEqualTo(2);
+        assertThat(floatArray.getFloat(0)).isEqualTo(1.1f);
+        assertThat(floatArray.getFloat(1)).isEqualTo(2.2f);
+
+        // Verify double array
+        InternalArray doubleArray = row.getArray(13);
+        assertThat(doubleArray.size()).isEqualTo(2);
+        assertThat(doubleArray.getDouble(0)).isEqualTo(1.11);
+        assertThat(doubleArray.getDouble(1)).isEqualTo(2.22);
+
+        // Verify string array
+        InternalArray stringArray = row.getArray(14);
+        assertThat(stringArray.size()).isEqualTo(2);
+        assertThat(stringArray.getString(0).toString()).isEqualTo("hello");
+        assertThat(stringArray.getString(1).toString()).isEqualTo("world");
+
+        // Verify decimal array
+        InternalArray decimalArray = row.getArray(15);
+        assertThat(decimalArray.size()).isEqualTo(2);
+        assertThat(decimalArray.getDecimal(0, 10, 2).toBigDecimal())
+                .isEqualTo(new BigDecimal("123.45"));
+        assertThat(decimalArray.getDecimal(1, 10, 2).toBigDecimal())
+                .isEqualTo(new BigDecimal("678.90"));
+
+        // Verify date array (days since epoch)
+        InternalArray dateArray = row.getArray(16);
+        assertThat(dateArray.size()).isEqualTo(2);
+        assertThat(dateArray.getInt(0)).isEqualTo((int) LocalDate.of(2025, 1, 
1).toEpochDay());
+        assertThat(dateArray.getInt(1)).isEqualTo((int) LocalDate.of(2025, 12, 
31).toEpochDay());
+
+        // Verify time array (millis of day)
+        InternalArray timeArray = row.getArray(17);
+        assertThat(timeArray.size()).isEqualTo(2);
+        assertThat(timeArray.getInt(0)).isEqualTo(0); // midnight
+        assertThat(timeArray.getInt(1))
+                .isEqualTo((int) (LocalTime.of(12, 30, 0).toNanoOfDay() / 
1_000_000));
+
+        // Verify timestamp array
+        InternalArray timestampArray = row.getArray(18);
+        assertThat(timestampArray.size()).isEqualTo(2);
+        assertThat(timestampArray.getTimestampNtz(0, 3).getMillisecond())
+                .isEqualTo(
+                        LocalDateTime.of(2025, 7, 23, 15, 0, 0)
+                                .atZone(java.time.ZoneOffset.UTC)
+                                .toInstant()
+                                .toEpochMilli());
+
+        // Verify timestampLtz array
+        InternalArray timestampLtzArray = row.getArray(19);
+        assertThat(timestampLtzArray.size()).isEqualTo(2);
+        assertThat(timestampLtzArray.getTimestampLtz(0, 
3).getEpochMillisecond())
+                
.isEqualTo(Instant.parse("2025-01-01T00:00:00Z").toEpochMilli());
+
+        // Verify nested array (array<array<int>>)
+        InternalArray nestedArray = row.getArray(20);
+        assertThat(nestedArray.size()).isEqualTo(2);
+        InternalArray innerArray1 = nestedArray.getArray(0);
+        assertThat(innerArray1.getInt(0)).isEqualTo(1);
+        assertThat(innerArray1.getInt(1)).isEqualTo(2);
+        InternalArray innerArray2 = nestedArray.getArray(1);
+        assertThat(innerArray2.getInt(0)).isEqualTo(3);
+        assertThat(innerArray2.getInt(1)).isEqualTo(4);
+        assertThat(innerArray2.getInt(2)).isEqualTo(5);
+
+        // Verify map array (array<map<string, int>>)
+        InternalArray mapArray = row.getArray(21);
+        assertThat(mapArray.size()).isEqualTo(2);
+        // Verify inner map 1
+        InternalMap innerMap1 = mapArray.getMap(0);
+        assertThat(innerMap1.size()).isEqualTo(2);
+        InternalArray keyArray1 = innerMap1.keyArray();
+        InternalArray valueArray1 = innerMap1.valueArray();
+
+        Map<String, Integer> resultMap1 = new HashMap<>();
+        resultMap1.put(keyArray1.getString(0).toString(), 
valueArray1.getInt(0));
+        resultMap1.put(keyArray1.getString(1).toString(), 
valueArray1.getInt(1));
+
+        assertThat(resultMap1).containsEntry("test_1", 1);
+        assertThat(resultMap1).containsEntry("test_2", 2);
+        // Verify inner map 2
+        InternalMap innerMap2 = mapArray.getMap(1);
+        assertThat(innerMap2.size()).isEqualTo(2);
+        InternalArray keyArray2 = innerMap2.keyArray();
+        InternalArray valueArray2 = innerMap2.valueArray();
+
+        Map<String, Integer> resultMap2 = new HashMap<>();
+        resultMap2.put(keyArray2.getString(0).toString(), 
valueArray2.getInt(0));
+        resultMap2.put(keyArray2.getString(1).toString(), 
valueArray2.getInt(1));
+
+        assertThat(resultMap2).containsEntry("test_3", 3);
+        assertThat(resultMap2).containsEntry("test_4", 4);
+    }
+
+    @Test
+    public void testArrayWithNullElements() {
+        RowType table =
+                RowType.builder()
+                        .field("stringArray", 
DataTypes.ARRAY(DataTypes.STRING()))
+                        .field("intObjectArray", 
DataTypes.ARRAY(DataTypes.INT()))
+                        .build();
+
+        PojoToRowConverter<NullableArrayPojo> writer =
+                PojoToRowConverter.of(NullableArrayPojo.class, table, table);
+
+        NullableArrayPojo pojo = new NullableArrayPojo();
+        pojo.stringArray = new String[] {"hello", null, "world"};
+        pojo.intObjectArray = new Integer[] {1, null, 3};
+
+        GenericRow row = writer.toRow(pojo);
+
+        InternalArray stringArray = row.getArray(0);
+        assertThat(stringArray.size()).isEqualTo(3);
+        assertThat(stringArray.getString(0).toString()).isEqualTo("hello");
+        assertThat(stringArray.isNullAt(1)).isTrue();
+        assertThat(stringArray.getString(2).toString()).isEqualTo("world");
+
+        InternalArray intArray = row.getArray(1);
+        assertThat(intArray.size()).isEqualTo(3);
+        assertThat(intArray.getInt(0)).isEqualTo(1);
+        assertThat(intArray.isNullAt(1)).isTrue();
+        assertThat(intArray.getInt(2)).isEqualTo(3);
+    }
+
+    @Test
+    public void testNullArrayField() {
+        RowType table =
+                RowType.builder().field("intArray", 
DataTypes.ARRAY(DataTypes.INT())).build();
+
+        PojoToRowConverter<SimpleArrayPojo> writer =
+                PojoToRowConverter.of(SimpleArrayPojo.class, table, table);
+
+        SimpleArrayPojo pojo = new SimpleArrayPojo();
+        pojo.intArray = null;
+
+        GenericRow row = writer.toRow(pojo);
+        assertThat(row.isNullAt(0)).isTrue();
+    }
+
+    /** POJO for testing all array types. */
+    @SuppressWarnings("unchecked")
+    public static class ArrayPojo {
+        public boolean[] primitiveBooleanArray;
+        public byte[] primitiveByteArray;
+        public short[] primitiveShortArray;
+        public int[] primitiveIntArray;
+        public long[] primitiveLongArray;
+        public float[] primitiveFloatArray;
+        public double[] primitiveDoubleArray;
+        public Boolean[] booleanArray;
+        public Byte[] byteArray;
+        public Short[] shortArray;
+        public Integer[] intArray;
+        public Long[] longArray;
+        public Float[] floatArray;
+        public Double[] doubleArray;
+        public String[] stringArray;
+        public BigDecimal[] decimalArray;
+        public LocalDate[] dateArray;
+        public LocalTime[] timeArray;
+        public LocalDateTime[] timestampArray;
+        public Instant[] timestampLtzArray;
+        public Integer[][] nestedIntArray;
+        public Map<String, Integer>[] mapArray;
+
+        public ArrayPojo() {}
+
+        public static ArrayPojo sample() {
+            ArrayPojo pojo = new ArrayPojo();
+            pojo.primitiveBooleanArray = new boolean[] {true, false};
+            pojo.primitiveByteArray = new byte[] {1, 2};
+            pojo.primitiveShortArray = new short[] {100, 200};
+            pojo.primitiveIntArray = new int[] {1000, 2000};
+            pojo.primitiveLongArray = new long[] {10000L, 20000L};
+            pojo.primitiveFloatArray = new float[] {1.1f, 2.2f};
+            pojo.primitiveDoubleArray = new double[] {1.11, 2.22};
+            pojo.booleanArray = new Boolean[] {true, false};
+            pojo.byteArray = new Byte[] {1, 2};
+            pojo.shortArray = new Short[] {100, 200};
+            pojo.intArray = new Integer[] {1000, 2000};
+            pojo.longArray = new Long[] {10000L, 20000L};
+            pojo.floatArray = new Float[] {1.1f, 2.2f};
+            pojo.doubleArray = new Double[] {1.11, 2.22};
+            pojo.stringArray = new String[] {"hello", "world"};
+            pojo.decimalArray =
+                    new BigDecimal[] {new BigDecimal("123.45"), new 
BigDecimal("678.90")};
+            pojo.dateArray = new LocalDate[] {LocalDate.of(2025, 1, 1), 
LocalDate.of(2025, 12, 31)};
+            pojo.timeArray = new LocalTime[] {LocalTime.MIDNIGHT, 
LocalTime.of(12, 30, 0)};
+            pojo.timestampArray =
+                    new LocalDateTime[] {
+                        LocalDateTime.of(2025, 7, 23, 15, 0, 0),
+                        LocalDateTime.of(2025, 12, 31, 23, 59, 59)
+                    };
+            pojo.timestampLtzArray =
+                    new Instant[] {
+                        Instant.parse("2025-01-01T00:00:00Z"), 
Instant.parse("2025-12-31T23:59:59Z")
+                    };
+            pojo.nestedIntArray =
+                    new Integer[][] {
+                        {1, 2},
+                        {3, 4, 5}
+                    };
+
+            pojo.mapArray =
+                    new HashMap[] {
+                        new HashMap<Object, Object>() {
+                            {
+                                put("test_1", 1);
+                                put("test_2", 2);
+                            }
+                        },
+                        new HashMap<Object, Object>() {
+                            {
+                                put("test_3", 3);
+                                put("test_4", 4);
+                            }
+                        }
+                    };
+            return pojo;
+        }
+    }
+
+    @Test
+    public void testCollectionFields() {
+        // PojoArrayToFlussArray must accept java.util.Collection (List, 
ArrayList, etc.)
+        RowType table =
+                RowType.builder()
+                        .field("listOfStrings", 
DataTypes.ARRAY(DataTypes.STRING()))
+                        .field("listOfInts", DataTypes.ARRAY(DataTypes.INT()))
+                        .field("listWithNull", 
DataTypes.ARRAY(DataTypes.STRING()))
+                        .build();
+
+        PojoToRowConverter<CollectionArrayPojo> writer =
+                PojoToRowConverter.of(CollectionArrayPojo.class, table, table);
+
+        CollectionArrayPojo pojo = new CollectionArrayPojo();
+        pojo.listOfStrings = Arrays.asList("alpha", "beta", "gamma");
+        pojo.listOfInts = new ArrayList<>(Arrays.asList(10, 20, 30));
+        pojo.listWithNull = Arrays.asList("a", null, "b");
+
+        GenericRow row = writer.toRow(pojo);
+
+        InternalArray strArray = row.getArray(0);
+        assertThat(strArray.size()).isEqualTo(3);
+        assertThat(strArray.getString(0).toString()).isEqualTo("alpha");
+        assertThat(strArray.getString(1).toString()).isEqualTo("beta");
+        assertThat(strArray.getString(2).toString()).isEqualTo("gamma");
+
+        InternalArray intArray = row.getArray(1);
+        assertThat(intArray.size()).isEqualTo(3);
+        assertThat(intArray.getInt(0)).isEqualTo(10);
+        assertThat(intArray.getInt(1)).isEqualTo(20);
+        assertThat(intArray.getInt(2)).isEqualTo(30);
+
+        InternalArray nullableArray = row.getArray(2);
+        assertThat(nullableArray.size()).isEqualTo(3);
+        assertThat(nullableArray.getString(0).toString()).isEqualTo("a");
+        assertThat(nullableArray.isNullAt(1)).isTrue();
+        assertThat(nullableArray.getString(2).toString()).isEqualTo("b");
+    }
+
+    /** POJO with Collection fields for array conversion tests. */
+    public static class CollectionArrayPojo {
+        public List<String> listOfStrings;
+        public ArrayList<Integer> listOfInts;
+        public List<String> listWithNull;
+
+        public CollectionArrayPojo() {}
+    }
+
+    /** POJO for testing arrays with null elements. */
+    public static class NullableArrayPojo {
+        public String[] stringArray;
+        public Integer[] intObjectArray;
+
+        public NullableArrayPojo() {}
+    }
+
+    /** Simple POJO for testing null array field. */
+    public static class SimpleArrayPojo {
+        public Integer[] intArray;
+
+        public SimpleArrayPojo() {}
+    }
+}
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/converter/PojoToRowConverterTest.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/converter/PojoToRowConverterTest.java
index 4703c811c..d0cda979d 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/converter/PojoToRowConverterTest.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/converter/PojoToRowConverterTest.java
@@ -18,6 +18,8 @@
 package org.apache.fluss.client.converter;
 
 import org.apache.fluss.row.GenericRow;
+import org.apache.fluss.row.InternalArray;
+import org.apache.fluss.row.InternalMap;
 import org.apache.fluss.row.TimestampLtz;
 import org.apache.fluss.row.TimestampNtz;
 import org.apache.fluss.types.DataTypes;
@@ -27,6 +29,8 @@ import org.junit.jupiter.api.Test;
 
 import java.time.Instant;
 import java.time.LocalDateTime;
+import java.util.HashMap;
+import java.util.Map;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -157,23 +161,6 @@ public class PojoToRowConverterTest {
                 .hasMessageContaining("timestampLtzField");
     }
 
-    @Test
-    public void testUnsupportedSchemaFieldTypeThrows() {
-        RowType table =
-                RowType.builder()
-                        .field("mapField", DataTypes.MAP(DataTypes.STRING(), 
DataTypes.INT()))
-                        .build();
-        RowType proj = table;
-        assertThatThrownBy(
-                        () ->
-                                PojoToRowConverter.of(
-                                        ConvertersTestFixtures.MapPojo.class, 
table, proj))
-                .isInstanceOf(UnsupportedOperationException.class)
-                .hasMessageContaining("Unsupported field type")
-                .hasMessageContaining("MAP")
-                .hasMessageContaining("mapField");
-    }
-
     @Test
     public void testTimestampPrecision3() {
         // Test with precision 3 milliseconds
@@ -296,6 +283,52 @@ public class PojoToRowConverterTest {
                 .isEqualTo(expectedInstant);
     }
 
+    @Test
+    public void testMapType() {
+        RowType table =
+                RowType.builder()
+                        .field("mapField", DataTypes.MAP(DataTypes.STRING(), 
DataTypes.INT()))
+                        .build();
+
+        PojoToRowConverter<MapPojo> writer = 
PojoToRowConverter.of(MapPojo.class, table, table);
+
+        MapPojo pojo = MapPojo.sample();
+        GenericRow row = writer.toRow(pojo);
+
+        // Verify map field
+        InternalMap mapField = row.getMap(0);
+        assertThat(mapField.size()).isEqualTo(2);
+
+        InternalArray keyArray = mapField.keyArray();
+        InternalArray valueArray = mapField.valueArray();
+
+        Map<String, Integer> resultMap = new HashMap<>();
+        resultMap.put(keyArray.getString(0).toString(), valueArray.getInt(0));
+        resultMap.put(keyArray.getString(1).toString(), valueArray.getInt(1));
+
+        assertThat(resultMap).containsEntry("test_1", 1);
+        assertThat(resultMap).containsEntry("test_2", 2);
+    }
+
+    /** POJO for testing map type. */
+    public static class MapPojo {
+        public Map<String, Integer> mapField;
+
+        public MapPojo() {}
+
+        public static MapPojo sample() {
+            MapPojo pojo = new MapPojo();
+            pojo.mapField =
+                    new HashMap<String, Integer>() {
+                        {
+                            put("test_1", 1);
+                            put("test_2", 2);
+                        }
+                    };
+            return pojo;
+        }
+    }
+
     private LocalDateTime truncateLocalDateTime(LocalDateTime ldt, int 
precision) {
         if (precision >= 9) {
             return ldt;
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/converter/RowToPojoConverterTest.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/converter/RowToPojoConverterTest.java
index 52d69eb2a..917d6de09 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/converter/RowToPojoConverterTest.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/converter/RowToPojoConverterTest.java
@@ -23,6 +23,9 @@ import org.apache.fluss.types.RowType;
 
 import org.junit.jupiter.api.Test;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -41,7 +44,7 @@ public class RowToPojoConverterTest {
 
         ConvertersTestFixtures.TestPojo pojo = 
ConvertersTestFixtures.TestPojo.sample();
         GenericRow row = writer.toRow(pojo);
-        assertThat(row.getFieldCount()).isEqualTo(15);
+        assertThat(row.getFieldCount()).isEqualTo(17);
 
         ConvertersTestFixtures.TestPojo back = scanner.fromRow(row);
         assertThat(back).isEqualTo(pojo);
@@ -126,4 +129,106 @@ public class RowToPojoConverterTest {
         ConvertersTestFixtures.CharacterFieldPojo back = scanner.fromRow(row);
         assertThat(back.charField).isEqualTo('A');
     }
+
+    @Test
+    public void testMapType() {
+        RowType table =
+                RowType.builder()
+                        .field("mapField", DataTypes.MAP(DataTypes.STRING(), 
DataTypes.INT()))
+                        .build();
+
+        PojoToRowConverter<MapPojo> writer = 
PojoToRowConverter.of(MapPojo.class, table, table);
+        RowToPojoConverter<MapPojo> reader = 
RowToPojoConverter.of(MapPojo.class, table, table);
+
+        MapPojo pojo = MapPojo.sample();
+        GenericRow row = writer.toRow(pojo);
+        MapPojo back = reader.fromRow(row);
+
+        // Verify map field
+        Map<Object, Object> mapField = back.mapField;
+        assertThat(mapField.size()).isEqualTo(2);
+        assertThat(mapField).containsEntry("test_1", 1);
+        assertThat(mapField).containsEntry("test_2", 2);
+    }
+
+    @Test
+    public void testNullMapField() {
+        RowType table =
+                RowType.builder()
+                        .field("mapField", DataTypes.MAP(DataTypes.STRING(), 
DataTypes.INT()))
+                        .build();
+
+        PojoToRowConverter<MapPojo> writer = 
PojoToRowConverter.of(MapPojo.class, table, table);
+        RowToPojoConverter<MapPojo> reader = 
RowToPojoConverter.of(MapPojo.class, table, table);
+
+        MapPojo pojo = new MapPojo();
+        pojo.mapField = null;
+
+        GenericRow row = writer.toRow(pojo);
+        assertThat(row.isNullAt(0)).isTrue();
+
+        MapPojo back = reader.fromRow(row);
+        assertThat(back.mapField).isNull();
+    }
+
+    @Test
+    public void testMapWithNullValues() {
+        RowType table =
+                RowType.builder()
+                        .field("mapField", DataTypes.MAP(DataTypes.STRING(), 
DataTypes.INT()))
+                        .build();
+
+        PojoToRowConverter<MapPojo> writer = 
PojoToRowConverter.of(MapPojo.class, table, table);
+        RowToPojoConverter<MapPojo> reader = 
RowToPojoConverter.of(MapPojo.class, table, table);
+
+        MapPojo pojo = new MapPojo();
+        pojo.mapField = new HashMap<>();
+        pojo.mapField.put("a", 1);
+        pojo.mapField.put("b", null);
+
+        GenericRow row = writer.toRow(pojo);
+        MapPojo back = reader.fromRow(row);
+
+        assertThat(back.mapField).containsEntry("a", 1);
+        assertThat(back.mapField).containsKey("b");
+        assertThat(back.mapField.get("b")).isNull();
+    }
+
+    @Test
+    public void testEmptyMap() {
+        RowType table =
+                RowType.builder()
+                        .field("mapField", DataTypes.MAP(DataTypes.STRING(), 
DataTypes.INT()))
+                        .build();
+
+        PojoToRowConverter<MapPojo> writer = 
PojoToRowConverter.of(MapPojo.class, table, table);
+        RowToPojoConverter<MapPojo> reader = 
RowToPojoConverter.of(MapPojo.class, table, table);
+
+        MapPojo pojo = new MapPojo();
+        pojo.mapField = new HashMap<>();
+
+        GenericRow row = writer.toRow(pojo);
+        MapPojo back = reader.fromRow(row);
+
+        assertThat(back.mapField).isEmpty();
+    }
+
+    /** POJO for testing map type. */
+    public static class MapPojo {
+        public Map<Object, Object> mapField;
+
+        public MapPojo() {}
+
+        public static MapPojo sample() {
+            MapPojo pojo = new MapPojo();
+            pojo.mapField =
+                    new HashMap<Object, Object>() {
+                        {
+                            put("test_1", 1);
+                            put("test_2", 2);
+                        }
+                    };
+            return pojo;
+        }
+    }
 }
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTypedClientITCase.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTypedClientITCase.java
index c7724799b..e6b963ead 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTypedClientITCase.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTypedClientITCase.java
@@ -48,7 +48,9 @@ import java.time.LocalDateTime;
 import java.time.LocalTime;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -501,6 +503,341 @@ public class FlussTypedClientITCase extends 
ClientToServerITCaseBase {
         }
     }
 
+    // 
-------------------------------------------------------------------------
+    // Complex-type (ARRAY / MAP) POJO and helpers
+    // 
-------------------------------------------------------------------------
+
+    /**
+     * POJO covering the complex-type fields that are new in this PR.
+     *
+     * <ul>
+     *   <li>ARRAY&lt;INT&gt; — Integer[]
+     *   <li>ARRAY&lt;STRING&gt; — String[]
+     *   <li>ARRAY&lt;ARRAY&lt;INT&gt;&gt; — Integer[][]
+     *   <li>MAP&lt;STRING, INT&gt; — Map&lt;String, Integer&gt;
+     *   <li>MAP&lt;STRING, ARRAY&lt;INT&gt;&gt; — Map&lt;String, Object[]&gt; 
(generic type is
+     *       erased at runtime; inner arrays always come back as Object[])
+     * </ul>
+     */
+    public static class ComplexTypesPojo {
+        public Integer id;
+        public Integer[] intArray;
+        public String[] strArray;
+        public Integer[][] nestedArray;
+        public Map<String, Integer> simpleMap;
+        // Map values that are arrays are always deserialized as Object[] 
(type erasure)
+        public Map<String, Object[]> mapOfArrays;
+
+        public ComplexTypesPojo() {}
+
+        /** Constructor that sets only the id; all array/map fields default to 
null. */
+        public ComplexTypesPojo(Integer id) {
+            this.id = id;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            ComplexTypesPojo that = (ComplexTypesPojo) o;
+            return Objects.equals(id, that.id)
+                    && Arrays.equals(intArray, that.intArray)
+                    && Arrays.equals(strArray, that.strArray)
+                    && Arrays.deepEquals(nestedArray, that.nestedArray)
+                    && Objects.equals(simpleMap, that.simpleMap)
+                    && mapsOfArraysEqual(mapOfArrays, that.mapOfArrays);
+        }
+
+        private static boolean mapsOfArraysEqual(Map<String, Object[]> a, 
Map<String, Object[]> b) {
+            if (a == b) {
+                return true;
+            }
+            if (a == null || b == null || a.size() != b.size()) {
+                return false;
+            }
+            for (Map.Entry<String, Object[]> e : a.entrySet()) {
+                if (!Arrays.equals(e.getValue(), b.get(e.getKey()))) {
+                    return false;
+                }
+            }
+            return true;
+        }
+
+        @Override
+        public int hashCode() {
+            int result = Objects.hash(id, simpleMap);
+            result = 31 * result + Arrays.hashCode(intArray);
+            result = 31 * result + Arrays.hashCode(strArray);
+            result = 31 * result + Arrays.deepHashCode(nestedArray);
+            return result;
+        }
+
+        @Override
+        public String toString() {
+            return "ComplexTypesPojo{"
+                    + "id="
+                    + id
+                    + ", intArray="
+                    + Arrays.toString(intArray)
+                    + ", strArray="
+                    + Arrays.toString(strArray)
+                    + ", nestedArray="
+                    + Arrays.deepToString(nestedArray)
+                    + ", simpleMap="
+                    + simpleMap
+                    + ", mapOfArrays="
+                    + mapOfArrays
+                    + '}';
+        }
+    }
+
+    /** Primary-key lookup POJO for {@link ComplexTypesPojo}. */
+    public static class ComplexTypesLookupKey {
+        public Integer id;
+
+        public ComplexTypesLookupKey() {}
+
+        public ComplexTypesLookupKey(Integer id) {
+            this.id = id;
+        }
+    }
+
+    private static Schema complexTypesLogSchema() {
+        return Schema.newBuilder()
+                .column("id", DataTypes.INT())
+                .column("intArray", DataTypes.ARRAY(DataTypes.INT()))
+                .column("strArray", DataTypes.ARRAY(DataTypes.STRING()))
+                .column("nestedArray", 
DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.INT())))
+                .column("simpleMap", DataTypes.MAP(DataTypes.STRING(), 
DataTypes.INT()))
+                .column(
+                        "mapOfArrays",
+                        DataTypes.MAP(DataTypes.STRING(), 
DataTypes.ARRAY(DataTypes.INT())))
+                .build();
+    }
+
+    private static Schema complexTypesPkSchema() {
+        return Schema.newBuilder()
+                .column("id", DataTypes.INT())
+                .column("intArray", DataTypes.ARRAY(DataTypes.INT()))
+                .column("strArray", DataTypes.ARRAY(DataTypes.STRING()))
+                .column("nestedArray", 
DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.INT())))
+                .column("simpleMap", DataTypes.MAP(DataTypes.STRING(), 
DataTypes.INT()))
+                .column(
+                        "mapOfArrays",
+                        DataTypes.MAP(DataTypes.STRING(), 
DataTypes.ARRAY(DataTypes.INT())))
+                .primaryKey("id")
+                .build();
+    }
+
+    private static ComplexTypesPojo newComplexTypesPojo(int i) {
+        ComplexTypesPojo p = new ComplexTypesPojo();
+        p.id = i;
+        p.intArray = new Integer[] {i, i + 1, i + 2};
+        p.strArray = new String[] {"s" + i, "s" + (i + 1)};
+        p.nestedArray = new Integer[][] {{i, i + 1}, {i + 2, i + 3}};
+        p.simpleMap = new HashMap<>();
+        p.simpleMap.put("k" + i, i * 10);
+        p.simpleMap.put("k" + (i + 1), (i + 1) * 10);
+        p.mapOfArrays = new HashMap<>();
+        p.mapOfArrays.put("arr" + i, new Object[] {i * 100, i * 100 + 1});
+        p.mapOfArrays.put("arr" + (i + 1), new Object[] {(i + 1) * 100, (i + 
1) * 100 + 1});
+        return p;
+    }
+
+    @Test
+    void testComplexTypesAppendWriteAndScan() throws Exception {
+        TablePath path = TablePath.of("pojo_db", "complex_types_log");
+        TableDescriptor td =
+                
TableDescriptor.builder().schema(complexTypesLogSchema()).distributedBy(1).build();
+        createTable(path, td, true);
+
+        try (Table table = conn.getTable(path)) {
+            TypedAppendWriter<ComplexTypesPojo> writer =
+                    
table.newAppend().createTypedWriter(ComplexTypesPojo.class);
+
+            List<ComplexTypesPojo> expected = new ArrayList<>();
+            for (int i = 1; i <= 3; i++) {
+                ComplexTypesPojo p = newComplexTypesPojo(i);
+                expected.add(p);
+                writer.append(p);
+            }
+
+            // also write a row with null array / map fields to verify null 
propagation
+            ComplexTypesPojo nullFieldPojo = new ComplexTypesPojo(99);
+            expected.add(nullFieldPojo);
+            writer.append(nullFieldPojo);
+
+            writer.flush();
+
+            TypedLogScanner<ComplexTypesPojo> scanner =
+                    
table.newScan().createTypedLogScanner(ComplexTypesPojo.class);
+            subscribeFromBeginning(scanner, table);
+
+            List<ComplexTypesPojo> actual = new ArrayList<>();
+            while (actual.size() < expected.size()) {
+                TypedScanRecords<ComplexTypesPojo> recs = 
scanner.poll(Duration.ofSeconds(2));
+                for (TypedScanRecord<ComplexTypesPojo> r : recs) {
+                    
assertThat(r.getChangeType()).isEqualTo(ChangeType.APPEND_ONLY);
+                    actual.add(r.getValue());
+                }
+            }
+
+            // verify all elements (custom equals handles deep array 
comparison)
+            assertThat(actual).containsExactlyInAnyOrderElementsOf(expected);
+
+            // spot-check the null-field row
+            ComplexTypesPojo nullBack =
+                    actual.stream().filter(p -> p.id == 
99).findFirst().orElse(null);
+            assertThat(nullBack).isNotNull();
+            assertThat(nullBack.intArray).isNull();
+            assertThat(nullBack.strArray).isNull();
+            assertThat(nullBack.nestedArray).isNull();
+            assertThat(nullBack.simpleMap).isNull();
+            assertThat(nullBack.mapOfArrays).isNull();
+        }
+    }
+
+    @Test
+    void testComplexTypesUpsertAndLookup() throws Exception {
+        TablePath path = TablePath.of("pojo_db", "complex_types_pk");
+        TableDescriptor td =
+                TableDescriptor.builder()
+                        .schema(complexTypesPkSchema())
+                        .distributedBy(1, "id")
+                        .build();
+        createTable(path, td, true);
+
+        try (Table table = conn.getTable(path)) {
+            TypedUpsertWriter<ComplexTypesPojo> writer =
+                    
table.newUpsert().createTypedWriter(ComplexTypesPojo.class);
+
+            ComplexTypesPojo original = newComplexTypesPojo(10);
+            writer.upsert(original).get();
+
+            // overwrite with updated arrays/maps
+            ComplexTypesPojo updated = new ComplexTypesPojo();
+            updated.id = 10;
+            updated.intArray = new Integer[] {100, 200, 300};
+            updated.strArray = new String[] {"updated"};
+            updated.nestedArray = new Integer[][] {{9, 8}, {7}};
+            updated.simpleMap = new HashMap<>();
+            updated.simpleMap.put("new_key", 999);
+            updated.mapOfArrays = new HashMap<>();
+            updated.mapOfArrays.put("new_arr", new Object[] {-1, -2});
+            writer.upsert(updated).get();
+            writer.flush();
+
+            // verify via typed lookup
+            RowType tableSchema = table.getTableInfo().getRowType();
+            RowToPojoConverter<ComplexTypesPojo> rowConv =
+                    RowToPojoConverter.of(ComplexTypesPojo.class, tableSchema, 
tableSchema);
+
+            TypedLookuper<ComplexTypesLookupKey> lookuper =
+                    
table.newLookup().createTypedLookuper(ComplexTypesLookupKey.class);
+            ComplexTypesPojo lookedUp =
+                    rowConv.fromRow(
+                            lookuper.lookup(new 
ComplexTypesLookupKey(10)).get().getSingletonRow());
+
+            assertThat(lookedUp.id).isEqualTo(10);
+            assertThat(lookedUp.intArray).isEqualTo(new Integer[] {100, 200, 
300});
+            assertThat(lookedUp.strArray).isEqualTo(new String[] {"updated"});
+            assertThat(Arrays.deepEquals(lookedUp.nestedArray, new Integer[][] 
{{9, 8}, {7}}))
+                    .isTrue();
+            assertThat(lookedUp.simpleMap).containsEntry("new_key", 999);
+            assertThat(lookedUp.mapOfArrays).containsKey("new_arr");
+            // inner arrays are deserialized as Object[] due to type erasure
+            assertThat(lookedUp.mapOfArrays.get("new_arr")).isEqualTo(new 
Object[] {-1, -2});
+
+            // verify non-existent key returns null row
+            assertThat(lookuper.lookup(new 
ComplexTypesLookupKey(999)).get().getSingletonRow())
+                    .isNull();
+        }
+    }
+
+    // 
-------------------------------------------------------------------------
+    // List-field POJO — verifies that java.util.List fields work on both paths
+    // 
-------------------------------------------------------------------------
+
+    /**
+     * POJO where ARRAY columns are mapped to {@link List} fields instead of 
Java arrays. Both the
+     * write path ({@link PojoArrayToFlussArray}) and the read path ({@link 
FlussArrayToPojoArray})
+     * support {@link java.util.Collection} types.
+     */
+    public static class ListTypesPojo {
+        public Integer id;
+        public List<Integer> intList;
+        public List<String> strList;
+        public List<Integer> nullableIntList;
+
+        public ListTypesPojo() {}
+    }
+
+    @Test
+    void testListFieldsRoundTrip() throws Exception {
+        Schema schema =
+                Schema.newBuilder()
+                        .column("id", DataTypes.INT())
+                        .column("intList", DataTypes.ARRAY(DataTypes.INT()))
+                        .column("strList", DataTypes.ARRAY(DataTypes.STRING()))
+                        .column("nullableIntList", 
DataTypes.ARRAY(DataTypes.INT()))
+                        .build();
+
+        TablePath path = TablePath.of("pojo_db", "list_fields_log");
+        TableDescriptor td = 
TableDescriptor.builder().schema(schema).distributedBy(1).build();
+        createTable(path, td, true);
+
+        try (Table table = conn.getTable(path)) {
+            TypedAppendWriter<ListTypesPojo> writer =
+                    table.newAppend().createTypedWriter(ListTypesPojo.class);
+
+            ListTypesPojo p1 = new ListTypesPojo();
+            p1.id = 1;
+            p1.intList = new ArrayList<>(Arrays.asList(10, 20, 30));
+            p1.strList = Arrays.asList("alpha", "beta");
+            p1.nullableIntList = Arrays.asList(1, null, 3);
+
+            ListTypesPojo p2 = new ListTypesPojo();
+            p2.id = 2;
+            p2.intList = new ArrayList<>(); // empty list
+            p2.strList = Arrays.asList("only");
+            p2.nullableIntList = null; // null list field
+
+            writer.append(p1);
+            writer.append(p2);
+            writer.flush();
+
+            TypedLogScanner<ListTypesPojo> scanner =
+                    table.newScan().createTypedLogScanner(ListTypesPojo.class);
+            subscribeFromBeginning(scanner, table);
+
+            Map<Integer, ListTypesPojo> actual = new HashMap<>();
+            while (actual.size() < 2) {
+                TypedScanRecords<ListTypesPojo> recs = 
scanner.poll(Duration.ofSeconds(2));
+                for (TypedScanRecord<ListTypesPojo> r : recs) {
+                    actual.put(r.getValue().id, r.getValue());
+                }
+            }
+
+            // verify row 1
+            ListTypesPojo back1 = actual.get(1);
+            assertThat(back1.intList).isInstanceOf(List.class);
+            assertThat(back1.intList).containsExactly(10, 20, 30);
+            assertThat(back1.strList).containsExactly("alpha", "beta");
+            assertThat(back1.nullableIntList).containsExactly(1, null, 3);
+
+            // verify row 2
+            ListTypesPojo back2 = actual.get(2);
+            assertThat(back2.intList).isInstanceOf(List.class);
+            assertThat(back2.intList).isEmpty();
+            assertThat(back2.strList).containsExactly("only");
+            assertThat(back2.nullableIntList).isNull();
+        }
+    }
+
     @Test
     void testTypedPartialUpdates() throws Exception {
         // Use full PK schema and update a subset of fields

Reply via email to