dannycranmer commented on code in PR #136: URL: https://github.com/apache/flink-connector-aws/pull/136#discussion_r1565419487
########## flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbTypeInformedElementConverter.java: ########## @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.dynamodb.sink; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.NumericTypeInfo; +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeType; +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.util.FlinkRuntimeException; + +import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.services.dynamodb.model.AttributeValue; + +import java.lang.reflect.Field; +import 
java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * A {@link ElementConverter} that converts an element to a {@link DynamoDbWriteRequest} using + * TypeInformation provided. + */ +@PublicEvolving +public class DynamoDbTypeInformedElementConverter<inputT> + implements ElementConverter<inputT, DynamoDbWriteRequest> { + private final CompositeType<inputT> typeInfo; + + /** + * Creates a {@link DynamoDbTypeInformedElementConverter} that converts an element to a {@link + * DynamoDbWriteRequest} using the provided {@link CompositeType}. Usage: {@code new + * DynamoDbTypeInformedElementConverter<>(TypeInformation.of(MyPojoClass.class))} + * + * @param typeInfo The {@link CompositeType} that provides the type information for the element. + */ + public DynamoDbTypeInformedElementConverter(CompositeType<inputT> typeInfo) { + this.typeInfo = typeInfo; + } + + @Override + public DynamoDbWriteRequest apply(inputT input, SinkWriter.Context context) { + try { + return DynamoDbWriteRequest.builder() + .setType(DynamoDbWriteRequestType.PUT) + .setItem(convertElementUsingTypeInfo(input, typeInfo)) + .build(); + } catch (IllegalArgumentException e) { + throw new FlinkRuntimeException("Couldn't convert Element to AttributeVal", e); + } + } + + private <attT> Map<String, AttributeValue> convertElementUsingTypeInfo( + attT t, CompositeType<attT> typeInfo) { + Map<String, AttributeValue> map = new HashMap<>(); + for (String fieldKey : typeInfo.getFieldNames()) { + TypeInformation<attT> fieldType = typeInfo.getTypeAt(fieldKey); + try { + Field field = t.getClass().getDeclaredField(fieldKey); + field.setAccessible(true); + Object fieldVal = field.get(t); + checkTypeCompatibility(fieldVal, fieldType); + attT fieldValCaster = (attT) fieldVal; + map.put(fieldKey, convertValue(fieldValCaster, fieldType)); + } catch (NoSuchFieldException | 
IllegalAccessException e) { + throw new IllegalArgumentException( + String.format( + "Failed to extract field %s declared in TypeInfo " + + "from Object %s", + fieldKey, t), + e); + } + } + + return map; + } + + private <attrT> AttributeValue convertValue( + attrT attribute, TypeInformation<attrT> objectTypeInformation) { + if (attribute == null) { + return AttributeValue.builder().nul(true).build(); Review Comment: Not sure this is the correct thing to do here? I believe a common practice is to omit null fields rather than setting to null. Maybe this could be a configuration on the element converter? ########## flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbTypeInformedElementConverter.java: ########## @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.dynamodb.sink; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.NumericTypeInfo; +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeType; +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.util.FlinkRuntimeException; + +import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.services.dynamodb.model.AttributeValue; + +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * A {@link ElementConverter} that converts an element to a {@link DynamoDbWriteRequest} using + * TypeInformation provided. + */ +@PublicEvolving +public class DynamoDbTypeInformedElementConverter<inputT> + implements ElementConverter<inputT, DynamoDbWriteRequest> { + private final CompositeType<inputT> typeInfo; + + /** + * Creates a {@link DynamoDbTypeInformedElementConverter} that converts an element to a {@link + * DynamoDbWriteRequest} using the provided {@link CompositeType}. Usage: {@code new + * DynamoDbTypeInformedElementConverter<>(TypeInformation.of(MyPojoClass.class))} + * + * @param typeInfo The {@link CompositeType} that provides the type information for the element. 
+ */ + public DynamoDbTypeInformedElementConverter(CompositeType<inputT> typeInfo) { + this.typeInfo = typeInfo; + } + + @Override + public DynamoDbWriteRequest apply(inputT input, SinkWriter.Context context) { + try { + return DynamoDbWriteRequest.builder() + .setType(DynamoDbWriteRequestType.PUT) + .setItem(convertElementUsingTypeInfo(input, typeInfo)) + .build(); + } catch (IllegalArgumentException e) { + throw new FlinkRuntimeException("Couldn't convert Element to AttributeVal", e); + } + } + + private <attT> Map<String, AttributeValue> convertElementUsingTypeInfo( + attT t, CompositeType<attT> typeInfo) { + Map<String, AttributeValue> map = new HashMap<>(); + for (String fieldKey : typeInfo.getFieldNames()) { + TypeInformation<attT> fieldType = typeInfo.getTypeAt(fieldKey); + try { + Field field = t.getClass().getDeclaredField(fieldKey); + field.setAccessible(true); + Object fieldVal = field.get(t); + checkTypeCompatibility(fieldVal, fieldType); + attT fieldValCaster = (attT) fieldVal; + map.put(fieldKey, convertValue(fieldValCaster, fieldType)); + } catch (NoSuchFieldException | IllegalAccessException e) { + throw new IllegalArgumentException( + String.format( + "Failed to extract field %s declared in TypeInfo " + + "from Object %s", + fieldKey, t), + e); + } + } + + return map; + } + + private <attrT> AttributeValue convertValue( + attrT attribute, TypeInformation<attrT> objectTypeInformation) { + if (attribute == null) { + return AttributeValue.builder().nul(true).build(); + } else if (objectTypeInformation.isBasicType()) { + return convertBasicTypeObject(attribute, (BasicTypeInfo<?>) objectTypeInformation); + } else if (objectTypeInformation.equals( + PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO)) { + // special handling for byte array as it has special support in DynamoDB + if (attribute instanceof byte[]) { + return AttributeValue.fromB(SdkBytes.fromByteArray((byte[]) attribute)); + } else if (attribute instanceof SdkBytes) { + return 
AttributeValue.fromB((SdkBytes) attribute); + } else { + throw new IllegalArgumentException( + String.format("Failed to extract byte array type from %s", attribute)); + } Review Comment: This block is untested, please add tests ########## flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbTypeInformedElementConverterTest.java: ########## @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.dynamodb.sink; + +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeType; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.connector.dynamodb.util.ComplexPayload; +import org.apache.flink.connector.dynamodb.util.Order; +import org.apache.flink.util.FlinkRuntimeException; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.services.dynamodb.model.AttributeValue; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.connector.dynamodb.sink.DynamoDbWriteRequestType.PUT; +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link DynamoDbTypeInformedElementConverter}. */ +public class DynamoDbTypeInformedElementConverterTest { + + @Test + public void testSimpleTypeConversion() { Review Comment: nit: JUnit 5 coding standards are slightly different from lower versions. In 5 we: 1/ Use package protected class/methods (drop `public`) 2/ Remove the `test` prefix on the methods (`testSimpleTypeConversion()` -> `simpleTypeConversion()`) ########## flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbTypeInformedElementConverter.java: ########## @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.dynamodb.sink; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.NumericTypeInfo; +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeType; +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.util.FlinkRuntimeException; + +import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.services.dynamodb.model.AttributeValue; + +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * A {@link ElementConverter} that converts an element to a {@link DynamoDbWriteRequest} using + * TypeInformation provided. 
+ */ +@PublicEvolving +public class DynamoDbTypeInformedElementConverter<inputT> + implements ElementConverter<inputT, DynamoDbWriteRequest> { + private final CompositeType<inputT> typeInfo; + + /** + * Creates a {@link DynamoDbTypeInformedElementConverter} that converts an element to a {@link + * DynamoDbWriteRequest} using the provided {@link CompositeType}. Usage: {@code new + * DynamoDbTypeInformedElementConverter<>(TypeInformation.of(MyPojoClass.class))} + * + * @param typeInfo The {@link CompositeType} that provides the type information for the element. + */ + public DynamoDbTypeInformedElementConverter(CompositeType<inputT> typeInfo) { + this.typeInfo = typeInfo; + } + + @Override + public DynamoDbWriteRequest apply(inputT input, SinkWriter.Context context) { + try { + return DynamoDbWriteRequest.builder() + .setType(DynamoDbWriteRequestType.PUT) + .setItem(convertElementUsingTypeInfo(input, typeInfo)) + .build(); + } catch (IllegalArgumentException e) { + throw new FlinkRuntimeException("Couldn't convert Element to AttributeVal", e); + } + } + + private <attT> Map<String, AttributeValue> convertElementUsingTypeInfo( + attT t, CompositeType<attT> typeInfo) { + Map<String, AttributeValue> map = new HashMap<>(); + for (String fieldKey : typeInfo.getFieldNames()) { + TypeInformation<attT> fieldType = typeInfo.getTypeAt(fieldKey); + try { + Field field = t.getClass().getDeclaredField(fieldKey); + field.setAccessible(true); + Object fieldVal = field.get(t); + checkTypeCompatibility(fieldVal, fieldType); + attT fieldValCaster = (attT) fieldVal; + map.put(fieldKey, convertValue(fieldValCaster, fieldType)); + } catch (NoSuchFieldException | IllegalAccessException e) { + throw new IllegalArgumentException( + String.format( + "Failed to extract field %s declared in TypeInfo " + + "from Object %s", + fieldKey, t), + e); + } + } + + return map; + } + + private <attrT> AttributeValue convertValue( + attrT attribute, TypeInformation<attrT> objectTypeInformation) { + if 
(attribute == null) { + return AttributeValue.builder().nul(true).build(); + } else if (objectTypeInformation.isBasicType()) { + return convertBasicTypeObject(attribute, (BasicTypeInfo<?>) objectTypeInformation); + } else if (objectTypeInformation.equals( + PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO)) { + // special handling for byte array as it has special support in DynamoDB + if (attribute instanceof byte[]) { + return AttributeValue.fromB(SdkBytes.fromByteArray((byte[]) attribute)); + } else if (attribute instanceof SdkBytes) { + return AttributeValue.fromB((SdkBytes) attribute); + } else { + throw new IllegalArgumentException( + String.format("Failed to extract byte array type from %s", attribute)); + } + + } else if (isArrayTypeInfo(objectTypeInformation)) { + TypeInformation<?> componentTypeInfo = + objectTypeInformation instanceof BasicArrayTypeInfo + ? ((BasicArrayTypeInfo<?, ?>) objectTypeInformation).getComponentInfo() + : objectTypeInformation instanceof PrimitiveArrayTypeInfo + ? 
((PrimitiveArrayTypeInfo<?>) objectTypeInformation) + .getComponentType() + : ((ObjectArrayTypeInfo<?, ?>) objectTypeInformation) + .getComponentInfo(); + if (attribute instanceof Collection) { + return convertArrayTypeObject( + ((Collection<?>) attribute).toArray(), componentTypeInfo, false); + } + + return convertArrayTypeObject( + attribute, + componentTypeInfo, + objectTypeInformation instanceof PrimitiveArrayTypeInfo); + + } else if (objectTypeInformation instanceof TupleTypeInfo) { + TupleTypeInfo<?> tupleTypeInfo = (TupleTypeInfo<?>) objectTypeInformation; + Tuple t = (Tuple) attribute; + List<AttributeValue> attributeValues = new ArrayList<>(); + for (int i = 0; i < tupleTypeInfo.getTotalFields(); i++) { + attributeValues.add(convertValue(t.getField(i), tupleTypeInfo.getTypeAt(i))); + } + return AttributeValue.fromL(attributeValues); Review Comment: This block is untested, please add tests ########## flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbTypeInformedElementConverter.java: ########## @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.dynamodb.sink; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.NumericTypeInfo; +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeType; +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.util.FlinkRuntimeException; + +import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.services.dynamodb.model.AttributeValue; + +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * A {@link ElementConverter} that converts an element to a {@link DynamoDbWriteRequest} using + * TypeInformation provided. + */ +@PublicEvolving +public class DynamoDbTypeInformedElementConverter<inputT> + implements ElementConverter<inputT, DynamoDbWriteRequest> { + private final CompositeType<inputT> typeInfo; + + /** + * Creates a {@link DynamoDbTypeInformedElementConverter} that converts an element to a {@link + * DynamoDbWriteRequest} using the provided {@link CompositeType}. Usage: {@code new + * DynamoDbTypeInformedElementConverter<>(TypeInformation.of(MyPojoClass.class))} + * + * @param typeInfo The {@link CompositeType} that provides the type information for the element. 
+ */ + public DynamoDbTypeInformedElementConverter(CompositeType<inputT> typeInfo) { + this.typeInfo = typeInfo; + } + + @Override + public DynamoDbWriteRequest apply(inputT input, SinkWriter.Context context) { + try { + return DynamoDbWriteRequest.builder() + .setType(DynamoDbWriteRequestType.PUT) + .setItem(convertElementUsingTypeInfo(input, typeInfo)) + .build(); + } catch (IllegalArgumentException e) { + throw new FlinkRuntimeException("Couldn't convert Element to AttributeVal", e); + } + } + + private <attT> Map<String, AttributeValue> convertElementUsingTypeInfo( + attT t, CompositeType<attT> typeInfo) { + Map<String, AttributeValue> map = new HashMap<>(); + for (String fieldKey : typeInfo.getFieldNames()) { + TypeInformation<attT> fieldType = typeInfo.getTypeAt(fieldKey); + try { + Field field = t.getClass().getDeclaredField(fieldKey); + field.setAccessible(true); + Object fieldVal = field.get(t); + checkTypeCompatibility(fieldVal, fieldType); + attT fieldValCaster = (attT) fieldVal; + map.put(fieldKey, convertValue(fieldValCaster, fieldType)); + } catch (NoSuchFieldException | IllegalAccessException e) { + throw new IllegalArgumentException( + String.format( + "Failed to extract field %s declared in TypeInfo " + + "from Object %s", + fieldKey, t), + e); + } + } + + return map; + } + + private <attrT> AttributeValue convertValue( + attrT attribute, TypeInformation<attrT> objectTypeInformation) { + if (attribute == null) { + return AttributeValue.builder().nul(true).build(); + } else if (objectTypeInformation.isBasicType()) { + return convertBasicTypeObject(attribute, (BasicTypeInfo<?>) objectTypeInformation); + } else if (objectTypeInformation.equals( + PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO)) { + // special handling for byte array as it has special support in DynamoDB + if (attribute instanceof byte[]) { + return AttributeValue.fromB(SdkBytes.fromByteArray((byte[]) attribute)); + } else if (attribute instanceof SdkBytes) { + return 
AttributeValue.fromB((SdkBytes) attribute); + } else { + throw new IllegalArgumentException( + String.format("Failed to extract byte array type from %s", attribute)); + } + + } else if (isArrayTypeInfo(objectTypeInformation)) { + TypeInformation<?> componentTypeInfo = + objectTypeInformation instanceof BasicArrayTypeInfo + ? ((BasicArrayTypeInfo<?, ?>) objectTypeInformation).getComponentInfo() + : objectTypeInformation instanceof PrimitiveArrayTypeInfo + ? ((PrimitiveArrayTypeInfo<?>) objectTypeInformation) + .getComponentType() + : ((ObjectArrayTypeInfo<?, ?>) objectTypeInformation) + .getComponentInfo(); + if (attribute instanceof Collection) { + return convertArrayTypeObject( + ((Collection<?>) attribute).toArray(), componentTypeInfo, false); + } + + return convertArrayTypeObject( + attribute, + componentTypeInfo, + objectTypeInformation instanceof PrimitiveArrayTypeInfo); + + } else if (objectTypeInformation instanceof TupleTypeInfo) { + TupleTypeInfo<?> tupleTypeInfo = (TupleTypeInfo<?>) objectTypeInformation; + Tuple t = (Tuple) attribute; + List<AttributeValue> attributeValues = new ArrayList<>(); + for (int i = 0; i < tupleTypeInfo.getTotalFields(); i++) { + attributeValues.add(convertValue(t.getField(i), tupleTypeInfo.getTypeAt(i))); + } + return AttributeValue.fromL(attributeValues); + + } else if (objectTypeInformation instanceof CompositeType) { + return AttributeValue.fromM( + convertElementUsingTypeInfo( + attribute, (CompositeType<attrT>) objectTypeInformation)); + + } else { + throw new IllegalArgumentException( + String.format("Unsupported TypeInfo %s", objectTypeInformation)); + } + } + + private boolean isArrayTypeInfo(TypeInformation<?> typeInformation) { + return typeInformation instanceof BasicArrayTypeInfo + || typeInformation instanceof PrimitiveArrayTypeInfo + || typeInformation instanceof ObjectArrayTypeInfo; + } + + private <attrT> AttributeValue convertArrayTypeObject( + Object attribute, TypeInformation<attrT> componentTypeInfo, boolean 
isPrimitive) { + + if (componentTypeInfo.getTypeClass().isAssignableFrom(String.class)) { + return AttributeValue.fromSs(convertObjectArrayToStringList((Object[]) attribute)); + } else if (componentTypeInfo instanceof NumericTypeInfo && !isPrimitive) { + return AttributeValue.fromNs(convertObjectArrayToStringList((Object[]) attribute)); + } else if (componentTypeInfo instanceof NumericTypeInfo) { + if (attribute instanceof int[]) { + return AttributeValue.fromNs( + (Arrays.stream((int[]) attribute) + .boxed() + .map(Object::toString) + .collect(Collectors.toList()))); + } else if (attribute instanceof double[]) { + return AttributeValue.fromNs( + (Arrays.stream((double[]) attribute) + .boxed() + .map(Object::toString) + .collect(Collectors.toList()))); + } else if (attribute instanceof long[]) { + return AttributeValue.fromNs( + (Arrays.stream((long[]) attribute) + .boxed() + .map(Object::toString) + .collect(Collectors.toList()))); + } else { + throw new IllegalArgumentException( + String.format( + "Unsupported primitive numeric typeInfo %s", + componentTypeInfo.getTypeClass())); + } + } else if (!isPrimitive) { + List<AttributeValue> attributeValueList = new ArrayList<>(); + for (Object attributeItem : (Object[]) attribute) { + checkTypeCompatibility(attributeItem, componentTypeInfo); + attributeValueList.add(convertValue(((attrT) attributeItem), componentTypeInfo)); + } + return AttributeValue.fromL(attributeValueList); + } else { + throw new IllegalArgumentException( + String.format( + "Unsupported primitive typeInfo %s", componentTypeInfo.getTypeClass())); + } Review Comment: This block is untested, please add tests ########## flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbTypeInformedElementConverter.java: ########## @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.dynamodb.sink; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.NumericTypeInfo; +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeType; +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.util.FlinkRuntimeException; + +import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.services.dynamodb.model.AttributeValue; + +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * A {@link ElementConverter} that converts an element to a {@link DynamoDbWriteRequest} using + * 
TypeInformation provided. + */ +@PublicEvolving +public class DynamoDbTypeInformedElementConverter<inputT> + implements ElementConverter<inputT, DynamoDbWriteRequest> { + private final CompositeType<inputT> typeInfo; + + /** + * Creates a {@link DynamoDbTypeInformedElementConverter} that converts an element to a {@link + * DynamoDbWriteRequest} using the provided {@link CompositeType}. Usage: {@code new + * DynamoDbTypeInformedElementConverter<>(TypeInformation.of(MyPojoClass.class))} + * + * @param typeInfo The {@link CompositeType} that provides the type information for the element. + */ + public DynamoDbTypeInformedElementConverter(CompositeType<inputT> typeInfo) { + this.typeInfo = typeInfo; + } + + @Override + public DynamoDbWriteRequest apply(inputT input, SinkWriter.Context context) { + try { + return DynamoDbWriteRequest.builder() + .setType(DynamoDbWriteRequestType.PUT) + .setItem(convertElementUsingTypeInfo(input, typeInfo)) + .build(); + } catch (IllegalArgumentException e) { + throw new FlinkRuntimeException("Couldn't convert Element to AttributeVal", e); + } + } + + private <attT> Map<String, AttributeValue> convertElementUsingTypeInfo( + attT t, CompositeType<attT> typeInfo) { + Map<String, AttributeValue> map = new HashMap<>(); + for (String fieldKey : typeInfo.getFieldNames()) { + TypeInformation<attT> fieldType = typeInfo.getTypeAt(fieldKey); + try { + Field field = t.getClass().getDeclaredField(fieldKey); Review Comment: As per the [coding guidelines](https://flink.apache.org/how-to-contribute/code-style-and-quality-java/#java-reflection) we should not use reflection. Can we instead use the `TypeExtractor` class as suggested in the docs? Another note here is that loading the class information for each record is expensive. Suggest that we lazily cache the type info based on the class. 
########## flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbTypeInformedElementConverterTest.java: ########## @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.dynamodb.sink; + +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeType; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.connector.dynamodb.util.ComplexPayload; +import org.apache.flink.connector.dynamodb.util.Order; +import org.apache.flink.util.FlinkRuntimeException; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.services.dynamodb.model.AttributeValue; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import 
java.util.Map; + +import static org.apache.flink.connector.dynamodb.sink.DynamoDbWriteRequestType.PUT; +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link DynamoDbTypeInformedElementConverter}. */ +public class DynamoDbTypeInformedElementConverterTest { + + @Test + public void testSimpleTypeConversion() { + DynamoDbTypeInformedElementConverter<Order> elementConverter = + new DynamoDbTypeInformedElementConverter<>( + (CompositeType<Order>) TypeInformation.of(Order.class)); + + Order order = new Order("orderId", 1, 2.0); + + DynamoDbWriteRequest actual = elementConverter.apply(order, null); + + assertThat(actual.getType()).isEqualTo(PUT); + assertThat(actual.getItem()).containsOnlyKeys("orderId", "quantity", "total"); + assertThat(actual.getItem().get("orderId").s()).isEqualTo("orderId"); + assertThat(actual.getItem().get("quantity").n()).isEqualTo("1"); + assertThat(actual.getItem().get("total").n()).isEqualTo("2.0"); + } + + @Test + public void testComplexTypeConversion() { + TypeInformation<ComplexPayload> typeInformation = TypeInformation.of(ComplexPayload.class); + DynamoDbTypeInformedElementConverter<ComplexPayload> elementConverter = + new DynamoDbTypeInformedElementConverter<>( + (CompositeType<ComplexPayload>) typeInformation); + + ComplexPayload payload = + new ComplexPayload( + "stringFieldVal", + new String[] {"stringArrayFieldVal1", "stringArrayFieldVal2"}, + new int[] {10, 20}, + new ComplexPayload.InnerPayload(true, new byte[] {1, 0, 10})); + + DynamoDbWriteRequest actual = elementConverter.apply(payload, null); + + assertThat(actual.getType()).isEqualTo(PUT); + assertThat(actual.getItem()) + .containsOnlyKeys( + "stringField", "stringArrayField", "intArrayField", "innerPayload"); + assertThat(actual.getItem().get("stringArrayField").ss()) + .containsExactly("stringArrayFieldVal1", "stringArrayFieldVal2"); + assertThat(actual.getItem().get("intArrayField").ns()).containsExactly("10", "20"); + + 
assertThat(actual.getItem().get("innerPayload").m()).isNotNull(); + Map<String, AttributeValue> innerPayload = actual.getItem().get("innerPayload").m(); + + assertThat(innerPayload).containsOnlyKeys("primitiveBooleanField", "byteArrayField"); + assertThat(innerPayload.get("primitiveBooleanField").bool()).isTrue(); + assertThat(innerPayload.get("byteArrayField").b()) + .isEqualTo(SdkBytes.fromByteArray(new byte[] {1, 0, 10})); + } + + @Test + public void testCollectionTypeConversion() { + DynamoDbTypeInformedElementConverter<Tuple> elementConverter = + new DynamoDbTypeInformedElementConverter<>( + new TupleTypeInfo<>( + BasicTypeInfo.STRING_TYPE_INFO, + BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO)); + List<String> stringArray = new ArrayList<>(); + stringArray.add("stringArrayFieldVal1"); + stringArray.add("stringArrayFieldVal2"); + + DynamoDbWriteRequest actual = + elementConverter.apply(Tuple2.of("stringVal", stringArray), null); + + assertThat(actual.getType()).isEqualTo(PUT); + assertThat(actual.getItem()).containsOnlyKeys("f0", "f1"); + assertThat(actual.getItem().get("f0").s()).isEqualTo("stringVal"); + assertThat(actual.getItem().get("f1").ss()) + .containsExactly("stringArrayFieldVal1", "stringArrayFieldVal2"); + } + + @Test + public void testUnsupportedTypeIsWrappedByFlinkException() { + DynamoDbTypeInformedElementConverter<Tuple> elementConverter = + new DynamoDbTypeInformedElementConverter<>( + new TupleTypeInfo<>( + BasicTypeInfo.DATE_TYPE_INFO, + BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO)); + + Date dateVal = Date.from(Instant.now()); + + Throwable throwable = + Assertions.assertThrows( + FlinkRuntimeException.class, + () -> { + elementConverter.apply(Tuple2.of(dateVal, "stringVal"), null); + }); + + assertThat(throwable).hasCauseInstanceOf(IllegalArgumentException.class); + assertThat(throwable.getCause()) + .hasMessageContaining("Unsupported BasicTypeInfo " + BasicTypeInfo.DATE_TYPE_INFO); Review Comment: Can you use the AssertJ lib here instead? 
``` Assertions.assertThatExceptionOfType(FlinkRuntimeException.class) .isThrownBy(() -> elementConverter.apply(Tuple1.of("nan"), null)) .withMessageContaining("Incompatible type when converting nan, using Integer type information.") .withRootCauseInstanceOf(IllegalArgumentException.class); ``` ########## flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbTypeInformedElementConverter.java: ########## @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.dynamodb.sink; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.NumericTypeInfo; +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeType; +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.util.FlinkRuntimeException; + +import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.services.dynamodb.model.AttributeValue; + +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * A {@link ElementConverter} that converts an element to a {@link DynamoDbWriteRequest} using + * TypeInformation provided. + */ +@PublicEvolving +public class DynamoDbTypeInformedElementConverter<inputT> + implements ElementConverter<inputT, DynamoDbWriteRequest> { + private final CompositeType<inputT> typeInfo; + + /** + * Creates a {@link DynamoDbTypeInformedElementConverter} that converts an element to a {@link + * DynamoDbWriteRequest} using the provided {@link CompositeType}. Usage: {@code new + * DynamoDbTypeInformedElementConverter<>(TypeInformation.of(MyPojoClass.class))} + * + * @param typeInfo The {@link CompositeType} that provides the type information for the element. 
+ */ + public DynamoDbTypeInformedElementConverter(CompositeType<inputT> typeInfo) { + this.typeInfo = typeInfo; + } + + @Override + public DynamoDbWriteRequest apply(inputT input, SinkWriter.Context context) { + try { + return DynamoDbWriteRequest.builder() + .setType(DynamoDbWriteRequestType.PUT) + .setItem(convertElementUsingTypeInfo(input, typeInfo)) + .build(); + } catch (IllegalArgumentException e) { + throw new FlinkRuntimeException("Couldn't convert Element to AttributeVal", e); + } + } + + private <attT> Map<String, AttributeValue> convertElementUsingTypeInfo( + attT t, CompositeType<attT> typeInfo) { + Map<String, AttributeValue> map = new HashMap<>(); + for (String fieldKey : typeInfo.getFieldNames()) { + TypeInformation<attT> fieldType = typeInfo.getTypeAt(fieldKey); + try { + Field field = t.getClass().getDeclaredField(fieldKey); + field.setAccessible(true); + Object fieldVal = field.get(t); + checkTypeCompatibility(fieldVal, fieldType); + attT fieldValCaster = (attT) fieldVal; + map.put(fieldKey, convertValue(fieldValCaster, fieldType)); + } catch (NoSuchFieldException | IllegalAccessException e) { + throw new IllegalArgumentException( + String.format( + "Failed to extract field %s declared in TypeInfo " + + "from Object %s", + fieldKey, t), + e); + } + } + + return map; + } + + private <attrT> AttributeValue convertValue( + attrT attribute, TypeInformation<attrT> objectTypeInformation) { + if (attribute == null) { + return AttributeValue.builder().nul(true).build(); + } else if (objectTypeInformation.isBasicType()) { + return convertBasicTypeObject(attribute, (BasicTypeInfo<?>) objectTypeInformation); + } else if (objectTypeInformation.equals( + PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO)) { + // special handling for byte array as it has special support in DynamoDB + if (attribute instanceof byte[]) { + return AttributeValue.fromB(SdkBytes.fromByteArray((byte[]) attribute)); + } else if (attribute instanceof SdkBytes) { + return 
AttributeValue.fromB((SdkBytes) attribute); + } else { + throw new IllegalArgumentException( + String.format("Failed to extract byte array type from %s", attribute)); + } + + } else if (isArrayTypeInfo(objectTypeInformation)) { + TypeInformation<?> componentTypeInfo = + objectTypeInformation instanceof BasicArrayTypeInfo + ? ((BasicArrayTypeInfo<?, ?>) objectTypeInformation).getComponentInfo() + : objectTypeInformation instanceof PrimitiveArrayTypeInfo + ? ((PrimitiveArrayTypeInfo<?>) objectTypeInformation) + .getComponentType() + : ((ObjectArrayTypeInfo<?, ?>) objectTypeInformation) + .getComponentInfo(); + if (attribute instanceof Collection) { + return convertArrayTypeObject( + ((Collection<?>) attribute).toArray(), componentTypeInfo, false); + } + + return convertArrayTypeObject( + attribute, + componentTypeInfo, + objectTypeInformation instanceof PrimitiveArrayTypeInfo); + + } else if (objectTypeInformation instanceof TupleTypeInfo) { + TupleTypeInfo<?> tupleTypeInfo = (TupleTypeInfo<?>) objectTypeInformation; + Tuple t = (Tuple) attribute; + List<AttributeValue> attributeValues = new ArrayList<>(); + for (int i = 0; i < tupleTypeInfo.getTotalFields(); i++) { + attributeValues.add(convertValue(t.getField(i), tupleTypeInfo.getTypeAt(i))); + } + return AttributeValue.fromL(attributeValues); + + } else if (objectTypeInformation instanceof CompositeType) { + return AttributeValue.fromM( + convertElementUsingTypeInfo( + attribute, (CompositeType<attrT>) objectTypeInformation)); + + } else { + throw new IllegalArgumentException( + String.format("Unsupported TypeInfo %s", objectTypeInformation)); + } + } + + private boolean isArrayTypeInfo(TypeInformation<?> typeInformation) { + return typeInformation instanceof BasicArrayTypeInfo + || typeInformation instanceof PrimitiveArrayTypeInfo + || typeInformation instanceof ObjectArrayTypeInfo; + } + + private <attrT> AttributeValue convertArrayTypeObject( + Object attribute, TypeInformation<attrT> componentTypeInfo, boolean 
isPrimitive) { + + if (componentTypeInfo.getTypeClass().isAssignableFrom(String.class)) { + return AttributeValue.fromSs(convertObjectArrayToStringList((Object[]) attribute)); + } else if (componentTypeInfo instanceof NumericTypeInfo && !isPrimitive) { + return AttributeValue.fromNs(convertObjectArrayToStringList((Object[]) attribute)); + } else if (componentTypeInfo instanceof NumericTypeInfo) { + if (attribute instanceof int[]) { + return AttributeValue.fromNs( + (Arrays.stream((int[]) attribute) Review Comment: As a big fan of the stream API, it pains me to say that we [should not be using it for performance critical code](https://flink.apache.org/how-to-contribute/code-style-and-quality-java/#java-streams). Since this would be called for each record I would consider this performance critical/data-intensive. Please convert to standard loops; you can extract new methods for readability. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org