[ https://issues.apache.org/jira/browse/FLINK-14077?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16929999#comment-16929999 ]
Yun Tang commented on FLINK-14077:
----------------------------------

Hi Jun,

I think this is not a Flink bug but a Kafka bug; please refer to KAFKA-4950.

> get java.util.ConcurrentModificationException when push metrics to PushGateway
> -------------------------------------------------------------------------------
>
>                 Key: FLINK-14077
>                 URL: https://issues.apache.org/jira/browse/FLINK-14077
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Metrics
>    Affects Versions: 1.9.0
>            Reporter: Jun Zhang
>            Priority: Major
>
> After my Flink program has been running for a while, I get the following error message:
> {code:java}
> 2019-09-15 10:11:28,058 WARN org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter - Failed to push metrics to PushGateway with jobName flinkjob_bb51bc6919b89a3e7d278d6666d0ef1d.
> java.util.ConcurrentModificationException
> 	at java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:719)
> 	at java.util.LinkedHashMap$LinkedKeyIterator.next(LinkedHashMap.java:742)
> 	at java.util.AbstractCollection.addAll(AbstractCollection.java:343)
> 	at java.util.HashSet.<init>(HashSet.java:119)
> 	at org.apache.kafka.common.internals.PartitionStates.partitionSet(PartitionStates.java:66)
> 	at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedPartitions(SubscriptionState.java:293)
> 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$ConsumerCoordinatorMetrics$1.measure(ConsumerCoordinator.java:884)
> 	at org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:61)
> 	at org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52)
> 	at org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper.getValue(KafkaMetricWrapper.java:37)
> 	at org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper.getValue(KafkaMetricWrapper.java:27)
> 	at org.apache.flink.metrics.prometheus.AbstractPrometheusReporter$2.get(AbstractPrometheusReporter.java:224)
> 	at org.apache.flink.shaded.io.prometheus.client.Gauge.collect(Gauge.java:295)
> 	at org.apache.flink.shaded.io.prometheus.client.CollectorRegistry$MetricFamilySamplesEnumeration.findNextElement(CollectorRegistry.java:183)
> 	at org.apache.flink.shaded.io.prometheus.client.CollectorRegistry$MetricFamilySamplesEnumeration.nextElement(CollectorRegistry.java:216)
> 	at org.apache.flink.shaded.io.prometheus.client.CollectorRegistry$MetricFamilySamplesEnumeration.nextElement(CollectorRegistry.java:137)
> 	at org.apache.flink.shaded.io.prometheus.client.exporter.common.TextFormat.write004(TextFormat.java:22)
> 	at org.apache.flink.shaded.io.prometheus.client.exporter.PushGateway.doRequest(PushGateway.java:290)
> 	at org.apache.flink.shaded.io.prometheus.client.exporter.PushGateway.push(PushGateway.java:105)
> 	at org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter.report(PrometheusPushGatewayReporter.java:76)
> 	at org.apache.flink.runtime.metrics.MetricRegistryImpl$ReporterTask.run(MetricRegistryImpl.java:436)
> 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> 	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> 	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> 	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> {code}
> My Flink job is a streaming job that reads from a Kafka stream and writes to HBase. My Kafka version is 0.10, the Flink version is 1.9.0, and the metrics configuration is:
> {code:java}
> metrics.reporters: promgateway
> metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
> metrics.reporter.promgateway.host: ************
> metrics.reporter.promgateway.port: 9091
> metrics.reporter.promgateway.jobName: flinkjob_
> metrics.reporter.promgateway.randomJobNameSuffix: true
> metrics.reporter.promgateway.deleteOnShutdown: true
> {code}

--
This message was sent by Atlassian Jira
(v8.3.2#803003)
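The top frames of the trace show a `HashSet` being built from a `LinkedHashMap` key set (`PartitionStates.partitionSet`) on Flink's metric reporter thread, which fits the comment's point that the exception originates inside the Kafka client rather than in Flink. The following is a minimal sketch of that failure mode, assuming (from the trace only) that the assigned partitions live in a `LinkedHashMap` that another thread, presumably the consumer thread, can modify while the copy is iterating. The class `FailFastCopyDemo` and its methods are hypothetical and are not Kafka or Flink API. A cross-thread race is nondeterministic, so the sketch triggers the same fail-fast `modCount` check on a single thread.

```java
import java.util.ConcurrentModificationException;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration only; not the Kafka PartitionStates implementation.
public class FailFastCopyDemo {

    // Stand-in for assigned-partition state, kept in a LinkedHashMap as the
    // trace suggests (LinkedHashMap$LinkedKeyIterator in PartitionStates).
    private final Map<String, Long> partitions = new LinkedHashMap<>();

    public FailFastCopyDemo() {
        partitions.put("topic-0", 0L);
        partitions.put("topic-1", 0L);
    }

    // Returns true if a structural change made while the key iterator is open
    // is caught by the fail-fast check. In the reported trace the iteration is
    // the HashSet copy on the reporter thread and the change presumably comes
    // from the consumer thread; here both happen on one thread so the result
    // is deterministic.
    public boolean mutateDuringIteration() {
        try {
            for (String ignored : partitions.keySet()) {
                partitions.put("topic-2", 0L); // new key: structural modification
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    // Copying first and mutating afterwards does not throw: the HashSet is an
    // independent snapshot, so the later put does not affect it.
    public int snapshotThenMutate() {
        Set<String> snapshot = new HashSet<>(partitions.keySet());
        partitions.put("topic-3", 0L);
        return snapshot.size();
    }

    public static void main(String[] args) {
        System.out.println("CME detected: " + new FailFastCopyDemo().mutateDuringIteration());
        System.out.println("snapshot size: " + new FailFastCopyDemo().snapshotThenMutate());
    }
}
```

Running `main` prints `CME detected: true` and `snapshot size: 2`. Across threads, Java's fail-fast iterators detect concurrent modification only on a best-effort basis, which is consistent with the warning appearing intermittently after the job has been running for a while rather than on every push.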