This is an automated email from the ASF dual-hosted git repository.

nicoloboschi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a4c3034f52f [fix][broker] Execute per-topic entry filters with the 
same classloader (#19364)
a4c3034f52f is described below

commit a4c3034f52f857ae0f4daf5d366ea9e578133bc2
Author: Nicolò Boschi <boschi1...@gmail.com>
AuthorDate: Wed Feb 1 20:55:30 2023 +0100

    [fix][broker] Execute per-topic entry filters with the same classloader 
(#19364)
---
 .../pulsar/broker/service/AbstractTopic.java       |  35 ++-
 .../pulsar/broker/service/BrokerService.java       |  52 +---
 .../pulsar/broker/service/EntryFilterSupport.java  |  30 +--
 .../org/apache/pulsar/broker/service/Topic.java    |   5 +-
 .../service/nonpersistent/NonPersistentTopic.java  |   6 +-
 .../broker/service/persistent/PersistentTopic.java |   6 +-
 .../service/plugin/EntryFilterDefinition.java      |   2 +
 .../service/plugin/EntryFilterDefinitions.java     |  28 ---
 .../broker/service/plugin/EntryFilterProvider.java | 188 +++++++++------
 .../service/plugin/EntryFilterWithClassLoader.java |   8 +
 .../apache/pulsar/broker/admin/AdminApi2Test.java  | 265 +++++++++++++++++++--
 .../broker/service/AbstractBaseDispatcherTest.java |  16 +-
 .../broker/service/plugin/FilterEntryTest.java     | 133 +++++++++--
 .../pulsar/broker/stats/ConsumerStatsTest.java     |   4 +-
 .../testcontext/MockEntryFilterProvider.java       |  66 +++++
 .../pulsar/common/policies/data/EntryFilters.java  |   2 +-
 16 files changed, 622 insertions(+), 224 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index c9f95ab524f..4e095cd66ba 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -43,6 +43,7 @@ import lombok.Getter;
 import org.apache.bookkeeper.mledger.util.StatsBuckets;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
@@ -53,7 +54,7 @@ import 
org.apache.pulsar.broker.service.BrokerServiceException.ProducerBusyExcep
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ProducerFencedException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.TopicMigratedException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.TopicTerminatedException;
-import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader;
+import org.apache.pulsar.broker.service.plugin.EntryFilter;
 import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage;
 import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
 import 
org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
@@ -148,7 +149,7 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener<TopicP
 
     protected final LongAdder msgOutFromRemovedSubscriptions = new LongAdder();
     protected final LongAdder bytesOutFromRemovedSubscriptions = new 
LongAdder();
-    protected Map<String, EntryFilterWithClassLoader> entryFilters;
+    protected volatile Pair<String, List<EntryFilter>> entryFilters;
 
     public AbstractTopic(String topic, BrokerService brokerService) {
         this.topic = topic;
@@ -188,8 +189,8 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener<TopicP
         return this.topicPolicies.getEntryFilters().get();
     }
 
-    public Map<String, EntryFilterWithClassLoader> getEntryFilters() {
-        return this.entryFilters;
+    public List<EntryFilter> getEntryFilters() {
+        return this.entryFilters.getRight();
     }
 
     public DispatchRateImpl getReplicatorDispatchRate() {
@@ -240,6 +241,8 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener<TopicP
         
topicPolicies.getSchemaValidationEnforced().updateTopicValue(data.getSchemaValidationEnforced());
         
topicPolicies.getEntryFilters().updateTopicValue(data.getEntryFilters());
         this.subscriptionPolicies = data.getSubscriptionPolicies();
+
+        updateEntryFilters();
     }
 
     protected void updateTopicPolicyByNamespacePolicy(Policies 
namespacePolicies) {
@@ -288,6 +291,8 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener<TopicP
         updateNamespaceDispatchRate(namespacePolicies, 
brokerService.getPulsar().getConfig().getClusterName());
         
topicPolicies.getSchemaValidationEnforced().updateNamespaceValue(namespacePolicies.schema_validation_enforced);
         
topicPolicies.getEntryFilters().updateNamespaceValue(namespacePolicies.entryFilters);
+
+        updateEntryFilters();
     }
 
     private void updateNamespaceDispatchRate(Policies namespacePolicies, 
String cluster) {
@@ -384,6 +389,8 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener<TopicP
         
topicPolicies.getSchemaValidationEnforced().updateBrokerValue(config.isSchemaValidationEnforced());
         topicPolicies.getEntryFilters().updateBrokerValue(new 
EntryFilters(String.join(",",
                 config.getEntryFilterNames())));
+
+        updateEntryFilters();
     }
 
     private DispatchRateImpl dispatchRateInBroker(ServiceConfiguration config) 
{
@@ -1158,6 +1165,26 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener<TopicP
         }
     }
 
+    public void updateEntryFilters() {
+        final EntryFilters entryFiltersPolicy = getEntryFiltersPolicy();
+        if (entryFiltersPolicy == null || 
StringUtils.isBlank(entryFiltersPolicy.getEntryFilterNames())) {
+            entryFilters = Pair.of(null, Collections.emptyList());
+            return;
+        }
+        final String entryFilterNames = 
entryFiltersPolicy.getEntryFilterNames();
+        if (entryFilters != null && 
entryFilterNames.equals(entryFilters.getLeft())) {
+            return;
+        }
+        try {
+            final List<EntryFilter> filters =
+                    
brokerService.getEntryFilterProvider().loadEntryFiltersForPolicy(entryFiltersPolicy);
+            entryFilters = Pair.of(entryFilterNames, filters);
+        } catch (Throwable e) {
+            log.error("Failed to load entry filters on topic {}: {}", topic, 
e.getMessage());
+            throw new RuntimeException(e);
+        }
+    }
+
     public long getMsgInCounter() {
         return this.msgInCounter.longValue();
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index d88f040f11b..f7020963fb7 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -116,7 +116,6 @@ import 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleC
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.service.persistent.SystemTopic;
 import org.apache.pulsar.broker.service.plugin.EntryFilterProvider;
-import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader;
 import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
 import org.apache.pulsar.broker.stats.prometheus.metrics.ObserverGauge;
 import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
@@ -147,7 +146,6 @@ import 
org.apache.pulsar.common.policies.data.AutoSubscriptionCreationOverride;
 import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.EntryFilters;
 import org.apache.pulsar.common.policies.data.LocalPolicies;
 import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
@@ -277,7 +275,7 @@ public class BrokerService implements Closeable {
     private boolean preciseTopicPublishRateLimitingEnable;
     private final LongAdder pausedConnections = new LongAdder();
     private BrokerInterceptor interceptor;
-    private Map<String, EntryFilterWithClassLoader> entryFilters;
+    private final EntryFilterProvider entryFilterProvider;
     private TopicFactory topicFactory;
 
     private Set<BrokerEntryMetadataInterceptor> 
brokerEntryMetadataInterceptors;
@@ -324,9 +322,7 @@ public class BrokerService implements Closeable {
                 new 
ExecutorProvider.ExtendedThreadFactory("pulsar-stats-updater"));
         this.authorizationService = new AuthorizationService(
                 pulsar.getConfiguration(), pulsar().getPulsarResources());
-        if (!pulsar.getConfiguration().getEntryFilterNames().isEmpty()) {
-            this.entryFilters = 
EntryFilterProvider.createEntryFilters(pulsar.getConfiguration());
-        }
+        this.entryFilterProvider = new 
EntryFilterProvider(pulsar.getConfiguration());
 
         
pulsar.getLocalMetadataStore().registerListener(this::handleMetadataChanges);
         
pulsar.getConfigurationMetadataStore().registerListener(this::handleMetadataChanges);
@@ -782,14 +778,8 @@ public class BrokerService implements Closeable {
             });
 
             //close entry filters
-            if (entryFilters != null) {
-                entryFilters.forEach((name, filter) -> {
-                    try {
-                        filter.close();
-                    } catch (Exception e) {
-                        log.warn("Error shutting down entry filter {}", name, 
e);
-                    }
-                });
+            if (entryFilterProvider != null) {
+                entryFilterProvider.close();
             }
 
             CompletableFuture<CompletableFuture<Void>> 
cancellableDownstreamFutureReference = new CompletableFuture<>();
@@ -1189,27 +1179,13 @@ public class BrokerService implements Closeable {
         NonPersistentTopic nonPersistentTopic;
         try {
             nonPersistentTopic = newTopic(topic, null, this, 
NonPersistentTopic.class);
-        } catch (Exception e) {
+        } catch (Throwable e) {
             log.warn("Failed to create topic {}", topic, e);
             return FutureUtil.failedFuture(e);
         }
         CompletableFuture<Void> isOwner = checkTopicNsOwnership(topic);
         isOwner.thenRun(() -> {
             nonPersistentTopic.initialize()
-                    .thenAccept(__ -> {
-                        EntryFilters entryFiltersPolicy = 
nonPersistentTopic.getEntryFiltersPolicy();
-                        if 
(!entryFiltersPolicy.getEntryFilterNames().isEmpty()) {
-                            try {
-                                nonPersistentTopic.entryFilters =
-                                        
EntryFilterProvider.createEntryFilters(pulsar.getConfig(),
-                                                entryFiltersPolicy);
-                            } catch (IOException e) {
-                                log.warn("Failed to set entry filters on topic 
{}-{}", topic, e.getMessage());
-                                pulsar.getExecutor().execute(() -> 
topics.remove(topic, topicFuture));
-                                topicFuture.completeExceptionally(e);
-                            }
-                        }
-                    })
                     .thenCompose(__ -> nonPersistentTopic.checkReplication())
                     .thenRun(() -> {
                         log.info("Created topic {}", nonPersistentTopic);
@@ -1577,22 +1553,6 @@ public class BrokerService implements Closeable {
                                         : newTopic(topic, ledger, 
BrokerService.this, PersistentTopic.class);
                                 persistentTopic
                                         .initialize()
-                                        .thenAccept(__ -> {
-                                            EntryFilters entryFiltersPolicy = 
persistentTopic.getEntryFiltersPolicy();
-                                            if 
(!entryFiltersPolicy.getEntryFilterNames().isEmpty()) {
-                                                try {
-                                                    
persistentTopic.entryFilters =
-                                                            
EntryFilterProvider.createEntryFilters(pulsar.getConfig(),
-                                                                    
entryFiltersPolicy);
-                                                } catch (IOException e) {
-                                                    log.warn("Failed to set 
entry filters on topic {}-{}", topic,
-                                                            e.getMessage());
-                                                    
pulsar.getExecutor().execute(() ->
-                                                            
topics.remove(topic, topicFuture));
-                                                    
topicFuture.completeExceptionally(e);
-                                                }
-                                            }
-                                        })
                                         .thenCompose(__ -> 
persistentTopic.preCreateSubscriptionForCompactionIfNeeded())
                                         .thenCompose(__ -> 
persistentTopic.checkReplication())
                                         .thenCompose(v -> {
@@ -1633,7 +1593,7 @@ public class BrokerService implements Closeable {
                                             return null;
                                         });
                             } catch (PulsarServerException e) {
-                                log.warn("Failed to create topic {}-{}", 
topic, e.getMessage());
+                                log.warn("Failed to create topic {}: {}", 
topic, e.getMessage());
                                 pulsar.getExecutor().execute(() -> 
topics.remove(topic, topicFuture));
                                 topicFuture.completeExceptionally(e);
                             }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryFilterSupport.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryFilterSupport.java
index 6c0f1f65c69..4a9b33a9afd 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryFilterSupport.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryFilterSupport.java
@@ -20,22 +20,15 @@ package org.apache.pulsar.broker.service;
 
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.collections4.MapUtils;
 import org.apache.pulsar.broker.service.plugin.EntryFilter;
-import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader;
 import org.apache.pulsar.broker.service.plugin.FilterContext;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 
 public class EntryFilterSupport {
 
-    /**
-     * Entry filters in Broker.
-     * Not set to final, for the convenience of testing mock.
-     */
-    protected final List<EntryFilterWithClassLoader> entryFilters;
+    protected final List<EntryFilter> entryFilters;
     protected final boolean hasFilter;
     protected final FilterContext filterContext;
     protected final Subscription subscription;
@@ -43,19 +36,18 @@ public class EntryFilterSupport {
     public EntryFilterSupport(Subscription subscription) {
         this.subscription = subscription;
         if (subscription != null && subscription.getTopic() != null) {
-            if (MapUtils.isNotEmpty(subscription.getTopic()
-                    .getBrokerService().getEntryFilters())
-                    && !subscription.getTopic().getBrokerService().pulsar()
-                    .getConfiguration().isAllowOverrideEntryFilters()) {
-                this.entryFilters = 
subscription.getTopic().getBrokerService().getEntryFilters().values().stream()
-                        .toList();
+            final BrokerService brokerService = 
subscription.getTopic().getBrokerService();
+            final boolean allowOverrideEntryFilters = brokerService
+                    .pulsar().getConfiguration().isAllowOverrideEntryFilters();
+            if (!allowOverrideEntryFilters) {
+                this.entryFilters = 
brokerService.getEntryFilterProvider().getBrokerEntryFilters();
             } else {
-                Map<String, EntryFilterWithClassLoader> entryFiltersMap =
+                List<EntryFilter> topicEntryFilters =
                         subscription.getTopic().getEntryFilters();
-                if (entryFiltersMap != null) {
-                    this.entryFilters = 
subscription.getTopic().getEntryFilters().values().stream().toList();
+                if (topicEntryFilters != null && !topicEntryFilters.isEmpty()) 
{
+                    this.entryFilters = topicEntryFilters;
                 } else {
-                    this.entryFilters = Collections.emptyList();
+                    this.entryFilters = 
brokerService.getEntryFilterProvider().getBrokerEntryFilters();
                 }
             }
             this.filterContext = new FilterContext();
@@ -86,7 +78,7 @@ public class EntryFilterSupport {
 
 
     private static EntryFilter.FilterResult getFilterResult(FilterContext 
filterContext, Entry entry,
-                                                            
List<EntryFilterWithClassLoader> entryFilters) {
+                                                            List<EntryFilter> 
entryFilters) {
         for (EntryFilter entryFilter : entryFilters) {
             EntryFilter.FilterResult filterResult =
                     entryFilter.filterEntry(entry, filterContext);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index 3949df92cec..e6a29368dbb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.service;
 
 import io.netty.buffer.ByteBuf;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
@@ -26,7 +27,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
 import org.apache.pulsar.broker.service.persistent.SubscribeRateLimiter;
-import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader;
+import org.apache.pulsar.broker.service.plugin.EntryFilter;
 import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
 import org.apache.pulsar.broker.stats.NamespaceStats;
 import org.apache.pulsar.client.api.MessageId;
@@ -251,7 +252,7 @@ public interface Topic {
 
     EntryFilters getEntryFiltersPolicy();
 
-    Map<String, EntryFilterWithClassLoader> getEntryFilters();
+    List<EntryFilter> getEntryFilters();
 
     BacklogQuota getBacklogQuota(BacklogQuotaType backlogQuotaType);
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index cf46103cc35..3b046570d73 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -510,11 +510,11 @@ public class NonPersistentTopic extends AbstractTopic 
implements Topic, TopicPol
         }
 
         if (entryFilters != null) {
-            entryFilters.forEach((name, filter) -> {
+            entryFilters.getRight().forEach(filter -> {
                 try {
                     filter.close();
-                } catch (Exception e) {
-                    log.warn("Error shutting down entry filter {}", name, e);
+                } catch (Throwable e) {
+                    log.warn("Error shutting down entry filter {}", filter, e);
                 }
             });
         }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index d009d3778f2..20744102a31 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1357,11 +1357,11 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
 
         //close entry filters
         if (entryFilters != null) {
-            entryFilters.forEach((name, filter) -> {
+            entryFilters.getRight().forEach((filter) -> {
                 try {
                     filter.close();
-                } catch (Exception e) {
-                    log.warn("Error shutting down entry filter {}", name, e);
+                } catch (Throwable e) {
+                    log.warn("Error shutting down entry filter {}", filter, e);
                 }
             });
         }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterDefinition.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterDefinition.java
index 36f39efa384..fd93a6be51e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterDefinition.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterDefinition.java
@@ -18,11 +18,13 @@
  */
 package org.apache.pulsar.broker.service.plugin;
 
+import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 
 @Data
 @NoArgsConstructor
+@AllArgsConstructor
 public class EntryFilterDefinition {
 
     /**
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterDefinitions.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterDefinitions.java
deleted file mode 100644
index 384e7e2fcf4..00000000000
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterDefinitions.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.service.plugin;
-
-import java.util.Map;
-import java.util.TreeMap;
-import lombok.Data;
-
-@Data
-public class EntryFilterDefinitions {
-    private final Map<String, EntryFilterMetaData> filters = new TreeMap<>();
-}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterProvider.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterProvider.java
index 333f7f33339..db643f43fa8 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterProvider.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterProvider.java
@@ -28,6 +28,12 @@ import java.nio.file.Files;
 import java.nio.file.NoSuchFileException;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.ServiceConfiguration;
@@ -37,74 +43,85 @@ import org.apache.pulsar.common.policies.data.EntryFilters;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 
 @Slf4j
-public class EntryFilterProvider {
+public class EntryFilterProvider implements AutoCloseable {
 
     @VisibleForTesting
     static final String ENTRY_FILTER_DEFINITION_FILE = "entry_filter";
 
-    /**
-     * create entry filter instance.
-     */
-    public static ImmutableMap<String, EntryFilterWithClassLoader> 
createEntryFilters(ServiceConfiguration conf,
-                                                                               
       EntryFilters entryFilters)
+    private final ServiceConfiguration serviceConfiguration;
+    @VisibleForTesting
+    protected Map<String, EntryFilterMetaData> definitions;
+    @VisibleForTesting
+    protected Map<String, NarClassLoader> cachedClassLoaders;
+    @VisibleForTesting
+    protected List<EntryFilter> brokerEntryFilters;
+
+    public EntryFilterProvider(ServiceConfiguration conf) throws IOException {
+        this.serviceConfiguration = conf;
+        initialize();
+        initializeBrokerEntryFilters();
+    }
+
+    protected void initializeBrokerEntryFilters() throws IOException {
+        if (!serviceConfiguration.getEntryFilterNames().isEmpty()) {
+            brokerEntryFilters = 
loadEntryFilters(serviceConfiguration.getEntryFilterNames());
+        } else {
+            brokerEntryFilters = Collections.emptyList();
+        }
+    }
+
+    public List<EntryFilter> loadEntryFiltersForPolicy(EntryFilters policy)
             throws IOException {
-        EntryFilterDefinitions definitions = 
searchForEntryFilters(conf.getEntryFiltersDirectory(),
-                conf.getNarExtractionDirectory());
-        ImmutableMap.Builder<String, EntryFilterWithClassLoader> builder = 
ImmutableMap.builder();
-        for (String filterName : 
entryFilters.getEntryFilterNames().split(",")) {
-            EntryFilterMetaData metaData = 
definitions.getFilters().get(filterName);
-            if (null == metaData) {
-                throw new RuntimeException("No entry filter is found for name 
`" + filterName
-                        + "`. Available entry filters are : " + 
definitions.getFilters());
-            }
-            EntryFilterWithClassLoader filter;
-            filter = load(metaData, conf.getNarExtractionDirectory());
-            if (filter != null) {
-                builder.put(filterName, filter);
-            }
-            log.info("Successfully loaded entry filter for name `{}` from 
topic policy", filterName);
+        final String names = policy.getEntryFilterNames();
+        if (StringUtils.isBlank(names)) {
+            return Collections.emptyList();
         }
-        return builder.build();
+        final List<String> entryFilterList = Arrays.stream(names.split(","))
+                .filter(n -> StringUtils.isNotBlank(n))
+                .toList();
+        return loadEntryFilters(entryFilterList);
     }
 
-    public static ImmutableMap<String, EntryFilterWithClassLoader> 
createEntryFilters(
-            ServiceConfiguration conf) throws IOException {
-        EntryFilterDefinitions definitions = 
searchForEntryFilters(conf.getEntryFiltersDirectory(),
-                conf.getNarExtractionDirectory());
-        ImmutableMap.Builder<String, EntryFilterWithClassLoader> builder = 
ImmutableMap.builder();
-        for (String filterName : conf.getEntryFilterNames()) {
-            EntryFilterMetaData metaData = 
definitions.getFilters().get(filterName);
+    private List<EntryFilter> loadEntryFilters(Collection<String> 
entryFilterNames)
+            throws IOException {
+        ImmutableMap.Builder<String, EntryFilter> builder = 
ImmutableMap.builder();
+        for (String filterName : entryFilterNames) {
+            EntryFilterMetaData metaData = definitions.get(filterName);
             if (null == metaData) {
                 throw new RuntimeException("No entry filter is found for name 
`" + filterName
-                        + "`. Available entry filters are : " + 
definitions.getFilters());
+                        + "`. Available entry filters are : " + 
definitions.keySet());
             }
-            EntryFilterWithClassLoader filter;
-            filter = load(metaData, conf.getNarExtractionDirectory());
-            if (filter != null) {
-                builder.put(filterName, filter);
-            }
-            log.info("Successfully loaded entry filter for name `{}`", 
filterName);
+            final EntryFilter entryFilter = load(metaData);
+            builder.put(filterName, entryFilter);
+            log.info("Successfully loaded entry filter `{}`", filterName);
         }
-        return builder.build();
+        return builder.build().values().asList();
     }
 
-    private static EntryFilterDefinitions searchForEntryFilters(String 
entryFiltersDirectory,
-                                                                            
String narExtractionDirectory)
-            throws IOException {
+    public List<EntryFilter> getBrokerEntryFilters() {
+        return brokerEntryFilters;
+    }
+
+    private void initialize() throws IOException {
+        final String entryFiltersDirectory = 
serviceConfiguration.getEntryFiltersDirectory();
         Path path = Paths.get(entryFiltersDirectory).toAbsolutePath();
         log.info("Searching for entry filters in {}", path);
 
-        EntryFilterDefinitions entryFilterDefinitions = new 
EntryFilterDefinitions();
+
         if (!path.toFile().exists()) {
             log.info("Pulsar entry filters directory not found");
-            return entryFilterDefinitions;
+            definitions = Collections.emptyMap();
+            cachedClassLoaders = Collections.emptyMap();
+            return;
         }
+        Map<String, EntryFilterMetaData> entryFilterDefinitions = new 
HashMap<>();
 
+        cachedClassLoaders = new HashMap<>();
         try (DirectoryStream<Path> stream = Files.newDirectoryStream(path, 
"*.nar")) {
             for (Path archive : stream) {
                 try {
-                    EntryFilterDefinition def =
-                            getEntryFilterDefinition(archive.toString(), 
narExtractionDirectory);
+                    final NarClassLoader narClassLoader = 
loadNarClassLoader(archive);
+                    EntryFilterDefinition def = 
getEntryFilterDefinition(narClassLoader);
                     log.info("Found entry filter from {} : {}", archive, def);
 
                     checkArgument(StringUtils.isNotBlank(def.getName()));
@@ -114,7 +131,7 @@ public class EntryFilterProvider {
                     metadata.setDefinition(def);
                     metadata.setArchivePath(archive);
 
-                    entryFilterDefinitions.getFilters().put(def.getName(), 
metadata);
+                    entryFilterDefinitions.put(def.getName(), metadata);
                 } catch (Throwable t) {
                     log.warn("Failed to load entry filters from {}."
                             + " It is OK however if you want to use this entry 
filters,"
@@ -123,19 +140,8 @@ public class EntryFilterProvider {
                 }
             }
         }
-
-        return entryFilterDefinitions;
-    }
-
-    private static EntryFilterDefinition getEntryFilterDefinition(String 
narPath,
-                                                                              
String narExtractionDirectory)
-            throws IOException {
-        try (NarClassLoader ncl = NarClassLoaderBuilder.builder()
-                .narFile(new File(narPath))
-                .extractionDirectory(narExtractionDirectory)
-                .build()) {
-            return getEntryFilterDefinition(ncl);
-        }
+        definitions = Collections.unmodifiableMap(entryFilterDefinitions);
+        cachedClassLoaders = Collections.unmodifiableMap(cachedClassLoaders);
     }
 
     @VisibleForTesting
@@ -153,22 +159,19 @@ public class EntryFilterProvider {
         );
     }
 
-    private static EntryFilterWithClassLoader load(EntryFilterMetaData 
metadata,
-                                                               String 
narExtractionDirectory)
+    protected EntryFilter load(EntryFilterMetaData metadata)
             throws IOException {
-        final File narFile = 
metadata.getArchivePath().toAbsolutePath().toFile();
-        NarClassLoader ncl = NarClassLoaderBuilder.builder()
-                .narFile(narFile)
-                .parentClassLoader(EntryFilter.class.getClassLoader())
-                .extractionDirectory(narExtractionDirectory)
-                .build();
-        EntryFilterDefinition def = getEntryFilterDefinition(ncl);
+        final EntryFilterDefinition def = metadata.getDefinition();
         if (StringUtils.isBlank(def.getEntryFilterClass())) {
-            throw new IOException("Entry filters `" + def.getName() + "` does 
NOT provide a entry"
+            throw new RuntimeException("Entry filter `" + def.getName() + "` 
does NOT provide a entry"
                     + " filters implementation");
         }
-
         try {
+            final NarClassLoader ncl = 
getNarClassLoader(metadata.getArchivePath());
+            if (ncl == null) {
+                throw new RuntimeException("Entry filter `" + def.getName() + 
"` cannot be loaded, "
+                        + "see the broker logs for further details");
+            }
             Class entryFilterClass = ncl.loadClass(def.getEntryFilterClass());
             Object filter = 
entryFilterClass.getDeclaredConstructor().newInstance();
             if (!(filter instanceof EntryFilter)) {
@@ -177,12 +180,55 @@ public class EntryFilterProvider {
             }
             EntryFilter pi = (EntryFilter) filter;
             return new EntryFilterWithClassLoader(pi, ncl);
-        } catch (Exception e) {
+        } catch (Throwable e) {
             if (e instanceof IOException) {
                 throw (IOException) e;
             }
-            log.error("Failed to load class {}", def.getEntryFilterClass(), e);
+            log.error("Failed to load class {}", 
metadata.getDefinition().getEntryFilterClass(), e);
             throw new IOException(e);
         }
     }
+
+    private NarClassLoader getNarClassLoader(Path archivePath) {
+        return cachedClassLoaders.get(classLoaderKey(archivePath));
+    }
+
+    private NarClassLoader loadNarClassLoader(Path archivePath) {
+        final String absolutePath = classLoaderKey(archivePath);
+        return cachedClassLoaders
+                .computeIfAbsent(absolutePath, narFilePath -> {
+                    try {
+                        final File narFile = 
archivePath.toAbsolutePath().toFile();
+                        return NarClassLoaderBuilder.builder()
+                                .narFile(narFile)
+                                
.parentClassLoader(EntryFilter.class.getClassLoader())
+                                
.extractionDirectory(serviceConfiguration.getNarExtractionDirectory())
+                                .build();
+                    } catch (IOException e) {
+                        throw new RuntimeException(e);
+                    }
+                });
+    }
+
+    private static String classLoaderKey(Path archivePath) {
+        return archivePath.toString();
+    }
+
+    @Override
+    public void close() throws Exception {
+        brokerEntryFilters.forEach((filter) -> {
+            try {
+                filter.close();
+            } catch (Throwable e) {
+                log.warn("Error shutting down entry filter {}", filter, e);
+            }
+        });
+        cachedClassLoaders.forEach((name, ncl) -> {
+            try {
+                ncl.close();
+            } catch (Throwable e) {
+                log.warn("Error closing entry filter class loader {}", name, 
e);
+            }
+        });
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterWithClassLoader.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterWithClassLoader.java
index 29a5dea119b..c5c57210877 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterWithClassLoader.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterWithClassLoader.java
@@ -18,12 +18,15 @@
  */
 package org.apache.pulsar.broker.service.plugin;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
+import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.pulsar.common.nar.NarClassLoader;
 
 @Slf4j
+@ToString
 public class EntryFilterWithClassLoader implements EntryFilter {
     private final EntryFilter entryFilter;
     private final NarClassLoader classLoader;
@@ -38,6 +41,11 @@ public class EntryFilterWithClassLoader implements 
EntryFilter {
         return entryFilter.filterEntry(entry, context);
     }
 
+    @VisibleForTesting
+    public EntryFilter getEntryFilter() {
+        return entryFilter;
+    }
+
     @Override
     public void close() {
         entryFilter.close();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index 95b91fde1e1..c8ea5818d03 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -56,6 +56,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
@@ -66,6 +67,13 @@ import 
org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.service.plugin.EntryFilter;
+import org.apache.pulsar.broker.service.plugin.EntryFilter2Test;
+import org.apache.pulsar.broker.service.plugin.EntryFilterDefinition;
+import org.apache.pulsar.broker.service.plugin.EntryFilterProvider;
+import org.apache.pulsar.broker.service.plugin.EntryFilterTest;
+import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader;
+import org.apache.pulsar.broker.testcontext.MockEntryFilterProvider;
 import org.apache.pulsar.broker.testcontext.PulsarTestContext;
 import org.apache.pulsar.client.admin.ListNamespaceTopicsOptions;
 import org.apache.pulsar.client.admin.Mode;
@@ -2236,35 +2244,244 @@ public class AdminApi2Test extends 
MockedPulsarServiceBaseTest {
 
     @Test(timeOut = 30000)
     public void testSetNamespaceEntryFilters() throws Exception {
-        EntryFilters entryFilters = new EntryFilters(
-                "org.apache.pulsar.broker.service.plugin.EntryFilterTest");
+        final MockEntryFilterProvider testEntryFilterProvider =
+                new MockEntryFilterProvider(conf);
+
+        testEntryFilterProvider
+                .setMockEntryFilters(new EntryFilterDefinition(
+                        "test",
+                        null,
+                        EntryFilterTest.class.getName()
+                ));
+        final EntryFilterProvider oldEntryFilterProvider = 
pulsar.getBrokerService().getEntryFilterProvider();
+        FieldUtils.writeField(pulsar.getBrokerService(),
+                "entryFilterProvider", testEntryFilterProvider, true);
 
-        final String myNamespace = "prop-xyz/ns" + UUID.randomUUID();
-        admin.namespaces().createNamespace(myNamespace, 
Sets.newHashSet("test"));
-
-        assertNull(admin.namespaces().getNamespaceEntryFilters(myNamespace));
-
-        admin.namespaces().setNamespaceEntryFilters(myNamespace, entryFilters);
-        assertEquals(admin.namespaces().getNamespaceEntryFilters(myNamespace), 
entryFilters);
-        admin.namespaces().removeNamespaceEntryFilters(myNamespace);
-        assertNull(admin.namespaces().getNamespaceEntryFilters(myNamespace));
+        try {
+            EntryFilters entryFilters = new EntryFilters("test");
+
+            final String myNamespace = "prop-xyz/ns" + UUID.randomUUID();
+            admin.namespaces().createNamespace(myNamespace, 
Sets.newHashSet("test"));
+            final String topicName = myNamespace + "/topic";
+            admin.topics().createNonPartitionedTopic(topicName);
+
+            
assertNull(admin.namespaces().getNamespaceEntryFilters(myNamespace));
+            assertEquals(pulsar
+                    .getBrokerService()
+                    .getTopic(topicName, false)
+                    .get()
+                    .get()
+                    .getEntryFilters()
+                    .size(), 0);
+
+            admin.namespaces().setNamespaceEntryFilters(myNamespace, 
entryFilters);
+            
assertEquals(admin.namespaces().getNamespaceEntryFilters(myNamespace), 
entryFilters);
+            Awaitility.await().untilAsserted(() -> {
+                assertEquals(pulsar
+                        .getBrokerService()
+                        .getTopic(topicName, false)
+                        .get()
+                        .get()
+                        .getEntryFiltersPolicy()
+                        .getEntryFilterNames(), "test");
+            });
+
+            assertEquals(pulsar
+                    .getBrokerService()
+                    .getTopic(topicName, false)
+                    .get()
+                    .get()
+                    .getEntryFilters()
+                    .size(), 1);
+            admin.namespaces().removeNamespaceEntryFilters(myNamespace);
+            
assertNull(admin.namespaces().getNamespaceEntryFilters(myNamespace));
+            Awaitility.await().untilAsserted(() -> {
+                assertEquals(pulsar
+                        .getBrokerService()
+                        .getTopic(topicName, false)
+                        .get()
+                        .get()
+                        .getEntryFiltersPolicy()
+                        .getEntryFilterNames(), "");
+            });
+
+            assertEquals(pulsar
+                    .getBrokerService()
+                    .getTopic(topicName, false)
+                    .get()
+                    .get()
+                    .getEntryFilters()
+                    .size(), 0);
+        } finally {
+            FieldUtils.writeField(pulsar.getBrokerService(),
+                    "entryFilterProvider", oldEntryFilterProvider, true);
+        }
     }
 
     @Test(dataProvider = "topicType")
     public void testSetTopicLevelEntryFilters(String topicType) throws 
Exception {
-        EntryFilters entryFilters = new 
EntryFilters("org.apache.pulsar.broker.service.plugin.EntryFilterTest");
-        final String topic = topicType + 
"://prop-xyz/ns1/test-schema-validation-enforced";
-        admin.topics().createPartitionedTopic(topic, 1);
-        @Cleanup
-        Producer<byte[]> producer1 = pulsarClient.newProducer()
-                .topic(topic + TopicName.PARTITIONED_TOPIC_SUFFIX + 0)
-                .create();
-        assertNull(admin.topicPolicies().getEntryFiltersPerTopic(topic, 
false));
-        admin.topicPolicies().setEntryFiltersPerTopic(topic, entryFilters);
-        Awaitility.await().untilAsserted(() -> 
assertEquals(admin.topicPolicies().getEntryFiltersPerTopic(topic,
-                false), entryFilters));
-        admin.topicPolicies().removeEntryFiltersPerTopic(topic);
-        assertNull(admin.topicPolicies().getEntryFiltersPerTopic(topic, 
false));
+        final MockEntryFilterProvider testEntryFilterProvider =
+                new MockEntryFilterProvider(conf);
+
+        testEntryFilterProvider
+                .setMockEntryFilters(new EntryFilterDefinition(
+                        "test",
+                        null,
+                        EntryFilterTest.class.getName()
+                ));
+        final EntryFilterProvider oldEntryFilterProvider = 
pulsar.getBrokerService().getEntryFilterProvider();
+        FieldUtils.writeField(pulsar.getBrokerService(),
+                "entryFilterProvider", testEntryFilterProvider, true);
+        try {
+            EntryFilters entryFilters = new EntryFilters("test");
+            final String topic = topicType + 
"://prop-xyz/ns1/test-schema-validation-enforced";
+            admin.topics().createPartitionedTopic(topic, 1);
+            final String fullTopicName = topic + 
TopicName.PARTITIONED_TOPIC_SUFFIX + 0;
+            @Cleanup
+            Producer<byte[]> producer1 = pulsarClient.newProducer()
+                    .topic(fullTopicName)
+                    .create();
+            assertNull(admin.topicPolicies().getEntryFiltersPerTopic(topic, 
false));
+            assertEquals(pulsar
+                    .getBrokerService()
+                    .getTopic(fullTopicName, false)
+                    .get()
+                    .get()
+                    .getEntryFilters()
+                    .size(), 0);
+            admin.topicPolicies().setEntryFiltersPerTopic(topic, entryFilters);
+            Awaitility.await().untilAsserted(() -> 
assertEquals(admin.topicPolicies().getEntryFiltersPerTopic(topic,
+                    false), entryFilters));
+            Awaitility.await().untilAsserted(() -> {
+                assertEquals(pulsar
+                        .getBrokerService()
+                        .getTopic(fullTopicName, false)
+                        .get()
+                        .get()
+                        .getEntryFiltersPolicy()
+                        .getEntryFilterNames(), "test");
+            });
+            assertEquals(pulsar
+                    .getBrokerService()
+                    .getTopic(fullTopicName, false)
+                    .get()
+                    .get()
+                    .getEntryFilters()
+                    .size(), 1);
+            admin.topicPolicies().removeEntryFiltersPerTopic(topic);
+            assertNull(admin.topicPolicies().getEntryFiltersPerTopic(topic, 
false));
+            Awaitility.await().untilAsserted(() -> {
+                assertEquals(pulsar
+                        .getBrokerService()
+                        .getTopic(fullTopicName, false)
+                        .get()
+                        .get()
+                        .getEntryFiltersPolicy()
+                        .getEntryFilterNames(), "");
+            });
+            assertEquals(pulsar
+                    .getBrokerService()
+                    .getTopic(fullTopicName, false)
+                    .get()
+                    .get()
+                    .getEntryFilters()
+                    .size(), 0);
+        } finally {
+            FieldUtils.writeField(pulsar.getBrokerService(),
+                    "entryFilterProvider", oldEntryFilterProvider, true);
+        }
+    }
+
+    @Test(timeOut = 30000)
+    public void testSetEntryFiltersHierarchy() throws Exception {
+        final MockEntryFilterProvider testEntryFilterProvider =
+                new MockEntryFilterProvider(conf);
+        conf.setEntryFilterNames(List.of("test", "test1"));
+
+        testEntryFilterProvider.setMockEntryFilters(new EntryFilterDefinition(
+                        "test",
+                        null,
+                        EntryFilterTest.class.getName()
+                ), new EntryFilterDefinition(
+                        "test1",
+                        null,
+                        EntryFilter2Test.class.getName()
+                ));
+        final EntryFilterProvider oldEntryFilterProvider = 
pulsar.getBrokerService().getEntryFilterProvider();
+        FieldUtils.writeField(pulsar.getBrokerService(),
+                "entryFilterProvider", testEntryFilterProvider, true);
+        try {
+
+            final String topic = 
"persistent://prop-xyz/ns1/test-schema-validation-enforced";
+            admin.topics().createPartitionedTopic(topic, 1);
+            final String fullTopicName = topic + 
TopicName.PARTITIONED_TOPIC_SUFFIX + 0;
+            @Cleanup
+            Producer<byte[]> producer1 = pulsarClient.newProducer()
+                    .topic(fullTopicName)
+                    .create();
+            assertNull(admin.topicPolicies().getEntryFiltersPerTopic(topic, 
false));
+            assertEquals(pulsar
+                    .getBrokerService()
+                    .getTopic(fullTopicName, false)
+                    .get()
+                    .get()
+                    .getEntryFilters()
+                    .size(), 2);
+
+            EntryFilters nsEntryFilters = new EntryFilters("test");
+            admin.namespaces().setNamespaceEntryFilters("prop-xyz/ns1", 
nsEntryFilters);
+            
assertEquals(admin.namespaces().getNamespaceEntryFilters("prop-xyz/ns1"), 
nsEntryFilters);
+            Awaitility.await().untilAsserted(() -> {
+                assertEquals(pulsar
+                        .getBrokerService()
+                        .getTopic(fullTopicName, false)
+                        .get()
+                        .get()
+                        .getEntryFiltersPolicy()
+                        .getEntryFilterNames(), "test");
+            });
+
+            Awaitility.await().untilAsserted(() -> {
+                final List<EntryFilter> entryFilters = pulsar
+                        .getBrokerService()
+                        .getTopic(fullTopicName, false)
+                        .get()
+                        .get()
+                        .getEntryFilters();
+                assertEquals(entryFilters.size(), 1);
+                assertEquals(((EntryFilterWithClassLoader)entryFilters.get(0))
+                        .getEntryFilter().getClass(), EntryFilterTest.class);
+
+            });
+
+
+            EntryFilters topicEntryFilters = new EntryFilters("test1");
+            admin.topicPolicies().setEntryFiltersPerTopic(topic, 
topicEntryFilters);
+            Awaitility.await().untilAsserted(() -> 
assertEquals(admin.topicPolicies().getEntryFiltersPerTopic(topic,
+                    false), topicEntryFilters));
+            Awaitility.await().untilAsserted(() -> {
+                assertEquals(pulsar
+                        .getBrokerService()
+                        .getTopic(fullTopicName, false)
+                        .get()
+                        .get()
+                        .getEntryFiltersPolicy()
+                        .getEntryFilterNames(), "test1");
+            });
+            final List<EntryFilter> entryFilters = pulsar
+                    .getBrokerService()
+                    .getTopic(fullTopicName, false)
+                    .get()
+                    .get()
+                    .getEntryFilters();
+            assertEquals(entryFilters.size(), 1);
+            assertEquals(((EntryFilterWithClassLoader) entryFilters.get(0))
+                    .getEntryFilter().getClass(), EntryFilter2Test.class);
+
+        } finally {
+            FieldUtils.writeField(pulsar.getBrokerService(),
+                    "entryFilterProvider", oldEntryFilterProvider, true);
+        }
     }
 
     @Test(timeOut = 30000)
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java
index 554ef1c3f96..cc2ec3444d5 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java
@@ -29,19 +29,19 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.impl.EntryImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.service.plugin.EntryFilter;
-import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader;
+import org.apache.pulsar.broker.service.plugin.EntryFilterProvider;
 import org.apache.pulsar.broker.service.plugin.FilterContext;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.common.api.proto.CommandSubscribe;
@@ -87,13 +87,19 @@ public class AbstractBaseDispatcherTest {
         Topic mockTopic = mock(Topic.class);
         when(this.subscriptionMock.getTopic()).thenReturn(mockTopic);
 
+        final EntryFilterProvider entryFilterProvider = 
mock(EntryFilterProvider.class);
+        final ServiceConfiguration serviceConfiguration = 
mock(ServiceConfiguration.class);
+        
when(serviceConfiguration.isAllowOverrideEntryFilters()).thenReturn(true);
+        final PulsarService pulsar = mock(PulsarService.class);
+        when(pulsar.getConfiguration()).thenReturn(serviceConfiguration);
         BrokerService mockBrokerService = mock(BrokerService.class);
+        when(mockBrokerService.pulsar()).thenReturn(pulsar);
+        
when(mockBrokerService.getEntryFilterProvider()).thenReturn(entryFilterProvider);
         when(mockTopic.getBrokerService()).thenReturn(mockBrokerService);
-        EntryFilterWithClassLoader mockFilter = 
mock(EntryFilterWithClassLoader.class);
+        EntryFilter mockFilter = mock(EntryFilter.class);
         when(mockFilter.filterEntry(any(Entry.class), 
any(FilterContext.class))).thenReturn(
                 EntryFilter.FilterResult.REJECT);
-        Map<String, EntryFilterWithClassLoader> entryFilters = Map.of("key", 
mockFilter);
-        when(mockTopic.getEntryFilters()).thenReturn(entryFilters);
+        when(mockTopic.getEntryFilters()).thenReturn(List.of(mockFilter));
         DispatchRateLimiter subscriptionDispatchRateLimiter = 
mock(DispatchRateLimiter.class);
 
         this.helper = new 
AbstractBaseDispatcherTestHelper(this.subscriptionMock, this.svcConfig,
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
index d5131e5df7b..4b9d91fbde2 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
@@ -23,12 +23,14 @@ import static 
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructor
 import static 
org.apache.pulsar.client.api.SubscriptionInitialPosition.Earliest;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertTrue;
 import static org.testng.AssertJUnit.assertEquals;
 import static org.testng.AssertJUnit.assertNotNull;
+
 import java.lang.reflect.Field;
 import java.util.HashMap;
 import java.util.List;
@@ -37,9 +39,14 @@ import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+
+import lombok.Cleanup;
+import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.commons.lang.reflect.FieldUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.service.AbstractTopic;
 import org.apache.pulsar.broker.service.BrokerTestBase;
 import org.apache.pulsar.broker.service.Dispatcher;
@@ -59,6 +66,7 @@ import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 @Test(groups = "broker")
@@ -92,22 +100,15 @@ public class FilterEntryTest extends BrokerTestBase {
                 .getTopicReference(topic).get();
 
         // set topic level entry filters
-        EntryFilterWithClassLoader mockFilter = 
mock(EntryFilterWithClassLoader.class);
+        EntryFilter mockFilter = mock(EntryFilter.class);
         when(mockFilter.filterEntry(any(Entry.class), 
any(FilterContext.class))).thenReturn(
                 EntryFilter.FilterResult.REJECT);
-        Map<String, EntryFilterWithClassLoader> entryFilters = Map.of("key", 
mockFilter);
-
-        Field field = 
topicRef.getClass().getSuperclass().getDeclaredField("entryFilters");
-        field.setAccessible(true);
-        field.set(topicRef, entryFilters);
+        setMockFilterToTopic(topicRef, List.of(mockFilter));
 
-        EntryFilterWithClassLoader mockFilter1 = 
mock(EntryFilterWithClassLoader.class);
+        EntryFilter mockFilter1 = mock(EntryFilter.class);
         when(mockFilter1.filterEntry(any(Entry.class), 
any(FilterContext.class))).thenReturn(
                 EntryFilter.FilterResult.ACCEPT);
-        Map<String, EntryFilterWithClassLoader> entryFilters1 = Map.of("key2", 
mockFilter1);
-        Field field2 = 
pulsar.getBrokerService().getClass().getDeclaredField("entryFilters");
-        field2.setAccessible(true);
-        field2.set(pulsar.getBrokerService(), entryFilters1);
+        setMockBrokerFilter(List.of(mockFilter1));
 
         Consumer<String> consumer = 
pulsarClient.newConsumer(Schema.STRING).topic(topic)
                 .subscriptionInitialPosition(Earliest)
@@ -148,11 +149,22 @@ public class FilterEntryTest extends BrokerTestBase {
         consumer.close();
     }
 
+    @SneakyThrows
+    private void setMockFilterToTopic(PersistentTopic topicRef, 
List<EntryFilter> mockFilter) {
+        FieldUtils.writeField(topicRef, "entryFilters", Pair.of(null, 
mockFilter), true);
+    }
+
+    @SneakyThrows
+    private void setMockBrokerFilter(List<EntryFilter> mockFilter) {
+        
FieldUtils.writeField(pulsar.getBrokerService().getEntryFilterProvider(),
+                "brokerEntryFilters", mockFilter, true);
+    }
+
     @Test
     public void testFilter() throws Exception {
         Map<String, String> map = new HashMap<>();
-        map.put("1","1");
-        map.put("2","2");
+        map.put("1", "1");
+        map.put("2", "2");
         String topic = "persistent://prop/ns-abc/topic" + UUID.randomUUID();
         String subName = "sub";
         Consumer<String> consumer = 
pulsarClient.newConsumer(Schema.STRING).topic(topic)
@@ -266,9 +278,7 @@ public class FilterEntryTest extends BrokerTestBase {
 
         PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService()
                 .getTopicReference(topic).get();
-        Field field1 = 
topicRef.getClass().getSuperclass().getDeclaredField("entryFilters");
-        field1.setAccessible(true);
-        field1.set(topicRef, Map.of("1", loader1, "2", loader2));
+        setMockFilterToTopic(topicRef, List.of(loader1, loader2));
 
         cleanup();
         verify(loader1, times(1)).close();
@@ -471,7 +481,7 @@ public class FilterEntryTest extends BrokerTestBase {
                                int numEntriesAccepted, int numMessagesAccepted,
                                int numEntriesRejected, int numMessagesRejected,
                                int numEntriesRescheduled, int 
numMessagesRescheduled
-                               ) throws Exception {
+    ) throws Exception {
         AnalyzeSubscriptionBacklogResult a1
                 = admin.topics().analyzeSubscriptionBacklog(topic, 
subscription, Optional.empty());
 
@@ -485,4 +495,93 @@ public class FilterEntryTest extends BrokerTestBase {
         Assert.assertEquals(numMessagesRejected, 
a1.getFilterRejectedMessages());
         Assert.assertEquals(numMessagesRescheduled, 
a1.getFilterRescheduledMessages());
     }
+
+
+    @DataProvider(name = "overrideBrokerEntryFilters")
+    public static Object[][] overrideBrokerEntryFilters() {
+        return new Object[][]{ {true}, {false} };
+    }
+
+
+    @Test(dataProvider = "overrideBrokerEntryFilters")
+    public void testExecuteInOrder(boolean overrideBrokerEntryFilters) throws 
Exception {
+        conf.setAllowOverrideEntryFilters(true);
+        String topic = "persistent://prop/ns-abc/topic" + UUID.randomUUID();
+        String subName = "sub";
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .enableBatching(false).topic(topic).create();
+        for (int i = 0; i < 10; i++) {
+            producer.send("test");
+        }
+
+        EntryFilter mockFilterReject = mock(EntryFilter.class);
+        when(mockFilterReject.filterEntry(any(Entry.class), 
any(FilterContext.class))).thenReturn(
+                EntryFilter.FilterResult.REJECT);
+        EntryFilter mockFilterAccept = mock(EntryFilter.class);
+        when(mockFilterAccept.filterEntry(any(Entry.class), 
any(FilterContext.class))).thenReturn(
+                EntryFilter.FilterResult.ACCEPT);
+        if (overrideBrokerEntryFilters) {
+            setMockFilterToTopic((PersistentTopic) pulsar.getBrokerService()
+                    .getTopicReference(topic).get(), List.of(mockFilterReject, 
mockFilterAccept));
+        } else {
+            setMockFilterToTopic((PersistentTopic) pulsar.getBrokerService()
+                    .getTopicReference(topic).get(), List.of());
+            setMockBrokerFilter(List.of(mockFilterReject, mockFilterAccept));
+        }
+
+
+        Consumer<String> consumer = 
pulsarClient.newConsumer(Schema.STRING).topic(topic)
+                .subscriptionInitialPosition(Earliest)
+                .subscriptionName(subName).subscribe();
+
+        int counter = 0;
+        while (true) {
+            Message<String> message = consumer.receive(1, TimeUnit.SECONDS);
+            if (message != null) {
+                counter++;
+                consumer.acknowledge(message);
+            } else {
+                break;
+            }
+        }
+        // All normal messages can be received
+        assertEquals(0, counter);
+        consumer.close();
+        verify(mockFilterReject, times(10))
+                .filterEntry(any(Entry.class), any(FilterContext.class));
+        verify(mockFilterAccept, never())
+                .filterEntry(any(Entry.class), any(FilterContext.class));
+
+        if (overrideBrokerEntryFilters) {
+            setMockFilterToTopic((PersistentTopic) pulsar.getBrokerService()
+                    .getTopicReference(topic).get(), List.of(mockFilterAccept, 
mockFilterReject));
+        } else {
+            setMockFilterToTopic((PersistentTopic) pulsar.getBrokerService()
+                    .getTopicReference(topic).get(), List.of());
+            setMockBrokerFilter(List.of(mockFilterAccept, mockFilterReject));
+        }
+
+        @Cleanup
+        Consumer<String> consumer2 = 
pulsarClient.newConsumer(Schema.STRING).topic(topic)
+                .subscriptionInitialPosition(Earliest)
+                .subscriptionName(subName + "-2").subscribe();
+
+        counter = 0;
+        while (true) {
+            Message<String> message = consumer2.receive(1, TimeUnit.SECONDS);
+            if (message != null) {
+                counter++;
+                consumer2.acknowledge(message);
+            } else {
+                break;
+            }
+        }
+        assertEquals(0, counter);
+        verify(mockFilterReject, times(20))
+                .filterEntry(any(Entry.class), any(FilterContext.class));
+        verify(mockFilterAccept, times(10))
+                .filterEntry(any(Entry.class), any(FilterContext.class));
+
+
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
index 13f1b3cc8e2..bbeee9f5a49 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
@@ -42,6 +42,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.broker.service.Topic;
@@ -374,6 +375,7 @@ public class ConsumerStatsTest extends ProducerConsumerBase 
{
 
     @Test
     public void testAvgMessagesPerEntry() throws Exception {
+        conf.setAllowOverrideEntryFilters(true);
         final String topic = "persistent://public/default/testFilterState";
         String subName = "sub";
 
@@ -406,7 +408,7 @@ public class ConsumerStatsTest extends ProducerConsumerBase 
{
         EntryFilterWithClassLoader
                 loader = 
spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter,
                 narClassLoader);
-        Map<String, EntryFilterWithClassLoader> entryFilters = 
Map.of("filter", loader);
+        Pair<String, List<EntryFilter>> entryFilters = Pair.of("filter", 
List.of(loader));
 
         PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService()
                 .getTopicReference(topic).get();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/MockEntryFilterProvider.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/MockEntryFilterProvider.java
new file mode 100644
index 00000000000..425f4ee41a7
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/MockEntryFilterProvider.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.testcontext;
+
+import lombok.SneakyThrows;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.service.plugin.EntryFilterDefinition;
+import org.apache.pulsar.broker.service.plugin.EntryFilterMetaData;
+import org.apache.pulsar.broker.service.plugin.EntryFilterProvider;
+import org.apache.pulsar.common.nar.NarClassLoader;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class MockEntryFilterProvider extends EntryFilterProvider {
+
+    public MockEntryFilterProvider(ServiceConfiguration config) throws 
IOException {
+        super(config);
+    }
+
+    @SneakyThrows
+    public void setMockEntryFilters(EntryFilterDefinition... defs) {
+        definitions = new HashMap<>();
+        cachedClassLoaders = new HashMap<>();
+        brokerEntryFilters = new ArrayList<>();
+
+        for (EntryFilterDefinition def : defs) {
+            final String name = def.getName();
+            final EntryFilterMetaData meta = new EntryFilterMetaData();
+            meta.setDefinition(def);
+            meta.setArchivePath(Path.of(name));
+            definitions.put(name, meta);
+            final NarClassLoader ncl = mock(NarClassLoader.class);
+
+            when(ncl.loadClass(anyString())).thenAnswer(a -> {
+                final Object argument = a.getArguments()[0];
+                return 
Thread.currentThread().getContextClassLoader().loadClass(argument.toString());
+            });
+            cachedClassLoaders.put(Path.of(name).toString(), ncl);
+        }
+        initializeBrokerEntryFilters();
+    }
+
+}
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/EntryFilters.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/EntryFilters.java
index 5192e9bad3a..5ebe793d14b 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/EntryFilters.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/EntryFilters.java
@@ -28,7 +28,7 @@ import lombok.NoArgsConstructor;
 public class EntryFilters {
 
     /**
-     * The class name for the entry filter.
+     * Entry filters class names separated by a comma.
      */
     private String entryFilterNames;
 

Reply via email to