Repository: hive Updated Branches: refs/heads/branch-2 e28330535 -> d0df902e4
http://git-wip-us.apache.org/repos/asf/hive/blob/d0df902e/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreListenerNotifier.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreListenerNotifier.java b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreListenerNotifier.java new file mode 100644 index 0000000..20011cc --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreListenerNotifier.java @@ -0,0 +1,224 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.metastore; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; +import org.apache.hadoop.hive.common.classification.InterfaceAudience.Private; +import org.apache.hadoop.hive.metastore.api.EnvironmentContext; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.events.AddIndexEvent; +import org.apache.hadoop.hive.metastore.events.AddPartitionEvent; +import org.apache.hadoop.hive.metastore.events.AlterIndexEvent; +import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent; +import org.apache.hadoop.hive.metastore.events.AlterTableEvent; +import org.apache.hadoop.hive.metastore.events.CreateDatabaseEvent; +import org.apache.hadoop.hive.metastore.events.CreateFunctionEvent; +import org.apache.hadoop.hive.metastore.events.CreateTableEvent; +import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent; +import org.apache.hadoop.hive.metastore.events.DropFunctionEvent; +import org.apache.hadoop.hive.metastore.events.DropIndexEvent; +import org.apache.hadoop.hive.metastore.events.DropPartitionEvent; +import org.apache.hadoop.hive.metastore.events.DropTableEvent; +import org.apache.hadoop.hive.metastore.events.InsertEvent; +import org.apache.hadoop.hive.metastore.events.ListenerEvent; + +import java.util.List; +import java.util.Map; + +import static org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType; + +/** + * This class is used to notify a list of listeners about specific MetaStore events. 
+ */ +@Private +public class MetaStoreListenerNotifier { + private interface EventNotifier { + void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException; + } + + private static Map<EventType, EventNotifier> notificationEvents = Maps.newHashMap( + ImmutableMap.<EventType, EventNotifier>builder() + .put(EventType.CREATE_DATABASE, new EventNotifier() { + @Override + public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException { + listener.onCreateDatabase((CreateDatabaseEvent)event); + } + }) + .put(EventType.DROP_DATABASE, new EventNotifier() { + @Override + public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException { + listener.onDropDatabase((DropDatabaseEvent)event); + } + }) + .put(EventType.CREATE_TABLE, new EventNotifier() { + @Override + public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException { + listener.onCreateTable((CreateTableEvent)event); + } + }) + .put(EventType.DROP_TABLE, new EventNotifier() { + @Override + public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException { + listener.onDropTable((DropTableEvent)event); + } + }) + .put(EventType.ADD_PARTITION, new EventNotifier() { + @Override + public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException { + listener.onAddPartition((AddPartitionEvent)event); + } + }) + .put(EventType.DROP_PARTITION, new EventNotifier() { + @Override + public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException { + listener.onDropPartition((DropPartitionEvent)event); + } + }) + .put(EventType.ALTER_TABLE, new EventNotifier() { + @Override + public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException { + listener.onAlterTable((AlterTableEvent)event); + } + }) + .put(EventType.ALTER_PARTITION, new EventNotifier() { + @Override + public void 
notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException { + listener.onAlterPartition((AlterPartitionEvent)event); + } + }) + .put(EventType.INSERT, new EventNotifier() { + @Override + public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException { + listener.onInsert((InsertEvent)event); + } + }) + .put(EventType.CREATE_FUNCTION, new EventNotifier() { + @Override + public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException { + listener.onCreateFunction((CreateFunctionEvent)event); + } + }) + .put(EventType.DROP_FUNCTION, new EventNotifier() { + @Override + public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException { + listener.onDropFunction((DropFunctionEvent)event); + } + }) + .put(EventType.CREATE_INDEX, new EventNotifier() { + @Override + public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException { + listener.onAddIndex((AddIndexEvent)event); + } + }) + .put(EventType.DROP_INDEX, new EventNotifier() { + @Override + public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException { + listener.onDropIndex((DropIndexEvent)event); + } + }) + .put(EventType.ALTER_INDEX, new EventNotifier() { + @Override + public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException { + listener.onAlterIndex((AlterIndexEvent)event); + } + }) + .build() + ); + + /** + * Notify a list of listeners about a specific metastore event. Each listener notified might update + * the (ListenerEvent) event by setting a parameter key/value pair. These updated parameters will + * be returned to the caller. + * + * @param listeners List of MetaStoreEventListener listeners. + * @param eventType Type of the notification event. + * @param event The ListenerEvent with information about the event. + * @return A list of key/value pair parameters that the listeners set. 
The returned object will return an empty + * map if no parameters were updated or if no listeners were notified. + * @throws MetaException If an error occurred while calling the listeners. + */ + public static Map<String, String> notifyEvent(List<MetaStoreEventListener> listeners, + EventType eventType, + ListenerEvent event) throws MetaException { + + Preconditions.checkNotNull(listeners, "Listeners must not be null."); + Preconditions.checkNotNull(event, "The event must not be null."); + + for (MetaStoreEventListener listener : listeners) { + notificationEvents.get(eventType).notify(listener, event); + } + + // Each listener called above might set a different parameter on the event. + // This write permission is allowed on the listener side to avoid breaking compatibility if we change the API + // method calls. + return event.getParameters(); + } + + /** + * Notify a list of listeners about a specific metastore event. Each listener notified might update + * the (ListenerEvent) event by setting a parameter key/value pair. These updated parameters will + * be returned to the caller. + * + * @param listeners List of MetaStoreEventListener listeners. + * @param eventType Type of the notification event. + * @param event The ListenerEvent with information about the event. + * @param environmentContext An EnvironmentContext object with parameters sent by the HMS client. + * @return A list of key/value pair parameters that the listeners set. The returned object will return an empty + * map if no parameters were updated or if no listeners were notified. + * @throws MetaException If an error occurred while calling the listeners. 
+ */ + public static Map<String, String> notifyEvent(List<MetaStoreEventListener> listeners, + EventType eventType, + ListenerEvent event, + EnvironmentContext environmentContext) throws MetaException { + + Preconditions.checkNotNull(event, "The event must not be null."); + + event.setEnvironmentContext(environmentContext); + return notifyEvent(listeners, eventType, event); + } + + /** + * Notify a list of listeners about a specific metastore event. Each listener notified might update + * the (ListenerEvent) event by setting a parameter key/value pair. These updated parameters will + * be returned to the caller. + * + * @param listeners List of MetaStoreEventListener listeners. + * @param eventType Type of the notification event. + * @param event The ListenerEvent with information about the event. + * @param environmentContext An EnvironmentContext object with parameters sent by the HMS client. + * @param parameters A list of key/value pairs with the new parameters to add. + * @return A list of key/value pair parameters that the listeners set. The returned object will return an empty + * map if no parameters were updated or if no listeners were notified. + * @throws MetaException If an error occurred while calling the listeners. 
+ */ + public static Map<String, String> notifyEvent(List<MetaStoreEventListener> listeners, + EventType eventType, + ListenerEvent event, + EnvironmentContext environmentContext, + Map<String, String> parameters) throws MetaException { + + Preconditions.checkNotNull(event, "The event must not be null."); + + event.putParameters(parameters); + return notifyEvent(listeners, eventType, event, environmentContext); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/d0df902e/metastore/src/java/org/apache/hadoop/hive/metastore/events/ListenerEvent.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/events/ListenerEvent.java b/metastore/src/java/org/apache/hadoop/hive/metastore/events/ListenerEvent.java index 62aeb8c..b741549 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/events/ListenerEvent.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/events/ListenerEvent.java @@ -21,10 +21,18 @@ package org.apache.hadoop.hive.metastore.events; import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler; import org.apache.hadoop.hive.metastore.api.EnvironmentContext; +import javax.annotation.concurrent.NotThreadSafe; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + /** * Base class for all the events which are defined for metastore. + * + * This class is not thread-safe and not expected to be called in parallel. */ +@NotThreadSafe public abstract class ListenerEvent { /** @@ -33,6 +41,26 @@ public abstract class ListenerEvent { private final boolean status; private final HMSHandler handler; + /** + * Key/value parameters used by listeners to store notifications results + * i.e. DbNotificationListener sets a DB_NOTIFICATION_EVENT_ID. + * + * NotThreadSafe: The parameters map is not expected to be access in parallel by Hive, so keep it thread-unsafe + * to avoid locking overhead. 
+ */ + private Map<String, String> parameters; + + /** For performance concerns, it is preferable to cache the unmodifiable parameters variable that will be returned on the + * {@link #getParameters()} method. It is expected that {@link #putParameter(String, String)} is called fewer times + * than {@link #getParameters()}, so performance may be better by using this cache. + */ + private Map<String, String> unmodifiableParameters; + + // Listener parameters aren't expected to have many values. So far only + // DbNotificationListener will add a parameter; let's set a low initial capacity for now. + // If we find out many parameters are added, then we can adjust or remove this initial capacity. + private static final int PARAMETERS_INITIAL_CAPACITY = 1; + // Properties passed by the client, to be used in execution hooks. private EnvironmentContext environmentContext = null; @@ -40,6 +68,8 @@ public abstract class ListenerEvent { super(); this.status = status; this.handler = handler; + this.parameters = new HashMap<>(PARAMETERS_INITIAL_CAPACITY); + updateUnmodifiableParameters(); } /** @@ -49,6 +79,12 @@ public abstract class ListenerEvent { return status; } + /** + * Set the environment context of the event. + * + * @param environmentContext An EnvironmentContext object that contains environment parameters sent from + * the HMS client. + */ public void setEnvironmentContext(EnvironmentContext environmentContext) { this.environmentContext = environmentContext; } @@ -66,4 +102,74 @@ public abstract class ListenerEvent { public HMSHandler getHandler() { return handler; } + + /** + * Return all parameters of the listener event. Parameters are read-only (unmodifiable map). If a new parameter + * must be added, please use the putParameter() method. + * + * + * @return A map object with all parameters. + */ + public final Map<String, String> getParameters() { + return unmodifiableParameters; + } + + /** + * Put a new parameter to the listener event. 
+ * + * Overriding an existing parameter is not allowed, and an exception is thrown to avoid a mis-configuration + * between listeners setting the same parameters. + * + * @param name Name of the parameter. + * @param value Value of the parameter. + * @throws IllegalStateException if a parameter already exists. + */ + public void putParameter(String name, String value) { + putParameterIfAbsent(name, value); + updateUnmodifiableParameters(); + } + + /** + * Put a new set of parameters to the listener event. + * + * Overriding an existing parameter is not allowed, and an exception is thrown to avoid a mis-configuration + * between listeners setting the same parameters. + * + * @param parameters A Map object with a set of parameters. + * @throws IllegalStateException if a parameter already exists. + */ + public void putParameters(final Map<String, String> parameters) { + if (parameters != null) { + for (Map.Entry<String, String> entry : parameters.entrySet()) { + putParameterIfAbsent(entry.getKey(), entry.getValue()); + } + + updateUnmodifiableParameters(); + } + } + + /** + * Put a parameter to the listener event only if the parameter is absent. + * + * Overriding an existing parameter is not allowed, and an exception is thrown to avoid a mis-configuration + * between listeners setting the same parameters. + * + * @param name Name of the parameter. + * @param value Value of the parameter. + * @throws IllegalStateException if a parameter already exists. + */ + private void putParameterIfAbsent(String name, String value) { + if (parameters.containsKey(name)) { + throw new IllegalStateException("Invalid attempt to overwrite a read-only parameter: " + name); + } + + parameters.put(name, value); + } + + /** + * Keeps a cache of unmodifiable parameters returned by the getParameters() method. 
+ */ + private void updateUnmodifiableParameters() { + unmodifiableParameters = Collections.unmodifiableMap(parameters); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/d0df902e/metastore/src/test/org/apache/hadoop/hive/metastore/TestObjectStore.java ---------------------------------------------------------------------- diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/TestObjectStore.java b/metastore/src/test/org/apache/hadoop/hive/metastore/TestObjectStore.java index 1f87eeb..9b8eaf2 100644 --- a/metastore/src/test/org/apache/hadoop/hive/metastore/TestObjectStore.java +++ b/metastore/src/test/org/apache/hadoop/hive/metastore/TestObjectStore.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hive.common.metrics.metrics2.CodahaleMetrics; import org.apache.hadoop.hive.common.metrics.metrics2.MetricsReporting; import org.apache.hadoop.hive.common.metrics.MetricsTestUtils; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.FileMetadataExprType; @@ -37,6 +38,9 @@ import org.apache.hadoop.hive.metastore.api.Index; import org.apache.hadoop.hive.metastore.api.InvalidInputException; import org.apache.hadoop.hive.metastore.api.InvalidObjectException; import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.metastore.api.NotificationEventRequest; +import org.apache.hadoop.hive.metastore.api.NotificationEventResponse; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.PrincipalType; @@ -46,6 +50,7 @@ import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey; import org.apache.hadoop.hive.metastore.api.SerDeInfo; import 
org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.messaging.EventMessage; import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; @@ -116,6 +121,51 @@ public class TestObjectStore { } /** + * Test notification operations + */ + @Test + public void testNotificationOps() throws InterruptedException { + final int NO_EVENT_ID = 0; + final int FIRST_EVENT_ID = 1; + final int SECOND_EVENT_ID = 2; + + NotificationEvent event = + new NotificationEvent(0, 0, EventMessage.EventType.CREATE_DATABASE.toString(), ""); + NotificationEventResponse eventResponse; + CurrentNotificationEventId eventId; + + // Verify that there are no notifications available yet + eventId = objectStore.getCurrentNotificationEventId(); + Assert.assertEquals(NO_EVENT_ID, eventId.getEventId()); + + // Verify that addNotificationEvent() updates the NotificationEvent with the new event ID + objectStore.addNotificationEvent(event); + Assert.assertEquals(FIRST_EVENT_ID, event.getEventId()); + objectStore.addNotificationEvent(event); + Assert.assertEquals(SECOND_EVENT_ID, event.getEventId()); + + // Verify that objectStore fetches the latest notification event ID + eventId = objectStore.getCurrentNotificationEventId(); + Assert.assertEquals(SECOND_EVENT_ID, eventId.getEventId()); + + // Verify that getNextNotification() returns all events + eventResponse = objectStore.getNextNotification(new NotificationEventRequest()); + Assert.assertEquals(2, eventResponse.getEventsSize()); + Assert.assertEquals(FIRST_EVENT_ID, eventResponse.getEvents().get(0).getEventId()); + Assert.assertEquals(SECOND_EVENT_ID, eventResponse.getEvents().get(1).getEventId()); + // Verify that getNextNotification(last) returns events after a specified event + eventResponse = objectStore.getNextNotification(new 
NotificationEventRequest(FIRST_EVENT_ID)); + Assert.assertEquals(1, eventResponse.getEventsSize()); + Assert.assertEquals(SECOND_EVENT_ID, eventResponse.getEvents().get(0).getEventId()); + + // Verify that cleanNotificationEvents() cleans up all old notifications + Thread.sleep(1); + objectStore.cleanNotificationEvents(1); + eventResponse = objectStore.getNextNotification(new NotificationEventRequest()); + Assert.assertEquals(0, eventResponse.getEventsSize()); + } + + /** * Test database operations */ @Test