This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
     new 67c2d4910ff  HIVE-26316: Handle dangling open txns on both src & tgt in unplanned failover. (Haymant Mangla reviewed by Peter Vary) (#3367)
67c2d4910ff is described below

commit 67c2d4910ff17c694653eb8bd9c9ed2405cec38b
Author: Haymant Mangla <79496857+hmangl...@users.noreply.github.com>
AuthorDate: Thu Jun 16 15:11:22 2022 +0530

    HIVE-26316: Handle dangling open txns on both src & tgt in unplanned failover. (Haymant Mangla reviewed by Peter Vary) (#3367)
---
 .../parse/TestReplicationOptimisedBootstrap.java   | 141 ++++++++++++++++++++-
 .../hive/ql/exec/repl/OptimisedBootstrapUtils.java |  92 ++++++++++++--
 .../hadoop/hive/ql/exec/repl/ReplDumpTask.java     |  77 +++++------
 .../hadoop/hive/ql/exec/repl/ReplLoadTask.java     |  41 +++++-
 .../hadoop/hive/ql/exec/repl/util/ReplUtils.java   |  61 +++++++++
 .../repl/dump/events/AbstractEventHandler.java     |  11 +-
 6 files changed, 349 insertions(+), 74 deletions(-)

diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
index 673e41b3065..dd6821dc578 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
@@ -24,12 +24,16 @@ import org.apache.hadoop.fs.QuotaUsage;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.AbortTxnsRequest;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.TxnType;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils;
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.HiveUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.jetbrains.annotations.NotNull;
@@ -55,7 +59,6 @@ import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLI
 import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.EVENT_ACK_FILE;
 import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.TABLE_DIFF_COMPLETE_DIRECTORY;
 import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.TABLE_DIFF_INPROGRESS_DIRECTORY;
-import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.getEventIdFromFile;
 import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.getPathsFromTableFile;
 import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.getTablesFromTableDiffFile;
@@ -71,6 +74,10 @@ import static org.junit.Assert.fail;
 public class TestReplicationOptimisedBootstrap extends BaseReplicationScenariosAcidTables {

   String extraPrimaryDb;
+  HiveConf primaryConf;
+  TxnStore txnHandler;
+  List<Long> tearDownTxns = new ArrayList<>();
+  List<Long> tearDownLockIds = new ArrayList<>();

   @BeforeClass
   public static void classLevelSetup() throws Exception {
@@ -90,10 +97,19 @@ public class TestReplicationOptimisedBootstrap extends BaseReplicationScenariosA
   public void setup() throws Throwable {
     super.setup();
     extraPrimaryDb = "extra_" + primaryDbName;
+    primaryConf = primary.getConf();
+    txnHandler = TxnUtils.getTxnStore(primary.getConf());
   }

   @After
   public void tearDown() throws Throwable {
+    if (!tearDownTxns.isEmpty()) {
+      //Abort any leftover transactions that were not completed because of a test failure.
+      txnHandler.abortTxns(new AbortTxnsRequest(tearDownTxns));
+    }
+    //Release any locks still held. The tests release locks explicitly once they are no longer needed,
+    //but a test failure can leave them dangling.
+    releaseLocks(txnHandler, tearDownLockIds);
     primary.run("drop database if exists " + extraPrimaryDb + " cascade");
     super.tearDown();
   }
@@ -468,47 +484,56 @@ public class TestReplicationOptimisedBootstrap extends BaseReplicationScenariosA
   @Test
   public void testReverseBootstrap() throws Throwable {
-    HiveConf primaryConf = primary.getConf();
-    TxnStore txnHandler = TxnUtils.getTxnStore(primary.getConf());
     List<String> withClause = setUpFirstIterForOptimisedBootstrap();

     // Open 3 txns for a database which is not under replication
     int numTxnsForSecDb = 3;
     List<Long> txnsForSecDb = openTxns(numTxnsForSecDb, txnHandler, primaryConf);
+    tearDownTxns.addAll(txnsForSecDb);

     Map<String, Long> tablesInSecDb = new HashMap<>();
-    tablesInSecDb.put("t1", (long) numTxnsForSecDb);
-    tablesInSecDb.put("t2", (long) numTxnsForSecDb);
+    tablesInSecDb.put("t1", (long) numTxnsForSecDb + 4);
+    tablesInSecDb.put("t2", (long) numTxnsForSecDb + 4);
     List<Long> lockIdsForSecDb = allocateWriteIdsForTablesAndAcquireLocks(primaryDbName + "_extra",
         tablesInSecDb, txnHandler, txnsForSecDb, primaryConf);
+    tearDownLockIds.addAll(lockIdsForSecDb);

     //Open 2 txns for Primary Db
     int numTxnsForPrimaryDb = 2;
     List<Long> txnsForSourceDb = openTxns(numTxnsForPrimaryDb, txnHandler, primaryConf);
+    tearDownTxns.addAll(txnsForSourceDb);

     // Allocate write ids for both tables of source database.
     Map<String, Long> tablesInSourceDb = new HashMap<>();
-    tablesInSourceDb.put("t1", (long) numTxnsForPrimaryDb + 4);
+    tablesInSourceDb.put("t1", (long) numTxnsForPrimaryDb + 6);
     tablesInSourceDb.put("t2", (long) numTxnsForPrimaryDb);
-    allocateWriteIdsForTablesAndAcquireLocks(replicatedDbName, tablesInSourceDb, txnHandler,
+    List<Long> lockIdsForSourceDb = allocateWriteIdsForTablesAndAcquireLocks(replicatedDbName, tablesInSourceDb, txnHandler,
         txnsForSourceDb, replica.getConf());
+    tearDownLockIds.addAll(lockIdsForSourceDb);

     //Open 1 txn with no hive locks acquired
     List<Long> txnsWithNoLocks = openTxns(1, txnHandler, primaryConf);
+    tearDownTxns.addAll(txnsWithNoLocks);

     // Do a reverse second dump, this should do a bootstrap dump for the tables in the table_diff and incremental for
     // rest.
+    List<Long> allReplCreatedTxnsOnSource = getReplCreatedTxns();
+    tearDownTxns.addAll(allReplCreatedTxnsOnSource);
     assertTrue("value1".equals(primary.getDatabase(primaryDbName).getParameters().get("key1")));
     WarehouseInstance.Tuple tuple = replica.dump(replicatedDbName, withClause);
+    verifyAllOpenTxnsAborted(allReplCreatedTxnsOnSource, primaryConf);
+    //Verify that the open txns for the source db were aborted before proceeding with the bootstrap dump.
     verifyAllOpenTxnsAborted(txnsForSourceDb, primaryConf);
     verifyAllOpenTxnsNotAborted(txnsForSecDb, primaryConf);
     verifyAllOpenTxnsNotAborted(txnsWithNoLocks, primaryConf);

     txnHandler.abortTxns(new AbortTxnsRequest(txnsForSecDb));
     txnHandler.abortTxns(new AbortTxnsRequest(txnsWithNoLocks));
     releaseLocks(txnHandler, lockIdsForSecDb);

     String hiveDumpDir = tuple.dumpLocation + File.separator + ReplUtils.REPL_HIVE_BASE_DIR;
     // _bootstrap directory should be created as bootstrap enabled on external tables.
@@ -829,6 +854,35 @@ public class TestReplicationOptimisedBootstrap extends BaseReplicationScenariosA
     primary.dump(primaryDbName, withClause);
     replica.load(replicatedDbName, primaryDbName, withClause);

+    // Open 3 txns for a database which is not under replication
+    int numTxnsForSecDb = 3;
+    List<Long> txnsForSecDb = openTxns(numTxnsForSecDb, txnHandler, primaryConf);
+    tearDownTxns.addAll(txnsForSecDb);
+
+    Map<String, Long> tablesInSecDb = new HashMap<>();
+    tablesInSecDb.put("t1", (long) numTxnsForSecDb);
+    tablesInSecDb.put("t2", (long) numTxnsForSecDb);
+    List<Long> lockIdsForSecDb = allocateWriteIdsForTablesAndAcquireLocks(primaryDbName + "_extra",
+        tablesInSecDb, txnHandler, txnsForSecDb, primaryConf);
+    tearDownLockIds.addAll(lockIdsForSecDb);
+
+    //Open 2 txns for Primary Db
+    int numTxnsForPrimaryDb = 2;
+    List<Long> txnsForSourceDb = openTxns(numTxnsForPrimaryDb, txnHandler, primaryConf);
+    tearDownTxns.addAll(txnsForSourceDb);
+
+    // Allocate write ids for both tables of source database.
+    Map<String, Long> tablesInSourceDb = new HashMap<>();
+    tablesInSourceDb.put("t1", (long) numTxnsForPrimaryDb);
+    tablesInSourceDb.put("t5", (long) numTxnsForPrimaryDb);
+    List<Long> lockIdsForSourceDb = allocateWriteIdsForTablesAndAcquireLocks(primaryDbName, tablesInSourceDb, txnHandler,
+        txnsForSourceDb, primary.getConf());
+    tearDownLockIds.addAll(lockIdsForSourceDb);
+
+    //Open 1 txn with no hive locks acquired
+    List<Long> txnsWithNoLocks = openTxns(1, txnHandler, primaryConf);
+    tearDownTxns.addAll(txnsWithNoLocks);
+
     // Create 4 managed tables and do a dump & load.
     WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
@@ -850,6 +904,49 @@ public class TestReplicationOptimisedBootstrap extends BaseReplicationScenariosA
         .verifyResult("t1").run("show tables like 't2'").verifyResult("t2").run("show tables like 't3'")
         .verifyResult("t3").run("show tables like 't4'").verifyResult("t4").verifyReplTargetProperty(replicatedDbName);

+    String forwardReplPolicy = HiveUtils.getReplPolicy(replicatedDbName);
+    List<Long> targetReplCreatedTxnIds = new ArrayList<>();
+    for (Long txn: txnsForSecDb) {
+      targetReplCreatedTxnIds.add(txnHandler.getTargetTxnId(forwardReplPolicy, txn));
+    }
+    for (Long txn: txnsForSourceDb) {
+      targetReplCreatedTxnIds.add(txnHandler.getTargetTxnId(forwardReplPolicy, txn));
+    }
+    for (Long txn: txnsWithNoLocks) {
+      targetReplCreatedTxnIds.add(txnHandler.getTargetTxnId(forwardReplPolicy, txn));
+    }
+
+    verifyAllOpenTxnsNotAborted(targetReplCreatedTxnIds, primaryConf);
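The assertions above pin down the abort rule this patch introduces for unplanned failover: an open transaction is rolled back only if it holds a lock on the database under replication, while transactions against other databases and transactions that never acquired a lock survive. A minimal self-contained sketch of that rule, using plain collections and hypothetical names (the real check goes through ShowLocks/DbLockManager, as in ReplUtils further down):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class DanglingTxnRule {
      // txnId -> database it holds a lock on; txns with no lock are absent from the map.
      static List<Long> txnsToAbort(List<Long> openTxns, Map<Long, String> lockedDb, String replDb) {
        List<Long> toAbort = new ArrayList<>();
        for (Long txnId : openTxns) {
          if (replDb.equalsIgnoreCase(lockedDb.get(txnId))) {
            toAbort.add(txnId);  // writes to the replicated db -> dangling, abort
          }
        }
        return toAbort;
      }

      public static void main(String[] args) {
        Map<Long, String> lockedDb = new HashMap<>();
        lockedDb.put(1L, "tgt_db");    // lock on the db under replication -> aborted
        lockedDb.put(2L, "extra_db");  // unrelated db -> survives
        List<Long> open = new ArrayList<>();
        open.add(1L); open.add(2L); open.add(3L);  // txn 3 holds no lock -> survives
        System.out.println(txnsToAbort(open, lockedDb, "tgt_db"));  // prints [1]
      }
    }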
+
+    //Open new transactions on the original source cluster after it went down.
+
+    // Open 1 txn for the secondary database
+    List<Long> newTxnsForSecDb = openTxns(1, txnHandler, primaryConf);
+    tearDownTxns.addAll(newTxnsForSecDb);
+
+    Map<String, Long> newTablesForSecDb = new HashMap<>();
+    newTablesForSecDb.put("t1", (long) numTxnsForSecDb + 1);
+    newTablesForSecDb.put("t2", (long) numTxnsForSecDb + 1);
+    List<Long> newLockIdsForSecDb = allocateWriteIdsForTablesAndAcquireLocks(primaryDbName + "_extra",
+        newTablesForSecDb, txnHandler, newTxnsForSecDb, primaryConf);
+    tearDownLockIds.addAll(newLockIdsForSecDb);
+
+    //Open 1 txn for Primary Db
+    List<Long> newTxnsForSourceDb = openTxns(1, txnHandler, primaryConf);
+    tearDownTxns.addAll(newTxnsForSourceDb);
+
+    // Allocate write ids for both tables of source database.
+    Map<String, Long> newTablesInSourceDb = new HashMap<>();
+    newTablesInSourceDb.put("t1", (long) 5);
+    newTablesInSourceDb.put("t5", (long) 3);
+    List<Long> newLockIdsForSourceDb = allocateWriteIdsForTablesAndAcquireLocks(primaryDbName, newTablesInSourceDb, txnHandler,
+        newTxnsForSourceDb, primary.getConf());
+    tearDownLockIds.addAll(newLockIdsForSourceDb);
+
+    //Open 1 txn with no hive locks acquired
+    List<Long> newTxnsWithNoLock = openTxns(1, txnHandler, primaryConf);
+    tearDownTxns.addAll(newTxnsWithNoLock);
+
     // Do some modifications on the original source cluster. The diff becomes (tnew_managed, t1, t2, t3)
     primary.run("use " + primaryDbName).run("create table tnew_managed (id int) clustered by(id) into 3 buckets " +
         "stored as orc tblproperties (\"transactional\"=\"true\")")
@@ -886,14 +983,44 @@ public class TestReplicationOptimisedBootstrap extends BaseReplicationScenariosA
     // Do a load, this should create a table_diff_complete directory
     primary.load(primaryDbName, replicatedDbName, withClause);

+    verifyAllOpenTxnsAborted(txnsForSourceDb, primaryConf);
+    verifyAllOpenTxnsNotAborted(txnsForSecDb, primaryConf);
+    verifyAllOpenTxnsNotAborted(txnsWithNoLocks, primaryConf);
+    verifyAllOpenTxnsAborted(newTxnsForSourceDb, primaryConf);
+    verifyAllOpenTxnsNotAborted(newTxnsForSecDb, primaryConf);
+    verifyAllOpenTxnsNotAborted(newTxnsWithNoLock, primaryConf);
+
+    txnHandler.abortTxns(new AbortTxnsRequest(txnsForSecDb));
+    releaseLocks(txnHandler, lockIdsForSecDb);
+    txnHandler.abortTxns(new AbortTxnsRequest(txnsWithNoLocks));
+    txnHandler.abortTxns(new AbortTxnsRequest(newTxnsForSecDb));
+    releaseLocks(txnHandler, newLockIdsForSecDb);
+    txnHandler.abortTxns(new AbortTxnsRequest(newTxnsWithNoLock));
+
     // Check the table diff directory exists.
     assertTrue(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY).toString() + " doesn't exist",
         replicaFs.exists(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY)));
+    assertTrue(new Path(tuple.dumpLocation, OptimisedBootstrapUtils.ABORT_TXNS_FILE).toString() + " doesn't exist",
+        replicaFs.exists(new Path(tuple.dumpLocation, OptimisedBootstrapUtils.ABORT_TXNS_FILE)));
+
+    List<Long> txnsInAbortTxnFile =
+        OptimisedBootstrapUtils.getTxnIdFromAbortTxnsFile(new Path(tuple.dumpLocation), primaryConf);
+    assertTrue(txnsInAbortTxnFile.containsAll(txnsForSourceDb));
+    assertTrue(txnsInAbortTxnFile.containsAll(txnsForSecDb));
+    assertTrue(txnsInAbortTxnFile.containsAll(txnsWithNoLocks));
+    assertEquals(txnsInAbortTxnFile.size(), txnsForSecDb.size() + txnsForSourceDb.size() + txnsWithNoLocks.size());
+
     // Check the table diff has all the modified tables, including the dropped and empty ones
     HashSet<String> tableDiffEntries = getTablesFromTableDiffFile(dumpPath, conf);
     assertTrue("Table Diff Contains " + tableDiffEntries,
         tableDiffEntries.containsAll(Arrays.asList("tnew_managed", "t1", "t2", "t3")));
     return withClause;
   }
+
+  List<Long> getReplCreatedTxns() throws MetaException {
+    List<TxnType> excludedTxns = Arrays.asList(TxnType.DEFAULT, TxnType.READ_ONLY, TxnType.COMPACTION,
+        TxnType.MATER_VIEW_REBUILD, TxnType.SOFT_DELETE);
+    return txnHandler.getOpenTxns(excludedTxns).getOpen_txns();
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/OptimisedBootstrapUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/OptimisedBootstrapUtils.java
index f3aa5302832..7074226e14f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/OptimisedBootstrapUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/OptimisedBootstrapUtils.java
@@ -29,6 +29,11 @@ import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 import org.apache.hadoop.hive.metastore.messaging.event.filters.DatabaseAndTableFilter;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
+import org.apache.hadoop.hive.metastore.messaging.MessageBuilder;
+import org.apache.hadoop.hive.metastore.messaging.OpenTxnMessage;
+import org.apache.hadoop.hive.metastore.messaging.CommitTxnMessage;
+import org.apache.hadoop.hive.metastore.messaging.AbortTxnMessage;
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -68,6 +73,9 @@ public class OptimisedBootstrapUtils {
   /** table diff directory when complete */
   public static final String TABLE_DIFF_COMPLETE_DIRECTORY = "table_diff_complete";

+  /** abort txns file which contains all the txns that need to be aborted on the new source cluster (initial target) */
+  public static final String ABORT_TXNS_FILE = "abort_txns";
+
   /** event ack file which contains the event id till which the cluster was last loaded.
    */
  public static final String EVENT_ACK_FILE = "event_ack";
@@ -90,6 +98,63 @@ public class OptimisedBootstrapUtils {
     return fs.exists(new Path(dumpPath, fileName));
   }

+  public static void prepareAbortTxnsFile(List<NotificationEvent> notificationEvents, Set<Long> allOpenTxns,
+      Path dumpPath, HiveConf conf) throws SemanticException {
+    if (notificationEvents.size() == 0) {
+      return;
+    }
+    Set<Long> txnsOpenedPostCurrEventId = new HashSet<>();
+    MessageDeserializer deserializer = ReplUtils.getEventDeserializer(notificationEvents.get(0));
+    for (NotificationEvent event: notificationEvents) {
+      switch (event.getEventType()) {
+        case MessageBuilder.OPEN_TXN_EVENT:
+          OpenTxnMessage openTxnMessage = deserializer.getOpenTxnMessage(event.getMessage());
+          txnsOpenedPostCurrEventId.addAll(openTxnMessage.getTxnIds());
+          allOpenTxns.removeAll(openTxnMessage.getTxnIds());
+          break;
+        case MessageBuilder.ABORT_TXN_EVENT:
+          AbortTxnMessage abortTxnMessage = deserializer.getAbortTxnMessage(event.getMessage());
+          if (!txnsOpenedPostCurrEventId.contains(abortTxnMessage.getTxnId())) {
+            allOpenTxns.add(abortTxnMessage.getTxnId());
+          }
+          break;
+        case MessageBuilder.COMMIT_TXN_EVENT:
+          CommitTxnMessage commitTxnMessage = deserializer.getCommitTxnMessage(event.getMessage());
+          if (!txnsOpenedPostCurrEventId.contains(commitTxnMessage.getTxnId())) {
+            allOpenTxns.add(commitTxnMessage.getTxnId());
+          }
+          break;
+      }
+    }
+    if (!allOpenTxns.isEmpty()) {
+      Utils.writeOutput(flattenListToString(allOpenTxns), new Path(dumpPath, ABORT_TXNS_FILE), conf);
+    }
+  }
+
+  public static List<Long> getTxnIdFromAbortTxnsFile(Path dumpPath, HiveConf conf) throws IOException {
+    String input;
+    Path abortTxnFile = new Path(dumpPath, ABORT_TXNS_FILE);
+    FileSystem fs = abortTxnFile.getFileSystem(conf);
+    try (FSDataInputStream stream = fs.open(abortTxnFile)) {
+      input = IOUtils.toString(stream, Charset.defaultCharset());
+    }
+    return unflattenListFromString(input);
+  }
+
+  private static String flattenListToString(Set<Long> list) {
+    return list.stream()
+        .map(Object::toString)
+        .collect(Collectors.joining(FILE_ENTRY_SEPARATOR));
+  }
+
+  private static List<Long> unflattenListFromString(String input) {
+    List<Long> ret = new ArrayList<>();
+    for (String val : input.replaceAll(System.lineSeparator(), "").trim().split(FILE_ENTRY_SEPARATOR)) {
+      ret.add(Long.parseLong(val));
+    }
+    return ret;
+  }
+
   /**
    * Gets the source & target event id from the event ack file
    * @param dumpPath the dump path
@@ -201,19 +266,17 @@ public class OptimisedBootstrapUtils {
   }

   /**
-   * Prepares the table diff file, with tables modified post the specified event id.
-   * @param eventId the event id after which tables should be modified
+   * Returns the list of notification events, starting from eventId, that are related to the database.
+   * @param eventId the starting event id
    * @param hiveDb the hive object
    * @param work the load work
-   * @param conf hive configuration
    * @throws Exception
    */
-  public static void prepareTableDiffFile(Long eventId, Hive hiveDb, ReplLoadWork work, HiveConf conf)
-      throws Exception {
-    // Get the notification events.
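The event fetch is being split out of prepareTableDiffFile here: getListOfNotificationEvents (added next) pulls the events once, so one list feeds both the abort-txns file and the table diff. The bookkeeping in prepareAbortTxnsFile above is easiest to see in isolation: replay the events logged after the failover event id, drop txns opened after that point, and re-add txns that were committed or aborted locally but that the other cluster still believes are open. A simplified, self-contained sketch with stand-in event types (hypothetical names, not the real metastore message classes):

    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class AbortTxnsReplay {
      static final int OPEN = 0, COMMIT = 1, ABORT = 2;

      static class TxnEvent {           // stand-in for a metastore NotificationEvent
        final int type;
        final List<Long> txnIds;
        TxnEvent(int type, List<Long> txnIds) { this.type = type; this.txnIds = txnIds; }
      }

      // openNow: txns currently open on this cluster; events: everything logged
      // after the last event id the other cluster is known to have replicated.
      static Set<Long> txnsToAbortOnOtherCluster(Set<Long> openNow, List<TxnEvent> events) {
        Set<Long> dangling = new HashSet<>(openNow);
        Set<Long> openedPostFailover = new HashSet<>();
        for (TxnEvent e : events) {
          if (e.type == OPEN) {
            // Opened after the failover point: the other cluster never saw it.
            openedPostFailover.addAll(e.txnIds);
            dangling.removeAll(e.txnIds);
          } else {
            // Committed/aborted here after the failover point, but still open
            // from the other cluster's point of view -> must be aborted there.
            for (Long txnId : e.txnIds) {
              if (!openedPostFailover.contains(txnId)) {
                dangling.add(txnId);
              }
            }
          }
        }
        return dangling;
      }
    }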
+  public static List<NotificationEvent> getListOfNotificationEvents(Long eventId, Hive hiveDb,
+      ReplLoadWork work) throws Exception {
     List<NotificationEvent> notificationEvents =
-        hiveDb.getMSC().getNextNotification(eventId - 1, -1, new DatabaseAndTableFilter(work.dbNameToLoadIn, null))
-            .getEvents();
+        hiveDb.getMSC().getNextNotification(eventId - 1, -1,
+            new DatabaseAndTableFilter(work.dbNameToLoadIn, null)).getEvents();

     // Check the first eventId fetched is the same as what we fed, to ensure the events post that haven't expired.
     if (notificationEvents.get(0).getEventId() != eventId) {
@@ -222,6 +285,19 @@ public class OptimisedBootstrapUtils {
     // Remove the first one, it is already loaded, we fetched it to confirm the notification events post that haven't
     // expired.
     notificationEvents.remove(0);
+    return notificationEvents;
+  }
+
+  /**
+   * Prepares the table diff file, with tables modified post the specified event id.
+   * @param notificationEvents Events that can possibly contain table DDL/DML metadata.
+   * @param hiveDb the hive object
+   * @param work the load work
+   * @param conf hive configuration
+   * @throws Exception
+   */
+  public static void prepareTableDiffFile(List<NotificationEvent> notificationEvents, Hive hiveDb,
+      ReplLoadWork work, HiveConf conf) throws Exception {
     HashSet<String> modifiedTables = new HashSet<>();
     for (NotificationEvent event : notificationEvents) {
       String tableName = event.getTableName();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index 667ede3ca74..ee33debe41a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -39,7 +39,6 @@ import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 import org.apache.hadoop.hive.metastore.api.SQLAllTableConstraints;
 import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
 import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
-import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
 import org.apache.hadoop.hive.metastore.api.TxnType;
 import org.apache.hadoop.hive.metastore.messaging.event.filters.AndFilter;
 import org.apache.hadoop.hive.metastore.messaging.event.filters.CatalogFilter;
@@ -61,10 +60,12 @@ import org.apache.hadoop.hive.ql.exec.util.Retryable;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.lockmgr.DbLockManager;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager;
+import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
 import org.apache.hadoop.hive.ql.lockmgr.LockException;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
+import org.apache.hadoop.hive.ql.metadata.HiveUtils;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.metadata.events.EventUtils;
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec;
@@ -142,6 +143,7 @@ import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.isFirs
 import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT;
 import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.NON_RECOVERABLE_MARKER;
 import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.RANGER_AUTHORIZER;
+import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.getOpenTxns;
 import static org.apache.hadoop.hive.ql.exec.repl.util.SnapshotUtils.cleanupSnapshots;
 import static org.apache.hadoop.hive.ql.exec.repl.util.SnapshotUtils.getDFS;
 import static org.apache.hadoop.hive.ql.exec.repl.util.SnapshotUtils.getListFromFileList;
@@ -253,7 +255,9 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
       } else {
         // We should be here only if TableDiff is Present.
         boolean isTableDiffDirectoryPresent =
-            checkFileExists(previousValidHiveDumpPath.getParent(), conf, TABLE_DIFF_COMPLETE_DIRECTORY);
+            checkFileExists(previousValidHiveDumpPath.getParent(), conf, TABLE_DIFF_COMPLETE_DIRECTORY);
+        boolean isAbortTxnsListPresent =
+            checkFileExists(previousValidHiveDumpPath.getParent(), conf, OptimisedBootstrapUtils.ABORT_TXNS_FILE);

         assert isTableDiffDirectoryPresent;
@@ -267,6 +271,9 @@
         // Get the tables to be bootstrapped from the table diff
         tablesForBootstrap = getTablesFromTableDiffFile(previousValidHiveDumpPath.getParent(), conf);
+        if (isAbortTxnsListPresent) {
+          abortReplCreatedTxnsPriorToFailover(previousValidHiveDumpPath.getParent(), conf);
+        }
         // Generate the bootstrapped table list and put it in the new dump directory for the load to consume.
         createBootstrapTableList(currentDumpPath, tablesForBootstrap, conf);
@@ -327,6 +334,16 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
     return 0;
   }

+  private void abortReplCreatedTxnsPriorToFailover(Path dumpPath, HiveConf conf) throws LockException, IOException {
+    List<Long> replCreatedTxnsToAbort = OptimisedBootstrapUtils.getTxnIdFromAbortTxnsFile(dumpPath, conf);
+    String replPolicy = HiveUtils.getReplPolicy(work.dbNameOrPattern);
+    HiveTxnManager hiveTxnManager = getTxnMgr();
+    LOG.info("Rolling back Repl_Created txns: " + replCreatedTxnsToAbort + " opened prior to failover.");
+    for (Long txnId : replCreatedTxnsToAbort) {
+      hiveTxnManager.replRollbackTxn(replPolicy, txnId);
+    }
+  }
+
   private void preProcessFailoverIfRequired(Path previousValidHiveDumpDir, boolean isPrevFailoverReadyMarkerPresent)
       throws HiveException, IOException {
     FileSystem fs = previousValidHiveDumpDir.getFileSystem(conf);
@@ -751,7 +768,8 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
       work.setFailoverMetadata(fmd);
       return;
     }
-    List<Long> txnsForDb = getOpenTxns(getTxnMgr().getValidTxns(excludedTxns), work.dbNameOrPattern);
+    HiveTxnManager hiveTxnManager = getTxnMgr();
+    List<Long> txnsForDb = getOpenTxns(hiveTxnManager, hiveTxnManager.getValidTxns(excludedTxns), work.dbNameOrPattern);
     if (!txnsForDb.isEmpty()) {
       LOG.debug("Going to abort transactions: {} for database: {}.", txnsForDb, work.dbNameOrPattern);
       hiveDb.abortTransactions(txnsForDb);
@@ -762,7 +780,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
     List<Long> openTxns = getOpenTxns(allValidTxns);
     fmd.setOpenTxns(openTxns);
     fmd.setTxnsWithoutLock(getTxnsNotPresentInHiveLocksTable(openTxns));
-    txnsForDb = getOpenTxns(allValidTxns, work.dbNameOrPattern);
+    txnsForDb = getOpenTxns(hiveTxnManager, allValidTxns, work.dbNameOrPattern);
     if (!txnsForDb.isEmpty()) {
       LOG.debug("Going to abort transactions: {} for database: {}.", txnsForDb, work.dbNameOrPattern);
       hiveDb.abortTransactions(txnsForDb);
@@ -1483,33 +1501,6 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
     return !showLocksResponse.getLocks().isEmpty();
   }

-  List<Long> getOpenTxns(ValidTxnList validTxnList, String dbName) throws LockException {
-    HiveLockManager lockManager = getTxnMgr().getLockManager();
-    long[] invalidTxns = validTxnList.getInvalidTransactions();
-    List<Long> openTxns = new ArrayList<>();
-    Set<Long> dbTxns = new HashSet<>();
-    if (lockManager instanceof DbLockManager) {
-      ShowLocksRequest request = new ShowLocksRequest();
-      request.setDbname(dbName.toLowerCase());
-      ShowLocksResponse showLocksResponse = ((DbLockManager)lockManager).getLocks(request);
-      for (ShowLocksResponseElement showLocksResponseElement : showLocksResponse.getLocks()) {
-        dbTxns.add(showLocksResponseElement.getTxnid());
-      }
-      for (long invalidTxn : invalidTxns) {
-        if (dbTxns.contains(invalidTxn) && !validTxnList.isTxnAborted(invalidTxn)) {
-          openTxns.add(invalidTxn);
-        }
-      }
-    } else {
-      for (long invalidTxn : invalidTxns) {
-        if (!validTxnList.isTxnAborted(invalidTxn)) {
-          openTxns.add(invalidTxn);
-        }
-      }
-    }
-    return openTxns;
-  }
-
   // Get list of valid transactions for Repl Dump. Also wait for a given amount of time for the
   // open transactions to finish. Abort any open transactions after the wait is over.
   String getValidTxnListForReplDump(Hive hiveDb, long waitUntilTime) throws HiveException {
@@ -1522,7 +1513,8 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
     // of time to see if all open txns < current txn are getting aborted/committed. If not, then
     // we forcefully abort those txns just like AcidHouseKeeperService.
     //Exclude readonly and repl created transactions
-    ValidTxnList validTxnList = getTxnMgr().getValidTxns(excludedTxns);
+    HiveTxnManager hiveTxnManager = getTxnMgr();
+    ValidTxnList validTxnList = hiveTxnManager.getValidTxns(excludedTxns);
     while (System.currentTimeMillis() < waitUntilTime) {
       //check if no open txns at all
       List<Long> openTxnListForAllDbs = getOpenTxns(validTxnList);
@@ -1537,7 +1529,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
       if (getTxnsNotPresentInHiveLocksTable(openTxnListForAllDbs).isEmpty()) {
         //If all open txns have been inserted in the hive locks table, we just need to check for the db under replication
         // If there are no txns which are open for the given db under replication, then just return it.
-        if (getOpenTxns(validTxnList, work.dbNameOrPattern).isEmpty()) {
+        if (getOpenTxns(hiveTxnManager, validTxnList, work.dbNameOrPattern).isEmpty()) {
           return validTxnList.toString();
         }
       }
       } catch (InterruptedException e) {
         LOG.info("REPL DUMP thread sleep interrupted", e);
       }
-      validTxnList = getTxnMgr().getValidTxns(excludedTxns);
+      validTxnList = hiveTxnManager.getValidTxns(excludedTxns);
     }

     // After the timeout just force abort the open txns
     if (conf.getBoolVar(REPL_BOOTSTRAP_DUMP_ABORT_WRITE_TXN_AFTER_TIMEOUT)) {
-      List<Long> openTxns = getOpenTxns(validTxnList, work.dbNameOrPattern);
+      List<Long> openTxns = getOpenTxns(hiveTxnManager, validTxnList, work.dbNameOrPattern);
       if (!openTxns.isEmpty()) {
         //abort only write transactions for the db under replication if abort transactions is enabled.
         hiveDb.abortTransactions(openTxns);
-        validTxnList = getTxnMgr().getValidTxns(excludedTxns);
-        openTxns = getOpenTxns(validTxnList, work.dbNameOrPattern);
+        validTxnList = hiveTxnManager.getValidTxns(excludedTxns);
+        openTxns = getOpenTxns(hiveTxnManager, validTxnList, work.dbNameOrPattern);
         if (!openTxns.isEmpty()) {
           LOG.warn("REPL DUMP unable to force abort all the open txns: {} after timeout due to unknown reasons. " +
               "However, this is a rare case that shouldn't happen.", openTxns);
@@ -1577,17 +1569,6 @@
         || conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST_REPL)) ? SLEEP_TIME_FOR_TESTS : SLEEP_TIME;
   }

-  private List<Long> getOpenTxns(ValidTxnList validTxnList) {
-    long[] invalidTxns = validTxnList.getInvalidTransactions();
-    List<Long> openTxns = new ArrayList<>();
-    for (long invalidTxn : invalidTxns) {
-      if (!validTxnList.isTxnAborted(invalidTxn)) {
-        openTxns.add(invalidTxn);
-      }
-    }
-    return openTxns;
-  }
-
   private ReplicationSpec getNewReplicationSpec(String evState, String objState, boolean isMetadataOnly) {
     return new ReplicationSpec(true, isMetadataOnly, evState, objState, false, true);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
index 2ef04e2a306..bcef4fae57f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
@@ -17,13 +17,17 @@
  */
 package org.apache.hadoop.hive.ql.exec.repl;

+import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.repl.ReplConst;
 import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.hive.metastore.ReplChangeManager;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.api.TxnType;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import org.apache.hadoop.hive.ql.ddl.database.alter.owner.AlterDatabaseSetOwnerDesc;
 import org.apache.hadoop.hive.ql.ddl.privilege.PrincipalDesc;
 import org.apache.hadoop.hive.ql.exec.repl.util.SnapshotUtils;
+import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
 import org.apache.hadoop.hive.ql.parse.repl.load.log.IncrementalLoadLogger;
 import org.apache.thrift.TException;
 import com.google.common.collect.Collections2;
@@ -89,6 +93,8 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.LinkedList;
+import java.util.Arrays;
+import java.util.Set;

 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_DUMP_SKIP_IMMUTABLE_DATA_COPY;
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_SNAPSHOT_DIFF_FOR_EXTERNAL_TABLE_COPY;
@@ -107,6 +113,7 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
   private static final long serialVersionUID = 1L;
   private final static int ZERO_TASKS = 0;
   private final String STAGE_NAME = "REPL_LOAD";
+  private List<TxnType> excludedTxns = Arrays.asList(TxnType.READ_ONLY, TxnType.REPL_CREATED);

   @Override
   public String getName() {
@@ -724,9 +731,23 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
     }
     boolean isTableDiffPresent =
         checkFileExists(new Path(work.dumpDirectory).getParent(), conf, TABLE_DIFF_COMPLETE_DIRECTORY);
+    boolean isAbortTxnsListPresent =
+        checkFileExists(new Path(work.dumpDirectory).getParent(), conf, OptimisedBootstrapUtils.ABORT_TXNS_FILE);
+    Long eventId = Long.parseLong(getEventIdFromFile(new Path(work.dumpDirectory).getParent(), conf)[0]);
+    List<NotificationEvent> notificationEvents = OptimisedBootstrapUtils.getListOfNotificationEvents(eventId, getHive(), work);
+    if (!isAbortTxnsListPresent) {
+      //Abort the ongoing transactions (opened prior to failover) for the target database.
+      HiveTxnManager hiveTxnManager = getTxnMgr();
+      ValidTxnList validTxnList = hiveTxnManager.getValidTxns(excludedTxns);
+      Set<Long> allOpenTxns = new HashSet<>(ReplUtils.getOpenTxns(validTxnList));
+      abortOpenTxnsForDatabase(hiveTxnManager, validTxnList, work.dbNameToLoadIn, getHive());
+      //Re-fetch the list of notification events after the failover event id.
+      notificationEvents = OptimisedBootstrapUtils.getListOfNotificationEvents(eventId, getHive(), work);
+      OptimisedBootstrapUtils.prepareAbortTxnsFile(notificationEvents, allOpenTxns,
+          new Path(work.dumpDirectory).getParent(), conf);
+    }
     if (!isTableDiffPresent) {
-      Long eventId = Long.parseLong(getEventIdFromFile(new Path(work.dumpDirectory).getParent(), conf)[0]);
-      prepareTableDiffFile(eventId, getHive(), work, conf);
+      prepareTableDiffFile(notificationEvents, getHive(), work, conf);
     }
     if (this.childTasks == null) {
       this.childTasks = new ArrayList<>();
@@ -849,6 +870,22 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
     return 0;
   }

+  private void abortOpenTxnsForDatabase(HiveTxnManager hiveTxnManager, ValidTxnList validTxnList, String dbName,
+      Hive hiveDb) throws HiveException {
+    List<Long> openTxns = ReplUtils.getOpenTxns(hiveTxnManager, validTxnList, dbName);
+    if (!openTxns.isEmpty()) {
+      LOG.info("Rolling back write txns: " + openTxns + " for the database: " + dbName);
+      //abort only write transactions for the current database if abort transactions is enabled.
+      hiveDb.abortTransactions(openTxns);
+      validTxnList = hiveTxnManager.getValidTxns(excludedTxns);
+      openTxns = ReplUtils.getOpenTxns(hiveTxnManager, validTxnList, dbName);
+      if (!openTxns.isEmpty()) {
+        LOG.warn("Unable to force abort all the open txns: {}.", openTxns);
+        throw new IllegalStateException("Failover triggered abort txns request failed for unknown reasons.");
+      }
+    }
+  }
+
   private Database getSourceDbMetadata() throws IOException, SemanticException {
     Path dbMetadata = new Path(work.dumpDirectory, EximUtil.METADATA_PATH_NAME);
     BootstrapEventsIterator itr = new BootstrapEventsIterator(dbMetadata.toString(), work.dbNameToLoadIn,
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
index d059e6c2d5b..f0330e021e5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hdfs.protocol.SnapshotException;
 import org.apache.hadoop.hive.common.TableName;
+import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.repl.ReplConst;
 import org.apache.hadoop.hive.common.repl.ReplScope;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -31,6 +32,12 @@ import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
 import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
+import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
+import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
 import org.apache.hadoop.hive.metastore.utils.StringUtils;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.ddl.DDLWork;
@@ -42,6 +49,10 @@ import org.apache.hadoop.hive.ql.exec.repl.ReplAck;
 import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork;
 import org.apache.hadoop.hive.ql.exec.util.DAGTraversal;
 import org.apache.hadoop.hive.ql.exec.util.Retryable;
+import org.apache.hadoop.hive.ql.lockmgr.DbLockManager;
+import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager;
+import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
+import org.apache.hadoop.hive.ql.lockmgr.LockException;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Table;
@@ -78,6 +89,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Base64;
+import java.util.Set;

 import static org.apache.hadoop.hive.conf.Constants.SCHEDULED_QUERY_EXECUTIONID;
 import static org.apache.hadoop.hive.conf.Constants.SCHEDULED_QUERY_SCHEDULENAME;
@@ -358,6 +370,55 @@ public class ReplUtils {
     return parameters != null && ReplConst.TRUE.equalsIgnoreCase(parameters.get(ReplConst.REPL_FIRST_INC_PENDING_FLAG));
   }

+  public static List<Long> getOpenTxns(ValidTxnList validTxnList) {
+    long[] invalidTxns = validTxnList.getInvalidTransactions();
+    List<Long> openTxns = new ArrayList<>();
+    for (long invalidTxn : invalidTxns) {
+      if (!validTxnList.isTxnAborted(invalidTxn)) {
+        openTxns.add(invalidTxn);
+      }
+    }
+    return openTxns;
+  }
+
+  public static List<Long> getOpenTxns(HiveTxnManager hiveTxnManager, ValidTxnList validTxnList, String dbName) throws LockException {
+    HiveLockManager lockManager = hiveTxnManager.getLockManager();
+    long[] invalidTxns = validTxnList.getInvalidTransactions();
+    List<Long> openTxns = new ArrayList<>();
+    Set<Long> dbTxns = new HashSet<>();
+    if (lockManager instanceof DbLockManager) {
+      ShowLocksRequest request = new ShowLocksRequest();
+      request.setDbname(dbName.toLowerCase());
+      ShowLocksResponse showLocksResponse = ((DbLockManager) lockManager).getLocks(request);
+      for (ShowLocksResponseElement showLocksResponseElement : showLocksResponse.getLocks()) {
+        dbTxns.add(showLocksResponseElement.getTxnid());
+      }
+      for (long invalidTxn : invalidTxns) {
+        if (dbTxns.contains(invalidTxn) && !validTxnList.isTxnAborted(invalidTxn)) {
+          openTxns.add(invalidTxn);
+        }
+      }
+    } else {
+      for (long invalidTxn : invalidTxns) {
+        if (!validTxnList.isTxnAborted(invalidTxn)) {
+          openTxns.add(invalidTxn);
+        }
+      }
+    }
+    return openTxns;
+  }
+
+  public static MessageDeserializer getEventDeserializer(NotificationEvent event) {
+    try {
+      return MessageFactory.getInstance(event.getMessageFormat()).getDeserializer();
+    } catch (Exception e) {
+      String message =
+          "could not create appropriate messageFactory for format " + event.getMessageFormat();
+      LOG.error(message, e);
+      throw new IllegalStateException(message, e);
+    }
+  }
+
   public static EnvironmentContext setReplDataLocationChangedFlag(EnvironmentContext envContext) {
     if (envContext == null) {
       envContext = new EnvironmentContext();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java
index 6a59b2f2d8e..f488b8577f6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java
@@ -26,8 +26,8 @@ import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 import org.apache.hadoop.hive.metastore.messaging.EventMessage;
 import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
 import org.apache.hadoop.hive.metastore.messaging.MessageEncoder;
-import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
 import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageEncoder;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.metadata.HiveFatalException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
@@ -56,14 +56,7 @@ abstract class AbstractEventHandler<T extends EventMessage> implements EventHand

   AbstractEventHandler(NotificationEvent event) {
     this.event = event;
-    try {
-      deserializer = MessageFactory.getInstance(event.getMessageFormat()).getDeserializer();
-    } catch (Exception e) {
-      String message =
-          "could not create appropriate messageFactory for format " + event.getMessageFormat();
-      LOG.error(message, e);
-      throw new IllegalStateException(message, e);
-    }
+    deserializer = ReplUtils.getEventDeserializer(event);
     eventMessage = eventMessage(event.getMessage());
     eventMessageAsJSON = eventMessageAsJSON(eventMessage);
   }
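Both getOpenTxns overloads that moved into ReplUtils rest on one property of the transaction snapshot: its "invalid" transactions are exactly the open plus the aborted ones, so the open set is recovered by filtering out aborted ids; the db-scoped overload then keeps only txns that hold a lock on the given database. A compact sketch against a stand-in interface (the real code uses org.apache.hadoop.hive.common.ValidTxnList and resolves the db's txns via ShowLocks):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Set;

    public class OpenTxnsSketch {
      interface TxnSnapshot {                 // stand-in for ValidTxnList
        long[] getInvalidTransactions();      // invalid = open + aborted
        boolean isTxnAborted(long txnId);
      }

      static List<Long> getOpenTxns(TxnSnapshot snapshot) {
        List<Long> openTxns = new ArrayList<>();
        for (long txnId : snapshot.getInvalidTransactions()) {
          if (!snapshot.isTxnAborted(txnId)) {  // invalid but not aborted -> open
            openTxns.add(txnId);
          }
        }
        return openTxns;
      }

      // Db-scoped variant: intersect with the txns that hold a lock on the db,
      // mirroring the DbLockManager/ShowLocks branch in ReplUtils.getOpenTxns.
      static List<Long> getOpenTxns(TxnSnapshot snapshot, Set<Long> txnsLockingDb) {
        List<Long> openTxns = getOpenTxns(snapshot);
        openTxns.retainAll(txnsLockingDb);
        return openTxns;
      }
    }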