HIVE-16579: CachedStore: improvements to partition col stats caching and cache column stats for unpartitioned table (Vaibhav Gumashta, reviewed by Daniel Dai, Thejas Nair)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d85beaa9 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d85beaa9 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d85beaa9 Branch: refs/heads/master Commit: d85beaa99ba349d9334d3d96abb6e89c94db8481 Parents: 952fe6e Author: Vaibhav Gumashta <vgumas...@hortonworks.com> Authored: Mon May 22 15:52:58 2017 -0700 Committer: Vaibhav Gumashta <vgumas...@hortonworks.com> Committed: Mon May 22 15:52:58 2017 -0700 ---------------------------------------------------------------------- .../listener/DummyRawStoreFailEvent.java | 4 +- .../org/apache/hadoop/hive/ql/QTestUtil.java | 2 +- .../hive/metastore/MetaStoreDirectSql.java | 73 +- .../hadoop/hive/metastore/MetaStoreUtils.java | 11 +- .../hadoop/hive/metastore/ObjectStore.java | 19 +- .../apache/hadoop/hive/metastore/RawStore.java | 8 +- .../hive/metastore/StatObjectConverter.java | 148 +++ .../hadoop/hive/metastore/cache/CacheUtils.java | 31 + .../hive/metastore/cache/CachedStore.java | 943 ++++++++++++------- .../hive/metastore/cache/SharedCache.java | 293 +++++- .../hadoop/hive/metastore/hbase/HBaseStore.java | 2 +- .../stats/merge/ColumnStatsMergerFactory.java | 18 +- .../stats/merge/DateColumnStatsMerger.java | 55 ++ .../DummyRawStoreControlledCommit.java | 2 +- .../DummyRawStoreForJdoConnection.java | 2 +- .../hive/metastore/cache/TestCachedStore.java | 450 ++++++++- 16 files changed, 1637 insertions(+), 424 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/d85beaa9/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java ---------------------------------------------------------------------- diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java index 91a3a38..3dc63bd 100644 --- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java +++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java @@ -914,9 +914,9 @@ public class DummyRawStoreFailEvent implements RawStore, Configurable { } @Override - public Map<String, ColumnStatisticsObj> getAggrColStatsForTablePartitions(String dbName, + public Map<String, List<ColumnStatisticsObj>> getColStatsForTablePartitions(String dbName, String tableName) throws MetaException, NoSuchObjectException { - return objectStore.getAggrColStatsForTablePartitions(dbName, tableName); + return objectStore.getColStatsForTablePartitions(dbName, tableName); } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/d85beaa9/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java ---------------------------------------------------------------------- diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java index d296851..111cc11 100644 --- a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java +++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java @@ -350,7 +350,7 @@ public class QTestUtil { if (!useHBaseMetastore) { // Plug verifying metastore in for testing DirectSQL. 
conf.setVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL, - "org.apache.hadoop.hive.metastore.VerifyingObjectStore"); + "org.apache.hadoop.hive.metastore.VerifyingObjectStore"); } else { conf.setVar(ConfVars.METASTORE_RAW_STORE_IMPL, HBaseStore.class.getName()); conf.setBoolVar(ConfVars.METASTORE_FASTPATH, true); http://git-wip-us.apache.org/repos/asf/hive/blob/d85beaa9/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java index b96c27e..df73693 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java @@ -1208,7 +1208,9 @@ class MetaStoreDirectSql { } }; List<Object[]> list = runBatched(colNames, b); - if (list.isEmpty()) return null; + if (list.isEmpty()) { + return null; + } ColumnStatisticsDesc csd = new ColumnStatisticsDesc(true, dbName, tableName); ColumnStatistics result = makeColumnStats(list, csd, 0); b.closeAllQueries(); @@ -1343,41 +1345,26 @@ class MetaStoreDirectSql { // Get aggregated column stats for a table per partition for all columns in the partition // This is primarily used to populate stats object when using CachedStore (Check CachedStore#prewarm) - public Map<String, ColumnStatisticsObj> getAggrColStatsForTablePartitions(String dbName, - String tblName, boolean useDensityFunctionForNDVEstimation, double ndvTuner) throws MetaException { - String queryText = "select \"PARTITION_NAME\", \"COLUMN_NAME\", \"COLUMN_TYPE\", " - + "min(\"LONG_LOW_VALUE\"), max(\"LONG_HIGH_VALUE\"), min(\"DOUBLE_LOW_VALUE\"), max(\"DOUBLE_HIGH_VALUE\"), " - + "min(cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal)), max(cast(\"BIG_DECIMAL_HIGH_VALUE\" as decimal)), " - + "sum(\"NUM_NULLS\"), max(\"NUM_DISTINCTS\"), " - + "max(\"AVG_COL_LEN\"), max(\"MAX_COL_LEN\"), sum(\"NUM_TRUES\"), sum(\"NUM_FALSES\"), " - // The following data is used to compute a partitioned table's NDV based - // on partitions' NDV when useDensityFunctionForNDVEstimation = true. Global NDVs cannot be - // accurately derived from partition NDVs, because the domain of column value two partitions - // can overlap. If there is no overlap then global NDV is just the sum - // of partition NDVs (UpperBound). But if there is some overlay then - // global NDV can be anywhere between sum of partition NDVs (no overlap) - // and same as one of the partition NDV (domain of column value in all other - // partitions is subset of the domain value in one of the partition) - // (LowerBound).But under uniform distribution, we can roughly estimate the global - // NDV by leveraging the min/max values. - // And, we also guarantee that the estimation makes sense by comparing it to the - // UpperBound (calculated by "sum(\"NUM_DISTINCTS\")") - // and LowerBound (calculated by "max(\"NUM_DISTINCTS\")") - + "avg((\"LONG_HIGH_VALUE\"-\"LONG_LOW_VALUE\")/cast(\"NUM_DISTINCTS\" as decimal))," - + "avg((\"DOUBLE_HIGH_VALUE\"-\"DOUBLE_LOW_VALUE\")/\"NUM_DISTINCTS\")," - + "avg((cast(\"BIG_DECIMAL_HIGH_VALUE\" as decimal)-cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal))/\"NUM_DISTINCTS\")," - + "sum(\"NUM_DISTINCTS\") from \"PART_COL_STATS\"" - + " where \"DB_NAME\" = ? and \"TABLE_NAME\" = ? 
group by \"PARTITION_NAME\", \"COLUMN_NAME\", \"COLUMN_TYPE\""; + public Map<String, List<ColumnStatisticsObj>> getColStatsForTablePartitions(String dbName, + String tblName) throws MetaException { + String queryText = + "select \"PARTITION_NAME\", \"COLUMN_NAME\", \"COLUMN_TYPE\", \"LONG_LOW_VALUE\", " + + "\"LONG_HIGH_VALUE\", \"DOUBLE_LOW_VALUE\", \"DOUBLE_HIGH_VALUE\", " + + "\"BIG_DECIMAL_LOW_VALUE\", \"BIG_DECIMAL_HIGH_VALUE\", \"NUM_NULLS\", " + + "\"NUM_DISTINCTS\", \"AVG_COL_LEN\", \"MAX_COL_LEN\", \"NUM_TRUES\", \"NUM_FALSES\"" + + " from \"PART_COL_STATS\" where \"DB_NAME\" = ? and \"TABLE_NAME\" = ?" + + " order by \"PARTITION_NAME\""; long start = 0; long end = 0; Query query = null; boolean doTrace = LOG.isDebugEnabled(); Object qResult = null; - ForwardQueryResult fqr = null; start = doTrace ? System.nanoTime() : 0; query = pm.newQuery("javax.jdo.query.SQL", queryText); - qResult = executeWithArray(query, - prepareParams(dbName, tblName, new ArrayList<String>(), new ArrayList<String>()), queryText); + qResult = + executeWithArray(query, + prepareParams(dbName, tblName, new ArrayList<String>(), new ArrayList<String>()), + queryText); if (qResult == null) { query.closeAll(); return Maps.newHashMap(); @@ -1385,13 +1372,31 @@ class MetaStoreDirectSql { end = doTrace ? System.nanoTime() : 0; timingTrace(doTrace, queryText, start, end); List<Object[]> list = ensureList(qResult); - Map<String, ColumnStatisticsObj> partColStatsMap = new HashMap<String, ColumnStatisticsObj>(); + Map<String, List<ColumnStatisticsObj>> partColStatsMap = + new HashMap<String, List<ColumnStatisticsObj>>(); + String partNameCurrent = null; + List<ColumnStatisticsObj> partColStatsList = new ArrayList<ColumnStatisticsObj>(); for (Object[] row : list) { String partName = (String) row[0]; - String colName = (String) row[1]; - partColStatsMap.put( - CacheUtils.buildKey(dbName, tblName, CachedStore.partNameToVals(partName), colName), - prepareCSObjWithAdjustedNDV(row, 1, useDensityFunctionForNDVEstimation, ndvTuner)); + if (partNameCurrent == null) { + // Update the current partition we are working on + partNameCurrent = partName; + // Create a new list for this new partition + partColStatsList = new ArrayList<ColumnStatisticsObj>(); + // Add the col stat for the current column + partColStatsList.add(prepareCSObj(row, 1)); + } else if (!partNameCurrent.equalsIgnoreCase(partName)) { + // Save the previous partition and its col stat list + partColStatsMap.put(partNameCurrent, partColStatsList); + // Update the current partition we are working on + partNameCurrent = partName; + // Create a new list for this new partition + partColStatsList = new ArrayList<ColumnStatisticsObj>(); + // Add the col stat for the current column + partColStatsList.add(prepareCSObj(row, 1)); + } else { + partColStatsList.add(prepareCSObj(row, 1)); + } Deadline.checkTimeout(); } query.closeAll(); http://git-wip-us.apache.org/repos/asf/hive/blob/d85beaa9/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java index 870896c..8328428 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java @@ -1171,6 +1171,15 @@ public class MetaStoreUtils { return addCols(getSchemaWithoutCols(sd, tblsd, 
parameters, databaseName, tableName, partitionKeys), tblsd.getCols()); } + public static List<String> getColumnNamesForTable(Table table) { + List<String> colNames = new ArrayList<String>(); + Iterator<FieldSchema> colsIterator = table.getSd().getColsIterator(); + while (colsIterator.hasNext()) { + colNames.add(colsIterator.next().getName()); + } + return colNames; + } + public static String getColumnNameDelimiter(List<FieldSchema> fieldSchemas) { // we first take a look if any fieldSchemas contain COMMA for (int i = 0; i < fieldSchemas.size(); i++) { @@ -1180,7 +1189,7 @@ public class MetaStoreUtils { } return String.valueOf(SerDeUtils.COMMA); } - + /** * Convert FieldSchemas to columnNames. */ http://git-wip-us.apache.org/repos/asf/hive/blob/d85beaa9/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java index b28983f..19becb8 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java @@ -7173,23 +7173,18 @@ public class ObjectStore implements RawStore, Configurable { } @Override - public Map<String, ColumnStatisticsObj> getAggrColStatsForTablePartitions(String dbName, + public Map<String, List<ColumnStatisticsObj>> getColStatsForTablePartitions(String dbName, String tableName) throws MetaException, NoSuchObjectException { - final boolean useDensityFunctionForNDVEstimation = HiveConf.getBoolVar(getConf(), - HiveConf.ConfVars.HIVE_METASTORE_STATS_NDV_DENSITY_FUNCTION); - final double ndvTuner = HiveConf.getFloatVar(getConf(), - HiveConf.ConfVars.HIVE_METASTORE_STATS_NDV_TUNER); - return new GetHelper<Map<String, ColumnStatisticsObj>>(dbName, tableName, true, false) { + return new GetHelper<Map<String, List<ColumnStatisticsObj>>>(dbName, tableName, true, false) { @Override - protected Map<String, ColumnStatisticsObj> getSqlResult( - GetHelper<Map<String, ColumnStatisticsObj>> ctx) throws MetaException { - return directSql.getAggrColStatsForTablePartitions(dbName, tblName, - useDensityFunctionForNDVEstimation, ndvTuner); + protected Map<String, List<ColumnStatisticsObj>> getSqlResult( + GetHelper<Map<String, List<ColumnStatisticsObj>>> ctx) throws MetaException { + return directSql.getColStatsForTablePartitions(dbName, tblName); } @Override - protected Map<String, ColumnStatisticsObj> getJdoResult( - GetHelper<Map<String, ColumnStatisticsObj>> ctx) throws MetaException, + protected Map<String, List<ColumnStatisticsObj>> getJdoResult( + GetHelper<Map<String, List<ColumnStatisticsObj>>> ctx) throws MetaException, NoSuchObjectException { // This is fast path for query optimizations, if we can find this info // quickly using directSql, do it. 
No point in failing back to slow path http://git-wip-us.apache.org/repos/asf/hive/blob/d85beaa9/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java index c1af690..964ffb2 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java @@ -579,14 +579,16 @@ public interface RawStore extends Configurable { List<String> partNames, List<String> colNames) throws MetaException, NoSuchObjectException; /** - * Get all partition column statistics for a table + * Get all partition column statistics for a table in a db + * * @param dbName * @param tableName - * @return Map of partition column statistics + * @return Map of partition column statistics. Key in the map is partition name. Value is a list + * of column stat object for each column in the partition * @throws MetaException * @throws NoSuchObjectException */ - public Map<String, ColumnStatisticsObj> getAggrColStatsForTablePartitions(String dbName, + public Map<String, List<ColumnStatisticsObj>> getColStatsForTablePartitions(String dbName, String tableName) throws MetaException, NoSuchObjectException; /** http://git-wip-us.apache.org/repos/asf/hive/blob/d85beaa9/metastore/src/java/org/apache/hadoop/hive/metastore/StatObjectConverter.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/StatObjectConverter.java b/metastore/src/java/org/apache/hadoop/hive/metastore/StatObjectConverter.java index fcf6f27..2dc2804 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/StatObjectConverter.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/StatObjectConverter.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hive.metastore.api.LongColumnStatsData; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.StringColumnStatsData; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData._Fields; import org.apache.hadoop.hive.metastore.model.MPartition; import org.apache.hadoop.hive.metastore.model.MPartitionColumnStatistics; import org.apache.hadoop.hive.metastore.model.MTable; @@ -700,4 +701,151 @@ public class StatObjectConverter { return new BigDecimal(new BigInteger(d.getUnscaled()), d.getScale()).toString(); } + /** + * Set field values in oldStatObj from newStatObj + * @param oldStatObj + * @param newStatObj + */ + public static void setFieldsIntoOldStats(ColumnStatisticsObj oldStatObj, + ColumnStatisticsObj newStatObj) { + _Fields typeNew = newStatObj.getStatsData().getSetField(); + _Fields typeOld = oldStatObj.getStatsData().getSetField(); + typeNew = typeNew == typeOld ? 
typeNew : null; + switch (typeNew) { + case BOOLEAN_STATS: + BooleanColumnStatsData oldBooleanStatsData = oldStatObj.getStatsData().getBooleanStats(); + BooleanColumnStatsData newBooleanStatsData = newStatObj.getStatsData().getBooleanStats(); + if (newBooleanStatsData.isSetNumTrues()) { + oldBooleanStatsData.setNumTrues(newBooleanStatsData.getNumTrues()); + } + if (newBooleanStatsData.isSetNumFalses()) { + oldBooleanStatsData.setNumFalses(newBooleanStatsData.getNumFalses()); + } + if (newBooleanStatsData.isSetNumNulls()) { + oldBooleanStatsData.setNumNulls(newBooleanStatsData.getNumNulls()); + } + if (newBooleanStatsData.isSetBitVectors()) { + oldBooleanStatsData.setBitVectors(newBooleanStatsData.getBitVectors()); + } + break; + case LONG_STATS: { + LongColumnStatsData oldLongStatsData = oldStatObj.getStatsData().getLongStats(); + LongColumnStatsData newLongStatsData = newStatObj.getStatsData().getLongStats(); + if (newLongStatsData.isSetHighValue()) { + oldLongStatsData.setHighValue(newLongStatsData.getHighValue()); + } + if (newLongStatsData.isSetLowValue()) { + oldLongStatsData.setLowValue(newLongStatsData.getLowValue()); + } + if (newLongStatsData.isSetNumNulls()) { + oldLongStatsData.setNumNulls(newLongStatsData.getNumNulls()); + } + if (newLongStatsData.isSetNumDVs()) { + oldLongStatsData.setNumDVs(newLongStatsData.getNumDVs()); + } + if (newLongStatsData.isSetBitVectors()) { + oldLongStatsData.setBitVectors(newLongStatsData.getBitVectors()); + } + break; + } + case DOUBLE_STATS: { + DoubleColumnStatsData oldDoubleStatsData = oldStatObj.getStatsData().getDoubleStats(); + DoubleColumnStatsData newDoubleStatsData = newStatObj.getStatsData().getDoubleStats(); + if (newDoubleStatsData.isSetHighValue()) { + oldDoubleStatsData.setHighValue(newDoubleStatsData.getHighValue()); + } + if (newDoubleStatsData.isSetLowValue()) { + oldDoubleStatsData.setLowValue(newDoubleStatsData.getLowValue()); + } + if (newDoubleStatsData.isSetNumNulls()) { + oldDoubleStatsData.setNumNulls(newDoubleStatsData.getNumNulls()); + } + if (newDoubleStatsData.isSetNumDVs()) { + oldDoubleStatsData.setNumDVs(newDoubleStatsData.getNumDVs()); + } + if (newDoubleStatsData.isSetBitVectors()) { + oldDoubleStatsData.setBitVectors(newDoubleStatsData.getBitVectors()); + } + break; + } + case STRING_STATS: { + StringColumnStatsData oldStringStatsData = oldStatObj.getStatsData().getStringStats(); + StringColumnStatsData newStringStatsData = newStatObj.getStatsData().getStringStats(); + if (newStringStatsData.isSetMaxColLen()) { + oldStringStatsData.setMaxColLen(newStringStatsData.getMaxColLen()); + } + if (newStringStatsData.isSetAvgColLen()) { + oldStringStatsData.setAvgColLen(newStringStatsData.getAvgColLen()); + } + if (newStringStatsData.isSetNumNulls()) { + oldStringStatsData.setNumNulls(newStringStatsData.getNumNulls()); + } + if (newStringStatsData.isSetNumDVs()) { + oldStringStatsData.setNumDVs(newStringStatsData.getNumDVs()); + } + if (newStringStatsData.isSetBitVectors()) { + oldStringStatsData.setBitVectors(newStringStatsData.getBitVectors()); + } + break; + } + case BINARY_STATS: + BinaryColumnStatsData oldBinaryStatsData = oldStatObj.getStatsData().getBinaryStats(); + BinaryColumnStatsData newBinaryStatsData = newStatObj.getStatsData().getBinaryStats(); + if (newBinaryStatsData.isSetMaxColLen()) { + oldBinaryStatsData.setMaxColLen(newBinaryStatsData.getMaxColLen()); + } + if (newBinaryStatsData.isSetAvgColLen()) { + oldBinaryStatsData.setAvgColLen(newBinaryStatsData.getAvgColLen()); + } + if 
(newBinaryStatsData.isSetNumNulls()) { + oldBinaryStatsData.setNumNulls(newBinaryStatsData.getNumNulls()); + } + if (newBinaryStatsData.isSetBitVectors()) { + oldBinaryStatsData.setBitVectors(newBinaryStatsData.getBitVectors()); + } + break; + case DECIMAL_STATS: { + DecimalColumnStatsData oldDecimalStatsData = oldStatObj.getStatsData().getDecimalStats(); + DecimalColumnStatsData newDecimalStatsData = newStatObj.getStatsData().getDecimalStats(); + if (newDecimalStatsData.isSetHighValue()) { + oldDecimalStatsData.setHighValue(newDecimalStatsData.getHighValue()); + } + if (newDecimalStatsData.isSetLowValue()) { + oldDecimalStatsData.setLowValue(newDecimalStatsData.getLowValue()); + } + if (newDecimalStatsData.isSetNumNulls()) { + oldDecimalStatsData.setNumNulls(newDecimalStatsData.getNumNulls()); + } + if (newDecimalStatsData.isSetNumDVs()) { + oldDecimalStatsData.setNumDVs(newDecimalStatsData.getNumDVs()); + } + if (newDecimalStatsData.isSetBitVectors()) { + oldDecimalStatsData.setBitVectors(newDecimalStatsData.getBitVectors()); + } + break; + } + case DATE_STATS: { + DateColumnStatsData oldDateStatsData = oldStatObj.getStatsData().getDateStats(); + DateColumnStatsData newDateStatsData = newStatObj.getStatsData().getDateStats(); + if (newDateStatsData.isSetHighValue()) { + oldDateStatsData.setHighValue(newDateStatsData.getHighValue()); + } + if (newDateStatsData.isSetLowValue()) { + oldDateStatsData.setLowValue(newDateStatsData.getLowValue()); + } + if (newDateStatsData.isSetNumNulls()) { + oldDateStatsData.setNumNulls(newDateStatsData.getNumNulls()); + } + if (newDateStatsData.isSetNumDVs()) { + oldDateStatsData.setNumDVs(newDateStatsData.getNumDVs()); + } + if (newDateStatsData.isSetBitVectors()) { + oldDateStatsData.setBitVectors(newDateStatsData.getBitVectors()); + } + break; + } + default: + throw new IllegalArgumentException("Unknown stats type: " + typeNew.toString()); + } + } } http://git-wip-us.apache.org/repos/asf/hive/blob/d85beaa9/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java index 668499b..280655d 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java @@ -38,6 +38,10 @@ public class CacheUtils { return dbName + delimit + tableName; } + public static String buildKeyWithDelimit(String dbName, String tableName) { + return buildKey(dbName, tableName) + delimit; + } + public static String buildKey(String dbName, String tableName, List<String> partVals) { String key = buildKey(dbName, tableName); if (partVals == null || partVals.size() == 0) { @@ -52,11 +56,38 @@ public class CacheUtils { return key; } + public static String buildKeyWithDelimit(String dbName, String tableName, List<String> partVals) { + return buildKey(dbName, tableName, partVals) + delimit; + } + public static String buildKey(String dbName, String tableName, List<String> partVals, String colName) { String key = buildKey(dbName, tableName, partVals); return key + delimit + colName; } + public static String buildKey(String dbName, String tableName, String colName) { + String key = buildKey(dbName, tableName); + return key + delimit + colName; + } + + public static String[] splitTableColStats(String key) { + return key.split(delimit); + } + + public 
static Object[] splitPartitionColStats(String key) { + Object[] result = new Object[4]; + String[] comps = key.split(delimit); + result[0] = comps[0]; + result[1] = comps[1]; + List<String> vals = new ArrayList<String>(); + for (int i=2;i<comps.length-2;i++) { + vals.add(comps[i]); + } + result[2] = vals; + result[3] = comps[comps.length-1]; + return result; + } + public static Table assemble(TableWrapper wrapper) { Table t = wrapper.getTable().deepCopy(); if (wrapper.getSdHash()!=null) { http://git-wip-us.apache.org/repos/asf/hive/blob/d85beaa9/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java index 1cc838f..78aab91 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java @@ -26,12 +26,15 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.FileUtils; +import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.Deadline; import org.apache.hadoop.hive.metastore.FileMetadataHandler; import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.metastore.ObjectStore; @@ -41,18 +44,11 @@ import org.apache.hadoop.hive.metastore.RawStore; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.AggrStats; -import org.apache.hadoop.hive.metastore.api.BinaryColumnStatsData; -import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData; import org.apache.hadoop.hive.metastore.api.ColumnStatistics; -import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; import org.apache.hadoop.hive.metastore.api.Database; -import org.apache.hadoop.hive.metastore.api.Date; -import org.apache.hadoop.hive.metastore.api.DateColumnStatsData; -import org.apache.hadoop.hive.metastore.api.Decimal; -import org.apache.hadoop.hive.metastore.api.DecimalColumnStatsData; -import org.apache.hadoop.hive.metastore.api.DoubleColumnStatsData; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.FileMetadataExprType; import org.apache.hadoop.hive.metastore.api.Function; @@ -61,7 +57,6 @@ import org.apache.hadoop.hive.metastore.api.Index; import org.apache.hadoop.hive.metastore.api.InvalidInputException; import org.apache.hadoop.hive.metastore.api.InvalidObjectException; import org.apache.hadoop.hive.metastore.api.InvalidPartitionException; -import org.apache.hadoop.hive.metastore.api.LongColumnStatsData; import org.apache.hadoop.hive.metastore.api.MetaException; import 
org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.NotificationEvent; @@ -77,13 +72,14 @@ import org.apache.hadoop.hive.metastore.api.RolePrincipalGrant; import org.apache.hadoop.hive.metastore.api.SQLForeignKey; import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; -import org.apache.hadoop.hive.metastore.api.StringColumnStatsData; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.TableMeta; import org.apache.hadoop.hive.metastore.api.Type; import org.apache.hadoop.hive.metastore.api.UnknownDBException; import org.apache.hadoop.hive.metastore.api.UnknownPartitionException; import org.apache.hadoop.hive.metastore.api.UnknownTableException; +import org.apache.hadoop.hive.metastore.hbase.stats.merge.ColumnStatsMerger; +import org.apache.hadoop.hive.metastore.hbase.stats.merge.ColumnStatsMergerFactory; import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy; import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; @@ -104,15 +100,25 @@ import com.google.common.annotations.VisibleForTesting; // TODO initial load slow? // TODO size estimation // TODO factor in extrapolation logic (using partitions found) during aggregate stats calculation -// TODO factor in NDV estimation (density based estimation) logic when merging NDVs from 2 colStats object -// TODO refactor to use same common code with StatObjectConverter (for merging 2 col stats objects) public class CachedStore implements RawStore, Configurable { private static ScheduledExecutorService cacheUpdateMaster = null; - private static AtomicReference<Thread> runningMasterThread = new AtomicReference<Thread>(null); + private static ReentrantReadWriteLock databaseCacheLock = new ReentrantReadWriteLock(true); + private static AtomicBoolean isDatabaseCacheDirty = new AtomicBoolean(false); + private static ReentrantReadWriteLock tableCacheLock = new ReentrantReadWriteLock(true); + private static AtomicBoolean isTableCacheDirty = new AtomicBoolean(false); + private static ReentrantReadWriteLock partitionCacheLock = new ReentrantReadWriteLock(true); + private static AtomicBoolean isPartitionCacheDirty = new AtomicBoolean(false); + private static ReentrantReadWriteLock tableColStatsCacheLock = new ReentrantReadWriteLock(true); + private static AtomicBoolean isTableColStatsCacheDirty = new AtomicBoolean(false); + private static ReentrantReadWriteLock partitionColStatsCacheLock = new ReentrantReadWriteLock( + true); + private static AtomicBoolean isPartitionColStatsCacheDirty = new AtomicBoolean(false); RawStore rawStore; Configuration conf; private PartitionExpressionProxy expressionProxy = null; + // Default value set to 100 milliseconds for test purpose + private long cacheRefreshPeriod = 100; static boolean firstTime = true; static final private Logger LOG = LoggerFactory.getLogger(CachedStore.class.getName()); @@ -209,6 +215,8 @@ public class CachedStore implements RawStore, Configurable { LOG.info("Prewarming CachedStore"); prewarm(); LOG.info("CachedStore initialized"); + // Start the cache update master-worker threads + startCacheUpdateService(); } catch (Exception e) { throw new RuntimeException(e); } @@ -216,7 +224,10 @@ public class CachedStore implements RawStore, Configurable { } } - private void prewarm() throws Exception { + @VisibleForTesting + void prewarm() throws Exception { + // 
Prevents throwing exceptions in our raw store calls since we're not using RawStoreProxy + Deadline.registerIfNot(1000000); List<String> dbNames = rawStore.getAllDatabases(); for (String dbName : dbNames) { Database db = rawStore.getDatabase(dbName); @@ -226,35 +237,81 @@ public class CachedStore implements RawStore, Configurable { Table table = rawStore.getTable(dbName, tblName); SharedCache.addTableToCache(HiveStringUtils.normalizeIdentifier(dbName), HiveStringUtils.normalizeIdentifier(tblName), table); + Deadline.startTimer("getPartitions"); List<Partition> partitions = rawStore.getPartitions(dbName, tblName, Integer.MAX_VALUE); + Deadline.stopTimer(); for (Partition partition : partitions) { SharedCache.addPartitionToCache(HiveStringUtils.normalizeIdentifier(dbName), HiveStringUtils.normalizeIdentifier(tblName), partition); } - Map<String, ColumnStatisticsObj> aggrStatsPerPartition = rawStore - .getAggrColStatsForTablePartitions(dbName, tblName); - SharedCache.addPartitionColStatsToCache(aggrStatsPerPartition); + // Cache partition column stats + Deadline.startTimer("getColStatsForTablePartitions"); + Map<String, List<ColumnStatisticsObj>> colStatsPerPartition = + rawStore.getColStatsForTablePartitions(dbName, tblName); + Deadline.stopTimer(); + if (colStatsPerPartition != null) { + SharedCache.addPartitionColStatsToCache(dbName, tblName, colStatsPerPartition); + } + // Cache table column stats + List<String> colNames = MetaStoreUtils.getColumnNamesForTable(table); + Deadline.startTimer("getTableColumnStatistics"); + ColumnStatistics tableColStats = + rawStore.getTableColumnStatistics(dbName, tblName, colNames); + Deadline.stopTimer(); + if ((tableColStats != null) && (tableColStats.getStatsObjSize() > 0)) { + SharedCache.addTableColStatsToCache(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tblName), tableColStats.getStatsObj()); + } } } - // Start the cache update master-worker threads - startCacheUpdateService(); } - private synchronized void startCacheUpdateService() { + @VisibleForTesting + synchronized void startCacheUpdateService() { if (cacheUpdateMaster == null) { cacheUpdateMaster = Executors.newScheduledThreadPool(1, new ThreadFactory() { public Thread newThread(Runnable r) { Thread t = Executors.defaultThreadFactory().newThread(r); + t.setName("CachedStore-CacheUpdateService: Thread-" + t.getId()); t.setDaemon(true); return t; } }); - cacheUpdateMaster.scheduleAtFixedRate(new CacheUpdateMasterWork(this), 0, HiveConf - .getTimeVar(conf, HiveConf.ConfVars.METASTORE_CACHED_RAW_STORE_CACHE_UPDATE_FREQUENCY, - TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS); + if (!HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_IN_TEST)) { + cacheRefreshPeriod = + HiveConf.getTimeVar(conf, + HiveConf.ConfVars.METASTORE_CACHED_RAW_STORE_CACHE_UPDATE_FREQUENCY, + TimeUnit.MILLISECONDS); + } + LOG.info("CachedStore: starting cache update service (run every " + cacheRefreshPeriod + "ms"); + cacheUpdateMaster.scheduleAtFixedRate(new CacheUpdateMasterWork(this), cacheRefreshPeriod, + cacheRefreshPeriod, TimeUnit.MILLISECONDS); } } + @VisibleForTesting + synchronized boolean stopCacheUpdateService(long timeout) { + boolean tasksStoppedBeforeShutdown = false; + if (cacheUpdateMaster != null) { + LOG.info("CachedStore: shutting down cache update service"); + try { + tasksStoppedBeforeShutdown = + cacheUpdateMaster.awaitTermination(timeout, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + LOG.info("CachedStore: cache update service was interrupted while 
waiting for tasks to " + + "complete before shutting down. Will make a hard stop now."); + } + cacheUpdateMaster.shutdownNow(); + cacheUpdateMaster = null; + } + return tasksStoppedBeforeShutdown; + } + + @VisibleForTesting + void setCacheRefreshPeriod(long time) { + this.cacheRefreshPeriod = time; + } + static class CacheUpdateMasterWork implements Runnable { private CachedStore cachedStore; @@ -265,86 +322,175 @@ public class CachedStore implements RawStore, Configurable { @Override public void run() { - runningMasterThread.set(Thread.currentThread()); - RawStore rawStore = cachedStore.getRawStore(); + // Prevents throwing exceptions in our raw store calls since we're not using RawStoreProxy + Deadline.registerIfNot(1000000); + LOG.debug("CachedStore: updating cached objects"); + String rawStoreClassName = + HiveConf.getVar(cachedStore.conf, HiveConf.ConfVars.METASTORE_CACHED_RAW_STORE_IMPL, + ObjectStore.class.getName()); try { + RawStore rawStore = + ((Class<? extends RawStore>) MetaStoreUtils.getClass(rawStoreClassName)).newInstance(); + rawStore.setConf(cachedStore.conf); List<String> dbNames = rawStore.getAllDatabases(); - // Update the database in cache - if (!updateDatabases(rawStore, dbNames)) { - return; - } - // Update the tables and their partitions in cache - if (!updateTables(rawStore, dbNames)) { - return; + if (dbNames != null) { + // Update the database in cache + updateDatabases(rawStore, dbNames); + for (String dbName : dbNames) { + // Update the tables in cache + updateTables(rawStore, dbName); + List<String> tblNames = cachedStore.getAllTables(dbName); + for (String tblName : tblNames) { + // Update the partitions for a table in cache + updateTablePartitions(rawStore, dbName, tblName); + // Update the table column stats for a table in cache + updateTableColStats(rawStore, dbName, tblName); + // Update the partitions column stats for a table in cache + updateTablePartitionColStats(rawStore, dbName, tblName); + } + } } } catch (MetaException e) { LOG.error("Updating CachedStore: error getting database names", e); + } catch (InstantiationException | IllegalAccessException e) { + throw new RuntimeException("Cannot instantiate " + rawStoreClassName, e); } } - private boolean updateDatabases(RawStore rawStore, List<String> dbNames) { - if (dbNames != null) { - List<Database> databases = new ArrayList<Database>(); - for (String dbName : dbNames) { - // If a preemption of this thread was requested, simply return before proceeding - if (Thread.interrupted()) { - return false; + private void updateDatabases(RawStore rawStore, List<String> dbNames) { + // Prepare the list of databases + List<Database> databases = new ArrayList<Database>(); + for (String dbName : dbNames) { + Database db; + try { + db = rawStore.getDatabase(dbName); + databases.add(db); + } catch (NoSuchObjectException e) { + LOG.info("Updating CachedStore: database - " + dbName + " does not exist.", e); + } + } + // Update the cached database objects + try { + if (databaseCacheLock.writeLock().tryLock()) { + // Skip background updates if we detect change + if (isDatabaseCacheDirty.compareAndSet(true, false)) { + LOG.debug("Skipping database cache update; the database list we have is dirty."); + return; } - Database db; - try { - db = rawStore.getDatabase(dbName); - databases.add(db); - } catch (NoSuchObjectException e) { - LOG.info("Updating CachedStore: database - " + dbName + " does not exist.", e); + SharedCache.refreshDatabases(databases); + } + } finally { + if 
(databaseCacheLock.isWriteLockedByCurrentThread()) { + databaseCacheLock.writeLock().unlock(); + } + } + } + + // Update the cached table objects + private void updateTables(RawStore rawStore, String dbName) { + List<Table> tables = new ArrayList<Table>(); + try { + List<String> tblNames = rawStore.getAllTables(dbName); + for (String tblName : tblNames) { + Table table = + rawStore.getTable(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tblName)); + tables.add(table); + } + if (tableCacheLock.writeLock().tryLock()) { + // Skip background updates if we detect change + if (isTableCacheDirty.compareAndSet(true, false)) { + LOG.debug("Skipping table cache update; the table list we have is dirty."); + return; } + SharedCache.refreshTables(dbName, tables); + } + } catch (MetaException e) { + LOG.info("Updating CachedStore: unable to read tables for database - " + dbName, e); + } finally { + if (tableCacheLock.isWriteLockedByCurrentThread()) { + tableCacheLock.writeLock().unlock(); } - // Update the cached database objects - SharedCache.refreshDatabases(databases); } - return true; } - private boolean updateTables(RawStore rawStore, List<String> dbNames) { - if (dbNames != null) { - List<Table> tables = new ArrayList<Table>(); - for (String dbName : dbNames) { - try { - List<String> tblNames = rawStore.getAllTables(dbName); - for (String tblName : tblNames) { - // If a preemption of this thread was requested, simply return before proceeding - if (Thread.interrupted()) { - return false; - } - Table table = rawStore.getTable(dbName, tblName); - tables.add(table); - } - // Update the cached database objects - SharedCache.refreshTables(dbName, tables); - for (String tblName : tblNames) { - // If a preemption of this thread was requested, simply return before proceeding - if (Thread.interrupted()) { - return false; - } - List<Partition> partitions = - rawStore.getPartitions(dbName, tblName, Integer.MAX_VALUE); - SharedCache.refreshPartitions(dbName, tblName, partitions); - } - } catch (MetaException | NoSuchObjectException e) { - LOG.error("Updating CachedStore: unable to read table", e); - return false; + // Update the cached partition objects for a table + private void updateTablePartitions(RawStore rawStore, String dbName, String tblName) { + try { + Deadline.startTimer("getPartitions"); + List<Partition> partitions = rawStore.getPartitions(dbName, tblName, Integer.MAX_VALUE); + Deadline.stopTimer(); + if (partitionCacheLock.writeLock().tryLock()) { + // Skip background updates if we detect change + if (isPartitionCacheDirty.compareAndSet(true, false)) { + LOG.debug("Skipping partition cache update; the partition list we have is dirty."); + return; } + SharedCache.refreshPartitions(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tblName), partitions); + } + } catch (MetaException | NoSuchObjectException e) { + LOG.info("Updating CachedStore: unable to read partitions of table: " + tblName, e); + } finally { + if (partitionCacheLock.isWriteLockedByCurrentThread()) { + partitionCacheLock.writeLock().unlock(); } } - return true; } - } - // Interrupt the cache update background thread - // Fire and forget (the master will respond appropriately when it gets a chance) - // All writes to the cache go through synchronized methods, so fire & forget is fine. 
- private void interruptCacheUpdateMaster() { - if (runningMasterThread.get() != null) { - runningMasterThread.get().interrupt(); + // Update the cached col stats for this table + private void updateTableColStats(RawStore rawStore, String dbName, String tblName) { + try { + Table table = rawStore.getTable(dbName, tblName); + List<String> colNames = MetaStoreUtils.getColumnNamesForTable(table); + Deadline.startTimer("getTableColumnStatistics"); + ColumnStatistics tableColStats = + rawStore.getTableColumnStatistics(dbName, tblName, colNames); + Deadline.stopTimer(); + if (tableColStatsCacheLock.writeLock().tryLock()) { + // Skip background updates if we detect change + if (isTableColStatsCacheDirty.compareAndSet(true, false)) { + LOG.debug("Skipping table column stats cache update; the table column stats list we " + + "have is dirty."); + return; + } + SharedCache.refreshTableColStats(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tblName), tableColStats.getStatsObj()); + } + } catch (MetaException | NoSuchObjectException e) { + LOG.info("Updating CachedStore: unable to read table column stats of table: " + tblName, e); + } finally { + if (tableColStatsCacheLock.isWriteLockedByCurrentThread()) { + tableColStatsCacheLock.writeLock().unlock(); + } + } + } + + // Update the cached partition col stats for a table + private void updateTablePartitionColStats(RawStore rawStore, String dbName, String tblName) { + try { + Deadline.startTimer("getColStatsForTablePartitions"); + Map<String, List<ColumnStatisticsObj>> colStatsPerPartition = + rawStore.getColStatsForTablePartitions(dbName, tblName); + Deadline.stopTimer(); + if (partitionColStatsCacheLock.writeLock().tryLock()) { + // Skip background updates if we detect change + if (isPartitionColStatsCacheDirty.compareAndSet(true, false)) { + LOG.debug("Skipping partition column stats cache update; the partition column stats " + + "list we have is dirty."); + return; + } + SharedCache.refreshPartitionColStats(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tblName), colStatsPerPartition); + } + } catch (MetaException | NoSuchObjectException e) { + LOG.info("Updating CachedStore: unable to read partitions column stats of table: " + + tblName, e); + } finally { + if (partitionColStatsCacheLock.isWriteLockedByCurrentThread()) { + partitionColStatsCacheLock.writeLock().unlock(); + } + } } } @@ -374,11 +520,17 @@ public class CachedStore implements RawStore, Configurable { } @Override - public void createDatabase(Database db) - throws InvalidObjectException, MetaException { + public void createDatabase(Database db) throws InvalidObjectException, MetaException { rawStore.createDatabase(db); - interruptCacheUpdateMaster(); - SharedCache.addDatabaseToCache(HiveStringUtils.normalizeIdentifier(db.getName()), db.deepCopy()); + try { + // Wait if background cache update is happening + databaseCacheLock.readLock().lock(); + isDatabaseCacheDirty.set(true); + SharedCache.addDatabaseToCache(HiveStringUtils.normalizeIdentifier(db.getName()), + db.deepCopy()); + } finally { + databaseCacheLock.readLock().unlock(); + } } @Override @@ -387,26 +539,38 @@ public class CachedStore implements RawStore, Configurable { if (db == null) { throw new NoSuchObjectException(); } - return SharedCache.getDatabaseFromCache(HiveStringUtils.normalizeIdentifier(dbName)); + return db; } @Override public boolean dropDatabase(String dbname) throws NoSuchObjectException, MetaException { boolean succ = 
rawStore.dropDatabase(dbname); if (succ) { - interruptCacheUpdateMaster(); - SharedCache.removeDatabaseFromCache(HiveStringUtils.normalizeIdentifier(dbname)); + try { + // Wait if background cache update is happening + databaseCacheLock.readLock().lock(); + isDatabaseCacheDirty.set(true); + SharedCache.removeDatabaseFromCache(HiveStringUtils.normalizeIdentifier(dbname)); + } finally { + databaseCacheLock.readLock().unlock(); + } } return succ; } @Override - public boolean alterDatabase(String dbName, Database db) - throws NoSuchObjectException, MetaException { + public boolean alterDatabase(String dbName, Database db) throws NoSuchObjectException, + MetaException { boolean succ = rawStore.alterDatabase(dbName, db); if (succ) { - interruptCacheUpdateMaster(); - SharedCache.alterDatabaseInCache(HiveStringUtils.normalizeIdentifier(dbName), db); + try { + // Wait if background cache update is happening + databaseCacheLock.readLock().lock(); + isDatabaseCacheDirty.set(true); + SharedCache.alterDatabaseInCache(HiveStringUtils.normalizeIdentifier(dbName), db); + } finally { + databaseCacheLock.readLock().unlock(); + } } return succ; } @@ -462,24 +626,45 @@ public class CachedStore implements RawStore, Configurable { } @Override - public void createTable(Table tbl) - throws InvalidObjectException, MetaException { + public void createTable(Table tbl) throws InvalidObjectException, MetaException { rawStore.createTable(tbl); - interruptCacheUpdateMaster(); validateTableType(tbl); - SharedCache.addTableToCache(HiveStringUtils.normalizeIdentifier(tbl.getDbName()), - HiveStringUtils.normalizeIdentifier(tbl.getTableName()), tbl); + try { + // Wait if background cache update is happening + tableCacheLock.readLock().lock(); + isTableCacheDirty.set(true); + SharedCache.addTableToCache(HiveStringUtils.normalizeIdentifier(tbl.getDbName()), + HiveStringUtils.normalizeIdentifier(tbl.getTableName()), tbl); + } finally { + tableCacheLock.readLock().unlock(); + } } @Override - public boolean dropTable(String dbName, String tableName) - throws MetaException, NoSuchObjectException, InvalidObjectException, - InvalidInputException { + public boolean dropTable(String dbName, String tableName) throws MetaException, + NoSuchObjectException, InvalidObjectException, InvalidInputException { boolean succ = rawStore.dropTable(dbName, tableName); if (succ) { - interruptCacheUpdateMaster(); - SharedCache.removeTableFromCache(HiveStringUtils.normalizeIdentifier(dbName), - HiveStringUtils.normalizeIdentifier(tableName)); + // Remove table + try { + // Wait if background table cache update is happening + tableCacheLock.readLock().lock(); + isTableCacheDirty.set(true); + SharedCache.removeTableFromCache(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tableName)); + } finally { + tableCacheLock.readLock().unlock(); + } + // Remove table col stats + try { + // Wait if background table col stats cache update is happening + tableColStatsCacheLock.readLock().lock(); + isTableColStatsCacheDirty.set(true); + SharedCache.removeTableColStatsFromCache(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tableName)); + } finally { + tableColStatsCacheLock.readLock().unlock(); + } } return succ; } @@ -496,57 +681,74 @@ public class CachedStore implements RawStore, Configurable { } @Override - public boolean addPartition(Partition part) - throws InvalidObjectException, MetaException { + public boolean addPartition(Partition part) throws InvalidObjectException, MetaException { 
boolean succ = rawStore.addPartition(part); if (succ) { - interruptCacheUpdateMaster(); - SharedCache.addPartitionToCache(HiveStringUtils.normalizeIdentifier(part.getDbName()), - HiveStringUtils.normalizeIdentifier(part.getTableName()), part); + try { + // Wait if background cache update is happening + partitionCacheLock.readLock().lock(); + isPartitionCacheDirty.set(true); + SharedCache.addPartitionToCache(HiveStringUtils.normalizeIdentifier(part.getDbName()), + HiveStringUtils.normalizeIdentifier(part.getTableName()), part); + } finally { + partitionCacheLock.readLock().unlock(); + } } return succ; } @Override - public boolean addPartitions(String dbName, String tblName, - List<Partition> parts) throws InvalidObjectException, MetaException { + public boolean addPartitions(String dbName, String tblName, List<Partition> parts) + throws InvalidObjectException, MetaException { boolean succ = rawStore.addPartitions(dbName, tblName, parts); if (succ) { - interruptCacheUpdateMaster(); - for (Partition part : parts) { - SharedCache.addPartitionToCache(HiveStringUtils.normalizeIdentifier(part.getDbName()), - HiveStringUtils.normalizeIdentifier(part.getTableName()), part); + try { + // Wait if background cache update is happening + partitionCacheLock.readLock().lock(); + isPartitionCacheDirty.set(true); + for (Partition part : parts) { + SharedCache.addPartitionToCache(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tblName), part); + } + } finally { + partitionCacheLock.readLock().unlock(); } } return succ; } @Override - public boolean addPartitions(String dbName, String tblName, - PartitionSpecProxy partitionSpec, boolean ifNotExists) - throws InvalidObjectException, MetaException { + public boolean addPartitions(String dbName, String tblName, PartitionSpecProxy partitionSpec, + boolean ifNotExists) throws InvalidObjectException, MetaException { boolean succ = rawStore.addPartitions(dbName, tblName, partitionSpec, ifNotExists); if (succ) { - interruptCacheUpdateMaster(); - PartitionSpecProxy.PartitionIterator iterator = partitionSpec.getPartitionIterator(); - while (iterator.hasNext()) { - Partition part = iterator.next(); - SharedCache.addPartitionToCache(HiveStringUtils.normalizeIdentifier(dbName), - HiveStringUtils.normalizeIdentifier(tblName), part); + try { + // Wait if background cache update is happening + partitionCacheLock.readLock().lock(); + isPartitionCacheDirty.set(true); + PartitionSpecProxy.PartitionIterator iterator = partitionSpec.getPartitionIterator(); + while (iterator.hasNext()) { + Partition part = iterator.next(); + SharedCache.addPartitionToCache(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tblName), part); + } + } finally { + partitionCacheLock.readLock().unlock(); } } return succ; } @Override - public Partition getPartition(String dbName, String tableName, - List<String> part_vals) throws MetaException, NoSuchObjectException { - Partition part = SharedCache.getPartitionFromCache(HiveStringUtils.normalizeIdentifier(dbName), - HiveStringUtils.normalizeIdentifier(tableName), part_vals); + public Partition getPartition(String dbName, String tableName, List<String> part_vals) + throws MetaException, NoSuchObjectException { + Partition part = + SharedCache.getPartitionFromCache(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tableName), part_vals); if (part != null) { part.unsetPrivileges(); } else { - throw new NoSuchObjectException(); + throw new 
NoSuchObjectException("partition values=" + part_vals.toString()); } return part; } @@ -559,14 +761,30 @@ public class CachedStore implements RawStore, Configurable { } @Override - public boolean dropPartition(String dbName, String tableName, - List<String> part_vals) throws MetaException, NoSuchObjectException, - InvalidObjectException, InvalidInputException { + public boolean dropPartition(String dbName, String tableName, List<String> part_vals) + throws MetaException, NoSuchObjectException, InvalidObjectException, InvalidInputException { boolean succ = rawStore.dropPartition(dbName, tableName, part_vals); if (succ) { - interruptCacheUpdateMaster(); - SharedCache.removePartitionFromCache(HiveStringUtils.normalizeIdentifier(dbName), - HiveStringUtils.normalizeIdentifier(tableName), part_vals); + // Remove partition + try { + // Wait if background cache update is happening + partitionCacheLock.readLock().lock(); + isPartitionCacheDirty.set(true); + SharedCache.removePartitionFromCache(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tableName), part_vals); + } finally { + partitionCacheLock.readLock().unlock(); + } + // Remove partition col stats + try { + // Wait if background cache update is happening + partitionColStatsCacheLock.readLock().lock(); + isPartitionColStatsCacheDirty.set(true); + SharedCache.removePartitionColStatsFromCache(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tableName), part_vals); + } finally { + partitionColStatsCacheLock.readLock().unlock(); + } } return succ; } @@ -588,10 +806,28 @@ public class CachedStore implements RawStore, Configurable { public void alterTable(String dbName, String tblName, Table newTable) throws InvalidObjectException, MetaException { rawStore.alterTable(dbName, tblName, newTable); - interruptCacheUpdateMaster(); validateTableType(newTable); - SharedCache.alterTableInCache(HiveStringUtils.normalizeIdentifier(dbName), - HiveStringUtils.normalizeIdentifier(tblName), newTable); + // Update table cache + try { + // Wait if background cache update is happening + tableCacheLock.readLock().lock(); + isTableCacheDirty.set(true); + SharedCache.alterTableInCache(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tblName), newTable); + } finally { + tableCacheLock.readLock().unlock(); + } + // Update partition cache (key might have changed since table name is a + // component of key) + try { + // Wait if background cache update is happening + partitionCacheLock.readLock().lock(); + isPartitionCacheDirty.set(true); + SharedCache.alterTableInPartitionCache(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tblName), newTable); + } finally { + partitionCacheLock.readLock().unlock(); + } } @Override @@ -685,26 +921,62 @@ public class CachedStore implements RawStore, Configurable { } @Override - public void alterPartition(String dbName, String tblName, - List<String> partVals, Partition newPart) + public void alterPartition(String dbName, String tblName, List<String> partVals, Partition newPart) throws InvalidObjectException, MetaException { rawStore.alterPartition(dbName, tblName, partVals, newPart); - interruptCacheUpdateMaster(); - SharedCache.alterPartitionInCache(HiveStringUtils.normalizeIdentifier(dbName), - HiveStringUtils.normalizeIdentifier(tblName), partVals, newPart); + // Update partition cache + try { + // Wait if background cache update is happening + partitionCacheLock.readLock().lock(); + 
isPartitionCacheDirty.set(true); + SharedCache.alterPartitionInCache(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tblName), partVals, newPart); + } finally { + partitionCacheLock.readLock().unlock(); + } + // Update partition column stats cache + try { + // Wait if background cache update is happening + partitionColStatsCacheLock.readLock().lock(); + isPartitionColStatsCacheDirty.set(true); + SharedCache.alterPartitionInColStatsCache(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tblName), partVals, newPart); + } finally { + partitionColStatsCacheLock.readLock().unlock(); + } } @Override - public void alterPartitions(String dbName, String tblName, - List<List<String>> partValsList, List<Partition> newParts) - throws InvalidObjectException, MetaException { + public void alterPartitions(String dbName, String tblName, List<List<String>> partValsList, + List<Partition> newParts) throws InvalidObjectException, MetaException { rawStore.alterPartitions(dbName, tblName, partValsList, newParts); - interruptCacheUpdateMaster(); - for (int i=0;i<partValsList.size();i++) { - List<String> partVals = partValsList.get(i); - Partition newPart = newParts.get(i); - SharedCache.alterPartitionInCache(HiveStringUtils.normalizeIdentifier(dbName), - HiveStringUtils.normalizeIdentifier(tblName), partVals, newPart); + // Update partition cache + try { + // Wait if background cache update is happening + partitionCacheLock.readLock().lock(); + isPartitionCacheDirty.set(true); + for (int i = 0; i < partValsList.size(); i++) { + List<String> partVals = partValsList.get(i); + Partition newPart = newParts.get(i); + SharedCache.alterPartitionInCache(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tblName), partVals, newPart); + } + } finally { + partitionCacheLock.readLock().unlock(); + } + // Update partition column stats cache + try { + // Wait if background cache update is happening + partitionColStatsCacheLock.readLock().lock(); + isPartitionColStatsCacheDirty.set(true); + for (int i = 0; i < partValsList.size(); i++) { + List<String> partVals = partValsList.get(i); + Partition newPart = newParts.get(i); + SharedCache.alterPartitionInColStatsCache(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tblName), partVals, newPart); + } + } finally { + partitionColStatsCacheLock.readLock().unlock(); } } @@ -1095,55 +1367,199 @@ public class CachedStore implements RawStore, Configurable { @Override public boolean updateTableColumnStatistics(ColumnStatistics colStats) - throws NoSuchObjectException, MetaException, InvalidObjectException, - InvalidInputException { + throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException { boolean succ = rawStore.updateTableColumnStatistics(colStats); if (succ) { - SharedCache.updateTableColumnStatistics(HiveStringUtils.normalizeIdentifier(colStats.getStatsDesc().getDbName()), - HiveStringUtils.normalizeIdentifier(colStats.getStatsDesc().getTableName()), colStats.getStatsObj()); + String dbName = colStats.getStatsDesc().getDbName(); + String tableName = colStats.getStatsDesc().getTableName(); + List<ColumnStatisticsObj> statsObjs = colStats.getStatsObj(); + Table tbl = getTable(dbName, tableName); + List<String> colNames = new ArrayList<>(); + for (ColumnStatisticsObj statsObj : statsObjs) { + colNames.add(statsObj.getColName()); + } + StatsSetupConst.setColumnStatsState(tbl.getParameters(), colNames); + + // Update 
@@ -1095,55 +1367,199 @@ public class CachedStore implements RawStore, Configurable {
 
   @Override
   public boolean updateTableColumnStatistics(ColumnStatistics colStats)
-      throws NoSuchObjectException, MetaException, InvalidObjectException,
-      InvalidInputException {
+      throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException {
     boolean succ = rawStore.updateTableColumnStatistics(colStats);
     if (succ) {
-      SharedCache.updateTableColumnStatistics(HiveStringUtils.normalizeIdentifier(colStats.getStatsDesc().getDbName()),
-          HiveStringUtils.normalizeIdentifier(colStats.getStatsDesc().getTableName()), colStats.getStatsObj());
+      String dbName = colStats.getStatsDesc().getDbName();
+      String tableName = colStats.getStatsDesc().getTableName();
+      List<ColumnStatisticsObj> statsObjs = colStats.getStatsObj();
+      Table tbl = getTable(dbName, tableName);
+      List<String> colNames = new ArrayList<>();
+      for (ColumnStatisticsObj statsObj : statsObjs) {
+        colNames.add(statsObj.getColName());
+      }
+      StatsSetupConst.setColumnStatsState(tbl.getParameters(), colNames);
+
+      // Update table
+      try {
+        // Wait if background cache update is happening
+        tableCacheLock.readLock().lock();
+        isTableCacheDirty.set(true);
+        SharedCache.alterTableInCache(HiveStringUtils.normalizeIdentifier(dbName),
+            HiveStringUtils.normalizeIdentifier(tableName), tbl);
+      } finally {
+        tableCacheLock.readLock().unlock();
+      }
+
+      // Update table col stats
+      try {
+        // Wait if background cache update is happening
+        tableColStatsCacheLock.readLock().lock();
+        isTableColStatsCacheDirty.set(true);
+        SharedCache.updateTableColStatsInCache(HiveStringUtils.normalizeIdentifier(dbName),
+            HiveStringUtils.normalizeIdentifier(tableName), statsObjs);
+      } finally {
+        tableColStatsCacheLock.readLock().unlock();
+      }
     }
     return succ;
   }
 
   @Override
-  public boolean updatePartitionColumnStatistics(ColumnStatistics colStats,
-      List<String> partVals) throws NoSuchObjectException, MetaException,
-      InvalidObjectException, InvalidInputException {
-    boolean succ = rawStore.updatePartitionColumnStatistics(colStats, partVals);
+  public ColumnStatistics getTableColumnStatistics(String dbName, String tableName,
+      List<String> colNames) throws MetaException, NoSuchObjectException {
+    ColumnStatisticsDesc csd = new ColumnStatisticsDesc(true, dbName, tableName);
+    List<ColumnStatisticsObj> colStatObjs = new ArrayList<ColumnStatisticsObj>();
+    for (String colName : colNames) {
+      String colStatsCacheKey =
+          CacheUtils.buildKey(HiveStringUtils.normalizeIdentifier(dbName),
+              HiveStringUtils.normalizeIdentifier(tableName), colName);
+      ColumnStatisticsObj colStat = SharedCache.getCachedTableColStats(colStatsCacheKey);
+      if (colStat != null) {
+        colStatObjs.add(colStat);
+      }
+    }
+    if (colStatObjs.isEmpty()) {
+      return null;
+    } else {
+      return new ColumnStatistics(csd, colStatObjs);
+    }
+  }
+
+  @Override
+  public boolean deleteTableColumnStatistics(String dbName, String tableName, String colName)
+      throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException {
+    boolean succ = rawStore.deleteTableColumnStatistics(dbName, tableName, colName);
     if (succ) {
-      SharedCache.updatePartitionColumnStatistics(HiveStringUtils.normalizeIdentifier(colStats.getStatsDesc().getDbName()),
-          HiveStringUtils.normalizeIdentifier(colStats.getStatsDesc().getTableName()), partVals, colStats.getStatsObj());
+      try {
+        // Wait if background cache update is happening
+        tableColStatsCacheLock.readLock().lock();
+        isTableColStatsCacheDirty.set(true);
+        SharedCache.removeTableColStatsFromCache(HiveStringUtils.normalizeIdentifier(dbName),
+            HiveStringUtils.normalizeIdentifier(tableName), colName);
+      } finally {
+        tableColStatsCacheLock.readLock().unlock();
+      }
     }
     return succ;
   }
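getTableColumnStatistics now answers from the SharedCache, one entry per (db, table, column): columns with no cached entry are silently skipped, and null is returned only when nothing at all was found. A rough sketch of that lookup shape follows; the "#" separator is an assumption made for illustration only, since the real key layout is CacheUtils.buildKey's concern.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the per-column, key-per-(db, table, column) lookup shape used
// by the new getTableColumnStatistics. S stands in for the stats type.
public class ColStatsLookup<S> {
  private final Map<String, S> tableColStatsCache = new ConcurrentHashMap<>();

  public List<S> lookup(String dbName, String tableName, List<String> colNames) {
    List<S> found = new ArrayList<>();
    for (String colName : colNames) {
      // Illustrative key shape only; not the actual CacheUtils format.
      S stats = tableColStatsCache.get(dbName + "#" + tableName + "#" + colName);
      if (stats != null) {
        found.add(stats);  // a missing column is skipped, not an error
      }
    }
    return found.isEmpty() ? null : found;  // null when nothing was cached
  }
}

Contrast this with get_aggr_stats_for further down, where a single cache miss aborts the cached path entirely and the whole request falls through to the RawStore.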
 
   @Override
-  public ColumnStatistics getTableColumnStatistics(String dbName,
-      String tableName, List<String> colName)
-      throws MetaException, NoSuchObjectException {
-    return rawStore.getTableColumnStatistics(dbName, tableName, colName);
+  public boolean updatePartitionColumnStatistics(ColumnStatistics colStats, List<String> partVals)
+      throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException {
+    boolean succ = rawStore.updatePartitionColumnStatistics(colStats, partVals);
+    if (succ) {
+      String dbName = colStats.getStatsDesc().getDbName();
+      String tableName = colStats.getStatsDesc().getTableName();
+      List<ColumnStatisticsObj> statsObjs = colStats.getStatsObj();
+      Partition part = getPartition(dbName, tableName, partVals);
+      List<String> colNames = new ArrayList<>();
+      for (ColumnStatisticsObj statsObj : statsObjs) {
+        colNames.add(statsObj.getColName());
+      }
+      StatsSetupConst.setColumnStatsState(part.getParameters(), colNames);
+
+      // Update partition
+      try {
+        // Wait if background cache update is happening
+        partitionCacheLock.readLock().lock();
+        isPartitionCacheDirty.set(true);
+        SharedCache.alterPartitionInCache(HiveStringUtils.normalizeIdentifier(dbName),
+            HiveStringUtils.normalizeIdentifier(tableName), partVals, part);
+      } finally {
+        partitionCacheLock.readLock().unlock();
+      }
+
+      // Update partition column stats
+      try {
+        // Wait if background cache update is happening
+        partitionColStatsCacheLock.readLock().lock();
+        isPartitionColStatsCacheDirty.set(true);
+        SharedCache.updatePartitionColStatsInCache(
+            HiveStringUtils.normalizeIdentifier(colStats.getStatsDesc().getDbName()),
+            HiveStringUtils.normalizeIdentifier(colStats.getStatsDesc().getTableName()), partVals,
+            colStats.getStatsObj());
+      } finally {
+        partitionColStatsCacheLock.readLock().unlock();
+      }
+    }
+    return succ;
   }
 
   @Override
-  public List<ColumnStatistics> getPartitionColumnStatistics(String dbName,
-      String tblName, List<String> partNames, List<String> colNames)
-      throws MetaException, NoSuchObjectException {
+  // TODO: calculate from cached values.
+  // Need to see if it makes sense to do this, as some col stats may be out of date or missing from the cache.
+  public List<ColumnStatistics> getPartitionColumnStatistics(String dbName, String tblName,
+      List<String> partNames, List<String> colNames) throws MetaException, NoSuchObjectException {
     return rawStore.getPartitionColumnStatistics(dbName, tblName, partNames, colNames);
   }
 
   @Override
-  public boolean deletePartitionColumnStatistics(String dbName,
-      String tableName, String partName, List<String> partVals, String colName)
-      throws NoSuchObjectException, MetaException, InvalidObjectException,
-      InvalidInputException {
-    return rawStore.deletePartitionColumnStatistics(dbName, tableName, partName, partVals, colName);
+  public boolean deletePartitionColumnStatistics(String dbName, String tableName, String partName,
+      List<String> partVals, String colName) throws NoSuchObjectException, MetaException,
+      InvalidObjectException, InvalidInputException {
+    boolean succ =
+        rawStore.deletePartitionColumnStatistics(dbName, tableName, partName, partVals, colName);
+    if (succ) {
+      try {
+        // Wait if background cache update is happening
+        partitionColStatsCacheLock.readLock().lock();
+        isPartitionColStatsCacheDirty.set(true);
+        SharedCache.removePartitionColStatsFromCache(HiveStringUtils.normalizeIdentifier(dbName),
+            HiveStringUtils.normalizeIdentifier(tableName), partVals, colName);
+      } finally {
+        partitionColStatsCacheLock.readLock().unlock();
+      }
+    }
+    return succ;
   }
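Every mutator in this patch follows the same write-through discipline visible above: commit to the authoritative RawStore first, and touch the cache only if that commit succeeded, so the cache may briefly lag the metastore but can never get ahead of it. The contract, condensed to a sketch:

import java.util.concurrent.Callable;

// Sketch of the write-through discipline: the backing store is the source
// of truth, and the cache edit runs only after a successful commit.
public final class WriteThrough {
  private WriteThrough() {}

  public static boolean apply(Callable<Boolean> rawStoreWrite, Runnable cacheEdit)
      throws Exception {
    boolean succ = rawStoreWrite.call();  // authoritative store first
    if (succ) {
      cacheEdit.run();                    // cache follows only on success
    }
    return succ;
  }
}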
 
   @Override
-  public boolean deleteTableColumnStatistics(String dbName, String tableName,
-      String colName) throws NoSuchObjectException, MetaException,
-      InvalidObjectException, InvalidInputException {
-    return rawStore.deleteTableColumnStatistics(dbName, tableName, colName);
+  public AggrStats get_aggr_stats_for(String dbName, String tblName, List<String> partNames,
+      List<String> colNames) throws MetaException, NoSuchObjectException {
+    List<ColumnStatisticsObj> colStats = new ArrayList<ColumnStatisticsObj>(colNames.size());
+    for (String colName : colNames) {
+      ColumnStatisticsObj colStat =
+          mergeColStatsForPartitions(HiveStringUtils.normalizeIdentifier(dbName),
+              HiveStringUtils.normalizeIdentifier(tblName), partNames, colName);
+      if (colStat == null) {
+        // Stop and fall back to the underlying RawStore
+        colStats = null;
+        break;
+      } else {
+        colStats.add(colStat);
+      }
+    }
+    if (colStats == null) {
+      return rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames);
+    } else {
+      return new AggrStats(colStats, partNames.size());
+    }
+  }
+
+  private ColumnStatisticsObj mergeColStatsForPartitions(String dbName, String tblName,
+      List<String> partNames, String colName) throws MetaException {
+    ColumnStatisticsObj colStats = null;
+    for (String partName : partNames) {
+      String colStatsCacheKey =
+          CacheUtils.buildKey(dbName, tblName, partNameToVals(partName), colName);
+      ColumnStatisticsObj colStatsForPart =
+          SharedCache.getCachedPartitionColStats(colStatsCacheKey);
+      if (colStatsForPart == null) {
+        // We don't have stats for all the partitions;
+        // logic for extrapolation hasn't been added to CachedStore yet.
+        // So stop now and fall back to the underlying RawStore.
+        return null;
+      }
+      if (colStats == null) {
+        colStats = colStatsForPart;
+      } else {
+        ColumnStatsMerger merger =
+            ColumnStatsMergerFactory.getColumnStatsMerger(colStats, colStatsForPart);
+        merger.merge(colStats, colStatsForPart);
+      }
+    }
+    return colStats;
   }
 
   @Override
@@ -1209,14 +1625,34 @@ public class CachedStore implements RawStore, Configurable {
   }
 
   @Override
-  public void dropPartitions(String dbName, String tblName,
-      List<String> partNames) throws MetaException, NoSuchObjectException {
+  public void dropPartitions(String dbName, String tblName, List<String> partNames)
+      throws MetaException, NoSuchObjectException {
     rawStore.dropPartitions(dbName, tblName, partNames);
-    interruptCacheUpdateMaster();
-    for (String partName : partNames) {
-      List<String> vals = partNameToVals(partName);
-      SharedCache.removePartitionFromCache(HiveStringUtils.normalizeIdentifier(dbName),
-          HiveStringUtils.normalizeIdentifier(tblName), vals);
+    // Remove partitions
+    try {
+      // Wait if background cache update is happening
+      partitionCacheLock.readLock().lock();
+      isPartitionCacheDirty.set(true);
+      for (String partName : partNames) {
+        List<String> vals = partNameToVals(partName);
+        SharedCache.removePartitionFromCache(HiveStringUtils.normalizeIdentifier(dbName),
+            HiveStringUtils.normalizeIdentifier(tblName), vals);
+      }
+    } finally {
+      partitionCacheLock.readLock().unlock();
+    }
+    // Remove partition col stats
+    try {
+      // Wait if background cache update is happening
+      partitionColStatsCacheLock.readLock().lock();
+      isPartitionColStatsCacheDirty.set(true);
+      for (String partName : partNames) {
+        List<String> part_vals = partNameToVals(partName);
+        SharedCache.removePartitionColStatsFromCache(HiveStringUtils.normalizeIdentifier(dbName),
+            HiveStringUtils.normalizeIdentifier(tblName), part_vals);
+      }
+    } finally {
+      partitionColStatsCacheLock.readLock().unlock();
    }
   }
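The new get_aggr_stats_for is deliberately all-or-nothing per column: if even one partition has no cached stats for the column, the partially merged result is discarded and the whole request falls through to the RawStore, because extrapolating over missing partitions is not implemented in CachedStore. The merge-or-bail loop, reduced to its essentials (names here are illustrative):

import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Sketch of the all-or-nothing aggregation in mergeColStatsForPartitions:
// fold a per-partition value across every partition, but return null (the
// "fall back to the RawStore" signal) the moment any partition is missing.
public final class MergeOrFallback {
  private MergeOrFallback() {}

  public static <T> T mergeAcross(Map<String, T> cache, List<String> partKeys,
      BinaryOperator<T> merge) {
    T merged = null;
    for (String key : partKeys) {
      T part = cache.get(key);
      if (part == null) {
        return null;  // caller falls back to the authoritative store
      }
      merged = (merged == null) ? part : merge.apply(merged, part);
    }
    return merged;
  }
}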
@@ -1326,130 +1762,6 @@ public class CachedStore implements RawStore, Configurable {
   }
 
   @Override
-  public AggrStats get_aggr_stats_for(String dbName, String tblName,
-      List<String> partNames, List<String> colNames)
-      throws MetaException, NoSuchObjectException {
-    List<ColumnStatisticsObj> colStats = new ArrayList<ColumnStatisticsObj>(colNames.size());
-    for (String colName : colNames) {
-      colStats.add(mergeColStatsForPartitions(HiveStringUtils.normalizeIdentifier(dbName),
-          HiveStringUtils.normalizeIdentifier(tblName), partNames, colName));
-    }
-    // TODO: revisit the partitions not found case for extrapolation
-    return new AggrStats(colStats, partNames.size());
-  }
-
-  private ColumnStatisticsObj mergeColStatsForPartitions(String dbName, String tblName,
-      List<String> partNames, String colName) throws MetaException {
-    ColumnStatisticsObj colStats = null;
-    for (String partName : partNames) {
-      String colStatsCacheKey = CacheUtils.buildKey(dbName, tblName, partNameToVals(partName), colName);
-      ColumnStatisticsObj colStatsForPart = SharedCache.getCachedPartitionColStats(
-          colStatsCacheKey);
-      if (colStats == null) {
-        colStats = colStatsForPart;
-      } else {
-        colStats = mergeColStatsObj(colStats, colStatsForPart);
-      }
-    }
-    return colStats;
-  }
-
-  private ColumnStatisticsObj mergeColStatsObj(ColumnStatisticsObj colStats1,
-      ColumnStatisticsObj colStats2) throws MetaException {
-    if ((!colStats1.getColType().equalsIgnoreCase(colStats2.getColType()))
-        && (!colStats1.getColName().equalsIgnoreCase(colStats2.getColName()))) {
-      throw new MetaException("Can't merge column stats for two partitions for different columns.");
-    }
-    ColumnStatisticsData csd = new ColumnStatisticsData();
-    ColumnStatisticsObj cso = new ColumnStatisticsObj(colStats1.getColName(),
-        colStats1.getColType(), csd);
-    ColumnStatisticsData csData1 = colStats1.getStatsData();
-    ColumnStatisticsData csData2 = colStats2.getStatsData();
-    String colType = colStats1.getColType().toLowerCase();
-    if (colType.equals("boolean")) {
-      BooleanColumnStatsData boolStats = new BooleanColumnStatsData();
-      boolStats.setNumFalses(csData1.getBooleanStats().getNumFalses()
-          + csData2.getBooleanStats().getNumFalses());
-      boolStats.setNumTrues(csData1.getBooleanStats().getNumTrues()
-          + csData2.getBooleanStats().getNumTrues());
-      boolStats.setNumNulls(csData1.getBooleanStats().getNumNulls()
-          + csData2.getBooleanStats().getNumNulls());
-      csd.setBooleanStats(boolStats);
-    } else if (colType.equals("string") || colType.startsWith("varchar")
-        || colType.startsWith("char")) {
-      StringColumnStatsData stringStats = new StringColumnStatsData();
-      stringStats.setNumNulls(csData1.getStringStats().getNumNulls()
-          + csData2.getStringStats().getNumNulls());
-      stringStats.setAvgColLen(Math.max(csData1.getStringStats().getAvgColLen(), csData2
-          .getStringStats().getAvgColLen()));
-      stringStats.setMaxColLen(Math.max(csData1.getStringStats().getMaxColLen(), csData2
-          .getStringStats().getMaxColLen()));
-      stringStats.setNumDVs(Math.max(csData1.getStringStats().getNumDVs(), csData2.getStringStats()
-          .getNumDVs()));
-      csd.setStringStats(stringStats);
-    } else if (colType.equals("binary")) {
-      BinaryColumnStatsData binaryStats = new BinaryColumnStatsData();
-      binaryStats.setNumNulls(csData1.getBinaryStats().getNumNulls()
-          + csData2.getBinaryStats().getNumNulls());
-      binaryStats.setAvgColLen(Math.max(csData1.getBinaryStats().getAvgColLen(), csData2
-          .getBinaryStats().getAvgColLen()));
-      binaryStats.setMaxColLen(Math.max(csData1.getBinaryStats().getMaxColLen(), csData2
-          .getBinaryStats().getMaxColLen()));
-      csd.setBinaryStats(binaryStats);
-    } else if (colType.equals("bigint") || colType.equals("int") || colType.equals("smallint")
-        || colType.equals("tinyint") || colType.equals("timestamp")) {
-      LongColumnStatsData longStats = new LongColumnStatsData();
-      longStats.setNumNulls(csData1.getLongStats().getNumNulls()
-          + csData2.getLongStats().getNumNulls());
-      longStats.setHighValue(Math.max(csData1.getLongStats().getHighValue(), csData2.getLongStats()
-          .getHighValue()));
-      longStats.setLowValue(Math.min(csData1.getLongStats().getLowValue(), csData2.getLongStats()
-          .getLowValue()));
-      longStats.setNumDVs(Math.max(csData1.getLongStats().getNumDVs(), csData2.getLongStats()
-          .getNumDVs()));
-      csd.setLongStats(longStats);
-    } else if (colType.equals("date")) {
-      DateColumnStatsData dateStats = new DateColumnStatsData();
-      dateStats.setNumNulls(csData1.getDateStats().getNumNulls()
-          + csData2.getDateStats().getNumNulls());
-      dateStats.setHighValue(new Date(Math.max(csData1.getDateStats().getHighValue()
-          .getDaysSinceEpoch(), csData2.getDateStats().getHighValue().getDaysSinceEpoch())));
-      dateStats.setHighValue(new Date(Math.min(csData1.getDateStats().getLowValue()
-          .getDaysSinceEpoch(), csData2.getDateStats().getLowValue().getDaysSinceEpoch())));
-      dateStats.setNumDVs(Math.max(csData1.getDateStats().getNumDVs(), csData2.getDateStats()
-          .getNumDVs()));
-      csd.setDateStats(dateStats);
-    } else if (colType.equals("double") || colType.equals("float")) {
-      DoubleColumnStatsData doubleStats = new DoubleColumnStatsData();
-      doubleStats.setNumNulls(csData1.getDoubleStats().getNumNulls()
-          + csData2.getDoubleStats().getNumNulls());
-      doubleStats.setHighValue(Math.max(csData1.getDoubleStats().getHighValue(), csData2
-          .getDoubleStats().getHighValue()));
-      doubleStats.setLowValue(Math.min(csData1.getDoubleStats().getLowValue(), csData2
-          .getDoubleStats().getLowValue()));
-      doubleStats.setNumDVs(Math.max(csData1.getDoubleStats().getNumDVs(), csData2.getDoubleStats()
-          .getNumDVs()));
-      csd.setDoubleStats(doubleStats);
-    } else if (colType.startsWith("decimal")) {
-      DecimalColumnStatsData decimalStats = new DecimalColumnStatsData();
-      decimalStats.setNumNulls(csData1.getDecimalStats().getNumNulls()
-          + csData2.getDecimalStats().getNumNulls());
-      Decimal high = (csData1.getDecimalStats().getHighValue()
-          .compareTo(csData2.getDecimalStats().getHighValue()) > 0) ? csData1.getDecimalStats()
-          .getHighValue() : csData2.getDecimalStats().getHighValue();
-      decimalStats.setHighValue(high);
-      Decimal low = (csData1.getDecimalStats().getLowValue()
-          .compareTo(csData2.getDecimalStats().getLowValue()) < 0) ? csData1.getDecimalStats()
-          .getLowValue() : csData2.getDecimalStats().getLowValue();
-      decimalStats.setLowValue(low);
-      decimalStats.setNumDVs(Math.max(csData1.getDecimalStats().getNumDVs(), csData2
-          .getDecimalStats().getNumDVs()));
-      csd.setDecimalStats(decimalStats);
-    }
-    return cso;
-  }
-
-  @Override
   public NotificationEventResponse getNextNotification(
       NotificationEventRequest rqst) {
     return rawStore.getNextNotification(rqst);
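Two details of the block removed above are worth flagging. Its date branch called setHighValue() twice, so the merged low value was never set, and its sanity check joined the column-type and column-name comparisons with && where || was evidently intended, so it only threw when both differed. Routing all merging through ColumnStatsMergerFactory, as the new mergeColStatsForPartitions does, retires this duplicated per-type arithmetic. For reference, a corrected version of just the date arithmetic; this is a sketch using the same thrift accessors the removed code used, not the actual merger implementation:

import org.apache.hadoop.hive.metastore.api.Date;
import org.apache.hadoop.hive.metastore.api.DateColumnStatsData;

// Corrected date-stats merge arithmetic: high takes the max, low takes the
// min (the removed code set the high value twice), null counts add, and
// the distinct-value count keeps the larger estimate.
public final class DateStatsMergeSketch {
  private DateStatsMergeSketch() {}

  public static DateColumnStatsData merge(DateColumnStatsData a, DateColumnStatsData b) {
    DateColumnStatsData merged = new DateColumnStatsData();
    merged.setNumNulls(a.getNumNulls() + b.getNumNulls());
    merged.setHighValue(new Date(Math.max(
        a.getHighValue().getDaysSinceEpoch(), b.getHighValue().getDaysSinceEpoch())));
    merged.setLowValue(new Date(Math.min(
        a.getLowValue().getDaysSinceEpoch(), b.getLowValue().getDaysSinceEpoch())));
    merged.setNumDVs(Math.max(a.getNumDVs(), b.getNumDVs()));
    return merged;
  }
}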
@@ -1565,10 +1877,9 @@ public class CachedStore implements RawStore, Configurable {
   }
 
   @Override
-  public Map<String, ColumnStatisticsObj> getAggrColStatsForTablePartitions(
-      String dbName, String tableName)
-      throws MetaException, NoSuchObjectException {
-    return rawStore.getAggrColStatsForTablePartitions(dbName, tableName);
+  public Map<String, List<ColumnStatisticsObj>> getColStatsForTablePartitions(String dbName,
+      String tableName) throws MetaException, NoSuchObjectException {
+    return rawStore.getColStatsForTablePartitions(dbName, tableName);
   }
 
   public RawStore getRawStore() {