dianfu commented on code in PR #177:
URL: https://github.com/apache/flink-connector-kafka/pull/177#discussion_r2134959384
##########
flink-python/pyflink/datastream/connectors/kafka.py:
##########

@@ -41,6 +43,157 @@
     'KafkaTopicSelector'
 ]
 
+# ---- DynamicKafkaSource ----
+
+
+class DynamicKafkaSource(Source):
+    """Python wrapper of the Java DynamicKafkaSource.
+
+    A DynamicKafkaSource enables consuming records from dynamically discovered Kafka streams
+    (topics that may span multiple clusters) without restarting the Flink job.
+
+    Use :py:meth:`builder` to construct an instance, for example::
+
+        >>> source = DynamicKafkaSource.builder() \\
+        ...     .set_stream_ids({"stream-a", "stream-b"}) \\
+        ...     .set_kafka_metadata_service(metadata_service) \\
+        ...     .set_value_only_deserializer(SimpleStringSchema()) \\
+        ...     .set_property("group.id", "my_group") \\
+        ...     .build()
+
+    The builder methods closely mirror their Java counterparts defined on
+    ``org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSourceBuilder``.
+    """
+
+    def __init__(self, j_dynamic_kafka_source: JavaObject):
+        super().__init__(j_dynamic_kafka_source)
+
+    @staticmethod
+    def builder() -> 'DynamicKafkaSourceBuilder':
+        """Create and return a new :class:`DynamicKafkaSourceBuilder`."""
+        return DynamicKafkaSourceBuilder()
+
+
+class DynamicKafkaSourceBuilder(object):
+    """Builder for :class:`DynamicKafkaSource`.
+
+    The builder is a thin Python wrapper delegating to the underlying Java builder defined in
+    ``org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSourceBuilder``.
+    """
+
+    def __init__(self):
+        self._gateway = get_gateway()
+        self._j_builder = (
+            self._gateway.jvm.org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSource
+            .builder()
+        )
+
+    # ---------------------------------------------------------------------
+    # Build
+    # ---------------------------------------------------------------------
+
+    def build(self) -> 'DynamicKafkaSource':
+        """Finalize the configuration and return a :class:`DynamicKafkaSource`."""
+        return DynamicKafkaSource(self._j_builder.build())
+
+    # ---------------------------------------------------------------------
+    # Stream subscription configuration
+    # ---------------------------------------------------------------------
+
+    def set_stream_ids(self, stream_ids: Set[str]) -> 'DynamicKafkaSourceBuilder':
+        """Subscribe to a fixed set of stream IDs.
+
+        :param stream_ids: A Python ``set`` of stream IDs.
+        """
+        j_set = self._gateway.jvm.java.util.HashSet()
+        for stream_id in stream_ids:
+            j_set.add(stream_id)
+        self._j_builder.setStreamIds(j_set)
+        return self
+
+    def set_stream_pattern(self, stream_pattern: str) -> 'DynamicKafkaSourceBuilder':
+        """Subscribe to streams whose IDs match the given Java regex pattern."""
+        j_pattern = self._gateway.jvm.java.util.regex.Pattern.compile(stream_pattern)
+        self._j_builder.setStreamPattern(j_pattern)
+        return self
+
+    def set_kafka_stream_subscriber(self, kafka_stream_subscriber: JavaObject) -> \
+            'DynamicKafkaSourceBuilder':
+        """Use a custom ``KafkaStreamSubscriber`` implementation."""
+        self._j_builder.setKafkaStreamSubscriber(kafka_stream_subscriber)
+        return self
+
+    # ---------------------------------------------------------------------
+    # Metadata service
+    # ---------------------------------------------------------------------
+
+    def set_kafka_metadata_service(self, kafka_metadata_service: JavaObject) -> \

Review Comment:
   The Python DynamicKafkaSource implementation in this pull request could be seen as just a wrapper around an arbitrary JavaObject?
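To make the concern above concrete, here is a minimal usage sketch of the proposed builder API. It assumes this PR's DynamicKafkaSource is importable from pyflink.datastream.connectors.kafka and that the Kafka connector jar is on the classpath; the Java-side SingleClusterTopicMetadataService class name and its (cluster id, Properties) constructor are assumptions made for illustration and are not confirmed by the diff shown here. The set_value_only_deserializer and set_property calls follow the docstring example in the diff.

# Minimal usage sketch; see assumptions above. The metadata service is built as a
# raw Py4J JavaObject and handed straight to the builder, which is exactly what the
# review comment questions.
from pyflink.common import WatermarkStrategy
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import DynamicKafkaSource  # added by this PR
from pyflink.java_gateway import get_gateway

env = StreamExecutionEnvironment.get_execution_environment()
gateway = get_gateway()

# Assumed Java-side metadata service; the class name and constructor arguments are
# illustrative only and not confirmed by the diff shown here.
j_props = gateway.jvm.java.util.Properties()
j_props.setProperty("bootstrap.servers", "localhost:9092")
j_metadata_service = gateway.jvm.org.apache.flink.connector.kafka.dynamic.metadata \
    .SingleClusterTopicMetadataService("my-cluster", j_props)

source = DynamicKafkaSource.builder() \
    .set_stream_ids({"stream-a", "stream-b"}) \
    .set_kafka_metadata_service(j_metadata_service) \
    .set_value_only_deserializer(SimpleStringSchema()) \
    .set_property("group.id", "my_group") \
    .build()

ds = env.from_source(source, WatermarkStrategy.no_watermarks(), "dynamic-kafka-source")
ds.print()
env.execute("dynamic_kafka_source_demo")

One way to address the review comment would be to expose typed Python wrappers for the metadata service and subscriber arguments instead of accepting raw JavaObject parameters, similar to how other PyFlink connector builders wrap their Java-side arguments; this is a suggestion, not something decided in the PR.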