Ahmed Hamdy created FLINK-35022: ----------------------------------- Summary: Add TypeInformed Element Converter for DynamoDbSink Key: FLINK-35022 URL: https://issues.apache.org/jira/browse/FLINK-35022 Project: Flink Issue Type: Improvement Components: Connectors / DynamoDB Affects Versions: aws-connector-4.3.0 Reporter: Ahmed Hamdy
h2. Context {{DynamoDbSink}} as an extentsion of {{AsyncSinkBase}} depends on {{org.apache.flink.connector.base.sink.writer.ElementConverter}} to convert Flink stream objects to DynamoDb write requests, where item is represented as {{Map<String, AttributeValue[1]>}}. {{AttributeValue}} is the wrapper for the DynamoDb comprehendable Object in a format similar with type identification properties as in {M": {"Name" : {"S": Joe }, "Age" : {"N": 35 }}}. Since TypeInformation is already natively supported in Flink, many implementations of the DynamoDb ElementConverted is just a boiler plate. For example {code:title="Simple POJO Element Conversion"} public class Order { String id; int quantity; double total; } {code} The implementation of the converter must be {code:title="Simple POJO DDB Element Converter"} public static class SimplePojoElementConverter implements ElementConverter<Order, DynamoDbWriteRequest> { @Override public DynamoDbWriteRequest apply(Order order, SinkWriter.Context context) { Map<String, AttributeValue> itemMap = new HashMap<>(); itemMap.put("id", AttributeValue.builder().s(order.id).build()); itemMap.put("quantity", AttributeValue.builder().n(String.valueOf(order.quantity)).build()); itemMap.put("total", AttributeValue.builder().n(String.valueOf(order.total)).build()); return DynamoDbWriteRequest.builder() .setType(DynamoDbWriteRequestType.PUT) .setItem(itemMap) .build(); } @Override public void open(Sink.InitContext context) { } } {code} while this might not be too much of work, however it is a fairly common case in Flink and this implementation requires some fair knowledge of DDB model for new users. h2. Proposal Introduce {{ DynamoDbTypeInformedElementConverter}} as follows: {code:title="TypeInformedElementconverter"} public class DynamoDbTypeInformedElementConverter<inputT> implements ElementConverter<inputT, DynamoDbWriteRequest> { DynamoDbTypeInformedElementConverter(CompositeType<inputT> typeInfo); public DynamoDbWriteRequest convertElement(input) { switch this.typeInfo{ case: BasicTypeInfo.STRING_TYPE_INFO: return input -> AttributeValue.fromS(o.toString()) case: BasicTypeInfo.SHORT_TYPE_INFO: case: BasicTypeInfo.INTEGER_TYPE_INFO: input -> AttributeValue.fromN(o.toString()) case: TupleTypeInfo: input -> AttributeValue.fromL(converTuple(input)) ..... } } } // User Code public static void main(String []args) { DynamoDbTypeInformedElementConverter elementConverter = new DynamoDbTypeInformedElementConverter(TypeInformation.of(Order.class)); DdbSink.setElementConverter(elementConverter); } {code} We will start by supporting all Pojo/ basic/ Tuple/ Array typeInfo which should be enough to cover all DDB supported types (s,n,bool,b,ss,ns,bs,bools,m,l) 1- https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/dynamodb/model/AttributeValue.html -- This message was sent by Atlassian Jira (v8.20.10#820010)