This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git
The following commit(s) were added to refs/heads/main by this push:
new 5cd6e46f [FLINK-38869] InitializationContext for KafkaSubscriber (#208)
5cd6e46f is described below
commit 5cd6e46f28f65846a9271e2c658e0d4450117b20
Author: Efrat Levitan <[email protected]>
AuthorDate: Thu Jan 22 18:21:09 2026 +0200
[FLINK-38869] InitializationContext for KafkaSubscriber (#208)
---
.../source/enumerator/KafkaSourceEnumerator.java | 7 +++++++
.../enumerator/subscriber/KafkaSubscriber.java | 24 ++++++++++++++++++++++
.../enumerator/subscriber/KafkaSubscriberTest.java | 10 +++++++++
3 files changed, 41 insertions(+)
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
index c811f176..61c64e12 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
@@ -200,6 +200,8 @@ public class KafkaSourceEnumerator
addPartitionSplitChangeToPendingAssignments(preinitializedSplits);
}
+ subscriber.open(new KafkaSubscriberInitContext());
+
if (partitionDiscoveryIntervalMs > 0) {
LOG.info(
"Starting the KafkaSourceEnumerator for consumer group {} "
@@ -259,6 +261,7 @@ public class KafkaSourceEnumerator
if (adminClient != null) {
adminClient.close();
}
+ subscriber.close();
}
// ----------------- private methods -------------------
@@ -562,6 +565,10 @@ public class KafkaSourceEnumerator
// --------------- private class ---------------
+ static class KafkaSubscriberInitContext implements KafkaSubscriber.InitializationContext {
+ private KafkaSubscriberInitContext() {}
+ }
+
/** A container class to hold the newly added partitions and removed partitions. */
@VisibleForTesting
static class PartitionChange {
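
For orientation, the enumerator hunks above boil down to the following call order. This is a hand-written sketch, not code from the commit; the class and method names (SubscriberLifecycleSketch, discoverOnce) are made up for illustration, and it assumes the connector and kafka-clients jars are on the classpath.

import java.util.Set;

import org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriber;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.TopicPartition;

// Sketch of the subscriber lifecycle order wired into KafkaSourceEnumerator by this change.
final class SubscriberLifecycleSketch {

    static Set<TopicPartition> discoverOnce(KafkaSubscriber subscriber, AdminClient adminClient) {
        // start(): the subscriber is opened before any discovery attempt.
        subscriber.open(new KafkaSubscriber.InitializationContext() {});
        try {
            // Discovery may then call getSubscribedTopicPartitions() one or more times.
            return subscriber.getSubscribedTopicPartitions(adminClient);
        } finally {
            // close(): invoked once the subscriber is no longer used.
            subscriber.close();
        }
    }
}

In the real enumerator, subscriber.close() is called from the enumerator's own close() method after the AdminClient has been closed; the try/finally above only mirrors the intent of the ordering.
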
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriber.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriber.java
index 37de884a..62959e69 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriber.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriber.java
@@ -47,6 +47,17 @@ import java.util.regex.Pattern;
@PublicEvolving
public interface KafkaSubscriber extends Serializable {
+ /**
+ * Opens the subscriber. This lifecycle method will be called before {@link
+ * #getSubscribedTopicPartitions(AdminClient)} calls are made.
+ *
+ * <p>Implementations may override this method to initialize any additional resources (beyond
+ * the Kafka {@link AdminClient}) required for discovering topic partitions.
+ *
+ * @param initializationContext initialization context for the subscriber.
+ */
+ default void open(InitializationContext initializationContext) {}
+
/**
* Get a set of subscribed {@link TopicPartition}s.
*
@@ -55,6 +66,19 @@ public interface KafkaSubscriber extends Serializable {
*/
Set<TopicPartition> getSubscribedTopicPartitions(AdminClient adminClient);
+ /**
+ * Closes the subscriber. This lifecycle method will be called once this {@link
+ * KafkaSubscriber} is no longer used.
+ *
+ * <p>Any resources created in the {@link #open(InitializationContext)} method should be cleaned
+ * up here.
+ */
+ default void close() {}
+
+ /** Initialization context for the {@link KafkaSubscriber}. */
+ @PublicEvolving
+ interface InitializationContext {}
+
// ----------------- factory methods --------------
static KafkaSubscriber getTopicListSubscriber(List<String> topics) {
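
As a usage illustration of the new hooks, a subscriber that needs a resource beyond the AdminClient could implement open()/close() along these lines. This is a sketch only, not part of the commit: CatalogBackedSubscriber and ExternalTopicCatalog are hypothetical names, and the partition lookup simply delegates to the existing getTopicListSubscriber() factory shown above.

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;

import org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriber;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.TopicPartition;

/**
 * Sketch of a subscriber that resolves its topic list from an external (non-Kafka) catalog.
 * The catalog type below is a placeholder, not part of flink-connector-kafka.
 */
public class CatalogBackedSubscriber implements KafkaSubscriber {

    private static final long serialVersionUID = 1L;

    private final String catalogUri;

    // Re-created in open(); kept transient so the subscriber stays serializable.
    private transient ExternalTopicCatalog catalog;

    public CatalogBackedSubscriber(String catalogUri) {
        this.catalogUri = catalogUri;
    }

    @Override
    public void open(InitializationContext initializationContext) {
        // Called by the enumerator before any getSubscribedTopicPartitions() call.
        catalog = ExternalTopicCatalog.connect(catalogUri);
    }

    @Override
    public Set<TopicPartition> getSubscribedTopicPartitions(AdminClient adminClient) {
        // Resolve topic names from the external catalog, then delegate the
        // partition lookup to the built-in topic-list subscriber.
        List<String> topics = new ArrayList<>(catalog.listTopics());
        return KafkaSubscriber.getTopicListSubscriber(topics)
                .getSubscribedTopicPartitions(adminClient);
    }

    @Override
    public void close() {
        // Release whatever open() acquired.
        if (catalog != null) {
            catalog.close();
        }
    }

    /** Hypothetical external catalog; stands in for any resource created in open(). */
    interface ExternalTopicCatalog {
        List<String> listTopics();

        void close();

        static ExternalTopicCatalog connect(String uri) {
            // Stand-in implementation; a real catalog would connect to the given URI.
            return new ExternalTopicCatalog() {
                @Override
                public List<String> listTopics() {
                    return Collections.emptyList();
                }

                @Override
                public void close() {}
            };
        }
    }
}
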
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberTest.java
index 0fd61f86..b04d587a 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberTest.java
@@ -68,6 +68,7 @@ public class KafkaSubscriberTest {
List<String> topics = Arrays.asList(TOPIC1, TOPIC2);
KafkaSubscriber subscriber =
KafkaSubscriber.getTopicListSubscriber(Arrays.asList(TOPIC1, TOPIC2));
+ subscriber.open(new TestSubscriberInitContext());
final Set<TopicPartition> subscribedPartitions =
subscriber.getSubscribedTopicPartitions(adminClient);
@@ -84,6 +85,7 @@ public class KafkaSubscriberTest {
final KafkaSubscriber subscriber =
KafkaSubscriber.getTopicListSubscriber(
Collections.singletonList(NON_EXISTING_TOPIC.topic()));
+ subscriber.open(new TestSubscriberInitContext());
assertThatThrownBy(() -> subscriber.getSubscribedTopicPartitions(adminClient))
.isInstanceOf(RuntimeException.class)
@@ -96,6 +98,7 @@ public class KafkaSubscriberTest {
KafkaSubscriber subscriber = KafkaSubscriber.getTopicPatternSubscriber(pattern);
final Set<TopicPartition> subscribedPartitions =
subscriber.getSubscribedTopicPartitions(adminClient);
+ subscriber.open(new TestSubscriberInitContext());
final Set<TopicPartition> expectedSubscribedPartitions =
new HashSet<>(
@@ -114,6 +117,7 @@ public class KafkaSubscriberTest {
partitions.remove(new TopicPartition(TOPIC1, 1));
KafkaSubscriber subscriber = KafkaSubscriber.getPartitionSetSubscriber(partitions);
+ subscriber.open(new TestSubscriberInitContext());
final Set<TopicPartition> subscribedPartitions =
subscriber.getSubscribedTopicPartitions(adminClient);
@@ -129,6 +133,7 @@ public class KafkaSubscriberTest {
final KafkaSubscriber subscriber =
KafkaSubscriber.getPartitionSetSubscriber(
Collections.singleton(nonExistingPartition));
+ subscriber.open(new TestSubscriberInitContext());
assertThatThrownBy(() -> subscriber.getSubscribedTopicPartitions(adminClient))
.isInstanceOf(RuntimeException.class)
@@ -137,4 +142,9 @@ public class KafkaSubscriberTest {
"Partition '%s' does not exist on Kafka
brokers",
nonExistingPartition));
}
+
+ private static class TestSubscriberInitContext implements KafkaSubscriber.InitializationContext {
+ private TestSubscriberInitContext() {}
+ }
+
}