gaborgsomogyi commented on code in PR #21511: URL: https://github.com/apache/flink/pull/21511#discussion_r1049361162
########## flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManager.java: ########## @@ -45,91 +42,86 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Stream; -import static org.apache.flink.configuration.SecurityOptions.KERBEROS_RELOGIN_PERIOD; -import static org.apache.flink.configuration.SecurityOptions.KERBEROS_TOKENS_RENEWAL_RETRY_BACKOFF; -import static org.apache.flink.configuration.SecurityOptions.KERBEROS_TOKENS_RENEWAL_TIME_RATIO; +import static org.apache.flink.configuration.SecurityOptions.DELEGATION_TOKENS_RENEWAL_RETRY_BACKOFF; +import static org.apache.flink.configuration.SecurityOptions.DELEGATION_TOKENS_RENEWAL_TIME_RATIO; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; /** * Manager for delegation tokens in a Flink cluster. * * <p>When delegation token renewal is enabled, this manager will make sure long-running apps can - * run without interruption while accessing secured services. It periodically logs in to the KDC - * with user-provided credentials, and contacts all the configured secure services to obtain - * delegation tokens to be distributed to the rest of the application. + * run without interruption while accessing secured services. It periodically contacts all the + * configured secure services to obtain delegation tokens to be distributed to the rest of the + * application. 
*/ @Internal -public class KerberosDelegationTokenManager implements DelegationTokenManager { +public class DefaultDelegationTokenManager implements DelegationTokenManager { - private static final Logger LOG = LoggerFactory.getLogger(KerberosDelegationTokenManager.class); + private static final Logger LOG = LoggerFactory.getLogger(DefaultDelegationTokenManager.class); private final Configuration configuration; private final double tokensRenewalTimeRatio; private final long renewalRetryBackoffPeriod; - private final KerberosLoginProvider kerberosLoginProvider; - - @VisibleForTesting final Map<String, HadoopDelegationTokenProvider> delegationTokenProviders; + @VisibleForTesting final Map<String, DelegationTokenProvider> delegationTokenProviders; @Nullable private final ScheduledExecutor scheduledExecutor; @Nullable private final ExecutorService ioExecutor; - @Nullable private ScheduledFuture<?> tgtRenewalFuture; - private final Object tokensUpdateFutureLock = new Object(); @GuardedBy("tokensUpdateFutureLock") @Nullable private ScheduledFuture<?> tokensUpdateFuture; - @Nullable private DelegationTokenListener delegationTokenListener; + @Nullable private Listener listener; - public KerberosDelegationTokenManager( + public DefaultDelegationTokenManager( Configuration configuration, @Nullable ScheduledExecutor scheduledExecutor, @Nullable ExecutorService ioExecutor) { this.configuration = checkNotNull(configuration, "Flink configuration must not be null"); - this.tokensRenewalTimeRatio = configuration.get(KERBEROS_TOKENS_RENEWAL_TIME_RATIO); + this.tokensRenewalTimeRatio = configuration.get(DELEGATION_TOKENS_RENEWAL_TIME_RATIO); this.renewalRetryBackoffPeriod = - configuration.get(KERBEROS_TOKENS_RENEWAL_RETRY_BACKOFF).toMillis(); - this.kerberosLoginProvider = new KerberosLoginProvider(configuration); + configuration.get(DELEGATION_TOKENS_RENEWAL_RETRY_BACKOFF).toMillis(); this.delegationTokenProviders = loadProviders(); this.scheduledExecutor = scheduledExecutor; 
this.ioExecutor = ioExecutor; } - private Map<String, HadoopDelegationTokenProvider> loadProviders() { + private Map<String, DelegationTokenProvider> loadProviders() { LOG.info("Loading delegation token providers"); - ServiceLoader<HadoopDelegationTokenProvider> serviceLoader = - ServiceLoader.load(HadoopDelegationTokenProvider.class); + ServiceLoader<DelegationTokenProvider> serviceLoader = + ServiceLoader.load(DelegationTokenProvider.class); - Map<String, HadoopDelegationTokenProvider> providers = new HashMap<>(); - for (HadoopDelegationTokenProvider provider : serviceLoader) { + Map<String, DelegationTokenProvider> providers = new HashMap<>(); + for (DelegationTokenProvider provider : serviceLoader) { try { if (isProviderEnabled(provider.serviceName())) { provider.init(configuration); LOG.info( "Delegation token provider {} loaded and initialized", provider.serviceName()); + checkState( Review Comment: If two providers come with the same name, this check fails, so the deployment fails fast. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org