This is an automated email from the ASF dual-hosted git repository.

gharris pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d6133f6997e KAFKA-18988: Connect Multiversion Support (Updates to 
status and metrics) (#17988)
d6133f6997e is described below

commit d6133f6997e0de3931f3d98b30950aeeefd23e70
Author: snehashisp <[email protected]>
AuthorDate: Thu Apr 24 22:53:31 2025 +0530

    KAFKA-18988: Connect Multiversion Support (Updates to status and metrics) 
(#17988)
    
    Reviewers: Greg Harris <[email protected]>
---
 .../kafka/connect/runtime/AbstractHerder.java      |  50 ++++----
 .../kafka/connect/runtime/AbstractStatus.java      |  22 +++-
 .../connect/runtime/AbstractWorkerSourceTask.java  |  10 +-
 .../connect/runtime/ConnectMetricsRegistry.java    |  72 +++++++++++
 .../kafka/connect/runtime/ConnectorConfig.java     |  22 +++-
 .../kafka/connect/runtime/ConnectorStatus.java     |   8 +-
 .../runtime/ExactlyOnceWorkerSourceTask.java       |   3 +-
 .../kafka/connect/runtime/TaskPluginsMetadata.java | 131 +++++++++++++++++++++
 .../apache/kafka/connect/runtime/TaskStatus.java   |   4 +-
 .../kafka/connect/runtime/TransformationChain.java |   5 +
 .../kafka/connect/runtime/TransformationStage.java |  57 ++++++++-
 .../org/apache/kafka/connect/runtime/Worker.java   |  50 ++++++--
 .../kafka/connect/runtime/WorkerConnector.java     |  17 ++-
 .../kafka/connect/runtime/WorkerSinkTask.java      |  10 +-
 .../kafka/connect/runtime/WorkerSourceTask.java    |   3 +-
 .../apache/kafka/connect/runtime/WorkerTask.java   |  62 +++++++++-
 .../runtime/rest/entities/ConnectorStateInfo.java  |  21 +++-
 .../connect/storage/KafkaStatusBackingStore.java   |   9 +-
 .../runtime/AbstractWorkerSourceTaskTest.java      |   2 +-
 .../connect/runtime/ErrorHandlingTaskTest.java     |  12 +-
 .../runtime/ExactlyOnceWorkerSourceTaskTest.java   |   5 +-
 .../kafka/connect/runtime/RestartPlanTest.java     |  32 ++---
 .../connect/runtime/TransformationStageTest.java   |   4 +
 .../kafka/connect/runtime/WorkerSinkTaskTest.java  |   2 +-
 .../runtime/WorkerSinkTaskThreadedTest.java        |   2 +-
 .../connect/runtime/WorkerSourceTaskTest.java      |   2 +-
 .../kafka/connect/runtime/WorkerTaskTest.java      |   7 +-
 .../apache/kafka/connect/runtime/WorkerTest.java   |  14 ++-
 .../kafka/connect/runtime/WorkerTestUtils.java     |   4 +
 .../runtime/distributed/DistributedHerderTest.java |   5 +
 .../rest/resources/ConnectorsResourceTest.java     |   4 +-
 .../runtime/standalone/StandaloneHerderTest.java   |   7 ++
 32 files changed, 556 insertions(+), 102 deletions(-)

diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index 5a787909925..bce67129388 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -203,83 +203,91 @@ public abstract class AbstractHerder implements Herder, 
TaskStatus.Listener, Con
     @Override
     public void onStartup(String connector) {
         statusBackingStore.put(new ConnectorStatus(connector, 
ConnectorStatus.State.RUNNING,
-                workerId, generation()));
+                workerId, generation(), worker.connectorVersion(connector)));
     }
 
     @Override
     public void onStop(String connector) {
         statusBackingStore.put(new ConnectorStatus(connector, 
AbstractStatus.State.STOPPED,
-                workerId, generation()));
+                workerId, generation(), worker.connectorVersion(connector)));
     }
 
     @Override
     public void onPause(String connector) {
         statusBackingStore.put(new ConnectorStatus(connector, 
ConnectorStatus.State.PAUSED,
-                workerId, generation()));
+                workerId, generation(), worker.connectorVersion(connector)));
     }
 
     @Override
     public void onResume(String connector) {
         statusBackingStore.put(new ConnectorStatus(connector, 
TaskStatus.State.RUNNING,
-                workerId, generation()));
+                workerId, generation(), worker.connectorVersion(connector)));
     }
 
     @Override
     public void onShutdown(String connector) {
         statusBackingStore.putSafe(new ConnectorStatus(connector, 
ConnectorStatus.State.UNASSIGNED,
-                workerId, generation()));
+                workerId, generation(), worker.connectorVersion(connector)));
     }
 
     @Override
     public void onFailure(String connector, Throwable cause) {
         statusBackingStore.putSafe(new ConnectorStatus(connector, 
ConnectorStatus.State.FAILED,
-                trace(cause), workerId, generation()));
+                trace(cause), workerId, generation(), 
worker.connectorVersion(connector)));
     }
 
     @Override
     public void onStartup(ConnectorTaskId id) {
-        statusBackingStore.put(new TaskStatus(id, TaskStatus.State.RUNNING, 
workerId, generation()));
+        statusBackingStore.put(new TaskStatus(id, TaskStatus.State.RUNNING, 
workerId, generation(), null,
+                worker.taskVersion(id)));
     }
 
     @Override
     public void onFailure(ConnectorTaskId id, Throwable cause) {
-        statusBackingStore.putSafe(new TaskStatus(id, TaskStatus.State.FAILED, 
workerId, generation(), trace(cause)));
+        statusBackingStore.putSafe(new TaskStatus(id, TaskStatus.State.FAILED, 
workerId, generation(), trace(cause),
+                worker.taskVersion(id)));
     }
 
     @Override
     public void onShutdown(ConnectorTaskId id) {
-        statusBackingStore.putSafe(new TaskStatus(id, 
TaskStatus.State.UNASSIGNED, workerId, generation()));
+        statusBackingStore.putSafe(new TaskStatus(id, 
TaskStatus.State.UNASSIGNED, workerId, generation(), null,
+                worker.taskVersion(id)));
     }
 
     @Override
     public void onResume(ConnectorTaskId id) {
-        statusBackingStore.put(new TaskStatus(id, TaskStatus.State.RUNNING, 
workerId, generation()));
+        statusBackingStore.put(new TaskStatus(id, TaskStatus.State.RUNNING, 
workerId, generation(), null,
+                worker.taskVersion(id)));
     }
 
     @Override
     public void onPause(ConnectorTaskId id) {
-        statusBackingStore.put(new TaskStatus(id, TaskStatus.State.PAUSED, 
workerId, generation()));
+        statusBackingStore.put(new TaskStatus(id, TaskStatus.State.PAUSED, 
workerId, generation(), null,
+                worker.taskVersion(id)));
     }
 
     @Override
     public void onDeletion(String connector) {
         for (TaskStatus status : statusBackingStore.getAll(connector))
             onDeletion(status.id());
-        statusBackingStore.put(new ConnectorStatus(connector, 
ConnectorStatus.State.DESTROYED, workerId, generation()));
+        statusBackingStore.put(new ConnectorStatus(connector, 
ConnectorStatus.State.DESTROYED, workerId, generation(),
+                worker.connectorVersion(connector)));
     }
 
     @Override
     public void onDeletion(ConnectorTaskId id) {
-        statusBackingStore.put(new TaskStatus(id, TaskStatus.State.DESTROYED, 
workerId, generation()));
+        statusBackingStore.put(new TaskStatus(id, TaskStatus.State.DESTROYED, 
workerId, generation(), null,
+                worker.taskVersion(id)));
     }
 
     public void onRestart(String connector) {
         statusBackingStore.put(new ConnectorStatus(connector, 
ConnectorStatus.State.RESTARTING,
-                workerId, generation()));
+                workerId, generation(), worker.connectorVersion(connector)));
     }
 
     public void onRestart(ConnectorTaskId id) {
-        statusBackingStore.put(new TaskStatus(id, TaskStatus.State.RESTARTING, 
workerId, generation()));
+        statusBackingStore.put(new TaskStatus(id, TaskStatus.State.RESTARTING, 
workerId, generation(), null,
+                worker.taskVersion(id)));
     }
 
     @Override
@@ -347,12 +355,12 @@ public abstract class AbstractHerder implements Herder, 
TaskStatus.Listener, Con
         Collection<TaskStatus> tasks = statusBackingStore.getAll(connName);
 
         ConnectorStateInfo.ConnectorState connectorState = new 
ConnectorStateInfo.ConnectorState(
-                connector.state().toString(), connector.workerId(), 
connector.trace());
+                connector.state().toString(), connector.workerId(), 
connector.trace(), connector.version());
         List<ConnectorStateInfo.TaskState> taskStates = new ArrayList<>();
 
         for (TaskStatus status : tasks) {
             taskStates.add(new ConnectorStateInfo.TaskState(status.id().task(),
-                    status.state().toString(), status.workerId(), 
status.trace()));
+                    status.state().toString(), status.workerId(), 
status.trace(), status.version()));
         }
 
         Collections.sort(taskStates);
@@ -388,7 +396,7 @@ public abstract class AbstractHerder implements Herder, 
TaskStatus.Listener, Con
             throw new NotFoundException("No status found for task " + id);
 
         return new ConnectorStateInfo.TaskState(id.task(), 
status.state().toString(),
-                status.workerId(), status.trace());
+                status.workerId(), status.trace(), status.version());
     }
 
     @Override
@@ -626,7 +634,8 @@ public abstract class AbstractHerder implements Herder, 
TaskStatus.Listener, Con
         ConnectorStateInfo.ConnectorState connectorInfoState = new 
ConnectorStateInfo.ConnectorState(
                 connectorState.toString(),
                 connectorStatus.workerId(),
-                connectorStatus.trace()
+                connectorStatus.trace(),
+                connectorStatus.version()
         );
 
         // Collect the task states, If requested, mark the task as restarting
@@ -638,7 +647,8 @@ public abstract class AbstractHerder implements Herder, 
TaskStatus.Listener, Con
                             taskStatus.id().task(),
                             taskState.toString(),
                             taskStatus.workerId(),
-                            taskStatus.trace()
+                            taskStatus.trace(),
+                            taskStatus.version()
                     );
                 })
                 .collect(Collectors.toList());
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractStatus.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractStatus.java
index 76036d610d7..fc8bc7ca050 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractStatus.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractStatus.java
@@ -34,18 +34,29 @@ public abstract class AbstractStatus<T> {
     private final State state;
     private final String trace;
     private final String workerId;
+    private final String version;
     private final int generation;
 
     public AbstractStatus(T id,
                           State state,
                           String workerId,
                           int generation,
-                          String trace) {
+                          String trace,
+                          String version) {
         this.id = id;
         this.state = state;
         this.workerId = workerId;
         this.generation = generation;
         this.trace = trace;
+        this.version = version;
+    }
+
+    public AbstractStatus(T id,
+                          State state,
+                          String workerId,
+                          int generation,
+                          String trace) {
+        this(id, state, workerId, generation, trace, null);
     }
 
     public T id() {
@@ -68,12 +79,17 @@ public abstract class AbstractStatus<T> {
         return generation;
     }
 
+    public String version() {
+        return version;
+    }
+
     @Override
     public String toString() {
         return "Status{" +
                 "id=" + id +
                 ", state=" + state +
                 ", workerId='" + workerId + '\'' +
+                ", version='" + version + '\'' +
                 ", generation=" + generation +
                 '}';
     }
@@ -89,7 +105,8 @@ public abstract class AbstractStatus<T> {
                 && Objects.equals(id, that.id)
                 && state == that.state
                 && Objects.equals(trace, that.trace)
-                && Objects.equals(workerId, that.workerId);
+                && Objects.equals(workerId, that.workerId)
+                && Objects.equals(version, that.version);
     }
 
     @Override
@@ -98,6 +115,7 @@ public abstract class AbstractStatus<T> {
         result = 31 * result + (state != null ? state.hashCode() : 0);
         result = 31 * result + (trace != null ? trace.hashCode() : 0);
         result = 31 * result + (workerId != null ? workerId.hashCode() : 0);
+        result = 31 * result + (version != null ? version.hashCode() : 0);
         result = 31 * result + generation;
         return result;
     }
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
index 683eb3abed0..9a74d81770f 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
@@ -203,6 +203,7 @@ public abstract class AbstractWorkerSourceTask extends 
WorkerTask<SourceRecord,
     private final boolean topicTrackingEnabled;
     private final TopicCreation topicCreation;
     private final Executor closeExecutor;
+    private final String version;
 
     // Visible for testing
     List<SourceRecord> toSend;
@@ -236,11 +237,12 @@ public abstract class AbstractWorkerSourceTask extends 
WorkerTask<SourceRecord,
                                        StatusBackingStore statusBackingStore,
                                        Executor closeExecutor,
                                        
Supplier<List<ErrorReporter<SourceRecord>>> errorReportersSupplier,
+                                       TaskPluginsMetadata pluginsMetadata,
                                        Function<ClassLoader, LoaderSwap> 
pluginLoaderSwapper) {
 
         super(id, statusListener, initialState, loader, connectMetrics, 
errorMetrics,
                 retryWithToleranceOperator, transformationChain, 
errorReportersSupplier,
-                time, statusBackingStore, pluginLoaderSwapper);
+                time, statusBackingStore, pluginsMetadata, 
pluginLoaderSwapper);
 
         this.workerConfig = workerConfig;
         this.task = task;
@@ -258,6 +260,7 @@ public abstract class AbstractWorkerSourceTask extends 
WorkerTask<SourceRecord,
         this.sourceTaskMetricsGroup = new SourceTaskMetricsGroup(id, 
connectMetrics);
         this.topicTrackingEnabled = 
workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG);
         this.topicCreation = TopicCreation.newTopicCreation(workerConfig, 
topicGroups);
+        this.version = task.version();
     }
 
     @Override
@@ -391,6 +394,11 @@ public abstract class AbstractWorkerSourceTask extends 
WorkerTask<SourceRecord,
         finalOffsetCommit(false);
     }
 
+    @Override
+    public String taskVersion() {
+        return version;
+    }
+
     /**
      * Try to send a batch of records. If a send fails and is retriable, this 
saves the remainder of the batch so it can
      * be retried after backing off. If a send fails and is not retriable, 
this will throw a ConnectException.
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java
index 1d144440f2c..496c838cf45 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java
@@ -37,6 +37,10 @@ public class ConnectMetricsRegistry {
     public static final String WORKER_GROUP_NAME = "connect-worker-metrics";
     public static final String WORKER_REBALANCE_GROUP_NAME = 
"connect-worker-rebalance-metrics";
     public static final String TASK_ERROR_HANDLING_GROUP_NAME = 
"task-error-metrics";
+    public static final String TRANSFORMS_GROUP = 
"connector-transform-metrics";
+    public static final String PREDICATES_GROUP = 
"connector-predicate-metrics";
+    public static final String TRANSFORM_TAG_NAME = "transform";
+    public static final String PREDICATE_TAG_NAME = "predicate";
 
     private final List<MetricNameTemplate> allTemplates = new ArrayList<>();
     public final MetricNameTemplate connectorStatus;
@@ -59,6 +63,17 @@ public class ConnectMetricsRegistry {
     public final MetricNameTemplate taskBatchSizeAvg;
     public final MetricNameTemplate taskCommitFailurePercentage;
     public final MetricNameTemplate taskCommitSuccessPercentage;
+    public final MetricNameTemplate taskConnectorClass;
+    public final MetricNameTemplate taskConnectorClassVersion;
+    public final MetricNameTemplate taskConnectorType;
+    public final MetricNameTemplate taskClass;
+    public final MetricNameTemplate taskVersion;
+    public final MetricNameTemplate taskKeyConverterClass;
+    public final MetricNameTemplate taskValueConverterClass;
+    public final MetricNameTemplate taskKeyConverterVersion;
+    public final MetricNameTemplate taskValueConverterVersion;
+    public final MetricNameTemplate taskHeaderConverterClass;
+    public final MetricNameTemplate taskHeaderConverterVersion;
     public final MetricNameTemplate sourceRecordPollRate;
     public final MetricNameTemplate sourceRecordPollTotal;
     public final MetricNameTemplate sourceRecordWriteRate;
@@ -115,6 +130,10 @@ public class ConnectMetricsRegistry {
     public final MetricNameTemplate transactionSizeMin;
     public final MetricNameTemplate transactionSizeMax;
     public final MetricNameTemplate transactionSizeAvg;
+    public final MetricNameTemplate transformClass;
+    public final MetricNameTemplate transformVersion;
+    public final MetricNameTemplate predicateClass;
+    public final MetricNameTemplate predicateVersion;
 
     public Map<MetricNameTemplate, TaskStatus.State> connectorStatusMetrics;
 
@@ -164,6 +183,43 @@ public class ConnectMetricsRegistry {
         taskCommitSuccessPercentage = 
createTemplate("offset-commit-success-percentage", TASK_GROUP_NAME,
                                                      "The average percentage 
of this task's offset commit attempts that succeeded.",
                                                      workerTaskTags);
+        taskConnectorClass = createTemplate("connector-class", 
TASK_GROUP_NAME, "The name of the connector class.", workerTaskTags);
+        taskConnectorClassVersion = createTemplate("connector-version", 
TASK_GROUP_NAME,
+                                                   "The version of the 
connector class, as reported by the connector.", workerTaskTags);
+        taskConnectorType = createTemplate("connector-type", TASK_GROUP_NAME, 
"The type of the connector. One of 'source' or 'sink'.",
+                                           workerTaskTags);
+        taskClass = createTemplate("task-class", TASK_GROUP_NAME, "The class 
name of the task.", workerTaskTags);
+        taskVersion = createTemplate("task-version", TASK_GROUP_NAME, "The 
version of the task.", workerTaskTags);
+        taskKeyConverterClass = createTemplate("key-converter-class", 
TASK_GROUP_NAME,
+                                            "The fully qualified class name 
from key.converter", workerTaskTags);
+        taskValueConverterClass = createTemplate("value-converter-class", 
TASK_GROUP_NAME,
+                                            "The fully qualified class name 
from value.converter", workerTaskTags);
+        taskKeyConverterVersion = createTemplate("key-converter-version", 
TASK_GROUP_NAME,
+                                            "The version instantiated for 
key.converter. May be undefined", workerTaskTags);
+        taskValueConverterVersion = createTemplate("value-converter-version", 
TASK_GROUP_NAME,
+                                                "The version instantiated for 
value.converter. May be undefined", workerTaskTags);
+        taskHeaderConverterClass = createTemplate("header-converter-class", 
TASK_GROUP_NAME,
+                                                "The fully qualified class 
name from header.converter", workerTaskTags);
+        taskHeaderConverterVersion = 
createTemplate("header-converter-version", TASK_GROUP_NAME,
+                                                    "The version instantiated 
for header.converter. May be undefined", workerTaskTags);
+
+        /* Transformation Metrics */
+        Set<String> transformTags = new LinkedHashSet<>(tags);
+        transformTags.addAll(workerTaskTags);
+        transformTags.add(TRANSFORM_TAG_NAME);
+        transformClass = createTemplate("transform-class", TRANSFORMS_GROUP,
+                "The class name of the transformation class", transformTags);
+        transformVersion = createTemplate("transform-version", 
TRANSFORMS_GROUP,
+                "The version of the transformation class", transformTags);
+
+        /* Predicate Metrics */
+        Set<String> predicateTags = new LinkedHashSet<>(tags);
+        predicateTags.addAll(workerTaskTags);
+        predicateTags.add(PREDICATE_TAG_NAME);
+        predicateClass = createTemplate("predicate-class", PREDICATES_GROUP,
+                "The class name of the predicate class", predicateTags);
+        predicateVersion = createTemplate("predicate-version", 
PREDICATES_GROUP,
+                "The version of the predicate class", predicateTags);
 
         /* Source worker task level */
         Set<String> sourceTaskTags = new LinkedHashSet<>(tags);
@@ -426,4 +482,20 @@ public class ConnectMetricsRegistry {
     public String taskErrorHandlingGroupName() {
         return TASK_ERROR_HANDLING_GROUP_NAME;
     }
+
+    public String transformsGroupName() {
+        return TRANSFORMS_GROUP;
+    }
+
+    public String transformsTagName() {
+        return TRANSFORM_TAG_NAME;
+    }
+
+    public String predicatesGroupName() {
+        return PREDICATES_GROUP;
+    }
+
+    public String predicateTagName() {
+        return PREDICATE_TAG_NAME;
+    }
 }
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
index 3a301335502..efd421bd2e2 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
@@ -374,7 +374,7 @@ public class ConnectorConfig extends AbstractConfig {
                 final String versionConfig = prefix + 
WorkerConfig.PLUGIN_VERSION_SUFFIX;
                 final Transformation<R> transformation = 
getTransformationOrPredicate(plugins, typeConfig, versionConfig);
                 Map<String, Object> configs = originalsWithPrefix(prefix);
-                Object predicateAlias = 
configs.remove(TransformationStage.PREDICATE_CONFIG);
+                String predicateAlias = (String) 
configs.remove(TransformationStage.PREDICATE_CONFIG);
                 Object negate = 
configs.remove(TransformationStage.NEGATE_CONFIG);
                 transformation.configure(configs);
                 Plugin<Transformation<R>> transformationPlugin = 
metrics.wrap(transformation, connectorTaskId, alias);
@@ -384,10 +384,24 @@ public class ConnectorConfig extends AbstractConfig {
                     final String predicateVersionConfig = predicatePrefix + 
WorkerConfig.PLUGIN_VERSION_SUFFIX;
                     Predicate<R> predicate = 
getTransformationOrPredicate(plugins, predicateTypeConfig, 
predicateVersionConfig);
                     predicate.configure(originalsWithPrefix(predicatePrefix));
-                    Plugin<Predicate<R>> predicatePlugin = 
metrics.wrap(predicate, connectorTaskId, (String) predicateAlias);
-                    transformations.add(new 
TransformationStage<>(predicatePlugin, negate != null && 
Boolean.parseBoolean(negate.toString()), transformationPlugin, 
plugins.safeLoaderSwapper()));
+                    Plugin<Predicate<R>> predicatePlugin = 
metrics.wrap(predicate, connectorTaskId, predicateAlias);
+                    transformations.add(new TransformationStage<>(
+                        predicatePlugin,
+                        predicateAlias,
+                        plugins.pluginVersion(predicate.getClass().getName(), 
predicate.getClass().getClassLoader(), PluginType.PREDICATE),
+                        negate != null && 
Boolean.parseBoolean(negate.toString()),
+                        transformationPlugin,
+                        alias,
+                        
plugins.pluginVersion(transformation.getClass().getName(), 
transformation.getClass().getClassLoader(), PluginType.TRANSFORMATION),
+                        plugins.safeLoaderSwapper())
+                    );
                 } else {
-                    transformations.add(new 
TransformationStage<>(transformationPlugin, plugins.safeLoaderSwapper()));
+                    transformations.add(new TransformationStage<>(
+                        transformationPlugin,
+                        alias,
+                        
plugins.pluginVersion(transformation.getClass().getName(), 
transformation.getClass().getClassLoader(), PluginType.TRANSFORMATION),
+                        plugins.safeLoaderSwapper())
+                    );
                 }
             } catch (Exception e) {
                 throw new ConnectException(e);
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorStatus.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorStatus.java
index 10ed188cdf8..d704a3374e2 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorStatus.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorStatus.java
@@ -19,8 +19,12 @@ package org.apache.kafka.connect.runtime;
 
 public class ConnectorStatus extends AbstractStatus<String> {
 
-    public ConnectorStatus(String connector, State state, String msg, String 
workerUrl, int generation) {
-        super(connector, state, workerUrl, generation, msg);
+    public ConnectorStatus(String connector, State state, String msg, String 
workerUrl, int generation, String version) {
+        super(connector, state, workerUrl, generation, msg, version);
+    }
+
+    public ConnectorStatus(String connector, State state, String workerUrl, 
int generation, String version) {
+        super(connector, state, workerUrl, generation, null, version);
     }
 
     public ConnectorStatus(String connector, State state, String workerUrl, 
int generation) {
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
index 42e43babe55..d6f4ffbd4b9 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
@@ -104,11 +104,12 @@ class ExactlyOnceWorkerSourceTask extends 
AbstractWorkerSourceTask {
                                        Runnable preProducerCheck,
                                        Runnable postProducerCheck,
                                        
Supplier<List<ErrorReporter<SourceRecord>>> errorReportersSupplier,
+                                       TaskPluginsMetadata pluginsMetadata,
                                        Function<ClassLoader, LoaderSwap> 
pluginLoaderSwapper) {
         super(id, task, statusListener, initialState, configState, 
keyConverterPlugin, valueConverterPlugin, headerConverterPlugin, 
transformationChain,
                 buildTransactionContext(sourceConfig),
                 producer, admin, topicGroups, offsetReader, offsetWriter, 
offsetStore, workerConfig, connectMetrics, errorMetrics,
-                loader, time, retryWithToleranceOperator, statusBackingStore, 
closeExecutor, errorReportersSupplier, pluginLoaderSwapper);
+                loader, time, retryWithToleranceOperator, statusBackingStore, 
closeExecutor, errorReportersSupplier, pluginsMetadata, pluginLoaderSwapper);
 
         this.transactionOpen = false;
         this.committableRecords = new LinkedHashMap<>();
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TaskPluginsMetadata.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TaskPluginsMetadata.java
new file mode 100644
index 00000000000..14e6cb9b7a7
--- /dev/null
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TaskPluginsMetadata.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.runtime.isolation.PluginType;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorType;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class TaskPluginsMetadata {
+
+    private final String connectorClass;
+    private final String connectorVersion;
+    private final ConnectorType connectorType;
+    private final String taskClass;
+    private final String taskVersion;
+    private final String keyConverterClass;
+    private final String keyConverterVersion;
+    private final String valueConverterClass;
+    private final String valueConverterVersion;
+    private final String headerConverterClass;
+    private final String headerConverterVersion;
+    private final Set<TransformationStage.AliasedPluginInfo> transformations;
+    private final Set<TransformationStage.AliasedPluginInfo> predicates;
+
+    public TaskPluginsMetadata(
+            Class<? extends Connector> connectorClass,
+            Task task,
+            Converter keyConverter,
+            Converter valueConverter,
+            HeaderConverter headerConverter,
+            List<TransformationStage.StageInfo> transformationStageInfo,
+            Plugins plugins
+    ) {
+
+        assert connectorClass != null;
+        assert task != null;
+        assert keyConverter != null;
+        assert valueConverter != null;
+        assert headerConverter != null;
+        assert transformationStageInfo != null;
+
+        this.connectorClass = connectorClass.getName();
+        this.connectorVersion = 
plugins.pluginVersion(connectorClass.getName(), 
connectorClass.getClassLoader(), PluginType.SINK, PluginType.SOURCE);
+        this.connectorType = ConnectorType.from(connectorClass);
+        this.taskClass = task.getClass().getName();
+        this.taskVersion = task.version();
+        this.keyConverterClass = keyConverter.getClass().getName();
+        this.keyConverterVersion = 
plugins.pluginVersion(keyConverter.getClass().getName(), 
keyConverter.getClass().getClassLoader(), PluginType.CONVERTER);
+        this.valueConverterClass = valueConverter.getClass().getName();
+        this.valueConverterVersion = 
plugins.pluginVersion(valueConverter.getClass().getName(), 
valueConverter.getClass().getClassLoader(), PluginType.CONVERTER);
+        this.headerConverterClass = headerConverter.getClass().getName();
+        this.headerConverterVersion = 
plugins.pluginVersion(headerConverter.getClass().getName(), 
headerConverter.getClass().getClassLoader(), PluginType.HEADER_CONVERTER);
+        this.transformations = 
transformationStageInfo.stream().map(TransformationStage.StageInfo::transform).collect(Collectors.toSet());
+        this.predicates = 
transformationStageInfo.stream().map(TransformationStage.StageInfo::predicate).filter(Objects::nonNull).collect(Collectors.toSet());
+    }
+
+    public String connectorClass() {
+        return connectorClass;
+    }
+
+    public String connectorVersion() {
+        return connectorVersion;
+    }
+
+    public ConnectorType connectorType() {
+        return connectorType;
+    }
+
+    public String taskClass() {
+        return taskClass;
+    }
+
+    public String taskVersion() {
+        return taskVersion;
+    }
+
+    public String keyConverterClass() {
+        return keyConverterClass;
+    }
+
+    public String keyConverterVersion() {
+        return keyConverterVersion;
+    }
+
+    public String valueConverterClass() {
+        return valueConverterClass;
+    }
+
+    public String valueConverterVersion() {
+        return valueConverterVersion;
+    }
+
+    public String headerConverterClass() {
+        return headerConverterClass;
+    }
+
+    public String headerConverterVersion() {
+        return headerConverterVersion;
+    }
+
+    public Set<TransformationStage.AliasedPluginInfo> transformations() {
+        return transformations;
+    }
+
+    public Set<TransformationStage.AliasedPluginInfo> predicates() {
+        return predicates;
+    }
+}
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TaskStatus.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TaskStatus.java
index e35efcafe2e..45150ef7ef5 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TaskStatus.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TaskStatus.java
@@ -20,8 +20,8 @@ import org.apache.kafka.connect.util.ConnectorTaskId;
 
 public class TaskStatus extends AbstractStatus<ConnectorTaskId> {
 
-    public TaskStatus(ConnectorTaskId id, State state, String workerUrl, int 
generation, String trace) {
-        super(id, state, workerUrl, generation, trace);
+    public TaskStatus(ConnectorTaskId id, State state, String workerUrl, int 
generation, String trace, String version) {
+        super(id, state, workerUrl, generation, trace, version);
     }
 
     public TaskStatus(ConnectorTaskId id, State state, String workerUrl, int 
generation) {
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationChain.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationChain.java
index f6b92697c44..68d52f2c1ca 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationChain.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationChain.java
@@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory;
 import java.util.List;
 import java.util.Objects;
 import java.util.StringJoiner;
+import java.util.stream.Collectors;
 
 /**
  * Represents a chain of {@link Transformation}s to be applied to a {@link 
ConnectRecord} serially.
@@ -89,4 +90,8 @@ public class TransformationChain<T, R extends 
ConnectRecord<R>> implements AutoC
         }
         return chain.toString();
     }
+
+    public List<TransformationStage.StageInfo> transformationChainInfo() {
+        return 
transformationStages.stream().map(TransformationStage::transformationStageInfo).collect(Collectors.toList());
+    }
 }
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationStage.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationStage.java
index a86c4878ab3..56293e03632 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationStage.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationStage.java
@@ -24,6 +24,7 @@ import org.apache.kafka.connect.runtime.isolation.LoaderSwap;
 import org.apache.kafka.connect.transforms.Transformation;
 import org.apache.kafka.connect.transforms.predicates.Predicate;
 
+import java.util.Objects;
 import java.util.function.Function;
 
 /**
@@ -39,18 +40,40 @@ public class TransformationStage<R extends 
ConnectRecord<R>> implements AutoClos
     private final Plugin<Predicate<R>> predicatePlugin;
     private final Plugin<Transformation<R>> transformationPlugin;
     private final boolean negate;
+    private final String transformAlias;
+    private final String predicateAlias;
+    private final String transformVersion;
+    private final String predicateVersion;
     private final Function<ClassLoader, LoaderSwap> pluginLoaderSwapper;
 
 
-    TransformationStage(Plugin<Transformation<R>> transformationPlugin, 
Function<ClassLoader, LoaderSwap> pluginLoaderSwapper) {
-        this(null, false, transformationPlugin, pluginLoaderSwapper);
+    TransformationStage(
+        Plugin<Transformation<R>> transformationPlugin,
+        String transformAlias,
+        String transformVersion,
+        Function<ClassLoader, LoaderSwap> pluginLoaderSwapper
+    ) {
+        this(null, null, null, false, transformationPlugin, transformAlias, 
transformVersion, pluginLoaderSwapper);
     }
 
-    TransformationStage(Plugin<Predicate<R>> predicatePlugin, boolean negate, 
Plugin<Transformation<R>> transformationPlugin, Function<ClassLoader, 
LoaderSwap> pluginLoaderSwapper) {
+    TransformationStage(
+        Plugin<Predicate<R>> predicatePlugin,
+        String predicateAlias,
+        String predicateVersion,
+        boolean negate,
+        Plugin<Transformation<R>> transformationPlugin,
+        String transformAlias,
+        String transformVersion,
+        Function<ClassLoader, LoaderSwap> pluginLoaderSwapper
+    ) {
         this.predicatePlugin = predicatePlugin;
         this.negate = negate;
         this.transformationPlugin = transformationPlugin;
         this.pluginLoaderSwapper = pluginLoaderSwapper;
+        this.transformAlias = transformAlias;
+        this.predicateAlias = predicateAlias;
+        this.transformVersion = transformVersion;
+        this.predicateVersion = predicateVersion;
     }
 
     public Class<? extends Transformation<R>> transformClass() {
@@ -89,4 +112,32 @@ public class TransformationStage<R extends 
ConnectRecord<R>> implements AutoClos
                 ", negate=" + negate +
                 '}';
     }
+
+    public record AliasedPluginInfo(String alias, String className, String 
version) {
+        public AliasedPluginInfo {
+            Objects.requireNonNull(alias, "alias cannot be null");
+            Objects.requireNonNull(className, "className cannot be null");
+        }
+    }
+
+
+    public record StageInfo(AliasedPluginInfo transform, AliasedPluginInfo 
predicate) {
+        public StageInfo {
+            Objects.requireNonNull(transform, "transform cannot be null");
+        }
+    }
+
+
+    public StageInfo transformationStageInfo() {
+        AliasedPluginInfo transformInfo = new AliasedPluginInfo(
+            transformAlias,
+            transformationPlugin.get().getClass().getName(),
+            transformVersion
+        );
+        AliasedPluginInfo predicateInfo = predicatePlugin != null ? new 
AliasedPluginInfo(
+            predicateAlias,
+            predicatePlugin.get().getClass().getName(), predicateVersion
+        ) : null;
+        return new StageInfo(transformInfo, predicateInfo);
+    }
 }
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 1c1acc5647e..a3e914d3f90 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -346,7 +346,7 @@ public final class Worker {
                     }
                     workerConnector = new WorkerConnector(
                         connName, connector, connConfig, ctx, metrics, 
connectorStatusListener, offsetReader, offsetStore, connectorLoader);
-                    log.info("Instantiated connector {} with version {} of 
type {}", connName, connector.version(), connector.getClass());
+                    log.info("Instantiated connector {} with version {} of 
type {}", connName, workerConnector.connectorVersion(), connector.getClass());
                     workerConnector.transitionTo(initialState, 
onConnectorStateChange);
                 }
             } catch (Throwable t) {
@@ -562,6 +562,22 @@ public final class Worker {
         return workerConnector != null && workerConnector.isRunning();
     }
 
+    public String connectorVersion(String connName) {
+        WorkerConnector conn = connectors.get(connName);
+        if (conn == null) {
+            return null;
+        }
+        return conn.connectorVersion();
+    }
+
+    public String taskVersion(ConnectorTaskId taskId) {
+        WorkerTask<?, ?> task = tasks.get(taskId);
+        if (task == null) {
+            return null;
+        }
+        return task.taskVersion();
+    }
+
     /**
      * Start a sink task managed by this worker.
      *
@@ -714,7 +730,7 @@ public final class Worker {
                         .withKeyConverterPlugin(metrics.wrap(keyConverter, id, 
true))
                         .withValueConverterPlugin(metrics.wrap(valueConverter, 
id, false))
                         
.withHeaderConverterPlugin(metrics.wrap(headerConverter, id))
-                        .withClassloader(connectorLoader)
+                        .withClassLoader(connectorLoader)
                         .build();
 
                     workerTask.initialize(taskConfig);
@@ -1814,11 +1830,12 @@ public final class Worker {
             return this;
         }
 
-        public TaskBuilder<T, R> withClassloader(ClassLoader classLoader) {
+        public TaskBuilder<T, R> withClassLoader(ClassLoader classLoader) {
             this.classLoader = classLoader;
             return this;
         }
 
+
         public WorkerTask<T, R> build() {
             Objects.requireNonNull(task, "Task cannot be null");
             Objects.requireNonNull(connectorConfig, "Connector config used by 
task cannot be null");
@@ -1836,10 +1853,13 @@ public final class Worker {
             TransformationChain<T, R> transformationChain = new 
TransformationChain<>(connectorConfig.<R>transformationStages(plugins, id, 
metrics), retryWithToleranceOperator);
             log.info("Initializing: {}", transformationChain);
 
+            TaskPluginsMetadata taskPluginsMetadata = new TaskPluginsMetadata(
+                    connectorClass, task, keyConverterPlugin.get(), 
valueConverterPlugin.get(), headerConverterPlugin.get(), 
transformationChain.transformationChainInfo(), plugins);
+
             return doBuild(task, id, configState, statusListener, initialState,
-                    connectorConfig, keyConverterPlugin, valueConverterPlugin, 
headerConverterPlugin, classLoader,
-                    retryWithToleranceOperator, transformationChain,
-                    errorHandlingMetrics, connectorClass);
+                connectorConfig, keyConverterPlugin, valueConverterPlugin, 
headerConverterPlugin, classLoader,
+                retryWithToleranceOperator, transformationChain,
+                errorHandlingMetrics, connectorClass, taskPluginsMetadata);
         }
 
         abstract WorkerTask<T, R> doBuild(
@@ -1856,7 +1876,8 @@ public final class Worker {
                 RetryWithToleranceOperator<T> retryWithToleranceOperator,
                 TransformationChain<T, R> transformationChain,
                 ErrorHandlingMetrics errorHandlingMetrics,
-                Class<? extends Connector> connectorClass
+                Class<? extends Connector> connectorClass,
+                TaskPluginsMetadata pluginsMetadata
         );
 
     }
@@ -1884,7 +1905,8 @@ public final class Worker {
                 RetryWithToleranceOperator<ConsumerRecord<byte[], byte[]>> 
retryWithToleranceOperator,
                 TransformationChain<ConsumerRecord<byte[], byte[]>, 
SinkRecord> transformationChain,
                 ErrorHandlingMetrics errorHandlingMetrics,
-                Class<? extends Connector> connectorClass
+                Class<? extends Connector> connectorClass,
+                TaskPluginsMetadata taskPluginsMetadata
         ) {
             SinkConnectorConfig sinkConfig = new SinkConnectorConfig(plugins, 
connectorConfig.originalsStrings());
             WorkerErrantRecordReporter workerErrantRecordReporter = 
createWorkerErrantRecordReporter(sinkConfig, retryWithToleranceOperator,
@@ -1898,7 +1920,7 @@ public final class Worker {
             return new WorkerSinkTask(id, (SinkTask) task, statusListener, 
initialState, config, configState, metrics, keyConverterPlugin,
                     valueConverterPlugin, errorHandlingMetrics, 
headerConverterPlugin, transformationChain, consumer, classLoader, time,
                     retryWithToleranceOperator, workerErrantRecordReporter, 
herder.statusBackingStore(),
-                    () -> sinkTaskReporters(id, sinkConfig, 
errorHandlingMetrics, connectorClass), plugins.safeLoaderSwapper());
+                    () -> sinkTaskReporters(id, sinkConfig, 
errorHandlingMetrics, connectorClass), taskPluginsMetadata, 
plugins.safeLoaderSwapper());
         }
     }
 
@@ -1925,7 +1947,8 @@ public final class Worker {
                 RetryWithToleranceOperator<SourceRecord> 
retryWithToleranceOperator,
                 TransformationChain<SourceRecord, SourceRecord> 
transformationChain,
                 ErrorHandlingMetrics errorHandlingMetrics,
-                Class<? extends Connector> connectorClass
+                Class<? extends Connector> connectorClass,
+                TaskPluginsMetadata pluginsMetadata
         ) {
             SourceConnectorConfig sourceConfig = new 
SourceConnectorConfig(plugins,
                     connectorConfig.originalsStrings(), 
config.topicCreationEnable());
@@ -1958,7 +1981,7 @@ public final class Worker {
             return new WorkerSourceTask(id, (SourceTask) task, statusListener, 
initialState, keyConverterPlugin, valueConverterPlugin, errorHandlingMetrics,
                     headerConverterPlugin, transformationChain, producer, 
topicAdmin, topicCreationGroups,
                     offsetReader, offsetWriter, offsetStore, config, 
configState, metrics, classLoader, time,
-                    retryWithToleranceOperator, herder.statusBackingStore(), 
executor, () -> sourceTaskReporters(id, sourceConfig, errorHandlingMetrics), 
plugins.safeLoaderSwapper());
+                    retryWithToleranceOperator, herder.statusBackingStore(), 
executor, () -> sourceTaskReporters(id, sourceConfig, errorHandlingMetrics), 
pluginsMetadata, plugins.safeLoaderSwapper());
         }
     }
 
@@ -1992,7 +2015,8 @@ public final class Worker {
                 RetryWithToleranceOperator<SourceRecord> 
retryWithToleranceOperator,
                 TransformationChain<SourceRecord, SourceRecord> 
transformationChain,
                 ErrorHandlingMetrics errorHandlingMetrics,
-                Class<? extends Connector> connectorClass
+                Class<? extends Connector> connectorClass,
+                TaskPluginsMetadata pluginsMetadata
         ) {
             SourceConnectorConfig sourceConfig = new 
SourceConnectorConfig(plugins,
                     connectorConfig.originalsStrings(), 
config.topicCreationEnable());
@@ -2023,7 +2047,7 @@ public final class Worker {
                     headerConverterPlugin, transformationChain, producer, 
topicAdmin, topicCreationGroups,
                     offsetReader, offsetWriter, offsetStore, config, 
configState, metrics, errorHandlingMetrics, classLoader, time, 
retryWithToleranceOperator,
                     herder.statusBackingStore(), sourceConfig, executor, 
preProducerCheck, postProducerCheck,
-                    () -> sourceTaskReporters(id, sourceConfig, 
errorHandlingMetrics), plugins.safeLoaderSwapper());
+                    () -> sourceTaskReporters(id, sourceConfig, 
errorHandlingMetrics), pluginsMetadata, plugins.safeLoaderSwapper());
         }
     }
 
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
index e2473dbbf71..3faf70f898c 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
@@ -78,6 +78,7 @@ public class WorkerConnector implements Runnable {
     private volatile Throwable externalFailure;
     private volatile boolean stopping;  // indicates whether the Worker has 
asked the connector to stop
     private volatile boolean cancelled; // indicates whether the Worker has 
cancelled the connector (e.g. because of slow shutdown)
+    private final String version;
 
     private State state;
     private final CloseableOffsetStorageReader offsetStorageReader;
@@ -97,8 +98,9 @@ public class WorkerConnector implements Runnable {
         this.loader = loader;
         this.ctx = ctx;
         this.connector = connector;
+        this.version = connector.version();
         this.state = State.INIT;
-        this.metrics = new ConnectorMetricsGroup(connectMetrics, 
AbstractStatus.State.UNASSIGNED, statusListener);
+        this.metrics = new ConnectorMetricsGroup(connectMetrics, 
AbstractStatus.State.UNASSIGNED, this.version, statusListener);
         this.statusListener = this.metrics;
         this.offsetStorageReader = offsetStorageReader;
         this.offsetStore = offsetStore;
@@ -418,6 +420,10 @@ public class WorkerConnector implements Runnable {
         return ConnectUtils.isSourceConnector(connector);
     }
 
    /**
     * @return the version string reported by the connector, captured once at construction time
     */
    public String connectorVersion() {
        return version;
    }
+
     protected final String connectorType() {
         if (isSinkConnector())
             return "sink";
@@ -450,7 +456,12 @@ public class WorkerConnector implements Runnable {
         private final MetricGroup metricGroup;
         private final ConnectorStatus.Listener delegate;
 
-        public ConnectorMetricsGroup(ConnectMetrics connectMetrics, 
AbstractStatus.State initialState, ConnectorStatus.Listener delegate) {
+        public ConnectorMetricsGroup(
+            ConnectMetrics connectMetrics,
+            AbstractStatus.State initialState,
+            String connectorVersion,
+            ConnectorStatus.Listener delegate
+        ) {
             Objects.requireNonNull(connectMetrics);
             Objects.requireNonNull(connector);
             Objects.requireNonNull(initialState);
@@ -465,7 +476,7 @@ public class WorkerConnector implements Runnable {
 
             metricGroup.addImmutableValueMetric(registry.connectorType, 
connectorType());
             metricGroup.addImmutableValueMetric(registry.connectorClass, 
connector.getClass().getName());
-            metricGroup.addImmutableValueMetric(registry.connectorVersion, 
connector.version());
+            metricGroup.addImmutableValueMetric(registry.connectorVersion, 
connectorVersion);
             metricGroup.addValueMetric(registry.connectorStatus, now -> 
state.toString().toLowerCase(Locale.getDefault()));
         }
 
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 4b8256115ed..14b093c9123 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -105,6 +105,7 @@ class WorkerSinkTask extends 
WorkerTask<ConsumerRecord<byte[], byte[]>, SinkReco
     private boolean committing;
     private boolean taskStopped;
     private final WorkerErrantRecordReporter workerErrantRecordReporter;
+    private final String version;
 
     public WorkerSinkTask(ConnectorTaskId id,
                           SinkTask task,
@@ -125,9 +126,10 @@ class WorkerSinkTask extends 
WorkerTask<ConsumerRecord<byte[], byte[]>, SinkReco
                           WorkerErrantRecordReporter 
workerErrantRecordReporter,
                           StatusBackingStore statusBackingStore,
                           Supplier<List<ErrorReporter<ConsumerRecord<byte[], 
byte[]>>>> errorReportersSupplier,
+                          TaskPluginsMetadata pluginsMetadata,
                           Function<ClassLoader, LoaderSwap> 
pluginLoaderSwapper) {
         super(id, statusListener, initialState, loader, connectMetrics, 
errorMetrics,
-                retryWithToleranceOperator, transformationChain, 
errorReportersSupplier, time, statusBackingStore, pluginLoaderSwapper);
+                retryWithToleranceOperator, transformationChain, 
errorReportersSupplier, time, statusBackingStore, pluginsMetadata, 
pluginLoaderSwapper);
 
         this.workerConfig = workerConfig;
         this.task = task;
@@ -153,6 +155,7 @@ class WorkerSinkTask extends 
WorkerTask<ConsumerRecord<byte[], byte[]>, SinkReco
         this.isTopicTrackingEnabled = 
workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG);
         this.taskStopped = false;
         this.workerErrantRecordReporter = workerErrantRecordReporter;
+        this.version = task.version();
     }
 
     @Override
@@ -227,6 +230,11 @@ class WorkerSinkTask extends 
WorkerTask<ConsumerRecord<byte[], byte[]>, SinkReco
         }
     }
 
    // Version string reported by the SinkTask itself (task.version()), captured once
    // in the constructor so it remains stable for the lifetime of this worker task.
    @Override
    public String taskVersion() {
        return version;
    }
+
     protected void iteration() {
         final long offsetCommitIntervalMs = 
workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG);
 
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index 0806e887735..3ccd530be39 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -94,12 +94,13 @@ class WorkerSourceTask extends AbstractWorkerSourceTask {
                             StatusBackingStore statusBackingStore,
                             Executor closeExecutor,
                             Supplier<List<ErrorReporter<SourceRecord>>> 
errorReportersSupplier,
+                            TaskPluginsMetadata pluginsMetadata,
                             Function<ClassLoader, LoaderSwap> 
pluginLoaderSwapper) {
 
         super(id, task, statusListener, initialState, configState, 
keyConverterPlugin, valueConverterPlugin, headerConverterPlugin, 
transformationChain,
                 null, producer,
                 admin, topicGroups, offsetReader, offsetWriter, offsetStore, 
workerConfig, connectMetrics, errorMetrics, loader,
-                time, retryWithToleranceOperator, statusBackingStore, 
closeExecutor, errorReportersSupplier, pluginLoaderSwapper);
+                time, retryWithToleranceOperator, statusBackingStore, 
closeExecutor, errorReportersSupplier, pluginsMetadata, pluginLoaderSwapper);
 
         this.committableOffsets = CommittableOffsets.EMPTY;
         this.submittedRecords = new SubmittedRecords();
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
index fa28a4e7b0e..1661d710a86 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
@@ -41,6 +41,7 @@ import org.apache.kafka.connect.util.LoggingContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Locale;
 import java.util.concurrent.CountDownLatch;
@@ -94,9 +95,10 @@ abstract class WorkerTask<T, R extends ConnectRecord<R>> 
implements Runnable {
                       Supplier<List<ErrorReporter<T>>> errorReportersSupplier,
                       Time time,
                       StatusBackingStore statusBackingStore,
+                      TaskPluginsMetadata pluginsMetadata,
                       Function<ClassLoader, LoaderSwap> pluginLoaderSwapper) {
         this.id = id;
-        this.taskMetricsGroup = new TaskMetricsGroup(this.id, connectMetrics, 
statusListener);
+        this.taskMetricsGroup = new TaskMetricsGroup(this.id, connectMetrics, 
statusListener, pluginsMetadata);
         this.errorMetrics = errorMetrics;
         this.statusListener = taskMetricsGroup;
         this.loader = loader;
@@ -196,6 +198,8 @@ abstract class WorkerTask<T, R extends ConnectRecord<R>> 
implements Runnable {
 
     protected abstract void close();
 
+    protected abstract String taskVersion();
+
     protected boolean isFailed() {
         return failed;
     }
@@ -397,14 +401,25 @@ abstract class WorkerTask<T, R extends ConnectRecord<R>> 
implements Runnable {
     static class TaskMetricsGroup implements TaskStatus.Listener {
         private final TaskStatus.Listener delegateListener;
         private final MetricGroup metricGroup;
+        private final List<MetricGroup> transformationGroups = new 
ArrayList<>();
+        private final List<MetricGroup> predicateGroups = new ArrayList<>();
         private final Time time;
         private final StateTracker taskStateTimer;
         private final Sensor commitTime;
         private final Sensor batchSize;
         private final Sensor commitAttempts;
+        private final ConnectMetrics connectMetrics;
+        private final ConnectorTaskId id;
 
         public TaskMetricsGroup(ConnectorTaskId id, ConnectMetrics 
connectMetrics, TaskStatus.Listener statusListener) {
+            this(id, connectMetrics, statusListener, null);
+        }
+
+        public TaskMetricsGroup(ConnectorTaskId id, ConnectMetrics 
connectMetrics, TaskStatus.Listener statusListener, TaskPluginsMetadata 
pluginsMetadata) {
             delegateListener = statusListener;
+            this.connectMetrics = connectMetrics;
+            this.id = id;
+
             time = connectMetrics.time();
             taskStateTimer = new StateTracker();
             ConnectMetricsRegistry registry = connectMetrics.registry();
@@ -434,6 +449,7 @@ abstract class WorkerTask<T, R extends ConnectRecord<R>> 
implements Runnable {
             Frequencies commitFrequencies = 
Frequencies.forBooleanValues(offsetCommitFailures, offsetCommitSucceeds);
             commitAttempts = metricGroup.sensor("offset-commit-completion");
             commitAttempts.add(commitFrequencies);
+            addPluginInfoMetric(pluginsMetadata);
         }
 
         private void addRatioMetric(final State matchingState, 
MetricNameTemplate template) {
@@ -442,8 +458,52 @@ abstract class WorkerTask<T, R extends ConnectRecord<R>> 
implements Runnable {
                     taskStateTimer.durationRatio(matchingState, now));
         }
 
+        private void addPluginInfoMetric(TaskPluginsMetadata pluginsMetadata) {
+            if (pluginsMetadata == null) {
+                return;
+            }
+            ConnectMetricsRegistry registry = connectMetrics.registry();
+            metricGroup.addValueMetric(registry.taskConnectorClass, now -> 
pluginsMetadata.connectorClass());
+            metricGroup.addValueMetric(registry.taskConnectorClassVersion, now 
-> pluginsMetadata.connectorVersion());
+            metricGroup.addValueMetric(registry.taskConnectorType, now -> 
pluginsMetadata.connectorType());
+            metricGroup.addValueMetric(registry.taskClass, now -> 
pluginsMetadata.taskClass());
+            metricGroup.addValueMetric(registry.taskVersion, now -> 
pluginsMetadata.taskVersion());
+            metricGroup.addValueMetric(registry.taskKeyConverterClass, now -> 
pluginsMetadata.keyConverterClass());
+            metricGroup.addValueMetric(registry.taskKeyConverterVersion, now 
-> pluginsMetadata.keyConverterVersion());
+            metricGroup.addValueMetric(registry.taskValueConverterClass, now 
-> pluginsMetadata.valueConverterClass());
+            metricGroup.addValueMetric(registry.taskValueConverterVersion, now 
-> pluginsMetadata.valueConverterVersion());
+            metricGroup.addValueMetric(registry.taskHeaderConverterClass, now 
-> pluginsMetadata.headerConverterClass());
+            metricGroup.addValueMetric(registry.taskHeaderConverterVersion, 
now -> pluginsMetadata.headerConverterVersion());
+
+            if (!pluginsMetadata.transformations().isEmpty()) {
+                for (TransformationStage.AliasedPluginInfo entry : 
pluginsMetadata.transformations()) {
+                    MetricGroup transformationGroup = 
connectMetrics.group(registry.transformsGroupName(),
+                            registry.connectorTagName(), id.connector(),
+                            registry.taskTagName(), 
Integer.toString(id.task()),
+                            registry.transformsTagName(), entry.alias());
+                    
transformationGroup.addValueMetric(registry.transformClass, now -> 
entry.className());
+                    
transformationGroup.addValueMetric(registry.transformVersion, now -> 
entry.version());
+                    this.transformationGroups.add(transformationGroup);
+                }
+            }
+
+            if (!pluginsMetadata.predicates().isEmpty()) {
+                for (TransformationStage.AliasedPluginInfo entry : 
pluginsMetadata.predicates()) {
+                    MetricGroup predicateGroup = 
connectMetrics.group(registry.predicatesGroupName(),
+                            registry.connectorTagName(), id.connector(),
+                            registry.taskTagName(), 
Integer.toString(id.task()),
+                            registry.predicateTagName(), entry.alias());
+                    predicateGroup.addValueMetric(registry.predicateClass, now 
-> entry.className());
+                    predicateGroup.addValueMetric(registry.predicateVersion, 
now -> entry.version());
+                    this.predicateGroups.add(predicateGroup);
+                }
+            }
+        }
+
         void close() {
             metricGroup.close();
+            transformationGroups.forEach(MetricGroup::close);
+            predicateGroups.forEach(MetricGroup::close);
         }
 
         void recordCommit(long duration, boolean success) {
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorStateInfo.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorStateInfo.java
index 82d9957b40d..8d94d562045 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorStateInfo.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorStateInfo.java
@@ -65,11 +65,13 @@ public class ConnectorStateInfo {
         private final String state;
         private final String trace;
         private final String workerId;
+        private final String version;
 
-        public AbstractState(String state, String workerId, String trace) {
+        public AbstractState(String state, String workerId, String trace, 
String version) {
             this.state = state;
             this.workerId = workerId;
             this.trace = trace;
+            this.version = version;
         }
 
         @JsonProperty
@@ -87,14 +89,22 @@ public class ConnectorStateInfo {
         public String trace() {
             return trace;
         }
+
+        @JsonProperty
+        @JsonInclude(value = JsonInclude.Include.CUSTOM, valueFilter = 
PluginInfo.NoVersionFilter.class)
+        public String version() {
+            return version;
+        }
     }
 
     public static class ConnectorState extends AbstractState {
+
         @JsonCreator
         public ConnectorState(@JsonProperty("state") String state,
                               @JsonProperty("worker_id") String worker,
-                              @JsonProperty("msg") String msg) {
-            super(state, worker, msg);
+                              @JsonProperty("msg") String msg,
+                              @JsonProperty("version") String version) {
+            super(state, worker, msg, version);
         }
     }
 
@@ -105,8 +115,9 @@ public class ConnectorStateInfo {
         public TaskState(@JsonProperty("id") int id,
                          @JsonProperty("state") String state,
                          @JsonProperty("worker_id") String worker,
-                         @JsonProperty("msg") String msg) {
-            super(state, worker, msg);
+                         @JsonProperty("msg") String msg,
+                         @JsonProperty("version") String version) {
+            super(state, worker, msg, version);
             this.id = id;
         }
 
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
index 0a9e3837006..b9382399cf4 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
@@ -101,6 +101,7 @@ public class KafkaStatusBackingStore extends 
KafkaTopicBasedBackingStore impleme
     public static final String TRACE_KEY_NAME = "trace";
     public static final String WORKER_ID_KEY_NAME = "worker_id";
     public static final String GENERATION_KEY_NAME = "generation";
+    public static final String VERSION_KEY_NAME = "version";
 
     public static final String TOPIC_STATE_KEY = "topic";
     public static final String TOPIC_NAME_KEY = "name";
@@ -113,6 +114,7 @@ public class KafkaStatusBackingStore extends 
KafkaTopicBasedBackingStore impleme
             .field(TRACE_KEY_NAME, SchemaBuilder.string().optional().build())
             .field(WORKER_ID_KEY_NAME, Schema.STRING_SCHEMA)
             .field(GENERATION_KEY_NAME, Schema.INT32_SCHEMA)
+            .field(VERSION_KEY_NAME, Schema.OPTIONAL_STRING_SCHEMA)
             .build();
 
     private static final Schema TOPIC_STATUS_VALUE_SCHEMA_V0 = 
SchemaBuilder.struct()
@@ -428,7 +430,8 @@ public class KafkaStatusBackingStore extends 
KafkaTopicBasedBackingStore impleme
             String trace = (String) statusMap.get(TRACE_KEY_NAME);
             String workerUrl = (String) statusMap.get(WORKER_ID_KEY_NAME);
             int generation = ((Long) 
statusMap.get(GENERATION_KEY_NAME)).intValue();
-            return new ConnectorStatus(connector, state, trace, workerUrl, 
generation);
+            String version = (String) statusMap.get(VERSION_KEY_NAME);
+            return new ConnectorStatus(connector, state, trace, workerUrl, 
generation, version);
         } catch (Exception e) {
             log.error("Failed to deserialize connector status", e);
             return null;
@@ -448,7 +451,8 @@ public class KafkaStatusBackingStore extends 
KafkaTopicBasedBackingStore impleme
             String trace = (String) statusMap.get(TRACE_KEY_NAME);
             String workerUrl = (String) statusMap.get(WORKER_ID_KEY_NAME);
             int generation = ((Long) 
statusMap.get(GENERATION_KEY_NAME)).intValue();
-            return new TaskStatus(taskId, state, workerUrl, generation, trace);
+            String version = (String) statusMap.get(VERSION_KEY_NAME);
+            return new TaskStatus(taskId, state, workerUrl, generation, trace, 
version);
         } catch (Exception e) {
             log.error("Failed to deserialize task status", e);
             return null;
@@ -487,6 +491,7 @@ public class KafkaStatusBackingStore extends 
KafkaTopicBasedBackingStore impleme
             struct.put(TRACE_KEY_NAME, status.trace());
         struct.put(WORKER_ID_KEY_NAME, status.workerId());
         struct.put(GENERATION_KEY_NAME, status.generation());
+        struct.put(VERSION_KEY_NAME, status.version());
         return converter.fromConnectData(statusTopic, STATUS_SCHEMA_V0, 
struct);
     }
 
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java
index 9ad8690ca1c..97b8edf7605 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java
@@ -964,7 +964,7 @@ public class AbstractWorkerSourceTaskTest {
                 taskId, sourceTask, statusListener, TargetState.STARTED, 
configState, keyConverterPlugin, valueConverterPlugin, headerConverterPlugin, 
transformationChain,
                 workerTransactionContext, producer, admin, 
TopicCreationGroup.configuredGroups(sourceConfig), offsetReader, offsetWriter, 
offsetStore,
                 config, metrics, errorHandlingMetrics,  
plugins.delegatingLoader(), Time.SYSTEM, retryWithToleranceOperator,
-                statusBackingStore, Runnable::run, errorReportersSupplier, 
TestPlugins.noOpLoaderSwap()) {
+                statusBackingStore, Runnable::run, errorReportersSupplier, 
null, TestPlugins.noOpLoaderSwap()) {
             @Override
             protected void prepareToInitializeTask() {
             }
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
index 70edfb0f598..340585f0fd6 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
@@ -426,9 +426,9 @@ public class ErrorHandlingTaskTest {
         oo.put("schemas.enable", "false");
         converter.configure(oo);
 
-        Plugin<Transformation<SinkRecord>> transformationPlugin = 
metrics.wrap(new FaultyPassthrough<SinkRecord>(), taskId, "");
+        Plugin<Transformation<SinkRecord>> transformationPlugin = 
metrics.wrap(new FaultyPassthrough<SinkRecord>(), taskId, "test");
         TransformationChain<ConsumerRecord<byte[], byte[]>, SinkRecord> 
sinkTransforms =
-                new TransformationChain<>(singletonList(new 
TransformationStage<>(transformationPlugin, TestPlugins.noOpLoaderSwap())), 
retryWithToleranceOperator);
+                new TransformationChain<>(singletonList(new 
TransformationStage<>(transformationPlugin, "test", null, 
TestPlugins.noOpLoaderSwap())), retryWithToleranceOperator);
 
         Plugin<Converter> keyConverterPlugin = metrics.wrap(converter, taskId, 
 true);
         Plugin<Converter> valueConverterPlugin = metrics.wrap(converter, 
taskId,  false);
@@ -438,7 +438,7 @@ public class ErrorHandlingTaskTest {
             ClusterConfigState.EMPTY, metrics, keyConverterPlugin, 
valueConverterPlugin, errorHandlingMetrics,
                 headerConverterPlugin, sinkTransforms, consumer, pluginLoader, 
time,
             retryWithToleranceOperator, workerErrantRecordReporter,
-                statusBackingStore, () -> errorReporters, 
TestPlugins.noOpLoaderSwap());
+                statusBackingStore, () -> errorReporters, null, 
TestPlugins.noOpLoaderSwap());
     }
 
     private void createSourceTask(TargetState initialState, 
RetryWithToleranceOperator<SourceRecord> retryWithToleranceOperator, 
List<ErrorReporter<SourceRecord>> errorReporters) {
@@ -462,9 +462,9 @@ public class ErrorHandlingTaskTest {
 
     private void createSourceTask(TargetState initialState, 
RetryWithToleranceOperator<SourceRecord> retryWithToleranceOperator,
                                   List<ErrorReporter<SourceRecord>> 
errorReporters, Converter converter) {
-        Plugin<Transformation<SourceRecord>> transformationPlugin = 
metrics.wrap(new FaultyPassthrough<SourceRecord>(), taskId, "");
+        Plugin<Transformation<SourceRecord>> transformationPlugin = 
metrics.wrap(new FaultyPassthrough<SourceRecord>(), taskId, "test");
         TransformationChain<SourceRecord, SourceRecord> sourceTransforms = new 
TransformationChain<>(singletonList(
-                new TransformationStage<>(transformationPlugin, 
TestPlugins.noOpLoaderSwap())), retryWithToleranceOperator);
+                new TransformationStage<>(transformationPlugin, "test", null, 
TestPlugins.noOpLoaderSwap())), retryWithToleranceOperator);
 
         Plugin<Converter> keyConverterPlugin = metrics.wrap(converter, taskId, 
 true);
         Plugin<Converter> valueConverterPlugin = metrics.wrap(converter, 
taskId,  false);
@@ -477,7 +477,7 @@ public class ErrorHandlingTaskTest {
                 offsetReader, offsetWriter, offsetStore, workerConfig,
                 ClusterConfigState.EMPTY, metrics, pluginLoader, time,
                 retryWithToleranceOperator,
-                statusBackingStore, Runnable::run, () -> errorReporters, 
TestPlugins.noOpLoaderSwap()));
+                statusBackingStore, Runnable::run, () -> errorReporters, null, 
TestPlugins.noOpLoaderSwap()));
 
     }
 
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
index a6375398d29..f45ec27b46e 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
@@ -198,6 +198,7 @@ public class ExactlyOnceWorkerSourceTaskTest {
             Thread.sleep(10);
             return result;
         });
+        when(sourceTask.version()).thenReturn(null);
     }
 
     @AfterEach
@@ -222,8 +223,8 @@ public class ExactlyOnceWorkerSourceTaskTest {
         }
 
         verify(statusBackingStore, MockitoUtils.anyTimes()).getTopic(any(), 
any());
-
         verify(offsetStore, MockitoUtils.anyTimes()).primaryOffsetsTopic();
+        verify(sourceTask).version();
 
         verifyNoMoreInteractions(statusListener, producer, sourceTask, admin, 
offsetWriter, statusBackingStore, offsetStore, preProducerCheck, 
postProducerCheck);
         if (metrics != null) metrics.stop();
@@ -284,7 +285,7 @@ public class ExactlyOnceWorkerSourceTaskTest {
         workerTask = new ExactlyOnceWorkerSourceTask(taskId, sourceTask, 
statusListener, initialState, keyConverterPlugin, valueConverterPlugin, 
headerConverterPlugin,
                 transformationChain, producer, admin, 
TopicCreationGroup.configuredGroups(sourceConfig), offsetReader, offsetWriter, 
offsetStore,
                 config, clusterConfigState, metrics, errorHandlingMetrics, 
plugins.delegatingLoader(), time, 
RetryWithToleranceOperatorTest.noneOperator(), statusBackingStore,
-                sourceConfig, Runnable::run, preProducerCheck, 
postProducerCheck, Collections::emptyList, TestPlugins.noOpLoaderSwap());
+                sourceConfig, Runnable::run, preProducerCheck, 
postProducerCheck, Collections::emptyList, null, TestPlugins.noOpLoaderSwap());
     }
 
     @ParameterizedTest
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/RestartPlanTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/RestartPlanTest.java
index 8d6f54ce258..d0f3f974c63 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/RestartPlanTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/RestartPlanTest.java
@@ -35,17 +35,15 @@ public class RestartPlanTest {
     @Test
     public void testRestartPlan() {
         ConnectorStateInfo.ConnectorState state = new 
ConnectorStateInfo.ConnectorState(
-                AbstractStatus.State.RESTARTING.name(),
-                "foo",
-                null
+                AbstractStatus.State.RESTARTING.name(), "foo", null, null
         );
         List<TaskState> tasks = new ArrayList<>();
-        tasks.add(new TaskState(1, AbstractStatus.State.RUNNING.name(), 
"worker1", null));
-        tasks.add(new TaskState(2, AbstractStatus.State.PAUSED.name(), 
"worker1", null));
-        tasks.add(new TaskState(3, AbstractStatus.State.RESTARTING.name(), 
"worker1", null));
-        tasks.add(new TaskState(4, AbstractStatus.State.DESTROYED.name(), 
"worker1", null));
-        tasks.add(new TaskState(5, AbstractStatus.State.RUNNING.name(), 
"worker1", null));
-        tasks.add(new TaskState(6, AbstractStatus.State.RUNNING.name(), 
"worker1", null));
+        tasks.add(new TaskState(1, AbstractStatus.State.RUNNING.name(), 
"worker1", null, null));
+        tasks.add(new TaskState(2, AbstractStatus.State.PAUSED.name(), 
"worker1", null, null));
+        tasks.add(new TaskState(3, AbstractStatus.State.RESTARTING.name(), 
"worker1", null, null));
+        tasks.add(new TaskState(4, AbstractStatus.State.DESTROYED.name(), 
"worker1", null, null));
+        tasks.add(new TaskState(5, AbstractStatus.State.RUNNING.name(), 
"worker1", null, null));
+        tasks.add(new TaskState(6, AbstractStatus.State.RUNNING.name(), 
"worker1", null, null));
         ConnectorStateInfo connectorStateInfo = new 
ConnectorStateInfo(CONNECTOR_NAME, state, tasks, ConnectorType.SOURCE);
 
         RestartRequest restartRequest = new RestartRequest(CONNECTOR_NAME, 
false, true);
@@ -61,13 +59,11 @@ public class RestartPlanTest {
     @Test
     public void testNoRestartsPlan() {
         ConnectorStateInfo.ConnectorState state = new 
ConnectorStateInfo.ConnectorState(
-                AbstractStatus.State.RUNNING.name(),
-                "foo",
-                null
+                AbstractStatus.State.RUNNING.name(), "foo", null, null
         );
         List<TaskState> tasks = new ArrayList<>();
-        tasks.add(new TaskState(1, AbstractStatus.State.RUNNING.name(), 
"worker1", null));
-        tasks.add(new TaskState(2, AbstractStatus.State.PAUSED.name(), 
"worker1", null));
+        tasks.add(new TaskState(1, AbstractStatus.State.RUNNING.name(), 
"worker1", null, null));
+        tasks.add(new TaskState(2, AbstractStatus.State.PAUSED.name(), 
"worker1", null, null));
         ConnectorStateInfo connectorStateInfo = new 
ConnectorStateInfo(CONNECTOR_NAME, state, tasks, ConnectorType.SOURCE);
         RestartRequest restartRequest = new RestartRequest(CONNECTOR_NAME, 
false, true);
         RestartPlan restartPlan = new RestartPlan(restartRequest, 
connectorStateInfo);
@@ -81,13 +77,11 @@ public class RestartPlanTest {
     @Test
     public void testRestartsOnlyConnector() {
         ConnectorStateInfo.ConnectorState state = new 
ConnectorStateInfo.ConnectorState(
-                AbstractStatus.State.RESTARTING.name(),
-                "foo",
-                null
+                AbstractStatus.State.RESTARTING.name(), "foo", null, null
         );
         List<TaskState> tasks = new ArrayList<>();
-        tasks.add(new TaskState(1, AbstractStatus.State.RUNNING.name(), 
"worker1", null));
-        tasks.add(new TaskState(2, AbstractStatus.State.PAUSED.name(), 
"worker1", null));
+        tasks.add(new TaskState(1, AbstractStatus.State.RUNNING.name(), 
"worker1", null, null));
+        tasks.add(new TaskState(2, AbstractStatus.State.PAUSED.name(), 
"worker1", null, null));
         ConnectorStateInfo connectorStateInfo = new 
ConnectorStateInfo(CONNECTOR_NAME, state, tasks, ConnectorType.SOURCE);
         RestartRequest restartRequest = new RestartRequest(CONNECTOR_NAME, 
false, true);
         RestartPlan restartPlan = new RestartPlan(restartRequest, 
connectorStateInfo);
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TransformationStageTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TransformationStageTest.java
index e2791a63f7b..959c5d2ac01 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TransformationStageTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TransformationStageTest.java
@@ -61,8 +61,12 @@ public class TransformationStageTest {
         }
         TransformationStage<SourceRecord> stage = new TransformationStage<>(
                 predicatePlugin,
+                "testPredicate",
+                null,
                 negate,
                 transformationPlugin,
+                "testTransformation",
+                null,
                 TestPlugins.noOpLoaderSwap()
         );
 
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index 2607ee8b03b..539960badec 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -229,7 +229,7 @@ public class WorkerSinkTaskTest {
                 taskId, task, statusListener, initialState, workerConfig, 
ClusterConfigState.EMPTY, connectMetrics,
                 keyConverterPlugin, valueConverterPlugin, errorMetrics, 
headerConverterPlugin,
                 transformationChain, consumer, loader, time,
-                retryWithToleranceOperator, null, statusBackingStore, 
errorReportersSupplier, TestPlugins.noOpLoaderSwap());
+                retryWithToleranceOperator, null, statusBackingStore, 
errorReportersSupplier, null, TestPlugins.noOpLoaderSwap());
     }
 
     @AfterEach
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
index 74021118098..6c2c593c35b 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
@@ -183,7 +183,7 @@ public class WorkerSinkTaskThreadedTest {
                 taskId, sinkTask, statusListener, initialState, workerConfig, 
ClusterConfigState.EMPTY, metrics, keyConverterPlugin,
                 valueConverterPlugin, errorHandlingMetrics, 
headerConverterPlugin, transformationChain,
                 consumer, pluginLoader, time, 
RetryWithToleranceOperatorTest.noneOperator(), null, statusBackingStore,
-                Collections::emptyList, TestPlugins.noOpLoaderSwap());
+                Collections::emptyList, null, TestPlugins.noOpLoaderSwap());
         recordsReturned = 0;
     }
 
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index 23fb3618f81..d2fd923fdb9 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -255,7 +255,7 @@ public class WorkerSourceTaskTest {
         workerTask = new WorkerSourceTask(taskId, sourceTask, statusListener, 
initialState, keyConverterPlugin, valueConverterPlugin, errorHandlingMetrics, 
headerConverterPlugin,
                 transformationChain, producer, admin, 
TopicCreationGroup.configuredGroups(sourceConfig),
                 offsetReader, offsetWriter, offsetStore, config, 
clusterConfigState, metrics, plugins.delegatingLoader(), Time.SYSTEM,
-                retryWithToleranceOperator, statusBackingStore, Runnable::run, 
Collections::emptyList, TestPlugins.noOpLoaderSwap());
+                retryWithToleranceOperator, statusBackingStore, Runnable::run, 
Collections::emptyList, null, TestPlugins.noOpLoaderSwap());
     }
 
     @ParameterizedTest
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
index eae9c96998b..fa445454fd0 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
@@ -300,7 +300,7 @@ public class WorkerTaskTest {
                               Supplier<List<ErrorReporter<Object>>> 
errorReporterSupplier,
                               Time time, StatusBackingStore 
statusBackingStore) {
             super(id, statusListener, initialState, loader, connectMetrics, 
errorHandlingMetrics,
-                    retryWithToleranceOperator, transformationChain, 
errorReporterSupplier, time, statusBackingStore, TestPlugins.noOpLoaderSwap());
+                    retryWithToleranceOperator, transformationChain, 
errorReporterSupplier, time, statusBackingStore, null, 
TestPlugins.noOpLoaderSwap());
         }
 
         @Override
@@ -318,6 +318,11 @@ public class WorkerTaskTest {
         @Override
         protected void close() {
         }
+
+        @Override
+        protected String taskVersion() {
+            return null;
+        }
     }
 
     protected void assertFailedMetric(TaskMetricsGroup metricsGroup) {
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index 71dcabbedb6..38bcab1b594 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -63,6 +63,7 @@ import org.apache.kafka.connect.runtime.isolation.LoaderSwap;
 import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.runtime.isolation.Plugins.ClassLoaderUsage;
+import org.apache.kafka.connect.runtime.isolation.TestPlugins;
 import org.apache.kafka.connect.runtime.rest.RestServer;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
@@ -805,11 +806,11 @@ public class WorkerTest {
 
         // Each time we check the task metrics, the worker will call the herder
         when(herder.taskStatus(TASK_ID)).thenReturn(
-                new ConnectorStateInfo.TaskState(0, "RUNNING", "worker", 
"msg"),
-                new ConnectorStateInfo.TaskState(0, "PAUSED", "worker", "msg"),
-                new ConnectorStateInfo.TaskState(0, "FAILED", "worker", "msg"),
-                new ConnectorStateInfo.TaskState(0, "DESTROYED", "worker", 
"msg"),
-                new ConnectorStateInfo.TaskState(0, "UNASSIGNED", "worker", 
"msg")
+                new ConnectorStateInfo.TaskState(0, "RUNNING", "worker", 
"msg", null),
+                new ConnectorStateInfo.TaskState(0, "PAUSED", "worker", "msg", 
null),
+                new ConnectorStateInfo.TaskState(0, "FAILED", "worker", "msg", 
null),
+                new ConnectorStateInfo.TaskState(0, "DESTROYED", "worker", 
"msg", null),
+                new ConnectorStateInfo.TaskState(0, "UNASSIGNED", "worker", 
"msg", null)
         );
 
         worker = new Worker(WORKER_ID,
@@ -3072,6 +3073,7 @@ public class WorkerTest {
         when(plugins.pluginLoader(connectorClass.getName(), 
range)).thenReturn(pluginLoader);
         when(plugins.connectorClass(connectorClass.getName(), 
range)).thenReturn((Class) connectorClass);
         when(plugins.newTask(taskClass)).thenReturn(task);
+        
when(plugins.safeLoaderSwapper()).thenReturn(TestPlugins.noOpLoaderSwap());
         when(task.version()).thenReturn(range == null ? "unknown" : 
range.toString());
     }
 
@@ -3087,7 +3089,7 @@ public class WorkerTest {
         verify(plugins).pluginLoader(connectorClass.getName(), range);
         verify(plugins).connectorClass(connectorClass.getName(), range);
         verify(plugins).newTask(taskClass);
-        verify(task).version();
+        verify(task, times(2)).version();
     }
 
     private void mockExecutorRealSubmit(Class<? extends Runnable> 
runnableClass) {
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTestUtils.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTestUtils.java
index 3574137b6c3..3e1e75480b5 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTestUtils.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTestUtils.java
@@ -195,8 +195,12 @@ public class WorkerTestUtils {
         when(transformationPlugin.get()).thenReturn(transformation);
         TransformationStage<R> stage = new TransformationStage<>(
                 predicatePlugin,
+                "testPredicate",
+                null,
                 false,
                 transformationPlugin,
+                "testTransformation",
+                null,
                 TestPlugins.noOpLoaderSwap());
         TransformationChain<T, R> realTransformationChainRetriableException = 
new TransformationChain<>(List.of(stage), toleranceOperator);
         return Mockito.spy(realTransformationChainRetriableException);
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index 73eaf93961e..18589d66855 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -1348,6 +1348,7 @@ public class DistributedHerderTest {
             return true;
         }).when(worker).startConnector(eq(CONN1), any(), any(), eq(herder), 
any(), stateCallback.capture());
         doNothing().when(member).wakeup();
+        when(worker.connectorVersion(any())).thenReturn(null);
 
         herder.doRestartConnectorAndTasks(restartRequest);
 
@@ -1378,6 +1379,7 @@ public class DistributedHerderTest {
         doNothing().when(statusBackingStore).put(eq(status));
 
         when(worker.startSourceTask(eq(TASK0), any(), any(), any(), 
eq(herder), any())).thenReturn(true);
+        when(worker.taskVersion(any())).thenReturn(null);
 
         herder.doRestartConnectorAndTasks(restartRequest);
 
@@ -1419,6 +1421,8 @@ public class DistributedHerderTest {
         doNothing().when(statusBackingStore).put(eq(taskStatus));
 
         when(worker.startSourceTask(eq(TASK0), any(), any(), any(), 
eq(herder), any())).thenReturn(true);
+        when(worker.taskVersion(any())).thenReturn(null);
+        when(worker.connectorVersion(any())).thenReturn(null);
 
         herder.doRestartConnectorAndTasks(restartRequest);
 
@@ -1670,6 +1674,7 @@ public class DistributedHerderTest {
         when(member.memberId()).thenReturn("member");
         when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0);
         when(worker.isSinkConnector(CONN1)).thenReturn(Boolean.TRUE);
+        when(worker.connectorVersion(CONN1)).thenReturn(null);
 
         WorkerConfigTransformer configTransformer = mock(WorkerConfigTransformer.class);
         // join
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
index 9dfead77220..e38cd2da60d 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
@@ -688,9 +688,7 @@ public class ConnectorsResourceTest {
     @Test
     public void testRestartConnectorAndTasksRequestAccepted() throws Throwable 
{
         ConnectorStateInfo.ConnectorState state = new ConnectorStateInfo.ConnectorState(
-                AbstractStatus.State.RESTARTING.name(),
-                "foo",
-                null
+                AbstractStatus.State.RESTARTING.name(), "foo", null, null
         );
         ConnectorStateInfo connectorStateInfo = new ConnectorStateInfo(CONNECTOR_NAME, state, Collections.emptyList(), ConnectorType.SOURCE);
 
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
index 9e893e79eba..76b25131cc5 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
@@ -280,6 +280,7 @@ public class StandaloneHerderTest {
         expectConfigValidation(SourceSink.SOURCE, config);
 
         
when(statusBackingStore.getAll(CONNECTOR_NAME)).thenReturn(Collections.emptyList());
+        when(worker.connectorVersion(CONNECTOR_NAME)).thenReturn(null);
 
         herder.putConnectorConfig(CONNECTOR_NAME, config, false, createCallback);
         Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS);
@@ -533,6 +534,7 @@ public class StandaloneHerderTest {
         expectConfigValidation(SourceSink.SINK, connectorConfig);
 
         doNothing().when(worker).stopAndAwaitConnector(CONNECTOR_NAME);
+        when(worker.connectorVersion(CONNECTOR_NAME)).thenReturn(null);
 
         mockStartConnector(connectorConfig, null, TargetState.STARTED, null);
 
@@ -563,6 +565,7 @@ public class StandaloneHerderTest {
         
doReturn(Optional.of(restartPlan)).when(herder).buildRestartPlan(restartRequest);
 
         expectAdd(SourceSink.SINK);
+        when(worker.taskVersion(any())).thenReturn(null);
 
         Map<String, String> connectorConfig = connectorConfig(SourceSink.SINK);
         expectConfigValidation(SourceSink.SINK, connectorConfig);
@@ -616,6 +619,8 @@ public class StandaloneHerderTest {
         ArgumentCaptor<TaskStatus> taskStatus = ArgumentCaptor.forClass(TaskStatus.class);
 
         expectAdd(SourceSink.SINK, false);
+        when(worker.connectorVersion(any())).thenReturn(null);
+        when(worker.taskVersion(any())).thenReturn(null);
 
         Map<String, String> connectorConfig = connectorConfig(SourceSink.SINK);
         expectConfigValidation(SourceSink.SINK, connectorConfig);
@@ -1124,6 +1129,7 @@ public class StandaloneHerderTest {
         }
 
         when(worker.isRunning(CONNECTOR_NAME)).thenReturn(true);
+
         if (sourceSink == SourceSink.SOURCE) {
             when(worker.isTopicCreationEnabled()).thenReturn(true);
         }
@@ -1152,6 +1158,7 @@ public class StandaloneHerderTest {
             transformer);
 
         if (sourceSink.equals(SourceSink.SOURCE) && mockStartSourceTask) {
+            when(worker.taskVersion(any())).thenReturn(null);
             when(worker.startSourceTask(new ConnectorTaskId(CONNECTOR_NAME, 0), configState, connectorConfig(sourceSink), generatedTaskProps, herder, TargetState.STARTED)).thenReturn(true);
         }
 


Reply via email to