Hi Hatem,

Could it be that you don't have checkpointing enabled? Flink only commits
its offset when a checkpoint has been completed successfully, as explained
on
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#consumer-offset-committing

Best regards,

Martijn


On Tue, May 23, 2023 at 6:43 PM Hatem Mostafa <m...@hatem.co> wrote:

> Hello,
>
> I have two questions that are related to each other:
>
> *First question:*
>
> I have been trying to set `client.id` to set a kafka client quota
> <https://kafka.apache.org/documentation.html#design_quotas> for
> consumer_byte_rate since whenever our kafka job gets redeployed it reads a
> lot of data from our kafka cluster causing a denial of service for our
> kafka cluster. However `client.id` gets overridden by flink source here
> <https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java#L87>.
> How would I enforce quotas for flink kafka source?
>
> *Second question:*
>
> Also something I didn't quite understand when describing our consumer
> group in kafka why I don't see the metadata for the consumer group
> information (consumer id, client id & host) and I get that the consumer
> group has no active members but it's actually active and consuming.
>
> *Example describing a flink consumer group*
>
>> ./kafka-consumer-groups.sh --bootstrap-server kafka-server-address:9092
>>  --describe --group flink-consumer-group
>> Consumer group 'flink-consumer-group' has no active members.
>> GROUP                           TOPIC           PARTITION  CURRENT-OFFSET
>>  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
>> flink_consumer_group     topic_name 1         514588965       514689721
>>     100756                                       -
>>    -                     -
>
>
>
> *Example describing a normal consumer group written using a confluent
> kafka python library.*
>
>> ./kafka-consumer-groups.sh ---bootstrap-server kafka-server-address:9092
>> --describe --group python_confluent_kafka_consumer
>> GROUP                                            TOPIC
>>  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID
>>
>>                   HOST           CLIENT-ID
>> python_confluent_kafka_consumer topic_name       1
>>  17279532                  17279908                  376
>> python_confluent_kafka_consumer-345fa1d1-1f76-4e38-9aad-dcc120c5a52e
>> /<HOST-IP> python_confluent_kafka_consumer_client_id
>
>
>
> I am using flink version 1.15.
>
> Thanks,
> Hatem
>
>
>
>

Reply via email to