[ https://issues.apache.org/jira/browse/HIVE-23560?focusedWorklogId=459458&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-459458 ]
ASF GitHub Bot logged work on HIVE-23560: ----------------------------------------- Author: ASF GitHub Bot Created on: 15/Jul/20 18:16 Start Date: 15/Jul/20 18:16 Worklog Time Spent: 10m Work Description: pkumarsinha commented on a change in pull request #1232: URL: https://github.com/apache/hive/pull/1232#discussion_r454865275 ########## File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java ########## @@ -986,7 +1019,8 @@ String getValidTxnListForReplDump(Hive hiveDb, long waitUntilTime) throws HiveEx // phase won't be able to replicate those txns. So, the logic is to wait for the given amount // of time to see if all open txns < current txn is getting aborted/committed. If not, then // we forcefully abort those txns just like AcidHouseKeeperService. - ValidTxnList validTxnList = getTxnMgr().getValidTxns(); + //Exclude readonly and repl created transactions + ValidTxnList validTxnList = getTxnMgr().getValidTxns(Arrays.asList(TxnType.READ_ONLY, TxnType.REPL_CREATED)); Review comment: Create the list once, outside the loop. ########## File path: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ########## @@ -550,6 +550,11 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal "Indicates the timeout for all transactions which are opened before triggering bootstrap REPL DUMP. " + "If these open transactions are not closed within the timeout value, then REPL DUMP will " + "forcefully abort those transactions and continue with bootstrap dump."), + REPL_BOOTSTRAP_DUMP_ABORT_WRITE_TXN_AFTER_TIMEOUT("hive.repl.bootstrap.dump.abort.write.txn.after.timeout", Review comment: nit: remove 'after' from the config name. 
########## File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java ########## @@ -999,20 +1033,27 @@ String getValidTxnListForReplDump(Hive hiveDb, long waitUntilTime) throws HiveEx } catch (InterruptedException e) { LOG.info("REPL DUMP thread sleep interrupted", e); } - validTxnList = getTxnMgr().getValidTxns(); + validTxnList = getTxnMgr().getValidTxns(Arrays.asList(TxnType.READ_ONLY, TxnType.REPL_CREATED)); } // After the timeout just force abort the open txns - List<Long> openTxns = getOpenTxns(validTxnList); - if (!openTxns.isEmpty()) { - hiveDb.abortTransactions(openTxns); - validTxnList = getTxnMgr().getValidTxns(); - if (validTxnList.getMinOpenTxn() != null) { - openTxns = getOpenTxns(validTxnList); - LOG.warn("REPL DUMP unable to force abort all the open txns: {} after timeout due to unknown reasons. " + - "However, this is rare case that shouldn't happen.", openTxns); - throw new IllegalStateException("REPL DUMP triggered abort txns failed for unknown reasons."); + if (conf.getBoolVar(REPL_BOOTSTRAP_DUMP_ABORT_WRITE_TXN_AFTER_TIMEOUT)) { + List<Long> openTxns = getOpenTxns(validTxnList, work.dbNameOrPattern); + if (!openTxns.isEmpty()) { + //abort only write transactions for the db under replication if abort transactions is enabled. + hiveDb.abortTransactions(openTxns); + validTxnList = getTxnMgr().getValidTxns(Arrays.asList(TxnType.READ_ONLY, TxnType.REPL_CREATED)); Review comment: Shouldn't already obtained validTxnList be used here? 
########## File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java ########## @@ -975,6 +981,33 @@ private String getValidWriteIdList(String dbName, String tblName, String validTx return openTxns; } + List<Long> getOpenTxns(ValidTxnList validTxnList, String dbName) throws LockException { + HiveLockManager lockManager = getTxnMgr().getLockManager(); + long[] invalidTxns = validTxnList.getInvalidTransactions(); + List<Long> openTxns = new ArrayList<>(); + List<Long> dbTxns = new ArrayList<>(); Review comment: Can be replaced with a HashSet for faster lookup. ########## File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java ########## @@ -274,6 +277,120 @@ public void testAcidTablesBootstrapWithOpenTxnsTimeout() throws Throwable { verifyCompactionQueue(tables, replicatedDbName, replicaConf); } + @Test + public void testAcidTablesBootstrapWithOpenTxnsDiffDb() throws Throwable { Review comment: Also add a test for a case where few open txn from primary db and few from secondary. During dump txn ids from primary gets aborted but for secondary they are not. ########## File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java ########## @@ -274,6 +277,120 @@ public void testAcidTablesBootstrapWithOpenTxnsTimeout() throws Throwable { verifyCompactionQueue(tables, replicatedDbName, replicaConf); } + @Test + public void testAcidTablesBootstrapWithOpenTxnsDiffDb() throws Throwable { + int numTxns = 5; + HiveConf primaryConf = primary.getConf(); + TxnStore txnHandler = TxnUtils.getTxnStore(primary.getConf()); + // Open 5 txns + List<Long> txns = openTxns(numTxns, txnHandler, primaryConf); + + // Create 2 tables, one partitioned and other not. Also, have both types of full ACID and MM tables. 
+ primary.run("use " + primaryDbName) + .run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc " + + "tblproperties (\"transactional\"=\"true\")") + .run("insert into t1 values(1)") + .run("create table t2 (rank int) partitioned by (name string) tblproperties(\"transactional\"=\"true\", " + + "\"transactional_properties\"=\"insert_only\")") + .run("insert into t2 partition(name='Bob') values(11)") + .run("insert into t2 partition(name='Carl') values(10)"); + + // Allocate write ids for both tables of secondary db for all txns + // t1=5 and t2=5 + Map<String, Long> tablesInSecDb = new HashMap<>(); + tablesInSecDb.put("t1", (long) numTxns); + tablesInSecDb.put("t2", (long) numTxns); + List<Long> lockIds = allocateWriteIdsForTablesAndAquireLocks(primaryDbName + "_extra", + tablesInSecDb, txnHandler, txns, primaryConf); + + // Bootstrap dump with open txn timeout as 1s. + List<String> withConfigs = Arrays.asList( + "'hive.repl.bootstrap.dump.open.txn.timeout'='1s'"); + WarehouseInstance.Tuple bootstrapDump = primary + .run("use " + primaryDbName) + .dump(primaryDbName, withConfigs); + + // After bootstrap dump, all the opened txns should be aborted. Verify it. Review comment: Comment is not correct. ########## File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java ########## @@ -274,6 +277,120 @@ public void testAcidTablesBootstrapWithOpenTxnsTimeout() throws Throwable { verifyCompactionQueue(tables, replicatedDbName, replicaConf); } + @Test + public void testAcidTablesBootstrapWithOpenTxnsDiffDb() throws Throwable { + int numTxns = 5; + HiveConf primaryConf = primary.getConf(); + TxnStore txnHandler = TxnUtils.getTxnStore(primary.getConf()); + // Open 5 txns + List<Long> txns = openTxns(numTxns, txnHandler, primaryConf); + + // Create 2 tables, one partitioned and other not. Also, have both types of full ACID and MM tables. 
+ primary.run("use " + primaryDbName) + .run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc " + + "tblproperties (\"transactional\"=\"true\")") + .run("insert into t1 values(1)") + .run("create table t2 (rank int) partitioned by (name string) tblproperties(\"transactional\"=\"true\", " + + "\"transactional_properties\"=\"insert_only\")") + .run("insert into t2 partition(name='Bob') values(11)") + .run("insert into t2 partition(name='Carl') values(10)"); + + // Allocate write ids for both tables of secondary db for all txns + // t1=5 and t2=5 + Map<String, Long> tablesInSecDb = new HashMap<>(); + tablesInSecDb.put("t1", (long) numTxns); + tablesInSecDb.put("t2", (long) numTxns); + List<Long> lockIds = allocateWriteIdsForTablesAndAquireLocks(primaryDbName + "_extra", + tablesInSecDb, txnHandler, txns, primaryConf); + + // Bootstrap dump with open txn timeout as 1s. + List<String> withConfigs = Arrays.asList( + "'hive.repl.bootstrap.dump.open.txn.timeout'='1s'"); Review comment: Use HiveConf constant ########## File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java ########## @@ -274,6 +277,120 @@ public void testAcidTablesBootstrapWithOpenTxnsTimeout() throws Throwable { verifyCompactionQueue(tables, replicatedDbName, replicaConf); } + @Test + public void testAcidTablesBootstrapWithOpenTxnsDiffDb() throws Throwable { + int numTxns = 5; + HiveConf primaryConf = primary.getConf(); + TxnStore txnHandler = TxnUtils.getTxnStore(primary.getConf()); + // Open 5 txns + List<Long> txns = openTxns(numTxns, txnHandler, primaryConf); + + // Create 2 tables, one partitioned and other not. Also, have both types of full ACID and MM tables. 
+ primary.run("use " + primaryDbName) + .run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc " + + "tblproperties (\"transactional\"=\"true\")") + .run("insert into t1 values(1)") + .run("create table t2 (rank int) partitioned by (name string) tblproperties(\"transactional\"=\"true\", " + + "\"transactional_properties\"=\"insert_only\")") + .run("insert into t2 partition(name='Bob') values(11)") + .run("insert into t2 partition(name='Carl') values(10)"); + + // Allocate write ids for both tables of secondary db for all txns + // t1=5 and t2=5 + Map<String, Long> tablesInSecDb = new HashMap<>(); + tablesInSecDb.put("t1", (long) numTxns); + tablesInSecDb.put("t2", (long) numTxns); + List<Long> lockIds = allocateWriteIdsForTablesAndAquireLocks(primaryDbName + "_extra", + tablesInSecDb, txnHandler, txns, primaryConf); + + // Bootstrap dump with open txn timeout as 1s. + List<String> withConfigs = Arrays.asList( + "'hive.repl.bootstrap.dump.open.txn.timeout'='1s'"); + WarehouseInstance.Tuple bootstrapDump = primary + .run("use " + primaryDbName) + .dump(primaryDbName, withConfigs); + + // After bootstrap dump, all the opened txns should be aborted. Verify it. + verifyAllOpenTxnsNotAborted(txns, primaryConf); + Map<String, Long> tablesInPrimary = new HashMap<>(); + tablesInPrimary.put("t1", 1L); + tablesInPrimary.put("t2", 2L); + verifyNextId(tablesInPrimary, primaryDbName, primaryConf); + + // Bootstrap load which should not replicate the write ids on both tables as they are on different db. 
+ HiveConf replicaConf = replica.getConf(); + replica.load(replicatedDbName, primaryDbName) + .run("use " + replicatedDbName) + .run("show tables") + .verifyResults(new String[] {"t1", "t2"}) + .run("repl status " + replicatedDbName) + .verifyResult(bootstrapDump.lastReplicationId) + .run("select id from t1") + .verifyResults(new String[]{"1"}) + .run("select rank from t2 order by rank") + .verifyResults(new String[] {"10", "11"}); + + // Verify if HWM is properly set after REPL LOAD + verifyNextId(tablesInPrimary, replicatedDbName, replicaConf); + + // Verify if none of the write ids are not replicated to the replicated DB as they belong to diff db + for(Map.Entry<String, Long> entry : tablesInPrimary.entrySet()) { + entry.setValue((long) 0); + } + verifyWriteIdsForTables(tablesInPrimary, replicaConf, replicatedDbName); + //Abort the txns + txnHandler.abortTxns(new AbortTxnsRequest(txns)); + //Release the locks + releaseLocks(txnHandler, lockIds); + } + + @Test + public void testAcidTablesBootstrapWithOpenTxnsAbortDisabled() throws Throwable { + int numTxns = 5; + HiveConf primaryConf = primary.getConf(); + TxnStore txnHandler = TxnUtils.getTxnStore(primary.getConf()); + // Open 5 txns + List<Long> txns = openTxns(numTxns, txnHandler, primaryConf); + + // Create 2 tables, one partitioned and other not. Also, have both types of full ACID and MM tables. 
+ primary.run("use " + primaryDbName) + .run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc " + + "tblproperties (\"transactional\"=\"true\")") + .run("insert into t1 values(1)") + .run("create table t2 (rank int) partitioned by (name string) tblproperties(\"transactional\"=\"true\", " + + "\"transactional_properties\"=\"insert_only\")") + .run("insert into t2 partition(name='Bob') values(11)") + .run("insert into t2 partition(name='Carl') values(10)"); + + // Allocate write ids for both tables t1 and t2 for all txns + // t1=5+1(insert) and t2=5+2(insert) + Map<String, Long> tables = new HashMap<>(); + tables.put("t1", numTxns + 1L); + tables.put("t2", numTxns + 2L); + List<Long> lockIds = allocateWriteIdsForTablesAndAquireLocks(primaryDbName, tables, txnHandler, txns, primaryConf); + + // Bootstrap dump with open txn timeout as 1s. + List<String> withConfigs = Arrays.asList( + "'hive.repl.bootstrap.dump.open.txn.timeout'='1s'", Review comment: Use HiveConf constant ########## File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java ########## @@ -274,6 +277,120 @@ public void testAcidTablesBootstrapWithOpenTxnsTimeout() throws Throwable { verifyCompactionQueue(tables, replicatedDbName, replicaConf); } + @Test Review comment: Add test for bootstrap during incremental case as well ########## File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java ########## @@ -274,6 +277,120 @@ public void testAcidTablesBootstrapWithOpenTxnsTimeout() throws Throwable { verifyCompactionQueue(tables, replicatedDbName, replicaConf); } + @Test + public void testAcidTablesBootstrapWithOpenTxnsDiffDb() throws Throwable { + int numTxns = 5; + HiveConf primaryConf = primary.getConf(); + TxnStore txnHandler = TxnUtils.getTxnStore(primary.getConf()); + // Open 5 txns + List<Long> txns = openTxns(numTxns, txnHandler, primaryConf); + + // Create 2 tables, 
one partitioned and other not. Also, have both types of full ACID and MM tables. + primary.run("use " + primaryDbName) + .run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc " + + "tblproperties (\"transactional\"=\"true\")") + .run("insert into t1 values(1)") + .run("create table t2 (rank int) partitioned by (name string) tblproperties(\"transactional\"=\"true\", " + + "\"transactional_properties\"=\"insert_only\")") + .run("insert into t2 partition(name='Bob') values(11)") + .run("insert into t2 partition(name='Carl') values(10)"); + + // Allocate write ids for both tables of secondary db for all txns + // t1=5 and t2=5 + Map<String, Long> tablesInSecDb = new HashMap<>(); + tablesInSecDb.put("t1", (long) numTxns); + tablesInSecDb.put("t2", (long) numTxns); + List<Long> lockIds = allocateWriteIdsForTablesAndAquireLocks(primaryDbName + "_extra", + tablesInSecDb, txnHandler, txns, primaryConf); + + // Bootstrap dump with open txn timeout as 1s. + List<String> withConfigs = Arrays.asList( + "'hive.repl.bootstrap.dump.open.txn.timeout'='1s'"); + WarehouseInstance.Tuple bootstrapDump = primary + .run("use " + primaryDbName) + .dump(primaryDbName, withConfigs); + + // After bootstrap dump, all the opened txns should be aborted. Verify it. + verifyAllOpenTxnsNotAborted(txns, primaryConf); + Map<String, Long> tablesInPrimary = new HashMap<>(); + tablesInPrimary.put("t1", 1L); + tablesInPrimary.put("t2", 2L); + verifyNextId(tablesInPrimary, primaryDbName, primaryConf); + + // Bootstrap load which should not replicate the write ids on both tables as they are on different db. 
+ HiveConf replicaConf = replica.getConf(); + replica.load(replicatedDbName, primaryDbName) + .run("use " + replicatedDbName) + .run("show tables") + .verifyResults(new String[] {"t1", "t2"}) + .run("repl status " + replicatedDbName) + .verifyResult(bootstrapDump.lastReplicationId) + .run("select id from t1") + .verifyResults(new String[]{"1"}) + .run("select rank from t2 order by rank") + .verifyResults(new String[] {"10", "11"}); + + // Verify if HWM is properly set after REPL LOAD + verifyNextId(tablesInPrimary, replicatedDbName, replicaConf); + + // Verify if none of the write ids are not replicated to the replicated DB as they belong to diff db + for(Map.Entry<String, Long> entry : tablesInPrimary.entrySet()) { + entry.setValue((long) 0); + } + verifyWriteIdsForTables(tablesInPrimary, replicaConf, replicatedDbName); + //Abort the txns + txnHandler.abortTxns(new AbortTxnsRequest(txns)); + //Release the locks + releaseLocks(txnHandler, lockIds); + } + + @Test + public void testAcidTablesBootstrapWithOpenTxnsAbortDisabled() throws Throwable { + int numTxns = 5; + HiveConf primaryConf = primary.getConf(); + TxnStore txnHandler = TxnUtils.getTxnStore(primary.getConf()); + // Open 5 txns + List<Long> txns = openTxns(numTxns, txnHandler, primaryConf); + + // Create 2 tables, one partitioned and other not. Also, have both types of full ACID and MM tables. 
+ primary.run("use " + primaryDbName) + .run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc " + + "tblproperties (\"transactional\"=\"true\")") + .run("insert into t1 values(1)") + .run("create table t2 (rank int) partitioned by (name string) tblproperties(\"transactional\"=\"true\", " + + "\"transactional_properties\"=\"insert_only\")") + .run("insert into t2 partition(name='Bob') values(11)") + .run("insert into t2 partition(name='Carl') values(10)"); + + // Allocate write ids for both tables t1 and t2 for all txns + // t1=5+1(insert) and t2=5+2(insert) + Map<String, Long> tables = new HashMap<>(); + tables.put("t1", numTxns + 1L); + tables.put("t2", numTxns + 2L); + List<Long> lockIds = allocateWriteIdsForTablesAndAquireLocks(primaryDbName, tables, txnHandler, txns, primaryConf); + + // Bootstrap dump with open txn timeout as 1s. + List<String> withConfigs = Arrays.asList( + "'hive.repl.bootstrap.dump.open.txn.timeout'='1s'", + "'hive.repl.bootstrap.dump.abort.write.txn.after.timeout'='false'"); + try { + WarehouseInstance.Tuple bootstrapDump = primary + .run("use " + primaryDbName) + .dump(primaryDbName, withConfigs); + } catch (Exception e) { + Assert.assertEquals("REPL DUMP cannot proceed. Force abort all the open txns is disabled. Enable " + + "hive.repl.bootstrap.dump.abort.write.txn.after.timeout to proceed.", e.getMessage()); + } + + // After bootstrap dump, all the opened txns should be aborted. Verify it. Review comment: Comment: should not be aborted? 
########## File path: standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift ########## @@ -2781,6 +2785,7 @@ PartitionsResponse get_partitions_req(1:PartitionsRequest req) void add_replication_metrics(1: ReplicationMetricList replicationMetricList) throws(1:MetaException o1) ReplicationMetricList get_replication_metrics(1: GetReplicationMetricsRequest rqst) throws(1:MetaException o1) + GetOpenTxnsResponse get_open_txns_req(GetOpenTxnsRequest getOpenTxnsRequest) Review comment: Field id missing? ########## File path: ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java ########## @@ -186,6 +186,18 @@ void replTableWriteIdState(String validWriteIdList, String dbName, String tableN */ ValidTxnList getValidTxns() throws LockException; + /** + * Get the transactions that are currently valid. The resulting + * {@link ValidTxnList} object can be passed as string to the processing + * tasks for use in the reading the data. This call should be made once up + * front by the planner and should never be called on the backend, + * as this will violate the isolation level semantics. + * @return list of valid transactions. + * @param txnTypes list of transaction types that should be excluded. + * @throws LockException + */ + ValidTxnList getValidTxns(List<TxnType> txnTypes) throws LockException; Review comment: List<TxnType> txnTypes : Should be replaced with a filter/regex and same interface can be used in future for include/exclude all cases. 
########## File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationScenariosAcidTables.java ########## @@ -336,18 +346,30 @@ void verifyInc2Load(String dbName, String lastReplId) return txns; } - void allocateWriteIdsForTables(String primaryDbName, Map<String, Long> tables, - TxnStore txnHandler, - List<Long> txns, HiveConf primaryConf) throws Throwable { + List<Long> allocateWriteIdsForTablesAndAquireLocks(String primaryDbName, Map<String, Long> tables, + TxnStore txnHandler, + List<Long> txns, HiveConf primaryConf) throws Throwable { AllocateTableWriteIdsRequest rqst = new AllocateTableWriteIdsRequest(); rqst.setDbName(primaryDbName); - + List<Long> lockIds = new ArrayList<>(); for(Map.Entry<String, Long> entry : tables.entrySet()) { rqst.setTableName(entry.getKey()); rqst.setTxnIds(txns); txnHandler.allocateTableWriteIds(rqst); + for (long txnId : txns) { + LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, + primaryDbName); + comp.setTablename(entry.getKey()); + comp.setOperationType(DataOperationType.UPDATE); + List<LockComponent> components = new ArrayList<LockComponent>(1); + components.add(comp); + LockRequest lockRequest = new LockRequest(components, "u1", "hostname"); + lockRequest.setTxnid(txnId); + lockIds.add(txnHandler.lock(lockRequest).getLockid()); Review comment: Why is this locking required now? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 459458) Time Spent: 20m (was: 10m) > Optimize bootstrap dump to abort only write Transactions > -------------------------------------------------------- > > Key: HIVE-23560 > URL: https://issues.apache.org/jira/browse/HIVE-23560 > Project: Hive > Issue Type: Task > Reporter: Aasha Medhi > Assignee: Aasha Medhi > Priority: Major > Labels: pull-request-available > Attachments: HIVE-23560.01.patch, HIVE-23560.02.patch, Optimize > bootstrap dump to avoid aborting all transactions.pdf > > Time Spent: 20m > Remaining Estimate: 0h > > Currently before doing a bootstrap dump, we abort all open transactions after > waiting for a configured time. We are proposing to abort only write > transactions for the db under replication and leave the read and repl created > transactions as is. > This doc attached talks about it in detail -- This message was sent by Atlassian Jira (v8.3.4#803005)