[ https://issues.apache.org/jira/browse/FLINK-31483?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ruibin Xing updated FLINK-31483:
--------------------------------
    Description: 
Currently, the Flink Kafka Connector does not support split deletion; it is left as a [TODO|https://github.com/apache/flink-connector-kafka/blob/9f72be91f8abdfc9b5e8fa46d15dee3f83e71332/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java#L305].

I want to add this feature with the following steps:

1. Add a SplitsDeletion event to flink-connector-base, which currently only has SplitsAddition.
2. Add a `deleteSplits` method to SplitEnumeratorContext so that it can send a SplitsDeletion event to the source operator. To maintain compatibility, this method will get a default empty implementation.
3. Make SourceOperator handle the SplitsDeletion event by notifying the SourceReader to delete splits.
4. Add a deleteSplits method to SourceReader that removes splits, which includes removing them from the split state and stopping the SourceReader from reading the deleted splits.

As an alternative that does not modify flink-connector-base, KafkaSourceEnumerator could send a custom SourceEvent to SourceOperator for split deletion, to be handled in Kafka-connector-specific code. But I think it's better to have SplitsDeletion in flink-connector-base so that other connectors can use it too.

Let me know if you have any thoughts or ideas. Thanks!

Related Issues: FLINK-30490

  was:
Currently, the Flink Kafka Connector does not support split deletion; it is left as a [TODO|https://github.com/apache/flink-connector-kafka/blob/9f72be91f8abdfc9b5e8fa46d15dee3f83e71332/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java#L305].

I want to add this feature with the following steps:

1. Add a SplitsDeletion event to flink-connector-base, which currently only has SplitsAddition.
2. Add a `deleteSplits` method to SplitEnumeratorContext so that it can send a SplitsDeletion event to the source operator. To maintain compatibility, this method will get a default empty implementation.
3. Make SourceOperator handle the SplitsDeletion event by notifying the SourceReader to delete splits.
4. Add a deleteSplits method to SourceReader that removes splits, which includes removing them from the split state and stopping the SourceReader from reading the deleted splits.

As an alternative that does not modify flink-connector-base, KafkaSourceEnumerator could send a custom SourceEvent to SourceOperator for split deletion, to be handled in Kafka-connector-specific code. But I think it's better to have SplitsDeletion in flink-connector-base so that other connectors can use it too.

Let me know if you have any thoughts or ideas. Thanks!

Related Issues: [FLINK-30490|https://issues.apache.org/jira/browse/FLINK-30490]


> Implement Split Deletion Support in Flink Kafka Connector
> ---------------------------------------------------------
>
>                 Key: FLINK-31483
>                 URL: https://issues.apache.org/jira/browse/FLINK-31483
>             Project: Flink
>          Issue Type: New Feature
>          Components: Connectors / Kafka, Connectors / Parent
>            Reporter: Ruibin Xing
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
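
The four steps proposed in the description can be sketched roughly as follows. This is a minimal, self-contained Java model and not the actual Flink API: every class here (`SplitsDeletion`, `EnumeratorContext`, `Reader`, `Operator`, `LegacyContext`, `DeletingContext`) is a hypothetical, simplified stand-in, and the real `SplitEnumeratorContext`, `SourceOperator` and `SourceReader` have different signatures. It only illustrates how a default empty `deleteSplits` could keep existing context implementations compatible while newer ones forward a deletion event to the reader.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-ins for the Flink types named in the proposal.
// None of these classes or signatures are the real Flink API.
class SplitDeletionSketch {

    // Step 1: a deletion event mirroring SplitsAddition (shape is assumed).
    static final class SplitsDeletion {
        final List<String> splitIds;

        SplitsDeletion(List<String> splitIds) {
            this.splitIds = new ArrayList<>(splitIds);
        }
    }

    // Step 2: the enumerator context gains deleteSplits with an empty default,
    // so implementations written before the change still compile.
    interface EnumeratorContext {
        void assignSplit(String splitId);

        default void deleteSplits(List<String> splitIds) {
            // Intentionally empty: contexts that do not override this ignore deletions.
        }
    }

    // Step 4: the reader removes deleted splits from its state. In this model it
    // only tracks splits present in the map, so removal also stops reading them.
    static final class Reader {
        private final Map<String, Long> offsetsBySplit = new LinkedHashMap<>();

        void addSplit(String splitId) {
            offsetsBySplit.put(splitId, 0L);
        }

        void deleteSplits(List<String> splitIds) {
            for (String splitId : splitIds) {
                offsetsBySplit.remove(splitId);
            }
        }

        List<String> activeSplits() {
            return new ArrayList<>(offsetsBySplit.keySet());
        }
    }

    // Step 3: the operator handles the deletion event by notifying its reader.
    static final class Operator {
        final Reader reader = new Reader();

        void handleEvent(Object event) {
            if (event instanceof SplitsDeletion) {
                reader.deleteSplits(((SplitsDeletion) event).splitIds);
            }
        }
    }

    // A context written before the change: it inherits the no-op deleteSplits.
    static class LegacyContext implements EnumeratorContext {
        final Operator operator;

        LegacyContext(Operator operator) {
            this.operator = operator;
        }

        @Override
        public void assignSplit(String splitId) {
            operator.reader.addSplit(splitId);
        }
    }

    // A context that opts in by sending a SplitsDeletion event to the operator.
    static final class DeletingContext extends LegacyContext {
        DeletingContext(Operator operator) {
            super(operator);
        }

        @Override
        public void deleteSplits(List<String> splitIds) {
            operator.handleEvent(new SplitsDeletion(splitIds));
        }
    }

    static List<String> demo(boolean supportsDeletion) {
        Operator operator = new Operator();
        EnumeratorContext context =
                supportsDeletion ? new DeletingContext(operator) : new LegacyContext(operator);
        context.assignSplit("topic-0");
        context.assignSplit("topic-1");
        context.deleteSplits(List.of("topic-1"));
        return operator.reader.activeSplits();
    }

    public static void main(String[] args) {
        System.out.println(demo(true));  // prints [topic-0]
        System.out.println(demo(false)); // prints [topic-0, topic-1]
    }
}
```

One consequence the sketch makes visible: with an empty default, a context that does not override `deleteSplits` silently keeps the split, so callers cannot tell whether a deletion took effect.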