This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.3 by this push: new 956164fe6c6 [improve][broker] avoid creating new objects when intercepting (#22790) 956164fe6c6 is described below commit 956164fe6c62a78c4bd178d553a13715b1b83de7 Author: Qiang Zhao <mattisonc...@apache.org> AuthorDate: Tue May 28 22:45:30 2024 +0800 [improve][broker] avoid creating new objects when intercepting (#22790) --- .../BrokerInterceptorWithClassLoader.java | 127 +++++++++++++++++---- .../intercept/BrokerInterceptorUtilsTest.java | 2 +- .../BrokerInterceptorWithClassLoaderTest.java | 2 +- 3 files changed, 105 insertions(+), 26 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java index faee5799289..3997e214f43 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java @@ -29,7 +29,6 @@ import lombok.Data; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.Entry; -import org.apache.pulsar.broker.ClassLoaderSwitcher; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.service.Consumer; import org.apache.pulsar.broker.service.Producer; @@ -51,16 +50,20 @@ import org.apache.pulsar.common.nar.NarClassLoader; public class BrokerInterceptorWithClassLoader implements BrokerInterceptor { private final BrokerInterceptor interceptor; - private final NarClassLoader classLoader; + private final NarClassLoader narClassLoader; @Override public void beforeSendMessage(Subscription subscription, Entry entry, long[] ackSet, MessageMetadata msgMetadata) { - try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) { + final ClassLoader previousContext = Thread.currentThread().getContextClassLoader(); + try { + Thread.currentThread().setContextClassLoader(narClassLoader); this.interceptor.beforeSendMessage( subscription, entry, ackSet, msgMetadata); + } finally { + Thread.currentThread().setContextClassLoader(previousContext); } } @@ -70,25 +73,37 @@ public class BrokerInterceptorWithClassLoader implements BrokerInterceptor { long[] ackSet, MessageMetadata msgMetadata, Consumer consumer) { - try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) { + final ClassLoader previousContext = Thread.currentThread().getContextClassLoader(); + try { + Thread.currentThread().setContextClassLoader(narClassLoader); this.interceptor.beforeSendMessage( subscription, entry, ackSet, msgMetadata, consumer); + } finally { + Thread.currentThread().setContextClassLoader(previousContext); } } @Override public void onMessagePublish(Producer producer, ByteBuf headersAndPayload, Topic.PublishContext publishContext) { - try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) { + final ClassLoader previousContext = Thread.currentThread().getContextClassLoader(); + try { + Thread.currentThread().setContextClassLoader(narClassLoader); this.interceptor.onMessagePublish(producer, headersAndPayload, publishContext); + } finally { + Thread.currentThread().setContextClassLoader(previousContext); } } @Override public void producerCreated(ServerCnx cnx, Producer producer, Map<String, String> metadata){ - try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) { + final ClassLoader previousContext = Thread.currentThread().getContextClassLoader(); + try { + Thread.currentThread().setContextClassLoader(narClassLoader); this.interceptor.producerCreated(cnx, producer, metadata); + } finally { + Thread.currentThread().setContextClassLoader(previousContext); } } @@ -96,8 +111,12 @@ public class BrokerInterceptorWithClassLoader implements BrokerInterceptor { public void producerClosed(ServerCnx cnx, Producer producer, Map<String, String> metadata) { - try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) { + final ClassLoader previousContext = Thread.currentThread().getContextClassLoader(); + try { + Thread.currentThread().setContextClassLoader(narClassLoader); this.interceptor.producerClosed(cnx, producer, metadata); + } finally { + Thread.currentThread().setContextClassLoader(previousContext); } } @@ -105,9 +124,12 @@ public class BrokerInterceptorWithClassLoader implements BrokerInterceptor { public void consumerCreated(ServerCnx cnx, Consumer consumer, Map<String, String> metadata) { - try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) { - this.interceptor.consumerCreated( - cnx, consumer, metadata); + final ClassLoader previousContext = Thread.currentThread().getContextClassLoader(); + try { + Thread.currentThread().setContextClassLoader(narClassLoader); + this.interceptor.consumerCreated(cnx, consumer, metadata); + } finally { + Thread.currentThread().setContextClassLoader(previousContext); } } @@ -115,8 +137,12 @@ public class BrokerInterceptorWithClassLoader implements BrokerInterceptor { public void consumerClosed(ServerCnx cnx, Consumer consumer, Map<String, String> metadata) { - try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) { + final ClassLoader previousContext = Thread.currentThread().getContextClassLoader(); + try { + Thread.currentThread().setContextClassLoader(narClassLoader); this.interceptor.consumerClosed(cnx, consumer, metadata); + } finally { + Thread.currentThread().setContextClassLoader(previousContext); } } @@ -124,87 +150,140 @@ public class BrokerInterceptorWithClassLoader implements BrokerInterceptor { @Override public void messageProduced(ServerCnx cnx, Producer producer, long startTimeNs, long ledgerId, long entryId, Topic.PublishContext publishContext) { - try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) { + final ClassLoader previousContext = Thread.currentThread().getContextClassLoader(); + try { + Thread.currentThread().setContextClassLoader(narClassLoader); this.interceptor.messageProduced(cnx, producer, startTimeNs, ledgerId, entryId, publishContext); + } finally { + Thread.currentThread().setContextClassLoader(previousContext); } } @Override public void messageDispatched(ServerCnx cnx, Consumer consumer, long ledgerId, long entryId, ByteBuf headersAndPayload) { - try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) { + final ClassLoader previousContext = Thread.currentThread().getContextClassLoader(); + try { + Thread.currentThread().setContextClassLoader(narClassLoader); this.interceptor.messageDispatched(cnx, consumer, ledgerId, entryId, headersAndPayload); + } finally { + Thread.currentThread().setContextClassLoader(previousContext); } } @Override public void messageAcked(ServerCnx cnx, Consumer consumer, CommandAck ackCmd) { - try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) { + final ClassLoader previousContext = Thread.currentThread().getContextClassLoader(); + try { + Thread.currentThread().setContextClassLoader(narClassLoader); this.interceptor.messageAcked(cnx, consumer, ackCmd); + } finally { + Thread.currentThread().setContextClassLoader(previousContext); } } @Override public void txnOpened(long tcId, String txnID) { - this.interceptor.txnOpened(tcId, txnID); + final ClassLoader previousContext = Thread.currentThread().getContextClassLoader(); + try { + Thread.currentThread().setContextClassLoader(narClassLoader); + this.interceptor.txnOpened(tcId, txnID); + } finally { + Thread.currentThread().setContextClassLoader(previousContext); + } } @Override public void txnEnded(String txnID, long txnAction) { - this.interceptor.txnEnded(txnID, txnAction); + final ClassLoader previousContext = Thread.currentThread().getContextClassLoader(); + try { + Thread.currentThread().setContextClassLoader(narClassLoader); + this.interceptor.txnEnded(txnID, txnAction); + } finally { + Thread.currentThread().setContextClassLoader(previousContext); + } } @Override public void onConnectionCreated(ServerCnx cnx) { - try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) { + final ClassLoader previousContext = Thread.currentThread().getContextClassLoader(); + try { + Thread.currentThread().setContextClassLoader(narClassLoader); this.interceptor.onConnectionCreated(cnx); + } finally { + Thread.currentThread().setContextClassLoader(previousContext); } } @Override public void onPulsarCommand(BaseCommand command, ServerCnx cnx) throws InterceptException { - try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) { + final ClassLoader previousContext = Thread.currentThread().getContextClassLoader(); + try { + Thread.currentThread().setContextClassLoader(narClassLoader); this.interceptor.onPulsarCommand(command, cnx); + } finally { + Thread.currentThread().setContextClassLoader(previousContext); } } @Override public void onConnectionClosed(ServerCnx cnx) { - try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) { + final ClassLoader previousContext = Thread.currentThread().getContextClassLoader(); + try { + Thread.currentThread().setContextClassLoader(narClassLoader); this.interceptor.onConnectionClosed(cnx); + } finally { + Thread.currentThread().setContextClassLoader(previousContext); } } @Override public void onWebserviceRequest(ServletRequest request) throws IOException, ServletException, InterceptException { - try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) { + final ClassLoader previousContext = Thread.currentThread().getContextClassLoader(); + try { + Thread.currentThread().setContextClassLoader(narClassLoader); this.interceptor.onWebserviceRequest(request); + } finally { + Thread.currentThread().setContextClassLoader(previousContext); } } @Override public void onWebserviceResponse(ServletRequest request, ServletResponse response) throws IOException, ServletException { - try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) { + final ClassLoader previousContext = Thread.currentThread().getContextClassLoader(); + try { + Thread.currentThread().setContextClassLoader(narClassLoader); this.interceptor.onWebserviceResponse(request, response); + } finally { + Thread.currentThread().setContextClassLoader(previousContext); } } @Override public void initialize(PulsarService pulsarService) throws Exception { - try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) { + final ClassLoader previousContext = Thread.currentThread().getContextClassLoader(); + try { + Thread.currentThread().setContextClassLoader(narClassLoader); this.interceptor.initialize(pulsarService); + } finally { + Thread.currentThread().setContextClassLoader(previousContext); } } @Override public void close() { - try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) { + final ClassLoader previousContext = Thread.currentThread().getContextClassLoader(); + try { + Thread.currentThread().setContextClassLoader(narClassLoader); interceptor.close(); + } finally { + Thread.currentThread().setContextClassLoader(previousContext); } + try { - classLoader.close(); + narClassLoader.close(); } catch (IOException e) { log.warn("Failed to close the broker interceptor class loader", e); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorUtilsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorUtilsTest.java index 5abe8a69ee4..979bf6cd0d5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorUtilsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorUtilsTest.java @@ -65,7 +65,7 @@ public class BrokerInterceptorUtilsTest { BrokerInterceptorWithClassLoader returnedPhWithCL = BrokerInterceptorUtils.load(metadata, ""); BrokerInterceptor returnedPh = returnedPhWithCL.getInterceptor(); - assertSame(mockLoader, returnedPhWithCL.getClassLoader()); + assertSame(mockLoader, returnedPhWithCL.getNarClassLoader()); assertTrue(returnedPh instanceof MockBrokerInterceptor); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoaderTest.java index a2f97e16a76..64d4b5ee6cc 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoaderTest.java @@ -135,7 +135,7 @@ public class BrokerInterceptorWithClassLoaderTest { new BrokerInterceptorWithClassLoader(interceptor, narLoader); ClassLoader curClassLoader = Thread.currentThread().getContextClassLoader(); // test class loader - assertEquals(brokerInterceptorWithClassLoader.getClassLoader(), narLoader); + assertEquals(brokerInterceptorWithClassLoader.getNarClassLoader(), narLoader); // test initialize brokerInterceptorWithClassLoader.initialize(mock(PulsarService.class)); assertEquals(Thread.currentThread().getContextClassLoader(), curClassLoader);