This is an automated email from the ASF dual-hosted git repository.
ab pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr.git
The following commit(s) were added to refs/heads/main by this push:
new be3e2afe890 SOLR-18061: CrossDC Consumer - add /health endpoint (#4126)
be3e2afe890 is described below
commit be3e2afe89011a50487cccfdf1a6487fcdaef2fa
Author: Andrzej Białecki <[email protected]>
AuthorDate: Fri Feb 20 15:56:06 2026 +0100
SOLR-18061: CrossDC Consumer - add /health endpoint (#4126)
---
changelog/unreleased/solr-18061.yml | 9 +++
.../solr/crossdc/manager/consumer/Consumer.java | 2 +
.../manager/consumer/HealthCheckServlet.java | 67 ++++++++++++++++++++++
.../manager/consumer/KafkaCrossDcConsumer.java | 63 +++++++++++++++++++-
.../manager/SolrAndKafkaIntegrationTest.java | 32 ++++++++++-
.../pages/cross-dc-replication.adoc | 3 +-
6 files changed, 172 insertions(+), 4 deletions(-)
diff --git a/changelog/unreleased/solr-18061.yml
b/changelog/unreleased/solr-18061.yml
new file mode 100644
index 00000000000..d0df2494de0
--- /dev/null
+++ b/changelog/unreleased/solr-18061.yml
@@ -0,0 +1,9 @@
+# See https://github.com/apache/solr/blob/main/dev-docs/changelog.adoc
+title: "CrossDC Consumer: add /health endpoint"
+type: added # added, changed, fixed, deprecated, removed, dependency_update,
security, other
+authors:
+ - name: Andrzej Bialecki
+ nick: ab
+links:
+ - name: SOLR-18061
+ url: https://issues.apache.org/jira/browse/SOLR-18061
diff --git
a/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/Consumer.java
b/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/Consumer.java
index ca3d2a16532..59b62da42be 100644
---
a/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/Consumer.java
+++
b/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/Consumer.java
@@ -96,6 +96,8 @@ public class Consumer {
context.setAttribute(
MetricsServlet.SOLR_METRICS_MANAGER_ATTRIBUTE,
metrics.getMetricManager());
context.addServlet(MetricsServlet.class, "/metrics/*");
+ context.setAttribute(HealthCheckServlet.KAFKA_CROSSDC_CONSUMER,
crossDcConsumer);
+ context.addServlet(HealthCheckServlet.class, "/health/*");
for (ServletMapping mapping :
context.getServletHandler().getServletMappings()) {
if (log.isInfoEnabled()) {
diff --git
a/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/HealthCheckServlet.java
b/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/HealthCheckServlet.java
new file mode 100644
index 00000000000..49a55bd2845
--- /dev/null
+++
b/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/HealthCheckServlet.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.crossdc.manager.consumer;
+
+import jakarta.servlet.ServletException;
+import jakarta.servlet.http.HttpServlet;
+import jakarta.servlet.http.HttpServletRequest;
+import jakarta.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Locale;
+
+public class HealthCheckServlet extends HttpServlet {
+ private static final long serialVersionUID = -7848291432584409313L;
+
+ public static final String KAFKA_CROSSDC_CONSUMER =
+ HealthCheckServlet.class.getName() + ".kafkaCrossDcConsumer";
+
+ private KafkaCrossDcConsumer consumer;
+
+ @Override
+ public void init() throws ServletException {
+ consumer = (KafkaCrossDcConsumer)
getServletContext().getAttribute(KAFKA_CROSSDC_CONSUMER);
+ }
+
+ @Override
+ protected void doGet(HttpServletRequest req, HttpServletResponse resp)
+ throws ServletException, IOException {
+ if (consumer == null) {
+ resp.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
+ return;
+ }
+ boolean kafkaConnected = consumer.isKafkaConnected();
+ boolean solrConnected = consumer.isSolrConnected();
+ boolean running = consumer.isRunning();
+ String content =
+ String.format(
+ Locale.ROOT,
+ "{\n \"kafka\": %s,\n \"solr\": %s,\n \"running\": %s\n}",
+ kafkaConnected,
+ solrConnected,
+ running);
+ resp.setContentType("application/json");
+ resp.setHeader("Cache-Control", "must-revalidate,no-cache,no-store");
+ resp.setCharacterEncoding("UTF-8");
+ resp.getOutputStream().write(content.getBytes(StandardCharsets.UTF_8));
+ if (kafkaConnected && solrConnected && running) {
+ resp.setStatus(HttpServletResponse.SC_OK);
+ } else {
+ resp.setStatus(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
+ }
+ }
+}
diff --git
a/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumer.java
b/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumer.java
index 19a8a4d115c..5e1af44d35c 100644
---
a/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumer.java
+++
b/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumer.java
@@ -25,16 +25,19 @@ import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
+import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.WakeupException;
@@ -42,7 +45,9 @@ import
org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.HealthCheckRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.HealthCheckResponse;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.ExecutorUtil;
@@ -72,10 +77,12 @@ public class KafkaCrossDcConsumer extends
Consumer.CrossDcConsumer {
private static final Logger log =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final KafkaConsumer<String, MirroredSolrRequest<?>> kafkaConsumer;
+ private final AdminClient adminClient;
private final CountDownLatch startLatch;
KafkaMirroringSink kafkaMirroringSink;
private static final int KAFKA_CONSUMER_POLL_TIMEOUT_MS = 5000;
+
private final String[] topicNames;
private final int maxAttempts;
private final CrossDcConf.CollapseUpdates collapseUpdates;
@@ -83,7 +90,7 @@ public class KafkaCrossDcConsumer extends
Consumer.CrossDcConsumer {
private final SolrMessageProcessor messageProcessor;
protected final ConsumerMetrics metrics;
- protected SolrClientSupplier solrClientSupplier;
+ protected final SolrClientSupplier solrClientSupplier;
private final ThreadPoolExecutor executor;
@@ -93,6 +100,8 @@ public class KafkaCrossDcConsumer extends
Consumer.CrossDcConsumer {
private final BlockingQueue<Runnable> queue = new BlockingQueue<>(10);
+ private volatile boolean running = false;
+
/**
* Supplier for creating and managing a working CloudSolrClient instance.
This class ensures that
* the CloudSolrClient instance doesn't try to use its {@link
@@ -224,6 +233,7 @@ public class KafkaCrossDcConsumer extends
Consumer.CrossDcConsumer {
log.info("Creating Kafka consumer with configuration {}",
kafkaConsumerProps);
kafkaConsumer = createKafkaConsumer(kafkaConsumerProps);
+ adminClient = createKafkaAdminClient(kafkaConsumerProps);
partitionManager = new PartitionManager(kafkaConsumer);
// Create producer for resubmitting failed requests
log.info("Creating Kafka resubmit producer");
@@ -244,6 +254,10 @@ public class KafkaCrossDcConsumer extends
Consumer.CrossDcConsumer {
properties, new StringDeserializer(), new
MirroredSolrRequestSerializer());
}
+ public AdminClient createKafkaAdminClient(Properties properties) {
+ return AdminClient.create(properties);
+ }
+
protected KafkaMirroringSink createKafkaMirroringSink(KafkaCrossDcConf conf)
{
return new KafkaMirroringSink(conf);
}
@@ -269,18 +283,25 @@ public class KafkaCrossDcConsumer extends
Consumer.CrossDcConsumer {
log.info("Consumer started");
startLatch.countDown();
+ running = true;
while (pollAndProcessRequests()) {
// no-op within this loop: everything is done in
pollAndProcessRequests method defined
// above.
}
+ running = false;
- log.info("Closed kafka consumer. Exiting now.");
+ log.info("Closing kafka consumer. Exiting now.");
try {
kafkaConsumer.close();
} catch (Exception e) {
log.warn("Failed to close kafka consumer", e);
}
+ try {
+ adminClient.close();
+ } catch (Exception e) {
+ log.warn("Failed to close kafka admin client", e);
+ }
try {
kafkaMirroringSink.close();
@@ -292,6 +313,44 @@ public class KafkaCrossDcConsumer extends
Consumer.CrossDcConsumer {
}
}
+ public boolean isRunning() {
+ return running;
+ }
+
+ public boolean isSolrConnected() {
+ if (solrClientSupplier == null) {
+ return false;
+ }
+ try {
+ HealthCheckRequest request = new HealthCheckRequest();
+ HealthCheckResponse response = request.process(solrClientSupplier.get());
+ if (response.getStatus() != 0) {
+ return false;
+ }
+ return true;
+ } catch (Exception e) {
+ return false;
+ }
+ }
+
+ public boolean isKafkaConnected() {
+ if (adminClient == null) {
+ return false;
+ }
+ try {
+ Collection<Node> nodes = adminClient.describeCluster().nodes().get();
+ if (nodes == null || nodes.isEmpty()) {
+ return false;
+ }
+ return true;
+ } catch (InterruptedException | ExecutionException e) {
+ if (e instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ }
+ return false;
+ }
+ }
+
/**
* Polls and processes the requests from Kafka. This method returns false
when the consumer needs
* to be shutdown i.e. when there's a wakeup exception.
diff --git
a/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/SolrAndKafkaIntegrationTest.java
b/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/SolrAndKafkaIntegrationTest.java
index 7d7941dc442..39970d7b4ca 100644
---
a/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/SolrAndKafkaIntegrationTest.java
+++
b/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/SolrAndKafkaIntegrationTest.java
@@ -68,6 +68,7 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
+import org.noggit.ObjectBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -341,7 +342,7 @@ public class SolrAndKafkaIntegrationTest extends
SolrCloudTestCase {
@Test
@SuppressWarnings({"unchecked"})
- public void testMetrics() throws Exception {
+ public void testMetricsAndHealthcheck() throws Exception {
CloudSolrClient client = solrCluster1.getSolrClient();
SolrInputDocument doc = new SolrInputDocument();
doc.addField("id", String.valueOf(new Date().getTime()));
@@ -359,6 +360,7 @@ public class SolrAndKafkaIntegrationTest extends
SolrCloudTestCase {
HttpJettySolrClient httpJettySolrClient =
new HttpJettySolrClient.Builder(baseUrl).useHttp1_1(true).build();
try {
+ // test the metrics endpoint
GenericSolrRequest req = new GenericSolrRequest(SolrRequest.METHOD.GET,
"/metrics");
req.setResponseParser(new InputStreamResponseParser(null));
NamedList<Object> rsp = httpJettySolrClient.request(req);
@@ -366,6 +368,34 @@ public class SolrAndKafkaIntegrationTest extends
SolrCloudTestCase {
IOUtils.toString(
(InputStream) rsp.get(InputStreamResponseParser.STREAM_KEY),
StandardCharsets.UTF_8);
assertTrue(content, content.contains("crossdc_consumer_output_total"));
+
+ // test the healthcheck endpoint
+ req = new GenericSolrRequest(SolrRequest.METHOD.GET, "/health");
+ req.setResponseParser(new InputStreamResponseParser(null));
+ rsp = httpJettySolrClient.request(req);
+ content =
+ IOUtils.toString(
+ (InputStream) rsp.get(InputStreamResponseParser.STREAM_KEY),
StandardCharsets.UTF_8);
+ assertEquals(Integer.valueOf(200), rsp.get("responseStatus"));
+ Map<String, Object> map = (Map<String, Object>)
ObjectBuilder.fromJSON(content);
+ assertEquals(Boolean.TRUE, map.get("kafka"));
+ assertEquals(Boolean.TRUE, map.get("solr"));
+ assertEquals(Boolean.TRUE, map.get("running"));
+
+ // kill Solr to trigger unhealthy state
+ solrCluster2.shutdown();
+ solrCluster2 = null;
+ Thread.sleep(5000);
+ rsp = httpJettySolrClient.request(req);
+ content =
+ IOUtils.toString(
+ (InputStream) rsp.get(InputStreamResponseParser.STREAM_KEY),
StandardCharsets.UTF_8);
+ assertEquals(Integer.valueOf(503), rsp.get("responseStatus"));
+ map = (Map<String, Object>) ObjectBuilder.fromJSON(content);
+ assertEquals(Boolean.TRUE, map.get("kafka"));
+ assertEquals(Boolean.FALSE, map.get("solr"));
+ assertEquals(Boolean.TRUE, map.get("running"));
+
} finally {
httpJettySolrClient.close();
client.close();
diff --git
a/solr/solr-ref-guide/modules/deployment-guide/pages/cross-dc-replication.adoc
b/solr/solr-ref-guide/modules/deployment-guide/pages/cross-dc-replication.adoc
index 8211aa04e1d..bb235dcdc7c 100644
---
a/solr/solr-ref-guide/modules/deployment-guide/pages/cross-dc-replication.adoc
+++
b/solr/solr-ref-guide/modules/deployment-guide/pages/cross-dc-replication.adoc
@@ -151,8 +151,9 @@ system property.
Currently the following endpoints are exposed (on local port configured using
`port` property, default is 8090):
-`/metrics` - (GET):: This endpoint returns JSON-formatted metrics describing
various aspects of document processing in Consumer.
+`/metrics` - (GET):: This endpoint returns metrics in Prometheus text format,
describing various aspects of document processing in Consumer.
`/threads` - (GET):: Returns a plain-text thread dump of the JVM running the
Consumer application.
+`/health` - (GET):: Returns an `HTTP 200 OK` code if the service is in a
healthy state, or `HTTP 503 Service Unavailable` if one or more healthcheck
probes failed. The JSON response body provides more details.
==== Configuration Properties for the CrossDC Manager: