This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 67c2d4910ff HIVE-26316: Handle dangling open txns on both src & tgt in unplanned failover. (Haymant Mangla reviewed by Peter Vary) (#3367)
67c2d4910ff is described below

commit 67c2d4910ff17c694653eb8bd9c9ed2405cec38b
Author: Haymant Mangla <79496857+hmangl...@users.noreply.github.com>
AuthorDate: Thu Jun 16 15:11:22 2022 +0530

    HIVE-26316: Handle dangling open txns on both src & tgt in unplanned failover. (Haymant Mangla reviewed by Peter Vary) (#3367)
---
 .../parse/TestReplicationOptimisedBootstrap.java   | 141 ++++++++++++++++++++-
 .../hive/ql/exec/repl/OptimisedBootstrapUtils.java |  92 ++++++++++++--
 .../hadoop/hive/ql/exec/repl/ReplDumpTask.java     |  77 +++++------
 .../hadoop/hive/ql/exec/repl/ReplLoadTask.java     |  41 +++++-
 .../hadoop/hive/ql/exec/repl/util/ReplUtils.java   |  61 +++++++++
 .../repl/dump/events/AbstractEventHandler.java     |  11 +-
 6 files changed, 349 insertions(+), 74 deletions(-)

diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
index 673e41b3065..dd6821dc578 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
@@ -24,12 +24,16 @@ import org.apache.hadoop.fs.QuotaUsage;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.AbortTxnsRequest;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.TxnType;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils;
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.HiveUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 
 import org.jetbrains.annotations.NotNull;
@@ -55,7 +59,6 @@ import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLI
 import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.EVENT_ACK_FILE;
 import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.TABLE_DIFF_COMPLETE_DIRECTORY;
 import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.TABLE_DIFF_INPROGRESS_DIRECTORY;
-import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.getEventIdFromFile;
 import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.getPathsFromTableFile;
 import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.getTablesFromTableDiffFile;
 
@@ -71,6 +74,10 @@ import static org.junit.Assert.fail;
 public class TestReplicationOptimisedBootstrap extends BaseReplicationScenariosAcidTables {
 
   String extraPrimaryDb;
+  HiveConf primaryConf;
+  TxnStore txnHandler;
+  List<Long> tearDownTxns = new ArrayList<>();
+  List<Long> tearDownLockIds = new ArrayList<>();
 
   @BeforeClass
   public static void classLevelSetup() throws Exception {
@@ -90,10 +97,19 @@ public class TestReplicationOptimisedBootstrap extends BaseReplicationScenariosA
   public void setup() throws Throwable {
     super.setup();
     extraPrimaryDb = "extra_" + primaryDbName;
+    primaryConf = primary.getConf();
+    txnHandler = TxnUtils.getTxnStore(primary.getConf());
   }
 
   @After
   public void tearDown() throws Throwable {
+    if (!tearDownTxns.isEmpty()) {
+      //Abort any leftover transactions that might not have completed due to test failures.
+      txnHandler.abortTxns(new AbortTxnsRequest(tearDownTxns));
+    }
+    //Release any locks still held after the tests. The tests release locks explicitly when they are no longer needed,
+    //but a test failure can leave locks in a dangling state.
+    releaseLocks(txnHandler, tearDownLockIds);
     primary.run("drop database if exists " + extraPrimaryDb + " cascade");
     super.tearDown();
   }
@@ -468,47 +484,56 @@ public class TestReplicationOptimisedBootstrap extends BaseReplicationScenariosA
 
   @Test
   public void testReverseBootstrap() throws Throwable {
-    HiveConf primaryConf = primary.getConf();
-    TxnStore txnHandler = TxnUtils.getTxnStore(primary.getConf());
     List<String> withClause = setUpFirstIterForOptimisedBootstrap();
 
     // Open 3 txns for Database which is not under replication
     int numTxnsForSecDb = 3;
     List<Long> txnsForSecDb = openTxns(numTxnsForSecDb, txnHandler, primaryConf);
+    tearDownTxns.addAll(txnsForSecDb);
 
     Map<String, Long> tablesInSecDb = new HashMap<>();
-    tablesInSecDb.put("t1", (long) numTxnsForSecDb);
-    tablesInSecDb.put("t2", (long) numTxnsForSecDb);
+    tablesInSecDb.put("t1", (long) numTxnsForSecDb + 4);
+    tablesInSecDb.put("t2", (long) numTxnsForSecDb + 4);
     List<Long> lockIdsForSecDb = allocateWriteIdsForTablesAndAcquireLocks(primaryDbName + "_extra",
             tablesInSecDb, txnHandler, txnsForSecDb, primaryConf);
+    tearDownLockIds.addAll(lockIdsForSecDb);
 
     //Open 2 txns for Primary Db
     int numTxnsForPrimaryDb = 2;
     List<Long> txnsForSourceDb = openTxns(numTxnsForPrimaryDb, txnHandler, primaryConf);
+    tearDownTxns.addAll(txnsForSourceDb);
 
     // Allocate write ids for both tables of source database.
     Map<String, Long> tablesInSourceDb = new HashMap<>();
-    tablesInSourceDb.put("t1", (long) numTxnsForPrimaryDb + 4);
+    tablesInSourceDb.put("t1", (long) numTxnsForPrimaryDb + 6);
     tablesInSourceDb.put("t2", (long) numTxnsForPrimaryDb);
-    allocateWriteIdsForTablesAndAcquireLocks(replicatedDbName, tablesInSourceDb, txnHandler,
+    List<Long> lockIdsForSourceDb = allocateWriteIdsForTablesAndAcquireLocks(replicatedDbName, tablesInSourceDb, txnHandler,
             txnsForSourceDb, replica.getConf());
+    tearDownLockIds.addAll(lockIdsForSourceDb);
 
     //Open 1 txn with no hive locks acquired
     List<Long> txnsWithNoLocks = openTxns(1, txnHandler, primaryConf);
+    tearDownTxns.addAll(txnsWithNoLocks);
 
     // Do a reverse second dump, this should do a bootstrap dump for the tables in the table_diff and incremental for
     // rest.
+    List<Long> allReplCreatedTxnsOnSource = getReplCreatedTxns();
+    tearDownTxns.addAll(allReplCreatedTxnsOnSource);
 
     assertTrue("value1".equals(primary.getDatabase(primaryDbName).getParameters().get("key1")));
     WarehouseInstance.Tuple tuple = replica.dump(replicatedDbName, withClause);
 
+    verifyAllOpenTxnsAborted(allReplCreatedTxnsOnSource, primaryConf);
+
     //Verify that openTxns for sourceDb were aborted before proceeding with bootstrap dump.
     verifyAllOpenTxnsAborted(txnsForSourceDb, primaryConf);
     verifyAllOpenTxnsNotAborted(txnsForSecDb, primaryConf);
     verifyAllOpenTxnsNotAborted(txnsWithNoLocks, primaryConf);
     txnHandler.abortTxns(new AbortTxnsRequest(txnsForSecDb));
+    txnHandler.abortTxns(new AbortTxnsRequest(txnsForSecDb));
     txnHandler.abortTxns(new AbortTxnsRequest(txnsWithNoLocks));
     releaseLocks(txnHandler, lockIdsForSecDb);
+    releaseLocks(txnHandler, lockIdsForSecDb);
 
     String hiveDumpDir = tuple.dumpLocation + File.separator + ReplUtils.REPL_HIVE_BASE_DIR;
     // _bootstrap directory should be created as bootstrap enabled on external tables.
@@ -829,6 +854,35 @@ public class TestReplicationOptimisedBootstrap extends BaseReplicationScenariosA
     primary.dump(primaryDbName, withClause);
     replica.load(replicatedDbName, primaryDbName, withClause);
 
+    // Open 3 txns for Database which is not under replication
+    int numTxnsForSecDb = 3;
+    List<Long> txnsForSecDb = openTxns(numTxnsForSecDb, txnHandler, primaryConf);
+    tearDownTxns.addAll(txnsForSecDb);
+
+    Map<String, Long> tablesInSecDb = new HashMap<>();
+    tablesInSecDb.put("t1", (long) numTxnsForSecDb);
+    tablesInSecDb.put("t2", (long) numTxnsForSecDb);
+    List<Long> lockIdsForSecDb = allocateWriteIdsForTablesAndAcquireLocks(primaryDbName + "_extra",
+            tablesInSecDb, txnHandler, txnsForSecDb, primaryConf);
+    tearDownLockIds.addAll(lockIdsForSecDb);
+
+    //Open 2 txns for Primary Db
+    int numTxnsForPrimaryDb = 2;
+    List<Long> txnsForSourceDb = openTxns(numTxnsForPrimaryDb, txnHandler, primaryConf);
+    tearDownTxns.addAll(txnsForSourceDb);
+
+    // Allocate write ids for both tables of source database.
+    Map<String, Long> tablesInSourceDb = new HashMap<>();
+    tablesInSourceDb.put("t1", (long) numTxnsForPrimaryDb);
+    tablesInSourceDb.put("t5", (long) numTxnsForPrimaryDb);
+    List<Long> lockIdsForSourceDb = allocateWriteIdsForTablesAndAcquireLocks(primaryDbName, tablesInSourceDb, txnHandler,
+            txnsForSourceDb, primary.getConf());
+    tearDownLockIds.addAll(lockIdsForSourceDb);
+
+    //Open 1 txn with no hive locks acquired
+    List<Long> txnsWithNoLocks = openTxns(1, txnHandler, primaryConf);
+    tearDownTxns.addAll(txnsWithNoLocks);
+
     // Create 4 managed tables and do a dump & load.
     WarehouseInstance.Tuple tuple =
         primary.run("use " + primaryDbName)
@@ -850,6 +904,49 @@ public class TestReplicationOptimisedBootstrap extends BaseReplicationScenariosA
         .verifyResult("t1").run("show tables like 't2'").verifyResult("t2").run("show tables like 't3'")
         .verifyResult("t3").run("show tables like 't4'").verifyResult("t4").verifyReplTargetProperty(replicatedDbName);
 
+    String forwardReplPolicy = HiveUtils.getReplPolicy(replicatedDbName);
+    List<Long> targetReplCreatedTxnIds = new ArrayList<>();
+    for (Long txn: txnsForSecDb) {
+      targetReplCreatedTxnIds.add(txnHandler.getTargetTxnId(forwardReplPolicy, txn));
+    }
+    for (Long txn: txnsForSourceDb) {
+      targetReplCreatedTxnIds.add(txnHandler.getTargetTxnId(forwardReplPolicy, txn));
+    }
+    for (Long txn: txnsWithNoLocks) {
+      targetReplCreatedTxnIds.add(txnHandler.getTargetTxnId(forwardReplPolicy, txn));
+    }
+
+    verifyAllOpenTxnsNotAborted(targetReplCreatedTxnIds, primaryConf);
+
+    //Open new transactions on the original source cluster after it went down.
+
+    // Open 1 txn for secondary Database
+    List<Long> newTxnsForSecDb = openTxns(1, txnHandler, primaryConf);
+    tearDownTxns.addAll(newTxnsForSecDb);
+
+    Map<String, Long> newTablesForSecDb = new HashMap<>();
+    newTablesForSecDb.put("t1", (long) numTxnsForSecDb + 1);
+    newTablesForSecDb.put("t2", (long) numTxnsForSecDb + 1);
+    List<Long> newLockIdsForSecDb = allocateWriteIdsForTablesAndAcquireLocks(primaryDbName + "_extra",
+            newTablesForSecDb, txnHandler, newTxnsForSecDb, primaryConf);
+    tearDownLockIds.addAll(newLockIdsForSecDb);
+
+    //Open 1 txn for Primary Db
+    List<Long> newTxnsForSourceDb = openTxns(1, txnHandler, primaryConf);
+    tearDownTxns.addAll(newTxnsForSourceDb);
+
+    // Allocate write ids for both tables of source database.
+    Map<String, Long> newTablesInSourceDb = new HashMap<>();
+    newTablesInSourceDb.put("t1", (long) 5);
+    newTablesInSourceDb.put("t5", (long) 3);
+    List<Long> newLockIdsForSourceDb = allocateWriteIdsForTablesAndAcquireLocks(primaryDbName, newTablesInSourceDb, txnHandler,
+            newTxnsForSourceDb, primary.getConf());
+    tearDownLockIds.addAll(newLockIdsForSourceDb);
+
+    //Open 1 txn with no hive locks acquired
+    List<Long> newTxnsWithNoLock = openTxns(1, txnHandler, primaryConf);
+    tearDownTxns.addAll(newTxnsWithNoLock);
+
     // Do some modifications on original source cluster. The diff becomes(tnew_managed, t1, t2, t3)
     primary.run("use " + primaryDbName).run("create table tnew_managed (id int) clustered by(id) into 3 buckets " +
                     "stored as orc tblproperties (\"transactional\"=\"true\")")
@@ -886,14 +983,44 @@ public class TestReplicationOptimisedBootstrap extends BaseReplicationScenariosA
     // Do a load, this should create a table_diff_complete directory
     primary.load(primaryDbName, replicatedDbName, withClause);
 
+    verifyAllOpenTxnsAborted(txnsForSourceDb, primaryConf);
+    verifyAllOpenTxnsNotAborted(txnsForSecDb, primaryConf);
+    verifyAllOpenTxnsNotAborted(txnsWithNoLocks, primaryConf);
+    verifyAllOpenTxnsAborted(newTxnsForSourceDb, primaryConf);
+    verifyAllOpenTxnsNotAborted(newTxnsForSecDb, primaryConf);
+    verifyAllOpenTxnsNotAborted(newTxnsWithNoLock, primaryConf);
+
+    txnHandler.abortTxns(new AbortTxnsRequest(txnsForSecDb));
+    releaseLocks(txnHandler, lockIdsForSecDb);
+    txnHandler.abortTxns(new AbortTxnsRequest(txnsWithNoLocks));
+    txnHandler.abortTxns(new AbortTxnsRequest(newTxnsForSecDb));
+    releaseLocks(txnHandler, newLockIdsForSecDb);
+    txnHandler.abortTxns(new AbortTxnsRequest(newTxnsWithNoLock));
+
     // Check the table diff directory exist.
     assertTrue(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY).toString() + " doesn't exist",
         replicaFs.exists(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY)));
 
+    assertTrue(new Path(tuple.dumpLocation, OptimisedBootstrapUtils.ABORT_TXNS_FILE).toString() + " doesn't exist",
+            replicaFs.exists(new Path(tuple.dumpLocation, OptimisedBootstrapUtils.ABORT_TXNS_FILE)));
+
+    List<Long> txnsInAbortTxnFile = OptimisedBootstrapUtils.
+            getTxnIdFromAbortTxnsFile(new Path(tuple.dumpLocation), primaryConf);
+    assertTrue (txnsInAbortTxnFile.containsAll(txnsForSourceDb));
+    assertTrue (txnsInAbortTxnFile.containsAll(txnsForSecDb));
+    assertTrue (txnsInAbortTxnFile.containsAll(txnsWithNoLocks));
+    assertEquals (txnsInAbortTxnFile.size(), txnsForSecDb.size() + txnsForSourceDb.size() + txnsWithNoLocks.size());
+
     // Check the table diff has all the modified table, including the dropped and empty ones
     HashSet<String> tableDiffEntries = getTablesFromTableDiffFile(dumpPath, conf);
     assertTrue("Table Diff Contains " + tableDiffEntries,
         tableDiffEntries.containsAll(Arrays.asList("tnew_managed", "t1", "t2", "t3")));
     return withClause;
   }
+
+  List<Long> getReplCreatedTxns() throws MetaException {
+    List<TxnType> excludedTxns = Arrays.asList(TxnType.DEFAULT, TxnType.READ_ONLY, TxnType.COMPACTION,
+            TxnType.MATER_VIEW_REBUILD, TxnType.SOFT_DELETE);
+    return txnHandler.getOpenTxns(excludedTxns).getOpen_txns();
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/OptimisedBootstrapUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/OptimisedBootstrapUtils.java
index f3aa5302832..7074226e14f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/OptimisedBootstrapUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/OptimisedBootstrapUtils.java
@@ -29,6 +29,11 @@ import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 import org.apache.hadoop.hive.metastore.messaging.event.filters.DatabaseAndTableFilter;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
+import org.apache.hadoop.hive.metastore.messaging.MessageBuilder;
+import org.apache.hadoop.hive.metastore.messaging.OpenTxnMessage;
+import org.apache.hadoop.hive.metastore.messaging.CommitTxnMessage;
+import org.apache.hadoop.hive.metastore.messaging.AbortTxnMessage;
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -68,6 +73,9 @@ public class OptimisedBootstrapUtils {
   /** table diff directory when complete */
   public static final String TABLE_DIFF_COMPLETE_DIRECTORY = "table_diff_complete";
 
+  /** Abort txns file, which contains all the txns that need to be aborted on the new source cluster (initial target). */
+  public static final String ABORT_TXNS_FILE = "abort_txns";
+
   /** event ack file which contains the event id till which the cluster was last loaded. */
   public static final String EVENT_ACK_FILE = "event_ack";
 
@@ -90,6 +98,63 @@ public class OptimisedBootstrapUtils {
     return fs.exists(new Path(dumpPath, fileName));
   }
 
+  public static void prepareAbortTxnsFile(List<NotificationEvent> notificationEvents, Set<Long> allOpenTxns,
+                                          Path dumpPath, HiveConf conf) throws SemanticException {
+    if (notificationEvents.size() == 0) {
+      return;
+    }
+    Set<Long> txnsOpenedPostCurrEventId = new HashSet<>();
+    MessageDeserializer deserializer = ReplUtils.getEventDeserializer(notificationEvents.get(0));
+    for (NotificationEvent event: notificationEvents) {
+      switch (event.getEventType()) {
+        case MessageBuilder.OPEN_TXN_EVENT:
+          OpenTxnMessage openTxnMessage = deserializer.getOpenTxnMessage(event.getMessage());
+          txnsOpenedPostCurrEventId.addAll(openTxnMessage.getTxnIds());
+          allOpenTxns.removeAll(openTxnMessage.getTxnIds());
+          break;
+        case MessageBuilder.ABORT_TXN_EVENT:
+          AbortTxnMessage abortTxnMessage = deserializer.getAbortTxnMessage(event.getMessage());
+          if (!txnsOpenedPostCurrEventId.contains(abortTxnMessage.getTxnId())) {
+            allOpenTxns.add(abortTxnMessage.getTxnId());
+          }
+          break;
+        case MessageBuilder.COMMIT_TXN_EVENT:
+          CommitTxnMessage commitTxnMessage = deserializer.getCommitTxnMessage(event.getMessage());
+          if (!txnsOpenedPostCurrEventId.contains(commitTxnMessage.getTxnId())) {
+            allOpenTxns.add(commitTxnMessage.getTxnId());
+          }
+          break;
+      }
+    }
+    if (!allOpenTxns.isEmpty()) {
+      Utils.writeOutput(flattenListToString(allOpenTxns), new Path(dumpPath, ABORT_TXNS_FILE), conf);
+    }
+  }
+
+  public static List<Long> getTxnIdFromAbortTxnsFile(Path dumpPath, HiveConf conf) throws IOException {
+    String input;
+    Path abortTxnFile = new Path(dumpPath, ABORT_TXNS_FILE);
+    FileSystem fs = abortTxnFile.getFileSystem(conf);
+    try (FSDataInputStream stream = fs.open(abortTxnFile);) {
+      input = IOUtils.toString(stream, Charset.defaultCharset());
+    }
+    return unflattenListFromString(input);
+  }
+
+  private static String flattenListToString(Set<Long> list) {
+    return list.stream()
+            .map(Object::toString)
+            .collect(Collectors.joining(FILE_ENTRY_SEPARATOR));
+  }
+
+  private static List<Long> unflattenListFromString(String input) {
+    List<Long> ret = new ArrayList<>();
+    for (String val : input.replaceAll(System.lineSeparator(), "").trim().split(FILE_ENTRY_SEPARATOR)) {
+      ret.add(Long.parseLong(val));
+    }
+    return ret;
+  }
+
   /**
    * Gets the source &amp; target event id  from the event ack file
    * @param dumpPath the dump path
@@ -201,19 +266,17 @@ public class OptimisedBootstrapUtils {
   }
 
   /**
-   * Prepares the table diff file, with tables modified post the specified event id.
-   * @param eventId the event id after which tables should be modified
+   * Returns list of notificationEvents starting from eventId that are related to the database.
+   * @param eventId Starting eventId
    * @param hiveDb the hive object
    * @param work the load work
-   * @param conf hive configuration
    * @throws Exception
    */
-  public static void prepareTableDiffFile(Long eventId, Hive hiveDb, ReplLoadWork work, HiveConf conf)
-      throws Exception {
-    // Get the notification events.
+  public static List<NotificationEvent> getListOfNotificationEvents(Long eventId, Hive hiveDb,
+                                                                    ReplLoadWork work) throws Exception {
     List<NotificationEvent> notificationEvents =
-        hiveDb.getMSC().getNextNotification(eventId - 1, -1, new DatabaseAndTableFilter(work.dbNameToLoadIn, null))
-            .getEvents();
+        hiveDb.getMSC().getNextNotification(eventId - 1, -1,
+            new DatabaseAndTableFilter(work.dbNameToLoadIn, null)).getEvents();
 
     // Check the first eventId fetched is the same as what we fed, to ensure the events post that hasn't expired.
     if (notificationEvents.get(0).getEventId() != eventId) {
@@ -222,6 +285,19 @@ public class OptimisedBootstrapUtils {
     // Remove the first one, it is already loaded, we fetched it to confirm the notification events post that haven't
     // expired.
     notificationEvents.remove(0);
+    return notificationEvents;
+  }
+
+  /**
+   * Prepares the table diff file, with tables modified post the specified event id.
+   * @param notificationEvents Events that can possibly contain table DDL/DML metadata.
+   * @param hiveDb the hive object
+   * @param work the load work
+   * @param conf hive configuration
+   * @throws Exception
+   */
+  public static void prepareTableDiffFile(List<NotificationEvent> notificationEvents, Hive hiveDb,
+                                          ReplLoadWork work, HiveConf conf) throws Exception {
     HashSet<String> modifiedTables = new HashSet<>();
     for (NotificationEvent event : notificationEvents) {
       String tableName = event.getTableName();
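
The core of prepareAbortTxnsFile above is a single pass over the notification events that follow the last acknowledged event id: txns opened after that point are dropped from the candidate set (they replicate normally), while txns that were committed or aborted locally after that point, but opened before it, are added back, because the peer cluster still considers them open. The snippet below is a minimal, self-contained sketch of that bookkeeping, using plain longs in place of NotificationEvent and the message deserializer; the class and event names are illustrative only and are not part of the commit.

import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Illustrative sketch only: mirrors the bookkeeping in prepareAbortTxnsFile without metastore types.
public class AbortTxnsBookkeepingSketch {

  // Stand-in for a notification event; kind is "OPEN", "COMMIT" or "ABORT".
  static final class TxnEvent {
    final String kind;
    final long txnId;
    TxnEvent(String kind, long txnId) { this.kind = kind; this.txnId = txnId; }
  }

  static Set<Long> txnsToAbort(Set<Long> currentlyOpenTxns, List<TxnEvent> eventsSinceBoundary) {
    Set<Long> result = new LinkedHashSet<>(currentlyOpenTxns);
    Set<Long> openedAfterBoundary = new HashSet<>();
    for (TxnEvent event : eventsSinceBoundary) {
      if ("OPEN".equals(event.kind)) {
        // Opened after the failover boundary: replicated normally, so not a dangling txn.
        openedAfterBoundary.add(event.txnId);
        result.remove(event.txnId);
      } else if (("COMMIT".equals(event.kind) || "ABORT".equals(event.kind))
          && !openedAfterBoundary.contains(event.txnId)) {
        // Closed locally after the boundary but opened before it: the peer still sees it open.
        result.add(event.txnId);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    Set<Long> open = new LinkedHashSet<>(Arrays.asList(10L, 11L, 12L));
    List<TxnEvent> events = Arrays.asList(
        new TxnEvent("OPEN", 12L),    // post-boundary txn, excluded
        new TxnEvent("COMMIT", 7L),   // pre-boundary txn closed locally, included
        new TxnEvent("ABORT", 12L));  // belongs to a post-boundary txn, ignored
    System.out.println(txnsToAbort(open, events));   // [10, 11, 7]
  }
}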
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index 667ede3ca74..ee33debe41a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -39,7 +39,6 @@ import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 import org.apache.hadoop.hive.metastore.api.SQLAllTableConstraints;
 import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
 import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
-import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
 import org.apache.hadoop.hive.metastore.api.TxnType;
 import org.apache.hadoop.hive.metastore.messaging.event.filters.AndFilter;
 import org.apache.hadoop.hive.metastore.messaging.event.filters.CatalogFilter;
@@ -61,10 +60,12 @@ import org.apache.hadoop.hive.ql.exec.util.Retryable;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.lockmgr.DbLockManager;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager;
+import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
 import org.apache.hadoop.hive.ql.lockmgr.LockException;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
+import org.apache.hadoop.hive.ql.metadata.HiveUtils;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.metadata.events.EventUtils;
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec;
@@ -142,6 +143,7 @@ import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.isFirs
 import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT;
 import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.NON_RECOVERABLE_MARKER;
 import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.RANGER_AUTHORIZER;
+import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.getOpenTxns;
 import static org.apache.hadoop.hive.ql.exec.repl.util.SnapshotUtils.cleanupSnapshots;
 import static org.apache.hadoop.hive.ql.exec.repl.util.SnapshotUtils.getDFS;
 import static org.apache.hadoop.hive.ql.exec.repl.util.SnapshotUtils.getListFromFileList;
@@ -253,7 +255,9 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
             } else {
               // We should be here only if TableDiff is Present.
               boolean isTableDiffDirectoryPresent =
-                  checkFileExists(previousValidHiveDumpPath.getParent(), conf, TABLE_DIFF_COMPLETE_DIRECTORY);
+                      checkFileExists(previousValidHiveDumpPath.getParent(), conf, TABLE_DIFF_COMPLETE_DIRECTORY);
+              boolean isAbortTxnsListPresent =
+                      checkFileExists(previousValidHiveDumpPath.getParent(), conf, OptimisedBootstrapUtils.ABORT_TXNS_FILE);
 
               assert isTableDiffDirectoryPresent;
 
@@ -267,6 +271,9 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
 
               // Get the tables to be bootstrapped from the table diff
               tablesForBootstrap = getTablesFromTableDiffFile(previousValidHiveDumpPath.getParent(), conf);
+              if (isAbortTxnsListPresent) {
+                abortReplCreatedTxnsPriorToFailover(previousValidHiveDumpPath.getParent(), conf);
+              }
 
               // Generate the bootstrapped table list and put it in the new dump directory for the load to consume.
               createBootstrapTableList(currentDumpPath, tablesForBootstrap, conf);
@@ -327,6 +334,16 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
     return 0;
   }
 
+  private void abortReplCreatedTxnsPriorToFailover(Path dumpPath, HiveConf conf) throws LockException, IOException {
+    List<Long> replCreatedTxnsToAbort = OptimisedBootstrapUtils.getTxnIdFromAbortTxnsFile(dumpPath, conf);
+    String replPolicy = HiveUtils.getReplPolicy(work.dbNameOrPattern);
+    HiveTxnManager hiveTxnManager = getTxnMgr();
+    for (Long txnId : replCreatedTxnsToAbort) {
+      LOG.info("Rolling back Repl_Created txns:" + replCreatedTxnsToAbort.toString() + " opened prior to failover.");
+      hiveTxnManager.replRollbackTxn(replPolicy, txnId);
+    }
+  }
+
   private void preProcessFailoverIfRequired(Path previousValidHiveDumpDir, boolean isPrevFailoverReadyMarkerPresent)
           throws HiveException, IOException {
     FileSystem fs = previousValidHiveDumpDir.getFileSystem(conf);
@@ -751,7 +768,8 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
       work.setFailoverMetadata(fmd);
       return;
     }
-    List<Long> txnsForDb = getOpenTxns(getTxnMgr().getValidTxns(excludedTxns), work.dbNameOrPattern);
+    HiveTxnManager hiveTxnManager = getTxnMgr();
+    List<Long> txnsForDb = getOpenTxns(hiveTxnManager, hiveTxnManager.getValidTxns(excludedTxns), work.dbNameOrPattern);
     if (!txnsForDb.isEmpty()) {
       LOG.debug("Going to abort transactions: {} for database: {}.", 
txnsForDb, work.dbNameOrPattern);
       hiveDb.abortTransactions(txnsForDb);
@@ -762,7 +780,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
     List<Long> openTxns = getOpenTxns(allValidTxns);
     fmd.setOpenTxns(openTxns);
     fmd.setTxnsWithoutLock(getTxnsNotPresentInHiveLocksTable(openTxns));
-    txnsForDb = getOpenTxns(allValidTxns, work.dbNameOrPattern);
+    txnsForDb = getOpenTxns(hiveTxnManager, allValidTxns, work.dbNameOrPattern);
     if (!txnsForDb.isEmpty()) {
       LOG.debug("Going to abort transactions: {} for database: {}.", 
txnsForDb, work.dbNameOrPattern);
       hiveDb.abortTransactions(txnsForDb);
@@ -1483,33 +1501,6 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
     return !showLocksResponse.getLocks().isEmpty();
   }
 
-  List<Long> getOpenTxns(ValidTxnList validTxnList, String dbName) throws LockException {
-    HiveLockManager lockManager = getTxnMgr().getLockManager();
-    long[] invalidTxns = validTxnList.getInvalidTransactions();
-    List<Long> openTxns = new ArrayList<>();
-    Set<Long> dbTxns = new HashSet<>();
-    if (lockManager instanceof DbLockManager) {
-      ShowLocksRequest request = new ShowLocksRequest();
-      request.setDbname(dbName.toLowerCase());
-      ShowLocksResponse showLocksResponse = ((DbLockManager)lockManager).getLocks(request);
-      for (ShowLocksResponseElement showLocksResponseElement : showLocksResponse.getLocks()) {
-        dbTxns.add(showLocksResponseElement.getTxnid());
-      }
-      for (long invalidTxn : invalidTxns) {
-        if (dbTxns.contains(invalidTxn) && !validTxnList.isTxnAborted(invalidTxn)) {
-          openTxns.add(invalidTxn);
-        }
-      }
-    } else {
-      for (long invalidTxn : invalidTxns) {
-        if (!validTxnList.isTxnAborted(invalidTxn)) {
-          openTxns.add(invalidTxn);
-        }
-      }
-    }
-    return openTxns;
-  }
-
   // Get list of valid transactions for Repl Dump. Also wait for a given amount of time for the
   // open transactions to finish. Abort any open transactions after the wait is over.
   String getValidTxnListForReplDump(Hive hiveDb, long waitUntilTime) throws HiveException {
@@ -1522,7 +1513,8 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
     // of time to see if all open txns < current txn is getting aborted/committed. If not, then
     // we forcefully abort those txns just like AcidHouseKeeperService.
     //Exclude readonly and repl created tranasactions
-    ValidTxnList validTxnList = getTxnMgr().getValidTxns(excludedTxns);
+    HiveTxnManager hiveTxnManager = getTxnMgr();
+    ValidTxnList validTxnList = hiveTxnManager.getValidTxns(excludedTxns);
     while (System.currentTimeMillis() < waitUntilTime) {
       //check if no open txns at all
       List<Long> openTxnListForAllDbs = getOpenTxns(validTxnList);
@@ -1537,7 +1529,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
       if (getTxnsNotPresentInHiveLocksTable(openTxnListForAllDbs).isEmpty()) {
         //If all open txns have been inserted in the hive locks table, we just need to check for the db under replication
         // If there are no txns which are open for the given db under replication, then just return it.
-        if (getOpenTxns(validTxnList, work.dbNameOrPattern).isEmpty()) {
+        if (getOpenTxns(hiveTxnManager, validTxnList, work.dbNameOrPattern).isEmpty()) {
           return validTxnList.toString();
         }
       }
@@ -1547,17 +1539,17 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
       } catch (InterruptedException e) {
         LOG.info("REPL DUMP thread sleep interrupted", e);
       }
-      validTxnList = getTxnMgr().getValidTxns(excludedTxns);
+      validTxnList = hiveTxnManager.getValidTxns(excludedTxns);
     }
 
     // After the timeout just force abort the open txns
     if (conf.getBoolVar(REPL_BOOTSTRAP_DUMP_ABORT_WRITE_TXN_AFTER_TIMEOUT)) {
-      List<Long> openTxns = getOpenTxns(validTxnList, work.dbNameOrPattern);
+      List<Long> openTxns = getOpenTxns(hiveTxnManager, validTxnList, work.dbNameOrPattern);
       if (!openTxns.isEmpty()) {
         //abort only write transactions for the db under replication if abort transactions is enabled.
         hiveDb.abortTransactions(openTxns);
-        validTxnList = getTxnMgr().getValidTxns(excludedTxns);
-        openTxns = getOpenTxns(validTxnList, work.dbNameOrPattern);
+        validTxnList = hiveTxnManager.getValidTxns(excludedTxns);
+        openTxns = getOpenTxns(hiveTxnManager, validTxnList, work.dbNameOrPattern);
         if (!openTxns.isEmpty()) {
           LOG.warn("REPL DUMP unable to force abort all the open txns: {} 
after timeout due to unknown reasons. " +
             "However, this is rare case that shouldn't happen.", openTxns);
@@ -1577,17 +1569,6 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
       || conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST_REPL)) ? SLEEP_TIME_FOR_TESTS : SLEEP_TIME;
   }
 
-  private List<Long> getOpenTxns(ValidTxnList validTxnList) {
-    long[] invalidTxns = validTxnList.getInvalidTransactions();
-    List<Long> openTxns = new ArrayList<>();
-    for (long invalidTxn : invalidTxns) {
-      if (!validTxnList.isTxnAborted(invalidTxn)) {
-        openTxns.add(invalidTxn);
-      }
-    }
-    return openTxns;
-  }
-
   private ReplicationSpec getNewReplicationSpec(String evState, String objState,
       boolean isMetadataOnly) {
     return new ReplicationSpec(true, isMetadataOnly, evState, objState, false, true);
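
abortReplCreatedTxnsPriorToFailover above simply replays the txn ids recorded in the abort_txns marker file through replRollbackTxn on the new source cluster. The file itself is just the txn ids joined with OptimisedBootstrapUtils.FILE_ENTRY_SEPARATOR, as the flatten/unflatten helpers in the previous file show. The round-trip sketch below is illustrative only; it uses "," purely as a placeholder separator, since FILE_ENTRY_SEPARATOR's actual value is defined outside this diff.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Illustrative sketch of the abort_txns serialization round trip; "," stands in for FILE_ENTRY_SEPARATOR.
public class AbortTxnsFileFormatSketch {

  static final String SEPARATOR = ",";  // placeholder; the real separator is FILE_ENTRY_SEPARATOR

  // Mirrors flattenListToString: join the txn ids into a single line of text.
  static String flatten(Set<Long> txnIds) {
    return txnIds.stream().map(Object::toString).collect(Collectors.joining(SEPARATOR));
  }

  // Mirrors unflattenListFromString: strip line separators and parse each entry back to a long.
  static List<Long> unflatten(String contents) {
    List<Long> txnIds = new ArrayList<>();
    for (String entry : contents.replaceAll(System.lineSeparator(), "").trim().split(SEPARATOR)) {
      txnIds.add(Long.parseLong(entry));
    }
    return txnIds;
  }

  public static void main(String[] args) {
    Set<Long> toAbort = new LinkedHashSet<>(Arrays.asList(101L, 102L, 105L));
    String fileBody = flatten(toAbort);          // "101,102,105"
    System.out.println(unflatten(fileBody));     // [101, 102, 105]
  }
}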
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
index 2ef04e2a306..bcef4fae57f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
@@ -17,13 +17,17 @@
  */
 package org.apache.hadoop.hive.ql.exec.repl;
 
+import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.repl.ReplConst;
 import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.hive.metastore.ReplChangeManager;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.api.TxnType;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import org.apache.hadoop.hive.ql.ddl.database.alter.owner.AlterDatabaseSetOwnerDesc;
 import org.apache.hadoop.hive.ql.ddl.privilege.PrincipalDesc;
 import org.apache.hadoop.hive.ql.exec.repl.util.SnapshotUtils;
+import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
 import org.apache.hadoop.hive.ql.parse.repl.load.log.IncrementalLoadLogger;
 import org.apache.thrift.TException;
 import com.google.common.collect.Collections2;
@@ -89,6 +93,8 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.LinkedList;
+import java.util.Arrays;
+import java.util.Set;
 
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_DUMP_SKIP_IMMUTABLE_DATA_COPY;
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_SNAPSHOT_DIFF_FOR_EXTERNAL_TABLE_COPY;
@@ -107,6 +113,7 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
   private static final long serialVersionUID = 1L;
   private final static int ZERO_TASKS = 0;
   private final String STAGE_NAME = "REPL_LOAD";
+  private List<TxnType> excludedTxns = Arrays.asList(TxnType.READ_ONLY, TxnType.REPL_CREATED);
 
   @Override
   public String getName() {
@@ -724,9 +731,23 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
       }
       boolean isTableDiffPresent =
          checkFileExists(new Path(work.dumpDirectory).getParent(), conf, TABLE_DIFF_COMPLETE_DIRECTORY);
+      boolean isAbortTxnsListPresent =
+              checkFileExists(new Path(work.dumpDirectory).getParent(), conf, OptimisedBootstrapUtils.ABORT_TXNS_FILE);
+      Long eventId = Long.parseLong(getEventIdFromFile(new Path(work.dumpDirectory).getParent(), conf)[0]);
+      List<NotificationEvent> notificationEvents = OptimisedBootstrapUtils.getListOfNotificationEvents(eventId, getHive(), work);
+      if (!isAbortTxnsListPresent) {
+        //Abort the ongoing transactions(opened prior to failover) for the target database.
+        HiveTxnManager hiveTxnManager = getTxnMgr();
+        ValidTxnList validTxnList = hiveTxnManager.getValidTxns(excludedTxns);
+        Set<Long> allOpenTxns = new HashSet<>(ReplUtils.getOpenTxns(validTxnList));
+        abortOpenTxnsForDatabase(hiveTxnManager, validTxnList, work.dbNameToLoadIn, getHive());
+        //Re-fetch the list of notification events post failover eventId.
+        notificationEvents = OptimisedBootstrapUtils.getListOfNotificationEvents(eventId, getHive(), work);
+        OptimisedBootstrapUtils.prepareAbortTxnsFile(notificationEvents, allOpenTxns,
+                new Path(work.dumpDirectory).getParent(), conf);
+      }
       if (!isTableDiffPresent) {
-        Long eventId = Long.parseLong(getEventIdFromFile(new Path(work.dumpDirectory).getParent(), conf)[0]);
-        prepareTableDiffFile(eventId, getHive(), work, conf);
+        prepareTableDiffFile(notificationEvents, getHive(), work, conf);
       }
       if (this.childTasks == null) {
         this.childTasks = new ArrayList<>();
@@ -849,6 +870,22 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
     return 0;
   }
 
+  private void abortOpenTxnsForDatabase(HiveTxnManager hiveTxnManager, ValidTxnList validTxnList, String dbName,
+                                        Hive hiveDb) throws HiveException {
+    List<Long> openTxns = ReplUtils.getOpenTxns(hiveTxnManager, validTxnList, dbName);
+    if (!openTxns.isEmpty()) {
+      LOG.info("Rolling back write txns:" + openTxns.toString() + " for the database: " + dbName);
+      //abort only write transactions for the current database if abort transactions is enabled.
+      hiveDb.abortTransactions(openTxns);
+      validTxnList = hiveTxnManager.getValidTxns(excludedTxns);
+      openTxns = ReplUtils.getOpenTxns(hiveTxnManager, validTxnList, dbName);
+      if (!openTxns.isEmpty()) {
+        LOG.warn("Unable to force abort all the open txns: {}.", openTxns);
+        throw new IllegalStateException("Failover triggered abort txns request failed for unknown reasons.");
+      }
+    }
+  }
+
   private Database getSourceDbMetadata() throws IOException, SemanticException {
     Path dbMetadata = new Path(work.dumpDirectory, EximUtil.METADATA_PATH_NAME);
     BootstrapEventsIterator itr = new BootstrapEventsIterator(dbMetadata.toString(), work.dbNameToLoadIn,
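
abortOpenTxnsForDatabase above follows an abort-then-verify pattern: abort whatever write txns are still open for the database being loaded, re-read the open-txn list, and fail loudly if anything remains open instead of carrying dangling txns into the reverse bootstrap. The sketch below distills that pattern into plain Java; the functional interfaces and names are illustrative only and are not taken from the commit.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Illustrative sketch of the abort-then-verify pattern used by abortOpenTxnsForDatabase.
public class AbortThenVerifySketch {

  static void abortAndVerify(Supplier<List<Long>> listOpenTxns, Consumer<List<Long>> abortTxns) {
    List<Long> openTxns = listOpenTxns.get();
    if (openTxns.isEmpty()) {
      return;                                   // nothing dangling, nothing to do
    }
    abortTxns.accept(openTxns);                 // first attempt: abort everything we saw
    List<Long> stillOpen = listOpenTxns.get();  // re-read instead of trusting the first snapshot
    if (!stillOpen.isEmpty()) {
      // Surface the failure instead of silently continuing with dangling txns.
      throw new IllegalStateException("Could not abort open txns: " + stillOpen);
    }
  }

  public static void main(String[] args) {
    List<Long> fakeOpenTxns = new ArrayList<>(Arrays.asList(7L, 8L));
    abortAndVerify(() -> new ArrayList<>(fakeOpenTxns), fakeOpenTxns::removeAll);
    System.out.println("remaining open txns: " + fakeOpenTxns);  // remaining open txns: []
  }
}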
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
index d059e6c2d5b..f0330e021e5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hdfs.protocol.SnapshotException;
 import org.apache.hadoop.hive.common.TableName;
+import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.repl.ReplConst;
 import org.apache.hadoop.hive.common.repl.ReplScope;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -31,6 +32,12 @@ import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
 import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
+import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
+import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
 import org.apache.hadoop.hive.metastore.utils.StringUtils;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.ddl.DDLWork;
@@ -42,6 +49,10 @@ import org.apache.hadoop.hive.ql.exec.repl.ReplAck;
 import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork;
 import org.apache.hadoop.hive.ql.exec.util.DAGTraversal;
 import org.apache.hadoop.hive.ql.exec.util.Retryable;
+import org.apache.hadoop.hive.ql.lockmgr.DbLockManager;
+import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager;
+import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
+import org.apache.hadoop.hive.ql.lockmgr.LockException;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Table;
@@ -78,6 +89,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Base64;
+import java.util.Set;
 
 import static org.apache.hadoop.hive.conf.Constants.SCHEDULED_QUERY_EXECUTIONID;
 import static org.apache.hadoop.hive.conf.Constants.SCHEDULED_QUERY_SCHEDULENAME;
@@ -358,6 +370,55 @@ public class ReplUtils {
     return parameters != null && ReplConst.TRUE.equalsIgnoreCase(parameters.get(ReplConst.REPL_FIRST_INC_PENDING_FLAG));
   }
 
+  public static List<Long> getOpenTxns(ValidTxnList validTxnList) {
+    long[] invalidTxns = validTxnList.getInvalidTransactions();
+    List<Long> openTxns = new ArrayList<>();
+    for (long invalidTxn : invalidTxns) {
+      if (!validTxnList.isTxnAborted(invalidTxn)) {
+        openTxns.add(invalidTxn);
+      }
+    }
+    return openTxns;
+  }
+
+  public static List<Long> getOpenTxns(HiveTxnManager hiveTxnManager, ValidTxnList validTxnList, String dbName) throws LockException {
+    HiveLockManager lockManager = hiveTxnManager.getLockManager();
+    long[] invalidTxns = validTxnList.getInvalidTransactions();
+    List<Long> openTxns = new ArrayList<>();
+    Set<Long> dbTxns = new HashSet<>();
+    if (lockManager instanceof DbLockManager) {
+      ShowLocksRequest request = new ShowLocksRequest();
+      request.setDbname(dbName.toLowerCase());
+      ShowLocksResponse showLocksResponse = ((DbLockManager)lockManager).getLocks(request);
+      for (ShowLocksResponseElement showLocksResponseElement : showLocksResponse.getLocks()) {
+        dbTxns.add(showLocksResponseElement.getTxnid());
+      }
+      for (long invalidTxn : invalidTxns) {
+        if (dbTxns.contains(invalidTxn) && !validTxnList.isTxnAborted(invalidTxn)) {
+          openTxns.add(invalidTxn);
+        }
+      }
+    } else {
+      for (long invalidTxn : invalidTxns) {
+        if (!validTxnList.isTxnAborted(invalidTxn)) {
+          openTxns.add(invalidTxn);
+        }
+      }
+    }
+    return openTxns;
+  }
+
+  public static MessageDeserializer getEventDeserializer(NotificationEvent event) {
+    try {
+      return MessageFactory.getInstance(event.getMessageFormat()).getDeserializer();
+    } catch (Exception e) {
+      String message =
+              "could not create appropriate messageFactory for format " + 
event.getMessageFormat();
+      LOG.error(message, e);
+      throw new IllegalStateException(message, e);
+    }
+  }
+
   public static EnvironmentContext setReplDataLocationChangedFlag(EnvironmentContext envContext) {
     if (envContext == null) {
       envContext = new EnvironmentContext();
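
The database-scoped getOpenTxns overload added above treats a txn as open when it is invalid but not aborted, and then keeps only the txns that currently hold a lock on the given database (falling back to the unfiltered list when no DbLockManager is available). The sketch below restates that filtering over plain collections; the arrays and sets stand in for ValidTxnList and the SHOW LOCKS response and are illustrative only.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Illustrative sketch of the db-scoped open-txn filtering in ReplUtils.getOpenTxns.
public class DbScopedOpenTxnsSketch {

  static List<Long> openTxnsForDb(long[] invalidTxns, Set<Long> abortedTxns, Set<Long> txnsHoldingDbLocks) {
    List<Long> openTxns = new ArrayList<>();
    for (long txnId : invalidTxns) {
      // "invalid" means open or aborted; keep only the open ones that lock this database.
      if (!abortedTxns.contains(txnId) && txnsHoldingDbLocks.contains(txnId)) {
        openTxns.add(txnId);
      }
    }
    return openTxns;
  }

  public static void main(String[] args) {
    long[] invalid = {4L, 5L, 6L};                                   // stand-in for ValidTxnList.getInvalidTransactions()
    Set<Long> aborted = new HashSet<>(Arrays.asList(5L));            // stand-in for ValidTxnList.isTxnAborted()
    Set<Long> dbLocks = new HashSet<>(Arrays.asList(4L, 5L));        // txn ids seen in SHOW LOCKS for the db
    System.out.println(openTxnsForDb(invalid, aborted, dbLocks));    // [4]
  }
}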
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java
index 6a59b2f2d8e..f488b8577f6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java
@@ -26,8 +26,8 @@ import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 import org.apache.hadoop.hive.metastore.messaging.EventMessage;
 import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
 import org.apache.hadoop.hive.metastore.messaging.MessageEncoder;
-import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
 import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageEncoder;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.metadata.HiveFatalException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
@@ -56,14 +56,7 @@ abstract class AbstractEventHandler<T extends EventMessage> implements EventHand
 
   AbstractEventHandler(NotificationEvent event) {
     this.event = event;
-    try {
-      deserializer = MessageFactory.getInstance(event.getMessageFormat()).getDeserializer();
-    } catch (Exception e) {
-      String message =
-          "could not create appropriate messageFactory for format " + event.getMessageFormat();
-      LOG.error(message, e);
-      throw new IllegalStateException(message, e);
-    }
+    deserializer = ReplUtils.getEventDeserializer(event);
     eventMessage = eventMessage(event.getMessage());
     eventMessageAsJSON = eventMessageAsJSON(eventMessage);
   }
