Hi Julio,

As Gordon said, the `records_lag_max` metric is a Kafka-shipped metric [1].
I also found this thread [2] on the Kafka mailing list. It seems that this
behavior is by design inside Kafka, so I think there is nothing we can do in
the Flink-Kafka-Connector.
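
To make the "reset" behavior concrete, here is a minimal Python sketch of a
maximum taken over a sliding time window. It is a toy model, not Kafka's
implementation: the class name `WindowedMax`, the 30-second window, and the
timestamps are all assumptions for illustration. If no fetch records a lag
inside the window (for example because a slow sink is blocking the source),
the maximum of an empty set of samples comes out as -Inf.

```python
import math


class WindowedMax:
    """Toy stand-in for a time-windowed 'max' metric (not Kafka's actual code)."""

    def __init__(self, window_seconds):
        self.window_seconds = window_seconds
        self.samples = []  # list of (timestamp, value) pairs

    def record(self, now, value):
        self.samples.append((now, value))

    def value(self, now):
        # Drop samples older than the window, then take the max of what is left.
        cutoff = now - self.window_seconds
        self.samples = [(t, v) for t, v in self.samples if t > cutoff]
        return max((v for _, v in self.samples), default=-math.inf)


lag = WindowedMax(window_seconds=30)
lag.record(now=0, value=500)   # a fetch at t=0 observes a lag of 500 records
print(lag.value(now=10))       # 500: the sample is still inside the window
print(lag.value(now=120))      # -inf: nothing was recorded in the last 30 s
```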

BTW, the Kafka documentation [1] says `records_lag_max` is the maximum lag
in terms of number of records for any partition in this "window". I'm not
sure what this "window" means or whether it is configurable. If it is
configurable, you can pass the config argument directly to the
Flink-Kafka-Connector to set it on the Kafka consumer.
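
As a rough sketch of what passing such a setting could look like, the Python
snippet below only assembles the key/value pairs; in a real job they would go
into the `java.util.Properties` object handed to the Flink Kafka consumer.
`metrics.sample.window.ms` and `metrics.num.samples` are existing Kafka client
settings, but that they govern the "window" of `records_lag_max` is an
assumption on my part. The helper names, broker address, and group id are
made up.

```python
def consumer_properties(bootstrap_servers, group_id, window_ms=None, num_samples=None):
    """Assemble Kafka consumer properties as a plain dict of strings."""
    props = {
        "bootstrap.servers": bootstrap_servers,
        "group.id": group_id,
    }
    # Assumption: these two Kafka client settings control the metric "window".
    if window_ms is not None:
        props["metrics.sample.window.ms"] = str(window_ms)
    if num_samples is not None:
        props["metrics.num.samples"] = str(num_samples)
    return props


def to_properties_text(props):
    """Render the dict in java.util.Properties 'key=value' form."""
    return "\n".join(f"{key}={value}" for key, value in sorted(props.items()))


# Placeholder broker address and group id, for illustration only.
props = consumer_properties("localhost:9092", "lag-demo", window_ms=60000, num_samples=5)
print(to_properties_text(props))
```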

Best,
Tony Wei

[1] https://docs.confluent.io/current/kafka/monitoring.html#fetch-metrics
[2] https://lists.apache.org/thread.html/92475e5eb0c1a5fd08e49c30b3fef45213b8626e8fea8d52993c0d8c@%3Cusers.kafka.apache.org%3E

2018-07-31 1:36 GMT+08:00 Julio Biason <julio.bia...@azion.com>:

> Hey Gordon,
>
> (Reviving this long thread) I think I found part of the problem: It seems
> the metric is capturing the lag from time to time and resetting the value
> in-between. I managed to replicate this by attaching a SQL Sink
> (JDBCOutputFormat) connecting to an outside database -- something that took
> about 2 minutes to write 500 records.
>
> I opened the ticket https://issues.apache.org/jira/browse/FLINK-9998 with
> a bit more information about this ('cause I completely forgot to open a
> ticket a month ago about this).
>
> On Thu, Jun 14, 2018 at 11:31 AM, Julio Biason <julio.bia...@azion.com>
> wrote:
>
>> Hey Gordon,
>>
>> The job restarted somewhere in the middle of the night (I haven't checked
>> why yet) and now I have this weird status of the first TaskManager with
>> only one valid lag, the second with 2 and the third with none.
>>
>> I dunno if I could see the partition in the logs, but all "numRecordsOut"
>> are increasing over time (attached the screenshot of the graphs).
>>
>> On Thu, Jun 14, 2018 at 5:27 AM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
>>
>>> Hi,
>>>
>>> Thanks for the extra information. So, there seem to be 2 separate
>>> issues here. I’ll go through them one by one.
>>>
>>> I was also checking our Grafana and the metric we were using was
>>> "flink_taskmanager_job_task_operator_KafkaConsumer_records_lag_max",
>>> actually. "flink_taskmanager_job_task_operator_records_lag_max" seems
>>> to be new (with the attempt thingy).
>>>
>>>
>>> After looking at the code changes in FLINK-8419, this unfortunately is an
>>> accidental “break” in the scope of the metric.
>>> In 1.4.0, the Kafka-shipped metrics were exposed under the
>>> “KafkaConsumer” metrics group. After FLINK-8419, this was changed, as you
>>> observed.
>>> In 1.5.0, however, I think the metrics are exposed under both patterns.
>>>
>>> Now, with the fact that some subtasks are returning -Inf for
>>> ‘records-lag-max’:
>>> If I understood the metric semantics correctly, this metric represents
>>> the "max record lag across **partitions subscribed by a Kafka consumer
>>> client**".
>>> So, the only possibilities I can think of that would cause this are that
>>> either the subtask does not have any partitions assigned to it, or simply
>>> there is a bug with the Kafka client returning this value.
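
The "no partitions assigned" case can be illustrated with a small Python
sketch. It assumes a plain round-robin assignment, which is a simplification
and not necessarily how the connector distributes partitions; the function
names and lag numbers are hypothetical. With 3 partitions and a source
parallelism of 4, one subtask ends up with nothing to read, and a maximum
over its empty partition list is -Inf.

```python
import math


def assign_round_robin(num_partitions, parallelism):
    """Hypothetical assignment: partition p goes to subtask p % parallelism."""
    assignment = {subtask: [] for subtask in range(parallelism)}
    for partition in range(num_partitions):
        assignment[partition % parallelism].append(partition)
    return assignment


def max_lag(partition_lags):
    """Max lag over a subtask's partitions; -inf when it has none."""
    return max(partition_lags, default=-math.inf)


lag_by_partition = {0: 120, 1: 35, 2: 980}  # made-up lag values
assignment = assign_round_robin(num_partitions=3, parallelism=4)
for subtask, partitions in assignment.items():
    lags = [lag_by_partition[p] for p in partitions]
    print(subtask, partitions, max_lag(lags))
```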
>>>
>>> Could you verify that every subtask has a partition assigned to it?
>>> That should be possible by just checking the job status in the Web UI
>>> and observing the numRecordsOut value for each source subtask.
>>>
>>> Cheers,
>>> Gordon
>>>
>>>
>>> On 13 June 2018 at 2:05:17 PM, Julio Biason (julio.bia...@azion.com)
>>> wrote:
>>>
>>> Hi Gordon,
>>>
>>> We have Kafka 0.10.1.1 running and use the flink-connector-kafka-0.10
>>> driver.
>>>
>>> There are a bunch of flink_taskmanager_job_task_operator_* metrics,
>>> including some about the committed offset for each partition. It seems I
>>> have 4 different records_lag_max with different attempt_id, though, 3 with
>>> -Inf and 1 with a value -- which means I will need a better understanding
>>> of Prometheus to extract this properly.
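
One way to reduce several series like these to a single figure is to keep
only the finite samples and take their maximum. The Python sketch below does
that on a hand-written scrape in the Prometheus text format; the label names,
attempt ids, and the value 1532.0 are invented for illustration, and the
regular expression only covers simple lines like these.

```python
import math
import re

# Hypothetical scrape output; label names and values are made up.
SAMPLE = """\
flink_taskmanager_job_task_operator_records_lag_max{attempt_id="a1",subtask_index="0"} -Inf
flink_taskmanager_job_task_operator_records_lag_max{attempt_id="a2",subtask_index="1"} -Inf
flink_taskmanager_job_task_operator_records_lag_max{attempt_id="a3",subtask_index="2"} 1532.0
flink_taskmanager_job_task_operator_records_lag_max{attempt_id="a4",subtask_index="3"} -Inf
"""

LINE = re.compile(
    r'^(?P<name>[A-Za-z_:][A-Za-z0-9_:]*)(?:\{(?P<labels>.*)\})?\s+(?P<value>\S+)$'
)


def finite_samples(text, metric):
    """Yield (labels, value) for every finite sample of `metric`."""
    for line in text.splitlines():
        match = LINE.match(line.strip())
        if not match or match.group("name") != metric:
            continue
        value = float(match.group("value"))  # float() also parses "-Inf" and "NaN"
        if math.isfinite(value):
            yield match.group("labels"), value


METRIC = "flink_taskmanager_job_task_operator_records_lag_max"
values = [value for _, value in finite_samples(SAMPLE, METRIC)]
print(max(values, default=None))  # None would mean no subtask reported a finite lag
```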
>>>
>>> I was also checking our Grafana and the metric we were using was
>>> "flink_taskmanager_job_task_operator_KafkaConsumer_records_lag_max",
>>> actually. "flink_taskmanager_job_task_operator_records_lag_max" seems
>>> to be new (with the attempt thingy).
>>>
>>> The "KafkaConsumer" group is there as well, but it only has the
>>> "commited_offset" for each partition.
>>>
>>> On Wed, Jun 13, 2018 at 5:41 AM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
>>>
>>>> Hi,
>>>>
>>>> Which Kafka version are you using?
>>>>
>>>> AFAIK, the only recent changes to Kafka connector metrics in the 1.4.x
>>>> series would be FLINK-8419 [1].
>>>> The ‘records_lag_max’ metric is a Kafka-shipped metric simply forwarded
>>>> from the internally used Kafka client, so nothing should have been 
>>>> affected.
>>>>
>>>> Do you see other metrics under the pattern of
>>>> ‘flink_taskmanager_job_task_operator_*’? All Kafka-shipped metrics
>>>> should still follow this pattern.
>>>> If not, could you find the ‘records_lag_max’ metric (or any other
>>>> Kafka-shipped metrics [2]) under the user scope ‘KafkaConsumer’?
>>>>
>>>> The above should provide more insight into what may be wrong here.
>>>>
>>>> - Gordon
>>>>
>>>> [1] https://issues.apache.org/jira/browse/FLINK-8419
>>>> [2] https://docs.confluent.io/current/kafka/monitoring.html#fetch-metrics
>>>>
>>>> On 12 June 2018 at 11:47:51 PM, Julio Biason (julio.bia...@azion.com)
>>>> wrote:
>>>>
>>>> Hey guys,
>>>>
>>>> I just updated our Flink install from 1.4.0 to 1.4.2, but our
>>>> Prometheus monitoring is not getting the current Kafka lag.
>>>>
>>>> After updating to 1.4.2 and symlinking
>>>> opt/flink-metrics-prometheus-1.4.2.jar into lib/, I got the metrics back
>>>> on Prometheus, but the most important one,
>>>> flink_taskmanager_job_task_operator_records_lag_max,
>>>> is now returning -Inf.
>>>>
>>>> Did I miss something?
>>>>
>>>> --
>>>> *Julio Biason*, Software Engineer
>>>> *AZION*  |  Deliver. Accelerate. Protect.
>>>> Office: +55 51 3083 8101 <callto:+555130838101>  |  Mobile: +55 51
>>>> <callto:+5551996209291>*99907 0554*
>>>>
>>>>
>>>
>>>
>>
>>
>
>
>
