Exidex created FLINK-31186:
------------------------------
Summary: Removing topic from kafka source does nothing
Key: FLINK-31186
URL: https://issues.apache.org/jira/browse/FLINK-31186
Project: Flink
Issue Type: Bug
Affects Versions: 1.15.3
Reporter: Exidex
As far as I can tell, there is no good way to remove a topic from the list of
topics that the Kafka source consumes from.
We use the {{StreamExecutionEnvironment.fromSource}} API with
{{KafkaSource.setTopics}}, which accepts a list of topics. But when we remove a
topic from that list after some time, the Flink Kafka source still consumes from it.
My guess is that it relates to this TODO in the code:
[GitHub|https://github.com/apache/flink/blob/cc66d4855e6f8ee9986809a18f68a458bcfe3c12/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java#L305]
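To illustrate the suspected behaviour, here is a minimal, self-contained Java sketch. It is a simplified model, not the actual {{KafkaSourceEnumerator}} code: the class {{PartitionChangeSketch}}, its methods, the topic names and the fixed two partitions per topic are all hypothetical. It assumes that partitions restored from state stay assigned and that a discovery round only acts on newly found partitions.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical, simplified model of partition discovery; not the Flink implementation. */
final class PartitionChangeSketch {

    /** Partitions the configured topics resolve to (assumption: two partitions per topic). */
    static Set<String> discover(List<String> topics) {
        Set<String> partitions = new HashSet<>();
        for (String topic : topics) {
            partitions.add(topic + "-0");
            partitions.add(topic + "-1");
        }
        return partitions;
    }

    /** Partitions found by discovery that are not assigned yet. */
    static Set<String> added(Set<String> assigned, Set<String> fetched) {
        Set<String> result = new HashSet<>(fetched);
        result.removeAll(assigned);
        return result;
    }

    /** Partitions that are assigned but no longer found by discovery. */
    static Set<String> removed(Set<String> assigned, Set<String> fetched) {
        Set<String> result = new HashSet<>(assigned);
        result.removeAll(fetched);
        return result;
    }

    /**
     * One discovery round that only acts on additions. Removed partitions are
     * deliberately left untouched, which is the assumed effect of the TODO.
     */
    static Set<String> afterDiscovery(Set<String> assignedFromState, List<String> topics) {
        Set<String> fetched = discover(topics);
        Set<String> stillAssigned = new HashSet<>(assignedFromState);
        stillAssigned.addAll(added(assignedFromState, fetched));
        return stillAssigned;
    }

    public static void main(String[] args) {
        // State written while the job was configured with two topics.
        Set<String> restored = discover(List.of("orders", "payments"));
        // The job is restarted with "payments" removed from the topic list.
        Set<String> consumed = afterDiscovery(restored, List.of("orders"));
        System.out.println("Still assigned after removal: " + consumed);
    }
}
```

In this model the partitions of the removed topic remain in the assigned set after the restart, which matches what we observe with the real source.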
You can kind of work around this by removing the whole job state or changing the uid of
the Kafka source, but that affects either the whole job or the whole source. The other way
is to use the State Processor API, but it doesn't expose source operator state,
which in turn can be worked around using reflection and copying code from
SourceCoordinator. None of these options is satisfactory.