This is an automated email from the ASF dual-hosted git repository.

mbalassi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 324b54cf413 [FLINK-30425][runtime][security] Generalize token receive 
side
324b54cf413 is described below

commit 324b54cf413229f78e6e709aaaa2e16a03d45df1
Author: Gabor Somogyi <gabor_somog...@apple.com>
AuthorDate: Mon Jan 2 15:30:45 2023 +0100

    [FLINK-30425][runtime][security] Generalize token receive side
---
 flink-end-to-end-tests/test-scripts/common.sh      |   1 +
 .../flink/runtime/minicluster/MiniCluster.java     |  10 +-
 .../token/DefaultDelegationTokenManager.java       |  64 ++++++++--
 .../security/token/DelegationTokenProvider.java    |  13 +-
 .../security/token/DelegationTokenReceiver.java    |  61 +++++++++
 .../token/DelegationTokenReceiverRepository.java   | 140 +++++++++++++++++++++
 .../token/hadoop/HBaseDelegationTokenProvider.java |  27 +++-
 .../token/hadoop/HBaseDelegationTokenReceiver.java |  35 ++++++
 ...ter.java => HadoopDelegationTokenReceiver.java} |  39 +++---
 .../hadoop/HadoopFSDelegationTokenProvider.java    |  31 ++++-
 .../hadoop/HadoopFSDelegationTokenReceiver.java    |  31 +++++
 .../flink/runtime/taskexecutor/TaskExecutor.java   |  12 +-
 .../runtime/taskexecutor/TaskManagerRunner.java    |  22 +++-
 ....runtime.security.token.DelegationTokenReceiver |  17 +++
 .../token/DefaultDelegationTokenManagerTest.java   |  96 ++++++++++++--
 .../DelegationTokenReceiverRepositoryTest.java     |  75 +++++++++++
 .../ExceptionThrowingDelegationTokenReceiver.java  |  60 +++++++++
 .../token/TestDelegationTokenReceiver.java         |  36 ++++++
 ...va => HadoopDelegationTokenReceiverITCase.java} |  55 ++++----
 .../runtime/taskexecutor/TaskExecutorBuilder.java  |   7 +-
 ...cutorExecutionDeploymentReconciliationTest.java |   4 +-
 .../TaskExecutorPartitionLifecycleTest.java        |   4 +-
 .../taskexecutor/TaskExecutorSlotLifetimeTest.java |   4 +-
 .../runtime/taskexecutor/TaskExecutorTest.java     |   7 +-
 .../taskexecutor/TaskManagerRunnerStartupTest.java |   4 +-
 .../taskexecutor/TaskManagerRunnerTest.java        |   7 +-
 .../TaskSubmissionTestEnvironment.java             |   4 +-
 .../runtime/taskexecutor/TestingTaskExecutor.java  |   7 +-
 ....runtime.security.token.DelegationTokenReceiver |  17 +++
 .../apache/flink/yarn/YarnClusterDescriptor.java   |   7 +-
 30 files changed, 792 insertions(+), 105 deletions(-)

diff --git a/flink-end-to-end-tests/test-scripts/common.sh 
b/flink-end-to-end-tests/test-scripts/common.sh
index 31741710b3a..047a05a7bb4 100644
--- a/flink-end-to-end-tests/test-scripts/common.sh
+++ b/flink-end-to-end-tests/test-scripts/common.sh
@@ -380,6 +380,7 @@ function check_logs_for_errors {
       | grep -v "Error sending fetch request" \
       | grep -v "WARN  akka.remote.ReliableDeliverySupervisor" \
       | grep -v "Options.*error_*" \
+      | grep -v "not packaged with this application" \
       | grep -ic "error" || true)
   if [[ ${error_count} -gt 0 ]]; then
     echo "Found error in log files; printing first 500 lines; see full logs 
for details:"
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 59240a14c03..1d48126e141 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -91,6 +91,7 @@ import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import 
org.apache.flink.runtime.security.token.DefaultDelegationTokenManagerFactory;
 import org.apache.flink.runtime.security.token.DelegationTokenManager;
+import 
org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository;
 import org.apache.flink.runtime.taskexecutor.TaskExecutor;
 import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
 import org.apache.flink.runtime.webmonitor.retriever.LeaderRetriever;
@@ -199,6 +200,9 @@ public class MiniCluster implements AutoCloseableAsync {
     @GuardedBy("lock")
     private DelegationTokenManager delegationTokenManager;
 
+    @GuardedBy("lock")
+    private DelegationTokenReceiverRepository 
delegationTokenReceiverRepository;
+
     @GuardedBy("lock")
     private BlobCacheService blobCacheService;
 
@@ -431,6 +435,9 @@ public class MiniCluster implements AutoCloseableAsync {
                         DefaultDelegationTokenManagerFactory.create(
                                 configuration, 
commonRpcService.getScheduledExecutor(), ioExecutor);
 
+                delegationTokenReceiverRepository =
+                        new DelegationTokenReceiverRepository(configuration);
+
                 blobCacheService =
                         BlobUtils.createBlobCacheService(
                                 configuration,
@@ -749,7 +756,8 @@ public class MiniCluster implements AutoCloseableAsync {
                             ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES,
                             workingDirectory.createSubWorkingDirectory("tm_" + 
taskManagers.size()),
                             
taskManagerTerminatingFatalErrorHandlerFactory.create(
-                                    taskManagers.size()));
+                                    taskManagers.size()),
+                            delegationTokenReceiverRepository);
 
             taskExecutor.start();
             taskManagers.add(taskExecutor);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManager.java
index 1703d796dec..fb5a195d2cc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManager.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.security.token;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
-import 
org.apache.flink.runtime.security.token.hadoop.HadoopDelegationTokenUpdater;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.concurrent.ScheduledExecutor;
@@ -34,9 +33,11 @@ import javax.annotation.concurrent.GuardedBy;
 
 import java.time.Clock;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Optional;
 import java.util.ServiceLoader;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
@@ -58,6 +59,13 @@ import static org.apache.flink.util.Preconditions.checkState;
 @Internal
 public class DefaultDelegationTokenManager implements DelegationTokenManager {
 
+    private static final String PROVIDER_RECEIVER_INCONSISTENCY_ERROR =
+            "There is an inconsistency between loaded delegation token 
providers and receivers. "
+                    + "One must implement a DelegationTokenProvider and a 
DelegationTokenReceiver "
+                    + "with the same service name and add them together to the 
classpath to make "
+                    + "the system consistent. The mentioned classes are loaded 
with Java's service "
+                    + "loader so the appropriate META-INF registration also 
needs to be created.";
+
     private static final Logger LOG = 
LoggerFactory.getLogger(DefaultDelegationTokenManager.class);
 
     private final Configuration configuration;
@@ -68,6 +76,8 @@ public class DefaultDelegationTokenManager implements 
DelegationTokenManager {
 
     @VisibleForTesting final Map<String, DelegationTokenProvider> 
delegationTokenProviders;
 
+    private final DelegationTokenReceiverRepository 
delegationTokenReceiverRepository;
+
     @Nullable private final ScheduledExecutor scheduledExecutor;
 
     @Nullable private final ExecutorService ioExecutor;
@@ -89,8 +99,13 @@ public class DefaultDelegationTokenManager implements 
DelegationTokenManager {
         this.renewalRetryBackoffPeriod =
                 
configuration.get(DELEGATION_TOKENS_RENEWAL_RETRY_BACKOFF).toMillis();
         this.delegationTokenProviders = loadProviders();
+        this.delegationTokenReceiverRepository =
+                new DelegationTokenReceiverRepository(configuration);
         this.scheduledExecutor = scheduledExecutor;
         this.ioExecutor = ioExecutor;
+        checkProviderAndReceiverConsistency(
+                delegationTokenProviders,
+                delegationTokenReceiverRepository.delegationTokenReceivers);
     }
 
     private Map<String, DelegationTokenProvider> loadProviders() {
@@ -102,7 +117,7 @@ public class DefaultDelegationTokenManager implements 
DelegationTokenManager {
         Map<String, DelegationTokenProvider> providers = new HashMap<>();
         for (DelegationTokenProvider provider : serviceLoader) {
             try {
-                if (isProviderEnabled(provider.serviceName())) {
+                if (isProviderEnabled(configuration, provider.serviceName())) {
                     provider.init(configuration);
                     LOG.info(
                             "Delegation token provider {} loaded and 
initialized",
@@ -118,10 +133,13 @@ public class DefaultDelegationTokenManager implements 
DelegationTokenManager {
                             provider.serviceName());
                 }
             } catch (Exception | NoClassDefFoundError e) {
-                LOG.warn(
+                // The intentional general rule is that if a provider's init 
method throws exception
+                // then stop the workload
+                LOG.error(
                         "Failed to initialize delegation token provider {}",
                         provider.serviceName(),
                         e);
+                throw new FlinkRuntimeException(e);
             }
         }
 
@@ -130,8 +148,7 @@ public class DefaultDelegationTokenManager implements 
DelegationTokenManager {
         return providers;
     }
 
-    @VisibleForTesting
-    boolean isProviderEnabled(String serviceName) {
+    static boolean isProviderEnabled(Configuration configuration, String 
serviceName) {
         return configuration.getBoolean(
                 String.format("security.delegation.token.provider.%s.enabled", 
serviceName), true);
     }
@@ -141,6 +158,38 @@ public class DefaultDelegationTokenManager implements 
DelegationTokenManager {
         return delegationTokenProviders.containsKey(serviceName);
     }
 
+    @VisibleForTesting
+    boolean isReceiverLoaded(String serviceName) {
+        return delegationTokenReceiverRepository.isReceiverLoaded(serviceName);
+    }
+
+    @VisibleForTesting
+    static void checkProviderAndReceiverConsistency(
+            Map<String, DelegationTokenProvider> providers,
+            Map<String, DelegationTokenReceiver> receivers) {
+        LOG.info("Checking provider and receiver instances consistency");
+        if (providers.size() != receivers.size()) {
+            Set<String> missingReceiverServiceNames = new 
HashSet<>(providers.keySet());
+            missingReceiverServiceNames.removeAll(receivers.keySet());
+            if (!missingReceiverServiceNames.isEmpty()) {
+                throw new IllegalStateException(
+                        PROVIDER_RECEIVER_INCONSISTENCY_ERROR
+                                + " Missing receivers: "
+                                + String.join(",", 
missingReceiverServiceNames));
+            }
+
+            Set<String> missingProviderServiceNames = new 
HashSet<>(receivers.keySet());
+            missingProviderServiceNames.removeAll(providers.keySet());
+            if (!missingProviderServiceNames.isEmpty()) {
+                throw new IllegalStateException(
+                        PROVIDER_RECEIVER_INCONSISTENCY_ERROR
+                                + " Missing providers: "
+                                + String.join(",", 
missingProviderServiceNames));
+            }
+        }
+        LOG.info("Provider and receiver instances are consistent");
+    }
+
     /**
      * Obtains new tokens in a one-time fashion and leaves it up to the caller 
to distribute them.
      */
@@ -212,12 +261,11 @@ public class DefaultDelegationTokenManager implements 
DelegationTokenManager {
             Optional<Long> nextRenewal = 
obtainDelegationTokensAndGetNextRenewal(container);
 
             if (container.hasTokens()) {
-                byte[] containerBytes = 
InstantiationUtil.serializeObject(container);
-                
HadoopDelegationTokenUpdater.addCurrentUserCredentials(containerBytes);
+                
delegationTokenReceiverRepository.onNewTokensObtained(container);
 
                 LOG.info("Notifying listener about new tokens");
                 checkNotNull(listener, "Listener must not be null");
-                listener.onNewTokensObtained(containerBytes);
+                
listener.onNewTokensObtained(InstantiationUtil.serializeObject(container));
                 LOG.info("Listener notified successfully");
             } else {
                 LOG.warn("No tokens obtained so skipping listener 
notification");
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenProvider.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenProvider.java
index d98e5816fa9..c6d7dcea413 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenProvider.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenProvider.java
@@ -25,10 +25,16 @@ import java.util.Optional;
 
 /**
  * Delegation token provider API. Instances of {@link 
DelegationTokenProvider}s are loaded by {@link
- * DelegationTokenManager} through service loader.
+ * DelegationTokenManager} through service loader. Basically the 
implementation of this interface is
+ * responsible to produce the serialized form of tokens which will be handled 
by {@link
+ * DelegationTokenReceiver} instances both on JobManager and TaskManager side.
  */
 @Experimental
 public interface DelegationTokenProvider {
+
+    /** Config prefix of providers. */
+    String CONFIG_PREFIX = "security.delegation.token.provider";
+
     /** Container for obtained delegation tokens. */
     class ObtainedDelegationTokens {
         /** Serialized form of delegation tokens. */
@@ -57,6 +63,11 @@ public interface DelegationTokenProvider {
     /** Name of the service to provide delegation tokens. This name should be 
unique. */
     String serviceName();
 
+    /** Config prefix of the service. */
+    default String serviceConfigPrefix() {
+        return String.format("%s.%s", CONFIG_PREFIX, serviceName());
+    }
+
     /**
      * Called by {@link DelegationTokenManager} to initialize provider after 
construction.
      *
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenReceiver.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenReceiver.java
new file mode 100644
index 00000000000..b296186edc3
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenReceiver.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security.token;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * Delegation token receiver API. Instances of {@link 
DelegationTokenReceiver}s are loaded both on
+ * JobManager and TaskManager side through service loader. Basically the 
implementation of this
+ * interface is responsible to receive the serialized form of tokens produced 
by {@link
+ * DelegationTokenProvider}.
+ */
+@Experimental
+public interface DelegationTokenReceiver {
+
+    /** Config prefix of receivers. */
+    String CONFIG_PREFIX = "security.delegation.token.receiver";
+
+    /**
+     * Name of the service to receive delegation tokens for. This name should 
be unique and the same
+     * as the one provided in the corresponding {@link 
DelegationTokenProvider}.
+     */
+    String serviceName();
+
+    /** Config prefix of the service. */
+    default String serviceConfigPrefix() {
+        return String.format("%s.%s", CONFIG_PREFIX, serviceName());
+    }
+
+    /**
+     * Called to initialize receiver after construction.
+     *
+     * @param configuration Configuration to initialize the receiver.
+     */
+    void init(Configuration configuration) throws Exception;
+
+    /**
+     * Callback function when new delegation tokens obtained.
+     *
+     * @param tokens Serialized form of delegation tokens. Must be 
deserialized the reverse way
+     *     which is implemented in {@link DelegationTokenProvider}.
+     */
+    void onNewTokensObtained(byte[] tokens) throws Exception;
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenReceiverRepository.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenReceiverRepository.java
new file mode 100644
index 00000000000..8e63c3b4390
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenReceiverRepository.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security.token;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.ServiceLoader;
+
+import static 
org.apache.flink.runtime.security.token.DefaultDelegationTokenManager.isProviderEnabled;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Repository for delegation token receivers. */
+@Internal
+public class DelegationTokenReceiverRepository {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(DelegationTokenReceiverRepository.class);
+
+    private final Configuration configuration;
+
+    @VisibleForTesting final Map<String, DelegationTokenReceiver> 
delegationTokenReceivers;
+
+    public DelegationTokenReceiverRepository(Configuration configuration) {
+        this.configuration = checkNotNull(configuration, "Flink configuration 
must not be null");
+        this.delegationTokenReceivers = loadReceivers();
+    }
+
+    private Map<String, DelegationTokenReceiver> loadReceivers() {
+        LOG.info("Loading delegation token receivers");
+
+        ServiceLoader<DelegationTokenReceiver> serviceLoader =
+                ServiceLoader.load(DelegationTokenReceiver.class);
+
+        Map<String, DelegationTokenReceiver> receivers = new HashMap<>();
+        for (DelegationTokenReceiver receiver : serviceLoader) {
+            try {
+                if (isProviderEnabled(configuration, receiver.serviceName())) {
+                    receiver.init(configuration);
+                    LOG.info(
+                            "Delegation token receiver {} loaded and 
initialized",
+                            receiver.serviceName());
+                    checkState(
+                            !receivers.containsKey(receiver.serviceName()),
+                            "Delegation token receiver with service name {} 
has multiple implementations",
+                            receiver.serviceName());
+                    receivers.put(receiver.serviceName(), receiver);
+                } else {
+                    LOG.info(
+                            "Delegation token receiver {} is disabled so not 
loaded",
+                            receiver.serviceName());
+                }
+            } catch (Exception | NoClassDefFoundError e) {
+                // The intentional general rule is that if a receiver's init 
method throws exception
+                // then stop the workload
+                LOG.error(
+                        "Failed to initialize delegation token receiver {}",
+                        receiver.serviceName(),
+                        e);
+                throw new FlinkRuntimeException(e);
+            }
+        }
+
+        LOG.info("Delegation token receivers loaded successfully");
+
+        return receivers;
+    }
+
+    @VisibleForTesting
+    boolean isReceiverLoaded(String serviceName) {
+        return delegationTokenReceivers.containsKey(serviceName);
+    }
+
+    /**
+     * Callback function when new delegation tokens obtained.
+     *
+     * @param containerBytes Serialized form of a DelegationTokenContainer. 
All the available tokens
+     *     will be forwarded to the appropriate {@link 
DelegationTokenReceiver} based on service
+     *     name.
+     */
+    public void onNewTokensObtained(byte[] containerBytes) throws Exception {
+        if (containerBytes == null || containerBytes.length == 0) {
+            throw new IllegalArgumentException("Illegal container tried to be 
processed");
+        }
+        DelegationTokenContainer container =
+                InstantiationUtil.deserializeObject(
+                        containerBytes, 
DelegationTokenContainer.class.getClassLoader());
+        onNewTokensObtained(container);
+    }
+
+    /**
+     * Callback function when new delegation tokens obtained.
+     *
+     * @param container Serialized form of delegation tokens stored in 
DelegationTokenContainer. All
+     *     the available tokens will be forwarded to the appropriate {@link 
DelegationTokenReceiver}
+     *     based on service name.
+     */
+    public void onNewTokensObtained(DelegationTokenContainer container) throws 
Exception {
+        LOG.info("New delegation tokens arrived, sending them to receivers");
+        for (Map.Entry<String, byte[]> entry : 
container.getTokens().entrySet()) {
+            String serviceName = entry.getKey();
+            byte[] tokens = entry.getValue();
+            if (!delegationTokenReceivers.containsKey(serviceName)) {
+                throw new IllegalStateException(
+                        "Tokens arrived for service but no receiver found for 
it: " + serviceName);
+            }
+            try {
+                
delegationTokenReceivers.get(serviceName).onNewTokensObtained(tokens);
+            } catch (Exception e) {
+                LOG.warn("Failed to send tokens to delegation token receiver 
{}", serviceName, e);
+            }
+        }
+        LOG.info("Delegation tokens sent to receivers");
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HBaseDelegationTokenProvider.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HBaseDelegationTokenProvider.java
index 572a49eb8c3..7e8dd66ca54 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HBaseDelegationTokenProvider.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HBaseDelegationTokenProvider.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.security.token.hadoop;
 
-import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.security.token.DelegationTokenProvider;
 import org.apache.flink.runtime.util.HadoopUtils;
@@ -34,7 +34,6 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.security.PrivilegedExceptionAction;
-import java.util.Objects;
 import java.util.Optional;
 
 /**
@@ -42,7 +41,7 @@ import java.util.Optional;
  * flink-connector-hbase-base but HBase connection can be made without the 
connector. All in all I
  * tend to move this but that would be a breaking change.
  */
-@Experimental
+@Internal
 public class HBaseDelegationTokenProvider implements DelegationTokenProvider {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(HBaseDelegationTokenProvider.class);
@@ -80,7 +79,8 @@ public class HBaseDelegationTokenProvider implements 
DelegationTokenProvider {
         } catch (InvocationTargetException
                 | NoSuchMethodException
                 | IllegalAccessException
-                | ClassNotFoundException e) {
+                | ClassNotFoundException
+                | NoClassDefFoundError e) {
             LOG.info(
                     "HBase is not available (not packaged with this 
application): {} : \"{}\".",
                     e.getClass().getSimpleName(),
@@ -91,6 +91,22 @@ public class HBaseDelegationTokenProvider implements 
DelegationTokenProvider {
 
     @Override
     public boolean delegationTokensRequired() throws Exception {
+        /**
+         * The general rule how a provider/receiver must behave is the 
following: The provider and
+         * the receiver must be added to the classpath together with all the 
additionally required
+         * dependencies.
+         *
+         * <p>This null check is required because the HBase provider is always 
on classpath but
+         * HBase jars are optional. Such case configuration is not able to be 
loaded. This construct
+         * is intended to be removed when HBase provider/receiver pair can be 
externalized (namely
+         * if a provider/receiver throws an exception then workload must be 
stopped).
+         */
+        if (hbaseConf == null) {
+            LOG.debug(
+                    "HBase is not available (not packaged with this 
application), hence no "
+                            + "tokens will be acquired.");
+            return false;
+        }
         try {
             if 
(!HadoopUtils.isKerberosSecurityEnabled(UserGroupInformation.getCurrentUser())) 
{
                 return false;
@@ -99,8 +115,7 @@ public class HBaseDelegationTokenProvider implements 
DelegationTokenProvider {
             LOG.debug("Hadoop Kerberos is not enabled.");
             return false;
         }
-        return Objects.nonNull(hbaseConf)
-                && 
hbaseConf.get("hbase.security.authentication").equals("kerberos")
+        return 
hbaseConf.get("hbase.security.authentication").equals("kerberos")
                 && kerberosLoginProvider.isLoginPossible();
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HBaseDelegationTokenReceiver.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HBaseDelegationTokenReceiver.java
new file mode 100644
index 00000000000..9fba0cc5604
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HBaseDelegationTokenReceiver.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security.token.hadoop;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Delegation token receiver implementation for HBase. Basically it would be 
good to move this to
+ * flink-connector-hbase-base but HBase connection can be made without the 
connector. All in all I
+ * tend to move this but that would be a breaking change.
+ */
+@Internal
+public class HBaseDelegationTokenReceiver extends 
HadoopDelegationTokenReceiver {
+
+    @Override
+    public String serviceName() {
+        return "hbase";
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenUpdater.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenReceiver.java
similarity index 54%
rename from 
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenUpdater.java
rename to 
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenReceiver.java
index d6ec75797a9..0b1cba0f738 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenUpdater.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenReceiver.java
@@ -19,46 +19,43 @@
 package org.apache.flink.runtime.security.token.hadoop;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.runtime.security.token.DelegationTokenContainer;
-import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.security.token.DelegationTokenReceiver;
 
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/** Delegation token updater functionality. */
+/** Hadoop delegation token receiver base class. */
 @Internal
-public final class HadoopDelegationTokenUpdater {
+public abstract class HadoopDelegationTokenReceiver implements 
DelegationTokenReceiver {
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(HadoopDelegationTokenUpdater.class);
+    private final Logger log = LoggerFactory.getLogger(getClass());
 
-    private HadoopDelegationTokenUpdater() {}
+    public abstract String serviceName();
 
-    /** Updates delegation tokens for the current user. */
-    public static void addCurrentUserCredentials(byte[] containerBytes) throws 
Exception {
-        if (containerBytes == null || containerBytes.length == 0) {
-            throw new IllegalArgumentException("Illegal container tried to be 
processed");
-        }
-        DelegationTokenContainer container =
-                InstantiationUtil.deserializeObject(
-                        containerBytes, 
HadoopDelegationTokenUpdater.class.getClassLoader());
-        Credentials credentials = new Credentials();
-        for (byte[] v : container.getTokens().values()) {
-            credentials.addAll(HadoopDelegationTokenConverter.deserialize(v));
+    public void init(Configuration configuration) throws Exception {}
+
+    @Override
+    public void onNewTokensObtained(byte[] tokens) throws Exception {
+        if (tokens == null || tokens.length == 0) {
+            throw new IllegalArgumentException("Illegal tokens tried to be 
processed");
         }
-        LOG.info("Updating delegation tokens for current user");
+        Credentials credentials = 
HadoopDelegationTokenConverter.deserialize(tokens);
+
+        log.info("Updating delegation tokens for current user");
         dumpAllTokens(credentials);
         UserGroupInformation.getCurrentUser().addCredentials(credentials);
-        LOG.info("Updated delegation tokens for current user successfully");
+        log.info("Updated delegation tokens for current user successfully");
     }
 
-    public static void dumpAllTokens(Credentials credentials) {
+    private void dumpAllTokens(Credentials credentials) {
         credentials
                 .getAllTokens()
                 .forEach(
                         token ->
-                                LOG.info(
+                                log.info(
                                         "Token Service:{} Identifier:{}",
                                         token.getService(),
                                         token.getIdentifier()));
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopFSDelegationTokenProvider.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopFSDelegationTokenProvider.java
index a6a13f3a6cb..23202d27df9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopFSDelegationTokenProvider.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopFSDelegationTokenProvider.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.security.token.hadoop;
 
-import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.ConfigUtils;
 import org.apache.flink.configuration.Configuration;
@@ -47,7 +47,7 @@ import java.util.Optional;
 import java.util.Set;
 
 /** Delegation token provider for Hadoop filesystems. */
-@Experimental
+@Internal
 public class HadoopFSDelegationTokenProvider implements 
DelegationTokenProvider {
 
     private static final Logger LOG =
@@ -69,12 +69,35 @@ public class HadoopFSDelegationTokenProvider implements 
DelegationTokenProvider
     @Override
     public void init(Configuration configuration) throws Exception {
         flinkConfiguration = configuration;
-        hadoopConfiguration = 
HadoopUtils.getHadoopConfiguration(configuration);
-        kerberosLoginProvider = new KerberosLoginProvider(configuration);
+        try {
+            hadoopConfiguration = 
HadoopUtils.getHadoopConfiguration(configuration);
+            kerberosLoginProvider = new KerberosLoginProvider(configuration);
+        } catch (NoClassDefFoundError e) {
+            LOG.info(
+                    "Hadoop FS is not available (not packaged with this 
application): {} : \"{}\".",
+                    e.getClass().getSimpleName(),
+                    e.getMessage());
+        }
     }
 
     @Override
     public boolean delegationTokensRequired() throws Exception {
+        /**
+         * The general rule how a provider/receiver must behave is the 
following: The provider and
+         * the receiver must be added to the classpath together with all the 
additionally required
+         * dependencies.
+         *
+         * <p>This null check is required because the Hadoop FS provider is 
always on classpath but
+         * Hadoop FS jars are optional. Such case configuration is not able to 
be loaded. This
+         * construct is intended to be removed when HBase provider/receiver 
pair can be externalized
+         * (namely if a provider/receiver throws an exception then workload 
must be stopped).
+         */
+        if (hadoopConfiguration == null) {
+            LOG.debug(
+                    "Hadoop FS is not available (not packaged with this 
application), hence no "
+                            + "tokens will be acquired.");
+            return false;
+        }
         return 
HadoopUtils.isKerberosSecurityEnabled(UserGroupInformation.getCurrentUser())
                 && kerberosLoginProvider.isLoginPossible();
     }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopFSDelegationTokenReceiver.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopFSDelegationTokenReceiver.java
new file mode 100644
index 00000000000..113914a8651
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopFSDelegationTokenReceiver.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security.token.hadoop;
+
+import org.apache.flink.annotation.Internal;
+
+/** Delegation token receiver for Hadoop filesystems. */
+@Internal
+public class HadoopFSDelegationTokenReceiver extends 
HadoopDelegationTokenReceiver {
+
+    @Override
+    public String serviceName() {
+        return "hadoopfs";
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 218ecdaab46..9c3dd3744af 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -94,7 +94,7 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcServiceUtils;
-import 
org.apache.flink.runtime.security.token.hadoop.HadoopDelegationTokenUpdater;
+import 
org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository;
 import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
 import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
 import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
@@ -263,6 +263,8 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
 
     private final TaskExecutorPartitionTracker partitionTracker;
 
+    private final DelegationTokenReceiverRepository 
delegationTokenReceiverRepository;
+
     // --------- resource manager --------
 
     @Nullable private ResourceManagerAddress resourceManagerAddress;
@@ -289,7 +291,8 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
             @Nullable String metricQueryServiceAddress,
             TaskExecutorBlobService taskExecutorBlobService,
             FatalErrorHandler fatalErrorHandler,
-            TaskExecutorPartitionTracker partitionTracker) {
+            TaskExecutorPartitionTracker partitionTracker,
+            DelegationTokenReceiverRepository 
delegationTokenReceiverRepository) {
 
         super(rpcService, RpcServiceUtils.createRandomName(TASK_MANAGER_NAME));
 
@@ -302,6 +305,7 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
         this.haServices = checkNotNull(haServices);
         this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
         this.partitionTracker = partitionTracker;
+        this.delegationTokenReceiverRepository = 
checkNotNull(delegationTokenReceiverRepository);
         this.taskManagerMetricGroup = checkNotNull(taskManagerMetricGroup);
         this.taskExecutorBlobService = checkNotNull(taskExecutorBlobService);
         this.metricQueryServiceAddress = metricQueryServiceAddress;
@@ -1346,7 +1350,7 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
         }
 
         try {
-            HadoopDelegationTokenUpdater.addCurrentUserCredentials(tokens);
+            delegationTokenReceiverRepository.onNewTokensObtained(tokens);
             return CompletableFuture.completedFuture(Acknowledge.get());
         } catch (Throwable t) {
             log.error("Could not update delegation tokens.", t);
@@ -2386,7 +2390,7 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
             if (tokens != null) {
                 try {
                     log.info("Receive initial delegation tokens from resource 
manager");
-                    
HadoopDelegationTokenUpdater.addCurrentUserCredentials(tokens);
+                    
delegationTokenReceiverRepository.onNewTokensObtained(tokens);
                 } catch (Throwable t) {
                     log.error("Could not update delegation tokens.", t);
                     ExceptionUtils.rethrowIfFatalError(t);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index 0f08ccdaf7a..a7207e1fd8c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -61,6 +61,7 @@ import org.apache.flink.runtime.rpc.RpcSystemUtils;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.security.SecurityConfiguration;
 import org.apache.flink.runtime.security.SecurityUtils;
+import 
org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository;
 import org.apache.flink.runtime.state.changelog.StateChangelogStorageLoader;
 import org.apache.flink.runtime.taskmanager.MemoryLogger;
 import org.apache.flink.runtime.util.ConfigurationParserUtils;
@@ -241,6 +242,9 @@ public class TaskManagerRunner implements FatalErrorHandler 
{
                     
ExternalResourceUtils.createStaticExternalResourceInfoProviderFromConfig(
                             configuration, pluginManager);
 
+            final DelegationTokenReceiverRepository 
delegationTokenReceiverRepository =
+                    new DelegationTokenReceiverRepository(configuration);
+
             taskExecutorService =
                     taskExecutorServiceFactory.createTaskExecutor(
                             this.configuration,
@@ -253,7 +257,8 @@ public class TaskManagerRunner implements FatalErrorHandler 
{
                             false,
                             externalResourceInfoProvider,
                             workingDirectory.unwrap(),
-                            this);
+                            this,
+                            delegationTokenReceiverRepository);
 
             handleUnexpectedTaskExecutorServiceTermination();
 
@@ -552,7 +557,8 @@ public class TaskManagerRunner implements FatalErrorHandler 
{
             boolean localCommunicationOnly,
             ExternalResourceInfoProvider externalResourceInfoProvider,
             WorkingDirectory workingDirectory,
-            FatalErrorHandler fatalErrorHandler)
+            FatalErrorHandler fatalErrorHandler,
+            DelegationTokenReceiverRepository 
delegationTokenReceiverRepository)
             throws Exception {
 
         final TaskExecutor taskExecutor =
@@ -567,7 +573,8 @@ public class TaskManagerRunner implements FatalErrorHandler 
{
                         localCommunicationOnly,
                         externalResourceInfoProvider,
                         workingDirectory,
-                        fatalErrorHandler);
+                        fatalErrorHandler,
+                        delegationTokenReceiverRepository);
 
         return TaskExecutorToServiceAdapter.createFor(taskExecutor);
     }
@@ -583,7 +590,8 @@ public class TaskManagerRunner implements FatalErrorHandler 
{
             boolean localCommunicationOnly,
             ExternalResourceInfoProvider externalResourceInfoProvider,
             WorkingDirectory workingDirectory,
-            FatalErrorHandler fatalErrorHandler)
+            FatalErrorHandler fatalErrorHandler,
+            DelegationTokenReceiverRepository 
delegationTokenReceiverRepository)
             throws Exception {
 
         checkNotNull(configuration);
@@ -653,7 +661,8 @@ public class TaskManagerRunner implements FatalErrorHandler 
{
                 metricQueryServiceAddress,
                 taskExecutorBlobService,
                 fatalErrorHandler,
-                new 
TaskExecutorPartitionTrackerImpl(taskManagerServices.getShuffleEnvironment()));
+                new 
TaskExecutorPartitionTrackerImpl(taskManagerServices.getShuffleEnvironment()),
+                delegationTokenReceiverRepository);
     }
 
     /**
@@ -776,7 +785,8 @@ public class TaskManagerRunner implements FatalErrorHandler 
{
                 boolean localCommunicationOnly,
                 ExternalResourceInfoProvider externalResourceInfoProvider,
                 WorkingDirectory workingDirectory,
-                FatalErrorHandler fatalErrorHandler)
+                FatalErrorHandler fatalErrorHandler,
+                DelegationTokenReceiverRepository 
delegationTokenReceiverRepository)
                 throws Exception;
     }
 
diff --git 
a/flink-runtime/src/main/resources/META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenReceiver
 
b/flink-runtime/src/main/resources/META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenReceiver
new file mode 100644
index 00000000000..fc1f34c11b8
--- /dev/null
+++ 
b/flink-runtime/src/main/resources/META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenReceiver
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.runtime.security.token.hadoop.HadoopFSDelegationTokenReceiver
+org.apache.flink.runtime.security.token.hadoop.HBaseDelegationTokenReceiver
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManagerTest.java
index 8092f330a99..ed34ea978de 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManagerTest.java
@@ -22,16 +22,20 @@ import org.apache.flink.configuration.Configuration;
 import 
org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService;
 import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
 
-import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.time.Clock;
 import java.time.ZoneId;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static java.time.Instant.ofEpochMilli;
 import static 
org.apache.flink.configuration.SecurityOptions.DELEGATION_TOKENS_RENEWAL_TIME_RATIO;
+import static 
org.apache.flink.runtime.security.token.DelegationTokenProvider.CONFIG_PREFIX;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -43,30 +47,28 @@ public class DefaultDelegationTokenManagerTest {
     @BeforeEach
     public void beforeEach() {
         ExceptionThrowingDelegationTokenProvider.reset();
+        ExceptionThrowingDelegationTokenReceiver.reset();
     }
 
-    @AfterAll
-    public static void afterAll() {
+    @AfterEach
+    public void afterEach() {
         ExceptionThrowingDelegationTokenProvider.reset();
+        ExceptionThrowingDelegationTokenReceiver.reset();
     }
 
     @Test
     public void isProviderEnabledMustGiveBackTrueByDefault() {
         Configuration configuration = new Configuration();
-        DefaultDelegationTokenManager delegationTokenManager =
-                new DefaultDelegationTokenManager(configuration, null, null);
 
-        assertTrue(delegationTokenManager.isProviderEnabled("test"));
+        
assertTrue(DefaultDelegationTokenManager.isProviderEnabled(configuration, 
"test"));
     }
 
     @Test
     public void isProviderEnabledMustGiveBackFalseWhenDisabled() {
         Configuration configuration = new Configuration();
-        
configuration.setBoolean("security.delegation.token.provider.test.enabled", 
false);
-        DefaultDelegationTokenManager delegationTokenManager =
-                new DefaultDelegationTokenManager(configuration, null, null);
+        configuration.setBoolean(CONFIG_PREFIX + ".test.enabled", false);
 
-        assertFalse(delegationTokenManager.isProviderEnabled("test"));
+        
assertFalse(DefaultDelegationTokenManager.isProviderEnabled(configuration, 
"test"));
     }
 
     @Test
@@ -74,19 +76,87 @@ public class DefaultDelegationTokenManagerTest {
         assertThrows(Exception.class, () -> new 
DefaultDelegationTokenManager(null, null, null));
     }
 
+    @Test
+    public void oneProviderThrowsExceptionMustFailFast() {
+        assertThrows(
+                Exception.class,
+                () -> {
+                    ExceptionThrowingDelegationTokenProvider.throwInInit = 
true;
+                    new DefaultDelegationTokenManager(new Configuration(), 
null, null);
+                });
+    }
+
     @Test
     public void testAllProvidersLoaded() {
         Configuration configuration = new Configuration();
-        
configuration.setBoolean("security.delegation.token.provider.throw.enabled", 
false);
+        configuration.setBoolean(CONFIG_PREFIX + ".throw.enabled", false);
         DefaultDelegationTokenManager delegationTokenManager =
                 new DefaultDelegationTokenManager(configuration, null, null);
 
         assertEquals(3, 
delegationTokenManager.delegationTokenProviders.size());
+
         assertTrue(delegationTokenManager.isProviderLoaded("hadoopfs"));
+        assertTrue(delegationTokenManager.isReceiverLoaded("hadoopfs"));
+
         assertTrue(delegationTokenManager.isProviderLoaded("hbase"));
+        assertTrue(delegationTokenManager.isReceiverLoaded("hbase"));
+
         assertTrue(delegationTokenManager.isProviderLoaded("test"));
+        assertTrue(delegationTokenManager.isReceiverLoaded("test"));
+
         assertTrue(ExceptionThrowingDelegationTokenProvider.constructed);
+        assertTrue(ExceptionThrowingDelegationTokenReceiver.constructed);
         assertFalse(delegationTokenManager.isProviderLoaded("throw"));
+        assertFalse(delegationTokenManager.isReceiverLoaded("throw"));
+    }
+
+    @Test
+    public void 
checkProviderAndReceiverConsistencyShouldNotThrowWhenNothingLoaded() {
+        DefaultDelegationTokenManager.checkProviderAndReceiverConsistency(
+                Collections.emptyMap(), Collections.emptyMap());
+    }
+
+    @Test
+    public void 
checkProviderAndReceiverConsistencyShouldThrowWhenMissingReceiver() {
+        Map<String, DelegationTokenProvider> providers = new HashMap<>();
+        providers.put("test", new TestDelegationTokenProvider());
+
+        IllegalStateException e =
+                assertThrows(
+                        IllegalStateException.class,
+                        () ->
+                                
DefaultDelegationTokenManager.checkProviderAndReceiverConsistency(
+                                        providers, Collections.emptyMap()));
+        assertTrue(e.getMessage().contains("Missing receivers: test"));
+    }
+
+    @Test
+    public void 
checkProviderAndReceiverConsistencyShouldThrowWhenMissingProvider() {
+        Map<String, DelegationTokenReceiver> receivers = new HashMap<>();
+        receivers.put("test", new TestDelegationTokenReceiver());
+
+        IllegalStateException e =
+                assertThrows(
+                        IllegalStateException.class,
+                        () ->
+                                
DefaultDelegationTokenManager.checkProviderAndReceiverConsistency(
+                                        Collections.emptyMap(), receivers));
+        assertTrue(e.getMessage().contains("Missing providers: test"));
+    }
+
+    @Test
+    public void 
checkProviderAndReceiverConsistencyShouldNotThrowWhenBothLoaded() {
+        Map<String, DelegationTokenProvider> providers = new HashMap<>();
+        providers.put("test", new TestDelegationTokenProvider());
+        Map<String, DelegationTokenReceiver> receivers = new HashMap<>();
+        receivers.put("test", new TestDelegationTokenReceiver());
+
+        
DefaultDelegationTokenManager.checkProviderAndReceiverConsistency(providers, 
receivers);
+
+        assertEquals(1, providers.size());
+        assertTrue(providers.containsKey("test"));
+        assertEquals(1, receivers.size());
+        assertTrue(receivers.containsKey("test"));
     }
 
     @Test
@@ -98,7 +168,7 @@ public class DefaultDelegationTokenManagerTest {
 
         ExceptionThrowingDelegationTokenProvider.addToken = true;
         Configuration configuration = new Configuration();
-        
configuration.setBoolean("security.delegation.token.provider.throw.enabled", 
true);
+        configuration.setBoolean(CONFIG_PREFIX + ".throw.enabled", true);
         AtomicInteger startTokensUpdateCallCount = new AtomicInteger(0);
         DefaultDelegationTokenManager delegationTokenManager =
                 new DefaultDelegationTokenManager(configuration, 
scheduledExecutor, scheduler) {
@@ -124,7 +194,7 @@ public class DefaultDelegationTokenManagerTest {
     @Test
     public void calculateRenewalDelayShouldConsiderRenewalRatio() {
         Configuration configuration = new Configuration();
-        
configuration.setBoolean("security.delegation.token.provider.throw.enabled", 
false);
+        configuration.setBoolean(CONFIG_PREFIX + ".throw.enabled", false);
         configuration.set(DELEGATION_TOKENS_RENEWAL_TIME_RATIO, 0.5);
         DefaultDelegationTokenManager delegationTokenManager =
                 new DefaultDelegationTokenManager(configuration, null, null);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DelegationTokenReceiverRepositoryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DelegationTokenReceiverRepositoryTest.java
new file mode 100644
index 00000000000..db93128d7ab
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DelegationTokenReceiverRepositoryTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security.token;
+
+import org.apache.flink.configuration.Configuration;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static 
org.apache.flink.runtime.security.token.DelegationTokenProvider.CONFIG_PREFIX;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Test for {@link DelegationTokenReceiverRepository}. */
+class DelegationTokenReceiverRepositoryTest {
+
+    @BeforeEach
+    public void beforeEach() {
+        ExceptionThrowingDelegationTokenReceiver.reset();
+    }
+
+    @AfterEach
+    public void afterEach() {
+        ExceptionThrowingDelegationTokenReceiver.reset();
+    }
+
+    @Test
+    public void configurationIsNullMustFailFast() {
+        assertThrows(Exception.class, () -> new 
DelegationTokenReceiverRepository(null));
+    }
+
+    @Test
+    public void oneReceiverThrowsExceptionMustFailFast() {
+        assertThrows(
+                Exception.class,
+                () -> {
+                    ExceptionThrowingDelegationTokenReceiver.throwInInit = 
true;
+                    new DelegationTokenReceiverRepository(new Configuration());
+                });
+    }
+
+    @Test
+    public void testAllReceiversLoaded() {
+        Configuration configuration = new Configuration();
+        configuration.setBoolean(CONFIG_PREFIX + ".throw.enabled", false);
+        DelegationTokenReceiverRepository delegationTokenReceiverRepository =
+                new DelegationTokenReceiverRepository(configuration);
+
+        assertEquals(3, 
delegationTokenReceiverRepository.delegationTokenReceivers.size());
+        
assertTrue(delegationTokenReceiverRepository.isReceiverLoaded("hadoopfs"));
+        
assertTrue(delegationTokenReceiverRepository.isReceiverLoaded("hbase"));
+        assertTrue(delegationTokenReceiverRepository.isReceiverLoaded("test"));
+        assertTrue(ExceptionThrowingDelegationTokenReceiver.constructed);
+        
assertFalse(delegationTokenReceiverRepository.isReceiverLoaded("throw"));
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/ExceptionThrowingDelegationTokenReceiver.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/ExceptionThrowingDelegationTokenReceiver.java
new file mode 100644
index 00000000000..41e72c11cce
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/ExceptionThrowingDelegationTokenReceiver.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security.token;
+
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * An example implementation of {@link DelegationTokenReceiver} which throws 
exception when enabled.
+ */
+public class ExceptionThrowingDelegationTokenReceiver implements 
DelegationTokenReceiver {
+
+    public static volatile boolean throwInInit = false;
+    public static volatile boolean throwInUsage = false;
+    public static volatile boolean constructed = false;
+
+    public static void reset() {
+        throwInInit = false;
+        throwInUsage = false;
+        constructed = false;
+    }
+
+    public ExceptionThrowingDelegationTokenReceiver() {
+        constructed = true;
+    }
+
+    @Override
+    public String serviceName() {
+        return "throw";
+    }
+
+    @Override
+    public void init(Configuration configuration) {
+        if (throwInInit) {
+            throw new IllegalArgumentException();
+        }
+    }
+
+    @Override
+    public void onNewTokensObtained(byte[] tokens) throws Exception {
+        if (throwInUsage) {
+            throw new IllegalArgumentException();
+        }
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/TestDelegationTokenReceiver.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/TestDelegationTokenReceiver.java
new file mode 100644
index 00000000000..405d026a3a1
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/TestDelegationTokenReceiver.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security.token;
+
+import org.apache.flink.configuration.Configuration;
+
+/** An example implementation of {@link DelegationTokenReceiver} which does 
nothing. */
+public class TestDelegationTokenReceiver implements DelegationTokenReceiver {
+
+    @Override
+    public String serviceName() {
+        return "test";
+    }
+
+    @Override
+    public void init(Configuration configuration) throws Exception {}
+
+    @Override
+    public void onNewTokensObtained(byte[] tokens) throws Exception {}
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenUpdaterITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenReceiverITCase.java
similarity index 60%
rename from 
flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenUpdaterITCase.java
rename to 
flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenReceiverITCase.java
index 7da8411dc96..9f98b4b2304 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenUpdaterITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenReceiverITCase.java
@@ -18,9 +18,6 @@
 
 package org.apache.flink.runtime.security.token.hadoop;
 
-import org.apache.flink.runtime.security.token.DelegationTokenContainer;
-import org.apache.flink.util.InstantiationUtil;
-
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.Credentials;
@@ -37,51 +34,55 @@ import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
-/** Test for {@link HadoopDelegationTokenConverter}. */
-public class HadoopDelegationTokenUpdaterITCase {
+/** Test for {@link HadoopDelegationTokenReceiver}. */
+public class HadoopDelegationTokenReceiverITCase {
 
     @Test
-    public void 
addCurrentUserCredentialsShouldThrowExceptionWhenNullCredentials() {
-        addCurrentUserCredentialsShouldThrowException(null);
+    public void onNewTokensObtainedShouldThrowExceptionWhenNullCredentials() {
+        onNewTokensObtainedShouldThrowException(null);
     }
 
     @Test
-    public void 
addCurrentUserCredentialsShouldThrowExceptionWhenEmptyCredentials() {
-        addCurrentUserCredentialsShouldThrowException(new byte[0]);
+    public void onNewTokensObtainedShouldThrowExceptionWhenEmptyCredentials() {
+        onNewTokensObtainedShouldThrowException(new byte[0]);
     }
 
-    private void addCurrentUserCredentialsShouldThrowException(byte[] 
credentialsBytes) {
-        try (MockedStatic<UserGroupInformation> ugi = 
mockStatic(UserGroupInformation.class)) {
-            UserGroupInformation userGroupInformation = 
mock(UserGroupInformation.class);
-            
ugi.when(UserGroupInformation::getCurrentUser).thenReturn(userGroupInformation);
-
-            IllegalArgumentException e =
-                    assertThrows(
-                            IllegalArgumentException.class,
-                            () ->
-                                    
HadoopDelegationTokenUpdater.addCurrentUserCredentials(
-                                            credentialsBytes));
-            assertTrue(e.getMessage().contains("Illegal container"));
-        }
+    private void onNewTokensObtainedShouldThrowException(byte[] 
credentialsBytes) {
+        HadoopDelegationTokenReceiver receiver =
+                new HadoopDelegationTokenReceiver() {
+                    @Override
+                    public String serviceName() {
+                        return "test";
+                    }
+                };
+        IllegalArgumentException e =
+                assertThrows(
+                        IllegalArgumentException.class,
+                        () -> receiver.onNewTokensObtained(credentialsBytes));
+        assertTrue(e.getMessage().contains("Illegal tokens"));
     }
 
     @Test
-    public void addCurrentUserCredentialsShouldOverwriteCredentials() throws 
Exception {
+    public void onNewTokensObtainedShouldOverwriteCredentials() throws 
Exception {
         final Text tokenKind = new Text("TEST_TOKEN_KIND");
         final Text tokenService = new Text("TEST_TOKEN_SERVICE");
         Credentials credentials = new Credentials();
         credentials.addToken(
                 tokenService, new Token<>(new byte[4], new byte[4], tokenKind, 
tokenService));
         byte[] credentialsBytes = 
HadoopDelegationTokenConverter.serialize(credentials);
-        DelegationTokenContainer container = new DelegationTokenContainer();
-        container.addToken("TEST_TOKEN_KEY", credentialsBytes);
-        byte[] containerBytes = InstantiationUtil.serializeObject(container);
 
         try (MockedStatic<UserGroupInformation> ugi = 
mockStatic(UserGroupInformation.class)) {
             UserGroupInformation userGroupInformation = 
mock(UserGroupInformation.class);
             
ugi.when(UserGroupInformation::getCurrentUser).thenReturn(userGroupInformation);
 
-            
HadoopDelegationTokenUpdater.addCurrentUserCredentials(containerBytes);
+            HadoopDelegationTokenReceiver receiver =
+                    new HadoopDelegationTokenReceiver() {
+                        @Override
+                        public String serviceName() {
+                            return "test";
+                        }
+                    };
+            receiver.onNewTokensObtained(credentialsBytes);
             ArgumentCaptor<Credentials> argumentCaptor = 
ArgumentCaptor.forClass(Credentials.class);
             verify(userGroupInformation, 
times(1)).addCredentials(argumentCaptor.capture());
             assertTrue(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorBuilder.java
index 8d3367ac28e..362b4de5957 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorBuilder.java
@@ -35,6 +35,7 @@ import 
org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.rest.util.NoOpFatalErrorHandler;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
+import 
org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository;
 import org.apache.flink.util.concurrent.Executors;
 
 import javax.annotation.Nullable;
@@ -143,6 +144,9 @@ public class TaskExecutorBuilder {
             resolvedTaskManagerServices = taskManagerServices;
         }
 
+        final DelegationTokenReceiverRepository 
delegationTokenReceiverRepository =
+                new DelegationTokenReceiverRepository(configuration);
+
         return new TaskExecutor(
                 rpcService,
                 resolvedTaskManagerConfiguration,
@@ -154,7 +158,8 @@ public class TaskExecutorBuilder {
                 metricQueryServiceAddress,
                 resolvedTaskExecutorBlobService,
                 fatalErrorHandler,
-                partitionTracker);
+                partitionTracker,
+                delegationTokenReceiverRepository);
     }
 
     public static TaskExecutorBuilder newBuilder(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorExecutionDeploymentReconciliationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorExecutionDeploymentReconciliationTest.java
index 43f9b75ee92..6933d29b551 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorExecutionDeploymentReconciliationTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorExecutionDeploymentReconciliationTest.java
@@ -52,6 +52,7 @@ import 
org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import 
org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcServiceResource;
+import 
org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository;
 import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
 import org.apache.flink.runtime.taskexecutor.slot.TaskSlotUtils;
 import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource;
@@ -251,7 +252,8 @@ public class 
TaskExecutorExecutionDeploymentReconciliationTest extends TestLogge
                 null,
                 NoOpTaskExecutorBlobService.INSTANCE,
                 testingFatalErrorHandlerResource.getFatalErrorHandler(),
-                new TestingTaskExecutorPartitionTracker());
+                new TestingTaskExecutorPartitionTracker(),
+                new DelegationTokenReceiverRepository(configuration));
     }
 
     private static TaskDeploymentDescriptor 
createTaskDeploymentDescriptor(JobID jobId)
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
index 6136e12f9d3..e2afdfb703a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
@@ -58,6 +58,7 @@ import 
org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import 
org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
+import 
org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository;
 import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
 import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
 import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
@@ -599,7 +600,8 @@ public class TaskExecutorPartitionLifecycleTest extends 
TestLogger {
                 null,
                 NoOpTaskExecutorBlobService.INSTANCE,
                 new TestingFatalErrorHandler(),
-                partitionTracker);
+                partitionTracker,
+                new DelegationTokenReceiverRepository(configuration));
     }
 
     private static TaskSlotTable<Task> createTaskSlotTable() {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSlotLifetimeTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSlotLifetimeTest.java
index 35f099b4215..55cb4f7c6b5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSlotLifetimeTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSlotLifetimeTest.java
@@ -44,6 +44,7 @@ import 
org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGate
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.rpc.TestingRpcServiceResource;
+import 
org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository;
 import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
 import org.apache.flink.runtime.taskexecutor.slot.TaskSlotUtils;
 import org.apache.flink.runtime.taskmanager.LocalUnresolvedTaskManagerLocation;
@@ -236,7 +237,8 @@ public class TaskExecutorSlotLifetimeTest extends 
TestLogger {
                 null,
                 NoOpTaskExecutorBlobService.INSTANCE,
                 testingFatalErrorHandlerResource.getFatalErrorHandler(),
-                new TestingTaskExecutorPartitionTracker());
+                new TestingTaskExecutorPartitionTracker(),
+                new DelegationTokenReceiverRepository(configuration));
     }
 
     private TaskExecutorLocalStateStoresManager 
createTaskExecutorLocalStateStoresManager()
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index e13efdcec55..cc857ba0e1f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -79,6 +79,7 @@ import 
org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGate
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException;
+import 
org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository;
 import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
 import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
 import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
@@ -2818,7 +2819,8 @@ public class TaskExecutorTest extends TestLogger {
                 null,
                 NoOpTaskExecutorBlobService.INSTANCE,
                 testingFatalErrorHandler,
-                taskExecutorPartitionTracker);
+                taskExecutorPartitionTracker,
+                new DelegationTokenReceiverRepository(configuration));
     }
 
     private TestingTaskExecutor createTestingTaskExecutor(TaskManagerServices 
taskManagerServices)
@@ -2853,7 +2855,8 @@ public class TaskExecutorTest extends TestLogger {
                 null,
                 NoOpTaskExecutorBlobService.INSTANCE,
                 testingFatalErrorHandler,
-                new 
TaskExecutorPartitionTrackerImpl(taskManagerServices.getShuffleEnvironment()));
+                new 
TaskExecutorPartitionTrackerImpl(taskManagerServices.getShuffleEnvironment()),
+                new DelegationTokenReceiverRepository(configuration));
     }
 
     private TaskExecutorTestingContext createTaskExecutorTestingContext(int 
numberOfSlots)
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerStartupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerStartupTest.java
index 2ea9bacaa8a..79ff1dc541d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerStartupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerStartupTest.java
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.metrics.scope.ScopeFormats;
 import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.TestingRpcServiceResource;
+import 
org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository;
 import org.apache.flink.runtime.testutils.WorkingDirectoryResource;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.TestLogger;
@@ -292,6 +293,7 @@ public class TaskManagerRunnerStartupTest extends 
TestLogger {
                 false,
                 ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES,
                 workingDirectory,
-                error -> {});
+                error -> {},
+                new DelegationTokenReceiverRepository(configuration));
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java
index 6406c1d06de..1cde71b77ab 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java
@@ -35,6 +35,7 @@ import 
org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
+import 
org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.TimeUtils;
@@ -276,7 +277,8 @@ public class TaskManagerRunnerTest extends TestLogger {
                 localCommunicationOnly,
                 externalResourceInfoProvider,
                 workingDirectory,
-                fatalErrorHandler) -> taskExecutorService;
+                fatalErrorHandler,
+                delegationTokenReceiverRepository) -> taskExecutorService;
     }
 
     private static Configuration createConfiguration() {
@@ -317,7 +319,8 @@ public class TaskManagerRunnerTest extends TestLogger {
                 boolean localCommunicationOnly,
                 ExternalResourceInfoProvider externalResourceInfoProvider,
                 WorkingDirectory workingDirectory,
-                FatalErrorHandler fatalErrorHandler) {
+                FatalErrorHandler fatalErrorHandler,
+                DelegationTokenReceiverRepository 
delegationTokenReceiverRepository) {
             return TestingTaskExecutorService.newBuilder()
                     .setStartRunnable(
                             () ->
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
index 24ac627fa9b..70f42962e51 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
@@ -44,6 +44,7 @@ import 
org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.rpc.MainThreadExecutable;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.TestingRpcService;
+import 
org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository;
 import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
 import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
 import org.apache.flink.runtime.taskexecutor.slot.DefaultTimerService;
@@ -268,7 +269,8 @@ class TaskSubmissionTestEnvironment implements 
AutoCloseable {
                 metricQueryServiceAddress,
                 taskExecutorBlobService,
                 testingFatalErrorHandler,
-                new 
TaskExecutorPartitionTrackerImpl(taskManagerServices.getShuffleEnvironment()));
+                new 
TaskExecutorPartitionTrackerImpl(taskManagerServices.getShuffleEnvironment()),
+                new DelegationTokenReceiverRepository(configuration));
     }
 
     private static ShuffleEnvironment<?, ?> createShuffleEnvironment(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutor.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutor.java
index 871784caf1e..9bf4b5be894 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutor.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutor.java
@@ -27,6 +27,7 @@ import 
org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.MainThreadExecutable;
 import org.apache.flink.runtime.rpc.RpcService;
+import 
org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository;
 
 import javax.annotation.Nullable;
 
@@ -47,7 +48,8 @@ class TestingTaskExecutor extends TaskExecutor {
             @Nullable String metricQueryServiceAddress,
             TaskExecutorBlobService taskExecutorBlobService,
             FatalErrorHandler fatalErrorHandler,
-            TaskExecutorPartitionTracker partitionTracker) {
+            TaskExecutorPartitionTracker partitionTracker,
+            DelegationTokenReceiverRepository 
delegationTokenReceiverRepository) {
         super(
                 rpcService,
                 taskManagerConfiguration,
@@ -59,7 +61,8 @@ class TestingTaskExecutor extends TaskExecutor {
                 metricQueryServiceAddress,
                 taskExecutorBlobService,
                 fatalErrorHandler,
-                partitionTracker);
+                partitionTracker,
+                delegationTokenReceiverRepository);
     }
 
     @Override
diff --git 
a/flink-runtime/src/test/resources/META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenReceiver
 
b/flink-runtime/src/test/resources/META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenReceiver
new file mode 100644
index 00000000000..b186e9e09b9
--- /dev/null
+++ 
b/flink-runtime/src/test/resources/META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenReceiver
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.runtime.security.token.TestDelegationTokenReceiver
+org.apache.flink.runtime.security.token.ExceptionThrowingDelegationTokenReceiver
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index f692a501d41..2653fc46d9f 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -1300,9 +1300,12 @@ public class YarnClusterDescriptor implements 
ClusterDescriptor<ApplicationId> {
         DelegationTokenContainer container = new DelegationTokenContainer();
         delegationTokenManager.obtainDelegationTokens(container);
 
+        // This is here for backward compatibility to make log aggregation work
         Credentials credentials = new Credentials();
-        for (byte[] v : container.getTokens().values()) {
-            credentials.addAll(HadoopDelegationTokenConverter.deserialize(v));
+        for (Map.Entry<String, byte[]> e : container.getTokens().entrySet()) {
+            if (e.getKey().equals("hadoopfs")) {
+                
credentials.addAll(HadoopDelegationTokenConverter.deserialize(e.getValue()));
+            }
         }
         ByteBuffer tokens = 
ByteBuffer.wrap(HadoopDelegationTokenConverter.serialize(credentials));
         containerLaunchContext.setTokens(tokens);

Reply via email to