Hi Hatem,

Before opening a PR, you need to create a JIRA ticket to track this issue and have a committer assign it to you. Please go through https://flink.apache.org/how-to-contribute/overview/ first, as it will make contributing smoother.

Best,
Mason

On Thu, May 25, 2023 at 10:30 AM Hatem Mostafa <m...@hatem.co> wrote:

> Hello Mason,
>
> I created that PR <https://github.com/apache/flink/pull/22662/files> as a
> suggestion for how to address the issue so that we would be able to set the
> client id. Happy to make any modifications needed to get this merged.
>
>
> On Thu, May 25, 2023 at 12:55 AM Mason Chen <mas.chen6...@gmail.com>
> wrote:
>
>> Hi Hatem,
>>
>> The reason for setting different client ids is Kafka client metrics
>> conflicts; the issue is documented here:
>> https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/kafka/#kafka-consumer-metrics
>> I think that the warning log is benign if you are using Flink's metric
>> system for monitoring the Kafka connector, and it would be nice to introduce
>> an option in the connector to configure the same `client.id` across all
>> tasks for the quota feature you mentioned.
>>
>> Best,
>> Mason
>>
>> On Wed, May 24, 2023 at 5:18 AM Hatem Mostafa <m...@hatem.co> wrote:
>>
>>> Hello Martijn,
>>>
>>> Yes, checkpointing is enabled and the offsets are committed without a
>>> problem. I think I might have figured out the answer to my second question
>>> based on my understanding of this code
>>> <https://github.com/apache/flink/blob/0612a997ddcc791ee54f500fbf1299ce04987679/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java>:
>>> Flink uses a low-level consumer that does not call consumer.subscribe,
>>> which is why the consumer group shows no active members in the
>>> kafka-consumer-groups tool. The consumer group functionality is fine though.
>>> However, I am more interested in an answer to my first question. Kafka
>>> quotas are one of the important features of Kafka, and Flink setting a
>>> different client id for every consumer in the same consumer group
>>> makes it hard to set quotas for that consumer group. What is the reason
>>> behind setting different client ids?
>>>
>>> On Wed, May 24, 2023 at 1:13 PM Martijn Visser <martijnvis...@apache.org>
>>> wrote:
>>>
>>>> Hi Hatem,
>>>>
>>>> Could it be that you don't have checkpointing enabled? Flink only
>>>> commits its offset when a checkpoint has been completed successfully, as
>>>> explained on
>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#consumer-offset-committing
>>>>
>>>> Best regards,
>>>>
>>>> Martijn
>>>>
>>>>
>>>> On Tue, May 23, 2023 at 6:43 PM Hatem Mostafa <m...@hatem.co> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> I have two questions that are related to each other:
>>>>>
>>>>> *First question:*
>>>>>
>>>>> I have been trying to set `client.id` in order to set a Kafka client quota
>>>>> <https://kafka.apache.org/documentation.html#design_quotas> for
>>>>> consumer_byte_rate, since whenever our kafka job gets redeployed it reads a
>>>>> lot of data from our Kafka cluster, causing a denial of service for the
>>>>> cluster. However, `client.id` gets overridden by the Flink source
>>>>> here
>>>>> <https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java#L87>.
>>>>> How would I enforce quotas for the Flink Kafka source?
>>>>>
>>>>> *Second question:*
>>>>>
>>>>> Also, something I didn't quite understand: when describing our consumer
>>>>> group in Kafka, why don't I see the consumer group metadata
>>>>> (consumer id, client id & host)? I am told that the consumer
>>>>> group has no active members, but it is actually active and consuming.
>>>>>
>>>>> *Example describing a flink consumer group*
>>>>>
>>>>>> ./kafka-consumer-groups.sh --bootstrap-server kafka-server-address:9092 --describe --group flink-consumer-group
>>>>>> Consumer group 'flink-consumer-group' has no active members.
>>>>>> GROUP                 TOPIC       PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG     CONSUMER-ID  HOST  CLIENT-ID
>>>>>> flink_consumer_group  topic_name  1          514588965       514689721       100756  -            -     -
>>>>>
>>>>>
>>>>> *Example describing a normal consumer group written using a confluent
>>>>> kafka python library.*
>>>>>
>>>>>> ./kafka-consumer-groups.sh --bootstrap-server kafka-server-address:9092 --describe --group python_confluent_kafka_consumer
>>>>>> GROUP                            TOPIC       PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                                           HOST        CLIENT-ID
>>>>>> python_confluent_kafka_consumer  topic_name  1          17279532        17279908        376  python_confluent_kafka_consumer-345fa1d1-1f76-4e38-9aad-dcc120c5a52e  /<HOST-IP>  python_confluent_kafka_consumer_client_id
>>>>>
>>>>>
>>>>> I am using Flink version 1.15.
>>>>>
>>>>> Thanks,
>>>>> Hatem
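
The client id question in this thread can be illustrated with a small sketch. It assumes the source reader derives each consumer's `client.id` as `<prefix>-<subtask index>` (a reading of the linked KafkaPartitionSplitReader code, not something confirmed in this thread), and it models quota resolution as an exact match on `client.id` with an optional default, which is a simplification of how Kafka resolves quotas. The function names, the prefix, and the 10 MB/s value are hypothetical.

```python
from typing import Dict, List, Optional


def subtask_client_ids(prefix: str, parallelism: int) -> List[str]:
    """Assumed naming scheme: '<prefix>-<subtask index>' for each reader."""
    return [f"{prefix}-{subtask}" for subtask in range(parallelism)]


def quota_for(
    client_id: str, quotas: Dict[str, int], default: Optional[int] = None
) -> Optional[int]:
    """Simplified lookup: exact client.id match, else the default quota."""
    return quotas.get(client_id, default)


# Hypothetical quota table: one consumer_byte_rate entry (bytes/s) keyed by client.id.
quotas = {"flink-consumer-group": 10_000_000}

# Hypothetical job with four source subtasks sharing one prefix.
ids = subtask_client_ids("flink-consumer-group", parallelism=4)

# With no default quota, none of the per-subtask ids match the single entry.
unmatched = [cid for cid in ids if quota_for(cid, quotas) is None]

print(ids)
print(unmatched)
```

Under these assumptions, a quota would need one entry per subtask id (each with its own budget) or a default client-id quota, which is why an option to share one `client.id` across tasks was suggested above.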