Ruibin Xing created FLINK-31483:
-----------------------------------

             Summary: Implement Split Deletion Support in Flink Kafka Connector
                 Key: FLINK-31483
                 URL: https://issues.apache.org/jira/browse/FLINK-31483
             Project: Flink
          Issue Type: New Feature
          Components: Connectors / Kafka, Connectors / Parent
            Reporter: Ruibin Xing


Currently, the Flink Kafka Connector does not support split deletion; it is 
left as a 
[TODO|https://github.com/apache/flink-connector-kafka/blob/9f72be91f8abdfc9b5e8fa46d15dee3f83e71332/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java#L305].
I propose adding this feature in the following steps:

1. Add a SplitsDeletion event to flink-connector-base, which currently provides 
only SplitsAddition.
2. Add a `deleteSplits` method to SplitEnumeratorContext so that it can send a 
SplitsDeletion event to the source operator. To maintain backward compatibility, 
the method will have a default no-op implementation.
3. Make SourceOperator handle the SplitsDeletion event by notifying the 
SourceReader to delete the splits.
4. Add a `deleteSplits` method to SourceReader that removes the splits, 
including removing them from the split state and stopping the SourceReader from 
reading them.
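To make the four steps concrete, here is a minimal, self-contained Java model of the proposed flow. It is a sketch under stated assumptions, not Flink code: `Split`, `SplitsChange`, `SplitsAddition`, `SplitsDeletion`, `Reader`, `EnumeratorContext`, `Operator`, `InMemoryReader` and `LocalContext` are hypothetical simplified stand-ins for the split type, the split-change events, SourceReader, SplitEnumeratorContext and SourceOperator. The `deleteSplits(int subtaskId, List<Split>)` signature is also an assumption, since the proposal does not fix one.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of the proposed split-deletion flow; not Flink's real classes. */
public class SplitDeletionSketch {

    /** Stand-in for a source split; only its id matters here. */
    static final class Split {
        final String splitId;
        Split(String splitId) { this.splitId = splitId; }
    }

    /** Step 1: split-change events, with a new deletion subtype next to addition. */
    abstract static class SplitsChange {
        private final List<Split> splits;
        SplitsChange(List<Split> splits) { this.splits = List.copyOf(splits); }
        List<Split> splits() { return splits; }
    }
    static final class SplitsAddition extends SplitsChange {
        SplitsAddition(List<Split> splits) { super(splits); }
    }
    static final class SplitsDeletion extends SplitsChange {
        SplitsDeletion(List<Split> splits) { super(splits); }
    }

    /** Step 2: the enumerator context gains deleteSplits with a no-op default. */
    interface EnumeratorContext {
        void assignSplits(int subtaskId, List<Split> splits);
        // Default no-op keeps existing context implementations compiling.
        default void deleteSplits(int subtaskId, List<Split> splits) {}
    }

    /** Step 4: the reader gains deleteSplits, also with a no-op default. */
    interface Reader {
        void addSplits(List<Split> splits);
        default void deleteSplits(List<Split> splits) {}
    }

    /** Step 3: the operator routes each change event to the matching reader call. */
    static final class Operator {
        private final Reader reader;
        Operator(Reader reader) { this.reader = reader; }
        void handle(SplitsChange change) {
            if (change instanceof SplitsAddition) {
                reader.addSplits(change.splits());
            } else if (change instanceof SplitsDeletion) {
                reader.deleteSplits(change.splits());
            }
        }
    }

    /** Reader that tracks owned splits; deleted splits leave its state entirely. */
    static final class InMemoryReader implements Reader {
        private final Map<String, Split> active = new LinkedHashMap<>();
        @Override public void addSplits(List<Split> splits) {
            for (Split s : splits) active.put(s.splitId, s);
        }
        @Override public void deleteSplits(List<Split> splits) {
            for (Split s : splits) active.remove(s.splitId);
        }
        /** Ids of the splits this reader still owns, i.e. what it would read and checkpoint. */
        List<String> snapshotState() { return new ArrayList<>(active.keySet()); }
    }

    /** Context wired directly to one operator; subtaskId is ignored (single subtask). */
    static final class LocalContext implements EnumeratorContext {
        private final Operator operator;
        LocalContext(Operator operator) { this.operator = operator; }
        @Override public void assignSplits(int subtaskId, List<Split> splits) {
            operator.handle(new SplitsAddition(splits));
        }
        @Override public void deleteSplits(int subtaskId, List<Split> splits) {
            operator.handle(new SplitsDeletion(splits));
        }
    }

    public static void main(String[] args) {
        InMemoryReader reader = new InMemoryReader();
        EnumeratorContext ctx = new LocalContext(new Operator(reader));
        ctx.assignSplits(0, List.of(new Split("topicA-0"), new Split("topicB-0")));
        ctx.deleteSplits(0, List.of(new Split("topicB-0")));
        System.out.println(reader.snapshotState()); // prints [topicA-0]
    }
}
```

The default no-op `deleteSplits` methods are what would let existing enumerator contexts and readers compile unchanged, which is the compatibility point in step 2; a reader that does not override the method simply keeps its splits.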

As an alternative that does not modify flink-connector-base, 
KafkaSourceEnumerator could send a custom SourceEvent to the SourceOperator for 
split deletion, with the event handled in Kafka-connector-specific code. However, 
I think it is better to have SplitsDeletion in flink-connector-base so that other 
connectors can use it too.
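For comparison, the alternative can be sketched the same way. In Flink, this path would go through `SplitEnumeratorContext#sendEventToSourceReader` and `SourceReader#handleSourceEvents`; the model below replaces them with local methods of the same names so it runs without Flink. `SourceEvent` here is a local stand-in interface, and `KafkaSplitsDeletionEvent` and `KafkaReaderStub` are hypothetical names used only for illustration.

```java
import java.io.Serializable;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical model of the connector-specific alternative; not Flink's real classes. */
public class CustomEventSketch {

    /** Local stand-in for a serializable source-event marker interface. */
    interface SourceEvent extends Serializable {}

    /** Hypothetical Kafka-specific event carrying the ids of splits to drop. */
    static final class KafkaSplitsDeletionEvent implements SourceEvent {
        private static final long serialVersionUID = 1L;
        final List<String> splitIds;
        KafkaSplitsDeletionEvent(List<String> splitIds) { this.splitIds = List.copyOf(splitIds); }
    }

    /** Reader side: only Kafka-specific code knows how to interpret the event. */
    static final class KafkaReaderStub {
        private final Set<String> assigned = new LinkedHashSet<>();
        void addSplits(List<String> splitIds) { assigned.addAll(splitIds); }
        void handleSourceEvents(SourceEvent event) {
            if (event instanceof KafkaSplitsDeletionEvent) {
                assigned.removeAll(((KafkaSplitsDeletionEvent) event).splitIds);
            }
            // Any other event type is ignored in this sketch.
        }
        Set<String> assignedSplits() { return assigned; }
    }

    /** Enumerator side: delivers the event to one reader (a direct call here). */
    static void sendEventToSourceReader(KafkaReaderStub reader, SourceEvent event) {
        reader.handleSourceEvents(event);
    }

    public static void main(String[] args) {
        KafkaReaderStub reader = new KafkaReaderStub();
        reader.addSplits(List.of("orders-0", "orders-1"));
        sendEventToSourceReader(reader, new KafkaSplitsDeletionEvent(List.of("orders-1")));
        System.out.println(reader.assignedSplits()); // prints [orders-0]
    }
}
```

The trade-off the paragraph above points to shows up directly: the deletion logic lives entirely inside the Kafka reader's event handler, so every connector that needs deletion would have to define its own event type and handler.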

Let me know if you have any thoughts or ideas. Thanks!

Related Issues: [FLINK-30490|https://issues.apache.org/jira/browse/FLINK-30490]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
