This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.2 by this push:
new f2920b2 KAFKA-13719: Fix connector restart cause duplicate tasks
(#11869)
f2920b2 is described below
commit f2920b2477d77fd52d3d828012c600e4c2ed20a5
Author: sunshujie1990 <[email protected]>
AuthorDate: Wed Mar 30 20:58:58 2022 +0800
KAFKA-13719: Fix connector restart cause duplicate tasks (#11869)
Reviewers: Mickael Maison <[email protected]>, Luke Chen
<[email protected]>, Chris Egerton <[email protected]>
Co-authored-by: Chris Egerton <[email protected]>
---
.../connect/runtime/distributed/DistributedHerder.java | 6 +++---
.../runtime/distributed/DistributedHerderTest.java | 15 ++++++++-------
2 files changed, 11 insertions(+), 10 deletions(-)
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 357796c..65a8e7e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -1180,8 +1180,8 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
}
}
if (restartTasks) {
- log.debug("Restarting {} of {} tasks for {}", plan.restartTaskCount(), plan.totalTaskCount(), request);
- plan.taskIdsToRestart().forEach(taskId -> {
+ log.debug("Restarting {} of {} tasks for {}", assignedIdsToRestart.size(), plan.totalTaskCount(), request);
+ assignedIdsToRestart.forEach(taskId -> {
try {
if (startTask(taskId)) {
log.info("Task '{}' restart successful", taskId);
@@ -1192,7 +1192,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
log.error("Task '{}' restart failed", taskId, t);
}
});
- log.debug("Restarted {} of {} tasks for {} as requested", plan.restartTaskCount(), plan.totalTaskCount(), request);
+ log.debug("Restarted {} of {} tasks for {} as requested", assignedIdsToRestart.size(), plan.totalTaskCount(), request);
}
log.info("Completed {}", plan);
}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index 6ddf047..996c840 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -1289,24 +1289,25 @@ public class DistributedHerderTest {
@Test
public void testDoRestartConnectorAndTasksOnlyTasks() {
- ConnectorTaskId taskId = new ConnectorTaskId(CONN1, 0);
RestartRequest restartRequest = new RestartRequest(CONN1, false, true);
RestartPlan restartPlan = PowerMock.createMock(RestartPlan.class);
EasyMock.expect(restartPlan.shouldRestartConnector()).andReturn(true).anyTimes();
EasyMock.expect(restartPlan.shouldRestartTasks()).andReturn(true).anyTimes();
- EasyMock.expect(restartPlan.taskIdsToRestart()).andReturn(Collections.singletonList(taskId)).anyTimes();
- EasyMock.expect(restartPlan.restartTaskCount()).andReturn(1).anyTimes();
- EasyMock.expect(restartPlan.totalTaskCount()).andReturn(1).anyTimes();
+ // The connector has three tasks
+ EasyMock.expect(restartPlan.taskIdsToRestart()).andReturn(Arrays.asList(TASK0, TASK1, TASK2)).anyTimes();
+ EasyMock.expect(restartPlan.restartTaskCount()).andReturn(3).anyTimes();
+ EasyMock.expect(restartPlan.totalTaskCount()).andReturn(3).anyTimes();
EasyMock.expect(herder.buildRestartPlan(restartRequest)).andReturn(Optional.of(restartPlan)).anyTimes();
herder.assignment = PowerMock.createMock(ExtendedAssignment.class);
EasyMock.expect(herder.assignment.connectors()).andReturn(Collections.emptyList()).anyTimes();
- EasyMock.expect(herder.assignment.tasks()).andReturn(Collections.singletonList(taskId)).anyTimes();
+ // But only one task is assigned to this worker
+ EasyMock.expect(herder.assignment.tasks()).andReturn(Collections.singletonList(TASK0)).anyTimes();
- worker.stopAndAwaitTasks(Collections.singletonList(taskId));
+ worker.stopAndAwaitTasks(Collections.singletonList(TASK0));
PowerMock.expectLastCall();
- herder.onRestart(taskId);
+ herder.onRestart(TASK0);
EasyMock.expectLastCall();
worker.startTask(EasyMock.eq(TASK0), EasyMock.anyObject(),
EasyMock.anyObject(), EasyMock.anyObject(),