Repository: hive Updated Branches: refs/heads/master 8d5e8a60a -> e84e89d78
HIVE-6980: Drop table by using direct sql (Peter Vary, reviewed by Alexander Kolbasov, Vihang Karajgaonkar) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/e84e89d7 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/e84e89d7 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/e84e89d7 Branch: refs/heads/master Commit: e84e89d78051d7c4ac474a407c85f8200d8537bb Parents: 8d5e8a6 Author: Peter Vary <pv...@cloudera.com> Authored: Fri May 25 10:15:39 2018 +0200 Committer: Peter Vary <pv...@cloudera.com> Committed: Fri May 25 10:15:39 2018 +0200 ---------------------------------------------------------------------- .../hive/metastore/MetaStoreDirectSql.java | 382 +++++++++++++++++-- .../hadoop/hive/metastore/ObjectStore.java | 18 +- 2 files changed, 357 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/e84e89d7/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java index 48f77b9..5bb1985 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java @@ -38,6 +38,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.TreeMap; +import java.util.stream.Collectors; import javax.jdo.PersistenceManager; import javax.jdo.Query; @@ -141,7 +142,7 @@ class MetaStoreDirectSql { private String DBS, TBLS, PARTITIONS, DATABASE_PARAMS, PARTITION_PARAMS, SORT_COLS, SD_PARAMS, SDS, SERDES, SKEWED_STRING_LIST_VALUES, SKEWED_VALUES, BUCKETING_COLS, SKEWED_COL_NAMES, SKEWED_COL_VALUE_LOC_MAP, COLUMNS_V2, PARTITION_KEYS, SERDE_PARAMS, PART_COL_STATS, KEY_CONSTRAINTS, - TAB_COL_STATS, PARTITION_KEY_VALS; + TAB_COL_STATS, PARTITION_KEY_VALS, PART_PRIVS, PART_COL_PRIVS, SKEWED_STRING_LIST, CDS; public MetaStoreDirectSql(PersistenceManager pm, Configuration conf, String schema) { this.pm = pm; @@ -443,7 +444,6 @@ class MetaStoreDirectSql { /** * Gets partitions by using direct SQL queries. - * Note that batching is not needed for this method - list of names implies the batch size; * @param catName Metastore catalog name. * @param dbName Metastore db name. * @param tblName Metastore table name. @@ -460,8 +460,12 @@ class MetaStoreDirectSql { @Override public List<Partition> run(List<String> input) throws MetaException { String filter = "" + PARTITIONS + ".\"PART_NAME\" in (" + makeParams(input.size()) + ")"; - return getPartitionsViaSqlFilterInternal(catName, dbName, tblName, null, filter, input, - Collections.<String>emptyList(), null); + List<Object> partitionIds = getPartitionIdsViaSqlFilter(catName, dbName, tblName, + filter, input, Collections.<String>emptyList(), null); + if (partitionIds.isEmpty()) { + return Collections.emptyList(); // no partitions, bail early. + } + return getPartitionsFromPartitionIds(catName, dbName, tblName, null, partitionIds); } }); } @@ -477,8 +481,19 @@ class MetaStoreDirectSql { Boolean isViewTable = isViewTable(filter.table); String catName = filter.table.isSetCatName() ? filter.table.getCatName() : DEFAULT_CATALOG_NAME; - return getPartitionsViaSqlFilterInternal(catName, filter.table.getDbName(), - filter.table.getTableName(), isViewTable, filter.filter, filter.params, filter.joins, max); + List<Object> partitionIds = getPartitionIdsViaSqlFilter(catName, + filter.table.getDbName(), filter.table.getTableName(), filter.filter, filter.params, + filter.joins, max); + if (partitionIds.isEmpty()) { + return Collections.emptyList(); // no partitions, bail early. + } + return Batchable.runBatched(batchSize, partitionIds, new Batchable<Object, Partition>() { + @Override + public List<Partition> run(List<Object> input) throws MetaException { + return getPartitionsFromPartitionIds(catName, filter.table.getDbName(), + filter.table.getTableName(), isViewTable, input); + } + }); } public static class SqlFilterForPushdown { @@ -508,8 +523,20 @@ class MetaStoreDirectSql { */ public List<Partition> getPartitions(String catName, String dbName, String tblName, Integer max) throws MetaException { - return getPartitionsViaSqlFilterInternal(catName, dbName, tblName, null, - null, Collections.<String>emptyList(), Collections.<String>emptyList(), max); + List<Object> partitionIds = getPartitionIdsViaSqlFilter(catName, dbName, + tblName, null, Collections.<String>emptyList(), Collections.<String>emptyList(), max); + if (partitionIds.isEmpty()) { + return Collections.emptyList(); // no partitions, bail early. + } + + // Get full objects. For Oracle/etc. do it in batches. + List<Partition> result = Batchable.runBatched(batchSize, partitionIds, new Batchable<Object, Partition>() { + @Override + public List<Partition> run(List<Object> input) throws MetaException { + return getPartitionsFromPartitionIds(catName, dbName, tblName, null, input); + } + }); + return result; } private static Boolean isViewTable(Table t) { @@ -536,12 +563,11 @@ class MetaStoreDirectSql { } /** - * Get partition objects for the query using direct SQL queries, to avoid bazillion + * Get partition ids for the query using direct SQL queries, to avoid bazillion * queries created by DN retrieving stuff for each object individually. - * @param dbName Metastore db name. - * @param tblName Metastore table name. - * @param isView Whether table is a view. Can be passed as null if not immediately - * known, then this method will get it only if necessary. + * @param catName MetaStore catalog name + * @param dbName MetaStore db name + * @param tblName MetaStore table name * @param sqlFilter SQL filter to use. Better be SQL92-compliant. * @param paramsForFilter params for ?-s in SQL filter text. Params must be in order. * @param joinsForFilter if the filter needs additional join statement, they must be in @@ -549,24 +575,18 @@ class MetaStoreDirectSql { * @param max The maximum number of partitions to return. * @return List of partition objects. */ - private List<Partition> getPartitionsViaSqlFilterInternal( - String catName, String dbName, String tblName, final Boolean isView, String sqlFilter, - List<? extends Object> paramsForFilter, List<String> joinsForFilter,Integer max) + private List<Object> getPartitionIdsViaSqlFilter( + String catName, String dbName, String tblName, String sqlFilter, + List<? extends Object> paramsForFilter, List<String> joinsForFilter, Integer max) throws MetaException { boolean doTrace = LOG.isDebugEnabled(); - final String dbNameLcase = dbName.toLowerCase(), tblNameLcase = tblName.toLowerCase(); - final String catNameLcase = normalizeSpace(catName); + final String dbNameLcase = dbName.toLowerCase(); + final String tblNameLcase = tblName.toLowerCase(); + final String catNameLcase = normalizeSpace(catName).toLowerCase(); + // We have to be mindful of order during filtering if we are not returning all partitions. String orderForFilter = (max != null) ? " order by \"PART_NAME\" asc" : ""; - // Get all simple fields for partitions and related objects, which we can map one-on-one. - // We will do this in 2 queries to use different existing indices for each one. - // We do not get table and DB name, assuming they are the same as we are using to filter. - // TODO: We might want to tune the indexes instead. With current ones MySQL performs - // poorly, esp. with 'order by' w/o index on large tables, even if the number of actual - // results is small (query that returns 8 out of 32k partitions can go 4sec. to 0sec. by - // just adding a \"PART_ID\" IN (...) filter that doesn't alter the results to it, probably - // causing it to not sort the entire table due to not knowing how selective the filter is. String queryText = "select " + PARTITIONS + ".\"PART_ID\" from " + PARTITIONS + "" + " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " + TBLS + ".\"TBL_ID\" " @@ -596,15 +616,10 @@ class MetaStoreDirectSql { return Collections.emptyList(); // no partitions, bail early. } - // Get full objects. For Oracle/etc. do it in batches. - List<Partition> result = Batchable.runBatched(batchSize, sqlResult, new Batchable<Object, Partition>() { - @Override - public List<Partition> run(List<Object> input) throws MetaException { - return getPartitionsFromPartitionIds(catNameLcase, dbNameLcase, tblNameLcase, isView, - input); - } - }); - + List<Object> result = new ArrayList<Object>(sqlResult.size()); + for (Object fields : sqlResult) { + result.add(extractSqlLong(fields)); + } query.closeAll(); return result; } @@ -613,14 +628,11 @@ class MetaStoreDirectSql { private List<Partition> getPartitionsFromPartitionIds(String catName, String dbName, String tblName, Boolean isView, List<Object> partIdList) throws MetaException { boolean doTrace = LOG.isDebugEnabled(); + int idStringWidth = (int)Math.ceil(Math.log10(partIdList.size())) + 1; // 1 for comma int sbCapacity = partIdList.size() * idStringWidth; - // Prepare StringBuilder for "PART_ID in (...)" to use in future queries. - StringBuilder partSb = new StringBuilder(sbCapacity); - for (Object partitionId : partIdList) { - partSb.append(extractSqlLong(partitionId)).append(","); - } - String partIds = trimCommaList(partSb); + + String partIds = getIdListForIn(partIdList); // Get most of the fields for the IDs provided. // Assume db and table names are the same for all partition, as provided in arguments. @@ -653,7 +665,7 @@ class MetaStoreDirectSql { StringBuilder colsSb = new StringBuilder(7); // We expect that there's only one field schema. tblName = tblName.toLowerCase(); dbName = dbName.toLowerCase(); - catName = catName.toLowerCase(); + catName = normalizeSpace(catName).toLowerCase(); for (Object[] fields : sqlResult) { // Here comes the ugly part... long partitionId = extractSqlLong(fields[0]); @@ -1061,6 +1073,18 @@ class MetaStoreDirectSql { } } + /** + * Helper method for preparing for "SOMETHING_ID in (...)" to use in future queries. + * @param objectIds the objectId collection + * @return The concatenated list + * @throws MetaException If the list contains wrong data + */ + private static String getIdListForIn(List<Object> objectIds) throws MetaException { + return objectIds.stream() + .map(i -> i.toString()) + .collect(Collectors.joining(",")); + } + private static String trimCommaList(StringBuilder sb) { if (sb.length() > 0) { sb.setLength(sb.length() - 1); @@ -2416,4 +2440,278 @@ class MetaStoreDirectSql { return ret; } + /** + * Drop partitions by using direct SQL queries. + * @param catName Metastore catalog name. + * @param dbName Metastore db name. + * @param tblName Metastore table name. + * @param partNames Partition names to get. + * @return List of partitions. + */ + public void dropPartitionsViaSqlFilter(final String catName, final String dbName, + final String tblName, List<String> partNames) + throws MetaException { + if (partNames.isEmpty()) { + return; + } + + Batchable.runBatched(batchSize, partNames, new Batchable<String, Void>() { + @Override + public List<Void> run(List<String> input) throws MetaException { + String filter = "" + PARTITIONS + ".\"PART_NAME\" in (" + makeParams(input.size()) + ")"; + // Get partition ids + List<Object> partitionIds = getPartitionIdsViaSqlFilter(catName, dbName, tblName, + filter, input, Collections.<String>emptyList(), null); + if (partitionIds.isEmpty()) { + return Collections.emptyList(); // no partitions, bail early. + } + dropPartitionsByPartitionIds(partitionIds); + return Collections.emptyList(); + } + }); + } + + + /** + * Drops Partition-s. Should be called with the list short enough to not trip up Oracle/etc. + * @param partitionIdList The partition identifiers to drop + * @throws MetaException If there is an SQL exception during the execution it converted to + * MetaException + */ + private void dropPartitionsByPartitionIds(List<Object> partitionIdList) throws MetaException { + String queryText; + + String partitionIds = getIdListForIn(partitionIdList); + + // Get the corresponding SD_ID-s, CD_ID-s, SERDE_ID-s + queryText = + "SELECT " + SDS + ".\"SD_ID\", " + SDS + ".\"CD_ID\", " + SDS + ".\"SERDE_ID\" " + + "from " + SDS + " " + + "INNER JOIN " + PARTITIONS + " ON " + PARTITIONS + ".\"SD_ID\" = " + SDS + ".\"SD_ID\" " + + "WHERE " + PARTITIONS + ".\"PART_ID\" in (" + partitionIds + ")"; + + Query query = pm.newQuery("javax.jdo.query.SQL", queryText); + List<Object[]> sqlResult = ensureList(executeWithArray(query, null, queryText)); + + List<Object> sdIdList = new ArrayList<>(partitionIdList.size()); + List<Object> columnDescriptorIdList = new ArrayList<>(1); + List<Object> serdeIdList = new ArrayList<>(partitionIdList.size()); + + if (!sqlResult.isEmpty()) { + for (Object[] fields : sqlResult) { + sdIdList.add(extractSqlLong(fields[0])); + Long colId = extractSqlLong(fields[1]); + if (!columnDescriptorIdList.contains(colId)) { + columnDescriptorIdList.add(colId); + } + serdeIdList.add(extractSqlLong(fields[2])); + } + } + query.closeAll(); + + try { + // Drop privileges + queryText = "delete from " + PART_PRIVS + " where \"PART_ID\" in (" + partitionIds + ")"; + executeNoResult(queryText); + Deadline.checkTimeout(); + + // Drop column level privileges + queryText = "delete from " + PART_COL_PRIVS + " where \"PART_ID\" in (" + partitionIds + ")"; + executeNoResult(queryText); + Deadline.checkTimeout(); + + // Drop partition statistics + queryText = "delete from " + PART_COL_STATS + " where \"PART_ID\" in (" + partitionIds + ")"; + executeNoResult(queryText); + Deadline.checkTimeout(); + + // Drop the partition params + queryText = "delete from " + PARTITION_PARAMS + " where \"PART_ID\" in (" + + partitionIds + ")"; + executeNoResult(queryText); + Deadline.checkTimeout(); + + // Drop the partition key vals + queryText = "delete from " + PARTITION_KEY_VALS + " where \"PART_ID\" in (" + + partitionIds + ")"; + executeNoResult(queryText); + Deadline.checkTimeout(); + + // Drop the partitions + queryText = "delete from " + PARTITIONS + " where \"PART_ID\" in (" + partitionIds + ")"; + executeNoResult(queryText); + Deadline.checkTimeout(); + } catch (SQLException sqlException) { + LOG.warn("SQL error executing query while dropping partition", sqlException); + throw new MetaException("Encountered error while dropping partitions."); + } + dropStorageDescriptors(sdIdList); + Deadline.checkTimeout(); + + dropSerdes(serdeIdList); + Deadline.checkTimeout(); + + dropDanglingColumnDescriptors(columnDescriptorIdList); + } + + /** + * Drops SD-s. Should be called with the list short enough to not trip up Oracle/etc. + * @param storageDescriptorIdList The storage descriptor identifiers to drop + * @throws MetaException If there is an SQL exception during the execution it converted to + * MetaException + */ + private void dropStorageDescriptors(List<Object> storageDescriptorIdList) throws MetaException { + String queryText; + String sdIds = getIdListForIn(storageDescriptorIdList); + + // Get the corresponding SKEWED_STRING_LIST_ID data + queryText = + "select " + SKEWED_VALUES + ".\"STRING_LIST_ID_EID\" " + + "from " + SKEWED_VALUES + " " + + "WHERE " + SKEWED_VALUES + ".\"SD_ID_OID\" in (" + sdIds + ")"; + + Query query = pm.newQuery("javax.jdo.query.SQL", queryText); + List<Object[]> sqlResult = ensureList(executeWithArray(query, null, queryText)); + + List<Object> skewedStringListIdList = new ArrayList<>(0); + + if (!sqlResult.isEmpty()) { + for (Object[] fields : sqlResult) { + skewedStringListIdList.add(extractSqlLong(fields[0])); + } + } + query.closeAll(); + + String skewedStringListIds = getIdListForIn(skewedStringListIdList); + + try { + // Drop the SD params + queryText = "delete from " + SD_PARAMS + " where \"SD_ID\" in (" + sdIds + ")"; + executeNoResult(queryText); + Deadline.checkTimeout(); + + // Drop the sort cols + queryText = "delete from " + SORT_COLS + " where \"SD_ID\" in (" + sdIds + ")"; + executeNoResult(queryText); + Deadline.checkTimeout(); + + // Drop the bucketing cols + queryText = "delete from " + BUCKETING_COLS + " where \"SD_ID\" in (" + sdIds + ")"; + executeNoResult(queryText); + Deadline.checkTimeout(); + + // Drop the skewed string lists + if (skewedStringListIdList.size() > 0) { + // Drop the skewed string value loc map + queryText = "delete from " + SKEWED_COL_VALUE_LOC_MAP + " where \"SD_ID\" in (" + + sdIds + ")"; + executeNoResult(queryText); + Deadline.checkTimeout(); + + // Drop the skewed values + queryText = "delete from " + SKEWED_VALUES + " where \"SD_ID_OID\" in (" + sdIds + ")"; + executeNoResult(queryText); + Deadline.checkTimeout(); + + // Drop the skewed string list values + queryText = "delete from " + SKEWED_STRING_LIST_VALUES + " where \"STRING_LIST_ID\" in (" + + skewedStringListIds + ")"; + executeNoResult(queryText); + Deadline.checkTimeout(); + + // Drop the skewed string list + queryText = "delete from " + SKEWED_STRING_LIST + " where \"STRING_LIST_ID\" in (" + + skewedStringListIds + ")"; + executeNoResult(queryText); + Deadline.checkTimeout(); + } + + // Drop the skewed cols + queryText = "delete from " + SKEWED_COL_NAMES + " where \"SD_ID\" in (" + sdIds + ")"; + executeNoResult(queryText); + Deadline.checkTimeout(); + + // Drop the sds + queryText = "delete from " + SDS + " where \"SD_ID\" in (" + sdIds + ")"; + executeNoResult(queryText); + } catch (SQLException sqlException) { + LOG.warn("SQL error executing query while dropping storage descriptor.", sqlException); + throw new MetaException("Encountered error while dropping storage descriptor."); + } + } + + /** + * Drops Serde-s. Should be called with the list short enough to not trip up Oracle/etc. + * @param serdeIdList The serde identifiers to drop + * @throws MetaException If there is an SQL exception during the execution it converted to + * MetaException + */ + private void dropSerdes(List<Object> serdeIdList) throws MetaException { + String queryText; + String serdeIds = getIdListForIn(serdeIdList); + + try { + // Drop the serde params + queryText = "delete from " + SERDE_PARAMS + " where \"SERDE_ID\" in (" + serdeIds + ")"; + executeNoResult(queryText); + Deadline.checkTimeout(); + + // Drop the serdes + queryText = "delete from " + SERDES + " where \"SERDE_ID\" in (" + serdeIds + ")"; + executeNoResult(queryText); + } catch (SQLException sqlException) { + LOG.warn("SQL error executing query while dropping serde.", sqlException); + throw new MetaException("Encountered error while dropping serde."); + } + } + + /** + * Checks if the column descriptors still has references for other SD-s. If not, then removes + * them. Should be called with the list short enough to not trip up Oracle/etc. + * @param columnDescriptorIdList The column identifiers + * @throws MetaException If there is an SQL exception during the execution it converted to + * MetaException + */ + private void dropDanglingColumnDescriptors(List<Object> columnDescriptorIdList) + throws MetaException { + String queryText; + String colIds = getIdListForIn(columnDescriptorIdList); + + // Drop column descriptor, if no relation left + queryText = + "SELECT " + SDS + ".\"CD_ID\", count(1) " + + "from " + SDS + " " + + "WHERE " + SDS + ".\"CD_ID\" in (" + colIds + ") " + + "GROUP BY " + SDS + ".\"CD_ID\""; + Query query = pm.newQuery("javax.jdo.query.SQL", queryText); + List<Object[]> sqlResult = ensureList(executeWithArray(query, null, queryText)); + + List<Object> danglingColumnDescriptorIdList = new ArrayList<>(columnDescriptorIdList.size()); + if (!sqlResult.isEmpty()) { + for (Object[] fields : sqlResult) { + if (extractSqlInt(fields[1]) == 0) { + danglingColumnDescriptorIdList.add(extractSqlLong(fields[0])); + } + } + } + query.closeAll(); + + if (!danglingColumnDescriptorIdList.isEmpty()) { + try { + String danglingCDIds = getIdListForIn(danglingColumnDescriptorIdList); + + // Drop the columns_v2 + queryText = "delete from " + COLUMNS_V2 + " where \"CD_ID\" in (" + danglingCDIds + ")"; + executeNoResult(queryText); + Deadline.checkTimeout(); + + // Drop the cols + queryText = "delete from " + CDS + " where \"CD_ID\" in (" + danglingCDIds + ")"; + executeNoResult(queryText); + } catch (SQLException sqlException) { + LOG.warn("SQL error executing query while dropping dangling col descriptions", sqlException); + throw new MetaException("Encountered error while dropping col descriptions"); + } + } + } } http://git-wip-us.apache.org/repos/asf/hive/blob/e84e89d7/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java index 264fdb9..13ccdb1 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java @@ -2550,6 +2550,22 @@ public class ObjectStore implements RawStore, Configurable { if (CollectionUtils.isEmpty(partNames)) { return; } + new GetListHelper<Void>(catName, dbName, tblName, true, true) { + @Override + protected List<Void> getSqlResult(GetHelper<List<Void>> ctx) throws MetaException { + directSql.dropPartitionsViaSqlFilter(catName, dbName, tblName, partNames); + return Collections.emptyList(); + } + @Override + protected List<Void> getJdoResult(GetHelper<List<Void>> ctx) throws MetaException { + dropPartitionsViaJdo(catName, dbName, tblName, partNames); + return Collections.emptyList(); + } + }.run(false); + } + + private void dropPartitionsViaJdo(String catName, String dbName, String tblName, + List<String> partNames) throws MetaException { boolean success = false; openTransaction(); try { @@ -2558,7 +2574,7 @@ public class ObjectStore implements RawStore, Configurable { dropPartitionAllColumnGrantsNoTxn(catName, dbName, tblName, partNames); dropPartitionColumnStatisticsNoTxn(catName, dbName, tblName, partNames); - // CDs are reused; go thry partition SDs, detach all CDs from SDs, then remove unused CDs. + // CDs are reused; go try partition SDs, detach all CDs from SDs, then remove unused CDs. for (MColumnDescriptor mcd : detachCdsFromSdsNoTxn(catName, dbName, tblName, partNames)) { removeUnusedColumnDescriptor(mcd); }