dianfu commented on code in PR #177:
URL: 
https://github.com/apache/flink-connector-kafka/pull/177#discussion_r2134959384


##########
flink-python/pyflink/datastream/connectors/kafka.py:
##########
@@ -41,6 +43,157 @@
     'KafkaTopicSelector'
 ]
 
+# ---- DynamicKafkaSource ----
+
+
+class DynamicKafkaSource(Source):
+    """Python wrapper of the Java DynamicKafkaSource.
+
+    A DynamicKafkaSource enables consuming records from dynamically discovered 
Kafka streams
+    (topics that may span multiple clusters) without restarting the Flink job.
+
+    Use :py:meth:`builder` to construct an instance, for example::
+
+        >>> source = DynamicKafkaSource.builder() \\
+        ...     .set_stream_ids({"stream-a", "stream-b"}) \\
+        ...     .set_kafka_metadata_service(metadata_service) \\
+        ...     .set_value_only_deserializer(SimpleStringSchema()) \\
+        ...     .set_property("group.id", "my_group") \\
+        ...     .build()
+
+    The builder methods closely mirror their Java counterparts defined on
+    
``org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSourceBuilder``.
+    """
+
+    def __init__(self, j_dynamic_kafka_source: JavaObject):
+        super().__init__(j_dynamic_kafka_source)
+
+    @staticmethod
+    def builder() -> 'DynamicKafkaSourceBuilder':
+        """Create and return a new :class:`DynamicKafkaSourceBuilder`."""
+        return DynamicKafkaSourceBuilder()
+
+
+class DynamicKafkaSourceBuilder(object):
+    """Builder for :class:`DynamicKafkaSource`.
+
+    The builder is a thin Python wrapper delegating to the underlying Java 
builder defined in
+    
``org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSourceBuilder``.
+    """
+
+    def __init__(self):
+        self._gateway = get_gateway()
+        self._j_builder = (
+            
self._gateway.jvm.org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSource
+            .builder()
+        )
+
+    # ---------------------------------------------------------------------
+    #  Build
+    # ---------------------------------------------------------------------
+
+    def build(self) -> 'DynamicKafkaSource':
+        """Finalize the configuration and return a 
:class:`DynamicKafkaSource`."""
+        return DynamicKafkaSource(self._j_builder.build())
+
+    # ---------------------------------------------------------------------
+    #  Stream subscription configuration
+    # ---------------------------------------------------------------------
+
+    def set_stream_ids(self, stream_ids: Set[str]) -> 
'DynamicKafkaSourceBuilder':
+        """Subscribe to a fixed set of stream IDs.
+
+        :param stream_ids: A Python ``set`` of stream IDs.
+        """
+        j_set = self._gateway.jvm.java.util.HashSet()
+        for stream_id in stream_ids:
+            j_set.add(stream_id)
+        self._j_builder.setStreamIds(j_set)
+        return self
+
+    def set_stream_pattern(self, stream_pattern: str) -> 
'DynamicKafkaSourceBuilder':
+        """Subscribe to streams whose IDs match the given Java regex 
pattern."""
+        j_pattern = 
self._gateway.jvm.java.util.regex.Pattern.compile(stream_pattern)
+        self._j_builder.setStreamPattern(j_pattern)
+        return self
+
+    def set_kafka_stream_subscriber(self, kafka_stream_subscriber: JavaObject) 
-> \
+            'DynamicKafkaSourceBuilder':
+        """Use a custom ``KafkaStreamSubscriber`` implementation."""
+        self._j_builder.setKafkaStreamSubscriber(kafka_stream_subscriber)
+        return self
+
+    # ---------------------------------------------------------------------
+    #  Metadata service
+    # ---------------------------------------------------------------------
+
+    def set_kafka_metadata_service(self, kafka_metadata_service: JavaObject) 
-> \

Review Comment:
   The implementation of Python DynamicKafkaSource in this pull request could 
be seen as just a wrapper of an arbitrary JavaObject?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to