dannycranmer commented on code in PR #136:
URL: https://github.com/apache/flink-connector-aws/pull/136#discussion_r1565419487


##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbTypeInformedElementConverter.java:
##########
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.dynamodb.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.NumericTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * A {@link ElementConverter} that converts an element to a {@link DynamoDbWriteRequest} using
+ * TypeInformation provided.
+ */
+@PublicEvolving
+public class DynamoDbTypeInformedElementConverter<inputT>
+        implements ElementConverter<inputT, DynamoDbWriteRequest> {
+    private final CompositeType<inputT> typeInfo;
+
+    /**
+     * Creates a {@link DynamoDbTypeInformedElementConverter} that converts an element to a {@link
+     * DynamoDbWriteRequest} using the provided {@link CompositeType}. Usage: {@code new
+     * DynamoDbTypeInformedElementConverter<>(TypeInformation.of(MyPojoClass.class))}
+     *
+     * @param typeInfo The {@link CompositeType} that provides the type information for the element.
+     */
+    public DynamoDbTypeInformedElementConverter(CompositeType<inputT> typeInfo) {
+        this.typeInfo = typeInfo;
+    }
+
+    @Override
+    public DynamoDbWriteRequest apply(inputT input, SinkWriter.Context context) {
+        try {
+            return DynamoDbWriteRequest.builder()
+                    .setType(DynamoDbWriteRequestType.PUT)
+                    .setItem(convertElementUsingTypeInfo(input, typeInfo))
+                    .build();
+        } catch (IllegalArgumentException e) {
+            throw new FlinkRuntimeException("Couldn't convert Element to AttributeVal", e);
+        }
+    }
+
+    private <attT> Map<String, AttributeValue> convertElementUsingTypeInfo(
+            attT t, CompositeType<attT> typeInfo) {
+        Map<String, AttributeValue> map = new HashMap<>();
+        for (String fieldKey : typeInfo.getFieldNames()) {
+            TypeInformation<attT> fieldType = typeInfo.getTypeAt(fieldKey);
+            try {
+                Field field = t.getClass().getDeclaredField(fieldKey);
+                field.setAccessible(true);
+                Object fieldVal = field.get(t);
+                checkTypeCompatibility(fieldVal, fieldType);
+                attT fieldValCaster = (attT) fieldVal;
+                map.put(fieldKey, convertValue(fieldValCaster, fieldType));
+            } catch (NoSuchFieldException | IllegalAccessException e) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "Failed to extract field %s declared in TypeInfo "
+                                        + "from Object %s",
+                                fieldKey, t),
+                        e);
+            }
+        }
+
+        return map;
+    }
+
+    private <attrT> AttributeValue convertValue(
+            attrT attribute, TypeInformation<attrT> objectTypeInformation) {
+        if (attribute == null) {
+            return AttributeValue.builder().nul(true).build();

Review Comment:
   Not sure this is the correct thing to do here? I believe a common practice
is to omit null fields rather than setting them to null. Maybe this could be
a configuration on the element converter?
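   One possible shape for that configuration (a minimal sketch; the
`ignoreNulls` flag and helper method are hypothetical, not part of this PR):
   
   ```
   // Hypothetical converter option: when true, null fields are omitted from
   // the item instead of being written as DynamoDB NULL attributes.
   private final boolean ignoreNulls;
   
   private void putNullableAttribute(
           Map<String, AttributeValue> item, String fieldKey, AttributeValue converted) {
       if (converted == null) {
           if (!ignoreNulls) {
               item.put(fieldKey, AttributeValue.builder().nul(true).build());
           }
           return; // with ignoreNulls set, the attribute is omitted entirely
       }
       item.put(fieldKey, converted);
   }
   ```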



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbTypeInformedElementConverter.java:
##########
@@ -0,0 +1,245 @@
[… license header, imports, and code identical to the first quote of this file above …]
+    private <attrT> AttributeValue convertValue(
+            attrT attribute, TypeInformation<attrT> objectTypeInformation) {
+        if (attribute == null) {
+            return AttributeValue.builder().nul(true).build();
+        } else if (objectTypeInformation.isBasicType()) {
+            return convertBasicTypeObject(attribute, (BasicTypeInfo<?>) objectTypeInformation);
+        } else if (objectTypeInformation.equals(
+                PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO)) {
+            // special handling for byte array as it has special support in DynamoDB
+            if (attribute instanceof byte[]) {
+                return AttributeValue.fromB(SdkBytes.fromByteArray((byte[]) attribute));
+            } else if (attribute instanceof SdkBytes) {
+                return AttributeValue.fromB((SdkBytes) attribute);
+            } else {
+                throw new IllegalArgumentException(
+                        String.format("Failed to extract byte array type from %s", attribute));
+            }

Review Comment:
   This block is untested, please add tests
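   A sketch of such a test, in the style of the existing ones (the `f0` key
follows the Tuple field naming already used in the test class):
   
   ```
   @Test
   void byteArrayIsConvertedToBinaryAttribute() {
       DynamoDbTypeInformedElementConverter<Tuple1<byte[]>> elementConverter =
               new DynamoDbTypeInformedElementConverter<>(
                       new TupleTypeInfo<>(PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO));
   
       // exercises the byte[] arm; an SdkBytes field would exercise the other arm
       DynamoDbWriteRequest actual =
               elementConverter.apply(Tuple1.of(new byte[] {1, 0, 10}), null);
   
       assertThat(actual.getItem().get("f0").b())
               .isEqualTo(SdkBytes.fromByteArray(new byte[] {1, 0, 10}));
   }
   ```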



##########
flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbTypeInformedElementConverterTest.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.dynamodb.sink;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.connector.dynamodb.util.ComplexPayload;
+import org.apache.flink.connector.dynamodb.util.Order;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.connector.dynamodb.sink.DynamoDbWriteRequestType.PUT;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link DynamoDbTypeInformedElementConverter}. */
+public class DynamoDbTypeInformedElementConverterTest {
+
+    @Test
+    public void testSimpleTypeConversion() {

Review Comment:
   nit: JUnit 5 coding standards are slightly different from lower versions.
In JUnit 5 we:
     1/ Use package-protected classes/methods (drop `public`)
     2/ Remove the `test` prefix on the methods (`testSimpleTypeConversion()` -> `simpleTypeConversion()`)
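   For example (illustrative only):
   
   ```
   // JUnit 4 style
   @Test
   public void testSimpleTypeConversion() {}
   
   // JUnit 5 style
   @Test
   void simpleTypeConversion() {}
   ```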



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbTypeInformedElementConverter.java:
##########
@@ -0,0 +1,245 @@
[… license header, imports, and code identical to the first quote of this file above …]
+        } else if (isArrayTypeInfo(objectTypeInformation)) {
+            TypeInformation<?> componentTypeInfo =
+                    objectTypeInformation instanceof BasicArrayTypeInfo
+                            ? ((BasicArrayTypeInfo<?, ?>) objectTypeInformation).getComponentInfo()
+                            : objectTypeInformation instanceof PrimitiveArrayTypeInfo
+                                    ? ((PrimitiveArrayTypeInfo<?>) objectTypeInformation)
+                                            .getComponentType()
+                                    : ((ObjectArrayTypeInfo<?, ?>) objectTypeInformation)
+                                            .getComponentInfo();
+            if (attribute instanceof Collection) {
+                return convertArrayTypeObject(
+                        ((Collection<?>) attribute).toArray(), componentTypeInfo, false);
+            }
+
+            return convertArrayTypeObject(
+                    attribute,
+                    componentTypeInfo,
+                    objectTypeInformation instanceof PrimitiveArrayTypeInfo);
+
+        } else if (objectTypeInformation instanceof TupleTypeInfo) {
+            TupleTypeInfo<?> tupleTypeInfo = (TupleTypeInfo<?>) objectTypeInformation;
+            Tuple t = (Tuple) attribute;
+            List<AttributeValue> attributeValues = new ArrayList<>();
+            for (int i = 0; i < tupleTypeInfo.getTotalFields(); i++) {
+                attributeValues.add(convertValue(t.getField(i), tupleTypeInfo.getTypeAt(i)));
+            }
+            return AttributeValue.fromL(attributeValues);

Review Comment:
   This block is untested, please add tests
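   A sketch of a test exercising the nested-tuple path (the assertions assume
the basic-type branch maps strings to S and numbers to N, as the existing
tests do):
   
   ```
   @Test
   void nestedTupleIsConvertedToListAttribute() {
       DynamoDbTypeInformedElementConverter<Tuple1<Tuple2<String, Integer>>> elementConverter =
               new DynamoDbTypeInformedElementConverter<>(
                       new TupleTypeInfo<>(
                               new TupleTypeInfo<>(
                                       BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO)));
   
       DynamoDbWriteRequest actual =
               elementConverter.apply(Tuple1.of(Tuple2.of("orderId", 5)), null);
   
       List<AttributeValue> values = actual.getItem().get("f0").l();
       assertThat(values.get(0).s()).isEqualTo("orderId");
       assertThat(values.get(1).n()).isEqualTo("5");
   }
   ```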



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbTypeInformedElementConverter.java:
##########
@@ -0,0 +1,245 @@
[… license header, imports, and code identical to the first quote of this file above …]
+        } else if (objectTypeInformation instanceof CompositeType) {
+            return AttributeValue.fromM(
+                    convertElementUsingTypeInfo(
+                            attribute, (CompositeType<attrT>) objectTypeInformation));
+
+        } else {
+            throw new IllegalArgumentException(
+                    String.format("Unsupported TypeInfo %s", 
objectTypeInformation));
+        }
+    }
+
+    private boolean isArrayTypeInfo(TypeInformation<?> typeInformation) {
+        return typeInformation instanceof BasicArrayTypeInfo
+                || typeInformation instanceof PrimitiveArrayTypeInfo
+                || typeInformation instanceof ObjectArrayTypeInfo;
+    }
+
+    private <attrT> AttributeValue convertArrayTypeObject(
+            Object attribute, TypeInformation<attrT> componentTypeInfo, boolean isPrimitive) {
+
+        if (componentTypeInfo.getTypeClass().isAssignableFrom(String.class)) {
+            return AttributeValue.fromSs(convertObjectArrayToStringList((Object[]) attribute));
+        } else if (componentTypeInfo instanceof NumericTypeInfo && !isPrimitive) {
+            return AttributeValue.fromNs(convertObjectArrayToStringList((Object[]) attribute));
+        } else if (componentTypeInfo instanceof NumericTypeInfo) {
+            if (attribute instanceof int[]) {
+                return AttributeValue.fromNs(
+                        (Arrays.stream((int[]) attribute)
+                                .boxed()
+                                .map(Object::toString)
+                                .collect(Collectors.toList())));
+            } else if (attribute instanceof double[]) {
+                return AttributeValue.fromNs(
+                        (Arrays.stream((double[]) attribute)
+                                .boxed()
+                                .map(Object::toString)
+                                .collect(Collectors.toList())));
+            } else if (attribute instanceof long[]) {
+                return AttributeValue.fromNs(
+                        (Arrays.stream((long[]) attribute)
+                                .boxed()
+                                .map(Object::toString)
+                                .collect(Collectors.toList())));
+            } else {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "Unsupported primitive numeric typeInfo %s",
+                                componentTypeInfo.getTypeClass()));
+            }
+        } else if (!isPrimitive) {
+            List<AttributeValue> attributeValueList = new ArrayList<>();
+            for (Object attributeItem : (Object[]) attribute) {
+                checkTypeCompatibility(attributeItem, componentTypeInfo);
+                attributeValueList.add(convertValue(((attrT) attributeItem), componentTypeInfo));
+            }
+            return AttributeValue.fromL(attributeValueList);
+        } else {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "Unsupported primitive typeInfo %s", 
componentTypeInfo.getTypeClass()));
+        }

Review Comment:
   This block is untested, please add tests
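   For example, a sketch covering the primitive `long[]` arm (the `int[]` and
`double[]` arms would follow the same pattern):
   
   ```
   @Test
   void primitiveLongArrayIsConvertedToNumberSet() {
       DynamoDbTypeInformedElementConverter<Tuple1<long[]>> elementConverter =
               new DynamoDbTypeInformedElementConverter<>(
                       new TupleTypeInfo<>(PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO));
   
       DynamoDbWriteRequest actual =
               elementConverter.apply(Tuple1.of(new long[] {1L, 2L}), null);
   
       assertThat(actual.getItem().get("f0").ns()).containsExactly("1", "2");
   }
   ```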



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbTypeInformedElementConverter.java:
##########
@@ -0,0 +1,245 @@
[… license header, imports, and code identical to the first quote of this file above …]
+    private <attT> Map<String, AttributeValue> convertElementUsingTypeInfo(
+            attT t, CompositeType<attT> typeInfo) {
+        Map<String, AttributeValue> map = new HashMap<>();
+        for (String fieldKey : typeInfo.getFieldNames()) {
+            TypeInformation<attT> fieldType = typeInfo.getTypeAt(fieldKey);
+            try {
+                Field field = t.getClass().getDeclaredField(fieldKey);

Review Comment:
   As per the [coding guidelines](https://flink.apache.org/how-to-contribute/code-style-and-quality-java/#java-reflection)
we should not use reflection. Can we instead use the `TypeExtractor` class as
suggested in the docs?
   
   Another note here is that loading the class information for each record is
expensive. Suggest that we lazily cache the type info based on the class.
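   A rough sketch of the lazy caching (assuming the element type is a
`PojoTypeInfo`, e.g. from `TypeExtractor.getForClass`; names are
illustrative):
   
   ```
   // Resolve the field handles once per converter, not once per record.
   private transient Field[] fields;
   
   private void ensureFieldsResolved(PojoTypeInfo<inputT> pojoTypeInfo) {
       if (fields != null) {
           return; // already cached
       }
       fields = new Field[pojoTypeInfo.getArity()];
       for (int i = 0; i < pojoTypeInfo.getArity(); i++) {
           Field field = pojoTypeInfo.getPojoFieldAt(i).getField();
           field.setAccessible(true);
           fields[i] = field; // reused for every subsequent record
       }
   }
   ```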



##########
flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbTypeInformedElementConverterTest.java:
##########
@@ -0,0 +1,161 @@
[… license header and imports identical to the first quote of this test file above …]
+    @Test
+    public void testSimpleTypeConversion() {
+        DynamoDbTypeInformedElementConverter<Order> elementConverter =
+                new DynamoDbTypeInformedElementConverter<>(
+                        (CompositeType<Order>) TypeInformation.of(Order.class));
+
+        Order order = new Order("orderId", 1, 2.0);
+
+        DynamoDbWriteRequest actual = elementConverter.apply(order, null);
+
+        assertThat(actual.getType()).isEqualTo(PUT);
+        assertThat(actual.getItem()).containsOnlyKeys("orderId", "quantity", 
"total");
+        assertThat(actual.getItem().get("orderId").s()).isEqualTo("orderId");
+        assertThat(actual.getItem().get("quantity").n()).isEqualTo("1");
+        assertThat(actual.getItem().get("total").n()).isEqualTo("2.0");
+    }
+
+    @Test
+    public void testComplexTypeConversion() {
+        TypeInformation<ComplexPayload> typeInformation = TypeInformation.of(ComplexPayload.class);
+        DynamoDbTypeInformedElementConverter<ComplexPayload> elementConverter =
+                new DynamoDbTypeInformedElementConverter<>(
+                        (CompositeType<ComplexPayload>) typeInformation);
+
+        ComplexPayload payload =
+                new ComplexPayload(
+                        "stringFieldVal",
+                        new String[] {"stringArrayFieldVal1", 
"stringArrayFieldVal2"},
+                        new int[] {10, 20},
+                        new ComplexPayload.InnerPayload(true, new byte[] {1, 0, 10}));
+
+        DynamoDbWriteRequest actual = elementConverter.apply(payload, null);
+
+        assertThat(actual.getType()).isEqualTo(PUT);
+        assertThat(actual.getItem())
+                .containsOnlyKeys(
+                        "stringField", "stringArrayField", "intArrayField", 
"innerPayload");
+        assertThat(actual.getItem().get("stringArrayField").ss())
+                .containsExactly("stringArrayFieldVal1", 
"stringArrayFieldVal2");
+        assertThat(actual.getItem().get("intArrayField").ns()).containsExactly("10", "20");
+
+        assertThat(actual.getItem().get("innerPayload").m()).isNotNull();
+        Map<String, AttributeValue> innerPayload = actual.getItem().get("innerPayload").m();
+
+        assertThat(innerPayload).containsOnlyKeys("primitiveBooleanField", 
"byteArrayField");
+        assertThat(innerPayload.get("primitiveBooleanField").bool()).isTrue();
+        assertThat(innerPayload.get("byteArrayField").b())
+                .isEqualTo(SdkBytes.fromByteArray(new byte[] {1, 0, 10}));
+    }
+
+    @Test
+    public void testCollectionTypeConversion() {
+        DynamoDbTypeInformedElementConverter<Tuple> elementConverter =
+                new DynamoDbTypeInformedElementConverter<>(
+                        new TupleTypeInfo<>(
+                                BasicTypeInfo.STRING_TYPE_INFO,
+                                BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO));
+        List<String> stringArray = new ArrayList<>();
+        stringArray.add("stringArrayFieldVal1");
+        stringArray.add("stringArrayFieldVal2");
+
+        DynamoDbWriteRequest actual =
+                elementConverter.apply(Tuple2.of("stringVal", stringArray), 
null);
+
+        assertThat(actual.getType()).isEqualTo(PUT);
+        assertThat(actual.getItem()).containsOnlyKeys("f0", "f1");
+        assertThat(actual.getItem().get("f0").s()).isEqualTo("stringVal");
+        assertThat(actual.getItem().get("f1").ss())
+                .containsExactly("stringArrayFieldVal1", 
"stringArrayFieldVal2");
+    }
+
+    @Test
+    public void testUnsupportedTypeIsWrappedByFlinkException() {
+        DynamoDbTypeInformedElementConverter<Tuple> elementConverter =
+                new DynamoDbTypeInformedElementConverter<>(
+                        new TupleTypeInfo<>(
+                                BasicTypeInfo.DATE_TYPE_INFO,
+                                BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO));
+
+        Date dateVal = Date.from(Instant.now());
+
+        Throwable throwable =
+                Assertions.assertThrows(
+                        FlinkRuntimeException.class,
+                        () -> {
+                            elementConverter.apply(Tuple2.of(dateVal, "stringVal"), null);
+                        });
+
+        assertThat(throwable).hasCauseInstanceOf(IllegalArgumentException.class);
+        assertThat(throwable.getCause())
+                .hasMessageContaining("Unsupported BasicTypeInfo " + 
BasicTypeInfo.DATE_TYPE_INFO);

Review Comment:
   Can you use the AssertJ lib here instead?
   
   ```
   Assertions.assertThatExceptionOfType(FlinkRuntimeException.class)
                   .isThrownBy(() -> elementConverter.apply(Tuple1.of("nan"), null))
                   .withMessageContaining("Incompatible type when converting nan, using Integer type information.")
                   .withRootCauseInstanceOf(IllegalArgumentException.class);
   ```



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbTypeInformedElementConverter.java:
##########
@@ -0,0 +1,245 @@
[… license header, imports, and code identical to the first quote of this file above …]
+        } else if (componentTypeInfo instanceof NumericTypeInfo) {
+            if (attribute instanceof int[]) {
+                return AttributeValue.fromNs(
+                        (Arrays.stream((int[]) attribute)

Review Comment:
   As a big fan of the stream API, it pains me to say that we [should not be
using it for performance-critical code](https://flink.apache.org/how-to-contribute/code-style-and-quality-java/#java-streams).
Since this would be called for each record I would consider this performance
critical/data-intensive. Please convert to standard loops; these can be
extracted to new methods for readability.
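   A minimal sketch of the loop-based equivalent for the `int[]` arm (the
other primitive arms would follow the same pattern):
   
   ```
   private static List<String> intArrayToStringList(int[] values) {
       List<String> result = new ArrayList<>(values.length);
       for (int value : values) {
           result.add(Integer.toString(value)); // no stream/boxing per record
       }
       return result;
   }
   
   // usage in convertArrayTypeObject:
   // return AttributeValue.fromNs(intArrayToStringList((int[]) attribute));
   ```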



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
