pvary commented on code in PR #21511:
URL: https://github.com/apache/flink/pull/21511#discussion_r1049939031


##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManager.java:
##########
@@ -45,91 +42,86 @@
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
 
-import static 
org.apache.flink.configuration.SecurityOptions.KERBEROS_RELOGIN_PERIOD;
-import static 
org.apache.flink.configuration.SecurityOptions.KERBEROS_TOKENS_RENEWAL_RETRY_BACKOFF;
-import static 
org.apache.flink.configuration.SecurityOptions.KERBEROS_TOKENS_RENEWAL_TIME_RATIO;
+import static 
org.apache.flink.configuration.SecurityOptions.DELEGATION_TOKENS_RENEWAL_RETRY_BACKOFF;
+import static 
org.apache.flink.configuration.SecurityOptions.DELEGATION_TOKENS_RENEWAL_TIME_RATIO;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * Manager for delegation tokens in a Flink cluster.
  *
  * <p>When delegation token renewal is enabled, this manager will make sure 
long-running apps can
- * run without interruption while accessing secured services. It periodically 
logs in to the KDC
- * with user-provided credentials, and contacts all the configured secure 
services to obtain
- * delegation tokens to be distributed to the rest of the application.
+ * run without interruption while accessing secured services. It periodically 
contacts all the
+ * configured secure services to obtain delegation tokens to be distributed to 
the rest of the
+ * application.
  */
 @Internal
-public class KerberosDelegationTokenManager implements DelegationTokenManager {
+public class DefaultDelegationTokenManager implements DelegationTokenManager {
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(KerberosDelegationTokenManager.class);
+    private static final Logger LOG = 
LoggerFactory.getLogger(DefaultDelegationTokenManager.class);
 
     private final Configuration configuration;
 
     private final double tokensRenewalTimeRatio;
 
     private final long renewalRetryBackoffPeriod;
 
-    private final KerberosLoginProvider kerberosLoginProvider;
-
-    @VisibleForTesting final Map<String, HadoopDelegationTokenProvider> 
delegationTokenProviders;
+    @VisibleForTesting final Map<String, DelegationTokenProvider> 
delegationTokenProviders;
 
     @Nullable private final ScheduledExecutor scheduledExecutor;
 
     @Nullable private final ExecutorService ioExecutor;
 
-    @Nullable private ScheduledFuture<?> tgtRenewalFuture;
-
     private final Object tokensUpdateFutureLock = new Object();
 
     @GuardedBy("tokensUpdateFutureLock")
     @Nullable
     private ScheduledFuture<?> tokensUpdateFuture;
 
-    @Nullable private DelegationTokenListener delegationTokenListener;
+    @Nullable private Listener listener;
 
-    public KerberosDelegationTokenManager(
+    public DefaultDelegationTokenManager(
             Configuration configuration,
             @Nullable ScheduledExecutor scheduledExecutor,
             @Nullable ExecutorService ioExecutor) {
         this.configuration = checkNotNull(configuration, "Flink configuration 
must not be null");
-        this.tokensRenewalTimeRatio = 
configuration.get(KERBEROS_TOKENS_RENEWAL_TIME_RATIO);
+        this.tokensRenewalTimeRatio = 
configuration.get(DELEGATION_TOKENS_RENEWAL_TIME_RATIO);
         this.renewalRetryBackoffPeriod =
-                
configuration.get(KERBEROS_TOKENS_RENEWAL_RETRY_BACKOFF).toMillis();
-        this.kerberosLoginProvider = new KerberosLoginProvider(configuration);
+                
configuration.get(DELEGATION_TOKENS_RENEWAL_RETRY_BACKOFF).toMillis();
         this.delegationTokenProviders = loadProviders();
         this.scheduledExecutor = scheduledExecutor;
         this.ioExecutor = ioExecutor;
     }
 
-    private Map<String, HadoopDelegationTokenProvider> loadProviders() {
+    private Map<String, DelegationTokenProvider> loadProviders() {
         LOG.info("Loading delegation token providers");
 
-        ServiceLoader<HadoopDelegationTokenProvider> serviceLoader =
-                ServiceLoader.load(HadoopDelegationTokenProvider.class);
+        ServiceLoader<DelegationTokenProvider> serviceLoader =
+                ServiceLoader.load(DelegationTokenProvider.class);
 
-        Map<String, HadoopDelegationTokenProvider> providers = new HashMap<>();
-        for (HadoopDelegationTokenProvider provider : serviceLoader) {
+        Map<String, DelegationTokenProvider> providers = new HashMap<>();
+        for (DelegationTokenProvider provider : serviceLoader) {
             try {
                 if (isProviderEnabled(provider.serviceName())) {
                     provider.init(configuration);
                     LOG.info(
                             "Delegation token provider {} loaded and 
initialized",
                             provider.serviceName());
+                    checkState(

Review Comment:
   I am fine with the solution; I was just interested in how this is solved in 
other places.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to