This is an automated email from the ASF dual-hosted git repository. ngangam pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new 7bca1b312d1 HIVE-26035: Implement direct SQL for add partitions to improve performance at HMS (#3905) 7bca1b312d1 is described below commit 7bca1b312d135edac8dd5e8f8ca6a1adfdeb5829 Author: Venu Reddy <35334869+venureddy2...@users.noreply.github.com> AuthorDate: Fri Feb 3 05:32:30 2023 +0530 HIVE-26035: Implement direct SQL for add partitions to improve performance at HMS (#3905) * HIVE-26035: Implement direct SQL for add partitions to improve performance at HMS (Venu Reddy reviewed by Zhihua Deng and Saihemanth Gantasala) --- .../hadoop/hive/metastore/conf/MetastoreConf.java | 5 + .../hadoop/hive/metastore/DatabaseProduct.java | 42 ++ .../hadoop/hive/metastore/DirectSqlInsertPart.java | 827 +++++++++++++++++++++ .../hadoop/hive/metastore/MetaStoreDirectSql.java | 19 + .../apache/hadoop/hive/metastore/ObjectStore.java | 60 +- 5 files changed, 938 insertions(+), 15 deletions(-) diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java index 6f9932dd3fd..bb65e8d1dad 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java @@ -706,6 +706,10 @@ public class MetastoreConf { "Default transaction isolation level for identity generation."), DATANUCLEUS_USE_LEGACY_VALUE_STRATEGY("datanucleus.rdbms.useLegacyNativeValueStrategy", "datanucleus.rdbms.useLegacyNativeValueStrategy", true, ""), + DATANUCLEUS_QUERY_SQL_ALLOWALL("datanucleus.query.sql.allowAll", "datanucleus.query.sql.allowAll", + true, "In strict JDO all SQL queries must begin with \"SELECT ...\", and consequently it " + + "is not possible to execute queries that change data. This DataNucleus property when set to true allows " + + "insert, update and delete operations from JDO SQL. Default value is true."), // Parameters for configuring SSL encryption to the database store // If DBACCESS_USE_SSL is false, then all other DBACCESS_SSL_* properties will be ignored @@ -1924,6 +1928,7 @@ public class MetastoreConf { ConfVars.DATANUCLEUS_PLUGIN_REGISTRY_BUNDLE_CHECK, ConfVars.DATANUCLEUS_TRANSACTION_ISOLATION, ConfVars.DATANUCLEUS_USE_LEGACY_VALUE_STRATEGY, + ConfVars.DATANUCLEUS_QUERY_SQL_ALLOWALL, ConfVars.DETACH_ALL_ON_COMMIT, ConfVars.IDENTIFIER_FACTORY, ConfVars.MANAGER_FACTORY_CLASS, diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DatabaseProduct.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DatabaseProduct.java index 301949c40f8..3f3d361b9a0 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DatabaseProduct.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DatabaseProduct.java @@ -691,6 +691,48 @@ public class DatabaseProduct implements Configurable { return map; } + /** + * Gets the multiple row insert query for the given table with specified columns and row format + * @param tableName table name to be used in query + * @param columns comma separated column names string + * @param rowFormat values format string used in the insert query. Format is like (?,?...?) and the number of + * question marks in the format is equal to number of column names in the columns argument + * @param batchCount number of rows in the query + * @return database specific multiple row insert query + */ + public String getBatchInsertQuery(String tableName, String columns, String rowFormat, int batchCount) { + StringBuilder sb = new StringBuilder(); + String fixedPart = tableName + " " + columns + " values "; + String row; + if (isORACLE()) { + sb.append("insert all "); + row = "into " + fixedPart + rowFormat + " "; + } else { + sb.append("insert into " + fixedPart); + row = rowFormat + ','; + } + for (int i = 0; i < batchCount; i++) { + sb.append(row); + } + if (isORACLE()) { + sb.append("select * from dual "); + } + sb.setLength(sb.length() - 1); + return sb.toString(); + } + + /** + * Gets the boolean value specific to database for the given input + * @param val boolean value + * @return database specific value + */ + public Object getBoolean(boolean val) { + if (isDERBY()) { + return val ? "Y" : "N"; + } + return val; + } + // This class implements the Configurable interface for the benefit // of "plugin" instances created via reflection (see invocation of // ReflectionUtils.newInstance in method determineDatabaseProduct) diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DirectSqlInsertPart.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DirectSqlInsertPart.java new file mode 100644 index 00000000000..182768402f6 --- /dev/null +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DirectSqlInsertPart.java @@ -0,0 +1,827 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.metastore; + +import static org.apache.commons.lang3.StringUtils.repeat; +import static org.apache.hadoop.hive.metastore.Batchable.NO_BATCHING; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import javax.jdo.PersistenceManager; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.model.MColumnDescriptor; +import org.apache.hadoop.hive.metastore.model.MFieldSchema; +import org.apache.hadoop.hive.metastore.model.MOrder; +import org.apache.hadoop.hive.metastore.model.MPartition; +import org.apache.hadoop.hive.metastore.model.MPartitionColumnPrivilege; +import org.apache.hadoop.hive.metastore.model.MPartitionPrivilege; +import org.apache.hadoop.hive.metastore.model.MSerDeInfo; +import org.apache.hadoop.hive.metastore.model.MStorageDescriptor; +import org.apache.hadoop.hive.metastore.model.MStringList; +import org.datanucleus.ExecutionContext; +import org.datanucleus.api.jdo.JDOPersistenceManager; +import org.datanucleus.identity.DatastoreId; +import org.datanucleus.metadata.AbstractClassMetaData; +import org.datanucleus.metadata.IdentityType; + +/** + * This class contains the methods to insert into tables on the underlying database using direct SQL + */ +class DirectSqlInsertPart { + private final PersistenceManager pm; + private final DatabaseProduct dbType; + private final int batchSize; + + public DirectSqlInsertPart(PersistenceManager pm, DatabaseProduct dbType, int batchSize) { + this.pm = pm; + this.dbType = dbType; + this.batchSize = batchSize; + } + + /** + * Interface to execute multiple row insert query in batch for direct SQL + */ + interface BatchExecutionContext { + void execute(String batchQueryText, int batchRowCount) throws MetaException; + } + + private Long getDataStoreId(Class<?> modelClass) throws MetaException { + ExecutionContext ec = ((JDOPersistenceManager) pm).getExecutionContext(); + AbstractClassMetaData cmd = ec.getMetaDataManager().getMetaDataForClass(modelClass, ec.getClassLoaderResolver()); + if (cmd.getIdentityType() == IdentityType.DATASTORE) { + return (Long) ec.getStoreManager().getValueGenerationStrategyValue(ec, cmd, -1); + } else { + throw new MetaException("Identity type is not datastore."); + } + } + + private void insertInBatch(String tableName, String columns, int columnCount, int rowCount, + BatchExecutionContext batchExecutionContext) throws MetaException { + if (rowCount == 0 || columnCount == 0) { + return; + } + int maxRowsInBatch = (batchSize == NO_BATCHING) ? rowCount : batchSize; + int maxBatches = rowCount / maxRowsInBatch; + int last = rowCount % maxRowsInBatch; + String rowFormat = "(" + repeat(",?", columnCount).substring(1) + ")"; + String query = ""; + if (maxBatches > 0) { + query = dbType.getBatchInsertQuery(tableName, columns, rowFormat, maxRowsInBatch); + } + for (int batch = 0; batch < maxBatches; batch++) { + batchExecutionContext.execute(query, maxRowsInBatch); + } + if (last != 0) { + query = dbType.getBatchInsertQuery(tableName, columns, rowFormat, last); + batchExecutionContext.execute(query, last); + } + } + + private void executeQuery(String queryText, Object[] params) throws MetaException { + try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) { + MetastoreDirectSqlUtils.executeWithArray(query.getInnerQuery(), params, queryText); + } + } + + private void insertSerdeInBatch(Map<Long, MSerDeInfo> serdeIdToSerDeInfo) throws MetaException { + int rowCount = serdeIdToSerDeInfo.size(); + String columns = "(\"SERDE_ID\",\"DESCRIPTION\",\"DESERIALIZER_CLASS\",\"NAME\",\"SERDE_TYPE\",\"SLIB\"," + + "\"SERIALIZER_CLASS\")"; + int columnCount = 7; + BatchExecutionContext batchExecutionContext = new BatchExecutionContext() { + final Iterator<Map.Entry<Long, MSerDeInfo>> it = serdeIdToSerDeInfo.entrySet().iterator(); + @Override + public void execute(String batchQueryText, int batchRowCount) throws MetaException { + Object[] params = new Object[batchRowCount * columnCount]; + int paramIndex = 0; + for (int index = 0; index < batchRowCount; index++) { + Map.Entry<Long, MSerDeInfo> entry = it.next(); + MSerDeInfo serdeInfo = entry.getValue(); + params[paramIndex++] = entry.getKey(); + params[paramIndex++] = serdeInfo.getDescription(); + params[paramIndex++] = serdeInfo.getDeserializerClass(); + params[paramIndex++] = serdeInfo.getName(); + params[paramIndex++] = serdeInfo.getSerdeType(); + params[paramIndex++] = serdeInfo.getSerializationLib(); + params[paramIndex++] = serdeInfo.getSerializerClass(); + } + executeQuery(batchQueryText, params); + } + }; + insertInBatch("\"SERDES\"", columns, columnCount, rowCount, batchExecutionContext); + } + + private void insertStorageDescriptorInBatch(Map<Long, MStorageDescriptor> sdIdToStorageDescriptor, + Map<Long, Long> sdIdToSerdeId, Map<Long, Long> sdIdToCdId) throws MetaException { + int rowCount = sdIdToStorageDescriptor.size(); + String columns = "(\"SD_ID\",\"CD_ID\",\"INPUT_FORMAT\",\"IS_COMPRESSED\",\"IS_STOREDASSUBDIRECTORIES\"," + + "\"LOCATION\",\"NUM_BUCKETS\",\"OUTPUT_FORMAT\",\"SERDE_ID\")"; + int columnCount = 9; + BatchExecutionContext batchExecutionContext = new BatchExecutionContext() { + final Iterator<Map.Entry<Long, MStorageDescriptor>> it = sdIdToStorageDescriptor.entrySet().iterator(); + @Override + public void execute(String batchQueryText, int batchRowCount) throws MetaException { + Object[] params = new Object[batchRowCount * columnCount]; + int paramIndex = 0; + for (int index = 0; index < batchRowCount; index++) { + Map.Entry<Long, MStorageDescriptor> entry = it.next(); + MStorageDescriptor sd = entry.getValue(); + params[paramIndex++] = entry.getKey(); + params[paramIndex++] = sdIdToCdId.get(entry.getKey()); + params[paramIndex++] = sd.getInputFormat(); + params[paramIndex++] = dbType.getBoolean(sd.isCompressed()); + params[paramIndex++] = dbType.getBoolean(sd.isStoredAsSubDirectories()); + params[paramIndex++] = sd.getLocation(); + params[paramIndex++] = sd.getNumBuckets(); + params[paramIndex++] = sd.getOutputFormat(); + params[paramIndex++] = sdIdToSerdeId.get(entry.getKey()); + } + executeQuery(batchQueryText, params); + } + }; + insertInBatch("\"SDS\"", columns, columnCount, rowCount, batchExecutionContext); + } + + private void insertPartitionInBatch(Map<Long, MPartition> partIdToPartition, Map<Long, Long> partIdToSdId) + throws MetaException { + int rowCount = partIdToPartition.size(); + String columns = "(\"PART_ID\",\"CREATE_TIME\",\"LAST_ACCESS_TIME\",\"PART_NAME\",\"SD_ID\",\"TBL_ID\"," + + "\"WRITE_ID\")"; + int columnCount = 7; + BatchExecutionContext batchExecutionContext = new BatchExecutionContext() { + final Iterator<Map.Entry<Long, MPartition>> it = partIdToPartition.entrySet().iterator(); + @Override + public void execute(String batchQueryText, int batchRowCount) throws MetaException { + Object[] params = new Object[batchRowCount * columnCount]; + int paramIndex = 0; + for (int index = 0; index < batchRowCount; index++) { + Map.Entry<Long, MPartition> entry = it.next(); + MPartition partition = entry.getValue(); + params[paramIndex++] = entry.getKey(); + params[paramIndex++] = partition.getCreateTime(); + params[paramIndex++] = partition.getLastAccessTime(); + params[paramIndex++] = partition.getPartitionName(); + params[paramIndex++] = partIdToSdId.get(entry.getKey()); + params[paramIndex++] = partition.getTable().getId(); + params[paramIndex++] = partition.getWriteId(); + } + executeQuery(batchQueryText, params); + } + }; + insertInBatch("\"PARTITIONS\"", columns, columnCount, rowCount, batchExecutionContext); + } + + private void insertSerdeParamInBatch(Map<Long, MSerDeInfo> serdeIdToSerDeInfo) throws MetaException { + int rowCount = 0; + for (MSerDeInfo serDeInfo : serdeIdToSerDeInfo.values()) { + rowCount += serDeInfo.getParameters() != null ? serDeInfo.getParameters().size() : 0; + } + if (rowCount == 0) { + return; + } + String columns = "(\"SERDE_ID\",\"PARAM_KEY\",\"PARAM_VALUE\")"; + int columnCount = 3; + BatchExecutionContext batchExecutionContext = new BatchExecutionContext() { + final Iterator<Map.Entry<Long, MSerDeInfo>> serdeIt = serdeIdToSerDeInfo.entrySet().iterator(); + Map.Entry<Long, MSerDeInfo> serdeEntry = serdeIt.next(); + Iterator<Map.Entry<String, String>> it = serdeEntry.getValue().getParameters() != null ? + serdeEntry.getValue().getParameters().entrySet().iterator() : Collections.emptyIterator(); + @Override + public void execute(String batchQueryText, int batchRowCount) throws MetaException { + Object[] params = new Object[batchRowCount * columnCount]; + int index = 0; + int paramIndex = 0; + do { + while (index < batchRowCount && it.hasNext()) { + Map.Entry<String, String> entry = it.next(); + params[paramIndex++] = serdeEntry.getKey(); + params[paramIndex++] = entry.getKey(); + params[paramIndex++] = entry.getValue(); + index++; + } + if (index < batchRowCount) { + serdeEntry = serdeIt.next(); // serdeIt.next() cannot be null since it is within the row count + it = serdeEntry.getValue().getParameters() != null ? + serdeEntry.getValue().getParameters().entrySet().iterator() : Collections.emptyIterator(); + } + } while (index < batchRowCount); + executeQuery(batchQueryText, params); + } + }; + insertInBatch("\"SERDE_PARAMS\"", columns, columnCount, rowCount, batchExecutionContext); + } + + private void insertStorageDescriptorParamInBatch(Map<Long, MStorageDescriptor> sdIdToStorageDescriptor) + throws MetaException { + int rowCount = 0; + for (MStorageDescriptor sd : sdIdToStorageDescriptor.values()) { + rowCount += sd.getParameters() != null ? sd.getParameters().size() : 0; + } + if (rowCount == 0) { + return; + } + String columns = "(\"SD_ID\",\"PARAM_KEY\",\"PARAM_VALUE\")"; + int columnCount = 3; + BatchExecutionContext batchExecutionContext = new BatchExecutionContext() { + final Iterator<Map.Entry<Long, MStorageDescriptor>> sdIt = sdIdToStorageDescriptor.entrySet().iterator(); + Map.Entry<Long, MStorageDescriptor> sdEntry = sdIt.next(); + Iterator<Map.Entry<String, String>> it = sdEntry.getValue().getParameters() != null ? + sdEntry.getValue().getParameters().entrySet().iterator() : Collections.emptyIterator(); + + @Override + public void execute(String batchQueryText, int batchRowCount) throws MetaException { + Object[] params = new Object[batchRowCount * columnCount]; + int index = 0; + int paramIndex = 0; + do { + while (index < batchRowCount && it.hasNext()) { + Map.Entry<String, String> entry = it.next(); + params[paramIndex++] = sdEntry.getKey(); + params[paramIndex++] = entry.getKey(); + params[paramIndex++] = entry.getValue(); + index++; + } + if (index < batchRowCount) { + sdEntry = sdIt.next(); // sdIt.next() cannot be null since it is within the row count + it = sdEntry.getValue().getParameters() != null ? + sdEntry.getValue().getParameters().entrySet().iterator() : Collections.emptyIterator(); + } + } while (index < batchRowCount); + executeQuery(batchQueryText, params); + } + }; + insertInBatch("\"SD_PARAMS\"", columns, columnCount, rowCount, batchExecutionContext); + } + + private void insertPartitionParamInBatch(Map<Long, MPartition> partIdToPartition) throws MetaException { + int rowCount = 0; + for (MPartition part : partIdToPartition.values()) { + rowCount += part.getParameters() != null ? part.getParameters().size() : 0; + } + if (rowCount == 0) { + return; + } + String columns = "(\"PART_ID\",\"PARAM_KEY\",\"PARAM_VALUE\")"; + int columnCount = 3; + BatchExecutionContext batchExecutionContext = new BatchExecutionContext() { + final Iterator<Map.Entry<Long, MPartition>> partIt = partIdToPartition.entrySet().iterator(); + Map.Entry<Long, MPartition> partEntry = partIt.next(); + Iterator<Map.Entry<String, String>> it = partEntry.getValue().getParameters() != null ? + partEntry.getValue().getParameters().entrySet().iterator() : Collections.emptyIterator(); + @Override + public void execute(String batchQueryText, int batchRowCount) throws MetaException { + Object[] params = new Object[batchRowCount * columnCount]; + int index = 0; + int paramIndex = 0; + do { + while (index < batchRowCount && it.hasNext()) { + Map.Entry<String, String> entry = it.next(); + params[paramIndex++] = partEntry.getKey(); + params[paramIndex++] = entry.getKey(); + params[paramIndex++] = entry.getValue(); + index++; + } + if (index < batchRowCount) { + partEntry = partIt.next(); // partIt.next() cannot be null since it is within the row count + it = partEntry.getValue().getParameters() != null ? + partEntry.getValue().getParameters().entrySet().iterator() : Collections.emptyIterator(); + } + } while (index < batchRowCount); + executeQuery(batchQueryText, params); + } + }; + insertInBatch("\"PARTITION_PARAMS\"", columns, columnCount, rowCount, batchExecutionContext); + } + + private void insertPartitionKeyValInBatch(Map<Long, MPartition> partIdToPartition) throws MetaException { + int rowCount = 0; + for (MPartition part : partIdToPartition.values()) { + rowCount += part.getValues().size(); + } + if (rowCount == 0) { + return; + } + String columns = "(\"PART_ID\",\"PART_KEY_VAL\",\"INTEGER_IDX\")"; + int columnCount = 3; + BatchExecutionContext batchExecutionContext = new BatchExecutionContext() { + int colIndex = 0; + final Iterator<Map.Entry<Long, MPartition>> partIt = partIdToPartition.entrySet().iterator(); + Map.Entry<Long, MPartition> partEntry = partIt.next(); + Iterator<String> it = partEntry.getValue().getValues().iterator(); + @Override + public void execute(String batchQueryText, int batchRowCount) throws MetaException { + Object[] params = new Object[batchRowCount * columnCount]; + int index = 0; + int paramIndex = 0; + do { + while (index < batchRowCount && it.hasNext()) { + params[paramIndex++] = partEntry.getKey(); + params[paramIndex++] = it.next(); + params[paramIndex++] = colIndex++; + index++; + } + if (index < batchRowCount) { + colIndex = 0; + partEntry = partIt.next(); // partIt.next() cannot be null since it is within the row count + it = partEntry.getValue().getValues().iterator(); + } + } while (index < batchRowCount); + executeQuery(batchQueryText, params); + } + }; + insertInBatch("\"PARTITION_KEY_VALS\"", columns, columnCount, rowCount, batchExecutionContext); + } + + private void insertColumnDescriptorInBatch(Map<Long, MColumnDescriptor> cdIdToColumnDescriptor) throws MetaException { + int rowCount = cdIdToColumnDescriptor.size(); + String columns = "(\"CD_ID\")"; + int columnCount = 1; + BatchExecutionContext batchExecutionContext = new BatchExecutionContext() { + final Iterator<Long> it = cdIdToColumnDescriptor.keySet().iterator(); + @Override + public void execute(String batchQueryText, int batchRowCount) throws MetaException { + Object[] params = new Object[batchRowCount * columnCount]; + int paramIndex = 0; + for (int index = 0; index < batchRowCount; index++) { + params[paramIndex++] = it.next(); + } + executeQuery(batchQueryText, params); + } + }; + insertInBatch("\"CDS\"", columns, columnCount, rowCount, batchExecutionContext); + } + + private void insertColumnV2InBatch(Map<Long, MColumnDescriptor> cdIdToColumnDescriptor) throws MetaException { + int rowCount = 0; + for (MColumnDescriptor cd : cdIdToColumnDescriptor.values()) { + rowCount += cd.getCols().size(); + } + if (rowCount == 0) { + return; + } + String columns = "(\"CD_ID\",\"COMMENT\",\"COLUMN_NAME\",\"TYPE_NAME\",\"INTEGER_IDX\")"; + int columnCount = 5; + BatchExecutionContext batchExecutionContext = new BatchExecutionContext() { + int colIndex = 0; + final Iterator<Map.Entry<Long, MColumnDescriptor>> cdIt = cdIdToColumnDescriptor.entrySet().iterator(); + Map.Entry<Long, MColumnDescriptor> cdEntry = cdIt.next(); + Iterator<MFieldSchema> it = cdEntry.getValue().getCols().iterator(); + @Override + public void execute(String batchQueryText, int batchRowCount) throws MetaException { + Object[] params = new Object[batchRowCount * columnCount]; + int index = 0; + int paramIndex = 0; + do { + while (index < batchRowCount && it.hasNext()) { + MFieldSchema fieldSchema = it.next(); + params[paramIndex++] = cdEntry.getKey(); + params[paramIndex++] = fieldSchema.getComment(); + params[paramIndex++] = fieldSchema.getName(); + params[paramIndex++] = fieldSchema.getType(); + params[paramIndex++] = colIndex++; + index++; + } + if (index < batchRowCount) { + colIndex = 0; + cdEntry = cdIt.next(); // cdIt.next() cannot be null since it is within the row count + it = cdEntry.getValue().getCols().iterator(); + } + } while (index < batchRowCount); + executeQuery(batchQueryText, params); + } + }; + insertInBatch("\"COLUMNS_V2\"", columns, columnCount, rowCount, batchExecutionContext); + } + + private void insertBucketColInBatch(Map<Long, MStorageDescriptor> sdIdToStorageDescriptor) throws MetaException { + int rowCount = 0; + for (MStorageDescriptor sd : sdIdToStorageDescriptor.values()) { + rowCount += sd.getBucketCols() != null ? sd.getBucketCols().size() : 0; + } + if (rowCount == 0) { + return; + } + String columns = "(\"SD_ID\",\"BUCKET_COL_NAME\",\"INTEGER_IDX\")"; + int columnCount = 3; + BatchExecutionContext batchExecutionContext = new BatchExecutionContext() { + int colIndex = 0; + final Iterator<Map.Entry<Long, MStorageDescriptor>> sdIt = sdIdToStorageDescriptor.entrySet().iterator(); + Map.Entry<Long, MStorageDescriptor> sdEntry = sdIt.next(); + Iterator<String> it = sdEntry.getValue().getBucketCols() != null ? + sdEntry.getValue().getBucketCols().iterator() : Collections.emptyIterator(); + @Override + public void execute(String batchQueryText, int batchRowCount) throws MetaException { + Object[] params = new Object[batchRowCount * columnCount]; + int index = 0; + int paramIndex = 0; + do { + while (index < batchRowCount && it.hasNext()) { + params[paramIndex++] = sdEntry.getKey(); + params[paramIndex++] = it.next(); + params[paramIndex++] = colIndex++; + index++; + } + if (index < batchRowCount) { + colIndex = 0; + sdEntry = sdIt.next(); // sdIt.next() cannot be null since it is within the row count + it = sdEntry.getValue().getBucketCols() != null ? + sdEntry.getValue().getBucketCols().iterator() : Collections.emptyIterator(); + } + } while (index < batchRowCount); + executeQuery(batchQueryText, params); + } + }; + insertInBatch("\"BUCKETING_COLS\"", columns, columnCount, rowCount, batchExecutionContext); + } + + private void insertSortColInBatch(Map<Long, MStorageDescriptor> sdIdToStorageDescriptor) throws MetaException { + int rowCount = 0; + for (MStorageDescriptor sd : sdIdToStorageDescriptor.values()) { + rowCount += sd.getSortCols() != null ? sd.getSortCols().size() : 0; + } + if (rowCount == 0) { + return; + } + String columns = "(\"SD_ID\",\"COLUMN_NAME\",\"ORDER\",\"INTEGER_IDX\")"; + int columnCount = 4; + BatchExecutionContext batchExecutionContext = new BatchExecutionContext() { + int colIndex = 0; + final Iterator<Map.Entry<Long, MStorageDescriptor>> sdIt = sdIdToStorageDescriptor.entrySet().iterator(); + Map.Entry<Long, MStorageDescriptor> sdEntry = sdIt.next(); + Iterator<MOrder> it = sdEntry.getValue().getSortCols() != null ? + sdEntry.getValue().getSortCols().iterator() : Collections.emptyIterator(); + @Override + public void execute(String batchQueryText, int batchRowCount) throws MetaException { + Object[] params = new Object[batchRowCount * columnCount]; + int index = 0; + int paramIndex = 0; + do { + while (index < batchRowCount && it.hasNext()) { + MOrder order = it.next(); + params[paramIndex++] = sdEntry.getKey(); + params[paramIndex++] = order.getCol(); + params[paramIndex++] = order.getOrder(); + params[paramIndex++] = colIndex++; + index++; + } + if (index < batchRowCount) { + colIndex = 0; + sdEntry = sdIt.next(); // sdIt.next() cannot be null since it is within the row count + it = sdEntry.getValue().getSortCols() != null ? + sdEntry.getValue().getSortCols().iterator() : Collections.emptyIterator(); + } + } while (index < batchRowCount); + executeQuery(batchQueryText, params); + } + }; + insertInBatch("\"SORT_COLS\"", columns, columnCount, rowCount, batchExecutionContext); + } + + private void insertSkewedStringListInBatch(List<Long> stringListIds) throws MetaException { + int rowCount = stringListIds.size(); + String columns = "(\"STRING_LIST_ID\")"; + int columnCount = 1; + BatchExecutionContext batchExecutionContext = new BatchExecutionContext() { + final Iterator<Long> it = stringListIds.iterator(); + @Override + public void execute(String batchQueryText, int batchRowCount) throws MetaException { + Object[] params = new Object[batchRowCount * columnCount]; + int paramIndex = 0; + for (int index = 0; index < batchRowCount; index++) { + params[paramIndex++] = it.next(); + } + executeQuery(batchQueryText, params); + } + }; + insertInBatch("\"SKEWED_STRING_LIST\"", columns, columnCount, rowCount, batchExecutionContext); + } + + private void insertSkewedStringListValInBatch(Map<Long, List<String>> stringListIdToStringList) throws MetaException { + int rowCount = 0; + for (List<String> stringList : stringListIdToStringList.values()) { + rowCount += stringList.size(); + } + if (rowCount == 0) { + return; + } + String columns = "(\"STRING_LIST_ID\",\"STRING_LIST_VALUE\",\"INTEGER_IDX\")"; + int columnCount = 3; + BatchExecutionContext batchExecutionContext = new BatchExecutionContext() { + int colIndex = 0; + final Iterator<Map.Entry<Long, List<String>>> stringListIt = stringListIdToStringList.entrySet().iterator(); + Map.Entry<Long, List<String>> stringListEntry = stringListIt.next(); + Iterator<String> it = stringListEntry.getValue().iterator(); + @Override + public void execute(String batchQueryText, int batchRowCount) throws MetaException { + Object[] params = new Object[batchRowCount * columnCount]; + int index = 0; + int paramIndex = 0; + do { + while (index < batchRowCount && it.hasNext()) { + params[paramIndex++] = stringListEntry.getKey(); + params[paramIndex++] = it.next(); + params[paramIndex++] = colIndex++; + index++; + } + if (index < batchRowCount) { + colIndex = 0; + stringListEntry = stringListIt.next(); // stringListIt.next() cannot be null since it is within row count + it = stringListEntry.getValue().iterator(); + } + } while (index < batchRowCount); + executeQuery(batchQueryText, params); + } + }; + insertInBatch("\"SKEWED_STRING_LIST_VALUES\"", columns, columnCount, rowCount, batchExecutionContext); + } + + private void insertSkewedColInBatch(Map<Long, MStorageDescriptor> sdIdToStorageDescriptor) throws MetaException { + int rowCount = 0; + for (MStorageDescriptor sd : sdIdToStorageDescriptor.values()) { + rowCount += sd.getSkewedColNames() != null ? sd.getSkewedColNames().size() : 0; + } + if (rowCount == 0) { + return; + } + String columns = "(\"SD_ID\",\"SKEWED_COL_NAME\",\"INTEGER_IDX\")"; + int columnCount = 3; + BatchExecutionContext batchExecutionContext = new BatchExecutionContext() { + int colIndex = 0; + final Iterator<Map.Entry<Long, MStorageDescriptor>> sdIt = sdIdToStorageDescriptor.entrySet().iterator(); + Map.Entry<Long, MStorageDescriptor> sdEntry = sdIt.next(); + Iterator<String> it = sdEntry.getValue().getSkewedColNames() != null ? + sdEntry.getValue().getSkewedColNames().iterator() : Collections.emptyIterator(); + @Override + public void execute(String batchQueryText, int batchRowCount) throws MetaException { + Object[] params = new Object[batchRowCount * columnCount]; + int index = 0; + int paramIndex = 0; + do { + while (index < batchRowCount && it.hasNext()) { + params[paramIndex++] = sdEntry.getKey(); + params[paramIndex++] = it.next(); + params[paramIndex++] = colIndex++; + index++; + } + if (index < batchRowCount) { + colIndex = 0; + sdEntry = sdIt.next(); // sdIt.next() cannot be null since it is within the row count + it = sdEntry.getValue().getSkewedColNames() != null ? + sdEntry.getValue().getSkewedColNames().iterator() : Collections.emptyIterator(); + } + } while (index < batchRowCount); + executeQuery(batchQueryText, params); + } + }; + insertInBatch("\"SKEWED_COL_NAMES\"", columns, columnCount, rowCount, batchExecutionContext); + } + + private void insertSkewedValInBatch(List<Long> stringListIds, Map<Long, Long> stringListIdToSdId) + throws MetaException { + int rowCount = stringListIds.size(); + String columns = "(\"SD_ID_OID\",\"STRING_LIST_ID_EID\",\"INTEGER_IDX\")"; + int columnCount = 3; + BatchExecutionContext batchExecutionContext = new BatchExecutionContext() { + int colIndex = 0; + long prevSdId = -1; + final Iterator<Long> it = stringListIds.iterator(); + @Override + public void execute(String batchQueryText, int batchRowCount) throws MetaException { + Object[] params = new Object[batchRowCount * columnCount]; + int paramIndex = 0; + for (int index = 0; index < batchRowCount; index++) { + Long stringListId = it.next(); + Long sdId = stringListIdToSdId.get(stringListId); + params[paramIndex++] = sdId; + params[paramIndex++] = stringListId; + if (prevSdId != sdId) { + colIndex = 0; + } + params[paramIndex++] = colIndex++; + prevSdId = sdId; + } + executeQuery(batchQueryText, params); + } + }; + insertInBatch("\"SKEWED_VALUES\"", columns, columnCount, rowCount, batchExecutionContext); + } + + private void insertSkewedLocationInBatch(Map<Long, String> stringListIdToLocation, Map<Long, Long> stringListIdToSdId) + throws MetaException { + int rowCount = stringListIdToLocation.size(); + String columns = "(\"SD_ID\",\"STRING_LIST_ID_KID\",\"LOCATION\")"; + int columnCount = 3; + BatchExecutionContext batchExecutionContext = new BatchExecutionContext() { + final Iterator<Map.Entry<Long, String>> it = stringListIdToLocation.entrySet().iterator(); + @Override + public void execute(String batchQueryText, int batchRowCount) throws MetaException { + Object[] params = new Object[batchRowCount * columnCount]; + int paramIndex = 0; + for (int index = 0; index < batchRowCount; index++) { + Map.Entry<Long, String> entry = it.next(); + params[paramIndex++] = stringListIdToSdId.get(entry.getKey()); + params[paramIndex++] = entry.getKey(); + params[paramIndex++] = entry.getValue(); + } + executeQuery(batchQueryText, params); + } + }; + insertInBatch("\"SKEWED_COL_VALUE_LOC_MAP\"", columns, columnCount, rowCount, batchExecutionContext); + } + + private void insertPartitionPrivilegeInBatch(Map<Long, MPartitionPrivilege> partGrantIdToPrivilege, + Map<Long, Long> partGrantIdToPartId) throws MetaException { + int rowCount = partGrantIdToPrivilege.size(); + String columns = "(\"PART_GRANT_ID\",\"AUTHORIZER\",\"CREATE_TIME\",\"GRANT_OPTION\",\"GRANTOR\",\"GRANTOR_TYPE\"," + + "\"PART_ID\",\"PRINCIPAL_NAME\",\"PRINCIPAL_TYPE\",\"PART_PRIV\")"; + int columnCount = 10; + BatchExecutionContext batchExecutionContext = new BatchExecutionContext() { + final Iterator<Map.Entry<Long, MPartitionPrivilege>> it = partGrantIdToPrivilege.entrySet().iterator(); + @Override + public void execute(String batchQueryText, int batchRowCount) throws MetaException { + Object[] params = new Object[batchRowCount * columnCount]; + int paramIndex = 0; + for (int index = 0; index < batchRowCount; index++) { + Map.Entry<Long, MPartitionPrivilege> entry = it.next(); + MPartitionPrivilege partPrivilege = entry.getValue(); + params[paramIndex++] = entry.getKey(); + params[paramIndex++] = partPrivilege.getAuthorizer(); + params[paramIndex++] = partPrivilege.getCreateTime(); + params[paramIndex++] = partPrivilege.getGrantOption() ? 1 : 0; + params[paramIndex++] = partPrivilege.getGrantor(); + params[paramIndex++] = partPrivilege.getGrantorType(); + params[paramIndex++] = partGrantIdToPartId.get(entry.getKey()); + params[paramIndex++] = partPrivilege.getPrincipalName(); + params[paramIndex++] = partPrivilege.getPrincipalType(); + params[paramIndex++] = partPrivilege.getPrivilege(); + } + executeQuery(batchQueryText, params); + } + }; + insertInBatch("\"PART_PRIVS\"", columns, columnCount, rowCount, batchExecutionContext); + } + + private void insertPartitionColPrivilegeInBatch(Map<Long, MPartitionColumnPrivilege> partColumnGrantIdToPrivilege, + Map<Long, Long> partColumnGrantIdToPartId) throws MetaException { + int rowCount = partColumnGrantIdToPrivilege.size(); + String columns = "(\"PART_COLUMN_GRANT_ID\",\"AUTHORIZER\",\"COLUMN_NAME\",\"CREATE_TIME\",\"GRANT_OPTION\"," + + "\"GRANTOR\",\"GRANTOR_TYPE\",\"PART_ID\",\"PRINCIPAL_NAME\",\"PRINCIPAL_TYPE\",\"PART_COL_PRIV\")"; + int columnCount = 11; + BatchExecutionContext batchExecutionContext = new BatchExecutionContext() { + final Iterator<Map.Entry<Long, MPartitionColumnPrivilege>> it + = partColumnGrantIdToPrivilege.entrySet().iterator(); + @Override + public void execute(String batchQueryText, int batchRowCount) throws MetaException { + Object[] params = new Object[batchRowCount * columnCount]; + int paramIndex = 0; + for (int index = 0; index < batchRowCount; index++) { + Map.Entry<Long, MPartitionColumnPrivilege> entry = it.next(); + MPartitionColumnPrivilege partColumnPrivilege = entry.getValue(); + params[paramIndex++] = entry.getKey(); + params[paramIndex++] = partColumnPrivilege.getAuthorizer(); + params[paramIndex++] = partColumnPrivilege.getColumnName(); + params[paramIndex++] = partColumnPrivilege.getCreateTime(); + params[paramIndex++] = partColumnPrivilege.getGrantOption() ? 1 : 0; + params[paramIndex++] = partColumnPrivilege.getGrantor(); + params[paramIndex++] = partColumnPrivilege.getGrantorType(); + params[paramIndex++] = partColumnGrantIdToPartId.get(entry.getKey()); + params[paramIndex++] = partColumnPrivilege.getPrincipalName(); + params[paramIndex++] = partColumnPrivilege.getPrincipalType(); + params[paramIndex++] = partColumnPrivilege.getPrivilege(); + } + executeQuery(batchQueryText, params); + } + }; + insertInBatch("\"PART_COL_PRIVS\"", columns, columnCount, rowCount, batchExecutionContext); + } + + /** + * Add partitions in batch using direct SQL + * @param parts list of partitions + * @param partPrivilegesList list of partition privileges + * @param partColPrivilegesList list of partition column privileges + * @throws MetaException + */ + public void addPartitions(List<MPartition> parts, List<List<MPartitionPrivilege>> partPrivilegesList, + List<List<MPartitionColumnPrivilege>> partColPrivilegesList) throws MetaException { + Map<Long, MSerDeInfo> serdeIdToSerDeInfo = new HashMap<>(); + Map<Long, MColumnDescriptor> cdIdToColumnDescriptor = new HashMap<>(); + Map<Long, MStorageDescriptor> sdIdToStorageDescriptor = new HashMap<>(); + Map<Long, MPartition> partIdToPartition = new HashMap<>(); + Map<Long, MPartitionPrivilege> partGrantIdToPrivilege = new HashMap<>(); + Map<Long, MPartitionColumnPrivilege> partColumnGrantIdToPrivilege = new HashMap<>(); + Map<Long, Long> sdIdToSerdeId = new HashMap<>(); + Map<Long, Long> sdIdToCdId = new HashMap<>(); + Map<Long, Long> partIdToSdId = new HashMap<>(); + Map<Long, List<String>> stringListIdToStringList = new HashMap<>(); + Map<Long, Long> stringListIdToSdId = new HashMap<>(); + Map<Long, String> stringListIdToLocation = new HashMap<>(); + Map<Long, Long> partGrantIdToPartId = new HashMap<>(); + Map<Long, Long> partColumnGrantIdToPartId = new HashMap<>(); + List<Long> stringListIds = new ArrayList<>(); + int partitionsCount = parts.size(); + for (int index = 0; index < partitionsCount; index++) { + MPartition part = parts.get(index); + MStorageDescriptor sd = part.getSd(); + if (part.getValues() == null || sd == null || sd.getSerDeInfo() == null || sd.getCD() == null + || sd.getCD().getCols() == null) { + throw new MetaException("Invalid partition"); + } + Long serDeId = getDataStoreId(MSerDeInfo.class); + serdeIdToSerDeInfo.put(serDeId, sd.getSerDeInfo()); + + Long cdId; + DatastoreId storeId = (DatastoreId) pm.getObjectId(sd.getCD()); + if (storeId == null) { + cdId = getDataStoreId(MColumnDescriptor.class); + cdIdToColumnDescriptor.put(cdId, sd.getCD()); + } else { + cdId = (Long) storeId.getKeyAsObject(); + } + + Long sdId = getDataStoreId(MStorageDescriptor.class); + sdIdToStorageDescriptor.put(sdId, sd); + sdIdToSerdeId.put(sdId, serDeId); + sdIdToCdId.put(sdId, cdId); + + Long partId = getDataStoreId(MPartition.class); + partIdToPartition.put(partId, part); + partIdToSdId.put(partId, sdId); + + Map<List<String>, String> stringListToLocation = new HashMap<>(); + if (sd.getSkewedColValueLocationMaps() != null) { + for (Map.Entry<MStringList, String> entry : sd.getSkewedColValueLocationMaps().entrySet()) { + stringListToLocation.put(entry.getKey().getInternalList(), entry.getValue()); + } + } + if (CollectionUtils.isNotEmpty(sd.getSkewedColValues())) { + int skewedValCount = sd.getSkewedColValues().size(); + for (int i = 0; i < skewedValCount; i++) { + Long stringListId = getDataStoreId(MStringList.class); + stringListIds.add(stringListId); + stringListIdToSdId.put(stringListId, sdId); + List<String> stringList = sd.getSkewedColValues().get(i).getInternalList(); + stringListIdToStringList.put(stringListId, stringList); + String location = stringListToLocation.get(stringList); + if (location != null) { + stringListIdToLocation.put(stringListId, location); + } + } + } + + List<MPartitionPrivilege> partPrivileges = partPrivilegesList.get(index); + for (MPartitionPrivilege partPrivilege : partPrivileges) { + Long partGrantId = getDataStoreId(MPartitionPrivilege.class); + partGrantIdToPrivilege.put(partGrantId, partPrivilege); + partGrantIdToPartId.put(partGrantId, partId); + } + List<MPartitionColumnPrivilege> partColPrivileges = partColPrivilegesList.get(index); + for (MPartitionColumnPrivilege partColPrivilege : partColPrivileges) { + Long partColumnGrantId = getDataStoreId(MPartitionColumnPrivilege.class); + partColumnGrantIdToPrivilege.put(partColumnGrantId, partColPrivilege); + partColumnGrantIdToPartId.put(partColumnGrantId, partId); + } + } + insertSerdeInBatch(serdeIdToSerDeInfo); + insertSerdeParamInBatch(serdeIdToSerDeInfo); + insertColumnDescriptorInBatch(cdIdToColumnDescriptor); + insertColumnV2InBatch(cdIdToColumnDescriptor); + insertStorageDescriptorInBatch(sdIdToStorageDescriptor, sdIdToSerdeId, sdIdToCdId); + insertStorageDescriptorParamInBatch(sdIdToStorageDescriptor); + insertBucketColInBatch(sdIdToStorageDescriptor); + insertSortColInBatch(sdIdToStorageDescriptor); + insertSkewedColInBatch(sdIdToStorageDescriptor); + insertSkewedStringListInBatch(stringListIds); + insertSkewedStringListValInBatch(stringListIdToStringList); + insertSkewedValInBatch(stringListIds, stringListIdToSdId); + insertSkewedLocationInBatch(stringListIdToLocation, stringListIdToSdId); + insertPartitionInBatch(partIdToPartition, partIdToSdId); + insertPartitionParamInBatch(partIdToPartition); + insertPartitionKeyValInBatch(partIdToPartition); + insertPartitionPrivilegeInBatch(partGrantIdToPrivilege, partGrantIdToPartId); + insertPartitionColPrivilegeInBatch(partColumnGrantIdToPrivilege, partColumnGrantIdToPartId); + } +} diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java index f09e97837b1..a39da29cbb8 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java @@ -95,6 +95,7 @@ import org.apache.hadoop.hive.metastore.model.MCreationMetadata; import org.apache.hadoop.hive.metastore.model.MDatabase; import org.apache.hadoop.hive.metastore.model.MNotificationLog; import org.apache.hadoop.hive.metastore.model.MNotificationNextId; +import org.apache.hadoop.hive.metastore.model.MPartition; import org.apache.hadoop.hive.metastore.model.MPartitionColumnPrivilege; import org.apache.hadoop.hive.metastore.model.MPartitionColumnStatistics; import org.apache.hadoop.hive.metastore.model.MPartitionPrivilege; @@ -157,6 +158,7 @@ class MetaStoreDirectSql { private final ImmutableMap<String, String> fieldnameToTableName; private AggregateStatsCache aggrStatsCache; private DirectSqlUpdateStat updateStat; + private DirectSqlInsertPart directSqlInsertPart; /** * This method returns a comma separated string consisting of String values of a given list. @@ -198,6 +200,11 @@ class MetaStoreDirectSql { } this.batchSize = batchSize; this.updateStat = new DirectSqlUpdateStat(pm, conf, dbType, batchSize); + + // TODO: Oracle supports to insert more than 1000 rows with a single insert query. Can use NO_BATCHING for oracle db + // too during batch detection(DETECT_BATCHING) for insert queries as future improvement. Currently, used the same + // limit as IN clause/operator limit(i.e., 1000) during batch detection. + this.directSqlInsertPart = new DirectSqlInsertPart(pm, dbType, batchSize); ImmutableMap.Builder<String, String> fieldNameToTableNameBuilder = new ImmutableMap.Builder<>(); @@ -517,6 +524,18 @@ class MetaStoreDirectSql { } } + /** + * Add partitions in batch using direct SQL + * @param parts list of partitions + * @param partPrivilegesList list of partition privileges + * @param partColPrivilegesList list of partition column privileges + * @throws MetaException + */ + public void addPartitions(List<MPartition> parts, List<List<MPartitionPrivilege>> partPrivilegesList, + List<List<MPartitionColumnPrivilege>> partColPrivilegesList) throws MetaException { + directSqlInsertPart.addPartitions(parts, partPrivilegesList, partColPrivilegesList); + } + /** * Get partition names by using direct SQL queries. * @param filter filter to use with direct sql diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java index f9194e86e48..0e8caf785a7 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java @@ -2643,39 +2643,69 @@ public class ObjectStore implements RawStore, Configurable { tabGrants = this.listAllTableGrants(catName, dbName, tblName); tabColumnGrants = this.listTableAllColumnGrants(catName, dbName, tblName); } - List<Object> toPersist = new ArrayList<>(); + List<MPartition> mParts = new ArrayList<>(); + List<List<MPartitionPrivilege>> mPartPrivilegesList = new ArrayList<>(); + List<List<MPartitionColumnPrivilege>> mPartColPrivilegesList = new ArrayList<>(); for (Partition part : parts) { if (!part.getTableName().equals(tblName) || !part.getDbName().equals(dbName)) { throw new MetaException("Partition does not belong to target table " + dbName + "." + tblName + ": " + part); } MPartition mpart = convertToMPart(part, table, true); - - toPersist.add(mpart); + mParts.add(mpart); int now = (int) (System.currentTimeMillis() / 1000); + List<MPartitionPrivilege> mPartPrivileges = new ArrayList<>(); if (tabGrants != null) { for (MTablePrivilege tab: tabGrants) { - toPersist.add(new MPartitionPrivilege(tab.getPrincipalName(), - tab.getPrincipalType(), mpart, tab.getPrivilege(), now, - tab.getGrantor(), tab.getGrantorType(), tab.getGrantOption(), - tab.getAuthorizer())); + MPartitionPrivilege mPartPrivilege = new MPartitionPrivilege(tab.getPrincipalName(), tab.getPrincipalType(), + mpart, tab.getPrivilege(), now, tab.getGrantor(), tab.getGrantorType(), tab.getGrantOption(), + tab.getAuthorizer()); + mPartPrivileges.add(mPartPrivilege); } } + List<MPartitionColumnPrivilege> mPartColumnPrivileges = new ArrayList<>(); if (tabColumnGrants != null) { for (MTableColumnPrivilege col : tabColumnGrants) { - toPersist.add(new MPartitionColumnPrivilege(col.getPrincipalName(), - col.getPrincipalType(), mpart, col.getColumnName(), col.getPrivilege(), - now, col.getGrantor(), col.getGrantorType(), col.getGrantOption(), - col.getAuthorizer())); + MPartitionColumnPrivilege mPartColumnPrivilege = new MPartitionColumnPrivilege(col.getPrincipalName(), + col.getPrincipalType(), mpart, col.getColumnName(), col.getPrivilege(), now, col.getGrantor(), + col.getGrantorType(), col.getGrantOption(), col.getAuthorizer()); + mPartColumnPrivileges.add(mPartColumnPrivilege); } } + mPartPrivilegesList.add(mPartPrivileges); + mPartColPrivilegesList.add(mPartColumnPrivileges); } - if (CollectionUtils.isNotEmpty(toPersist)) { - pm.makePersistentAll(toPersist); - pm.flush(); - } + if (CollectionUtils.isNotEmpty(mParts)) { + GetHelper<Void> helper = new GetHelper<Void>(null, null, null, true, + true) { + @Override + protected Void getSqlResult(GetHelper<Void> ctx) throws MetaException { + directSql.addPartitions(mParts, mPartPrivilegesList, mPartColPrivilegesList); + return null; + } + + @Override + protected Void getJdoResult(GetHelper<Void> ctx) { + List<Object> toPersist = new ArrayList<>(mParts); + mPartPrivilegesList.forEach(toPersist::addAll); + mPartColPrivilegesList.forEach(toPersist::addAll); + pm.makePersistentAll(toPersist); + pm.flush(); + return null; + } + @Override + protected String describeResult() { + return "add partitions"; + } + }; + try { + helper.run(false); + } catch (NoSuchObjectException e) { + throw new MetaException(e.getMessage()); + } + } success = commitTransaction(); } finally { if (!success) {