This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 956164fe6c6 [improve][broker] avoid creating new objects when 
intercepting (#22790)
956164fe6c6 is described below

commit 956164fe6c62a78c4bd178d553a13715b1b83de7
Author: Qiang Zhao <mattisonc...@apache.org>
AuthorDate: Tue May 28 22:45:30 2024 +0800

    [improve][broker] avoid creating new objects when intercepting (#22790)
---
 .../BrokerInterceptorWithClassLoader.java          | 127 +++++++++++++++++----
 .../intercept/BrokerInterceptorUtilsTest.java      |   2 +-
 .../BrokerInterceptorWithClassLoaderTest.java      |   2 +-
 3 files changed, 105 insertions(+), 26 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java
index faee5799289..3997e214f43 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java
@@ -29,7 +29,6 @@ import lombok.Data;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.Entry;
-import org.apache.pulsar.broker.ClassLoaderSwitcher;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.Producer;
@@ -51,16 +50,20 @@ import org.apache.pulsar.common.nar.NarClassLoader;
 public class BrokerInterceptorWithClassLoader implements BrokerInterceptor {
 
     private final BrokerInterceptor interceptor;
-    private final NarClassLoader classLoader;
+    private final NarClassLoader narClassLoader;
 
     @Override
     public void beforeSendMessage(Subscription subscription,
                                   Entry entry,
                                   long[] ackSet,
                                   MessageMetadata msgMetadata) {
-        try (ClassLoaderSwitcher ignored = new 
ClassLoaderSwitcher(classLoader)) {
+        final ClassLoader previousContext = 
Thread.currentThread().getContextClassLoader();
+        try {
+            Thread.currentThread().setContextClassLoader(narClassLoader);
             this.interceptor.beforeSendMessage(
                     subscription, entry, ackSet, msgMetadata);
+        } finally {
+            Thread.currentThread().setContextClassLoader(previousContext);
         }
     }
 
@@ -70,25 +73,37 @@ public class BrokerInterceptorWithClassLoader implements 
BrokerInterceptor {
                                   long[] ackSet,
                                   MessageMetadata msgMetadata,
                                   Consumer consumer) {
-        try (ClassLoaderSwitcher ignored = new 
ClassLoaderSwitcher(classLoader)) {
+        final ClassLoader previousContext = 
Thread.currentThread().getContextClassLoader();
+        try {
+            Thread.currentThread().setContextClassLoader(narClassLoader);
             this.interceptor.beforeSendMessage(
                     subscription, entry, ackSet, msgMetadata, consumer);
+        } finally {
+            Thread.currentThread().setContextClassLoader(previousContext);
         }
     }
 
     @Override
     public void onMessagePublish(Producer producer, ByteBuf headersAndPayload,
                                  Topic.PublishContext publishContext) {
-        try (ClassLoaderSwitcher ignored = new 
ClassLoaderSwitcher(classLoader)) {
+        final ClassLoader previousContext = 
Thread.currentThread().getContextClassLoader();
+        try {
+            Thread.currentThread().setContextClassLoader(narClassLoader);
             this.interceptor.onMessagePublish(producer, headersAndPayload, 
publishContext);
+        } finally {
+            Thread.currentThread().setContextClassLoader(previousContext);
         }
     }
 
     @Override
     public void producerCreated(ServerCnx cnx, Producer producer,
                                 Map<String, String> metadata){
-        try (ClassLoaderSwitcher ignored = new 
ClassLoaderSwitcher(classLoader)) {
+        final ClassLoader previousContext = 
Thread.currentThread().getContextClassLoader();
+        try {
+            Thread.currentThread().setContextClassLoader(narClassLoader);
             this.interceptor.producerCreated(cnx, producer, metadata);
+        } finally {
+            Thread.currentThread().setContextClassLoader(previousContext);
         }
     }
 
@@ -96,8 +111,12 @@ public class BrokerInterceptorWithClassLoader implements 
BrokerInterceptor {
     public void producerClosed(ServerCnx cnx,
                                Producer producer,
                                Map<String, String> metadata) {
-        try (ClassLoaderSwitcher ignored = new 
ClassLoaderSwitcher(classLoader)) {
+        final ClassLoader previousContext = 
Thread.currentThread().getContextClassLoader();
+        try {
+            Thread.currentThread().setContextClassLoader(narClassLoader);
             this.interceptor.producerClosed(cnx, producer, metadata);
+        } finally {
+            Thread.currentThread().setContextClassLoader(previousContext);
         }
     }
 
@@ -105,9 +124,12 @@ public class BrokerInterceptorWithClassLoader implements 
BrokerInterceptor {
     public void consumerCreated(ServerCnx cnx,
                                 Consumer consumer,
                                 Map<String, String> metadata) {
-        try (ClassLoaderSwitcher ignored = new 
ClassLoaderSwitcher(classLoader)) {
-            this.interceptor.consumerCreated(
-                    cnx, consumer, metadata);
+        final ClassLoader previousContext = 
Thread.currentThread().getContextClassLoader();
+        try {
+            Thread.currentThread().setContextClassLoader(narClassLoader);
+            this.interceptor.consumerCreated(cnx, consumer, metadata);
+        } finally {
+            Thread.currentThread().setContextClassLoader(previousContext);
         }
     }
 
@@ -115,8 +137,12 @@ public class BrokerInterceptorWithClassLoader implements 
BrokerInterceptor {
     public void consumerClosed(ServerCnx cnx,
                                Consumer consumer,
                                Map<String, String> metadata) {
-        try (ClassLoaderSwitcher ignored = new 
ClassLoaderSwitcher(classLoader)) {
+        final ClassLoader previousContext = 
Thread.currentThread().getContextClassLoader();
+        try {
+            Thread.currentThread().setContextClassLoader(narClassLoader);
             this.interceptor.consumerClosed(cnx, consumer, metadata);
+        } finally {
+            Thread.currentThread().setContextClassLoader(previousContext);
         }
     }
 
@@ -124,87 +150,140 @@ public class BrokerInterceptorWithClassLoader implements 
BrokerInterceptor {
     @Override
     public void messageProduced(ServerCnx cnx, Producer producer, long 
startTimeNs, long ledgerId,
                                 long entryId, Topic.PublishContext 
publishContext) {
-        try (ClassLoaderSwitcher ignored = new 
ClassLoaderSwitcher(classLoader)) {
+        final ClassLoader previousContext = 
Thread.currentThread().getContextClassLoader();
+        try {
+            Thread.currentThread().setContextClassLoader(narClassLoader);
             this.interceptor.messageProduced(cnx, producer, startTimeNs, 
ledgerId, entryId, publishContext);
+        } finally {
+            Thread.currentThread().setContextClassLoader(previousContext);
         }
     }
 
     @Override
     public  void messageDispatched(ServerCnx cnx, Consumer consumer, long 
ledgerId,
                                    long entryId, ByteBuf headersAndPayload) {
-        try (ClassLoaderSwitcher ignored = new 
ClassLoaderSwitcher(classLoader)) {
+        final ClassLoader previousContext = 
Thread.currentThread().getContextClassLoader();
+        try {
+            Thread.currentThread().setContextClassLoader(narClassLoader);
             this.interceptor.messageDispatched(cnx, consumer, ledgerId, 
entryId, headersAndPayload);
+        } finally {
+            Thread.currentThread().setContextClassLoader(previousContext);
         }
     }
 
     @Override
     public void messageAcked(ServerCnx cnx, Consumer consumer,
                              CommandAck ackCmd) {
-        try (ClassLoaderSwitcher ignored = new 
ClassLoaderSwitcher(classLoader)) {
+        final ClassLoader previousContext = 
Thread.currentThread().getContextClassLoader();
+        try {
+            Thread.currentThread().setContextClassLoader(narClassLoader);
             this.interceptor.messageAcked(cnx, consumer, ackCmd);
+        } finally {
+            Thread.currentThread().setContextClassLoader(previousContext);
         }
     }
 
     @Override
     public void txnOpened(long tcId, String txnID) {
-        this.interceptor.txnOpened(tcId, txnID);
+        final ClassLoader previousContext = 
Thread.currentThread().getContextClassLoader();
+        try {
+            Thread.currentThread().setContextClassLoader(narClassLoader);
+            this.interceptor.txnOpened(tcId, txnID);
+        } finally {
+            Thread.currentThread().setContextClassLoader(previousContext);
+        }
     }
 
     @Override
     public void txnEnded(String txnID, long txnAction) {
-        this.interceptor.txnEnded(txnID, txnAction);
+        final ClassLoader previousContext = 
Thread.currentThread().getContextClassLoader();
+        try {
+            Thread.currentThread().setContextClassLoader(narClassLoader);
+            this.interceptor.txnEnded(txnID, txnAction);
+        } finally {
+            Thread.currentThread().setContextClassLoader(previousContext);
+        }
     }
 
     @Override
     public void onConnectionCreated(ServerCnx cnx) {
-        try (ClassLoaderSwitcher ignored = new 
ClassLoaderSwitcher(classLoader)) {
+        final ClassLoader previousContext = 
Thread.currentThread().getContextClassLoader();
+        try {
+            Thread.currentThread().setContextClassLoader(narClassLoader);
             this.interceptor.onConnectionCreated(cnx);
+        } finally {
+            Thread.currentThread().setContextClassLoader(previousContext);
         }
     }
 
     @Override
     public void onPulsarCommand(BaseCommand command, ServerCnx cnx) throws 
InterceptException {
-        try (ClassLoaderSwitcher ignored = new 
ClassLoaderSwitcher(classLoader)) {
+        final ClassLoader previousContext = 
Thread.currentThread().getContextClassLoader();
+        try {
+            Thread.currentThread().setContextClassLoader(narClassLoader);
             this.interceptor.onPulsarCommand(command, cnx);
+        } finally {
+            Thread.currentThread().setContextClassLoader(previousContext);
         }
     }
 
     @Override
     public void onConnectionClosed(ServerCnx cnx) {
-        try (ClassLoaderSwitcher ignored = new 
ClassLoaderSwitcher(classLoader)) {
+        final ClassLoader previousContext = 
Thread.currentThread().getContextClassLoader();
+        try {
+            Thread.currentThread().setContextClassLoader(narClassLoader);
             this.interceptor.onConnectionClosed(cnx);
+        } finally {
+            Thread.currentThread().setContextClassLoader(previousContext);
         }
     }
 
     @Override
     public void onWebserviceRequest(ServletRequest request) throws 
IOException, ServletException, InterceptException {
-        try (ClassLoaderSwitcher ignored = new 
ClassLoaderSwitcher(classLoader)) {
+        final ClassLoader previousContext = 
Thread.currentThread().getContextClassLoader();
+        try {
+            Thread.currentThread().setContextClassLoader(narClassLoader);
             this.interceptor.onWebserviceRequest(request);
+        } finally {
+            Thread.currentThread().setContextClassLoader(previousContext);
         }
     }
 
     @Override
     public void onWebserviceResponse(ServletRequest request, ServletResponse 
response)
             throws IOException, ServletException {
-        try (ClassLoaderSwitcher ignored = new 
ClassLoaderSwitcher(classLoader)) {
+        final ClassLoader previousContext = 
Thread.currentThread().getContextClassLoader();
+        try {
+            Thread.currentThread().setContextClassLoader(narClassLoader);
             this.interceptor.onWebserviceResponse(request, response);
+        } finally {
+            Thread.currentThread().setContextClassLoader(previousContext);
         }
     }
 
     @Override
     public void initialize(PulsarService pulsarService) throws Exception {
-        try (ClassLoaderSwitcher ignored = new 
ClassLoaderSwitcher(classLoader)) {
+        final ClassLoader previousContext = 
Thread.currentThread().getContextClassLoader();
+        try {
+            Thread.currentThread().setContextClassLoader(narClassLoader);
             this.interceptor.initialize(pulsarService);
+        } finally {
+            Thread.currentThread().setContextClassLoader(previousContext);
         }
     }
 
     @Override
     public void close() {
-        try (ClassLoaderSwitcher ignored = new 
ClassLoaderSwitcher(classLoader)) {
+        final ClassLoader previousContext = 
Thread.currentThread().getContextClassLoader();
+        try {
+            Thread.currentThread().setContextClassLoader(narClassLoader);
             interceptor.close();
+        } finally {
+            Thread.currentThread().setContextClassLoader(previousContext);
         }
+
         try {
-            classLoader.close();
+            narClassLoader.close();
         } catch (IOException e) {
             log.warn("Failed to close the broker interceptor class loader", e);
         }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorUtilsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorUtilsTest.java
index 5abe8a69ee4..979bf6cd0d5 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorUtilsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorUtilsTest.java
@@ -65,7 +65,7 @@ public class BrokerInterceptorUtilsTest {
             BrokerInterceptorWithClassLoader returnedPhWithCL = 
BrokerInterceptorUtils.load(metadata, "");
             BrokerInterceptor returnedPh = returnedPhWithCL.getInterceptor();
 
-            assertSame(mockLoader, returnedPhWithCL.getClassLoader());
+            assertSame(mockLoader, returnedPhWithCL.getNarClassLoader());
             assertTrue(returnedPh instanceof MockBrokerInterceptor);
         }
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoaderTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoaderTest.java
index a2f97e16a76..64d4b5ee6cc 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoaderTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoaderTest.java
@@ -135,7 +135,7 @@ public class BrokerInterceptorWithClassLoaderTest {
                 new BrokerInterceptorWithClassLoader(interceptor, narLoader);
         ClassLoader curClassLoader = 
Thread.currentThread().getContextClassLoader();
         // test class loader
-        assertEquals(brokerInterceptorWithClassLoader.getClassLoader(), 
narLoader);
+        assertEquals(brokerInterceptorWithClassLoader.getNarClassLoader(), 
narLoader);
         // test initialize
         brokerInterceptorWithClassLoader.initialize(mock(PulsarService.class));
         assertEquals(Thread.currentThread().getContextClassLoader(), 
curClassLoader);

Reply via email to