C0urante commented on code in PR #13465: URL: https://github.com/apache/kafka/pull/13465#discussion_r1205535710
########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java: ########## @@ -4123,6 +4136,280 @@ public void testConnectorOffsets() throws Exception { PowerMock.verifyAll(); } + @Test + public void testAlterOffsetsConnectorNotFound() throws Exception { + // Get the initial assignment + EasyMock.expect(member.memberId()).andStubReturn("leader"); + EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0); + expectRebalance(1, Collections.emptyList(), Collections.emptyList(), true); + expectConfigRefreshAndSnapshot(SNAPSHOT); + member.poll(EasyMock.anyInt()); + PowerMock.expectLastCall(); + + // Now handle the alter connector offsets request + member.wakeup(); + PowerMock.expectLastCall(); + member.ensureActive(); + PowerMock.expectLastCall(); + expectConfigRefreshAndSnapshot(SNAPSHOT); + member.poll(EasyMock.anyInt()); + PowerMock.expectLastCall(); + + PowerMock.replayAll(); + + herder.tick(); + FutureCallback<Message> callback = new FutureCallback<>(); + herder.alterConnectorOffsets("connector-does-not-exist", new HashMap<>(), callback); + herder.tick(); + ExecutionException e = assertThrows(ExecutionException.class, () -> callback.get(1000L, TimeUnit.MILLISECONDS)); + assertTrue(e.getCause() instanceof NotFoundException); + + PowerMock.verifyAll(); + } + + @Test + public void testAlterOffsetsConnectorNotInStoppedState() throws Exception { + // Get the initial assignment + EasyMock.expect(member.memberId()).andStubReturn("leader"); + EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0); + expectRebalance(1, Collections.emptyList(), Collections.emptyList(), true); + expectConfigRefreshAndSnapshot(SNAPSHOT); + member.poll(EasyMock.anyInt()); + PowerMock.expectLastCall(); + + // Now handle the alter connector offsets request + member.wakeup(); + PowerMock.expectLastCall(); + member.ensureActive(); + PowerMock.expectLastCall(); + expectConfigRefreshAndSnapshot(SNAPSHOT); + member.poll(EasyMock.anyInt()); + PowerMock.expectLastCall(); + + PowerMock.replayAll(); + + herder.tick(); + FutureCallback<Message> callback = new FutureCallback<>(); + herder.alterConnectorOffsets(CONN1, new HashMap<>(), callback); + herder.tick(); + ExecutionException e = assertThrows(ExecutionException.class, () -> callback.get(1000L, TimeUnit.MILLISECONDS)); + assertTrue(e.getCause() instanceof BadRequestException); + + PowerMock.verifyAll(); + } + + @Test + public void testAlterOffsetsNotLeader() throws Exception { + // Get the initial assignment + EasyMock.expect(member.memberId()).andStubReturn("member"); + EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0); + expectRebalance(1, Collections.emptyList(), Collections.emptyList(), false); + expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1); + member.poll(EasyMock.anyInt()); + PowerMock.expectLastCall(); + + // Now handle the alter connector offsets request + member.wakeup(); + PowerMock.expectLastCall(); + member.ensureActive(); + member.poll(EasyMock.anyInt()); + PowerMock.expectLastCall(); + + PowerMock.replayAll(); + + herder.tick(); + FutureCallback<Message> callback = new FutureCallback<>(); + herder.alterConnectorOffsets(CONN1, new HashMap<>(), callback); + herder.tick(); + ExecutionException e = assertThrows(ExecutionException.class, () -> callback.get(1000L, TimeUnit.MILLISECONDS)); + assertTrue(e.getCause() instanceof NotLeaderException); + + PowerMock.verifyAll(); + } + + @Test + public void testAlterOffsetsSourceConnectorExactlyOnceDisabled() throws Exception { + // Get the initial assignment + EasyMock.expect(member.memberId()).andStubReturn("leader"); + EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0); + expectRebalance(1, Collections.emptyList(), Collections.emptyList(), true); + expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1); + member.poll(EasyMock.anyInt()); + PowerMock.expectLastCall(); + + // Now handle the alter connector offsets request + Map<Map<String, ?>, Map<String, ?>> offsets = new HashMap<>(); + member.wakeup(); + PowerMock.expectLastCall(); + member.ensureActive(); + PowerMock.expectLastCall(); + member.poll(EasyMock.anyInt()); + PowerMock.expectLastCall(); + expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1); + Capture<Callback<Message>> workerCallbackCapture = Capture.newInstance(); + worker.alterConnectorOffsets(EasyMock.eq(CONN1), EasyMock.eq(CONN1_CONFIG), EasyMock.eq(offsets), capture(workerCallbackCapture)); + Message msg = new Message("The offsets for this connector have been altered successfully"); + EasyMock.expectLastCall().andAnswer(() -> { + workerCallbackCapture.getValue().onCompletion(null, msg); + return null; + }); + + PowerMock.replayAll(); + + herder.tick(); + FutureCallback<Message> callback = new FutureCallback<>(); + herder.alterConnectorOffsets(CONN1, offsets, callback); + herder.tick(); + assertEquals(msg, callback.get(1000L, TimeUnit.MILLISECONDS)); + assertEquals("The offsets for this connector have been altered successfully", msg.message()); + + PowerMock.verifyAll(); + } + + @Test + public void testAlterOffsetsSourceConnectorExactlyOnceEnabled() throws Exception { + // Setup herder with exactly-once support for source connectors enabled + herder = exactlyOnceHerder(); + rebalanceListener = herder.new RebalanceListener(time); + PowerMock.expectPrivate(herder, "updateDeletedConnectorStatus").andVoid().anyTimes(); + PowerMock.expectPrivate(herder, "updateDeletedTaskStatus").andVoid().anyTimes(); + + // Get the initial assignment + EasyMock.expect(member.memberId()).andStubReturn("leader"); + EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0); + expectRebalance(1, Collections.emptyList(), Collections.emptyList(), true); + expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1); + member.poll(EasyMock.anyInt()); + PowerMock.expectLastCall().anyTimes(); + + // Now handle the alter connector offsets request + Map<Map<String, ?>, Map<String, ?>> offsets = new HashMap<>(); + member.wakeup(); + PowerMock.expectLastCall().anyTimes(); + member.ensureActive(); + PowerMock.expectLastCall().anyTimes(); + expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1); + expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1); + EasyMock.expect(herder.connectorType(EasyMock.anyObject())).andReturn(ConnectorType.SOURCE).anyTimes(); + + // Expect a round of zombie fencing to occur + expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1); + KafkaFuture<Void> workerFencingFuture = EasyMock.mock(KafkaFuture.class); + KafkaFuture<Void> herderFencingFuture = EasyMock.mock(KafkaFuture.class); + EasyMock.expect(worker.fenceZombies(CONN1, SNAPSHOT_STOPPED_CONN1.taskCountRecord(CONN1), CONN1_CONFIG)).andReturn(workerFencingFuture); + EasyMock.expect(workerFencingFuture.thenApply(EasyMock.<KafkaFuture.BaseFunction<Void, Void>>anyObject())).andReturn(herderFencingFuture); + + // Two fencing callbacks are added - one is in ZombieFencing::start itself to remove the connector from the active + // fencing list. The other is the callback passed from DistributedHerder::alterConnectorOffsets in order to + // queue up the actual alter offsets request if the zombie fencing succeeds. + for (int i = 0; i < 2; i++) { + Capture<KafkaFuture.BiConsumer<Void, Throwable>> herderFencingCallback = EasyMock.newCapture(); + EasyMock.expect(herderFencingFuture.whenComplete(EasyMock.capture(herderFencingCallback))).andAnswer(() -> { + herderFencingCallback.getValue().accept(null, null); + return null; + }); + } + + Capture<Callback<Message>> workerCallbackCapture = Capture.newInstance(); + Message msg = new Message("The offsets for this connector have been altered successfully"); + worker.alterConnectorOffsets(EasyMock.eq(CONN1), EasyMock.eq(CONN1_CONFIG), EasyMock.eq(offsets), capture(workerCallbackCapture)); + EasyMock.expectLastCall().andAnswer(() -> { + workerCallbackCapture.getValue().onCompletion(null, msg); + return null; + }); + + // Handle the second alter connector offsets request. No zombie fencing request to the worker is expected now since we + // already did a round of zombie fencing last time and no new tasks came up in the meanwhile. The config snapshot is + // refreshed once at the beginning of the DistributedHerder::alterConnectorOffsets method, once before checking + // whether zombie fencing is required, and once before actually proceeding to alter connector offsets. + expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1_FENCED); + expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1_FENCED); + expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1_FENCED); + Capture<Callback<Message>> workerCallbackCapture2 = Capture.newInstance(); + worker.alterConnectorOffsets(EasyMock.eq(CONN1), EasyMock.eq(CONN1_CONFIG), EasyMock.eq(offsets), capture(workerCallbackCapture2)); + EasyMock.expectLastCall().andAnswer(() -> { + workerCallbackCapture2.getValue().onCompletion(null, msg); + return null; + }); + + PowerMock.replayAll(workerFencingFuture, herderFencingFuture); + + herder.tick(); + FutureCallback<Message> callback = new FutureCallback<>(); + herder.alterConnectorOffsets(CONN1, offsets, callback); + // Process the zombie fencing request + herder.tick(); + // Process the alter offsets request + herder.tick(); + assertEquals(msg, callback.get(1000L, TimeUnit.MILLISECONDS)); + + FutureCallback<Message> callback2 = new FutureCallback<>(); + herder.alterConnectorOffsets(CONN1, offsets, callback2); + herder.tick(); + assertEquals(msg, callback.get(1000L, TimeUnit.MILLISECONDS)); + + PowerMock.verifyAll(); + } + + @Test + public void testAlterOffsetsSourceConnectorExactlyOnceEnabledZombieFencingFailure() throws Exception { Review Comment: Thanks, LGTM -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org