This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d833b8bef21 [improve][broker] If there is a deadlock in the service,
the probe should return a failure because the service may be unavailable
(#23634)
d833b8bef21 is described below
commit d833b8bef21cb9e85f0f313eb9d49c7ca550fbbd
Author: yangyijun <[email protected]>
AuthorDate: Fri Sep 19 18:53:34 2025 +0800
[improve][broker] If there is a deadlock in the service, the probe should
return a failure because the service may be unavailable (#23634)
Co-authored-by: Lari Hotari <[email protected]>
Co-authored-by: Lari Hotari <[email protected]>
---
pulsar-broker-common/pom.xml | 6 +
.../pulsar/common/configuration/VipStatus.java | 99 +++++++++++--
.../pulsar/common/configuration/VipStatusTest.java | 161 +++++++++++++++++++++
3 files changed, 254 insertions(+), 12 deletions(-)
diff --git a/pulsar-broker-common/pom.xml b/pulsar-broker-common/pom.xml
index 06b855a3f18..50ce9f466c3 100644
--- a/pulsar-broker-common/pom.xml
+++ b/pulsar-broker-common/pom.xml
@@ -94,6 +94,12 @@
<artifactId>rest-assured</artifactId>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.glassfish.jersey.core</groupId>
+ <artifactId>jersey-server</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java
index 5e6a31b323f..aa4ec1109a6 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java
@@ -18,8 +18,15 @@
*/
package org.apache.pulsar.common.configuration;
+import com.google.common.annotations.VisibleForTesting;
import java.io.File;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
+import java.time.Clock;
+import java.util.Arrays;
import java.util.function.Supplier;
+import java.util.stream.Collectors;
import javax.servlet.ServletContext;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
@@ -27,6 +34,7 @@ import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.Response.Status;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.common.util.ThreadDumpUtil;
/**
* Web resource used by the VIP service to check to availability of the
service instance.
@@ -38,25 +46,92 @@ public class VipStatus {
public static final String ATTRIBUTE_STATUS_FILE_PATH = "statusFilePath";
public static final String ATTRIBUTE_IS_READY_PROBE = "isReadyProbe";
+ // Log a full thread dump at most once every 10 minutes when the status check
detects a deadlock,
+ // to prevent excessive logging
+ private static final long LOG_THREADDUMP_INTERVAL_WHEN_DEADLOCK_DETECTED =
600000L;
+ // Rate limit status checks to every 500ms to prevent DoS
+ private static final long CHECK_STATUS_INTERVAL = 500L;
+
+ private static volatile long lastCheckStatusTimestamp;
+ private static volatile long lastPrintThreadDumpTimestamp;
+ private static volatile boolean lastCheckStatusResult;
+
+ private long printThreadDumpIntervalMs;
+ private Clock clock;
+
@Context
protected ServletContext servletContext;
+ public VipStatus() {
+ this.clock = Clock.systemUTC();
+ this.printThreadDumpIntervalMs =
LOG_THREADDUMP_INTERVAL_WHEN_DEADLOCK_DETECTED;
+ }
+
+ @VisibleForTesting
+ public VipStatus(ServletContext servletContext, long
printThreadDumpIntervalMs) {
+ this.servletContext = servletContext;
+ this.printThreadDumpIntervalMs = printThreadDumpIntervalMs;
+ this.clock = Clock.systemUTC();
+ }
+
+ @VisibleForTesting
+ static void reset() {
+ lastCheckStatusTimestamp = 0L;
+ lastPrintThreadDumpTimestamp = 0L;
+ lastCheckStatusResult = false;
+ }
+
@GET
public String checkStatus() {
- String statusFilePath = (String)
servletContext.getAttribute(ATTRIBUTE_STATUS_FILE_PATH);
- @SuppressWarnings("unchecked")
- Supplier<Boolean> isReadyProbe = (Supplier<Boolean>)
servletContext.getAttribute(ATTRIBUTE_IS_READY_PROBE);
+ // Lock on the class so that concurrent requests do not run deadlock
detection at the same time.
+ synchronized (VipStatus.class) {
+ if (clock.millis() - lastCheckStatusTimestamp <
CHECK_STATUS_INTERVAL) {
+ if (lastCheckStatusResult) {
+ return "OK";
+ } else {
+ throw new
WebApplicationException(Status.SERVICE_UNAVAILABLE);
+ }
+ }
+ lastCheckStatusTimestamp = clock.millis();
- boolean isReady = isReadyProbe != null ? isReadyProbe.get() : true;
+ String statusFilePath = (String)
servletContext.getAttribute(ATTRIBUTE_STATUS_FILE_PATH);
+ @SuppressWarnings("unchecked")
+ Supplier<Boolean> isReadyProbe = (Supplier<Boolean>)
servletContext.getAttribute(ATTRIBUTE_IS_READY_PROBE);
+ boolean isReady = isReadyProbe != null ? isReadyProbe.get() : true;
- if (statusFilePath != null) {
- File statusFile = new File(statusFilePath);
- if (isReady && statusFile.exists() && statusFile.isFile()) {
- return "OK";
+ if (statusFilePath != null) {
+ File statusFile = new File(statusFilePath);
+ if (isReady && statusFile.exists() && statusFile.isFile()) {
+ // check deadlock
+ ThreadMXBean threadBean =
ManagementFactory.getThreadMXBean();
+ long[] threadIds = threadBean.findDeadlockedThreads();
+ if (threadIds != null && threadIds.length > 0) {
+ ThreadInfo[] threadInfos =
threadBean.getThreadInfo(threadIds, false,
+ false);
+ String threadNames = Arrays.stream(threadInfos)
+ .map(threadInfo -> threadInfo.getThreadName()
+ + "(tid=" + threadInfo.getThreadId() +
")")
+ .collect(Collectors.joining(", "));
+ if (clock.millis() - lastPrintThreadDumpTimestamp >
printThreadDumpIntervalMs) {
+ String diagnosticResult =
ThreadDumpUtil.buildThreadDiagnosticString();
+ log.error("Deadlocked threads detected. {}.
Service may be unavailable, "
+ + "thread stack details are as
follows:\n{}", threadNames, diagnosticResult);
+ lastPrintThreadDumpTimestamp = clock.millis();
+ } else {
+ log.error("Deadlocked threads detected. {}",
threadNames);
+ }
+ lastCheckStatusResult = false;
+ throw new
WebApplicationException(Status.SERVICE_UNAVAILABLE);
+ } else {
+ lastCheckStatusResult = true;
+ return "OK";
+ }
+ }
}
+ lastCheckStatusResult = false;
+ log.warn("Status file '{}' doesn't exist or ready probe value ({})
isn't true. The service is not ready",
+ statusFilePath, isReady);
+ throw new WebApplicationException(Status.NOT_FOUND);
}
- log.warn("Failed to access \"status.html\". The service is not ready");
- throw new WebApplicationException(Status.NOT_FOUND);
}
-
-}
+}
\ No newline at end of file
diff --git
a/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/VipStatusTest.java
b/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/VipStatusTest.java
new file mode 100644
index 00000000000..36542d237b5
--- /dev/null
+++
b/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/VipStatusTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.configuration;
+
+import static org.testng.Assert.assertEquals;
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Supplier;
+import javax.servlet.ServletContext;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Response;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.assertj.core.util.Files;
+import org.mockito.Mockito;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class VipStatusTest {
+
+ public static final String ATTRIBUTE_STATUS_FILE_PATH = "statusFilePath";
+ public static final String ATTRIBUTE_IS_READY_PROBE = "isReadyProbe";
+ private static final long LOG_THREADDUMP_INTERVAL_WHEN_DEADLOCK_DETECTED =
10000L;
+ // Rate limit status checks to every 500ms to prevent DoS
+ private static final long CHECK_STATUS_INTERVAL = 500L;
+
+ private ServletContext mockServletContext;
+ private VipStatus vipStatus;
+ private File file;
+
+ @BeforeMethod
+ public void setup() throws IOException {
+ file = Files.newTemporaryFile();
+ Supplier<Boolean> isReadyProbe = () -> true;
+ mockServletContext = Mockito.mock(ServletContext.class);
+
Mockito.when(mockServletContext.getAttribute(ATTRIBUTE_STATUS_FILE_PATH)).thenReturn(file.getAbsolutePath());
+
Mockito.when(mockServletContext.getAttribute(ATTRIBUTE_IS_READY_PROBE)).thenReturn(isReadyProbe);
+ vipStatus = new VipStatus(mockServletContext,
LOG_THREADDUMP_INTERVAL_WHEN_DEADLOCK_DETECTED);
+ VipStatus.reset();
+ }
+
+ @Test
+ public void testVipStatusCheckStatus() {
+ // No deadlocks
+ testVipStatusCheckStatusWithoutDeadlock();
+ // There is a deadlock
+ testVipStatusCheckStatusWithDeadlock();
+ }
+
+ @AfterMethod(alwaysRun = true)
+ public void release() throws IOException {
+ if (file != null) {
+ file.delete();
+ file = null;
+ }
+ }
+
+ @Test
+ public void testVipStatusCheckStatusWithoutDeadlock() {
+ assertEquals(vipStatus.checkStatus(), "OK");
+ }
+
+ @Test
+ public void testVipStatusCheckStatusWithDeadlock() {
+ MockDeadlock mockDeadlock = new MockDeadlock();
+ boolean asExpected = true;
+ try {
+ mockDeadlock.startDeadlock();
+ vipStatus.checkStatus();
+ asExpected = false;
+ System.out.println("Simulated deadlock, no deadlock detected, not
as expected.");
+ } catch (Exception wae) {
+ System.out.println("Simulated deadlock and detected it, as
expected.");
+ } finally {
+ mockDeadlock.close();
+ }
+
+ if (!asExpected) {
+ throw new
WebApplicationException(Response.Status.SERVICE_UNAVAILABLE);
+ }
+ }
+
+ static class MockDeadlock implements Closeable {
+ private ExecutorService executorService =
Executors.newCachedThreadPool();
+ private ReentrantLock lockA = new ReentrantLock();
+ private ReentrantLock lockB = new ReentrantLock();
+ private Phaser phaser = new Phaser(2);
+
+ @SneakyThrows
+ public void startDeadlock() {
+ executorService.execute(new TaskOne());
+ executorService.execute(new TaskTwo());
+ Thread.sleep(CHECK_STATUS_INTERVAL);
+ }
+
+ @Override
+ public void close() {
+ executorService.shutdownNow();
+ }
+
+ private class TaskOne implements Runnable {
+ @Override
+ public void run() {
+ try {
+ lockA.lock();
+ System.out.println("ThreadOne acquired lockA");
+ phaser.arriveAndAwaitAdvance();
+ while (!lockB.tryLock(1, TimeUnit.SECONDS)) {
+ System.out.println("ThreadOne acquired lockB");
+ }
+ } catch (InterruptedException e) {
+ //e.printStackTrace();
+ } finally {
+ lockA.unlock();
+ }
+ }
+ }
+
+ private class TaskTwo implements Runnable {
+ @Override
+ public void run() {
+ try {
+ lockB.lock();
+ System.out.println("ThreadOne acquired lockB");
+ phaser.arriveAndAwaitAdvance();
+ while (!lockA.tryLock(1, TimeUnit.SECONDS)) {
+ System.out.println("ThreadOne acquired lockA");
+ }
+ } catch (InterruptedException e) {
+ //e.printStackTrace();
+ } finally {
+ lockB.unlock();
+ }
+ }
+ }
+ }
+}