Repository: kafka
Updated Branches:
  refs/heads/trunk c6e5a32d0 -> 11afff099


KAFKA-5990: Enable generation of metrics docs for Connect (KIP-196)

A new mechanism was added recently to the Metrics framework to make it easier 
to generate the documentation. It uses a registry with a MetricNameTemplate 
for each metric, and then those templates are used when creating the actual 
metrics. The metrics framework provides utilities that can generate the HTML 
documentation from the registry of templates.

This change moves the recently-added Connect metrics over to use these 
templates and then generates the metric documentation for Connect.

This PR is based upon #3975 and can be rebased once that has been merged.

Author: Randall Hauch <rha...@gmail.com>

Reviewers: Ewen Cheslack-Postava <e...@confluent.io>

Closes #3987 from rhauch/kafka-5990


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/11afff09
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/11afff09
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/11afff09

Branch: refs/heads/trunk
Commit: 11afff09908035166febf9b75c410112693ff98c
Parents: c6e5a32
Author: Randall Hauch <rha...@gmail.com>
Authored: Wed Oct 4 11:05:50 2017 -0700
Committer: Ewen Cheslack-Postava <m...@ewencp.org>
Committed: Wed Oct 4 11:05:50 2017 -0700

----------------------------------------------------------------------
 build.gradle                                    |  11 +-
 checkstyle/suppressions.xml                     |   2 +-
 .../kafka/connect/runtime/ConnectMetrics.java   |  92 ++++---
 .../connect/runtime/ConnectMetricsRegistry.java | 275 +++++++++++++++++++
 .../kafka/connect/runtime/WorkerConnector.java  |  21 +-
 .../kafka/connect/runtime/WorkerSinkTask.java   |  83 ++----
 .../kafka/connect/runtime/WorkerSourceTask.java |  54 +---
 .../kafka/connect/runtime/WorkerTask.java       |  64 ++---
 .../connect/runtime/ConnectMetricsTest.java     |  63 +----
 .../connect/runtime/MockConnectMetrics.java     |   4 +-
 .../connect/runtime/WorkerSinkTaskTest.java     |  35 +--
 .../runtime/WorkerSinkTaskThreadedTest.java     |   2 +
 docs/ops.html                                   |   7 +
 13 files changed, 454 insertions(+), 259 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/11afff09/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 7c93a45..d9c455f 100644
--- a/build.gradle
+++ b/build.gradle
@@ -681,7 +681,8 @@ project(':core') {
                                'genAdminClientConfigDocs', 
'genProducerConfigDocs', 'genConsumerConfigDocs',
                                'genKafkaConfigDocs', 'genTopicConfigDocs',
                                ':connect:runtime:genConnectConfigDocs', 
':connect:runtime:genConnectTransformationDocs',
-                               ':streams:genStreamsConfigDocs', 
'genConsumerMetricsDocs', 'genProducerMetricsDocs'], type: Tar) {
+                               ':streams:genStreamsConfigDocs', 
'genConsumerMetricsDocs', 'genProducerMetricsDocs',
+                               ':connect:runtime:genConnectMetricsDocs'], 
type: Tar) {
     classifier = 'site-docs'
     compression = Compression.GZIP
     from project.file("$rootDir/docs")
@@ -1189,6 +1190,14 @@ project(':connect:runtime') {
     if( !generatedDocsDir.exists() ) { generatedDocsDir.mkdirs() }
     standardOutput = new File(generatedDocsDir, 
"connect_transforms.html").newOutputStream()
   }
+
+  task genConnectMetricsDocs(type: JavaExec) {
+    classpath = sourceSets.test.runtimeClasspath
+    main = 'org.apache.kafka.connect.runtime.ConnectMetrics'
+    if( !generatedDocsDir.exists() ) { generatedDocsDir.mkdirs() }
+    standardOutput = new File(generatedDocsDir, 
"connect_metrics.html").newOutputStream()
+  }
+
 }
 
 project(':connect:file') {

http://git-wip-us.apache.org/repos/asf/kafka/blob/11afff09/checkstyle/suppressions.xml
----------------------------------------------------------------------
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index b747e4c..7c32eb0 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -23,7 +23,7 @@
               files="AbstractResponse.java"/>
 
     <suppress checks="MethodLength"
-              files="KerberosLogin.java|RequestResponseTest.java"/>
+              
files="KerberosLogin.java|RequestResponseTest.java|ConnectMetricsRegistry.java"/>
 
     <suppress checks="ParameterNumber"
               files="NetworkClient.java"/>

http://git-wip-us.apache.org/repos/asf/kafka/blob/11afff09/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
index eb01337..974967a 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
@@ -18,6 +18,7 @@ package org.apache.kafka.connect.runtime;
 
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.MetricNameTemplate;
 import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.Measurable;
 import org.apache.kafka.common.metrics.MetricConfig;
@@ -46,7 +47,6 @@ import java.util.concurrent.TimeUnit;
 public class ConnectMetrics {
 
     public static final String JMX_PREFIX = "kafka.connect";
-    public static final String WORKER_ID_TAG_NAME = "worker-id";
 
     private static final Logger LOG = 
LoggerFactory.getLogger(ConnectMetrics.class);
 
@@ -54,6 +54,7 @@ public class ConnectMetrics {
     private final Time time;
     private final String workerId;
     private final ConcurrentMap<MetricGroupId, MetricGroup> groupsByName = new 
ConcurrentHashMap<>();
+    private final ConnectMetricsRegistry registry = new 
ConnectMetricsRegistry();
 
     /**
      * Create an instance.
@@ -96,15 +97,12 @@ public class ConnectMetrics {
     }
 
     /**
-     * Get or create a {@link MetricGroup} with the specified group name.
+     * Get the registry of metric names.
      *
-     * @param groupName the name of the metric group; may not be null and must 
be a
-     *                  {@link #checkNameIsValid(String) valid name}
-     * @return the {@link MetricGroup} that can be used to create metrics; 
never null
-     * @throws IllegalArgumentException if the group name is not valid
+     * @return the registry for the Connect metrics; never null
      */
-    public MetricGroup group(String groupName) {
-        return group(groupName, false);
+    public ConnectMetricsRegistry registry() {
+        return registry;
     }
 
     /**
@@ -118,22 +116,7 @@ public class ConnectMetrics {
      * @throws IllegalArgumentException if the group name is not valid
      */
     public MetricGroup group(String groupName, String... tagKeyValues) {
-        return group(groupName, false, tagKeyValues);
-    }
-
-    /**
-     * Get or create a {@link MetricGroup} with the specified group name and 
the given tags.
-     * Each group is uniquely identified by the name and tags.
-     *
-     * @param groupName       the name of the metric group; may not be null 
and must be a
-     *                        {@link #checkNameIsValid(String) valid name}
-     * @param includeWorkerId true if the tags should include the worker ID
-     * @param tagKeyValues    pairs of tag name and values
-     * @return the {@link MetricGroup} that can be used to create metrics; 
never null
-     * @throws IllegalArgumentException if the group name is not valid
-     */
-    public MetricGroup group(String groupName, boolean includeWorkerId, 
String... tagKeyValues) {
-        MetricGroupId groupId = groupId(groupName, includeWorkerId, 
tagKeyValues);
+        MetricGroupId groupId = groupId(groupName, tagKeyValues);
         MetricGroup group = groupsByName.get(groupId);
         if (group == null) {
             group = new MetricGroup(groupId);
@@ -143,9 +126,9 @@ public class ConnectMetrics {
         return group;
     }
 
-    protected MetricGroupId groupId(String groupName, boolean includeWorkerId, 
String... tagKeyValues) {
+    protected MetricGroupId groupId(String groupName, String... tagKeyValues) {
         checkNameIsValid(groupName);
-        Map<String, String> tags = tags(includeWorkerId ? workerId : null, 
tagKeyValues);
+        Map<String, String> tags = tags(tagKeyValues);
         return new MetricGroupId(groupName, tags);
     }
 
@@ -174,8 +157,8 @@ public class ConnectMetrics {
         private final String str;
 
         public MetricGroupId(String groupName, Map<String, String> tags) {
-            assert groupName != null;
-            assert tags != null;
+            Objects.requireNonNull(groupName);
+            Objects.requireNonNull(tags);
             this.groupName = groupName;
             this.tags = Collections.unmodifiableMap(new LinkedHashMap<>(tags));
             this.hc = Objects.hash(this.groupName, this.tags);
@@ -253,21 +236,35 @@ public class ConnectMetrics {
          * @param groupId the identifier of the group; may not be null and 
must be valid
          */
         protected MetricGroup(MetricGroupId groupId) {
+            Objects.requireNonNull(groupId);
             this.groupId = groupId;
             sensorPrefix = "connect-sensor-group: " + groupId.toString() + ";";
         }
 
         /**
+         * Get the group identifier.
+         *
+         * @return the group identifier; never null
+         */
+        public MetricGroupId groupId() {
+            return groupId;
+        }
+
+        /**
          * Create the name of a metric that belongs to this group and has the 
group's tags.
          *
-         * @param name the name of the metric/attribute; may not be null and 
must be valid
-         * @param desc the description for the metric/attribute; may not be 
null
+         * @param template the name template for the metric; may not be null
          * @return the metric name; never null
          * @throws IllegalArgumentException if the name is not valid
          */
-        public MetricName metricName(String name, String desc) {
-            checkNameIsValid(name);
-            return metrics.metricName(name, groupId.groupName(), desc, 
groupId.tags());
+        public MetricName metricName(MetricNameTemplate template) {
+            checkNameIsValid(template.name());
+            return metrics.metricInstance(template, groupId.tags());
+        }
+
+        // for testing only
+        MetricName metricName(String name) {
+            return metrics.metricName(name, groupId.groupName(), "", 
groupId.tags());
         }
 
         /**
@@ -275,7 +272,7 @@ public class ConnectMetrics {
          * <p>
          * Do not use this to add {@link Sensor Sensors}, since they will not 
be removed when this group is
          * {@link #close() closed}. Metrics can be added directly, as long as 
the metric names are obtained from
-         * this group via the {@link #metricName(String, String)} method.
+         * this group via the {@link #metricName(MetricNameTemplate)} method.
          *
          * @return the metrics; never null
          */
@@ -295,14 +292,12 @@ public class ConnectMetrics {
         /**
          * Add to this group an indicator metric with a function that will be 
used to obtain the indicator state.
          *
-         * @param name        the name of the metric; may not be null and must 
be a
-         *                    {@link #checkNameIsValid(String) valid name}
-         * @param description the description of the metric; may not be null
-         * @param predicate   the predicate function used to determine the 
indicator state; may not be null
+         * @param nameTemplate the name template for the metric; may not be 
null
+         * @param predicate    the predicate function used to determine the 
indicator state; may not be null
          * @throws IllegalArgumentException if the name is not valid
          */
-        public void addIndicatorMetric(String name, String description, final 
IndicatorPredicate predicate) {
-            MetricName metricName = metricName(name, description);
+        public void addIndicatorMetric(MetricNameTemplate nameTemplate, final 
IndicatorPredicate predicate) {
+            MetricName metricName = metricName(nameTemplate);
             if (metrics().metric(metricName) == null) {
                 metrics().addMetric(metricName, new Measurable() {
                     @Override
@@ -411,17 +406,13 @@ public class ConnectMetrics {
      * Create a set of tags using the supplied key and value pairs. Every tag 
name and value will be
      * {@link #makeValidName(String) made valid} before it is used. The order 
of the tags will be kept.
      *
-     * @param workerId the worker ID that should be included first in the 
tags; may be null if not to be included
      * @param keyValue the key and value pairs for the tags; must be an even 
number
      * @return the map of tags that can be supplied to the {@link Metrics} 
methods; never null
      */
-    static Map<String, String> tags(String workerId, String... keyValue) {
+    static Map<String, String> tags(String... keyValue) {
         if ((keyValue.length % 2) != 0)
             throw new IllegalArgumentException("keyValue needs to be specified 
in pairs");
         Map<String, String> tags = new LinkedHashMap<>();
-        if (workerId != null && !workerId.trim().isEmpty()) {
-            tags.put(WORKER_ID_TAG_NAME, makeValidName(workerId));
-        }
         for (int i = 0; i < keyValue.length; i += 2) {
             tags.put(makeValidName(keyValue[i]), makeValidName(keyValue[i + 
1]));
         }
@@ -457,4 +448,15 @@ public class ConnectMetrics {
             throw new IllegalArgumentException("The name '" + name + "' 
contains at least one invalid character");
         }
     }
+
+    /**
+     * Utility to generate the documentation for the Connect metrics.
+     *
+     * @param args the arguments
+     */
+    public static void main(String[] args) {
+        ConnectMetricsRegistry metrics = new ConnectMetricsRegistry();
+        System.out.println(Metrics.toHtmlTable("kafka.connect", 
metrics.getAllTemplates()));
+    }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/11afff09/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java
new file mode 100644
index 0000000..ee513c9
--- /dev/null
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.MetricNameTemplate;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+
+public class ConnectMetricsRegistry {
+
+    public static final String CONNECTOR_TAG_NAME = "connector";
+    public static final String TASK_TAG_NAME = "task";
+    public static final String CONNECTOR_GROUP_NAME = "connector-metrics";
+    public static final String TASK_GROUP_NAME = "connector-task-metrics";
+    public static final String SOURCE_TASK_GROUP_NAME = "source-task-metrics";
+    public static final String SINK_TASK_GROUP_NAME = "sink-task-metrics";
+
+    private final List<MetricNameTemplate> allTemplates = new ArrayList<>();
+    public final MetricNameTemplate connectorStatusRunning;
+    public final MetricNameTemplate connectorStatusPaused;
+    public final MetricNameTemplate connectorStatusFailed;
+    public final MetricNameTemplate taskStatusUnassigned;
+    public final MetricNameTemplate taskStatusRunning;
+    public final MetricNameTemplate taskStatusPaused;
+    public final MetricNameTemplate taskStatusFailed;
+    public final MetricNameTemplate taskStatusDestroyed;
+    public final MetricNameTemplate taskRunningRatio;
+    public final MetricNameTemplate taskPauseRatio;
+    public final MetricNameTemplate taskCommitTimeMax;
+    public final MetricNameTemplate taskCommitTimeAvg;
+    public final MetricNameTemplate taskBatchSizeMax;
+    public final MetricNameTemplate taskBatchSizeAvg;
+    public final MetricNameTemplate taskCommitFailurePercentage;
+    public final MetricNameTemplate taskCommitSuccessPercentage;
+    public final MetricNameTemplate sourceRecordPollRate;
+    public final MetricNameTemplate sourceRecordPollTotal;
+    public final MetricNameTemplate sourceRecordWriteRate;
+    public final MetricNameTemplate sourceRecordWriteTotal;
+    public final MetricNameTemplate sourceRecordPollBatchTimeMax;
+    public final MetricNameTemplate sourceRecordPollBatchTimeAvg;
+    public final MetricNameTemplate sourceRecordActiveCount;
+    public final MetricNameTemplate sourceRecordActiveCountMax;
+    public final MetricNameTemplate sourceRecordActiveCountAvg;
+    public final MetricNameTemplate sinkRecordReadRate;
+    public final MetricNameTemplate sinkRecordReadTotal;
+    public final MetricNameTemplate sinkRecordSendRate;
+    public final MetricNameTemplate sinkRecordSendTotal;
+    public final MetricNameTemplate sinkRecordLagMax;
+    public final MetricNameTemplate sinkRecordPartitionCount;
+    public final MetricNameTemplate sinkRecordOffsetCommitSeqNum;
+    public final MetricNameTemplate sinkRecordOffsetCommitCompletionRate;
+    public final MetricNameTemplate sinkRecordOffsetCommitCompletionTotal;
+    public final MetricNameTemplate sinkRecordOffsetCommitSkipRate;
+    public final MetricNameTemplate sinkRecordOffsetCommitSkipTotal;
+    public final MetricNameTemplate sinkRecordPutBatchTimeMax;
+    public final MetricNameTemplate sinkRecordPutBatchTimeAvg;
+    public final MetricNameTemplate sinkRecordActiveCount;
+    public final MetricNameTemplate sinkRecordActiveCountMax;
+    public final MetricNameTemplate sinkRecordActiveCountAvg;
+
+    public ConnectMetricsRegistry() {
+        this(new LinkedHashSet<String>());
+    }
+
+    public ConnectMetricsRegistry(Set<String> tags) {
+        /***** Connector level *****/
+        Set<String> connectorTags = new LinkedHashSet<>(tags);
+        connectorTags.add(CONNECTOR_TAG_NAME);
+
+        connectorStatusRunning = createTemplate("status-running", 
CONNECTOR_GROUP_NAME,
+                                                "Signals whether the connector 
is in the running state.", connectorTags);
+        connectorStatusPaused = createTemplate("status-paused", 
CONNECTOR_GROUP_NAME,
+                                               "Signals whether the connector 
is in the paused state.", connectorTags);
+        connectorStatusFailed = createTemplate("status-failed", 
CONNECTOR_GROUP_NAME,
+                                               "Signals whether the connector 
is in the failed state.", connectorTags);
+
+        /***** Worker task level *****/
+        Set<String> workerTaskTags = new LinkedHashSet<>(tags);
+        workerTaskTags.add(CONNECTOR_TAG_NAME);
+        workerTaskTags.add(TASK_TAG_NAME);
+
+        taskStatusUnassigned = createTemplate("status-unassigned", 
TASK_GROUP_NAME, "Signals whether this task is in the unassigned state.",
+                                              workerTaskTags);
+        taskStatusRunning = createTemplate("status-running", TASK_GROUP_NAME, 
"Signals whether this task is in the running state.",
+                                           workerTaskTags);
+        taskStatusPaused = createTemplate("status-paused", TASK_GROUP_NAME, 
"Signals whether this task is in the paused state.",
+                                          workerTaskTags);
+        taskStatusFailed = createTemplate("status-failed", TASK_GROUP_NAME, 
"Signals whether this task is in the failed state.",
+                                          workerTaskTags);
+        taskStatusDestroyed = createTemplate("status-destroyed", 
TASK_GROUP_NAME, "Signals whether this task is in the destroyed state.",
+                                             workerTaskTags);
+        taskRunningRatio = createTemplate("running-ratio", TASK_GROUP_NAME,
+                                          "The fraction of time this task has 
spent in the running state.", workerTaskTags);
+        taskPauseRatio = createTemplate("pause-ratio", TASK_GROUP_NAME, "The 
fraction of time this task has spent in the pause state.",
+                                        workerTaskTags);
+        taskCommitTimeMax = createTemplate("offset-commit-max-time-ms", 
TASK_GROUP_NAME,
+                                           "The maximum time in milliseconds 
taken by this task to commit offsets.", workerTaskTags);
+        taskCommitTimeAvg = createTemplate("offset-commit-avg-time-ms", 
TASK_GROUP_NAME,
+                                           "The average time in milliseconds 
taken by this task to commit offsets.", workerTaskTags);
+        taskBatchSizeMax = createTemplate("batch-size-max", TASK_GROUP_NAME, 
"The maximum size of the batches processed by the connector.",
+                                          workerTaskTags);
+        taskBatchSizeAvg = createTemplate("batch-size-avg", TASK_GROUP_NAME, 
"The average size of the batches processed by the connector.",
+                                          workerTaskTags);
+        taskCommitFailurePercentage = 
createTemplate("offset-commit-failure-percentage", TASK_GROUP_NAME,
+                                                     "The average percentage 
of this task's offset commit attempts that failed.",
+                                                     workerTaskTags);
+        taskCommitSuccessPercentage = 
createTemplate("offset-commit-success-percentage", TASK_GROUP_NAME,
+                                                     "The average percentage 
of this task's offset commit attempts that succeeded.",
+                                                     workerTaskTags);
+
+        /***** Source worker task level *****/
+        Set<String> sourceTaskTags = new LinkedHashSet<>(tags);
+        sourceTaskTags.add(CONNECTOR_TAG_NAME);
+        sourceTaskTags.add(TASK_TAG_NAME);
+
+        sourceRecordPollRate = createTemplate("source-record-poll-rate", 
SOURCE_TASK_GROUP_NAME,
+                                              "The average per-second number 
of records produced/polled (before transformation) by " +
+                                              "this task belonging to the 
named source connector in this worker.",
+                                              sourceTaskTags);
+        sourceRecordPollTotal = createTemplate("source-record-poll-total", 
SOURCE_TASK_GROUP_NAME,
+                                               "The total number of records 
produced/polled (before transformation) by this task " +
+                                               "belonging to the named source 
connector in this worker.",
+                                               sourceTaskTags);
+        sourceRecordWriteRate = createTemplate("source-record-write-rate", 
SOURCE_TASK_GROUP_NAME,
+                                               "The average per-second number 
of records output from the transformations and written" +
+                                               " to Kafka for this task 
belonging to the named source connector in this worker. This" +
+                                               " is after transformations are 
applied and excludes any records filtered out by the " +
+                                               "transformations.",
+                                               sourceTaskTags);
+        sourceRecordWriteTotal = createTemplate("source-record-write-total", 
SOURCE_TASK_GROUP_NAME,
+                                                "The number of records output 
from the transformations and written to Kafka for this" +
+                                                " task belonging to the named 
source connector in this worker, since the task was " +
+                                                "last restarted.",
+                                                sourceTaskTags);
+        sourceRecordPollBatchTimeMax = 
createTemplate("poll-batch-max-time-ms", SOURCE_TASK_GROUP_NAME,
+                                                      "The maximum time in 
milliseconds taken by this task to poll for a batch of " +
+                                                      "source records.",
+                                                      sourceTaskTags);
+        sourceRecordPollBatchTimeAvg = 
createTemplate("poll-batch-avg-time-ms", SOURCE_TASK_GROUP_NAME,
+                                                      "The average time in 
milliseconds taken by this task to poll for a batch of " +
+                                                      "source records.",
+                                                      sourceTaskTags);
+        sourceRecordActiveCount = createTemplate("source-record-active-count", 
SOURCE_TASK_GROUP_NAME,
+                                                 "The number of records that 
have been produced by this task but not yet completely " +
+                                                 "written to Kafka.",
+                                                 sourceTaskTags);
+        sourceRecordActiveCountMax = 
createTemplate("source-record-active-count-max", SOURCE_TASK_GROUP_NAME,
+                                                    "The maximum number of 
records that have been produced by this task but not yet " +
+                                                    "completely written to 
Kafka.",
+                                                    sourceTaskTags);
+        sourceRecordActiveCountAvg = 
createTemplate("source-record-active-count-avg", SOURCE_TASK_GROUP_NAME,
+                                                    "The average number of 
records that have been produced by this task but not yet " +
+                                                    "completely written to 
Kafka.",
+                                                    sourceTaskTags);
+
+        /***** Sink worker task level *****/
+        Set<String> sinkTaskTags = new LinkedHashSet<>(tags);
+        sinkTaskTags.add(CONNECTOR_TAG_NAME);
+        sinkTaskTags.add(TASK_TAG_NAME);
+
+        sinkRecordReadRate = createTemplate("sink-record-read-rate", 
SINK_TASK_GROUP_NAME,
+                                            "The average per-second number of 
records read from Kafka for this task belonging to the" +
+                                            " named sink connector in this 
worker. This is before transformations are applied.",
+                                            sinkTaskTags);
+        sinkRecordReadTotal = createTemplate("sink-record-read-total", 
SINK_TASK_GROUP_NAME,
+                                             "The total number of records read 
from Kafka by this task belonging to the named sink " +
+                                             "connector in this worker, since 
the task was last restarted.",
+                                             sinkTaskTags);
+        sinkRecordSendRate = createTemplate("sink-record-send-rate", 
SINK_TASK_GROUP_NAME,
+                                            "The average per-second number of 
records output from the transformations and sent/put " +
+                                            "to this task belonging to the 
named sink connector in this worker. This is after " +
+                                            "transformations are applied and 
excludes any records filtered out by the " +
+                                            "transformations.",
+                                            sinkTaskTags);
+        sinkRecordSendTotal = createTemplate("sink-record-send-total", 
SINK_TASK_GROUP_NAME,
+                                             "The total number of records 
output from the transformations and sent/put to this task " +
+                                             "belonging to the named sink 
connector in this worker, since the task was last " +
+                                             "restarted.",
+                                             sinkTaskTags);
+        sinkRecordLagMax = createTemplate("sink-record-lag-max", 
SINK_TASK_GROUP_NAME,
+                                          "The maximum lag in terms of number 
of records that the sink task is behind the consumer's " +
+                                          "position for any topic partitions.",
+                                          sinkTaskTags);
+        sinkRecordPartitionCount = createTemplate("partition-count", 
SINK_TASK_GROUP_NAME,
+                                                  "The number of topic 
partitions assigned to this task belonging to the named sink " +
+                                                  "connector in this worker.",
+                                                  sinkTaskTags);
+        sinkRecordOffsetCommitSeqNum = createTemplate("offset-commit-seq-no", 
SINK_TASK_GROUP_NAME,
+                                                      "The current sequence 
number for offset commits.", sinkTaskTags);
+        sinkRecordOffsetCommitCompletionRate = 
createTemplate("offset-commit-completion-rate", SINK_TASK_GROUP_NAME,
+                                                              "The average 
per-second number of offset commit completions that were " +
+                                                              "completed 
successfully.",
+                                                              sinkTaskTags);
+        sinkRecordOffsetCommitCompletionTotal = 
createTemplate("offset-commit-completion-total", SINK_TASK_GROUP_NAME,
+                                                               "The total 
number of offset commit completions that were completed " +
+                                                               "successfully.",
+                                                               sinkTaskTags);
+        sinkRecordOffsetCommitSkipRate = 
createTemplate("offset-commit-skip-rate", SINK_TASK_GROUP_NAME,
+                                                        "The average 
per-second number of offset commit completions that were " +
+                                                        "received too late and 
skipped/ignored.",
+                                                        sinkTaskTags);
+        sinkRecordOffsetCommitSkipTotal = 
createTemplate("offset-commit-skip-total", SINK_TASK_GROUP_NAME,
+                                                         "The total number of 
offset commit completions that were received too late " +
+                                                         "and 
skipped/ignored.",
+                                                         sinkTaskTags);
+        sinkRecordPutBatchTimeMax = createTemplate("put-batch-max-time-ms", 
SINK_TASK_GROUP_NAME,
+                                                   "The maximum time taken by 
this task to put a batch of sink records.", sinkTaskTags);
+        sinkRecordPutBatchTimeAvg = createTemplate("put-batch-avg-time-ms", 
SINK_TASK_GROUP_NAME,
+                                                   "The average time taken by 
this task to put a batch of sink records.", sinkTaskTags);
+        sinkRecordActiveCount = createTemplate("sink-record-active-count", 
SINK_TASK_GROUP_NAME,
+                                               "The number of records that 
have been read from Kafka but not yet completely " +
+                                               "committed/flushed/acknowledged 
by the sink task.",
+                                               sinkTaskTags);
+        sinkRecordActiveCountMax = 
createTemplate("sink-record-active-count-max", SINK_TASK_GROUP_NAME,
+                                                  "The maximum number of 
records that have been read from Kafka but not yet completely " +
+                                                  
"committed/flushed/acknowledged by the sink task.",
+                                                  sinkTaskTags);
+        sinkRecordActiveCountAvg = 
createTemplate("sink-record-active-count-avg", SINK_TASK_GROUP_NAME,
+                                                  "The average number of 
records that have been read from Kafka but not yet completely " +
+                                                  
"committed/flushed/acknowledged by the sink task.",
+                                                  sinkTaskTags);
+    }
+
+    private MetricNameTemplate createTemplate(String name, String group, 
String doc, Set<String> tags) {
+        MetricNameTemplate template = new MetricNameTemplate(name, group, doc, 
tags);
+        allTemplates.add(template);
+        return template;
+    }
+
+    public List<MetricNameTemplate> getAllTemplates() {
+        return Collections.unmodifiableList(allTemplates);
+    }
+
+    public String connectorTagName() {
+        return CONNECTOR_TAG_NAME;
+    }
+
+    public String taskTagName() {
+        return TASK_TAG_NAME;
+    }
+
+    public String connectorGroupName() {
+        return CONNECTOR_GROUP_NAME;
+    }
+
+    public String taskGroupName() {
+        return TASK_GROUP_NAME;
+    }
+
+    public String sinkTaskGroupName() {
+        return SINK_TASK_GROUP_NAME;
+    }
+
+    public String sourceTaskGroupName() {
+        return SOURCE_TASK_GROUP_NAME;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/11afff09/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
index 874edd5..21104bd 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.connect.runtime;
 
+import org.apache.kafka.common.MetricNameTemplate;
 import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.connector.ConnectorContext;
 import org.apache.kafka.connect.runtime.ConnectMetrics.IndicatorPredicate;
@@ -225,19 +226,17 @@ public class WorkerConnector {
         public ConnectorMetricsGroup(ConnectMetrics connectMetrics, 
AbstractStatus.State initialState, ConnectorStatus.Listener delegate) {
             this.delegate = delegate;
             this.state = initialState;
-            this.metricGroup = connectMetrics.group("connector-metrics",
-                    "connector", connName);
-
-            addStateMetric(AbstractStatus.State.RUNNING, "status-running",
-                    "Signals whether the connector task is in the running 
state.");
-            addStateMetric(AbstractStatus.State.PAUSED, "status-paused",
-                    "Signals whether the connector task is in the paused 
state.");
-            addStateMetric(AbstractStatus.State.FAILED, "status-failed",
-                    "Signals whether the connector task is in the failed 
state.");
+            ConnectMetricsRegistry registry = connectMetrics.registry();
+            this.metricGroup = 
connectMetrics.group(registry.connectorGroupName(),
+                    registry.connectorTagName(), connName);
+
+            addStateMetric(AbstractStatus.State.RUNNING, 
registry.connectorStatusRunning);
+            addStateMetric(AbstractStatus.State.PAUSED, 
registry.connectorStatusPaused);
+            addStateMetric(AbstractStatus.State.FAILED, 
registry.connectorStatusFailed);
         }
 
-        private void addStateMetric(final AbstractStatus.State matchingState, 
String name, String description) {
-            metricGroup.addIndicatorMetric(name, description, new 
IndicatorPredicate() {
+        private void addStateMetric(final AbstractStatus.State matchingState, 
MetricNameTemplate nameTemplate) {
+            metricGroup.addIndicatorMetric(nameTemplate, new 
IndicatorPredicate() {
                 @Override
                 public boolean matches() {
                     return state == matchingState;

http://git-wip-us.apache.org/repos/asf/kafka/blob/11afff09/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 3cb68b7..234ce8a 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -268,7 +268,9 @@ class WorkerSinkTask extends WorkerTask {
         log.info("{} Sink task finished initialization and start", this);
     }
 
-    /** Poll for new messages with the given timeout. Should only be invoked 
by the worker thread. */
+    /**
+     * Poll for new messages with the given timeout. Should only be invoked by 
the worker thread.
+     */
     protected void poll(long timeoutMs) {
         rewind();
         long retryTimeout = context.timeout();
@@ -627,6 +629,8 @@ class WorkerSinkTask extends WorkerTask {
     }
 
     static class SinkTaskMetricsGroup {
+        private final ConnectorTaskId id;
+        private final ConnectMetrics metrics;
         private final MetricGroup metricGroup;
         private final Sensor sinkRecordRead;
         private final Sensor sinkRecordSend;
@@ -641,77 +645,44 @@ class WorkerSinkTask extends WorkerTask {
         private Map<TopicPartition, OffsetAndMetadata> committedOffsets = new 
HashMap<>();
 
         public SinkTaskMetricsGroup(ConnectorTaskId id, ConnectMetrics 
connectMetrics) {
-            metricGroup = connectMetrics.group("sink-task-metrics",
-                    "connector", id.connector(), "task", 
Integer.toString(id.task()));
+            this.metrics = connectMetrics;
+            this.id = id;
+
+            ConnectMetricsRegistry registry = connectMetrics.registry();
+            metricGroup = connectMetrics
+                                  .group(registry.sinkTaskGroupName(), 
registry.connectorTagName(), id.connector(), registry.taskTagName(),
+                                         Integer.toString(id.task()));
 
             sinkRecordRead = metricGroup.metrics().sensor("sink-record-read");
-            sinkRecordRead.add(metricGroup.metricName("sink-record-read-rate",
-                    "The average per-second number of records read from Kafka 
for this task belonging to the " +
-                            "named sink connector in this worker. This is 
before transformations are applied."),
-                    new Rate());
-            sinkRecordRead.add(metricGroup.metricName("sink-record-read-total",
-                    "The total number of records produced/polled (before 
transformation) by this task belonging " +
-                            "to the named sink connector in this worker, since 
the task was last restarted."),
-                    new Total());
+            
sinkRecordRead.add(metricGroup.metricName(registry.sinkRecordReadRate), new 
Rate());
+            
sinkRecordRead.add(metricGroup.metricName(registry.sinkRecordReadTotal), new 
Total());
 
             sinkRecordSend = metricGroup.metrics().sensor("sink-record-send");
-            sinkRecordSend.add(metricGroup.metricName("sink-record-send-rate",
-                    "The average per-second number of records output from the 
transformations and sent/put to " +
-                            "this task belonging to the named sink connector 
in this worker. This is after transformations " +
-                            "are applied and excludes any records filtered out 
by the transformations."),
-                    new Rate());
-            sinkRecordSend.add(metricGroup.metricName("sink-record-send-total",
-                    "The total number of records output from the 
transformations and sent/put to this task " +
-                            "belonging to the named sink connector in this 
worker, since the task was last restarted."),
-                    new Total());
+            
sinkRecordSend.add(metricGroup.metricName(registry.sinkRecordSendRate), new 
Rate());
+            
sinkRecordSend.add(metricGroup.metricName(registry.sinkRecordSendTotal), new 
Total());
 
             sinkRecordActiveCount = 
metricGroup.metrics().sensor("sink-record-active-count");
-            
sinkRecordActiveCount.add(metricGroup.metricName("sink-record-active-count",
-                    "The number of records that have been read from Kafka but 
not yet completely committed/flushed/acknowledged" +
-                        "by the sink task"),
-                    new Value());
-            
sinkRecordActiveCount.add(metricGroup.metricName("sink-record-active-count-max",
-                    "The maximum number of records that have been read from 
Kafka but not yet completely committed/flushed/acknowledged" +
-                        "by the sink task"),
-                    new Max());
-            
sinkRecordActiveCount.add(metricGroup.metricName("sink-record-active-count-avg",
-                    "The average number of records that have been read from 
Kafka but not yet completely committed/flushed/acknowledged" +
-                        "by the sink task"),
-                    new Avg());
+            
sinkRecordActiveCount.add(metricGroup.metricName(registry.sinkRecordActiveCount),
 new Value());
+            
sinkRecordActiveCount.add(metricGroup.metricName(registry.sinkRecordActiveCountMax),
 new Max());
+            
sinkRecordActiveCount.add(metricGroup.metricName(registry.sinkRecordActiveCountAvg),
 new Avg());
 
             partitionCount = metricGroup.metrics().sensor("partition-count");
-            partitionCount.add(metricGroup.metricName("partition-count",
-                    "The number of topic partitions assigned to this task 
belonging to the named sink connector in this worker."),
-                    new Value());
+            
partitionCount.add(metricGroup.metricName(registry.sinkRecordPartitionCount), 
new Value());
 
             offsetSeqNum = metricGroup.metrics().sensor("offset-seq-number");
-            offsetSeqNum.add(metricGroup.metricName("offset-commit-seq-no",
-                    "The current sequence number for offset commits"),
-                    new Value());
+            
offsetSeqNum.add(metricGroup.metricName(registry.sinkRecordOffsetCommitSeqNum), 
new Value());
 
             offsetCompletion = 
metricGroup.metrics().sensor("offset-commit-completion");
-            
offsetCompletion.add(metricGroup.metricName("offset-commit-completion-rate",
-                    "The average per-second number of offset commit 
completions that were completed successfully."),
-                    new Rate());
-            
offsetCompletion.add(metricGroup.metricName("offset-commit-completion-total",
-                    "The total number of offset commit completions that were 
completed successfully."),
-                    new Total());
+            
offsetCompletion.add(metricGroup.metricName(registry.sinkRecordOffsetCommitCompletionRate),
 new Rate());
+            
offsetCompletion.add(metricGroup.metricName(registry.sinkRecordOffsetCommitCompletionTotal),
 new Total());
 
             offsetCompletionSkip = 
metricGroup.metrics().sensor("offset-commit-completion-skip");
-            
offsetCompletionSkip.add(metricGroup.metricName("offset-commit-completion-skip-rate",
-                    "The average per-second number of offset commit 
completions that were received too late and skipped/ignored."),
-                    new Rate());
-            
offsetCompletionSkip.add(metricGroup.metricName("offset-commit-completion-skip-total",
-                    "The total number of offset commit completions that were 
received too late and skipped/ignored."),
-                    new Total());
+            
offsetCompletionSkip.add(metricGroup.metricName(registry.sinkRecordOffsetCommitSkipRate),
 new Rate());
+            
offsetCompletionSkip.add(metricGroup.metricName(registry.sinkRecordOffsetCommitSkipTotal),
 new Total());
 
             putBatchTime = metricGroup.metrics().sensor("put-batch-time");
-            putBatchTime.add(metricGroup.metricName("put-batch-max-time-ms",
-                    "The maximum time taken by this task to put a batch of 
sinks records."),
-                    new Max());
-            putBatchTime.add(metricGroup.metricName("put-batch-avg-time-ms",
-                    "The average time taken by this task to put a batch of 
sinks records."),
-                    new Avg());
+            
putBatchTime.add(metricGroup.metricName(registry.sinkRecordPutBatchTimeMax), 
new Max());
+            
putBatchTime.add(metricGroup.metricName(registry.sinkRecordPutBatchTimeAvg), 
new Avg());
         }
 
         void computeSinkRecordLag() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/11afff09/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index 9a187d3..9072cd4 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -490,51 +490,27 @@ class WorkerSourceTask extends WorkerTask {
         private int activeRecordCount;
 
         public SourceTaskMetricsGroup(ConnectorTaskId id, ConnectMetrics 
connectMetrics) {
-            metricGroup = connectMetrics.group("source-task-metrics",
-                    "connector", id.connector(), "task", 
Integer.toString(id.task()));
+            ConnectMetricsRegistry registry = connectMetrics.registry();
+            metricGroup = connectMetrics.group(registry.sourceTaskGroupName(),
+                    registry.connectorTagName(), id.connector(),
+                    registry.taskTagName(), Integer.toString(id.task()));
 
             sourceRecordPoll = metricGroup.sensor("source-record-poll");
-            
sourceRecordPoll.add(metricGroup.metricName("source-record-poll-rate",
-                    "The average per-second number of records produced/polled 
(before transformation) by this " +
-                            "task belonging to the named source connector in 
this worker."),
-                    new Rate());
-            
sourceRecordPoll.add(metricGroup.metricName("source-record-poll-total",
-                    "The number of records produced/polled (before 
transformation) by this task belonging to " +
-                            "the named source connector in this worker, since 
the task was last restarted."),
-                    new Total());
+            
sourceRecordPoll.add(metricGroup.metricName(registry.sourceRecordPollRate), new 
Rate());
+            
sourceRecordPoll.add(metricGroup.metricName(registry.sourceRecordPollTotal), 
new Total());
 
             sourceRecordWrite = metricGroup.sensor("source-record-write");
-            
sourceRecordWrite.add(metricGroup.metricName("source-record-write-rate",
-                    "The average per-second number of records output from the 
transformations and written to " +
-                            "Kafka for this task belonging to the named source 
connector in this worker. " +
-                            "This is after transformations are applied and 
excludes any records filtered out " +
-                            "by the transformations."),
-                    new Rate());
-            
sourceRecordWrite.add(metricGroup.metricName("source-record-write-total",
-                    "The number of records output from the transformations and 
written to Kafka for this task " +
-                            "belonging to the named source connector in this 
worker, since the task was last " +
-                            "restarted."),
-                    new Total());
-
+            
sourceRecordWrite.add(metricGroup.metricName(registry.sourceRecordWriteRate), 
new Rate());
+            
sourceRecordWrite.add(metricGroup.metricName(registry.sourceRecordWriteTotal), 
new Total());
 
             pollTime = metricGroup.sensor("poll-batch-time");
-            pollTime.add(metricGroup.metricName("poll-batch-max-time-ms",
-                    "The maximum time in milliseconds taken by this task to 
poll for a batch of source records"),
-                    new Max());
-            pollTime.add(metricGroup.metricName("poll-batch-avg-time-ms",
-                    "The average time in milliseconds taken by this task to 
poll for a batch of source records"),
-                    new Avg());
-
-            sourceRecordActiveCount = 
metricGroup.metrics().sensor("source-record-active-count");
-            
sourceRecordActiveCount.add(metricGroup.metricName("source-record-active-count",
-                    "The number of records that have been produced by this 
task but not yet completely written to Kafka."),
-                    new Value());
-            
sourceRecordActiveCount.add(metricGroup.metricName("source-record-active-count-max",
-                    "The maximum number of records that have been produced by 
this task but not yet completely written to Kafka."),
-                    new Max());
-            
sourceRecordActiveCount.add(metricGroup.metricName("source-record-active-count-avg",
-                    "The average number of records that have been produced by 
this task but not yet completely written to Kafka."),
-                    new Avg());
+            
pollTime.add(metricGroup.metricName(registry.sourceRecordPollBatchTimeMax), new 
Max());
+            
pollTime.add(metricGroup.metricName(registry.sourceRecordPollBatchTimeAvg), new 
Avg());
+
+            sourceRecordActiveCount = 
metricGroup.metrics().sensor("source-record-active-count");
+            
sourceRecordActiveCount.add(metricGroup.metricName(registry.sourceRecordActiveCount),
 new Value());
+            
sourceRecordActiveCount.add(metricGroup.metricName(registry.sourceRecordActiveCountMax),
 new Max());
+            
sourceRecordActiveCount.add(metricGroup.metricName(registry.sourceRecordActiveCountAvg),
 new Avg());
         }
 
         void close() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/11afff09/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
index e4af516..6499ac2 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.connect.runtime;
 
 import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.MetricNameTemplate;
 import org.apache.kafka.common.metrics.Measurable;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Sensor;
@@ -307,52 +308,37 @@ abstract class WorkerTask implements Runnable {
             delegateListener = statusListener;
             time = connectMetrics.time();
             taskStateTimer = new StateTracker();
-            metricGroup = connectMetrics.group("connector-tasks",
-                    "connector", id.connector(), "task", 
Integer.toString(id.task()));
-
-            addTaskStateMetric(State.UNASSIGNED, "status-unassigned",
-                    "Signals whether the connector task is in the unassigned 
state.");
-            addTaskStateMetric(State.RUNNING, "status-running",
-                    "Signals whether the connector task is in the running 
state.");
-            addTaskStateMetric(State.PAUSED, "status-paused",
-                    "Signals whether the connector task is in the paused 
state.");
-            addTaskStateMetric(State.FAILED, "status-failed",
-                    "Signals whether the connector task is in the failed 
state.");
-            addTaskStateMetric(State.DESTROYED, "status-destroyed",
-                    "Signals whether the connector task is in the destroyed 
state.");
-
-            addRatioMetric(State.RUNNING, "running-ratio",
-                    "The fraction of time this task has spent in the running 
state.");
-            addRatioMetric(State.PAUSED, "pause-ratio",
-                    "The fraction of time this task has spent in the paused 
state.");
+            ConnectMetricsRegistry registry = connectMetrics.registry();
+            metricGroup = connectMetrics.group(registry.taskGroupName(),
+                    registry.connectorTagName(), id.connector(),
+                    registry.taskTagName(), Integer.toString(id.task()));
+
+            addTaskStateMetric(State.UNASSIGNED, 
registry.taskStatusUnassigned);
+            addTaskStateMetric(State.RUNNING, registry.taskStatusRunning);
+            addTaskStateMetric(State.PAUSED, registry.taskStatusPaused);
+            addTaskStateMetric(State.FAILED, registry.taskStatusFailed);
+            addTaskStateMetric(State.DESTROYED, registry.taskStatusDestroyed);
+
+            addRatioMetric(State.RUNNING, registry.taskRunningRatio);
+            addRatioMetric(State.PAUSED, registry.taskPauseRatio);
 
             commitTime = metricGroup.sensor("commit-time");
-            commitTime.add(metricGroup.metricName("offset-commit-max-time-ms",
-                    "The maximum time in milliseconds taken by this task to 
commit offsets"),
-                    new Max());
-            commitTime.add(metricGroup.metricName("offset-commit-avg-time-ms",
-                    "The average time in milliseconds taken by this task to 
commit offsets"),
-                    new Avg());
+            commitTime.add(metricGroup.metricName(registry.taskCommitTimeMax), 
new Max());
+            commitTime.add(metricGroup.metricName(registry.taskCommitTimeAvg), 
new Avg());
 
             batchSize = metricGroup.sensor("batch-size");
-            batchSize.add(metricGroup.metricName("batch-size-max",
-                    "The maximum size of the batches processed by the 
connector"),
-                    new Max());
-            batchSize.add(metricGroup.metricName("batch-size-avg",
-                    "The average size of the batches processed by the 
connector"),
-                    new Avg());
-
-            MetricName offsetCommitFailures = 
metricGroup.metricName("offset-commit-failure-percentage",
-                    "The average percentage of this task's offset commit 
attempts that failed");
-            MetricName offsetCommitSucceeds = 
metricGroup.metricName("offset-commit-success-percentage",
-                    "The average percentage of this task's offset commit 
attempts that failed");
+            batchSize.add(metricGroup.metricName(registry.taskBatchSizeMax), 
new Max());
+            batchSize.add(metricGroup.metricName(registry.taskBatchSizeAvg), 
new Avg());
+
+            MetricName offsetCommitFailures = 
metricGroup.metricName(registry.taskCommitFailurePercentage);
+            MetricName offsetCommitSucceeds = 
metricGroup.metricName(registry.taskCommitSuccessPercentage);
             Frequencies commitFrequencies = 
Frequencies.forBooleanValues(offsetCommitFailures, offsetCommitSucceeds);
             commitAttempts = metricGroup.sensor("offset-commit-completion");
             commitAttempts.add(commitFrequencies);
         }
 
-        private void addTaskStateMetric(final State matchingState, String 
name, String description) {
-            metricGroup.addIndicatorMetric(name, description, new 
IndicatorPredicate() {
+        private void addTaskStateMetric(final State matchingState, 
MetricNameTemplate template) {
+            metricGroup.addIndicatorMetric(template, new IndicatorPredicate() {
                 @Override
                 public boolean matches() {
                     return matchingState == taskStateTimer.currentState();
@@ -360,8 +346,8 @@ abstract class WorkerTask implements Runnable {
             });
         }
 
-        private void addRatioMetric(final State matchingState, String name, 
String description) {
-            MetricName metricName = metricGroup.metricName(name, description);
+        private void addRatioMetric(final State matchingState, 
MetricNameTemplate template) {
+            MetricName metricName = metricGroup.metricName(template);
             if (metricGroup.metrics().metric(metricName) == null) {
                 metricGroup.metrics().addMetric(metricName, new Measurable() {
                     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/11afff09/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java
index 6de0638..a16ab41 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java
@@ -52,7 +52,8 @@ public class ConnectMetricsTest {
 
     @After
     public void tearDown() {
-        if (metrics != null) metrics.stop();
+        if (metrics != null)
+            metrics.stop();
     }
 
     @Test
@@ -85,53 +86,29 @@ public class ConnectMetricsTest {
     }
 
     @Test
-    public void testCreatingTagsWithNonNullWorkerId() {
-        Map<String, String> tags = ConnectMetrics.tags("name", "k1", "v1", 
"k2", "v2");
+    public void testCreatingTags() {
+        Map<String, String> tags = ConnectMetrics.tags("k1", "v1", "k2", "v2");
         assertEquals("v1", tags.get("k1"));
         assertEquals("v2", tags.get("k2"));
-        assertEquals("name", tags.get(ConnectMetrics.WORKER_ID_TAG_NAME));
-    }
-
-    @Test
-    public void testCreatingTagsWithNullWorkerId() {
-        Map<String, String> tags = ConnectMetrics.tags(null, "k1", "v1", "k2", 
"v2");
-        assertEquals("v1", tags.get("k1"));
-        assertEquals("v2", tags.get("k2"));
-        assertEquals(null, tags.get(ConnectMetrics.WORKER_ID_TAG_NAME));
-    }
-
-    @Test
-    public void testCreatingTagsWithEmptyWorkerId() {
-        Map<String, String> tags = ConnectMetrics.tags("", "k1", "v1", "k2", 
"v2");
-        assertEquals("v1", tags.get("k1"));
-        assertEquals("v2", tags.get("k2"));
-        assertEquals(null, tags.get(ConnectMetrics.WORKER_ID_TAG_NAME));
+        assertEquals(2, tags.size());
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void testCreatingTagsWithOddNumberOfTags() {
-        ConnectMetrics.tags("name", "k1", "v1", "k2", "v2", "extra");
+        ConnectMetrics.tags("k1", "v1", "k2", "v2", "extra");
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void testGettingGroupWithOddNumberOfTags() {
-        metrics.group("name", false, "k1", "v1", "k2", "v2", "extra");
+        metrics.group("name", "k1", "v1", "k2", "v2", "extra");
     }
 
     @Test
     public void testGettingGroupWithTags() {
-        MetricGroup group1 = metrics.group("name", false, "k1", "v1", "k2", 
"v2");
+        MetricGroup group1 = metrics.group("name", "k1", "v1", "k2", "v2");
         assertEquals("v1", group1.tags().get("k1"));
         assertEquals("v2", group1.tags().get("k2"));
-        assertEquals(null, 
group1.tags().get(ConnectMetrics.WORKER_ID_TAG_NAME));
-    }
-
-    @Test
-    public void testGettingGroupWithWorkerIdAndTags() {
-        MetricGroup group1 = metrics.group("name", true, "k1", "v1", "k2", 
"v2");
-        assertEquals("v1", group1.tags().get("k1"));
-        assertEquals("v2", group1.tags().get("k2"));
-        assertEquals(metrics.workerId(), 
group1.tags().get(ConnectMetrics.WORKER_ID_TAG_NAME));
+        assertEquals(2, group1.tags().size());
     }
 
     @Test
@@ -156,9 +133,9 @@ public class ConnectMetricsTest {
 
     @Test
     public void testMetricGroupIdIdentity() {
-        MetricGroupId id1 = metrics.groupId("name", false, "k1", "v1");
-        MetricGroupId id2 = metrics.groupId("name", false, "k1", "v1");
-        MetricGroupId id3 = metrics.groupId("name", false, "k1", "v1", "k2", 
"v2");
+        MetricGroupId id1 = metrics.groupId("name", "k1", "v1");
+        MetricGroupId id2 = metrics.groupId("name", "k1", "v1");
+        MetricGroupId id3 = metrics.groupId("name", "k1", "v1", "k2", "v2");
 
         assertEquals(id1.hashCode(), id2.hashCode());
         assertEquals(id1, id2);
@@ -172,8 +149,8 @@ public class ConnectMetricsTest {
 
     @Test
     public void testMetricGroupIdWithoutTags() {
-        MetricGroupId id1 = metrics.groupId("name", false);
-        MetricGroupId id2 = metrics.groupId("name", false);
+        MetricGroupId id1 = metrics.groupId("name");
+        MetricGroupId id2 = metrics.groupId("name");
 
         assertEquals(id1.hashCode(), id2.hashCode());
         assertEquals(id1, id2);
@@ -183,16 +160,4 @@ public class ConnectMetricsTest {
         assertNotNull(id1.tags());
         assertNotNull(id2.tags());
     }
-
-    @Test
-    public void testMetricGroupIdWithWorkerId() {
-        MetricGroupId id1 = metrics.groupId("name", true);
-        assertNotNull(metrics.workerId(), 
id1.tags().get(ConnectMetrics.WORKER_ID_TAG_NAME));
-        assertEquals("name;worker-id=worker1", id1.toString());
-
-        id1 = metrics.groupId("name", true, "k1", "v1", "k2", "v2");
-        assertNotNull(metrics.workerId(), 
id1.tags().get(ConnectMetrics.WORKER_ID_TAG_NAME));
-        assertEquals("name;worker-id=worker1;k1=v1;k2=v2", id1.toString()); // 
maintain order of tags
-    }
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/11afff09/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockConnectMetrics.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockConnectMetrics.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockConnectMetrics.java
index f0bb1e2e..6cc6db7 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockConnectMetrics.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockConnectMetrics.java
@@ -74,7 +74,7 @@ public class MockConnectMetrics extends ConnectMetrics {
      * @return the current value of the metric
      */
     public double currentMetricValue(MetricGroup metricGroup, String name) {
-        MetricName metricName = metricGroup.metricName(name, "desc");
+        MetricName metricName = metricGroup.metricName(name);
         for (MetricsReporter reporter : metrics().reporters()) {
             if (reporter instanceof MockMetricsReporter) {
                 return ((MockMetricsReporter) 
reporter).currentMetricValue(metricName);
@@ -92,7 +92,7 @@ public class MockConnectMetrics extends ConnectMetrics {
      * @return true if the metric is still register, or false if it has been 
removed
      */
     public boolean metricExists(MetricGroup metricGroup, String name) {
-        MetricName metricName = metricGroup.metricName(name, "desc");
+        MetricName metricName = metricGroup.metricName(name);
         KafkaMetric metric = metricGroup.metrics().metric(metricName);
         return metric != null;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/11afff09/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index 290cdd0..782d66b 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -251,8 +251,8 @@ public class WorkerSinkTaskTest {
         assertSinkMetricValue("offset-commit-seq-no", 0.0);
         assertSinkMetricValue("offset-commit-completion-rate", 0.0);
         assertSinkMetricValue("offset-commit-completion-total", 0.0);
-        assertSinkMetricValue("offset-commit-completion-skip-rate", 0.0);
-        assertSinkMetricValue("offset-commit-completion-skip-total", 0.0);
+        assertSinkMetricValue("offset-commit-skip-rate", 0.0);
+        assertSinkMetricValue("offset-commit-skip-total", 0.0);
         assertTaskMetricValue("status-running", 1.0);
         assertTaskMetricValue("status-paused", 0.0);
         assertTaskMetricValue("running-ratio", 1.0);
@@ -270,8 +270,8 @@ public class WorkerSinkTaskTest {
         assertSinkMetricValue("offset-commit-seq-no", 1.0);
         assertSinkMetricValue("offset-commit-completion-rate", 0.0333);
         assertSinkMetricValue("offset-commit-completion-total", 1.0);
-        assertSinkMetricValue("offset-commit-completion-skip-rate", 0.0);
-        assertSinkMetricValue("offset-commit-completion-skip-total", 0.0);
+        assertSinkMetricValue("offset-commit-skip-rate", 0.0);
+        assertSinkMetricValue("offset-commit-skip-total", 0.0);
         assertTaskMetricValue("status-running", 0.0);
         assertTaskMetricValue("status-paused", 1.0);
         assertTaskMetricValue("running-ratio", 0.25);
@@ -331,8 +331,8 @@ public class WorkerSinkTaskTest {
         assertSinkMetricValue("offset-commit-seq-no", 0.0);
         assertSinkMetricValue("offset-commit-completion-rate", 0.0);
         assertSinkMetricValue("offset-commit-completion-total", 0.0);
-        assertSinkMetricValue("offset-commit-completion-skip-rate", 0.0);
-        assertSinkMetricValue("offset-commit-completion-skip-total", 0.0);
+        assertSinkMetricValue("offset-commit-skip-rate", 0.0);
+        assertSinkMetricValue("offset-commit-skip-total", 0.0);
         assertTaskMetricValue("status-running", 1.0);
         assertTaskMetricValue("status-paused", 0.0);
         assertTaskMetricValue("running-ratio", 1.0);
@@ -491,7 +491,7 @@ public class WorkerSinkTaskTest {
         assertSinkMetricValue("sink-record-active-count-avg", 0.33333);
         assertSinkMetricValue("offset-commit-seq-no", 1.0);
         assertSinkMetricValue("offset-commit-completion-total", 1.0);
-        assertSinkMetricValue("offset-commit-completion-skip-total", 0.0);
+        assertSinkMetricValue("offset-commit-skip-total", 0.0);
         assertTaskMetricValue("status-running", 1.0);
         assertTaskMetricValue("status-paused", 0.0);
         assertTaskMetricValue("running-ratio", 1.0);
@@ -559,7 +559,7 @@ public class WorkerSinkTaskTest {
         assertSinkMetricValue("sink-record-active-count-avg", 0.333333);
         assertSinkMetricValue("offset-commit-seq-no", 0.0);
         assertSinkMetricValue("offset-commit-completion-total", 0.0);
-        assertSinkMetricValue("offset-commit-completion-skip-total", 0.0);
+        assertSinkMetricValue("offset-commit-skip-total", 0.0);
         assertTaskMetricValue("status-running", 1.0);
         assertTaskMetricValue("status-paused", 0.0);
         assertTaskMetricValue("running-ratio", 1.0);
@@ -587,7 +587,7 @@ public class WorkerSinkTaskTest {
         assertSinkMetricValue("sink-record-active-count-avg", 0.2);
         assertSinkMetricValue("offset-commit-seq-no", 1.0);
         assertSinkMetricValue("offset-commit-completion-total", 1.0);
-        assertSinkMetricValue("offset-commit-completion-skip-total", 0.0);
+        assertSinkMetricValue("offset-commit-skip-total", 0.0);
         assertTaskMetricValue("status-running", 1.0);
         assertTaskMetricValue("status-paused", 0.0);
         assertTaskMetricValue("running-ratio", 1.0);
@@ -990,7 +990,7 @@ public class WorkerSinkTaskTest {
         assertSinkMetricValue("sink-record-active-count-avg", 0.71429);
         assertSinkMetricValue("offset-commit-seq-no", 2.0);
         assertSinkMetricValue("offset-commit-completion-total", 1.0);
-        assertSinkMetricValue("offset-commit-completion-skip-total", 1.0);
+        assertSinkMetricValue("offset-commit-skip-total", 1.0);
         assertTaskMetricValue("status-running", 1.0);
         assertTaskMetricValue("status-paused", 0.0);
         assertTaskMetricValue("running-ratio", 1.0);
@@ -1025,7 +1025,7 @@ public class WorkerSinkTaskTest {
         assertSinkMetricValue("sink-record-active-count-avg", 0.5555555);
         assertSinkMetricValue("offset-commit-seq-no", 3.0);
         assertSinkMetricValue("offset-commit-completion-total", 2.0);
-        assertSinkMetricValue("offset-commit-completion-skip-total", 1.0);
+        assertSinkMetricValue("offset-commit-skip-total", 1.0);
         assertTaskMetricValue("status-running", 1.0);
         assertTaskMetricValue("status-paused", 0.0);
         assertTaskMetricValue("running-ratio", 1.0);
@@ -1099,6 +1099,7 @@ public class WorkerSinkTaskTest {
         createTask(initialState);
 
         expectInitializeTask();
+        expectPollInitialAssignment();
         expectConsumerPoll(1, RecordBatch.NO_TIMESTAMP, 
TimestampType.CREATE_TIME);
         expectConversionAndTransformation(1);
 
@@ -1110,7 +1111,8 @@ public class WorkerSinkTaskTest {
 
         workerTask.initialize(TASK_CONFIG);
         workerTask.initializeAndStart();
-        workerTask.iteration();
+        workerTask.iteration(); // iter 1 -- initial assignment
+        workerTask.iteration(); // iter 2 -- deliver 1 record
 
         SinkRecord record = records.getValue().iterator().next();
 
@@ -1129,18 +1131,19 @@ public class WorkerSinkTaskTest {
         createTask(initialState);
 
         expectInitializeTask();
+        expectPollInitialAssignment();
         expectConsumerPoll(1, timestamp, timestampType);
         expectConversionAndTransformation(1);
 
         Capture<Collection<SinkRecord>> records = 
EasyMock.newCapture(CaptureType.ALL);
-
         sinkTask.put(EasyMock.capture(records));
 
         PowerMock.replayAll();
 
         workerTask.initialize(TASK_CONFIG);
         workerTask.initializeAndStart();
-        workerTask.iteration();
+        workerTask.iteration(); // iter 1 -- initial assignment
+        workerTask.iteration(); // iter 2 -- deliver 1 record
 
         SinkRecord record = records.getValue().iterator().next();
 
@@ -1309,8 +1312,8 @@ public class WorkerSinkTaskTest {
         sinkMetricValue("offset-commit-seq-no");
         sinkMetricValue("offset-commit-completion-rate");
         sinkMetricValue("offset-commit-completion-total");
-        sinkMetricValue("offset-commit-completion-skip-rate");
-        sinkMetricValue("offset-commit-completion-skip-total");
+        sinkMetricValue("offset-commit-skip-rate");
+        sinkMetricValue("offset-commit-skip-total");
         sinkMetricValue("put-batch-max-time-ms");
         sinkMetricValue("put-batch-avg-time-ms");
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/11afff09/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
index b7d21d5..ce29757 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
@@ -367,6 +367,7 @@ public class WorkerSinkTaskThreadedTest extends 
ThreadedTest {
         // converted
         expectInitializeTask();
 
+        expectPollInitialAssignment();
         expectOnePoll().andAnswer(new IAnswer<Object>() {
             @Override
             public Object answer() throws Throwable {
@@ -423,6 +424,7 @@ public class WorkerSinkTaskThreadedTest extends 
ThreadedTest {
         workerTask.iteration();
         workerTask.iteration();
         workerTask.iteration();
+        workerTask.iteration();
         workerTask.stop();
         workerTask.close();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/11afff09/docs/ops.html
----------------------------------------------------------------------
diff --git a/docs/ops.html b/docs/ops.html
index 60c17cd..5942c31 100644
--- a/docs/ops.html
+++ b/docs/ops.html
@@ -1144,6 +1144,13 @@
 
   <!--#include virtual="generated/consumer_metrics.html" -->
 
+  <h4><a id="connect_monitoring" href="#connect_monitoring">Connect 
Monitoring</a></h4>
+
+  A Connect worker process contains all the producer and consumer metrics as 
well as metrics specific to Connect.
+  The worker process itself has a number of metrics, while each connector and 
task has additional metrics.
+
+  <!--#include virtual="generated/connect_metrics.html" -->
+
   <h4><a id="kafka_streams_monitoring" 
href="#kafka_streams_monitoring">Streams Monitoring</a></h4>
 
   A Kafka Streams instance contains all the producer and consumer metrics as 
well as additional metrics specific to streams.

Reply via email to