This is an automated email from the ASF dual-hosted git repository. hong pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-aws.git
The following commit(s) were added to refs/heads/main by this push: new 36479a9 [FLINK-35022][Connectors/DynamoDB] Add Default DynamoDB Element Converter 36479a9 is described below commit 36479a98e11c154bd1537e11e8cc51e64a0ebcb8 Author: Ahmed Hamdy <ahmed.ha...@ververica.com> AuthorDate: Mon Apr 8 09:32:13 2024 +0100 [FLINK-35022][Connectors/DynamoDB] Add Default DynamoDB Element Converter --- .../content/docs/connectors/datastream/dynamodb.md | 5 +- .../sink/DefaultDynamoDbElementConverter.java | 56 +++ .../dynamodb/sink/DynamoDbSinkBuilder.java | 4 +- .../sink/DynamoDbTypeInformedElementConverter.java | 380 +++++++++++++++++++ .../sink/DefaultDynamoDbElementConverterTest.java | 52 +++ .../connector/dynamodb/sink/DynamoDbSinkTest.java | 24 +- .../DynamoDbTypeInformedElementConverterTest.java | 413 +++++++++++++++++++++ .../connector/dynamodb/util/ComplexPayload.java | 121 ++++++ 8 files changed, 1039 insertions(+), 16 deletions(-) diff --git a/docs/content/docs/connectors/datastream/dynamodb.md b/docs/content/docs/connectors/datastream/dynamodb.md index d0f6d7c..834ac66 100644 --- a/docs/content/docs/connectors/datastream/dynamodb.md +++ b/docs/content/docs/connectors/datastream/dynamodb.md @@ -133,7 +133,10 @@ Flink's DynamoDB sink is created by using the static builder `DynamoDBSink.<Inpu ## Element Converter An element converter is used to convert from a record in the DataStream to a DynamoDbWriteRequest which the sink will write to the destination DynamoDB table. The DynamoDB sink allows the user to supply a custom element converter, or use the provided -`DynamoDbBeanElementConverter` when you are working with `@DynamoDbBean` objects. For more information on supported +`DefaultDynamoDbElementConverter` which extracts item schema from element class, this requires the element class to be of composite type (i.e. Pojo, Tuple or Row). In case TypeInformation of the elements is present the schema is eagerly constructed by using `DynamoDbTypeInformedElementConverter` as in `new DynamoDbTypeInformedElementConverter(TypeInformation.of(MyPojo.class))`. + + +Alternatively when you are working with `@DynamoDbBean` objects you can use `DynamoDbBeanElementConverter`. For more information on supported annotations see [here](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/examples-dynamodb-enhanced.html#dynamodb-enhanced-mapper-tableschema). A sample application using a custom `ElementConverter` can be found [here](https://github.com/apache/flink-connector-aws/blob/main/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/examples/SinkIntoDynamoDb.java). A sample application using the `DynamoDbBeanElementConverter` can be found [here](https://github.com/apache/flink-connector-aws/blob/main/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dyna [...] diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DefaultDynamoDbElementConverter.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DefaultDynamoDbElementConverter.java new file mode 100644 index 0000000..7e0f911 --- /dev/null +++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DefaultDynamoDbElementConverter.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.dynamodb.sink; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeType; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.connector.base.sink.writer.ElementConverter; + +/** + * Default implementation of {@link ElementConverter} that lazily falls back to {@link + * DynamoDbTypeInformedElementConverter}. + */ +@PublicEvolving +public class DefaultDynamoDbElementConverter<T> + implements ElementConverter<T, DynamoDbWriteRequest> { + + private ElementConverter<T, DynamoDbWriteRequest> elementConverter; + + public DefaultDynamoDbElementConverter() {} + + @Override + public DynamoDbWriteRequest apply(T t, SinkWriter.Context context) { + if (elementConverter == null) { + TypeInformation<T> typeInfo = (TypeInformation<T>) TypeInformation.of(t.getClass()); + if (!(typeInfo instanceof CompositeType<?>)) { + throw new IllegalArgumentException("The input type must be a CompositeType."); + } + + elementConverter = + new DynamoDbTypeInformedElementConverter<>((CompositeType<T>) typeInfo); + } + + return elementConverter.apply(t, context); + } + + @Override + public void open(Sink.InitContext context) {} +} diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkBuilder.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkBuilder.java index b2592b6..968c206 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkBuilder.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkBuilder.java @@ -58,6 +58,7 @@ import java.util.Properties; * <p>If the following parameters are not set in this builder, the following defaults will be used: * * <ul> + * <li>{@code elementConverter} will be {@link DefaultDynamoDbElementConverter} * <li>{@code maxBatchSize} will be 25 * <li>{@code maxInFlightRequests} will be 50 * <li>{@code maxBufferedRequests} will be 10000 @@ -145,7 +146,8 @@ public class DynamoDbSinkBuilder<InputT> @Override public DynamoDbSink<InputT> build() { return new DynamoDbSink<>( - elementConverter, + Optional.ofNullable(elementConverter) + .orElse(new DefaultDynamoDbElementConverter<>()), Optional.ofNullable(getMaxBatchSize()).orElse(DEFAULT_MAX_BATCH_SIZE), Optional.ofNullable(getMaxInFlightRequests()) .orElse(DEFAULT_MAX_IN_FLIGHT_REQUESTS), diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbTypeInformedElementConverter.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbTypeInformedElementConverter.java new file mode 100644 index 0000000..4978f46 --- /dev/null +++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbTypeInformedElementConverter.java @@ -0,0 +1,380 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.dynamodb.sink; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.NumericTypeInfo; +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeType; +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.connector.dynamodb.table.converter.ArrayAttributeConverterProvider; +import org.apache.flink.types.Row; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.Preconditions; + +import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.enhanced.dynamodb.AttributeConverter; +import software.amazon.awssdk.enhanced.dynamodb.AttributeConverterProvider; +import software.amazon.awssdk.enhanced.dynamodb.AttributeValueType; +import software.amazon.awssdk.enhanced.dynamodb.EnhancedType; +import software.amazon.awssdk.enhanced.dynamodb.TableSchema; +import software.amazon.awssdk.enhanced.dynamodb.internal.mapper.BeanAttributeGetter; +import software.amazon.awssdk.enhanced.dynamodb.mapper.StaticTableSchema; +import software.amazon.awssdk.services.dynamodb.model.AttributeValue; + +import java.beans.BeanInfo; +import java.beans.IntrospectionException; +import java.beans.Introspector; +import java.beans.PropertyDescriptor; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.function.Function; + +/** + * A {@link ElementConverter} that converts an element to a {@link DynamoDbWriteRequest} using + * TypeInformation provided. + */ +@PublicEvolving +public class DynamoDbTypeInformedElementConverter<T> + implements ElementConverter<T, DynamoDbWriteRequest> { + + private final CompositeType<T> typeInfo; + private final boolean ignoreNulls; + private final TableSchema<T> tableSchema; + + /** + * Creates a {@link DynamoDbTypeInformedElementConverter} that converts an element to a {@link + * DynamoDbWriteRequest} using the provided {@link CompositeType}. Usage: {@code new + * DynamoDbTypeInformedElementConverter<>(TypeInformation.of(MyPojoClass.class))} + * + * @param typeInfo The {@link CompositeType} that provides the type information for the element. + */ + public DynamoDbTypeInformedElementConverter(CompositeType<T> typeInfo) { + this(typeInfo, true); + } + + public DynamoDbTypeInformedElementConverter(CompositeType<T> typeInfo, boolean ignoreNulls) { + + try { + this.typeInfo = typeInfo; + this.ignoreNulls = ignoreNulls; + this.tableSchema = createTableSchema(typeInfo); + } catch (IntrospectionException | IllegalStateException | IllegalArgumentException e) { + throw new FlinkRuntimeException("Failed to extract DynamoDb table schema", e); + } + } + + @Override + public DynamoDbWriteRequest apply(T input, SinkWriter.Context context) { + Preconditions.checkNotNull(tableSchema, "TableSchema is not initialized"); + try { + return DynamoDbWriteRequest.builder() + .setType(DynamoDbWriteRequestType.PUT) + .setItem(tableSchema.itemToMap(input, ignoreNulls)) + .build(); + } catch (ClassCastException | IllegalArgumentException e) { + throw new FlinkRuntimeException( + String.format( + "Failed to convert %s to DynamoDbWriteRequest using %s", + input, typeInfo), + e); + } + } + + private <AttributeT> TableSchema<AttributeT> createTableSchema( + CompositeType<AttributeT> typeInfo) throws IntrospectionException { + if (typeInfo instanceof RowTypeInfo) { + return (TableSchema<AttributeT>) createTableSchemaFromRowType((RowTypeInfo) typeInfo); + } else if (typeInfo instanceof PojoTypeInfo<?>) { + return createTableSchemaFromPojo((PojoTypeInfo<AttributeT>) typeInfo); + } else if (typeInfo instanceof TupleTypeInfo<?>) { + return createTableSchemaFromTuple((TupleTypeInfo<?>) typeInfo); + } else { + throw new IllegalArgumentException(String.format("Unsupported TypeInfo %s", typeInfo)); + } + } + + private TableSchema<Row> createTableSchemaFromRowType(RowTypeInfo typeInfo) { + StaticTableSchema.Builder<Row> tableSchemaBuilder = + StaticTableSchema.builder(typeInfo.getTypeClass()); + + String[] fieldNames = typeInfo.getFieldNames(); + for (int i = 0; i < typeInfo.getArity(); i++) { + TypeInformation<?> fieldType = typeInfo.getTypeAt(i); + int finalI = i; + addAttribute( + tableSchemaBuilder, + fieldNames[finalI], + (AttributeT) -> AttributeT.getField(finalI), + (TypeInformation<? super Object>) fieldType); + } + + return tableSchemaBuilder.build(); + } + + private <AttributeT> TableSchema<AttributeT> createTableSchemaFromTuple( + TupleTypeInfo<?> typeInfo) { + TypeInformation<?>[] fieldTypes = typeInfo.getFieldTypes(); + String[] fieldNames = typeInfo.getFieldNames(); + + StaticTableSchema.Builder<?> tableSchemaBuilder = + StaticTableSchema.builder(typeInfo.getTypeClass()); + for (int i = 0; i < fieldNames.length; i++) { + int finalI = i; + addAttribute( + tableSchemaBuilder, + fieldNames[finalI], + (tuple) -> ((Tuple) tuple).getField(finalI), + (TypeInformation<?>) fieldTypes[i]); + } + + return (TableSchema<AttributeT>) tableSchemaBuilder.build(); + } + + private <AttributeT> TableSchema<AttributeT> createTableSchemaFromPojo( + PojoTypeInfo<AttributeT> typeInfo) throws IntrospectionException { + StaticTableSchema.Builder<AttributeT> tableSchemaBuilder = + StaticTableSchema.builder(typeInfo.getTypeClass()); + BeanInfo beanInfo = Introspector.getBeanInfo(typeInfo.getTypeClass()); + PropertyDescriptor[] propertyDescriptors = beanInfo.getPropertyDescriptors(); + for (PropertyDescriptor propertyDescriptor : propertyDescriptors) { + Set<String> fieldNames = new HashSet<>(Arrays.asList(typeInfo.getFieldNames())); + if (!fieldNames.contains(propertyDescriptor.getName())) { + // Skip properties that are not part of the PojoTypeInfo + continue; + } + + TypeInformation<?> fieldInfo = typeInfo.getTypeAt(propertyDescriptor.getName()); + addAttribute( + tableSchemaBuilder, + propertyDescriptor.getName(), + BeanAttributeGetter.create( + typeInfo.getTypeClass(), propertyDescriptor.getReadMethod()), + fieldInfo); + } + + return tableSchemaBuilder.build(); + } + + private <T, AttributeT> void addAttribute( + StaticTableSchema.Builder<T> builder, + String fieldName, + Function<T, AttributeT> getter, + TypeInformation<AttributeT> typeInfo) { + builder.addAttribute( + typeInfo.getTypeClass(), + a -> + a.name(fieldName) + .getter(getter) + .setter((e, o) -> {}) + .attributeConverter(getAttributeConverter(typeInfo))); + } + + private <AttributeT> AttributeConverter<AttributeT> getAttributeConverter( + TypeInformation<AttributeT> typeInfo) { + if (typeInfo instanceof BasicTypeInfo) { + return AttributeConverterProvider.defaultProvider() + .converterFor(EnhancedType.of(typeInfo.getTypeClass())); + } else if (typeInfo.equals(PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO)) { + return getAttributeConverter( + AttributeValueType.B, + bytes -> + bytes instanceof SdkBytes + ? AttributeValue.fromB((SdkBytes) bytes) + : AttributeValue.fromB(SdkBytes.fromByteArray((byte[]) bytes))); + } else if (typeInfo instanceof BasicArrayTypeInfo) { + BasicArrayTypeInfo<AttributeT, ?> basicArrayTypeInfo = + (BasicArrayTypeInfo<AttributeT, ?>) typeInfo; + if (basicArrayTypeInfo.getComponentInfo().equals(BasicTypeInfo.STRING_TYPE_INFO)) { + return getAttributeConverter( + AttributeValueType.SS, + array -> AttributeValue.fromSs(Arrays.asList((String[]) array))); + } else if (basicArrayTypeInfo.getComponentInfo() instanceof NumericTypeInfo) { + return getAttributeConverter( + AttributeValueType.NS, + array -> + AttributeValue.fromNs( + convertObjectArrayToStringList((Object[]) array))); + } + + return new ArrayAttributeConverterProvider() + .converterFor(EnhancedType.of(typeInfo.getTypeClass())); + } else if (typeInfo instanceof ObjectArrayTypeInfo) { + return getObjectArrayTypeConverter((ObjectArrayTypeInfo<AttributeT, ?>) typeInfo); + } else if (typeInfo instanceof PrimitiveArrayTypeInfo) { + PrimitiveArrayTypeInfo<AttributeT> primitiveArrayTypeInfo = + (PrimitiveArrayTypeInfo<AttributeT>) typeInfo; + if (primitiveArrayTypeInfo.getComponentType() instanceof NumericTypeInfo) { + return getAttributeConverter( + AttributeValueType.NS, + array -> AttributeValue.fromNs(convertPrimitiveArrayToStringList(array))); + } else { + throw new IllegalArgumentException( + String.format( + "Unsupported primitive array typeInfo %s", + primitiveArrayTypeInfo.getComponentType())); + } + } else if (typeInfo instanceof TupleTypeInfo<?>) { + return (AttributeConverter<AttributeT>) + getTupleTypeConverter((TupleTypeInfo<?>) typeInfo); + } else if (typeInfo instanceof CompositeType) { + try { + TableSchema<AttributeT> schema = + createTableSchema((CompositeType<AttributeT>) typeInfo); + return getAttributeConverter( + AttributeValueType.M, + o -> AttributeValue.fromM(schema.itemToMap(o, false))); + } catch (IntrospectionException e) { + throw new FlinkRuntimeException("Failed to extract nested table schema", e); + } + } else { + throw new IllegalArgumentException(String.format("Unsupported TypeInfo %s", typeInfo)); + } + } + + private <TupleT extends Tuple> AttributeConverter<TupleT> getTupleTypeConverter( + TupleTypeInfo<TupleT> typeInfo) { + AttributeConverter<?>[] tupleFieldConverters = + new AttributeConverter<?>[typeInfo.getArity()]; + for (int i = 0; i < typeInfo.getArity(); i++) { + tupleFieldConverters[i] = (getAttributeConverter(typeInfo.getTypeAt(i))); + } + + return getAttributeConverter( + AttributeValueType.L, + tuple -> { + List<AttributeValue> attributeValues = new ArrayList<>(); + for (int i = 0; i < typeInfo.getArity(); i++) { + attributeValues.add( + tupleFieldConverters[i].transformFrom(tuple.getField(i))); + } + return AttributeValue.fromL(attributeValues); + }); + } + + private <ArrayT, AttributeT> AttributeConverter<ArrayT> getObjectArrayTypeConverter( + ObjectArrayTypeInfo<ArrayT, AttributeT> typeInfo) { + AttributeConverter<AttributeT> componentAttributeConverter = + getAttributeConverter(typeInfo.getComponentInfo()); + return getAttributeConverter( + AttributeValueType.L, + array -> { + AttributeT[] attrArray = (AttributeT[]) array; + List<AttributeValue> attributeValues = new ArrayList<>(); + for (AttributeT attr : attrArray) { + attributeValues.add(componentAttributeConverter.transformFrom(attr)); + } + return AttributeValue.fromL(attributeValues); + }); + } + + private List<String> convertObjectArrayToStringList(Object[] objectArray) { + List<String> stringList = new ArrayList<>(); + for (Object object : objectArray) { + stringList.add(object.toString()); + } + return stringList; + } + + private List<String> convertPrimitiveArrayToStringList(Object objectArray) { + if (objectArray instanceof int[]) { + int[] intArray = (int[]) objectArray; + List<String> stringList = new ArrayList<>(); + for (int i : intArray) { + stringList.add(Integer.toString(i)); + } + + return stringList; + } else if (objectArray instanceof double[]) { + double[] doubleArray = (double[]) objectArray; + List<String> stringList = new ArrayList<>(); + for (double d : doubleArray) { + stringList.add(Double.toString(d)); + } + + return stringList; + } else if (objectArray instanceof long[]) { + long[] longArray = (long[]) objectArray; + List<String> stringList = new ArrayList<>(); + for (long l : longArray) { + stringList.add(Long.toString(l)); + } + + return stringList; + } else if (objectArray instanceof float[]) { + float[] longArray = (float[]) objectArray; + List<String> stringList = new ArrayList<>(); + for (float f : longArray) { + stringList.add(Float.toString(f)); + } + + return stringList; + } else if (objectArray instanceof short[]) { + short[] longArray = (short[]) objectArray; + List<String> stringList = new ArrayList<>(); + for (short s : longArray) { + stringList.add(Short.toString(s)); + } + + return stringList; + } else { + throw new IllegalArgumentException( + String.format( + "Unsupported primitive typeInfo %s", + objectArray.getClass().getComponentType())); + } + } + + private <AttributeT> AttributeConverter<AttributeT> getAttributeConverter( + AttributeValueType attributeValueType, + Function<AttributeT, AttributeValue> transformer) { + return new AttributeConverter<AttributeT>() { + @Override + public AttributeValue transformFrom(AttributeT attribute) { + return transformer.apply(attribute); + } + + @Override + public AttributeT transformTo(AttributeValue attributeValue) { + return null; + } + + @Override + public EnhancedType<AttributeT> type() { + return null; + } + + @Override + public AttributeValueType attributeValueType() { + return attributeValueType; + } + }; + } +} diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DefaultDynamoDbElementConverterTest.java b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DefaultDynamoDbElementConverterTest.java new file mode 100644 index 0000000..c8c6fb4 --- /dev/null +++ b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DefaultDynamoDbElementConverterTest.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.dynamodb.sink; + +import org.apache.flink.connector.dynamodb.util.Order; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; + +/** Test for {@link DefaultDynamoDbElementConverter}. */ +class DefaultDynamoDbElementConverterTest { + + @Test + void defaultConverterFallsBackToInformedConverter() { + DefaultDynamoDbElementConverter<Order> converter = new DefaultDynamoDbElementConverter<>(); + Order order = new Order("order-1", 1, 100.0); + + DynamoDbWriteRequest request = converter.apply(order, null); + assertThat(converter).hasNoNullFieldsOrProperties(); + assertThat(request.getItem()).isNotNull(); + assertThat(request.getItem().get("orderId").s()).isEqualTo("order-1"); + assertThat(request.getItem().get("quantity").n()).isEqualTo("1"); + assertThat(request.getItem().get("total").n()).isEqualTo("100.0"); + } + + @Test + void defaultConverterThrowsExceptionForNonCompositeType() { + DefaultDynamoDbElementConverter<String> converter = new DefaultDynamoDbElementConverter<>(); + String str = "test"; + + assertThatExceptionOfType(IllegalArgumentException.class) + .isThrownBy(() -> converter.apply(str, null)) + .withMessage("The input type must be a CompositeType."); + } +} diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkTest.java b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkTest.java index 9cf198e..26f75b0 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkTest.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkTest.java @@ -41,23 +41,19 @@ public class DynamoDbSinkTest { @Test public void testSuccessfullyCreateWithMinimalConfiguration() { - DynamoDbSink.<Map<String, AttributeValue>>builder() - .setElementConverter(new TestDynamoDbElementConverter()) - .setTableName("test_table") - .build(); + DynamoDbSink.<Map<String, AttributeValue>>builder().setTableName("test_table").build(); } @Test - public void testElementConverterRequired() { - assertThatExceptionOfType(NullPointerException.class) - .isThrownBy( - () -> - DynamoDbSink.builder() - .setTableName("test_table") - .setFailOnError(true) - .build()) - .withMessageContaining( - "ElementConverter must be not null when initializing the AsyncSinkBase."); + public void testElementConverterUsesDefaultConverterIfNotSet() { + DynamoDbSink<String> sink = + DynamoDbSink.<String>builder() + .setTableName("test_table") + .setFailOnError(true) + .build(); + assertThat(sink) + .extracting("elementConverter") + .isInstanceOf(DefaultDynamoDbElementConverter.class); } @Test diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbTypeInformedElementConverterTest.java b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbTypeInformedElementConverterTest.java new file mode 100644 index 0000000..66d251b --- /dev/null +++ b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbTypeInformedElementConverterTest.java @@ -0,0 +1,413 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.dynamodb.sink; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeType; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.connector.dynamodb.util.ComplexPayload; +import org.apache.flink.connector.dynamodb.util.Order; +import org.apache.flink.types.Row; +import org.apache.flink.types.RowKind; +import org.apache.flink.types.RowUtils; +import org.apache.flink.util.FlinkRuntimeException; + +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; +import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.services.dynamodb.model.AttributeValue; + +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.connector.dynamodb.sink.DynamoDbWriteRequestType.PUT; +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link DynamoDbTypeInformedElementConverter}. */ +public class DynamoDbTypeInformedElementConverterTest { + + @Test + void simpleTypeConversion() { + DynamoDbTypeInformedElementConverter<Order> elementConverter = + new DynamoDbTypeInformedElementConverter<>( + (CompositeType<Order>) TypeInformation.of(Order.class)); + + Order order = new Order("orderId", 1, 2.0); + + DynamoDbWriteRequest actual = elementConverter.apply(order, null); + + assertThat(actual.getType()).isEqualTo(PUT); + assertThat(actual.getItem()).containsOnlyKeys("orderId", "quantity", "total"); + assertThat(actual.getItem().get("orderId").s()).isEqualTo("orderId"); + assertThat(actual.getItem().get("quantity").n()).isEqualTo("1"); + assertThat(actual.getItem().get("total").n()).isEqualTo("2.0"); + } + + @Test + void complexTypeConversion() { + TypeInformation<ComplexPayload> typeInformation = TypeInformation.of(ComplexPayload.class); + DynamoDbTypeInformedElementConverter<ComplexPayload> elementConverter = + new DynamoDbTypeInformedElementConverter<>( + (CompositeType<ComplexPayload>) typeInformation); + + ComplexPayload payload = + new ComplexPayload( + "stringFieldVal", + new String[] {"stringArrayFieldVal1", "stringArrayFieldVal2"}, + new int[] {10, 20}, + new ComplexPayload.InnerPayload(true, new byte[] {1, 0, 10}), + Tuple2.of(1, "tuple2FieldVal")); + + DynamoDbWriteRequest actual = elementConverter.apply(payload, null); + + assertThat(actual.getType()).isEqualTo(PUT); + assertThat(actual.getItem()) + .containsOnlyKeys( + "stringField", + "stringArrayField", + "intArrayField", + "innerPayload", + "tupleField"); + assertThat(actual.getItem().get("stringArrayField").ss()) + .containsExactly("stringArrayFieldVal1", "stringArrayFieldVal2"); + assertThat(actual.getItem().get("intArrayField").ns()).containsExactly("10", "20"); + + // verify tupleField + assertThat(actual.getItem().get("tupleField").l()).isNotNull(); + assertThat(actual.getItem().get("tupleField").l()).hasSize(2); + assertThat(actual.getItem().get("tupleField").l().get(0).n()).isEqualTo("1"); + assertThat(actual.getItem().get("tupleField").l().get(1).s()).isEqualTo("tuple2FieldVal"); + + // verify innerPayload + assertThat(actual.getItem().get("innerPayload").m()).isNotNull(); + Map<String, AttributeValue> innerPayload = actual.getItem().get("innerPayload").m(); + + assertThat(innerPayload).containsOnlyKeys("primitiveBooleanField", "byteArrayField"); + assertThat(innerPayload.get("primitiveBooleanField").bool()).isTrue(); + assertThat(innerPayload.get("byteArrayField").b()) + .isEqualTo(SdkBytes.fromByteArray(new byte[] {1, 0, 10})); + } + + @Test + void convertTupleType() { + DynamoDbTypeInformedElementConverter<Tuple> elementConverter = + new DynamoDbTypeInformedElementConverter<>( + new TupleTypeInfo<>( + BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO)); + + Tuple element = Tuple2.of("stringVal", 10); + + DynamoDbWriteRequest actual = elementConverter.apply(element, null); + assertThat(actual.getItem()).containsOnlyKeys("f0", "f1"); + assertThat(actual.getItem().get("f0").s()).isEqualTo("stringVal"); + assertThat(actual.getItem().get("f1").n()).isEqualTo("10"); + } + + @Test + void convertRowType() { + RowTypeInfo rowTypeInfo = + new RowTypeInfo( + new TypeInformation[] { + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.DOUBLE_TYPE_INFO + }, + new String[] {"stringRowFiled", "IntRowField", "DoubleRowField"}); + DynamoDbTypeInformedElementConverter<Row> elementConverter = + new DynamoDbTypeInformedElementConverter<>(rowTypeInfo); + + LinkedHashMap<String, Integer> rowMap = new LinkedHashMap<>(); + rowMap.put("stringRowFiled", 1); + rowMap.put("IntRowField", 2); + rowMap.put("DoubleRowField", 3); + Row element = + RowUtils.createRowWithNamedPositions( + RowKind.INSERT, new Object[] {"stringVal", 10, 20.0}, rowMap); + + DynamoDbWriteRequest actual = elementConverter.apply(element, null); + assertThat(actual.getItem()) + .containsOnlyKeys("stringRowFiled", "IntRowField", "DoubleRowField"); + assertThat(actual.getItem().get("stringRowFiled").s()).isEqualTo("stringVal"); + assertThat(actual.getItem().get("IntRowField").n()).isEqualTo("10"); + assertThat(actual.getItem().get("DoubleRowField").n()).isEqualTo("20.0"); + } + + @Test + void convertObjectArray() { + DynamoDbTypeInformedElementConverter<Tuple> elementConverter = + new DynamoDbTypeInformedElementConverter<>( + new TupleTypeInfo<>( + ObjectArrayTypeInfo.getInfoFor(TypeInformation.of(Order.class)))); + + Tuple element = + Tuple1.of( + new Order[] {new Order("orderId1", 1, 2.0), new Order("orderId2", 3, 4.0)}); + + DynamoDbWriteRequest actual = elementConverter.apply(element, null); + assertThat(actual.getItem()).containsOnlyKeys("f0"); + assertThat(actual.getItem().get("f0").l()).hasSize(2); + assertThat(actual.getItem().get("f0").l().get(0).m()) + .containsOnlyKeys("orderId", "quantity", "total"); + assertThat(actual.getItem().get("f0").l().get(0).m().get("orderId").s()) + .isEqualTo("orderId1"); + assertThat(actual.getItem().get("f0").l().get(0).m().get("quantity").n()).isEqualTo("1"); + assertThat(actual.getItem().get("f0").l().get(0).m().get("total").n()).isEqualTo("2.0"); + assertThat(actual.getItem().get("f0").l().get(1).m()) + .containsOnlyKeys("orderId", "quantity", "total"); + assertThat(actual.getItem().get("f0").l().get(1).m().get("orderId").s()) + .isEqualTo("orderId2"); + assertThat(actual.getItem().get("f0").l().get(1).m().get("quantity").n()).isEqualTo("3"); + assertThat(actual.getItem().get("f0").l().get(1).m().get("total").n()).isEqualTo("4.0"); + } + + @Test + void convertLongPrimitiveArray() { + DynamoDbTypeInformedElementConverter<Tuple> elementConverter = + new DynamoDbTypeInformedElementConverter<>( + new TupleTypeInfo<>(PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO)); + + Tuple element = Tuple1.of(new long[] {1, 2, 3}); + + DynamoDbWriteRequest actual = elementConverter.apply(element, null); + assertThat(actual.getItem()).containsOnlyKeys("f0"); + assertThat(actual.getItem().get("f0").ns()).containsExactly("1", "2", "3"); + } + + @Test + void convertFloatPrimitiveArray() { + DynamoDbTypeInformedElementConverter<Tuple> elementConverter = + new DynamoDbTypeInformedElementConverter<>( + new TupleTypeInfo<>( + PrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO)); + + Tuple element = Tuple1.of(new float[] {1.0f, 2.0f, 3.0f}); + + DynamoDbWriteRequest actual = elementConverter.apply(element, null); + assertThat(actual.getItem()).containsOnlyKeys("f0"); + assertThat(actual.getItem().get("f0").ns()).containsExactly("1.0", "2.0", "3.0"); + } + + @Test + void convertDoublePrimitiveArray() { + DynamoDbTypeInformedElementConverter<Tuple> elementConverter = + new DynamoDbTypeInformedElementConverter<>( + new TupleTypeInfo<>( + PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO)); + + Tuple element = Tuple1.of(new double[] {1.0, 2.0, 3.0}); + + DynamoDbWriteRequest actual = elementConverter.apply(element, null); + assertThat(actual.getItem()).containsOnlyKeys("f0"); + assertThat(actual.getItem().get("f0").ns()).containsExactly("1.0", "2.0", "3.0"); + } + + @Test + void convertShortPrimitiveArray() { + DynamoDbTypeInformedElementConverter<Tuple> elementConverter = + new DynamoDbTypeInformedElementConverter<>( + new TupleTypeInfo<>( + PrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO)); + + Tuple element = Tuple1.of(new short[] {1, 2, 3}); + + DynamoDbWriteRequest actual = elementConverter.apply(element, null); + assertThat(actual.getItem()).containsOnlyKeys("f0"); + assertThat(actual.getItem().get("f0").ns()).containsExactly("1", "2", "3"); + } + + @Test + void unsupportedTypeIsWrappedByFlinkException() { + + Assertions.assertThatExceptionOfType(FlinkRuntimeException.class) + .isThrownBy( + () -> + new DynamoDbTypeInformedElementConverter<>( + new TupleTypeInfo<>( + BasicTypeInfo.DATE_TYPE_INFO, + BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO))) + .withCauseInstanceOf(IllegalStateException.class) + .withMessageContaining("Failed to extract DynamoDb table schema"); + } + + @Test + void unmatchedTypeIsWrappedByFlinkException() { + DynamoDbTypeInformedElementConverter<Tuple> elementConverter = + new DynamoDbTypeInformedElementConverter<>( + new TupleTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO)); + + Assertions.assertThatExceptionOfType(FlinkRuntimeException.class) + .isThrownBy(() -> elementConverter.apply(Tuple1.of("nan"), null)) + .withCauseInstanceOf(ClassCastException.class); + } + + @Test + void convertOrderToDynamoDbWriteRequestWithIgnoresNullByDefault() { + DynamoDbTypeInformedElementConverter<Order> elementConverter = + new DynamoDbTypeInformedElementConverter<>( + (CompositeType<Order>) TypeInformation.of(Order.class)); + Order order = new Order(null, 1, 2.0); + + DynamoDbWriteRequest actual = elementConverter.apply(order, null); + assertThat(actual.getItem()).containsOnlyKeys("quantity", "total"); + } + + @Test + void convertOrderToDynamoDbWriteRequestWritesNullIfConfigured() { + DynamoDbTypeInformedElementConverter<Order> elementConverter = + new DynamoDbTypeInformedElementConverter<>( + (CompositeType<Order>) TypeInformation.of(Order.class), false); + Order order = new Order(null, 1, 2.0); + + DynamoDbWriteRequest actual = elementConverter.apply(order, null); + + assertThat(actual.getItem()).containsOnlyKeys("orderId", "quantity", "total"); + assertThat(actual.getItem().get("orderId").nul()).isTrue(); + } + + @Test + void convertSdkBytesAsTypeArray() { + DynamoDbTypeInformedElementConverter<Tuple> elementConverter = + new DynamoDbTypeInformedElementConverter<>( + new TupleTypeInfo<>(PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO)); + + Tuple element = Tuple1.of(SdkBytes.fromByteArray(new byte[] {1, 0, 10})); + + DynamoDbWriteRequest actual = elementConverter.apply(element, null); + assertThat(actual.getItem()).containsOnlyKeys("f0"); + assertThat(actual.getItem().get("f0").b()) + .isEqualTo(SdkBytes.fromByteArray(new byte[] {1, 0, 10})); + } + + @Test + void convertInvalidByteArrayThrowsException() { + DynamoDbTypeInformedElementConverter<Tuple> elementConverter = + new DynamoDbTypeInformedElementConverter<>( + new TupleTypeInfo<>(PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO)); + + Tuple element = Tuple1.of("invalidByteArray"); + + Assertions.assertThatExceptionOfType(FlinkRuntimeException.class) + .isThrownBy(() -> elementConverter.apply(element, null)) + .withCauseInstanceOf(ClassCastException.class); + } + + @Test + void convertUnsupportedPrimitiveArrayThrowsException() { + Assertions.assertThatExceptionOfType(FlinkRuntimeException.class) + .isThrownBy( + () -> + new DynamoDbTypeInformedElementConverter<>( + new TupleTypeInfo<>( + PrimitiveArrayTypeInfo + .BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO))) + .withCauseInstanceOf(IllegalArgumentException.class) + .havingCause() + .withMessageContaining( + "Unsupported primitive array typeInfo " + BasicTypeInfo.BOOLEAN_TYPE_INFO); + } + + @Test + void convertUnsupportedCompositeTypeThrowsException() { + CompositeType<Order> unsupportedCompositeType = + new CompositeType<Order>(Order.class) { + @Override + public boolean isBasicType() { + return false; + } + + @Override + public boolean isTupleType() { + return false; + } + + @Override + public int getArity() { + return 0; + } + + @Override + public int getTotalFields() { + return 0; + } + + @Override + public <T> TypeInformation<T> getTypeAt(int pos) { + return null; + } + + @Override + protected TypeComparatorBuilder<Order> createTypeComparatorBuilder() { + return null; + } + + @Override + public <T> TypeInformation<T> getTypeAt(String fieldExpression) { + return null; + } + + @Override + public String[] getFieldNames() { + return new String[0]; + } + + @Override + public int getFieldIndex(String fieldName) { + return 0; + } + + @Override + public boolean isKeyType() { + return false; + } + + @Override + public TypeSerializer<Order> createSerializer(ExecutionConfig executionConfig) { + return null; + } + + @Override + public Class<Order> getTypeClass() { + return null; + } + + @Override + public void getFlatFields(String s, int i, List<FlatFieldDescriptor> list) {} + }; + + Assertions.assertThatExceptionOfType(FlinkRuntimeException.class) + .isThrownBy( + () -> + new DynamoDbTypeInformedElementConverter<>(unsupportedCompositeType) + .open(null)) + .withCauseInstanceOf(IllegalArgumentException.class) + .havingCause() + .withMessageContaining("Unsupported TypeInfo " + unsupportedCompositeType); + } +} diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/ComplexPayload.java b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/ComplexPayload.java new file mode 100644 index 0000000..c2e3c33 --- /dev/null +++ b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/ComplexPayload.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.dynamodb.util; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.connector.dynamodb.sink.DynamoDbTypeInformedElementConverter; + +import java.io.Serializable; + +/** A test POJO for use with {@link DynamoDbTypeInformedElementConverter}. */ +public class ComplexPayload implements Serializable { + private static final long serialVersionUID = 233624606545704853L; + + private String stringField; + private String[] stringArrayField; + private int[] intArrayField; + private InnerPayload innerPayload; + private Tuple2<Integer, String> tupleField; + + public ComplexPayload() {} + + public ComplexPayload( + String stringField, + String[] stringArrayField, + int[] intArrayField, + InnerPayload innerPayload, + Tuple2<Integer, String> tupleField) { + this.stringField = stringField; + this.stringArrayField = stringArrayField; + this.intArrayField = intArrayField; + this.innerPayload = innerPayload; + this.tupleField = tupleField; + } + + public String getStringField() { + return stringField; + } + + public String[] getStringArrayField() { + return stringArrayField; + } + + public int[] getIntArrayField() { + return intArrayField; + } + + public InnerPayload getInnerPayload() { + return innerPayload; + } + + public Tuple2<Integer, String> getTupleField() { + return tupleField; + } + + public void setStringField(String stringField) { + this.stringField = stringField; + } + + public void setStringArrayField(String[] stringArrayField) { + this.stringArrayField = stringArrayField; + } + + public void setIntArrayField(int[] intArrayField) { + this.intArrayField = intArrayField; + } + + public void setInnerPayload(InnerPayload innerPayload) { + this.innerPayload = innerPayload; + } + + public void setTupleField(Tuple2<Integer, String> tupleField) { + this.tupleField = tupleField; + } + + /** A test POJO for use as InnerPayload for {@link ComplexPayload}. */ + public static class InnerPayload implements Serializable { + private static final long serialVersionUID = 3986298180012117883L; + + private boolean primitiveBooleanField; + private byte[] byteArrayField; + + public InnerPayload() {} + + public InnerPayload(boolean primitiveBooleanField, byte[] byteArrayField) { + this.primitiveBooleanField = primitiveBooleanField; + this.byteArrayField = byteArrayField; + } + + public boolean getPrimitiveBooleanField() { + return primitiveBooleanField; + } + + public byte[] getByteArrayField() { + return byteArrayField; + } + + public void setPrimitiveBooleanField(boolean primitiveBooleanField) { + this.primitiveBooleanField = primitiveBooleanField; + } + + public void setByteArrayField(byte[] byteArrayField) { + this.byteArrayField = byteArrayField; + } + } +}