Jacob Jona Fahlenkamp created FLINK-39263:
---------------------------------------------
Summary: Support custom codecs in MongoDB Sink
Key: FLINK-39263
URL: https://issues.apache.org/jira/browse/FLINK-39263
Project: Flink
Issue Type: New Feature
Components: Connectors / MongoDB
Reporter: Jacob Jona Fahlenkamp
The MongoDB Sink currently supports serializing records into BsonDocument via
MongoSerializationSchema. This requires manual conversion of data types into
BsonDocument before they can be written to MongoDB.
The MongoDB Java Driver provides a Codec API that encodes arbitrary types
directly to BSON. Supporting this API in the MongoDB Sink would let records be
written without first converting them to BsonDocument.
This is important for migrating existing code that uses codecs to Flink, and
for performance-critical use cases where intermediate conversion to
BsonDocument is too expensive.
Furthermore, by using the automatic PojoCodecProvider, we could support POJOs
by default. This removes the boilerplate for mapping to BsonDocument without
requiring the manual implementation of a custom codec.
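For illustration, here is a minimal sketch of the driver-side pieces this relies on: a hand-written Codec and a CodecRegistry that falls back to the automatic PojoCodecProvider. The names CodecSketch, Reading and ReadingCodec are assumptions made up for this example and are not part of the proposal. It assumes the MongoDB Java Driver is on the classpath.

```java
import com.mongodb.MongoClientSettings;
import org.bson.BsonReader;
import org.bson.BsonWriter;
import org.bson.codecs.Codec;
import org.bson.codecs.DecoderContext;
import org.bson.codecs.EncoderContext;
import org.bson.codecs.configuration.CodecRegistries;
import org.bson.codecs.configuration.CodecRegistry;
import org.bson.codecs.pojo.PojoCodecProvider;

public class CodecSketch {

    /** Hypothetical record type, used only for this example. */
    public static final class Reading {
        public final String sensor;
        public final double value;

        public Reading(String sensor, double value) {
            this.sensor = sensor;
            this.value = value;
        }
    }

    /** Writes a Reading straight to BSON, with no BsonDocument in between. */
    public static final class ReadingCodec implements Codec<Reading> {
        @Override
        public void encode(BsonWriter writer, Reading r, EncoderContext ctx) {
            writer.writeStartDocument();
            writer.writeString("sensor", r.sensor);
            writer.writeDouble("value", r.value);
            writer.writeEndDocument();
        }

        @Override
        public Reading decode(BsonReader reader, DecoderContext ctx) {
            // A sink only encodes, so decoding is left out of this sketch.
            throw new UnsupportedOperationException("write-only sketch");
        }

        @Override
        public Class<Reading> getEncoderClass() {
            return Reading.class;
        }
    }

    /** Custom codecs first, then the driver defaults, then automatic POJO mapping. */
    public static CodecRegistry registry() {
        return CodecRegistries.fromRegistries(
                CodecRegistries.fromCodecs(new ReadingCodec()),
                MongoClientSettings.getDefaultCodecRegistry(),
                CodecRegistries.fromProviders(
                        PojoCodecProvider.builder().automatic(true).build()));
    }
}
```

With a registry like this, a MongoCollection<Reading> obtained through getCollection(name, Reading.class) and withCodecRegistry(...) accepts Reading instances directly. Types without a hand-written codec fall through to the POJO provider.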
Suggested changes:
# MongoCodecSchema<T>: Add an interface to describe the type being written and
provide the CodecRegistry to be used by the internal MongoDB client.
# The default MongoCodecSchema implementation returns a codec registry that
supports the standard MongoDB types and automatic POJO mapping.
# MongoSinkBuilder:
** Add setCodecSchema(MongoCodecSchema<IN> codecSchema) to allow providing
codec configurations.
** Add setWriteModelMapper(SerializableFunction<IN, WriteModel<IN>> mapper) to
allow mapping input records directly to a WriteModel<IN>, leveraging the
configured codecs.
** The purpose of adding a separate mapper is to prevent breaking changes to
the existing MongoSerializationSchema, which forces mapping to
WriteModel<BsonDocument>.
# MongoWriter:
** Update the writer to support both the existing MongoSerializationSchema and
the codec-based approach.
** Initialize the internal MongoClient with the provided CodecRegistry.
** Update bulk write operations to use the specific documentType defined in
the MongoCodecSchema.
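The suggested changes could fit together roughly as in the dependency-free sketch below. Every type in it is a simplified stand-in written for this example: WriteModel, Doc (in place of BsonDocument), SerializationSchema, CodecSchema and Builder are not the connector's or the driver's real classes, and the CodecRegistry accessor is omitted. The rule that a codec schema and a write model mapper must be set together is an assumption, not something the proposal states.

```java
import java.io.Serializable;
import java.util.function.Function;

public class CodecSinkSketch {

    /** Stand-in for the driver's WriteModel<T>, reduced to a single insert. */
    public static final class WriteModel<T> {
        public final T document;

        public WriteModel(T document) {
            this.document = document;
        }
    }

    /** Stand-in for BsonDocument, reduced to a JSON string. */
    public static final class Doc {
        public final String json;

        public Doc(String json) {
            this.json = json;
        }
    }

    /** Mirrors the existing contract: the result is always WriteModel<Doc>. */
    public interface SerializationSchema<IN> extends Serializable {
        WriteModel<Doc> serialize(IN element);
    }

    /** Rough shape of the proposed MongoCodecSchema<T> (registry accessor omitted). */
    public interface CodecSchema<T> extends Serializable {
        Class<T> getDocumentType();
    }

    /** A Function that is also Serializable, so it can be shipped with the job. */
    public interface SerializableFunction<I, O> extends Function<I, O>, Serializable {}

    public static final class Builder<IN> {
        private SerializationSchema<IN> serializationSchema;
        private CodecSchema<IN> codecSchema;
        private SerializableFunction<IN, WriteModel<IN>> writeModelMapper;

        public Builder<IN> setSerializationSchema(SerializationSchema<IN> schema) {
            this.serializationSchema = schema;
            return this;
        }

        public Builder<IN> setCodecSchema(CodecSchema<IN> schema) {
            this.codecSchema = schema;
            return this;
        }

        public Builder<IN> setWriteModelMapper(SerializableFunction<IN, WriteModel<IN>> mapper) {
            this.writeModelMapper = mapper;
            return this;
        }

        /** Returns the document type a bulk write would use, rejecting ambiguous setups. */
        public Class<?> resolveDocumentType() {
            boolean legacy = serializationSchema != null;
            boolean codec = codecSchema != null && writeModelMapper != null;
            boolean partialCodec = (codecSchema != null) != (writeModelMapper != null);
            if (partialCodec || legacy == codec) {
                throw new IllegalStateException(
                        "Set either a serialization schema, or a codec schema plus a write model mapper");
            }
            return legacy ? Doc.class : codecSchema.getDocumentType();
        }
    }
}
```

Keeping the mapper separate from the existing schema leaves the WriteModel<Doc> path unchanged, while the codec path carries the input type IN through to the write.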
This approach adds support for codecs while avoiding breaking changes. If this
approach is acceptable, we have a pull request ready to share. We are also
happy to make any necessary adjustments based on the community's feedback.