Thanks Hong, I moved the AttributeValue creation into the ElementConverter
and it started working without any custom serde work!
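For anyone finding this thread later, the working shape is roughly the following: the stream carries plain Strings (or POJOs), and the nested AttributeValue map is only built inside the ElementConverter that the sink invokes. This is a sketch, not the exact code from the thread — the class name `OrderElementConverter` and the attribute keys are made up, and the `DynamoDbWriteRequest` builder calls are my reading of the connector's API, so double-check against the example linked below:

```java
import java.util.Map;

import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.connector.dynamodb.sink.DynamoDbWriteRequest;
import org.apache.flink.connector.dynamodb.sink.DynamoDbWriteRequestType;

import software.amazon.awssdk.services.dynamodb.model.AttributeValue;

/**
 * Builds the nested AttributeValue structure inside the converter, so the
 * records travelling between operators stay as plain Strings that Flink
 * can serialize without Kryo ever touching an ImmutableMap.
 */
public class OrderElementConverter
        implements ElementConverter<String, DynamoDbWriteRequest> {

    @Override
    public DynamoDbWriteRequest apply(String element, SinkWriter.Context context) {
        // The nested map structure that previously failed when created upstream.
        AttributeValue nested = AttributeValue.builder()
                .m(Map.of("innerkey", AttributeValue.builder().s(element).build()))
                .build();

        return DynamoDbWriteRequest.builder()
                .setType(DynamoDbWriteRequestType.PUT)
                .setItem(Map.of("payload", nested))
                .build();
    }
}
```

With this in place nothing upstream of the sink ever holds an AttributeValue, so Flink never needs Kryo to serialize the ImmutableMap inside it.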

The reason for creating AttributeValue instances in a previous operator is
that I was closely following the example code:
https://github.com/apache/flink-connector-aws/blob/main/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/examples/SinkIntoDynamoDb.java

Thank you again for your help and sharing those resources.

Cheers,
Matt.


On Wed, 9 Nov 2022 at 03:51, Teoh, Hong <lian...@amazon.co.uk> wrote:

> Hi Matt,
>
>
>
> First of all, awesome that you are using the DynamoDB sink!
>
>
>
> To resolve your issue with serialization in the DDB sink, you are right
> that the issue only happens when you create the AttributeValue object in a
> previous operator and send it to the sink.
>
> The issue here is with the serialization of ImmutableMap. Kryo tries to
> call put(), which is unsupported since the map is immutable, so you can
> register a specific serializer for it, like below:
>
>
>
> env.getConfig().registerTypeWithKryoSerializer(ImmutableMap.class,
> ImmutableMapSerializer.class);
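The failure mode is easy to reproduce outside Flink: Kryo's default map serializer re-populates a map by calling put() during deserialization, and an immutable map rejects that call. A minimal stdlib sketch of the same behaviour (no Kryo involved, using java.util.Map.of as a stand-in for Guava's ImmutableMap):

```java
import java.util.Map;

public class ImmutableMapPut {
    public static void main(String[] args) {
        // Map.of returns an immutable map, just as Guava's ImmutableMap is.
        Map<String, String> m = Map.of("innerkey", "innervalue");
        try {
            // This is effectively what a generic map deserializer does
            // when re-populating a map instance.
            m.put("otherkey", "othervalue");
        } catch (UnsupportedOperationException e) {
            System.out.println("put() rejected: " + e.getClass().getSimpleName());
        }
    }
}
```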
>
>
>
> You can get ImmutableMapSerializer.class from a pre-packaged library such
> as this one: https://github.com/magro/kryo-serializers
>
> Just add the following to your pom.xml:
>
>
>
> <dependency>
>     <groupId>de.javakaffee</groupId>
>     <artifactId>kryo-serializers</artifactId>
>     <version>0.45</version>
> </dependency>
>
>
>
> Regarding resources, I find the following helpful:
>
>    - Article on serialization
>    - The Flink Forward YouTube channel has a couple of useful deep dives
>    on Flink in general:
>    https://www.youtube.com/channel/UCY8_lgiZLZErZPF47a2hXMA/playlists
>
>
>
> Hope the above helps.
>
>
>
>
>
> A more general question on your use case: what is the reason you want to
> generate the AttributeValue in a previous operator rather than in the sink
> directly? Is it for some dynamic generation of objects to write into DDB?
>
>
>
> Regards,
>
> Hong
>
>
>
>
>
> *From: *Matt Fysh <mattf...@gmail.com>
> *Date: *Tuesday, 8 November 2022 at 14:04
> *To: *User <user@flink.apache.org>
> *Subject: *[EXTERNAL] How to write custom serializer for dynamodb
> connector
>
>
>
> I'm attempting to use the dynamodb sink located at
> https://github.com/apache/flink-connector-aws
>
>
>
> The example
> <https://github.com/apache/flink-connector-aws/blob/main/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/examples/SinkIntoDynamoDb.java>
> in the repo is working as expected, however when I try to create a nested
> data structure, I receive a Kryo serialization error message:
>
>
>
> Caused by: com.esotericsoftware.kryo.KryoException:
> java.lang.UnsupportedOperationException
> Serialization trace:
> m (software.amazon.awssdk.services.dynamodb.model.AttributeValue)
>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>
>
>
> The value that cannot be serialized is produced by this code:
>
> import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
>
> AttributeValue.builder().m(
>     ImmutableMap.of(
>         "innerkey", AttributeValue.builder().s("innervalue").build()
>     )
> ).build();
>
>
>
> There are tests in the connector repo
> <https://github.com/apache/flink-connector-aws/blob/3798aabfcc6f78645bf3d7255dfd6c336cd497f0/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/DynamoDbSerializationUtilTest.java#L70-L84>
> for nested map structures, but they do not test that the structure can be
> serialized/deserialized by Flink, which I believe occurs when the operator
> that produces the value is separate from the sink operator.
>
>
>
> Given that this is a fairly simple data type, I should be able to register
> a custom serializer with Flink, but since I'm new to Java I'm having
> trouble making sense of the docs
> <https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/fault-tolerance/serialization/types_serialization/>
> and was hoping someone more knowledgeable in this area could give me some
> pointers on what else to start reading.
>
>
>
> Thanks
>
> Matt
>