Repository: hive
Updated Branches:
  refs/heads/master 2948c160f -> b1ca2a5e3


HIVE-16898 : Validation of source file after distcp in repl load (Daniel Dai, 
Sankar Hariappan, reviewed by Anishek Agarwal, Thejas Nair)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/b1ca2a5e
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/b1ca2a5e
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/b1ca2a5e

Branch: refs/heads/master
Commit: b1ca2a5e35895f83beb13344159f5ade2545fc8e
Parents: 2948c16
Author: Thejas M Nair <the...@hortonworks.com>
Authored: Sun Oct 1 23:16:40 2017 -0700
Committer: Thejas M Nair <the...@hortonworks.com>
Committed: Sun Oct 1 23:16:40 2017 -0700

----------------------------------------------------------------------
 .../hadoop/hive/ql/exec/ReplCopyTask.java       |  33 ++--
 .../hadoop/hive/ql/parse/repl/CopyUtils.java    | 161 ++++++++++++++++---
 .../hive/metastore/ReplChangeManager.java       |  73 +++++++--
 3 files changed, 219 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/b1ca2a5e/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
index 54746d3..6e722f7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
@@ -76,11 +76,10 @@ public class ReplCopyTask extends Task<ReplCopyWork> 
implements Serializable {
       // be a CM uri in the from path.
       if (ReplChangeManager.isCMFileUri(fromPath, srcFs)) {
         String[] result = 
ReplChangeManager.getFileWithChksumFromURI(fromPath.toString());
-        Path sourcePath = ReplChangeManager
-            .getFileStatus(new Path(result[0]), result[1], conf)
-            .getPath();
+        ReplChangeManager.FileInfo sourceInfo = ReplChangeManager
+            .getFileInfo(new Path(result[0]), result[1], conf);
         if (FileUtils.copy(
-            sourcePath.getFileSystem(conf), sourcePath,
+            sourceInfo.getSrcFs(), sourceInfo.getSourcePath(),
             dstFs, toPath, false, false, conf)) {
           return 0;
         } else {
@@ -90,13 +89,13 @@ public class ReplCopyTask extends Task<ReplCopyWork> 
implements Serializable {
         }
       }
 
-      List<Path> srcPaths = new ArrayList<>();
+      List<ReplChangeManager.FileInfo> srcFiles = new ArrayList<>();
       if (rwork.readSrcAsFilesList()) {
         // This flow is usually taken for REPL LOAD
         // Our input is the result of a _files listing, we should expand out 
_files.
-        srcPaths = filesInFileListing(srcFs, fromPath);
-        LOG.debug("ReplCopyTask _files contains:" + (srcPaths == null ? "null" 
: srcPaths.size()));
-        if ((srcPaths == null) || (srcPaths.isEmpty())) {
+        srcFiles = filesInFileListing(srcFs, fromPath);
+        LOG.debug("ReplCopyTask _files contains:" + (srcFiles == null ? "null" 
: srcFiles.size()));
+        if ((srcFiles == null) || (srcFiles.isEmpty())) {
           if (work.isErrorOnSrcEmpty()) {
             console.printError("No _files entry found on source: " + 
fromPath.toString());
             return 5;
@@ -120,17 +119,18 @@ public class ReplCopyTask extends Task<ReplCopyWork> 
implements Serializable {
         for (FileStatus oneSrc : srcs) {
           console.printInfo("Copying file: " + oneSrc.getPath().toString());
           LOG.debug("ReplCopyTask :cp:{}=>{}", oneSrc.getPath(), toPath);
-          srcPaths.add(oneSrc.getPath());
+          srcFiles.add(new 
ReplChangeManager.FileInfo(oneSrc.getPath().getFileSystem(conf),
+                                                      oneSrc.getPath()));
         }
       }
 
-      LOG.debug("ReplCopyTask numFiles: {}", srcPaths.size());
+      LOG.debug("ReplCopyTask numFiles: {}", srcFiles.size());
       if (!FileUtils.mkdir(dstFs, toPath, conf)) {
         console.printError("Cannot make target directory: " + 
toPath.toString());
         return 2;
       }
       // Copy the files from different source file systems to one destination 
directory
-      new CopyUtils(rwork.distCpDoAsUser(), conf).doCopy(toPath, srcPaths);
+      new CopyUtils(rwork.distCpDoAsUser(), conf).copyAndVerify(toPath, 
srcFiles);
 
       return 0;
     } catch (Exception e) {
@@ -140,7 +140,7 @@ public class ReplCopyTask extends Task<ReplCopyWork> 
implements Serializable {
     }
   }
 
-  private List<Path> filesInFileListing(FileSystem fs, Path dataPath)
+  private List<ReplChangeManager.FileInfo> filesInFileListing(FileSystem fs, 
Path dataPath)
       throws IOException {
     Path fileListing = new Path(dataPath, EximUtil.FILES_NAME);
     LOG.debug("ReplCopyTask filesInFileListing() reading " + 
fileListing.toUri());
@@ -150,19 +150,18 @@ public class ReplCopyTask extends Task<ReplCopyWork> 
implements Serializable {
       // On success, but with nothing to return, we can return an empty list.
     }
 
-    List<Path> filePaths = new ArrayList<>();
+    List<ReplChangeManager.FileInfo> filePaths = new ArrayList<>();
     BufferedReader br = new BufferedReader(new 
InputStreamReader(fs.open(fileListing)));
     // TODO : verify if skipping charset here is okay
 
     String line = null;
-    while ( (line = br.readLine()) != null){
+    while ((line = br.readLine()) != null) {
       LOG.debug("ReplCopyTask :_filesReadLine:" + line);
 
       String[] fileWithChksum = 
ReplChangeManager.getFileWithChksumFromURI(line);
       try {
-        Path f = ReplChangeManager
-                .getFileStatus(new Path(fileWithChksum[0]), fileWithChksum[1], 
conf)
-                .getPath();
+        ReplChangeManager.FileInfo f = ReplChangeManager
+                .getFileInfo(new Path(fileWithChksum[0]), fileWithChksum[1], 
conf);
         filePaths.add(f);
       } catch (MetaException e) {
         // issue warning for missing file and throw exception

http://git-wip-us.apache.org/repos/asf/hive/blob/b1ca2a5e/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
index a022b5d..71cdbde 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
@@ -24,12 +24,15 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.ReplChangeManager;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.hive.shims.Utils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Lists;
+
 import javax.security.auth.login.LoginException;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -46,6 +49,7 @@ public class CopyUtils {
   private final long maxNumberOfFiles;
   private final boolean hiveInTest;
   private final String copyAsUser;
+  private final int MAX_COPY_RETRY = 3;
 
   public CopyUtils(String distCpDoAsUser, HiveConf hiveConf) {
     this.hiveConf = hiveConf;
@@ -55,29 +59,121 @@ public class CopyUtils {
     this.copyAsUser = distCpDoAsUser;
   }
 
-  public void doCopy(Path destination, List<Path> srcPaths) throws 
IOException, LoginException {
-    Map<FileSystem, List<Path>> map = fsToFileMap(srcPaths);
+  // Used by replication, copy files from source to destination. It is 
possible source file is
+  // changed/removed during copy, so double check the checksum after copy,
+  // if not match, copy again from cm
+  public void copyAndVerify(Path destination, List<ReplChangeManager.FileInfo> 
srcFiles)
+          throws IOException, LoginException {
+    Map<FileSystem, List<ReplChangeManager.FileInfo>> map = 
fsToFileMap(srcFiles);
     FileSystem destinationFs = destination.getFileSystem(hiveConf);
 
+    for (Map.Entry<FileSystem, List<ReplChangeManager.FileInfo>> entry : 
map.entrySet()) {
+      FileSystem sourceFs = entry.getKey();
+      List<ReplChangeManager.FileInfo> fileInfoList = entry.getValue();
+      boolean useRegularCopy = regularCopy(destinationFs, sourceFs, 
fileInfoList);
+
+      doCopyRetry(sourceFs, fileInfoList, destinationFs, destination, 
useRegularCopy);
+
+      // Verify checksum, retry if checksum changed
+      List<ReplChangeManager.FileInfo> retryFileInfoList = new ArrayList<>();
+      for (ReplChangeManager.FileInfo srcFile : srcFiles) {
+        if(!srcFile.isUseSourcePath()) {
+          // If already use cmpath, nothing we can do here, skip this file
+          continue;
+        }
+        String sourceChecksumString = srcFile.getCheckSum();
+        if (sourceChecksumString != null) {
+          String verifySourceChecksumString;
+          try {
+            verifySourceChecksumString
+                    = ReplChangeManager.checksumFor(srcFile.getSourcePath(), 
sourceFs);
+          } catch (IOException e) {
+            // Retry with CM path
+            verifySourceChecksumString = null;
+          }
+          if ((verifySourceChecksumString == null)
+                  || !sourceChecksumString.equals(verifySourceChecksumString)) 
{
+            // If checksum does not match, likely the file is changed/removed, 
copy again from cm
+            srcFile.setIsUseSourcePath(false);
+            retryFileInfoList.add(srcFile);
+          }
+        }
+      }
+      if (!retryFileInfoList.isEmpty()) {
+        doCopyRetry(sourceFs, retryFileInfoList, destinationFs, destination, 
useRegularCopy);
+      }
+    }
+  }
+
+  private void doCopyRetry(FileSystem sourceFs, 
List<ReplChangeManager.FileInfo> fileList,
+                           FileSystem destinationFs, Path destination,
+                           boolean useRegularCopy) throws IOException, 
LoginException {
+    int repeat = 0;
+    List<Path> pathList = Lists.transform(fileList,
+                                          fileInfo -> { return 
fileInfo.getEffectivePath(); });
+    while (!pathList.isEmpty() && (repeat < MAX_COPY_RETRY)) {
+      try {
+        doCopyOnce(sourceFs, pathList, destinationFs, destination, 
useRegularCopy);
+        return;
+      } catch (IOException e) {
+        pathList = new ArrayList<>();
+
+        // Going through file list, retry with CM if applicable
+        for (ReplChangeManager.FileInfo file : fileList) {
+          Path copyPath = file.getEffectivePath();
+          if (!destinationFs.exists(new Path(destination, 
copyPath.getName()))) {
+            if (!sourceFs.exists(copyPath)) {
+              if (file.isUseSourcePath()) {
+                // Source file missing, then try with CM path
+                file.setIsUseSourcePath(false);
+              } else {
+                // CM path itself is missing, so, cannot recover from this 
error
+                throw e;
+              }
+            }
+            pathList.add(file.getEffectivePath());
+          }
+        }
+      }
+      repeat++;
+    }
+  }
+
+  // Copy without retry
+  private void doCopyOnce(FileSystem sourceFs, List<Path> srcList,
+                          FileSystem destinationFs, Path destination,
+                          boolean useRegularCopy) throws IOException, 
LoginException {
     UserGroupInformation ugi = Utils.getUGI();
     String currentUser = ugi.getShortUserName();
     boolean usePrivilegedDistCp = copyAsUser != null && 
!currentUser.equals(copyAsUser);
 
+    if (useRegularCopy) {
+      Path[] paths = srcList.toArray(new Path[] {});
+      FileUtil.copy(sourceFs, paths, destinationFs, destination, false, true, 
hiveConf);
+    } else {
+      FileUtils.distCp(
+              sourceFs, // source file system
+              srcList,  // list of source paths
+              destination,
+              false,
+              usePrivilegedDistCp ? copyAsUser : null,
+              hiveConf,
+              ShimLoader.getHadoopShims()
+      );
+    }
+  }
+
+  public void doCopy(Path destination, List<Path> srcPaths) throws 
IOException, LoginException {
+    Map<FileSystem, List<Path>> map = fsToPathMap(srcPaths);
+    FileSystem destinationFs = destination.getFileSystem(hiveConf);
+
     for (Map.Entry<FileSystem, List<Path>> entry : map.entrySet()) {
-      if (regularCopy(destinationFs, entry)) {
-        Path[] paths = entry.getValue().toArray(new Path[] {});
-        FileUtil.copy(entry.getKey(), paths, destinationFs, destination, 
false, true, hiveConf);
-      } else {
-        FileUtils.distCp(
-            entry.getKey(),   // source file system
-            entry.getValue(), // list of source paths
-            destination,
-            false,
-            usePrivilegedDistCp ? copyAsUser : null,
-            hiveConf,
-            ShimLoader.getHadoopShims()
-        );
-      }
+      final FileSystem sourceFs = entry.getKey();
+      List<ReplChangeManager.FileInfo> fileList = 
Lists.transform(entry.getValue(),
+                                path -> { return new 
ReplChangeManager.FileInfo(sourceFs, path);});
+      doCopyOnce(sourceFs, entry.getValue(),
+                 destinationFs, destination,
+                 regularCopy(destinationFs, sourceFs, fileList));
     }
   }
 
@@ -88,12 +184,11 @@ public class CopyUtils {
       3. aggregate fileSize of all source Paths(can be directory /  file) is 
less than configured size.
       4. number of files of all source Paths(can be directory /  file) is less 
than configured size.
   */
-  private boolean regularCopy(FileSystem destinationFs, Map.Entry<FileSystem, 
List<Path>> entry)
+  private boolean regularCopy(FileSystem destinationFs, FileSystem sourceFs, 
List<ReplChangeManager.FileInfo> fileList)
       throws IOException {
     if (hiveInTest) {
       return true;
     }
-    FileSystem sourceFs = entry.getKey();
     if (isLocal(sourceFs) || isLocal(destinationFs)) {
       return true;
     }
@@ -104,8 +199,17 @@ public class CopyUtils {
     long size = 0;
     long numberOfFiles = 0;
 
-    for (Path path : entry.getValue()) {
-      ContentSummary contentSummary = sourceFs.getContentSummary(path);
+    for (ReplChangeManager.FileInfo fileInfo : fileList) {
+      ContentSummary contentSummary = null;
+      try {
+        contentSummary = 
sourceFs.getContentSummary(fileInfo.getEffectivePath());
+      } catch (IOException e) {
+        // in replication, if source file does not exist, try cmroot
+        if (fileInfo.isUseSourcePath() && fileInfo.getCmPath() != null) {
+          contentSummary = sourceFs.getContentSummary(fileInfo.getCmPath());
+          fileInfo.setIsUseSourcePath(false);
+        }
+      }
       size += contentSummary.getLength();
       numberOfFiles += contentSummary.getFileCount();
       if (limitReachedForLocalCopy(size, numberOfFiles)) {
@@ -129,15 +233,28 @@ public class CopyUtils {
     return fs.getScheme().equals("file");
   }
 
-  private Map<FileSystem, List<Path>> fsToFileMap(List<Path> srcPaths) throws 
IOException {
+  private Map<FileSystem, List<Path>> fsToPathMap(List<Path> srcPaths) throws 
IOException {
     Map<FileSystem, List<Path>> result = new HashMap<>();
     for (Path path : srcPaths) {
       FileSystem fileSystem = path.getFileSystem(hiveConf);
       if (!result.containsKey(fileSystem)) {
-        result.put(fileSystem, new ArrayList<>());
+        result.put(fileSystem, new ArrayList<Path>());
       }
       result.get(fileSystem).add(path);
     }
     return result;
   }
+
+  private Map<FileSystem, List<ReplChangeManager.FileInfo>> fsToFileMap(
+      List<ReplChangeManager.FileInfo> srcFiles) throws IOException {
+    Map<FileSystem, List<ReplChangeManager.FileInfo>> result = new HashMap<>();
+    for (ReplChangeManager.FileInfo file : srcFiles) {
+      FileSystem fileSystem = file.getSrcFs();
+      if (!result.containsKey(fileSystem)) {
+        result.put(fileSystem, new ArrayList<ReplChangeManager.FileInfo>());
+      }
+      result.get(fileSystem).add(file);
+    }
+    return result;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/b1ca2a5e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
index dd9296a..95fa0a9 100644
--- 
a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
+++ 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
@@ -63,6 +63,54 @@ public class ReplChangeManager {
     COPY
   }
 
+  public static class FileInfo {
+    FileSystem srcFs;
+    Path sourcePath;
+    Path cmPath;
+    String checkSum;
+    boolean useSourcePath;
+
+    public FileInfo(FileSystem srcFs, Path sourcePath) {
+      this.srcFs = srcFs;
+      this.sourcePath = sourcePath;
+      this.cmPath = null;
+      this.checkSum = null;
+      this.useSourcePath = true;
+    }
+    public FileInfo(FileSystem srcFs, Path sourcePath, Path cmPath, String 
checkSum, boolean useSourcePath) {
+      this.srcFs = srcFs;
+      this.sourcePath = sourcePath;
+      this.cmPath = cmPath;
+      this.checkSum = checkSum;
+      this.useSourcePath = useSourcePath;
+    }
+    public FileSystem getSrcFs() {
+      return srcFs;
+    }
+    public Path getSourcePath() {
+      return sourcePath;
+    }
+    public Path getCmPath() {
+      return cmPath;
+    }
+    public String getCheckSum() {
+      return checkSum;
+    }
+    public boolean isUseSourcePath() {
+      return useSourcePath;
+    }
+    public void setIsUseSourcePath(boolean useSourcePath) {
+      this.useSourcePath = useSourcePath;
+    }
+    public Path getEffectivePath() {
+      if (useSourcePath) {
+        return sourcePath;
+      } else {
+        return cmPath;
+      }
+    }
+  }
+
   public static ReplChangeManager getInstance(Configuration conf) throws 
MetaException {
     if (instance == null) {
       instance = new ReplChangeManager(conf);
@@ -259,25 +307,32 @@ public class ReplChangeManager {
    * @param src Original file location
    * @param checksumString Checksum of the original file
    * @param conf
-   * @return Corresponding FileStatus object
+   * @return Corresponding FileInfo object
    */
-  static public FileStatus getFileStatus(Path src, String checksumString,
-      Configuration conf) throws MetaException {
+  public static FileInfo getFileInfo(Path src, String checksumString, 
Configuration conf)
+          throws MetaException {
     try {
       FileSystem srcFs = src.getFileSystem(conf);
       if (checksumString == null) {
-        return srcFs.getFileStatus(src);
+        return new FileInfo(srcFs, src);
       }
 
+      Path cmPath = getCMPath(conf, src.getName(), checksumString);
       if (!srcFs.exists(src)) {
-        return srcFs.getFileStatus(getCMPath(conf, src.getName(), 
checksumString));
+        return new FileInfo(srcFs, src, cmPath, checksumString, false);
       }
 
-      String currentChecksumString = checksumFor(src, srcFs);
-      if (currentChecksumString == null || 
checksumString.equals(currentChecksumString)) {
-        return srcFs.getFileStatus(src);
+      String currentChecksumString;
+      try {
+        currentChecksumString = checksumFor(src, srcFs);
+      } catch (IOException ex) {
+        // If the file is missing or getting modified, then refer CM path
+        return new FileInfo(srcFs, src, cmPath, checksumString, false);
+      }
+      if ((currentChecksumString == null) || 
checksumString.equals(currentChecksumString)) {
+        return new FileInfo(srcFs, src, cmPath, checksumString, true);
       } else {
-        return srcFs.getFileStatus(getCMPath(conf, src.getName(), 
checksumString));
+        return new FileInfo(srcFs, src, cmPath, checksumString, false);
       }
     } catch (IOException e) {
       throw new MetaException(StringUtils.stringifyException(e));

Reply via email to