http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index 915bce3..64bc1a2 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -58,9 +58,13 @@ import java.util.regex.Pattern;
 
 import javax.jdo.JDOException;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimaps;
 import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.collections.CollectionUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
@@ -78,6 +82,7 @@ import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
 import org.apache.hadoop.hive.common.metrics.common.MetricsVariable;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.io.HdfsUtils;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.*;
 import org.apache.hadoop.hive.metastore.events.AddIndexEvent;
@@ -114,6 +119,7 @@ import org.apache.hadoop.hive.metastore.events.PreLoadPartitionDoneEvent;
 import org.apache.hadoop.hive.metastore.events.PreReadDatabaseEvent;
 import org.apache.hadoop.hive.metastore.events.PreReadTableEvent;
 import org.apache.hadoop.hive.metastore.filemeta.OrcFileMetadataHandler;
+import org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType;
 import org.apache.hadoop.hive.metastore.model.MTableWrite;
 import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
@@ -159,10 +165,6 @@ import com.facebook.fb303.fb_status;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Splitter;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableListMultimap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Multimaps;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
@@ -170,6 +172,8 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
  */
 public class HiveMetaStore extends ThriftHiveMetastore {
   public static final Logger LOG = LoggerFactory.getLogger(HiveMetaStore.class);
+  public static final String PARTITION_NUMBER_EXCEED_LIMIT_MSG =
+      "Number of partitions scanned (=%d) on table '%s' exceeds limit (=%d). This is controlled on the metastore server by %s.";
 
   // boolean that tells if the HiveMetaStore (remote) server is being used.
  // Can be used to determine if the calls to metastore api (HMSHandler) are being made with
@@ -234,7 +238,6 @@ public class HiveMetaStore extends ThriftHiveMetastore {
 
   public static class HMSHandler extends FacebookBase implements IHMSHandler, ThreadLocalRawStore {
     public static final Logger LOG = HiveMetaStore.LOG;
-    private String rawStoreClassName;
     private final HiveConf hiveConf; // stores datastore (jpox) properties,
                                     // right now they come from jpox.properties
 
@@ -416,7 +419,6 @@ public class HiveMetaStore extends ThriftHiveMetastore {
 
     @Override
     public void init() throws MetaException {
-      rawStoreClassName = hiveConf.getVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL);
       initListeners = MetaStoreUtils.getMetaStoreListeners(
           MetaStoreInitListener.class, hiveConf,
           hiveConf.getVar(HiveConf.ConfVars.METASTORE_INIT_HOOKS));
@@ -514,7 +516,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
      fileMetadataManager = new FileMetadataManager((ThreadLocalRawStore)this, hiveConf);
     }
 
-    private String addPrefix(String s) {
+    private static String addPrefix(String s) {
       return threadLocalId.get() + ": " + s;
     }
 
@@ -589,9 +591,14 @@ public class HiveMetaStore extends ThriftHiveMetastore {
     @InterfaceStability.Evolving
     @Override
     public RawStore getMS() throws MetaException {
+      Configuration conf = getConf();
+      return getMSForConf(conf);
+    }
+
+    public static RawStore getMSForConf(Configuration conf) throws MetaException {
       RawStore ms = threadLocalMS.get();
       if (ms == null) {
-        ms = newRawStore();
+        ms = newRawStoreForConf(conf);
         ms.verifySchema();
         threadLocalMS.set(ms);
         ms = threadLocalMS.get();
@@ -608,24 +615,23 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       return txn;
     }
 
-    private RawStore newRawStore() throws MetaException {
-      LOG.info(addPrefix("Opening raw store with implementation class:"
-          + rawStoreClassName));
-      Configuration conf = getConf();
-
+    private static RawStore newRawStoreForConf(Configuration conf) throws MetaException {
+      HiveConf hiveConf = new HiveConf(conf, HiveConf.class);
+      String rawStoreClassName = hiveConf.getVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL);
+      LOG.info(addPrefix("Opening raw store with implementation class:" + rawStoreClassName));
       if (hiveConf.getBoolVar(ConfVars.METASTORE_FASTPATH)) {
         LOG.info("Fastpath, skipping raw store proxy");
         try {
-          RawStore rs = ((Class<? extends RawStore>) MetaStoreUtils.getClass(
-              rawStoreClassName)).newInstance();
-          rs.setConf(conf);
+          RawStore rs =
+              ((Class<? extends RawStore>) MetaStoreUtils.getClass(rawStoreClassName))
+                  .newInstance();
+          rs.setConf(hiveConf);
           return rs;
         } catch (Exception e) {
          LOG.error("Unable to instantiate raw store directly in fastpath mode", e);
           throw new RuntimeException(e);
         }
       }
-
      return RawStoreProxy.getProxy(hiveConf, conf, rawStoreClassName, threadLocalId.get());
     }
 
@@ -874,10 +880,11 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       Path dbPath = new Path(db.getLocationUri());
       boolean success = false;
       boolean madeDir = false;
+      Map<String, String> transactionalListenersResponses = Collections.emptyMap();
       try {
         firePreEvent(new PreCreateDatabaseEvent(db, this));
         if (!wh.isDir(dbPath)) {
-          if (!wh.mkdirs(dbPath, true)) {
+          if (!wh.mkdirs(dbPath)) {
             throw new MetaException("Unable to create database path " + dbPath 
+
                 ", failed to create database " + db.getName());
           }
@@ -886,11 +893,12 @@ public class HiveMetaStore extends ThriftHiveMetastore {
 
         ms.openTransaction();
         ms.createDatabase(db);
-        if (transactionalListeners.size() > 0) {
-          CreateDatabaseEvent cde = new CreateDatabaseEvent(db, true, this);
-          for (MetaStoreEventListener transactionalListener : transactionalListeners) {
-            transactionalListener.onCreateDatabase(cde);
-          }
+
+        if (!transactionalListeners.isEmpty()) {
+          transactionalListenersResponses =
+              MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+                                                    EventType.CREATE_DATABASE,
+                                                    new CreateDatabaseEvent(db, true, this));
         }
 
         success = ms.commitTransaction();
@@ -901,8 +909,13 @@ public class HiveMetaStore extends ThriftHiveMetastore {
             wh.deleteDir(dbPath, true);
           }
         }
-        for (MetaStoreEventListener listener : listeners) {
-          listener.onCreateDatabase(new CreateDatabaseEvent(db, success, this));
+
+        if (!listeners.isEmpty()) {
+          MetaStoreListenerNotifier.notifyEvent(listeners,
+                                                EventType.CREATE_DATABASE,
+                                                new CreateDatabaseEvent(db, success, this),
+                                                null,
+                                                transactionalListenersResponses);
         }
       }
     }
@@ -1017,6 +1030,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       Database db = null;
       List<Path> tablePaths = new ArrayList<Path>();
       List<Path> partitionPaths = new ArrayList<Path>();
+      Map<String, String> transactionalListenerResponses = Collections.emptyMap();
       try {
         ms.openTransaction();
         db = ms.getDatabase(name);
@@ -1099,12 +1113,13 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         }
 
         if (ms.dropDatabase(name)) {
-          if (transactionalListeners.size() > 0) {
-            DropDatabaseEvent dde = new DropDatabaseEvent(db, true, this);
-            for (MetaStoreEventListener transactionalListener : transactionalListeners) {
-              transactionalListener.onDropDatabase(dde);
-            }
+          if (!transactionalListeners.isEmpty()) {
+            transactionalListenerResponses =
+                MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+                                                      EventType.DROP_DATABASE,
+                                                      new DropDatabaseEvent(db, true, this));
           }
+
           success = ms.commitTransaction();
         }
       } finally {
@@ -1126,8 +1141,13 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           }
           // it is not a terrible thing even if the data is not deleted
         }
-        for (MetaStoreEventListener listener : listeners) {
-          listener.onDropDatabase(new DropDatabaseEvent(db, success, this));
+
+        if (!listeners.isEmpty()) {
+          MetaStoreListenerNotifier.notifyEvent(listeners,
+                                                EventType.DROP_DATABASE,
+                                                new DropDatabaseEvent(db, success, this),
+                                                null,
+                                                transactionalListenerResponses);
         }
       }
     }
@@ -1353,6 +1373,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         }
       }
 
+      Map<String, String> transactionalListenerResponses = Collections.emptyMap();
       Path tblPath = null;
       boolean success = false, madeDir = false;
       try {
@@ -1374,7 +1395,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         if (!TableType.VIRTUAL_VIEW.toString().equals(tbl.getTableType())) {
           if (tbl.getSd().getLocation() == null
               || tbl.getSd().getLocation().isEmpty()) {
-            tblPath = wh.getTablePath(
+            tblPath = wh.getDefaultTablePath(
                 ms.getDatabase(tbl.getDbName()), tbl.getTableName());
           } else {
             if (!isExternal(tbl) && !MetaStoreUtils.isNonNativeTable(tbl)) {
@@ -1388,7 +1409,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
 
         if (tblPath != null) {
           if (!wh.isDir(tblPath)) {
-            if (!wh.mkdirs(tblPath, true)) {
+            if (!wh.mkdirs(tblPath)) {
               throw new MetaException(tblPath
                   + " is not a directory or unable to create one");
             }
@@ -1413,12 +1434,12 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           ms.createTableWithConstraints(tbl, primaryKeys, foreignKeys);
         }
 
-        if (transactionalListeners.size() > 0) {
-          CreateTableEvent createTableEvent = new CreateTableEvent(tbl, true, this);
-          createTableEvent.setEnvironmentContext(envContext);
-          for (MetaStoreEventListener transactionalListener : transactionalListeners) {
-            transactionalListener.onCreateTable(createTableEvent);
-          }
+        if (!transactionalListeners.isEmpty()) {
+          transactionalListenerResponses =
+              MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+                                                    EventType.CREATE_TABLE,
+                                                    new CreateTableEvent(tbl, true, this),
+                                                    envContext);
         }
 
         success = ms.commitTransaction();
@@ -1429,11 +1450,13 @@ public class HiveMetaStore extends ThriftHiveMetastore {
             wh.deleteDir(tblPath, true);
           }
         }
-        for (MetaStoreEventListener listener : listeners) {
-          CreateTableEvent createTableEvent =
-              new CreateTableEvent(tbl, success, this);
-          createTableEvent.setEnvironmentContext(envContext);
-          listener.onCreateTable(createTableEvent);
+
+        if (!listeners.isEmpty()) {
+          MetaStoreListenerNotifier.notifyEvent(listeners,
+                                                EventType.CREATE_TABLE,
+                                                new CreateTableEvent(tbl, success, this),
+                                                envContext,
+                                                transactionalListenerResponses);
         }
       }
     }
@@ -1598,6 +1621,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       List<Path> partPaths = null;
       Table tbl = null;
       boolean ifPurge = false;
+      Map<String, String> transactionalListenerResponses = Collections.emptyMap();
       try {
         ms.openTransaction();
         // drop any partitions
@@ -1642,7 +1666,6 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           }
         }
 
-        checkTrashPurgeCombination(tblPath, dbname + "." + name, ifPurge, deleteData && !isExternal);
        // Drop the partitions and get a list of locations which need to be deleted
         partPaths = dropPartitionsAndGetLocations(ms, dbname, name, tblPath,
             tbl.getPartitionKeys(), deleteData && !isExternal);
@@ -1651,12 +1674,12 @@ public class HiveMetaStore extends ThriftHiveMetastore {
          throw new MetaException(indexName == null ? "Unable to drop table " + tableName:
              "Unable to drop index table " + tableName + " for index " + indexName);
         } else {
-          if (transactionalListeners.size() > 0) {
-            DropTableEvent dropTableEvent = new DropTableEvent(tbl, true, deleteData, this);
-            dropTableEvent.setEnvironmentContext(envContext);
-            for (MetaStoreEventListener transactionalListener : transactionalListeners) {
-              transactionalListener.onDropTable(dropTableEvent);
-            }
+          if (!transactionalListeners.isEmpty()) {
+            transactionalListenerResponses =
+                MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+                                                      EventType.DROP_TABLE,
+                                                      new DropTableEvent(tbl, deleteData, true, this),
+                                                      envContext);
           }
           success = ms.commitTransaction();
         }
@@ -1671,53 +1694,16 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           deleteTableData(tblPath, ifPurge);
           // ok even if the data is not deleted
         }
-        for (MetaStoreEventListener listener : listeners) {
-          DropTableEvent dropTableEvent = new DropTableEvent(tbl, success, deleteData, this);
-          dropTableEvent.setEnvironmentContext(envContext);
-          listener.onDropTable(dropTableEvent);
-        }
-      }
-      return success;
-    }
-
-    /**
-     * Will throw MetaException if combination of trash policy/purge can't be satisfied
-     * @param pathToData path to data which may potentially be moved to trash
-     * @param objectName db.table, or db.table.part
-     * @param ifPurge if PURGE options is specified
-     */
-    private void checkTrashPurgeCombination(Path pathToData, String objectName, boolean ifPurge,
-        boolean deleteData) throws MetaException {
-      // There is no need to check TrashPurgeCombination in following cases since Purge/Trash
-      // is not applicable:
-      // a) deleteData is false -- drop an external table
-      // b) pathToData is null -- a view
-      // c) ifPurge is true -- force delete without Trash
-      if (!deleteData || pathToData == null || ifPurge) {
-        return;
-      }
 
-      boolean trashEnabled = false;
-      try {
-        trashEnabled = 0 < hiveConf.getFloat("fs.trash.interval", -1);
-      } catch(NumberFormatException ex) {
-  // nothing to do
-      }
-
-      if (trashEnabled) {
-        try {
-          HadoopShims.HdfsEncryptionShim shim =
-            ShimLoader.getHadoopShims().createHdfsEncryptionShim(FileSystem.get(hiveConf), hiveConf);
-          if (shim.isPathEncrypted(pathToData)) {
-            throw new MetaException("Unable to drop " + objectName + " because it is in an encryption zone" +
-              " and trash is enabled.  Use PURGE option to skip trash.");
-          }
-        } catch (IOException ex) {
-          MetaException e = new MetaException(ex.getMessage());
-          e.initCause(ex);
-          throw e;
+        if (!listeners.isEmpty()) {
+          MetaStoreListenerNotifier.notifyEvent(listeners,
+                                                EventType.DROP_TABLE,
+                                                new DropTableEvent(tbl, deleteData, success, this),
+                                                envContext,
+                                                transactionalListenerResponses);
         }
       }
+      return success;
     }
 
     /**
@@ -1883,6 +1869,151 @@ public class HiveMetaStore extends ThriftHiveMetastore {
 
     }
 
+    private void updateStatsForTruncate(Map<String,String> props, EnvironmentContext environmentContext) {
+      if (null == props) {
+        return;
+      }
+      for (String stat : StatsSetupConst.supportedStats) {
+        String statVal = props.get(stat);
+        if (statVal != null) {
+          //In the case of truncate table, we set the stats to be 0.
+          props.put(stat, "0");
+        }
+      }
+      //first set basic stats to true
+      StatsSetupConst.setBasicStatsState(props, StatsSetupConst.TRUE);
+      environmentContext.putToProperties(StatsSetupConst.STATS_GENERATED, StatsSetupConst.TASK);
+      //then invalidate column stats
+      StatsSetupConst.clearColumnStatsState(props);
+      return;
+    }
+
+    private void alterPartitionForTruncate(final RawStore ms,
+                                           final String dbName,
+                                           final String tableName,
+                                           final Table table,
+                                           final Partition partition) throws Exception {
+      EnvironmentContext environmentContext = new EnvironmentContext();
+      updateStatsForTruncate(partition.getParameters(), environmentContext);
+
+      if (!transactionalListeners.isEmpty()) {
+        MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+                EventType.ALTER_PARTITION,
+                new AlterPartitionEvent(partition, partition, table, true, true, this));
+      }
+
+      if (!listeners.isEmpty()) {
+        MetaStoreListenerNotifier.notifyEvent(listeners,
+                EventType.ALTER_PARTITION,
+                new AlterPartitionEvent(partition, partition, table, true, true, this));
+      }
+
+      alterHandler.alterPartition(ms, wh, dbName, tableName, null, partition, environmentContext, this);
+    }
+
+    private void alterTableStatsForTruncate(final RawStore ms,
+                                            final String dbName,
+                                            final String tableName,
+                                            final Table table,
+                                            final List<String> partNames) throws Exception {
+      if (partNames == null) {
+        if (0 != table.getPartitionKeysSize()) {
+          for (Partition partition : ms.getPartitions(dbName, tableName, Integer.MAX_VALUE)) {
+            alterPartitionForTruncate(ms, dbName, tableName, table, partition);
+          }
+        } else {
+          EnvironmentContext environmentContext = new EnvironmentContext();
+          updateStatsForTruncate(table.getParameters(), environmentContext);
+
+          if (!transactionalListeners.isEmpty()) {
+            MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+                    EventType.ALTER_TABLE,
+                    new AlterTableEvent(table, table, true, true, this));
+          }
+
+          if (!listeners.isEmpty()) {
+            MetaStoreListenerNotifier.notifyEvent(listeners,
+                    EventType.ALTER_TABLE,
+                    new AlterTableEvent(table, table, true, true, this));
+          }
+
+          alterHandler.alterTable(ms, wh, dbName, tableName, table, environmentContext, this);
+        }
+      } else {
+        for (Partition partition : ms.getPartitionsByNames(dbName, tableName, partNames)) {
+          alterPartitionForTruncate(ms, dbName, tableName, table, partition);
+        }
+      }
+      return;
+    }
+
+    private List<Path> getLocationsForTruncate(final RawStore ms,
+                                               final String dbName,
+                                               final String tableName,
+                                               final Table table,
+                                               final List<String> partNames) throws Exception {
+      List<Path> locations = new ArrayList<Path>();
+      if (partNames == null) {
+        if (0 != table.getPartitionKeysSize()) {
+          for (Partition partition : ms.getPartitions(dbName, tableName, Integer.MAX_VALUE)) {
+            locations.add(new Path(partition.getSd().getLocation()));
+          }
+        } else {
+          locations.add(new Path(table.getSd().getLocation()));
+        }
+      } else {
+        for (Partition partition : ms.getPartitionsByNames(dbName, tableName, partNames)) {
+          locations.add(new Path(partition.getSd().getLocation()));
+        }
+      }
+      return locations;
+    }
+
+    @Override
+    public void truncate_table(final String dbName, final String tableName, List<String> partNames)
+      throws NoSuchObjectException, MetaException {
+      try {
+        Table tbl = get_table_core(dbName, tableName);
+        boolean isAutopurge = (tbl.isSetParameters() && "true".equalsIgnoreCase(tbl.getParameters().get("auto.purge")));
+
+        // This is not transactional
+        for (Path location : getLocationsForTruncate(getMS(), dbName, tableName, tbl, partNames)) {
+          FileSystem fs = location.getFileSystem(getHiveConf());
+          HadoopShims.HdfsEncryptionShim shim
+                  = ShimLoader.getHadoopShims().createHdfsEncryptionShim(fs, getHiveConf());
+          if (!shim.isPathEncrypted(location)) {
+            HdfsUtils.HadoopFileStatus status = new HdfsUtils.HadoopFileStatus(getHiveConf(), fs, location);
+            FileStatus targetStatus = fs.getFileStatus(location);
+            String targetGroup = targetStatus == null ? null : targetStatus.getGroup();
+            wh.deleteDir(location, true, isAutopurge);
+            fs.mkdirs(location);
+            HdfsUtils.setFullFileStatus(getHiveConf(), status, targetGroup, fs, location, false);
+          } else {
+            FileStatus[] statuses = fs.listStatus(location, FileUtils.HIDDEN_FILES_PATH_FILTER);
+            if (statuses == null || statuses.length == 0) {
+              continue;
+            }
+            for (final FileStatus status : statuses) {
+              wh.deleteDir(status.getPath(), true, isAutopurge);
+            }
+          }
+        }
+
+        // Alter the table/partition stats and also notify truncate table event
+        alterTableStatsForTruncate(getMS(), dbName, tableName, tbl, partNames);
+      } catch (IOException e) {
+        throw new MetaException(e.getMessage());
+      } catch (Exception e) {
+        if (e instanceof MetaException) {
+          throw (MetaException) e;
+        } else if (e instanceof NoSuchObjectException) {
+          throw (NoSuchObjectException) e;
+        } else {
+          throw newMetaException(e);
+        }
+      }
+    }
+
     /**
      * Is this an external table?
      *
@@ -2135,6 +2266,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       boolean success = false, madeDir = false;
       Path partLocation = null;
       Table tbl = null;
+      Map<String, String> transactionalListenerResponses = Collections.emptyMap();
       try {
         ms.openTransaction();
         part.setDbName(dbName);
@@ -2173,7 +2305,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         }
 
         if (!wh.isDir(partLocation)) {
-          if (!wh.mkdirs(partLocation, true)) {
+          if (!wh.mkdirs(partLocation)) {
             throw new MetaException(partLocation
                 + " is not a directory or unable to create one");
           }
@@ -2191,12 +2323,12 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         }
 
         if (ms.addPartition(part)) {
-          if (transactionalListeners.size() > 0) {
-            AddPartitionEvent addPartitionEvent = new AddPartitionEvent(tbl, part, true, this);
-            addPartitionEvent.setEnvironmentContext(envContext);
-            for (MetaStoreEventListener transactionalListener : transactionalListeners) {
-              transactionalListener.onAddPartition(addPartitionEvent);
-            }
+          if (!transactionalListeners.isEmpty()) {
+            transactionalListenerResponses =
+                MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+                                                      EventType.ADD_PARTITION,
+                                                      new AddPartitionEvent(tbl, part, true, this),
+                                                      envContext);
           }
 
           success = ms.commitTransaction();
@@ -2209,11 +2341,12 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           }
         }
 
-        for (MetaStoreEventListener listener : listeners) {
-          AddPartitionEvent addPartitionEvent =
-              new AddPartitionEvent(tbl, part, success, this);
-          addPartitionEvent.setEnvironmentContext(envContext);
-          listener.onAddPartition(addPartitionEvent);
+        if (!listeners.isEmpty()) {
+          MetaStoreListenerNotifier.notifyEvent(listeners,
+                                                EventType.ADD_PARTITION,
+                                                new AddPartitionEvent(tbl, part, success, this),
+                                                envContext,
+                                                transactionalListenerResponses);
         }
       }
       return part;
@@ -2358,8 +2491,10 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       final Map<PartValEqWrapper, Boolean> addedPartitions =
          Collections.synchronizedMap(new HashMap<PartValEqWrapper, Boolean>());
       final List<Partition> newParts = new ArrayList<Partition>();
-      final List<Partition> existingParts = new ArrayList<Partition>();;
+      final List<Partition> existingParts = new ArrayList<Partition>();
       Table tbl = null;
+      Map<String, String> transactionalListenerResponses = Collections.emptyMap();
+
       try {
         ms.openTransaction();
         tbl = ms.getTable(dbName, tblName);
@@ -2445,7 +2580,13 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         success = false;
        // Notification is generated for newly created partitions only. The subset of partitions
         // that already exist (existingParts), will not generate notifications.
-        fireMetaStoreAddPartitionEventTransactional(tbl, newParts, null, true);
+        if (!transactionalListeners.isEmpty()) {
+          transactionalListenerResponses =
+              MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+                                                    EventType.ADD_PARTITION,
+                                                    new AddPartitionEvent(tbl, newParts, true, this));
+        }
+
         success = ms.commitTransaction();
       } finally {
         if (!success) {
@@ -2456,12 +2597,26 @@ public class HiveMetaStore extends ThriftHiveMetastore {
              wh.deleteDir(new Path(e.getKey().partition.getSd().getLocation()), true);
             }
           }
-          fireMetaStoreAddPartitionEvent(tbl, parts, null, false);
+
+          if (!listeners.isEmpty()) {
+            MetaStoreListenerNotifier.notifyEvent(listeners,
+                                                  EventType.ADD_PARTITION,
+                                                  new AddPartitionEvent(tbl, parts, false, this));
+          }
         } else {
-          fireMetaStoreAddPartitionEvent(tbl, newParts, null, true);
-          if (existingParts != null) {
-            // The request has succeeded but we failed to add these partitions.
-            fireMetaStoreAddPartitionEvent(tbl, existingParts, null, false);
+          if (!listeners.isEmpty()) {
+            MetaStoreListenerNotifier.notifyEvent(listeners,
+                                                  EventType.ADD_PARTITION,
+                                                  new AddPartitionEvent(tbl, newParts, true, this),
+                                                  null,
+                                                  transactionalListenerResponses);
+
+            if (!existingParts.isEmpty()) {
+              // The request has succeeded but we failed to add these partitions.
+              MetaStoreListenerNotifier.notifyEvent(listeners,
+                                                    EventType.ADD_PARTITION,
+                                                    new AddPartitionEvent(tbl, existingParts, false, this));
+            }
           }
         }
       }
@@ -2548,6 +2703,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
      final PartitionSpecProxy.PartitionIterator partitionIterator = partitionSpecProxy
           .getPartitionIterator();
       Table tbl = null;
+      Map<String, String> transactionalListenerResponses = Collections.emptyMap();
       try {
         ms.openTransaction();
         tbl = ms.getTable(dbName, tblName);
@@ -2621,7 +2777,14 @@ public class HiveMetaStore extends ThriftHiveMetastore {
        success = ms.addPartitions(dbName, tblName, partitionSpecProxy, ifNotExists);
        //setting success to false to make sure that if the listener fails, rollback happens.
        success = false;
-        fireMetaStoreAddPartitionEventTransactional(tbl, partitionSpecProxy, null, true);
+
+        if (!transactionalListeners.isEmpty()) {
+          transactionalListenerResponses =
+              MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+                                                    EventType.ADD_PARTITION,
+                                                    new AddPartitionEvent(tbl, partitionSpecProxy, true, this));
+        }
+
         success = ms.commitTransaction();
         return addedPartitions.size();
       } finally {
@@ -2634,7 +2797,14 @@ public class HiveMetaStore extends ThriftHiveMetastore {
             }
           }
         }
-        fireMetaStoreAddPartitionEvent(tbl, partitionSpecProxy, null, true);
+
+        if (!listeners.isEmpty()) {
+          MetaStoreListenerNotifier.notifyEvent(listeners,
+                                                EventType.ADD_PARTITION,
+                                                new AddPartitionEvent(tbl, partitionSpecProxy, true, this),
+                                                null,
+                                                transactionalListenerResponses);
+        }
       }
     }
 
@@ -2686,7 +2856,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         // mkdirs() because if the file system is read-only, mkdirs will
         // throw an exception even if the directory already exists.
         if (!wh.isDir(partLocation)) {
-          if (!wh.mkdirs(partLocation, true)) {
+          if (!wh.mkdirs(partLocation)) {
             throw new MetaException(partLocation
                 + " is not a directory or unable to create one");
           }
@@ -2739,6 +2909,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
        throws InvalidObjectException, AlreadyExistsException, MetaException, TException {
       boolean success = false;
       Table tbl = null;
+      Map<String, String> transactionalListenerResponses = Collections.emptyMap();
       try {
         ms.openTransaction();
         tbl = ms.getTable(part.getDbName(), part.getTableName());
@@ -2763,7 +2934,16 @@ public class HiveMetaStore extends ThriftHiveMetastore {
 
        // Setting success to false to make sure that if the listener fails, rollback happens.
        success = false;
-        fireMetaStoreAddPartitionEventTransactional(tbl, Arrays.asList(part), envContext, true);
+
+        if (!transactionalListeners.isEmpty()) {
+          transactionalListenerResponses =
+              MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+                                                    EventType.ADD_PARTITION,
+                                                    new AddPartitionEvent(tbl, Arrays.asList(part), true, this),
+                                                    envContext);
+
+        }
+
         // we proceed only if we'd actually succeeded anyway, otherwise,
         // we'd have thrown an exception
         success = ms.commitTransaction();
@@ -2771,64 +2951,19 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         if (!success) {
           ms.rollbackTransaction();
         }
-        fireMetaStoreAddPartitionEvent(tbl, Arrays.asList(part), envContext, success);
-      }
-      return part;
-    }
 
-    private void fireMetaStoreAddPartitionEvent(final Table tbl,
-        final List<Partition> parts, final EnvironmentContext envContext, boolean success)
-          throws MetaException {
-      if (tbl != null && parts != null && !parts.isEmpty()) {
-        AddPartitionEvent addPartitionEvent =
-            new AddPartitionEvent(tbl, parts, success, this);
-        addPartitionEvent.setEnvironmentContext(envContext);
-        for (MetaStoreEventListener listener : listeners) {
-          listener.onAddPartition(addPartitionEvent);
-        }
-      }
-    }
+        if (!listeners.isEmpty()) {
+          MetaStoreListenerNotifier.notifyEvent(listeners,
+                                                EventType.ADD_PARTITION,
+                                                new AddPartitionEvent(tbl, Arrays.asList(part), success, this),
+                                                envContext,
+                                                transactionalListenerResponses);
 
-    private void fireMetaStoreAddPartitionEvent(final Table tbl,
-        final PartitionSpecProxy partitionSpec, final EnvironmentContext envContext, boolean success)
-          throws MetaException {
-      if (tbl != null && partitionSpec != null) {
-        AddPartitionEvent addPartitionEvent =
-            new AddPartitionEvent(tbl, partitionSpec, success, this);
-        addPartitionEvent.setEnvironmentContext(envContext);
-        for (MetaStoreEventListener listener : listeners) {
-          listener.onAddPartition(addPartitionEvent);
-        }
-      }
-    }
-
-    private void fireMetaStoreAddPartitionEventTransactional(final Table tbl,
-          final List<Partition> parts, final EnvironmentContext envContext, boolean success)
-            throws MetaException {
-      if (tbl != null && parts != null && !parts.isEmpty()) {
-        AddPartitionEvent addPartitionEvent =
-                new AddPartitionEvent(tbl, parts, success, this);
-        addPartitionEvent.setEnvironmentContext(envContext);
-        for (MetaStoreEventListener transactionalListener : transactionalListeners) {
-          transactionalListener.onAddPartition(addPartitionEvent);
-        }
-      }
-    }
-
-    private void fireMetaStoreAddPartitionEventTransactional(final Table tbl,
-          final PartitionSpecProxy partitionSpec, final EnvironmentContext envContext, boolean success)
-            throws MetaException {
-      if (tbl != null && partitionSpec != null) {
-        AddPartitionEvent addPartitionEvent =
-                new AddPartitionEvent(tbl, partitionSpec, success, this);
-        addPartitionEvent.setEnvironmentContext(envContext);
-        for (MetaStoreEventListener transactionalListener : transactionalListeners) {
-          transactionalListener.onAddPartition(addPartitionEvent);
         }
       }
+      return part;
     }
 
-
     @Override
     public Partition add_partition(final Partition part)
         throws InvalidObjectException, AlreadyExistsException, MetaException {
@@ -2911,6 +3046,11 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       Path destPath = new Path(destinationTable.getSd().getLocation(),
           Warehouse.makePartName(partitionKeysPresent, partValsPresent));
       List<Partition> destPartitions = new ArrayList<Partition>();
+
+      Map<String, String> transactionalListenerResponsesForAddPartition = Collections.emptyMap();
+      List<Map<String, String>> transactionalListenerResponsesForDropPartition =
+          Lists.newArrayListWithCapacity(partitionsToExchange.size());
+
       try {
         for (Partition partition: partitionsToExchange) {
           Partition destPartition = new Partition(partition);
@@ -2926,7 +3066,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         }
         Path destParentPath = destPath.getParent();
         if (!wh.isDir(destParentPath)) {
-          if (!wh.mkdirs(destParentPath, true)) {
+          if (!wh.mkdirs(destParentPath)) {
               throw new MetaException("Unable to create path " + 
destParentPath);
           }
         }
@@ -2938,8 +3078,22 @@ public class HiveMetaStore extends ThriftHiveMetastore {
 
        // Setting success to false to make sure that if the listener fails, rollback happens.
         success = false;
-        fireMetaStoreExchangePartitionEvent(sourceTable, partitionsToExchange,
-            destinationTable, destPartitions, transactionalListeners, true);
+
+        if (!transactionalListeners.isEmpty()) {
+          transactionalListenerResponsesForAddPartition =
+              MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+                                                    EventType.ADD_PARTITION,
+                                                    new AddPartitionEvent(destinationTable, destPartitions, true, this));
+
+          for (Partition partition : partitionsToExchange) {
+            DropPartitionEvent dropPartitionEvent =
+                new DropPartitionEvent(sourceTable, partition, true, true, this);
+            transactionalListenerResponsesForDropPartition.add(
+                MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+                                                      EventType.DROP_PARTITION,
+                                                      dropPartitionEvent));
+          }
+        }
 
         success = ms.commitTransaction();
         return destPartitions;
@@ -2949,34 +3103,31 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           if (pathCreated) {
             wh.renameDir(destPath, sourcePath);
           }
-
-          fireMetaStoreExchangePartitionEvent(sourceTable, partitionsToExchange,
-              destinationTable, destPartitions, listeners, success);
         }
-      }
-    }
 
-    private void fireMetaStoreExchangePartitionEvent(Table sourceTable,
-        List<Partition> partitionsToExchange, Table destinationTable,
-        List<Partition> destPartitions,
-        List<MetaStoreEventListener> eventListeners,
-        boolean status) throws MetaException {
-      if (sourceTable != null && destinationTable != null
-          && !CollectionUtils.isEmpty(partitionsToExchange)
-          && !CollectionUtils.isEmpty(destPartitions)) {
-        if (eventListeners.size() > 0) {
-          AddPartitionEvent addPartitionEvent =
-              new AddPartitionEvent(destinationTable, destPartitions, status, this);
-          for (MetaStoreEventListener eventListener : eventListeners) {
-            eventListener.onAddPartition(addPartitionEvent);
-          }
+        if (!listeners.isEmpty()) {
+          AddPartitionEvent addPartitionEvent = new AddPartitionEvent(destinationTable, destPartitions, success, this);
+          MetaStoreListenerNotifier.notifyEvent(listeners,
+                                                EventType.ADD_PARTITION,
+                                                addPartitionEvent,
+                                                null,
+                                                transactionalListenerResponsesForAddPartition);
 
+          i = 0;
           for (Partition partition : partitionsToExchange) {
             DropPartitionEvent dropPartitionEvent =
-                new DropPartitionEvent(sourceTable, partition, true, status, this);
-            for (MetaStoreEventListener eventListener : eventListeners) {
-              eventListener.onDropPartition(dropPartitionEvent);
-            }
+                new DropPartitionEvent(sourceTable, partition, success, true, this);
+            Map<String, String> parameters =
+                (transactionalListenerResponsesForDropPartition.size() > i)
+                    ? transactionalListenerResponsesForDropPartition.get(i)
+                    : null;
+
+            MetaStoreListenerNotifier.notifyEvent(listeners,
+                                                  EventType.DROP_PARTITION,
+                                                  dropPartitionEvent,
+                                                  null,
+                                                  parameters);
+            i++;
           }
         }
       }
@@ -2994,6 +3145,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       Path archiveParentDir = null;
       boolean mustPurge = false;
       boolean isExternalTbl = false;
+      Map<String, String> transactionalListenerResponses = Collections.emptyMap();
 
       try {
         ms.openTransaction();
@@ -3012,27 +3164,23 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         if (isArchived) {
           archiveParentDir = MetaStoreUtils.getOriginalLocation(part);
           verifyIsWritablePath(archiveParentDir);
-          checkTrashPurgeCombination(archiveParentDir, db_name + "." + tbl_name + "." + part_vals,
-              mustPurge, deleteData && !isExternalTbl);
         }
 
         if ((part.getSd() != null) && (part.getSd().getLocation() != null)) {
           partPath = new Path(part.getSd().getLocation());
           verifyIsWritablePath(partPath);
-          checkTrashPurgeCombination(partPath, db_name + "." + tbl_name + "." + part_vals,
-              mustPurge, deleteData && !isExternalTbl);
         }
 
         if (!ms.dropPartition(db_name, tbl_name, part_vals)) {
           throw new MetaException("Unable to drop partition");
         } else {
-          if (transactionalListeners.size() > 0) {
-            DropPartitionEvent dropPartitionEvent =
-                new DropPartitionEvent(tbl, part, true, deleteData, this);
-            dropPartitionEvent.setEnvironmentContext(envContext);
-            for (MetaStoreEventListener transactionalListener : transactionalListeners) {
-              transactionalListener.onDropPartition(dropPartitionEvent);
-            }
+          if (!transactionalListeners.isEmpty()) {
+
+            transactionalListenerResponses =
+                MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+                                                      EventType.DROP_PARTITION,
+                                                      new DropPartitionEvent(tbl, part, true, deleteData, this),
+                                                      envContext);
           }
           success = ms.commitTransaction();
         }
@@ -3060,11 +3208,12 @@ public class HiveMetaStore extends ThriftHiveMetastore {
             // ok even if the data is not deleted
           }
         }
-        for (MetaStoreEventListener listener : listeners) {
-          DropPartitionEvent dropPartitionEvent =
-            new DropPartitionEvent(tbl, part, success, deleteData, this);
-          dropPartitionEvent.setEnvironmentContext(envContext);
-          listener.onDropPartition(dropPartitionEvent);
+        if (!listeners.isEmpty()) {
+          MetaStoreListenerNotifier.notifyEvent(listeners,
+                                                EventType.DROP_PARTITION,
+                                                new DropPartitionEvent(tbl, part, success, deleteData, this),
+                                                envContext,
+                                                transactionalListenerResponses);
         }
       }
       return true;
@@ -3126,6 +3275,8 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       List<Partition> parts = null;
       boolean mustPurge = false;
       boolean isExternalTbl = false;
+      List<Map<String, String>> transactionalListenerResponses = Lists.newArrayList();
+
       try {
        // We need Partition-s for firing events and for result; DN needs MPartition-s to drop.
        // Great... Maybe we could bypass fetching MPartitions by issuing direct SQL deletes.
@@ -3195,28 +3346,23 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           if (MetaStoreUtils.isArchived(part)) {
             Path archiveParentDir = MetaStoreUtils.getOriginalLocation(part);
             verifyIsWritablePath(archiveParentDir);
-            checkTrashPurgeCombination(archiveParentDir, dbName + "." + tblName + "." +
-                part.getValues(), mustPurge, deleteData && !isExternalTbl);
             archToDelete.add(archiveParentDir);
           }
           if ((part.getSd() != null) && (part.getSd().getLocation() != null)) {
             Path partPath = new Path(part.getSd().getLocation());
             verifyIsWritablePath(partPath);
-            checkTrashPurgeCombination(partPath, dbName + "." + tblName + "." + part.getValues(),
-                mustPurge, deleteData && !isExternalTbl);
            dirsToDelete.add(new PathAndPartValSize(partPath, part.getValues().size()));
           }
         }
 
         ms.dropPartitions(dbName, tblName, partNames);
-        if (parts != null && transactionalListeners.size() > 0) {
+        if (parts != null && !transactionalListeners.isEmpty()) {
           for (Partition part : parts) {
-            DropPartitionEvent dropPartitionEvent =
-                new DropPartitionEvent(tbl, part, true, deleteData, this);
-            dropPartitionEvent.setEnvironmentContext(envContext);
-            for (MetaStoreEventListener transactionalListener : transactionalListeners) {
-              transactionalListener.onDropPartition(dropPartitionEvent);
-            }
+            transactionalListenerResponses.add(
+                MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+                                                      EventType.DROP_PARTITION,
+                                                      new DropPartitionEvent(tbl, part, true, deleteData, this),
+                                                      envContext));
           }
         }
 
@@ -3250,12 +3396,19 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           }
         }
         if (parts != null) {
-          for (Partition part : parts) {
-            for (MetaStoreEventListener listener : listeners) {
-              DropPartitionEvent dropPartitionEvent =
-                new DropPartitionEvent(tbl, part, success, deleteData, this);
-              dropPartitionEvent.setEnvironmentContext(envContext);
-              listener.onDropPartition(dropPartitionEvent);
+          int i = 0;
+          if (parts != null && !listeners.isEmpty()) {
+            for (Partition part : parts) {
+              Map<String, String> parameters =
+                  (!transactionalListenerResponses.isEmpty()) ? transactionalListenerResponses.get(i) : null;
+
+              MetaStoreListenerNotifier.notifyEvent(listeners,
+                                                    EventType.DROP_PARTITION,
+                                                    new DropPartitionEvent(tbl, part, success, deleteData, this),
+                                                    envContext,
+                                                    parameters);
+
+              i++;
             }
           }
         }
@@ -3435,8 +3588,8 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         int partitionRequest = (maxToFetch < 0) ? numPartitions : maxToFetch;
         if (partitionRequest > partitionLimit) {
          String configName = ConfVars.METASTORE_LIMIT_PARTITION_REQUEST.varname;
-          throw new MetaException(String.format("Number of partitions scanned (=%d) on table '%s' exceeds limit" +
-              " (=%d). This is controlled on the metastore server by %s.", partitionRequest, tblName, partitionLimit, configName));
+          throw new MetaException(String.format(PARTITION_NUMBER_EXCEED_LIMIT_MSG, partitionRequest,
+              tblName, partitionLimit, configName));
         }
       }
     }
@@ -3678,14 +3831,15 @@ public class HiveMetaStore extends ThriftHiveMetastore {
 
         // Only fetch the table if we actually have a listener
         Table table = null;
-        for (MetaStoreEventListener listener : listeners) {
+        if (!listeners.isEmpty()) {
           if (table == null) {
             table = getMS().getTable(db_name, tbl_name);
           }
-          AlterPartitionEvent alterPartitionEvent =
-              new AlterPartitionEvent(oldPart, new_part, table, true, this);
-          alterPartitionEvent.setEnvironmentContext(envContext);
-          listener.onAlterPartition(alterPartitionEvent);
+
+          MetaStoreListenerNotifier.notifyEvent(listeners,
+                                                EventType.ALTER_PARTITION,
+                                                new AlterPartitionEvent(oldPart, new_part, table, false, true, this),
+                                                envContext);
         }
       } catch (InvalidObjectException e) {
         ex = e;
@@ -3749,13 +3903,15 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           else {
             throw new InvalidOperationException("failed to alterpartitions");
           }
-          for (MetaStoreEventListener listener : listeners) {
-            if (table == null) {
-              table = getMS().getTable(db_name, tbl_name);
-            }
-            AlterPartitionEvent alterPartitionEvent =
-                new AlterPartitionEvent(oldTmpPart, tmpPart, table, true, this);
-            listener.onAlterPartition(alterPartitionEvent);
+
+          if (table == null) {
+            table = getMS().getTable(db_name, tbl_name);
+          }
+
+          if (!listeners.isEmpty()) {
+            MetaStoreListenerNotifier.notifyEvent(listeners,
+                                                  EventType.ALTER_PARTITION,
+                                                  new AlterPartitionEvent(oldTmpPart, tmpPart, table, false, true, this));
           }
         }
       } catch (InvalidObjectException e) {
@@ -3792,16 +3948,17 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       Exception ex = null;
       Index oldIndex = null;
       RawStore ms  = getMS();
+      Map<String, String> transactionalListenerResponses = Collections.emptyMap();
       try {
         ms.openTransaction();
         oldIndex = get_index_by_name(dbname, base_table_name, index_name);
         firePreEvent(new PreAlterIndexEvent(oldIndex, newIndex, this));
         ms.alterIndex(dbname, base_table_name, index_name, newIndex);
-        if (transactionalListeners.size() > 0) {
-          AlterIndexEvent alterIndexEvent = new AlterIndexEvent(oldIndex, newIndex, true, this);
-          for (MetaStoreEventListener transactionalListener : transactionalListeners) {
-            transactionalListener.onAlterIndex(alterIndexEvent);
-          }
+        if (!transactionalListeners.isEmpty()) {
+          transactionalListenerResponses =
+              MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+                                                    EventType.ALTER_INDEX,
+                                                    new AlterIndexEvent(oldIndex, newIndex, true, this));
         }
 
         success = ms.commitTransaction();
@@ -3823,9 +3980,13 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         }
 
         endFunction("alter_index", success, ex, base_table_name);
-        for (MetaStoreEventListener listener : listeners) {
-          AlterIndexEvent alterIndexEvent = new AlterIndexEvent(oldIndex, newIndex, success, this);
-          listener.onAlterIndex(alterIndexEvent);
+
+        if (!listeners.isEmpty()) {
+          MetaStoreListenerNotifier.notifyEvent(listeners,
+                                                EventType.ALTER_INDEX,
+                                                new AlterIndexEvent(oldIndex, newIndex, success, this),
+                                                null,
+                                                transactionalListenerResponses);
         }
       }
     }
@@ -3893,11 +4054,11 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         alterHandler.alterTable(getMS(), wh, dbname, name, newTable,
                 envContext, this);
         success = true;
-        for (MetaStoreEventListener listener : listeners) {
-          AlterTableEvent alterTableEvent =
-              new AlterTableEvent(oldt, newTable, success, this);
-          alterTableEvent.setEnvironmentContext(envContext);
-          listener.onAlterTable(alterTableEvent);
+        if (!listeners.isEmpty()) {
+          MetaStoreListenerNotifier.notifyEvent(listeners,
+                                                EventType.ALTER_TABLE,
+                                                new AlterTableEvent(oldt, newTable, false, true, this),
+                                                envContext);
         }
       } catch (NoSuchObjectException e) {
         // thrown when the table to be altered does not exist
@@ -4465,6 +4626,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       boolean success = false, indexTableCreated = false;
       String[] qualified =
          MetaStoreUtils.getQualifiedName(index.getDbName(), index.getIndexTableName());
+      Map<String, String> transactionalListenerResponses = Collections.emptyMap();
       try {
         ms.openTransaction();
         firePreEvent(new PreAddIndexEvent(index, this));
@@ -4502,11 +4664,11 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         index.setCreateTime((int) time);
        index.putToParameters(hive_metastoreConstants.DDL_TIME, Long.toString(time));
         if (ms.addIndex(index)) {
-          if (transactionalListeners.size() > 0) {
-            AddIndexEvent addIndexEvent = new AddIndexEvent(index, true, this);
-            for (MetaStoreEventListener transactionalListener : transactionalListeners) {
-              transactionalListener.onAddIndex(addIndexEvent);
-            }
+          if (!transactionalListeners.isEmpty()) {
+            transactionalListenerResponses =
+                MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+                                                      EventType.CREATE_INDEX,
+                                                      new AddIndexEvent(index, true, this));
           }
         }
 
@@ -4523,9 +4685,12 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           ms.rollbackTransaction();
         }
 
-        for (MetaStoreEventListener listener : listeners) {
-          AddIndexEvent addIndexEvent = new AddIndexEvent(index, success, this);
-          listener.onAddIndex(addIndexEvent);
+        if (!listeners.isEmpty()) {
+          MetaStoreListenerNotifier.notifyEvent(listeners,
+                                                EventType.CREATE_INDEX,
+                                                new AddIndexEvent(index, success, this),
+                                                null,
+                                                transactionalListenerResponses);
         }
       }
     }
@@ -4563,6 +4728,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       Index index = null;
       Path tblPath = null;
       List<Path> partPaths = null;
+      Map<String, String> transactionalListenerResponses = Collections.emptyMap();
       try {
         ms.openTransaction();
         // drop the underlying index table
@@ -4595,11 +4761,11 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           }
         }
 
-        if (transactionalListeners.size() > 0) {
-          DropIndexEvent dropIndexEvent = new DropIndexEvent(index, true, this);
-          for (MetaStoreEventListener transactionalListener : transactionalListeners) {
-            transactionalListener.onDropIndex(dropIndexEvent);
-          }
+        if (!transactionalListeners.isEmpty()) {
+          transactionalListenerResponses =
+              MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+                                                    EventType.DROP_INDEX,
+                                                    new DropIndexEvent(index, true, this));
         }
 
         success = ms.commitTransaction();
@@ -4612,11 +4778,12 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           // ok even if the data is not deleted
         }
         // Skip the event listeners if the index is NULL
-        if (index != null) {
-          for (MetaStoreEventListener listener : listeners) {
-            DropIndexEvent dropIndexEvent = new DropIndexEvent(index, success, this);
-            listener.onDropIndex(dropIndexEvent);
-          }
+        if (index != null && !listeners.isEmpty()) {
+          MetaStoreListenerNotifier.notifyEvent(listeners,
+                                                EventType.DROP_INDEX,
+                                                new DropIndexEvent(index, success, this),
+                                                null,
+                                                transactionalListenerResponses);
         }
       }
       return success;
@@ -6052,6 +6219,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       validateFunctionInfo(func);
       boolean success = false;
       RawStore ms = getMS();
+      Map<String, String> transactionalListenerResponses = Collections.emptyMap();
       try {
         ms.openTransaction();
         Database db = ms.getDatabase(func.getDbName());
@@ -6068,11 +6236,11 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         long time = System.currentTimeMillis() / 1000;
         func.setCreateTime((int) time);
         ms.createFunction(func);
-        if (transactionalListeners.size() > 0) {
-          CreateFunctionEvent createFunctionEvent = new CreateFunctionEvent(func, true, this);
-          for (MetaStoreEventListener transactionalListener : transactionalListeners) {
-            transactionalListener.onCreateFunction(createFunctionEvent);
-          }
+        if (!transactionalListeners.isEmpty()) {
+          transactionalListenerResponses =
+              MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+                                                    EventType.CREATE_FUNCTION,
+                                                    new CreateFunctionEvent(func, true, this));
         }
 
         success = ms.commitTransaction();
@@ -6081,11 +6249,12 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           ms.rollbackTransaction();
         }
 
-        if (listeners.size() > 0) {
-          CreateFunctionEvent createFunctionEvent = new CreateFunctionEvent(func, success, this);
-          for (MetaStoreEventListener listener : listeners) {
-            listener.onCreateFunction(createFunctionEvent);
-          }
+        if (!listeners.isEmpty()) {
+          MetaStoreListenerNotifier.notifyEvent(listeners,
+                                                EventType.CREATE_FUNCTION,
+                                                new CreateFunctionEvent(func, success, this),
+                                                null,
+                                                transactionalListenerResponses);
         }
       }
     }
@@ -6097,6 +6266,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       boolean success = false;
       Function func = null;
       RawStore ms = getMS();
+      Map<String, String> transactionalListenerResponses = Collections.emptyMap();
       try {
         ms.openTransaction();
         func = ms.getFunction(dbName, funcName);
@@ -6106,10 +6276,10 @@ public class HiveMetaStore extends ThriftHiveMetastore {
 
         ms.dropFunction(dbName, funcName);
         if (transactionalListeners.size() > 0) {
-          DropFunctionEvent dropFunctionEvent = new DropFunctionEvent(func, true, this);
-          for (MetaStoreEventListener transactionalListener : transactionalListeners) {
-            transactionalListener.onDropFunction(dropFunctionEvent);
-          }
+          transactionalListenerResponses =
+              MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+                                                    EventType.DROP_FUNCTION,
+                                                    new DropFunctionEvent(func, true, this));
         }
 
         success = ms.commitTransaction();
@@ -6119,10 +6289,11 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         }
 
         if (listeners.size() > 0) {
-          DropFunctionEvent dropFunctionEvent = new DropFunctionEvent(func, success, this);
-          for (MetaStoreEventListener listener : listeners) {
-            listener.onDropFunction(dropFunctionEvent);
-          }
+          MetaStoreListenerNotifier.notifyEvent(listeners,
+                                                EventType.DROP_FUNCTION,
+                                                new DropFunctionEvent(func, success, this),
+                                                null,
+                                                transactionalListenerResponses);
         }
       }
     }
@@ -6474,13 +6645,13 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         InsertEvent event =
             new InsertEvent(rqst.getDbName(), rqst.getTableName(), rqst.getPartitionVals(), rqst
                 .getData().getInsertData(), rqst.isSuccessful(), this);
-        for (MetaStoreEventListener transactionalListener : transactionalListeners) {
-          transactionalListener.onInsert(event);
-        }
 
-        for (MetaStoreEventListener listener : listeners) {
-          listener.onInsert(event);
-        }
+        /*
+         * The transactional listener response will already be set on the event, so there is no need
+         * to pass the response to the non-transactional listeners.
+         */
+        MetaStoreListenerNotifier.notifyEvent(transactionalListeners, EventType.INSERT, event);
+        MetaStoreListenerNotifier.notifyEvent(listeners, EventType.INSERT, event);
 
         return new FireEventResponse();
 
@@ -7234,10 +7405,9 @@ public class HiveMetaStore extends ThriftHiveMetastore {
             ServerMode.METASTORE);
         saslServer.setSecretManager(delegationTokenManager.getSecretManager());
         transFactory = saslServer.createTransportFactory(
-                MetaStoreUtils.getMetaStoreSaslProperties(conf));
+                MetaStoreUtils.getMetaStoreSaslProperties(conf, useSSL));
         processor = saslServer.wrapProcessor(
           new ThriftHiveMetastore.Processor<IHMSHandler>(handler));
-        serverSocket = HiveAuthUtils.getServerSocket(null, port);
 
         LOG.info("Starting DB backed MetaStore Server in Secure Mode");
       } else {
@@ -7256,25 +7426,27 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           processor = new TSetIpAddressProcessor<IHMSHandler>(handler);
           LOG.info("Starting DB backed MetaStore Server");
         }
+      }
+
+      if (!useSSL) {
+        serverSocket = HiveAuthUtils.getServerSocket(null, port);
+      } else {
+        String keyStorePath = conf.getVar(ConfVars.HIVE_METASTORE_SSL_KEYSTORE_PATH).trim();
+        if (keyStorePath.isEmpty()) {
+          throw new IllegalArgumentException(ConfVars.HIVE_METASTORE_SSL_KEYSTORE_PATH.varname
+              + " Not configured for SSL connection");
+        }
+        String keyStorePassword = ShimLoader.getHadoopShims().getPassword(conf,
+            HiveConf.ConfVars.HIVE_METASTORE_SSL_KEYSTORE_PASSWORD.varname);
 
         // enable SSL support for HMS
         List<String> sslVersionBlacklist = new ArrayList<String>();
         for (String sslVersion : conf.getVar(ConfVars.HIVE_SSL_PROTOCOL_BLACKLIST).split(",")) {
           sslVersionBlacklist.add(sslVersion);
         }
-        if (!useSSL) {
-          serverSocket = HiveAuthUtils.getServerSocket(null, port);
-        } else {
-          String keyStorePath = conf.getVar(ConfVars.HIVE_METASTORE_SSL_KEYSTORE_PATH).trim();
-          if (keyStorePath.isEmpty()) {
-            throw new IllegalArgumentException(ConfVars.HIVE_METASTORE_SSL_KEYSTORE_PASSWORD.varname
-                + " Not configured for SSL connection");
-          }
-          String keyStorePassword = ShimLoader.getHadoopShims().getPassword(conf,
-              HiveConf.ConfVars.HIVE_METASTORE_SSL_KEYSTORE_PASSWORD.varname);
-          serverSocket = HiveAuthUtils.getServerSSLSocket(null, port, keyStorePath,
-              keyStorePassword, sslVersionBlacklist);
-        }
+
+        serverSocket = HiveAuthUtils.getServerSSLSocket(null, port, keyStorePath,
+            keyStorePassword, sslVersionBlacklist);
       }
 
       if (tcpKeepAlive) {
@@ -7336,6 +7508,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       HMSHandler.LOG.info("Options.maxWorkerThreads = "
           + maxWorkerThreads);
       HMSHandler.LOG.info("TCP keepalive = " + tcpKeepAlive);
+      HMSHandler.LOG.info("Enable SSL = " + useSSL);
 
       if (startLock != null) {
         signalOtherThreadsToStart(tServer, startLock, startCondition, startedServing);
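
The hunks above apply the same refactor to every DDL path: per-listener loops are replaced by MetaStoreListenerNotifier, transactional listeners are notified while the metastore transaction is still open, and their responses are forwarded to the regular listeners after commit. A minimal sketch of that pattern, using the createFunction path as the example (identifiers are the ones visible in this diff; the remark about what the responses carry, e.g. a notification event id, is an assumption):

    Map<String, String> transactionalListenerResponses = Collections.emptyMap();
    boolean success = false;
    try {
      ms.openTransaction();
      ms.createFunction(func);                        // the actual metadata change
      if (!transactionalListeners.isEmpty()) {
        // Fired inside the open transaction; the returned map captures whatever the
        // transactional listeners report back (assumed to include an event id).
        transactionalListenerResponses =
            MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
                                                  EventType.CREATE_FUNCTION,
                                                  new CreateFunctionEvent(func, true, this));
      }
      success = ms.commitTransaction();
    } finally {
      if (!success) {
        ms.rollbackTransaction();
      }
      if (!listeners.isEmpty()) {
        // Fired after commit/rollback, outside the transaction, with the
        // transactional responses passed through as the extra parameter.
        MetaStoreListenerNotifier.notifyEvent(listeners,
                                              EventType.CREATE_FUNCTION,
                                              new CreateFunctionEvent(func, success, this),
                                              null,
                                              transactionalListenerResponses);
      }
    }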

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
index b0b009a..4912a31 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
@@ -395,6 +395,29 @@ public class HiveMetaStoreClient implements IMetaStoreClient {
         LOG.info("Trying to connect to metastore with URI " + store);
 
         try {
+          if (useSSL) {
+            try {
+              String trustStorePath = conf.getVar(ConfVars.HIVE_METASTORE_SSL_TRUSTSTORE_PATH).trim();
+              if (trustStorePath.isEmpty()) {
+                throw new IllegalArgumentException(ConfVars.HIVE_METASTORE_SSL_TRUSTSTORE_PATH.varname
+                    + " Not configured for SSL connection");
+              }
+              String trustStorePassword = ShimLoader.getHadoopShims().getPassword(conf,
+                  HiveConf.ConfVars.HIVE_METASTORE_SSL_TRUSTSTORE_PASSWORD.varname);
+
+              // Create an SSL socket and connect
+              transport = HiveAuthUtils.getSSLSocket(store.getHost(), store.getPort(), clientSocketTimeout, trustStorePath, trustStorePassword );
+              LOG.info("Opened an SSL connection to metastore, current connections: " + connCount.incrementAndGet());
+            } catch(IOException e) {
+              throw new IllegalArgumentException(e);
+            } catch(TTransportException e) {
+              tte = e;
+              throw new MetaException(e.toString());
+            }
+          } else {
+            transport = new TSocket(store.getHost(), store.getPort(), clientSocketTimeout);
+          }
+
           if (useSasl) {
             // Wrap thrift connection with SASL for secure connection.
             try {
@@ -409,48 +432,24 @@ public class HiveMetaStoreClient implements IMetaStoreClient {
               String tokenSig = conf.getVar(ConfVars.METASTORE_TOKEN_SIGNATURE);
               // tokenSig could be null
               tokenStrForm = Utils.getTokenStrForm(tokenSig);
-              transport = new TSocket(store.getHost(), store.getPort(), clientSocketTimeout);
 
               if(tokenStrForm != null) {
                 // authenticate using delegation tokens via the "DIGEST" mechanism
                 transport = authBridge.createClientTransport(null, store.getHost(),
                     "DIGEST", tokenStrForm, transport,
-                        MetaStoreUtils.getMetaStoreSaslProperties(conf));
+                        MetaStoreUtils.getMetaStoreSaslProperties(conf, useSSL));
               } else {
                 String principalConfig =
                     conf.getVar(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL);
                 transport = authBridge.createClientTransport(
                     principalConfig, store.getHost(), "KERBEROS", null,
-                    transport, MetaStoreUtils.getMetaStoreSaslProperties(conf));
+                    transport, MetaStoreUtils.getMetaStoreSaslProperties(conf, useSSL));
               }
             } catch (IOException ioe) {
               LOG.error("Couldn't create client transport", ioe);
               throw new MetaException(ioe.toString());
             }
           } else {
-            if (useSSL) {
-              try {
-                String trustStorePath = conf.getVar(ConfVars.HIVE_METASTORE_SSL_TRUSTSTORE_PATH).trim();
-                if (trustStorePath.isEmpty()) {
-                  throw new IllegalArgumentException(ConfVars.HIVE_METASTORE_SSL_TRUSTSTORE_PATH.varname
-                      + " Not configured for SSL connection");
-                }
-                String trustStorePassword = ShimLoader.getHadoopShims().getPassword(conf,
-                    HiveConf.ConfVars.HIVE_METASTORE_SSL_TRUSTSTORE_PASSWORD.varname);
-
-                // Create an SSL socket and connect
-                transport = HiveAuthUtils.getSSLSocket(store.getHost(), store.getPort(), clientSocketTimeout, trustStorePath, trustStorePassword );
-                LOG.info("Opened an SSL connection to metastore, current connections: " + connCount.incrementAndGet());
-              } catch(IOException e) {
-                throw new IllegalArgumentException(e);
-              } catch(TTransportException e) {
-                tte = e;
-                throw new MetaException(e.toString());
-              }
-            } else {
-              transport = new TSocket(store.getHost(), store.getPort(), clientSocketTimeout);
-            }
-
             if (useFramedTransport) {
               transport = new TFramedTransport(transport);
             }
@@ -1097,6 +1096,23 @@ public class HiveMetaStoreClient implements IMetaStoreClient {
   }
 
   /**
+   * Truncate the table/partitions in the given database.
+   * @param dbName
+   *          The db to which the table to be truncated belongs
+   * @param tableName
+   *          The table to truncate
+   * @param partNames
+   *          List of partitions to truncate. NULL will truncate the whole table/all partitions
+   * @throws MetaException
+   * @throws TException
+   *           Could not truncate table properly.
+   */
+  @Override
+  public void truncateTable(String dbName, String tableName, List<String> partNames) throws MetaException, TException {
+    client.truncate_table(dbName, tableName, partNames);
+  }
+
+  /**
    * @param type
    * @return true if the type is dropped
    * @throws MetaException
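
With the reordering above, the plain or SSL socket is created first and SASL, if enabled, is layered on top of it, so a SASL transport can now wrap an SSL socket and getMetaStoreSaslProperties receives the useSSL flag. A rough client-side sketch of enabling the SSL path; only the two truststore variables appear in this diff, while the hive.metastore.use.SSL switch, the URI, and the file paths are assumptions for illustration:

    HiveConf conf = new HiveConf();
    conf.setBoolVar(HiveConf.ConfVars.HIVE_METASTORE_USE_SSL, true);            // assumed ConfVars name
    conf.setVar(HiveConf.ConfVars.HIVE_METASTORE_SSL_TRUSTSTORE_PATH,
        "/etc/hive/conf/truststore.jks");                                       // hypothetical path
    conf.setVar(HiveConf.ConfVars.HIVE_METASTORE_SSL_TRUSTSTORE_PASSWORD, "changeit");
    conf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://metastore.example.com:9083");
    // Opens the SSL socket shown above before any SASL wrapping.
    IMetaStoreClient client = new HiveMetaStoreClient(conf);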

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreFsImpl.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreFsImpl.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreFsImpl.java
index df698c8..b7d7b50 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreFsImpl.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreFsImpl.java
@@ -25,35 +25,23 @@ import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.Trash;
+import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 
 public class HiveMetaStoreFsImpl implements MetaStoreFS {
 
   public static final Logger LOG = LoggerFactory
-      .getLogger("hive.metastore.hivemetastoressimpl");
+      .getLogger("hive.metastore.hivemetastoreFsimpl");
 
   @Override
   public boolean deleteDir(FileSystem fs, Path f, boolean recursive,
       boolean ifPurge, Configuration conf) throws MetaException {
-    LOG.debug("deleting  " + f);
-
     try {
-      if (ifPurge) {
-        LOG.info("Not moving "+ f +" to trash");
-      } else if (Trash.moveToAppropriateTrash(fs, f, conf)) {
-        LOG.info("Moved to trash: " + f);
-        return true;
-      }
-
-      if (fs.delete(f, true)) {
-        LOG.debug("Deleted the diretory " + f);
-        return true;
-      }
-
+      FileUtils.moveToTrash(fs, f, conf, ifPurge);
       if (fs.exists(f)) {
         throw new MetaException("Unable to delete directory: " + f);
       }
+      return true;
     } catch (FileNotFoundException e) {
       return true; // ok even if there is not data
     } catch (Exception e) {
@@ -61,5 +49,4 @@ public class HiveMetaStoreFsImpl implements MetaStoreFS {
     }
     return false;
   }
-
 }
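
deleteDir now delegates the trash-versus-purge decision to FileUtils.moveToTrash and only verifies afterwards that the directory is gone. A caller-side sketch of the unchanged contract (the path is made up; the helper is assumed to keep the old semantics of deleting outright when ifPurge is set and otherwise trying the Hadoop trash first):

    Configuration conf = new Configuration();
    MetaStoreFS metaStoreFs = new HiveMetaStoreFsImpl();
    Path partDir = new Path("/warehouse/db.db/tbl/part=1");   // hypothetical location
    FileSystem fs = partDir.getFileSystem(conf);
    // recursive=true, ifPurge=false => attempt the trash before a hard delete
    boolean deleted = metaStoreFs.deleteDir(fs, partDir, true, false, conf);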

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java b/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
index d567258..82db281 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
@@ -305,6 +305,20 @@ public interface IMetaStoreClient {
   void dropTable(String dbname, String tableName)
       throws MetaException, TException, NoSuchObjectException;
 
+  /**
+   * Truncate the table/partitions in the given database.
+   * @param dbName
+   *          The db to which the table to be truncated belongs
+   * @param tableName
+   *          The table to truncate
+   * @param partNames
+   *          List of partitions to truncate. NULL will truncate the whole table/all partitions
+   * @throws MetaException
+   * @throws TException
+   *           Could not truncate table properly.
+   */
+  void truncateTable(String dbName, String tableName, List<String> partNames) throws MetaException, TException;
+
   boolean tableExists(String databaseName, String tableName) throws MetaException,
       TException, UnknownDBException;
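
The interface change mirrors the client method above. A hypothetical caller (database, table, and partition names are made up; the call goes straight through to the truncate_table thrift call used in HiveMetaStoreClient):

    IMetaStoreClient client = new HiveMetaStoreClient(new HiveConf());
    // Truncate two specific partitions of sales_db.orders ...
    client.truncateTable("sales_db", "orders", Arrays.asList("dt=2016-01-01", "dt=2016-01-02"));
    // ... or pass null to truncate the whole table / every partition.
    client.truncateTable("sales_db", "orders", null);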
 
