HIVE-19248: REPL LOAD couldn't copy file from source CM path and also doesn't throw error if file copy fails (Sankar Hariappan, reviewed by Mahesh Kumar Behera, Thejas M Nair)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/a8fc0e67 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/a8fc0e67 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/a8fc0e67 Branch: refs/heads/branch-3 Commit: a8fc0e67183ed063bb2b3aee69a290ac734b3f51 Parents: 3102737 Author: Sankar Hariappan <sank...@apache.org> Authored: Thu May 10 00:04:52 2018 +0530 Committer: Sankar Hariappan <sank...@apache.org> Committed: Thu May 10 00:04:52 2018 +0530 ---------------------------------------------------------------------- .../listener/DbNotificationListener.java | 19 +-- .../hive/metastore/TestReplChangeManager.java | 54 +++--- .../hadoop/hive/ql/exec/ReplCopyTask.java | 12 +- .../hadoop/hive/ql/parse/repl/CopyUtils.java | 168 ++++++++++++------- .../hive/ql/parse/repl/load/DumpMetaData.java | 1 - .../load/message/CreateFunctionHandler.java | 4 +- .../apache/hadoop/hive/shims/Hadoop23Shims.java | 3 +- .../hadoop/hive/shims/TestHadoop23Shims.java | 24 +-- .../hive/metastore/ReplChangeManager.java | 94 ++++++----- .../hadoop/hive/metastore/utils/FileUtils.java | 64 ++++++- .../hadoop/hive/metastore/utils/HdfsUtils.java | 3 +- .../hive/metastore/utils/StringUtils.java | 24 ++- 12 files changed, 302 insertions(+), 168 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/a8fc0e67/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java ---------------------------------------------------------------------- diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java index 7835691..6321f9b 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java +++ 
b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java @@ -426,7 +426,14 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener @Override public String next() { - String result = encodeFileUri(files.get(i), chksums != null? chksums.get(i) : null); + String result; + try { + result = ReplChangeManager.encodeFileUri(files.get(i), chksums != null ? chksums.get(i) : null, null); + } catch (IOException e) { + // File operations failed + LOG.error("Encoding file URI failed with error " + e.getMessage()); + throw new RuntimeException(e.getMessage()); + } i++; return result; } @@ -788,14 +795,4 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener } } - - // TODO: this needs to be enhanced once change management based filesystem is implemented - // Currently using fileuri#checksum as the format - private String encodeFileUri(String fileUriStr, String fileChecksum) { - if (fileChecksum != null) { - return fileUriStr + "#" + fileChecksum; - } else { - return fileUriStr; - } - } } http://git-wip-us.apache.org/repos/asf/hive/blob/a8fc0e67/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java index 6ade76d..e63250c 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java @@ -175,19 +175,19 @@ public class TestReplChangeManager { // verify cm.recycle(db, table, part) api moves file to cmroot dir int ret = cm.recycle(part1Path, RecycleType.MOVE, false); Assert.assertEquals(ret, 1); - Path cmPart1Path = 
ReplChangeManager.getCMPath(hiveConf, part1Path.getName(), path1Chksum); + Path cmPart1Path = ReplChangeManager.getCMPath(hiveConf, part1Path.getName(), path1Chksum, cmroot.toString()); assertTrue(cmPart1Path.getFileSystem(hiveConf).exists(cmPart1Path)); // Verify dropPartition recycle part files client.dropPartition(dbName, tblName, Arrays.asList("20160102")); assertFalse(part2Path.getFileSystem(hiveConf).exists(part2Path)); - Path cmPart2Path = ReplChangeManager.getCMPath(hiveConf, part2Path.getName(), path2Chksum); + Path cmPart2Path = ReplChangeManager.getCMPath(hiveConf, part2Path.getName(), path2Chksum, cmroot.toString()); assertTrue(cmPart2Path.getFileSystem(hiveConf).exists(cmPart2Path)); // Verify dropTable recycle partition files client.dropTable(dbName, tblName); assertFalse(part3Path.getFileSystem(hiveConf).exists(part3Path)); - Path cmPart3Path = ReplChangeManager.getCMPath(hiveConf, part3Path.getName(), path3Chksum); + Path cmPart3Path = ReplChangeManager.getCMPath(hiveConf, part3Path.getName(), path3Chksum, cmroot.toString()); assertTrue(cmPart3Path.getFileSystem(hiveConf).exists(cmPart3Path)); client.dropDatabase(dbName, true, true); @@ -246,17 +246,17 @@ public class TestReplChangeManager { cm.recycle(filePath1, RecycleType.MOVE, false); assertFalse(filePath1.getFileSystem(hiveConf).exists(filePath1)); - Path cmPath1 = ReplChangeManager.getCMPath(hiveConf, filePath1.getName(), fileChksum1); + Path cmPath1 = ReplChangeManager.getCMPath(hiveConf, filePath1.getName(), fileChksum1, cmroot.toString()); assertTrue(cmPath1.getFileSystem(hiveConf).exists(cmPath1)); // Verify dropTable recycle table files client.dropTable(dbName, tblName); - Path cmPath2 = ReplChangeManager.getCMPath(hiveConf, filePath2.getName(), fileChksum2); + Path cmPath2 = ReplChangeManager.getCMPath(hiveConf, filePath2.getName(), fileChksum2,cmroot.toString()); assertFalse(filePath2.getFileSystem(hiveConf).exists(filePath2)); 
assertTrue(cmPath2.getFileSystem(hiveConf).exists(cmPath2)); - Path cmPath3 = ReplChangeManager.getCMPath(hiveConf, filePath3.getName(), fileChksum3); + Path cmPath3 = ReplChangeManager.getCMPath(hiveConf, filePath3.getName(), fileChksum3, cmroot.toString()); assertFalse(filePath3.getFileSystem(hiveConf).exists(filePath3)); assertTrue(cmPath3.getFileSystem(hiveConf).exists(cmPath3)); @@ -298,17 +298,21 @@ public class TestReplChangeManager { ReplChangeManager.getInstance(hiveConf).recycle(dirTbl2, RecycleType.MOVE, false); ReplChangeManager.getInstance(hiveConf).recycle(dirTbl3, RecycleType.MOVE, true); - assertTrue(fs.exists(ReplChangeManager.getCMPath(hiveConf, part11.getName(), fileChksum11))); - assertTrue(fs.exists(ReplChangeManager.getCMPath(hiveConf, part12.getName(), fileChksum12))); - assertTrue(fs.exists(ReplChangeManager.getCMPath(hiveConf, part21.getName(), fileChksum21))); - assertTrue(fs.exists(ReplChangeManager.getCMPath(hiveConf, part22.getName(), fileChksum22))); - assertTrue(fs.exists(ReplChangeManager.getCMPath(hiveConf, part31.getName(), fileChksum31))); - assertTrue(fs.exists(ReplChangeManager.getCMPath(hiveConf, part32.getName(), fileChksum32))); - - fs.setTimes(ReplChangeManager.getCMPath(hiveConf, part11.getName(), fileChksum11), now - 86400*1000*2, now - 86400*1000*2); - fs.setTimes(ReplChangeManager.getCMPath(hiveConf, part21.getName(), fileChksum21), now - 86400*1000*2, now - 86400*1000*2); - fs.setTimes(ReplChangeManager.getCMPath(hiveConf, part31.getName(), fileChksum31), now - 86400*1000*2, now - 86400*1000*2); - fs.setTimes(ReplChangeManager.getCMPath(hiveConf, part32.getName(), fileChksum32), now - 86400*1000*2, now - 86400*1000*2); + assertTrue(fs.exists(ReplChangeManager.getCMPath(hiveConf, part11.getName(), fileChksum11, cmroot.toString()))); + assertTrue(fs.exists(ReplChangeManager.getCMPath(hiveConf, part12.getName(), fileChksum12, cmroot.toString()))); + assertTrue(fs.exists(ReplChangeManager.getCMPath(hiveConf, 
part21.getName(), fileChksum21, cmroot.toString()))); + assertTrue(fs.exists(ReplChangeManager.getCMPath(hiveConf, part22.getName(), fileChksum22, cmroot.toString()))); + assertTrue(fs.exists(ReplChangeManager.getCMPath(hiveConf, part31.getName(), fileChksum31, cmroot.toString()))); + assertTrue(fs.exists(ReplChangeManager.getCMPath(hiveConf, part32.getName(), fileChksum32, cmroot.toString()))); + + fs.setTimes(ReplChangeManager.getCMPath(hiveConf, part11.getName(), fileChksum11, cmroot.toString()), + now - 86400*1000*2, now - 86400*1000*2); + fs.setTimes(ReplChangeManager.getCMPath(hiveConf, part21.getName(), fileChksum21, cmroot.toString()), + now - 86400*1000*2, now - 86400*1000*2); + fs.setTimes(ReplChangeManager.getCMPath(hiveConf, part31.getName(), fileChksum31, cmroot.toString()), + now - 86400*1000*2, now - 86400*1000*2); + fs.setTimes(ReplChangeManager.getCMPath(hiveConf, part32.getName(), fileChksum32, cmroot.toString()), + now - 86400*1000*2, now - 86400*1000*2); ReplChangeManager.scheduleCMClearer(hiveConf); @@ -321,12 +325,12 @@ public class TestReplChangeManager { if (end - start > 5000) { Assert.fail("timeout, cmroot has not been cleared"); } - if (!fs.exists(ReplChangeManager.getCMPath(hiveConf, part11.getName(), fileChksum11)) && - fs.exists(ReplChangeManager.getCMPath(hiveConf, part12.getName(), fileChksum12)) && - !fs.exists(ReplChangeManager.getCMPath(hiveConf, part21.getName(), fileChksum21)) && - fs.exists(ReplChangeManager.getCMPath(hiveConf, part22.getName(), fileChksum22)) && - !fs.exists(ReplChangeManager.getCMPath(hiveConf, part31.getName(), fileChksum31)) && - !fs.exists(ReplChangeManager.getCMPath(hiveConf, part32.getName(), fileChksum32))) { + if (!fs.exists(ReplChangeManager.getCMPath(hiveConf, part11.getName(), fileChksum11, cmroot.toString())) && + fs.exists(ReplChangeManager.getCMPath(hiveConf, part12.getName(), fileChksum12, cmroot.toString())) && + !fs.exists(ReplChangeManager.getCMPath(hiveConf, part21.getName(), fileChksum21, 
cmroot.toString())) && + fs.exists(ReplChangeManager.getCMPath(hiveConf, part22.getName(), fileChksum22, cmroot.toString())) && + !fs.exists(ReplChangeManager.getCMPath(hiveConf, part31.getName(), fileChksum31, cmroot.toString())) && + !fs.exists(ReplChangeManager.getCMPath(hiveConf, part32.getName(), fileChksum32, cmroot.toString()))) { cleared = true; } } while (!cleared); @@ -335,8 +339,8 @@ public class TestReplChangeManager { @Test public void shouldIdentifyCMURIs() { assertTrue(ReplChangeManager - .isCMFileUri(new Path("hdfs://localhost:90000/somepath/adir/", "ab.jar#e239s2233"), fs)); + .isCMFileUri(new Path("hdfs://localhost:90000/somepath/adir/", "ab.jar#e239s2233"))); assertFalse(ReplChangeManager - .isCMFileUri(new Path("/somepath/adir/", "ab.jar"), fs)); + .isCMFileUri(new Path("/somepath/adir/", "ab.jar"))); } } http://git-wip-us.apache.org/repos/asf/hive/blob/a8fc0e67/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java index 4f38a5f..5fecf81 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java @@ -80,10 +80,10 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable { // This should only be true for copy tasks created from functions, otherwise there should never // be a CM uri in the from path. 
- if (ReplChangeManager.isCMFileUri(fromPath, srcFs)) { - String[] result = ReplChangeManager.getFileWithChksumFromURI(fromPath.toString()); + if (ReplChangeManager.isCMFileUri(fromPath)) { + String[] result = ReplChangeManager.decodeFileUri(fromPath.toString()); ReplChangeManager.FileInfo sourceInfo = ReplChangeManager - .getFileInfo(new Path(result[0]), result[1], result[2], conf); + .getFileInfo(new Path(result[0]), result[1], result[2], result[3], conf); if (FileUtils.copy( sourceInfo.getSrcFs(), sourceInfo.getSourcePath(), dstFs, toPath, false, false, conf)) { @@ -187,14 +187,14 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable { while ((line = br.readLine()) != null) { LOG.debug("ReplCopyTask :_filesReadLine: {}", line); - String[] fileWithChksum = ReplChangeManager.getFileWithChksumFromURI(line); + String[] fragments = ReplChangeManager.decodeFileUri(line); try { ReplChangeManager.FileInfo f = ReplChangeManager - .getFileInfo(new Path(fileWithChksum[0]), fileWithChksum[1], fileWithChksum[2], conf); + .getFileInfo(new Path(fragments[0]), fragments[1], fragments[2], fragments[3], conf); filePaths.add(f); } catch (MetaException e) { // issue warning for missing file and throw exception - LOG.warn("Cannot find {} in source repo or cmroot", fileWithChksum[0]); + LOG.warn("Cannot find {} in source repo or cmroot", fragments[0]); throw new IOException(e.getMessage()); } // Note - we need srcFs rather than fs, because it is possible that the _files lists files http://git-wip-us.apache.org/repos/asf/hive/blob/a8fc0e67/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java index 529ea21..2557121 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java +++ 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java @@ -47,7 +47,7 @@ public class CopyUtils { private static final Logger LOG = LoggerFactory.getLogger(CopyUtils.class); // https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/TransparentEncryption.html#Running_as_the_superuser private static final String RAW_RESERVED_VIRTUAL_PATH = "/.reserved/raw/"; - private static final int MAX_COPY_RETRY = 3; + private static final int MAX_COPY_RETRY = 5; private final HiveConf hiveConf; private final long maxCopyFileSize; @@ -82,71 +82,126 @@ public class CopyUtils { LOG.error("Failed to create destination directory: " + destination); throw new IOException("Destination directory creation failed"); } - doCopyRetry(sourceFs, fileInfoList, destinationFs, destination, useRegularCopy); - // Verify checksum, retry if checksum changed - List<ReplChangeManager.FileInfo> retryFileInfoList = new ArrayList<>(); - for (ReplChangeManager.FileInfo srcFile : srcFiles) { - if (!srcFile.isUseSourcePath()) { - // If already use cmpath, nothing we can do here, skip this file - continue; - } - String sourceChecksumString = srcFile.getCheckSum(); - if (sourceChecksumString != null) { - String verifySourceChecksumString; - try { - verifySourceChecksumString - = ReplChangeManager.checksumFor(srcFile.getSourcePath(), sourceFs); - } catch (IOException e) { - // Retry with CM path - verifySourceChecksumString = null; - } - if ((verifySourceChecksumString == null) - || !sourceChecksumString.equals(verifySourceChecksumString)) { - // If checksum does not match, likely the file is changed/removed, copy again from cm - srcFile.setIsUseSourcePath(false); - retryFileInfoList.add(srcFile); - } - } - } - if (!retryFileInfoList.isEmpty()) { - doCopyRetry(sourceFs, retryFileInfoList, destinationFs, destination, useRegularCopy); - } + // Copy files with retry logic on failure or source file is dropped or changed. 
+ doCopyRetry(sourceFs, fileInfoList, destinationFs, destination, useRegularCopy); } } } - private void doCopyRetry(FileSystem sourceFs, List<ReplChangeManager.FileInfo> fileList, + private void doCopyRetry(FileSystem sourceFs, List<ReplChangeManager.FileInfo> srcFileList, FileSystem destinationFs, Path destination, boolean useRegularCopy) throws IOException, LoginException { int repeat = 0; - List<Path> pathList = Lists.transform(fileList, ReplChangeManager.FileInfo::getEffectivePath); + boolean isCopyError = false; + List<Path> pathList = Lists.transform(srcFileList, ReplChangeManager.FileInfo::getEffectivePath); while (!pathList.isEmpty() && (repeat < MAX_COPY_RETRY)) { + LOG.info("Attempt: " + (repeat+1) + ". Copying files: " + pathList); try { + isCopyError = false; doCopyOnce(sourceFs, pathList, destinationFs, destination, useRegularCopy); - return; } catch (IOException e) { - pathList = new ArrayList<>(); + // If copy fails, fall through the retry logic + isCopyError = true; + } + pathList = getFilesToRetry(sourceFs, srcFileList, destinationFs, destination, isCopyError); + repeat++; + } + + // If still files remains to be copied due to failure/checksum mismatch after several attempts, then throw error + if (!pathList.isEmpty()) { + LOG.error("File copy failed even after several attempts. Files list: " + srcFileList); + throw new IOException("File copy failed even after several attempts."); + } + } + + // Traverse through all the source files and see if any file is not copied or partially copied. + // If yes, then add to the retry list. If source file missing, then retry with CM path. if CM path + // itself is missing, then throw error. 
+ private List<Path> getFilesToRetry(FileSystem sourceFs, List<ReplChangeManager.FileInfo> srcFileList, + FileSystem destinationFs, Path destination, boolean isCopyError) + throws IOException { + List<Path> pathList = new ArrayList<Path>(); + + // Going through file list and make the retry list + for (ReplChangeManager.FileInfo srcFile : srcFileList) { + if (srcFile.isCopyDone()) { + // If already copied successfully, ignore it. + continue; + } + Path srcPath = srcFile.getEffectivePath(); + Path destPath = new Path(destination, srcPath.getName()); + if (destinationFs.exists(destPath)) { + // If destination file is present and checksum of source mismatch, then retry copy. + if (isSourceFileMismatch(sourceFs, srcFile)) { + // Delete the incorrectly copied file and retry with CM path + destinationFs.delete(destPath, true); + srcFile.setIsUseSourcePath(false); + } else { + // If the retry logic is reached after copy error, then include the copied file as well. + // This is needed as we cannot figure out which file is incorrectly copied. + // Expecting distcp to skip the properly copied file based on CRC check or copy it if CRC mismatch. - // Going through file list, retry with CM if applicable - for (ReplChangeManager.FileInfo file : fileList) { - Path copyPath = file.getEffectivePath(); - if (!destinationFs.exists(new Path(destination, copyPath.getName()))) { - if (!sourceFs.exists(copyPath)) { - if (file.isUseSourcePath()) { - // Source file missing, then try with CM path - file.setIsUseSourcePath(false); - } else { - // CM path itself is missing, so, cannot recover from this error - throw e; - } - } - pathList.add(file.getEffectivePath()); + if (!isCopyError) { + // File is successfully copied, just skip this file from retry. 
+ srcFile.setCopyDone(true); + continue; + } + } + } else { + // If destination file is missing, then retry copy + if (sourceFs.exists(srcPath)) { + // If checksum does not match, likely the file is changed/removed, retry from CM path + if (isSourceFileMismatch(sourceFs, srcFile)) { + srcFile.setIsUseSourcePath(false); + } + } else { + if (srcFile.isUseSourcePath()) { + // Source file missing, then try with CM path + srcFile.setIsUseSourcePath(false); + } else { + // CM path itself is missing, cannot recover from this error + LOG.error("File Copy Failed. Both source and CM files are missing from source. " + + "Missing Source File: " + srcFile.getSourcePath() + ", CM File: " + srcFile.getCmPath() + ". " + + "Try setting higher value for hive.repl.cm.retain in source warehouse. " + + "Also, bootstrap the system again to get back the consistent replicated state."); + throw new IOException("Both source and CM path are missing from source."); } } } - repeat++; + srcPath = srcFile.getEffectivePath(); + if (null == srcPath) { + // This case possible if CM path is not enabled. + LOG.error("File copy failed and likely source file is deleted or modified. 
" + + "Source File: " + srcFile.getSourcePath()); + throw new IOException("File copy failed and likely source file is deleted or modified."); + } + pathList.add(srcPath); } + return pathList; + } + + // Check if the source file unmodified even after copy to see if we copied the right file + private boolean isSourceFileMismatch(FileSystem sourceFs, ReplChangeManager.FileInfo srcFile) { + // If source is already CM path, the checksum will be always matching + if (srcFile.isUseSourcePath()) { + String sourceChecksumString = srcFile.getCheckSum(); + if (sourceChecksumString != null) { + String verifySourceChecksumString; + try { + verifySourceChecksumString + = ReplChangeManager.checksumFor(srcFile.getSourcePath(), sourceFs); + } catch (IOException e) { + // Retry with CM path + LOG.debug("Unable to calculate checksum for source file: " + srcFile.getSourcePath()); + return true; + } + if (!sourceChecksumString.equals(verifySourceChecksumString)) { + return true; + } + } + } + return false; } // Copy without retry @@ -175,7 +230,6 @@ public class CopyUtils { URI destinationUri = destination.toUri(); destination = new Path(destinationUri.getScheme(), destinationUri.getAuthority(), RAW_RESERVED_VIRTUAL_PATH + destinationUri.getPath()); - hiveConf.set("distcp.options.px",""); } FileUtils.distCp( @@ -255,16 +309,18 @@ public class CopyUtils { try { contentSummary = sourceFs.getContentSummary(fileInfo.getEffectivePath()); } catch (IOException e) { - // in replication, if source file does not exist, try cmroot + // In replication, if source file does not exist, try cmroot if (fileInfo.isUseSourcePath() && fileInfo.getCmPath() != null) { contentSummary = sourceFs.getContentSummary(fileInfo.getCmPath()); fileInfo.setIsUseSourcePath(false); } } - size += contentSummary.getLength(); - numberOfFiles += contentSummary.getFileCount(); - if (limitReachedForLocalCopy(size, numberOfFiles)) { - return false; + if (contentSummary != null) { + size += contentSummary.getLength(); + 
numberOfFiles += contentSummary.getFileCount(); + if (limitReachedForLocalCopy(size, numberOfFiles)) { + return false; + } } } return true; @@ -289,7 +345,7 @@ public class CopyUtils { for (Path path : srcPaths) { FileSystem fileSystem = path.getFileSystem(hiveConf); if (!result.containsKey(fileSystem)) { - result.put(fileSystem, new ArrayList<Path>()); + result.put(fileSystem, new ArrayList<>()); } result.get(fileSystem).add(path); } http://git-wip-us.apache.org/repos/asf/hive/blob/a8fc0e67/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java index 9dfd7cd..974e105 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java @@ -78,7 +78,6 @@ public class DumpMetaData { Long.valueOf(lineContents[2]), new Path(lineContents[3])); setPayload(lineContents[4].equals(Utilities.nullStringOutput) ? 
null : lineContents[4]); - ReplChangeManager.setCmRoot(cmRoot); } else { throw new IOException( "Unable to read valid values from dumpFile:" + dumpFile.toUri().toString()); http://git-wip-us.apache.org/repos/asf/hive/blob/a8fc0e67/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java index a3331b4..3a32885 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java @@ -181,14 +181,14 @@ public class CreateFunctionHandler extends AbstractMessageHandler { ResourceUri destinationResourceUri(ResourceUri resourceUri) throws IOException, SemanticException { String sourceUri = resourceUri.getUri(); - String[] split = sourceUri.split(Path.SEPARATOR); + String[] split = ReplChangeManager.decodeFileUri(sourceUri)[0].split(Path.SEPARATOR); PathBuilder pathBuilder = new PathBuilder(functionsRootDir); Path qualifiedDestinationPath = PathBuilder.fullyQualifiedHDFSUri( pathBuilder .addDescendant(destinationDbName.toLowerCase()) .addDescendant(metadata.function.getFunctionName().toLowerCase()) .addDescendant(String.valueOf(System.nanoTime())) - .addDescendant(ReplChangeManager.getFileWithChksumFromURI(split[split.length - 1])[0]) + .addDescendant(split[split.length - 1]) .build(), new Path(functionsRootDir).getFileSystem(context.hiveConf) ); http://git-wip-us.apache.org/repos/asf/hive/blob/a8fc0e67/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java ---------------------------------------------------------------------- diff --git a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java 
b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java index 285d616..ec06a88 100644 --- a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java +++ b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java @@ -1100,8 +1100,7 @@ public class Hadoop23Shims extends HadoopShimsSecure { if (params.size() == 0){ // if no entries were added via conf, we initiate our defaults params.add("-update"); - params.add("-skipcrccheck"); - params.add("-pb"); + params.add("-pbx"); } for (Path src : srcPaths) { params.add(src.toString()); http://git-wip-us.apache.org/repos/asf/hive/blob/a8fc0e67/shims/0.23/src/main/test/org/apache/hadoop/hive/shims/TestHadoop23Shims.java ---------------------------------------------------------------------- diff --git a/shims/0.23/src/main/test/org/apache/hadoop/hive/shims/TestHadoop23Shims.java b/shims/0.23/src/main/test/org/apache/hadoop/hive/shims/TestHadoop23Shims.java index 108289c..9a9311b 100644 --- a/shims/0.23/src/main/test/org/apache/hadoop/hive/shims/TestHadoop23Shims.java +++ b/shims/0.23/src/main/test/org/apache/hadoop/hive/shims/TestHadoop23Shims.java @@ -20,23 +20,14 @@ package org.apache.hadoop.hive.shims; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.tools.DistCpOptions; import org.junit.Test; -import java.io.IOException; import java.util.Collections; import java.util.List; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.when; + public class TestHadoop23Shims { @@ -49,12 +40,11 @@ public class TestHadoop23Shims { Hadoop23Shims shims = new 
Hadoop23Shims(); List<String> paramsDefault = shims.constructDistCpParams(Collections.singletonList(copySrc), copyDst, conf); - assertEquals(5, paramsDefault.size()); + assertEquals(4, paramsDefault.size()); assertTrue("Distcp -update set by default", paramsDefault.contains("-update")); - assertTrue("Distcp -skipcrccheck set by default", paramsDefault.contains("-skipcrccheck")); - assertTrue("Distcp -pb set by default", paramsDefault.contains("-pb")); - assertEquals(copySrc.toString(), paramsDefault.get(3)); - assertEquals(copyDst.toString(), paramsDefault.get(4)); + assertTrue("Distcp -pbx set by default", paramsDefault.contains("-pbx")); + assertEquals(copySrc.toString(), paramsDefault.get(2)); + assertEquals(copyDst.toString(), paramsDefault.get(3)); conf.set("distcp.options.foo", "bar"); // should set "-foo bar" conf.set("distcp.options.blah", ""); // should set "-blah" @@ -69,8 +59,8 @@ public class TestHadoop23Shims { !paramsWithCustomParamInjection.contains("-update")); assertTrue("Distcp -skipcrccheck not set if not requested", !paramsWithCustomParamInjection.contains("-skipcrccheck")); - assertTrue("Distcp -pb not set if not requested", - !paramsWithCustomParamInjection.contains("-pb")); + assertTrue("Distcp -pbx not set if not requested", + !paramsWithCustomParamInjection.contains("-pbx")); // the "-foo bar" and "-blah" params order is not guaranteed String firstParam = paramsWithCustomParamInjection.get(0); http://git-wip-us.apache.org/repos/asf/hive/blob/a8fc0e67/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java index abe1226..79ba7ff 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java +++ 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java @@ -37,8 +37,8 @@ import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; import org.apache.hadoop.hive.metastore.utils.FileUtils; +import org.apache.hadoop.hive.metastore.utils.StringUtils; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.util.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,7 +52,6 @@ public class ReplChangeManager { private static Configuration conf; private String msUser; private String msGroup; - private FileSystem fs; private static final String ORIG_LOC_TAG = "user.original-loc"; static final String REMAIN_IN_TRASH_TAG = "user.remain-in-trash"; @@ -70,6 +69,7 @@ public class ReplChangeManager { private String checkSum; private boolean useSourcePath; private String subDir; + private boolean copyDone; public FileInfo(FileSystem srcFs, Path sourcePath, String subDir) { this(srcFs, sourcePath, null, null, true, subDir); @@ -82,6 +82,7 @@ public class ReplChangeManager { this.checkSum = checkSum; this.useSourcePath = useSourcePath; this.subDir = subDir; + this.copyDone = false; } public FileSystem getSrcFs() { return srcFs; @@ -104,6 +105,12 @@ public class ReplChangeManager { public String getSubDir() { return subDir; } + public boolean isCopyDone() { + return copyDone; + } + public void setCopyDone(boolean copyDone) { + this.copyDone = copyDone; + } public Path getEffectivePath() { if (useSourcePath) { return sourcePath; @@ -128,11 +135,11 @@ public class ReplChangeManager { ReplChangeManager.cmroot = new Path(MetastoreConf.getVar(conf, ConfVars.REPLCMDIR)); ReplChangeManager.conf = conf; - fs = cmroot.getFileSystem(conf); + FileSystem cmFs = cmroot.getFileSystem(conf); // Create cmroot with permission 700 if not exist - if (!fs.exists(cmroot)) { - 
fs.mkdirs(cmroot); - fs.setPermission(cmroot, new FsPermission("700")); + if (!cmFs.exists(cmroot)) { + cmFs.mkdirs(cmroot); + cmFs.setPermission(cmroot, new FsPermission("700")); } UserGroupInformation usergroupInfo = UserGroupInformation.getCurrentUser(); msUser = usergroupInfo.getShortUserName(); @@ -173,6 +180,7 @@ public class ReplChangeManager { } int count = 0; + FileSystem fs = path.getFileSystem(conf); if (fs.isDirectory(path)) { FileStatus[] files = fs.listStatus(path, hiddenFileFilter); for (FileStatus file : files) { @@ -180,7 +188,7 @@ public class ReplChangeManager { } } else { String fileCheckSum = checksumFor(path, fs); - Path cmPath = getCMPath(conf, path.getName(), fileCheckSum); + Path cmPath = getCMPath(conf, path.getName(), fileCheckSum, cmroot.toString()); // set timestamp before moving to cmroot, so we can // avoid race condition CM remove the file before setting @@ -198,17 +206,15 @@ public class ReplChangeManager { } else { switch (type) { case MOVE: { - if (LOG.isDebugEnabled()) { - LOG.debug("Moving {} to {}", path.toString(), cmPath.toString()); - } + LOG.info("Moving {} to {}", path.toString(), cmPath.toString()); + // Rename fails if the file with same name already exist. success = fs.rename(path, cmPath); break; } case COPY: { - if (LOG.isDebugEnabled()) { - LOG.debug("Copying {} to {}", path.toString(), cmPath.toString()); - } + LOG.info("Copying {} to {}", path.toString(), cmPath.toString()); + // It is possible to have a file with same checksum in cmPath but the content is // partially copied or corrupted. In this case, just overwrite the existing file with // new one. @@ -273,20 +279,17 @@ public class ReplChangeManager { return checksumString; } - static public void setCmRoot(Path cmRoot) { - ReplChangeManager.cmroot = cmRoot; - } - /*** * Convert a path of file inside a partition or table (if non-partitioned) * to a deterministic location of cmroot. So user can retrieve the file back * with the original location plus checksum. 
- * @param conf + * @param conf Hive configuration * @param name original filename * @param checkSum checksum of the file, can be retrieved by {@link #checksumFor(Path, FileSystem)} + * @param cmRootUri CM Root URI. (From remote source if REPL LOAD flow. From local config if recycle.) * @return Path */ - static Path getCMPath(Configuration conf, String name, String checkSum) { + static Path getCMPath(Configuration conf, String name, String checkSum, String cmRootUri) { String newFileName = name + "_" + checkSum; int maxLength = conf.getInt(DFSConfigKeys.DFS_NAMENODE_MAX_COMPONENT_LENGTH_KEY, DFSConfigKeys.DFS_NAMENODE_MAX_COMPONENT_LENGTH_DEFAULT); @@ -295,7 +298,7 @@ public class ReplChangeManager { newFileName = newFileName.substring(0, maxLength-1); } - return new Path(cmroot, newFileName); + return new Path(cmRootUri, newFileName); } /*** @@ -303,19 +306,20 @@ public class ReplChangeManager { * matches, return the file; otherwise, use chksumString to retrieve it from cmroot * @param src Original file location * @param checksumString Checksum of the original file + * @param srcCMRootURI CM root URI of the source cluster * @param subDir Sub directory to which the source file belongs to - * @param conf + * @param conf Hive configuration * @return Corresponding FileInfo object */ - public static FileInfo getFileInfo(Path src, String checksumString, String subDir, Configuration conf) - throws MetaException { + public static FileInfo getFileInfo(Path src, String checksumString, String srcCMRootURI, String subDir, + Configuration conf) throws MetaException { try { FileSystem srcFs = src.getFileSystem(conf); if (checksumString == null) { return new FileInfo(srcFs, src, subDir); } - Path cmPath = getCMPath(conf, src.getName(), checksumString); + Path cmPath = getCMPath(conf, src.getName(), checksumString, srcCMRootURI); if (!srcFs.exists(src)) { return new FileInfo(srcFs, src, cmPath, checksumString, false, subDir); } @@ -338,7 +342,7 @@ public class ReplChangeManager { 
} /*** - * Concatenate filename, checksum and subdirectory with "#" + * Concatenate filename, checksum, source cmroot uri and subdirectory with "#" * @param fileUriStr Filename string * @param fileChecksum Checksum string * @param encodedSubDir sub directory path into which this file belongs to. Here encoded means, @@ -346,38 +350,50 @@ public class ReplChangeManager { * @return Concatenated Uri string */ // TODO: this needs to be enhanced once change management based filesystem is implemented - // Currently using fileuri#checksum#subdirs as the format - public static String encodeFileUri(String fileUriStr, String fileChecksum, String encodedSubDir) { + // Currently using fileuri#checksum#cmrooturi#subdirs as the format + public static String encodeFileUri(String fileUriStr, String fileChecksum, String encodedSubDir) + throws IOException { String encodedUri = fileUriStr; - if (fileChecksum != null) { - encodedUri = encodedUri + URI_FRAGMENT_SEPARATOR + fileChecksum; + if ((fileChecksum != null) && (cmroot != null)) { + encodedUri = encodedUri + URI_FRAGMENT_SEPARATOR + fileChecksum + + URI_FRAGMENT_SEPARATOR + FileUtils.makeQualified(cmroot, conf); + } else { + encodedUri = encodedUri + URI_FRAGMENT_SEPARATOR + URI_FRAGMENT_SEPARATOR; } - if (encodedSubDir != null) { - encodedUri = encodedUri + URI_FRAGMENT_SEPARATOR + encodedSubDir; + encodedUri = encodedUri + URI_FRAGMENT_SEPARATOR + ((encodedSubDir != null) ? encodedSubDir : ""); + if (LOG.isDebugEnabled()) { + LOG.debug("Encoded URI: " + encodedUri); } return encodedUri; } /*** - * Split uri with fragment into file uri and checksum + * Split uri with fragment into file uri, checksum, source cmroot uri and subdirs. + * Currently using fileuri#checksum#cmrooturi#subdirs as the format.
* @param fileURIStr uri with fragment - * @return array of file name and checksum + * @return array of file uri, checksum, source CM root URI and subdirs, in that index order (null where a fragment is absent or empty) */ - static public String[] getFileWithChksumFromURI(String fileURIStr) { + public static String[] decodeFileUri(String fileURIStr) { String[] uriAndFragment = fileURIStr.split(URI_FRAGMENT_SEPARATOR); - String[] result = new String[3]; + String[] result = new String[4]; result[0] = uriAndFragment[0]; - if (uriAndFragment.length>1) { + if ((uriAndFragment.length > 1) && !StringUtils.isEmpty(uriAndFragment[1])) { result[1] = uriAndFragment[1]; } - if (uriAndFragment.length>2) { + if ((uriAndFragment.length > 2) && !StringUtils.isEmpty(uriAndFragment[2])) { result[2] = uriAndFragment[2]; } + if ((uriAndFragment.length > 3) && !StringUtils.isEmpty(uriAndFragment[3])) { + result[3] = uriAndFragment[3]; + } + if (LOG.isDebugEnabled()) { + LOG.debug("Reading Encoded URI: " + result[0] + ":: " + result[1] + ":: " + result[2] + ":: " + result[3]); + } return result; } - public static boolean isCMFileUri(Path fromPath, FileSystem srcFs) { - String[] result = getFileWithChksumFromURI(fromPath.toString()); + public static boolean isCMFileUri(Path fromPath) { + String[] result = decodeFileUri(fromPath.toString()); return result[1] != null; } http://git-wip-us.apache.org/repos/asf/hive/blob/a8fc0e67/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/FileUtils.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/FileUtils.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/FileUtils.java index 4138fa5..ec9e9e2 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/FileUtils.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/FileUtils.java @@ -33,6 +33,7 @@ import org.slf4j.Logger; import
org.slf4j.LoggerFactory; import java.io.IOException; +import java.net.URI; import java.util.ArrayList; import java.util.BitSet; import java.util.Collections; @@ -171,9 +172,9 @@ public class FileUtils { * Rename a file. Unlike {@link FileSystem#rename(Path, Path)}, if the destPath already exists * and is a directory, this will NOT move the sourcePath into it. It will throw an IOException * instead. - * @param srcfs file system src paths are on - * @param destfs file system dest paths are on - * @param sourcePath source file or directory to move + * @param srcFs file system src paths are on + * @param destFs file system dest paths are on + * @param srcPath source file or directory to move * @param destPath destination file name. This must be a file and not an existing directory. * @return result of fs.rename. * @throws IOException if fs.rename throws it, or if destPath already exists. @@ -440,7 +441,7 @@ public class FileUtils { * Utility method that determines if a specified directory already has * contents (non-hidden files) or not - useful to determine if an * immutable table already has contents, for example. - * + * @param fs * @param path * @throws IOException */ @@ -454,4 +455,59 @@ public class FileUtils { } return true; } + + /** + * Variant of Path.makeQualified that qualifies the input path against the default file system + * indicated by the configuration + * + * This does not require a FileSystem handle in most cases - only requires the Filesystem URI. + * This saves the cost of opening the Filesystem - which can involve RPCs - as well as cause + * errors + * + * @param path + * path to be fully qualified + * @param conf + * Configuration file + * @return path qualified relative to default file system + */ + public static Path makeQualified(Path path, Configuration conf) throws IOException { + + if (!path.isAbsolute()) { + // in this case we need to get the working directory + // and this requires a FileSystem handle. So revert to + // original method. 
+ FileSystem fs = FileSystem.get(conf); + return path.makeQualified(fs.getUri(), fs.getWorkingDirectory()); + } + + URI fsUri = FileSystem.getDefaultUri(conf); + URI pathUri = path.toUri(); + + String scheme = pathUri.getScheme(); + String authority = pathUri.getAuthority(); + + // validate/fill-in scheme and authority. this follows logic + // identical to FileSystem.get(URI, conf) - but doesn't actually + // obtain a file system handle + + if (scheme == null) { + // no scheme - use default file system uri + scheme = fsUri.getScheme(); + authority = fsUri.getAuthority(); + if (authority == null) { + authority = ""; + } + } else { + if (authority == null) { + // no authority - use default one if it applies + if (scheme.equals(fsUri.getScheme()) && fsUri.getAuthority() != null) { + authority = fsUri.getAuthority(); + } else { + authority = ""; + } + } + } + + return new Path(scheme, authority, pathUri.getPath()); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/a8fc0e67/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/HdfsUtils.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/HdfsUtils.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/HdfsUtils.java index 6a76de5..2122788 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/HdfsUtils.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/HdfsUtils.java @@ -205,8 +205,7 @@ public class HdfsUtils { if (params.size() == 0){ // if no entries were added via conf, we initiate our defaults params.add("-update"); - params.add("-skipcrccheck"); - params.add("-pb"); + params.add("-pbx"); } for (Path src : srcPaths) { params.add(src.toString()); http://git-wip-us.apache.org/repos/asf/hive/blob/a8fc0e67/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/StringUtils.java 
---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/StringUtils.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/StringUtils.java index 4449799..e49a423 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/StringUtils.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/StringUtils.java @@ -17,9 +17,6 @@ */ package org.apache.hadoop.hive.metastore.utils; -import com.google.common.collect.Interner; -import com.google.common.collect.Interners; - import java.io.PrintWriter; import java.io.StringWriter; import java.util.ArrayList; @@ -109,4 +106,25 @@ public class StringUtils { wrt.close(); return stm.toString(); } + + /** + * Given an array of bytes it will convert the bytes to a hex string + * representation of the bytes. + * @param bytes Input bytes + * @param start start index, inclusively + * @param end end index, exclusively + * @return hex string representation of the byte array + */ + public static String byteToHexString(byte[] bytes, int start, int end) { + return org.apache.hadoop.util.StringUtils.byteToHexString(bytes, start, end); + } + + /** + * Checks if the input string/char sequence is empty + * @param cs Input char sequence + * @return true if empty and false if not + */ + public static boolean isEmpty(CharSequence cs) { + return cs == null || cs.length() == 0; + } }