C0urante commented on code in PR #16477:
URL: https://github.com/apache/kafka/pull/16477#discussion_r1662610208


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java:
##########
@@ -32,15 +43,59 @@
 public class RootResource {
 
     private final Herder herder;
+    private final RestRequestTimeout requestTimeout;
+    private final Time time;
 
     @Inject
-    public RootResource(Herder herder) {
+    public RootResource(Herder herder, RestRequestTimeout requestTimeout) {
+        this(herder, requestTimeout, Time.SYSTEM);
+    }
+
+    // For testing only
+    RootResource(Herder herder, RestRequestTimeout requestTimeout, Time time) {
         this.herder = herder;
+        this.requestTimeout = requestTimeout;
+        this.time = time;
     }
 
     @GET
-    @Operation(summary = "Get details about this Connect worker and the id of 
the Kafka cluster it is connected to")
+    @Operation(summary = "Get details about this Connect worker and the ID of 
the Kafka cluster it is connected to")
     public ServerInfo serverInfo() {
         return new ServerInfo(herder.kafkaClusterId());
     }
+
+    @GET
+    @Path("/health")
+    @Operation(summary = "Health check endpoint to verify worker readiness and 
liveness")
+    public Response healthCheck() throws Throwable {
+        WorkerStatus workerStatus;
+        int statusCode;
+        try {
+            FutureCallback<Void> cb = new FutureCallback<>();
+            herder.healthCheck(cb);
+
+            long timeoutNs = 
TimeUnit.MILLISECONDS.toNanos(requestTimeout.healthCheckTimeoutMs());
+            long deadlineNs = timeoutNs + time.nanoseconds();
+            time.waitForFuture(cb, deadlineNs);
+
+            statusCode = Response.Status.OK.getStatusCode();
+            workerStatus = WorkerStatus.healthy();
+        } catch (TimeoutException e) {
+            Stage stage = e instanceof StagedTimeoutException
+                    ? ((StagedTimeoutException) e).stage()
+                    : null;
+            if (!herder.isReady()) {
+                statusCode = 
Response.Status.SERVICE_UNAVAILABLE.getStatusCode();
+                workerStatus = WorkerStatus.starting(stage);
+            } else {
+                statusCode = 
Response.Status.INTERNAL_SERVER_ERROR.getStatusCode();
+                workerStatus = WorkerStatus.unhealthy(stage);
+            }
+        } catch (ExecutionException e) {
+            throw e.getCause();

Review Comment:
   The exception mapper will catch this anyways, won't it? The intent was to 
hit [this 
part](https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectExceptionMapper.java#L73-L77)
 of the code path and generate the standard 500-on-unexpected-error response.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to