HIVE-16164: Provide mechanism for passing HMS notification ID between transactional and non-transactional listeners. (Sergio Pena, reviewed by Mohit Sabharwal, Alexander Kolbasov)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d0df902e Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d0df902e Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d0df902e Branch: refs/heads/branch-2 Commit: d0df902e41354d6d80a6d192e0964da8c043467b Parents: e283305 Author: Sergio Pena <sergio.p...@cloudera.com> Authored: Tue Apr 4 09:42:06 2017 -0500 Committer: Sergio Pena <sergio.p...@cloudera.com> Committed: Tue Apr 4 09:46:29 2017 -0500 ---------------------------------------------------------------------- .../listener/DbNotificationListener.java | 46 +- .../MetaStoreEventListenerConstants.java | 33 ++ .../listener/TestDbNotificationListener.java | 190 +++++++ .../hadoop/hive/metastore/HiveAlterHandler.java | 60 +-- .../hadoop/hive/metastore/HiveMetaStore.java | 529 +++++++++++-------- .../metastore/MetaStoreListenerNotifier.java | 224 ++++++++ .../hive/metastore/events/ListenerEvent.java | 106 ++++ .../hadoop/hive/metastore/TestObjectStore.java | 50 ++ 8 files changed, 959 insertions(+), 279 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/d0df902e/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java ---------------------------------------------------------------------- diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java index ea6cb79..bbfbc36 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java @@ -57,6 +57,7 @@ import org.apache.hadoop.hive.metastore.events.DropPartitionEvent; import org.apache.hadoop.hive.metastore.events.DropTableEvent; import org.apache.hadoop.hive.metastore.events.InsertEvent; import org.apache.hadoop.hive.metastore.events.LoadPartitionDoneEvent; +import org.apache.hadoop.hive.metastore.events.ListenerEvent; import org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType; import org.apache.hadoop.hive.metastore.messaging.MessageFactory; import org.apache.hadoop.hive.metastore.messaging.PartitionFiles; @@ -137,7 +138,7 @@ public class DbNotificationListener extends MetaStoreEventListener { .buildCreateTableMessage(t, new FileIterator(t.getSd().getLocation())).toString()); event.setDbName(t.getDbName()); event.setTableName(t.getTableName()); - process(event); + process(event, tableEvent); } /** @@ -152,7 +153,7 @@ public class DbNotificationListener extends MetaStoreEventListener { .buildDropTableMessage(t).toString()); event.setDbName(t.getDbName()); event.setTableName(t.getTableName()); - process(event); + process(event, tableEvent); } /** @@ -168,7 +169,7 @@ public class DbNotificationListener extends MetaStoreEventListener { .buildAlterTableMessage(before, after).toString()); event.setDbName(after.getDbName()); event.setTableName(after.getTableName()); - process(event); + process(event, tableEvent); } class FileIterator implements Iterator<String> { @@ -276,7 +277,7 @@ public class DbNotificationListener extends MetaStoreEventListener { new NotificationEvent(0, now(), EventType.ADD_PARTITION.toString(), msg); event.setDbName(t.getDbName()); event.setTableName(t.getTableName()); - process(event); + process(event, partitionEvent); } /** @@ -291,7 +292,7 @@ public class DbNotificationListener extends MetaStoreEventListener { .buildDropPartitionMessage(t, partitionEvent.getPartitionIterator()).toString()); event.setDbName(t.getDbName()); event.setTableName(t.getTableName()); - process(event); + process(event, partitionEvent); } /** @@ -307,7 +308,7 @@ public class DbNotificationListener extends MetaStoreEventListener { .buildAlterPartitionMessage(partitionEvent.getTable(), before, after).toString()); event.setDbName(before.getDbName()); event.setTableName(before.getTableName()); - process(event); + process(event, partitionEvent); } /** @@ -321,7 +322,7 @@ public class DbNotificationListener extends MetaStoreEventListener { new NotificationEvent(0, now(), EventType.CREATE_DATABASE.toString(), msgFactory .buildCreateDatabaseMessage(db).toString()); event.setDbName(db.getName()); - process(event); + process(event, dbEvent); } /** @@ -335,7 +336,7 @@ public class DbNotificationListener extends MetaStoreEventListener { new NotificationEvent(0, now(), EventType.DROP_DATABASE.toString(), msgFactory .buildDropDatabaseMessage(db).toString()); event.setDbName(db.getName()); - process(event); + process(event, dbEvent); } /** @@ -349,7 +350,7 @@ public class DbNotificationListener extends MetaStoreEventListener { new NotificationEvent(0, now(), EventType.CREATE_FUNCTION.toString(), msgFactory .buildCreateFunctionMessage(fn).toString()); event.setDbName(fn.getDbName()); - process(event); + process(event, fnEvent); } /** @@ -363,7 +364,7 @@ public class DbNotificationListener extends MetaStoreEventListener { new NotificationEvent(0, now(), EventType.DROP_FUNCTION.toString(), msgFactory .buildDropFunctionMessage(fn).toString()); event.setDbName(fn.getDbName()); - process(event); + process(event, fnEvent); } /** @@ -377,7 +378,7 @@ public class DbNotificationListener extends MetaStoreEventListener { new NotificationEvent(0, now(), EventType.CREATE_INDEX.toString(), msgFactory .buildCreateIndexMessage(index).toString()); event.setDbName(index.getDbName()); - process(event); + process(event, indexEvent); } /** @@ -391,7 +392,7 @@ public class DbNotificationListener extends MetaStoreEventListener { new NotificationEvent(0, now(), EventType.DROP_INDEX.toString(), msgFactory .buildDropIndexMessage(index).toString()); event.setDbName(index.getDbName()); - process(event); + process(event, indexEvent); } /** @@ -406,7 +407,7 @@ public class DbNotificationListener extends MetaStoreEventListener { new NotificationEvent(0, now(), EventType.ALTER_INDEX.toString(), msgFactory .buildAlterIndexMessage(before, after).toString()); event.setDbName(before.getDbName()); - process(event); + process(event, indexEvent); } class FileChksumIterator implements Iterator<String> { @@ -443,7 +444,7 @@ public class DbNotificationListener extends MetaStoreEventListener { .toString()); event.setDbName(insertEvent.getDb()); event.setTableName(insertEvent.getTable()); - process(event); + process(event, insertEvent); } /** @@ -467,14 +468,27 @@ public class DbNotificationListener extends MetaStoreEventListener { return (int)millis; } - // Process this notification by adding it to metastore DB - private void process(NotificationEvent event) throws MetaException { + /** + * Process this notification by adding it to metastore DB. + * + * @param event NotificationEvent is the object written to the metastore DB. + * @param listenerEvent ListenerEvent (from which NotificationEvent was based) used only to set the + * DB_NOTIFICATION_EVENT_ID_KEY_NAME for future reference by other listeners. + */ + private void process(NotificationEvent event, ListenerEvent listenerEvent) throws MetaException { event.setMessageFormat(msgFactory.getMessageFormat()); synchronized (NOTIFICATION_TBL_LOCK) { LOG.debug("DbNotificationListener: Processing : {}:{}", event.getEventId(), event.getMessage()); HMSHandler.getMSForConf(hiveConf).addNotificationEvent(event); } + + // Set the DB_NOTIFICATION_EVENT_ID for future reference by other listeners. + if (event.isSetEventId()) { + listenerEvent.putParameter( + MetaStoreEventListenerConstants.DB_NOTIFICATION_EVENT_ID_KEY_NAME, + Long.toString(event.getEventId())); + } } private static class CleanerThread extends Thread { http://git-wip-us.apache.org/repos/asf/hive/blob/d0df902e/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/MetaStoreEventListenerConstants.java ---------------------------------------------------------------------- diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/MetaStoreEventListenerConstants.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/MetaStoreEventListenerConstants.java new file mode 100644 index 0000000..a4f2d59 --- /dev/null +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/MetaStoreEventListenerConstants.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hive.hcatalog.listener; + +/** + * Keeps a list of reserved keys used by Hive listeners when updating the ListenerEvent + * parameters. + */ +public class MetaStoreEventListenerConstants { + /* + * DbNotificationListener keys reserved for updating ListenerEvent parameters. + * + * DB_NOTIFICATION_EVENT_ID_KEY_NAME This key will have the event identifier that DbNotificationListener + * processed during an event. This event identifier might be shared + * across other MetaStoreEventListener implementations. + */ + public static final String DB_NOTIFICATION_EVENT_ID_KEY_NAME = "DB_NOTIFICATION_EVENT_ID_KEY_NAME"; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/d0df902e/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java ---------------------------------------------------------------------- diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java index 1cf47c3..50d8878 100644 --- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java +++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java @@ -31,13 +31,16 @@ import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Stack; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.cli.CliSessionState; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.MetaStoreEventListener; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.FireEventRequest; @@ -46,6 +49,7 @@ import org.apache.hadoop.hive.metastore.api.Function; import org.apache.hadoop.hive.metastore.api.FunctionType; import org.apache.hadoop.hive.metastore.api.Index; import org.apache.hadoop.hive.metastore.api.InsertEventRequestData; +import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.NotificationEvent; import org.apache.hadoop.hive.metastore.api.NotificationEventResponse; import org.apache.hadoop.hive.metastore.api.Order; @@ -56,6 +60,21 @@ import org.apache.hadoop.hive.metastore.api.ResourceUri; import org.apache.hadoop.hive.metastore.api.SerDeInfo; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.events.AddIndexEvent; +import org.apache.hadoop.hive.metastore.events.AddPartitionEvent; +import org.apache.hadoop.hive.metastore.events.AlterIndexEvent; +import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent; +import org.apache.hadoop.hive.metastore.events.AlterTableEvent; +import org.apache.hadoop.hive.metastore.events.CreateDatabaseEvent; +import org.apache.hadoop.hive.metastore.events.CreateFunctionEvent; +import org.apache.hadoop.hive.metastore.events.CreateTableEvent; +import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent; +import org.apache.hadoop.hive.metastore.events.DropFunctionEvent; +import org.apache.hadoop.hive.metastore.events.DropIndexEvent; +import org.apache.hadoop.hive.metastore.events.DropPartitionEvent; +import org.apache.hadoop.hive.metastore.events.DropTableEvent; +import org.apache.hadoop.hive.metastore.events.InsertEvent; +import org.apache.hadoop.hive.metastore.events.ListenerEvent; import org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage; import org.apache.hadoop.hive.metastore.messaging.AlterIndexMessage; import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage; @@ -75,6 +94,8 @@ import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer; import org.apache.hadoop.hive.metastore.messaging.MessageFactory; import org.apache.hadoop.hive.ql.Driver; import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hive.hcatalog.data.Pair; +import org.junit.After; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -97,12 +118,105 @@ public class TestDbNotificationListener { private int startTime; private long firstEventId; + /* This class is used to verify that HiveMetaStore calls the non-transactional listeners with the + * current event ID set by the DbNotificationListener class */ + public static class MockMetaStoreEventListener extends MetaStoreEventListener { + private static Stack<Pair<EventType, String>> eventsIds = new Stack<>(); + + private static void pushEventId(EventType eventType, final ListenerEvent event) { + if (event.getStatus()) { + Map<String, String> parameters = event.getParameters(); + if (parameters.containsKey(MetaStoreEventListenerConstants.DB_NOTIFICATION_EVENT_ID_KEY_NAME)) { + Pair<EventType, String> pair = + new Pair<>(eventType, parameters.get(MetaStoreEventListenerConstants.DB_NOTIFICATION_EVENT_ID_KEY_NAME)); + eventsIds.push(pair); + } + } + } + + public static void popAndVerifyLastEventId(EventType eventType, long id) { + if (!eventsIds.isEmpty()) { + Pair<EventType, String> pair = eventsIds.pop(); + + assertEquals("Last event type does not match.", eventType, pair.first); + assertEquals("Last event ID does not match.", Long.toString(id), pair.second); + } else { + assertTrue("List of events is empty.",false); + } + } + + public static void clearEvents() { + eventsIds.clear(); + } + + public MockMetaStoreEventListener(Configuration config) { + super(config); + } + + public void onCreateTable (CreateTableEvent tableEvent) throws MetaException { + pushEventId(EventType.CREATE_TABLE, tableEvent); + } + + public void onDropTable (DropTableEvent tableEvent) throws MetaException { + pushEventId(EventType.DROP_TABLE, tableEvent); + } + + public void onAlterTable (AlterTableEvent tableEvent) throws MetaException { + pushEventId(EventType.ALTER_TABLE, tableEvent); + } + + public void onAddPartition (AddPartitionEvent partitionEvent) throws MetaException { + pushEventId(EventType.ADD_PARTITION, partitionEvent); + } + + public void onDropPartition (DropPartitionEvent partitionEvent) throws MetaException { + pushEventId(EventType.DROP_PARTITION, partitionEvent); + } + + public void onAlterPartition (AlterPartitionEvent partitionEvent) throws MetaException { + pushEventId(EventType.ALTER_PARTITION, partitionEvent); + } + + public void onCreateDatabase (CreateDatabaseEvent dbEvent) throws MetaException { + pushEventId(EventType.CREATE_DATABASE, dbEvent); + } + + public void onDropDatabase (DropDatabaseEvent dbEvent) throws MetaException { + pushEventId(EventType.DROP_DATABASE, dbEvent); + } + + public void onAddIndex(AddIndexEvent indexEvent) throws MetaException { + pushEventId(EventType.CREATE_INDEX, indexEvent); + } + + public void onDropIndex(DropIndexEvent indexEvent) throws MetaException { + pushEventId(EventType.DROP_INDEX, indexEvent); + } + + public void onAlterIndex(AlterIndexEvent indexEvent) throws MetaException { + pushEventId(EventType.ALTER_INDEX, indexEvent); + } + + public void onCreateFunction (CreateFunctionEvent fnEvent) throws MetaException { + pushEventId(EventType.CREATE_FUNCTION, fnEvent); + } + + public void onDropFunction (DropFunctionEvent fnEvent) throws MetaException { + pushEventId(EventType.DROP_FUNCTION, fnEvent); + } + + public void onInsert(InsertEvent insertEvent) throws MetaException { + pushEventId(EventType.INSERT, insertEvent); + } + } + @SuppressWarnings("rawtypes") @BeforeClass public static void connectToMetastore() throws Exception { HiveConf conf = new HiveConf(); conf.setVar(HiveConf.ConfVars.METASTORE_TRANSACTIONAL_EVENT_LISTENERS, DbNotificationListener.class.getName()); + conf.setVar(HiveConf.ConfVars.METASTORE_EVENT_LISTENERS, MockMetaStoreEventListener.class.getName()); conf.setVar(HiveConf.ConfVars.METASTORE_EVENT_DB_LISTENER_TTL, String.valueOf(EVENTS_TTL) + "s"); conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false); conf.setBoolVar(HiveConf.ConfVars.FIRE_EVENTS_FOR_DML, true); @@ -139,6 +253,12 @@ public class TestDbNotificationListener { DummyRawStoreFailEvent.setEventSucceed(true); } + @After + public void tearDown() { + MockMetaStoreEventListener.clearEvents(); + } + + @Test public void createDatabase() throws Exception { String dbName = "createdb"; @@ -164,6 +284,9 @@ public class TestDbNotificationListener { CreateDatabaseMessage createDbMsg = md.getCreateDatabaseMessage(event.getMessage()); assertEquals(dbName, createDbMsg.getDB()); + // Verify the eventID was passed to the non-transactional listener + MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_DATABASE, firstEventId + 1); + // When hive.metastore.transactional.event.listeners is set, // a failed event should not create a new notification DummyRawStoreFailEvent.setEventSucceed(false); @@ -206,6 +329,10 @@ public class TestDbNotificationListener { DropDatabaseMessage dropDbMsg = md.getDropDatabaseMessage(event.getMessage()); assertEquals(dbName, dropDbMsg.getDB()); + // Verify the eventID was passed to the non-transactional listener + MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.DROP_DATABASE, firstEventId + 2); + MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_DATABASE, firstEventId + 1); + // When hive.metastore.transactional.event.listeners is set, // a failed event should not create a new notification db = new Database(dbName2, dbDescription, dbLocationUri, emptyParameters); @@ -256,6 +383,9 @@ public class TestDbNotificationListener { assertEquals(tblName, createTblMsg.getTable()); assertEquals(table, createTblMsg.getTableObj()); + // Verify the eventID was passed to the non-transactional listener + MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, firstEventId + 1); + // When hive.metastore.transactional.event.listeners is set, // a failed event should not create a new notification table = @@ -312,6 +442,9 @@ public class TestDbNotificationListener { AlterTableMessage alterTableMessage = md.getAlterTableMessage(event.getMessage()); assertEquals(table, alterTableMessage.getTableObjAfter()); + // Verify the eventID was passed to the non-transactional listener + MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, firstEventId + 1); + // When hive.metastore.transactional.event.listeners is set, // a failed event should not create a new notification DummyRawStoreFailEvent.setEventSucceed(false); @@ -363,6 +496,10 @@ public class TestDbNotificationListener { assertEquals(defaultDbName, dropTblMsg.getDB()); assertEquals(tblName, dropTblMsg.getTable()); + // Verify the eventID was passed to the non-transactional listener + MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.DROP_TABLE, firstEventId + 2); + MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, firstEventId + 1); + // When hive.metastore.transactional.event.listeners is set, // a failed event should not create a new notification table = @@ -428,6 +565,10 @@ public class TestDbNotificationListener { assertTrue(ptnIter.hasNext()); assertEquals(partition, ptnIter.next()); + // Verify the eventID was passed to the non-transactional listener + MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.ADD_PARTITION, firstEventId + 2); + MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, firstEventId + 1); + // When hive.metastore.transactional.event.listeners is set, // a failed event should not create a new notification partition = @@ -494,6 +635,10 @@ public class TestDbNotificationListener { assertEquals(tblName, alterPtnMsg.getTable()); assertEquals(newPart, alterPtnMsg.getPtnObjAfter()); + // Verify the eventID was passed to the non-transactional listener + MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.ADD_PARTITION, firstEventId + 2); + MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, firstEventId + 1); + // When hive.metastore.transactional.event.listeners is set, // a failed event should not create a new notification DummyRawStoreFailEvent.setEventSucceed(false); @@ -557,6 +702,11 @@ public class TestDbNotificationListener { assertEquals(table.getTableName(), tableObj.getTableName()); assertEquals(table.getOwner(), tableObj.getOwner()); + // Verify the eventID was passed to the non-transactional listener + MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.DROP_PARTITION, firstEventId + 3); + MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.ADD_PARTITION, firstEventId + 2); + MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, firstEventId + 1); + // When hive.metastore.transactional.event.listeners is set, // a failed event should not create a new notification List<String> newpartCol1Vals = Arrays.asList("tomorrow"); @@ -653,6 +803,13 @@ public class TestDbNotificationListener { Iterator<Map<String, String>> parts = dropPtnMsg.getPartitions().iterator(); assertTrue(parts.hasNext()); assertEquals(part1.getValues(), Lists.newArrayList(parts.next().values())); + + // Verify the eventID was passed to the non-transactional listener + MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.DROP_PARTITION, firstEventId + 5); + MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.ADD_PARTITION, firstEventId + 4); + MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.ADD_PARTITION, firstEventId + 3); + MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, firstEventId + 2); + MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, firstEventId + 1); } @Test @@ -693,6 +850,9 @@ public class TestDbNotificationListener { assertEquals(ResourceType.JAR, funcObj.getResourceUris().get(0).getResourceType()); assertEquals(funcResource, funcObj.getResourceUris().get(0).getUri()); + // Verify the eventID was passed to the non-transactional listener + MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_FUNCTION, firstEventId + 1); + // When hive.metastore.transactional.event.listeners is set, // a failed event should not create a new notification DummyRawStoreFailEvent.setEventSucceed(false); @@ -742,6 +902,10 @@ public class TestDbNotificationListener { assertEquals(defaultDbName, dropFuncMsg.getDB()); assertEquals(funcName, dropFuncMsg.getFunctionName()); + // Verify the eventID was passed to the non-transactional listener + MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.DROP_FUNCTION, firstEventId + 2); + MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_FUNCTION, firstEventId + 1); + // When hive.metastore.transactional.event.listeners is set, // a failed event should not create a new notification func = @@ -807,6 +971,11 @@ public class TestDbNotificationListener { assertEquals(tableName, indexObj.getOrigTableName()); assertEquals(indexTableName, indexObj.getIndexTableName()); + // Verify the eventID was passed to the non-transactional listener + MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_INDEX, firstEventId + 3); + MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, firstEventId + 2); + MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, firstEventId + 1); + // When hive.metastore.transactional.event.listeners is set, // a failed event should not create a new notification DummyRawStoreFailEvent.setEventSucceed(false); @@ -873,6 +1042,12 @@ public class TestDbNotificationListener { assertEquals(indexTableName.toLowerCase(), dropIdxMsg.getIndexTableName()); assertEquals(tableName.toLowerCase(), dropIdxMsg.getOrigTableName()); + // Verify the eventID was passed to the non-transactional listener + MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.DROP_INDEX, firstEventId + 4); + MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_INDEX, firstEventId + 3); + MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, firstEventId + 2); + MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, firstEventId + 1); + // When hive.metastore.transactional.event.listeners is set, // a failed event should not create a new notification index = @@ -947,6 +1122,12 @@ public class TestDbNotificationListener { assertEquals(indexTableName, indexObj.getIndexTableName()); assertTrue(indexObj.getCreateTime() < indexObj.getLastAccessTime()); + // Verify the eventID was passed to the non-transactional listener + MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.ALTER_INDEX, firstEventId + 4); + MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_INDEX, firstEventId + 3); + MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, firstEventId + 2); + MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, firstEventId + 1); + // When hive.metastore.transactional.event.listeners is set, // a failed event should not create a new notification DummyRawStoreFailEvent.setEventSucceed(false); @@ -1003,6 +1184,10 @@ public class TestDbNotificationListener { assertEquals(tblName, event.getTableName()); // Parse the message field verifyInsert(event, defaultDbName, tblName); + + // Verify the eventID was passed to the non-transactional listener + MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.INSERT, firstEventId + 2); + MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, firstEventId + 1); } @Test @@ -1063,6 +1248,11 @@ public class TestDbNotificationListener { Map<String,String> partKeyValsFromNotif = insertMessage.getPartitionKeyValues(); assertMapEquals(partKeyVals, partKeyValsFromNotif); + + // Verify the eventID was passed to the non-transactional listener + MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.INSERT, firstEventId + 3); + MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.ADD_PARTITION, firstEventId + 2); + MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, firstEventId + 1); } http://git-wip-us.apache.org/repos/asf/hive/blob/d0df902e/metastore/src/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java index 4ce6a65..d0511ad 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java @@ -23,6 +23,7 @@ import com.google.common.collect.Lists; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent; import org.apache.hadoop.hive.metastore.events.AlterTableEvent; +import org.apache.hadoop.hive.metastore.messaging.EventMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -268,12 +269,11 @@ public class HiveAlterHandler implements AlterHandler { } alterTableUpdateTableColumnStats(msdb, oldt, newt); - if (transactionalListeners != null && transactionalListeners.size() > 0) { - AlterTableEvent alterTableEvent = new AlterTableEvent(oldt, newt, true, handler); - alterTableEvent.setEnvironmentContext(environmentContext); - for (MetaStoreEventListener transactionalListener : transactionalListeners) { - transactionalListener.onAlterTable(alterTableEvent); - } + if (transactionalListeners != null && !transactionalListeners.isEmpty()) { + MetaStoreListenerNotifier.notifyEvent(transactionalListeners, + EventMessage.EventType.ALTER_TABLE, + new AlterTableEvent(oldt, newt, true, handler), + environmentContext); } // commit the changes success = msdb.commitTransaction(); @@ -381,13 +381,13 @@ public class HiveAlterHandler implements AlterHandler { updatePartColumnStats(msdb, dbname, name, new_part.getValues(), new_part); msdb.alterPartition(dbname, name, new_part.getValues(), new_part); - if (transactionalListeners != null && transactionalListeners.size() > 0) { - AlterPartitionEvent alterPartitionEvent = - new AlterPartitionEvent(oldPart, new_part, tbl, true, handler); - alterPartitionEvent.setEnvironmentContext(environmentContext); - for (MetaStoreEventListener transactionalListener : transactionalListeners) { - transactionalListener.onAlterPartition(alterPartitionEvent); - } + if (transactionalListeners != null && !transactionalListeners.isEmpty()) { + MetaStoreListenerNotifier.notifyEvent(transactionalListeners, + EventMessage.EventType.ALTER_PARTITION, + new AlterPartitionEvent(oldPart, new_part, tbl, true, handler), + environmentContext); + + } success = msdb.commitTransaction(); } catch (InvalidObjectException e) { @@ -499,13 +499,11 @@ public class HiveAlterHandler implements AlterHandler { } } - if (transactionalListeners != null && transactionalListeners.size() > 0) { - AlterPartitionEvent alterPartitionEvent = - new AlterPartitionEvent(oldPart, new_part, tbl, true, handler); - alterPartitionEvent.setEnvironmentContext(environmentContext); - for (MetaStoreEventListener transactionalListener : transactionalListeners) { - transactionalListener.onAlterPartition(alterPartitionEvent); - } + if (transactionalListeners != null && !transactionalListeners.isEmpty()) { + MetaStoreListenerNotifier.notifyEvent(transactionalListeners, + EventMessage.EventType.ALTER_PARTITION, + new AlterPartitionEvent(oldPart, new_part, tbl, true, handler), + environmentContext); } success = msdb.commitTransaction(); @@ -534,13 +532,11 @@ public class HiveAlterHandler implements AlterHandler { try { msdb.openTransaction(); msdb.alterPartition(dbname, name, new_part.getValues(), oldPart); - if (transactionalListeners != null && transactionalListeners.size() > 0) { - AlterPartitionEvent alterPartitionEvent = - new AlterPartitionEvent(new_part, oldPart, tbl, true, handler); - alterPartitionEvent.setEnvironmentContext(environmentContext); - for (MetaStoreEventListener transactionalListener : transactionalListeners) { - transactionalListener.onAlterPartition(alterPartitionEvent); - } + if (transactionalListeners != null && !transactionalListeners.isEmpty()) { + MetaStoreListenerNotifier.notifyEvent(transactionalListeners, + EventMessage.EventType.ALTER_PARTITION, + new AlterPartitionEvent(new_part, oldPart, tbl, success, handler), + environmentContext); } revertMetaDataTransaction = msdb.commitTransaction(); @@ -625,12 +621,10 @@ public class HiveAlterHandler implements AlterHandler { "when invoking MetaStoreEventListener for alterPartitions event."); } - if (transactionalListeners != null && transactionalListeners.size() > 0) { - AlterPartitionEvent alterPartitionEvent = - new AlterPartitionEvent(oldPart, newPart, tbl, true, handler); - for (MetaStoreEventListener transactionalListener : transactionalListeners) { - transactionalListener.onAlterPartition(alterPartitionEvent); - } + if (transactionalListeners != null && !transactionalListeners.isEmpty()) { + MetaStoreListenerNotifier.notifyEvent(transactionalListeners, + EventMessage.EventType.ALTER_PARTITION, + new AlterPartitionEvent(oldPart, newPart, tbl, true, handler)); } } http://git-wip-us.apache.org/repos/asf/hive/blob/d0df902e/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index 80b1e98..3aabe22 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -57,8 +57,11 @@ import java.util.regex.Pattern; import javax.jdo.JDOException; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableListMultimap; +import com.google.common.collect.Lists; +import com.google.common.collect.Multimaps; import org.apache.commons.cli.OptionBuilder; -import org.apache.commons.collections.CollectionUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -112,6 +115,7 @@ import org.apache.hadoop.hive.metastore.events.PreLoadPartitionDoneEvent; import org.apache.hadoop.hive.metastore.events.PreReadDatabaseEvent; import org.apache.hadoop.hive.metastore.events.PreReadTableEvent; import org.apache.hadoop.hive.metastore.filemeta.OrcFileMetadataHandler; +import org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType; import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy; import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.metastore.txn.TxnUtils; @@ -151,10 +155,6 @@ import com.facebook.fb303.fb_status; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Splitter; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableListMultimap; -import com.google.common.collect.Lists; -import com.google.common.collect.Multimaps; import com.google.common.util.concurrent.ThreadFactoryBuilder; /** @@ -869,6 +869,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { Path dbPath = new Path(db.getLocationUri()); boolean success = false; boolean madeDir = false; + Map<String, String> transactionalListenersResponses = Collections.emptyMap(); try { firePreEvent(new PreCreateDatabaseEvent(db, this)); if (!wh.isDir(dbPath)) { @@ -881,11 +882,12 @@ public class HiveMetaStore extends ThriftHiveMetastore { ms.openTransaction(); ms.createDatabase(db); - if (transactionalListeners.size() > 0) { - CreateDatabaseEvent cde = new CreateDatabaseEvent(db, true, this); - for (MetaStoreEventListener transactionalListener : transactionalListeners) { - transactionalListener.onCreateDatabase(cde); - } + + if (!transactionalListeners.isEmpty()) { + transactionalListenersResponses = + MetaStoreListenerNotifier.notifyEvent(transactionalListeners, + EventType.CREATE_DATABASE, + new CreateDatabaseEvent(db, true, this)); } success = ms.commitTransaction(); @@ -896,8 +898,13 @@ public class HiveMetaStore extends ThriftHiveMetastore { wh.deleteDir(dbPath, true); } } - for (MetaStoreEventListener listener : listeners) { - listener.onCreateDatabase(new CreateDatabaseEvent(db, success, this)); + + if (!listeners.isEmpty()) { + MetaStoreListenerNotifier.notifyEvent(listeners, + EventType.CREATE_DATABASE, + new CreateDatabaseEvent(db, success, this), + null, + transactionalListenersResponses); } } } @@ -1012,6 +1019,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { Database db = null; List<Path> tablePaths = new ArrayList<Path>(); List<Path> partitionPaths = new ArrayList<Path>(); + Map<String, String> transactionalListenerResponses = Collections.emptyMap(); try { ms.openTransaction(); db = ms.getDatabase(name); @@ -1094,12 +1102,13 @@ public class HiveMetaStore extends ThriftHiveMetastore { } if (ms.dropDatabase(name)) { - if (transactionalListeners.size() > 0) { - DropDatabaseEvent dde = new DropDatabaseEvent(db, true, this); - for (MetaStoreEventListener transactionalListener : transactionalListeners) { - transactionalListener.onDropDatabase(dde); - } + if (!transactionalListeners.isEmpty()) { + transactionalListenerResponses = + MetaStoreListenerNotifier.notifyEvent(transactionalListeners, + EventType.DROP_DATABASE, + new DropDatabaseEvent(db, true, this)); } + success = ms.commitTransaction(); } } finally { @@ -1121,8 +1130,13 @@ public class HiveMetaStore extends ThriftHiveMetastore { } // it is not a terrible thing even if the data is not deleted } - for (MetaStoreEventListener listener : listeners) { - listener.onDropDatabase(new DropDatabaseEvent(db, success, this)); + + if (!listeners.isEmpty()) { + MetaStoreListenerNotifier.notifyEvent(listeners, + EventType.DROP_DATABASE, + new DropDatabaseEvent(db, success, this), + null, + transactionalListenerResponses); } } } @@ -1380,6 +1394,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { } } + Map<String, String> transactionalListenerResponses = Collections.emptyMap(); Path tblPath = null; boolean success = false, madeDir = false; try { @@ -1440,12 +1455,12 @@ public class HiveMetaStore extends ThriftHiveMetastore { ms.createTableWithConstraints(tbl, primaryKeys, foreignKeys); } - if (transactionalListeners.size() > 0) { - CreateTableEvent createTableEvent = new CreateTableEvent(tbl, true, this); - createTableEvent.setEnvironmentContext(envContext); - for (MetaStoreEventListener transactionalListener : transactionalListeners) { - transactionalListener.onCreateTable(createTableEvent); - } + if (!transactionalListeners.isEmpty()) { + transactionalListenerResponses = + MetaStoreListenerNotifier.notifyEvent(transactionalListeners, + EventType.CREATE_TABLE, + new CreateTableEvent(tbl, true, this), + envContext); } success = ms.commitTransaction(); @@ -1456,11 +1471,13 @@ public class HiveMetaStore extends ThriftHiveMetastore { wh.deleteDir(tblPath, true); } } - for (MetaStoreEventListener listener : listeners) { - CreateTableEvent createTableEvent = - new CreateTableEvent(tbl, success, this); - createTableEvent.setEnvironmentContext(envContext); - listener.onCreateTable(createTableEvent); + + if (!listeners.isEmpty()) { + MetaStoreListenerNotifier.notifyEvent(listeners, + EventType.CREATE_TABLE, + new CreateTableEvent(tbl, success, this), + envContext, + transactionalListenerResponses); } } } @@ -1625,6 +1642,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { List<Path> partPaths = null; Table tbl = null; boolean ifPurge = false; + Map<String, String> transactionalListenerResponses = Collections.emptyMap(); try { ms.openTransaction(); // drop any partitions @@ -1678,12 +1696,12 @@ public class HiveMetaStore extends ThriftHiveMetastore { throw new MetaException(indexName == null ? "Unable to drop table " + tableName: "Unable to drop index table " + tableName + " for index " + indexName); } else { - if (transactionalListeners.size() > 0) { - DropTableEvent dropTableEvent = new DropTableEvent(tbl, true, deleteData, this); - dropTableEvent.setEnvironmentContext(envContext); - for (MetaStoreEventListener transactionalListener : transactionalListeners) { - transactionalListener.onDropTable(dropTableEvent); - } + if (!transactionalListeners.isEmpty()) { + transactionalListenerResponses = + MetaStoreListenerNotifier.notifyEvent(transactionalListeners, + EventType.DROP_TABLE, + new DropTableEvent(tbl, deleteData, true, this), + envContext); } success = ms.commitTransaction(); } @@ -1698,10 +1716,13 @@ public class HiveMetaStore extends ThriftHiveMetastore { deleteTableData(tblPath, ifPurge); // ok even if the data is not deleted } - for (MetaStoreEventListener listener : listeners) { - DropTableEvent dropTableEvent = new DropTableEvent(tbl, success, deleteData, this); - dropTableEvent.setEnvironmentContext(envContext); - listener.onDropTable(dropTableEvent); + + if (!listeners.isEmpty()) { + MetaStoreListenerNotifier.notifyEvent(listeners, + EventType.DROP_TABLE, + new DropTableEvent(tbl, deleteData, success, this), + envContext, + transactionalListenerResponses); } } return success; @@ -2165,6 +2186,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { boolean success = false, madeDir = false; Path partLocation = null; Table tbl = null; + Map<String, String> transactionalListenerResponses = Collections.emptyMap(); try { ms.openTransaction(); part.setDbName(dbName); @@ -2221,12 +2243,12 @@ public class HiveMetaStore extends ThriftHiveMetastore { } if (ms.addPartition(part)) { - if (transactionalListeners.size() > 0) { - AddPartitionEvent addPartitionEvent = new AddPartitionEvent(tbl, part, true, this); - addPartitionEvent.setEnvironmentContext(envContext); - for (MetaStoreEventListener transactionalListener : transactionalListeners) { - transactionalListener.onAddPartition(addPartitionEvent); - } + if (!transactionalListeners.isEmpty()) { + transactionalListenerResponses = + MetaStoreListenerNotifier.notifyEvent(transactionalListeners, + EventType.ADD_PARTITION, + new AddPartitionEvent(tbl, part, true, this), + envContext); } success = ms.commitTransaction(); @@ -2239,11 +2261,12 @@ public class HiveMetaStore extends ThriftHiveMetastore { } } - for (MetaStoreEventListener listener : listeners) { - AddPartitionEvent addPartitionEvent = - new AddPartitionEvent(tbl, part, success, this); - addPartitionEvent.setEnvironmentContext(envContext); - listener.onAddPartition(addPartitionEvent); + if (!listeners.isEmpty()) { + MetaStoreListenerNotifier.notifyEvent(listeners, + EventType.ADD_PARTITION, + new AddPartitionEvent(tbl, part, success, this), + envContext, + transactionalListenerResponses); } } return part; @@ -2388,8 +2411,10 @@ public class HiveMetaStore extends ThriftHiveMetastore { final Map<PartValEqWrapper, Boolean> addedPartitions = Collections.synchronizedMap(new HashMap<PartValEqWrapper, Boolean>()); final List<Partition> newParts = new ArrayList<Partition>(); - final List<Partition> existingParts = new ArrayList<Partition>();; + final List<Partition> existingParts = new ArrayList<Partition>(); Table tbl = null; + Map<String, String> transactionalListenerResponses = Collections.emptyMap(); + try { ms.openTransaction(); tbl = ms.getTable(dbName, tblName); @@ -2475,7 +2500,13 @@ public class HiveMetaStore extends ThriftHiveMetastore { success = false; // Notification is generated for newly created partitions only. The subset of partitions // that already exist (existingParts), will not generate notifications. - fireMetaStoreAddPartitionEventTransactional(tbl, newParts, null, true); + if (!transactionalListeners.isEmpty()) { + transactionalListenerResponses = + MetaStoreListenerNotifier.notifyEvent(transactionalListeners, + EventType.ADD_PARTITION, + new AddPartitionEvent(tbl, newParts, true, this)); + } + success = ms.commitTransaction(); } finally { if (!success) { @@ -2486,12 +2517,26 @@ public class HiveMetaStore extends ThriftHiveMetastore { wh.deleteDir(new Path(e.getKey().partition.getSd().getLocation()), true); } } - fireMetaStoreAddPartitionEvent(tbl, parts, null, false); + + if (!listeners.isEmpty()) { + MetaStoreListenerNotifier.notifyEvent(listeners, + EventType.ADD_PARTITION, + new AddPartitionEvent(tbl, parts, false, this)); + } } else { - fireMetaStoreAddPartitionEvent(tbl, newParts, null, true); - if (existingParts != null) { - // The request has succeeded but we failed to add these partitions. - fireMetaStoreAddPartitionEvent(tbl, existingParts, null, false); + if (!listeners.isEmpty()) { + MetaStoreListenerNotifier.notifyEvent(listeners, + EventType.ADD_PARTITION, + new AddPartitionEvent(tbl, newParts, true, this), + null, + transactionalListenerResponses); + + if (!existingParts.isEmpty()) { + // The request has succeeded but we failed to add these partitions. + MetaStoreListenerNotifier.notifyEvent(listeners, + EventType.ADD_PARTITION, + new AddPartitionEvent(tbl, existingParts, false, this)); + } } } } @@ -2578,6 +2623,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { final PartitionSpecProxy.PartitionIterator partitionIterator = partitionSpecProxy .getPartitionIterator(); Table tbl = null; + Map<String, String> transactionalListenerResponses = Collections.emptyMap(); try { ms.openTransaction(); tbl = ms.getTable(dbName, tblName); @@ -2651,7 +2697,14 @@ public class HiveMetaStore extends ThriftHiveMetastore { success = ms.addPartitions(dbName, tblName, partitionSpecProxy, ifNotExists); //setting success to false to make sure that if the listener fails, rollback happens. success = false; - fireMetaStoreAddPartitionEventTransactional(tbl, partitionSpecProxy, null, true); + + if (!transactionalListeners.isEmpty()) { + transactionalListenerResponses = + MetaStoreListenerNotifier.notifyEvent(transactionalListeners, + EventType.ADD_PARTITION, + new AddPartitionEvent(tbl, partitionSpecProxy, true, this)); + } + success = ms.commitTransaction(); return addedPartitions.size(); } finally { @@ -2664,7 +2717,14 @@ public class HiveMetaStore extends ThriftHiveMetastore { } } } - fireMetaStoreAddPartitionEvent(tbl, partitionSpecProxy, null, true); + + if (!listeners.isEmpty()) { + MetaStoreListenerNotifier.notifyEvent(listeners, + EventType.ADD_PARTITION, + new AddPartitionEvent(tbl, partitionSpecProxy, true, this), + null, + transactionalListenerResponses); + } } } @@ -2769,6 +2829,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { throws InvalidObjectException, AlreadyExistsException, MetaException, TException { boolean success = false; Table tbl = null; + Map<String, String> transactionalListenerResponses = Collections.emptyMap(); try { ms.openTransaction(); tbl = ms.getTable(part.getDbName(), part.getTableName()); @@ -2793,7 +2854,16 @@ public class HiveMetaStore extends ThriftHiveMetastore { // Setting success to false to make sure that if the listener fails, rollback happens. success = false; - fireMetaStoreAddPartitionEventTransactional(tbl, Arrays.asList(part), envContext, true); + + if (!transactionalListeners.isEmpty()) { + transactionalListenerResponses = + MetaStoreListenerNotifier.notifyEvent(transactionalListeners, + EventType.ADD_PARTITION, + new AddPartitionEvent(tbl, Arrays.asList(part), true, this), + envContext); + + } + // we proceed only if we'd actually succeeded anyway, otherwise, // we'd have thrown an exception success = ms.commitTransaction(); @@ -2801,64 +2871,19 @@ public class HiveMetaStore extends ThriftHiveMetastore { if (!success) { ms.rollbackTransaction(); } - fireMetaStoreAddPartitionEvent(tbl, Arrays.asList(part), envContext, success); - } - return part; - } - - private void fireMetaStoreAddPartitionEvent(final Table tbl, - final List<Partition> parts, final EnvironmentContext envContext, boolean success) - throws MetaException { - if (tbl != null && parts != null && !parts.isEmpty()) { - AddPartitionEvent addPartitionEvent = - new AddPartitionEvent(tbl, parts, success, this); - addPartitionEvent.setEnvironmentContext(envContext); - for (MetaStoreEventListener listener : listeners) { - listener.onAddPartition(addPartitionEvent); - } - } - } - private void fireMetaStoreAddPartitionEvent(final Table tbl, - final PartitionSpecProxy partitionSpec, final EnvironmentContext envContext, boolean success) - throws MetaException { - if (tbl != null && partitionSpec != null) { - AddPartitionEvent addPartitionEvent = - new AddPartitionEvent(tbl, partitionSpec, success, this); - addPartitionEvent.setEnvironmentContext(envContext); - for (MetaStoreEventListener listener : listeners) { - listener.onAddPartition(addPartitionEvent); - } - } - } - - private void fireMetaStoreAddPartitionEventTransactional(final Table tbl, - final List<Partition> parts, final EnvironmentContext envContext, boolean success) - throws MetaException { - if (tbl != null && parts != null && !parts.isEmpty()) { - AddPartitionEvent addPartitionEvent = - new AddPartitionEvent(tbl, parts, success, this); - addPartitionEvent.setEnvironmentContext(envContext); - for (MetaStoreEventListener transactionalListener : transactionalListeners) { - transactionalListener.onAddPartition(addPartitionEvent); - } - } - } + if (!listeners.isEmpty()) { + MetaStoreListenerNotifier.notifyEvent(listeners, + EventType.ADD_PARTITION, + new AddPartitionEvent(tbl, Arrays.asList(part), success, this), + envContext, + transactionalListenerResponses); - private void fireMetaStoreAddPartitionEventTransactional(final Table tbl, - final PartitionSpecProxy partitionSpec, final EnvironmentContext envContext, boolean success) - throws MetaException { - if (tbl != null && partitionSpec != null) { - AddPartitionEvent addPartitionEvent = - new AddPartitionEvent(tbl, partitionSpec, success, this); - addPartitionEvent.setEnvironmentContext(envContext); - for (MetaStoreEventListener transactionalListener : transactionalListeners) { - transactionalListener.onAddPartition(addPartitionEvent); } } + return part; } - @Override public Partition add_partition(final Partition part) throws InvalidObjectException, AlreadyExistsException, MetaException { @@ -2941,6 +2966,11 @@ public class HiveMetaStore extends ThriftHiveMetastore { Path destPath = new Path(destinationTable.getSd().getLocation(), Warehouse.makePartName(partitionKeysPresent, partValsPresent)); List<Partition> destPartitions = new ArrayList<Partition>(); + + Map<String, String> transactionalListenerResponsesForAddPartition = Collections.emptyMap(); + List<Map<String, String>> transactionalListenerResponsesForDropPartition = + Lists.newArrayListWithCapacity(partitionsToExchange.size()); + try { for (Partition partition: partitionsToExchange) { Partition destPartition = new Partition(partition); @@ -2968,8 +2998,22 @@ public class HiveMetaStore extends ThriftHiveMetastore { // Setting success to false to make sure that if the listener fails, rollback happens. success = false; - fireMetaStoreExchangePartitionEvent(sourceTable, partitionsToExchange, - destinationTable, destPartitions, transactionalListeners, true); + + if (!transactionalListeners.isEmpty()) { + transactionalListenerResponsesForAddPartition = + MetaStoreListenerNotifier.notifyEvent(transactionalListeners, + EventType.ADD_PARTITION, + new AddPartitionEvent(destinationTable, destPartitions, true, this)); + + for (Partition partition : partitionsToExchange) { + DropPartitionEvent dropPartitionEvent = + new DropPartitionEvent(sourceTable, partition, true, true, this); + transactionalListenerResponsesForDropPartition.add( + MetaStoreListenerNotifier.notifyEvent(transactionalListeners, + EventType.DROP_PARTITION, + dropPartitionEvent)); + } + } success = ms.commitTransaction(); return destPartitions; @@ -2979,34 +3023,31 @@ public class HiveMetaStore extends ThriftHiveMetastore { if (pathCreated) { wh.renameDir(destPath, sourcePath); } - - fireMetaStoreExchangePartitionEvent(sourceTable, partitionsToExchange, - destinationTable, destPartitions, listeners, success); } - } - } - private void fireMetaStoreExchangePartitionEvent(Table sourceTable, - List<Partition> partitionsToExchange, Table destinationTable, - List<Partition> destPartitions, - List<MetaStoreEventListener> eventListeners, - boolean status) throws MetaException { - if (sourceTable != null && destinationTable != null - && !CollectionUtils.isEmpty(partitionsToExchange) - && !CollectionUtils.isEmpty(destPartitions)) { - if (eventListeners.size() > 0) { - AddPartitionEvent addPartitionEvent = - new AddPartitionEvent(destinationTable, destPartitions, status, this); - for (MetaStoreEventListener eventListener : eventListeners) { - eventListener.onAddPartition(addPartitionEvent); - } + if (!listeners.isEmpty()) { + AddPartitionEvent addPartitionEvent = new AddPartitionEvent(destinationTable, destPartitions, success, this); + MetaStoreListenerNotifier.notifyEvent(listeners, + EventType.ADD_PARTITION, + addPartitionEvent, + null, + transactionalListenerResponsesForAddPartition); + i = 0; for (Partition partition : partitionsToExchange) { DropPartitionEvent dropPartitionEvent = - new DropPartitionEvent(sourceTable, partition, true, status, this); - for (MetaStoreEventListener eventListener : eventListeners) { - eventListener.onDropPartition(dropPartitionEvent); - } + new DropPartitionEvent(sourceTable, partition, success, true, this); + Map<String, String> parameters = + (transactionalListenerResponsesForDropPartition.size() > i) + ? transactionalListenerResponsesForDropPartition.get(i) + : null; + + MetaStoreListenerNotifier.notifyEvent(listeners, + EventType.DROP_PARTITION, + dropPartitionEvent, + null, + parameters); + i++; } } } @@ -3024,6 +3065,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { Path archiveParentDir = null; boolean mustPurge = false; boolean isExternalTbl = false; + Map<String, String> transactionalListenerResponses = Collections.emptyMap(); try { ms.openTransaction(); @@ -3056,13 +3098,13 @@ public class HiveMetaStore extends ThriftHiveMetastore { if (!ms.dropPartition(db_name, tbl_name, part_vals)) { throw new MetaException("Unable to drop partition"); } else { - if (transactionalListeners.size() > 0) { - DropPartitionEvent dropPartitionEvent = - new DropPartitionEvent(tbl, part, true, deleteData, this); - dropPartitionEvent.setEnvironmentContext(envContext); - for (MetaStoreEventListener transactionalListener : transactionalListeners) { - transactionalListener.onDropPartition(dropPartitionEvent); - } + if (!transactionalListeners.isEmpty()) { + + transactionalListenerResponses = + MetaStoreListenerNotifier.notifyEvent(transactionalListeners, + EventType.DROP_PARTITION, + new DropPartitionEvent(tbl, part, true, deleteData, this), + envContext); } success = ms.commitTransaction(); } @@ -3090,11 +3132,12 @@ public class HiveMetaStore extends ThriftHiveMetastore { // ok even if the data is not deleted } } - for (MetaStoreEventListener listener : listeners) { - DropPartitionEvent dropPartitionEvent = - new DropPartitionEvent(tbl, part, success, deleteData, this); - dropPartitionEvent.setEnvironmentContext(envContext); - listener.onDropPartition(dropPartitionEvent); + if (!listeners.isEmpty()) { + MetaStoreListenerNotifier.notifyEvent(listeners, + EventType.DROP_PARTITION, + new DropPartitionEvent(tbl, part, success, deleteData, this), + envContext, + transactionalListenerResponses); } } return true; @@ -3156,6 +3199,8 @@ public class HiveMetaStore extends ThriftHiveMetastore { List<Partition> parts = null; boolean mustPurge = false; boolean isExternalTbl = false; + List<Map<String, String>> transactionalListenerResponses = Lists.newArrayList(); + try { // We need Partition-s for firing events and for result; DN needs MPartition-s to drop. // Great... Maybe we could bypass fetching MPartitions by issuing direct SQL deletes. @@ -3239,14 +3284,13 @@ public class HiveMetaStore extends ThriftHiveMetastore { } ms.dropPartitions(dbName, tblName, partNames); - if (parts != null && transactionalListeners.size() > 0) { + if (parts != null && !transactionalListeners.isEmpty()) { for (Partition part : parts) { - DropPartitionEvent dropPartitionEvent = - new DropPartitionEvent(tbl, part, true, deleteData, this); - dropPartitionEvent.setEnvironmentContext(envContext); - for (MetaStoreEventListener transactionalListener : transactionalListeners) { - transactionalListener.onDropPartition(dropPartitionEvent); - } + transactionalListenerResponses.add( + MetaStoreListenerNotifier.notifyEvent(transactionalListeners, + EventType.DROP_PARTITION, + new DropPartitionEvent(tbl, part, true, deleteData, this), + envContext)); } } @@ -3280,12 +3324,19 @@ public class HiveMetaStore extends ThriftHiveMetastore { } } if (parts != null) { - for (Partition part : parts) { - for (MetaStoreEventListener listener : listeners) { - DropPartitionEvent dropPartitionEvent = - new DropPartitionEvent(tbl, part, success, deleteData, this); - dropPartitionEvent.setEnvironmentContext(envContext); - listener.onDropPartition(dropPartitionEvent); + int i = 0; + if (parts != null && !listeners.isEmpty()) { + for (Partition part : parts) { + Map<String, String> parameters = + (!transactionalListenerResponses.isEmpty()) ? transactionalListenerResponses.get(i) : null; + + MetaStoreListenerNotifier.notifyEvent(listeners, + EventType.DROP_PARTITION, + new DropPartitionEvent(tbl, part, success, deleteData, this), + envContext, + parameters); + + i++; } } } @@ -3720,14 +3771,15 @@ public class HiveMetaStore extends ThriftHiveMetastore { // Only fetch the table if we actually have a listener Table table = null; - for (MetaStoreEventListener listener : listeners) { + if (!listeners.isEmpty()) { if (table == null) { table = getMS().getTable(db_name, tbl_name); } - AlterPartitionEvent alterPartitionEvent = - new AlterPartitionEvent(oldPart, new_part, table, true, this); - alterPartitionEvent.setEnvironmentContext(envContext); - listener.onAlterPartition(alterPartitionEvent); + + MetaStoreListenerNotifier.notifyEvent(listeners, + EventType.ALTER_PARTITION, + new AlterPartitionEvent(oldPart, new_part, table, true, this), + envContext); } } catch (InvalidObjectException e) { ex = e; @@ -3791,13 +3843,15 @@ public class HiveMetaStore extends ThriftHiveMetastore { else { throw new InvalidOperationException("failed to alterpartitions"); } - for (MetaStoreEventListener listener : listeners) { - if (table == null) { - table = getMS().getTable(db_name, tbl_name); - } - AlterPartitionEvent alterPartitionEvent = - new AlterPartitionEvent(oldTmpPart, tmpPart, table, true, this); - listener.onAlterPartition(alterPartitionEvent); + + if (table == null) { + table = getMS().getTable(db_name, tbl_name); + } + + if (!listeners.isEmpty()) { + MetaStoreListenerNotifier.notifyEvent(listeners, + EventType.ALTER_PARTITION, + new AlterPartitionEvent(oldTmpPart, tmpPart, table, true, this)); } } } catch (InvalidObjectException e) { @@ -3834,16 +3888,17 @@ public class HiveMetaStore extends ThriftHiveMetastore { Exception ex = null; Index oldIndex = null; RawStore ms = getMS(); + Map<String, String> transactionalListenerResponses = Collections.emptyMap(); try { ms.openTransaction(); oldIndex = get_index_by_name(dbname, base_table_name, index_name); firePreEvent(new PreAlterIndexEvent(oldIndex, newIndex, this)); ms.alterIndex(dbname, base_table_name, index_name, newIndex); - if (transactionalListeners.size() > 0) { - AlterIndexEvent alterIndexEvent = new AlterIndexEvent(oldIndex, newIndex, true, this); - for (MetaStoreEventListener transactionalListener : transactionalListeners) { - transactionalListener.onAlterIndex(alterIndexEvent); - } + if (!transactionalListeners.isEmpty()) { + transactionalListenerResponses = + MetaStoreListenerNotifier.notifyEvent(transactionalListeners, + EventType.ALTER_INDEX, + new AlterIndexEvent(oldIndex, newIndex, true, this)); } success = ms.commitTransaction(); @@ -3865,9 +3920,13 @@ public class HiveMetaStore extends ThriftHiveMetastore { } endFunction("alter_index", success, ex, base_table_name); - for (MetaStoreEventListener listener : listeners) { - AlterIndexEvent alterIndexEvent = new AlterIndexEvent(oldIndex, newIndex, success, this); - listener.onAlterIndex(alterIndexEvent); + + if (!listeners.isEmpty()) { + MetaStoreListenerNotifier.notifyEvent(listeners, + EventType.ALTER_INDEX, + new AlterIndexEvent(oldIndex, newIndex, success, this), + null, + transactionalListenerResponses); } } } @@ -3935,11 +3994,11 @@ public class HiveMetaStore extends ThriftHiveMetastore { alterHandler.alterTable(getMS(), wh, dbname, name, newTable, envContext, this); success = true; - for (MetaStoreEventListener listener : listeners) { - AlterTableEvent alterTableEvent = - new AlterTableEvent(oldt, newTable, success, this); - alterTableEvent.setEnvironmentContext(envContext); - listener.onAlterTable(alterTableEvent); + if (!listeners.isEmpty()) { + MetaStoreListenerNotifier.notifyEvent(listeners, + EventType.ALTER_TABLE, + new AlterTableEvent(oldt, newTable, true, this), + envContext); } } catch (NoSuchObjectException e) { // thrown when the table to be altered does not exist @@ -4506,6 +4565,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { boolean success = false, indexTableCreated = false; String[] qualified = MetaStoreUtils.getQualifiedName(index.getDbName(), index.getIndexTableName()); + Map<String, String> transactionalListenerResponses = Collections.emptyMap(); try { ms.openTransaction(); firePreEvent(new PreAddIndexEvent(index, this)); @@ -4543,11 +4603,11 @@ public class HiveMetaStore extends ThriftHiveMetastore { index.setCreateTime((int) time); index.putToParameters(hive_metastoreConstants.DDL_TIME, Long.toString(time)); if (ms.addIndex(index)) { - if (transactionalListeners.size() > 0) { - AddIndexEvent addIndexEvent = new AddIndexEvent(index, true, this); - for (MetaStoreEventListener transactionalListener : transactionalListeners) { - transactionalListener.onAddIndex(addIndexEvent); - } + if (!transactionalListeners.isEmpty()) { + transactionalListenerResponses = + MetaStoreListenerNotifier.notifyEvent(transactionalListeners, + EventType.CREATE_INDEX, + new AddIndexEvent(index, true, this)); } } @@ -4564,9 +4624,12 @@ public class HiveMetaStore extends ThriftHiveMetastore { ms.rollbackTransaction(); } - for (MetaStoreEventListener listener : listeners) { - AddIndexEvent addIndexEvent = new AddIndexEvent(index, success, this); - listener.onAddIndex(addIndexEvent); + if (!listeners.isEmpty()) { + MetaStoreListenerNotifier.notifyEvent(listeners, + EventType.CREATE_INDEX, + new AddIndexEvent(index, success, this), + null, + transactionalListenerResponses); } } } @@ -4604,6 +4667,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { Index index = null; Path tblPath = null; List<Path> partPaths = null; + Map<String, String> transactionalListenerResponses = Collections.emptyMap(); try { ms.openTransaction(); // drop the underlying index table @@ -4636,11 +4700,11 @@ public class HiveMetaStore extends ThriftHiveMetastore { } } - if (transactionalListeners.size() > 0) { - DropIndexEvent dropIndexEvent = new DropIndexEvent(index, true, this); - for (MetaStoreEventListener transactionalListener : transactionalListeners) { - transactionalListener.onDropIndex(dropIndexEvent); - } + if (!transactionalListeners.isEmpty()) { + transactionalListenerResponses = + MetaStoreListenerNotifier.notifyEvent(transactionalListeners, + EventType.DROP_INDEX, + new DropIndexEvent(index, true, this)); } success = ms.commitTransaction(); @@ -4653,11 +4717,12 @@ public class HiveMetaStore extends ThriftHiveMetastore { // ok even if the data is not deleted } // Skip the event listeners if the index is NULL - if (index != null) { - for (MetaStoreEventListener listener : listeners) { - DropIndexEvent dropIndexEvent = new DropIndexEvent(index, success, this); - listener.onDropIndex(dropIndexEvent); - } + if (index != null && !listeners.isEmpty()) { + MetaStoreListenerNotifier.notifyEvent(listeners, + EventType.DROP_INDEX, + new DropIndexEvent(index, success, this), + null, + transactionalListenerResponses); } } return success; @@ -6093,6 +6158,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { validateFunctionInfo(func); boolean success = false; RawStore ms = getMS(); + Map<String, String> transactionalListenerResponses = Collections.emptyMap(); try { ms.openTransaction(); Database db = ms.getDatabase(func.getDbName()); @@ -6109,11 +6175,11 @@ public class HiveMetaStore extends ThriftHiveMetastore { long time = System.currentTimeMillis() / 1000; func.setCreateTime((int) time); ms.createFunction(func); - if (transactionalListeners.size() > 0) { - CreateFunctionEvent createFunctionEvent = new CreateFunctionEvent(func, true, this); - for (MetaStoreEventListener transactionalListener : transactionalListeners) { - transactionalListener.onCreateFunction(createFunctionEvent); - } + if (!transactionalListeners.isEmpty()) { + transactionalListenerResponses = + MetaStoreListenerNotifier.notifyEvent(transactionalListeners, + EventType.CREATE_FUNCTION, + new CreateFunctionEvent(func, true, this)); } success = ms.commitTransaction(); @@ -6122,11 +6188,12 @@ public class HiveMetaStore extends ThriftHiveMetastore { ms.rollbackTransaction(); } - if (listeners.size() > 0) { - CreateFunctionEvent createFunctionEvent = new CreateFunctionEvent(func, success, this); - for (MetaStoreEventListener listener : listeners) { - listener.onCreateFunction(createFunctionEvent); - } + if (!listeners.isEmpty()) { + MetaStoreListenerNotifier.notifyEvent(listeners, + EventType.CREATE_FUNCTION, + new CreateFunctionEvent(func, success, this), + null, + transactionalListenerResponses); } } } @@ -6138,6 +6205,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { boolean success = false; Function func = null; RawStore ms = getMS(); + Map<String, String> transactionalListenerResponses = Collections.emptyMap(); try { ms.openTransaction(); func = ms.getFunction(dbName, funcName); @@ -6147,10 +6215,10 @@ public class HiveMetaStore extends ThriftHiveMetastore { ms.dropFunction(dbName, funcName); if (transactionalListeners.size() > 0) { - DropFunctionEvent dropFunctionEvent = new DropFunctionEvent(func, true, this); - for (MetaStoreEventListener transactionalListener : transactionalListeners) { - transactionalListener.onDropFunction(dropFunctionEvent); - } + transactionalListenerResponses = + MetaStoreListenerNotifier.notifyEvent(transactionalListeners, + EventType.DROP_FUNCTION, + new DropFunctionEvent(func, true, this)); } success = ms.commitTransaction(); @@ -6160,10 +6228,11 @@ public class HiveMetaStore extends ThriftHiveMetastore { } if (listeners.size() > 0) { - DropFunctionEvent dropFunctionEvent = new DropFunctionEvent(func, success, this); - for (MetaStoreEventListener listener : listeners) { - listener.onDropFunction(dropFunctionEvent); - } + MetaStoreListenerNotifier.notifyEvent(listeners, + EventType.DROP_FUNCTION, + new DropFunctionEvent(func, success, this), + null, + transactionalListenerResponses); } } } @@ -6530,13 +6599,13 @@ public class HiveMetaStore extends ThriftHiveMetastore { InsertEvent event = new InsertEvent(rqst.getDbName(), rqst.getTableName(), rqst.getPartitionVals(), rqst .getData().getInsertData(), rqst.isSuccessful(), this); - for (MetaStoreEventListener transactionalListener : transactionalListeners) { - transactionalListener.onInsert(event); - } - for (MetaStoreEventListener listener : listeners) { - listener.onInsert(event); - } + /* + * The transactional listener response will be set already on the event, so there is not need + * to pass the response to the non-transactional listener. + */ + MetaStoreListenerNotifier.notifyEvent(transactionalListeners, EventType.INSERT, event); + MetaStoreListenerNotifier.notifyEvent(listeners, EventType.INSERT, event); return new FireEventResponse();