pkumarsinha commented on a change in pull request #2097:
URL: https://github.com/apache/hive/pull/2097#discussion_r600119993
##########
File path: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
##########
@@ -572,6 +574,11 @@ private static void populateLlapDaemonVarsSet(Set<String>
llapDaemonVarsSetLocal
"Indicates the timeout for all transactions which are opened before
triggering bootstrap REPL DUMP. "
+ "If these open transactions are not closed within the timeout
value, then REPL DUMP will "
+ "forcefully abort those transactions and continue with bootstrap
dump."),
+
REPL_FAILOVER_DUMP_OPEN_TXN_TIMEOUT("hive.repl.failover.dump.open.txn.timeout",
"1h",
+ new TimeValidator(TimeUnit.HOURS),
+ "Indicates the timeout for all transactions which are opened
before triggering failover. "
+ + "If these open transactions are not closed within the
timeout value, then Repl Dump will not " +
+ "mark this state as failover_ready."),
Review comment:
Why do we need this? If we can't be ready for fail-over in the current
state, we can simply try again in the next scheduled run.
##########
File path: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
##########
@@ -506,6 +506,8 @@ private static void populateLlapDaemonVarsSet(Set<String>
llapDaemonVarsSetLocal
"HDFS root scratch dir for Hive jobs which gets created with write all
(733) permission. " +
"For each connecting user, an HDFS scratch dir:
${hive.exec.scratchdir}/<username> is created, " +
"with ${hive.scratch.dir.permission}."),
+ HIVE_REPL_FAILOVER("hive.repl.failover.start",false,
+ "Indicates if user wants to trigger failover for reverse
replication."),
Review comment:
How about something like: "A replication policy level config to indicate
whether the user wants to initiate fail-over to replicate the database in the
reverse direction."?
##########
File path:
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
##########
@@ -350,6 +353,177 @@ public void testAcidTablesCreateTableIncremental() throws
Throwable {
.verifyResults(new String[] {"11"});
}
+ @Test
+ public void testFailoverWithNoOpenTxns() throws Throwable {
+ WarehouseInstance.Tuple dumpData = null;
+
+ dumpData = primary.run("use " + primaryDbName)
+ .run("create table t1 (id int) clustered by(id) into 3 buckets
stored as orc " +
+ "tblproperties (\"transactional\"=\"true\")")
+ .run("create table t2 (rank int) partitioned by (name string)
tblproperties(\"transactional\"=\"true\", " +
+ "\"transactional_properties\"=\"insert_only\")")
+ .dump(primaryDbName);
+
Review comment:
Set the failover start config to true for the bootstrap dump and verify
that the failover-ready marker file does not exist.
##########
File path:
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
##########
@@ -350,6 +353,177 @@ public void testAcidTablesCreateTableIncremental() throws
Throwable {
.verifyResults(new String[] {"11"});
}
+ @Test
+ public void testFailoverWithNoOpenTxns() throws Throwable {
+ WarehouseInstance.Tuple dumpData = null;
+
+ dumpData = primary.run("use " + primaryDbName)
+ .run("create table t1 (id int) clustered by(id) into 3 buckets
stored as orc " +
+ "tblproperties (\"transactional\"=\"true\")")
+ .run("create table t2 (rank int) partitioned by (name string)
tblproperties(\"transactional\"=\"true\", " +
+ "\"transactional_properties\"=\"insert_only\")")
+ .dump(primaryDbName);
+
+ replica.load(replicatedDbName, primaryDbName)
+ .run("use " + replicatedDbName)
+ .run("show tables")
+ .verifyResults(new String[]{"t1", "t2"})
+ .run("repl status " + replicatedDbName)
+ .verifyResult(dumpData.lastReplicationId);
+
+ List<String> failoverConfigs = Arrays.asList(
+ "'" + HiveConf.ConfVars.HIVE_REPL_FAILOVER + "'='true'");
+ dumpData = primary.run("insert into t1 values(1)")
+ .run("insert into t2 partition(name='Bob') values(11)")
+ .run("insert into t2 partition(name='Carl') values(10)")
+ .dump(primaryDbName, failoverConfigs);
+
+ FileSystem fs = new Path(dumpData.dumpLocation).getFileSystem(conf);
+ Path dumpPath = new Path(dumpData.dumpLocation,
ReplUtils.REPL_HIVE_BASE_DIR);
+ assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+ assertTrue(fs.exists(new Path(dumpPath,
FAILOVER_READY_MARKER.toString())));
+
+ replica.load(replicatedDbName, primaryDbName)
+ .run("use " + replicatedDbName)
+ .run("show tables")
+ .verifyResults(new String[]{"t1", "t2"})
+ .run("repl status " + replicatedDbName)
+ .verifyResult(dumpData.lastReplicationId)
+ .run("select id from t1")
+ .verifyResults(new String[]{"1"})
+ .run("select rank from t2 order by rank")
+ .verifyResults(new String[]{"10", "11"});
+
+ assertTrue(fs.exists(new Path(dumpPath, LOAD_ACKNOWLEDGEMENT.toString())));
+ }
+
+ @Test
+ public void testFailoverWithOpenTxnsDiffDb() throws Throwable {
+ HiveConf primaryConf = primary.getConf();
+ TxnStore txnHandler = TxnUtils.getTxnStore(primary.getConf());
+ WarehouseInstance.Tuple dumpData = null;
+
+ dumpData = primary.run("use " + primaryDbName)
+ .run("create table t1 (id int) clustered by(id) into 3 buckets
stored as orc " +
+ "tblproperties (\"transactional\"=\"true\")")
+ .run("create table t2 (rank int) partitioned by (name string)
tblproperties(\"transactional\"=\"true\", " +
+ "\"transactional_properties\"=\"insert_only\")")
+ .dump(primaryDbName);
+
Review comment:
Set the failover start config to true for the bootstrap dump and verify
that the failover-ready marker file does not exist.
##########
File path:
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
##########
@@ -350,6 +353,177 @@ public void testAcidTablesCreateTableIncremental() throws
Throwable {
.verifyResults(new String[] {"11"});
}
+ @Test
+ public void testFailoverWithNoOpenTxns() throws Throwable {
+ WarehouseInstance.Tuple dumpData = null;
+
+ dumpData = primary.run("use " + primaryDbName)
+ .run("create table t1 (id int) clustered by(id) into 3 buckets
stored as orc " +
+ "tblproperties (\"transactional\"=\"true\")")
+ .run("create table t2 (rank int) partitioned by (name string)
tblproperties(\"transactional\"=\"true\", " +
+ "\"transactional_properties\"=\"insert_only\")")
+ .dump(primaryDbName);
+
+ replica.load(replicatedDbName, primaryDbName)
+ .run("use " + replicatedDbName)
+ .run("show tables")
+ .verifyResults(new String[]{"t1", "t2"})
+ .run("repl status " + replicatedDbName)
+ .verifyResult(dumpData.lastReplicationId);
+
+ List<String> failoverConfigs = Arrays.asList(
+ "'" + HiveConf.ConfVars.HIVE_REPL_FAILOVER + "'='true'");
+ dumpData = primary.run("insert into t1 values(1)")
+ .run("insert into t2 partition(name='Bob') values(11)")
+ .run("insert into t2 partition(name='Carl') values(10)")
+ .dump(primaryDbName, failoverConfigs);
Review comment:
Add a test case to verify that subsequent dump operations are
skipped.
##########
File path: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
##########
@@ -506,6 +506,8 @@ private static void populateLlapDaemonVarsSet(Set<String>
llapDaemonVarsSetLocal
"HDFS root scratch dir for Hive jobs which gets created with write all
(733) permission. " +
"For each connecting user, an HDFS scratch dir:
${hive.exec.scratchdir}/<username> is created, " +
"with ${hive.scratch.dir.permission}."),
+ HIVE_REPL_FAILOVER("hive.repl.failover.start",false,
Review comment:
HIVE_REPL_FAILOVER -> HIVE_REPL_FAILOVER_START
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
##########
@@ -534,6 +562,37 @@ private boolean isTableSatifiesConfig(Table table) {
return true;
}
+ boolean isFailoverReadyState(long waitUntilTime) throws LockException {
+ List<TxnType> excludedTxns = Arrays.asList(TxnType.READ_ONLY,
TxnType.REPL_CREATED);
+ ValidTxnList validTxnList = getTxnMgr().getValidTxns(excludedTxns);
+
+ do {
+ List<Long> openTxnListForAllDbs = getOpenTxns(validTxnList);
+ if (openTxnListForAllDbs.isEmpty()) {
+ return true;
+ }
+ //check if all transactions that are open are inserted into the hive
locks table. If not wait and check again.
+ //Transactions table don't contain the db information. DB information is
present only in the hive locks table.
+ //Transactions are inserted into the hive locks table after compilation.
We need to make sure all transactions
+ //that are open have a entry in hive locks which can give us the db
information and then we only wait for open
+ //transactions for the db under replication and not for all open
transactions.
Review comment:
Unlike in getValidTxnListForReplDump, not all of these comments may be
applicable in this case. Could you please check and update them?
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
##########
@@ -414,7 +441,8 @@ private boolean shouldDump(Path previousDumpPath) throws
IOException {
return true;
} else {
FileSystem fs = previousDumpPath.getFileSystem(conf);
- return fs.exists(new Path(previousDumpPath,
LOAD_ACKNOWLEDGEMENT.toString()));
+ return fs.exists(new Path(previousDumpPath,
LOAD_ACKNOWLEDGEMENT.toString()))
+ && !fs.exists(new Path(previousDumpPath,
FAILOVER_READY_MARKER.toString()));
Review comment:
It would be helpful to add a log statement giving the reason when the dump is skipped.
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
##########
@@ -414,7 +441,8 @@ private boolean shouldDump(Path previousDumpPath) throws
IOException {
return true;
} else {
FileSystem fs = previousDumpPath.getFileSystem(conf);
- return fs.exists(new Path(previousDumpPath,
LOAD_ACKNOWLEDGEMENT.toString()));
+ return fs.exists(new Path(previousDumpPath,
LOAD_ACKNOWLEDGEMENT.toString()))
+ && !fs.exists(new Path(previousDumpPath,
FAILOVER_READY_MARKER.toString()));
Review comment:
Also, we should reflect this in the replication metrics as well, so that
the orchestration layer knows that the dump is currently in the failover state.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]