Jacob Jona Fahlenkamp created FLINK-39263:
---------------------------------------------

             Summary: Support custom codecs in MongoDB Sink
                 Key: FLINK-39263
                 URL: https://issues.apache.org/jira/browse/FLINK-39263
             Project: Flink
          Issue Type: New Feature
          Components: Connectors / MongoDB
            Reporter: Jacob Jona Fahlenkamp


The MongoDB Sink currently only supports serializing records into BsonDocument 
via MongoSerializationSchema, so records of any other type must be converted 
to BsonDocument manually before they can be written to MongoDB.
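For comparison, this is roughly what the current path looks like for a simple type: every field is copied into a BsonDocument by hand before the WriteModel is built. SensorReading is a hypothetical type used only for illustration. The imports assume the package layout of flink-connector-mongodb, with MongoSerializationSchema under sink.writer.serializer and MongoSinkContext under sink.writer.context.

```java
import com.mongodb.client.model.InsertOneModel;
import com.mongodb.client.model.WriteModel;
import org.apache.flink.connector.mongodb.sink.writer.context.MongoSinkContext;
import org.apache.flink.connector.mongodb.sink.writer.serializer.MongoSerializationSchema;
import org.bson.BsonDocument;
import org.bson.BsonDouble;
import org.bson.BsonString;

/** Hypothetical domain type used only for illustration. */
final class SensorReading {
    final String sensorId;
    final double value;

    SensorReading(String sensorId, double value) {
        this.sensorId = sensorId;
        this.value = value;
    }
}

/** Current approach: each record is copied field by field into a BsonDocument. */
final class SensorReadingSerializationSchema implements MongoSerializationSchema<SensorReading> {
    @Override
    public WriteModel<BsonDocument> serialize(SensorReading element, MongoSinkContext sinkContext) {
        // Intermediate document that the codec-based approach would avoid
        BsonDocument document = new BsonDocument()
                .append("sensorId", new BsonString(element.sensorId))
                .append("value", new BsonDouble(element.value));
        return new InsertOneModel<>(document);
    }
}
```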

The MongoDB Java Driver provides a Codec API that encodes any type directly to 
BSON. Supporting this API in the MongoDB Sink would allow records of arbitrary 
types to be written without an intermediate BsonDocument.
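A minimal sketch of a hand-written codec for a hypothetical SensorReading type, using only the driver's org.bson Codec API. The sink would only need encode(); decode() and getEncoderClass() are part of the Codec contract. The decoder reads fields by name so that extra fields such as _id are skipped instead of causing an error.

```java
import org.bson.BsonReader;
import org.bson.BsonType;
import org.bson.BsonWriter;
import org.bson.codecs.Codec;
import org.bson.codecs.DecoderContext;
import org.bson.codecs.EncoderContext;

/** Hypothetical domain type used only for illustration. */
final class SensorReading {
    final String sensorId;
    final double value;

    SensorReading(String sensorId, double value) {
        this.sensorId = sensorId;
        this.value = value;
    }
}

/** Encodes SensorReading straight to BSON, with no intermediate BsonDocument. */
final class SensorReadingCodec implements Codec<SensorReading> {
    @Override
    public void encode(BsonWriter writer, SensorReading reading, EncoderContext encoderContext) {
        writer.writeStartDocument();
        writer.writeString("sensorId", reading.sensorId);
        writer.writeDouble("value", reading.value);
        writer.writeEndDocument();
    }

    @Override
    public SensorReading decode(BsonReader reader, DecoderContext decoderContext) {
        String sensorId = null;
        double value = 0.0;
        reader.readStartDocument();
        // Read fields by name so unknown fields (e.g. _id) are skipped
        while (reader.readBsonType() != BsonType.END_OF_DOCUMENT) {
            String name = reader.readName();
            if ("sensorId".equals(name)) {
                sensorId = reader.readString();
            } else if ("value".equals(name)) {
                value = reader.readDouble();
            } else {
                reader.skipValue();
            }
        }
        reader.readEndDocument();
        return new SensorReading(sensorId, value);
    }

    @Override
    public Class<SensorReading> getEncoderClass() {
        return SensorReading.class;
    }
}
```

Encoding into a BsonDocumentWriter is a convenient way to inspect a codec's output in a unit test; inside the sink, the driver supplies its own BsonWriter when it encodes the write batch.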

This is important for migrating existing code that uses codecs to Flink, and 
for performance-critical use cases where intermediate conversion to 
BsonDocument is too expensive.

Furthermore, by using PojoCodecProvider in automatic mode, we could support 
POJOs by default. This removes the boilerplate of mapping to BsonDocument 
without requiring a custom codec to be implemented by hand.
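A sketch of such a registry, assuming the standard driver helpers: the driver's default registry is consulted first, so BsonDocument, Document and the other standard types keep their usual codecs, and any remaining class falls through to PojoCodecProvider in automatic mode. SensorPojo and PojoRegistryExample are hypothetical names; automatic mapping works most simply with a no-arg constructor and public getters and setters, as here.

```java
import com.mongodb.MongoClientSettings;
import org.bson.codecs.configuration.CodecRegistries;
import org.bson.codecs.configuration.CodecRegistry;
import org.bson.codecs.pojo.PojoCodecProvider;

final class PojoRegistryExample {
    /** Hypothetical POJO: no-arg constructor plus public getters and setters. */
    public static class SensorPojo {
        private String sensorId;
        private double value;

        public SensorPojo() {}

        public String getSensorId() { return sensorId; }
        public void setSensorId(String sensorId) { this.sensorId = sensorId; }
        public double getValue() { return value; }
        public void setValue(double value) { this.value = value; }
    }

    /** Driver defaults first; classes they do not handle fall through to automatic POJO mapping. */
    static CodecRegistry defaultWithPojos() {
        return CodecRegistries.fromRegistries(
                MongoClientSettings.getDefaultCodecRegistry(),
                CodecRegistries.fromProviders(
                        PojoCodecProvider.builder().automatic(true).build()));
    }
}
```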

Suggested changes:
 # MongoCodecSchema<T>: Add an interface to describe the type being written and 
provide the CodecRegistry to be used by the internal MongoDB client.
 # The default implementation returns a codec registry that supports standard 
MongoDB types as well as automatic POJO mapping.
 # MongoSinkBuilder:
 ** Add setCodecSchema(MongoCodecSchema<IN> codecSchema) to allow providing 
codec configurations.
 ** Add setWriteModelMapper(SerializableFunction<IN, WriteModel<IN>> mapper) to 
allow mapping input records directly to a WriteModel<IN>, leveraging the 
configured codecs.
 ** A separate mapper avoids breaking changes to the existing 
MongoSerializationSchema, whose signature requires mapping to 
WriteModel<BsonDocument>.
 # MongoWriter:
 ** Update the writer to support both the existing MongoSerializationSchema and 
the codec-based approach.
 ** Initialize the internal MongoClient with the provided CodecRegistry.
 ** Update bulk write operations to use the specific documentType defined in 
the MongoCodecSchema.
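To make the proposal concrete, a hypothetical sketch of how the pieces could fit together. MongoCodecSchema, DefaultMongoCodecSchema and CodecWriterSketch are illustrative shapes built around the names in this proposal, not existing connector APIs, and the local SerializableFunction stands in for Flink's org.apache.flink.util.function.SerializableFunction to keep the example self-contained. Only the driver calls (MongoClientSettings.Builder.codecRegistry, MongoDatabase.getCollection(name, documentClass), MongoCollection.bulkWrite) are real APIs.

```java
import com.mongodb.ConnectionString;
import com.mongodb.MongoClientSettings;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.WriteModel;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import org.bson.codecs.configuration.CodecRegistries;
import org.bson.codecs.configuration.CodecRegistry;
import org.bson.codecs.pojo.PojoCodecProvider;

/** Hypothetical stand-in for Flink's SerializableFunction. */
interface SerializableFunction<A, B> extends Function<A, B>, Serializable {}

/** Hypothetical shape of the proposed MongoCodecSchema. */
interface MongoCodecSchema<T> extends Serializable {
    /** Class passed to getCollection(name, documentType) for bulk writes. */
    Class<T> getDocumentType();

    /** Registry used to configure the sink's internal MongoClient. */
    CodecRegistry getCodecRegistry();
}

/** Hypothetical default: standard MongoDB types plus automatic POJO mapping. */
final class DefaultMongoCodecSchema<T> implements MongoCodecSchema<T> {
    private final Class<T> documentType;

    DefaultMongoCodecSchema(Class<T> documentType) {
        this.documentType = documentType;
    }

    @Override
    public Class<T> getDocumentType() {
        return documentType;
    }

    @Override
    public CodecRegistry getCodecRegistry() {
        // Built on demand so that only the Class<T> has to be serialized with the schema
        return CodecRegistries.fromRegistries(
                MongoClientSettings.getDefaultCodecRegistry(),
                CodecRegistries.fromProviders(PojoCodecProvider.builder().automatic(true).build()));
    }
}

/** Hypothetical writer-side helpers showing where the schema and mapper plug in. */
final class CodecWriterSketch {
    /** The internal client is created with the schema's registry. */
    static MongoClientSettings clientSettings(String uri, MongoCodecSchema<?> schema) {
        return MongoClientSettings.builder()
                .applyConnectionString(new ConnectionString(uri))
                .codecRegistry(schema.getCodecRegistry())
                .build();
    }

    /** Records are mapped straight to WriteModel<IN>, with no BsonDocument in between. */
    static <IN> List<WriteModel<IN>> toWriteModels(
            List<IN> records, SerializableFunction<IN, WriteModel<IN>> mapper) {
        List<WriteModel<IN>> models = new ArrayList<>(records.size());
        for (IN record : records) {
            models.add(mapper.apply(record));
        }
        return models;
    }

    /** Bulk writes target a collection typed with the schema's documentType. */
    static <IN> void flush(MongoClient client, String database, String collection,
            MongoCodecSchema<IN> schema, List<WriteModel<IN>> batch) {
        MongoCollection<IN> typed = client.getDatabase(database)
                .getCollection(collection, schema.getDocumentType());
        typed.bulkWrite(batch);
    }
}
```

Creating the CodecRegistry inside getCodecRegistry() rather than storing it as a field keeps the schema Serializable, since a CodecRegistry is not generally Serializable and only the Class<T> needs to be shipped to the task managers.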

This approach adds support for codecs while avoiding breaking changes. If this 
approach is acceptable, we have a pull request ready to share. We are also 
happy to make any necessary adjustments based on the community's feedback.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
