This is an automated email from the ASF dual-hosted git repository.

shoothzj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new bfa2d86e4c6 [Proxy] Remove unused methods and code (#15420)
bfa2d86e4c6 is described below

commit bfa2d86e4c63ec12792a48695f1f9a5b0be9032f
Author: Lari Hotari <[email protected]>
AuthorDate: Wed May 4 10:40:20 2022 +0300

    [Proxy] Remove unused methods and code (#15420)
    
    ### Motivation
    
    There is some unused code in the pulsar-proxy module that can be removed.
    
    ### Modifications
    
    - remove unused methods and code
    - replace one raw `CompletableFuture` instantiation with the diamond operator (`new CompletableFuture<>()`) to fix a raw-type generics warning
---
 .../proxy/server/BrokerDiscoveryProvider.java      | 63 ----------------------
 .../pulsar/proxy/server/BrokerProxyValidator.java  |  2 +-
 .../pulsar/proxy/server/DirectProxyHandler.java    |  8 +--
 .../pulsar/proxy/server/ParserProxyHandler.java    |  4 +-
 4 files changed, 6 insertions(+), 71 deletions(-)

diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java
index c66b91e7c8c..b781c7c3298 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java
@@ -23,20 +23,15 @@ import io.netty.util.concurrent.DefaultThreadFactory;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.bookkeeper.common.annotation.InterfaceAudience;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.pulsar.broker.PulsarServerException;
-import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.resources.MetadataStoreCacheLoader;
 import org.apache.pulsar.broker.resources.PulsarResources;
 import org.apache.pulsar.common.classification.InterfaceStability;
-import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
-import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
 import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
 import org.slf4j.Logger;
@@ -60,8 +55,6 @@ public class BrokerDiscoveryProvider implements Closeable {
     private final ScheduledExecutorService scheduledExecutorScheduler = 
Executors.newScheduledThreadPool(4,
             new DefaultThreadFactory("pulsar-proxy-scheduled-executor"));
 
-    private static final String PARTITIONED_TOPIC_PATH_ZNODE = 
"partitioned-topics";
-
     public BrokerDiscoveryProvider(ProxyConfiguration config, PulsarResources 
pulsarResources)
             throws PulsarServerException {
         try {
@@ -102,62 +95,6 @@ public class BrokerDiscoveryProvider implements Closeable {
         }
     }
 
-    CompletableFuture<PartitionedTopicMetadata> 
getPartitionedTopicMetadata(ProxyService service,
-            TopicName topicName, String role, AuthenticationDataSource 
authenticationData) {
-
-        CompletableFuture<PartitionedTopicMetadata> metadataFuture = new 
CompletableFuture<>();
-        try {
-            checkAuthorization(service, topicName, role, authenticationData);
-            
pulsarResources.getNamespaceResources().getPartitionedTopicResources()
-                    .getPartitionedTopicMetadataAsync(topicName)
-                    .thenAccept(metadata -> {
-                        // if the partitioned topic is not found in zk, then 
the topic
-                        // is not partitioned
-                        if (metadata.isPresent()) {
-                            metadataFuture.complete(metadata.get());
-                        } else {
-                            metadataFuture.complete(new 
PartitionedTopicMetadata());
-                        }
-                    }).exceptionally(ex -> {
-                        metadataFuture.completeExceptionally(ex);
-                        return null;
-                    });
-        } catch (Exception e) {
-            metadataFuture.completeExceptionally(e);
-        }
-        return metadataFuture;
-    }
-
-    protected void checkAuthorization(ProxyService service, TopicName 
topicName, String role,
-            AuthenticationDataSource authenticationData) throws Exception {
-        if (!service.getConfiguration().isAuthorizationEnabled()
-                || 
service.getConfiguration().getSuperUserRoles().contains(role)) {
-            // No enforcing of authorization policies
-            return;
-        }
-        // get zk policy manager
-        if (!service.getAuthorizationService().canLookup(topicName, role, 
authenticationData)) {
-            LOG.warn("[{}] Role {} is not allowed to lookup topic", topicName, 
role);
-            // check namespace authorization
-            TenantInfo tenantInfo;
-            try {
-                tenantInfo = 
pulsarResources.getTenantResources().getTenant(topicName.getTenant())
-                        .orElseThrow(() -> new 
IllegalAccessException("Property does not exist"));
-            } catch (Exception e) {
-                LOG.error("Failed to get property admin data for property");
-                throw new IllegalAccessException(String.format("Failed to get 
property %s admin data due to %s",
-                        topicName.getTenant(), e.getMessage()));
-            }
-            if (!service.getAuthorizationService()
-                    .isTenantAdmin(topicName.getTenant(), role, tenantInfo, 
authenticationData).get()) {
-                throw new IllegalAccessException("Don't have permission to 
administrate resources on this tenant");
-            }
-        }
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Successfully authorized {} on property {}", role, 
topicName.getTenant());
-        }
-    }
-
     @Override
     public void close() throws IOException {
         metadataStoreCacheLoader.close();
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerProxyValidator.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerProxyValidator.java
index b0529c2a777..ab44e163bad 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerProxyValidator.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerProxyValidator.java
@@ -126,7 +126,7 @@ public class BrokerProxyValidator {
             return NettyFutureUtil.toCompletableFuture(
                             
inetSocketAddressResolver.resolve(InetSocketAddress.createUnresolved(host, 
port)))
                     .thenCompose(resolvedAddress -> {
-                        CompletableFuture<InetSocketAddress> result = new 
CompletableFuture();
+                        CompletableFuture<InetSocketAddress> result = new 
CompletableFuture<>();
                         if (isIPAddressAllowed(resolvedAddress)) {
                             result.complete(resolvedAddress);
                         } else {
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
index a632f0e7372..ef426d96651 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
@@ -379,20 +379,20 @@ public class DirectProxyHandler {
                                     0, 4, 0, 4));
 
                     inboundChannel.pipeline().addBefore("handler", 
"inboundParser",
-                            new ParserProxyHandler(service, inboundChannel,
+                            new ParserProxyHandler(service,
                                     ParserProxyHandler.FRONTEND_CONN,
                                     connected.getMaxMessageSize(), 
outboundChannel.id()));
                     
outboundChannel.pipeline().addBefore("proxyOutboundHandler", "outboundParser",
-                            new ParserProxyHandler(service, outboundChannel,
+                            new ParserProxyHandler(service,
                                     ParserProxyHandler.BACKEND_CONN,
                                     connected.getMaxMessageSize(), 
inboundChannel.id()));
                 } else {
                     inboundChannel.pipeline().addBefore("handler", 
"inboundParser",
-                            new ParserProxyHandler(service, inboundChannel,
+                            new ParserProxyHandler(service,
                                     ParserProxyHandler.FRONTEND_CONN,
                                     Commands.DEFAULT_MAX_MESSAGE_SIZE, 
outboundChannel.id()));
                     
outboundChannel.pipeline().addBefore("proxyOutboundHandler", "outboundParser",
-                            new ParserProxyHandler(service, outboundChannel,
+                            new ParserProxyHandler(service,
                                     ParserProxyHandler.BACKEND_CONN,
                                     Commands.DEFAULT_MAX_MESSAGE_SIZE, 
inboundChannel.id()));
                 }
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java
index 41a9b594f93..0e81029e356 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java
@@ -45,7 +45,6 @@ import org.slf4j.LoggerFactory;
 public class ParserProxyHandler extends ChannelInboundHandlerAdapter {
 
 
-    private final Channel channel;
     //inbound
     protected static final String FRONTEND_CONN = "frontendconn";
     //outbound
@@ -68,10 +67,9 @@ public class ParserProxyHandler extends 
ChannelInboundHandlerAdapter {
      */
     private static final Map<String, String> consumerHashMap = new 
ConcurrentHashMap<>();
 
-    public ParserProxyHandler(ProxyService service, Channel channel, String 
type, int maxMessageSize,
+    public ParserProxyHandler(ProxyService service, String type, int 
maxMessageSize,
                               ChannelId peerChannelId) {
         this.service = service;
-        this.channel = channel;
         this.connType = type;
         this.maxMessageSize = maxMessageSize;
         this.peerChannelId = peerChannelId;

Reply via email to