This is an automated email from the ASF dual-hosted git repository.

dengzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 6ff00f37a54 HIVE-29403: Move the truncate table out of HMSHandler (#6269)
6ff00f37a54 is described below

commit 6ff00f37a549ee3cba07b239ca1c23b51d8a26ec
Author: dengzh <[email protected]>
AuthorDate: Fri Jan 30 10:58:34 2026 +0800

    HIVE-29403: Move the truncate table out of HMSHandler (#6269)
---
 standalone-metastore/metastore-server/pom.xml      |  12 +
 .../apache/hadoop/hive/metastore/HMSHandler.java   | 542 ++-------------------
 .../hadoop/hive/metastore/HiveAlterHandler.java    |   2 +-
 .../apache/hadoop/hive/metastore/IHMSHandler.java  |   4 +-
 ...ionHandler.java => AbstractRequestHandler.java} | 250 +++++-----
 .../metastore/handler/AddPartitionsHandler.java    |   5 +-
 .../hive/metastore/handler/CreateTableHandler.java | 456 +++++++++++++++++
 .../metastore/handler/DropDatabaseHandler.java     |  30 +-
 .../metastore/handler/DropPartitionsHandler.java   |   9 +-
 .../hive/metastore/handler/DropTableHandler.java   |  23 +-
 .../hive/metastore/handler/RequestHandler.java     |  35 ++
 .../metastore/handler/TruncateTableHandler.java    | 301 ++++++++++++
 .../hive/metastore/client/TestDropTable.java       |   8 +-
 13 files changed, 1013 insertions(+), 664 deletions(-)
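
What follows replaces HMSHandler's hand-written factory registrations with
annotation-driven discovery: every concrete subclass of AbstractRequestHandler
in org.apache.hadoop.hive.metastore.handler is found through a Reflections
classpath scan and registered under the Thrift request type named by its
@RequestHandler annotation. As a minimal sketch of the contract a new handler
has to satisfy (class and result names below are illustrative, not part of
this patch, and it assumes Result exposes the success() flag the base class
checks):

    // Illustrative only: the shape validateHandler() accepts at registration.
    @RequestHandler(requestBody = SomeThriftRequest.class)
    public class SomeRequestHandler
        extends AbstractRequestHandler<SomeThriftRequest, SomeRequestHandler.SomeResult> {

      // Handlers are instantiated reflectively, so this exact
      // (IHMSHandler, <requestBody>) constructor must be declared.
      public SomeRequestHandler(IHMSHandler handler, SomeThriftRequest request) {
        super(handler, /* async */ false, request);
      }

      @Override
      protected SomeResult execute() throws TException, IOException {
        // do the actual metastore work here
        return new SomeResult();
      }

      @Override
      protected String getMessagePrefix() {
        return "SomeRequestHandler [" + id + "]:";
      }

      @Override
      protected String getRequestProgress() {
        return "in progress";
      }

      static class SomeResult implements Result {
        @Override
        public boolean success() {
          return true;
        }
      }
    }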

diff --git a/standalone-metastore/metastore-server/pom.xml b/standalone-metastore/metastore-server/pom.xml
index 94217be1924..47783d74f7f 100644
--- a/standalone-metastore/metastore-server/pom.xml
+++ b/standalone-metastore/metastore-server/pom.xml
@@ -23,6 +23,7 @@
   <name>Hive Metastore Server</name>
   <properties>
     <standalone.metastore.path.to.root>..</standalone.metastore.path.to.root>
+    <reflections.version>0.10.2</reflections.version>
   </properties>
   <dependencies>
     <dependency>
@@ -70,6 +71,17 @@
         </exclusion>
       </exclusions>
     </dependency>
+    <dependency>
+      <groupId>org.reflections</groupId>
+      <artifactId>reflections</artifactId>
+      <version>${reflections.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>com.google.code.findbugs</groupId>
+          <artifactId>annotations</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
     <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-databind</artifactId>
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java
index 31af4c9efcd..2a7ce1c94d2 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java
@@ -29,16 +29,9 @@
 import com.google.common.util.concurrent.Striped;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.AcidConstants;
-import org.apache.hadoop.hive.common.AcidMetaDataFile;
-import org.apache.hadoop.hive.common.AcidMetaDataFile.DataFormat;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.common.TableName;
-import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
-import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.common.repl.ReplConst;
 import org.apache.hadoop.hive.metastore.api.*;
 import org.apache.hadoop.hive.metastore.api.Package;
@@ -47,11 +40,13 @@
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
 import org.apache.hadoop.hive.metastore.dataconnector.DataConnectorProviderFactory;
 import org.apache.hadoop.hive.metastore.events.*;
-import org.apache.hadoop.hive.metastore.handler.AbstractOperationHandler;
+import org.apache.hadoop.hive.metastore.handler.AbstractRequestHandler;
 import org.apache.hadoop.hive.metastore.handler.AddPartitionsHandler;
+import org.apache.hadoop.hive.metastore.handler.CreateTableHandler;
 import org.apache.hadoop.hive.metastore.handler.DropDatabaseHandler;
 import org.apache.hadoop.hive.metastore.handler.DropPartitionsHandler;
 import org.apache.hadoop.hive.metastore.handler.DropTableHandler;
+import org.apache.hadoop.hive.metastore.handler.TruncateTableHandler;
 import org.apache.hadoop.hive.metastore.messaging.EventMessage;
 import org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType;
 import org.apache.hadoop.hive.metastore.metrics.Metrics;
@@ -67,7 +62,6 @@
 import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo;
 import org.apache.hadoop.hive.metastore.utils.FileUtils;
 import org.apache.hadoop.hive.metastore.utils.FilterUtils;
-import org.apache.hadoop.hive.metastore.utils.HdfsUtils;
 import org.apache.hadoop.hive.metastore.utils.JavaUtils;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
@@ -93,17 +87,10 @@
 
 import static org.apache.commons.lang3.StringUtils.isBlank;
 import static org.apache.commons.lang3.StringUtils.join;
-import static org.apache.hadoop.hive.common.AcidConstants.SOFT_DELETE_PATH_SUFFIX;
-import static org.apache.hadoop.hive.common.AcidConstants.SOFT_DELETE_TABLE_PATTERN;
-import static org.apache.hadoop.hive.common.AcidConstants.SOFT_DELETE_TABLE;
-import static org.apache.hadoop.hive.common.AcidConstants.DELTA_DIGITS;
 
 import static org.apache.hadoop.hive.common.repl.ReplConst.REPL_RESUME_STARTED_AFTER_FAILOVER;
 import static org.apache.hadoop.hive.common.repl.ReplConst.REPL_TARGET_DATABASE_PROPERTY;
 import static org.apache.hadoop.hive.metastore.HiveMetaStoreClient.RENAME_PARTITION_MAKE_COPY;
-import static org.apache.hadoop.hive.metastore.client.ThriftHiveMetaStoreClient.TRUNCATE_SKIP_DATA_DELETION;
-import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.CTAS_LEGACY_CONFIG;
-import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.TABLE_IS_CTAS;
 import static org.apache.hadoop.hive.metastore.ExceptionHandler.handleException;
 import static org.apache.hadoop.hive.metastore.ExceptionHandler.newMetaException;
 import static org.apache.hadoop.hive.metastore.ExceptionHandler.rethrowException;
@@ -111,8 +98,6 @@
 import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_CATALOG_NAME;
 import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_DATABASE_COMMENT;
 import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_DATABASE_NAME;
-import static org.apache.hadoop.hive.metastore.Warehouse.getCatalogQualifiedTableName;
-import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.TABLE_IS_CTLT;
 import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.HIVE_IN_TEST;
 import static org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.canUpdateStats;
 import static org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.isDbReplicationTarget;
@@ -304,6 +289,11 @@ public List<MetaStoreEventListener> getListeners() {
     return listeners;
   }
 
+  @Override
+  public IMetaStoreMetadataTransformer getMetadataTransformer() {
+    return transformer;
+  }
+
   @Override
   public void init() throws MetaException {
     init(new Warehouse(conf));
@@ -1592,8 +1582,8 @@ public AsyncOperationResp drop_database_req(final DropDatabaseRequest req)
     }
     Exception ex = null;
     try {
-      DropDatabaseHandler dropDatabaseOp = AbstractOperationHandler.offer(this, req);
-      AbstractOperationHandler.OperationStatus status = dropDatabaseOp.getOperationStatus();
+      DropDatabaseHandler dropDatabaseOp = AbstractRequestHandler.offer(this, req);
+      AbstractRequestHandler.RequestStatus status = dropDatabaseOp.getRequestStatus();
       return status.toAsyncOperationResp();
     } catch (Exception e) {
       ex = e;
@@ -2010,283 +2000,6 @@ public Table translate_table_dryrun(final CreateTableRequest req) throws Already
     return transformedTbl != null ? transformedTbl : tbl;
   }
 
-  private void create_table_core(final RawStore ms, final CreateTableRequest req)
-      throws AlreadyExistsException, MetaException,
-      InvalidObjectException, NoSuchObjectException, InvalidInputException {
-    ColumnStatistics colStats = null;
-    Table tbl = req.getTable();
-    EnvironmentContext envContext = req.getEnvContext();
-    SQLAllTableConstraints constraints = new SQLAllTableConstraints();
-    constraints.setPrimaryKeys(req.getPrimaryKeys());
-    constraints.setForeignKeys(req.getForeignKeys());
-    constraints.setUniqueConstraints(req.getUniqueConstraints());
-    constraints.setDefaultConstraints(req.getDefaultConstraints());
-    constraints.setCheckConstraints(req.getCheckConstraints());
-    constraints.setNotNullConstraints(req.getNotNullConstraints());
-    List<String> processorCapabilities = req.getProcessorCapabilities();
-    String processorId = req.getProcessorIdentifier();
-
-    // To preserve backward compatibility throw MetaException in case of null database
-    if (tbl.getDbName() == null) {
-      throw new MetaException("Null database name is not allowed");
-    }
-
-    if (!MetaStoreUtils.validateName(tbl.getTableName(), conf)) {
-      throw new InvalidObjectException(tbl.getTableName()
-          + " is not a valid object name");
-    }
-
-    if (!MetaStoreUtils.validateTblStorage(tbl.getSd())) {
-      throw new InvalidObjectException(tbl.getTableName()
-              + " location must not be root path");
-    }
-
-    if (!tbl.isSetCatName()) {
-      tbl.setCatName(getDefaultCatalog(conf));
-    }
-
-    Database db = get_database_core(tbl.getCatName(), tbl.getDbName());
-    if (MetaStoreUtils.isDatabaseRemote(db)) {
-      // HIVE-24425: Create table in REMOTE db should fail
-      throw new MetaException("Create table in REMOTE database " + db.getName() + " is not allowed");
-    }
-
-    if (is_table_exists(ms, tbl.getCatName(), tbl.getDbName(), tbl.getTableName())) {
-      throw new AlreadyExistsException("Table " + getCatalogQualifiedTableName(tbl)
-          + " already exists");
-    }
-
-    tbl.setDbName(normalizeIdentifier(tbl.getDbName()));
-    tbl.setTableName(normalizeIdentifier(tbl.getTableName()));
-
-    if (transformer != null) {
-      tbl = transformer.transformCreateTable(tbl, processorCapabilities, processorId);
-    }
-
-    Map<String, String> params = tbl.getParameters();
-    if (params != null) {
-      params.remove(TABLE_IS_CTAS);
-      params.remove(TABLE_IS_CTLT);
-      if (MetaStoreServerUtils.getBooleanEnvProp(envContext, CTAS_LEGACY_CONFIG) &&
-          TableType.MANAGED_TABLE.toString().equals(tbl.getTableType())) {
-        params.put("EXTERNAL", "TRUE");
-        tbl.setTableType(TableType.EXTERNAL_TABLE.toString());
-      }
-    }
-
-    // If the given table has column statistics, save it here. We will update it later.
-    // We don't want it to be part of the Table object being created, lest the create table
-    // event will also have the col stats which we don't want.
-    if (tbl.isSetColStats()) {
-      colStats = tbl.getColStats();
-      tbl.unsetColStats();
-    }
-
-    String validate = MetaStoreServerUtils.validateTblColumns(tbl.getSd().getCols());
-    if (validate != null) {
-      throw new InvalidObjectException("Invalid column " + validate);
-    }
-    if (tbl.getPartitionKeys() != null) {
-      validate = MetaStoreServerUtils.validateTblColumns(tbl.getPartitionKeys());
-      if (validate != null) {
-        throw new InvalidObjectException("Invalid partition column " + validate);
-      }
-    }
-    if (tbl.isSetId()) {
-      LOG.debug("Id shouldn't be set but table {}.{} has the Id set to {}. Id is ignored.", tbl.getDbName(),
-          tbl.getTableName(), tbl.getId());
-      tbl.unsetId();
-    }
-    SkewedInfo skew = tbl.getSd().getSkewedInfo();
-    if (skew != null) {
-      validate = MetaStoreServerUtils.validateSkewedColNames(skew.getSkewedColNames());
-      if (validate != null) {
-        throw new InvalidObjectException("Invalid skew column " + validate);
-      }
-      validate = MetaStoreServerUtils.validateSkewedColNamesSubsetCol(
-          skew.getSkewedColNames(), tbl.getSd().getCols());
-      if (validate != null) {
-        throw new InvalidObjectException("Invalid skew column " + validate);
-      }
-    }
-
-    Map<String, String> transactionalListenerResponses = Collections.emptyMap();
-    Path tblPath = null;
-    boolean success = false, madeDir = false;
-    boolean isReplicated = false;
-    try {
-
-      ms.openTransaction();
-
-      db = ms.getDatabase(tbl.getCatName(), tbl.getDbName());
-      isReplicated = isDbReplicationTarget(db);
-
-      firePreEvent(new PreCreateTableEvent(tbl, db, this));
-
-      if (!TableType.VIRTUAL_VIEW.toString().equals(tbl.getTableType())) {
-        if (tbl.getSd().getLocation() == null
-            || tbl.getSd().getLocation().isEmpty()) {
-          tblPath = wh.getDefaultTablePath(db, tbl.getTableName() + getTableSuffix(tbl),
-              MetaStoreUtils.isExternalTable(tbl));
-        } else {
-          if (!MetaStoreUtils.isExternalTable(tbl) && !MetaStoreUtils.isNonNativeTable(tbl)) {
-            LOG.warn("Location: " + tbl.getSd().getLocation()
-                + " specified for non-external table:" + tbl.getTableName());
-          }
-          tblPath = wh.getDnsPath(new Path(tbl.getSd().getLocation()));
-          // ignore suffix if it's already there (direct-write CTAS)
-          if (!tblPath.getName().matches("(.*)" + SOFT_DELETE_TABLE_PATTERN)) {
-            tblPath = new Path(tblPath + getTableSuffix(tbl));
-          }
-        }
-        tbl.getSd().setLocation(tblPath.toString());
-      }
-
-      if (tblPath != null) {
-        if (!wh.isDir(tblPath)) {
-          if (!wh.mkdirs(tblPath)) {
-            throw new MetaException(tblPath
-                + " is not a directory or unable to create one");
-          }
-          madeDir = true;
-        }
-      }
-
-      MetaStoreServerUtils.updateTableStatsForCreateTable(wh, db, tbl, envContext, conf, tblPath, madeDir);
-
-      // set create time
-      long time = System.currentTimeMillis() / 1000;
-      tbl.setCreateTime((int) time);
-      if (tbl.getParameters() == null ||
-          tbl.getParameters().get(hive_metastoreConstants.DDL_TIME) == null) {
-        tbl.putToParameters(hive_metastoreConstants.DDL_TIME, Long.toString(time));
-      }
-
-      if (CollectionUtils.isEmpty(constraints.getPrimaryKeys()) && CollectionUtils.isEmpty(constraints.getForeignKeys())
-          && CollectionUtils.isEmpty(constraints.getUniqueConstraints())&& CollectionUtils.isEmpty(constraints.getNotNullConstraints())&& CollectionUtils.isEmpty(constraints.getDefaultConstraints())
-          && CollectionUtils.isEmpty(constraints.getCheckConstraints())) {
-        ms.createTable(tbl);
-      } else {
-        final String catName = tbl.getCatName();
-        // Check that constraints have catalog name properly set first
-        if (CollectionUtils.isNotEmpty(constraints.getPrimaryKeys()) && !constraints.getPrimaryKeys().get(0).isSetCatName()) {
-          constraints.getPrimaryKeys().forEach(constraint -> constraint.setCatName(catName));
-        }
-        if (CollectionUtils.isNotEmpty(constraints.getForeignKeys()) && !constraints.getForeignKeys().get(0).isSetCatName()) {
-          constraints.getForeignKeys().forEach(constraint -> constraint.setCatName(catName));
-        }
-        if (CollectionUtils.isNotEmpty(constraints.getUniqueConstraints()) && !constraints.getUniqueConstraints().get(0).isSetCatName()) {
-          constraints.getUniqueConstraints().forEach(constraint -> constraint.setCatName(catName));
-        }
-        if (CollectionUtils.isNotEmpty(constraints.getNotNullConstraints()) && !constraints.getNotNullConstraints().get(0).isSetCatName()) {
-          constraints.getNotNullConstraints().forEach(constraint -> constraint.setCatName(catName));
-        }
-        if (CollectionUtils.isNotEmpty(constraints.getDefaultConstraints()) && !constraints.getDefaultConstraints().get(0).isSetCatName()) {
-          constraints.getDefaultConstraints().forEach(constraint -> constraint.setCatName(catName));
-        }
-        if (CollectionUtils.isNotEmpty(constraints.getCheckConstraints()) && !constraints.getCheckConstraints().get(0).isSetCatName()) {
-          constraints.getCheckConstraints().forEach(constraint -> constraint.setCatName(catName));
-        }
-        // Set constraint name if null before sending to listener
-        constraints = ms.createTableWithConstraints(tbl, constraints);
-
-      }
-
-      if (!transactionalListeners.isEmpty()) {
-        transactionalListenerResponses = MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
-            EventType.CREATE_TABLE, new CreateTableEvent(tbl, true, this, isReplicated), envContext);
-        if (CollectionUtils.isNotEmpty(constraints.getPrimaryKeys())) {
-          MetaStoreListenerNotifier.notifyEvent(transactionalListeners, EventType.ADD_PRIMARYKEY,
-              new AddPrimaryKeyEvent(constraints.getPrimaryKeys(), true, this), envContext);
-        }
-        if (CollectionUtils.isNotEmpty(constraints.getForeignKeys())) {
-          MetaStoreListenerNotifier.notifyEvent(transactionalListeners, EventType.ADD_FOREIGNKEY,
-              new AddForeignKeyEvent(constraints.getForeignKeys(), true, this), envContext);
-        }
-        if (CollectionUtils.isNotEmpty(constraints.getUniqueConstraints())) {
-          MetaStoreListenerNotifier.notifyEvent(transactionalListeners, EventType.ADD_UNIQUECONSTRAINT,
-              new AddUniqueConstraintEvent(constraints.getUniqueConstraints(), true, this), envContext);
-        }
-        if (CollectionUtils.isNotEmpty(constraints.getNotNullConstraints())) {
-          MetaStoreListenerNotifier.notifyEvent(transactionalListeners, EventType.ADD_NOTNULLCONSTRAINT,
-              new AddNotNullConstraintEvent(constraints.getNotNullConstraints(), true, this), envContext);
-        }
-        if (CollectionUtils.isNotEmpty(constraints.getCheckConstraints())) {
-          MetaStoreListenerNotifier.notifyEvent(transactionalListeners, EventType.ADD_CHECKCONSTRAINT,
-              new AddCheckConstraintEvent(constraints.getCheckConstraints(), true, this), envContext);
-        }
-        if (CollectionUtils.isNotEmpty(constraints.getDefaultConstraints())) {
-          MetaStoreListenerNotifier.notifyEvent(transactionalListeners, EventType.ADD_DEFAULTCONSTRAINT,
-              new AddDefaultConstraintEvent(constraints.getDefaultConstraints(), true, this), envContext);
-        }
-      }
-
-      success = ms.commitTransaction();
-    } finally {
-      if (!success) {
-        ms.rollbackTransaction();
-        if (madeDir) {
-          wh.deleteDir(tblPath, false, ReplChangeManager.shouldEnableCm(db, tbl));
-        }
-      }
-
-      if (!listeners.isEmpty()) {
-        MetaStoreListenerNotifier.notifyEvent(listeners, EventType.CREATE_TABLE,
-            new CreateTableEvent(tbl, success, this, isReplicated), envContext,
-            transactionalListenerResponses, ms);
-        if (CollectionUtils.isNotEmpty(constraints.getPrimaryKeys())) {
-          MetaStoreListenerNotifier.notifyEvent(listeners, EventType.ADD_PRIMARYKEY,
-              new AddPrimaryKeyEvent(constraints.getPrimaryKeys(), success, this), envContext);
-        }
-        if (CollectionUtils.isNotEmpty(constraints.getForeignKeys())) {
-          MetaStoreListenerNotifier.notifyEvent(listeners, EventType.ADD_FOREIGNKEY,
-              new AddForeignKeyEvent(constraints.getForeignKeys(), success, this), envContext);
-        }
-        if (CollectionUtils.isNotEmpty(constraints.getUniqueConstraints())) {
-          MetaStoreListenerNotifier.notifyEvent(listeners, EventType.ADD_UNIQUECONSTRAINT,
-              new AddUniqueConstraintEvent(constraints.getUniqueConstraints(), success, this), envContext);
-        }
-        if (CollectionUtils.isNotEmpty(constraints.getNotNullConstraints())) {
-          MetaStoreListenerNotifier.notifyEvent(listeners, EventType.ADD_NOTNULLCONSTRAINT,
-              new AddNotNullConstraintEvent(constraints.getNotNullConstraints(), success, this), envContext);
-        }
-        if (CollectionUtils.isNotEmpty(constraints.getDefaultConstraints())) {
-          MetaStoreListenerNotifier.notifyEvent(listeners, EventType.ADD_DEFAULTCONSTRAINT,
-              new AddDefaultConstraintEvent(constraints.getDefaultConstraints(), success, this), envContext);
-        }
-        if (CollectionUtils.isNotEmpty(constraints.getCheckConstraints())) {
-          MetaStoreListenerNotifier.notifyEvent(listeners, EventType.ADD_CHECKCONSTRAINT,
-              new AddCheckConstraintEvent(constraints.getCheckConstraints(), success, this), envContext);
-        }
-      }
-    }
-
-    // If the table has column statistics, update it into the metastore. We need a valid
-    // writeId list to update column statistics for a transactional table. But during bootstrap
-    // replication, where we use this feature, we do not have a valid writeId list which was
-    // used to update the stats. But we know for sure that the writeId associated with the
-    // stats was valid then (otherwise stats update would have failed on the source). So, craft
-    // a valid transaction list with only that writeId and use it to update the stats.
-    if (colStats != null) {
-      long writeId = tbl.getWriteId();
-      String validWriteIds = null;
-      if (writeId > 0) {
-        ValidWriteIdList validWriteIdList =
-            new ValidReaderWriteIdList(TableName.getDbTable(tbl.getDbName(),
-                tbl.getTableName()),
-                new long[0], new BitSet(), writeId);
-        validWriteIds = validWriteIdList.toString();
-      }
-      updateTableColumnStatsInternal(colStats, validWriteIds, tbl.getWriteId());
-    }
-  }
-
-  private String getTableSuffix(Table tbl) {
-    return tbl.isSetTxnId() && tbl.getParameters() != null
-        && Boolean.parseBoolean(tbl.getParameters().get(SOFT_DELETE_TABLE)) ?
-      SOFT_DELETE_PATH_SUFFIX + String.format(DELTA_DIGITS, tbl.getTxnId()) : "";
-  }
-
   @Override
   @Deprecated
   public void create_table(final Table tbl) throws AlreadyExistsException,
@@ -2329,8 +2042,8 @@ public void create_table_req(final CreateTableRequest req)
     boolean success = false;
     Exception ex = null;
     try {
-      create_table_core(getMS(), req);
-      success = true;
+      CreateTableHandler createTableOp = AbstractRequestHandler.offer(this, req);
+      success = createTableOp.success();
     } catch (Exception e) {
       LOG.warn("create_table_req got ", e);
       ex = e;
@@ -2711,8 +2424,8 @@ public AsyncOperationResp drop_table_req(DropTableRequest dropReq)
         ", async: " + dropReq.isAsyncDrop() + ", id: " + dropReq.getId());
     Exception ex = null;
     try {
-      DropTableHandler dropTableOp = AbstractOperationHandler.offer(this, dropReq);
-      AbstractOperationHandler.OperationStatus status = dropTableOp.getOperationStatus();
+      DropTableHandler dropTableOp = AbstractRequestHandler.offer(this, dropReq);
+      AbstractRequestHandler.RequestStatus status = dropTableOp.getRequestStatus();
       return status.toAsyncOperationResp();
     } catch (Exception e) {
       ex = e;
@@ -2726,119 +2439,6 @@ public AsyncOperationResp drop_table_req(DropTableRequest dropReq)
     }
   }
 
-  private void updateStatsForTruncate(Map<String,String> props, EnvironmentContext environmentContext) {
-    if (null == props) {
-      return;
-    }
-    for (String stat : StatsSetupConst.SUPPORTED_STATS) {
-      String statVal = props.get(stat);
-      if (statVal != null) {
-        //In the case of truncate table, we set the stats to be 0.
-        props.put(stat, "0");
-      }
-    }
-    //first set basic stats to true
-    StatsSetupConst.setBasicStatsState(props, StatsSetupConst.TRUE);
-    environmentContext.putToProperties(StatsSetupConst.STATS_GENERATED, StatsSetupConst.TASK);
-    environmentContext.putToProperties(StatsSetupConst.DO_NOT_POPULATE_QUICK_STATS, StatsSetupConst.TRUE);
-    //then invalidate column stats
-    StatsSetupConst.clearColumnStatsState(props);
-    return;
-  }
-
-  private void alterPartitionsForTruncate(RawStore ms, String catName, String dbName, String tableName,
-      Table table, List<Partition> partitions, String validWriteIds, long writeId) throws Exception {
-    EnvironmentContext environmentContext = new EnvironmentContext();
-    if (partitions.isEmpty()) {
-      return;
-    }
-    List<List<String>> partValsList = new ArrayList<>();
-    for (Partition partition: partitions) {
-      updateStatsForTruncate(partition.getParameters(), environmentContext);
-      if (writeId > 0) {
-        partition.setWriteId(writeId);
-      }
-      partition.putToParameters(hive_metastoreConstants.DDL_TIME, Long.toString(System
-          .currentTimeMillis() / 1000));
-      partValsList.add(partition.getValues());
-    }
-    ms.alterPartitions(catName, dbName, tableName, partValsList, partitions, writeId, validWriteIds);
-    if (transactionalListeners != null && !transactionalListeners.isEmpty()) {
-      boolean shouldSendSingleEvent = MetastoreConf.getBoolVar(this.getConf(),
-          MetastoreConf.ConfVars.NOTIFICATION_ALTER_PARTITIONS_V2_ENABLED);
-      if (shouldSendSingleEvent) {
-        MetaStoreListenerNotifier.notifyEvent(transactionalListeners, EventMessage.EventType.ALTER_PARTITIONS,
-            new AlterPartitionsEvent(partitions, partitions, table, true, true, this), environmentContext);
-      } else {
-        for (Partition partition : partitions) {
-          MetaStoreListenerNotifier.notifyEvent(transactionalListeners, EventMessage.EventType.ALTER_PARTITION,
-              new AlterPartitionEvent(partition, partition, table, true, true, partition.getWriteId(), this),
-              environmentContext);
-        }
-      }
-    }
-    if (listeners != null && !listeners.isEmpty()) {
-      boolean shouldSendSingleEvent = MetastoreConf.getBoolVar(this.getConf(),
-          MetastoreConf.ConfVars.NOTIFICATION_ALTER_PARTITIONS_V2_ENABLED);
-      if (shouldSendSingleEvent) {
-        MetaStoreListenerNotifier.notifyEvent(listeners, EventMessage.EventType.ALTER_PARTITIONS,
-            new AlterPartitionsEvent(partitions, partitions, table, true, true, this), environmentContext);
-      } else {
-        for (Partition partition : partitions) {
-          MetaStoreListenerNotifier.notifyEvent(listeners, EventMessage.EventType.ALTER_PARTITION,
-              new AlterPartitionEvent(partition, partition, table, true, true, partition.getWriteId(), this),
-              environmentContext);
-        }
-      }
-    }
-  }
-
-  private void alterTableStatsForTruncate(RawStore ms, String catName, String dbName,
-      String tableName, Table table, List<Partition> partitionsList,
-      String validWriteIds, long writeId) throws Exception {
-    if (0 != table.getPartitionKeysSize()) {
-      alterPartitionsForTruncate(ms, catName, dbName, tableName, table, partitionsList,
-          validWriteIds, writeId);
-    } else {
-      EnvironmentContext environmentContext = new EnvironmentContext();
-      updateStatsForTruncate(table.getParameters(), environmentContext);
-      boolean isReplicated = isDbReplicationTarget(ms.getDatabase(catName, dbName));
-      if (!transactionalListeners.isEmpty()) {
-        MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
-            EventType.ALTER_TABLE,
-            new AlterTableEvent(table, table, true, true,
-                writeId, this, isReplicated));
-      }
-
-      if (!listeners.isEmpty()) {
-        MetaStoreListenerNotifier.notifyEvent(listeners,
-            EventType.ALTER_TABLE,
-            new AlterTableEvent(table, table, true, true,
-                writeId, this, isReplicated));
-      }
-      // TODO: this should actually pass thru and set writeId for txn stats.
-      if (writeId > 0) {
-        table.setWriteId(writeId);
-      }
-      ms.alterTable(catName, dbName, tableName, table, validWriteIds);
-    }
-    return;
-  }
-
-  private List<Path> getLocationsForTruncate(final RawStore ms, final String catName,
-      final String dbName, final String tableName, final Table table,
-      List<Partition> partitionsList)  throws Exception {
-    List<Path> locations = new ArrayList<>();
-    if (0 != table.getPartitionKeysSize()) {
-      for (Partition partition : partitionsList) {
-        locations.add(new Path(partition.getSd().getLocation()));
-      }
-    } else {
-      locations.add(new Path(table.getSd().getLocation()));
-    }
-    return locations;
-  }
-
   @Override
   public CmRecycleResponse cm_recycle(final CmRecycleRequest request) throws MetaException {
     wh.recycleDirToCmPath(new Path(request.getDataPath()), request.isPurge());
@@ -2857,109 +2457,23 @@ public void truncate_table(final String dbName, final String tableName, List<Str
   @Override
   public TruncateTableResponse truncate_table_req(TruncateTableRequest req)
       throws NoSuchObjectException, MetaException {
-    truncateTableInternal(req.getDbName(), req.getTableName(), req.getPartNames(),
-        req.getValidWriteIdList(), req.getWriteId(), req.getEnvironmentContext());
-    return new TruncateTableResponse();
-  }
-
-  private void truncateTableInternal(String dbName, String tableName, List<String> partNames,
-      String validWriteIds, long writeId, EnvironmentContext context) throws MetaException, NoSuchObjectException {
-    boolean isSkipTrash = false, needCmRecycle = false;
+    String[] parsedDbName = parseDbName(req.getDbName(), getConf());
+    startFunction("truncate_table_req",
+        ": db=" + parsedDbName[DB_NAME] + " tab=" + req.getTableName());
+    Exception ex = null;
+    boolean success = false;
     try {
-      String[] parsedDbName = parseDbName(dbName, conf);
-      GetTableRequest getTableRequest = new GetTableRequest(parsedDbName[DB_NAME], tableName);
-      getTableRequest.setCatName(parsedDbName[CAT_NAME]);
-      Table tbl = get_table_core(getTableRequest);
-
-      boolean skipDataDeletion = Optional.ofNullable(context)
-          .map(EnvironmentContext::getProperties)
-          .map(prop -> prop.get(TRUNCATE_SKIP_DATA_DELETION))
-          .map(Boolean::parseBoolean)
-          .orElse(false);
-      List<Partition> partitionsList = new ArrayList<>();
-      if (partNames == null) {
-        if (0 != tbl.getPartitionKeysSize()) {
-          partitionsList = getMS().getPartitions(parsedDbName[CAT_NAME], parsedDbName[DB_NAME],
-              tableName, GetPartitionsArgs.getAllPartitions());
-        }
-      } else {
-        partitionsList = getMS().getPartitionsByNames(parsedDbName[CAT_NAME], parsedDbName[DB_NAME],
-            tableName, partNames);
-      }
-      if (TxnUtils.isTransactionalTable(tbl) || !skipDataDeletion) {
-        if (!skipDataDeletion) {
-          isSkipTrash = MetaStoreUtils.isSkipTrash(tbl.getParameters());
-          
-          Database db = get_database_core(parsedDbName[CAT_NAME], parsedDbName[DB_NAME]);
-          needCmRecycle = ReplChangeManager.shouldEnableCm(db, tbl);
-        }
-        // This is not transactional
-        for (Path location : getLocationsForTruncate(getMS(), parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tableName,
-            tbl, partitionsList)) {
-          if (!skipDataDeletion) {
-            truncateDataFiles(location, isSkipTrash, needCmRecycle);
-          } else {
-            // For Acid tables we don't need to delete the old files, only write an empty baseDir.
-            // Compaction and cleaner will take care of the rest
-            addTruncateBaseFile(location, writeId, conf, DataFormat.TRUNCATED);
-          }
-        }
-      }
-
-      // Alter the table/partition stats and also notify truncate table event
-      alterTableStatsForTruncate(getMS(), parsedDbName[CAT_NAME], parsedDbName[DB_NAME],
-          tableName, tbl, partitionsList, validWriteIds, writeId);
+      TruncateTableHandler truncateTable = AbstractRequestHandler.offer(this, req);
+      success = truncateTable.success();
+      return new TruncateTableResponse();
     } catch (Exception e) {
+      ex = e;
       throw handleException(e).throwIfInstance(MetaException.class, NoSuchObjectException.class)
           .convertIfInstance(IOException.class, MetaException.class)
           .defaultMetaException();
-    }
-  }
-
-  /**
-   * Add an empty baseDir with a truncate metadatafile
-   * @param location partition or table directory
-   * @param writeId allocated writeId
-   * @throws MetaException
-   */
-  public static void addTruncateBaseFile(Path location, long writeId, Configuration conf, DataFormat dataFormat)
-      throws MetaException {
-    if (location == null) 
-      return;
-    
-    Path basePath = new Path(location, AcidConstants.baseDir(writeId));
-    try {
-      FileSystem fs = location.getFileSystem(conf);
-      fs.mkdirs(basePath);
-      // We can not leave the folder empty, otherwise it will be skipped at some file listing in AcidUtils
-      // No need for a data file, a simple metadata is enough
-      AcidMetaDataFile.writeToFile(fs, basePath, dataFormat);
-    } catch (Exception e) {
-      throw newMetaException(e);
-    }
-  }
-
-  private void truncateDataFiles(Path location, boolean isSkipTrash, boolean needCmRecycle)
-      throws IOException, MetaException {
-    FileSystem fs = location.getFileSystem(getConf());
-
-    if (!HdfsUtils.isPathEncrypted(getConf(), fs.getUri(), location) &&
-        !FileUtils.pathHasSnapshotSubDir(location, fs)) {
-      HdfsUtils.HadoopFileStatus status = new HdfsUtils.HadoopFileStatus(getConf(), fs, location);
-      FileStatus targetStatus = fs.getFileStatus(location);
-      String targetGroup = targetStatus == null ? null : targetStatus.getGroup();
-
-      wh.deleteDir(location, isSkipTrash, needCmRecycle);
-      fs.mkdirs(location);
-      HdfsUtils.setFullFileStatus(getConf(), status, targetGroup, fs, location, false);
-    } else {
-      FileStatus[] statuses = fs.listStatus(location, FileUtils.HIDDEN_FILES_PATH_FILTER);
-      if (statuses == null || statuses.length == 0) {
-        return;
-      }
-      for (final FileStatus status : statuses) {
-        wh.deleteDir(status.getPath(), isSkipTrash, needCmRecycle);
-      }
+    } finally {
+      endFunction("truncate_table_req", success, ex,
+          TableName.getQualified(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], req.getTableName()));
     }
   }
 
@@ -3565,7 +3079,7 @@ public AddPartitionsResult add_partitions_req(AddPartitionsRequest request)
     Exception ex = null;
     try {
       // Make sure all the partitions have the catalog set as well
-      AddPartitionsHandler addPartsOp = AbstractOperationHandler.offer(this, request);
+      AddPartitionsHandler addPartsOp = AbstractRequestHandler.offer(this, request);
       if (addPartsOp.success() && request.isNeedResult()) {
         AddPartitionsHandler.AddPartitionsResult addPartsResult = addPartsOp.getResult();
         if (request.isSkipColumnSchemaForPartition()) {
@@ -3931,7 +3445,7 @@ public DropPartitionsResult drop_partitions_req(
     Exception ex = null;
     try {
       DropPartitionsResult resp = new DropPartitionsResult();
-      DropPartitionsHandler dropPartsOp = AbstractOperationHandler.offer(this, request);
+      DropPartitionsHandler dropPartsOp = AbstractRequestHandler.offer(this, request);
       if (dropPartsOp.success() && request.isNeedResult()) {
         resp.setPartitions(dropPartsOp.getResult().getPartitions());
       }
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java
index a0f8c06e995..88ee4a4b8c5 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java
@@ -68,10 +68,10 @@
 import java.util.LinkedList;
 import java.util.Optional;
 
-import static org.apache.hadoop.hive.metastore.HMSHandler.addTruncateBaseFile;
 import static org.apache.hadoop.hive.metastore.HiveMetaHook.ALTERLOCATION;
 import static org.apache.hadoop.hive.metastore.HiveMetaHook.ALTER_TABLE_OPERATION_TYPE;
 import static org.apache.hadoop.hive.metastore.HiveMetaStoreClient.RENAME_PARTITION_MAKE_COPY;
+import static org.apache.hadoop.hive.metastore.handler.TruncateTableHandler.addTruncateBaseFile;
 import static org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.findStaleColumns;
 import static org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.isDbReplicationTarget;
 import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog;
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/IHMSHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/IHMSHandler.java
index 36fc6fb6d03..3d8c21f0755 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/IHMSHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/IHMSHandler.java
@@ -107,5 +107,7 @@ Table get_table_core(final GetTableRequest getTableRequest)
   DataConnector get_dataconnector_core(final String name)
       throws NoSuchObjectException, MetaException;
 
-    AbortCompactResponse abort_Compactions(AbortCompactionRequest rqst) throws TException;
+  AbortCompactResponse abort_Compactions(AbortCompactionRequest rqst) throws TException;
+
+  IMetaStoreMetadataTransformer getMetadataTransformer();
 }
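
The getMetadataTransformer() accessor added above is what lets logic that used
to run inside HMSHandler keep applying the metadata transformer after the move
into handler classes. A sketch of the call pattern, assuming a handler applies
the same transformCreateTable call the removed create_table_core made in-line:

    // Sketch only: a handler reaching the transformer through IHMSHandler.
    IMetaStoreMetadataTransformer transformer = handler.getMetadataTransformer();
    if (transformer != null) {
      tbl = transformer.transformCreateTable(tbl, processorCapabilities, processorId);
    }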
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/handler/AbstractOperationHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/handler/AbstractRequestHandler.java
similarity index 60%
rename from standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/handler/AbstractOperationHandler.java
rename to standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/handler/AbstractRequestHandler.java
index 6ae548ee101..c00d23e5149 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/handler/AbstractOperationHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/handler/AbstractRequestHandler.java
@@ -23,7 +23,9 @@
 import com.google.common.util.concurrent.MoreExecutors;
 
 import java.io.IOException;
+import java.lang.reflect.Modifier;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ConcurrentHashMap;
@@ -35,65 +37,60 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.reflect.MethodUtils;
 import org.apache.hadoop.hive.metastore.IHMSHandler;
-import org.apache.hadoop.hive.metastore.api.AddPartitionsRequest;
 import org.apache.hadoop.hive.metastore.api.AsyncOperationResp;
-import org.apache.hadoop.hive.metastore.api.DropDatabaseRequest;
-import org.apache.hadoop.hive.metastore.api.DropPartitionsRequest;
-import org.apache.hadoop.hive.metastore.api.DropTableRequest;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.metrics.Metrics;
 import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
 import org.apache.thrift.TBase;
 import org.apache.thrift.TException;
+import org.reflections.Reflections;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.commons.lang3.reflect.MethodUtils.invokeMethod;
 import static org.apache.hadoop.hive.metastore.ExceptionHandler.handleException;
 import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.HIVE_IN_TEST;
+import static org.apache.hadoop.hive.metastore.utils.JavaUtils.newInstance;
 
-public abstract class AbstractOperationHandler<T extends TBase, A extends AbstractOperationHandler.Result> {
-
-  private static final Logger LOG = LoggerFactory.getLogger(AbstractOperationHandler.class);
-  private static final Map<String, AbstractOperationHandler> OPID_TO_HANDLER = new ConcurrentHashMap<>();
-  private static final ScheduledExecutorService OPID_CLEANER = Executors.newScheduledThreadPool(1, r -> {
+public abstract class AbstractRequestHandler<T extends TBase, A extends AbstractRequestHandler.Result> {
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractRequestHandler.class);
+  private static final Map<String, AbstractRequestHandler> ID_TO_HANDLER = new ConcurrentHashMap<>();
+  private static final AtomicLong ID_GEN = new AtomicLong(0);
+  private static final ScheduledExecutorService REQUEST_CLEANER = Executors.newScheduledThreadPool(1, r -> {
     Thread thread = new Thread(r);
     thread.setDaemon(true);
-    thread.setName("OperationHandler-Cleaner");
+    thread.setName("RequestHandler-Cleaner");
     return thread;
   });
 
   private static final Map<Class<? extends TBase>, HandlerFactory> REQ_FACTORIES = new ConcurrentHashMap<>();
   static {
-    REQ_FACTORIES.put(DropTableRequest.class, (base, request) -> {
-      DropTableRequest req = (DropTableRequest) request;
-      AbstractOperationHandler opHandler = ofCache(req.getId(), req.isCancel());
-      if (opHandler == null) {
-        opHandler = new DropTableHandler(base, req);
-      }
-      return opHandler;
-    });
-
-    REQ_FACTORIES.put(DropDatabaseRequest.class, (base, request) -> {
-      DropDatabaseRequest req = (DropDatabaseRequest) request;
-      AbstractOperationHandler opHandler = ofCache(req.getId(), req.isCancel());
-      if (opHandler == null) {
-        opHandler = new DropDatabaseHandler(base, req);
+    Set<Class<? extends AbstractRequestHandler>> handlerClasses =
+        new Reflections("org.apache.hadoop.hive.metastore.handler").getSubTypesOf(AbstractRequestHandler.class);
+    for (Class<? extends AbstractRequestHandler> clz : handlerClasses) {
+      if (validateHandler(clz)) {
+        RequestHandler handler = clz.getAnnotation(RequestHandler.class);
+        Class<? extends TBase> requestBody = handler.requestBody();
+        REQ_FACTORIES.put(requestBody, (base, request) -> {
+          AbstractRequestHandler opHandler = null;
+          if (handler.supportAsync()) {
+            opHandler = ofCache((String) invokeMethod(request, handler.id()),
+                (boolean) invokeMethod(request, handler.cancel()));
+          }
+          if (opHandler == null) {
+            opHandler = newInstance(clz, new Class[] { IHMSHandler.class, requestBody },
+                new Object[] { base, request });
+          }
+          return opHandler;
+        });
       }
-      return opHandler;
-    });
-
-    REQ_FACTORIES.put(DropPartitionsRequest.class, (base, request) -> {
-      DropPartitionsRequest req = (DropPartitionsRequest) request;
-      return new DropPartitionsHandler(base, req);
-    });
-
-    REQ_FACTORIES.put(AddPartitionsRequest.class, (base, request) -> {
-      AddPartitionsRequest req = (AddPartitionsRequest) request;
-      return new AddPartitionsHandler(base, req);
-    });
+    }
   }
 
   private Result result;
@@ -107,37 +104,41 @@ public abstract class AbstractOperationHandler<T extends TBase, A extends Abstra
   protected final String id;
   private long timeout;
 
-  private AbstractOperationHandler(String id) {
+  private AbstractRequestHandler(String id) {
     this.id = id;
   }
 
-  AbstractOperationHandler(IHMSHandler handler, boolean async, T request) {
-    this.id = UUID.randomUUID().toString();
+  AbstractRequestHandler(IHMSHandler handler, boolean async, T request) {
+    this.id = UUID.randomUUID() + "-" + ID_GEN.incrementAndGet();
     this.handler = handler;
     this.request = request;
     this.async = async;
     this.timeout = MetastoreConf.getBoolVar(handler.getConf(), HIVE_IN_TEST) ? 10 : 5000;
-    final Timer.Context timerContext;
-    if (getHandlerAlias() != null) {
-      Timer timer = Metrics.getOrCreateTimer(MetricsConstants.API_PREFIX + getHandlerAlias());
-      timerContext = timer != null ? timer.time() : null;
-    } else {
-      timerContext = null;
-    }
 
     if (async) {
-      OPID_TO_HANDLER.put(id, this);
+      ID_TO_HANDLER.put(id, this);
       this.executor = Executors.newFixedThreadPool(1, r -> {
         Thread thread = new Thread(r);
         thread.setDaemon(true);
-        thread.setName("OperationHandler[" + id + "]");
+        thread.setName("RequestHandler[" + id + "]");
         return thread;
       });
     } else {
       this.executor = MoreExecutors.newDirectExecutorService();
     }
+    this.future = executeRequest();
+  }
+
+  private Future<Result> executeRequest() {
+    Timer.Context timerContext;
+    if (StringUtils.isNotEmpty(getMetricAlias())) {
+      Timer timer = Metrics.getOrCreateTimer(MetricsConstants.API_PREFIX + getMetricAlias());
+      timerContext = timer != null ? timer.time() : null;
+    } else {
+      timerContext = null;
+    }
 
-    this.future =  executor.submit(() -> {
+    Future<Result> resultFuture = executor.submit(() -> {
       A resultV = null;
       beforeExecute();
       try {
@@ -145,7 +146,7 @@ private AbstractOperationHandler(String id) {
       } finally {
         try {
           if (async) {
-            OPID_CLEANER.schedule(() -> OPID_TO_HANDLER.remove(id), 1, TimeUnit.HOURS);
+            REQUEST_CLEANER.schedule(() -> ID_TO_HANDLER.remove(id), 1, TimeUnit.HOURS);
           }
           afterExecute(resultV);
         } finally {
@@ -156,26 +157,27 @@ private AbstractOperationHandler(String id) {
       }
       return async ? resultV.shrinkIfNecessary() : resultV;
     });
-    this.executor.shutdown();
+    executor.shutdown();
+    return resultFuture;
   }
 
-  private static <T extends TBase, A extends Result> AbstractOperationHandler<T, A>
-      ofCache(String opId, boolean shouldCancel) throws TException {
-    AbstractOperationHandler<T, A> opHandler = null;
-    if (opId != null) {
-      opHandler = OPID_TO_HANDLER.get(opId);
+  private static <T extends TBase, A extends Result> AbstractRequestHandler<T, A>
+      ofCache(String reqId, boolean shouldCancel) throws TException {
+    AbstractRequestHandler<T, A> opHandler = null;
+    if (reqId != null) {
+      opHandler = ID_TO_HANDLER.get(reqId);
       if (opHandler == null && !shouldCancel) {
-        throw new MetaException("Couldn't find the async operation handler: " 
+ opId);
+        throw new MetaException("Couldn't find the async request handler: " + 
reqId);
       }
       if (shouldCancel) {
         if (opHandler != null) {
-          opHandler.cancelOperation();
+          opHandler.cancelRequest();
         } else {
-          opHandler = new AbstractOperationHandler<>(opId) {
+          opHandler = new AbstractRequestHandler<>(reqId) {
             @Override
-            public OperationStatus getOperationStatus() throws TException {
-              OperationStatus resp = new OperationStatus(opId);
-              resp.setMessage("Operation has been canceled");
+            public RequestStatus getRequestStatus() throws TException {
+              RequestStatus resp = new RequestStatus(reqId);
+              resp.setMessage("Request has been canceled");
               resp.setFinished(true);
               return resp;
             }
@@ -188,7 +190,7 @@ public String getMessagePrefix() {
               throw new UnsupportedOperationException();
             }
             @Override
-            public String getProgress() {
+            public String getRequestProgress() {
               throw new UnsupportedOperationException();
             }
           };
@@ -198,27 +200,33 @@ public String getProgress() {
     return opHandler;
   }
 
-  public static <T extends AbstractOperationHandler> T offer(IHMSHandler handler, TBase req)
+  public static <T extends AbstractRequestHandler> T offer(IHMSHandler handler, TBase req)
       throws TException, IOException {
     HandlerFactory factory = REQ_FACTORIES.get(req.getClass());
     if (factory != null) {
-      return (T) factory.create(handler, req);
+      try {
+        return (T) factory.create(handler, req);
+      } catch (Exception e) {
+        throw handleException(e)
+            .throwIfInstance(TException.class, IOException.class)
+            .defaultTException();
+      }
     }
     throw new UnsupportedOperationException("Not yet implemented");
   }
 
-  public OperationStatus getOperationStatus() throws TException {
+  public RequestStatus getRequestStatus() throws TException {
     String logMsgPrefix = getMessagePrefix();
     if (future == null) {
       throw new IllegalStateException(logMsgPrefix + " hasn't started yet");
     }
 
-    OperationStatus resp = new OperationStatus(id);
+    RequestStatus resp = new RequestStatus(id);
     if (future.isDone()) {
       resp.setFinished(true);
       resp.setMessage(logMsgPrefix + (future.isCancelled() ? " Canceled" : " Done"));
     } else {
-      resp.setMessage(logMsgPrefix + " In-progress, state - " + getProgress());
+      resp.setMessage(logMsgPrefix + " In-progress, state - " + getRequestProgress());
     }
     
     try {
@@ -241,43 +249,28 @@ public OperationStatus getOperationStatus() throws TException {
     return resp;
   }
 
-  public static class OperationStatus {
-    private final String id;
-    private String message;
-    private boolean finished;
-    OperationStatus(String id) {
+  public static class RequestStatus {
+    final String id;
+    String message;
+    boolean finished;
+    RequestStatus(String id) {
       this.id = id;
     }
-
-    public String getMessage() {
-      return message;
-    }
-
     public void setMessage(String message) {
       this.message = message;
     }
-
-    public String getId() {
-      return id;
-    }
-
-    public boolean isFinished() {
-      return finished;
-    }
-
     public void setFinished(boolean finished) {
       this.finished = finished;
     }
-
     public AsyncOperationResp toAsyncOperationResp() {
-      AsyncOperationResp resp = new AsyncOperationResp(getId());
-      resp.setFinished(isFinished());
-      resp.setMessage(getMessage());
+      AsyncOperationResp resp = new AsyncOperationResp(id);
+      resp.setFinished(finished);
+      resp.setMessage(message);
       return resp;
     }
   }
 
-  public void cancelOperation() {
+  public void cancelRequest() {
     if (!future.isDone()) {
       future.cancel(true);
       aborted.set(true);
@@ -293,8 +286,8 @@ public void cancelOperation() {
    * @throws TException exception while checking the status of the operation
    */
   public final A getResult() throws TException {
-    OperationStatus resp = getOperationStatus();
-    if (!resp.isFinished()) {
+    RequestStatus resp = getRequestStatus();
+    if (!resp.finished) {
       throw new IllegalStateException("Result is un-available as " +
           getMessagePrefix() + " is still running");
     }
@@ -303,8 +296,7 @@ public final A getResult() throws TException {
 
   /**
    * Method invoked prior to executing the given operation.
-   * This method may be used to initialize and validate the operation and
-   * executed at the same thread as the caller.
+   * This method may be used to initialize and validate the operation.
    * @throws TException
    */
   protected void beforeExecute() throws TException, IOException {
@@ -319,6 +311,15 @@ protected void beforeExecute() throws TException, IOException {
    */
   protected abstract A execute() throws TException, IOException;
 
+  /**
+   * Method invoked after the operation is done.
+   * Can be used to free the resources this handler holds.
+   */
+  protected void afterExecute(A result) throws TException, IOException {
+    handler = null;
+    request = null;
+  }
+
   /**
    * Get the prefix for logging the message on polling the operation status.
    *
@@ -331,19 +332,20 @@ protected void beforeExecute() throws TException, IOException {
    *
    * @return the progress
    */
-  protected abstract String getProgress();
+  protected abstract String getRequestProgress();
 
   public boolean success() throws TException {
-    OperationStatus status = getOperationStatus();
-    return status.isFinished() && result != null && result.success();
+    RequestStatus status = getRequestStatus();
+    return status.finished && result != null && result.success();
   }
 
   /**
    * Get the alias of this handler for metrics.
-   * @return the alias, null if no need to measure the operation.
+   * @return the alias, null or empty if no need to measure the operation.
    */
-  protected String getHandlerAlias() {
-    return null;
+  private String getMetricAlias() {
+    RequestHandler rh = getClass().getAnnotation(RequestHandler.class);
+    return rh != null ? rh.metricAlias() : null;
   }
 
   public void checkInterrupted() throws MetaException {
@@ -352,22 +354,13 @@ public void checkInterrupted() throws MetaException {
     }
   }
 
-  /**
-   * Method after the operation is done.
-   * Can be used to free the resources this handler holds
-   */
-  protected void afterExecute(A result) throws MetaException, IOException {
-    handler = null;
-    request = null;
-  }
-
   @VisibleForTesting
-  public static boolean containsOp(String opId) {
-    return OPID_TO_HANDLER.containsKey(opId);
+  public static boolean containsRequest(String reqId) {
+    return ID_TO_HANDLER.containsKey(reqId);
   }
 
-  public interface HandlerFactory {
-    AbstractOperationHandler create(IHMSHandler base, TBase req) throws TException, IOException;
+  interface HandlerFactory {
+    AbstractRequestHandler create(IHMSHandler base, TBase req) throws Exception;
   }
 
   public interface Result {
@@ -386,4 +379,31 @@ default Result shrinkIfNecessary() {
       return this;
     }
   }
+
+  private static boolean validateHandler(Class<? extends AbstractRequestHandler> clz) {
+    if (Modifier.isAbstract(clz.getModifiers())) {
+      return false;
+    }
+    RequestHandler handler = clz.getAnnotation(RequestHandler.class);
+    if (handler == null) {
+      LOG.info("Ignore this Handler: {} as it cannot handle the request", clz);
+      return false;
+    }
+    Class<? extends TBase> requestBody = handler.requestBody();
+    if (requestBody == null) {
+      LOG.info("Ignore this Handler: {} as it hasn't declared the request 
body", clz);
+      return false;
+    }
+    // check the definition of the handler
+    try {
+      clz.getDeclaredConstructor(IHMSHandler.class, requestBody);
+      if (handler.supportAsync()) {
+        MethodUtils.getMatchingAccessibleMethod(requestBody, handler.id());
+        MethodUtils.getMatchingAccessibleMethod(requestBody, handler.cancel());
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(clz + " is not a satisfied handler as it's declared to be", e);
+    }
+    return true;
+  }
 }
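
Call sites in HMSHandler now drive both synchronous and asynchronous requests
through the same offer() entry point; condensed from the hunks above:

    // Synchronous use (create_table_req, truncate_table_req): offer() runs the
    // handler on a direct executor and success() blocks on the future.
    CreateTableHandler createTableOp = AbstractRequestHandler.offer(this, req);
    boolean success = createTableOp.success();

    // Asynchronous use (drop_table_req, drop_database_req): the handler is
    // cached under its generated id so the client can poll, and re-sending the
    // same id with the cancel flag set aborts it through ofCache().
    DropTableHandler dropTableOp = AbstractRequestHandler.offer(this, dropReq);
    return dropTableOp.getRequestStatus().toAsyncOperationResp();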
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/handler/AddPartitionsHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/handler/AddPartitionsHandler.java
index b1da7b026f6..4302c5e84b1 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/handler/AddPartitionsHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/handler/AddPartitionsHandler.java
@@ -76,8 +76,9 @@
 import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog;
 import static org.apache.hadoop.hive.metastore.utils.StringUtils.normalizeIdentifier;
 
+@RequestHandler(requestBody = AddPartitionsRequest.class)
 public class AddPartitionsHandler
-    extends AbstractOperationHandler<AddPartitionsRequest, AddPartitionsHandler.AddPartitionsResult> {
+    extends AbstractRequestHandler<AddPartitionsRequest, AddPartitionsHandler.AddPartitionsResult> {
   private TableName tableName;
   private Table table;
   private Database db;
@@ -541,7 +542,7 @@ protected String getMessagePrefix() {
   }
 
   @Override
-  protected String getProgress() {
+  protected String getRequestProgress() {
     return "Adding partitions";
   }
 
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/handler/CreateTableHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/handler/CreateTableHandler.java
new file mode 100644
index 00000000000..35438f3b466
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/handler/CreateTableHandler.java
@@ -0,0 +1,456 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.handler;
+
+import java.io.IOException;
+import java.util.BitSet;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.TableName;
+import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
+import org.apache.hadoop.hive.metastore.HMSHandler;
+import org.apache.hadoop.hive.metastore.IHMSHandler;
+import org.apache.hadoop.hive.metastore.IMetaStoreMetadataTransformer;
+import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
+import org.apache.hadoop.hive.metastore.MetaStoreListenerNotifier;
+import org.apache.hadoop.hive.metastore.RawStore;
+import org.apache.hadoop.hive.metastore.ReplChangeManager;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.TransactionalMetaStoreEventListener;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.CreateTableRequest;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
+import org.apache.hadoop.hive.metastore.api.InvalidInputException;
+import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.SQLAllTableConstraints;
+import org.apache.hadoop.hive.metastore.api.SkewedInfo;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.metastore.events.AddCheckConstraintEvent;
+import org.apache.hadoop.hive.metastore.events.AddDefaultConstraintEvent;
+import org.apache.hadoop.hive.metastore.events.AddForeignKeyEvent;
+import org.apache.hadoop.hive.metastore.events.AddNotNullConstraintEvent;
+import org.apache.hadoop.hive.metastore.events.AddPrimaryKeyEvent;
+import org.apache.hadoop.hive.metastore.events.AddUniqueConstraintEvent;
+import org.apache.hadoop.hive.metastore.events.CreateTableEvent;
+import org.apache.hadoop.hive.metastore.events.PreCreateTableEvent;
+import org.apache.hadoop.hive.metastore.events.UpdateTableColumnStatEvent;
+import org.apache.hadoop.hive.metastore.messaging.EventMessage;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.hive.common.AcidConstants.DELTA_DIGITS;
+import static org.apache.hadoop.hive.common.AcidConstants.SOFT_DELETE_PATH_SUFFIX;
+import static org.apache.hadoop.hive.common.AcidConstants.SOFT_DELETE_TABLE;
+import static org.apache.hadoop.hive.common.AcidConstants.SOFT_DELETE_TABLE_PATTERN;
+import static org.apache.hadoop.hive.metastore.Warehouse.getCatalogQualifiedTableName;
+import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.CTAS_LEGACY_CONFIG;
+import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.TABLE_IS_CTAS;
+import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.TABLE_IS_CTLT;
+import static org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.isDbReplicationTarget;
+import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog;
+import static org.apache.hadoop.hive.metastore.utils.StringUtils.normalizeIdentifier;
+
+@RequestHandler(requestBody = CreateTableRequest.class)
+public class CreateTableHandler
+    extends AbstractRequestHandler<CreateTableRequest, CreateTableHandler.CreateTableResult> {
+  private static final Logger LOG = LoggerFactory.getLogger(CreateTableHandler.class);
+  private SQLAllTableConstraints constraints;
+  private RawStore rs;
+  private Table tbl;
+  private Database db;
+  private Warehouse wh;
+  private ColumnStatistics colStats;
+  private EnvironmentContext envContext;
+
+  CreateTableHandler(IHMSHandler handler, CreateTableRequest request) {
+    super(handler, false, request);
+  }
+
+  @Override
+  protected CreateTableResult execute() throws TException, IOException {
+    Map<String, String> transactionalListenerResponses = Collections.emptyMap();
+    Path tblPath = null;
+    boolean success = false, madeDir = false;
+    boolean isReplicated;
+    try {
+      rs.openTransaction();
+      db = rs.getDatabase(tbl.getCatName(), tbl.getDbName());
+      isReplicated = isDbReplicationTarget(db);
+
+      ((HMSHandler)handler).firePreEvent(new PreCreateTableEvent(tbl, db, handler));
+
+      if (!TableType.VIRTUAL_VIEW.toString().equals(tbl.getTableType())) {
+        if (tbl.getSd().getLocation() == null
+            || tbl.getSd().getLocation().isEmpty()) {
+          tblPath = wh.getDefaultTablePath(db, tbl.getTableName() + getTableSuffix(tbl),
+              MetaStoreUtils.isExternalTable(tbl));
+        } else {
+          if (!MetaStoreUtils.isExternalTable(tbl) && !MetaStoreUtils.isNonNativeTable(tbl)) {
+            LOG.warn("Location: " + tbl.getSd().getLocation()
+                + " specified for non-external table: " + tbl.getTableName());
+          }
+          tblPath = wh.getDnsPath(new Path(tbl.getSd().getLocation()));
+          // ignore suffix if it's already there (direct-write CTAS)
+          if (!tblPath.getName().matches("(.*)" + SOFT_DELETE_TABLE_PATTERN)) {
+            tblPath = new Path(tblPath + getTableSuffix(tbl));
+          }
+        }
+        tbl.getSd().setLocation(tblPath.toString());
+      }
+
+      if (tblPath != null) {
+        if (!wh.isDir(tblPath)) {
+          if (!wh.mkdirs(tblPath)) {
+            throw new MetaException(tblPath + " is not a directory or could not be created");
+          }
+          madeDir = true;
+        }
+      }
+
+      MetaStoreServerUtils.updateTableStatsForCreateTable(wh, db, tbl,
+          request.getEnvContext(), handler.getConf(), tblPath, madeDir);
+
+      // set create time
+      long time = System.currentTimeMillis() / 1000;
+      tbl.setCreateTime((int) time);
+      if (tbl.getParameters() == null ||
+          tbl.getParameters().get(hive_metastoreConstants.DDL_TIME) == null) {
+        tbl.putToParameters(hive_metastoreConstants.DDL_TIME, Long.toString(time));
+      }
+      if (CollectionUtils.isEmpty(constraints.getPrimaryKeys())
+          && CollectionUtils.isEmpty(constraints.getForeignKeys())
+          && CollectionUtils.isEmpty(constraints.getUniqueConstraints())
+          && CollectionUtils.isEmpty(constraints.getNotNullConstraints())
+          && CollectionUtils.isEmpty(constraints.getDefaultConstraints())
+          && CollectionUtils.isEmpty(constraints.getCheckConstraints())) {
+        rs.createTable(tbl);
+      } else {
+        // Set constraint name if null before sending to listener
+        constraints = rs.createTableWithConstraints(tbl, constraints);
+      }
+
+      List<TransactionalMetaStoreEventListener> transactionalListeners = handler.getTransactionalListeners();
+      if (!transactionalListeners.isEmpty()) {
+        transactionalListenerResponses = MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+            EventMessage.EventType.CREATE_TABLE, new CreateTableEvent(tbl, true, handler, isReplicated), envContext);
+        if (CollectionUtils.isNotEmpty(constraints.getPrimaryKeys())) {
+          MetaStoreListenerNotifier.notifyEvent(transactionalListeners, EventMessage.EventType.ADD_PRIMARYKEY,
+              new AddPrimaryKeyEvent(constraints.getPrimaryKeys(), true, handler), envContext);
+        }
+        if (CollectionUtils.isNotEmpty(constraints.getForeignKeys())) {
+          MetaStoreListenerNotifier.notifyEvent(transactionalListeners, EventMessage.EventType.ADD_FOREIGNKEY,
+              new AddForeignKeyEvent(constraints.getForeignKeys(), true, handler), envContext);
+        }
+        if (CollectionUtils.isNotEmpty(constraints.getUniqueConstraints())) {
+          MetaStoreListenerNotifier.notifyEvent(transactionalListeners, EventMessage.EventType.ADD_UNIQUECONSTRAINT,
+              new AddUniqueConstraintEvent(constraints.getUniqueConstraints(), true, handler), envContext);
+        }
+        if (CollectionUtils.isNotEmpty(constraints.getNotNullConstraints())) {
+          MetaStoreListenerNotifier.notifyEvent(transactionalListeners, EventMessage.EventType.ADD_NOTNULLCONSTRAINT,
+              new AddNotNullConstraintEvent(constraints.getNotNullConstraints(), true, handler), envContext);
+        }
+        if (CollectionUtils.isNotEmpty(constraints.getCheckConstraints())) {
+          MetaStoreListenerNotifier.notifyEvent(transactionalListeners, EventMessage.EventType.ADD_CHECKCONSTRAINT,
+              new AddCheckConstraintEvent(constraints.getCheckConstraints(), true, handler), envContext);
+        }
+        if (CollectionUtils.isNotEmpty(constraints.getDefaultConstraints())) {
+          MetaStoreListenerNotifier.notifyEvent(transactionalListeners, EventMessage.EventType.ADD_DEFAULTCONSTRAINT,
+              new AddDefaultConstraintEvent(constraints.getDefaultConstraints(), true, handler), envContext);
+        }
+      }
+      success = rs.commitTransaction();
+    } finally {
+      if (!success) {
+        rs.rollbackTransaction();
+        if (madeDir) {
+          wh.deleteDir(tblPath, false, ReplChangeManager.shouldEnableCm(db, tbl));
+        }
+      }
+    }
+    return new CreateTableResult(success, transactionalListenerResponses);
+  }
+
+  private String getTableSuffix(Table tbl) {
+    return tbl.isSetTxnId() && tbl.getParameters() != null
+        && Boolean.parseBoolean(tbl.getParameters().get(SOFT_DELETE_TABLE)) ?
+        SOFT_DELETE_PATH_SUFFIX + String.format(DELTA_DIGITS, tbl.getTxnId()) : "";
+  }
+
+  @Override
+  protected void afterExecute(CreateTableResult result) throws TException, IOException {
+    boolean success = result != null && result.success;
+    List<MetaStoreEventListener> listeners = handler.getListeners();
+    if (!listeners.isEmpty()) {
+      boolean isReplicated = isDbReplicationTarget(db);
+      MetaStoreListenerNotifier.notifyEvent(listeners, EventMessage.EventType.CREATE_TABLE,
+          new CreateTableEvent(tbl, success, handler, isReplicated), envContext,
+          result != null ? result.transactionalListenerResponses : Collections.emptyMap(), rs);
+      if (CollectionUtils.isNotEmpty(constraints.getPrimaryKeys())) {
+        MetaStoreListenerNotifier.notifyEvent(listeners, EventMessage.EventType.ADD_PRIMARYKEY,
+            new AddPrimaryKeyEvent(constraints.getPrimaryKeys(), success, handler), envContext);
+      }
+      if (CollectionUtils.isNotEmpty(constraints.getForeignKeys())) {
+        MetaStoreListenerNotifier.notifyEvent(listeners, EventMessage.EventType.ADD_FOREIGNKEY,
+            new AddForeignKeyEvent(constraints.getForeignKeys(), success, handler), envContext);
+      }
+      if (CollectionUtils.isNotEmpty(constraints.getUniqueConstraints())) {
+        MetaStoreListenerNotifier.notifyEvent(listeners, EventMessage.EventType.ADD_UNIQUECONSTRAINT,
+            new AddUniqueConstraintEvent(constraints.getUniqueConstraints(), success, handler), envContext);
+      }
+      if (CollectionUtils.isNotEmpty(constraints.getNotNullConstraints())) {
+        MetaStoreListenerNotifier.notifyEvent(listeners, EventMessage.EventType.ADD_NOTNULLCONSTRAINT,
+            new AddNotNullConstraintEvent(constraints.getNotNullConstraints(), success, handler), envContext);
+      }
+      if (CollectionUtils.isNotEmpty(constraints.getDefaultConstraints())) {
+        MetaStoreListenerNotifier.notifyEvent(listeners, EventMessage.EventType.ADD_DEFAULTCONSTRAINT,
+            new AddDefaultConstraintEvent(constraints.getDefaultConstraints(), success, handler), envContext);
+      }
+      if (CollectionUtils.isNotEmpty(constraints.getCheckConstraints())) {
+        MetaStoreListenerNotifier.notifyEvent(listeners, EventMessage.EventType.ADD_CHECKCONSTRAINT,
+            new AddCheckConstraintEvent(constraints.getCheckConstraints(), success, handler), envContext);
+      }
+    }
+
+    if (success && colStats != null) {
+      // If the table has column statistics, update it into the metastore. We need a valid
+      // writeId list to update column statistics for a transactional table. But during bootstrap
+      // replication, where we use this feature, we do not have a valid writeId list which was
+      // used to update the stats. But we know for sure that the writeId associated with the
+      // stats was valid then (otherwise stats update would have failed on the source). So, craft
+      // a valid transaction list with only that writeId and use it to update the stats.
+      long writeId = tbl.getWriteId();
+      String validWriteIds = null;
+      if (writeId > 0) {
+        ValidWriteIdList validWriteIdList =
+            new ValidReaderWriteIdList(TableName.getDbTable(tbl.getDbName(),
+                tbl.getTableName()),
+                new long[0], new BitSet(), writeId);
+        validWriteIds = validWriteIdList.toString();
+      }
+      updateTableColumnStatsInternal(colStats, validWriteIds, tbl.getWriteId());
+    }
+  }
+
+  private boolean updateTableColumnStatsInternal(ColumnStatistics colStats,
+      String validWriteIds, long writeId)
+      throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException {
+    normalizeColStatsInput(colStats);
+    Map<String, String> parameters = null;
+    rs.openTransaction();
+    boolean committed = false;
+    try {
+      parameters = rs.updateTableColumnStatistics(colStats, validWriteIds, writeId);
+      if (parameters != null) {
+        Table tableObj = rs.getTable(colStats.getStatsDesc().getCatName(),
+            colStats.getStatsDesc().getDbName(),
+            colStats.getStatsDesc().getTableName(), validWriteIds);
+        if (!handler.getTransactionalListeners().isEmpty()) {
+          MetaStoreListenerNotifier.notifyEvent(handler.getTransactionalListeners(),
+              EventMessage.EventType.UPDATE_TABLE_COLUMN_STAT,
+              new UpdateTableColumnStatEvent(colStats, tableObj, parameters,
+                  writeId, handler));
+        }
+        if (!handler.getListeners().isEmpty()) {
+          MetaStoreListenerNotifier.notifyEvent(handler.getListeners(),
+              EventMessage.EventType.UPDATE_TABLE_COLUMN_STAT,
+              new UpdateTableColumnStatEvent(colStats, tableObj, parameters,
+                  writeId, handler));
+        }
+      }
+      committed = rs.commitTransaction();
+    } finally {
+      if (!committed) {
+        rs.rollbackTransaction();
+      }
+    }
+
+    return parameters != null;
+  }
+
+  private void normalizeColStatsInput(ColumnStatistics colStats) throws MetaException {
+    // TODO: is this really needed? this code is propagated from HIVE-1362 but most of it is useless.
+    ColumnStatisticsDesc statsDesc = colStats.getStatsDesc();
+    statsDesc.setCatName(statsDesc.isSetCatName() ? statsDesc.getCatName().toLowerCase() :
+        getDefaultCatalog(handler.getConf()));
+    statsDesc.setDbName(statsDesc.getDbName().toLowerCase());
+    statsDesc.setTableName(statsDesc.getTableName().toLowerCase());
+    statsDesc.setPartName(statsDesc.getPartName());
+    long time = System.currentTimeMillis() / 1000;
+    statsDesc.setLastAnalyzed(time);
+
+    for (ColumnStatisticsObj statsObj : colStats.getStatsObj()) {
+      statsObj.setColName(statsObj.getColName().toLowerCase());
+      statsObj.setColType(statsObj.getColType().toLowerCase());
+    }
+    colStats.setStatsDesc(statsDesc);
+    colStats.setStatsObj(colStats.getStatsObj());
+  }
+
+  @Override
+  protected void beforeExecute() throws TException, IOException {
+    this.tbl = request.getTable();
+    this.rs = handler.getMS();
+    this.wh = handler.getWh();
+    this.envContext = request.getEnvContext();
+    // To preserve backward compatibility, throw MetaException when the database name is null
+    if (tbl.getDbName() == null) {
+      throw new MetaException("Null database name is not allowed");
+    }
+    if (!MetaStoreUtils.validateName(tbl.getTableName(), handler.getConf())) {
+      throw new InvalidObjectException(tbl.getTableName()
+          + " is not a valid object name");
+    }
+    if (!MetaStoreUtils.validateTblStorage(tbl.getSd())) {
+      throw new InvalidObjectException(tbl.getTableName()
+          + " location must not be root path");
+    }
+    if (!tbl.isSetCatName()) {
+      tbl.setCatName(getDefaultCatalog(handler.getConf()));
+    }
+
+    this.db = rs.getDatabase(tbl.getCatName(), tbl.getDbName());
+    if (MetaStoreUtils.isDatabaseRemote(db)) {
+      // HIVE-24425: Create table in REMOTE db should fail
+      throw new MetaException("Create table in REMOTE database " + 
db.getName() + " is not allowed");
+    }
+
+    if (rs.getTable(tbl.getCatName(), tbl.getDbName(), tbl.getTableName()) != null) {
+      throw new AlreadyExistsException("Table " + getCatalogQualifiedTableName(tbl)
+          + " already exists");
+    }
+
+    tbl.setDbName(normalizeIdentifier(tbl.getDbName()));
+    tbl.setTableName(normalizeIdentifier(tbl.getTableName()));
+    IMetaStoreMetadataTransformer transformer = handler.getMetadataTransformer();
+    if (transformer != null) {
+      tbl = transformer.transformCreateTable(tbl,
+          request.getProcessorCapabilities(), request.getProcessorIdentifier());
+    }
+
+    Map<String, String> params = tbl.getParameters();
+    if (params != null) {
+      params.remove(TABLE_IS_CTAS);
+      params.remove(TABLE_IS_CTLT);
+      if (MetaStoreServerUtils.getBooleanEnvProp(request.getEnvContext(), CTAS_LEGACY_CONFIG) &&
+          TableType.MANAGED_TABLE.toString().equals(tbl.getTableType())) {
+        params.put("EXTERNAL", "TRUE");
+        tbl.setTableType(TableType.EXTERNAL_TABLE.toString());
+      }
+    }
+
+    String validate = MetaStoreServerUtils.validateTblColumns(tbl.getSd().getCols());
+    if (validate != null) {
+      throw new InvalidObjectException("Invalid column " + validate);
+    }
+    if (tbl.getPartitionKeys() != null) {
+      validate = MetaStoreServerUtils.validateTblColumns(tbl.getPartitionKeys());
+      if (validate != null) {
+        throw new InvalidObjectException("Invalid partition column " + validate);
+      }
+    }
+    if (tbl.isSetId()) {
+      LOG.debug("Id shouldn't be set but table {}.{} has the Id set to {}. Id 
is ignored.", tbl.getDbName(),
+          tbl.getTableName(), tbl.getId());
+      tbl.unsetId();
+    }
+    SkewedInfo skew = tbl.getSd().getSkewedInfo();
+    if (skew != null) {
+      validate = MetaStoreServerUtils.validateSkewedColNames(skew.getSkewedColNames());
+      if (validate != null) {
+        throw new InvalidObjectException("Invalid skew column " + validate);
+      }
+      validate = MetaStoreServerUtils.validateSkewedColNamesSubsetCol(
+          skew.getSkewedColNames(), tbl.getSd().getCols());
+      if (validate != null) {
+        throw new InvalidObjectException("Invalid skew column " + validate);
+      }
+    }
+
+    colStats = tbl.getColStats();
+    tbl.unsetColStats();
+
+    constraints = new SQLAllTableConstraints();
+    constraints.setPrimaryKeys(request.getPrimaryKeys());
+    constraints.setForeignKeys(request.getForeignKeys());
+    constraints.setUniqueConstraints(request.getUniqueConstraints());
+    constraints.setDefaultConstraints(request.getDefaultConstraints());
+    constraints.setCheckConstraints(request.getCheckConstraints());
+    constraints.setNotNullConstraints(request.getNotNullConstraints());
+
+    String catName = tbl.getCatName();
+    // Check that constraints have catalog name properly set first
+    if (CollectionUtils.isNotEmpty(constraints.getPrimaryKeys())
+        && !constraints.getPrimaryKeys().get(0).isSetCatName()) {
+      constraints.getPrimaryKeys().forEach(constraint -> constraint.setCatName(catName));
+    }
+    if (CollectionUtils.isNotEmpty(constraints.getForeignKeys())
+        && !constraints.getForeignKeys().get(0).isSetCatName()) {
+      constraints.getForeignKeys().forEach(constraint -> constraint.setCatName(catName));
+    }
+    if (CollectionUtils.isNotEmpty(constraints.getUniqueConstraints())
+        && !constraints.getUniqueConstraints().get(0).isSetCatName()) {
+      constraints.getUniqueConstraints().forEach(constraint -> constraint.setCatName(catName));
+    }
+    if (CollectionUtils.isNotEmpty(constraints.getNotNullConstraints())
+        && !constraints.getNotNullConstraints().get(0).isSetCatName()) {
+      constraints.getNotNullConstraints().forEach(constraint -> constraint.setCatName(catName));
+    }
+    if (CollectionUtils.isNotEmpty(constraints.getDefaultConstraints())
+        && !constraints.getDefaultConstraints().get(0).isSetCatName()) {
+      constraints.getDefaultConstraints().forEach(constraint -> constraint.setCatName(catName));
+    }
+    if (CollectionUtils.isNotEmpty(constraints.getCheckConstraints())
+        && !constraints.getCheckConstraints().get(0).isSetCatName()) {
+      constraints.getCheckConstraints().forEach(constraint -> constraint.setCatName(catName));
+    }
+  }
+
+  @Override
+  protected String getMessagePrefix() {
+    return "CreateTableHandler [" + id + "] -  create table for " +
+        TableName.getQualified(tbl.getCatName(), tbl.getDbName(), 
tbl.getTableName()) + ":";
+  }
+
+  @Override
+  protected String getRequestProgress() {
+    return "Creating table";
+  }
+
+  public record CreateTableResult(boolean success, Map<String, String> transactionalListenerResponses)
+      implements Result {
+
+  }
+}
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/handler/DropDatabaseHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/handler/DropDatabaseHandler.java
index d4845833a87..fb372c8b8de 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/handler/DropDatabaseHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/handler/DropDatabaseHandler.java
@@ -67,8 +67,9 @@
 import static org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.isDbReplicationTarget;
 import static org.apache.hadoop.hive.metastore.utils.StringUtils.normalizeIdentifier;
 
+@RequestHandler(requestBody = DropDatabaseRequest.class, supportAsync = true, metricAlias = "drop_database_req")
 public class DropDatabaseHandler
-    extends AbstractOperationHandler<DropDatabaseRequest, DropDatabaseHandler.DropDatabaseResult> {
+    extends AbstractRequestHandler<DropDatabaseRequest, DropDatabaseHandler.DropDatabaseResult> {
 
   private static final Logger LOG = LoggerFactory.getLogger(DropDatabaseHandler.class);
   private String name;
@@ -80,6 +81,7 @@ public class DropDatabaseHandler
   private List<String> packages;
   private AtomicReference<String> progress;
   private DropDatabaseResult result;
+  private RawStore rs;
 
   DropDatabaseHandler(IHMSHandler handler, DropDatabaseRequest request) {
     super(handler, request.isAsyncDrop(), request);
@@ -88,11 +90,10 @@ public class DropDatabaseHandler
   public DropDatabaseResult execute() throws TException, IOException {
     boolean success = false;
     Map<String, String> transactionalListenerResponses = Collections.emptyMap();
-    RawStore rs = handler.getMS();
     rs.openTransaction();
     try {
       if (MetaStoreUtils.isDatabaseRemote(db)) {
-        if (rs.dropDatabase(db.getCatalogName(), db.getName())) {
+        if (rs.dropDatabase(catalogName, name)) {
           success = rs.commitTransaction();
         }
         return result;
@@ -143,7 +144,7 @@ public DropDatabaseResult execute() throws TException, IOException {
         dropRequest.setDeleteData(false);
         dropRequest.setDropPartitions(true);
         dropRequest.setAsyncDrop(false);
-        DropTableHandler dropTable = AbstractOperationHandler.offer(handler, dropRequest);
+        DropTableHandler dropTable = AbstractRequestHandler.offer(handler, dropRequest);
         if (tableDataShouldBeDeleted
             && dropTable.success()) {
           DropTableHandler.DropTableResult dropTableResult = dropTable.getResult();
@@ -153,6 +154,7 @@ public DropDatabaseResult execute() throws TException, IOException {
         }
       }
 
+      progress.set("Dropping the database");
       if (rs.dropDatabase(catalogName, name)) {
         if (!handler.getTransactionalListeners().isEmpty()) {
           checkInterrupted();
@@ -195,7 +197,7 @@ protected void beforeExecute() throws TException, IOException {
     this.catalogName = normalizeIdentifier(
         request.isSetCatalogName() ? request.getCatalogName() : MetaStoreUtils.getDefaultCatalog(handler.getConf()));
 
-    RawStore rs = handler.getMS();
+    this.rs = handler.getMS();
     db = rs.getDatabase(catalogName, name);
     if (!MetastoreConf.getBoolVar(handler.getConf(), HIVE_IN_TEST) && ReplChangeManager.isSourceOfReplication(db)) {
       throw new InvalidOperationException("cannot drop a database which is a source of replication");
@@ -342,7 +344,7 @@ protected String getMessagePrefix() {
   }
 
   @Override
-  protected String getProgress() {
+  protected String getRequestProgress() {
     if (progress == null) {
       return getMessagePrefix() + " hasn't started yet";
     }
@@ -405,12 +407,7 @@ public Result shrinkIfNecessary() {
   }
 
   @Override
-  protected String getHandlerAlias() {
-    return "drop_database_req";
-  }
-
-  @Override
-  protected void afterExecute(DropDatabaseResult result) throws MetaException, IOException {
+  protected void afterExecute(DropDatabaseResult result) throws TException, IOException {
     try {
       Warehouse wh = handler.getWh();
       if (result != null && result.success()) {
@@ -418,6 +415,9 @@ protected void afterExecute(DropDatabaseResult result) throws MetaException, IOE
           wh.addToChangeManagement(funcCmPath);
         }
         if (request.isDeleteData()) {
+          progress.set(String.format("Deleting %d partition paths and %d table 
paths from the database",
+              result.getPartitionPaths() != null ? 
result.getPartitionPaths().size() : 0,
+              result.getTablePaths().size()));
           Database db = result.getDatabase();
           // Delete the data in the partitions which have other locations
           List<Path> pathsToDelete = new ArrayList<>();
@@ -457,12 +457,16 @@ protected void afterExecute(DropDatabaseResult result) throws MetaException, IOE
         }
       }
     } finally {
+      super.afterExecute(result);
+      if (async) {
+        rs.shutdown();
+      }
+      rs = null;
       tables = null;
       functions = null;
       procedures = null;
       packages = null;
       db = null;
-      super.afterExecute(result);
     }
   }
 }
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/handler/DropPartitionsHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/handler/DropPartitionsHandler.java
index 5e6bad3db37..823601dd7ca 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/handler/DropPartitionsHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/handler/DropPartitionsHandler.java
@@ -60,15 +60,16 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.hadoop.hive.metastore.HMSHandler.addTruncateBaseFile;
+import static org.apache.hadoop.hive.metastore.handler.TruncateTableHandler.addTruncateBaseFile;
 import static org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.checkTableDataShouldBeDeleted;
 import static org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.getWriteId;
 import static org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.isMustPurge;
 import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog;
 import static org.apache.hadoop.hive.metastore.utils.StringUtils.normalizeIdentifier;
 
+@RequestHandler(requestBody = DropPartitionsRequest.class)
 public class DropPartitionsHandler
-    extends AbstractOperationHandler<DropPartitionsRequest, DropPartitionsHandler.DropPartitionsResult> {
+    extends AbstractRequestHandler<DropPartitionsRequest, DropPartitionsHandler.DropPartitionsResult> {
   private static final Logger LOG = LoggerFactory.getLogger(DropPartitionsHandler.class);
   private TableName tableName;
   private Table table;
@@ -225,7 +226,7 @@ private void verifyIsWritablePath(Path dir) throws MetaException {
   }
 
   @Override
-  protected void afterExecute(DropPartitionsResult result) throws MetaException, IOException {
+  protected void afterExecute(DropPartitionsResult result) throws TException, IOException {
     if (result != null && result.success()) {
       Warehouse wh = handler.getWh();
       long writeId = getWriteId(request.getEnvironmentContext());
@@ -290,7 +291,7 @@ protected String getMessagePrefix() {
   }
 
   @Override
-  protected String getProgress() {
+  protected String getRequestProgress() {
     return "Dropping partitions";
   }
 
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/handler/DropTableHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/handler/DropTableHandler.java
index 446e63b42d4..332ae04179b 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/handler/DropTableHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/handler/DropTableHandler.java
@@ -56,14 +56,16 @@
 import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog;
 import static org.apache.hadoop.hive.metastore.utils.StringUtils.normalizeIdentifier;
 
+@RequestHandler(requestBody = DropTableRequest.class, supportAsync = true, metricAlias = "drop_table_req")
 public class DropTableHandler
-    extends AbstractOperationHandler<DropTableRequest, DropTableHandler.DropTableResult> {
+    extends AbstractRequestHandler<DropTableRequest, DropTableHandler.DropTableResult> {
   private static final Logger LOG = LoggerFactory.getLogger(DropTableHandler.class);
   private Table tbl;
   private Path tblPath;
   private TableName tableName;
   private boolean tableDataShouldBeDeleted;
   private AtomicReference<String> progress;
+  private RawStore ms;
 
   DropTableHandler(IHMSHandler handler, DropTableRequest request) {
     super(handler, request.isAsyncDrop(), request);
@@ -73,9 +75,8 @@ public DropTableResult execute() throws TException {
     boolean success = false;
     List<Path> partPaths = null;
     Map<String, String> transactionalListenerResponses = Collections.emptyMap();
-    Database db = null;
+    Database db;
     boolean isReplicated = false;
-    RawStore ms = handler.getMS();
     try {
       ms.openTransaction();
       String catName = tableName.getCat();
@@ -145,6 +146,7 @@ public void beforeExecute() throws TException, IOException {
     String name = normalizeIdentifier(request.getTableName());
     String dbname = normalizeIdentifier(request.getDbName());
     tableName = new TableName(catName, dbname, name);
+    this.ms = handler.getMS();
+    progress = new AtomicReference<>("Starting to drop the table: " + tableName);
     GetTableRequest req = new GetTableRequest(tableName.getDb(), tableName.getTable());
     req.setCatName(tableName.getCat());
@@ -179,7 +181,7 @@ public String getMessagePrefix() {
   }
 
   @Override
-  public String getProgress() {
+  public String getRequestProgress() {
     if (progress == null) {
       return getMessagePrefix() + " hasn't started yet";
     }
@@ -187,7 +189,7 @@ public String getProgress() {
   }
 
   @Override
-  protected void afterExecute(DropTableResult result) throws MetaException, IOException {
+  protected void afterExecute(DropTableResult result) throws TException, IOException {
     try {
       if (result != null && result.success() &&
           result.tableDataShouldBeDeleted()) {
@@ -196,6 +198,8 @@ protected void afterExecute(DropTableResult result) throws MetaException, IOExce
         // Data needs deletion. Check if trash may be skipped.
         // Delete the data in the partitions which have other locations
         List<Path> pathsToDelete = new ArrayList<>();
+        progress.set(String.format("Deleting %d partition paths from the 
table",
+            result.partPaths != null ? result.partPaths.size() : 0));
         if (result.partPaths != null) {
           pathsToDelete.addAll(result.partPaths);
         }
@@ -206,6 +210,10 @@ protected void afterExecute(DropTableResult result) throws MetaException, IOExce
       }
     } finally {
       super.afterExecute(result);
+      if (async) {
+        ms.shutdown();
+      }
+      ms = null;
       tbl = null;
     }
   }
@@ -244,11 +252,6 @@ private void deleteDataExcludeCmroot(Path path, boolean ifPurge, boolean shouldE
     }
   }
 
-  @Override
-  protected String getHandlerAlias() {
-    return "drop_table_req";
-  }
-
   public record DropTableResult(Path tablePath,
                                 boolean success,
                                 boolean tableDataShouldBeDeleted,
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/handler/RequestHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/handler/RequestHandler.java
new file mode 100644
index 00000000000..c9fc082069e
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/handler/RequestHandler.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.handler;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.thrift.TBase;
+
[email protected]
+@Retention(RetentionPolicy.RUNTIME)
+public @interface RequestHandler {
+  Class<? extends TBase> requestBody();
+  boolean supportAsync() default false;
+  String id() default "getId";
+  String cancel() default "isCancel";
+  String metricAlias() default "";
+}
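
For reference, a minimal sketch of a handler satisfying the contract this annotation implies (and
that validateHandler enforces): a concrete class, a constructor taking (IHMSHandler, <requestBody>),
and, when supportAsync = true, request accessors matching the id/cancel defaults above. MyRequest,
MyResult, and isAsync() are hypothetical names, not part of this commit:

    // Hypothetical handler; MyRequest is assumed to be a thrift struct exposing
    // getId()/isCancel() (the annotation's default accessor names) and an isAsync() flag.
    @RequestHandler(requestBody = MyRequest.class, supportAsync = true, metricAlias = "my_req")
    public class MyHandler extends AbstractRequestHandler<MyRequest, MyHandler.MyResult> {

      MyHandler(IHMSHandler handler, MyRequest request) {
        super(handler, request.isAsync(), request);
      }

      @Override
      protected void beforeExecute() throws TException, IOException {
        // Validate and normalize the request before execute() runs.
      }

      @Override
      protected MyResult execute() throws TException, IOException {
        return new MyResult(true);
      }

      @Override
      protected String getMessagePrefix() {
        return "MyHandler [" + id + "] - handling request:";
      }

      @Override
      protected String getRequestProgress() {
        return "Handling request";
      }

      public record MyResult(boolean success) implements Result {
      }
    }
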
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/handler/TruncateTableHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/handler/TruncateTableHandler.java
new file mode 100644
index 00000000000..6818a9cf942
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/handler/TruncateTableHandler.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.handler;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.AcidConstants;
+import org.apache.hadoop.hive.common.AcidMetaDataFile;
+import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.common.TableName;
+import org.apache.hadoop.hive.metastore.IHMSHandler;
+import org.apache.hadoop.hive.metastore.MetaStoreListenerNotifier;
+import org.apache.hadoop.hive.metastore.RawStore;
+import org.apache.hadoop.hive.metastore.ReplChangeManager;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
+import org.apache.hadoop.hive.metastore.api.GetTableRequest;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.TruncateTableRequest;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.metastore.client.builder.GetPartitionsArgs;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.AlterPartitionsEvent;
+import org.apache.hadoop.hive.metastore.events.AlterTableEvent;
+import org.apache.hadoop.hive.metastore.messaging.EventMessage;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.metastore.utils.FileUtils;
+import org.apache.hadoop.hive.metastore.utils.HdfsUtils;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.thrift.TException;
+
+import static org.apache.hadoop.hive.metastore.ExceptionHandler.newMetaException;
+import static org.apache.hadoop.hive.metastore.client.ThriftHiveMetaStoreClient.TRUNCATE_SKIP_DATA_DELETION;
+import static org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.isDbReplicationTarget;
+import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.CAT_NAME;
+import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.DB_NAME;
+import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.parseDbName;
+
+@RequestHandler(requestBody = TruncateTableRequest.class)
+public class TruncateTableHandler
+    extends AbstractRequestHandler<TruncateTableRequest, TruncateTableHandler.TruncateTableResult> {
+  private String catName;
+  private String dbName;
+  private Warehouse wh;
+  private RawStore ms;
+  private Table table;
+  private List<Partition> partitions;
+
+  TruncateTableHandler(IHMSHandler handler, TruncateTableRequest request) {
+    super(handler, false, request);
+  }
+
+  @Override
+  protected void beforeExecute() throws TException, IOException {
+    String[] parsedDbName = parseDbName(request.getDbName(), handler.getConf());
+    this.catName = parsedDbName[CAT_NAME];
+    this.dbName = parsedDbName[DB_NAME];
+    GetTableRequest getTableRequest = new GetTableRequest(dbName, request.getTableName());
+    getTableRequest.setCatName(catName);
+    this.table = handler.get_table_core(getTableRequest);
+
+    this.ms = handler.getMS();
+    if (request.getPartNames() == null) {
+      if (0 != table.getPartitionKeysSize()) {
+        this.partitions = ms.getPartitions(catName, dbName,
+            request.getTableName(), GetPartitionsArgs.getAllPartitions());
+      }
+    } else {
+      this.partitions = ms.getPartitionsByNames(catName, dbName,
+          request.getTableName(), request.getPartNames());
+    }
+    this.wh = handler.getWh();
+  }
+
+  @Override
+  protected TruncateTableResult execute() throws TException, IOException {
+    boolean isSkipTrash = false, needCmRecycle = false;
+    boolean skipDataDeletion = Optional.ofNullable(request.getEnvironmentContext())
+        .map(EnvironmentContext::getProperties)
+        .map(prop -> prop.get(TRUNCATE_SKIP_DATA_DELETION))
+        .map(Boolean::parseBoolean)
+        .orElse(false);
+
+    if (TxnUtils.isTransactionalTable(table) || !skipDataDeletion) {
+      if (!skipDataDeletion) {
+        isSkipTrash = MetaStoreUtils.isSkipTrash(table.getParameters());
+        Database db = handler.get_database_core(catName, dbName);
+        needCmRecycle = ReplChangeManager.shouldEnableCm(db, table);
+      }
+      List<Path> locations = new ArrayList<>();
+      if (0 != table.getPartitionKeysSize()) {
+        for (Partition partition : partitions) {
+          locations.add(new Path(partition.getSd().getLocation()));
+        }
+      } else {
+        locations.add(new Path(table.getSd().getLocation()));
+      }
+      // This is not transactional
+      for (Path location : locations) {
+        if (!skipDataDeletion) {
+          truncateDataFiles(location, isSkipTrash, needCmRecycle);
+        } else {
+          // For ACID tables we don't need to delete the old files, only write an empty base dir.
+          // Compaction and the cleaner will take care of the rest.
+          addTruncateBaseFile(location, request.getWriteId(),
+              handler.getConf(), AcidMetaDataFile.DataFormat.TRUNCATED);
+        }
+      }
+    }
+    // Alter the table/partition stats and also notify truncate table event
+    alterTableStatsForTruncate();
+    return new TruncateTableResult(true);
+  }
+
+  private void updateStatsForTruncate(Map<String, String> props, EnvironmentContext environmentContext) {
+    if (null == props) {
+      return;
+    }
+    for (String stat : StatsSetupConst.SUPPORTED_STATS) {
+      String statVal = props.get(stat);
+      if (statVal != null) {
+        // In the case of truncate table, we set the stats to 0.
+        props.put(stat, "0");
+      }
+    }
+    // First set basic stats to true
+    StatsSetupConst.setBasicStatsState(props, StatsSetupConst.TRUE);
+    environmentContext.putToProperties(StatsSetupConst.STATS_GENERATED, StatsSetupConst.TASK);
+    environmentContext.putToProperties(StatsSetupConst.DO_NOT_POPULATE_QUICK_STATS, StatsSetupConst.TRUE);
+    // Then invalidate column stats
+    StatsSetupConst.clearColumnStatsState(props);
+  }
+
+  private void alterPartitionsForTruncate() throws TException {
+    EnvironmentContext environmentContext = new EnvironmentContext();
+    if (partitions.isEmpty()) {
+      return;
+    }
+    List<List<String>> partValsList = new ArrayList<>();
+    for (Partition partition: partitions) {
+      updateStatsForTruncate(partition.getParameters(), environmentContext);
+      if (request.getWriteId() > 0) {
+        partition.setWriteId(request.getWriteId());
+      }
+      partition.putToParameters(hive_metastoreConstants.DDL_TIME, Long.toString(System
+          .currentTimeMillis() / 1000));
+      partValsList.add(partition.getValues());
+    }
+    ms.alterPartitions(catName, dbName, request.getTableName(), partValsList, partitions,
+        request.getWriteId(), request.getValidWriteIdList());
+    if (handler.getTransactionalListeners() != null && !handler.getTransactionalListeners().isEmpty()) {
+      boolean shouldSendSingleEvent = MetastoreConf.getBoolVar(handler.getConf(),
+          MetastoreConf.ConfVars.NOTIFICATION_ALTER_PARTITIONS_V2_ENABLED);
+      if (shouldSendSingleEvent) {
+        MetaStoreListenerNotifier.notifyEvent(handler.getTransactionalListeners(),
+            EventMessage.EventType.ALTER_PARTITIONS,
+            new AlterPartitionsEvent(partitions, partitions, table, true, true, handler), environmentContext);
+      } else {
+        for (Partition partition : partitions) {
+          MetaStoreListenerNotifier.notifyEvent(handler.getTransactionalListeners(),
+              EventMessage.EventType.ALTER_PARTITION,
+              new AlterPartitionEvent(partition, partition, table, true, true, partition.getWriteId(), handler),
+              environmentContext);
+        }
+      }
+    }
+    if (handler.getListeners() != null && !handler.getListeners().isEmpty()) {
+      boolean shouldSendSingleEvent = MetastoreConf.getBoolVar(handler.getConf(),
+          MetastoreConf.ConfVars.NOTIFICATION_ALTER_PARTITIONS_V2_ENABLED);
+      if (shouldSendSingleEvent) {
+        MetaStoreListenerNotifier.notifyEvent(handler.getListeners(), EventMessage.EventType.ALTER_PARTITIONS,
+            new AlterPartitionsEvent(partitions, partitions, table, true, true, handler), environmentContext);
+      } else {
+        for (Partition partition : partitions) {
+          MetaStoreListenerNotifier.notifyEvent(handler.getListeners(), EventMessage.EventType.ALTER_PARTITION,
+              new AlterPartitionEvent(partition, partition, table, true, true, partition.getWriteId(), handler),
+              environmentContext);
+        }
+      }
+    }
+  }
+
+  private void alterTableStatsForTruncate() throws TException {
+    if (0 != table.getPartitionKeysSize()) {
+      alterPartitionsForTruncate();
+    } else {
+      EnvironmentContext environmentContext = new EnvironmentContext();
+      updateStatsForTruncate(table.getParameters(), environmentContext);
+      boolean isReplicated = isDbReplicationTarget(ms.getDatabase(catName, dbName));
+      if (!handler.getTransactionalListeners().isEmpty()) {
+        MetaStoreListenerNotifier.notifyEvent(handler.getTransactionalListeners(),
+            EventMessage.EventType.ALTER_TABLE,
+            new AlterTableEvent(table, table, true, true,
+                request.getWriteId(), handler, isReplicated));
+      }
+
+      if (!handler.getListeners().isEmpty()) {
+        MetaStoreListenerNotifier.notifyEvent(handler.getListeners(),
+            EventMessage.EventType.ALTER_TABLE,
+            new AlterTableEvent(table, table, true, true,
+                request.getWriteId(), handler, isReplicated));
+      }
+      // TODO: this should actually pass thru and set writeId for txn stats.
+      if (request.getWriteId() > 0) {
+        table.setWriteId(request.getWriteId());
+      }
+      ms.alterTable(catName, dbName, request.getTableName(), table,
+          request.getValidWriteIdList());
+    }
+  }
+
+  private void truncateDataFiles(Path location, boolean isSkipTrash, boolean needCmRecycle)
+      throws IOException, MetaException {
+    FileSystem fs = location.getFileSystem(handler.getConf());
+
+    if (!HdfsUtils.isPathEncrypted(handler.getConf(), fs.getUri(), location) &&
+        !FileUtils.pathHasSnapshotSubDir(location, fs)) {
+      HdfsUtils.HadoopFileStatus status = new HdfsUtils.HadoopFileStatus(handler.getConf(), fs, location);
+      FileStatus targetStatus = fs.getFileStatus(location);
+      String targetGroup = targetStatus == null ? null : targetStatus.getGroup();
+
+      wh.deleteDir(location, isSkipTrash, needCmRecycle);
+      fs.mkdirs(location);
+      HdfsUtils.setFullFileStatus(handler.getConf(), status, targetGroup, fs, location, false);
+    } else {
+      FileStatus[] statuses = fs.listStatus(location, FileUtils.HIDDEN_FILES_PATH_FILTER);
+      if (statuses == null || statuses.length == 0) {
+        return;
+      }
+      for (final FileStatus status : statuses) {
+        wh.deleteDir(status.getPath(), isSkipTrash, needCmRecycle);
+      }
+    }
+  }
+
+  /**
+   * Add an empty baseDir with a truncate metadata file.
+   * @param location partition or table directory
+   * @param writeId allocated writeId
+   * @throws MetaException
+   */
+  public static void addTruncateBaseFile(Path location, long writeId, Configuration conf,
+      AcidMetaDataFile.DataFormat dataFormat) throws MetaException {
+    if (location == null) {
+      return;
+    }
+
+    Path basePath = new Path(location, AcidConstants.baseDir(writeId));
+    try {
+      FileSystem fs = location.getFileSystem(conf);
+      fs.mkdirs(basePath);
+      // We cannot leave the folder empty, otherwise it will be skipped by file listings in AcidUtils.
+      // No need for a data file; a simple metadata file is enough.
+      AcidMetaDataFile.writeToFile(fs, basePath, dataFormat);
+    } catch (Exception e) {
+      throw newMetaException(e);
+    }
+  }
+
+  @Override
+  protected String getMessagePrefix() {
+    return "TruncateTableHandler [" + id + "] -  truncate table for " +
+        TableName.getQualified(catName, dbName, table.getTableName()) + ":";
+  }
+
+  @Override
+  protected String getRequestProgress() {
+    return "Truncating table";
+  }
+
+  public record TruncateTableResult(boolean success) implements Result {
+  }
+}
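
The TRUNCATE_SKIP_DATA_DELETION branch above is driven entirely by an EnvironmentContext property on
the request. A client-side sketch of the two modes; the iface variable (a connected thrift client),
the database/table names, and allocatedWriteId are illustrative assumptions:

    // Default mode: partition/table directories are cleared on the filesystem
    // (skip-trash and change-management behavior is derived from table/db metadata).
    TruncateTableRequest req = new TruncateTableRequest("web", "logs");
    iface.truncate_table_req(req);

    // ACID metadata-only mode: old files stay in place and an empty base directory
    // with a metadata marker is written; compaction and the cleaner remove the
    // stale data later.
    TruncateTableRequest acidReq = new TruncateTableRequest("web", "acid_logs");
    acidReq.setWriteId(allocatedWriteId);  // writeId allocated by the transaction layer (assumed)
    EnvironmentContext ctx = new EnvironmentContext();
    ctx.putToProperties(TRUNCATE_SKIP_DATA_DELETION, "true");
    acidReq.setEnvironmentContext(ctx);
    iface.truncate_table_req(acidReq);
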
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/client/TestDropTable.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/client/TestDropTable.java
index 77d3c32b686..08d40e4924f 100644
--- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/client/TestDropTable.java
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/client/TestDropTable.java
@@ -27,7 +27,7 @@
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.metastore.handler.AbstractOperationHandler;
+import org.apache.hadoop.hive.metastore.handler.AbstractRequestHandler;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.annotation.MetastoreCheckinTest;
@@ -221,7 +221,7 @@ public void testDropProgress() throws Exception {
     assertNotNull(resp.getMessage());
     dropTableReq.setId(resp.getId());
     while (!resp.isFinished()) {
-      assertTrue(AbstractOperationHandler.containsOp(dropTableReq.getId()));
+      assertTrue(AbstractRequestHandler.containsRequest(dropTableReq.getId()));
       resp = iface.drop_table_req(dropTableReq);
       assertNotNull(resp.getMessage());
     }
@@ -258,10 +258,10 @@ public void cancelDropTable() throws Exception {
     // cancel the request
     dropTableReq.setId(resp.getId());
     dropTableReq.setCancel(true);
-    assertTrue(AbstractOperationHandler.containsOp(dropTableReq.getId()));
+    assertTrue(AbstractRequestHandler.containsRequest(dropTableReq.getId()));
     resp = iface.drop_table_req(dropTableReq);
     assertTrue(resp.isFinished());
-    assertTrue(resp.getMessage().contains("table " + table.getCatName() + "."+ 
tableName + ": Canceled"));
+    assertTrue(resp.getMessage(), resp.getMessage().contains(": Canceled"));
 
     PartitionsRequest req = new PartitionsRequest();
     req.setDbName(DB_NAME);
