This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 4def6efb47e [FLINK-38530][python] Add support of Dynamic Kafka Source for PyFlink (#27409)
4def6efb47e is described below

commit 4def6efb47ebc2dff46f444a3319d3c2ae02c3a6
Author: bowenli86 <[email protected]>
AuthorDate: Wed Jan 14 17:37:20 2026 -0800

    [FLINK-38530][python] Add support of Dynamic Kafka Source for PyFlink (#27409)
---
 flink-python/pom.xml                               |   2 +-
 .../pyflink/datastream/connectors/__init__.py      |  12 +
 .../pyflink/datastream/connectors/dynamic_kafka.py | 267 +++++++++++++
 .../connectors/tests/test_dynamic_kafka.py         | 413 +++++++++++++++++++++
 4 files changed, 693 insertions(+), 1 deletion(-)

diff --git a/flink-python/pom.xml b/flink-python/pom.xml
index 411fa96d63c..dbace7043c1 100644
--- a/flink-python/pom.xml
+++ b/flink-python/pom.xml
@@ -289,7 +289,7 @@ under the License.
                        <!-- Indirectly accessed in pyflink_gateway_server -->
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-sql-connector-kafka</artifactId>
-                       <version>3.0.0-1.17</version>
+                       <version>4.0.1-2.0</version>
                        <scope>test</scope>
                </dependency>
 
diff --git a/flink-python/pyflink/datastream/connectors/__init__.py b/flink-python/pyflink/datastream/connectors/__init__.py
index f8a29f60978..4f2e1036b20 100644
--- a/flink-python/pyflink/datastream/connectors/__init__.py
+++ b/flink-python/pyflink/datastream/connectors/__init__.py
@@ -44,6 +44,18 @@ def _install():
     setattr(connectors, 'FlinkKafkaConsumer', kafka.FlinkKafkaConsumer)
     setattr(connectors, 'FlinkKafkaProducer', kafka.FlinkKafkaProducer)
     setattr(connectors, 'Semantic', kafka.Semantic)
+    # dynamic kafka
+    from pyflink.datastream.connectors import dynamic_kafka
+    setattr(connectors, 'DynamicKafkaSource', dynamic_kafka.DynamicKafkaSource)
+    setattr(connectors, 'DynamicKafkaSourceBuilder', dynamic_kafka.DynamicKafkaSourceBuilder)
+    setattr(connectors, 'KafkaMetadataService', dynamic_kafka.KafkaMetadataService)
+    setattr(connectors, 'KafkaRecordDeserializationSchema',
+            dynamic_kafka.KafkaRecordDeserializationSchema)
+    setattr(connectors, 'KafkaStreamSetSubscriber', dynamic_kafka.KafkaStreamSetSubscriber)
+    setattr(connectors, 'KafkaStreamSubscriber', dynamic_kafka.KafkaStreamSubscriber)
+    setattr(connectors, 'StreamPatternSubscriber', dynamic_kafka.StreamPatternSubscriber)
+    setattr(connectors, 'SingleClusterTopicMetadataService',
+            dynamic_kafka.SingleClusterTopicMetadataService)
 
     # pulsar
     from pyflink.datastream.connectors import pulsar
diff --git a/flink-python/pyflink/datastream/connectors/dynamic_kafka.py b/flink-python/pyflink/datastream/connectors/dynamic_kafka.py
new file mode 100644
index 00000000000..7473077a08e
--- /dev/null
+++ b/flink-python/pyflink/datastream/connectors/dynamic_kafka.py
@@ -0,0 +1,267 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from typing import Dict, Set, Union
+
+from py4j.java_gateway import JavaObject
+
+from pyflink.common import DeserializationSchema
+from pyflink.datastream.connectors import Source
+from pyflink.datastream.connectors.kafka import KafkaOffsetsInitializer
+from pyflink.java_gateway import get_gateway
+
+__all__ = [
+    'DynamicKafkaSource',
+    'DynamicKafkaSourceBuilder',
+    'KafkaMetadataService',
+    'KafkaRecordDeserializationSchema',
+    'KafkaStreamSetSubscriber',
+    'KafkaStreamSubscriber',
+    'StreamPatternSubscriber',
+    'SingleClusterTopicMetadataService'
+]
+
+
+class KafkaMetadataService(object):
+    """
+    Base class for Kafka metadata service wrappers.
+    """
+
+    def __init__(self, j_metadata_service: JavaObject):
+        self._j_metadata_service = j_metadata_service
+
+
+class SingleClusterTopicMetadataService(KafkaMetadataService):
+    """
+    A KafkaMetadataService backed by a single Kafka cluster where stream ids map to topics.
+    """
+
+    def __init__(self, kafka_cluster_id: str, properties: Dict[str, str]):
+        gateway = get_gateway()
+        j_properties = gateway.jvm.java.util.Properties()
+        for key, value in properties.items():
+            j_properties.setProperty(key, value)
+        j_service = gateway.jvm.org.apache.flink.connector.kafka.dynamic.metadata \
+            .SingleClusterTopicMetadataService(kafka_cluster_id, j_properties)
+        super().__init__(j_service)
+
+
+class KafkaStreamSubscriber(object):
+    """
+    Wrapper for Java KafkaStreamSubscriber implementations.
+    """
+
+    def __init__(self, j_kafka_stream_subscriber: JavaObject):
+        self._j_kafka_stream_subscriber = j_kafka_stream_subscriber
+
+
+class KafkaStreamSetSubscriber(KafkaStreamSubscriber):
+    """
+    Subscriber that consumes from a fixed set of stream ids.
+    """
+
+    def __init__(self, stream_ids: Set[str]):
+        gateway = get_gateway()
+        j_stream_ids = gateway.jvm.java.util.HashSet()
+        for stream_id in stream_ids:
+            j_stream_ids.add(stream_id)
+        j_subscriber = gateway.jvm.org.apache.flink.connector.kafka.dynamic.source.enumerator \
+            .subscriber.KafkaStreamSetSubscriber(j_stream_ids)
+        super().__init__(j_subscriber)
+
+
+class StreamPatternSubscriber(KafkaStreamSubscriber):
+    """
+    Subscriber that consumes from stream ids matching a regex pattern.
+    """
+
+    def __init__(self, stream_pattern: str):
+        gateway = get_gateway()
+        j_pattern = gateway.jvm.java.util.regex.Pattern.compile(stream_pattern)
+        j_subscriber = gateway.jvm.org.apache.flink.connector.kafka.dynamic.source.enumerator \
+            .subscriber.StreamPatternSubscriber(j_pattern)
+        super().__init__(j_subscriber)
+
+
+class KafkaRecordDeserializationSchema(DeserializationSchema):
+    """
+    Wrapper for KafkaRecordDeserializationSchema.
+    """
+
+    def __init__(self, j_deserialization_schema: JavaObject):
+        super().__init__(j_deserialization_schema)
+
+    @staticmethod
+    def value_only(deserialization_schema: DeserializationSchema) -> \
+            'KafkaRecordDeserializationSchema':
+        jvm = get_gateway().jvm
+        j_deserializer = jvm.org.apache.flink.connector.kafka.source.reader.deserializer \
+            .KafkaRecordDeserializationSchema.valueOnly(
+                deserialization_schema._j_deserialization_schema)
+        return KafkaRecordDeserializationSchema(j_deserializer)
+
+
+class DynamicKafkaSource(Source):
+    """
+    Source implementation for dynamic Kafka streams.
+
+    Example:
+    ::
+
+        >>> metadata_service = SingleClusterTopicMetadataService(
+        ...     'cluster-a', {'bootstrap.servers': 'localhost:9092'})
+        >>> source = DynamicKafkaSource.builder() \\
+        ...     .set_stream_ids({'stream-a'}) \\
+        ...     .set_kafka_metadata_service(metadata_service) \\
+        ...     .set_value_only_deserializer(SimpleStringSchema()) \\
+        ...     .build()
+    """
+
+    def __init__(self, j_dynamic_kafka_source: JavaObject):
+        super().__init__(j_dynamic_kafka_source)
+
+    @staticmethod
+    def builder() -> 'DynamicKafkaSourceBuilder':
+        return DynamicKafkaSourceBuilder()
+
+
+class DynamicKafkaSourceBuilder(object):
+    """
+    Builder for DynamicKafkaSource.
+    """
+
+    def __init__(self):
+        self._j_builder = get_gateway().jvm.org.apache.flink.connector.kafka.dynamic.source \
+            .DynamicKafkaSource.builder()
+
+    def build(self) -> DynamicKafkaSource:
+        return DynamicKafkaSource(self._j_builder.build())
+
+    def set_stream_ids(self, stream_ids: Set[str]) -> 'DynamicKafkaSourceBuilder':
+        """
+        Set the stream ids to consume.
+        """
+        j_set = get_gateway().jvm.java.util.HashSet()
+        for stream_id in stream_ids:
+            j_set.add(stream_id)
+        self._j_builder.setStreamIds(j_set)
+        return self
+
+    def set_stream_pattern(self, stream_pattern: str) -> 'DynamicKafkaSourceBuilder':
+        """
+        Set a regex pattern to match stream ids.
+        """
+        self._j_builder.setStreamPattern(get_gateway().jvm.java.util.regex
+                                         .Pattern.compile(stream_pattern))
+        return self
+
+    def set_kafka_stream_subscriber(
+            self,
+            kafka_stream_subscriber: Union[KafkaStreamSubscriber, JavaObject]) \
+            -> 'DynamicKafkaSourceBuilder':
+        """
+        Set a custom KafkaStreamSubscriber.
+        """
+        if isinstance(kafka_stream_subscriber, KafkaStreamSubscriber):
+            j_subscriber = kafka_stream_subscriber._j_kafka_stream_subscriber
+        else:
+            j_subscriber = kafka_stream_subscriber
+        self._j_builder.setKafkaStreamSubscriber(j_subscriber)
+        return self
+
+    def set_kafka_metadata_service(
+            self,
+            kafka_metadata_service: Union[KafkaMetadataService, JavaObject]) \
+            -> 'DynamicKafkaSourceBuilder':
+        """
+        Set the KafkaMetadataService.
+        """
+        if isinstance(kafka_metadata_service, KafkaMetadataService):
+            j_metadata_service = kafka_metadata_service._j_metadata_service
+        else:
+            j_metadata_service = kafka_metadata_service
+        self._j_builder.setKafkaMetadataService(j_metadata_service)
+        return self
+
+    def set_deserializer(self,
+                         deserializer: Union[KafkaRecordDeserializationSchema, JavaObject]) \
+            -> 'DynamicKafkaSourceBuilder':
+        """
+        Set the KafkaRecordDeserializationSchema.
+        """
+        if isinstance(deserializer, KafkaRecordDeserializationSchema):
+            j_deserializer = deserializer._j_deserialization_schema
+        else:
+            j_deserializer = deserializer
+        self._j_builder.setDeserializer(j_deserializer)
+        return self
+
+    def set_value_only_deserializer(self, deserialization_schema: DeserializationSchema) \
+            -> 'DynamicKafkaSourceBuilder':
+        """
+        Set a value-only DeserializationSchema.
+        """
+        return self.set_deserializer(
+            KafkaRecordDeserializationSchema.value_only(deserialization_schema))
+
+    def set_starting_offsets(self, starting_offsets_initializer: KafkaOffsetsInitializer) \
+            -> 'DynamicKafkaSourceBuilder':
+        """
+        Set the starting offsets for all streams.
+        """
+        self._j_builder.setStartingOffsets(starting_offsets_initializer._j_initializer)
+        return self
+
+    def set_bounded(self, stopping_offsets_initializer: KafkaOffsetsInitializer) \
+            -> 'DynamicKafkaSourceBuilder':
+        """
+        Set the source to bounded mode with stopping offsets.
+        """
+        self._j_builder.setBounded(stopping_offsets_initializer._j_initializer)
+        return self
+
+    def set_properties(self, props: Dict[str, str]) -> 'DynamicKafkaSourceBuilder':
+        """
+        Set consumer properties for all clusters.
+        """
+        gateway = get_gateway()
+        j_properties = gateway.jvm.java.util.Properties()
+        for key, value in props.items():
+            j_properties.setProperty(key, value)
+        self._j_builder.setProperties(j_properties)
+        return self
+
+    def set_property(self, key: str, value: str) -> 'DynamicKafkaSourceBuilder':
+        """
+        Set a consumer property for all clusters.
+        """
+        self._j_builder.setProperty(key, value)
+        return self
+
+    def set_group_id(self, group_id: str) -> 'DynamicKafkaSourceBuilder':
+        """
+        Set the consumer group id for all clusters.
+        """
+        self._j_builder.setGroupId(group_id)
+        return self
+
+    def set_client_id_prefix(self, prefix: str) -> 'DynamicKafkaSourceBuilder':
+        """
+        Set the client id prefix for all clusters.
+        """
+        self._j_builder.setClientIdPrefix(prefix)
+        return self
diff --git a/flink-python/pyflink/datastream/connectors/tests/test_dynamic_kafka.py b/flink-python/pyflink/datastream/connectors/tests/test_dynamic_kafka.py
new file mode 100644
index 00000000000..448289f2727
--- /dev/null
+++ b/flink-python/pyflink/datastream/connectors/tests/test_dynamic_kafka.py
@@ -0,0 +1,413 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import json
+from typing import Dict
+from pyflink.common.serialization import SimpleStringSchema
+from pyflink.common.watermark_strategy import WatermarkStrategy
+from pyflink.datastream.connectors.dynamic_kafka import DynamicKafkaSource, \
+    KafkaRecordDeserializationSchema, KafkaStreamSetSubscriber, StreamPatternSubscriber, \
+    SingleClusterTopicMetadataService
+from pyflink.datastream.connectors.kafka import KafkaOffsetsInitializer, KafkaOffsetResetStrategy, \
+    KafkaTopicPartition
+from pyflink.java_gateway import get_gateway
+from pyflink.testing.test_case_utils import PyFlinkStreamingTestCase
+from pyflink.util.java_utils import get_field, get_field_value, is_instance_of
+
+
+class DynamicKafkaSourceTests(PyFlinkStreamingTestCase):
+
+    def test_compiling(self):
+        source = DynamicKafkaSource.builder() \
+            .set_stream_ids({'test-stream'}) \
+            .set_kafka_metadata_service(self._build_metadata_service()) \
+            .set_value_only_deserializer(SimpleStringSchema()) \
+            .build()
+
+        ds = self.env.from_source(source=source,
+                                  watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
+                                  source_name='dynamic kafka source')
+        ds.print()
+        plan = json.loads(self.env.get_execution_plan())
+        self.assertEqual('Source: dynamic kafka source', plan['nodes'][0]['type'])
+
+    def test_set_stream_ids(self):
+        source = DynamicKafkaSource.builder() \
+            .set_stream_ids({'stream-a', 'stream-b'}) \
+            .set_kafka_metadata_service(self._build_metadata_service()) \
+            .set_value_only_deserializer(SimpleStringSchema()) \
+            .build()
+        subscriber = get_field_value(source.get_java_function(), 'kafkaStreamSubscriber')
+        self.assertEqual(
+            subscriber.getClass().getCanonicalName(),
+            'org.apache.flink.connector.kafka.dynamic.source.enumerator.subscriber'
+            '.KafkaStreamSetSubscriber'
+        )
+        stream_ids = get_field_value(subscriber, 'streamIds')
+        self.assertTrue(is_instance_of(stream_ids, get_gateway().jvm.java.util.Set))
+        self.assertEqual(stream_ids.size(), 2)
+        self.assertTrue('stream-a' in stream_ids)
+        self.assertTrue('stream-b' in stream_ids)
+
+    def test_set_stream_pattern(self):
+        source = DynamicKafkaSource.builder() \
+            .set_stream_pattern('stream-*') \
+            .set_kafka_metadata_service(self._build_metadata_service()) \
+            .set_value_only_deserializer(SimpleStringSchema()) \
+            .build()
+        subscriber = get_field_value(source.get_java_function(), 'kafkaStreamSubscriber')
+        self.assertEqual(
+            subscriber.getClass().getCanonicalName(),
+            'org.apache.flink.connector.kafka.dynamic.source.enumerator.subscriber'
+            '.StreamPatternSubscriber'
+        )
+        stream_pattern = get_field_value(subscriber, 'streamPattern')
+        self.assertTrue(is_instance_of(stream_pattern, get_gateway().jvm.java.util.regex.Pattern))
+        self.assertEqual(stream_pattern.toString(), 'stream-*')
+
+    def test_set_stream_set_subscriber(self):
+        subscriber = KafkaStreamSetSubscriber({'stream-a', 'stream-b'})
+        source = DynamicKafkaSource.builder() \
+            .set_kafka_stream_subscriber(subscriber) \
+            .set_kafka_metadata_service(self._build_metadata_service()) \
+            .set_value_only_deserializer(SimpleStringSchema()) \
+            .build()
+        j_subscriber = get_field_value(source.get_java_function(), 'kafkaStreamSubscriber')
+        self.assertEqual(
+            j_subscriber.getClass().getCanonicalName(),
+            'org.apache.flink.connector.kafka.dynamic.source.enumerator.subscriber'
+            '.KafkaStreamSetSubscriber'
+        )
+        stream_ids = get_field_value(j_subscriber, 'streamIds')
+        self.assertTrue(is_instance_of(stream_ids, get_gateway().jvm.java.util.Set))
+        self.assertEqual(stream_ids.size(), 2)
+        self.assertTrue('stream-a' in stream_ids)
+        self.assertTrue('stream-b' in stream_ids)
+
+    def test_set_stream_pattern_subscriber(self):
+        subscriber = StreamPatternSubscriber('stream-*')
+        source = DynamicKafkaSource.builder() \
+            .set_kafka_stream_subscriber(subscriber) \
+            .set_kafka_metadata_service(self._build_metadata_service()) \
+            .set_value_only_deserializer(SimpleStringSchema()) \
+            .build()
+        j_subscriber = get_field_value(source.get_java_function(), 'kafkaStreamSubscriber')
+        self.assertEqual(
+            j_subscriber.getClass().getCanonicalName(),
+            'org.apache.flink.connector.kafka.dynamic.source.enumerator.subscriber'
+            '.StreamPatternSubscriber'
+        )
+        stream_pattern = get_field_value(j_subscriber, 'streamPattern')
+        self.assertTrue(is_instance_of(stream_pattern, get_gateway().jvm.java.util.regex.Pattern))
+        self.assertEqual(stream_pattern.toString(), 'stream-*')
+
+    def test_set_properties(self):
+        source = DynamicKafkaSource.builder() \
+            .set_stream_ids({'stream-a'}) \
+            .set_kafka_metadata_service(self._build_metadata_service()) \
+            .set_value_only_deserializer(SimpleStringSchema()) \
+            .set_group_id('test-group') \
+            .set_client_id_prefix('test-client-id') \
+            .set_property('test-property', 'test-value') \
+            .build()
+        properties = get_field_value(source.get_java_function(), 'properties')
+        self.assertEqual(properties.getProperty('group.id'), 'test-group')
+        self.assertEqual(properties.getProperty('client.id.prefix'), 'test-client-id')
+        self.assertEqual(properties.getProperty('test-property'), 'test-value')
+
+    def test_set_starting_offsets(self):
+        def _build_source(initializer: KafkaOffsetsInitializer):
+            return DynamicKafkaSource.builder() \
+                .set_stream_ids({'stream-a'}) \
+                .set_kafka_metadata_service(self._build_metadata_service()) \
+                .set_value_only_deserializer(SimpleStringSchema()) \
+                .set_starting_offsets(initializer) \
+                .build()
+
+        self._check_offsets_initializer(
+            _build_source(KafkaOffsetsInitializer.latest()),
+            {
+                'org.apache.flink.connector.kafka.source.enumerator.initializer.'
+                'LatestOffsetsInitializer',
+                'org.apache.flink.connector.kafka.source.enumerator.initializer.'
+                'ReaderHandledOffsetsInitializer',
+            },
+            reset_strategy=KafkaOffsetResetStrategy.LATEST,
+            offset=-1
+        )
+        self._check_offsets_initializer(
+            _build_source(KafkaOffsetsInitializer.earliest()),
+            {
+                'org.apache.flink.connector.kafka.source.enumerator.initializer.'
+                'EarliestOffsetsInitializer',
+                'org.apache.flink.connector.kafka.source.enumerator.initializer.'
+                'ReaderHandledOffsetsInitializer',
+            },
+            reset_strategy=KafkaOffsetResetStrategy.EARLIEST,
+            offset=-2
+        )
+        self._check_offsets_initializer(
+            _build_source(KafkaOffsetsInitializer.committed_offsets()),
+            {
+                'org.apache.flink.connector.kafka.source.enumerator.initializer.'
+                'CommittedOffsetsInitializer',
+                'org.apache.flink.connector.kafka.source.enumerator.initializer.'
+                'ReaderHandledOffsetsInitializer',
+            },
+            reset_strategy=KafkaOffsetResetStrategy.NONE,
+            offset=-3
+        )
+        self._check_offsets_initializer(
+            _build_source(KafkaOffsetsInitializer.committed_offsets(
+                KafkaOffsetResetStrategy.LATEST
+            )),
+            {
+                'org.apache.flink.connector.kafka.source.enumerator.initializer.'
+                'CommittedOffsetsInitializer',
+                'org.apache.flink.connector.kafka.source.enumerator.initializer.'
+                'ReaderHandledOffsetsInitializer',
+            },
+            reset_strategy=KafkaOffsetResetStrategy.LATEST,
+            offset=-3
+        )
+        self._check_timestamp_offsets_initializer(
+            _build_source(KafkaOffsetsInitializer.timestamp(100)), 100
+        )
+        specified_offsets = {
+            KafkaTopicPartition('test_topic1', 1): 1000,
+            KafkaTopicPartition('test_topic2', 2): 2000
+        }
+        self._check_specified_offsets_initializer(
+            _build_source(KafkaOffsetsInitializer.offsets(specified_offsets)), specified_offsets,
+            KafkaOffsetResetStrategy.EARLIEST
+        )
+        self._check_specified_offsets_initializer(
+            _build_source(KafkaOffsetsInitializer.offsets(
+                specified_offsets,
+                KafkaOffsetResetStrategy.LATEST
+            )),
+            specified_offsets,
+            KafkaOffsetResetStrategy.LATEST
+        )
+
+    def test_bounded(self):
+        def _build_source(initializer: KafkaOffsetsInitializer):
+            return DynamicKafkaSource.builder() \
+                .set_stream_ids({'stream-a'}) \
+                .set_kafka_metadata_service(self._build_metadata_service()) \
+                .set_value_only_deserializer(SimpleStringSchema()) \
+                .set_bounded(initializer) \
+                .build()
+
+        def _check_bounded(source: DynamicKafkaSource):
+            self.assertEqual(
+                get_field_value(source.get_java_function(), 'boundedness').toString(), 'BOUNDED'
+            )
+
+        source = _build_source(KafkaOffsetsInitializer.latest())
+        _check_bounded(source)
+        self._check_offsets_initializer(
+            source,
+            {
+                'org.apache.flink.connector.kafka.source.enumerator.initializer.'
+                'LatestOffsetsInitializer',
+                'org.apache.flink.connector.kafka.source.enumerator.initializer.'
+                'ReaderHandledOffsetsInitializer',
+            },
+            reset_strategy=KafkaOffsetResetStrategy.LATEST,
+            offset=-1,
+            is_start=False
+        )
+        source = _build_source(KafkaOffsetsInitializer.earliest())
+        _check_bounded(source)
+        self._check_offsets_initializer(
+            source,
+            {
+                'org.apache.flink.connector.kafka.source.enumerator.initializer.'
+                'EarliestOffsetsInitializer',
+                'org.apache.flink.connector.kafka.source.enumerator.initializer.'
+                'ReaderHandledOffsetsInitializer',
+            },
+            reset_strategy=KafkaOffsetResetStrategy.EARLIEST,
+            offset=-2,
+            is_start=False
+        )
+        source = _build_source(KafkaOffsetsInitializer.committed_offsets())
+        _check_bounded(source)
+        self._check_offsets_initializer(
+            source,
+            {
+                'org.apache.flink.connector.kafka.source.enumerator.initializer.'
+                'CommittedOffsetsInitializer',
+                'org.apache.flink.connector.kafka.source.enumerator.initializer.'
+                'ReaderHandledOffsetsInitializer',
+            },
+            reset_strategy=KafkaOffsetResetStrategy.NONE,
+            offset=-3,
+            is_start=False
+        )
+        source = _build_source(KafkaOffsetsInitializer.committed_offsets(
+            KafkaOffsetResetStrategy.LATEST
+        ))
+        _check_bounded(source)
+        self._check_offsets_initializer(
+            source,
+            {
+                'org.apache.flink.connector.kafka.source.enumerator.initializer.'
+                'CommittedOffsetsInitializer',
+                'org.apache.flink.connector.kafka.source.enumerator.initializer.'
+                'ReaderHandledOffsetsInitializer',
+            },
+            reset_strategy=KafkaOffsetResetStrategy.LATEST,
+            offset=-3,
+            is_start=False
+        )
+        source = _build_source(KafkaOffsetsInitializer.timestamp(100))
+        _check_bounded(source)
+        self._check_timestamp_offsets_initializer(source, 100, False)
+        specified_offsets = {
+            KafkaTopicPartition('test_topic1', 1): 1000,
+            KafkaTopicPartition('test_topic2', 2): 2000
+        }
+        source = _build_source(KafkaOffsetsInitializer.offsets(specified_offsets))
+        _check_bounded(source)
+        self._check_specified_offsets_initializer(
+            source, specified_offsets, KafkaOffsetResetStrategy.EARLIEST, False
+        )
+        source = _build_source(KafkaOffsetsInitializer.offsets(
+            specified_offsets,
+            KafkaOffsetResetStrategy.LATEST)
+        )
+        _check_bounded(source)
+        self._check_specified_offsets_initializer(
+            source, specified_offsets, KafkaOffsetResetStrategy.LATEST, False
+        )
+
+    def test_set_value_only_deserializer(self):
+        source = DynamicKafkaSource.builder() \
+            .set_stream_ids({'stream-a'}) \
+            .set_kafka_metadata_service(self._build_metadata_service()) \
+            .set_value_only_deserializer(SimpleStringSchema()) \
+            .build()
+        deserialization_schema_wrapper = get_field_value(source.get_java_function(),
+                                                         'deserializationSchema')
+        self.assertEqual(
+            deserialization_schema_wrapper.getClass().getCanonicalName(),
+            'org.apache.flink.connector.kafka.source.reader.deserializer'
+            '.KafkaValueOnlyDeserializationSchemaWrapper'
+        )
+        deserialization_schema = get_field_value(deserialization_schema_wrapper,
+                                                 'deserializationSchema')
+        self.assertEqual(deserialization_schema.getClass().getCanonicalName(),
+                         'org.apache.flink.api.common.serialization.SimpleStringSchema')
+
+    def test_set_deserializer(self):
+        record_deserializer = KafkaRecordDeserializationSchema.value_only(SimpleStringSchema())
+        source = DynamicKafkaSource.builder() \
+            .set_stream_ids({'stream-a'}) \
+            .set_kafka_metadata_service(self._build_metadata_service()) \
+            .set_deserializer(record_deserializer) \
+            .build()
+        deserialization_schema_wrapper = get_field_value(source.get_java_function(),
+                                                         'deserializationSchema')
+        self.assertEqual(
+            deserialization_schema_wrapper.getClass().getCanonicalName(),
+            'org.apache.flink.connector.kafka.source.reader.deserializer'
+            '.KafkaValueOnlyDeserializationSchemaWrapper'
+        )
+
+    @staticmethod
+    def _build_metadata_service() -> SingleClusterTopicMetadataService:
+        return SingleClusterTopicMetadataService(
+            'test-cluster', {'bootstrap.servers': 'localhost:9092'})
+
+    def _check_offsets_initializer(self,
+                                   source: DynamicKafkaSource,
+                                   expected_class_names,
+                                   reset_strategy: KafkaOffsetResetStrategy = None,
+                                   offset: int = None,
+                                   is_start: bool = True):
+        if is_start:
+            field_name = 'startingOffsetsInitializer'
+        else:
+            field_name = 'stoppingOffsetsInitializer'
+        offsets_initializer = get_field_value(source.get_java_function(), field_name)
+        class_name = offsets_initializer.getClass().getCanonicalName()
+        self.assertIn(class_name, expected_class_names)
+
+        if offset is not None:
+            starting_offset_field = get_field(offsets_initializer.getClass(), 'startingOffset')
+            if starting_offset_field is not None:
+                starting_offset = starting_offset_field.get(offsets_initializer)
+                self.assertEqual(starting_offset, offset)
+
+        if reset_strategy is not None:
+            offset_reset_strategy_field = get_field(offsets_initializer.getClass(),
+                                                    'offsetResetStrategy')
+            if offset_reset_strategy_field is not None:
+                offset_reset_strategy = offset_reset_strategy_field.get(offsets_initializer)
+                self.assertTrue(
+                    offset_reset_strategy.equals(reset_strategy._to_j_offset_reset_strategy())
+                )
+
+    def _check_timestamp_offsets_initializer(self,
+                                             source: DynamicKafkaSource,
+                                             timestamp: int,
+                                             is_start: bool = True):
+        if is_start:
+            field_name = 'startingOffsetsInitializer'
+        else:
+            field_name = 'stoppingOffsetsInitializer'
+        offsets_initializer = get_field_value(source.get_java_function(), field_name)
+        self.assertEqual(
+            offsets_initializer.getClass().getCanonicalName(),
+            'org.apache.flink.connector.kafka.source.enumerator.initializer'
+            '.TimestampOffsetsInitializer'
+        )
+
+        starting_timestamp = get_field_value(offsets_initializer, 'startingTimestamp')
+        self.assertEqual(starting_timestamp, timestamp)
+
+    def _check_specified_offsets_initializer(self,
+                                             source: DynamicKafkaSource,
+                                             offsets: Dict[KafkaTopicPartition, int],
+                                             reset_strategy: KafkaOffsetResetStrategy,
+                                             is_start: bool = True):
+        if is_start:
+            field_name = 'startingOffsetsInitializer'
+        else:
+            field_name = 'stoppingOffsetsInitializer'
+        offsets_initializer = get_field_value(source.get_java_function(), field_name)
+        self.assertEqual(
+            offsets_initializer.getClass().getCanonicalName(),
+            'org.apache.flink.connector.kafka.source.enumerator.initializer'
+            '.SpecifiedOffsetsInitializer'
+        )
+
+        initial_offsets = get_field_value(offsets_initializer, 'initialOffsets')
+        self.assertTrue(is_instance_of(initial_offsets, get_gateway().jvm.java.util.Map))
+        self.assertEqual(initial_offsets.size(), len(offsets))
+        for j_topic_partition in initial_offsets:
+            topic_partition = KafkaTopicPartition(j_topic_partition.topic(),
+                                                  j_topic_partition.partition())
+            self.assertIsNotNone(offsets.get(topic_partition))
+            self.assertEqual(initial_offsets[j_topic_partition], offsets[topic_partition])
+
+        offset_reset_strategy = get_field_value(offsets_initializer, 'offsetResetStrategy')
+        self.assertTrue(
+            offset_reset_strategy.equals(reset_strategy._to_j_offset_reset_strategy())
+        )

Reply via email to