[ 
https://issues.apache.org/jira/browse/FLINK-39262?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jacob Jona Fahlenkamp updated FLINK-39262:
------------------------------------------
    Description: 
The MongoDB Sink currently writes all records to a fixed database and 
collection specified in the MongoSinkBuilder. This limits its use in scenarios 
where records in the same stream need to be routed to different namespaces 
based on their content. This is especially limiting when the namespaces are not 
static, e.g. when they depend on time buckets.

Suggested changes:
 # MongoNamespaceSelector<T>: Add an interface to allow selecting the target 
MongoNamespace (database & collection) for each input record.
 # MongoSinkBuilder: Add optional 
setNamespaceSelector(MongoNamespaceSelector<IN> namespaceSelector) to allow 
configuring dynamic namespace selection logic.
 # MongoSinkBuilder: If no namespace selector is supplied, fall back to a 
constant namespace selector built from the existing setDatabase and 
setCollection options.
 # In MongoWriter switch from a List<WriteModel<BsonDocument>> to a 
Map<MongoNamespace, List<WriteModel<BsonDocument>>> to group buffered records 
by their target namespace.
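
The suggested changes can be sketched roughly as follows. This is a minimal, self-contained illustration and not the code from the pull request: Namespace stands in for com.mongodb.MongoNamespace, NamespaceSelector is an assumed shape for the proposed MongoNamespaceSelector<T>, the buffered elements stand in for WriteModel<BsonDocument>, and Event, constant, monthly and group are hypothetical names chosen for the example.

```java
import java.io.Serializable;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

public class NamespaceRoutingSketch {

    /** Stand-in for com.mongodb.MongoNamespace (database + collection). */
    record Namespace(String database, String collection) implements Serializable {}

    /** Assumed shape of the proposed MongoNamespaceSelector<T>; the real signature may differ. */
    @FunctionalInterface
    interface NamespaceSelector<T> extends Serializable {
        Namespace select(T element);
    }

    /** Hypothetical input record used only for this example. */
    record Event(String name, Instant timestamp) {}

    /** Fallback when no selector is configured: every record goes to one fixed namespace. */
    static <T> NamespaceSelector<T> constant(String database, String collection) {
        Namespace fixed = new Namespace(database, collection);
        return element -> fixed;
    }

    /** Time-bucketed selector: one collection per UTC month, e.g. events_2024_05. */
    static NamespaceSelector<Event> monthly(String database) {
        return event -> {
            ZonedDateTime t = event.timestamp().atZone(ZoneOffset.UTC);
            String collection = String.format(
                    Locale.ROOT, "events_%04d_%02d", t.getYear(), t.getMonthValue());
            return new Namespace(database, collection);
        };
    }

    /** Groups buffered records by target namespace, keeping arrival order within each group. */
    static <T> Map<Namespace, List<T>> group(List<T> records, NamespaceSelector<T> selector) {
        Map<Namespace, List<T>> buffer = new LinkedHashMap<>();
        for (T element : records) {
            buffer.computeIfAbsent(selector.select(element), ns -> new ArrayList<>())
                    .add(element);
        }
        return buffer;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
                new Event("a", Instant.parse("2024-05-17T10:15:30Z")),
                new Event("b", Instant.parse("2024-06-01T00:00:00Z")),
                new Event("c", Instant.parse("2024-05-31T23:59:59Z")));
        // Each map entry would correspond to one bulk write against that namespace.
        group(events, monthly("metrics")).forEach((ns, batch) ->
                System.out.println(ns.database() + "." + ns.collection() + " -> " + batch.size()));
    }
}
```

With a map keyed by namespace, a flush would presumably issue one bulk write per entry, and the constant selector reduces to the current single-namespace behaviour.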

This approach adds support for dynamic target namespaces while avoiding breaking 
changes. If this approach is acceptable, we have a pull request ready to share. 
We are also happy to make any necessary adjustments based on the community's 
feedback.


> Support dynamic namespace selection in MongoDB Sink
> ---------------------------------------------------
>
>                 Key: FLINK-39262
>                 URL: https://issues.apache.org/jira/browse/FLINK-39262
>             Project: Flink
>          Issue Type: New Feature
>          Components: Connectors / MongoDB
>            Reporter: Jacob Jona Fahlenkamp
>            Priority: Minor
>
> The MongoDB Sink currently writes all records to a fixed database and 
> collection specified in the MongoSinkBuilder. This limits its use in 
> scenarios where records in the same stream need to be routed to different 
> namespaces based on their content. This is especially limiting when the 
> namespaces are not static, e.g. when they depend on time buckets.
> Suggested changes:
>  # MongoNamespaceSelector<T>: Add an interface to allow selecting the target 
> MongoNamespace (database & collection) for each input record.
>  # MongoSinkBuilder: Add optional 
> setNamespaceSelector(MongoNamespaceSelector<IN> namespaceSelector) to allow 
> configuring dynamic namespace selection logic.
>  # MongoSinkBuilder: If no namespace selector is supplied, fall back to a 
> constant namespace selector built from the existing setDatabase and 
> setCollection options.
>  # In MongoWriter switch from a List<WriteModel<BsonDocument>> to a 
> Map<MongoNamespace, List<WriteModel<BsonDocument>>> to group buffered records 
> by their target namespace.
> This approach adds support for dynamic target namespaces while avoiding 
> breaking changes. If this approach is acceptable, we have a pull request 
> ready to share. We are also happy to make any necessary adjustments based on 
> the community's feedback.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
