pkumarsinha commented on a change in pull request #1232:
URL: https://github.com/apache/hive/pull/1232#discussion_r454865275
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
##########
@@ -986,7 +1019,8 @@ String getValidTxnListForReplDump(Hive hiveDb, long
waitUntilTime) throws HiveEx
// phase won't be able to replicate those txns. So, the logic is to wait
for the given amount
// of time to see if all open txns < current txn is getting
aborted/committed. If not, then
// we forcefully abort those txns just like AcidHouseKeeperService.
- ValidTxnList validTxnList = getTxnMgr().getValidTxns();
+ //Exclude readonly and repl created tranasactions
+ ValidTxnList validTxnList =
getTxnMgr().getValidTxns(Arrays.asList(TxnType.READ_ONLY,
TxnType.REPL_CREATED));
Review comment:
Create the list once, outside the loop.
##########
File path: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
##########
@@ -550,6 +550,11 @@ private static void populateLlapDaemonVarsSet(Set<String>
llapDaemonVarsSetLocal
"Indicates the timeout for all transactions which are opened before
triggering bootstrap REPL DUMP. "
+ "If these open transactions are not closed within the timeout
value, then REPL DUMP will "
+ "forcefully abort those transactions and continue with bootstrap
dump."),
+
REPL_BOOTSTRAP_DUMP_ABORT_WRITE_TXN_AFTER_TIMEOUT("hive.repl.bootstrap.dump.abort.write.txn.after.timeout",
Review comment:
nit: remove 'after' from the config name.
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
##########
@@ -999,20 +1033,27 @@ String getValidTxnListForReplDump(Hive hiveDb, long
waitUntilTime) throws HiveEx
} catch (InterruptedException e) {
LOG.info("REPL DUMP thread sleep interrupted", e);
}
- validTxnList = getTxnMgr().getValidTxns();
+ validTxnList = getTxnMgr().getValidTxns(Arrays.asList(TxnType.READ_ONLY,
TxnType.REPL_CREATED));
}
// After the timeout just force abort the open txns
- List<Long> openTxns = getOpenTxns(validTxnList);
- if (!openTxns.isEmpty()) {
- hiveDb.abortTransactions(openTxns);
- validTxnList = getTxnMgr().getValidTxns();
- if (validTxnList.getMinOpenTxn() != null) {
- openTxns = getOpenTxns(validTxnList);
- LOG.warn("REPL DUMP unable to force abort all the open txns: {} after
timeout due to unknown reasons. " +
- "However, this is rare case that shouldn't happen.", openTxns);
- throw new IllegalStateException("REPL DUMP triggered abort txns failed
for unknown reasons.");
+ if (conf.getBoolVar(REPL_BOOTSTRAP_DUMP_ABORT_WRITE_TXN_AFTER_TIMEOUT)) {
+ List<Long> openTxns = getOpenTxns(validTxnList, work.dbNameOrPattern);
+ if (!openTxns.isEmpty()) {
+ //abort only write transactions for the db under replication if abort
transactions is enabled.
+ hiveDb.abortTransactions(openTxns);
+ validTxnList =
getTxnMgr().getValidTxns(Arrays.asList(TxnType.READ_ONLY,
TxnType.REPL_CREATED));
Review comment:
Shouldn't the already obtained validTxnList be used here?
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
##########
@@ -975,6 +981,33 @@ private String getValidWriteIdList(String dbName, String
tblName, String validTx
return openTxns;
}
+ List<Long> getOpenTxns(ValidTxnList validTxnList, String dbName) throws
LockException {
+ HiveLockManager lockManager = getTxnMgr().getLockManager();
+ long[] invalidTxns = validTxnList.getInvalidTransactions();
+ List<Long> openTxns = new ArrayList<>();
+ List<Long> dbTxns = new ArrayList<>();
Review comment:
Can be replaced with a HashSet for faster lookup.
##########
File path:
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
##########
@@ -274,6 +277,120 @@ public void testAcidTablesBootstrapWithOpenTxnsTimeout()
throws Throwable {
verifyCompactionQueue(tables, replicatedDbName, replicaConf);
}
+ @Test
+ public void testAcidTablesBootstrapWithOpenTxnsDiffDb() throws Throwable {
Review comment:
Also add a test for the case where a few open txns are from the primary db and
a few are from the secondary db. During dump, the txns from the primary db get
aborted, but those from the secondary db do not.
##########
File path:
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
##########
@@ -274,6 +277,120 @@ public void testAcidTablesBootstrapWithOpenTxnsTimeout()
throws Throwable {
verifyCompactionQueue(tables, replicatedDbName, replicaConf);
}
+ @Test
+ public void testAcidTablesBootstrapWithOpenTxnsDiffDb() throws Throwable {
+ int numTxns = 5;
+ HiveConf primaryConf = primary.getConf();
+ TxnStore txnHandler = TxnUtils.getTxnStore(primary.getConf());
+ // Open 5 txns
+ List<Long> txns = openTxns(numTxns, txnHandler, primaryConf);
+
+ // Create 2 tables, one partitioned and other not. Also, have both types
of full ACID and MM tables.
+ primary.run("use " + primaryDbName)
+ .run("create table t1 (id int) clustered by(id) into 3 buckets stored as
orc " +
+ "tblproperties (\"transactional\"=\"true\")")
+ .run("insert into t1 values(1)")
+ .run("create table t2 (rank int) partitioned by (name string)
tblproperties(\"transactional\"=\"true\", " +
+ "\"transactional_properties\"=\"insert_only\")")
+ .run("insert into t2 partition(name='Bob') values(11)")
+ .run("insert into t2 partition(name='Carl') values(10)");
+
+ // Allocate write ids for both tables of secondary db for all txns
+ // t1=5 and t2=5
+ Map<String, Long> tablesInSecDb = new HashMap<>();
+ tablesInSecDb.put("t1", (long) numTxns);
+ tablesInSecDb.put("t2", (long) numTxns);
+ List<Long> lockIds = allocateWriteIdsForTablesAndAquireLocks(primaryDbName
+ "_extra",
+ tablesInSecDb, txnHandler, txns, primaryConf);
+
+ // Bootstrap dump with open txn timeout as 1s.
+ List<String> withConfigs = Arrays.asList(
+ "'hive.repl.bootstrap.dump.open.txn.timeout'='1s'");
+ WarehouseInstance.Tuple bootstrapDump = primary
+ .run("use " + primaryDbName)
+ .dump(primaryDbName, withConfigs);
+
+ // After bootstrap dump, all the opened txns should be aborted. Verify it.
Review comment:
Comment is not correct: in this test the open txns should not be aborted.
##########
File path:
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
##########
@@ -274,6 +277,120 @@ public void testAcidTablesBootstrapWithOpenTxnsTimeout()
throws Throwable {
verifyCompactionQueue(tables, replicatedDbName, replicaConf);
}
+ @Test
+ public void testAcidTablesBootstrapWithOpenTxnsDiffDb() throws Throwable {
+ int numTxns = 5;
+ HiveConf primaryConf = primary.getConf();
+ TxnStore txnHandler = TxnUtils.getTxnStore(primary.getConf());
+ // Open 5 txns
+ List<Long> txns = openTxns(numTxns, txnHandler, primaryConf);
+
+ // Create 2 tables, one partitioned and other not. Also, have both types
of full ACID and MM tables.
+ primary.run("use " + primaryDbName)
+ .run("create table t1 (id int) clustered by(id) into 3 buckets stored as
orc " +
+ "tblproperties (\"transactional\"=\"true\")")
+ .run("insert into t1 values(1)")
+ .run("create table t2 (rank int) partitioned by (name string)
tblproperties(\"transactional\"=\"true\", " +
+ "\"transactional_properties\"=\"insert_only\")")
+ .run("insert into t2 partition(name='Bob') values(11)")
+ .run("insert into t2 partition(name='Carl') values(10)");
+
+ // Allocate write ids for both tables of secondary db for all txns
+ // t1=5 and t2=5
+ Map<String, Long> tablesInSecDb = new HashMap<>();
+ tablesInSecDb.put("t1", (long) numTxns);
+ tablesInSecDb.put("t2", (long) numTxns);
+ List<Long> lockIds = allocateWriteIdsForTablesAndAquireLocks(primaryDbName
+ "_extra",
+ tablesInSecDb, txnHandler, txns, primaryConf);
+
+ // Bootstrap dump with open txn timeout as 1s.
+ List<String> withConfigs = Arrays.asList(
+ "'hive.repl.bootstrap.dump.open.txn.timeout'='1s'");
Review comment:
Use the HiveConf constant instead of the hard-coded config name.
##########
File path:
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
##########
@@ -274,6 +277,120 @@ public void testAcidTablesBootstrapWithOpenTxnsTimeout()
throws Throwable {
verifyCompactionQueue(tables, replicatedDbName, replicaConf);
}
+ @Test
+ public void testAcidTablesBootstrapWithOpenTxnsDiffDb() throws Throwable {
+ int numTxns = 5;
+ HiveConf primaryConf = primary.getConf();
+ TxnStore txnHandler = TxnUtils.getTxnStore(primary.getConf());
+ // Open 5 txns
+ List<Long> txns = openTxns(numTxns, txnHandler, primaryConf);
+
+ // Create 2 tables, one partitioned and other not. Also, have both types
of full ACID and MM tables.
+ primary.run("use " + primaryDbName)
+ .run("create table t1 (id int) clustered by(id) into 3 buckets stored as
orc " +
+ "tblproperties (\"transactional\"=\"true\")")
+ .run("insert into t1 values(1)")
+ .run("create table t2 (rank int) partitioned by (name string)
tblproperties(\"transactional\"=\"true\", " +
+ "\"transactional_properties\"=\"insert_only\")")
+ .run("insert into t2 partition(name='Bob') values(11)")
+ .run("insert into t2 partition(name='Carl') values(10)");
+
+ // Allocate write ids for both tables of secondary db for all txns
+ // t1=5 and t2=5
+ Map<String, Long> tablesInSecDb = new HashMap<>();
+ tablesInSecDb.put("t1", (long) numTxns);
+ tablesInSecDb.put("t2", (long) numTxns);
+ List<Long> lockIds = allocateWriteIdsForTablesAndAquireLocks(primaryDbName
+ "_extra",
+ tablesInSecDb, txnHandler, txns, primaryConf);
+
+ // Bootstrap dump with open txn timeout as 1s.
+ List<String> withConfigs = Arrays.asList(
+ "'hive.repl.bootstrap.dump.open.txn.timeout'='1s'");
+ WarehouseInstance.Tuple bootstrapDump = primary
+ .run("use " + primaryDbName)
+ .dump(primaryDbName, withConfigs);
+
+ // After bootstrap dump, all the opened txns should be aborted. Verify it.
+ verifyAllOpenTxnsNotAborted(txns, primaryConf);
+ Map<String, Long> tablesInPrimary = new HashMap<>();
+ tablesInPrimary.put("t1", 1L);
+ tablesInPrimary.put("t2", 2L);
+ verifyNextId(tablesInPrimary, primaryDbName, primaryConf);
+
+ // Bootstrap load which should not replicate the write ids on both tables
as they are on different db.
+ HiveConf replicaConf = replica.getConf();
+ replica.load(replicatedDbName, primaryDbName)
+ .run("use " + replicatedDbName)
+ .run("show tables")
+ .verifyResults(new String[] {"t1", "t2"})
+ .run("repl status " + replicatedDbName)
+ .verifyResult(bootstrapDump.lastReplicationId)
+ .run("select id from t1")
+ .verifyResults(new String[]{"1"})
+ .run("select rank from t2 order by rank")
+ .verifyResults(new String[] {"10", "11"});
+
+ // Verify if HWM is properly set after REPL LOAD
+ verifyNextId(tablesInPrimary, replicatedDbName, replicaConf);
+
+ // Verify if none of the write ids are not replicated to the replicated DB
as they belong to diff db
+ for(Map.Entry<String, Long> entry : tablesInPrimary.entrySet()) {
+ entry.setValue((long) 0);
+ }
+ verifyWriteIdsForTables(tablesInPrimary, replicaConf, replicatedDbName);
+ //Abort the txns
+ txnHandler.abortTxns(new AbortTxnsRequest(txns));
+ //Release the locks
+ releaseLocks(txnHandler, lockIds);
+ }
+
+ @Test
+ public void testAcidTablesBootstrapWithOpenTxnsAbortDisabled() throws
Throwable {
+ int numTxns = 5;
+ HiveConf primaryConf = primary.getConf();
+ TxnStore txnHandler = TxnUtils.getTxnStore(primary.getConf());
+ // Open 5 txns
+ List<Long> txns = openTxns(numTxns, txnHandler, primaryConf);
+
+ // Create 2 tables, one partitioned and other not. Also, have both types
of full ACID and MM tables.
+ primary.run("use " + primaryDbName)
+ .run("create table t1 (id int) clustered by(id) into 3 buckets stored as
orc " +
+ "tblproperties (\"transactional\"=\"true\")")
+ .run("insert into t1 values(1)")
+ .run("create table t2 (rank int) partitioned by (name string)
tblproperties(\"transactional\"=\"true\", " +
+ "\"transactional_properties\"=\"insert_only\")")
+ .run("insert into t2 partition(name='Bob') values(11)")
+ .run("insert into t2 partition(name='Carl') values(10)");
+
+ // Allocate write ids for both tables t1 and t2 for all txns
+ // t1=5+1(insert) and t2=5+2(insert)
+ Map<String, Long> tables = new HashMap<>();
+ tables.put("t1", numTxns + 1L);
+ tables.put("t2", numTxns + 2L);
+ List<Long> lockIds =
allocateWriteIdsForTablesAndAquireLocks(primaryDbName, tables, txnHandler,
txns, primaryConf);
+
+ // Bootstrap dump with open txn timeout as 1s.
+ List<String> withConfigs = Arrays.asList(
+ "'hive.repl.bootstrap.dump.open.txn.timeout'='1s'",
Review comment:
Use the HiveConf constant instead of the hard-coded config name.
##########
File path:
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
##########
@@ -274,6 +277,120 @@ public void testAcidTablesBootstrapWithOpenTxnsTimeout()
throws Throwable {
verifyCompactionQueue(tables, replicatedDbName, replicaConf);
}
+ @Test
Review comment:
Add a test for the bootstrap-during-incremental case as well.
##########
File path:
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
##########
@@ -274,6 +277,120 @@ public void testAcidTablesBootstrapWithOpenTxnsTimeout()
throws Throwable {
verifyCompactionQueue(tables, replicatedDbName, replicaConf);
}
+ @Test
+ public void testAcidTablesBootstrapWithOpenTxnsDiffDb() throws Throwable {
+ int numTxns = 5;
+ HiveConf primaryConf = primary.getConf();
+ TxnStore txnHandler = TxnUtils.getTxnStore(primary.getConf());
+ // Open 5 txns
+ List<Long> txns = openTxns(numTxns, txnHandler, primaryConf);
+
+ // Create 2 tables, one partitioned and other not. Also, have both types
of full ACID and MM tables.
+ primary.run("use " + primaryDbName)
+ .run("create table t1 (id int) clustered by(id) into 3 buckets stored as
orc " +
+ "tblproperties (\"transactional\"=\"true\")")
+ .run("insert into t1 values(1)")
+ .run("create table t2 (rank int) partitioned by (name string)
tblproperties(\"transactional\"=\"true\", " +
+ "\"transactional_properties\"=\"insert_only\")")
+ .run("insert into t2 partition(name='Bob') values(11)")
+ .run("insert into t2 partition(name='Carl') values(10)");
+
+ // Allocate write ids for both tables of secondary db for all txns
+ // t1=5 and t2=5
+ Map<String, Long> tablesInSecDb = new HashMap<>();
+ tablesInSecDb.put("t1", (long) numTxns);
+ tablesInSecDb.put("t2", (long) numTxns);
+ List<Long> lockIds = allocateWriteIdsForTablesAndAquireLocks(primaryDbName
+ "_extra",
+ tablesInSecDb, txnHandler, txns, primaryConf);
+
+ // Bootstrap dump with open txn timeout as 1s.
+ List<String> withConfigs = Arrays.asList(
+ "'hive.repl.bootstrap.dump.open.txn.timeout'='1s'");
+ WarehouseInstance.Tuple bootstrapDump = primary
+ .run("use " + primaryDbName)
+ .dump(primaryDbName, withConfigs);
+
+ // After bootstrap dump, all the opened txns should be aborted. Verify it.
+ verifyAllOpenTxnsNotAborted(txns, primaryConf);
+ Map<String, Long> tablesInPrimary = new HashMap<>();
+ tablesInPrimary.put("t1", 1L);
+ tablesInPrimary.put("t2", 2L);
+ verifyNextId(tablesInPrimary, primaryDbName, primaryConf);
+
+ // Bootstrap load which should not replicate the write ids on both tables
as they are on different db.
+ HiveConf replicaConf = replica.getConf();
+ replica.load(replicatedDbName, primaryDbName)
+ .run("use " + replicatedDbName)
+ .run("show tables")
+ .verifyResults(new String[] {"t1", "t2"})
+ .run("repl status " + replicatedDbName)
+ .verifyResult(bootstrapDump.lastReplicationId)
+ .run("select id from t1")
+ .verifyResults(new String[]{"1"})
+ .run("select rank from t2 order by rank")
+ .verifyResults(new String[] {"10", "11"});
+
+ // Verify if HWM is properly set after REPL LOAD
+ verifyNextId(tablesInPrimary, replicatedDbName, replicaConf);
+
+ // Verify if none of the write ids are not replicated to the replicated DB
as they belong to diff db
+ for(Map.Entry<String, Long> entry : tablesInPrimary.entrySet()) {
+ entry.setValue((long) 0);
+ }
+ verifyWriteIdsForTables(tablesInPrimary, replicaConf, replicatedDbName);
+ //Abort the txns
+ txnHandler.abortTxns(new AbortTxnsRequest(txns));
+ //Release the locks
+ releaseLocks(txnHandler, lockIds);
+ }
+
+ @Test
+ public void testAcidTablesBootstrapWithOpenTxnsAbortDisabled() throws
Throwable {
+ int numTxns = 5;
+ HiveConf primaryConf = primary.getConf();
+ TxnStore txnHandler = TxnUtils.getTxnStore(primary.getConf());
+ // Open 5 txns
+ List<Long> txns = openTxns(numTxns, txnHandler, primaryConf);
+
+ // Create 2 tables, one partitioned and other not. Also, have both types
of full ACID and MM tables.
+ primary.run("use " + primaryDbName)
+ .run("create table t1 (id int) clustered by(id) into 3 buckets stored as
orc " +
+ "tblproperties (\"transactional\"=\"true\")")
+ .run("insert into t1 values(1)")
+ .run("create table t2 (rank int) partitioned by (name string)
tblproperties(\"transactional\"=\"true\", " +
+ "\"transactional_properties\"=\"insert_only\")")
+ .run("insert into t2 partition(name='Bob') values(11)")
+ .run("insert into t2 partition(name='Carl') values(10)");
+
+ // Allocate write ids for both tables t1 and t2 for all txns
+ // t1=5+1(insert) and t2=5+2(insert)
+ Map<String, Long> tables = new HashMap<>();
+ tables.put("t1", numTxns + 1L);
+ tables.put("t2", numTxns + 2L);
+ List<Long> lockIds =
allocateWriteIdsForTablesAndAquireLocks(primaryDbName, tables, txnHandler,
txns, primaryConf);
+
+ // Bootstrap dump with open txn timeout as 1s.
+ List<String> withConfigs = Arrays.asList(
+ "'hive.repl.bootstrap.dump.open.txn.timeout'='1s'",
+ "'hive.repl.bootstrap.dump.abort.write.txn.after.timeout'='false'");
+ try {
+ WarehouseInstance.Tuple bootstrapDump = primary
+ .run("use " + primaryDbName)
+ .dump(primaryDbName, withConfigs);
+ } catch (Exception e) {
+ Assert.assertEquals("REPL DUMP cannot proceed. Force abort all the open
txns is disabled. Enable " +
+ "hive.repl.bootstrap.dump.abort.write.txn.after.timeout to proceed.",
e.getMessage());
+ }
+
+ // After bootstrap dump, all the opened txns should be aborted. Verify it.
Review comment:
Comment: shouldn't this say that the txns should not be aborted?
##########
File path:
standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
##########
@@ -2781,6 +2785,7 @@ PartitionsResponse get_partitions_req(1:PartitionsRequest
req)
void add_replication_metrics(1: ReplicationMetricList replicationMetricList)
throws(1:MetaException o1)
ReplicationMetricList get_replication_metrics(1:
GetReplicationMetricsRequest rqst) throws(1:MetaException o1)
+ GetOpenTxnsResponse get_open_txns_req(GetOpenTxnsRequest getOpenTxnsRequest)
Review comment:
Is the field id missing for this parameter?
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
##########
@@ -186,6 +186,18 @@ void replTableWriteIdState(String validWriteIdList, String
dbName, String tableN
*/
ValidTxnList getValidTxns() throws LockException;
+ /**
+ * Get the transactions that are currently valid. The resulting
+ * {@link ValidTxnList} object can be passed as string to the processing
+ * tasks for use in the reading the data. This call should be made once up
+ * front by the planner and should never be called on the backend,
+ * as this will violate the isolation level semantics.
+ * @return list of valid transactions.
+ * @param txnTypes list of transaction types that should be excluded.
+ * @throws LockException
+ */
+ ValidTxnList getValidTxns(List<TxnType> txnTypes) throws LockException;
Review comment:
List<TxnType> txnTypes: this should be replaced with a filter/regex, so that
the same interface can be used in future for all include/exclude cases.
##########
File path:
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationScenariosAcidTables.java
##########
@@ -336,18 +346,30 @@ void verifyInc2Load(String dbName, String lastReplId)
return txns;
}
- void allocateWriteIdsForTables(String primaryDbName, Map<String, Long>
tables,
- TxnStore txnHandler,
- List<Long> txns, HiveConf
primaryConf) throws Throwable {
+ List<Long> allocateWriteIdsForTablesAndAquireLocks(String primaryDbName,
Map<String, Long> tables,
+ TxnStore txnHandler,
+ List<Long> txns, HiveConf
primaryConf) throws Throwable {
AllocateTableWriteIdsRequest rqst = new AllocateTableWriteIdsRequest();
rqst.setDbName(primaryDbName);
-
+ List<Long> lockIds = new ArrayList<>();
for(Map.Entry<String, Long> entry : tables.entrySet()) {
rqst.setTableName(entry.getKey());
rqst.setTxnIds(txns);
txnHandler.allocateTableWriteIds(rqst);
+ for (long txnId : txns) {
+ LockComponent comp = new LockComponent(LockType.SHARED_WRITE,
LockLevel.TABLE,
+ primaryDbName);
+ comp.setTablename(entry.getKey());
+ comp.setOperationType(DataOperationType.UPDATE);
+ List<LockComponent> components = new ArrayList<LockComponent>(1);
+ components.add(comp);
+ LockRequest lockRequest = new LockRequest(components, "u1",
"hostname");
+ lockRequest.setTxnid(txnId);
+ lockIds.add(txnHandler.lock(lockRequest).getLockid());
Review comment:
Why is this locking required now?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]