[GitHub] rdhabalia commented on a change in pull request #1002: Making Pulsar Proxy more secure
rdhabalia commented on a change in pull request #1002: Making Pulsar Proxy more secure URL: https://github.com/apache/incubator-pulsar/pull/1002#discussion_r164619782 ## File path: pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java ## @@ -54,6 +57,8 @@ public LookupProxyHandler(ProxyService proxy, ProxyConnection proxyConnection) { this.proxyConnection = proxyConnection; this.clientAddress = proxyConnection.clientAddress(); this.connectWithTLS = proxy.getConfiguration().isTlsEnabledWithBroker(); +this.brokerServiceURL = this.connectWithTLS ? proxy.getConfiguration().getBrokerServiceURLTLS() Review comment: should we also add another check `this.connectWithTLS && isNotBlank(proxy.getConfiguration().getBrokerServiceURLTLS()) ? proxy.getConfiguration().getBrokerServiceURLTLS() : proxy.getConfiguration().getBrokerServiceURL()` ? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] rdhabalia commented on a change in pull request #1002: Making Pulsar Proxy more secure
rdhabalia commented on a change in pull request #1002: Making Pulsar Proxy more secure URL: https://github.com/apache/incubator-pulsar/pull/1002#discussion_r164620502 ## File path: pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationManager.java ## @@ -150,8 +153,46 @@ public boolean canLookup(DestinationName destination, String role) throws Except return canProduce(destination, role) || canConsume(destination, role, null); } -private CompletableFuture checkAuthorization(DestinationName destination, String role, -AuthAction action) { +/** + * Check whether the specified role can perform a lookup for the specified destination. + * + * For that the caller needs to have producer or consumer permission. + * + * @param destination + * @param role + * @return + * @throws Exception + */ +public CompletableFuture canLookupAsync(DestinationName destination, String role) { +CompletableFuture finalResult = new CompletableFuture(); +canProduceAsync(destination, role).whenComplete((produceAuthorized, ex) -> { +if (ex == null) { +if (produceAuthorized) { +finalResult.complete(produceAuthorized); +return; +} +} else if (log.isDebugEnabled()) { Review comment: ``` else { if (log.isDebugEnabled()) { : } } ``` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] rdhabalia commented on a change in pull request #1002: Making Pulsar Proxy more secure
rdhabalia commented on a change in pull request #1002: Making Pulsar Proxy more secure URL: https://github.com/apache/incubator-pulsar/pull/1002#discussion_r164201777 ## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java ## @@ -98,7 +98,7 @@ private String clientVersion = null; private int nonPersistentPendingMessages = 0; private final int MaxNonPersistentPendingMessages; -private String originalPrincipal; +private String proxyClientAuthRole = null; Review comment: In future, if we have another usecase such as proxy eg: websocket then are we going to create additional placeholder or we will use reuse the same one by renaming it something else? >> Principal is specific to athens I think [principal](https://en.wikipedia.org/wiki/Principal_(computer_security)) a generic term in auth. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] rdhabalia commented on a change in pull request #1002: Making Pulsar Proxy more secure
rdhabalia commented on a change in pull request #1002: Making Pulsar Proxy more secure URL: https://github.com/apache/incubator-pulsar/pull/1002#discussion_r164225641 ## File path: pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java ## @@ -55,7 +59,7 @@ private Set authenticationProviders = Sets.newTreeSet(); // Enforce authorization private boolean authorizationEnabled = false; - + Review comment: extra space? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] rdhabalia commented on a change in pull request #1002: Making Pulsar Proxy more secure
rdhabalia commented on a change in pull request #1002: Making Pulsar Proxy more secure URL: https://github.com/apache/incubator-pulsar/pull/1002#discussion_r164200457 ## File path: pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationManager.java ## @@ -150,8 +153,60 @@ public boolean canLookup(DestinationName destination, String role) throws Except return canProduce(destination, role) || canConsume(destination, role, null); } -private CompletableFuture checkAuthorization(DestinationName destination, String role, -AuthAction action) { +/** + * Check whether the specified role can perform a lookup for the specified destination. + * + * For that the caller needs to have producer or consumer permission. + * + * @param destination + * @param role + * @return + * @throws Exception + */ +public CompletableFuture canLookupAsync(DestinationName destination, String role) { +CompletableFuture produceFuture = canProduceAsync(destination, role); +CompletableFuture consumeFuture = canConsumeAsync(destination, role, null); +CompletableFuture finalResult = new CompletableFuture(); +AtomicBoolean futureCompleted = new AtomicBoolean(false); +produceFuture.whenComplete((authorized, ex) -> { +synchronized (this) { +if (ex == null) { +if (authorized) { +finalResult.complete(authorized); +return; +} +} else { +log.debug("Destination [{}] Role [{}] exception occured while trying to check Produce permissions", +destination.toString(), role, ex); +} +if (futureCompleted.get()) { Review comment: instead using `futureCompleted` boolean, should we use `finalResult.isDone()`? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] rdhabalia commented on a change in pull request #1002: Making Pulsar Proxy more secure
rdhabalia commented on a change in pull request #1002: Making Pulsar Proxy more secure URL: https://github.com/apache/incubator-pulsar/pull/1002#discussion_r164200841 ## File path: pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationManager.java ## @@ -150,8 +153,60 @@ public boolean canLookup(DestinationName destination, String role) throws Except return canProduce(destination, role) || canConsume(destination, role, null); } -private CompletableFuture checkAuthorization(DestinationName destination, String role, -AuthAction action) { +/** + * Check whether the specified role can perform a lookup for the specified destination. + * + * For that the caller needs to have producer or consumer permission. + * + * @param destination + * @param role + * @return + * @throws Exception + */ +public CompletableFuture canLookupAsync(DestinationName destination, String role) { +CompletableFuture produceFuture = canProduceAsync(destination, role); +CompletableFuture consumeFuture = canConsumeAsync(destination, role, null); +CompletableFuture finalResult = new CompletableFuture(); +AtomicBoolean futureCompleted = new AtomicBoolean(false); +produceFuture.whenComplete((authorized, ex) -> { +synchronized (this) { +if (ex == null) { +if (authorized) { +finalResult.complete(authorized); +return; +} +} else { +log.debug("Destination [{}] Role [{}] exception occured while trying to check Produce permissions", +destination.toString(), role, ex); +} +if (futureCompleted.get()) { +finalResult.complete(false); +} +futureCompleted.set(true); +} +}); + +consumeFuture.whenComplete((authorized, ex) -> { Review comment: I think checking `produceFuture` and `consumeFuture` would be very quick.. so, instead checking both in parallel, should we check `consumeFuture` once `produceFuture` completes with failure? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] rdhabalia commented on a change in pull request #1002: Making Pulsar Proxy more secure
rdhabalia commented on a change in pull request #1002: Making Pulsar Proxy more secure URL: https://github.com/apache/incubator-pulsar/pull/1002#discussion_r164227063 ## File path: pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java ## @@ -110,21 +112,23 @@ public ProxyService(ProxyConfiguration proxyConfig) throws IOException { } public void start() throws Exception { -localZooKeeperConnectionService = new LocalZooKeeperConnectionService(getZooKeeperClientFactory(), -proxyConfig.getZookeeperServers(), proxyConfig.getZookeeperSessionTimeoutMs()); -localZooKeeperConnectionService.start(new ShutdownService() { -@Override -public void shutdown(int exitCode) { -LOG.error("Lost local ZK session. Shutting down the proxy"); -Runtime.getRuntime().halt(-1); -} -}); - -discoveryProvider = new BrokerDiscoveryProvider(this.proxyConfig, getZooKeeperClientFactory()); -this.configurationCacheService = new ConfigurationCacheService(discoveryProvider.globalZkCache); ServiceConfiguration serviceConfiguration = PulsarConfigurationLoader.convertFrom(proxyConfig); authenticationService = new AuthenticationService(serviceConfiguration); -authorizationManager = new AuthorizationManager(serviceConfiguration, configurationCacheService); + +if (!isEmpty(proxyConfig.getZookeeperServers()) && !isEmpty(proxyConfig.getGlobalZookeeperServers())) { Review comment: should we use `isBlank()` instead `isEmpty()` because user may make mistake by providing space (eg: I can see zk-dummy-value as space in your test case as well) in the string.?? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] rdhabalia commented on a change in pull request #1002: Making Pulsar Proxy more secure
rdhabalia commented on a change in pull request #1002: Making Pulsar Proxy more secure URL: https://github.com/apache/incubator-pulsar/pull/1002#discussion_r164199836 ## File path: pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationManager.java ## @@ -150,8 +153,60 @@ public boolean canLookup(DestinationName destination, String role) throws Except return canProduce(destination, role) || canConsume(destination, role, null); } -private CompletableFuture checkAuthorization(DestinationName destination, String role, -AuthAction action) { +/** + * Check whether the specified role can perform a lookup for the specified destination. + * + * For that the caller needs to have producer or consumer permission. + * + * @param destination + * @param role + * @return + * @throws Exception + */ +public CompletableFuture canLookupAsync(DestinationName destination, String role) { +CompletableFuture produceFuture = canProduceAsync(destination, role); +CompletableFuture consumeFuture = canConsumeAsync(destination, role, null); +CompletableFuture finalResult = new CompletableFuture(); +AtomicBoolean futureCompleted = new AtomicBoolean(false); +produceFuture.whenComplete((authorized, ex) -> { +synchronized (this) { +if (ex == null) { +if (authorized) { +finalResult.complete(authorized); +return; +} +} else { +log.debug("Destination [{}] Role [{}] exception occured while trying to check Produce permissions", Review comment: `if (log.isDebugEnabled()){}` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] rdhabalia commented on a change in pull request #1002: Making Pulsar Proxy more secure
rdhabalia commented on a change in pull request #1002: Making Pulsar Proxy more secure URL: https://github.com/apache/incubator-pulsar/pull/1002#discussion_r164223737 ## File path: pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java ## @@ -64,20 +69,25 @@ public void handleLookup(CommandLookupTopic lookup) { lookupRequests.inc(); long clientRequestId = lookup.getRequestId(); String topic = lookup.getTopic(); - -ServiceLookupData availableBroker = null; -try { -availableBroker = service.getDiscoveryProvider().nextBroker(); -} catch (Exception e) { -log.warn("[{}] Failed to get next active broker {}", clientAddress, e.getMessage(), e); -proxyConnection.ctx().writeAndFlush( - Commands.newLookupErrorResponse(ServerError.ServiceNotReady, e.getMessage(), clientRequestId)); -return; +if (isBlank(brokerServiceURL)) { +ServiceLookupData availableBroker = null; +try { +availableBroker = service.getDiscoveryProvider().nextBroker(); +} catch (Exception e) { +log.warn("[{}] Failed to get next active broker {}", clientAddress, e.getMessage(), e); +proxyConnection.ctx().writeAndFlush( + Commands.newLookupErrorResponse(ServerError.ServiceNotReady, e.getMessage(), clientRequestId)); +return; +} +performLookup(clientRequestId, topic, +this.connectWithTLS ? availableBroker.getPulsarServiceUrlTls() : availableBroker.getPulsarServiceUrl(), +false, 10); +} else { +performLookup(clientRequestId, topic, Review comment: can we take `performLookup(..)` out of the condition and we can derive url in if-else block so, we can avoid duplicate logic? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] rdhabalia commented on a change in pull request #1002: Making Pulsar Proxy more secure
rdhabalia commented on a change in pull request #1002: Making Pulsar Proxy more secure URL: https://github.com/apache/incubator-pulsar/pull/1002#discussion_r164210973 ## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java ## @@ -175,73 +175,116 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E @Override protected void handleLookup(CommandLookupTopic lookup) { final long requestId = lookup.getRequestId(); -final String topic = lookup.getTopic(); +final String topicName = lookup.getTopic(); if (log.isDebugEnabled()) { -log.debug("[{}] Received Lookup from {} for {}", topic, remoteAddress, requestId); +log.debug("[{}] Received Lookup from {} for {}", topicName, remoteAddress, requestId); } + final Semaphore lookupSemaphore = service.getLookupRequestSemaphore(); if (lookupSemaphore.tryAcquire()) { -lookupDestinationAsync(getBrokerService().pulsar(), DestinationName.get(topic), lookup.getAuthoritative(), -getRole(), lookup.getRequestId()).handle((lookupResponse, ex) -> { -if (ex == null) { -ctx.writeAndFlush(lookupResponse); -} else { -// it should never happen -log.warn("[{}] lookup failed with error {}, {}", remoteAddress, topic, ex.getMessage(), ex); -ctx.writeAndFlush( - newLookupErrorResponse(ServerError.ServiceNotReady, ex.getMessage(), requestId)); -} -lookupSemaphore.release(); -return null; -}); +final String proxyClientAuthRole = lookup.hasOriginalPrincipal() ? lookup.getOriginalPrincipal() +: this.proxyClientAuthRole; +CompletableFuture isProxyAuthorizedFuture; +if (service.isAuthorizationEnabled() && proxyClientAuthRole != null) { +isProxyAuthorizedFuture = service.getAuthorizationManager() +.canLookupAsync(DestinationName.get(topicName), authRole); +} else { +isProxyAuthorizedFuture = CompletableFuture.completedFuture(true); +} + +isProxyAuthorizedFuture.thenApply(isProxyAuthorized -> { Review comment: we are not completing request if `isProxyAuthorizedFuture` fails because `thenApply` will only triggered below logic if `isProxyAuthorizedFuture` is successful.?? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] rdhabalia commented on a change in pull request #1002: Making Pulsar Proxy more secure
rdhabalia commented on a change in pull request #1002: Making Pulsar Proxy more secure URL: https://github.com/apache/incubator-pulsar/pull/1002#discussion_r164212953 ## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java ## @@ -175,73 +175,116 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E @Override protected void handleLookup(CommandLookupTopic lookup) { final long requestId = lookup.getRequestId(); -final String topic = lookup.getTopic(); +final String topicName = lookup.getTopic(); if (log.isDebugEnabled()) { -log.debug("[{}] Received Lookup from {} for {}", topic, remoteAddress, requestId); +log.debug("[{}] Received Lookup from {} for {}", topicName, remoteAddress, requestId); } + final Semaphore lookupSemaphore = service.getLookupRequestSemaphore(); if (lookupSemaphore.tryAcquire()) { -lookupDestinationAsync(getBrokerService().pulsar(), DestinationName.get(topic), lookup.getAuthoritative(), -getRole(), lookup.getRequestId()).handle((lookupResponse, ex) -> { -if (ex == null) { -ctx.writeAndFlush(lookupResponse); -} else { -// it should never happen -log.warn("[{}] lookup failed with error {}, {}", remoteAddress, topic, ex.getMessage(), ex); -ctx.writeAndFlush( - newLookupErrorResponse(ServerError.ServiceNotReady, ex.getMessage(), requestId)); -} -lookupSemaphore.release(); -return null; -}); +final String proxyClientAuthRole = lookup.hasOriginalPrincipal() ? lookup.getOriginalPrincipal() +: this.proxyClientAuthRole; +CompletableFuture isProxyAuthorizedFuture; +if (service.isAuthorizationEnabled() && proxyClientAuthRole != null) { Review comment: i think `isRequestViaProxy()` also checks `proxyClientAuthRole != null`.. so, either we can use that method or we can remove that method if it just checks the boolean and can avoid one function call? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] rdhabalia commented on a change in pull request #1002: Making Pulsar Proxy more secure
rdhabalia commented on a change in pull request #1002: Making Pulsar Proxy more secure URL: https://github.com/apache/incubator-pulsar/pull/1002#discussion_r164204429 ## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java ## @@ -175,73 +175,116 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E @Override protected void handleLookup(CommandLookupTopic lookup) { final long requestId = lookup.getRequestId(); -final String topic = lookup.getTopic(); +final String topicName = lookup.getTopic(); if (log.isDebugEnabled()) { -log.debug("[{}] Received Lookup from {} for {}", topic, remoteAddress, requestId); +log.debug("[{}] Received Lookup from {} for {}", topicName, remoteAddress, requestId); } + final Semaphore lookupSemaphore = service.getLookupRequestSemaphore(); if (lookupSemaphore.tryAcquire()) { -lookupDestinationAsync(getBrokerService().pulsar(), DestinationName.get(topic), lookup.getAuthoritative(), -getRole(), lookup.getRequestId()).handle((lookupResponse, ex) -> { -if (ex == null) { -ctx.writeAndFlush(lookupResponse); -} else { -// it should never happen -log.warn("[{}] lookup failed with error {}, {}", remoteAddress, topic, ex.getMessage(), ex); -ctx.writeAndFlush( - newLookupErrorResponse(ServerError.ServiceNotReady, ex.getMessage(), requestId)); -} -lookupSemaphore.release(); -return null; -}); +final String proxyClientAuthRole = lookup.hasOriginalPrincipal() ? lookup.getOriginalPrincipal() Review comment: shouldn't we get `originalPrincipal` and `authRole` at the time of connect and might not need to again for any further operations like `CommandPartitionedTopicMetadata` and `CommandLookupTopic`? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] rdhabalia commented on a change in pull request #1002: Making Pulsar Proxy more secure
rdhabalia commented on a change in pull request #1002: Making Pulsar Proxy more secure URL: https://github.com/apache/incubator-pulsar/pull/1002#discussion_r164213206 ## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java ## @@ -175,73 +175,116 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E @Override protected void handleLookup(CommandLookupTopic lookup) { final long requestId = lookup.getRequestId(); -final String topic = lookup.getTopic(); +final String topicName = lookup.getTopic(); if (log.isDebugEnabled()) { -log.debug("[{}] Received Lookup from {} for {}", topic, remoteAddress, requestId); +log.debug("[{}] Received Lookup from {} for {}", topicName, remoteAddress, requestId); } + final Semaphore lookupSemaphore = service.getLookupRequestSemaphore(); if (lookupSemaphore.tryAcquire()) { -lookupDestinationAsync(getBrokerService().pulsar(), DestinationName.get(topic), lookup.getAuthoritative(), -getRole(), lookup.getRequestId()).handle((lookupResponse, ex) -> { -if (ex == null) { -ctx.writeAndFlush(lookupResponse); -} else { -// it should never happen -log.warn("[{}] lookup failed with error {}, {}", remoteAddress, topic, ex.getMessage(), ex); -ctx.writeAndFlush( - newLookupErrorResponse(ServerError.ServiceNotReady, ex.getMessage(), requestId)); -} -lookupSemaphore.release(); -return null; -}); +final String proxyClientAuthRole = lookup.hasOriginalPrincipal() ? lookup.getOriginalPrincipal() +: this.proxyClientAuthRole; +CompletableFuture isProxyAuthorizedFuture; +if (service.isAuthorizationEnabled() && proxyClientAuthRole != null) { +isProxyAuthorizedFuture = service.getAuthorizationManager() +.canLookupAsync(DestinationName.get(topicName), authRole); +} else { +isProxyAuthorizedFuture = CompletableFuture.completedFuture(true); +} + +isProxyAuthorizedFuture.thenApply(isProxyAuthorized -> { Review comment: can we also write a unit test case some -ve unit test case where `service.getAuthorizationManager(). canLookupAsync()` fails with exception? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] rdhabalia commented on a change in pull request #1002: Making Pulsar Proxy more secure
rdhabalia commented on a change in pull request #1002: Making Pulsar Proxy more secure URL: https://github.com/apache/incubator-pulsar/pull/1002#discussion_r164225775 ## File path: pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java ## @@ -170,6 +170,7 @@ protected void handleConnect(CommandConnect connect) { // Client is doing a lookup, we can consider the handshake complete and we'll take care of just topics and // partitions metadata lookups state = State.ProxyLookupRequests; +ProxyConfiguration proxyConfig = service.getConfiguration(); Review comment: are we using `proxyConfig` anywhere else? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] rdhabalia commented on a change in pull request #1002: Making Pulsar Proxy more secure
rdhabalia commented on a change in pull request #1002: Making Pulsar Proxy more secure URL: https://github.com/apache/incubator-pulsar/pull/1002#discussion_r164194675 ## File path: pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationManager.java ## @@ -111,8 +115,7 @@ public boolean canProduce(DestinationName destination, String role) throws Excep permissionFuture.complete(isAuthorized); }); }).exceptionally(ex -> { -log.warn("Client with Role - {} failed to get permissions for destination - {}", role, destination, -ex); +log.warn("Client with Role - {} failed to get permissions for destination - {}", role, destination, ex); Review comment: as you are making change here, should we just print exception message here as authentication failure is very common when some client is keep trying to connect with auth failure and we can avoid printing entire stacktrace? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] rdhabalia commented on a change in pull request #1002: Making Pulsar Proxy more secure
rdhabalia commented on a change in pull request #1002: Making Pulsar Proxy more secure URL: https://github.com/apache/incubator-pulsar/pull/1002#discussion_r164205911 ## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java ## @@ -175,73 +175,116 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E @Override protected void handleLookup(CommandLookupTopic lookup) { final long requestId = lookup.getRequestId(); -final String topic = lookup.getTopic(); +final String topicName = lookup.getTopic(); if (log.isDebugEnabled()) { -log.debug("[{}] Received Lookup from {} for {}", topic, remoteAddress, requestId); +log.debug("[{}] Received Lookup from {} for {}", topicName, remoteAddress, requestId); } + final Semaphore lookupSemaphore = service.getLookupRequestSemaphore(); if (lookupSemaphore.tryAcquire()) { -lookupDestinationAsync(getBrokerService().pulsar(), DestinationName.get(topic), lookup.getAuthoritative(), -getRole(), lookup.getRequestId()).handle((lookupResponse, ex) -> { -if (ex == null) { -ctx.writeAndFlush(lookupResponse); -} else { -// it should never happen -log.warn("[{}] lookup failed with error {}, {}", remoteAddress, topic, ex.getMessage(), ex); -ctx.writeAndFlush( - newLookupErrorResponse(ServerError.ServiceNotReady, ex.getMessage(), requestId)); -} -lookupSemaphore.release(); -return null; -}); +final String proxyClientAuthRole = lookup.hasOriginalPrincipal() ? lookup.getOriginalPrincipal() +: this.proxyClientAuthRole; +CompletableFuture isProxyAuthorizedFuture; +if (service.isAuthorizationEnabled() && proxyClientAuthRole != null) { +isProxyAuthorizedFuture = service.getAuthorizationManager() +.canLookupAsync(DestinationName.get(topicName), authRole); Review comment: shouldn't we check `proxyClientAuthRole` here? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] rdhabalia commented on a change in pull request #1002: Making Pulsar Proxy more secure
rdhabalia commented on a change in pull request #1002: Making Pulsar Proxy more secure URL: https://github.com/apache/incubator-pulsar/pull/1002#discussion_r164210238 ## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java ## @@ -175,73 +175,116 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E @Override protected void handleLookup(CommandLookupTopic lookup) { final long requestId = lookup.getRequestId(); -final String topic = lookup.getTopic(); +final String topicName = lookup.getTopic(); if (log.isDebugEnabled()) { -log.debug("[{}] Received Lookup from {} for {}", topic, remoteAddress, requestId); +log.debug("[{}] Received Lookup from {} for {}", topicName, remoteAddress, requestId); } + final Semaphore lookupSemaphore = service.getLookupRequestSemaphore(); if (lookupSemaphore.tryAcquire()) { -lookupDestinationAsync(getBrokerService().pulsar(), DestinationName.get(topic), lookup.getAuthoritative(), -getRole(), lookup.getRequestId()).handle((lookupResponse, ex) -> { -if (ex == null) { -ctx.writeAndFlush(lookupResponse); -} else { -// it should never happen -log.warn("[{}] lookup failed with error {}, {}", remoteAddress, topic, ex.getMessage(), ex); -ctx.writeAndFlush( - newLookupErrorResponse(ServerError.ServiceNotReady, ex.getMessage(), requestId)); -} -lookupSemaphore.release(); -return null; -}); +final String proxyClientAuthRole = lookup.hasOriginalPrincipal() ? lookup.getOriginalPrincipal() +: this.proxyClientAuthRole; +CompletableFuture isProxyAuthorizedFuture; +if (service.isAuthorizationEnabled() && proxyClientAuthRole != null) { +isProxyAuthorizedFuture = service.getAuthorizationManager() +.canLookupAsync(DestinationName.get(topicName), authRole); +} else { +isProxyAuthorizedFuture = CompletableFuture.completedFuture(true); +} + +isProxyAuthorizedFuture.thenApply(isProxyAuthorized -> { +if (isProxyAuthorized) { +lookupDestinationAsync(getBrokerService().pulsar(), DestinationName.get(topicName), +lookup.getAuthoritative(), proxyClientAuthRole != null ? proxyClientAuthRole : authRole, Review comment: if we have checked proxy authorization on ln#188 then shouldn't we just pass `authRole` for lookup? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] rdhabalia commented on a change in pull request #1002: Making Pulsar Proxy more secure
rdhabalia commented on a change in pull request #1002: Making Pulsar Proxy more secure URL: https://github.com/apache/incubator-pulsar/pull/1002#discussion_r161097024 ## File path: pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationManager.java ## @@ -70,6 +70,19 @@ public boolean canProduce(DestinationName destination, String role) throws Excep } } +/** + * Check if the specified role has permission to access the destination via a proxy + * + * @param destination + *the fully qualified destination name associated with the destination. + * @param role + *the app id used to receive messages from the destination. + */ +public CompletableFuture canProxyAsync(DestinationName destination, String role) { +return checkAuthorization(destination, role, AuthAction.proxy); Review comment: instead `AuthAction`, as @merlimat mentioned earlier, should we pass `proxy` as a flag to authorize request? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] rdhabalia commented on a change in pull request #1002: Making Pulsar Proxy more secure
rdhabalia commented on a change in pull request #1002: Making Pulsar Proxy more secure URL: https://github.com/apache/incubator-pulsar/pull/1002#discussion_r161093519 ## File path: pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupViaServiceUrl.java ## @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.proxy.server; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.net.URI; +import java.net.URISyntaxException; + +import org.apache.pulsar.common.api.Commands; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopic; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse.LookupType; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandPartitionedTopicMetadata; +import org.apache.pulsar.common.api.proto.PulsarApi.ServerError; +import org.apache.pulsar.common.naming.DestinationName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.prometheus.client.Counter; + +public class LookupViaServiceUrl implements LookupProxyHandler { Review comment: Actually `LookupViaServiceUrl` and `LookupWithDiscoveryServiceHandler` have most of the logic in common and only difference it requires is "service-url". Right now, discovery service provides next-broker url and we want to add lookup using broker-service url also. So, I think we don't need different implementation of `LookupProxyHandler` but we need provider that returns broker-url based on discovery-service/configured service-url and things will remain same in this `LookupProxyHandler`? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] rdhabalia commented on a change in pull request #1002: Making Pulsar Proxy more secure
rdhabalia commented on a change in pull request #1002: Making Pulsar Proxy more secure URL: https://github.com/apache/incubator-pulsar/pull/1002#discussion_r161096620 ## File path: pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java ## @@ -170,7 +170,11 @@ protected void handleConnect(CommandConnect connect) { // Client is doing a lookup, we can consider the handshake complete and we'll take care of just topics and // partitions metadata lookups state = State.ProxyLookupRequests; -lookupProxyHandler = new LookupProxyHandler(service, this); +if (service.getConfiguration().isDiscoveryServiceEnabled()) { Review comment: is mentioned above, should we derive it based on global-zk value present or not? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] rdhabalia commented on a change in pull request #1002: Making Pulsar Proxy more secure
rdhabalia commented on a change in pull request #1002: Making Pulsar Proxy more secure URL: https://github.com/apache/incubator-pulsar/pull/1002#discussion_r161094006 ## File path: pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java ## @@ -35,6 +35,14 @@ // ZooKeeper session timeout private int zookeeperSessionTimeoutMs = 30_000; +// If Discovery Service is Disabled the proxy will just authenticate the client +// and forward all requests to a VIP or any other service discovery port +private boolean discoveryServiceEnabled = true; Review comment: I think we might not need this flag? If global-zk address is not provided then proxy should fall to provided discovery-url? similar as what we have done for websocket `WebSocketProxyConfiguration.java`? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] rdhabalia commented on a change in pull request #1002: Making Pulsar Proxy more secure
rdhabalia commented on a change in pull request #1002: Making Pulsar Proxy more secure URL: https://github.com/apache/incubator-pulsar/pull/1002#discussion_r161081263 ## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java ## @@ -174,77 +174,115 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E @Override protected void handleLookup(CommandLookupTopic lookup) { final long requestId = lookup.getRequestId(); -final String topic = lookup.getTopic(); +final String topicName = lookup.getTopic(); if (log.isDebugEnabled()) { -log.debug("[{}] Received Lookup from {} for {}", topic, remoteAddress, requestId); -} -final Semaphore lookupSemaphore = service.getLookupRequestSemaphore(); -if (lookupSemaphore.tryAcquire()) { -lookupDestinationAsync(getBrokerService().pulsar(), DestinationName.get(topic), lookup.getAuthoritative(), -getRole(), lookup.getRequestId()).handle((lookupResponse, ex) -> { -if (ex == null) { -ctx.writeAndFlush(lookupResponse); -} else { -// it should never happen -log.warn("[{}] lookup failed with error {}, {}", remoteAddress, topic, ex.getMessage(), ex); -ctx.writeAndFlush( - newLookupErrorResponse(ServerError.ServiceNotReady, ex.getMessage(), requestId)); -} -lookupSemaphore.release(); -return null; -}); -} else { -if (log.isDebugEnabled()) { -log.debug("[{}] Failed lookup due to too many lookup-requests {}", remoteAddress, topic); -} - ctx.writeAndFlush(newLookupErrorResponse(ServerError.TooManyRequests, -"Failed due to too many pending lookup requests", requestId)); +log.debug("[{}] Received Lookup from {} for {}", topicName, remoteAddress, requestId); } - +final String proxyClientAuthRole = lookup.hasOriginalPrincipal() ? lookup.getOriginalPrincipal() : this.proxyClientAuthRole; Review comment: can't we use `OriginalPrincipal` that broker received on `handleConnect()`? is this different auth-role? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] rdhabalia commented on a change in pull request #1002: Making Pulsar Proxy more secure
rdhabalia commented on a change in pull request #1002: Making Pulsar Proxy more secure URL: https://github.com/apache/incubator-pulsar/pull/1002#discussion_r161078193 ## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java ## @@ -98,7 +98,7 @@ private String clientVersion = null; private int nonPersistentPendingMessages = 0; private final int MaxNonPersistentPendingMessages; -private String originalPrincipal; +private String proxyClientAuthRole = null; Review comment: Is there any reason for renaming `originalPrincipal`? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] rdhabalia commented on a change in pull request #1002: Making Pulsar Proxy more secure
rdhabalia commented on a change in pull request #1002: Making Pulsar Proxy more secure URL: https://github.com/apache/incubator-pulsar/pull/1002#discussion_r161096455 ## File path: pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java ## @@ -35,6 +35,14 @@ // ZooKeeper session timeout private int zookeeperSessionTimeoutMs = 30_000; +// If Discovery Service is Disabled the proxy will just authenticate the client +// and forward all requests to a VIP or any other service discovery port +private boolean discoveryServiceEnabled = true; + +// if Service Discovery is Disabled this url should point to the discovery service provider. +private String discoveryServiceURL = "pulsar://localhost:6650/"; Review comment: should we rename it to `brokerServiceUrl` as in this case proxy will directly try to communicate broker with brokerserviceUrl? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] rdhabalia commented on a change in pull request #1002: Making Pulsar Proxy more secure
rdhabalia commented on a change in pull request #1002: Making Pulsar Proxy more secure URL: https://github.com/apache/incubator-pulsar/pull/1002#discussion_r161081553 ## File path: pulsar-common/src/main/proto/PulsarApi.proto ## @@ -186,6 +187,7 @@ message CommandSubscribe { message CommandPartitionedTopicMetadata { required string topic= 1; required uint64 request_id = 2; +optional string original_principal = 3; Review comment: formatting.. and same here, can't we use `original_principal` received from `CommandConnect`? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] rdhabalia commented on a change in pull request #1002: Making Pulsar Proxy more secure
rdhabalia commented on a change in pull request #1002: Making Pulsar Proxy more secure URL: https://github.com/apache/incubator-pulsar/pull/1002#discussion_r161078797 ## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java ## @@ -174,77 +174,115 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E @Override protected void handleLookup(CommandLookupTopic lookup) { final long requestId = lookup.getRequestId(); -final String topic = lookup.getTopic(); +final String topicName = lookup.getTopic(); if (log.isDebugEnabled()) { -log.debug("[{}] Received Lookup from {} for {}", topic, remoteAddress, requestId); -} -final Semaphore lookupSemaphore = service.getLookupRequestSemaphore(); -if (lookupSemaphore.tryAcquire()) { -lookupDestinationAsync(getBrokerService().pulsar(), DestinationName.get(topic), lookup.getAuthoritative(), -getRole(), lookup.getRequestId()).handle((lookupResponse, ex) -> { -if (ex == null) { -ctx.writeAndFlush(lookupResponse); -} else { -// it should never happen -log.warn("[{}] lookup failed with error {}, {}", remoteAddress, topic, ex.getMessage(), ex); -ctx.writeAndFlush( - newLookupErrorResponse(ServerError.ServiceNotReady, ex.getMessage(), requestId)); -} -lookupSemaphore.release(); -return null; -}); -} else { -if (log.isDebugEnabled()) { -log.debug("[{}] Failed lookup due to too many lookup-requests {}", remoteAddress, topic); -} - ctx.writeAndFlush(newLookupErrorResponse(ServerError.TooManyRequests, -"Failed due to too many pending lookup requests", requestId)); +log.debug("[{}] Received Lookup from {} for {}", topicName, remoteAddress, requestId); } - +final String proxyClientAuthRole = lookup.hasOriginalPrincipal() ? lookup.getOriginalPrincipal() : this.proxyClientAuthRole; +CompletableFuture isProxyAuthorizedFuture = isProxyAuthorized(topicName, proxyClientAuthRole); + +isProxyAuthorizedFuture.thenApply(isProxyAuthorized -> { +if (isProxyAuthorized) { +final Semaphore lookupSemaphore = service.getLookupRequestSemaphore(); Review comment: I think we should apply throttling before doing any action. So, can we move it to the beginning as before? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services