gaborgsomogyi commented on code in PR #21511: URL: https://github.com/apache/flink/pull/21511#discussion_r1049390777
########## flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManager.java: ########## @@ -45,91 +42,86 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Stream; -import static org.apache.flink.configuration.SecurityOptions.KERBEROS_RELOGIN_PERIOD; -import static org.apache.flink.configuration.SecurityOptions.KERBEROS_TOKENS_RENEWAL_RETRY_BACKOFF; -import static org.apache.flink.configuration.SecurityOptions.KERBEROS_TOKENS_RENEWAL_TIME_RATIO; +import static org.apache.flink.configuration.SecurityOptions.DELEGATION_TOKENS_RENEWAL_RETRY_BACKOFF; +import static org.apache.flink.configuration.SecurityOptions.DELEGATION_TOKENS_RENEWAL_TIME_RATIO; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; /** * Manager for delegation tokens in a Flink cluster. * * <p>When delegation token renewal is enabled, this manager will make sure long-running apps can - * run without interruption while accessing secured services. It periodically logs in to the KDC - * with user-provided credentials, and contacts all the configured secure services to obtain - * delegation tokens to be distributed to the rest of the application. + * run without interruption while accessing secured services. It periodically contacts all the + * configured secure services to obtain delegation tokens to be distributed to the rest of the + * application. */ @Internal -public class KerberosDelegationTokenManager implements DelegationTokenManager { +public class DefaultDelegationTokenManager implements DelegationTokenManager { - private static final Logger LOG = LoggerFactory.getLogger(KerberosDelegationTokenManager.class); + private static final Logger LOG = LoggerFactory.getLogger(DefaultDelegationTokenManager.class); private final Configuration configuration; private final double tokensRenewalTimeRatio; private final long renewalRetryBackoffPeriod; - private final KerberosLoginProvider kerberosLoginProvider; - - @VisibleForTesting final Map<String, HadoopDelegationTokenProvider> delegationTokenProviders; + @VisibleForTesting final Map<String, DelegationTokenProvider> delegationTokenProviders; @Nullable private final ScheduledExecutor scheduledExecutor; @Nullable private final ExecutorService ioExecutor; - @Nullable private ScheduledFuture<?> tgtRenewalFuture; - private final Object tokensUpdateFutureLock = new Object(); @GuardedBy("tokensUpdateFutureLock") @Nullable private ScheduledFuture<?> tokensUpdateFuture; - @Nullable private DelegationTokenListener delegationTokenListener; + @Nullable private Listener listener; - public KerberosDelegationTokenManager( + public DefaultDelegationTokenManager( Configuration configuration, @Nullable ScheduledExecutor scheduledExecutor, @Nullable ExecutorService ioExecutor) { this.configuration = checkNotNull(configuration, "Flink configuration must not be null"); - this.tokensRenewalTimeRatio = configuration.get(KERBEROS_TOKENS_RENEWAL_TIME_RATIO); + this.tokensRenewalTimeRatio = configuration.get(DELEGATION_TOKENS_RENEWAL_TIME_RATIO); this.renewalRetryBackoffPeriod = - configuration.get(KERBEROS_TOKENS_RENEWAL_RETRY_BACKOFF).toMillis(); - this.kerberosLoginProvider = new KerberosLoginProvider(configuration); + configuration.get(DELEGATION_TOKENS_RENEWAL_RETRY_BACKOFF).toMillis(); this.delegationTokenProviders = loadProviders(); this.scheduledExecutor = scheduledExecutor; this.ioExecutor = ioExecutor; } - private Map<String, HadoopDelegationTokenProvider> loadProviders() { + private Map<String, DelegationTokenProvider> loadProviders() { LOG.info("Loading delegation token providers"); - ServiceLoader<HadoopDelegationTokenProvider> serviceLoader = - ServiceLoader.load(HadoopDelegationTokenProvider.class); + ServiceLoader<DelegationTokenProvider> serviceLoader = + ServiceLoader.load(DelegationTokenProvider.class); - Map<String, HadoopDelegationTokenProvider> providers = new HashMap<>(); - for (HadoopDelegationTokenProvider provider : serviceLoader) { + Map<String, DelegationTokenProvider> providers = new HashMap<>(); + for (DelegationTokenProvider provider : serviceLoader) { try { if (isProviderEnabled(provider.serviceName())) { provider.init(configuration); LOG.info( "Delegation token provider {} loaded and initialized", provider.serviceName()); + checkState( + !providers.containsKey(provider.serviceName()), + "Delegation token provider with service name {} has multiple implementations", + provider.serviceName()); providers.put(provider.serviceName(), provider); } else { LOG.info( "Delegation token provider {} is disabled so not loaded", provider.serviceName()); } } catch (Exception | NoClassDefFoundError e) { - LOG.error( + LOG.warn( Review Comment: The actual 2 providers are so in a bad shape (inherited from the old code) that I see this the best as TMP solution. My plan is to extract HDFS and HBase token providers to an external repo. When that happens we must put back fail fast here. Feel free to suggest something if you have in mind which is not horror complex. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org