Repository: atlas Updated Branches: refs/heads/branch-1.0 27be0da52 -> e576b763a
ATLAS-2853: updated to send entity-notifications after successful graph transaction commit (cherry picked from commit 56eefb2a9806265df997dda2ef70274eede72675) Project: http://git-wip-us.apache.org/repos/asf/atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/e576b763 Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/e576b763 Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/e576b763 Branch: refs/heads/branch-1.0 Commit: e576b763a21ae3089e80a1448aaef9e9a678fcb8 Parents: 27be0da Author: Madhan Neethiraj <mad...@apache.org> Authored: Thu Aug 30 19:36:35 2018 -0700 Committer: Madhan Neethiraj <mad...@apache.org> Committed: Fri Aug 31 09:47:10 2018 -0700 ---------------------------------------------------------------------- .../graph/v2/AtlasEntityChangeNotifier.java | 27 ++-- .../EntityNotificationListenerV2.java | 15 ++- .../notification/EntityNotificationSender.java | 131 +++++++++++++++++++ .../NotificationEntityChangeListener.java | 37 ++---- 4 files changed, 165 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/atlas/blob/e576b763/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java index 193e19c..1b09d49 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java @@ -38,7 +38,6 @@ import org.apache.atlas.type.AtlasEntityType; import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.v1.model.instance.Referenceable; import org.apache.atlas.v1.model.instance.Struct; -import org.apache.atlas.repository.Constants; import org.apache.atlas.repository.converters.AtlasInstanceConverter; import org.apache.atlas.repository.graph.FullTextMapperV2; import org.apache.atlas.repository.graph.GraphHelper; @@ -56,13 +55,11 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Set; import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.PROPAGATED_CLASSIFICATION_ADD; import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.PROPAGATED_CLASSIFICATION_DELETE; import static org.apache.atlas.repository.Constants.ENTITY_TEXT_PROPERTY_KEY; -import static org.apache.atlas.util.AtlasRepositoryConfiguration.isV2EntityNotificationEnabled; @Component @@ -74,6 +71,7 @@ public class AtlasEntityChangeNotifier { private final AtlasInstanceConverter instanceConverter; private final FullTextMapperV2 fullTextMapperV2; private final AtlasTypeRegistry atlasTypeRegistry; + private final boolean isV2EntityNotificationEnabled; @Inject @@ -82,11 +80,12 @@ public class AtlasEntityChangeNotifier { AtlasInstanceConverter instanceConverter, FullTextMapperV2 fullTextMapperV2, AtlasTypeRegistry atlasTypeRegistry) { - this.entityChangeListeners = entityChangeListeners; - this.entityChangeListenersV2 = entityChangeListenersV2; - this.instanceConverter = instanceConverter; - this.fullTextMapperV2 = fullTextMapperV2; - this.atlasTypeRegistry = atlasTypeRegistry; + this.entityChangeListeners = entityChangeListeners; + this.entityChangeListenersV2 = entityChangeListenersV2; + this.instanceConverter = instanceConverter; + this.fullTextMapperV2 = fullTextMapperV2; + this.atlasTypeRegistry = atlasTypeRegistry; + this.isV2EntityNotificationEnabled = AtlasRepositoryConfiguration.isV2EntityNotificationEnabled(); } public void onEntitiesMutated(EntityMutationResponse entityMutationResponse, boolean isImport) throws AtlasBaseException { @@ -114,7 +113,7 @@ public class AtlasEntityChangeNotifier { } public void onClassificationAddedToEntity(AtlasEntity entity, List<AtlasClassification> addedClassifications) throws AtlasBaseException { - if (isV2EntityNotificationEnabled()) { + if (isV2EntityNotificationEnabled) { doFullTextMapping(entity.getGuid()); for (EntityChangeListenerV2 listener : entityChangeListenersV2) { @@ -141,7 +140,7 @@ public class AtlasEntityChangeNotifier { } public void onClassificationUpdatedToEntity(AtlasEntity entity, List<AtlasClassification> updatedClassifications) throws AtlasBaseException { - if (isV2EntityNotificationEnabled()) { + if (isV2EntityNotificationEnabled) { doFullTextMapping(entity.getGuid()); for (EntityChangeListenerV2 listener : entityChangeListenersV2) { @@ -168,7 +167,7 @@ public class AtlasEntityChangeNotifier { } public void onClassificationDeletedFromEntity(AtlasEntity entity, List<AtlasClassification> deletedClassifications) throws AtlasBaseException { - if (isV2EntityNotificationEnabled()) { + if (isV2EntityNotificationEnabled) { doFullTextMapping(entity.getGuid()); for (EntityChangeListenerV2 listener : entityChangeListenersV2) { @@ -197,7 +196,7 @@ public class AtlasEntityChangeNotifier { public void onTermAddedToEntities(AtlasGlossaryTerm term, List<AtlasRelatedObjectId> entityIds) throws AtlasBaseException { // listeners notified on term-entity association only if v2 notifications are enabled - if (isV2EntityNotificationEnabled()) { + if (isV2EntityNotificationEnabled) { for (EntityChangeListenerV2 listener : entityChangeListenersV2) { listener.onTermAdded(term, entityIds); } @@ -216,7 +215,7 @@ public class AtlasEntityChangeNotifier { public void onTermDeletedFromEntities(AtlasGlossaryTerm term, List<AtlasRelatedObjectId> entityIds) throws AtlasBaseException { // listeners notified on term-entity disassociation only if v2 notifications are enabled - if (isV2EntityNotificationEnabled()) { + if (isV2EntityNotificationEnabled) { for (EntityChangeListenerV2 listener : entityChangeListenersV2) { listener.onTermDeleted(term, entityIds); } @@ -272,7 +271,7 @@ public class AtlasEntityChangeNotifier { return; } - if (isV2EntityNotificationEnabled()) { + if (isV2EntityNotificationEnabled) { notifyV2Listeners(entityHeaders, operation, isImport); } else { notifyV1Listeners(entityHeaders, operation, isImport); http://git-wip-us.apache.org/repos/asf/atlas/blob/e576b763/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java b/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java index c70011f..9587af9 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java +++ b/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java @@ -35,6 +35,8 @@ import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.MapUtils; import org.apache.commons.configuration.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import javax.inject.Inject; @@ -45,7 +47,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import static org.apache.atlas.notification.NotificationInterface.NotificationType.ENTITIES; import static org.apache.atlas.repository.graph.GraphHelper.isInternalType; import static org.apache.atlas.model.notification.EntityNotification.EntityNotificationV2.OperationType.*; import static org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever.CREATE_TIME; @@ -56,15 +57,17 @@ import static org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever.QU @Component public class EntityNotificationListenerV2 implements EntityChangeListenerV2 { - private final AtlasTypeRegistry typeRegistry; - private final NotificationInterface notificationInterface; + private static final Logger LOG = LoggerFactory.getLogger(EntityNotificationListenerV2.class); + + private final AtlasTypeRegistry typeRegistry; + private final EntityNotificationSender<EntityNotificationV2> notificationSender; @Inject public EntityNotificationListenerV2(AtlasTypeRegistry typeRegistry, NotificationInterface notificationInterface, Configuration configuration) { - this.typeRegistry = typeRegistry; - this.notificationInterface = notificationInterface; + this.typeRegistry = typeRegistry; + this.notificationSender = new EntityNotificationSender<>(notificationInterface, configuration); } @Override @@ -127,7 +130,7 @@ public class EntityNotificationListenerV2 implements EntityChangeListenerV2 { if (!messages.isEmpty()) { try { - notificationInterface.send(ENTITIES, messages); + notificationSender.send(messages); } catch (NotificationException e) { throw new AtlasBaseException(AtlasErrorCode.ENTITY_NOTIFICATION_FAILED, e, operationType.name()); } http://git-wip-us.apache.org/repos/asf/atlas/blob/e576b763/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationSender.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationSender.java b/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationSender.java new file mode 100644 index 0000000..c2ae512 --- /dev/null +++ b/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationSender.java @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.notification; + +import org.apache.atlas.GraphTransactionInterceptor; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.configuration.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.atlas.notification.NotificationInterface.NotificationType.ENTITIES; + + +public class EntityNotificationSender<T> { + private static final Logger LOG = LoggerFactory.getLogger(EntityNotificationSender.class); + + private final static boolean NOTIFY_POST_COMMIT_DEFAULT = true; + + private final NotificationSender<T> notificationSender; + + public EntityNotificationSender(NotificationInterface notificationInterface, Configuration configuration) { + this(notificationInterface, configuration != null ? configuration.getBoolean("atlas.notification.send.postcommit", NOTIFY_POST_COMMIT_DEFAULT) : NOTIFY_POST_COMMIT_DEFAULT); + } + + public EntityNotificationSender(NotificationInterface notificationInterface, boolean sendPostCommit) { + if (sendPostCommit) { + LOG.info("EntityNotificationSender: notifications will be sent after transaction commit"); + + this.notificationSender = new PostCommitNotificationSender(notificationInterface); + } else { + LOG.info("EntityNotificationSender: notifications will be sent inline (i.e. not waiting for transaction to commit)"); + + this.notificationSender = new InlineNotificationSender(notificationInterface); + } + } + + public void send(List<T> notifications) throws NotificationException { + this.notificationSender.send(notifications); + } + + + private interface NotificationSender<T> { + void send(List<T> notifications) throws NotificationException; + } + + private class InlineNotificationSender<T> implements NotificationSender<T> { + private final NotificationInterface notificationInterface; + + public InlineNotificationSender(NotificationInterface notificationInterface) { + this.notificationInterface = notificationInterface; + } + + @Override + public void send(List<T> notifications) throws NotificationException { + notificationInterface.send(ENTITIES, notifications); + } + } + + private class PostCommitNotificationSender<T> implements NotificationSender<T> { + private final NotificationInterface notificationInterface; + private final ThreadLocal<PostCommitNotificationHook> postCommitNotificationHooks = new ThreadLocal<>(); + + public PostCommitNotificationSender(NotificationInterface notificationInterface) { + this.notificationInterface = notificationInterface; + } + + @Override + public void send(List<T> notifications) throws NotificationException { + PostCommitNotificationHook notificationHook = postCommitNotificationHooks.get(); + + if (notificationHook == null) { + notificationHook = new PostCommitNotificationHook(notifications); + + postCommitNotificationHooks.set(notificationHook); + } else { + notificationHook.addNotifications(notifications); + } + } + + class PostCommitNotificationHook<T> extends GraphTransactionInterceptor.PostTransactionHook { + private final List<T> notifications = new ArrayList<>(); + + public PostCommitNotificationHook(List<T> notifications) { + this.addNotifications(notifications); + } + + public void addNotifications(List<T> notifications) { + if (notifications != null) { + this.notifications.addAll(notifications); + } + } + + @Override + public void onComplete(boolean isSuccess) { + postCommitNotificationHooks.remove(); + + if (CollectionUtils.isNotEmpty(notifications)) { + if (isSuccess) { + try { + notificationInterface.send(ENTITIES, notifications); + } catch (NotificationException excp) { + LOG.error("failed to send entity notifications", excp); + } + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Transaction not committed. Not sending {} notifications: {}", notifications.size(), notifications); + } + } + } + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/atlas/blob/e576b763/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java index 1eeecef..b5e7ed8 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java +++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java @@ -18,11 +18,9 @@ package org.apache.atlas.notification; import com.google.common.annotations.VisibleForTesting; -import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasException; import org.apache.atlas.listener.EntityChangeListener; import org.apache.atlas.model.glossary.AtlasGlossaryTerm; -import org.apache.atlas.notification.NotificationInterface.NotificationType; import org.apache.atlas.v1.model.instance.Referenceable; import org.apache.atlas.v1.model.instance.Struct; import org.apache.atlas.v1.model.notification.EntityNotificationV1; @@ -45,11 +43,11 @@ import java.util.*; public class NotificationEntityChangeListener implements EntityChangeListener { protected static final String ATLAS_ENTITY_NOTIFICATION_PROPERTY = "atlas.notification.entity"; - private final NotificationInterface notificationInterface; - private final AtlasTypeRegistry typeRegistry; - private final Map<String, List<String>> notificationAttributesCache = new HashMap<>(); + private final AtlasTypeRegistry typeRegistry; + private final Configuration configuration; + private final EntityNotificationSender<EntityNotificationV1> notificationSender; + private final Map<String, List<String>> notificationAttributesCache = new HashMap<>(); - private static Configuration APPLICATION_PROPERTIES = null; @@ -62,9 +60,11 @@ public class NotificationEntityChangeListener implements EntityChangeListener { * @param typeRegistry the Atlas type system */ @Inject - public NotificationEntityChangeListener(NotificationInterface notificationInterface, AtlasTypeRegistry typeRegistry) { - this.notificationInterface = notificationInterface; - this.typeRegistry = typeRegistry; + public NotificationEntityChangeListener(NotificationInterface notificationInterface, AtlasTypeRegistry typeRegistry, Configuration configuration) { + this.typeRegistry = typeRegistry; + this.configuration = configuration; + this.notificationSender = new EntityNotificationSender<>(notificationInterface, configuration); + } @@ -184,20 +184,17 @@ public class NotificationEntityChangeListener implements EntityChangeListener { } if (!messages.isEmpty()) { - notificationInterface.send(NotificationType.ENTITIES, messages); + notificationSender.send(messages); } } private List<String> getNotificationAttributes(String entityType) { List<String> ret = null; - initApplicationProperties(); - if (notificationAttributesCache.containsKey(entityType)) { ret = notificationAttributesCache.get(entityType); - } else if (APPLICATION_PROPERTIES != null) { - String[] notificationAttributes = APPLICATION_PROPERTIES.getStringArray(ATLAS_ENTITY_NOTIFICATION_PROPERTY + "." + - entityType + "." + "attributes.include"); + } else if (configuration != null) { + String[] notificationAttributes = configuration.getStringArray(ATLAS_ENTITY_NOTIFICATION_PROPERTY + "." + entityType + "." + "attributes.include"); if (notificationAttributes != null) { ret = Arrays.asList(notificationAttributes); @@ -208,14 +205,4 @@ public class NotificationEntityChangeListener implements EntityChangeListener { return ret; } - - private void initApplicationProperties() { - if (APPLICATION_PROPERTIES == null) { - try { - APPLICATION_PROPERTIES = ApplicationProperties.get(); - } catch (AtlasException ex) { - // ignore - } - } - } }