This is an automated email from the ASF dual-hosted git repository.
gharris pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d6133f6997e KAFKA-18988: Connect Multiversion Support (Updates to
status and metrics) (#17988)
d6133f6997e is described below
commit d6133f6997e0de3931f3d98b30950aeeefd23e70
Author: snehashisp <[email protected]>
AuthorDate: Thu Apr 24 22:53:31 2025 +0530
KAFKA-18988: Connect Multiversion Support (Updates to status and metrics)
(#17988)
Reviewers: Greg Harris <[email protected]>
---
.../kafka/connect/runtime/AbstractHerder.java | 50 ++++----
.../kafka/connect/runtime/AbstractStatus.java | 22 +++-
.../connect/runtime/AbstractWorkerSourceTask.java | 10 +-
.../connect/runtime/ConnectMetricsRegistry.java | 72 +++++++++++
.../kafka/connect/runtime/ConnectorConfig.java | 22 +++-
.../kafka/connect/runtime/ConnectorStatus.java | 8 +-
.../runtime/ExactlyOnceWorkerSourceTask.java | 3 +-
.../kafka/connect/runtime/TaskPluginsMetadata.java | 131 +++++++++++++++++++++
.../apache/kafka/connect/runtime/TaskStatus.java | 4 +-
.../kafka/connect/runtime/TransformationChain.java | 5 +
.../kafka/connect/runtime/TransformationStage.java | 57 ++++++++-
.../org/apache/kafka/connect/runtime/Worker.java | 50 ++++++--
.../kafka/connect/runtime/WorkerConnector.java | 17 ++-
.../kafka/connect/runtime/WorkerSinkTask.java | 10 +-
.../kafka/connect/runtime/WorkerSourceTask.java | 3 +-
.../apache/kafka/connect/runtime/WorkerTask.java | 62 +++++++++-
.../runtime/rest/entities/ConnectorStateInfo.java | 21 +++-
.../connect/storage/KafkaStatusBackingStore.java | 9 +-
.../runtime/AbstractWorkerSourceTaskTest.java | 2 +-
.../connect/runtime/ErrorHandlingTaskTest.java | 12 +-
.../runtime/ExactlyOnceWorkerSourceTaskTest.java | 5 +-
.../kafka/connect/runtime/RestartPlanTest.java | 32 ++---
.../connect/runtime/TransformationStageTest.java | 4 +
.../kafka/connect/runtime/WorkerSinkTaskTest.java | 2 +-
.../runtime/WorkerSinkTaskThreadedTest.java | 2 +-
.../connect/runtime/WorkerSourceTaskTest.java | 2 +-
.../kafka/connect/runtime/WorkerTaskTest.java | 7 +-
.../apache/kafka/connect/runtime/WorkerTest.java | 14 ++-
.../kafka/connect/runtime/WorkerTestUtils.java | 4 +
.../runtime/distributed/DistributedHerderTest.java | 5 +
.../rest/resources/ConnectorsResourceTest.java | 4 +-
.../runtime/standalone/StandaloneHerderTest.java | 7 ++
32 files changed, 556 insertions(+), 102 deletions(-)
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index 5a787909925..bce67129388 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -203,83 +203,91 @@ public abstract class AbstractHerder implements Herder,
TaskStatus.Listener, Con
@Override
public void onStartup(String connector) {
statusBackingStore.put(new ConnectorStatus(connector,
ConnectorStatus.State.RUNNING,
- workerId, generation()));
+ workerId, generation(), worker.connectorVersion(connector)));
}
@Override
public void onStop(String connector) {
statusBackingStore.put(new ConnectorStatus(connector,
AbstractStatus.State.STOPPED,
- workerId, generation()));
+ workerId, generation(), worker.connectorVersion(connector)));
}
@Override
public void onPause(String connector) {
statusBackingStore.put(new ConnectorStatus(connector,
ConnectorStatus.State.PAUSED,
- workerId, generation()));
+ workerId, generation(), worker.connectorVersion(connector)));
}
@Override
public void onResume(String connector) {
statusBackingStore.put(new ConnectorStatus(connector,
TaskStatus.State.RUNNING,
- workerId, generation()));
+ workerId, generation(), worker.connectorVersion(connector)));
}
@Override
public void onShutdown(String connector) {
statusBackingStore.putSafe(new ConnectorStatus(connector,
ConnectorStatus.State.UNASSIGNED,
- workerId, generation()));
+ workerId, generation(), worker.connectorVersion(connector)));
}
@Override
public void onFailure(String connector, Throwable cause) {
statusBackingStore.putSafe(new ConnectorStatus(connector,
ConnectorStatus.State.FAILED,
- trace(cause), workerId, generation()));
+ trace(cause), workerId, generation(),
worker.connectorVersion(connector)));
}
@Override
public void onStartup(ConnectorTaskId id) {
- statusBackingStore.put(new TaskStatus(id, TaskStatus.State.RUNNING,
workerId, generation()));
+ statusBackingStore.put(new TaskStatus(id, TaskStatus.State.RUNNING,
workerId, generation(), null,
+ worker.taskVersion(id)));
}
@Override
public void onFailure(ConnectorTaskId id, Throwable cause) {
- statusBackingStore.putSafe(new TaskStatus(id, TaskStatus.State.FAILED,
workerId, generation(), trace(cause)));
+ statusBackingStore.putSafe(new TaskStatus(id, TaskStatus.State.FAILED,
workerId, generation(), trace(cause),
+ worker.taskVersion(id)));
}
@Override
public void onShutdown(ConnectorTaskId id) {
- statusBackingStore.putSafe(new TaskStatus(id,
TaskStatus.State.UNASSIGNED, workerId, generation()));
+ statusBackingStore.putSafe(new TaskStatus(id,
TaskStatus.State.UNASSIGNED, workerId, generation(), null,
+ worker.taskVersion(id)));
}
@Override
public void onResume(ConnectorTaskId id) {
- statusBackingStore.put(new TaskStatus(id, TaskStatus.State.RUNNING,
workerId, generation()));
+ statusBackingStore.put(new TaskStatus(id, TaskStatus.State.RUNNING,
workerId, generation(), null,
+ worker.taskVersion(id)));
}
@Override
public void onPause(ConnectorTaskId id) {
- statusBackingStore.put(new TaskStatus(id, TaskStatus.State.PAUSED,
workerId, generation()));
+ statusBackingStore.put(new TaskStatus(id, TaskStatus.State.PAUSED,
workerId, generation(), null,
+ worker.taskVersion(id)));
}
@Override
public void onDeletion(String connector) {
for (TaskStatus status : statusBackingStore.getAll(connector))
onDeletion(status.id());
- statusBackingStore.put(new ConnectorStatus(connector,
ConnectorStatus.State.DESTROYED, workerId, generation()));
+ statusBackingStore.put(new ConnectorStatus(connector,
ConnectorStatus.State.DESTROYED, workerId, generation(),
+ worker.connectorVersion(connector)));
}
@Override
public void onDeletion(ConnectorTaskId id) {
- statusBackingStore.put(new TaskStatus(id, TaskStatus.State.DESTROYED,
workerId, generation()));
+ statusBackingStore.put(new TaskStatus(id, TaskStatus.State.DESTROYED,
workerId, generation(), null,
+ worker.taskVersion(id)));
}
public void onRestart(String connector) {
statusBackingStore.put(new ConnectorStatus(connector,
ConnectorStatus.State.RESTARTING,
- workerId, generation()));
+ workerId, generation(), worker.connectorVersion(connector)));
}
public void onRestart(ConnectorTaskId id) {
- statusBackingStore.put(new TaskStatus(id, TaskStatus.State.RESTARTING,
workerId, generation()));
+ statusBackingStore.put(new TaskStatus(id, TaskStatus.State.RESTARTING,
workerId, generation(), null,
+ worker.taskVersion(id)));
}
@Override
@@ -347,12 +355,12 @@ public abstract class AbstractHerder implements Herder,
TaskStatus.Listener, Con
Collection<TaskStatus> tasks = statusBackingStore.getAll(connName);
ConnectorStateInfo.ConnectorState connectorState = new
ConnectorStateInfo.ConnectorState(
- connector.state().toString(), connector.workerId(),
connector.trace());
+ connector.state().toString(), connector.workerId(),
connector.trace(), connector.version());
List<ConnectorStateInfo.TaskState> taskStates = new ArrayList<>();
for (TaskStatus status : tasks) {
taskStates.add(new ConnectorStateInfo.TaskState(status.id().task(),
- status.state().toString(), status.workerId(),
status.trace()));
+ status.state().toString(), status.workerId(),
status.trace(), status.version()));
}
Collections.sort(taskStates);
@@ -388,7 +396,7 @@ public abstract class AbstractHerder implements Herder,
TaskStatus.Listener, Con
throw new NotFoundException("No status found for task " + id);
return new ConnectorStateInfo.TaskState(id.task(),
status.state().toString(),
- status.workerId(), status.trace());
+ status.workerId(), status.trace(), status.version());
}
@Override
@@ -626,7 +634,8 @@ public abstract class AbstractHerder implements Herder,
TaskStatus.Listener, Con
ConnectorStateInfo.ConnectorState connectorInfoState = new
ConnectorStateInfo.ConnectorState(
connectorState.toString(),
connectorStatus.workerId(),
- connectorStatus.trace()
+ connectorStatus.trace(),
+ connectorStatus.version()
);
// Collect the task states, If requested, mark the task as restarting
@@ -638,7 +647,8 @@ public abstract class AbstractHerder implements Herder,
TaskStatus.Listener, Con
taskStatus.id().task(),
taskState.toString(),
taskStatus.workerId(),
- taskStatus.trace()
+ taskStatus.trace(),
+ taskStatus.version()
);
})
.collect(Collectors.toList());
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractStatus.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractStatus.java
index 76036d610d7..fc8bc7ca050 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractStatus.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractStatus.java
@@ -34,18 +34,29 @@ public abstract class AbstractStatus<T> {
private final State state;
private final String trace;
private final String workerId;
+ private final String version;
private final int generation;
public AbstractStatus(T id,
State state,
String workerId,
int generation,
- String trace) {
+ String trace,
+ String version) {
this.id = id;
this.state = state;
this.workerId = workerId;
this.generation = generation;
this.trace = trace;
+ this.version = version;
+ }
+
+ public AbstractStatus(T id,
+ State state,
+ String workerId,
+ int generation,
+ String trace) {
+ this(id, state, workerId, generation, trace, null);
}
public T id() {
@@ -68,12 +79,17 @@ public abstract class AbstractStatus<T> {
return generation;
}
+ public String version() {
+ return version;
+ }
+
@Override
public String toString() {
return "Status{" +
"id=" + id +
", state=" + state +
", workerId='" + workerId + '\'' +
+ ", version='" + version + '\'' +
", generation=" + generation +
'}';
}
@@ -89,7 +105,8 @@ public abstract class AbstractStatus<T> {
&& Objects.equals(id, that.id)
&& state == that.state
&& Objects.equals(trace, that.trace)
- && Objects.equals(workerId, that.workerId);
+ && Objects.equals(workerId, that.workerId)
+ && Objects.equals(version, that.version);
}
@Override
@@ -98,6 +115,7 @@ public abstract class AbstractStatus<T> {
result = 31 * result + (state != null ? state.hashCode() : 0);
result = 31 * result + (trace != null ? trace.hashCode() : 0);
result = 31 * result + (workerId != null ? workerId.hashCode() : 0);
+ result = 31 * result + (version != null ? version.hashCode() : 0);
result = 31 * result + generation;
return result;
}
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
index 683eb3abed0..9a74d81770f 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
@@ -203,6 +203,7 @@ public abstract class AbstractWorkerSourceTask extends
WorkerTask<SourceRecord,
private final boolean topicTrackingEnabled;
private final TopicCreation topicCreation;
private final Executor closeExecutor;
+ private final String version;
// Visible for testing
List<SourceRecord> toSend;
@@ -236,11 +237,12 @@ public abstract class AbstractWorkerSourceTask extends
WorkerTask<SourceRecord,
StatusBackingStore statusBackingStore,
Executor closeExecutor,
Supplier<List<ErrorReporter<SourceRecord>>> errorReportersSupplier,
+ TaskPluginsMetadata pluginsMetadata,
Function<ClassLoader, LoaderSwap>
pluginLoaderSwapper) {
super(id, statusListener, initialState, loader, connectMetrics,
errorMetrics,
retryWithToleranceOperator, transformationChain,
errorReportersSupplier,
- time, statusBackingStore, pluginLoaderSwapper);
+ time, statusBackingStore, pluginsMetadata,
pluginLoaderSwapper);
this.workerConfig = workerConfig;
this.task = task;
@@ -258,6 +260,7 @@ public abstract class AbstractWorkerSourceTask extends
WorkerTask<SourceRecord,
this.sourceTaskMetricsGroup = new SourceTaskMetricsGroup(id,
connectMetrics);
this.topicTrackingEnabled =
workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG);
this.topicCreation = TopicCreation.newTopicCreation(workerConfig,
topicGroups);
+ this.version = task.version();
}
@Override
@@ -391,6 +394,11 @@ public abstract class AbstractWorkerSourceTask extends
WorkerTask<SourceRecord,
finalOffsetCommit(false);
}
+ @Override
+ public String taskVersion() {
+ return version;
+ }
+
/**
* Try to send a batch of records. If a send fails and is retriable, this
saves the remainder of the batch so it can
* be retried after backing off. If a send fails and is not retriable,
this will throw a ConnectException.
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java
index 1d144440f2c..496c838cf45 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java
@@ -37,6 +37,10 @@ public class ConnectMetricsRegistry {
public static final String WORKER_GROUP_NAME = "connect-worker-metrics";
public static final String WORKER_REBALANCE_GROUP_NAME =
"connect-worker-rebalance-metrics";
public static final String TASK_ERROR_HANDLING_GROUP_NAME =
"task-error-metrics";
+ public static final String TRANSFORMS_GROUP =
"connector-transform-metrics";
+ public static final String PREDICATES_GROUP =
"connector-predicate-metrics";
+ public static final String TRANSFORM_TAG_NAME = "transform";
+ public static final String PREDICATE_TAG_NAME = "predicate";
private final List<MetricNameTemplate> allTemplates = new ArrayList<>();
public final MetricNameTemplate connectorStatus;
@@ -59,6 +63,17 @@ public class ConnectMetricsRegistry {
public final MetricNameTemplate taskBatchSizeAvg;
public final MetricNameTemplate taskCommitFailurePercentage;
public final MetricNameTemplate taskCommitSuccessPercentage;
+ public final MetricNameTemplate taskConnectorClass;
+ public final MetricNameTemplate taskConnectorClassVersion;
+ public final MetricNameTemplate taskConnectorType;
+ public final MetricNameTemplate taskClass;
+ public final MetricNameTemplate taskVersion;
+ public final MetricNameTemplate taskKeyConverterClass;
+ public final MetricNameTemplate taskValueConverterClass;
+ public final MetricNameTemplate taskKeyConverterVersion;
+ public final MetricNameTemplate taskValueConverterVersion;
+ public final MetricNameTemplate taskHeaderConverterClass;
+ public final MetricNameTemplate taskHeaderConverterVersion;
public final MetricNameTemplate sourceRecordPollRate;
public final MetricNameTemplate sourceRecordPollTotal;
public final MetricNameTemplate sourceRecordWriteRate;
@@ -115,6 +130,10 @@ public class ConnectMetricsRegistry {
public final MetricNameTemplate transactionSizeMin;
public final MetricNameTemplate transactionSizeMax;
public final MetricNameTemplate transactionSizeAvg;
+ public final MetricNameTemplate transformClass;
+ public final MetricNameTemplate transformVersion;
+ public final MetricNameTemplate predicateClass;
+ public final MetricNameTemplate predicateVersion;
public Map<MetricNameTemplate, TaskStatus.State> connectorStatusMetrics;
@@ -164,6 +183,43 @@ public class ConnectMetricsRegistry {
taskCommitSuccessPercentage =
createTemplate("offset-commit-success-percentage", TASK_GROUP_NAME,
"The average percentage
of this task's offset commit attempts that succeeded.",
workerTaskTags);
+ taskConnectorClass = createTemplate("connector-class",
TASK_GROUP_NAME, "The name of the connector class.", workerTaskTags);
+ taskConnectorClassVersion = createTemplate("connector-version",
TASK_GROUP_NAME,
+ "The version of the
connector class, as reported by the connector.", workerTaskTags);
+ taskConnectorType = createTemplate("connector-type", TASK_GROUP_NAME,
"The type of the connector. One of 'source' or 'sink'.",
+ workerTaskTags);
+ taskClass = createTemplate("task-class", TASK_GROUP_NAME, "The class
name of the task.", workerTaskTags);
+ taskVersion = createTemplate("task-version", TASK_GROUP_NAME, "The
version of the task.", workerTaskTags);
+ taskKeyConverterClass = createTemplate("key-converter-class",
TASK_GROUP_NAME,
+ "The fully qualified class name
from key.converter", workerTaskTags);
+ taskValueConverterClass = createTemplate("value-converter-class",
TASK_GROUP_NAME,
+ "The fully qualified class name
from value.converter", workerTaskTags);
+ taskKeyConverterVersion = createTemplate("key-converter-version",
TASK_GROUP_NAME,
+ "The version instantiated for
key.converter. May be undefined", workerTaskTags);
+ taskValueConverterVersion = createTemplate("value-converter-version",
TASK_GROUP_NAME,
+ "The version instantiated for
value.converter. May be undefined", workerTaskTags);
+ taskHeaderConverterClass = createTemplate("header-converter-class",
TASK_GROUP_NAME,
+ "The fully qualified class
name from header.converter", workerTaskTags);
+ taskHeaderConverterVersion =
createTemplate("header-converter-version", TASK_GROUP_NAME,
+ "The version instantiated
for header.converter. May be undefined", workerTaskTags);
+
+ /* Transformation Metrics */
+ Set<String> transformTags = new LinkedHashSet<>(tags);
+ transformTags.addAll(workerTaskTags);
+ transformTags.add(TRANSFORM_TAG_NAME);
+ transformClass = createTemplate("transform-class", TRANSFORMS_GROUP,
+ "The class name of the transformation class", transformTags);
+ transformVersion = createTemplate("transform-version",
TRANSFORMS_GROUP,
+ "The version of the transformation class", transformTags);
+
+ /* Predicate Metrics */
+ Set<String> predicateTags = new LinkedHashSet<>(tags);
+ predicateTags.addAll(workerTaskTags);
+ predicateTags.add(PREDICATE_TAG_NAME);
+ predicateClass = createTemplate("predicate-class", PREDICATES_GROUP,
+ "The class name of the predicate class", predicateTags);
+ predicateVersion = createTemplate("predicate-version",
PREDICATES_GROUP,
+ "The version of the predicate class", predicateTags);
/* Source worker task level */
Set<String> sourceTaskTags = new LinkedHashSet<>(tags);
@@ -426,4 +482,20 @@ public class ConnectMetricsRegistry {
public String taskErrorHandlingGroupName() {
return TASK_ERROR_HANDLING_GROUP_NAME;
}
+
+ public String transformsGroupName() {
+ return TRANSFORMS_GROUP;
+ }
+
+ public String transformsTagName() {
+ return TRANSFORM_TAG_NAME;
+ }
+
+ public String predicatesGroupName() {
+ return PREDICATES_GROUP;
+ }
+
+ public String predicateTagName() {
+ return PREDICATE_TAG_NAME;
+ }
}
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
index 3a301335502..efd421bd2e2 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
@@ -374,7 +374,7 @@ public class ConnectorConfig extends AbstractConfig {
final String versionConfig = prefix +
WorkerConfig.PLUGIN_VERSION_SUFFIX;
final Transformation<R> transformation =
getTransformationOrPredicate(plugins, typeConfig, versionConfig);
Map<String, Object> configs = originalsWithPrefix(prefix);
- Object predicateAlias =
configs.remove(TransformationStage.PREDICATE_CONFIG);
+ String predicateAlias = (String)
configs.remove(TransformationStage.PREDICATE_CONFIG);
Object negate =
configs.remove(TransformationStage.NEGATE_CONFIG);
transformation.configure(configs);
Plugin<Transformation<R>> transformationPlugin =
metrics.wrap(transformation, connectorTaskId, alias);
@@ -384,10 +384,24 @@ public class ConnectorConfig extends AbstractConfig {
final String predicateVersionConfig = predicatePrefix +
WorkerConfig.PLUGIN_VERSION_SUFFIX;
Predicate<R> predicate =
getTransformationOrPredicate(plugins, predicateTypeConfig,
predicateVersionConfig);
predicate.configure(originalsWithPrefix(predicatePrefix));
- Plugin<Predicate<R>> predicatePlugin =
metrics.wrap(predicate, connectorTaskId, (String) predicateAlias);
- transformations.add(new
TransformationStage<>(predicatePlugin, negate != null &&
Boolean.parseBoolean(negate.toString()), transformationPlugin,
plugins.safeLoaderSwapper()));
+ Plugin<Predicate<R>> predicatePlugin =
metrics.wrap(predicate, connectorTaskId, predicateAlias);
+ transformations.add(new TransformationStage<>(
+ predicatePlugin,
+ predicateAlias,
+ plugins.pluginVersion(predicate.getClass().getName(),
predicate.getClass().getClassLoader(), PluginType.PREDICATE),
+ negate != null &&
Boolean.parseBoolean(negate.toString()),
+ transformationPlugin,
+ alias,
+
plugins.pluginVersion(transformation.getClass().getName(),
transformation.getClass().getClassLoader(), PluginType.TRANSFORMATION),
+ plugins.safeLoaderSwapper())
+ );
} else {
- transformations.add(new
TransformationStage<>(transformationPlugin, plugins.safeLoaderSwapper()));
+ transformations.add(new TransformationStage<>(
+ transformationPlugin,
+ alias,
+
plugins.pluginVersion(transformation.getClass().getName(),
transformation.getClass().getClassLoader(), PluginType.TRANSFORMATION),
+ plugins.safeLoaderSwapper())
+ );
}
} catch (Exception e) {
throw new ConnectException(e);
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorStatus.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorStatus.java
index 10ed188cdf8..d704a3374e2 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorStatus.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorStatus.java
@@ -19,8 +19,12 @@ package org.apache.kafka.connect.runtime;
public class ConnectorStatus extends AbstractStatus<String> {
- public ConnectorStatus(String connector, State state, String msg, String
workerUrl, int generation) {
- super(connector, state, workerUrl, generation, msg);
+ public ConnectorStatus(String connector, State state, String msg, String
workerUrl, int generation, String version) {
+ super(connector, state, workerUrl, generation, msg, version);
+ }
+
+ public ConnectorStatus(String connector, State state, String workerUrl,
int generation, String version) {
+ super(connector, state, workerUrl, generation, null, version);
}
public ConnectorStatus(String connector, State state, String workerUrl,
int generation) {
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
index 42e43babe55..d6f4ffbd4b9 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
@@ -104,11 +104,12 @@ class ExactlyOnceWorkerSourceTask extends
AbstractWorkerSourceTask {
Runnable preProducerCheck,
Runnable postProducerCheck,
Supplier<List<ErrorReporter<SourceRecord>>> errorReportersSupplier,
+ TaskPluginsMetadata pluginsMetadata,
Function<ClassLoader, LoaderSwap>
pluginLoaderSwapper) {
super(id, task, statusListener, initialState, configState,
keyConverterPlugin, valueConverterPlugin, headerConverterPlugin,
transformationChain,
buildTransactionContext(sourceConfig),
producer, admin, topicGroups, offsetReader, offsetWriter,
offsetStore, workerConfig, connectMetrics, errorMetrics,
- loader, time, retryWithToleranceOperator, statusBackingStore,
closeExecutor, errorReportersSupplier, pluginLoaderSwapper);
+ loader, time, retryWithToleranceOperator, statusBackingStore,
closeExecutor, errorReportersSupplier, pluginsMetadata, pluginLoaderSwapper);
this.transactionOpen = false;
this.committableRecords = new LinkedHashMap<>();
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TaskPluginsMetadata.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TaskPluginsMetadata.java
new file mode 100644
index 00000000000..14e6cb9b7a7
--- /dev/null
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TaskPluginsMetadata.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.runtime.isolation.PluginType;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorType;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class TaskPluginsMetadata {
+
+ private final String connectorClass;
+ private final String connectorVersion;
+ private final ConnectorType connectorType;
+ private final String taskClass;
+ private final String taskVersion;
+ private final String keyConverterClass;
+ private final String keyConverterVersion;
+ private final String valueConverterClass;
+ private final String valueConverterVersion;
+ private final String headerConverterClass;
+ private final String headerConverterVersion;
+ private final Set<TransformationStage.AliasedPluginInfo> transformations;
+ private final Set<TransformationStage.AliasedPluginInfo> predicates;
+
+ public TaskPluginsMetadata(
+ Class<? extends Connector> connectorClass,
+ Task task,
+ Converter keyConverter,
+ Converter valueConverter,
+ HeaderConverter headerConverter,
+ List<TransformationStage.StageInfo> transformationStageInfo,
+ Plugins plugins
+ ) {
+
+ assert connectorClass != null;
+ assert task != null;
+ assert keyConverter != null;
+ assert valueConverter != null;
+ assert headerConverter != null;
+ assert transformationStageInfo != null;
+
+ this.connectorClass = connectorClass.getName();
+ this.connectorVersion =
plugins.pluginVersion(connectorClass.getName(),
connectorClass.getClassLoader(), PluginType.SINK, PluginType.SOURCE);
+ this.connectorType = ConnectorType.from(connectorClass);
+ this.taskClass = task.getClass().getName();
+ this.taskVersion = task.version();
+ this.keyConverterClass = keyConverter.getClass().getName();
+ this.keyConverterVersion =
plugins.pluginVersion(keyConverter.getClass().getName(),
keyConverter.getClass().getClassLoader(), PluginType.CONVERTER);
+ this.valueConverterClass = valueConverter.getClass().getName();
+ this.valueConverterVersion =
plugins.pluginVersion(valueConverter.getClass().getName(),
valueConverter.getClass().getClassLoader(), PluginType.CONVERTER);
+ this.headerConverterClass = headerConverter.getClass().getName();
+ this.headerConverterVersion =
plugins.pluginVersion(headerConverter.getClass().getName(),
headerConverter.getClass().getClassLoader(), PluginType.HEADER_CONVERTER);
+ this.transformations =
transformationStageInfo.stream().map(TransformationStage.StageInfo::transform).collect(Collectors.toSet());
+ this.predicates =
transformationStageInfo.stream().map(TransformationStage.StageInfo::predicate).filter(Objects::nonNull).collect(Collectors.toSet());
+ }
+
+ public String connectorClass() {
+ return connectorClass;
+ }
+
+ public String connectorVersion() {
+ return connectorVersion;
+ }
+
+ public ConnectorType connectorType() {
+ return connectorType;
+ }
+
+ public String taskClass() {
+ return taskClass;
+ }
+
+ public String taskVersion() {
+ return taskVersion;
+ }
+
+ public String keyConverterClass() {
+ return keyConverterClass;
+ }
+
+ public String keyConverterVersion() {
+ return keyConverterVersion;
+ }
+
+ public String valueConverterClass() {
+ return valueConverterClass;
+ }
+
+ public String valueConverterVersion() {
+ return valueConverterVersion;
+ }
+
+ public String headerConverterClass() {
+ return headerConverterClass;
+ }
+
+ public String headerConverterVersion() {
+ return headerConverterVersion;
+ }
+
+ public Set<TransformationStage.AliasedPluginInfo> transformations() {
+ return transformations;
+ }
+
+ public Set<TransformationStage.AliasedPluginInfo> predicates() {
+ return predicates;
+ }
+}
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TaskStatus.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TaskStatus.java
index e35efcafe2e..45150ef7ef5 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TaskStatus.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TaskStatus.java
@@ -20,8 +20,8 @@ import org.apache.kafka.connect.util.ConnectorTaskId;
public class TaskStatus extends AbstractStatus<ConnectorTaskId> {
- public TaskStatus(ConnectorTaskId id, State state, String workerUrl, int
generation, String trace) {
- super(id, state, workerUrl, generation, trace);
+ public TaskStatus(ConnectorTaskId id, State state, String workerUrl, int
generation, String trace, String version) {
+ super(id, state, workerUrl, generation, trace, version);
}
public TaskStatus(ConnectorTaskId id, State state, String workerUrl, int
generation) {
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationChain.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationChain.java
index f6b92697c44..68d52f2c1ca 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationChain.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationChain.java
@@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Objects;
import java.util.StringJoiner;
+import java.util.stream.Collectors;
/**
* Represents a chain of {@link Transformation}s to be applied to a {@link
ConnectRecord} serially.
@@ -89,4 +90,8 @@ public class TransformationChain<T, R extends
ConnectRecord<R>> implements AutoC
}
return chain.toString();
}
+
+ public List<TransformationStage.StageInfo> transformationChainInfo() {
+ return
transformationStages.stream().map(TransformationStage::transformationStageInfo).collect(Collectors.toList());
+ }
}
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationStage.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationStage.java
index a86c4878ab3..56293e03632 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationStage.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationStage.java
@@ -24,6 +24,7 @@ import org.apache.kafka.connect.runtime.isolation.LoaderSwap;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.predicates.Predicate;
+import java.util.Objects;
import java.util.function.Function;
/**
@@ -39,18 +40,40 @@ public class TransformationStage<R extends
ConnectRecord<R>> implements AutoClos
private final Plugin<Predicate<R>> predicatePlugin;
private final Plugin<Transformation<R>> transformationPlugin;
private final boolean negate;
+ private final String transformAlias;
+ private final String predicateAlias;
+ private final String transformVersion;
+ private final String predicateVersion;
private final Function<ClassLoader, LoaderSwap> pluginLoaderSwapper;
- TransformationStage(Plugin<Transformation<R>> transformationPlugin,
Function<ClassLoader, LoaderSwap> pluginLoaderSwapper) {
- this(null, false, transformationPlugin, pluginLoaderSwapper);
+ TransformationStage(
+ Plugin<Transformation<R>> transformationPlugin,
+ String transformAlias,
+ String transformVersion,
+ Function<ClassLoader, LoaderSwap> pluginLoaderSwapper
+ ) {
+ this(null, null, null, false, transformationPlugin, transformAlias,
transformVersion, pluginLoaderSwapper);
}
- TransformationStage(Plugin<Predicate<R>> predicatePlugin, boolean negate,
Plugin<Transformation<R>> transformationPlugin, Function<ClassLoader,
LoaderSwap> pluginLoaderSwapper) {
+ TransformationStage(
+ Plugin<Predicate<R>> predicatePlugin,
+ String predicateAlias,
+ String predicateVersion,
+ boolean negate,
+ Plugin<Transformation<R>> transformationPlugin,
+ String transformAlias,
+ String transformVersion,
+ Function<ClassLoader, LoaderSwap> pluginLoaderSwapper
+ ) {
this.predicatePlugin = predicatePlugin;
this.negate = negate;
this.transformationPlugin = transformationPlugin;
this.pluginLoaderSwapper = pluginLoaderSwapper;
+ this.transformAlias = transformAlias;
+ this.predicateAlias = predicateAlias;
+ this.transformVersion = transformVersion;
+ this.predicateVersion = predicateVersion;
}
public Class<? extends Transformation<R>> transformClass() {
@@ -89,4 +112,32 @@ public class TransformationStage<R extends
ConnectRecord<R>> implements AutoClos
", negate=" + negate +
'}';
}
+
+ public record AliasedPluginInfo(String alias, String className, String
version) {
+ public AliasedPluginInfo {
+ Objects.requireNonNull(alias, "alias cannot be null");
+ Objects.requireNonNull(className, "className cannot be null");
+ }
+ }
+
+
+ public record StageInfo(AliasedPluginInfo transform, AliasedPluginInfo
predicate) {
+ public StageInfo {
+ Objects.requireNonNull(transform, "transform cannot be null");
+ }
+ }
+
+
+ public StageInfo transformationStageInfo() {
+ AliasedPluginInfo transformInfo = new AliasedPluginInfo(
+ transformAlias,
+ transformationPlugin.get().getClass().getName(),
+ transformVersion
+ );
+ AliasedPluginInfo predicateInfo = predicatePlugin != null ? new
AliasedPluginInfo(
+ predicateAlias,
+ predicatePlugin.get().getClass().getName(), predicateVersion
+ ) : null;
+ return new StageInfo(transformInfo, predicateInfo);
+ }
}
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 1c1acc5647e..a3e914d3f90 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -346,7 +346,7 @@ public final class Worker {
}
workerConnector = new WorkerConnector(
connName, connector, connConfig, ctx, metrics,
connectorStatusListener, offsetReader, offsetStore, connectorLoader);
- log.info("Instantiated connector {} with version {} of
type {}", connName, connector.version(), connector.getClass());
+ log.info("Instantiated connector {} with version {} of
type {}", connName, workerConnector.connectorVersion(), connector.getClass());
workerConnector.transitionTo(initialState,
onConnectorStateChange);
}
} catch (Throwable t) {
@@ -562,6 +562,22 @@ public final class Worker {
return workerConnector != null && workerConnector.isRunning();
}
+ public String connectorVersion(String connName) {
+ WorkerConnector conn = connectors.get(connName);
+ if (conn == null) {
+ return null;
+ }
+ return conn.connectorVersion();
+ }
+
+ public String taskVersion(ConnectorTaskId taskId) {
+ WorkerTask<?, ?> task = tasks.get(taskId);
+ if (task == null) {
+ return null;
+ }
+ return task.taskVersion();
+ }
+
/**
* Start a sink task managed by this worker.
*
@@ -714,7 +730,7 @@ public final class Worker {
.withKeyConverterPlugin(metrics.wrap(keyConverter, id,
true))
.withValueConverterPlugin(metrics.wrap(valueConverter,
id, false))
.withHeaderConverterPlugin(metrics.wrap(headerConverter, id))
- .withClassloader(connectorLoader)
+ .withClassLoader(connectorLoader)
.build();
workerTask.initialize(taskConfig);
@@ -1814,11 +1830,12 @@ public final class Worker {
return this;
}
- public TaskBuilder<T, R> withClassloader(ClassLoader classLoader) {
+ public TaskBuilder<T, R> withClassLoader(ClassLoader classLoader) {
this.classLoader = classLoader;
return this;
}
+
public WorkerTask<T, R> build() {
Objects.requireNonNull(task, "Task cannot be null");
Objects.requireNonNull(connectorConfig, "Connector config used by
task cannot be null");
@@ -1836,10 +1853,13 @@ public final class Worker {
TransformationChain<T, R> transformationChain = new
TransformationChain<>(connectorConfig.<R>transformationStages(plugins, id,
metrics), retryWithToleranceOperator);
log.info("Initializing: {}", transformationChain);
+ TaskPluginsMetadata taskPluginsMetadata = new TaskPluginsMetadata(
+ connectorClass, task, keyConverterPlugin.get(),
valueConverterPlugin.get(), headerConverterPlugin.get(),
transformationChain.transformationChainInfo(), plugins);
+
return doBuild(task, id, configState, statusListener, initialState,
- connectorConfig, keyConverterPlugin, valueConverterPlugin,
headerConverterPlugin, classLoader,
- retryWithToleranceOperator, transformationChain,
- errorHandlingMetrics, connectorClass);
+ connectorConfig, keyConverterPlugin, valueConverterPlugin,
headerConverterPlugin, classLoader,
+ retryWithToleranceOperator, transformationChain,
+ errorHandlingMetrics, connectorClass, taskPluginsMetadata);
}
abstract WorkerTask<T, R> doBuild(
@@ -1856,7 +1876,8 @@ public final class Worker {
RetryWithToleranceOperator<T> retryWithToleranceOperator,
TransformationChain<T, R> transformationChain,
ErrorHandlingMetrics errorHandlingMetrics,
- Class<? extends Connector> connectorClass
+ Class<? extends Connector> connectorClass,
+ TaskPluginsMetadata pluginsMetadata
);
}
@@ -1884,7 +1905,8 @@ public final class Worker {
RetryWithToleranceOperator<ConsumerRecord<byte[], byte[]>>
retryWithToleranceOperator,
TransformationChain<ConsumerRecord<byte[], byte[]>,
SinkRecord> transformationChain,
ErrorHandlingMetrics errorHandlingMetrics,
- Class<? extends Connector> connectorClass
+ Class<? extends Connector> connectorClass,
+ TaskPluginsMetadata taskPluginsMetadata
) {
SinkConnectorConfig sinkConfig = new SinkConnectorConfig(plugins,
connectorConfig.originalsStrings());
WorkerErrantRecordReporter workerErrantRecordReporter =
createWorkerErrantRecordReporter(sinkConfig, retryWithToleranceOperator,
@@ -1898,7 +1920,7 @@ public final class Worker {
return new WorkerSinkTask(id, (SinkTask) task, statusListener,
initialState, config, configState, metrics, keyConverterPlugin,
valueConverterPlugin, errorHandlingMetrics,
headerConverterPlugin, transformationChain, consumer, classLoader, time,
retryWithToleranceOperator, workerErrantRecordReporter,
herder.statusBackingStore(),
- () -> sinkTaskReporters(id, sinkConfig,
errorHandlingMetrics, connectorClass), plugins.safeLoaderSwapper());
+ () -> sinkTaskReporters(id, sinkConfig,
errorHandlingMetrics, connectorClass), taskPluginsMetadata,
plugins.safeLoaderSwapper());
}
}
@@ -1925,7 +1947,8 @@ public final class Worker {
RetryWithToleranceOperator<SourceRecord>
retryWithToleranceOperator,
TransformationChain<SourceRecord, SourceRecord>
transformationChain,
ErrorHandlingMetrics errorHandlingMetrics,
- Class<? extends Connector> connectorClass
+ Class<? extends Connector> connectorClass,
+ TaskPluginsMetadata pluginsMetadata
) {
SourceConnectorConfig sourceConfig = new
SourceConnectorConfig(plugins,
connectorConfig.originalsStrings(),
config.topicCreationEnable());
@@ -1958,7 +1981,7 @@ public final class Worker {
return new WorkerSourceTask(id, (SourceTask) task, statusListener,
initialState, keyConverterPlugin, valueConverterPlugin, errorHandlingMetrics,
headerConverterPlugin, transformationChain, producer,
topicAdmin, topicCreationGroups,
offsetReader, offsetWriter, offsetStore, config,
configState, metrics, classLoader, time,
- retryWithToleranceOperator, herder.statusBackingStore(),
executor, () -> sourceTaskReporters(id, sourceConfig, errorHandlingMetrics),
plugins.safeLoaderSwapper());
+ retryWithToleranceOperator, herder.statusBackingStore(),
executor, () -> sourceTaskReporters(id, sourceConfig, errorHandlingMetrics),
pluginsMetadata, plugins.safeLoaderSwapper());
}
}
@@ -1992,7 +2015,8 @@ public final class Worker {
RetryWithToleranceOperator<SourceRecord>
retryWithToleranceOperator,
TransformationChain<SourceRecord, SourceRecord>
transformationChain,
ErrorHandlingMetrics errorHandlingMetrics,
- Class<? extends Connector> connectorClass
+ Class<? extends Connector> connectorClass,
+ TaskPluginsMetadata pluginsMetadata
) {
SourceConnectorConfig sourceConfig = new
SourceConnectorConfig(plugins,
connectorConfig.originalsStrings(),
config.topicCreationEnable());
@@ -2023,7 +2047,7 @@ public final class Worker {
headerConverterPlugin, transformationChain, producer,
topicAdmin, topicCreationGroups,
offsetReader, offsetWriter, offsetStore, config,
configState, metrics, errorHandlingMetrics, classLoader, time,
retryWithToleranceOperator,
herder.statusBackingStore(), sourceConfig, executor,
preProducerCheck, postProducerCheck,
- () -> sourceTaskReporters(id, sourceConfig,
errorHandlingMetrics), plugins.safeLoaderSwapper());
+ () -> sourceTaskReporters(id, sourceConfig,
errorHandlingMetrics), pluginsMetadata, plugins.safeLoaderSwapper());
}
}
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
index e2473dbbf71..3faf70f898c 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
@@ -78,6 +78,7 @@ public class WorkerConnector implements Runnable {
private volatile Throwable externalFailure;
private volatile boolean stopping; // indicates whether the Worker has
asked the connector to stop
private volatile boolean cancelled; // indicates whether the Worker has
cancelled the connector (e.g. because of slow shutdown)
+ private final String version;
private State state;
private final CloseableOffsetStorageReader offsetStorageReader;
@@ -97,8 +98,9 @@ public class WorkerConnector implements Runnable {
this.loader = loader;
this.ctx = ctx;
this.connector = connector;
+ this.version = connector.version();
this.state = State.INIT;
- this.metrics = new ConnectorMetricsGroup(connectMetrics,
AbstractStatus.State.UNASSIGNED, statusListener);
+ this.metrics = new ConnectorMetricsGroup(connectMetrics,
AbstractStatus.State.UNASSIGNED, this.version, statusListener);
this.statusListener = this.metrics;
this.offsetStorageReader = offsetStorageReader;
this.offsetStore = offsetStore;
@@ -418,6 +420,10 @@ public class WorkerConnector implements Runnable {
return ConnectUtils.isSourceConnector(connector);
}
+ public String connectorVersion() {
+ return version;
+ }
+
protected final String connectorType() {
if (isSinkConnector())
return "sink";
@@ -450,7 +456,12 @@ public class WorkerConnector implements Runnable {
private final MetricGroup metricGroup;
private final ConnectorStatus.Listener delegate;
- public ConnectorMetricsGroup(ConnectMetrics connectMetrics,
AbstractStatus.State initialState, ConnectorStatus.Listener delegate) {
+ public ConnectorMetricsGroup(
+ ConnectMetrics connectMetrics,
+ AbstractStatus.State initialState,
+ String connectorVersion,
+ ConnectorStatus.Listener delegate
+ ) {
Objects.requireNonNull(connectMetrics);
Objects.requireNonNull(connector);
Objects.requireNonNull(initialState);
@@ -465,7 +476,7 @@ public class WorkerConnector implements Runnable {
metricGroup.addImmutableValueMetric(registry.connectorType,
connectorType());
metricGroup.addImmutableValueMetric(registry.connectorClass,
connector.getClass().getName());
- metricGroup.addImmutableValueMetric(registry.connectorVersion,
connector.version());
+ metricGroup.addImmutableValueMetric(registry.connectorVersion,
connectorVersion);
metricGroup.addValueMetric(registry.connectorStatus, now ->
state.toString().toLowerCase(Locale.getDefault()));
}
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 4b8256115ed..14b093c9123 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -105,6 +105,7 @@ class WorkerSinkTask extends
WorkerTask<ConsumerRecord<byte[], byte[]>, SinkReco
private boolean committing;
private boolean taskStopped;
private final WorkerErrantRecordReporter workerErrantRecordReporter;
+ private final String version;
public WorkerSinkTask(ConnectorTaskId id,
SinkTask task,
@@ -125,9 +126,10 @@ class WorkerSinkTask extends
WorkerTask<ConsumerRecord<byte[], byte[]>, SinkReco
WorkerErrantRecordReporter
workerErrantRecordReporter,
StatusBackingStore statusBackingStore,
Supplier<List<ErrorReporter<ConsumerRecord<byte[],
byte[]>>>> errorReportersSupplier,
+ TaskPluginsMetadata pluginsMetadata,
Function<ClassLoader, LoaderSwap>
pluginLoaderSwapper) {
super(id, statusListener, initialState, loader, connectMetrics,
errorMetrics,
- retryWithToleranceOperator, transformationChain,
errorReportersSupplier, time, statusBackingStore, pluginLoaderSwapper);
+ retryWithToleranceOperator, transformationChain,
errorReportersSupplier, time, statusBackingStore, pluginsMetadata,
pluginLoaderSwapper);
this.workerConfig = workerConfig;
this.task = task;
@@ -153,6 +155,7 @@ class WorkerSinkTask extends
WorkerTask<ConsumerRecord<byte[], byte[]>, SinkReco
this.isTopicTrackingEnabled =
workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG);
this.taskStopped = false;
this.workerErrantRecordReporter = workerErrantRecordReporter;
+ this.version = task.version();
}
@Override
@@ -227,6 +230,11 @@ class WorkerSinkTask extends
WorkerTask<ConsumerRecord<byte[], byte[]>, SinkReco
}
}
+ @Override
+ public String taskVersion() {
+ return version;
+ }
+
protected void iteration() {
final long offsetCommitIntervalMs =
workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG);
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index 0806e887735..3ccd530be39 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -94,12 +94,13 @@ class WorkerSourceTask extends AbstractWorkerSourceTask {
StatusBackingStore statusBackingStore,
Executor closeExecutor,
Supplier<List<ErrorReporter<SourceRecord>>>
errorReportersSupplier,
+ TaskPluginsMetadata pluginsMetadata,
Function<ClassLoader, LoaderSwap>
pluginLoaderSwapper) {
super(id, task, statusListener, initialState, configState,
keyConverterPlugin, valueConverterPlugin, headerConverterPlugin,
transformationChain,
null, producer,
admin, topicGroups, offsetReader, offsetWriter, offsetStore,
workerConfig, connectMetrics, errorMetrics, loader,
- time, retryWithToleranceOperator, statusBackingStore,
closeExecutor, errorReportersSupplier, pluginLoaderSwapper);
+ time, retryWithToleranceOperator, statusBackingStore,
closeExecutor, errorReportersSupplier, pluginsMetadata, pluginLoaderSwapper);
this.committableOffsets = CommittableOffsets.EMPTY;
this.submittedRecords = new SubmittedRecords();
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
index fa28a4e7b0e..1661d710a86 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
@@ -41,6 +41,7 @@ import org.apache.kafka.connect.util.LoggingContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.CountDownLatch;
@@ -94,9 +95,10 @@ abstract class WorkerTask<T, R extends ConnectRecord<R>>
implements Runnable {
Supplier<List<ErrorReporter<T>>> errorReportersSupplier,
Time time,
StatusBackingStore statusBackingStore,
+ TaskPluginsMetadata pluginsMetadata,
Function<ClassLoader, LoaderSwap> pluginLoaderSwapper) {
this.id = id;
- this.taskMetricsGroup = new TaskMetricsGroup(this.id, connectMetrics,
statusListener);
+ this.taskMetricsGroup = new TaskMetricsGroup(this.id, connectMetrics,
statusListener, pluginsMetadata);
this.errorMetrics = errorMetrics;
this.statusListener = taskMetricsGroup;
this.loader = loader;
@@ -196,6 +198,8 @@ abstract class WorkerTask<T, R extends ConnectRecord<R>>
implements Runnable {
protected abstract void close();
+ protected abstract String taskVersion();
+
protected boolean isFailed() {
return failed;
}
@@ -397,14 +401,25 @@ abstract class WorkerTask<T, R extends ConnectRecord<R>>
implements Runnable {
static class TaskMetricsGroup implements TaskStatus.Listener {
private final TaskStatus.Listener delegateListener;
private final MetricGroup metricGroup;
+ private final List<MetricGroup> transformationGroups = new
ArrayList<>();
+ private final List<MetricGroup> predicateGroups = new ArrayList<>();
private final Time time;
private final StateTracker taskStateTimer;
private final Sensor commitTime;
private final Sensor batchSize;
private final Sensor commitAttempts;
+ private final ConnectMetrics connectMetrics;
+ private final ConnectorTaskId id;
public TaskMetricsGroup(ConnectorTaskId id, ConnectMetrics
connectMetrics, TaskStatus.Listener statusListener) {
+ this(id, connectMetrics, statusListener, null);
+ }
+
+ public TaskMetricsGroup(ConnectorTaskId id, ConnectMetrics
connectMetrics, TaskStatus.Listener statusListener, TaskPluginsMetadata
pluginsMetadata) {
delegateListener = statusListener;
+ this.connectMetrics = connectMetrics;
+ this.id = id;
+
time = connectMetrics.time();
taskStateTimer = new StateTracker();
ConnectMetricsRegistry registry = connectMetrics.registry();
@@ -434,6 +449,7 @@ abstract class WorkerTask<T, R extends ConnectRecord<R>>
implements Runnable {
Frequencies commitFrequencies =
Frequencies.forBooleanValues(offsetCommitFailures, offsetCommitSucceeds);
commitAttempts = metricGroup.sensor("offset-commit-completion");
commitAttempts.add(commitFrequencies);
+ addPluginInfoMetric(pluginsMetadata);
}
private void addRatioMetric(final State matchingState,
MetricNameTemplate template) {
@@ -442,8 +458,52 @@ abstract class WorkerTask<T, R extends ConnectRecord<R>>
implements Runnable {
taskStateTimer.durationRatio(matchingState, now));
}
+ private void addPluginInfoMetric(TaskPluginsMetadata pluginsMetadata) {
+ if (pluginsMetadata == null) {
+ return;
+ }
+ ConnectMetricsRegistry registry = connectMetrics.registry();
+ metricGroup.addValueMetric(registry.taskConnectorClass, now ->
pluginsMetadata.connectorClass());
+ metricGroup.addValueMetric(registry.taskConnectorClassVersion, now
-> pluginsMetadata.connectorVersion());
+ metricGroup.addValueMetric(registry.taskConnectorType, now ->
pluginsMetadata.connectorType());
+ metricGroup.addValueMetric(registry.taskClass, now ->
pluginsMetadata.taskClass());
+ metricGroup.addValueMetric(registry.taskVersion, now ->
pluginsMetadata.taskVersion());
+ metricGroup.addValueMetric(registry.taskKeyConverterClass, now ->
pluginsMetadata.keyConverterClass());
+ metricGroup.addValueMetric(registry.taskKeyConverterVersion, now
-> pluginsMetadata.keyConverterVersion());
+ metricGroup.addValueMetric(registry.taskValueConverterClass, now
-> pluginsMetadata.valueConverterClass());
+ metricGroup.addValueMetric(registry.taskValueConverterVersion, now
-> pluginsMetadata.valueConverterVersion());
+ metricGroup.addValueMetric(registry.taskHeaderConverterClass, now
-> pluginsMetadata.headerConverterClass());
+ metricGroup.addValueMetric(registry.taskHeaderConverterVersion,
now -> pluginsMetadata.headerConverterVersion());
+
+ if (!pluginsMetadata.transformations().isEmpty()) {
+ for (TransformationStage.AliasedPluginInfo entry :
pluginsMetadata.transformations()) {
+ MetricGroup transformationGroup =
connectMetrics.group(registry.transformsGroupName(),
+ registry.connectorTagName(), id.connector(),
+ registry.taskTagName(),
Integer.toString(id.task()),
+ registry.transformsTagName(), entry.alias());
+
transformationGroup.addValueMetric(registry.transformClass, now ->
entry.className());
+
transformationGroup.addValueMetric(registry.transformVersion, now ->
entry.version());
+ this.transformationGroups.add(transformationGroup);
+ }
+ }
+
+ if (!pluginsMetadata.predicates().isEmpty()) {
+ for (TransformationStage.AliasedPluginInfo entry :
pluginsMetadata.predicates()) {
+ MetricGroup predicateGroup =
connectMetrics.group(registry.predicatesGroupName(),
+ registry.connectorTagName(), id.connector(),
+ registry.taskTagName(),
Integer.toString(id.task()),
+ registry.predicateTagName(), entry.alias());
+ predicateGroup.addValueMetric(registry.predicateClass, now
-> entry.className());
+ predicateGroup.addValueMetric(registry.predicateVersion,
now -> entry.version());
+ this.predicateGroups.add(predicateGroup);
+ }
+ }
+ }
+
void close() {
metricGroup.close();
+ transformationGroups.forEach(MetricGroup::close);
+ predicateGroups.forEach(MetricGroup::close);
}
void recordCommit(long duration, boolean success) {
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorStateInfo.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorStateInfo.java
index 82d9957b40d..8d94d562045 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorStateInfo.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorStateInfo.java
@@ -65,11 +65,13 @@ public class ConnectorStateInfo {
private final String state;
private final String trace;
private final String workerId;
+ private final String version;
- public AbstractState(String state, String workerId, String trace) {
+ public AbstractState(String state, String workerId, String trace,
String version) {
this.state = state;
this.workerId = workerId;
this.trace = trace;
+ this.version = version;
}
@JsonProperty
@@ -87,14 +89,22 @@ public class ConnectorStateInfo {
public String trace() {
return trace;
}
+
+ @JsonProperty
+ @JsonInclude(value = JsonInclude.Include.CUSTOM, valueFilter =
PluginInfo.NoVersionFilter.class)
+ public String version() {
+ return version;
+ }
}
public static class ConnectorState extends AbstractState {
+
@JsonCreator
public ConnectorState(@JsonProperty("state") String state,
@JsonProperty("worker_id") String worker,
- @JsonProperty("msg") String msg) {
- super(state, worker, msg);
+ @JsonProperty("msg") String msg,
+ @JsonProperty("version") String version) {
+ super(state, worker, msg, version);
}
}
@@ -105,8 +115,9 @@ public class ConnectorStateInfo {
public TaskState(@JsonProperty("id") int id,
@JsonProperty("state") String state,
@JsonProperty("worker_id") String worker,
- @JsonProperty("msg") String msg) {
- super(state, worker, msg);
+ @JsonProperty("msg") String msg,
+ @JsonProperty("version") String version) {
+ super(state, worker, msg, version);
this.id = id;
}
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
index 0a9e3837006..b9382399cf4 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
@@ -101,6 +101,7 @@ public class KafkaStatusBackingStore extends
KafkaTopicBasedBackingStore impleme
public static final String TRACE_KEY_NAME = "trace";
public static final String WORKER_ID_KEY_NAME = "worker_id";
public static final String GENERATION_KEY_NAME = "generation";
+ public static final String VERSION_KEY_NAME = "version";
public static final String TOPIC_STATE_KEY = "topic";
public static final String TOPIC_NAME_KEY = "name";
@@ -113,6 +114,7 @@ public class KafkaStatusBackingStore extends
KafkaTopicBasedBackingStore impleme
.field(TRACE_KEY_NAME, SchemaBuilder.string().optional().build())
.field(WORKER_ID_KEY_NAME, Schema.STRING_SCHEMA)
.field(GENERATION_KEY_NAME, Schema.INT32_SCHEMA)
+ .field(VERSION_KEY_NAME, Schema.OPTIONAL_STRING_SCHEMA)
.build();
private static final Schema TOPIC_STATUS_VALUE_SCHEMA_V0 =
SchemaBuilder.struct()
@@ -428,7 +430,8 @@ public class KafkaStatusBackingStore extends
KafkaTopicBasedBackingStore impleme
String trace = (String) statusMap.get(TRACE_KEY_NAME);
String workerUrl = (String) statusMap.get(WORKER_ID_KEY_NAME);
int generation = ((Long)
statusMap.get(GENERATION_KEY_NAME)).intValue();
- return new ConnectorStatus(connector, state, trace, workerUrl,
generation);
+ String version = (String) statusMap.get(VERSION_KEY_NAME);
+ return new ConnectorStatus(connector, state, trace, workerUrl,
generation, version);
} catch (Exception e) {
log.error("Failed to deserialize connector status", e);
return null;
@@ -448,7 +451,8 @@ public class KafkaStatusBackingStore extends
KafkaTopicBasedBackingStore impleme
String trace = (String) statusMap.get(TRACE_KEY_NAME);
String workerUrl = (String) statusMap.get(WORKER_ID_KEY_NAME);
int generation = ((Long)
statusMap.get(GENERATION_KEY_NAME)).intValue();
- return new TaskStatus(taskId, state, workerUrl, generation, trace);
+ String version = (String) statusMap.get(VERSION_KEY_NAME);
+ return new TaskStatus(taskId, state, workerUrl, generation, trace,
version);
} catch (Exception e) {
log.error("Failed to deserialize task status", e);
return null;
@@ -487,6 +491,7 @@ public class KafkaStatusBackingStore extends
KafkaTopicBasedBackingStore impleme
struct.put(TRACE_KEY_NAME, status.trace());
struct.put(WORKER_ID_KEY_NAME, status.workerId());
struct.put(GENERATION_KEY_NAME, status.generation());
+ struct.put(VERSION_KEY_NAME, status.version());
return converter.fromConnectData(statusTopic, STATUS_SCHEMA_V0,
struct);
}
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java
index 9ad8690ca1c..97b8edf7605 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java
@@ -964,7 +964,7 @@ public class AbstractWorkerSourceTaskTest {
taskId, sourceTask, statusListener, TargetState.STARTED,
configState, keyConverterPlugin, valueConverterPlugin, headerConverterPlugin,
transformationChain,
workerTransactionContext, producer, admin,
TopicCreationGroup.configuredGroups(sourceConfig), offsetReader, offsetWriter,
offsetStore,
config, metrics, errorHandlingMetrics,
plugins.delegatingLoader(), Time.SYSTEM, retryWithToleranceOperator,
- statusBackingStore, Runnable::run, errorReportersSupplier,
TestPlugins.noOpLoaderSwap()) {
+ statusBackingStore, Runnable::run, errorReportersSupplier,
null, TestPlugins.noOpLoaderSwap()) {
@Override
protected void prepareToInitializeTask() {
}
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
index 70edfb0f598..340585f0fd6 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
@@ -426,9 +426,9 @@ public class ErrorHandlingTaskTest {
oo.put("schemas.enable", "false");
converter.configure(oo);
- Plugin<Transformation<SinkRecord>> transformationPlugin =
metrics.wrap(new FaultyPassthrough<SinkRecord>(), taskId, "");
+ Plugin<Transformation<SinkRecord>> transformationPlugin =
metrics.wrap(new FaultyPassthrough<SinkRecord>(), taskId, "test");
TransformationChain<ConsumerRecord<byte[], byte[]>, SinkRecord>
sinkTransforms =
- new TransformationChain<>(singletonList(new
TransformationStage<>(transformationPlugin, TestPlugins.noOpLoaderSwap())),
retryWithToleranceOperator);
+ new TransformationChain<>(singletonList(new
TransformationStage<>(transformationPlugin, "test", null,
TestPlugins.noOpLoaderSwap())), retryWithToleranceOperator);
Plugin<Converter> keyConverterPlugin = metrics.wrap(converter, taskId,
true);
Plugin<Converter> valueConverterPlugin = metrics.wrap(converter,
taskId, false);
@@ -438,7 +438,7 @@ public class ErrorHandlingTaskTest {
ClusterConfigState.EMPTY, metrics, keyConverterPlugin,
valueConverterPlugin, errorHandlingMetrics,
headerConverterPlugin, sinkTransforms, consumer, pluginLoader,
time,
retryWithToleranceOperator, workerErrantRecordReporter,
- statusBackingStore, () -> errorReporters,
TestPlugins.noOpLoaderSwap());
+ statusBackingStore, () -> errorReporters, null,
TestPlugins.noOpLoaderSwap());
}
private void createSourceTask(TargetState initialState,
RetryWithToleranceOperator<SourceRecord> retryWithToleranceOperator,
List<ErrorReporter<SourceRecord>> errorReporters) {
@@ -462,9 +462,9 @@ public class ErrorHandlingTaskTest {
private void createSourceTask(TargetState initialState,
RetryWithToleranceOperator<SourceRecord> retryWithToleranceOperator,
List<ErrorReporter<SourceRecord>>
errorReporters, Converter converter) {
- Plugin<Transformation<SourceRecord>> transformationPlugin =
metrics.wrap(new FaultyPassthrough<SourceRecord>(), taskId, "");
+ Plugin<Transformation<SourceRecord>> transformationPlugin =
metrics.wrap(new FaultyPassthrough<SourceRecord>(), taskId, "test");
TransformationChain<SourceRecord, SourceRecord> sourceTransforms = new
TransformationChain<>(singletonList(
- new TransformationStage<>(transformationPlugin,
TestPlugins.noOpLoaderSwap())), retryWithToleranceOperator);
+ new TransformationStage<>(transformationPlugin, "test", null,
TestPlugins.noOpLoaderSwap())), retryWithToleranceOperator);
Plugin<Converter> keyConverterPlugin = metrics.wrap(converter, taskId,
true);
Plugin<Converter> valueConverterPlugin = metrics.wrap(converter,
taskId, false);
@@ -477,7 +477,7 @@ public class ErrorHandlingTaskTest {
offsetReader, offsetWriter, offsetStore, workerConfig,
ClusterConfigState.EMPTY, metrics, pluginLoader, time,
retryWithToleranceOperator,
- statusBackingStore, Runnable::run, () -> errorReporters,
TestPlugins.noOpLoaderSwap()));
+ statusBackingStore, Runnable::run, () -> errorReporters, null,
TestPlugins.noOpLoaderSwap()));
}
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
index a6375398d29..f45ec27b46e 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
@@ -198,6 +198,7 @@ public class ExactlyOnceWorkerSourceTaskTest {
Thread.sleep(10);
return result;
});
+ when(sourceTask.version()).thenReturn(null);
}
@AfterEach
@@ -222,8 +223,8 @@ public class ExactlyOnceWorkerSourceTaskTest {
}
verify(statusBackingStore, MockitoUtils.anyTimes()).getTopic(any(),
any());
-
verify(offsetStore, MockitoUtils.anyTimes()).primaryOffsetsTopic();
+ verify(sourceTask).version();
verifyNoMoreInteractions(statusListener, producer, sourceTask, admin,
offsetWriter, statusBackingStore, offsetStore, preProducerCheck,
postProducerCheck);
if (metrics != null) metrics.stop();
@@ -284,7 +285,7 @@ public class ExactlyOnceWorkerSourceTaskTest {
workerTask = new ExactlyOnceWorkerSourceTask(taskId, sourceTask,
statusListener, initialState, keyConverterPlugin, valueConverterPlugin,
headerConverterPlugin,
transformationChain, producer, admin,
TopicCreationGroup.configuredGroups(sourceConfig), offsetReader, offsetWriter,
offsetStore,
config, clusterConfigState, metrics, errorHandlingMetrics,
plugins.delegatingLoader(), time,
RetryWithToleranceOperatorTest.noneOperator(), statusBackingStore,
- sourceConfig, Runnable::run, preProducerCheck,
postProducerCheck, Collections::emptyList, TestPlugins.noOpLoaderSwap());
+ sourceConfig, Runnable::run, preProducerCheck,
postProducerCheck, Collections::emptyList, null, TestPlugins.noOpLoaderSwap());
}
@ParameterizedTest
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/RestartPlanTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/RestartPlanTest.java
index 8d6f54ce258..d0f3f974c63 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/RestartPlanTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/RestartPlanTest.java
@@ -35,17 +35,15 @@ public class RestartPlanTest {
@Test
public void testRestartPlan() {
ConnectorStateInfo.ConnectorState state = new
ConnectorStateInfo.ConnectorState(
- AbstractStatus.State.RESTARTING.name(),
- "foo",
- null
+ AbstractStatus.State.RESTARTING.name(), "foo", null, null
);
List<TaskState> tasks = new ArrayList<>();
- tasks.add(new TaskState(1, AbstractStatus.State.RUNNING.name(),
"worker1", null));
- tasks.add(new TaskState(2, AbstractStatus.State.PAUSED.name(),
"worker1", null));
- tasks.add(new TaskState(3, AbstractStatus.State.RESTARTING.name(),
"worker1", null));
- tasks.add(new TaskState(4, AbstractStatus.State.DESTROYED.name(),
"worker1", null));
- tasks.add(new TaskState(5, AbstractStatus.State.RUNNING.name(),
"worker1", null));
- tasks.add(new TaskState(6, AbstractStatus.State.RUNNING.name(),
"worker1", null));
+ tasks.add(new TaskState(1, AbstractStatus.State.RUNNING.name(),
"worker1", null, null));
+ tasks.add(new TaskState(2, AbstractStatus.State.PAUSED.name(),
"worker1", null, null));
+ tasks.add(new TaskState(3, AbstractStatus.State.RESTARTING.name(),
"worker1", null, null));
+ tasks.add(new TaskState(4, AbstractStatus.State.DESTROYED.name(),
"worker1", null, null));
+ tasks.add(new TaskState(5, AbstractStatus.State.RUNNING.name(),
"worker1", null, null));
+ tasks.add(new TaskState(6, AbstractStatus.State.RUNNING.name(),
"worker1", null, null));
ConnectorStateInfo connectorStateInfo = new
ConnectorStateInfo(CONNECTOR_NAME, state, tasks, ConnectorType.SOURCE);
RestartRequest restartRequest = new RestartRequest(CONNECTOR_NAME,
false, true);
@@ -61,13 +59,11 @@ public class RestartPlanTest {
@Test
public void testNoRestartsPlan() {
ConnectorStateInfo.ConnectorState state = new
ConnectorStateInfo.ConnectorState(
- AbstractStatus.State.RUNNING.name(),
- "foo",
- null
+ AbstractStatus.State.RUNNING.name(), "foo", null, null
);
List<TaskState> tasks = new ArrayList<>();
- tasks.add(new TaskState(1, AbstractStatus.State.RUNNING.name(),
"worker1", null));
- tasks.add(new TaskState(2, AbstractStatus.State.PAUSED.name(),
"worker1", null));
+ tasks.add(new TaskState(1, AbstractStatus.State.RUNNING.name(),
"worker1", null, null));
+ tasks.add(new TaskState(2, AbstractStatus.State.PAUSED.name(),
"worker1", null, null));
ConnectorStateInfo connectorStateInfo = new
ConnectorStateInfo(CONNECTOR_NAME, state, tasks, ConnectorType.SOURCE);
RestartRequest restartRequest = new RestartRequest(CONNECTOR_NAME,
false, true);
RestartPlan restartPlan = new RestartPlan(restartRequest,
connectorStateInfo);
@@ -81,13 +77,11 @@ public class RestartPlanTest {
@Test
public void testRestartsOnlyConnector() {
ConnectorStateInfo.ConnectorState state = new
ConnectorStateInfo.ConnectorState(
- AbstractStatus.State.RESTARTING.name(),
- "foo",
- null
+ AbstractStatus.State.RESTARTING.name(), "foo", null, null
);
List<TaskState> tasks = new ArrayList<>();
- tasks.add(new TaskState(1, AbstractStatus.State.RUNNING.name(),
"worker1", null));
- tasks.add(new TaskState(2, AbstractStatus.State.PAUSED.name(),
"worker1", null));
+ tasks.add(new TaskState(1, AbstractStatus.State.RUNNING.name(),
"worker1", null, null));
+ tasks.add(new TaskState(2, AbstractStatus.State.PAUSED.name(),
"worker1", null, null));
ConnectorStateInfo connectorStateInfo = new
ConnectorStateInfo(CONNECTOR_NAME, state, tasks, ConnectorType.SOURCE);
RestartRequest restartRequest = new RestartRequest(CONNECTOR_NAME,
false, true);
RestartPlan restartPlan = new RestartPlan(restartRequest,
connectorStateInfo);
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TransformationStageTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TransformationStageTest.java
index e2791a63f7b..959c5d2ac01 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TransformationStageTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TransformationStageTest.java
@@ -61,8 +61,12 @@ public class TransformationStageTest {
}
TransformationStage<SourceRecord> stage = new TransformationStage<>(
predicatePlugin,
+ "testPredicate",
+ null,
negate,
transformationPlugin,
+ "testTransformation",
+ null,
TestPlugins.noOpLoaderSwap()
);
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index 2607ee8b03b..539960badec 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -229,7 +229,7 @@ public class WorkerSinkTaskTest {
taskId, task, statusListener, initialState, workerConfig,
ClusterConfigState.EMPTY, connectMetrics,
keyConverterPlugin, valueConverterPlugin, errorMetrics,
headerConverterPlugin,
transformationChain, consumer, loader, time,
- retryWithToleranceOperator, null, statusBackingStore,
errorReportersSupplier, TestPlugins.noOpLoaderSwap());
+ retryWithToleranceOperator, null, statusBackingStore,
errorReportersSupplier, null, TestPlugins.noOpLoaderSwap());
}
@AfterEach
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
index 74021118098..6c2c593c35b 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
@@ -183,7 +183,7 @@ public class WorkerSinkTaskThreadedTest {
taskId, sinkTask, statusListener, initialState, workerConfig,
ClusterConfigState.EMPTY, metrics, keyConverterPlugin,
valueConverterPlugin, errorHandlingMetrics,
headerConverterPlugin, transformationChain,
consumer, pluginLoader, time,
RetryWithToleranceOperatorTest.noneOperator(), null, statusBackingStore,
- Collections::emptyList, TestPlugins.noOpLoaderSwap());
+ Collections::emptyList, null, TestPlugins.noOpLoaderSwap());
recordsReturned = 0;
}
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index 23fb3618f81..d2fd923fdb9 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -255,7 +255,7 @@ public class WorkerSourceTaskTest {
workerTask = new WorkerSourceTask(taskId, sourceTask, statusListener,
initialState, keyConverterPlugin, valueConverterPlugin, errorHandlingMetrics,
headerConverterPlugin,
transformationChain, producer, admin,
TopicCreationGroup.configuredGroups(sourceConfig),
offsetReader, offsetWriter, offsetStore, config,
clusterConfigState, metrics, plugins.delegatingLoader(), Time.SYSTEM,
- retryWithToleranceOperator, statusBackingStore, Runnable::run,
Collections::emptyList, TestPlugins.noOpLoaderSwap());
+ retryWithToleranceOperator, statusBackingStore, Runnable::run,
Collections::emptyList, null, TestPlugins.noOpLoaderSwap());
}
@ParameterizedTest
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
index eae9c96998b..fa445454fd0 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
@@ -300,7 +300,7 @@ public class WorkerTaskTest {
Supplier<List<ErrorReporter<Object>>>
errorReporterSupplier,
Time time, StatusBackingStore
statusBackingStore) {
super(id, statusListener, initialState, loader, connectMetrics,
errorHandlingMetrics,
- retryWithToleranceOperator, transformationChain,
errorReporterSupplier, time, statusBackingStore, TestPlugins.noOpLoaderSwap());
+ retryWithToleranceOperator, transformationChain,
errorReporterSupplier, time, statusBackingStore, null,
TestPlugins.noOpLoaderSwap());
}
@Override
@@ -318,6 +318,11 @@ public class WorkerTaskTest {
@Override
protected void close() {
}
+
+ @Override
+ protected String taskVersion() {
+ return null;
+ }
}
protected void assertFailedMetric(TaskMetricsGroup metricsGroup) {
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index 71dcabbedb6..38bcab1b594 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -63,6 +63,7 @@ import org.apache.kafka.connect.runtime.isolation.LoaderSwap;
import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.isolation.Plugins.ClassLoaderUsage;
+import org.apache.kafka.connect.runtime.isolation.TestPlugins;
import org.apache.kafka.connect.runtime.rest.RestServer;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
@@ -805,11 +806,11 @@ public class WorkerTest {
// Each time we check the task metrics, the worker will call the herder
when(herder.taskStatus(TASK_ID)).thenReturn(
- new ConnectorStateInfo.TaskState(0, "RUNNING", "worker",
"msg"),
- new ConnectorStateInfo.TaskState(0, "PAUSED", "worker", "msg"),
- new ConnectorStateInfo.TaskState(0, "FAILED", "worker", "msg"),
- new ConnectorStateInfo.TaskState(0, "DESTROYED", "worker",
"msg"),
- new ConnectorStateInfo.TaskState(0, "UNASSIGNED", "worker",
"msg")
+ new ConnectorStateInfo.TaskState(0, "RUNNING", "worker",
"msg", null),
+ new ConnectorStateInfo.TaskState(0, "PAUSED", "worker", "msg",
null),
+ new ConnectorStateInfo.TaskState(0, "FAILED", "worker", "msg",
null),
+ new ConnectorStateInfo.TaskState(0, "DESTROYED", "worker",
"msg", null),
+ new ConnectorStateInfo.TaskState(0, "UNASSIGNED", "worker",
"msg", null)
);
worker = new Worker(WORKER_ID,
@@ -3072,6 +3073,7 @@ public class WorkerTest {
when(plugins.pluginLoader(connectorClass.getName(),
range)).thenReturn(pluginLoader);
when(plugins.connectorClass(connectorClass.getName(),
range)).thenReturn((Class) connectorClass);
when(plugins.newTask(taskClass)).thenReturn(task);
+
when(plugins.safeLoaderSwapper()).thenReturn(TestPlugins.noOpLoaderSwap());
when(task.version()).thenReturn(range == null ? "unknown" :
range.toString());
}
@@ -3087,7 +3089,7 @@ public class WorkerTest {
verify(plugins).pluginLoader(connectorClass.getName(), range);
verify(plugins).connectorClass(connectorClass.getName(), range);
verify(plugins).newTask(taskClass);
- verify(task).version();
+ verify(task, times(2)).version();
}
private void mockExecutorRealSubmit(Class<? extends Runnable>
runnableClass) {
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTestUtils.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTestUtils.java
index 3574137b6c3..3e1e75480b5 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTestUtils.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTestUtils.java
@@ -195,8 +195,12 @@ public class WorkerTestUtils {
when(transformationPlugin.get()).thenReturn(transformation);
TransformationStage<R> stage = new TransformationStage<>(
predicatePlugin,
+ "testPredicate",
+ null,
false,
transformationPlugin,
+ "testTransformation",
+ null,
TestPlugins.noOpLoaderSwap());
TransformationChain<T, R> realTransformationChainRetriableException =
new TransformationChain<>(List.of(stage), toleranceOperator);
return Mockito.spy(realTransformationChainRetriableException);
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index 73eaf93961e..18589d66855 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -1348,6 +1348,7 @@ public class DistributedHerderTest {
return true;
}).when(worker).startConnector(eq(CONN1), any(), any(), eq(herder),
any(), stateCallback.capture());
doNothing().when(member).wakeup();
+ when(worker.connectorVersion(any())).thenReturn(null);
herder.doRestartConnectorAndTasks(restartRequest);
@@ -1378,6 +1379,7 @@ public class DistributedHerderTest {
doNothing().when(statusBackingStore).put(eq(status));
when(worker.startSourceTask(eq(TASK0), any(), any(), any(),
eq(herder), any())).thenReturn(true);
+ when(worker.taskVersion(any())).thenReturn(null);
herder.doRestartConnectorAndTasks(restartRequest);
@@ -1419,6 +1421,8 @@ public class DistributedHerderTest {
doNothing().when(statusBackingStore).put(eq(taskStatus));
when(worker.startSourceTask(eq(TASK0), any(), any(), any(),
eq(herder), any())).thenReturn(true);
+ when(worker.taskVersion(any())).thenReturn(null);
+ when(worker.connectorVersion(any())).thenReturn(null);
herder.doRestartConnectorAndTasks(restartRequest);
@@ -1670,6 +1674,7 @@ public class DistributedHerderTest {
when(member.memberId()).thenReturn("member");
when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0);
when(worker.isSinkConnector(CONN1)).thenReturn(Boolean.TRUE);
+ when(worker.connectorVersion(CONN1)).thenReturn(null);
WorkerConfigTransformer configTransformer =
mock(WorkerConfigTransformer.class);
// join
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
index 9dfead77220..e38cd2da60d 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
@@ -688,9 +688,7 @@ public class ConnectorsResourceTest {
@Test
public void testRestartConnectorAndTasksRequestAccepted() throws Throwable
{
ConnectorStateInfo.ConnectorState state = new
ConnectorStateInfo.ConnectorState(
- AbstractStatus.State.RESTARTING.name(),
- "foo",
- null
+ AbstractStatus.State.RESTARTING.name(), "foo", null, null
);
ConnectorStateInfo connectorStateInfo = new
ConnectorStateInfo(CONNECTOR_NAME, state, Collections.emptyList(),
ConnectorType.SOURCE);
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
index 9e893e79eba..76b25131cc5 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
@@ -280,6 +280,7 @@ public class StandaloneHerderTest {
expectConfigValidation(SourceSink.SOURCE, config);
when(statusBackingStore.getAll(CONNECTOR_NAME)).thenReturn(Collections.emptyList());
+ when(worker.connectorVersion(CONNECTOR_NAME)).thenReturn(null);
herder.putConnectorConfig(CONNECTOR_NAME, config, false,
createCallback);
Herder.Created<ConnectorInfo> connectorInfo =
createCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS);
@@ -533,6 +534,7 @@ public class StandaloneHerderTest {
expectConfigValidation(SourceSink.SINK, connectorConfig);
doNothing().when(worker).stopAndAwaitConnector(CONNECTOR_NAME);
+ when(worker.connectorVersion(CONNECTOR_NAME)).thenReturn(null);
mockStartConnector(connectorConfig, null, TargetState.STARTED, null);
@@ -563,6 +565,7 @@ public class StandaloneHerderTest {
doReturn(Optional.of(restartPlan)).when(herder).buildRestartPlan(restartRequest);
expectAdd(SourceSink.SINK);
+ when(worker.taskVersion(any())).thenReturn(null);
Map<String, String> connectorConfig = connectorConfig(SourceSink.SINK);
expectConfigValidation(SourceSink.SINK, connectorConfig);
@@ -616,6 +619,8 @@ public class StandaloneHerderTest {
ArgumentCaptor<TaskStatus> taskStatus =
ArgumentCaptor.forClass(TaskStatus.class);
expectAdd(SourceSink.SINK, false);
+ when(worker.connectorVersion(any())).thenReturn(null);
+ when(worker.taskVersion(any())).thenReturn(null);
Map<String, String> connectorConfig = connectorConfig(SourceSink.SINK);
expectConfigValidation(SourceSink.SINK, connectorConfig);
@@ -1124,6 +1129,7 @@ public class StandaloneHerderTest {
}
when(worker.isRunning(CONNECTOR_NAME)).thenReturn(true);
+
if (sourceSink == SourceSink.SOURCE) {
when(worker.isTopicCreationEnabled()).thenReturn(true);
}
@@ -1152,6 +1158,7 @@ public class StandaloneHerderTest {
transformer);
if (sourceSink.equals(SourceSink.SOURCE) && mockStartSourceTask) {
+ when(worker.taskVersion(any())).thenReturn(null);
when(worker.startSourceTask(new ConnectorTaskId(CONNECTOR_NAME,
0), configState, connectorConfig(sourceSink), generatedTaskProps, herder,
TargetState.STARTED)).thenReturn(true);
}