[https://issues.apache.org/jira/browse/FLINK-39262?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel]
Jacob Jona Fahlenkamp updated FLINK-39262:
------------------------------------------
Description:
The MongoDB Sink currently writes all records to a single database and
collection fixed in the MongoSinkBuilder. This limits its use in scenarios
where records in the same stream must be routed to different namespaces
based on their content. The limitation is especially severe when the
namespaces are not static, e.g. when they depend on time buckets.
Suggested changes:
# MongoNamespaceSelector<T>: Add an interface for selecting the target
MongoNamespace (database and collection) for each input record.
# MongoSinkBuilder: Add an optional
setNamespaceSelector(MongoNamespaceSelector<IN> namespaceSelector) method for
configuring dynamic namespace selection logic.
# Default behaviour: If no namespace selector is supplied, use a constant
namespace selector built from the existing setDatabase and setCollection options.
# MongoWriter: Switch the write buffer from a List<WriteModel<BsonDocument>> to a
Map<MongoNamespace, List<WriteModel<BsonDocument>>> so that buffered records
are grouped by their target namespace.
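The suggested changes can be sketched roughly as follows. This is a minimal, self-contained illustration under stated assumptions, not the contents of the pull request: to compile without Flink or MongoDB dependencies, a `Namespace` record stands in for the driver's `com.mongodb.MongoNamespace`, and plain `String`s stand in for `WriteModel<BsonDocument>`. Only the name `MongoNamespaceSelector` comes from the proposal; the `constant` factory, `Event`, `DailyBucketSelector` and `NamespaceBuffer` are hypothetical names chosen for illustration.

```java
import java.io.Serializable;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Stand-in for com.mongodb.MongoNamespace (assumption: just a database/collection pair).
record Namespace(String database, String collection) implements Serializable {}

// Proposed interface: selects the target namespace for each input record.
@FunctionalInterface
interface MongoNamespaceSelector<T> extends Serializable {
    Namespace select(T element);

    // Hypothetical factory for the default case (only setDatabase/setCollection configured).
    static <T> MongoNamespaceSelector<T> constant(String database, String collection) {
        Namespace fixed = new Namespace(database, collection);
        return element -> fixed;
    }
}

// Hypothetical input type carrying an event timestamp.
record Event(String payload, long epochMillis) {}

// Hypothetical time-bucket selector: one collection per UTC day, e.g. events_2024_01_15.
final class DailyBucketSelector implements MongoNamespaceSelector<Event> {
    private static final DateTimeFormatter DAY =
            DateTimeFormatter.ofPattern("yyyy_MM_dd").withZone(ZoneOffset.UTC);

    @Override
    public Namespace select(Event event) {
        return new Namespace("metrics", "events_" + DAY.format(Instant.ofEpochMilli(event.epochMillis())));
    }
}

// Hypothetical writer-side buffer that groups pending writes by target namespace.
// String stands in for WriteModel<BsonDocument>.
final class NamespaceBuffer<T> {
    private final MongoNamespaceSelector<T> selector;
    private final Map<Namespace, List<String>> pending = new LinkedHashMap<>();

    NamespaceBuffer(MongoNamespaceSelector<T> selector) {
        this.selector = selector;
    }

    // Routes each buffered write into the list for its selected namespace.
    void add(T element, String writeModel) {
        pending.computeIfAbsent(selector.select(element), ns -> new ArrayList<>()).add(writeModel);
    }

    // Returns one batch per namespace and clears the buffer; a real writer
    // would issue one bulk write per map entry at this point.
    Map<Namespace, List<String>> drain() {
        Map<Namespace, List<String>> batches = new LinkedHashMap<>(pending);
        pending.clear();
        return batches;
    }
}
```

Keying the buffer on the namespace lets a flush send one bulk write per target collection, while the constant selector reproduces today's fixed setDatabase/setCollection behaviour, which is what keeps the change non-breaking.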
This approach adds support for dynamic target namespaces without introducing
breaking changes. If the approach is acceptable, we have a pull request ready to
share, and we are happy to make any necessary adjustments based on the
community's feedback.
> Support dynamic namespace selection in MongoDB Sink
> ---------------------------------------------------
>
> Key: FLINK-39262
> URL: https://issues.apache.org/jira/browse/FLINK-39262
> Project: Flink
> Issue Type: New Feature
> Components: Connectors / MongoDB
> Reporter: Jacob Jona Fahlenkamp
> Priority: Minor
--
This message was sent by Atlassian Jira
(v8.20.10#820010)