ayushtkn commented on a change in pull request #2043:
URL: https://github.com/apache/hive/pull/2043#discussion_r600579986
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyTask.java
##########
@@ -203,4 +213,87 @@ public String getName() {
public boolean canExecuteInParallel() {
return true;
}
+
+ void copyUsingDistCpSnapshots(Path sourcePath, Path targetPath,
+ UserGroupInformation proxyUser) throws IOException {
+
+ DistributedFileSystem sourceFs = SnapshotUtils.getDFS(sourcePath, conf);
+ DistributedFileSystem targetFs = SnapshotUtils.getDFS(targetPath, conf);
+ if (sourceFs == null || targetFs == null) {
+ LOG.error("Source and Destination filesystem are not "
+ + "DistributedFileSystem, using normal copy instead of snapshot "
+ + "copy, Source Path {}, Target Path {}, Source fs is {}, and "
+ + "Target fs {}", sourcePath, targetPath,
+ sourcePath.getFileSystem(conf).getClass(),
+ targetPath.getFileSystem(conf).getClass());
+ FileUtils.distCp(sourcePath.getFileSystem(conf), // source file system
+ Collections.singletonList(sourcePath), // list of source paths
+ targetPath, false, proxyUser, conf, ShimLoader.getHadoopShims());
+ // Since source/dest aren't DFS, no point trying to create snapshot at
+ // target, return from here.
+ return;
+ }
+ String prefix = conf.getVar(
+ HiveConf.ConfVars.REPL_SNAPSHOT_PREFIX_FOR_EXTERNAL_TABLE_COPY);
+ if (getWork().getCopyMode()
+ .equals(SnapshotUtils.SnapshotCopyMode.DIFF_COPY)) {
+ LOG.info("Using snapshot diff copy for source: {} and target: {}",
+ sourcePath, targetPath);
+ boolean result = FileUtils
+ .distCpWithSnapshot(sourceFs, prefix + "old", prefix + "initial",
+ Collections.singletonList(sourcePath), targetPath, proxyUser,
+ conf, ShimLoader.getHadoopShims());
+ if (!result) {
+ LOG.error("Can not copy using snapshot diff for source: {} and "
+ + "target: {}. Falling back to normal copy.", sourcePath,
+ targetPath);
+ FileUtils.distCp(sourcePath.getFileSystem(conf), // source file system
+ Collections.singletonList(sourcePath), // list of source paths
+ targetPath, false, proxyUser, conf, ShimLoader.getHadoopShims());
+ }
+ } else if (getWork().getCopyMode()
+ .equals(SnapshotUtils.SnapshotCopyMode.INITIAL_COPY)) {
+ LOG.info("Using snapshot initial copy for source: {} and target: {}",
+ sourcePath, targetPath);
+ // Try allowSnapshot at target first, if success or already allowed
+ // will use snapshots to copy else fallback.
+ boolean isTargetSnapshottable =
+ SnapshotUtils.allowSnapshot(targetFs, targetPath);
+ if (!isTargetSnapshottable) {
+ // We can not allow creating snapshot at target, so no point moving
+ // to snapshot mode of copy, fallback to normal copy.
+ LOG.error("Can not copy from initial snapshot directory for source: {} "
+ + "and target: {}. Since target is not snapshottable. Falling "
+ + "back to normal copy.", sourcePath, targetPath);
+ FileUtils.distCp(sourcePath.getFileSystem(conf), // source file system
+ Collections.singletonList(sourcePath), // list of source paths
+ targetPath, false, proxyUser, conf, ShimLoader.getHadoopShims());
+ // Returning to avoid creating snapshots at target.
+ return;
+ } else {
+ // Get the path relative to the initial snapshot for copy.
+ Path snapRelPath = new Path(sourcePath,
+ HdfsConstants.DOT_SNAPSHOT_DIR + "/" + prefix + "initial");
+ boolean result = FileUtils.distCp(sourcePath.getFileSystem(conf), //
+ // source file system
+ Collections.singletonList(snapRelPath), // source path relative to
+ // snapshot
+ targetPath, false, proxyUser, conf, ShimLoader.getHadoopShims());
+ if (!result) {
+ LOG.error(
+ "Can not copy from initial snapshot directory for source: {} "
+ + "and target: {}. Falling back to normal copy.", snapRelPath,
+ targetPath);
+ FileUtils.distCp(sourcePath.getFileSystem(conf), // source file system
+ Collections.singletonList(sourcePath), // list of source paths
+ targetPath, false, proxyUser, conf, ShimLoader.getHadoopShims());
+ }
+ }
+ }
+ // Create snapshot at target Filesystem, ignore exceptions if any,
+ // since copy is success, if creation fails, the next iteration shall
+ // handle.
+ SnapshotUtils.deleteSnapshotSafe(targetFs, targetPath, prefix + "old");
Review comment:
Added an explanation in the comment, and added Javadoc to the util method as
well, for posterity.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]