Repository: hive Updated Branches: refs/heads/master 8cd9d3f7f -> 81dff07cb
HIVE-20264: Bootstrap repl dump with concurrent write and drop of ACID table makes target inconsistent (Sankar Hariappan, reviewed by Mahesh Kumar Behera, Anishek Agarwal) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/81dff07c Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/81dff07c Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/81dff07c Branch: refs/heads/master Commit: 81dff07cba2106c5d19f57701c15b62a5b773e63 Parents: 8cd9d3f Author: Sankar Hariappan <sank...@apache.org> Authored: Fri Aug 10 22:26:24 2018 +0530 Committer: Sankar Hariappan <sank...@apache.org> Committed: Fri Aug 10 22:26:24 2018 +0530 ---------------------------------------------------------------------- .../TestReplicationScenariosAcidTables.java | 79 ++++++++++++++++++++ .../hive/metastore/txn/TestTxnHandler.java | 5 +- .../hadoop/hive/metastore/txn/TxnHandler.java | 49 +++++++----- 3 files changed, 113 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/81dff07c/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java index 3040f6c..f074428 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java @@ -364,6 +364,85 @@ public class TestReplicationScenariosAcidTables { } @Test + public void testAcidTablesBootstrapWithConcurrentDropTable() throws Throwable { + HiveConf primaryConf = primary.getConf(); + primary.run("use " + primaryDbName) + .run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc " + + "tblproperties (\"transactional\"=\"true\")") + .run("insert into t1 values(1)"); + + // Perform concurrent write + drop on the acid table t1 when bootstrap dump in progress. Bootstrap + // won't dump the table but the subsequent incremental repl with new table with same name should be seen. + BehaviourInjection<CallerArguments, Boolean> callerInjectedBehavior + = new BehaviourInjection<CallerArguments, Boolean>() { + @Nullable + @Override + public Boolean apply(@Nullable CallerArguments args) { + if (injectionPathCalled) { + nonInjectedPathCalled = true; + } else { + // Insert another row to t1 and drop the table from another txn when bootstrap dump in progress. + injectionPathCalled = true; + Thread t = new Thread(new Runnable() { + @Override + public void run() { + LOG.info("Entered new thread"); + IDriver driver = DriverFactory.newDriver(primaryConf); + SessionState.start(new CliSessionState(primaryConf)); + CommandProcessorResponse ret = driver.run("insert into " + primaryDbName + ".t1 values(2)"); + boolean success = (ret.getException() == null); + assertTrue(success); + ret = driver.run("drop table " + primaryDbName + ".t1"); + success = (ret.getException() == null); + assertTrue(success); + LOG.info("Exit new thread success - {}", success, ret.getException()); + } + }); + t.start(); + LOG.info("Created new thread {}", t.getName()); + try { + t.join(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + return true; + } + }; + + InjectableBehaviourObjectStore.setCallerVerifier(callerInjectedBehavior); + WarehouseInstance.Tuple bootstrapDump = null; + try { + bootstrapDump = primary.dump(primaryDbName, null); + callerInjectedBehavior.assertInjectionsPerformed(true, true); + } finally { + InjectableBehaviourObjectStore.resetCallerVerifier(); // reset the behaviour + } + + // Bootstrap dump has taken latest list of tables and hence won't see table t1 as it is dropped. + replica.load(replicatedDbName, bootstrapDump.dumpLocation) + .run("use " + replicatedDbName) + .run("repl status " + replicatedDbName) + .verifyResult(bootstrapDump.lastReplicationId) + .run("show tables") + .verifyResult(null); + + // Create another ACID table with same name and insert a row. It should be properly replicated. + WarehouseInstance.Tuple incrementalDump = primary.run("use " + primaryDbName) + .run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc " + + "tblproperties (\"transactional\"=\"true\")") + .run("insert into t1 values(100)") + .dump(primaryDbName, bootstrapDump.lastReplicationId); + + replica.load(replicatedDbName, incrementalDump.dumpLocation) + .run("use " + replicatedDbName) + .run("repl status " + replicatedDbName) + .verifyResult(incrementalDump.lastReplicationId) + .run("select id from t1 order by id") + .verifyResult("100"); + } + + @Test public void testOpenTxnEvent() throws Throwable { String tableName = testName.getMethodName(); WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName, null); http://git-wip-us.apache.org/repos/asf/hive/blob/81dff07c/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java index c0725ad..be37b2a 100644 --- a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java +++ b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java @@ -1667,12 +1667,15 @@ public class TestTxnHandler { allocMsg = new AllocateTableWriteIdsRequest("destdb", "tbl2"); allocMsg.setReplPolicy("destdb.*"); allocMsg.setSrcTxnToWriteIdList(srcTxnToWriteId); + + // This is an idempotent case when repl flow forcefully allocate write id if it doesn't match + // the next write id. try { txnHandler.allocateTableWriteIds(allocMsg).getTxnToWriteIds(); } catch (IllegalStateException e) { failed = true; } - assertTrue(failed); + assertFalse(failed); replAbortTxnForTest(srcTxnIdList, "destdb.*"); http://git-wip-us.apache.org/repos/asf/hive/blob/81dff07c/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index f5e4905..8edc387 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -1404,7 +1404,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { if (srcTxnIds.size() != txnIds.size()) { // Idempotent case where txn was already closed but gets allocate write id event. // So, just ignore it and return empty list. - LOG.info("Target txn id is missing for source txn id : " + srcTxnIds.toString() + + LOG.info("Idempotent case: Target txn id is missing for source txn id : " + srcTxnIds.toString() + " and repl policy " + rqst.getReplPolicy()); return new AllocateTableWriteIdsResponse(txnToWriteIds); } @@ -1422,10 +1422,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { throw new RuntimeException("This should never happen for txnIds: " + txnIds); } - long writeId; - String s; - long allocatedTxnsCount = 0; - long txnId; List<String> queries = new ArrayList<>(); StringBuilder prefix = new StringBuilder(); StringBuilder suffix = new StringBuilder(); @@ -1440,6 +1436,10 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { suffix.append(""); TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnIds, "t2w_txnid", false, false); + + long allocatedTxnsCount = 0; + long txnId; + long writeId = 0; for (String query : queries) { LOG.debug("Going to execute query <" + query + ">"); rs = stmt.executeQuery(query); @@ -1462,12 +1462,19 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { return new AllocateTableWriteIdsResponse(txnToWriteIds); } + long srcWriteId = 0; + if (rqst.isSetReplPolicy()) { + // In replication flow, we always need to allocate write ID equal to that of source. + assert(srcTxnToWriteIds != null); + srcWriteId = srcTxnToWriteIds.get(0).getWriteId(); + } + handle = getMutexAPI().acquireLock(MUTEX_KEY.WriteIdAllocator.name()); // There are some txns in the list which does not have write id allocated and hence go ahead and do it. // Get the next write id for the given table and update it with new next write id. // This is select for update query which takes a lock if the table entry is already there in NEXT_WRITE_ID - s = sqlGenerator.addForUpdateClause( + String s = sqlGenerator.addForUpdateClause( "select nwi_next from NEXT_WRITE_ID where nwi_database = " + quoteString(dbName) + " and nwi_table = " + quoteString(tblName)); LOG.debug("Going to execute query <" + s + ">"); @@ -1475,19 +1482,33 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { if (!rs.next()) { // First allocation of write id should add the table to the next_write_id meta table // The initial value for write id should be 1 and hence we add 1 with number of write ids allocated here - writeId = 1; + // For repl flow, we need to force set the incoming write id. + writeId = (srcWriteId > 0) ? srcWriteId : 1; s = "insert into NEXT_WRITE_ID (nwi_database, nwi_table, nwi_next) values (" - + quoteString(dbName) + "," + quoteString(tblName) + "," + Long.toString(numOfWriteIds + 1) + ")"; + + quoteString(dbName) + "," + quoteString(tblName) + "," + (writeId + numOfWriteIds) + ")"; LOG.debug("Going to execute insert <" + s + ">"); stmt.execute(s); } else { - writeId = rs.getLong(1); + long nextWriteId = rs.getLong(1); + writeId = (srcWriteId > 0) ? srcWriteId : nextWriteId; + // Update the NEXT_WRITE_ID for the given table after incrementing by number of write ids allocated s = "update NEXT_WRITE_ID set nwi_next = " + (writeId + numOfWriteIds) + " where nwi_database = " + quoteString(dbName) + " and nwi_table = " + quoteString(tblName); LOG.debug("Going to execute update <" + s + ">"); stmt.executeUpdate(s); + + // For repl flow, if the source write id is mismatching with target next write id, then current + // metadata in TXN_TO_WRITE_ID is stale for this table and hence need to clean-up TXN_TO_WRITE_ID. + // This is possible in case of first incremental repl after bootstrap where concurrent write + // and drop table was performed at source during bootstrap dump. + if ((srcWriteId > 0) && (srcWriteId != nextWriteId)) { + s = "delete from TXN_TO_WRITE_ID where t2w_database = " + quoteString(dbName) + + " and t2w_table = " + quoteString(tblName); + LOG.debug("Going to execute delete <" + s + ">"); + stmt.executeUpdate(s); + } } // Map the newly allocated write ids against the list of txns which doesn't have pre-allocated @@ -1500,16 +1521,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { writeId++; } - if (rqst.isSetReplPolicy()) { - int lastIdx = txnToWriteIds.size()-1; - if ((txnToWriteIds.get(0).getWriteId() != srcTxnToWriteIds.get(0).getWriteId()) || - (txnToWriteIds.get(lastIdx).getWriteId() != srcTxnToWriteIds.get(lastIdx).getWriteId())) { - LOG.error("Allocated write id range {} is not matching with the input write id range {}.", - txnToWriteIds, srcTxnToWriteIds); - throw new IllegalStateException("Write id allocation failed for: " + srcTxnToWriteIds); - } - } - // Insert entries to TXN_TO_WRITE_ID for newly allocated write ids List<String> inserts = sqlGenerator.createInsertValuesStmt( "TXN_TO_WRITE_ID (t2w_txnid, t2w_database, t2w_table, t2w_writeid)", rows);