This is an automated email from the ASF dual-hosted git repository.
shoothzj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new bfa2d86e4c6 [Proxy] Remove unused methods and code (#15420)
bfa2d86e4c6 is described below
commit bfa2d86e4c63ec12792a48695f1f9a5b0be9032f
Author: Lari Hotari <[email protected]>
AuthorDate: Wed May 4 10:40:20 2022 +0300
[Proxy] Remove unused methods and code (#15420)
### Motivation
There is some unused code in the pulsar-proxy module that can be removed.
### Modifications
- remove unused methods and code
- replace one raw `new CompletableFuture()` with the diamond form `new CompletableFuture<>()` to fix a generics (raw type) warning
---
.../proxy/server/BrokerDiscoveryProvider.java | 63 ----------------------
.../pulsar/proxy/server/BrokerProxyValidator.java | 2 +-
.../pulsar/proxy/server/DirectProxyHandler.java | 8 +--
.../pulsar/proxy/server/ParserProxyHandler.java | 4 +-
4 files changed, 6 insertions(+), 71 deletions(-)
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java
index c66b91e7c8c..b781c7c3298 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java
@@ -23,20 +23,15 @@ import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.common.annotation.InterfaceAudience;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.pulsar.broker.PulsarServerException;
-import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.resources.MetadataStoreCacheLoader;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.common.classification.InterfaceStability;
-import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
-import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
import org.slf4j.Logger;
@@ -60,8 +55,6 @@ public class BrokerDiscoveryProvider implements Closeable {
private final ScheduledExecutorService scheduledExecutorScheduler =
Executors.newScheduledThreadPool(4,
new DefaultThreadFactory("pulsar-proxy-scheduled-executor"));
- private static final String PARTITIONED_TOPIC_PATH_ZNODE = "partitioned-topics";
-
public BrokerDiscoveryProvider(ProxyConfiguration config, PulsarResources
pulsarResources)
throws PulsarServerException {
try {
@@ -102,62 +95,6 @@ public class BrokerDiscoveryProvider implements Closeable {
}
}
- CompletableFuture<PartitionedTopicMetadata>
getPartitionedTopicMetadata(ProxyService service,
- TopicName topicName, String role, AuthenticationDataSource
authenticationData) {
-
- CompletableFuture<PartitionedTopicMetadata> metadataFuture = new
CompletableFuture<>();
- try {
- checkAuthorization(service, topicName, role, authenticationData);
-
pulsarResources.getNamespaceResources().getPartitionedTopicResources()
- .getPartitionedTopicMetadataAsync(topicName)
- .thenAccept(metadata -> {
- // if the partitioned topic is not found in zk, then
the topic
- // is not partitioned
- if (metadata.isPresent()) {
- metadataFuture.complete(metadata.get());
- } else {
- metadataFuture.complete(new
PartitionedTopicMetadata());
- }
- }).exceptionally(ex -> {
- metadataFuture.completeExceptionally(ex);
- return null;
- });
- } catch (Exception e) {
- metadataFuture.completeExceptionally(e);
- }
- return metadataFuture;
- }
-
- protected void checkAuthorization(ProxyService service, TopicName
topicName, String role,
- AuthenticationDataSource authenticationData) throws Exception {
- if (!service.getConfiguration().isAuthorizationEnabled()
- ||
service.getConfiguration().getSuperUserRoles().contains(role)) {
- // No enforcing of authorization policies
- return;
- }
- // get zk policy manager
- if (!service.getAuthorizationService().canLookup(topicName, role,
authenticationData)) {
- LOG.warn("[{}] Role {} is not allowed to lookup topic", topicName,
role);
- // check namespace authorization
- TenantInfo tenantInfo;
- try {
- tenantInfo =
pulsarResources.getTenantResources().getTenant(topicName.getTenant())
- .orElseThrow(() -> new
IllegalAccessException("Property does not exist"));
- } catch (Exception e) {
- LOG.error("Failed to get property admin data for property");
- throw new IllegalAccessException(String.format("Failed to get
property %s admin data due to %s",
- topicName.getTenant(), e.getMessage()));
- }
- if (!service.getAuthorizationService()
- .isTenantAdmin(topicName.getTenant(), role, tenantInfo,
authenticationData).get()) {
- throw new IllegalAccessException("Don't have permission to
administrate resources on this tenant");
- }
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Successfully authorized {} on property {}", role,
topicName.getTenant());
- }
- }
-
@Override
public void close() throws IOException {
metadataStoreCacheLoader.close();
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerProxyValidator.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerProxyValidator.java
index b0529c2a777..ab44e163bad 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerProxyValidator.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerProxyValidator.java
@@ -126,7 +126,7 @@ public class BrokerProxyValidator {
return NettyFutureUtil.toCompletableFuture(
inetSocketAddressResolver.resolve(InetSocketAddress.createUnresolved(host,
port)))
.thenCompose(resolvedAddress -> {
- CompletableFuture<InetSocketAddress> result = new CompletableFuture();
+ CompletableFuture<InetSocketAddress> result = new CompletableFuture<>();
if (isIPAddressAllowed(resolvedAddress)) {
result.complete(resolvedAddress);
} else {
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
index a632f0e7372..ef426d96651 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
@@ -379,20 +379,20 @@ public class DirectProxyHandler {
0, 4, 0, 4));
inboundChannel.pipeline().addBefore("handler",
"inboundParser",
- new ParserProxyHandler(service, inboundChannel,
+ new ParserProxyHandler(service,
ParserProxyHandler.FRONTEND_CONN,
connected.getMaxMessageSize(),
outboundChannel.id()));
outboundChannel.pipeline().addBefore("proxyOutboundHandler", "outboundParser",
- new ParserProxyHandler(service, outboundChannel,
+ new ParserProxyHandler(service,
ParserProxyHandler.BACKEND_CONN,
connected.getMaxMessageSize(),
inboundChannel.id()));
} else {
inboundChannel.pipeline().addBefore("handler",
"inboundParser",
- new ParserProxyHandler(service, inboundChannel,
+ new ParserProxyHandler(service,
ParserProxyHandler.FRONTEND_CONN,
Commands.DEFAULT_MAX_MESSAGE_SIZE,
outboundChannel.id()));
outboundChannel.pipeline().addBefore("proxyOutboundHandler", "outboundParser",
- new ParserProxyHandler(service, outboundChannel,
+ new ParserProxyHandler(service,
ParserProxyHandler.BACKEND_CONN,
Commands.DEFAULT_MAX_MESSAGE_SIZE,
inboundChannel.id()));
}
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java
index 41a9b594f93..0e81029e356 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java
@@ -45,7 +45,6 @@ import org.slf4j.LoggerFactory;
public class ParserProxyHandler extends ChannelInboundHandlerAdapter {
- private final Channel channel;
//inbound
protected static final String FRONTEND_CONN = "frontendconn";
//outbound
@@ -68,10 +67,9 @@ public class ParserProxyHandler extends
ChannelInboundHandlerAdapter {
*/
private static final Map<String, String> consumerHashMap = new
ConcurrentHashMap<>();
- public ParserProxyHandler(ProxyService service, Channel channel, String
type, int maxMessageSize,
+ public ParserProxyHandler(ProxyService service, String type, int
maxMessageSize,
ChannelId peerChannelId) {
this.service = service;
- this.channel = channel;
this.connType = type;
this.maxMessageSize = maxMessageSize;
this.peerChannelId = peerChannelId;