http://git-wip-us.apache.org/repos/asf/hive/blob/133d3c47/metastore/src/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java deleted file mode 100644 index e713de0..0000000 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java +++ /dev/null @@ -1,559 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
package org.apache.hadoop.hive.metastore.cache;

import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;

import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.StatObjectConverter;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.TableMeta;
import org.apache.hadoop.hive.metastore.cache.CachedStore.PartitionWrapper;
import org.apache.hadoop.hive.metastore.cache.CachedStore.StorageDescriptorWrapper;
import org.apache.hadoop.hive.metastore.cache.CachedStore.TableWrapper;
import org.apache.hive.common.util.HiveStringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;

/**
 * In-memory cache of metastore objects: databases, tables, partitions, table and partition
 * column statistics, and reference-counted storage descriptors shared between tables and
 * partitions.
 *
 * <p>Entries are keyed by strings produced by {@link CacheUtils}. Objects are deep-copied on the
 * way in and on the way out so that callers never share mutable state with the cache. Every
 * public method synchronizes on this instance.
 */
public class SharedCache {
  private final Map<String, Database> databaseCache = new TreeMap<String, Database>();
  private final Map<String, TableWrapper> tableCache = new TreeMap<String, TableWrapper>();
  private final Map<String, PartitionWrapper> partitionCache = new TreeMap<>();
  private final Map<String, ColumnStatisticsObj> partitionColStatsCache = new TreeMap<>();
  private final Map<String, ColumnStatisticsObj> tableColStatsCache = new TreeMap<>();
  // Storage descriptors keyed by their hash; each wrapper carries a reference count.
  private final Map<ByteArrayWrapper, StorageDescriptorWrapper> sdCache = new HashMap<>();
  // NOTE(review): this digest is static, so it is shared by every SharedCache instance while the
  // locking is per instance - confirm only one instance hashes at a time.
  private static MessageDigest md;

  private static final Logger LOG = LoggerFactory.getLogger(SharedCache.class);

  static {
    try {
      md = MessageDigest.getInstance("MD5");
    } catch (NoSuchAlgorithmException e) {
      throw new RuntimeException("should not happen", e);
    }
  }

  /**
   * Returns a deep copy of the cached database stored under {@code name}, or {@code null} when
   * nothing is cached under that key.
   */
  public synchronized Database getDatabaseFromCache(String name) {
    Database cached = databaseCache.get(name);
    return cached != null ? cached.deepCopy() : null;
  }

  /**
   * Caches a deep copy of {@code db} under the key {@code dbName}; the copy's name is set to the
   * normalized form of {@code dbName}.
   */
  public synchronized void addDatabaseToCache(String dbName, Database db) {
    Database dbCopy = db.deepCopy();
    dbCopy.setName(HiveStringUtils.normalizeIdentifier(dbName));
    databaseCache.put(dbName, dbCopy);
  }

  /** Removes the database stored under {@code dbName}, if any. */
  public synchronized void removeDatabaseFromCache(String dbName) {
    databaseCache.remove(dbName);
  }

  /** Returns a snapshot of the keys of all cached databases. */
  public synchronized List<String> listCachedDatabases() {
    return new ArrayList<String>(databaseCache.keySet());
  }

  /**
   * Replaces the entry for {@code dbName} with {@code newDb}, keyed by the normalized name of
   * {@code newDb}.
   */
  public synchronized void alterDatabaseInCache(String dbName, Database newDb) {
    removeDatabaseFromCache(HiveStringUtils.normalizeIdentifier(dbName));
    // addDatabaseToCache takes its own deep copy, so no extra copy is needed here.
    addDatabaseToCache(HiveStringUtils.normalizeIdentifier(newDb.getName()), newDb);
  }

  /** Returns the number of cached databases. */
  public synchronized int getCachedDatabaseCount() {
    return databaseCache.size();
  }

  /**
   * Returns the cached table assembled through {@link CacheUtils#assemble}, or {@code null} when
   * the table is not cached.
   */
  public synchronized Table getTableFromCache(String dbName, String tableName) {
    TableWrapper tblWrapper = tableCache.get(CacheUtils.buildKey(dbName, tableName));
    if (tblWrapper == null) {
      return null;
    }
    return CacheUtils.assemble(tblWrapper, this);
  }

  /**
   * Caches a deep copy of {@code tbl}. Database, table and partition-key names of the copy are
   * normalized; the storage descriptor, if present, is moved into the shared descriptor cache
   * and referenced from the wrapper by hash.
   */
  public synchronized void addTableToCache(String dbName, String tblName, Table tbl) {
    Table tblCopy = tbl.deepCopy();
    tblCopy.setDbName(HiveStringUtils.normalizeIdentifier(dbName));
    tblCopy.setTableName(HiveStringUtils.normalizeIdentifier(tblName));
    if (tblCopy.getPartitionKeys() != null) {
      for (FieldSchema fs : tblCopy.getPartitionKeys()) {
        fs.setName(HiveStringUtils.normalizeIdentifier(fs.getName()));
      }
    }
    TableWrapper wrapper;
    // Work on the copy's descriptor so the wrapper never aliases the caller's parameter map.
    StorageDescriptor sd = tblCopy.getSd();
    if (sd != null) {
      byte[] sdHash = MetaStoreUtils.hashStorageDescriptor(sd, md);
      increSd(sd, sdHash);
      tblCopy.setSd(null);
      wrapper = new TableWrapper(tblCopy, sdHash, sd.getLocation(), sd.getParameters());
    } else {
      wrapper = new TableWrapper(tblCopy, null, null, null);
    }
    tableCache.put(CacheUtils.buildKey(dbName, tblName), wrapper);
  }

  /**
   * Removes a table and releases its reference on the shared storage descriptor. Does nothing
   * when the table is not cached.
   */
  public synchronized void removeTableFromCache(String dbName, String tblName) {
    TableWrapper tblWrapper = tableCache.remove(CacheUtils.buildKey(dbName, tblName));
    if (tblWrapper == null) {
      return;
    }
    byte[] sdHash = tblWrapper.getSdHash();
    if (sdHash != null) {
      decrSd(sdHash);
    }
  }

  /** Returns a deep copy of the table column stats under the given key, or {@code null}. */
  public synchronized ColumnStatisticsObj getCachedTableColStats(String colStatsCacheKey) {
    ColumnStatisticsObj cached = tableColStatsCache.get(colStatsCacheKey);
    return cached != null ? cached.deepCopy() : null;
  }

  /** Removes the cached column stats of every column of the given table. */
  public synchronized void removeTableColStatsFromCache(String dbName, String tblName) {
    removeEntriesWithPrefix(tableColStatsCache, CacheUtils.buildKeyWithDelimit(dbName, tblName));
  }

  /** Removes the cached column stats of one column of the given table. */
  public synchronized void removeTableColStatsFromCache(String dbName, String tblName,
      String colName) {
    tableColStatsCache.remove(CacheUtils.buildKey(dbName, tblName, colName));
  }

  /**
   * Merges {@code colStatsForTable} into the cache: existing entries are updated in place via
   * {@link StatObjectConverter#setFieldsIntoOldStats}, missing ones are added as deep copies.
   */
  public synchronized void updateTableColStatsInCache(String dbName, String tableName,
      List<ColumnStatisticsObj> colStatsForTable) {
    for (ColumnStatisticsObj colStatObj : colStatsForTable) {
      String key = CacheUtils.buildKey(dbName, tableName, colStatObj.getColName());
      ColumnStatisticsObj oldStatsObj = tableColStatsCache.get(key);
      if (oldStatsObj != null) {
        LOG.debug("CachedStore: updating table column stats for column: {}, of table: {} and "
            + "database: {}", colStatObj.getColName(), tableName, dbName);
        // Update existing stat object's fields
        StatObjectConverter.setFieldsIntoOldStats(oldStatsObj, colStatObj);
      } else {
        // No stats exist for this key; cache a private copy
        tableColStatsCache.put(key, colStatObj.deepCopy());
      }
    }
  }

  /** Replaces the cached table {@code dbName.tblName} with {@code newTable}. */
  public synchronized void alterTableInCache(String dbName, String tblName, Table newTable) {
    removeTableFromCache(dbName, tblName);
    addTableToCache(HiveStringUtils.normalizeIdentifier(newTable.getDbName()),
        HiveStringUtils.normalizeIdentifier(newTable.getTableName()), newTable);
  }

  /**
   * When the table was renamed or moved to another database, re-keys all of its cached
   * partitions under the new database and table names.
   */
  public synchronized void alterTableInPartitionCache(String dbName, String tblName,
      Table newTable) {
    if (!dbName.equals(newTable.getDbName()) || !tblName.equals(newTable.getTableName())) {
      String newDbName = HiveStringUtils.normalizeIdentifier(newTable.getDbName());
      String newTblName = HiveStringUtils.normalizeIdentifier(newTable.getTableName());
      List<Partition> partitions = listCachedPartitions(dbName, tblName, -1);
      for (Partition part : partitions) {
        removePartitionFromCache(part.getDbName(), part.getTableName(), part.getValues());
        part.setDbName(newDbName);
        part.setTableName(newTblName);
        addPartitionToCache(newDbName, newTblName, part);
      }
    }
  }

  /**
   * When the table was renamed or moved to another database, re-keys its cached table column
   * stats under the new database and table names.
   */
  public synchronized void alterTableInTableColStatsCache(String dbName, String tblName,
      Table newTable) {
    if (!dbName.equals(newTable.getDbName()) || !tblName.equals(newTable.getTableName())) {
      String newDbName = HiveStringUtils.normalizeIdentifier(newTable.getDbName());
      String newTblName = HiveStringUtils.normalizeIdentifier(newTable.getTableName());
      String oldPrefix = CacheUtils.buildKeyWithDelimit(dbName, tblName).toLowerCase();
      Map<String, ColumnStatisticsObj> newTableColStats =
          new HashMap<String, ColumnStatisticsObj>();
      Iterator<Entry<String, ColumnStatisticsObj>> iterator =
          tableColStatsCache.entrySet().iterator();
      while (iterator.hasNext()) {
        Entry<String, ColumnStatisticsObj> entry = iterator.next();
        String key = entry.getKey();
        if (key.toLowerCase().startsWith(oldPrefix)) {
          String[] decomposedKey = CacheUtils.splitTableColStats(key);
          // Only the column name (index 2) is carried over; db and table come from newTable.
          // Rebuilding the key from the old names would leave the entry where it was.
          String newKey = CacheUtils.buildKey(newDbName, newTblName, decomposedKey[2]);
          newTableColStats.put(newKey, entry.getValue());
          iterator.remove();
        }
      }
      tableColStatsCache.putAll(newTableColStats);
    }
  }

  /**
   * When the table was renamed or moved to another database, re-keys the cached partition
   * column stats of its partitions under the new database and table names.
   *
   * <p>The partitions are looked up under the old names, so this has to run before
   * {@link #alterTableInPartitionCache} re-keys them.
   */
  public synchronized void alterTableInPartitionColStatsCache(String dbName, String tblName,
      Table newTable) {
    if (!dbName.equals(newTable.getDbName()) || !tblName.equals(newTable.getTableName())) {
      String newDbName = HiveStringUtils.normalizeIdentifier(newTable.getDbName());
      String newTblName = HiveStringUtils.normalizeIdentifier(newTable.getTableName());
      List<Partition> partitions = listCachedPartitions(dbName, tblName, -1);
      Map<String, ColumnStatisticsObj> newPartitionColStats =
          new HashMap<String, ColumnStatisticsObj>();
      for (Partition part : partitions) {
        String oldPrefix =
            CacheUtils.buildKeyWithDelimit(dbName, tblName, part.getValues()).toLowerCase();
        Iterator<Entry<String, ColumnStatisticsObj>> iterator =
            partitionColStatsCache.entrySet().iterator();
        while (iterator.hasNext()) {
          Entry<String, ColumnStatisticsObj> entry = iterator.next();
          String key = entry.getKey();
          if (key.toLowerCase().startsWith(oldPrefix)) {
            Object[] decomposedKey = CacheUtils.splitPartitionColStats(key);
            @SuppressWarnings("unchecked")
            List<String> partVals = (List<String>) decomposedKey[2];
            // Keep partition values and column name; db and table come from newTable.
            String newKey = CacheUtils.buildKey(newDbName, newTblName, partVals,
                (String) decomposedKey[3]);
            newPartitionColStats.put(newKey, entry.getValue());
            iterator.remove();
          }
        }
      }
      partitionColStatsCache.putAll(newPartitionColStats);
    }
  }

  /** Returns the number of cached tables. */
  public synchronized int getCachedTableCount() {
    return tableCache.size();
  }

  /** Returns assembled copies of all cached tables whose database name equals {@code dbName}. */
  public synchronized List<Table> listCachedTables(String dbName) {
    List<Table> tables = new ArrayList<Table>();
    for (TableWrapper wrapper : tableCache.values()) {
      if (wrapper.getTable().getDbName().equals(dbName)) {
        tables.add(CacheUtils.assemble(wrapper, this));
      }
    }
    return tables;
  }

  /**
   * Builds {@link TableMeta} entries for every cached table whose database and table names match
   * the given patterns (via {@link CacheUtils#matches}) and whose type is in {@code tableTypes};
   * a {@code null} {@code tableTypes} accepts every type.
   */
  public synchronized List<TableMeta> getTableMeta(String dbNames, String tableNames,
      List<String> tableTypes) {
    List<TableMeta> tableMetas = new ArrayList<TableMeta>();
    for (String dbName : listCachedDatabases()) {
      if (!CacheUtils.matches(dbName, dbNames)) {
        continue;
      }
      for (Table table : listCachedTables(dbName)) {
        if (!CacheUtils.matches(table.getTableName(), tableNames)) {
          continue;
        }
        if (tableTypes == null || tableTypes.contains(table.getTableType())) {
          TableMeta metaData = new TableMeta(dbName, table.getTableName(), table.getTableType());
          // A table without a parameter map simply has no comment.
          Map<String, String> params = table.getParameters();
          if (params != null) {
            metaData.setComments(params.get("comment"));
          }
          tableMetas.add(metaData);
        }
      }
    }
    return tableMetas;
  }

  /**
   * Caches a deep copy of {@code part}; its storage descriptor, if present, is moved into the
   * shared descriptor cache and referenced from the wrapper by hash.
   */
  public synchronized void addPartitionToCache(String dbName, String tblName, Partition part) {
    Partition partCopy = part.deepCopy();
    PartitionWrapper wrapper;
    // Work on the copy's descriptor so the wrapper never aliases the caller's parameter map.
    StorageDescriptor sd = partCopy.getSd();
    if (sd != null) {
      byte[] sdHash = MetaStoreUtils.hashStorageDescriptor(sd, md);
      increSd(sd, sdHash);
      partCopy.setSd(null);
      wrapper = new PartitionWrapper(partCopy, sdHash, sd.getLocation(), sd.getParameters());
    } else {
      wrapper = new PartitionWrapper(partCopy, null, null, null);
    }
    partitionCache.put(CacheUtils.buildKey(dbName, tblName, part.getValues()), wrapper);
  }

  /** Returns the assembled partition stored under {@code key}, or {@code null}. */
  public synchronized Partition getPartitionFromCache(String key) {
    PartitionWrapper wrapper = partitionCache.get(key);
    if (wrapper == null) {
      return null;
    }
    return CacheUtils.assemble(wrapper, this);
  }

  /** Returns the assembled partition identified by its values, or {@code null}. */
  public synchronized Partition getPartitionFromCache(String dbName, String tblName,
      List<String> part_vals) {
    return getPartitionFromCache(CacheUtils.buildKey(dbName, tblName, part_vals));
  }

  /** Returns whether a partition with the given values is cached. */
  public synchronized boolean existPartitionFromCache(String dbName, String tblName,
      List<String> part_vals) {
    return partitionCache.containsKey(CacheUtils.buildKey(dbName, tblName, part_vals));
  }

  /**
   * Removes a partition and releases its reference on the shared storage descriptor.
   *
   * @return the partition object held by the removed wrapper, or {@code null} when the partition
   *     was not cached
   */
  public synchronized Partition removePartitionFromCache(String dbName, String tblName,
      List<String> part_vals) {
    PartitionWrapper wrapper =
        partitionCache.remove(CacheUtils.buildKey(dbName, tblName, part_vals));
    if (wrapper == null) {
      return null;
    }
    if (wrapper.getSdHash() != null) {
      decrSd(wrapper.getSdHash());
    }
    return wrapper.getPartition();
  }

  // Remove cached column stats for all partitions of a table
  public synchronized void removePartitionColStatsFromCache(String dbName, String tblName) {
    removeEntriesWithPrefix(partitionColStatsCache,
        CacheUtils.buildKeyWithDelimit(dbName, tblName));
  }

  // Remove cached column stats for a particular partition of a table
  public synchronized void removePartitionColStatsFromCache(String dbName, String tblName,
      List<String> partVals) {
    removeEntriesWithPrefix(partitionColStatsCache,
        CacheUtils.buildKeyWithDelimit(dbName, tblName, partVals));
  }

  // Remove cached column stats for a particular partition and a particular column of a table
  public synchronized void removePartitionColStatsFromCache(String dbName, String tblName,
      List<String> partVals, String colName) {
    partitionColStatsCache.remove(CacheUtils.buildKey(dbName, tblName, partVals, colName));
  }

  /**
   * Returns assembled copies of the cached partitions of a table.
   *
   * @param max maximum number of partitions to return; {@code -1} means no limit
   */
  public synchronized List<Partition> listCachedPartitions(String dbName, String tblName,
      int max) {
    List<Partition> partitions = new ArrayList<Partition>();
    for (PartitionWrapper wrapper : partitionCache.values()) {
      if (max != -1 && partitions.size() >= max) {
        // Limit reached; no later entry can be added.
        break;
      }
      if (wrapper.getPartition().getDbName().equals(dbName)
          && wrapper.getPartition().getTableName().equals(tblName)) {
        partitions.add(CacheUtils.assemble(wrapper, this));
      }
    }
    return partitions;
  }

  /** Replaces the cached partition identified by {@code partVals} with {@code newPart}. */
  public synchronized void alterPartitionInCache(String dbName, String tblName,
      List<String> partVals, Partition newPart) {
    removePartitionFromCache(dbName, tblName, partVals);
    addPartitionToCache(HiveStringUtils.normalizeIdentifier(newPart.getDbName()),
        HiveStringUtils.normalizeIdentifier(newPart.getTableName()), newPart);
  }

  /**
   * Re-keys the cached column stats of the partition identified by {@code partVals} under the
   * database, table and values of {@code newPart}.
   */
  public synchronized void alterPartitionInColStatsCache(String dbName, String tblName,
      List<String> partVals, Partition newPart) {
    String oldPrefix = CacheUtils.buildKeyWithDelimit(dbName, tblName, partVals).toLowerCase();
    String newDbName = HiveStringUtils.normalizeIdentifier(newPart.getDbName());
    String newTblName = HiveStringUtils.normalizeIdentifier(newPart.getTableName());
    Map<String, ColumnStatisticsObj> newPartitionColStats =
        new HashMap<String, ColumnStatisticsObj>();
    Iterator<Entry<String, ColumnStatisticsObj>> iterator =
        partitionColStatsCache.entrySet().iterator();
    while (iterator.hasNext()) {
      Entry<String, ColumnStatisticsObj> entry = iterator.next();
      String key = entry.getKey();
      if (key.toLowerCase().startsWith(oldPrefix)) {
        Object[] decomposedKey = CacheUtils.splitPartitionColStats(key);
        String newKey = CacheUtils.buildKey(newDbName, newTblName, newPart.getValues(),
            (String) decomposedKey[3]);
        newPartitionColStats.put(newKey, entry.getValue());
        iterator.remove();
      }
    }
    partitionColStatsCache.putAll(newPartitionColStats);
  }

  /**
   * Merges {@code colStatsObjs} into the cached stats of one partition: existing entries are
   * updated in place, missing ones are added as deep copies.
   */
  public synchronized void updatePartitionColStatsInCache(String dbName, String tableName,
      List<String> partVals, List<ColumnStatisticsObj> colStatsObjs) {
    for (ColumnStatisticsObj colStatObj : colStatsObjs) {
      String key = CacheUtils.buildKey(dbName, tableName, partVals, colStatObj.getColName());
      ColumnStatisticsObj oldStatsObj = partitionColStatsCache.get(key);
      if (oldStatsObj != null) {
        // Update existing stat object's fields
        LOG.debug("CachedStore: updating partition column stats for column: {}, of table: {} "
            + "and database: {}", colStatObj.getColName(), tableName, dbName);
        StatObjectConverter.setFieldsIntoOldStats(oldStatsObj, colStatObj);
      } else {
        // No stats exist for this key; cache a private copy
        partitionColStatsCache.put(key, colStatObj.deepCopy());
      }
    }
  }

  /** Returns the number of cached partitions. */
  public synchronized int getCachedPartitionCount() {
    return partitionCache.size();
  }

  /** Returns a deep copy of the partition column stats under {@code key}, or {@code null}. */
  public synchronized ColumnStatisticsObj getCachedPartitionColStats(String key) {
    ColumnStatisticsObj cached = partitionColStatsCache.get(key);
    return cached != null ? cached.deepCopy() : null;
  }

  /**
   * Caches deep copies of the given stats, keyed per partition and column.
   *
   * @param colStatsPerPartition stats by partition name; a name that
   *     {@link Warehouse#getPartValuesFromPartName} rejects is logged and skipped
   */
  public synchronized void addPartitionColStatsToCache(String dbName, String tableName,
      Map<String, List<ColumnStatisticsObj>> colStatsPerPartition) {
    for (Map.Entry<String, List<ColumnStatisticsObj>> entry : colStatsPerPartition.entrySet()) {
      String partName = entry.getKey();
      try {
        List<String> partVals = Warehouse.getPartValuesFromPartName(partName);
        for (ColumnStatisticsObj colStatObj : entry.getValue()) {
          String key = CacheUtils.buildKey(dbName, tableName, partVals, colStatObj.getColName());
          partitionColStatsCache.put(key, colStatObj.deepCopy());
        }
      } catch (MetaException e) {
        LOG.info("Unable to add partition: " + partName + " to SharedCache", e);
      }
    }
  }

  /** Replaces all cached partition column stats of a table with the given ones. */
  public synchronized void refreshPartitionColStats(String dbName, String tableName,
      Map<String, List<ColumnStatisticsObj>> newColStatsPerPartition) {
    LOG.debug("CachedStore: updating cached partition column stats objects for database: {}"
        + " and table: {}", dbName, tableName);
    removePartitionColStatsFromCache(dbName, tableName);
    addPartitionColStatsToCache(dbName, tableName, newColStatsPerPartition);
  }

  /** Caches deep copies of the given table column stats, one entry per column. */
  public synchronized void addTableColStatsToCache(String dbName, String tableName,
      List<ColumnStatisticsObj> colStatsForTable) {
    for (ColumnStatisticsObj colStatObj : colStatsForTable) {
      String key = CacheUtils.buildKey(dbName, tableName, colStatObj.getColName());
      tableColStatsCache.put(key, colStatObj.deepCopy());
    }
  }

  /** Replaces all cached column stats of a table with the given ones. */
  public synchronized void refreshTableColStats(String dbName, String tableName,
      List<ColumnStatisticsObj> colStatsForTable) {
    LOG.debug("CachedStore: updating cached table column stats objects for database: {}"
        + " and table: {}", dbName, tableName);
    // Remove all old cache entries for this table
    removeTableColStatsFromCache(dbName, tableName);
    // Add new entries to cache
    addTableColStatsToCache(dbName, tableName, colStatsForTable);
  }

  /**
   * Adds one reference to the storage descriptor with hash {@code sdHash}. On first use a deep
   * copy of {@code sd}, with location and parameters cleared, is stored with a count of 1.
   */
  public synchronized void increSd(StorageDescriptor sd, byte[] sdHash) {
    ByteArrayWrapper byteArray = new ByteArrayWrapper(sdHash);
    StorageDescriptorWrapper existing = sdCache.get(byteArray);
    if (existing != null) {
      existing.refCount++;
    } else {
      StorageDescriptor sdToCache = sd.deepCopy();
      sdToCache.setLocation(null);
      sdToCache.setParameters(null);
      sdCache.put(byteArray, new StorageDescriptorWrapper(sdToCache, 1));
    }
  }

  /**
   * Drops one reference to the storage descriptor with hash {@code sdHash} and evicts it when no
   * reference is left. Does nothing when the descriptor is not cached.
   */
  public synchronized void decrSd(byte[] sdHash) {
    ByteArrayWrapper byteArray = new ByteArrayWrapper(sdHash);
    StorageDescriptorWrapper sdWrapper = sdCache.get(byteArray);
    if (sdWrapper == null) {
      return;
    }
    sdWrapper.refCount--;
    if (sdWrapper.getRefCount() <= 0) {
      sdCache.remove(byteArray);
    }
  }

  /**
   * Returns the shared storage descriptor stored under {@code sdHash}. The hash must belong to a
   * descriptor that is currently cached; an unknown hash fails with a
   * {@link NullPointerException}, exactly as before.
   */
  public synchronized StorageDescriptor getSdFromCache(byte[] sdHash) {
    StorageDescriptorWrapper sdWrapper = sdCache.get(new ByteArrayWrapper(sdHash));
    return sdWrapper.getSd();
  }

  // Replace databases in databaseCache with the new list
  public synchronized void refreshDatabases(List<Database> databases) {
    LOG.debug("CachedStore: updating cached database objects");
    for (String dbName : listCachedDatabases()) {
      removeDatabaseFromCache(dbName);
    }
    for (Database db : databases) {
      addDatabaseToCache(db.getName(), db);
    }
  }

  // Replace tables in tableCache with the new list
  public synchronized void refreshTables(String dbName, List<Table> tables) {
    LOG.debug("CachedStore: updating cached table objects for database: {}", dbName);
    for (Table tbl : listCachedTables(dbName)) {
      removeTableFromCache(dbName, tbl.getTableName());
    }
    for (Table tbl : tables) {
      addTableToCache(dbName, tbl.getTableName(), tbl);
    }
  }

  // Replace the cached partitions of one table with the new list
  public synchronized void refreshPartitions(String dbName, String tblName,
      List<Partition> partitions) {
    LOG.debug("CachedStore: updating cached partition objects for database: {} and table: {}",
        dbName, tblName);
    Iterator<Entry<String, PartitionWrapper>> iterator = partitionCache.entrySet().iterator();
    while (iterator.hasNext()) {
      PartitionWrapper partitionWrapper = iterator.next().getValue();
      if (partitionWrapper.getPartition().getDbName().equals(dbName)
          && partitionWrapper.getPartition().getTableName().equals(tblName)) {
        // Release the storage descriptor reference taken by addPartitionToCache; otherwise the
        // reference count grows on every refresh and the descriptor is never evicted.
        if (partitionWrapper.getSdHash() != null) {
          decrSd(partitionWrapper.getSdHash());
        }
        iterator.remove();
      }
    }
    for (Partition part : partitions) {
      addPartitionToCache(dbName, tblName, part);
    }
  }

  /** Removes every entry of {@code cache} whose key starts with {@code prefix}, ignoring case. */
  private static void removeEntriesWithPrefix(Map<String, ColumnStatisticsObj> cache,
      String prefix) {
    String lowerPrefix = prefix.toLowerCase();
    Iterator<Entry<String, ColumnStatisticsObj>> iterator = cache.entrySet().iterator();
    while (iterator.hasNext()) {
      if (iterator.next().getKey().toLowerCase().startsWith(lowerPrefix)) {
        iterator.remove();
      }
    }
  }

  @VisibleForTesting
  Map<String, Database> getDatabaseCache() {
    return databaseCache;
  }

  @VisibleForTesting
  Map<String, TableWrapper> getTableCache() {
    return tableCache;
  }

  @VisibleForTesting
  Map<String, PartitionWrapper> getPartitionCache() {
    return partitionCache;
  }

  @VisibleForTesting
  Map<ByteArrayWrapper, StorageDescriptorWrapper> getSdCache() {
    return sdCache;
  }

  @VisibleForTesting
  Map<String, ColumnStatisticsObj> getPartitionColStatsCache() {
    return partitionColStatsCache;
  }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/133d3c47/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/BinaryColumnStatsAggregator.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/BinaryColumnStatsAggregator.java b/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/BinaryColumnStatsAggregator.java deleted file mode 100644 index e6c836b..0000000 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/BinaryColumnStatsAggregator.java +++ /dev/null @@ -1,64 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */

package org.apache.hadoop.hive.metastore.columnstats.aggr;

import java.util.List;

import org.apache.hadoop.hive.metastore.api.BinaryColumnStatsData;
import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.MetaException;

/**
 * Aggregates per-partition statistics of a binary column into a single statistics object.
 */
public class BinaryColumnStatsAggregator extends ColumnStatsAggregator {

  /**
   * Combines the binary column statistics in {@code css}.
   *
   * <p>The result carries the largest {@code maxColLen}, the largest {@code avgColLen} (not a
   * weighted mean) and the sum of {@code numNulls} over all inputs.
   *
   * @param colName name given to the returned statistics object
   * @param partNames not used by this aggregator
   * @param css statistics to combine; each element must hold exactly one statistics object
   * @return the aggregated statistics; its column type is taken from the first input
   * @throws MetaException if {@code css} is null or empty, or if an element does not hold exactly
   *     one statistics object
   */
  @Override
  public ColumnStatisticsObj aggregate(String colName, List<String> partNames,
      List<ColumnStatistics> css) throws MetaException {
    // Without any input there is no column type or stats kind to build a result from; fail with
    // the declared exception instead of a NullPointerException further down.
    if (css == null || css.isEmpty()) {
      throw new MetaException("No column statistics to aggregate for column " + colName);
    }
    ColumnStatisticsObj statsObj = null;
    BinaryColumnStatsData aggregateData = null;
    for (ColumnStatistics cs : css) {
      if (cs.getStatsObjSize() != 1) {
        throw new MetaException(
            "The number of columns should be exactly one in aggrStats, but found "
                + cs.getStatsObjSize());
      }
      ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
      if (statsObj == null) {
        // The first input decides the column type and the kind of stats of the result.
        statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, cso.getColType(),
            cso.getStatsData().getSetField());
      }
      BinaryColumnStatsData newData = cso.getStatsData().getBinaryStats();
      if (aggregateData == null) {
        aggregateData = newData.deepCopy();
      } else {
        aggregateData.setMaxColLen(Math.max(aggregateData.getMaxColLen(), newData.getMaxColLen()));
        aggregateData.setAvgColLen(Math.max(aggregateData.getAvgColLen(), newData.getAvgColLen()));
        aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
      }
    }
    ColumnStatisticsData columnStatisticsData = new ColumnStatisticsData();
    columnStatisticsData.setBinaryStats(aggregateData);
    statsObj.setStatsData(columnStatisticsData);
    return statsObj;
  }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/133d3c47/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/BooleanColumnStatsAggregator.java
---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/BooleanColumnStatsAggregator.java b/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/BooleanColumnStatsAggregator.java deleted file mode 100644 index a34bc9f..0000000 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/BooleanColumnStatsAggregator.java +++ /dev/null @@ -1,65 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */

package org.apache.hadoop.hive.metastore.columnstats.aggr;

import java.util.List;

import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData;
import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.MetaException;

/**
 * Aggregates per-partition statistics of a boolean column into a single statistics object.
 */
public class BooleanColumnStatsAggregator extends ColumnStatsAggregator {

  /**
   * Combines the boolean column statistics in {@code css} by summing {@code numTrues},
   * {@code numFalses} and {@code numNulls} over all inputs.
   *
   * @param colName name given to the returned statistics object
   * @param partNames not used by this aggregator
   * @param css statistics to combine; each element must hold exactly one statistics object
   * @return the aggregated statistics; its column type is taken from the first input
   * @throws MetaException if {@code css} is null or empty, or if an element does not hold exactly
   *     one statistics object
   */
  @Override
  public ColumnStatisticsObj aggregate(String colName, List<String> partNames,
      List<ColumnStatistics> css) throws MetaException {
    // Without any input there is no column type or stats kind to build a result from; fail with
    // the declared exception instead of a NullPointerException further down.
    if (css == null || css.isEmpty()) {
      throw new MetaException("No column statistics to aggregate for column " + colName);
    }
    ColumnStatisticsObj statsObj = null;
    BooleanColumnStatsData aggregateData = null;
    for (ColumnStatistics cs : css) {
      if (cs.getStatsObjSize() != 1) {
        throw new MetaException(
            "The number of columns should be exactly one in aggrStats, but found "
                + cs.getStatsObjSize());
      }
      ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
      if (statsObj == null) {
        // The first input decides the column type and the kind of stats of the result.
        statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, cso.getColType(),
            cso.getStatsData().getSetField());
      }
      BooleanColumnStatsData newData = cso.getStatsData().getBooleanStats();
      if (aggregateData == null) {
        aggregateData = newData.deepCopy();
      } else {
        aggregateData.setNumTrues(aggregateData.getNumTrues() + newData.getNumTrues());
        aggregateData.setNumFalses(aggregateData.getNumFalses() + newData.getNumFalses());
        aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
      }
    }
    ColumnStatisticsData columnStatisticsData = new ColumnStatisticsData();
    columnStatisticsData.setBooleanStats(aggregateData);
    statsObj.setStatsData(columnStatisticsData);
    return statsObj;
  }

}
http://git-wip-us.apache.org/repos/asf/hive/blob/133d3c47/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/ColumnStatsAggregator.java
---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/ColumnStatsAggregator.java b/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/ColumnStatsAggregator.java deleted file mode 100644 index a52e5e5..0000000 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/ColumnStatsAggregator.java +++ /dev/null @@ -1,33 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */

package org.apache.hadoop.hive.metastore.columnstats.aggr;

import java.util.List;

import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.MetaException;

/**
 * Base type of the per-column-type statistics aggregators. An implementation folds a list of
 * {@link ColumnStatistics} for one column into a single {@link ColumnStatisticsObj}.
 */
public abstract class ColumnStatsAggregator {

  /**
   * Configuration flag assigned by {@code ColumnStatsAggregatorFactory}.
   * NOTE(review): presumably selects a density-based estimate of the number of distinct values;
   * confirm against the aggregators that read it.
   */
  public boolean useDensityFunctionForNDVEstimation;

  /**
   * Configuration value assigned by {@code ColumnStatsAggregatorFactory}.
   * NOTE(review): presumably tunes the distinct-value estimate; confirm the valid range.
   */
  public double ndvTuner;

  /**
   * Aggregates the statistics of one column.
   *
   * @param colName name of the column the statistics belong to
   * @param partNames names of the partitions the statistics were collected for
   * @param css the statistics to aggregate
   * @return the aggregated statistics object
   * @throws MetaException if the input cannot be aggregated
   */
  public abstract ColumnStatisticsObj aggregate(String colName, List<String> partNames,
      List<ColumnStatistics> css) throws MetaException;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/133d3c47/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/ColumnStatsAggregatorFactory.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/ColumnStatsAggregatorFactory.java b/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/ColumnStatsAggregatorFactory.java deleted file mode 100644 index dfae708..0000000 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/ColumnStatsAggregatorFactory.java +++ /dev/null @@ -1,113 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied.
See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.hadoop.hive.metastore.columnstats.aggr; - -import org.apache.hadoop.hive.metastore.api.BinaryColumnStatsData; -import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData; -import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData; -import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData._Fields; -import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; -import org.apache.hadoop.hive.metastore.columnstats.cache.DateColumnStatsDataInspector; -import org.apache.hadoop.hive.metastore.columnstats.cache.DecimalColumnStatsDataInspector; -import org.apache.hadoop.hive.metastore.columnstats.cache.DoubleColumnStatsDataInspector; -import org.apache.hadoop.hive.metastore.columnstats.cache.LongColumnStatsDataInspector; -import org.apache.hadoop.hive.metastore.columnstats.cache.StringColumnStatsDataInspector; - -public class ColumnStatsAggregatorFactory { - - private ColumnStatsAggregatorFactory() { - } - - public static ColumnStatsAggregator getColumnStatsAggregator(_Fields type, - boolean useDensityFunctionForNDVEstimation, double ndvTuner) { - ColumnStatsAggregator agg; - switch (type) { - case BOOLEAN_STATS: - agg = new BooleanColumnStatsAggregator(); - break; - case LONG_STATS: - agg = new LongColumnStatsAggregator(); - break; - case DATE_STATS: - agg = new DateColumnStatsAggregator(); - break; - case DOUBLE_STATS: - agg = new DoubleColumnStatsAggregator(); - break; - case STRING_STATS: - agg = new StringColumnStatsAggregator(); - break; - case BINARY_STATS: - agg = new BinaryColumnStatsAggregator(); - break; - case DECIMAL_STATS: - agg = new DecimalColumnStatsAggregator(); - break; - default: - throw new RuntimeException("Woh, bad. 
Unknown stats type " + type.toString()); - } - agg.useDensityFunctionForNDVEstimation = useDensityFunctionForNDVEstimation; - agg.ndvTuner = ndvTuner; - return agg; - } - - public static ColumnStatisticsObj newColumnStaticsObj(String colName, String colType, _Fields type) { - ColumnStatisticsObj cso = new ColumnStatisticsObj(); - ColumnStatisticsData csd = new ColumnStatisticsData(); - cso.setColName(colName); - cso.setColType(colType); - switch (type) { - case BOOLEAN_STATS: - csd.setBooleanStats(new BooleanColumnStatsData()); - break; - - case LONG_STATS: - csd.setLongStats(new LongColumnStatsDataInspector()); - break; - - case DATE_STATS: - csd.setDateStats(new DateColumnStatsDataInspector()); - break; - - case DOUBLE_STATS: - csd.setDoubleStats(new DoubleColumnStatsDataInspector()); - break; - - case STRING_STATS: - csd.setStringStats(new StringColumnStatsDataInspector()); - break; - - case BINARY_STATS: - csd.setBinaryStats(new BinaryColumnStatsData()); - break; - - case DECIMAL_STATS: - csd.setDecimalStats(new DecimalColumnStatsDataInspector()); - break; - - default: - throw new RuntimeException("Woh, bad. Unknown stats type!"); - } - - cso.setStatsData(csd); - return cso; - } - -} http://git-wip-us.apache.org/repos/asf/hive/blob/133d3c47/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DateColumnStatsAggregator.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DateColumnStatsAggregator.java b/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DateColumnStatsAggregator.java deleted file mode 100644 index ee95396..0000000 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DateColumnStatsAggregator.java +++ /dev/null @@ -1,362 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.hadoop.hive.metastore.columnstats.aggr; - -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; - -import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimator; -import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimatorFactory; -import org.apache.hadoop.hive.metastore.api.ColumnStatistics; -import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData; -import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; -import org.apache.hadoop.hive.metastore.api.Date; -import org.apache.hadoop.hive.metastore.api.DateColumnStatsData; -import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.hive.metastore.columnstats.cache.DateColumnStatsDataInspector; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class DateColumnStatsAggregator extends ColumnStatsAggregator implements - IExtrapolatePartStatus { - - private static final Logger LOG = LoggerFactory.getLogger(DateColumnStatsAggregator.class); - - @Override - public ColumnStatisticsObj aggregate(String colName, List<String> partNames, - List<ColumnStatistics> css) throws MetaException { - ColumnStatisticsObj statsObj = null; - 
- // check if all the ColumnStatisticsObjs contain stats and all the ndv are - // bitvectors - boolean doAllPartitionContainStats = partNames.size() == css.size(); - LOG.debug("doAllPartitionContainStats for " + colName + " is " + doAllPartitionContainStats); - NumDistinctValueEstimator ndvEstimator = null; - String colType = null; - for (ColumnStatistics cs : css) { - if (cs.getStatsObjSize() != 1) { - throw new MetaException( - "The number of columns should be exactly one in aggrStats, but found " - + cs.getStatsObjSize()); - } - ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); - if (statsObj == null) { - colType = cso.getColType(); - statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, colType, cso - .getStatsData().getSetField()); - } - DateColumnStatsDataInspector dateColumnStats = - (DateColumnStatsDataInspector) cso.getStatsData().getDateStats(); - if (dateColumnStats.getNdvEstimator() == null) { - ndvEstimator = null; - break; - } else { - // check if all of the bit vectors can merge - NumDistinctValueEstimator estimator = dateColumnStats.getNdvEstimator(); - if (ndvEstimator == null) { - ndvEstimator = estimator; - } else { - if (ndvEstimator.canMerge(estimator)) { - continue; - } else { - ndvEstimator = null; - break; - } - } - } - } - if (ndvEstimator != null) { - ndvEstimator = NumDistinctValueEstimatorFactory - .getEmptyNumDistinctValueEstimator(ndvEstimator); - } - LOG.debug("all of the bit vectors can merge for " + colName + " is " + (ndvEstimator != null)); - ColumnStatisticsData columnStatisticsData = new ColumnStatisticsData(); - if (doAllPartitionContainStats || css.size() < 2) { - DateColumnStatsDataInspector aggregateData = null; - long lowerBound = 0; - long higherBound = 0; - double densityAvgSum = 0.0; - for (ColumnStatistics cs : css) { - ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); - DateColumnStatsDataInspector newData = - (DateColumnStatsDataInspector) cso.getStatsData().getDateStats(); - 
lowerBound = Math.max(lowerBound, newData.getNumDVs()); - higherBound += newData.getNumDVs(); - densityAvgSum += (diff(newData.getHighValue(), newData.getLowValue())) - / newData.getNumDVs(); - if (ndvEstimator != null) { - ndvEstimator.mergeEstimators(newData.getNdvEstimator()); - } - if (aggregateData == null) { - aggregateData = newData.deepCopy(); - } else { - aggregateData.setLowValue(min(aggregateData.getLowValue(), newData.getLowValue())); - aggregateData - .setHighValue(max(aggregateData.getHighValue(), newData.getHighValue())); - aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls()); - aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs())); - } - } - if (ndvEstimator != null) { - // if all the ColumnStatisticsObjs contain bitvectors, we do not need to - // use uniform distribution assumption because we can merge bitvectors - // to get a good estimation. - aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues()); - } else { - long estimation; - if (useDensityFunctionForNDVEstimation) { - // We have estimation, lowerbound and higherbound. We use estimation - // if it is between lowerbound and higherbound. 
- double densityAvg = densityAvgSum / partNames.size(); - estimation = (long) (diff(aggregateData.getHighValue(), aggregateData.getLowValue()) / densityAvg); - if (estimation < lowerBound) { - estimation = lowerBound; - } else if (estimation > higherBound) { - estimation = higherBound; - } - } else { - estimation = (long) (lowerBound + (higherBound - lowerBound) * ndvTuner); - } - aggregateData.setNumDVs(estimation); - } - columnStatisticsData.setDateStats(aggregateData); - } else { - // we need extrapolation - LOG.debug("start extrapolation for " + colName); - - Map<String, Integer> indexMap = new HashMap<String, Integer>(); - for (int index = 0; index < partNames.size(); index++) { - indexMap.put(partNames.get(index), index); - } - Map<String, Double> adjustedIndexMap = new HashMap<String, Double>(); - Map<String, ColumnStatisticsData> adjustedStatsMap = new HashMap<String, ColumnStatisticsData>(); - // while we scan the css, we also get the densityAvg, lowerbound and - // higerbound when useDensityFunctionForNDVEstimation is true. - double densityAvgSum = 0.0; - if (ndvEstimator == null) { - // if not every partition uses bitvector for ndv, we just fall back to - // the traditional extrapolation methods. - for (ColumnStatistics cs : css) { - String partName = cs.getStatsDesc().getPartName(); - ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); - DateColumnStatsData newData = cso.getStatsData().getDateStats(); - if (useDensityFunctionForNDVEstimation) { - densityAvgSum += diff(newData.getHighValue(), newData.getLowValue()) / newData.getNumDVs(); - } - adjustedIndexMap.put(partName, (double) indexMap.get(partName)); - adjustedStatsMap.put(partName, cso.getStatsData()); - } - } else { - // we first merge all the adjacent bitvectors that we could merge and - // derive new partition names and index. 
- StringBuilder pseudoPartName = new StringBuilder(); - double pseudoIndexSum = 0; - int length = 0; - int curIndex = -1; - DateColumnStatsDataInspector aggregateData = null; - for (ColumnStatistics cs : css) { - String partName = cs.getStatsDesc().getPartName(); - ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); - DateColumnStatsDataInspector newData = - (DateColumnStatsDataInspector) cso.getStatsData().getDateStats(); - // newData.isSetBitVectors() should be true for sure because we - // already checked it before. - if (indexMap.get(partName) != curIndex) { - // There is bitvector, but it is not adjacent to the previous ones. - if (length > 0) { - // we have to set ndv - adjustedIndexMap.put(pseudoPartName.toString(), pseudoIndexSum / length); - aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues()); - ColumnStatisticsData csd = new ColumnStatisticsData(); - csd.setDateStats(aggregateData); - adjustedStatsMap.put(pseudoPartName.toString(), csd); - if (useDensityFunctionForNDVEstimation) { - densityAvgSum += diff(aggregateData.getHighValue(), aggregateData.getLowValue()) - / aggregateData.getNumDVs(); - } - // reset everything - pseudoPartName = new StringBuilder(); - pseudoIndexSum = 0; - length = 0; - ndvEstimator = NumDistinctValueEstimatorFactory.getEmptyNumDistinctValueEstimator(ndvEstimator); - } - aggregateData = null; - } - curIndex = indexMap.get(partName); - pseudoPartName.append(partName); - pseudoIndexSum += curIndex; - length++; - curIndex++; - if (aggregateData == null) { - aggregateData = newData.deepCopy(); - } else { - aggregateData.setLowValue(min(aggregateData.getLowValue(), newData.getLowValue())); - aggregateData.setHighValue(max(aggregateData.getHighValue(), newData.getHighValue())); - aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls()); - } - ndvEstimator.mergeEstimators(newData.getNdvEstimator()); - } - if (length > 0) { - // we have to set ndv - adjustedIndexMap.put(pseudoPartName.toString(), 
pseudoIndexSum / length); - aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues()); - ColumnStatisticsData csd = new ColumnStatisticsData(); - csd.setDateStats(aggregateData); - adjustedStatsMap.put(pseudoPartName.toString(), csd); - if (useDensityFunctionForNDVEstimation) { - densityAvgSum += diff(aggregateData.getHighValue(), aggregateData.getLowValue()) - / aggregateData.getNumDVs(); - } - } - } - extrapolate(columnStatisticsData, partNames.size(), css.size(), adjustedIndexMap, - adjustedStatsMap, densityAvgSum / adjustedStatsMap.size()); - } - LOG.debug("Ndv estimatation for {} is {} # of partitions requested: {} # of partitions found: {}", colName, - columnStatisticsData.getDateStats().getNumDVs(),partNames.size(), css.size()); - statsObj.setStatsData(columnStatisticsData); - return statsObj; - } - - private long diff(Date d1, Date d2) { - return d1.getDaysSinceEpoch() - d2.getDaysSinceEpoch(); - } - - private Date min(Date d1, Date d2) { - return d1.compareTo(d2) < 0 ? d1 : d2; - } - - private Date max(Date d1, Date d2) { - return d1.compareTo(d2) < 0 ? 
d2 : d1; - } - - @Override - public void extrapolate(ColumnStatisticsData extrapolateData, int numParts, - int numPartsWithStats, Map<String, Double> adjustedIndexMap, - Map<String, ColumnStatisticsData> adjustedStatsMap, double densityAvg) { - int rightBorderInd = numParts; - DateColumnStatsDataInspector extrapolateDateData = new DateColumnStatsDataInspector(); - Map<String, DateColumnStatsData> extractedAdjustedStatsMap = new HashMap<>(); - for (Map.Entry<String, ColumnStatisticsData> entry : adjustedStatsMap.entrySet()) { - extractedAdjustedStatsMap.put(entry.getKey(), entry.getValue().getDateStats()); - } - List<Map.Entry<String, DateColumnStatsData>> list = new LinkedList<Map.Entry<String, DateColumnStatsData>>( - extractedAdjustedStatsMap.entrySet()); - // get the lowValue - Collections.sort(list, new Comparator<Map.Entry<String, DateColumnStatsData>>() { - @Override - public int compare(Map.Entry<String, DateColumnStatsData> o1, - Map.Entry<String, DateColumnStatsData> o2) { - return o1.getValue().getLowValue().compareTo(o2.getValue().getLowValue()); - } - }); - double minInd = adjustedIndexMap.get(list.get(0).getKey()); - double maxInd = adjustedIndexMap.get(list.get(list.size() - 1).getKey()); - long lowValue = 0; - long min = list.get(0).getValue().getLowValue().getDaysSinceEpoch(); - long max = list.get(list.size() - 1).getValue().getLowValue().getDaysSinceEpoch(); - if (minInd == maxInd) { - lowValue = min; - } else if (minInd < maxInd) { - // left border is the min - lowValue = (long) (max - (max - min) * maxInd / (maxInd - minInd)); - } else { - // right border is the min - lowValue = (long) (max - (max - min) * (rightBorderInd - maxInd) / (minInd - maxInd)); - } - - // get the highValue - Collections.sort(list, new Comparator<Map.Entry<String, DateColumnStatsData>>() { - @Override - public int compare(Map.Entry<String, DateColumnStatsData> o1, - Map.Entry<String, DateColumnStatsData> o2) { - return 
o1.getValue().getHighValue().compareTo(o2.getValue().getHighValue()); - } - }); - minInd = adjustedIndexMap.get(list.get(0).getKey()); - maxInd = adjustedIndexMap.get(list.get(list.size() - 1).getKey()); - long highValue = 0; - min = list.get(0).getValue().getHighValue().getDaysSinceEpoch(); - max = list.get(list.size() - 1).getValue().getHighValue().getDaysSinceEpoch(); - if (minInd == maxInd) { - highValue = min; - } else if (minInd < maxInd) { - // right border is the max - highValue = (long) (min + (max - min) * (rightBorderInd - minInd) / (maxInd - minInd)); - } else { - // left border is the max - highValue = (long) (min + (max - min) * minInd / (minInd - maxInd)); - } - - // get the #nulls - long numNulls = 0; - for (Map.Entry<String, DateColumnStatsData> entry : extractedAdjustedStatsMap.entrySet()) { - numNulls += entry.getValue().getNumNulls(); - } - // we scale up sumNulls based on the number of partitions - numNulls = numNulls * numParts / numPartsWithStats; - - // get the ndv - long ndv = 0; - Collections.sort(list, new Comparator<Map.Entry<String, DateColumnStatsData>>() { - @Override - public int compare(Map.Entry<String, DateColumnStatsData> o1, - Map.Entry<String, DateColumnStatsData> o2) { - return Long.compare(o1.getValue().getNumDVs(), o2.getValue().getNumDVs()); - } - }); - long lowerBound = list.get(list.size() - 1).getValue().getNumDVs(); - long higherBound = 0; - for (Map.Entry<String, DateColumnStatsData> entry : list) { - higherBound += entry.getValue().getNumDVs(); - } - if (useDensityFunctionForNDVEstimation && densityAvg != 0.0) { - ndv = (long) ((highValue - lowValue) / densityAvg); - if (ndv < lowerBound) { - ndv = lowerBound; - } else if (ndv > higherBound) { - ndv = higherBound; - } - } else { - minInd = adjustedIndexMap.get(list.get(0).getKey()); - maxInd = adjustedIndexMap.get(list.get(list.size() - 1).getKey()); - min = list.get(0).getValue().getNumDVs(); - max = list.get(list.size() - 1).getValue().getNumDVs(); - if (minInd == 
maxInd) { - ndv = min; - } else if (minInd < maxInd) { - // right border is the max - ndv = (long) (min + (max - min) * (rightBorderInd - minInd) / (maxInd - minInd)); - } else { - // left border is the max - ndv = (long) (min + (max - min) * minInd / (minInd - maxInd)); - } - } - extrapolateDateData.setLowValue(new Date(lowValue)); - extrapolateDateData.setHighValue(new Date(highValue)); - extrapolateDateData.setNumNulls(numNulls); - extrapolateDateData.setNumDVs(ndv); - extrapolateData.setDateStats(extrapolateDateData); - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/133d3c47/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DecimalColumnStatsAggregator.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DecimalColumnStatsAggregator.java b/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DecimalColumnStatsAggregator.java deleted file mode 100644 index 284c12c..0000000 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DecimalColumnStatsAggregator.java +++ /dev/null @@ -1,375 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.hadoop.hive.metastore.columnstats.aggr; - -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; - -import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimator; -import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimatorFactory; -import org.apache.hadoop.hive.metastore.MetaStoreUtils; -import org.apache.hadoop.hive.metastore.StatObjectConverter; -import org.apache.hadoop.hive.metastore.api.ColumnStatistics; -import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData; -import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; -import org.apache.hadoop.hive.metastore.api.DecimalColumnStatsData; -import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.hive.metastore.columnstats.cache.DecimalColumnStatsDataInspector; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class DecimalColumnStatsAggregator extends ColumnStatsAggregator implements - IExtrapolatePartStatus { - - private static final Logger LOG = LoggerFactory.getLogger(DecimalColumnStatsAggregator.class); - - @Override - public ColumnStatisticsObj aggregate(String colName, List<String> partNames, - List<ColumnStatistics> css) throws MetaException { - ColumnStatisticsObj statsObj = null; - - // check if all the ColumnStatisticsObjs contain stats and all the ndv are - // bitvectors - boolean doAllPartitionContainStats = partNames.size() == css.size(); - LOG.debug("doAllPartitionContainStats for " + colName + " is " + doAllPartitionContainStats); - NumDistinctValueEstimator ndvEstimator = null; - String colType = null; - for (ColumnStatistics cs : css) { - if (cs.getStatsObjSize() != 1) { - throw new MetaException( - "The number of columns should be exactly one in aggrStats, but found " - + cs.getStatsObjSize()); - } - ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); - if (statsObj == null) { - 
colType = cso.getColType(); - statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, colType, cso - .getStatsData().getSetField()); - } - DecimalColumnStatsDataInspector decimalColumnStatsData = - (DecimalColumnStatsDataInspector) cso.getStatsData().getDecimalStats(); - if (decimalColumnStatsData.getNdvEstimator() == null) { - ndvEstimator = null; - break; - } else { - // check if all of the bit vectors can merge - NumDistinctValueEstimator estimator = decimalColumnStatsData.getNdvEstimator(); - if (ndvEstimator == null) { - ndvEstimator = estimator; - } else { - if (ndvEstimator.canMerge(estimator)) { - continue; - } else { - ndvEstimator = null; - break; - } - } - } - } - if (ndvEstimator != null) { - ndvEstimator = NumDistinctValueEstimatorFactory - .getEmptyNumDistinctValueEstimator(ndvEstimator); - } - LOG.debug("all of the bit vectors can merge for " + colName + " is " + (ndvEstimator != null)); - ColumnStatisticsData columnStatisticsData = new ColumnStatisticsData(); - if (doAllPartitionContainStats || css.size() < 2) { - DecimalColumnStatsDataInspector aggregateData = null; - long lowerBound = 0; - long higherBound = 0; - double densityAvgSum = 0.0; - for (ColumnStatistics cs : css) { - ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); - DecimalColumnStatsDataInspector newData = - (DecimalColumnStatsDataInspector) cso.getStatsData().getDecimalStats(); - lowerBound = Math.max(lowerBound, newData.getNumDVs()); - higherBound += newData.getNumDVs(); - densityAvgSum += (MetaStoreUtils.decimalToDouble(newData.getHighValue()) - MetaStoreUtils - .decimalToDouble(newData.getLowValue())) / newData.getNumDVs(); - if (ndvEstimator != null) { - ndvEstimator.mergeEstimators(newData.getNdvEstimator()); - } - if (aggregateData == null) { - aggregateData = newData.deepCopy(); - } else { - if (MetaStoreUtils.decimalToDouble(aggregateData.getLowValue()) < MetaStoreUtils - .decimalToDouble(newData.getLowValue())) { - 
aggregateData.setLowValue(aggregateData.getLowValue()); - } else { - aggregateData.setLowValue(newData.getLowValue()); - } - if (MetaStoreUtils.decimalToDouble(aggregateData.getHighValue()) > MetaStoreUtils - .decimalToDouble(newData.getHighValue())) { - aggregateData.setHighValue(aggregateData.getHighValue()); - } else { - aggregateData.setHighValue(newData.getHighValue()); - } - aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls()); - aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs())); - } - } - if (ndvEstimator != null) { - // if all the ColumnStatisticsObjs contain bitvectors, we do not need to - // use uniform distribution assumption because we can merge bitvectors - // to get a good estimation. - aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues()); - } else { - long estimation; - if (useDensityFunctionForNDVEstimation) { - // We have estimation, lowerbound and higherbound. We use estimation - // if it is between lowerbound and higherbound. 
- double densityAvg = densityAvgSum / partNames.size(); - estimation = (long) ((MetaStoreUtils.decimalToDouble(aggregateData.getHighValue()) - MetaStoreUtils - .decimalToDouble(aggregateData.getLowValue())) / densityAvg); - if (estimation < lowerBound) { - estimation = lowerBound; - } else if (estimation > higherBound) { - estimation = higherBound; - } - } else { - estimation = (long) (lowerBound + (higherBound - lowerBound) * ndvTuner); - } - aggregateData.setNumDVs(estimation); - } - columnStatisticsData.setDecimalStats(aggregateData); - } else { - // we need extrapolation - LOG.debug("start extrapolation for " + colName); - Map<String, Integer> indexMap = new HashMap<String, Integer>(); - for (int index = 0; index < partNames.size(); index++) { - indexMap.put(partNames.get(index), index); - } - Map<String, Double> adjustedIndexMap = new HashMap<String, Double>(); - Map<String, ColumnStatisticsData> adjustedStatsMap = new HashMap<String, ColumnStatisticsData>(); - // while we scan the css, we also get the densityAvg, lowerbound and - // higerbound when useDensityFunctionForNDVEstimation is true. - double densityAvgSum = 0.0; - if (ndvEstimator == null) { - // if not every partition uses bitvector for ndv, we just fall back to - // the traditional extrapolation methods. - for (ColumnStatistics cs : css) { - String partName = cs.getStatsDesc().getPartName(); - ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); - DecimalColumnStatsData newData = cso.getStatsData().getDecimalStats(); - if (useDensityFunctionForNDVEstimation) { - densityAvgSum += (MetaStoreUtils.decimalToDouble(newData.getHighValue()) - MetaStoreUtils - .decimalToDouble(newData.getLowValue())) / newData.getNumDVs(); - } - adjustedIndexMap.put(partName, (double) indexMap.get(partName)); - adjustedStatsMap.put(partName, cso.getStatsData()); - } - } else { - // we first merge all the adjacent bitvectors that we could merge and - // derive new partition names and index. 
- StringBuilder pseudoPartName = new StringBuilder(); - double pseudoIndexSum = 0; - int length = 0; - int curIndex = -1; - DecimalColumnStatsDataInspector aggregateData = null; - for (ColumnStatistics cs : css) { - String partName = cs.getStatsDesc().getPartName(); - ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); - DecimalColumnStatsDataInspector newData = - (DecimalColumnStatsDataInspector) cso.getStatsData().getDecimalStats(); - // newData.isSetBitVectors() should be true for sure because we - // already checked it before. - if (indexMap.get(partName) != curIndex) { - // There is bitvector, but it is not adjacent to the previous ones. - if (length > 0) { - // we have to set ndv - adjustedIndexMap.put(pseudoPartName.toString(), pseudoIndexSum / length); - aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues()); - ColumnStatisticsData csd = new ColumnStatisticsData(); - csd.setDecimalStats(aggregateData); - adjustedStatsMap.put(pseudoPartName.toString(), csd); - if (useDensityFunctionForNDVEstimation) { - densityAvgSum += (MetaStoreUtils.decimalToDouble(aggregateData.getHighValue()) - MetaStoreUtils - .decimalToDouble(aggregateData.getLowValue())) / aggregateData.getNumDVs(); - } - // reset everything - pseudoPartName = new StringBuilder(); - pseudoIndexSum = 0; - length = 0; - ndvEstimator = NumDistinctValueEstimatorFactory.getEmptyNumDistinctValueEstimator(ndvEstimator); - } - aggregateData = null; - } - curIndex = indexMap.get(partName); - pseudoPartName.append(partName); - pseudoIndexSum += curIndex; - length++; - curIndex++; - if (aggregateData == null) { - aggregateData = newData.deepCopy(); - } else { - if (MetaStoreUtils.decimalToDouble(aggregateData.getLowValue()) < MetaStoreUtils - .decimalToDouble(newData.getLowValue())) { - aggregateData.setLowValue(aggregateData.getLowValue()); - } else { - aggregateData.setLowValue(newData.getLowValue()); - } - if (MetaStoreUtils.decimalToDouble(aggregateData.getHighValue()) > MetaStoreUtils - 
.decimalToDouble(newData.getHighValue())) { - aggregateData.setHighValue(aggregateData.getHighValue()); - } else { - aggregateData.setHighValue(newData.getHighValue()); - } - aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls()); - } - ndvEstimator.mergeEstimators(newData.getNdvEstimator()); - } - if (length > 0) { - // we have to set ndv - adjustedIndexMap.put(pseudoPartName.toString(), pseudoIndexSum / length); - aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues()); - ColumnStatisticsData csd = new ColumnStatisticsData(); - csd.setDecimalStats(aggregateData); - adjustedStatsMap.put(pseudoPartName.toString(), csd); - if (useDensityFunctionForNDVEstimation) { - densityAvgSum += (MetaStoreUtils.decimalToDouble(aggregateData.getHighValue()) - MetaStoreUtils - .decimalToDouble(aggregateData.getLowValue())) / aggregateData.getNumDVs(); - } - } - } - extrapolate(columnStatisticsData, partNames.size(), css.size(), adjustedIndexMap, - adjustedStatsMap, densityAvgSum / adjustedStatsMap.size()); - } - LOG.debug("Ndv estimatation for {} is {} # of partitions requested: {} # of partitions found: {}", colName, - columnStatisticsData.getDecimalStats().getNumDVs(),partNames.size(), css.size()); - statsObj.setStatsData(columnStatisticsData); - return statsObj; - } - - @Override - public void extrapolate(ColumnStatisticsData extrapolateData, int numParts, - int numPartsWithStats, Map<String, Double> adjustedIndexMap, - Map<String, ColumnStatisticsData> adjustedStatsMap, double densityAvg) { - int rightBorderInd = numParts; - DecimalColumnStatsDataInspector extrapolateDecimalData = new DecimalColumnStatsDataInspector(); - Map<String, DecimalColumnStatsData> extractedAdjustedStatsMap = new HashMap<>(); - for (Map.Entry<String, ColumnStatisticsData> entry : adjustedStatsMap.entrySet()) { - extractedAdjustedStatsMap.put(entry.getKey(), entry.getValue().getDecimalStats()); - } - List<Map.Entry<String, DecimalColumnStatsData>> list = new 
LinkedList<Map.Entry<String, DecimalColumnStatsData>>( - extractedAdjustedStatsMap.entrySet()); - // get the lowValue - Collections.sort(list, new Comparator<Map.Entry<String, DecimalColumnStatsData>>() { - @Override - public int compare(Map.Entry<String, DecimalColumnStatsData> o1, - Map.Entry<String, DecimalColumnStatsData> o2) { - return o1.getValue().getLowValue().compareTo(o2.getValue().getLowValue()); - } - }); - double minInd = adjustedIndexMap.get(list.get(0).getKey()); - double maxInd = adjustedIndexMap.get(list.get(list.size() - 1).getKey()); - double lowValue = 0; - double min = MetaStoreUtils.decimalToDouble(list.get(0).getValue().getLowValue()); - double max = MetaStoreUtils.decimalToDouble(list.get(list.size() - 1).getValue().getLowValue()); - if (minInd == maxInd) { - lowValue = min; - } else if (minInd < maxInd) { - // left border is the min - lowValue = (max - (max - min) * maxInd / (maxInd - minInd)); - } else { - // right border is the min - lowValue = (max - (max - min) * (rightBorderInd - maxInd) / (minInd - maxInd)); - } - - // get the highValue - Collections.sort(list, new Comparator<Map.Entry<String, DecimalColumnStatsData>>() { - @Override - public int compare(Map.Entry<String, DecimalColumnStatsData> o1, - Map.Entry<String, DecimalColumnStatsData> o2) { - return o1.getValue().getHighValue().compareTo(o2.getValue().getHighValue()); - } - }); - minInd = adjustedIndexMap.get(list.get(0).getKey()); - maxInd = adjustedIndexMap.get(list.get(list.size() - 1).getKey()); - double highValue = 0; - min = MetaStoreUtils.decimalToDouble(list.get(0).getValue().getHighValue()); - max = MetaStoreUtils.decimalToDouble(list.get(list.size() - 1).getValue().getHighValue()); - if (minInd == maxInd) { - highValue = min; - } else if (minInd < maxInd) { - // right border is the max - highValue = (min + (max - min) * (rightBorderInd - minInd) / (maxInd - minInd)); - } else { - // left border is the max - highValue = (min + (max - min) * minInd / (minInd - 
maxInd)); - } - - // get the #nulls - long numNulls = 0; - for (Map.Entry<String, DecimalColumnStatsData> entry : extractedAdjustedStatsMap.entrySet()) { - numNulls += entry.getValue().getNumNulls(); - } - // we scale up sumNulls based on the number of partitions - numNulls = numNulls * numParts / numPartsWithStats; - - // get the ndv - long ndv = 0; - long ndvMin = 0; - long ndvMax = 0; - Collections.sort(list, new Comparator<Map.Entry<String, DecimalColumnStatsData>>() { - @Override - public int compare(Map.Entry<String, DecimalColumnStatsData> o1, - Map.Entry<String, DecimalColumnStatsData> o2) { - return Long.compare(o1.getValue().getNumDVs(), o2.getValue().getNumDVs()); - } - }); - long lowerBound = list.get(list.size() - 1).getValue().getNumDVs(); - long higherBound = 0; - for (Map.Entry<String, DecimalColumnStatsData> entry : list) { - higherBound += entry.getValue().getNumDVs(); - } - if (useDensityFunctionForNDVEstimation && densityAvg != 0.0) { - ndv = (long) ((highValue - lowValue) / densityAvg); - if (ndv < lowerBound) { - ndv = lowerBound; - } else if (ndv > higherBound) { - ndv = higherBound; - } - } else { - minInd = adjustedIndexMap.get(list.get(0).getKey()); - maxInd = adjustedIndexMap.get(list.get(list.size() - 1).getKey()); - ndvMin = list.get(0).getValue().getNumDVs(); - ndvMax = list.get(list.size() - 1).getValue().getNumDVs(); - if (minInd == maxInd) { - ndv = ndvMin; - } else if (minInd < maxInd) { - // right border is the max - ndv = (long) (ndvMin + (ndvMax - ndvMin) * (rightBorderInd - minInd) / (maxInd - minInd)); - } else { - // left border is the max - ndv = (long) (ndvMin + (ndvMax - ndvMin) * minInd / (minInd - maxInd)); - } - } - extrapolateDecimalData.setLowValue(StatObjectConverter.createThriftDecimal(String - .valueOf(lowValue))); - extrapolateDecimalData.setHighValue(StatObjectConverter.createThriftDecimal(String - .valueOf(highValue))); - extrapolateDecimalData.setNumNulls(numNulls); - extrapolateDecimalData.setNumDVs(ndv); - 
extrapolateData.setDecimalStats(extrapolateDecimalData); - } -}
