Repository: atlas
Updated Branches:
  refs/heads/master 1179aff80 -> 52ef9e7f3


ATLAS-2823: updated hooks to support asynchronous notifications


Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/52ef9e7f
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/52ef9e7f
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/52ef9e7f

Branch: refs/heads/master
Commit: 52ef9e7f3962da2bf77a15ab1a1bcef8d47b2877
Parents: 1179aff
Author: Madhan Neethiraj <mad...@apache.org>
Authored: Tue Aug 14 16:18:19 2018 -0700
Committer: Madhan Neethiraj <mad...@apache.org>
Committed: Wed Aug 15 13:39:37 2018 -0700

----------------------------------------------------------------------
 .../apache/atlas/falcon/hook/FalconHook.java    |  85 +-----
 .../atlas/hbase/bridge/HBaseAtlasHook.java      | 273 +++----------------
 .../org/apache/atlas/hive/hook/HiveHook.java    |  12 +-
 .../org/apache/atlas/sqoop/hook/SqoopHook.java  |  14 +-
 .../apache/atlas/storm/hook/StormAtlasHook.java |   9 +-
 .../java/org/apache/atlas/hook/AtlasHook.java   | 176 ++++++++----
 .../org/apache/atlas/hook/AtlasHookTest.java    |  12 +-
 7 files changed, 180 insertions(+), 401 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/atlas/blob/52ef9e7f/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java
----------------------------------------------------------------------
diff --git 
a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java
 
b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java
index 77177b4..8c09d33 100644
--- 
a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java
+++ 
b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java
@@ -18,13 +18,10 @@
 
 package org.apache.atlas.falcon.hook;
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.atlas.AtlasConstants;
 import org.apache.atlas.falcon.bridge.FalconBridge;
 import org.apache.atlas.falcon.event.FalconEvent;
 import org.apache.atlas.falcon.publisher.FalconEventPublisher;
 import org.apache.atlas.hook.AtlasHook;
-import org.apache.atlas.kafka.NotificationProvider;
 import org.apache.atlas.model.notification.HookNotification;
 import org.apache.atlas.v1.model.instance.Referenceable;
 import 
org.apache.atlas.v1.model.notification.HookNotificationV1.EntityCreateRequest;
@@ -32,17 +29,12 @@ import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.process.Process;
-import org.apache.hadoop.util.ShutdownHookManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 
 /**
  * Falcon hook sends lineage information to the Atlas Service.
@@ -50,27 +42,6 @@ import java.util.concurrent.TimeUnit;
 public class FalconHook extends AtlasHook implements FalconEventPublisher {
     private static final Logger LOG = 
LoggerFactory.getLogger(FalconHook.class);
 
-    public static final String CONF_PREFIX = "atlas.hook.falcon.";
-    private static final String MIN_THREADS = CONF_PREFIX + "minThreads";
-    private static final String MAX_THREADS = CONF_PREFIX + "maxThreads";
-    private static final String KEEP_ALIVE_TIME = CONF_PREFIX + 
"keepAliveTime";
-    public static final String QUEUE_SIZE = CONF_PREFIX + "queueSize";
-    public static final String CONF_SYNC = CONF_PREFIX + "synchronous";
-
-    public static final String HOOK_NUM_RETRIES = CONF_PREFIX + "numRetries";
-
-    // wait time determines how long we wait before we exit the jvm on
-    // shutdown. Pending requests after that will not be sent.
-    private static final int WAIT_TIME = 3;
-    private static ExecutorService executor;
-
-    private static final int minThreadsDefault = 5;
-    private static final int maxThreadsDefault = 5;
-    private static final long keepAliveTimeDefault = 10;
-    private static final int queueSizeDefault = 10000;
-
-    private static boolean sync;
-
     private static ConfigurationStore STORE;
 
     private enum Operation {
@@ -80,45 +51,11 @@ public class FalconHook extends AtlasHook implements 
FalconEventPublisher {
 
     static {
         try {
-            // initialize the async facility to process hook calls. We don't
-            // want to do this inline since it adds plenty of overhead for the 
query.
-            int minThreads = atlasProperties.getInt(MIN_THREADS, 
minThreadsDefault);
-            int maxThreads = atlasProperties.getInt(MAX_THREADS, 
maxThreadsDefault);
-            long keepAliveTime = atlasProperties.getLong(KEEP_ALIVE_TIME, 
keepAliveTimeDefault);
-            int queueSize = atlasProperties.getInt(QUEUE_SIZE, 
queueSizeDefault);
-            sync = atlasProperties.getBoolean(CONF_SYNC, false);
-
-            executor = new ThreadPoolExecutor(minThreads, maxThreads, 
keepAliveTime, TimeUnit.MILLISECONDS,
-                    new LinkedBlockingQueue<Runnable>(queueSize),
-                    new ThreadFactoryBuilder().setNameFormat("Atlas Logger 
%d").build());
-
-            ShutdownHookManager.get().addShutdownHook(new Thread() {
-                @Override
-                public void run() {
-                    try {
-                        LOG.info("==> Shutdown of Atlas Falcon Hook");
-
-                        executor.shutdown();
-                        executor.awaitTermination(WAIT_TIME, TimeUnit.SECONDS);
-                        executor = null;
-                    } catch (InterruptedException ie) {
-                        LOG.info("Interrupt received in shutdown.");
-                    } finally {
-                        LOG.info("<== Shutdown of Atlas Falcon Hook");
-                    }
-                    // shutdown client
-                }
-            }, AtlasConstants.ATLAS_SHUTDOWN_HOOK_PRIORITY);
-
             STORE = ConfigurationStore.get();
-
-            notificationInterface = NotificationProvider.get();
-
         } catch (Exception e) {
             LOG.error("Caught exception initializing the falcon hook.", e);
         }
 
-
         LOG.info("Created Atlas Hook for Falcon");
     }
 
@@ -126,30 +63,12 @@ public class FalconHook extends AtlasHook implements 
FalconEventPublisher {
     public void publish(final Data data) {
         final FalconEvent event = data.getEvent();
         try {
-            if (sync) {
-                fireAndForget(event);
-            } else {
-                executor.submit(new Runnable() {
-                    @Override
-                    public void run() {
-                        try {
-                            fireAndForget(event);
-                        } catch (Throwable e) {
-                            LOG.info("Atlas hook failed", e);
-                        }
-                    }
-                });
-            }
+            fireAndForget(event);
         } catch (Throwable t) {
             LOG.warn("Error in processing data {}", data, t);
         }
     }
 
-    @Override
-    protected String getNumberOfRetriesPropertyKey() {
-        return HOOK_NUM_RETRIES;
-    }
-
     private void fireAndForget(FalconEvent event) throws FalconException, 
URISyntaxException {
         LOG.info("Entered Atlas hook for Falcon hook operation {}", 
event.getOperation());
         List<HookNotification> messages = new ArrayList<>();
@@ -163,7 +82,7 @@ public class FalconHook extends AtlasHook implements 
FalconEventPublisher {
             break;
 
         }
-        notifyEntities(messages);
+        notifyEntities(messages, null);
     }
 
     private List<Referenceable> createEntities(FalconEvent event, String user) 
throws FalconException, URISyntaxException {

http://git-wip-us.apache.org/repos/asf/atlas/blob/52ef9e7f/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/bridge/HBaseAtlasHook.java
----------------------------------------------------------------------
diff --git 
a/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/bridge/HBaseAtlasHook.java
 
b/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/bridge/HBaseAtlasHook.java
index 09998cf..e7e9187 100644
--- 
a/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/bridge/HBaseAtlasHook.java
+++ 
b/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/bridge/HBaseAtlasHook.java
@@ -18,7 +18,6 @@
 
 package org.apache.atlas.hbase.bridge;
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.atlas.AtlasConstants;
 import org.apache.atlas.hbase.model.HBaseOperationContext;
 import org.apache.atlas.hbase.model.HBaseDataTypes;
@@ -26,7 +25,6 @@ import org.apache.atlas.hook.AtlasHook;
 import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
 import org.apache.atlas.model.instance.AtlasObjectId;
-import org.apache.atlas.model.notification.HookNotification;
 import 
org.apache.atlas.model.notification.HookNotification.EntityCreateRequestV2;
 import 
org.apache.atlas.model.notification.HookNotification.EntityDeleteRequestV2;
 import 
org.apache.atlas.model.notification.HookNotification.EntityUpdateRequestV2;
@@ -40,42 +38,20 @@ import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.ShutdownHookManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 
 // This will register Hbase entities into Atlas
 public class HBaseAtlasHook extends AtlasHook {
     private static final Logger LOG = 
LoggerFactory.getLogger(HBaseAtlasHook.class);
 
-    public static final  String CONF_PREFIX      = "atlas.hook.hbase.";
-    public static final  String HOOK_NUM_RETRIES = CONF_PREFIX + "numRetries";
-    public static final  String QUEUE_SIZE       = CONF_PREFIX + "queueSize";
-    public static final  String CONF_SYNC        = CONF_PREFIX + "synchronous";
-    private static final String MIN_THREADS      = CONF_PREFIX + "minThreads";
-    private static final String MAX_THREADS      = CONF_PREFIX + "maxThreads";
-    private static final String KEEP_ALIVE_TIME  = CONF_PREFIX + 
"keepAliveTime";
-
-    private static final int  minThreadsDefault    = 5;
-    private static final int  maxThreadsDefault    = 5;
-    private static final int  queueSizeDefault     = 10000;
-    private static final long keepAliveTimeDefault = 10;
-    // wait time determines how long we wait before we exit the jvm on 
shutdown. Pending requests after that will not be sent.
-    private static final int  WAIT_TIME            = 3;
-    private static boolean         sync;
-    private static ExecutorService executor;
 
     public static final String HBASE_CLUSTER_NAME   = "atlas.cluster.name";
     public static final String DEFAULT_CLUSTER_NAME = "primary";
@@ -147,43 +123,6 @@ public class HBaseAtlasHook extends AtlasHook {
         }
     }
 
-    static {
-        try {
-            // initialize the async facility to process hook calls. We don't
-            // want to do this inline since it adds plenty of overhead for the 
query.
-            int  minThreads    = atlasProperties.getInt(MIN_THREADS, 
minThreadsDefault);
-            int  maxThreads    = atlasProperties.getInt(MAX_THREADS, 
maxThreadsDefault);
-            int  queueSize     = atlasProperties.getInt(QUEUE_SIZE, 
queueSizeDefault);
-            long keepAliveTime = atlasProperties.getLong(KEEP_ALIVE_TIME, 
keepAliveTimeDefault);
-
-            sync = atlasProperties.getBoolean(CONF_SYNC, false);
-            executor = new ThreadPoolExecutor(minThreads, maxThreads, 
keepAliveTime, TimeUnit.MILLISECONDS,
-                                              new 
LinkedBlockingQueue<Runnable>(queueSize), 
-                                              new 
ThreadFactoryBuilder().setNameFormat("Atlas Logger %d").build());
-
-            ShutdownHookManager.get().addShutdownHook(new Thread() {
-                @Override
-                public void run() {
-                    try {
-                        LOG.info("==> Shutdown of Atlas HBase Hook");
-                        executor.shutdown();
-                        executor.awaitTermination(WAIT_TIME, TimeUnit.SECONDS);
-                        executor = null;
-                    } catch (InterruptedException ie) {
-                        LOG.info("Interrupt received in shutdown.", ie);
-                    } finally {
-                        LOG.info("<== Shutdown of Atlas HBase Hook");
-                    }
-                    // shutdown client
-                }
-            }, AtlasConstants.ATLAS_SHUTDOWN_HOOK_PRIORITY);
-        } catch (Exception e) {
-            LOG.error("Caught exception initializing the Atlas HBase hook.", 
e);
-        }
-
-        LOG.info("Created Atlas Hook for HBase");
-    }
-
     public static HBaseAtlasHook getInstance() {
         HBaseAtlasHook ret = me;
 
@@ -212,11 +151,6 @@ public class HBaseAtlasHook extends AtlasHook {
         this.clusterName = clusterName;
     }
 
-    @Override
-    protected String getNumberOfRetriesPropertyKey() {
-        return HOOK_NUM_RETRIES;
-    }
-
 
     public void createAtlasInstances(HBaseOperationContext 
hbaseOperationContext) {
         HBaseAtlasHook.OPERATION operation = 
hbaseOperationContext.getOperation();
@@ -571,103 +505,21 @@ public class HBaseAtlasHook extends AtlasHook {
         return ret;
     }
 
-    private void notifyAsPrivilegedAction(final HBaseOperationContext 
hbaseOperationContext) {
+    public void sendHBaseNameSpaceOperation(final NamespaceDescriptor 
namespaceDescriptor, final String nameSpace, final OPERATION operation) {
         if (LOG.isDebugEnabled()) {
-            LOG.debug("==> HBaseAtlasHook.notifyAsPrivilegedAction({})", 
hbaseOperationContext);
+            LOG.debug("==> HBaseAtlasHook.sendHBaseNameSpaceOperation()");
         }
 
-        final List<HookNotification> messages = 
hbaseOperationContext.getMessages();
-
         try {
-            PrivilegedExceptionAction<Object> privilegedNotify = new 
PrivilegedExceptionAction<Object>() {
-                @Override
-                public Object run() {
-                    notifyEntities(messages);
-                    return hbaseOperationContext;
-                }
-            };
-
-            //Notify as 'hbase' service user in doAs mode
-            UserGroupInformation realUser         = 
hbaseOperationContext.getUgi().getRealUser();
-            String               numberOfMessages = 
Integer.toString(messages.size());
-            String               operation        = 
hbaseOperationContext.getOperation().toString();
-            String               user             = 
hbaseOperationContext.getUgi().getShortUserName();
+            HBaseOperationContext hbaseOperationContext = 
handleHBaseNameSpaceOperation(namespaceDescriptor, nameSpace, operation);
 
-            if (realUser != null) {
-                LOG.info("Sending notification for event {} as service user {} 
#messages {}", operation, realUser.getShortUserName(), numberOfMessages);
-
-                realUser.doAs(privilegedNotify);
-            } else {
-                LOG.info("Sending notification for event {} as service user {} 
#messages {}", operation, user, numberOfMessages);
-
-                hbaseOperationContext.getUgi().doAs(privilegedNotify);
-            }
-        } catch (Throwable e) {
-            LOG.error("Error during notify {} ", 
hbaseOperationContext.getOperation(), e);
-        }
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("<== HBaseAtlasHook.notifyAsPrivilegedAction()");
+            sendNotification(hbaseOperationContext);
+        } catch (Throwable t) {
+            LOG.error("HBaseAtlasHook.sendHBaseNameSpaceOperation(): failed to 
send notification", t);
         }
-    }
 
-    /**
-     * Notify atlas of the entity through message. The entity can be a
-     * complex entity with reference to other entities.
-     * De-duping of entities is done on server side depending on the
-     * unique attribute on the entities.
-     *
-     * @param messages hook notification messages
-     */
-    protected void notifyEntities(List<HookNotification> messages) {
-        final int maxRetries = atlasProperties.getInt(HOOK_NUM_RETRIES, 3);
-        notifyEntities(messages, maxRetries);
-    }
-
-    public void sendHBaseNameSpaceOperation(final NamespaceDescriptor 
namespaceDescriptor, final String nameSpace, final OPERATION operation) {
         if (LOG.isDebugEnabled()) {
-            LOG.debug("==> HBaseAtlasHook.sendHBaseNameSpaceOperation()");
-        }
-        try {
-            final UserGroupInformation ugi                   = getUGI();
-            HBaseOperationContext      hbaseOperationContext = null;
-            if (executor == null) {
-                hbaseOperationContext = 
handleHBaseNameSpaceOperation(namespaceDescriptor, nameSpace, operation);
-                if (hbaseOperationContext != null) {
-                    notifyAsPrivilegedAction(hbaseOperationContext);
-                }
-            } else {
-                executor.submit(new Runnable() {
-                    HBaseOperationContext hbaseOperationContext = null;
-
-                    @Override
-                    public void run() {
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("==> 
HBaseAtlasHook.sendHBaseNameSpaceOperation():executor.submit()");
-                        }
-                        if (ugi != null) {
-                            try {
-                                ugi.doAs(new 
PrivilegedExceptionAction<Object>() {
-                                    @Override
-                                    public Object run() {
-                                        hbaseOperationContext = 
handleHBaseNameSpaceOperation(namespaceDescriptor, nameSpace, operation);
-                                        return hbaseOperationContext;
-                                    }
-                                });
-                                
notifyAsPrivilegedAction(hbaseOperationContext);
-                                if (LOG.isDebugEnabled()) {
-                                    LOG.debug("<== 
HBaseAtlasHook.sendHBaseNameSpaceOperation(){}",  hbaseOperationContext);
-                                }
-                            } catch (Throwable e) {
-                                LOG.error("<== 
HBaseAtlasHook.sendHBaseNameSpaceOperation(): Atlas hook failed due to error ", 
e);
-                            }
-                        } else {
-                            LOG.error("<== 
HBaseAtlasHook.sendHBaseNameSpaceOperation(): Atlas hook failed, 
UserGroupInformation cannot be NULL!");
-                        }
-                    }
-                });
-            }
-        } catch (Throwable t) {
-            LOG.error("<== HBaseAtlasHook.sendHBaseNameSpaceOperation(): 
Submitting to thread pool failed due to error ", t);
+            LOG.debug("<== HBaseAtlasHook.sendHBaseNameSpaceOperation()");
         }
     }
 
@@ -675,50 +527,17 @@ public class HBaseAtlasHook extends AtlasHook {
         if (LOG.isDebugEnabled()) {
             LOG.debug("==> HBaseAtlasHook.sendHBaseTableOperation()");
         }
+
         try {
-            final UserGroupInformation ugi                   = getUGI();
-            HBaseOperationContext      hbaseOperationContext = null;
-            if (executor == null) {
-                hbaseOperationContext = 
handleHBaseTableOperation(hTableDescriptor, tableName, operation);
-                if (hbaseOperationContext != null) {
-                    notifyAsPrivilegedAction(hbaseOperationContext);
-                }
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("<== 
HBaseAtlasHook.sendHBaseTableOperation(){}",  hbaseOperationContext);
-                }
-            } else {
-                executor.submit(new Runnable() {
-                    HBaseOperationContext hbaseOperationContext = null;
-
-                    @Override
-                    public void run() {
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("==> 
HBaseAtlasHook.sendHBaseTableOperation():executor.submit()");
-                        }
-                        if (ugi != null) {
-                            try {
-                                ugi.doAs(new 
PrivilegedExceptionAction<Object>() {
-                                    @Override
-                                    public Object run() {
-                                        hbaseOperationContext = 
handleHBaseTableOperation(hTableDescriptor, tableName, operation);
-                                        return hbaseOperationContext;
-                                    }
-                                });
-                                
notifyAsPrivilegedAction(hbaseOperationContext);
-                                if (LOG.isDebugEnabled()) {
-                                    LOG.debug("<== 
HBaseAtlasHook.sendHBaseTableOperation(){}",  hbaseOperationContext);
-                                }
-                            } catch (Throwable e) {
-                                LOG.error("<== 
HBaseAtlasHook.sendHBaseTableOperation(): Atlas hook failed due to error ", e);
-                            }
-                        } else {
-                            LOG.error("<== 
HBaseAtlasHook.sendHBasecolumnFamilyOperation(): Atlas hook failed, 
UserGroupInformation cannot be NULL!");
-                        }
-                    }
-                });
-            }
+            HBaseOperationContext hbaseOperationContext = 
handleHBaseTableOperation(hTableDescriptor, tableName, operation);
+
+            sendNotification(hbaseOperationContext);
         } catch (Throwable t) {
-            LOG.error("<== HBaseAtlasHook.sendHBaseTableOperation(): 
Submitting to thread pool failed due to error ", t);
+            LOG.error("<== HBaseAtlasHook.sendHBaseTableOperation(): failed to 
send notification", t);
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== HBaseAtlasHook.sendHBaseTableOperation()");
         }
     }
 
@@ -726,54 +545,30 @@ public class HBaseAtlasHook extends AtlasHook {
         if (LOG.isDebugEnabled()) {
             LOG.debug("==> HBaseAtlasHook.sendHBaseColumnFamilyOperation()");
         }
+
         try {
-            final UserGroupInformation ugi                   = getUGI();
-            HBaseOperationContext      hbaseOperationContext = null;
-            if (executor == null) {
-                hbaseOperationContext = 
handleHBaseColumnFamilyOperation(hColumnDescriptor, tableName, columnFamily, 
operation);
-                if (hbaseOperationContext != null) {
-                    notifyAsPrivilegedAction(hbaseOperationContext);
-                }
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("<== 
HBaseAtlasHook.sendHBaseColumnFamilyOperation(){}",  hbaseOperationContext);
-                }
-            } else {
-                executor.submit(new Runnable() {
-                    HBaseOperationContext hbaseOperationContext = null;
-
-                    @Override
-                    public void run() {
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("==> 
HBaseAtlasHook.sendHBaseColumnFamilyOperation():executor.submit()");
-                        }
-                        if (ugi != null) {
-                            try {
-                                ugi.doAs(new 
PrivilegedExceptionAction<Object>() {
-                                    @Override
-                                    public Object run() {
-                                        hbaseOperationContext = 
handleHBaseColumnFamilyOperation(hColumnDescriptor, tableName, columnFamily, 
operation);
-                                        return hbaseOperationContext;
-                                    }
-                                });
-                                
notifyAsPrivilegedAction(hbaseOperationContext);
-                                if (LOG.isDebugEnabled()) {
-                                    LOG.debug("<== 
HBaseAtlasHook.sendHBaseColumnFamilyOperation(){}",  hbaseOperationContext);
-                                }
-                            } catch (Throwable e) {
-                                LOG.error("<== 
HBaseAtlasHook.sendHBaseColumnFamilyOperation(): Atlas hook failed due to error 
", e);
-                            }
-                        } else {
-                            LOG.error("<== 
HBaseAtlasHook.sendHBaseColumnFamilyOperation(): Atlas hook failed, 
UserGroupInformation cannot be NULL!");
-                        }
+            HBaseOperationContext hbaseOperationContext = 
handleHBaseColumnFamilyOperation(hColumnDescriptor, tableName, columnFamily, 
operation);
 
-                    }
-                });
-            }
+            sendNotification(hbaseOperationContext);
         } catch (Throwable t) {
-            LOG.error("<== HBaseAtlasHook.sendHBaseColumnFamilyOperation(): 
Submitting to thread pool failed due to error ", t);
+            LOG.error("<== HBaseAtlasHook.sendHBaseColumnFamilyOperation(): 
failed to send notification", t);
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== HBaseAtlasHook.sendHBaseColumnFamilyOperation()");
         }
     }
 
+    private void sendNotification(HBaseOperationContext hbaseOperationContext) 
{
+        UserGroupInformation ugi = hbaseOperationContext.getUgi();
+
+        if (ugi != null && ugi.getRealUser() != null) {
+            ugi = ugi.getRealUser();
+        }
+
+        notifyEntities(hbaseOperationContext.getMessages(), ugi);
+    }
+
     private HBaseOperationContext 
handleHBaseNameSpaceOperation(NamespaceDescriptor namespaceDescriptor, String 
nameSpace, OPERATION operation) {
         if (LOG.isDebugEnabled()) {
             LOG.debug("==> HBaseAtlasHook.handleHBaseNameSpaceOperation()");

http://git-wip-us.apache.org/repos/asf/atlas/blob/52ef9e7f/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
----------------------------------------------------------------------
diff --git 
a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java 
b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
index 78f2e83..b1ffd1d 100644
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
@@ -26,6 +26,8 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext;
 import org.apache.hadoop.hive.ql.hooks.HookContext;
 import org.apache.hadoop.hive.ql.plan.HiveOperation;
+import org.apache.hadoop.hive.shims.Utils;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,7 +45,6 @@ public class HiveHook extends AtlasHook implements 
ExecuteWithHookContext {
     private static final Logger LOG = LoggerFactory.getLogger(HiveHook.class);
 
     public static final String CONF_PREFIX                    = 
"atlas.hook.hive.";
-    public static final String HOOK_NUM_RETRIES               = CONF_PREFIX + 
"numRetries";
     public static final String HOOK_DATABASE_NAME_CACHE_COUNT = CONF_PREFIX + 
"database.name.cache.count";
     public static final String HOOK_TABLE_NAME_CACHE_COUNT    = CONF_PREFIX + 
"table.name.cache.count";
     public static final String CONF_CLUSTER_NAME              = 
"atlas.cluster.name";
@@ -72,11 +73,6 @@ public class HiveHook extends AtlasHook implements 
ExecuteWithHookContext {
     }
 
     @Override
-    protected String getNumberOfRetriesPropertyKey() {
-        return HOOK_NUM_RETRIES;
-    }
-
-    @Override
     public void run(HookContext hookContext) throws Exception {
         if (LOG.isDebugEnabled()) {
             LOG.debug("==> HiveHook.run({})", hookContext.getOperationName());
@@ -153,7 +149,9 @@ public class HiveHook extends AtlasHook implements 
ExecuteWithHookContext {
             }
 
             if (event != null) {
-                super.notifyEntities(event.getNotificationMessages());
+                final UserGroupInformation ugi = hookContext.getUgi() == null 
? Utils.getUGI() : hookContext.getUgi();
+
+                super.notifyEntities(event.getNotificationMessages(), ugi);
             }
         } catch (Throwable t) {
             LOG.error("HiveHook.run(): failed to process operation {}", 
hookContext.getOperationName(), t);

http://git-wip-us.apache.org/repos/asf/atlas/blob/52ef9e7f/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java
----------------------------------------------------------------------
diff --git 
a/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java 
b/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java
index ac49601..5397a4b 100644
--- 
a/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java
+++ 
b/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java
@@ -52,8 +52,6 @@ import java.util.Date;
 public class SqoopHook extends SqoopJobDataPublisher {
     private static final Logger LOG = LoggerFactory.getLogger(SqoopHook.class);
 
-    public static final String CONF_PREFIX          = "atlas.hook.sqoop.";
-    public static final String HOOK_NUM_RETRIES     = CONF_PREFIX + 
"numRetries";
     public static final String ATLAS_CLUSTER_NAME   = "atlas.cluster.name";
     public static final String DEFAULT_CLUSTER_NAME = "primary";
 
@@ -71,8 +69,12 @@ public class SqoopHook extends SqoopJobDataPublisher {
     public static final String OUTPUTS        = "outputs";
     public static final String ATTRIBUTE_DB   = "db";
 
+    private static final AtlasHookImpl atlasHook;
+
     static {
         
org.apache.hadoop.conf.Configuration.addDefaultResource("sqoop-site.xml");
+
+        atlasHook = new AtlasHookImpl();
     }
 
     @Override
@@ -95,7 +97,7 @@ public class SqoopHook extends SqoopJobDataPublisher {
 
             HookNotification message = new 
EntityCreateRequestV2(AtlasHook.getUser(), entities);
 
-            AtlasHook.notifyEntities(Collections.singletonList(message), 
atlasProperties.getInt(HOOK_NUM_RETRIES, 3));
+            atlasHook.sendNotification(message);
         } catch(Exception e) {
             LOG.error("SqoopHook.publish() failed", e);
 
@@ -225,4 +227,10 @@ public class SqoopHook extends SqoopJobDataPublisher {
 
         return name.toString();
     }
+
+    private static class AtlasHookImpl extends AtlasHook {
+        public void sendNotification(HookNotification notification) {
+            super.notifyEntities(Collections.singletonList(notification), 
null);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/atlas/blob/52ef9e7f/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
----------------------------------------------------------------------
diff --git 
a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
 
b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
index 17493f6..97668a3 100644
--- 
a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
+++ 
b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
@@ -63,17 +63,10 @@ import java.util.Date;
 public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
     public static final Logger LOG = 
org.slf4j.LoggerFactory.getLogger(StormAtlasHook.class);
 
-    private static final String CONF_PREFIX             = "atlas.hook.storm.";
-    private static final String HOOK_NUM_RETRIES        = CONF_PREFIX + 
"numRetries";
     public  static final String ANONYMOUS_OWNER         = "anonymous"; // if 
Storm topology does not contain the owner instance; possible if Storm is 
running in unsecure mode.
     public  static final String HBASE_NAMESPACE_DEFAULT = "default";
     public  static final String ATTRIBUTE_DB            = "db";
 
-    @Override
-    protected String getNumberOfRetriesPropertyKey() {
-        return HOOK_NUM_RETRIES;
-    }
-
     /**
      * This is the client-side hook that storm fires when a topology is added.
      *
@@ -106,7 +99,7 @@ public class StormAtlasHook extends AtlasHook implements 
ISubmitterHook {
 
             List<HookNotification> hookNotifications = 
Collections.singletonList(new EntityCreateRequestV2(user, entity));
 
-            notifyEntities(hookNotifications);
+            notifyEntities(hookNotifications, null);
         } catch (Exception e) {
             throw new RuntimeException("Atlas hook is unable to process the 
topology.", e);
         }

http://git-wip-us.apache.org/repos/asf/atlas/blob/52ef9e7f/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java 
b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
index bf6a36c..406f679 100644
--- a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
+++ b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
@@ -19,46 +19,57 @@
 package org.apache.atlas.hook;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasConstants;
 import org.apache.atlas.kafka.NotificationProvider;
 import org.apache.atlas.model.notification.HookNotification;
-import org.apache.atlas.v1.model.instance.Referenceable;
 import org.apache.atlas.notification.NotificationException;
 import org.apache.atlas.notification.NotificationInterface;
-import 
org.apache.atlas.v1.model.notification.HookNotificationV1.EntityCreateRequest;
 import org.apache.atlas.security.InMemoryJAASConfiguration;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ShutdownHookManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.ArrayList;
+import java.security.PrivilegedExceptionAction;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 
 /**
  * A base class for atlas hooks.
  */
 public abstract class AtlasHook {
-
     private static final Logger LOG = LoggerFactory.getLogger(AtlasHook.class);
 
-    protected static Configuration atlasProperties;
-
+    public static final String ATLAS_NOTIFICATION_ASYNCHRONOUS                 
   = "atlas.notification.hook.asynchronous";
+    public static final String ATLAS_NOTIFICATION_ASYNCHRONOUS_MIN_THREADS     
   = "atlas.notification.hook.asynchronous.minThreads";
+    public static final String ATLAS_NOTIFICATION_ASYNCHRONOUS_MAX_THREADS     
   = "atlas.notification.hook.asynchronous.maxThreads";
+    public static final String 
ATLAS_NOTIFICATION_ASYNCHRONOUS_KEEP_ALIVE_TIME_MS = 
"atlas.notification.hook.asynchronous.keepAliveTimeMs";
+    public static final String ATLAS_NOTIFICATION_ASYNCHRONOUS_QUEUE_SIZE      
   = "atlas.notification.hook.asynchronous.queueSize";
+    public static final String ATLAS_NOTIFICATION_MAX_RETRIES                  
   = "atlas.notification.hook.retry.maxRetries";
+    public static final String ATLAS_NOTIFICATION_RETRY_INTERVAL               
   = "atlas.notification.hook.retry.interval";
+    public static final String ATLAS_NOTIFICATION_FAILED_MESSAGES_FILENAME_KEY 
   = "atlas.notification.failed.messages.filename";
+    public static final String 
ATLAS_NOTIFICATION_LOG_FAILED_MESSAGES_ENABLED_KEY = 
"atlas.notification.log.failed.messages";
+    public static final String ATLAS_HOOK_FAILED_MESSAGES_LOG_DEFAULT_NAME     
   = "atlas_hook_failed_messages.log";
+
+    protected static Configuration         atlasProperties;
     protected static NotificationInterface notificationInterface;
 
-    private static boolean logFailedMessages;
-    private static FailedMessagesLogger failedMessagesLogger;
-    private static int notificationRetryInterval;
-    public static final String ATLAS_NOTIFICATION_RETRY_INTERVAL = 
"atlas.notification.hook.retry.interval";
+    private static final int                  SHUTDOWN_HOOK_WAIT_TIME_MS = 
3000;
+    private static final boolean              logFailedMessages;
+    private static final FailedMessagesLogger failedMessagesLogger;
+    private static final int                  notificationMaxRetries;
+    private static final int                  notificationRetryInterval;
+    private static       ExecutorService      executor = null;
 
-    public static final String ATLAS_NOTIFICATION_FAILED_MESSAGES_FILENAME_KEY 
=
-            "atlas.notification.failed.messages.filename";
-    public static final String ATLAS_HOOK_FAILED_MESSAGES_LOG_DEFAULT_NAME = 
"atlas_hook_failed_messages.log";
-    public static final String 
ATLAS_NOTIFICATION_LOG_FAILED_MESSAGES_ENABLED_KEY =
-            "atlas.notification.log.failed.messages";
 
     static {
         try {
@@ -67,12 +78,15 @@ public abstract class AtlasHook {
             LOG.info("Failed to load application properties", e);
         }
 
-        String failedMessageFile = 
atlasProperties.getString(ATLAS_NOTIFICATION_FAILED_MESSAGES_FILENAME_KEY,
-                ATLAS_HOOK_FAILED_MESSAGES_LOG_DEFAULT_NAME);
+        String failedMessageFile = 
atlasProperties.getString(ATLAS_NOTIFICATION_FAILED_MESSAGES_FILENAME_KEY, 
ATLAS_HOOK_FAILED_MESSAGES_LOG_DEFAULT_NAME);
+
         logFailedMessages = 
atlasProperties.getBoolean(ATLAS_NOTIFICATION_LOG_FAILED_MESSAGES_ENABLED_KEY, 
true);
+
         if (logFailedMessages) {
             failedMessagesLogger = new FailedMessagesLogger(failedMessageFile);
             failedMessagesLogger.init();
+        } else {
+            failedMessagesLogger = null;
         }
 
         if (!isLoginKeytabBased()) {
@@ -81,8 +95,9 @@ public abstract class AtlasHook {
             }
         }
 
+        notificationMaxRetries    = 
atlasProperties.getInt(ATLAS_NOTIFICATION_MAX_RETRIES, 3);
         notificationRetryInterval = 
atlasProperties.getInt(ATLAS_NOTIFICATION_RETRY_INTERVAL, 1000);
-        notificationInterface = NotificationProvider.get();
+        notificationInterface     = NotificationProvider.get();
 
         String currentUser = "";
 
@@ -94,15 +109,37 @@ public abstract class AtlasHook {
 
         notificationInterface.setCurrentUser(currentUser);
 
-        LOG.info("Created Atlas Hook");
-    }
+        boolean isAsync = 
atlasProperties.getBoolean(ATLAS_NOTIFICATION_ASYNCHRONOUS, Boolean.TRUE);
+
+        if (isAsync) {
+            int  minThreads      = 
atlasProperties.getInt(ATLAS_NOTIFICATION_ASYNCHRONOUS_MIN_THREADS, 1);
+            int  maxThreads      = 
atlasProperties.getInt(ATLAS_NOTIFICATION_ASYNCHRONOUS_MAX_THREADS, 5);
+            long keepAliveTimeMs = 
atlasProperties.getLong(ATLAS_NOTIFICATION_ASYNCHRONOUS_KEEP_ALIVE_TIME_MS, 
10000);
+            int  queueSize       = 
atlasProperties.getInt(ATLAS_NOTIFICATION_ASYNCHRONOUS_QUEUE_SIZE, 10000);
 
-    protected abstract String getNumberOfRetriesPropertyKey();
+            executor = new ThreadPoolExecutor(minThreads, maxThreads, 
keepAliveTimeMs, TimeUnit.MILLISECONDS,
+                                              new 
LinkedBlockingDeque<>(queueSize),
+                                              new 
ThreadFactoryBuilder().setNameFormat("Atlas Notifier %d").build());
+
+            ShutdownHookManager.get().addShutdownHook(new Thread() {
+                @Override
+                public void run() {
+                    try {
+                        LOG.info("==> Shutdown of Atlas Hook");
+
+                        executor.shutdown();
+                        executor.awaitTermination(SHUTDOWN_HOOK_WAIT_TIME_MS, 
TimeUnit.MILLISECONDS);
+                        executor = null;
+                    } catch (InterruptedException excp) {
+                        LOG.info("Interrupt received in shutdown.", excp);
+                    } finally {
+                        LOG.info("<== Shutdown of Atlas Hook");
+                    }
+                }
+            }, AtlasConstants.ATLAS_SHUTDOWN_HOOK_PRIORITY);
+        }
 
-    protected void notifyEntities(String user, List<Referenceable> entities) {
-        List<HookNotification> hookNotifications = new ArrayList<>();
-        hookNotifications.add(new EntityCreateRequest(user, entities));
-        notifyEntities(hookNotifications);
+        LOG.info("Created Atlas Hook");
     }
 
     /**
@@ -114,48 +151,78 @@ public abstract class AtlasHook {
      * @param messages   hook notification messages
      * @param maxRetries maximum number of retries while sending message to 
messaging system
      */
-    public static void notifyEntities(List<HookNotification> messages, int 
maxRetries) {
-        notifyEntitiesInternal(messages, maxRetries, notificationInterface, 
logFailedMessages, failedMessagesLogger);
+    public static void notifyEntities(List<HookNotification> messages, 
UserGroupInformation ugi, int maxRetries) {
+        if (executor == null) { // send synchronously
+            notifyEntitiesInternal(messages, maxRetries, ugi, 
notificationInterface, logFailedMessages, failedMessagesLogger);
+        } else {
+            executor.submit(new Runnable() {
+                @Override
+                public void run() {
+                    notifyEntitiesInternal(messages, maxRetries, ugi, 
notificationInterface, logFailedMessages, failedMessagesLogger);
+                }
+            });
+        }
     }
 
     @VisibleForTesting
-    static void notifyEntitiesInternal(List<HookNotification> messages, int 
maxRetries,
+    static void notifyEntitiesInternal(List<HookNotification> messages, int 
maxRetries, UserGroupInformation ugi,
                                        NotificationInterface 
notificationInterface,
                                        boolean shouldLogFailedMessages, 
FailedMessagesLogger logger) {
         if (messages == null || messages.isEmpty()) {
             return;
         }
 
-        final String message = messages.toString();
-        int numRetries = 0;
-        while (true) {
-            try {
-                
notificationInterface.send(NotificationInterface.NotificationType.HOOK, 
messages);
-                return;
-            } catch (Exception e) {
-                numRetries++;
-                if (numRetries < maxRetries) {
-                    LOG.error("Failed to send notification - attempt #{}; 
error={}", numRetries, e.getMessage());
-                    try {
-                        LOG.debug("Sleeping for {} ms before retry", 
notificationRetryInterval);
-                        Thread.sleep(notificationRetryInterval);
-                    } catch (InterruptedException ie) {
-                        LOG.error("Notification hook thread sleep 
interrupted");
-                    }
+        final int    maxAttempts         = maxRetries < 1 ? 1 : maxRetries;
+        final String message             = messages.toString();
+        Exception    notificationFailure = null;
+
+        for (int numAttempt = 1; numAttempt <= maxAttempts; numAttempt++) {
+            if (numAttempt > 1) { // retry attempt
+                try {
+                    LOG.debug("Sleeping for {} ms before retry", 
notificationRetryInterval);
+
+                    Thread.sleep(notificationRetryInterval);
+                } catch (InterruptedException ie) {
+                    LOG.error("Notification hook thread sleep interrupted");
 
+                    break;
+                }
+            }
+
+            try {
+                if (ugi == null) {
+                    
notificationInterface.send(NotificationInterface.NotificationType.HOOK, 
messages);
                 } else {
-                    if (shouldLogFailedMessages && e instanceof 
NotificationException) {
-                        List<String> failedMessages = ((NotificationException) 
e).getFailedMessages();
-                        for (String msg : failedMessages) {
-                            logger.log(msg);
+                    PrivilegedExceptionAction<Object> privilegedNotify = new 
PrivilegedExceptionAction<Object>() {
+                        @Override
+                        public Object run() throws Exception {
+                            
notificationInterface.send(NotificationInterface.NotificationType.HOOK, 
messages);
+                            return messages;
                         }
-                    }
-                    LOG.error("Failed to notify atlas for entity {} after {} 
retries. Quitting",
-                            message, maxRetries, e);
-                    return;
+                    };
+
+                    ugi.doAs(privilegedNotify);
                 }
+
+                notificationFailure = null; // notification sent successfully, 
reset error
+
+                break;
+            } catch (Exception e) {
+                notificationFailure = e;
+
+                LOG.error("Failed to send notification - attempt #{}; 
error={}", numAttempt, e.getMessage());
             }
         }
+
+        if (shouldLogFailedMessages && notificationFailure instanceof 
NotificationException) {
+            final List<String> failedMessages = ((NotificationException) 
notificationFailure).getFailedMessages();
+
+            for (String msg : failedMessages) {
+                logger.log(msg);
+            }
+
+            LOG.error("Giving up after {} failed attempts to send notification 
to Atlas: {}", maxAttempts, message, notificationFailure);
+        }
     }
 
     /**
@@ -166,9 +233,8 @@ public abstract class AtlasHook {
      *
      * @param messages hook notification messages
      */
-    protected void notifyEntities(List<HookNotification> messages) {
-        final int maxRetries = 
atlasProperties.getInt(getNumberOfRetriesPropertyKey(), 3);
-        notifyEntities(messages, maxRetries);
+    protected void notifyEntities(List<HookNotification> messages, 
UserGroupInformation ugi) {
+        notifyEntities(messages, ugi, notificationMaxRetries);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/atlas/blob/52ef9e7f/notification/src/test/java/org/apache/atlas/hook/AtlasHookTest.java
----------------------------------------------------------------------
diff --git 
a/notification/src/test/java/org/apache/atlas/hook/AtlasHookTest.java 
b/notification/src/test/java/org/apache/atlas/hook/AtlasHookTest.java
index 0a0620f..1ae7c27 100644
--- a/notification/src/test/java/org/apache/atlas/hook/AtlasHookTest.java
+++ b/notification/src/test/java/org/apache/atlas/hook/AtlasHookTest.java
@@ -55,7 +55,7 @@ public class AtlasHookTest {
         List<HookNotification> hookNotifications = new ArrayList<>();
         doThrow(new NotificationException(new 
Exception())).when(notificationInterface)
                 .send(NotificationInterface.NotificationType.HOOK, 
hookNotifications);
-        AtlasHook.notifyEntitiesInternal(hookNotifications, 0, 
notificationInterface, false,
+        AtlasHook.notifyEntitiesInternal(hookNotifications, 0, null, 
notificationInterface, false,
                 failedMessagesLogger);
         // if we've reached here, the method finished OK.
     }
@@ -69,7 +69,7 @@ public class AtlasHookTest {
             };
         doThrow(new NotificationException(new 
Exception())).when(notificationInterface)
                 .send(NotificationInterface.NotificationType.HOOK, 
hookNotifications);
-        AtlasHook.notifyEntitiesInternal(hookNotifications, 2, 
notificationInterface, false,
+        AtlasHook.notifyEntitiesInternal(hookNotifications, 2, null, 
notificationInterface, false,
                 failedMessagesLogger);
 
         verify(notificationInterface, times(2)).
@@ -86,7 +86,7 @@ public class AtlasHookTest {
         doThrow(new NotificationException(new Exception(), Arrays.asList("test 
message")))
                 .when(notificationInterface)
                 .send(NotificationInterface.NotificationType.HOOK, 
hookNotifications);
-        AtlasHook.notifyEntitiesInternal(hookNotifications, 2, 
notificationInterface, true,
+        AtlasHook.notifyEntitiesInternal(hookNotifications, 2, null, 
notificationInterface, true,
                 failedMessagesLogger);
 
         verify(failedMessagesLogger, times(1)).log("test message");
@@ -98,7 +98,7 @@ public class AtlasHookTest {
         doThrow(new NotificationException(new Exception(), Arrays.asList("test 
message")))
                 .when(notificationInterface)
                 .send(NotificationInterface.NotificationType.HOOK, 
hookNotifications);
-        AtlasHook.notifyEntitiesInternal(hookNotifications, 2, 
notificationInterface, false,
+        AtlasHook.notifyEntitiesInternal(hookNotifications, 2, null, 
notificationInterface, false,
                 failedMessagesLogger);
 
         verifyZeroInteractions(failedMessagesLogger);
@@ -114,7 +114,7 @@ public class AtlasHookTest {
         doThrow(new NotificationException(new Exception(), Arrays.asList("test 
message1", "test message2")))
                 .when(notificationInterface)
                 .send(NotificationInterface.NotificationType.HOOK, 
hookNotifications);
-        AtlasHook.notifyEntitiesInternal(hookNotifications, 2, 
notificationInterface, true,
+        AtlasHook.notifyEntitiesInternal(hookNotifications, 2, null, 
notificationInterface, true,
                 failedMessagesLogger);
 
         verify(failedMessagesLogger, times(1)).log("test message1");
@@ -126,7 +126,7 @@ public class AtlasHookTest {
         List<HookNotification> hookNotifications = new ArrayList<>();
         doThrow(new RuntimeException("test 
message")).when(notificationInterface)
                 .send(NotificationInterface.NotificationType.HOOK, 
hookNotifications);
-        AtlasHook.notifyEntitiesInternal(hookNotifications, 2, 
notificationInterface, true,
+        AtlasHook.notifyEntitiesInternal(hookNotifications, 2, null, 
notificationInterface, true,
                 failedMessagesLogger);
 
         verifyZeroInteractions(failedMessagesLogger);

Reply via email to