C0urante commented on code in PR #13465:
URL: https://github.com/apache/kafka/pull/13465#discussion_r1205535710


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##########
@@ -4123,6 +4136,280 @@ public void testConnectorOffsets() throws Exception {
         PowerMock.verifyAll();
     }
 
+    @Test // Altering offsets for the name "connector-does-not-exist" must fail the callback with NotFoundException.
+    public void testAlterOffsetsConnectorNotFound() throws Exception {
+        // Get the initial assignment (member id is stubbed as "leader")
+        EasyMock.expect(member.memberId()).andStubReturn("leader");
+        
EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+        expectRebalance(1, Collections.emptyList(), Collections.emptyList(), 
true);
+        expectConfigRefreshAndSnapshot(SNAPSHOT);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        // Now handle the alter connector offsets request
+        member.wakeup();
+        PowerMock.expectLastCall();
+        member.ensureActive();
+        PowerMock.expectLastCall();
+        expectConfigRefreshAndSnapshot(SNAPSHOT);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll(); // recording is done; the mocks now replay the expectations above
+
+        herder.tick(); // first tick runs before any request has been submitted
+        FutureCallback<Message> callback = new FutureCallback<>();
+        herder.alterConnectorOffsets("connector-does-not-exist", new 
HashMap<>(), callback);
+        herder.tick(); // second tick runs after the alter offsets request was submitted
+        ExecutionException e = assertThrows(ExecutionException.class, () -> 
callback.get(1000L, TimeUnit.MILLISECONDS)); // wait at most one second for the callback to fail
+        assertTrue(e.getCause() instanceof NotFoundException);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test // Altering offsets for CONN1 under SNAPSHOT must fail the callback with BadRequestException.
+    public void testAlterOffsetsConnectorNotInStoppedState() throws Exception {
+        // Get the initial assignment (member id is stubbed as "leader")
+        EasyMock.expect(member.memberId()).andStubReturn("leader");
+        
EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+        expectRebalance(1, Collections.emptyList(), Collections.emptyList(), 
true);
+        expectConfigRefreshAndSnapshot(SNAPSHOT); // NOTE(review): presumably CONN1 is not STOPPED in SNAPSHOT - confirm against its definition
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        // Now handle the alter connector offsets request
+        member.wakeup();
+        PowerMock.expectLastCall();
+        member.ensureActive();
+        PowerMock.expectLastCall();
+        expectConfigRefreshAndSnapshot(SNAPSHOT);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll(); // recording is done; the mocks now replay the expectations above
+
+        herder.tick(); // first tick runs before any request has been submitted
+        FutureCallback<Message> callback = new FutureCallback<>();
+        herder.alterConnectorOffsets(CONN1, new HashMap<>(), callback);
+        herder.tick(); // second tick runs after the alter offsets request was submitted
+        ExecutionException e = assertThrows(ExecutionException.class, () -> 
callback.get(1000L, TimeUnit.MILLISECONDS)); // wait at most one second for the callback to fail
+        assertTrue(e.getCause() instanceof BadRequestException);
+
+        PowerMock.verifyAll();
+    }
+
+    // Verifies that an alter offsets request for CONN1 fails the callback with NotLeaderException
+    // when the member id is stubbed as "member" and the rebalance is recorded with a trailing
+    // 'false' flag (presumably "is leader" - confirm against expectRebalance).
+    @Test
+    public void testAlterOffsetsNotLeader() throws Exception {
+        // Get the initial assignment
+        EasyMock.expect(member.memberId()).andStubReturn("member");
+        
EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+        expectRebalance(1, Collections.emptyList(), Collections.emptyList(), 
false);
+        expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        // Now handle the alter connector offsets request
+        member.wakeup();
+        PowerMock.expectLastCall();
+        member.ensureActive();
+        // Explicit expectLastCall() for consistency with every other recorded void call in these tests
+        PowerMock.expectLastCall();
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        // First tick runs before any request has been submitted
+        herder.tick();
+        FutureCallback<Message> callback = new FutureCallback<>();
+        herder.alterConnectorOffsets(CONN1, new HashMap<>(), callback);
+        // Second tick runs after the alter offsets request was submitted
+        herder.tick();
+        ExecutionException e = assertThrows(ExecutionException.class, () -> 
callback.get(1000L, TimeUnit.MILLISECONDS));
+        assertTrue(e.getCause() instanceof NotLeaderException);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test // With the default herder, altering CONN1's offsets is handed to the worker and its Message completes the callback.
+    public void testAlterOffsetsSourceConnectorExactlyOnceDisabled() throws 
Exception {
+        // Get the initial assignment (member id is stubbed as "leader")
+        EasyMock.expect(member.memberId()).andStubReturn("leader");
+        
EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+        expectRebalance(1, Collections.emptyList(), Collections.emptyList(), 
true);
+        expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        // Now handle the alter connector offsets request
+        Map<Map<String, ?>, Map<String, ?>> offsets = new HashMap<>(); // empty offsets map, matched below via EasyMock.eq
+        member.wakeup();
+        PowerMock.expectLastCall();
+        member.ensureActive();
+        PowerMock.expectLastCall();
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+        expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1);
+        Capture<Callback<Message>> workerCallbackCapture = 
Capture.newInstance();
+        worker.alterConnectorOffsets(EasyMock.eq(CONN1), 
EasyMock.eq(CONN1_CONFIG), EasyMock.eq(offsets), 
capture(workerCallbackCapture));
+        Message msg = new Message("The offsets for this connector have been 
altered successfully");
+        EasyMock.expectLastCall().andAnswer(() -> { // the mocked worker completes the captured callback with msg and no error
+            workerCallbackCapture.getValue().onCompletion(null, msg);
+            return null;
+        });
+
+        PowerMock.replayAll(); // recording is done; the mocks now replay the expectations above
+
+        herder.tick(); // first tick runs before any request has been submitted
+        FutureCallback<Message> callback = new FutureCallback<>();
+        herder.alterConnectorOffsets(CONN1, offsets, callback);
+        herder.tick(); // second tick runs after the alter offsets request was submitted
+        assertEquals(msg, callback.get(1000L, TimeUnit.MILLISECONDS)); // the caller's callback must receive the worker's Message
+        assertEquals("The offsets for this connector have been altered 
successfully", msg.message());
+
+        PowerMock.verifyAll();
+    }
+
+    // Verifies two consecutive alter offsets requests for CONN1 against a herder created via
+    // exactlyOnceHerder(): the first request is expected to trigger one worker.fenceZombies call
+    // before the worker alters the offsets; the second request (served from
+    // SNAPSHOT_STOPPED_CONN1_FENCED) is expected to reach the worker without another fencing call.
+    // Each request's own callback must complete with the Message produced by the worker.
+    @Test
+    public void testAlterOffsetsSourceConnectorExactlyOnceEnabled() throws 
Exception {
+        // Setup herder with exactly-once support for source connectors enabled
+        herder = exactlyOnceHerder();
+        rebalanceListener = herder.new RebalanceListener(time);
+        PowerMock.expectPrivate(herder, 
"updateDeletedConnectorStatus").andVoid().anyTimes();
+        PowerMock.expectPrivate(herder, 
"updateDeletedTaskStatus").andVoid().anyTimes();
+
+        // Get the initial assignment
+        EasyMock.expect(member.memberId()).andStubReturn("leader");
+        
EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+        expectRebalance(1, Collections.emptyList(), Collections.emptyList(), 
true);
+        expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall().anyTimes();
+
+        // Now handle the alter connector offsets request
+        Map<Map<String, ?>, Map<String, ?>> offsets = new HashMap<>();
+        member.wakeup();
+        PowerMock.expectLastCall().anyTimes();
+        member.ensureActive();
+        PowerMock.expectLastCall().anyTimes();
+        expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1);
+        expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1);
+        
EasyMock.expect(herder.connectorType(EasyMock.anyObject())).andReturn(ConnectorType.SOURCE).anyTimes();
+
+        // Expect a round of zombie fencing to occur
+        expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1);
+        KafkaFuture<Void> workerFencingFuture = 
EasyMock.mock(KafkaFuture.class);
+        KafkaFuture<Void> herderFencingFuture = 
EasyMock.mock(KafkaFuture.class);
+        EasyMock.expect(worker.fenceZombies(CONN1, 
SNAPSHOT_STOPPED_CONN1.taskCountRecord(CONN1), 
CONN1_CONFIG)).andReturn(workerFencingFuture);
+        
EasyMock.expect(workerFencingFuture.thenApply(EasyMock.<KafkaFuture.BaseFunction<Void,
 Void>>anyObject())).andReturn(herderFencingFuture);
+
+        // Two fencing callbacks are added - one is in ZombieFencing::start 
itself to remove the connector from the active
+        // fencing list. The other is the callback passed from 
DistributedHerder::alterConnectorOffsets in order to
+        // queue up the actual alter offsets request if the zombie fencing 
succeeds.
+        for (int i = 0; i < 2; i++) {
+            Capture<KafkaFuture.BiConsumer<Void, Throwable>> 
herderFencingCallback = EasyMock.newCapture();
+            
EasyMock.expect(herderFencingFuture.whenComplete(EasyMock.capture(herderFencingCallback))).andAnswer(()
 -> {
+                herderFencingCallback.getValue().accept(null, null);
+                return null;
+            });
+        }
+
+        Capture<Callback<Message>> workerCallbackCapture = 
Capture.newInstance();
+        Message msg = new Message("The offsets for this connector have been 
altered successfully");
+        worker.alterConnectorOffsets(EasyMock.eq(CONN1), 
EasyMock.eq(CONN1_CONFIG), EasyMock.eq(offsets), 
capture(workerCallbackCapture));
+        EasyMock.expectLastCall().andAnswer(() -> {
+            workerCallbackCapture.getValue().onCompletion(null, msg);
+            return null;
+        });
+
+        // Handle the second alter connector offsets request. No zombie 
fencing request to the worker is expected now since we
+        // already did a round of zombie fencing last time and no new tasks 
came up in the meanwhile. The config snapshot is
+        // refreshed once at the beginning of the 
DistributedHerder::alterConnectorOffsets method, once before checking
+        // whether zombie fencing is required, and once before actually 
proceeding to alter connector offsets.
+        expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1_FENCED);
+        expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1_FENCED);
+        expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1_FENCED);
+        Capture<Callback<Message>> workerCallbackCapture2 = 
Capture.newInstance();
+        worker.alterConnectorOffsets(EasyMock.eq(CONN1), 
EasyMock.eq(CONN1_CONFIG), EasyMock.eq(offsets), 
capture(workerCallbackCapture2));
+        EasyMock.expectLastCall().andAnswer(() -> {
+            workerCallbackCapture2.getValue().onCompletion(null, msg);
+            return null;
+        });
+
+        PowerMock.replayAll(workerFencingFuture, herderFencingFuture);
+
+        herder.tick();
+        FutureCallback<Message> callback = new FutureCallback<>();
+        herder.alterConnectorOffsets(CONN1, offsets, callback);
+        // Process the zombie fencing request
+        herder.tick();
+        // Process the alter offsets request
+        herder.tick();
+        assertEquals(msg, callback.get(1000L, TimeUnit.MILLISECONDS));
+
+        FutureCallback<Message> callback2 = new FutureCallback<>();
+        herder.alterConnectorOffsets(CONN1, offsets, callback2);
+        herder.tick();
+        // Check the second request's own callback; re-reading the first callback would pass
+        // even if the second request never completed.
+        assertEquals(msg, callback2.get(1000L, TimeUnit.MILLISECONDS));
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void 
testAlterOffsetsSourceConnectorExactlyOnceEnabledZombieFencingFailure() throws 
Exception {

Review Comment:
   Thanks, LGTM



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to