Thanks Hong, I moved the AttributeValue creation into the ElementConverter and it started working without any custom serde work!
The reason for creating AttributeValue instances in a previous operator is that I was closely following the example code: https://github.com/apache/flink-connector-aws/blob/main/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/examples/SinkIntoDynamoDb.java

Thank you again for your help and for sharing those resources.

Cheers,
Matt.

On Wed, 9 Nov 2022 at 03:51, Teoh, Hong <lian...@amazon.co.uk> wrote:

> Hi Matt,
>
> First of all, awesome that you are using the DynamoDB sink!
>
> To resolve your issue with serialization in the DDB sink, you are right, the issue only happens when you create the AttributeValue object in a previous operator and send it to the sink.
>
> The issue here is with the serialization of ImmutableMap. Kryo tries to call put(), which is unsupported since the map is immutable, so you can register a specific serializer for it, like below:
>
>     env.getConfig().registerTypeWithKryoSerializer(ImmutableMap.class, ImmutableMapSerializer.class);
>
> You can get ImmutableMapSerializer.class from a pre-packaged library like this: https://github.com/magro/kryo-serializers
>
> Just add the following to your pom.xml:
>
>     <dependency>
>         <groupId>de.javakaffee</groupId>
>         <artifactId>kryo-serializers</artifactId>
>         <version>0.45</version>
>     </dependency>
>
> Regarding resources, I find the following helpful:
>
> - Article on serialization
> - The FlinkForward YouTube channel has a couple of useful deep dives on Flink in general: https://www.youtube.com/channel/UCY8_lgiZLZErZPF47a2hXMA/playlists
>
> Hope the above helps.
>
> A more general question on your use case: what is the reason you want to generate the AttributeValue in a previous operator rather than in the sink directly? Is it for some dynamic generation of objects to write into DDB?
>
> Regards,
> Hong
>
> From: Matt Fysh <mattf...@gmail.com>
> Date: Tuesday, 8 November 2022 at 14:04
> To: User <user@flink.apache.org>
> Subject: [EXTERNAL] How to write custom serializer for dynamodb connector
>
> I'm attempting to use the dynamodb sink located at https://github.com/apache/flink-connector-aws
>
> The example <https://github.com/apache/flink-connector-aws/blob/main/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/examples/SinkIntoDynamoDb.java> in the repo is working as expected, however when I try to create a nested data structure, I receive a Kryo serialization error message:
>
>     Caused by: com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
>     Serialization trace:
>     m (software.amazon.awssdk.services.dynamodb.model.AttributeValue)
>         at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>
> The value that cannot be serialized is produced by this code:
>
>     import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
>
>     AttributeValue.builder().m(
>         ImmutableMap.of(
>             "innerkey", AttributeValue.builder().s("innervalue").build()
>         )
>     ).build();
>
> There are tests in the connector repo <https://github.com/apache/flink-connector-aws/blob/3798aabfcc6f78645bf3d7255dfd6c336cd497f0/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/DynamoDbSerializationUtilTest.java#L70-L84> for nested map structures, but they do not test that the structure can be ser/de by Flink, which I believe occurs when the operator that produces the value is separate to the sink operator.
>
> Given that this is a fairly simple data type, I should be able to register a custom serializer with Flink, but since I'm new to Java I'm having trouble making sense of the docs <https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/fault-tolerance/serialization/types_serialization/> and was hoping to find someone more knowledgeable in this area for some pointers on what else I could start reading.
>
> Thanks,
> Matt
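
For anyone landing on this thread with the same stack trace, the failure Hong describes can be reproduced without Flink or Kryo. The sketch below is only an illustration under stated assumptions: it uses the JDK's Collections.unmodifiableMap as a stand-in for ImmutableMap, and canRefill is a hypothetical helper that imitates a generic map deserializer refilling its target through put(). It is not Kryo's actual code path.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class ImmutableMapRoundTrip {

    // Hypothetical helper: imitates a generic deserializer that rebuilds a map
    // by calling put() on the target for every entry it read.
    static boolean canRefill(Map<String, String> target, Map<String, String> entries) {
        try {
            for (Map.Entry<String, String> e : entries.entrySet()) {
                target.put(e.getKey(), e.getValue());
            }
            return true;
        } catch (UnsupportedOperationException ex) {
            // Immutable maps reject put(), which is the exception seen in the trace.
            return false;
        }
    }

    public static void main(String[] args) {
        Map<String, String> entries = new HashMap<String, String>();
        entries.put("innerkey", "innervalue");

        // Assumption: unmodifiableMap stands in for an immutable map implementation.
        Map<String, String> immutable =
                Collections.unmodifiableMap(new HashMap<String, String>());
        Map<String, String> mutable = new HashMap<String, String>();

        System.out.println("immutable target refilled: " + canRefill(immutable, entries));
        System.out.println("mutable target refilled: " + canRefill(mutable, entries));
    }
}
```

Running main reports false for the immutable target and true for the mutable one. That is why the two fixes in this thread both work: registering a dedicated serializer avoids the put() call, and building the AttributeValue inside the ElementConverter means the immutable map never has to cross an operator boundary.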