This is an automated email from the ASF dual-hosted git repository. dengzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new a1ff44ccd43 HIVE-25032: Optimise PartitionManagementTask (#4028) (Zhihua Deng, reviewed by Sai Hemanth Gantasala, Henri Biestro) a1ff44ccd43 is described below commit a1ff44ccd434373c7eef56fc081b40c343a23f33 Author: dengzh <dengzhhu...@gmail.com> AuthorDate: Thu Mar 23 23:05:01 2023 +0800 HIVE-25032: Optimise PartitionManagementTask (#4028) (Zhihua Deng, reviewed by Sai Hemanth Gantasala, Henri Biestro) --- .../ql/exec/TestMsckDropPartitionsInBatches.java | 7 +- .../thrift/gen-cpp/hive_metastore_constants.cpp | 4 + .../gen/thrift/gen-cpp/hive_metastore_constants.h | 2 + .../metastore/api/hive_metastoreConstants.java | 4 + .../src/gen/thrift/gen-php/metastore/Constant.php | 12 +++ .../gen/thrift/gen-py/hive_metastore/constants.py | 2 + .../gen/thrift/gen-rb/hive_metastore_constants.rb | 4 + .../src/main/thrift/hive_metastore.thrift | 2 + .../apache/hadoop/hive/metastore/HMSHandler.java | 2 +- .../hive/metastore/HiveMetaStoreChecker.java | 21 +++- .../org/apache/hadoop/hive/metastore/Msck.java | 22 ++-- .../metastore/MsckPartitionExpressionProxy.java | 37 ++++--- .../hadoop/hive/metastore/PartitionIterable.java | 27 ++++- .../hive/metastore/PartitionManagementTask.java | 117 ++++++++++----------- .../GetPartitionProjectionsSpecBuilder.java | 16 ++- .../hive/metastore/parser/ExpressionTree.java | 12 ++- .../hive/metastore/utils/MetaStoreServerUtils.java | 39 +++++++ .../hive/metastore/TestPartitionManagement.java | 16 +++ .../hadoop/hive/metastore/tools/BenchmarkTool.java | 30 +++++- .../hadoop/hive/metastore/tools/HMSBenchmarks.java | 92 ++++++++++++++++ .../hadoop/hive/metastore/tools/HMSClient.java | 6 +- 21 files changed, 360 insertions(+), 114 deletions(-) diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestMsckDropPartitionsInBatches.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestMsckDropPartitionsInBatches.java index e7318bf6d34..a61e066266a 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestMsckDropPartitionsInBatches.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestMsckDropPartitionsInBatches.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hive.ql.exec; +import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.CheckResult.PartitionResult; @@ -274,10 +275,12 @@ public class TestMsckDropPartitionsInBatches { assertEquals(expectedCallCount, droppedParts.size()); for (int i = 0; i < expectedCallCount; i++) { + List<Pair<Integer, byte[]>> actualArgs = droppedParts.get(i); + int actualPartitionSize = actualArgs.get(0).getLeft(); Assert.assertEquals( String.format("Unexpected batch size in attempt %d. Expected: %d. Found: %d", i + 1, - expectedBatchSizes[i], droppedParts.get(i).size()), - expectedBatchSizes[i], droppedParts.get(i).size()); + expectedBatchSizes[i], actualPartitionSize), + expectedBatchSizes[i], actualPartitionSize); } } diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_constants.cpp b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_constants.cpp index a94b58fa2f5..9f0b0c8cf8d 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_constants.cpp +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_constants.cpp @@ -27,6 +27,10 @@ hive_metastoreConstants::hive_metastoreConstants() { HIVE_FILTER_FIELD_LAST_ACCESS = "hive_filter_field_last_access__"; + HIVE_FILTER_FIELD_TABLE_NAME = "hive_filter_field_tableName__"; + + HIVE_FILTER_FIELD_TABLE_TYPE = "hive_filter_field_tableType__"; + IS_ARCHIVED = "is_archived"; ORIGINAL_LOCATION = "original_location"; diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_constants.h b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_constants.h index ddff39d4735..504b54a01d9 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_constants.h +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_constants.h @@ -23,6 +23,8 @@ class hive_metastoreConstants { std::string HIVE_FILTER_FIELD_OWNER; std::string HIVE_FILTER_FIELD_PARAMS; std::string HIVE_FILTER_FIELD_LAST_ACCESS; + std::string HIVE_FILTER_FIELD_TABLE_NAME; + std::string HIVE_FILTER_FIELD_TABLE_TYPE; std::string IS_ARCHIVED; std::string ORIGINAL_LOCATION; std::string IS_IMMUTABLE; diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java index 02eed33ce8d..f5a102ab964 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java @@ -25,6 +25,10 @@ package org.apache.hadoop.hive.metastore.api; public static final java.lang.String HIVE_FILTER_FIELD_LAST_ACCESS = "hive_filter_field_last_access__"; + public static final java.lang.String HIVE_FILTER_FIELD_TABLE_NAME = "hive_filter_field_tableName__"; + + public static final java.lang.String HIVE_FILTER_FIELD_TABLE_TYPE = "hive_filter_field_tableType__"; + public static final java.lang.String IS_ARCHIVED = "is_archived"; public static final java.lang.String ORIGINAL_LOCATION = "original_location"; diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/Constant.php b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/Constant.php index 84809315937..84961065fd5 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/Constant.php +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/Constant.php @@ -26,6 +26,8 @@ final class Constant extends \Thrift\Type\TConstant static protected $HIVE_FILTER_FIELD_OWNER; static protected $HIVE_FILTER_FIELD_PARAMS; static protected $HIVE_FILTER_FIELD_LAST_ACCESS; + static protected $HIVE_FILTER_FIELD_TABLE_NAME; + static protected $HIVE_FILTER_FIELD_TABLE_TYPE; static protected $IS_ARCHIVED; static protected $ORIGINAL_LOCATION; static protected $IS_IMMUTABLE; @@ -101,6 +103,16 @@ final class Constant extends \Thrift\Type\TConstant return "hive_filter_field_last_access__"; } + protected static function init_HIVE_FILTER_FIELD_TABLE_NAME() + { + return "hive_filter_field_tableName__"; + } + + protected static function init_HIVE_FILTER_FIELD_TABLE_TYPE() + { + return "hive_filter_field_tableType__"; + } + protected static function init_IS_ARCHIVED() { return "is_archived"; diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/constants.py b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/constants.py index 7c98dd25fbe..b5891397a6e 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/constants.py +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/constants.py @@ -20,6 +20,8 @@ ACCESSTYPE_READWRITE = 8 HIVE_FILTER_FIELD_OWNER = "hive_filter_field_owner__" HIVE_FILTER_FIELD_PARAMS = "hive_filter_field_params__" HIVE_FILTER_FIELD_LAST_ACCESS = "hive_filter_field_last_access__" +HIVE_FILTER_FIELD_TABLE_NAME = "hive_filter_field_tableName__" +HIVE_FILTER_FIELD_TABLE_TYPE = "hive_filter_field_tableType__" IS_ARCHIVED = "is_archived" ORIGINAL_LOCATION = "original_location" IS_IMMUTABLE = "immutable" diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_constants.rb b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_constants.rb index 05135527cd1..e7c30a2c4dc 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_constants.rb +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_constants.rb @@ -23,6 +23,10 @@ HIVE_FILTER_FIELD_PARAMS = %q"hive_filter_field_params__" HIVE_FILTER_FIELD_LAST_ACCESS = %q"hive_filter_field_last_access__" +HIVE_FILTER_FIELD_TABLE_NAME = %q"hive_filter_field_tableName__" + +HIVE_FILTER_FIELD_TABLE_TYPE = %q"hive_filter_field_tableType__" + IS_ARCHIVED = %q"is_archived" ORIGINAL_LOCATION = %q"original_location" diff --git a/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift b/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift index bca23c8111c..20ea52b71e1 100644 --- a/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift +++ b/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift @@ -164,6 +164,8 @@ enum PrincipalType { const string HIVE_FILTER_FIELD_OWNER = "hive_filter_field_owner__" const string HIVE_FILTER_FIELD_PARAMS = "hive_filter_field_params__" const string HIVE_FILTER_FIELD_LAST_ACCESS = "hive_filter_field_last_access__" +const string HIVE_FILTER_FIELD_TABLE_NAME = "hive_filter_field_tableName__" +const string HIVE_FILTER_FIELD_TABLE_TYPE = "hive_filter_field_tableType__" enum PartitionEventType { LOAD_DONE = 1, diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java index ffc6a162796..5c091836d81 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java @@ -5708,7 +5708,7 @@ public class HMSHandler extends FacebookBase implements IHMSHandler { GetPartitionsResponse response = null; Exception ex = null; try { - Table table = get_table_core(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tableName); + Table table = get_table_core(catName, parsedDbName[DB_NAME], tableName); List<Partition> partitions = getMS() .getPartitionSpecsByFilterAndProjection(table, request.getProjectionSpec(), request.getFilterSpec()); diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreChecker.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreChecker.java index 88d761dc800..63497e7036f 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreChecker.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreChecker.java @@ -29,12 +29,14 @@ import static org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.getPar import static org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.getPartitionListByFilterExp; import static org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.getPartitionName; import static org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.getPartitionColtoTypeMap; +import static org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.getPartitionsByProjectSpec; import static org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.getPath; import static org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.isPartitioned; import java.io.IOException; import java.time.Instant; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.LinkedList; @@ -58,12 +60,16 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.GetPartitionsFilterSpec; +import org.apache.hadoop.hive.metastore.api.GetPartitionsRequest; +import org.apache.hadoop.hive.metastore.api.GetProjectionsSpec; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.MetastoreException; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.UnknownDBException; +import org.apache.hadoop.hive.metastore.client.builder.GetPartitionProjectionsSpecBuilder; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.apache.hadoop.hive.metastore.utils.FileUtils; @@ -264,12 +270,17 @@ public class HiveMetaStoreChecker { MetastoreConf.getVar(conf, MetastoreConf.ConfVars.DEFAULTPARTITIONNAME), results); parts = new PartitionIterable(results); } else { + GetPartitionsRequest request = new GetPartitionsRequest(table.getDbName(), table.getTableName(), + null, null); + request.setProjectionSpec(new GetPartitionProjectionsSpecBuilder().addProjectField("sd.location") + .addProjectField("createTime").addProjectField("tableName") + .addProjectField("values").build()); + request.setCatName(table.getCatName()); int batchSize = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.BATCH_RETRIEVE_MAX); if (batchSize > 0) { - parts = new PartitionIterable(getMsc(), table, batchSize); + parts = new PartitionIterable(getMsc(), table, batchSize).withProjectSpec(request); } else { - List<Partition> loadedPartitions = getAllPartitionsOf(getMsc(), table); - parts = new PartitionIterable(loadedPartitions); + parts = new PartitionIterable(getPartitionsByProjectSpec(msc, request)); } } } else { @@ -383,8 +394,8 @@ public class HiveMetaStoreChecker { pr.setTableName(partition.getTableName()); result.getExpiredPartitions().add(pr); if (LOG.isDebugEnabled()) { - LOG.debug("{}.{}.{}.{} expired. createdAt: {} current: {} age: {}s expiry: {}s", partition.getCatName(), - partition.getDbName(), partition.getTableName(), pr.getPartitionName(), createdTime, currentEpochSecs, + LOG.debug("{}.{}.{}.{} expired. createdAt: {} current: {} age: {}s expiry: {}s", table.getCatName(), + table.getDbName(), partition.getTableName(), pr.getPartitionName(), createdTime, currentEpochSecs, partitionAgeSeconds, partitionExpirySeconds); } } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/Msck.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/Msck.java index 926875e514c..79bc4b49d1e 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/Msck.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/Msck.java @@ -32,6 +32,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import com.google.common.collect.Lists; import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -130,9 +131,10 @@ public class Msck { String qualifiedTableName = null; boolean success = false; long txnId = -1; - long partitionExpirySeconds = msckInfo.getPartitionExpirySeconds(); + long partitionExpirySeconds = -1; try { Table table = getMsc().getTable(msckInfo.getCatalogName(), msckInfo.getDbName(), msckInfo.getTableName()); + partitionExpirySeconds = PartitionManagementTask.getRetentionPeriodInSeconds(table); qualifiedTableName = Warehouse.getCatalogQualifiedTableName(table); HiveMetaStoreChecker checker = new HiveMetaStoreChecker(getMsc(), getConf(), partitionExpirySeconds); // checkMetastore call will fill in result with partitions that are present in filesystem @@ -502,9 +504,9 @@ public class Msck { }.run(); } - public static String makePartExpr(Map<String, String> spec) + private static String makePartExpr(Map<String, String> spec) throws MetaException { - StringBuilder suffixBuf = new StringBuilder(); + StringBuilder suffixBuf = new StringBuilder("("); int i = 0; for (Map.Entry<String, String> e : spec.entrySet()) { if (e.getValue() == null || e.getValue().length() == 0) { @@ -518,6 +520,7 @@ public class Msck { suffixBuf.append("'").append(Warehouse.escapePathName(e.getValue())).append("'"); i++; } + suffixBuf.append(")"); return suffixBuf.toString(); } @@ -536,7 +539,8 @@ public class Msck { if (expiredPartitions != null && !expiredPartitions.isEmpty()) { batchWork.addAll(expiredPartitions); } - PartitionDropOptions dropOptions = new PartitionDropOptions().deleteData(deleteData).ifExists(true); + PartitionDropOptions dropOptions = new PartitionDropOptions().deleteData(deleteData) + .ifExists(true).returnResults(false); new RetryUtilities.ExponentiallyDecayingBatchWork<Void>(batchSize, decayingFactor, maxRetries) { @Override public Void execute(int size) throws MetastoreException { @@ -589,7 +593,7 @@ public class Msck { } private List<Pair<Integer, byte[]>> getPartitionExpr(final List<String> parts) throws MetaException { - List<Pair<Integer, byte[]>> expr = new ArrayList<>(parts.size()); + StringBuilder exprBuilder = new StringBuilder(); for (int i = 0; i < parts.size(); i++) { String partName = parts.get(i); Map<String, String> partSpec = Warehouse.makeSpecFromName(partName); @@ -597,9 +601,13 @@ public class Msck { if (LOG.isDebugEnabled()) { LOG.debug("Generated partExpr: {} for partName: {}", partExpr, partName); } - expr.add(Pair.of(i, partExpr.getBytes(StandardCharsets.UTF_8))); + if (i > 0) { + exprBuilder.append(" OR "); + } + exprBuilder.append(partExpr); } - return expr; + return Lists.newArrayList(Pair.of(parts.size(), + exprBuilder.toString().getBytes(StandardCharsets.UTF_8))); } }.run(); } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MsckPartitionExpressionProxy.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MsckPartitionExpressionProxy.java index fcc0d8a2648..094a80cac1c 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MsckPartitionExpressionProxy.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MsckPartitionExpressionProxy.java @@ -60,31 +60,30 @@ public class MsckPartitionExpressionProxy implements PartitionExpressionProxy { } //This is to find in partitionNames all that match expr //reverse of the Msck.makePartExpr - Set<String> partValueSet = new HashSet<>(); - String[] parts = partExpr.split(" AND "); - for ( String part : parts){ - String[] colAndValue = part.split("="); - String key = FileUtils.unescapePathName(colAndValue[0]); - //take the value inside without the single quote marks '2018-10-30' becomes 2018-10-31 - String value = FileUtils.unescapePathName(colAndValue[1].substring(1, colAndValue[1].length()-1)); - partValueSet.add(key+"="+value); + Set<String> filterParts = new HashSet<>(); + String[] partitions = partExpr.split(" OR "); + for (String part : partitions) { + part = part.substring(1, part.length() - 1); + String[] pKeyValues = part.split(" AND "); + StringBuilder builder = new StringBuilder(); + for (String pKeyValue : pKeyValues) { + String[] colAndValue = pKeyValue.split("="); + String key = FileUtils.unescapePathName(colAndValue[0]); + //take the value inside without the single quote marks '2018-10-30' becomes 2018-10-31 + String value = FileUtils.unescapePathName(colAndValue[1].substring(1, colAndValue[1].length() - 1)); + builder.append(key + "=" + value).append("/"); + } + builder.setLength(builder.length() - 1); + filterParts.add(builder.toString()); } List<String> partNamesSeq = new ArrayList<>(); - for (String partition : partitionNames){ - boolean isMatch = true; + for (String part : partitionNames) { //list of partitions [year=2001/month=1, year=2002/month=2, year=2001/month=3] //Given expr: e.g. year='2001' AND month='1'. Only when all the expressions in the expr can be found, //do we add the partition to the filtered result [year=2001/month=1] - String [] partnames = partition.split("/"); - for (String part: partnames) { - if (!partValueSet.contains(FileUtils.unescapePathName(part))){ - isMatch = false; - break; - } - } - if (isMatch){ - partNamesSeq.add(partition); + if (filterParts.contains(FileUtils.unescapePathName(part))) { + partNamesSeq.add(part); } } partitionNames.clear(); diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionIterable.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionIterable.java index 127313eb256..30a7ffcc3f6 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionIterable.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionIterable.java @@ -24,9 +24,13 @@ import java.util.Iterator; import java.util.List; import org.apache.hadoop.hive.metastore.api.GetPartitionsByNamesRequest; +import org.apache.hadoop.hive.metastore.api.GetPartitionsFilterSpec; +import org.apache.hadoop.hive.metastore.api.GetPartitionsRequest; import org.apache.hadoop.hive.metastore.api.MetastoreException; import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.PartitionFilterMode; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils; import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.convertToGetPartitionsByNamesRequest; import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.prependCatalogToDbName; @@ -103,11 +107,18 @@ public class PartitionIterable implements Iterable<Partition> { batch_counter++; } try { - String dbName = prependCatalogToDbName(table.getCatName(), table.getDbName(), null); - GetPartitionsByNamesRequest req = - convertToGetPartitionsByNamesRequest(dbName, table.getTableName(), nameBatch); - batchIter = - msc.getPartitionsByNames(req).getPartitionsIterator(); + if (request != null) { + GetPartitionsFilterSpec getPartitionsFilterSpec = new GetPartitionsFilterSpec(); + getPartitionsFilterSpec.setFilterMode(PartitionFilterMode.BY_NAMES); + getPartitionsFilterSpec.setFilters(nameBatch); + request.setFilterSpec(getPartitionsFilterSpec); + batchIter = MetaStoreServerUtils.getPartitionsByProjectSpec(msc, request).iterator(); + } else { + String dbName = prependCatalogToDbName(table.getCatName(), table.getDbName(), null); + GetPartitionsByNamesRequest req = convertToGetPartitionsByNamesRequest(dbName, table.getTableName(), + nameBatch); + batchIter = msc.getPartitionsByNames(req).getPartitionsIterator(); + } } catch (Exception e) { throw new RuntimeException(e); } @@ -135,6 +146,7 @@ public class PartitionIterable implements Iterable<Partition> { private IMetaStoreClient msc = null; // Assumes one instance of this + single-threaded compilation for each query. private Table table = null; private List<String> partitionNames = null; + private GetPartitionsRequest request = null; private int batch_size; /** @@ -167,4 +179,9 @@ public class PartitionIterable implements Iterable<Partition> { throw new MetastoreException(e); } } + + public PartitionIterable withProjectSpec(GetPartitionsRequest request) { + this.request = request; + return this; + } } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java index a9ef163a9b0..ef0c4dcac47 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java @@ -23,7 +23,6 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.HashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -32,9 +31,10 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.common.TableName; +import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.hadoop.hive.metastore.api.TableMeta; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.conf.TimeValidator; import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; @@ -42,7 +42,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Lists; import com.google.common.util.concurrent.ThreadFactoryBuilder; /** @@ -97,77 +96,84 @@ public class PartitionManagementTask implements MetastoreTaskThread { IMetaStoreClient msc = null; try { msc = new HiveMetaStoreClient(conf); - List<Table> candidateTables = new ArrayList<>(); String catalogName = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.PARTITION_MANAGEMENT_CATALOG_NAME); String dbPattern = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.PARTITION_MANAGEMENT_DATABASE_PATTERN); String tablePattern = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.PARTITION_MANAGEMENT_TABLE_PATTERN); String tableTypes = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.PARTITION_MANAGEMENT_TABLE_TYPES); Set<String> tableTypesSet = new HashSet<>(); - List<String> tableTypesList; + for (String type : tableTypes.split(",")) { + try { + tableTypesSet.add(TableType.valueOf(type.trim().toUpperCase()).name()); + } catch (IllegalArgumentException e) { + // ignore + LOG.warn("Unknown table type: {}", type); + } + } // if tableTypes is empty, then a list with single empty string has to specified to scan no tables. // specifying empty here is equivalent to disabling the partition discovery altogether as it scans no tables. - if (tableTypes.isEmpty()) { - tableTypesList = Lists.newArrayList(""); - } else { - for (String type : tableTypes.split(",")) { - try { - tableTypesSet.add(TableType.valueOf(type.trim().toUpperCase()).name()); - } catch (IllegalArgumentException e) { - // ignore - LOG.warn("Unknown table type: {}", type); - } - } - tableTypesList = Lists.newArrayList(tableTypesSet); + if (tableTypesSet.isEmpty()) { + LOG.info("Skipping partition management as no table types specified"); + return; } - List<TableMeta> foundTableMetas = msc.getTableMeta(catalogName, dbPattern, tablePattern, tableTypesList); - LOG.info("Looking for tables using catalog: {} dbPattern: {} tablePattern: {} found: {}", catalogName, - dbPattern, tablePattern, foundTableMetas.size()); - Map<String, Boolean> databasesToSkip = new HashMap<>(); + StringBuilder filterBuilder = new StringBuilder() + .append(hive_metastoreConstants.HIVE_FILTER_FIELD_PARAMS) + .append("discover__partitions").append(" like \"true\" "); + boolean external = tableTypesSet.contains(TableType.EXTERNAL_TABLE.name()); + boolean managed = tableTypesSet.contains(TableType.MANAGED_TABLE.name()); + if (!managed && external) { + // only for external tables + filterBuilder.append(" and ").append(hive_metastoreConstants.HIVE_FILTER_FIELD_TABLE_TYPE) + .append(" = \"").append(TableType.EXTERNAL_TABLE.name()).append("\" "); + } else if (managed && !external) { + // only for managed tables + filterBuilder.append(" and ").append(hive_metastoreConstants.HIVE_FILTER_FIELD_TABLE_TYPE) + .append(" = \"").append(TableType.MANAGED_TABLE.name()).append("\" "); + } + if (!tablePattern.trim().isEmpty()) { + filterBuilder.append(" and ") + .append(hive_metastoreConstants.HIVE_FILTER_FIELD_TABLE_NAME) + .append(" like \"").append(tablePattern.replaceAll("\\*", ".*")).append("\""); + } - for (TableMeta tableMeta : foundTableMetas) { - try { - String dbName = MetaStoreUtils.prependCatalogToDbName(tableMeta.getCatName(), tableMeta.getDbName(), conf); - if (!databasesToSkip.containsKey(dbName)) { - databasesToSkip.put(dbName, MetaStoreUtils.checkIfDbNeedsToBeSkipped( - msc.getDatabase(tableMeta.getCatName(), tableMeta.getDbName()))); - } - if (databasesToSkip.get(dbName)) { - LOG.debug("Skipping table : {}", tableMeta.getTableName()); - continue; - } - Table table = msc.getTable(tableMeta.getCatName(), tableMeta.getDbName(), tableMeta.getTableName()); - if (partitionDiscoveryEnabled(table.getParameters())) { - candidateTables.add(table); - } - } catch (NoSuchObjectException e) { - // Ignore dropped tables after fetching TableMeta. - LOG.warn(e.getMessage()); + List<String> databases = msc.getDatabases(catalogName, dbPattern); + List<TableName> candidates = new ArrayList<>(); + for (String db : databases) { + Database database = msc.getDatabase(catalogName, db); + if (MetaStoreUtils.checkIfDbNeedsToBeSkipped(database)) { + LOG.debug("Skipping table under database: {}", db); + continue; + } + if (MetaStoreUtils.isDbBeingPlannedFailedOver(database)) { + LOG.info("Skipping table belongs to database {} being failed over.", db); + continue; } + List<String> tablesNames = msc.listTableNamesByFilter(catalogName, db, + filterBuilder.toString(), -1); + tablesNames.forEach(tablesName -> candidates.add(TableName.fromString(tablesName, catalogName, db))); } - if (candidateTables.isEmpty()) { + + if (candidates.isEmpty()) { + LOG.info("Got empty table list in catalog: {}, dbPattern: {}", catalogName, dbPattern); return; } + // TODO: Msck creates MetastoreClient (MSC) on its own. MSC creation is expensive. Sharing MSC also // will not be safe unless synchronized MSC is used. Using synchronized MSC in multi-threaded context also // defeats the purpose of thread pooled msck repair. int threadPoolSize = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.PARTITION_MANAGEMENT_TASK_THREAD_POOL_SIZE); final ExecutorService executorService = Executors - .newFixedThreadPool(Math.min(candidateTables.size(), threadPoolSize), + .newFixedThreadPool(Math.min(candidates.size(), threadPoolSize), new ThreadFactoryBuilder().setDaemon(true).setNameFormat("PartitionDiscoveryTask-%d").build()); - CountDownLatch countDownLatch = new CountDownLatch(candidateTables.size()); - LOG.info("Found {} candidate tables for partition discovery", candidateTables.size()); + CountDownLatch countDownLatch = new CountDownLatch(candidates.size()); + LOG.info("Found {} candidate tables for partition discovery", candidates.size()); setupMsckPathInvalidation(); Configuration msckConf = Msck.getMsckConf(conf); - for (Table table : candidateTables) { - qualifiedTableName = Warehouse.getCatalogQualifiedTableName(table); - long retentionSeconds = getRetentionPeriodInSeconds(table); - LOG.info("Running partition discovery for table {} retentionPeriod: {}s", qualifiedTableName, - retentionSeconds); + for (TableName table : candidates) { // this always runs in 'sync' mode where partitions can be added and dropped - MsckInfo msckInfo = new MsckInfo(table.getCatName(), table.getDbName(), table.getTableName(), - null, null, true, true, true, retentionSeconds); + MsckInfo msckInfo = new MsckInfo(table.getCat(), table.getDb(), table.getTable(), + null, null, true, true, true, -1); executorService.submit(new MsckThread(msckInfo, msckConf, qualifiedTableName, countDownLatch)); } countDownLatch.await(); @@ -231,13 +237,7 @@ public class PartitionManagementTask implements MetastoreTaskThread { @Override public void run() { - IMetaStoreClient msc = null; try { - msc = new HiveMetaStoreClient(conf); - if (MetaStoreUtils.isDbBeingPlannedFailedOver((msc.getDatabase(msckInfo.getCatalogName(), msckInfo.getDbName())))) { - LOG.info("Skipping table: {} as it belongs to database being failed over." + msckInfo.getTableName()); - return; - } Msck msck = new Msck( true, true); msck.init(conf); msck.repair(msckInfo); @@ -246,9 +246,6 @@ public class PartitionManagementTask implements MetastoreTaskThread { } finally { // there is no recovery from exception, so we always count down and retry in next attempt countDownLatch.countDown(); - if (msc != null) { - msc.close(); - } } } } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/client/builder/GetPartitionProjectionsSpecBuilder.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/client/builder/GetPartitionProjectionsSpecBuilder.java index 1511f63cf13..6bf898b09af 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/client/builder/GetPartitionProjectionsSpecBuilder.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/client/builder/GetPartitionProjectionsSpecBuilder.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hive.metastore.client.builder; import org.apache.hadoop.hive.metastore.api.GetProjectionsSpec; +import java.util.ArrayList; import java.util.List; /** @@ -27,19 +28,16 @@ import java.util.List; */ public class GetPartitionProjectionsSpecBuilder { - private List<String> partitionList = null; + private List<String> fieldList = new ArrayList<>(); private String includePartitionPattern = null; private String excludePartitionPattern = null; - public GetPartitionProjectionsSpecBuilder(List<String> partitionList, String includePartitionPattern, - String excludePartitionPattern) { - this.partitionList = partitionList; - this.includePartitionPattern = includePartitionPattern; - this.excludePartitionPattern = excludePartitionPattern; + public GetPartitionProjectionsSpecBuilder() { + } - public GetPartitionProjectionsSpecBuilder setPartitionList(List<String> partitionList) { - this.partitionList = partitionList; + public GetPartitionProjectionsSpecBuilder addProjectField(String field) { + fieldList.add(field); return this; } @@ -54,6 +52,6 @@ public class GetPartitionProjectionsSpecBuilder { } public GetProjectionsSpec build() { - return new GetProjectionsSpec(partitionList, includePartitionPattern, excludePartitionPattern); + return new GetProjectionsSpec(fieldList, includePartitionPattern, excludePartitionPattern); } } \ No newline at end of file diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/parser/ExpressionTree.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/parser/ExpressionTree.java index b3cd8c33212..46098d39fa6 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/parser/ExpressionTree.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/parser/ExpressionTree.java @@ -284,7 +284,11 @@ public class ExpressionTree { private void generateJDOFilterOverTables(Map<String, Object> params, FilterBuilder filterBuilder) throws MetaException { - if (keyName.equals(hive_metastoreConstants.HIVE_FILTER_FIELD_OWNER)) { + if (keyName.equals(hive_metastoreConstants.HIVE_FILTER_FIELD_TABLE_NAME)) { + keyName = "this.tableName"; + } else if (keyName.equals(hive_metastoreConstants.HIVE_FILTER_FIELD_TABLE_TYPE)) { + keyName = "this.tableType"; + } else if (keyName.equals(hive_metastoreConstants.HIVE_FILTER_FIELD_OWNER)) { keyName = "this.owner"; } else if (keyName.equals(hive_metastoreConstants.HIVE_FILTER_FIELD_LAST_ACCESS)) { //lastAccessTime expects an integer, so we cannot use the "like operator" @@ -304,6 +308,12 @@ public class ExpressionTree { //value is persisted as a string in the db, so make sure it's a string here // in case we get a long. value = value.toString(); + // dot in parameter is not supported when parsing the tree. + if ("discover__partitions".equals(paramKeyName)) { + paramKeyName = "discover.partitions"; + keyName = "this.parameters.get(\"" + paramKeyName + "\").toUpperCase()"; + value = value.toString().toUpperCase(); + } } else { filterBuilder.setError("Invalid key name in filter. " + "Use constants from org.apache.hadoop.hive.metastore.api"); diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreServerUtils.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreServerUtils.java index 5c417893590..d4bb8d47024 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreServerUtils.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreServerUtils.java @@ -75,6 +75,8 @@ import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.Decimal; import org.apache.hadoop.hive.metastore.api.EnvironmentContext; import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.GetPartitionsRequest; +import org.apache.hadoop.hive.metastore.api.GetPartitionsResponse; import org.apache.hadoop.hive.metastore.api.InvalidObjectException; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.MetastoreException; @@ -1397,6 +1399,43 @@ public class MetaStoreServerUtils { } } + public static List<Partition> getPartitionsByProjectSpec(IMetaStoreClient msc, GetPartitionsRequest request) + throws MetastoreException { + try { + GetPartitionsResponse response = msc.getPartitionsWithSpecs(request); + List<PartitionSpec> partitionSpecList = response.getPartitionSpec(); + List<Partition> result = new ArrayList<>(); + for (PartitionSpec spec : partitionSpecList) { + if (spec.getPartitionList() != null && spec.getPartitionList().getPartitions() != null) { + spec.getPartitionList().getPartitions().forEach(partition -> { + partition.setCatName(spec.getCatName()); + partition.setDbName(spec.getDbName()); + partition.setTableName(spec.getTableName()); + result.add(partition); + }); + } + PartitionSpecWithSharedSD pSpecWithSharedSD = spec.getSharedSDPartitionSpec(); + if (pSpecWithSharedSD == null) { + continue; + } + List<PartitionWithoutSD> withoutSDList = pSpecWithSharedSD.getPartitions(); + StorageDescriptor descriptor = pSpecWithSharedSD.getSd(); + if (withoutSDList != null) { + for (PartitionWithoutSD psd : withoutSDList) { + StorageDescriptor newSD = new StorageDescriptor(descriptor); + Partition partition = new Partition(psd.getValues(), spec.getDbName(), spec.getTableName(), + psd.getCreateTime(), psd.getLastAccessTime(), newSD, psd.getParameters()); + partition.getSd().setLocation(newSD.getLocation() + psd.getRelativePath()); + result.add(partition); + } + } + } + return result; + } catch (Exception e) { + throw new MetastoreException(e); + } + } + public static void getPartitionListByFilterExp(IMetaStoreClient msc, Table table, byte[] filterExp, String defaultPartName, List<Partition> results) throws MetastoreException { diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestPartitionManagement.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestPartitionManagement.java index f1f063490ce..2b1f65a6724 100644 --- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestPartitionManagement.java +++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestPartitionManagement.java @@ -54,6 +54,7 @@ import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil; import org.apache.thrift.TException; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -302,6 +303,21 @@ public class TestPartitionManagement { runPartitionManagementTask(conf); partitions = client.listPartitions(dbName, tableName, (short) -1); assertEquals(3, partitions.size()); + + // only MANAGED table type + conf.set(MetastoreConf.ConfVars.PARTITION_MANAGEMENT_TABLE_TYPES.getVarname(), TableType.MANAGED_TABLE.name()); + table.getParameters().remove("EXTERNAL"); + table.setTableType(TableType.MANAGED_TABLE.name()); + client.alter_table(dbName, tableName, table); + Assert.assertTrue(fs.mkdirs(newPart1)); + Assert.assertTrue(fs.mkdirs(newPart2)); + runPartitionManagementTask(conf); + partitions = client.listPartitions(dbName, tableName, (short) -1); + assertEquals(5, partitions.size()); + Assert.assertTrue(fs.delete(newPart1, true)); + runPartitionManagementTask(conf); + partitions = client.listPartitions(dbName, tableName, (short) -1); + assertEquals(4, partitions.size()); } @Test diff --git a/standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/BenchmarkTool.java b/standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/BenchmarkTool.java index e973db7d88e..08661b18950 100644 --- a/standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/BenchmarkTool.java +++ b/standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/BenchmarkTool.java @@ -35,10 +35,12 @@ import java.io.PrintStream; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.ArrayList; import java.util.Arrays; import java.util.Formatter; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; @@ -61,6 +63,7 @@ import static org.apache.hadoop.hive.metastore.tools.HMSBenchmarks.benchmarkList import static org.apache.hadoop.hive.metastore.tools.HMSBenchmarks.benchmarkListPartition; import static org.apache.hadoop.hive.metastore.tools.HMSBenchmarks.benchmarkListTables; import static org.apache.hadoop.hive.metastore.tools.HMSBenchmarks.benchmarkOpenTxns; +import static org.apache.hadoop.hive.metastore.tools.HMSBenchmarks.benchmarkPartitionManagement; import static org.apache.hadoop.hive.metastore.tools.HMSBenchmarks.benchmarkRenameTable; import static org.apache.hadoop.hive.metastore.tools.HMSBenchmarks.benchmarkTableCreate; import static org.apache.hadoop.hive.metastore.tools.Util.getServerUri; @@ -83,7 +86,8 @@ public class BenchmarkTool implements Runnable { private enum RunModes { ACID, NONACID, - ALL + ALL, + MSCK // test PartitionManagementTask } @@ -142,7 +146,7 @@ public class BenchmarkTool implements Runnable { private Pattern[] exclude; @Option(names = {"--runMode"}, - description = "flag for setting the mode for the benchmark, acceptable values are: ACID, NONACID, ALL") + description = "flag for setting the mode for the benchmark, acceptable values are: ACID, NONACID, ALL, MSCK") private RunModes runMode = RunModes.ALL; public static void main(String[] args) { @@ -181,10 +185,12 @@ public class BenchmarkTool implements Runnable { + nThreads); HMSConfig.getInstance().init(host, port, confDir); + preRunMsck(runMode == RunModes.MSCK); switch (runMode) { case ACID: runAcidBenchmarks(); break; + case MSCK: case NONACID: runNonAcidBenchmarks(); break; @@ -196,6 +202,18 @@ public class BenchmarkTool implements Runnable { } } + private void preRunMsck(boolean isMsck) { + if (isMsck) { + matches = new Pattern[]{Pattern.compile("PartitionManagementTask.*")}; + } else { + List<Pattern> excludes = new ArrayList<>(); + Optional.ofNullable(exclude) + .ifPresent(patterns -> Arrays.stream(patterns).forEach(p -> excludes.add(p))); + excludes.add(Pattern.compile("PartitionManagementTask.*")); + exclude = excludes.toArray(new Pattern[0]); + } + } + private void runAcidBenchmarks() { ChainedOptionsBuilder optsBuilder = new OptionsBuilder() @@ -267,7 +285,9 @@ public class BenchmarkTool implements Runnable { .add("dropDatabase", () -> benchmarkDropDatabase(bench, bData, 1)) .add("openTxn", - () -> benchmarkOpenTxns(bench, bData, 1)); + () -> benchmarkOpenTxns(bench, bData, 1)) + .add("PartitionManagementTask", + () -> benchmarkPartitionManagement(bench, bData, 1)); for (int howMany: instances) { suite.add("listTables" + '.' + howMany, @@ -291,7 +311,9 @@ public class BenchmarkTool implements Runnable { .add("dropDatabase" + '.' + howMany, () -> benchmarkDropDatabase(bench, bData, howMany)) .add("openTxns" + '.' + howMany, - () -> benchmarkOpenTxns(bench, bData, howMany)); + () -> benchmarkOpenTxns(bench, bData, howMany)) + .add("PartitionManagementTask" + "." + howMany, + () -> benchmarkPartitionManagement(bench, bData, howMany)); } List<String> toRun = suite.listMatching(matches, exclude); diff --git a/standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/HMSBenchmarks.java b/standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/HMSBenchmarks.java index 7c6f3431b0a..e91ab78fbb3 100644 --- a/standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/HMSBenchmarks.java +++ b/standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/HMSBenchmarks.java @@ -19,6 +19,10 @@ package org.apache.hadoop.hive.metastore.tools; import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.metastore.PartitionManagementTask; +import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.thrift.TException; @@ -27,13 +31,20 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicLong; import static org.apache.hadoop.hive.metastore.tools.Util.addManyPartitions; import static org.apache.hadoop.hive.metastore.tools.Util.addManyPartitionsNoException; +import static org.apache.hadoop.hive.metastore.tools.Util.createSchema; import static org.apache.hadoop.hive.metastore.tools.Util.throwingSupplierWrapper; /** @@ -447,4 +458,85 @@ final class HMSBenchmarks { throwingSupplierWrapper(client::getCurrentNotificationId)); } + static DescriptiveStatistics benchmarkPartitionManagement(@NotNull MicroBenchmark bench, + @NotNull BenchData data, + int tableCount) { + + String dbName = data.dbName + "_" + tableCount, tableNamePrefix = data.tableName; + final HMSClient client = data.getClient(); + final PartitionManagementTask partitionManagementTask = new PartitionManagementTask(); + final List<Path> paths = new ArrayList<>(); + final FileSystem fs; + try { + fs = FileSystem.get(client.getHadoopConf()); + client.getHadoopConf().set("hive.metastore.uris", client.getServerURI().toString()); + client.getHadoopConf().set("metastore.partition.management.database.pattern", dbName); + partitionManagementTask.setConf(client.getHadoopConf()); + + client.createDatabase(dbName); + for (int i = 0; i < tableCount; i++) { + String tableName = tableNamePrefix + "_" + i; + Util.TableBuilder tableBuilder = new Util.TableBuilder(dbName, tableName).withType(TableType.MANAGED_TABLE) + .withColumns(createSchema(Arrays.asList(new String[] {"astring:string", "aint:int", "adouble:double", "abigint:bigint"}))) + .withPartitionKeys(createSchema(Collections.singletonList("d"))); + boolean enableDynamicPart = i % 5 == 0; + if (enableDynamicPart) { + tableBuilder.withParameter("discover.partitions", "true"); + } + client.createTable(tableBuilder.build()); + addManyPartitionsNoException(client, dbName, tableName, null, Collections.singletonList("d"), 500); + if (enableDynamicPart) { + Table t = client.getTable(dbName, tableName); + Path tabLoc = new Path(t.getSd().getLocation()); + for (int j = 501; j <= 1000; j++) { + Path path = new Path(tabLoc, "d=d" + j + "_1"); + paths.add(path); + } + } + } + } catch (Exception e) { + throw new RuntimeException(e); + } + + final AtomicLong id = new AtomicLong(0); + ExecutorService service = Executors.newFixedThreadPool(20); + Runnable preRun = () -> { + int len = paths.size() / 20; + id.getAndIncrement(); + List<Future> futures = new ArrayList<>(); + for (int i = 0; i <= 20; i++) { + int k = i; + futures.add(service.submit((Callable<Void>) () -> { + for (int j = k * len; j < (k + 1) * len && j < paths.size(); j++) { + Path path = paths.get(j); + if (id.get() == 1) { + fs.mkdirs(path); + } else { + String fileName = path.getName().split("_")[0]; + long seq = id.get(); + Path destPath = new Path(path.getParent(), fileName + "_" + seq); + Path sourcePath = new Path(path.getParent(), fileName + "_" + (seq-1)); + fs.rename(sourcePath, destPath); + } + } + return null; + })); + } + for (Future future : futures) { + try { + future.get(); + } catch (Exception e) { + service.shutdown(); + throw new RuntimeException(e); + } + } + }; + + try { + return bench.measure(preRun, partitionManagementTask, null); + } finally { + service.shutdown(); + } + } + } diff --git a/standalone-metastore/metastore-tools/tools-common/src/main/java/org/apache/hadoop/hive/metastore/tools/HMSClient.java b/standalone-metastore/metastore-tools/tools-common/src/main/java/org/apache/hadoop/hive/metastore/tools/HMSClient.java index cd4032be425..7da4ddb68b6 100644 --- a/standalone-metastore/metastore-tools/tools-common/src/main/java/org/apache/hadoop/hive/metastore/tools/HMSClient.java +++ b/standalone-metastore/metastore-tools/tools-common/src/main/java/org/apache/hadoop/hive/metastore/tools/HMSClient.java @@ -92,6 +92,7 @@ final class HMSClient implements AutoCloseable { private ThriftHiveMetastore.Iface client; private TTransport transport; private URI serverURI; + private Configuration hadoopConf; public URI getServerURI() { return serverURI; @@ -155,7 +156,7 @@ final class HMSClient implements AutoCloseable { LOG.debug("Opening kerberos connection to HMS"); addResource(conf, CORE_SITE); - Configuration hadoopConf = new Configuration(); + this.hadoopConf = new Configuration(); addResource(hadoopConf, HIVE_SITE); addResource(hadoopConf, CORE_SITE); @@ -516,4 +517,7 @@ final class HMSClient implements AutoCloseable { } } + public Configuration getHadoopConf() { + return hadoopConf; + } }