This is an automated email from the ASF dual-hosted git repository.
dengzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 98da62c93f1 HIVE-29407: Move the set_aggr_stats_for out of HMSHandler
(#6287)
98da62c93f1 is described below
commit 98da62c93f198126c78d3352bf3ac6aeacefa53c
Author: dengzh <[email protected]>
AuthorDate: Tue Feb 10 08:25:24 2026 +0800
HIVE-29407: Move the set_aggr_stats_for out of HMSHandler (#6287)
---
.../listener/TestDbNotificationListener.java | 2 +-
.../hive/metastore/TestReplChangeManager.java | 1 +
.../apache/hadoop/hive/metastore/HMSHandler.java | 699 ++-------------------
.../metastore/handler/AbstractRequestHandler.java | 40 +-
.../metastore/handler/AddPartitionsHandler.java | 13 +-
.../metastore/handler/CreateDatabaseHandler.java | 271 ++++++++
.../hive/metastore/handler/CreateTableHandler.java | 74 +--
.../metastore/handler/DropDatabaseHandler.java | 17 +-
.../metastore/handler/DropPartitionsHandler.java | 5 -
.../metastore/handler/SetAggrStatsHandler.java | 460 ++++++++++++++
.../metastore/handler/TruncateTableHandler.java | 6 +-
.../hive/metastore/TestMetaStoreEventListener.java | 5 +-
12 files changed, 846 insertions(+), 747 deletions(-)
diff --git
a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
index ae588d8b93f..3bc716855a0 100644
---
a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
+++
b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
@@ -342,7 +342,7 @@ public void createDatabase() throws Exception {
String dbName2 = "createdb2";
String dbLocationUri = testTempDir;
String dbDescription = "no description";
- Database db = new Database(dbName, dbDescription, dbLocationUri,
emptyParameters);
+ Database db = new Database(dbName, dbDescription, "file:" + dbLocationUri,
emptyParameters);
db.setOwnerName("test_user");
msClient.createDatabase(db);
diff --git
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java
index 7cdc1bbfa9b..530a9ff2eeb 100644
---
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java
+++
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java
@@ -249,6 +249,7 @@ public void testRecycleNonPartTable() throws Exception {
db.putToParameters(SOURCE_OF_REPLICATION, "1, 2, 3");
db.setName(dbName);
client.createDatabase(db);
+ db = client.getDatabase(dbName);
String tblName = "t1";
List<FieldSchema> columns = new ArrayList<FieldSchema>();
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java
index e6cc721b153..6065574d969 100644
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java
@@ -42,17 +42,11 @@
import org.apache.hadoop.hive.metastore.events.*;
import org.apache.hadoop.hive.metastore.handler.AbstractRequestHandler;
import org.apache.hadoop.hive.metastore.handler.AddPartitionsHandler;
-import org.apache.hadoop.hive.metastore.handler.CreateTableHandler;
-import org.apache.hadoop.hive.metastore.handler.DropDatabaseHandler;
import org.apache.hadoop.hive.metastore.handler.DropPartitionsHandler;
-import org.apache.hadoop.hive.metastore.handler.DropTableHandler;
-import org.apache.hadoop.hive.metastore.handler.TruncateTableHandler;
-import org.apache.hadoop.hive.metastore.messaging.EventMessage;
import org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType;
import org.apache.hadoop.hive.metastore.metrics.Metrics;
import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
import org.apache.hadoop.hive.metastore.metrics.PerfLogger;
-import org.apache.hadoop.hive.metastore.model.MTable;
import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
import org.apache.hadoop.hive.metastore.properties.PropertyException;
import org.apache.hadoop.hive.metastore.properties.PropertyManager;
@@ -75,9 +69,7 @@
import javax.jdo.JDOException;
import java.io.IOException;
-import java.lang.reflect.UndeclaredThrowableException;
import java.nio.ByteBuffer;
-import java.security.PrivilegedExceptionAction;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
@@ -918,11 +910,12 @@ public void create_catalog(CreateCatalogRequest rqst)
ms.createCatalog(catalog);
// Create a default database inside the catalog
- Database db = new Database(DEFAULT_DATABASE_NAME,
- "Default database for catalog " + catalog.getName(),
catalog.getLocationUri(),
- Collections.emptyMap());
- db.setCatalogName(catalog.getName());
- create_database_core(ms, db);
+ CreateDatabaseRequest cdr = new
CreateDatabaseRequest(DEFAULT_DATABASE_NAME);
+ cdr.setCatalogName(catalog.getName());
+ cdr.setLocationUri(catalog.getLocationUri());
+ cdr.setParameters(Collections.emptyMap());
+ cdr.setDescription("Default database for catalog " +
catalog.getName());
+ AbstractRequestHandler.offer(this, cdr).getResult();
if (!transactionalListeners.isEmpty()) {
transactionalListenersResponses =
@@ -949,9 +942,11 @@ public void create_catalog(CreateCatalogRequest rqst)
}
}
success = true;
- } catch (AlreadyExistsException|InvalidObjectException|MetaException e) {
+ } catch (Exception e) {
ex = e;
- throw e;
+ throw handleException(e)
+ .throwIfInstance(AlreadyExistsException.class,
InvalidObjectException.class, MetaException.class)
+ .defaultMetaException();
} finally {
endFunction("create_catalog", success, ex);
}
@@ -1132,194 +1127,6 @@ private void dropCatalogCore(String catName, boolean
ifExists)
}
}
- // Assumes that the catalog has already been set.
- private void create_database_core(RawStore ms, final Database db)
- throws AlreadyExistsException, InvalidObjectException, MetaException {
- if (!MetaStoreUtils.validateName(db.getName(), conf)) {
- throw new InvalidObjectException(db.getName() + " is not a valid
database name");
- }
-
- Catalog cat = null;
- try {
- cat = getMS().getCatalog(db.getCatalogName());
- } catch (NoSuchObjectException e) {
- LOG.error("No such catalog " + db.getCatalogName());
- throw new InvalidObjectException("No such catalog " +
db.getCatalogName());
- }
- boolean skipAuthorization = false;
- String passedInURI = db.getLocationUri();
- String passedInManagedURI = db.getManagedLocationUri();
- if (passedInURI == null && passedInManagedURI == null) {
- skipAuthorization = true;
- }
- final Path defaultDbExtPath = wh.getDefaultDatabasePath(db.getName(),
true);
- final Path defaultDbMgdPath = wh.getDefaultDatabasePath(db.getName(),
false);
- final Path dbExtPath = (passedInURI != null) ? wh.getDnsPath(new
Path(passedInURI)) : wh.determineDatabasePath(cat, db);
- final Path dbMgdPath = (passedInManagedURI != null) ? wh.getDnsPath(new
Path(passedInManagedURI)) : null;
-
- if ((defaultDbExtPath.equals(dbExtPath) &&
defaultDbMgdPath.equals(dbMgdPath)) &&
- ((dbMgdPath == null) || dbMgdPath.equals(defaultDbMgdPath))) {
- skipAuthorization = true;
- }
-
- if ( skipAuthorization ) {
- //null out to skip authorizer URI check
- db.setLocationUri(null);
- db.setManagedLocationUri(null);
- }else{
- db.setLocationUri(dbExtPath.toString());
- if (dbMgdPath != null) {
- db.setManagedLocationUri(dbMgdPath.toString());
- }
- }
- if (db.getOwnerName() == null){
- try {
- db.setOwnerName(SecurityUtils.getUGI().getShortUserName());
- }catch (Exception e){
- LOG.warn("Failed to get owner name for create database operation.", e);
- }
- }
- long time = System.currentTimeMillis()/1000;
- db.setCreateTime((int) time);
- boolean success = false;
- boolean madeManagedDir = false;
- boolean madeExternalDir = false;
- boolean isReplicated = isDbReplicationTarget(db);
- Map<String, String> transactionalListenersResponses =
Collections.emptyMap();
- try {
- firePreEvent(new PreCreateDatabaseEvent(db, this));
- //reinstate location uri for metastore db.
- if (skipAuthorization == true){
- db.setLocationUri(dbExtPath.toString());
- if (dbMgdPath != null) {
- db.setManagedLocationUri(dbMgdPath.toString());
- }
- }
- if (db.getCatalogName() != null && !db.getCatalogName().
- equals(Warehouse.DEFAULT_CATALOG_NAME)) {
- if (!wh.isDir(dbExtPath)) {
- LOG.debug("Creating database path " + dbExtPath);
- if (!wh.mkdirs(dbExtPath)) {
- throw new MetaException("Unable to create database path " +
dbExtPath +
- ", failed to create database " + db.getName());
- }
- madeExternalDir = true;
- }
- } else {
- if (dbMgdPath != null) {
- try {
- // Since this may be done as random user (if doAs=true) he may not
have access
- // to the managed directory. We run this as an admin user
- madeManagedDir = UserGroupInformation.getLoginUser().doAs(new
PrivilegedExceptionAction<Boolean>() {
- @Override public Boolean run() throws MetaException {
- if (!wh.isDir(dbMgdPath)) {
- LOG.info("Creating database path in managed directory " +
dbMgdPath);
- if (!wh.mkdirs(dbMgdPath)) {
- throw new MetaException("Unable to create database managed
path " + dbMgdPath + ", failed to create database " + db.getName());
- }
- return true;
- }
- return false;
- }
- });
- if (madeManagedDir) {
- LOG.info("Created database path in managed directory " +
dbMgdPath);
- } else if (!isInTest || !isDbReplicationTarget(db)) { // Hive
replication tests doesn't drop the db after each test
- throw new MetaException(
- "Unable to create database managed directory " + dbMgdPath +
", failed to create database " + db.getName());
- }
- } catch (IOException | InterruptedException e) {
- throw new MetaException(
- "Unable to create database managed directory " + dbMgdPath +
", failed to create database " + db.getName() + ":" + e.getMessage());
- }
- }
- if (dbExtPath != null) {
- try {
- madeExternalDir = UserGroupInformation.getCurrentUser().doAs(new
PrivilegedExceptionAction<Boolean>() {
- @Override public Boolean run() throws MetaException {
- if (!wh.isDir(dbExtPath)) {
- LOG.info("Creating database path in external directory " +
dbExtPath);
- return wh.mkdirs(dbExtPath);
- }
- return false;
- }
- });
- if (madeExternalDir) {
- LOG.info("Created database path in external directory " +
dbExtPath);
- } else {
- LOG.warn("Failed to create external path " + dbExtPath + " for
database " + db.getName() + ". This may result in access not being allowed if
the "
- + "StorageBasedAuthorizationProvider is enabled");
- }
- } catch (IOException | InterruptedException |
UndeclaredThrowableException e) {
- throw new MetaException("Failed to create external path " +
dbExtPath + " for database " + db.getName() + ". This may result in access not
being allowed if the "
- + "StorageBasedAuthorizationProvider is enabled: " +
e.getMessage());
- }
- } else {
- LOG.info("Database external path won't be created since the external
warehouse directory is not defined");
- }
- }
-
- ms.openTransaction();
- ms.createDatabase(db);
-
- if (!transactionalListeners.isEmpty()) {
- transactionalListenersResponses =
- MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
- EventType.CREATE_DATABASE,
- new CreateDatabaseEvent(db, true, this, isReplicated));
- }
-
- success = ms.commitTransaction();
- } finally {
- if (!success) {
- ms.rollbackTransaction();
-
- if (db.getCatalogName() != null && !db.getCatalogName().
- equals(Warehouse.DEFAULT_CATALOG_NAME)) {
- if (madeManagedDir && dbMgdPath != null) {
- wh.deleteDir(dbMgdPath, true, db);
- }
- } else {
- if (madeManagedDir && dbMgdPath != null) {
- try {
- UserGroupInformation.getLoginUser().doAs(new
PrivilegedExceptionAction<Void>() {
- @Override public Void run() throws Exception {
- wh.deleteDir(dbMgdPath, true, db);
- return null;
- }
- });
- } catch (IOException | InterruptedException e) {
- LOG.error(
- "Couldn't delete managed directory " + dbMgdPath + " after "
+ "it was created for database " + db.getName() + " " + e.getMessage());
- }
- }
-
- if (madeExternalDir && dbExtPath != null) {
- try {
- UserGroupInformation.getCurrentUser().doAs(new
PrivilegedExceptionAction<Void>() {
- @Override public Void run() throws Exception {
- wh.deleteDir(dbExtPath, true, db);
- return null;
- }
- });
- } catch (IOException | InterruptedException e) {
- LOG.error("Couldn't delete external directory " + dbExtPath + "
after " + "it was created for database "
- + db.getName() + " " + e.getMessage());
- }
- }
- }
- }
-
- if (!listeners.isEmpty()) {
- MetaStoreListenerNotifier.notifyEvent(listeners,
- EventType.CREATE_DATABASE,
- new CreateDatabaseEvent(db, success, this, isReplicated),
- null,
- transactionalListenersResponses, ms);
- }
- }
- }
-
@Override
@Deprecated
public void create_database(final Database db)
@@ -1353,6 +1160,7 @@ public void create_database_req(final
CreateDatabaseRequest createDatabaseReques
if (!createDatabaseRequest.isSetCatalogName()) {
createDatabaseRequest.setCatalogName(getDefaultCatalog(conf));
}
+
try {
try {
if (null != get_database_core(createDatabaseRequest.getCatalogName(),
createDatabaseRequest.getDatabaseName())) {
@@ -1361,40 +1169,7 @@ public void create_database_req(final
CreateDatabaseRequest createDatabaseReques
} catch (NoSuchObjectException e) {
// expected
}
-
- Database db = new Database(createDatabaseRequest.getDatabaseName(),
createDatabaseRequest.getDescription(),
- createDatabaseRequest.getLocationUri()
,createDatabaseRequest.getParameters());
- if (createDatabaseRequest.isSetPrivileges()) {
- db.setPrivileges(createDatabaseRequest.getPrivileges());
- }
- if (createDatabaseRequest.isSetOwnerName()) {
- db.setOwnerName(createDatabaseRequest.getOwnerName());
- }
- if (createDatabaseRequest.isSetOwnerType()) {
- db.setOwnerType(createDatabaseRequest.getOwnerType());
- }
- db.setCatalogName(createDatabaseRequest.getCatalogName());
- if (createDatabaseRequest.isSetCreateTime()) {
- db.setCreateTime(createDatabaseRequest.getCreateTime());
- } else {
- db.setCreateTime((int)(System.currentTimeMillis() / 1000));
- }
- if (createDatabaseRequest.isSetManagedLocationUri()) {
-
db.setManagedLocationUri(createDatabaseRequest.getManagedLocationUri());
- }
- if (createDatabaseRequest.isSetType()) {
- db.setType(createDatabaseRequest.getType());
- }
- if (createDatabaseRequest.isSetDataConnectorName()) {
- db.setConnector_name(createDatabaseRequest.getDataConnectorName());
- }
- if (createDatabaseRequest.isSetRemote_dbname()) {
- db.setRemote_dbname(createDatabaseRequest.getRemote_dbname());
- }
- create_database_core(getMS(), db);
- createDatabaseRequest.setLocationUri(db.getLocationUri());
- createDatabaseRequest.setManagedLocationUri(db.getManagedLocationUri());
- success = true;
+ success = AbstractRequestHandler.offer(this,
createDatabaseRequest).success();
} catch (Exception e) {
ex = e;
throw handleException(e)
@@ -1582,9 +1357,7 @@ public AsyncOperationResp drop_database_req(final
DropDatabaseRequest req)
}
Exception ex = null;
try {
- DropDatabaseHandler dropDatabaseOp = AbstractRequestHandler.offer(this,
req);
- AbstractRequestHandler.RequestStatus status =
dropDatabaseOp.getRequestStatus();
- return status.toAsyncOperationResp();
+ return AbstractRequestHandler.offer(this,
req).getRequestStatus().toAsyncOperationResp();
} catch (Exception e) {
ex = e;
// Reset the id of the request in case of RetryingHMSHandler retries
@@ -2046,8 +1819,7 @@ public void create_table_req(final CreateTableRequest req)
boolean success = false;
Exception ex = null;
try {
- CreateTableHandler createTableOp = AbstractRequestHandler.offer(this,
req);
- success = createTableOp.success();
+ success = AbstractRequestHandler.offer(this, req).success();
} catch (Exception e) {
LOG.warn("create_table_req got ", e);
ex = e;
@@ -2391,11 +2163,6 @@ public void
add_check_constraint(AddCheckConstraintRequest req)
}
}
- private boolean is_table_exists(RawStore ms, String catName, String dbname,
String name)
- throws MetaException {
- return (ms.getTable(catName, dbname, name, null) != null);
- }
-
@Override
@Deprecated
public void drop_table(final String dbname, final String name, final boolean
deleteData)
@@ -2428,9 +2195,7 @@ public AsyncOperationResp drop_table_req(DropTableRequest
dropReq)
", async: " + dropReq.isAsyncDrop() + ", id: " + dropReq.getId());
Exception ex = null;
try {
- DropTableHandler dropTableOp = AbstractRequestHandler.offer(this,
dropReq);
- AbstractRequestHandler.RequestStatus status =
dropTableOp.getRequestStatus();
- return status.toAsyncOperationResp();
+ return AbstractRequestHandler.offer(this,
dropReq).getRequestStatus().toAsyncOperationResp();
} catch (Exception e) {
ex = e;
// Here we get an exception, the RetryingHMSHandler might retry the call,
@@ -2473,8 +2238,7 @@ public TruncateTableResponse
truncate_table_req(TruncateTableRequest req)
Exception ex = null;
boolean success = false;
try {
- TruncateTableHandler truncateTable = AbstractRequestHandler.offer(this,
req);
- success = truncateTable.success();
+ success = AbstractRequestHandler.offer(this, req).success();
return new TruncateTableResponse();
} catch (Exception e) {
ex = e;
@@ -3094,7 +2858,7 @@ public AddPartitionsResult
add_partitions_req(AddPartitionsRequest request)
AddPartitionsHandler.AddPartitionsResult addPartsResult =
addPartsOp.getResult();
if (request.isSkipColumnSchemaForPartition()) {
if (addPartsResult.newParts() != null &&
!addPartsResult.newParts().isEmpty()) {
- StorageDescriptor sd =
addPartsResult.newParts().get(0).getSd().deepCopy();
+ StorageDescriptor sd =
addPartsResult.newParts().getFirst().getSd().deepCopy();
result.setPartitionColSchema(sd.getCols());
}
addPartsResult.newParts().stream().forEach(partition ->
partition.getSd().getCols().clear());
@@ -5151,7 +4915,13 @@ public PartitionsStatsResult
get_partitions_statistics_req(PartitionsStatsReques
@Override
public boolean update_table_column_statistics(ColumnStatistics colStats)
throws TException {
// Deprecated API, won't work for transactional tables
- return updateTableColumnStatsInternal(colStats, null, -1);
+ colStats.getStatsDesc().setIsTblLevel(true);
+ SetPartitionsStatsRequest setStatsRequest =
+ new SetPartitionsStatsRequest(List.of(colStats));
+ setStatsRequest.setWriteId(-1);
+ setStatsRequest.setValidWriteIdList(null);
+ setStatsRequest.setNeedMerge(false);
+ return set_aggr_stats_for(setStatsRequest);
}
@Override
@@ -5165,194 +4935,27 @@ public SetPartitionsStatsResponse
update_table_column_statistics_req(
if (req.isNeedMerge()) {
throw new InvalidInputException("Merge is not supported for
non-aggregate stats");
}
- ColumnStatistics colStats = req.getColStatsIterator().next();
- boolean ret = updateTableColumnStatsInternal(colStats,
- req.getValidWriteIdList(), req.getWriteId());
+ req.getColStats().getFirst().getStatsDesc().setIsTblLevel(true);
+ SetPartitionsStatsRequest setStatsRequest = new
SetPartitionsStatsRequest(req.getColStats());
+ setStatsRequest.setWriteId(req.getWriteId());
+ setStatsRequest.setValidWriteIdList(req.getValidWriteIdList());
+ setStatsRequest.setNeedMerge(false);
+ setStatsRequest.setEngine(req.getEngine());
+ boolean ret = set_aggr_stats_for(setStatsRequest);
return new SetPartitionsStatsResponse(ret);
}
- private boolean updateTableColumnStatsInternal(ColumnStatistics colStats,
- String validWriteIds, long
writeId)
- throws NoSuchObjectException, MetaException, InvalidObjectException,
InvalidInputException {
- normalizeColStatsInput(colStats);
-
- startFunction("write_column_statistics", ": table=" +
TableName.getQualified(
- colStats.getStatsDesc().getCatName(),
colStats.getStatsDesc().getDbName(),
- colStats.getStatsDesc().getTableName()));
-
- Map<String, String> parameters = null;
- getMS().openTransaction();
- boolean committed = false;
- try {
- parameters = getMS().updateTableColumnStatistics(colStats,
validWriteIds, writeId);
- if (parameters != null) {
- Table tableObj = getMS().getTable(colStats.getStatsDesc().getCatName(),
- colStats.getStatsDesc().getDbName(),
- colStats.getStatsDesc().getTableName(), validWriteIds);
- if (transactionalListeners != null &&
!transactionalListeners.isEmpty()) {
- MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
- EventType.UPDATE_TABLE_COLUMN_STAT,
- new UpdateTableColumnStatEvent(colStats, tableObj, parameters,
- writeId, this));
- }
- if (!listeners.isEmpty()) {
- MetaStoreListenerNotifier.notifyEvent(listeners,
- EventType.UPDATE_TABLE_COLUMN_STAT,
- new UpdateTableColumnStatEvent(colStats, tableObj, parameters,
- writeId,this));
- }
- }
- committed = getMS().commitTransaction();
- } finally {
- if (!committed) {
- getMS().rollbackTransaction();
- }
- endFunction("write_column_statistics", parameters != null, null,
- colStats.getStatsDesc().getTableName());
- }
-
- return parameters != null;
- }
-
- private void normalizeColStatsInput(ColumnStatistics colStats) throws
MetaException {
- // TODO: is this really needed? this code is propagated from HIVE-1362 but
most of it is useless.
- ColumnStatisticsDesc statsDesc = colStats.getStatsDesc();
- statsDesc.setCatName(statsDesc.isSetCatName() ?
statsDesc.getCatName().toLowerCase() : getDefaultCatalog(conf));
- statsDesc.setDbName(statsDesc.getDbName().toLowerCase());
- statsDesc.setTableName(statsDesc.getTableName().toLowerCase());
- statsDesc.setPartName(statsDesc.getPartName());
- long time = System.currentTimeMillis() / 1000;
- statsDesc.setLastAnalyzed(time);
-
- for (ColumnStatisticsObj statsObj : colStats.getStatsObj()) {
- statsObj.setColName(statsObj.getColName().toLowerCase());
- statsObj.setColType(statsObj.getColType().toLowerCase());
- }
- colStats.setStatsDesc(statsDesc);
- colStats.setStatsObj(colStats.getStatsObj());
- }
-
- public boolean updatePartitonColStatsInternal(Table tbl, MTable mTable,
ColumnStatistics colStats,
- String validWriteIds, long
writeId)
- throws MetaException, InvalidObjectException, NoSuchObjectException,
InvalidInputException {
- normalizeColStatsInput(colStats);
-
- ColumnStatisticsDesc csd = colStats.getStatsDesc();
- String catName = csd.getCatName(), dbName = csd.getDbName(), tableName =
csd.getTableName();
- startFunction("write_partition_column_statistics", ": db=" + dbName + "
table=" + tableName
- + " part=" + csd.getPartName());
-
- boolean ret = false;
-
- Map<String, String> parameters;
- List<String> partVals;
- boolean committed = false;
- getMS().openTransaction();
-
- try {
- tbl = Optional.ofNullable(tbl).orElse(getTable(catName, dbName,
tableName));
- mTable =
Optional.ofNullable(mTable).orElse(getMS().ensureGetMTable(catName, dbName,
tableName));
- partVals = getPartValsFromName(tbl, csd.getPartName());
- parameters = getMS().updatePartitionColumnStatistics(tbl, mTable,
colStats, partVals, validWriteIds, writeId);
- if (parameters != null) {
- if (transactionalListeners != null &&
!transactionalListeners.isEmpty()) {
- MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
- EventType.UPDATE_PARTITION_COLUMN_STAT,
- new UpdatePartitionColumnStatEvent(colStats, partVals,
parameters, tbl,
- writeId, this));
- }
- if (!listeners.isEmpty()) {
- MetaStoreListenerNotifier.notifyEvent(listeners,
- EventType.UPDATE_PARTITION_COLUMN_STAT,
- new UpdatePartitionColumnStatEvent(colStats, partVals,
parameters, tbl,
- writeId, this));
- }
- }
- committed = getMS().commitTransaction();
- } finally {
- if (!committed) {
- getMS().rollbackTransaction();
- }
- endFunction("write_partition_column_statistics", ret != false, null,
tableName);
- }
- return parameters != null;
- }
-
- private void updatePartitionColStatsForOneBatch(Table tbl, Map<String,
ColumnStatistics> statsMap,
- String validWriteIds,
long writeId)
- throws NoSuchObjectException, MetaException, InvalidObjectException,
InvalidInputException {
- Map<String, Map<String, String>> result =
- getMS().updatePartitionColumnStatisticsInBatch(statsMap, tbl,
transactionalListeners, validWriteIds, writeId);
- if (result != null && result.size() != 0 && listeners != null) {
- // The normal listeners, unlike transaction listeners are not using the
same transactions used by the update
- // operations. So there is no need of keeping them within the same
transactions. If notification to one of
- // the listeners failed, then even if we abort the transaction, we can
not revert the notifications sent to the
- // other listeners.
- for (Map.Entry entry : result.entrySet()) {
- Map<String, String> parameters = (Map<String, String>)
entry.getValue();
- ColumnStatistics colStats = statsMap.get(entry.getKey());
- List<String> partVals = getPartValsFromName(tbl,
colStats.getStatsDesc().getPartName());
- MetaStoreListenerNotifier.notifyEvent(listeners,
- EventMessage.EventType.UPDATE_PARTITION_COLUMN_STAT,
- new UpdatePartitionColumnStatEvent(colStats, partVals,
parameters,
- tbl, writeId, this));
- }
- }
- }
-
- private boolean updatePartitionColStatsInBatch(Table tbl, Map<String,
ColumnStatistics> statsMap,
- String validWriteIds, long
writeId)
- throws MetaException, InvalidObjectException, NoSuchObjectException,
InvalidInputException {
-
- if (statsMap.size() == 0) {
- return false;
- }
-
- String catalogName = tbl.getCatName();
- String dbName = tbl.getDbName();
- String tableName = tbl.getTableName();
-
- startFunction("updatePartitionColStatsInBatch", ": db=" + dbName + "
table=" + tableName);
- long start = System.currentTimeMillis();
-
- Map<String, ColumnStatistics> newStatsMap = new HashMap<>();
- long numStats = 0;
- long numStatsMax = MetastoreConf.getIntVar(conf,
ConfVars.JDBC_MAX_BATCH_SIZE);
- try {
- for (Map.Entry entry : statsMap.entrySet()) {
- ColumnStatistics colStats = (ColumnStatistics) entry.getValue();
- normalizeColStatsInput(colStats);
- assert
catalogName.equalsIgnoreCase(colStats.getStatsDesc().getCatName());
- assert dbName.equalsIgnoreCase(colStats.getStatsDesc().getDbName());
- assert
tableName.equalsIgnoreCase(colStats.getStatsDesc().getTableName());
- newStatsMap.put((String) entry.getKey(), colStats);
- numStats += colStats.getStatsObjSize();
-
- if (newStatsMap.size() >= numStatsMax) {
- updatePartitionColStatsForOneBatch(tbl, newStatsMap, validWriteIds,
writeId);
- newStatsMap.clear();
- numStats = 0;
- }
- }
- if (numStats != 0) {
- updatePartitionColStatsForOneBatch(tbl, newStatsMap, validWriteIds,
writeId);
- }
- } finally {
- endFunction("updatePartitionColStatsInBatch", true, null, tableName);
- long end = System.currentTimeMillis();
- float sec = (end - start) / 1000F;
- LOG.info("updatePartitionColStatsInBatch took " + sec + " seconds for "
+ statsMap.size() + " stats");
- }
- return true;
- }
-
@Override
public boolean update_partition_column_statistics(ColumnStatistics colStats)
throws TException {
// Deprecated API.
- return updatePartitonColStatsInternal(null, null, colStats, null, -1);
+ SetPartitionsStatsRequest setStatsRequest =
+ new SetPartitionsStatsRequest(Arrays.asList(colStats));
+ setStatsRequest.setWriteId(-1);
+ setStatsRequest.setValidWriteIdList(null);
+ setStatsRequest.setNeedMerge(false);
+ return set_aggr_stats_for(setStatsRequest);
}
-
@Override
public SetPartitionsStatsResponse update_partition_column_statistics_req(
SetPartitionsStatsRequest req) throws NoSuchObjectException,
@@ -5364,9 +4967,12 @@ public SetPartitionsStatsResponse
update_partition_column_statistics_req(
if (req.isNeedMerge()) {
throw new InvalidInputException("Merge is not supported for
non-aggregate stats");
}
- ColumnStatistics colStats = req.getColStatsIterator().next();
- boolean ret = updatePartitonColStatsInternal(null, null, colStats,
- req.getValidWriteIdList(), req.getWriteId());
+ SetPartitionsStatsRequest setStatsRequest = new
SetPartitionsStatsRequest(req.getColStats());
+ setStatsRequest.setWriteId(req.getWriteId());
+ setStatsRequest.setValidWriteIdList(req.getValidWriteIdList());
+ setStatsRequest.setNeedMerge(false);
+ setStatsRequest.setEngine(req.getEngine());
+ boolean ret = set_aggr_stats_for(setStatsRequest);
return new SetPartitionsStatsResponse(ret);
}
@@ -7447,213 +7053,32 @@ public AggrStats
get_aggr_stats_for(PartitionsStatsRequest request) throws TExce
}
@Override
- public boolean set_aggr_stats_for(SetPartitionsStatsRequest request) throws
TException {
- boolean ret = true;
- List<ColumnStatistics> csNews = request.getColStats();
- if (csNews == null || csNews.isEmpty()) {
- return ret;
+ public boolean set_aggr_stats_for(SetPartitionsStatsRequest req) throws
TException {
+ List<ColumnStatistics> columnStatisticsList = req.getColStats();
+ if (columnStatisticsList == null || columnStatisticsList.isEmpty()) {
+ return true;
}
- // figure out if it is table level or partition level
- ColumnStatistics firstColStats = csNews.get(0);
- ColumnStatisticsDesc statsDesc = firstColStats.getStatsDesc();
+ ColumnStatisticsDesc statsDesc =
columnStatisticsList.getFirst().getStatsDesc();
String catName = statsDesc.isSetCatName() ? statsDesc.getCatName() :
getDefaultCatalog(conf);
String dbName = statsDesc.getDbName();
String tableName = statsDesc.getTableName();
- List<String> colNames = new ArrayList<>();
- for (ColumnStatisticsObj obj : firstColStats.getStatsObj()) {
- colNames.add(obj.getColName());
- }
- if (statsDesc.isIsTblLevel()) {
- // there should be only one ColumnStatistics
- if (request.getColStatsSize() != 1) {
- throw new MetaException(
- "Expecting only 1 ColumnStatistics for table's column stats, but
find "
- + request.getColStatsSize());
- }
- if (request.isSetNeedMerge() && request.isNeedMerge()) {
- return updateTableColumnStatsWithMerge(catName, dbName, tableName,
colNames, request);
- } else {
- // This is the overwrite case, we do not care about the accuracy.
- return updateTableColumnStatsInternal(firstColStats,
- request.getValidWriteIdList(), request.getWriteId());
- }
- } else {
- // partition level column stats merging
- // note that we may have two or more duplicate partition names.
- // see autoColumnStats_2.q under TestMiniLlapLocalCliDriver
- Map<String, ColumnStatistics> newStatsMap = new HashMap<>();
- for (ColumnStatistics csNew : csNews) {
- String partName = csNew.getStatsDesc().getPartName();
- if (newStatsMap.containsKey(partName)) {
- MetaStoreServerUtils.mergeColStats(csNew, newStatsMap.get(partName));
- }
- newStatsMap.put(partName, csNew);
- }
-
- if (request.isSetNeedMerge() && request.isNeedMerge()) {
- ret = updatePartColumnStatsWithMerge(catName, dbName, tableName,
- colNames, newStatsMap, request);
- } else { // No merge.
- Table t = getTable(catName, dbName, tableName);
- MTable mTable = getMS().ensureGetMTable(catName, dbName, tableName);
- // We don't short-circuit on errors here anymore. That can leave acid
stats invalid.
- if (MetastoreConf.getBoolVar(getConf(), ConfVars.TRY_DIRECT_SQL)) {
- ret = updatePartitionColStatsInBatch(t, newStatsMap,
- request.getValidWriteIdList(), request.getWriteId());
- } else {
- for (Map.Entry<String, ColumnStatistics> entry :
newStatsMap.entrySet()) {
- // We don't short-circuit on errors here anymore. That can leave
acid stats invalid.
- ret = updatePartitonColStatsInternal(t, mTable, entry.getValue(),
- request.getValidWriteIdList(), request.getWriteId()) &&
ret;
- }
- }
- }
- }
- return ret;
- }
-
- private boolean updatePartColumnStatsWithMerge(String catName, String
dbName, String tableName,
- List<String> colNames,
Map<String, ColumnStatistics> newStatsMap, SetPartitionsStatsRequest request)
- throws MetaException, NoSuchObjectException, InvalidObjectException,
InvalidInputException {
- RawStore ms = getMS();
- ms.openTransaction();
- boolean isCommitted = false, result = false;
- try {
- // a single call to get all column stats for all partitions
- List<String> partitionNames = new ArrayList<>();
- partitionNames.addAll(newStatsMap.keySet());
- List<ColumnStatistics> csOlds = ms.getPartitionColumnStatistics(catName,
dbName, tableName,
- partitionNames, colNames, request.getEngine(),
request.getValidWriteIdList());
- if (newStatsMap.values().size() != csOlds.size()) {
- // some of the partitions miss stats.
- LOG.debug("Some of the partitions miss stats.");
- }
- Map<String, ColumnStatistics> oldStatsMap = new HashMap<>();
- for (ColumnStatistics csOld : csOlds) {
- oldStatsMap.put(csOld.getStatsDesc().getPartName(), csOld);
- }
-
- // another single call to get all the partition objects
- List<Partition> partitions = ms.getPartitionsByNames(catName, dbName,
tableName, partitionNames);
- Map<String, Partition> mapToPart = new HashMap<>();
- for (int index = 0; index < partitionNames.size(); index++) {
- mapToPart.put(partitionNames.get(index), partitions.get(index));
- }
-
- Table t = getTable(catName, dbName, tableName);
- MTable mTable = getMS().ensureGetMTable(catName, dbName, tableName);
- Map<String, ColumnStatistics> statsMap = new HashMap<>();
- boolean useDirectSql = MetastoreConf.getBoolVar(getConf(),
ConfVars.TRY_DIRECT_SQL);
- for (Map.Entry<String, ColumnStatistics> entry : newStatsMap.entrySet())
{
- ColumnStatistics csNew = entry.getValue();
- ColumnStatistics csOld = oldStatsMap.get(entry.getKey());
- boolean isInvalidTxnStats = csOld != null
- && csOld.isSetIsStatsCompliant() && !csOld.isIsStatsCompliant();
- Partition part = mapToPart.get(entry.getKey());
- if (isInvalidTxnStats) {
- // No columns can be merged; a shortcut for getMergableCols.
- csNew.setStatsObj(Lists.newArrayList());
- } else {
- // we first use getParameters() to prune the stats
- MetaStoreServerUtils.getMergableCols(csNew, part.getParameters());
- // we merge those that can be merged
- if (csOld != null && csOld.getStatsObjSize() != 0 &&
!csNew.getStatsObj().isEmpty()) {
- MetaStoreServerUtils.mergeColStats(csNew, csOld);
- }
- }
-
- if (!csNew.getStatsObj().isEmpty()) {
- // We don't short-circuit on errors here anymore. That can leave
acid stats invalid.
- if (useDirectSql) {
- statsMap.put(csNew.getStatsDesc().getPartName(), csNew);
- } else {
- result = updatePartitonColStatsInternal(t, mTable, csNew,
- request.getValidWriteIdList(), request.getWriteId()) &&
result;
- }
- } else if (isInvalidTxnStats) {
- // For now because the stats state is such as it is, we will
invalidate everything.
- // Overall the sematics here are not clear - we could invalide only
some columns, but does
- // that make any physical sense? Could query affect some columns but
not others?
- part.setWriteId(request.getWriteId());
- StatsSetupConst.clearColumnStatsState(part.getParameters());
- StatsSetupConst.setBasicStatsState(part.getParameters(),
StatsSetupConst.FALSE);
- ms.alterPartition(catName, dbName, tableName, part.getValues(), part,
- request.getValidWriteIdList());
- result = false;
- } else {
- // TODO: why doesn't the original call for non acid tables
invalidate the stats?
- LOG.debug("All the column stats " +
csNew.getStatsDesc().getPartName()
- + " are not accurate to merge.");
- }
- }
- ms.commitTransaction();
- isCommitted = true;
- // updatePartitionColStatsInBatch starts/commit transaction internally.
As there is no write or select for update
- // operations is done in this transaction, it is safe to commit it
before calling updatePartitionColStatsInBatch.
- if (!statsMap.isEmpty()) {
- updatePartitionColStatsInBatch(t, statsMap,
request.getValidWriteIdList(), request.getWriteId());
- }
- } finally {
- if (!isCommitted) {
- ms.rollbackTransaction();
- }
- }
- return result;
- }
-
-
- private boolean updateTableColumnStatsWithMerge(String catName, String
dbName, String tableName,
- List<String> colNames,
SetPartitionsStatsRequest request) throws MetaException,
- NoSuchObjectException, InvalidObjectException, InvalidInputException {
- ColumnStatistics firstColStats = request.getColStats().get(0);
- RawStore ms = getMS();
- ms.openTransaction();
- boolean isCommitted = false, result = false;
+ startFunction("set_aggr_stats_for",
+ ": db=" + dbName + " tab=" + tableName + " needMerge=" +
req.isNeedMerge() +
+ " isTblLevel=" + statsDesc.isIsTblLevel());
+ Exception ex = null;
+ boolean success = false;
try {
- ColumnStatistics csOld = ms.getTableColumnStatistics(catName, dbName,
tableName, colNames,
- request.getEngine(), request.getValidWriteIdList());
- // we first use the valid stats list to prune the stats
- boolean isInvalidTxnStats = csOld != null
- && csOld.isSetIsStatsCompliant() && !csOld.isIsStatsCompliant();
- if (isInvalidTxnStats) {
- // No columns can be merged; a shortcut for getMergableCols.
- firstColStats.setStatsObj(Lists.newArrayList());
- } else {
- Table t = getTable(catName, dbName, tableName);
- MetaStoreServerUtils.getMergableCols(firstColStats, t.getParameters());
-
- // we merge those that can be merged
- if (csOld != null && csOld.getStatsObjSize() != 0 &&
!firstColStats.getStatsObj().isEmpty()) {
- MetaStoreServerUtils.mergeColStats(firstColStats, csOld);
- }
- }
-
- if (!firstColStats.getStatsObj().isEmpty()) {
- result = updateTableColumnStatsInternal(firstColStats,
- request.getValidWriteIdList(), request.getWriteId());
- } else if (isInvalidTxnStats) {
- // For now because the stats state is such as it is, we will
invalidate everything.
- // Overall the sematics here are not clear - we could invalide only
some columns, but does
- // that make any physical sense? Could query affect some columns but
not others?
- Table t = getTable(catName, dbName, tableName);
- t.setWriteId(request.getWriteId());
- StatsSetupConst.clearColumnStatsState(t.getParameters());
- StatsSetupConst.setBasicStatsState(t.getParameters(),
StatsSetupConst.FALSE);
- ms.alterTable(catName, dbName, tableName, t,
request.getValidWriteIdList());
- } else {
- // TODO: why doesn't the original call for non acid tables invalidate
the stats?
- LOG.debug("All the column stats are not accurate to merge.");
- result = true;
- }
-
- ms.commitTransaction();
- isCommitted = true;
+ success = AbstractRequestHandler.offer(this, req).success();
+ return success;
+ } catch (Exception e) {
+ ex = e;
+ throw handleException(e).throwIfInstance(MetaException.class,
NoSuchObjectException.class)
+ .convertIfInstance(IOException.class, MetaException.class)
+ .defaultMetaException();
} finally {
- if (!isCommitted) {
- ms.rollbackTransaction();
- }
+ endFunction("set_aggr_stats_for", success, ex,
+ TableName.getQualified(catName, dbName, tableName));
}
- return result;
}
private Table getTable(String catName, String dbName, String tableName)
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/handler/AbstractRequestHandler.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/handler/AbstractRequestHandler.java
index c00d23e5149..d8d7316e876 100644
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/handler/AbstractRequestHandler.java
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/handler/AbstractRequestHandler.java
@@ -189,10 +189,6 @@ protected A execute() throws TException, IOException {
public String getMessagePrefix() {
throw new UnsupportedOperationException();
}
- @Override
- public String getRequestProgress() {
- throw new UnsupportedOperationException();
- }
};
}
}
@@ -200,6 +196,7 @@ public String getRequestProgress() {
return opHandler;
}
+ @SuppressWarnings("unchecked")
public static <T extends AbstractRequestHandler> T offer(IHMSHandler
handler, TBase req)
throws TException, IOException {
HandlerFactory factory = REQ_FACTORIES.get(req.getClass());
@@ -235,8 +232,8 @@ public RequestStatus getRequestStatus() throws TException {
// No Op, return to the caller since long polling timeout has expired
LOG.trace("{} Long polling timed out", logMsgPrefix);
} catch (CancellationException e) {
- // The background operation thread was cancelled
- LOG.trace("{} The background operation was cancelled", logMsgPrefix);
+ // The background handler thread was cancelled
+ LOG.trace("{} The background handler was cancelled", logMsgPrefix);
} catch (ExecutionException | InterruptedException e) {
// No op, we will deal with this exception later
LOG.error("{} Failed", logMsgPrefix, e);
@@ -280,11 +277,12 @@ public void cancelRequest() {
}
/**
- * Retrieve the result after this operation is done,
- * an IllegalStateException would raise if the operation has not completed.
- * @return the operation result
- * @throws TException exception while checking the status of the operation
+ * Retrieve the result after this handler is done,
+ * an IllegalStateException would raise if the handler has not completed.
+ * @return the handler's result
+ * @throws TException exception while checking the status of the handler
*/
+ @SuppressWarnings("unchecked")
public final A getResult() throws TException {
RequestStatus resp = getRequestStatus();
if (!resp.finished) {
@@ -295,8 +293,8 @@ public final A getResult() throws TException {
}
/**
- * Method invoked prior to executing the given operation.
- * This method may be used to initialize and validate the operation.
+ * Method invoked prior to executing the given handler.
+ * This method may be used to initialize and validate the handler.
* @throws TException
*/
protected void beforeExecute() throws TException, IOException {
@@ -304,15 +302,15 @@ protected void beforeExecute() throws TException,
IOException {
}
/**
- * Run this operation.
+ * Run this handler.
* @return computed result
- * @throws TException if unable to run the operation
+ * @throws TException if unable to run the handler
* @throws IOException if the request is invalid
*/
protected abstract A execute() throws TException, IOException;
/**
- * Method after the operation is done.
+ * Method after the handler is done.
* Can be used to free the resources this handler holds
*/
protected void afterExecute(A result) throws TException, IOException {
@@ -321,18 +319,20 @@ protected void afterExecute(A result) throws TException,
IOException {
}
/**
- * Get the prefix for logging the message on polling the operation status.
+ * Get the prefix for logging the message on polling the handler's status.
*
* @return message prefix
*/
protected abstract String getMessagePrefix();
/**
- * Get the message about the operation progress.
+ * Get the handler's progress that will show at the client.
*
* @return the progress
*/
- protected abstract String getRequestProgress();
+ protected String getRequestProgress() {
+ return "Done";
+ }
public boolean success() throws TException {
RequestStatus status = getRequestStatus();
@@ -341,7 +341,7 @@ public boolean success() throws TException {
/**
* Get the alias of this handler for metrics.
- * @return the alias, null or empty if no need to measure the operation.
+ * @return the alias, null or empty if no need to measure the handler.
*/
private String getMetricAlias() {
RequestHandler rh = getClass().getAnnotation(RequestHandler.class);
@@ -366,7 +366,7 @@ interface HandlerFactory {
public interface Result {
/**
* An indicator to tell if the handler is successful or not.
- * @return true if the operation is successful, false otherwise
+ * @return true if the handler is successful, false otherwise
*/
boolean success();
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/handler/AddPartitionsHandler.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/handler/AddPartitionsHandler.java
index 4302c5e84b1..45355a36bb3 100644
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/handler/AddPartitionsHandler.java
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/handler/AddPartitionsHandler.java
@@ -59,6 +59,7 @@
import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.SetPartitionsStatsRequest;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.metastore.client.builder.GetPartitionsArgs;
@@ -186,7 +187,12 @@ protected AddPartitionsResult execute() throws TException,
IOException {
new long[0], new BitSet(), writeId);
validWriteIds = validWriteIdList.toString();
}
- ((HMSHandler)handler).updatePartitonColStatsInternal(table, null,
partColStats, validWriteIds, writeId);
+ SetPartitionsStatsRequest setPartitionsStatsRequest =
+ new SetPartitionsStatsRequest(Arrays.asList(partColStats));
+ setPartitionsStatsRequest.setWriteId(writeId);
+ setPartitionsStatsRequest.setValidWriteIdList(validWriteIds);
+ setPartitionsStatsRequest.setNeedMerge(false);
+
handler.update_partition_column_statistics_req(setPartitionsStatsRequest);
}
success = ms.commitTransaction();
@@ -541,11 +547,6 @@ protected String getMessagePrefix() {
return "AddPartitionsHandler [" + id + "] - Add partitions for " +
tableName + ":";
}
- @Override
- protected String getRequestProgress() {
- return "Adding partitions";
- }
-
public record AddPartitionsResult(boolean success, List<Partition> newParts)
implements Result {
}
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/handler/CreateDatabaseHandler.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/handler/CreateDatabaseHandler.java
new file mode 100644
index 00000000000..1632090192c
--- /dev/null
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/handler/CreateDatabaseHandler.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.handler;
+
+import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.HMSHandler;
+import org.apache.hadoop.hive.metastore.IHMSHandler;
+import org.apache.hadoop.hive.metastore.MetaStoreListenerNotifier;
+import org.apache.hadoop.hive.metastore.RawStore;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.Catalog;
+import org.apache.hadoop.hive.metastore.api.CreateDatabaseRequest;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.events.CreateDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.PreCreateDatabaseEvent;
+import org.apache.hadoop.hive.metastore.messaging.EventMessage;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.utils.SecurityUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static
org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.HIVE_IN_TEST;
+import static
org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.isDbReplicationTarget;
+import static
org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog;
+
+@SuppressWarnings("unused")
+@RequestHandler(requestBody = CreateDatabaseRequest.class)
+public class CreateDatabaseHandler
+ extends AbstractRequestHandler<CreateDatabaseRequest,
CreateDatabaseHandler.CreateDatabaseResult> {
+ private static final Logger LOG =
LoggerFactory.getLogger(CreateDatabaseHandler.class);
+ private RawStore ms;
+ private Warehouse wh;
+ private Database db;
+ private boolean skipAuthorization;
+ private String name;
+
+ CreateDatabaseHandler(IHMSHandler handler, CreateDatabaseRequest request) {
+ super(handler, false, request);
+ }
+
+ @Override
+ protected CreateDatabaseResult execute() throws TException, IOException {
+ boolean success = false;
+ boolean madeManagedDir = false;
+ boolean madeExternalDir = false;
+ boolean isReplicated = isDbReplicationTarget(db);
+ Map<String, String> transactionalListenersResponses =
Collections.emptyMap();
+ Path dbExtPath = new Path(db.getLocationUri());
+ Path dbMgdPath = db.getManagedLocationUri() != null ? new
Path(db.getManagedLocationUri()) : null;
+ boolean isInTest = MetastoreConf.getBoolVar(handler.getConf(),
HIVE_IN_TEST);
+ try {
+ Database authDb = new Database(db);
+ if (skipAuthorization) {
+ // @TODO could it move to authorization layer?
+ //null out to skip authorizer URI check
+ authDb.setManagedLocationUri(null);
+ authDb.setLocationUri(null);
+ }
+
+ ((HMSHandler) handler).firePreEvent(new PreCreateDatabaseEvent(authDb,
handler));
+ if (db.getCatalogName() != null &&
!db.getCatalogName().equals(Warehouse.DEFAULT_CATALOG_NAME)) {
+ if (!wh.isDir(dbExtPath)) {
+ LOG.debug("Creating database path {}", dbExtPath);
+ if (!wh.mkdirs(dbExtPath)) {
+ throw new MetaException("Unable to create database path " +
dbExtPath +
+ ", failed to create database " + db.getName());
+ }
+ madeExternalDir = true;
+ }
+ } else {
+ if (dbMgdPath != null) {
+ try {
+ // Since this may be done as random user (if doAs=true) he may not
have access
+ // to the managed directory. We run this as an admin user
+ madeManagedDir =
UserGroupInformation.getLoginUser().doAs((PrivilegedExceptionAction<Boolean>)
() -> {
+ if (!wh.isDir(dbMgdPath)) {
+ LOG.info("Creating database path in managed directory {}",
dbMgdPath);
+ if (!wh.mkdirs(dbMgdPath)) {
+ throw new MetaException("Unable to create database managed
path " + dbMgdPath +
+ ", failed to create database " + db.getName());
+ }
+ return true;
+ }
+ return false;
+ });
+ if (madeManagedDir) {
+ LOG.info("Created database path in managed directory {}",
dbMgdPath);
+ } else if (!isInTest || !isDbReplicationTarget(db)) { // Hive
replication tests doesn't drop the db after each test
+ throw new MetaException("Unable to create database managed
directory " + dbMgdPath +
+ ", failed to create database " + db.getName());
+ }
+ } catch (IOException | InterruptedException e) {
+ throw new MetaException(
+ "Unable to create database managed directory " + dbMgdPath +
", failed to create database " +
+ db.getName() + ":" + e.getMessage());
+ }
+ }
+ try {
+ madeExternalDir =
UserGroupInformation.getCurrentUser().doAs((PrivilegedExceptionAction<Boolean>)
() -> {
+ if (!wh.isDir(dbExtPath)) {
+ LOG.info("Creating database path in external directory {}",
dbExtPath);
+ return wh.mkdirs(dbExtPath);
+ }
+ return false;
+ });
+ if (madeExternalDir) {
+ LOG.info("Created database path in external directory {}",
dbExtPath);
+ } else {
+ LOG.warn(
+ "Failed to create external path {} for database {}. " +
+ "This may result in access not being allowed if the
StorageBasedAuthorizationProvider is enabled",
+ dbExtPath, db.getName());
+ }
+ } catch (IOException | InterruptedException |
UndeclaredThrowableException e) {
+ throw new MetaException("Failed to create external path " +
dbExtPath + " for database " + db.getName() +
+ ". This may result in access not being allowed if the " +
+ "StorageBasedAuthorizationProvider is enabled: " +
e.getMessage());
+ }
+ }
+
+ ms.openTransaction();
+ ms.createDatabase(db);
+
+ if (!handler.getTransactionalListeners().isEmpty()) {
+ transactionalListenersResponses =
+
MetaStoreListenerNotifier.notifyEvent(handler.getTransactionalListeners(),
+ EventMessage.EventType.CREATE_DATABASE,
+ new CreateDatabaseEvent(db, true, handler, isReplicated));
+ }
+
+ success = ms.commitTransaction();
+ } finally {
+ if (!success) {
+ ms.rollbackTransaction();
+ if (db.getCatalogName() != null &&
!db.getCatalogName().equals(Warehouse.DEFAULT_CATALOG_NAME)) {
+ if (madeManagedDir && dbMgdPath != null) {
+ wh.deleteDir(dbMgdPath, true, db);
+ }
+ } else {
+ if (madeManagedDir && dbMgdPath != null) {
+ try {
+
UserGroupInformation.getLoginUser().doAs((PrivilegedExceptionAction<Void>) ()
-> {
+ wh.deleteDir(dbMgdPath, true, db);
+ return null;
+ });
+ } catch (IOException | InterruptedException e) {
+ LOG.error("Couldn't delete managed directory {} after it was
created for database {} {}",
+ dbMgdPath, db.getName(), e.getMessage());
+ }
+ }
+
+ if (madeExternalDir) {
+ try {
+
UserGroupInformation.getCurrentUser().doAs((PrivilegedExceptionAction<Void>) ()
-> {
+ wh.deleteDir(dbExtPath, true, db);
+ return null;
+ });
+ } catch (IOException | InterruptedException e) {
+ LOG.error("Couldn't delete external directory {} after it was
created for database {} {}",
+ dbExtPath, db.getName(), e.getMessage());
+ }
+ }
+ }
+ }
+ }
+ return new CreateDatabaseResult(success, transactionalListenersResponses);
+ }
+
+ @Override
+ protected void beforeExecute() throws TException, IOException {
+ this.name = request.getDatabaseName();
+ if (!MetaStoreUtils.validateName(name, handler.getConf())) {
+ throw new InvalidObjectException(name + " is not a valid database name");
+ }
+ this.ms = handler.getMS();
+ String catalogName =
+ request.isSetCatalogName() ? request.getCatalogName() :
getDefaultCatalog(handler.getConf());
+ Catalog cat;
+ try {
+ cat = ms.getCatalog(catalogName);
+ } catch (NoSuchObjectException e) {
+ LOG.error("No such catalog {}", catalogName);
+ throw new InvalidObjectException("No such catalog " + catalogName);
+ }
+
+ db = new Database(name, request.getDescription(),
request.getLocationUri(), request.getParameters());
+ db.setPrivileges(request.getPrivileges());
+ db.setOwnerName(request.getOwnerName());
+ db.setOwnerType(request.getOwnerType());
+ db.setCatalogName(catalogName);
+ db.setCreateTime((int)(System.currentTimeMillis() / 1000));
+ db.setManagedLocationUri(request.getManagedLocationUri());
+ db.setType(request.getType());
+ db.setConnector_name(request.getDataConnectorName());
+ db.setRemote_dbname(request.getRemote_dbname());
+ this.wh = handler.getWh();
+
+ String passedInURI = db.getLocationUri();
+ String passedInManagedURI = db.getManagedLocationUri();
+ Path defaultDbExtPath = wh.getDefaultDatabasePath(db.getName(), true);
+ Path defaultDbMgdPath = wh.getDefaultDatabasePath(db.getName(), false);
+ Path dbExtPath = (passedInURI != null) ?
+ wh.getDnsPath(new Path(passedInURI)) : wh.determineDatabasePath(cat,
db);
+ Path dbMgdPath = (passedInManagedURI != null) ? wh.getDnsPath(new
Path(passedInManagedURI)) : null;
+
+ skipAuthorization = ((passedInURI == null && passedInManagedURI == null) ||
+ (defaultDbExtPath.equals(dbExtPath) &&
+ (dbMgdPath == null || defaultDbMgdPath.equals(dbMgdPath))));
+
+ db.setLocationUri(dbExtPath.toString());
+ if (dbMgdPath != null) {
+ db.setManagedLocationUri(dbMgdPath.toString());
+ }
+
+ if (db.getOwnerName() == null){
+ try {
+ db.setOwnerName(SecurityUtils.getUGI().getShortUserName());
+ } catch (Exception e) {
+ LOG.warn("Failed to get owner name for create database operation.", e);
+ }
+ }
+ }
+
+ @Override
+ protected void afterExecute(CreateDatabaseResult result) throws TException,
IOException {
+ boolean success = result != null && result.success();
+ if (!handler.getListeners().isEmpty()) {
+ MetaStoreListenerNotifier.notifyEvent(handler.getListeners(),
EventMessage.EventType.CREATE_DATABASE,
+ new CreateDatabaseEvent(db, success, handler,
isDbReplicationTarget(db)),
+ null, result != null ? result.transactionalListenersResponses :
Collections.emptyMap(), ms);
+ }
+ }
+
+ @Override
+ protected String getMessagePrefix() {
+ return "CreateDatabaseHandler [" + id + "] - Create database " + name +
":";
+ }
+
+ public record CreateDatabaseResult(boolean success,
+ Map<String, String>
transactionalListenersResponses) implements Result {
+
+ }
+}
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/handler/CreateTableHandler.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/handler/CreateTableHandler.java
index 81da6dc7eb2..c0eb701c55f 100644
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/handler/CreateTableHandler.java
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/handler/CreateTableHandler.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hive.metastore.handler;
import java.io.IOException;
+import java.util.Arrays;
import java.util.BitSet;
import java.util.Collections;
import java.util.List;
@@ -41,16 +42,13 @@
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
-import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
-import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.CreateTableRequest;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
-import org.apache.hadoop.hive.metastore.api.InvalidInputException;
import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.SQLAllTableConstraints;
+import org.apache.hadoop.hive.metastore.api.SetPartitionsStatsRequest;
import org.apache.hadoop.hive.metastore.api.SkewedInfo;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
@@ -62,7 +60,6 @@
import org.apache.hadoop.hive.metastore.events.AddUniqueConstraintEvent;
import org.apache.hadoop.hive.metastore.events.CreateTableEvent;
import org.apache.hadoop.hive.metastore.events.PreCreateTableEvent;
-import org.apache.hadoop.hive.metastore.events.UpdateTableColumnStatEvent;
import org.apache.hadoop.hive.metastore.messaging.EventMessage;
import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
@@ -82,6 +79,7 @@
import static
org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog;
import static
org.apache.hadoop.hive.metastore.utils.StringUtils.normalizeIdentifier;
+@SuppressWarnings("unused")
@RequestHandler(requestBody = CreateTableRequest.class)
public class CreateTableHandler
extends AbstractRequestHandler<CreateTableRequest,
CreateTableHandler.CreateTableResult> {
@@ -260,65 +258,14 @@ protected void afterExecute(CreateTableResult result)
throws TException, IOExcep
new long[0], new BitSet(), writeId);
validWriteIds = validWriteIdList.toString();
}
- updateTableColumnStatsInternal(colStats, validWriteIds,
tbl.getWriteId());
+ SetPartitionsStatsRequest setStatsRequest = new
SetPartitionsStatsRequest(Arrays.asList(colStats));
+ setStatsRequest.setWriteId(writeId);
+ setStatsRequest.setValidWriteIdList(validWriteIds);
+ setStatsRequest.setNeedMerge(false);
+ handler.update_table_column_statistics_req(setStatsRequest);
}
}
- private boolean updateTableColumnStatsInternal(ColumnStatistics colStats,
- String validWriteIds, long writeId)
- throws NoSuchObjectException, MetaException, InvalidObjectException,
InvalidInputException {
- normalizeColStatsInput(colStats);
- Map<String, String> parameters = null;
- rs.openTransaction();
- boolean committed = false;
- try {
- parameters = rs.updateTableColumnStatistics(colStats, validWriteIds,
writeId);
- if (parameters != null) {
- Table tableObj = rs.getTable(colStats.getStatsDesc().getCatName(),
- colStats.getStatsDesc().getDbName(),
- colStats.getStatsDesc().getTableName(), validWriteIds);
- if (!handler.getTransactionalListeners().isEmpty()) {
-
MetaStoreListenerNotifier.notifyEvent(handler.getTransactionalListeners(),
- EventMessage.EventType.UPDATE_TABLE_COLUMN_STAT,
- new UpdateTableColumnStatEvent(colStats, tableObj, parameters,
- writeId, handler));
- }
- if (!handler.getListeners().isEmpty()) {
- MetaStoreListenerNotifier.notifyEvent(handler.getListeners(),
- EventMessage.EventType.UPDATE_TABLE_COLUMN_STAT,
- new UpdateTableColumnStatEvent(colStats, tableObj, parameters,
- writeId, handler));
- }
- }
- committed = rs.commitTransaction();
- } finally {
- if (!committed) {
- rs.rollbackTransaction();
- }
- }
-
- return parameters != null;
- }
-
- private void normalizeColStatsInput(ColumnStatistics colStats) throws
MetaException {
- // TODO: is this really needed? this code is propagated from HIVE-1362 but
most of it is useless.
- ColumnStatisticsDesc statsDesc = colStats.getStatsDesc();
- statsDesc.setCatName(statsDesc.isSetCatName() ?
statsDesc.getCatName().toLowerCase() :
- getDefaultCatalog(handler.getConf()));
- statsDesc.setDbName(statsDesc.getDbName().toLowerCase());
- statsDesc.setTableName(statsDesc.getTableName().toLowerCase());
- statsDesc.setPartName(statsDesc.getPartName());
- long time = System.currentTimeMillis() / 1000;
- statsDesc.setLastAnalyzed(time);
-
- for (ColumnStatisticsObj statsObj : colStats.getStatsObj()) {
- statsObj.setColName(statsObj.getColName().toLowerCase());
- statsObj.setColType(statsObj.getColType().toLowerCase());
- }
- colStats.setStatsDesc(statsDesc);
- colStats.setStatsObj(colStats.getStatsObj());
- }
-
@Override
protected void beforeExecute() throws TException, IOException {
this.tbl = request.getTable();
@@ -444,11 +391,6 @@ protected String getMessagePrefix() {
TableName.getQualified(tbl.getCatName(), tbl.getDbName(),
tbl.getTableName()) + ":";
}
- @Override
- protected String getRequestProgress() {
- return "Creating table";
- }
-
public record CreateTableResult(boolean success, Map<String, String>
transactionalListenerResponses)
implements Result {
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/handler/DropDatabaseHandler.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/handler/DropDatabaseHandler.java
index 3a475040a7f..fd632470b42 100644
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/handler/DropDatabaseHandler.java
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/handler/DropDatabaseHandler.java
@@ -67,6 +67,7 @@
import static
org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.isDbReplicationTarget;
import static
org.apache.hadoop.hive.metastore.utils.StringUtils.normalizeIdentifier;
+@SuppressWarnings("unused")
@RequestHandler(requestBody = DropDatabaseRequest.class, supportAsync = true,
metricAlias = "drop_database_req")
public class DropDatabaseHandler
extends AbstractRequestHandler<DropDatabaseRequest,
DropDatabaseHandler.DropDatabaseResult> {
@@ -465,12 +466,16 @@ protected void afterExecute(DropDatabaseResult result)
throws TException, IOExce
if (async) {
rs.shutdown();
}
- rs = null;
- tables = null;
- functions = null;
- procedures = null;
- packages = null;
- db = null;
+ freeHandler();
}
}
+
  /**
   * Drops references to all per-request state (raw store, collected tables/functions/
   * procedures/packages and the database object) once the drop has completed, so the
   * potentially large object graphs become eligible for garbage collection while the
   * handler instance itself may still be referenced (e.g. by an async caller).
   */
  private void freeHandler() {
    rs = null;
    tables = null;
    functions = null;
    procedures = null;
    packages = null;
    db = null;
  }
}
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/handler/DropPartitionsHandler.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/handler/DropPartitionsHandler.java
index 823601dd7ca..e2ac7088866 100644
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/handler/DropPartitionsHandler.java
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/handler/DropPartitionsHandler.java
@@ -290,11 +290,6 @@ protected String getMessagePrefix() {
return "DropPartitionsHandler [" + id + "] - Drop partitions from " +
tableName + ":";
}
- @Override
- protected String getRequestProgress() {
- return "Dropping partitions";
- }
-
public static class DropPartitionsResult implements Result {
private final List<Partition> partitions;
private final boolean tableDataShouldBeDeleted;
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/handler/SetAggrStatsHandler.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/handler/SetAggrStatsHandler.java
new file mode 100644
index 00000000000..ff645e004cd
--- /dev/null
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/handler/SetAggrStatsHandler.java
@@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.handler;
+
+import com.google.common.collect.Lists;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.common.TableName;
+import org.apache.hadoop.hive.metastore.IHMSHandler;
+import org.apache.hadoop.hive.metastore.MetaStoreListenerNotifier;
+import org.apache.hadoop.hive.metastore.RawStore;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.InvalidInputException;
+import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.SetPartitionsStatsRequest;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.events.UpdatePartitionColumnStatEvent;
+import org.apache.hadoop.hive.metastore.events.UpdateTableColumnStatEvent;
+import org.apache.hadoop.hive.metastore.messaging.EventMessage;
+import org.apache.hadoop.hive.metastore.model.MTable;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.hive.metastore.HMSHandler.getPartValsFromName;
+import static
org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog;
+import static
org.apache.hadoop.hive.metastore.utils.StringUtils.normalizeIdentifier;
+
/**
 * Request handler for {@code set_aggr_stats_for} (extracted from HMSHandler by HIVE-29407).
 * Persists column statistics for a table or for a set of partitions, either overwriting the
 * stored statistics or merging the incoming statistics with the existing ones when the
 * request sets {@code needMerge}.
 *
 * <p>All updates run inside RawStore transactions; transactional listeners are notified
 * inside the same transaction, while plain listeners are notified outside of it (their
 * notifications cannot be rolled back — see the comment in
 * {@link #updatePartitionColStatsForOneBatch}).
 */
@SuppressWarnings("unused")
@RequestHandler(requestBody = SetPartitionsStatsRequest.class)
public class SetAggrStatsHandler
    extends AbstractRequestHandler<SetPartitionsStatsRequest, SetAggrStatsHandler.SetAggrStatsResult> {
  private static final Logger LOG = LoggerFactory.getLogger(SetAggrStatsHandler.class);
  // Per-request state populated in beforeExecute() from the first ColumnStatistics entry.
  private RawStore ms;
  private String catName;
  private String dbName;
  private String tableName;
  private Table t;
  private boolean needMerge;
  private Configuration conf;

  SetAggrStatsHandler(IHMSHandler handler, SetPartitionsStatsRequest request) {
    super(handler, false, request);
  }

  /**
   * Resolves the target table from the first ColumnStatistics entry and validates the
   * request shape.
   *
   * @throws NoSuchObjectException if the target table does not exist
   * @throws MetaException if a table-level request carries more than one ColumnStatistics
   */
  @Override
  protected void beforeExecute() throws TException, IOException {
    this.needMerge = request.isSetNeedMerge() && request.isNeedMerge();
    this.conf = handler.getConf();
    this.ms = handler.getMS();
    List<ColumnStatistics> csNews = request.getColStats();
    if (csNews != null && !csNews.isEmpty()) {
      // All entries are assumed to target the same table; only the first descriptor is
      // consulted for catalog/db/table resolution.
      ColumnStatistics firstColStats = csNews.getFirst();
      ColumnStatisticsDesc statsDesc = firstColStats.getStatsDesc();
      this.catName = normalizeIdentifier(statsDesc.isSetCatName() ? statsDesc.getCatName() : getDefaultCatalog(conf));
      this.dbName = normalizeIdentifier(statsDesc.getDbName());
      this.tableName = normalizeIdentifier(statsDesc.getTableName());
      this.t = ms.getTable(catName, dbName, tableName);
      if (this.t == null) {
        throw new NoSuchObjectException("Table " +
            TableName.getQualified(catName, dbName, tableName) + " does not exist");
      }
      if (statsDesc.isIsTblLevel() && request.getColStatsSize() != 1) {
        // there should be only one ColumnStatistics
        throw new MetaException(
            "Expecting only 1 ColumnStatistics for table's column stats, but find " + request.getColStatsSize());
      }
    }
  }

  /**
   * Dispatches to the table-level or partition-level update path, with or without merging.
   *
   * @return a result whose {@code success} flag is false if any individual update failed
   *     (partition-level updates are not short-circuited, so all are attempted)
   */
  @Override
  protected SetAggrStatsResult execute() throws TException, IOException {
    boolean ret = true;
    List<ColumnStatistics> csNews = request.getColStats();
    if (csNews == null || csNews.isEmpty()) {
      // Nothing to update; treated as success.
      return new SetAggrStatsResult(true);
    }
    // figure out if it is table level or partition level
    ColumnStatistics firstColStats = csNews.getFirst();
    ColumnStatisticsDesc statsDesc = firstColStats.getStatsDesc();
    List<String> colNames = new ArrayList<>();
    for (ColumnStatisticsObj obj : firstColStats.getStatsObj()) {
      colNames.add(obj.getColName());
    }
    if (statsDesc.isIsTblLevel()) {
      if (needMerge) {
        return new SetAggrStatsResult(updateTableColumnStatsWithMerge(colNames));
      } else {
        // This is the overwrite case, we do not care about the accuracy.
        return new SetAggrStatsResult(updateTableColumnStatsInternal(firstColStats,
            request.getValidWriteIdList(), request.getWriteId()));
      }
    } else {
      // partition level column stats merging
      // note that we may have two or more duplicate partition names.
      // see autoColumnStats_2.q under TestMiniLlapLocalCliDriver
      Map<String, ColumnStatistics> newStatsMap = new HashMap<>();
      for (ColumnStatistics csNew : csNews) {
        String partName = csNew.getStatsDesc().getPartName();
        if (newStatsMap.containsKey(partName)) {
          // Collapse duplicate entries for the same partition by merging them first.
          MetaStoreServerUtils.mergeColStats(csNew, newStatsMap.get(partName));
        }
        newStatsMap.put(partName, csNew);
      }

      if (needMerge) {
        ret = updatePartColumnStatsWithMerge(colNames, newStatsMap);
      } else { // No merge.
        // We don't short-circuit on errors here anymore. That can leave acid stats invalid.
        if (MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.TRY_DIRECT_SQL)) {
          ret = updatePartitionColStatsInBatch(newStatsMap,
              request.getValidWriteIdList(), request.getWriteId());
        } else {
          MTable mTable = ms.ensureGetMTable(catName, dbName, tableName);
          for (Map.Entry<String, ColumnStatistics> entry : newStatsMap.entrySet()) {
            // We don't short-circuit on errors here anymore. That can leave acid stats invalid.
            ret = updatePartitionColStatsInternal(mTable, entry.getValue(),
                request.getValidWriteIdList(), request.getWriteId()) && ret;
          }
        }
      }
    }
    return new SetAggrStatsResult(ret);
  }

  /**
   * Merges the incoming table-level statistics with the currently stored ones and persists
   * the result, all inside a single RawStore transaction.
   *
   * <p>If the stored stats are marked non-compliant (invalid txn stats), no columns are
   * merged; instead the table's column-stats state is cleared and basic-stats accuracy is
   * set to FALSE.
   *
   * @param colNames column names present in the incoming statistics
   * @return true if statistics were written (or there was nothing mergable on a healthy
   *     table); false if the txn stats were invalidated instead
   */
  private boolean updateTableColumnStatsWithMerge(List<String> colNames) throws MetaException,
      NoSuchObjectException, InvalidObjectException, InvalidInputException {
    ColumnStatistics firstColStats = request.getColStats().getFirst();
    ms.openTransaction();
    boolean isCommitted = false, result = false;
    try {
      ColumnStatistics csOld = ms.getTableColumnStatistics(catName, dbName, tableName, colNames,
          request.getEngine(), request.getValidWriteIdList());
      // we first use the valid stats list to prune the stats
      boolean isInvalidTxnStats = csOld != null
          && csOld.isSetIsStatsCompliant() && !csOld.isIsStatsCompliant();
      if (isInvalidTxnStats) {
        // No columns can be merged; a shortcut for getMergableCols.
        firstColStats.setStatsObj(Lists.newArrayList());
      } else {
        MetaStoreServerUtils.getMergableCols(firstColStats, t.getParameters());
        // we merge those that can be merged
        if (csOld != null && csOld.getStatsObjSize() != 0 && !firstColStats.getStatsObj().isEmpty()) {
          MetaStoreServerUtils.mergeColStats(firstColStats, csOld);
        }
      }

      if (!firstColStats.getStatsObj().isEmpty()) {
        result = updateTableColumnStatsInternal(firstColStats,
            request.getValidWriteIdList(), request.getWriteId());
      } else if (isInvalidTxnStats) {
        // For now because the stats state is such as it is, we will invalidate everything.
        // Overall the semantics here are not clear - we could invalidate only some columns, but does
        // that make any physical sense? Could query affect some columns but not others?
        t.setWriteId(request.getWriteId());
        StatsSetupConst.clearColumnStatsState(t.getParameters());
        StatsSetupConst.setBasicStatsState(t.getParameters(), StatsSetupConst.FALSE);
        ms.alterTable(catName, dbName, tableName, t, request.getValidWriteIdList());
      } else {
        // TODO: why doesn't the original call for non acid tables invalidate the stats?
        LOG.debug("All the column stats are not accurate to merge.");
        result = true;
      }

      isCommitted = ms.commitTransaction();
    } finally {
      if (!isCommitted) {
        ms.rollbackTransaction();
      }
    }
    return result;
  }

  /**
   * Writes table-level column statistics (overwrite semantics) in a transaction and fires
   * UPDATE_TABLE_COLUMN_STAT events to transactional listeners (inside the transaction)
   * and plain listeners.
   *
   * @return true if the RawStore reported an update (non-null parameters), false otherwise
   */
  private boolean updateTableColumnStatsInternal(ColumnStatistics colStats,
      String validWriteIds, long writeId)
      throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException {
    normalizeColStatsInput(colStats);

    Map<String, String> parameters = null;
    ms.openTransaction();
    boolean committed = false;
    try {
      parameters = ms.updateTableColumnStatistics(colStats, validWriteIds, writeId);
      if (parameters != null) {
        Table tableObj = ms.getTable(colStats.getStatsDesc().getCatName(),
            colStats.getStatsDesc().getDbName(),
            colStats.getStatsDesc().getTableName(), validWriteIds);
        if (!handler.getTransactionalListeners().isEmpty()) {
          MetaStoreListenerNotifier.notifyEvent(handler.getTransactionalListeners(),
              EventMessage.EventType.UPDATE_TABLE_COLUMN_STAT,
              new UpdateTableColumnStatEvent(colStats, tableObj, parameters,
                  writeId, handler));
        }
        if (!handler.getListeners().isEmpty()) {
          MetaStoreListenerNotifier.notifyEvent(handler.getListeners(),
              EventMessage.EventType.UPDATE_TABLE_COLUMN_STAT,
              new UpdateTableColumnStatEvent(colStats, tableObj, parameters,
                  writeId, handler));
        }
      }
      committed = ms.commitTransaction();
    } finally {
      if (!committed) {
        ms.rollbackTransaction();
      }
    }

    return parameters != null;
  }

  /**
   * Partition-level merge path: fetches existing stats and partition objects in bulk,
   * merges per partition, then persists either via the batched direct-SQL path or one
   * partition at a time.
   *
   * <p>Partitions with invalid txn stats get their column-stats state cleared instead of
   * merged; partitions named in the request but missing from the metastore are skipped
   * with a warning.
   *
   * @param colNames column names from the first incoming ColumnStatistics
   * @param newStatsMap incoming statistics keyed by partition name (duplicates already merged)
   * @return false if any per-partition update failed or any partition's stats were invalidated
   */
  private boolean updatePartColumnStatsWithMerge(
      List<String> colNames, Map<String, ColumnStatistics> newStatsMap)
      throws MetaException, NoSuchObjectException, InvalidObjectException, InvalidInputException {
    ms.openTransaction();
    boolean isCommitted = false, result = true;
    try {
      // a single call to get all column stats for all partitions
      List<String> partitionNames = new ArrayList<>();
      partitionNames.addAll(newStatsMap.keySet());
      List<ColumnStatistics> csOlds = ms.getPartitionColumnStatistics(catName, dbName, tableName,
          partitionNames, colNames, request.getEngine(), request.getValidWriteIdList());
      if (newStatsMap.values().size() != csOlds.size()) {
        // some of the partitions miss stats.
        LOG.debug("Some of the partitions miss stats.");
      }
      Map<String, ColumnStatistics> oldStatsMap = new HashMap<>();
      for (ColumnStatistics csOld : csOlds) {
        oldStatsMap.put(csOld.getStatsDesc().getPartName(), csOld);
      }

      // another single call to get all the partition objects
      List<Partition> partitions = ms.getPartitionsByNames(catName, dbName, tableName, partitionNames);
      Map<String, Partition> mapToPart = new HashMap<>();
      for (Partition p : partitions) {
        String partName = Warehouse.makePartName(t.getPartitionKeys(), p.getValues());
        mapToPart.put(partName, p);
      }

      MTable mTable = ms.ensureGetMTable(catName, dbName, tableName);
      Map<String, ColumnStatistics> statsMap = new HashMap<>();
      boolean useDirectSql = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.TRY_DIRECT_SQL);
      for (Map.Entry<String, ColumnStatistics> entry : newStatsMap.entrySet()) {
        ColumnStatistics csNew = entry.getValue();
        ColumnStatistics csOld = oldStatsMap.get(entry.getKey());
        boolean isInvalidTxnStats = csOld != null
            && csOld.isSetIsStatsCompliant() && !csOld.isIsStatsCompliant();
        Partition part = mapToPart.get(entry.getKey());
        if (part == null) {
          LOG.warn("Partition {} does not exist, skip updating the column statistics for this partition",
              entry.getKey());
          continue;
        }
        if (isInvalidTxnStats) {
          // No columns can be merged; a shortcut for getMergableCols.
          csNew.setStatsObj(Lists.newArrayList());
        } else {
          // we first use getParameters() to prune the stats
          MetaStoreServerUtils.getMergableCols(csNew, part.getParameters());
          // we merge those that can be merged
          if (csOld != null && csOld.getStatsObjSize() != 0 && !csNew.getStatsObj().isEmpty()) {
            MetaStoreServerUtils.mergeColStats(csNew, csOld);
          }
        }

        if (!csNew.getStatsObj().isEmpty()) {
          // We don't short-circuit on errors here anymore. That can leave acid stats invalid.
          if (useDirectSql) {
            // Defer to the batched direct-SQL write after this transaction commits.
            statsMap.put(csNew.getStatsDesc().getPartName(), csNew);
          } else {
            result = updatePartitionColStatsInternal(mTable, csNew,
                request.getValidWriteIdList(), request.getWriteId()) && result;
          }
        } else if (isInvalidTxnStats) {
          // For now because the stats state is such as it is, we will invalidate everything.
          // Overall the semantics here are not clear - we could invalidate only some columns, but does
          // that make any physical sense? Could query affect some columns but not others?
          part.setWriteId(request.getWriteId());
          StatsSetupConst.clearColumnStatsState(part.getParameters());
          StatsSetupConst.setBasicStatsState(part.getParameters(), StatsSetupConst.FALSE);
          ms.alterPartition(catName, dbName, tableName, part.getValues(), part,
              request.getValidWriteIdList());
          result = false;
        } else {
          // TODO: why doesn't the original call for non acid tables invalidate the stats?
          LOG.debug("All the column stats " + csNew.getStatsDesc().getPartName()
              + " are not accurate to merge.");
        }
      }
      isCommitted = ms.commitTransaction();
      // updatePartitionColStatsInBatch starts/commit transaction internally. As there is no write or select for update
      // operations is done in this transaction, it is safe to commit it before calling updatePartitionColStatsInBatch.
      if (!statsMap.isEmpty()) {
        updatePartitionColStatsInBatch(statsMap, request.getValidWriteIdList(), request.getWriteId());
      }
    } finally {
      if (!isCommitted) {
        ms.rollbackTransaction();
      }
    }
    return result;
  }

  /**
   * Writes partition column statistics via the RawStore batch API, flushing in chunks
   * bounded by {@code JDBC_MAX_BATCH_SIZE}. Batch transactions are managed inside the
   * RawStore call, not here.
   *
   * @param statsMap incoming statistics keyed by partition name
   * @return false if {@code statsMap} is empty, true once all batches have been submitted
   */
  private boolean updatePartitionColStatsInBatch(Map<String, ColumnStatistics> statsMap,
      String validWriteIds, long writeId)
      throws MetaException, InvalidObjectException, NoSuchObjectException, InvalidInputException {

    if (statsMap.isEmpty()) {
      return false;
    }

    long start = System.currentTimeMillis();
    Map<String, ColumnStatistics> newStatsMap = new HashMap<>();
    long numStats = 0;
    long numStatsMax = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.JDBC_MAX_BATCH_SIZE);
    try {
      for (Map.Entry<String, ColumnStatistics> entry : statsMap.entrySet()) {
        ColumnStatistics colStats = entry.getValue();
        normalizeColStatsInput(colStats);
        // beforeExecute() resolved the target table from the first entry; all entries must agree.
        assert catName.equalsIgnoreCase(colStats.getStatsDesc().getCatName());
        assert dbName.equalsIgnoreCase(colStats.getStatsDesc().getDbName());
        assert tableName.equalsIgnoreCase(colStats.getStatsDesc().getTableName());
        newStatsMap.put(entry.getKey(), colStats);
        numStats += colStats.getStatsObjSize();

        if (newStatsMap.size() >= numStatsMax) {
          updatePartitionColStatsForOneBatch(t, newStatsMap, validWriteIds, writeId);
          newStatsMap.clear();
          numStats = 0;
        }
      }
      // Flush the final, partially filled batch.
      if (numStats != 0) {
        updatePartitionColStatsForOneBatch(t, newStatsMap, validWriteIds, writeId);
      }
    } finally {
      long end = System.currentTimeMillis();
      float sec = (end - start) / 1000F;
      LOG.info("updatePartitionColStatsInBatch took " + sec + " seconds for " + statsMap.size() + " stats");
    }
    return true;
  }

  /**
   * Writes column statistics for a single partition in a transaction and fires
   * UPDATE_PARTITION_COLUMN_STAT events to transactional listeners (inside the
   * transaction) and plain listeners.
   *
   * @return true if the RawStore reported an update (non-null parameters), false otherwise
   */
  private boolean updatePartitionColStatsInternal(MTable mTable, ColumnStatistics colStats,
      String validWriteIds, long writeId)
      throws MetaException, InvalidObjectException, NoSuchObjectException, InvalidInputException {
    normalizeColStatsInput(colStats);
    ColumnStatisticsDesc csd = colStats.getStatsDesc();

    Map<String, String> parameters;
    List<String> partVals;
    boolean committed = false;
    ms.openTransaction();

    try {
      partVals = getPartValsFromName(t, csd.getPartName());
      parameters = ms.updatePartitionColumnStatistics(t, mTable, colStats, partVals, validWriteIds, writeId);
      if (parameters != null) {
        if (!handler.getTransactionalListeners().isEmpty()) {
          MetaStoreListenerNotifier.notifyEvent(handler.getTransactionalListeners(),
              EventMessage.EventType.UPDATE_PARTITION_COLUMN_STAT,
              new UpdatePartitionColumnStatEvent(colStats, partVals, parameters, t,
                  writeId, handler));
        }
        if (!handler.getListeners().isEmpty()) {
          MetaStoreListenerNotifier.notifyEvent(handler.getListeners(),
              EventMessage.EventType.UPDATE_PARTITION_COLUMN_STAT,
              new UpdatePartitionColumnStatEvent(colStats, partVals, parameters, t,
                  writeId, handler));
        }
      }
      committed = ms.commitTransaction();
    } finally {
      if (!committed) {
        ms.rollbackTransaction();
      }
    }
    return parameters != null;
  }

  /**
   * Lower-cases catalog/db/table/column identifiers in place and stamps lastAnalyzed
   * with the current time (seconds). Mutates {@code colStats}.
   */
  private void normalizeColStatsInput(ColumnStatistics colStats) {
    // TODO: is this really needed? this code is propagated from HIVE-1362 but most of it is useless.
    ColumnStatisticsDesc statsDesc = colStats.getStatsDesc();
    statsDesc.setCatName(statsDesc.isSetCatName() ? statsDesc.getCatName().toLowerCase() : getDefaultCatalog(conf));
    statsDesc.setDbName(statsDesc.getDbName().toLowerCase());
    statsDesc.setTableName(statsDesc.getTableName().toLowerCase());
    statsDesc.setPartName(statsDesc.getPartName());
    long time = System.currentTimeMillis() / 1000;
    statsDesc.setLastAnalyzed(time);

    for (ColumnStatisticsObj statsObj : colStats.getStatsObj()) {
      statsObj.setColName(statsObj.getColName().toLowerCase());
      statsObj.setColType(statsObj.getColType().toLowerCase());
    }
    colStats.setStatsDesc(statsDesc);
    colStats.setStatsObj(colStats.getStatsObj());
  }

  /**
   * Persists one batch of partition statistics via the RawStore batch API (which notifies
   * transactional listeners itself), then notifies the plain listeners for each updated
   * partition.
   */
  private void updatePartitionColStatsForOneBatch(Table tbl, Map<String, ColumnStatistics> statsMap,
      String validWriteIds, long writeId)
      throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException {
    Map<String, Map<String, String>> result = ms.updatePartitionColumnStatisticsInBatch(statsMap, tbl,
        handler.getTransactionalListeners(), validWriteIds, writeId);
    if (result != null && !result.isEmpty() && !handler.getListeners().isEmpty()) {
      // The normal listeners, unlike transaction listeners are not using the same transactions used by the update
      // operations. So there is no need of keeping them within the same transactions. If notification to one of
      // the listeners failed, then even if we abort the transaction, we can not revert the notifications sent to the
      // other listeners.
      for (Map.Entry<String, Map<String,String>> entry : result.entrySet()) {
        Map<String, String> parameters = entry.getValue();
        ColumnStatistics colStats = statsMap.get(entry.getKey());
        List<String> partVals = getPartValsFromName(tbl, colStats.getStatsDesc().getPartName());
        MetaStoreListenerNotifier.notifyEvent(handler.getListeners(),
            EventMessage.EventType.UPDATE_PARTITION_COLUMN_STAT,
            new UpdatePartitionColumnStatEvent(colStats, partVals, parameters,
                tbl, writeId, handler));
      }
    }
  }


  /** Prefix used for this handler's log messages. */
  @Override
  protected String getMessagePrefix() {
    return "SetAggrStatsHandler [" + id + "] - aggregating stats for " +
        TableName.getQualified(catName, dbName, tableName) + ":";
  }

  /** Result carrying only an overall success flag. */
  public record SetAggrStatsResult(boolean success) implements Result {

  }
}
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/handler/TruncateTableHandler.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/handler/TruncateTableHandler.java
index 6818a9cf942..a42807aadcd 100644
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/handler/TruncateTableHandler.java
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/handler/TruncateTableHandler.java
@@ -291,11 +291,7 @@ protected String getMessagePrefix() {
TableName.getQualified(catName, dbName, table.getTableName()) + ":";
}
- @Override
- protected String getRequestProgress() {
- return "Truncating table";
- }
-
public record TruncateTableResult(boolean success) implements Result {
+
}
}
diff --git
a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreEventListener.java
b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreEventListener.java
index ef5c0c08327..e54dff94f8a 100644
---
a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreEventListener.java
+++
b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreEventListener.java
@@ -219,7 +219,10 @@ public void testListener() throws Exception {
Database db = msc.getDatabase(dbName);
assertEquals(listSize, notifyList.size());
assertEquals(listSize + 1, preNotifyList.size());
- validateCreateDb(db, preDbEvent.getDatabase());
+ // The location uri in preDbEvent.getDatabase() is null as
"skipAuthorization" is true.
+ Database expectedDb = new Database(db);
+ expectedDb.setLocationUri(null);
+ validateCreateDb(expectedDb, preDbEvent.getDatabase());
CreateDatabaseEvent dbEvent =
(CreateDatabaseEvent)(notifyList.get(listSize - 1));
Assert.assertTrue(dbEvent.getStatus());