HIVE-16164: Provide mechanism for passing HMS notification ID between 
transactional and non-transactional listeners. (Sergio Pena, reviewed by Mohit 
Sabharwal, Alexander Kolbasov)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/aa29cd9d
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/aa29cd9d
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/aa29cd9d

Branch: refs/heads/master
Commit: aa29cd9d6044edb7d425b99ff347c92079475d1d
Parents: 2985262
Author: Sergio Pena <sergio.p...@cloudera.com>
Authored: Tue Apr 4 09:42:06 2017 -0500
Committer: Sergio Pena <sergio.p...@cloudera.com>
Committed: Tue Apr 4 09:43:05 2017 -0500

----------------------------------------------------------------------
 .../listener/DbNotificationListener.java        |  46 +-
 .../MetaStoreEventListenerConstants.java        |  33 ++
 .../listener/TestDbNotificationListener.java    | 190 +++++++
 .../hadoop/hive/metastore/HiveAlterHandler.java |  60 +--
 .../hadoop/hive/metastore/HiveMetaStore.java    | 529 +++++++++++--------
 .../metastore/MetaStoreListenerNotifier.java    | 224 ++++++++
 .../hive/metastore/events/ListenerEvent.java    | 106 ++++
 .../hadoop/hive/metastore/TestObjectStore.java  |  50 ++
 8 files changed, 959 insertions(+), 279 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/aa29cd9d/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
----------------------------------------------------------------------
diff --git 
a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
 
b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
index ea6cb79..bbfbc36 100644
--- 
a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
+++ 
b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
@@ -57,6 +57,7 @@ import 
org.apache.hadoop.hive.metastore.events.DropPartitionEvent;
 import org.apache.hadoop.hive.metastore.events.DropTableEvent;
 import org.apache.hadoop.hive.metastore.events.InsertEvent;
 import org.apache.hadoop.hive.metastore.events.LoadPartitionDoneEvent;
+import org.apache.hadoop.hive.metastore.events.ListenerEvent;
 import org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType;
 import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
 import org.apache.hadoop.hive.metastore.messaging.PartitionFiles;
@@ -137,7 +138,7 @@ public class DbNotificationListener extends 
MetaStoreEventListener {
             .buildCreateTableMessage(t, new 
FileIterator(t.getSd().getLocation())).toString());
     event.setDbName(t.getDbName());
     event.setTableName(t.getTableName());
-    process(event);
+    process(event, tableEvent);
   }
 
   /**
@@ -152,7 +153,7 @@ public class DbNotificationListener extends 
MetaStoreEventListener {
             .buildDropTableMessage(t).toString());
     event.setDbName(t.getDbName());
     event.setTableName(t.getTableName());
-    process(event);
+    process(event, tableEvent);
   }
 
   /**
@@ -168,7 +169,7 @@ public class DbNotificationListener extends 
MetaStoreEventListener {
             .buildAlterTableMessage(before, after).toString());
     event.setDbName(after.getDbName());
     event.setTableName(after.getTableName());
-    process(event);
+    process(event, tableEvent);
   }
 
   class FileIterator implements Iterator<String> {
@@ -276,7 +277,7 @@ public class DbNotificationListener extends 
MetaStoreEventListener {
         new NotificationEvent(0, now(), EventType.ADD_PARTITION.toString(), 
msg);
     event.setDbName(t.getDbName());
     event.setTableName(t.getTableName());
-    process(event);
+    process(event, partitionEvent);
   }
 
   /**
@@ -291,7 +292,7 @@ public class DbNotificationListener extends 
MetaStoreEventListener {
             .buildDropPartitionMessage(t, 
partitionEvent.getPartitionIterator()).toString());
     event.setDbName(t.getDbName());
     event.setTableName(t.getTableName());
-    process(event);
+    process(event, partitionEvent);
   }
 
   /**
@@ -307,7 +308,7 @@ public class DbNotificationListener extends 
MetaStoreEventListener {
             .buildAlterPartitionMessage(partitionEvent.getTable(), before, 
after).toString());
     event.setDbName(before.getDbName());
     event.setTableName(before.getTableName());
-    process(event);
+    process(event, partitionEvent);
   }
 
   /**
@@ -321,7 +322,7 @@ public class DbNotificationListener extends 
MetaStoreEventListener {
         new NotificationEvent(0, now(), EventType.CREATE_DATABASE.toString(), 
msgFactory
             .buildCreateDatabaseMessage(db).toString());
     event.setDbName(db.getName());
-    process(event);
+    process(event, dbEvent);
   }
 
   /**
@@ -335,7 +336,7 @@ public class DbNotificationListener extends 
MetaStoreEventListener {
         new NotificationEvent(0, now(), EventType.DROP_DATABASE.toString(), 
msgFactory
             .buildDropDatabaseMessage(db).toString());
     event.setDbName(db.getName());
-    process(event);
+    process(event, dbEvent);
   }
 
   /**
@@ -349,7 +350,7 @@ public class DbNotificationListener extends 
MetaStoreEventListener {
         new NotificationEvent(0, now(), EventType.CREATE_FUNCTION.toString(), 
msgFactory
             .buildCreateFunctionMessage(fn).toString());
     event.setDbName(fn.getDbName());
-    process(event);
+    process(event, fnEvent);
   }
 
   /**
@@ -363,7 +364,7 @@ public class DbNotificationListener extends 
MetaStoreEventListener {
         new NotificationEvent(0, now(), EventType.DROP_FUNCTION.toString(), 
msgFactory
             .buildDropFunctionMessage(fn).toString());
     event.setDbName(fn.getDbName());
-    process(event);
+    process(event, fnEvent);
   }
 
   /**
@@ -377,7 +378,7 @@ public class DbNotificationListener extends 
MetaStoreEventListener {
         new NotificationEvent(0, now(), EventType.CREATE_INDEX.toString(), 
msgFactory
             .buildCreateIndexMessage(index).toString());
     event.setDbName(index.getDbName());
-    process(event);
+    process(event, indexEvent);
   }
 
   /**
@@ -391,7 +392,7 @@ public class DbNotificationListener extends 
MetaStoreEventListener {
         new NotificationEvent(0, now(), EventType.DROP_INDEX.toString(), 
msgFactory
             .buildDropIndexMessage(index).toString());
     event.setDbName(index.getDbName());
-    process(event);
+    process(event, indexEvent);
   }
 
   /**
@@ -406,7 +407,7 @@ public class DbNotificationListener extends 
MetaStoreEventListener {
         new NotificationEvent(0, now(), EventType.ALTER_INDEX.toString(), 
msgFactory
             .buildAlterIndexMessage(before, after).toString());
     event.setDbName(before.getDbName());
-    process(event);
+    process(event, indexEvent);
   }
 
   class FileChksumIterator implements Iterator<String> {
@@ -443,7 +444,7 @@ public class DbNotificationListener extends 
MetaStoreEventListener {
             .toString());
     event.setDbName(insertEvent.getDb());
     event.setTableName(insertEvent.getTable());
-    process(event);
+    process(event, insertEvent);
   }
 
   /**
@@ -467,14 +468,27 @@ public class DbNotificationListener extends 
MetaStoreEventListener {
     return (int)millis;
   }
 
-  // Process this notification by adding it to metastore DB
-  private void process(NotificationEvent event) throws MetaException {
+  /**
+   * Process this notification by adding it to metastore DB.
+   *
+   * @param event the NotificationEvent object written to the metastore DB.
+   * @param listenerEvent the ListenerEvent on which the NotificationEvent is 
based; used only to set the
+   *                      DB_NOTIFICATION_EVENT_ID_KEY_NAME parameter for future 
reference by other listeners.
+   */
+  private void process(NotificationEvent event, ListenerEvent listenerEvent) 
throws MetaException {
     event.setMessageFormat(msgFactory.getMessageFormat());
     synchronized (NOTIFICATION_TBL_LOCK) {
       LOG.debug("DbNotificationListener: Processing : {}:{}", 
event.getEventId(),
           event.getMessage());
       HMSHandler.getMSForConf(hiveConf).addNotificationEvent(event);
     }
+
+      // Set the DB_NOTIFICATION_EVENT_ID_KEY_NAME parameter for future reference by other 
listeners.
+      if (event.isSetEventId()) {
+        listenerEvent.putParameter(
+            MetaStoreEventListenerConstants.DB_NOTIFICATION_EVENT_ID_KEY_NAME,
+            Long.toString(event.getEventId()));
+      }
   }
 
   private static class CleanerThread extends Thread {

http://git-wip-us.apache.org/repos/asf/hive/blob/aa29cd9d/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/MetaStoreEventListenerConstants.java
----------------------------------------------------------------------
diff --git 
a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/MetaStoreEventListenerConstants.java
 
b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/MetaStoreEventListenerConstants.java
new file mode 100644
index 0000000..a4f2d59
--- /dev/null
+++ 
b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/MetaStoreEventListenerConstants.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.hcatalog.listener;
+
+/**
+ * Keeps a list of reserved keys used by Hive listeners when updating the 
ListenerEvent
+ * parameters.
+ */
+public class MetaStoreEventListenerConstants {
+  /*
+   * DbNotificationListener keys reserved for updating ListenerEvent 
parameters.
+   *
+   * DB_NOTIFICATION_EVENT_ID_KEY_NAME This key holds the identifier of the 
notification event that
+   *                                   DbNotificationListener wrote while 
processing an event. Other
+   *                                   MetaStoreEventListener implementations 
may read this identifier.
+   */
+  public static final String DB_NOTIFICATION_EVENT_ID_KEY_NAME = 
"DB_NOTIFICATION_EVENT_ID_KEY_NAME";
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/aa29cd9d/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
----------------------------------------------------------------------
diff --git 
a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
 
b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
index 1cf47c3..50d8878 100644
--- 
a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
+++ 
b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
@@ -31,13 +31,16 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Stack;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.cli.CliSessionState;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.FireEventRequest;
@@ -46,6 +49,7 @@ import org.apache.hadoop.hive.metastore.api.Function;
 import org.apache.hadoop.hive.metastore.api.FunctionType;
 import org.apache.hadoop.hive.metastore.api.Index;
 import org.apache.hadoop.hive.metastore.api.InsertEventRequestData;
+import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
 import org.apache.hadoop.hive.metastore.api.Order;
@@ -56,6 +60,21 @@ import org.apache.hadoop.hive.metastore.api.ResourceUri;
 import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.events.AddIndexEvent;
+import org.apache.hadoop.hive.metastore.events.AddPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.AlterIndexEvent;
+import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.AlterTableEvent;
+import org.apache.hadoop.hive.metastore.events.CreateDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.CreateFunctionEvent;
+import org.apache.hadoop.hive.metastore.events.CreateTableEvent;
+import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.DropFunctionEvent;
+import org.apache.hadoop.hive.metastore.events.DropIndexEvent;
+import org.apache.hadoop.hive.metastore.events.DropPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.DropTableEvent;
+import org.apache.hadoop.hive.metastore.events.InsertEvent;
+import org.apache.hadoop.hive.metastore.events.ListenerEvent;
 import org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage;
 import org.apache.hadoop.hive.metastore.messaging.AlterIndexMessage;
 import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage;
@@ -75,6 +94,8 @@ import 
org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
 import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
 import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hive.hcatalog.data.Pair;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -97,12 +118,105 @@ public class TestDbNotificationListener {
   private int startTime;
   private long firstEventId;
 
+  /* This class is used to verify that HiveMetaStore calls the 
non-transactional listeners with the
+    * current event ID set by the DbNotificationListener class */
+  public static class MockMetaStoreEventListener extends 
MetaStoreEventListener {
+    private static Stack<Pair<EventType, String>> eventsIds = new Stack<>();
+
+    private static void pushEventId(EventType eventType, final ListenerEvent 
event) {
+      if (event.getStatus()) {
+        Map<String, String> parameters = event.getParameters();
+        if 
(parameters.containsKey(MetaStoreEventListenerConstants.DB_NOTIFICATION_EVENT_ID_KEY_NAME))
 {
+          Pair<EventType, String> pair =
+              new Pair<>(eventType, 
parameters.get(MetaStoreEventListenerConstants.DB_NOTIFICATION_EVENT_ID_KEY_NAME));
+          eventsIds.push(pair);
+        }
+      }
+    }
+
+    public static void popAndVerifyLastEventId(EventType eventType, long id) {
+      if (!eventsIds.isEmpty()) {
+        Pair<EventType, String> pair = eventsIds.pop();
+
+        assertEquals("Last event type does not match.", eventType, pair.first);
+        assertEquals("Last event ID does not match.", Long.toString(id), 
pair.second);
+      } else {
+        assertTrue("List of events is empty.",false);
+      }
+    }
+
+    public static void clearEvents() {
+      eventsIds.clear();
+    }
+
+    public MockMetaStoreEventListener(Configuration config) {
+      super(config);
+    }
+
+    public void onCreateTable (CreateTableEvent tableEvent) throws 
MetaException {
+      pushEventId(EventType.CREATE_TABLE, tableEvent);
+    }
+
+    public void onDropTable (DropTableEvent tableEvent)  throws MetaException {
+      pushEventId(EventType.DROP_TABLE, tableEvent);
+    }
+
+    public void onAlterTable (AlterTableEvent tableEvent) throws MetaException 
{
+      pushEventId(EventType.ALTER_TABLE, tableEvent);
+    }
+
+    public void onAddPartition (AddPartitionEvent partitionEvent) throws 
MetaException {
+      pushEventId(EventType.ADD_PARTITION, partitionEvent);
+    }
+
+    public void onDropPartition (DropPartitionEvent partitionEvent)  throws 
MetaException {
+      pushEventId(EventType.DROP_PARTITION, partitionEvent);
+    }
+
+    public void onAlterPartition (AlterPartitionEvent partitionEvent)  throws 
MetaException {
+      pushEventId(EventType.ALTER_PARTITION, partitionEvent);
+    }
+
+    public void onCreateDatabase (CreateDatabaseEvent dbEvent) throws 
MetaException {
+      pushEventId(EventType.CREATE_DATABASE, dbEvent);
+    }
+
+    public void onDropDatabase (DropDatabaseEvent dbEvent) throws 
MetaException {
+      pushEventId(EventType.DROP_DATABASE, dbEvent);
+    }
+
+    public void onAddIndex(AddIndexEvent indexEvent) throws MetaException {
+      pushEventId(EventType.CREATE_INDEX, indexEvent);
+    }
+
+    public void onDropIndex(DropIndexEvent indexEvent) throws MetaException {
+      pushEventId(EventType.DROP_INDEX, indexEvent);
+    }
+
+    public void onAlterIndex(AlterIndexEvent indexEvent) throws MetaException {
+      pushEventId(EventType.ALTER_INDEX, indexEvent);
+    }
+
+    public void onCreateFunction (CreateFunctionEvent fnEvent) throws 
MetaException {
+      pushEventId(EventType.CREATE_FUNCTION, fnEvent);
+    }
+
+    public void onDropFunction (DropFunctionEvent fnEvent) throws 
MetaException {
+      pushEventId(EventType.DROP_FUNCTION, fnEvent);
+    }
+
+    public void onInsert(InsertEvent insertEvent) throws MetaException {
+      pushEventId(EventType.INSERT, insertEvent);
+    }
+  }
+
   @SuppressWarnings("rawtypes")
   @BeforeClass
   public static void connectToMetastore() throws Exception {
     HiveConf conf = new HiveConf();
     conf.setVar(HiveConf.ConfVars.METASTORE_TRANSACTIONAL_EVENT_LISTENERS,
         DbNotificationListener.class.getName());
+    conf.setVar(HiveConf.ConfVars.METASTORE_EVENT_LISTENERS, 
MockMetaStoreEventListener.class.getName());
     conf.setVar(HiveConf.ConfVars.METASTORE_EVENT_DB_LISTENER_TTL, 
String.valueOf(EVENTS_TTL) + "s");
     conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
     conf.setBoolVar(HiveConf.ConfVars.FIRE_EVENTS_FOR_DML, true);
@@ -139,6 +253,12 @@ public class TestDbNotificationListener {
     DummyRawStoreFailEvent.setEventSucceed(true);
   }
 
+  @After
+  public void tearDown() {
+    MockMetaStoreEventListener.clearEvents();
+  }
+
+
   @Test
   public void createDatabase() throws Exception {
     String dbName = "createdb";
@@ -164,6 +284,9 @@ public class TestDbNotificationListener {
     CreateDatabaseMessage createDbMsg = 
md.getCreateDatabaseMessage(event.getMessage());
     assertEquals(dbName, createDbMsg.getDB());
 
+    // Verify the eventID was passed to the non-transactional listener
+    
MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_DATABASE, 
firstEventId + 1);
+
     // When hive.metastore.transactional.event.listeners is set,
     // a failed event should not create a new notification
     DummyRawStoreFailEvent.setEventSucceed(false);
@@ -206,6 +329,10 @@ public class TestDbNotificationListener {
     DropDatabaseMessage dropDbMsg = 
md.getDropDatabaseMessage(event.getMessage());
     assertEquals(dbName, dropDbMsg.getDB());
 
+    // Verify the eventID was passed to the non-transactional listener
+    
MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.DROP_DATABASE, 
firstEventId + 2);
+    
MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_DATABASE, 
firstEventId + 1);
+
     // When hive.metastore.transactional.event.listeners is set,
     // a failed event should not create a new notification
     db = new Database(dbName2, dbDescription, dbLocationUri, emptyParameters);
@@ -256,6 +383,9 @@ public class TestDbNotificationListener {
     assertEquals(tblName, createTblMsg.getTable());
     assertEquals(table, createTblMsg.getTableObj());
 
+    // Verify the eventID was passed to the non-transactional listener
+    MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, 
firstEventId + 1);
+
     // When hive.metastore.transactional.event.listeners is set,
     // a failed event should not create a new notification
     table =
@@ -312,6 +442,9 @@ public class TestDbNotificationListener {
     AlterTableMessage alterTableMessage = 
md.getAlterTableMessage(event.getMessage());
     assertEquals(table, alterTableMessage.getTableObjAfter());
 
+    // Verify the eventID was passed to the non-transactional listener
+    MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, 
firstEventId + 1);
+
     // When hive.metastore.transactional.event.listeners is set,
     // a failed event should not create a new notification
     DummyRawStoreFailEvent.setEventSucceed(false);
@@ -363,6 +496,10 @@ public class TestDbNotificationListener {
     assertEquals(defaultDbName, dropTblMsg.getDB());
     assertEquals(tblName, dropTblMsg.getTable());
 
+    // Verify the eventID was passed to the non-transactional listener
+    MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.DROP_TABLE, 
firstEventId + 2);
+    MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, 
firstEventId + 1);
+
     // When hive.metastore.transactional.event.listeners is set,
     // a failed event should not create a new notification
     table =
@@ -428,6 +565,10 @@ public class TestDbNotificationListener {
     assertTrue(ptnIter.hasNext());
     assertEquals(partition, ptnIter.next());
 
+    // Verify the eventID was passed to the non-transactional listener
+    
MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.ADD_PARTITION, 
firstEventId + 2);
+    MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, 
firstEventId + 1);
+
     // When hive.metastore.transactional.event.listeners is set,
     // a failed event should not create a new notification
     partition =
@@ -494,6 +635,10 @@ public class TestDbNotificationListener {
     assertEquals(tblName, alterPtnMsg.getTable());
     assertEquals(newPart, alterPtnMsg.getPtnObjAfter());
 
+    // Verify the eventID was passed to the non-transactional listener
+    
MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.ADD_PARTITION, 
firstEventId + 2);
+    MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, 
firstEventId + 1);
+
     // When hive.metastore.transactional.event.listeners is set,
     // a failed event should not create a new notification
     DummyRawStoreFailEvent.setEventSucceed(false);
@@ -557,6 +702,11 @@ public class TestDbNotificationListener {
     assertEquals(table.getTableName(), tableObj.getTableName());
     assertEquals(table.getOwner(), tableObj.getOwner());
 
+    // Verify the eventID was passed to the non-transactional listener
+    
MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.DROP_PARTITION, 
firstEventId + 3);
+    
MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.ADD_PARTITION, 
firstEventId + 2);
+    MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, 
firstEventId + 1);
+
     // When hive.metastore.transactional.event.listeners is set,
     // a failed event should not create a new notification
     List<String> newpartCol1Vals = Arrays.asList("tomorrow");
@@ -653,6 +803,13 @@ public class TestDbNotificationListener {
     Iterator<Map<String, String>> parts = 
dropPtnMsg.getPartitions().iterator();
     assertTrue(parts.hasNext());
     assertEquals(part1.getValues(), Lists.newArrayList(parts.next().values()));
+
+    // Verify the eventID was passed to the non-transactional listener
+    
MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.DROP_PARTITION, 
firstEventId + 5);
+    
MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.ADD_PARTITION, 
firstEventId + 4);
+    
MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.ADD_PARTITION, 
firstEventId + 3);
+    MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, 
firstEventId + 2);
+    MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, 
firstEventId + 1);
   }
 
   @Test
@@ -693,6 +850,9 @@ public class TestDbNotificationListener {
     assertEquals(ResourceType.JAR, 
funcObj.getResourceUris().get(0).getResourceType());
     assertEquals(funcResource, funcObj.getResourceUris().get(0).getUri());
 
+    // Verify the eventID was passed to the non-transactional listener
+    
MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_FUNCTION, 
firstEventId + 1);
+
     // When hive.metastore.transactional.event.listeners is set,
     // a failed event should not create a new notification
     DummyRawStoreFailEvent.setEventSucceed(false);
@@ -742,6 +902,10 @@ public class TestDbNotificationListener {
     assertEquals(defaultDbName, dropFuncMsg.getDB());
     assertEquals(funcName, dropFuncMsg.getFunctionName());
 
+    // Verify the eventID was passed to the non-transactional listener
+    
MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.DROP_FUNCTION, 
firstEventId + 2);
+    
MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_FUNCTION, 
firstEventId + 1);
+
     // When hive.metastore.transactional.event.listeners is set,
     // a failed event should not create a new notification
     func =
@@ -807,6 +971,11 @@ public class TestDbNotificationListener {
     assertEquals(tableName, indexObj.getOrigTableName());
     assertEquals(indexTableName, indexObj.getIndexTableName());
 
+    // Verify the eventID was passed to the non-transactional listener
+    MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_INDEX, 
firstEventId + 3);
+    MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, 
firstEventId + 2);
+    MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, 
firstEventId + 1);
+
     // When hive.metastore.transactional.event.listeners is set,
     // a failed event should not create a new notification
     DummyRawStoreFailEvent.setEventSucceed(false);
@@ -873,6 +1042,12 @@ public class TestDbNotificationListener {
     assertEquals(indexTableName.toLowerCase(), dropIdxMsg.getIndexTableName());
     assertEquals(tableName.toLowerCase(), dropIdxMsg.getOrigTableName());
 
+    // Verify the eventID was passed to the non-transactional listener
+    MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.DROP_INDEX, 
firstEventId + 4);
+    MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_INDEX, 
firstEventId + 3);
+    MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, 
firstEventId + 2);
+    MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, 
firstEventId + 1);
+
     // When hive.metastore.transactional.event.listeners is set,
     // a failed event should not create a new notification
     index =
@@ -947,6 +1122,12 @@ public class TestDbNotificationListener {
     assertEquals(indexTableName, indexObj.getIndexTableName());
     assertTrue(indexObj.getCreateTime() < indexObj.getLastAccessTime());
 
+    // Verify the eventID was passed to the non-transactional listener
+    MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.ALTER_INDEX, 
firstEventId + 4);
+    MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_INDEX, 
firstEventId + 3);
+    MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, 
firstEventId + 2);
+    MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, 
firstEventId + 1);
+
     // When hive.metastore.transactional.event.listeners is set,
     // a failed event should not create a new notification
     DummyRawStoreFailEvent.setEventSucceed(false);
@@ -1003,6 +1184,10 @@ public class TestDbNotificationListener {
     assertEquals(tblName, event.getTableName());
     // Parse the message field
     verifyInsert(event, defaultDbName, tblName);
+
+    // Verify the eventID was passed to the non-transactional listener
+    MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.INSERT, 
firstEventId + 2);
+    MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, 
firstEventId + 1);
   }
 
   @Test
@@ -1063,6 +1248,11 @@ public class TestDbNotificationListener {
     Map<String,String> partKeyValsFromNotif = 
insertMessage.getPartitionKeyValues();
 
     assertMapEquals(partKeyVals, partKeyValsFromNotif);
+
+    // Verify the eventID was passed to the non-transactional listener
+    MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.INSERT, 
firstEventId + 3);
+    
MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.ADD_PARTITION, 
firstEventId + 2);
+    MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, 
firstEventId + 1);
   }
 
 

http://git-wip-us.apache.org/repos/asf/hive/blob/aa29cd9d/metastore/src/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java
----------------------------------------------------------------------
diff --git 
a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java 
b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java
index 4ce6a65..d0511ad 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java
@@ -23,6 +23,7 @@ import com.google.common.collect.Lists;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent;
 import org.apache.hadoop.hive.metastore.events.AlterTableEvent;
+import org.apache.hadoop.hive.metastore.messaging.EventMessage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -268,12 +269,11 @@ public class HiveAlterHandler implements AlterHandler {
       }
 
       alterTableUpdateTableColumnStats(msdb, oldt, newt);
-      if (transactionalListeners != null && transactionalListeners.size() > 0) 
{
-        AlterTableEvent alterTableEvent = new AlterTableEvent(oldt, newt, 
true, handler);
-        alterTableEvent.setEnvironmentContext(environmentContext);
-        for (MetaStoreEventListener transactionalListener : 
transactionalListeners) {
-          transactionalListener.onAlterTable(alterTableEvent);
-        }
+      if (transactionalListeners != null && !transactionalListeners.isEmpty()) 
{
+        MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+                                              
EventMessage.EventType.ALTER_TABLE,
+                                              new AlterTableEvent(oldt, newt, 
true, handler),
+                                              environmentContext);
       }
       // commit the changes
       success = msdb.commitTransaction();
@@ -381,13 +381,13 @@ public class HiveAlterHandler implements AlterHandler {
 
         updatePartColumnStats(msdb, dbname, name, new_part.getValues(), new_part);
         msdb.alterPartition(dbname, name, new_part.getValues(), new_part);
-        if (transactionalListeners != null && transactionalListeners.size() > 0) {
-          AlterPartitionEvent alterPartitionEvent =
-              new AlterPartitionEvent(oldPart, new_part, tbl, true, handler);
-          alterPartitionEvent.setEnvironmentContext(environmentContext);
-          for (MetaStoreEventListener transactionalListener : transactionalListeners) {
-            transactionalListener.onAlterPartition(alterPartitionEvent);
-          }
+        if (transactionalListeners != null && !transactionalListeners.isEmpty()) {
+          MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+                                                EventMessage.EventType.ALTER_PARTITION,
+                                                new AlterPartitionEvent(oldPart, new_part, tbl, true, handler),
+                                                environmentContext);
+
+
         }
         success = msdb.commitTransaction();
       } catch (InvalidObjectException e) {
@@ -499,13 +499,11 @@ public class HiveAlterHandler implements AlterHandler {
         }
       }
 
-      if (transactionalListeners != null && transactionalListeners.size() > 0) {
-        AlterPartitionEvent alterPartitionEvent =
-            new AlterPartitionEvent(oldPart, new_part, tbl, true, handler);
-        alterPartitionEvent.setEnvironmentContext(environmentContext);
-        for (MetaStoreEventListener transactionalListener : transactionalListeners) {
-          transactionalListener.onAlterPartition(alterPartitionEvent);
-        }
+      if (transactionalListeners != null && !transactionalListeners.isEmpty()) {
+        MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+                                              EventMessage.EventType.ALTER_PARTITION,
+                                              new AlterPartitionEvent(oldPart, new_part, tbl, true, handler),
+                                              environmentContext);
       }
 
       success = msdb.commitTransaction();
@@ -534,13 +532,11 @@ public class HiveAlterHandler implements AlterHandler {
           try {
             msdb.openTransaction();
             msdb.alterPartition(dbname, name, new_part.getValues(), oldPart);
-            if (transactionalListeners != null && transactionalListeners.size() > 0) {
-              AlterPartitionEvent alterPartitionEvent =
-                  new AlterPartitionEvent(new_part, oldPart, tbl, true, handler);
-              alterPartitionEvent.setEnvironmentContext(environmentContext);
-              for (MetaStoreEventListener transactionalListener : transactionalListeners) {
-                transactionalListener.onAlterPartition(alterPartitionEvent);
-              }
+            if (transactionalListeners != null && !transactionalListeners.isEmpty()) {
+              MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+                                                    EventMessage.EventType.ALTER_PARTITION,
+                                                    new AlterPartitionEvent(new_part, oldPart, tbl, success, handler),
+                                                    environmentContext);
             }
 
             revertMetaDataTransaction = msdb.commitTransaction();
@@ -625,12 +621,10 @@ public class HiveAlterHandler implements AlterHandler {
               "when invoking MetaStoreEventListener for alterPartitions 
event.");
         }
 
-        if (transactionalListeners != null && transactionalListeners.size() > 0) {
-          AlterPartitionEvent alterPartitionEvent =
-              new AlterPartitionEvent(oldPart, newPart, tbl, true, handler);
-          for (MetaStoreEventListener transactionalListener : transactionalListeners) {
-            transactionalListener.onAlterPartition(alterPartitionEvent);
-          }
+        if (transactionalListeners != null && !transactionalListeners.isEmpty()) {
+          MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+                                                EventMessage.EventType.ALTER_PARTITION,
+                                                new AlterPartitionEvent(oldPart, newPart, tbl, true, handler));
         }
       }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/aa29cd9d/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index 80b1e98..3aabe22 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -57,8 +57,11 @@ import java.util.regex.Pattern;
 
 import javax.jdo.JDOException;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimaps;
 import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.collections.CollectionUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -112,6 +115,7 @@ import org.apache.hadoop.hive.metastore.events.PreLoadPartitionDoneEvent;
 import org.apache.hadoop.hive.metastore.events.PreReadDatabaseEvent;
 import org.apache.hadoop.hive.metastore.events.PreReadTableEvent;
 import org.apache.hadoop.hive.metastore.filemeta.OrcFileMetadataHandler;
+import org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType;
 import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
@@ -151,10 +155,6 @@ import com.facebook.fb303.fb_status;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Splitter;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableListMultimap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Multimaps;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
@@ -869,6 +869,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       Path dbPath = new Path(db.getLocationUri());
       boolean success = false;
       boolean madeDir = false;
+      Map<String, String> transactionalListenersResponses = 
Collections.emptyMap();
       try {
         firePreEvent(new PreCreateDatabaseEvent(db, this));
         if (!wh.isDir(dbPath)) {
@@ -881,11 +882,12 @@ public class HiveMetaStore extends ThriftHiveMetastore {
 
         ms.openTransaction();
         ms.createDatabase(db);
-        if (transactionalListeners.size() > 0) {
-          CreateDatabaseEvent cde = new CreateDatabaseEvent(db, true, this);
-          for (MetaStoreEventListener transactionalListener : 
transactionalListeners) {
-            transactionalListener.onCreateDatabase(cde);
-          }
+
+        if (!transactionalListeners.isEmpty()) {
+          transactionalListenersResponses =
+              MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+                                                    EventType.CREATE_DATABASE,
+                                                    new 
CreateDatabaseEvent(db, true, this));
         }
 
         success = ms.commitTransaction();
@@ -896,8 +898,13 @@ public class HiveMetaStore extends ThriftHiveMetastore {
             wh.deleteDir(dbPath, true);
           }
         }
-        for (MetaStoreEventListener listener : listeners) {
-          listener.onCreateDatabase(new CreateDatabaseEvent(db, success, 
this));
+
+        if (!listeners.isEmpty()) {
+          MetaStoreListenerNotifier.notifyEvent(listeners,
+                                                EventType.CREATE_DATABASE,
+                                                new CreateDatabaseEvent(db, 
success, this),
+                                                null,
+                                                
transactionalListenersResponses);
         }
       }
     }
@@ -1012,6 +1019,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       Database db = null;
       List<Path> tablePaths = new ArrayList<Path>();
       List<Path> partitionPaths = new ArrayList<Path>();
+      Map<String, String> transactionalListenerResponses = 
Collections.emptyMap();
       try {
         ms.openTransaction();
         db = ms.getDatabase(name);
@@ -1094,12 +1102,13 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         }
 
         if (ms.dropDatabase(name)) {
-          if (transactionalListeners.size() > 0) {
-            DropDatabaseEvent dde = new DropDatabaseEvent(db, true, this);
-            for (MetaStoreEventListener transactionalListener : 
transactionalListeners) {
-              transactionalListener.onDropDatabase(dde);
-            }
+          if (!transactionalListeners.isEmpty()) {
+            transactionalListenerResponses =
+                MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+                                                      EventType.DROP_DATABASE,
+                                                      new 
DropDatabaseEvent(db, true, this));
           }
+
           success = ms.commitTransaction();
         }
       } finally {
@@ -1121,8 +1130,13 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           }
           // it is not a terrible thing even if the data is not deleted
         }
-        for (MetaStoreEventListener listener : listeners) {
-          listener.onDropDatabase(new DropDatabaseEvent(db, success, this));
+
+        if (!listeners.isEmpty()) {
+          MetaStoreListenerNotifier.notifyEvent(listeners,
+                                                EventType.DROP_DATABASE,
+                                                new DropDatabaseEvent(db, 
success, this),
+                                                null,
+                                                
transactionalListenerResponses);
         }
       }
     }
@@ -1380,6 +1394,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         }
       }
 
+      Map<String, String> transactionalListenerResponses = 
Collections.emptyMap();
       Path tblPath = null;
       boolean success = false, madeDir = false;
       try {
@@ -1440,12 +1455,12 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           ms.createTableWithConstraints(tbl, primaryKeys, foreignKeys);
         }
 
-        if (transactionalListeners.size() > 0) {
-          CreateTableEvent createTableEvent = new CreateTableEvent(tbl, true, 
this);
-          createTableEvent.setEnvironmentContext(envContext);
-          for (MetaStoreEventListener transactionalListener : 
transactionalListeners) {
-            transactionalListener.onCreateTable(createTableEvent);
-          }
+        if (!transactionalListeners.isEmpty()) {
+          transactionalListenerResponses =
+              MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+                                                    EventType.CREATE_TABLE,
+                                                    new CreateTableEvent(tbl, 
true, this),
+                                                    envContext);
         }
 
         success = ms.commitTransaction();
@@ -1456,11 +1471,13 @@ public class HiveMetaStore extends ThriftHiveMetastore {
             wh.deleteDir(tblPath, true);
           }
         }
-        for (MetaStoreEventListener listener : listeners) {
-          CreateTableEvent createTableEvent =
-              new CreateTableEvent(tbl, success, this);
-          createTableEvent.setEnvironmentContext(envContext);
-          listener.onCreateTable(createTableEvent);
+
+        if (!listeners.isEmpty()) {
+          MetaStoreListenerNotifier.notifyEvent(listeners,
+                                                EventType.CREATE_TABLE,
+                                                new CreateTableEvent(tbl, 
success, this),
+                                                envContext,
+                                                
transactionalListenerResponses);
         }
       }
     }
@@ -1625,6 +1642,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       List<Path> partPaths = null;
       Table tbl = null;
       boolean ifPurge = false;
+      Map<String, String> transactionalListenerResponses = 
Collections.emptyMap();
       try {
         ms.openTransaction();
         // drop any partitions
@@ -1678,12 +1696,12 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           throw new MetaException(indexName == null ? "Unable to drop table " + tableName:
               "Unable to drop index table " + tableName + " for index " + indexName);
         } else {
-          if (transactionalListeners.size() > 0) {
-            DropTableEvent dropTableEvent = new DropTableEvent(tbl, true, 
deleteData, this);
-            dropTableEvent.setEnvironmentContext(envContext);
-            for (MetaStoreEventListener transactionalListener : 
transactionalListeners) {
-              transactionalListener.onDropTable(dropTableEvent);
-            }
+          if (!transactionalListeners.isEmpty()) {
+            transactionalListenerResponses =
+                MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+                                                      EventType.DROP_TABLE,
+                                                      new DropTableEvent(tbl, 
deleteData, true, this),
+                                                      envContext);
           }
           success = ms.commitTransaction();
         }
@@ -1698,10 +1716,13 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           deleteTableData(tblPath, ifPurge);
           // ok even if the data is not deleted
         }
-        for (MetaStoreEventListener listener : listeners) {
-          DropTableEvent dropTableEvent = new DropTableEvent(tbl, success, 
deleteData, this);
-          dropTableEvent.setEnvironmentContext(envContext);
-          listener.onDropTable(dropTableEvent);
+
+        if (!listeners.isEmpty()) {
+          MetaStoreListenerNotifier.notifyEvent(listeners,
+                                                EventType.DROP_TABLE,
+                                                new DropTableEvent(tbl, 
deleteData, success, this),
+                                                envContext,
+                                                
transactionalListenerResponses);
         }
       }
       return success;
@@ -2165,6 +2186,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       boolean success = false, madeDir = false;
       Path partLocation = null;
       Table tbl = null;
+      Map<String, String> transactionalListenerResponses = 
Collections.emptyMap();
       try {
         ms.openTransaction();
         part.setDbName(dbName);
@@ -2221,12 +2243,12 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         }
 
         if (ms.addPartition(part)) {
-          if (transactionalListeners.size() > 0) {
-            AddPartitionEvent addPartitionEvent = new AddPartitionEvent(tbl, 
part, true, this);
-            addPartitionEvent.setEnvironmentContext(envContext);
-            for (MetaStoreEventListener transactionalListener : 
transactionalListeners) {
-              transactionalListener.onAddPartition(addPartitionEvent);
-            }
+          if (!transactionalListeners.isEmpty()) {
+            transactionalListenerResponses =
+                MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+                                                      EventType.ADD_PARTITION,
+                                                      new 
AddPartitionEvent(tbl, part, true, this),
+                                                      envContext);
           }
 
           success = ms.commitTransaction();
@@ -2239,11 +2261,12 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           }
         }
 
-        for (MetaStoreEventListener listener : listeners) {
-          AddPartitionEvent addPartitionEvent =
-              new AddPartitionEvent(tbl, part, success, this);
-          addPartitionEvent.setEnvironmentContext(envContext);
-          listener.onAddPartition(addPartitionEvent);
+        if (!listeners.isEmpty()) {
+          MetaStoreListenerNotifier.notifyEvent(listeners,
+                                                EventType.ADD_PARTITION,
+                                                new AddPartitionEvent(tbl, 
part, success, this),
+                                                envContext,
+                                                
transactionalListenerResponses);
         }
       }
       return part;
@@ -2388,8 +2411,10 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       final Map<PartValEqWrapper, Boolean> addedPartitions =
           Collections.synchronizedMap(new HashMap<PartValEqWrapper, 
Boolean>());
       final List<Partition> newParts = new ArrayList<Partition>();
-      final List<Partition> existingParts = new ArrayList<Partition>();;
+      final List<Partition> existingParts = new ArrayList<Partition>();
       Table tbl = null;
+      Map<String, String> transactionalListenerResponses = 
Collections.emptyMap();
+
       try {
         ms.openTransaction();
         tbl = ms.getTable(dbName, tblName);
@@ -2475,7 +2500,13 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         success = false;
         // Notification is generated for newly created partitions only. The 
subset of partitions
         // that already exist (existingParts), will not generate notifications.
-        fireMetaStoreAddPartitionEventTransactional(tbl, newParts, null, true);
+        if (!transactionalListeners.isEmpty()) {
+          transactionalListenerResponses =
+              MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+                                                    EventType.ADD_PARTITION,
+                                                    new AddPartitionEvent(tbl, 
newParts, true, this));
+        }
+
         success = ms.commitTransaction();
       } finally {
         if (!success) {
@@ -2486,12 +2517,26 @@ public class HiveMetaStore extends ThriftHiveMetastore {
               wh.deleteDir(new 
Path(e.getKey().partition.getSd().getLocation()), true);
             }
           }
-          fireMetaStoreAddPartitionEvent(tbl, parts, null, false);
+
+          if (!listeners.isEmpty()) {
+            MetaStoreListenerNotifier.notifyEvent(listeners,
+                                                  EventType.ADD_PARTITION,
+                                                  new AddPartitionEvent(tbl, 
parts, false, this));
+          }
         } else {
-          fireMetaStoreAddPartitionEvent(tbl, newParts, null, true);
-          if (existingParts != null) {
-            // The request has succeeded but we failed to add these partitions.
-            fireMetaStoreAddPartitionEvent(tbl, existingParts, null, false);
+          if (!listeners.isEmpty()) {
+            MetaStoreListenerNotifier.notifyEvent(listeners,
+                                                  EventType.ADD_PARTITION,
+                                                  new AddPartitionEvent(tbl, 
newParts, true, this),
+                                                  null,
+                                                  
transactionalListenerResponses);
+
+            if (!existingParts.isEmpty()) {
+              // The request has succeeded but we failed to add these 
partitions.
+              MetaStoreListenerNotifier.notifyEvent(listeners,
+                                                    EventType.ADD_PARTITION,
+                                                    new AddPartitionEvent(tbl, 
existingParts, false, this));
+            }
           }
         }
       }
@@ -2578,6 +2623,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       final PartitionSpecProxy.PartitionIterator partitionIterator = 
partitionSpecProxy
           .getPartitionIterator();
       Table tbl = null;
+      Map<String, String> transactionalListenerResponses = 
Collections.emptyMap();
       try {
         ms.openTransaction();
         tbl = ms.getTable(dbName, tblName);
@@ -2651,7 +2697,14 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         success = ms.addPartitions(dbName, tblName, partitionSpecProxy, 
ifNotExists);
         //setting success to false to make sure that if the listener fails, 
rollback happens.
         success = false;
-        fireMetaStoreAddPartitionEventTransactional(tbl, partitionSpecProxy, 
null, true);
+
+        if (!transactionalListeners.isEmpty()) {
+          transactionalListenerResponses =
+              MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+                                                    EventType.ADD_PARTITION,
+                                                    new AddPartitionEvent(tbl, 
partitionSpecProxy, true, this));
+        }
+
         success = ms.commitTransaction();
         return addedPartitions.size();
       } finally {
@@ -2664,7 +2717,14 @@ public class HiveMetaStore extends ThriftHiveMetastore {
             }
           }
         }
-        fireMetaStoreAddPartitionEvent(tbl, partitionSpecProxy, null, true);
+
+        if (!listeners.isEmpty()) {
+          MetaStoreListenerNotifier.notifyEvent(listeners,
+                                                EventType.ADD_PARTITION,
+                                                new AddPartitionEvent(tbl, 
partitionSpecProxy, true, this),
+                                                null,
+                                                
transactionalListenerResponses);
+        }
       }
     }
 
@@ -2769,6 +2829,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         throws InvalidObjectException, AlreadyExistsException, MetaException, 
TException {
       boolean success = false;
       Table tbl = null;
+      Map<String, String> transactionalListenerResponses = 
Collections.emptyMap();
       try {
         ms.openTransaction();
         tbl = ms.getTable(part.getDbName(), part.getTableName());
@@ -2793,7 +2854,16 @@ public class HiveMetaStore extends ThriftHiveMetastore {
 
         // Setting success to false to make sure that if the listener fails, 
rollback happens.
         success = false;
-        fireMetaStoreAddPartitionEventTransactional(tbl, Arrays.asList(part), 
envContext, true);
+
+        if (!transactionalListeners.isEmpty()) {
+          transactionalListenerResponses =
+              MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+                                                    EventType.ADD_PARTITION,
+                                                    new AddPartitionEvent(tbl, 
Arrays.asList(part), true, this),
+                                                    envContext);
+
+        }
+
         // we proceed only if we'd actually succeeded anyway, otherwise,
         // we'd have thrown an exception
         success = ms.commitTransaction();
@@ -2801,64 +2871,19 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         if (!success) {
           ms.rollbackTransaction();
         }
-        fireMetaStoreAddPartitionEvent(tbl, Arrays.asList(part), envContext, 
success);
-      }
-      return part;
-    }
-
-    private void fireMetaStoreAddPartitionEvent(final Table tbl,
-        final List<Partition> parts, final EnvironmentContext envContext, 
boolean success)
-          throws MetaException {
-      if (tbl != null && parts != null && !parts.isEmpty()) {
-        AddPartitionEvent addPartitionEvent =
-            new AddPartitionEvent(tbl, parts, success, this);
-        addPartitionEvent.setEnvironmentContext(envContext);
-        for (MetaStoreEventListener listener : listeners) {
-          listener.onAddPartition(addPartitionEvent);
-        }
-      }
-    }
 
-    private void fireMetaStoreAddPartitionEvent(final Table tbl,
-        final PartitionSpecProxy partitionSpec, final EnvironmentContext 
envContext, boolean success)
-          throws MetaException {
-      if (tbl != null && partitionSpec != null) {
-        AddPartitionEvent addPartitionEvent =
-            new AddPartitionEvent(tbl, partitionSpec, success, this);
-        addPartitionEvent.setEnvironmentContext(envContext);
-        for (MetaStoreEventListener listener : listeners) {
-          listener.onAddPartition(addPartitionEvent);
-        }
-      }
-    }
-
-    private void fireMetaStoreAddPartitionEventTransactional(final Table tbl,
-          final List<Partition> parts, final EnvironmentContext envContext, 
boolean success)
-            throws MetaException {
-      if (tbl != null && parts != null && !parts.isEmpty()) {
-        AddPartitionEvent addPartitionEvent =
-                new AddPartitionEvent(tbl, parts, success, this);
-        addPartitionEvent.setEnvironmentContext(envContext);
-        for (MetaStoreEventListener transactionalListener : 
transactionalListeners) {
-          transactionalListener.onAddPartition(addPartitionEvent);
-        }
-      }
-    }
+        if (!listeners.isEmpty()) {
+          MetaStoreListenerNotifier.notifyEvent(listeners,
+                                                EventType.ADD_PARTITION,
+                                                new AddPartitionEvent(tbl, 
Arrays.asList(part), success, this),
+                                                envContext,
+                                                
transactionalListenerResponses);
 
-    private void fireMetaStoreAddPartitionEventTransactional(final Table tbl,
-          final PartitionSpecProxy partitionSpec, final EnvironmentContext 
envContext, boolean success)
-            throws MetaException {
-      if (tbl != null && partitionSpec != null) {
-        AddPartitionEvent addPartitionEvent =
-                new AddPartitionEvent(tbl, partitionSpec, success, this);
-        addPartitionEvent.setEnvironmentContext(envContext);
-        for (MetaStoreEventListener transactionalListener : 
transactionalListeners) {
-          transactionalListener.onAddPartition(addPartitionEvent);
         }
       }
+      return part;
     }
 
-
     @Override
     public Partition add_partition(final Partition part)
         throws InvalidObjectException, AlreadyExistsException, MetaException {
@@ -2941,6 +2966,11 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       Path destPath = new Path(destinationTable.getSd().getLocation(),
           Warehouse.makePartName(partitionKeysPresent, partValsPresent));
       List<Partition> destPartitions = new ArrayList<Partition>();
+
+      Map<String, String> transactionalListenerResponsesForAddPartition = Collections.emptyMap();
+      List<Map<String, String>> transactionalListenerResponsesForDropPartition =
+          Lists.newArrayListWithCapacity(partitionsToExchange.size());
+
       try {
         for (Partition partition: partitionsToExchange) {
           Partition destPartition = new Partition(partition);
@@ -2968,8 +2998,22 @@ public class HiveMetaStore extends ThriftHiveMetastore {
 
         // Setting success to false to make sure that if the listener fails, 
rollback happens.
         success = false;
-        fireMetaStoreExchangePartitionEvent(sourceTable, partitionsToExchange,
-            destinationTable, destPartitions, transactionalListeners, true);
+
+        if (!transactionalListeners.isEmpty()) {
+          transactionalListenerResponsesForAddPartition =
+              MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+                                                    EventType.ADD_PARTITION,
+                                                    new 
AddPartitionEvent(destinationTable, destPartitions, true, this));
+
+          for (Partition partition : partitionsToExchange) {
+            DropPartitionEvent dropPartitionEvent =
+                new DropPartitionEvent(sourceTable, partition, true, true, 
this);
+            transactionalListenerResponsesForDropPartition.add(
+                MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+                                                      EventType.DROP_PARTITION,
+                                                      dropPartitionEvent));
+          }
+        }
 
         success = ms.commitTransaction();
         return destPartitions;
@@ -2979,34 +3023,31 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           if (pathCreated) {
             wh.renameDir(destPath, sourcePath);
           }
-
-          fireMetaStoreExchangePartitionEvent(sourceTable, 
partitionsToExchange,
-              destinationTable, destPartitions, listeners, success);
         }
-      }
-    }
 
-    private void fireMetaStoreExchangePartitionEvent(Table sourceTable,
-        List<Partition> partitionsToExchange, Table destinationTable,
-        List<Partition> destPartitions,
-        List<MetaStoreEventListener> eventListeners,
-        boolean status) throws MetaException {
-      if (sourceTable != null && destinationTable != null
-          && !CollectionUtils.isEmpty(partitionsToExchange)
-          && !CollectionUtils.isEmpty(destPartitions)) {
-        if (eventListeners.size() > 0) {
-          AddPartitionEvent addPartitionEvent =
-              new AddPartitionEvent(destinationTable, destPartitions, status, 
this);
-          for (MetaStoreEventListener eventListener : eventListeners) {
-            eventListener.onAddPartition(addPartitionEvent);
-          }
+        if (!listeners.isEmpty()) {
+          AddPartitionEvent addPartitionEvent = new 
AddPartitionEvent(destinationTable, destPartitions, success, this);
+          MetaStoreListenerNotifier.notifyEvent(listeners,
+                                                EventType.ADD_PARTITION,
+                                                addPartitionEvent,
+                                                null,
+                                                
transactionalListenerResponsesForAddPartition);
 
+          i = 0;
           for (Partition partition : partitionsToExchange) {
             DropPartitionEvent dropPartitionEvent =
-                new DropPartitionEvent(sourceTable, partition, true, status, 
this);
-            for (MetaStoreEventListener eventListener : eventListeners) {
-              eventListener.onDropPartition(dropPartitionEvent);
-            }
+                new DropPartitionEvent(sourceTable, partition, success, true, 
this);
+            Map<String, String> parameters =
+                (transactionalListenerResponsesForDropPartition.size() > i)
+                    ? transactionalListenerResponsesForDropPartition.get(i)
+                    : null;
+
+            MetaStoreListenerNotifier.notifyEvent(listeners,
+                                                  EventType.DROP_PARTITION,
+                                                  dropPartitionEvent,
+                                                  null,
+                                                  parameters);
+            i++;
           }
         }
       }
@@ -3024,6 +3065,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       Path archiveParentDir = null;
       boolean mustPurge = false;
       boolean isExternalTbl = false;
+      Map<String, String> transactionalListenerResponses = 
Collections.emptyMap();
 
       try {
         ms.openTransaction();
@@ -3056,13 +3098,13 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         if (!ms.dropPartition(db_name, tbl_name, part_vals)) {
           throw new MetaException("Unable to drop partition");
         } else {
-          if (transactionalListeners.size() > 0) {
-            DropPartitionEvent dropPartitionEvent =
-                new DropPartitionEvent(tbl, part, true, deleteData, this);
-            dropPartitionEvent.setEnvironmentContext(envContext);
-            for (MetaStoreEventListener transactionalListener : 
transactionalListeners) {
-              transactionalListener.onDropPartition(dropPartitionEvent);
-            }
+          if (!transactionalListeners.isEmpty()) {
+
+            transactionalListenerResponses =
+                MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+                                                      EventType.DROP_PARTITION,
+                                                      new 
DropPartitionEvent(tbl, part, true, deleteData, this),
+                                                      envContext);
           }
           success = ms.commitTransaction();
         }
@@ -3090,11 +3132,12 @@ public class HiveMetaStore extends ThriftHiveMetastore {
             // ok even if the data is not deleted
           }
         }
-        for (MetaStoreEventListener listener : listeners) {
-          DropPartitionEvent dropPartitionEvent =
-            new DropPartitionEvent(tbl, part, success, deleteData, this);
-          dropPartitionEvent.setEnvironmentContext(envContext);
-          listener.onDropPartition(dropPartitionEvent);
+        if (!listeners.isEmpty()) {
+          MetaStoreListenerNotifier.notifyEvent(listeners,
+                                                EventType.DROP_PARTITION,
+                                                new DropPartitionEvent(tbl, 
part, success, deleteData, this),
+                                                envContext,
+                                                
transactionalListenerResponses);
         }
       }
       return true;
@@ -3156,6 +3199,8 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       List<Partition> parts = null;
       boolean mustPurge = false;
       boolean isExternalTbl = false;
+      List<Map<String, String>> transactionalListenerResponses = 
Lists.newArrayList();
+
       try {
         // We need Partition-s for firing events and for result; DN needs 
MPartition-s to drop.
         // Great... Maybe we could bypass fetching MPartitions by issuing 
direct SQL deletes.
@@ -3239,14 +3284,13 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         }
 
         ms.dropPartitions(dbName, tblName, partNames);
-        if (parts != null && transactionalListeners.size() > 0) {
+        if (parts != null && !transactionalListeners.isEmpty()) {
           for (Partition part : parts) {
-            DropPartitionEvent dropPartitionEvent =
-                new DropPartitionEvent(tbl, part, true, deleteData, this);
-            dropPartitionEvent.setEnvironmentContext(envContext);
-            for (MetaStoreEventListener transactionalListener : 
transactionalListeners) {
-              transactionalListener.onDropPartition(dropPartitionEvent);
-            }
+            transactionalListenerResponses.add(
+                MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+                                                      EventType.DROP_PARTITION,
+                                                      new 
DropPartitionEvent(tbl, part, true, deleteData, this),
+                                                      envContext));
           }
         }
 
@@ -3280,12 +3324,19 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           }
         }
         if (parts != null) {
-          for (Partition part : parts) {
-            for (MetaStoreEventListener listener : listeners) {
-              DropPartitionEvent dropPartitionEvent =
-                new DropPartitionEvent(tbl, part, success, deleteData, this);
-              dropPartitionEvent.setEnvironmentContext(envContext);
-              listener.onDropPartition(dropPartitionEvent);
+          int i = 0;
+          if (parts != null && !listeners.isEmpty()) {
+            for (Partition part : parts) {
+              Map<String, String> parameters =
+                  (!transactionalListenerResponses.isEmpty()) ? 
transactionalListenerResponses.get(i) : null;
+
+              MetaStoreListenerNotifier.notifyEvent(listeners,
+                                                    EventType.DROP_PARTITION,
+                                                    new 
DropPartitionEvent(tbl, part, success, deleteData, this),
+                                                    envContext,
+                                                    parameters);
+
+              i++;
             }
           }
         }
@@ -3720,14 +3771,15 @@ public class HiveMetaStore extends ThriftHiveMetastore {
 
         // Only fetch the table if we actually have a listener
         Table table = null;
-        for (MetaStoreEventListener listener : listeners) {
+        if (!listeners.isEmpty()) {
           if (table == null) {
             table = getMS().getTable(db_name, tbl_name);
           }
-          AlterPartitionEvent alterPartitionEvent =
-              new AlterPartitionEvent(oldPart, new_part, table, true, this);
-          alterPartitionEvent.setEnvironmentContext(envContext);
-          listener.onAlterPartition(alterPartitionEvent);
+
+          MetaStoreListenerNotifier.notifyEvent(listeners,
+                                                EventType.ALTER_PARTITION,
+                                                new 
AlterPartitionEvent(oldPart, new_part, table, true, this),
+                                                envContext);
         }
       } catch (InvalidObjectException e) {
         ex = e;
@@ -3791,13 +3843,15 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           else {
             throw new InvalidOperationException("failed to alterpartitions");
           }
-          for (MetaStoreEventListener listener : listeners) {
-            if (table == null) {
-              table = getMS().getTable(db_name, tbl_name);
-            }
-            AlterPartitionEvent alterPartitionEvent =
-                new AlterPartitionEvent(oldTmpPart, tmpPart, table, true, 
this);
-            listener.onAlterPartition(alterPartitionEvent);
+
+          if (table == null) {
+            table = getMS().getTable(db_name, tbl_name);
+          }
+
+          if (!listeners.isEmpty()) {
+            MetaStoreListenerNotifier.notifyEvent(listeners,
+                                                  EventType.ALTER_PARTITION,
+                                                  new 
AlterPartitionEvent(oldTmpPart, tmpPart, table, true, this));
           }
         }
       } catch (InvalidObjectException e) {
@@ -3834,16 +3888,17 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       Exception ex = null;
       Index oldIndex = null;
       RawStore ms  = getMS();
+      Map<String, String> transactionalListenerResponses = 
Collections.emptyMap();
       try {
         ms.openTransaction();
         oldIndex = get_index_by_name(dbname, base_table_name, index_name);
         firePreEvent(new PreAlterIndexEvent(oldIndex, newIndex, this));
         ms.alterIndex(dbname, base_table_name, index_name, newIndex);
-        if (transactionalListeners.size() > 0) {
-          AlterIndexEvent alterIndexEvent = new AlterIndexEvent(oldIndex, 
newIndex, true, this);
-          for (MetaStoreEventListener transactionalListener : 
transactionalListeners) {
-            transactionalListener.onAlterIndex(alterIndexEvent);
-          }
+        if (!transactionalListeners.isEmpty()) {
+          transactionalListenerResponses =
+              MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+                                                    EventType.ALTER_INDEX,
+                                                    new 
AlterIndexEvent(oldIndex, newIndex, true, this));
         }
 
         success = ms.commitTransaction();
@@ -3865,9 +3920,13 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         }
 
         endFunction("alter_index", success, ex, base_table_name);
-        for (MetaStoreEventListener listener : listeners) {
-          AlterIndexEvent alterIndexEvent = new AlterIndexEvent(oldIndex, 
newIndex, success, this);
-          listener.onAlterIndex(alterIndexEvent);
+
+        if (!listeners.isEmpty()) {
+          MetaStoreListenerNotifier.notifyEvent(listeners,
+                                                EventType.ALTER_INDEX,
+                                                new AlterIndexEvent(oldIndex, 
newIndex, success, this),
+                                                null,
+                                                
transactionalListenerResponses);
         }
       }
     }
@@ -3935,11 +3994,11 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         alterHandler.alterTable(getMS(), wh, dbname, name, newTable,
                 envContext, this);
         success = true;
-        for (MetaStoreEventListener listener : listeners) {
-          AlterTableEvent alterTableEvent =
-              new AlterTableEvent(oldt, newTable, success, this);
-          alterTableEvent.setEnvironmentContext(envContext);
-          listener.onAlterTable(alterTableEvent);
+        if (!listeners.isEmpty()) {
+          MetaStoreListenerNotifier.notifyEvent(listeners,
+                                                EventType.ALTER_TABLE,
+                                                new AlterTableEvent(oldt, 
newTable, true, this),
+                                                envContext);
         }
       } catch (NoSuchObjectException e) {
         // thrown when the table to be altered does not exist
@@ -4506,6 +4565,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       boolean success = false, indexTableCreated = false;
       String[] qualified =
           MetaStoreUtils.getQualifiedName(index.getDbName(), 
index.getIndexTableName());
+      Map<String, String> transactionalListenerResponses = 
Collections.emptyMap();
       try {
         ms.openTransaction();
         firePreEvent(new PreAddIndexEvent(index, this));
@@ -4543,11 +4603,11 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         index.setCreateTime((int) time);
         index.putToParameters(hive_metastoreConstants.DDL_TIME, 
Long.toString(time));
         if (ms.addIndex(index)) {
-          if (transactionalListeners.size() > 0) {
-            AddIndexEvent addIndexEvent = new AddIndexEvent(index, true, this);
-            for (MetaStoreEventListener transactionalListener : 
transactionalListeners) {
-              transactionalListener.onAddIndex(addIndexEvent);
-            }
+          if (!transactionalListeners.isEmpty()) {
+            transactionalListenerResponses =
+                MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+                                                      EventType.CREATE_INDEX,
+                                                      new AddIndexEvent(index, 
true, this));
           }
         }
 
@@ -4564,9 +4624,12 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           ms.rollbackTransaction();
         }
 
-        for (MetaStoreEventListener listener : listeners) {
-          AddIndexEvent addIndexEvent = new AddIndexEvent(index, success, 
this);
-          listener.onAddIndex(addIndexEvent);
+        if (!listeners.isEmpty()) {
+          MetaStoreListenerNotifier.notifyEvent(listeners,
+                                                EventType.CREATE_INDEX,
+                                                new AddIndexEvent(index, 
success, this),
+                                                null,
+                                                
transactionalListenerResponses);
         }
       }
     }
@@ -4604,6 +4667,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       Index index = null;
       Path tblPath = null;
       List<Path> partPaths = null;
+      Map<String, String> transactionalListenerResponses = 
Collections.emptyMap();
       try {
         ms.openTransaction();
         // drop the underlying index table
@@ -4636,11 +4700,11 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           }
         }
 
-        if (transactionalListeners.size() > 0) {
-          DropIndexEvent dropIndexEvent = new DropIndexEvent(index, true, 
this);
-          for (MetaStoreEventListener transactionalListener : 
transactionalListeners) {
-            transactionalListener.onDropIndex(dropIndexEvent);
-          }
+        if (!transactionalListeners.isEmpty()) {
+          transactionalListenerResponses =
+              MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+                                                    EventType.DROP_INDEX,
+                                                    new DropIndexEvent(index, 
true, this));
         }
 
         success = ms.commitTransaction();
@@ -4653,11 +4717,12 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           // ok even if the data is not deleted
         }
         // Skip the event listeners if the index is NULL
-        if (index != null) {
-          for (MetaStoreEventListener listener : listeners) {
-            DropIndexEvent dropIndexEvent = new DropIndexEvent(index, success, 
this);
-            listener.onDropIndex(dropIndexEvent);
-          }
+        if (index != null && !listeners.isEmpty()) {
+          MetaStoreListenerNotifier.notifyEvent(listeners,
+                                                EventType.DROP_INDEX,
+                                                new DropIndexEvent(index, 
success, this),
+                                                null,
+                                                
transactionalListenerResponses);
         }
       }
       return success;
@@ -6093,6 +6158,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       validateFunctionInfo(func);
       boolean success = false;
       RawStore ms = getMS();
+      Map<String, String> transactionalListenerResponses = 
Collections.emptyMap();
       try {
         ms.openTransaction();
         Database db = ms.getDatabase(func.getDbName());
@@ -6109,11 +6175,11 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         long time = System.currentTimeMillis() / 1000;
         func.setCreateTime((int) time);
         ms.createFunction(func);
-        if (transactionalListeners.size() > 0) {
-          CreateFunctionEvent createFunctionEvent = new 
CreateFunctionEvent(func, true, this);
-          for (MetaStoreEventListener transactionalListener : 
transactionalListeners) {
-            transactionalListener.onCreateFunction(createFunctionEvent);
-          }
+        if (!transactionalListeners.isEmpty()) {
+          transactionalListenerResponses =
+              MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+                                                    EventType.CREATE_FUNCTION,
+                                                    new 
CreateFunctionEvent(func, true, this));
         }
 
         success = ms.commitTransaction();
@@ -6122,11 +6188,12 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           ms.rollbackTransaction();
         }
 
-        if (listeners.size() > 0) {
-          CreateFunctionEvent createFunctionEvent = new 
CreateFunctionEvent(func, success, this);
-          for (MetaStoreEventListener listener : listeners) {
-            listener.onCreateFunction(createFunctionEvent);
-          }
+        if (!listeners.isEmpty()) {
+          MetaStoreListenerNotifier.notifyEvent(listeners,
+                                                EventType.CREATE_FUNCTION,
+                                                new CreateFunctionEvent(func, 
success, this),
+                                                null,
+                                                
transactionalListenerResponses);
         }
       }
     }
@@ -6138,6 +6205,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       boolean success = false;
       Function func = null;
       RawStore ms = getMS();
+      Map<String, String> transactionalListenerResponses = 
Collections.emptyMap();
       try {
         ms.openTransaction();
         func = ms.getFunction(dbName, funcName);
@@ -6147,10 +6215,10 @@ public class HiveMetaStore extends ThriftHiveMetastore {
 
         ms.dropFunction(dbName, funcName);
         if (transactionalListeners.size() > 0) {
-          DropFunctionEvent dropFunctionEvent = new DropFunctionEvent(func, 
true, this);
-          for (MetaStoreEventListener transactionalListener : 
transactionalListeners) {
-            transactionalListener.onDropFunction(dropFunctionEvent);
-          }
+          transactionalListenerResponses =
+              MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+                                                    EventType.DROP_FUNCTION,
+                                                    new 
DropFunctionEvent(func, true, this));
         }
 
         success = ms.commitTransaction();
@@ -6160,10 +6228,11 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         }
 
         if (listeners.size() > 0) {
-          DropFunctionEvent dropFunctionEvent = new DropFunctionEvent(func, 
success, this);
-          for (MetaStoreEventListener listener : listeners) {
-            listener.onDropFunction(dropFunctionEvent);
-          }
+          MetaStoreListenerNotifier.notifyEvent(listeners,
+                                                EventType.DROP_FUNCTION,
+                                                new DropFunctionEvent(func, 
success, this),
+                                                null,
+                                                
transactionalListenerResponses);
         }
       }
     }
@@ -6530,13 +6599,13 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         InsertEvent event =
             new InsertEvent(rqst.getDbName(), rqst.getTableName(), 
rqst.getPartitionVals(), rqst
                 .getData().getInsertData(), rqst.isSuccessful(), this);
-        for (MetaStoreEventListener transactionalListener : 
transactionalListeners) {
-          transactionalListener.onInsert(event);
-        }
 
-        for (MetaStoreEventListener listener : listeners) {
-          listener.onInsert(event);
-        }
+        /*
+         * The transactional listener response will be set already on the 
event, so there is no need
+         * to pass the response to the non-transactional listener.
+         */
+        MetaStoreListenerNotifier.notifyEvent(transactionalListeners, 
EventType.INSERT, event);
+        MetaStoreListenerNotifier.notifyEvent(listeners, EventType.INSERT, 
event);
 
         return new FireEventResponse();
 

Reply via email to