Hi Hatem,

Before opening a PR, you need to create a JIRA ticket to track this issue and
have a committer assign it to you. Please go through
https://flink.apache.org/how-to-contribute/overview/ first, as it will make
contributing smoother.

Best,
Mason

On Thu, May 25, 2023 at 10:30 AM Hatem Mostafa <m...@hatem.co> wrote:

> Hello Mason,
>
> I created this PR <https://github.com/apache/flink/pull/22662/files> as
> a suggestion for how to address the issue so that we can set the
> client id. I'm happy to make any modifications needed to get this merged.
>
>
> On Thu, May 25, 2023 at 12:55 AM Mason Chen <mas.chen6...@gmail.com>
> wrote:
>
>> Hi Hatem,
>>
>> The reason for setting different client ids is due to Kafka client
>> metrics conflicts; the issue is documented here:
>> https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/kafka/#kafka-consumer-metrics.
>> I think the warning log is benign if you are using Flink's metric
>> system to monitor the Kafka connector, so it would be nice to introduce
>> an option in the connector to configure the same `client.id` across all
>> tasks for the quota feature you mentioned.
>>
>> Best,
>> Mason
>>
>> On Wed, May 24, 2023 at 5:18 AM Hatem Mostafa <m...@hatem.co> wrote:
>>
>>> Hello Martijn,
>>>
>>> Yes, checkpointing is enabled and the offsets are committed without a
>>> problem. I think I have figured out the answer to my second question
>>> based on my understanding of this code
>>> <https://github.com/apache/flink/blob/0612a997ddcc791ee54f500fbf1299ce04987679/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java>:
>>> Flink uses a low-level consumer that does not call consumer.subscribe,
>>> so the consumer group does not appear to have active members in the
>>> kafka-consumer-groups tool. The consumer group functionality is fine though.
>>> However, I am more interested in an answer to my first question. Kafka
>>> quotas are one of the important features of Kafka, and Flink setting a
>>> different client id for every consumer in the same consumer group
>>> makes it hard to set quotas for that consumer group. What is the reason
>>> behind setting different client ids?
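
To make the point above about per-subtask client ids concrete, here is a minimal sketch. It is a simplified model, not Flink code: it assumes each source subtask ends up with a `client.id` of the form `<prefix>-<subtaskId>`, which is my reading of the linked KafkaPartitionSplitReader. The class name, method names, and the values "my-flink-job" and 3 are hypothetical placeholders.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model (not Flink code) of per-subtask Kafka client ids. */
public class ClientIdSketch {

    // Assumption: each source subtask uses "<prefix>-<subtaskId>" as its client.id.
    static String clientIdFor(String prefix, int subtaskId) {
        return prefix + "-" + subtaskId;
    }

    // Lists the client ids a job with the given parallelism would present to the brokers.
    static List<String> clientIds(String prefix, int parallelism) {
        List<String> ids = new ArrayList<>();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            ids.add(clientIdFor(prefix, subtask));
        }
        return ids;
    }

    public static void main(String[] args) {
        // "my-flink-job" and a parallelism of 3 are placeholder values.
        for (String id : clientIds("my-flink-job", 3)) {
            System.out.println(id);
        }
    }
}
```

Under that assumption, every subtask looks like a distinct client to the brokers, so a quota keyed on one client id does not cover the whole job, and the set of ids changes whenever the parallelism changes.
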
>>>
>>> On Wed, May 24, 2023 at 1:13 PM Martijn Visser <martijnvis...@apache.org>
>>> wrote:
>>>
>>>> Hi Hatem,
>>>>
>>>> Could it be that you don't have checkpointing enabled? Flink only
>>>> commits its offset when a checkpoint has been completed successfully, as
>>>> explained on
>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#consumer-offset-committing
>>>>
>>>> Best regards,
>>>>
>>>> Martijn
>>>>
>>>>
>>>> On Tue, May 23, 2023 at 6:43 PM Hatem Mostafa <m...@hatem.co> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> I have two questions that are related to each other:
>>>>>
>>>>> *First question:*
>>>>>
>>>>> I have been trying to set `client.id` in order to apply a Kafka client quota
>>>>> <https://kafka.apache.org/documentation.html#design_quotas> for
>>>>> consumer_byte_rate, because whenever our Kafka job gets redeployed it reads a
>>>>> lot of data from our Kafka cluster, causing a denial of service for the
>>>>> cluster. However, `client.id` gets overridden by the Flink source
>>>>> here
>>>>> <https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java#L87>.
>>>>> How can I enforce quotas for the Flink Kafka source?
>>>>>
>>>>> *Second question:*
>>>>>
>>>>> Also, something I don't quite understand: when describing our consumer
>>>>> group in Kafka, why don't I see the consumer group metadata
>>>>> (consumer id, client id & host)? The tool reports that the consumer
>>>>> group has no active members, but it is actually active and consuming.
>>>>>
>>>>> *Example describing a flink consumer group*
>>>>>
>>>>>> ./kafka-consumer-groups.sh --bootstrap-server kafka-server-address:9092 --describe --group flink-consumer-group
>>>>>> Consumer group 'flink-consumer-group' has no active members.
>>>>>> GROUP                 TOPIC       PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG     CONSUMER-ID  HOST  CLIENT-ID
>>>>>> flink_consumer_group  topic_name  1          514588965       514689721       100756  -            -     -
>>>>>
>>>>>
>>>>>
>>>>> *Example describing a normal consumer group written using a confluent
>>>>> kafka python library.*
>>>>>
>>>>>> ./kafka-consumer-groups.sh --bootstrap-server kafka-server-address:9092 --describe --group python_confluent_kafka_consumer
>>>>>> GROUP                            TOPIC       PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                                           HOST        CLIENT-ID
>>>>>> python_confluent_kafka_consumer  topic_name  1          17279532        17279908        376  python_confluent_kafka_consumer-345fa1d1-1f76-4e38-9aad-dcc120c5a52e  /<HOST-IP>  python_confluent_kafka_consumer_client_id
>>>>>
>>>>>
>>>>>
>>>>> I am using flink version 1.15.
>>>>>
>>>>> Thanks,
>>>>> Hatem
>>>>>
>>>>>
>>>>>
>>>>>
