nicoloboschi commented on code in PR #20067:
URL: https://github.com/apache/pulsar/pull/20067#discussion_r1162428289


##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java:
##########
@@ -371,10 +372,35 @@ private synchronized void completeConnect() throws 
PulsarClientException {
                     });
         } else {
             // Client is doing a lookup, we can consider the handshake complete
-            // and we'll take care of just topics and
-            // partitions metadata lookups
+            // and we'll take care of just topics and partitions metadata 
lookups
+            Supplier<ClientCnx> clientCnxSupplier;
+            if (service.getConfiguration().isAuthenticationEnabled()) {
+                clientCnxSupplier = () -> new ProxyClientCnx(clientConf, 
service.getWorkerGroup(), clientAuthRole,
+                        clientAuthMethod, protocolVersionToAdvertise,
+                        
service.getConfiguration().isForwardAuthorizationCredentials(), this);
+            } else {
+                clientCnxSupplier =
+                        () -> new ClientCnx(clientConf, 
service.getWorkerGroup(), protocolVersionToAdvertise);
+            }
+
+            if (this.connectionPool == null) {
+                this.connectionPool = new ConnectionPool(clientConf, 
service.getWorkerGroup(),
+                        clientCnxSupplier,
+                        
Optional.of(dnsAddressResolverGroup.getResolver(service.getWorkerGroup().next())));
+            } else {
+                LOG.error("BUG! Connection Pool has already been created for 
proxy connection to {} state {} role {}",
+                        remoteAddress, state, clientAuthRole);
+            }
+
             state = State.ProxyLookupRequests;
             lookupProxyHandler = service.newLookupProxyHandler(this);
+            if (service.getConfiguration().isAuthenticationEnabled()
+                    && 
service.getConfiguration().getAuthenticationRefreshCheckSeconds() > 0) {
+                authRefreshTask = ctx.executor().scheduleAtFixedRate(() -> 
refreshAuthenticationCredentials(false),

Review Comment:
   we should wrap it with `Runnables.catchingAndLoggingThrowables`



##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java:
##########
@@ -471,6 +497,47 @@ protected void authChallengeSuccessCallback(AuthData 
authChallenge) {
         }
     }
 
+    private void refreshAuthenticationCredentials(boolean force) {
+        assert ctx.executor().inEventLoop();
+        if (state != State.ProxyLookupRequests) {
+            // Happens when an exception is thrown that causes this connection 
to close.
+            return;
+        } else if (!authState.isExpired() || !force) {
+            // Credentials are still valid. Nothing to do at this point
+            return;
+        }
+
+        if (!supportsAuthenticationRefresh()) {
+            LOG.warn("[{}] Closing connection because client doesn't support 
auth credentials refresh", remoteAddress);
+            ctx.close();
+            return;
+        }
+
+        if (System.nanoTime() - authChallengeSentTime
+                > 
TimeUnit.SECONDS.toNanos(service.getConfiguration().getAuthenticationRefreshCheckSeconds()))
 {
+            LOG.warn("[{}] Closing connection after timeout on refreshing auth 
credentials", remoteAddress);
+            ctx.close();
+            return;
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.info("[{}] Refreshing authentication credentials", 
remoteAddress);

Review Comment:
   LOG.debug ? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to