This is an automated email from the ASF dual-hosted git repository.

hong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-aws.git


The following commit(s) were added to refs/heads/main by this push:
     new 36479a9  [FLINK-35022][Connectors/DynamoDB] Add Default DynamoDB Element Converter
36479a9 is described below

commit 36479a98e11c154bd1537e11e8cc51e64a0ebcb8
Author: Ahmed Hamdy <ahmed.ha...@ververica.com>
AuthorDate: Mon Apr 8 09:32:13 2024 +0100

    [FLINK-35022][Connectors/DynamoDB] Add Default DynamoDB Element Converter
---
 .../content/docs/connectors/datastream/dynamodb.md |   5 +-
 .../sink/DefaultDynamoDbElementConverter.java      |  56 +++
 .../dynamodb/sink/DynamoDbSinkBuilder.java         |   4 +-
 .../sink/DynamoDbTypeInformedElementConverter.java | 380 +++++++++++++++++++
 .../sink/DefaultDynamoDbElementConverterTest.java  |  52 +++
 .../connector/dynamodb/sink/DynamoDbSinkTest.java  |  24 +-
 .../DynamoDbTypeInformedElementConverterTest.java  | 413 +++++++++++++++++++++
 .../connector/dynamodb/util/ComplexPayload.java    | 121 ++++++
 8 files changed, 1039 insertions(+), 16 deletions(-)

diff --git a/docs/content/docs/connectors/datastream/dynamodb.md b/docs/content/docs/connectors/datastream/dynamodb.md
index d0f6d7c..834ac66 100644
--- a/docs/content/docs/connectors/datastream/dynamodb.md
+++ b/docs/content/docs/connectors/datastream/dynamodb.md
@@ -133,7 +133,10 @@ Flink's DynamoDB sink is created by using the static builder `DynamoDBSink.<Inpu
 ## Element Converter
 
 An element converter is used to convert from a record in the DataStream to a DynamoDbWriteRequest which the sink will write to the destination DynamoDB table. The DynamoDB sink allows the user to supply a custom element converter, or use the provided
-`DynamoDbBeanElementConverter` when you are working with `@DynamoDbBean` objects. For more information on supported
+`DefaultDynamoDbElementConverter`, which extracts the item schema from the element class. This requires the element class to be a composite type (i.e. a POJO, Tuple, or Row). If the TypeInformation of the elements is available, the schema can be constructed eagerly by using `DynamoDbTypeInformedElementConverter`, as in `new DynamoDbTypeInformedElementConverter(TypeInformation.of(MyPojo.class))`.
+
+
+Alternatively, when you are working with `@DynamoDbBean` objects, you can use `DynamoDbBeanElementConverter`. For more information on supported
 annotations see [here](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/examples-dynamodb-enhanced.html#dynamodb-enhanced-mapper-tableschema).
 
 A sample application using a custom `ElementConverter` can be found [here](https://github.com/apache/flink-connector-aws/blob/main/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/examples/SinkIntoDynamoDb.java).
 A sample application using the `DynamoDbBeanElementConverter` can be found [here](https://github.com/apache/flink-connector-aws/blob/main/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dyna [...]
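
For orientation (not part of the patch): with this change the sink can be built without an explicit element converter. A minimal sketch, assuming a hypothetical composite element type `MyPojo` and table name:

    DynamoDbSink<MyPojo> sink =
            DynamoDbSink.<MyPojo>builder()
                    .setTableName("orders") // hypothetical table name
                    // no setElementConverter(...) call: the sink falls back to
                    // DefaultDynamoDbElementConverter, which derives the item
                    // schema from MyPojo's composite TypeInformation
                    .build();
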
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DefaultDynamoDbElementConverter.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DefaultDynamoDbElementConverter.java
new file mode 100644
index 0000000..7e0f911
--- /dev/null
+++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DefaultDynamoDbElementConverter.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.dynamodb.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+
+/**
+ * Default implementation of {@link ElementConverter} that lazily falls back to {@link
+ * DynamoDbTypeInformedElementConverter}.
+ */
+@PublicEvolving
+public class DefaultDynamoDbElementConverter<T>
+        implements ElementConverter<T, DynamoDbWriteRequest> {
+
+    private ElementConverter<T, DynamoDbWriteRequest> elementConverter;
+
+    public DefaultDynamoDbElementConverter() {}
+
+    @Override
+    public DynamoDbWriteRequest apply(T t, SinkWriter.Context context) {
+        if (elementConverter == null) {
+            TypeInformation<T> typeInfo = (TypeInformation<T>) TypeInformation.of(t.getClass());
+            if (!(typeInfo instanceof CompositeType<?>)) {
+                throw new IllegalArgumentException("The input type must be a CompositeType.");
+            }
+
+            elementConverter =
+                    new DynamoDbTypeInformedElementConverter<>((CompositeType<T>) typeInfo);
+        }
+
+        return elementConverter.apply(t, context);
+    }
+
+    @Override
+    public void open(Sink.InitContext context) {}
+}
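
A short sketch of the lazy fallback implemented above, using the `Order` test POJO from this repository (constructor shape as in the tests further below):

    DefaultDynamoDbElementConverter<Order> converter = new DefaultDynamoDbElementConverter<>();
    // The first apply() inspects the element's class, checks that it is a
    // CompositeType, and builds a DynamoDbTypeInformedElementConverter once;
    // subsequent calls reuse the cached converter.
    DynamoDbWriteRequest request = converter.apply(new Order("order-1", 1, 100.0), null);
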
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkBuilder.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkBuilder.java
index b2592b6..968c206 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkBuilder.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkBuilder.java
@@ -58,6 +58,7 @@ import java.util.Properties;
  * <p>If the following parameters are not set in this builder, the following defaults will be used:
  *
  * <ul>
 + *   <li>{@code elementConverter} will be {@link DefaultDynamoDbElementConverter}
  *   <li>{@code maxBatchSize} will be 25
  *   <li>{@code maxInFlightRequests} will be 50
  *   <li>{@code maxBufferedRequests} will be 10000
@@ -145,7 +146,8 @@ public class DynamoDbSinkBuilder<InputT>
     @Override
     public DynamoDbSink<InputT> build() {
         return new DynamoDbSink<>(
-                elementConverter,
+                Optional.ofNullable(elementConverter)
+                        .orElse(new DefaultDynamoDbElementConverter<>()),
                Optional.ofNullable(getMaxBatchSize()).orElse(DEFAULT_MAX_BATCH_SIZE),
                 Optional.ofNullable(getMaxInFlightRequests())
                         .orElse(DEFAULT_MAX_IN_FLIGHT_REQUESTS),
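
A sketch of the resulting builder semantics: an explicitly configured converter still takes precedence via `Optional.ofNullable(...)`, and the default only applies when none is set (`MyBean` is an illustrative `@DynamoDbBean` class):

    DynamoDbSink<MyBean> sink =
            DynamoDbSink.<MyBean>builder()
                    // an explicit converter wins over the default
                    .setElementConverter(new DynamoDbBeanElementConverter<>(MyBean.class))
                    .setTableName("beans") // hypothetical table name
                    .build();
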
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbTypeInformedElementConverter.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbTypeInformedElementConverter.java
new file mode 100644
index 0000000..4978f46
--- /dev/null
+++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbTypeInformedElementConverter.java
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.dynamodb.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.NumericTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.dynamodb.table.converter.ArrayAttributeConverterProvider;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.enhanced.dynamodb.AttributeConverter;
+import software.amazon.awssdk.enhanced.dynamodb.AttributeConverterProvider;
+import software.amazon.awssdk.enhanced.dynamodb.AttributeValueType;
+import software.amazon.awssdk.enhanced.dynamodb.EnhancedType;
+import software.amazon.awssdk.enhanced.dynamodb.TableSchema;
+import software.amazon.awssdk.enhanced.dynamodb.internal.mapper.BeanAttributeGetter;
+import software.amazon.awssdk.enhanced.dynamodb.mapper.StaticTableSchema;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+
+import java.beans.BeanInfo;
+import java.beans.IntrospectionException;
+import java.beans.Introspector;
+import java.beans.PropertyDescriptor;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Function;
+
+/**
+ * An {@link ElementConverter} that converts an element to a {@link DynamoDbWriteRequest} using
+ * the provided TypeInformation.
+ */
+@PublicEvolving
+public class DynamoDbTypeInformedElementConverter<T>
+        implements ElementConverter<T, DynamoDbWriteRequest> {
+
+    private final CompositeType<T> typeInfo;
+    private final boolean ignoreNulls;
+    private final TableSchema<T> tableSchema;
+
+    /**
+     * Creates a {@link DynamoDbTypeInformedElementConverter} that converts an element to a {@link
+     * DynamoDbWriteRequest} using the provided {@link CompositeType}. Usage: {@code new
+     * DynamoDbTypeInformedElementConverter<>(TypeInformation.of(MyPojoClass.class))}
+     *
+     * @param typeInfo The {@link CompositeType} that provides the type information for the element.
+     */
+    public DynamoDbTypeInformedElementConverter(CompositeType<T> typeInfo) {
+        this(typeInfo, true);
+    }
+
+    public DynamoDbTypeInformedElementConverter(CompositeType<T> typeInfo, boolean ignoreNulls) {
+
+        try {
+            this.typeInfo = typeInfo;
+            this.ignoreNulls = ignoreNulls;
+            this.tableSchema = createTableSchema(typeInfo);
+        } catch (IntrospectionException | IllegalStateException | IllegalArgumentException e) {
+            throw new FlinkRuntimeException("Failed to extract DynamoDb table schema", e);
+        }
+    }
+
+    @Override
+    public DynamoDbWriteRequest apply(T input, SinkWriter.Context context) {
+        Preconditions.checkNotNull(tableSchema, "TableSchema is not initialized");
+        try {
+            return DynamoDbWriteRequest.builder()
+                    .setType(DynamoDbWriteRequestType.PUT)
+                    .setItem(tableSchema.itemToMap(input, ignoreNulls))
+                    .build();
+        } catch (ClassCastException | IllegalArgumentException e) {
+            throw new FlinkRuntimeException(
+                    String.format(
+                            "Failed to convert %s to DynamoDbWriteRequest 
using %s",
+                            input, typeInfo),
+                    e);
+        }
+    }
+
+    private <AttributeT> TableSchema<AttributeT> createTableSchema(
+            CompositeType<AttributeT> typeInfo) throws IntrospectionException {
+        if (typeInfo instanceof RowTypeInfo) {
+            return (TableSchema<AttributeT>) createTableSchemaFromRowType((RowTypeInfo) typeInfo);
+        } else if (typeInfo instanceof PojoTypeInfo<?>) {
+            return createTableSchemaFromPojo((PojoTypeInfo<AttributeT>) typeInfo);
+        } else if (typeInfo instanceof TupleTypeInfo<?>) {
+            return createTableSchemaFromTuple((TupleTypeInfo<?>) typeInfo);
+        } else {
+            throw new IllegalArgumentException(String.format("Unsupported TypeInfo %s", typeInfo));
+        }
+    }
+
+    private TableSchema<Row> createTableSchemaFromRowType(RowTypeInfo typeInfo) {
+        StaticTableSchema.Builder<Row> tableSchemaBuilder =
+                StaticTableSchema.builder(typeInfo.getTypeClass());
+
+        String[] fieldNames = typeInfo.getFieldNames();
+        for (int i = 0; i < typeInfo.getArity(); i++) {
+            TypeInformation<?> fieldType = typeInfo.getTypeAt(i);
+            int finalI = i;
+            addAttribute(
+                    tableSchemaBuilder,
+                    fieldNames[finalI],
+                    (AttributeT) -> AttributeT.getField(finalI),
+                    (TypeInformation<? super Object>) fieldType);
+        }
+
+        return tableSchemaBuilder.build();
+    }
+
+    private <AttributeT> TableSchema<AttributeT> createTableSchemaFromTuple(
+            TupleTypeInfo<?> typeInfo) {
+        TypeInformation<?>[] fieldTypes = typeInfo.getFieldTypes();
+        String[] fieldNames = typeInfo.getFieldNames();
+
+        StaticTableSchema.Builder<?> tableSchemaBuilder =
+                StaticTableSchema.builder(typeInfo.getTypeClass());
+        for (int i = 0; i < fieldNames.length; i++) {
+            int finalI = i;
+            addAttribute(
+                    tableSchemaBuilder,
+                    fieldNames[finalI],
+                    (tuple) -> ((Tuple) tuple).getField(finalI),
+                    (TypeInformation<?>) fieldTypes[i]);
+        }
+
+        return (TableSchema<AttributeT>) tableSchemaBuilder.build();
+    }
+
+    private <AttributeT> TableSchema<AttributeT> createTableSchemaFromPojo(
+            PojoTypeInfo<AttributeT> typeInfo) throws IntrospectionException {
+        StaticTableSchema.Builder<AttributeT> tableSchemaBuilder =
+                StaticTableSchema.builder(typeInfo.getTypeClass());
+        BeanInfo beanInfo = Introspector.getBeanInfo(typeInfo.getTypeClass());
+        PropertyDescriptor[] propertyDescriptors = beanInfo.getPropertyDescriptors();
+        for (PropertyDescriptor propertyDescriptor : propertyDescriptors) {
+            Set<String> fieldNames = new HashSet<>(Arrays.asList(typeInfo.getFieldNames()));
+            if (!fieldNames.contains(propertyDescriptor.getName())) {
+                // Skip properties that are not part of the PojoTypeInfo
+                continue;
+            }
+
+            TypeInformation<?> fieldInfo = typeInfo.getTypeAt(propertyDescriptor.getName());
+            addAttribute(
+                    tableSchemaBuilder,
+                    propertyDescriptor.getName(),
+                    BeanAttributeGetter.create(
+                            typeInfo.getTypeClass(), propertyDescriptor.getReadMethod()),
+                    fieldInfo);
+        }
+
+        return tableSchemaBuilder.build();
+    }
+
+    private <T, AttributeT> void addAttribute(
+            StaticTableSchema.Builder<T> builder,
+            String fieldName,
+            Function<T, AttributeT> getter,
+            TypeInformation<AttributeT> typeInfo) {
+        builder.addAttribute(
+                typeInfo.getTypeClass(),
+                a ->
+                        a.name(fieldName)
+                                .getter(getter)
+                                .setter((e, o) -> {})
+                                .attributeConverter(getAttributeConverter(typeInfo)));
+    }
+
+    private <AttributeT> AttributeConverter<AttributeT> getAttributeConverter(
+            TypeInformation<AttributeT> typeInfo) {
+        if (typeInfo instanceof BasicTypeInfo) {
+            return AttributeConverterProvider.defaultProvider()
+                    .converterFor(EnhancedType.of(typeInfo.getTypeClass()));
+        } else if (typeInfo.equals(PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO)) {
+            return getAttributeConverter(
+                    AttributeValueType.B,
+                    bytes ->
+                            bytes instanceof SdkBytes
+                                    ? AttributeValue.fromB((SdkBytes) bytes)
+                                    : AttributeValue.fromB(SdkBytes.fromByteArray((byte[]) bytes)));
+        } else if (typeInfo instanceof BasicArrayTypeInfo) {
+            BasicArrayTypeInfo<AttributeT, ?> basicArrayTypeInfo =
+                    (BasicArrayTypeInfo<AttributeT, ?>) typeInfo;
+            if (basicArrayTypeInfo.getComponentInfo().equals(BasicTypeInfo.STRING_TYPE_INFO)) {
+                return getAttributeConverter(
+                        AttributeValueType.SS,
+                        array -> AttributeValue.fromSs(Arrays.asList((String[]) array)));
+            } else if (basicArrayTypeInfo.getComponentInfo() instanceof NumericTypeInfo) {
+                return getAttributeConverter(
+                        AttributeValueType.NS,
+                        array ->
+                                AttributeValue.fromNs(
+                                        convertObjectArrayToStringList((Object[]) array)));
+            }
+
+            return new ArrayAttributeConverterProvider()
+                    .converterFor(EnhancedType.of(typeInfo.getTypeClass()));
+        } else if (typeInfo instanceof ObjectArrayTypeInfo) {
+            return getObjectArrayTypeConverter((ObjectArrayTypeInfo<AttributeT, ?>) typeInfo);
+        } else if (typeInfo instanceof PrimitiveArrayTypeInfo) {
+            PrimitiveArrayTypeInfo<AttributeT> primitiveArrayTypeInfo =
+                    (PrimitiveArrayTypeInfo<AttributeT>) typeInfo;
+            if (primitiveArrayTypeInfo.getComponentType() instanceof NumericTypeInfo) {
+                return getAttributeConverter(
+                        AttributeValueType.NS,
+                        array -> AttributeValue.fromNs(convertPrimitiveArrayToStringList(array)));
+            } else {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "Unsupported primitive array typeInfo %s",
+                                primitiveArrayTypeInfo.getComponentType()));
+            }
+        } else if (typeInfo instanceof TupleTypeInfo<?>) {
+            return (AttributeConverter<AttributeT>)
+                    getTupleTypeConverter((TupleTypeInfo<?>) typeInfo);
+        } else if (typeInfo instanceof CompositeType) {
+            try {
+                TableSchema<AttributeT> schema =
+                        createTableSchema((CompositeType<AttributeT>) typeInfo);
+                return getAttributeConverter(
+                        AttributeValueType.M,
+                        o -> AttributeValue.fromM(schema.itemToMap(o, false)));
+            } catch (IntrospectionException e) {
+                throw new FlinkRuntimeException("Failed to extract nested 
table schema", e);
+            }
+        } else {
+            throw new IllegalArgumentException(String.format("Unsupported 
TypeInfo %s", typeInfo));
+        }
+    }
+
+    private <TupleT extends Tuple> AttributeConverter<TupleT> getTupleTypeConverter(
+            TupleTypeInfo<TupleT> typeInfo) {
+        AttributeConverter<?>[] tupleFieldConverters =
+                new AttributeConverter<?>[typeInfo.getArity()];
+        for (int i = 0; i < typeInfo.getArity(); i++) {
+            tupleFieldConverters[i] = getAttributeConverter(typeInfo.getTypeAt(i));
+        }
+
+        return getAttributeConverter(
+                AttributeValueType.L,
+                tuple -> {
+                    List<AttributeValue> attributeValues = new ArrayList<>();
+                    for (int i = 0; i < typeInfo.getArity(); i++) {
+                        attributeValues.add(
+                                tupleFieldConverters[i].transformFrom(tuple.getField(i)));
+                    }
+                    return AttributeValue.fromL(attributeValues);
+                });
+    }
+
+    private <ArrayT, AttributeT> AttributeConverter<ArrayT> getObjectArrayTypeConverter(
+            ObjectArrayTypeInfo<ArrayT, AttributeT> typeInfo) {
+        AttributeConverter<AttributeT> componentAttributeConverter =
+                getAttributeConverter(typeInfo.getComponentInfo());
+        return getAttributeConverter(
+                AttributeValueType.L,
+                array -> {
+                    AttributeT[] attrArray = (AttributeT[]) array;
+                    List<AttributeValue> attributeValues = new ArrayList<>();
+                    for (AttributeT attr : attrArray) {
+                        attributeValues.add(componentAttributeConverter.transformFrom(attr));
+                    }
+                    return AttributeValue.fromL(attributeValues);
+                });
+    }
+
+    private List<String> convertObjectArrayToStringList(Object[] objectArray) {
+        List<String> stringList = new ArrayList<>();
+        for (Object object : objectArray) {
+            stringList.add(object.toString());
+        }
+        return stringList;
+    }
+
+    private List<String> convertPrimitiveArrayToStringList(Object objectArray) {
+        if (objectArray instanceof int[]) {
+            int[] intArray = (int[]) objectArray;
+            List<String> stringList = new ArrayList<>();
+            for (int i : intArray) {
+                stringList.add(Integer.toString(i));
+            }
+
+            return stringList;
+        } else if (objectArray instanceof double[]) {
+            double[] doubleArray = (double[]) objectArray;
+            List<String> stringList = new ArrayList<>();
+            for (double d : doubleArray) {
+                stringList.add(Double.toString(d));
+            }
+
+            return stringList;
+        } else if (objectArray instanceof long[]) {
+            long[] longArray = (long[]) objectArray;
+            List<String> stringList = new ArrayList<>();
+            for (long l : longArray) {
+                stringList.add(Long.toString(l));
+            }
+
+            return stringList;
+        } else if (objectArray instanceof float[]) {
+            float[] floatArray = (float[]) objectArray;
+            List<String> stringList = new ArrayList<>();
+            for (float f : floatArray) {
+                stringList.add(Float.toString(f));
+            }
+
+            return stringList;
+        } else if (objectArray instanceof short[]) {
+            short[] shortArray = (short[]) objectArray;
+            List<String> stringList = new ArrayList<>();
+            for (short s : shortArray) {
+                stringList.add(Short.toString(s));
+            }
+
+            return stringList;
+        } else {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "Unsupported primitive typeInfo %s",
+                            objectArray.getClass().getComponentType()));
+        }
+    }
+
+    private <AttributeT> AttributeConverter<AttributeT> getAttributeConverter(
+            AttributeValueType attributeValueType,
+            Function<AttributeT, AttributeValue> transformer) {
+        return new AttributeConverter<AttributeT>() {
+            @Override
+            public AttributeValue transformFrom(AttributeT attribute) {
+                return transformer.apply(attribute);
+            }
+
+            @Override
+            public AttributeT transformTo(AttributeValue attributeValue) {
+                return null;
+            }
+
+            @Override
+            public EnhancedType<AttributeT> type() {
+                return null;
+            }
+
+            @Override
+            public AttributeValueType attributeValueType() {
+                return attributeValueType;
+            }
+        };
+    }
+}
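
For context, a minimal sketch of eager schema construction with explicit type information (mirroring the tests below); unlike the lazy default converter, unsupported types fail fast in the constructor:

    DynamoDbTypeInformedElementConverter<Tuple> converter =
            new DynamoDbTypeInformedElementConverter<>(
                    new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO));
    // Yields a PUT request with attributes named "f0" (string) and "f1" (number).
    DynamoDbWriteRequest request = converter.apply(Tuple2.of("key", 10), null);
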
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DefaultDynamoDbElementConverterTest.java b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DefaultDynamoDbElementConverterTest.java
new file mode 100644
index 0000000..c8c6fb4
--- /dev/null
+++ b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DefaultDynamoDbElementConverterTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.dynamodb.sink;
+
+import org.apache.flink.connector.dynamodb.util.Order;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+
+/** Test for {@link DefaultDynamoDbElementConverter}. */
+class DefaultDynamoDbElementConverterTest {
+
+    @Test
+    void defaultConverterFallsBackToInformedConverter() {
+        DefaultDynamoDbElementConverter<Order> converter = new DefaultDynamoDbElementConverter<>();
+        Order order = new Order("order-1", 1, 100.0);
+
+        DynamoDbWriteRequest request = converter.apply(order, null);
+        assertThat(converter).hasNoNullFieldsOrProperties();
+        assertThat(request.getItem()).isNotNull();
+        assertThat(request.getItem().get("orderId").s()).isEqualTo("order-1");
+        assertThat(request.getItem().get("quantity").n()).isEqualTo("1");
+        assertThat(request.getItem().get("total").n()).isEqualTo("100.0");
+    }
+
+    @Test
+    void defaultConverterThrowsExceptionForNonCompositeType() {
+        DefaultDynamoDbElementConverter<String> converter = new DefaultDynamoDbElementConverter<>();
+        String str = "test";
+
+        assertThatExceptionOfType(IllegalArgumentException.class)
+                .isThrownBy(() -> converter.apply(str, null))
+                .withMessage("The input type must be a CompositeType.");
+    }
+}
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkTest.java b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkTest.java
index 9cf198e..26f75b0 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkTest.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkTest.java
@@ -41,23 +41,19 @@ public class DynamoDbSinkTest {
 
     @Test
     public void testSuccessfullyCreateWithMinimalConfiguration() {
-        DynamoDbSink.<Map<String, AttributeValue>>builder()
-                .setElementConverter(new TestDynamoDbElementConverter())
-                .setTableName("test_table")
-                .build();
+        DynamoDbSink.<Map<String, AttributeValue>>builder().setTableName("test_table").build();
     }
 
     @Test
-    public void testElementConverterRequired() {
-        assertThatExceptionOfType(NullPointerException.class)
-                .isThrownBy(
-                        () ->
-                                DynamoDbSink.builder()
-                                        .setTableName("test_table")
-                                        .setFailOnError(true)
-                                        .build())
-                .withMessageContaining(
-                        "ElementConverter must be not null when initializing 
the AsyncSinkBase.");
+    public void testElementConverterUsesDefaultConverterIfNotSet() {
+        DynamoDbSink<String> sink =
+                DynamoDbSink.<String>builder()
+                        .setTableName("test_table")
+                        .setFailOnError(true)
+                        .build();
+        assertThat(sink)
+                .extracting("elementConverter")
+                .isInstanceOf(DefaultDynamoDbElementConverter.class);
     }
 
     @Test
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbTypeInformedElementConverterTest.java b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbTypeInformedElementConverterTest.java
new file mode 100644
index 0000000..66d251b
--- /dev/null
+++ b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbTypeInformedElementConverterTest.java
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.dynamodb.sink;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.connector.dynamodb.util.ComplexPayload;
+import org.apache.flink.connector.dynamodb.util.Order;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.types.RowUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.connector.dynamodb.sink.DynamoDbWriteRequestType.PUT;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link DynamoDbTypeInformedElementConverter}. */
+public class DynamoDbTypeInformedElementConverterTest {
+
+    @Test
+    void simpleTypeConversion() {
+        DynamoDbTypeInformedElementConverter<Order> elementConverter =
+                new DynamoDbTypeInformedElementConverter<>(
+                        (CompositeType<Order>) TypeInformation.of(Order.class));
+
+        Order order = new Order("orderId", 1, 2.0);
+
+        DynamoDbWriteRequest actual = elementConverter.apply(order, null);
+
+        assertThat(actual.getType()).isEqualTo(PUT);
+        assertThat(actual.getItem()).containsOnlyKeys("orderId", "quantity", 
"total");
+        assertThat(actual.getItem().get("orderId").s()).isEqualTo("orderId");
+        assertThat(actual.getItem().get("quantity").n()).isEqualTo("1");
+        assertThat(actual.getItem().get("total").n()).isEqualTo("2.0");
+    }
+
+    @Test
+    void complexTypeConversion() {
+        TypeInformation<ComplexPayload> typeInformation = TypeInformation.of(ComplexPayload.class);
+        DynamoDbTypeInformedElementConverter<ComplexPayload> elementConverter =
+                new DynamoDbTypeInformedElementConverter<>(
+                        (CompositeType<ComplexPayload>) typeInformation);
+
+        ComplexPayload payload =
+                new ComplexPayload(
+                        "stringFieldVal",
+                        new String[] {"stringArrayFieldVal1", "stringArrayFieldVal2"},
+                        new int[] {10, 20},
+                        new ComplexPayload.InnerPayload(true, new byte[] {1, 0, 10}),
+                        Tuple2.of(1, "tuple2FieldVal"));
+
+        DynamoDbWriteRequest actual = elementConverter.apply(payload, null);
+
+        assertThat(actual.getType()).isEqualTo(PUT);
+        assertThat(actual.getItem())
+                .containsOnlyKeys(
+                        "stringField",
+                        "stringArrayField",
+                        "intArrayField",
+                        "innerPayload",
+                        "tupleField");
+        assertThat(actual.getItem().get("stringArrayField").ss())
+                .containsExactly("stringArrayFieldVal1", 
"stringArrayFieldVal2");
+        
assertThat(actual.getItem().get("intArrayField").ns()).containsExactly("10", 
"20");
+
+        // verify tupleField
+        assertThat(actual.getItem().get("tupleField").l()).isNotNull();
+        assertThat(actual.getItem().get("tupleField").l()).hasSize(2);
+        assertThat(actual.getItem().get("tupleField").l().get(0).n()).isEqualTo("1");
+        assertThat(actual.getItem().get("tupleField").l().get(1).s()).isEqualTo("tuple2FieldVal");
+
+        // verify innerPayload
+        assertThat(actual.getItem().get("innerPayload").m()).isNotNull();
+        Map<String, AttributeValue> innerPayload = actual.getItem().get("innerPayload").m();
+
+        assertThat(innerPayload).containsOnlyKeys("primitiveBooleanField", 
"byteArrayField");
+        assertThat(innerPayload.get("primitiveBooleanField").bool()).isTrue();
+        assertThat(innerPayload.get("byteArrayField").b())
+                .isEqualTo(SdkBytes.fromByteArray(new byte[] {1, 0, 10}));
+    }
+
+    @Test
+    void convertTupleType() {
+        DynamoDbTypeInformedElementConverter<Tuple> elementConverter =
+                new DynamoDbTypeInformedElementConverter<>(
+                        new TupleTypeInfo<>(
+                                BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO));
+
+        Tuple element = Tuple2.of("stringVal", 10);
+
+        DynamoDbWriteRequest actual = elementConverter.apply(element, null);
+        assertThat(actual.getItem()).containsOnlyKeys("f0", "f1");
+        assertThat(actual.getItem().get("f0").s()).isEqualTo("stringVal");
+        assertThat(actual.getItem().get("f1").n()).isEqualTo("10");
+    }
+
+    @Test
+    void convertRowType() {
+        RowTypeInfo rowTypeInfo =
+                new RowTypeInfo(
+                        new TypeInformation[] {
+                            BasicTypeInfo.STRING_TYPE_INFO,
+                            BasicTypeInfo.INT_TYPE_INFO,
+                            BasicTypeInfo.DOUBLE_TYPE_INFO
+                        },
+                        new String[] {"stringRowFiled", "IntRowField", 
"DoubleRowField"});
+        DynamoDbTypeInformedElementConverter<Row> elementConverter =
+                new DynamoDbTypeInformedElementConverter<>(rowTypeInfo);
+
+        LinkedHashMap<String, Integer> rowMap = new LinkedHashMap<>();
+        rowMap.put("stringRowFiled", 1);
+        rowMap.put("IntRowField", 2);
+        rowMap.put("DoubleRowField", 3);
+        Row element =
+                RowUtils.createRowWithNamedPositions(
+                        RowKind.INSERT, new Object[] {"stringVal", 10, 20.0}, rowMap);
+
+        DynamoDbWriteRequest actual = elementConverter.apply(element, null);
+        assertThat(actual.getItem())
+                .containsOnlyKeys("stringRowField", "IntRowField", "DoubleRowField");
+        assertThat(actual.getItem().get("stringRowField").s()).isEqualTo("stringVal");
+        assertThat(actual.getItem().get("IntRowField").n()).isEqualTo("10");
+        assertThat(actual.getItem().get("DoubleRowField").n()).isEqualTo("20.0");
+    }
+
+    @Test
+    void convertObjectArray() {
+        DynamoDbTypeInformedElementConverter<Tuple> elementConverter =
+                new DynamoDbTypeInformedElementConverter<>(
+                        new TupleTypeInfo<>(
+                                ObjectArrayTypeInfo.getInfoFor(TypeInformation.of(Order.class))));
+
+        Tuple element =
+                Tuple1.of(
+                        new Order[] {new Order("orderId1", 1, 2.0), new Order("orderId2", 3, 4.0)});
+
+        DynamoDbWriteRequest actual = elementConverter.apply(element, null);
+        assertThat(actual.getItem()).containsOnlyKeys("f0");
+        assertThat(actual.getItem().get("f0").l()).hasSize(2);
+        assertThat(actual.getItem().get("f0").l().get(0).m())
+                .containsOnlyKeys("orderId", "quantity", "total");
+        
assertThat(actual.getItem().get("f0").l().get(0).m().get("orderId").s())
+                .isEqualTo("orderId1");
+        
assertThat(actual.getItem().get("f0").l().get(0).m().get("quantity").n()).isEqualTo("1");
+        
assertThat(actual.getItem().get("f0").l().get(0).m().get("total").n()).isEqualTo("2.0");
+        assertThat(actual.getItem().get("f0").l().get(1).m())
+                .containsOnlyKeys("orderId", "quantity", "total");
+        
assertThat(actual.getItem().get("f0").l().get(1).m().get("orderId").s())
+                .isEqualTo("orderId2");
+        
assertThat(actual.getItem().get("f0").l().get(1).m().get("quantity").n()).isEqualTo("3");
+        
assertThat(actual.getItem().get("f0").l().get(1).m().get("total").n()).isEqualTo("4.0");
+    }
+
+    @Test
+    void convertLongPrimitiveArray() {
+        DynamoDbTypeInformedElementConverter<Tuple> elementConverter =
+                new DynamoDbTypeInformedElementConverter<>(
+                        new TupleTypeInfo<>(PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO));
+
+        Tuple element = Tuple1.of(new long[] {1, 2, 3});
+
+        DynamoDbWriteRequest actual = elementConverter.apply(element, null);
+        assertThat(actual.getItem()).containsOnlyKeys("f0");
+        assertThat(actual.getItem().get("f0").ns()).containsExactly("1", "2", 
"3");
+    }
+
+    @Test
+    void convertFloatPrimitiveArray() {
+        DynamoDbTypeInformedElementConverter<Tuple> elementConverter =
+                new DynamoDbTypeInformedElementConverter<>(
+                        new TupleTypeInfo<>(
+                                PrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO));
+
+        Tuple element = Tuple1.of(new float[] {1.0f, 2.0f, 3.0f});
+
+        DynamoDbWriteRequest actual = elementConverter.apply(element, null);
+        assertThat(actual.getItem()).containsOnlyKeys("f0");
+        assertThat(actual.getItem().get("f0").ns()).containsExactly("1.0", 
"2.0", "3.0");
+    }
+
+    @Test
+    void convertDoublePrimitiveArray() {
+        DynamoDbTypeInformedElementConverter<Tuple> elementConverter =
+                new DynamoDbTypeInformedElementConverter<>(
+                        new TupleTypeInfo<>(
+                                PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO));
+
+        Tuple element = Tuple1.of(new double[] {1.0, 2.0, 3.0});
+
+        DynamoDbWriteRequest actual = elementConverter.apply(element, null);
+        assertThat(actual.getItem()).containsOnlyKeys("f0");
+        assertThat(actual.getItem().get("f0").ns()).containsExactly("1.0", 
"2.0", "3.0");
+    }
+
+    @Test
+    void convertShortPrimitiveArray() {
+        DynamoDbTypeInformedElementConverter<Tuple> elementConverter =
+                new DynamoDbTypeInformedElementConverter<>(
+                        new TupleTypeInfo<>(
+                                PrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO));
+
+        Tuple element = Tuple1.of(new short[] {1, 2, 3});
+
+        DynamoDbWriteRequest actual = elementConverter.apply(element, null);
+        assertThat(actual.getItem()).containsOnlyKeys("f0");
+        assertThat(actual.getItem().get("f0").ns()).containsExactly("1", "2", 
"3");
+    }
+
+    @Test
+    void unsupportedTypeIsWrappedByFlinkException() {
+
+        Assertions.assertThatExceptionOfType(FlinkRuntimeException.class)
+                .isThrownBy(
+                        () ->
+                                new DynamoDbTypeInformedElementConverter<>(
+                                        new TupleTypeInfo<>(
+                                                BasicTypeInfo.DATE_TYPE_INFO,
+                                                BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO)))
+                .withCauseInstanceOf(IllegalStateException.class)
+                .withMessageContaining("Failed to extract DynamoDb table schema");
+    }
+
+    @Test
+    void unmatchedTypeIsWrappedByFlinkException() {
+        DynamoDbTypeInformedElementConverter<Tuple> elementConverter =
+                new DynamoDbTypeInformedElementConverter<>(
+                        new TupleTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO));
+
+        Assertions.assertThatExceptionOfType(FlinkRuntimeException.class)
+                .isThrownBy(() -> elementConverter.apply(Tuple1.of("nan"), null))
+                .withCauseInstanceOf(ClassCastException.class);
+    }
+
+    @Test
+    void convertOrderToDynamoDbWriteRequestIgnoresNullByDefault() {
+        DynamoDbTypeInformedElementConverter<Order> elementConverter =
+                new DynamoDbTypeInformedElementConverter<>(
+                        (CompositeType<Order>) TypeInformation.of(Order.class));
+        Order order = new Order(null, 1, 2.0);
+
+        DynamoDbWriteRequest actual = elementConverter.apply(order, null);
+        assertThat(actual.getItem()).containsOnlyKeys("quantity", "total");
+    }
+
+    @Test
+    void convertOrderToDynamoDbWriteRequestWritesNullIfConfigured() {
+        DynamoDbTypeInformedElementConverter<Order> elementConverter =
+                new DynamoDbTypeInformedElementConverter<>(
+                        (CompositeType<Order>) TypeInformation.of(Order.class), false);
+        Order order = new Order(null, 1, 2.0);
+
+        DynamoDbWriteRequest actual = elementConverter.apply(order, null);
+
+        assertThat(actual.getItem()).containsOnlyKeys("orderId", "quantity", 
"total");
+        assertThat(actual.getItem().get("orderId").nul()).isTrue();
+    }
+
+    @Test
+    void convertSdkBytesAsTypeArray() {
+        DynamoDbTypeInformedElementConverter<Tuple> elementConverter =
+                new DynamoDbTypeInformedElementConverter<>(
+                        new TupleTypeInfo<>(PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO));
+
+        Tuple element = Tuple1.of(SdkBytes.fromByteArray(new byte[] {1, 0, 10}));
+
+        DynamoDbWriteRequest actual = elementConverter.apply(element, null);
+        assertThat(actual.getItem()).containsOnlyKeys("f0");
+        assertThat(actual.getItem().get("f0").b())
+                .isEqualTo(SdkBytes.fromByteArray(new byte[] {1, 0, 10}));
+    }
+
+    @Test
+    void convertInvalidByteArrayThrowsException() {
+        DynamoDbTypeInformedElementConverter<Tuple> elementConverter =
+                new DynamoDbTypeInformedElementConverter<>(
+                        new TupleTypeInfo<>(PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO));
+
+        Tuple element = Tuple1.of("invalidByteArray");
+
+        Assertions.assertThatExceptionOfType(FlinkRuntimeException.class)
+                .isThrownBy(() -> elementConverter.apply(element, null))
+                .withCauseInstanceOf(ClassCastException.class);
+    }
+
+    @Test
+    void convertUnsupportedPrimitiveArrayThrowsException() {
+        Assertions.assertThatExceptionOfType(FlinkRuntimeException.class)
+                .isThrownBy(
+                        () ->
+                                new DynamoDbTypeInformedElementConverter<>(
+                                        new TupleTypeInfo<>(
+                                                PrimitiveArrayTypeInfo
+                                                        .BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO)))
+                .withCauseInstanceOf(IllegalArgumentException.class)
+                .havingCause()
+                .withMessageContaining(
+                        "Unsupported primitive array typeInfo " + 
BasicTypeInfo.BOOLEAN_TYPE_INFO);
+    }
+
+    @Test
+    void convertUnsupportedCompositeTypeThrowsException() {
+        CompositeType<Order> unsupportedCompositeType =
+                new CompositeType<Order>(Order.class) {
+                    @Override
+                    public boolean isBasicType() {
+                        return false;
+                    }
+
+                    @Override
+                    public boolean isTupleType() {
+                        return false;
+                    }
+
+                    @Override
+                    public int getArity() {
+                        return 0;
+                    }
+
+                    @Override
+                    public int getTotalFields() {
+                        return 0;
+                    }
+
+                    @Override
+                    public <T> TypeInformation<T> getTypeAt(int pos) {
+                        return null;
+                    }
+
+                    @Override
+                    protected TypeComparatorBuilder<Order> createTypeComparatorBuilder() {
+                        return null;
+                    }
+
+                    @Override
+                    public <T> TypeInformation<T> getTypeAt(String fieldExpression) {
+                        return null;
+                    }
+
+                    @Override
+                    public String[] getFieldNames() {
+                        return new String[0];
+                    }
+
+                    @Override
+                    public int getFieldIndex(String fieldName) {
+                        return 0;
+                    }
+
+                    @Override
+                    public boolean isKeyType() {
+                        return false;
+                    }
+
+                    @Override
+                    public TypeSerializer<Order> createSerializer(ExecutionConfig executionConfig) {
+                        return null;
+                    }
+
+                    @Override
+                    public Class<Order> getTypeClass() {
+                        return null;
+                    }
+
+                    @Override
+                    public void getFlatFields(String s, int i, List<FlatFieldDescriptor> list) {}
+                };
+
+        Assertions.assertThatExceptionOfType(FlinkRuntimeException.class)
+                .isThrownBy(
+                        () ->
+                                new DynamoDbTypeInformedElementConverter<>(unsupportedCompositeType)
+                                        .open(null))
+                .withCauseInstanceOf(IllegalArgumentException.class)
+                .havingCause()
+                .withMessageContaining("Unsupported TypeInfo " + 
unsupportedCompositeType);
+    }
+}
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/ComplexPayload.java b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/ComplexPayload.java
new file mode 100644
index 0000000..c2e3c33
--- /dev/null
+++ b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/ComplexPayload.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.dynamodb.util;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.connector.dynamodb.sink.DynamoDbTypeInformedElementConverter;
+
+import java.io.Serializable;
+
+/** A test POJO for use with {@link DynamoDbTypeInformedElementConverter}. */
+public class ComplexPayload implements Serializable {
+    private static final long serialVersionUID = 233624606545704853L;
+
+    private String stringField;
+    private String[] stringArrayField;
+    private int[] intArrayField;
+    private InnerPayload innerPayload;
+    private Tuple2<Integer, String> tupleField;
+
+    public ComplexPayload() {}
+
+    public ComplexPayload(
+            String stringField,
+            String[] stringArrayField,
+            int[] intArrayField,
+            InnerPayload innerPayload,
+            Tuple2<Integer, String> tupleField) {
+        this.stringField = stringField;
+        this.stringArrayField = stringArrayField;
+        this.intArrayField = intArrayField;
+        this.innerPayload = innerPayload;
+        this.tupleField = tupleField;
+    }
+
+    public String getStringField() {
+        return stringField;
+    }
+
+    public String[] getStringArrayField() {
+        return stringArrayField;
+    }
+
+    public int[] getIntArrayField() {
+        return intArrayField;
+    }
+
+    public InnerPayload getInnerPayload() {
+        return innerPayload;
+    }
+
+    public Tuple2<Integer, String> getTupleField() {
+        return tupleField;
+    }
+
+    public void setStringField(String stringField) {
+        this.stringField = stringField;
+    }
+
+    public void setStringArrayField(String[] stringArrayField) {
+        this.stringArrayField = stringArrayField;
+    }
+
+    public void setIntArrayField(int[] intArrayField) {
+        this.intArrayField = intArrayField;
+    }
+
+    public void setInnerPayload(InnerPayload innerPayload) {
+        this.innerPayload = innerPayload;
+    }
+
+    public void setTupleField(Tuple2<Integer, String> tupleField) {
+        this.tupleField = tupleField;
+    }
+
+    /** A test POJO for use as InnerPayload for {@link ComplexPayload}. */
+    public static class InnerPayload implements Serializable {
+        private static final long serialVersionUID = 3986298180012117883L;
+
+        private boolean primitiveBooleanField;
+        private byte[] byteArrayField;
+
+        public InnerPayload() {}
+
+        public InnerPayload(boolean primitiveBooleanField, byte[] byteArrayField) {
+            this.primitiveBooleanField = primitiveBooleanField;
+            this.byteArrayField = byteArrayField;
+        }
+
+        public boolean getPrimitiveBooleanField() {
+            return primitiveBooleanField;
+        }
+
+        public byte[] getByteArrayField() {
+            return byteArrayField;
+        }
+
+        public void setPrimitiveBooleanField(boolean primitiveBooleanField) {
+            this.primitiveBooleanField = primitiveBooleanField;
+        }
+
+        public void setByteArrayField(byte[] byteArrayField) {
+            this.byteArrayField = byteArrayField;
+        }
+    }
+}
