This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 7452a5284 [client] Introduce util helpers for conversion between POJO
and InternalRow (#1378)
7452a5284 is described below
commit 7452a5284bc94184ce1373e8a53ab12158b865dd
Author: Giannis Polyzos <[email protected]>
AuthorDate: Mon Oct 6 04:13:32 2025 +0300
[client] Introduce util helpers for conversion between POJO and InternalRow
(#1378)
---
.../fluss/client/converter/ConverterCommons.java | 123 ++++++++++
.../fluss/client/converter/PojoToRowConverter.java | 240 +++++++++++++++++++
.../apache/fluss/client/converter/PojoType.java | 226 ++++++++++++++++++
.../fluss/client/converter/RowToPojoConverter.java | 265 +++++++++++++++++++++
.../client/converter/ConvertersTestFixtures.java | 260 ++++++++++++++++++++
.../client/converter/PojoToRowConverterTest.java | 171 +++++++++++++
.../client/converter/RowToPojoConverterTest.java | 129 ++++++++++
7 files changed, 1414 insertions(+)
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/converter/ConverterCommons.java
b/fluss-client/src/main/java/org/apache/fluss/client/converter/ConverterCommons.java
new file mode 100644
index 000000000..f7766d36d
--- /dev/null
+++
b/fluss-client/src/main/java/org/apache/fluss/client/converter/ConverterCommons.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.converter;
+
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.types.DataType;
+import org.apache.fluss.types.DataTypeRoot;
+import org.apache.fluss.types.RowType;
+
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static
org.apache.fluss.client.converter.RowToPojoConverter.charLengthExceptionMessage;
+
+/**
+ * Internal shared utilities for POJO and Fluss InternalRow conversions.
+ *
+ * <p>Provides validation helpers and common functions used by
PojoToRowConverter and
+ * RowToPojoConverter (e.g., supported Java types per Fluss DataType,
projection/table validation,
+ * and text conversion helpers).
+ */
+final class ConverterCommons {
+
+ static final Map<DataTypeRoot, Set<Class<?>>> SUPPORTED_TYPES =
createSupportedTypes();
+
+ private static Map<DataTypeRoot, Set<Class<?>>> createSupportedTypes() {
+ Map<DataTypeRoot, Set<Class<?>>> map = new HashMap<>();
+ map.put(DataTypeRoot.BOOLEAN, setOf(Boolean.class));
+ map.put(DataTypeRoot.TINYINT, setOf(Byte.class));
+ map.put(DataTypeRoot.SMALLINT, setOf(Short.class));
+ map.put(DataTypeRoot.INTEGER, setOf(Integer.class));
+ map.put(DataTypeRoot.BIGINT, setOf(Long.class));
+ map.put(DataTypeRoot.FLOAT, setOf(Float.class));
+ map.put(DataTypeRoot.DOUBLE, setOf(Double.class));
+ map.put(DataTypeRoot.CHAR, setOf(String.class, Character.class));
+ map.put(DataTypeRoot.STRING, setOf(String.class, Character.class));
+ map.put(DataTypeRoot.BINARY, setOf(byte[].class));
+ map.put(DataTypeRoot.BYTES, setOf(byte[].class));
+ map.put(DataTypeRoot.DECIMAL, setOf(BigDecimal.class));
+ map.put(DataTypeRoot.DATE, setOf(java.time.LocalDate.class));
+ map.put(DataTypeRoot.TIME_WITHOUT_TIME_ZONE,
setOf(java.time.LocalTime.class));
+ map.put(DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE,
setOf(java.time.LocalDateTime.class));
+ map.put(
+ DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE,
+ setOf(java.time.Instant.class,
java.time.OffsetDateTime.class));
+ return map;
+ }
+
+ static void validatePojoMatchesTable(PojoType<?> pojoType, RowType
tableSchema) {
+ Set<String> pojoNames = pojoType.getProperties().keySet();
+ Set<String> tableNames = new HashSet<>(tableSchema.getFieldNames());
+ if (!pojoNames.equals(tableNames)) {
+ throw new IllegalArgumentException(
+ String.format(
+ "POJO fields %s must exactly match table schema
fields %s.",
+ pojoNames, tableNames));
+ }
+ for (int i = 0; i < tableSchema.getFieldCount(); i++) {
+ String name = tableSchema.getFieldNames().get(i);
+ DataType dt = tableSchema.getTypeAt(i);
+ PojoType.Property prop = pojoType.getProperty(name);
+ validateCompatibility(dt, prop);
+ }
+ }
+
+ static void validateProjectionSubset(RowType projection, RowType
tableSchema) {
+ Set<String> tableNames = new HashSet<>(tableSchema.getFieldNames());
+ for (String n : projection.getFieldNames()) {
+ if (!tableNames.contains(n)) {
+ throw new IllegalArgumentException(
+ "Projection field '" + n + "' is not part of table
schema.");
+ }
+ }
+ }
+
+ static void validateCompatibility(DataType fieldType, PojoType.Property
prop) {
+ Set<Class<?>> supported = SUPPORTED_TYPES.get(fieldType.getTypeRoot());
+ Class<?> actual = prop.type;
+ if (supported == null) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Unsupported field type %s for field %s.",
+ fieldType.getTypeRoot(), prop.name));
+ }
+ if (!supported.contains(actual)) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Field '%s' in POJO has Java type %s which is
incompatible with Fluss type %s. Supported Java types: %s",
+ prop.name, actual.getName(),
fieldType.getTypeRoot(), supported));
+ }
+ }
+
+ static BinaryString toBinaryStringForText(Object v, String fieldName,
DataTypeRoot root) {
+ final String s = String.valueOf(v);
+ if (root == DataTypeRoot.CHAR && s.length() != 1) {
+ throw new
IllegalArgumentException(charLengthExceptionMessage(fieldName, s.length()));
+ }
+ return BinaryString.fromString(s);
+ }
+
+ static Set<Class<?>> setOf(Class<?>... javaTypes) {
+ return new HashSet<>(Arrays.asList(javaTypes));
+ }
+}
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/converter/PojoToRowConverter.java
b/fluss-client/src/main/java/org/apache/fluss/client/converter/PojoToRowConverter.java
new file mode 100644
index 000000000..c2a3dca08
--- /dev/null
+++
b/fluss-client/src/main/java/org/apache/fluss/client/converter/PojoToRowConverter.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.converter;
+
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.Decimal;
+import org.apache.fluss.row.GenericRow;
+import org.apache.fluss.row.TimestampLtz;
+import org.apache.fluss.row.TimestampNtz;
+import org.apache.fluss.types.DataType;
+import org.apache.fluss.types.DecimalType;
+import org.apache.fluss.types.RowType;
+
+import javax.annotation.Nullable;
+
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.util.List;
+
+/**
+ * Converter for writer path: converts POJO instances to Fluss InternalRow
according to a (possibly
+ * projected) RowType. Validation is done against the full table schema.
+ */
+public final class PojoToRowConverter<T> {
+
+ private final PojoType<T> pojoType;
+ private final RowType tableSchema;
+ private final RowType projection;
+ private final List<String> projectionFieldNames;
+ private final FieldToRow[] fieldConverters; // index corresponds to
projection position
+
+ private PojoToRowConverter(PojoType<T> pojoType, RowType tableSchema,
RowType projection) {
+ this.pojoType = pojoType;
+ this.tableSchema = tableSchema;
+ this.projection = projection;
+ this.projectionFieldNames = projection.getFieldNames();
+ ConverterCommons.validatePojoMatchesTable(pojoType, tableSchema);
+ ConverterCommons.validateProjectionSubset(projection, tableSchema);
+ this.fieldConverters = createFieldConverters();
+ }
+
+ public static <T> PojoToRowConverter<T> of(
+ Class<T> pojoClass, RowType tableSchema, RowType projection) {
+ return new PojoToRowConverter<>(PojoType.of(pojoClass), tableSchema,
projection);
+ }
+
+ public GenericRow toRow(@Nullable T pojo) {
+ if (pojo == null) {
+ return null;
+ }
+ GenericRow row = new GenericRow(projection.getFieldCount());
+ for (int i = 0; i < fieldConverters.length; i++) {
+ Object v;
+ try {
+ v = fieldConverters[i].readAndConvert(pojo);
+ } catch (RuntimeException re) {
+ throw re;
+ } catch (Exception e) {
+ throw new IllegalStateException(
+ "Failed to access field '"
+ + projectionFieldNames.get(i)
+ + "' from POJO "
+ + pojoType.getPojoClass().getName(),
+ e);
+ }
+ row.setField(i, v);
+ }
+ return row;
+ }
+
+ private FieldToRow[] createFieldConverters() {
+ FieldToRow[] arr = new FieldToRow[projection.getFieldCount()];
+ for (int i = 0; i < projection.getFieldCount(); i++) {
+ String fieldName = projectionFieldNames.get(i);
+ DataType fieldType = projection.getTypeAt(i);
+ PojoType.Property prop = requireProperty(fieldName);
+ ConverterCommons.validateCompatibility(fieldType, prop);
+ arr[i] = createFieldConverter(prop, fieldType);
+ }
+ return arr;
+ }
+
+ private PojoType.Property requireProperty(String fieldName) {
+ PojoType.Property p = pojoType.getProperty(fieldName);
+ if (p == null) {
+ throw new IllegalArgumentException(
+ "Field '"
+ + fieldName
+ + "' not found in POJO class "
+ + pojoType.getPojoClass().getName()
+ + ".");
+ }
+ return p;
+ }
+
+ private static FieldToRow createFieldConverter(PojoType.Property prop,
DataType fieldType) {
+ switch (fieldType.getTypeRoot()) {
+ case BOOLEAN:
+ case TINYINT:
+ case SMALLINT:
+ case INTEGER:
+ case BIGINT:
+ case FLOAT:
+ case DOUBLE:
+ case BINARY:
+ case BYTES:
+ return prop::read;
+ case CHAR:
+ case STRING:
+ return (obj) -> convertTextValue(fieldType, prop,
prop.read(obj));
+ case DECIMAL:
+ return (obj) -> convertDecimalValue((DecimalType) fieldType,
prop, prop.read(obj));
+ case DATE:
+ return (obj) -> convertDateValue(prop, prop.read(obj));
+ case TIME_WITHOUT_TIME_ZONE:
+ return (obj) -> convertTimeValue(prop, prop.read(obj));
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ return (obj) -> convertTimestampNtzValue(prop, prop.read(obj));
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ return (obj) -> convertTimestampLtzValue(prop, prop.read(obj));
+ default:
+ throw new UnsupportedOperationException(
+ String.format(
+ "Unsupported field type %s for field %s.",
+ fieldType.getTypeRoot(), prop.name));
+ }
+ }
+
+ /**
+ * Converts a text value (String or Character) from a POJO property to
Fluss BinaryString.
+ *
+ * <p>For CHAR columns, enforces that the text has exactly one character.
Nulls are passed
+ * through.
+ */
+ private static @Nullable BinaryString convertTextValue(
+ DataType fieldType, PojoType.Property prop, @Nullable Object v) {
+ if (v == null) {
+ return null;
+ }
+ return ConverterCommons.toBinaryStringForText(v, prop.name,
fieldType.getTypeRoot());
+ }
+
+ /** Converts a BigDecimal POJO property to Fluss Decimal respecting
precision and scale. */
+ private static @Nullable Decimal convertDecimalValue(
+ DecimalType decimalType, PojoType.Property prop, @Nullable Object
v) {
+ if (v == null) {
+ return null;
+ }
+ if (!(v instanceof BigDecimal)) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Field %s is not a BigDecimal. Cannot convert to
Decimal.", prop.name));
+ }
+ return Decimal.fromBigDecimal(
+ (BigDecimal) v, decimalType.getPrecision(),
decimalType.getScale());
+ }
+
+ /** Converts a LocalDate POJO property to number of days since epoch. */
+ private static @Nullable Integer convertDateValue(PojoType.Property prop,
@Nullable Object v) {
+ if (v == null) {
+ return null;
+ }
+ if (!(v instanceof LocalDate)) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Field %s is not a LocalDate. Cannot convert to
int days.", prop.name));
+ }
+ return (int) ((LocalDate) v).toEpochDay();
+ }
+
+ /** Converts a LocalTime POJO property to milliseconds of day. */
+ private static @Nullable Integer convertTimeValue(PojoType.Property prop,
@Nullable Object v) {
+ if (v == null) {
+ return null;
+ }
+ if (!(v instanceof LocalTime)) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Field %s is not a LocalTime. Cannot convert to
int millis.",
+ prop.name));
+ }
+ LocalTime t = (LocalTime) v;
+ return (int) (t.toNanoOfDay() / 1_000_000);
+ }
+
+ /** Converts a LocalDateTime POJO property to Fluss TimestampNtz. */
+ private static @Nullable TimestampNtz convertTimestampNtzValue(
+ PojoType.Property prop, @Nullable Object v) {
+ if (v == null) {
+ return null;
+ }
+ if (!(v instanceof LocalDateTime)) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Field %s is not a LocalDateTime. Cannot convert
to TimestampNtz.",
+ prop.name));
+ }
+ return TimestampNtz.fromLocalDateTime((LocalDateTime) v);
+ }
+
+ /** Converts an Instant or OffsetDateTime POJO property to Fluss
TimestampLtz (UTC based). */
+ private static @Nullable TimestampLtz convertTimestampLtzValue(
+ PojoType.Property prop, @Nullable Object v) {
+ if (v == null) {
+ return null;
+ }
+ if (v instanceof Instant) {
+ return TimestampLtz.fromInstant((Instant) v);
+ } else if (v instanceof OffsetDateTime) {
+ return TimestampLtz.fromInstant(((OffsetDateTime) v).toInstant());
+ }
+ throw new IllegalArgumentException(
+ String.format(
+ "Field %s is not an Instant or OffsetDateTime. Cannot
convert to TimestampLtz.",
+ prop.name));
+ }
+
+ private interface FieldToRow {
+ Object readAndConvert(Object pojo) throws Exception;
+ }
+}
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/converter/PojoType.java
b/fluss-client/src/main/java/org/apache/fluss/client/converter/PojoType.java
new file mode 100644
index 000000000..78af49700
--- /dev/null
+++ b/fluss-client/src/main/java/org/apache/fluss/client/converter/PojoType.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.converter;
+
+import javax.annotation.Nullable;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Internal representation of a POJO type, used to validate POJO requirements
and to provide unified
+ * accessors for reading/writing properties.
+ */
+final class PojoType<T> {
+ private final Class<T> pojoClass;
+ private final Constructor<T> defaultConstructor;
+ private final Map<String, Property> properties; // property name ->
property
+
+ private PojoType(Class<T> pojoClass, Constructor<T> ctor, Map<String,
Property> props) {
+ this.pojoClass = pojoClass;
+ this.defaultConstructor = ctor;
+ this.properties = java.util.Collections.unmodifiableMap(new
LinkedHashMap<>(props));
+ }
+
+ Class<T> getPojoClass() {
+ return pojoClass;
+ }
+
+ Constructor<T> getDefaultConstructor() {
+ return defaultConstructor;
+ }
+
+ Map<String, Property> getProperties() {
+ return properties;
+ }
+
+ @Nullable
+ Property getProperty(String name) {
+ return properties.get(name);
+ }
+
+ static <T> PojoType<T> of(Class<T> pojoClass) {
+ validatePublicClass(pojoClass);
+ Constructor<T> ctor = requirePublicDefaultConstructor(pojoClass);
+
+ Map<String, Field> allFields = discoverAllInstanceFields(pojoClass);
+ Map<String, Method> getters = discoverGetters(pojoClass);
+ Map<String, Method> setters = discoverSetters(pojoClass);
+
+ Map<String, Property> props = new LinkedHashMap<>();
+ for (Map.Entry<String, Field> e : allFields.entrySet()) {
+ String name = e.getKey();
+ Field field = e.getValue();
+ if (field.getType().isPrimitive()) {
+ throw new IllegalArgumentException(
+ String.format(
+ "POJO class %s has primitive field '%s' of
type %s. Primitive types are not allowed; all fields must be nullable (use
wrapper types).",
+ pojoClass.getName(), name,
field.getType().getName()));
+ }
+ boolean publicField = Modifier.isPublic(field.getModifiers());
+ Method getter = getters.get(name);
+ Method setter = setters.get(name);
+ if (!publicField) {
+ // When not a public field, require both getter and setter
+ if (getter == null || setter == null) {
+ throw new IllegalArgumentException(
+ String.format(
+ "POJO class %s field '%s' must be public
or have both getter and setter (get%s/set%s).",
+ pojoClass.getName(), name,
capitalize(name), capitalize(name)));
+ }
+ }
+ props.put(
+ name,
+ new Property(
+ name, field.getType(), publicField ? field : null,
getter, setter));
+ }
+
+ return new PojoType<>(pojoClass, ctor, props);
+ }
+
+ private static <T> void validatePublicClass(Class<T> pojoClass) {
+ if (!Modifier.isPublic(pojoClass.getModifiers())) {
+ throw new IllegalArgumentException(
+ String.format("POJO class %s must be public.",
pojoClass.getName()));
+ }
+ }
+
+ private static <T> Constructor<T> requirePublicDefaultConstructor(Class<T>
pojoClass) {
+ try {
+ Constructor<T> ctor = pojoClass.getDeclaredConstructor();
+ if (!Modifier.isPublic(ctor.getModifiers())) {
+ throw new IllegalArgumentException(
+ String.format(
+ "POJO class %s must have a public default
constructor.",
+ pojoClass.getName()));
+ }
+ return ctor;
+ } catch (NoSuchMethodException e) {
+ throw new IllegalArgumentException(
+ String.format(
+ "POJO class %s must have a public default
constructor.",
+ pojoClass.getName()),
+ e);
+ }
+ }
+
+ private static Map<String, Field> discoverAllInstanceFields(Class<?>
clazz) {
+ Map<String, Field> fields = new LinkedHashMap<>();
+ Class<?> c = clazz;
+ while (c != null && c != Object.class) {
+ for (Field f : c.getDeclaredFields()) {
+ int mod = f.getModifiers();
+ if (Modifier.isStatic(mod) || Modifier.isTransient(mod)) {
+ continue;
+ }
+ f.setAccessible(true);
+ fields.putIfAbsent(f.getName(), f);
+ }
+ c = c.getSuperclass();
+ }
+ return fields;
+ }
+
+ private static Map<String, Method> discoverGetters(Class<?> clazz) {
+ Map<String, Method> getters = new HashMap<>();
+ for (Method m : clazz.getMethods()) { // public methods incl. inherited
+ if (m.getParameterCount() == 0
+ && m.getName().startsWith("get")
+ && !m.getReturnType().equals(void.class)) {
+ String prop = decapitalize(m.getName().substring(3));
+ getters.put(prop, m);
+ }
+ }
+ return getters;
+ }
+
+ private static Map<String, Method> discoverSetters(Class<?> clazz) {
+ Map<String, Method> setters = new HashMap<>();
+ for (Method m : clazz.getMethods()) { // public methods incl. inherited
+ if (m.getParameterCount() == 1
+ && m.getName().startsWith("set")
+ && m.getReturnType().equals(void.class)) {
+ String prop = decapitalize(m.getName().substring(3));
+ setters.put(prop, m);
+ }
+ }
+ return setters;
+ }
+
+ private static String capitalize(String s) {
+ if (s == null || s.isEmpty()) {
+ return s;
+ }
+ return s.substring(0, 1).toUpperCase(Locale.ROOT) + s.substring(1);
+ }
+
+ private static String decapitalize(String s) {
+ if (s == null || s.isEmpty()) {
+ return s;
+ }
+ return s.substring(0, 1).toLowerCase(Locale.ROOT) + s.substring(1);
+ }
+
+ static final class Property {
+ final String name;
+ final Class<?> type;
+ @Nullable final Field publicField;
+ @Nullable final Method getter;
+ @Nullable final Method setter;
+
+ Property(
+ String name,
+ Class<?> type,
+ @Nullable Field publicField,
+ @Nullable Method getter,
+ @Nullable Method setter) {
+ this.name = Objects.requireNonNull(name, "name");
+ this.type = Objects.requireNonNull(type, "type");
+ this.publicField = publicField;
+ this.getter = getter;
+ this.setter = setter;
+ }
+
+ Object read(Object instance) throws Exception {
+ if (publicField != null) {
+ return publicField.get(instance);
+ } else if (getter != null) {
+ return getter.invoke(instance);
+ } else {
+ throw new IllegalStateException("No readable accessor for
property '" + name + "'");
+ }
+ }
+
+ void write(Object instance, @Nullable Object value) throws Exception {
+ if (publicField != null) {
+ publicField.set(instance, value);
+ } else if (setter != null) {
+ setter.invoke(instance, value);
+ } else {
+ throw new IllegalStateException("No writable accessor for
property '" + name + "'");
+ }
+ }
+ }
+}
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/converter/RowToPojoConverter.java
b/fluss-client/src/main/java/org/apache/fluss/client/converter/RowToPojoConverter.java
new file mode 100644
index 000000000..a396285f5
--- /dev/null
+++
b/fluss-client/src/main/java/org/apache/fluss/client/converter/RowToPojoConverter.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.converter;
+
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.Decimal;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.TimestampLtz;
+import org.apache.fluss.row.TimestampNtz;
+import org.apache.fluss.types.DataType;
+import org.apache.fluss.types.DataTypeChecks;
+import org.apache.fluss.types.DataTypeRoot;
+import org.apache.fluss.types.DecimalType;
+import org.apache.fluss.types.RowType;
+
+import javax.annotation.Nullable;
+
+import java.lang.reflect.InvocationTargetException;
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.util.List;
+
+/**
+ * Converter for scanner path: converts InternalRow (possibly projected) to
POJO, leaving
+ * non-projected fields as null on the POJO. Validation is done against the
full table schema.
+ */
+public final class RowToPojoConverter<T> {
+
+ private final PojoType<T> pojoType;
+ private final RowType tableSchema;
+ private final RowType projection;
+ private final List<String> projectionFieldNames;
+ private final RowToField[] rowReaders;
+
+ private RowToPojoConverter(PojoType<T> pojoType, RowType tableSchema,
RowType projection) {
+ this.pojoType = pojoType;
+ this.tableSchema = tableSchema;
+ this.projection = projection;
+ this.projectionFieldNames = projection.getFieldNames();
+ ConverterCommons.validatePojoMatchesTable(pojoType, tableSchema);
+ ConverterCommons.validateProjectionSubset(projection, tableSchema);
+ this.rowReaders = createRowReaders();
+ }
+
+ public static <T> RowToPojoConverter<T> of(
+ Class<T> pojoClass, RowType tableSchema, RowType projection) {
+ return new RowToPojoConverter<>(PojoType.of(pojoClass), tableSchema,
projection);
+ }
+
+ public T fromRow(@Nullable InternalRow row) {
+ if (row == null) {
+ return null;
+ }
+ try {
+ T pojo = pojoType.getDefaultConstructor().newInstance();
+ for (int i = 0; i < rowReaders.length; i++) {
+ if (!row.isNullAt(i)) {
+ Object v = rowReaders[i].convert(row, i);
+ PojoType.Property prop =
pojoType.getProperty(projectionFieldNames.get(i));
+ if (v != null) {
+ prop.write(pojo, v);
+ }
+ }
+ }
+ return pojo;
+ } catch (InstantiationException | IllegalAccessException |
InvocationTargetException e) {
+ String message =
+ String.format(
+ "Failed to instantiate POJO class %s using the
public default constructor. Cause: %s",
+ pojoType.getPojoClass().getName(), e.getMessage());
+ throw new IllegalStateException(message, e);
+ } catch (RuntimeException re) {
+ throw re;
+ } catch (Exception e) {
+ throw new IllegalStateException(
+ "Failed to set field on POJO class " +
pojoType.getPojoClass().getName(), e);
+ }
+ }
+
+ private RowToField[] createRowReaders() {
+ RowToField[] arr = new RowToField[projection.getFieldCount()];
+ for (int i = 0; i < projection.getFieldCount(); i++) {
+ String name = projectionFieldNames.get(i);
+ DataType type = projection.getTypeAt(i);
+ PojoType.Property prop = requireProperty(name);
+ ConverterCommons.validateCompatibility(type, prop);
+ arr[i] = createRowReader(type, prop);
+ }
+ return arr;
+ }
+
+ private PojoType.Property requireProperty(String fieldName) {
+ PojoType.Property p = pojoType.getProperty(fieldName);
+ if (p == null) {
+ throw new IllegalArgumentException(
+ "Field '"
+ + fieldName
+ + "' not found in POJO class "
+ + pojoType.getPojoClass().getName()
+ + ".");
+ }
+ return p;
+ }
+
+ private static RowToField createRowReader(DataType fieldType,
PojoType.Property prop) {
+ switch (fieldType.getTypeRoot()) {
+ case BOOLEAN:
+ return InternalRow::getBoolean;
+ case TINYINT:
+ return InternalRow::getByte;
+ case SMALLINT:
+ return InternalRow::getShort;
+ case INTEGER:
+ return InternalRow::getInt;
+ case BIGINT:
+ return InternalRow::getLong;
+ case FLOAT:
+ return InternalRow::getFloat;
+ case DOUBLE:
+ return InternalRow::getDouble;
+ case CHAR:
+ case STRING:
+ return (row, pos) -> convertTextValue(fieldType, prop,
row.getString(pos));
+ case BINARY:
+ case BYTES:
+ return InternalRow::getBytes;
+ case DECIMAL:
+ return (row, pos) -> convertDecimalValue((DecimalType)
fieldType, row, pos);
+ case DATE:
+ return RowToPojoConverter::convertDateValue;
+ case TIME_WITHOUT_TIME_ZONE:
+ return RowToPojoConverter::convertTimeValue;
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ {
+ final int precision =
DataTypeChecks.getPrecision(fieldType);
+ return (row, pos) -> convertTimestampNtzValue(precision,
row, pos);
+ }
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ {
+ final int precision =
DataTypeChecks.getPrecision(fieldType);
+ return (row, pos) -> convertTimestampLtzValue(precision,
prop, row, pos);
+ }
+ default:
+ throw new UnsupportedOperationException(
+ String.format(
+ "Unsupported field type %s for field %s.",
+ fieldType.getTypeRoot(), prop.name));
+ }
+ }
+
+ /**
+ * Converts a text value (CHAR/STRING) read from an InternalRow into the
target Java type
+ * declared by the POJO property.
+ *
+ * <p>Supported target types are String and Character. For CHAR columns,
this enforces that the
+ * value has exactly one character. For Character targets, empty strings
are rejected.
+ *
+ * @param fieldType Fluss column DataType (must be CHAR or STRING)
+ * @param prop The target POJO property (used for type and error messages)
+ * @param s The BinaryString read from the row
+ * @return Converted Java value (String or Character)
+ * @throws IllegalArgumentException if the target type is unsupported or
constraints are
+ * violated
+ */
+ private static Object convertTextValue(
+ DataType fieldType, PojoType.Property prop, BinaryString s) {
+ String v = s.toString();
+ if (prop.type == String.class) {
+ if (fieldType.getTypeRoot() == DataTypeRoot.CHAR && v.length() !=
1) {
+ throw new IllegalArgumentException(
+ charLengthExceptionMessage(prop.name, v.length()));
+ }
+ return v;
+ } else if (prop.type == Character.class) {
+ if (v.isEmpty()) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Field %s expects Character, but the string
value is empty.",
+ prop.name));
+ }
+ if (fieldType.getTypeRoot() == DataTypeRoot.CHAR && v.length() !=
1) {
+ throw new IllegalArgumentException(
+ charLengthExceptionMessage(prop.name, v.length()));
+ }
+ return v.charAt(0);
+ }
+ throw new IllegalArgumentException(
+ String.format(
+ "Field %s is not a String or Character. Cannot convert
from string.",
+ prop.name));
+ }
+
+ public static String charLengthExceptionMessage(String fieldName, int
length) {
+ return String.format(
+ "Field %s expects exactly one character for CHAR type, got
length %d.",
+ fieldName, length);
+ }
+
+ /**
+ * Converts a DECIMAL value from an InternalRow into a BigDecimal using
the column's precision
+ * and scale. The row position is assumed non-null (caller checks), so
this never returns null.
+ */
+ private static BigDecimal convertDecimalValue(
+ DecimalType decimalType, InternalRow row, int pos) {
+ Decimal d = row.getDecimal(pos, decimalType.getPrecision(),
decimalType.getScale());
+ return d.toBigDecimal();
+ }
+
+ /** Converts a DATE value stored as int days since epoch to a LocalDate. */
+ private static LocalDate convertDateValue(InternalRow row, int pos) {
+ return LocalDate.ofEpochDay(row.getInt(pos));
+ }
+
+ /** Converts a TIME_WITHOUT_TIME_ZONE value stored as int millis of day to
a LocalTime. */
+ private static LocalTime convertTimeValue(InternalRow row, int pos) {
+ return LocalTime.ofNanoOfDay(row.getInt(pos) * 1_000_000L);
+ }
+
+ /** Converts a TIMESTAMP_WITHOUT_TIME_ZONE value to a LocalDateTime
honoring precision. */
+ private static Object convertTimestampNtzValue(int precision, InternalRow
row, int pos) {
+ TimestampNtz t = row.getTimestampNtz(pos, precision);
+ return t.toLocalDateTime();
+ }
+
+ /**
+ * Converts a TIMESTAMP_WITH_LOCAL_TIME_ZONE value to either Instant or
OffsetDateTime in UTC,
+ * depending on the target POJO property type.
+ */
+ private static Object convertTimestampLtzValue(
+ int precision, PojoType.Property prop, InternalRow row, int pos) {
+ TimestampLtz t = row.getTimestampLtz(pos, precision);
+ if (prop.type == Instant.class) {
+ return t.toInstant();
+ } else if (prop.type == OffsetDateTime.class) {
+ return OffsetDateTime.ofInstant(t.toInstant(), ZoneOffset.UTC);
+ }
+ throw new IllegalArgumentException(
+ String.format(
+ "Field %s is not an Instant or OffsetDateTime. Cannot
convert from TimestampData.",
+ prop.name));
+ }
+
+ private interface RowToField {
+ Object convert(InternalRow row, int pos) throws Exception;
+ }
+}
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/converter/ConvertersTestFixtures.java
b/fluss-client/src/test/java/org/apache/fluss/client/converter/ConvertersTestFixtures.java
new file mode 100644
index 000000000..8863b9f23
--- /dev/null
+++
b/fluss-client/src/test/java/org/apache/fluss/client/converter/ConvertersTestFixtures.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.converter;
+
+import org.apache.fluss.types.DataTypes;
+import org.apache.fluss.types.RowType;
+
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Objects;
+
+/** Shared fixtures and helper POJOs for converter tests. */
+public final class ConvertersTestFixtures {
+
+ private ConvertersTestFixtures() {}
+
+ public static RowType fullSchema() {
+ return RowType.builder()
+ .field("booleanField", DataTypes.BOOLEAN())
+ .field("byteField", DataTypes.TINYINT())
+ .field("shortField", DataTypes.SMALLINT())
+ .field("intField", DataTypes.INT())
+ .field("longField", DataTypes.BIGINT())
+ .field("floatField", DataTypes.FLOAT())
+ .field("doubleField", DataTypes.DOUBLE())
+ .field("stringField", DataTypes.STRING())
+ .field("bytesField", DataTypes.BYTES())
+ .field("decimalField", DataTypes.DECIMAL(10, 2))
+ .field("dateField", DataTypes.DATE())
+ .field("timeField", DataTypes.TIME())
+ .field("timestampField", DataTypes.TIMESTAMP())
+ .field("timestampLtzField", DataTypes.TIMESTAMP_LTZ())
+ .field("offsetDateTimeField", DataTypes.TIMESTAMP_LTZ())
+ .build();
+ }
+
+ // ----------------------- Helper POJOs -----------------------
+ /** Test POJO used for end-to-end converter tests. */
+ public static class TestPojo {
+ public Boolean booleanField;
+ public Byte byteField;
+ public Short shortField;
+ public Integer intField;
+ public Long longField;
+ public Float floatField;
+ public Double doubleField;
+ public String stringField;
+ public byte[] bytesField;
+ public BigDecimal decimalField;
+ public LocalDate dateField;
+ public LocalTime timeField;
+ public LocalDateTime timestampField;
+ public Instant timestampLtzField;
+ public OffsetDateTime offsetDateTimeField;
+
+ public TestPojo() {}
+
+ public TestPojo(
+ Boolean booleanField,
+ Byte byteField,
+ Short shortField,
+ Integer intField,
+ Long longField,
+ Float floatField,
+ Double doubleField,
+ String stringField,
+ byte[] bytesField,
+ BigDecimal decimalField,
+ LocalDate dateField,
+ LocalTime timeField,
+ LocalDateTime timestampField,
+ Instant timestampLtzField,
+ OffsetDateTime offsetDateTimeField) {
+ this.booleanField = booleanField;
+ this.byteField = byteField;
+ this.shortField = shortField;
+ this.intField = intField;
+ this.longField = longField;
+ this.floatField = floatField;
+ this.doubleField = doubleField;
+ this.stringField = stringField;
+ this.bytesField = bytesField;
+ this.decimalField = decimalField;
+ this.dateField = dateField;
+ this.timeField = timeField;
+ this.timestampField = timestampField;
+ this.timestampLtzField = timestampLtzField;
+ this.offsetDateTimeField = offsetDateTimeField;
+ }
+
+ public static TestPojo sample() {
+ return new TestPojo(
+ true,
+ (byte) 42,
+ (short) 1234,
+ 123456,
+ 9876543210L,
+ 3.14f,
+ 2.71828,
+ "Hello, World!",
+ new byte[] {1, 2, 3, 4, 5},
+ new BigDecimal("123.45"),
+ LocalDate.of(2025, 7, 23),
+ LocalTime.of(15, 1, 30),
+ LocalDateTime.of(2025, 7, 23, 15, 1, 30),
+ Instant.parse("2025-07-23T15:01:30Z"),
+ OffsetDateTime.of(2025, 7, 23, 15, 1, 30, 0,
ZoneOffset.UTC));
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ TestPojo testPojo = (TestPojo) o;
+ return Objects.equals(booleanField, testPojo.booleanField)
+ && Objects.equals(byteField, testPojo.byteField)
+ && Objects.equals(shortField, testPojo.shortField)
+ && Objects.equals(intField, testPojo.intField)
+ && Objects.equals(longField, testPojo.longField)
+ && Objects.equals(floatField, testPojo.floatField)
+ && Objects.equals(doubleField, testPojo.doubleField)
+ && Objects.equals(stringField, testPojo.stringField)
+ && Arrays.equals(bytesField, testPojo.bytesField)
+ && Objects.equals(decimalField, testPojo.decimalField)
+ && Objects.equals(dateField, testPojo.dateField)
+ && Objects.equals(timeField, testPojo.timeField)
+ && Objects.equals(timestampField, testPojo.timestampField)
+ && Objects.equals(timestampLtzField,
testPojo.timestampLtzField)
+ && Objects.equals(offsetDateTimeField,
testPojo.offsetDateTimeField);
+ }
+
+ @Override
+ public int hashCode() {
+ int result =
+ Objects.hash(
+ booleanField,
+ byteField,
+ shortField,
+ intField,
+ longField,
+ floatField,
+ doubleField,
+ stringField,
+ decimalField,
+ dateField,
+ timeField,
+ timestampField,
+ timestampLtzField,
+ offsetDateTimeField);
+ result = 31 * result + Arrays.hashCode(bytesField);
+ return result;
+ }
+ }
+
+ /** A minimal POJO with a subset of fields for projection tests. */
+ public static class PartialTestPojo {
+ public Boolean booleanField;
+ public Integer intField;
+ public String stringField;
+
+ public PartialTestPojo() {}
+ }
+
+ /** POJO without public default constructor, used for validation tests. */
+ public static class NoDefaultConstructorPojo {
+ public Integer intField;
+
+ public NoDefaultConstructorPojo(int v) {
+ this.intField = v;
+ }
+ }
+
+ /** POJO whose default constructor throws, used to test instantiation
error handling. */
+ public static class ThrowingCtorPojo {
+ public Integer intField;
+
+ public ThrowingCtorPojo() {
+ throw new RuntimeException("ctor failure");
+ }
+ }
+
+ /** POJO with wrong Java type for DECIMAL field, used for negative tests.
*/
+ public static class DecimalWrongTypePojo {
+ public String decimalField;
+
+ public DecimalWrongTypePojo() {}
+ }
+
+ /** POJO with wrong Java type for DATE field, used for negative tests. */
+ public static class DateWrongTypePojo {
+ public String dateField;
+
+ public DateWrongTypePojo() {}
+ }
+
+ /** POJO with wrong Java type for TIME field, used for negative tests. */
+ public static class TimeWrongTypePojo {
+ public String timeField;
+
+ public TimeWrongTypePojo() {}
+ }
+
+ /** POJO with wrong Java type for TIMESTAMP_NTZ field, used for negative
tests. */
+ public static class TimestampWrongTypePojo {
+ public String timestampField;
+
+ public TimestampWrongTypePojo() {}
+ }
+
+ /** POJO with wrong Java type for TIMESTAMP_LTZ field, used for negative
tests. */
+ public static class TimestampLtzWrongTypePojo {
+ public String timestampLtzField;
+
+ public TimestampLtzWrongTypePojo() {}
+ }
+
+ /** POJO with unsupported Map field, used for negative tests. */
+ public static class MapPojo {
+ public Map<String, Integer> mapField;
+
+ public MapPojo() {}
+ }
+
+ /** POJO with Character field for CHAR/STRING handling tests. */
+ public static class CharacterFieldPojo {
+ public Character charField;
+
+ public CharacterFieldPojo() {}
+
+ public CharacterFieldPojo(Character c) {
+ this.charField = c;
+ }
+ }
+}
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/converter/PojoToRowConverterTest.java
b/fluss-client/src/test/java/org/apache/fluss/client/converter/PojoToRowConverterTest.java
new file mode 100644
index 000000000..11deb35ac
--- /dev/null
+++
b/fluss-client/src/test/java/org/apache/fluss/client/converter/PojoToRowConverterTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.converter;
+
+import org.apache.fluss.row.GenericRow;
+import org.apache.fluss.types.DataTypes;
+import org.apache.fluss.types.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link PojoToRowConverter}. */
+public class PojoToRowConverterTest {
+
+ @Test
+ public void testNullHandlingToRow() {
+ RowType table = ConvertersTestFixtures.fullSchema();
+ RowType projection = table;
+ PojoToRowConverter<ConvertersTestFixtures.TestPojo> writer =
+ PojoToRowConverter.of(ConvertersTestFixtures.TestPojo.class,
table, projection);
+ assertThat(writer.toRow(null)).isNull();
+ }
+
+ @Test
+ public void testProjectionSubsetWrites() {
+ RowType table = ConvertersTestFixtures.fullSchema();
+ RowType projection =
+ RowType.builder()
+ .field("booleanField", DataTypes.BOOLEAN())
+ .field("intField", DataTypes.INT())
+ .field("stringField", DataTypes.STRING())
+ .build();
+
+ PojoToRowConverter<ConvertersTestFixtures.TestPojo> writer =
+ PojoToRowConverter.of(ConvertersTestFixtures.TestPojo.class,
table, projection);
+ ConvertersTestFixtures.TestPojo pojo =
ConvertersTestFixtures.TestPojo.sample();
+ GenericRow row = writer.toRow(pojo);
+ assertThat(row.getFieldCount()).isEqualTo(3);
+ assertThat(row.getBoolean(0)).isTrue();
+ assertThat(row.getInt(1)).isEqualTo(123456);
+ assertThat(row.getString(2).toString()).isEqualTo("Hello, World!");
+ }
+
+ @Test
+ public void testPojoMustExactlyMatchTableSchema() {
+ RowType table =
+ RowType.builder()
+ .field("booleanField", DataTypes.BOOLEAN())
+ .field("intField", DataTypes.INT())
+ .field("stringField", DataTypes.STRING())
+ .field("extraField", DataTypes.DOUBLE())
+ .build();
+ RowType projection = table;
+ assertThatThrownBy(
+ () ->
+ PojoToRowConverter.of(
+
ConvertersTestFixtures.PartialTestPojo.class,
+ table,
+ projection))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("must exactly match table schema");
+ }
+
+ @Test
+ public void testPojoNoDefaultCtorFails() {
+ RowType table = RowType.builder().field("intField",
DataTypes.INT()).build();
+ RowType proj = table;
+ assertThatThrownBy(
+ () ->
+ PojoToRowConverter.of(
+
ConvertersTestFixtures.NoDefaultConstructorPojo.class,
+ table,
+ proj))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("public default constructor");
+ }
+
+ @Test
+ public void testDecimalTypeValidationAtCreation() {
+ RowType table = RowType.builder().field("decimalField",
DataTypes.DECIMAL(10, 2)).build();
+ RowType proj = table;
+ assertThatThrownBy(
+ () ->
+ PojoToRowConverter.of(
+
ConvertersTestFixtures.DecimalWrongTypePojo.class,
+ table,
+ proj))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("incompatible with Fluss type")
+ .hasMessageContaining("decimalField");
+ }
+
+ @Test
+ public void testDateTimeTypeValidationAtCreation() {
+ RowType dateSchema = RowType.builder().field("dateField",
DataTypes.DATE()).build();
+ RowType timeSchema = RowType.builder().field("timeField",
DataTypes.TIME()).build();
+ RowType tsSchema = RowType.builder().field("timestampField",
DataTypes.TIMESTAMP()).build();
+ RowType ltzSchema =
+ RowType.builder().field("timestampLtzField",
DataTypes.TIMESTAMP_LTZ()).build();
+ assertThatThrownBy(
+ () ->
+ PojoToRowConverter.of(
+
ConvertersTestFixtures.DateWrongTypePojo.class,
+ dateSchema,
+ dateSchema))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("incompatible with Fluss type")
+ .hasMessageContaining("dateField");
+ assertThatThrownBy(
+ () ->
+ PojoToRowConverter.of(
+
ConvertersTestFixtures.TimeWrongTypePojo.class,
+ timeSchema,
+ timeSchema))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("incompatible with Fluss type")
+ .hasMessageContaining("timeField");
+ assertThatThrownBy(
+ () ->
+ PojoToRowConverter.of(
+
ConvertersTestFixtures.TimestampWrongTypePojo.class,
+ tsSchema,
+ tsSchema))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("incompatible with Fluss type")
+ .hasMessageContaining("timestampField");
+ assertThatThrownBy(
+ () ->
+ PojoToRowConverter.of(
+
ConvertersTestFixtures.TimestampLtzWrongTypePojo.class,
+ ltzSchema,
+ ltzSchema))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("incompatible with Fluss type")
+ .hasMessageContaining("timestampLtzField");
+ }
+
+ @Test
+ public void testUnsupportedSchemaFieldTypeThrows() {
+ RowType table =
+ RowType.builder()
+ .field("mapField", DataTypes.MAP(DataTypes.STRING(),
DataTypes.INT()))
+ .build();
+ RowType proj = table;
+ assertThatThrownBy(
+ () ->
+ PojoToRowConverter.of(
+ ConvertersTestFixtures.MapPojo.class,
table, proj))
+ .isInstanceOf(UnsupportedOperationException.class)
+ .hasMessageContaining("Unsupported field type")
+ .hasMessageContaining("MAP")
+ .hasMessageContaining("mapField");
+ }
+}
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/converter/RowToPojoConverterTest.java
b/fluss-client/src/test/java/org/apache/fluss/client/converter/RowToPojoConverterTest.java
new file mode 100644
index 000000000..52d69eb2a
--- /dev/null
+++
b/fluss-client/src/test/java/org/apache/fluss/client/converter/RowToPojoConverterTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.converter;
+
+import org.apache.fluss.row.GenericRow;
+import org.apache.fluss.types.DataTypes;
+import org.apache.fluss.types.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link RowToPojoConverter}. */
+public class RowToPojoConverterTest {
+
+ @Test
+ public void testRoundTripFullSchema() {
+ RowType table = ConvertersTestFixtures.fullSchema();
+ RowType projection = table;
+
+ PojoToRowConverter<ConvertersTestFixtures.TestPojo> writer =
+ PojoToRowConverter.of(ConvertersTestFixtures.TestPojo.class,
table, projection);
+ RowToPojoConverter<ConvertersTestFixtures.TestPojo> scanner =
+ RowToPojoConverter.of(ConvertersTestFixtures.TestPojo.class,
table, projection);
+
+ ConvertersTestFixtures.TestPojo pojo =
ConvertersTestFixtures.TestPojo.sample();
+ GenericRow row = writer.toRow(pojo);
+ assertThat(row.getFieldCount()).isEqualTo(15);
+
+ ConvertersTestFixtures.TestPojo back = scanner.fromRow(row);
+ assertThat(back).isEqualTo(pojo);
+ }
+
+ @Test
+ public void testNullHandlingFromRow() {
+ RowType table = ConvertersTestFixtures.fullSchema();
+ RowType projection = table;
+ RowToPojoConverter<ConvertersTestFixtures.TestPojo> scanner =
+ RowToPojoConverter.of(ConvertersTestFixtures.TestPojo.class,
table, projection);
+ assertThat(scanner.fromRow(null)).isNull();
+ }
+
+ @Test
+ public void testProjectionSubsetReads() {
+ RowType table = ConvertersTestFixtures.fullSchema();
+ RowType projection =
+ RowType.builder()
+ .field("booleanField", DataTypes.BOOLEAN())
+ .field("intField", DataTypes.INT())
+ .field("stringField", DataTypes.STRING())
+ .build();
+
+ PojoToRowConverter<ConvertersTestFixtures.TestPojo> writer =
+ PojoToRowConverter.of(ConvertersTestFixtures.TestPojo.class,
table, projection);
+ RowToPojoConverter<ConvertersTestFixtures.TestPojo> scanner =
+ RowToPojoConverter.of(ConvertersTestFixtures.TestPojo.class,
table, projection);
+
+ ConvertersTestFixtures.TestPojo pojo =
ConvertersTestFixtures.TestPojo.sample();
+ GenericRow row = writer.toRow(pojo);
+ assertThat(row.getFieldCount()).isEqualTo(3);
+ assertThat(row.getBoolean(0)).isTrue();
+ assertThat(row.getInt(1)).isEqualTo(123456);
+ assertThat(row.getString(2).toString()).isEqualTo("Hello, World!");
+
+ ConvertersTestFixtures.TestPojo back = scanner.fromRow(row);
+ assertThat(back.booleanField).isTrue();
+ assertThat(back.intField).isEqualTo(123456);
+ assertThat(back.stringField).isEqualTo("Hello, World!");
+ // non-projected remain null
+ assertThat(back.byteField).isNull();
+ assertThat(back.shortField).isNull();
+ assertThat(back.longField).isNull();
+ assertThat(back.floatField).isNull();
+ assertThat(back.doubleField).isNull();
+ assertThat(back.bytesField).isNull();
+ assertThat(back.decimalField).isNull();
+ assertThat(back.dateField).isNull();
+ assertThat(back.timeField).isNull();
+ assertThat(back.timestampField).isNull();
+ assertThat(back.timestampLtzField).isNull();
+ assertThat(back.offsetDateTimeField).isNull();
+ }
+
+ @Test
+ public void testFromRowThrowsWhenDefaultConstructorThrows() {
+ RowType table = RowType.builder().field("intField",
DataTypes.INT()).build();
+ RowType proj = table;
+ RowToPojoConverter<ConvertersTestFixtures.ThrowingCtorPojo> scanner =
+
RowToPojoConverter.of(ConvertersTestFixtures.ThrowingCtorPojo.class, table,
proj);
+ GenericRow row = new GenericRow(1);
+ row.setField(0, 1);
+ assertThatThrownBy(() -> scanner.fromRow(row))
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessageContaining("Failed to instantiate POJO class")
+
.hasMessageContaining(ConvertersTestFixtures.ThrowingCtorPojo.class.getName());
+ }
+
+ @Test
+ public void testCharacterFieldRoundTrip() {
+ RowType table = RowType.builder().field("charField",
DataTypes.STRING()).build();
+ RowType proj = table;
+ PojoToRowConverter<ConvertersTestFixtures.CharacterFieldPojo> writer =
+
PojoToRowConverter.of(ConvertersTestFixtures.CharacterFieldPojo.class, table,
proj);
+ RowToPojoConverter<ConvertersTestFixtures.CharacterFieldPojo> scanner =
+
RowToPojoConverter.of(ConvertersTestFixtures.CharacterFieldPojo.class, table,
proj);
+ ConvertersTestFixtures.CharacterFieldPojo pojo =
+ new ConvertersTestFixtures.CharacterFieldPojo('A');
+ GenericRow row = writer.toRow(pojo);
+ assertThat(row.getString(0).toString()).isEqualTo("A");
+ ConvertersTestFixtures.CharacterFieldPojo back = scanner.fromRow(row);
+ assertThat(back.charField).isEqualTo('A');
+ }
+}