codelipenghui commented on a change in pull request #12970:
URL: https://github.com/apache/pulsar/pull/12970#discussion_r757309992



##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
##########
@@ -189,6 +220,32 @@ public int 
filterEntriesForConsumer(Optional<EntryWrapper[]> entryWrapper, int e
         return totalEntries;
     }
 
+    private EntryFilter.FilterResult getFilterResult(FilterContext 
filterContext, Entry entry) {
+        EntryFilter.FilterResult result = EntryFilter.FilterResult.REJECT;
+        for (EntryFilter entryFilter : entryFilters) {
+            if (entryFilter.filterEntry(entry, filterContext) == 
EntryFilter.FilterResult.ACCEPT) {
+                result = EntryFilter.FilterResult.ACCEPT;

Review comment:
       We should return ACCEPT only if all the filters accepted the entry? it 
more like the FilterChain of jetty 
http://www.servlets.com/javadoc/javax/servlet/FilterChain.html

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterProvider.java
##########
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.plugin;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import com.google.common.collect.ImmutableMap;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collections;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.intercept.BrokerInterceptor;
+import org.apache.pulsar.common.nar.NarClassLoader;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+
+@Slf4j
+public class EntryFilterProvider {
+
+    static final String ENTRY_FILTER_DEFINITION_FILE = "entry_filter.yml";
+
+    /**
+     * create entry filter instance.
+     */
+    public static ImmutableMap<String, EntryFilterWithClassLoader> 
createEntryFilters(
+            ServiceConfiguration conf) throws IOException {
+        EntryFilterDefinitions definitions = 
searchForEntryFilters(conf.getEntryFiltersDirectory(),
+                conf.getNarExtractionDirectory());
+        ImmutableMap.Builder<String, EntryFilterWithClassLoader> builder = 
ImmutableMap.builder();
+        conf.getEntryFilterNames().forEach(filterName -> {
+            EntryFilterMetaData metaData = 
definitions.getFilters().get(filterName);
+            if (null == metaData) {
+                throw new RuntimeException("No entry filter is found for name 
`" + filterName
+                        + "`. Available entry filters are : " + 
definitions.getFilters());
+            }
+            EntryFilterWithClassLoader filter;
+            try {
+                filter = load(metaData, conf.getNarExtractionDirectory());
+                if (filter != null) {
+                    builder.put(filterName, filter);
+                }
+                log.info("Successfully loaded entry filter for name `{}`", 
filterName);
+            } catch (IOException e) {
+                log.error("Failed to load the entry filter for name `" + 
filterName + "`", e);
+                throw new RuntimeException("Failed to load the broker 
interceptor for name `" + filterName + "`");

Review comment:
       Can we throw the IOException directly? we will lose original stack if 
encounter IOException

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/FilterContext.java
##########
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.plugin;
+
+import lombok.Data;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.pulsar.broker.service.EntryBatchIndexesAcks;
+import org.apache.pulsar.broker.service.EntryBatchSizes;
+import org.apache.pulsar.broker.service.SendMessageInfo;
+import org.apache.pulsar.broker.service.Subscription;
+import org.apache.pulsar.broker.service.SubscriptionOption;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+
+@Data
+public class FilterContext {
+    private EntryBatchSizes batchSizes;
+    private SendMessageInfo sendMessageInfo;
+    private EntryBatchIndexesAcks indexesAcks;
+    private ManagedCursor cursor;
+    private boolean isReplayRead;
+    private Subscription subscription;
+    private SubscriptionOption subscriptionOption;
+    private MessageMetadata msgMetadata;

Review comment:
       Do we need to expose all of them to the filter? I don't think the plugin 
need to filter the messages by batchSizes, sendMessageInfo, indexesAcks, 
cursor, isReplayRead?
   
   And if we expose Subscription, SubscriptionOption, MessageMetadata, users 
will get the write access, maybe we'd better to only expose the topic name, the 
subname and subType and a readonly MessageMetadata or a copy is more reasonable 
here? For the filter, it is always in the read-only mode.

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
##########
@@ -113,6 +130,7 @@ public int 
filterEntriesForConsumer(Optional<EntryWrapper[]> entryWrapper, int e
         long totalBytes = 0;
         int totalChunkedMessages = 0;
         int totalEntries = 0;
+        FilterContext filterContext = new FilterContext();

Review comment:
       And if the broker don't have a filter, we should not create such 
objects? we can use a FilterContextDisabled to instead?

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
##########
@@ -127,6 +145,19 @@ public int 
filterEntriesForConsumer(Optional<EntryWrapper[]> entryWrapper, int e
             msgMetadata = msgMetadata == null
                     ? Commands.peekMessageMetadata(metadataAndPayload, 
subscription.toString(), -1)
                     : msgMetadata;
+            if (CollectionUtils.isNotEmpty(entryFilters)) {
+                fillContext(filterContext, batchSizes, sendMessageInfo, 
indexesAcks, cursor, isReplayRead,
+                        msgMetadata, subscription);
+                EntryFilter.FilterResult result = 
getFilterResult(filterContext, entry);
+                if (EntryFilter.FilterResult.REJECT == result) {
+                    PositionImpl pos = (PositionImpl) entry.getPosition();
+                    entries.set(i, null);
+                    entry.release();
+                    
subscription.acknowledgeMessage(Collections.singletonList(pos), 
AckType.Individual,

Review comment:
       If we have 100 entries to dispatch and 50 will be filtered out, we can 
improve here is only call the subscription.acknowledgeMessage() once. 

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
##########
@@ -113,6 +130,7 @@ public int 
filterEntriesForConsumer(Optional<EntryWrapper[]> entryWrapper, int e
         long totalBytes = 0;
         int totalChunkedMessages = 0;
         int totalEntries = 0;
+        FilterContext filterContext = new FilterContext();

Review comment:
       Should reuse the FilterConext here? otherwise we will introduce more 
objects during the message dispatching




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to