This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 7452a5284 [client] Introduce util helpers for conversion between POJO 
and InternalRow (#1378)
7452a5284 is described below

commit 7452a5284bc94184ce1373e8a53ab12158b865dd
Author: Giannis Polyzos <[email protected]>
AuthorDate: Mon Oct 6 04:13:32 2025 +0300

    [client] Introduce util helpers for conversion between POJO and InternalRow 
(#1378)
---
 .../fluss/client/converter/ConverterCommons.java   | 123 ++++++++++
 .../fluss/client/converter/PojoToRowConverter.java | 240 +++++++++++++++++++
 .../apache/fluss/client/converter/PojoType.java    | 226 ++++++++++++++++++
 .../fluss/client/converter/RowToPojoConverter.java | 265 +++++++++++++++++++++
 .../client/converter/ConvertersTestFixtures.java   | 260 ++++++++++++++++++++
 .../client/converter/PojoToRowConverterTest.java   | 171 +++++++++++++
 .../client/converter/RowToPojoConverterTest.java   | 129 ++++++++++
 7 files changed, 1414 insertions(+)

diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/converter/ConverterCommons.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/converter/ConverterCommons.java
new file mode 100644
index 000000000..f7766d36d
--- /dev/null
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/converter/ConverterCommons.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.converter;
+
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.types.DataType;
+import org.apache.fluss.types.DataTypeRoot;
+import org.apache.fluss.types.RowType;
+
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.fluss.client.converter.RowToPojoConverter.charLengthExceptionMessage;
+
+/**
+ * Internal shared utilities for POJO and Fluss InternalRow conversions.
+ *
+ * <p>Provides validation helpers and common functions used by 
PojoToRowConverter and
+ * RowToPojoConverter (e.g., supported Java types per Fluss DataType, 
projection/table validation,
+ * and text conversion helpers).
+ */
+final class ConverterCommons {
+
+    static final Map<DataTypeRoot, Set<Class<?>>> SUPPORTED_TYPES = 
createSupportedTypes();
+
+    private static Map<DataTypeRoot, Set<Class<?>>> createSupportedTypes() {
+        Map<DataTypeRoot, Set<Class<?>>> map = new HashMap<>();
+        map.put(DataTypeRoot.BOOLEAN, setOf(Boolean.class));
+        map.put(DataTypeRoot.TINYINT, setOf(Byte.class));
+        map.put(DataTypeRoot.SMALLINT, setOf(Short.class));
+        map.put(DataTypeRoot.INTEGER, setOf(Integer.class));
+        map.put(DataTypeRoot.BIGINT, setOf(Long.class));
+        map.put(DataTypeRoot.FLOAT, setOf(Float.class));
+        map.put(DataTypeRoot.DOUBLE, setOf(Double.class));
+        map.put(DataTypeRoot.CHAR, setOf(String.class, Character.class));
+        map.put(DataTypeRoot.STRING, setOf(String.class, Character.class));
+        map.put(DataTypeRoot.BINARY, setOf(byte[].class));
+        map.put(DataTypeRoot.BYTES, setOf(byte[].class));
+        map.put(DataTypeRoot.DECIMAL, setOf(BigDecimal.class));
+        map.put(DataTypeRoot.DATE, setOf(java.time.LocalDate.class));
+        map.put(DataTypeRoot.TIME_WITHOUT_TIME_ZONE, 
setOf(java.time.LocalTime.class));
+        map.put(DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE, 
setOf(java.time.LocalDateTime.class));
+        map.put(
+                DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE,
+                setOf(java.time.Instant.class, 
java.time.OffsetDateTime.class));
+        return map;
+    }
+
+    static void validatePojoMatchesTable(PojoType<?> pojoType, RowType 
tableSchema) {
+        Set<String> pojoNames = pojoType.getProperties().keySet();
+        Set<String> tableNames = new HashSet<>(tableSchema.getFieldNames());
+        if (!pojoNames.equals(tableNames)) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "POJO fields %s must exactly match table schema 
fields %s.",
+                            pojoNames, tableNames));
+        }
+        for (int i = 0; i < tableSchema.getFieldCount(); i++) {
+            String name = tableSchema.getFieldNames().get(i);
+            DataType dt = tableSchema.getTypeAt(i);
+            PojoType.Property prop = pojoType.getProperty(name);
+            validateCompatibility(dt, prop);
+        }
+    }
+
+    static void validateProjectionSubset(RowType projection, RowType 
tableSchema) {
+        Set<String> tableNames = new HashSet<>(tableSchema.getFieldNames());
+        for (String n : projection.getFieldNames()) {
+            if (!tableNames.contains(n)) {
+                throw new IllegalArgumentException(
+                        "Projection field '" + n + "' is not part of table 
schema.");
+            }
+        }
+    }
+
+    static void validateCompatibility(DataType fieldType, PojoType.Property 
prop) {
+        Set<Class<?>> supported = SUPPORTED_TYPES.get(fieldType.getTypeRoot());
+        Class<?> actual = prop.type;
+        if (supported == null) {
+            throw new UnsupportedOperationException(
+                    String.format(
+                            "Unsupported field type %s for field %s.",
+                            fieldType.getTypeRoot(), prop.name));
+        }
+        if (!supported.contains(actual)) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "Field '%s' in POJO has Java type %s which is 
incompatible with Fluss type %s. Supported Java types: %s",
+                            prop.name, actual.getName(), 
fieldType.getTypeRoot(), supported));
+        }
+    }
+
+    static BinaryString toBinaryStringForText(Object v, String fieldName, 
DataTypeRoot root) {
+        final String s = String.valueOf(v);
+        if (root == DataTypeRoot.CHAR && s.length() != 1) {
+            throw new 
IllegalArgumentException(charLengthExceptionMessage(fieldName, s.length()));
+        }
+        return BinaryString.fromString(s);
+    }
+
+    static Set<Class<?>> setOf(Class<?>... javaTypes) {
+        return new HashSet<>(Arrays.asList(javaTypes));
+    }
+}
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/converter/PojoToRowConverter.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/converter/PojoToRowConverter.java
new file mode 100644
index 000000000..c2a3dca08
--- /dev/null
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/converter/PojoToRowConverter.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.converter;
+
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.Decimal;
+import org.apache.fluss.row.GenericRow;
+import org.apache.fluss.row.TimestampLtz;
+import org.apache.fluss.row.TimestampNtz;
+import org.apache.fluss.types.DataType;
+import org.apache.fluss.types.DecimalType;
+import org.apache.fluss.types.RowType;
+
+import javax.annotation.Nullable;
+
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.util.List;
+
+/**
+ * Converter for writer path: converts POJO instances to Fluss InternalRow 
according to a (possibly
+ * projected) RowType. Validation is done against the full table schema.
+ */
+public final class PojoToRowConverter<T> {
+
+    private final PojoType<T> pojoType;
+    private final RowType tableSchema;
+    private final RowType projection;
+    private final List<String> projectionFieldNames;
+    private final FieldToRow[] fieldConverters; // index corresponds to 
projection position
+
+    private PojoToRowConverter(PojoType<T> pojoType, RowType tableSchema, 
RowType projection) {
+        this.pojoType = pojoType;
+        this.tableSchema = tableSchema;
+        this.projection = projection;
+        this.projectionFieldNames = projection.getFieldNames();
+        ConverterCommons.validatePojoMatchesTable(pojoType, tableSchema);
+        ConverterCommons.validateProjectionSubset(projection, tableSchema);
+        this.fieldConverters = createFieldConverters();
+    }
+
+    public static <T> PojoToRowConverter<T> of(
+            Class<T> pojoClass, RowType tableSchema, RowType projection) {
+        return new PojoToRowConverter<>(PojoType.of(pojoClass), tableSchema, 
projection);
+    }
+
+    public GenericRow toRow(@Nullable T pojo) {
+        if (pojo == null) {
+            return null;
+        }
+        GenericRow row = new GenericRow(projection.getFieldCount());
+        for (int i = 0; i < fieldConverters.length; i++) {
+            Object v;
+            try {
+                v = fieldConverters[i].readAndConvert(pojo);
+            } catch (RuntimeException re) {
+                throw re;
+            } catch (Exception e) {
+                throw new IllegalStateException(
+                        "Failed to access field '"
+                                + projectionFieldNames.get(i)
+                                + "' from POJO "
+                                + pojoType.getPojoClass().getName(),
+                        e);
+            }
+            row.setField(i, v);
+        }
+        return row;
+    }
+
+    private FieldToRow[] createFieldConverters() {
+        FieldToRow[] arr = new FieldToRow[projection.getFieldCount()];
+        for (int i = 0; i < projection.getFieldCount(); i++) {
+            String fieldName = projectionFieldNames.get(i);
+            DataType fieldType = projection.getTypeAt(i);
+            PojoType.Property prop = requireProperty(fieldName);
+            ConverterCommons.validateCompatibility(fieldType, prop);
+            arr[i] = createFieldConverter(prop, fieldType);
+        }
+        return arr;
+    }
+
+    private PojoType.Property requireProperty(String fieldName) {
+        PojoType.Property p = pojoType.getProperty(fieldName);
+        if (p == null) {
+            throw new IllegalArgumentException(
+                    "Field '"
+                            + fieldName
+                            + "' not found in POJO class "
+                            + pojoType.getPojoClass().getName()
+                            + ".");
+        }
+        return p;
+    }
+
+    private static FieldToRow createFieldConverter(PojoType.Property prop, 
DataType fieldType) {
+        switch (fieldType.getTypeRoot()) {
+            case BOOLEAN:
+            case TINYINT:
+            case SMALLINT:
+            case INTEGER:
+            case BIGINT:
+            case FLOAT:
+            case DOUBLE:
+            case BINARY:
+            case BYTES:
+                return prop::read;
+            case CHAR:
+            case STRING:
+                return (obj) -> convertTextValue(fieldType, prop, 
prop.read(obj));
+            case DECIMAL:
+                return (obj) -> convertDecimalValue((DecimalType) fieldType, 
prop, prop.read(obj));
+            case DATE:
+                return (obj) -> convertDateValue(prop, prop.read(obj));
+            case TIME_WITHOUT_TIME_ZONE:
+                return (obj) -> convertTimeValue(prop, prop.read(obj));
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                return (obj) -> convertTimestampNtzValue(prop, prop.read(obj));
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                return (obj) -> convertTimestampLtzValue(prop, prop.read(obj));
+            default:
+                throw new UnsupportedOperationException(
+                        String.format(
+                                "Unsupported field type %s for field %s.",
+                                fieldType.getTypeRoot(), prop.name));
+        }
+    }
+
+    /**
+     * Converts a text value (String or Character) from a POJO property to 
Fluss BinaryString.
+     *
+     * <p>For CHAR columns, enforces that the text has exactly one character. 
Nulls are passed
+     * through.
+     */
+    private static @Nullable BinaryString convertTextValue(
+            DataType fieldType, PojoType.Property prop, @Nullable Object v) {
+        if (v == null) {
+            return null;
+        }
+        return ConverterCommons.toBinaryStringForText(v, prop.name, 
fieldType.getTypeRoot());
+    }
+
+    /** Converts a BigDecimal POJO property to Fluss Decimal respecting 
precision and scale. */
+    private static @Nullable Decimal convertDecimalValue(
+            DecimalType decimalType, PojoType.Property prop, @Nullable Object 
v) {
+        if (v == null) {
+            return null;
+        }
+        if (!(v instanceof BigDecimal)) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "Field %s is not a BigDecimal. Cannot convert to 
Decimal.", prop.name));
+        }
+        return Decimal.fromBigDecimal(
+                (BigDecimal) v, decimalType.getPrecision(), 
decimalType.getScale());
+    }
+
+    /** Converts a LocalDate POJO property to number of days since epoch. */
+    private static @Nullable Integer convertDateValue(PojoType.Property prop, 
@Nullable Object v) {
+        if (v == null) {
+            return null;
+        }
+        if (!(v instanceof LocalDate)) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "Field %s is not a LocalDate. Cannot convert to 
int days.", prop.name));
+        }
+        return (int) ((LocalDate) v).toEpochDay();
+    }
+
+    /** Converts a LocalTime POJO property to milliseconds of day. */
+    private static @Nullable Integer convertTimeValue(PojoType.Property prop, 
@Nullable Object v) {
+        if (v == null) {
+            return null;
+        }
+        if (!(v instanceof LocalTime)) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "Field %s is not a LocalTime. Cannot convert to 
int millis.",
+                            prop.name));
+        }
+        LocalTime t = (LocalTime) v;
+        return (int) (t.toNanoOfDay() / 1_000_000);
+    }
+
+    /** Converts a LocalDateTime POJO property to Fluss TimestampNtz. */
+    private static @Nullable TimestampNtz convertTimestampNtzValue(
+            PojoType.Property prop, @Nullable Object v) {
+        if (v == null) {
+            return null;
+        }
+        if (!(v instanceof LocalDateTime)) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "Field %s is not a LocalDateTime. Cannot convert 
to TimestampNtz.",
+                            prop.name));
+        }
+        return TimestampNtz.fromLocalDateTime((LocalDateTime) v);
+    }
+
+    /** Converts an Instant or OffsetDateTime POJO property to Fluss 
TimestampLtz (UTC based). */
+    private static @Nullable TimestampLtz convertTimestampLtzValue(
+            PojoType.Property prop, @Nullable Object v) {
+        if (v == null) {
+            return null;
+        }
+        if (v instanceof Instant) {
+            return TimestampLtz.fromInstant((Instant) v);
+        } else if (v instanceof OffsetDateTime) {
+            return TimestampLtz.fromInstant(((OffsetDateTime) v).toInstant());
+        }
+        throw new IllegalArgumentException(
+                String.format(
+                        "Field %s is not an Instant or OffsetDateTime. Cannot 
convert to TimestampLtz.",
+                        prop.name));
+    }
+
+    private interface FieldToRow {
+        Object readAndConvert(Object pojo) throws Exception;
+    }
+}
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/converter/PojoType.java 
b/fluss-client/src/main/java/org/apache/fluss/client/converter/PojoType.java
new file mode 100644
index 000000000..78af49700
--- /dev/null
+++ b/fluss-client/src/main/java/org/apache/fluss/client/converter/PojoType.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.converter;
+
+import javax.annotation.Nullable;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Internal representation of a POJO type, used to validate POJO requirements 
and to provide unified
+ * accessors for reading/writing properties.
+ */
+final class PojoType<T> {
+    private final Class<T> pojoClass;
+    private final Constructor<T> defaultConstructor;
+    private final Map<String, Property> properties; // property name -> 
property
+
+    private PojoType(Class<T> pojoClass, Constructor<T> ctor, Map<String, 
Property> props) {
+        this.pojoClass = pojoClass;
+        this.defaultConstructor = ctor;
+        this.properties = java.util.Collections.unmodifiableMap(new 
LinkedHashMap<>(props));
+    }
+
+    Class<T> getPojoClass() {
+        return pojoClass;
+    }
+
+    Constructor<T> getDefaultConstructor() {
+        return defaultConstructor;
+    }
+
+    Map<String, Property> getProperties() {
+        return properties;
+    }
+
+    @Nullable
+    Property getProperty(String name) {
+        return properties.get(name);
+    }
+
+    static <T> PojoType<T> of(Class<T> pojoClass) {
+        validatePublicClass(pojoClass);
+        Constructor<T> ctor = requirePublicDefaultConstructor(pojoClass);
+
+        Map<String, Field> allFields = discoverAllInstanceFields(pojoClass);
+        Map<String, Method> getters = discoverGetters(pojoClass);
+        Map<String, Method> setters = discoverSetters(pojoClass);
+
+        Map<String, Property> props = new LinkedHashMap<>();
+        for (Map.Entry<String, Field> e : allFields.entrySet()) {
+            String name = e.getKey();
+            Field field = e.getValue();
+            if (field.getType().isPrimitive()) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "POJO class %s has primitive field '%s' of 
type %s. Primitive types are not allowed; all fields must be nullable (use 
wrapper types).",
+                                pojoClass.getName(), name, 
field.getType().getName()));
+            }
+            boolean publicField = Modifier.isPublic(field.getModifiers());
+            Method getter = getters.get(name);
+            Method setter = setters.get(name);
+            if (!publicField) {
+                // When not a public field, require both getter and setter
+                if (getter == null || setter == null) {
+                    throw new IllegalArgumentException(
+                            String.format(
+                                    "POJO class %s field '%s' must be public 
or have both getter and setter (get%s/set%s).",
+                                    pojoClass.getName(), name, 
capitalize(name), capitalize(name)));
+                }
+            }
+            props.put(
+                    name,
+                    new Property(
+                            name, field.getType(), publicField ? field : null, 
getter, setter));
+        }
+
+        return new PojoType<>(pojoClass, ctor, props);
+    }
+
+    private static <T> void validatePublicClass(Class<T> pojoClass) {
+        if (!Modifier.isPublic(pojoClass.getModifiers())) {
+            throw new IllegalArgumentException(
+                    String.format("POJO class %s must be public.", 
pojoClass.getName()));
+        }
+    }
+
+    private static <T> Constructor<T> requirePublicDefaultConstructor(Class<T> 
pojoClass) {
+        try {
+            Constructor<T> ctor = pojoClass.getDeclaredConstructor();
+            if (!Modifier.isPublic(ctor.getModifiers())) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "POJO class %s must have a public default 
constructor.",
+                                pojoClass.getName()));
+            }
+            return ctor;
+        } catch (NoSuchMethodException e) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "POJO class %s must have a public default 
constructor.",
+                            pojoClass.getName()),
+                    e);
+        }
+    }
+
+    private static Map<String, Field> discoverAllInstanceFields(Class<?> 
clazz) {
+        Map<String, Field> fields = new LinkedHashMap<>();
+        Class<?> c = clazz;
+        while (c != null && c != Object.class) {
+            for (Field f : c.getDeclaredFields()) {
+                int mod = f.getModifiers();
+                if (Modifier.isStatic(mod) || Modifier.isTransient(mod)) {
+                    continue;
+                }
+                f.setAccessible(true);
+                fields.putIfAbsent(f.getName(), f);
+            }
+            c = c.getSuperclass();
+        }
+        return fields;
+    }
+
+    private static Map<String, Method> discoverGetters(Class<?> clazz) {
+        Map<String, Method> getters = new HashMap<>();
+        for (Method m : clazz.getMethods()) { // public methods incl. inherited
+            if (m.getParameterCount() == 0
+                    && m.getName().startsWith("get")
+                    && !m.getReturnType().equals(void.class)) {
+                String prop = decapitalize(m.getName().substring(3));
+                getters.put(prop, m);
+            }
+        }
+        return getters;
+    }
+
+    private static Map<String, Method> discoverSetters(Class<?> clazz) {
+        Map<String, Method> setters = new HashMap<>();
+        for (Method m : clazz.getMethods()) { // public methods incl. inherited
+            if (m.getParameterCount() == 1
+                    && m.getName().startsWith("set")
+                    && m.getReturnType().equals(void.class)) {
+                String prop = decapitalize(m.getName().substring(3));
+                setters.put(prop, m);
+            }
+        }
+        return setters;
+    }
+
+    private static String capitalize(String s) {
+        if (s == null || s.isEmpty()) {
+            return s;
+        }
+        return s.substring(0, 1).toUpperCase(Locale.ROOT) + s.substring(1);
+    }
+
+    private static String decapitalize(String s) {
+        if (s == null || s.isEmpty()) {
+            return s;
+        }
+        return s.substring(0, 1).toLowerCase(Locale.ROOT) + s.substring(1);
+    }
+
+    static final class Property {
+        final String name;
+        final Class<?> type;
+        @Nullable final Field publicField;
+        @Nullable final Method getter;
+        @Nullable final Method setter;
+
+        Property(
+                String name,
+                Class<?> type,
+                @Nullable Field publicField,
+                @Nullable Method getter,
+                @Nullable Method setter) {
+            this.name = Objects.requireNonNull(name, "name");
+            this.type = Objects.requireNonNull(type, "type");
+            this.publicField = publicField;
+            this.getter = getter;
+            this.setter = setter;
+        }
+
+        Object read(Object instance) throws Exception {
+            if (publicField != null) {
+                return publicField.get(instance);
+            } else if (getter != null) {
+                return getter.invoke(instance);
+            } else {
+                throw new IllegalStateException("No readable accessor for 
property '" + name + "'");
+            }
+        }
+
+        void write(Object instance, @Nullable Object value) throws Exception {
+            if (publicField != null) {
+                publicField.set(instance, value);
+            } else if (setter != null) {
+                setter.invoke(instance, value);
+            } else {
+                throw new IllegalStateException("No writable accessor for 
property '" + name + "'");
+            }
+        }
+    }
+}
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/converter/RowToPojoConverter.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/converter/RowToPojoConverter.java
new file mode 100644
index 000000000..a396285f5
--- /dev/null
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/converter/RowToPojoConverter.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.converter;
+
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.Decimal;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.TimestampLtz;
+import org.apache.fluss.row.TimestampNtz;
+import org.apache.fluss.types.DataType;
+import org.apache.fluss.types.DataTypeChecks;
+import org.apache.fluss.types.DataTypeRoot;
+import org.apache.fluss.types.DecimalType;
+import org.apache.fluss.types.RowType;
+
+import javax.annotation.Nullable;
+
+import java.lang.reflect.InvocationTargetException;
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.util.List;
+
+/**
+ * Converter for scanner path: converts InternalRow (possibly projected) to 
POJO, leaving
+ * non-projected fields as null on the POJO. Validation is done against the 
full table schema.
+ */
+public final class RowToPojoConverter<T> {
+
+    private final PojoType<T> pojoType;
+    private final RowType tableSchema;
+    private final RowType projection;
+    private final List<String> projectionFieldNames;
+    private final RowToField[] rowReaders;
+
+    private RowToPojoConverter(PojoType<T> pojoType, RowType tableSchema, 
RowType projection) {
+        this.pojoType = pojoType;
+        this.tableSchema = tableSchema;
+        this.projection = projection;
+        this.projectionFieldNames = projection.getFieldNames();
+        ConverterCommons.validatePojoMatchesTable(pojoType, tableSchema);
+        ConverterCommons.validateProjectionSubset(projection, tableSchema);
+        this.rowReaders = createRowReaders();
+    }
+
+    public static <T> RowToPojoConverter<T> of(
+            Class<T> pojoClass, RowType tableSchema, RowType projection) {
+        return new RowToPojoConverter<>(PojoType.of(pojoClass), tableSchema, 
projection);
+    }
+
+    public T fromRow(@Nullable InternalRow row) {
+        if (row == null) {
+            return null;
+        }
+        try {
+            T pojo = pojoType.getDefaultConstructor().newInstance();
+            for (int i = 0; i < rowReaders.length; i++) {
+                if (!row.isNullAt(i)) {
+                    Object v = rowReaders[i].convert(row, i);
+                    PojoType.Property prop = 
pojoType.getProperty(projectionFieldNames.get(i));
+                    if (v != null) {
+                        prop.write(pojo, v);
+                    }
+                }
+            }
+            return pojo;
+        } catch (InstantiationException | IllegalAccessException | 
InvocationTargetException e) {
+            String message =
+                    String.format(
+                            "Failed to instantiate POJO class %s using the 
public default constructor. Cause: %s",
+                            pojoType.getPojoClass().getName(), e.getMessage());
+            throw new IllegalStateException(message, e);
+        } catch (RuntimeException re) {
+            throw re;
+        } catch (Exception e) {
+            throw new IllegalStateException(
+                    "Failed to set field on POJO class " + 
pojoType.getPojoClass().getName(), e);
+        }
+    }
+
+    private RowToField[] createRowReaders() {
+        RowToField[] arr = new RowToField[projection.getFieldCount()];
+        for (int i = 0; i < projection.getFieldCount(); i++) {
+            String name = projectionFieldNames.get(i);
+            DataType type = projection.getTypeAt(i);
+            PojoType.Property prop = requireProperty(name);
+            ConverterCommons.validateCompatibility(type, prop);
+            arr[i] = createRowReader(type, prop);
+        }
+        return arr;
+    }
+
+    private PojoType.Property requireProperty(String fieldName) {
+        PojoType.Property p = pojoType.getProperty(fieldName);
+        if (p == null) {
+            throw new IllegalArgumentException(
+                    "Field '"
+                            + fieldName
+                            + "' not found in POJO class "
+                            + pojoType.getPojoClass().getName()
+                            + ".");
+        }
+        return p;
+    }
+
+    private static RowToField createRowReader(DataType fieldType, 
PojoType.Property prop) {
+        switch (fieldType.getTypeRoot()) {
+            case BOOLEAN:
+                return InternalRow::getBoolean;
+            case TINYINT:
+                return InternalRow::getByte;
+            case SMALLINT:
+                return InternalRow::getShort;
+            case INTEGER:
+                return InternalRow::getInt;
+            case BIGINT:
+                return InternalRow::getLong;
+            case FLOAT:
+                return InternalRow::getFloat;
+            case DOUBLE:
+                return InternalRow::getDouble;
+            case CHAR:
+            case STRING:
+                return (row, pos) -> convertTextValue(fieldType, prop, 
row.getString(pos));
+            case BINARY:
+            case BYTES:
+                return InternalRow::getBytes;
+            case DECIMAL:
+                return (row, pos) -> convertDecimalValue((DecimalType) 
fieldType, row, pos);
+            case DATE:
+                return RowToPojoConverter::convertDateValue;
+            case TIME_WITHOUT_TIME_ZONE:
+                return RowToPojoConverter::convertTimeValue;
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                {
+                    final int precision = 
DataTypeChecks.getPrecision(fieldType);
+                    return (row, pos) -> convertTimestampNtzValue(precision, 
row, pos);
+                }
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                {
+                    final int precision = 
DataTypeChecks.getPrecision(fieldType);
+                    return (row, pos) -> convertTimestampLtzValue(precision, 
prop, row, pos);
+                }
+            default:
+                throw new UnsupportedOperationException(
+                        String.format(
+                                "Unsupported field type %s for field %s.",
+                                fieldType.getTypeRoot(), prop.name));
+        }
+    }
+
+    /**
+     * Converts a text value (CHAR/STRING) read from an InternalRow into the 
target Java type
+     * declared by the POJO property.
+     *
+     * <p>Supported target types are String and Character. For CHAR columns, 
this enforces that the
+     * value has exactly one character. For Character targets, empty strings 
are rejected.
+     *
+     * @param fieldType Fluss column DataType (must be CHAR or STRING)
+     * @param prop The target POJO property (used for type and error messages)
+     * @param s The BinaryString read from the row
+     * @return Converted Java value (String or Character)
+     * @throws IllegalArgumentException if the target type is unsupported or 
constraints are
+     *     violated
+     */
+    private static Object convertTextValue(
+            DataType fieldType, PojoType.Property prop, BinaryString s) {
+        String v = s.toString();
+        if (prop.type == String.class) {
+            if (fieldType.getTypeRoot() == DataTypeRoot.CHAR && v.length() != 
1) {
+                throw new IllegalArgumentException(
+                        charLengthExceptionMessage(prop.name, v.length()));
+            }
+            return v;
+        } else if (prop.type == Character.class) {
+            if (v.isEmpty()) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "Field %s expects Character, but the string 
value is empty.",
+                                prop.name));
+            }
+            if (fieldType.getTypeRoot() == DataTypeRoot.CHAR && v.length() != 
1) {
+                throw new IllegalArgumentException(
+                        charLengthExceptionMessage(prop.name, v.length()));
+            }
+            return v.charAt(0);
+        }
+        throw new IllegalArgumentException(
+                String.format(
+                        "Field %s is not a String or Character. Cannot convert 
from string.",
+                        prop.name));
+    }
+
+    public static String charLengthExceptionMessage(String fieldName, int 
length) {
+        return String.format(
+                "Field %s expects exactly one character for CHAR type, got 
length %d.",
+                fieldName, length);
+    }
+
+    /**
+     * Converts a DECIMAL value from an InternalRow into a BigDecimal using 
the column's precision
+     * and scale. The row position is assumed non-null (caller checks), so 
this never returns null.
+     */
+    private static BigDecimal convertDecimalValue(
+            DecimalType decimalType, InternalRow row, int pos) {
+        Decimal d = row.getDecimal(pos, decimalType.getPrecision(), 
decimalType.getScale());
+        return d.toBigDecimal();
+    }
+
+    /** Converts a DATE value stored as int days since epoch to a LocalDate. */
+    private static LocalDate convertDateValue(InternalRow row, int pos) {
+        return LocalDate.ofEpochDay(row.getInt(pos));
+    }
+
+    /** Converts a TIME_WITHOUT_TIME_ZONE value stored as int millis of day to 
a LocalTime. */
+    private static LocalTime convertTimeValue(InternalRow row, int pos) {
+        return LocalTime.ofNanoOfDay(row.getInt(pos) * 1_000_000L);
+    }
+
+    /** Converts a TIMESTAMP_WITHOUT_TIME_ZONE value to a LocalDateTime 
honoring precision. */
+    private static Object convertTimestampNtzValue(int precision, InternalRow 
row, int pos) {
+        TimestampNtz t = row.getTimestampNtz(pos, precision);
+        return t.toLocalDateTime();
+    }
+
+    /**
+     * Converts a TIMESTAMP_WITH_LOCAL_TIME_ZONE value to either Instant or 
OffsetDateTime in UTC,
+     * depending on the target POJO property type.
+     */
+    private static Object convertTimestampLtzValue(
+            int precision, PojoType.Property prop, InternalRow row, int pos) {
+        TimestampLtz t = row.getTimestampLtz(pos, precision);
+        if (prop.type == Instant.class) {
+            return t.toInstant();
+        } else if (prop.type == OffsetDateTime.class) {
+            return OffsetDateTime.ofInstant(t.toInstant(), ZoneOffset.UTC);
+        }
+        throw new IllegalArgumentException(
+                String.format(
+                        "Field %s is not an Instant or OffsetDateTime. Cannot 
convert from TimestampData.",
+                        prop.name));
+    }
+
+    private interface RowToField {
+        Object convert(InternalRow row, int pos) throws Exception;
+    }
+}
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/converter/ConvertersTestFixtures.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/converter/ConvertersTestFixtures.java
new file mode 100644
index 000000000..8863b9f23
--- /dev/null
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/converter/ConvertersTestFixtures.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.converter;
+
+import org.apache.fluss.types.DataTypes;
+import org.apache.fluss.types.RowType;
+
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Objects;
+
+/** Shared fixtures and helper POJOs for converter tests. */
+public final class ConvertersTestFixtures {
+
+    private ConvertersTestFixtures() {}
+
+    public static RowType fullSchema() {
+        return RowType.builder()
+                .field("booleanField", DataTypes.BOOLEAN())
+                .field("byteField", DataTypes.TINYINT())
+                .field("shortField", DataTypes.SMALLINT())
+                .field("intField", DataTypes.INT())
+                .field("longField", DataTypes.BIGINT())
+                .field("floatField", DataTypes.FLOAT())
+                .field("doubleField", DataTypes.DOUBLE())
+                .field("stringField", DataTypes.STRING())
+                .field("bytesField", DataTypes.BYTES())
+                .field("decimalField", DataTypes.DECIMAL(10, 2))
+                .field("dateField", DataTypes.DATE())
+                .field("timeField", DataTypes.TIME())
+                .field("timestampField", DataTypes.TIMESTAMP())
+                .field("timestampLtzField", DataTypes.TIMESTAMP_LTZ())
+                .field("offsetDateTimeField", DataTypes.TIMESTAMP_LTZ())
+                .build();
+    }
+
+    // ----------------------- Helper POJOs -----------------------
+    /** Test POJO used for end-to-end converter tests. */
+    public static class TestPojo {
+        public Boolean booleanField;
+        public Byte byteField;
+        public Short shortField;
+        public Integer intField;
+        public Long longField;
+        public Float floatField;
+        public Double doubleField;
+        public String stringField;
+        public byte[] bytesField;
+        public BigDecimal decimalField;
+        public LocalDate dateField;
+        public LocalTime timeField;
+        public LocalDateTime timestampField;
+        public Instant timestampLtzField;
+        public OffsetDateTime offsetDateTimeField;
+
+        public TestPojo() {}
+
+        public TestPojo(
+                Boolean booleanField,
+                Byte byteField,
+                Short shortField,
+                Integer intField,
+                Long longField,
+                Float floatField,
+                Double doubleField,
+                String stringField,
+                byte[] bytesField,
+                BigDecimal decimalField,
+                LocalDate dateField,
+                LocalTime timeField,
+                LocalDateTime timestampField,
+                Instant timestampLtzField,
+                OffsetDateTime offsetDateTimeField) {
+            this.booleanField = booleanField;
+            this.byteField = byteField;
+            this.shortField = shortField;
+            this.intField = intField;
+            this.longField = longField;
+            this.floatField = floatField;
+            this.doubleField = doubleField;
+            this.stringField = stringField;
+            this.bytesField = bytesField;
+            this.decimalField = decimalField;
+            this.dateField = dateField;
+            this.timeField = timeField;
+            this.timestampField = timestampField;
+            this.timestampLtzField = timestampLtzField;
+            this.offsetDateTimeField = offsetDateTimeField;
+        }
+
+        public static TestPojo sample() {
+            return new TestPojo(
+                    true,
+                    (byte) 42,
+                    (short) 1234,
+                    123456,
+                    9876543210L,
+                    3.14f,
+                    2.71828,
+                    "Hello, World!",
+                    new byte[] {1, 2, 3, 4, 5},
+                    new BigDecimal("123.45"),
+                    LocalDate.of(2025, 7, 23),
+                    LocalTime.of(15, 1, 30),
+                    LocalDateTime.of(2025, 7, 23, 15, 1, 30),
+                    Instant.parse("2025-07-23T15:01:30Z"),
+                    OffsetDateTime.of(2025, 7, 23, 15, 1, 30, 0, 
ZoneOffset.UTC));
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            TestPojo testPojo = (TestPojo) o;
+            return Objects.equals(booleanField, testPojo.booleanField)
+                    && Objects.equals(byteField, testPojo.byteField)
+                    && Objects.equals(shortField, testPojo.shortField)
+                    && Objects.equals(intField, testPojo.intField)
+                    && Objects.equals(longField, testPojo.longField)
+                    && Objects.equals(floatField, testPojo.floatField)
+                    && Objects.equals(doubleField, testPojo.doubleField)
+                    && Objects.equals(stringField, testPojo.stringField)
+                    && Arrays.equals(bytesField, testPojo.bytesField)
+                    && Objects.equals(decimalField, testPojo.decimalField)
+                    && Objects.equals(dateField, testPojo.dateField)
+                    && Objects.equals(timeField, testPojo.timeField)
+                    && Objects.equals(timestampField, testPojo.timestampField)
+                    && Objects.equals(timestampLtzField, 
testPojo.timestampLtzField)
+                    && Objects.equals(offsetDateTimeField, 
testPojo.offsetDateTimeField);
+        }
+
+        @Override
+        public int hashCode() {
+            int result =
+                    Objects.hash(
+                            booleanField,
+                            byteField,
+                            shortField,
+                            intField,
+                            longField,
+                            floatField,
+                            doubleField,
+                            stringField,
+                            decimalField,
+                            dateField,
+                            timeField,
+                            timestampField,
+                            timestampLtzField,
+                            offsetDateTimeField);
+            result = 31 * result + Arrays.hashCode(bytesField);
+            return result;
+        }
+    }
+
+    /** A minimal POJO with a subset of fields for projection tests. */
+    public static class PartialTestPojo {
+        public Boolean booleanField;
+        public Integer intField;
+        public String stringField;
+
+        public PartialTestPojo() {}
+    }
+
+    /** POJO without public default constructor, used for validation tests. */
+    public static class NoDefaultConstructorPojo {
+        public Integer intField;
+
+        public NoDefaultConstructorPojo(int v) {
+            this.intField = v;
+        }
+    }
+
+    /** POJO whose default constructor throws, used to test instantiation 
error handling. */
+    public static class ThrowingCtorPojo {
+        public Integer intField;
+
+        public ThrowingCtorPojo() {
+            throw new RuntimeException("ctor failure");
+        }
+    }
+
+    /** POJO with wrong Java type for DECIMAL field, used for negative tests. 
*/
+    public static class DecimalWrongTypePojo {
+        public String decimalField;
+
+        public DecimalWrongTypePojo() {}
+    }
+
+    /** POJO with wrong Java type for DATE field, used for negative tests. */
+    public static class DateWrongTypePojo {
+        public String dateField;
+
+        public DateWrongTypePojo() {}
+    }
+
+    /** POJO with wrong Java type for TIME field, used for negative tests. */
+    public static class TimeWrongTypePojo {
+        public String timeField;
+
+        public TimeWrongTypePojo() {}
+    }
+
+    /** POJO with wrong Java type for TIMESTAMP_NTZ field, used for negative 
tests. */
+    public static class TimestampWrongTypePojo {
+        public String timestampField;
+
+        public TimestampWrongTypePojo() {}
+    }
+
+    /** POJO with wrong Java type for TIMESTAMP_LTZ field, used for negative 
tests. */
+    public static class TimestampLtzWrongTypePojo {
+        public String timestampLtzField;
+
+        public TimestampLtzWrongTypePojo() {}
+    }
+
+    /** POJO with unsupported Map field, used for negative tests. */
+    public static class MapPojo {
+        public Map<String, Integer> mapField;
+
+        public MapPojo() {}
+    }
+
+    /** POJO with Character field for CHAR/STRING handling tests. */
+    public static class CharacterFieldPojo {
+        public Character charField;
+
+        public CharacterFieldPojo() {}
+
+        public CharacterFieldPojo(Character c) {
+            this.charField = c;
+        }
+    }
+}
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/converter/PojoToRowConverterTest.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/converter/PojoToRowConverterTest.java
new file mode 100644
index 000000000..11deb35ac
--- /dev/null
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/converter/PojoToRowConverterTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.converter;
+
+import org.apache.fluss.row.GenericRow;
+import org.apache.fluss.types.DataTypes;
+import org.apache.fluss.types.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link PojoToRowConverter}. */
+public class PojoToRowConverterTest {
+
+    @Test
+    public void testNullHandlingToRow() {
+        RowType table = ConvertersTestFixtures.fullSchema();
+        RowType projection = table;
+        PojoToRowConverter<ConvertersTestFixtures.TestPojo> writer =
+                PojoToRowConverter.of(ConvertersTestFixtures.TestPojo.class, 
table, projection);
+        assertThat(writer.toRow(null)).isNull();
+    }
+
+    @Test
+    public void testProjectionSubsetWrites() {
+        RowType table = ConvertersTestFixtures.fullSchema();
+        RowType projection =
+                RowType.builder()
+                        .field("booleanField", DataTypes.BOOLEAN())
+                        .field("intField", DataTypes.INT())
+                        .field("stringField", DataTypes.STRING())
+                        .build();
+
+        PojoToRowConverter<ConvertersTestFixtures.TestPojo> writer =
+                PojoToRowConverter.of(ConvertersTestFixtures.TestPojo.class, 
table, projection);
+        ConvertersTestFixtures.TestPojo pojo = 
ConvertersTestFixtures.TestPojo.sample();
+        GenericRow row = writer.toRow(pojo);
+        assertThat(row.getFieldCount()).isEqualTo(3);
+        assertThat(row.getBoolean(0)).isTrue();
+        assertThat(row.getInt(1)).isEqualTo(123456);
+        assertThat(row.getString(2).toString()).isEqualTo("Hello, World!");
+    }
+
+    @Test
+    public void testPojoMustExactlyMatchTableSchema() {
+        RowType table =
+                RowType.builder()
+                        .field("booleanField", DataTypes.BOOLEAN())
+                        .field("intField", DataTypes.INT())
+                        .field("stringField", DataTypes.STRING())
+                        .field("extraField", DataTypes.DOUBLE())
+                        .build();
+        RowType projection = table;
+        assertThatThrownBy(
+                        () ->
+                                PojoToRowConverter.of(
+                                        
ConvertersTestFixtures.PartialTestPojo.class,
+                                        table,
+                                        projection))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("must exactly match table schema");
+    }
+
+    @Test
+    public void testPojoNoDefaultCtorFails() {
+        RowType table = RowType.builder().field("intField", 
DataTypes.INT()).build();
+        RowType proj = table;
+        assertThatThrownBy(
+                        () ->
+                                PojoToRowConverter.of(
+                                        
ConvertersTestFixtures.NoDefaultConstructorPojo.class,
+                                        table,
+                                        proj))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("public default constructor");
+    }
+
+    @Test
+    public void testDecimalTypeValidationAtCreation() {
+        RowType table = RowType.builder().field("decimalField", 
DataTypes.DECIMAL(10, 2)).build();
+        RowType proj = table;
+        assertThatThrownBy(
+                        () ->
+                                PojoToRowConverter.of(
+                                        
ConvertersTestFixtures.DecimalWrongTypePojo.class,
+                                        table,
+                                        proj))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("incompatible with Fluss type")
+                .hasMessageContaining("decimalField");
+    }
+
+    @Test
+    public void testDateTimeTypeValidationAtCreation() {
+        RowType dateSchema = RowType.builder().field("dateField", 
DataTypes.DATE()).build();
+        RowType timeSchema = RowType.builder().field("timeField", 
DataTypes.TIME()).build();
+        RowType tsSchema = RowType.builder().field("timestampField", 
DataTypes.TIMESTAMP()).build();
+        RowType ltzSchema =
+                RowType.builder().field("timestampLtzField", 
DataTypes.TIMESTAMP_LTZ()).build();
+        assertThatThrownBy(
+                        () ->
+                                PojoToRowConverter.of(
+                                        
ConvertersTestFixtures.DateWrongTypePojo.class,
+                                        dateSchema,
+                                        dateSchema))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("incompatible with Fluss type")
+                .hasMessageContaining("dateField");
+        assertThatThrownBy(
+                        () ->
+                                PojoToRowConverter.of(
+                                        
ConvertersTestFixtures.TimeWrongTypePojo.class,
+                                        timeSchema,
+                                        timeSchema))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("incompatible with Fluss type")
+                .hasMessageContaining("timeField");
+        assertThatThrownBy(
+                        () ->
+                                PojoToRowConverter.of(
+                                        
ConvertersTestFixtures.TimestampWrongTypePojo.class,
+                                        tsSchema,
+                                        tsSchema))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("incompatible with Fluss type")
+                .hasMessageContaining("timestampField");
+        assertThatThrownBy(
+                        () ->
+                                PojoToRowConverter.of(
+                                        
ConvertersTestFixtures.TimestampLtzWrongTypePojo.class,
+                                        ltzSchema,
+                                        ltzSchema))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("incompatible with Fluss type")
+                .hasMessageContaining("timestampLtzField");
+    }
+
+    @Test
+    public void testUnsupportedSchemaFieldTypeThrows() {
+        RowType table =
+                RowType.builder()
+                        .field("mapField", DataTypes.MAP(DataTypes.STRING(), 
DataTypes.INT()))
+                        .build();
+        RowType proj = table;
+        assertThatThrownBy(
+                        () ->
+                                PojoToRowConverter.of(
+                                        ConvertersTestFixtures.MapPojo.class, 
table, proj))
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessageContaining("Unsupported field type")
+                .hasMessageContaining("MAP")
+                .hasMessageContaining("mapField");
+    }
+}
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/converter/RowToPojoConverterTest.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/converter/RowToPojoConverterTest.java
new file mode 100644
index 000000000..52d69eb2a
--- /dev/null
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/converter/RowToPojoConverterTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.converter;
+
+import org.apache.fluss.row.GenericRow;
+import org.apache.fluss.types.DataTypes;
+import org.apache.fluss.types.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link RowToPojoConverter}. */
+public class RowToPojoConverterTest {
+
+    @Test
+    public void testRoundTripFullSchema() {
+        RowType table = ConvertersTestFixtures.fullSchema();
+        RowType projection = table;
+
+        PojoToRowConverter<ConvertersTestFixtures.TestPojo> writer =
+                PojoToRowConverter.of(ConvertersTestFixtures.TestPojo.class, 
table, projection);
+        RowToPojoConverter<ConvertersTestFixtures.TestPojo> scanner =
+                RowToPojoConverter.of(ConvertersTestFixtures.TestPojo.class, 
table, projection);
+
+        ConvertersTestFixtures.TestPojo pojo = 
ConvertersTestFixtures.TestPojo.sample();
+        GenericRow row = writer.toRow(pojo);
+        assertThat(row.getFieldCount()).isEqualTo(15);
+
+        ConvertersTestFixtures.TestPojo back = scanner.fromRow(row);
+        assertThat(back).isEqualTo(pojo);
+    }
+
+    @Test
+    public void testNullHandlingFromRow() {
+        RowType table = ConvertersTestFixtures.fullSchema();
+        RowType projection = table;
+        RowToPojoConverter<ConvertersTestFixtures.TestPojo> scanner =
+                RowToPojoConverter.of(ConvertersTestFixtures.TestPojo.class, 
table, projection);
+        assertThat(scanner.fromRow(null)).isNull();
+    }
+
+    @Test
+    public void testProjectionSubsetReads() {
+        RowType table = ConvertersTestFixtures.fullSchema();
+        RowType projection =
+                RowType.builder()
+                        .field("booleanField", DataTypes.BOOLEAN())
+                        .field("intField", DataTypes.INT())
+                        .field("stringField", DataTypes.STRING())
+                        .build();
+
+        PojoToRowConverter<ConvertersTestFixtures.TestPojo> writer =
+                PojoToRowConverter.of(ConvertersTestFixtures.TestPojo.class, 
table, projection);
+        RowToPojoConverter<ConvertersTestFixtures.TestPojo> scanner =
+                RowToPojoConverter.of(ConvertersTestFixtures.TestPojo.class, 
table, projection);
+
+        ConvertersTestFixtures.TestPojo pojo = 
ConvertersTestFixtures.TestPojo.sample();
+        GenericRow row = writer.toRow(pojo);
+        assertThat(row.getFieldCount()).isEqualTo(3);
+        assertThat(row.getBoolean(0)).isTrue();
+        assertThat(row.getInt(1)).isEqualTo(123456);
+        assertThat(row.getString(2).toString()).isEqualTo("Hello, World!");
+
+        ConvertersTestFixtures.TestPojo back = scanner.fromRow(row);
+        assertThat(back.booleanField).isTrue();
+        assertThat(back.intField).isEqualTo(123456);
+        assertThat(back.stringField).isEqualTo("Hello, World!");
+        // non-projected remain null
+        assertThat(back.byteField).isNull();
+        assertThat(back.shortField).isNull();
+        assertThat(back.longField).isNull();
+        assertThat(back.floatField).isNull();
+        assertThat(back.doubleField).isNull();
+        assertThat(back.bytesField).isNull();
+        assertThat(back.decimalField).isNull();
+        assertThat(back.dateField).isNull();
+        assertThat(back.timeField).isNull();
+        assertThat(back.timestampField).isNull();
+        assertThat(back.timestampLtzField).isNull();
+        assertThat(back.offsetDateTimeField).isNull();
+    }
+
+    @Test
+    public void testFromRowThrowsWhenDefaultConstructorThrows() {
+        RowType table = RowType.builder().field("intField", 
DataTypes.INT()).build();
+        RowType proj = table;
+        RowToPojoConverter<ConvertersTestFixtures.ThrowingCtorPojo> scanner =
+                
RowToPojoConverter.of(ConvertersTestFixtures.ThrowingCtorPojo.class, table, 
proj);
+        GenericRow row = new GenericRow(1);
+        row.setField(0, 1);
+        assertThatThrownBy(() -> scanner.fromRow(row))
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessageContaining("Failed to instantiate POJO class")
+                
.hasMessageContaining(ConvertersTestFixtures.ThrowingCtorPojo.class.getName());
+    }
+
+    @Test
+    public void testCharacterFieldRoundTrip() {
+        RowType table = RowType.builder().field("charField", 
DataTypes.STRING()).build();
+        RowType proj = table;
+        PojoToRowConverter<ConvertersTestFixtures.CharacterFieldPojo> writer =
+                
PojoToRowConverter.of(ConvertersTestFixtures.CharacterFieldPojo.class, table, 
proj);
+        RowToPojoConverter<ConvertersTestFixtures.CharacterFieldPojo> scanner =
+                
RowToPojoConverter.of(ConvertersTestFixtures.CharacterFieldPojo.class, table, 
proj);
+        ConvertersTestFixtures.CharacterFieldPojo pojo =
+                new ConvertersTestFixtures.CharacterFieldPojo('A');
+        GenericRow row = writer.toRow(pojo);
+        assertThat(row.getString(0).toString()).isEqualTo("A");
+        ConvertersTestFixtures.CharacterFieldPojo back = scanner.fromRow(row);
+        assertThat(back.charField).isEqualTo('A');
+    }
+}

Reply via email to