Repository: hive
Updated Branches:
  refs/heads/master bb4035b68 -> 5663b9717


HIVE-17183 : Disable rename operations during bootstrap dump (Sankar Hariappan, 
reviewed by Anishek Agarwal, Thejas Nair)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/5663b971
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/5663b971
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/5663b971

Branch: refs/heads/master
Commit: 5663b971776bd3e6a6e17426875f44313f6eff9f
Parents: bb4035b
Author: Sankar Hariappan <mailtosank...@gmail.com>
Authored: Thu Sep 7 11:07:24 2017 -0700
Committer: Thejas M Nair <the...@hortonworks.com>
Committed: Thu Sep 7 11:07:24 2017 -0700

----------------------------------------------------------------------
 .../hive/ql/parse/TestReplicationScenarios.java | 75 ++++++++++++++++++++
 .../org/apache/hadoop/hive/ql/exec/DDLTask.java | 15 ++++
 .../hadoop/hive/ql/exec/repl/ReplDumpTask.java  | 18 +++--
 .../apache/hadoop/hive/ql/parse/EximUtil.java   | 14 ++++
 .../hadoop/hive/ql/parse/repl/dump/Utils.java   | 67 +++++++++++++++++
 5 files changed, 182 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
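
At a high level, the fix stamps the source database with a transient parameter, "bootstrap.dump.state.<uuid>" = ACTIVE, for the duration of the bootstrap dump; the rename paths in DDLTask refuse to proceed while any such marker is ACTIVE. A minimal, self-contained sketch of that marker protocol, with a plain HashMap standing in for the metastore-backed database parameter map (the real code goes through Hive.getDatabase()/alterDatabase(); see Utils.java below):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.UUID;

    public class DumpStateSketch {
      static final String PREFIX = "bootstrap.dump.state.";

      // Stamp an ACTIVE marker under a fresh UUID key; the caller keeps the
      // key so it can later remove exactly its own marker.
      static String markActive(Map<String, String> dbParams) {
        String key = PREFIX + UUID.randomUUID();
        dbParams.put(key, "ACTIVE");
        return key;
      }

      // Any ACTIVE marker under the prefix, from any concurrent dump, blocks renames.
      static boolean dumpInProgress(Map<String, String> dbParams) {
        for (Map.Entry<String, String> e : dbParams.entrySet()) {
          if (e.getKey().startsWith(PREFIX) && "ACTIVE".equals(e.getValue())) {
            return true;
          }
        }
        return false;
      }

      public static void main(String[] args) {
        Map<String, String> params = new HashMap<>();
        String key = markActive(params);
        System.out.println(dumpInProgress(params)); // true: renames rejected
        params.remove(key);                         // reset when the dump finishes
        System.out.println(dumpInProgress(params)); // false: renames allowed again
      }
    }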


http://git-wip-us.apache.org/repos/asf/hive/blob/5663b971/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index b19c1aa..9667449 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -551,6 +551,81 @@ public class TestReplicationScenarios {
   }
 
   @Test
+  public void testBootstrapWithConcurrentRename() throws IOException {
+    String name = testName.getMethodName();
+    String dbName = createDB(name, driver);
+    String replDbName = dbName + "_dupe";
+    run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE", driver);
+
+    String[] ptn_data = new String[]{ "eleven" , "twelve" };
+    String[] empty = new String[]{};
+    String ptn_locn = new Path(TEST_PATH, name + "_ptn").toUri().getPath();
+
+    createTestDataFile(ptn_locn, ptn_data);
+    run("LOAD DATA LOCAL INPATH '" + ptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b=1)", driver);
+
+    BehaviourInjection<Table,Table> ptnedTableRenamer = new BehaviourInjection<Table,Table>(){
+      boolean success = false;
+
+      @Nullable
+      @Override
+      public Table apply(@Nullable Table table) {
+        if (injectionPathCalled) {
+          nonInjectedPathCalled = true;
+        } else {
+          // getTable is invoked after fetching the table names
+          injectionPathCalled = true;
+          Thread t = new Thread(new Runnable() {
+            public void run() {
+              try {
+                LOG.info("Entered new thread");
+                Driver driver2 = new Driver(hconf);
+                SessionState.start(new CliSessionState(hconf));
+                CommandProcessorResponse ret = driver2.run("ALTER TABLE " + dbName + ".ptned PARTITION (b=1) RENAME TO PARTITION (b=10)");
+                success = (ret.getException() == null);
+                assertFalse(success);
+                ret = driver2.run("ALTER TABLE " + dbName + ".ptned RENAME TO " + dbName + ".ptned_renamed");
+                success = (ret.getException() == null);
+                assertFalse(success);
+                LOG.info("Exit new thread success - {}", success);
+              } catch (CommandNeedRetryException e) {
+                LOG.info("Hit Exception {} from new thread", e.getMessage());
+                throw new RuntimeException(e);
+              }
+            }
+          });
+          t.start();
+          LOG.info("Created new thread {}", t.getName());
+          try {
+            t.join();
+          } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+          }
+        }
+        return table;
+      }
+    };
+    InjectableBehaviourObjectStore.setGetTableBehaviour(ptnedTableRenamer);
+
+    // The intermediate renames should have failed as the bootstrap dump was in progress
+    bootstrapLoadAndVerify(dbName, replDbName);
+
+    ptnedTableRenamer.assertInjectionsPerformed(true,true);
+    InjectableBehaviourObjectStore.resetGetTableBehaviour(); // reset the behaviour
+
+    // The ptned table should be there in both source and target as the rename was not successful
+    verifyRun("SELECT a from " + dbName + ".ptned WHERE (b=1) ORDER BY a", ptn_data, driver);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned WHERE (b=1) ORDER BY a", ptn_data, driverMirror);
+
+    // Verify that renames after bootstrap are successful
+    run("ALTER TABLE " + dbName + ".ptned PARTITION (b=1) RENAME TO PARTITION (b=10)", driver);
+    verifyIfPartitionNotExist(dbName, "ptned", new ArrayList<>(Arrays.asList("1")), metaStoreClient);
+    run("ALTER TABLE " + dbName + ".ptned RENAME TO " + dbName + ".ptned_renamed", driver);
+    verifyIfTableNotExist(dbName, "ptned", metaStoreClient);
+    verifyRun("SELECT a from " + dbName + ".ptned_renamed WHERE (b=10) ORDER BY a", ptn_data, driver);
+  }
+
+  @Test
   public void testIncrementalAdds() throws IOException {
     String name = testName.getMethodName();
     String dbName = createDB(name, driver);

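The test above drives the race through InjectableBehaviourObjectStore, a test-only hook in the metastore that lets a test wrap getTable and run arbitrary code mid-dump. The general shape of that hook pattern, sketched with illustrative names (java.util.function.Function stands in for Hive's BehaviourInjection; this is not the actual InjectableBehaviourObjectStore API):

    import java.util.function.Function;

    class HookedStore {
      // Identity by default; a test swaps in a behaviour, then resets it.
      private static volatile Function<String, String> getTableHook = Function.identity();

      static void setGetTableBehaviour(Function<String, String> hook) {
        getTableHook = hook;
      }

      static void resetGetTableBehaviour() {
        getTableHook = Function.identity();
      }

      String getTable(String name) {
        String table = name;               // stand-in for the real metastore lookup
        return getTableHook.apply(table);  // injected test code runs here, mid-operation
      }
    }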
http://git-wip-us.apache.org/repos/asf/hive/blob/5663b971/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index acc2390..646bb23 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -150,6 +150,7 @@ import org.apache.hadoop.hive.ql.parse.ExplainConfiguration.AnalyzeState;
 import org.apache.hadoop.hive.ql.parse.PreInsertTableDesc;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
 import org.apache.hadoop.hive.ql.plan.AbortTxnsDesc;
 import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
 import org.apache.hadoop.hive.ql.plan.AlterDatabaseDesc;
@@ -1159,6 +1160,12 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
       return 0;
     }
 
+    String names[] = Utilities.getDbTableName(tableName);
+    if (Utils.isBootstrapDumpInProgress(db, names[0])) {
+      LOG.error("DDLTask: Rename Partition not allowed as bootstrap dump in 
progress");
+      throw new HiveException("Rename Partition: Not allowed as bootstrap dump 
in progress");
+    }
+
     Table tbl = db.getTable(tableName);
     Partition oldPart = db.getPartition(tbl, oldPartSpec, false);
     if (oldPart == null) {
@@ -3597,6 +3604,14 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
    *           Throws this exception if an unexpected error occurs.
    */
   private int alterTable(Hive db, AlterTableDesc alterTbl) throws HiveException {
+    if (alterTbl.getOp() == AlterTableDesc.AlterTableTypes.RENAME) {
+      String names[] = Utilities.getDbTableName(alterTbl.getOldName());
+      if (Utils.isBootstrapDumpInProgress(db, names[0])) {
+        LOG.error("DDLTask: Rename Table not allowed as bootstrap dump in 
progress");
+        throw new HiveException("Rename Table: Not allowed as bootstrap dump 
in progress");
+      }
+    }
+
     // alter the table
     Table tbl = db.getTable(alterTbl.getOldName());
 

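With both guards in place, a client that issues a rename while any bootstrap dump holds an ACTIVE marker on the database sees the command fail instead of racing the dump. A hedged client-side sketch mirroring the concurrent-rename test above ("repldb.ptned" is a hypothetical table; assumes a HiveConf configured for the cluster; the error text is the one DDLTask now throws):

    import org.apache.hadoop.hive.cli.CliSessionState;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.CommandNeedRetryException;
    import org.apache.hadoop.hive.ql.Driver;
    import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
    import org.apache.hadoop.hive.ql.session.SessionState;

    public class RenameDuringDumpSketch {
      public static void main(String[] args) throws CommandNeedRetryException {
        HiveConf conf = new HiveConf();
        SessionState.start(new CliSessionState(conf));
        Driver driver = new Driver(conf);

        CommandProcessorResponse ret =
            driver.run("ALTER TABLE repldb.ptned RENAME TO repldb.ptned_renamed");
        if (ret.getException() != null) {
          // Expected while a bootstrap dump of repldb is in progress:
          //   "Rename Table: Not allowed as bootstrap dump in progress"
          System.out.println("Rename rejected: " + ret.getException().getMessage());
        }
      }
    }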
http://git-wip-us.apache.org/repos/asf/hive/blob/5663b971/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index 165a2e3..7703f31 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -176,9 +176,9 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
 
   private Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot) throws Exception {
     // bootstrap case
-    Long bootDumpBeginReplId = getHive().getMSC().getCurrentNotificationEventId().getEventId();
-
-    for (String dbName : Utils.matchesDb(getHive(), work.dbNameOrPattern)) {
+    Hive hiveDb = getHive();
+    Long bootDumpBeginReplId = hiveDb.getMSC().getCurrentNotificationEventId().getEventId();
+    for (String dbName : Utils.matchesDb(hiveDb, work.dbNameOrPattern)) {
       LOG.debug("ReplicationSemanticAnalyzer: analyzeReplDump dumping db: " + 
dbName);
       replLogger = new BootstrapDumpLogger(dbName, dumpRoot.toString(),
               Utils.getAllTables(getHive(), dbName).size(),
@@ -186,14 +186,17 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
       replLogger.startLog();
       Path dbRoot = dumpDbMetadata(dbName, dumpRoot);
       dumpFunctionMetadata(dbName, dumpRoot);
-      for (String tblName : Utils.matchesTbl(getHive(), dbName, work.tableNameOrPattern)) {
+
+      String uniqueKey = Utils.setDbBootstrapDumpState(hiveDb, dbName);
+      for (String tblName : Utils.matchesTbl(hiveDb, dbName, work.tableNameOrPattern)) {
         LOG.debug(
             "analyzeReplDump dumping table: " + tblName + " to db root " + 
dbRoot.toUri());
         dumpTable(dbName, tblName, dbRoot);
       }
+      Utils.resetDbBootstrapDumpState(hiveDb, dbName, uniqueKey);
       replLogger.endLog(bootDumpBeginReplId.toString());
     }
-    Long bootDumpEndReplId = getHive().getMSC().getCurrentNotificationEventId().getEventId();
+    Long bootDumpEndReplId = hiveDb.getMSC().getCurrentNotificationEventId().getEventId();
     LOG.info("Bootstrap object dump phase took from {} to {}", 
bootDumpBeginReplId,
         bootDumpEndReplId);
 
@@ -204,7 +207,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
     IMetaStoreClient.NotificationFilter evFilter =
         new DatabaseAndTableFilter(work.dbNameOrPattern, work.tableNameOrPattern);
     EventUtils.MSClientNotificationFetcher evFetcher =
-        new EventUtils.MSClientNotificationFetcher(getHive().getMSC());
+        new EventUtils.MSClientNotificationFetcher(hiveDb.getMSC());
     EventUtils.NotificationEventIterator evIter = new EventUtils.NotificationEventIterator(
         evFetcher, bootDumpBeginReplId,
         Ints.checkedCast(bootDumpEndReplId - bootDumpBeginReplId) + 1,
@@ -223,7 +226,8 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
     dmd.write();
 
     // Set the correct last repl id to return to the user
-    return bootDumpEndReplId;
+    // Currently we return bootDumpBeginReplId, as we don't consolidate the events after bootstrap
+    return bootDumpBeginReplId;
   }
 
   private Path dumpDbMetadata(String dbName, Path dumpRoot) throws Exception {

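Note that setDbBootstrapDumpState/resetDbBootstrapDumpState bracket the per-table dump loop without a catch: if dumpTable throws, the ACTIVE marker stays behind on the database and keeps blocking renames until it is cleared. A defensive variant of that bracket (a sketch of a drop-in fragment for bootStrapDump, not what this commit does) would reset in a finally block:

    String uniqueKey = Utils.setDbBootstrapDumpState(hiveDb, dbName);
    try {
      for (String tblName : Utils.matchesTbl(hiveDb, dbName, work.tableNameOrPattern)) {
        dumpTable(dbName, tblName, dbRoot);
      }
    } finally {
      // Always clear this dump's own marker, even if a table dump failed.
      Utils.resetDbBootstrapDumpState(hiveDb, dbName, uniqueKey);
    }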
http://git-wip-us.apache.org/repos/asf/hive/blob/5663b971/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
index 40c34bf..76331fc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
 import org.apache.hadoop.hive.ql.parse.repl.dump.io.DBSerializer;
 import org.apache.hadoop.hive.ql.parse.repl.dump.io.JsonWriter;
 import org.apache.hadoop.hive.ql.parse.repl.dump.io.ReplicationSpecSerializer;
@@ -50,6 +51,7 @@ import java.io.Serializable;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
@@ -237,9 +239,21 @@ public class EximUtil {
     // If we later make this work for non-repl cases, analysis of this logic might become necessary. Also, this is using
     // Replv2 semantics, i.e. with listFiles laziness (no copy at export time)
 
+    // Remove all the entries from the parameters which were added to track bootstrap dump progress
+    Map<String, String> parameters = dbObj.getParameters();
+    Map<String, String> tmpParameters = new HashMap<>();
+    if (parameters != null) {
+      tmpParameters.putAll(parameters);
+      tmpParameters.entrySet()
+                .removeIf(e -> e.getKey().startsWith(Utils.BOOTSTRAP_DUMP_STATE_KEY_PREFIX));
+      dbObj.setParameters(tmpParameters);
+    }
     try (JsonWriter jsonWriter = new JsonWriter(fs, metadataPath)) {
       new DBSerializer(dbObj).writeTo(jsonWriter, replicationSpec);
     }
+    if (parameters != null) {
+      dbObj.setParameters(parameters);
+    }
   }
 
   public static void createExportDump(FileSystem fs, Path metadataPath,

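The dump-state keys are transient bookkeeping and must not be replicated, so createDbExportDump filters them out of a copy of the parameter map before serializing, then restores the original map on the Database object. A minimal standalone sketch of that copy-filter-restore idiom (plain maps stand in for the Thrift Database object):

    import java.util.HashMap;
    import java.util.Map;

    public class FilterParamsSketch {
      static final String PREFIX = "bootstrap.dump.state.";

      public static void main(String[] args) {
        Map<String, String> parameters = new HashMap<>();
        parameters.put("comment", "source db");
        parameters.put(PREFIX + "1234", "ACTIVE"); // transient marker, not for export

        // Filter a copy, so the live object is only swapped for the write.
        Map<String, String> tmpParameters = new HashMap<>(parameters);
        tmpParameters.entrySet().removeIf(e -> e.getKey().startsWith(PREFIX));

        System.out.println(tmpParameters); // {comment=source db} -- what gets serialized
        System.out.println(parameters);    // original map, restored after the write
      }
    }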
http://git-wip-us.apache.org/repos/asf/hive/blob/5663b971/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
index a48a17e..a1da629 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.parse.repl.dump;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -32,9 +33,18 @@ import com.google.common.collect.Collections2;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.UUID;
 
 public class Utils {
+  public static final String BOOTSTRAP_DUMP_STATE_KEY_PREFIX = "bootstrap.dump.state.";
+
+  public enum ReplDumpState {
+    IDLE, ACTIVE
+  }
+
   public static void writeOutput(List<String> values, Path outputFile, HiveConf hiveConf)
       throws SemanticException {
     DataOutputStream outStream = null;
@@ -79,4 +89,61 @@ public class Utils {
                       SemanticAnalyzer.VALUES_TMP_TABLE_NAME_PREFIX.toLowerCase());
             });
   }
+
+  public static String setDbBootstrapDumpState(Hive hiveDb, String dbName) throws HiveException {
+    Database database = hiveDb.getDatabase(dbName);
+    if (database == null) {
+      return null;
+    }
+
+    Map<String, String> newParams = new HashMap<>();
+    String uniqueKey = BOOTSTRAP_DUMP_STATE_KEY_PREFIX + UUID.randomUUID().toString();
+    newParams.put(uniqueKey, ReplDumpState.ACTIVE.name());
+    Map<String, String> params = database.getParameters();
+
+    // if the existing params map is non-null, merge the new entry into it
+    if (params != null) {
+      params.putAll(newParams);
+      database.setParameters(params);
+    } else {
+      // otherwise, the db has no params yet; use the new entry as the params map
+      database.setParameters(newParams);
+    }
+
+    hiveDb.alterDatabase(dbName, database);
+    return uniqueKey;
+  }
+
+  public static void resetDbBootstrapDumpState(Hive hiveDb, String dbName,
+                                               String uniqueKey) throws HiveException {
+    Database database = hiveDb.getDatabase(dbName);
+    if (database != null) {
+      Map<String, String> params = database.getParameters();
+      if ((params != null) && params.containsKey(uniqueKey)) {
+        params.remove(uniqueKey);
+        database.setParameters(params);
+        hiveDb.alterDatabase(dbName, database);
+      }
+    }
+  }
+
+  public static boolean isBootstrapDumpInProgress(Hive hiveDb, String dbName) throws HiveException {
+    Database database = hiveDb.getDatabase(dbName);
+    if (database == null) {
+      return false;
+    }
+
+    Map<String, String> params = database.getParameters();
+    if (params == null) {
+      return false;
+    }
+
+    for (String key : params.keySet()) {
+      if (key.startsWith(BOOTSTRAP_DUMP_STATE_KEY_PREFIX)
+              && params.get(key).equals(ReplDumpState.ACTIVE.name())) {
+        return true;
+      }
+    }
+    return false;
+  }
 }

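One property worth noting in isBootstrapDumpInProgress: any ACTIVE marker under the prefix blocks renames, regardless of which dump created it, so overlapping bootstrap dumps of the same database stay safe with respect to each other's resets. An equivalent stream-based form of its loop (a sketch, not part of the commit):

    return params.entrySet().stream()
        .anyMatch(e -> e.getKey().startsWith(BOOTSTRAP_DUMP_STATE_KEY_PREFIX)
            && ReplDumpState.ACTIVE.name().equals(e.getValue()));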