Re: [PR] KAFKA-10816: Add health check endpoint for Kafka Connect [kafka]

2024-07-03 Thread via GitHub


C0urante merged PR #16477:
URL: https://github.com/apache/kafka/pull/16477


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-10816: Add health check endpoint for Kafka Connect [kafka]

2024-07-03 Thread via GitHub


C0urante commented on PR #16477:
URL: https://github.com/apache/kafka/pull/16477#issuecomment-2206933077

   Thanks Greg!
   
   I've realized the TODO in the code base was unnecessary and this should be
   safe to merge without waiting for more green CI runs (see the latest commit
   message for more details).
   
   Merging...





Re: [PR] KAFKA-10816: Add health check endpoint for Kafka Connect [kafka]

2024-07-03 Thread via GitHub


gharris1727 commented on code in PR #16477:
URL: https://github.com/apache/kafka/pull/16477#discussion_r1664476993


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/WorkerStatus.java:
##
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.rest.entities;
+
+import org.apache.kafka.connect.util.Stage;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+public class WorkerStatus {
+
+    private final String status;
+    private final String message;
+
+    @JsonCreator
+    private WorkerStatus(
+            @JsonProperty("status") String status,
+            @JsonProperty("message") String message
+    ) {
+        this.status = status;
+        this.message = message;
+    }
+
+    public static WorkerStatus healthy() {
+        return new WorkerStatus(
+                "healthy",
+                "Worker has completed startup and is ready to handle requests."
+        );
+    }
+
+    public static WorkerStatus starting(Stage stage) {

Review Comment:
   Ah yeah this is very reasonable. For some reason I thought there were a lot
   of call-sites that would need an extra null guard, but that wasn't the case.
   This is great.
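
   A minimal sketch of the idea discussed here, assuming the null check lives inside the factory method so that call sites can pass a possibly-null stage without guarding it themselves. `WorkerStatusSketch`, its nested `Stage` stand-in, and the message strings are all hypothetical; the body of the real `WorkerStatus.starting(Stage)` is not shown in this thread.

```java
import java.util.Objects;

public class WorkerStatusSketch {

    // Hypothetical stand-in for org.apache.kafka.connect.util.Stage
    public static final class Stage {
        private final String description;

        public Stage(String description) {
            this.description = Objects.requireNonNull(description);
        }

        public String description() {
            return description;
        }
    }

    public final String status;
    public final String message;

    private WorkerStatusSketch(String status, String message) {
        this.status = status;
        this.message = message;
    }

    // The single null guard: callers may pass null when no stage is known
    public static WorkerStatusSketch starting(Stage stage) {
        String message = "Worker is still starting up.";
        if (stage != null) {
            // Hypothetical message format, for illustration only
            message += " Current stage: " + stage.description();
        }
        return new WorkerStatusSketch("starting", message);
    }

    public static void main(String[] args) {
        System.out.println(starting(null).message);
        System.out.println(starting(new Stage("reading config topic")).message);
    }
}
```

   With the guard in one place, a caller that only sometimes has a stage (for example, one that extracts it from a timeout exception) can forward whatever it has.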






Re: [PR] KAFKA-10816: Add health check endpoint for Kafka Connect [kafka]

2024-07-03 Thread via GitHub


C0urante commented on PR #16477:
URL: https://github.com/apache/kafka/pull/16477#issuecomment-2206645942

   Thanks for the review @gharris1727! This is ready for another pass when you
   have time.





Re: [PR] KAFKA-10816: Add health check endpoint for Kafka Connect [kafka]

2024-07-02 Thread via GitHub


C0urante commented on code in PR #16477:
URL: https://github.com/apache/kafka/pull/16477#discussion_r1662610208


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java:
##
@@ -32,15 +43,59 @@
 public class RootResource {
 
     private final Herder herder;
+    private final RestRequestTimeout requestTimeout;
+    private final Time time;
 
     @Inject
-    public RootResource(Herder herder) {
+    public RootResource(Herder herder, RestRequestTimeout requestTimeout) {
+        this(herder, requestTimeout, Time.SYSTEM);
+    }
+
+    // For testing only
+    RootResource(Herder herder, RestRequestTimeout requestTimeout, Time time) {
         this.herder = herder;
+        this.requestTimeout = requestTimeout;
+        this.time = time;
     }
 
     @GET
-    @Operation(summary = "Get details about this Connect worker and the id of the Kafka cluster it is connected to")
+    @Operation(summary = "Get details about this Connect worker and the ID of the Kafka cluster it is connected to")
     public ServerInfo serverInfo() {
         return new ServerInfo(herder.kafkaClusterId());
     }
+
+    @GET
+    @Path("/health")
+    @Operation(summary = "Health check endpoint to verify worker readiness and liveness")
+    public Response healthCheck() throws Throwable {
+        WorkerStatus workerStatus;
+        int statusCode;
+        try {
+            FutureCallback<Void> cb = new FutureCallback<>();
+            herder.healthCheck(cb);
+
+            long timeoutNs = TimeUnit.MILLISECONDS.toNanos(requestTimeout.healthCheckTimeoutMs());
+            long deadlineNs = timeoutNs + time.nanoseconds();
+            time.waitForFuture(cb, deadlineNs);
+
+            statusCode = Response.Status.OK.getStatusCode();
+            workerStatus = WorkerStatus.healthy();
+        } catch (TimeoutException e) {
+            Stage stage = e instanceof StagedTimeoutException
+                    ? ((StagedTimeoutException) e).stage()
+                    : null;
+            if (!herder.isReady()) {
+                statusCode = Response.Status.SERVICE_UNAVAILABLE.getStatusCode();
+                workerStatus = WorkerStatus.starting(stage);
+            } else {
+                statusCode = Response.Status.INTERNAL_SERVER_ERROR.getStatusCode();
+                workerStatus = WorkerStatus.unhealthy(stage);
+            }
+        } catch (ExecutionException e) {
+            throw e.getCause();

Review Comment:
   The exception mapper will catch this anyways, won't it? The intent was to hit
   [this part](https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectExceptionMapper.java#L73-L77)
   of the code path and generate the standard 500-on-unexpected-error response.
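
   For readers following along, here is a rough, self-contained sketch of the unwrapping pattern being discussed, using only the JDK. `UnwrapCauseSketch`, `check`, and the plain integer status codes are hypothetical stand-ins; the real endpoint uses Kafka's `FutureCallback`, `Time.waitForFuture`, and JAX-RS `Response`, none of which appear here. The point it illustrates is that rethrowing `e.getCause()` surfaces the underlying failure, so a downstream handler (in Connect, the exception mapper) sees the original exception type rather than the `ExecutionException` wrapper.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class UnwrapCauseSketch {

    // Hypothetical stand-in for the endpoint body: wait on a future with a
    // timeout and map the outcome to an HTTP-style status code.
    public static int check(CompletableFuture<Void> future, long timeoutMs, boolean ready)
            throws Throwable {
        try {
            future.get(timeoutMs, TimeUnit.MILLISECONDS);
            return 200; // completed in time
        } catch (TimeoutException e) {
            // Mirrors the diff: 503 while still starting, 500 once ready
            return ready ? 500 : 503;
        } catch (ExecutionException e) {
            // Rethrow the underlying failure instead of the wrapper so that
            // whatever handles exceptions downstream sees the real cause
            throw e.getCause();
        }
    }

    public static void main(String[] args) throws Throwable {
        System.out.println(check(CompletableFuture.completedFuture(null), 100, true));

        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));
        try {
            check(failed, 100, true);
        } catch (IllegalStateException e) {
            System.out.println("caught cause: " + e.getMessage());
        }
    }
}
```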


