aasha commented on a change in pull request #2043:
URL: https://github.com/apache/hive/pull/2043#discussion_r596568836



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AckTask.java
##########
@@ -47,6 +50,7 @@ public int execute() {
       }
       Path ackPath = work.getAckFilePath();
       Utils.create(ackPath, conf);
+//      createSnapshot();

Review comment:
      Remove this commented-out code.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
##########
@@ -945,6 +962,13 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path 
cmRoot, Hive hiveDb)
               conf.getBoolVar(REPL_EXTERNAL_WAREHOUSE_SINGLE_COPY_TASK)
                   && work.replScope.includeAllTables();
           boolean isExternalTablePresent = false;
+          boolean isSnapshotEnabed =

Review comment:
      Repeated code. Can this be extracted to a common place?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyTask.java
##########
@@ -203,4 +213,87 @@ public String getName() {
   public boolean canExecuteInParallel() {
     return true;
   }
+
+  void copyUsingDistCpSnapshots(Path sourcePath, Path targetPath,
+      UserGroupInformation proxyUser) throws IOException {
+
+    DistributedFileSystem sourceFs = SnapshotUtils.getDFS(sourcePath, conf);
+    DistributedFileSystem targetFs = SnapshotUtils.getDFS(targetPath, conf);
+    if (sourceFs == null || targetFs == null) {
+      LOG.error("Source and Destination filesystem are not "
+              + "DistributedFileSystem, using normal copy instead of snapshot "
+              + "copy, Source Path {}, Target Path {}, Source fs is {}, and "
+              + "Target fs {}", sourcePath, targetPath,
+          sourcePath.getFileSystem(conf).getClass(),
+          targetPath.getFileSystem(conf).getClass());
+      FileUtils.distCp(sourcePath.getFileSystem(conf), // source file system
+          Collections.singletonList(sourcePath), // list of source paths
+          targetPath, false, proxyUser, conf, ShimLoader.getHadoopShims());
+      // Since source/dest aren't DFS, no point trying to create snapshot at
+      // target, return from here.
+      return;
+    }
+    String prefix = conf.getVar(
+        HiveConf.ConfVars.REPL_SNAPSHOT_PREFIX_FOR_EXTERNAL_TABLE_COPY);
+    if (getWork().getCopyMode()
+        .equals(SnapshotUtils.SnapshotCopyMode.DIFF_COPY)) {
+      LOG.info("Using snapshot diff copy for source: {} and target: {}",
+          sourcePath, targetPath);
+      boolean result = FileUtils
+          .distCpWithSnapshot(sourceFs, prefix + "old", prefix + "initial",
+              Collections.singletonList(sourcePath), targetPath, proxyUser,
+              conf, ShimLoader.getHadoopShims());
+      if (!result) {
+        LOG.error("Can not copy using snapshot diff for source: {} and "
+                + "target: {}. Falling back to normal copy.", sourcePath,
+            targetPath);
+        FileUtils.distCp(sourcePath.getFileSystem(conf), // source file system
+            Collections.singletonList(sourcePath), // list of source paths
+            targetPath, false, proxyUser, conf, ShimLoader.getHadoopShims());
+      }
+    } else if (getWork().getCopyMode()
+        .equals(SnapshotUtils.SnapshotCopyMode.INITIAL_COPY)) {
+      LOG.info("Using snapshot initial copy for source: {} and target: {}",
+          sourcePath, targetPath);
+      // Try allowSnapshot at target first, if success or already allowed
+      // will use snapshots to copy else fallback.
+      boolean isTargetSnapshottable =
+          SnapshotUtils.allowSnapshot(targetFs, targetPath);
+      if (!isTargetSnapshottable) {
+        // We can not allow creating snapshot at target, so no point moving
+        // to snapshot mode of copy, fallback to normal copy.
+        LOG.error("Can not copy from initial snapshot directory for source: {} 
"
+            + "and target: {}. Since target is not snapshottable. Falling "
+            + "back to normal copy.", sourcePath, targetPath);
+        FileUtils.distCp(sourcePath.getFileSystem(conf), // source file system
+            Collections.singletonList(sourcePath), // list of source paths
+            targetPath, false, proxyUser, conf, ShimLoader.getHadoopShims());
+        // Returning to avoid creating snapshots at target.
+        return;
+      } else {
+        // Get the path relative to the initial snapshot for copy.
+        Path snapRelPath = new Path(sourcePath,
+            HdfsConstants.DOT_SNAPSHOT_DIR + "/" + prefix + "initial");
+        boolean result = FileUtils.distCp(sourcePath.getFileSystem(conf), //
+            // source file system
+            Collections.singletonList(snapRelPath), // source path relative to
+            // snapshot
+            targetPath, false, proxyUser, conf, ShimLoader.getHadoopShims());
+        if (!result) {
+          LOG.error(
+              "Can not copy from initial snapshot directory for source: {} "
+                  + "and target: {}. Falling back to normal copy.", 
snapRelPath,
+              targetPath);
+          FileUtils.distCp(sourcePath.getFileSystem(conf), // source file 
system
+              Collections.singletonList(sourcePath), // list of source paths
+              targetPath, false, proxyUser, conf, ShimLoader.getHadoopShims());
+        }
+      }
+    }
+    // Create snapshot at target Filesystem, ignore exceptions if any,
+    // since copy is success, if creation fails, the next iteration shall
+    // handle.
+    SnapshotUtils.deleteSnapshotSafe(targetFs, targetPath, prefix + "old");

Review comment:
      Does this delete the older snapshot? Add a comment for clarity.

##########
File path: common/src/java/org/apache/hadoop/hive/common/FileUtils.java
##########
@@ -691,6 +690,31 @@ public static boolean distCp(FileSystem srcFS, List<Path> 
srcPaths, Path dst,
     return copied;
   }
 
+  public static boolean distCpWithSnapshot(FileSystem srcFs, String snap1,
+      String snap2, List<Path> srcPaths, Path dst,
+      UserGroupInformation proxyUser, HiveConf conf, HadoopShims shims) {
+    boolean copied;
+    try {
+      if (proxyUser == null) {
+        copied =
+            shims.runDistCpWithSnapshots(snap1, snap2, srcPaths, dst, conf);
+      } else {
+        copied = shims
+            .runDistCpWithSnapshotsAs(snap1, snap2, srcPaths, dst, conf,
+                proxyUser);
+      }
+    } catch (Exception e) {
+      LOG.error("Can not copy using snapshot from source: {}, target: {}",
+          srcPaths, dst);
+      copied = false;

Review comment:
      The default value can be set to `false` at declaration instead.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyTask.java
##########
@@ -203,4 +213,87 @@ public String getName() {
   public boolean canExecuteInParallel() {
     return true;
   }
+
+  void copyUsingDistCpSnapshots(Path sourcePath, Path targetPath,

Review comment:
      Can this method be moved to FileUtils?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyTask.java
##########
@@ -162,14 +165,21 @@ public int execute() {
           // do we create a new conf and only here provide this additional 
option so that we get away from
           // differences of data in two location for the same directories ?
           // basically add distcp.options.delete to hiveconf new object ?
-          FileUtils.distCp(
-            sourcePath.getFileSystem(conf), // source file system
-            Collections.singletonList(sourcePath),  // list of source paths
-            targetPath,
-            false,
-            proxyUser,
-            conf,
-            ShimLoader.getHadoopShims());
+          if (!getWork().getCopyMode()
+              .equals(SnapshotUtils.SnapshotCopyMode.FALLBACK_COPY)) {
+            LOG.info("Using Snapshot mode of copy for source: {} and target:"
+                + " {}", sourcePath, targetPath);
+            copyUsingDistCpSnapshots(sourcePath, targetPath, proxyUser);
+            // Use distcp with snapshots for copy.

Review comment:
      Nit: place the comment above the code statement it describes.

##########
File path: common/src/java/org/apache/hadoop/hive/common/FileUtils.java
##########
@@ -691,6 +690,31 @@ public static boolean distCp(FileSystem srcFS, List<Path> 
srcPaths, Path dst,
     return copied;
   }
 
+  public static boolean distCpWithSnapshot(FileSystem srcFs, String snap1,
+      String snap2, List<Path> srcPaths, Path dst,
+      UserGroupInformation proxyUser, HiveConf conf, HadoopShims shims) {
+    boolean copied;
+    try {
+      if (proxyUser == null) {
+        copied =
+            shims.runDistCpWithSnapshots(snap1, snap2, srcPaths, dst, conf);
+      } else {
+        copied = shims
+            .runDistCpWithSnapshotsAs(snap1, snap2, srcPaths, dst, conf,
+                proxyUser);
+      }
+    } catch (Exception e) {
+      LOG.error("Can not copy using snapshot from source: {}, target: {}",
+          srcPaths, dst);
+      copied = false;
+    }
+    if(copied)

Review comment:
      This can be moved inside the try-catch block.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to