This is an automated email from the ASF dual-hosted git repository. mbalassi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 324b54cf413 [FLINK-30425][runtime][security] Generalize token receive side 324b54cf413 is described below commit 324b54cf413229f78e6e709aaaa2e16a03d45df1 Author: Gabor Somogyi <gabor_somog...@apple.com> AuthorDate: Mon Jan 2 15:30:45 2023 +0100 [FLINK-30425][runtime][security] Generalize token receive side --- flink-end-to-end-tests/test-scripts/common.sh | 1 + .../flink/runtime/minicluster/MiniCluster.java | 10 +- .../token/DefaultDelegationTokenManager.java | 64 ++++++++-- .../security/token/DelegationTokenProvider.java | 13 +- .../security/token/DelegationTokenReceiver.java | 61 +++++++++ .../token/DelegationTokenReceiverRepository.java | 140 +++++++++++++++++++++ .../token/hadoop/HBaseDelegationTokenProvider.java | 27 +++- .../token/hadoop/HBaseDelegationTokenReceiver.java | 35 ++++++ ...ter.java => HadoopDelegationTokenReceiver.java} | 39 +++--- .../hadoop/HadoopFSDelegationTokenProvider.java | 31 ++++- .../hadoop/HadoopFSDelegationTokenReceiver.java | 31 +++++ .../flink/runtime/taskexecutor/TaskExecutor.java | 12 +- .../runtime/taskexecutor/TaskManagerRunner.java | 22 +++- ....runtime.security.token.DelegationTokenReceiver | 17 +++ .../token/DefaultDelegationTokenManagerTest.java | 96 ++++++++++++-- .../DelegationTokenReceiverRepositoryTest.java | 75 +++++++++++ .../ExceptionThrowingDelegationTokenReceiver.java | 60 +++++++++ .../token/TestDelegationTokenReceiver.java | 36 ++++++ ...va => HadoopDelegationTokenReceiverITCase.java} | 55 ++++---- .../runtime/taskexecutor/TaskExecutorBuilder.java | 7 +- ...cutorExecutionDeploymentReconciliationTest.java | 4 +- .../TaskExecutorPartitionLifecycleTest.java | 4 +- .../taskexecutor/TaskExecutorSlotLifetimeTest.java | 4 +- .../runtime/taskexecutor/TaskExecutorTest.java | 7 +- .../taskexecutor/TaskManagerRunnerStartupTest.java | 4 +- .../taskexecutor/TaskManagerRunnerTest.java | 7 +- .../TaskSubmissionTestEnvironment.java | 4 +- .../runtime/taskexecutor/TestingTaskExecutor.java | 7 +- ....runtime.security.token.DelegationTokenReceiver | 17 +++ .../apache/flink/yarn/YarnClusterDescriptor.java | 7 +- 30 files changed, 792 insertions(+), 105 deletions(-) diff --git a/flink-end-to-end-tests/test-scripts/common.sh b/flink-end-to-end-tests/test-scripts/common.sh index 31741710b3a..047a05a7bb4 100644 --- a/flink-end-to-end-tests/test-scripts/common.sh +++ b/flink-end-to-end-tests/test-scripts/common.sh @@ -380,6 +380,7 @@ function check_logs_for_errors { | grep -v "Error sending fetch request" \ | grep -v "WARN akka.remote.ReliableDeliverySupervisor" \ | grep -v "Options.*error_*" \ + | grep -v "not packaged with this application" \ | grep -ic "error" || true) if [[ ${error_count} -gt 0 ]]; then echo "Found error in log files; printing first 500 lines; see full logs for details:" diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java index 59240a14c03..1d48126e141 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java @@ -91,6 +91,7 @@ import org.apache.flink.runtime.rpc.RpcUtils; import org.apache.flink.runtime.scheduler.ExecutionGraphInfo; import org.apache.flink.runtime.security.token.DefaultDelegationTokenManagerFactory; import org.apache.flink.runtime.security.token.DelegationTokenManager; +import org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository; import org.apache.flink.runtime.taskexecutor.TaskExecutor; import org.apache.flink.runtime.taskexecutor.TaskManagerRunner; import org.apache.flink.runtime.webmonitor.retriever.LeaderRetriever; @@ -199,6 +200,9 @@ public class MiniCluster implements AutoCloseableAsync { @GuardedBy("lock") private DelegationTokenManager delegationTokenManager; + @GuardedBy("lock") + private DelegationTokenReceiverRepository delegationTokenReceiverRepository; + @GuardedBy("lock") private BlobCacheService blobCacheService; @@ -431,6 +435,9 @@ public class MiniCluster implements AutoCloseableAsync { DefaultDelegationTokenManagerFactory.create( configuration, commonRpcService.getScheduledExecutor(), ioExecutor); + delegationTokenReceiverRepository = + new DelegationTokenReceiverRepository(configuration); + blobCacheService = BlobUtils.createBlobCacheService( configuration, @@ -749,7 +756,8 @@ public class MiniCluster implements AutoCloseableAsync { ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES, workingDirectory.createSubWorkingDirectory("tm_" + taskManagers.size()), taskManagerTerminatingFatalErrorHandlerFactory.create( - taskManagers.size())); + taskManagers.size()), + delegationTokenReceiverRepository); taskExecutor.start(); taskManagers.add(taskExecutor); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManager.java index 1703d796dec..fb5a195d2cc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManager.java @@ -21,7 +21,6 @@ package org.apache.flink.runtime.security.token; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.security.token.hadoop.HadoopDelegationTokenUpdater; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.concurrent.ScheduledExecutor; @@ -34,9 +33,11 @@ import javax.annotation.concurrent.GuardedBy; import java.time.Clock; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Optional; import java.util.ServiceLoader; +import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -58,6 +59,13 @@ import static org.apache.flink.util.Preconditions.checkState; @Internal public class DefaultDelegationTokenManager implements DelegationTokenManager { + private static final String PROVIDER_RECEIVER_INCONSISTENCY_ERROR = + "There is an inconsistency between loaded delegation token providers and receivers. " + + "One must implement a DelegationTokenProvider and a DelegationTokenReceiver " + + "with the same service name and add them together to the classpath to make " + + "the system consistent. The mentioned classes are loaded with Java's service " + + "loader so the appropriate META-INF registration also needs to be created."; + private static final Logger LOG = LoggerFactory.getLogger(DefaultDelegationTokenManager.class); private final Configuration configuration; @@ -68,6 +76,8 @@ public class DefaultDelegationTokenManager implements DelegationTokenManager { @VisibleForTesting final Map<String, DelegationTokenProvider> delegationTokenProviders; + private final DelegationTokenReceiverRepository delegationTokenReceiverRepository; + @Nullable private final ScheduledExecutor scheduledExecutor; @Nullable private final ExecutorService ioExecutor; @@ -89,8 +99,13 @@ public class DefaultDelegationTokenManager implements DelegationTokenManager { this.renewalRetryBackoffPeriod = configuration.get(DELEGATION_TOKENS_RENEWAL_RETRY_BACKOFF).toMillis(); this.delegationTokenProviders = loadProviders(); + this.delegationTokenReceiverRepository = + new DelegationTokenReceiverRepository(configuration); this.scheduledExecutor = scheduledExecutor; this.ioExecutor = ioExecutor; + checkProviderAndReceiverConsistency( + delegationTokenProviders, + delegationTokenReceiverRepository.delegationTokenReceivers); } private Map<String, DelegationTokenProvider> loadProviders() { @@ -102,7 +117,7 @@ public class DefaultDelegationTokenManager implements DelegationTokenManager { Map<String, DelegationTokenProvider> providers = new HashMap<>(); for (DelegationTokenProvider provider : serviceLoader) { try { - if (isProviderEnabled(provider.serviceName())) { + if (isProviderEnabled(configuration, provider.serviceName())) { provider.init(configuration); LOG.info( "Delegation token provider {} loaded and initialized", @@ -118,10 +133,13 @@ public class DefaultDelegationTokenManager implements DelegationTokenManager { provider.serviceName()); } } catch (Exception | NoClassDefFoundError e) { - LOG.warn( + // The intentional general rule is that if a provider's init method throws exception + // then stop the workload + LOG.error( "Failed to initialize delegation token provider {}", provider.serviceName(), e); + throw new FlinkRuntimeException(e); } } @@ -130,8 +148,7 @@ public class DefaultDelegationTokenManager implements DelegationTokenManager { return providers; } - @VisibleForTesting - boolean isProviderEnabled(String serviceName) { + static boolean isProviderEnabled(Configuration configuration, String serviceName) { return configuration.getBoolean( String.format("security.delegation.token.provider.%s.enabled", serviceName), true); } @@ -141,6 +158,38 @@ public class DefaultDelegationTokenManager implements DelegationTokenManager { return delegationTokenProviders.containsKey(serviceName); } + @VisibleForTesting + boolean isReceiverLoaded(String serviceName) { + return delegationTokenReceiverRepository.isReceiverLoaded(serviceName); + } + + @VisibleForTesting + static void checkProviderAndReceiverConsistency( + Map<String, DelegationTokenProvider> providers, + Map<String, DelegationTokenReceiver> receivers) { + LOG.info("Checking provider and receiver instances consistency"); + if (providers.size() != receivers.size()) { + Set<String> missingReceiverServiceNames = new HashSet<>(providers.keySet()); + missingReceiverServiceNames.removeAll(receivers.keySet()); + if (!missingReceiverServiceNames.isEmpty()) { + throw new IllegalStateException( + PROVIDER_RECEIVER_INCONSISTENCY_ERROR + + " Missing receivers: " + + String.join(",", missingReceiverServiceNames)); + } + + Set<String> missingProviderServiceNames = new HashSet<>(receivers.keySet()); + missingProviderServiceNames.removeAll(providers.keySet()); + if (!missingProviderServiceNames.isEmpty()) { + throw new IllegalStateException( + PROVIDER_RECEIVER_INCONSISTENCY_ERROR + + " Missing providers: " + + String.join(",", missingProviderServiceNames)); + } + } + LOG.info("Provider and receiver instances are consistent"); + } + /** * Obtains new tokens in a one-time fashion and leaves it up to the caller to distribute them. */ @@ -212,12 +261,11 @@ public class DefaultDelegationTokenManager implements DelegationTokenManager { Optional<Long> nextRenewal = obtainDelegationTokensAndGetNextRenewal(container); if (container.hasTokens()) { - byte[] containerBytes = InstantiationUtil.serializeObject(container); - HadoopDelegationTokenUpdater.addCurrentUserCredentials(containerBytes); + delegationTokenReceiverRepository.onNewTokensObtained(container); LOG.info("Notifying listener about new tokens"); checkNotNull(listener, "Listener must not be null"); - listener.onNewTokensObtained(containerBytes); + listener.onNewTokensObtained(InstantiationUtil.serializeObject(container)); LOG.info("Listener notified successfully"); } else { LOG.warn("No tokens obtained so skipping listener notification"); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenProvider.java index d98e5816fa9..c6d7dcea413 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenProvider.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenProvider.java @@ -25,10 +25,16 @@ import java.util.Optional; /** * Delegation token provider API. Instances of {@link DelegationTokenProvider}s are loaded by {@link - * DelegationTokenManager} through service loader. + * DelegationTokenManager} through service loader. Basically the implementation of this interface is + * responsible to produce the serialized form of tokens which will be handled by {@link + * DelegationTokenReceiver} instances both on JobManager and TaskManager side. */ @Experimental public interface DelegationTokenProvider { + + /** Config prefix of providers. */ + String CONFIG_PREFIX = "security.delegation.token.provider"; + /** Container for obtained delegation tokens. */ class ObtainedDelegationTokens { /** Serialized form of delegation tokens. */ @@ -57,6 +63,11 @@ public interface DelegationTokenProvider { /** Name of the service to provide delegation tokens. This name should be unique. */ String serviceName(); + /** Config prefix of the service. */ + default String serviceConfigPrefix() { + return String.format("%s.%s", CONFIG_PREFIX, serviceName()); + } + /** * Called by {@link DelegationTokenManager} to initialize provider after construction. * diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenReceiver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenReceiver.java new file mode 100644 index 00000000000..b296186edc3 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenReceiver.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.security.token; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.configuration.Configuration; + +/** + * Delegation token receiver API. Instances of {@link DelegationTokenReceiver}s are loaded both on + * JobManager and TaskManager side through service loader. Basically the implementation of this + * interface is responsible to receive the serialized form of tokens produced by {@link + * DelegationTokenProvider}. + */ +@Experimental +public interface DelegationTokenReceiver { + + /** Config prefix of receivers. */ + String CONFIG_PREFIX = "security.delegation.token.receiver"; + + /** + * Name of the service to receive delegation tokens for. This name should be unique and the same + * as the one provided in the corresponding {@link DelegationTokenProvider}. + */ + String serviceName(); + + /** Config prefix of the service. */ + default String serviceConfigPrefix() { + return String.format("%s.%s", CONFIG_PREFIX, serviceName()); + } + + /** + * Called to initialize receiver after construction. + * + * @param configuration Configuration to initialize the receiver. + */ + void init(Configuration configuration) throws Exception; + + /** + * Callback function when new delegation tokens obtained. + * + * @param tokens Serialized form of delegation tokens. Must be deserialized the reverse way + * which is implemented in {@link DelegationTokenProvider}. + */ + void onNewTokensObtained(byte[] tokens) throws Exception; +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenReceiverRepository.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenReceiverRepository.java new file mode 100644 index 00000000000..8e63c3b4390 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenReceiverRepository.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.security.token; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.InstantiationUtil; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; +import java.util.ServiceLoader; + +import static org.apache.flink.runtime.security.token.DefaultDelegationTokenManager.isProviderEnabled; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** Repository for delegation token receivers. */ +@Internal +public class DelegationTokenReceiverRepository { + + private static final Logger LOG = + LoggerFactory.getLogger(DelegationTokenReceiverRepository.class); + + private final Configuration configuration; + + @VisibleForTesting final Map<String, DelegationTokenReceiver> delegationTokenReceivers; + + public DelegationTokenReceiverRepository(Configuration configuration) { + this.configuration = checkNotNull(configuration, "Flink configuration must not be null"); + this.delegationTokenReceivers = loadReceivers(); + } + + private Map<String, DelegationTokenReceiver> loadReceivers() { + LOG.info("Loading delegation token receivers"); + + ServiceLoader<DelegationTokenReceiver> serviceLoader = + ServiceLoader.load(DelegationTokenReceiver.class); + + Map<String, DelegationTokenReceiver> receivers = new HashMap<>(); + for (DelegationTokenReceiver receiver : serviceLoader) { + try { + if (isProviderEnabled(configuration, receiver.serviceName())) { + receiver.init(configuration); + LOG.info( + "Delegation token receiver {} loaded and initialized", + receiver.serviceName()); + checkState( + !receivers.containsKey(receiver.serviceName()), + "Delegation token receiver with service name {} has multiple implementations", + receiver.serviceName()); + receivers.put(receiver.serviceName(), receiver); + } else { + LOG.info( + "Delegation token receiver {} is disabled so not loaded", + receiver.serviceName()); + } + } catch (Exception | NoClassDefFoundError e) { + // The intentional general rule is that if a receiver's init method throws exception + // then stop the workload + LOG.error( + "Failed to initialize delegation token receiver {}", + receiver.serviceName(), + e); + throw new FlinkRuntimeException(e); + } + } + + LOG.info("Delegation token receivers loaded successfully"); + + return receivers; + } + + @VisibleForTesting + boolean isReceiverLoaded(String serviceName) { + return delegationTokenReceivers.containsKey(serviceName); + } + + /** + * Callback function when new delegation tokens obtained. + * + * @param containerBytes Serialized form of a DelegationTokenContainer. All the available tokens + * will be forwarded to the appropriate {@link DelegationTokenReceiver} based on service + * name. + */ + public void onNewTokensObtained(byte[] containerBytes) throws Exception { + if (containerBytes == null || containerBytes.length == 0) { + throw new IllegalArgumentException("Illegal container tried to be processed"); + } + DelegationTokenContainer container = + InstantiationUtil.deserializeObject( + containerBytes, DelegationTokenContainer.class.getClassLoader()); + onNewTokensObtained(container); + } + + /** + * Callback function when new delegation tokens obtained. + * + * @param container Serialized form of delegation tokens stored in DelegationTokenContainer. All + * the available tokens will be forwarded to the appropriate {@link DelegationTokenReceiver} + * based on service name. + */ + public void onNewTokensObtained(DelegationTokenContainer container) throws Exception { + LOG.info("New delegation tokens arrived, sending them to receivers"); + for (Map.Entry<String, byte[]> entry : container.getTokens().entrySet()) { + String serviceName = entry.getKey(); + byte[] tokens = entry.getValue(); + if (!delegationTokenReceivers.containsKey(serviceName)) { + throw new IllegalStateException( + "Tokens arrived for service but no receiver found for it: " + serviceName); + } + try { + delegationTokenReceivers.get(serviceName).onNewTokensObtained(tokens); + } catch (Exception e) { + LOG.warn("Failed to send tokens to delegation token receiver {}", serviceName, e); + } + } + LOG.info("Delegation tokens sent to receivers"); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HBaseDelegationTokenProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HBaseDelegationTokenProvider.java index 572a49eb8c3..7e8dd66ca54 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HBaseDelegationTokenProvider.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HBaseDelegationTokenProvider.java @@ -18,7 +18,7 @@ package org.apache.flink.runtime.security.token.hadoop; -import org.apache.flink.annotation.Experimental; +import org.apache.flink.annotation.Internal; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.security.token.DelegationTokenProvider; import org.apache.flink.runtime.util.HadoopUtils; @@ -34,7 +34,6 @@ import java.io.Closeable; import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.security.PrivilegedExceptionAction; -import java.util.Objects; import java.util.Optional; /** @@ -42,7 +41,7 @@ import java.util.Optional; * flink-connector-hbase-base but HBase connection can be made without the connector. All in all I * tend to move this but that would be a breaking change. */ -@Experimental +@Internal public class HBaseDelegationTokenProvider implements DelegationTokenProvider { private static final Logger LOG = LoggerFactory.getLogger(HBaseDelegationTokenProvider.class); @@ -80,7 +79,8 @@ public class HBaseDelegationTokenProvider implements DelegationTokenProvider { } catch (InvocationTargetException | NoSuchMethodException | IllegalAccessException - | ClassNotFoundException e) { + | ClassNotFoundException + | NoClassDefFoundError e) { LOG.info( "HBase is not available (not packaged with this application): {} : \"{}\".", e.getClass().getSimpleName(), @@ -91,6 +91,22 @@ public class HBaseDelegationTokenProvider implements DelegationTokenProvider { @Override public boolean delegationTokensRequired() throws Exception { + /** + * The general rule how a provider/receiver must behave is the following: The provider and + * the receiver must be added to the classpath together with all the additionally required + * dependencies. + * + * <p>This null check is required because the HBase provider is always on classpath but + * HBase jars are optional. Such case configuration is not able to be loaded. This construct + * is intended to be removed when HBase provider/receiver pair can be externalized (namely + * if a provider/receiver throws an exception then workload must be stopped). + */ + if (hbaseConf == null) { + LOG.debug( + "HBase is not available (not packaged with this application), hence no " + + "tokens will be acquired."); + return false; + } try { if (!HadoopUtils.isKerberosSecurityEnabled(UserGroupInformation.getCurrentUser())) { return false; @@ -99,8 +115,7 @@ public class HBaseDelegationTokenProvider implements DelegationTokenProvider { LOG.debug("Hadoop Kerberos is not enabled."); return false; } - return Objects.nonNull(hbaseConf) - && hbaseConf.get("hbase.security.authentication").equals("kerberos") + return hbaseConf.get("hbase.security.authentication").equals("kerberos") && kerberosLoginProvider.isLoginPossible(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HBaseDelegationTokenReceiver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HBaseDelegationTokenReceiver.java new file mode 100644 index 00000000000..9fba0cc5604 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HBaseDelegationTokenReceiver.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.security.token.hadoop; + +import org.apache.flink.annotation.Internal; + +/** + * Delegation token receiver implementation for HBase. Basically it would be good to move this to + * flink-connector-hbase-base but HBase connection can be made without the connector. All in all I + * tend to move this but that would be a breaking change. + */ +@Internal +public class HBaseDelegationTokenReceiver extends HadoopDelegationTokenReceiver { + + @Override + public String serviceName() { + return "hbase"; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenUpdater.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenReceiver.java similarity index 54% rename from flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenUpdater.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenReceiver.java index d6ec75797a9..0b1cba0f738 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenUpdater.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenReceiver.java @@ -19,46 +19,43 @@ package org.apache.flink.runtime.security.token.hadoop; import org.apache.flink.annotation.Internal; -import org.apache.flink.runtime.security.token.DelegationTokenContainer; -import org.apache.flink.util.InstantiationUtil; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.security.token.DelegationTokenReceiver; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** Delegation token updater functionality. */ +/** Hadoop delegation token receiver base class. */ @Internal -public final class HadoopDelegationTokenUpdater { +public abstract class HadoopDelegationTokenReceiver implements DelegationTokenReceiver { - private static final Logger LOG = LoggerFactory.getLogger(HadoopDelegationTokenUpdater.class); + private final Logger log = LoggerFactory.getLogger(getClass()); - private HadoopDelegationTokenUpdater() {} + public abstract String serviceName(); - /** Updates delegation tokens for the current user. */ - public static void addCurrentUserCredentials(byte[] containerBytes) throws Exception { - if (containerBytes == null || containerBytes.length == 0) { - throw new IllegalArgumentException("Illegal container tried to be processed"); - } - DelegationTokenContainer container = - InstantiationUtil.deserializeObject( - containerBytes, HadoopDelegationTokenUpdater.class.getClassLoader()); - Credentials credentials = new Credentials(); - for (byte[] v : container.getTokens().values()) { - credentials.addAll(HadoopDelegationTokenConverter.deserialize(v)); + public void init(Configuration configuration) throws Exception {} + + @Override + public void onNewTokensObtained(byte[] tokens) throws Exception { + if (tokens == null || tokens.length == 0) { + throw new IllegalArgumentException("Illegal tokens tried to be processed"); } - LOG.info("Updating delegation tokens for current user"); + Credentials credentials = HadoopDelegationTokenConverter.deserialize(tokens); + + log.info("Updating delegation tokens for current user"); dumpAllTokens(credentials); UserGroupInformation.getCurrentUser().addCredentials(credentials); - LOG.info("Updated delegation tokens for current user successfully"); + log.info("Updated delegation tokens for current user successfully"); } - public static void dumpAllTokens(Credentials credentials) { + private void dumpAllTokens(Credentials credentials) { credentials .getAllTokens() .forEach( token -> - LOG.info( + log.info( "Token Service:{} Identifier:{}", token.getService(), token.getIdentifier())); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopFSDelegationTokenProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopFSDelegationTokenProvider.java index a6a13f3a6cb..23202d27df9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopFSDelegationTokenProvider.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopFSDelegationTokenProvider.java @@ -18,7 +18,7 @@ package org.apache.flink.runtime.security.token.hadoop; -import org.apache.flink.annotation.Experimental; +import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.configuration.ConfigUtils; import org.apache.flink.configuration.Configuration; @@ -47,7 +47,7 @@ import java.util.Optional; import java.util.Set; /** Delegation token provider for Hadoop filesystems. */ -@Experimental +@Internal public class HadoopFSDelegationTokenProvider implements DelegationTokenProvider { private static final Logger LOG = @@ -69,12 +69,35 @@ public class HadoopFSDelegationTokenProvider implements DelegationTokenProvider @Override public void init(Configuration configuration) throws Exception { flinkConfiguration = configuration; - hadoopConfiguration = HadoopUtils.getHadoopConfiguration(configuration); - kerberosLoginProvider = new KerberosLoginProvider(configuration); + try { + hadoopConfiguration = HadoopUtils.getHadoopConfiguration(configuration); + kerberosLoginProvider = new KerberosLoginProvider(configuration); + } catch (NoClassDefFoundError e) { + LOG.info( + "Hadoop FS is not available (not packaged with this application): {} : \"{}\".", + e.getClass().getSimpleName(), + e.getMessage()); + } } @Override public boolean delegationTokensRequired() throws Exception { + /** + * The general rule how a provider/receiver must behave is the following: The provider and + * the receiver must be added to the classpath together with all the additionally required + * dependencies. + * + * <p>This null check is required because the Hadoop FS provider is always on classpath but + * Hadoop FS jars are optional. Such case configuration is not able to be loaded. This + * construct is intended to be removed when HBase provider/receiver pair can be externalized + * (namely if a provider/receiver throws an exception then workload must be stopped). + */ + if (hadoopConfiguration == null) { + LOG.debug( + "Hadoop FS is not available (not packaged with this application), hence no " + + "tokens will be acquired."); + return false; + } return HadoopUtils.isKerberosSecurityEnabled(UserGroupInformation.getCurrentUser()) && kerberosLoginProvider.isLoginPossible(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopFSDelegationTokenReceiver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopFSDelegationTokenReceiver.java new file mode 100644 index 00000000000..113914a8651 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopFSDelegationTokenReceiver.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.security.token.hadoop; + +import org.apache.flink.annotation.Internal; + +/** Delegation token receiver for Hadoop filesystems. */ +@Internal +public class HadoopFSDelegationTokenReceiver extends HadoopDelegationTokenReceiver { + + @Override + public String serviceName() { + return "hadoopfs"; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index 218ecdaab46..9c3dd3744af 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -94,7 +94,7 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.RpcServiceUtils; -import org.apache.flink.runtime.security.token.hadoop.HadoopDelegationTokenUpdater; +import org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository; import org.apache.flink.runtime.shuffle.ShuffleDescriptor; import org.apache.flink.runtime.shuffle.ShuffleEnvironment; import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager; @@ -263,6 +263,8 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { private final TaskExecutorPartitionTracker partitionTracker; + private final DelegationTokenReceiverRepository delegationTokenReceiverRepository; + // --------- resource manager -------- @Nullable private ResourceManagerAddress resourceManagerAddress; @@ -289,7 +291,8 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { @Nullable String metricQueryServiceAddress, TaskExecutorBlobService taskExecutorBlobService, FatalErrorHandler fatalErrorHandler, - TaskExecutorPartitionTracker partitionTracker) { + TaskExecutorPartitionTracker partitionTracker, + DelegationTokenReceiverRepository delegationTokenReceiverRepository) { super(rpcService, RpcServiceUtils.createRandomName(TASK_MANAGER_NAME)); @@ -302,6 +305,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { this.haServices = checkNotNull(haServices); this.fatalErrorHandler = checkNotNull(fatalErrorHandler); this.partitionTracker = partitionTracker; + this.delegationTokenReceiverRepository = checkNotNull(delegationTokenReceiverRepository); this.taskManagerMetricGroup = checkNotNull(taskManagerMetricGroup); this.taskExecutorBlobService = checkNotNull(taskExecutorBlobService); this.metricQueryServiceAddress = metricQueryServiceAddress; @@ -1346,7 +1350,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { } try { - HadoopDelegationTokenUpdater.addCurrentUserCredentials(tokens); + delegationTokenReceiverRepository.onNewTokensObtained(tokens); return CompletableFuture.completedFuture(Acknowledge.get()); } catch (Throwable t) { log.error("Could not update delegation tokens.", t); @@ -2386,7 +2390,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { if (tokens != null) { try { log.info("Receive initial delegation tokens from resource manager"); - HadoopDelegationTokenUpdater.addCurrentUserCredentials(tokens); + delegationTokenReceiverRepository.onNewTokensObtained(tokens); } catch (Throwable t) { log.error("Could not update delegation tokens.", t); ExceptionUtils.rethrowIfFatalError(t); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java index 0f08ccdaf7a..a7207e1fd8c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java @@ -61,6 +61,7 @@ import org.apache.flink.runtime.rpc.RpcSystemUtils; import org.apache.flink.runtime.rpc.RpcUtils; import org.apache.flink.runtime.security.SecurityConfiguration; import org.apache.flink.runtime.security.SecurityUtils; +import org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository; import org.apache.flink.runtime.state.changelog.StateChangelogStorageLoader; import org.apache.flink.runtime.taskmanager.MemoryLogger; import org.apache.flink.runtime.util.ConfigurationParserUtils; @@ -241,6 +242,9 @@ public class TaskManagerRunner implements FatalErrorHandler { ExternalResourceUtils.createStaticExternalResourceInfoProviderFromConfig( configuration, pluginManager); + final DelegationTokenReceiverRepository delegationTokenReceiverRepository = + new DelegationTokenReceiverRepository(configuration); + taskExecutorService = taskExecutorServiceFactory.createTaskExecutor( this.configuration, @@ -253,7 +257,8 @@ public class TaskManagerRunner implements FatalErrorHandler { false, externalResourceInfoProvider, workingDirectory.unwrap(), - this); + this, + delegationTokenReceiverRepository); handleUnexpectedTaskExecutorServiceTermination(); @@ -552,7 +557,8 @@ public class TaskManagerRunner implements FatalErrorHandler { boolean localCommunicationOnly, ExternalResourceInfoProvider externalResourceInfoProvider, WorkingDirectory workingDirectory, - FatalErrorHandler fatalErrorHandler) + FatalErrorHandler fatalErrorHandler, + DelegationTokenReceiverRepository delegationTokenReceiverRepository) throws Exception { final TaskExecutor taskExecutor = @@ -567,7 +573,8 @@ public class TaskManagerRunner implements FatalErrorHandler { localCommunicationOnly, externalResourceInfoProvider, workingDirectory, - fatalErrorHandler); + fatalErrorHandler, + delegationTokenReceiverRepository); return TaskExecutorToServiceAdapter.createFor(taskExecutor); } @@ -583,7 +590,8 @@ public class TaskManagerRunner implements FatalErrorHandler { boolean localCommunicationOnly, ExternalResourceInfoProvider externalResourceInfoProvider, WorkingDirectory workingDirectory, - FatalErrorHandler fatalErrorHandler) + FatalErrorHandler fatalErrorHandler, + DelegationTokenReceiverRepository delegationTokenReceiverRepository) throws Exception { checkNotNull(configuration); @@ -653,7 +661,8 @@ public class TaskManagerRunner implements FatalErrorHandler { metricQueryServiceAddress, taskExecutorBlobService, fatalErrorHandler, - new TaskExecutorPartitionTrackerImpl(taskManagerServices.getShuffleEnvironment())); + new TaskExecutorPartitionTrackerImpl(taskManagerServices.getShuffleEnvironment()), + delegationTokenReceiverRepository); } /** @@ -776,7 +785,8 @@ public class TaskManagerRunner implements FatalErrorHandler { boolean localCommunicationOnly, ExternalResourceInfoProvider externalResourceInfoProvider, WorkingDirectory workingDirectory, - FatalErrorHandler fatalErrorHandler) + FatalErrorHandler fatalErrorHandler, + DelegationTokenReceiverRepository delegationTokenReceiverRepository) throws Exception; } diff --git a/flink-runtime/src/main/resources/META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenReceiver b/flink-runtime/src/main/resources/META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenReceiver new file mode 100644 index 00000000000..fc1f34c11b8 --- /dev/null +++ b/flink-runtime/src/main/resources/META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenReceiver @@ -0,0 +1,17 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.flink.runtime.security.token.hadoop.HadoopFSDelegationTokenReceiver +org.apache.flink.runtime.security.token.hadoop.HBaseDelegationTokenReceiver diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManagerTest.java index 8092f330a99..ed34ea978de 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManagerTest.java @@ -22,16 +22,20 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService; import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor; -import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import java.time.Clock; import java.time.ZoneId; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import static java.time.Instant.ofEpochMilli; import static org.apache.flink.configuration.SecurityOptions.DELEGATION_TOKENS_RENEWAL_TIME_RATIO; +import static org.apache.flink.runtime.security.token.DelegationTokenProvider.CONFIG_PREFIX; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -43,30 +47,28 @@ public class DefaultDelegationTokenManagerTest { @BeforeEach public void beforeEach() { ExceptionThrowingDelegationTokenProvider.reset(); + ExceptionThrowingDelegationTokenReceiver.reset(); } - @AfterAll - public static void afterAll() { + @AfterEach + public void afterEach() { ExceptionThrowingDelegationTokenProvider.reset(); + ExceptionThrowingDelegationTokenReceiver.reset(); } @Test public void isProviderEnabledMustGiveBackTrueByDefault() { Configuration configuration = new Configuration(); - DefaultDelegationTokenManager delegationTokenManager = - new DefaultDelegationTokenManager(configuration, null, null); - assertTrue(delegationTokenManager.isProviderEnabled("test")); + assertTrue(DefaultDelegationTokenManager.isProviderEnabled(configuration, "test")); } @Test public void isProviderEnabledMustGiveBackFalseWhenDisabled() { Configuration configuration = new Configuration(); - configuration.setBoolean("security.delegation.token.provider.test.enabled", false); - DefaultDelegationTokenManager delegationTokenManager = - new DefaultDelegationTokenManager(configuration, null, null); + configuration.setBoolean(CONFIG_PREFIX + ".test.enabled", false); - assertFalse(delegationTokenManager.isProviderEnabled("test")); + assertFalse(DefaultDelegationTokenManager.isProviderEnabled(configuration, "test")); } @Test @@ -74,19 +76,87 @@ public class DefaultDelegationTokenManagerTest { assertThrows(Exception.class, () -> new DefaultDelegationTokenManager(null, null, null)); } + @Test + public void oneProviderThrowsExceptionMustFailFast() { + assertThrows( + Exception.class, + () -> { + ExceptionThrowingDelegationTokenProvider.throwInInit = true; + new DefaultDelegationTokenManager(new Configuration(), null, null); + }); + } + @Test public void testAllProvidersLoaded() { Configuration configuration = new Configuration(); - configuration.setBoolean("security.delegation.token.provider.throw.enabled", false); + configuration.setBoolean(CONFIG_PREFIX + ".throw.enabled", false); DefaultDelegationTokenManager delegationTokenManager = new DefaultDelegationTokenManager(configuration, null, null); assertEquals(3, delegationTokenManager.delegationTokenProviders.size()); + assertTrue(delegationTokenManager.isProviderLoaded("hadoopfs")); + assertTrue(delegationTokenManager.isReceiverLoaded("hadoopfs")); + assertTrue(delegationTokenManager.isProviderLoaded("hbase")); + assertTrue(delegationTokenManager.isReceiverLoaded("hbase")); + assertTrue(delegationTokenManager.isProviderLoaded("test")); + assertTrue(delegationTokenManager.isReceiverLoaded("test")); + assertTrue(ExceptionThrowingDelegationTokenProvider.constructed); + assertTrue(ExceptionThrowingDelegationTokenReceiver.constructed); assertFalse(delegationTokenManager.isProviderLoaded("throw")); + assertFalse(delegationTokenManager.isReceiverLoaded("throw")); + } + + @Test + public void checkProviderAndReceiverConsistencyShouldNotThrowWhenNothingLoaded() { + DefaultDelegationTokenManager.checkProviderAndReceiverConsistency( + Collections.emptyMap(), Collections.emptyMap()); + } + + @Test + public void checkProviderAndReceiverConsistencyShouldThrowWhenMissingReceiver() { + Map<String, DelegationTokenProvider> providers = new HashMap<>(); + providers.put("test", new TestDelegationTokenProvider()); + + IllegalStateException e = + assertThrows( + IllegalStateException.class, + () -> + DefaultDelegationTokenManager.checkProviderAndReceiverConsistency( + providers, Collections.emptyMap())); + assertTrue(e.getMessage().contains("Missing receivers: test")); + } + + @Test + public void checkProviderAndReceiverConsistencyShouldThrowWhenMissingProvider() { + Map<String, DelegationTokenReceiver> receivers = new HashMap<>(); + receivers.put("test", new TestDelegationTokenReceiver()); + + IllegalStateException e = + assertThrows( + IllegalStateException.class, + () -> + DefaultDelegationTokenManager.checkProviderAndReceiverConsistency( + Collections.emptyMap(), receivers)); + assertTrue(e.getMessage().contains("Missing providers: test")); + } + + @Test + public void checkProviderAndReceiverConsistencyShouldNotThrowWhenBothLoaded() { + Map<String, DelegationTokenProvider> providers = new HashMap<>(); + providers.put("test", new TestDelegationTokenProvider()); + Map<String, DelegationTokenReceiver> receivers = new HashMap<>(); + receivers.put("test", new TestDelegationTokenReceiver()); + + DefaultDelegationTokenManager.checkProviderAndReceiverConsistency(providers, receivers); + + assertEquals(1, providers.size()); + assertTrue(providers.containsKey("test")); + assertEquals(1, receivers.size()); + assertTrue(receivers.containsKey("test")); } @Test @@ -98,7 +168,7 @@ public class DefaultDelegationTokenManagerTest { ExceptionThrowingDelegationTokenProvider.addToken = true; Configuration configuration = new Configuration(); - configuration.setBoolean("security.delegation.token.provider.throw.enabled", true); + configuration.setBoolean(CONFIG_PREFIX + ".throw.enabled", true); AtomicInteger startTokensUpdateCallCount = new AtomicInteger(0); DefaultDelegationTokenManager delegationTokenManager = new DefaultDelegationTokenManager(configuration, scheduledExecutor, scheduler) { @@ -124,7 +194,7 @@ public class DefaultDelegationTokenManagerTest { @Test public void calculateRenewalDelayShouldConsiderRenewalRatio() { Configuration configuration = new Configuration(); - configuration.setBoolean("security.delegation.token.provider.throw.enabled", false); + configuration.setBoolean(CONFIG_PREFIX + ".throw.enabled", false); configuration.set(DELEGATION_TOKENS_RENEWAL_TIME_RATIO, 0.5); DefaultDelegationTokenManager delegationTokenManager = new DefaultDelegationTokenManager(configuration, null, null); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DelegationTokenReceiverRepositoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DelegationTokenReceiverRepositoryTest.java new file mode 100644 index 00000000000..db93128d7ab --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DelegationTokenReceiverRepositoryTest.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.security.token; + +import org.apache.flink.configuration.Configuration; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.apache.flink.runtime.security.token.DelegationTokenProvider.CONFIG_PREFIX; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** Test for {@link DelegationTokenReceiverRepository}. */ +class DelegationTokenReceiverRepositoryTest { + + @BeforeEach + public void beforeEach() { + ExceptionThrowingDelegationTokenReceiver.reset(); + } + + @AfterEach + public void afterEach() { + ExceptionThrowingDelegationTokenReceiver.reset(); + } + + @Test + public void configurationIsNullMustFailFast() { + assertThrows(Exception.class, () -> new DelegationTokenReceiverRepository(null)); + } + + @Test + public void oneReceiverThrowsExceptionMustFailFast() { + assertThrows( + Exception.class, + () -> { + ExceptionThrowingDelegationTokenReceiver.throwInInit = true; + new DelegationTokenReceiverRepository(new Configuration()); + }); + } + + @Test + public void testAllReceiversLoaded() { + Configuration configuration = new Configuration(); + configuration.setBoolean(CONFIG_PREFIX + ".throw.enabled", false); + DelegationTokenReceiverRepository delegationTokenReceiverRepository = + new DelegationTokenReceiverRepository(configuration); + + assertEquals(3, delegationTokenReceiverRepository.delegationTokenReceivers.size()); + assertTrue(delegationTokenReceiverRepository.isReceiverLoaded("hadoopfs")); + assertTrue(delegationTokenReceiverRepository.isReceiverLoaded("hbase")); + assertTrue(delegationTokenReceiverRepository.isReceiverLoaded("test")); + assertTrue(ExceptionThrowingDelegationTokenReceiver.constructed); + assertFalse(delegationTokenReceiverRepository.isReceiverLoaded("throw")); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/ExceptionThrowingDelegationTokenReceiver.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/ExceptionThrowingDelegationTokenReceiver.java new file mode 100644 index 00000000000..41e72c11cce --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/ExceptionThrowingDelegationTokenReceiver.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.security.token; + +import org.apache.flink.configuration.Configuration; + +/** + * An example implementation of {@link DelegationTokenReceiver} which throws exception when enabled. + */ +public class ExceptionThrowingDelegationTokenReceiver implements DelegationTokenReceiver { + + public static volatile boolean throwInInit = false; + public static volatile boolean throwInUsage = false; + public static volatile boolean constructed = false; + + public static void reset() { + throwInInit = false; + throwInUsage = false; + constructed = false; + } + + public ExceptionThrowingDelegationTokenReceiver() { + constructed = true; + } + + @Override + public String serviceName() { + return "throw"; + } + + @Override + public void init(Configuration configuration) { + if (throwInInit) { + throw new IllegalArgumentException(); + } + } + + @Override + public void onNewTokensObtained(byte[] tokens) throws Exception { + if (throwInUsage) { + throw new IllegalArgumentException(); + } + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/TestDelegationTokenReceiver.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/TestDelegationTokenReceiver.java new file mode 100644 index 00000000000..405d026a3a1 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/TestDelegationTokenReceiver.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.security.token; + +import org.apache.flink.configuration.Configuration; + +/** An example implementation of {@link DelegationTokenReceiver} which does nothing. */ +public class TestDelegationTokenReceiver implements DelegationTokenReceiver { + + @Override + public String serviceName() { + return "test"; + } + + @Override + public void init(Configuration configuration) throws Exception {} + + @Override + public void onNewTokensObtained(byte[] tokens) throws Exception {} +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenUpdaterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenReceiverITCase.java similarity index 60% rename from flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenUpdaterITCase.java rename to flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenReceiverITCase.java index 7da8411dc96..9f98b4b2304 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenUpdaterITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenReceiverITCase.java @@ -18,9 +18,6 @@ package org.apache.flink.runtime.security.token.hadoop; -import org.apache.flink.runtime.security.token.DelegationTokenContainer; -import org.apache.flink.util.InstantiationUtil; - import org.apache.commons.collections.CollectionUtils; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.Credentials; @@ -37,51 +34,55 @@ import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -/** Test for {@link HadoopDelegationTokenConverter}. */ -public class HadoopDelegationTokenUpdaterITCase { +/** Test for {@link HadoopDelegationTokenReceiver}. */ +public class HadoopDelegationTokenReceiverITCase { @Test - public void addCurrentUserCredentialsShouldThrowExceptionWhenNullCredentials() { - addCurrentUserCredentialsShouldThrowException(null); + public void onNewTokensObtainedShouldThrowExceptionWhenNullCredentials() { + onNewTokensObtainedShouldThrowException(null); } @Test - public void addCurrentUserCredentialsShouldThrowExceptionWhenEmptyCredentials() { - addCurrentUserCredentialsShouldThrowException(new byte[0]); + public void onNewTokensObtainedShouldThrowExceptionWhenEmptyCredentials() { + onNewTokensObtainedShouldThrowException(new byte[0]); } - private void addCurrentUserCredentialsShouldThrowException(byte[] credentialsBytes) { - try (MockedStatic<UserGroupInformation> ugi = mockStatic(UserGroupInformation.class)) { - UserGroupInformation userGroupInformation = mock(UserGroupInformation.class); - ugi.when(UserGroupInformation::getCurrentUser).thenReturn(userGroupInformation); - - IllegalArgumentException e = - assertThrows( - IllegalArgumentException.class, - () -> - HadoopDelegationTokenUpdater.addCurrentUserCredentials( - credentialsBytes)); - assertTrue(e.getMessage().contains("Illegal container")); - } + private void onNewTokensObtainedShouldThrowException(byte[] credentialsBytes) { + HadoopDelegationTokenReceiver receiver = + new HadoopDelegationTokenReceiver() { + @Override + public String serviceName() { + return "test"; + } + }; + IllegalArgumentException e = + assertThrows( + IllegalArgumentException.class, + () -> receiver.onNewTokensObtained(credentialsBytes)); + assertTrue(e.getMessage().contains("Illegal tokens")); } @Test - public void addCurrentUserCredentialsShouldOverwriteCredentials() throws Exception { + public void onNewTokensObtainedShouldOverwriteCredentials() throws Exception { final Text tokenKind = new Text("TEST_TOKEN_KIND"); final Text tokenService = new Text("TEST_TOKEN_SERVICE"); Credentials credentials = new Credentials(); credentials.addToken( tokenService, new Token<>(new byte[4], new byte[4], tokenKind, tokenService)); byte[] credentialsBytes = HadoopDelegationTokenConverter.serialize(credentials); - DelegationTokenContainer container = new DelegationTokenContainer(); - container.addToken("TEST_TOKEN_KEY", credentialsBytes); - byte[] containerBytes = InstantiationUtil.serializeObject(container); try (MockedStatic<UserGroupInformation> ugi = mockStatic(UserGroupInformation.class)) { UserGroupInformation userGroupInformation = mock(UserGroupInformation.class); ugi.when(UserGroupInformation::getCurrentUser).thenReturn(userGroupInformation); - HadoopDelegationTokenUpdater.addCurrentUserCredentials(containerBytes); + HadoopDelegationTokenReceiver receiver = + new HadoopDelegationTokenReceiver() { + @Override + public String serviceName() { + return "test"; + } + }; + receiver.onNewTokensObtained(credentialsBytes); ArgumentCaptor<Credentials> argumentCaptor = ArgumentCaptor.forClass(Credentials.class); verify(userGroupInformation, times(1)).addCredentials(argumentCaptor.capture()); assertTrue( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorBuilder.java index 8d3367ac28e..362b4de5957 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorBuilder.java @@ -35,6 +35,7 @@ import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.rest.util.NoOpFatalErrorHandler; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository; import org.apache.flink.util.concurrent.Executors; import javax.annotation.Nullable; @@ -143,6 +144,9 @@ public class TaskExecutorBuilder { resolvedTaskManagerServices = taskManagerServices; } + final DelegationTokenReceiverRepository delegationTokenReceiverRepository = + new DelegationTokenReceiverRepository(configuration); + return new TaskExecutor( rpcService, resolvedTaskManagerConfiguration, @@ -154,7 +158,8 @@ public class TaskExecutorBuilder { metricQueryServiceAddress, resolvedTaskExecutorBlobService, fatalErrorHandler, - partitionTracker); + partitionTracker, + delegationTokenReceiverRepository); } public static TaskExecutorBuilder newBuilder( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorExecutionDeploymentReconciliationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorExecutionDeploymentReconciliationTest.java index 43f9b75ee92..6933d29b551 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorExecutionDeploymentReconciliationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorExecutionDeploymentReconciliationTest.java @@ -52,6 +52,7 @@ import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway; import org.apache.flink.runtime.rpc.RpcUtils; import org.apache.flink.runtime.rpc.TestingRpcServiceResource; +import org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository; import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager; import org.apache.flink.runtime.taskexecutor.slot.TaskSlotUtils; import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource; @@ -251,7 +252,8 @@ public class TaskExecutorExecutionDeploymentReconciliationTest extends TestLogge null, NoOpTaskExecutorBlobService.INSTANCE, testingFatalErrorHandlerResource.getFatalErrorHandler(), - new TestingTaskExecutorPartitionTracker()); + new TestingTaskExecutorPartitionTracker(), + new DelegationTokenReceiverRepository(configuration)); } private static TaskDeploymentDescriptor createTaskDeploymentDescriptor(JobID jobId) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java index 6136e12f9d3..e2afdfb703a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java @@ -58,6 +58,7 @@ import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway; import org.apache.flink.runtime.rpc.RpcUtils; import org.apache.flink.runtime.rpc.TestingRpcService; +import org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository; import org.apache.flink.runtime.shuffle.ShuffleEnvironment; import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager; import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable; @@ -599,7 +600,8 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger { null, NoOpTaskExecutorBlobService.INSTANCE, new TestingFatalErrorHandler(), - partitionTracker); + partitionTracker, + new DelegationTokenReceiverRepository(configuration)); } private static TaskSlotTable<Task> createTaskSlotTable() { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSlotLifetimeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSlotLifetimeTest.java index 35f099b4215..55cb4f7c6b5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSlotLifetimeTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSlotLifetimeTest.java @@ -44,6 +44,7 @@ import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGate import org.apache.flink.runtime.rpc.RpcUtils; import org.apache.flink.runtime.rpc.TestingRpcService; import org.apache.flink.runtime.rpc.TestingRpcServiceResource; +import org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository; import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager; import org.apache.flink.runtime.taskexecutor.slot.TaskSlotUtils; import org.apache.flink.runtime.taskmanager.LocalUnresolvedTaskManagerLocation; @@ -236,7 +237,8 @@ public class TaskExecutorSlotLifetimeTest extends TestLogger { null, NoOpTaskExecutorBlobService.INSTANCE, testingFatalErrorHandlerResource.getFatalErrorHandler(), - new TestingTaskExecutorPartitionTracker()); + new TestingTaskExecutorPartitionTracker(), + new DelegationTokenReceiverRepository(configuration)); } private TaskExecutorLocalStateStoresManager createTaskExecutorLocalStateStoresManager() diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java index e13efdcec55..cc857ba0e1f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java @@ -79,6 +79,7 @@ import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGate import org.apache.flink.runtime.rpc.RpcUtils; import org.apache.flink.runtime.rpc.TestingRpcService; import org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException; +import org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository; import org.apache.flink.runtime.shuffle.ShuffleDescriptor; import org.apache.flink.runtime.shuffle.ShuffleEnvironment; import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager; @@ -2818,7 +2819,8 @@ public class TaskExecutorTest extends TestLogger { null, NoOpTaskExecutorBlobService.INSTANCE, testingFatalErrorHandler, - taskExecutorPartitionTracker); + taskExecutorPartitionTracker, + new DelegationTokenReceiverRepository(configuration)); } private TestingTaskExecutor createTestingTaskExecutor(TaskManagerServices taskManagerServices) @@ -2853,7 +2855,8 @@ public class TaskExecutorTest extends TestLogger { null, NoOpTaskExecutorBlobService.INSTANCE, testingFatalErrorHandler, - new TaskExecutorPartitionTrackerImpl(taskManagerServices.getShuffleEnvironment())); + new TaskExecutorPartitionTrackerImpl(taskManagerServices.getShuffleEnvironment()), + new DelegationTokenReceiverRepository(configuration)); } private TaskExecutorTestingContext createTaskExecutorTestingContext(int numberOfSlots) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerStartupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerStartupTest.java index 2ea9bacaa8a..79ff1dc541d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerStartupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerStartupTest.java @@ -38,6 +38,7 @@ import org.apache.flink.runtime.metrics.scope.ScopeFormats; import org.apache.flink.runtime.metrics.util.TestingMetricRegistry; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.TestingRpcServiceResource; +import org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository; import org.apache.flink.runtime.testutils.WorkingDirectoryResource; import org.apache.flink.util.IOUtils; import org.apache.flink.util.TestLogger; @@ -292,6 +293,7 @@ public class TaskManagerRunnerStartupTest extends TestLogger { false, ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES, workingDirectory, - error -> {}); + error -> {}, + new DelegationTokenReceiverRepository(configuration)); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java index 6406c1d06de..1cde71b77ab 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java @@ -35,6 +35,7 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository; import org.apache.flink.util.FlinkException; import org.apache.flink.util.TestLogger; import org.apache.flink.util.TimeUtils; @@ -276,7 +277,8 @@ public class TaskManagerRunnerTest extends TestLogger { localCommunicationOnly, externalResourceInfoProvider, workingDirectory, - fatalErrorHandler) -> taskExecutorService; + fatalErrorHandler, + delegationTokenReceiverRepository) -> taskExecutorService; } private static Configuration createConfiguration() { @@ -317,7 +319,8 @@ public class TaskManagerRunnerTest extends TestLogger { boolean localCommunicationOnly, ExternalResourceInfoProvider externalResourceInfoProvider, WorkingDirectory workingDirectory, - FatalErrorHandler fatalErrorHandler) { + FatalErrorHandler fatalErrorHandler, + DelegationTokenReceiverRepository delegationTokenReceiverRepository) { return TestingTaskExecutorService.newBuilder() .setStartRunnable( () -> diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java index 24ac627fa9b..70f42962e51 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java @@ -44,6 +44,7 @@ import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.rpc.MainThreadExecutable; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.TestingRpcService; +import org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository; import org.apache.flink.runtime.shuffle.ShuffleEnvironment; import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager; import org.apache.flink.runtime.taskexecutor.slot.DefaultTimerService; @@ -268,7 +269,8 @@ class TaskSubmissionTestEnvironment implements AutoCloseable { metricQueryServiceAddress, taskExecutorBlobService, testingFatalErrorHandler, - new TaskExecutorPartitionTrackerImpl(taskManagerServices.getShuffleEnvironment())); + new TaskExecutorPartitionTrackerImpl(taskManagerServices.getShuffleEnvironment()), + new DelegationTokenReceiverRepository(configuration)); } private static ShuffleEnvironment<?, ?> createShuffleEnvironment( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutor.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutor.java index 871784caf1e..9bf4b5be894 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutor.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutor.java @@ -27,6 +27,7 @@ import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.MainThreadExecutable; import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository; import javax.annotation.Nullable; @@ -47,7 +48,8 @@ class TestingTaskExecutor extends TaskExecutor { @Nullable String metricQueryServiceAddress, TaskExecutorBlobService taskExecutorBlobService, FatalErrorHandler fatalErrorHandler, - TaskExecutorPartitionTracker partitionTracker) { + TaskExecutorPartitionTracker partitionTracker, + DelegationTokenReceiverRepository delegationTokenReceiverRepository) { super( rpcService, taskManagerConfiguration, @@ -59,7 +61,8 @@ class TestingTaskExecutor extends TaskExecutor { metricQueryServiceAddress, taskExecutorBlobService, fatalErrorHandler, - partitionTracker); + partitionTracker, + delegationTokenReceiverRepository); } @Override diff --git a/flink-runtime/src/test/resources/META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenReceiver b/flink-runtime/src/test/resources/META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenReceiver new file mode 100644 index 00000000000..b186e9e09b9 --- /dev/null +++ b/flink-runtime/src/test/resources/META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenReceiver @@ -0,0 +1,17 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.flink.runtime.security.token.TestDelegationTokenReceiver +org.apache.flink.runtime.security.token.ExceptionThrowingDelegationTokenReceiver diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java index f692a501d41..2653fc46d9f 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java @@ -1300,9 +1300,12 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> { DelegationTokenContainer container = new DelegationTokenContainer(); delegationTokenManager.obtainDelegationTokens(container); + // This is here for backward compatibility to make log aggregation work Credentials credentials = new Credentials(); - for (byte[] v : container.getTokens().values()) { - credentials.addAll(HadoopDelegationTokenConverter.deserialize(v)); + for (Map.Entry<String, byte[]> e : container.getTokens().entrySet()) { + if (e.getKey().equals("hadoopfs")) { + credentials.addAll(HadoopDelegationTokenConverter.deserialize(e.getValue())); + } } ByteBuffer tokens = ByteBuffer.wrap(HadoopDelegationTokenConverter.serialize(credentials)); containerLaunchContext.setTokens(tokens);