[ https://issues.apache.org/jira/browse/KAFKA-6252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16269781#comment-16269781 ]

Randall Hauch commented on KAFKA-6252:
--------------------------------------

This also happens when a connector does not properly implement {{stop()}}. 

Connect will call {{stop()}} on a source task to signal that it should "_stop
trying to poll for new data and interrupt any outstanding poll() requests_"
(see the [JavaDoc|http://kafka.apache.org/10/javadoc/org/apache/kafka/connect/source/SourceTask.html#stop--]
for this method).
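A minimal sketch of a task that honors this contract, assuming a simplified, hypothetical {{StoppableSourceTask}} class rather than the real {{SourceTask}} API: {{poll()}} waits on an internal queue in short timed slices and returns {{null}} once {{stop()}} has set a flag, so it cannot block forever. Interrupting the polling thread would be another way to satisfy the "interrupt any outstanding poll() requests" part of the contract.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a source task; NOT org.apache.kafka.connect.source.SourceTask.
final class StoppableSourceTask {
    private final BlockingQueue<String> buffer = new LinkedBlockingQueue<>();
    private final AtomicBoolean stopping = new AtomicBoolean(false);

    // Returns a batch of buffered records, or null once stop() has been requested.
    // Waits in 100 ms slices so a stop request is noticed promptly.
    List<String> poll() throws InterruptedException {
        while (!stopping.get()) {
            String record = buffer.poll(100, TimeUnit.MILLISECONDS);
            if (record != null) {
                List<String> batch = new ArrayList<>();
                batch.add(record);
                buffer.drainTo(batch); // take whatever else is already buffered
                return batch;
            }
        }
        return null;
    }

    // Called from another thread; makes any in-progress or future poll() return.
    void stop() {
        stopping.set(true);
    }

    // Hypothetical hook standing in for the external system producing data.
    void offer(String record) {
        buffer.offer(record);
    }

    public static void main(String[] args) throws InterruptedException {
        StoppableSourceTask task = new StoppableSourceTask();
        Thread worker = new Thread(() -> {
            try {
                while (task.poll() != null) {
                    // a real worker would hand the records to the producer here
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        task.offer("r1");
        Thread.sleep(200);
        task.stop();
        worker.join(2000);
        System.out.println(worker.isAlive() ? "still blocked" : "stopped");
    }
}
```

The timed wait is a design choice: it trades a small polling latency for a guarantee that the task thread notices {{stop()}} without relying on interrupts.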

Unfortunately, not all connectors properly adhere to this expectation. Since
the metrics for the source task are cleaned up only when the worker source
task's thread completes, a task whose {{poll()}} method blocks forever
currently prevents its thread from completing, so its metrics are never
removed. So we need to change how the metrics are cleaned up to ensure this
always happens.
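One possible shape for that change, sketched with hypothetical {{MetricRegistry}} and {{ManagedTask}} classes (not Kafka's {{Metrics}} or {{WorkerTask}}): when a task is stopped, the worker waits a bounded time for the task thread, then removes the task's metrics whether or not the thread has exited. Removal is idempotent, so the task thread could also call it on a normal exit without double-removing anything.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

final class MetricsCleanupDemo {
    public static void main(String[] args) throws InterruptedException {
        MetricRegistry registry = new MetricRegistry();
        String name = "offset-commit-max-time-ms{connector=demo,task=0}";
        ManagedTask first = new ManagedTask(registry, name);

        // Simulates a misbehaving task whose poll() never returns.
        Thread stuck = new Thread(() -> {
            while (true) {
                try {
                    Thread.sleep(50);
                } catch (InterruptedException ignored) {
                    // swallows the interrupt and keeps "polling"
                }
            }
        });
        stuck.setDaemon(true); // lets the JVM exit despite the stuck thread
        stuck.start();

        boolean exited = first.stopAndAwait(stuck, 200);
        new ManagedTask(registry, name); // would throw if the metric were still registered
        System.out.println("thread exited: " + exited + ", restart registered: " + registry.contains(name));
    }
}

// Hypothetical, simplified registry that rejects duplicate names like the error in this issue.
final class MetricRegistry {
    private final Set<String> names = ConcurrentHashMap.newKeySet();

    void register(String name) {
        if (!names.add(name)) {
            throw new IllegalArgumentException(
                "A metric named '" + name + "' already exists, can't register another one.");
        }
    }

    void remove(String name) { names.remove(name); }

    boolean contains(String name) { return names.contains(name); }
}

// Hypothetical task wrapper: cleanup is tied to stopping the task, not to its thread exiting.
final class ManagedTask {
    private final MetricRegistry registry;
    private final String metricName;
    private final AtomicBoolean metricsRemoved = new AtomicBoolean(false);

    ManagedTask(MetricRegistry registry, String metricName) {
        this.registry = registry;
        this.metricName = metricName;
        registry.register(metricName);
    }

    // Idempotent: only the first caller actually removes the metric.
    void removeMetrics() {
        if (metricsRemoved.compareAndSet(false, true)) {
            registry.remove(metricName);
        }
    }

    // Signals the thread, waits up to timeoutMs, then removes metrics regardless.
    // Returns true only if the thread actually finished.
    boolean stopAndAwait(Thread taskThread, long timeoutMs) throws InterruptedException {
        taskThread.interrupt();
        taskThread.join(timeoutMs);
        removeMetrics();
        return !taskThread.isAlive();
    }
}
```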

> A metric named 'XX' already exists, can't register another one.
> ---------------------------------------------------------------
>
>                 Key: KAFKA-6252
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6252
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 1.0.0
>         Environment: Linux
>            Reporter: Alexis Sellier
>
> When a connector crashes, it cannot be restarted, and an exception like this
> is thrown:
> {code:java}
> java.lang.IllegalArgumentException: A metric named 'MetricName [name=offset-commit-max-time-ms, group=connector-task-metrics, description=The maximum time in milliseconds taken by this task to commit offsets., tags={connector=hdfs-sink-connector-recover, task=0}]' already exists, can't register another one.
>       at org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:532)
>       at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:256)
>       at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:241)
>       at org.apache.kafka.connect.runtime.WorkerTask$TaskMetricsGroup.<init>(WorkerTask.java:328)
>       at org.apache.kafka.connect.runtime.WorkerTask.<init>(WorkerTask.java:69)
>       at org.apache.kafka.connect.runtime.WorkerSinkTask.<init>(WorkerSinkTask.java:98)
>       at org.apache.kafka.connect.runtime.Worker.buildWorkerTask(Worker.java:449)
>       at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:404)
>       at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:852)
>       at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1600(DistributedHerder.java:108)
>       at org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:866)
>       at org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:862)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> {code}
> I guess it's because taskMetricsGroup.close() is not called in all
> cases.
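For the crash case described in the report, a minimal sketch, assuming hypothetical {{Registry}} and {{TaskMetricsGroupSketch}} classes that only mimic the duplicate-name check from the stack trace: tying the group's {{close()}} to try-with-resources removes the metric even when the task body throws, so a restart can register the same name again. This alone does not help when a task's thread never returns, which is the {{stop()}} problem described in the comment.

```java
import java.util.HashSet;
import java.util.Set;

final class CrashingTaskDemo {
    // Runs one task attempt; try-with-resources calls close() even if the body throws.
    static void runAttempt(Registry registry, String name, boolean crash) {
        try (TaskMetricsGroupSketch group = new TaskMetricsGroupSketch(registry, name)) {
            if (crash) {
                throw new RuntimeException("connector crashed");
            }
        }
    }

    public static void main(String[] args) {
        Registry registry = new Registry();
        String name = "offset-commit-max-time-ms{connector=hdfs-sink-connector-recover,task=0}";
        try {
            runAttempt(registry, name, true);
        } catch (RuntimeException e) {
            System.out.println("first attempt failed: " + e.getMessage());
        }
        runAttempt(registry, name, false); // succeeds because close() ran during the crash
        System.out.println("restart registered metrics without IllegalArgumentException");
    }
}

// Hypothetical registry that rejects duplicate metric names.
final class Registry {
    private final Set<String> names = new HashSet<>();

    synchronized void add(String name) {
        if (!names.add(name)) {
            throw new IllegalArgumentException(
                "A metric named '" + name + "' already exists, can't register another one.");
        }
    }

    synchronized void remove(String name) { names.remove(name); }
}

// Hypothetical stand-in for a per-task metrics group; NOT WorkerTask$TaskMetricsGroup.
final class TaskMetricsGroupSketch implements AutoCloseable {
    private final Registry registry;
    private final String name;

    TaskMetricsGroupSketch(Registry registry, String name) {
        this.registry = registry;
        this.name = name;
        registry.add(name); // registration happens in the constructor, as in the stack trace
    }

    @Override
    public void close() {
        registry.remove(name);
    }
}
```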



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
