This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.1 by this push:
new da8b9da KAFKA-13719: Fix connector restart cause duplicate tasks (#11869)
da8b9da is described below
commit da8b9dac586408f852f092ecbd0aae94589a6324
Author: sunshujie1990 <[email protected]>
AuthorDate: Wed Mar 30 20:58:58 2022 +0800
KAFKA-13719: Fix connector restart cause duplicate tasks (#11869)
Reviewers: Mickael Maison <[email protected]>, Luke Chen <[email protected]>, Chris Egerton <[email protected]>
Co-authored-by: Chris Egerton <[email protected]>
---
.../connect/runtime/distributed/DistributedHerder.java | 6 +++---
.../runtime/distributed/DistributedHerderTest.java | 17 +++++++++--------
2 files changed, 12 insertions(+), 11 deletions(-)
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 5aa327e..e024879 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -1180,8 +1180,8 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
}
}
if (restartTasks) {
- log.debug("Restarting {} of {} tasks for {}",
plan.restartTaskCount(), plan.totalTaskCount(), request);
- plan.taskIdsToRestart().forEach(taskId -> {
+ log.debug("Restarting {} of {} tasks for {}",
assignedIdsToRestart.size(), plan.totalTaskCount(), request);
+ assignedIdsToRestart.forEach(taskId -> {
try {
if (startTask(taskId)) {
log.info("Task '{}' restart successful", taskId);
@@ -1192,7 +1192,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
log.error("Task '{}' restart failed", taskId, t);
}
});
- log.debug("Restarted {} of {} tasks for {} as requested",
plan.restartTaskCount(), plan.totalTaskCount(), request);
+ log.debug("Restarted {} of {} tasks for {} as requested",
assignedIdsToRestart.size(), plan.totalTaskCount(), request);
}
log.info("Completed {}", plan);
}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index 245bb75..36f3f80 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -1289,25 +1289,26 @@ public class DistributedHerderTest {
}
@Test
- public void testDoRestartConnectorAndTasksOnlyTasks() throws Exception {
- ConnectorTaskId taskId = new ConnectorTaskId(CONN1, 0);
+ public void testDoRestartConnectorAndTasksOnlyTasks() {
RestartRequest restartRequest = new RestartRequest(CONN1, false, true);
RestartPlan restartPlan = PowerMock.createMock(RestartPlan.class);
EasyMock.expect(restartPlan.shouldRestartConnector()).andReturn(true).anyTimes();
EasyMock.expect(restartPlan.shouldRestartTasks()).andReturn(true).anyTimes();
-
EasyMock.expect(restartPlan.taskIdsToRestart()).andReturn(Collections.singletonList(taskId)).anyTimes();
-
EasyMock.expect(restartPlan.restartTaskCount()).andReturn(1).anyTimes();
- EasyMock.expect(restartPlan.totalTaskCount()).andReturn(1).anyTimes();
+ // The connector has three tasks
+
EasyMock.expect(restartPlan.taskIdsToRestart()).andReturn(Arrays.asList(TASK0,
TASK1, TASK2)).anyTimes();
+
EasyMock.expect(restartPlan.restartTaskCount()).andReturn(3).anyTimes();
+ EasyMock.expect(restartPlan.totalTaskCount()).andReturn(3).anyTimes();
EasyMock.expect(herder.buildRestartPlan(restartRequest)).andReturn(Optional.of(restartPlan)).anyTimes();
herder.assignment = PowerMock.createMock(ExtendedAssignment.class);
EasyMock.expect(herder.assignment.connectors()).andReturn(Collections.emptyList()).anyTimes();
-
EasyMock.expect(herder.assignment.tasks()).andReturn(Collections.singletonList(taskId)).anyTimes();
+ // But only one task is assigned to this worker
+
EasyMock.expect(herder.assignment.tasks()).andReturn(Collections.singletonList(TASK0)).anyTimes();
- worker.stopAndAwaitTasks(Collections.singletonList(taskId));
+ worker.stopAndAwaitTasks(Collections.singletonList(TASK0));
PowerMock.expectLastCall();
- herder.onRestart(taskId);
+ herder.onRestart(TASK0);
EasyMock.expectLastCall();
worker.startTask(EasyMock.eq(TASK0), EasyMock.anyObject(),
EasyMock.anyObject(), EasyMock.anyObject(),