[ 
https://issues.apache.org/jira/browse/FLINK-38876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li updated FLINK-38876:
-----------------------------
    Description: 
There isn't a per-cluster/per-stream offset initializer hook today. 

Currently dynamic kafka source uses a single OffsetsInitializer for starting 
(and stopping) and it's applied to every cluster. You can see it in 
DynamicKafkaSourceBuilder.java (setters + comments "applied to all clusters"), 
and it's passed verbatim to each cluster's KafkaSourceEnumerator in 
DynamicKafkaSourceEnumerator.java.

However, it's common that users need different initial offsets per cluster

This ticket is about adding per cluster offset to dynamic kafka source

  was:
Yes. The API is single OffsetsInitializer for starting (and stopping) and it's 
applied to every cluster. You can see it in DynamicKafkaSourceBuilder.java 
(setters + comments "applied to all clusters"), and it's passed verbatim to 
each cluster's KafkaSourceEnumerator in DynamicKafkaSourceEnumerator.java.

There isn't a per-cluster/per-stream offset initializer hook today. If you need 
different initial offsets per cluster, you'd have to run separate sources or 
extend the API to carry cluster-aware initializers.


> Support per-cluster offset in Dynamic Kafka Source
> --------------------------------------------------
>
>                 Key: FLINK-38876
>                 URL: https://issues.apache.org/jira/browse/FLINK-38876
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kafka
>    Affects Versions: kafka-4.0.0
>            Reporter: Bowen Li
>            Assignee: Bowen Li
>            Priority: Major
>             Fix For: kafka-4.1.0
>
>
> There isn't a per-cluster/per-stream offset initializer hook today. 
> Currently dynamic kafka source uses a single OffsetsInitializer for starting 
> (and stopping) and it's applied to every cluster. You can see it in 
> DynamicKafkaSourceBuilder.java (setters + comments "applied to all 
> clusters"), and it's passed verbatim to each cluster's KafkaSourceEnumerator 
> in DynamicKafkaSourceEnumerator.java.
> However, it's common that users need different initial offsets per cluster
> This ticket is about adding per cluster offset to dynamic kafka source



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to