This is an automated email from the ASF dual-hosted git repository. mahesh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new 38682a4 HIVE-21446 : Hive Server going OOM during hive external table replications. (Mahesh Kumar Behera, reviewed by Sankar Hariappan) 38682a4 is described below commit 38682a414708d810937012bae4b0c97deca5ef07 Author: Mahesh Kumar Behera <mbeh...@hortonworks.com> AuthorDate: Thu Mar 21 20:27:38 2019 +0530 HIVE-21446 : Hive Server going OOM during hive external table replications. (Mahesh Kumar Behera, reviewed by Sankar Hariappan) --- .../org/apache/hadoop/hive/common/FileUtils.java | 10 +- .../apache/hadoop/hive/common/TestFileUtils.java | 11 +- .../apache/hadoop/hive/ql/exec/ReplCopyTask.java | 2 +- .../ql/exec/repl/ExternalTableCopyTaskBuilder.java | 86 ++++++++---- .../hadoop/hive/ql/parse/repl/CopyUtils.java | 150 ++++++++++++++------- .../hive/ql/parse/repl/dump/io/FileOperations.java | 5 +- .../hadoop/hive/ql/parse/repl/TestCopyUtils.java | 8 +- .../apache/hadoop/hive/shims/Hadoop23Shims.java | 5 +- .../org/apache/hadoop/hive/shims/HadoopShims.java | 5 +- 9 files changed, 182 insertions(+), 100 deletions(-) diff --git a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java index 23a3a6b..8b03faa 100644 --- a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java +++ b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java @@ -636,18 +636,18 @@ public final class FileUtils { } public static boolean distCp(FileSystem srcFS, List<Path> srcPaths, Path dst, - boolean deleteSource, String doAsUser, + boolean deleteSource, UserGroupInformation proxyUser, HiveConf conf, HadoopShims shims) throws IOException { LOG.debug("copying srcPaths : {}, to DestPath :{} ,with doAs: {}", - StringUtils.join(",", srcPaths), dst.toString(), doAsUser); + StringUtils.join(",", srcPaths), dst.toString(), proxyUser); boolean copied = false; - if (doAsUser == null){ + if (proxyUser == null){ copied = shims.runDistCp(srcPaths, dst, 
conf); } else { - copied = shims.runDistCpAs(srcPaths, dst, conf, doAsUser); + copied = shims.runDistCpAs(srcPaths, dst, conf, proxyUser); } if (copied && deleteSource) { - if (doAsUser != null) { + if (proxyUser != null) { // if distcp is done using doAsUser, delete also should be done using same user. //TODO : Need to change the delete execution within doAs if doAsUser is given. throw new IOException("Distcp is called with doAsUser and delete source set as true"); diff --git a/common/src/test/org/apache/hadoop/hive/common/TestFileUtils.java b/common/src/test/org/apache/hadoop/hive/common/TestFileUtils.java index b45832e..9b5748e 100644 --- a/common/src/test/org/apache/hadoop/hive/common/TestFileUtils.java +++ b/common/src/test/org/apache/hadoop/hive/common/TestFileUtils.java @@ -42,6 +42,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.shims.HadoopShims; +import org.apache.hadoop.security.UserGroupInformation; import org.junit.Assert; import org.junit.Test; @@ -239,14 +240,16 @@ public class TestFileUtils { FileSystem fs = copySrc.getFileSystem(conf); String doAsUser = conf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER); + UserGroupInformation proxyUser = UserGroupInformation.createProxyUser( + doAsUser, UserGroupInformation.getLoginUser()); HadoopShims shims = mock(HadoopShims.class); - when(shims.runDistCpAs(Collections.singletonList(copySrc), copyDst, conf, doAsUser)).thenReturn(true); + when(shims.runDistCpAs(Collections.singletonList(copySrc), copyDst, conf, proxyUser)).thenReturn(true); when(shims.runDistCp(Collections.singletonList(copySrc), copyDst, conf)).thenReturn(false); // doAs when asked - Assert.assertTrue(FileUtils.distCp(fs, Collections.singletonList(copySrc), copyDst, false, doAsUser, conf, shims)); - verify(shims).runDistCpAs(Collections.singletonList(copySrc), copyDst, conf, doAsUser); + Assert.assertTrue(FileUtils.distCp(fs, Collections.singletonList(copySrc), copyDst, false, 
proxyUser, conf, shims)); + verify(shims).runDistCpAs(Collections.singletonList(copySrc), copyDst, conf, proxyUser); // don't doAs when not asked Assert.assertFalse(FileUtils.distCp(fs, Collections.singletonList(copySrc), copyDst, true, null, conf, shims)); verify(shims).runDistCp(Collections.singletonList(copySrc), copyDst, conf); @@ -254,7 +257,7 @@ public class TestFileUtils { // When distcp is done with doAs, the delete should also be done as doAs. But in current code its broken. This // should be fixed. For now check is added to avoid wrong usage. So if doAs is set, delete source should be false. try { - FileUtils.distCp(fs, Collections.singletonList(copySrc), copyDst, true, doAsUser, conf, shims); + FileUtils.distCp(fs, Collections.singletonList(copySrc), copyDst, true, proxyUser, conf, shims); Assert.assertTrue("Should throw IOException as doAs is called with delete source set to true".equals("")); } catch (IOException e) { Assert.assertTrue(e.getMessage(). diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java index 55a0c1f..c34f075 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java @@ -257,7 +257,7 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable { return 2; } // Copy the files from different source file systems to one destination directory - new CopyUtils(rwork.distCpDoAsUser(), conf).copyAndVerify(dstFs, toPath, srcFiles); + new CopyUtils(rwork.distCpDoAsUser(), conf, dstFs).copyAndVerify(toPath, srcFiles); // If a file is copied from CM path, then need to rename them using original source file name // This is needed to avoid having duplicate files in target if same event is applied twice diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ExternalTableCopyTaskBuilder.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ExternalTableCopyTaskBuilder.java index d7eed2c..6bc3cd0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ExternalTableCopyTaskBuilder.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ExternalTableCopyTaskBuilder.java @@ -90,7 +90,7 @@ public class ExternalTableCopyTaskBuilder { } catch (FileNotFoundException e) { // Don't delete target path created else ddl task will try to create it using user hive and may fail. LOG.warn("source path missing " + sourcePath); - return false; + return createdDir; } LOG.info("Setting permission for path dest {} from source {} owner {} : {} : {}", destPath, sourcePath, status.getOwner(), status.getGroup(), status.getPermission()); @@ -99,53 +99,65 @@ public class ExternalTableCopyTaskBuilder { return createdDir; } - private boolean setTargetPathOwner(Path targetPath, Path sourcePath, String distCpDoAsUser) - throws IOException { - if (distCpDoAsUser == null) { + private boolean setTargetPathOwner(Path targetPath, Path sourcePath, UserGroupInformation proxyUser) + throws IOException, InterruptedException { + if (proxyUser == null) { return createAndSetPathOwner(targetPath, sourcePath); } - UserGroupInformation proxyUser = UserGroupInformation.createProxyUser( - distCpDoAsUser, UserGroupInformation.getLoginUser()); - try { - Path finalTargetPath = targetPath; - Path finalSourcePath = sourcePath; - return proxyUser.doAs((PrivilegedExceptionAction<Boolean>) () -> - createAndSetPathOwner(finalTargetPath, finalSourcePath)); - } catch (InterruptedException e) { - throw new IOException(e); + return proxyUser.doAs((PrivilegedExceptionAction<Boolean>) () -> + createAndSetPathOwner(targetPath, sourcePath)); + } + + private boolean checkIfPathExist(Path sourcePath, UserGroupInformation proxyUser) throws Exception { + if (proxyUser == null) { + return sourcePath.getFileSystem(conf).exists(sourcePath); } + return proxyUser.doAs((PrivilegedExceptionAction<Boolean>) () -> + 
sourcePath.getFileSystem(conf).exists(sourcePath)); } - private int handleException(Exception e, Path sourcePath, Path targetPath, int currentRetry) { + private int handleException(Exception e, Path sourcePath, Path targetPath, + int currentRetry, UserGroupInformation proxyUser) { try { - if (!sourcePath.getFileSystem(conf).exists(sourcePath)) { - LOG.warn("Source path missing " + sourcePath, e); + LOG.info("Checking if source path " + sourcePath + " is missing for exception ", e); + if (!checkIfPathExist(sourcePath, proxyUser)) { + LOG.info("Source path is missing. Ignoring exception."); return 0; } } catch (Exception ex) { - LOG.warn("Source path missing check failed" + sourcePath, ex); + LOG.warn("Source path missing check failed. ", ex); + } + + // retry logic only for i/o exception + if (!(e instanceof IOException)) { + LOG.error("Unable to copy {} to {}", sourcePath, targetPath, e); + setException(e); + return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); } if (currentRetry <= MAX_COPY_RETRY) { - LOG.warn("unable to copy {} to {}", sourcePath, targetPath, e); + LOG.warn("Unable to copy {} to {}", sourcePath, targetPath, e); } else { - LOG.error("unable to copy {} to {}", sourcePath, targetPath, e); + LOG.error("Unable to copy {} to {} even after retrying for {} time", sourcePath, targetPath, currentRetry, e); setException(e); - return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); + return ErrorMsg.REPL_FILE_SYSTEM_OPERATION_RETRY.getErrorCode(); } int sleepTime = FileUtils.getSleepTime(currentRetry); - LOG.info("Sleep for " + sleepTime + " milliseconds before retry " + (currentRetry)); + LOG.info("Sleep for " + sleepTime + " milliseconds before retry no " + (currentRetry)); try { Thread.sleep(sleepTime); } catch (InterruptedException timerEx) { - LOG.info("sleep interrupted", timerEx.getMessage()); + LOG.info("Sleep interrupted", timerEx.getMessage()); } try { - FileSystem.closeAllForUGI(Utils.getUGI()); + if (proxyUser == null) { + proxyUser = 
Utils.getUGI(); + } + FileSystem.closeAllForUGI(proxyUser); } catch (Exception ex) { - LOG.error("unable to closeAllForUGI", ex); + LOG.warn("Unable to closeAllForUGI for user " + proxyUser, ex); } return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); } @@ -162,14 +174,17 @@ public class ExternalTableCopyTaskBuilder { } int currentRetry = 0; int error = 0; + UserGroupInformation proxyUser = null; while (currentRetry <= MAX_COPY_RETRY) { try { UserGroupInformation ugi = Utils.getUGI(); String currentUser = ugi.getShortUserName(); - boolean usePrivilegedUser = - distCpDoAsUser != null && !currentUser.equals(distCpDoAsUser); + if (distCpDoAsUser != null && !currentUser.equals(distCpDoAsUser)) { + proxyUser = UserGroupInformation.createProxyUser( + distCpDoAsUser, UserGroupInformation.getLoginUser()); + } - setTargetPathOwner(targetPath, sourcePath, usePrivilegedUser ? distCpDoAsUser : null); + setTargetPathOwner(targetPath, sourcePath, proxyUser); // do we create a new conf and only here provide this additional option so that we get away from // differences of data in two location for the same directories ? @@ -179,16 +194,29 @@ public class ExternalTableCopyTaskBuilder { Collections.singletonList(sourcePath), // list of source paths targetPath, false, - usePrivilegedUser ? 
distCpDoAsUser : null, + proxyUser, conf, ShimLoader.getHadoopShims()); return 0; } catch (Exception e) { currentRetry++; - error = handleException(e, sourcePath, targetPath, currentRetry); + error = handleException(e, sourcePath, targetPath, currentRetry, proxyUser); if (error == 0) { return 0; } + } finally { + if (proxyUser != null) { + try { + FileSystem.closeAllForUGI(proxyUser); + } catch (IOException e) { + LOG.error("Unable to closeAllForUGI for user " + proxyUser, e); + if (error == 0) { + setException(e); + error = ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); + } + break; + } + } } } return error; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java index 686fe7b..73c863e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java @@ -50,59 +50,70 @@ public class CopyUtils { private static final Logger LOG = LoggerFactory.getLogger(CopyUtils.class); // https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/TransparentEncryption.html#Running_as_the_superuser public static final String RAW_RESERVED_VIRTUAL_PATH = "/.reserved/raw/"; - private static final int MAX_COPY_RETRY = 5; + private static final int MAX_IO_RETRY = 5; private final HiveConf hiveConf; private final long maxCopyFileSize; private final long maxNumberOfFiles; private final boolean hiveInTest; private final String copyAsUser; + private FileSystem destinationFs; - public CopyUtils(String distCpDoAsUser, HiveConf hiveConf) { + public CopyUtils(String distCpDoAsUser, HiveConf hiveConf, FileSystem destinationFs) { this.hiveConf = hiveConf; maxNumberOfFiles = hiveConf.getLongVar(HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES); maxCopyFileSize = hiveConf.getLongVar(HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE); hiveInTest = hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST); this.copyAsUser = 
distCpDoAsUser; + this.destinationFs = destinationFs; } // Used by replication, copy files from source to destination. It is possible source file is // changed/removed during copy, so double check the checksum after copy, // if not match, copy again from cm - public void copyAndVerify(FileSystem destinationFs, Path destRoot, + public void copyAndVerify(Path destRoot, List<ReplChangeManager.FileInfo> srcFiles) throws IOException, LoginException, HiveFatalException { Map<FileSystem, Map< Path, List<ReplChangeManager.FileInfo>>> map = fsToFileMap(srcFiles, destRoot); - for (Map.Entry<FileSystem, Map<Path, List<ReplChangeManager.FileInfo>>> entry : map.entrySet()) { - FileSystem sourceFs = entry.getKey(); - Map<Path, List<ReplChangeManager.FileInfo>> destMap = entry.getValue(); - for (Map.Entry<Path, List<ReplChangeManager.FileInfo>> destMapEntry : destMap.entrySet()) { - Path destination = destMapEntry.getKey(); - List<ReplChangeManager.FileInfo> fileInfoList = destMapEntry.getValue(); - boolean useRegularCopy = regularCopy(destinationFs, sourceFs, fileInfoList); - - if (!destinationFs.exists(destination) - && !FileUtils.mkdir(destinationFs, destination, hiveConf)) { - LOG.error("Failed to create destination directory: " + destination); - throw new IOException("Destination directory creation failed"); - } + UserGroupInformation proxyUser = getProxyUser(); + try { + for (Map.Entry<FileSystem, Map<Path, List<ReplChangeManager.FileInfo>>> entry : map.entrySet()) { + Map<Path, List<ReplChangeManager.FileInfo>> destMap = entry.getValue(); + for (Map.Entry<Path, List<ReplChangeManager.FileInfo>> destMapEntry : destMap.entrySet()) { + Path destination = destMapEntry.getKey(); + List<ReplChangeManager.FileInfo> fileInfoList = destMapEntry.getValue(); + // Get the file system again from cache. There is a chance that the file system stored in the map is closed. + // For instance, doCopyRetry closes the file system in case of i/o exceptions. 
+ FileSystem sourceFs = fileInfoList.get(0).getSourcePath().getFileSystem(hiveConf); + boolean useRegularCopy = regularCopy(sourceFs, fileInfoList); + + if (!destinationFs.exists(destination) + && !FileUtils.mkdir(destinationFs, destination, hiveConf)) { + LOG.error("Failed to create destination directory: " + destination); + throw new IOException("Destination directory creation failed"); + } - // Copy files with retry logic on failure or source file is dropped or changed. - doCopyRetry(sourceFs, fileInfoList, destinationFs, destination, useRegularCopy); + // Copy files with retry logic on failure or source file is dropped or changed. + doCopyRetry(sourceFs, fileInfoList, destination, proxyUser, useRegularCopy); + } + } + } finally { + if (proxyUser != null) { + FileSystem.closeAllForUGI(proxyUser); } } } private void doCopyRetry(FileSystem sourceFs, List<ReplChangeManager.FileInfo> srcFileList, - FileSystem destinationFs, Path destination, + Path destination, UserGroupInformation proxyUser, boolean useRegularCopy) throws IOException, LoginException, HiveFatalException { int repeat = 0; boolean isCopyError = false; List<Path> pathList = Lists.transform(srcFileList, ReplChangeManager.FileInfo::getEffectivePath); - while (!pathList.isEmpty() && (repeat < MAX_COPY_RETRY)) { + while (!pathList.isEmpty() && (repeat < MAX_IO_RETRY)) { try { // if its retrying, first regenerate the path list. if (repeat > 0) { - pathList = getFilesToRetry(sourceFs, srcFileList, destinationFs, destination, isCopyError); + pathList = getFilesToRetry(sourceFs, srcFileList, destination, isCopyError); if (pathList.isEmpty()) { // all files were copied successfully in last try. So can break from here. break; @@ -113,7 +124,7 @@ public class CopyUtils { // if exception happens during doCopyOnce, then need to call getFilesToRetry with copy error as true in retry. 
isCopyError = true; - doCopyOnce(sourceFs, pathList, destinationFs, destination, useRegularCopy); + doCopyOnce(sourceFs, pathList, destination, useRegularCopy, proxyUser); // if exception happens after doCopyOnce, then need to call getFilesToRetry with copy error as false in retry. isCopyError = false; @@ -121,7 +132,7 @@ // If copy fails, fall through the retry logic LOG.info("file operation failed", e); - if (repeat >= (MAX_COPY_RETRY - 1)) { + if (repeat >= (MAX_IO_RETRY - 1)) { //no need to wait in the last iteration break; } @@ -136,7 +147,11 @@ } // looks like some network outage, reset the file system object and retry. - FileSystem.closeAllForUGI(Utils.getUGI()); + if (proxyUser == null) { + FileSystem.closeAllForUGI(Utils.getUGI()); + } else { + FileSystem.closeAllForUGI(proxyUser); + } sourceFs = pathList.get(0).getFileSystem(hiveConf); destinationFs = destination.getFileSystem(hiveConf); } @@ -155,7 +170,7 @@ // If yes, then add to the retry list. If source file missing, then retry with CM path. if CM path // itself is missing, then throw error.
private List<Path> getFilesToRetry(FileSystem sourceFs, List<ReplChangeManager.FileInfo> srcFileList, - FileSystem destinationFs, Path destination, boolean isCopyError) + Path destination, boolean isCopyError) throws IOException, HiveFatalException { List<Path> pathList = new ArrayList<Path>(); @@ -238,23 +253,54 @@ public class CopyUtils { return false; } + private UserGroupInformation getProxyUser() throws LoginException, IOException { + if (copyAsUser == null) { + return null; + } + UserGroupInformation proxyUser = null; + int currentRetry = 0; + while (currentRetry <= MAX_IO_RETRY) { + try { + UserGroupInformation ugi = Utils.getUGI(); + String currentUser = ugi.getShortUserName(); + if (!currentUser.equals(copyAsUser)) { + proxyUser = UserGroupInformation.createProxyUser( + copyAsUser, UserGroupInformation.getLoginUser()); + } + return proxyUser; + } catch (IOException e) { + currentRetry++; + if (currentRetry <= MAX_IO_RETRY) { + LOG.warn("Unable to get UGI info", e); + } else { + LOG.error("Unable to get UGI info", e); + throw new IOException(ErrorMsg.REPL_FILE_SYSTEM_OPERATION_RETRY.getMsg()); + } + int sleepTime = FileUtils.getSleepTime(currentRetry); + LOG.info("Sleep for " + sleepTime + " milliseconds before retry " + (currentRetry)); + try { + Thread.sleep(sleepTime); + } catch (InterruptedException timerEx) { + LOG.info("Sleep interrupted", timerEx.getMessage()); + } + } + } + return null; + } + // Copy without retry private void doCopyOnce(FileSystem sourceFs, List<Path> srcList, - FileSystem destinationFs, Path destination, - boolean useRegularCopy) throws IOException, LoginException { - UserGroupInformation ugi = Utils.getUGI(); - String currentUser = ugi.getShortUserName(); - boolean usePrivilegedUser = copyAsUser != null && !currentUser.equals(copyAsUser); - + Path destination, + boolean useRegularCopy, UserGroupInformation proxyUser) throws IOException { if (useRegularCopy) { - doRegularCopyOnce(sourceFs, srcList, destinationFs, destination, 
usePrivilegedUser); + doRegularCopyOnce(sourceFs, srcList, destination, proxyUser); } else { - doDistCpCopyOnce(sourceFs, srcList, destination, usePrivilegedUser); + doDistCpCopyOnce(sourceFs, srcList, destination, proxyUser); } } private void doDistCpCopyOnce(FileSystem sourceFs, List<Path> srcList, Path destination, - boolean usePrivilegedUser) throws IOException { + UserGroupInformation proxyUser) throws IOException { if (hiveConf.getBoolVar(HiveConf.ConfVars.REPL_ADD_RAW_RESERVED_NAMESPACE)) { srcList = srcList.stream().map(path -> { URI uri = path.toUri(); @@ -271,7 +317,7 @@ public class CopyUtils { srcList, // list of source paths destination, false, - usePrivilegedUser ? copyAsUser : null, + proxyUser, hiveConf, ShimLoader.getHadoopShims())) { LOG.error("Distcp failed to copy files: " + srcList + " to destination: " + destination); @@ -279,17 +325,15 @@ public class CopyUtils { } } - private void doRegularCopyOnce(FileSystem sourceFs, List<Path> srcList, FileSystem destinationFs, - Path destination, boolean usePrivilegedUser) throws IOException { + private void doRegularCopyOnce(FileSystem sourceFs, List<Path> srcList, + Path destination, UserGroupInformation proxyUser) throws IOException { /* even for regular copy we have to use the same user permissions that distCp will use since hive-server user might be different that the super user required to copy relevant files. 
*/ final Path[] paths = srcList.toArray(new Path[] {}); - if (usePrivilegedUser) { + if (proxyUser != null) { final Path finalDestination = destination; - UserGroupInformation proxyUser = UserGroupInformation.createProxyUser( - copyAsUser, UserGroupInformation.getLoginUser()); try { proxyUser.doAs((PrivilegedExceptionAction<Boolean>) () -> { FileUtil @@ -306,15 +350,21 @@ public class CopyUtils { public void doCopy(Path destination, List<Path> srcPaths) throws IOException, LoginException { Map<FileSystem, List<Path>> map = fsToPathMap(srcPaths); - FileSystem destinationFs = destination.getFileSystem(hiveConf); - - for (Map.Entry<FileSystem, List<Path>> entry : map.entrySet()) { - final FileSystem sourceFs = entry.getKey(); - List<ReplChangeManager.FileInfo> fileList = Lists.transform(entry.getValue(), - path -> new ReplChangeManager.FileInfo(sourceFs, path, null)); - doCopyOnce(sourceFs, entry.getValue(), - destinationFs, destination, - regularCopy(destinationFs, sourceFs, fileList)); + + UserGroupInformation proxyUser = getProxyUser(); + try { + for (Map.Entry<FileSystem, List<Path>> entry : map.entrySet()) { + final FileSystem sourceFs = entry.getKey(); + List<ReplChangeManager.FileInfo> fileList = Lists.transform(entry.getValue(), + path -> new ReplChangeManager.FileInfo(sourceFs, path, null)); + doCopyOnce(sourceFs, entry.getValue(), + destination, + regularCopy(sourceFs, fileList), proxyUser); + } + } finally { + if (proxyUser != null) { + FileSystem.closeAllForUGI(proxyUser); + } } } @@ -325,7 +375,7 @@ public class CopyUtils { 3. aggregate fileSize of all source Paths(can be directory / file) is less than configured size. 4. number of files of all source Paths(can be directory / file) is less than configured size. 
*/ - boolean regularCopy(FileSystem destinationFs, FileSystem sourceFs, List<ReplChangeManager.FileInfo> fileList) + boolean regularCopy(FileSystem sourceFs, List<ReplChangeManager.FileInfo> fileList) throws IOException { if (hiveInTest) { return true; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java index e8eaae6..fc5419c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java @@ -103,7 +103,7 @@ public class FileOperations { srcPaths.add(fileStatus.getPath()); } - new CopyUtils(distCpDoAsUser, hiveConf).doCopy(toPath, srcPaths); + new CopyUtils(distCpDoAsUser, hiveConf, toPath.getFileSystem(hiveConf)).doCopy(toPath, srcPaths); } private void copyMmPath() throws LoginException, IOException { @@ -135,7 +135,8 @@ public class FileOperations { } Utilities.FILE_OP_LOGGER.debug("Exporting originals from {} to {}", dirWithOriginals, exportRootDataDir); - new CopyUtils(distCpDoAsUser, hiveConf).doCopy(exportRootDataDir, srcPaths); + new CopyUtils(distCpDoAsUser, hiveConf, exportRootDataDir.getFileSystem(hiveConf)). 
+ doCopy(exportRootDataDir, srcPaths); } } } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/TestCopyUtils.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/TestCopyUtils.java index 7bd660b..610af09 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/TestCopyUtils.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/TestCopyUtils.java @@ -65,7 +65,7 @@ public class TestCopyUtils { HiveConf conf = Mockito.spy(new HiveConf()); doReturn(1L).when(conf).getLong(HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE.varname, 32L * 1024 * 1024); - CopyUtils copyUtils = new CopyUtils("", conf); + CopyUtils copyUtils = new CopyUtils("", conf, null); long MB_128 = 128 * 1024 * 1024; assertFalse(copyUtils.limitReachedForLocalCopy(MB_128, 1L)); } @@ -76,7 +76,7 @@ public class TestCopyUtils { when(UserGroupInformation.getCurrentUser()).thenReturn(mock(UserGroupInformation.class)); HiveConf conf = Mockito.spy(new HiveConf()); - CopyUtils copyUtils = new CopyUtils("", conf); + CopyUtils copyUtils = new CopyUtils("", conf, null); long MB_16 = 16 * 1024 * 1024; assertFalse(copyUtils.limitReachedForLocalCopy(MB_16, 100L)); } @@ -88,7 +88,7 @@ public class TestCopyUtils { FileSystem fs = mock(FileSystem.class); List<Path> srcPaths = Arrays.asList(source, source); HiveConf conf = mock(HiveConf.class); - CopyUtils copyUtils = Mockito.spy(new CopyUtils(null, conf)); + CopyUtils copyUtils = Mockito.spy(new CopyUtils(null, conf, fs)); mockStatic(FileUtils.class); mockStatic(Utils.class); @@ -99,7 +99,7 @@ public class TestCopyUtils { same(ShimLoader.getHadoopShims()))) .thenReturn(false); when(Utils.getUGI()).thenReturn(mock(UserGroupInformation.class)); - doReturn(false).when(copyUtils).regularCopy(same(fs), same(fs), anyListOf(ReplChangeManager.FileInfo.class)); + doReturn(false).when(copyUtils).regularCopy(same(fs), anyListOf(ReplChangeManager.FileInfo.class)); copyUtils.doCopy(destination, srcPaths); } diff --git 
a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java index e774419..9a1e590 100644 --- a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java +++ b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java @@ -1142,9 +1142,8 @@ public class Hadoop23Shims extends HadoopShimsSecure { } @Override - public boolean runDistCpAs(List<Path> srcPaths, Path dst, Configuration conf, String doAsUser) throws IOException { - UserGroupInformation proxyUser = UserGroupInformation.createProxyUser( - doAsUser, UserGroupInformation.getLoginUser()); + public boolean runDistCpAs(List<Path> srcPaths, Path dst, Configuration conf, + UserGroupInformation proxyUser) throws IOException { try { return proxyUser.doAs(new PrivilegedExceptionAction<Boolean>() { @Override diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java index c569b24..49a2ab3 100644 --- a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java +++ b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java @@ -500,10 +500,11 @@ public interface HadoopShims { * @param srcPaths List of Path to the source files or directories to copy * @param dst Path to the destination file or directory * @param conf The hadoop configuration object - * @param doAsUser The user to perform the distcp as + * @param proxyUser The user to perform the distcp as * @return True if it is successful; False otherwise. */ - public boolean runDistCpAs(List<Path> srcPaths, Path dst, Configuration conf, String doAsUser) throws IOException; + boolean runDistCpAs(List<Path> srcPaths, Path dst, Configuration conf, UserGroupInformation proxyUser) + throws IOException; /** * Copies a source dir/file to a destination by orchestrating the copy between hdfs nodes.