HIVE-17183 : Disable rename operations during bootstrap dump (Sankar Hariappan, reviewed by Anishek Agarwal, Thejas Nair)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/5663b971 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/5663b971 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/5663b971 Branch: refs/heads/hive-14535 Commit: 5663b971776bd3e6a6e17426875f44313f6eff9f Parents: bb4035b Author: Sankar Hariappan <mailtosank...@gmail.com> Authored: Thu Sep 7 11:07:24 2017 -0700 Committer: Thejas M Nair <the...@hortonworks.com> Committed: Thu Sep 7 11:07:24 2017 -0700 ---------------------------------------------------------------------- .../hive/ql/parse/TestReplicationScenarios.java | 75 ++++++++++++++++++++ .../org/apache/hadoop/hive/ql/exec/DDLTask.java | 15 ++++ .../hadoop/hive/ql/exec/repl/ReplDumpTask.java | 18 +++-- .../apache/hadoop/hive/ql/parse/EximUtil.java | 14 ++++ .../hadoop/hive/ql/parse/repl/dump/Utils.java | 67 +++++++++++++++++ 5 files changed, 182 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/5663b971/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java index b19c1aa..9667449 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java @@ -551,6 +551,81 @@ public class TestReplicationScenarios { } @Test + public void testBootstrapWithConcurrentRename() throws IOException { + String name = testName.getMethodName(); + String dbName = createDB(name, driver); + String replDbName = dbName + "_dupe"; + run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE", driver); + + String[] ptn_data = new String[]{ "eleven" , "twelve" }; + String[] empty = new String[]{}; + String ptn_locn = new Path(TEST_PATH, name + "_ptn").toUri().getPath(); + + createTestDataFile(ptn_locn, ptn_data); + run("LOAD DATA LOCAL INPATH '" + ptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b=1)", driver); + + BehaviourInjection<Table,Table> ptnedTableRenamer = new BehaviourInjection<Table,Table>(){ + boolean success = false; + + @Nullable + @Override + public Table apply(@Nullable Table table) { + if (injectionPathCalled) { + nonInjectedPathCalled = true; + } else { + // getTable is invoked after fetching the table names + injectionPathCalled = true; + Thread t = new Thread(new Runnable() { + public void run() { + try { + LOG.info("Entered new thread"); + Driver driver2 = new Driver(hconf); + SessionState.start(new CliSessionState(hconf)); + CommandProcessorResponse ret = driver2.run("ALTER TABLE " + dbName + ".ptned PARTITION (b=1) RENAME TO PARTITION (b=10)"); + success = (ret.getException() == null); + assertFalse(success); + ret = driver2.run("ALTER TABLE " + dbName + ".ptned RENAME TO " + dbName + ".ptned_renamed"); + success = (ret.getException() == null); + assertFalse(success); + LOG.info("Exit new thread success - {}", success); + } catch (CommandNeedRetryException e) { + LOG.info("Hit Exception {} from new thread", e.getMessage()); + throw new RuntimeException(e); + } + } + }); + t.start(); + LOG.info("Created new thread {}", t.getName()); + try { + t.join(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + return table; + } + }; + InjectableBehaviourObjectStore.setGetTableBehaviour(ptnedTableRenamer); + + // The intermediate rename would've failed as bootstrap dump in progress + bootstrapLoadAndVerify(dbName, replDbName); + + ptnedTableRenamer.assertInjectionsPerformed(true,true); + InjectableBehaviourObjectStore.resetGetTableBehaviour(); // reset the behaviour + + // The ptned table should be there in both source and target as rename was not successful + verifyRun("SELECT a from " + dbName + ".ptned WHERE (b=1) ORDER BY a", ptn_data, driver); + verifyRun("SELECT a from " + dbName + "_dupe.ptned WHERE (b=1) ORDER BY a", ptn_data, driverMirror); + + // Verify if Rename after bootstrap is successful + run("ALTER TABLE " + dbName + ".ptned PARTITION (b=1) RENAME TO PARTITION (b=10)", driver); + verifyIfPartitionNotExist(dbName, "ptned", new ArrayList<>(Arrays.asList("1")), metaStoreClient); + run("ALTER TABLE " + dbName + ".ptned RENAME TO " + dbName + ".ptned_renamed", driver); + verifyIfTableNotExist(dbName, "ptned", metaStoreClient); + verifyRun("SELECT a from " + dbName + ".ptned_renamed WHERE (b=10) ORDER BY a", ptn_data, driver); + } + + @Test public void testIncrementalAdds() throws IOException { String name = testName.getMethodName(); String dbName = createDB(name, driver); http://git-wip-us.apache.org/repos/asf/hive/blob/5663b971/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java index acc2390..646bb23 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java @@ -150,6 +150,7 @@ import org.apache.hadoop.hive.ql.parse.ExplainConfiguration.AnalyzeState; import org.apache.hadoop.hive.ql.parse.PreInsertTableDesc; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; import org.apache.hadoop.hive.ql.plan.AbortTxnsDesc; import org.apache.hadoop.hive.ql.plan.AddPartitionDesc; import org.apache.hadoop.hive.ql.plan.AlterDatabaseDesc; @@ -1159,6 +1160,12 @@ public class DDLTask extends Task<DDLWork> implements Serializable { return 0; } + String names[] = Utilities.getDbTableName(tableName); + if (Utils.isBootstrapDumpInProgress(db, names[0])) { + LOG.error("DDLTask: Rename Partition not allowed as bootstrap dump in progress"); + throw new HiveException("Rename Partition: Not allowed as bootstrap dump in progress"); + } + Table tbl = db.getTable(tableName); Partition oldPart = db.getPartition(tbl, oldPartSpec, false); if (oldPart == null) { @@ -3597,6 +3604,14 @@ public class DDLTask extends Task<DDLWork> implements Serializable { * Throws this exception if an unexpected error occurs. */ private int alterTable(Hive db, AlterTableDesc alterTbl) throws HiveException { + if (alterTbl.getOp() == AlterTableDesc.AlterTableTypes.RENAME) { + String names[] = Utilities.getDbTableName(alterTbl.getOldName()); + if (Utils.isBootstrapDumpInProgress(db, names[0])) { + LOG.error("DDLTask: Rename Table not allowed as bootstrap dump in progress"); + throw new HiveException("Rename Table: Not allowed as bootstrap dump in progress"); + } + } + // alter the table Table tbl = db.getTable(alterTbl.getOldName()); http://git-wip-us.apache.org/repos/asf/hive/blob/5663b971/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java index 165a2e3..7703f31 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java @@ -176,9 +176,9 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { private Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot) throws Exception { // bootstrap case - Long bootDumpBeginReplId = getHive().getMSC().getCurrentNotificationEventId().getEventId(); - - for (String dbName : Utils.matchesDb(getHive(), work.dbNameOrPattern)) { + Hive hiveDb = getHive(); + Long bootDumpBeginReplId = hiveDb.getMSC().getCurrentNotificationEventId().getEventId(); + for (String dbName : Utils.matchesDb(hiveDb, work.dbNameOrPattern)) { LOG.debug("ReplicationSemanticAnalyzer: analyzeReplDump dumping db: " + dbName); replLogger = new BootstrapDumpLogger(dbName, dumpRoot.toString(), Utils.getAllTables(getHive(), dbName).size(), @@ -186,14 +186,17 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { replLogger.startLog(); Path dbRoot = dumpDbMetadata(dbName, dumpRoot); dumpFunctionMetadata(dbName, dumpRoot); - for (String tblName : Utils.matchesTbl(getHive(), dbName, work.tableNameOrPattern)) { + + String uniqueKey = Utils.setDbBootstrapDumpState(hiveDb, dbName); + for (String tblName : Utils.matchesTbl(hiveDb, dbName, work.tableNameOrPattern)) { LOG.debug( "analyzeReplDump dumping table: " + tblName + " to db root " + dbRoot.toUri()); dumpTable(dbName, tblName, dbRoot); } + Utils.resetDbBootstrapDumpState(hiveDb, dbName, uniqueKey); replLogger.endLog(bootDumpBeginReplId.toString()); } - Long bootDumpEndReplId = getHive().getMSC().getCurrentNotificationEventId().getEventId(); + Long bootDumpEndReplId = hiveDb.getMSC().getCurrentNotificationEventId().getEventId(); LOG.info("Bootstrap object dump phase took from {} to {}", bootDumpBeginReplId, bootDumpEndReplId); @@ -204,7 +207,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { IMetaStoreClient.NotificationFilter evFilter = new DatabaseAndTableFilter(work.dbNameOrPattern, work.tableNameOrPattern); EventUtils.MSClientNotificationFetcher evFetcher = - new EventUtils.MSClientNotificationFetcher(getHive().getMSC()); + new EventUtils.MSClientNotificationFetcher(hiveDb.getMSC()); EventUtils.NotificationEventIterator evIter = new EventUtils.NotificationEventIterator( evFetcher, bootDumpBeginReplId, Ints.checkedCast(bootDumpEndReplId - bootDumpBeginReplId) + 1, @@ -223,7 +226,8 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { dmd.write(); // Set the correct last repl id to return to the user - return bootDumpEndReplId; + // Currently returned bootDumpBeginReplId as we don't consolidate the events after bootstrap + return bootDumpBeginReplId; } private Path dumpDbMetadata(String dbName, Path dumpRoot) throws Exception { http://git-wip-us.apache.org/repos/asf/hive/blob/5663b971/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java index 40c34bf..76331fc 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; import org.apache.hadoop.hive.ql.parse.repl.dump.io.DBSerializer; import org.apache.hadoop.hive.ql.parse.repl.dump.io.JsonWriter; import org.apache.hadoop.hive.ql.parse.repl.dump.io.ReplicationSpecSerializer; @@ -50,6 +51,7 @@ import java.io.Serializable; import java.net.URI; import java.net.URISyntaxException; import java.util.Collection; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; @@ -237,9 +239,21 @@ public class EximUtil { // If we later make this work for non-repl cases, analysis of this logic might become necessary. Also, this is using // Replv2 semantics, i.e. with listFiles laziness (no copy at export time) + // Remove all the entries from the parameters which are added for bootstrap dump progress + Map<String, String> parameters = dbObj.getParameters(); + Map<String, String> tmpParameters = new HashMap<>(); + if (parameters != null) { + tmpParameters.putAll(parameters); + tmpParameters.entrySet() + .removeIf(e -> e.getKey().startsWith(Utils.BOOTSTRAP_DUMP_STATE_KEY_PREFIX)); + dbObj.setParameters(tmpParameters); + } try (JsonWriter jsonWriter = new JsonWriter(fs, metadataPath)) { new DBSerializer(dbObj).writeTo(jsonWriter, replicationSpec); } + if (parameters != null) { + dbObj.setParameters(parameters); + } } public static void createExportDump(FileSystem fs, Path metadataPath, http://git-wip-us.apache.org/repos/asf/hive/blob/5663b971/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java index a48a17e..a1da629 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.parse.repl.dump; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -32,9 +33,18 @@ import com.google.common.collect.Collections2; import java.io.DataOutputStream; import java.io.IOException; import java.util.Collection; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.UUID; public class Utils { + public static final String BOOTSTRAP_DUMP_STATE_KEY_PREFIX = "bootstrap.dump.state."; + + public enum ReplDumpState { + IDLE, ACTIVE + } + public static void writeOutput(List<String> values, Path outputFile, HiveConf hiveConf) throws SemanticException { DataOutputStream outStream = null; @@ -79,4 +89,61 @@ public class Utils { SemanticAnalyzer.VALUES_TMP_TABLE_NAME_PREFIX.toLowerCase()); }); } + + public static String setDbBootstrapDumpState(Hive hiveDb, String dbName) throws HiveException { + Database database = hiveDb.getDatabase(dbName); + if (database == null) { + return null; + } + + Map<String, String> newParams = new HashMap<>(); + String uniqueKey = BOOTSTRAP_DUMP_STATE_KEY_PREFIX + UUID.randomUUID().toString(); + newParams.put(uniqueKey, ReplDumpState.ACTIVE.name()); + Map<String, String> params = database.getParameters(); + + // if both old params are not null, merge them + if (params != null) { + params.putAll(newParams); + database.setParameters(params); + } else { + // if one of them is null, replace the old params with the new one + database.setParameters(newParams); + } + + hiveDb.alterDatabase(dbName, database); + return uniqueKey; + } + + public static void resetDbBootstrapDumpState(Hive hiveDb, String dbName, + String uniqueKey) throws HiveException { + Database database = hiveDb.getDatabase(dbName); + if (database != null) { + Map<String, String> params = database.getParameters(); + if ((params != null) && params.containsKey(uniqueKey)) { + params.remove(uniqueKey); + database.setParameters(params); + hiveDb.alterDatabase(dbName, database); + } + } + } + + public static boolean isBootstrapDumpInProgress(Hive hiveDb, String dbName) throws HiveException { + Database database = hiveDb.getDatabase(dbName); + if (database == null) { + return false; + } + + Map<String, String> params = database.getParameters(); + if (params == null) { + return false; + } + + for (String key : params.keySet()) { + if (key.startsWith(BOOTSTRAP_DUMP_STATE_KEY_PREFIX) + && params.get(key).equals(ReplDumpState.ACTIVE.name())) { + return true; + } + } + return false; + } }