nicoloboschi commented on code in PR #20067:
URL: https://github.com/apache/pulsar/pull/20067#discussion_r1162428289
##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java:
##########
@@ -371,10 +372,35 @@ private synchronized void completeConnect() throws
PulsarClientException {
});
} else {
// Client is doing a lookup, we can consider the handshake complete
- // and we'll take care of just topics and
- // partitions metadata lookups
+ // and we'll take care of just topics and partitions metadata
lookups
+ Supplier<ClientCnx> clientCnxSupplier;
+ if (service.getConfiguration().isAuthenticationEnabled()) {
+ clientCnxSupplier = () -> new ProxyClientCnx(clientConf,
service.getWorkerGroup(), clientAuthRole,
+ clientAuthMethod, protocolVersionToAdvertise,
+
service.getConfiguration().isForwardAuthorizationCredentials(), this);
+ } else {
+ clientCnxSupplier =
+ () -> new ClientCnx(clientConf,
service.getWorkerGroup(), protocolVersionToAdvertise);
+ }
+
+ if (this.connectionPool == null) {
+ this.connectionPool = new ConnectionPool(clientConf,
service.getWorkerGroup(),
+ clientCnxSupplier,
+
Optional.of(dnsAddressResolverGroup.getResolver(service.getWorkerGroup().next())));
+ } else {
+ LOG.error("BUG! Connection Pool has already been created for
proxy connection to {} state {} role {}",
+ remoteAddress, state, clientAuthRole);
+ }
+
state = State.ProxyLookupRequests;
lookupProxyHandler = service.newLookupProxyHandler(this);
+ if (service.getConfiguration().isAuthenticationEnabled()
+ &&
service.getConfiguration().getAuthenticationRefreshCheckSeconds() > 0) {
+ authRefreshTask = ctx.executor().scheduleAtFixedRate(() ->
refreshAuthenticationCredentials(false),
Review Comment:
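We should wrap this task with `Runnables.catchingAndLoggingThrowables`.
If the runnable passed to `scheduleAtFixedRate` throws, all subsequent
executions are suppressed, so the auth refresh check would silently stop.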
##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java:
##########
@@ -471,6 +497,47 @@ protected void authChallengeSuccessCallback(AuthData
authChallenge) {
}
}
+ private void refreshAuthenticationCredentials(boolean force) {
+ assert ctx.executor().inEventLoop();
+ if (state != State.ProxyLookupRequests) {
+ // Happens when an exception is thrown that causes this connection
to close.
+ return;
+ } else if (!authState.isExpired() || !force) {
+ // Credentials are still valid. Nothing to do at this point
+ return;
+ }
+
+ if (!supportsAuthenticationRefresh()) {
+ LOG.warn("[{}] Closing connection because client doesn't support
auth credentials refresh", remoteAddress);
+ ctx.close();
+ return;
+ }
+
+ if (System.nanoTime() - authChallengeSentTime
+ >
TimeUnit.SECONDS.toNanos(service.getConfiguration().getAuthenticationRefreshCheckSeconds()))
{
+ LOG.warn("[{}] Closing connection after timeout on refreshing auth
credentials", remoteAddress);
+ ctx.close();
+ return;
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.info("[{}] Refreshing authentication credentials",
remoteAddress);
Review Comment:
Should this be `LOG.debug`? The call is guarded by `LOG.isDebugEnabled()`
but logs at INFO level.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]