[
https://issues.apache.org/jira/browse/FLINK-38876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Bowen Li updated FLINK-38876:
-----------------------------
Description:
There isn't a per-cluster/per-stream offset initializer hook today.
Currently dynamic kafka source uses a single OffsetsInitializer for starting
(and stopping) and it's applied to every cluster. You can see it in
DynamicKafkaSourceBuilder.java (setters + comments "applied to all clusters"),
and it's passed verbatim to each cluster's KafkaSourceEnumerator in
DynamicKafkaSourceEnumerator.java.
However, it's common that users need different initial offsets per cluster
This ticket is about adding per cluster offset to dynamic kafka source
was:
Yes. The API is single OffsetsInitializer for starting (and stopping) and it's
applied to every cluster. You can see it in DynamicKafkaSourceBuilder.java
(setters + comments "applied to all clusters"), and it's passed verbatim to
each cluster's KafkaSourceEnumerator in DynamicKafkaSourceEnumerator.java.
There isn't a per-cluster/per-stream offset initializer hook today. If you need
different initial offsets per cluster, you'd have to run separate sources or
extend the API to carry cluster-aware initializers.
> Support per-cluster offset in Dynamic Kafka Source
> --------------------------------------------------
>
> Key: FLINK-38876
> URL: https://issues.apache.org/jira/browse/FLINK-38876
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / Kafka
> Affects Versions: kafka-4.0.0
> Reporter: Bowen Li
> Assignee: Bowen Li
> Priority: Major
> Fix For: kafka-4.1.0
>
>
> There isn't a per-cluster/per-stream offset initializer hook today.
> Currently dynamic kafka source uses a single OffsetsInitializer for starting
> (and stopping) and it's applied to every cluster. You can see it in
> DynamicKafkaSourceBuilder.java (setters + comments "applied to all
> clusters"), and it's passed verbatim to each cluster's KafkaSourceEnumerator
> in DynamicKafkaSourceEnumerator.java.
> However, it's common that users need different initial offsets per cluster
> This ticket is about adding per cluster offset to dynamic kafka source
--
This message was sent by Atlassian Jira
(v8.20.10#820010)