aasha commented on a change in pull request #2043:
URL: https://github.com/apache/hive/pull/2043#discussion_r633565588
##########
File path:
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosUsingSnapshots.java
##########
@@ -1258,6 +1259,134 @@ private void validateDiffSnapshotsCreated(String
location) throws Exception {
dfs.getFileStatus(new Path(locationPath, ".snapshot/" +
secondSnapshot(primaryDbName.toLowerCase()))));
}
+ @Test
+ public void testSnapshotsWithFiltersCustomDbLevelPaths() throws Throwable {
+ // Directory Structure:
+ // /prefix/project/ <- Specified as custom Location.(Snapshot Root)
+ // /randomStuff <- Not to be copied as part of
external data copy
+ // /warehouse1 <- To be copied, Contains table1 &
table2
+ // /warehouse2 <- To be copied, Contains table3 &
table4
+
+ // Create /prefix/project
+ Path project = new Path("/" + testName.getMethodName() + "/project");
+ DistributedFileSystem fs = primary.miniDFSCluster.getFileSystem();
+ fs.mkdirs(project);
+
+ // Create /prefix/project/warehouse1
+ Path warehouse1 = new Path(project, "warehouse1");
+ fs.mkdirs(warehouse1);
+
+ // Create /prefix/project/warehouse2
+ Path warehouse2 = new Path(project, "warehouse2");
+ fs.mkdirs(warehouse2);
+
+ // Table1 Path: /prefix/project/warehouse1/table1
+ Path table1 = new Path(warehouse1, "table1");
+ fs.mkdirs(table1);
+
+ // Table2 Path: /prefix/project/warehouse1/table2
+ Path table2 = new Path(warehouse1, "table2");
+ fs.mkdirs(table2);
+
+ // Table3 Path: /prefix/project/warehouse2/table3
+ Path table3 = new Path(warehouse2, "table3");
+ fs.mkdirs(table3);
+
+ // Table4 Path: /prefix/project/warehouse2/table4
+ Path table4 = new Path(warehouse2, "table4");
+ fs.mkdirs(table4);
+
+ // Random Dir inside the /prefix/project
+ Path random = new Path(project, "randomStuff");
+ fs.mkdirs(random);
+
+ fs.create(new Path(random, "file1")).close();
+ fs.create(new Path(random, "file2")).close();
+ fs.create(new Path(random, "file3")).close();
+
+ // Create a filter file for DistCp
+ Path filterFile = new Path("/tmp/filter");
+ try(FSDataOutputStream stream = fs.create(filterFile)) {
+ stream.writeBytes(".*randomStuff.*");
+ }
+ assertTrue(fs.exists(filterFile.makeQualified(fs.getUri(),
fs.getWorkingDirectory())));
+ FileWriter myWriter = new FileWriter("/tmp/filter");
+ myWriter.write(".*randomStuff.*");
+ myWriter.close();
+
+ // Specify the project directory as the snapshot root using the single
copy task path config.
+ List<String> withClause =
ReplicationTestUtils.includeExternalTableClause(true);
+ withClause.add("'"
+ + REPL_EXTERNAL_WAREHOUSE_SINGLE_COPY_TASK_PATHS.varname + "'='" +
project
+ .makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString() +
"'");
+
+ // Add Filter file
+ withClause.add("'distcp.options.filters'='" + "/tmp/filter" + "'");
Review comment:
Clean up the filter file after the test
##########
File path: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
##########
@@ -675,6 +675,16 @@ private static void populateLlapDaemonVarsSet(Set<String>
llapDaemonVarsSetLocal
+ " table or partition level. If hive.exec.parallel \n"
+ "is set to true then max worker threads created for copy can be
hive.exec.parallel.thread.number(determines \n"
+ "number of copy tasks in parallel) * hive.repl.parallel.copy.tasks
"),
+
REPL_SNAPSHOT_DIFF_FOR_EXTERNAL_TABLE_COPY("hive.repl.externaltable.snapshotdiff.copy",
+ false,"Use snapshot diff for copying data from source to "
+ + "destination cluster for external table in distcp. If true it uses
snapshot based distcp for all the paths "
+ + "configured as part of hive.repl.external.warehouse.single.copy.task
along with the external warehouse "
+ + "default location."),
+
REPL_SNAPSHOT_OVERWRITE_TARGET_FOR_EXTERNAL_TABLE_COPY("hive.repl.externaltable.snapshot.overwrite.target",
Review comment:
Where are the custom location paths not being taken into account?
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
##########
@@ -217,7 +224,12 @@ public int execute() {
throw e;
} catch (Exception e) {
setException(e);
- int errorCode = ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
+ int errorCode;
+ if (e instanceof SnapshotException) {
+ errorCode = ErrorMsg.getErrorMsg("SNAPSHOT_ERROR").getErrorCode();
Review comment:
Why does a snapshot error need to be treated specially?
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
##########
@@ -342,7 +343,12 @@ public static PathFilter getBootstrapDirectoryFilter(final
FileSystem fs) {
public static int handleException(boolean isReplication, Throwable e, String
nonRecoverablePath,
ReplicationMetricCollector
metricCollector, String stageName, HiveConf conf){
- int errorCode = ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
+ int errorCode;
+ if (isReplication && e instanceof SnapshotException) {
+ errorCode = ErrorMsg.getErrorMsg("SNAPSHOT_ERROR").getErrorCode();
Review comment:
Is the actual error message retained so that users can check it?
##########
File path:
shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
##########
@@ -1197,6 +1241,112 @@ public boolean runDistCp(List<Path> srcPaths, Path dst,
Configuration conf) thro
}
}
+ @Override
+ public boolean runDistCpWithSnapshots(String oldSnapshot, String
newSnapshot, List<Path> srcPaths, Path dst, Configuration conf)
+ throws IOException {
+ DistCpOptions options =
+ new DistCpOptions.Builder(srcPaths,
dst).withSyncFolder(true).withUseDiff(oldSnapshot, newSnapshot)
+
.preserve(FileAttribute.BLOCKSIZE).preserve(FileAttribute.XATTR).build();
+
+ List<String> params = constructDistCpWithSnapshotParams(srcPaths, dst,
oldSnapshot, newSnapshot, conf, "-diff");
+ try {
+ conf.setBoolean("mapred.mapper.new-api", true);
+ DistCp distcp = new DistCp(conf, options);
+ int returnCode = distcp.run(params.toArray(new String[0]));
+ if (returnCode == 0) {
+ return true;
+ } else if (returnCode == DistCpConstants.INVALID_ARGUMENT) {
+ // Handling FileNotFoundException: if the source got deleted, we don't want
to copy either, so it is
+ // like a success case — we had nothing to copy and we copied nothing, so
we need not fail.
+ LOG.warn("Copy failed with INVALID_ARGUMENT for source: {} to target:
{} snapshot1: {} snapshot2: {} "
+ + "params: {}", srcPaths, dst, oldSnapshot, newSnapshot, params);
+ return true;
+ } else if (returnCode == DistCpConstants.UNKNOWN_ERROR && conf
+ .getBoolean("hive.repl.externaltable.snapshot.overwrite.target",
true)) {
+ // Check if this error is due to target modified.
+ if (shouldRdiff(dst, conf, oldSnapshot)) {
+ LOG.warn("Copy failed due to target modified. Attempting to restore
back the target. source: {} target: {} "
+ + "snapshot: {}", srcPaths, dst, oldSnapshot);
+ List<String> rParams = constructDistCpWithSnapshotParams(srcPaths,
dst, ".", oldSnapshot, conf, "-rdiff");
+ DistCp rDistcp = new DistCp(conf, options);
+ returnCode = rDistcp.run(rParams.toArray(new String[0]));
+ if (returnCode == 0) {
+ LOG.info("Target restored to previous state. source: {} target:
{} snapshot: {}. Reattempting to copy.",
+ srcPaths, dst, oldSnapshot);
+ dst.getFileSystem(conf).deleteSnapshot(dst, oldSnapshot);
+ dst.getFileSystem(conf).createSnapshot(dst, oldSnapshot);
+ returnCode = distcp.run(params.toArray(new String[0]));
+ if (returnCode == 0) {
+ return true;
+ } else {
+ LOG.error("Copy failed with after target restore for source: {}
to target: {} snapshot1: {} snapshot2: "
+ + "{} params: {}. Return code: {}", srcPaths, dst,
oldSnapshot, newSnapshot, params, returnCode);
+ return false;
+ }
+ }
+ }
+ }
+ } catch (Exception e) {
+ throw new IOException("Cannot execute DistCp process: ", e);
+ } finally {
+ conf.setBoolean("mapred.mapper.new-api", false);
+ }
+ return false;
+ }
+
+ /**
+ * Checks whether a reverse diff on the snapshot should be performed or not.
+ * @param p path where snapshot exists.
+ * @param conf the hive configuration.
+ * @param snapshot the name of snapshot.
+ * @return true, if we need to do rdiff.
+ */
+ private static boolean shouldRdiff(Path p, Configuration conf, String
snapshot) throws Exception {
+ // Using the configuration in string form since hive-shims doesn't have a
dependency on hive-common.
+ boolean isOverwrite =
conf.getBoolean("hive.repl.externaltable.snapshot.overwrite.target", true);
Review comment:
Have to be careful not to modify this constant. Can you pass the
value of the conf instead?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]