This is an automated email from the ASF dual-hosted git repository.
ipolyzos pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 2f8cadad4 [fluss-client] Support Complex Data Types on the Java Typed
API (Map and Array) (#2814)
2f8cadad4 is described below
commit 2f8cadad425a95533be79a8d7f36356993ef7b9f
Author: nhuantho <[email protected]>
AuthorDate: Wed Mar 11 19:58:39 2026 +0700
[fluss-client] Support Complex Data Types on the Java Typed API (Map and
Array) (#2814)
* [fluss-client] Add pojo array to fluss array
* [fluss-client] Add pojo map to fluss map
* [fluss-client] Create shared utilities for POJO type and Fluss type
* [fluss-client] Create shared utilities for Fluss type to Pojo type
* [fluss-client] Add Fluss array to Pojo array
* [fluss-client] Add Fluss map to Pojo map
* [ci] Fix Compile Java 8
* Revert _partial_config.mdx
* [fluss-client] Handle primitive arrays
* [fluss-client] Refactor PojoMapToFlussMap to follow the same conversion
logic as PojoArrayToFlussArray
* [fluss-client] Change validateCompatibility logic for MAP and ARRAY types
* small fixes and add tests to FlussTypedClient
* fix checkstyle
* fix checkstyle
---------
Co-authored-by: ipolyzos <[email protected]>
---
.../fluss/client/converter/ConverterCommons.java | 32 +-
.../client/converter/FlussArrayToPojoArray.java | 184 +++++++++
.../fluss/client/converter/FlussMapToPojoMap.java | 69 ++++
.../converter/FlussTypeToPojoTypeConverter.java | 122 ++++++
.../client/converter/PojoArrayToFlussArray.java | 99 +++++
.../fluss/client/converter/PojoMapToFlussMap.java | 58 +++
.../fluss/client/converter/PojoToRowConverter.java | 213 ++--------
...rter.java => PojoTypeToFlussTypeConverter.java} | 236 +++++------
.../fluss/client/converter/RowToPojoConverter.java | 179 +++------
.../client/converter/ConvertersTestFixtures.java | 29 +-
.../converter/FlussArrayToPojoArrayTest.java | 431 ++++++++++++++++++++
.../converter/PojoArrayToFlussArrayTest.java | 437 +++++++++++++++++++++
.../client/converter/PojoToRowConverterTest.java | 67 +++-
.../client/converter/RowToPojoConverterTest.java | 107 ++++-
.../fluss/client/table/FlussTypedClientITCase.java | 337 ++++++++++++++++
15 files changed, 2124 insertions(+), 476 deletions(-)
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/converter/ConverterCommons.java
b/fluss-client/src/main/java/org/apache/fluss/client/converter/ConverterCommons.java
index aeb36419a..dbc883f36 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/converter/ConverterCommons.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/converter/ConverterCommons.java
@@ -24,14 +24,13 @@ import org.apache.fluss.types.RowType;
import java.math.BigDecimal;
import java.util.Arrays;
+import java.util.Collection;
import java.util.EnumMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import static
org.apache.fluss.client.converter.RowToPojoConverter.charLengthExceptionMessage;
-
/**
* Internal shared utilities for POJO and Fluss InternalRow conversions.
*
@@ -112,8 +111,29 @@ final class ConverterCommons {
}
static void validateCompatibility(DataType fieldType, PojoType.Property
prop) {
- Set<Class<?>> supported = SUPPORTED_TYPES.get(fieldType.getTypeRoot());
+ DataTypeRoot typeRoot = fieldType.getTypeRoot();
Class<?> actual = prop.type;
+ if (typeRoot == DataTypeRoot.ARRAY) {
+ if (!actual.isArray() &&
!Collection.class.isAssignableFrom(actual)) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Field '%s' must be an array or Collection for
ARRAY type, got %s",
+ prop.name, actual.getName()));
+ }
+ return;
+ }
+
+ if (typeRoot == DataTypeRoot.MAP) {
+ if (!Map.class.isAssignableFrom(actual)) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Field '%s' must be a Map for MAP type, got
%s",
+ prop.name, actual.getName()));
+ }
+ return;
+ }
+
+ Set<Class<?>> supported = SUPPORTED_TYPES.get(fieldType.getTypeRoot());
if (supported == null) {
throw new UnsupportedOperationException(
String.format(
@@ -128,6 +148,12 @@ final class ConverterCommons {
}
}
+ public static String charLengthExceptionMessage(String fieldName, int
length) {
+ return String.format(
+ "Field %s expects exactly one character for CHAR type, got
length %d.",
+ fieldName, length);
+ }
+
static BinaryString toBinaryStringForText(Object v, String fieldName,
DataTypeRoot root) {
final String s = String.valueOf(v);
if (root == DataTypeRoot.CHAR && s.length() != 1) {
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/converter/FlussArrayToPojoArray.java
b/fluss-client/src/main/java/org/apache/fluss/client/converter/FlussArrayToPojoArray.java
new file mode 100644
index 000000000..cc9efd31f
--- /dev/null
+++
b/fluss-client/src/main/java/org/apache/fluss/client/converter/FlussArrayToPojoArray.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.converter;
+
+import org.apache.fluss.row.Decimal;
+import org.apache.fluss.row.InternalArray;
+import org.apache.fluss.types.ArrayType;
+import org.apache.fluss.types.DataType;
+import org.apache.fluss.types.DataTypeChecks;
+import org.apache.fluss.types.DecimalType;
+import org.apache.fluss.types.MapType;
+
+import java.lang.reflect.Array;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+
+/** Adapter class for converting Fluss InternalArray to Pojo Array. */
+public class FlussArrayToPojoArray {
+ private final InternalArray flussArray;
+ private final String fieldName;
+ private final Class<?> pojoType;
+ /** Pre-compiled per-element converter; avoids repeating the type-switch
on every element. */
+ private final ElementConverter elementConverter;
+
+ public FlussArrayToPojoArray(
+ InternalArray flussArray, DataType elementType, String fieldName,
Class<?> pojoType) {
+ this.flussArray = flussArray;
+ this.fieldName = fieldName;
+ this.pojoType = pojoType != null ? pojoType : Object.class;
+ this.elementConverter = buildElementConverter(elementType, fieldName,
this.pojoType);
+ }
+
+ public Object convertArray() {
+ if (flussArray == null) {
+ return null;
+ }
+
+ int size = flussArray.size();
+ Object result = Array.newInstance(pojoType, size);
+ for (int i = 0; i < size; i++) {
+ Object element = convertElementValue(i);
+ if (element == null && pojoType.isPrimitive()) {
+ throw new NullPointerException(
+ String.format(
+ "Field '%s': cannot store null into a
primitive array of type %s[]."
+ + " Use a boxed type (e.g. Integer[])
or ensure all elements are non-null.",
+ fieldName, pojoType.getName()));
+ }
+ Array.set(result, i, element);
+ }
+ return result;
+ }
+
+ /** Converts the Fluss array to a {@link List}, preserving null elements.
*/
+ List<Object> convertList() {
+ if (flussArray == null) {
+ return null;
+ }
+ int size = flussArray.size();
+ List<Object> result = new ArrayList<>(size);
+ for (int i = 0; i < size; i++) {
+ result.add(convertElementValue(i));
+ }
+ return result;
+ }
+
+ Object convertElementValue(int index) {
+ if (flussArray.isNullAt(index)) {
+ return null;
+ }
+ return elementConverter.convert(flussArray, index);
+ }
+
+ /**
+ * Resolves the element type once and returns a pre-compiled converter.
This avoids repeating
+ * the type dispatch (getTypeRoot() switch + casts) on every array element.
+ */
+ private static ElementConverter buildElementConverter(
+ DataType elementType, String fieldName, Class<?> pojoType) {
+ switch (elementType.getTypeRoot()) {
+ case BOOLEAN:
+ return InternalArray::getBoolean;
+ case TINYINT:
+ return InternalArray::getByte;
+ case SMALLINT:
+ return InternalArray::getShort;
+ case INTEGER:
+ return InternalArray::getInt;
+ case BIGINT:
+ return InternalArray::getLong;
+ case FLOAT:
+ return InternalArray::getFloat;
+ case DOUBLE:
+ return InternalArray::getDouble;
+ case CHAR:
+ case STRING:
+ // Default to String when the POJO element type is unspecified
(Object[])
+ final Class<?> textTarget = (pojoType == Object.class) ?
String.class : pojoType;
+ return (arr, i) ->
+ FlussTypeToPojoTypeConverter.convertTextValue(
+ elementType, fieldName, textTarget,
arr.getString(i));
+ case BINARY:
+ case BYTES:
+ return InternalArray::getBytes;
+ case DECIMAL:
+ {
+ final DecimalType decimalType = (DecimalType) elementType;
+ final int precision = decimalType.getPrecision();
+ final int scale = decimalType.getScale();
+ return (arr, i) -> {
+ Decimal d = arr.getDecimal(i, precision, scale);
+ return
FlussTypeToPojoTypeConverter.convertDecimalValue(d);
+ };
+ }
+ case DATE:
+ return (arr, i) ->
FlussTypeToPojoTypeConverter.convertDateValue(arr.getInt(i));
+ case TIME_WITHOUT_TIME_ZONE:
+ return (arr, i) ->
FlussTypeToPojoTypeConverter.convertTimeValue(arr.getInt(i));
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ {
+ final int precision =
DataTypeChecks.getPrecision(elementType);
+ return (arr, i) ->
+
FlussTypeToPojoTypeConverter.convertTimestampNtzValue(
+ arr.getTimestampNtz(i, precision));
+ }
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ {
+ final int precision =
DataTypeChecks.getPrecision(elementType);
+ // Default to Instant when the POJO element type is
unspecified (Object[])
+ final Class<?> tsTarget = (pojoType == Object.class) ?
Instant.class : pojoType;
+ return (arr, i) ->
+
FlussTypeToPojoTypeConverter.convertTimestampLtzValue(
+ arr.getTimestampLtz(i, precision),
fieldName, tsTarget);
+ }
+ case ARRAY:
+ {
+ final ArrayType nestedArrayType = (ArrayType) elementType;
+ final Class<?> componentType = pojoType.getComponentType();
+ return (arr, i) -> {
+ InternalArray innerArray = arr.getArray(i);
+ return innerArray == null
+ ? null
+ : new FlussArrayToPojoArray(
+ innerArray,
+
nestedArrayType.getElementType(),
+ fieldName,
+ componentType)
+ .convertArray();
+ };
+ }
+ case MAP:
+ return (arr, i) ->
+ new FlussMapToPojoMap(arr.getMap(i), (MapType)
elementType, fieldName)
+ .convertMap();
+ default:
+ throw new UnsupportedOperationException(
+ String.format(
+ "Unsupported field type %s for field %s.",
+ elementType.getTypeRoot(), fieldName));
+ }
+ }
+
+ @FunctionalInterface
+ private interface ElementConverter {
+ Object convert(InternalArray array, int index);
+ }
+}
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/converter/FlussMapToPojoMap.java
b/fluss-client/src/main/java/org/apache/fluss/client/converter/FlussMapToPojoMap.java
new file mode 100644
index 000000000..d1a7474aa
--- /dev/null
+++
b/fluss-client/src/main/java/org/apache/fluss/client/converter/FlussMapToPojoMap.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.converter;
+
+import org.apache.fluss.row.InternalMap;
+import org.apache.fluss.types.MapType;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Adapter class for converting Fluss InternalMap to Pojo map. */
+public class FlussMapToPojoMap {
+ private final InternalMap flussMap;
+ private final MapType mapType;
+ private final String fieldName;
+
+ public FlussMapToPojoMap(InternalMap flussMap, MapType mapType, String
fieldName) {
+ this.flussMap = flussMap;
+ this.mapType = mapType;
+ this.fieldName = fieldName;
+ }
+
+ public Object convertMap() {
+ if (flussMap == null) {
+ return null;
+ }
+
+ List<Object> keys =
+ new FlussArrayToPojoArray(
+ flussMap.keyArray(), mapType.getKeyType(),
fieldName, Object.class)
+ .convertList();
+
+ if (keys == null || keys.isEmpty()) {
+ return new HashMap<>();
+ }
+
+ List<Object> values =
+ new FlussArrayToPojoArray(
+ flussMap.valueArray(),
+ mapType.getValueType(),
+ fieldName,
+ Object.class)
+ .convertList();
+
+ // Build the result map
+ Map<Object, Object> result = new HashMap<>(keys.size() * 2);
+ for (int i = 0; i < keys.size(); i++) {
+ result.put(keys.get(i), values == null ? null : values.get(i));
+ }
+ return result;
+ }
+}
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/converter/FlussTypeToPojoTypeConverter.java
b/fluss-client/src/main/java/org/apache/fluss/client/converter/FlussTypeToPojoTypeConverter.java
new file mode 100644
index 000000000..2b693f4e3
--- /dev/null
+++
b/fluss-client/src/main/java/org/apache/fluss/client/converter/FlussTypeToPojoTypeConverter.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.converter;
+
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.Decimal;
+import org.apache.fluss.row.TimestampLtz;
+import org.apache.fluss.row.TimestampNtz;
+import org.apache.fluss.types.DataType;
+import org.apache.fluss.types.DataTypeRoot;
+
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+
+/** Shared utilities for Fluss type and Pojo type. */
+public class FlussTypeToPojoTypeConverter {
+ /**
+ * Converts a text value (CHAR/STRING) read from an InternalRow into the
target Java type
+ * declared by the POJO property.
+ *
+ * <p>Supported target types are String and Character. For CHAR columns,
this enforces that the
+ * value has exactly one character. For Character targets, empty strings
are rejected.
+ *
+ * @param fieldType Fluss column DataType (must be CHAR or STRING)
+ * @param fieldName The field name
+ * @param pojoType The pojo type
+ * @param s The BinaryString read from the row
+ * @return Converted Java value (String or Character)
+ * @throws IllegalArgumentException if the target type is unsupported or
constraints are
+ * violated
+ */
+ static Object convertTextValue(
+ DataType fieldType, String fieldName, Class<?> pojoType,
BinaryString s) {
+ if (s == null) {
+ return null;
+ }
+
+ String v = s.toString();
+ if (pojoType == String.class) {
+ if (fieldType.getTypeRoot() == DataTypeRoot.CHAR && v.length() !=
1) {
+ throw new IllegalArgumentException(
+ ConverterCommons.charLengthExceptionMessage(fieldName,
v.length()));
+ }
+ return v;
+ } else if (pojoType == Character.class) {
+ if (v.isEmpty()) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Field %s expects Character, but the string
value is empty.",
+ fieldName));
+ }
+ if (fieldType.getTypeRoot() == DataTypeRoot.CHAR && v.length() !=
1) {
+ throw new IllegalArgumentException(
+ ConverterCommons.charLengthExceptionMessage(fieldName,
v.length()));
+ }
+ return v.charAt(0);
+ }
+ throw new IllegalArgumentException(
+ String.format(
+ "Field %s is not a String or Character. Cannot convert
from string.",
+ fieldName));
+ }
+
+ /**
+ * Converts a DECIMAL value from an InternalRow into a BigDecimal using
the column's precision
+ * and scale. The row position is assumed non-null (caller checks), so
this never returns null.
+ */
+ static BigDecimal convertDecimalValue(Decimal d) {
+ return d.toBigDecimal();
+ }
+
+ /** Converts a DATE value stored as int days since epoch to a LocalDate. */
+ static LocalDate convertDateValue(int daysSinceEpoch) {
+ return LocalDate.ofEpochDay(daysSinceEpoch);
+ }
+
+ /** Converts a TIME_WITHOUT_TIME_ZONE value stored as int millis of day to
a LocalTime. */
+ static LocalTime convertTimeValue(int millisOfDay) {
+ return LocalTime.ofNanoOfDay(millisOfDay * 1_000_000L);
+ }
+
+ /** Converts a TIMESTAMP_WITHOUT_TIME_ZONE value to a LocalDateTime
honoring precision. */
+ static Object convertTimestampNtzValue(TimestampNtz t) {
+ return t.toLocalDateTime();
+ }
+
+ /**
+ * Converts a TIMESTAMP_WITH_LOCAL_TIME_ZONE value to either Instant or
OffsetDateTime in UTC,
+ * depending on the target POJO property type.
+ */
+ static Object convertTimestampLtzValue(TimestampLtz t, String fieldName,
Class<?> pojoType) {
+ if (pojoType == Instant.class) {
+ return t.toInstant();
+ } else if (pojoType == OffsetDateTime.class) {
+ return OffsetDateTime.ofInstant(t.toInstant(), ZoneOffset.UTC);
+ }
+ throw new IllegalArgumentException(
+ String.format(
+ "Field %s is not an Instant or OffsetDateTime. Cannot
convert from TimestampData.",
+ fieldName));
+ }
+}
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/converter/PojoArrayToFlussArray.java
b/fluss-client/src/main/java/org/apache/fluss/client/converter/PojoArrayToFlussArray.java
new file mode 100644
index 000000000..2b27ddccf
--- /dev/null
+++
b/fluss-client/src/main/java/org/apache/fluss/client/converter/PojoArrayToFlussArray.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.converter;
+
+import org.apache.fluss.row.GenericArray;
+import org.apache.fluss.types.ArrayType;
+import org.apache.fluss.types.DataType;
+
+import java.util.Collection;
+
+import static
org.apache.fluss.client.converter.PojoTypeToFlussTypeConverter.convertElementValue;
+
+/** Adapter class for converting Pojo Array to Fluss InternalArray. */
+public class PojoArrayToFlussArray {
+ private final Object obj;
+ private final DataType fieldType;
+ private final String fieldName;
+
+ public PojoArrayToFlussArray(Object obj, DataType fieldType, String
fieldName) {
+ this.obj = obj;
+ this.fieldType = fieldType;
+ this.fieldName = fieldName;
+ }
+
+ public GenericArray convertArray() {
+ if (obj == null) {
+ return null;
+ }
+
+ ArrayType arrayType = (ArrayType) fieldType;
+ DataType elementType = arrayType.getElementType();
+
+ // Handle primitive arrays
+ if (obj instanceof boolean[]) {
+ return new GenericArray((boolean[]) obj);
+ } else if (obj instanceof long[]) {
+ return new GenericArray((long[]) obj);
+ } else if (obj instanceof double[]) {
+ return new GenericArray((double[]) obj);
+ } else if (obj instanceof float[]) {
+ return new GenericArray((float[]) obj);
+ } else if (obj instanceof short[]) {
+ return new GenericArray((short[]) obj);
+ } else if (obj instanceof byte[]) {
+ return new GenericArray((byte[]) obj);
+ } else if (obj instanceof int[]) {
+ return new GenericArray((int[]) obj);
+ }
+ // Handle boxed wrapper arrays
+ else if (obj instanceof Boolean[]) {
+ return new GenericArray((Boolean[]) obj);
+ } else if (obj instanceof Long[]) {
+ return new GenericArray((Long[]) obj);
+ } else if (obj instanceof Double[]) {
+ return new GenericArray((Double[]) obj);
+ } else if (obj instanceof Float[]) {
+ return new GenericArray((Float[]) obj);
+ } else if (obj instanceof Short[]) {
+ return new GenericArray((Short[]) obj);
+ } else if (obj instanceof Byte[]) {
+ return new GenericArray((Byte[]) obj);
+ }
+
+ // Handle Object[] and java.util.Collection
+ Object[] elements;
+ if (obj instanceof Object[]) {
+ elements = (Object[]) obj;
+ } else if (obj instanceof Collection) {
+ elements = ((Collection<?>) obj).toArray();
+ } else {
+ throw new IllegalArgumentException(
+ String.format(
+ "Field %s has unsupported array type: %s. Expected
array or Collection.",
+ fieldName, obj.getClass().getName()));
+ }
+
+ Object[] converted = new Object[elements.length];
+ for (int i = 0; i < elements.length; i++) {
+ converted[i] = convertElementValue(elements[i], elementType,
fieldName);
+ }
+ return new GenericArray(converted);
+ }
+}
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/converter/PojoMapToFlussMap.java
b/fluss-client/src/main/java/org/apache/fluss/client/converter/PojoMapToFlussMap.java
new file mode 100644
index 000000000..a3bdf8917
--- /dev/null
+++
b/fluss-client/src/main/java/org/apache/fluss/client/converter/PojoMapToFlussMap.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.converter;
+
+import org.apache.fluss.row.GenericMap;
+import org.apache.fluss.types.MapType;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static
org.apache.fluss.client.converter.PojoTypeToFlussTypeConverter.convertElementValue;
+
+/** Adapter class for converting Pojo Map to Fluss InternalMap. */
+public class PojoMapToFlussMap {
+ private final Map<?, ?> pojoMap;
+ private final MapType mapType;
+ private final String fieldName;
+
+ public PojoMapToFlussMap(Map<?, ?> pojoMap, MapType mapType, String
fieldName) {
+ this.pojoMap = pojoMap;
+ this.mapType = mapType;
+ this.fieldName = fieldName;
+ }
+
+ public GenericMap convertMap() {
+ if (pojoMap == null) {
+ return null;
+ }
+
+ Map<Object, Object> converted = new HashMap<>(pojoMap.size() * 2);
+ for (Map.Entry<?, ?> entry : pojoMap.entrySet()) {
+ Object convertedKey =
+ convertElementValue(entry.getKey(), mapType.getKeyType(),
fieldName);
+ Object convertedValue =
+ convertElementValue(entry.getValue(),
mapType.getValueType(), fieldName);
+ converted.put(convertedKey, convertedValue);
+ }
+
+ // Build the result map
+ return new GenericMap(converted);
+ }
+}
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/converter/PojoToRowConverter.java
b/fluss-client/src/main/java/org/apache/fluss/client/converter/PojoToRowConverter.java
index 0f4f6d8a4..14e46634a 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/converter/PojoToRowConverter.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/converter/PojoToRowConverter.java
@@ -17,26 +17,17 @@
package org.apache.fluss.client.converter;
-import org.apache.fluss.row.BinaryString;
-import org.apache.fluss.row.Decimal;
import org.apache.fluss.row.GenericRow;
-import org.apache.fluss.row.TimestampLtz;
-import org.apache.fluss.row.TimestampNtz;
import org.apache.fluss.types.DataType;
import org.apache.fluss.types.DataTypeChecks;
import org.apache.fluss.types.DecimalType;
+import org.apache.fluss.types.MapType;
import org.apache.fluss.types.RowType;
import javax.annotation.Nullable;
-import java.math.BigDecimal;
-import java.math.RoundingMode;
-import java.time.Instant;
-import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.time.LocalTime;
-import java.time.OffsetDateTime;
import java.util.List;
+import java.util.Map;
/**
* Converter for writer path: converts POJO instances to Fluss InternalRow
according to a (possibly
@@ -130,23 +121,42 @@ public final class PojoToRowConverter<T> {
return prop::read;
case CHAR:
case STRING:
- return (obj) -> convertTextValue(fieldType, prop,
prop.read(obj));
+ return (obj) ->
+ PojoTypeToFlussTypeConverter.convertTextValue(
+ fieldType, prop.name, prop.read(obj));
case DECIMAL:
- return (obj) -> convertDecimalValue((DecimalType) fieldType,
prop, prop.read(obj));
+ return (obj) ->
+ PojoTypeToFlussTypeConverter.convertDecimalValue(
+ (DecimalType) fieldType, prop.name,
prop.read(obj));
case DATE:
- return (obj) -> convertDateValue(prop, prop.read(obj));
+ return (obj) ->
+
PojoTypeToFlussTypeConverter.convertDateValue(prop.name, prop.read(obj));
case TIME_WITHOUT_TIME_ZONE:
- return (obj) -> convertTimeValue(prop, prop.read(obj));
+ return (obj) ->
+
PojoTypeToFlussTypeConverter.convertTimeValue(prop.name, prop.read(obj));
case TIMESTAMP_WITHOUT_TIME_ZONE:
{
final int precision =
DataTypeChecks.getPrecision(fieldType);
- return (obj) -> convertTimestampNtzValue(precision, prop,
prop.read(obj));
+ return (obj) ->
+
PojoTypeToFlussTypeConverter.convertTimestampNtzValue(
+ precision, prop.name, prop.read(obj));
}
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
{
final int precision =
DataTypeChecks.getPrecision(fieldType);
- return (obj) -> convertTimestampLtzValue(precision, prop,
prop.read(obj));
+ return (obj) ->
+
PojoTypeToFlussTypeConverter.convertTimestampLtzValue(
+ precision, prop.name, prop.read(obj));
}
+ case ARRAY:
+ return (obj) ->
+ new PojoArrayToFlussArray(prop.read(obj), fieldType,
prop.name)
+ .convertArray();
+ case MAP:
+ return (obj) ->
+ new PojoMapToFlussMap(
+ (Map<?, ?>) prop.read(obj), (MapType)
fieldType, prop.name)
+ .convertMap();
default:
throw new UnsupportedOperationException(
String.format(
@@ -155,175 +165,6 @@ public final class PojoToRowConverter<T> {
}
}
- /**
- * Converts a text value (String or Character) from a POJO property to
Fluss BinaryString.
- *
- * <p>For CHAR columns, enforces that the text has exactly one character.
Nulls are passed
- * through.
- */
- private static @Nullable BinaryString convertTextValue(
- DataType fieldType, PojoType.Property prop, @Nullable Object v) {
- if (v == null) {
- return null;
- }
- return ConverterCommons.toBinaryStringForText(v, prop.name,
fieldType.getTypeRoot());
- }
-
- /** Converts a BigDecimal POJO property to Fluss Decimal respecting
precision and scale. */
- private static @Nullable Decimal convertDecimalValue(
- DecimalType decimalType, PojoType.Property prop, @Nullable Object
v) {
- if (v == null) {
- return null;
- }
- if (!(v instanceof BigDecimal)) {
- throw new IllegalArgumentException(
- String.format(
- "Field %s is not a BigDecimal. Cannot convert to
Decimal.", prop.name));
- }
- final int precision = decimalType.getPrecision();
- final int scale = decimalType.getScale();
-
- // Scale with a deterministic rounding mode to avoid
ArithmeticException when rounding is
- // needed.
- BigDecimal bd = (BigDecimal) v;
- BigDecimal scaled = bd.setScale(scale, RoundingMode.HALF_UP);
-
- if (scaled.precision() > precision) {
- throw new IllegalArgumentException(
- String.format(
- "Decimal value for field %s exceeds precision %d
after scaling to %d: %s",
- prop.name, precision, scale, scaled));
- }
-
- return Decimal.fromBigDecimal(scaled, precision, scale);
- }
-
- /** Converts a LocalDate POJO property to number of days since epoch. */
- private static @Nullable Integer convertDateValue(PojoType.Property prop,
@Nullable Object v) {
- if (v == null) {
- return null;
- }
- if (!(v instanceof LocalDate)) {
- throw new IllegalArgumentException(
- String.format(
- "Field %s is not a LocalDate. Cannot convert to
int days.", prop.name));
- }
- return (int) ((LocalDate) v).toEpochDay();
- }
-
- /** Converts a LocalTime POJO property to milliseconds of day. */
- private static @Nullable Integer convertTimeValue(PojoType.Property prop,
@Nullable Object v) {
- if (v == null) {
- return null;
- }
- if (!(v instanceof LocalTime)) {
- throw new IllegalArgumentException(
- String.format(
- "Field %s is not a LocalTime. Cannot convert to
int millis.",
- prop.name));
- }
- LocalTime t = (LocalTime) v;
- return (int) (t.toNanoOfDay() / 1_000_000);
- }
-
- /**
- * Converts a LocalDateTime POJO property to Fluss TimestampNtz,
respecting the specified
- * precision.
- *
- * @param precision the timestamp precision (0-9)
- * @param prop the POJO property metadata
- * @param v the value to convert
- * @return TimestampNtz with precision applied, or null if v is null
- */
- private static @Nullable TimestampNtz convertTimestampNtzValue(
- int precision, PojoType.Property prop, @Nullable Object v) {
- if (v == null) {
- return null;
- }
- if (!(v instanceof LocalDateTime)) {
- throw new IllegalArgumentException(
- String.format(
- "Field %s is not a LocalDateTime. Cannot convert
to TimestampNtz.",
- prop.name));
- }
- LocalDateTime ldt = (LocalDateTime) v;
- LocalDateTime truncated = truncateToTimestampPrecision(ldt, precision);
- return TimestampNtz.fromLocalDateTime(truncated);
- }
-
- /**
- * Converts an Instant or OffsetDateTime POJO property to Fluss
TimestampLtz (UTC based),
- * respecting the specified precision.
- *
- * @param precision the timestamp precision (0-9)
- * @param prop the POJO property metadata
- * @param v the value to convert
- * @return TimestampLtz with precision applied, or null if v is null
- */
- private static @Nullable TimestampLtz convertTimestampLtzValue(
- int precision, PojoType.Property prop, @Nullable Object v) {
- if (v == null) {
- return null;
- }
- Instant instant;
- if (v instanceof Instant) {
- instant = (Instant) v;
- } else if (v instanceof OffsetDateTime) {
- instant = ((OffsetDateTime) v).toInstant();
- } else {
- throw new IllegalArgumentException(
- String.format(
- "Field %s is not an Instant or OffsetDateTime.
Cannot convert to TimestampLtz.",
- prop.name));
- }
- Instant truncated = truncateToTimestampPrecision(instant, precision);
- return TimestampLtz.fromInstant(truncated);
- }
-
- /**
- * Truncates a LocalDateTime to the specified timestamp precision.
- *
- * @param ldt the LocalDateTime to truncate
- * @param precision the precision (0-9)
- * @return truncated LocalDateTime
- */
- private static LocalDateTime truncateToTimestampPrecision(LocalDateTime
ldt, int precision) {
- if (precision >= 9) {
- return ldt;
- }
- int nanos = ldt.getNano();
- int truncatedNanos = truncateNanos(nanos, precision);
- return ldt.withNano(truncatedNanos);
- }
-
- /**
- * Truncates an Instant to the specified timestamp precision.
- *
- * @param instant the Instant to truncate
- * @param precision the precision (0-9)
- * @return truncated Instant
- */
- private static Instant truncateToTimestampPrecision(Instant instant, int
precision) {
- if (precision >= 9) {
- return instant;
- }
- int nanos = instant.getNano();
- int truncatedNanos = truncateNanos(nanos, precision);
- return Instant.ofEpochSecond(instant.getEpochSecond(), truncatedNanos);
- }
-
- /**
- * Truncates nanoseconds to the specified precision.
- *
- * @param nanos the nanoseconds value (0-999,999,999)
- * @param precision the precision (0-9)
- * @return truncated nanoseconds
- */
- private static int truncateNanos(int nanos, int precision) {
- int divisor = (int) Math.pow(10, 9 - precision);
- return (nanos / divisor) * divisor;
- }
-
private interface FieldToRow {
Object readAndConvert(Object pojo) throws Exception;
}
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/converter/PojoToRowConverter.java
b/fluss-client/src/main/java/org/apache/fluss/client/converter/PojoTypeToFlussTypeConverter.java
similarity index 55%
copy from
fluss-client/src/main/java/org/apache/fluss/client/converter/PojoToRowConverter.java
copy to
fluss-client/src/main/java/org/apache/fluss/client/converter/PojoTypeToFlussTypeConverter.java
index 0f4f6d8a4..3c8ce1bfd 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/converter/PojoToRowConverter.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/converter/PojoTypeToFlussTypeConverter.java
@@ -1,12 +1,13 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,13 +20,12 @@ package org.apache.fluss.client.converter;
import org.apache.fluss.row.BinaryString;
import org.apache.fluss.row.Decimal;
-import org.apache.fluss.row.GenericRow;
import org.apache.fluss.row.TimestampLtz;
import org.apache.fluss.row.TimestampNtz;
import org.apache.fluss.types.DataType;
import org.apache.fluss.types.DataTypeChecks;
import org.apache.fluss.types.DecimalType;
-import org.apache.fluss.types.RowType;
+import org.apache.fluss.types.MapType;
import javax.annotation.Nullable;
@@ -36,149 +36,34 @@ import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetDateTime;
-import java.util.List;
-
-/**
- * Converter for writer path: converts POJO instances to Fluss InternalRow
according to a (possibly
- * projected) RowType. Validation is done against the full table schema.
- */
-public final class PojoToRowConverter<T> {
-
- private final PojoType<T> pojoType;
- private final RowType tableSchema;
- private final RowType projection;
- private final List<String> projectionFieldNames;
- private final FieldToRow[] fieldConverters; // index corresponds to
projection position
-
- private PojoToRowConverter(PojoType<T> pojoType, RowType tableSchema,
RowType projection) {
- this.pojoType = pojoType;
- this.tableSchema = tableSchema;
- this.projection = projection;
- this.projectionFieldNames = projection.getFieldNames();
- // For writer path, allow POJO to be a superset of the projection. It
must contain all
- // projected fields.
- ConverterCommons.validatePojoMatchesProjection(pojoType, projection);
- ConverterCommons.validateProjectionSubset(projection, tableSchema);
- this.fieldConverters = createFieldConverters();
- }
-
- public static <T> PojoToRowConverter<T> of(
- Class<T> pojoClass, RowType tableSchema, RowType projection) {
- return new PojoToRowConverter<>(PojoType.of(pojoClass), tableSchema,
projection);
- }
-
- public GenericRow toRow(@Nullable T pojo) {
- if (pojo == null) {
- return null;
- }
- GenericRow row = new GenericRow(projection.getFieldCount());
- for (int i = 0; i < fieldConverters.length; i++) {
- Object v;
- try {
- v = fieldConverters[i].readAndConvert(pojo);
- } catch (RuntimeException re) {
- throw re;
- } catch (Exception e) {
- throw new IllegalStateException(
- "Failed to access field '"
- + projectionFieldNames.get(i)
- + "' from POJO "
- + pojoType.getPojoClass().getName(),
- e);
- }
- row.setField(i, v);
- }
- return row;
- }
-
- private FieldToRow[] createFieldConverters() {
- FieldToRow[] arr = new FieldToRow[projection.getFieldCount()];
- for (int i = 0; i < projection.getFieldCount(); i++) {
- String fieldName = projectionFieldNames.get(i);
- DataType fieldType = projection.getTypeAt(i);
- PojoType.Property prop = requireProperty(fieldName);
- ConverterCommons.validateCompatibility(fieldType, prop);
- arr[i] = createFieldConverter(prop, fieldType);
- }
- return arr;
- }
-
- private PojoType.Property requireProperty(String fieldName) {
- PojoType.Property p = pojoType.getProperty(fieldName);
- if (p == null) {
- throw new IllegalArgumentException(
- "Field '"
- + fieldName
- + "' not found in POJO class "
- + pojoType.getPojoClass().getName()
- + ".");
- }
- return p;
- }
-
- private static FieldToRow createFieldConverter(PojoType.Property prop,
DataType fieldType) {
- switch (fieldType.getTypeRoot()) {
- case BOOLEAN:
- case TINYINT:
- case SMALLINT:
- case INTEGER:
- case BIGINT:
- case FLOAT:
- case DOUBLE:
- case BINARY:
- case BYTES:
- return prop::read;
- case CHAR:
- case STRING:
- return (obj) -> convertTextValue(fieldType, prop,
prop.read(obj));
- case DECIMAL:
- return (obj) -> convertDecimalValue((DecimalType) fieldType,
prop, prop.read(obj));
- case DATE:
- return (obj) -> convertDateValue(prop, prop.read(obj));
- case TIME_WITHOUT_TIME_ZONE:
- return (obj) -> convertTimeValue(prop, prop.read(obj));
- case TIMESTAMP_WITHOUT_TIME_ZONE:
- {
- final int precision =
DataTypeChecks.getPrecision(fieldType);
- return (obj) -> convertTimestampNtzValue(precision, prop,
prop.read(obj));
- }
- case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
- {
- final int precision =
DataTypeChecks.getPrecision(fieldType);
- return (obj) -> convertTimestampLtzValue(precision, prop,
prop.read(obj));
- }
- default:
- throw new UnsupportedOperationException(
- String.format(
- "Unsupported field type %s for field %s.",
- fieldType.getTypeRoot(), prop.name));
- }
- }
+import java.util.Map;
+/** Shared utilities for converting POJO values to Fluss internal types. */
+public class PojoTypeToFlussTypeConverter {
/**
* Converts a text value (String or Character) from a POJO property to
Fluss BinaryString.
*
* <p>For CHAR columns, enforces that the text has exactly one character.
Nulls are passed
* through.
*/
- private static @Nullable BinaryString convertTextValue(
- DataType fieldType, PojoType.Property prop, @Nullable Object v) {
+ static @Nullable BinaryString convertTextValue(
+ DataType fieldType, String fieldName, @Nullable Object v) {
if (v == null) {
return null;
}
- return ConverterCommons.toBinaryStringForText(v, prop.name,
fieldType.getTypeRoot());
+ return ConverterCommons.toBinaryStringForText(v, fieldName,
fieldType.getTypeRoot());
}
/** Converts a BigDecimal POJO property to Fluss Decimal respecting
precision and scale. */
- private static @Nullable Decimal convertDecimalValue(
- DecimalType decimalType, PojoType.Property prop, @Nullable Object
v) {
+ static @Nullable Decimal convertDecimalValue(
+ DecimalType decimalType, String fieldName, @Nullable Object v) {
if (v == null) {
return null;
}
if (!(v instanceof BigDecimal)) {
throw new IllegalArgumentException(
String.format(
- "Field %s is not a BigDecimal. Cannot convert to
Decimal.", prop.name));
+ "Field %s is not a BigDecimal. Cannot convert to
Decimal.", fieldName));
}
final int precision = decimalType.getPrecision();
final int scale = decimalType.getScale();
@@ -192,27 +77,27 @@ public final class PojoToRowConverter<T> {
throw new IllegalArgumentException(
String.format(
"Decimal value for field %s exceeds precision %d
after scaling to %d: %s",
- prop.name, precision, scale, scaled));
+ fieldName, precision, scale, scaled));
}
return Decimal.fromBigDecimal(scaled, precision, scale);
}
/** Converts a LocalDate POJO property to number of days since epoch. */
- private static @Nullable Integer convertDateValue(PojoType.Property prop,
@Nullable Object v) {
+ static @Nullable Integer convertDateValue(String fieldName, @Nullable
Object v) {
if (v == null) {
return null;
}
if (!(v instanceof LocalDate)) {
throw new IllegalArgumentException(
String.format(
- "Field %s is not a LocalDate. Cannot convert to
int days.", prop.name));
+ "Field %s is not a LocalDate. Cannot convert to
int days.", fieldName));
}
return (int) ((LocalDate) v).toEpochDay();
}
/** Converts a LocalTime POJO property to milliseconds of day. */
- private static @Nullable Integer convertTimeValue(PojoType.Property prop,
@Nullable Object v) {
+ static @Nullable Integer convertTimeValue(String fieldName, @Nullable
Object v) {
if (v == null) {
return null;
}
@@ -220,7 +105,7 @@ public final class PojoToRowConverter<T> {
throw new IllegalArgumentException(
String.format(
"Field %s is not a LocalTime. Cannot convert to
int millis.",
- prop.name));
+ fieldName));
}
LocalTime t = (LocalTime) v;
return (int) (t.toNanoOfDay() / 1_000_000);
@@ -231,12 +116,12 @@ public final class PojoToRowConverter<T> {
* precision.
*
* @param precision the timestamp precision (0-9)
- * @param prop the POJO property metadata
+ * @param fieldName the field name
* @param v the value to convert
* @return TimestampNtz with precision applied, or null if v is null
*/
- private static @Nullable TimestampNtz convertTimestampNtzValue(
- int precision, PojoType.Property prop, @Nullable Object v) {
+ static @Nullable TimestampNtz convertTimestampNtzValue(
+ int precision, String fieldName, @Nullable Object v) {
if (v == null) {
return null;
}
@@ -244,7 +129,7 @@ public final class PojoToRowConverter<T> {
throw new IllegalArgumentException(
String.format(
"Field %s is not a LocalDateTime. Cannot convert
to TimestampNtz.",
- prop.name));
+ fieldName));
}
LocalDateTime ldt = (LocalDateTime) v;
LocalDateTime truncated = truncateToTimestampPrecision(ldt, precision);
@@ -256,12 +141,12 @@ public final class PojoToRowConverter<T> {
* respecting the specified precision.
*
* @param precision the timestamp precision (0-9)
- * @param prop the POJO property metadata
+ * @param fieldName the field name
* @param v the value to convert
* @return TimestampLtz with precision applied, or null if v is null
*/
- private static @Nullable TimestampLtz convertTimestampLtzValue(
- int precision, PojoType.Property prop, @Nullable Object v) {
+ static @Nullable TimestampLtz convertTimestampLtzValue(
+ int precision, String fieldName, @Nullable Object v) {
if (v == null) {
return null;
}
@@ -274,7 +159,7 @@ public final class PojoToRowConverter<T> {
throw new IllegalArgumentException(
String.format(
"Field %s is not an Instant or OffsetDateTime.
Cannot convert to TimestampLtz.",
- prop.name));
+ fieldName));
}
Instant truncated = truncateToTimestampPrecision(instant, precision);
return TimestampLtz.fromInstant(truncated);
@@ -324,7 +209,62 @@ public final class PojoToRowConverter<T> {
return (nanos / divisor) * divisor;
}
- private interface FieldToRow {
- Object readAndConvert(Object pojo) throws Exception;
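+ /**
+ * Converts a POJO value to the Fluss internal type selected by {@code elementType}.
+ *
+ * @param obj the POJO value to convert; {@code null} is returned as {@code null}
+ * @param elementType the Fluss data type whose type root selects the conversion
+ * @param fieldName the field name, reported in error messages
+ */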
+ static @Nullable Object convertElementValue(
+ @Nullable Object obj, DataType elementType, String fieldName) {
+ if (obj == null) {
+ return null;
+ }
+
+ switch (elementType.getTypeRoot()) {
+ case BOOLEAN:
+ case TINYINT:
+ case SMALLINT:
+ case INTEGER:
+ case BIGINT:
+ case FLOAT:
+ case DOUBLE:
+ case BINARY:
+ case BYTES:
+ return obj;
+ case CHAR:
+ case STRING:
+ return
PojoTypeToFlussTypeConverter.convertTextValue(elementType, fieldName, obj);
+ case DECIMAL:
+ return PojoTypeToFlussTypeConverter.convertDecimalValue(
+ (DecimalType) elementType, fieldName, obj);
+ case DATE:
+ return
PojoTypeToFlussTypeConverter.convertDateValue(fieldName, obj);
+ case TIME_WITHOUT_TIME_ZONE:
+ return
PojoTypeToFlussTypeConverter.convertTimeValue(fieldName, obj);
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ {
+ final int precision =
DataTypeChecks.getPrecision(elementType);
+ return
PojoTypeToFlussTypeConverter.convertTimestampNtzValue(
+ precision, fieldName, obj);
+ }
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ {
+ final int precision =
DataTypeChecks.getPrecision(elementType);
+ return
PojoTypeToFlussTypeConverter.convertTimestampLtzValue(
+ precision, fieldName, obj);
+ }
+ case ARRAY:
+ return new PojoArrayToFlussArray(obj, elementType,
fieldName).convertArray();
+ case MAP:
+ return new PojoMapToFlussMap((Map<?, ?>) obj, (MapType)
elementType, fieldName)
+ .convertMap();
+ default:
+ throw new UnsupportedOperationException(
+ String.format(
+ "Unsupported field type %s for field %s.",
+ elementType.getTypeRoot(), fieldName));
+ }
}
}
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/converter/RowToPojoConverter.java
b/fluss-client/src/main/java/org/apache/fluss/client/converter/RowToPojoConverter.java
index a396285f5..aa7b0d315 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/converter/RowToPojoConverter.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/converter/RowToPojoConverter.java
@@ -17,26 +17,19 @@
package org.apache.fluss.client.converter;
-import org.apache.fluss.row.BinaryString;
-import org.apache.fluss.row.Decimal;
+import org.apache.fluss.row.InternalArray;
import org.apache.fluss.row.InternalRow;
-import org.apache.fluss.row.TimestampLtz;
-import org.apache.fluss.row.TimestampNtz;
+import org.apache.fluss.types.ArrayType;
import org.apache.fluss.types.DataType;
import org.apache.fluss.types.DataTypeChecks;
-import org.apache.fluss.types.DataTypeRoot;
import org.apache.fluss.types.DecimalType;
+import org.apache.fluss.types.MapType;
import org.apache.fluss.types.RowType;
import javax.annotation.Nullable;
import java.lang.reflect.InvocationTargetException;
-import java.math.BigDecimal;
-import java.time.Instant;
-import java.time.LocalDate;
-import java.time.LocalTime;
-import java.time.OffsetDateTime;
-import java.time.ZoneOffset;
+import java.util.Collection;
import java.util.List;
/**
@@ -50,6 +43,7 @@ public final class RowToPojoConverter<T> {
private final RowType projection;
private final List<String> projectionFieldNames;
private final RowToField[] rowReaders;
+ private final PojoType.Property[] rowProps;
private RowToPojoConverter(PojoType<T> pojoType, RowType tableSchema,
RowType projection) {
this.pojoType = pojoType;
@@ -58,7 +52,10 @@ public final class RowToPojoConverter<T> {
this.projectionFieldNames = projection.getFieldNames();
ConverterCommons.validatePojoMatchesTable(pojoType, tableSchema);
ConverterCommons.validateProjectionSubset(projection, tableSchema);
- this.rowReaders = createRowReaders();
+ int fieldCount = projection.getFieldCount();
+ this.rowReaders = new RowToField[fieldCount];
+ this.rowProps = new PojoType.Property[fieldCount];
+ createRowReaders();
}
public static <T> RowToPojoConverter<T> of(
@@ -75,9 +72,8 @@ public final class RowToPojoConverter<T> {
for (int i = 0; i < rowReaders.length; i++) {
if (!row.isNullAt(i)) {
Object v = rowReaders[i].convert(row, i);
- PojoType.Property prop =
pojoType.getProperty(projectionFieldNames.get(i));
if (v != null) {
- prop.write(pojo, v);
+ rowProps[i].write(pojo, v);
}
}
}
@@ -96,16 +92,15 @@ public final class RowToPojoConverter<T> {
}
}
- private RowToField[] createRowReaders() {
- RowToField[] arr = new RowToField[projection.getFieldCount()];
- for (int i = 0; i < projection.getFieldCount(); i++) {
+ private void createRowReaders() {
+ for (int i = 0; i < rowReaders.length; i++) {
String name = projectionFieldNames.get(i);
DataType type = projection.getTypeAt(i);
PojoType.Property prop = requireProperty(name);
ConverterCommons.validateCompatibility(type, prop);
- arr[i] = createRowReader(type, prop);
+ rowReaders[i] = createRowReader(type, prop);
+ rowProps[i] = prop;
}
- return arr;
}
private PojoType.Property requireProperty(String fieldName) {
@@ -139,26 +134,68 @@ public final class RowToPojoConverter<T> {
return InternalRow::getDouble;
case CHAR:
case STRING:
- return (row, pos) -> convertTextValue(fieldType, prop,
row.getString(pos));
+ return (row, pos) ->
+ FlussTypeToPojoTypeConverter.convertTextValue(
+ fieldType, prop.name, prop.type,
row.getString(pos));
case BINARY:
case BYTES:
return InternalRow::getBytes;
case DECIMAL:
- return (row, pos) -> convertDecimalValue((DecimalType)
fieldType, row, pos);
+ DecimalType decimalType = (DecimalType) fieldType;
+ return (row, pos) ->
+ FlussTypeToPojoTypeConverter.convertDecimalValue(
+ row.getDecimal(
+ pos, decimalType.getPrecision(),
decimalType.getScale()));
case DATE:
- return RowToPojoConverter::convertDateValue;
+ return (row, pos) ->
FlussTypeToPojoTypeConverter.convertDateValue(row.getInt(pos));
case TIME_WITHOUT_TIME_ZONE:
- return RowToPojoConverter::convertTimeValue;
+ return (row, pos) ->
FlussTypeToPojoTypeConverter.convertTimeValue(row.getInt(pos));
case TIMESTAMP_WITHOUT_TIME_ZONE:
{
final int precision =
DataTypeChecks.getPrecision(fieldType);
- return (row, pos) -> convertTimestampNtzValue(precision,
row, pos);
+ return (row, pos) ->
+
FlussTypeToPojoTypeConverter.convertTimestampNtzValue(
+ row.getTimestampNtz(pos, precision));
}
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
{
final int precision =
DataTypeChecks.getPrecision(fieldType);
- return (row, pos) -> convertTimestampLtzValue(precision,
prop, row, pos);
+ return (row, pos) ->
+
FlussTypeToPojoTypeConverter.convertTimestampLtzValue(
+ row.getTimestampLtz(pos, precision),
prop.name, prop.type);
}
+ case ARRAY:
+ ArrayType arrayType = (ArrayType) fieldType;
+ if (Collection.class.isAssignableFrom(prop.type)) {
+ // POJO field is a List / Collection — deserialize as
ArrayList<Object>
+ return (row, pos) -> {
+ InternalArray array = row.getArray(pos);
+ return array == null
+ ? null
+ : new FlussArrayToPojoArray(
+ array,
+ arrayType.getElementType(),
+ prop.name,
+ Object.class)
+ .convertList();
+ };
+ }
+ final Class<?> componentType = prop.type.getComponentType();
+ return (row, pos) -> {
+ InternalArray array = row.getArray(pos);
+ return array == null
+ ? null
+ : new FlussArrayToPojoArray(
+ array,
+ arrayType.getElementType(),
+ prop.name,
+ componentType)
+ .convertArray();
+ };
+ case MAP:
+ return (row, pos) ->
+ new FlussMapToPojoMap(row.getMap(pos), (MapType)
fieldType, prop.name)
+ .convertMap();
default:
throw new UnsupportedOperationException(
String.format(
@@ -167,98 +204,6 @@ public final class RowToPojoConverter<T> {
}
}
- /**
- * Converts a text value (CHAR/STRING) read from an InternalRow into the
target Java type
- * declared by the POJO property.
- *
- * <p>Supported target types are String and Character. For CHAR columns,
this enforces that the
- * value has exactly one character. For Character targets, empty strings
are rejected.
- *
- * @param fieldType Fluss column DataType (must be CHAR or STRING)
- * @param prop The target POJO property (used for type and error messages)
- * @param s The BinaryString read from the row
- * @return Converted Java value (String or Character)
- * @throws IllegalArgumentException if the target type is unsupported or
constraints are
- * violated
- */
- private static Object convertTextValue(
- DataType fieldType, PojoType.Property prop, BinaryString s) {
- String v = s.toString();
- if (prop.type == String.class) {
- if (fieldType.getTypeRoot() == DataTypeRoot.CHAR && v.length() !=
1) {
- throw new IllegalArgumentException(
- charLengthExceptionMessage(prop.name, v.length()));
- }
- return v;
- } else if (prop.type == Character.class) {
- if (v.isEmpty()) {
- throw new IllegalArgumentException(
- String.format(
- "Field %s expects Character, but the string
value is empty.",
- prop.name));
- }
- if (fieldType.getTypeRoot() == DataTypeRoot.CHAR && v.length() !=
1) {
- throw new IllegalArgumentException(
- charLengthExceptionMessage(prop.name, v.length()));
- }
- return v.charAt(0);
- }
- throw new IllegalArgumentException(
- String.format(
- "Field %s is not a String or Character. Cannot convert
from string.",
- prop.name));
- }
-
- public static String charLengthExceptionMessage(String fieldName, int
length) {
- return String.format(
- "Field %s expects exactly one character for CHAR type, got
length %d.",
- fieldName, length);
- }
-
- /**
- * Converts a DECIMAL value from an InternalRow into a BigDecimal using
the column's precision
- * and scale. The row position is assumed non-null (caller checks), so
this never returns null.
- */
- private static BigDecimal convertDecimalValue(
- DecimalType decimalType, InternalRow row, int pos) {
- Decimal d = row.getDecimal(pos, decimalType.getPrecision(),
decimalType.getScale());
- return d.toBigDecimal();
- }
-
- /** Converts a DATE value stored as int days since epoch to a LocalDate. */
- private static LocalDate convertDateValue(InternalRow row, int pos) {
- return LocalDate.ofEpochDay(row.getInt(pos));
- }
-
- /** Converts a TIME_WITHOUT_TIME_ZONE value stored as int millis of day to
a LocalTime. */
- private static LocalTime convertTimeValue(InternalRow row, int pos) {
- return LocalTime.ofNanoOfDay(row.getInt(pos) * 1_000_000L);
- }
-
- /** Converts a TIMESTAMP_WITHOUT_TIME_ZONE value to a LocalDateTime
honoring precision. */
- private static Object convertTimestampNtzValue(int precision, InternalRow
row, int pos) {
- TimestampNtz t = row.getTimestampNtz(pos, precision);
- return t.toLocalDateTime();
- }
-
- /**
- * Converts a TIMESTAMP_WITH_LOCAL_TIME_ZONE value to either Instant or
OffsetDateTime in UTC,
- * depending on the target POJO property type.
- */
- private static Object convertTimestampLtzValue(
- int precision, PojoType.Property prop, InternalRow row, int pos) {
- TimestampLtz t = row.getTimestampLtz(pos, precision);
- if (prop.type == Instant.class) {
- return t.toInstant();
- } else if (prop.type == OffsetDateTime.class) {
- return OffsetDateTime.ofInstant(t.toInstant(), ZoneOffset.UTC);
- }
- throw new IllegalArgumentException(
- String.format(
- "Field %s is not an Instant or OffsetDateTime. Cannot
convert from TimestampData.",
- prop.name));
- }
-
private interface RowToField {
Object convert(InternalRow row, int pos) throws Exception;
}
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/converter/ConvertersTestFixtures.java
b/fluss-client/src/test/java/org/apache/fluss/client/converter/ConvertersTestFixtures.java
index 8863b9f23..8fecb7b38 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/converter/ConvertersTestFixtures.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/converter/ConvertersTestFixtures.java
@@ -28,6 +28,7 @@ import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
@@ -53,10 +54,13 @@ public final class ConvertersTestFixtures {
.field("timestampField", DataTypes.TIMESTAMP())
.field("timestampLtzField", DataTypes.TIMESTAMP_LTZ())
.field("offsetDateTimeField", DataTypes.TIMESTAMP_LTZ())
+ .field("arrayField", DataTypes.ARRAY(DataTypes.INT()))
+ .field("mapField", DataTypes.MAP(DataTypes.STRING(),
DataTypes.INT()))
.build();
}
// ----------------------- Helper POJOs -----------------------
+
/** Test POJO used for end-to-end converter tests. */
public static class TestPojo {
public Boolean booleanField;
@@ -74,6 +78,8 @@ public final class ConvertersTestFixtures {
public LocalDateTime timestampField;
public Instant timestampLtzField;
public OffsetDateTime offsetDateTimeField;
+ public Integer[] arrayField;
+ public Map<String, Integer> mapField;
public TestPojo() {}
@@ -92,7 +98,9 @@ public final class ConvertersTestFixtures {
LocalTime timeField,
LocalDateTime timestampField,
Instant timestampLtzField,
- OffsetDateTime offsetDateTimeField) {
+ OffsetDateTime offsetDateTimeField,
+ Integer[] arrayField,
+ Map<String, Integer> mapField) {
this.booleanField = booleanField;
this.byteField = byteField;
this.shortField = shortField;
@@ -108,6 +116,8 @@ public final class ConvertersTestFixtures {
this.timestampField = timestampField;
this.timestampLtzField = timestampLtzField;
this.offsetDateTimeField = offsetDateTimeField;
+ this.arrayField = arrayField;
+ this.mapField = mapField;
}
public static TestPojo sample() {
@@ -126,7 +136,14 @@ public final class ConvertersTestFixtures {
LocalTime.of(15, 1, 30),
LocalDateTime.of(2025, 7, 23, 15, 1, 30),
Instant.parse("2025-07-23T15:01:30Z"),
- OffsetDateTime.of(2025, 7, 23, 15, 1, 30, 0,
ZoneOffset.UTC));
+ OffsetDateTime.of(2025, 7, 23, 15, 1, 30, 0,
ZoneOffset.UTC),
+ new Integer[] {1, 2},
+ new HashMap<String, Integer>() {
+ {
+ put("test_1", 1);
+ put("test_2", 2);
+ }
+ });
}
@Override
@@ -152,7 +169,9 @@ public final class ConvertersTestFixtures {
&& Objects.equals(timeField, testPojo.timeField)
&& Objects.equals(timestampField, testPojo.timestampField)
&& Objects.equals(timestampLtzField,
testPojo.timestampLtzField)
- && Objects.equals(offsetDateTimeField,
testPojo.offsetDateTimeField);
+ && Objects.equals(offsetDateTimeField,
testPojo.offsetDateTimeField)
+ && Arrays.equals(arrayField, testPojo.arrayField)
+ && Objects.equals(mapField, testPojo.mapField);
}
@Override
@@ -172,8 +191,10 @@ public final class ConvertersTestFixtures {
timeField,
timestampField,
timestampLtzField,
- offsetDateTimeField);
+ offsetDateTimeField,
+ mapField);
result = 31 * result + Arrays.hashCode(bytesField);
+ result = 31 * result + Arrays.hashCode(arrayField);
return result;
}
}
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/converter/FlussArrayToPojoArrayTest.java
b/fluss-client/src/test/java/org/apache/fluss/client/converter/FlussArrayToPojoArrayTest.java
new file mode 100644
index 000000000..c241a966f
--- /dev/null
+++
b/fluss-client/src/test/java/org/apache/fluss/client/converter/FlussArrayToPojoArrayTest.java
@@ -0,0 +1,431 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.converter;
+
+import org.apache.fluss.row.GenericRow;
+import org.apache.fluss.types.DataTypes;
+import org.apache.fluss.types.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link FlussArrayToPojoArray}. */
+public class FlussArrayToPojoArrayTest {
+    /**
+     * Round-trips {@link ArrayPojo#sample()} through the write and read converters (POJO -> row ->
+     * POJO) and checks that every array field comes back with the same length and contents.
+     */
+    @Test
+    public void testArrayWithAllTypes() {
+        RowType schema =
+                RowType.builder()
+                        .field("booleanArray", DataTypes.ARRAY(DataTypes.BOOLEAN()))
+                        .field("byteArray", DataTypes.ARRAY(DataTypes.TINYINT()))
+                        .field("shortArray", DataTypes.ARRAY(DataTypes.SMALLINT()))
+                        .field("intArray", DataTypes.ARRAY(DataTypes.INT()))
+                        .field("longArray", DataTypes.ARRAY(DataTypes.BIGINT()))
+                        .field("floatArray", DataTypes.ARRAY(DataTypes.FLOAT()))
+                        .field("doubleArray", DataTypes.ARRAY(DataTypes.DOUBLE()))
+                        .field("stringArray", DataTypes.ARRAY(DataTypes.STRING()))
+                        .field("decimalArray", DataTypes.ARRAY(DataTypes.DECIMAL(10, 2)))
+                        .field("dateArray", DataTypes.ARRAY(DataTypes.DATE()))
+                        .field("timeArray", DataTypes.ARRAY(DataTypes.TIME()))
+                        .field("timestampArray", DataTypes.ARRAY(DataTypes.TIMESTAMP(3)))
+                        .field("timestampLtzArray", DataTypes.ARRAY(DataTypes.TIMESTAMP_LTZ(3)))
+                        .field("nestedIntArray", DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.INT())))
+                        .field(
+                                "mapArray",
+                                DataTypes.ARRAY(DataTypes.MAP(DataTypes.STRING(), DataTypes.INT())))
+                        .build();
+
+        PojoToRowConverter<ArrayPojo> toRow =
+                PojoToRowConverter.of(ArrayPojo.class, schema, schema);
+        RowToPojoConverter<ArrayPojo> toPojo =
+                RowToPojoConverter.of(ArrayPojo.class, schema, schema);
+
+        // POJO -> Row -> POJO
+        GenericRow encoded = toRow.toRow(ArrayPojo.sample());
+        ArrayPojo actual = toPojo.fromRow(encoded);
+
+        // Boolean elements
+        assertThat(actual.booleanArray).hasSize(2).isEqualTo(new Boolean[] {true, false});
+
+        // Byte elements
+        assertThat(actual.byteArray).hasSize(2).isEqualTo(new Byte[] {1, 2});
+
+        // Short elements
+        assertThat(actual.shortArray).hasSize(2).isEqualTo(new Short[] {100, 200});
+
+        // Integer elements
+        assertThat(actual.intArray).hasSize(2).isEqualTo(new Integer[] {1000, 2000});
+
+        // Long elements
+        assertThat(actual.longArray).hasSize(2).isEqualTo(new Long[] {10000L, 20000L});
+
+        // Float elements
+        assertThat(actual.floatArray).hasSize(2).isEqualTo(new Float[] {1.1f, 2.2f});
+
+        // Double elements
+        assertThat(actual.doubleArray).hasSize(2).isEqualTo(new Double[] {1.11, 2.22});
+
+        // String elements
+        assertThat(actual.stringArray).hasSize(2).isEqualTo(new String[] {"hello", "world"});
+
+        // BigDecimal elements
+        assertThat(actual.decimalArray)
+                .hasSize(2)
+                .isEqualTo(new BigDecimal[] {new BigDecimal("123.45"), new BigDecimal("678.90")});
+
+        // LocalDate elements
+        assertThat(actual.dateArray)
+                .hasSize(2)
+                .isEqualTo(new LocalDate[] {LocalDate.of(2025, 1, 1), LocalDate.of(2025, 12, 31)});
+
+        // LocalTime elements
+        assertThat(actual.timeArray)
+                .hasSize(2)
+                .isEqualTo(new LocalTime[] {LocalTime.MIDNIGHT, LocalTime.of(12, 30, 0)});
+
+        // LocalDateTime elements (TIMESTAMP)
+        assertThat(actual.timestampArray)
+                .hasSize(2)
+                .isEqualTo(
+                        new LocalDateTime[] {
+                            LocalDateTime.of(2025, 7, 23, 15, 0, 0),
+                            LocalDateTime.of(2025, 12, 31, 23, 59, 59)
+                        });
+
+        // Instant elements (TIMESTAMP_LTZ)
+        assertThat(actual.timestampLtzArray)
+                .hasSize(2)
+                .isEqualTo(
+                        new Instant[] {
+                            Instant.parse("2025-01-01T00:00:00Z"),
+                            Instant.parse("2025-12-31T23:59:59Z")
+                        });
+
+        // Nested array (array<array<int>>)
+        Integer[][] expectedNested = {{1, 2}, {3, 4, 5}};
+        assertThat(actual.nestedIntArray.length).isEqualTo(2);
+        assertThat(actual.nestedIntArray).isEqualTo(expectedNested);
+
+        // Array of maps (array<map<string, int>>)
+        Map<String, Integer> expectedFirstMap = new HashMap<>();
+        expectedFirstMap.put("test_1", 1);
+        expectedFirstMap.put("test_2", 2);
+        Map<String, Integer> expectedSecondMap = new HashMap<>();
+        expectedSecondMap.put("test_3", 3);
+        expectedSecondMap.put("test_4", 4);
+        assertThat(actual.mapArray)
+                .hasSize(2)
+                .isEqualTo(new Map<?, ?>[] {expectedFirstMap, expectedSecondMap});
+    }
+
+    /** Null elements inside String and Integer arrays must survive a POJO -> row -> POJO trip. */
+    @Test
+    public void testArrayWithNullElements() {
+        RowType schema =
+                RowType.builder()
+                        .field("stringArray", DataTypes.ARRAY(DataTypes.STRING()))
+                        .field("intObjectArray", DataTypes.ARRAY(DataTypes.INT()))
+                        .build();
+
+        PojoToRowConverter<NullableArrayPojo> toRow =
+                PojoToRowConverter.of(NullableArrayPojo.class, schema, schema);
+        RowToPojoConverter<NullableArrayPojo> toPojo =
+                RowToPojoConverter.of(NullableArrayPojo.class, schema, schema);
+
+        NullableArrayPojo input = new NullableArrayPojo();
+        input.stringArray = new String[] {"hello", null, "world"};
+        input.intObjectArray = new Integer[] {1, null, 3};
+
+        // POJO -> Row -> POJO
+        GenericRow encoded = toRow.toRow(input);
+        NullableArrayPojo actual = toPojo.fromRow(encoded);
+
+        // The null slot must stay at index 1 in both arrays.
+        assertThat(actual.stringArray).hasSize(3).isEqualTo(new String[] {"hello", null, "world"});
+        assertThat(actual.intObjectArray).hasSize(3).isEqualTo(new Integer[] {1, null, 3});
+    }
+
+    /** A null array field must still be null after a POJO -> row -> POJO round trip. */
+    @Test
+    public void testNullArrayField() {
+        RowType table =
+                RowType.builder().field("intArray", DataTypes.ARRAY(DataTypes.INT())).build();
+
+        PojoToRowConverter<SimpleArrayPojo> writer =
+                PojoToRowConverter.of(SimpleArrayPojo.class, table, table);
+        RowToPojoConverter<SimpleArrayPojo> reader =
+                RowToPojoConverter.of(SimpleArrayPojo.class, table, table);
+
+        SimpleArrayPojo pojo = new SimpleArrayPojo();
+        pojo.intArray = null;
+
+        // POJO -> Row -> POJO
+        GenericRow row = writer.toRow(pojo);
+        SimpleArrayPojo back = reader.fromRow(row);
+        // isNull() states the intent directly and avoids passing a bare null to isEqualTo(Object).
+        assertThat(back.intArray).isNull();
+    }
+
+    /**
+     * POJO fields declared as {@code List<T>} are written from lists and must come back from the
+     * read path as lists holding the same elements in the same order, including null elements.
+     */
+    @Test
+    public void testListFieldsOnReadPath() {
+        RowType schema =
+                RowType.builder()
+                        .field("intList", DataTypes.ARRAY(DataTypes.INT()))
+                        .field("strList", DataTypes.ARRAY(DataTypes.STRING()))
+                        .field("nullableList", DataTypes.ARRAY(DataTypes.INT()))
+                        .build();
+
+        PojoToRowConverter<ListFieldPojo> toRow =
+                PojoToRowConverter.of(ListFieldPojo.class, schema, schema);
+        RowToPojoConverter<ListFieldPojo> toPojo =
+                RowToPojoConverter.of(ListFieldPojo.class, schema, schema);
+
+        ListFieldPojo input = new ListFieldPojo();
+        input.intList = Arrays.asList(1, 2, 3);
+        input.strList = Arrays.asList("x", "y");
+        input.nullableList = Arrays.asList(10, null, 30);
+
+        GenericRow encoded = toRow.toRow(input);
+        ListFieldPojo actual = toPojo.fromRow(encoded);
+
+        assertThat(actual.intList).isInstanceOf(List.class).containsExactly(1, 2, 3);
+        assertThat(actual.strList).isInstanceOf(List.class).containsExactly("x", "y");
+        assertThat(actual.nullableList).isInstanceOf(List.class).containsExactly(10, null, 30);
+    }
+
+ /**
+ * POJO with List fields for read-path List deserialization tests.
+ *
+ * <p>The three field names match the schema fields declared in
+ * {@code testListFieldsOnReadPath}.
+ */
+ public static class ListFieldPojo {
+ // Same name as the ARRAY<INT> schema field "intList".
+ public List<Integer> intList;
+ // Same name as the ARRAY<STRING> schema field "strList".
+ public List<String> strList;
+ // Same name as the ARRAY<INT> schema field "nullableList"; the test stores a null element in it.
+ public List<Integer> nullableList;
+
+ public ListFieldPojo() {}
+ }
+
+    /**
+     * Verifies that typed POJO array fields ({@code String[]}, {@code Character[]}, {@code
+     * Integer[]}, {@code OffsetDateTime[]}) are handled correctly on the read path, not just
+     * {@code Object[]}.
+     */
+    @Test
+    public void testTypedArrayFieldsOnReadPath() {
+        RowType table =
+                RowType.builder()
+                        .field("stringArray", DataTypes.ARRAY(DataTypes.STRING()))
+                        .field("charArray", DataTypes.ARRAY(DataTypes.STRING()))
+                        .field("intArray", DataTypes.ARRAY(DataTypes.INT()))
+                        .field("timestampLtzArray", DataTypes.ARRAY(DataTypes.TIMESTAMP_LTZ(3)))
+                        .build();
+
+        PojoToRowConverter<TypedArrayPojo> writer =
+                PojoToRowConverter.of(TypedArrayPojo.class, table, table);
+        RowToPojoConverter<TypedArrayPojo> reader =
+                RowToPojoConverter.of(TypedArrayPojo.class, table, table);
+
+        TypedArrayPojo pojo = new TypedArrayPojo();
+        pojo.stringArray = new String[] {"hello", "world"};
+        pojo.charArray = new Character[] {'A', 'B'};
+        pojo.intArray = new Integer[] {10, 20};
+        pojo.timestampLtzArray =
+                new OffsetDateTime[] {
+                    OffsetDateTime.of(2025, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC),
+                    OffsetDateTime.of(2025, 6, 15, 12, 0, 0, 0, ZoneOffset.UTC)
+                };
+
+        GenericRow row = writer.toRow(pojo);
+        TypedArrayPojo back = reader.fromRow(row);
+
+        // The fields are already declared with their element types, so no casts are needed.
+        assertThat(back.stringArray).isEqualTo(new String[] {"hello", "world"});
+        assertThat(back.charArray).isEqualTo(new Character[] {'A', 'B'});
+        assertThat(back.intArray).isEqualTo(new Integer[] {10, 20});
+
+        // Check the length before indexing: a short result then fails with an assertion error
+        // instead of ArrayIndexOutOfBoundsException, and unexpected extra elements are caught.
+        assertThat(back.timestampLtzArray).hasSize(2);
+        assertThat(back.timestampLtzArray[0])
+                .isEqualTo(OffsetDateTime.of(2025, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC));
+        assertThat(back.timestampLtzArray[1])
+                .isEqualTo(OffsetDateTime.of(2025, 6, 15, 12, 0, 0, 0, ZoneOffset.UTC));
+    }
+
+    /**
+     * A Fluss ARRAY&lt;INT&gt; containing a null element cannot be read into an {@code int[]} POJO
+     * field. Reading it must fail with a NullPointerException whose message mentions both
+     * "primitive array" and the field name, rather than a cryptic one.
+     */
+    @Test
+    public void testNullElementInPrimitiveArrayThrows() {
+        RowType schema =
+                RowType.builder().field("intObjectArray", DataTypes.ARRAY(DataTypes.INT())).build();
+
+        // The row is written from the boxed-array POJO so that it can carry a null element.
+        PojoToRowConverter<NullableArrayPojo> boxedWriter =
+                PojoToRowConverter.of(NullableArrayPojo.class, schema, schema);
+        RowToPojoConverter<PrimitiveIntArrayPojo> primitiveReader =
+                RowToPojoConverter.of(PrimitiveIntArrayPojo.class, schema, schema);
+
+        NullableArrayPojo source = new NullableArrayPojo();
+        source.stringArray = null; // unused — only intObjectArray is in projection
+        source.intObjectArray = new Integer[] {1, null, 3};
+
+        GenericRow rowWithNullElement = boxedWriter.toRow(source);
+        assertThatThrownBy(() -> primitiveReader.fromRow(rowWithNullElement))
+                .isInstanceOf(NullPointerException.class)
+                .hasMessageContaining("primitive array")
+                .hasMessageContaining("intObjectArray");
+    }
+
+ /**
+ * POJO with typed (non-Object[]) array fields for read-path type-fidelity tests.
+ *
+ * <p>Used by {@code testTypedArrayFieldsOnReadPath}, whose schema declares a field with the
+ * same name for each member below.
+ */
+ public static class TypedArrayPojo {
+ // Paired with an ARRAY<STRING> schema field.
+ public String[] stringArray;
+ // Paired with an ARRAY<STRING> schema field, although the element type here is Character.
+ public Character[] charArray;
+ // Paired with an ARRAY<INT> schema field.
+ public Integer[] intArray;
+ // Paired with an ARRAY<TIMESTAMP_LTZ(3)> schema field.
+ public OffsetDateTime[] timestampLtzArray;
+
+ public TypedArrayPojo() {}
+ }
+
+ /**
+ * POJO with primitive int[] for null-element guard test.
+ *
+ * <p>Used only as the read-side type in {@code testNullElementInPrimitiveArrayThrows}.
+ */
+ public static class PrimitiveIntArrayPojo {
+ // Same name as the ARRAY<INT> schema field in that test; an int[] cannot hold a null element.
+ public int[] intObjectArray;
+
+ public PrimitiveIntArrayPojo() {}
+ }
+
+ /**
+ * POJO for testing all array types.
+ *
+ * <p>Every field is declared with a generic element type ({@code Object[]}, {@code Object[][]}
+ * or {@code Map<Object, Object>[]}), while {@link #sample()} fills them with concretely typed
+ * arrays. The field names match the schema fields declared in {@code testArrayWithAllTypes}.
+ */
+ // The unchecked warning comes from assigning a raw HashMap[] to mapArray in sample().
+ @SuppressWarnings("unchecked")
+ public static class ArrayPojo {
+ public Object[] booleanArray;
+ public Object[] byteArray;
+ public Object[] shortArray;
+ public Object[] intArray;
+ public Object[] longArray;
+ public Object[] floatArray;
+ public Object[] doubleArray;
+ public Object[] stringArray;
+ public Object[] decimalArray;
+ public Object[] dateArray;
+ public Object[] timeArray;
+ public Object[] timestampArray;
+ public Object[] timestampLtzArray;
+ public Object[][] nestedIntArray;
+ public Map<Object, Object>[] mapArray;
+
+ public ArrayPojo() {}
+
+ /**
+ * Builds a fully populated instance. Every array holds two elements; the second row of
+ * nestedIntArray has three values.
+ */
+ public static ArrayPojo sample() {
+ ArrayPojo pojo = new ArrayPojo();
+ pojo.booleanArray = new Boolean[] {true, false};
+ pojo.byteArray = new Byte[] {1, 2};
+ pojo.shortArray = new Short[] {100, 200};
+ pojo.intArray = new Integer[] {1000, 2000};
+ pojo.longArray = new Long[] {10000L, 20000L};
+ pojo.floatArray = new Float[] {1.1f, 2.2f};
+ pojo.doubleArray = new Double[] {1.11, 2.22};
+ pojo.stringArray = new String[] {"hello", "world"};
+ pojo.decimalArray =
+ new BigDecimal[] {new BigDecimal("123.45"), new
BigDecimal("678.90")};
+ pojo.dateArray = new LocalDate[] {LocalDate.of(2025, 1, 1),
LocalDate.of(2025, 12, 31)};
+ pojo.timeArray = new LocalTime[] {LocalTime.MIDNIGHT,
LocalTime.of(12, 30, 0)};
+ pojo.timestampArray =
+ new LocalDateTime[] {
+ LocalDateTime.of(2025, 7, 23, 15, 0, 0),
+ LocalDateTime.of(2025, 12, 31, 23, 59, 59)
+ };
+ pojo.timestampLtzArray =
+ new Instant[] {
+ Instant.parse("2025-01-01T00:00:00Z"),
Instant.parse("2025-12-31T23:59:59Z")
+ };
+ pojo.nestedIntArray =
+ new Integer[][] {
+ {1, 2},
+ {3, 4, 5}
+ };
+ // Generic array creation is not allowed, hence the raw HashMap[]; each element is an
+ // anonymous HashMap subclass populated by an instance initializer.
+ pojo.mapArray =
+ new HashMap[] {
+ new HashMap<Object, Object>() {
+ {
+ put("test_1", 1);
+ put("test_2", 2);
+ }
+ },
+ new HashMap<Object, Object>() {
+ {
+ put("test_3", 3);
+ put("test_4", 4);
+ }
+ }
+ };
+ return pojo;
+ }
+ }
+
+ /**
+ * POJO for testing arrays with null elements.
+ *
+ * <p>Used by {@code testArrayWithNullElements} for both directions and as the write-side type
+ * in {@code testNullElementInPrimitiveArrayThrows}.
+ */
+ public static class NullableArrayPojo {
+ // Filled with a String[] containing a null element in testArrayWithNullElements.
+ public Object[] stringArray;
+ // Filled with an Integer[] containing a null element in both tests.
+ public Object[] intObjectArray;
+
+ public NullableArrayPojo() {}
+ }
+
+ /** Simple POJO for testing null array field; used by {@code testNullArrayField}. */
+ public static class SimpleArrayPojo {
+ // Left null by the test; the matching schema field is ARRAY<INT>.
+ public Object[] intArray;
+
+ public SimpleArrayPojo() {}
+ }
+}
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/converter/PojoArrayToFlussArrayTest.java
b/fluss-client/src/test/java/org/apache/fluss/client/converter/PojoArrayToFlussArrayTest.java
new file mode 100644
index 000000000..2c21e7222
--- /dev/null
+++
b/fluss-client/src/test/java/org/apache/fluss/client/converter/PojoArrayToFlussArrayTest.java
@@ -0,0 +1,437 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.converter;
+
+import org.apache.fluss.row.GenericRow;
+import org.apache.fluss.row.InternalArray;
+import org.apache.fluss.row.InternalMap;
+import org.apache.fluss.types.DataTypes;
+import org.apache.fluss.types.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link PojoArrayToFlussArray}. */
+public class PojoArrayToFlussArrayTest {
+    /**
+     * Converts {@link ArrayPojo#sample()} to a row and checks every array column: primitive
+     * arrays, boxed arrays, string/decimal/temporal element types, nested arrays and arrays of
+     * maps. Columns are read by position, in the order the schema declares them.
+     */
+    @Test
+    public void testArrayWithAllTypes() {
+        // Schema with all array types
+        RowType table =
+                RowType.builder()
+                        .field("primitiveBooleanArray", DataTypes.ARRAY(DataTypes.BOOLEAN()))
+                        .field("primitiveByteArray", DataTypes.ARRAY(DataTypes.TINYINT()))
+                        .field("primitiveShortArray", DataTypes.ARRAY(DataTypes.SMALLINT()))
+                        .field("primitiveIntArray", DataTypes.ARRAY(DataTypes.INT()))
+                        .field("primitiveLongArray", DataTypes.ARRAY(DataTypes.BIGINT()))
+                        .field("primitiveFloatArray", DataTypes.ARRAY(DataTypes.FLOAT()))
+                        .field("primitiveDoubleArray", DataTypes.ARRAY(DataTypes.DOUBLE()))
+                        .field("booleanArray", DataTypes.ARRAY(DataTypes.BOOLEAN()))
+                        .field("byteArray", DataTypes.ARRAY(DataTypes.TINYINT()))
+                        .field("shortArray", DataTypes.ARRAY(DataTypes.SMALLINT()))
+                        .field("intArray", DataTypes.ARRAY(DataTypes.INT()))
+                        .field("longArray", DataTypes.ARRAY(DataTypes.BIGINT()))
+                        .field("floatArray", DataTypes.ARRAY(DataTypes.FLOAT()))
+                        .field("doubleArray", DataTypes.ARRAY(DataTypes.DOUBLE()))
+                        .field("stringArray", DataTypes.ARRAY(DataTypes.STRING()))
+                        .field("decimalArray", DataTypes.ARRAY(DataTypes.DECIMAL(10, 2)))
+                        .field("dateArray", DataTypes.ARRAY(DataTypes.DATE()))
+                        .field("timeArray", DataTypes.ARRAY(DataTypes.TIME()))
+                        .field("timestampArray", DataTypes.ARRAY(DataTypes.TIMESTAMP(3)))
+                        .field("timestampLtzArray", DataTypes.ARRAY(DataTypes.TIMESTAMP_LTZ(3)))
+                        .field("nestedIntArray", DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.INT())))
+                        .field(
+                                "mapArray",
+                                DataTypes.ARRAY(DataTypes.MAP(DataTypes.STRING(), DataTypes.INT())))
+                        .build();
+
+        PojoToRowConverter<ArrayPojo> writer =
+                PojoToRowConverter.of(ArrayPojo.class, table, table);
+
+        ArrayPojo pojo = ArrayPojo.sample();
+        GenericRow row = writer.toRow(pojo);
+
+        // Verify primitive boolean array
+        InternalArray primitiveBoolArray = row.getArray(0);
+        assertThat(primitiveBoolArray.size()).isEqualTo(2);
+        assertThat(primitiveBoolArray.getBoolean(0)).isTrue();
+        assertThat(primitiveBoolArray.getBoolean(1)).isFalse();
+
+        // Verify primitive byte array
+        InternalArray primitiveByteArray = row.getArray(1);
+        assertThat(primitiveByteArray.size()).isEqualTo(2);
+        assertThat(primitiveByteArray.getByte(0)).isEqualTo((byte) 1);
+        assertThat(primitiveByteArray.getByte(1)).isEqualTo((byte) 2);
+
+        // Verify primitive short array
+        InternalArray primitiveShortArray = row.getArray(2);
+        assertThat(primitiveShortArray.size()).isEqualTo(2);
+        assertThat(primitiveShortArray.getShort(0)).isEqualTo((short) 100);
+        assertThat(primitiveShortArray.getShort(1)).isEqualTo((short) 200);
+
+        // Verify primitive int array
+        InternalArray primitiveIntArray = row.getArray(3);
+        assertThat(primitiveIntArray.size()).isEqualTo(2);
+        assertThat(primitiveIntArray.getInt(0)).isEqualTo(1000);
+        assertThat(primitiveIntArray.getInt(1)).isEqualTo(2000);
+
+        // Verify primitive long array
+        InternalArray primitiveLongArray = row.getArray(4);
+        assertThat(primitiveLongArray.size()).isEqualTo(2);
+        assertThat(primitiveLongArray.getLong(0)).isEqualTo(10000L);
+        assertThat(primitiveLongArray.getLong(1)).isEqualTo(20000L);
+
+        // Verify primitive float array
+        InternalArray primitiveFloatArray = row.getArray(5);
+        assertThat(primitiveFloatArray.size()).isEqualTo(2);
+        assertThat(primitiveFloatArray.getFloat(0)).isEqualTo(1.1f);
+        assertThat(primitiveFloatArray.getFloat(1)).isEqualTo(2.2f);
+
+        // Verify primitive double array
+        InternalArray primitiveDoubleArray = row.getArray(6);
+        assertThat(primitiveDoubleArray.size()).isEqualTo(2);
+        assertThat(primitiveDoubleArray.getDouble(0)).isEqualTo(1.11);
+        assertThat(primitiveDoubleArray.getDouble(1)).isEqualTo(2.22);
+
+        // Verify boolean array
+        InternalArray boolArray = row.getArray(7);
+        assertThat(boolArray.size()).isEqualTo(2);
+        assertThat(boolArray.getBoolean(0)).isTrue();
+        assertThat(boolArray.getBoolean(1)).isFalse();
+
+        // Verify byte array
+        InternalArray byteArray = row.getArray(8);
+        assertThat(byteArray.size()).isEqualTo(2);
+        assertThat(byteArray.getByte(0)).isEqualTo((byte) 1);
+        assertThat(byteArray.getByte(1)).isEqualTo((byte) 2);
+
+        // Verify short array
+        InternalArray shortArray = row.getArray(9);
+        assertThat(shortArray.size()).isEqualTo(2);
+        assertThat(shortArray.getShort(0)).isEqualTo((short) 100);
+        assertThat(shortArray.getShort(1)).isEqualTo((short) 200);
+
+        // Verify int array
+        InternalArray intArray = row.getArray(10);
+        assertThat(intArray.size()).isEqualTo(2);
+        assertThat(intArray.getInt(0)).isEqualTo(1000);
+        assertThat(intArray.getInt(1)).isEqualTo(2000);
+
+        // Verify long array
+        InternalArray longArray = row.getArray(11);
+        assertThat(longArray.size()).isEqualTo(2);
+        assertThat(longArray.getLong(0)).isEqualTo(10000L);
+        assertThat(longArray.getLong(1)).isEqualTo(20000L);
+
+        // Verify float array
+        InternalArray floatArray = row.getArray(12);
+        assertThat(floatArray.size()).isEqualTo(2);
+        assertThat(floatArray.getFloat(0)).isEqualTo(1.1f);
+        assertThat(floatArray.getFloat(1)).isEqualTo(2.2f);
+
+        // Verify double array
+        InternalArray doubleArray = row.getArray(13);
+        assertThat(doubleArray.size()).isEqualTo(2);
+        assertThat(doubleArray.getDouble(0)).isEqualTo(1.11);
+        assertThat(doubleArray.getDouble(1)).isEqualTo(2.22);
+
+        // Verify string array
+        InternalArray stringArray = row.getArray(14);
+        assertThat(stringArray.size()).isEqualTo(2);
+        assertThat(stringArray.getString(0).toString()).isEqualTo("hello");
+        assertThat(stringArray.getString(1).toString()).isEqualTo("world");
+
+        // Verify decimal array
+        InternalArray decimalArray = row.getArray(15);
+        assertThat(decimalArray.size()).isEqualTo(2);
+        assertThat(decimalArray.getDecimal(0, 10, 2).toBigDecimal())
+                .isEqualTo(new BigDecimal("123.45"));
+        assertThat(decimalArray.getDecimal(1, 10, 2).toBigDecimal())
+                .isEqualTo(new BigDecimal("678.90"));
+
+        // Verify date array (days since epoch)
+        InternalArray dateArray = row.getArray(16);
+        assertThat(dateArray.size()).isEqualTo(2);
+        assertThat(dateArray.getInt(0)).isEqualTo((int) LocalDate.of(2025, 1, 1).toEpochDay());
+        assertThat(dateArray.getInt(1)).isEqualTo((int) LocalDate.of(2025, 12, 31).toEpochDay());
+
+        // Verify time array (millis of day)
+        InternalArray timeArray = row.getArray(17);
+        assertThat(timeArray.size()).isEqualTo(2);
+        assertThat(timeArray.getInt(0)).isEqualTo(0); // midnight
+        assertThat(timeArray.getInt(1))
+                .isEqualTo((int) (LocalTime.of(12, 30, 0).toNanoOfDay() / 1_000_000));
+
+        // Verify timestamp array; both elements are checked, not only the first one
+        InternalArray timestampArray = row.getArray(18);
+        assertThat(timestampArray.size()).isEqualTo(2);
+        assertThat(timestampArray.getTimestampNtz(0, 3).getMillisecond())
+                .isEqualTo(
+                        LocalDateTime.of(2025, 7, 23, 15, 0, 0)
+                                .atZone(java.time.ZoneOffset.UTC)
+                                .toInstant()
+                                .toEpochMilli());
+        assertThat(timestampArray.getTimestampNtz(1, 3).getMillisecond())
+                .isEqualTo(
+                        LocalDateTime.of(2025, 12, 31, 23, 59, 59)
+                                .atZone(java.time.ZoneOffset.UTC)
+                                .toInstant()
+                                .toEpochMilli());
+
+        // Verify timestampLtz array; both elements are checked, not only the first one
+        InternalArray timestampLtzArray = row.getArray(19);
+        assertThat(timestampLtzArray.size()).isEqualTo(2);
+        assertThat(timestampLtzArray.getTimestampLtz(0, 3).getEpochMillisecond())
+                .isEqualTo(Instant.parse("2025-01-01T00:00:00Z").toEpochMilli());
+        assertThat(timestampLtzArray.getTimestampLtz(1, 3).getEpochMillisecond())
+                .isEqualTo(Instant.parse("2025-12-31T23:59:59Z").toEpochMilli());
+
+        // Verify nested array (array<array<int>>), including the length of each inner array
+        InternalArray nestedArray = row.getArray(20);
+        assertThat(nestedArray.size()).isEqualTo(2);
+        InternalArray innerArray1 = nestedArray.getArray(0);
+        assertThat(innerArray1.size()).isEqualTo(2);
+        assertThat(innerArray1.getInt(0)).isEqualTo(1);
+        assertThat(innerArray1.getInt(1)).isEqualTo(2);
+        InternalArray innerArray2 = nestedArray.getArray(1);
+        assertThat(innerArray2.size()).isEqualTo(3);
+        assertThat(innerArray2.getInt(0)).isEqualTo(3);
+        assertThat(innerArray2.getInt(1)).isEqualTo(4);
+        assertThat(innerArray2.getInt(2)).isEqualTo(5);
+
+        // Verify map array (array<map<string, int>>)
+        InternalArray mapArray = row.getArray(21);
+        assertThat(mapArray.size()).isEqualTo(2);
+
+        // Entries are copied into a java.util.Map so the checks do not depend on entry order.
+        InternalMap innerMap1 = mapArray.getMap(0);
+        assertThat(innerMap1.size()).isEqualTo(2);
+        assertThat(toStringIntMap(innerMap1))
+                .hasSize(2)
+                .containsEntry("test_1", 1)
+                .containsEntry("test_2", 2);
+
+        InternalMap innerMap2 = mapArray.getMap(1);
+        assertThat(innerMap2.size()).isEqualTo(2);
+        assertThat(toStringIntMap(innerMap2))
+                .hasSize(2)
+                .containsEntry("test_3", 3)
+                .containsEntry("test_4", 4);
+    }
+
+    /** Copies a Fluss map with string keys and int values into a {@code java.util.Map}. */
+    private static Map<String, Integer> toStringIntMap(InternalMap map) {
+        InternalArray keys = map.keyArray();
+        InternalArray values = map.valueArray();
+        Map<String, Integer> copy = new HashMap<>();
+        for (int i = 0; i < map.size(); i++) {
+            copy.put(keys.getString(i).toString(), values.getInt(i));
+        }
+        return copy;
+    }
+
+    /** Null elements of String and Integer arrays must be written as null slots in the row. */
+    @Test
+    public void testArrayWithNullElements() {
+        RowType schema =
+                RowType.builder()
+                        .field("stringArray", DataTypes.ARRAY(DataTypes.STRING()))
+                        .field("intObjectArray", DataTypes.ARRAY(DataTypes.INT()))
+                        .build();
+
+        PojoToRowConverter<NullableArrayPojo> toRow =
+                PojoToRowConverter.of(NullableArrayPojo.class, schema, schema);
+
+        NullableArrayPojo input = new NullableArrayPojo();
+        input.stringArray = new String[] {"hello", null, "world"};
+        input.intObjectArray = new Integer[] {1, null, 3};
+
+        GenericRow encoded = toRow.toRow(input);
+
+        // Column 0: strings with a null in the middle
+        InternalArray strings = encoded.getArray(0);
+        assertThat(strings.size()).isEqualTo(3);
+        assertThat(strings.getString(0).toString()).isEqualTo("hello");
+        assertThat(strings.isNullAt(1)).isTrue();
+        assertThat(strings.getString(2).toString()).isEqualTo("world");
+
+        // Column 1: ints with a null in the middle
+        InternalArray ints = encoded.getArray(1);
+        assertThat(ints.size()).isEqualTo(3);
+        assertThat(ints.getInt(0)).isEqualTo(1);
+        assertThat(ints.isNullAt(1)).isTrue();
+        assertThat(ints.getInt(2)).isEqualTo(3);
+    }
+
+ @Test
+ public void testNullArrayField() {
+ RowType table =
+ RowType.builder().field("intArray",
DataTypes.ARRAY(DataTypes.INT())).build();
+
+ PojoToRowConverter<SimpleArrayPojo> writer =
+ PojoToRowConverter.of(SimpleArrayPojo.class, table, table);
+
+ SimpleArrayPojo pojo = new SimpleArrayPojo();
+ pojo.intArray = null;
+
+ GenericRow row = writer.toRow(pojo);
+ assertThat(row.isNullAt(0)).isTrue();
+ }
+
+    /**
+     * POJO for testing all array types: primitive arrays, boxed arrays, string/decimal/temporal
+     * arrays, a nested array and an array of maps. The field names match the schema fields
+     * declared in {@code testArrayWithAllTypes}.
+     */
+    public static class ArrayPojo {
+        public boolean[] primitiveBooleanArray;
+        public byte[] primitiveByteArray;
+        public short[] primitiveShortArray;
+        public int[] primitiveIntArray;
+        public long[] primitiveLongArray;
+        public float[] primitiveFloatArray;
+        public double[] primitiveDoubleArray;
+        public Boolean[] booleanArray;
+        public Byte[] byteArray;
+        public Short[] shortArray;
+        public Integer[] intArray;
+        public Long[] longArray;
+        public Float[] floatArray;
+        public Double[] doubleArray;
+        public String[] stringArray;
+        public BigDecimal[] decimalArray;
+        public LocalDate[] dateArray;
+        public LocalTime[] timeArray;
+        public LocalDateTime[] timestampArray;
+        public Instant[] timestampLtzArray;
+        public Integer[][] nestedIntArray;
+        public Map<String, Integer>[] mapArray;
+
+        public ArrayPojo() {}
+
+        /**
+         * Builds a fully populated instance. Every array holds two elements; the second row of
+         * {@code nestedIntArray} has three values.
+         *
+         * @return a new POJO with all fields set
+         */
+        // Generic array creation is not allowed, so mapArray is built from a raw HashMap[]; the
+        // resulting unchecked conversion is confined to this method.
+        @SuppressWarnings("unchecked")
+        public static ArrayPojo sample() {
+            ArrayPojo pojo = new ArrayPojo();
+            pojo.primitiveBooleanArray = new boolean[] {true, false};
+            pojo.primitiveByteArray = new byte[] {1, 2};
+            pojo.primitiveShortArray = new short[] {100, 200};
+            pojo.primitiveIntArray = new int[] {1000, 2000};
+            pojo.primitiveLongArray = new long[] {10000L, 20000L};
+            pojo.primitiveFloatArray = new float[] {1.1f, 2.2f};
+            pojo.primitiveDoubleArray = new double[] {1.11, 2.22};
+            pojo.booleanArray = new Boolean[] {true, false};
+            pojo.byteArray = new Byte[] {1, 2};
+            pojo.shortArray = new Short[] {100, 200};
+            pojo.intArray = new Integer[] {1000, 2000};
+            pojo.longArray = new Long[] {10000L, 20000L};
+            pojo.floatArray = new Float[] {1.1f, 2.2f};
+            pojo.doubleArray = new Double[] {1.11, 2.22};
+            pojo.stringArray = new String[] {"hello", "world"};
+            pojo.decimalArray =
+                    new BigDecimal[] {new BigDecimal("123.45"), new BigDecimal("678.90")};
+            pojo.dateArray = new LocalDate[] {LocalDate.of(2025, 1, 1), LocalDate.of(2025, 12, 31)};
+            pojo.timeArray = new LocalTime[] {LocalTime.MIDNIGHT, LocalTime.of(12, 30, 0)};
+            pojo.timestampArray =
+                    new LocalDateTime[] {
+                        LocalDateTime.of(2025, 7, 23, 15, 0, 0),
+                        LocalDateTime.of(2025, 12, 31, 23, 59, 59)
+                    };
+            pojo.timestampLtzArray =
+                    new Instant[] {
+                        Instant.parse("2025-01-01T00:00:00Z"), Instant.parse("2025-12-31T23:59:59Z")
+                    };
+            pojo.nestedIntArray =
+                    new Integer[][] {
+                        {1, 2},
+                        {3, 4, 5}
+                    };
+
+            // The maps are typed <String, Integer> to agree with the declared element type of
+            // mapArray (they were previously created as HashMap<Object, Object>).
+            pojo.mapArray =
+                    new HashMap[] {
+                        new HashMap<String, Integer>() {
+                            {
+                                put("test_1", 1);
+                                put("test_2", 2);
+                            }
+                        },
+                        new HashMap<String, Integer>() {
+                            {
+                                put("test_3", 3);
+                                put("test_4", 4);
+                            }
+                        }
+                    };
+            return pojo;
+        }
+    }
+
+    /**
+     * PojoArrayToFlussArray must accept java.util.Collection values (List, ArrayList, etc.) for
+     * ARRAY columns, including collections that contain null elements.
+     */
+    @Test
+    public void testCollectionFields() {
+        RowType schema =
+                RowType.builder()
+                        .field("listOfStrings", DataTypes.ARRAY(DataTypes.STRING()))
+                        .field("listOfInts", DataTypes.ARRAY(DataTypes.INT()))
+                        .field("listWithNull", DataTypes.ARRAY(DataTypes.STRING()))
+                        .build();
+
+        PojoToRowConverter<CollectionArrayPojo> toRow =
+                PojoToRowConverter.of(CollectionArrayPojo.class, schema, schema);
+
+        CollectionArrayPojo input = new CollectionArrayPojo();
+        input.listOfStrings = Arrays.asList("alpha", "beta", "gamma");
+        input.listOfInts = new ArrayList<>(Arrays.asList(10, 20, 30));
+        input.listWithNull = Arrays.asList("a", null, "b");
+
+        GenericRow encoded = toRow.toRow(input);
+
+        // Column 0: list of strings
+        InternalArray strings = encoded.getArray(0);
+        assertThat(strings.size()).isEqualTo(3);
+        assertThat(strings.getString(0).toString()).isEqualTo("alpha");
+        assertThat(strings.getString(1).toString()).isEqualTo("beta");
+        assertThat(strings.getString(2).toString()).isEqualTo("gamma");
+
+        // Column 1: ArrayList of ints
+        InternalArray ints = encoded.getArray(1);
+        assertThat(ints.size()).isEqualTo(3);
+        assertThat(ints.getInt(0)).isEqualTo(10);
+        assertThat(ints.getInt(1)).isEqualTo(20);
+        assertThat(ints.getInt(2)).isEqualTo(30);
+
+        // Column 2: list with a null in the middle
+        InternalArray withNull = encoded.getArray(2);
+        assertThat(withNull.size()).isEqualTo(3);
+        assertThat(withNull.getString(0).toString()).isEqualTo("a");
+        assertThat(withNull.isNullAt(1)).isTrue();
+        assertThat(withNull.getString(2).toString()).isEqualTo("b");
+    }
+
+ /**
+ * POJO with Collection fields for array conversion tests.
+ *
+ * <p>Used by {@code testCollectionFields}, whose schema declares an ARRAY field with the same
+ * name for each member below.
+ */
+ public static class CollectionArrayPojo {
+ // Paired with an ARRAY<STRING> schema field.
+ public List<String> listOfStrings;
+ // Declared as the concrete ArrayList type, unlike the two List-typed fields.
+ public ArrayList<Integer> listOfInts;
+ // Paired with an ARRAY<STRING> schema field; the test stores a null element in it.
+ public List<String> listWithNull;
+
+ public CollectionArrayPojo() {}
+ }
+
+ /** POJO for testing arrays with null elements; used by {@code testArrayWithNullElements}. */
+ public static class NullableArrayPojo {
+ // Paired with an ARRAY<STRING> schema field; the test stores a null at index 1.
+ public String[] stringArray;
+ // Paired with an ARRAY<INT> schema field; boxed so that it can hold a null element.
+ public Integer[] intObjectArray;
+
+ public NullableArrayPojo() {}
+ }
+
+ /** Simple POJO for testing null array field; used by {@code testNullArrayField}. */
+ public static class SimpleArrayPojo {
+ // Left null by the test; the matching schema field is ARRAY<INT>.
+ public Integer[] intArray;
+
+ public SimpleArrayPojo() {}
+ }
+}
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/converter/PojoToRowConverterTest.java
b/fluss-client/src/test/java/org/apache/fluss/client/converter/PojoToRowConverterTest.java
index 4703c811c..d0cda979d 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/converter/PojoToRowConverterTest.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/converter/PojoToRowConverterTest.java
@@ -18,6 +18,8 @@
package org.apache.fluss.client.converter;
import org.apache.fluss.row.GenericRow;
+import org.apache.fluss.row.InternalArray;
+import org.apache.fluss.row.InternalMap;
import org.apache.fluss.row.TimestampLtz;
import org.apache.fluss.row.TimestampNtz;
import org.apache.fluss.types.DataTypes;
@@ -27,6 +29,8 @@ import org.junit.jupiter.api.Test;
import java.time.Instant;
import java.time.LocalDateTime;
+import java.util.HashMap;
+import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -157,23 +161,6 @@ public class PojoToRowConverterTest {
.hasMessageContaining("timestampLtzField");
}
- @Test
- public void testUnsupportedSchemaFieldTypeThrows() {
- RowType table =
- RowType.builder()
- .field("mapField", DataTypes.MAP(DataTypes.STRING(),
DataTypes.INT()))
- .build();
- RowType proj = table;
- assertThatThrownBy(
- () ->
- PojoToRowConverter.of(
- ConvertersTestFixtures.MapPojo.class,
table, proj))
- .isInstanceOf(UnsupportedOperationException.class)
- .hasMessageContaining("Unsupported field type")
- .hasMessageContaining("MAP")
- .hasMessageContaining("mapField");
- }
-
@Test
public void testTimestampPrecision3() {
// Test with precision 3 milliseconds
@@ -296,6 +283,52 @@ public class PojoToRowConverterTest {
.isEqualTo(expectedInstant);
}
+    /** Converts a POJO with a {@code Map<String, Integer>} field and inspects the Fluss map. */
+    @Test
+    public void testMapType() {
+        RowType schema =
+                RowType.builder()
+                        .field("mapField", DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()))
+                        .build();
+        PojoToRowConverter<MapPojo> converter =
+                PojoToRowConverter.of(MapPojo.class, schema, schema);
+
+        GenericRow converted = converter.toRow(MapPojo.sample());
+
+        InternalMap flussMap = converted.getMap(0);
+        assertThat(flussMap.size()).isEqualTo(2);
+
+        // Entry i is read as keyArray[i] -> valueArray[i]; rebuild a Java map so the
+        // assertions below do not depend on entry order.
+        InternalArray keys = flussMap.keyArray();
+        InternalArray values = flussMap.valueArray();
+        Map<String, Integer> decoded = new HashMap<>();
+        for (int idx = 0; idx < 2; idx++) {
+            decoded.put(keys.getString(idx).toString(), values.getInt(idx));
+        }
+
+        assertThat(decoded).containsEntry("test_1", 1).containsEntry("test_2", 2);
+    }
+
+    /** POJO for testing map type. */
+    public static class MapPojo {
+        public Map<String, Integer> mapField;
+
+        public MapPojo() {}
+
+        /**
+         * Builds the fixture used by the map tests.
+         *
+         * @return a new instance whose map holds {@code test_1 -> 1} and {@code test_2 -> 2}
+         */
+        public static MapPojo sample() {
+            // Plain HashMap instead of double-brace initialization, which would create an
+            // anonymous HashMap subclass just to run two put() calls.
+            Map<String, Integer> entries = new HashMap<>();
+            entries.put("test_1", 1);
+            entries.put("test_2", 2);
+
+            MapPojo pojo = new MapPojo();
+            pojo.mapField = entries;
+            return pojo;
+        }
+    }
+
private LocalDateTime truncateLocalDateTime(LocalDateTime ldt, int
precision) {
if (precision >= 9) {
return ldt;
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/converter/RowToPojoConverterTest.java
b/fluss-client/src/test/java/org/apache/fluss/client/converter/RowToPojoConverterTest.java
index 52d69eb2a..917d6de09 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/converter/RowToPojoConverterTest.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/converter/RowToPojoConverterTest.java
@@ -23,6 +23,9 @@ import org.apache.fluss.types.RowType;
import org.junit.jupiter.api.Test;
+import java.util.HashMap;
+import java.util.Map;
+
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -41,7 +44,7 @@ public class RowToPojoConverterTest {
ConvertersTestFixtures.TestPojo pojo =
ConvertersTestFixtures.TestPojo.sample();
GenericRow row = writer.toRow(pojo);
- assertThat(row.getFieldCount()).isEqualTo(15);
+ assertThat(row.getFieldCount()).isEqualTo(17);
ConvertersTestFixtures.TestPojo back = scanner.fromRow(row);
assertThat(back).isEqualTo(pojo);
@@ -126,4 +129,106 @@ public class RowToPojoConverterTest {
ConvertersTestFixtures.CharacterFieldPojo back = scanner.fromRow(row);
assertThat(back.charField).isEqualTo('A');
}
+
+    /** Round-trips a populated map field through the writer and the reader converters. */
+    @Test
+    public void testMapType() {
+        RowType schema =
+                RowType.builder()
+                        .field("mapField", DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()))
+                        .build();
+        PojoToRowConverter<MapPojo> toRow = PojoToRowConverter.of(MapPojo.class, schema, schema);
+        RowToPojoConverter<MapPojo> toPojo = RowToPojoConverter.of(MapPojo.class, schema, schema);
+
+        MapPojo roundTripped = toPojo.fromRow(toRow.toRow(MapPojo.sample()));
+
+        // Both sample entries must survive the POJO -> row -> POJO trip.
+        Map<Object, Object> restored = roundTripped.mapField;
+        assertThat(restored.size()).isEqualTo(2);
+        assertThat(restored).containsEntry("test_1", 1).containsEntry("test_2", 2);
+    }
+
+    /** A null map field must become a null column and come back as a null field. */
+    @Test
+    public void testNullMapField() {
+        RowType schema =
+                RowType.builder()
+                        .field("mapField", DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()))
+                        .build();
+        PojoToRowConverter<MapPojo> toRow = PojoToRowConverter.of(MapPojo.class, schema, schema);
+        RowToPojoConverter<MapPojo> toPojo = RowToPojoConverter.of(MapPojo.class, schema, schema);
+
+        MapPojo source = new MapPojo();
+        source.mapField = null;
+
+        // Write side: the column itself is null.
+        GenericRow converted = toRow.toRow(source);
+        assertThat(converted.isNullAt(0)).isTrue();
+
+        // Read side: the field is restored as null.
+        MapPojo restored = toPojo.fromRow(converted);
+        assertThat(restored.mapField).isNull();
+    }
+
+    /** A key mapped to a null value must keep both the key and the null after a round trip. */
+    @Test
+    public void testMapWithNullValues() {
+        RowType schema =
+                RowType.builder()
+                        .field("mapField", DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()))
+                        .build();
+        PojoToRowConverter<MapPojo> toRow = PojoToRowConverter.of(MapPojo.class, schema, schema);
+        RowToPojoConverter<MapPojo> toPojo = RowToPojoConverter.of(MapPojo.class, schema, schema);
+
+        Map<Object, Object> entries = new HashMap<>();
+        entries.put("a", 1);
+        entries.put("b", null);
+        MapPojo source = new MapPojo();
+        source.mapField = entries;
+
+        MapPojo restored = toPojo.fromRow(toRow.toRow(source));
+
+        assertThat(restored.mapField).containsEntry("a", 1).containsKey("b");
+        assertThat(restored.mapField.get("b")).isNull();
+    }
+
+    /** An empty (non-null) map must come back as an empty map. */
+    @Test
+    public void testEmptyMap() {
+        RowType schema =
+                RowType.builder()
+                        .field("mapField", DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()))
+                        .build();
+        PojoToRowConverter<MapPojo> toRow = PojoToRowConverter.of(MapPojo.class, schema, schema);
+        RowToPojoConverter<MapPojo> toPojo = RowToPojoConverter.of(MapPojo.class, schema, schema);
+
+        MapPojo source = new MapPojo();
+        source.mapField = new HashMap<>();
+
+        MapPojo restored = toPojo.fromRow(toRow.toRow(source));
+
+        assertThat(restored.mapField).isEmpty();
+    }
+
+    /** POJO for testing map type. */
+    public static class MapPojo {
+        public Map<Object, Object> mapField;
+
+        public MapPojo() {}
+
+        /**
+         * Builds the fixture used by the map tests.
+         *
+         * @return a new instance whose map holds {@code test_1 -> 1} and {@code test_2 -> 2}
+         */
+        public static MapPojo sample() {
+            // Plain HashMap instead of double-brace initialization, which would create an
+            // anonymous HashMap subclass just to run two put() calls.
+            Map<Object, Object> entries = new HashMap<>();
+            entries.put("test_1", 1);
+            entries.put("test_2", 2);
+
+            MapPojo pojo = new MapPojo();
+            pojo.mapField = entries;
+            return pojo;
+        }
+    }
}
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTypedClientITCase.java
b/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTypedClientITCase.java
index c7724799b..e6b963ead 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTypedClientITCase.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTypedClientITCase.java
@@ -48,7 +48,9 @@ import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import static org.assertj.core.api.Assertions.assertThat;
@@ -501,6 +503,341 @@ public class FlussTypedClientITCase extends
ClientToServerITCaseBase {
}
}
+ //
-------------------------------------------------------------------------
+ // Complex-type (ARRAY / MAP) POJO and helpers
+ //
-------------------------------------------------------------------------
+
+    /**
+     * POJO covering the complex-type (ARRAY / MAP) fields.
+     *
+     * <ul>
+     *   <li>{@code ARRAY<INT>} — {@code Integer[]}
+     *   <li>{@code ARRAY<STRING>} — {@code String[]}
+     *   <li>{@code ARRAY<ARRAY<INT>>} — {@code Integer[][]}
+     *   <li>{@code MAP<STRING, INT>} — {@code Map<String, Integer>}
+     *   <li>{@code MAP<STRING, ARRAY<INT>>} — {@code Map<String, Object[]>} (generic type is
+     *       erased at runtime; inner arrays always come back as Object[])
+     * </ul>
+     */
+    public static class ComplexTypesPojo {
+        public Integer id;
+        public Integer[] intArray;
+        public String[] strArray;
+        public Integer[][] nestedArray;
+        public Map<String, Integer> simpleMap;
+        // Map values that are arrays are always deserialized as Object[] (type erasure)
+        public Map<String, Object[]> mapOfArrays;
+
+        public ComplexTypesPojo() {}
+
+        /** Constructor that sets only the id; all array/map fields default to null. */
+        public ComplexTypesPojo(Integer id) {
+            this.id = id;
+        }
+
+        /** Field-by-field equality that compares array contents instead of array identity. */
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            ComplexTypesPojo that = (ComplexTypesPojo) o;
+            return Objects.equals(id, that.id)
+                    && Arrays.equals(intArray, that.intArray)
+                    && Arrays.equals(strArray, that.strArray)
+                    && Arrays.deepEquals(nestedArray, that.nestedArray)
+                    && Objects.equals(simpleMap, that.simpleMap)
+                    && mapsOfArraysEqual(mapOfArrays, that.mapOfArrays);
+        }
+
+        /**
+         * Compares two maps whose values are arrays, using {@link Arrays#equals(Object[],
+         * Object[])} for the values.
+         *
+         * @param a first map, may be null
+         * @param b second map, may be null
+         * @return true when both are null, or both hold the same keys with equal array contents
+         */
+        private static boolean mapsOfArraysEqual(Map<String, Object[]> a, Map<String, Object[]> b) {
+            if (a == b) {
+                return true;
+            }
+            if (a == null || b == null || a.size() != b.size()) {
+                return false;
+            }
+            for (Map.Entry<String, Object[]> e : a.entrySet()) {
+                Object[] other = b.get(e.getKey());
+                // get() returns null both for a missing key and for a key mapped to null;
+                // without this check {x=null} and {y=null} would compare as equal.
+                if (other == null && !b.containsKey(e.getKey())) {
+                    return false;
+                }
+                if (!Arrays.equals(e.getValue(), other)) {
+                    return false;
+                }
+            }
+            return true;
+        }
+
+        @Override
+        public int hashCode() {
+            // mapOfArrays is not folded in; equal objects still produce equal hashes, which
+            // is all the equals/hashCode contract requires.
+            int result = Objects.hash(id, simpleMap);
+            result = 31 * result + Arrays.hashCode(intArray);
+            result = 31 * result + Arrays.hashCode(strArray);
+            result = 31 * result + Arrays.deepHashCode(nestedArray);
+            return result;
+        }
+
+        @Override
+        public String toString() {
+            return "ComplexTypesPojo{"
+                    + "id="
+                    + id
+                    + ", intArray="
+                    + Arrays.toString(intArray)
+                    + ", strArray="
+                    + Arrays.toString(strArray)
+                    + ", nestedArray="
+                    + Arrays.deepToString(nestedArray)
+                    + ", simpleMap="
+                    + simpleMap
+                    + ", mapOfArrays="
+                    + mapOfArrays
+                    + '}';
+        }
+    }
+
+ /**
+ * Primary-key lookup POJO for {@link ComplexTypesPojo}.
+ *
+ * <p>Carries only the {@code id} field, which is the primary-key column declared by
+ * {@code complexTypesPkSchema()}.
+ */
+ public static class ComplexTypesLookupKey {
+ public Integer id;
+
+ // Public no-argument constructor; id stays null.
+ public ComplexTypesLookupKey() {}
+
+ // Convenience constructor for building a lookup key inline.
+ public ComplexTypesLookupKey(Integer id) {
+ this.id = id;
+ }
+ }
+
+ /**
+ * Builds the schema used by the complex-types log-table test: an INT {@code id} plus one
+ * column per field of {@link ComplexTypesPojo}. No primary key is declared.
+ *
+ * @return schema with columns id, intArray, strArray, nestedArray, simpleMap, mapOfArrays
+ */
+ private static Schema complexTypesLogSchema() {
+ return Schema.newBuilder()
+ .column("id", DataTypes.INT())
+ .column("intArray", DataTypes.ARRAY(DataTypes.INT()))
+ .column("strArray", DataTypes.ARRAY(DataTypes.STRING()))
+ .column("nestedArray",
DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.INT())))
+ .column("simpleMap", DataTypes.MAP(DataTypes.STRING(),
DataTypes.INT()))
+ .column(
+ "mapOfArrays",
+ DataTypes.MAP(DataTypes.STRING(),
DataTypes.ARRAY(DataTypes.INT())))
+ .build();
+ }
+
+ /**
+ * Builds the schema used by the complex-types primary-key test: the same six columns as
+ * {@code complexTypesLogSchema()}, with {@code id} declared as the primary key.
+ *
+ * @return schema with columns id, intArray, strArray, nestedArray, simpleMap, mapOfArrays
+ */
+ private static Schema complexTypesPkSchema() {
+ return Schema.newBuilder()
+ .column("id", DataTypes.INT())
+ .column("intArray", DataTypes.ARRAY(DataTypes.INT()))
+ .column("strArray", DataTypes.ARRAY(DataTypes.STRING()))
+ .column("nestedArray",
DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.INT())))
+ .column("simpleMap", DataTypes.MAP(DataTypes.STRING(),
DataTypes.INT()))
+ .column(
+ "mapOfArrays",
+ DataTypes.MAP(DataTypes.STRING(),
DataTypes.ARRAY(DataTypes.INT())))
+ .primaryKey("id")
+ .build();
+ }
+
+    /**
+     * Builds a fully populated {@link ComplexTypesPojo} whose id, arrays and maps are all
+     * derived from {@code i}.
+     *
+     * @param i seed used for the id and for every array element, map key and map value
+     * @return a new POJO with no null fields
+     */
+    private static ComplexTypesPojo newComplexTypesPojo(int i) {
+        int next = i + 1;
+
+        Map<String, Integer> scalars = new HashMap<>();
+        scalars.put("k" + i, i * 10);
+        scalars.put("k" + next, next * 10);
+
+        // Values are plain Object[] to match the declared type of mapOfArrays.
+        Map<String, Object[]> arrays = new HashMap<>();
+        arrays.put("arr" + i, new Object[] {i * 100, i * 100 + 1});
+        arrays.put("arr" + next, new Object[] {next * 100, next * 100 + 1});
+
+        ComplexTypesPojo pojo = new ComplexTypesPojo(i);
+        pojo.intArray = new Integer[] {i, next, i + 2};
+        pojo.strArray = new String[] {"s" + i, "s" + next};
+        pojo.nestedArray = new Integer[][] {{i, next}, {i + 2, i + 3}};
+        pojo.simpleMap = scalars;
+        pojo.mapOfArrays = arrays;
+        return pojo;
+    }
+
+    /**
+     * Appends rows with populated and with all-null ARRAY / MAP fields to a log table, then
+     * reads them back through the typed log scanner and compares them with what was written.
+     */
+    @Test
+    void testComplexTypesAppendWriteAndScan() throws Exception {
+        TablePath tablePath = TablePath.of("pojo_db", "complex_types_log");
+        TableDescriptor descriptor =
+                TableDescriptor.builder().schema(complexTypesLogSchema()).distributedBy(1).build();
+        createTable(tablePath, descriptor, true);
+
+        try (Table table = conn.getTable(tablePath)) {
+            TypedAppendWriter<ComplexTypesPojo> appender =
+                    table.newAppend().createTypedWriter(ComplexTypesPojo.class);
+
+            // Three fully populated rows, plus one that only carries an id so that every
+            // array / map field is null (verifies null propagation).
+            List<ComplexTypesPojo> written = new ArrayList<>();
+            for (int seed = 1; seed <= 3; seed++) {
+                written.add(newComplexTypesPojo(seed));
+            }
+            written.add(new ComplexTypesPojo(99));
+
+            for (ComplexTypesPojo row : written) {
+                appender.append(row);
+            }
+            appender.flush();
+
+            TypedLogScanner<ComplexTypesPojo> logScanner =
+                    table.newScan().createTypedLogScanner(ComplexTypesPojo.class);
+            subscribeFromBeginning(logScanner, table);
+
+            // Keep polling until as many rows as were written have been read back.
+            List<ComplexTypesPojo> received = new ArrayList<>();
+            while (received.size() < written.size()) {
+                TypedScanRecords<ComplexTypesPojo> batch =
+                        logScanner.poll(Duration.ofSeconds(2));
+                for (TypedScanRecord<ComplexTypesPojo> scanned : batch) {
+                    assertThat(scanned.getChangeType()).isEqualTo(ChangeType.APPEND_ONLY);
+                    received.add(scanned.getValue());
+                }
+            }
+
+            // ComplexTypesPojo.equals compares array contents, so this is a deep comparison.
+            assertThat(received).containsExactlyInAnyOrderElementsOf(written);
+
+            // Spot-check the id-only row: every complex field must have come back as null.
+            ComplexTypesPojo idOnlyRow = null;
+            for (ComplexTypesPojo candidate : received) {
+                if (candidate.id == 99) {
+                    idOnlyRow = candidate;
+                    break;
+                }
+            }
+            assertThat(idOnlyRow).isNotNull();
+            assertThat(idOnlyRow.intArray).isNull();
+            assertThat(idOnlyRow.strArray).isNull();
+            assertThat(idOnlyRow.nestedArray).isNull();
+            assertThat(idOnlyRow.simpleMap).isNull();
+            assertThat(idOnlyRow.mapOfArrays).isNull();
+        }
+    }
+
+    /**
+     * Upserts a row with ARRAY / MAP fields twice under the same primary key, then looks the
+     * key up and checks that the second version is the one returned.
+     */
+    @Test
+    void testComplexTypesUpsertAndLookup() throws Exception {
+        TablePath tablePath = TablePath.of("pojo_db", "complex_types_pk");
+        TableDescriptor descriptor =
+                TableDescriptor.builder()
+                        .schema(complexTypesPkSchema())
+                        .distributedBy(1, "id")
+                        .build();
+        createTable(tablePath, descriptor, true);
+
+        try (Table table = conn.getTable(tablePath)) {
+            TypedUpsertWriter<ComplexTypesPojo> upserter =
+                    table.newUpsert().createTypedWriter(ComplexTypesPojo.class);
+
+            // First version of the row with id 10.
+            upserter.upsert(newComplexTypesPojo(10)).get();
+
+            // Second version: same key, different arrays and maps.
+            Map<String, Integer> newScalars = new HashMap<>();
+            newScalars.put("new_key", 999);
+            Map<String, Object[]> newArrays = new HashMap<>();
+            newArrays.put("new_arr", new Object[] {-1, -2});
+
+            ComplexTypesPojo replacement = new ComplexTypesPojo(10);
+            replacement.intArray = new Integer[] {100, 200, 300};
+            replacement.strArray = new String[] {"updated"};
+            replacement.nestedArray = new Integer[][] {{9, 8}, {7}};
+            replacement.simpleMap = newScalars;
+            replacement.mapOfArrays = newArrays;
+            upserter.upsert(replacement).get();
+            upserter.flush();
+
+            // The lookup returns a row, so convert it back to the POJO before asserting.
+            RowType rowType = table.getTableInfo().getRowType();
+            RowToPojoConverter<ComplexTypesPojo> toPojo =
+                    RowToPojoConverter.of(ComplexTypesPojo.class, rowType, rowType);
+            TypedLookuper<ComplexTypesLookupKey> keyLookuper =
+                    table.newLookup().createTypedLookuper(ComplexTypesLookupKey.class);
+
+            ComplexTypesPojo found =
+                    toPojo.fromRow(
+                            keyLookuper
+                                    .lookup(new ComplexTypesLookupKey(10))
+                                    .get()
+                                    .getSingletonRow());
+
+            Integer[][] expectedNested = {{9, 8}, {7}};
+            assertThat(found.id).isEqualTo(10);
+            assertThat(found.intArray).isEqualTo(new Integer[] {100, 200, 300});
+            assertThat(found.strArray).isEqualTo(new String[] {"updated"});
+            assertThat(Arrays.deepEquals(found.nestedArray, expectedNested)).isTrue();
+            assertThat(found.simpleMap).containsEntry("new_key", 999);
+            assertThat(found.mapOfArrays).containsKey("new_arr");
+            // inner arrays are deserialized as Object[] due to type erasure
+            assertThat(found.mapOfArrays.get("new_arr")).isEqualTo(new Object[] {-1, -2});
+
+            // A key that was never written yields a null row.
+            assertThat(keyLookuper.lookup(new ComplexTypesLookupKey(999)).get().getSingletonRow())
+                    .isNull();
+        }
+    }
+
+ //
-------------------------------------------------------------------------
+ // List-field POJO — verifies that java.util.List fields work on both paths
+ //
-------------------------------------------------------------------------
+
+ /**
+ * POJO where ARRAY columns are mapped to {@link List} fields instead of Java arrays. Both
+ * the write path ({@link PojoArrayToFlussArray}) and the read path
+ * ({@link FlussArrayToPojoArray}) support {@link java.util.Collection} types.
+ */
+ public static class ListTypesPojo {
+ public Integer id;
+ public List<Integer> intList;
+ public List<String> strList;
+ // testListFieldsRoundTrip sets this both to a list containing a null element and to null.
+ public List<Integer> nullableIntList;
+
+ // Public no-argument constructor; leaves every field null.
+ public ListTypesPojo() {}
+ }
+
+    /**
+     * Writes POJOs whose ARRAY columns are {@link List} fields (populated, empty, containing a
+     * null element, and null) and checks that the typed scanner returns the same contents.
+     */
+    @Test
+    void testListFieldsRoundTrip() throws Exception {
+        Schema listSchema =
+                Schema.newBuilder()
+                        .column("id", DataTypes.INT())
+                        .column("intList", DataTypes.ARRAY(DataTypes.INT()))
+                        .column("strList", DataTypes.ARRAY(DataTypes.STRING()))
+                        .column("nullableIntList", DataTypes.ARRAY(DataTypes.INT()))
+                        .build();
+
+        TablePath tablePath = TablePath.of("pojo_db", "list_fields_log");
+        TableDescriptor descriptor =
+                TableDescriptor.builder().schema(listSchema).distributedBy(1).build();
+        createTable(tablePath, descriptor, true);
+
+        try (Table table = conn.getTable(tablePath)) {
+            TypedAppendWriter<ListTypesPojo> appender =
+                    table.newAppend().createTypedWriter(ListTypesPojo.class);
+
+            // Row 1: every list populated, one of them holding a null element.
+            ListTypesPojo populated = new ListTypesPojo();
+            populated.id = 1;
+            populated.intList = new ArrayList<>(Arrays.asList(10, 20, 30));
+            populated.strList = Arrays.asList("alpha", "beta");
+            populated.nullableIntList = Arrays.asList(1, null, 3);
+
+            // Row 2: an empty list and a null list field.
+            ListTypesPojo sparse = new ListTypesPojo();
+            sparse.id = 2;
+            sparse.intList = new ArrayList<>();
+            sparse.strList = Arrays.asList("only");
+            sparse.nullableIntList = null;
+
+            appender.append(populated);
+            appender.append(sparse);
+            appender.flush();
+
+            TypedLogScanner<ListTypesPojo> logScanner =
+                    table.newScan().createTypedLogScanner(ListTypesPojo.class);
+            subscribeFromBeginning(logScanner, table);
+
+            // Index the scanned rows by id so the assertions do not depend on arrival order.
+            Map<Integer, ListTypesPojo> byId = new HashMap<>();
+            while (byId.size() < 2) {
+                TypedScanRecords<ListTypesPojo> batch = logScanner.poll(Duration.ofSeconds(2));
+                for (TypedScanRecord<ListTypesPojo> scanned : batch) {
+                    ListTypesPojo value = scanned.getValue();
+                    byId.put(value.id, value);
+                }
+            }
+
+            ListTypesPojo first = byId.get(1);
+            assertThat(first.intList).isInstanceOf(List.class).containsExactly(10, 20, 30);
+            assertThat(first.strList).containsExactly("alpha", "beta");
+            assertThat(first.nullableIntList).containsExactly(1, null, 3);
+
+            ListTypesPojo second = byId.get(2);
+            assertThat(second.intList).isInstanceOf(List.class).isEmpty();
+            assertThat(second.strList).containsExactly("only");
+            assertThat(second.nullableIntList).isNull();
+        }
+    }
+
@Test
void testTypedPartialUpdates() throws Exception {
// Use full PK schema and update a subset of fields