This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.2 by this push:
     new f2920b2  KAFKA-13719: Fix connector restart cause duplicate tasks 
(#11869)
f2920b2 is described below

commit f2920b2477d77fd52d3d828012c600e4c2ed20a5
Author: sunshujie1990 <[email protected]>
AuthorDate: Wed Mar 30 20:58:58 2022 +0800

    KAFKA-13719: Fix connector restart cause duplicate tasks (#11869)
    
    
    Reviewers: Mickael Maison <[email protected]>, Luke Chen 
<[email protected]>, Chris Egerton <[email protected]>
    Co-authored-by: Chris Egerton <[email protected]>
---
 .../connect/runtime/distributed/DistributedHerder.java    |  6 +++---
 .../runtime/distributed/DistributedHerderTest.java        | 15 ++++++++-------
 2 files changed, 11 insertions(+), 10 deletions(-)

diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 357796c..65a8e7e 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -1180,8 +1180,8 @@ public class DistributedHerder extends AbstractHerder 
implements Runnable {
             }
         }
         if (restartTasks) {
-            log.debug("Restarting {} of {} tasks for {}", 
plan.restartTaskCount(), plan.totalTaskCount(), request);
-            plan.taskIdsToRestart().forEach(taskId -> {
+            log.debug("Restarting {} of {} tasks for {}", 
assignedIdsToRestart.size(), plan.totalTaskCount(), request);
+            assignedIdsToRestart.forEach(taskId -> {
                 try {
                     if (startTask(taskId)) {
                         log.info("Task '{}' restart successful", taskId);
@@ -1192,7 +1192,7 @@ public class DistributedHerder extends AbstractHerder 
implements Runnable {
                     log.error("Task '{}' restart failed", taskId, t);
                 }
             });
-            log.debug("Restarted {} of {} tasks for {} as requested", 
plan.restartTaskCount(), plan.totalTaskCount(), request);
+            log.debug("Restarted {} of {} tasks for {} as requested", 
assignedIdsToRestart.size(), plan.totalTaskCount(), request);
         }
         log.info("Completed {}", plan);
     }
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index 6ddf047..996c840 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -1289,24 +1289,25 @@ public class DistributedHerderTest {
 
     @Test
     public void testDoRestartConnectorAndTasksOnlyTasks() {
-        ConnectorTaskId taskId = new ConnectorTaskId(CONN1, 0);
         RestartRequest restartRequest = new RestartRequest(CONN1, false, true);
         RestartPlan restartPlan = PowerMock.createMock(RestartPlan.class);
         
EasyMock.expect(restartPlan.shouldRestartConnector()).andReturn(true).anyTimes();
         
EasyMock.expect(restartPlan.shouldRestartTasks()).andReturn(true).anyTimes();
-        
EasyMock.expect(restartPlan.taskIdsToRestart()).andReturn(Collections.singletonList(taskId)).anyTimes();
-        
EasyMock.expect(restartPlan.restartTaskCount()).andReturn(1).anyTimes();
-        EasyMock.expect(restartPlan.totalTaskCount()).andReturn(1).anyTimes();
+        // The connector has three tasks
+        
EasyMock.expect(restartPlan.taskIdsToRestart()).andReturn(Arrays.asList(TASK0, 
TASK1, TASK2)).anyTimes();
+        
EasyMock.expect(restartPlan.restartTaskCount()).andReturn(3).anyTimes();
+        EasyMock.expect(restartPlan.totalTaskCount()).andReturn(3).anyTimes();
         
EasyMock.expect(herder.buildRestartPlan(restartRequest)).andReturn(Optional.of(restartPlan)).anyTimes();
 
         herder.assignment = PowerMock.createMock(ExtendedAssignment.class);
         
EasyMock.expect(herder.assignment.connectors()).andReturn(Collections.emptyList()).anyTimes();
-        
EasyMock.expect(herder.assignment.tasks()).andReturn(Collections.singletonList(taskId)).anyTimes();
+        // But only one task is assigned to this worker
+        
EasyMock.expect(herder.assignment.tasks()).andReturn(Collections.singletonList(TASK0)).anyTimes();
 
-        worker.stopAndAwaitTasks(Collections.singletonList(taskId));
+        worker.stopAndAwaitTasks(Collections.singletonList(TASK0));
         PowerMock.expectLastCall();
 
-        herder.onRestart(taskId);
+        herder.onRestart(TASK0);
         EasyMock.expectLastCall();
 
         worker.startTask(EasyMock.eq(TASK0), EasyMock.anyObject(), 
EasyMock.anyObject(), EasyMock.anyObject(),

Reply via email to