Repository: atlas
Updated Branches:
  refs/heads/master 01b195a96 -> 56eefb2a9


ATLAS-2853: updated to send entity-notifications after successful graph transaction commit


Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/56eefb2a
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/56eefb2a
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/56eefb2a

Branch: refs/heads/master
Commit: 56eefb2a9806265df997dda2ef70274eede72675
Parents: 01b195a
Author: Madhan Neethiraj <mad...@apache.org>
Authored: Thu Aug 30 19:36:35 2018 -0700
Committer: Madhan Neethiraj <mad...@apache.org>
Committed: Fri Aug 31 09:07:26 2018 -0700

----------------------------------------------------------------------
 .../graph/v2/AtlasEntityChangeNotifier.java     |  27 ++--
 .../EntityNotificationListenerV2.java           |  15 ++-
 .../notification/EntityNotificationSender.java  | 131 +++++++++++++++++++
 .../NotificationEntityChangeListener.java       |  37 ++----
 4 files changed, 165 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/atlas/blob/56eefb2a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java
index 783d62f..9d8afdf 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java
@@ -38,7 +38,6 @@ import org.apache.atlas.type.AtlasEntityType;
 import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.atlas.v1.model.instance.Referenceable;
 import org.apache.atlas.v1.model.instance.Struct;
-import org.apache.atlas.repository.Constants;
 import org.apache.atlas.repository.converters.AtlasInstanceConverter;
 import org.apache.atlas.repository.graph.FullTextMapperV2;
 import org.apache.atlas.repository.graph.GraphHelper;
@@ -56,13 +55,11 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Set;
 
 import static 
org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.PROPAGATED_CLASSIFICATION_ADD;
 import static 
org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.PROPAGATED_CLASSIFICATION_DELETE;
 import static org.apache.atlas.repository.Constants.ENTITY_TEXT_PROPERTY_KEY;
-import static 
org.apache.atlas.util.AtlasRepositoryConfiguration.isV2EntityNotificationEnabled;
 
 
 @Component
@@ -74,6 +71,7 @@ public class AtlasEntityChangeNotifier {
     private final AtlasInstanceConverter      instanceConverter;
     private final FullTextMapperV2            fullTextMapperV2;
     private final AtlasTypeRegistry           atlasTypeRegistry;
+    private final boolean                     isV2EntityNotificationEnabled;
 
 
     @Inject
@@ -82,11 +80,12 @@ public class AtlasEntityChangeNotifier {
                                      AtlasInstanceConverter instanceConverter,
                                      FullTextMapperV2 fullTextMapperV2,
                                      AtlasTypeRegistry atlasTypeRegistry) {
-        this.entityChangeListeners   = entityChangeListeners;
-        this.entityChangeListenersV2 = entityChangeListenersV2;
-        this.instanceConverter       = instanceConverter;
-        this.fullTextMapperV2 = fullTextMapperV2;
-        this.atlasTypeRegistry = atlasTypeRegistry;
+        this.entityChangeListeners         = entityChangeListeners;
+        this.entityChangeListenersV2       = entityChangeListenersV2;
+        this.instanceConverter             = instanceConverter;
+        this.fullTextMapperV2              = fullTextMapperV2;
+        this.atlasTypeRegistry             = atlasTypeRegistry;
+        this.isV2EntityNotificationEnabled = 
AtlasRepositoryConfiguration.isV2EntityNotificationEnabled();
     }
 
     public void onEntitiesMutated(EntityMutationResponse 
entityMutationResponse, boolean isImport) throws AtlasBaseException {
@@ -114,7 +113,7 @@ public class AtlasEntityChangeNotifier {
     }
 
     public void onClassificationAddedToEntity(AtlasEntity entity, 
List<AtlasClassification> addedClassifications) throws AtlasBaseException {
-        if (isV2EntityNotificationEnabled()) {
+        if (isV2EntityNotificationEnabled) {
             doFullTextMapping(entity.getGuid());
 
             for (EntityChangeListenerV2 listener : entityChangeListenersV2) {
@@ -141,7 +140,7 @@ public class AtlasEntityChangeNotifier {
     }
 
     public void onClassificationUpdatedToEntity(AtlasEntity entity, 
List<AtlasClassification> updatedClassifications) throws AtlasBaseException {
-        if (isV2EntityNotificationEnabled()) {
+        if (isV2EntityNotificationEnabled) {
             doFullTextMapping(entity.getGuid());
 
             for (EntityChangeListenerV2 listener : entityChangeListenersV2) {
@@ -168,7 +167,7 @@ public class AtlasEntityChangeNotifier {
     }
 
     public void onClassificationDeletedFromEntity(AtlasEntity entity, 
List<AtlasClassification> deletedClassifications) throws AtlasBaseException {
-        if (isV2EntityNotificationEnabled()) {
+        if (isV2EntityNotificationEnabled) {
             doFullTextMapping(entity.getGuid());
 
             for (EntityChangeListenerV2 listener : entityChangeListenersV2) {
@@ -197,7 +196,7 @@ public class AtlasEntityChangeNotifier {
 
     public void onTermAddedToEntities(AtlasGlossaryTerm term, 
List<AtlasRelatedObjectId> entityIds) throws AtlasBaseException {
         // listeners notified on term-entity association only if v2 
notifications are enabled
-        if (isV2EntityNotificationEnabled()) {
+        if (isV2EntityNotificationEnabled) {
             for (EntityChangeListenerV2 listener : entityChangeListenersV2) {
                 listener.onTermAdded(term, entityIds);
             }
@@ -216,7 +215,7 @@ public class AtlasEntityChangeNotifier {
 
     public void onTermDeletedFromEntities(AtlasGlossaryTerm term, 
List<AtlasRelatedObjectId> entityIds) throws AtlasBaseException {
         // listeners notified on term-entity disassociation only if v2 
notifications are enabled
-        if (isV2EntityNotificationEnabled()) {
+        if (isV2EntityNotificationEnabled) {
             for (EntityChangeListenerV2 listener : entityChangeListenersV2) {
                 listener.onTermDeleted(term, entityIds);
             }
@@ -279,7 +278,7 @@ public class AtlasEntityChangeNotifier {
             return;
         }
 
-        if (isV2EntityNotificationEnabled()) {
+        if (isV2EntityNotificationEnabled) {
             notifyV2Listeners(entityHeaders, operation, isImport);
         } else {
             notifyV1Listeners(entityHeaders, operation, isImport);

http://git-wip-us.apache.org/repos/asf/atlas/blob/56eefb2a/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java b/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java
index c70011f..9587af9 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java
@@ -35,6 +35,8 @@ import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.configuration.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
 
 import javax.inject.Inject;
@@ -45,7 +47,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import static 
org.apache.atlas.notification.NotificationInterface.NotificationType.ENTITIES;
 import static org.apache.atlas.repository.graph.GraphHelper.isInternalType;
 import static 
org.apache.atlas.model.notification.EntityNotification.EntityNotificationV2.OperationType.*;
 import static 
org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever.CREATE_TIME;
@@ -56,15 +57,17 @@ import static org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever.QU
 
 @Component
 public class EntityNotificationListenerV2 implements EntityChangeListenerV2 {
-    private final AtlasTypeRegistry     typeRegistry;
-    private final NotificationInterface notificationInterface;
+    private static final Logger LOG = 
LoggerFactory.getLogger(EntityNotificationListenerV2.class);
+
+    private final AtlasTypeRegistry                              typeRegistry;
+    private final EntityNotificationSender<EntityNotificationV2> 
notificationSender;
 
     @Inject
     public EntityNotificationListenerV2(AtlasTypeRegistry typeRegistry,
                                         NotificationInterface 
notificationInterface,
                                         Configuration configuration) {
-        this.typeRegistry          = typeRegistry;
-        this.notificationInterface = notificationInterface;
+        this.typeRegistry       = typeRegistry;
+        this.notificationSender = new 
EntityNotificationSender<>(notificationInterface, configuration);
     }
 
     @Override
@@ -127,7 +130,7 @@ public class EntityNotificationListenerV2 implements EntityChangeListenerV2 {
 
         if (!messages.isEmpty()) {
             try {
-                notificationInterface.send(ENTITIES, messages);
+                notificationSender.send(messages);
             } catch (NotificationException e) {
                 throw new 
AtlasBaseException(AtlasErrorCode.ENTITY_NOTIFICATION_FAILED, e, 
operationType.name());
             }

http://git-wip-us.apache.org/repos/asf/atlas/blob/56eefb2a/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationSender.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationSender.java b/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationSender.java
new file mode 100644
index 0000000..c2ae512
--- /dev/null
+++ b/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationSender.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.notification;
+
+import org.apache.atlas.GraphTransactionInterceptor;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.configuration.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static 
org.apache.atlas.notification.NotificationInterface.NotificationType.ENTITIES;
+
+
+public class EntityNotificationSender<T> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(EntityNotificationSender.class);
+
+    private final static boolean NOTIFY_POST_COMMIT_DEFAULT = true;
+
+    private final NotificationSender<T> notificationSender;
+
+    public EntityNotificationSender(NotificationInterface 
notificationInterface, Configuration configuration) {
+        this(notificationInterface, configuration != null ? 
configuration.getBoolean("atlas.notification.send.postcommit", 
NOTIFY_POST_COMMIT_DEFAULT) : NOTIFY_POST_COMMIT_DEFAULT);
+    }
+
+    public EntityNotificationSender(NotificationInterface 
notificationInterface, boolean sendPostCommit) {
+        if (sendPostCommit) {
+            LOG.info("EntityNotificationSender: notifications will be sent 
after transaction commit");
+
+            this.notificationSender = new 
PostCommitNotificationSender(notificationInterface);
+        } else {
+            LOG.info("EntityNotificationSender: notifications will be sent 
inline (i.e. not waiting for transaction to commit)");
+
+            this.notificationSender = new 
InlineNotificationSender(notificationInterface);
+        }
+    }
+
+    public void send(List<T> notifications) throws NotificationException {
+        this.notificationSender.send(notifications);
+    }
+
+
+    private interface NotificationSender<T> {
+        void send(List<T> notifications) throws NotificationException;
+    }
+
+    private class InlineNotificationSender<T> implements NotificationSender<T> 
{
+        private final NotificationInterface notificationInterface;
+
+        public InlineNotificationSender(NotificationInterface 
notificationInterface) {
+            this.notificationInterface = notificationInterface;
+        }
+
+        @Override
+        public void send(List<T> notifications) throws NotificationException {
+            notificationInterface.send(ENTITIES, notifications);
+        }
+    }
+
+    private class PostCommitNotificationSender<T> implements 
NotificationSender<T> {
+        private final NotificationInterface                   
notificationInterface;
+        private final ThreadLocal<PostCommitNotificationHook> 
postCommitNotificationHooks = new ThreadLocal<>();
+
+        public PostCommitNotificationSender(NotificationInterface 
notificationInterface) {
+            this.notificationInterface = notificationInterface;
+        }
+
+        @Override
+        public void send(List<T> notifications) throws NotificationException {
+            PostCommitNotificationHook notificationHook = 
postCommitNotificationHooks.get();
+
+            if (notificationHook == null) {
+                notificationHook = new 
PostCommitNotificationHook(notifications);
+
+                postCommitNotificationHooks.set(notificationHook);
+            } else {
+                notificationHook.addNotifications(notifications);
+            }
+        }
+
+        class PostCommitNotificationHook<T> extends 
GraphTransactionInterceptor.PostTransactionHook {
+            private final List<T> notifications = new ArrayList<>();
+
+            public PostCommitNotificationHook(List<T> notifications) {
+                this.addNotifications(notifications);
+            }
+
+            public void addNotifications(List<T> notifications) {
+                if (notifications != null) {
+                    this.notifications.addAll(notifications);
+                }
+            }
+
+            @Override
+            public void onComplete(boolean isSuccess) {
+                postCommitNotificationHooks.remove();
+
+                if (CollectionUtils.isNotEmpty(notifications)) {
+                    if (isSuccess) {
+                        try {
+                            notificationInterface.send(ENTITIES, 
notifications);
+                        } catch (NotificationException excp) {
+                            LOG.error("failed to send entity notifications", 
excp);
+                        }
+                    } else {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Transaction not committed. Not sending 
{} notifications: {}", notifications.size(), notifications);
+                        }
+                    }
+                }
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/atlas/blob/56eefb2a/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java
index 1eeecef..b5e7ed8 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java
@@ -18,11 +18,9 @@
 package org.apache.atlas.notification;
 
 import com.google.common.annotations.VisibleForTesting;
-import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasException;
 import org.apache.atlas.listener.EntityChangeListener;
 import org.apache.atlas.model.glossary.AtlasGlossaryTerm;
-import org.apache.atlas.notification.NotificationInterface.NotificationType;
 import org.apache.atlas.v1.model.instance.Referenceable;
 import org.apache.atlas.v1.model.instance.Struct;
 import org.apache.atlas.v1.model.notification.EntityNotificationV1;
@@ -45,11 +43,11 @@ import java.util.*;
 public class NotificationEntityChangeListener implements EntityChangeListener {
     protected static final String ATLAS_ENTITY_NOTIFICATION_PROPERTY = 
"atlas.notification.entity";
 
-    private final NotificationInterface     notificationInterface;
-    private final AtlasTypeRegistry         typeRegistry;
-    private final Map<String, List<String>> notificationAttributesCache = new 
HashMap<>();
+    private final AtlasTypeRegistry                              typeRegistry;
+    private final Configuration                                  configuration;
+    private final EntityNotificationSender<EntityNotificationV1> 
notificationSender;
+    private final Map<String, List<String>>                      
notificationAttributesCache = new HashMap<>();
 
-    private static Configuration APPLICATION_PROPERTIES = null;
 
 
 
@@ -62,9 +60,11 @@ public class NotificationEntityChangeListener implements EntityChangeListener {
      * @param typeRegistry the Atlas type system
      */
     @Inject
-    public NotificationEntityChangeListener(NotificationInterface 
notificationInterface, AtlasTypeRegistry typeRegistry) {
-        this.notificationInterface = notificationInterface;
-        this.typeRegistry          = typeRegistry;
+    public NotificationEntityChangeListener(NotificationInterface 
notificationInterface, AtlasTypeRegistry typeRegistry, Configuration 
configuration) {
+        this.typeRegistry       = typeRegistry;
+        this.configuration      = configuration;
+        this.notificationSender = new 
EntityNotificationSender<>(notificationInterface, configuration);
+
     }
 
 
@@ -184,20 +184,17 @@ public class NotificationEntityChangeListener implements EntityChangeListener {
         }
 
         if (!messages.isEmpty()) {
-            notificationInterface.send(NotificationType.ENTITIES, messages);
+            notificationSender.send(messages);
         }
     }
 
     private List<String> getNotificationAttributes(String entityType) {
         List<String> ret = null;
 
-        initApplicationProperties();
-
         if (notificationAttributesCache.containsKey(entityType)) {
             ret = notificationAttributesCache.get(entityType);
-        } else if (APPLICATION_PROPERTIES != null) {
-            String[] notificationAttributes = 
APPLICATION_PROPERTIES.getStringArray(ATLAS_ENTITY_NOTIFICATION_PROPERTY + "." +
-                                                                               
     entityType + "." + "attributes.include");
+        } else if (configuration != null) {
+            String[] notificationAttributes = 
configuration.getStringArray(ATLAS_ENTITY_NOTIFICATION_PROPERTY + "." + 
entityType + "." + "attributes.include");
 
             if (notificationAttributes != null) {
                 ret = Arrays.asList(notificationAttributes);
@@ -208,14 +205,4 @@ public class NotificationEntityChangeListener implements EntityChangeListener {
 
         return ret;
     }
-
-    private void initApplicationProperties() {
-        if (APPLICATION_PROPERTIES == null) {
-            try {
-                APPLICATION_PROPERTIES = ApplicationProperties.get();
-            } catch (AtlasException ex) {
-                // ignore
-            }
-        }
-    }
 }

Reply via email to