Repository: hive Updated Branches: refs/heads/master 2948c160f -> b1ca2a5e3
HIVE-16898 : Validation of source file after distcp in repl load (Daniel Dai, Sankar Hariappan, reviewed by Anishek Agarwal, Thejas Nair) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/b1ca2a5e Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/b1ca2a5e Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/b1ca2a5e Branch: refs/heads/master Commit: b1ca2a5e35895f83beb13344159f5ade2545fc8e Parents: 2948c16 Author: Thejas M Nair <the...@hortonworks.com> Authored: Sun Oct 1 23:16:40 2017 -0700 Committer: Thejas M Nair <the...@hortonworks.com> Committed: Sun Oct 1 23:16:40 2017 -0700 ---------------------------------------------------------------------- .../hadoop/hive/ql/exec/ReplCopyTask.java | 33 ++-- .../hadoop/hive/ql/parse/repl/CopyUtils.java | 161 ++++++++++++++++--- .../hive/metastore/ReplChangeManager.java | 73 +++++++-- 3 files changed, 219 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/b1ca2a5e/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java index 54746d3..6e722f7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java @@ -76,11 +76,10 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable { // be a CM uri in the from path.
if (ReplChangeManager.isCMFileUri(fromPath, srcFs)) { String[] result = ReplChangeManager.getFileWithChksumFromURI(fromPath.toString()); - Path sourcePath = ReplChangeManager - .getFileStatus(new Path(result[0]), result[1], conf) - .getPath(); + ReplChangeManager.FileInfo sourceInfo = ReplChangeManager + .getFileInfo(new Path(result[0]), result[1], conf); if (FileUtils.copy( - sourcePath.getFileSystem(conf), sourcePath, + sourceInfo.getSrcFs(), sourceInfo.getSourcePath(), dstFs, toPath, false, false, conf)) { return 0; } else { @@ -90,13 +89,13 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable { } } - List<Path> srcPaths = new ArrayList<>(); + List<ReplChangeManager.FileInfo> srcFiles = new ArrayList<>(); if (rwork.readSrcAsFilesList()) { // This flow is usually taken for REPL LOAD // Our input is the result of a _files listing, we should expand out _files. - srcPaths = filesInFileListing(srcFs, fromPath); - LOG.debug("ReplCopyTask _files contains:" + (srcPaths == null ? "null" : srcPaths.size())); - if ((srcPaths == null) || (srcPaths.isEmpty())) { + srcFiles = filesInFileListing(srcFs, fromPath); + LOG.debug("ReplCopyTask _files contains:" + (srcFiles == null ? 
"null" : srcFiles.size())); + if ((srcFiles == null) || (srcFiles.isEmpty())) { if (work.isErrorOnSrcEmpty()) { console.printError("No _files entry found on source: " + fromPath.toString()); return 5; @@ -120,17 +119,18 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable { for (FileStatus oneSrc : srcs) { console.printInfo("Copying file: " + oneSrc.getPath().toString()); LOG.debug("ReplCopyTask :cp:{}=>{}", oneSrc.getPath(), toPath); - srcPaths.add(oneSrc.getPath()); + srcFiles.add(new ReplChangeManager.FileInfo(oneSrc.getPath().getFileSystem(conf), + oneSrc.getPath())); } } - LOG.debug("ReplCopyTask numFiles: {}", srcPaths.size()); + LOG.debug("ReplCopyTask numFiles: {}", srcFiles.size()); if (!FileUtils.mkdir(dstFs, toPath, conf)) { console.printError("Cannot make target directory: " + toPath.toString()); return 2; } // Copy the files from different source file systems to one destination directory - new CopyUtils(rwork.distCpDoAsUser(), conf).doCopy(toPath, srcPaths); + new CopyUtils(rwork.distCpDoAsUser(), conf).copyAndVerify(toPath, srcFiles); return 0; } catch (Exception e) { @@ -140,7 +140,7 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable { } } - private List<Path> filesInFileListing(FileSystem fs, Path dataPath) + private List<ReplChangeManager.FileInfo> filesInFileListing(FileSystem fs, Path dataPath) throws IOException { Path fileListing = new Path(dataPath, EximUtil.FILES_NAME); LOG.debug("ReplCopyTask filesInFileListing() reading " + fileListing.toUri()); @@ -150,19 +150,18 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable { // On success, but with nothing to return, we can return an empty list. 
} - List<Path> filePaths = new ArrayList<>(); + List<ReplChangeManager.FileInfo> filePaths = new ArrayList<>(); BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(fileListing))); // TODO : verify if skipping charset here is okay String line = null; - while ( (line = br.readLine()) != null){ + while ((line = br.readLine()) != null) { LOG.debug("ReplCopyTask :_filesReadLine:" + line); String[] fileWithChksum = ReplChangeManager.getFileWithChksumFromURI(line); try { - Path f = ReplChangeManager - .getFileStatus(new Path(fileWithChksum[0]), fileWithChksum[1], conf) - .getPath(); + ReplChangeManager.FileInfo f = ReplChangeManager + .getFileInfo(new Path(fileWithChksum[0]), fileWithChksum[1], conf); filePaths.add(f); } catch (MetaException e) { // issue warning for missing file and throw exception http://git-wip-us.apache.org/repos/asf/hive/blob/b1ca2a5e/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java index a022b5d..71cdbde 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java @@ -24,12 +24,15 @@ import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.ReplChangeManager; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.hive.shims.Utils; import org.apache.hadoop.security.UserGroupInformation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.collect.Lists; + import javax.security.auth.login.LoginException; import java.io.IOException; import java.util.ArrayList; @@ -46,6 +49,7 @@ public class CopyUtils { private final long 
maxNumberOfFiles; private final boolean hiveInTest; private final String copyAsUser; + private final int MAX_COPY_RETRY = 3; public CopyUtils(String distCpDoAsUser, HiveConf hiveConf) { this.hiveConf = hiveConf; @@ -55,29 +59,121 @@ public class CopyUtils { this.copyAsUser = distCpDoAsUser; } - public void doCopy(Path destination, List<Path> srcPaths) throws IOException, LoginException { - Map<FileSystem, List<Path>> map = fsToFileMap(srcPaths); + // Used by replication, copy files from source to destination. It is possible source file is + // changed/removed during copy, so double check the checksum after copy, + // if not match, copy again from cm + public void copyAndVerify(Path destination, List<ReplChangeManager.FileInfo> srcFiles) + throws IOException, LoginException { + Map<FileSystem, List<ReplChangeManager.FileInfo>> map = fsToFileMap(srcFiles); FileSystem destinationFs = destination.getFileSystem(hiveConf); + for (Map.Entry<FileSystem, List<ReplChangeManager.FileInfo>> entry : map.entrySet()) { + FileSystem sourceFs = entry.getKey(); + List<ReplChangeManager.FileInfo> fileInfoList = entry.getValue(); + boolean useRegularCopy = regularCopy(destinationFs, sourceFs, fileInfoList); + + doCopyRetry(sourceFs, fileInfoList, destinationFs, destination, useRegularCopy); + + // Verify checksum, retry if checksum changed + List<ReplChangeManager.FileInfo> retryFileInfoList = new ArrayList<>(); + for (ReplChangeManager.FileInfo srcFile : srcFiles) { + if(!srcFile.isUseSourcePath()) { + // If already use cmpath, nothing we can do here, skip this file + continue; + } + String sourceChecksumString = srcFile.getCheckSum(); + if (sourceChecksumString != null) { + String verifySourceChecksumString; + try { + verifySourceChecksumString + = ReplChangeManager.checksumFor(srcFile.getSourcePath(), sourceFs); + } catch (IOException e) { + // Retry with CM path + verifySourceChecksumString = null; + } + if ((verifySourceChecksumString == null) + || 
!sourceChecksumString.equals(verifySourceChecksumString)) { + // If checksum does not match, likely the file is changed/removed, copy again from cm + srcFile.setIsUseSourcePath(false); + retryFileInfoList.add(srcFile); + } + } + } + if (!retryFileInfoList.isEmpty()) { + doCopyRetry(sourceFs, retryFileInfoList, destinationFs, destination, useRegularCopy); + } + } + } + + private void doCopyRetry(FileSystem sourceFs, List<ReplChangeManager.FileInfo> fileList, + FileSystem destinationFs, Path destination, + boolean useRegularCopy) throws IOException, LoginException { + int repeat = 0; + List<Path> pathList = Lists.transform(fileList, + fileInfo -> { return fileInfo.getEffectivePath(); }); + while (!pathList.isEmpty() && (repeat < MAX_COPY_RETRY)) { + try { + doCopyOnce(sourceFs, pathList, destinationFs, destination, useRegularCopy); + return; + } catch (IOException e) { + pathList = new ArrayList<>(); + + // Going through file list, retry with CM if applicable + for (ReplChangeManager.FileInfo file : fileList) { + Path copyPath = file.getEffectivePath(); + if (!destinationFs.exists(new Path(destination, copyPath.getName()))) { + if (!sourceFs.exists(copyPath)) { + if (file.isUseSourcePath()) { + // Source file missing, then try with CM path + file.setIsUseSourcePath(false); + } else { + // CM path itself is missing, so, cannot recover from this error + throw e; + } + } + pathList.add(file.getEffectivePath()); + } + } + } + repeat++; + } + } + + // Copy without retry + private void doCopyOnce(FileSystem sourceFs, List<Path> srcList, + FileSystem destinationFs, Path destination, + boolean useRegularCopy) throws IOException, LoginException { UserGroupInformation ugi = Utils.getUGI(); String currentUser = ugi.getShortUserName(); boolean usePrivilegedDistCp = copyAsUser != null && !currentUser.equals(copyAsUser); + if (useRegularCopy) { + Path[] paths = srcList.toArray(new Path[] {}); + FileUtil.copy(sourceFs, paths, destinationFs, destination, false, true, hiveConf); + } 
else { + FileUtils.distCp( + sourceFs, // source file system + srcList, // list of source paths + destination, + false, + usePrivilegedDistCp ? copyAsUser : null, + hiveConf, + ShimLoader.getHadoopShims() + ); + } + } + + public void doCopy(Path destination, List<Path> srcPaths) throws IOException, LoginException { + Map<FileSystem, List<Path>> map = fsToPathMap(srcPaths); + FileSystem destinationFs = destination.getFileSystem(hiveConf); + for (Map.Entry<FileSystem, List<Path>> entry : map.entrySet()) { - if (regularCopy(destinationFs, entry)) { - Path[] paths = entry.getValue().toArray(new Path[] {}); - FileUtil.copy(entry.getKey(), paths, destinationFs, destination, false, true, hiveConf); - } else { - FileUtils.distCp( - entry.getKey(), // source file system - entry.getValue(), // list of source paths - destination, - false, - usePrivilegedDistCp ? copyAsUser : null, - hiveConf, - ShimLoader.getHadoopShims() - ); - } + final FileSystem sourceFs = entry.getKey(); + List<ReplChangeManager.FileInfo> fileList = Lists.transform(entry.getValue(), + path -> { return new ReplChangeManager.FileInfo(sourceFs, path);}); + doCopyOnce(sourceFs, entry.getValue(), + destinationFs, destination, + regularCopy(destinationFs, sourceFs, fileList)); } } @@ -88,12 +184,11 @@ public class CopyUtils { 3. aggregate fileSize of all source Paths(can be directory / file) is less than configured size. 4. number of files of all source Paths(can be directory / file) is less than configured size. 
*/ - private boolean regularCopy(FileSystem destinationFs, Map.Entry<FileSystem, List<Path>> entry) + private boolean regularCopy(FileSystem destinationFs, FileSystem sourceFs, List<ReplChangeManager.FileInfo> fileList) throws IOException { if (hiveInTest) { return true; } - FileSystem sourceFs = entry.getKey(); if (isLocal(sourceFs) || isLocal(destinationFs)) { return true; } @@ -104,8 +199,17 @@ public class CopyUtils { long size = 0; long numberOfFiles = 0; - for (Path path : entry.getValue()) { - ContentSummary contentSummary = sourceFs.getContentSummary(path); + for (ReplChangeManager.FileInfo fileInfo : fileList) { + ContentSummary contentSummary = null; + try { + contentSummary = sourceFs.getContentSummary(fileInfo.getEffectivePath()); + } catch (IOException e) { + // in replication, if source file does not exist, try cmroot + if (fileInfo.isUseSourcePath() && fileInfo.getCmPath() != null) { + contentSummary = sourceFs.getContentSummary(fileInfo.getCmPath()); + fileInfo.setIsUseSourcePath(false); + } + } size += contentSummary.getLength(); numberOfFiles += contentSummary.getFileCount(); if (limitReachedForLocalCopy(size, numberOfFiles)) { @@ -129,15 +233,28 @@ public class CopyUtils { return fs.getScheme().equals("file"); } - private Map<FileSystem, List<Path>> fsToFileMap(List<Path> srcPaths) throws IOException { + private Map<FileSystem, List<Path>> fsToPathMap(List<Path> srcPaths) throws IOException { Map<FileSystem, List<Path>> result = new HashMap<>(); for (Path path : srcPaths) { FileSystem fileSystem = path.getFileSystem(hiveConf); if (!result.containsKey(fileSystem)) { - result.put(fileSystem, new ArrayList<>()); + result.put(fileSystem, new ArrayList<Path>()); } result.get(fileSystem).add(path); } return result; } + + private Map<FileSystem, List<ReplChangeManager.FileInfo>> fsToFileMap( + List<ReplChangeManager.FileInfo> srcFiles) throws IOException { + Map<FileSystem, List<ReplChangeManager.FileInfo>> result = new HashMap<>(); + for 
(ReplChangeManager.FileInfo file : srcFiles) { + FileSystem fileSystem = file.getSrcFs(); + if (!result.containsKey(fileSystem)) { + result.put(fileSystem, new ArrayList<ReplChangeManager.FileInfo>()); + } + result.get(fileSystem).add(file); + } + return result; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/b1ca2a5e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java index dd9296a..95fa0a9 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java @@ -63,6 +63,54 @@ public class ReplChangeManager { COPY } + public static class FileInfo { + FileSystem srcFs; + Path sourcePath; + Path cmPath; + String checkSum; + boolean useSourcePath; + + public FileInfo(FileSystem srcFs, Path sourcePath) { + this.srcFs = srcFs; + this.sourcePath = sourcePath; + this.cmPath = null; + this.checkSum = null; + this.useSourcePath = true; + } + public FileInfo(FileSystem srcFs, Path sourcePath, Path cmPath, String checkSum, boolean useSourcePath) { + this.srcFs = srcFs; + this.sourcePath = sourcePath; + this.cmPath = cmPath; + this.checkSum = checkSum; + this.useSourcePath = useSourcePath; + } + public FileSystem getSrcFs() { + return srcFs; + } + public Path getSourcePath() { + return sourcePath; + } + public Path getCmPath() { + return cmPath; + } + public String getCheckSum() { + return checkSum; + } + public boolean isUseSourcePath() { + return useSourcePath; + } + public void setIsUseSourcePath(boolean useSourcePath) { + this.useSourcePath = useSourcePath; + } + public Path getEffectivePath() { + if (useSourcePath) { + return 
sourcePath; + } else { + return cmPath; + } + } + } + public static ReplChangeManager getInstance(Configuration conf) throws MetaException { if (instance == null) { instance = new ReplChangeManager(conf); @@ -259,25 +307,32 @@ public class ReplChangeManager { * @param src Original file location * @param checksumString Checksum of the original file * @param conf - * @return Corresponding FileStatus object + * @return Corresponding FileInfo object */ - static public FileStatus getFileStatus(Path src, String checksumString, - Configuration conf) throws MetaException { + public static FileInfo getFileInfo(Path src, String checksumString, Configuration conf) + throws MetaException { try { FileSystem srcFs = src.getFileSystem(conf); if (checksumString == null) { - return srcFs.getFileStatus(src); + return new FileInfo(srcFs, src); } + Path cmPath = getCMPath(conf, src.getName(), checksumString); if (!srcFs.exists(src)) { - return srcFs.getFileStatus(getCMPath(conf, src.getName(), checksumString)); + return new FileInfo(srcFs, src, cmPath, checksumString, false); } - String currentChecksumString = checksumFor(src, srcFs); - if (currentChecksumString == null || checksumString.equals(currentChecksumString)) { - return srcFs.getFileStatus(src); + String currentChecksumString; + try { + currentChecksumString = checksumFor(src, srcFs); + } catch (IOException ex) { + // If the file is missing or getting modified, then refer CM path + return new FileInfo(srcFs, src, cmPath, checksumString, false); + } + if ((currentChecksumString == null) || checksumString.equals(currentChecksumString)) { + return new FileInfo(srcFs, src, cmPath, checksumString, true); } else { - return srcFs.getFileStatus(getCMPath(conf, src.getName(), checksumString)); + return new FileInfo(srcFs, src, cmPath, checksumString, false); } } catch (IOException e) { throw new MetaException(StringUtils.stringifyException(e));