C0urante commented on code in PR #12802:
URL: https://github.com/apache/kafka/pull/12802#discussion_r1054700417
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -1658,11 +1659,20 @@ private void backoff(long ms) {
         backoffRetries = BACKOFF_RETRIES;
     }
 
-    private void startAndStop(Collection<Callable<Void>> callables) {
+    // Visible for testing
+    void startAndStop(Collection<Callable<Void>> callables) {
         try {
             startAndStopExecutor.invokeAll(callables);
         } catch (InterruptedException e) {
             // ignore
+        } catch (RejectedExecutionException e) {
+            // Shutting down. Just log the exception
+            if (stopping.get()) {
+                log.debug("RejectedExecutionException thrown while herder is shutting down. This could be " +
+                        "because startAndStopExecutor is either already shutdown or is full.");

Review Comment:
The [Executors::newFixedThreadPool Javadocs](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executors.html#newFixedThreadPool-int-java.util.concurrent.ThreadFactory-) state that the executor operates off of an "unbounded queue", so it's misleading to state that this error could arise because the executor is full. It would also probably help to be clear to users that this is not a sign that something's wrong with the worker.

```suggestion
                log.debug("Ignoring RejectedExecutionException thrown while starting/stopping connectors/tasks en masse " +
                        "as the herder is already in the process of shutting down. This is not indicative of a problem and is normal behavior");
```

##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -271,6 +275,9 @@ public DistributedHerder(DistributedConfig config,
         this.workerGroupId = config.getString(DistributedConfig.GROUP_ID_CONFIG);
         this.workerSyncTimeoutMs = config.getInt(DistributedConfig.WORKER_SYNC_TIMEOUT_MS_CONFIG);
         this.workerTasksShutdownTimeoutMs = config.getLong(DistributedConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG);
+        // Timeout for herderExecutor to gracefully terminate is set to a value to accommodate
+        // reading to the end of the config topic + successfully attempting to stop all connectors and tasks and a buffer of 10s
+        this.herderExecutorTerminationTimeoutMs = this.workerSyncTimeoutMs + this.workerTasksShutdownTimeoutMs + Worker.CONNECTOR_GRACEFUL_SHUTDOWN_TIMEOUT_MS + 10000;

Review Comment:
Do these have to be stored as instance variables? It clutters up the class a bit to keep having so many private fields that are only used once. Maybe we could remove the `workerTasksShutdownTimeoutMs` and `herderExecutorTerminationTimeoutMs` fields, and inline the logic responsible for computing them and move it into `stop`?
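Returning to the `RejectedExecutionException` comment on `startAndStop`: the claim that a fixed thread pool rejects work only once it has been shut down, never because it is "full", can be checked with a small standalone program. This is a minimal sketch, not Kafka code: `StartAndStopSketch`, `rejectionsWhileBusy`, and the single-thread pool size are hypothetical, and the rethrow-when-not-stopping branch is an assumption inferred from the name of the new test (`shouldThrowWhenStartAndStopExecutorThrowsRejectedExecutionExceptionAndHerderNotStopping`), since the diff excerpt cuts off before the `else` path.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, simplified stand-in for the herder; not Kafka's DistributedHerder.
public class StartAndStopSketch {
    // Same factory method the review links to: fixed number of threads, unbounded work queue.
    final ExecutorService startAndStopExecutor = Executors.newFixedThreadPool(1);
    final AtomicBoolean stopping = new AtomicBoolean(false);

    void startAndStop(Collection<Callable<Void>> callables) {
        try {
            startAndStopExecutor.invokeAll(callables);
        } catch (InterruptedException e) {
            // Restore the interrupt flag rather than dropping it entirely
            Thread.currentThread().interrupt();
        } catch (RejectedExecutionException e) {
            if (!stopping.get()) {
                // Assumed behavior: a rejection outside of shutdown is unexpected, so surface it
                throw e;
            }
            // Expected during shutdown; the herder would log this at DEBUG level and move on
        }
    }

    // Counts how many of `count` submissions are rejected while the pool's only thread is busy.
    static int rejectionsWhileBusy(int count) {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        CountDownLatch release = new CountDownLatch(1);
        // Occupy the single worker thread so every later task has to wait in the queue
        pool.submit(() -> {
            release.await();
            return null;
        });
        int rejected = 0;
        for (int i = 0; i < count; i++) {
            try {
                // Queued: the default LinkedBlockingQueue is bounded only by Integer.MAX_VALUE
                pool.submit(() -> null);
            } catch (RejectedExecutionException e) {
                rejected++;
            }
        }
        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println("rejected while busy: " + rejectionsWhileBusy(10_000));

        Callable<Void> noop = () -> null;
        StartAndStopSketch herder = new StartAndStopSketch();
        herder.startAndStopExecutor.shutdown(); // from now on, every submission is rejected

        try {
            herder.startAndStop(Collections.singletonList(noop));
            System.out.println("not stopping: swallowed");
        } catch (RejectedExecutionException e) {
            System.out.println("not stopping: rethrown");
        }

        herder.stopping.set(true);
        herder.startAndStop(Collections.singletonList(noop)); // swallowed because stopping is set
        System.out.println("stopping: swallowed");
    }
}
```

Running `main` should report zero rejections for the busy pool, then show the exception propagating while `stopping` is false and being swallowed once it is true. Shutting down the real executor before calling `tick` is the same trigger the reviewer's suggested test relies on, which is why it can exercise the herder's actual `startAndStopExecutor` instead of a mock.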
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##########
@@ -3629,6 +3630,64 @@ public void testPollDurationOnSlowConnectorOperations() {
         PowerMock.verifyAll();
     }
 
+    @Test(expected = RejectedExecutionException.class)
+    @SuppressWarnings("unchecked")
+    public void shouldThrowWhenStartAndStopExecutorThrowsRejectedExecutionExceptionAndHerderNotStopping() throws InterruptedException {
+        ExecutorService startAndStopExecutor = EasyMock.mock(ExecutorService.class);
+        herder.startAndStopExecutor = startAndStopExecutor;
+
+        Callable<Void> connectorStartingCallable = () -> null;
+
+        EasyMock.expect(startAndStopExecutor.invokeAll(EasyMock.anyObject(Collection.class))).andThrow(new RejectedExecutionException());
+
+        PowerMock.replayAll(startAndStopExecutor);
+
+        herder.startAndStop(Collections.singletonList(connectorStartingCallable));
+
+    }
+
+    @Test
+    public void shouldHaltCleanlyWhenHerderStartsAndStopsAndConfigTopicReadTimesOut() throws TimeoutException {
+        connectProtocolVersion = CONNECT_PROTOCOL_V1;
+        EasyMock.expect(member.memberId()).andStubReturn("member");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(connectProtocolVersion);
+        final int rebalanceDelayMs = 20000;
+
+        // Assign the connector to this worker, and have it start
+        expectRebalance(Collections.emptyList(), Collections.emptyList(), ConnectProtocol.Assignment.NO_ERROR, 1, singletonList(CONN1), Collections.emptyList(), rebalanceDelayMs);
+
+        member.wakeup();
+        PowerMock.expectLastCall();
+        member.requestRejoin();
+        PowerMock.expectLastCall();
+        member.maybeLeaveGroup(anyString());
+        PowerMock.expectLastCall();
+        worker.stopAndAwaitConnectors();
+        PowerMock.expectLastCall();
+        worker.stopAndAwaitTasks();
+        PowerMock.expectLastCall();
+        member.stop();
+        PowerMock.expectLastCall();
+        configBackingStore.stop();
+        PowerMock.expectLastCall();
+        statusBackingStore.stop();
+        PowerMock.expectLastCall();
+        worker.stop();
+        PowerMock.expectLastCall();

Review Comment:
None of these calls are actually taking place; this becomes apparent when adding `PowerMock::verifyAll` to the end of the test.

```suggestion
```

##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##########
@@ -3629,6 +3630,64 @@ public void testPollDurationOnSlowConnectorOperations() {
+    @Test(expected = RejectedExecutionException.class)
+    @SuppressWarnings("unchecked")
+    public void shouldThrowWhenStartAndStopExecutorThrowsRejectedExecutionExceptionAndHerderNotStopping() throws InterruptedException {
+        ExecutorService startAndStopExecutor = EasyMock.mock(ExecutorService.class);
+        herder.startAndStopExecutor = startAndStopExecutor;
+
+        Callable<Void> connectorStartingCallable = () -> null;
+
+        EasyMock.expect(startAndStopExecutor.invokeAll(EasyMock.anyObject(Collection.class))).andThrow(new RejectedExecutionException());
+
+        PowerMock.replayAll(startAndStopExecutor);
+
+        herder.startAndStop(Collections.singletonList(connectorStartingCallable));
+
+    }

Review Comment:
I really like that we've added test coverage for this, but I think we can get better guarantees by testing with `Herder::tick` and using the herder's actual `startAndStopExecutor`, instead of mocking out the executor and calling `startAndStop` directly:

```suggestion
    @Test
    public void shouldThrowWhenStartAndStopExecutorThrowsRejectedExecutionExceptionAndHerderNotStopping() {
        EasyMock.expect(member.memberId()).andStubReturn("leader");
        expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList(), true);
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);

        PowerMock.replayAll();

        herder.startAndStopExecutor.shutdown();
        assertThrows(RejectedExecutionException.class, herder::tick);

        PowerMock.verifyAll();
    }
```

##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##########
@@ -3629,6 +3630,64 @@ public void testPollDurationOnSlowConnectorOperations() {
+    @Test
+    public void shouldHaltCleanlyWhenHerderStartsAndStopsAndConfigTopicReadTimesOut() throws TimeoutException {

Review Comment:
This isn't really testing what it claims to be testing. What I'd expect from a test like this is verification that even if the herder's tick thread is blocked during a read of the config topic, `Herder::stop` can and will proceed gracefully after any graceful shutdown timeouts have expired. In reality, what's happening is that `Herder::tick` is called once, returns immediately (gracefully handling a `TimeoutException` while attempting to read to the end of the config topic), and then `Herder::stop` is called afterward without any other simultaneous herder activity (like a blocked read to the end of the config topic) taking place.

TBH, although it'd be nice to have coverage for a case like this, I don't think it's strictly necessary.
We could theoretically use the `startBackgroundHerder` utility method to have `tick` continuously called in the background, set up a block while reading the config topic that never returns (at least until the rest of the test is complete), and then invoke `Herder::stop` on the main test thread, but that would either require heavy mocking out of the herder's executors (which would diminish the value of the test) or take a long time (i.e., the entire herder graceful shutdown timeout) for each test run to complete. Neither of these options really seems worth it, and I trust the changes in the main parts of the code base enough that I'm okay without adding additional coverage for them this time around. If that seems reasonable, we can just remove this test case.

##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##########
@@ -3629,6 +3630,64 @@ public void testPollDurationOnSlowConnectorOperations() {
+        // Read to config topic times out
+        configBackingStore.refresh(anyLong(), EasyMock.anyObject(TimeUnit.class));
+        EasyMock.expectLastCall().andThrow(new TimeoutException());
+
+        PowerMock.replayAll();
+
+        // Start the herder
+        herder.tick();
+        // and immediately stop it.
+        herder.stop();
+
+    }

Review Comment:
```suggestion
        PowerMock.verifyAll();
    }
```

-- 
This is an automated message from the Apache Git Service.