Repository: kafka
Updated Branches:
  refs/heads/trunk 8f90fd653 -> f4a1ca347


KAFKA-5899: Added Connect metrics for connectors (KIP-196)

This PR is the first of several subtasks for 
[KAFKA-2376](https://issues.apache.org/jira/browse/KAFKA-2376) to add metrics 
to Connect worker processes. See that issue and [KIP-196 for 
details](https://cwiki.apache.org/confluence/display/KAFKA/KIP-196%3A+Add+metrics+to+Kafka+Connect+framework).

This PR adds metrics for each connector using Kafka’s existing `Metrics` 
framework. It is the first of several changes that will add several groups of 
metrics. This change starts by adding a very simple `ConnectMetrics` object 
that is owned by each worker and that makes it easy to define multiple groups 
of metrics, called `ConnectMetrics.MetricGroup` objects. Each metric group maps 
to a JMX MBean, and each metric within the group maps to an MBean attribute.

Future PRs will build upon this simple pattern to add metrics for source and 
sink tasks, workers, and worker rebalances.

Author: Randall Hauch <rha...@gmail.com>

Reviewers: Konstantine Karantasis <konstant...@confluent.io>, Ewen 
Cheslack-Postava <ewe...@confluent.io>

Closes #3864 from rhauch/kafka-5899


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f4a1ca34
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f4a1ca34
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f4a1ca34

Branch: refs/heads/trunk
Commit: f4a1ca347bd21cacf5906887f56001bce61c4544
Parents: 8f90fd6
Author: Randall Hauch <rha...@gmail.com>
Authored: Fri Sep 22 13:17:28 2017 -0700
Committer: Ewen Cheslack-Postava <m...@ewencp.org>
Committed: Fri Sep 22 13:17:28 2017 -0700

----------------------------------------------------------------------
 .../kafka/connect/runtime/ConnectMetrics.java   | 296 +++++++++++++++++++
 .../apache/kafka/connect/runtime/Worker.java    |  13 +-
 .../kafka/connect/runtime/WorkerConfig.java     |  29 +-
 .../kafka/connect/runtime/WorkerConnector.java  | 105 ++++++-
 .../runtime/distributed/DistributedConfig.java  |  17 --
 .../connect/runtime/ConnectMetricsTest.java     | 146 +++++++++
 .../connect/runtime/MockConnectorMetrics.java   |  42 +++
 .../connect/runtime/WorkerConnectorTest.java    | 101 ++++++-
 8 files changed, 711 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f4a1ca34/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
new file mode 100644
index 0000000..681a398
--- /dev/null
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.JmxReporter;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The Connect metrics with JMX reporter.
+ */
+public class ConnectMetrics {
+
+    public static final String JMX_PREFIX = "kafka.connect";
+    public static final String WORKER_ID_TAG_NAME = "worker-id";
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ConnectMetrics.class);
+
+    private final Metrics metrics;
+    private final Time time;
+    private final String workerId;
+    private final ConcurrentMap<String, MetricGroup> groupsByName = new 
ConcurrentHashMap<>();
+
+    /**
+     * Create an instance.
+     *
+     * @param workerId the worker identifier; may not be null
+     * @param config   the worker configuration; may not be null
+     * @param time     the time; may not be null
+     */
+    public ConnectMetrics(String workerId, WorkerConfig config, Time time) {
+        this.workerId = makeValidName(workerId);
+        this.time = time;
+
+        MetricConfig metricConfig = new 
MetricConfig().samples(config.getInt(CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG))
+                                            
.timeWindow(config.getLong(CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG),
 TimeUnit.MILLISECONDS)
+                                            
.recordLevel(Sensor.RecordingLevel.forName(config.getString(CommonClientConfigs.METRICS_RECORDING_LEVEL_CONFIG)));
+        List<MetricsReporter> reporters = 
config.getConfiguredInstances(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
 MetricsReporter.class);
+        reporters.add(new JmxReporter(JMX_PREFIX));
+        this.metrics = new Metrics(metricConfig, reporters, time);
+        LOG.debug("Registering Connect metrics with JMX for worker '{}'", 
workerId);
+        AppInfoParser.registerAppInfo(JMX_PREFIX, workerId);
+    }
+
+    /**
+     * Get the worker identifier.
+     *
+     * @return the worker ID; never null
+     */
+    public String workerId() {
+        return workerId;
+    }
+
+    /**
+     * Get the {@link Metrics Kafka Metrics} that are managed by this object 
and that should be used to
+     * add sensors and individual metrics.
+     *
+     * @return the Kafka Metrics instance; never null
+     */
+    public Metrics metrics() {
+        return metrics;
+    }
+
+    /**
+     * Get or create a {@link MetricGroup} with the specified group name.
+     *
+     * @param groupName the name of the metric group; may not be null and must 
be a
+     *                  {@link #checkNameIsValid(String) valid name}
+     * @return the {@link MetricGroup} that can be used to create metrics; 
never null
+     * @throws IllegalArgumentException if the group name is not valid
+     */
+    public MetricGroup group(String groupName) {
+        return group(groupName, false);
+    }
+
+    /**
+     * Get or create a {@link MetricGroup} with the specified group name and 
the given tags.
+     *
+     * @param groupName    the name of the metric group; may not be null and 
must be a
+     *                     {@link #checkNameIsValid(String) valid name}
+     * @param tagKeyValues pairs of tag name and values
+     * @return the {@link MetricGroup} that can be used to create metrics; 
never null
+     * @throws IllegalArgumentException if the group name is not valid
+     */
+    public MetricGroup group(String groupName, String... tagKeyValues) {
+        return group(groupName, false, tagKeyValues);
+    }
+
+    /**
+     * Get or create a {@link MetricGroup} with the specified group name and 
the given tags.
+     *
+     * @param groupName       the name of the metric group; may not be null 
and must be a
+     *                        {@link #checkNameIsValid(String) valid name}
+     * @param includeWorkerId true if the tags should include the worker ID
+     * @param tagKeyValues    pairs of tag name and values
+     * @return the {@link MetricGroup} that can be used to create metrics; 
never null
+     * @throws IllegalArgumentException if the group name is not valid
+     */
+    public MetricGroup group(String groupName, boolean includeWorkerId, 
String... tagKeyValues) {
+        MetricGroup group = groupsByName.get(groupName);
+        if (group == null) {
+            Map<String, String> tags = tags(includeWorkerId ? workerId : null, 
tagKeyValues);
+            group = new MetricGroup(groupName, tags);
+            MetricGroup previous = groupsByName.putIfAbsent(groupName, group);
+            if (previous != null) group = previous;
+        }
+        return group;
+    }
+
+    /**
+     * Get the time.
+     *
+     * @return the time; never null
+     */
+    public Time time() {
+        return time;
+    }
+
+    /**
+     * Stop and unregister the metrics from any reporters.
+     */
+    public void stop() {
+        metrics.close();
+        LOG.debug("Unregistering Connect metrics with JMX for worker '{}'", 
workerId);
+        AppInfoParser.unregisterAppInfo(JMX_PREFIX, workerId);
+    }
+
+    /**
+     * A group of metrics. Each group maps to a JMX MBean and each metric maps 
to an MBean attribute.
+     */
+    public class MetricGroup {
+        private final String groupName;
+        private final Map<String, String> tags;
+
+        /**
+         * Create a group of Connect metrics.
+         *
+         * @param groupName the name of the group; may not be null and must be 
valid
+         * @param tags      the tags; may not be null but may be empty
+         * @throws IllegalArgumentException if the name is not valid
+         */
+        protected MetricGroup(String groupName, Map<String, String> tags) {
+            checkNameIsValid(groupName);
+            this.groupName = groupName;
+            this.tags = Collections.unmodifiableMap(new HashMap<>(tags));
+        }
+
+        /**
+         * Create the name of a metric that belongs to this group and has the 
group's tags.
+         *
+         * @param name the name of the metric/attribute; may not be null and 
must be valid
+         * @param desc the description for the metric/attribute; may not be 
null
+         * @return the metric name; never null
+         * @throws IllegalArgumentException if the name is not valid
+         */
+        public MetricName metricName(String name, String desc) {
+            checkNameIsValid(name);
+            return metrics.metricName(name, groupName, desc, tags);
+        }
+
+        /**
+         * The {@link Metrics} that this group belongs to.
+         *
+         * @return the metrics; never null
+         */
+        public Metrics metrics() {
+            return metrics;
+        }
+
+        /**
+         * The tags of this group.
+         *
+         * @return the unmodifiable tags; never null but may be empty
+         */
+        Map<String, String> tags() {
+            return tags;
+        }
+
+        /**
+         * Add to this group an indicator metric with a function that will be 
used to obtain the indicator state.
+         *
+         * @param name        the name of the metric; may not be null and must 
be a
+         *                    {@link #checkNameIsValid(String) valid name}
+         * @param description the description of the metric; may not be null
+         * @param predicate   the predicate function used to determine the 
indicator state; may not be null
+         * @throws IllegalArgumentException if the name is not valid
+         */
+        public void addIndicatorMetric(String name, String description, final 
IndicatorPredicate predicate) {
+            MetricName metricName = metricName(name, description);
+            if (metrics().metric(metricName) == null) {
+                metrics().addMetric(metricName, new Measurable() {
+                    @Override
+                    public double measure(MetricConfig config, long now) {
+                        return predicate.matches() ? 1.0d : 0.0d;
+                    }
+                });
+            }
+        }
+    }
+
+    /**
+     * A simple functional interface that determines whether an indicator 
metric is true.
+     */
+    public interface IndicatorPredicate {
+
+        /**
+         * Return whether the indicator metric is true.
+         *
+         * @return true if the indicator metric is satisfied, or false 
otherwise
+         */
+        boolean matches();
+    }
+
+    /**
+     * Create a set of tags using the supplied key and value pairs. Every tag 
name and value will be
+     * {@link #makeValidName(String) made valid} before it is used.
+     *
+     * @param workerId the worker ID that should be included first in the 
tags; may be null if not to be included
+     * @param keyValue the key and value pairs for the tags; must be an even 
number
+     * @return the map of tags that can be supplied to the {@link Metrics} 
methods; never null
+     */
+    static Map<String, String> tags(String workerId, String... keyValue) {
+        if ((keyValue.length % 2) != 0)
+            throw new IllegalArgumentException("keyValue needs to be specified 
in pairs");
+        Map<String, String> tags = new HashMap<>();
+        if (workerId != null && !workerId.trim().isEmpty()) {
+            tags.put(WORKER_ID_TAG_NAME, makeValidName(workerId));
+        }
+        for (int i = 0; i < keyValue.length; i += 2) {
+            tags.put(makeValidName(keyValue[i]), makeValidName(keyValue[i + 
1]));
+        }
+        return tags;
+    }
+
+    /**
+     * Utility to ensure the supplied name contains valid characters, 
replacing with a single '-' sequences of
+     * 1 or more characters <em>other than</em> word characters (e.g., 
"[a-zA-Z_0-9]").
+     *
+     * @param name the name; may not be null
+     * @return the validated name; never null
+     */
+    static String makeValidName(String name) {
+        Objects.requireNonNull(name);
+        name = name.trim();
+        if (!name.isEmpty()) {
+            name = name.replaceAll("[^\\w]+", "-");
+        }
+        return name;
+    }
+
+    /**
+     * Utility method that verifies that the supplied name is valid, meaning that 
it would be unchanged by
+     * {@link #makeValidName(String)}. Nothing is returned; an invalid name is reported by throwing.
+     *
+     * @param name the name; may not be null
+     * @throws IllegalArgumentException if the name is not valid, i.e. it differs from the result of
+     *                                  {@link #makeValidName(String)}
+     */
+    static void checkNameIsValid(String name) {
+        if (!name.equals(makeValidName(name))) {
+            throw new IllegalArgumentException("The name '" + name + "' 
contains at least one invalid character");
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/f4a1ca34/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 01611ab..7ce9214 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -67,6 +67,7 @@ public class Worker {
     private final Time time;
     private final String workerId;
     private final Plugins plugins;
+    private final ConnectMetrics metrics;
     private final WorkerConfig config;
     private final Converter internalKeyConverter;
     private final Converter internalValueConverter;
@@ -84,6 +85,7 @@ public class Worker {
             WorkerConfig config,
             OffsetBackingStore offsetBackingStore
     ) {
+        this.metrics = new ConnectMetrics(workerId, config, time);
         this.executor = Executors.newCachedThreadPool();
         this.workerId = workerId;
         this.time = time;
@@ -159,6 +161,7 @@ public class Worker {
         sourceTaskOffsetCommitter.close(timeoutMs);
 
         offsetBackingStore.stop();
+        metrics.stop();
 
         log.info("Worker stopped");
     }
@@ -190,7 +193,7 @@ public class Worker {
             final String connClass = 
connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
             log.info("Creating connector {} of type {}", connName, connClass);
             final Connector connector = plugins.newConnector(connClass);
-            workerConnector = new WorkerConnector(connName, connector, ctx, 
statusListener);
+            workerConnector = new WorkerConnector(connName, connector, ctx, 
metrics,  statusListener);
             log.info("Instantiated connector {} with version {} of type {}", 
connName, connector.version(), connector.getClass());
             savedLoader = plugins.compareAndSwapLoaders(connector);
             workerConnector.initialize(connConfig);
@@ -536,6 +539,14 @@ public class Worker {
         return workerId;
     }
 
+    /**
+     * Get the {@link ConnectMetrics} that uses Kafka Metrics and manages the 
JMX reporter.
+     * @return the Connect-specific metrics; never null
+     */
+    public ConnectMetrics metrics() {
+        return metrics;
+    }
+
     public void setTargetState(String connName, TargetState state) {
         log.info("Setting connector {} state to {}", connName, state);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/f4a1ca34/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
index 9ac1b3b..dfae761 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
@@ -21,12 +21,16 @@ import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.metrics.Sensor;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
+
 /**
  * Common base class providing configuration for Kafka Connect workers, 
whether standalone or distributed.
  */
@@ -138,6 +142,11 @@ public class WorkerConfig extends AbstractConfig {
             + "Examples: 
plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,"
             + "/opt/connectors";
 
+    public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = 
CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG;
+    public static final String METRICS_NUM_SAMPLES_CONFIG = 
CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG;
+    public static final String METRICS_RECORDING_LEVEL_CONFIG = 
CommonClientConfigs.METRICS_RECORDING_LEVEL_CONFIG;
+    public static final String METRIC_REPORTER_CLASSES_CONFIG = 
CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG;
+
     /**
      * Get a basic ConfigDef for a WorkerConfig. This includes all the common 
settings. Subclasses can use this to
      * bootstrap their own ConfigDef.
@@ -172,13 +181,25 @@ public class WorkerConfig extends AbstractConfig {
                 .define(ACCESS_CONTROL_ALLOW_METHODS_CONFIG, Type.STRING,
                         ACCESS_CONTROL_ALLOW_METHODS_DEFAULT, Importance.LOW,
                         ACCESS_CONTROL_ALLOW_METHODS_DOC)
-                .define(
-                        PLUGIN_PATH_CONFIG,
+                .define(PLUGIN_PATH_CONFIG,
                         Type.LIST,
                         null,
                         Importance.LOW,
-                        PLUGIN_PATH_DOC
-                );
+                        PLUGIN_PATH_DOC)
+                .define(METRICS_SAMPLE_WINDOW_MS_CONFIG, Type.LONG,
+                        30000, atLeast(0), Importance.LOW,
+                        CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC)
+                .define(METRICS_NUM_SAMPLES_CONFIG, Type.INT,
+                        2, atLeast(1), Importance.LOW,
+                        CommonClientConfigs.METRICS_NUM_SAMPLES_DOC)
+                .define(METRICS_RECORDING_LEVEL_CONFIG, Type.STRING,
+                        Sensor.RecordingLevel.INFO.toString(),
+                        in(Sensor.RecordingLevel.INFO.toString(), 
Sensor.RecordingLevel.DEBUG.toString()),
+                        Importance.LOW,
+                        CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC)
+                .define(METRIC_REPORTER_CLASSES_CONFIG, Type.LIST,
+                        "", Importance.LOW,
+                        CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/f4a1ca34/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
index 983db92..aa9cdd1 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
@@ -18,6 +18,9 @@ package org.apache.kafka.connect.runtime;
 
 import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.connector.ConnectorContext;
+import org.apache.kafka.connect.runtime.AbstractStatus.State;
+import org.apache.kafka.connect.runtime.ConnectMetrics.IndicatorPredicate;
+import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
 import org.apache.kafka.connect.sink.SinkConnector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,6 +53,7 @@ public class WorkerConnector {
     private final ConnectorStatus.Listener statusListener;
     private final ConnectorContext ctx;
     private final Connector connector;
+    private final ConnectorMetricsGroup metrics;
 
     private Map<String, String> config;
     private State state;
@@ -57,12 +61,14 @@ public class WorkerConnector {
     public WorkerConnector(String connName,
                            Connector connector,
                            ConnectorContext ctx,
+                           ConnectMetrics metrics,
                            ConnectorStatus.Listener statusListener) {
         this.connName = connName;
         this.ctx = ctx;
         this.connector = connector;
-        this.statusListener = statusListener;
         this.state = State.INIT;
+        this.metrics = new ConnectorMetricsGroup(metrics, 
AbstractStatus.State.UNASSIGNED, statusListener);
+        this.statusListener = this.metrics;
     }
 
     public void initialize(ConnectorConfig connectorConfig) {
@@ -160,11 +166,11 @@ public class WorkerConnector {
             if (state == State.STARTED)
                 connector.stop();
             this.state = State.STOPPED;
+            statusListener.onShutdown(connName);
         } catch (Throwable t) {
             log.error("{} Error while shutting down connector", this, t);
             this.state = State.FAILED;
-        } finally {
-            statusListener.onShutdown(connName);
+            statusListener.onFailure(connName, t);
         }
     }
 
@@ -195,10 +201,99 @@ public class WorkerConnector {
         return connector;
     }
 
+    ConnectorMetricsGroup metrics() {
+        return metrics;
+    }
+
     @Override
     public String toString() {
         return "WorkerConnector{" +
-                "id=" + connName +
-                '}';
+                       "id=" + connName +
+                       '}';
+    }
+
+    class ConnectorMetricsGroup implements ConnectorStatus.Listener {
+        /**
+         * Use {@link AbstractStatus.State} since it has all of the states we 
want,
+         * unlike {@link WorkerConnector.State}.
+         */
+        private volatile AbstractStatus.State state;
+        private final MetricGroup metricGroup;
+        private final ConnectorStatus.Listener delegate;
+
+        public ConnectorMetricsGroup(ConnectMetrics connectMetrics, 
AbstractStatus.State initialState, ConnectorStatus.Listener delegate) {
+            this.delegate = delegate;
+            this.state = initialState;
+            this.metricGroup = connectMetrics.group("connector-metrics",
+                    "connector", connName);
+
+            addStateMetric(AbstractStatus.State.RUNNING, "status-running",
+                    "Signals whether the connector task is in the running 
state.");
+            addStateMetric(AbstractStatus.State.PAUSED, "status-paused",
+                    "Signals whether the connector task is in the paused 
state.");
+            addStateMetric(AbstractStatus.State.FAILED, "status-failed",
+                    "Signals whether the connector task is in the failed 
state.");
+        }
+
+        private void addStateMetric(final AbstractStatus.State matchingState, 
String name, String description) {
+            metricGroup.addIndicatorMetric(name, description, new 
IndicatorPredicate() {
+                @Override
+                public boolean matches() {
+                    return state == matchingState;
+                }
+            });
+        }
+
+        @Override
+        public void onStartup(String connector) {
+            state = AbstractStatus.State.RUNNING;
+            delegate.onStartup(connector);
+        }
+
+        @Override
+        public void onShutdown(String connector) {
+            state = AbstractStatus.State.UNASSIGNED;
+            delegate.onShutdown(connector);
+        }
+
+        @Override
+        public void onPause(String connector) {
+            state = AbstractStatus.State.PAUSED;
+            delegate.onPause(connector);
+        }
+
+        @Override
+        public void onResume(String connector) {
+            state = AbstractStatus.State.RUNNING;
+            delegate.onResume(connector);
+        }
+
+        @Override
+        public void onFailure(String connector, Throwable cause) {
+            state = AbstractStatus.State.FAILED;
+            delegate.onFailure(connector, cause);
+        }
+
+        @Override
+        public void onDeletion(String connector) {
+            state = AbstractStatus.State.DESTROYED;
+            delegate.onDeletion(connector);
+        }
+
+        boolean isUnassigned() {
+            return state == AbstractStatus.State.UNASSIGNED;
+        }
+
+        boolean isRunning() {
+            return state == AbstractStatus.State.RUNNING;
+        }
+
+        boolean isPaused() {
+            return state == AbstractStatus.State.PAUSED;
+        }
+
+        boolean isFailed() {
+            return state == AbstractStatus.State.FAILED;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f4a1ca34/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
index 650ef67..dc9017b 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
@@ -194,23 +194,6 @@ public class DistributedConfig extends WorkerConfig {
                         atLeast(0L),
                         ConfigDef.Importance.LOW,
                         CommonClientConfigs.RETRY_BACKOFF_MS_DOC)
-                .define(CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG,
-                        ConfigDef.Type.LONG,
-                        30000,
-                        atLeast(0),
-                        ConfigDef.Importance.LOW,
-                        CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC)
-                .define(CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG,
-                        ConfigDef.Type.INT,
-                        2,
-                        atLeast(1),
-                        ConfigDef.Importance.LOW,
-                        CommonClientConfigs.METRICS_NUM_SAMPLES_DOC)
-                .define(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
-                        ConfigDef.Type.LIST,
-                        "",
-                        ConfigDef.Importance.LOW,
-                        CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
                 .define(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG,
                         ConfigDef.Type.INT,
                         40 * 1000,

http://git-wip-us.apache.org/repos/asf/kafka/blob/f4a1ca34/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java
new file mode 100644
index 0000000..34997e6
--- /dev/null
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
+import org.apache.kafka.connect.util.MockTime;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+
+public class ConnectMetricsTest {
+
+    private static final Map<String, String> DEFAULT_WORKER_CONFIG = new HashMap<>();
+
+    static {
+        DEFAULT_WORKER_CONFIG.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+        DEFAULT_WORKER_CONFIG.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+        DEFAULT_WORKER_CONFIG.put(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+        DEFAULT_WORKER_CONFIG.put(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+    }
+
+    private ConnectMetrics metrics;
+
+    @Before
+    public void setUp() {
+        metrics = new ConnectMetrics("worker1", new WorkerConfig(WorkerConfig.baseConfigDef(), DEFAULT_WORKER_CONFIG), new MockTime());
+    }
+
+    @After
+    public void tearDown() {
+        if (metrics != null) metrics.stop();
+    }
+
+    @Test
+    public void testValidatingNameWithAllValidCharacters() {
+        String name = "abcdefghijklmnopqrstuvwxyz_ABCDEFGHIJKLMNOPQRSTUVWXYZ-0123456789";
+        assertEquals(name, ConnectMetrics.makeValidName(name));
+    }
+
+    @Test
+    public void testValidatingEmptyName() {
+        String name = "";
+        assertSame(name, ConnectMetrics.makeValidName(name));
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testValidatingNullName() {
+        ConnectMetrics.makeValidName(null);
+    }
+
+    @Test
+    public void testValidatingNameWithInvalidCharacters() {
+        assertEquals("a-b-c-d-e-f-g-h-i-j-k", ConnectMetrics.makeValidName("a:b;c/d\\e,f*.--..;;g?h[i]j=k"));
+        assertEquals("-a-b-c-d-e-f-g-h-", ConnectMetrics.makeValidName(":a:b;c/d\\e,f*g?[]=h:"));
+        assertEquals("a-f-h", ConnectMetrics.makeValidName("a:;/\\,f*?h"));
+    }
+
+    @Test
+    public void testKafkaMetricsNotNull() {
+        assertNotNull(metrics.metrics());
+    }
+
+    @Test
+    public void testCreatingTagsWithNonNullWorkerId() {
+        Map<String, String> tags = ConnectMetrics.tags("name", "k1", "v1", "k2", "v2");
+        assertEquals("v1", tags.get("k1"));
+        assertEquals("v2", tags.get("k2"));
+        assertEquals("name", tags.get(ConnectMetrics.WORKER_ID_TAG_NAME));
+    }
+
+    @Test
+    public void testCreatingTagsWithNullWorkerId() {
+        Map<String, String> tags = ConnectMetrics.tags(null, "k1", "v1", "k2", "v2");
+        assertEquals("v1", tags.get("k1"));
+        assertEquals("v2", tags.get("k2"));
+        assertEquals(null, tags.get(ConnectMetrics.WORKER_ID_TAG_NAME));
+    }
+
+    @Test
+    public void testCreatingTagsWithEmptyWorkerId() {
+        Map<String, String> tags = ConnectMetrics.tags("", "k1", "v1", "k2", "v2");
+        assertEquals("v1", tags.get("k1"));
+        assertEquals("v2", tags.get("k2"));
+        assertEquals(null, tags.get(ConnectMetrics.WORKER_ID_TAG_NAME));
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testCreatingTagsWithOddNumberOfTags() {
+        ConnectMetrics.tags("name", "k1", "v1", "k2", "v2", "extra");
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testGettingGroupWithOddNumberOfTags() {
+        metrics.group("name", false, "k1", "v1", "k2", "v2", "extra");
+    }
+
+    @Test
+    public void testGettingGroupWithTags() {
+        MetricGroup group1 = metrics.group("name", false, "k1", "v1", "k2", "v2");
+        assertEquals("v1", group1.tags().get("k1"));
+        assertEquals("v2", group1.tags().get("k2"));
+        assertEquals(null, group1.tags().get(ConnectMetrics.WORKER_ID_TAG_NAME));
+    }
+
+    @Test
+    public void testGettingGroupWithWorkerIdAndTags() {
+        MetricGroup group1 = metrics.group("name", true, "k1", "v1", "k2", "v2");
+        assertEquals("v1", group1.tags().get("k1"));
+        assertEquals("v2", group1.tags().get("k2"));
+        assertEquals(metrics.workerId(), group1.tags().get(ConnectMetrics.WORKER_ID_TAG_NAME));
+    }
+
+    @Test
+    public void testGettingGroupMultipleTimes() {
+        MetricGroup group1 = metrics.group("name");
+        MetricGroup group2 = metrics.group("name");
+        assertNotNull(group1);
+        assertSame(group1, group2);
+        MetricGroup group3 = metrics.group("other");
+        assertNotNull(group3);
+        assertNotSame(group1, group3);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/f4a1ca34/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockConnectorMetrics.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockConnectorMetrics.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockConnectorMetrics.java
new file mode 100644
index 0000000..a717492
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockConnectorMetrics.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.util.MockTime;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class MockConnectorMetrics extends ConnectMetrics {
+
+    private static final Map<String, String> DEFAULT_WORKER_CONFIG = new HashMap<>();
+    static {
+        DEFAULT_WORKER_CONFIG.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+        DEFAULT_WORKER_CONFIG.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+        DEFAULT_WORKER_CONFIG.put(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+        DEFAULT_WORKER_CONFIG.put(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+    }
+
+    public MockConnectorMetrics() {
+        super("mock", new WorkerConfig(WorkerConfig.baseConfigDef(), DEFAULT_WORKER_CONFIG), new MockTime());
+    }
+
+    @Override
+    public MockTime time() {
+        return (MockTime) super.time();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f4a1ca34/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java
index 11b05ee..2101a33 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java
@@ -23,6 +23,7 @@ import org.easymock.EasyMock;
 import org.easymock.EasyMockRunner;
 import org.easymock.EasyMockSupport;
 import org.easymock.Mock;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -31,6 +32,8 @@ import java.util.HashMap;
 import java.util.Map;
 
 import static org.easymock.EasyMock.expectLastCall;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 @RunWith(EasyMockRunner.class)
 public class WorkerConnectorTest extends EasyMockSupport {
@@ -42,6 +45,7 @@ public class WorkerConnectorTest extends EasyMockSupport {
         CONFIG.put(ConnectorConfig.NAME_CONFIG, CONNECTOR);
     }
     public ConnectorConfig connectorConfig;
+    public ConnectMetrics metrics;
 
     @Mock Plugins plugins;
     @Mock Connector connector;
@@ -51,6 +55,12 @@ public class WorkerConnectorTest extends EasyMockSupport {
     @Before
     public void setup() {
         connectorConfig = new ConnectorConfig(plugins, CONFIG);
+        metrics = new MockConnectorMetrics();
+    }
+
+    @After
+    public void tearDown() {
+        if (metrics != null) metrics.stop();
     }
 
     @Test
@@ -68,10 +78,12 @@ public class WorkerConnectorTest extends EasyMockSupport {
 
         replayAll();
 
-        WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, listener);
+        WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener);
 
         workerConnector.initialize(connectorConfig);
+        assertFailedMetric(workerConnector);
         workerConnector.shutdown();
+        assertStoppedMetric(workerConnector);
 
         verifyAll();
     }
@@ -93,11 +105,14 @@ public class WorkerConnectorTest extends EasyMockSupport {
 
         replayAll();
 
-        WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, listener);
+        WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener);
 
         workerConnector.initialize(connectorConfig);
+        assertFailedMetric(workerConnector);
         workerConnector.transitionTo(TargetState.STARTED);
+        assertFailedMetric(workerConnector);
         workerConnector.shutdown();
+        assertStoppedMetric(workerConnector);
 
         verifyAll();
     }
@@ -121,11 +136,14 @@ public class WorkerConnectorTest extends EasyMockSupport {
 
         replayAll();
 
-        WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, listener);
+        WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener);
 
         workerConnector.initialize(connectorConfig);
+        assertInitializedMetric(workerConnector);
         workerConnector.transitionTo(TargetState.STARTED);
+        assertRunningMetric(workerConnector);
         workerConnector.shutdown();
+        assertStoppedMetric(workerConnector);
 
         verifyAll();
     }
@@ -152,12 +170,16 @@ public class WorkerConnectorTest extends EasyMockSupport {
 
         replayAll();
 
-        WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, listener);
+        WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener);
 
         workerConnector.initialize(connectorConfig);
+        assertInitializedMetric(workerConnector);
         workerConnector.transitionTo(TargetState.STARTED);
+        assertRunningMetric(workerConnector);
         workerConnector.transitionTo(TargetState.PAUSED);
+        assertPausedMetric(workerConnector);
         workerConnector.shutdown();
+        assertStoppedMetric(workerConnector);
 
         verifyAll();
     }
@@ -184,12 +206,16 @@ public class WorkerConnectorTest extends EasyMockSupport {
 
         replayAll();
 
-        WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, listener);
+        WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener);
 
         workerConnector.initialize(connectorConfig);
+        assertInitializedMetric(workerConnector);
         workerConnector.transitionTo(TargetState.PAUSED);
+        assertPausedMetric(workerConnector);
         workerConnector.transitionTo(TargetState.STARTED);
+        assertRunningMetric(workerConnector);
         workerConnector.shutdown();
+        assertStoppedMetric(workerConnector);
 
         verifyAll();
     }
@@ -209,11 +235,14 @@ public class WorkerConnectorTest extends EasyMockSupport {
 
         replayAll();
 
-        WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, listener);
+        WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener);
 
         workerConnector.initialize(connectorConfig);
+        assertInitializedMetric(workerConnector);
         workerConnector.transitionTo(TargetState.PAUSED);
+        assertPausedMetric(workerConnector);
         workerConnector.shutdown();
+        assertStoppedMetric(workerConnector);
 
         verifyAll();
     }
@@ -236,11 +265,14 @@ public class WorkerConnectorTest extends EasyMockSupport {
 
         replayAll();
 
-        WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, listener);
+        WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener);
 
         workerConnector.initialize(connectorConfig);
+        assertInitializedMetric(workerConnector);
         workerConnector.transitionTo(TargetState.STARTED);
+        assertFailedMetric(workerConnector);
         workerConnector.shutdown();
+        assertStoppedMetric(workerConnector);
 
         verifyAll();
     }
@@ -261,16 +293,19 @@ public class WorkerConnectorTest extends EasyMockSupport {
         connector.stop();
         expectLastCall().andThrow(exception);
 
-        listener.onShutdown(CONNECTOR);
+        listener.onFailure(CONNECTOR, exception);
         expectLastCall();
 
         replayAll();
 
-        WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, listener);
+        WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener);
 
         workerConnector.initialize(connectorConfig);
+        assertInitializedMetric(workerConnector);
         workerConnector.transitionTo(TargetState.STARTED);
+        assertRunningMetric(workerConnector);
         workerConnector.shutdown();
+        assertFailedMetric(workerConnector);
 
         verifyAll();
     }
@@ -295,12 +330,16 @@ public class WorkerConnectorTest extends EasyMockSupport {
 
         replayAll();
 
-        WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, listener);
+        WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener);
 
         workerConnector.initialize(connectorConfig);
+        assertInitializedMetric(workerConnector);
         workerConnector.transitionTo(TargetState.STARTED);
+        assertRunningMetric(workerConnector);
         workerConnector.transitionTo(TargetState.STARTED);
+        assertRunningMetric(workerConnector);
         workerConnector.shutdown();
+        assertStoppedMetric(workerConnector);
 
         verifyAll();
     }
@@ -327,17 +366,57 @@ public class WorkerConnectorTest extends EasyMockSupport {
 
         replayAll();
 
-        WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, listener);
+        WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener);
 
         workerConnector.initialize(connectorConfig);
+        assertInitializedMetric(workerConnector);
         workerConnector.transitionTo(TargetState.STARTED);
+        assertRunningMetric(workerConnector);
         workerConnector.transitionTo(TargetState.PAUSED);
+        assertPausedMetric(workerConnector);
         workerConnector.transitionTo(TargetState.PAUSED);
+        assertPausedMetric(workerConnector);
         workerConnector.shutdown();
+        assertStoppedMetric(workerConnector);
 
         verifyAll();
     }
 
+    protected void assertFailedMetric(WorkerConnector workerConnector) {
+        assertFalse(workerConnector.metrics().isUnassigned());
+        assertTrue(workerConnector.metrics().isFailed());
+        assertFalse(workerConnector.metrics().isPaused());
+        assertFalse(workerConnector.metrics().isRunning());
+    }
+
+    protected void assertPausedMetric(WorkerConnector workerConnector) {
+        assertFalse(workerConnector.metrics().isUnassigned());
+        assertFalse(workerConnector.metrics().isFailed());
+        assertTrue(workerConnector.metrics().isPaused());
+        assertFalse(workerConnector.metrics().isRunning());
+    }
+
+    protected void assertRunningMetric(WorkerConnector workerConnector) {
+        assertFalse(workerConnector.metrics().isUnassigned());
+        assertFalse(workerConnector.metrics().isFailed());
+        assertFalse(workerConnector.metrics().isPaused());
+        assertTrue(workerConnector.metrics().isRunning());
+    }
+
+    protected void assertStoppedMetric(WorkerConnector workerConnector) {
+        assertTrue(workerConnector.metrics().isUnassigned());
+        assertFalse(workerConnector.metrics().isFailed());
+        assertFalse(workerConnector.metrics().isPaused());
+        assertFalse(workerConnector.metrics().isRunning());
+    }
+
+    protected void assertInitializedMetric(WorkerConnector workerConnector) {
+        assertTrue(workerConnector.metrics().isUnassigned());
+        assertFalse(workerConnector.metrics().isFailed());
+        assertFalse(workerConnector.metrics().isPaused());
+        assertFalse(workerConnector.metrics().isRunning());
+    }
+
     private static abstract class TestConnector extends Connector {
     }
 

Reply via email to