HIVE-19532 : 04 patch (Steve Yeom)
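For orientation before the diff: this patch threads a table "snapshot" (the current txnId plus a ValidWriteIdList) through the metastore get/alter/statistics paths so that table and column stats can be validated against the reader's transaction. The short sketch below is illustrative only and is not part of the commit; it shows how the new query-side entry points added here (Hive.getTable with a checkTransactional flag, AcidUtils.getTableSnapshot, and the txnId/writeIdList overload of getTableColumnStatistics) are meant to combine. The table and column names are placeholders, and an open DbTxnManager transaction in the current SessionState is assumed.

import java.util.List;

import com.google.common.collect.Lists;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.Table;

public class SnapshotReadSketch {
  public static List<ColumnStatisticsObj> readColStats(HiveConf conf) throws Exception {
    Hive db = Hive.get(conf);
    // checkTransactional = true: getTable() forwards the current txnId and
    // ValidWriteIdList so the metastore can validate the stored stats against
    // this transaction's snapshot.
    Table tbl = db.getTable("default", "stats_part", true /* throwException */,
        true /* checkTransactional */);

    // The same snapshot can be built explicitly and passed to the new
    // txnId/writeIdList overloads (this is what the StatsOptimizer hunks below do);
    // getTableSnapshot() returns null for non-transactional tables.
    AcidUtils.TableSnapshot snapshot = AcidUtils.getTableSnapshot(conf, tbl);
    long txnId = snapshot != null ? snapshot.getTxnId() : -1;
    String writeIds = snapshot != null ? snapshot.getValidWriteIdList() : null;
    return db.getMSC().getTableColumnStatistics(tbl.getDbName(), tbl.getTableName(),
        Lists.newArrayList("key"), txnId, writeIds);
  }
}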
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1d46608e Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1d46608e Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1d46608e Branch: refs/heads/master-txnstats Commit: 1d46608e89c26ed123e96c3b79ef59b50d2349a6 Parents: 1a610cc Author: sergey <ser...@apache.org> Authored: Mon Jun 18 14:50:31 2018 -0700 Committer: sergey <ser...@apache.org> Committed: Mon Jun 18 14:51:14 2018 -0700 ---------------------------------------------------------------------- .../listener/DummyRawStoreFailEvent.java | 45 +- pom.xml | 2 +- .../hive/ql/exec/ColumnStatsUpdateTask.java | 3 + .../org/apache/hadoop/hive/ql/exec/DDLTask.java | 3 +- .../org/apache/hadoop/hive/ql/io/AcidUtils.java | 110 +- .../hadoop/hive/ql/lockmgr/DbTxnManager.java | 7 + .../hadoop/hive/ql/lockmgr/DummyTxnManager.java | 1 + .../hadoop/hive/ql/lockmgr/HiveTxnManager.java | 1 - .../apache/hadoop/hive/ql/metadata/Hive.java | 297 +- .../hive/ql/optimizer/StatsOptimizer.java | 56 +- .../hive/ql/stats/BasicStatsNoJobTask.java | 4 +- .../hadoop/hive/ql/stats/BasicStatsTask.java | 15 +- .../hadoop/hive/ql/stats/ColStatsProcessor.java | 7 + .../test/queries/clientpositive/stats_nonpart.q | 53 + ql/src/test/queries/clientpositive/stats_part.q | 98 + .../test/queries/clientpositive/stats_part2.q | 100 + .../test/queries/clientpositive/stats_sizebug.q | 37 + .../results/clientpositive/stats_nonpart.q.out | 500 ++ .../results/clientpositive/stats_part.q.out | 660 ++ .../results/clientpositive/stats_part2.q.out | 1261 ++++ .../results/clientpositive/stats_sizebug.q.out | 216 + .../gen/thrift/gen-cpp/ThriftHiveMetastore.cpp | 2426 ++++--- .../gen/thrift/gen-cpp/ThriftHiveMetastore.h | 70 +- .../ThriftHiveMetastore_server.skeleton.cpp | 2 +- .../gen/thrift/gen-cpp/hive_metastore_types.cpp | 6383 ++++++++++-------- .../gen/thrift/gen-cpp/hive_metastore_types.h | 412 +- .../metastore/api/AddPartitionsRequest.java | 215 +- .../hive/metastore/api/AddPartitionsResult.java | 126 +- .../hadoop/hive/metastore/api/AggrStats.java | 124 +- .../metastore/api/AlterPartitionsRequest.java | 966 +++ .../metastore/api/AlterPartitionsResponse.java | 283 + .../hive/metastore/api/ColumnStatistics.java | 335 +- .../hive/metastore/api/GetTableRequest.java | 219 +- .../hive/metastore/api/GetTableResult.java | 124 +- .../metastore/api/IsolationLevelCompliance.java | 48 + .../hadoop/hive/metastore/api/Partition.java | 333 +- .../hive/metastore/api/PartitionSpec.java | 337 +- .../metastore/api/PartitionsStatsRequest.java | 219 +- .../metastore/api/PartitionsStatsResult.java | 124 +- .../api/SetPartitionsStatsRequest.java | 215 +- .../apache/hadoop/hive/metastore/api/Table.java | 333 +- .../hive/metastore/api/TableStatsRequest.java | 219 +- .../hive/metastore/api/TableStatsResult.java | 124 +- .../hive/metastore/api/ThriftHiveMetastore.java | 2553 ++++--- .../gen-php/metastore/ThriftHiveMetastore.php | 1231 ++-- .../src/gen/thrift/gen-php/metastore/Types.php | 905 +++ .../hive_metastore/ThriftHiveMetastore-remote | 8 +- .../hive_metastore/ThriftHiveMetastore.py | 834 ++- .../gen/thrift/gen-py/hive_metastore/ttypes.py | 590 +- .../gen/thrift/gen-rb/hive_metastore_types.rb | 162 +- .../gen/thrift/gen-rb/thrift_hive_metastore.rb | 27 +- .../hadoop/hive/metastore/AlterHandler.java | 2 +- .../hadoop/hive/metastore/HiveAlterHandler.java | 20 +- .../hadoop/hive/metastore/HiveMetaStore.java | 108 +- .../hive/metastore/HiveMetaStoreClient.java | 118 +- 
.../hadoop/hive/metastore/IHMSHandler.java | 5 + .../hadoop/hive/metastore/IMetaStoreClient.java | 45 +- .../hadoop/hive/metastore/ObjectStore.java | 487 +- .../apache/hadoop/hive/metastore/RawStore.java | 150 +- .../hive/metastore/cache/CachedStore.java | 140 +- .../hadoop/hive/metastore/model/MPartition.java | 18 +- .../model/MPartitionColumnStatistics.java | 9 + .../hadoop/hive/metastore/model/MTable.java | 19 + .../metastore/model/MTableColumnStatistics.java | 9 + .../metastore/txn/CompactionTxnHandler.java | 66 +- .../hadoop/hive/metastore/txn/TxnDbUtil.java | 94 + .../hadoop/hive/metastore/txn/TxnUtils.java | 20 +- .../src/main/resources/package.jdo | 18 + .../main/sql/derby/hive-schema-3.0.0.derby.sql | 11 +- .../main/sql/derby/hive-schema-4.0.0.derby.sql | 10 +- .../sql/derby/upgrade-3.1.0-to-4.0.0.derby.sql | 8 +- .../main/sql/mssql/hive-schema-3.0.0.mssql.sql | 14 +- .../main/sql/mssql/hive-schema-4.0.0.mssql.sql | 14 +- .../sql/mssql/upgrade-3.1.0-to-4.0.0.mssql.sql | 8 + .../main/sql/mysql/hive-schema-3.0.0.mysql.sql | 6 + .../main/sql/mysql/hive-schema-4.0.0.mysql.sql | 6 + .../sql/mysql/upgrade-2.3.0-to-3.0.0.mysql.sql | 2 +- .../sql/mysql/upgrade-3.1.0-to-4.0.0.mysql.sql | 8 + .../sql/oracle/hive-schema-3.0.0.oracle.sql | 15 +- .../sql/oracle/hive-schema-4.0.0.oracle.sql | 14 +- .../oracle/upgrade-2.3.0-to-3.0.0.oracle.sql | 2 +- .../oracle/upgrade-3.1.0-to-4.0.0.oracle.sql | 7 + .../sql/postgres/hive-schema-3.0.0.postgres.sql | 19 +- .../sql/postgres/hive-schema-4.0.0.postgres.sql | 14 +- .../upgrade-3.1.0-to-4.0.0.postgres.sql | 8 + .../src/main/thrift/hive_metastore.thrift | 76 +- .../DummyRawStoreControlledCommit.java | 104 +- .../DummyRawStoreForJdoConnection.java | 99 +- .../HiveMetaStoreClientPreCatalog.java | 108 +- .../metastore/client/TestAlterPartitions.java | 3 +- .../hadoop/hive/common/ValidTxnWriteIdList.java | 4 + .../org/apache/hive/common/util/TxnIdUtils.java | 20 +- 92 files changed, 18056 insertions(+), 7275 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/1d46608e/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java ---------------------------------------------------------------------- diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java index 8f9a03f..498b2c6 100644 --- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java +++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java @@ -266,6 +266,12 @@ public class DummyRawStoreFailEvent implements RawStore, Configurable { } @Override + public Table getTable(String catName, String dbName, String tableName, + long txnId, String writeIdList) throws MetaException { + return objectStore.getTable(catName, dbName, tableName, txnId, writeIdList); + } + + @Override public boolean addPartition(Partition part) throws InvalidObjectException, MetaException { return objectStore.addPartition(part); @@ -278,6 +284,13 @@ public class DummyRawStoreFailEvent implements RawStore, Configurable { } @Override + public Partition getPartition(String catName, String dbName, String tableName, + List<String> partVals, long txnId, String writeIdList) + throws MetaException, NoSuchObjectException { + return objectStore.getPartition(catName, dbName, tableName, 
partVals, txnId, writeIdList); + } + + @Override public boolean dropPartition(String catName, String dbName, String tableName, List<String> partVals) throws MetaException, NoSuchObjectException, InvalidObjectException, InvalidInputException { @@ -376,9 +389,10 @@ public class DummyRawStoreFailEvent implements RawStore, Configurable { @Override public void alterPartitions(String catName, String dbName, String tblName, - List<List<String>> partValsList, List<Partition> newParts) + List<List<String>> partValsList, List<Partition> newParts, + long txnId, String writeIdList) throws InvalidObjectException, MetaException { - objectStore.alterPartitions(catName, dbName, tblName, partValsList, newParts); + objectStore.alterPartitions(catName, dbName, tblName, partValsList, newParts, txnId, writeIdList); } @Override @@ -685,6 +699,14 @@ public class DummyRawStoreFailEvent implements RawStore, Configurable { } @Override + public ColumnStatistics getTableColumnStatistics(String catName, String dbName, String tableName, + List<String> colNames, + long txnId, String writeIdList) + throws MetaException, NoSuchObjectException { + return objectStore.getTableColumnStatistics(catName, dbName, tableName, colNames, txnId, writeIdList); + } + + @Override public boolean deleteTableColumnStatistics(String catName, String dbName, String tableName, String colName) throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException { @@ -778,6 +800,17 @@ public class DummyRawStoreFailEvent implements RawStore, Configurable { } @Override + public List<ColumnStatistics> getPartitionColumnStatistics(String catName, String dbName, + String tblName, List<String> colNames, + List<String> partNames, + long txnId, + String writeIdList) + throws MetaException, NoSuchObjectException { + return objectStore.getPartitionColumnStatistics( + catName, dbName, tblName , colNames, partNames, txnId, writeIdList); + } + + @Override public boolean doesPartitionExist(String catName, String dbName, String tableName, List<FieldSchema> partKeys, List<String> partVals) throws MetaException, NoSuchObjectException { @@ -855,6 +888,14 @@ public class DummyRawStoreFailEvent implements RawStore, Configurable { } @Override + public AggrStats get_aggr_stats_for(String catName, String dbName, + String tblName, List<String> partNames, List<String> colNames, + long txnId, String writeIdList) + throws MetaException { + return null; + } + + @Override public NotificationEventResponse getNextNotification(NotificationEventRequest rqst) { return objectStore.getNextNotification(rqst); } http://git-wip-us.apache.org/repos/asf/hive/blob/1d46608e/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 5202248..4278104 100644 --- a/pom.xml +++ b/pom.xml @@ -66,7 +66,7 @@ </modules> <properties> - <hive.version.shortname>3.1.0</hive.version.shortname> + <hive.version.shortname>4.0.0</hive.version.shortname> <!-- Build Properties --> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> http://git-wip-us.apache.org/repos/asf/hive/blob/1d46608e/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsUpdateTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsUpdateTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsUpdateTask.java index a53ff5a..7795c66 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsUpdateTask.java +++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsUpdateTask.java @@ -46,11 +46,14 @@ import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.DriverContext; import org.apache.hadoop.hive.ql.QueryPlan; import org.apache.hadoop.hive.ql.QueryState; +import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.ColumnStatsUpdateWork; import org.apache.hadoop.hive.ql.plan.api.StageType; +import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.serde2.io.DateWritable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/hive/blob/1d46608e/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java index 8e32b02..ec32edf 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java @@ -1301,8 +1301,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable { throw new AssertionError("Unsupported alter materialized view type! : " + alterMVDesc.getOp()); } - db.alterTable(mv, environmentContext); - + db.alterTable(mv,environmentContext); return 0; } http://git-wip-us.apache.org/repos/asf/hive/blob/1d46608e/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index 7fce67f..7d35084 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -33,6 +33,7 @@ import java.util.Properties; import java.util.Set; import java.util.regex.Pattern; +import org.apache.avro.generic.GenericData; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; @@ -40,13 +41,11 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; -import org.apache.hadoop.hive.common.HiveStatsUtils; -import org.apache.hadoop.hive.common.ValidReaderWriteIdList; -import org.apache.hadoop.hive.common.ValidTxnWriteIdList; -import org.apache.hadoop.hive.common.ValidWriteIdList; +import org.apache.hadoop.hive.common.*; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.TransactionalValidationListener; +import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.DataOperationType; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.ErrorMsg; @@ -57,9 +56,12 @@ import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; import org.apache.hadoop.hive.ql.io.orc.OrcRecordUpdater; import org.apache.hadoop.hive.ql.io.orc.Reader; import org.apache.hadoop.hive.ql.io.orc.Writer; +import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager; +import org.apache.hadoop.hive.ql.lockmgr.LockException; import 
org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.plan.CreateTableDesc; import org.apache.hadoop.hive.ql.plan.TableScanDesc; +import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.shims.HadoopShims; import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId; import org.apache.hadoop.hive.shims.ShimLoader; @@ -1621,6 +1623,102 @@ public class AcidUtils { } } + public static class TableSnapshot { + private long txnId; + private String validWriteIdList; + + public TableSnapshot() { + } + + public TableSnapshot(long txnId, String validWriteIdList) { + this.txnId = txnId; + this.validWriteIdList = validWriteIdList; + } + + public long getTxnId() { + return txnId; + } + + public String getValidWriteIdList() { + return validWriteIdList; + } + + public void setTxnId(long txnId) { + this.txnId = txnId; + } + + public void setValidWriteIdList(String validWriteIdList) { + this.validWriteIdList = validWriteIdList; + } + } + + /** + * Create a TableShopshot with the given "conf" + * for the table of the given "tbl". + * + * @param conf + * @param tbl + * @return TableSnapshot on success, null on failure + * @throws LockException + */ + public static TableSnapshot getTableSnapshot( + Configuration conf, + Table tbl) throws LockException { + if (!isTransactionalTable(tbl)) { + return null; + } else { + long txnId = 0; + ValidWriteIdList validWriteIdList = null; + + HiveTxnManager sessionTxnMgr = SessionState.get().getTxnMgr(); + + if (sessionTxnMgr != null) { + txnId = sessionTxnMgr.getCurrentTxnId(); + } + String fullTableName = getFullTableName(tbl.getDbName(), tbl.getTableName()); + if (txnId > 0 && isTransactionalTable(tbl)) { + validWriteIdList = + getTableValidWriteIdList(conf, fullTableName); + + if (validWriteIdList == null) { + validWriteIdList = getTableValidWriteIdListWithTxnList( + conf, tbl.getDbName(), tbl.getTableName()); + } + } + return new TableSnapshot(txnId, + validWriteIdList != null ? validWriteIdList.toString() : null); + } + } + + /** + * Returns ValidWriteIdList for the table with the given "dbName" and "tableName". + * This is called when HiveConf has no list for the table. + * Otherwise use getTableSnapshot(). + * @param conf Configuration + * @param dbName + * @param tableName + * @return ValidWriteIdList on success, null on failure to get a list. + * @throws LockException + */ + public static ValidWriteIdList getTableValidWriteIdListWithTxnList( + Configuration conf, String dbName, String tableName) throws LockException { + HiveTxnManager sessionTxnMgr = SessionState.get().getTxnMgr(); + if (sessionTxnMgr == null) { + return null; + } + ValidWriteIdList validWriteIdList = null; + ValidTxnWriteIdList validTxnWriteIdList = null; + + String validTxnList = conf.get(ValidTxnList.VALID_TXNS_KEY); + List<String> tablesInput = new ArrayList<>(); + String fullTableName = getFullTableName(dbName, tableName); + tablesInput.add(fullTableName); + + validTxnWriteIdList = sessionTxnMgr.getValidWriteIds(tablesInput, validTxnList); + return validTxnWriteIdList != null ? + validTxnWriteIdList.getTableValidWriteIdList(fullTableName) : null; + } + public static String getFullTableName(String dbName, String tableName) { return dbName.toLowerCase() + "." 
+ tableName.toLowerCase(); } @@ -1908,8 +2006,8 @@ public class AcidUtils { } public static boolean isAcidEnabled(HiveConf hiveConf) { - String txnMgr = hiveConf.getVar(HiveConf.ConfVars.HIVE_TXN_MANAGER); - boolean concurrency = hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY); + String txnMgr = hiveConf.getVar(ConfVars.HIVE_TXN_MANAGER); + boolean concurrency = hiveConf.getBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY); String dbTxnMgr = "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager"; if (txnMgr.equals(dbTxnMgr) && concurrency) { return true; http://git-wip-us.apache.org/repos/asf/hive/blob/1d46608e/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java index 4fd1d4e..9104786 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java @@ -1017,9 +1017,16 @@ public final class DbTxnManager extends HiveTxnManagerImpl { @Override public long getTableWriteId(String dbName, String tableName) throws LockException { assert isTxnOpen(); + return getTableWriteId(dbName, tableName, true); + } + + private long getTableWriteId( + String dbName, String tableName, boolean allocateIfNotYet) throws LockException { String fullTableName = AcidUtils.getFullTableName(dbName, tableName); if (tableWriteIds.containsKey(fullTableName)) { return tableWriteIds.get(fullTableName); + } else if (!allocateIfNotYet) { + return 0; } try { long writeId = getMS().allocateTableWriteId(txnId, dbName, tableName); http://git-wip-us.apache.org/repos/asf/hive/blob/1d46608e/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java index ab9d67e..78bb303 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java @@ -77,6 +77,7 @@ class DummyTxnManager extends HiveTxnManagerImpl { public long getTableWriteId(String dbName, String tableName) throws LockException { return 0L; } + @Override public void replAllocateTableWriteIdsBatch(String dbName, String tableName, String replPolicy, List<TxnToWriteId> srcTxnToWriteIdList) throws LockException { http://git-wip-us.apache.org/repos/asf/hive/blob/1d46608e/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java index 5f68e08..9ea40f4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java @@ -276,7 +276,6 @@ public interface HiveTxnManager { * if {@code isTxnOpen()}, returns the table write ID associated with current active transaction. */ long getTableWriteId(String dbName, String tableName) throws LockException; - /** * Allocates write id for each transaction in the list. 
* @param dbName database name http://git-wip-us.apache.org/repos/asf/hive/blob/1d46608e/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index 2ec131e..3918e62 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -63,21 +63,13 @@ import javax.jdo.JDODataStoreException; import com.google.common.collect.ImmutableList; import org.apache.calcite.plan.RelOptMaterialization; -import org.apache.calcite.plan.RelOptRule; -import org.apache.calcite.plan.RelOptRuleCall; import org.apache.calcite.plan.hep.HepPlanner; import org.apache.calcite.plan.hep.HepProgramBuilder; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.RelVisitor; import org.apache.calcite.rel.core.Project; import org.apache.calcite.rel.core.TableScan; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rel.type.RelDataTypeField; import org.apache.calcite.rex.RexBuilder; -import org.apache.calcite.rex.RexNode; -import org.apache.calcite.sql.fun.SqlStdOperatorTable; -import org.apache.calcite.sql.type.SqlTypeName; -import org.apache.calcite.tools.RelBuilder; import org.apache.commons.io.FilenameUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileChecksum; @@ -87,13 +79,7 @@ import org.apache.hadoop.fs.Options; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hdfs.DistributedFileSystem; -import org.apache.hadoop.hive.common.FileUtils; -import org.apache.hadoop.hive.common.HiveStatsUtils; -import org.apache.hadoop.hive.common.JavaUtils; -import org.apache.hadoop.hive.common.ObjectPair; -import org.apache.hadoop.hive.common.StatsSetupConst; -import org.apache.hadoop.hive.common.ValidTxnWriteIdList; -import org.apache.hadoop.hive.common.ValidWriteIdList; +import org.apache.hadoop.hive.common.*; import org.apache.hadoop.hive.common.classification.InterfaceAudience.LimitedPrivate; import org.apache.hadoop.hive.common.classification.InterfaceStability.Unstable; import org.apache.hadoop.hive.common.log.InPlaceUpdate; @@ -114,60 +100,7 @@ import org.apache.hadoop.hive.metastore.SynchronizedMetaStoreClient; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.ReplChangeManager; -import org.apache.hadoop.hive.metastore.api.AggrStats; -import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; -import org.apache.hadoop.hive.metastore.api.CheckConstraintsRequest; -import org.apache.hadoop.hive.metastore.api.CmRecycleRequest; -import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; -import org.apache.hadoop.hive.metastore.api.CompactionResponse; -import org.apache.hadoop.hive.metastore.api.CompactionType; -import org.apache.hadoop.hive.metastore.api.CreationMetadata; -import org.apache.hadoop.hive.metastore.api.Database; -import org.apache.hadoop.hive.metastore.api.DefaultConstraintsRequest; -import org.apache.hadoop.hive.metastore.api.EnvironmentContext; -import org.apache.hadoop.hive.metastore.api.FieldSchema; -import org.apache.hadoop.hive.metastore.api.FireEventRequest; -import org.apache.hadoop.hive.metastore.api.FireEventRequestData; -import org.apache.hadoop.hive.metastore.api.ForeignKeysRequest; -import 
org.apache.hadoop.hive.metastore.api.Function; -import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse; -import org.apache.hadoop.hive.metastore.api.GetRoleGrantsForPrincipalRequest; -import org.apache.hadoop.hive.metastore.api.GetRoleGrantsForPrincipalResponse; -import org.apache.hadoop.hive.metastore.api.HiveObjectPrivilege; -import org.apache.hadoop.hive.metastore.api.HiveObjectRef; -import org.apache.hadoop.hive.metastore.api.HiveObjectType; -import org.apache.hadoop.hive.metastore.api.InsertEventRequestData; -import org.apache.hadoop.hive.metastore.api.InvalidOperationException; -import org.apache.hadoop.hive.metastore.api.Materialization; -import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.hive.metastore.api.MetadataPpdResult; -import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; -import org.apache.hadoop.hive.metastore.api.NotNullConstraintsRequest; -import org.apache.hadoop.hive.metastore.api.PrimaryKeysRequest; -import org.apache.hadoop.hive.metastore.api.PrincipalPrivilegeSet; -import org.apache.hadoop.hive.metastore.api.PrincipalType; -import org.apache.hadoop.hive.metastore.api.PrivilegeBag; -import org.apache.hadoop.hive.metastore.api.Role; -import org.apache.hadoop.hive.metastore.api.RolePrincipalGrant; -import org.apache.hadoop.hive.metastore.api.SQLCheckConstraint; -import org.apache.hadoop.hive.metastore.api.SQLDefaultConstraint; -import org.apache.hadoop.hive.metastore.api.SQLForeignKey; -import org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint; -import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey; -import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint; -import org.apache.hadoop.hive.metastore.api.SetPartitionsStatsRequest; -import org.apache.hadoop.hive.metastore.api.ShowCompactResponse; -import org.apache.hadoop.hive.metastore.api.SkewedInfo; -import org.apache.hadoop.hive.metastore.api.UniqueConstraintsRequest; -import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan; -import org.apache.hadoop.hive.metastore.api.WMMapping; -import org.apache.hadoop.hive.metastore.api.WMNullablePool; -import org.apache.hadoop.hive.metastore.api.WMNullableResourcePlan; -import org.apache.hadoop.hive.metastore.api.WMPool; -import org.apache.hadoop.hive.metastore.api.WMResourcePlan; -import org.apache.hadoop.hive.metastore.api.WMTrigger; -import org.apache.hadoop.hive.metastore.api.WMValidateResourcePlanResponse; -import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.apache.hadoop.hive.metastore.api.*; import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.AbstractFileMergeOperator; @@ -180,7 +113,6 @@ import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; import org.apache.hadoop.hive.ql.lockmgr.LockException; import org.apache.hadoop.hive.ql.log.PerfLogger; -import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelFactories; import org.apache.hadoop.hive.ql.optimizer.calcite.RelOptHiveTable; import org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.HiveAugmentMaterializationRule; import org.apache.hadoop.hive.ql.optimizer.listbucketingpruner.ListBucketingPrunerUtils; @@ -202,7 +134,6 @@ import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.StringUtils; -import org.apache.hive.common.util.TxnIdUtils; import 
org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -652,6 +583,12 @@ public class Hive { alterTable(newTbl.getDbName(), newTbl.getTableName(), newTbl, false, environmentContext); } + + public void alterTable(String fullyQlfdTblName, Table newTbl, EnvironmentContext environmentContext) + throws HiveException { + alterTable(fullyQlfdTblName, newTbl, false, environmentContext); + } + /** * Updates the existing table metadata with the new metadata. * @@ -659,13 +596,17 @@ public class Hive { * name of the existing table * @param newTbl * new name of the table. could be the old name + * @param transactional + * Need to generate and save a table snapshot into the metastore? * @throws InvalidOperationException * if the changes in metadata is not acceptable * @throws TException */ - public void alterTable(String fullyQlfdTblName, Table newTbl, EnvironmentContext environmentContext) + public void alterTable(String fullyQlfdTblName, Table newTbl, EnvironmentContext environmentContext, + boolean transactional) throws HiveException { - alterTable(fullyQlfdTblName, newTbl, false, environmentContext); + String[] names = Utilities.getDbTableName(fullyQlfdTblName); + alterTable(names[0], names[1], newTbl, false, environmentContext, transactional); } public void alterTable(String fullyQlfdTblName, Table newTbl, boolean cascade, EnvironmentContext environmentContext) @@ -673,9 +614,13 @@ public class Hive { String[] names = Utilities.getDbTableName(fullyQlfdTblName); alterTable(names[0], names[1], newTbl, cascade, environmentContext); } - public void alterTable(String dbName, String tblName, Table newTbl, boolean cascade, - EnvironmentContext environmentContext) + EnvironmentContext environmentContext) + throws HiveException { + alterTable(dbName, tblName, newTbl, cascade, environmentContext, true); + } + public void alterTable(String dbName, String tblName, Table newTbl, boolean cascade, + EnvironmentContext environmentContext, boolean transactional) throws HiveException { try { @@ -690,6 +635,12 @@ public class Hive { if (cascade) { environmentContext.putToProperties(StatsSetupConst.CASCADE, StatsSetupConst.TRUE); } + + // Take a table snapshot and set it to newTbl. + if (transactional) { + setTableSnapshotForTransactionalTable(conf, newTbl); + } + getMSC().alter_table_with_environmentContext(dbName, tblName, newTbl.getTTable(), environmentContext); } catch (MetaException e) { throw new HiveException("Unable to alter table. " + e.getMessage(), e); @@ -739,6 +690,29 @@ public class Hive { */ public void alterPartition(String dbName, String tblName, Partition newPart, EnvironmentContext environmentContext) throws InvalidOperationException, HiveException { + alterPartition(dbName, tblName, newPart, environmentContext, true); + } + + /** + * Updates the existing partition metadata with the new metadata. 
+ * + * @param dbName + * name of the exiting table's database + * @param tblName + * name of the existing table + * @param newPart + * new partition + * @param environmentContext + * environment context for the method + * @param transactional + * indicates this call is for transaction stats + * @throws InvalidOperationException + * if the changes in metadata is not acceptable + * @throws TException + */ + public void alterPartition(String dbName, String tblName, Partition newPart, + EnvironmentContext environmentContext, boolean transactional) + throws InvalidOperationException, HiveException { try { validatePartition(newPart); String location = newPart.getLocation(); @@ -746,6 +720,9 @@ public class Hive { location = Utilities.getQualifiedPath(conf, new Path(location)); newPart.setLocation(location); } + if (transactional) { + setTableSnapshotForTransactionalPartition(conf, newPart); + } getSynchronizedMSC().alter_partition(dbName, tblName, newPart.getTPartition(), environmentContext); } catch (MetaException e) { @@ -763,6 +740,10 @@ public class Hive { newPart.checkValidity(); } + public void alterPartitions(String tblName, List<Partition> newParts, EnvironmentContext environmentContext) + throws InvalidOperationException, HiveException { + alterPartitions(tblName, newParts, environmentContext, false); + } /** * Updates the existing table metadata with the new metadata. * @@ -770,16 +751,23 @@ public class Hive { * name of the existing table * @param newParts * new partitions + * @param transactional + * Need to generate and save a table snapshot into the metastore? * @throws InvalidOperationException * if the changes in metadata is not acceptable * @throws TException */ - public void alterPartitions(String tblName, List<Partition> newParts, EnvironmentContext environmentContext) + public void alterPartitions(String tblName, List<Partition> newParts, + EnvironmentContext environmentContext, boolean transactional) throws InvalidOperationException, HiveException { String[] names = Utilities.getDbTableName(tblName); List<org.apache.hadoop.hive.metastore.api.Partition> newTParts = new ArrayList<org.apache.hadoop.hive.metastore.api.Partition>(); try { + AcidUtils.TableSnapshot tableSnapshot = null; + if (transactional) { + tableSnapshot = AcidUtils.getTableSnapshot(conf, newParts.get(0).getTable()); + } // Remove the DDL time so that it gets refreshed for (Partition tmpPart: newParts) { if (tmpPart.getParameters() != null) { @@ -792,7 +780,9 @@ public class Hive { } newTParts.add(tmpPart.getTPartition()); } - getMSC().alter_partitions(names[0], names[1], newTParts, environmentContext); + getMSC().alter_partitions(names[0], names[1], newTParts, environmentContext, + tableSnapshot != null ? tableSnapshot.getTxnId() : -1, + tableSnapshot != null ? tableSnapshot.getValidWriteIdList() : null); } catch (MetaException e) { throw new HiveException("Unable to alter partition. " + e.getMessage(), e); } catch (TException e) { @@ -923,6 +913,8 @@ public class Hive { tTbl.setPrivileges(principalPrivs); } } + // Set table snapshot to api.Table to make it persistent. 
+ setTableSnapshotForTransactionalTable(conf, tbl); if (primaryKeys == null && foreignKeys == null && uniqueConstraints == null && notNullConstraints == null && defaultConstraints == null && checkConstraints == null) { @@ -1125,7 +1117,27 @@ public class Hive { * @throws HiveException */ public Table getTable(final String dbName, final String tableName, - boolean throwException) throws HiveException { + boolean throwException) throws HiveException { + return this.getTable(dbName, tableName, throwException, false); + } + + /** + * Returns metadata of the table + * + * @param dbName + * the name of the database + * @param tableName + * the name of the table + * @param throwException + * controls whether an exception is thrown or a returns a null + * @param checkTransactional + * checks whether the metadata table stats are valid (or + * compilant with the snapshot isolation of) for the current transaction. + * @return the table or if throwException is false a null value. + * @throws HiveException + */ + public Table getTable(final String dbName, final String tableName, + boolean throwException, boolean checkTransactional) throws HiveException { if (tableName == null || tableName.equals("")) { throw new HiveException("empty table creation??"); @@ -1134,7 +1146,19 @@ public class Hive { // Get the table from metastore org.apache.hadoop.hive.metastore.api.Table tTable = null; try { - tTable = getMSC().getTable(dbName, tableName); + if (checkTransactional) { + ValidWriteIdList validWriteIdList = null; + long txnId = SessionState.get().getTxnMgr() != null ? + SessionState.get().getTxnMgr().getCurrentTxnId() : 0; + if (txnId > 0) { + validWriteIdList = AcidUtils.getTableValidWriteIdListWithTxnList(conf, + dbName, tableName); + } + tTable = getMSC().getTable(dbName, tableName, txnId, + validWriteIdList != null ? validWriteIdList.toString() : null); + } else { + tTable = getMSC().getTable(dbName, tableName); + } } catch (NoSuchObjectException e) { if (throwException) { LOG.error("Table " + dbName + "." + tableName + " not found: " + e.getMessage()); @@ -1791,6 +1815,7 @@ public class Hive { Partition newTPart = oldPart != null ? oldPart : new Partition(tbl, partSpec, newPartPath); alterPartitionSpecInMemory(tbl, partSpec, newTPart.getTPartition(), inheritTableSpecs, newPartPath.toString()); validatePartition(newTPart); + setTableSnapshotForTransactionalPartition(conf, newTPart); // Generate an insert event only if inserting into an existing partition // When inserting into a new partition, the add partition event takes care of insert event @@ -2424,8 +2449,13 @@ private void constructOneLBLocationMap(FileStatus fSta, */ public Partition createPartition(Table tbl, Map<String, String> partSpec) throws HiveException { try { - return new Partition(tbl, getMSC().add_partition( - Partition.createMetaPartitionObject(tbl, partSpec, null))); + org.apache.hadoop.hive.metastore.api.Partition part = + Partition.createMetaPartitionObject(tbl, partSpec, null); + AcidUtils.TableSnapshot tableSnapshot = + AcidUtils.getTableSnapshot(conf, tbl); + part.setTxnId(tableSnapshot != null ? tableSnapshot.getTxnId() : 0); + part.setValidWriteIdList(tableSnapshot != null ? 
tableSnapshot.getValidWriteIdList() : null); + return new Partition(tbl, getMSC().add_partition(part)); } catch (Exception e) { LOG.error(StringUtils.stringifyException(e)); throw new HiveException(e); @@ -2437,8 +2467,16 @@ private void constructOneLBLocationMap(FileStatus fSta, int size = addPartitionDesc.getPartitionCount(); List<org.apache.hadoop.hive.metastore.api.Partition> in = new ArrayList<org.apache.hadoop.hive.metastore.api.Partition>(size); + AcidUtils.TableSnapshot tableSnapshot = + AcidUtils.getTableSnapshot(conf, tbl); for (int i = 0; i < size; ++i) { - in.add(convertAddSpecToMetaPartition(tbl, addPartitionDesc.getPartition(i), conf)); + org.apache.hadoop.hive.metastore.api.Partition tmpPart = + convertAddSpecToMetaPartition(tbl, addPartitionDesc.getPartition(i), conf); + if (tmpPart != null && tableSnapshot != null && tableSnapshot.getTxnId() > 0) { + tmpPart.setTxnId(tableSnapshot.getTxnId()); + tmpPart.setValidWriteIdList(tableSnapshot.getValidWriteIdList()); + } + in.add(tmpPart); } List<Partition> out = new ArrayList<Partition>(); try { @@ -2633,7 +2671,8 @@ private void constructOneLBLocationMap(FileStatus fSta, if (!org.apache.commons.lang.StringUtils.isEmpty(tbl.getDbName())) { fullName = tbl.getFullyQualifiedName(); } - alterPartition(fullName, new Partition(tbl, tpart), null); + Partition newPart = new Partition(tbl, tpart); + alterPartition(fullName, newPart, null); } private void alterPartitionSpecInMemory(Table tbl, @@ -4359,8 +4398,16 @@ private void constructOneLBLocationMap(FileStatus fSta, } } - public boolean setPartitionColumnStatistics(SetPartitionsStatsRequest request) throws HiveException { + public boolean setPartitionColumnStatistics( + SetPartitionsStatsRequest request) throws HiveException { try { + ColumnStatistics colStat = request.getColStats().get(0); + ColumnStatisticsDesc statsDesc = colStat.getStatsDesc(); + Table tbl = getTable(statsDesc.getDbName(), statsDesc.getTableName()); + + AcidUtils.TableSnapshot tableSnapshot = AcidUtils.getTableSnapshot(conf, tbl); + request.setTxnId(tableSnapshot != null ? tableSnapshot.getTxnId() : 0); + request.setValidWriteIdList(tableSnapshot != null ? tableSnapshot.getValidWriteIdList() : null); return getMSC().setPartitionColumnStatistics(request); } catch (Exception e) { LOG.debug(StringUtils.stringifyException(e)); @@ -4370,8 +4417,27 @@ private void constructOneLBLocationMap(FileStatus fSta, public List<ColumnStatisticsObj> getTableColumnStatistics( String dbName, String tableName, List<String> colNames) throws HiveException { + return getTableColumnStatistics(dbName, tableName, colNames, false); + } + + public List<ColumnStatisticsObj> getTableColumnStatistics( + String dbName, String tableName, List<String> colNames, boolean checkTransactional) + throws HiveException { + + List<ColumnStatisticsObj> retv = null; try { - return getMSC().getTableColumnStatistics(dbName, tableName, colNames); + if (checkTransactional) { + Table tbl = getTable(dbName, tableName); + AcidUtils.TableSnapshot tableSnapshot = AcidUtils.getTableSnapshot(conf, tbl); + if (tableSnapshot.getTxnId() > 0) { + retv = getMSC().getTableColumnStatistics(dbName, tableName, colNames, + tableSnapshot != null ? tableSnapshot.getTxnId() : -1, + tableSnapshot != null ? 
tableSnapshot.getValidWriteIdList() : null); + } + } else { + retv = getMSC().getTableColumnStatistics(dbName, tableName, colNames); + } + return retv; } catch (Exception e) { LOG.debug(StringUtils.stringifyException(e)); throw new HiveException(e); @@ -4380,8 +4446,25 @@ private void constructOneLBLocationMap(FileStatus fSta, public Map<String, List<ColumnStatisticsObj>> getPartitionColumnStatistics(String dbName, String tableName, List<String> partNames, List<String> colNames) throws HiveException { - try { - return getMSC().getPartitionColumnStatistics(dbName, tableName, partNames, colNames); + return getPartitionColumnStatistics(dbName, tableName, partNames, colNames, false); + } + + public Map<String, List<ColumnStatisticsObj>> getPartitionColumnStatistics( + String dbName, String tableName, List<String> partNames, List<String> colNames, + boolean checkTransactional) + throws HiveException { + long txnId = -1; + String writeIdList = null; + try { + if (checkTransactional) { + Table tbl = getTable(dbName, tableName); + AcidUtils.TableSnapshot tableSnapshot = AcidUtils.getTableSnapshot(conf, tbl); + txnId = tableSnapshot != null ? tableSnapshot.getTxnId() : -1; + writeIdList = tableSnapshot != null ? tableSnapshot.getValidWriteIdList() : null; + } + + return getMSC().getPartitionColumnStatistics(dbName, tableName, partNames, colNames, + txnId, writeIdList); } catch (Exception e) { LOG.debug(StringUtils.stringifyException(e)); throw new HiveException(e); @@ -4390,8 +4473,22 @@ private void constructOneLBLocationMap(FileStatus fSta, public AggrStats getAggrColStatsFor(String dbName, String tblName, List<String> colNames, List<String> partName) { - try { - return getMSC().getAggrColStatsFor(dbName, tblName, colNames, partName); + return getAggrColStatsFor(dbName, tblName, colNames, partName, false); + } + + public AggrStats getAggrColStatsFor(String dbName, String tblName, + List<String> colNames, List<String> partName, boolean checkTransactional) { + long txnId = -1; + String writeIdList = null; + try { + if (checkTransactional) { + Table tbl = getTable(dbName, tblName); + AcidUtils.TableSnapshot tableSnapshot = AcidUtils.getTableSnapshot(conf, tbl); + txnId = tableSnapshot != null ? tableSnapshot.getTxnId() : -1; + writeIdList = tableSnapshot != null ? tableSnapshot.getValidWriteIdList() : null; + } + return getMSC().getAggrColStatsFor(dbName, tblName, colNames, partName, + txnId, writeIdList); } catch (Exception e) { LOG.debug(StringUtils.stringifyException(e)); return new AggrStats(new ArrayList<ColumnStatisticsObj>(),0); @@ -5189,4 +5286,26 @@ private void constructOneLBLocationMap(FileStatus fSta, throw new HiveException(e); } } + + private void setTableSnapshotForTransactionalTable( + HiveConf conf, Table newTbl) + throws LockException { + + org.apache.hadoop.hive.metastore.api.Table newTTbl = newTbl.getTTable(); + AcidUtils.TableSnapshot tableSnapshot = AcidUtils.getTableSnapshot(conf, newTbl); + + newTTbl.setTxnId(tableSnapshot != null ? tableSnapshot.getTxnId() : -1); + newTTbl.setValidWriteIdList( + tableSnapshot != null ? tableSnapshot.getValidWriteIdList() : null); + } + + private void setTableSnapshotForTransactionalPartition(HiveConf conf, Partition partition) + throws LockException { + + AcidUtils.TableSnapshot tableSnapshot = AcidUtils.getTableSnapshot(conf, partition.getTable()); + org.apache.hadoop.hive.metastore.api.Partition tpartition = partition.getTPartition(); + tpartition.setTxnId(tableSnapshot != null ? 
tableSnapshot.getTxnId() : -1); + tpartition.setValidWriteIdList( + tableSnapshot != null ? tableSnapshot.getValidWriteIdList() : null); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/1d46608e/ql/src/java/org/apache/hadoop/hive/ql/optimizer/StatsOptimizer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/StatsOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/StatsOptimizer.java index 857f300..4d69f4c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/StatsOptimizer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/StatsOptimizer.java @@ -46,6 +46,7 @@ import org.apache.hadoop.hive.ql.lib.NodeProcessor; import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; import org.apache.hadoop.hive.ql.lib.Rule; import org.apache.hadoop.hive.ql.lib.RuleRegExp; +import org.apache.hadoop.hive.ql.lockmgr.LockException; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Partition; @@ -282,7 +283,17 @@ public class StatsOptimizer extends Transform { // limit. In order to be safe, we do not use it now. return null; } + + Hive hive = Hive.get(pctx.getConf()); Table tbl = tsOp.getConf().getTableMetadata(); + boolean isTransactionalTable = AcidUtils.isTransactionalTable(tbl); + + // If the table is transactional, get stats state by calling getTable() with + // transactional flag on to check the validity of table stats. + if (isTransactionalTable) { + tbl = hive.getTable(tbl.getDbName(), tbl.getTableName(), true, true); + } + if (MetaStoreUtils.isExternalTable(tbl.getTTable())) { Logger.info("Table " + tbl.getTableName() + " is external. Skip StatsOptimizer."); return null; @@ -291,11 +302,7 @@ public class StatsOptimizer extends Transform { Logger.info("Table " + tbl.getTableName() + " is non Native table. Skip StatsOptimizer."); return null; } - if (AcidUtils.isTransactionalTable(tbl)) { - //todo: should this be OK for MM table? - Logger.info("Table " + tbl.getTableName() + " is ACID table. Skip StatsOptimizer."); - return null; - } + Long rowCnt = getRowCnt(pctx, tsOp, tbl); // if we can not have correct table stats, then both the table stats and column stats are not useful. if (rowCnt == null) { @@ -375,7 +382,8 @@ public class StatsOptimizer extends Transform { List<Object> oneRow = new ArrayList<Object>(); - Hive hive = Hive.get(pctx.getConf()); + AcidUtils.TableSnapshot tableSnapshot = + AcidUtils.getTableSnapshot(pctx.getConf(), tbl); for (AggregationDesc aggr : pgbyOp.getConf().getAggregators()) { if (aggr.getDistinct()) { @@ -462,8 +470,13 @@ public class StatsOptimizer extends Transform { + " are not up to date."); return null; } - List<ColumnStatisticsObj> stats = hive.getMSC().getTableColumnStatistics( - tbl.getDbName(), tbl.getTableName(), Lists.newArrayList(colName)); + + List<ColumnStatisticsObj> stats = + hive.getMSC().getTableColumnStatistics( + tbl.getDbName(), tbl.getTableName(), + Lists.newArrayList(colName), + tableSnapshot != null ? tableSnapshot.getTxnId() : -1, + tableSnapshot != null ? 
tableSnapshot.getValidWriteIdList() : null); if (stats.isEmpty()) { Logger.debug("No stats for " + tbl.getTableName() + " column " + colName); return null; @@ -523,8 +536,13 @@ public class StatsOptimizer extends Transform { + " are not up to date."); return null; } - List<ColumnStatisticsObj> stats = hive.getMSC().getTableColumnStatistics( - tbl.getDbName(),tbl.getTableName(), Lists.newArrayList(colName)); + + List<ColumnStatisticsObj> stats = + hive.getMSC().getTableColumnStatistics( + tbl.getDbName(), tbl.getTableName(), + Lists.newArrayList(colName), + tableSnapshot.getTxnId(), + tableSnapshot.getValidWriteIdList()); if (stats.isEmpty()) { Logger.debug("No stats for " + tbl.getTableName() + " column " + colName); return null; @@ -664,9 +682,12 @@ public class StatsOptimizer extends Transform { + " are not up to date."); return null; } - ColumnStatisticsData statData = hive.getMSC().getTableColumnStatistics( - tbl.getDbName(), tbl.getTableName(), Lists.newArrayList(colName)) - .get(0).getStatsData(); + ColumnStatisticsData statData = + hive.getMSC().getTableColumnStatistics( + tbl.getDbName(), tbl.getTableName(), Lists.newArrayList(colName), + tableSnapshot.getTxnId(), + tableSnapshot.getValidWriteIdList()) + .get(0).getStatsData(); String name = colDesc.getTypeString().toUpperCase(); switch (type) { case Integer: { @@ -887,7 +908,7 @@ public class StatsOptimizer extends Transform { } private Collection<List<ColumnStatisticsObj>> verifyAndGetPartColumnStats( - Hive hive, Table tbl, String colName, Set<Partition> parts) throws TException { + Hive hive, Table tbl, String colName, Set<Partition> parts) throws TException, LockException { List<String> partNames = new ArrayList<String>(parts.size()); for (Partition part : parts) { if (!StatsUtils.areColumnStatsUptoDateForQueryAnswering(part.getTable(), part.getParameters(), colName)) { @@ -897,8 +918,13 @@ public class StatsOptimizer extends Transform { } partNames.add(part.getName()); } + AcidUtils.TableSnapshot tableSnapshot = + AcidUtils.getTableSnapshot(hive.getConf(), tbl); + Map<String, List<ColumnStatisticsObj>> result = hive.getMSC().getPartitionColumnStatistics( - tbl.getDbName(), tbl.getTableName(), partNames, Lists.newArrayList(colName)); + tbl.getDbName(), tbl.getTableName(), partNames, Lists.newArrayList(colName), + tableSnapshot != null ? tableSnapshot.getTxnId() : -1, + tableSnapshot != null ? 
tableSnapshot.getValidWriteIdList() : null); if (result.size() != parts.size()) { Logger.debug("Received " + result.size() + " stats for " + parts.size() + " partitions"); return null; http://git-wip-us.apache.org/repos/asf/hive/blob/1d46608e/ql/src/java/org/apache/hadoop/hive/ql/stats/BasicStatsNoJobTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/BasicStatsNoJobTask.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/BasicStatsNoJobTask.java index d4d46a3..9a271a2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/stats/BasicStatsNoJobTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/BasicStatsNoJobTask.java @@ -344,12 +344,12 @@ public class BasicStatsNoJobTask implements IStatsProcessor { } if (values.get(0).result instanceof Table) { - db.alterTable(tableFullName, (Table) values.get(0).result, environmentContext); + db.alterTable(tableFullName, (Table) values.get(0).result, environmentContext, true); LOG.debug("Updated stats for {}.", tableFullName); } else { if (values.get(0).result instanceof Partition) { List<Partition> results = Lists.transform(values, FooterStatCollector.EXTRACT_RESULT_FUNCTION); - db.alterPartitions(tableFullName, results, environmentContext); + db.alterPartitions(tableFullName, results, environmentContext, true); LOG.debug("Bulk updated {} partitions of {}.", results.size(), tableFullName); } else { throw new RuntimeException("inconsistent"); http://git-wip-us.apache.org/repos/asf/hive/blob/1d46608e/ql/src/java/org/apache/hadoop/hive/ql/stats/BasicStatsTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/BasicStatsTask.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/BasicStatsTask.java index 8c23887..0a2992d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/stats/BasicStatsTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/BasicStatsTask.java @@ -127,10 +127,7 @@ public class BasicStatsTask implements Serializable, IStatsProcessor { public Object process(StatsAggregator statsAggregator) throws HiveException, MetaException { Partish p = partish; Map<String, String> parameters = p.getPartParameters(); - if (p.isTransactionalTable()) { - // TODO: this should also happen on any error. Right now this task will just fail. - StatsSetupConst.setBasicStatsState(parameters, StatsSetupConst.FALSE); - } else if (work.isTargetRewritten()) { + if (work.isTargetRewritten()) { StatsSetupConst.setBasicStatsState(parameters, StatsSetupConst.TRUE); } @@ -208,12 +205,6 @@ public class BasicStatsTask implements Serializable, IStatsProcessor { private void updateStats(StatsAggregator statsAggregator, Map<String, String> parameters, String aggKey, boolean isFullAcid) throws HiveException { for (String statType : StatsSetupConst.statsRequireCompute) { - if (isFullAcid && !work.isTargetRewritten()) { - // Don't bother with aggregation in this case, it will probably be invalid. 
- parameters.remove(statType); - continue; - } - String value = statsAggregator.aggregateStats(aggKey, statType); if (value != null && !value.isEmpty()) { long longValue = Long.parseLong(value); @@ -272,7 +263,7 @@ public class BasicStatsTask implements Serializable, IStatsProcessor { if (res == null) { return 0; } - db.alterTable(tableFullName, res, environmentContext); + db.alterTable(tableFullName, res, environmentContext, true); if (conf.getBoolVar(ConfVars.TEZ_EXEC_SUMMARY)) { console.printInfo("Table " + tableFullName + " stats: [" + toString(p.getPartParameters()) + ']'); @@ -340,7 +331,7 @@ public class BasicStatsTask implements Serializable, IStatsProcessor { } if (!updates.isEmpty()) { - db.alterPartitions(tableFullName, updates, environmentContext); + db.alterPartitions(tableFullName, updates, environmentContext, true); } if (work.isStatsReliable() && updates.size() != processors.size()) { LOG.info("Stats should be reliadble...however seems like there were some issue.. => ret 1"); http://git-wip-us.apache.org/repos/asf/hive/blob/1d46608e/ql/src/java/org/apache/hadoop/hive/ql/stats/ColStatsProcessor.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/ColStatsProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/ColStatsProcessor.java index d4cfd0a..acebf52 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/stats/ColStatsProcessor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/ColStatsProcessor.java @@ -34,12 +34,14 @@ import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.SetPartitionsStatsRequest; import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.exec.FetchOperator; +import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.plan.ColumnStatsDesc; import org.apache.hadoop.hive.ql.plan.FetchWork; +import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; @@ -176,6 +178,11 @@ public class ColStatsProcessor implements IStatsProcessor { } SetPartitionsStatsRequest request = new SetPartitionsStatsRequest(colStats); request.setNeedMerge(colStatDesc.isNeedMerge()); + if (AcidUtils.isTransactionalTable(tbl) && SessionState.get().getTxnMgr() != null) { + request.setTxnId(SessionState.get().getTxnMgr().getCurrentTxnId()); + request.setValidWriteIdList(AcidUtils.getTableValidWriteIdList(conf, + AcidUtils.getFullTableName(tbl.getDbName(), tbl.getTableName())).toString()); + } db.setPartitionColumnStatistics(request); return 0; } http://git-wip-us.apache.org/repos/asf/hive/blob/1d46608e/ql/src/test/queries/clientpositive/stats_nonpart.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/stats_nonpart.q b/ql/src/test/queries/clientpositive/stats_nonpart.q new file mode 100644 index 0000000..f6019cc --- /dev/null +++ b/ql/src/test/queries/clientpositive/stats_nonpart.q @@ -0,0 +1,53 @@ +set hive.stats.dbclass=fs; +set hive.stats.fetch.column.stats=true; +set datanucleus.cache.collections=false; + +set 
hive.merge.mapfiles=false; +set hive.merge.mapredfiles=false; + +set hive.stats.autogather=true; +set hive.stats.column.autogather=true; +set hive.compute.query.using.stats=true; +set hive.mapred.mode=nonstrict; +set hive.explain.user=false; + +set hive.support.concurrency=true; +set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; +set hive.query.results.cache.enabled=false; + +-- create source. +drop table if exists mysource; +create table mysource (p int,key int); +insert into mysource values (100,20), (101,40), (102,50); +insert into mysource values (100,30), (101,50), (102,60); + +-- test nonpartitioned table +drop table if exists stats_nonpartitioned; + +--create table stats_nonpartitioned(key int, value int) stored as orc; +create table stats_nonpartitioned(key int, value int) stored as orc tblproperties ("transactional"="true"); +--create table stats_nonpartitioned(key int, value int) stored as orc tblproperties tblproperties ("transactional"="true", "transactional_properties"="insert_only"); + + +explain select count(*) from stats_nonpartitioned; +select count(*) from stats_nonpartitioned; +desc formatted stats_nonpartitioned; + +explain insert into table stats_nonpartitioned select * from mysource where p == 100; +insert into table stats_nonpartitioned select * from mysource where p == 100; + +desc formatted stats_nonpartitioned; + +explain select count(*) from stats_nonpartitioned; +select count(*) from stats_nonpartitioned; +explain select count(key) from stats_nonpartitioned; +select count(key) from stats_nonpartitioned; + +--analyze table stats_nonpartitioned compute statistics; +analyze table stats_nonpartitioned compute statistics for columns key, value; + +explain select count(*) from stats_nonpartitioned; +select count(*) from stats_nonpartitioned; +explain select count(key) from stats_nonpartitioned; +select count(key) from stats_nonpartitioned; + http://git-wip-us.apache.org/repos/asf/hive/blob/1d46608e/ql/src/test/queries/clientpositive/stats_part.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/stats_part.q b/ql/src/test/queries/clientpositive/stats_part.q new file mode 100644 index 0000000..d0812e1 --- /dev/null +++ b/ql/src/test/queries/clientpositive/stats_part.q @@ -0,0 +1,98 @@ +set hive.stats.dbclass=fs; +set hive.stats.fetch.column.stats=true; +set datanucleus.cache.collections=false; + +set hive.merge.mapfiles=false; +set hive.merge.mapredfiles=false; + +set hive.stats.autogather=true; +set hive.stats.column.autogather=true; +set hive.compute.query.using.stats=true; +set hive.mapred.mode=nonstrict; +set hive.explain.user=false; + +set hive.support.concurrency=true; +set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; +set hive.query.results.cache.enabled=false; + +-- create source. 
+drop table if exists mysource; +create table mysource (p int, key int, value int); +insert into mysource values (100,20,201), (101,40,401), (102,50,501); +insert into mysource values (100,21,211), (101,41,411), (102,51,511); + +--explain select count(*) from mysource; +--select count(*) from mysource; + +-- Gather col stats manually +--analyze table mysource compute statistics for columns p, key; + +--explain select count(*) from mysource; +--select count(*) from mysource; +--explain select count(key) from mysource; +--select count(key) from mysource; + +-- test partitioned table +drop table if exists stats_partitioned; + +--create table stats_part(key int,value string) partitioned by (p int) stored as orc; +create table stats_part(key int,value string) partitioned by (p int) stored as orc tblproperties ("transactional"="true"); +--create table stats_part(key int,value string) partitioned by (p int) stored as orc tblproperties ("transactional"="true", "transactional_properties"="insert_only"); + +explain select count(key) from stats_part; +--select count(*) from stats_part; +--explain select count(*) from stats_part where p = 100; +--select count(*) from stats_part where p = 100; +explain select count(key) from stats_part where p > 100; +--select count(*) from stats_part where p > 100; +desc formatted stats_part; + +--explain insert into table stats_part partition(p=100) select distinct key, value from mysource where p == 100; +insert into table stats_part partition(p=100) select distinct key, value from mysource where p == 100; +insert into table stats_part partition(p=101) select distinct key, value from mysource where p == 101; +insert into table stats_part partition(p=102) select distinct key, value from mysource where p == 102; + +desc formatted stats_part; + +insert into table mysource values (103,20,200), (103,83,832), (103,53,530); +insert into table stats_part partition(p=102) select distinct key, value from mysource where p == 102; + +desc formatted stats_part; +show partitions stats_part; + +explain select count(*) from stats_part; +select count(*) from stats_part; +explain select count(key) from stats_part; +select count(key) from stats_part; +explain select count(key) from stats_part where p > 100; +select count(key) from stats_part where p > 100; +explain select max(key) from stats_part where p > 100; +select max(key) from stats_part where p > 100; + +--update stats_part set key = key + 100 where key in(-50,40) and p > 100; +desc formatted stats_part; +explain select max(key) from stats_part where p > 100; +select max(key) from stats_part where p > 100; + +select count(value) from stats_part; +--update stats_part set value = concat(value, 'updated') where cast(key as integer) in(40,53) and p > 100; +select count(value) from stats_part; + +--delete from stats_part where key in (20, 41); +desc formatted stats_part; + +explain select count(*) from stats_part where p = 100; +select count(*) from stats_part where p = 100; +explain select count(*) from stats_part where p > 100; +select count(*) from stats_part where p > 100; +explain select count(key) from stats_part; +select count(key) from stats_part; +explain select count(*) from stats_part where p > 100; +select count(*) from stats_part where p > 100; +explain select max(key) from stats_part where p > 100; +select max(key) from stats_part where p > 100; + +describe extended stats_part partition (p=101); +describe extended stats_part; + + 
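The stats_nonpart.q and stats_part.q tests above rely on count(*)/count(key)/max(key) being answered from table and partition metadata (hive.compute.query.using.stats=true) after each insert into a transactional table. As a rough illustration of what that metadata-only path amounts to, here is a simplified sketch against the ql.metadata API; it is not the actual StatsOptimizer code, and the areBasicStatsUptoDate guard is assumed here to be the relevant freshness check.

  import java.util.Map;

  import org.apache.hadoop.hive.common.StatsSetupConst;
  import org.apache.hadoop.hive.ql.metadata.Hive;
  import org.apache.hadoop.hive.ql.metadata.Partition;
  import org.apache.hadoop.hive.ql.metadata.Table;

  public class RowCountFromStatsSketch {
    /**
     * Simplified sketch: sum numRows over all partitions, but only if every
     * partition reports up-to-date basic stats; otherwise give up (return -1),
     * which is where the real optimizer falls back to scanning the data.
     */
    static long rowCountFromStats(Hive db, Table tbl) throws Exception {
      long total = 0;
      for (Partition p : db.getPartitions(tbl)) {
        Map<String, String> params = p.getParameters();
        if (!StatsSetupConst.areBasicStatsUptoDate(params)) {
          return -1; // stats not trustworthy for this partition
        }
        total += Long.parseLong(params.get(StatsSetupConst.ROW_COUNT));
      }
      return total;
    }
  }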
http://git-wip-us.apache.org/repos/asf/hive/blob/1d46608e/ql/src/test/queries/clientpositive/stats_part2.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/stats_part2.q b/ql/src/test/queries/clientpositive/stats_part2.q new file mode 100644 index 0000000..24be218 --- /dev/null +++ b/ql/src/test/queries/clientpositive/stats_part2.q @@ -0,0 +1,100 @@ +set hive.stats.dbclass=fs; +set hive.stats.fetch.column.stats=true; +set datanucleus.cache.collections=false; + +set hive.merge.mapfiles=false; +set hive.merge.mapredfiles=false; + +set hive.stats.autogather=true; +set hive.stats.column.autogather=true; +set hive.compute.query.using.stats=true; +set hive.mapred.mode=nonstrict; +set hive.explain.user=false; + +set hive.support.concurrency=true; +set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; +set hive.query.results.cache.enabled=false; + +-- create source. +drop table if exists mysource; +create table mysource (p int, key int, value string); +insert into mysource values (100,20,'value20'), (101,40,'string40'), (102,50,'string50'); +insert into mysource values (100,21,'value21'), (101,41,'value41'), (102,51,'value51'); + +-- test partitioned table +drop table if exists stats_partitioned; + +--create table stats_part(key int,value string) partitioned by (p int) stored as orc; +create table stats_part(key int,value string) partitioned by (p int) stored as orc tblproperties ("transactional"="true"); +--create table stats_part(key int,value string) partitioned by (p int) stored as orc tblproperties ("transactional"="true", "transactional_properties"="insert_only"); + +--explain select count(*) from stats_part; +--select count(*) from stats_part; +--explain select count(*) from stats_part where p = 100; +--select count(*) from stats_part where p = 100; +explain select count(*) from stats_part where p > 100; +explain select max(key) from stats_part where p > 100; +--select count(*) from stats_part where p > 100; +desc formatted stats_part; + +--explain insert into table stats_part partition(p=100) select distinct key, value from mysource where p == 100; +insert into table stats_part partition(p=100) select distinct key, value from mysource where p == 100; +insert into table stats_part partition(p=101) select distinct key, value from mysource where p == 101; +insert into table stats_part partition(p=102) select distinct key, value from mysource where p == 102; + +desc formatted stats_part; +explain select count(key) from stats_part where p > 100; +explain select max(key) from stats_part where p > 100; + +insert into table mysource values (103,20,'value20'), (103,83,'value83'), (103,53,'value53'); +insert into table stats_part partition(p=102) select distinct key, value from mysource where p == 102; + +desc formatted stats_part; +show partitions stats_part; + +explain select count(*) from stats_part; +select count(*) from stats_part; +explain select count(key) from stats_part; +select count(key) from stats_part; +explain select count(key) from stats_part where p > 100; +select count(key) from stats_part where p > 100; +explain select max(key) from stats_part where p > 100; +select max(key) from stats_part where p > 100; + +desc formatted stats_part partition(p = 100); +desc formatted stats_part partition(p = 101); +desc formatted stats_part partition(p = 102); +update stats_part set key = key + 100 where key in(-50,40) and p > 100; +explain select max(key) from stats_part where p > 100; +select max(key) from 
stats_part where p > 100; +desc formatted stats_part partition(p = 100); +desc formatted stats_part partition(p = 101); +desc formatted stats_part partition(p = 102); + +select count(value) from stats_part; +update stats_part set value = concat(value, 'updated') where cast(key as integer) in(40,53) and p > 100; +desc formatted stats_part partition(p = 100); +desc formatted stats_part partition(p = 101); +desc formatted stats_part partition(p = 102); +select count(value) from stats_part; + +delete from stats_part where key in (20, 41); +desc formatted stats_part partition(p = 100); +desc formatted stats_part partition(p = 101); +desc formatted stats_part partition(p = 102); + +explain select count(*) from stats_part where p = 100; +select count(*) from stats_part where p = 100; +explain select count(*) from stats_part where p > 100; +select count(*) from stats_part where p > 100; +explain select count(key) from stats_part; +select count(key) from stats_part; +explain select count(*) from stats_part where p > 100; +select count(*) from stats_part where p > 100; +explain select max(key) from stats_part where p > 100; +select max(key) from stats_part where p > 100; + +describe extended stats_part partition (p=101); +describe extended stats_part; + + http://git-wip-us.apache.org/repos/asf/hive/blob/1d46608e/ql/src/test/queries/clientpositive/stats_sizebug.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/stats_sizebug.q b/ql/src/test/queries/clientpositive/stats_sizebug.q new file mode 100644 index 0000000..7108766 --- /dev/null +++ b/ql/src/test/queries/clientpositive/stats_sizebug.q @@ -0,0 +1,37 @@ +set hive.stats.dbclass=fs; +set hive.stats.fetch.column.stats=true; +set datanucleus.cache.collections=false; + +set hive.merge.mapfiles=false; +set hive.merge.mapredfiles=false; + +set hive.stats.autogather=true; +set hive.stats.column.autogather=true; +set hive.compute.query.using.stats=true; +set hive.mapred.mode=nonstrict; +set hive.explain.user=false; + +set hive.support.concurrency=true; +set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; +set hive.query.results.cache.enabled=false; + +-- create source. +drop table if exists mysource; +create table mysource (p int,key int); +insert into mysource values (100,20), (101,40), (102,50); +insert into mysource values (100,20), (101,40), (102,50); + +-- test nonpartitioned table +drop table if exists stats_nonpartitioned; + +--create table stats_nonpartitioned(key int, value int) stored as orc; +create table stats_nonpartitioned(key int, value int) stored as orc tblproperties ("transactional"="true"); +--create table stats_nonpartitioned(key int, value int) stored as orc tblproperties ("transactional"="true", "transactional_properties"="insert_only"); +explain insert into table stats_nonpartitioned select * from mysource where p == 100; +insert into table stats_nonpartitioned select * from mysource where p == 100; + +desc formatted stats_nonpartitioned; +analyze table mysource compute statistics for columns p, key; +desc formatted stats_nonpartitioned; + +
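All four tests run under DbTxnManager, so the stats written by their inserts carry the txnId/validWriteIdList context added earlier in this patch. The point of that context is a write-id comparison on the read side: stats recorded under a given write id should only be trusted by a reader whose snapshot covers that write id. A minimal sketch of that comparison using the common ValidReaderWriteIdList type follows; the method and its parameters are illustrative only, and the real metastore-side check is stricter (it also has to account for concurrent open writers).

  import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
  import org.apache.hadoop.hive.common.ValidWriteIdList;

  public class StatsValiditySketch {
    /**
     * Illustrative only: stats stamped with recordedWriteId are usable for a reader
     * whose snapshot (readerWriteIds) already sees that write id as committed.
     */
    static boolean statsUsable(String readerWriteIds, long recordedWriteId) {
      // ValidReaderWriteIdList parses the same string form produced by
      // ValidWriteIdList.toString(), i.e. what setValidWriteIdList(...) sends above.
      ValidWriteIdList snapshot = new ValidReaderWriteIdList(readerWriteIds);
      return snapshot.isWriteIdValid(recordedWriteId);
    }
  }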