Author: daryn Date: Fri Aug 17 14:05:11 2012 New Revision: 1374271 URL: http://svn.apache.org/viewvc?rev=1374271&view=rev Log: HADOOP-7967. Need generalized multi-token filesystem support (daryn)
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/TokenCache.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/security/TestTokenCache.java Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/TokenCache.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/TokenCache.java?rev=1374271&r1=1374270&r2=1374271&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/TokenCache.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/TokenCache.java Fri Aug 17 14:05:11 2012 @@ -20,7 +20,6 @@ package org.apache.hadoop.mapreduce.secu import java.io.IOException; import java.util.HashSet; -import java.util.List; import java.util.Set; import org.apache.commons.logging.Log; @@ -110,7 +109,6 @@ public class TokenCache { * @param conf * @throws IOException */ - @SuppressWarnings("deprecation") static void obtainTokensForNamenodesInternal(FileSystem fs, Credentials credentials, Configuration conf) throws IOException { String delegTokenRenewer = Master.getMasterPrincipal(conf); @@ -120,26 +118,11 @@ public class TokenCache { } mergeBinaryTokens(credentials, conf); - String fsName = fs.getCanonicalServiceName(); - if (TokenCache.getDelegationToken(credentials, fsName) == null) { - List<Token<?>> tokens = - fs.getDelegationTokens(delegTokenRenewer, credentials); - if (tokens != null) { - for (Token<?> token : tokens) { - credentials.addToken(token.getService(), token); - LOG.info("Got dt for " + fs.getUri() + ";uri="+ fsName + - ";t.service="+token.getService()); - } - } - //Call getDelegationToken as well for now - for FS implementations - // which may not have implmented getDelegationTokens (hftp) - if (tokens == null || tokens.size() == 0) { - Token<?> token = fs.getDelegationToken(delegTokenRenewer); - if (token != null) { - credentials.addToken(token.getService(), token); - LOG.info("Got dt for " + fs.getUri() + ";uri=" + fsName - + ";t.service=" + token.getService()); - } + final Token<?> tokens[] = fs.addDelegationTokens(delegTokenRenewer, + credentials); + if (tokens != null) { + for (Token<?> token : tokens) { + LOG.info("Got dt for " + fs.getUri() + "; "+token); } } } @@ -174,21 +157,6 @@ public class TokenCache { private static final Text JOB_TOKEN = new Text("ShuffleAndJobToken"); /** - * - * @param namenode - * @return delegation token - */ - @InterfaceAudience.Private - public static Token<?> getDelegationToken( - Credentials credentials, String namenode) { - //No fs specific tokens issues by this fs. It may however issue tokens - // for other filesystems - which would be keyed by that filesystems name. - if (namenode == null) - return null; - return (Token<?>) credentials.getToken(new Text(namenode)); - } - - /** * load job token from a file * @param conf * @throws IOException Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/security/TestTokenCache.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/security/TestTokenCache.java?rev=1374271&r1=1374270&r2=1374271&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/security/TestTokenCache.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/security/TestTokenCache.java Fri Aug 17 14:05:11 2012 @@ -18,23 +18,16 @@ package org.apache.hadoop.mapreduce.security; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import static org.junit.Assert.*; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.*; import java.io.IOException; import java.net.URI; -import java.util.LinkedList; -import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileSystemTestHelper.MockFileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.Master; @@ -43,145 +36,42 @@ import org.apache.hadoop.security.Creden import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.junit.BeforeClass; import org.junit.Test; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; public class TestTokenCache { - - @Test - @SuppressWarnings("deprecation") - public void testGetDelegationTokensNotImplemented() throws Exception { - Credentials credentials = new Credentials(); - Configuration conf = new Configuration(); + private static Configuration conf; + private static String renewer; + + @BeforeClass + public static void setup() throws Exception { + conf = new Configuration(); conf.set(YarnConfiguration.RM_PRINCIPAL, "mapred/host@REALM"); - String renewer = Master.getMasterPrincipal(conf); - - FileSystem fs = setupSingleFsWithoutGetDelegationTokens(); - TokenCache.obtainTokensForNamenodesInternal(fs, credentials, conf); - assertEquals(1, credentials.getAllTokens().size()); - - verify(fs).getDelegationTokens(renewer, credentials); - verify(fs).getDelegationToken(renewer); + renewer = Master.getMasterPrincipal(conf); } @Test - public void testManagedFileSystem() throws Exception { + public void testObtainTokens() throws Exception { Credentials credentials = new Credentials(); - Configuration conf = new Configuration(); - conf.set(YarnConfiguration.RM_PRINCIPAL, "mapred/host@REALM"); - String renewer = Master.getMasterPrincipal(conf); - - FileSystem singleFs = setupSingleFs(); - FileSystem multiFs = setupMultiFs(singleFs, renewer, credentials); - - TokenCache.obtainTokensForNamenodesInternal(singleFs, credentials, conf); - assertEquals(1, credentials.getAllTokens().size()); - - TokenCache.obtainTokensForNamenodesInternal(singleFs, credentials, conf); - assertEquals(1, credentials.getAllTokens().size()); - - TokenCache.obtainTokensForNamenodesInternal(multiFs, credentials, conf); - assertEquals(2, credentials.getAllTokens().size()); - - TokenCache.obtainTokensForNamenodesInternal(multiFs, credentials, conf); - assertEquals(2, credentials.getAllTokens().size()); - - verify(singleFs, times(1)).getDelegationTokens(renewer, credentials); - verify(multiFs, times(2)).getDelegationTokens(renewer, credentials); - // A call to getDelegationToken would have generated an exception. - } - - @SuppressWarnings("deprecation") - private FileSystem setupSingleFsWithoutGetDelegationTokens() throws Exception { - FileSystem mockFs = mock(FileSystem.class); - when(mockFs.getCanonicalServiceName()).thenReturn("singlefs4"); - when(mockFs.getUri()).thenReturn(new URI("singlefs4:///")); - - final Token<?> mockToken = (Token<?>) mock(Token.class); - when(mockToken.getService()).thenReturn(new Text("singlefs4")); - - when(mockFs.getDelegationToken(any(String.class))).thenAnswer( - new Answer<Token<?>>() { - @Override - public Token<?> answer(InvocationOnMock invocation) throws Throwable { - return mockToken; - } - }); - - when(mockFs.getDelegationTokens(any(String.class), any(Credentials.class))) - .thenReturn(new LinkedList<Token<?>>()); - - return mockFs; - } - - private FileSystem setupSingleFs() throws Exception { - FileSystem mockFs = mock(FileSystem.class); - when(mockFs.getCanonicalServiceName()).thenReturn("singlefs1"); - when(mockFs.getUri()).thenReturn(new URI("singlefs1:///")); - - List<Token<?>> tokens = new LinkedList<Token<?>>(); - Token<?> mockToken = mock(Token.class); - when(mockToken.getService()).thenReturn(new Text("singlefs1")); - tokens.add(mockToken); - - when(mockFs.getDelegationTokens(any(String.class))).thenThrow( - new RuntimeException( - "getDelegationTokens(renewer) should not be called")); - when(mockFs.getDelegationTokens(any(String.class), any(Credentials.class))) - .thenReturn(tokens); - - return mockFs; - } - - private FileSystem setupMultiFs(final FileSystem singleFs, - final String renewer, final Credentials credentials) throws Exception { - FileSystem mockFs = mock(FileSystem.class); - when(mockFs.getCanonicalServiceName()).thenReturn(null); - when(mockFs.getUri()).thenReturn(new URI("multifs:///")); - - when(mockFs.getDelegationTokens(any(String.class))).thenThrow( - new RuntimeException( - "getDelegationTokens(renewer) should not be called")); - when(mockFs.getDelegationTokens(renewer, credentials)).thenAnswer( - new Answer<List<Token<?>>>() { - - @Override - public List<Token<?>> answer(InvocationOnMock invocation) - throws Throwable { - List<Token<?>> newTokens = new LinkedList<Token<?>>(); - if (credentials.getToken(new Text("singlefs1")) == null) { - newTokens.addAll(singleFs.getDelegationTokens(renewer, - credentials)); - } else { - newTokens.add(credentials.getToken(new Text("singlefs1"))); - } - Token<?> mockToken2 = mock(Token.class); - when(mockToken2.getService()).thenReturn(new Text("singlefs2")); - newTokens.add(mockToken2); - return newTokens; - } - }); - - return mockFs; + FileSystem fs = mock(FileSystem.class); + TokenCache.obtainTokensForNamenodesInternal(fs, credentials, conf); + verify(fs).addDelegationTokens(eq(renewer), eq(credentials)); } @Test @SuppressWarnings("deprecation") public void testBinaryCredentials() throws Exception { - Configuration conf = new Configuration(); - conf.set(YarnConfiguration.RM_PRINCIPAL, "mapred/host@REALM"); - String renewer = Master.getMasterPrincipal(conf); - Path TEST_ROOT_DIR = new Path(System.getProperty("test.build.data","test/build/data")); // ick, but need fq path minus file:/ String binaryTokenFile = FileSystem.getLocal(conf).makeQualified( new Path(TEST_ROOT_DIR, "tokenFile")).toUri().getPath(); - FileSystem fs1 = createFileSystemForService("service1"); - FileSystem fs2 = createFileSystemForService("service2"); - FileSystem fs3 = createFileSystemForService("service3"); + MockFileSystem fs1 = createFileSystemForServiceName("service1"); + MockFileSystem fs2 = createFileSystemForServiceName("service2"); + MockFileSystem fs3 = createFileSystemForServiceName("service3"); // get the tokens for fs1 & fs2 and write out to binary creds file Credentials creds = new Credentials(); @@ -196,7 +86,7 @@ public class TestTokenCache { // re-init creds and add a newer token for fs1 creds = new Credentials(); Token<?> newerToken1 = fs1.getDelegationToken(renewer); - assertFalse(newerToken1.equals(token1)); + assertNotSame(newerToken1, token1); creds.addToken(newerToken1.getService(), newerToken1); checkToken(creds, newerToken1); @@ -230,10 +120,9 @@ public class TestTokenCache { } } - @SuppressWarnings("deprecation") - private FileSystem createFileSystemForService(final String service) + private MockFileSystem createFileSystemForServiceName(final String service) throws IOException { - FileSystem mockFs = mock(FileSystem.class); + MockFileSystem mockFs = new MockFileSystem(); when(mockFs.getCanonicalServiceName()).thenReturn(service); when(mockFs.getDelegationToken(any(String.class))).thenAnswer( new Answer<Token<?>>() { @@ -258,7 +147,8 @@ public class TestTokenCache { String renewer = Master.getMasterPrincipal(conf); Credentials credentials = new Credentials(); - FileSystem mockFs = mock(FileSystem.class); + final MockFileSystem fs = new MockFileSystem(); + final MockFileSystem mockFs = (MockFileSystem) fs.getRawFileSystem(); when(mockFs.getCanonicalServiceName()).thenReturn("host:0"); when(mockFs.getUri()).thenReturn(new URI("mockfs://host:0")); @@ -266,9 +156,9 @@ public class TestTokenCache { when(mockPath.getFileSystem(conf)).thenReturn(mockFs); Path[] paths = new Path[]{ mockPath, mockPath }; - when(mockFs.getDelegationTokens("me", credentials)).thenReturn(null); + when(mockFs.addDelegationTokens("me", credentials)).thenReturn(null); TokenCache.obtainTokensForNamenodesInternal(credentials, paths, conf); - verify(mockFs, times(1)).getDelegationTokens(renewer, credentials); + verify(mockFs, times(1)).addDelegationTokens(renewer, credentials); } @Test @@ -278,5 +168,4 @@ public class TestTokenCache { TokenCache.cleanUpTokenReferral(conf); assertNull(conf.get(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY)); } - -} +} \ No newline at end of file