This is an automated email from the ASF dual-hosted git repository. inigoiri pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push: new b4870bc Revert "YARN-9768. RM Renew Delegation token thread should timeout and retry. Contributed by Manikandan R." b4870bc is described below commit b4870bce3a8336dbd638d26b8662037c4d4cdae9 Author: Inigo Goiri <inigo...@apache.org> AuthorDate: Tue Jan 21 17:45:17 2020 -0800 Revert "YARN-9768. RM Renew Delegation token thread should timeout and retry. Contributed by Manikandan R." This reverts commit 0696828a090bc06446f75b29c967697f1d6d845b. --- .../apache/hadoop/yarn/conf/YarnConfiguration.java | 14 -- .../src/main/resources/yarn-default.xml | 24 --- .../security/DelegationTokenRenewer.java | 144 +---------------- .../security/TestDelegationTokenRenewer.java | 177 +-------------------- 4 files changed, 4 insertions(+), 355 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index be7cc89..06c3fa4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -26,7 +26,6 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; -import java.util.concurrent.TimeUnit; import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -730,19 +729,6 @@ public class YarnConfiguration extends Configuration { public static final int DEFAULT_RM_DELEGATION_TOKEN_MAX_CONF_SIZE_BYTES = 12800; - public static final String RM_DT_RENEWER_THREAD_TIMEOUT = - RM_PREFIX + "delegation-token-renewer.thread-timeout"; - public static final long DEFAULT_RM_DT_RENEWER_THREAD_TIMEOUT = - TimeUnit.SECONDS.toMillis(60); // 60 Seconds - public static final String RM_DT_RENEWER_THREAD_RETRY_INTERVAL = - RM_PREFIX + "delegation-token-renewer.thread-retry-interval"; - public static final long DEFAULT_RM_DT_RENEWER_THREAD_RETRY_INTERVAL = - TimeUnit.SECONDS.toMillis(60); // 60 Seconds - public static final String RM_DT_RENEWER_THREAD_RETRY_MAX_ATTEMPTS = - RM_PREFIX + "delegation-token-renewer.thread-retry-max-attempts"; - public static final int DEFAULT_RM_DT_RENEWER_THREAD_RETRY_MAX_ATTEMPTS = - 10; - public static final String RECOVERY_ENABLED = RM_PREFIX + "recovery.enabled"; public static final boolean DEFAULT_RM_RECOVERY_ENABLED = false; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 5277be4..c96a7e4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -959,30 +959,6 @@ <property> <description> - RM DelegationTokenRenewer thread timeout - </description> - <name>yarn.resourcemanager.delegation-token-renewer.thread-timeout</name> - <value>60s</value> - </property> - - <property> - <description> - Default maximum number of retries for each RM DelegationTokenRenewer thread - </description> - <name>yarn.resourcemanager.delegation-token-renewer.thread-retry-max-attempts</name> - <value>10</value> - </property> - - <property> - <description> - Time interval between each RM DelegationTokenRenewer thread retry attempt - </description> - <name>yarn.resourcemanager.delegation-token-renewer.thread-retry-interval</name> - <value>60s</value> - </property> - - <property> - <description> Thread pool size for RMApplicationHistoryWriter. </description> <name>yarn.resourcemanager.history-writer.multi-threaded-dispatcher.pool-size</name> diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java index fd8935d..d3ed503 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java @@ -26,7 +26,6 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Date; -import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -37,12 +36,10 @@ import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -118,12 +115,6 @@ public class DelegationTokenRenewer extends AbstractService { private boolean tokenKeepAliveEnabled; private boolean hasProxyUserPrivileges; private long credentialsValidTimeRemaining; - private long tokenRenewerThreadTimeout; - private long tokenRenewerThreadRetryInterval; - private int tokenRenewerThreadRetryMaxAttempts; - private final Map<DelegationTokenRenewerEvent, Future<?>> futures = - new HashMap<>(); - private boolean delegationTokenRenewerPoolTrackerFlag = true; // this config is supposedly not used by end-users. public static final String RM_SYSTEM_CREDENTIALS_VALID_TIME_REMAINING = @@ -149,17 +140,6 @@ public class DelegationTokenRenewer extends AbstractService { this.credentialsValidTimeRemaining = conf.getLong(RM_SYSTEM_CREDENTIALS_VALID_TIME_REMAINING, DEFAULT_RM_SYSTEM_CREDENTIALS_VALID_TIME_REMAINING); - tokenRenewerThreadTimeout = - conf.getTimeDuration(YarnConfiguration.RM_DT_RENEWER_THREAD_TIMEOUT, - YarnConfiguration.DEFAULT_RM_DT_RENEWER_THREAD_TIMEOUT, - TimeUnit.MILLISECONDS); - tokenRenewerThreadRetryInterval = conf.getTimeDuration( - YarnConfiguration.RM_DT_RENEWER_THREAD_RETRY_INTERVAL, - YarnConfiguration.DEFAULT_RM_DT_RENEWER_THREAD_RETRY_INTERVAL, - TimeUnit.MILLISECONDS); - tokenRenewerThreadRetryMaxAttempts = - conf.getInt(YarnConfiguration.RM_DT_RENEWER_THREAD_RETRY_MAX_ATTEMPTS, - YarnConfiguration.DEFAULT_RM_DT_RENEWER_THREAD_RETRY_MAX_ATTEMPTS); setLocalSecretManagerAndServiceAddr(); renewerService = createNewThreadPoolService(conf); pendingEventQueue = new LinkedBlockingQueue<DelegationTokenRenewerEvent>(); @@ -204,11 +184,6 @@ public class DelegationTokenRenewer extends AbstractService { serviceStateLock.writeLock().lock(); isServiceStarted = true; serviceStateLock.writeLock().unlock(); - - if (delegationTokenRenewerPoolTrackerFlag) { - renewerService.submit(new DelegationTokenRenewerPoolTracker()); - } - while(!pendingEventQueue.isEmpty()) { processDelegationTokenRenewerEvent(pendingEventQueue.take()); } @@ -220,9 +195,7 @@ public class DelegationTokenRenewer extends AbstractService { serviceStateLock.readLock().lock(); try { if (isServiceStarted) { - Future<?> future = - renewerService.submit(new DelegationTokenRenewerRunnable(evt)); - futures.put(evt, future); + renewerService.execute(new DelegationTokenRenewerRunnable(evt)); } else { pendingEventQueue.add(evt); } @@ -503,8 +476,7 @@ public class DelegationTokenRenewer extends AbstractService { for (Iterator<Map.Entry<String, String>> itor = tokenConf.iterator(); itor.hasNext(); ) { Map.Entry<String, String> entry = itor.next(); - LOG.debug("Token conf key is {} and value is {}", - entry.getKey(), entry.getValue()); + LOG.info(entry.getKey() + " ===> " + entry.getValue()); } } } else { @@ -922,100 +894,7 @@ public class DelegationTokenRenewer extends AbstractService { public void setRMContext(RMContext rmContext) { this.rmContext = rmContext; } - - @VisibleForTesting - public void setDelegationTokenRenewerPoolTracker(boolean flag) { - delegationTokenRenewerPoolTrackerFlag = flag; - } - - /** - * Create a timer task to retry the token renewer event which would be - * scheduled at defined intervals based on the configuration. - * - * @param evt - * @return Timer Task - */ - private TimerTask getTimerTask(AbstractDelegationTokenRenewerAppEvent evt) { - return new TimerTask() { - @Override - public void run() { - LOG.info("Retrying token renewer thread for appid = {} and " - + "attempt is {}", evt.getApplicationId(), - evt.getAttempt()); - evt.incrAttempt(); - - Collection<Token<?>> tokens = - evt.getCredentials().getAllTokens(); - for (Token<?> token : tokens) { - DelegationTokenToRenew dttr = allTokens.get(token); - if (dttr != null) { - removeFailedDelegationToken(dttr); - } - } - - DelegationTokenRenewerAppRecoverEvent event = - new DelegationTokenRenewerAppRecoverEvent( - evt.getApplicationId(), evt.getCredentials(), - evt.shouldCancelAtEnd(), evt.getUser(), evt.getTokenConf()); - event.setAttempt(evt.getAttempt()); - processDelegationTokenRenewerEvent(event); - } - }; - } - - /** - * Runnable class to set timeout for futures of all threads running in - * renewerService thread pool executor asynchronously. - * - * In case of timeout exception, retries would be attempted with defined - * intervals till no. of retry attempt reaches max attempt. - */ - private final class DelegationTokenRenewerPoolTracker - implements Runnable { - - DelegationTokenRenewerPoolTracker() { - } - - /** - * Keep traversing <Future> of renewer pool threads and wait for specific - * timeout. In case of timeout exception, retry the event till no. of - * attempts reaches max attempts with specific interval. - */ - @Override - public void run() { - while (true) { - for (Map.Entry<DelegationTokenRenewerEvent, Future<?>> entry : futures - .entrySet()) { - DelegationTokenRenewerEvent evt = entry.getKey(); - Future<?> future = entry.getValue(); - try { - future.get(tokenRenewerThreadTimeout, TimeUnit.MILLISECONDS); - } catch (TimeoutException e) { - - // Cancel thread and retry the same event in case of timeout - if (future != null && !future.isDone() && !future.isCancelled()) { - future.cancel(true); - futures.remove(evt); - if (evt.getAttempt() < tokenRenewerThreadRetryMaxAttempts) { - renewalTimer.schedule( - getTimerTask((AbstractDelegationTokenRenewerAppEvent) evt), - tokenRenewerThreadRetryInterval); - } else { - LOG.info( - "Exhausted max retry attempts {} in token renewer " - + "thread for {}", - tokenRenewerThreadRetryMaxAttempts, evt.getApplicationId()); - } - } - } catch (Exception e) { - LOG.info("Problem in submitting renew tasks in token renewer " - + "thread.", e); - } - } - } - } - } - + /* * This will run as a separate thread and will process individual events. It * is done in this way to make sure that the token renewal as a part of @@ -1137,10 +1016,6 @@ public class DelegationTokenRenewer extends AbstractService { public String getUser() { return user; } - - private Configuration getTokenConf() { - return tokenConf; - } } enum DelegationTokenRenewerEventType { @@ -1153,7 +1028,6 @@ public class DelegationTokenRenewer extends AbstractService { AbstractEvent<DelegationTokenRenewerEventType> { private ApplicationId appId; - private int attempt = 1; public DelegationTokenRenewerEvent(ApplicationId appId, DelegationTokenRenewerEventType type) { @@ -1164,18 +1038,6 @@ public class DelegationTokenRenewer extends AbstractService { public ApplicationId getApplicationId() { return appId; } - - public void incrAttempt() { - attempt++; - } - - public int getAttempt() { - return attempt; - } - - public void setAttempt(int attempt) { - this.attempt = attempt; - } } // only for testing diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java index 0205460..5f6d440 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java @@ -21,7 +21,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.security; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; @@ -43,7 +42,6 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CyclicBarrier; -import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -95,7 +93,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmitter; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart.TestSecurityMockRM; import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; @@ -233,7 +230,6 @@ public class TestDelegationTokenRenewer { InetSocketAddress sockAddr = InetSocketAddress.createUnresolved("localhost", 1234); when(mockClientRMService.getBindAddress()).thenReturn(sockAddr); - delegationTokenRenewer.setDelegationTokenRenewerPoolTracker(false); delegationTokenRenewer.setRMContext(mockContext); delegationTokenRenewer.init(conf); delegationTokenRenewer.start(); @@ -636,7 +632,6 @@ public class TestDelegationTokenRenewer { InetSocketAddress sockAddr = InetSocketAddress.createUnresolved("localhost", 1234); when(mockClientRMService.getBindAddress()).thenReturn(sockAddr); - localDtr.setDelegationTokenRenewerPoolTracker(false); localDtr.setRMContext(mockContext); localDtr.init(lconf); localDtr.start(); @@ -717,7 +712,6 @@ public class TestDelegationTokenRenewer { InetSocketAddress sockAddr = InetSocketAddress.createUnresolved("localhost", 1234); when(mockClientRMService.getBindAddress()).thenReturn(sockAddr); - localDtr.setDelegationTokenRenewerPoolTracker(false); localDtr.setRMContext(mockContext); localDtr.init(lconf); localDtr.start(); @@ -1618,173 +1612,4 @@ public class TestDelegationTokenRenewer { // Ensure incrTokenSequenceNo has been called for token renewal as well. Mockito.verify(mockContext, Mockito.times(2)).incrTokenSequenceNo(); } - - /** - * Test case to ensure token renewer threads are timed out by inducing - * artificial delay. - * - * Because of time out, retries would be attempted till it reaches max retry - * attempt and finally asserted using used threads count. - * - * @throws Exception - */ - @Test(timeout = 30000) - public void testTokenThreadTimeout() throws Exception { - Configuration yarnConf = new YarnConfiguration(); - yarnConf.setBoolean(YarnConfiguration.RM_PROXY_USER_PRIVILEGES_ENABLED, - true); - yarnConf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, - "kerberos"); - yarnConf.setClass(YarnConfiguration.RM_STORE, MemoryRMStateStore.class, - RMStateStore.class); - yarnConf.setTimeDuration(YarnConfiguration.RM_DT_RENEWER_THREAD_TIMEOUT, 5, - TimeUnit.SECONDS); - yarnConf.setTimeDuration( - YarnConfiguration.RM_DT_RENEWER_THREAD_RETRY_INTERVAL, 5, - TimeUnit.SECONDS); - yarnConf.setInt(YarnConfiguration.RM_DT_RENEWER_THREAD_RETRY_MAX_ATTEMPTS, - 3); - UserGroupInformation.setConfiguration(yarnConf); - - Text userText = new Text("user1"); - DelegationTokenIdentifier dtId = new DelegationTokenIdentifier(userText, - new Text("renewer1"), userText); - final Token<DelegationTokenIdentifier> originalToken = - new Token<>(dtId.getBytes(), "password1".getBytes(), dtId.getKind(), - new Text("service1")); - - Credentials credentials = new Credentials(); - credentials.addToken(userText, originalToken); - - AtomicBoolean renewDelay = new AtomicBoolean(false); - - // -1 is because of thread allocated to pool tracker runnable tasks - AtomicInteger threadCounter = new AtomicInteger(-1); - renewDelay.set(true); - DelegationTokenRenewer renewer = createNewDelegationTokenRenewerForTimeout( - yarnConf, threadCounter, renewDelay); - - MockRM rm = new TestSecurityMockRM(yarnConf) { - @Override - protected DelegationTokenRenewer createDelegationTokenRenewer() { - return renewer; - } - }; - - rm.start(); - rm.submitApp(200, "name", "user", - new HashMap<ApplicationAccessType, String>(), false, "default", 1, - credentials); - - int attempts = yarnConf.getInt( - YarnConfiguration.RM_DT_RENEWER_THREAD_RETRY_MAX_ATTEMPTS, - YarnConfiguration.DEFAULT_RM_DT_RENEWER_THREAD_RETRY_MAX_ATTEMPTS); - - GenericTestUtils.waitFor(() -> threadCounter.get() >= attempts, 2000, - 30000); - - // Ensure no. of threads has been used in renewer service thread pool is - // higher than the configured max retry attempts - assertTrue(threadCounter.get() >= attempts); - rm.close(); - } - - /** - * Test case to ensure token renewer threads are running as usual and finally - * asserted only 1 thread has been used. - * - * @throws Exception - */ - @Test(timeout = 30000) - public void testTokenThreadTimeoutWithoutDelay() throws Exception { - Configuration yarnConf = new YarnConfiguration(); - yarnConf.setBoolean(YarnConfiguration.RM_PROXY_USER_PRIVILEGES_ENABLED, - true); - yarnConf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, - "kerberos"); - yarnConf.set(YarnConfiguration.RM_STORE, - MemoryRMStateStore.class.getName()); - yarnConf.setTimeDuration(YarnConfiguration.RM_DT_RENEWER_THREAD_TIMEOUT, 5, - TimeUnit.SECONDS); - yarnConf.setTimeDuration( - YarnConfiguration.RM_DT_RENEWER_THREAD_RETRY_INTERVAL, 5, - TimeUnit.SECONDS); - yarnConf.setInt(YarnConfiguration.RM_DT_RENEWER_THREAD_RETRY_MAX_ATTEMPTS, - 3); - UserGroupInformation.setConfiguration(yarnConf); - - Text userText = new Text("user1"); - DelegationTokenIdentifier dtId = new DelegationTokenIdentifier(userText, - new Text("renewer1"), userText); - final Token<DelegationTokenIdentifier> originalToken = - new Token<>(dtId.getBytes(), "password1".getBytes(), dtId.getKind(), - new Text("service1")); - - Credentials credentials = new Credentials(); - credentials.addToken(userText, originalToken); - - AtomicBoolean renewDelay = new AtomicBoolean(false); - - // -1 is because of thread allocated to pool tracker runnable tasks - AtomicInteger threadCounter = new AtomicInteger(-1); - DelegationTokenRenewer renwer = createNewDelegationTokenRenewerForTimeout( - yarnConf, threadCounter, renewDelay); - - MockRM rm = new TestSecurityMockRM(yarnConf) { - @Override - protected DelegationTokenRenewer createDelegationTokenRenewer() { - return renwer; - } - }; - - rm.start(); - rm.submitApp(200, "name", "user", - new HashMap<ApplicationAccessType, String>(), false, "default", 1, - credentials); - - GenericTestUtils.waitFor(() -> threadCounter.get() == 1, 2000, 40000); - - // Ensure only one thread has been used in renewer service thread pool. - assertEquals(threadCounter.get(), 1); - rm.close(); - } - - private DelegationTokenRenewer createNewDelegationTokenRenewerForTimeout( - Configuration config, final AtomicInteger renewerCounter, - final AtomicBoolean renewDelay) { - DelegationTokenRenewer renew = new DelegationTokenRenewer() { - @Override - protected ThreadPoolExecutor createNewThreadPoolService( - Configuration configuration) { - ThreadPoolExecutor pool = new ThreadPoolExecutor(5, 5, 3L, - TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()) { - @Override - public Future<?> submit(Runnable r) { - renewerCounter.incrementAndGet(); - return super.submit(r); - } - }; - return pool; - } - - @Override - protected void renewToken(final DelegationTokenToRenew dttr) - throws IOException { - try { - if (renewDelay.get()) { - // Delay for 4 times than the configured timeout - Thread.sleep(config.getTimeDuration( - YarnConfiguration.RM_DT_RENEWER_THREAD_TIMEOUT, - YarnConfiguration.DEFAULT_RM_DT_RENEWER_THREAD_TIMEOUT, - TimeUnit.MILLISECONDS) * 4); - } - super.renewToken(dttr); - } catch (InterruptedException e) { - LOG.info("Sleep Interrupted", e); - } - } - }; - renew.setDelegationTokenRenewerPoolTracker(true); - return renew; - } -} \ No newline at end of file +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org