Repository: hive
Updated Branches:
  refs/heads/master 513ee73b7 -> 5681647b7


HIVE-20397: HiveStrictManagedMigration updates (Jason Dere, reviewed by Eugene Koifman)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/5681647b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/5681647b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/5681647b

Branch: refs/heads/master
Commit: 5681647b7e5076cacc487793727f08fd9ec7512b
Parents: 513ee73
Author: Jason Dere <jd...@hortonworks.com>
Authored: Fri Aug 17 11:26:52 2018 -0700
Committer: Jason Dere <jd...@hortonworks.com>
Committed: Fri Aug 17 11:26:52 2018 -0700

----------------------------------------------------------------------
 .../ql/util/HiveStrictManagedMigration.java     | 397 +++++++++++++++----
 1 file changed, 318 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/5681647b/ql/src/java/org/apache/hadoop/hive/ql/util/HiveStrictManagedMigration.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/util/HiveStrictManagedMigration.java b/ql/src/java/org/apache/hadoop/hive/ql/util/HiveStrictManagedMigration.java
index 0f0dc22..2a737bb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/util/HiveStrictManagedMigration.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/util/HiveStrictManagedMigration.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.util;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -41,21 +42,22 @@ import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.PrincipalType;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.TransactionalValidationListener;
 import org.apache.hadoop.hive.metastore.Warehouse;
-import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.utils.HiveStrictManagedUtils;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
-import org.apache.hadoop.hive.ql.DriverFactory;
-import org.apache.hadoop.hive.ql.IDriver;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.parse.HiveParser.switchDatabaseStatement_return;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.shims.HadoopShims;
+import org.apache.hadoop.hive.shims.ShimLoader;
 
 import org.apache.thrift.TException;
 
@@ -253,23 +255,29 @@ public class HiveStrictManagedMigration {
   }
 
   private RunOptions runOptions;
-  private Configuration conf;
+  private HiveConf conf;
   private HiveMetaStoreClient hms;
   private boolean failedValidationChecks;
+  private boolean failuresEncountered;
   private Warehouse wh;
   private Warehouse oldWh;
   private String ownerName;
   private String groupName;
   private FsPermission dirPerms;
   private FsPermission filePerms;
+  private boolean createExternalDirsForDbs;
+  Path curWhRootPath;
+  private HadoopShims.HdfsEncryptionShim encryptionShim;
 
   HiveStrictManagedMigration(RunOptions runOptions) {
     this.runOptions = runOptions;
-    this.conf = MetastoreConf.newMetastoreConf();
+    this.conf = new HiveConf();
   }
 
   void run() throws Exception {
+    wh = new Warehouse(conf);
     checkOldWarehouseRoot();
+    checkExternalWarehouseDir();
     checkOwnerPermsOptions();
 
     hms = new HiveMetaStoreClient(conf);//MetaException
@@ -278,7 +286,12 @@ public class HiveStrictManagedMigration {
       LOG.info("Found {} databases", databases.size());
       for (String dbName : databases) {
         if (dbName.matches(runOptions.dbRegex)) {
-          processDatabase(dbName);
+          try {
+            processDatabase(dbName);
+          } catch (Exception err) {
+            LOG.error("Error processing database " + dbName, err);
+            failuresEncountered = true;
+          }
         }
       }
       LOG.info("Done processing databases.");
@@ -286,6 +299,9 @@ public class HiveStrictManagedMigration {
       hms.close();
     }
 
+    if (failuresEncountered) {
+      throw new HiveException("One or more failures encountered during processing.");
+    }
     if (failedValidationChecks) {
       throw new HiveException("One or more tables failed validation checks for strict managed table mode.");
     }
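
The hunk above moves the tool from fail-fast to collect-and-report: a failure in
one database is logged, the remaining databases are still processed, and a single
HiveException is thrown at the end if anything failed. A minimal sketch of the
same pattern (names hypothetical, not from this patch):

    // Sketch only: accumulate failures instead of aborting on the first one.
    boolean failed = false;
    for (String db : databases) {
      try {
        process(db);                      // hypothetical per-item work
      } catch (Exception err) {
        LOG.error("Error processing " + db, err);
        failed = true;                    // remember the failure, keep going
      }
    }
    if (failed) {
      throw new HiveException("One or more failures encountered during processing.");
    }
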
@@ -304,8 +320,12 @@ public class HiveStrictManagedMigration {
               runOptions.oldWarehouseRoot);
           runOptions.shouldModifyManagedTableLocation = false;
         } else {
-          FileSystem oldWhRootFs = new Path(runOptions.oldWarehouseRoot).getFileSystem(conf);
-          FileSystem curWhRootFs = new Path(curWarehouseRoot).getFileSystem(conf);
+          Path oldWhRootPath = new Path(runOptions.oldWarehouseRoot);
+          curWhRootPath = new Path(curWarehouseRoot);
+          FileSystem oldWhRootFs = oldWhRootPath.getFileSystem(conf);
+          FileSystem curWhRootFs = curWhRootPath.getFileSystem(conf);
+          oldWhRootPath = oldWhRootFs.makeQualified(oldWhRootPath);
+          curWhRootPath = curWhRootFs.makeQualified(curWhRootPath);
           if (!FileUtils.equalsFileSystem(oldWhRootFs, curWhRootFs)) {
            LOG.info("oldWarehouseRoot {} has a different FS than the current warehouse root {}."
                 + " Disabling shouldModifyManagedTableLocation",
@@ -316,6 +336,13 @@ public class HiveStrictManagedMigration {
              LOG.info("Warehouse is using non-HDFS FileSystem {}. Disabling shouldModifyManagedTableLocation",
                   oldWhRootFs.getUri());
               runOptions.shouldModifyManagedTableLocation = false;
+            } else {
+              encryptionShim = ShimLoader.getHadoopShims().createHdfsEncryptionShim(oldWhRootFs, conf);
+              if (!hasEquivalentEncryption(encryptionShim, oldWhRootPath, curWhRootPath)) {
+                LOG.info("oldWarehouseRoot {} and current warehouse root {} have different encryption zones." +
+                    " Disabling shouldModifyManagedTableLocation", oldWhRootPath, curWhRootPath);
+                runOptions.shouldModifyManagedTableLocation = false;
+              }
             }
           }
         }
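
Qualifying both warehouse roots before comparing them matters because a path
without a scheme or authority would otherwise never compare equal to a fully
qualified one. A small sketch of what makeQualified does, assuming a default
filesystem of hdfs://nn:8020 (the URI is illustrative only):

    // Sketch: qualification fills in the missing scheme/authority.
    Path p = new Path("/warehouse/managed");        // no scheme
    FileSystem fs = p.getFileSystem(conf);          // resolves against the default FS
    Path qualified = fs.makeQualified(p);           // hdfs://nn:8020/warehouse/managed

The encryption shim check that follows then compares the two qualified roots:
if either root sits in an HDFS encryption zone and the zones differ, managed
table data cannot simply be renamed across them, so the location migration is
disabled.
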
@@ -323,7 +350,6 @@ public class HiveStrictManagedMigration {
     }
 
     if (runOptions.shouldModifyManagedTableLocation) {
-      wh = new Warehouse(conf);
       Configuration oldWhConf = new Configuration(conf);
       HiveConf.setVar(oldWhConf, HiveConf.ConfVars.METASTOREWAREHOUSE, runOptions.oldWarehouseRoot);
       oldWh = new Warehouse(oldWhConf);
@@ -336,17 +362,24 @@ public class HiveStrictManagedMigration {
       groupName = conf.get("strict.managed.tables.migration.group", null);
     }
     if (runOptions.shouldModifyManagedTablePermissions) {
-      String dirPermsString = conf.get("strict.managed.tables.migration.dir.permissions", "1700");
+      String dirPermsString = conf.get("strict.managed.tables.migration.dir.permissions", "700");
       if (dirPermsString != null) {
         dirPerms = new FsPermission(dirPermsString);
       }
-      String filePermsString = conf.get("strict.managed.tables.migration.dir.permissions", "700");
+      String filePermsString = conf.get("strict.managed.tables.migration.file.permissions", "700");
       if (filePermsString != null) {
         filePerms = new FsPermission(filePermsString);
       }
     }
   }
 
+  void checkExternalWarehouseDir() {
+    String externalWarehouseDir = conf.getVar(HiveConf.ConfVars.HIVE_METASTORE_WAREHOUSE_EXTERNAL);
+    if (externalWarehouseDir != null && !externalWarehouseDir.isEmpty()) {
+      createExternalDirsForDbs = true;
+    }
+  }
+
   void processDatabase(String dbName) throws IOException, HiveException, MetaException, TException {
     LOG.info("Processing database {}", dbName);
     Database dbObj = hms.getDatabase(dbName);
@@ -363,15 +396,26 @@ public class HiveStrictManagedMigration {
         checkAndSetFileOwnerPermissions(fs, newDefaultDbLocation,
             ownerName, groupName, dirPerms, null, runOptions.dryRun, false);
 
-        String command = String.format("ALTER DATABASE %s SET LOCATION '%s'", dbName, newDefaultDbLocation);
-        runHiveCommand(command);
+        // The table processing needs the db location at the old location, so clone the DB object
+        // when updating the location.
+        Database modifiedDb = dbObj.deepCopy();
+        getHiveUpdater().updateDbLocation(modifiedDb, newDefaultDbLocation);
       }
     }
 
+    if (createExternalDirsForDbs) {
+      createExternalDbDir(dbObj);
+    }
+
     List<String> tableNames = hms.getTables(dbName, runOptions.tableRegex);
     for (String tableName : tableNames) {
       // If we did not change the DB location, there is no need to move the table directories.
-      processTable(dbObj, tableName, modifyDefaultManagedLocation);
+      try {
+        processTable(dbObj, tableName, modifyDefaultManagedLocation);
+      } catch (Exception err) {
+        LOG.error("Error processing table " + getQualifiedName(dbObj.getName(), tableName), err);
+        failuresEncountered = true;
+      }
     }
   }
 
@@ -448,7 +492,12 @@ public class HiveStrictManagedMigration {
       String dbLocation = dbObj.getLocationUri();
       Path oldDefaultDbLocation = oldWh.getDefaultDatabasePath(dbName);
       if (arePathsEqual(conf, dbLocation, oldDefaultDbLocation.toString())) {
-        return true;
+        if (hasEquivalentEncryption(encryptionShim, oldDefaultDbLocation, curWhRootPath)) {
+          return true;
+        } else {
+          LOG.info("{} and {} are on different encryption zones. Will not change database location for {}",
+              oldDefaultDbLocation, curWhRootPath, dbName);
+        }
       }
     } 
     return false;
@@ -462,7 +511,12 @@ public class HiveStrictManagedMigration {
     String tableLocation = tableObj.getSd().getLocation();
     Path oldDefaultTableLocation = oldWh.getDefaultTablePath(dbObj, tableObj.getTableName());
     if (arePathsEqual(conf, tableLocation, oldDefaultTableLocation.toString())) {
-      return true;
+      if (hasEquivalentEncryption(encryptionShim, oldDefaultTableLocation, curWhRootPath)) {
+        return true;
+      } else {
+        LOG.info("{} and {} are on different encryption zones. Will not change table location for {}",
+            oldDefaultTableLocation, curWhRootPath, getQualifiedName(tableObj));
+      }
     }
     return false;
   }
@@ -472,7 +526,51 @@ public class HiveStrictManagedMigration {
     String tableName = tableObj.getTableName();
     String partLocation = partObj.getSd().getLocation();
    Path oldDefaultPartLocation = oldWh.getDefaultPartitionPath(dbObj, tableObj, partSpec);
-    return arePathsEqual(conf, partLocation, oldDefaultPartLocation.toString());
+    if (arePathsEqual(conf, partLocation, oldDefaultPartLocation.toString())) {
+      if (hasEquivalentEncryption(encryptionShim, oldDefaultPartLocation, curWhRootPath)) {
+        return true;
+      } else {
+        LOG.info("{} and {} are on different encryption zones. Will not change partition location",
+            oldDefaultPartLocation, curWhRootPath);
+      }
+    }
+    return false;
+  }
+
+  void createExternalDbDir(Database dbObj) throws IOException, MetaException {
+    Path externalTableDbPath = wh.getDefaultExternalDatabasePath(dbObj.getName());
+    FileSystem fs = externalTableDbPath.getFileSystem(conf);
+    if (!fs.exists(externalTableDbPath)) {
+      String dbOwner = ownerName;
+      String dbGroup = null;
+
+      String dbOwnerName = dbObj.getOwnerName();
+      if (dbOwnerName != null && !dbOwnerName.isEmpty()) {
+        switch (dbObj.getOwnerType()) {
+        case USER:
+          dbOwner = dbOwnerName;
+          break;
+        case ROLE:
+          break;
+        case GROUP:
+          dbGroup = dbOwnerName;
+          break;
+        }
+      }
+
+      LOG.info("Creating external table directory for database {} at {} with ownership {}/{}",
+          dbObj.getName(), externalTableDbPath, dbOwner, dbGroup);
+      if (!runOptions.dryRun) {
+        // Just rely on parent perms/umask for permissions.
+        fs.mkdirs(externalTableDbPath);
+        checkAndSetFileOwnerPermissions(fs, externalTableDbPath, dbOwner, dbGroup,
+            null, null, runOptions.dryRun, false);
+      }
+    } else {
+      LOG.info("Not creating external table directory for database {} - {} already exists.",
+          dbObj.getName(), externalTableDbPath);
+      // Leave the directory owner/perms as-is if the path already exists.
+    }
   }
 
   void moveTableData(Database dbObj, Table tableObj, Path newTablePath) throws HiveException, IOException, TException {
@@ -484,17 +582,17 @@ public class HiveStrictManagedMigration {
     LOG.info("Moving location of {} from {} to {}", getQualifiedName(tableObj), oldTablePath, newTablePath);
     if (!runOptions.dryRun) {
       FileSystem fs = newTablePath.getFileSystem(conf);
-      boolean movedData = fs.rename(oldTablePath, newTablePath);
-      if (!movedData) {
-        String msg = String.format("Unable to move data directory for table %s from %s to %s",
-            getQualifiedName(tableObj), oldTablePath, newTablePath);
-        throw new HiveException(msg);
+      if (fs.exists(oldTablePath)) {
+        boolean movedData = fs.rename(oldTablePath, newTablePath);
+        if (!movedData) {
+          String msg = String.format("Unable to move data directory for table %s from %s to %s",
+              getQualifiedName(tableObj), oldTablePath, newTablePath);
+          throw new HiveException(msg);
+        }
       }
     }
     if (!runOptions.dryRun) {
-      String command = String.format("ALTER TABLE %s SET LOCATION '%s'",
-          getQualifiedName(tableObj), newTablePath);
-      runHiveCommand(command);
+      getHiveUpdater().updateTableLocation(tableObj, newTablePath);
     }
     if (isPartitionedTable(tableObj)) {
       List<String> partNames = hms.listPartitionNames(dbName, tableName, Short.MAX_VALUE);
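
Note that FileSystem.rename signals most failures through its boolean return
value rather than an exception, which is why moveTableData must test the result;
the added fs.exists guard also makes a re-run idempotent when the source
directory was already moved by an earlier attempt. A minimal sketch of the
pattern (paths hypothetical):

    // Sketch: rename() returns false on failure instead of throwing.
    if (fs.exists(oldPath)) {
      if (!fs.rename(oldPath, newPath)) {
        throw new HiveException("Unable to move " + oldPath + " to " + newPath);
      }
    }
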
@@ -509,15 +607,37 @@ public class HiveStrictManagedMigration {
           // just update the partition location in the metastore.
           if (!runOptions.dryRun) {
             Path newPartPath = wh.getPartitionPath(newTablePath, partSpec);
-            String command = String.format("ALTER TABLE PARTITION (%s) SET LOCATION '%s'",
-                partName, newPartPath.toString());
-            runHiveCommand(command);
+            getHiveUpdater().updatePartitionLocation(dbName, tableObj, partName, partObj, newPartPath);
           }
         }
       }
     }
   }
 
+  void renameFilesToConformToAcid(Table tableObj) throws IOException, TException {
+    if (isPartitionedTable(tableObj)) {
+      String dbName = tableObj.getDbName();
+      String tableName = tableObj.getTableName();
+      List<String> partNames = hms.listPartitionNames(dbName, tableName, Short.MAX_VALUE);
+      for (String partName : partNames) {
+        Partition partObj = hms.getPartition(dbName, tableName, partName);
+        Path partPath = new Path(partObj.getSd().getLocation());
+        FileSystem fs = partPath.getFileSystem(conf);
+        if (fs.exists(partPath)) {
+          UpgradeTool.handleRenameFiles(tableObj, partPath,
+              !runOptions.dryRun, conf, tableObj.getSd().getBucketColsSize() > 0, null);
+        }
+      }
+    } else {
+      Path tablePath = new Path(tableObj.getSd().getLocation());
+      FileSystem fs = tablePath.getFileSystem(conf);
+      if (fs.exists(tablePath)) {
+        UpgradeTool.handleRenameFiles(tableObj, tablePath,
+            !runOptions.dryRun, conf, tableObj.getSd().getBucketColsSize() > 0, null);
+      }
+    }
+  }
+
   TableMigrationOption determineMigrationTypeAutomatically(Table tableObj, TableType tableType)
       throws IOException, MetaException, TException {
     TableMigrationOption result = TableMigrationOption.NONE;
@@ -554,6 +674,20 @@ public class HiveStrictManagedMigration {
     return result;
   }
 
+  private static final Map<String, String> convertToExternalTableProps = new HashMap<>();
+  private static final Map<String, String> convertToAcidTableProps = new HashMap<>();
+  private static final Map<String, String> convertToMMTableProps = new HashMap<>();
+
+  static {
+    convertToExternalTableProps.put("EXTERNAL", "TRUE");
+    convertToExternalTableProps.put("external.table.purge", "true");
+
+    convertToAcidTableProps.put("transactional", "true");
+
+    convertToMMTableProps.put("transactional", "true");
+    convertToMMTableProps.put("transactional_properties", "insert_only");
+  }
+
   boolean migrateToExternalTable(Table tableObj, TableType tableType) throws HiveException {
     String msg;
     switch (tableType) {
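
The three static maps above carry the TBLPROPERTIES that the tool previously
set by generating ALTER TABLE statements; they are applied through
HiveUpdater.updateTableProperties later in this diff. A minimal sketch of how
such a map is consumed (mirroring updateTableProperties, simplified):

    // Sketch: merge conversion properties into the table's parameter map.
    for (Map.Entry<String, String> e : convertToMMTableProps.entrySet()) {
      tableObj.getParameters().put(e.getKey(), e.getValue());
    }
    // Net effect comparable to:
    // ALTER TABLE t SET TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only')
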
@@ -566,10 +700,8 @@ public class HiveStrictManagedMigration {
       }
       LOG.info("Converting {} to external table ...", getQualifiedName(tableObj));
       if (!runOptions.dryRun) {
-        String command = String.format(
-            "ALTER TABLE %s SET TBLPROPERTIES ('EXTERNAL'='TRUE', 'external.table.purge'='true')",
-            getQualifiedName(tableObj));
-        runHiveCommand(command);
+        tableObj.setTableType(TableType.EXTERNAL_TABLE.toString());
+        getHiveUpdater().updateTableProperties(tableObj, convertToExternalTableProps);
       }
       return true;
     case EXTERNAL_TABLE:
@@ -586,12 +718,28 @@ public class HiveStrictManagedMigration {
     return false;
   }
 
-  boolean migrateToManagedTable(Table tableObj, TableType tableType) throws HiveException, MetaException {
+  boolean canTableBeFullAcid(Table tableObj) throws MetaException {
+    // Table must be acid-compatible table format, and no sorting columns.
+    return TransactionalValidationListener.conformToAcid(tableObj) &&
+        (tableObj.getSd().getSortColsSize() <= 0);
+  }
+
+  Map<String, String> getTablePropsForConversionToTransactional(Map<String, String> props,
+      boolean convertFromExternal) {
+    if (convertFromExternal) {
+      // Copy the properties to a new map so we can add EXTERNAL=FALSE
+      props = new HashMap<String, String>(props);
+      props.put("EXTERNAL", "FALSE");
+    }
+    return props;
+  }
 
-    String externalFalse = "";
+  boolean migrateToManagedTable(Table tableObj, TableType tableType) throws HiveException, IOException, MetaException, TException {
+
+    boolean convertFromExternal = false;
     switch (tableType) {
     case EXTERNAL_TABLE:
-      externalFalse = "'EXTERNAL'='FALSE', ";
+      convertFromExternal = true;
       // fall through
     case MANAGED_TABLE:
       if (MetaStoreUtils.isNonNativeTable(tableObj)) {
@@ -614,7 +762,7 @@ public class HiveStrictManagedMigration {
         return false;
       }
       // If table is already transactional, no migration needed.
-      if (AcidUtils.isFullAcidTable(tableObj)) {
+      if (AcidUtils.isTransactionalTable(tableObj)) {
         String msg = createManagedConversionExcuse(tableObj,
             "Table is already a transactional table");
         LOG.debug(msg);
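
The widened check matters because a table can be transactional without being
full ACID: insert-only (MM) tables set 'transactional'='true' together with
'transactional_properties'='insert_only'. Testing only isFullAcidTable would
treat an existing MM table as still needing migration. Roughly, in terms of the
table properties used elsewhere in this diff:

    // Sketch: the distinction the two AcidUtils checks approximate.
    Map<String, String> p = tableObj.getParameters();
    boolean transactional = "true".equalsIgnoreCase(p.get("transactional"));
    boolean insertOnly = "insert_only".equals(p.get("transactional_properties"));
    boolean fullAcid = transactional && !insertOnly;   // ~ isFullAcidTable
    // isTransactionalTable corresponds to 'transactional' alone
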
@@ -623,23 +771,24 @@ public class HiveStrictManagedMigration {
 
       // ORC files can be converted to full acid transactional tables
       // Other formats can be converted to insert-only transactional tables
-      if (TransactionalValidationListener.conformToAcid(tableObj)) {
+      if (canTableBeFullAcid(tableObj)) {
         // TODO: option to allow converting ORC file to insert-only transactional?
         LOG.info("Converting {} to full transactional table", getQualifiedName(tableObj));
+
+        renameFilesToConformToAcid(tableObj);
+
         if (!runOptions.dryRun) {
-          String command = String.format(
-              "ALTER TABLE %s SET TBLPROPERTIES ('transactional'='true')",
-              getQualifiedName(tableObj));
-          runHiveCommand(command);
+          Map<String, String> props = getTablePropsForConversionToTransactional(
+              convertToAcidTableProps, convertFromExternal);
+          getHiveUpdater().updateTableProperties(tableObj, props);
         }
         return true;
       } else {
         LOG.info("Converting {} to insert-only transactional table", getQualifiedName(tableObj));
         if (!runOptions.dryRun) {
-          String command = String.format(
-              "ALTER TABLE %s SET TBLPROPERTIES (%s'transactional'='true', 'transactional_properties'='insert_only')",
-              getQualifiedName(tableObj), externalFalse);
-          runHiveCommand(command);
+          Map<String, String> props = getTablePropsForConversionToTransactional(
+              convertToMMTableProps, convertFromExternal);
+          getHiveUpdater().updateTableProperties(tableObj, props);
         }
         return true;
       }
@@ -683,7 +832,8 @@ public class HiveStrictManagedMigration {
       Path tablePath = new Path(tableObj.getSd().getLocation());
       FileSystem fs = tablePath.getFileSystem(conf);
       if (isHdfs(fs)) {
-        shouldBeExternal = checkDirectoryOwnership(fs, tablePath, ownerName, true);
+        boolean ownedByHive = checkDirectoryOwnership(fs, tablePath, ownerName, true);
+        shouldBeExternal = !ownedByHive;
       } else {
         // Set non-hdfs tables to external, unless transactional (should have been checked before this).
         shouldBeExternal = true;
@@ -696,7 +846,8 @@ public class HiveStrictManagedMigration {
         Path partPath = new Path(partObj.getSd().getLocation());
         FileSystem fs = partPath.getFileSystem(conf);
         if (isHdfs(fs)) {
-          shouldBeExternal = checkDirectoryOwnership(fs, partPath, ownerName, true);
+          boolean ownedByHive = checkDirectoryOwnership(fs, partPath, ownerName, true);
+          shouldBeExternal = !ownedByHive;
         } else {
           shouldBeExternal = true;
         }
@@ -709,49 +860,103 @@ public class HiveStrictManagedMigration {
     return shouldBeExternal;
   }
 
-  void runHiveCommand(String command) throws HiveException {
-    LOG.info("Running command: {}", command);
-
-    if (driver == null) {
-      driver = new MyDriver(conf);
-    }
-  
-    CommandProcessorResponse cpr = driver.driver.run(command);
-    if (cpr.getResponseCode() != 0) {
-      String msg = "Query returned non-zero code: " + cpr.getResponseCode()
-          + ", cause: " + cpr.getErrorMessage();
-      throw new HiveException(msg);
+  void cleanup() {
+    if (hiveUpdater != null) {
+      runAndLogErrors(() -> hiveUpdater.close());
+      hiveUpdater = null;
     }
   }
 
-  void cleanup() {
-    if (driver != null) {
-      runAndLogErrors(() -> driver.close());
-      driver = null;
+  HiveUpdater getHiveUpdater() throws HiveException {
+    if (hiveUpdater == null) {
+      hiveUpdater = new HiveUpdater();
     }
+    return hiveUpdater;
   }
 
-  static class MyDriver {
-    IDriver driver;
+  class HiveUpdater {
+    Hive hive;
 
-    MyDriver(Configuration conf) {
-      HiveConf hiveConf = new HiveConf(conf, this.getClass());
-      // TODO: Clean up SessionState/Driver/TezSession on exit
-      SessionState.start(hiveConf);
-      driver = DriverFactory.newDriver(hiveConf);
+    HiveUpdater() throws HiveException {
+      hive = Hive.get(conf);
+      Hive.set(hive);
     }
 
     void close() {
-      if (driver != null) {
-        runAndLogErrors(() -> driver.close());
-        runAndLogErrors(() -> driver.destroy());
-        driver = null;
-        runAndLogErrors(() -> SessionState.get().close());
+      if (hive != null) {
+        runAndLogErrors(() -> Hive.closeCurrent());
+        hive = null;
       }
     }
+
+    void updateDbLocation(Database db, Path newLocation) throws HiveException {
+      String msg = String.format("ALTER DATABASE %s SET LOCATION '%s'", db.getName(), newLocation);
+      LOG.info(msg);
+
+      db.setLocationUri(newLocation.toString());
+      hive.alterDatabase(db.getName(), db);
+    }
+
+    void updateTableLocation(Table table, Path newLocation) throws HiveException {
+      String msg = String.format("ALTER TABLE %s SET LOCATION '%s'",
+          getQualifiedName(table), newLocation);
+      LOG.info(msg);
+
+      org.apache.hadoop.hive.ql.metadata.Table modifiedTable =
+          new org.apache.hadoop.hive.ql.metadata.Table(table);
+      modifiedTable.setDataLocation(newLocation);
+      hive.alterTable(table.getCatName(), table.getDbName(), table.getTableName(),
+          modifiedTable, false, null, false);
+    }
+
+    void updatePartitionLocation(String dbName, Table table, String partName, Partition part, Path newLocation)
+        throws HiveException, TException {
+      String msg = String.format("ALTER TABLE %s PARTITION (%s) SET LOCATION '%s'",
+          getQualifiedName(table), partName, newLocation.toString());
+      LOG.info(msg);
+
+      org.apache.hadoop.hive.ql.metadata.Partition modifiedPart =
+          new org.apache.hadoop.hive.ql.metadata.Partition(
+              new org.apache.hadoop.hive.ql.metadata.Table(table),
+              part);
+      modifiedPart.setLocation(newLocation.toString());
+      hive.alterPartition(dbName, table.getTableName(), modifiedPart, null, false);
+    }
+
+    void updateTableProperties(Table table, Map<String, String> props) throws HiveException {
+      StringBuilder sb = new StringBuilder();
+      org.apache.hadoop.hive.ql.metadata.Table modifiedTable =
+          new org.apache.hadoop.hive.ql.metadata.Table(table);
+      if (props.size() == 0) {
+        return;
+      }
+      boolean first = true;
+      for (String key : props.keySet()) {
+        String value = props.get(key);
+        modifiedTable.getParameters().put(key, value);
+
+        // Build properties list for logging
+        if (first) {
+          first = false;
+        } else {
+          sb.append(", ");
+        }
+        sb.append("'");
+        sb.append(key);
+        sb.append("'='");
+        sb.append(value);
+        sb.append("'");
+      }
+      String msg = String.format("ALTER TABLE %s SET TBLPROPERTIES (%s)",
+          getQualifiedName(table), sb.toString());
+      LOG.info(msg);
+
+      hive.alterTable(table.getCatName(), table.getDbName(), table.getTableName(), modifiedTable,
+          false, null, false);
+    }
   }
 
-  MyDriver driver;
+  HiveUpdater hiveUpdater;
 
   interface ThrowableRunnable {
     void run() throws Exception;
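
HiveUpdater replaces the embedded Driver/SessionState with direct calls into
org.apache.hadoop.hive.ql.metadata.Hive, so the tool no longer has to spin up a
query session just to run ALTER statements. A minimal usage sketch, assuming a
populated HiveConf and an existing database db1 (both hypothetical here):

    // Sketch: direct metadata updates through the Hive client API.
    Hive hive = Hive.get(conf);                    // thread-local client
    Database db = hive.getDatabase("db1");
    db.setLocationUri("hdfs://nn:8020/warehouse/db1.db");
    hive.alterDatabase(db.getName(), db);          // no SQL parsing involved
    Hive.closeCurrent();                           // release the client when done
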
@@ -779,7 +984,7 @@ public class HiveStrictManagedMigration {
 
   static boolean isPartitionedTable(Table tableObj) {
     List<FieldSchema> partKeys = tableObj.getPartitionKeys();
-    if (partKeys != null || partKeys.size() > 0) {
+    if (partKeys != null && partKeys.size() > 0) {
       return true;
     }
     return false;
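
The operator change above is a correctness fix, not a cleanup: with '||' the
check both dereferenced a possibly null list and returned true for an empty
one. Spelled out:

    // Old: partKeys != null || partKeys.size() > 0
    //   partKeys == null       -> right side evaluates -> NullPointerException
    //   partKeys == empty list -> left side is true    -> wrongly "partitioned"
    // New: partKeys != null && partKeys.size() > 0
    //   null or empty          -> safely false
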
@@ -836,7 +1041,7 @@ public class HiveStrictManagedMigration {
       String userName, String groupName,
       FsPermission dirPerms, FsPermission filePerms,
       boolean dryRun, boolean recurse) throws IOException {
-    FileStatus fStatus = fs.getFileStatus(path);
+    FileStatus fStatus = getFileStatus(fs, path);
     checkAndSetFileOwnerPermissions(fs, fStatus, userName, groupName, dirPerms, filePerms, dryRun, recurse);
   }
 
@@ -857,6 +1062,10 @@ public class HiveStrictManagedMigration {
       String userName, String groupName,
       FsPermission dirPerms, FsPermission filePerms,
       boolean dryRun, boolean recurse) throws IOException {
+    if (fStatus == null) {
+      return;
+    }
+
     Path path = fStatus.getPath();
     boolean setOwner = false;
     if (userName != null && !userName.equals(fStatus.getOwner())) {
@@ -901,7 +1110,7 @@ public class HiveStrictManagedMigration {
       Path path,
       String userName,
       boolean recurse) throws IOException {
-    FileStatus fStatus = fs.getFileStatus(path);
+    FileStatus fStatus = getFileStatus(fs, path);
     return checkDirectoryOwnership(fs, fStatus, userName, recurse);
   }
 
@@ -909,6 +1118,11 @@ public class HiveStrictManagedMigration {
       FileStatus fStatus,
       String userName,
       boolean recurse) throws IOException {
+    if (fStatus == null) {
+      // Non-existent file returns true.
+      return true;
+    }
+
     Path path = fStatus.getPath();
     boolean result = true;
 
@@ -930,4 +1144,29 @@ public class HiveStrictManagedMigration {
 
     return result;
   }
+
+  static FileStatus getFileStatus(FileSystem fs, Path path) throws IOException {
+    if (!fs.exists(path)) {
+      return null;
+    }
+    return fs.getFileStatus(path);
+  }
+
+  static FileStatus[] listStatus(FileSystem fs, Path path) throws IOException {
+    if (!fs.exists(path)) {
+      return null;
+    }
+    return fs.listStatus(path);
+  }
+
+  static boolean hasEquivalentEncryption(HadoopShims.HdfsEncryptionShim encryptionShim,
+      Path path1, Path path2) throws IOException {
+    // Assumes both paths are qualified and on the same FileSystem.
+    if (encryptionShim.isPathEncrypted(path1) || encryptionShim.isPathEncrypted(path2)) {
+      if (!encryptionShim.arePathsOnSameEncryptionZone(path1, path2)) {
+        return false;
+      }
+    }
+    return true;
+  }
 }
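
The getFileStatus/listStatus helpers at the end return null for a non-existent
path instead of letting FileNotFoundException propagate, and the new null checks
in checkAndSetFileOwnerPermissions and checkDirectoryOwnership turn a missing
path into a no-op. A brief usage sketch (path hypothetical):

    // Sketch: null means "path does not exist", so callers can skip quietly.
    FileStatus st = getFileStatus(fs, new Path("/warehouse/db1.db"));
    if (st == null) {
      return;                                  // nothing to fix up
    }
    // hasEquivalentEncryption gates every move: HDFS cannot rename files
    // across different encryption zones, so unequal zones disable relocation.
    if (!hasEquivalentEncryption(encryptionShim, srcPath, destPath)) {
      LOG.info("Skipping move between different encryption zones");
    }
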
