This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new cba8800 Enable CheckStyle Plugin in Pulsar proxy (#13343) cba8800 is described below commit cba8800de1013d0e8ac81f43ecb040a55978c358 Author: ZhangJian He <shoot...@gmail.com> AuthorDate: Thu Dec 16 08:39:28 2021 +0800 Enable CheckStyle Plugin in Pulsar proxy (#13343) --- .../src/main/resources/pulsar/suppressions.xml | 2 + pulsar-proxy/pom.xml | 13 +++ .../proxy/extensions/ExtensionsDefinitions.java | 5 +- .../pulsar/proxy/extensions/ProxyExtension.java | 5 +- .../proxy/extensions/ProxyExtensionMetadata.java | 3 +- .../extensions/ProxyExtensionWithClassLoader.java | 7 +- .../pulsar/proxy/extensions/ProxyExtensions.java | 13 ++- .../proxy/extensions/ProxyExtensionsUtils.java | 14 ++-- ...xtensionsDefinitions.java => package-info.java} | 19 +---- .../pulsar/proxy/server/AdminProxyHandler.java | 21 +++-- .../proxy/server/BrokerDiscoveryProvider.java | 8 +- .../pulsar/proxy/server/DirectProxyHandler.java | 67 +++++++-------- .../pulsar/proxy/server/LookupProxyHandler.java | 97 ++++++++++------------ .../pulsar/proxy/server/ParserProxyHandler.java | 86 ++++++++++--------- .../apache/pulsar/proxy/server/ProxyClientCnx.java | 5 +- .../pulsar/proxy/server/ProxyConfiguration.java | 30 +++---- .../pulsar/proxy/server/ProxyConnection.java | 44 +++++----- .../pulsar/proxy/server/ProxyConnectionPool.java | 4 +- .../apache/pulsar/proxy/server/ProxyService.java | 29 ++++--- .../pulsar/proxy/server/ProxyServiceStarter.java | 51 ++++++------ .../proxy/server/ServiceChannelInitializer.java | 13 ++- .../org/apache/pulsar/proxy/server/WebServer.java | 13 +-- .../package-info.java} | 19 +---- .../org/apache/pulsar/proxy/stats/ProxyStats.java | 23 ++--- .../apache/pulsar/proxy/stats/RestException.java | 2 - .../org/apache/pulsar/proxy/stats/TopicStats.java | 4 +- .../package-info.java} | 19 +---- .../package-info.java} | 19 +---- .../server/ProxyConnectionThrottlingTest.java | 4 +- .../proxy/server/ProxyLookupThrottlingTest.java | 4 +- 30 files changed, 292 insertions(+), 351 deletions(-) diff --git a/buildtools/src/main/resources/pulsar/suppressions.xml b/buildtools/src/main/resources/pulsar/suppressions.xml index 0b120bb..ef9658c 100644 --- a/buildtools/src/main/resources/pulsar/suppressions.xml +++ b/buildtools/src/main/resources/pulsar/suppressions.xml @@ -39,6 +39,8 @@ <!-- suppress most all checks expect below--> <suppress checks="^(?!.*UnusedImports).*$" files=".*[\\/]src[\\/]test[\\/].*"/> + <suppress checks="IllegalImport" files="ProxyServiceStarter.java"/> + <!-- suppress all checks in the copied code --> <suppress checks=".*" files=".+[\\/]com[\\/]scurrilous[\\/]circe[\\/].+\.java"/> diff --git a/pulsar-proxy/pom.xml b/pulsar-proxy/pom.xml index 0c8e649..529cc77 100644 --- a/pulsar-proxy/pom.xml +++ b/pulsar-proxy/pom.xml @@ -198,6 +198,19 @@ </execution> </executions> </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-checkstyle-plugin</artifactId> + <executions> + <execution> + <id>checkstyle</id> + <phase>verify</phase> + <goals> + <goal>check</goal> + </goals> + </execution> + </executions> + </plugin> </plugins> </build> <profiles> diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ExtensionsDefinitions.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ExtensionsDefinitions.java index 844c7ca..2cdbad6 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ExtensionsDefinitions.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ExtensionsDefinitions.java @@ -18,11 +18,10 @@ */ package org.apache.pulsar.proxy.extensions; -import lombok.Data; -import lombok.experimental.Accessors; - import java.util.Map; import java.util.TreeMap; +import lombok.Data; +import lombok.experimental.Accessors; /** * The collection of Proxy Extensions. diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtension.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtension.java index b973e10..928a4b4 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtension.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtension.java @@ -20,14 +20,13 @@ package org.apache.pulsar.proxy.extensions; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; +import java.net.InetSocketAddress; +import java.util.Map; import org.apache.pulsar.common.classification.InterfaceAudience; import org.apache.pulsar.common.classification.InterfaceStability; import org.apache.pulsar.proxy.server.ProxyConfiguration; import org.apache.pulsar.proxy.server.ProxyService; -import java.net.InetSocketAddress; -import java.util.Map; - /** * The extension interface for support additional extensions on Pulsar Proxy. */ diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensionMetadata.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensionMetadata.java index 632c841..935a393 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensionMetadata.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensionMetadata.java @@ -18,11 +18,10 @@ */ package org.apache.pulsar.proxy.extensions; +import java.nio.file.Path; import lombok.Data; import lombok.NoArgsConstructor; -import java.nio.file.Path; - /** * The metadata of Proxy Extension. */ diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensionWithClassLoader.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensionWithClassLoader.java index 1f6924a..922c339 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensionWithClassLoader.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensionWithClassLoader.java @@ -20,6 +20,9 @@ package org.apache.pulsar.proxy.extensions; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Map; import lombok.Data; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -27,10 +30,6 @@ import org.apache.pulsar.common.nar.NarClassLoader; import org.apache.pulsar.proxy.server.ProxyConfiguration; import org.apache.pulsar.proxy.server.ProxyService; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.Map; - /** * A extension with its classloader. */ diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensions.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensions.java index ba3c383..14dae3a 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensions.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensions.java @@ -21,22 +21,19 @@ package org.apache.pulsar.proxy.extensions; import com.google.common.collect.ImmutableMap; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; - +import java.io.IOException; +import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.HashMap; import java.util.HashSet; - +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.proxy.server.ProxyConfiguration; import org.apache.pulsar.proxy.server.ProxyService; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; - /** * A collection of loaded extensions. */ diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensionsUtils.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensionsUtils.java index 2f02827..f8a532d 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensionsUtils.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensionsUtils.java @@ -18,12 +18,7 @@ */ package org.apache.pulsar.proxy.extensions; -import lombok.experimental.UtilityClass; -import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.StringUtils; -import org.apache.pulsar.common.nar.NarClassLoader; -import org.apache.pulsar.common.util.ObjectMapperFactory; - +import static com.google.common.base.Preconditions.checkArgument; import java.io.File; import java.io.IOException; import java.nio.file.DirectoryStream; @@ -31,8 +26,11 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.util.Collections; - -import static com.google.common.base.Preconditions.checkArgument; +import lombok.experimental.UtilityClass; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.common.nar.NarClassLoader; +import org.apache.pulsar.common.util.ObjectMapperFactory; /** * Util class to search and load {@link ProxyExtension}s. diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ExtensionsDefinitions.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/package-info.java similarity index 69% copy from pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ExtensionsDefinitions.java copy to pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/package-info.java index 844c7ca..7ef1689 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ExtensionsDefinitions.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/package-info.java @@ -16,21 +16,4 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.proxy.extensions; - -import lombok.Data; -import lombok.experimental.Accessors; - -import java.util.Map; -import java.util.TreeMap; - -/** - * The collection of Proxy Extensions. - */ -@Data -@Accessors(fluent = true) -class ExtensionsDefinitions { - - private final Map<String, ProxyExtensionMetadata> extensions = new TreeMap<>(); - -} +package org.apache.pulsar.proxy.extensions; \ No newline at end of file diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java index 853eb0b..a38e2f3 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java @@ -19,7 +19,6 @@ package org.apache.pulsar.proxy.server; import static org.apache.commons.lang3.StringUtils.isBlank; - import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; @@ -33,13 +32,11 @@ import java.util.Iterator; import java.util.Objects; import java.util.Set; import java.util.concurrent.Executor; - import javax.net.ssl.SSLContext; import javax.servlet.ServletConfig; import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; - import org.apache.pulsar.broker.web.AuthenticationFilter; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.AuthenticationDataProvider; @@ -115,14 +112,16 @@ class AdminProxyHandler extends ProxyServlet { String value = config.getInitParameter("maxThreads"); if (value == null || "-".equals(value)) { executor = (Executor) getServletContext().getAttribute("org.eclipse.jetty.server.Executor"); - if (executor == null) + if (executor == null) { throw new IllegalStateException("No server executor for proxy"); + } } else { QueuedThreadPool qtp = new QueuedThreadPool(Integer.parseInt(value)); String servletName = config.getServletName(); int dot = servletName.lastIndexOf('.'); - if (dot >= 0) + if (dot >= 0) { servletName = servletName.substring(dot + 1); + } qtp.setName(servletName); executor = qtp; } @@ -130,22 +129,26 @@ class AdminProxyHandler extends ProxyServlet { client.setExecutor(executor); value = config.getInitParameter("maxConnections"); - if (value == null) + if (value == null) { value = "256"; + } client.setMaxConnectionsPerDestination(Integer.parseInt(value)); value = config.getInitParameter("idleTimeout"); - if (value == null) + if (value == null) { value = "30000"; + } client.setIdleTimeout(Long.parseLong(value)); value = config.getInitParameter("requestBufferSize"); - if (value != null) + if (value != null) { client.setRequestBufferSize(Integer.parseInt(value)); + } value = config.getInitParameter("responseBufferSize"); - if (value != null) + if (value != null){ client.setResponseBufferSize(Integer.parseInt(value)); + } try { client.start(); diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java index bb90be2..ae81350 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java @@ -19,7 +19,7 @@ package org.apache.pulsar.proxy.server; import static org.apache.bookkeeper.common.util.MathUtils.signSafeMod; - +import io.netty.util.concurrent.DefaultThreadFactory; import java.io.Closeable; import java.io.IOException; import java.util.List; @@ -27,7 +27,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicInteger; - import org.apache.bookkeeper.common.annotation.InterfaceAudience; import org.apache.bookkeeper.common.util.OrderedScheduler; import org.apache.pulsar.broker.PulsarServerException; @@ -43,8 +42,6 @@ import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.netty.util.concurrent.DefaultThreadFactory; - /** * Maintains available active broker list and returns next active broker in round-robin for discovery service. * This is an API used by Proxy Extensions. @@ -151,7 +148,8 @@ public class BrokerDiscoveryProvider implements Closeable { throw new IllegalAccessException(String.format("Failed to get property %s admin data due to %s", topicName.getTenant(), e.getMessage())); } - if (!service.getAuthorizationService().isTenantAdmin(topicName.getTenant(), role, tenantInfo, authenticationData).get()) { + if (!service.getAuthorizationService() + .isTenantAdmin(topicName.getTenant(), role, tenantInfo, authenticationData).get()) { throw new IllegalAccessException("Don't have permission to administrate resources on this tenant"); } } diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java index c896be5..ead243a 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java @@ -21,7 +21,6 @@ package org.apache.pulsar.proxy.server; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; - import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; @@ -42,7 +41,6 @@ import io.netty.handler.ssl.SslHandler; import io.netty.util.CharsetUtil; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; - import java.net.InetSocketAddress; import java.net.URI; import java.net.URISyntaxException; @@ -50,16 +48,12 @@ import java.util.Arrays; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Supplier; - import javax.net.ssl.SSLSession; - import lombok.Getter; - import org.apache.pulsar.PulsarVersion; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.AuthenticationDataProvider; import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.common.tls.TlsHostnameVerifier; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.api.AuthData; import org.apache.pulsar.common.api.proto.CommandAuthChallenge; @@ -67,6 +61,7 @@ import org.apache.pulsar.common.api.proto.CommandConnected; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.protocol.PulsarDecoder; import org.apache.pulsar.common.stats.Rate; +import org.apache.pulsar.common.tls.TlsHostnameVerifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -152,25 +147,29 @@ public class DirectProxyHandler { inboundOutboundChannelMap.put(outboundChannel.id() , inboundChannel.id()); } - if (config.isHaProxyProtocolEnabled()) { - if (proxyConnection.hasHAProxyMessage()) { - outboundChannel.writeAndFlush(encodeProxyProtocolMessage(proxyConnection.getHAProxyMessage())); - } else { - if (inboundChannel.remoteAddress() instanceof InetSocketAddress) { - InetSocketAddress clientAddress = (InetSocketAddress) inboundChannel.remoteAddress(); - String sourceAddress = clientAddress.getAddress().getHostAddress(); - int sourcePort = clientAddress.getPort(); - if (outboundChannel.localAddress() instanceof InetSocketAddress) { - InetSocketAddress proxyAddress = (InetSocketAddress) inboundChannel.remoteAddress(); - String destinationAddress = proxyAddress.getAddress().getHostAddress(); - int destinationPort = proxyAddress.getPort(); - HAProxyMessage msg = new HAProxyMessage(HAProxyProtocolVersion.V1, HAProxyCommand.PROXY, - HAProxyProxiedProtocol.TCP4, sourceAddress, destinationAddress, sourcePort, destinationPort); - outboundChannel.writeAndFlush(encodeProxyProtocolMessage(msg)); - msg.release(); - } - } + if (!config.isHaProxyProtocolEnabled()) { + return; + } + + if (proxyConnection.hasHAProxyMessage()) { + outboundChannel.writeAndFlush(encodeProxyProtocolMessage(proxyConnection.getHAProxyMessage())); + } else { + if (!(inboundChannel.remoteAddress() instanceof InetSocketAddress)) { + return; + } + if (!(outboundChannel.localAddress() instanceof InetSocketAddress)) { + return; } + InetSocketAddress clientAddress = (InetSocketAddress) inboundChannel.remoteAddress(); + String sourceAddress = clientAddress.getAddress().getHostAddress(); + int sourcePort = clientAddress.getPort(); + InetSocketAddress proxyAddress = (InetSocketAddress) inboundChannel.remoteAddress(); + String destinationAddress = proxyAddress.getAddress().getHostAddress(); + int destinationPort = proxyAddress.getPort(); + HAProxyMessage msg = new HAProxyMessage(HAProxyProtocolVersion.V1, HAProxyCommand.PROXY, + HAProxyProxiedProtocol.TCP4, sourceAddress, destinationAddress, sourcePort, destinationPort); + outboundChannel.writeAndFlush(encodeProxyProtocolMessage(msg)); + msg.release(); } }); } @@ -246,9 +245,9 @@ public class DirectProxyHandler { break; case HandshakeCompleted: - ProxyService.opsCounter.inc(); + ProxyService.OPS_COUNTER.inc(); if (msg instanceof ByteBuf) { - ProxyService.bytesCounter.inc(((ByteBuf) msg).readableBytes()); + ProxyService.BYTES_COUNTER.inc(((ByteBuf) msg).readableBytes()); } inboundChannel.writeAndFlush(msg).addListener(this); break; @@ -352,14 +351,16 @@ public class DirectProxyHandler { // Enable parsing feature, proxyLogLevel(1 or 2) // Add parser handler if (connected.hasMaxMessageSize()) { - inboundChannel.pipeline().replace("frameDecoder", "newFrameDecoder", - new LengthFieldBasedFrameDecoder(connected.getMaxMessageSize() - + Commands.MESSAGE_SIZE_FRAME_PADDING, - 0, 4, 0, 4)); + inboundChannel.pipeline() + .replace("frameDecoder", "newFrameDecoder", + new LengthFieldBasedFrameDecoder(connected.getMaxMessageSize() + + Commands.MESSAGE_SIZE_FRAME_PADDING, + 0, 4, 0, 4)); outboundChannel.pipeline().replace("frameDecoder", "newFrameDecoder", - new LengthFieldBasedFrameDecoder( - connected.getMaxMessageSize() - + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4)); + new LengthFieldBasedFrameDecoder( + connected.getMaxMessageSize() + + Commands.MESSAGE_SIZE_FRAME_PADDING, + 0, 4, 0, 4)); inboundChannel.pipeline().addBefore("handler", "inboundParser", new ParserProxyHandler(service, inboundChannel, diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java index b14bea5..6319824 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java @@ -19,67 +19,64 @@ package org.apache.pulsar.proxy.server; import static org.apache.commons.lang3.StringUtils.isBlank; - +import io.netty.buffer.ByteBuf; +import io.prometheus.client.Counter; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.net.URI; import java.net.URISyntaxException; import java.util.Optional; - import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.common.protocol.Commands; -import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace; import org.apache.pulsar.common.api.proto.CommandGetSchema; +import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace; import org.apache.pulsar.common.api.proto.CommandLookupTopic; import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse.LookupType; import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadata; import org.apache.pulsar.common.api.proto.ServerError; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion; import org.apache.pulsar.common.protocol.schema.SchemaVersion; import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.netty.buffer.ByteBuf; -import io.prometheus.client.Counter; - public class LookupProxyHandler { private final String throttlingErrorMessage = "Too many concurrent lookup and partitionsMetadata requests"; private final ProxyService service; private final ProxyConnection proxyConnection; private final boolean connectWithTLS; - private SocketAddress clientAddress; - private String brokerServiceURL; + private final SocketAddress clientAddress; + private final String brokerServiceURL; - private static final Counter lookupRequests = Counter + private static final Counter LOOKUP_REQUESTS = Counter .build("pulsar_proxy_lookup_requests", "Counter of topic lookup requests").create().register(); - private static final Counter partitionsMetadataRequests = Counter + private static final Counter PARTITIONS_METADATA_REQUESTS = Counter .build("pulsar_proxy_partitions_metadata_requests", "Counter of partitions metadata requests").create() .register(); - private static final Counter getTopicsOfNamespaceRequestss = Counter + private static final Counter GET_TOPICS_OF_NAMESPACE_REQUESTS = Counter .build("pulsar_proxy_get_topics_of_namespace_requests", "Counter of getTopicsOfNamespace requests") .create() .register(); - private static final Counter getSchemaRequests = Counter + private static final Counter GET_SCHEMA_REQUESTS = Counter .build("pulsar_proxy_get_schema_requests", "Counter of schema requests") .create() .register(); - static final Counter rejectedLookupRequests = Counter.build("pulsar_proxy_rejected_lookup_requests", + static final Counter REJECTED_LOOKUP_REQUESTS = Counter.build("pulsar_proxy_rejected_lookup_requests", "Counter of topic lookup requests rejected due to throttling").create().register(); - static final Counter rejectedPartitionsMetadataRequests = Counter + static final Counter REJECTED_PARTITIONS_METADATA_REQUESTS = Counter .build("pulsar_proxy_rejected_partitions_metadata_requests", "Counter of partitions metadata requests rejected due to throttling") .create().register(); - static final Counter rejectedGetTopicsOfNamespaceRequests = Counter + static final Counter REJECTED_GET_TOPICS_OF_NAMESPACE_REQUESTS = Counter .build("pulsar_proxy_rejected_get_topics_of_namespace_requests", "Counter of getTopicsOfNamespace requests rejected due to throttling") .create().register(); @@ -99,7 +96,7 @@ public class LookupProxyHandler { } long clientRequestId = lookup.getRequestId(); if (this.service.getLookupRequestSemaphore().tryAcquire()) { - lookupRequests.inc(); + LOOKUP_REQUESTS.inc(); String topic = lookup.getTopic(); String serviceUrl; if (isBlank(brokerServiceURL)) { @@ -121,7 +118,7 @@ public class LookupProxyHandler { performLookup(clientRequestId, topic, serviceUrl, false, 10); this.service.getLookupRequestSemaphore().release(); } else { - rejectedLookupRequests.inc(); + REJECTED_LOOKUP_REQUESTS.inc(); if (log.isDebugEnabled()) { log.debug("Lookup Request ID {} from {} rejected - {}.", clientRequestId, clientAddress, throttlingErrorMessage); @@ -179,9 +176,9 @@ public class LookupProxyHandler { // to use the appropriate target broker (and port) when it // will connect back. if (log.isDebugEnabled()) { - log.debug( - "Successfully perform lookup '{}' for topic '{}' with clientReq Id '{}' and lookup-broker {}", - addr, topic, clientRequestId, brokerUrl); + log.debug("Successfully perform lookup '{}' for topic '{}'" + + " with clientReq Id '{}' and lookup-broker {}", + addr, topic, clientRequestId, brokerUrl); } proxyConnection.ctx().writeAndFlush(Commands.newLookupResponse(brokerUrl, brokerUrl, true, LookupType.Connect, clientRequestId, true /* this is coming from proxy */)); @@ -198,7 +195,7 @@ public class LookupProxyHandler { } public void handlePartitionMetadataResponse(CommandPartitionedTopicMetadata partitionMetadata) { - partitionsMetadataRequests.inc(); + PARTITIONS_METADATA_REQUESTS.inc(); if (log.isDebugEnabled()) { log.debug("[{}] Received PartitionMetadataLookup", clientAddress); } @@ -207,7 +204,7 @@ public class LookupProxyHandler { handlePartitionMetadataResponse(partitionMetadata, clientRequestId); this.service.getLookupRequestSemaphore().release(); } else { - rejectedPartitionsMetadataRequests.inc(); + REJECTED_PARTITIONS_METADATA_REQUESTS.inc(); if (log.isDebugEnabled()) { log.debug("PartitionMetaData Request ID {} from {} rejected - {}.", clientRequestId, clientAddress, throttlingErrorMessage); @@ -270,7 +267,7 @@ public class LookupProxyHandler { } public void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGetTopicsOfNamespace) { - getTopicsOfNamespaceRequestss.inc(); + GET_TOPICS_OF_NAMESPACE_REQUESTS.inc(); if (log.isDebugEnabled()) { log.debug("[{}] Received GetTopicsOfNamespace", clientAddress); } @@ -281,7 +278,7 @@ public class LookupProxyHandler { handleGetTopicsOfNamespace(commandGetTopicsOfNamespace, requestId); this.service.getLookupRequestSemaphore().release(); } else { - rejectedGetTopicsOfNamespaceRequests.inc(); + REJECTED_GET_TOPICS_OF_NAMESPACE_REQUESTS.inc(); if (log.isDebugEnabled()) { log.debug("GetTopicsOfNamespace Request ID {} from {} rejected - {}.", requestId, clientAddress, throttlingErrorMessage); @@ -296,11 +293,11 @@ public class LookupProxyHandler { long clientRequestId) { String serviceUrl = getBrokerServiceUrl(clientRequestId); - if(!StringUtils.isNotBlank(serviceUrl)) { + if (!StringUtils.isNotBlank(serviceUrl)) { return; } - performGetTopicsOfNamespace(clientRequestId, commandGetTopicsOfNamespace.getNamespace(), serviceUrl, 10, - commandGetTopicsOfNamespace.getMode()); + performGetTopicsOfNamespace(clientRequestId, commandGetTopicsOfNamespace.getNamespace(), serviceUrl, + 10, commandGetTopicsOfNamespace.getMode()); } private void performGetTopicsOfNamespace(long clientRequestId, @@ -316,7 +313,7 @@ public class LookupProxyHandler { InetSocketAddress addr = getAddr(brokerServiceUrl, clientRequestId); - if(addr == null){ + if (addr == null) { return; } @@ -331,7 +328,8 @@ public class LookupProxyHandler { command = Commands.newGetTopicsOfNamespaceRequest(namespaceName, requestId, mode); clientCnx.newGetTopicsOfNamespace(command, requestId).whenComplete((r, t) -> { if (t != null) { - log.warn("[{}] Failed to get TopicsOfNamespace {}: {}", clientAddress, namespaceName, t.getMessage()); + log.warn("[{}] Failed to get TopicsOfNamespace {}: {}", + clientAddress, namespaceName, t.getMessage()); proxyConnection.ctx().writeAndFlush( Commands.newError(clientRequestId, ServerError.ServiceNotReady, t.getMessage())); } else { @@ -350,7 +348,7 @@ public class LookupProxyHandler { } public void handleGetSchema(CommandGetSchema commandGetSchema) { - getSchemaRequests.inc(); + GET_SCHEMA_REQUESTS.inc(); if (log.isDebugEnabled()) { log.debug("[{}] Received GetSchema {}", clientAddress, commandGetSchema); } @@ -365,12 +363,12 @@ public class LookupProxyHandler { schemaVersion = Optional.empty(); } - if(!StringUtils.isNotBlank(serviceUrl)) { + if (!StringUtils.isNotBlank(serviceUrl)) { return; } InetSocketAddress addr = getAddr(serviceUrl, clientRequestId); - if(addr == null){ + if (addr == null) { return; } if (log.isDebugEnabled()) { @@ -405,27 +403,24 @@ public class LookupProxyHandler { } /** - * Get default broker service url or discovery an available broker + * Get default broker service url or discovery an available broker. **/ private String getBrokerServiceUrl(long clientRequestId) { - if (isBlank(brokerServiceURL)) { - ServiceLookupData availableBroker; - try { - availableBroker = service.getDiscoveryProvider().nextBroker(); - } catch (Exception e) { - log.warn("[{}] Failed to get next active broker {}", clientAddress, e.getMessage(), e); - proxyConnection.ctx().writeAndFlush(Commands.newError( - clientRequestId, ServerError.ServiceNotReady, e.getMessage() - )); - return null; - } - return this.connectWithTLS ? - availableBroker.getPulsarServiceUrlTls() : availableBroker.getPulsarServiceUrl(); - } else { - return this.connectWithTLS ? - service.getConfiguration().getBrokerServiceURLTLS() : service.getConfiguration().getBrokerServiceURL(); + if (StringUtils.isNotBlank(brokerServiceURL)) { + return this.connectWithTLS ? service.getConfiguration().getBrokerServiceURLTLS() + : service.getConfiguration().getBrokerServiceURL(); } - + ServiceLookupData availableBroker; + try { + availableBroker = service.getDiscoveryProvider().nextBroker(); + } catch (Exception e) { + log.warn("[{}] Failed to get next active broker {}", clientAddress, e.getMessage(), e); + proxyConnection.ctx().writeAndFlush(Commands.newError( + clientRequestId, ServerError.ServiceNotReady, e.getMessage() + )); + return null; + } + return this.connectWithTLS ? availableBroker.getPulsarServiceUrlTls() : availableBroker.getPulsarServiceUrl(); } private InetSocketAddress getAddr(String brokerServiceUrl, long clientRequestId) { diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java index f152b32..40c05a3 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java @@ -19,12 +19,18 @@ package org.apache.pulsar.proxy.server; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; - import org.apache.commons.lang3.mutable.MutableLong; import org.apache.pulsar.common.api.proto.BaseCommand; import org.apache.pulsar.common.api.raw.MessageParser; @@ -34,34 +40,31 @@ import org.apache.pulsar.proxy.stats.TopicStats; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufUtil; -import io.netty.buffer.CompositeByteBuf; -import io.netty.buffer.Unpooled; -import io.netty.channel.Channel; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; - public class ParserProxyHandler extends ChannelInboundHandlerAdapter { - private Channel channel; + private final Channel channel; //inbound protected static final String FRONTEND_CONN = "frontendconn"; //outbound protected static final String BACKEND_CONN = "backendconn"; - private String connType; + private final String connType; - private int maxMessageSize; + private final int maxMessageSize; private final ProxyService service; - //producerid+channelid as key - //or consumerid+channelid as key - private static Map<String, String> producerHashMap = new ConcurrentHashMap<>(); - private static Map<String, String> consumerHashMap = new ConcurrentHashMap<>(); + /** + * producerid + channelid as key. + */ + private static final Map<String, String> producerHashMap = new ConcurrentHashMap<>(); + + /** + * consumerid + channelid as key. + */ + private static final Map<String, String> consumerHashMap = new ConcurrentHashMap<>(); public ParserProxyHandler(ProxyService service, Channel channel, String type, int maxMessageSize) { this.service = service; @@ -70,23 +73,26 @@ public class ParserProxyHandler extends ChannelInboundHandlerAdapter { this.maxMessageSize = maxMessageSize; } - private void logging(Channel conn, BaseCommand.Type cmdtype, String info, List<RawMessage> messages) throws Exception{ + private void logging(Channel conn, BaseCommand.Type cmdtype, String info, List<RawMessage> messages) { if (messages != null) { // lag StringBuilder infoBuilder = new StringBuilder(info); for (RawMessage message : messages) { - infoBuilder.append("[").append(System.currentTimeMillis() - message.getPublishTime()).append("] ").append(new String(ByteBufUtil.getBytes(message.getData()), StandardCharsets.UTF_8)); + infoBuilder.append("[").append(System.currentTimeMillis() - message.getPublishTime()).append("] ") + .append(new String(ByteBufUtil.getBytes(message.getData()), StandardCharsets.UTF_8)); } info = infoBuilder.toString(); } // log conn format is like from source to target switch (this.connType) { case ParserProxyHandler.FRONTEND_CONN: - log.info(ParserProxyHandler.FRONTEND_CONN + ":{} cmd:{} msg:{}", "[" + conn.remoteAddress() + conn.localAddress() + "]", cmdtype, info); + log.info(ParserProxyHandler.FRONTEND_CONN + ":{} cmd:{} msg:{}", "[" + conn.remoteAddress() + + conn.localAddress() + "]", cmdtype, info); break; case ParserProxyHandler.BACKEND_CONN: - log.info(ParserProxyHandler.BACKEND_CONN + ":{} cmd:{} msg:{}", "[" + conn.localAddress() + conn.remoteAddress() + "]", cmdtype, info); + log.info(ParserProxyHandler.BACKEND_CONN + ":{} cmd:{} msg:{}", "[" + conn.localAddress() + + conn.remoteAddress() + "]", cmdtype, info); break; } } @@ -94,9 +100,9 @@ public class ParserProxyHandler extends ChannelInboundHandlerAdapter { private final BaseCommand cmd = new BaseCommand(); public void channelRead(ChannelHandlerContext ctx, Object msg) { - TopicName topicName ; + TopicName topicName; List<RawMessage> messages = new ArrayList<>(); - ByteBuf buffer = (ByteBuf)(msg); + ByteBuf buffer = (ByteBuf) (msg); try { buffer.markReaderIndex(); @@ -107,9 +113,11 @@ public class ParserProxyHandler extends ChannelInboundHandlerAdapter { switch (cmd.getType()) { case PRODUCER: - ParserProxyHandler.producerHashMap.put(String.valueOf(cmd.getProducer().getProducerId()) + "," + String.valueOf(ctx.channel().id()), cmd.getProducer().getTopic()); + ParserProxyHandler.producerHashMap.put(cmd.getProducer().getProducerId() + "," + ctx.channel().id(), + cmd.getProducer().getTopic()); - logging(ctx.channel() , cmd.getType() , "{producer:" + cmd.getProducer().getProducerName() + ",topic:" + cmd.getProducer().getTopic() + "}", null); + logging(ctx.channel() , cmd.getType() , "{producer:" + cmd.getProducer().getProducerName() + + ",topic:" + cmd.getProducer().getTopic() + "}", null); break; case SEND: @@ -117,10 +125,11 @@ public class ParserProxyHandler extends ChannelInboundHandlerAdapter { logging(ctx.channel() , cmd.getType() , "", null); break; } - topicName = TopicName.get(ParserProxyHandler.producerHashMap.get(String.valueOf(cmd.getSend().getProducerId()) + "," + String.valueOf(ctx.channel().id()))); + topicName = TopicName.get(ParserProxyHandler.producerHashMap.get(cmd.getSend().getProducerId() + "," + + ctx.channel().id())); MutableLong msgBytes = new MutableLong(0); - MessageParser.parseMessage(topicName, -1L, - -1L,buffer,(message) -> { + MessageParser.parseMessage(topicName, -1L, + -1L, buffer, (message) -> { messages.add(message); msgBytes.add(message.getData().readableBytes()); }, maxMessageSize); @@ -132,9 +141,11 @@ public class ParserProxyHandler extends ChannelInboundHandlerAdapter { break; case SUBSCRIBE: - ParserProxyHandler.consumerHashMap.put(String.valueOf(cmd.getSubscribe().getConsumerId()) + "," + String.valueOf(ctx.channel().id()) , cmd.getSubscribe().getTopic()); + ParserProxyHandler.consumerHashMap.put(cmd.getSubscribe().getConsumerId() + "," + + ctx.channel().id(), cmd.getSubscribe().getTopic()); - logging(ctx.channel() , cmd.getType() , "{consumer:" + cmd.getSubscribe().getConsumerName() + ",topic:" + cmd.getSubscribe().getTopic() + "}" , null); + logging(ctx.channel() , cmd.getType() , "{consumer:" + cmd.getSubscribe().getConsumerName() + + ",topic:" + cmd.getSubscribe().getTopic() + "}" , null); break; case MESSAGE: @@ -142,13 +153,14 @@ public class ParserProxyHandler extends ChannelInboundHandlerAdapter { logging(ctx.channel() , cmd.getType() , "" , null); break; } - topicName = TopicName.get(ParserProxyHandler.consumerHashMap.get(String.valueOf(cmd.getMessage().getConsumerId()) + "," + DirectProxyHandler.inboundOutboundChannelMap.get(ctx.channel().id()))); + topicName = TopicName.get(ParserProxyHandler.consumerHashMap.get(cmd.getMessage().getConsumerId() + + "," + DirectProxyHandler.inboundOutboundChannelMap.get(ctx.channel().id()))); msgBytes = new MutableLong(0); - MessageParser.parseMessage(topicName, -1L, - -1L,buffer,(message) -> { - messages.add(message); - msgBytes.add(message.getData().readableBytes()); - }, maxMessageSize); + MessageParser.parseMessage(topicName, -1L, + -1L, buffer, (message) -> { + messages.add(message); + msgBytes.add(message.getData().readableBytes()); + }, maxMessageSize); // update topic stats topicStats = this.service.getTopicStats().computeIfAbsent(topicName.toString(), topic -> new TopicStats()); @@ -172,8 +184,8 @@ public class ParserProxyHandler extends ChannelInboundHandlerAdapter { ByteBuf totalSizeBuf = Unpooled.buffer(4); totalSizeBuf.writeInt(buffer.readableBytes()); CompositeByteBuf compBuf = Unpooled.compositeBuffer(); - compBuf.addComponents(totalSizeBuf,buffer); - compBuf.writerIndex(totalSizeBuf.capacity()+buffer.capacity()); + compBuf.addComponents(totalSizeBuf, buffer); + compBuf.writerIndex(totalSizeBuf.capacity() + buffer.capacity()); // Release mssages messages.forEach(RawMessage::release); diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java index 665b9f8..50a77d3 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java @@ -20,7 +20,6 @@ package org.apache.pulsar.proxy.server; import io.netty.buffer.ByteBuf; import io.netty.channel.EventLoopGroup; - import org.apache.pulsar.PulsarVersion; import org.apache.pulsar.client.impl.ClientCnx; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; @@ -48,8 +47,8 @@ public class ProxyClientCnx extends ClientCnx { @Override protected ByteBuf newConnectCommand() throws Exception { if (log.isDebugEnabled()) { - log.debug("New Connection opened via ProxyClientCnx with params clientAuthRole = {}," + - " clientAuthData = {}, clientAuthMethod = {}", + log.debug("New Connection opened via ProxyClientCnx with params clientAuthRole = {}," + + " clientAuthData = {}, clientAuthMethod = {}", clientAuthRole, clientAuthData, clientAuthMethod); } diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java index 5238c46..41ec92f 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java @@ -26,10 +26,9 @@ import java.util.Optional; import java.util.Properties; import java.util.Set; import java.util.TreeSet; -import java.util.stream.Collectors; import java.util.regex.Matcher; import java.util.regex.Pattern; - +import java.util.stream.Collectors; import lombok.Getter; import lombok.Setter; import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider; @@ -149,7 +148,7 @@ public class ProxyConfiguration implements PulsarConfiguration { ) private String advertisedAddress; - @FieldContext(category=CATEGORY_SERVER, + @FieldContext(category = CATEGORY_SERVER, doc = "Enable or disable the proxy protocol.") private boolean haProxyProtocolEnabled; @@ -300,7 +299,7 @@ public class ProxyConfiguration implements PulsarConfiguration { ) private String anonymousUserRole = null; - /***** --- TLS --- ****/ + // TLS @Deprecated private boolean tlsEnabledInProxy = false; @@ -361,7 +360,7 @@ public class ProxyConfiguration implements PulsarConfiguration { ) private boolean tlsRequireTrustedClientCertOnConnect = false; - /**** --- KeyStore TLS config variables --- ****/ + // KeyStore TLS config variables @FieldContext( category = CATEGORY_KEYSTORE_TLS, @@ -411,7 +410,9 @@ public class ProxyConfiguration implements PulsarConfiguration { ) private String tlsTrustStorePassword = null; - /**** --- KeyStore TLS config variables used for proxy to auth with broker--- ****/ + /** + * KeyStore TLS config variables used for proxy to auth with broker. + */ @FieldContext( category = CATEGORY_KEYSTORE_TLS, doc = "Whether the Pulsar proxy use KeyStore type to authenticate with Pulsar brokers" @@ -459,7 +460,7 @@ public class ProxyConfiguration implements PulsarConfiguration { ) private Set<String> brokerClientTlsProtocols = new TreeSet<>(); - /***** --- HTTP --- ****/ + // HTTP @FieldContext( category = CATEGORY_HTTP, @@ -478,7 +479,7 @@ public class ProxyConfiguration implements PulsarConfiguration { + "proxy, this should be set to the minimum value, 1, so that clients " + "see the data as soon as possible." ) - private int httpOutputBufferSize = 32*1024; + private int httpOutputBufferSize = 32 * 1024; @FieldContext( minValue = 1, @@ -532,15 +533,16 @@ public class ProxyConfiguration implements PulsarConfiguration { private Set<String> additionalServlets = new TreeSet<>(); @FieldContext( - category = CATEGORY_HTTP, + category = CATEGORY_HTTP, doc = "Enable the enforcement of limits on the incoming HTTP requests" - ) + ) private boolean httpRequestsLimitEnabled = false; @FieldContext( - category = CATEGORY_HTTP, - doc = "Max HTTP requests per seconds allowed. The excess of requests will be rejected with HTTP code 429 (Too many requests)" - ) + category = CATEGORY_HTTP, + doc = "Max HTTP requests per seconds allowed." + + " The excess of requests will be rejected with HTTP code 429 (Too many requests)" + ) private double httpRequestsMaxPerSecond = 100.0; @PropertiesContext( @@ -587,7 +589,7 @@ public class ProxyConfiguration implements PulsarConfiguration { ) private boolean useSeparateThreadPoolForProxyExtensions = true; - /***** --- WebSocket --- ****/ + /***** --- WebSocket. --- ****/ @FieldContext( category = CATEGORY_WEBSOCKET, doc = "Enable or disable the WebSocket servlet" diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java index 87d84f7..a6dba1c 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java @@ -19,15 +19,19 @@ package org.apache.pulsar.proxy.server; import static com.google.common.base.Preconditions.checkArgument; - +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.haproxy.HAProxyMessage; +import io.netty.handler.ssl.SslHandler; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; import java.net.SocketAddress; import java.util.concurrent.TimeUnit; - import java.util.function.Supplier; import javax.naming.AuthenticationException; import javax.net.ssl.SSLSession; - -import io.netty.handler.codec.haproxy.HAProxyMessage; +import lombok.Getter; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.broker.authentication.AuthenticationProvider; import org.apache.pulsar.broker.authentication.AuthenticationState; @@ -40,29 +44,21 @@ import org.apache.pulsar.client.impl.PulsarChannelInitializer; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.common.api.AuthData; -import org.apache.pulsar.common.protocol.Commands; -import org.apache.pulsar.common.protocol.PulsarHandler; import org.apache.pulsar.common.api.proto.CommandAuthResponse; import org.apache.pulsar.common.api.proto.CommandConnect; +import org.apache.pulsar.common.api.proto.CommandGetSchema; import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace; import org.apache.pulsar.common.api.proto.CommandLookupTopic; -import org.apache.pulsar.common.api.proto.CommandGetSchema; import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadata; import org.apache.pulsar.common.api.proto.ProtocolVersion; import org.apache.pulsar.common.api.proto.ServerError; +import org.apache.pulsar.common.protocol.Commands; +import org.apache.pulsar.common.protocol.PulsarHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelHandler; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.ssl.SslHandler; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.FutureListener; -import lombok.Getter; - /** - * Handles incoming discovery request from client and sends appropriate response back to client + * Handles incoming discovery request from client and sends appropriate response back to client. * */ public class ProxyConnection extends PulsarHandler implements FutureListener<Void> { @@ -125,10 +121,10 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { super.channelRegistered(ctx); - ProxyService.activeConnections.inc(); - if (ProxyService.activeConnections.get() > service.getConfiguration().getMaxConcurrentInboundConnections()) { + ProxyService.ACTIVE_CONNECTIONS.inc(); + if (ProxyService.ACTIVE_CONNECTIONS.get() > service.getConfiguration().getMaxConcurrentInboundConnections()) { ctx.close(); - ProxyService.rejectedConnections.inc(); + ProxyService.REJECTED_CONNECTIONS.inc(); return; } } @@ -136,13 +132,13 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { super.channelUnregistered(ctx); - ProxyService.activeConnections.dec(); + ProxyService.ACTIVE_CONNECTIONS.dec(); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { super.channelActive(ctx); - ProxyService.newConnections.inc(); + ProxyService.NEW_CONNECTIONS.inc(); service.getClientCnxs().add(this); LOG.info("[{}] New connection opened", remoteAddress); } @@ -195,11 +191,11 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi case ProxyConnectionToBroker: // Pass the buffer to the outbound connection and schedule next read // only if we can write on the connection - ProxyService.opsCounter.inc(); + ProxyService.OPS_COUNTER.inc(); if (msg instanceof ByteBuf) { int bytes = ((ByteBuf) msg).readableBytes(); directProxyHandler.getInboundChannelRequestsRate().recordEvent(bytes); - ProxyService.bytesCounter.inc(bytes); + ProxyService.BYTES_COUNTER.inc(bytes); } directProxyHandler.outboundChannel.writeAndFlush(msg).addListener(this); break; @@ -400,7 +396,7 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi } /** - * handles discovery request from client ands sends next active broker address + * handles discovery request from client ands sends next active broker address. */ @Override protected void handleLookup(CommandLookupTopic lookup) { diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnectionPool.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnectionPool.java index cd1b31d..4dcb095 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnectionPool.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnectionPool.java @@ -18,10 +18,10 @@ */ package org.apache.pulsar.proxy.server; +import io.netty.channel.EventLoopGroup; import java.io.IOException; import java.util.concurrent.ExecutionException; import java.util.function.Supplier; - import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.impl.ClientCnx; import org.apache.pulsar.client.impl.ConnectionPool; @@ -29,8 +29,6 @@ import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.netty.channel.EventLoopGroup; - public class ProxyConnectionPool extends ConnectionPool { public ProxyConnectionPool(ClientConfigurationData clientConfig, EventLoopGroup eventLoopGroup, Supplier<ClientCnx> clientCnxSupplier) throws PulsarClientException { diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java index 778f42b..2af7ebf 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java @@ -28,9 +28,9 @@ import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.socket.SocketChannel; -import io.netty.util.concurrent.DefaultThreadFactory; import io.netty.util.HashedWheelTimer; import io.netty.util.Timer; +import io.netty.util.concurrent.DefaultThreadFactory; import io.prometheus.client.Counter; import io.prometheus.client.Gauge; import java.io.Closeable; @@ -69,7 +69,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Pulsar proxy service + * Pulsar proxy service. */ public class ProxyService implements Closeable { @@ -108,22 +108,22 @@ public class ProxyService implements Closeable { private static final int numThreads = Runtime.getRuntime().availableProcessors(); - static final Gauge activeConnections = Gauge + static final Gauge ACTIVE_CONNECTIONS = Gauge .build("pulsar_proxy_active_connections", "Number of connections currently active in the proxy").create() .register(); - static final Counter newConnections = Counter + static final Counter NEW_CONNECTIONS = Counter .build("pulsar_proxy_new_connections", "Counter of connections being opened in the proxy").create() .register(); - static final Counter rejectedConnections = Counter + static final Counter REJECTED_CONNECTIONS = Counter .build("pulsar_proxy_rejected_connections", "Counter for connections rejected due to throttling").create() .register(); - static final Counter opsCounter = Counter + static final Counter OPS_COUNTER = Counter .build("pulsar_proxy_binary_ops", "Counter of proxy operations").create().register(); - static final Counter bytesCounter = Counter + static final Counter BYTES_COUNTER = Counter .build("pulsar_proxy_binary_bytes", "Counter of proxy bytes").create().register(); @Getter @@ -137,11 +137,12 @@ public class ProxyService implements Closeable { AuthenticationService authenticationService) throws Exception { requireNonNull(proxyConfig); this.proxyConfig = proxyConfig; - this.timer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-timer", Thread.currentThread().isDaemon()), 1, TimeUnit.MILLISECONDS); + this.timer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-timer", + Thread.currentThread().isDaemon()), 1, TimeUnit.MILLISECONDS); this.clientCnxs = Sets.newConcurrentHashSet(); this.topicStats = new ConcurrentHashMap<>(); - this.lookupRequestSemaphore = new AtomicReference<Semaphore>( + this.lookupRequestSemaphore = new AtomicReference<>( new Semaphore(proxyConfig.getMaxConcurrentLookupRequests(), false)); if (proxyConfig.getProxyLogLevel().isPresent()) { @@ -208,7 +209,8 @@ public class ProxyService implements Closeable { // Bind and start to accept incoming connections. if (proxyConfig.getServicePort().isPresent()) { try { - listenChannel = bootstrap.bind(proxyConfig.getBindAddress(), proxyConfig.getServicePort().get()).sync().channel(); + listenChannel = bootstrap.bind(proxyConfig.getBindAddress(), + proxyConfig.getServicePort().get()).sync().channel(); LOG.info("Started Pulsar Proxy at {}", listenChannel.localAddress()); } catch (Exception e) { throw new IOException("Failed to bind Pulsar Proxy on port " + proxyConfig.getServicePort().get(), e); @@ -218,7 +220,8 @@ public class ProxyService implements Closeable { if (proxyConfig.getServicePortTls().isPresent()) { ServerBootstrap tlsBootstrap = bootstrap.clone(); tlsBootstrap.childHandler(new ServiceChannelInitializer(this, proxyConfig, true)); - listenChannelTls = tlsBootstrap.bind(proxyConfig.getBindAddress(), proxyConfig.getServicePortTls().get()).sync().channel(); + listenChannelTls = tlsBootstrap.bind(proxyConfig.getBindAddress(), + proxyConfig.getServicePortTls().get()).sync().channel(); LOG.info("Started Pulsar TLS Proxy on {}", listenChannelTls.localAddress()); } @@ -247,8 +250,8 @@ public class ProxyService implements Closeable { } // This call is used for starting additional protocol handlers - public void startProxyExtensions( - Map<String, Map<InetSocketAddress, ChannelInitializer<SocketChannel>>> protocolHandlers, ServerBootstrap serverBootstrap) { + public void startProxyExtensions(Map<String, Map<InetSocketAddress, + ChannelInitializer<SocketChannel>>> protocolHandlers, ServerBootstrap serverBootstrap) { protocolHandlers.forEach((extensionName, initializers) -> { initializers.forEach((address, initializer) -> { diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java index 315607e..3d81317 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java @@ -25,16 +25,30 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.apache.pulsar.common.stats.JvmMetrics.getJvmDirectMemoryUsed; import static org.slf4j.bridge.SLF4JBridgeHandler.install; import static org.slf4j.bridge.SLF4JBridgeHandler.removeHandlersForRootLogger; - +import com.beust.jcommander.JCommander; +import com.beust.jcommander.Parameter; import com.google.common.annotations.VisibleForTesting; +import io.netty.util.internal.PlatformDependent; +import io.prometheus.client.CollectorRegistry; +import io.prometheus.client.Gauge; +import io.prometheus.client.Gauge.Child; +import io.prometheus.client.exporter.MetricsServlet; +import io.prometheus.client.hotspot.DefaultExports; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; import org.apache.logging.log4j.core.util.datetime.FixedDateFormat; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.authentication.AuthenticationService; -import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServletWithClassLoader; +import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; +import org.apache.pulsar.common.configuration.VipStatus; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.util.CmdGenerateDocs; +import org.apache.pulsar.proxy.stats.ProxyStats; import org.apache.pulsar.websocket.WebSocketConsumerServlet; import org.apache.pulsar.websocket.WebSocketPingPongServlet; import org.apache.pulsar.websocket.WebSocketProducerServlet; @@ -46,27 +60,9 @@ import org.eclipse.jetty.websocket.servlet.WebSocketServlet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.beust.jcommander.JCommander; -import com.beust.jcommander.Parameter; - -import io.netty.util.internal.PlatformDependent; -import io.prometheus.client.CollectorRegistry; -import io.prometheus.client.Gauge; -import io.prometheus.client.Gauge.Child; -import io.prometheus.client.exporter.MetricsServlet; -import io.prometheus.client.hotspot.DefaultExports; -import org.apache.pulsar.common.configuration.VipStatus; -import org.apache.pulsar.proxy.stats.ProxyStats; - -import java.text.DateFormat; -import java.text.SimpleDateFormat; -import java.util.Collection; -import java.util.Collections; -import java.util.Date; - /** - * Starts an instance of the Pulsar ProxyService + * Starts an instance of the Pulsar ProxyService. * */ public class ProxyServiceStarter { @@ -107,7 +103,8 @@ public class ProxyServiceStarter { DateFormat dateFormat = new SimpleDateFormat( FixedDateFormat.FixedFormat.ISO8601_OFFSET_DATE_TIME_HHMM.getPattern()); Thread.setDefaultUncaughtExceptionHandler((thread, exception) -> { - System.out.println(String.format("%s [%s] error Uncaught exception in thread %s: %s", dateFormat.format(new Date()), thread.getContextClassLoader(), thread.getName(), exception.getMessage())); + System.out.printf("%s [%s] error Uncaught exception in thread %s: %s%n", dateFormat.format(new Date()), + thread.getContextClassLoader(), thread.getName(), exception.getMessage()); exception.printStackTrace(System.out); }); @@ -211,10 +208,10 @@ public class ProxyServiceStarter { public void close() { try { - if(proxyService != null) { + if (proxyService != null) { proxyService.close(); } - if(server != null) { + if (server != null) { server.stop(); } } catch (Exception e) { @@ -226,10 +223,12 @@ public class ProxyServiceStarter { ProxyConfiguration config, ProxyService service, BrokerDiscoveryProvider discoveryProvider) throws Exception { - server.addServlet("/metrics", new ServletHolder(MetricsServlet.class), Collections.emptyList(), config.isAuthenticateMetricsEndpoint()); + server.addServlet("/metrics", new ServletHolder(MetricsServlet.class), + Collections.emptyList(), config.isAuthenticateMetricsEndpoint()); server.addRestResources("/", VipStatus.class.getPackage().getName(), VipStatus.ATTRIBUTE_STATUS_FILE_PATH, config.getStatusFilePath()); - server.addRestResources("/proxy-stats", ProxyStats.class.getPackage().getName(), ProxyStats.ATTRIBUTE_PULSAR_PROXY_NAME, service); + server.addRestResources("/proxy-stats", ProxyStats.class.getPackage().getName(), + ProxyStats.ATTRIBUTE_PULSAR_PROXY_NAME, service); AdminProxyHandler adminProxyHandler = new AdminProxyHandler(config, discoveryProvider); ServletHolder servletHolder = new ServletHolder(adminProxyHandler); diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java index 658dd87..55f0b69 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java @@ -19,7 +19,10 @@ package org.apache.pulsar.proxy.server; import static org.apache.commons.lang3.StringUtils.isEmpty; - +import io.netty.channel.ChannelInitializer; +import io.netty.channel.socket.SocketChannel; +import io.netty.handler.codec.LengthFieldBasedFrameDecoder; +import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslHandler; import java.util.function.Supplier; import org.apache.pulsar.client.api.AuthenticationDataProvider; @@ -28,11 +31,6 @@ import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.protocol.OptionalProxyProtocolDecoder; import org.apache.pulsar.common.util.NettyClientSslContextRefresher; import org.apache.pulsar.common.util.NettyServerSslContextBuilder; - -import io.netty.channel.ChannelInitializer; -import io.netty.channel.socket.SocketChannel; -import io.netty.handler.codec.LengthFieldBasedFrameDecoder; -import io.netty.handler.ssl.SslContext; import org.apache.pulsar.common.util.SslContextAutoRefreshBuilder; import org.apache.pulsar.common.util.keystoretls.NettySSLContextAutoRefreshBuilder; @@ -77,7 +75,8 @@ public class ServiceChannelInitializer extends ChannelInitializer<SocketChannel> } else { serverSslCtxRefresher = new NettyServerSslContextBuilder(serviceConfig.isTlsAllowInsecureConnection(), serviceConfig.getTlsTrustCertsFilePath(), serviceConfig.getTlsCertificateFilePath(), - serviceConfig.getTlsKeyFilePath(), serviceConfig.getTlsCiphers(), serviceConfig.getTlsProtocols(), + serviceConfig.getTlsKeyFilePath(), serviceConfig.getTlsCiphers(), + serviceConfig.getTlsProtocols(), serviceConfig.isTlsRequireTrustedClientCertOnConnect(), serviceConfig.getTlsCertRefreshCheckDurationSec()); } diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java index c92f722..687bfaf 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java @@ -31,9 +31,9 @@ import javax.servlet.DispatcherType; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.broker.web.AuthenticationFilter; +import org.apache.pulsar.broker.web.JettyRequestLogFactory; import org.apache.pulsar.broker.web.JsonMapperProvider; import org.apache.pulsar.broker.web.RateLimitingFilter; -import org.apache.pulsar.broker.web.JettyRequestLogFactory; import org.apache.pulsar.broker.web.WebExecutorThreadPool; import org.apache.pulsar.common.util.SecurityUtility; import org.apache.pulsar.common.util.keystoretls.KeyStoreSSLContext; @@ -76,7 +76,7 @@ public class WebServer { private ServerConnector connector; private ServerConnector connectorTls; - public WebServer(ProxyConfiguration config, AuthenticationService authenticationService) throws IOException { + public WebServer(ProxyConfiguration config, AuthenticationService authenticationService) { this.webServiceExecutor = new WebExecutorThreadPool(config.getHttpNumThreads(), "pulsar-external-web"); this.server = new Server(webServiceExecutor); this.authenticationService = authenticationService; @@ -84,12 +84,12 @@ public class WebServer { List<ServerConnector> connectors = new ArrayList<>(); - HttpConfiguration http_config = new HttpConfiguration(); - http_config.setOutputBufferSize(config.getHttpOutputBufferSize()); + HttpConfiguration httpConfig = new HttpConfiguration(); + httpConfig.setOutputBufferSize(config.getHttpOutputBufferSize()); if (config.getWebServicePort().isPresent()) { this.externalServicePort = config.getWebServicePort().get(); - connector = new ServerConnector(server, 1, 1, new HttpConnectionFactory(http_config)); + connector = new ServerConnector(server, 1, 1, new HttpConnectionFactory(httpConfig)); connector.setHost(config.getBindAddress()); connector.setPort(externalServicePort); connectors.add(connector); @@ -146,7 +146,8 @@ public class WebServer { addServlet(basePath, servletHolder, attributes, true); } - public void addServlet(String basePath, ServletHolder servletHolder, List<Pair<String, Object>> attributes, boolean requireAuthentication) { + public void addServlet(String basePath, ServletHolder servletHolder, + List<Pair<String, Object>> attributes, boolean requireAuthentication) { Optional<String> existingPath = servletPaths.stream().filter(p -> p.startsWith(basePath)).findFirst(); if (existingPath.isPresent()) { throw new IllegalArgumentException( diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ExtensionsDefinitions.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/package-info.java similarity index 69% copy from pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ExtensionsDefinitions.java copy to pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/package-info.java index 844c7ca..07a8290 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ExtensionsDefinitions.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/package-info.java @@ -16,21 +16,4 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.proxy.extensions; - -import lombok.Data; -import lombok.experimental.Accessors; - -import java.util.Map; -import java.util.TreeMap; - -/** - * The collection of Proxy Extensions. - */ -@Data -@Accessors(fluent = true) -class ExtensionsDefinitions { - - private final Map<String, ProxyExtensionMetadata> extensions = new TreeMap<>(); - -} +package org.apache.pulsar.proxy.server; \ No newline at end of file diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/ProxyStats.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/ProxyStats.java index f709a25..d021389 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/ProxyStats.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/ProxyStats.java @@ -18,28 +18,27 @@ */ package org.apache.pulsar.proxy.stats; +import io.netty.channel.Channel; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiResponse; +import io.swagger.annotations.ApiResponses; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Optional; - import javax.servlet.ServletContext; import javax.ws.rs.GET; -import javax.ws.rs.Path; -import javax.ws.rs.Produces; import javax.ws.rs.POST; +import javax.ws.rs.Path; import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response.Status; - import org.apache.pulsar.proxy.server.ProxyService; -import io.netty.channel.Channel; -import io.swagger.annotations.Api; -import io.swagger.annotations.ApiOperation; -import io.swagger.annotations.ApiResponse; -import io.swagger.annotations.ApiResponses; + @Path("/") @Api(value = "/proxy-stats", description = "Stats for proxy", tags = "proxy-stats", hidden = true) @@ -55,7 +54,8 @@ public class ProxyStats { @GET @Path("/connections") - @ApiOperation(value = "Proxy stats api to get info for live connections", response = List.class, responseContainer = "List") + @ApiOperation(value = "Proxy stats api to get info for live connections", + response = List.class, responseContainer = "List") @ApiResponses(value = { @ApiResponse(code = 503, message = "Proxy service is not initialized") }) public List<ConnectionStats> metrics() { List<ConnectionStats> stats = new ArrayList<>(); @@ -88,7 +88,8 @@ public class ProxyStats { @POST @Path("/logging/{logLevel}") - @ApiOperation(hidden = true, value = "Change proxy logging level dynamically", notes = "It only changes log-level in memory, change it config file to persist the change") + @ApiOperation(hidden = true, value = "Change proxy logging level dynamically", + notes = "It only changes log-level in memory, change it config file to persist the change") @ApiResponses(value = { @ApiResponse(code = 412, message = "Proxy log level can be [0-2]"), }) public void updateProxyLogLevel(@PathParam("logLevel") int logLevel) { if (logLevel < 0 || logLevel > 2) { diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/RestException.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/RestException.java index 9769588..7ba8d99 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/RestException.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/RestException.java @@ -20,11 +20,9 @@ package org.apache.pulsar.proxy.stats; import java.io.PrintWriter; import java.io.StringWriter; - import javax.ws.rs.WebApplicationException; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; - import org.apache.pulsar.common.policies.data.ErrorData; /** diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/TopicStats.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/TopicStats.java index 09b04fc..1c808a8 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/TopicStats.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/TopicStats.java @@ -18,11 +18,9 @@ */ package org.apache.pulsar.proxy.stats; -import org.apache.pulsar.common.stats.Rate; - import com.fasterxml.jackson.annotation.JsonIgnoreProperties; - import lombok.Getter; +import org.apache.pulsar.common.stats.Rate; @Getter @JsonIgnoreProperties(value = { "msgInRate", "msgOutRate" }) diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ExtensionsDefinitions.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/package-info.java similarity index 69% copy from pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ExtensionsDefinitions.java copy to pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/package-info.java index 844c7ca..2eb5cf8 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ExtensionsDefinitions.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/package-info.java @@ -16,21 +16,4 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.proxy.extensions; - -import lombok.Data; -import lombok.experimental.Accessors; - -import java.util.Map; -import java.util.TreeMap; - -/** - * The collection of Proxy Extensions. - */ -@Data -@Accessors(fluent = true) -class ExtensionsDefinitions { - - private final Map<String, ProxyExtensionMetadata> extensions = new TreeMap<>(); - -} +package org.apache.pulsar.proxy.stats; \ No newline at end of file diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ExtensionsDefinitions.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/util/package-info.java similarity index 69% copy from pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ExtensionsDefinitions.java copy to pulsar-proxy/src/main/java/org/apache/pulsar/proxy/util/package-info.java index 844c7ca..eddea63 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ExtensionsDefinitions.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/util/package-info.java @@ -16,21 +16,4 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.proxy.extensions; - -import lombok.Data; -import lombok.experimental.Accessors; - -import java.util.Map; -import java.util.TreeMap; - -/** - * The collection of Proxy Extensions. - */ -@Data -@Accessors(fluent = true) -class ExtensionsDefinitions { - - private final Map<String, ProxyExtensionMetadata> extensions = new TreeMap<>(); - -} +package org.apache.pulsar.proxy.util; \ No newline at end of file diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java index 062db18..098d892 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java @@ -91,7 +91,7 @@ public class ProxyConnectionThrottlingTest extends MockedPulsarServiceBaseTest { .operationTimeout(1000, TimeUnit.MILLISECONDS) .build(); - Assert.assertEquals(ProxyService.rejectedConnections.get(), 0.0d); + Assert.assertEquals(ProxyService.REJECTED_CONNECTIONS.get(), 0.0d); try { @Cleanup Producer<byte[]> producer2 = client2.newProducer(Schema.BYTES).topic("persistent://sample/test/local/producer-topic-1").create(); @@ -101,7 +101,7 @@ public class ProxyConnectionThrottlingTest extends MockedPulsarServiceBaseTest { // OK } // should add retry count since retry every 100ms and operation timeout is set to 1000ms - Assert.assertEquals(ProxyService.rejectedConnections.get(), 5.0d); + Assert.assertEquals(ProxyService.REJECTED_CONNECTIONS.get(), 5.0d); } private static final Logger LOG = LoggerFactory.getLogger(ProxyConnectionThrottlingTest.class); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java index fa3c485..25df835 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java @@ -99,7 +99,7 @@ public class ProxyLookupThrottlingTest extends MockedPulsarServiceBaseTest { } catch (Exception ex) { // Ignore } - Assert.assertEquals(LookupProxyHandler.rejectedPartitionsMetadataRequests.get(), 5.0d); + Assert.assertEquals(LookupProxyHandler.REJECTED_PARTITIONS_METADATA_REQUESTS.get(), 5.0d); proxyService.getLookupRequestSemaphore().release(); try { @Cleanup @@ -109,6 +109,6 @@ public class ProxyLookupThrottlingTest extends MockedPulsarServiceBaseTest { Assert.fail("Should not have failed since can acquire LookupRequestSemaphore"); } - Assert.assertEquals(LookupProxyHandler.rejectedPartitionsMetadataRequests.get(), 5.0d); + Assert.assertEquals(LookupProxyHandler.REJECTED_PARTITIONS_METADATA_REQUESTS.get(), 5.0d); } }