HIVE-16686 : repl invocations of distcp needs additional handling (Sushanth Sowmyan, reviewed by Thejas Nair)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1c891ad4 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1c891ad4 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1c891ad4 Branch: refs/heads/hive-14535 Commit: 1c891ad4e228aee22aae6c4f8ab572f8867ea441 Parents: dec96ca Author: Sushanth Sowmyan <[email protected]> Authored: Mon May 22 13:55:20 2017 -0700 Committer: Sushanth Sowmyan <[email protected]> Committed: Wed May 24 09:38:10 2017 -0700 ---------------------------------------------------------------------- .../apache/hadoop/hive/common/FileUtils.java | 42 +++++++-- .../org/apache/hadoop/hive/conf/HiveConf.java | 5 + .../hadoop/hive/common/TestFileUtils.java | 22 +++++ .../hadoop/hive/ql/exec/ReplCopyTask.java | 15 ++- shims/0.23/pom.xml | 10 +- .../apache/hadoop/hive/shims/Hadoop23Shims.java | 46 +++++++++- .../hadoop/hive/shims/TestHadoop23Shims.java | 96 ++++++++++++++++++++ .../apache/hadoop/hive/shims/HadoopShims.java | 14 +++ 8 files changed, 234 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/1c891ad4/common/src/java/org/apache/hadoop/hive/common/FileUtils.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java index 985fd8c..c0388f6 100644 --- a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java +++ b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java @@ -583,13 +583,23 @@ public final class FileUtils { * Copies files between filesystems. 
*/ public static boolean copy(FileSystem srcFS, Path src, - FileSystem dstFS, Path dst, - boolean deleteSource, - boolean overwrite, - HiveConf conf) throws IOException { + FileSystem dstFS, Path dst, + boolean deleteSource, + boolean overwrite, + HiveConf conf) throws IOException { return copy(srcFS, src, dstFS, dst, deleteSource, overwrite, conf, ShimLoader.getHadoopShims()); } + /** + * Copies files between filesystems as a fs super user using distcp, and runs + * as a privileged user. + */ + public static boolean privilegedCopy(FileSystem srcFS, Path src, Path dst, + HiveConf conf) throws IOException { + String privilegedUser = conf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER); + return distCp(srcFS, src, dst, false, privilegedUser, conf, ShimLoader.getHadoopShims()); + } + @VisibleForTesting static boolean copy(FileSystem srcFS, Path src, FileSystem dstFS, Path dst, @@ -612,18 +622,34 @@ public final class FileUtils { HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES) + ")"); LOG.info("Launch distributed copy (distcp) job."); triedDistcp = true; - copied = shims.runDistCp(src, dst, conf); - if (copied && deleteSource) { - srcFS.delete(src, true); - } + copied = distCp(srcFS, src, dst, deleteSource, null, conf, shims); } } if (!triedDistcp) { + // Note : Currently, this implementation does not "fall back" to regular copy if distcp + // is tried and it fails. We depend upon that behaviour in cases like replication, + // wherein if distcp fails, there is good reason to not plod along with a trivial + // implementation, and fail instead. 
copied = FileUtil.copy(srcFS, src, dstFS, dst, deleteSource, overwrite, conf); } return copied; } + static boolean distCp(FileSystem srcFS, Path src, Path dst, + boolean deleteSource, String doAsUser, + HiveConf conf, HadoopShims shims) throws IOException { + boolean copied = false; + if (doAsUser == null){ + copied = shims.runDistCp(src, dst, conf); + } else { + copied = shims.runDistCpAs(src, dst, conf, doAsUser); + } + if (copied && deleteSource) { + srcFS.delete(src, true); + } + return copied; + } + /** * Move a particular file or directory to the trash. * @param fs FileSystem to use http://git-wip-us.apache.org/repos/asf/hive/blob/1c891ad4/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 06332ac..2dfc8b6 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2611,6 +2611,10 @@ public class HiveConf extends Configuration { HIVE_SERVER2_ENABLE_DOAS("hive.server2.enable.doAs", true, "Setting this property to true will have HiveServer2 execute\n" + "Hive operations as the user making the calls to it."), + HIVE_DISTCP_DOAS_USER("hive.distcp.privileged.doAs","hdfs", + "This property allows privileged distcp executions done by hive\n" + + "to run as this user. 
Typically, it should be the user you\n" + + "run the namenode as, such as the 'hdfs' user."), HIVE_SERVER2_TABLE_TYPE_MAPPING("hive.server2.table.type.mapping", "CLASSIC", new StringSet("CLASSIC", "HIVE"), "This setting reflects how HiveServer2 will report the table types for JDBC and other\n" + "client implementations that retrieve the available tables and supported table types\n" + @@ -3401,6 +3405,7 @@ public class HiveConf extends Configuration { "hive.security.authenticator.manager,hive.security.authorization.manager," + "hive.security.metastore.authorization.manager,hive.security.metastore.authenticator.manager," + "hive.users.in.admin.role,hive.server2.xsrf.filter.enabled,hive.security.authorization.enabled," + + "hive.distcp.privileged.doAs," + "hive.server2.authentication.ldap.baseDN," + "hive.server2.authentication.ldap.url," + "hive.server2.authentication.ldap.Domain," + http://git-wip-us.apache.org/repos/asf/hive/blob/1c891ad4/common/src/test/org/apache/hadoop/hive/common/TestFileUtils.java ---------------------------------------------------------------------- diff --git a/common/src/test/org/apache/hadoop/hive/common/TestFileUtils.java b/common/src/test/org/apache/hadoop/hive/common/TestFileUtils.java index adc9b0c..d3c8761 100644 --- a/common/src/test/org/apache/hadoop/hive/common/TestFileUtils.java +++ b/common/src/test/org/apache/hadoop/hive/common/TestFileUtils.java @@ -228,4 +228,26 @@ public class TestFileUtils { Assert.assertTrue(FileUtils.copy(mockFs, copySrc, mockFs, copyDst, false, false, conf, shims)); verify(shims).runDistCp(copySrc, copyDst, conf); } + + @Test + public void testCopyWithDistCpAs() throws IOException { + Path copySrc = new Path("copySrc"); + Path copyDst = new Path("copyDst"); + HiveConf conf = new HiveConf(TestFileUtils.class); + + FileSystem fs = copySrc.getFileSystem(conf); + + String doAsUser = conf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER); + + HadoopShims shims = mock(HadoopShims.class); + 
when(shims.runDistCpAs(copySrc, copyDst, conf, doAsUser)).thenReturn(true); + when(shims.runDistCp(copySrc, copyDst, conf)).thenReturn(false); + + // doAs when asked + Assert.assertTrue(FileUtils.distCp(fs, copySrc, copyDst, true, doAsUser, conf, shims)); + verify(shims).runDistCpAs(copySrc, copyDst, conf, doAsUser); + // don't doAs when not asked + Assert.assertFalse(FileUtils.distCp(fs, copySrc, copyDst, true, null, conf, shims)); + verify(shims).runDistCp(copySrc, copyDst, conf); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/1c891ad4/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java index d2f9e79..f277284 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java @@ -139,10 +139,7 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable { if (!rwork.getListFilesOnOutputBehaviour(oneSrc)){ LOG.debug("ReplCopyTask :cp:" + oneSrc.getPath() + "=>" + toPath); - if (!FileUtils.copy(actualSrcFs, oneSrc.getPath(), dstFs, toPath, - false, // delete source - true, // overwrite destination - conf)) { + if (!doCopy(toPath, dstFs, oneSrc.getPath(), actualSrcFs)) { console.printError("Failed to copy: '" + oneSrc.getPath().toString() + "to: '" + toPath.toString() + "'"); return 1; @@ -169,6 +166,16 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable { } } + private boolean doCopy(Path dst, FileSystem dstFs, Path src, FileSystem srcFs) throws IOException { + if (conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST)){ + // regular copy in test env. 
+ return FileUtils.copy(srcFs, src, dstFs, dst, false, true, conf); + } else { + // distcp in actual deployment with privilege escalation + return FileUtils.privilegedCopy(srcFs, src, dst, conf); + } + } + private List<FileStatus> filesInFileListing(FileSystem fs, Path path) throws IOException { http://git-wip-us.apache.org/repos/asf/hive/blob/1c891ad4/shims/0.23/pom.xml ---------------------------------------------------------------------- diff --git a/shims/0.23/pom.xml b/shims/0.23/pom.xml index 7c586fa..3ff1d38 100644 --- a/shims/0.23/pom.xml +++ b/shims/0.23/pom.xml @@ -205,6 +205,14 @@ <version>${hadoop.version}</version> <scope>provided</scope> </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> </dependencies> - + <build> + <sourceDirectory>${basedir}/src/main/java</sourceDirectory> + <testSourceDirectory>${basedir}/src/main/test</testSourceDirectory> + </build> </project> http://git-wip-us.apache.org/repos/asf/hive/blob/1c891ad4/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java ---------------------------------------------------------------------- diff --git a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java index 0483e91..4319bed 100644 --- a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java +++ b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java @@ -27,6 +27,7 @@ import java.net.URI; import java.nio.ByteBuffer; import java.security.AccessControlException; import java.security.NoSuchAlgorithmException; +import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; @@ -37,6 +38,8 @@ import java.util.Map; import java.util.Set; import java.util.TreeMap; import javax.security.auth.Subject; +import javax.security.auth.login.LoginException; + import 
org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.crypto.key.KeyProvider; @@ -1081,16 +1084,53 @@ public class Hadoop23Shims extends HadoopShimsSecure { } } + private static final String DISTCP_OPTIONS_PREFIX = "distcp.options."; + + List<String> constructDistCpParams(Path src, Path dst, Configuration conf) { + List<String> params = new ArrayList<String>(); + for (Map.Entry<String,String> entry : conf.getPropsWithPrefix(DISTCP_OPTIONS_PREFIX).entrySet()){ + String distCpOption = entry.getKey(); + String distCpVal = entry.getValue(); + params.add("-" + distCpOption); + if ((distCpVal != null) && (!distCpVal.isEmpty())){ + params.add(distCpVal); + } + } + if (params.size() == 0){ + // if no entries were added via conf, we initiate our defaults + params.add("-update"); + params.add("-skipcrccheck"); + } + params.add(src.toString()); + params.add(dst.toString()); + return params; + } + @Override - public boolean runDistCp(Path src, Path dst, Configuration conf) throws IOException { + public boolean runDistCpAs(Path src, Path dst, Configuration conf, String doAsUser) throws IOException { + UserGroupInformation proxyUser = UserGroupInformation.createProxyUser( + doAsUser, UserGroupInformation.getLoginUser()); + try { + return proxyUser.doAs(new PrivilegedExceptionAction<Boolean>() { + @Override + public Boolean run() throws Exception { + return runDistCp(src, dst, conf); + } + }); + } catch (InterruptedException e) { + throw new IOException(e); + } + } + @Override + public boolean runDistCp(Path src, Path dst, Configuration conf) throws IOException { DistCpOptions options = new DistCpOptions(Collections.singletonList(src), dst); options.setSyncFolder(true); options.setSkipCRC(true); options.preserve(FileAttribute.BLOCKSIZE); // Creates the command-line parameters for distcp - String[] params = {"-update", "-skipcrccheck", src.toString(), dst.toString()}; + List<String> params = constructDistCpParams(src, dst, 
conf); try { conf.setBoolean("mapred.mapper.new-api", true); @@ -1098,7 +1138,7 @@ public class Hadoop23Shims extends HadoopShimsSecure { // HIVE-13704 states that we should use run() instead of execute() due to a hadoop known issue // added by HADOOP-10459 - if (distcp.run(params) == 0) { + if (distcp.run(params.toArray(new String[0])) == 0) { return true; } else { return false; http://git-wip-us.apache.org/repos/asf/hive/blob/1c891ad4/shims/0.23/src/main/test/org/apache/hadoop/hive/shims/TestHadoop23Shims.java ---------------------------------------------------------------------- diff --git a/shims/0.23/src/main/test/org/apache/hadoop/hive/shims/TestHadoop23Shims.java b/shims/0.23/src/main/test/org/apache/hadoop/hive/shims/TestHadoop23Shims.java new file mode 100644 index 0000000..ba1086c --- /dev/null +++ b/shims/0.23/src/main/test/org/apache/hadoop/hive/shims/TestHadoop23Shims.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.shims; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.tools.DistCpOptions; +import org.junit.Test; + +import java.io.IOException; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +public class TestHadoop23Shims { + + @Test + public void testConstructDistCpParams() { + Path copySrc = new Path("copySrc"); + Path copyDst = new Path("copyDst"); + Configuration conf = new Configuration(); + + Hadoop23Shims shims = new Hadoop23Shims(); + List<String> paramsDefault = shims.constructDistCpParams(copySrc, copyDst, conf); + + assertEquals(4, paramsDefault.size()); + assertTrue("Distcp -update set by default", paramsDefault.contains("-update")); + assertTrue("Distcp -skipcrccheck set by default", paramsDefault.contains("-skipcrccheck")); + assertEquals(copySrc.toString(), paramsDefault.get(2)); + assertEquals(copyDst.toString(), paramsDefault.get(3)); + + conf.set("distcp.options.foo", "bar"); // should set "-foo bar" + conf.set("distcp.options.blah", ""); // should set "-blah" + conf.set("dummy", "option"); // should be ignored. + List<String> paramsWithCustomParamInjection = + shims.constructDistCpParams(copySrc, copyDst, conf); + + assertEquals(5, paramsWithCustomParamInjection.size()); + + // check that the defaults did not remain. 
+ assertTrue("Distcp -update not set if not requested", + !paramsWithCustomParamInjection.contains("-update")); + assertTrue("Distcp -skipcrccheck not set if not requested", + !paramsWithCustomParamInjection.contains("-skipcrccheck")); + + // the "-foo bar" and "-blah" params order is not guaranteed + String firstParam = paramsWithCustomParamInjection.get(0); + if (firstParam.equals("-foo")){ + // "-foo bar -blah" form + assertEquals("bar", paramsWithCustomParamInjection.get(1)); + assertEquals("-blah", paramsWithCustomParamInjection.get(2)); + } else { + // "-blah -foo bar" form + assertEquals("-blah", paramsWithCustomParamInjection.get(0)); + assertEquals("-foo", paramsWithCustomParamInjection.get(1)); + assertEquals("bar", paramsWithCustomParamInjection.get(2)); + } + + // the dummy option should not have made it either - only options + // beginning with distcp.options. should be honoured + assertTrue(!paramsWithCustomParamInjection.contains("dummy")); + assertTrue(!paramsWithCustomParamInjection.contains("-dummy")); + assertTrue(!paramsWithCustomParamInjection.contains("option")); + assertTrue(!paramsWithCustomParamInjection.contains("-option")); + + assertEquals(copySrc.toString(), paramsWithCustomParamInjection.get(3)); + assertEquals(copyDst.toString(), paramsWithCustomParamInjection.get(4)); + + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/1c891ad4/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java ---------------------------------------------------------------------- diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java index c280d49..d08ad04 100644 --- a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java +++ b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java @@ -479,6 +479,20 @@ public interface HadoopShims { /** * Copies a source dir/file to a destination by orchestrating 
the copy between hdfs nodes. * This distributed process is meant to copy huge files that could take some time if a single + * copy is done. This is a variation which allows proxying as a different user to perform + * the distcp, and requires that the caller have requisite proxy user privileges. + * + * @param src Path to the source file or directory to copy + * @param dst Path to the destination file or directory + * @param conf The hadoop configuration object + * @param doAsUser The user to perform the distcp as + * @return True if it is successful; False otherwise. + */ + public boolean runDistCpAs(Path src, Path dst, Configuration conf, String doAsUser) throws IOException; + + /** + * Copies a source dir/file to a destination by orchestrating the copy between hdfs nodes. + * This distributed process is meant to copy huge files that could take some time if a single + * copy is done. * * @param src Path to the source file or directory to copy
