gaoxin-oai commented on code in PR #28512:
URL: https://github.com/apache/flink/pull/28512#discussion_r3456190667

##########
flink-python/pyflink/datastream/connectors/dynamic_kafka.py:
##########
@@ -36,6 +37,52 @@
 ]
 
 
+def _to_j_offsets_initializer(
+        offsets_initializer: Optional[KafkaOffsetsInitializer]) -> Optional[JavaObject]:
+    return offsets_initializer._j_initializer if offsets_initializer is not None else None
+
+
+def _has_cluster_offsets(
+        starting_offsets_initializer: Optional[KafkaOffsetsInitializer],
+        stopping_offsets_initializer: Optional[KafkaOffsetsInitializer]) -> bool:
+    return starting_offsets_initializer is not None or stopping_offsets_initializer is not None
+
+
+class ClusterMetadata(object):

Review Comment:
   The SingleClusterTopicMetadataService offset path is usable, but I do not see a public consumer for the new ClusterMetadata wrapper. After a user constructs ClusterMetadata(...), the only accessible value is the private _j_cluster_metadata field; DynamicKafkaSourceBuilder accepts a metadata service, not cluster metadata. Should we either remove this public wrapper for now or add/document the supported API that consumes it?
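
To make the concern concrete, here is a minimal sketch using stand-in classes. None of these are the real PyFlink classes: `ClusterMetadataSketch`, `MetadataServiceSketch`, `SourceBuilderSketch`, their constructor arguments, and the `set_metadata_service` method are all hypothetical and exist only to illustrate a public wrapper whose state is private and which no public entry point accepts.

```python
# Stand-in classes only: these are NOT the real PyFlink classes, and every
# name and signature below is an assumption made for illustration.


class ClusterMetadataSketch:
    """Mimics a public wrapper whose only state is a private handle."""

    def __init__(self, topics, properties):
        # In the PR this field would hold a Py4J JavaObject; a dict stands in.
        self._j_cluster_metadata = {
            "topics": set(topics),
            "properties": dict(properties),
        }


class MetadataServiceSketch:
    """Mimics a metadata service, the only input the builder accepts."""


class SourceBuilderSketch:
    """Mimics a builder whose single entry point takes a metadata service."""

    def set_metadata_service(self, service):
        if not isinstance(service, MetadataServiceSketch):
            raise TypeError("expected a metadata service")
        self._service = service
        return self


def public_attributes(obj):
    """Return instance attribute names reachable without private access."""
    return sorted(name for name in vars(obj) if not name.startswith("_"))


def builder_accepts(builder, value):
    """Report whether the builder's entry point takes the given value."""
    try:
        builder.set_metadata_service(value)
        return True
    except TypeError:
        return False


metadata = ClusterMetadataSketch(["orders"], {"bootstrap.servers": "localhost:9092"})
```

In this sketch `public_attributes(metadata)` is empty and `builder_accepts(SourceBuilderSketch(), metadata)` is false, which mirrors the situation described above: a user can construct the wrapper but has no supported way to hand it to anything.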
