http://git-wip-us.apache.org/repos/asf/hive/blob/133d3c47/metastore/src/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java
----------------------------------------------------------------------
diff --git 
a/metastore/src/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java 
b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java
deleted file mode 100644
index e713de0..0000000
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java
+++ /dev/null
@@ -1,559 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.metastore.cache;
-
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.TreeMap;
-
-import org.apache.hadoop.hive.metastore.MetaStoreUtils;
-import org.apache.hadoop.hive.metastore.StatObjectConverter;
-import org.apache.hadoop.hive.metastore.Warehouse;
-import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
-import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.api.TableMeta;
-import org.apache.hadoop.hive.metastore.cache.CachedStore.PartitionWrapper;
-import 
org.apache.hadoop.hive.metastore.cache.CachedStore.StorageDescriptorWrapper;
-import org.apache.hadoop.hive.metastore.cache.CachedStore.TableWrapper;
-import org.apache.hive.common.util.HiveStringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.annotations.VisibleForTesting;
-
-public class SharedCache {
-  /** Cached databases, keyed by the name handed to {@code addDatabaseToCache}. */
-  private Map<String, Database> databaseCache = new TreeMap<>();
-  /** Cached table wrappers, keyed by {@code CacheUtils.buildKey(dbName, tblName)}. */
-  private Map<String, TableWrapper> tableCache = new TreeMap<>();
-  /** Cached partition wrappers, keyed by {@code CacheUtils.buildKey(dbName, tblName, partVals)}. */
-  private Map<String, PartitionWrapper> partitionCache = new TreeMap<>();
-  /** Partition-level column statistics, keyed by db, table, partition values and column. */
-  private Map<String, ColumnStatisticsObj> partitionColStatsCache = new TreeMap<>();
-  /** Table-level column statistics, keyed by db, table and column. */
-  private Map<String, ColumnStatisticsObj> tableColStatsCache = new TreeMap<>();
-  /** Reference-counted storage descriptors, keyed by the wrapped descriptor hash. */
-  private Map<ByteArrayWrapper, StorageDescriptorWrapper> sdCache = new HashMap<>();
-  /** Digest passed to {@code MetaStoreUtils.hashStorageDescriptor}; one instance per JVM. */
-  private static MessageDigest md;
-
-  private static final Logger LOG = LoggerFactory.getLogger(SharedCache.class);
-
-  static {
-    MessageDigest digest;
-    try {
-      digest = MessageDigest.getInstance("MD5");
-    } catch (NoSuchAlgorithmException e) {
-      throw new RuntimeException("should not happen", e);
-    }
-    md = digest;
-  }
-
-  public synchronized Database getDatabaseFromCache(String name) {
-    return 
databaseCache.get(name)!=null?databaseCache.get(name).deepCopy():null;
-  }
-
-  /**
-   * Stores a private copy of {@code db} in the database cache.
-   *
-   * @param dbName key for the cache entry; its normalized form becomes the copy's name
-   * @param db database to copy into the cache
-   */
-  public synchronized void addDatabaseToCache(String dbName, Database db) {
-    Database snapshot = db.deepCopy();
-    String normalizedName = HiveStringUtils.normalizeIdentifier(dbName);
-    snapshot.setName(normalizedName);
-    // NOTE(review): the entry is keyed by the name as supplied, not by the normalized
-    // name stored on the copy -- confirm that callers always pass a normalized name.
-    databaseCache.put(dbName, snapshot);
-  }
-
-  /**
-   * Removes the database cached under the given key, if there is one.
-   *
-   * @param dbName key of the entry to drop from the database cache
-   */
-  public synchronized void removeDatabaseFromCache(String dbName) {
-    databaseCache.remove(dbName);
-  }
-
-  /**
-   * Lists the keys of the database cache.
-   *
-   * @return a new list holding every key currently in the database cache
-   */
-  public synchronized List<String> listCachedDatabases() {
-    return new ArrayList<String>(databaseCache.keySet());
-  }
-
-  /**
-   * Replaces a cached database with a new definition, re-keying it under the new name.
-   *
-   * @param dbName name of the database being replaced; normalized before removal
-   * @param newDb new definition; cached under its own normalized name
-   */
-  public synchronized void alterDatabaseInCache(String dbName, Database newDb) {
-    removeDatabaseFromCache(HiveStringUtils.normalizeIdentifier(dbName));
-    // addDatabaseToCache stores its own deep copy, so no extra copy is made here.
-    addDatabaseToCache(HiveStringUtils.normalizeIdentifier(newDb.getName()), newDb);
-  }
-
-  /**
-   * @return the number of entries in the database cache
-   */
-  public synchronized int getCachedDatabaseCount() {
-    return databaseCache.size();
-  }
-
-  /**
-   * Looks up a table in the cache and reassembles it from its wrapper.
-   *
-   * @param dbName database name used to build the cache key
-   * @param tableName table name used to build the cache key
-   * @return the assembled table, or {@code null} when no wrapper is cached for the key
-   */
-  public synchronized Table getTableFromCache(String dbName, String tableName) {
-    String cacheKey = CacheUtils.buildKey(dbName, tableName);
-    TableWrapper cached = tableCache.get(cacheKey);
-    return cached == null ? null : CacheUtils.assemble(cached, this);
-  }
-
-  /**
-   * Caches a copy of {@code tbl} with normalized database, table and partition-key names.
-   * When the table has a storage descriptor, the descriptor is reference-counted in the
-   * descriptor cache and the wrapper keeps only its hash, location and parameters.
-   *
-   * @param dbName database name; used for the cache key and, normalized, on the copy
-   * @param tblName table name; used for the cache key and, normalized, on the copy
-   * @param tbl table to cache
-   */
-  public synchronized void addTableToCache(String dbName, String tblName, Table tbl) {
-    Table stored = tbl.deepCopy();
-    stored.setDbName(HiveStringUtils.normalizeIdentifier(dbName));
-    stored.setTableName(HiveStringUtils.normalizeIdentifier(tblName));
-    List<FieldSchema> partitionKeys = stored.getPartitionKeys();
-    if (partitionKeys != null) {
-      for (FieldSchema partitionKey : partitionKeys) {
-        partitionKey.setName(HiveStringUtils.normalizeIdentifier(partitionKey.getName()));
-      }
-    }
-    StorageDescriptor sd = tbl.getSd();
-    TableWrapper wrapper;
-    if (sd == null) {
-      wrapper = new TableWrapper(stored, null, null, null);
-    } else {
-      byte[] sdHash = MetaStoreUtils.hashStorageDescriptor(sd, md);
-      increSd(sd, sdHash);
-      // The descriptor lives in sdCache; the cached table itself carries none.
-      stored.setSd(null);
-      wrapper = new TableWrapper(stored, sdHash, sd.getLocation(), sd.getParameters());
-    }
-    tableCache.put(CacheUtils.buildKey(dbName, tblName), wrapper);
-  }
-
-  /**
-   * Removes a table from the cache and releases its storage-descriptor reference.
-   * Does nothing when no table is cached under the key.
-   *
-   * @param dbName database name used to build the cache key
-   * @param tblName table name used to build the cache key
-   */
-  public synchronized void removeTableFromCache(String dbName, String tblName) {
-    TableWrapper tblWrapper = tableCache.remove(CacheUtils.buildKey(dbName, tblName));
-    if (tblWrapper == null) {
-      // Nothing was cached under this key, so there is no descriptor reference to release.
-      return;
-    }
-    byte[] sdHash = tblWrapper.getSdHash();
-    if (sdHash != null) {
-      decrSd(sdHash);
-    }
-  }
-
-  public synchronized ColumnStatisticsObj getCachedTableColStats(String 
colStatsCacheKey) {
-    return 
tableColStatsCache.get(colStatsCacheKey)!=null?tableColStatsCache.get(colStatsCacheKey).deepCopy():null;
-  }
-
-  /**
-   * Removes every table-level column statistics entry whose key starts, ignoring case,
-   * with the delimited key prefix of the given table.
-   *
-   * @param dbName database name used to build the key prefix
-   * @param tblName table name used to build the key prefix
-   */
-  public synchronized void removeTableColStatsFromCache(String dbName, String tblName) {
-    String lowerCasePrefix = CacheUtils.buildKeyWithDelimit(dbName, tblName).toLowerCase();
-    Iterator<String> keys = tableColStatsCache.keySet().iterator();
-    while (keys.hasNext()) {
-      if (keys.next().toLowerCase().startsWith(lowerCasePrefix)) {
-        keys.remove();
-      }
-    }
-  }
-
-  public synchronized void removeTableColStatsFromCache(String dbName, String 
tblName,
-      String colName) {
-    tableColStatsCache.remove(CacheUtils.buildKey(dbName, tblName, colName));
-  }
-
-  /**
-   * Merges the given statistics into the table-level statistics cache: an existing entry
-   * has its fields overwritten in place, a missing entry is added.
-   *
-   * @param dbName database name used to build the cache keys
-   * @param tableName table name used to build the cache keys
-   * @param colStatsForTable statistics to merge, one object per column
-   */
-  public synchronized void updateTableColStatsInCache(String dbName, String tableName,
-      List<ColumnStatisticsObj> colStatsForTable) {
-    for (ColumnStatisticsObj incoming : colStatsForTable) {
-      String cacheKey = CacheUtils.buildKey(dbName, tableName, incoming.getColName());
-      ColumnStatisticsObj existing = tableColStatsCache.get(cacheKey);
-      if (existing == null) {
-        // First statistics seen for this column: cache the object itself.
-        tableColStatsCache.put(cacheKey, incoming);
-        continue;
-      }
-      LOG.debug("CachedStore: updating table column stats for column: " + incoming.getColName()
-          + ", of table: " + tableName + " and database: " + dbName);
-      // Copy the new values into the object that is already cached.
-      StatObjectConverter.setFieldsIntoOldStats(existing, incoming);
-    }
-  }
-
-  public synchronized void alterTableInCache(String dbName, String tblName, 
Table newTable) {
-    removeTableFromCache(dbName, tblName);
-    addTableToCache(HiveStringUtils.normalizeIdentifier(newTable.getDbName()),
-        HiveStringUtils.normalizeIdentifier(newTable.getTableName()), 
newTable);
-  }
-
-  public synchronized void alterTableInPartitionCache(String dbName, String 
tblName,
-      Table newTable) {
-    if (!dbName.equals(newTable.getDbName()) || 
!tblName.equals(newTable.getTableName())) {
-      List<Partition> partitions = listCachedPartitions(dbName, tblName, -1);
-      for (Partition part : partitions) {
-        removePartitionFromCache(part.getDbName(), part.getTableName(), 
part.getValues());
-        
part.setDbName(HiveStringUtils.normalizeIdentifier(newTable.getDbName()));
-        
part.setTableName(HiveStringUtils.normalizeIdentifier(newTable.getTableName()));
-        
addPartitionToCache(HiveStringUtils.normalizeIdentifier(newTable.getDbName()),
-            HiveStringUtils.normalizeIdentifier(newTable.getTableName()), 
part);
-      }
-    }
-  }
-
-  /**
-   * Re-keys the table-level column statistics of a renamed table so that they are found
-   * under the new database and table names. Nothing happens when neither name changed.
-   *
-   * @param dbName database name the statistics are currently keyed by
-   * @param tblName table name the statistics are currently keyed by
-   * @param newTable table definition carrying the new database and table names
-   */
-  public synchronized void alterTableInTableColStatsCache(String dbName, String tblName,
-      Table newTable) {
-    if (dbName.equals(newTable.getDbName()) && tblName.equals(newTable.getTableName())) {
-      return;
-    }
-    String newDbName = HiveStringUtils.normalizeIdentifier(newTable.getDbName());
-    String newTblName = HiveStringUtils.normalizeIdentifier(newTable.getTableName());
-    String oldKeyPrefix = CacheUtils.buildKeyWithDelimit(dbName, tblName).toLowerCase();
-    Map<String, ColumnStatisticsObj> rekeyedStats = new HashMap<String, ColumnStatisticsObj>();
-    Iterator<Entry<String, ColumnStatisticsObj>> iterator =
-        tableColStatsCache.entrySet().iterator();
-    while (iterator.hasNext()) {
-      Entry<String, ColumnStatisticsObj> entry = iterator.next();
-      String oldKey = entry.getKey();
-      if (oldKey.toLowerCase().startsWith(oldKeyPrefix)) {
-        // Index 2 of the decomposed key is the column name; the database and table parts
-        // must come from the new table, otherwise the entry is re-added under its old key.
-        String[] decomposedKey = CacheUtils.splitTableColStats(oldKey);
-        String newKey = CacheUtils.buildKey(newDbName, newTblName, decomposedKey[2]);
-        rekeyedStats.put(newKey, entry.getValue());
-        iterator.remove();
-      }
-    }
-    // Added after the scan so the map is never modified while it is being iterated.
-    tableColStatsCache.putAll(rekeyedStats);
-  }
-
-  /**
-   * Re-keys the partition-level column statistics of a renamed table so that they are
-   * found under the new database and table names. Nothing happens when neither name
-   * changed.
-   *
-   * <p>Entries are selected with the table-level key prefix, the same way
-   * {@code removePartitionColStatsFromCache(dbName, tblName)} selects them, so the result
-   * does not depend on the partitions still being cached under the old table name.
-   *
-   * @param dbName database name the statistics are currently keyed by
-   * @param tblName table name the statistics are currently keyed by
-   * @param newTable table definition carrying the new database and table names
-   */
-  public synchronized void alterTableInPartitionColStatsCache(String dbName, String tblName,
-      Table newTable) {
-    if (dbName.equals(newTable.getDbName()) && tblName.equals(newTable.getTableName())) {
-      return;
-    }
-    String newDbName = HiveStringUtils.normalizeIdentifier(newTable.getDbName());
-    String newTblName = HiveStringUtils.normalizeIdentifier(newTable.getTableName());
-    String oldKeyPrefix = CacheUtils.buildKeyWithDelimit(dbName, tblName).toLowerCase();
-    Map<String, ColumnStatisticsObj> rekeyedStats = new HashMap<String, ColumnStatisticsObj>();
-    Iterator<Entry<String, ColumnStatisticsObj>> iterator =
-        partitionColStatsCache.entrySet().iterator();
-    while (iterator.hasNext()) {
-      Entry<String, ColumnStatisticsObj> entry = iterator.next();
-      String oldKey = entry.getKey();
-      if (oldKey.toLowerCase().startsWith(oldKeyPrefix)) {
-        Object[] decomposedKey = CacheUtils.splitPartitionColStats(oldKey);
-        // Index 2 holds the partition values and index 3 the column name; the database
-        // and table parts must come from the new table for the rename to take effect.
-        @SuppressWarnings("unchecked")
-        List<String> partVals = (List<String>) decomposedKey[2];
-        String newKey =
-            CacheUtils.buildKey(newDbName, newTblName, partVals, (String) decomposedKey[3]);
-        rekeyedStats.put(newKey, entry.getValue());
-        iterator.remove();
-      }
-    }
-    // Added after the scan so the map is never modified while it is being iterated.
-    partitionColStatsCache.putAll(rekeyedStats);
-  }
-
-  /**
-   * @return the number of entries in the table cache
-   */
-  public synchronized int getCachedTableCount() {
-    return tableCache.size();
-  }
-
-  /**
-   * Lists the cached tables of one database.
-   *
-   * @param dbName database name compared, case-sensitively, with each cached table's
-   *     database name
-   * @return the assembled tables whose database name equals {@code dbName}
-   */
-  public synchronized List<Table> listCachedTables(String dbName) {
-    List<Table> matching = new ArrayList<Table>();
-    for (TableWrapper candidate : tableCache.values()) {
-      if (!candidate.getTable().getDbName().equals(dbName)) {
-        continue;
-      }
-      matching.add(CacheUtils.assemble(candidate, this));
-    }
-    return matching;
-  }
-
-  public synchronized List<TableMeta> getTableMeta(String dbNames, String 
tableNames, List<String> tableTypes) {
-    List<TableMeta> tableMetas = new ArrayList<TableMeta>();
-    for (String dbName : listCachedDatabases()) {
-      if (CacheUtils.matches(dbName, dbNames)) {
-        for (Table table : listCachedTables(dbName)) {
-          if (CacheUtils.matches(table.getTableName(), tableNames)) {
-            if (tableTypes==null || tableTypes.contains(table.getTableType())) 
{
-              TableMeta metaData = new TableMeta(
-                  dbName, table.getTableName(), table.getTableType());
-                metaData.setComments(table.getParameters().get("comment"));
-                tableMetas.add(metaData);
-            }
-          }
-        }
-      }
-    }
-    return tableMetas;
-  }
-
-  /**
-   * Caches a copy of {@code part}. When the partition has a storage descriptor, the
-   * descriptor is reference-counted in the descriptor cache and the wrapper keeps only its
-   * hash, location and parameters.
-   *
-   * @param dbName database name used to build the cache key
-   * @param tblName table name used to build the cache key
-   * @param part partition to cache; its values complete the cache key
-   */
-  public synchronized void addPartitionToCache(String dbName, String tblName, Partition part) {
-    Partition stored = part.deepCopy();
-    StorageDescriptor sd = part.getSd();
-    PartitionWrapper wrapper;
-    if (sd == null) {
-      wrapper = new PartitionWrapper(stored, null, null, null);
-    } else {
-      byte[] sdHash = MetaStoreUtils.hashStorageDescriptor(sd, md);
-      increSd(sd, sdHash);
-      // The descriptor lives in sdCache; the cached partition itself carries none.
-      stored.setSd(null);
-      wrapper = new PartitionWrapper(stored, sdHash, sd.getLocation(), sd.getParameters());
-    }
-    String cacheKey = CacheUtils.buildKey(dbName, tblName, part.getValues());
-    partitionCache.put(cacheKey, wrapper);
-  }
-
-  /**
-   * Looks up a partition by its full cache key and reassembles it from its wrapper.
-   *
-   * @param key partition cache key
-   * @return the assembled partition, or {@code null} when no wrapper is cached for the key
-   */
-  public synchronized Partition getPartitionFromCache(String key) {
-    PartitionWrapper cached = partitionCache.get(key);
-    return cached == null ? null : CacheUtils.assemble(cached, this);
-  }
-
-  public synchronized Partition getPartitionFromCache(String dbName, String 
tblName, List<String> part_vals) {
-    return getPartitionFromCache(CacheUtils.buildKey(dbName, tblName, 
part_vals));
-  }
-
-  public synchronized boolean existPartitionFromCache(String dbName, String 
tblName, List<String> part_vals) {
-    return partitionCache.containsKey(CacheUtils.buildKey(dbName, tblName, 
part_vals));
-  }
-
-  /**
-   * Removes a partition from the cache and releases its storage-descriptor reference.
-   *
-   * @param dbName database name used to build the cache key
-   * @param tblName table name used to build the cache key
-   * @param part_vals partition values used to build the cache key
-   * @return the partition object held by the removed wrapper, or {@code null} when no
-   *     partition was cached under the key
-   */
-  public synchronized Partition removePartitionFromCache(String dbName, String tblName,
-      List<String> part_vals) {
-    PartitionWrapper wrapper =
-        partitionCache.remove(CacheUtils.buildKey(dbName, tblName, part_vals));
-    if (wrapper == null) {
-      // Nothing was cached under this key, so there is no descriptor reference to release.
-      return null;
-    }
-    byte[] sdHash = wrapper.getSdHash();
-    if (sdHash != null) {
-      decrSd(sdHash);
-    }
-    return wrapper.getPartition();
-  }
-
-  /**
-   * Removes the cached column statistics of all partitions of a table: every entry whose
-   * key starts, ignoring case, with the table's delimited key prefix.
-   *
-   * @param dbName database name used to build the key prefix
-   * @param tblName table name used to build the key prefix
-   */
-  public synchronized void removePartitionColStatsFromCache(String dbName, String tblName) {
-    String lowerCasePrefix = CacheUtils.buildKeyWithDelimit(dbName, tblName).toLowerCase();
-    Iterator<String> keys = partitionColStatsCache.keySet().iterator();
-    while (keys.hasNext()) {
-      if (keys.next().toLowerCase().startsWith(lowerCasePrefix)) {
-        keys.remove();
-      }
-    }
-  }
-
-  /**
-   * Removes the cached column statistics of one partition of a table: every entry whose
-   * key starts, ignoring case, with the partition's delimited key prefix.
-   *
-   * @param dbName database name used to build the key prefix
-   * @param tblName table name used to build the key prefix
-   * @param partVals partition values used to build the key prefix
-   */
-  public synchronized void removePartitionColStatsFromCache(String dbName, String tblName,
-      List<String> partVals) {
-    String lowerCasePrefix =
-        CacheUtils.buildKeyWithDelimit(dbName, tblName, partVals).toLowerCase();
-    Iterator<String> keys = partitionColStatsCache.keySet().iterator();
-    while (keys.hasNext()) {
-      if (keys.next().toLowerCase().startsWith(lowerCasePrefix)) {
-        keys.remove();
-      }
-    }
-  }
-
-  /**
-   * Removes the cached column statistics of one column of one partition of a table.
-   *
-   * @param dbName database name used to build the cache key
-   * @param tblName table name used to build the cache key
-   * @param partVals partition values used to build the cache key
-   * @param colName column name used to build the cache key
-   */
-  public synchronized void removePartitionColStatsFromCache(String dbName, String tblName,
-      List<String> partVals, String colName) {
-    String cacheKey = CacheUtils.buildKey(dbName, tblName, partVals, colName);
-    partitionColStatsCache.remove(cacheKey);
-  }
-
-  public synchronized List<Partition> listCachedPartitions(String dbName, 
String tblName, int max) {
-    List<Partition> partitions = new ArrayList<Partition>();
-    int count = 0;
-    for (PartitionWrapper wrapper : partitionCache.values()) {
-      if (wrapper.getPartition().getDbName().equals(dbName)
-          && wrapper.getPartition().getTableName().equals(tblName)
-          && (max == -1 || count < max)) {
-        partitions.add(CacheUtils.assemble(wrapper, this));
-        count++;
-      }
-    }
-    return partitions;
-  }
-
-  /**
-   * Replaces a cached partition: the old entry is removed and {@code newPart} is cached
-   * under its own normalized database and table names.
-   *
-   * @param dbName database name of the entry to replace
-   * @param tblName table name of the entry to replace
-   * @param partVals partition values of the entry to replace
-   * @param newPart new partition definition
-   */
-  public synchronized void alterPartitionInCache(String dbName, String tblName,
-      List<String> partVals, Partition newPart) {
-    removePartitionFromCache(dbName, tblName, partVals);
-    String newDbName = HiveStringUtils.normalizeIdentifier(newPart.getDbName());
-    String newTblName = HiveStringUtils.normalizeIdentifier(newPart.getTableName());
-    addPartitionToCache(newDbName, newTblName, newPart);
-  }
-
-  /**
-   * Re-keys the cached column statistics of an altered partition so that they are found
-   * under the new partition's normalized database name, table name and values.
-   *
-   * @param dbName database name the statistics are currently keyed by
-   * @param tblName table name the statistics are currently keyed by
-   * @param partVals partition values the statistics are currently keyed by
-   * @param newPart partition definition supplying the new key components
-   */
-  public synchronized void alterPartitionInColStatsCache(String dbName, String tblName,
-      List<String> partVals, Partition newPart) {
-    String lowerCaseOldPrefix =
-        CacheUtils.buildKeyWithDelimit(dbName, tblName, partVals).toLowerCase();
-    Map<String, ColumnStatisticsObj> rekeyed = new HashMap<String, ColumnStatisticsObj>();
-    for (Iterator<Entry<String, ColumnStatisticsObj>> it =
-        partitionColStatsCache.entrySet().iterator(); it.hasNext();) {
-      Entry<String, ColumnStatisticsObj> current = it.next();
-      String oldKey = current.getKey();
-      ColumnStatisticsObj stats = current.getValue();
-      if (!oldKey.toLowerCase().startsWith(lowerCaseOldPrefix)) {
-        continue;
-      }
-      // Only the column name (index 3) is carried over from the old key.
-      Object[] decomposedKey = CacheUtils.splitPartitionColStats(oldKey);
-      String newDbName = HiveStringUtils.normalizeIdentifier(newPart.getDbName());
-      String newTblName = HiveStringUtils.normalizeIdentifier(newPart.getTableName());
-      String newKey = CacheUtils.buildKey(newDbName, newTblName, newPart.getValues(),
-          (String) decomposedKey[3]);
-      rekeyed.put(newKey, stats);
-      it.remove();
-    }
-    partitionColStatsCache.putAll(rekeyed);
-  }
-
-  /**
-   * Merges the given statistics into the partition-level statistics cache: an existing
-   * entry has its fields overwritten in place, a missing entry is added.
-   *
-   * @param dbName database name used to build the cache keys
-   * @param tableName table name used to build the cache keys
-   * @param partVals partition values used to build the cache keys
-   * @param colStatsObjs statistics to merge, one object per column
-   */
-  public synchronized void updatePartitionColStatsInCache(String dbName, String tableName,
-      List<String> partVals, List<ColumnStatisticsObj> colStatsObjs) {
-    for (ColumnStatisticsObj incoming : colStatsObjs) {
-      String cacheKey = CacheUtils.buildKey(dbName, tableName, partVals, incoming.getColName());
-      ColumnStatisticsObj existing = partitionColStatsCache.get(cacheKey);
-      if (existing == null) {
-        // First statistics seen for this column: cache the object itself.
-        partitionColStatsCache.put(cacheKey, incoming);
-        continue;
-      }
-      LOG.debug("CachedStore: updating partition column stats for column: "
-          + incoming.getColName() + ", of table: " + tableName + " and database: " + dbName);
-      // Copy the new values into the object that is already cached.
-      StatObjectConverter.setFieldsIntoOldStats(existing, incoming);
-    }
-  }
-
-  /**
-   * @return the number of entries in the partition cache
-   */
-  public synchronized int getCachedPartitionCount() {
-    return partitionCache.size();
-  }
-
-  public synchronized ColumnStatisticsObj getCachedPartitionColStats(String 
key) {
-    return 
partitionColStatsCache.get(key)!=null?partitionColStatsCache.get(key).deepCopy():null;
-  }
-
-  /**
-   * Adds column statistics for several partitions of a table. A partition whose name
-   * cannot be turned into partition values is logged and skipped.
-   *
-   * @param dbName database name used to build the cache keys
-   * @param tableName table name used to build the cache keys
-   * @param colStatsPerPartition statistics objects grouped by partition name
-   */
-  public synchronized void addPartitionColStatsToCache(String dbName, String tableName,
-      Map<String, List<ColumnStatisticsObj>> colStatsPerPartition) {
-    for (Map.Entry<String, List<ColumnStatisticsObj>> perPartition
-        : colStatsPerPartition.entrySet()) {
-      String partName = perPartition.getKey();
-      try {
-        List<String> partVals = Warehouse.getPartValuesFromPartName(partName);
-        for (ColumnStatisticsObj stats : perPartition.getValue()) {
-          partitionColStatsCache.put(
-              CacheUtils.buildKey(dbName, tableName, partVals, stats.getColName()), stats);
-        }
-      } catch (MetaException e) {
-        LOG.info("Unable to add partition: " + partName + " to SharedCache", e);
-      }
-    }
-  }
-
-  /**
-   * Replaces all cached partition-level column statistics of a table with the given ones.
-   *
-   * @param dbName database name of the table
-   * @param tableName table name
-   * @param newColStatsPerPartition replacement statistics grouped by partition name
-   */
-  public synchronized void refreshPartitionColStats(String dbName, String tableName,
-      Map<String, List<ColumnStatisticsObj>> newColStatsPerPartition) {
-    String logMessage = "CachedStore: updating cached partition column stats objects for "
-        + "database: " + dbName + " and table: " + tableName;
-    LOG.debug(logMessage);
-    // Drop every old entry of the table first, then load the replacements.
-    removePartitionColStatsFromCache(dbName, tableName);
-    addPartitionColStatsToCache(dbName, tableName, newColStatsPerPartition);
-  }
-
-  /**
-   * Adds table-level column statistics, replacing any entry cached under the same key.
-   *
-   * @param dbName database name used to build the cache keys
-   * @param tableName table name used to build the cache keys
-   * @param colStatsForTable statistics to cache, one object per column
-   */
-  public synchronized void addTableColStatsToCache(String dbName, String tableName,
-      List<ColumnStatisticsObj> colStatsForTable) {
-    for (ColumnStatisticsObj stats : colStatsForTable) {
-      tableColStatsCache.put(
-          CacheUtils.buildKey(dbName, tableName, stats.getColName()), stats);
-    }
-  }
-
-  /**
-   * Replaces all cached table-level column statistics of a table with the given ones.
-   *
-   * @param dbName database name of the table
-   * @param tableName table name
-   * @param colStatsForTable replacement statistics, one object per column
-   */
-  public synchronized void refreshTableColStats(String dbName, String tableName,
-      List<ColumnStatisticsObj> colStatsForTable) {
-    String logMessage = "CachedStore: updating cached table column stats objects for "
-        + "database: " + dbName + " and table: " + tableName;
-    LOG.debug(logMessage);
-    // Drop every old entry of the table first, then load the replacements.
-    removeTableColStatsFromCache(dbName, tableName);
-    addTableColStatsToCache(dbName, tableName, colStatsForTable);
-  }
-
-  /**
-   * Adds one reference to a storage descriptor. The first reference caches a copy of the
-   * descriptor with its location and parameters cleared.
-   *
-   * <p>Synchronized like every other method that touches the caches, because the
-   * descriptor cache is a plain {@code HashMap}.
-   *
-   * @param sd storage descriptor being referenced
-   * @param sdHash hash identifying {@code sd} in the descriptor cache
-   */
-  public synchronized void increSd(StorageDescriptor sd, byte[] sdHash) {
-    ByteArrayWrapper byteArray = new ByteArrayWrapper(sdHash);
-    StorageDescriptorWrapper sdWrapper = sdCache.get(byteArray);
-    if (sdWrapper != null) {
-      sdWrapper.refCount++;
-      return;
-    }
-    StorageDescriptor sdToCache = sd.deepCopy();
-    // Location and parameters are kept on the table/partition wrapper instead.
-    sdToCache.setLocation(null);
-    sdToCache.setParameters(null);
-    sdCache.put(byteArray, new StorageDescriptorWrapper(sdToCache, 1));
-  }
-
-  /**
-   * Drops one reference to a storage descriptor and evicts the descriptor once no
-   * reference is left. An unknown hash is logged and otherwise ignored.
-   *
-   * <p>Synchronized like every other method that touches the caches, because the
-   * descriptor cache is a plain {@code HashMap}.
-   *
-   * @param sdHash hash identifying the descriptor in the descriptor cache
-   */
-  public synchronized void decrSd(byte[] sdHash) {
-    ByteArrayWrapper byteArray = new ByteArrayWrapper(sdHash);
-    StorageDescriptorWrapper sdWrapper = sdCache.get(byteArray);
-    if (sdWrapper == null) {
-      LOG.warn("CachedStore: no cached storage descriptor found for the given hash");
-      return;
-    }
-    sdWrapper.refCount--;
-    // "<= 0" rather than "== 0" so an entry can never linger with a negative count.
-    if (sdWrapper.getRefCount() <= 0) {
-      sdCache.remove(byteArray);
-    }
-  }
-
-  /**
-   * Looks up a cached storage descriptor by its hash.
-   *
-   * <p>Synchronized like every other method that touches the caches, because the
-   * descriptor cache is a plain {@code HashMap}.
-   *
-   * @param sdHash hash identifying the descriptor in the descriptor cache
-   * @return the cached descriptor itself (not a copy), or {@code null} when the hash is
-   *     unknown
-   */
-  public synchronized StorageDescriptor getSdFromCache(byte[] sdHash) {
-    StorageDescriptorWrapper sdWrapper = sdCache.get(new ByteArrayWrapper(sdHash));
-    return sdWrapper == null ? null : sdWrapper.getSd();
-  }
-
-  /**
-   * Replaces the content of the database cache with the given databases.
-   *
-   * @param databases databases to cache, each under its own name
-   */
-  public synchronized void refreshDatabases(List<Database> databases) {
-    LOG.debug("CachedStore: updating cached database objects");
-    // Every existing entry is dropped before the new ones are added.
-    databaseCache.clear();
-    for (Database database : databases) {
-      addDatabaseToCache(database.getName(), database);
-    }
-  }
-
-  /**
-   * Replaces the cached tables of one database with the given tables.
-   *
-   * @param dbName database whose cached tables are replaced
-   * @param tables tables to cache under {@code dbName}
-   */
-  public synchronized void refreshTables(String dbName, List<Table> tables) {
-    LOG.debug("CachedStore: updating cached table objects for database: " + dbName);
-    List<Table> staleTables = listCachedTables(dbName);
-    for (Table stale : staleTables) {
-      removeTableFromCache(dbName, stale.getTableName());
-    }
-    for (Table fresh : tables) {
-      addTableToCache(dbName, fresh.getTableName(), fresh);
-    }
-  }
-
-  /**
-   * Replaces the cached partitions of a table with the given partitions.
-   *
-   * @param dbName database name compared, case-sensitively, with each cached partition
-   * @param tblName table name compared, case-sensitively, with each cached partition
-   * @param partitions partitions to cache for the table
-   */
-  public synchronized void refreshPartitions(String dbName, String tblName,
-      List<Partition> partitions) {
-    LOG.debug("CachedStore: updating cached partition objects for database: " + dbName
-        + " and table: " + tblName);
-    Iterator<Entry<String, PartitionWrapper>> iterator = partitionCache.entrySet().iterator();
-    while (iterator.hasNext()) {
-      PartitionWrapper partitionWrapper = iterator.next().getValue();
-      if (partitionWrapper.getPartition().getDbName().equals(dbName)
-          && partitionWrapper.getPartition().getTableName().equals(tblName)) {
-        // Release the descriptor reference taken by addPartitionToCache; otherwise the
-        // descriptor's reference count never returns to zero and it is never evicted.
-        byte[] sdHash = partitionWrapper.getSdHash();
-        if (sdHash != null) {
-          decrSd(sdHash);
-        }
-        iterator.remove();
-      }
-    }
-    for (Partition part : partitions) {
-      addPartitionToCache(dbName, tblName, part);
-    }
-  }
-
-  /**
-   * @return the live database cache map itself, not a copy
-   */
-  @VisibleForTesting
-  Map<String, Database> getDatabaseCache() {
-    return databaseCache;
-  }
-
-  /**
-   * @return the live table cache map itself, not a copy
-   */
-  @VisibleForTesting
-  Map<String, TableWrapper> getTableCache() {
-    return tableCache;
-  }
-
-  /**
-   * @return the live partition cache map itself, not a copy
-   */
-  @VisibleForTesting
-  Map<String, PartitionWrapper> getPartitionCache() {
-    return partitionCache;
-  }
-
-  /**
-   * @return the live storage-descriptor cache map itself, not a copy
-   */
-  @VisibleForTesting
-  Map<ByteArrayWrapper, StorageDescriptorWrapper> getSdCache() {
-    return sdCache;
-  }
-
-  /**
-   * @return the live partition column statistics cache map itself, not a copy
-   */
-  @VisibleForTesting
-  Map<String, ColumnStatisticsObj> getPartitionColStatsCache() {
-    return partitionColStatsCache;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/133d3c47/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/BinaryColumnStatsAggregator.java
----------------------------------------------------------------------
diff --git 
a/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/BinaryColumnStatsAggregator.java
 
b/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/BinaryColumnStatsAggregator.java
deleted file mode 100644
index e6c836b..0000000
--- 
a/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/BinaryColumnStatsAggregator.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hadoop.hive.metastore.columnstats.aggr;
-
-import java.util.List;
-
-import org.apache.hadoop.hive.metastore.api.BinaryColumnStatsData;
-import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
-import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
-import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-
-/**
- * Aggregates the binary column statistics of several partitions into one statistics object.
- */
-public class BinaryColumnStatsAggregator extends ColumnStatsAggregator {
-
-  /**
-   * Combines per-partition binary statistics: the maximum and average column lengths are
-   * each reduced with {@code Math.max}, and the null counts are summed.
-   *
-   * @param colName name given to the aggregated statistics object
-   * @param partNames partition names; not used by this aggregator
-   * @param css per-partition statistics, each holding exactly one statistics object
-   * @return the aggregated statistics object
-   * @throws MetaException if {@code css} is null or empty, or if an element does not hold
-   *     exactly one statistics object
-   */
-  @Override
-  public ColumnStatisticsObj aggregate(String colName, List<String> partNames,
-      List<ColumnStatistics> css) throws MetaException {
-    if (css == null || css.isEmpty()) {
-      // Without input there is no column type to build a result from.
-      throw new MetaException("No column statistics to aggregate for column " + colName);
-    }
-    ColumnStatisticsObj statsObj = null;
-    BinaryColumnStatsData aggregateData = null;
-    for (ColumnStatistics cs : css) {
-      if (cs.getStatsObjSize() != 1) {
-        throw new MetaException(
-            "The number of columns should be exactly one in aggrStats, but found "
-                + cs.getStatsObjSize());
-      }
-      ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
-      if (statsObj == null) {
-        // The first element decides the column type and stats kind of the result.
-        statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, cso.getColType(),
-            cso.getStatsData().getSetField());
-      }
-      BinaryColumnStatsData newData = cso.getStatsData().getBinaryStats();
-      if (aggregateData == null) {
-        aggregateData = newData.deepCopy();
-      } else {
-        aggregateData.setMaxColLen(Math.max(aggregateData.getMaxColLen(), newData.getMaxColLen()));
-        aggregateData.setAvgColLen(Math.max(aggregateData.getAvgColLen(), newData.getAvgColLen()));
-        aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
-      }
-    }
-    ColumnStatisticsData columnStatisticsData = new ColumnStatisticsData();
-    columnStatisticsData.setBinaryStats(aggregateData);
-    statsObj.setStatsData(columnStatisticsData);
-    return statsObj;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/133d3c47/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/BooleanColumnStatsAggregator.java
----------------------------------------------------------------------
diff --git 
a/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/BooleanColumnStatsAggregator.java
 
b/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/BooleanColumnStatsAggregator.java
deleted file mode 100644
index a34bc9f..0000000
--- 
a/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/BooleanColumnStatsAggregator.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hadoop.hive.metastore.columnstats.aggr;
-
-import java.util.List;
-
-import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData;
-import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
-import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
-import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-
-/**
- * Aggregates the boolean column statistics of several partitions into one statistics object.
- */
-public class BooleanColumnStatsAggregator extends ColumnStatsAggregator {
-
-  /**
-   * Combines per-partition boolean statistics by summing the true, false and null counts.
-   *
-   * @param colName name given to the aggregated statistics object
-   * @param partNames partition names; not used by this aggregator
-   * @param css per-partition statistics, each holding exactly one statistics object
-   * @return the aggregated statistics object
-   * @throws MetaException if {@code css} is null or empty, or if an element does not hold
-   *     exactly one statistics object
-   */
-  @Override
-  public ColumnStatisticsObj aggregate(String colName, List<String> partNames,
-      List<ColumnStatistics> css) throws MetaException {
-    if (css == null || css.isEmpty()) {
-      // Without input there is no column type to build a result from.
-      throw new MetaException("No column statistics to aggregate for column " + colName);
-    }
-    ColumnStatisticsObj statsObj = null;
-    BooleanColumnStatsData aggregateData = null;
-    for (ColumnStatistics cs : css) {
-      if (cs.getStatsObjSize() != 1) {
-        throw new MetaException(
-            "The number of columns should be exactly one in aggrStats, but found "
-                + cs.getStatsObjSize());
-      }
-      ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
-      if (statsObj == null) {
-        // The first element decides the column type and stats kind of the result.
-        statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, cso.getColType(),
-            cso.getStatsData().getSetField());
-      }
-      BooleanColumnStatsData newData = cso.getStatsData().getBooleanStats();
-      if (aggregateData == null) {
-        aggregateData = newData.deepCopy();
-      } else {
-        aggregateData.setNumTrues(aggregateData.getNumTrues() + newData.getNumTrues());
-        aggregateData.setNumFalses(aggregateData.getNumFalses() + newData.getNumFalses());
-        aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
-      }
-    }
-    ColumnStatisticsData columnStatisticsData = new ColumnStatisticsData();
-    columnStatisticsData.setBooleanStats(aggregateData);
-    statsObj.setStatsData(columnStatisticsData);
-    return statsObj;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/133d3c47/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/ColumnStatsAggregator.java
----------------------------------------------------------------------
diff --git 
a/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/ColumnStatsAggregator.java
 
b/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/ColumnStatsAggregator.java
deleted file mode 100644
index a52e5e5..0000000
--- 
a/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/ColumnStatsAggregator.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hadoop.hive.metastore.columnstats.aggr;
-
-import java.util.List;
-
-import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
-import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-
-/**
- * Base class of the per-type column statistics aggregators. An aggregator
- * combines the statistics of one column collected on several partitions into
- * a single ColumnStatisticsObj.
- */
-public abstract class ColumnStatsAggregator {
-  // Both fields are assigned by ColumnStatsAggregatorFactory.getColumnStatsAggregator
-  // right after the aggregator is created.
-  // When true, subclasses that estimate NDV may derive it from the average
-  // value density of the partitions instead of interpolating with ndvTuner.
-  public boolean useDensityFunctionForNDVEstimation;
-  // Interpolation factor between the lower and upper NDV bounds; used when
-  // density-based estimation is off. NOTE(review): presumably expected to lie
-  // in [0, 1] -- confirm against the configuration that supplies it.
-  public double ndvTuner;
-  /**
-   * Aggregates the statistics of a single column.
-   *
-   * @param colName   name of the column being aggregated
-   * @param partNames names of the partitions the aggregate was requested for
-   * @param css       statistics that were found for those partitions
-   * @return the aggregated statistics object
-   * @throws MetaException if the input cannot be aggregated
-   */
-  public abstract ColumnStatisticsObj aggregate(String colName, List<String> 
partNames,
-      List<ColumnStatistics> css) throws MetaException;
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/133d3c47/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/ColumnStatsAggregatorFactory.java
----------------------------------------------------------------------
diff --git 
a/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/ColumnStatsAggregatorFactory.java
 
b/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/ColumnStatsAggregatorFactory.java
deleted file mode 100644
index dfae708..0000000
--- 
a/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/ColumnStatsAggregatorFactory.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hadoop.hive.metastore.columnstats.aggr;
-
-import org.apache.hadoop.hive.metastore.api.BinaryColumnStatsData;
-import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData;
-import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
-import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData._Fields;
-import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
-import 
org.apache.hadoop.hive.metastore.columnstats.cache.DateColumnStatsDataInspector;
-import 
org.apache.hadoop.hive.metastore.columnstats.cache.DecimalColumnStatsDataInspector;
-import 
org.apache.hadoop.hive.metastore.columnstats.cache.DoubleColumnStatsDataInspector;
-import 
org.apache.hadoop.hive.metastore.columnstats.cache.LongColumnStatsDataInspector;
-import 
org.apache.hadoop.hive.metastore.columnstats.cache.StringColumnStatsDataInspector;
-
-public class ColumnStatsAggregatorFactory {
-
-  private ColumnStatsAggregatorFactory() {
-  }
-
-  public static ColumnStatsAggregator getColumnStatsAggregator(_Fields type,
-      boolean useDensityFunctionForNDVEstimation, double ndvTuner) {
-    ColumnStatsAggregator agg;
-    switch (type) {
-    case BOOLEAN_STATS:
-      agg = new BooleanColumnStatsAggregator();
-      break;
-    case LONG_STATS:
-      agg = new LongColumnStatsAggregator();
-      break;
-    case DATE_STATS:
-      agg = new DateColumnStatsAggregator();
-      break;
-    case DOUBLE_STATS:
-      agg = new DoubleColumnStatsAggregator();
-      break;
-    case STRING_STATS:
-      agg = new StringColumnStatsAggregator();
-      break;
-    case BINARY_STATS:
-      agg = new BinaryColumnStatsAggregator();
-      break;
-    case DECIMAL_STATS:
-      agg = new DecimalColumnStatsAggregator();
-      break;
-    default:
-      throw new RuntimeException("Woh, bad.  Unknown stats type " + 
type.toString());
-    }
-    agg.useDensityFunctionForNDVEstimation = 
useDensityFunctionForNDVEstimation;
-    agg.ndvTuner = ndvTuner;
-    return agg;
-  }
-
-  public static ColumnStatisticsObj newColumnStaticsObj(String colName, String 
colType, _Fields type) {
-    ColumnStatisticsObj cso = new ColumnStatisticsObj();
-    ColumnStatisticsData csd = new ColumnStatisticsData();
-    cso.setColName(colName);
-    cso.setColType(colType);
-    switch (type) {
-    case BOOLEAN_STATS:
-      csd.setBooleanStats(new BooleanColumnStatsData());
-      break;
-
-    case LONG_STATS:
-      csd.setLongStats(new LongColumnStatsDataInspector());
-      break;
-
-    case DATE_STATS:
-      csd.setDateStats(new DateColumnStatsDataInspector());
-      break;
-
-    case DOUBLE_STATS:
-      csd.setDoubleStats(new DoubleColumnStatsDataInspector());
-      break;
-
-    case STRING_STATS:
-      csd.setStringStats(new StringColumnStatsDataInspector());
-      break;
-
-    case BINARY_STATS:
-      csd.setBinaryStats(new BinaryColumnStatsData());
-      break;
-
-    case DECIMAL_STATS:
-      csd.setDecimalStats(new DecimalColumnStatsDataInspector());
-      break;
-
-    default:
-      throw new RuntimeException("Woh, bad.  Unknown stats type!");
-    }
-
-    cso.setStatsData(csd);
-    return cso;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/133d3c47/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DateColumnStatsAggregator.java
----------------------------------------------------------------------
diff --git 
a/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DateColumnStatsAggregator.java
 
b/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DateColumnStatsAggregator.java
deleted file mode 100644
index ee95396..0000000
--- 
a/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DateColumnStatsAggregator.java
+++ /dev/null
@@ -1,362 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hadoop.hive.metastore.columnstats.aggr;
-
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimator;
-import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimatorFactory;
-import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
-import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
-import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
-import org.apache.hadoop.hive.metastore.api.Date;
-import org.apache.hadoop.hive.metastore.api.DateColumnStatsData;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import 
org.apache.hadoop.hive.metastore.columnstats.cache.DateColumnStatsDataInspector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Aggregates per-partition statistics of a date column. When statistics exist
- * for every requested partition (or for fewer than two), the values are merged
- * directly; otherwise the missing partitions are accounted for by
- * extrapolation.
- */
-public class DateColumnStatsAggregator extends ColumnStatsAggregator implements
-    IExtrapolatePartStatus {
-
-  private static final Logger LOG = LoggerFactory.getLogger(DateColumnStatsAggregator.class);
-
-  /**
-   * Aggregates the date statistics of one column across partitions.
-   *
-   * @param colName   name of the column being aggregated
-   * @param partNames names of all partitions the aggregate was requested for
-   * @param css       statistics found for those partitions, one column each
-   * @return the aggregated statistics object
-   * @throws MetaException if an entry of css does not contain exactly one column
-   */
-  @Override
-  public ColumnStatisticsObj aggregate(String colName, List<String> partNames,
-      List<ColumnStatistics> css) throws MetaException {
-    ColumnStatisticsObj statsObj = null;
-
-    // check if all the ColumnStatisticsObjs contain stats and all the ndv are
-    // bitvectors
-    boolean doAllPartitionContainStats = partNames.size() == css.size();
-    LOG.debug("doAllPartitionContainStats for {} is {}", colName, doAllPartitionContainStats);
-    NumDistinctValueEstimator ndvEstimator = null;
-    String colType = null;
-    for (ColumnStatistics cs : css) {
-      if (cs.getStatsObjSize() != 1) {
-        throw new MetaException(
-            "The number of columns should be exactly one in aggrStats, but found "
-                + cs.getStatsObjSize());
-      }
-      ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
-      if (statsObj == null) {
-        colType = cso.getColType();
-        statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, colType, cso
-            .getStatsData().getSetField());
-      }
-      DateColumnStatsDataInspector dateColumnStats =
-          (DateColumnStatsDataInspector) cso.getStatsData().getDateStats();
-      if (dateColumnStats.getNdvEstimator() == null) {
-        ndvEstimator = null;
-        break;
-      } else {
-        // check if all of the bit vectors can merge
-        NumDistinctValueEstimator estimator = dateColumnStats.getNdvEstimator();
-        if (ndvEstimator == null) {
-          ndvEstimator = estimator;
-        } else {
-          if (ndvEstimator.canMerge(estimator)) {
-            continue;
-          } else {
-            ndvEstimator = null;
-            break;
-          }
-        }
-      }
-    }
-    if (ndvEstimator != null) {
-      // start from an empty estimator of the same kind; partitions are merged in below
-      ndvEstimator = NumDistinctValueEstimatorFactory
-          .getEmptyNumDistinctValueEstimator(ndvEstimator);
-    }
-    LOG.debug("all of the bit vectors can merge for {} is {}", colName, ndvEstimator != null);
-    ColumnStatisticsData columnStatisticsData = new ColumnStatisticsData();
-    if (doAllPartitionContainStats || css.size() < 2) {
-      DateColumnStatsDataInspector aggregateData = null;
-      long lowerBound = 0;
-      long higherBound = 0;
-      double densityAvgSum = 0.0;
-      for (ColumnStatistics cs : css) {
-        ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
-        DateColumnStatsDataInspector newData =
-            (DateColumnStatsDataInspector) cso.getStatsData().getDateStats();
-        lowerBound = Math.max(lowerBound, newData.getNumDVs());
-        higherBound += newData.getNumDVs();
-        densityAvgSum += densityOf(newData);
-        if (ndvEstimator != null) {
-          ndvEstimator.mergeEstimators(newData.getNdvEstimator());
-        }
-        if (aggregateData == null) {
-          aggregateData = newData.deepCopy();
-        } else {
-          aggregateData.setLowValue(min(aggregateData.getLowValue(), newData.getLowValue()));
-          aggregateData
-              .setHighValue(max(aggregateData.getHighValue(), newData.getHighValue()));
-          aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
-          aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs()));
-        }
-      }
-      if (ndvEstimator != null) {
-        // if all the ColumnStatisticsObjs contain bitvectors, we do not need to
-        // use uniform distribution assumption because we can merge bitvectors
-        // to get a good estimation.
-        aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues());
-      } else {
-        long estimation;
-        if (useDensityFunctionForNDVEstimation) {
-          // We have estimation, lowerbound and higherbound. We use estimation
-          // if it is between lowerbound and higherbound.
-          double densityAvg = densityAvgSum / partNames.size();
-          estimation = (long) (diff(aggregateData.getHighValue(), aggregateData.getLowValue()) / densityAvg);
-          if (estimation < lowerBound) {
-            estimation = lowerBound;
-          } else if (estimation > higherBound) {
-            estimation = higherBound;
-          }
-        } else {
-          estimation = (long) (lowerBound + (higherBound - lowerBound) * ndvTuner);
-        }
-        aggregateData.setNumDVs(estimation);
-      }
-      columnStatisticsData.setDateStats(aggregateData);
-    } else {
-      // we need extrapolation
-      LOG.debug("start extrapolation for {}", colName);
-
-      Map<String, Integer> indexMap = new HashMap<String, Integer>();
-      for (int index = 0; index < partNames.size(); index++) {
-        indexMap.put(partNames.get(index), index);
-      }
-      Map<String, Double> adjustedIndexMap = new HashMap<String, Double>();
-      Map<String, ColumnStatisticsData> adjustedStatsMap = new HashMap<String, ColumnStatisticsData>();
-      // while we scan the css, we also get the densityAvg, lowerbound and
-      // higherbound when useDensityFunctionForNDVEstimation is true.
-      double densityAvgSum = 0.0;
-      if (ndvEstimator == null) {
-        // if not every partition uses bitvector for ndv, we just fall back to
-        // the traditional extrapolation methods.
-        for (ColumnStatistics cs : css) {
-          String partName = cs.getStatsDesc().getPartName();
-          ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
-          DateColumnStatsData newData = cso.getStatsData().getDateStats();
-          if (useDensityFunctionForNDVEstimation) {
-            densityAvgSum += densityOf(newData);
-          }
-          adjustedIndexMap.put(partName, (double) indexMap.get(partName));
-          adjustedStatsMap.put(partName, cso.getStatsData());
-        }
-      } else {
-        // we first merge all the adjacent bitvectors that we could merge and
-        // derive new partition names and index.
-        StringBuilder pseudoPartName = new StringBuilder();
-        double pseudoIndexSum = 0;
-        int length = 0;
-        int curIndex = -1;
-        DateColumnStatsDataInspector aggregateData = null;
-        for (ColumnStatistics cs : css) {
-          String partName = cs.getStatsDesc().getPartName();
-          ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
-          DateColumnStatsDataInspector newData =
-              (DateColumnStatsDataInspector) cso.getStatsData().getDateStats();
-          // newData.isSetBitVectors() should be true for sure because we
-          // already checked it before.
-          if (indexMap.get(partName) != curIndex) {
-            // There is bitvector, but it is not adjacent to the previous ones.
-            if (length > 0) {
-              // we have to set ndv
-              adjustedIndexMap.put(pseudoPartName.toString(), pseudoIndexSum / length);
-              aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues());
-              ColumnStatisticsData csd = new ColumnStatisticsData();
-              csd.setDateStats(aggregateData);
-              adjustedStatsMap.put(pseudoPartName.toString(), csd);
-              if (useDensityFunctionForNDVEstimation) {
-                densityAvgSum += densityOf(aggregateData);
-              }
-              // reset everything
-              pseudoPartName = new StringBuilder();
-              pseudoIndexSum = 0;
-              length = 0;
-              ndvEstimator = NumDistinctValueEstimatorFactory.getEmptyNumDistinctValueEstimator(ndvEstimator);
-            }
-            aggregateData = null;
-          }
-          curIndex = indexMap.get(partName);
-          pseudoPartName.append(partName);
-          pseudoIndexSum += curIndex;
-          length++;
-          // the next partition is adjacent only if its index equals curIndex + 1
-          curIndex++;
-          if (aggregateData == null) {
-            aggregateData = newData.deepCopy();
-          } else {
-            aggregateData.setLowValue(min(aggregateData.getLowValue(), newData.getLowValue()));
-            aggregateData.setHighValue(max(aggregateData.getHighValue(), newData.getHighValue()));
-            aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
-          }
-          ndvEstimator.mergeEstimators(newData.getNdvEstimator());
-        }
-        if (length > 0) {
-          // we have to set ndv
-          adjustedIndexMap.put(pseudoPartName.toString(), pseudoIndexSum / length);
-          aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues());
-          ColumnStatisticsData csd = new ColumnStatisticsData();
-          csd.setDateStats(aggregateData);
-          adjustedStatsMap.put(pseudoPartName.toString(), csd);
-          if (useDensityFunctionForNDVEstimation) {
-            densityAvgSum += densityOf(aggregateData);
-          }
-        }
-      }
-      extrapolate(columnStatisticsData, partNames.size(), css.size(), adjustedIndexMap,
-          adjustedStatsMap, densityAvgSum / adjustedStatsMap.size());
-    }
-    LOG.debug("Ndv estimatation for {} is {} # of partitions requested: {} # of partitions found: {}", colName,
-        columnStatisticsData.getDateStats().getNumDVs(),partNames.size(), css.size());
-    statsObj.setStatsData(columnStatisticsData);
-    return statsObj;
-  }
-
-  /**
-   * Returns the value range of the given stats, in days, per distinct value.
-   * The division is done in floating point: dividing the two long values
-   * directly would drop the fractional part and would throw an
-   * ArithmeticException for stats that report zero distinct values.
-   */
-  private double densityOf(DateColumnStatsData data) {
-    return ((double) diff(data.getHighValue(), data.getLowValue())) / data.getNumDVs();
-  }
-
-  /** Returns d1 - d2 in days. */
-  private long diff(Date d1, Date d2) {
-    return d1.getDaysSinceEpoch() - d2.getDaysSinceEpoch();
-  }
-
-  /** Returns the earlier of the two dates; d2 when they compare equal. */
-  private Date min(Date d1, Date d2) {
-    return d1.compareTo(d2) < 0 ? d1 : d2;
-  }
-
-  /** Returns the later of the two dates; d1 when they compare equal. */
-  private Date max(Date d1, Date d2) {
-    return d1.compareTo(d2) < 0 ? d2 : d1;
-  }
-
-  /**
-   * Extrapolates low value, high value, number of nulls and NDV for the whole
-   * partition range from the partitions that do have statistics, and stores
-   * the result as date stats on extrapolateData.
-   *
-   * @param extrapolateData   receives the extrapolated date stats
-   * @param numParts          number of partitions requested; also used as the
-   *                          right border index
-   * @param numPartsWithStats number of partitions that have statistics
-   * @param adjustedIndexMap  (pseudo) partition name to its position
-   * @param adjustedStatsMap  (pseudo) partition name to its statistics; must
-   *                          not be empty
-   * @param densityAvg        average density used for the NDV estimate when
-   *                          density-based estimation is enabled
-   */
-  @Override
-  public void extrapolate(ColumnStatisticsData extrapolateData, int numParts,
-      int numPartsWithStats, Map<String, Double> adjustedIndexMap,
-      Map<String, ColumnStatisticsData> adjustedStatsMap, double densityAvg) {
-    int rightBorderInd = numParts;
-    DateColumnStatsDataInspector extrapolateDateData = new DateColumnStatsDataInspector();
-    Map<String, DateColumnStatsData> extractedAdjustedStatsMap = new HashMap<>();
-    for (Map.Entry<String, ColumnStatisticsData> entry : adjustedStatsMap.entrySet()) {
-      extractedAdjustedStatsMap.put(entry.getKey(), entry.getValue().getDateStats());
-    }
-    List<Map.Entry<String, DateColumnStatsData>> list = new LinkedList<Map.Entry<String, DateColumnStatsData>>(
-        extractedAdjustedStatsMap.entrySet());
-    // get the lowValue
-    Collections.sort(list, new Comparator<Map.Entry<String, DateColumnStatsData>>() {
-      @Override
-      public int compare(Map.Entry<String, DateColumnStatsData> o1,
-          Map.Entry<String, DateColumnStatsData> o2) {
-        return o1.getValue().getLowValue().compareTo(o2.getValue().getLowValue());
-      }
-    });
-    // positions of the partitions holding the smallest and the largest low value
-    double minInd = adjustedIndexMap.get(list.get(0).getKey());
-    double maxInd = adjustedIndexMap.get(list.get(list.size() - 1).getKey());
-    long lowValue = 0;
-    long min = list.get(0).getValue().getLowValue().getDaysSinceEpoch();
-    long max = list.get(list.size() - 1).getValue().getLowValue().getDaysSinceEpoch();
-    if (minInd == maxInd) {
-      lowValue = min;
-    } else if (minInd < maxInd) {
-      // left border is the min
-      lowValue = (long) (max - (max - min) * maxInd / (maxInd - minInd));
-    } else {
-      // right border is the min
-      lowValue = (long) (max - (max - min) * (rightBorderInd - maxInd) / (minInd - maxInd));
-    }
-
-    // get the highValue
-    Collections.sort(list, new Comparator<Map.Entry<String, DateColumnStatsData>>() {
-      @Override
-      public int compare(Map.Entry<String, DateColumnStatsData> o1,
-          Map.Entry<String, DateColumnStatsData> o2) {
-        return o1.getValue().getHighValue().compareTo(o2.getValue().getHighValue());
-      }
-    });
-    minInd = adjustedIndexMap.get(list.get(0).getKey());
-    maxInd = adjustedIndexMap.get(list.get(list.size() - 1).getKey());
-    long highValue = 0;
-    min = list.get(0).getValue().getHighValue().getDaysSinceEpoch();
-    max = list.get(list.size() - 1).getValue().getHighValue().getDaysSinceEpoch();
-    if (minInd == maxInd) {
-      highValue = min;
-    } else if (minInd < maxInd) {
-      // right border is the max
-      highValue = (long) (min + (max - min) * (rightBorderInd - minInd) / (maxInd - minInd));
-    } else {
-      // left border is the max
-      highValue = (long) (min + (max - min) * minInd / (minInd - maxInd));
-    }
-
-    // get the #nulls
-    long numNulls = 0;
-    for (Map.Entry<String, DateColumnStatsData> entry : extractedAdjustedStatsMap.entrySet()) {
-      numNulls += entry.getValue().getNumNulls();
-    }
-    // we scale up sumNulls based on the number of partitions
-    numNulls = numNulls * numParts / numPartsWithStats;
-
-    // get the ndv
-    long ndv = 0;
-    Collections.sort(list, new Comparator<Map.Entry<String, DateColumnStatsData>>() {
-      @Override
-      public int compare(Map.Entry<String, DateColumnStatsData> o1,
-          Map.Entry<String, DateColumnStatsData> o2) {
-        return Long.compare(o1.getValue().getNumDVs(), o2.getValue().getNumDVs());
-      }
-    });
-    // the largest single NDV is the lower bound, the sum of all NDVs the upper bound
-    long lowerBound = list.get(list.size() - 1).getValue().getNumDVs();
-    long higherBound = 0;
-    for (Map.Entry<String, DateColumnStatsData> entry : list) {
-      higherBound += entry.getValue().getNumDVs();
-    }
-    if (useDensityFunctionForNDVEstimation && densityAvg != 0.0) {
-      ndv = (long) ((highValue - lowValue) / densityAvg);
-      if (ndv < lowerBound) {
-        ndv = lowerBound;
-      } else if (ndv > higherBound) {
-        ndv = higherBound;
-      }
-    } else {
-      minInd = adjustedIndexMap.get(list.get(0).getKey());
-      maxInd = adjustedIndexMap.get(list.get(list.size() - 1).getKey());
-      min = list.get(0).getValue().getNumDVs();
-      max = list.get(list.size() - 1).getValue().getNumDVs();
-      if (minInd == maxInd) {
-        ndv = min;
-      } else if (minInd < maxInd) {
-        // right border is the max
-        ndv = (long) (min + (max - min) * (rightBorderInd - minInd) / (maxInd - minInd));
-      } else {
-        // left border is the max
-        ndv = (long) (min + (max - min) * minInd / (minInd - maxInd));
-      }
-    }
-    extrapolateDateData.setLowValue(new Date(lowValue));
-    extrapolateDateData.setHighValue(new Date(highValue));
-    extrapolateDateData.setNumNulls(numNulls);
-    extrapolateDateData.setNumDVs(ndv);
-    extrapolateData.setDateStats(extrapolateDateData);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/133d3c47/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DecimalColumnStatsAggregator.java
----------------------------------------------------------------------
diff --git 
a/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DecimalColumnStatsAggregator.java
 
b/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DecimalColumnStatsAggregator.java
deleted file mode 100644
index 284c12c..0000000
--- 
a/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DecimalColumnStatsAggregator.java
+++ /dev/null
@@ -1,375 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hadoop.hive.metastore.columnstats.aggr;
-
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimator;
-import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimatorFactory;
-import org.apache.hadoop.hive.metastore.MetaStoreUtils;
-import org.apache.hadoop.hive.metastore.StatObjectConverter;
-import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
-import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
-import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
-import org.apache.hadoop.hive.metastore.api.DecimalColumnStatsData;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import 
org.apache.hadoop.hive.metastore.columnstats.cache.DecimalColumnStatsDataInspector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class DecimalColumnStatsAggregator extends ColumnStatsAggregator 
implements
-    IExtrapolatePartStatus {
-
-  private static final Logger LOG = 
LoggerFactory.getLogger(DecimalColumnStatsAggregator.class);
-
-  @Override
-  public ColumnStatisticsObj aggregate(String colName, List<String> partNames,
-      List<ColumnStatistics> css) throws MetaException {
-    ColumnStatisticsObj statsObj = null;
-
-    // check if all the ColumnStatisticsObjs contain stats and all the ndv are
-    // bitvectors
-    boolean doAllPartitionContainStats = partNames.size() == css.size();
-    LOG.debug("doAllPartitionContainStats for " + colName + " is " + 
doAllPartitionContainStats);
-    NumDistinctValueEstimator ndvEstimator = null;
-    String colType = null;
-    for (ColumnStatistics cs : css) {
-      if (cs.getStatsObjSize() != 1) {
-        throw new MetaException(
-            "The number of columns should be exactly one in aggrStats, but 
found "
-                + cs.getStatsObjSize());
-      }
-      ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
-      if (statsObj == null) {
-        colType = cso.getColType();
-        statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, 
colType, cso
-            .getStatsData().getSetField());
-      }
-      DecimalColumnStatsDataInspector decimalColumnStatsData =
-          (DecimalColumnStatsDataInspector) 
cso.getStatsData().getDecimalStats();
-      if (decimalColumnStatsData.getNdvEstimator() == null) {
-        ndvEstimator = null;
-        break;
-      } else {
-        // check if all of the bit vectors can merge
-        NumDistinctValueEstimator estimator = 
decimalColumnStatsData.getNdvEstimator();
-        if (ndvEstimator == null) {
-          ndvEstimator = estimator;
-        } else {
-          if (ndvEstimator.canMerge(estimator)) {
-            continue;
-          } else {
-            ndvEstimator = null;
-            break;
-          }
-        }
-      }
-    }
-    if (ndvEstimator != null) {
-      ndvEstimator = NumDistinctValueEstimatorFactory
-          .getEmptyNumDistinctValueEstimator(ndvEstimator);
-    }
-    LOG.debug("all of the bit vectors can merge for " + colName + " is " + 
(ndvEstimator != null));
-    ColumnStatisticsData columnStatisticsData = new ColumnStatisticsData();
-    if (doAllPartitionContainStats || css.size() < 2) {
-      DecimalColumnStatsDataInspector aggregateData = null;
-      long lowerBound = 0;
-      long higherBound = 0;
-      double densityAvgSum = 0.0;
-      for (ColumnStatistics cs : css) {
-        ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
-        DecimalColumnStatsDataInspector newData =
-            (DecimalColumnStatsDataInspector) 
cso.getStatsData().getDecimalStats();
-        lowerBound = Math.max(lowerBound, newData.getNumDVs());
-        higherBound += newData.getNumDVs();
-        densityAvgSum += 
(MetaStoreUtils.decimalToDouble(newData.getHighValue()) - MetaStoreUtils
-            .decimalToDouble(newData.getLowValue())) / newData.getNumDVs();
-        if (ndvEstimator != null) {
-          ndvEstimator.mergeEstimators(newData.getNdvEstimator());
-        }
-        if (aggregateData == null) {
-          aggregateData = newData.deepCopy();
-        } else {
-          if (MetaStoreUtils.decimalToDouble(aggregateData.getLowValue()) < 
MetaStoreUtils
-              .decimalToDouble(newData.getLowValue())) {
-            aggregateData.setLowValue(aggregateData.getLowValue());
-          } else {
-            aggregateData.setLowValue(newData.getLowValue());
-          }
-          if (MetaStoreUtils.decimalToDouble(aggregateData.getHighValue()) > 
MetaStoreUtils
-              .decimalToDouble(newData.getHighValue())) {
-            aggregateData.setHighValue(aggregateData.getHighValue());
-          } else {
-            aggregateData.setHighValue(newData.getHighValue());
-          }
-          aggregateData.setNumNulls(aggregateData.getNumNulls() + 
newData.getNumNulls());
-          aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), 
newData.getNumDVs()));
-        }
-      }
-      if (ndvEstimator != null) {
-        // if all the ColumnStatisticsObjs contain bitvectors, we do not need 
to
-        // use uniform distribution assumption because we can merge bitvectors
-        // to get a good estimation.
-        aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues());
-      } else {
-        long estimation;
-        if (useDensityFunctionForNDVEstimation) {
-          // We have estimation, lowerbound and higherbound. We use estimation
-          // if it is between lowerbound and higherbound.
-          double densityAvg = densityAvgSum / partNames.size();
-          estimation = (long) 
((MetaStoreUtils.decimalToDouble(aggregateData.getHighValue()) - MetaStoreUtils
-              .decimalToDouble(aggregateData.getLowValue())) / densityAvg);
-          if (estimation < lowerBound) {
-            estimation = lowerBound;
-          } else if (estimation > higherBound) {
-            estimation = higherBound;
-          }
-        } else {
-          estimation = (long) (lowerBound + (higherBound - lowerBound) * 
ndvTuner);
-        }
-        aggregateData.setNumDVs(estimation);
-      }
-      columnStatisticsData.setDecimalStats(aggregateData);
-    } else {
-      // we need extrapolation
-      LOG.debug("start extrapolation for " + colName);
-      Map<String, Integer> indexMap = new HashMap<String, Integer>();
-      for (int index = 0; index < partNames.size(); index++) {
-        indexMap.put(partNames.get(index), index);
-      }
-      Map<String, Double> adjustedIndexMap = new HashMap<String, Double>();
-      Map<String, ColumnStatisticsData> adjustedStatsMap = new HashMap<String, 
ColumnStatisticsData>();
-      // while we scan the css, we also get the densityAvg, lowerbound and
-      // higerbound when useDensityFunctionForNDVEstimation is true.
-      double densityAvgSum = 0.0;
-      if (ndvEstimator == null) {
-        // if not every partition uses bitvector for ndv, we just fall back to
-        // the traditional extrapolation methods.
-        for (ColumnStatistics cs : css) {
-          String partName = cs.getStatsDesc().getPartName();
-          ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
-          DecimalColumnStatsData newData = 
cso.getStatsData().getDecimalStats();
-          if (useDensityFunctionForNDVEstimation) {
-            densityAvgSum += 
(MetaStoreUtils.decimalToDouble(newData.getHighValue()) - MetaStoreUtils
-                .decimalToDouble(newData.getLowValue())) / newData.getNumDVs();
-          }
-          adjustedIndexMap.put(partName, (double) indexMap.get(partName));
-          adjustedStatsMap.put(partName, cso.getStatsData());
-        }
-      } else {
-        // we first merge all the adjacent bitvectors that we could merge and
-        // derive new partition names and index.
-        StringBuilder pseudoPartName = new StringBuilder();
-        double pseudoIndexSum = 0;
-        int length = 0;
-        int curIndex = -1;
-        DecimalColumnStatsDataInspector aggregateData = null;
-        for (ColumnStatistics cs : css) {
-          String partName = cs.getStatsDesc().getPartName();
-          ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
-          DecimalColumnStatsDataInspector newData =
-              (DecimalColumnStatsDataInspector) 
cso.getStatsData().getDecimalStats();
-          // newData.isSetBitVectors() should be true for sure because we
-          // already checked it before.
-          if (indexMap.get(partName) != curIndex) {
-            // There is bitvector, but it is not adjacent to the previous ones.
-            if (length > 0) {
-              // we have to set ndv
-              adjustedIndexMap.put(pseudoPartName.toString(), pseudoIndexSum / 
length);
-              
aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues());
-              ColumnStatisticsData csd = new ColumnStatisticsData();
-              csd.setDecimalStats(aggregateData);
-              adjustedStatsMap.put(pseudoPartName.toString(), csd);
-              if (useDensityFunctionForNDVEstimation) {
-                densityAvgSum += 
(MetaStoreUtils.decimalToDouble(aggregateData.getHighValue()) - MetaStoreUtils
-                    .decimalToDouble(aggregateData.getLowValue())) / 
aggregateData.getNumDVs();
-              }
-              // reset everything
-              pseudoPartName = new StringBuilder();
-              pseudoIndexSum = 0;
-              length = 0;
-              ndvEstimator = 
NumDistinctValueEstimatorFactory.getEmptyNumDistinctValueEstimator(ndvEstimator);
-            }
-            aggregateData = null;
-          }
-          curIndex = indexMap.get(partName);
-          pseudoPartName.append(partName);
-          pseudoIndexSum += curIndex;
-          length++;
-          curIndex++;
-          if (aggregateData == null) {
-            aggregateData = newData.deepCopy();
-          } else {
-            if (MetaStoreUtils.decimalToDouble(aggregateData.getLowValue()) < 
MetaStoreUtils
-                .decimalToDouble(newData.getLowValue())) {
-              aggregateData.setLowValue(aggregateData.getLowValue());
-            } else {
-              aggregateData.setLowValue(newData.getLowValue());
-            }
-            if (MetaStoreUtils.decimalToDouble(aggregateData.getHighValue()) > 
MetaStoreUtils
-                .decimalToDouble(newData.getHighValue())) {
-              aggregateData.setHighValue(aggregateData.getHighValue());
-            } else {
-              aggregateData.setHighValue(newData.getHighValue());
-            }
-            aggregateData.setNumNulls(aggregateData.getNumNulls() + 
newData.getNumNulls());
-          }
-          ndvEstimator.mergeEstimators(newData.getNdvEstimator());
-        }
-        if (length > 0) {
-          // we have to set ndv
-          adjustedIndexMap.put(pseudoPartName.toString(), pseudoIndexSum / 
length);
-          aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues());
-          ColumnStatisticsData csd = new ColumnStatisticsData();
-          csd.setDecimalStats(aggregateData);
-          adjustedStatsMap.put(pseudoPartName.toString(), csd);
-          if (useDensityFunctionForNDVEstimation) {
-            densityAvgSum += 
(MetaStoreUtils.decimalToDouble(aggregateData.getHighValue()) - MetaStoreUtils
-                .decimalToDouble(aggregateData.getLowValue())) / 
aggregateData.getNumDVs();
-          }
-        }
-      }
-      extrapolate(columnStatisticsData, partNames.size(), css.size(), 
adjustedIndexMap,
-          adjustedStatsMap, densityAvgSum / adjustedStatsMap.size());
-    }
-    LOG.debug("Ndv estimatation for {} is {} # of partitions requested: {} # 
of partitions found: {}", colName,
-        columnStatisticsData.getDecimalStats().getNumDVs(),partNames.size(), 
css.size());
-    statsObj.setStatsData(columnStatisticsData);
-    return statsObj;
-  }
-
-  @Override /*
-   * Fills extrapolateData with decimal column statistics (low value, high
-   * value, number of nulls, NDV) extrapolated from the entries of
-   * adjustedStatsMap.
-   *
-   * Low value, high value and (when the density function is not used) NDV are
-   * each obtained the same way: sort the entries by that statistic, take the
-   * smallest and the largest one together with their indexes from
-   * adjustedIndexMap, and evaluate the straight line through these two points
-   * at index 0 or at index numParts.
-   *
-   * extrapolateData   - receives the result via setDecimalStats()
-   * numParts          - right-border index; also the factor the summed null
-   *                     count is multiplied by
-   * numPartsWithStats - divisor applied to the summed null count (long
-   *                     division, so 0 raises ArithmeticException)
-   * adjustedIndexMap  - index for each key of adjustedStatsMap
-   * adjustedStatsMap  - statistics per key; only getDecimalStats() is read
-   * densityAvg        - divisor for the density-based NDV estimate; ignored
-   *                     unless useDensityFunctionForNDVEstimation is set and
-   *                     densityAvg != 0.0
-   *
-   * An empty adjustedStatsMap makes list.get(0) throw
-   * IndexOutOfBoundsException; a key missing from adjustedIndexMap fails with
-   * a NullPointerException when the looked-up Double is unboxed.
-   */
-  public void extrapolate(ColumnStatisticsData extrapolateData, int numParts,
-      int numPartsWithStats, Map<String, Double> adjustedIndexMap,
-      Map<String, ColumnStatisticsData> adjustedStatsMap, double densityAvg) {
-    int rightBorderInd = numParts; // index at which the "right border" is evaluated below
-    DecimalColumnStatsDataInspector extrapolateDecimalData = new 
DecimalColumnStatsDataInspector();
-    Map<String, DecimalColumnStatsData> extractedAdjustedStatsMap = new 
HashMap<>(); // same keys as adjustedStatsMap, values unwrapped to the decimal stats
-    for (Map.Entry<String, ColumnStatisticsData> entry : 
adjustedStatsMap.entrySet()) {
-      extractedAdjustedStatsMap.put(entry.getKey(), 
entry.getValue().getDecimalStats());
-    }
-    List<Map.Entry<String, DecimalColumnStatsData>> list = new 
LinkedList<Map.Entry<String, DecimalColumnStatsData>>(
-        extractedAdjustedStatsMap.entrySet()); // this one list is re-sorted before each estimate
-    /* get the lowValue: sort ascending by low value, so list.get(0) holds the
-     * smallest low value and the last element the largest one.
-     * NOTE(review): the order is whatever compareTo of the low-value type
-     * defines, while min and max below are read through decimalToDouble;
-     * confirm that both agree on numeric order. */
-    Collections.sort(list, new Comparator<Map.Entry<String, 
DecimalColumnStatsData>>() {
-      @Override
-      public int compare(Map.Entry<String, DecimalColumnStatsData> o1,
-          Map.Entry<String, DecimalColumnStatsData> o2) {
-        return 
o1.getValue().getLowValue().compareTo(o2.getValue().getLowValue());
-      }
-    });
-    double minInd = adjustedIndexMap.get(list.get(0).getKey()); // index of the smallest low value
-    double maxInd = adjustedIndexMap.get(list.get(list.size() - 1).getKey()); // index of the largest
-    double lowValue = 0;
-    double min = 
MetaStoreUtils.decimalToDouble(list.get(0).getValue().getLowValue());
-    double max = MetaStoreUtils.decimalToDouble(list.get(list.size() - 
1).getValue().getLowValue());
-    if (minInd == maxInd) {
-      lowValue = min; // both ends share one index (e.g. a single entry): no slope to follow
-    } else if (minInd < maxInd) {
-      /* left border is the min: low values grow with the index, so evaluate
-       * the line through (minInd, min) and (maxInd, max) at index 0 */
-      lowValue = (max - (max - min) * maxInd / (maxInd - minInd));
-    } else {
-      /* right border is the min: low values shrink with the index, so
-       * evaluate the same line at rightBorderInd */
-      lowValue = (max - (max - min) * (rightBorderInd - maxInd) / (minInd - 
maxInd));
-    }
-
-    /* get the highValue: same procedure on the high values; the list is
-     * re-sorted and minInd, maxInd, min and max are reassigned. The
-     * NOTE(review) on the low-value sort applies to this compareTo as well. */
-    Collections.sort(list, new Comparator<Map.Entry<String, 
DecimalColumnStatsData>>() {
-      @Override
-      public int compare(Map.Entry<String, DecimalColumnStatsData> o1,
-          Map.Entry<String, DecimalColumnStatsData> o2) {
-        return 
o1.getValue().getHighValue().compareTo(o2.getValue().getHighValue());
-      }
-    });
-    minInd = adjustedIndexMap.get(list.get(0).getKey());
-    maxInd = adjustedIndexMap.get(list.get(list.size() - 1).getKey());
-    double highValue = 0;
-    min = 
MetaStoreUtils.decimalToDouble(list.get(0).getValue().getHighValue());
-    max = MetaStoreUtils.decimalToDouble(list.get(list.size() - 
1).getValue().getHighValue());
-    if (minInd == maxInd) {
-      highValue = min;
-    } else if (minInd < maxInd) {
-      /* right border is the max: high values grow with the index, so evaluate
-       * the line through (minInd, min) and (maxInd, max) at rightBorderInd */
-      highValue = (min + (max - min) * (rightBorderInd - minInd) / (maxInd - 
minInd));
-    } else {
-      /* left border is the max: high values shrink with the index, so
-       * evaluate the same line at index 0 */
-      highValue = (min + (max - min) * minInd / (minInd - maxInd));
-    }
-
-    // get the #nulls: start from the plain sum over all entries
-    long numNulls = 0;
-    for (Map.Entry<String, DecimalColumnStatsData> entry : 
extractedAdjustedStatsMap.entrySet()) {
-      numNulls += entry.getValue().getNumNulls();
-    }
-    /* we scale up sumNulls based on the number of partitions; this is long
-     * arithmetic (multiply first, then divide), so the quotient is truncated */
-    numNulls = numNulls * numParts / numPartsWithStats;
-
-    /* get the ndv: either from the density function, clamped to
-     * [lowerBound, higherBound], or by the same two-point extrapolation that
-     * is used for the high value */
-    long ndv = 0;
-    long ndvMin = 0;
-    long ndvMax = 0;
-    Collections.sort(list, new Comparator<Map.Entry<String, 
DecimalColumnStatsData>>() {
-      @Override
-      public int compare(Map.Entry<String, DecimalColumnStatsData> o1,
-          Map.Entry<String, DecimalColumnStatsData> o2) {
-        return Long.compare(o1.getValue().getNumDVs(), 
o2.getValue().getNumDVs());
-      }
-    });
-    long lowerBound = list.get(list.size() - 1).getValue().getNumDVs(); // largest NDV of any entry
-    long higherBound = 0; // accumulates the sum of the NDVs of all entries
-    for (Map.Entry<String, DecimalColumnStatsData> entry : list) {
-      higherBound += entry.getValue().getNumDVs();
-    }
-    if (useDensityFunctionForNDVEstimation && densityAvg != 0.0) {
-      ndv = (long) ((highValue - lowValue) / densityAvg); // extrapolated value range over densityAvg
-      if (ndv < lowerBound) {
-        ndv = lowerBound;
-      } else if (ndv > higherBound) {
-        ndv = higherBound;
-      }
-    } else {
-      minInd = adjustedIndexMap.get(list.get(0).getKey());
-      maxInd = adjustedIndexMap.get(list.get(list.size() - 1).getKey());
-      ndvMin = list.get(0).getValue().getNumDVs();
-      ndvMax = list.get(list.size() - 1).getValue().getNumDVs();
-      if (minInd == maxInd) {
-        ndv = ndvMin;
-      } else if (minInd < maxInd) {
-        /* right border is the max: evaluate the line through (minInd, ndvMin)
-         * and (maxInd, ndvMax) at rightBorderInd; the cast truncates */
-        ndv = (long) (ndvMin + (ndvMax - ndvMin) * (rightBorderInd - minInd) / 
(maxInd - minInd));
-      } else {
-        /* left border is the max: evaluate the same line at index 0 */
-        ndv = (long) (ndvMin + (ndvMax - ndvMin) * minInd / (minInd - maxInd));
-      }
-    }
-    
extrapolateDecimalData.setLowValue(StatObjectConverter.createThriftDecimal(String
-        .valueOf(lowValue))); // the double is passed on in its String.valueOf text form
-    
extrapolateDecimalData.setHighValue(StatObjectConverter.createThriftDecimal(String
-        .valueOf(highValue)));
-    extrapolateDecimalData.setNumNulls(numNulls);
-    extrapolateDecimalData.setNumDVs(ndv);
-    extrapolateData.setDecimalStats(extrapolateDecimalData);
-  }
-}

Reply via email to