http://git-wip-us.apache.org/repos/asf/hive/blob/64bef36a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java.orig ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java.orig b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java.orig deleted file mode 100644 index 0a25b77..0000000 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java.orig +++ /dev/null @@ -1,2845 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hive.metastore; - -import static org.apache.commons.lang.StringUtils.join; -import static org.apache.commons.lang.StringUtils.normalizeSpace; -import static org.apache.commons.lang.StringUtils.repeat; -import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_CATALOG_NAME; - -import java.sql.Blob; -import java.sql.Clob; -import java.sql.Connection; -import java.sql.SQLException; -import java.sql.Statement; -import java.text.ParseException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; -import java.util.stream.Collectors; - -import javax.jdo.PersistenceManager; -import javax.jdo.Query; -import javax.jdo.Transaction; -import javax.jdo.datastore.JDOConnection; - -import org.apache.commons.lang.BooleanUtils; -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.metastore.AggregateStatsCache.AggrColStats; -import org.apache.hadoop.hive.metastore.api.AggrStats; -import org.apache.hadoop.hive.metastore.api.ColumnStatistics; -import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData; -import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc; -import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; -import org.apache.hadoop.hive.metastore.api.Database; -import org.apache.hadoop.hive.metastore.api.FieldSchema; -import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.hive.metastore.api.Order; -import org.apache.hadoop.hive.metastore.api.Partition; -import org.apache.hadoop.hive.metastore.api.PrincipalType; -import org.apache.hadoop.hive.metastore.api.SQLCheckConstraint; -import org.apache.hadoop.hive.metastore.api.SQLDefaultConstraint; -import org.apache.hadoop.hive.metastore.api.SQLForeignKey; -import 
org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint; -import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey; -import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint; -import org.apache.hadoop.hive.metastore.api.SerDeInfo; -import org.apache.hadoop.hive.metastore.api.SkewedInfo; -import org.apache.hadoop.hive.metastore.api.StorageDescriptor; -import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.hadoop.hive.metastore.conf.MetastoreConf; -import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; -import org.apache.hadoop.hive.metastore.model.MConstraint; -import org.apache.hadoop.hive.metastore.model.MCreationMetadata; -import org.apache.hadoop.hive.metastore.model.MDatabase; -import org.apache.hadoop.hive.metastore.model.MNotificationLog; -import org.apache.hadoop.hive.metastore.model.MNotificationNextId; -import org.apache.hadoop.hive.metastore.model.MPartitionColumnPrivilege; -import org.apache.hadoop.hive.metastore.model.MPartitionColumnStatistics; -import org.apache.hadoop.hive.metastore.model.MPartitionPrivilege; -import org.apache.hadoop.hive.metastore.model.MTableColumnStatistics; -import org.apache.hadoop.hive.metastore.model.MWMResourcePlan; -import org.apache.hadoop.hive.metastore.parser.ExpressionTree; -import org.apache.hadoop.hive.metastore.parser.ExpressionTree.FilterBuilder; -import org.apache.hadoop.hive.metastore.parser.ExpressionTree.LeafNode; -import org.apache.hadoop.hive.metastore.parser.ExpressionTree.LogicalOperator; -import org.apache.hadoop.hive.metastore.parser.ExpressionTree.Operator; -import org.apache.hadoop.hive.metastore.parser.ExpressionTree.TreeNode; -import org.apache.hadoop.hive.metastore.parser.ExpressionTree.TreeVisitor; -import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils; -import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; -import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.ColStatsObjWithSourceInfo; -import 
org.apache.hive.common.util.BloomFilter; -import org.datanucleus.store.rdbms.query.ForwardQueryResult; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.collect.Lists; - -/** - * This class contains the optimizations for MetaStore that rely on direct SQL access to - * the underlying database. It should use ANSI SQL and be compatible with common databases - * such as MySQL (note that MySQL doesn't use full ANSI mode by default), Postgres, etc. - * - * As of now, only the partition retrieval is done this way to improve job startup time; - * JDOQL partition retrieval is still present so as not to limit the ORM solution we have - * to SQL stores only. There's always a way to do without direct SQL. - */ -class MetaStoreDirectSql { - private static final int NO_BATCHING = -1, DETECT_BATCHING = 0; - - private static final Logger LOG = LoggerFactory.getLogger(MetaStoreDirectSql.class); - private final PersistenceManager pm; - private final Configuration conf; - private final String schema; - - /** - * We want to avoid db-specific code in this class and stick with ANSI SQL. However: - * 1) mysql and postgres are differently ansi-incompatible (mysql by default doesn't support - * quoted identifiers, and postgres contravenes ANSI by coercing unquoted ones to lower case). - * MySQL's way of working around this is simpler (just set ansi quotes mode on), so we will - * use that. MySQL detection is done by actually issuing the set-ansi-quotes command; - * - * Use sparingly, we don't want to devolve into another DataNucleus... - */ - private final DatabaseProduct dbType; - private final int batchSize; - private final boolean convertMapNullsToEmptyStrings; - private final String defaultPartName; - - /** - * Whether direct SQL can be used with the current datastore backing {@link #pm}. 
- */ - private final boolean isCompatibleDatastore; - private final boolean isAggregateStatsCacheEnabled; - private AggregateStatsCache aggrStatsCache; - - @java.lang.annotation.Target(java.lang.annotation.ElementType.FIELD) - @java.lang.annotation.Retention(java.lang.annotation.RetentionPolicy.RUNTIME) - private @interface TableName {} - - // Table names with schema name, if necessary - @TableName - private String DBS, TBLS, PARTITIONS, DATABASE_PARAMS, PARTITION_PARAMS, SORT_COLS, SD_PARAMS, - SDS, SERDES, SKEWED_STRING_LIST_VALUES, SKEWED_VALUES, BUCKETING_COLS, SKEWED_COL_NAMES, - SKEWED_COL_VALUE_LOC_MAP, COLUMNS_V2, PARTITION_KEYS, SERDE_PARAMS, PART_COL_STATS, KEY_CONSTRAINTS, - TAB_COL_STATS, PARTITION_KEY_VALS, PART_PRIVS, PART_COL_PRIVS, SKEWED_STRING_LIST, CDS; - - - public MetaStoreDirectSql(PersistenceManager pm, Configuration conf, String schema) { - this.pm = pm; - this.conf = conf; - this.schema = schema; - DatabaseProduct dbType = null; - try { - dbType = DatabaseProduct.determineDatabaseProduct(getProductName(pm)); - } catch (SQLException e) { - LOG.warn("Cannot determine database product; assuming OTHER", e); - dbType = DatabaseProduct.OTHER; - } - this.dbType = dbType; - int batchSize = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_PARTITION_BATCH_SIZE); - if (batchSize == DETECT_BATCHING) { - batchSize = DatabaseProduct.needsInBatching(dbType) ? 
1000 : NO_BATCHING; - } - this.batchSize = batchSize; - - for (java.lang.reflect.Field f : this.getClass().getDeclaredFields()) { - if (f.getAnnotation(TableName.class) == null) continue; - try { - f.set(this, getFullyQualifiedName(schema, f.getName())); - } catch (IllegalArgumentException | IllegalAccessException e) { - throw new RuntimeException("Internal error, cannot set " + f.getName()); - } - } - - convertMapNullsToEmptyStrings = - MetastoreConf.getBoolVar(conf, ConfVars.ORM_RETRIEVE_MAPNULLS_AS_EMPTY_STRINGS); - defaultPartName = MetastoreConf.getVar(conf, ConfVars.DEFAULTPARTITIONNAME); - - String jdoIdFactory = MetastoreConf.getVar(conf, ConfVars.IDENTIFIER_FACTORY); - if (! ("datanucleus1".equalsIgnoreCase(jdoIdFactory))){ - LOG.warn("Underlying metastore does not use 'datanucleus1' for its ORM naming scheme." - + " Disabling directSQL as it uses hand-hardcoded SQL with that assumption."); - isCompatibleDatastore = false; - } else { - boolean isInTest = MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEST); - isCompatibleDatastore = (!isInTest || ensureDbInit()) && runTestQuery(); - if (isCompatibleDatastore) { - LOG.debug("Using direct SQL, underlying DB is " + dbType); - } - } - - isAggregateStatsCacheEnabled = MetastoreConf.getBoolVar( - conf, ConfVars.AGGREGATE_STATS_CACHE_ENABLED); - if (isAggregateStatsCacheEnabled) { - aggrStatsCache = AggregateStatsCache.getInstance(conf); - } - } - - private static String getFullyQualifiedName(String schema, String tblName) { - return ((schema == null || schema.isEmpty()) ? 
"" : "\"" + schema + "\".\"") - + "\"" + tblName + "\""; - } - - - public MetaStoreDirectSql(PersistenceManager pm, Configuration conf) { - this(pm, conf, ""); - } - - static String getProductName(PersistenceManager pm) { - JDOConnection jdoConn = pm.getDataStoreConnection(); - try { - return ((Connection)jdoConn.getNativeConnection()).getMetaData().getDatabaseProductName(); - } catch (Throwable t) { - LOG.warn("Error retrieving product name", t); - return null; - } finally { - jdoConn.close(); // We must release the connection before we call other pm methods. - } - } - - private boolean ensureDbInit() { - Transaction tx = pm.currentTransaction(); - boolean doCommit = false; - if (!tx.isActive()) { - tx.begin(); - doCommit = true; - } - LinkedList<Query> initQueries = new LinkedList<>(); - - try { - // Force the underlying db to initialize. - initQueries.add(pm.newQuery(MDatabase.class, "name == ''")); - initQueries.add(pm.newQuery(MTableColumnStatistics.class, "dbName == ''")); - initQueries.add(pm.newQuery(MPartitionColumnStatistics.class, "dbName == ''")); - initQueries.add(pm.newQuery(MConstraint.class, "childIntegerIndex < 0")); - initQueries.add(pm.newQuery(MNotificationLog.class, "dbName == ''")); - initQueries.add(pm.newQuery(MNotificationNextId.class, "nextEventId < -1")); - initQueries.add(pm.newQuery(MWMResourcePlan.class, "name == ''")); - initQueries.add(pm.newQuery(MCreationMetadata.class, "dbName == ''")); - initQueries.add(pm.newQuery(MPartitionPrivilege.class, "principalName == ''")); - initQueries.add(pm.newQuery(MPartitionColumnPrivilege.class, "principalName == ''")); - Query q; - while ((q = initQueries.peekFirst()) != null) { - q.execute(); - initQueries.pollFirst(); - } - - return true; - } catch (Exception ex) { - doCommit = false; - LOG.warn("Database initialization failed; direct SQL is disabled", ex); - tx.rollback(); - return false; - } finally { - if (doCommit) { - tx.commit(); - } - for (Query q : initQueries) { - try { - q.closeAll(); 
- } catch (Throwable t) { - } - } - } - } - - private boolean runTestQuery() { - Transaction tx = pm.currentTransaction(); - boolean doCommit = false; - if (!tx.isActive()) { - tx.begin(); - doCommit = true; - } - Query query = null; - // Run a self-test query. If it doesn't work, we will self-disable. What a PITA... - String selfTestQuery = "select \"DB_ID\" from " + DBS + ""; - try { - prepareTxn(); - query = pm.newQuery("javax.jdo.query.SQL", selfTestQuery); - query.execute(); - return true; - } catch (Throwable t) { - doCommit = false; - LOG.warn("Self-test query [" + selfTestQuery + "] failed; direct SQL is disabled", t); - tx.rollback(); - return false; - } finally { - if (doCommit) { - tx.commit(); - } - if (query != null) { - query.closeAll(); - } - } - } - - public String getSchema() { - return schema; - } - - public boolean isCompatibleDatastore() { - return isCompatibleDatastore; - } - - private void executeNoResult(final String queryText) throws SQLException { - JDOConnection jdoConn = pm.getDataStoreConnection(); - Statement statement = null; - boolean doTrace = LOG.isDebugEnabled(); - try { - long start = doTrace ? System.nanoTime() : 0; - statement = ((Connection)jdoConn.getNativeConnection()).createStatement(); - statement.execute(queryText); - timingTrace(doTrace, queryText, start, doTrace ? System.nanoTime() : 0); - } finally { - if(statement != null){ - statement.close(); - } - jdoConn.close(); // We must release the connection before we call other pm methods. - } - } - - public Database getDatabase(String catName, String dbName) throws MetaException{ - Query queryDbSelector = null; - Query queryDbParams = null; - try { - dbName = dbName.toLowerCase(); - catName = catName.toLowerCase(); - - String queryTextDbSelector= "select " - + "\"DB_ID\", \"NAME\", \"DB_LOCATION_URI\", \"DESC\", " - + "\"OWNER_NAME\", \"OWNER_TYPE\", \"CTLG_NAME\" " - + "FROM "+ DBS - + " where \"NAME\" = ? and \"CTLG_NAME\" = ? 
"; - Object[] params = new Object[] { dbName, catName }; - queryDbSelector = pm.newQuery("javax.jdo.query.SQL", queryTextDbSelector); - - if (LOG.isTraceEnabled()) { - LOG.trace("getDatabase:query instantiated : " + queryTextDbSelector - + " with param [" + params[0] + "]"); - } - - List<Object[]> sqlResult = executeWithArray( - queryDbSelector, params, queryTextDbSelector); - if ((sqlResult == null) || sqlResult.isEmpty()) { - return null; - } - - assert(sqlResult.size() == 1); - if (sqlResult.get(0) == null) { - return null; - } - - Object[] dbline = sqlResult.get(0); - Long dbid = extractSqlLong(dbline[0]); - - String queryTextDbParams = "select \"PARAM_KEY\", \"PARAM_VALUE\" " - + " from " + DATABASE_PARAMS + " " - + " WHERE \"DB_ID\" = ? " - + " AND \"PARAM_KEY\" IS NOT NULL"; - params[0] = dbid; - queryDbParams = pm.newQuery("javax.jdo.query.SQL", queryTextDbParams); - if (LOG.isTraceEnabled()) { - LOG.trace("getDatabase:query2 instantiated : " + queryTextDbParams - + " with param [" + params[0] + "]"); - } - - Map<String,String> dbParams = new HashMap<String,String>(); - List<Object[]> sqlResult2 = ensureList(executeWithArray( - queryDbParams, params, queryTextDbParams)); - if (!sqlResult2.isEmpty()) { - for (Object[] line : sqlResult2) { - dbParams.put(extractSqlString(line[0]), extractSqlString(line[1])); - } - } - Database db = new Database(); - db.setName(extractSqlString(dbline[1])); - db.setLocationUri(extractSqlString(dbline[2])); - db.setDescription(extractSqlString(dbline[3])); - db.setOwnerName(extractSqlString(dbline[4])); - String type = extractSqlString(dbline[5]); - db.setOwnerType( - (null == type || type.trim().isEmpty()) ? 
null : PrincipalType.valueOf(type)); - db.setCatalogName(extractSqlString(dbline[6])); - db.setParameters(MetaStoreServerUtils.trimMapNulls(dbParams,convertMapNullsToEmptyStrings)); - if (LOG.isDebugEnabled()){ - LOG.debug("getDatabase: directsql returning db " + db.getName() - + " locn["+db.getLocationUri() +"] desc [" +db.getDescription() - + "] owner [" + db.getOwnerName() + "] ownertype ["+ db.getOwnerType() +"]"); - } - return db; - } finally { - if (queryDbSelector != null){ - queryDbSelector.closeAll(); - } - if (queryDbParams != null){ - queryDbParams.closeAll(); - } - } - } - - /** - * Get table names by using direct SQL queries. - * @param catName catalog name - * @param dbName Metastore database namme - * @param tableType Table type, or null if we want to get all tables - * @return list of table names - */ - public List<String> getTables(String catName, String dbName, TableType tableType) - throws MetaException { - String queryText = "SELECT " + TBLS + ".\"TBL_NAME\"" - + " FROM " + TBLS + " " - + " INNER JOIN " + DBS + " ON " + TBLS + ".\"DB_ID\" = " + DBS + ".\"DB_ID\" " - + " WHERE " + DBS + ".\"NAME\" = ? AND " + DBS + ".\"CTLG_NAME\" = ? " - + (tableType == null ? "" : "AND " + TBLS + ".\"TBL_TYPE\" = ? ") ; - - List<String> pms = new ArrayList<>(); - pms.add(dbName); - pms.add(catName); - if (tableType != null) { - pms.add(tableType.toString()); - } - - Query<?> queryParams = pm.newQuery("javax.jdo.query.SQL", queryText); - return executeWithArray( - queryParams, pms.toArray(), queryText); - } - - /** - * Get table names by using direct SQL queries. - * - * @param dbName Metastore database namme - * @return list of table names - */ - public List<String> getMaterializedViewsForRewriting(String dbName) throws MetaException { - String queryText = "SELECT " + TBLS + ".\"TBL_NAME\"" - + " FROM " + TBLS + " " - + " INNER JOIN " + DBS + " ON " + TBLS + ".\"DB_ID\" = " + DBS + ".\"DB_ID\" " - + " WHERE " + DBS + ".\"NAME\" = ? 
AND " + TBLS + ".\"TBL_TYPE\" = ? " ; - - List<String> pms = new ArrayList<String>(); - pms.add(dbName); - pms.add(TableType.MATERIALIZED_VIEW.toString()); - - Query<?> queryParams = pm.newQuery("javax.jdo.query.SQL", queryText); - return executeWithArray( - queryParams, pms.toArray(), queryText); - } - - /** - * Gets partitions by using direct SQL queries. - * @param catName Metastore catalog name. - * @param dbName Metastore db name. - * @param tblName Metastore table name. - * @param partNames Partition names to get. - * @return List of partitions. - */ - public List<Partition> getPartitionsViaSqlFilter(final String catName, final String dbName, - final String tblName, List<String> partNames) - throws MetaException { - if (partNames.isEmpty()) { - return Collections.emptyList(); - } - return Batchable.runBatched(batchSize, partNames, new Batchable<String, Partition>() { - @Override - public List<Partition> run(List<String> input) throws MetaException { - String filter = "" + PARTITIONS + ".\"PART_NAME\" in (" + makeParams(input.size()) + ")"; - List<Object> partitionIds = getPartitionIdsViaSqlFilter(catName, dbName, tblName, - filter, input, Collections.<String>emptyList(), null); - if (partitionIds.isEmpty()) { - return Collections.emptyList(); // no partitions, bail early. - } - return getPartitionsFromPartitionIds(catName, dbName, tblName, null, partitionIds); - } - }); - } - - /** - * Gets partitions by using direct SQL queries. - * @param filter The filter. - * @param max The maximum number of partitions to return. - * @return List of partitions. - */ - public List<Partition> getPartitionsViaSqlFilter( - SqlFilterForPushdown filter, Integer max) throws MetaException { - Boolean isViewTable = isViewTable(filter.table); - String catName = filter.table.isSetCatName() ? 
filter.table.getCatName() : - DEFAULT_CATALOG_NAME; - List<Object> partitionIds = getPartitionIdsViaSqlFilter(catName, - filter.table.getDbName(), filter.table.getTableName(), filter.filter, filter.params, - filter.joins, max); - if (partitionIds.isEmpty()) { - return Collections.emptyList(); // no partitions, bail early. - } - return Batchable.runBatched(batchSize, partitionIds, new Batchable<Object, Partition>() { - @Override - public List<Partition> run(List<Object> input) throws MetaException { - return getPartitionsFromPartitionIds(catName, filter.table.getDbName(), - filter.table.getTableName(), isViewTable, input); - } - }); - } - - public static class SqlFilterForPushdown { - private final List<Object> params = new ArrayList<>(); - private final List<String> joins = new ArrayList<>(); - private String filter; - private Table table; - } - - public boolean generateSqlFilterForPushdown( - Table table, ExpressionTree tree, SqlFilterForPushdown result) throws MetaException { - return generateSqlFilterForPushdown(table, tree, null, result); - } - - public boolean generateSqlFilterForPushdown(Table table, ExpressionTree tree, String defaultPartitionName, - SqlFilterForPushdown result) throws MetaException { - // Derby and Oracle do not interpret filters ANSI-properly in some cases and need a workaround. - boolean dbHasJoinCastBug = DatabaseProduct.hasJoinOperationOrderBug(dbType); - result.table = table; - result.filter = PartitionFilterGenerator.generateSqlFilter(table, tree, result.params, - result.joins, dbHasJoinCastBug, ((defaultPartitionName == null) ? defaultPartName : defaultPartitionName), - dbType, schema); - return result.filter != null; - } - - /** - * Gets all partitions of a table by using direct SQL queries. - * @param catName Metastore catalog name. - * @param dbName Metastore db name. - * @param tblName Metastore table name. - * @param max The maximum number of partitions to return. - * @return List of partitions. 
- */ - public List<Partition> getPartitions(String catName, - String dbName, String tblName, Integer max) throws MetaException { - List<Object> partitionIds = getPartitionIdsViaSqlFilter(catName, dbName, - tblName, null, Collections.<String>emptyList(), Collections.<String>emptyList(), max); - if (partitionIds.isEmpty()) { - return Collections.emptyList(); // no partitions, bail early. - } - - // Get full objects. For Oracle/etc. do it in batches. - List<Partition> result = Batchable.runBatched(batchSize, partitionIds, new Batchable<Object, Partition>() { - @Override - public List<Partition> run(List<Object> input) throws MetaException { - return getPartitionsFromPartitionIds(catName, dbName, tblName, null, input); - } - }); - return result; - } - - private static Boolean isViewTable(Table t) { - return t.isSetTableType() ? - t.getTableType().equals(TableType.VIRTUAL_VIEW.toString()) : null; - } - - private boolean isViewTable(String catName, String dbName, String tblName) throws MetaException { - Query query = null; - try { - String queryText = "select \"TBL_TYPE\" from " + TBLS + "" + - " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS + ".\"DB_ID\" " + - " where " + TBLS + ".\"TBL_NAME\" = ? and " + DBS + ".\"NAME\" = ? and " + DBS + ".\"CTLG_NAME\" = ?"; - Object[] params = new Object[] { tblName, dbName, catName }; - query = pm.newQuery("javax.jdo.query.SQL", queryText); - query.setUnique(true); - Object result = executeWithArray(query, params, queryText); - return (result != null) && result.toString().equals(TableType.VIRTUAL_VIEW.toString()); - } finally { - if (query != null) { - query.closeAll(); - } - } - } - - /** - * Get partition ids for the query using direct SQL queries, to avoid bazillion - * queries created by DN retrieving stuff for each object individually. - * @param catName MetaStore catalog name - * @param dbName MetaStore db name - * @param tblName MetaStore table name - * @param sqlFilter SQL filter to use. 
Better be SQL92-compliant. - * @param paramsForFilter params for ?-s in SQL filter text. Params must be in order. - * @param joinsForFilter if the filter needs additional join statement, they must be in - * this list. Better be SQL92-compliant. - * @param max The maximum number of partitions to return. - * @return List of partition objects. - */ - private List<Object> getPartitionIdsViaSqlFilter( - String catName, String dbName, String tblName, String sqlFilter, - List<? extends Object> paramsForFilter, List<String> joinsForFilter, Integer max) - throws MetaException { - boolean doTrace = LOG.isDebugEnabled(); - final String dbNameLcase = dbName.toLowerCase(); - final String tblNameLcase = tblName.toLowerCase(); - final String catNameLcase = normalizeSpace(catName).toLowerCase(); - - // We have to be mindful of order during filtering if we are not returning all partitions. - String orderForFilter = (max != null) ? " order by \"PART_NAME\" asc" : ""; - - String queryText = - "select " + PARTITIONS + ".\"PART_ID\" from " + PARTITIONS + "" - + " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " + TBLS + ".\"TBL_ID\" " - + " and " + TBLS + ".\"TBL_NAME\" = ? " - + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS + ".\"DB_ID\" " - + " and " + DBS + ".\"NAME\" = ? " - + join(joinsForFilter, ' ') - + " where " + DBS + ".\"CTLG_NAME\" = ? " - + (StringUtils.isBlank(sqlFilter) ? "" : (" and " + sqlFilter)) + orderForFilter; - Object[] params = new Object[paramsForFilter.size() + 3]; - params[0] = tblNameLcase; - params[1] = dbNameLcase; - params[2] = catNameLcase; - for (int i = 0; i < paramsForFilter.size(); ++i) { - params[i + 3] = paramsForFilter.get(i); - } - - long start = doTrace ? System.nanoTime() : 0; - Query query = pm.newQuery("javax.jdo.query.SQL", queryText); - if (max != null) { - query.setRange(0, max.shortValue()); - } - List<Object> sqlResult = executeWithArray(query, params, queryText); - long queryTime = doTrace ? 
System.nanoTime() : 0; - timingTrace(doTrace, queryText, start, queryTime); - if (sqlResult.isEmpty()) { - return Collections.emptyList(); // no partitions, bail early. - } - - List<Object> result = new ArrayList<Object>(sqlResult.size()); - for (Object fields : sqlResult) { - result.add(extractSqlLong(fields)); - } - query.closeAll(); - return result; - } - - /** Should be called with the list short enough to not trip up Oracle/etc. */ - private List<Partition> getPartitionsFromPartitionIds(String catName, String dbName, String tblName, - Boolean isView, List<Object> partIdList) throws MetaException { - boolean doTrace = LOG.isDebugEnabled(); - - int idStringWidth = (int)Math.ceil(Math.log10(partIdList.size())) + 1; // 1 for comma - int sbCapacity = partIdList.size() * idStringWidth; - - String partIds = getIdListForIn(partIdList); - - // Get most of the fields for the IDs provided. - // Assume db and table names are the same for all partition, as provided in arguments. - String queryText = - "select " + PARTITIONS + ".\"PART_ID\", " + SDS + ".\"SD_ID\", " + SDS + ".\"CD_ID\"," - + " " + SERDES + ".\"SERDE_ID\", " + PARTITIONS + ".\"CREATE_TIME\"," - + " " + PARTITIONS + ".\"LAST_ACCESS_TIME\", " + SDS + ".\"INPUT_FORMAT\", " + SDS + ".\"IS_COMPRESSED\"," - + " " + SDS + ".\"IS_STOREDASSUBDIRECTORIES\", " + SDS + ".\"LOCATION\", " + SDS + ".\"NUM_BUCKETS\"," - + " " + SDS + ".\"OUTPUT_FORMAT\", " + SERDES + ".\"NAME\", " + SERDES + ".\"SLIB\", " + PARTITIONS - + ".\"WRITE_ID\"" + " from " + PARTITIONS + "" - + " left outer join " + SDS + " on " + PARTITIONS + ".\"SD_ID\" = " + SDS + ".\"SD_ID\" " - + " left outer join " + SERDES + " on " + SDS + ".\"SERDE_ID\" = " + SERDES + ".\"SERDE_ID\" " - + "where \"PART_ID\" in (" + partIds + ") order by \"PART_NAME\" asc"; - long start = doTrace ? 
System.nanoTime() : 0; - Query query = pm.newQuery("javax.jdo.query.SQL", queryText); - List<Object[]> sqlResult = executeWithArray(query, null, queryText); - long queryTime = doTrace ? System.nanoTime() : 0; - Deadline.checkTimeout(); - - // Read all the fields and create partitions, SDs and serdes. - TreeMap<Long, Partition> partitions = new TreeMap<Long, Partition>(); - TreeMap<Long, StorageDescriptor> sds = new TreeMap<Long, StorageDescriptor>(); - TreeMap<Long, SerDeInfo> serdes = new TreeMap<Long, SerDeInfo>(); - TreeMap<Long, List<FieldSchema>> colss = new TreeMap<Long, List<FieldSchema>>(); - // Keep order by name, consistent with JDO. - ArrayList<Partition> orderedResult = new ArrayList<Partition>(partIdList.size()); - - // Prepare StringBuilder-s for "in (...)" lists to use in one-to-many queries. - StringBuilder sdSb = new StringBuilder(sbCapacity), serdeSb = new StringBuilder(sbCapacity); - StringBuilder colsSb = new StringBuilder(7); // We expect that there's only one field schema. - tblName = tblName.toLowerCase(); - dbName = dbName.toLowerCase(); - catName = normalizeSpace(catName).toLowerCase(); - for (Object[] fields : sqlResult) { - // Here comes the ugly part... - long partitionId = extractSqlLong(fields[0]); - Long sdId = extractSqlLong(fields[1]); - Long colId = extractSqlLong(fields[2]); - Long serdeId = extractSqlLong(fields[3]); - // A partition must have at least sdId and serdeId set, or nothing set if it's a view. - if (sdId == null || serdeId == null) { - if (isView == null) { - isView = isViewTable(catName, dbName, tblName); - } - if ((sdId != null || colId != null || serdeId != null) || !isView) { - throw new MetaException("Unexpected null for one of the IDs, SD " + sdId + - ", serde " + serdeId + " for a " + (isView ? "" : "non-") + " view"); - } - } - - Partition part = new Partition(); - orderedResult.add(part); - // Set the collection fields; some code might not check presence before accessing them. 
- part.setParameters(new HashMap<>()); - part.setValues(new ArrayList<String>()); - part.setCatName(catName); - part.setDbName(dbName); - part.setTableName(tblName); - if (fields[4] != null) part.setCreateTime(extractSqlInt(fields[4])); - if (fields[5] != null) part.setLastAccessTime(extractSqlInt(fields[5])); - Long writeId = extractSqlLong(fields[14]); - if (writeId != null) { - part.setWriteId(writeId); - } - partitions.put(partitionId, part); - - - if (sdId == null) continue; // Probably a view. - assert serdeId != null; - - // We assume each partition has an unique SD. - StorageDescriptor sd = new StorageDescriptor(); - StorageDescriptor oldSd = sds.put(sdId, sd); - if (oldSd != null) { - throw new MetaException("Partitions reuse SDs; we don't expect that"); - } - // Set the collection fields; some code might not check presence before accessing them. - sd.setSortCols(new ArrayList<Order>()); - sd.setBucketCols(new ArrayList<String>()); - sd.setParameters(new HashMap<String, String>()); - sd.setSkewedInfo(new SkewedInfo(new ArrayList<String>(), - new ArrayList<List<String>>(), new HashMap<List<String>, String>())); - sd.setInputFormat((String)fields[6]); - Boolean tmpBoolean = extractSqlBoolean(fields[7]); - if (tmpBoolean != null) sd.setCompressed(tmpBoolean); - tmpBoolean = extractSqlBoolean(fields[8]); - if (tmpBoolean != null) sd.setStoredAsSubDirectories(tmpBoolean); - sd.setLocation((String)fields[9]); - if (fields[10] != null) sd.setNumBuckets(extractSqlInt(fields[10])); - sd.setOutputFormat((String)fields[11]); - sdSb.append(sdId).append(","); - part.setSd(sd); - - if (colId != null) { - List<FieldSchema> cols = colss.get(colId); - // We expect that colId will be the same for all (or many) SDs. - if (cols == null) { - cols = new ArrayList<FieldSchema>(); - colss.put(colId, cols); - colsSb.append(colId).append(","); - } - sd.setCols(cols); - } - - // We assume each SD has an unique serde. 
- SerDeInfo serde = new SerDeInfo(); - SerDeInfo oldSerde = serdes.put(serdeId, serde); - if (oldSerde != null) { - throw new MetaException("SDs reuse serdes; we don't expect that"); - } - serde.setParameters(new HashMap<String, String>()); - serde.setName((String)fields[12]); - serde.setSerializationLib((String)fields[13]); - serdeSb.append(serdeId).append(","); - sd.setSerdeInfo(serde); - - Deadline.checkTimeout(); - } - query.closeAll(); - timingTrace(doTrace, queryText, start, queryTime); - - // Now get all the one-to-many things. Start with partitions. - queryText = "select \"PART_ID\", \"PARAM_KEY\", \"PARAM_VALUE\" from " + PARTITION_PARAMS + "" - + " where \"PART_ID\" in (" + partIds + ") and \"PARAM_KEY\" is not null" - + " order by \"PART_ID\" asc"; - loopJoinOrderedResult(partitions, queryText, 0, new ApplyFunc<Partition>() { - @Override - public void apply(Partition t, Object[] fields) { - t.putToParameters((String)fields[1], extractSqlClob(fields[2])); - }}); - // Perform conversion of null map values - for (Partition t : partitions.values()) { - t.setParameters(MetaStoreServerUtils.trimMapNulls(t.getParameters(), convertMapNullsToEmptyStrings)); - } - - queryText = "select \"PART_ID\", \"PART_KEY_VAL\" from " + PARTITION_KEY_VALS + "" - + " where \"PART_ID\" in (" + partIds + ")" - + " order by \"PART_ID\" asc, \"INTEGER_IDX\" asc"; - loopJoinOrderedResult(partitions, queryText, 0, new ApplyFunc<Partition>() { - @Override - public void apply(Partition t, Object[] fields) { - t.addToValues((String)fields[1]); - }}); - - // Prepare IN (blah) lists for the following queries. Cut off the final ','s. - if (sdSb.length() == 0) { - assert serdeSb.length() == 0 && colsSb.length() == 0; - return orderedResult; // No SDs, probably a view. - } - - String sdIds = trimCommaList(sdSb); - String serdeIds = trimCommaList(serdeSb); - String colIds = trimCommaList(colsSb); - - // Get all the stuff for SD. Don't do empty-list check - we expect partitions do have SDs. 
- queryText = "select \"SD_ID\", \"PARAM_KEY\", \"PARAM_VALUE\" from " + SD_PARAMS + "" - + " where \"SD_ID\" in (" + sdIds + ") and \"PARAM_KEY\" is not null" - + " order by \"SD_ID\" asc"; - loopJoinOrderedResult(sds, queryText, 0, new ApplyFunc<StorageDescriptor>() { - @Override - public void apply(StorageDescriptor t, Object[] fields) { - t.putToParameters((String)fields[1], extractSqlClob(fields[2])); - }}); - // Perform conversion of null map values - for (StorageDescriptor t : sds.values()) { - t.setParameters(MetaStoreServerUtils.trimMapNulls(t.getParameters(), convertMapNullsToEmptyStrings)); - } - - queryText = "select \"SD_ID\", \"COLUMN_NAME\", " + SORT_COLS + ".\"ORDER\"" - + " from " + SORT_COLS + "" - + " where \"SD_ID\" in (" + sdIds + ")" - + " order by \"SD_ID\" asc, \"INTEGER_IDX\" asc"; - loopJoinOrderedResult(sds, queryText, 0, new ApplyFunc<StorageDescriptor>() { - @Override - public void apply(StorageDescriptor t, Object[] fields) { - if (fields[2] == null) return; - t.addToSortCols(new Order((String)fields[1], extractSqlInt(fields[2]))); - }}); - - queryText = "select \"SD_ID\", \"BUCKET_COL_NAME\" from " + BUCKETING_COLS + "" - + " where \"SD_ID\" in (" + sdIds + ")" - + " order by \"SD_ID\" asc, \"INTEGER_IDX\" asc"; - loopJoinOrderedResult(sds, queryText, 0, new ApplyFunc<StorageDescriptor>() { - @Override - public void apply(StorageDescriptor t, Object[] fields) { - t.addToBucketCols((String)fields[1]); - }}); - - // Skewed columns stuff. 
- queryText = "select \"SD_ID\", \"SKEWED_COL_NAME\" from " + SKEWED_COL_NAMES + "" - + " where \"SD_ID\" in (" + sdIds + ")" - + " order by \"SD_ID\" asc, \"INTEGER_IDX\" asc"; - boolean hasSkewedColumns = - loopJoinOrderedResult(sds, queryText, 0, new ApplyFunc<StorageDescriptor>() { - @Override - public void apply(StorageDescriptor t, Object[] fields) { - if (!t.isSetSkewedInfo()) t.setSkewedInfo(new SkewedInfo()); - t.getSkewedInfo().addToSkewedColNames((String)fields[1]); - }}) > 0; - - // Assume we don't need to fetch the rest of the skewed column data if we have no columns. - if (hasSkewedColumns) { - // We are skipping the SKEWED_STRING_LIST table here, as it seems to be totally useless. - queryText = - "select " + SKEWED_VALUES + ".\"SD_ID_OID\"," - + " " + SKEWED_STRING_LIST_VALUES + ".\"STRING_LIST_ID\"," - + " " + SKEWED_STRING_LIST_VALUES + ".\"STRING_LIST_VALUE\" " - + "from " + SKEWED_VALUES + " " - + " left outer join " + SKEWED_STRING_LIST_VALUES + " on " + SKEWED_VALUES + "." - + "\"STRING_LIST_ID_EID\" = " + SKEWED_STRING_LIST_VALUES + ".\"STRING_LIST_ID\" " - + "where " + SKEWED_VALUES + ".\"SD_ID_OID\" in (" + sdIds + ") " - + " and " + SKEWED_VALUES + ".\"STRING_LIST_ID_EID\" is not null " - + " and " + SKEWED_VALUES + ".\"INTEGER_IDX\" >= 0 " - + "order by " + SKEWED_VALUES + ".\"SD_ID_OID\" asc, " + SKEWED_VALUES + ".\"INTEGER_IDX\" asc," - + " " + SKEWED_STRING_LIST_VALUES + ".\"INTEGER_IDX\" asc"; - loopJoinOrderedResult(sds, queryText, 0, new ApplyFunc<StorageDescriptor>() { - private Long currentListId; - private List<String> currentList; - @Override - public void apply(StorageDescriptor t, Object[] fields) throws MetaException { - if (!t.isSetSkewedInfo()) t.setSkewedInfo(new SkewedInfo()); - // Note that this is not a typical list accumulator - there's no call to finalize - // the last list. Instead we add list to SD first, as well as locally to add elements. 
- if (fields[1] == null) { - currentList = null; // left outer join produced a list with no values - currentListId = null; - t.getSkewedInfo().addToSkewedColValues(Collections.<String>emptyList()); - } else { - long fieldsListId = extractSqlLong(fields[1]); - if (currentListId == null || fieldsListId != currentListId) { - currentList = new ArrayList<String>(); - currentListId = fieldsListId; - t.getSkewedInfo().addToSkewedColValues(currentList); - } - currentList.add((String)fields[2]); - } - }}); - - // We are skipping the SKEWED_STRING_LIST table here, as it seems to be totally useless. - queryText = - "select " + SKEWED_COL_VALUE_LOC_MAP + ".\"SD_ID\"," - + " " + SKEWED_STRING_LIST_VALUES + ".STRING_LIST_ID," - + " " + SKEWED_COL_VALUE_LOC_MAP + ".\"LOCATION\"," - + " " + SKEWED_STRING_LIST_VALUES + ".\"STRING_LIST_VALUE\" " - + "from " + SKEWED_COL_VALUE_LOC_MAP + "" - + " left outer join " + SKEWED_STRING_LIST_VALUES + " on " + SKEWED_COL_VALUE_LOC_MAP + "." - + "\"STRING_LIST_ID_KID\" = " + SKEWED_STRING_LIST_VALUES + ".\"STRING_LIST_ID\" " - + "where " + SKEWED_COL_VALUE_LOC_MAP + ".\"SD_ID\" in (" + sdIds + ")" - + " and " + SKEWED_COL_VALUE_LOC_MAP + ".\"STRING_LIST_ID_KID\" is not null " - + "order by " + SKEWED_COL_VALUE_LOC_MAP + ".\"SD_ID\" asc," - + " " + SKEWED_STRING_LIST_VALUES + ".\"STRING_LIST_ID\" asc," - + " " + SKEWED_STRING_LIST_VALUES + ".\"INTEGER_IDX\" asc"; - - loopJoinOrderedResult(sds, queryText, 0, new ApplyFunc<StorageDescriptor>() { - private Long currentListId; - private List<String> currentList; - @Override - public void apply(StorageDescriptor t, Object[] fields) throws MetaException { - if (!t.isSetSkewedInfo()) { - SkewedInfo skewedInfo = new SkewedInfo(); - skewedInfo.setSkewedColValueLocationMaps(new HashMap<List<String>, String>()); - t.setSkewedInfo(skewedInfo); - } - Map<List<String>, String> skewMap = t.getSkewedInfo().getSkewedColValueLocationMaps(); - // Note that this is not a typical list accumulator - there's no call 
to finalize - // the last list. Instead we add list to SD first, as well as locally to add elements. - if (fields[1] == null) { - currentList = new ArrayList<String>(); // left outer join produced a list with no values - currentListId = null; - } else { - long fieldsListId = extractSqlLong(fields[1]); - if (currentListId == null || fieldsListId != currentListId) { - currentList = new ArrayList<String>(); - currentListId = fieldsListId; - } else { - skewMap.remove(currentList); // value based compare.. remove first - } - currentList.add((String)fields[3]); - } - skewMap.put(currentList, (String)fields[2]); - }}); - } // if (hasSkewedColumns) - - // Get FieldSchema stuff if any. - if (!colss.isEmpty()) { - // We are skipping the CDS table here, as it seems to be totally useless. - queryText = "select \"CD_ID\", \"COMMENT\", \"COLUMN_NAME\", \"TYPE_NAME\"" - + " from " + COLUMNS_V2 + " where \"CD_ID\" in (" + colIds + ")" - + " order by \"CD_ID\" asc, \"INTEGER_IDX\" asc"; - loopJoinOrderedResult(colss, queryText, 0, new ApplyFunc<List<FieldSchema>>() { - @Override - public void apply(List<FieldSchema> t, Object[] fields) { - t.add(new FieldSchema((String)fields[2], extractSqlClob(fields[3]), (String)fields[1])); - }}); - } - - // Finally, get all the stuff for serdes - just the params. 
- queryText = "select \"SERDE_ID\", \"PARAM_KEY\", \"PARAM_VALUE\" from " + SERDE_PARAMS + "" - + " where \"SERDE_ID\" in (" + serdeIds + ") and \"PARAM_KEY\" is not null" - + " order by \"SERDE_ID\" asc"; - loopJoinOrderedResult(serdes, queryText, 0, new ApplyFunc<SerDeInfo>() { - @Override - public void apply(SerDeInfo t, Object[] fields) { - t.putToParameters((String)fields[1], extractSqlClob(fields[2])); - }}); - // Perform conversion of null map values - for (SerDeInfo t : serdes.values()) { - t.setParameters(MetaStoreServerUtils.trimMapNulls(t.getParameters(), convertMapNullsToEmptyStrings)); - } - - return orderedResult; - } - - public int getNumPartitionsViaSqlFilter(SqlFilterForPushdown filter) throws MetaException { - boolean doTrace = LOG.isDebugEnabled(); - String catName = filter.table.getCatName().toLowerCase(); - String dbName = filter.table.getDbName().toLowerCase(); - String tblName = filter.table.getTableName().toLowerCase(); - - // Get number of partitions by doing count on PART_ID. - String queryText = "select count(" + PARTITIONS + ".\"PART_ID\") from " + PARTITIONS + "" - + " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " + TBLS + ".\"TBL_ID\" " - + " and " + TBLS + ".\"TBL_NAME\" = ? " - + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS + ".\"DB_ID\" " - + " and " + DBS + ".\"NAME\" = ? " - + join(filter.joins, ' ') - + " where " + DBS + ".\"CTLG_NAME\" = ? " - + (filter.filter == null || filter.filter.trim().isEmpty() ? "" : (" and " + filter.filter)); - - Object[] params = new Object[filter.params.size() + 3]; - params[0] = tblName; - params[1] = dbName; - params[2] = catName; - for (int i = 0; i < filter.params.size(); ++i) { - params[i + 3] = filter.params.get(i); - } - - long start = doTrace ? System.nanoTime() : 0; - Query query = pm.newQuery("javax.jdo.query.SQL", queryText); - query.setUnique(true); - int sqlResult = extractSqlInt(query.executeWithArray(params)); - long queryTime = doTrace ? 
System.nanoTime() : 0; - timingTrace(doTrace, queryText, start, queryTime); - return sqlResult; - } - - - private void timingTrace(boolean doTrace, String queryText, long start, long queryTime) { - if (!doTrace) return; - LOG.debug("Direct SQL query in " + (queryTime - start) / 1000000.0 + "ms + " + - (System.nanoTime() - queryTime) / 1000000.0 + "ms, the query is [" + queryText + "]"); - } - - static Long extractSqlLong(Object obj) throws MetaException { - if (obj == null) return null; - if (!(obj instanceof Number)) { - throw new MetaException("Expected numeric type but got " + obj.getClass().getName()); - } - return ((Number)obj).longValue(); - } - - /** - * Convert a boolean value returned from the RDBMS to a Java Boolean object. - * MySQL has booleans, but e.g. Derby uses 'Y'/'N' mapping. - * - * @param value - * column value from the database - * @return The Boolean value of the database column value, null if the column - * value is null - * @throws MetaException - * if the column value cannot be converted into a Boolean object - */ - private static Boolean extractSqlBoolean(Object value) throws MetaException { - if (value == null) { - return null; - } - if (value instanceof Boolean) { - return (Boolean)value; - } - if (value instanceof String) { - try { - return BooleanUtils.toBooleanObject((String) value, "Y", "N", null); - } catch (IllegalArgumentException iae) { - // NOOP - } - } - throw new MetaException("Cannot extract boolean from column value " + value); - } - - private int extractSqlInt(Object field) { - return ((Number)field).intValue(); - } - - private String extractSqlString(Object value) { - if (value == null) return null; - return value.toString(); - } - - static Double extractSqlDouble(Object obj) throws MetaException { - if (obj == null) - return null; - if (!(obj instanceof Number)) { - throw new MetaException("Expected numeric type but got " + obj.getClass().getName()); - } - return ((Number) obj).doubleValue(); - } - - private String 
extractSqlClob(Object value) { - if (value == null) return null; - try { - if (value instanceof Clob) { - // we trim the Clob value to a max length an int can hold - int maxLength = (((Clob)value).length() < Integer.MAX_VALUE - 2) ? (int)((Clob)value).length() : Integer.MAX_VALUE - 2; - return ((Clob)value).getSubString(1L, maxLength); - } else { - return value.toString(); - } - } catch (SQLException sqle) { - return null; - } - } - - static byte[] extractSqlBlob(Object value) throws MetaException { - if (value == null) - return null; - if (value instanceof Blob) { - //derby, oracle - try { - // getBytes function says: pos the ordinal position of the first byte in - // the BLOB value to be extracted; the first byte is at position 1 - return ((Blob) value).getBytes(1, (int) ((Blob) value).length()); - } catch (SQLException e) { - throw new MetaException("Encounter error while processing blob."); - } - } - else if (value instanceof byte[]) { - // mysql, postgres, sql server - return (byte[]) value; - } - else { - // this may happen when enablebitvector is false - LOG.debug("Expected blob type but got " + value.getClass().getName()); - return null; - } - } - - /** - * Helper method for preparing for "SOMETHING_ID in (...)" to use in future queries. - * @param objectIds the objectId collection - * @return The concatenated list - * @throws MetaException If the list contains wrong data - */ - private static String getIdListForIn(List<Object> objectIds) throws MetaException { - return objectIds.stream() - .map(i -> i.toString()) - .collect(Collectors.joining(",")); - } - - private static String trimCommaList(StringBuilder sb) { - if (sb.length() > 0) { - sb.setLength(sb.length() - 1); - } - return sb.toString(); - } - - private abstract class ApplyFunc<Target> { - public abstract void apply(Target t, Object[] fields) throws MetaException; - } - - /** - * Merges applies the result of a PM SQL query into a tree of object. - * Essentially it's an object join. 
DN could do this for us, but it issues queries - * separately for every object, which is suboptimal. - * @param tree The object tree, by ID. - * @param queryText The query text. - * @param keyIndex Index of the Long column corresponding to the map ID in query result rows. - * @param func The function that is called on each (object,row) pair with the same id. - * @return the count of results returned from the query. - */ - private <T> int loopJoinOrderedResult(TreeMap<Long, T> tree, - String queryText, int keyIndex, ApplyFunc<T> func) throws MetaException { - boolean doTrace = LOG.isDebugEnabled(); - long start = doTrace ? System.nanoTime() : 0; - Query query = pm.newQuery("javax.jdo.query.SQL", queryText); - Object result = query.execute(); - long queryTime = doTrace ? System.nanoTime() : 0; - if (result == null) { - query.closeAll(); - return 0; - } - List<Object[]> list = ensureList(result); - Iterator<Object[]> iter = list.iterator(); - Object[] fields = null; - for (Map.Entry<Long, T> entry : tree.entrySet()) { - if (fields == null && !iter.hasNext()) break; - long id = entry.getKey(); - while (fields != null || iter.hasNext()) { - if (fields == null) { - fields = iter.next(); - } - long nestedId = extractSqlLong(fields[keyIndex]); - if (nestedId < id) throw new MetaException("Found entries for unknown ID " + nestedId); - if (nestedId > id) break; // fields belong to one of the next entries - func.apply(entry.getValue(), fields); - fields = null; - } - Deadline.checkTimeout(); - } - int rv = list.size(); - query.closeAll(); - timingTrace(doTrace, queryText, start, queryTime); - return rv; - } - - private static class PartitionFilterGenerator extends TreeVisitor { - private final Table table; - private final FilterBuilder filterBuffer; - private final List<Object> params; - private final List<String> joins; - private final boolean dbHasJoinCastBug; - private final String defaultPartName; - private final DatabaseProduct dbType; - private final String 
PARTITION_KEY_VALS, PARTITIONS, DBS, TBLS;

    /**
     * Creates a generator that appends to the caller-supplied {@code params} and
     * {@code joins} lists. The table names used in the generated SQL are qualified
     * with {@code schema} via getFullyQualifiedName.
     */
    private PartitionFilterGenerator(Table table, List<Object> params, List<String> joins,
        boolean dbHasJoinCastBug, String defaultPartName, DatabaseProduct dbType, String schema) {
      this.table = table;
      this.params = params;
      this.joins = joins;
      this.dbHasJoinCastBug = dbHasJoinCastBug;
      this.filterBuffer = new FilterBuilder(false);
      this.defaultPartName = defaultPartName;
      this.dbType = dbType;
      this.PARTITION_KEY_VALS = getFullyQualifiedName(schema, "PARTITION_KEY_VALS");
      this.PARTITIONS = getFullyQualifiedName(schema, "PARTITIONS");
      this.DBS = getFullyQualifiedName(schema, "DBS");
      this.TBLS = getFullyQualifiedName(schema, "TBLS");
    }

    /**
     * Generate the ANSI SQL92 filter for the given expression tree
     * @param table the table being queried
     * @param tree the expression tree to render; may be null
     * @param params the ordered parameters for the resulting expression
     * @param joins the joins necessary for the resulting expression
     * @param dbHasJoinCastBug whether to emit the extra conditions of the join/cast workaround
     * @param defaultPartName the name of the default partition, excluded from casts
     * @param dbType the database product, used to pick the date conversion syntax
     * @param schema the schema used to qualify table names
     * @return the string representation of the expression tree; an empty string if the
     *         tree has no root; null if the tree is null or the filter cannot be pushed down
     */
    private static String generateSqlFilter(Table table, ExpressionTree tree, List<Object> params,
        List<String> joins, boolean dbHasJoinCastBug, String defaultPartName,
        DatabaseProduct dbType, String schema) throws MetaException {
      assert table != null;
      if (tree == null) {
        // consistent with other APIs like makeExpressionTree, null is returned to indicate that
        // the filter could not pushed down due to parsing issue etc
        return null;
      }
      if (tree.getRoot() == null) {
        return "";
      }
      PartitionFilterGenerator visitor = new PartitionFilterGenerator(
          table, params, joins, dbHasJoinCastBug, defaultPartName, dbType, schema);
      tree.accept(visitor);
      if (visitor.filterBuffer.hasError()) {
        LOG.info("Unable to push down SQL filter: " + visitor.filterBuffer.getErrorMessage());
        return null;
      }

      // Some joins might be null (see visit(LeafNode) below), clean them up.
      for (int i = 0; i < joins.size(); ++i) {
        if (joins.get(i) != null) continue;
        // Step the index back so the element shifted into slot i is examined too.
        joins.remove(i--);
      }
      return "(" + visitor.filterBuffer.getFilter() + ")";
    }

    /** Opens a parenthesized group for an inner (and/or) node. */
    @Override
    protected void beginTreeNode(TreeNode node) throws MetaException {
      filterBuffer.append(" (");
    }

    /** Emits the logical operator between the two children of an inner node. */
    @Override
    protected void midTreeNode(TreeNode node) throws MetaException {
      filterBuffer.append((node.getAndOr() == LogicalOperator.AND) ? " and " : " or ");
    }

    /** Closes the parenthesized group opened by beginTreeNode. */
    @Override
    protected void endTreeNode(TreeNode node) throws MetaException {
      filterBuffer.append(") ");
    }

    /** Stops the traversal as soon as an error has been recorded. */
    @Override
    protected boolean shouldStop() {
      return filterBuffer.hasError();
    }

    /**
     * The value categories the SQL pushdown understands. A partition column type and a
     * filter constant are each mapped to one of these so they can be compared.
     */
    private static enum FilterType {
      Integral,
      String,
      Date,

      Invalid;

      /** Maps a partition column type name to its category, Invalid if unsupported. */
      static FilterType fromType(String colTypeStr) {
        if (colTypeStr.equals(ColumnType.STRING_TYPE_NAME)) {
          return FilterType.String;
        } else if (colTypeStr.equals(ColumnType.DATE_TYPE_NAME)) {
          return FilterType.Date;
        } else if (ColumnType.IntegralTypes.contains(colTypeStr)) {
          return FilterType.Integral;
        }
        return FilterType.Invalid;
      }

      /** Maps a filter constant to its category by runtime class; null maps to Invalid. */
      public static FilterType fromClass(Object value) {
        if (value instanceof String) {
          return FilterType.String;
        } else if (value instanceof Long) {
          return FilterType.Integral;
        } else if (value instanceof java.sql.Date) {
          return FilterType.Date;
        }
        return FilterType.Invalid;
      }
    }

    /**
     * Renders one comparison (a leaf of the expression tree) into the filter buffer.
     * Registers the join on the partition key value table for the referenced partition
     * column if it is not registered yet, and adds the bind parameters in exactly the
     * order in which their '?' markers appear in the generated text. Any unsupported
     * case is recorded as an error on the filter buffer instead of being thrown.
     */
    @Override
    public void visit(LeafNode node) throws MetaException {
      if (node.operator == Operator.LIKE) {
        filterBuffer.setError("LIKE is not supported for SQL filter pushdown");
        return;
      }
      int partColCount = table.getPartitionKeys().size();
      int partColIndex = node.getPartColIndexForFilter(table, filterBuffer);
      if (filterBuffer.hasError()) return;

      // We skipped 'like', other ops should all work as long as the types are right.
      String colTypeStr = table.getPartitionKeys().get(partColIndex).getType();
      FilterType colType = FilterType.fromType(colTypeStr);
      if (colType == FilterType.Invalid) {
        filterBuffer.setError("Filter pushdown not supported for type " + colTypeStr);
        return;
      }
      FilterType valType = FilterType.fromClass(node.value);
      Object nodeValue = node.value;
      if (valType == FilterType.Invalid) {
        // NOTE(review): a null node.value also ends up here and would fail on getClass();
        // presumably the filter parser never produces a null constant - confirm.
        filterBuffer.setError("Filter pushdown not supported for value " + node.value.getClass());
        return;
      }

      // if Filter.g does date parsing for quoted strings, we'd need to verify there's no
      // type mismatch when string col is filtered by a string that looks like date.
      if (colType == FilterType.Date && valType == FilterType.String) {
        // Filter.g cannot parse a quoted date; try to parse date here too.
        try {
          nodeValue = MetaStoreUtils.PARTITION_DATE_FORMAT.get().parse((String)nodeValue);
          valType = FilterType.Date;
        } catch (ParseException pe) { // do nothing, handled below - types will mismatch
        }
      }

      // We format it so we are sure we are getting the right value
      if (valType == FilterType.Date) {
        // Format
        nodeValue = MetaStoreUtils.PARTITION_DATE_FORMAT.get().format(nodeValue);
      }

      boolean isDefaultPartition = (valType == FilterType.String) && defaultPartName.equals(nodeValue);
      if ((colType != valType) && (!isDefaultPartition)) {
        // It's not clear how filtering for e.g. "stringCol > 5" should work (which side is
        // to be coerced?). Let the expression evaluation sort this one out, not metastore.
        filterBuffer.setError("Cannot push down filter for "
            + colTypeStr + " column and value " + nodeValue.getClass());
        return;
      }

      if (joins.isEmpty()) {
        // There's a fixed number of partition cols that we might have filters on. To avoid
        // joining multiple times for one column (if there are several filters on it), we will
        // keep numCols elements in the list, one for each column; we will fill it with nulls,
        // put each join at a corresponding index when necessary, and remove nulls in the end.
        for (int i = 0; i < partColCount; ++i) {
          joins.add(null);
        }
      }
      if (joins.get(partColIndex) == null) {
        joins.set(partColIndex, "inner join " + PARTITION_KEY_VALS + " \"FILTER" + partColIndex
            + "\" on \"FILTER" + partColIndex + "\".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\""
            + " and \"FILTER" + partColIndex + "\".\"INTEGER_IDX\" = " + partColIndex);
      }

      // Build the filter and add parameters linearly; we are traversing leaf nodes LTR.
      String tableValue = "\"FILTER" + partColIndex + "\".\"PART_KEY_VAL\"";

      // In reverse order the constant's '?' comes first in the text ("? op column"), so its
      // parameter must be added before any parameter used inside the column expression.
      if (node.isReverseOrder) {
        params.add(nodeValue);
      }
      String tableColumn = tableValue;
      if ((colType != FilterType.String) && (!isDefaultPartition)) {
        // The underlying database field is varchar, we need to compare numbers.
        if (colType == FilterType.Integral) {
          tableValue = "cast(" + tableValue + " as decimal(21,0))";
        } else if (colType == FilterType.Date) {
          if (dbType == DatabaseProduct.ORACLE) {
            // Oracle requires special treatment... as usual.
            tableValue = "TO_DATE(" + tableValue + ", 'YYYY-MM-DD')";
          } else {
            tableValue = "cast(" + tableValue + " as date)";
          }
        }

        // Workaround for HIVE_DEFAULT_PARTITION - ignore it like JDO does, for now.
        // The cast is only evaluated for rows whose raw value differs from the default
        // partition name; other rows yield null.
        String tableValue0 = tableValue;
        tableValue = "(case when " + tableColumn + " <> ?";
        params.add(defaultPartName);

        if (dbHasJoinCastBug) {
          // This is a workaround for DERBY-6358 and Oracle bug; it is pretty horrible.
          tableValue += (" and " + TBLS + ".\"TBL_NAME\" = ? and " + DBS + ".\"NAME\" = ? and "
              + DBS + ".\"CTLG_NAME\" = ? and "
              + "\"FILTER" + partColIndex + "\".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\" and "
              + "\"FILTER" + partColIndex + "\".\"INTEGER_IDX\" = " + partColIndex);
          // Same order as the three '?' markers just appended: table, database, catalog.
          params.add(table.getTableName().toLowerCase());
          params.add(table.getDbName().toLowerCase());
          params.add(table.getCatName().toLowerCase());
        }
        tableValue += " then " + tableValue0 + " else null end)";
      }
      // In normal order the constant's '?' comes last ("column op ?").
      if (!node.isReverseOrder) {
        params.add(nodeValue);
      }

      filterBuffer.append(node.isReverseOrder
          ? "(? " + node.operator.getSqlOp() + " " + tableValue + ")"
          : "(" + tableValue + " " + node.operator.getSqlOp() + " ?)");
    }
  }

  /**
   * Retrieve the column statistics for the specified columns of the table. NULL
   * is returned if the columns are not provided.
   * @param catName     the catalog name of the table
   * @param dbName      the database name of the table
   * @param tableName   the table name
   * @param colNames    the list of the column names
   * @return            the column statistics for the specified columns
   * @throws MetaException
   */
  public ColumnStatistics getTableStats(final String catName, final String dbName,
      final String tableName, List<String> colNames,
      boolean enableBitVector) throws MetaException {
    if (colNames == null || colNames.isEmpty()) {
      return null;
    }
    final boolean doTrace = LOG.isDebugEnabled();
    final String queryText0 = "select " + getStatsList(enableBitVector) + " from " + TAB_COL_STATS
          + " where \"CAT_NAME\" = ? and \"DB_NAME\" = ? and \"TABLE_NAME\" = ? and \"COLUMN_NAME\" in (";
    Batchable<String, Object[]> b = new Batchable<String, Object[]>() {
      @Override
      public List<Object[]> run(List<String> input) throws MetaException {
        String queryText = queryText0 + makeParams(input.size()) + ")";
        // Fixed parameters first (catalog, database, table), then one per column name.
        Object[] params = new Object[input.size() + 3];
        params[0] = catName;
        params[1] = dbName;
        params[2] = tableName;
        for (int i = 0; i < input.size(); ++i) {
          params[i + 3] = input.get(i);
        }
        long start = doTrace ?
System.nanoTime() : 0; - Query query = pm.newQuery("javax.jdo.query.SQL", queryText); - Object qResult = executeWithArray(query, params, queryText); - timingTrace(doTrace, queryText0 + "...)", start, (doTrace ? System.nanoTime() : 0)); - if (qResult == null) { - query.closeAll(); - return null; - } - addQueryAfterUse(query); - return ensureList(qResult); - } - }; - List<Object[]> list = Batchable.runBatched(batchSize, colNames, b); - if (list.isEmpty()) { - return null; - } - ColumnStatisticsDesc csd = new ColumnStatisticsDesc(true, dbName, tableName); - csd.setCatName(catName); - ColumnStatistics result = makeColumnStats(list, csd, 0); - b.closeAllQueries(); - return result; - } - - public AggrStats aggrColStatsForPartitions(String catName, String dbName, String tableName, - List<String> partNames, List<String> colNames, boolean useDensityFunctionForNDVEstimation, - double ndvTuner, boolean enableBitVector) throws MetaException { - if (colNames.isEmpty() || partNames.isEmpty()) { - LOG.debug("Columns is empty or partNames is empty : Short-circuiting stats eval"); - return new AggrStats(Collections.<ColumnStatisticsObj>emptyList(), 0); // Nothing to aggregate - } - long partsFound = 0; - List<ColumnStatisticsObj> colStatsList; - // Try to read from the cache first - if (isAggregateStatsCacheEnabled - && (partNames.size() < aggrStatsCache.getMaxPartsPerCacheNode())) { - AggrColStats colStatsAggrCached; - List<ColumnStatisticsObj> colStatsAggrFromDB; - int maxPartsPerCacheNode = aggrStatsCache.getMaxPartsPerCacheNode(); - double fpp = aggrStatsCache.getFalsePositiveProbability(); - colStatsList = new ArrayList<ColumnStatisticsObj>(); - // Bloom filter for the new node that we will eventually add to the cache - BloomFilter bloomFilter = createPartsBloomFilter(maxPartsPerCacheNode, fpp, partNames); - boolean computePartsFound = true; - for (String colName : colNames) { - // Check the cache first - colStatsAggrCached = aggrStatsCache.get(catName, dbName, tableName, 
colName, partNames); - if (colStatsAggrCached != null) { - colStatsList.add(colStatsAggrCached.getColStats()); - partsFound = colStatsAggrCached.getNumPartsCached(); - } else { - if (computePartsFound) { - partsFound = partsFoundForPartitions(catName, dbName, tableName, partNames, colNames); - computePartsFound = false; - } - List<String> colNamesForDB = new ArrayList<>(); - colNamesForDB.add(colName); - // Read aggregated stats for one column - colStatsAggrFromDB = - columnStatisticsObjForPartitions(catName, dbName, tableName, partNames, colNamesForDB, - partsFound, useDensityFunctionForNDVEstimation, ndvTuner, enableBitVector); - if (!colStatsAggrFromDB.isEmpty()) { - ColumnStatisticsObj colStatsAggr = colStatsAggrFromDB.get(0); - colStatsList.add(colStatsAggr); - // Update the cache to add this new aggregate node - aggrStatsCache.add(catName, dbName, tableName, colName, partsFound, colStatsAggr, bloomFilter); - } - } - } - } else { - partsFound = partsFoundForPartitions(catName, dbName, tableName, partNames, colNames); - colStatsList = - columnStatisticsObjForPartitions(catName, dbName, tableName, partNames, colNames, partsFound, - useDensityFunctionForNDVEstimation, ndvTuner, enableBitVector); - } - LOG.info("useDensityFunctionForNDVEstimation = " + useDensityFunctionForNDVEstimation - + "\npartsFound = " + partsFound + "\nColumnStatisticsObj = " - + Arrays.toString(colStatsList.toArray())); - return new AggrStats(colStatsList, partsFound); - } - - private BloomFilter createPartsBloomFilter(int maxPartsPerCacheNode, double fpp, - List<String> partNames) { - BloomFilter bloomFilter = new BloomFilter(maxPartsPerCacheNode, fpp); - for (String partName : partNames) { - bloomFilter.add(partName.getBytes()); - } - return bloomFilter; - } - - private long partsFoundForPartitions( - final String catName, final String dbName, final String tableName, - final List<String> partNames, List<String> colNames) throws MetaException { - assert !colNames.isEmpty() && 
!partNames.isEmpty(); - final boolean doTrace = LOG.isDebugEnabled(); - final String queryText0 = "select count(\"COLUMN_NAME\") from " + PART_COL_STATS + "" - + " where \"CAT_NAME\" = ? and \"DB_NAME\" = ? and \"TABLE_NAME\" = ? " - + " and \"COLUMN_NAME\" in (%1$s) and \"PARTITION_NAME\" in (%2$s)" - + " group by \"PARTITION_NAME\""; - List<Long> allCounts = Batchable.runBatched(batchSize, colNames, new Batchable<String, Long>() { - @Override - public List<Long> run(final List<String> inputColName) throws MetaException { - return Batchable.runBatched(batchSize, partNames, new Batchable<String, Long>() { - @Override - public List<Long> run(List<String> inputPartNames) throws MetaException { - long partsFound = 0; - String queryText = String.format(queryText0, - makeParams(inputColName.size()), makeParams(inputPartNames.size())); - long start = doTrace ? System.nanoTime() : 0; - Query query = pm.newQuery("javax.jdo.query.SQL", queryText); - try { - Object qResult = executeWithArray(query, prepareParams( - catName, dbName, tableName, inputPartNames, inputColName), queryText); - long end = doTrace ? 
System.nanoTime() : 0; - timingTrace(doTrace, queryText, start, end); - ForwardQueryResult<?> fqr = (ForwardQueryResult<?>) qResult; - Iterator<?> iter = fqr.iterator(); - while (iter.hasNext()) { - if (extractSqlLong(iter.next()) == inputColName.size()) { - partsFound++; - } - } - return Lists.<Long>newArrayList(partsFound); - } finally { - query.closeAll(); - } - } - }); - } - }); - long partsFound = 0; - for (Long val : allCounts) { - partsFound += val; - } - return partsFound; - }

  /**
   * Aggregates partition-level column statistics for the given columns and partitions.
   * The work is handed to {@link Batchable#runBatched} twice (outer call over the column
   * names, inner call over the partition names, both with {@code batchSize}), and every
   * resulting (columns, partitions) pair is processed by
   * {@link #columnStatisticsObjForPartitionsBatch}.
   *
   * @param partsFound compared against {@code partNames.size()} to decide whether every
   *                   requested partition is covered (i.e. whether extrapolation is needed)
   * @return the aggregated statistics objects collected from all batches
   * @throws MetaException propagated from the batched calls
   */
  private List<ColumnStatisticsObj> columnStatisticsObjForPartitions(
      final String catName, final String dbName,
      final String tableName, final List<String> partNames, List<String> colNames, long partsFound,
      final boolean useDensityFunctionForNDVEstimation, final double ndvTuner, final boolean enableBitVector) throws MetaException {
    final boolean areAllPartsFound = (partsFound == partNames.size());
    return Batchable.runBatched(batchSize, colNames, new Batchable<String, ColumnStatisticsObj>() {
      @Override
      public List<ColumnStatisticsObj> run(final List<String> inputColNames) throws MetaException {
        return Batchable.runBatched(batchSize, partNames, new Batchable<String, ColumnStatisticsObj>() {
          @Override
          public List<ColumnStatisticsObj> run(List<String> inputPartNames) throws MetaException {
            return columnStatisticsObjForPartitionsBatch(catName, dbName, tableName, inputPartNames,
                inputColNames, areAllPartsFound, useDensityFunctionForNDVEstimation, ndvTuner, enableBitVector);
          }
        });
      }
    });
  }

  /**
   * Reads every row of the partition column statistics table that belongs to the given
   * catalog and database and converts each row into a {@link ColStatsObjWithSourceInfo}.
   *
   * @param catName catalog name bound to the {@code CAT_NAME} predicate
   * @param dbName database name bound to the {@code DB_NAME} predicate
   * @param enableBitVector forwarded to {@code getStatsList} when building the select list
   * @return one entry per returned row; an empty list if the query yields {@code null}
   * @throws MetaException propagated from query execution, row conversion or the deadline check
   */
  public List<ColStatsObjWithSourceInfo> getColStatsForAllTablePartitions(String catName, String dbName,
      boolean enableBitVector) throws MetaException {
    String queryText = "select \"TABLE_NAME\", \"PARTITION_NAME\", " + getStatsList(enableBitVector)
        + " from " + " " + PART_COL_STATS + " where \"DB_NAME\" = ? and \"CAT_NAME\" = ?";
    long start = 0;
    long end = 0;
    Query query = null;
    boolean doTrace = LOG.isDebugEnabled();
    Object qResult = null;
    start = doTrace ? System.nanoTime() : 0;
    List<ColStatsObjWithSourceInfo> colStatsForDB = new ArrayList<ColStatsObjWithSourceInfo>();
    try {
      query = pm.newQuery("javax.jdo.query.SQL", queryText);
      qResult = executeWithArray(query, new Object[] { dbName, catName }, queryText);
      if (qResult == null) {
        // The finally block below releases the query.
        return colStatsForDB;
      }
      end = doTrace ? System.nanoTime() : 0;
      timingTrace(doTrace, queryText, start, end);
      List<Object[]> list = ensureList(qResult);
      for (Object[] row : list) {
        String tblName = (String) row[0];
        String partName = (String) row[1];
        // Columns 0 and 1 are table/partition name; the statistics columns start at index 2.
        ColumnStatisticsObj colStatObj = prepareCSObj(row, 2);
        colStatsForDB.add(new ColStatsObjWithSourceInfo(colStatObj, catName, dbName, tblName, partName));
        Deadline.checkTimeout();
      }
    } finally {
      // query is still null if pm.newQuery itself failed; guard so that the original
      // exception is not replaced by a NullPointerException.
      if (query != null) {
        query.closeAll();
      }
    }
    return colStatsForDB;
  }

  /**
   * Aggregates statistics for a single batch of partitions and columns.
   * Should be called with the list short enough to not trip up Oracle/etc.
   *
   * @param enableBitVector when {@code true} the aggregation is done by
   *                        {@link #aggrStatsUseJava}, otherwise by {@link #aggrStatsUseDB}
   */
  private List<ColumnStatisticsObj> columnStatisticsObjForPartitionsBatch(String catName, String dbName,
      String tableName, List<String> partNames, List<String> colNames, boolean areAllPartsFound,
      boolean useDensityFunctionForNDVEstimation, double ndvTuner, boolean enableBitVector)
      throws MetaException {
    if (enableBitVector) {
      return aggrStatsUseJava(catName, dbName, tableName, partNames, colNames, areAllPartsFound,
          useDensityFunctionForNDVEstimation, ndvTuner);
    } else {
      return aggrStatsUseDB(catName, dbName, tableName, partNames, colNames, areAllPartsFound,
          useDensityFunctionForNDVEstimation, ndvTuner);
    }
  }

  /**
   * Aggregates statistics in Java: fetches the per-partition statistics and passes them to
   * {@code MetaStoreServerUtils.aggrPartitionStats}.
   */
  private List<ColumnStatisticsObj> aggrStatsUseJava(String catName, String dbName, String tableName,
      List<String> partNames, List<String> colNames, boolean areAllPartsFound,
      boolean useDensityFunctionForNDVEstimation, double ndvTuner) throws MetaException {
    // 1. get all the stats for colNames in partNames;
    List<ColumnStatistics> partStats =
        getPartitionStats(catName, dbName, tableName, partNames, colNames, true);
    // 2. use util function to aggr stats
    return MetaStoreServerUtils.aggrPartitionStats(partStats, catName, dbName, tableName, partNames, colNames,
        areAllPartsFound, useDensityFunctionForNDVEstimation, ndvTuner);
  }

  /**
   * Aggregates statistics with SQL aggregate functions over the partition column statistics
   * table.
   *
   * <p>If {@code areAllPartsFound} is {@code true}, a single grouped aggregate query is issued.
   * Otherwise the number of partitions carrying stats is counted per column first:
   * <ul>
   *   <li>columns whose count equals {@code partNames.size()} or is below 2 are aggregated
   *       directly (no extrapolation needed or possible);</li>
   *   <li>for the remaining columns, sum-type stats are scaled up linearly to the full
   *       partition count, min/max-type stats are extrapolated through
   *       {@link LinearExtrapolatePartStatus}, and avg-type stats are taken from the
   *       partitions that do have stats.</li>
   * </ul>
   *
   * @return one aggregated object per column, or an empty list as soon as any query
   *         returns {@code null}
   * @throws MetaException propagated from query execution, row conversion or the deadline check
   */
  private List<ColumnStatisticsObj> aggrStatsUseDB(String catName, String dbName,
      String tableName, List<String> partNames, List<String> colNames, boolean areAllPartsFound,
      boolean useDensityFunctionForNDVEstimation, double ndvTuner) throws MetaException {
    // TODO: all the extrapolation logic should be moved out of this class,
    // only mechanical data retrieval should remain here.
    String commonPrefix = "select \"COLUMN_NAME\", \"COLUMN_TYPE\", "
        + "min(\"LONG_LOW_VALUE\"), max(\"LONG_HIGH_VALUE\"), min(\"DOUBLE_LOW_VALUE\"), max(\"DOUBLE_HIGH_VALUE\"), "
        + "min(cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal)), max(cast(\"BIG_DECIMAL_HIGH_VALUE\" as decimal)), "
        + "sum(\"NUM_NULLS\"), max(\"NUM_DISTINCTS\"), "
        + "max(\"AVG_COL_LEN\"), max(\"MAX_COL_LEN\"), sum(\"NUM_TRUES\"), sum(\"NUM_FALSES\"), "
        // The following data is used to compute a partitioned table's NDV based
        // on partitions' NDV when useDensityFunctionForNDVEstimation = true. Global NDVs cannot be
        // accurately derived from partition NDVs, because the domain of column value two partitions
        // can overlap. If there is no overlap then global NDV is just the sum
        // of partition NDVs (UpperBound). But if there is some overlap then
        // global NDV can be anywhere between sum of partition NDVs (no overlap)
        // and same as one of the partition NDV (domain of column value in all other
        // partitions is subset of the domain value in one of the partition)
        // (LowerBound). But under uniform distribution, we can roughly estimate the global
        // NDV by leveraging the min/max values.
        // And, we also guarantee that the estimation makes sense by comparing it to the
        // UpperBound (calculated by "sum(\"NUM_DISTINCTS\")")
        // and LowerBound (calculated by "max(\"NUM_DISTINCTS\")")
        + "avg((\"LONG_HIGH_VALUE\"-\"LONG_LOW_VALUE\")/cast(\"NUM_DISTINCTS\" as decimal)),"
        + "avg((\"DOUBLE_HIGH_VALUE\"-\"DOUBLE_LOW_VALUE\")/\"NUM_DISTINCTS\"),"
        + "avg((cast(\"BIG_DECIMAL_HIGH_VALUE\" as decimal)-cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal))/\"NUM_DISTINCTS\"),"
        + "sum(\"NUM_DISTINCTS\")" + " from " + PART_COL_STATS + ""
        + " where \"CAT_NAME\" = ? and \"DB_NAME\" = ? and \"TABLE_NAME\" = ? ";
    String queryText = null;
    long start = 0;
    long end = 0;
    Query query = null;
    boolean doTrace = LOG.isDebugEnabled();
    Object qResult = null;
    ForwardQueryResult<?> fqr = null;
    try {
      // Check if the status of all the columns of all the partitions exists
      // Extrapolation is not needed.
      if (areAllPartsFound) {
        queryText = commonPrefix + " and \"COLUMN_NAME\" in (" + makeParams(colNames.size()) + ")"
            + " and \"PARTITION_NAME\" in (" + makeParams(partNames.size()) + ")"
            + " group by \"COLUMN_NAME\", \"COLUMN_TYPE\"";
        start = doTrace ? System.nanoTime() : 0;
        query = pm.newQuery("javax.jdo.query.SQL", queryText);
        qResult = executeWithArray(query, prepareParams(catName, dbName, tableName, partNames, colNames),
            queryText);
        if (qResult == null) {
          query.closeAll();
          return Collections.emptyList();
        }
        end = doTrace ? System.nanoTime() : 0;
        timingTrace(doTrace, queryText, start, end);
        List<Object[]> list = ensureList(qResult);
        List<ColumnStatisticsObj> colStats = new ArrayList<ColumnStatisticsObj>(list.size());
        for (Object[] row : list) {
          colStats.add(prepareCSObjWithAdjustedNDV(row, 0, useDensityFunctionForNDVEstimation, ndvTuner));
          Deadline.checkTimeout();
        }
        query.closeAll();
        return colStats;
      } else {
        // Extrapolation is needed for some columns.
        // In this case, at least a column status for a partition is missing.
        // We need to extrapolate this partition based on the other partitions
        List<ColumnStatisticsObj> colStats = new ArrayList<ColumnStatisticsObj>(colNames.size());
        queryText = "select \"COLUMN_NAME\", \"COLUMN_TYPE\", count(\"PARTITION_NAME\") "
            + " from " + PART_COL_STATS
            + " where \"CAT_NAME\" = ? and \"DB_NAME\" = ? and \"TABLE_NAME\" = ? "
            + " and \"COLUMN_NAME\" in (" + makeParams(colNames.size()) + ")"
            + " and \"PARTITION_NAME\" in (" + makeParams(partNames.size()) + ")"
            + " group by \"COLUMN_NAME\", \"COLUMN_TYPE\"";
        start = doTrace ? System.nanoTime() : 0;
        query = pm.newQuery("javax.jdo.query.SQL", queryText);
        qResult = executeWithArray(query, prepareParams(catName, dbName, tableName, partNames, colNames),
            queryText);
        end = doTrace ? System.nanoTime() : 0;
        timingTrace(doTrace, queryText, start, end);
        if (qResult == null) {
          query.closeAll();
          return Collections.emptyList();
        }
        List<String> noExtraColumnNames = new ArrayList<String>();
        Map<String, String[]> extraColumnNameTypeParts = new HashMap<String, String[]>();
        List<Object[]> list = ensureList(qResult);
        for (Object[] row : list) {
          String colName = (String) row[0];
          String colType = (String) row[1];
          // Extrapolation is not needed for this column if
          // count(\"PARTITION_NAME\")==partNames.size()
          // Or, extrapolation is not possible for this column if
          // count(\"PARTITION_NAME\")<2
          Long count = extractSqlLong(row[2]);
          if (count == partNames.size() || count < 2) {
            noExtraColumnNames.add(colName);
          } else {
            extraColumnNameTypeParts.put(colName, new String[] { colType, String.valueOf(count) });
          }
          Deadline.checkTimeout();
        }
        query.closeAll();
        // Extrapolation is not needed for columns noExtraColumnNames
        if (noExtraColumnNames.size() != 0) {
          queryText = commonPrefix + " and \"COLUMN_NAME\" in ("
              + makeParams(noExtraColumnNames.size()) + ")" + " and \"PARTITION_NAME\" in ("
              + makeParams(partNames.size()) + ")" + " group by \"COLUMN_NAME\", \"COLUMN_TYPE\"";
          start = doTrace ? System.nanoTime() : 0;
          query = pm.newQuery("javax.jdo.query.SQL", queryText);
          qResult = executeWithArray(query,
              prepareParams(catName, dbName, tableName, partNames, noExtraColumnNames), queryText);
          if (qResult == null) {
            query.closeAll();
            return Collections.emptyList();
          }
          list = ensureList(qResult);
          for (Object[] row : list) {
            colStats.add(prepareCSObjWithAdjustedNDV(row, 0, useDensityFunctionForNDVEstimation, ndvTuner));
            Deadline.checkTimeout();
          }
          end = doTrace ? System.nanoTime() : 0;
          timingTrace(doTrace, queryText, start, end);
          query.closeAll();
        }
        // Extrapolation is needed for extraColumnNames.
        // give a sequence number for all the partitions
        if (extraColumnNameTypeParts.size() != 0) {
          Map<String, Integer> indexMap = new HashMap<String, Integer>();
          for (int index = 0; index < partNames.size(); index++) {
            indexMap.put(partNames.get(index), index);
          }
          // get sum for all columns to reduce the number of queries
          Map<String, Map<Integer, Object>> sumMap = new HashMap<String, Map<Integer, Object>>();
          queryText = "select \"COLUMN_NAME\", sum(\"NUM_NULLS\"), sum(\"NUM_TRUES\"), sum(\"NUM_FALSES\"), sum(\"NUM_DISTINCTS\")"
              + " from " + PART_COL_STATS + " where \"CAT_NAME\" = ? and \"DB_NAME\" = ? and \"TABLE_NAME\" = ? "
              + " and \"COLUMN_NAME\" in (" + makeParams(extraColumnNameTypeParts.size())
              + ") and \"PARTITION_NAME\" in (" + makeParams(partNames.size())
              + ") group by \"COLUMN_NAME\"";
          start = doTrace ? System.nanoTime() : 0;
          query = pm.newQuery("javax.jdo.query.SQL", queryText);
          List<String> extraColumnNames = new ArrayList<String>();
          extraColumnNames.addAll(extraColumnNameTypeParts.keySet());
          qResult = executeWithArray(query,
              prepareParams(catName, dbName, tableName, partNames, extraColumnNames), queryText);
          if (qResult == null) {
            query.closeAll();
            return Collections.emptyList();
          }
          list = ensureList(qResult);
          // see the indexes for colstats in IExtrapolatePartStatus
          Integer[] sumIndex = new Integer[] { 6, 10, 11, 15 };
          for (Object[] row : list) {
            Map<Integer, Object> indexToObject = new HashMap<Integer, Object>();
            for (int ind = 1; ind < row.length; ind++) {
              indexToObject.put(sumIndex[ind - 1], row[ind]);
            }
            // row[0] is the column name
            sumMap.put((String) row[0], indexToObject);
            Deadline.checkTimeout();
          }
          end = doTrace ? System.nanoTime() : 0;
          timingTrace(doTrace, queryText, start, end);
          query.closeAll();
          for (Map.Entry<String, String[]> entry : extraColumnNameTypeParts.entrySet()) {
            Object[] row = new Object[IExtrapolatePartStatus.colStatNames.length + 2];
            String colName = entry.getKey();
            String colType = entry.getValue()[0];
            // Number of partitions that actually carry stats for this column
            // (the count(\"PARTITION_NAME\") value stored above).
            Long sumVal = Long.parseLong(entry.getValue()[1]);
            // fill in colname
            row[0] = colName;
            // fill in coltype
            row[1] = colType;
            // use linear extrapolation. more complicated one can be added in the
            // future.
            IExtrapolatePartStatus extrapolateMethod = new LinearExtrapolatePartStatus();
            // fill in colstatus
            Integer[] index = null;
            boolean decimal = false;
            if (colType.toLowerCase().startsWith("decimal")) {
              index = IExtrapolatePartStatus.indexMaps.get("decimal");
              decimal = true;
            } else {
              index = IExtrapolatePartStatus.indexMaps.get(colType.toLowerCase());
            }
            // if the colType is not the known type, long, double, etc, then get
            // all index.
            if (index == null) {
              index = IExtrapolatePartStatus.indexMaps.get("default");
            }
            for (int colStatIndex : index) {
              String colStatName = IExtrapolatePartStatus.colStatNames[colStatIndex];
              // if the aggregation type is sum, we do a scale-up
              if (IExtrapolatePartStatus.aggrTypes[colStatIndex] == IExtrapolatePartStatus.AggrType.Sum) {
                Object o = sumMap.get(colName).get(colStatIndex);
                if (o == null) {
                  row[2 + colStatIndex] = null;
                } else {
                  Long val = extractSqlLong(o);
                  // Multiply before dividing: with integer arithmetic, dividing first
                  // truncates the per-partition average (to 0 whenever val < sumVal)
                  // and the error is then multiplied by the partition count.
                  row[2 + colStatIndex] = val * partNames.size() / sumVal;
                }
              } else if (IExtrapolatePartStatus.aggrTypes[colStatIndex] == IExtrapolatePartStatus.AggrType.Min
                  || IExtrapolatePartStatus.aggrTypes[colStatIndex] == IExtrapolatePartStatus.AggrType.Max) {
                // if the aggregation type is min/max, we extrapolate from the
                // left/right borders
                if (!decimal) {
                  queryText = "select \"" + colStatName
                      + "\",\"PARTITION_NAME\" from " + PART_COL_STATS
                      + " where \"CAT_NAME\" = ? and \"DB_NAME\" = ? and \"TABLE_NAME\" = ?" + " and \"COLUMN_NAME\" = ?"
                      + " and \"PARTITION_NAME\" in (" + makeParams(partNames.size()) + ")"
                      + " order by \"" + colStatName + "\"";
                } else {
                  queryText = "select \"" + colStatName
                      + "\",\"PARTITION_NAME\" from " + PART_COL_STATS
                      + " where \"CAT_NAME\" = ? and \"DB_NAME\" = ? and \"TABLE_NAME\" = ?" + " and \"COLUMN_NAME\" = ?"
                      + " and \"PARTITION_NAME\" in (" + makeParams(partNames.size()) + ")"
                      + " order by cast(\"" + colStatName + "\" as decimal)";
                }
                start = doTrace ? System.nanoTime() : 0;
                query = pm.newQuery("javax.jdo.query.SQL", queryText);
                qResult = executeWithArray(query,
                    prepareParams(catName, dbName, tableName, partNames, Arrays.asList(colName)), queryText);
                if (qResult == null) {
                  query.closeAll();
                  return Collections.emptyList();
                }
                fqr = (ForwardQueryResult<?>) qResult;
                // Rows are ordered by the stat value, so first/last are the borders.
                Object[] min = (Object[]) (fqr.get(0));
                Object[] max = (Object[]) (fqr.get(fqr.size() - 1));
                end = doTrace ? System.nanoTime() : 0;
                timingTrace(doTrace, queryText, start, end);
                query.closeAll();
                if (min[0] == null || max[0] == null) {
                  row[2 + colStatIndex] = null;
                } else {
                  row[2 + colStatIndex] = extrapolateMethod.extrapolate(min, max, colStatIndex,
                      indexMap);
                }
              } else {
                // if the aggregation type is avg, we use the average on the existing ones.
                queryText = "select "
                    + "avg((\"LONG_HIGH_VALUE\"-\"LONG_LOW_VALUE\")/cast(\"NUM_DISTINCTS\" as decimal)),"
                    + "avg((\"DOUBLE_HIGH_VALUE\"-\"DOUBLE_LOW_VALUE\")/\"NUM_DISTINCTS\"),"
                    + "avg((cast(\"BIG_DECIMAL_HIGH_VALUE\" as decimal)-cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal))/\"NUM_DISTINCTS\")"
                    + " from " + PART_COL_STATS + "" + " where \"CAT_NAME\" = ? and \"DB_NAME\" = ? and \"TABLE_NAME\" = ?"
                    + " and \"COLUMN_NAME\" = ?" + " and \"PARTITION_NAME\" in ("
                    + makeParams(partNames.size()) + ")" + " group by \"COLUMN_NAME\"";
                start = doTrace ? System.nanoTime() : 0;
                query = pm.newQuery("javax.jdo.query.SQL", queryText);
                qResult = executeWithArray(query,
                    prepareParams(catName, dbName, tableName, partNames, Arrays.asList(colName)), queryText);
                if (qResult == null) {
                  query.closeAll();
                  return Collections.emptyList();
                }
                fqr = (ForwardQueryResult<?>) qResult;
                Object[] avg = (Object[]) (fqr.get(0));
                // colStatIndex=12,13,14 respond to "AVG_LONG", "AVG_DOUBLE",
                // "AVG_DECIMAL"
                row[2 + colStatIndex] = avg[colStatIndex - 12];
                end = doTrace ? System.nanoTime() : 0;
                timingTrace(doTrace, queryText, start, end);
                query.closeAll();
              }
            }
            colStats.add(prepareCSObjWithAdjustedNDV(row, 0, useDensityFunctionForNDVEstimation, ndvTuner));
            Deadline.checkTimeout();
          }
        }
        return colStats;
      }
    } finally {
      // Safety net for exception paths (e.g. Deadline.checkTimeout() or a failing query):
      // release the most recently created query. The explicit closeAll() calls above are
      // kept so that each query is released before the next one is created; closing an
      // already closed query a second time here is harmless.
      if (query != null) {
        query.closeAll();
      }
    }
  }

  /**
   * Builds a {@link ColumnStatisticsObj} from one result row.
   *
   * @param row result row; starting at index {@code i} it holds, in order: column name,
   *            column type, long low/high, double low/high, decimal low/high, nulls,
   *            distincts, bit vector, avg length, max length, trues, falses
   * @param i index of the column-name entry within {@code row}
   * @return the populated statistics object
   * @throws MetaException propagated from {@code StatObjectConverter.fillColumnStatisticsData}
   */
  private ColumnStatisticsObj prepareCSObj (Object[] row, int i) throws MetaException {
    ColumnStatisticsData data = new ColumnStatisticsData();
    ColumnStatisticsObj cso = new ColumnStatisticsObj((String)row[i++], (String)row[i++], data);
    Object llow = row[i++], lhigh = row[i++], dlow = row[i++], dhigh = row[i++],
        declow = row[i++], dechigh = row[i++], nulls = row[i++], dist = row[i++], bitVector = row[i++],
        avglen = row[i++], maxlen = row[i++], trues = row[i++], falses = row[i++];
    StatObjectConverter.fillColumnStatisticsData(cso.getColType(), data,
        llow, lhigh, dlow, dhigh, declow, dechigh, nulls, dist, bitVector, avglen, maxlen, trues, falses);
    return cso;
  }
<TRUNCATED>