C0urante commented on code in PR #12802:
URL: https://github.com/apache/kafka/pull/12802#discussion_r1054700417


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -1658,11 +1659,20 @@ private void backoff(long ms) {
         backoffRetries = BACKOFF_RETRIES;
     }
 
-    private void startAndStop(Collection<Callable<Void>> callables) {
+    // Visible for testing
+    void startAndStop(Collection<Callable<Void>> callables) {
         try {
             startAndStopExecutor.invokeAll(callables);
         } catch (InterruptedException e) {
             // ignore
+        } catch (RejectedExecutionException e) {
+            // Shutting down. Just log the exception
+            if (stopping.get()) {
+                log.debug("RejectedExecutionException thrown while herder is shutting down. This could be " +
+                        "because startAndStopExecutor is either already shutdown or is full.");

Review Comment:
   The [Executors::newFixedThreadPool Javadocs](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executors.html#newFixedThreadPool-int-java.util.concurrent.ThreadFactory-) state that the executor operates off of an "unbounded queue", so it's misleading to say that this error could arise because the executor is full.
   
   It would also help to make it clear to users that this is not a sign that something is wrong with the worker.
   
   ```suggestion
                   log.debug("Ignoring RejectedExecutionException thrown while starting/stopping connectors/tasks en masse " +
                           "as the herder is already in the process of shutting down. This is not indicative of a problem and is normal behavior");
   ```
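The unbounded-queue point can be confirmed against the JDK alone. In this minimal sketch (class and method names are illustrative, not Connect code), the pool's single thread is blocked while thousands of extra tasks are submitted, and every one of them is queued rather than rejected. `invokeAll` only throws `RejectedExecutionException` once the pool has been shut down, through the default `AbortPolicy` that `Executors.newFixedThreadPool` installs.

```java
import java.util.Collections;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

public class FixedPoolRejectionDemo {

    // Blocks the pool's only thread, then submits `extraTasks` more tasks.
    // Returns how many of them were accepted into the queue without rejection.
    static int submitWhileBusy(int extraTasks) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(1);
        CountDownLatch release = new CountDownLatch(1);
        executor.submit(() -> {
            release.await(); // occupy the single worker thread
            return null;
        });
        int accepted = 0;
        try {
            for (int i = 0; i < extraTasks; i++) {
                executor.submit(() -> null); // queued behind the blocked task
                accepted++;
            }
        } catch (RejectedExecutionException e) {
            // Not expected: the pool's LinkedBlockingQueue is unbounded (capacity Integer.MAX_VALUE)
        } finally {
            release.countDown();
            executor.shutdown();
            executor.awaitTermination(10, TimeUnit.SECONDS);
        }
        return accepted;
    }

    // Returns true if invokeAll is rejected after the executor has been shut down.
    static boolean rejectedAfterShutdown() throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(1);
        executor.shutdown();
        Callable<Void> noop = () -> null;
        try {
            executor.invokeAll(Collections.singletonList(noop));
            return false;
        } catch (RejectedExecutionException e) {
            return true;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("accepted while busy: " + submitWhileBusy(10_000));
        System.out.println("rejected after shutdown: " + rejectedAfterShutdown());
    }
}
```

So for this executor, shutdown is the only realistic source of the exception, which is what the suggested log message says.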



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -271,6 +275,9 @@ public DistributedHerder(DistributedConfig config,
         this.workerGroupId = config.getString(DistributedConfig.GROUP_ID_CONFIG);
         this.workerSyncTimeoutMs = config.getInt(DistributedConfig.WORKER_SYNC_TIMEOUT_MS_CONFIG);
         this.workerTasksShutdownTimeoutMs = config.getLong(DistributedConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG);
+        // Timeout for herderExecutor to gracefully terminate is set to a value to accommodate
+        // reading to the end of the config topic + successfully attempting to stop all connectors and tasks and a buffer of 10s
+        this.herderExecutorTerminationTimeoutMs = this.workerSyncTimeoutMs + this.workerTasksShutdownTimeoutMs + Worker.CONNECTOR_GRACEFUL_SHUTDOWN_TIMEOUT_MS + 10000;

Review Comment:
   Do these have to be stored as instance variables? It clutters up the class a bit to have so many private fields that are only used once.
   
   Maybe we could remove the `workerTasksShutdownTimeoutMs` and `herderExecutorTerminationTimeoutMs` fields, and move the logic that computes them into `stop`?
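A sketch of what the inlining could look like, assuming `stop` shuts down `herderExecutor` and waits for it with `awaitTermination` (an assumption; the actual shutdown code isn't part of this hunk). The timeout becomes a local variable computed by a small pure helper. The class, the helper, and its parameters are hypothetical stand-ins for values that `stop` would read from the worker config and from `Worker.CONNECTOR_GRACEFUL_SHUTDOWN_TIMEOUT_MS`.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class HerderStopSketch {

    // Fixed slack on top of the individual graceful timeouts (10s, as in the PR).
    static final long TERMINATION_BUFFER_MS = 10_000L;

    // Pure helper: config-topic read + task shutdown + connector shutdown + buffer.
    // Computed on demand instead of being stored in an instance field.
    static long herderExecutorTerminationTimeoutMs(long workerSyncTimeoutMs,
                                                   long taskShutdownGracefulTimeoutMs,
                                                   long connectorGracefulShutdownTimeoutMs) {
        return workerSyncTimeoutMs + taskShutdownGracefulTimeoutMs
                + connectorGracefulShutdownTimeoutMs + TERMINATION_BUFFER_MS;
    }

    // Hypothetical stop(): the timeout only lives as a local variable.
    static boolean stop(ExecutorService herderExecutor, long workerSyncTimeoutMs,
                        long taskShutdownGracefulTimeoutMs, long connectorGracefulShutdownTimeoutMs)
            throws InterruptedException {
        long terminationTimeoutMs = herderExecutorTerminationTimeoutMs(
                workerSyncTimeoutMs, taskShutdownGracefulTimeoutMs, connectorGracefulShutdownTimeoutMs);
        herderExecutor.shutdown();
        boolean terminated = herderExecutor.awaitTermination(terminationTimeoutMs, TimeUnit.MILLISECONDS);
        if (!terminated) {
            herderExecutor.shutdownNow(); // stop waiting and interrupt any still-running task
        }
        return terminated;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(herderExecutorTerminationTimeoutMs(3_000, 5_000, 5_000)); // prints 23000
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.submit(() -> { });
        System.out.println(stop(executor, 3_000, 5_000, 5_000)); // prints true
    }
}
```

The numbers in `main` are illustrative inputs, not a statement of Connect's defaults.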



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##########
@@ -3629,6 +3630,64 @@ public void testPollDurationOnSlowConnectorOperations() {
         PowerMock.verifyAll();
     }
 
+    @Test(expected = RejectedExecutionException.class)
+    @SuppressWarnings("unchecked")
+    public void shouldThrowWhenStartAndStopExecutorThrowsRejectedExecutionExceptionAndHerderNotStopping() throws InterruptedException {
+        ExecutorService startAndStopExecutor = EasyMock.mock(ExecutorService.class);
+        herder.startAndStopExecutor = startAndStopExecutor;
+
+        Callable<Void> connectorStartingCallable = () -> null;
+
+        EasyMock.expect(startAndStopExecutor.invokeAll(EasyMock.anyObject(Collection.class))).andThrow(new RejectedExecutionException());
+
+        PowerMock.replayAll(startAndStopExecutor);
+
+        herder.startAndStop(Collections.singletonList(connectorStartingCallable));
+
+    }
+
+    @Test
+    public void shouldHaltCleanlyWhenHerderStartsAndStopsAndConfigTopicReadTimesOut() throws TimeoutException {
+        connectProtocolVersion = CONNECT_PROTOCOL_V1;
+        EasyMock.expect(member.memberId()).andStubReturn("member");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(connectProtocolVersion);
+        final int rebalanceDelayMs = 20000;
+
+        // Assign the connector to this worker, and have it start
+        expectRebalance(Collections.emptyList(), Collections.emptyList(), ConnectProtocol.Assignment.NO_ERROR, 1, singletonList(CONN1), Collections.emptyList(), rebalanceDelayMs);
+
+        member.wakeup();
+        PowerMock.expectLastCall();
+        member.requestRejoin();
+        PowerMock.expectLastCall();
+        member.maybeLeaveGroup(anyString());
+        PowerMock.expectLastCall();
+        worker.stopAndAwaitConnectors();
+        PowerMock.expectLastCall();
+        worker.stopAndAwaitTasks();
+        PowerMock.expectLastCall();
+        member.stop();
+        PowerMock.expectLastCall();
+        configBackingStore.stop();
+        PowerMock.expectLastCall();
+        statusBackingStore.stop();
+        PowerMock.expectLastCall();
+        worker.stop();
+        PowerMock.expectLastCall();

Review Comment:
   None of these calls are actually taking place; this becomes apparent when adding `PowerMock::verifyAll` to the end of the test.
   
   ```suggestion
   ```
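With EasyMock, an unexpected call fails as soon as it happens, but an expected call that never happens is only reported when the mocks are verified, which is why these expectations went unnoticed without `PowerMock::verifyAll`. A toy, JDK-only model of that record/replay/verify cycle (a hypothetical class, not how EasyMock is implemented) shows the failure mode:

```java
import java.util.ArrayList;
import java.util.List;

public class ExpectationRecorderSketch {

    private final List<String> expected = new ArrayList<>();
    private final List<String> actual = new ArrayList<>();

    // Record phase: declare a call that the code under test should make.
    void expect(String call) {
        expected.add(call);
    }

    // Replay phase: the code under test reports each call it actually makes.
    void called(String call) {
        actual.add(call);
    }

    // Verify phase: expectations that no actual call satisfied.
    List<String> unmetExpectations() {
        List<String> unmet = new ArrayList<>(expected);
        for (String call : actual) {
            unmet.remove(call); // removes at most one matching expectation
        }
        return unmet;
    }

    void verify() {
        List<String> unmet = unmetExpectations();
        if (!unmet.isEmpty()) {
            throw new AssertionError("Expected but never called: " + unmet);
        }
    }

    public static void main(String[] args) {
        ExpectationRecorderSketch mocks = new ExpectationRecorderSketch();
        mocks.expect("worker.stopAndAwaitConnectors()");
        mocks.expect("member.stop()");
        mocks.called("member.stop()");
        // Nothing fails until verify() runs; skipping it hides the missing call.
        System.out.println(mocks.unmetExpectations()); // prints [worker.stopAndAwaitConnectors()]
    }
}
```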



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##########
@@ -3629,6 +3630,64 @@ public void testPollDurationOnSlowConnectorOperations() {
         PowerMock.verifyAll();
     }
 
+    @Test(expected = RejectedExecutionException.class)
+    @SuppressWarnings("unchecked")
+    public void shouldThrowWhenStartAndStopExecutorThrowsRejectedExecutionExceptionAndHerderNotStopping() throws InterruptedException {
+        ExecutorService startAndStopExecutor = EasyMock.mock(ExecutorService.class);
+        herder.startAndStopExecutor = startAndStopExecutor;
+
+        Callable<Void> connectorStartingCallable = () -> null;
+
+        EasyMock.expect(startAndStopExecutor.invokeAll(EasyMock.anyObject(Collection.class))).andThrow(new RejectedExecutionException());
+
+        PowerMock.replayAll(startAndStopExecutor);
+
+        herder.startAndStop(Collections.singletonList(connectorStartingCallable));
+
+    }

Review Comment:
   I really like that we've added test coverage for this, but I think we can get better guarantees by testing with `Herder::tick` and using the herder's actual `startAndStopExecutor`, instead of mocking out the executor and calling `startAndStop` directly:
   
   ```suggestion
       @Test
       public void shouldThrowWhenStartAndStopExecutorThrowsRejectedExecutionExceptionAndHerderNotStopping() {
           EasyMock.expect(member.memberId()).andStubReturn("leader");
           expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList(), true);
           expectConfigRefreshAndSnapshot(SNAPSHOT);
           EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
   
           PowerMock.replayAll();
   
           herder.startAndStopExecutor.shutdown();
           assertThrows(RejectedExecutionException.class, herder::tick);
   
           PowerMock.verifyAll();
       }
   ```
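The behavior this test pins down can also be sketched without any mocking: shut down a real executor, then check that the rejection propagates when the herder is not stopping and is swallowed when it is. This assumes, as the test name implies, that `startAndStop` rethrows the `RejectedExecutionException` when `stopping` is false (that branch is cut off in the hunk); the class below is a hypothetical stand-in for the herder, not `DistributedHerder` itself.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;

public class StartAndStopGuardSketch {

    // Hypothetical stand-ins for the herder's fields of the same names.
    final AtomicBoolean stopping = new AtomicBoolean(false);
    final ExecutorService startAndStopExecutor = Executors.newFixedThreadPool(2);

    // Swallows a rejection only while stopping; otherwise lets it propagate (assumed behavior).
    void startAndStop(Collection<Callable<Void>> callables) {
        try {
            startAndStopExecutor.invokeAll(callables);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to callers
        } catch (RejectedExecutionException e) {
            if (!stopping.get()) {
                throw e;
            }
            // Expected during shutdown; the herder would log this at DEBUG and move on.
        }
    }

    // Mirrors the suggested test: shut down the real executor, then call startAndStop.
    static boolean throwsAfterShutdown(boolean herderStopping) {
        StartAndStopGuardSketch herder = new StartAndStopGuardSketch();
        herder.stopping.set(herderStopping);
        herder.startAndStopExecutor.shutdown();
        Callable<Void> noop = () -> null;
        try {
            herder.startAndStop(Collections.singletonList(noop));
            return false;
        } catch (RejectedExecutionException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(throwsAfterShutdown(false)); // prints true
        System.out.println(throwsAfterShutdown(true));  // prints false
    }
}
```

Using a real, shut-down executor means the exception comes from the JDK's own rejection path rather than from a stubbed `invokeAll`.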



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##########
@@ -3629,6 +3630,64 @@ public void testPollDurationOnSlowConnectorOperations() {
         PowerMock.verifyAll();
     }
 
+    @Test(expected = RejectedExecutionException.class)
+    @SuppressWarnings("unchecked")
+    public void shouldThrowWhenStartAndStopExecutorThrowsRejectedExecutionExceptionAndHerderNotStopping() throws InterruptedException {
+        ExecutorService startAndStopExecutor = EasyMock.mock(ExecutorService.class);
+        herder.startAndStopExecutor = startAndStopExecutor;
+
+        Callable<Void> connectorStartingCallable = () -> null;
+
+        EasyMock.expect(startAndStopExecutor.invokeAll(EasyMock.anyObject(Collection.class))).andThrow(new RejectedExecutionException());
+
+        PowerMock.replayAll(startAndStopExecutor);
+
+        herder.startAndStop(Collections.singletonList(connectorStartingCallable));
+
+    }
+
+    @Test
+    public void shouldHaltCleanlyWhenHerderStartsAndStopsAndConfigTopicReadTimesOut() throws TimeoutException {

Review Comment:
   This isn't really testing what it claims to be testing. What I'd expect from a test like this is verification that even if the herder's tick thread is blocked during a read of the config topic, `Herder::stop` can and will proceed gracefully after any graceful shutdown timeouts have expired.
   
   In reality, `Herder::tick` is called once and returns immediately (gracefully handling a `TimeoutException` while attempting to read to the end of the config topic), and then `Herder::stop` is called afterward without any other simultaneous herder activity (like a blocked read to the end of the config topic) taking place.
   
   TBH, although it'd be nice to have coverage for a case like this, I don't think it's strictly necessary. We could theoretically use the `startBackgroundHerder` utility method to have `tick` continuously called in the background, set up a read of the config topic that blocks and never returns (at least until the rest of the test is complete), and then invoke `Herder::stop` on the main test thread. However, that would either require heavy mocking of the herder's executors (which would diminish the value of the test) or take a long time (i.e., the entire herder graceful shutdown timeout) for each test run to complete.
   
   Neither of these options really seems worth it, and I trust the changes in the main parts of the code base enough that I'm okay without adding additional coverage for them this time around.
   
   If that seems reasonable, we can just remove this test case.
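For reference, the timing cost is easy to see in a JDK-only model of the scenario. Here a single-thread executor stands in for the herder's tick thread and a latch that is never released stands in for a blocked read to the end of the config topic; both stand-ins are hypothetical, as is the assumption that the blocked read responds to interruption. A bounded stop does complete, but never before the full graceful timeout has elapsed:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class BlockedTickStopSketch {

    // Starts a "tick" that blocks indefinitely, then stops the executor with a
    // bounded graceful timeout. Returns how long the stop took, in milliseconds.
    static long stopWithBlockedTick(long gracefulTimeoutMs) throws InterruptedException {
        ExecutorService herderExecutor = Executors.newSingleThreadExecutor();
        CountDownLatch tickStarted = new CountDownLatch(1);
        CountDownLatch neverReleased = new CountDownLatch(1);

        // Hypothetical tick: blocks until interrupted, like a stuck config-topic read.
        herderExecutor.submit(() -> {
            tickStarted.countDown();
            neverReleased.await();
            return null;
        });
        tickStarted.await(); // make sure the tick thread is actually blocked

        long start = System.nanoTime();
        herderExecutor.shutdown();
        if (!herderExecutor.awaitTermination(gracefulTimeoutMs, TimeUnit.MILLISECONDS)) {
            herderExecutor.shutdownNow(); // interrupt the blocked tick thread
            herderExecutor.awaitTermination(gracefulTimeoutMs, TimeUnit.MILLISECONDS);
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("stop took " + stopWithBlockedTick(200) + " ms");
    }
}
```

With the herder's real graceful shutdown timeouts in place of the 200 ms used here, every run of such a test would take at least that long.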



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##########
@@ -3629,6 +3630,64 @@ public void testPollDurationOnSlowConnectorOperations() {
         PowerMock.verifyAll();
     }
 
+    @Test(expected = RejectedExecutionException.class)
+    @SuppressWarnings("unchecked")
+    public void shouldThrowWhenStartAndStopExecutorThrowsRejectedExecutionExceptionAndHerderNotStopping() throws InterruptedException {
+        ExecutorService startAndStopExecutor = EasyMock.mock(ExecutorService.class);
+        herder.startAndStopExecutor = startAndStopExecutor;
+
+        Callable<Void> connectorStartingCallable = () -> null;
+
+        EasyMock.expect(startAndStopExecutor.invokeAll(EasyMock.anyObject(Collection.class))).andThrow(new RejectedExecutionException());
+
+        PowerMock.replayAll(startAndStopExecutor);
+
+        herder.startAndStop(Collections.singletonList(connectorStartingCallable));
+
+    }
+
+    @Test
+    public void shouldHaltCleanlyWhenHerderStartsAndStopsAndConfigTopicReadTimesOut() throws TimeoutException {
+        connectProtocolVersion = CONNECT_PROTOCOL_V1;
+        EasyMock.expect(member.memberId()).andStubReturn("member");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(connectProtocolVersion);
+        final int rebalanceDelayMs = 20000;
+
+        // Assign the connector to this worker, and have it start
+        expectRebalance(Collections.emptyList(), Collections.emptyList(), ConnectProtocol.Assignment.NO_ERROR, 1, singletonList(CONN1), Collections.emptyList(), rebalanceDelayMs);
+
+        member.wakeup();
+        PowerMock.expectLastCall();
+        member.requestRejoin();
+        PowerMock.expectLastCall();
+        member.maybeLeaveGroup(anyString());
+        PowerMock.expectLastCall();
+        worker.stopAndAwaitConnectors();
+        PowerMock.expectLastCall();
+        worker.stopAndAwaitTasks();
+        PowerMock.expectLastCall();
+        member.stop();
+        PowerMock.expectLastCall();
+        configBackingStore.stop();
+        PowerMock.expectLastCall();
+        statusBackingStore.stop();
+        PowerMock.expectLastCall();
+        worker.stop();
+        PowerMock.expectLastCall();
+
+        // Read to config topic times out
+        configBackingStore.refresh(anyLong(), EasyMock.anyObject(TimeUnit.class));
+        EasyMock.expectLastCall().andThrow(new TimeoutException());
+
+        PowerMock.replayAll();
+
+        // Start the herder
+        herder.tick();
+        // and immediately stop it.
+        herder.stop();
+
+    }

Review Comment:
   ```suggestion
           PowerMock.verifyAll();
       }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
