This is an automated email from the ASF dual-hosted git repository. nicoloboschi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new a4c3034f52f [fix][broker] Execute per-topic entry filters with the same classloader (#19364) a4c3034f52f is described below commit a4c3034f52f857ae0f4daf5d366ea9e578133bc2 Author: Nicolò Boschi <boschi1...@gmail.com> AuthorDate: Wed Feb 1 20:55:30 2023 +0100 [fix][broker] Execute per-topic entry filters with the same classloader (#19364) --- .../pulsar/broker/service/AbstractTopic.java | 35 ++- .../pulsar/broker/service/BrokerService.java | 52 +--- .../pulsar/broker/service/EntryFilterSupport.java | 30 +-- .../org/apache/pulsar/broker/service/Topic.java | 5 +- .../service/nonpersistent/NonPersistentTopic.java | 6 +- .../broker/service/persistent/PersistentTopic.java | 6 +- .../service/plugin/EntryFilterDefinition.java | 2 + .../service/plugin/EntryFilterDefinitions.java | 28 --- .../broker/service/plugin/EntryFilterProvider.java | 188 +++++++++------ .../service/plugin/EntryFilterWithClassLoader.java | 8 + .../apache/pulsar/broker/admin/AdminApi2Test.java | 265 +++++++++++++++++++-- .../broker/service/AbstractBaseDispatcherTest.java | 16 +- .../broker/service/plugin/FilterEntryTest.java | 133 +++++++++-- .../pulsar/broker/stats/ConsumerStatsTest.java | 4 +- .../testcontext/MockEntryFilterProvider.java | 66 +++++ .../pulsar/common/policies/data/EntryFilters.java | 2 +- 16 files changed, 622 insertions(+), 224 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index c9f95ab524f..4e095cd66ba 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -43,6 +43,7 @@ import lombok.Getter; import org.apache.bookkeeper.mledger.util.StatsBuckets; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.MapUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; @@ -53,7 +54,7 @@ import org.apache.pulsar.broker.service.BrokerServiceException.ProducerBusyExcep import org.apache.pulsar.broker.service.BrokerServiceException.ProducerFencedException; import org.apache.pulsar.broker.service.BrokerServiceException.TopicMigratedException; import org.apache.pulsar.broker.service.BrokerServiceException.TopicTerminatedException; -import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader; +import org.apache.pulsar.broker.service.plugin.EntryFilter; import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage; import org.apache.pulsar.broker.service.schema.SchemaRegistryService; import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException; @@ -148,7 +149,7 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP protected final LongAdder msgOutFromRemovedSubscriptions = new LongAdder(); protected final LongAdder bytesOutFromRemovedSubscriptions = new LongAdder(); - protected Map<String, EntryFilterWithClassLoader> entryFilters; + protected volatile Pair<String, List<EntryFilter>> entryFilters; public AbstractTopic(String topic, BrokerService brokerService) { this.topic = topic; @@ -188,8 +189,8 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP return this.topicPolicies.getEntryFilters().get(); } - public Map<String, EntryFilterWithClassLoader> getEntryFilters() { - return this.entryFilters; + public List<EntryFilter> getEntryFilters() { + return this.entryFilters.getRight(); } public DispatchRateImpl getReplicatorDispatchRate() { @@ -240,6 +241,8 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP topicPolicies.getSchemaValidationEnforced().updateTopicValue(data.getSchemaValidationEnforced()); topicPolicies.getEntryFilters().updateTopicValue(data.getEntryFilters()); this.subscriptionPolicies = data.getSubscriptionPolicies(); + + updateEntryFilters(); } protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) { @@ -288,6 +291,8 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP updateNamespaceDispatchRate(namespacePolicies, brokerService.getPulsar().getConfig().getClusterName()); topicPolicies.getSchemaValidationEnforced().updateNamespaceValue(namespacePolicies.schema_validation_enforced); topicPolicies.getEntryFilters().updateNamespaceValue(namespacePolicies.entryFilters); + + updateEntryFilters(); } private void updateNamespaceDispatchRate(Policies namespacePolicies, String cluster) { @@ -384,6 +389,8 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP topicPolicies.getSchemaValidationEnforced().updateBrokerValue(config.isSchemaValidationEnforced()); topicPolicies.getEntryFilters().updateBrokerValue(new EntryFilters(String.join(",", config.getEntryFilterNames()))); + + updateEntryFilters(); } private DispatchRateImpl dispatchRateInBroker(ServiceConfiguration config) { @@ -1158,6 +1165,26 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP } } + public void updateEntryFilters() { + final EntryFilters entryFiltersPolicy = getEntryFiltersPolicy(); + if (entryFiltersPolicy == null || StringUtils.isBlank(entryFiltersPolicy.getEntryFilterNames())) { + entryFilters = Pair.of(null, Collections.emptyList()); + return; + } + final String entryFilterNames = entryFiltersPolicy.getEntryFilterNames(); + if (entryFilters != null && entryFilterNames.equals(entryFilters.getLeft())) { + return; + } + try { + final List<EntryFilter> filters = + brokerService.getEntryFilterProvider().loadEntryFiltersForPolicy(entryFiltersPolicy); + entryFilters = Pair.of(entryFilterNames, filters); + } catch (Throwable e) { + log.error("Failed to load entry filters on topic {}: {}", topic, e.getMessage()); + throw new RuntimeException(e); + } + } + public long getMsgInCounter() { return this.msgInCounter.longValue(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index d88f040f11b..f7020963fb7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -116,7 +116,6 @@ import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleC import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.service.persistent.SystemTopic; import org.apache.pulsar.broker.service.plugin.EntryFilterProvider; -import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader; import org.apache.pulsar.broker.stats.ClusterReplicationMetrics; import org.apache.pulsar.broker.stats.prometheus.metrics.ObserverGauge; import org.apache.pulsar.broker.stats.prometheus.metrics.Summary; @@ -147,7 +146,6 @@ import org.apache.pulsar.common.policies.data.AutoSubscriptionCreationOverride; import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride; import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.ClusterData; -import org.apache.pulsar.common.policies.data.EntryFilters; import org.apache.pulsar.common.policies.data.LocalPolicies; import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl; import org.apache.pulsar.common.policies.data.PersistencePolicies; @@ -277,7 +275,7 @@ public class BrokerService implements Closeable { private boolean preciseTopicPublishRateLimitingEnable; private final LongAdder pausedConnections = new LongAdder(); private BrokerInterceptor interceptor; - private Map<String, EntryFilterWithClassLoader> entryFilters; + private final EntryFilterProvider entryFilterProvider; private TopicFactory topicFactory; private Set<BrokerEntryMetadataInterceptor> brokerEntryMetadataInterceptors; @@ -324,9 +322,7 @@ public class BrokerService implements Closeable { new ExecutorProvider.ExtendedThreadFactory("pulsar-stats-updater")); this.authorizationService = new AuthorizationService( pulsar.getConfiguration(), pulsar().getPulsarResources()); - if (!pulsar.getConfiguration().getEntryFilterNames().isEmpty()) { - this.entryFilters = EntryFilterProvider.createEntryFilters(pulsar.getConfiguration()); - } + this.entryFilterProvider = new EntryFilterProvider(pulsar.getConfiguration()); pulsar.getLocalMetadataStore().registerListener(this::handleMetadataChanges); pulsar.getConfigurationMetadataStore().registerListener(this::handleMetadataChanges); @@ -782,14 +778,8 @@ public class BrokerService implements Closeable { }); //close entry filters - if (entryFilters != null) { - entryFilters.forEach((name, filter) -> { - try { - filter.close(); - } catch (Exception e) { - log.warn("Error shutting down entry filter {}", name, e); - } - }); + if (entryFilterProvider != null) { + entryFilterProvider.close(); } CompletableFuture<CompletableFuture<Void>> cancellableDownstreamFutureReference = new CompletableFuture<>(); @@ -1189,27 +1179,13 @@ public class BrokerService implements Closeable { NonPersistentTopic nonPersistentTopic; try { nonPersistentTopic = newTopic(topic, null, this, NonPersistentTopic.class); - } catch (Exception e) { + } catch (Throwable e) { log.warn("Failed to create topic {}", topic, e); return FutureUtil.failedFuture(e); } CompletableFuture<Void> isOwner = checkTopicNsOwnership(topic); isOwner.thenRun(() -> { nonPersistentTopic.initialize() - .thenAccept(__ -> { - EntryFilters entryFiltersPolicy = nonPersistentTopic.getEntryFiltersPolicy(); - if (!entryFiltersPolicy.getEntryFilterNames().isEmpty()) { - try { - nonPersistentTopic.entryFilters = - EntryFilterProvider.createEntryFilters(pulsar.getConfig(), - entryFiltersPolicy); - } catch (IOException e) { - log.warn("Failed to set entry filters on topic {}-{}", topic, e.getMessage()); - pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); - topicFuture.completeExceptionally(e); - } - } - }) .thenCompose(__ -> nonPersistentTopic.checkReplication()) .thenRun(() -> { log.info("Created topic {}", nonPersistentTopic); @@ -1577,22 +1553,6 @@ public class BrokerService implements Closeable { : newTopic(topic, ledger, BrokerService.this, PersistentTopic.class); persistentTopic .initialize() - .thenAccept(__ -> { - EntryFilters entryFiltersPolicy = persistentTopic.getEntryFiltersPolicy(); - if (!entryFiltersPolicy.getEntryFilterNames().isEmpty()) { - try { - persistentTopic.entryFilters = - EntryFilterProvider.createEntryFilters(pulsar.getConfig(), - entryFiltersPolicy); - } catch (IOException e) { - log.warn("Failed to set entry filters on topic {}-{}", topic, - e.getMessage()); - pulsar.getExecutor().execute(() -> - topics.remove(topic, topicFuture)); - topicFuture.completeExceptionally(e); - } - } - }) .thenCompose(__ -> persistentTopic.preCreateSubscriptionForCompactionIfNeeded()) .thenCompose(__ -> persistentTopic.checkReplication()) .thenCompose(v -> { @@ -1633,7 +1593,7 @@ public class BrokerService implements Closeable { return null; }); } catch (PulsarServerException e) { - log.warn("Failed to create topic {}-{}", topic, e.getMessage()); + log.warn("Failed to create topic {}: {}", topic, e.getMessage()); pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); topicFuture.completeExceptionally(e); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryFilterSupport.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryFilterSupport.java index 6c0f1f65c69..4a9b33a9afd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryFilterSupport.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryFilterSupport.java @@ -20,22 +20,15 @@ package org.apache.pulsar.broker.service; import java.util.Collections; import java.util.List; -import java.util.Map; import org.apache.bookkeeper.mledger.Entry; import org.apache.commons.collections4.CollectionUtils; -import org.apache.commons.collections4.MapUtils; import org.apache.pulsar.broker.service.plugin.EntryFilter; -import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader; import org.apache.pulsar.broker.service.plugin.FilterContext; import org.apache.pulsar.common.api.proto.MessageMetadata; public class EntryFilterSupport { - /** - * Entry filters in Broker. - * Not set to final, for the convenience of testing mock. - */ - protected final List<EntryFilterWithClassLoader> entryFilters; + protected final List<EntryFilter> entryFilters; protected final boolean hasFilter; protected final FilterContext filterContext; protected final Subscription subscription; @@ -43,19 +36,18 @@ public class EntryFilterSupport { public EntryFilterSupport(Subscription subscription) { this.subscription = subscription; if (subscription != null && subscription.getTopic() != null) { - if (MapUtils.isNotEmpty(subscription.getTopic() - .getBrokerService().getEntryFilters()) - && !subscription.getTopic().getBrokerService().pulsar() - .getConfiguration().isAllowOverrideEntryFilters()) { - this.entryFilters = subscription.getTopic().getBrokerService().getEntryFilters().values().stream() - .toList(); + final BrokerService brokerService = subscription.getTopic().getBrokerService(); + final boolean allowOverrideEntryFilters = brokerService + .pulsar().getConfiguration().isAllowOverrideEntryFilters(); + if (!allowOverrideEntryFilters) { + this.entryFilters = brokerService.getEntryFilterProvider().getBrokerEntryFilters(); } else { - Map<String, EntryFilterWithClassLoader> entryFiltersMap = + List<EntryFilter> topicEntryFilters = subscription.getTopic().getEntryFilters(); - if (entryFiltersMap != null) { - this.entryFilters = subscription.getTopic().getEntryFilters().values().stream().toList(); + if (topicEntryFilters != null && !topicEntryFilters.isEmpty()) { + this.entryFilters = topicEntryFilters; } else { - this.entryFilters = Collections.emptyList(); + this.entryFilters = brokerService.getEntryFilterProvider().getBrokerEntryFilters(); } } this.filterContext = new FilterContext(); @@ -86,7 +78,7 @@ public class EntryFilterSupport { private static EntryFilter.FilterResult getFilterResult(FilterContext filterContext, Entry entry, - List<EntryFilterWithClassLoader> entryFilters) { + List<EntryFilter> entryFilters) { for (EntryFilter entryFilter : entryFilters) { EntryFilter.FilterResult filterResult = entryFilter.filterEntry(entry, filterContext); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java index 3949df92cec..e6a29368dbb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.service; import io.netty.buffer.ByteBuf; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -26,7 +27,7 @@ import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.mledger.Position; import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter; import org.apache.pulsar.broker.service.persistent.SubscribeRateLimiter; -import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader; +import org.apache.pulsar.broker.service.plugin.EntryFilter; import org.apache.pulsar.broker.stats.ClusterReplicationMetrics; import org.apache.pulsar.broker.stats.NamespaceStats; import org.apache.pulsar.client.api.MessageId; @@ -251,7 +252,7 @@ public interface Topic { EntryFilters getEntryFiltersPolicy(); - Map<String, EntryFilterWithClassLoader> getEntryFilters(); + List<EntryFilter> getEntryFilters(); BacklogQuota getBacklogQuota(BacklogQuotaType backlogQuotaType); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index cf46103cc35..3b046570d73 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -510,11 +510,11 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol } if (entryFilters != null) { - entryFilters.forEach((name, filter) -> { + entryFilters.getRight().forEach(filter -> { try { filter.close(); - } catch (Exception e) { - log.warn("Error shutting down entry filter {}", name, e); + } catch (Throwable e) { + log.warn("Error shutting down entry filter {}", filter, e); } }); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index d009d3778f2..20744102a31 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1357,11 +1357,11 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal //close entry filters if (entryFilters != null) { - entryFilters.forEach((name, filter) -> { + entryFilters.getRight().forEach((filter) -> { try { filter.close(); - } catch (Exception e) { - log.warn("Error shutting down entry filter {}", name, e); + } catch (Throwable e) { + log.warn("Error shutting down entry filter {}", filter, e); } }); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterDefinition.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterDefinition.java index 36f39efa384..fd93a6be51e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterDefinition.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterDefinition.java @@ -18,11 +18,13 @@ */ package org.apache.pulsar.broker.service.plugin; +import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; @Data @NoArgsConstructor +@AllArgsConstructor public class EntryFilterDefinition { /** diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterDefinitions.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterDefinitions.java deleted file mode 100644 index 384e7e2fcf4..00000000000 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterDefinitions.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.broker.service.plugin; - -import java.util.Map; -import java.util.TreeMap; -import lombok.Data; - -@Data -public class EntryFilterDefinitions { - private final Map<String, EntryFilterMetaData> filters = new TreeMap<>(); -} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterProvider.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterProvider.java index 333f7f33339..db643f43fa8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterProvider.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterProvider.java @@ -28,6 +28,12 @@ import java.nio.file.Files; import java.nio.file.NoSuchFileException; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.ServiceConfiguration; @@ -37,74 +43,85 @@ import org.apache.pulsar.common.policies.data.EntryFilters; import org.apache.pulsar.common.util.ObjectMapperFactory; @Slf4j -public class EntryFilterProvider { +public class EntryFilterProvider implements AutoCloseable { @VisibleForTesting static final String ENTRY_FILTER_DEFINITION_FILE = "entry_filter"; - /** - * create entry filter instance. - */ - public static ImmutableMap<String, EntryFilterWithClassLoader> createEntryFilters(ServiceConfiguration conf, - EntryFilters entryFilters) + private final ServiceConfiguration serviceConfiguration; + @VisibleForTesting + protected Map<String, EntryFilterMetaData> definitions; + @VisibleForTesting + protected Map<String, NarClassLoader> cachedClassLoaders; + @VisibleForTesting + protected List<EntryFilter> brokerEntryFilters; + + public EntryFilterProvider(ServiceConfiguration conf) throws IOException { + this.serviceConfiguration = conf; + initialize(); + initializeBrokerEntryFilters(); + } + + protected void initializeBrokerEntryFilters() throws IOException { + if (!serviceConfiguration.getEntryFilterNames().isEmpty()) { + brokerEntryFilters = loadEntryFilters(serviceConfiguration.getEntryFilterNames()); + } else { + brokerEntryFilters = Collections.emptyList(); + } + } + + public List<EntryFilter> loadEntryFiltersForPolicy(EntryFilters policy) throws IOException { - EntryFilterDefinitions definitions = searchForEntryFilters(conf.getEntryFiltersDirectory(), - conf.getNarExtractionDirectory()); - ImmutableMap.Builder<String, EntryFilterWithClassLoader> builder = ImmutableMap.builder(); - for (String filterName : entryFilters.getEntryFilterNames().split(",")) { - EntryFilterMetaData metaData = definitions.getFilters().get(filterName); - if (null == metaData) { - throw new RuntimeException("No entry filter is found for name `" + filterName - + "`. Available entry filters are : " + definitions.getFilters()); - } - EntryFilterWithClassLoader filter; - filter = load(metaData, conf.getNarExtractionDirectory()); - if (filter != null) { - builder.put(filterName, filter); - } - log.info("Successfully loaded entry filter for name `{}` from topic policy", filterName); + final String names = policy.getEntryFilterNames(); + if (StringUtils.isBlank(names)) { + return Collections.emptyList(); } - return builder.build(); + final List<String> entryFilterList = Arrays.stream(names.split(",")) + .filter(n -> StringUtils.isNotBlank(n)) + .toList(); + return loadEntryFilters(entryFilterList); } - public static ImmutableMap<String, EntryFilterWithClassLoader> createEntryFilters( - ServiceConfiguration conf) throws IOException { - EntryFilterDefinitions definitions = searchForEntryFilters(conf.getEntryFiltersDirectory(), - conf.getNarExtractionDirectory()); - ImmutableMap.Builder<String, EntryFilterWithClassLoader> builder = ImmutableMap.builder(); - for (String filterName : conf.getEntryFilterNames()) { - EntryFilterMetaData metaData = definitions.getFilters().get(filterName); + private List<EntryFilter> loadEntryFilters(Collection<String> entryFilterNames) + throws IOException { + ImmutableMap.Builder<String, EntryFilter> builder = ImmutableMap.builder(); + for (String filterName : entryFilterNames) { + EntryFilterMetaData metaData = definitions.get(filterName); if (null == metaData) { throw new RuntimeException("No entry filter is found for name `" + filterName - + "`. Available entry filters are : " + definitions.getFilters()); + + "`. Available entry filters are : " + definitions.keySet()); } - EntryFilterWithClassLoader filter; - filter = load(metaData, conf.getNarExtractionDirectory()); - if (filter != null) { - builder.put(filterName, filter); - } - log.info("Successfully loaded entry filter for name `{}`", filterName); + final EntryFilter entryFilter = load(metaData); + builder.put(filterName, entryFilter); + log.info("Successfully loaded entry filter `{}`", filterName); } - return builder.build(); + return builder.build().values().asList(); } - private static EntryFilterDefinitions searchForEntryFilters(String entryFiltersDirectory, - String narExtractionDirectory) - throws IOException { + public List<EntryFilter> getBrokerEntryFilters() { + return brokerEntryFilters; + } + + private void initialize() throws IOException { + final String entryFiltersDirectory = serviceConfiguration.getEntryFiltersDirectory(); Path path = Paths.get(entryFiltersDirectory).toAbsolutePath(); log.info("Searching for entry filters in {}", path); - EntryFilterDefinitions entryFilterDefinitions = new EntryFilterDefinitions(); + if (!path.toFile().exists()) { log.info("Pulsar entry filters directory not found"); - return entryFilterDefinitions; + definitions = Collections.emptyMap(); + cachedClassLoaders = Collections.emptyMap(); + return; } + Map<String, EntryFilterMetaData> entryFilterDefinitions = new HashMap<>(); + cachedClassLoaders = new HashMap<>(); try (DirectoryStream<Path> stream = Files.newDirectoryStream(path, "*.nar")) { for (Path archive : stream) { try { - EntryFilterDefinition def = - getEntryFilterDefinition(archive.toString(), narExtractionDirectory); + final NarClassLoader narClassLoader = loadNarClassLoader(archive); + EntryFilterDefinition def = getEntryFilterDefinition(narClassLoader); log.info("Found entry filter from {} : {}", archive, def); checkArgument(StringUtils.isNotBlank(def.getName())); @@ -114,7 +131,7 @@ public class EntryFilterProvider { metadata.setDefinition(def); metadata.setArchivePath(archive); - entryFilterDefinitions.getFilters().put(def.getName(), metadata); + entryFilterDefinitions.put(def.getName(), metadata); } catch (Throwable t) { log.warn("Failed to load entry filters from {}." + " It is OK however if you want to use this entry filters," @@ -123,19 +140,8 @@ public class EntryFilterProvider { } } } - - return entryFilterDefinitions; - } - - private static EntryFilterDefinition getEntryFilterDefinition(String narPath, - String narExtractionDirectory) - throws IOException { - try (NarClassLoader ncl = NarClassLoaderBuilder.builder() - .narFile(new File(narPath)) - .extractionDirectory(narExtractionDirectory) - .build()) { - return getEntryFilterDefinition(ncl); - } + definitions = Collections.unmodifiableMap(entryFilterDefinitions); + cachedClassLoaders = Collections.unmodifiableMap(cachedClassLoaders); } @VisibleForTesting @@ -153,22 +159,19 @@ public class EntryFilterProvider { ); } - private static EntryFilterWithClassLoader load(EntryFilterMetaData metadata, - String narExtractionDirectory) + protected EntryFilter load(EntryFilterMetaData metadata) throws IOException { - final File narFile = metadata.getArchivePath().toAbsolutePath().toFile(); - NarClassLoader ncl = NarClassLoaderBuilder.builder() - .narFile(narFile) - .parentClassLoader(EntryFilter.class.getClassLoader()) - .extractionDirectory(narExtractionDirectory) - .build(); - EntryFilterDefinition def = getEntryFilterDefinition(ncl); + final EntryFilterDefinition def = metadata.getDefinition(); if (StringUtils.isBlank(def.getEntryFilterClass())) { - throw new IOException("Entry filters `" + def.getName() + "` does NOT provide a entry" + throw new RuntimeException("Entry filter `" + def.getName() + "` does NOT provide a entry" + " filters implementation"); } - try { + final NarClassLoader ncl = getNarClassLoader(metadata.getArchivePath()); + if (ncl == null) { + throw new RuntimeException("Entry filter `" + def.getName() + "` cannot be loaded, " + + "see the broker logs for further details"); + } Class entryFilterClass = ncl.loadClass(def.getEntryFilterClass()); Object filter = entryFilterClass.getDeclaredConstructor().newInstance(); if (!(filter instanceof EntryFilter)) { @@ -177,12 +180,55 @@ public class EntryFilterProvider { } EntryFilter pi = (EntryFilter) filter; return new EntryFilterWithClassLoader(pi, ncl); - } catch (Exception e) { + } catch (Throwable e) { if (e instanceof IOException) { throw (IOException) e; } - log.error("Failed to load class {}", def.getEntryFilterClass(), e); + log.error("Failed to load class {}", metadata.getDefinition().getEntryFilterClass(), e); throw new IOException(e); } } + + private NarClassLoader getNarClassLoader(Path archivePath) { + return cachedClassLoaders.get(classLoaderKey(archivePath)); + } + + private NarClassLoader loadNarClassLoader(Path archivePath) { + final String absolutePath = classLoaderKey(archivePath); + return cachedClassLoaders + .computeIfAbsent(absolutePath, narFilePath -> { + try { + final File narFile = archivePath.toAbsolutePath().toFile(); + return NarClassLoaderBuilder.builder() + .narFile(narFile) + .parentClassLoader(EntryFilter.class.getClassLoader()) + .extractionDirectory(serviceConfiguration.getNarExtractionDirectory()) + .build(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + } + + private static String classLoaderKey(Path archivePath) { + return archivePath.toString(); + } + + @Override + public void close() throws Exception { + brokerEntryFilters.forEach((filter) -> { + try { + filter.close(); + } catch (Throwable e) { + log.warn("Error shutting down entry filter {}", filter, e); + } + }); + cachedClassLoaders.forEach((name, ncl) -> { + try { + ncl.close(); + } catch (Throwable e) { + log.warn("Error closing entry filter class loader {}", name, e); + } + }); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterWithClassLoader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterWithClassLoader.java index 29a5dea119b..c5c57210877 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterWithClassLoader.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterWithClassLoader.java @@ -18,12 +18,15 @@ */ package org.apache.pulsar.broker.service.plugin; +import com.google.common.annotations.VisibleForTesting; import java.io.IOException; +import lombok.ToString; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.Entry; import org.apache.pulsar.common.nar.NarClassLoader; @Slf4j +@ToString public class EntryFilterWithClassLoader implements EntryFilter { private final EntryFilter entryFilter; private final NarClassLoader classLoader; @@ -38,6 +41,11 @@ public class EntryFilterWithClassLoader implements EntryFilter { return entryFilter.filterEntry(entry, context); } + @VisibleForTesting + public EntryFilter getEntryFilter() { + return entryFilter; + } + @Override public void close() { entryFilter.close(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java index 95b91fde1e1..c8ea5818d03 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java @@ -56,6 +56,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; @@ -66,6 +67,13 @@ import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.broker.service.plugin.EntryFilter; +import org.apache.pulsar.broker.service.plugin.EntryFilter2Test; +import org.apache.pulsar.broker.service.plugin.EntryFilterDefinition; +import org.apache.pulsar.broker.service.plugin.EntryFilterProvider; +import org.apache.pulsar.broker.service.plugin.EntryFilterTest; +import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader; +import org.apache.pulsar.broker.testcontext.MockEntryFilterProvider; import org.apache.pulsar.broker.testcontext.PulsarTestContext; import org.apache.pulsar.client.admin.ListNamespaceTopicsOptions; import org.apache.pulsar.client.admin.Mode; @@ -2236,35 +2244,244 @@ public class AdminApi2Test extends MockedPulsarServiceBaseTest { @Test(timeOut = 30000) public void testSetNamespaceEntryFilters() throws Exception { - EntryFilters entryFilters = new EntryFilters( - "org.apache.pulsar.broker.service.plugin.EntryFilterTest"); + final MockEntryFilterProvider testEntryFilterProvider = + new MockEntryFilterProvider(conf); + + testEntryFilterProvider + .setMockEntryFilters(new EntryFilterDefinition( + "test", + null, + EntryFilterTest.class.getName() + )); + final EntryFilterProvider oldEntryFilterProvider = pulsar.getBrokerService().getEntryFilterProvider(); + FieldUtils.writeField(pulsar.getBrokerService(), + "entryFilterProvider", testEntryFilterProvider, true); - final String myNamespace = "prop-xyz/ns" + UUID.randomUUID(); - admin.namespaces().createNamespace(myNamespace, Sets.newHashSet("test")); - - assertNull(admin.namespaces().getNamespaceEntryFilters(myNamespace)); - - admin.namespaces().setNamespaceEntryFilters(myNamespace, entryFilters); - assertEquals(admin.namespaces().getNamespaceEntryFilters(myNamespace), entryFilters); - admin.namespaces().removeNamespaceEntryFilters(myNamespace); - assertNull(admin.namespaces().getNamespaceEntryFilters(myNamespace)); + try { + EntryFilters entryFilters = new EntryFilters("test"); + + final String myNamespace = "prop-xyz/ns" + UUID.randomUUID(); + admin.namespaces().createNamespace(myNamespace, Sets.newHashSet("test")); + final String topicName = myNamespace + "/topic"; + admin.topics().createNonPartitionedTopic(topicName); + + assertNull(admin.namespaces().getNamespaceEntryFilters(myNamespace)); + assertEquals(pulsar + .getBrokerService() + .getTopic(topicName, false) + .get() + .get() + .getEntryFilters() + .size(), 0); + + admin.namespaces().setNamespaceEntryFilters(myNamespace, entryFilters); + assertEquals(admin.namespaces().getNamespaceEntryFilters(myNamespace), entryFilters); + Awaitility.await().untilAsserted(() -> { + assertEquals(pulsar + .getBrokerService() + .getTopic(topicName, false) + .get() + .get() + .getEntryFiltersPolicy() + .getEntryFilterNames(), "test"); + }); + + assertEquals(pulsar + .getBrokerService() + .getTopic(topicName, false) + .get() + .get() + .getEntryFilters() + .size(), 1); + admin.namespaces().removeNamespaceEntryFilters(myNamespace); + assertNull(admin.namespaces().getNamespaceEntryFilters(myNamespace)); + Awaitility.await().untilAsserted(() -> { + assertEquals(pulsar + .getBrokerService() + .getTopic(topicName, false) + .get() + .get() + .getEntryFiltersPolicy() + .getEntryFilterNames(), ""); + }); + + assertEquals(pulsar + .getBrokerService() + .getTopic(topicName, false) + .get() + .get() + .getEntryFilters() + .size(), 0); + } finally { + FieldUtils.writeField(pulsar.getBrokerService(), + "entryFilterProvider", oldEntryFilterProvider, true); + } } @Test(dataProvider = "topicType") public void testSetTopicLevelEntryFilters(String topicType) throws Exception { - EntryFilters entryFilters = new EntryFilters("org.apache.pulsar.broker.service.plugin.EntryFilterTest"); - final String topic = topicType + "://prop-xyz/ns1/test-schema-validation-enforced"; - admin.topics().createPartitionedTopic(topic, 1); - @Cleanup - Producer<byte[]> producer1 = pulsarClient.newProducer() - .topic(topic + TopicName.PARTITIONED_TOPIC_SUFFIX + 0) - .create(); - assertNull(admin.topicPolicies().getEntryFiltersPerTopic(topic, false)); - admin.topicPolicies().setEntryFiltersPerTopic(topic, entryFilters); - Awaitility.await().untilAsserted(() -> assertEquals(admin.topicPolicies().getEntryFiltersPerTopic(topic, - false), entryFilters)); - admin.topicPolicies().removeEntryFiltersPerTopic(topic); - assertNull(admin.topicPolicies().getEntryFiltersPerTopic(topic, false)); + final MockEntryFilterProvider testEntryFilterProvider = + new MockEntryFilterProvider(conf); + + testEntryFilterProvider + .setMockEntryFilters(new EntryFilterDefinition( + "test", + null, + EntryFilterTest.class.getName() + )); + final EntryFilterProvider oldEntryFilterProvider = pulsar.getBrokerService().getEntryFilterProvider(); + FieldUtils.writeField(pulsar.getBrokerService(), + "entryFilterProvider", testEntryFilterProvider, true); + try { + EntryFilters entryFilters = new EntryFilters("test"); + final String topic = topicType + "://prop-xyz/ns1/test-schema-validation-enforced"; + admin.topics().createPartitionedTopic(topic, 1); + final String fullTopicName = topic + TopicName.PARTITIONED_TOPIC_SUFFIX + 0; + @Cleanup + Producer<byte[]> producer1 = pulsarClient.newProducer() + .topic(fullTopicName) + .create(); + assertNull(admin.topicPolicies().getEntryFiltersPerTopic(topic, false)); + assertEquals(pulsar + .getBrokerService() + .getTopic(fullTopicName, false) + .get() + .get() + .getEntryFilters() + .size(), 0); + admin.topicPolicies().setEntryFiltersPerTopic(topic, entryFilters); + Awaitility.await().untilAsserted(() -> assertEquals(admin.topicPolicies().getEntryFiltersPerTopic(topic, + false), entryFilters)); + Awaitility.await().untilAsserted(() -> { + assertEquals(pulsar + .getBrokerService() + .getTopic(fullTopicName, false) + .get() + .get() + .getEntryFiltersPolicy() + .getEntryFilterNames(), "test"); + }); + assertEquals(pulsar + .getBrokerService() + .getTopic(fullTopicName, false) + .get() + .get() + .getEntryFilters() + .size(), 1); + admin.topicPolicies().removeEntryFiltersPerTopic(topic); + assertNull(admin.topicPolicies().getEntryFiltersPerTopic(topic, false)); + Awaitility.await().untilAsserted(() -> { + assertEquals(pulsar + .getBrokerService() + .getTopic(fullTopicName, false) + .get() + .get() + .getEntryFiltersPolicy() + .getEntryFilterNames(), ""); + }); + assertEquals(pulsar + .getBrokerService() + .getTopic(fullTopicName, false) + .get() + .get() + .getEntryFilters() + .size(), 0); + } finally { + FieldUtils.writeField(pulsar.getBrokerService(), + "entryFilterProvider", oldEntryFilterProvider, true); + } + } + + @Test(timeOut = 30000) + public void testSetEntryFiltersHierarchy() throws Exception { + final MockEntryFilterProvider testEntryFilterProvider = + new MockEntryFilterProvider(conf); + conf.setEntryFilterNames(List.of("test", "test1")); + + testEntryFilterProvider.setMockEntryFilters(new EntryFilterDefinition( + "test", + null, + EntryFilterTest.class.getName() + ), new EntryFilterDefinition( + "test1", + null, + EntryFilter2Test.class.getName() + )); + final EntryFilterProvider oldEntryFilterProvider = pulsar.getBrokerService().getEntryFilterProvider(); + FieldUtils.writeField(pulsar.getBrokerService(), + "entryFilterProvider", testEntryFilterProvider, true); + try { + + final String topic = "persistent://prop-xyz/ns1/test-schema-validation-enforced"; + admin.topics().createPartitionedTopic(topic, 1); + final String fullTopicName = topic + TopicName.PARTITIONED_TOPIC_SUFFIX + 0; + @Cleanup + Producer<byte[]> producer1 = pulsarClient.newProducer() + .topic(fullTopicName) + .create(); + assertNull(admin.topicPolicies().getEntryFiltersPerTopic(topic, false)); + assertEquals(pulsar + .getBrokerService() + .getTopic(fullTopicName, false) + .get() + .get() + .getEntryFilters() + .size(), 2); + + EntryFilters nsEntryFilters = new EntryFilters("test"); + admin.namespaces().setNamespaceEntryFilters("prop-xyz/ns1", nsEntryFilters); + assertEquals(admin.namespaces().getNamespaceEntryFilters("prop-xyz/ns1"), nsEntryFilters); + Awaitility.await().untilAsserted(() -> { + assertEquals(pulsar + .getBrokerService() + .getTopic(fullTopicName, false) + .get() + .get() + .getEntryFiltersPolicy() + .getEntryFilterNames(), "test"); + }); + + Awaitility.await().untilAsserted(() -> { + final List<EntryFilter> entryFilters = pulsar + .getBrokerService() + .getTopic(fullTopicName, false) + .get() + .get() + .getEntryFilters(); + assertEquals(entryFilters.size(), 1); + assertEquals(((EntryFilterWithClassLoader)entryFilters.get(0)) + .getEntryFilter().getClass(), EntryFilterTest.class); + + }); + + + EntryFilters topicEntryFilters = new EntryFilters("test1"); + admin.topicPolicies().setEntryFiltersPerTopic(topic, topicEntryFilters); + Awaitility.await().untilAsserted(() -> assertEquals(admin.topicPolicies().getEntryFiltersPerTopic(topic, + false), topicEntryFilters)); + Awaitility.await().untilAsserted(() -> { + assertEquals(pulsar + .getBrokerService() + .getTopic(fullTopicName, false) + .get() + .get() + .getEntryFiltersPolicy() + .getEntryFilterNames(), "test1"); + }); + final List<EntryFilter> entryFilters = pulsar + .getBrokerService() + .getTopic(fullTopicName, false) + .get() + .get() + .getEntryFilters(); + assertEquals(entryFilters.size(), 1); + assertEquals(((EntryFilterWithClassLoader) entryFilters.get(0)) + .getEntryFilter().getClass(), EntryFilter2Test.class); + + } finally { + FieldUtils.writeField(pulsar.getBrokerService(), + "entryFilterProvider", oldEntryFilterProvider, true); + } } @Test(timeOut = 30000) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java index 554ef1c3f96..cc2ec3444d5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java @@ -29,19 +29,19 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import java.util.ArrayList; import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.impl.EntryImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.service.plugin.EntryFilter; -import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader; +import org.apache.pulsar.broker.service.plugin.EntryFilterProvider; import org.apache.pulsar.broker.service.plugin.FilterContext; import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.common.api.proto.CommandSubscribe; @@ -87,13 +87,19 @@ public class AbstractBaseDispatcherTest { Topic mockTopic = mock(Topic.class); when(this.subscriptionMock.getTopic()).thenReturn(mockTopic); + final EntryFilterProvider entryFilterProvider = mock(EntryFilterProvider.class); + final ServiceConfiguration serviceConfiguration = mock(ServiceConfiguration.class); + when(serviceConfiguration.isAllowOverrideEntryFilters()).thenReturn(true); + final PulsarService pulsar = mock(PulsarService.class); + when(pulsar.getConfiguration()).thenReturn(serviceConfiguration); BrokerService mockBrokerService = mock(BrokerService.class); + when(mockBrokerService.pulsar()).thenReturn(pulsar); + when(mockBrokerService.getEntryFilterProvider()).thenReturn(entryFilterProvider); when(mockTopic.getBrokerService()).thenReturn(mockBrokerService); - EntryFilterWithClassLoader mockFilter = mock(EntryFilterWithClassLoader.class); + EntryFilter mockFilter = mock(EntryFilter.class); when(mockFilter.filterEntry(any(Entry.class), any(FilterContext.class))).thenReturn( EntryFilter.FilterResult.REJECT); - Map<String, EntryFilterWithClassLoader> entryFilters = Map.of("key", mockFilter); - when(mockTopic.getEntryFilters()).thenReturn(entryFilters); + when(mockTopic.getEntryFilters()).thenReturn(List.of(mockFilter)); DispatchRateLimiter subscriptionDispatchRateLimiter = mock(DispatchRateLimiter.class); this.helper = new AbstractBaseDispatcherTestHelper(this.subscriptionMock, this.svcConfig, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java index d5131e5df7b..4b9d91fbde2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java @@ -23,12 +23,14 @@ import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructor import static org.apache.pulsar.client.api.SubscriptionInitialPosition.Earliest; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.testng.Assert.assertTrue; import static org.testng.AssertJUnit.assertEquals; import static org.testng.AssertJUnit.assertNotNull; + import java.lang.reflect.Field; import java.util.HashMap; import java.util.List; @@ -37,9 +39,14 @@ import java.util.Optional; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; + +import lombok.Cleanup; +import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.commons.lang.reflect.FieldUtils; +import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.service.AbstractTopic; import org.apache.pulsar.broker.service.BrokerTestBase; import org.apache.pulsar.broker.service.Dispatcher; @@ -59,6 +66,7 @@ import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @Test(groups = "broker") @@ -92,22 +100,15 @@ public class FilterEntryTest extends BrokerTestBase { .getTopicReference(topic).get(); // set topic level entry filters - EntryFilterWithClassLoader mockFilter = mock(EntryFilterWithClassLoader.class); + EntryFilter mockFilter = mock(EntryFilter.class); when(mockFilter.filterEntry(any(Entry.class), any(FilterContext.class))).thenReturn( EntryFilter.FilterResult.REJECT); - Map<String, EntryFilterWithClassLoader> entryFilters = Map.of("key", mockFilter); - - Field field = topicRef.getClass().getSuperclass().getDeclaredField("entryFilters"); - field.setAccessible(true); - field.set(topicRef, entryFilters); + setMockFilterToTopic(topicRef, List.of(mockFilter)); - EntryFilterWithClassLoader mockFilter1 = mock(EntryFilterWithClassLoader.class); + EntryFilter mockFilter1 = mock(EntryFilter.class); when(mockFilter1.filterEntry(any(Entry.class), any(FilterContext.class))).thenReturn( EntryFilter.FilterResult.ACCEPT); - Map<String, EntryFilterWithClassLoader> entryFilters1 = Map.of("key2", mockFilter1); - Field field2 = pulsar.getBrokerService().getClass().getDeclaredField("entryFilters"); - field2.setAccessible(true); - field2.set(pulsar.getBrokerService(), entryFilters1); + setMockBrokerFilter(List.of(mockFilter1)); Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic) .subscriptionInitialPosition(Earliest) @@ -148,11 +149,22 @@ public class FilterEntryTest extends BrokerTestBase { consumer.close(); } + @SneakyThrows + private void setMockFilterToTopic(PersistentTopic topicRef, List<EntryFilter> mockFilter) { + FieldUtils.writeField(topicRef, "entryFilters", Pair.of(null, mockFilter), true); + } + + @SneakyThrows + private void setMockBrokerFilter(List<EntryFilter> mockFilter) { + FieldUtils.writeField(pulsar.getBrokerService().getEntryFilterProvider(), + "brokerEntryFilters", mockFilter, true); + } + @Test public void testFilter() throws Exception { Map<String, String> map = new HashMap<>(); - map.put("1","1"); - map.put("2","2"); + map.put("1", "1"); + map.put("2", "2"); String topic = "persistent://prop/ns-abc/topic" + UUID.randomUUID(); String subName = "sub"; Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic) @@ -266,9 +278,7 @@ public class FilterEntryTest extends BrokerTestBase { PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService() .getTopicReference(topic).get(); - Field field1 = topicRef.getClass().getSuperclass().getDeclaredField("entryFilters"); - field1.setAccessible(true); - field1.set(topicRef, Map.of("1", loader1, "2", loader2)); + setMockFilterToTopic(topicRef, List.of(loader1, loader2)); cleanup(); verify(loader1, times(1)).close(); @@ -471,7 +481,7 @@ public class FilterEntryTest extends BrokerTestBase { int numEntriesAccepted, int numMessagesAccepted, int numEntriesRejected, int numMessagesRejected, int numEntriesRescheduled, int numMessagesRescheduled - ) throws Exception { + ) throws Exception { AnalyzeSubscriptionBacklogResult a1 = admin.topics().analyzeSubscriptionBacklog(topic, subscription, Optional.empty()); @@ -485,4 +495,93 @@ public class FilterEntryTest extends BrokerTestBase { Assert.assertEquals(numMessagesRejected, a1.getFilterRejectedMessages()); Assert.assertEquals(numMessagesRescheduled, a1.getFilterRescheduledMessages()); } + + + @DataProvider(name = "overrideBrokerEntryFilters") + public static Object[][] overrideBrokerEntryFilters() { + return new Object[][]{ {true}, {false} }; + } + + + @Test(dataProvider = "overrideBrokerEntryFilters") + public void testExecuteInOrder(boolean overrideBrokerEntryFilters) throws Exception { + conf.setAllowOverrideEntryFilters(true); + String topic = "persistent://prop/ns-abc/topic" + UUID.randomUUID(); + String subName = "sub"; + Producer<String> producer = pulsarClient.newProducer(Schema.STRING) + .enableBatching(false).topic(topic).create(); + for (int i = 0; i < 10; i++) { + producer.send("test"); + } + + EntryFilter mockFilterReject = mock(EntryFilter.class); + when(mockFilterReject.filterEntry(any(Entry.class), any(FilterContext.class))).thenReturn( + EntryFilter.FilterResult.REJECT); + EntryFilter mockFilterAccept = mock(EntryFilter.class); + when(mockFilterAccept.filterEntry(any(Entry.class), any(FilterContext.class))).thenReturn( + EntryFilter.FilterResult.ACCEPT); + if (overrideBrokerEntryFilters) { + setMockFilterToTopic((PersistentTopic) pulsar.getBrokerService() + .getTopicReference(topic).get(), List.of(mockFilterReject, mockFilterAccept)); + } else { + setMockFilterToTopic((PersistentTopic) pulsar.getBrokerService() + .getTopicReference(topic).get(), List.of()); + setMockBrokerFilter(List.of(mockFilterReject, mockFilterAccept)); + } + + + Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic) + .subscriptionInitialPosition(Earliest) + .subscriptionName(subName).subscribe(); + + int counter = 0; + while (true) { + Message<String> message = consumer.receive(1, TimeUnit.SECONDS); + if (message != null) { + counter++; + consumer.acknowledge(message); + } else { + break; + } + } + // All normal messages can be received + assertEquals(0, counter); + consumer.close(); + verify(mockFilterReject, times(10)) + .filterEntry(any(Entry.class), any(FilterContext.class)); + verify(mockFilterAccept, never()) + .filterEntry(any(Entry.class), any(FilterContext.class)); + + if (overrideBrokerEntryFilters) { + setMockFilterToTopic((PersistentTopic) pulsar.getBrokerService() + .getTopicReference(topic).get(), List.of(mockFilterAccept, mockFilterReject)); + } else { + setMockFilterToTopic((PersistentTopic) pulsar.getBrokerService() + .getTopicReference(topic).get(), List.of()); + setMockBrokerFilter(List.of(mockFilterAccept, mockFilterReject)); + } + + @Cleanup + Consumer<String> consumer2 = pulsarClient.newConsumer(Schema.STRING).topic(topic) + .subscriptionInitialPosition(Earliest) + .subscriptionName(subName + "-2").subscribe(); + + counter = 0; + while (true) { + Message<String> message = consumer2.receive(1, TimeUnit.SECONDS); + if (message != null) { + counter++; + consumer2.acknowledge(message); + } else { + break; + } + } + assertEquals(0, counter); + verify(mockFilterReject, times(20)) + .filterEntry(any(Entry.class), any(FilterContext.class)); + verify(mockFilterAccept, times(10)) + .filterEntry(any(Entry.class), any(FilterContext.class)); + + + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java index 13f1b3cc8e2..bbeee9f5a49 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java @@ -42,6 +42,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.service.Subscription; import org.apache.pulsar.broker.service.Topic; @@ -374,6 +375,7 @@ public class ConsumerStatsTest extends ProducerConsumerBase { @Test public void testAvgMessagesPerEntry() throws Exception { + conf.setAllowOverrideEntryFilters(true); final String topic = "persistent://public/default/testFilterState"; String subName = "sub"; @@ -406,7 +408,7 @@ public class ConsumerStatsTest extends ProducerConsumerBase { EntryFilterWithClassLoader loader = spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter, narClassLoader); - Map<String, EntryFilterWithClassLoader> entryFilters = Map.of("filter", loader); + Pair<String, List<EntryFilter>> entryFilters = Pair.of("filter", List.of(loader)); PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService() .getTopicReference(topic).get(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/MockEntryFilterProvider.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/MockEntryFilterProvider.java new file mode 100644 index 00000000000..425f4ee41a7 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/MockEntryFilterProvider.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.testcontext; + +import lombok.SneakyThrows; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.service.plugin.EntryFilterDefinition; +import org.apache.pulsar.broker.service.plugin.EntryFilterMetaData; +import org.apache.pulsar.broker.service.plugin.EntryFilterProvider; +import org.apache.pulsar.common.nar.NarClassLoader; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.HashMap; + +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class MockEntryFilterProvider extends EntryFilterProvider { + + public MockEntryFilterProvider(ServiceConfiguration config) throws IOException { + super(config); + } + + @SneakyThrows + public void setMockEntryFilters(EntryFilterDefinition... defs) { + definitions = new HashMap<>(); + cachedClassLoaders = new HashMap<>(); + brokerEntryFilters = new ArrayList<>(); + + for (EntryFilterDefinition def : defs) { + final String name = def.getName(); + final EntryFilterMetaData meta = new EntryFilterMetaData(); + meta.setDefinition(def); + meta.setArchivePath(Path.of(name)); + definitions.put(name, meta); + final NarClassLoader ncl = mock(NarClassLoader.class); + + when(ncl.loadClass(anyString())).thenAnswer(a -> { + final Object argument = a.getArguments()[0]; + return Thread.currentThread().getContextClassLoader().loadClass(argument.toString()); + }); + cachedClassLoaders.put(Path.of(name).toString(), ncl); + } + initializeBrokerEntryFilters(); + } + +} diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/EntryFilters.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/EntryFilters.java index 5192e9bad3a..5ebe793d14b 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/EntryFilters.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/EntryFilters.java @@ -28,7 +28,7 @@ import lombok.NoArgsConstructor; public class EntryFilters { /** - * The class name for the entry filter. + * Entry filters class names separated by a comma. */ private String entryFilterNames;