Author: omalley
Date: Fri Mar 4 04:17:00 2011
New Revision: 1077457
URL: http://svn.apache.org/viewvc?rev=1077457&view=rev
Log:
commit 9058858665529dd723cbbf2198b7c2edcc39af66
Author: Arun C Murthy <[email protected]>
Date: Sun May 9 09:19:41 2010 -0700
MAPREDUCE-1744. Fixed DistributedCache apis to take a user-supplied
FileSystem to allow for better proxy behaviour for Oozie. Contributed by Richard
King.
From
https://issues.apache.org/jira/secure/attachment/12444060/MAPREDUCE-1744.patch
+++ b/YAHOO-CHANGES.txt
+ MAPREDUCE-1744. Fixed DistributedCache apis to take a user-supplied
+ FileSystem to allow for better proxy behaviour for Oozie. (Richard King)
+
Added:
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/security/TestDistributedCacheAlternateFileSystem.java
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/DistributedCache.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestMRWithDistributedCache.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/DistributedCache.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/DistributedCache.java?rev=1077457&r1=1077456&r2=1077457&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/DistributedCache.java
(original)
+++
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/DistributedCache.java
Fri Mar 4 04:17:00 2011
@@ -396,15 +396,6 @@ public class DistributedCache {
return null;
}
}
-
- private static FileSystem getFileSystem(URI cache, Configuration conf)
- throws IOException {
- String fileSysName = getFileSysName(cache);
- if (fileSysName != null)
- return FileSystem.getNamed(fileSysName, conf);
- else
- return FileSystem.get(conf);
- }
/**
* Set the configuration with the given set of archives. Intended
@@ -567,18 +558,39 @@ public class DistributedCache {
/**
* Add a file path to the current set of classpath entries. It adds the file
* to cache as well. Intended to be used by user code.
+ *
+ * @deprecated
+ *
+ * Please use {@link #addFileToClassPath(Path, Configuration, FileSystem)}
+ * instead. The {@code FileSystem} should be obtained within an
+ * appropriate {@code doAs}.
*
* @param file Path of the file to be added
* @param conf Configuration that contains the classpath setting
*/
+ @Deprecated
public static void addFileToClassPath(Path file, Configuration conf)
throws IOException {
+ addFileToClassPath(file, conf, FileSystem.get(conf));
+ }
+
+ /**
+ * Add a file path to the current set of classpath entries. It adds the file
+ * to cache as well. Intended to be used by user code.
+ *
+ * @param file Path of the file to be added
+ * @param conf Configuration that contains the classpath setting
+ * @param fs FileSystem with respect to which {@code file} should
+ * be interpreted.
+ */
+ public static void addFileToClassPath
+ (Path file, Configuration conf, FileSystem fs)
+ throws IOException {
String classpath = conf.get("mapred.job.classpath.files");
conf.set("mapred.job.classpath.files", classpath == null ? file.toString()
- : classpath + System.getProperty("path.separator") + file.toString());
- FileSystem fs = FileSystem.get(conf);
+ : classpath
+ + System.getProperty("path.separator") + file.toString());
URI uri = fs.makeQualified(file).toUri();
-
addCacheFile(uri, conf);
}
@@ -604,17 +616,38 @@ public class DistributedCache {
/**
* Add an archive path to the current set of classpath entries. It adds the
* archive to cache as well. Intended to be used by user code.
+ *
+ * @deprecated
+ *
+ * Please use {@link #addArchiveToClassPath(Path, Configuration, FileSystem)}
+ * instead. The {@code FileSystem} should be obtained within an
+ * appropriate {@code doAs}.
*
* @param archive Path of the archive to be added
* @param conf Configuration that contains the classpath setting
*/
- public static void addArchiveToClassPath(Path archive, Configuration conf)
- throws IOException {
+ @Deprecated
+ public static void addArchiveToClassPath
+ (Path archive, Configuration conf)
+ throws IOException {
+ addArchiveToClassPath(archive, conf, FileSystem.get(conf));
+ }
+
+ /**
+ * Add an archive path to the current set of classpath entries. It adds the
+ * archive to cache as well. Intended to be used by user code.
+ *
+ * @param archive Path of the archive to be added
+ * @param conf Configuration that contains the classpath setting
+ * @param fs FileSystem with respect to which {@code archive} should be interpreted.
+ */
+ public static void addArchiveToClassPath
+ (Path archive, Configuration conf, FileSystem fs)
+ throws IOException {
String classpath = conf.get("mapred.job.classpath.archives");
conf.set("mapred.job.classpath.archives", classpath == null ? archive
.toString() : classpath + System.getProperty("path.separator")
+ archive.toString());
- FileSystem fs = FileSystem.get(conf);
URI uri = fs.makeQualified(archive).toUri();
addCacheArchive(uri, conf);
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java?rev=1077457&r1=1077456&r2=1077457&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java
(original)
+++
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java
Fri Mar 4 04:17:00 2011
@@ -633,8 +633,8 @@ public class JobClient extends Configure
for (String tmpjars: libjarsArr) {
Path tmp = new Path(tmpjars);
Path newPath = copyRemoteFiles(fs, libjarsDir, tmp, job, replication);
- DistributedCache.addArchiveToClassPath(
- new Path(newPath.toUri().getPath()), job);
+ DistributedCache.addArchiveToClassPath
+ (new Path(newPath.toUri().getPath()), job, fs);
}
}
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestMRWithDistributedCache.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestMRWithDistributedCache.java?rev=1077457&r1=1077456&r2=1077457&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestMRWithDistributedCache.java
(original)
+++
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestMRWithDistributedCache.java
Fri Mar 4 04:17:00 2011
@@ -138,8 +138,9 @@ public class TestMRWithDistributedCache
DistributedCache.addCacheFile(
new URI(first.toUri().toString() + "#distributed.first.symlink"),
conf);
- DistributedCache.addFileToClassPath(second, conf);
- DistributedCache.addArchiveToClassPath(third, conf);
+ FileSystem fs = FileSystem.get(conf);
+ DistributedCache.addFileToClassPath(second, conf, fs);
+ DistributedCache.addArchiveToClassPath(third, conf, fs);
DistributedCache.addCacheArchive(fourth.toUri(), conf);
DistributedCache.createSymlink(conf);
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java?rev=1077457&r1=1077456&r2=1077457&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java
(original)
+++
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java
Fri Mar 4 04:17:00 2011
@@ -145,7 +145,8 @@ public class TestTrackerDistributedCache
String userName = getJobOwnerName();
subConf.set("user.name", userName);
DistributedCache.addCacheFile(firstCacheFile.toUri(), subConf);
- DistributedCache.addFileToClassPath(secondCacheFile, subConf);
+ DistributedCache.addFileToClassPath(secondCacheFile, subConf,
+ FileSystem.get(subConf));
TrackerDistributedCacheManager.determineTimestamps(subConf);
TrackerDistributedCacheManager.determineCacheVisibilities(subConf);
// ****** End of imitating JobClient code
Added:
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/security/TestDistributedCacheAlternateFileSystem.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/security/TestDistributedCacheAlternateFileSystem.java?rev=1077457&view=auto
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/security/TestDistributedCacheAlternateFileSystem.java
(added)
+++
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/security/TestDistributedCacheAlternateFileSystem.java
Fri Mar 4 04:17:00 2011
@@ -0,0 +1,280 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.security;
+
+import java.io.IOException;
+import java.io.File;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.NetworkInterface;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Enumeration;
+
+import java.net.URI;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.tools.DFSAdmin;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.ipc.VersionedProtocol;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.authorize.ProxyUsers;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenInfo;
+import org.junit.Test;
+import org.apache.hadoop.ipc.TestSaslRPC;
+import org.apache.hadoop.ipc.TestSaslRPC.TestTokenSecretManager;
+import org.apache.hadoop.ipc.TestSaslRPC.TestTokenIdentifier;
+import org.apache.hadoop.ipc.TestSaslRPC.TestTokenSelector;
+
+import org.apache.hadoop.filecache.DistributedCache;
+
+import org.apache.commons.logging.*;
+
+/**
+ * Checks that {@code DistributedCache.addFileToClassPath} qualifies each
+ * classpath entry against the {@code FileSystem} supplied by the caller,
+ * and that the deprecated two-argument overload, which falls back to
+ * {@code FileSystem.get(conf)}, uses the file system of the user on whose
+ * behalf it is invoked.
+ * <p>
+ * The proxy user's file system is obtained while {@code user.dir} is
+ * temporarily set to {@code /gqlpt}. The test relies on paths qualified
+ * against that file system resolving under {@code file:/gqlpt/}, which is
+ * how each cached URI is attributed to one file system or the other.
+ */
+public class TestDistributedCacheAlternateFileSystem {
+  final private static String REAL_USER_NAME = "realUser1@HADOOP.APACHE.ORG";
+  final private static String REAL_USER_SHORT_NAME = "realUser1";
+  final private static String PROXY_USER_NAME = "proxyUser";
+  final private static String GROUP1_NAME = "group1";
+  final private static String GROUP2_NAME = "group2";
+  final private static String[] GROUP_NAMES = new String[] { GROUP1_NAME,
+      GROUP2_NAME };
+  private static final String ADDRESS = "0.0.0.0";
+  private TestProtocol proxy;
+  private static Configuration masterConf = new Configuration();
+
+  /** Working directory name given to the proxy user's file system. */
+  final private static String ALTERNATE_FILE_BASE = "gqlpt";
+
+  public static final Log LOG = LogFactory
+      .getLog(TestDistributedCacheAlternateFileSystem.class);
+
+  // Install auth_to_local rules that strip the @HADOOP.APACHE.ORG realm,
+  // before the test creates any UserGroupInformation.
+  static {
+    masterConf.set("hadoop.security.auth_to_local",
+        "RULE:[2:$1@$0](.*@HADOOP.APACHE.ORG)s/@.*//" +
+        "RULE:[1:$1@$0](.*@HADOOP.APACHE.ORG)s/@.*//"
+        + "DEFAULT");
+    UserGroupInformation.setConfiguration(masterConf);
+  }
+
+  /**
+   * Allows {@code superUserShortName} to act as a proxy from every address
+   * of every local network interface, plus 127.0.1.1 and the local host's
+   * canonical name. NOTE(review): not called anywhere in this class.
+   */
+  private void configureSuperUserIPAddresses(Configuration conf,
+      String superUserShortName) throws IOException {
+    ArrayList<String> ipList = new ArrayList<String>();
+    Enumeration<NetworkInterface> netInterfaceList = NetworkInterface
+        .getNetworkInterfaces();
+    while (netInterfaceList.hasMoreElements()) {
+      NetworkInterface inf = netInterfaceList.nextElement();
+      Enumeration<InetAddress> addrList = inf.getInetAddresses();
+      while (addrList.hasMoreElements()) {
+        InetAddress addr = addrList.nextElement();
+        ipList.add(addr.getHostAddress());
+      }
+    }
+    StringBuilder builder = new StringBuilder();
+    for (String ip : ipList) {
+      builder.append(ip);
+      builder.append(',');
+    }
+    builder.append("127.0.1.1,");
+    builder.append(InetAddress.getLocalHost().getCanonicalHostName());
+    LOG.info("Local Ip addresses: "+builder.toString());
+    conf.setStrings(ProxyUsers.getProxySuperuserIpConfKey(superUserShortName),
+        builder.toString());
+  }
+
+  /**
+   * Returns the file system for {@code /gqlpt}, obtained while running as
+   * {@code ugi}.
+   */
+  private FileSystem getFS
+    (UserGroupInformation ugi, final Configuration conf)
+    throws IOException {
+    final Path sysDir = new Path("/" + ALTERNATE_FILE_BASE);
+
+    try {
+      return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
+        public FileSystem run() throws IOException {
+          return sysDir.getFileSystem(conf);
+        }
+      });
+    } catch (InterruptedException e) {
+      // Preserve the interrupt status for the caller before bailing out.
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * As {@code ugi}, adds three classpath entries derived from
+   * {@code corePathString}: one qualified against the proxy user's file
+   * system, one against the real user's, and one through the deprecated
+   * overload that picks its own file system.
+   */
+  private void addToClasspath(final FileSystem proxyUserFileSystem,
+                              final FileSystem realUserFileSystem,
+                              final UserGroupInformation ugi,
+                              final String corePathString,
+                              final Configuration conf)
+    throws IOException, InterruptedException {
+    ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
+      @SuppressWarnings("deprecation")
+      public Boolean run() throws IOException {
+        DistributedCache.addFileToClassPath
+          (new Path("proxy-fs-as-" + corePathString), conf,
+           proxyUserFileSystem);
+        DistributedCache.addFileToClassPath
+          (new Path("real-fs-as-" + corePathString), conf,
+           realUserFileSystem);
+        // Deliberately uses the deprecated overload to test its fallback.
+        DistributedCache.addFileToClassPath
+          (new Path("no-fs-as-" + corePathString), conf);
+        return true;
+      }
+    });
+  }
+
+  /** Returns whether {@code uri} lies under {@code file:/gqlpt/}. */
+  private static boolean uriUsesProxyFS(URI uri) {
+    String uriString = uri.toString();
+
+    return uriString.contains("file:/" + ALTERNATE_FILE_BASE + "/");
+  }
+
+  /**
+   * Returns whether {@code uri} is expected to have been qualified against
+   * the proxy user's file system: it was either added with that file system
+   * explicitly ("proxy-fs-as-*") or added through the deprecated overload
+   * while running as the proxy user ("no-fs-as-proxy*").
+   */
+  private static boolean uriShouldUseProxyFS(URI uri) {
+    String uriString = uri.toString();
+
+    // URI paths always use '/', whatever the platform's File.separator is.
+    String fileName = uriString.substring(uriString.lastIndexOf('/') + 1);
+
+    return fileName.contains("proxy-fs-as")
+        || (fileName.contains("no-fs-as") && fileName.contains("as-proxy"));
+  }
+
+  @Test
+  public void testDistributedCacheProxyUsers() throws Exception {
+    // Sanity check that doAs as the proxy user reports "proxy via real".
+    UserGroupInformation realUserUgi = UserGroupInformation
+        .createRemoteUser(REAL_USER_NAME);
+    UserGroupInformation proxyUserUgi = UserGroupInformation.createProxyUser(
+        PROXY_USER_NAME, realUserUgi);
+    UserGroupInformation curUGI = proxyUserUgi
+        .doAs(new PrivilegedExceptionAction<UserGroupInformation>() {
+          public UserGroupInformation run() throws IOException {
+            return UserGroupInformation.getCurrentUser();
+          }
+        });
+    Assert.assertEquals(PROXY_USER_NAME + " via " + REAL_USER_NAME,
+        curUGI.toString());
+
+    final Configuration conf = new Configuration();
+
+    final FileSystem realUserFileSystem = getFS(realUserUgi, conf);
+
+    // Create the proxy user's file system while user.dir points at /gqlpt,
+    // restoring the original value no matter what happens.
+    FileSystem proxyUserFileSystem;
+    String oldWorkingDir = System.getProperty("user.dir");
+    try {
+      System.setProperty("user.dir", "/" + ALTERNATE_FILE_BASE);
+      proxyUserFileSystem = getFS(proxyUserUgi, conf);
+    } finally {
+      System.setProperty("user.dir", oldWorkingDir);
+    }
+
+    addToClasspath(proxyUserFileSystem, realUserFileSystem,
+        realUserUgi, "real.jar", conf);
+    addToClasspath(proxyUserFileSystem, realUserFileSystem,
+        proxyUserUgi, "proxy.jar", conf);
+
+    // Each addToClasspath call registers three cache files.
+    URI[] result = DistributedCache.getCacheFiles(conf);
+    Assert.assertNotNull("No cache files were registered", result);
+    Assert.assertEquals(6, result.length);
+
+    for (URI uri : result) {
+      LOG.info("One URI is " + uri);
+    }
+
+    for (URI uri : result) {
+      Assert.assertEquals("Inconsistent file system usage for URI " + uri,
+          uriShouldUseProxyFS(uri),
+          uriUsesProxyFS(uri));
+    }
+  }
+
+  @TokenInfo(TestTokenSelector.class)
+  public interface TestProtocol extends VersionedProtocol {
+    public static final long versionID = 1L;
+
+    String aMethod() throws IOException;
+  }
+
+  public class TestImpl implements TestProtocol {
+
+    public String aMethod() throws IOException {
+      return UserGroupInformation.getCurrentUser().toString();
+    }
+
+    public long getProtocolVersion(String protocol, long clientVersion)
+        throws IOException {
+      return TestProtocol.versionID;
+    }
+  }
+
+  /**
+   * Passes {@code conf} to
+   * {@code ProxyUsers.refreshSuperUserGroupsConfiguration}.
+   * NOTE(review): not called anywhere in this class.
+   */
+  private void refreshConf(Configuration conf) throws IOException {
+    ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
+  }
+}