This is an automated email from the ASF dual-hosted git repository.

ab pushed a commit to branch branch_10x
in repository https://gitbox.apache.org/repos/asf/solr.git


The following commit(s) were added to refs/heads/branch_10x by this push:
     new 1fd74e1d146 SOLR-18061: CrossDC Consumer - add /health endpoint 
(cherry-pick from #4126) (#4151)
1fd74e1d146 is described below

commit 1fd74e1d146a11c20e1dc6add69bc538010d5497
Author: Andrzej BiaƂecki <[email protected]>
AuthorDate: Fri Feb 20 16:34:54 2026 +0100

    SOLR-18061: CrossDC Consumer - add /health endpoint (cherry-pick from 
#4126) (#4151)
---
 changelog/unreleased/solr-18061.yml                |  9 +++
 .../solr/crossdc/manager/consumer/Consumer.java    |  2 +
 .../manager/consumer/HealthCheckServlet.java       | 67 ++++++++++++++++++++++
 .../manager/consumer/KafkaCrossDcConsumer.java     | 63 +++++++++++++++++++-
 .../manager/SolrAndKafkaIntegrationTest.java       | 32 ++++++++++-
 .../pages/cross-dc-replication.adoc                |  3 +-
 6 files changed, 172 insertions(+), 4 deletions(-)

diff --git a/changelog/unreleased/solr-18061.yml 
b/changelog/unreleased/solr-18061.yml
new file mode 100644
index 00000000000..d0df2494de0
--- /dev/null
+++ b/changelog/unreleased/solr-18061.yml
@@ -0,0 +1,9 @@
+# See https://github.com/apache/solr/blob/main/dev-docs/changelog.adoc
+title: "CrossDC Consumer: add /health endpoint"
+type: added # added, changed, fixed, deprecated, removed, dependency_update, 
security, other
+authors:
+  - name: Andrzej Bialecki
+    nick: ab
+links:
+  - name: SOLR-18061
+    url: https://issues.apache.org/jira/browse/SOLR-18061
diff --git 
a/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/Consumer.java
 
b/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/Consumer.java
index ca3d2a16532..59b62da42be 100644
--- 
a/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/Consumer.java
+++ 
b/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/Consumer.java
@@ -96,6 +96,8 @@ public class Consumer {
       context.setAttribute(
           MetricsServlet.SOLR_METRICS_MANAGER_ATTRIBUTE, 
metrics.getMetricManager());
       context.addServlet(MetricsServlet.class, "/metrics/*");
+      context.setAttribute(HealthCheckServlet.KAFKA_CROSSDC_CONSUMER, 
crossDcConsumer);
+      context.addServlet(HealthCheckServlet.class, "/health/*");
 
       for (ServletMapping mapping : 
context.getServletHandler().getServletMappings()) {
         if (log.isInfoEnabled()) {
diff --git 
a/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/HealthCheckServlet.java
 
b/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/HealthCheckServlet.java
new file mode 100644
index 00000000000..49a55bd2845
--- /dev/null
+++ 
b/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/HealthCheckServlet.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.crossdc.manager.consumer;
+
+import jakarta.servlet.ServletException;
+import jakarta.servlet.http.HttpServlet;
+import jakarta.servlet.http.HttpServletRequest;
+import jakarta.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Locale;
+
+public class HealthCheckServlet extends HttpServlet {
+  private static final long serialVersionUID = -7848291432584409313L;
+
+  public static final String KAFKA_CROSSDC_CONSUMER =
+      HealthCheckServlet.class.getName() + ".kafkaCrossDcConsumer";
+
+  private KafkaCrossDcConsumer consumer;
+
+  @Override
+  public void init() throws ServletException {
+    consumer = (KafkaCrossDcConsumer) 
getServletContext().getAttribute(KAFKA_CROSSDC_CONSUMER);
+  }
+
+  @Override
+  protected void doGet(HttpServletRequest req, HttpServletResponse resp)
+      throws ServletException, IOException {
+    if (consumer == null) {
+      resp.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
+      return;
+    }
+    boolean kafkaConnected = consumer.isKafkaConnected();
+    boolean solrConnected = consumer.isSolrConnected();
+    boolean running = consumer.isRunning();
+    String content =
+        String.format(
+            Locale.ROOT,
+            "{\n  \"kafka\": %s,\n  \"solr\": %s,\n  \"running\": %s\n}",
+            kafkaConnected,
+            solrConnected,
+            running);
+    resp.setContentType("application/json");
+    resp.setHeader("Cache-Control", "must-revalidate,no-cache,no-store");
+    resp.setCharacterEncoding("UTF-8");
+    resp.getOutputStream().write(content.getBytes(StandardCharsets.UTF_8));
+    if (kafkaConnected && solrConnected && running) {
+      resp.setStatus(HttpServletResponse.SC_OK);
+    } else {
+      resp.setStatus(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
+    }
+  }
+}
diff --git 
a/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumer.java
 
b/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumer.java
index 19a8a4d115c..5e1af44d35c 100644
--- 
a/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumer.java
+++ 
b/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumer.java
@@ -25,16 +25,19 @@ import java.util.List;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
+import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.errors.WakeupException;
@@ -42,7 +45,9 @@ import 
org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.SolrResponse;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.HealthCheckRequest;
 import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.HealthCheckResponse;
 import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.util.ExecutorUtil;
@@ -72,10 +77,12 @@ public class KafkaCrossDcConsumer extends 
Consumer.CrossDcConsumer {
   private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private final KafkaConsumer<String, MirroredSolrRequest<?>> kafkaConsumer;
+  private final AdminClient adminClient;
   private final CountDownLatch startLatch;
   KafkaMirroringSink kafkaMirroringSink;
 
   private static final int KAFKA_CONSUMER_POLL_TIMEOUT_MS = 5000;
+
   private final String[] topicNames;
   private final int maxAttempts;
   private final CrossDcConf.CollapseUpdates collapseUpdates;
@@ -83,7 +90,7 @@ public class KafkaCrossDcConsumer extends 
Consumer.CrossDcConsumer {
   private final SolrMessageProcessor messageProcessor;
   protected final ConsumerMetrics metrics;
 
-  protected SolrClientSupplier solrClientSupplier;
+  protected final SolrClientSupplier solrClientSupplier;
 
   private final ThreadPoolExecutor executor;
 
@@ -93,6 +100,8 @@ public class KafkaCrossDcConsumer extends 
Consumer.CrossDcConsumer {
 
   private final BlockingQueue<Runnable> queue = new BlockingQueue<>(10);
 
+  private volatile boolean running = false;
+
   /**
    * Supplier for creating and managing a working CloudSolrClient instance. 
This class ensures that
    * the CloudSolrClient instance doesn't try to use its {@link
@@ -224,6 +233,7 @@ public class KafkaCrossDcConsumer extends 
Consumer.CrossDcConsumer {
 
     log.info("Creating Kafka consumer with configuration {}", 
kafkaConsumerProps);
     kafkaConsumer = createKafkaConsumer(kafkaConsumerProps);
+    adminClient = createKafkaAdminClient(kafkaConsumerProps);
     partitionManager = new PartitionManager(kafkaConsumer);
     // Create producer for resubmitting failed requests
     log.info("Creating Kafka resubmit producer");
@@ -244,6 +254,10 @@ public class KafkaCrossDcConsumer extends 
Consumer.CrossDcConsumer {
         properties, new StringDeserializer(), new 
MirroredSolrRequestSerializer());
   }
 
+  public AdminClient createKafkaAdminClient(Properties properties) {
+    return AdminClient.create(properties);
+  }
+
   protected KafkaMirroringSink createKafkaMirroringSink(KafkaCrossDcConf conf) 
{
     return new KafkaMirroringSink(conf);
   }
@@ -269,18 +283,25 @@ public class KafkaCrossDcConsumer extends 
Consumer.CrossDcConsumer {
 
       log.info("Consumer started");
       startLatch.countDown();
+      running = true;
 
       while (pollAndProcessRequests()) {
         // no-op within this loop: everything is done in 
pollAndProcessRequests method defined
         // above.
       }
+      running = false;
 
-      log.info("Closed kafka consumer. Exiting now.");
+      log.info("Closing kafka consumer. Exiting now.");
       try {
         kafkaConsumer.close();
       } catch (Exception e) {
         log.warn("Failed to close kafka consumer", e);
       }
+      try {
+        adminClient.close();
+      } catch (Exception e) {
+        log.warn("Failed to close kafka admin client", e);
+      }
 
       try {
         kafkaMirroringSink.close();
@@ -292,6 +313,44 @@ public class KafkaCrossDcConsumer extends 
Consumer.CrossDcConsumer {
     }
   }
 
+  public boolean isRunning() {
+    return running;
+  }
+
+  public boolean isSolrConnected() {
+    if (solrClientSupplier == null) {
+      return false;
+    }
+    try {
+      HealthCheckRequest request = new HealthCheckRequest();
+      HealthCheckResponse response = request.process(solrClientSupplier.get());
+      if (response.getStatus() != 0) {
+        return false;
+      }
+      return true;
+    } catch (Exception e) {
+      return false;
+    }
+  }
+
+  public boolean isKafkaConnected() {
+    if (adminClient == null) {
+      return false;
+    }
+    try {
+      Collection<Node> nodes = adminClient.describeCluster().nodes().get();
+      if (nodes == null || nodes.isEmpty()) {
+        return false;
+      }
+      return true;
+    } catch (InterruptedException | ExecutionException e) {
+      if (e instanceof InterruptedException) {
+        Thread.currentThread().interrupt();
+      }
+      return false;
+    }
+  }
+
   /**
    * Polls and processes the requests from Kafka. This method returns false 
when the consumer needs
    * to be shutdown i.e. when there's a wakeup exception.
diff --git 
a/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/SolrAndKafkaIntegrationTest.java
 
b/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/SolrAndKafkaIntegrationTest.java
index 7d7941dc442..39970d7b4ca 100644
--- 
a/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/SolrAndKafkaIntegrationTest.java
+++ 
b/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/SolrAndKafkaIntegrationTest.java
@@ -68,6 +68,7 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
+import org.noggit.ObjectBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -341,7 +342,7 @@ public class SolrAndKafkaIntegrationTest extends 
SolrCloudTestCase {
 
   @Test
   @SuppressWarnings({"unchecked"})
-  public void testMetrics() throws Exception {
+  public void testMetricsAndHealthcheck() throws Exception {
     CloudSolrClient client = solrCluster1.getSolrClient();
     SolrInputDocument doc = new SolrInputDocument();
     doc.addField("id", String.valueOf(new Date().getTime()));
@@ -359,6 +360,7 @@ public class SolrAndKafkaIntegrationTest extends 
SolrCloudTestCase {
     HttpJettySolrClient httpJettySolrClient =
         new HttpJettySolrClient.Builder(baseUrl).useHttp1_1(true).build();
     try {
+      // test the metrics endpoint
       GenericSolrRequest req = new GenericSolrRequest(SolrRequest.METHOD.GET, 
"/metrics");
       req.setResponseParser(new InputStreamResponseParser(null));
       NamedList<Object> rsp = httpJettySolrClient.request(req);
@@ -366,6 +368,34 @@ public class SolrAndKafkaIntegrationTest extends 
SolrCloudTestCase {
           IOUtils.toString(
               (InputStream) rsp.get(InputStreamResponseParser.STREAM_KEY), 
StandardCharsets.UTF_8);
       assertTrue(content, content.contains("crossdc_consumer_output_total"));
+
+      // test the healtcheck endpoint
+      req = new GenericSolrRequest(SolrRequest.METHOD.GET, "/health");
+      req.setResponseParser(new InputStreamResponseParser(null));
+      rsp = httpJettySolrClient.request(req);
+      content =
+          IOUtils.toString(
+              (InputStream) rsp.get(InputStreamResponseParser.STREAM_KEY), 
StandardCharsets.UTF_8);
+      assertEquals(Integer.valueOf(200), rsp.get("responseStatus"));
+      Map<String, Object> map = (Map<String, Object>) 
ObjectBuilder.fromJSON(content);
+      assertEquals(Boolean.TRUE, map.get("kafka"));
+      assertEquals(Boolean.TRUE, map.get("solr"));
+      assertEquals(Boolean.TRUE, map.get("running"));
+
+      // kill Solr to trigger unhealthy state
+      solrCluster2.shutdown();
+      solrCluster2 = null;
+      Thread.sleep(5000);
+      rsp = httpJettySolrClient.request(req);
+      content =
+          IOUtils.toString(
+              (InputStream) rsp.get(InputStreamResponseParser.STREAM_KEY), 
StandardCharsets.UTF_8);
+      assertEquals(Integer.valueOf(503), rsp.get("responseStatus"));
+      map = (Map<String, Object>) ObjectBuilder.fromJSON(content);
+      assertEquals(Boolean.TRUE, map.get("kafka"));
+      assertEquals(Boolean.FALSE, map.get("solr"));
+      assertEquals(Boolean.TRUE, map.get("running"));
+
     } finally {
       httpJettySolrClient.close();
       client.close();
diff --git 
a/solr/solr-ref-guide/modules/deployment-guide/pages/cross-dc-replication.adoc 
b/solr/solr-ref-guide/modules/deployment-guide/pages/cross-dc-replication.adoc
index 8211aa04e1d..bb235dcdc7c 100644
--- 
a/solr/solr-ref-guide/modules/deployment-guide/pages/cross-dc-replication.adoc
+++ 
b/solr/solr-ref-guide/modules/deployment-guide/pages/cross-dc-replication.adoc
@@ -151,8 +151,9 @@ system property.
 
 Currently the following endpoints are exposed (on local port configured using 
`port` property, default is 8090):
 
-`/metrics` - (GET):: This endpoint returns JSON-formatted metrics describing 
various aspects of document processing in Consumer.
+`/metrics` - (GET):: This endpoint returns metrics in Prometheus text format, 
describing various aspects of document processing in Consumer.
 `/threads` - (GET):: Returns a plain-text thread dump of the JVM running the 
Consumer application.
+`/health` - (GET):: Returns an `HTTP 200 OK` code if the service is in a 
healthy state, or `HTTP 503 Service Unavailable` if one or more healthcheck 
probes failed. The JSON response body provides more details.
 
 ==== Configuration Properties for the CrossDC Manager:
 

Reply via email to