This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 389fb169d2782729aa48b820f3cd5bbfcb5f8dd8
Author: Vihang Karajgaonkar <vih...@cloudera.com>
AuthorDate: Thu Dec 20 15:55:04 2018 -0800

    IMPALA-7970 : Add support for metastore event based automatic invalidate
    
    This change adds support to CatalogD to poll metastore events to issue
    invalidate on tables automatically. It adds basic infrastructure to poll
    HMS notifications events at a configurable frequency using a backend
    config called hms_event_polling_interval_s flag. Currently, it issues
    invalidate at tables when it received alter events on table and
    partitions. It also adds tables/databases and removes tables from
    catalogD when it receives create_table/create_database and
    drop_table/drop_database events. The default value of
    hms_event_polling_interval_s is 0 which disables the feature. A
    non-zero value in seconds of this configuration can be used to enable
    the feature and set the polling frequency.
    
    In order to process each event atomically, this feature relies on
    version lock in CatalogServiceCatalog. It adds new methods in
    CatalogServiceCatalog which takes a write lock on version so that
    readers are blocked until the catalog state is updated based on the
    events. In case of processing events, the metastore operation is already
    completed and only catalog state needs to be updated. Hence we do not
    need to make new metastore calls while processing the events and only
    version lock is sufficient to serialize updates to the catalog objects
    based on events. This locking protocol is similar to what is done in
    case of DDL processing in CatalogOpExecutor except it does not need to
    take metastoreDdlLock since no metastore operations are needed during
    event processing.
    
    The change also adds a new test class to test the basic functionality
    for each of the event type which is supported.
    
    Note that this feature is still a work in progress and additional
    improvements will be done in subsequent patches. By default the feature
    is turned off.
    
    Change-Id: Ic70b27780560b7ac9b33418d132b36cd0ca4abf7
    Reviewed-on: http://gerrit.cloudera.org:8080/12118
    Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
    Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
---
 be/src/common/global-flags.cc                      |  14 +
 be/src/util/backend-gflag-util.cc                  |   2 +
 common/thrift/BackendGflags.thrift                 |   2 +
 .../impala/catalog/CatalogServiceCatalog.java      | 208 ++++-
 .../catalog/events/ExternalEventsProcessor.java    |  54 ++
 .../impala/catalog/events/MetastoreEventUtils.java | 697 +++++++++++++++
 .../catalog/events/MetastoreEventsProcessor.java   | 419 +++++++++
 .../events/MetastoreNotificationException.java     |  35 +
 .../MetastoreNotificationFetchException.java       |  35 +
 .../impala/catalog/events/NoOpEventProcessor.java  |  62 ++
 .../org/apache/impala/service/BackendConfig.java   |   4 +
 .../events/MetastoreEventsProcessorTest.java       | 977 +++++++++++++++++++++
 .../SynchronousHMSEventProcessorForTests.java      |  36 +
 .../resources/postgresql-hive-site.xml.template    |  11 +
 14 files changed, 2553 insertions(+), 3 deletions(-)

diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc
index 0b83acd..6620bdc 100644
--- a/be/src/common/global-flags.cc
+++ b/be/src/common/global-flags.cc
@@ -242,6 +242,20 @@ DEFINE_bool(invalidate_tables_on_memory_pressure, false, 
"Configure catalogd to
     "invalidate_table_timeout_s. To enable this feature, a true flag must be 
applied to "
     "both catalogd and impalad.");
 
+DEFINE_int32(hms_event_polling_interval_s, 0,
+    "Configure catalogd to invalidate cached table metadata based on metastore 
events. "
+    "These metastore events could be generated by external systems like Apache 
Hive or "
+    "a different Impala cluster using the same Hive metastore server as this 
one. "
+    "A non-zero value of this flag sets the polling interval of catalogd in 
seconds to "
+    "fetch new metastore events. A value of zero disables this feature. When 
enabled, "
+    "this flag has the same effect as \"INVALIDATE METADATA\" statement on the 
table "
+    "for certain metastore event types. Additionally, in case of events which 
detect "
+    "creation or removal of objects from metastore, catalogd adds or removes 
such "
+    "objects from its cached metadata. This feature is independent of time and 
memory "
+    "based automatic invalidation of tables. Note that this is still an 
experimental "
+    "feature and not recommended to be deployed on production systems until it 
is "
+    "made generally available.");
+
 DEFINE_double_hidden(invalidate_tables_gc_old_gen_full_threshold, 0.6, "The 
threshold "
     "above which CatalogdTableInvalidator would consider the old generation to 
be almost "
     "full and trigger an invalidation on recently unused tables");
diff --git a/be/src/util/backend-gflag-util.cc 
b/be/src/util/backend-gflag-util.cc
index f78b317..b02c70a 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -70,6 +70,7 @@ DECLARE_int64(catalog_partial_fetch_rpc_queue_timeout_s);
 DECLARE_int64(exchg_node_buffer_size_bytes);
 DECLARE_int32(kudu_mutation_buffer_size);
 DECLARE_int32(kudu_error_buffer_size);
+DECLARE_int32(hms_event_polling_interval_s);
 
 namespace impala {
 
@@ -139,6 +140,7 @@ Status GetThriftBackendGflags(JNIEnv* jni_env, jbyteArray* 
cfg_bytes) {
       FLAGS_exchg_node_buffer_size_bytes);
   cfg.__set_kudu_mutation_buffer_size(FLAGS_kudu_mutation_buffer_size);
   cfg.__set_kudu_error_buffer_size(FLAGS_kudu_error_buffer_size);
+  cfg.__set_hms_event_polling_interval_s(FLAGS_hms_event_polling_interval_s);
   RETURN_IF_ERROR(SerializeThriftMsg(jni_env, &cfg, cfg_bytes));
   return Status::OK();
 }
diff --git a/common/thrift/BackendGflags.thrift 
b/common/thrift/BackendGflags.thrift
index a56b260..fe724c2 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -113,4 +113,6 @@ struct TBackendGflags {
   43: required i32 kudu_mutation_buffer_size
 
   44: required i32 kudu_error_buffer_size
+
+  45: required i32 hms_event_polling_interval_s
 }
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java 
b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index d9aa2a9..d6d375f 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -38,10 +38,14 @@ import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
+import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.UnknownDBException;
 import org.apache.impala.authorization.SentryConfig;
 import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
+import org.apache.impala.catalog.events.ExternalEventsProcessor;
+import org.apache.impala.catalog.events.MetastoreEventsProcessor;
+import org.apache.impala.catalog.events.NoOpEventProcessor;
 import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.Pair;
@@ -225,6 +229,9 @@ public class CatalogServiceCatalog extends Catalog {
 
   private CatalogdTableInvalidator catalogdTableInvalidator_;
 
+  // Manages the event processing from metastore for issuing invalidates on 
tables
+  private ExternalEventsProcessor metastoreEventProcessor_;
+
   /**
    * See the gflag definition in be/.../catalog-server.cc for details on these 
modes.
    */
@@ -281,7 +288,39 @@ public class CatalogServiceCatalog extends Catalog {
         
BackendConfig.INSTANCE.getBackendCfg().catalog_topic_mode.toUpperCase());
     catalogdTableInvalidator_ = CatalogdTableInvalidator.create(this,
         BackendConfig.INSTANCE);
+    metastoreEventProcessor_ = getEventsProcessor();
     Preconditions.checkState(PARTIAL_FETCH_RPC_QUEUE_TIMEOUT_S > 0);
+    // start polling for metastore events
+    metastoreEventProcessor_.start();
+  }
+
+  /**
+   * Returns a Metastore event processor object if
+   * <code>BackendConfig#getHMSPollingIntervalInSeconds</code> returns a 
non-zero
+   *.value of polling interval. Otherwise, returns a no-op events processor. 
It is
+   * important to fetch the current notification event id at the Catalog 
service
+   * initialization time so that event processor starts to sync at the event id
+   * corresponding to the catalog start time.
+   */
+  private ExternalEventsProcessor getEventsProcessor() throws ImpalaException {
+    long eventPollingInterval = 
BackendConfig.INSTANCE.getHMSPollingIntervalInSeconds();
+    if (eventPollingInterval <= 0) {
+      LOG.info(String
+          .format("Metastore event processing is disabled. Event polling 
interval is %d",
+              eventPollingInterval));
+      return NoOpEventProcessor.getInstance();
+    }
+    try (MetaStoreClient metaStoreClient = getMetaStoreClient()) {
+      CurrentNotificationEventId currentNotificationId =
+          metaStoreClient.getHiveClient().getCurrentNotificationEventId();
+      return MetastoreEventsProcessor.getInstance(
+          this, currentNotificationId.getEventId(), eventPollingInterval);
+    } catch (TException e) {
+      LOG.error("Unable to fetch the current notification event id from 
metastore."
+          + "Metastore event processing will be disabled.", e);
+      throw new CatalogException(
+          "Fatal error while initializing metastore event processor", e);
+    }
   }
 
   // Timeout for acquiring a table lock
@@ -1162,10 +1201,22 @@ public class CatalogServiceCatalog extends Catalog {
     refreshAuthorization(true, /*catalog objects added*/ new ArrayList<>(),
         /*catalog objects removed*/ new ArrayList<>());
 
+    // Even though we get the current notification event id before stopping 
the event
+    // processing here there is a small window of time where we could 
re-process some of
+    // the event ids, if there is external DDL activity on metastore during 
reset.
+    // Unfortunately, there is no good way to avoid this since HMS does not 
provide
+    // APIs which can fetch all the tables/databases at a given id. It is OKAY 
to
+    // re-process some of these events since event processor relies on 
creationTime of
+    // the objects to uniquely identify tables from create and drop events. In 
case of
+    // alter events, however it is likely that some tables would be 
unnecessarily
+    // invalidated. That would happen when during reset, there were external 
alter events
+    // and by the time we processed them, Catalog had already loaded them.
+    long currentEventId = metastoreEventProcessor_.getCurrentEventId();
+    // stop the event processing since the cache is anyways being cleared
+    metastoreEventProcessor_.stop();
     // Update the HDFS cache pools
     CachePoolReader reader = new CachePoolReader(true);
     reader.run();
-
     versionLock_.writeLock().lock();
     // In case of an empty new catalog, the version should still change to 
reflect the
     // reset operation itself and to unblock impalads by making the catalog 
version >
@@ -1217,6 +1268,8 @@ public class CatalogServiceCatalog extends Catalog {
       throw new CatalogException("Error initializing Catalog. Catalog may be 
empty.", e);
     } finally {
       versionLock_.writeLock().unlock();
+      // restart the event processing for id just before the reset
+      metastoreEventProcessor_.start(currentEventId);
     }
     LOG.info("Invalidated all metadata.");
     return currentCatalogVersion;
@@ -1239,6 +1292,25 @@ public class CatalogServiceCatalog extends Catalog {
   }
 
   /**
+   * Adds a database name to the metadata cache if not exists and returns the
+   * true is a new Db Object was added. Used by MetastoreEventProcessor to 
handle
+   * CREATE_DATABASE events
+   */
+  public boolean addDbIfNotExists(
+      String dbName, org.apache.hadoop.hive.metastore.api.Database msDb) {
+    versionLock_.writeLock().lock();
+    try {
+      Db db = getDb(dbName);
+      if (db == null) {
+        return addDb(dbName, msDb) != null;
+      }
+      return false;
+    } finally {
+      versionLock_.writeLock().unlock();
+    }
+  }
+
+  /**
    * Removes a database from the metadata cache and returns the removed 
database,
    * or null if the database did not exist in the cache.
    * Used by DROP DATABASE statements.
@@ -1278,6 +1350,31 @@ public class CatalogServiceCatalog extends Catalog {
   }
 
   /**
+   * Adds table with the given db and table name to the catalog if it does not 
exists.
+   * @return true if the table was successfully added and false if the table 
already
+   * exists
+   * @throws CatalogException if the db is not found
+   */
+  public boolean addTableIfNotExists(String dbName, String tblName)
+      throws CatalogException {
+    versionLock_.writeLock().lock();
+    try {
+      Db db = getDb(dbName);
+      if (db == null) {
+        throw new CatalogException(String.format("Db %s does not exist", 
dbName));
+      }
+      Table existingTable = db.getTable(tblName);
+      if (existingTable != null) return false;
+      Table incompleteTable = IncompleteTable.createUninitializedTable(db, 
tblName);
+      incompleteTable.setCatalogVersion(incrementAndGetCatalogVersion());
+      db.addTable(incompleteTable);
+      return true;
+    } finally {
+      versionLock_.writeLock().unlock();
+    }
+  }
+
+  /**
    * Adds a table with the given name to the catalog and returns the new table,
    * loading the metadata if needed.
    */
@@ -1359,6 +1456,50 @@ public class CatalogServiceCatalog extends Catalog {
   }
 
   /**
+   * Remove a catalog table based on the given metastore table if it exists 
and its
+   * createTime matches with the metastore table
+   *
+   * @param msTable Metastore table to be used to remove Table
+   * @param tblWasfound is set to true if the table was found in the catalog
+   * @param tblMatched is set to true if the table is found and it matched 
with the
+   * createTime of the cached metastore table in catalog or if the existing 
table is a
+   * incomplete table
+   * @return Removed table object. Return null if the table was not removed
+   */
+  public Table removeTableIfExists(org.apache.hadoop.hive.metastore.api.Table 
msTable,
+      Reference<Boolean> tblWasfound, Reference<Boolean> tblMatched) {
+    tblWasfound.setRef(false);
+    tblMatched.setRef(false);
+    // make sure that the createTime of the input table is valid
+    Preconditions.checkState(msTable.getCreateTime() > 0);
+    versionLock_.writeLock().lock();
+    try {
+      Db db = getDb(msTable.getDbName());
+      if (db == null) return null;
+
+      Table tblToBeRemoved = db.getTable(msTable.getTableName());
+      if (tblToBeRemoved == null) return null;
+
+      tblWasfound.setRef(true);
+      // make sure that you are removing the same instance of the table object 
which
+      // is given by comparing the metastore createTime. In case the found 
table is a
+      // Incomplete table remove it
+      if (tblToBeRemoved instanceof IncompleteTable
+          || (msTable.getCreateTime()
+                 == tblToBeRemoved.getMetaStoreTable().getCreateTime())) {
+        tblMatched.setRef(true);
+        Table removedTbl = db.removeTable(tblToBeRemoved.getName());
+        removedTbl.setCatalogVersion(incrementAndGetCatalogVersion());
+        deleteLog_.addRemovedObject(removedTbl.toMinimalTCatalogObject());
+        return removedTbl;
+      }
+      return null;
+    } finally {
+      versionLock_.writeLock().unlock();
+    }
+  }
+
+  /**
    * Removes a table from the catalog and increments the catalog version.
    * Returns the removed Table, or null if the table or db does not exist.
    */
@@ -1462,8 +1603,8 @@ public class CatalogServiceCatalog extends Catalog {
    * 3. T_old, null: Old table was removed but new table was not added.
    * 4. T_old, T_new: Old table was removed and new table was added.
    */
-  public Pair<Table, Table> renameTable(TTableName oldTableName, TTableName 
newTableName)
-      throws CatalogException {
+  public Pair<Table, Table> renameTable(
+      TTableName oldTableName, TTableName newTableName) {
     // Remove the old table name from the cache and add the new table.
     Db db = getDb(oldTableName.getDb_name());
     if (db == null) return null;
@@ -1480,6 +1621,36 @@ public class CatalogServiceCatalog extends Catalog {
   }
 
   /**
+   * Renames the table by atomically removing oldTable and adding the 
newTable. If the
+   * oldTable is not found this operation becomes a add new table if not exists
+   * operation.
+   *
+   * @return a pair of booleans. The first of the pair is set if the 
oldTableName was
+   *     found and removed. The second boolean is set if the new table didn't 
exist before
+   *     and hence was added.
+   */
+  public Pair<Boolean, Boolean> renameOrAddTableIfNotExists(TTableName 
oldTableName,
+      TTableName newTableName)
+      throws CatalogException {
+    boolean oldTableRemoved = false;
+    boolean newTableAdded;
+    versionLock_.writeLock().lock();
+    try {
+      Db db = getDb(oldTableName.db_name);
+      if (db != null) {
+        // remove the oldTable if it exists
+        oldTableRemoved =
+            removeTable(oldTableName.db_name, oldTableName.table_name) != null;
+      }
+      // add the new tbl if it doesn't exist
+      newTableAdded = addTableIfNotExists(newTableName.db_name, 
newTableName.table_name);
+      return new Pair<>(oldTableRemoved, newTableAdded);
+    } finally {
+      versionLock_.writeLock().unlock();
+    }
+  }
+
+  /**
    * Reloads metadata for table 'tbl' which must not be an IncompleteTable. 
Updates the
    * table metadata in-place by calling load() on the given table. Returns the
    * TCatalogObject representing 'tbl'. Applies proper synchronization to 
protect the
@@ -1650,6 +1821,31 @@ public class CatalogServiceCatalog extends Catalog {
   }
 
   /**
+   * Invalidate the table if it exists by overwriting existing entry by a 
Incomplete
+   * Table.
+   * @return null if the table does not exist else return the invalidated table
+   */
+  public Table invalidateTableIfExists(String dbName, String tblName) {
+    Table incompleteTable;
+    versionLock_.writeLock().lock();
+    try {
+      Db db = getDb(dbName);
+      if (db == null) return null;
+      if (!db.containsTable(tblName)) return null;
+      incompleteTable = IncompleteTable.createUninitializedTable(db, tblName);
+      incompleteTable.setCatalogVersion(incrementAndGetCatalogVersion());
+      db.addTable(incompleteTable);
+    } finally {
+      versionLock_.writeLock().unlock();
+    }
+    if (loadInBackground_) {
+      tableLoadingMgr_.backgroundLoad(
+          new TTableName(dbName.toLowerCase(), tblName.toLowerCase()));
+    }
+    return incompleteTable;
+  }
+
+  /**
    * Adds a new role with the given name and grant groups to the 
AuthorizationPolicy.
    * If a role with the same name already exists it will be overwritten.
    */
@@ -2295,4 +2491,10 @@ public class CatalogServiceCatalog extends Catalog {
   void setCatalogdTableInvalidator(CatalogdTableInvalidator cleaner) {
     catalogdTableInvalidator_ = cleaner;
   }
+
+  @VisibleForTesting
+  public void setMetastoreEventProcessor(
+      ExternalEventsProcessor metastoreEventProcessor) {
+    this.metastoreEventProcessor_ = metastoreEventProcessor;
+  }
 }
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/events/ExternalEventsProcessor.java
 
b/fe/src/main/java/org/apache/impala/catalog/events/ExternalEventsProcessor.java
new file mode 100644
index 0000000..eda52b9
--- /dev/null
+++ 
b/fe/src/main/java/org/apache/impala/catalog/events/ExternalEventsProcessor.java
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog.events;
+
+import org.apache.impala.catalog.CatalogException;
+
+/**
+ * Interface to process external events
+ */
+public interface ExternalEventsProcessor {
+  /**
+   * Start the event processing. This could also be used to initialize the 
configuration
+   * like polling interval of the event processor
+   */
+  void start();
+
+  /**
+   * Get the current event id on metastore. Useful for restarting the event 
processing
+   * from a given event id
+   */
+  long getCurrentEventId() throws CatalogException;
+
+  /**
+   * Stop the event processing
+   */
+  void stop();
+
+  /**
+   * Starts the event processing from the given eventId. This method can be 
used to jump
+   * ahead in the event processing under certain cases where it is okay skip 
certain
+   * events
+   */
+  void start(long fromEventId);
+
+  /**
+   * Implements the core logic of processing external events
+   */
+  void processEvents();
+}
\ No newline at end of file
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventUtils.java 
b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventUtils.java
new file mode 100644
index 0000000..aba698e
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventUtils.java
@@ -0,0 +1,697 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog.events;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONAlterTableMessage;
+import 
org.apache.hadoop.hive.metastore.messaging.json.JSONCreateDatabaseMessage;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONDropTableMessage;
+import org.apache.impala.analysis.TableName;
+import org.apache.impala.catalog.CatalogException;
+import org.apache.impala.catalog.CatalogServiceCatalog;
+import org.apache.impala.catalog.Db;
+import org.apache.impala.catalog.Table;
+import org.apache.impala.common.Pair;
+import org.apache.impala.common.Reference;
+import org.apache.impala.thrift.TTableName;
+import org.apache.log4j.Logger;
+
+/**
+ * Util class which provides Metastore event objects for various event types. 
Also
+ * provides a MetastoreEventFactory to get or create the event instances for a 
given event
+ * type
+ */
+public class MetastoreEventUtils {
+
+  public enum MetastoreEventType {
+    CREATE_TABLE("CREATE_TABLE"),
+    DROP_TABLE("DROP_TABLE"),
+    ALTER_TABLE("ALTER_TABLE"),
+    CREATE_DATABASE("CREATE_DATABASE"),
+    DROP_DATABASE("DROP_DATABASE"),
+    ALTER_DATABASE("ALTER_DATABASE"),
+    ADD_PARTITION("ADD_PARTITION"),
+    ALTER_PARTITION("ALTER_PARTITION"),
+    DROP_PARTITION("DROP_PARTITION"),
+    OTHER("OTHER");
+
+    private final String eventType_;
+
+    MetastoreEventType(String msEventType) {
+      this.eventType_ = msEventType;
+    }
+
+    @Override
+    public String toString() {
+      return eventType_;
+    }
+
+    /**
+     * Returns the MetastoreEventType from a given string value of event from 
Metastore's
+     * NotificationEvent.eventType. If none of the supported 
MetastoreEventTypes match,
+     * return OTHER
+     *
+     * @param eventType EventType value from the <code>NotificationEvent</code>
+     */
+    public static MetastoreEventType from(String eventType) {
+      for (MetastoreEventType metastoreEventType : values()) {
+        if (metastoreEventType.eventType_.equalsIgnoreCase(eventType)) {
+          return metastoreEventType;
+        }
+      }
+      return OTHER;
+    }
+  }
+
+  /**
+   * Factory class to create various MetastoreEvents.
+   */
+  public static class MetastoreEventFactory {
+    private static final Logger LOG = 
Logger.getLogger(MetastoreEventFactory.class);
+
+    // catalog service instance to be used for creating eventHandlers
+    private final CatalogServiceCatalog catalog_;
+
+    public MetastoreEventFactory(CatalogServiceCatalog catalog) {
+      this.catalog_ = Preconditions.checkNotNull(catalog);
+    }
+
+    /**
+     * creates instance of <code>MetastoreEvent</code> used to process a given
+     * event type. If the event type is unknown, returns a IgnoredEvent
+     */
+    private MetastoreEvent get(NotificationEvent event)
+        throws MetastoreNotificationException {
+      Preconditions.checkNotNull(event.getEventType());
+      MetastoreEventType metastoreEventType =
+          MetastoreEventType.from(event.getEventType());
+      switch (metastoreEventType) {
+        case CREATE_TABLE:
+          return new CreateTableEvent(catalog_, event);
+        case DROP_TABLE:
+          return new DropTableEvent(catalog_, event);
+        case ALTER_TABLE:
+          return new AlterTableEvent(catalog_, event);
+        case CREATE_DATABASE:
+          return new CreateDatabaseEvent(catalog_, event);
+        case DROP_DATABASE:
+          return new DropDatabaseEvent(catalog_, event);
+        case ALTER_DATABASE:
+          // alter database events are currently ignored
+          return new IgnoredEvent(catalog_, event);
+        case ADD_PARTITION:
+          // add partition events triggers invalidate table currently
+          return new TableInvalidatingEvent(catalog_, event);
+        case DROP_PARTITION:
+          // drop partition events triggers invalidate table currently
+          return new TableInvalidatingEvent(catalog_, event);
+        case ALTER_PARTITION:
+          // alter partition events triggers invalidate table currently
+          return new TableInvalidatingEvent(catalog_, event);
+        default:
+          // ignore all the unknown events by creating a IgnoredEvent
+          return new IgnoredEvent(catalog_, event);
+      }
+    }
+
+    /**
+     * Given a list of notification events, returns a list of 
<code>MetastoreEvent</code>
+     * In case there are create events which are followed by drop events for 
the same
+     * object, the create events are filtered out. The drop events do not need 
to be
+     * filtered out
+     *
+     * This is needed to avoid the replay problem. For example, if catalog 
created and
+     * removed a table, the create event received will try to add the object 
again.
+     * This table will be visible until the drop table event is processed. 
This can be
+     * avoided by "looking ahead" in the event stream to see if the table with 
the same
+     * name was dropped. In such a case, the create event can be ignored
+     *
+     * @param events NotificationEvents fetched from metastore
+     * @return A list of MetastoreEvents corresponding to the given the 
NotificationEvents
+     * @throws MetastoreNotificationException if a NotificationEvent could not 
be
+     * parsed into MetastoreEvent
+     */
+    List<MetastoreEvent> getFilteredEvents(List<NotificationEvent> events)
+        throws MetastoreNotificationException {
+      Preconditions.checkNotNull(events);
+      List<MetastoreEvent> metastoreEvents = new ArrayList<>(events.size());
+      for (NotificationEvent event : events) {
+        metastoreEvents.add(get(event));
+      }
+      Iterator<MetastoreEvent> it = metastoreEvents.iterator();
+      // filter out the create events which has a corresponding drop event 
later
+      int fromIndex = 0;
+      int numFilteredEvents = 0;
+      int inputSize = metastoreEvents.size();
+      while (it.hasNext()) {
+        MetastoreEvent current = it.next();
+        if (fromIndex < metastoreEvents.size() && current.isRemovedAfter(
+            metastoreEvents.subList(fromIndex + 1, metastoreEvents.size()))) {
+          LOG.info(current.debugString("Filtering out this event since the 
object is "
+              + "either removed or renamed later in the event stream"));
+          it.remove();
+          numFilteredEvents++;
+        }
+        fromIndex++;
+      }
+      LOG.info(String.format("Total number of events received: %d Total number 
of events "
+          + "filtered out: %d", inputSize, numFilteredEvents));
+      return metastoreEvents;
+    }
+  }
+
+  /**
+   * Abstract base class for all MetastoreEvents. A MetastoreEvent is a object 
used to
+   * process a NotificationEvent received from metastore. It is self-contained 
with all
+   * the information needed to take action on catalog based on a the given
+   * NotificationEvent
+   */
+  public static abstract class MetastoreEvent {
+
+    // CatalogServiceCatalog instance on which the event needs to be acted upon
+    protected final CatalogServiceCatalog catalog_;
+
+    // the notification received from metastore which is processed by this
+    protected final NotificationEvent event_;
+
+    // Logger available for all the sub-classes
+    protected final Logger LOG = Logger.getLogger(this.getClass());
+
+    // dbName from the event
+    protected final String dbName_;
+
+    // eventId of the event. Used instead of calling getter on event_ everytime
+    protected final long eventId_;
+
+    // eventType from the NotificationEvent
+    protected final MetastoreEventType eventType_;
+
+    protected final NotificationEvent metastoreNotificationEvent_;
+
+    MetastoreEvent(CatalogServiceCatalog catalogServiceCatalog, 
NotificationEvent event) {
+      this.catalog_ = catalogServiceCatalog;
+      this.event_ = event;
+      this.eventId_ = event_.getEventId();
+      this.eventType_ = MetastoreEventType.from(event.getEventType());
+      LOG.debug(String
+          .format("Creating event %d of type %s on table %s", 
event.getEventId(),
+              event.getEventType(), event.getTableName()));
+      dbName_ = Preconditions.checkNotNull(event.getDbName());
+      metastoreNotificationEvent_ = event;
+    }
+
+    /**
+     * Process the information available in the NotificationEvent to take 
appropriate
+     * action on Catalog
+     *
+     * @throws MetastoreNotificationException in case of event parsing errors 
out
+     * @throws CatalogException in case catalog operations could not be 
performed
+     */
+    abstract void process() throws MetastoreNotificationException, 
CatalogException;
+
+    /**
+     * Helper method to get debug string with helpful event information 
prepended to the
+     * message
+     *
+     * @param msgFormatString String value to be used in String.format() for 
the given
+     *     message
+     * @param args args to the <code>String.format()</code> for the given
+     *     msgFormatString
+     */
+    protected String debugString(String msgFormatString, Object... args) {
+      String formatString =
+          new StringBuilder("EventId: %d EventType: %s 
").append(msgFormatString)
+              .toString();
+      Object[] formatArgs = new Object[args.length + 2];
+      formatArgs[0] = eventId_;
+      formatArgs[1] = eventType_;
+      int i=2;
+      for (Object arg : args) {
+        formatArgs[i] = arg;
+        i++;
+      }
+      return String.format(formatString, formatArgs);
+    }
+
+    /**
+     * Search for a inverse event (for example drop_table is a inverse event 
for
+     * create_table) for this event from a given list of notificationEvents 
starting
+     * for the startIndex. This is useful for skipping certain events from 
processing
+     *
+     * @param events List of NotificationEvents to be searched
+     * @return true if the object is removed after this event, else false
+     */
+    protected boolean isRemovedAfter(List<MetastoreEvent> events) {
+      return false;
+    }
+  }
+
+  /**
+   * Base class for all the table events
+   */
+  public static abstract class MetastoreTableEvent extends MetastoreEvent {
+
+    // tblName from the event
+    protected final String tblName_;
+
+    private MetastoreTableEvent(CatalogServiceCatalog catalogServiceCatalog,
+        NotificationEvent event) {
+      super(catalogServiceCatalog, event);
+      tblName_ = Preconditions.checkNotNull(event.getTableName());
+    }
+
+
+    /**
+     * Util method to return the fully qualified table name which is of the 
format
+     * dbName.tblName for this event
+     */
+    protected String getFullyQualifiedTblName() {
+      return new TableName(dbName_, tblName_).toString();
+    }
+
+    /**
+     * Util method to issue invalidate on a given table on the catalog. This 
method
+     * atomically invalidates the table if it exists in the catalog. No-op if 
the table
+     * does not exist
+     */
+    protected boolean invalidateCatalogTable() {
+      return catalog_.invalidateTableIfExists(dbName_, tblName_) != null;
+    }
+  }
+
+  /**
+   * Base class for all the database events
+   */
+  public static abstract class MetastoreDatabaseEvent extends MetastoreEvent {
+
+    MetastoreDatabaseEvent(CatalogServiceCatalog catalogServiceCatalog,
+        NotificationEvent event) {
+      super(catalogServiceCatalog, event);
+    }
+  }
+
+  /**
+   * MetastoreEvent for CREATE_TABLE event type
+   */
+  private static class CreateTableEvent extends MetastoreTableEvent {
+
+    /**
+     * Prevent instantiation from outside should use MetastoreEventFactory 
instead
+     */
+    private CreateTableEvent(CatalogServiceCatalog catalog, NotificationEvent 
event) {
+      super(catalog, event);
+      
Preconditions.checkArgument(MetastoreEventType.CREATE_TABLE.equals(eventType_));
+    }
+
+    /**
+     * If the table provided in the catalog does not exist in the catalog, 
this method
+     * will create it. If the table in the catalog already exists, it relies 
of the
+     * creationTime of the Metastore Table to resolve the conflict. If the 
catalog table's
+     * creation time is less than creationTime of the table from the event, it 
will be
+     * overridden. Else, it will ignore the event
+     */
+    @Override
+    public void process() throws MetastoreNotificationException {
+      // check if the table exists already. This could happen in corner cases 
of the
+      // table being dropped and recreated with the same name or in case this 
event is
+      // a self-event (see description of self-event in the class 
documentation of
+      // MetastoreEventsProcessor)
+      boolean tableAdded;
+      try {
+        tableAdded = catalog_.addTableIfNotExists(dbName_, tblName_);
+      } catch (CatalogException e) {
+        // if a exception is thrown, it could be due to the fact that the db 
did not
+        // exist in the catalog cache. This could only happen if the previous
+        // create_database event for this table errored out
+        throw new MetastoreNotificationException(debugString(
+            "Unable to add table while processing for table %s because the "
+                + "database doesn't exist. This could be due to a previous 
error while "
+                + "processing CREATE_DATABASE event for the database %s",
+            getFullyQualifiedTblName(), dbName_), e);
+      }
+      if (!tableAdded) {
+        LOG.debug(
+            debugString("Not adding the table %s since it already exists in 
catalog",
+                tblName_));
+        return;
+      }
+      LOG.info(debugString("Added a table %s", getFullyQualifiedTblName()));
+    }
+
+    @Override
+    public boolean isRemovedAfter(List<MetastoreEvent> events) {
+      Preconditions.checkNotNull(events);
+      for (MetastoreEvent event : events) {
+        if (event.eventType_.equals(MetastoreEventType.DROP_TABLE)) {
+          DropTableEvent dropTableEvent = (DropTableEvent) event;
+          if (dbName_.equalsIgnoreCase(dropTableEvent.dbName_) && tblName_
+              .equalsIgnoreCase(dropTableEvent.tblName_)) {
+            LOG.info(debugString("Found table %s is removed later in event %d 
type %s",
+                tblName_, dropTableEvent.eventId_, dropTableEvent.eventType_));
+            return true;
+          }
+        } else if (event.eventType_.equals(MetastoreEventType.ALTER_TABLE)) {
+          // renames are implemented as a atomic (drop+create) so rename 
events can
+          // also be treated as a inverse event of the create_table event. 
Consider a
+          // DDL op sequence like create table, alter table rename from 
impala. Since
+          // the rename operation is internally implemented as drop+add, 
processing a
+          // create table event on this cluster will show up the table for 
small window
+          // of time, until the actual rename event is processed. If however, 
we ignore
+          // the create table event, the alter rename event just becomes a 
addIfNotExists
+          // event which is valid for both a self-event and external event 
cases
+          AlterTableEvent alterTableEvent = (AlterTableEvent) event;
+          if (alterTableEvent.isRename_ && dbName_
+              .equalsIgnoreCase(alterTableEvent.tableBefore_.getDbName()) && 
tblName_
+              .equalsIgnoreCase(alterTableEvent.tableBefore_.getTableName())) {
+            LOG.info(debugString("Found table %s is renamed later in event %d 
type %s",
+                tblName_, alterTableEvent.eventId_, 
alterTableEvent.eventType_));
+            return true;
+          }
+        }
+      }
+      return false;
+    }
+  }
+
+  /**
+   * MetastoreEvent for ALTER_TABLE event type
+   */
+  private static class AlterTableEvent extends MetastoreTableEvent {
+
+    // the table object before alter operation, as parsed from the 
NotificationEvent
+    private final org.apache.hadoop.hive.metastore.api.Table tableBefore_;
+    // the table object after alter operation, as parsed from the 
NotificationEvent
+    private final org.apache.hadoop.hive.metastore.api.Table tableAfter_;
+    // true if this alter event was due to a rename operation
+    private final boolean isRename_;
+
+    /**
+     * Prevent instantiation from outside should use MetastoreEventFactory 
instead
+     */
+    private AlterTableEvent(CatalogServiceCatalog catalog, NotificationEvent 
event)
+        throws MetastoreNotificationException {
+      super(catalog, event);
+      
Preconditions.checkArgument(MetastoreEventType.ALTER_TABLE.equals(eventType_));
+      JSONAlterTableMessage alterTableMessage =
+          (JSONAlterTableMessage) MetastoreEventsProcessor.getMessageFactory()
+              .getDeserializer().getAlterTableMessage(event.getMessage());
+      try {
+        tableBefore_ = 
Preconditions.checkNotNull(alterTableMessage.getTableObjBefore());
+        tableAfter_ = 
Preconditions.checkNotNull(alterTableMessage.getTableObjAfter());
+      } catch (Exception e) {
+        throw new MetastoreNotificationException(
+            debugString("Unable to parse the alter table message"), e);
+      }
+      // this is a rename event if either dbName or tblName of before and 
after object
+      // changed
+      isRename_ = 
!tableBefore_.getDbName().equalsIgnoreCase(tableAfter_.getDbName())
+          || 
!tableBefore_.getTableName().equalsIgnoreCase(tableAfter_.getTableName());
+    }
+
+    /**
+     * If the ALTER_TABLE event is due a table rename, this method removes the 
old table
+     * and creates a new table with the new name. Else, this just issues a 
invalidate
+     * table on the tblName from the event//TODO Check if we can rename the 
existing table
+     * in-place
+     */
+    @Override
+    public void process() throws MetastoreNotificationException {
+      // in case of table level alters from external systems it is better to 
do a full
+      // invalidate  eg. this could be due to as simple as adding a new 
parameter or a
+      // full blown adding  or changing column type
+      // detect the special where a table is renamed
+      try {
+        if (!isRename_) {
+          // table is not renamed, need to invalidate
+          if (!invalidateCatalogTable()) {
+            LOG.debug(debugString("Table %s does not need to be "
+                    + "invalidated since it does not exist anymore",
+                getFullyQualifiedTblName()));
+          } else {
+            LOG.info(debugString("Table %s is invalidated", 
getFullyQualifiedTblName()));
+          }
+          return;
+        }
+        // table was renamed, remove the old table
+        LOG.info(debugString("Found that %s table was renamed. Renaming it by "
+                + "remove and adding a new table", new 
TableName(tableBefore_.getDbName(),
+            tableBefore_.getTableName())));
+        TTableName oldTTableName =
+            new TTableName(tableBefore_.getDbName(), 
tableBefore_.getTableName());
+        TTableName newTTableName =
+            new TTableName(tableAfter_.getDbName(), 
tableAfter_.getTableName());
+
+        // atomically rename the old table to new table
+        Pair<Boolean, Boolean> result =
+            catalog_.renameOrAddTableIfNotExists(oldTTableName, newTTableName);
+
+        // old table was not found. This could be because catalogD is stale 
and didn't
+        // have any entry for the oldTable
+        if (!result.first) {
+          LOG.debug(debugString("Did not remove old table to rename table %s 
to %s since "
+                  + "it does not exist anymore", qualify(oldTTableName),
+              qualify(newTTableName)));
+        }
+        // the new table from the event was not added since it was already 
present
+        if (!result.second) {
+          LOG.debug(
+              debugString("Did not add new table name while renaming table %s 
to %s",
+                  qualify(oldTTableName), qualify(newTTableName)));
+        }
+      } catch (Exception e) {
+        throw new MetastoreNotificationException(e);
+      }
+    }
+
+    private String qualify(TTableName tTableName) {
+      return new TableName(tTableName.db_name, 
tTableName.table_name).toString();
+    }
+  }
+
+  /**
+   * MetastoreEvent for the DROP_TABLE event type
+   */
+  private static class DropTableEvent extends MetastoreTableEvent {
+
+    // the metastore table object as parsed from the drop table event
+    private final org.apache.hadoop.hive.metastore.api.Table droppedTable_;
+
+    /**
+     * Prevent instantiation from outside should use MetastoreEventFactory 
instead
+     */
+    private DropTableEvent(CatalogServiceCatalog catalog, NotificationEvent 
event)
+        throws MetastoreNotificationException {
+      super(catalog, event);
+      
Preconditions.checkArgument(MetastoreEventType.DROP_TABLE.equals(eventType_));
+      JSONDropTableMessage dropTableMessage =
+          (JSONDropTableMessage) MetastoreEventsProcessor.getMessageFactory()
+              .getDeserializer().getDropTableMessage(event.getMessage());
+      try {
+        droppedTable_ = 
Preconditions.checkNotNull(dropTableMessage.getTableObj());
+      } catch (Exception e) {
+        throw new MetastoreNotificationException(debugString(
+            "Could not parse event message. "
+                + "Check if %s is set to true in metastore configuration",
+            
MetastoreEventsProcessor.HMS_ADD_THRIFT_OBJECTS_IN_EVENTS_CONFIG_KEY), e);
+      }
+    }
+
+    /**
+     * Process the drop table event type. If the table from the event doesn't 
exist in the
+     * catalog, ignore the event. If the table exists in the catalog, compares 
the
+     * createTime of the table in catalog with the createTime of the table 
from the event
+     * and remove the catalog table if there is a match. If the catalog table 
is a
+     * incomplete table it is removed as well.
+     */
+    @Override
+    public void process() {
+      Reference<Boolean> tblWasFound = new Reference<>();
+      Reference<Boolean> tblMatched = new Reference<>();
+      Table removedTable =
+          catalog_.removeTableIfExists(droppedTable_, tblWasFound, tblMatched);
+      if (removedTable != null) {
+        LOG.info(debugString("Removed table %s ", getFullyQualifiedTblName()));
+      } else if (!tblMatched.getRef()) {
+        LOG.warn(debugString("Table %s was not removed from "
+            + "catalog since the creation time of the table did not match", 
tblName_));
+      } else if (!tblWasFound.getRef()) {
+        LOG.debug(
+            debugString("Table %s was not removed since it did not exist in 
catalog.",
+                tblName_));
+      }
+    }
+  }
+
+  /**
+   * MetastoreEvent for CREATE_DATABASE event type
+   */
+  private static class CreateDatabaseEvent extends MetastoreDatabaseEvent {
+
+    // metastore database object as parsed from NotificationEvent message
+    private final Database createdDatabase_;
+
+    /**
+     * Prevent instantiation from outside should use MetastoreEventFactory 
instead
+     */
+    private CreateDatabaseEvent(CatalogServiceCatalog catalog, 
NotificationEvent event)
+        throws MetastoreNotificationException {
+      super(catalog, event);
+      
Preconditions.checkArgument(MetastoreEventType.CREATE_DATABASE.equals(eventType_));
+      JSONCreateDatabaseMessage createDatabaseMessage =
+          (JSONCreateDatabaseMessage) 
MetastoreEventsProcessor.getMessageFactory()
+              .getDeserializer().getCreateDatabaseMessage(event.getMessage());
+      try {
+        createdDatabase_ =
+            
Preconditions.checkNotNull(createDatabaseMessage.getDatabaseObject());
+      } catch (Exception e) {
+        throw new MetastoreNotificationException(debugString(
+            "Database object is null in the event. "
+                + "This could be a metastore configuration problem. "
+                + "Check if %s is set to true in metastore configuration",
+            
MetastoreEventsProcessor.HMS_ADD_THRIFT_OBJECTS_IN_EVENTS_CONFIG_KEY), e);
+      }
+    }
+
+    /**
+     * Processes the create database event by adding the Db object from the 
event if it
+     * does not exist in the catalog already. TODO we should compare the 
creationTime of
+     * the Database in catalog with the Database in the event to make sure we 
are ignoring
+     * only catalog has the latest Database object. This will be added after 
HIVE-21077 is
+     * fixed and available
+     */
+    @Override
+    public void process() {
+      // if the database already exists in catalog, by definition, it is a 
later version
+      // of the database since metastore will not allow it be created if it 
was already
+      // existing at the time of creation. In such case, it is safe to assume 
that the
+      // already existing database in catalog is a later version with the same 
name and
+      // this event can be ignored
+      if (catalog_.addDbIfNotExists(dbName_, createdDatabase_)) {
+        LOG.info(debugString("Successfully added database %s", dbName_));
+      } else {
+        LOG.info(debugString("Database %s already exists", dbName_));
+      }
+    }
+
+    @Override
+    public boolean isRemovedAfter(List<MetastoreEvent> events) {
+      Preconditions.checkNotNull(events);
+      for (MetastoreEvent event : events) {
+        if (event.eventType_.equals(MetastoreEventType.DROP_DATABASE)) {
+          DropDatabaseEvent dropDatabaseEvent = (DropDatabaseEvent) event;
+          if (dbName_.equalsIgnoreCase(dropDatabaseEvent.dbName_)) {
+            LOG.info(debugString(
+                "Found database %s is removed later in event %d of " + "type 
%s ",
+                dbName_, dropDatabaseEvent.eventId_, 
dropDatabaseEvent.eventType_));
+            return true;
+          }
+        }
+      }
+      return false;
+    }
+  }
+
+  /**
+   * MetastoreEvent for the DROP_DATABASE event
+   */
+  private static class DropDatabaseEvent extends MetastoreDatabaseEvent {
+
+    /**
+     * Prevent instantiation from outside should use MetastoreEventFactory 
instead
+     */
+    private DropDatabaseEvent(CatalogServiceCatalog catalog, NotificationEvent 
event) {
+      super(catalog, event);
+    }
+
+    /**
+     * Process the drop database event. Currently, this handler removes the db 
object from
+     * catalog. TODO Once we have HIVE-21077 we should compare creationTime to 
make sure
+     * that catalog's Db matches with the database object in the event
+     */
+    @Override
+    public void process() {
+      // TODO this does not currently handle the case where the was a new 
instance
+      // of database with the same name created in catalog after this database 
instance
+      // was removed. For instance, user does a CREATE db, drop db and create 
db again
+      // with the same dbName. In this case, the drop database event will 
remove the
+      // database instance which is created after this create event. We should 
add a
+      // check to compare the creation time of the database with the creation 
time in
+      // the event to make sure we are removing the right databases object. 
Unfortunately,
+      // database do not have creation time currently. This would be fixed in 
HIVE-21077
+      Db removedDb = catalog_.removeDb(dbName_);
+      // if database did not exist in the cache there was nothing to do
+      if (removedDb != null) {
+        LOG.info(debugString("Successfully removed database %s", dbName_));
+      }
+    }
+  }
+
+  /**
+   * MetastoreEvent for which issues invalidate on a table from the event
+   */
+  private static class TableInvalidatingEvent extends MetastoreTableEvent {
+
+    /**
+     * Prevent instantiation from outside should use MetastoreEventFactory 
instead
+     */
+    private TableInvalidatingEvent(CatalogServiceCatalog catalog,
+        NotificationEvent event) {
+      super(catalog, event);
+    }
+
+    /**
+     * Issues a invalidate table on the catalog on the table from the event. 
This
+     * invalidate does not fetch information from metastore unlike the 
invalidate metadata
+     * command since event is triggered post-metastore activity. This handler 
invalidates
+     * by atomically removing existing loaded table and replacing it with a
+     * IncompleteTable. If the table doesn't exist in catalog this operation 
is a no-op
+     */
+    @Override
+    public void process() {
+      if (invalidateCatalogTable()) {
+        LOG.info(debugString("Table %s is invalidated", 
getFullyQualifiedTblName()));
+      } else {
+        LOG.debug(debugString("Table %s does not need to be invalidated since "
+            + "it does not exist anymore", getFullyQualifiedTblName()));
+      }
+    }
+  }
+
+  /**
+   * An event type which is ignored. Useful for unsupported metastore event 
types
+   */
+  private static class IgnoredEvent extends MetastoreEvent {
+
+    /**
+     * Prevent instantiation from outside should use MetastoreEventFactory 
instead
+     */
+    private IgnoredEvent(CatalogServiceCatalog catalog, NotificationEvent 
event) {
+      super(catalog, event);
+    }
+
+    @Override
+    public void process() {
+      LOG.debug(debugString("Ignored"));
+    }
+  }
+}
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
 
b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
new file mode 100644
index 0000000..cca29bb
--- /dev/null
+++ 
b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
@@ -0,0 +1,419 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog.events;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
+import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
+import 
org.apache.hadoop.hive.metastore.messaging.json.ExtendedJSONMessageFactory;
+import org.apache.hadoop.yarn.webapp.hamlet.HamletSpec.META;
+import org.apache.impala.catalog.CatalogException;
+import org.apache.impala.catalog.CatalogServiceCatalog;
+import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
+import org.apache.impala.catalog.events.MetastoreEventUtils.MetastoreEvent;
+import 
org.apache.impala.catalog.events.MetastoreEventUtils.MetastoreEventFactory;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TException;
+
+/**
+ * A metastore event is a instance of the class
+ * <code>org.apache.hadoop.hive.metastore.api.NotificationEvent</code>. 
Metastore can be
+ * configured, to work with Listeners which are called on various DDL 
operations like
+ * create/alter/drop operations on database, table, partition etc. Each event 
has a unique
+ * incremental id and the generated events are be fetched from Metastore to get
+ * incremental updates to the metadata stored in Hive metastore using the the 
public API
+ * <code>get_next_notification</code> These events could be generated by 
external
+ * Metastore clients like Apache Hive or Apache Spark as well as other Impala 
clusters
+ * configured to talk with the same metastore.
+ *
+ * This class is used to poll metastore for such events at a given frequency. 
By observing
+ * such events, we can take appropriate action on the catalogD 
(invalidate/add/remove) so
+ * that catalog represents the latest information available in metastore. We 
keep track of
+ * the last synced event id in each polling iteration so the next batch can be 
requested
+ * appropriately. The current batch size is constant and set to 
MAX_EVENTS_PER_RPC.
+ *
+ * <pre>
+ *      +---------------+   +----------------+        +--------------+
+ *      |Catalog state  |   |Catalog         |        |              |
+ *      |stale          |   |State up-to-date|        |Catalog state |
+ *      |(apply event)  |   |(ignore)        |        |is newer than |
+ *      |               |   |                |        |event         |
+ *      |               |   |                |        |(ignore)      |
+ *      +------+--------+   +-----+----------+        +-+------------+
+ *             |                  |                     |
+ *             |                  |                     |
+ *             |                  |                     |
+ *             |                  |                     |
+ *             |                  |                     |
+ * +-----------V------------------V---------------------V----------->  Event 
Timeline
+ *                                ^
+ *                                |
+ *                                |
+ *                                |
+ *                                |
+ *                                E
+ *
+ * </pre>
+ * Consistency model: Events could be seen as DDLs operations from past either 
done from
+ * this same cluster or some other external system. For example in the events 
timeline
+ * given above, consider a Event E at any given time. The catalog state for the
+ * corresponding object of the event could either be stale, exactly-same or at 
a version
+ * which is higher than one provided by event. Catalog state should only be 
updated when
+ * it is stale with respect to the event. In order to determine if the catalog 
object is
+ * stale, we rely on a combination of creationTime and object version. A 
object in catalog
+ * is stale if and only if its creationTime is < creationTime of the object 
from event OR
+ * its version < version from event if createTime matches
+ *
+ * If the object has the same createTime and version when compared to event or 
if the
+ * createTime > createTime from the event, the event can be safely ignored.
+ *
+ * Following table shows the actions to be taken when the catalog state is 
stale.
+ *
+ * <pre>
+ *               +----------------------------------------+
+ *               |    Catalog object state                |
+ * +----------------------------+------------+------------+
+ * | Event type  | Loaded       | Incomplete | Not present|
+ * |             |              |            |            |
+ * +------------------------------------------------------+
+ * |             |              |            |            |
+ * | CREATE EVENT| removeAndAdd | Ignore     | Add        |
+ * |             |              |            |            |
+ * |             |              |            |            |
+ * | ALTER EVENT | Invalidate   | Ignore     | Ignore     |
+ * |             |              |            |            |
+ * |             |              |            |            |
+ * | DROP EVENT  | Remove       | Remove     | Ignore     |
+ * |             |              |            |            |
+ * +-------------+--------------+------------+------------+
+ * </pre>
+ *
+ * //TODO - Object version support is a work-in-progress in Hive (HIVE-21115). 
Current
+ * event handlers only rely on createTime on Table and Partition. Database 
createTime is a
+ * work-in-progress in Hive in (HIVE-20776)
+ *
+ * All the operations which change the state of catalog cache while processing 
a certain
+ * event type must be atomic in nature. We rely on taking a write lock on 
version object
+ * in CatalogServiceCatalog to make sure that readers are blocked while the 
metadata
+ * update operation is being performed. Since the events are generated 
post-metastore
+ * operations, such catalog updates do not need to update the state in Hive 
Metastore.
+ *
+ * Error Handling: The event processor could be in ACTIVE, STOPPED, ERROR 
states. In case
+ * of any errors while processing the events the state of event processor 
changes to ERROR
+ * and no subsequent events are polled. In such a case a invalidate metadata 
command
+ * restarts the event polling which updates the lastSyncedEventId to the 
latest from
+ * metastore.
+ */
+public class MetastoreEventsProcessor implements ExternalEventsProcessor {
+
+  public static final String HMS_ADD_THRIFT_OBJECTS_IN_EVENTS_CONFIG_KEY =
+      "hive.metastore.notifications.add.thrift.objects";
+  private static final Logger LOG = 
Logger.getLogger(MetastoreEventsProcessor.class);
+  // Use ExtendedJSONMessageFactory to deserialize the event messages.
+  // ExtendedJSONMessageFactory adds additional information over 
JSONMessageFactory so
+  // that events are compatible with Sentry
+  // TODO this should be moved to JSONMessageFactory when Sentry switches to
+  // JSONMessageFactory
+  private static final MessageFactory messageFactory =
+      ExtendedJSONMessageFactory.getInstance();
+
+  private static MetastoreEventsProcessor instance;
+
+  // maximum number of events to poll in each RPC
+  private static final int EVENTS_BATCH_SIZE_PER_RPC = 1000;
+
+  // possible status of event processor
+  public enum EventProcessorStatus {
+    STOPPED, // event processor is instantiated but not yet scheduled
+    ACTIVE, // event processor is scheduled at a given frequency
+    ERROR // event processor is in error state and event processing has stopped
+  }
+
+  // current status of this event processor
+  private EventProcessorStatus eventProcessorStatus_ = 
EventProcessorStatus.STOPPED;
+
+  // event factory which is used to get or create MetastoreEvents
+  private final MetastoreEventFactory metastoreEventFactory_;
+
+  // keeps track of the last event id which we have synced to
+  private long lastSyncedEventId_;
+
+  // polling interval in seconds. Note this is a time we wait AFTER each fetch 
call
+  private final long pollingFrequencyInSec_;
+
+  // catalog service instance to be used while processing events
+  private final CatalogServiceCatalog catalog_;
+
+  // scheduler daemon thread executor for processing events at given frequency
+  private final ScheduledExecutorService scheduler_ = Executors
+      .newSingleThreadScheduledExecutor(new 
ThreadFactoryBuilder().setDaemon(true)
+          .setNameFormat("MetastoreEventsProcessor").build());
+
+  @VisibleForTesting
+  MetastoreEventsProcessor(CatalogServiceCatalog catalog, long startSyncFromId,
+      long pollingFrequencyInSec) {
+    Preconditions.checkState(pollingFrequencyInSec > 0);
+    this.catalog_ = Preconditions.checkNotNull(catalog);
+    lastSyncedEventId_ = startSyncFromId;
+    metastoreEventFactory_ = new MetastoreEventFactory(catalog_);
+    pollingFrequencyInSec_ = pollingFrequencyInSec;
+  }
+
+  /**
+   * Schedules the daemon thread at a given frequency. It is important to note 
that this
+   * method schedules with FixedDelay instead of FixedRate. The reason it is 
scheduled at
+   * a fixedDelay is to make sure that we don't pile up the pending tasks in 
case each
+   * polling operation is taking longer than the given frequency. Because of 
the fixed
+   * delay, the new poll operation is scheduled at the time when previousPoll 
operation
+   * completes + givenDelayInSec
+   */
+  @Override
+  public synchronized void start() {
+    Preconditions.checkState(eventProcessorStatus_ != 
EventProcessorStatus.ACTIVE);
+    startScheduler();
+    eventProcessorStatus_ = EventProcessorStatus.ACTIVE;
+    LOG.info(String.format("Successfully started metastore event processing."
+        + "Polling interval: %d seconds.", pollingFrequencyInSec_));
+  }
+
+  /**
+   * Gets the current event processor status
+   */
+  @VisibleForTesting
+  EventProcessorStatus getStatus() {
+    return eventProcessorStatus_;
+  }
+
+  /**
+   * returns the current value of LastSyncedEventId. This method is not 
thread-safe and
+   * only to be used for testing purposes
+   */
+  @VisibleForTesting
+  public long getLastSyncedEventId() {
+    return lastSyncedEventId_;
+  }
+
+  @VisibleForTesting
+  void startScheduler() {
+    Preconditions.checkState(pollingFrequencyInSec_ > 0);
+    LOG.info(String.format("Starting metastore event polling with interval %d 
seconds.",
+        pollingFrequencyInSec_));
+    scheduler_.scheduleWithFixedDelay(this::processEvents, 
pollingFrequencyInSec_,
+        pollingFrequencyInSec_, TimeUnit.SECONDS);
+  }
+
+  /**
+   * Stops the event processing and changes the status of event processor to
+   * <code>EventProcessorStatus.STOPPED</code>. No new events will be 
processed as long
+   * the status is stopped. If this event processor is actively processing 
events when
+   * stop is called, this method blocks until the current processing is 
complete
+   */
+  @Override
+  public synchronized void stop() {
+    Preconditions.checkState(eventProcessorStatus_ != 
EventProcessorStatus.STOPPED);
+    eventProcessorStatus_ = EventProcessorStatus.STOPPED;
+    LOG.info(String.format("Event processing is stopped. Last synced event id 
is %d",
+        lastSyncedEventId_));
+  }
+
+  /**
+   * Get the current notification event id from metastore
+   */
+  @Override
+  public long getCurrentEventId() throws CatalogException {
+    try (MetaStoreClient metaStoreClient = catalog_.getMetaStoreClient()) {
+      return 
metaStoreClient.getHiveClient().getCurrentNotificationEventId().getEventId();
+    } catch (TException e) {
+      throw new CatalogException("Unable to fetch the current notification 
event id. "
+          + "Check if metastore service is accessible");
+    }
+  }
+
+  /**
+   * Starts the event processor from a given event id
+   */
+  @Override
+  public synchronized void start(long fromEventId) {
+    Preconditions.checkArgument(fromEventId >= 0);
+    Preconditions.checkState(eventProcessorStatus_ != 
EventProcessorStatus.ACTIVE,
+        "Event processing start called when it is already active");
+    long prevLastSyncedEventId = lastSyncedEventId_;
+    lastSyncedEventId_ = fromEventId;
+    eventProcessorStatus_ = EventProcessorStatus.ACTIVE;
+    LOG.info(String.format(
+        "Metastore event processing restarted. Last synced event id was 
updated "
+            + "from %d to %d", prevLastSyncedEventId, lastSyncedEventId_));
+  }
+
+  /**
+   * Fetch the next batch of NotificationEvents from metastore. The default 
batch size if
+   * <code>EVENTS_BATCH_SIZE_PER_RPC</code>
+   */
+  @VisibleForTesting
+  protected List<NotificationEvent> getNextMetastoreEvents()
+      throws MetastoreNotificationFetchException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      // fetch the current notification event id. We assume that the polling 
interval
+      // is small enough that most of these polling operations result in zero 
new
+      // events. In such a case, fetching current notification event id is 
much faster
+      // (and cheaper on HMS side) instead of polling for events directly
+      CurrentNotificationEventId currentNotificationEventId =
+          msClient.getHiveClient().getCurrentNotificationEventId();
+      long currentEventId = currentNotificationEventId.getEventId();
+
+      // no new events since we last polled
+      if (currentEventId <= lastSyncedEventId_) {
+        return Collections.emptyList();
+      }
+
+      NotificationEventResponse response = msClient.getHiveClient()
+          .getNextNotification(lastSyncedEventId_, EVENTS_BATCH_SIZE_PER_RPC, 
null);
+      LOG.info(String
+          .format("Received %d events. Start event id : %d", 
response.getEvents().size(),
+              lastSyncedEventId_));
+      return response.getEvents();
+    } catch (TException e) {
+      throw new MetastoreNotificationFetchException(
+          "Unable to fetch notifications from metastore. Last synced event id 
is "
+              + lastSyncedEventId_, e);
+    }
+  }
+
+  /**
+   * This method issues a request to Hive Metastore if needed, based on the 
current event
+   * id in metastore and the last synced event_id. Events are fetched in fixed 
sized
+   * batches. Each NotificationEvent received is processed by its corresponding
+   * <code>MetastoreEvent</code>
+   */
+  @Override
+  public void processEvents() {
+    NotificationEvent lastProcessedEvent = null;
+    try {
+      EventProcessorStatus currentStatus = eventProcessorStatus_;
+      if (currentStatus == EventProcessorStatus.STOPPED
+          || currentStatus == EventProcessorStatus.ERROR) {
+        LOG.warn(String.format(
+            "Event processing is skipped since status is %s. Last synced event 
id is %d",
+            currentStatus, lastSyncedEventId_));
+        return;
+      }
+
+      List<NotificationEvent> events = getNextMetastoreEvents();
+      lastProcessedEvent = processEvents(events);
+    } catch (MetastoreNotificationFetchException ex) {
+      updateStatus(EventProcessorStatus.ERROR);
+      LOG.error("Unable to fetch the next batch of metastore events", ex);
+    } catch (MetastoreNotificationException | CatalogException ex) {
+      updateStatus(EventProcessorStatus.ERROR);
+      LOG.error(String.format(
+          "Unable to process notification event %d due to %s. Event processing 
will be "
+              + "stopped", lastProcessedEvent.getEventId(), ex.getMessage()));
+      dumpEventInfoToLog(lastProcessedEvent);
+    }
+  }
+
+  /**
+   * Process the given list of notification events. Useful for tests which 
provide a list
+   * of events
+   *
+   * @return the last Notification event which was processed.
+   */
+  @VisibleForTesting
+  protected NotificationEvent processEvents(List<NotificationEvent> events)
+      throws MetastoreNotificationException, CatalogException {
+    List<MetastoreEvent> filteredEvents =
+        metastoreEventFactory_.getFilteredEvents(events);
+    NotificationEvent lastProcessedEvent = null;
+    for (MetastoreEvent event : filteredEvents) {
+      // synchronizing each event processing reduces the scope of the lock so 
the a
+      // potential reset() during event processing is not blocked for longer 
than
+      // necessary
+      synchronized (this) {
+        if (eventProcessorStatus_ != EventProcessorStatus.ACTIVE) {
+          break;
+        }
+        lastProcessedEvent = event.metastoreNotificationEvent_;
+        event.process();
+        lastSyncedEventId_ = event.eventId_;
+      }
+    }
+    return lastProcessedEvent;
+  }
+
+  /**
+   * Updates the current states to the given status.
+   */
+  private synchronized void updateStatus(EventProcessorStatus toStatus) {
+    eventProcessorStatus_ = toStatus;
+  }
+
+  private void dumpEventInfoToLog(NotificationEvent event) {
+    StringBuilder msg =
+        new StringBuilder().append("Event id: 
").append(event.getEventId()).append("\n")
+            .append("Event Type: ").append(event.getEventType()).append("\n")
+            .append("Event time: ").append(event.getEventTime()).append("\n")
+            .append("Database name: ").append(event.getDbName()).append("\n");
+    if (event.getTableName() != null) {
+      msg.append("Table name: ").append(event.getTableName()).append("\n");
+    }
+    msg.append("Event message: ").append(event.getMessage()).append("\n");
+    LOG.error(msg.toString());
+  }
+
+  /**
+   * Create a instance of this object if it is not initialized. Currently, 
this object is
+   * a singleton and should only be created during catalogD initialization 
time, so that
+   * the start syncId matches with the catalogD startup time.
+   *
+   * @param catalog the CatalogServiceCatalog instance to which this event 
processing
+   *     belongs
+   * @param startSyncFromId Start event id. Events will be polled starting 
from this
+   *     event id
+   * @param eventPollingInterval HMS polling interval in seconds
+   * @return this object is already created, or create a new one if it is not 
yet
+   *     instantiated
+   */
+  public static synchronized ExternalEventsProcessor getInstance(
+      CatalogServiceCatalog catalog, long startSyncFromId, long 
eventPollingInterval) {
+    if (instance != null) {
+      return instance;
+    }
+
+    instance =
+        new MetastoreEventsProcessor(catalog, startSyncFromId, 
eventPollingInterval);
+    return instance;
+  }
+
+  @VisibleForTesting
+  public MetastoreEventFactory getMetastoreEventFactory() {
+    return metastoreEventFactory_;
+  }
+
+  public static MessageFactory getMessageFactory() {
+    return messageFactory;
+  }
+}
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreNotificationException.java
 
b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreNotificationException.java
new file mode 100644
index 0000000..341e8aa
--- /dev/null
+++ 
b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreNotificationException.java
@@ -0,0 +1,35 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog.events;
+
+import org.apache.impala.common.ImpalaException;
+
+/**
+ * Utility exception class to be thrown for errors during event processing
+ */
+public class MetastoreNotificationException extends ImpalaException {
+  private static final long serialVersionUID = -2493154165900437878L;
+
+  public MetastoreNotificationException(String msg, Throwable cause) {
+    super(msg, cause);
+  }
+
+  public MetastoreNotificationException(String msg) { super(msg); }
+
+  public MetastoreNotificationException(Exception e) { super(e); }
+}
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreNotificationFetchException.java
 
b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreNotificationFetchException.java
new file mode 100644
index 0000000..d249b20
--- /dev/null
+++ 
b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreNotificationFetchException.java
@@ -0,0 +1,35 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog.events;
+
+public class MetastoreNotificationFetchException extends 
MetastoreNotificationException {
+
+  private static final long serialVersionUID = -2965835338838695815L;
+
+  public MetastoreNotificationFetchException(String msg, Throwable cause) {
+    super(msg, cause);
+  }
+
+  public MetastoreNotificationFetchException(String msg) {
+    super(msg);
+  }
+
+  public MetastoreNotificationFetchException(Exception e) {
+    super(e);
+  }
+}
\ No newline at end of file
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/events/NoOpEventProcessor.java 
b/fe/src/main/java/org/apache/impala/catalog/events/NoOpEventProcessor.java
new file mode 100644
index 0000000..44150d1
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/events/NoOpEventProcessor.java
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog.events;
+
+/**
+ * A simple no-op events processor which does nothing. Used to plugin to the 
catalog
+ * when event processing is disabled so that we don't have to do a null check 
every
+ * time the event processor is called
+ */
+public class NoOpEventProcessor implements ExternalEventsProcessor {
+  private static final ExternalEventsProcessor INSTANCE = new 
NoOpEventProcessor();
+
+  /**
+   * Gets the instance of NoOpEventProcessor
+   */
+  public static ExternalEventsProcessor getInstance() { return INSTANCE; }
+
+  private NoOpEventProcessor() {
+    // prevents instantiation
+  }
+
+  @Override
+  public void start() {
+    // no-op
+  }
+
+  @Override
+  public long getCurrentEventId() {
+    // dummy event id
+    return -1;
+  }
+
+  @Override
+  public void stop() {
+    // no-op
+  }
+
+  @Override
+  public void start(long fromEventId) {
+    // no-op
+  }
+
+  @Override
+  public void processEvents() {
+    // no-op
+  }
+}
\ No newline at end of file
diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java 
b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
index 0e65903..a4a7b72 100644
--- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java
+++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
@@ -125,6 +125,10 @@ public class BackendConfig {
     return backendCfg_.catalog_partial_fetch_rpc_queue_timeout_s;
   }
 
+  public long getHMSPollingIntervalInSeconds() {
+    return backendCfg_.hms_event_polling_interval_s;
+  }
+
   public boolean isOrcScannerEnabled() {
     return backendCfg_.enable_orc_scanner;
   }
diff --git 
a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
 
b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
new file mode 100644
index 0000000..bcbcd43
--- /dev/null
+++ 
b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
@@ -0,0 +1,977 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog.events;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsFilterSpec;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsRequest;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsResponse;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.PartitionFilterMode;
+import org.apache.hadoop.hive.metastore.api.PrincipalType;
+import org.apache.hadoop.hive.metastore.client.builder.DatabaseBuilder;
+import org.apache.hadoop.hive.metastore.client.builder.PartitionBuilder;
+import org.apache.hadoop.hive.metastore.client.builder.TableBuilder;
+import org.apache.impala.catalog.CatalogException;
+import org.apache.impala.catalog.CatalogServiceCatalog;
+import org.apache.impala.catalog.DatabaseNotFoundException;
+import org.apache.impala.catalog.HdfsFileFormat;
+import org.apache.impala.catalog.HdfsTable;
+import org.apache.impala.catalog.IncompleteTable;
+import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
+import org.apache.impala.catalog.PrunablePartition;
+import org.apache.impala.catalog.Table;
+import org.apache.impala.catalog.events.MetastoreEventUtils.MetastoreEvent;
+import org.apache.impala.catalog.events.MetastoreEventUtils.MetastoreEventType;
+import 
org.apache.impala.catalog.events.MetastoreEventsProcessor.EventProcessorStatus;
+import org.apache.impala.common.ImpalaException;
+import org.apache.impala.service.CatalogOpExecutor;
+import org.apache.impala.testutil.CatalogServiceTestCatalog;
+import org.apache.impala.thrift.TAlterTableOrViewRenameParams;
+import org.apache.impala.thrift.TAlterTableParams;
+import org.apache.impala.thrift.TAlterTableType;
+import org.apache.impala.thrift.TColumn;
+import org.apache.impala.thrift.TColumnType;
+import org.apache.impala.thrift.TCreateDbParams;
+import org.apache.impala.thrift.TCreateTableParams;
+import org.apache.impala.thrift.TDdlExecRequest;
+import org.apache.impala.thrift.TDdlType;
+import org.apache.impala.thrift.TDropDbParams;
+import org.apache.impala.thrift.TDropTableOrViewParams;
+import org.apache.impala.thrift.THdfsFileFormat;
+import org.apache.impala.thrift.TPrimitiveType;
+import org.apache.impala.thrift.TScalarType;
+import org.apache.impala.thrift.TTableName;
+import org.apache.impala.thrift.TTypeNode;
+import org.apache.impala.thrift.TTypeNodeType;
+import org.apache.thrift.TException;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Main test class to cover the functionality of MetastoreEventProcessor. In 
order to make
+ * the test deterministic, this test relies on the fact the default value of
+ * hms_event_polling_interval_s is 0. This means that there is no automatic 
scheduled
+ * frequency of the polling for events from metastore. In order to simulate a 
poll
+ * operation this test issues the <code>processEvents</code> method
+ * manually to process the pending events. This test relies on a external HMS 
process
+ * running in a minicluster environment such that events are generated and 
they have the
+ * thrift objects enabled in the event messages.
+ */
+public class MetastoreEventsProcessorTest {
+  private static final String TEST_TABLE_NAME_PARTITIONED = 
"test_partitioned_tbl";
+  private static final String TEST_DB_NAME = "events_test_db";
+  private static final String TEST_TABLE_NAME_NONPARTITIONED = 
"test_nonpartitioned_tbl";
+
+  private static CatalogServiceCatalog catalog_;
+  private static CatalogOpExecutor catalogOpExecutor_;
+  private static MetastoreEventsProcessor eventsProcessor_;
+
+  @BeforeClass
+  public static void setUpTestEnvironment() throws TException {
+    catalog_ = CatalogServiceTestCatalog.create();
+    catalogOpExecutor_ = new CatalogOpExecutor(catalog_);
+    try (MetaStoreClient metaStoreClient = catalog_.getMetaStoreClient()) {
+      CurrentNotificationEventId currentNotificationId =
+          metaStoreClient.getHiveClient().getCurrentNotificationEventId();
+      eventsProcessor_ = new SynchronousHMSEventProcessorForTests(
+          catalog_, currentNotificationId.getEventId(), 10L);
+      eventsProcessor_.start();
+    }
+    catalog_.setMetastoreEventProcessor(eventsProcessor_);
+  }
+
+  @AfterClass
+  public static void tearDownTestSetup() {
+    try {
+      dropDatabaseCascadeFromHMS();
+      // remove database from catalog as well to clean up catalog state
+      catalog_.removeDb(TEST_DB_NAME);
+    } catch (Exception ex) {
+      // ignored
+    }
+  }
+
+  private static void dropDatabaseCascadeFromHMS() throws TException {
+    dropDatabaseCascade(TEST_DB_NAME);
+  }
+
+  private static void dropDatabaseCascade(String dbName) throws TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      msClient.getHiveClient().dropDatabase(dbName, true, true, true);
+    }
+  }
+
+  /**
+   * Cleans up the test database from both metastore and catalog
+   * @throws TException
+   */
+  @Before
+  public void beforeTest() throws TException, CatalogException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      msClient.getHiveClient().dropDatabase(TEST_DB_NAME, true, true, true);
+    }
+    catalog_.removeDb(TEST_DB_NAME);
+    // reset the event processor to the current eventId
+    eventsProcessor_.stop();
+    eventsProcessor_.start(eventsProcessor_.getCurrentEventId());
+    assertEquals(EventProcessorStatus.ACTIVE, eventsProcessor_.getStatus());
+  }
+
+  /**
+   * Make sure the eventProcessor is in ACTIVE state after processing all the 
events in
+   * the test. All tests should make sure that the eventprocessor is returned 
back to
+   * active state so that next test execution starts clean
+   */
+  @After
+  public void afterTest() {
+    assertEquals(EventProcessorStatus.ACTIVE, eventsProcessor_.getStatus());
+  }
+
+  /**
+   * Checks that database exists after processing a CREATE_DATABASE event
+   */
+  @Test
+  public void testCreateDatabaseEvent() throws TException, ImpalaException {
+    createDatabase();
+    eventsProcessor_.processEvents();
+    assertNotNull(catalog_.getDb(TEST_DB_NAME));
+  }
+
+  /**
+   * Checks that Db object does not exist after processing DROP_DATABASE event 
when the
+   * dropped database is empty
+   */
+  @Test
+  public void testDropEmptyDatabaseEvent() throws TException, ImpalaException {
+    dropDatabaseCascade("database_to_be_dropped");
+    // create empty database
+    createDatabase("database_to_be_dropped");
+    eventsProcessor_.processEvents();
+    assertNotNull(catalog_.getDb("database_to_be_dropped"));
+    dropDatabaseCascade("database_to_be_dropped");
+    eventsProcessor_.processEvents();
+    assertNull("Database should not be found after processing drop_database 
event",
+        catalog_.getDb("database_to_be_dropped"));
+  }
+
+  /**
+   * Checks that Db object does not exist after processing DROP_DATABASE event 
when the
+   * dropped database is not empty. This event could be generated by issuing a 
DROP
+   * DATABASE .. CASCADE command. In this case since the tables in the 
database are also
+   * dropped, we expect to see a DatabaseNotFoundException when we query for 
the tables in
+   * the dropped database.
+   */
+  @Test
+  public void testdropDatabaseEvent() throws TException, ImpalaException {
+    createDatabase();
+    String tblToBeDropped = "tbl_to_be_dropped";
+    createTable(tblToBeDropped, true);
+    createTable("tbl_to_be_dropped_unpartitioned", false);
+    // create 2 partitions
+    List<List<String>> partVals = new ArrayList<>(2);
+    partVals.add(Arrays.asList("1"));
+    partVals.add(Arrays.asList("2"));
+    addPartitions(tblToBeDropped, partVals);
+    eventsProcessor_.processEvents();
+    loadTable(tblToBeDropped);
+    // now drop the database with cascade option
+    dropDatabaseCascadeFromHMS();
+    eventsProcessor_.processEvents();
+    assertTrue(
+        "Dropped database should not be found after processing drop_database 
event",
+        catalog_.getDb(TEST_DB_NAME) == null);
+    // throws DatabaseNotFoundException
+    try {
+      catalog_.getTable(TEST_DB_NAME, tblToBeDropped);
+      fail();
+    } catch (DatabaseNotFoundException expectedEx) {
+      // expected exception; ignored
+    }
+  }
+
+  @Ignore("Disabled until we fix Hive bug to deserialize alter_database event 
messages")
+  @Test
+  public void testAlterDatabaseEvents() throws TException, ImpalaException {
+    createDatabase();
+    String testDbParamKey = "testKey";
+    String testDbParamVal = "testVal";
+    eventsProcessor_.processEvents();
+    assertFalse("Newly created test database has db should not have parameter 
with key "
+            + testDbParamKey,
+        catalog_.getDb(TEST_DB_NAME)
+            .getMetaStoreDb()
+            .getParameters()
+            .containsKey(testDbParamKey));
+    // test change of parameters to the Database
+    addDatabaseParameters(testDbParamKey, "someDbParamVal");
+    eventsProcessor_.processEvents();
+    assertTrue("Altered database should have set the key " + testDbParamKey + 
" to value "
+            + testDbParamVal + " in parameters",
+        testDbParamVal.equals(catalog_.getDb(TEST_DB_NAME)
+                                  .getMetaStoreDb()
+                                  .getParameters()
+                                  .get(testDbParamKey)));
+
+    // test update to the default location
+    String currentLocation =
+        catalog_.getDb(TEST_DB_NAME).getMetaStoreDb().getLocationUri();
+    String newLocation = currentLocation + File.separatorChar + 
"newTestLocation";
+    Database alteredDb = 
catalog_.getDb(TEST_DB_NAME).getMetaStoreDb().deepCopy();
+    alteredDb.setLocationUri(newLocation);
+    alterDatabase(alteredDb);
+    eventsProcessor_.processEvents();
+    assertTrue("Altered database should have the updated location",
+        newLocation.equals(
+            catalog_.getDb(TEST_DB_NAME).getMetaStoreDb().getLocationUri()));
+
+    // test change of owner
+    String owner = 
catalog_.getDb(TEST_DB_NAME).getMetaStoreDb().getOwnerName();
+    final String newOwner = "newTestOwner";
+    // sanity check
+    assertFalse(newOwner.equals(owner));
+    alteredDb = catalog_.getDb(TEST_DB_NAME).getMetaStoreDb().deepCopy();
+    alteredDb.setOwnerName(newOwner);
+    alterDatabase(alteredDb);
+    eventsProcessor_.processEvents();
+    assertTrue("Altered database should have the updated owner",
+        
newOwner.equals(catalog_.getDb(TEST_DB_NAME).getMetaStoreDb().getOwnerName()));
+  }
+
+  /**
+   * Test creates two table (partitioned and non-partitioned) and makes sure 
that CatalogD
+   * has the two created table objects after the CREATE_TABLE events are 
processed.
+   */
+  @Test
+  public void testCreateTableEvent() throws TException, ImpalaException {
+    createDatabase();
+    eventsProcessor_.processEvents();
+    assertNull(TEST_TABLE_NAME_NONPARTITIONED + " is not expected to exist",
+        catalog_.getTable(TEST_DB_NAME, TEST_TABLE_NAME_NONPARTITIONED));
+    // create a non-partitioned table
+    createTable(TEST_TABLE_NAME_NONPARTITIONED, false);
+    eventsProcessor_.processEvents();
+    assertNotNull("Catalog should have a incomplete instance of table after 
CREATE_TABLE "
+            + "event is received",
+        catalog_.getTable(TEST_DB_NAME, TEST_TABLE_NAME_NONPARTITIONED));
+    assertTrue("Newly created table from events should be a IncompleteTable",
+        catalog_.getTable(TEST_DB_NAME, TEST_TABLE_NAME_NONPARTITIONED)
+                instanceof IncompleteTable);
+    // test partitioned table case
+    createTable(TEST_TABLE_NAME_PARTITIONED, true);
+    eventsProcessor_.processEvents();
+    assertNotNull("Catalog should have create a incomplete table after 
receiving "
+            + "CREATE_TABLE event",
+        catalog_.getTable(TEST_DB_NAME, TEST_TABLE_NAME_PARTITIONED));
+    assertTrue("Newly created table should be instance of IncompleteTable",
+        catalog_.getTable(TEST_DB_NAME, TEST_TABLE_NAME_PARTITIONED)
+                instanceof IncompleteTable);
+  }
+
+  /**
+   * This tests adds few partitions to a existing table and makes sure that 
the subsequent
+   * load table command fetches the expected number of partitions. It relies 
on the fact
+   * the HMSEventProcessor currently just issues a invalidate command on the 
table instead
+   * of directly refreshing the partition objects TODO: This test can be 
improved further
+   * to check if the table has new partitions without the load command once 
IMPALA-7973 is
+   * fixed
+   */
+  @Test
+  public void testPartitionEvents() throws TException, ImpalaException {
+    createDatabase();
+    createTable(TEST_TABLE_NAME_PARTITIONED, true);
+    // sync to latest event id
+    eventsProcessor_.processEvents();
+
+    // simulate the table being loaded by explicitly calling load table
+    loadTable(TEST_TABLE_NAME_PARTITIONED);
+    List<List<String>> partVals = new ArrayList<>();
+
+    // create 4 partitions
+    partVals.add(Arrays.asList("1"));
+    partVals.add(Arrays.asList("2"));
+    partVals.add(Arrays.asList("3"));
+    partVals.add(Arrays.asList("4"));
+    addPartitions(TEST_TABLE_NAME_PARTITIONED, partVals);
+
+    eventsProcessor_.processEvents();
+    // after ADD_PARTITION event is received currently we just invalidate the 
table
+    assertTrue("Table should have been invalidated after add partition event",
+        catalog_.getTable(TEST_DB_NAME, TEST_TABLE_NAME_PARTITIONED)
+                instanceof IncompleteTable);
+
+    loadTable(TEST_TABLE_NAME_PARTITIONED);
+    assertEquals("Unexpected number of partitions fetched for the loaded 
table", 4,
+        ((HdfsTable) catalog_.getTable(TEST_DB_NAME, 
TEST_TABLE_NAME_PARTITIONED))
+            .getPartitions()
+            .size());
+
+    // now remove some partitions to see if catalogD state gets invalidated
+    partVals.clear();
+    partVals.add(Arrays.asList("1"));
+    partVals.add(Arrays.asList("2"));
+    partVals.add(Arrays.asList("3"));
+    dropPartitions(TEST_TABLE_NAME_PARTITIONED, partVals);
+    eventsProcessor_.processEvents();
+
+    assertTrue("Table should have been invalidated after drop partition event",
+        catalog_.getTable(TEST_DB_NAME, TEST_TABLE_NAME_PARTITIONED)
+            instanceof IncompleteTable);
+    loadTable(TEST_TABLE_NAME_PARTITIONED);
+    assertEquals("Unexpected number of partitions fetched for the loaded 
table", 1,
+        ((HdfsTable) catalog_.getTable(TEST_DB_NAME, 
TEST_TABLE_NAME_PARTITIONED))
+            .getPartitions().size());
+
+    // issue alter partition ops
+    partVals.clear();
+    partVals.add(Arrays.asList("4"));
+    Map<String, String> newParams = new HashMap<>(2);
+    newParams.put("alterKey1", "alterVal1");
+    alterPartitions(TEST_TABLE_NAME_PARTITIONED, partVals, newParams);
+    eventsProcessor_.processEvents();
+    assertTrue("Table should have been invalidated after alter partition 
event",
+        catalog_.getTable(TEST_DB_NAME, TEST_TABLE_NAME_PARTITIONED)
+            instanceof IncompleteTable);
+  }
+
+  /**
+   * Test generates ALTER_TABLE events for various cases (table rename, 
parameter change,
+   * add/remove/change column) and makes sure that the table is updated on the 
CatalogD
+   * side after the ALTER_TABLE event is processed.
+   */
+  @Test
+  public void testAlterTableEvent() throws TException, ImpalaException {
+    createDatabase();
+    createTable("old_name", false);
+    // sync to latest events
+    eventsProcessor_.processEvents();
+    // simulate the table being loaded by explicitly calling load table
+    loadTable("old_name");
+
+    // test renaming a table from outside aka metastore client
+    alterTableRename("old_name", TEST_TABLE_NAME_NONPARTITIONED);
+    eventsProcessor_.processEvents();
+    // table with the old name should not be present anymore
+    assertNull(
+        "Old named table still exists", catalog_.getTable(TEST_DB_NAME, 
"old_name"));
+    // table with the new name should be present in Incomplete state
+    Table newTable = catalog_.getTable(TEST_DB_NAME, 
TEST_TABLE_NAME_NONPARTITIONED);
+    assertNotNull("Table with the new name is not found", newTable);
+    assertTrue("Table with the new name should be incomplete",
+        newTable instanceof IncompleteTable);
+
+    // check invalidate after alter table add parameter
+    loadTable(TEST_TABLE_NAME_NONPARTITIONED);
+    alterTableAddParameter(TEST_TABLE_NAME_NONPARTITIONED, "somekey", 
"someval");
+    eventsProcessor_.processEvents();
+    assertTrue("Table should be incomplete after alter table add parameter",
+        catalog_.getTable(TEST_DB_NAME, TEST_TABLE_NAME_NONPARTITIONED)
+                instanceof IncompleteTable);
+
+    // check invalidate after alter table add col
+    loadTable(TEST_TABLE_NAME_NONPARTITIONED);
+    alterTableAddCol(TEST_TABLE_NAME_NONPARTITIONED, "newCol", "int", "null");
+    eventsProcessor_.processEvents();
+    assertTrue("Table should have been invalidated after alter table add 
column",
+        catalog_.getTable(TEST_DB_NAME, TEST_TABLE_NAME_NONPARTITIONED)
+                instanceof IncompleteTable);
+
+    // check invalidate after alter table change column type
+    loadTable(TEST_TABLE_NAME_NONPARTITIONED);
+    altertableChangeCol(TEST_TABLE_NAME_NONPARTITIONED, "newCol", "string", 
null);
+    eventsProcessor_.processEvents();
+    assertTrue("Table should have been invalidated after changing column type",
+        catalog_.getTable(TEST_DB_NAME, TEST_TABLE_NAME_NONPARTITIONED)
+                instanceof IncompleteTable);
+
+    // check invalidate after alter table remove column
+    loadTable(TEST_TABLE_NAME_NONPARTITIONED);
+    alterTableRemoveCol(TEST_TABLE_NAME_NONPARTITIONED, "newCol");
+    eventsProcessor_.processEvents();
+    assertTrue("Table should have been invalidated after removing a column",
+        catalog_.getTable(TEST_DB_NAME, TEST_TABLE_NAME_NONPARTITIONED)
+                instanceof IncompleteTable);
+  }
+
+  /**
+   * Test drops table using a metastore client and makes sure that the table 
does not
+   * exist in the catalogD after processing DROP_TABLE event is processed. 
Repeats the
+   * same test for a partitioned table.
+   */
+  @Test
+  public void testDropTableEvent() throws TException, ImpalaException {
+    createDatabase();
+    final String TBL_TO_BE_DROPPED = "tbl_to_be_dropped";
+    createTable(TBL_TO_BE_DROPPED, false);
+    eventsProcessor_.processEvents();
+    loadTable(TBL_TO_BE_DROPPED);
+    // issue drop table and make sure it doesn't exist after processing the 
events
+    dropTable(TBL_TO_BE_DROPPED);
+    eventsProcessor_.processEvents();
+    assertTrue("Table should not be found after processing drop_table event",
+        catalog_.getTable(TEST_DB_NAME, TBL_TO_BE_DROPPED) == null);
+
+    // test partitioned table drop
+    createTable(TBL_TO_BE_DROPPED, true);
+
+    eventsProcessor_.processEvents();
+    loadTable(TBL_TO_BE_DROPPED);
+    // create 2 partitions
+    List<List<String>> partVals = new ArrayList<>(2);
+    partVals.add(Arrays.asList("1"));
+    partVals.add(Arrays.asList("2"));
+    addPartitions(TBL_TO_BE_DROPPED, partVals);
+    dropTable(TBL_TO_BE_DROPPED);
+    eventsProcessor_.processEvents();
+    assertTrue("Partitioned table should not be found after processing 
drop_table event",
+        catalog_.getTable(TEST_DB_NAME, TBL_TO_BE_DROPPED) == null);
+  }
+
+  /**
+   * Test makes sure that the events are not processed when the event 
processor is in
+   * STOPPED state
+   * @throws TException
+   */
+  @Test
+  public void testStopEventProcessing() throws TException {
+    try {
+      assertEquals(EventProcessorStatus.ACTIVE, eventsProcessor_.getStatus());
+      eventsProcessor_.stop();
+      createDatabase();
+      eventsProcessor_.processEvents();
+      assertEquals(EventProcessorStatus.STOPPED, eventsProcessor_.getStatus());
+      assertNull(
+          "Test database should not be in catalog when event processing is 
stopped",
+          catalog_.getDb(TEST_DB_NAME));
+    } finally {
+      eventsProcessor_.start();
+    }
+  }
+
+  /**
+   * Test makes sure that event processing is restarted after a 
stop/start(eventId)
+   * call sequence to event processor
+   */
+  @Test
+  public void testEventProcessorRestart() throws TException {
+    try {
+      assertEquals(EventProcessorStatus.ACTIVE, eventsProcessor_.getStatus());
+      long syncedIdBefore = eventsProcessor_.getLastSyncedEventId();
+      eventsProcessor_.stop();
+      createDatabase();
+      eventsProcessor_.processEvents();
+      assertEquals(EventProcessorStatus.STOPPED, eventsProcessor_.getStatus());
+      assertNull(
+          "Test database should not be in catalog when event processing is 
stopped",
+          catalog_.getDb(TEST_DB_NAME));
+      eventsProcessor_.start(syncedIdBefore);
+      assertEquals(EventProcessorStatus.ACTIVE, eventsProcessor_.getStatus());
+      eventsProcessor_.processEvents();
+      assertNotNull(
+          "Test database should be in catalog when event processing is 
restarted",
+          catalog_.getDb(TEST_DB_NAME));
+    } finally {
+      if (eventsProcessor_.getStatus() != EventProcessorStatus.ACTIVE) {
+        eventsProcessor_.start();
+      }
+    }
+  }
+
+  /**
+   * Test makes sure that event processor is restarted after reset()
+   */
+  @Test
+  public void testEventProcessingAfterReset() throws ImpalaException {
+    assertEquals(EventProcessorStatus.ACTIVE, eventsProcessor_.getStatus());
+    long syncedIdBefore = eventsProcessor_.getLastSyncedEventId();
+    catalog_.reset();
+    assertEquals(EventProcessorStatus.ACTIVE, eventsProcessor_.getStatus());
+    // nothing changed so event id remains the same
+    assertEquals(syncedIdBefore, eventsProcessor_.getLastSyncedEventId());
+  }
+
+  /**
+   * Test creates, drops and creates a table with the same name from Impala. 
This would
+   * lead to an interesting sequence of CREATE_TABLE, DROP_TABLE, CREATE_TABLE 
events
+   * while the catalogD state has the latest version of the table cached. Test 
makes sure
+   * that Event processor does not modify catalogd state since the catalog 
table is
+   * already at its latest state
+   */
+  @Test
+  public void testCreateDropCreateTableFromImpala() throws ImpalaException, 
TException {
+    assertEquals(EventProcessorStatus.ACTIVE, eventsProcessor_.getStatus());
+    createDatabase();
+    eventsProcessor_.processEvents();
+    createTableFromImpala(TEST_TABLE_NAME_NONPARTITIONED, false);
+    assertNotNull("Table should have been found after create table statement",
+        catalog_.getTable(TEST_DB_NAME, TEST_TABLE_NAME_NONPARTITIONED));
+    loadTable(TEST_TABLE_NAME_NONPARTITIONED);
+    dropTableFromImpala(TEST_TABLE_NAME_NONPARTITIONED);
+    // now catalogD does not have the table entry, create the table again
+    createTableFromImpala(TEST_TABLE_NAME_NONPARTITIONED, false);
+    assertNotNull("Table should have been found after create table statement",
+        catalog_.getTable(TEST_DB_NAME, TEST_TABLE_NAME_NONPARTITIONED));
+    loadTable(TEST_TABLE_NAME_NONPARTITIONED);
+    List<NotificationEvent> events = eventsProcessor_.getNextMetastoreEvents();
+    // the first create table event should not change anything to the 
catalogd's
+    // created table
+    assertEquals(3, events.size());
+    Table existingTable = catalog_.getTable(TEST_DB_NAME, 
TEST_TABLE_NAME_NONPARTITIONED);
+    int creationTime = existingTable.getMetaStoreTable().getCreateTime();
+    assertEquals("CREATE_TABLE", events.get(0).getEventType());
+    eventsProcessor_.processEvents(Lists.newArrayList(events.get(0)));
+    // after processing the create_table the original table should still 
remain the same
+    assertEquals(creationTime, catalog_.getTable(TEST_DB_NAME,
+        TEST_TABLE_NAME_NONPARTITIONED).getMetaStoreTable().getCreateTime());
+    //second event should be drop_table. This event should also be skipped 
since
+    // catalog state is more recent than the event
+    assertEquals("DROP_TABLE", events.get(1).getEventType());
+    eventsProcessor_.processEvents(Lists.newArrayList(events.get(1)));
+    // even after drop table event, the table should still exist
+    assertNotNull("Table should have existed since catalog state is current 
and event "
+        + "is stale", catalog_.getTable(TEST_DB_NAME, 
TEST_TABLE_NAME_NONPARTITIONED));
+    // the final create table event should also be ignored since its a 
self-event
+    assertEquals("CREATE_TABLE", events.get(2).getEventType());
+    eventsProcessor_.processEvents(Lists.newArrayList(events.get(2)));
+    assertFalse(
+        "Table should have been loaded since the create_table should be " + 
"ignored",
+        catalog_.getTable(TEST_DB_NAME,
+            TEST_TABLE_NAME_NONPARTITIONED) instanceof IncompleteTable);
+    //finally make sure the table is still the same
+    assertEquals(creationTime, catalog_.getTable(TEST_DB_NAME,
+        TEST_TABLE_NAME_NONPARTITIONED).getMetaStoreTable().getCreateTime());
+  }
+
+  /**
+   * Test generates DDL events on table and makes sure that event processing 
does not
+   * modify the catalog state
+   *
+   * @throws ImpalaException
+   */
+  @Test
+  public void testTableEventsFromImpala() throws ImpalaException {
+    createDatabaseFromImpala(TEST_DB_NAME, "created from Impala");
+    createTableFromImpala(TEST_TABLE_NAME_PARTITIONED, true);
+    loadTable(TEST_TABLE_NAME_PARTITIONED);
+    List<NotificationEvent> events = eventsProcessor_.getNextMetastoreEvents();
+    assertEquals(2, events.size());
+
+    eventsProcessor_.processEvents(events);
+    assertNotNull(catalog_.getDb(TEST_DB_NAME));
+    assertNotNull(catalog_.getTable(TEST_DB_NAME, 
TEST_TABLE_NAME_PARTITIONED));
+    assertFalse("Table should have been loaded since it was already latest", 
catalog_
+        .getTable(TEST_DB_NAME, TEST_TABLE_NAME_PARTITIONED) instanceof 
IncompleteTable);
+
+    dropTableFromImpala(TEST_TABLE_NAME_PARTITIONED);
+    assertNull(catalog_.getTable(TEST_DB_NAME, TEST_TABLE_NAME_PARTITIONED));
+    events = eventsProcessor_.getNextMetastoreEvents();
+    // should have 1 drop_table event
+    assertEquals(1, events.size());
+    eventsProcessor_.processEvents(events);
+    // dropping a non-existant table should cause event processor to go into 
error state
+    assertEquals(EventProcessorStatus.ACTIVE, eventsProcessor_.getStatus());
+    assertNull(catalog_.getTable(TEST_DB_NAME, TEST_TABLE_NAME_PARTITIONED));
+  }
+
+  /**
+   * Creates events like create, drop with the same tblName. In such case the 
create
+   * table should not create a in
+   */
+  @Test
+  public void testEventFiltering() throws ImpalaException {
+    createDatabaseFromImpala(TEST_DB_NAME, "");
+    createTableFromImpala(TEST_TABLE_NAME_NONPARTITIONED, false);
+    loadTable(TEST_TABLE_NAME_NONPARTITIONED);
+    assertNotNull(catalog_.getTable(TEST_DB_NAME, 
TEST_TABLE_NAME_NONPARTITIONED));
+    dropTableFromImpala(TEST_TABLE_NAME_NONPARTITIONED);
+    // the create table event should be filtered out
+    List<NotificationEvent> events = eventsProcessor_.getNextMetastoreEvents();
+    assertEquals(3, events.size());
+    List<MetastoreEvent> filteredEvents =
+        eventsProcessor_.getMetastoreEventFactory().getFilteredEvents(events);
+    assertEquals(2, filteredEvents.size());
+    assertEquals(MetastoreEventType.CREATE_DATABASE, 
filteredEvents.get(0).eventType_);
+    assertEquals(MetastoreEventType.DROP_TABLE, 
filteredEvents.get(1).eventType_);
+    eventsProcessor_.processEvents();
+    assertNull(catalog_.getTable(TEST_DB_NAME, 
TEST_TABLE_NAME_NONPARTITIONED));
+
+    // test the table rename case
+    createTableFromImpala(TEST_TABLE_NAME_NONPARTITIONED, false);
+    renameTableFromImpala(TEST_TABLE_NAME_NONPARTITIONED, "new_name");
+    events = eventsProcessor_.getNextMetastoreEvents();
+    assertEquals(2, events.size());
+    filteredEvents =
+        eventsProcessor_.getMetastoreEventFactory().getFilteredEvents(events);
+    assertEquals(1, filteredEvents.size());
+    assertEquals(MetastoreEventType.ALTER_TABLE, 
filteredEvents.get(0).eventType_);
+  }
+
+  /**
+   * Similar to create,drop,create sequence table as in
+   * <code>testCreateDropCreateTableFromImpala</code> but operates on Database 
instead
+   * of Table. Makes sure that the database creationTime is checked before 
processing
+   * create and drop database events
+   */
+  @Ignore("Ignored since database createTime is unavailable until we have 
HIVE-21077")
+  @Test
+  public void testCreateDropCreateDatabaseFromImpala() throws ImpalaException {
+    assertEquals(EventProcessorStatus.ACTIVE, eventsProcessor_.getStatus());
+    createDatabaseFromImpala(TEST_DB_NAME, "first");
+    assertNotNull("Db should have been found after create database statement",
+        catalog_.getDb(TEST_DB_NAME));
+    dropDatabaseFromImpala(TEST_DB_NAME);
+    assertNull(catalog_.getDb(TEST_DB_NAME));
+    createDatabaseFromImpala(TEST_DB_NAME, "second");
+    assertNotNull(catalog_.getDb(TEST_DB_NAME));
+    List<NotificationEvent> events = eventsProcessor_.getNextMetastoreEvents();
+    // should have 3 events for create,drop and create database
+    assertEquals(3, events.size());
+
+    assertEquals("CREATE_DATABASE", events.get(0).getEventType());
+    eventsProcessor_.processEvents(Lists.newArrayList(events.get(0)));
+    // create_database event should have no effect since catalogD has already 
a later
+    // version of database with the same name.
+    assertNotNull(catalog_.getDb(TEST_DB_NAME));
+    assertEquals("second",
+        catalog_.getDb(TEST_DB_NAME).getMetaStoreDb().getDescription());
+
+    // now process drop_database event
+    assertEquals("DROP_DATABASE", events.get(1).getEventType());
+    eventsProcessor_.processEvents(Lists.newArrayList(events.get(1)));
+    // database should not be dropped since catalogD is at the latest state
+    assertNotNull(catalog_.getDb(TEST_DB_NAME));
+    assertEquals("second",
+        catalog_.getDb(TEST_DB_NAME).getMetaStoreDb().getDescription());
+
+    // the third create_database event should have no effect too
+    assertEquals("CREATE_DATABASE", events.get(2).getEventType());
+    eventsProcessor_.processEvents(Lists.newArrayList(events.get(2)));
+    assertNotNull(catalog_.getDb(TEST_DB_NAME));
+    assertEquals("second",
+        catalog_.getDb(TEST_DB_NAME).getMetaStoreDb().getDescription());
+  }
+
+  private void createDatabase() throws TException { 
createDatabase(TEST_DB_NAME); }
+
+  private void createDatabase(String dbName) throws TException {
+    Database database =
+        new DatabaseBuilder()
+            .setName(dbName)
+            .setDescription("Notification test database")
+            .addParam("dbparamkey", "dbparamValue")
+            .setOwnerName("NotificationTestOwner")
+            .setOwnerType(PrincipalType.USER).build();
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      msClient.getHiveClient().createDatabase(database);
+    }
+  }
+
+  private void addDatabaseParameters(String key, String val) throws TException 
{
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      Database msDb = msClient.getHiveClient().getDatabase(TEST_DB_NAME);
+      assertFalse(key + " already exists in the database parameters",
+          msDb.getParameters().containsKey(key));
+      msDb.putToParameters(key, val);
+      msClient.getHiveClient().alterDatabase(TEST_DB_NAME, msDb);
+    }
+  }
+
+  private void alterDatabase(Database newDatabase) throws TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      msClient.getHiveClient().alterDatabase(newDatabase.getName(), 
newDatabase);
+    }
+  }
+
+  private void createTable(String tblName, boolean isPartitioned) throws 
TException {
+    TableBuilder tblBuilder =
+        new TableBuilder()
+            .setTableName(tblName)
+            .setDbName(TEST_DB_NAME)
+            .addTableParam("tblParamKey", "tblParamValue")
+            .addCol("c1", "string", "c1 description")
+            .addCol("c2", "string", "c2 description")
+            .setSerdeLib(HdfsFileFormat.PARQUET.serializationLib())
+            .setInputFormat(HdfsFileFormat.PARQUET.inputFormat())
+            .setOutputFormat(HdfsFileFormat.PARQUET.outputFormat());
+    if (isPartitioned) {
+      tblBuilder.addPartCol("p1", "string", "partition p1 description");
+    }
+
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      msClient.getHiveClient().createTable(tblBuilder.build());
+    }
+  }
+
+  /**
+   * Drops table from Impala
+   */
+  private void dropTableFromImpala(String tblName) throws ImpalaException {
+    TDdlExecRequest req = new TDdlExecRequest();
+    req.setDdl_type(TDdlType.DROP_TABLE);
+    TDropTableOrViewParams dropTableParams = new TDropTableOrViewParams();
+    dropTableParams.setTable_name(new TTableName(TEST_DB_NAME, tblName));
+    dropTableParams.setIf_exists(true);
+    req.setDrop_table_or_view_params(dropTableParams);
+    catalogOpExecutor_.execDdlRequest(req);
+  }
+
+  /**
+   * Creates db from Impala
+   */
+  private void createDatabaseFromImpala(String dbName, String desc)
+      throws ImpalaException {
+    TDdlExecRequest req = new TDdlExecRequest();
+    req.setDdl_type(TDdlType.CREATE_DATABASE);
+    TCreateDbParams createDbParams = new TCreateDbParams();
+    createDbParams.setDb(dbName);
+    createDbParams.setComment(desc);
+    req.setCreate_db_params(createDbParams);
+    catalogOpExecutor_.execDdlRequest(req);
+  }
+
+  /**
+   * Drops db from Impala
+   */
+  private void dropDatabaseFromImpala(String dbName) throws ImpalaException {
+    TDdlExecRequest req = new TDdlExecRequest();
+    req.setDdl_type(TDdlType.DROP_DATABASE);
+    TDropDbParams dropDbParams = new TDropDbParams();
+    dropDbParams.setDb(dbName);
+    req.setDrop_db_params(dropDbParams);
+    catalogOpExecutor_.execDdlRequest(req);
+  }
+
+  /**
+   * Creates a table using CatalogOpExecutor to simulate a DDL operation from 
Impala
+   * client
+   */
+  private void createTableFromImpala(String tblName, boolean isPartitioned)
+      throws ImpalaException {
+    TDdlExecRequest req = new TDdlExecRequest();
+    req.setDdl_type(TDdlType.CREATE_TABLE);
+    TCreateTableParams createTableParams = new TCreateTableParams();
+    createTableParams.setTable_name(new TTableName(TEST_DB_NAME, tblName));
+    createTableParams.setFile_format(THdfsFileFormat.PARQUET);
+    createTableParams.setIs_external(false);
+    createTableParams.setIf_not_exists(false);
+    Map<String, String> properties = new HashMap<>();
+    properties.put("tblParamKey", "tblParamValue");
+    createTableParams.setTable_properties(properties);
+    List<TColumn> columns = new ArrayList<>(2);
+    columns.add(getScalarColumn("c1", TPrimitiveType.STRING));
+    columns.add(getScalarColumn("c2", TPrimitiveType.STRING));
+    createTableParams.setColumns(columns);
+    // create two partition columns if specified
+    if (isPartitioned) {
+      List<TColumn> partitionColumns = new ArrayList<>(2);
+      partitionColumns.add(getScalarColumn("p1", TPrimitiveType.INT));
+      partitionColumns.add(getScalarColumn("p2", TPrimitiveType.STRING));
+      createTableParams.setPartition_columns(partitionColumns);
+    }
+    req.setCreate_table_params(createTableParams);
+    catalogOpExecutor_.execDdlRequest(req);
+  }
+
+  /**
+   * Renames a table from oldTblName to newTblName from Impala
+   */
+  private void renameTableFromImpala(String oldTblName, String newTblName)
+      throws ImpalaException {
+    TDdlExecRequest req = new TDdlExecRequest();
+    req.setDdl_type(TDdlType.ALTER_TABLE);
+    TAlterTableOrViewRenameParams renameParams = new 
TAlterTableOrViewRenameParams();
+    renameParams.new_table_name = new TTableName(TEST_DB_NAME, newTblName);
+    TAlterTableParams alterTableParams = new TAlterTableParams();
+    alterTableParams.setAlter_type(TAlterTableType.RENAME_TABLE);
+    alterTableParams.setTable_name(new TTableName(TEST_DB_NAME, oldTblName));
+    alterTableParams.setRename_params(renameParams);
+    req.setAlter_table_params(alterTableParams);
+    catalogOpExecutor_.execDdlRequest(req);
+  }
+
+  private TColumn getScalarColumn(String colName, TPrimitiveType type) {
+    TTypeNode tTypeNode = new TTypeNode(TTypeNodeType.SCALAR);
+    tTypeNode.setScalar_type(new TScalarType(type));
+    TColumnType columnType = new TColumnType(Arrays.asList(tTypeNode));
+    return new TColumn(colName, columnType);
+  }
+
+  private void dropTable(String tableName) throws TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      client.getHiveClient().dropTable(TEST_DB_NAME, tableName, true, false);
+    }
+  }
+
+  private void alterTableRename(String tblName, String newTblName) throws 
TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      org.apache.hadoop.hive.metastore.api.Table msTable =
+          msClient.getHiveClient().getTable(TEST_DB_NAME, tblName);
+      msTable.setTableName(newTblName);
+      msClient.getHiveClient().alter_table_with_environmentContext(
+          TEST_DB_NAME, tblName, msTable, null);
+    }
+  }
+
+  private void alterTableAddParameter(String tblName, String key, String val)
+      throws TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      org.apache.hadoop.hive.metastore.api.Table msTable =
+          msClient.getHiveClient().getTable(TEST_DB_NAME, tblName);
+      msTable.getParameters().put(key, val);
+      msClient.getHiveClient().alter_table_with_environmentContext(
+          TEST_DB_NAME, tblName, msTable, null);
+    }
+  }
+
+  private void alterTableAddCol(
+      String tblName, String colName, String colType, String comment) throws 
TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      org.apache.hadoop.hive.metastore.api.Table msTable =
+          msClient.getHiveClient().getTable(TEST_DB_NAME, tblName);
+      msTable.getSd().getCols().add(new FieldSchema(colName, colType, 
comment));
+      msClient.getHiveClient().alter_table_with_environmentContext(
+          TEST_DB_NAME, tblName, msTable, null);
+    }
+  }
+
+  private void altertableChangeCol(
+      String tblName, String colName, String colType, String comment) throws 
TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      org.apache.hadoop.hive.metastore.api.Table msTable =
+          msClient.getHiveClient().getTable(TEST_DB_NAME, tblName);
+      FieldSchema targetCol = null;
+      for (FieldSchema col : msTable.getSd().getCols()) {
+        if (col.getName().equalsIgnoreCase(colName)) {
+          targetCol = col;
+          break;
+        }
+      }
+      assertNotNull("Column " + colName + " does not exist", targetCol);
+      targetCol.setName(colName);
+      targetCol.setType(colType);
+      targetCol.setComment(comment);
+      msClient.getHiveClient().alter_table_with_environmentContext(
+          TEST_DB_NAME, tblName, msTable, null);
+    }
+  }
+
+  private void alterTableRemoveCol(String tblName, String colName) throws 
TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      org.apache.hadoop.hive.metastore.api.Table msTable =
+          msClient.getHiveClient().getTable(TEST_DB_NAME, tblName);
+      FieldSchema targetCol = null;
+      for (FieldSchema col : msTable.getSd().getCols()) {
+        if (col.getName().equalsIgnoreCase(colName)) {
+          targetCol = col;
+          break;
+        }
+      }
+      assertNotNull("Column " + colName + " does not exist", targetCol);
+      msTable.getSd().getCols().remove(targetCol);
+      msClient.getHiveClient().alter_table_with_environmentContext(
+          TEST_DB_NAME, tblName, msTable, null);
+    }
+  }
+
+  /**
+   * Removes the partition by values from HMS
+   * @param tblName
+   * @param partitionValues
+   * @throws TException
+   */
+  private void dropPartitions(String tblName, List<List<String>> 
partitionValues)
+      throws TException {
+    try (MetaStoreClient metaStoreClient = catalog_.getMetaStoreClient()) {
+      for (List<String> partVals : partitionValues) {
+        metaStoreClient.getHiveClient().dropPartition(TEST_DB_NAME, tblName,
+            partVals, true);
+      }
+    }
+  }
+
+  private void alterPartitions(String tblName, List<List<String>> partValsList,
+      Map<String, String> newParams)
+      throws TException {
+    GetPartitionsRequest request = new GetPartitionsRequest();
+    request.setDbName(TEST_DB_NAME);
+    List<Partition> partitions = new ArrayList<>();
+    try (MetaStoreClient metaStoreClient = catalog_.getMetaStoreClient()) {
+      for (List<String> partVal : partValsList) {
+        Partition partition = 
metaStoreClient.getHiveClient().getPartition(TEST_DB_NAME,
+            tblName,
+            partVal);
+        partition.setParameters(newParams);
+        partitions.add(partition);
+      }
+
+      metaStoreClient.getHiveClient().alter_partitions(TEST_DB_NAME, tblName, 
partitions);
+    }
+  }
+
+  private void addPartitions(String tblName, List<List<String>> 
partitionValues)
+      throws TException {
+    int i = 0;
+    List<Partition> partitions = new ArrayList<>(partitionValues.size());
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      org.apache.hadoop.hive.metastore.api.Table msTable =
+          msClient.getHiveClient().getTable(TEST_DB_NAME, tblName);
+      for (List<String> partVals : partitionValues) {
+        partitions.add(
+            new PartitionBuilder()
+                .fromTable(msTable)
+                .setInputFormat(msTable.getSd().getInputFormat())
+                
.setSerdeLib(msTable.getSd().getSerdeInfo().getSerializationLib())
+                .setOutputFormat(msTable.getSd().getOutputFormat())
+                .setValues(partVals)
+                .build());
+      }
+    }
+    try (MetaStoreClient metaStoreClient = catalog_.getMetaStoreClient()) {
+      metaStoreClient.getHiveClient().add_partitions(partitions);
+    }
+  }
+
+  private Table loadTable(String tblName) throws CatalogException {
+    Table loadedTable = catalog_.getOrLoadTable(TEST_DB_NAME, tblName);
+    assertFalse("Table should have been loaded after getOrLoadTable call",
+        loadedTable instanceof IncompleteTable);
+    return loadedTable;
+  }
+}
diff --git 
a/fe/src/test/java/org/apache/impala/catalog/events/SynchronousHMSEventProcessorForTests.java
 
b/fe/src/test/java/org/apache/impala/catalog/events/SynchronousHMSEventProcessorForTests.java
new file mode 100644
index 0000000..2976bbe
--- /dev/null
+++ 
b/fe/src/test/java/org/apache/impala/catalog/events/SynchronousHMSEventProcessorForTests.java
@@ -0,0 +1,36 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog.events;
+
+import org.apache.impala.catalog.CatalogServiceCatalog;
+
+/**
+ * A test MetastoreEventProcessor which executes in the same thread. Useful 
for testing
+ * functionality of MetastoreEventsProcessor
+ */
+public class SynchronousHMSEventProcessorForTests extends 
MetastoreEventsProcessor {
+  SynchronousHMSEventProcessorForTests(
+      CatalogServiceCatalog catalog, long startSyncFromId, long 
pollingFrequencyInSec) {
+    super(catalog, startSyncFromId, pollingFrequencyInSec);
+  }
+
+  @Override
+  public void startScheduler() {
+    // nothing to do here; there is no background thread for this processor
+  }
+}
diff --git a/fe/src/test/resources/postgresql-hive-site.xml.template 
b/fe/src/test/resources/postgresql-hive-site.xml.template
index 0847aef..60c047e 100644
--- a/fe/src/test/resources/postgresql-hive-site.xml.template
+++ b/fe/src/test/resources/postgresql-hive-site.xml.template
@@ -226,4 +226,15 @@
   <value>${INTERNAL_LISTEN_HOST}:2181</value>
   <description>The ZooKeeper token store connect string.</description>
 </property>
+
+<!-- This property is required to issue invalidates based on metastore events.
+See IMPALA-7954 for details -->
+<property>
+  <name>hive.metastore.notifications.add.thrift.objects</name>
+  <value>true</value>
+</property>
+<property>
+  <name>hive.metastore.alter.notifications.basic</name>
+  <value>false</value>
+</property>
 </configuration>

Reply via email to