Re: [PR] KAFKA-16943: Synchronously verify Connect worker startup failure in InternalTopicsIntegrationTest [kafka]

2024-07-13 Thread via GitHub


zzzk1 commented on PR #16451:
URL: https://github.com/apache/kafka/pull/16451#issuecomment-2226903656

   @C0urante I've improved the code based on your suggestion, PTAL.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16943: Synchronously verify Connect worker startup failure in InternalTopicsIntegrationTest [kafka]

2024-07-12 Thread via GitHub


C0urante commented on code in PR #16451:
URL: https://github.com/apache/kafka/pull/16451#discussion_r1676172863


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java:
##
@@ -47,6 +50,11 @@ public Connect(H herder, ConnectRestServer rest) {
 shutdownHook = new ShutdownHook();
 }
 
+// public for testing
+public Future getDistributedHerderFuture() {

Review Comment:
   Nit: We don't use the `get` prefix in this codebase. Also, this method can 
technically be called for more than just distributed workers. Maybe we could 
call it `herderTask()` and note in the Javadocs that it will return null if the 
herder type doesn't have a separate work thread?
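For illustration, here is a minimal sketch of that shape. `Herder`, `ThreadedHerder`, `InlineHerder`, and `Connect` below are hypothetical stand-ins, not Kafka's actual classes. The point is only that `herderTask()` returns the work thread's future when one exists and `null` otherwise.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class HerderTaskSketch {

    // Hypothetical stand-in for Connect's Herder interface.
    interface Herder {
        void start();
    }

    // Hypothetical herder that runs its work loop on a dedicated thread.
    static class ThreadedHerder implements Herder, Runnable {
        private final ExecutorService executor = Executors.newSingleThreadExecutor();
        private volatile Future<?> herderTask;

        @Override
        public void start() {
            herderTask = executor.submit(this);
        }

        @Override
        public void run() {
            // The herder's work loop would go here.
        }

        Future<?> herderTask() {
            return herderTask;
        }

        void stop() {
            executor.shutdownNow();
        }
    }

    // Hypothetical herder that does all of its work on the caller's thread.
    static class InlineHerder implements Herder {
        @Override
        public void start() { }
    }

    // Hypothetical stand-in for the Connect class.
    static class Connect {
        private final Herder herder;

        Connect(Herder herder) {
            this.herder = herder;
        }

        /**
         * Visible for testing.
         * @return the future for the herder's work thread, or null if this
         *         herder type does not run on a separate work thread
         */
        Future<?> herderTask() {
            if (herder instanceof ThreadedHerder) {
                return ((ThreadedHerder) herder).herderTask();
            }
            return null;
        }
    }
}
```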



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##
@@ -232,6 +233,9 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
 
 private final DistributedConfig config;
 
+// visible for testing
+public Future herderFuture;

Review Comment:
   This field should still be `private`, and only exposed via a getter:
   ```java
   private Future herderFuture;
   
   ...
   
   public Future herderFuture() {
       return herderFuture;
   }
   ```
   
   Otherwise, it could be modified by other classes, which we almost certainly 
do not want.



##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/InternalTopicsIntegrationTest.java:
##
@@ -152,11 +156,20 @@ public void testFailToCreateInternalTopicsWithMoreReplicasThanBrokers() throws I
 "Body did not contain expected message detailing the worker's in-progress operation: " + body
 );
 }
+
 connect.resetRequestTimeout();
 
+Future future = worker.getDistributedHerderFuture();
+
+try {
+future.get(5000, TimeUnit.MILLISECONDS);
+}  catch (TimeoutException | ExecutionException exception) {
+log.error("Failed to start a worker:", exception);
+future.cancel(true);
+}

Review Comment:
   A few things:
   1. We don't need or want to cancel the future; resource cleanup should take 
place when we shut down the cluster at the end of the test (see 
[here](https://github.com/apache/kafka/blob/0ada8fac6869cad8ac33a79032cf5d57bfa2a3ea/connect/runtime/src/test/java/org/apache/kafka/connect/integration/InternalTopicsIntegrationTest.java#L61-L65))
   2. We should fail the test on a `TimeoutException`, since it indicates that 
the worker hasn't failed to start up yet
   3. We don't need to log anything, especially not at `ERROR` level, if we get 
an `ExecutionException`
   4. We should probably go with a higher timeout; our CI can be really slow 
sometimes, and if we set it too low it might cause flakiness
   
   ```suggestion
   assertThrows(
       ExecutionException.class,
       () -> future.get(1, TimeUnit.MINUTES)
   );
   ```
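The following self-contained sketch shows why the suggested assertion behaves as described. It assumes a hypothetical failing task in place of the real worker, and a small `expectThrows` helper standing in for JUnit's `assertThrows`. A task that throws during startup surfaces as an `ExecutionException` from `Future.get`. A `TimeoutException` (the worker has not failed yet) is a different type, so it fails the check instead of being logged and ignored.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class StartupFailureSketch {

    // Stand-in for JUnit's assertThrows: an action that may throw anything.
    interface ThrowingAction {
        void run() throws Exception;
    }

    // Returns the thrown exception if it has the expected type; otherwise fails.
    static <T extends Throwable> T expectThrows(Class<T> expected, ThrowingAction action) {
        try {
            action.run();
        } catch (Throwable t) {
            if (expected.isInstance(t)) {
                return expected.cast(t);
            }
            throw new AssertionError("Expected " + expected.getSimpleName() + " but got " + t, t);
        }
        throw new AssertionError("Expected " + expected.getSimpleName() + " but nothing was thrown");
    }

    // Hypothetical worker task that fails while starting up.
    static Future<?> failingHerderTask(ExecutorService executor) {
        Runnable failingWork = () -> {
            throw new IllegalStateException("Failed to create internal topics");
        };
        return executor.submit(failingWork);
    }

    // Mirrors the suggested assertion and returns the underlying cause.
    static Throwable verifyStartupFailure() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<?> future = failingHerderTask(executor);
            // A TimeoutException here (worker has not failed yet) is the wrong
            // type, so expectThrows turns it into a failure instead of hiding it.
            ExecutionException e = expectThrows(
                ExecutionException.class,
                () -> future.get(1, TimeUnit.MINUTES)
            );
            return e.getCause();
        } finally {
            executor.shutdownNow();
        }
    }
}
```

Cleanup of the executor here happens in `finally`. This mirrors point 1 above, where cleanup is left to the test's normal teardown rather than to cancelling the future.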






Re: [PR] KAFKA-16943: Synchronously verify Connect worker startup failure in InternalTopicsIntegrationTest [kafka]

2024-07-10 Thread via GitHub


zzzk1 commented on code in PR #16451:
URL: https://github.com/apache/kafka/pull/16451#discussion_r1672424490


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##
@@ -231,6 +232,8 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
 Thread herderThread;
 
 private final DistributedConfig config;
+
+public static Future herderFuture;

Review Comment:
   I have improved this, PTAL.






Re: [PR] KAFKA-16943: Synchronously verify Connect worker startup failure in InternalTopicsIntegrationTest [kafka]

2024-07-07 Thread via GitHub


C0urante commented on code in PR #16451:
URL: https://github.com/apache/kafka/pull/16451#discussion_r1667658373


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##
@@ -231,6 +232,8 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
 Thread herderThread;
 
 private final DistributedConfig config;
+
+public static Future herderFuture;

Review Comment:
   I don't think this can be `static`; if it is, we won't be able to use it 
with multiple workers in a single integration test (which we do very 
frequently).
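This small sketch illustrates the problem using hypothetical `StaticHerder` and `InstanceHerder` classes. With a `static` field, starting a second worker in the same JVM overwrites the first worker's future. An instance field keeps one future per worker.

```java
import java.util.concurrent.Future;

public class StaticFieldSketch {

    // Problematic: a single slot shared by every worker in the JVM.
    static class StaticHerder {
        static Future<?> herderFuture;

        void start(Future<?> task) {
            herderFuture = task;
        }
    }

    // Preferred: each worker keeps track of its own task.
    static class InstanceHerder {
        private Future<?> herderTask;

        void start(Future<?> task) {
            herderTask = task;
        }

        Future<?> herderTask() {
            return herderTask;
        }
    }
}
```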






Re: [PR] KAFKA-16943: Synchronously verify Connect worker startup failure in InternalTopicsIntegrationTest [kafka]

2024-07-03 Thread via GitHub


C0urante commented on code in PR #16451:
URL: https://github.com/apache/kafka/pull/16451#discussion_r1664292432


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##
@@ -363,7 +364,17 @@ public DistributedHerder(DistributedConfig config,
 
 @Override
 public void start() {
-this.herderExecutor.submit(this);
+Future future = this.herderExecutor.submit(this);
+try {
+future.get(6, TimeUnit.SECONDS);
+} catch (TimeoutException timeoutException) {
+log.error("herder work thread timeout:", timeoutException);
+future.cancel(true);
+} catch (InterruptedException interruptedException) {
+Thread.currentThread().interrupt();
+} catch (ExecutionException executionException) {
+log.error("herder work thread execution exception:", executionException);

Review Comment:
   We don't want to change `start` to block. Can we save the future in a field 
in this class, then expose something like a `Future herderTask()` method 
that allows testing code to access that future and, if desired, block on it?
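Here is a minimal sketch of the suggested approach. It assumes a hypothetical herder class whose `run()` fails immediately to simulate a startup error; this is not `DistributedHerder` itself. `start()` only submits the work and returns. The future lives in a private field, and `herderTask()` lets a test decide whether to block on it.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class NonBlockingStartSketch implements Runnable {

    private final ExecutorService herderExecutor = Executors.newSingleThreadExecutor();
    // Private so other classes cannot replace it; volatile so a test thread
    // reading it sees the value written by start().
    private volatile Future<?> herderTask;

    // Returns immediately; the work loop runs on the executor's thread.
    public void start() {
        herderTask = herderExecutor.submit(this);
    }

    @Override
    public void run() {
        // Hypothetical startup step that fails, e.g. internal topic creation.
        throw new IllegalStateException("Unable to create internal topics");
    }

    // Visible for testing: callers may block on this future if they choose.
    public Future<?> herderTask() {
        return herderTask;
    }

    public void stop() {
        herderExecutor.shutdownNow();
    }
}
```

A test can then call `herder.herderTask().get(1, TimeUnit.MINUTES)` and expect an `ExecutionException`, without changing the non-blocking contract of `start()`.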






[PR] KAFKA-16943: Synchronously verify Connect worker startup failure in InternalTopicsIntegrationTest [kafka]

2024-07-02 Thread via GitHub


zzzk1 opened a new pull request, #16451:
URL: https://github.com/apache/kafka/pull/16451

   Synchronously verify worker status when created
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   

