This is an automated email from the ASF dual-hosted git repository.

ferenc-csaky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-aws.git


The following commit(s) were added to refs/heads/main by this push:
     new da3b970  [FLINK-39106] Support `ROW` and `ARRAY<ROW>` data types in 
DynamoDB API / SQL sink
da3b970 is described below

commit da3b9703e387642cbb8a5860348170cad3fd9b84
Author: ddebowczyk92 <[email protected]>
AuthorDate: Wed May 27 18:51:09 2026 +0200

    [FLINK-39106] Support `ROW` and `ARRAY<ROW>` data types in DynamoDB API / 
SQL sink
---
 .../table/RowDataToAttributeValueConverter.java    | 103 +++++++++++++--
 .../table/DynamoDbDynamicSinkFactoryTest.java      |   7 +-
 .../RowDataToAttributeValueConverterTest.java      | 141 ++++++++++++++++++++-
 .../src/test/resources/create-table.sql            |   3 +
 4 files changed, 238 insertions(+), 16 deletions(-)

diff --git 
a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverter.java
 
b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverter.java
index dbfa66b..08d6234 100644
--- 
a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverter.java
+++ 
b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverter.java
@@ -19,14 +19,20 @@
 package org.apache.flink.connector.dynamodb.table;
 
 import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.connector.dynamodb.table.converter.ArrayAttributeConverter;
 import 
org.apache.flink.connector.dynamodb.table.converter.ArrayAttributeConverterProvider;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.conversion.DataStructureConverters;
+import org.apache.flink.table.types.CollectionDataType;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.KeyValueDataType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.types.Row;
 
+import software.amazon.awssdk.enhanced.dynamodb.AttributeConverter;
 import software.amazon.awssdk.enhanced.dynamodb.AttributeConverterProvider;
+import software.amazon.awssdk.enhanced.dynamodb.AttributeValueType;
 import software.amazon.awssdk.enhanced.dynamodb.EnhancedType;
 import software.amazon.awssdk.enhanced.dynamodb.TableSchema;
 import software.amazon.awssdk.enhanced.dynamodb.mapper.StaticTableSchema;
@@ -34,6 +40,8 @@ import 
software.amazon.awssdk.services.dynamodb.model.AttributeValue;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
+import java.util.stream.IntStream;
 
 import static org.apache.flink.table.data.RowData.createFieldGetter;
 
@@ -84,18 +92,63 @@ public class RowDataToAttributeValueConverter {
             DataTypes.Field field,
             RowData.FieldGetter fieldGetter) {
 
+        EnhancedType<Object> enhancedType = 
getEnhancedType(field.getDataType());
         return builder.addAttribute(
-                getEnhancedType(field.getDataType()),
-                a ->
-                        a.name(field.getName())
-                                .getter(
-                                        rowData ->
-                                                
DataStructureConverters.getConverter(
-                                                                
field.getDataType())
-                                                        .toExternalOrNull(
-                                                                
fieldGetter.getFieldOrNull(
-                                                                        
rowData)))
-                                .setter(((rowData, t) -> {})));
+                enhancedType,
+                a -> {
+                    a.name(field.getName())
+                            .getter(
+                                    rowData ->
+                                            
DataStructureConverters.getConverter(
+                                                            
field.getDataType())
+                                                    .toExternalOrNull(
+                                                            
fieldGetter.getFieldOrNull(rowData)))
+                            .setter(((rowData, t) -> {}));
+                    buildRowAttributeConverter(field.getDataType())
+                            .ifPresent(a::attributeConverter);
+                });
+    }
+
+    private Optional<AttributeConverter> buildRowAttributeConverter(DataType 
dataType) {
+        if (LogicalTypeRoot.ROW == dataType.getLogicalType().getTypeRoot()) {
+            return 
Optional.of(createRowDocumentConverter(buildRowTableSchema(dataType)));
+        }
+        if (dataType instanceof CollectionDataType) {
+            DataType elementDataType = ((CollectionDataType) 
dataType).getElementDataType();
+            if (LogicalTypeRoot.ROW == 
elementDataType.getLogicalType().getTypeRoot()) {
+                AttributeConverter<Row> elementConverter =
+                        
createRowDocumentConverter(buildRowTableSchema(elementDataType));
+                return Optional.of(
+                        new ArrayAttributeConverter<>(
+                                elementConverter, 
EnhancedType.of(Row[].class)));
+            }
+        }
+        return Optional.empty();
+    }
+
+    private static AttributeConverter<Row> createRowDocumentConverter(
+            TableSchema<Row> tableSchema) {
+        return new AttributeConverter<Row>() {
+            @Override
+            public AttributeValue transformFrom(Row input) {
+                return AttributeValue.builder().m(tableSchema.itemToMap(input, 
false)).build();
+            }
+
+            @Override
+            public Row transformTo(AttributeValue input) {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public EnhancedType<Row> type() {
+                return EnhancedType.of(Row.class);
+            }
+
+            @Override
+            public AttributeValueType attributeValueType() {
+                return AttributeValueType.M;
+            }
+        };
     }
 
     private <T> EnhancedType<T> getEnhancedType(DataType dataType) {
@@ -104,8 +157,36 @@ public class RowDataToAttributeValueConverter {
                     EnhancedType.mapOf(
                             getEnhancedType(((KeyValueDataType) 
dataType).getKeyDataType()),
                             getEnhancedType(((KeyValueDataType) 
dataType).getValueDataType()));
+        } else if (LogicalTypeRoot.ROW == 
dataType.getLogicalType().getTypeRoot()) {
+            return (EnhancedType<T>) EnhancedType.of(Row.class);
         } else {
             return (EnhancedType<T>) 
EnhancedType.of(dataType.getConversionClass());
         }
     }
+
+    private TableSchema<Row> buildRowTableSchema(DataType dataType) {
+        StaticTableSchema.Builder<Row> builder = 
TableSchema.builder(Row.class);
+        AttributeConverterProvider newAttributeConverterProvider =
+                new ArrayAttributeConverterProvider();
+        builder.attributeConverterProviders(
+                newAttributeConverterProvider, 
AttributeConverterProvider.defaultProvider());
+
+        final List<DataTypes.Field> fields = DataType.getFields(dataType);
+        IntStream.range(0, fields.size())
+                .forEach(
+                        idx -> {
+                            final DataTypes.Field field = fields.get(idx);
+                            final DataType fieldDataType = field.getDataType();
+                            builder.addAttribute(
+                                    getEnhancedType(fieldDataType),
+                                    a -> {
+                                        a.name(field.getName())
+                                                .getter(row -> 
row.getField(idx))
+                                                .setter((row, t) -> {});
+                                        
buildRowAttributeConverter(fieldDataType)
+                                                
.ifPresent(a::attributeConverter);
+                                    });
+                        });
+        return builder.build();
+    }
 }
diff --git 
a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkFactoryTest.java
 
b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkFactoryTest.java
index 862ab42..f76205e 100644
--- 
a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkFactoryTest.java
+++ 
b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkFactoryTest.java
@@ -336,7 +336,12 @@ public class DynamoDbDynamicSinkFactoryTest {
                 Column.physical("some_timestamp_array", 
DataTypes.ARRAY(DataTypes.TIMESTAMP())),
                 Column.physical(
                         "some_timestamp_ltz_array", 
DataTypes.ARRAY(DataTypes.TIMESTAMP_LTZ())),
-                Column.physical("some_map", DataTypes.MAP(DataTypes.STRING(), 
DataTypes.BIGINT())));
+                Column.physical("some_map", DataTypes.MAP(DataTypes.STRING(), 
DataTypes.BIGINT())),
+                Column.physical(
+                        "some_row",
+                        DataTypes.ROW(
+                                DataTypes.FIELD("inner_string", 
DataTypes.STRING()),
+                                DataTypes.FIELD("inner_int", 
DataTypes.INT()))));
     }
 
     private ResolvedSchema defaultSinkSchema() {
diff --git 
a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverterTest.java
 
b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverterTest.java
index daba6aa..974a19f 100644
--- 
a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverterTest.java
+++ 
b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverterTest.java
@@ -37,6 +37,7 @@ import java.util.Arrays;
 import java.util.Map;
 import java.util.function.Function;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import static java.util.Collections.singletonMap;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -69,7 +70,7 @@ public class RowDataToAttributeValueConverterTest {
         RowDataToAttributeValueConverter rowDataToAttributeValueConverter =
                 new RowDataToAttributeValueConverter(dataType, true);
         Map<String, AttributeValue> actualResult =
-                
rowDataToAttributeValueConverter.convertRowData(createElement(null));
+                
rowDataToAttributeValueConverter.convertRowData(createElement((Object) null));
 
         assertThat(actualResult.isEmpty()).isEqualTo(true);
     }
@@ -254,6 +255,93 @@ public class RowDataToAttributeValueConverterTest {
         assertThat(actualResult).containsAllEntriesOf(expectedResult);
     }
 
+    @Test
+    void testRowDataType() {
+        DataType dataType =
+                DataTypes.ROW(
+                        DataTypes.FIELD(
+                                "innerRow",
+                                DataTypes.ROW(
+                                        DataTypes.FIELD("f1", 
DataTypes.STRING()),
+                                        DataTypes.FIELD("f2", 
DataTypes.INT()))));
+        RowDataToAttributeValueConverter rowDataToAttributeValueConverter =
+                new RowDataToAttributeValueConverter(dataType);
+        Map<String, AttributeValue> actualResult =
+                rowDataToAttributeValueConverter.convertRowData(
+                        
createElement(createElement(StringData.fromString("some string"), 123)));
+
+        Map<String, AttributeValue> innerMap =
+                Map.of(
+                        "f1", AttributeValue.builder().s("some 
string").build(),
+                        "f2", AttributeValue.builder().n("123").build());
+        Map<String, AttributeValue> expectedResult =
+                singletonMap("innerRow", 
AttributeValue.builder().m(innerMap).build());
+
+        assertThat(actualResult).containsAllEntriesOf(expectedResult);
+    }
+
+    @Test
+    void testRowDataTypeNullInnerField() {
+        DataType dataType =
+                DataTypes.ROW(
+                        DataTypes.FIELD(
+                                "innerRow",
+                                DataTypes.ROW(
+                                        DataTypes.FIELD("f1", 
DataTypes.STRING()),
+                                        DataTypes.FIELD("f2", 
DataTypes.INT()))));
+        RowDataToAttributeValueConverter rowDataToAttributeValueConverter =
+                new RowDataToAttributeValueConverter(dataType);
+        Map<String, AttributeValue> actualResult =
+                rowDataToAttributeValueConverter.convertRowData(
+                        
createElement(createElement(StringData.fromString("value"), null)));
+
+        Map<String, AttributeValue> innerMap =
+                Map.of(
+                        "f1", AttributeValue.builder().s("value").build(),
+                        "f2", AttributeValue.builder().nul(true).build());
+        Map<String, AttributeValue> expectedResult =
+                singletonMap("innerRow", 
AttributeValue.builder().m(innerMap).build());
+
+        assertThat(actualResult).containsAllEntriesOf(expectedResult);
+    }
+
+    @Test
+    void testNestedRowDataType() {
+        DataType dataType =
+                DataTypes.ROW(
+                        DataTypes.FIELD(
+                                "outer",
+                                DataTypes.ROW(
+                                        DataTypes.FIELD(
+                                                "middle",
+                                                DataTypes.ROW(
+                                                        DataTypes.FIELD(
+                                                                "leaf", 
DataTypes.STRING()))),
+                                        DataTypes.FIELD("sibling", 
DataTypes.INT()))));
+        RowDataToAttributeValueConverter rowDataToAttributeValueConverter =
+                new RowDataToAttributeValueConverter(dataType);
+
+        GenericRowData leafRow = new GenericRowData(1);
+        leafRow.setField(0, StringData.fromString("deep"));
+        GenericRowData outerRow = new GenericRowData(2);
+        outerRow.setField(0, leafRow);
+        outerRow.setField(1, 42);
+
+        Map<String, AttributeValue> actualResult =
+                
rowDataToAttributeValueConverter.convertRowData(createElement(outerRow));
+
+        Map<String, AttributeValue> leafMap =
+                Map.of("leaf", AttributeValue.builder().s("deep").build());
+        Map<String, AttributeValue> outerMap =
+                Map.of(
+                        "middle", AttributeValue.builder().m(leafMap).build(),
+                        "sibling", AttributeValue.builder().n("42").build());
+        Map<String, AttributeValue> expectedResult =
+                singletonMap("outer", 
AttributeValue.builder().m(outerMap).build());
+
+        assertThat(actualResult).containsAllEntriesOf(expectedResult);
+    }
+
     @Test
     void testStringArray() {
         String key = "key";
@@ -560,9 +648,54 @@ public class RowDataToAttributeValueConverterTest {
         assertThat(actualResult).containsAllEntriesOf(expectedResult);
     }
 
-    private RowData createElement(Object value) {
-        GenericRowData element = new GenericRowData(1);
-        element.setField(0, value);
+    @Test
+    void testRowDataTypeArray() {
+        DataType dataType =
+                DataTypes.ROW(
+                        DataTypes.FIELD(
+                                "rows",
+                                DataTypes.ARRAY(
+                                        DataTypes.ROW(
+                                                DataTypes.FIELD("name", 
DataTypes.STRING()),
+                                                DataTypes.FIELD("value", 
DataTypes.INT())))));
+        RowDataToAttributeValueConverter rowDataToAttributeValueConverter =
+                new RowDataToAttributeValueConverter(dataType);
+
+        GenericRowData row1 = new GenericRowData(2);
+        row1.setField(0, StringData.fromString("first"));
+        row1.setField(1, 1);
+        GenericRowData row2 = new GenericRowData(2);
+        row2.setField(0, StringData.fromString("second"));
+        row2.setField(1, 2);
+
+        Map<String, AttributeValue> actualResult =
+                rowDataToAttributeValueConverter.convertRowData(
+                        createElement(new GenericArrayData(new RowData[] 
{row1, row2})));
+
+        Map<String, AttributeValue> row1Map =
+                Map.of(
+                        "name", AttributeValue.builder().s("first").build(),
+                        "value", AttributeValue.builder().n("1").build());
+        Map<String, AttributeValue> row2Map =
+                Map.of(
+                        "name", AttributeValue.builder().s("second").build(),
+                        "value", AttributeValue.builder().n("2").build());
+        Map<String, AttributeValue> expectedResult =
+                singletonMap(
+                        "rows",
+                        AttributeValue.builder()
+                                .l(
+                                        
AttributeValue.builder().m(row1Map).build(),
+                                        
AttributeValue.builder().m(row2Map).build())
+                                .build());
+
+        assertThat(actualResult).containsAllEntriesOf(expectedResult);
+    }
+
+    private RowData createElement(Object... values) {
+        final int valuesLength = values.length;
+        GenericRowData element = new GenericRowData(valuesLength);
+        IntStream.range(0, valuesLength).forEach(idx -> element.setField(idx, 
values[idx]));
         return element;
     }
 
diff --git 
a/flink-connector-aws/flink-connector-dynamodb/src/test/resources/create-table.sql
 
b/flink-connector-aws/flink-connector-dynamodb/src/test/resources/create-table.sql
index 3c47cab..9655b24 100644
--- 
a/flink-connector-aws/flink-connector-dynamodb/src/test/resources/create-table.sql
+++ 
b/flink-connector-aws/flink-connector-dynamodb/src/test/resources/create-table.sql
@@ -15,6 +15,8 @@ CREATE TABLE dynamo_db_table (
     `some_time` TIME,
     `some_timestamp` TIMESTAMP(3),
     `some_timestamp_ltz` TIMESTAMP_LTZ(5),
+    `some_row` ROW<myField INT, myOtherField String>,
+    `some_row_in_row` ROW<myField INT, myInnerRow ROW<innerRowField INT, 
innerRowOtherField String>>,
     `some_char_array` ARRAY<CHAR>,
     `some_varchar_array` ARRAY<VARCHAR>,
     `some_string_array` ARRAY<STRING>,
@@ -29,6 +31,7 @@ CREATE TABLE dynamo_db_table (
     `some_time_array` ARRAY<TIME>,
     `some_timestamp_array` ARRAY<TIMESTAMP(3)>,
     `some_timestamp_ltz_array` ARRAY<TIMESTAMP_LTZ(5)>,
+    `some_rows_array` ARRAY<ROW<myField INT, myOtherField STRING>>,
     `some_string_map` MAP<STRING,STRING>,
     `some_boolean_map` MAP<STRING,BOOLEAN>
 ) PARTITIONED BY ( partition_key )

Reply via email to