Repository: atlas
Updated Branches:
  refs/heads/branch-0.8 9da91296f -> 9bdbb3184


ATLAS-2823: updated hooks to support asynchronous notifications

(cherry picked from commit 52ef9e7f3962da2bf77a15ab1a1bcef8d47b2877)


Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/9bdbb318
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/9bdbb318
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/9bdbb318

Branch: refs/heads/branch-0.8
Commit: 9bdbb31842c93412ebfe27d98bd8dcbabe7d80f4
Parents: 9da9129
Author: Madhan Neethiraj <mad...@apache.org>
Authored: Tue Aug 14 16:18:19 2018 -0700
Committer: Madhan Neethiraj <mad...@apache.org>
Committed: Thu Aug 16 18:27:39 2018 -0700

----------------------------------------------------------------------
 .../apache/atlas/falcon/hook/FalconHook.java    |  85 +-----
 .../atlas/hbase/bridge/HBaseAtlasHook.java      | 274 +++----------------
 .../org/apache/atlas/hive/hook/HiveHook.java    |  12 +-
 .../org/apache/atlas/sqoop/hook/SqoopHook.java  |  14 +-
 .../apache/atlas/storm/hook/StormAtlasHook.java | 138 +++++-----
 .../java/org/apache/atlas/hook/AtlasHook.java   | 193 ++++++++-----
 .../org/apache/atlas/hook/AtlasHookTest.java    |  12 +-
 7 files changed, 250 insertions(+), 478 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/atlas/blob/9bdbb318/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java
----------------------------------------------------------------------
diff --git a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java
index 188b754..6876595 100644
--- a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java
+++ b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java
@@ -18,30 +18,22 @@
 
 package org.apache.atlas.falcon.hook;
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.atlas.AtlasConstants;
 import org.apache.atlas.falcon.bridge.FalconBridge;
 import org.apache.atlas.falcon.event.FalconEvent;
 import org.apache.atlas.falcon.publisher.FalconEventPublisher;
 import org.apache.atlas.hook.AtlasHook;
-import org.apache.atlas.kafka.NotificationProvider;
 import org.apache.atlas.notification.hook.HookNotification;
 import org.apache.atlas.typesystem.Referenceable;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.process.Process;
-import org.apache.hadoop.util.ShutdownHookManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 
 /**
  * Falcon hook sends lineage information to the Atlas Service.
@@ -49,27 +41,6 @@ import java.util.concurrent.TimeUnit;
 public class FalconHook extends AtlasHook implements FalconEventPublisher {
     private static final Logger LOG = 
LoggerFactory.getLogger(FalconHook.class);
 
-    public static final String CONF_PREFIX = "atlas.hook.falcon.";
-    private static final String MIN_THREADS = CONF_PREFIX + "minThreads";
-    private static final String MAX_THREADS = CONF_PREFIX + "maxThreads";
-    private static final String KEEP_ALIVE_TIME = CONF_PREFIX + 
"keepAliveTime";
-    public static final String QUEUE_SIZE = CONF_PREFIX + "queueSize";
-    public static final String CONF_SYNC = CONF_PREFIX + "synchronous";
-
-    public static final String HOOK_NUM_RETRIES = CONF_PREFIX + "numRetries";
-
-    // wait time determines how long we wait before we exit the jvm on
-    // shutdown. Pending requests after that will not be sent.
-    private static final int WAIT_TIME = 3;
-    private static ExecutorService executor;
-
-    private static final int minThreadsDefault = 5;
-    private static final int maxThreadsDefault = 5;
-    private static final long keepAliveTimeDefault = 10;
-    private static final int queueSizeDefault = 10000;
-
-    private static boolean sync;
-
     private static ConfigurationStore STORE;
 
     private enum Operation {
@@ -79,45 +50,11 @@ public class FalconHook extends AtlasHook implements 
FalconEventPublisher {
 
     static {
         try {
-            // initialize the async facility to process hook calls. We don't
-            // want to do this inline since it adds plenty of overhead for the 
query.
-            int minThreads = atlasProperties.getInt(MIN_THREADS, 
minThreadsDefault);
-            int maxThreads = atlasProperties.getInt(MAX_THREADS, 
maxThreadsDefault);
-            long keepAliveTime = atlasProperties.getLong(KEEP_ALIVE_TIME, 
keepAliveTimeDefault);
-            int queueSize = atlasProperties.getInt(QUEUE_SIZE, 
queueSizeDefault);
-            sync = atlasProperties.getBoolean(CONF_SYNC, false);
-
-            executor = new ThreadPoolExecutor(minThreads, maxThreads, 
keepAliveTime, TimeUnit.MILLISECONDS,
-                    new LinkedBlockingQueue<Runnable>(queueSize),
-                    new ThreadFactoryBuilder().setNameFormat("Atlas Logger 
%d").build());
-
-            ShutdownHookManager.get().addShutdownHook(new Thread() {
-                @Override
-                public void run() {
-                    try {
-                        LOG.info("==> Shutdown of Atlas Falcon Hook");
-
-                        executor.shutdown();
-                        executor.awaitTermination(WAIT_TIME, TimeUnit.SECONDS);
-                        executor = null;
-                    } catch (InterruptedException ie) {
-                        LOG.info("Interrupt received in shutdown.");
-                    } finally {
-                        LOG.info("<== Shutdown of Atlas Falcon Hook");
-                    }
-                    // shutdown client
-                }
-            }, AtlasConstants.ATLAS_SHUTDOWN_HOOK_PRIORITY);
-
             STORE = ConfigurationStore.get();
-
-            notificationInterface = NotificationProvider.get();
-
         } catch (Exception e) {
             LOG.error("Caught exception initializing the falcon hook.", e);
         }
 
-
         LOG.info("Created Atlas Hook for Falcon");
     }
 
@@ -125,30 +62,12 @@ public class FalconHook extends AtlasHook implements 
FalconEventPublisher {
     public void publish(final Data data) {
         final FalconEvent event = data.getEvent();
         try {
-            if (sync) {
-                fireAndForget(event);
-            } else {
-                executor.submit(new Runnable() {
-                    @Override
-                    public void run() {
-                        try {
-                            fireAndForget(event);
-                        } catch (Throwable e) {
-                            LOG.info("Atlas hook failed", e);
-                        }
-                    }
-                });
-            }
+            fireAndForget(event);
         } catch (Throwable t) {
             LOG.warn("Error in processing data {}", data, t);
         }
     }
 
-    @Override
-    protected String getNumberOfRetriesPropertyKey() {
-        return HOOK_NUM_RETRIES;
-    }
-
     private void fireAndForget(FalconEvent event) throws FalconException, 
URISyntaxException {
         LOG.info("Entered Atlas hook for Falcon hook operation {}", 
event.getOperation());
         List<HookNotification.HookNotificationMessage> messages = new 
ArrayList<>();
@@ -162,7 +81,7 @@ public class FalconHook extends AtlasHook implements 
FalconEventPublisher {
             break;
 
         }
-        notifyEntities(messages);
+        notifyEntities(messages, null);
     }
 
     private List<Referenceable> createEntities(FalconEvent event, String user) 
throws FalconException, URISyntaxException {

http://git-wip-us.apache.org/repos/asf/atlas/blob/9bdbb318/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/bridge/HBaseAtlasHook.java
----------------------------------------------------------------------
diff --git a/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/bridge/HBaseAtlasHook.java b/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/bridge/HBaseAtlasHook.java
index 314ff68..d4758a2 100644
--- a/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/bridge/HBaseAtlasHook.java
+++ b/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/bridge/HBaseAtlasHook.java
@@ -18,7 +18,6 @@
 
 package org.apache.atlas.hbase.bridge;
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.atlas.AtlasConstants;
 import org.apache.atlas.hbase.model.HBaseOperationContext;
 import org.apache.atlas.hbase.model.HBaseDataTypes;
@@ -29,7 +28,6 @@ import org.apache.atlas.model.instance.AtlasObjectId;
 import 
org.apache.atlas.notification.hook.HookNotification.EntityCreateRequestV2;
 import 
org.apache.atlas.notification.hook.HookNotification.EntityDeleteRequestV2;
 import 
org.apache.atlas.notification.hook.HookNotification.EntityUpdateRequestV2;
-import 
org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage;
 import org.apache.atlas.type.AtlasTypeUtil;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.configuration.Configuration;
@@ -40,42 +38,20 @@ import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.ShutdownHookManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 
 // This will register Hbase entities into Atlas
 public class HBaseAtlasHook extends AtlasHook {
     private static final Logger LOG = 
LoggerFactory.getLogger(HBaseAtlasHook.class);
 
-    public static final  String CONF_PREFIX      = "atlas.hook.hbase.";
-    public static final  String HOOK_NUM_RETRIES = CONF_PREFIX + "numRetries";
-    public static final  String QUEUE_SIZE       = CONF_PREFIX + "queueSize";
-    public static final  String CONF_SYNC        = CONF_PREFIX + "synchronous";
-    private static final String MIN_THREADS      = CONF_PREFIX + "minThreads";
-    private static final String MAX_THREADS      = CONF_PREFIX + "maxThreads";
-    private static final String KEEP_ALIVE_TIME  = CONF_PREFIX + 
"keepAliveTime";
-
-    private static final int  minThreadsDefault    = 5;
-    private static final int  maxThreadsDefault    = 5;
-    private static final int  queueSizeDefault     = 10000;
-    private static final long keepAliveTimeDefault = 10;
-    // wait time determines how long we wait before we exit the jvm on 
shutdown. Pending requests after that will not be sent.
-    private static final int  WAIT_TIME            = 3;
-    private static boolean         sync;
-    private static ExecutorService executor;
 
     public static final String HBASE_CLUSTER_NAME   = "atlas.cluster.name";
     public static final String DEFAULT_CLUSTER_NAME = "primary";
@@ -147,43 +123,6 @@ public class HBaseAtlasHook extends AtlasHook {
         }
     }
 
-    static {
-        try {
-            // initialize the async facility to process hook calls. We don't
-            // want to do this inline since it adds plenty of overhead for the 
query.
-            int  minThreads    = atlasProperties.getInt(MIN_THREADS, 
minThreadsDefault);
-            int  maxThreads    = atlasProperties.getInt(MAX_THREADS, 
maxThreadsDefault);
-            int  queueSize     = atlasProperties.getInt(QUEUE_SIZE, 
queueSizeDefault);
-            long keepAliveTime = atlasProperties.getLong(KEEP_ALIVE_TIME, 
keepAliveTimeDefault);
-
-            sync = atlasProperties.getBoolean(CONF_SYNC, false);
-            executor = new ThreadPoolExecutor(minThreads, maxThreads, 
keepAliveTime, TimeUnit.MILLISECONDS,
-                                              new 
LinkedBlockingQueue<Runnable>(queueSize), 
-                                              new 
ThreadFactoryBuilder().setNameFormat("Atlas Logger %d").build());
-
-            ShutdownHookManager.get().addShutdownHook(new Thread() {
-                @Override
-                public void run() {
-                    try {
-                        LOG.info("==> Shutdown of Atlas HBase Hook");
-                        executor.shutdown();
-                        executor.awaitTermination(WAIT_TIME, TimeUnit.SECONDS);
-                        executor = null;
-                    } catch (InterruptedException ie) {
-                        LOG.info("Interrupt received in shutdown.", ie);
-                    } finally {
-                        LOG.info("<== Shutdown of Atlas HBase Hook");
-                    }
-                    // shutdown client
-                }
-            }, AtlasConstants.ATLAS_SHUTDOWN_HOOK_PRIORITY);
-        } catch (Exception e) {
-            LOG.error("Caught exception initializing the Atlas HBase hook.", 
e);
-        }
-
-        LOG.info("Created Atlas Hook for HBase");
-    }
-
     public static HBaseAtlasHook getInstance() {
         HBaseAtlasHook ret = me;
 
@@ -212,11 +151,6 @@ public class HBaseAtlasHook extends AtlasHook {
         this.clusterName = clusterName;
     }
 
-    @Override
-    protected String getNumberOfRetriesPropertyKey() {
-        return HOOK_NUM_RETRIES;
-    }
-
 
     public void createAtlasInstances(HBaseOperationContext 
hbaseOperationContext) {
         HBaseAtlasHook.OPERATION operation = 
hbaseOperationContext.getOperation();
@@ -571,104 +505,21 @@ public class HBaseAtlasHook extends AtlasHook {
         return ret;
     }
 
-    private void notifyAsPrivilegedAction(final HBaseOperationContext 
hbaseOperationContext) {
+    public void sendHBaseNameSpaceOperation(final NamespaceDescriptor 
namespaceDescriptor, final String nameSpace, final OPERATION operation) {
         if (LOG.isDebugEnabled()) {
-            LOG.debug("==> HBaseAtlasHook.notifyAsPrivilegedAction({})", 
hbaseOperationContext);
+            LOG.debug("==> HBaseAtlasHook.sendHBaseNameSpaceOperation()");
         }
 
-        final List<HookNotificationMessage> messages = 
hbaseOperationContext.getMessages();
-
-
         try {
-            PrivilegedExceptionAction<Object> privilegedNotify = new 
PrivilegedExceptionAction<Object>() {
-                @Override
-                public Object run() {
-                    notifyEntities(messages);
-                    return hbaseOperationContext;
-                }
-            };
+            HBaseOperationContext hbaseOperationContext = 
handleHBaseNameSpaceOperation(namespaceDescriptor, nameSpace, operation);
 
-            //Notify as 'hbase' service user in doAs mode
-            UserGroupInformation realUser         = 
hbaseOperationContext.getUgi().getRealUser();
-            String               numberOfMessages = 
Integer.toString(messages.size());
-            String               operation        = 
hbaseOperationContext.getOperation().toString();
-            String               user             = 
hbaseOperationContext.getUgi().getShortUserName();
-
-            if (realUser != null) {
-                LOG.info("Sending notification for event {} as service user {} 
#messages {}", operation, realUser.getShortUserName(), numberOfMessages);
-
-                realUser.doAs(privilegedNotify);
-            } else {
-                LOG.info("Sending notification for event {} as service user {} 
#messages {}", operation, user, numberOfMessages);
-
-                hbaseOperationContext.getUgi().doAs(privilegedNotify);
-            }
-        } catch (Throwable e) {
-            LOG.error("Error during notify {} ", 
hbaseOperationContext.getOperation(), e);
-        }
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("<== HBaseAtlasHook.notifyAsPrivilegedAction()");
+            sendNotification(hbaseOperationContext);
+        } catch (Throwable t) {
+            LOG.error("HBaseAtlasHook.sendHBaseNameSpaceOperation(): failed to 
send notification", t);
         }
-    }
 
-    /**
-     * Notify atlas of the entity through message. The entity can be a
-     * complex entity with reference to other entities.
-     * De-duping of entities is done on server side depending on the
-     * unique attribute on the entities.
-     *
-     * @param messages hook notification messages
-     */
-    protected void notifyEntities(List<HookNotificationMessage> messages) {
-        final int maxRetries = atlasProperties.getInt(HOOK_NUM_RETRIES, 3);
-        notifyEntities(messages, maxRetries);
-    }
-
-    public void sendHBaseNameSpaceOperation(final NamespaceDescriptor 
namespaceDescriptor, final String nameSpace, final OPERATION operation) {
         if (LOG.isDebugEnabled()) {
-            LOG.debug("==> HBaseAtlasHook.sendHBaseNameSpaceOperation()");
-        }
-        try {
-            final UserGroupInformation ugi                   = getUGI();
-            HBaseOperationContext      hbaseOperationContext = null;
-            if (executor == null) {
-                hbaseOperationContext = 
handleHBaseNameSpaceOperation(namespaceDescriptor, nameSpace, operation);
-                if (hbaseOperationContext != null) {
-                    notifyAsPrivilegedAction(hbaseOperationContext);
-                }
-            } else {
-                executor.submit(new Runnable() {
-                    HBaseOperationContext hbaseOperationContext = null;
-
-                    @Override
-                    public void run() {
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("==> 
HBaseAtlasHook.sendHBaseNameSpaceOperation():executor.submit()");
-                        }
-                        if (ugi != null) {
-                            try {
-                                ugi.doAs(new 
PrivilegedExceptionAction<Object>() {
-                                    @Override
-                                    public Object run() {
-                                        hbaseOperationContext = 
handleHBaseNameSpaceOperation(namespaceDescriptor, nameSpace, operation);
-                                        return hbaseOperationContext;
-                                    }
-                                });
-                                
notifyAsPrivilegedAction(hbaseOperationContext);
-                                if (LOG.isDebugEnabled()) {
-                                    LOG.debug("<== 
HBaseAtlasHook.sendHBaseNameSpaceOperation(){}",  hbaseOperationContext);
-                                }
-                            } catch (Throwable e) {
-                                LOG.error("<== 
HBaseAtlasHook.sendHBaseNameSpaceOperation(): Atlas hook failed due to error ", 
e);
-                            }
-                        } else {
-                            LOG.error("<== 
HBaseAtlasHook.sendHBaseNameSpaceOperation(): Atlas hook failed, 
UserGroupInformation cannot be NULL!");
-                        }
-                    }
-                });
-            }
-        } catch (Throwable t) {
-            LOG.error("<== HBaseAtlasHook.sendHBaseNameSpaceOperation(): 
Submitting to thread pool failed due to error ", t);
+            LOG.debug("<== HBaseAtlasHook.sendHBaseNameSpaceOperation()");
         }
     }
 
@@ -676,50 +527,17 @@ public class HBaseAtlasHook extends AtlasHook {
         if (LOG.isDebugEnabled()) {
             LOG.debug("==> HBaseAtlasHook.sendHBaseTableOperation()");
         }
+
         try {
-            final UserGroupInformation ugi                   = getUGI();
-            HBaseOperationContext      hbaseOperationContext = null;
-            if (executor == null) {
-                hbaseOperationContext = 
handleHBaseTableOperation(hTableDescriptor, tableName, operation);
-                if (hbaseOperationContext != null) {
-                    notifyAsPrivilegedAction(hbaseOperationContext);
-                }
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("<== 
HBaseAtlasHook.sendHBaseTableOperation(){}",  hbaseOperationContext);
-                }
-            } else {
-                executor.submit(new Runnable() {
-                    HBaseOperationContext hbaseOperationContext = null;
-
-                    @Override
-                    public void run() {
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("==> 
HBaseAtlasHook.sendHBaseTableOperation():executor.submit()");
-                        }
-                        if (ugi != null) {
-                            try {
-                                ugi.doAs(new 
PrivilegedExceptionAction<Object>() {
-                                    @Override
-                                    public Object run() {
-                                        hbaseOperationContext = 
handleHBaseTableOperation(hTableDescriptor, tableName, operation);
-                                        return hbaseOperationContext;
-                                    }
-                                });
-                                
notifyAsPrivilegedAction(hbaseOperationContext);
-                                if (LOG.isDebugEnabled()) {
-                                    LOG.debug("<== 
HBaseAtlasHook.sendHBaseTableOperation(){}",  hbaseOperationContext);
-                                }
-                            } catch (Throwable e) {
-                                LOG.error("<== 
HBaseAtlasHook.sendHBaseTableOperation(): Atlas hook failed due to error ", e);
-                            }
-                        } else {
-                            LOG.error("<== 
HBaseAtlasHook.sendHBasecolumnFamilyOperation(): Atlas hook failed, 
UserGroupInformation cannot be NULL!");
-                        }
-                    }
-                });
-            }
+            HBaseOperationContext hbaseOperationContext = 
handleHBaseTableOperation(hTableDescriptor, tableName, operation);
+
+            sendNotification(hbaseOperationContext);
         } catch (Throwable t) {
-            LOG.error("<== HBaseAtlasHook.sendHBaseTableOperation(): 
Submitting to thread pool failed due to error ", t);
+            LOG.error("<== HBaseAtlasHook.sendHBaseTableOperation(): failed to 
send notification", t);
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== HBaseAtlasHook.sendHBaseTableOperation()");
         }
     }
 
@@ -727,54 +545,30 @@ public class HBaseAtlasHook extends AtlasHook {
         if (LOG.isDebugEnabled()) {
             LOG.debug("==> HBaseAtlasHook.sendHBaseColumnFamilyOperation()");
         }
+
         try {
-            final UserGroupInformation ugi                   = getUGI();
-            HBaseOperationContext      hbaseOperationContext = null;
-            if (executor == null) {
-                hbaseOperationContext = 
handleHBaseColumnFamilyOperation(hColumnDescriptor, tableName, columnFamily, 
operation);
-                if (hbaseOperationContext != null) {
-                    notifyAsPrivilegedAction(hbaseOperationContext);
-                }
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("<== 
HBaseAtlasHook.sendHBaseColumnFamilyOperation(){}",  hbaseOperationContext);
-                }
-            } else {
-                executor.submit(new Runnable() {
-                    HBaseOperationContext hbaseOperationContext = null;
-
-                    @Override
-                    public void run() {
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("==> 
HBaseAtlasHook.sendHBaseColumnFamilyOperation():executor.submit()");
-                        }
-                        if (ugi != null) {
-                            try {
-                                ugi.doAs(new 
PrivilegedExceptionAction<Object>() {
-                                    @Override
-                                    public Object run() {
-                                        hbaseOperationContext = 
handleHBaseColumnFamilyOperation(hColumnDescriptor, tableName, columnFamily, 
operation);
-                                        return hbaseOperationContext;
-                                    }
-                                });
-                                
notifyAsPrivilegedAction(hbaseOperationContext);
-                                if (LOG.isDebugEnabled()) {
-                                    LOG.debug("<== 
HBaseAtlasHook.sendHBaseColumnFamilyOperation(){}",  hbaseOperationContext);
-                                }
-                            } catch (Throwable e) {
-                                LOG.error("<== 
HBaseAtlasHook.sendHBaseColumnFamilyOperation(): Atlas hook failed due to error 
", e);
-                            }
-                        } else {
-                            LOG.error("<== 
HBaseAtlasHook.sendHBaseColumnFamilyOperation(): Atlas hook failed, 
UserGroupInformation cannot be NULL!");
-                        }
+            HBaseOperationContext hbaseOperationContext = 
handleHBaseColumnFamilyOperation(hColumnDescriptor, tableName, columnFamily, 
operation);
 
-                    }
-                });
-            }
+            sendNotification(hbaseOperationContext);
         } catch (Throwable t) {
-            LOG.error("<== HBaseAtlasHook.sendHBaseColumnFamilyOperation(): 
Submitting to thread pool failed due to error ", t);
+            LOG.error("<== HBaseAtlasHook.sendHBaseColumnFamilyOperation(): 
failed to send notification", t);
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== HBaseAtlasHook.sendHBaseColumnFamilyOperation()");
         }
     }
 
+    private void sendNotification(HBaseOperationContext hbaseOperationContext) 
{
+        UserGroupInformation ugi = hbaseOperationContext.getUgi();
+
+        if (ugi != null && ugi.getRealUser() != null) {
+            ugi = ugi.getRealUser();
+        }
+
+        notifyEntities(hbaseOperationContext.getMessages(), ugi);
+    }
+
     private HBaseOperationContext 
handleHBaseNameSpaceOperation(NamespaceDescriptor namespaceDescriptor, String 
nameSpace, OPERATION operation) {
         if (LOG.isDebugEnabled()) {
             LOG.debug("==> HBaseAtlasHook.handleHBaseNameSpaceOperation()");

http://git-wip-us.apache.org/repos/asf/atlas/blob/9bdbb318/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
index e001a06..a0a4410 100644
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
@@ -26,6 +26,8 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext;
 import org.apache.hadoop.hive.ql.hooks.HookContext;
 import org.apache.hadoop.hive.ql.plan.HiveOperation;
+import org.apache.hadoop.hive.shims.Utils;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,7 +45,6 @@ public class HiveHook extends AtlasHook implements 
ExecuteWithHookContext {
     private static final Logger LOG = LoggerFactory.getLogger(HiveHook.class);
 
     public static final String CONF_PREFIX                    = 
"atlas.hook.hive.";
-    public static final String HOOK_NUM_RETRIES               = CONF_PREFIX + 
"numRetries";
     public static final String HOOK_DATABASE_NAME_CACHE_COUNT = CONF_PREFIX + 
"database.name.cache.count";
     public static final String HOOK_TABLE_NAME_CACHE_COUNT    = CONF_PREFIX + 
"table.name.cache.count";
     public static final String CONF_CLUSTER_NAME              = 
"atlas.cluster.name";
@@ -73,11 +74,6 @@ public class HiveHook extends AtlasHook implements 
ExecuteWithHookContext {
     }
 
     @Override
-    protected String getNumberOfRetriesPropertyKey() {
-        return HOOK_NUM_RETRIES;
-    }
-
-    @Override
     public void run(HookContext hookContext) throws Exception {
         if (LOG.isDebugEnabled()) {
             LOG.debug("==> HiveHook.run({})", hookContext.getOperationName());
@@ -154,7 +150,9 @@ public class HiveHook extends AtlasHook implements 
ExecuteWithHookContext {
             }
 
             if (event != null) {
-                super.notifyEntities(event.getNotificationMessages());
+                final UserGroupInformation ugi = hookContext.getUgi() == null 
? Utils.getUGI() : hookContext.getUgi();
+
+                super.notifyEntities(event.getNotificationMessages(), ugi);
             }
         } catch (Throwable t) {
             LOG.error("HiveHook.run(): failed to process operation {}", 
hookContext.getOperationName(), t);

http://git-wip-us.apache.org/repos/asf/atlas/blob/9bdbb318/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java
----------------------------------------------------------------------
diff --git a/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java b/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java
index 666ec13..77468ce 100644
--- a/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java
+++ b/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java
@@ -52,8 +52,6 @@ import java.util.Date;
 public class SqoopHook extends SqoopJobDataPublisher {
     private static final Logger LOG = LoggerFactory.getLogger(SqoopHook.class);
 
-    public static final String CONF_PREFIX          = "atlas.hook.sqoop.";
-    public static final String HOOK_NUM_RETRIES     = CONF_PREFIX + 
"numRetries";
     public static final String ATLAS_CLUSTER_NAME   = "atlas.cluster.name";
     public static final String DEFAULT_CLUSTER_NAME = "primary";
 
@@ -70,8 +68,12 @@ public class SqoopHook extends SqoopJobDataPublisher {
     public static final String INPUTS         = "inputs";
     public static final String OUTPUTS        = "outputs";
 
+    private static final AtlasHookImpl atlasHook;
+
     static {
         
org.apache.hadoop.conf.Configuration.addDefaultResource("sqoop-site.xml");
+
+        atlasHook = new AtlasHookImpl();
     }
 
     @Override
@@ -94,7 +96,7 @@ public class SqoopHook extends SqoopJobDataPublisher {
 
             HookNotificationMessage message = new 
EntityCreateRequestV2(AtlasHook.getUser(), entities);
 
-            AtlasHook.notifyEntities(Collections.singletonList(message), 
atlasProperties.getInt(HOOK_NUM_RETRIES, 3));
+            atlasHook.sendNotification(message);
         } catch(Exception e) {
             LOG.error("SqoopHook.publish() failed", e);
 
@@ -224,4 +226,10 @@ public class SqoopHook extends SqoopJobDataPublisher {
 
         return name.toString();
     }
+
+    private static class AtlasHookImpl extends AtlasHook {
+        public void sendNotification(HookNotificationMessage notification) {
+            super.notifyEntities(Collections.singletonList(notification), 
null);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/atlas/blob/9bdbb318/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
----------------------------------------------------------------------
diff --git a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
index c3df8fc..c47bd1b 100644
--- a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
+++ b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
@@ -56,18 +56,9 @@ public class StormAtlasHook extends AtlasHook implements 
ISubmitterHook {
 
     public static final Logger LOG = 
org.slf4j.LoggerFactory.getLogger(StormAtlasHook.class);
 
-    private static final String CONF_PREFIX = "atlas.hook.storm.";
-    private static final String HOOK_NUM_RETRIES = CONF_PREFIX + "numRetries";
-    // will be used for owner if Storm topology does not contain the owner 
instance
-    // possible if Storm is running in unsecure mode.
-    public static final String ANONYMOUS_OWNER = "anonymous";
-
-    public static final String HBASE_NAMESPACE_DEFAULT = "default";
-
-    @Override
-    protected String getNumberOfRetriesPropertyKey() {
-        return HOOK_NUM_RETRIES;
-    }
+    public  static final String ANONYMOUS_OWNER         = "anonymous"; // if 
Storm topology does not contain the owner instance; possible if Storm is 
running in unsecure mode.
+    public  static final String HBASE_NAMESPACE_DEFAULT = "default";
+    public  static final String ATTRIBUTE_DB            = "db";
 
     /**
      * This is the client-side hook that storm fires when a topology is added.
@@ -77,27 +68,28 @@ public class StormAtlasHook extends AtlasHook implements 
ISubmitterHook {
      * @param stormTopology a storm topology
      */
     @Override
-    public void notify(TopologyInfo topologyInfo, Map stormConf,
-                       StormTopology stormTopology) {
-
+    public void notify(TopologyInfo topologyInfo, Map stormConf, StormTopology 
stormTopology) {
         LOG.info("Collecting metadata for a new storm topology: {}", 
topologyInfo.get_name());
+
         try {
-            ArrayList<Referenceable> entities = new ArrayList<>();
-            Referenceable topologyReferenceable = 
createTopologyInstance(topologyInfo, stormConf);
-            List<Referenceable> dependentEntities = 
addTopologyDataSets(stormTopology, topologyReferenceable,
-                    topologyInfo.get_owner(), stormConf);
-            if (dependentEntities.size()>0) {
+            ArrayList<Referenceable> entities              = new ArrayList<>();
+            Referenceable            topologyReferenceable = 
createTopologyInstance(topologyInfo, stormConf);
+            List<Referenceable>      dependentEntities     = 
addTopologyDataSets(stormTopology, topologyReferenceable, 
topologyInfo.get_owner(), stormConf);
+
+            if (dependentEntities.size() > 0) {
                 entities.addAll(dependentEntities);
             }
+
             // create the graph for the topology
-            ArrayList<Referenceable> graphNodes = createTopologyGraph(
-                    stormTopology, stormTopology.get_spouts(), 
stormTopology.get_bolts());
+            ArrayList<Referenceable> graphNodes = 
createTopologyGraph(stormTopology, stormTopology.get_spouts(), 
stormTopology.get_bolts());
             // add the connection from topology to the graph
             topologyReferenceable.set("nodes", graphNodes);
+
             entities.add(topologyReferenceable);
 
             LOG.debug("notifying entities, size = {}", entities.size());
             String user = getUser(topologyInfo.get_owner(), null);
+
             notifyEntities(user, entities);
         } catch (Exception e) {
             throw new RuntimeException("Atlas hook is unable to process the 
topology.", e);
@@ -105,15 +97,18 @@ public class StormAtlasHook extends AtlasHook implements 
ISubmitterHook {
     }
 
     private Referenceable createTopologyInstance(TopologyInfo topologyInfo, 
Map stormConf) {
-        Referenceable topologyReferenceable = new Referenceable(
-                StormDataTypes.STORM_TOPOLOGY.getName());
+        Referenceable topologyReferenceable = new 
Referenceable(StormDataTypes.STORM_TOPOLOGY.getName());
+
         topologyReferenceable.set("id", topologyInfo.get_id());
         topologyReferenceable.set(AtlasClient.NAME, topologyInfo.get_name());
         topologyReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, 
topologyInfo.get_name());
+
         String owner = topologyInfo.get_owner();
+
         if (StringUtils.isEmpty(owner)) {
             owner = ANONYMOUS_OWNER;
         }
+
         topologyReferenceable.set(AtlasClient.OWNER, owner);
         topologyReferenceable.set("startTime", new 
Date(System.currentTimeMillis()));
         topologyReferenceable.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, 
getClusterName(stormConf));
@@ -121,30 +116,26 @@ public class StormAtlasHook extends AtlasHook implements 
ISubmitterHook {
         return topologyReferenceable;
     }
 
-    private List<Referenceable> addTopologyDataSets(StormTopology 
stormTopology,
-                                                    Referenceable 
topologyReferenceable,
-                                                    String topologyOwner,
-                                                    Map stormConf) {
+    private List<Referenceable> addTopologyDataSets(StormTopology 
stormTopology, Referenceable topologyReferenceable, String topologyOwner, Map 
stormConf) {
         List<Referenceable> dependentEntities = new ArrayList<>();
+
         // add each spout as an input data set
-        addTopologyInputs(topologyReferenceable,
-                stormTopology.get_spouts(), stormConf, topologyOwner, 
dependentEntities);
+        addTopologyInputs(topologyReferenceable, stormTopology.get_spouts(), 
stormConf, topologyOwner, dependentEntities);
+
         // add the appropriate bolts as output data sets
         addTopologyOutputs(topologyReferenceable, stormTopology, 
topologyOwner, stormConf, dependentEntities);
+
         return dependentEntities;
     }
 
-    private void addTopologyInputs(Referenceable topologyReferenceable,
-                                   Map<String, SpoutSpec> spouts,
-                                   Map stormConf,
-                                   String topologyOwner, List<Referenceable> 
dependentEntities) {
+    private void addTopologyInputs(Referenceable topologyReferenceable, 
Map<String, SpoutSpec> spouts, Map stormConf, String topologyOwner, 
List<Referenceable> dependentEntities) {
         final ArrayList<Referenceable> inputDataSets = new ArrayList<>();
-        for (Map.Entry<String, SpoutSpec> entry : spouts.entrySet()) {
-            Serializable instance = Utils.javaDeserialize(
-                    entry.getValue().get_spout_object().get_serialized_java(), 
Serializable.class);
 
-            String simpleName = instance.getClass().getSimpleName();
+        for (Map.Entry<String, SpoutSpec> entry : spouts.entrySet()) {
+            Serializable        instance   = 
Utils.javaDeserialize(entry.getValue().get_spout_object().get_serialized_java(),
 Serializable.class);
+            String              simpleName = 
instance.getClass().getSimpleName();
             final Referenceable datasetRef = createDataSet(simpleName, 
topologyOwner, instance, stormConf, dependentEntities);
+
             if (datasetRef != null) {
                 inputDataSets.add(datasetRef);
             }
@@ -153,19 +144,16 @@ public class StormAtlasHook extends AtlasHook implements 
ISubmitterHook {
         topologyReferenceable.set("inputs", inputDataSets);
     }
 
-    private void addTopologyOutputs(Referenceable topologyReferenceable,
-                                    StormTopology stormTopology, String 
topologyOwner,
-                                    Map stormConf, List<Referenceable> 
dependentEntities) {
-        final ArrayList<Referenceable> outputDataSets = new ArrayList<>();
+    private void addTopologyOutputs(Referenceable topologyReferenceable, 
StormTopology stormTopology, String topologyOwner, Map stormConf, 
List<Referenceable> dependentEntities) {
+        final ArrayList<Referenceable> outputDataSets    = new ArrayList<>();
+        Map<String, Bolt>              bolts             = 
stormTopology.get_bolts();
+        Set<String>                    terminalBoltNames = 
StormTopologyUtil.getTerminalUserBoltNames(stormTopology);
 
-        Map<String, Bolt> bolts = stormTopology.get_bolts();
-        Set<String> terminalBoltNames = 
StormTopologyUtil.getTerminalUserBoltNames(stormTopology);
         for (String terminalBoltName : terminalBoltNames) {
-            Serializable instance = 
Utils.javaDeserialize(bolts.get(terminalBoltName)
-                    .get_bolt_object().get_serialized_java(), 
Serializable.class);
+            Serializable        instance    = 
Utils.javaDeserialize(bolts.get(terminalBoltName).get_bolt_object().get_serialized_java(),
 Serializable.class);
+            String              dataSetType = 
instance.getClass().getSimpleName();
+            final Referenceable datasetRef  = createDataSet(dataSetType, 
topologyOwner, instance, stormConf, dependentEntities);
 
-            String dataSetType = instance.getClass().getSimpleName();
-            final Referenceable datasetRef = createDataSet(dataSetType, 
topologyOwner, instance, stormConf, dependentEntities);
             if (datasetRef != null) {
                 outputDataSets.add(datasetRef);
             }
@@ -255,104 +243,110 @@ public class StormAtlasHook extends AtlasHook 
implements ISubmitterHook {
 
     private String extractComponentClusterName(Configuration configuration, 
Map stormConf) {
         String clusterName = 
configuration.get(AtlasConstants.CLUSTER_NAME_KEY, null);
+
         if (clusterName == null) {
             clusterName = getClusterName(stormConf);
         }
+
         return clusterName;
     }
 
 
-    private ArrayList<Referenceable> createTopologyGraph(StormTopology 
stormTopology,
-                                                         Map<String, 
SpoutSpec> spouts,
-                                                         Map<String, Bolt> 
bolts) {
+    private ArrayList<Referenceable> createTopologyGraph(StormTopology 
stormTopology, Map<String, SpoutSpec> spouts, Map<String, Bolt> bolts) {
         // Add graph of nodes in the topology
         final Map<String, Referenceable> nodeEntities = new HashMap<>();
+
         addSpouts(spouts, nodeEntities);
         addBolts(bolts, nodeEntities);
 
         addGraphConnections(stormTopology, nodeEntities);
 
         ArrayList<Referenceable> nodes = new ArrayList<>();
+
         nodes.addAll(nodeEntities.values());
+
         return nodes;
     }
 
-    private void addSpouts(Map<String, SpoutSpec> spouts,
-                           Map<String, Referenceable> nodeEntities) {
+    private void addSpouts(Map<String, SpoutSpec> spouts, Map<String, 
Referenceable> nodeEntities) {
         for (Map.Entry<String, SpoutSpec> entry : spouts.entrySet()) {
-            final String spoutName = entry.getKey();
-            Referenceable spoutReferenceable = createSpoutInstance(
-                    spoutName, entry.getValue());
+            final String  spoutName          = entry.getKey();
+            Referenceable spoutReferenceable = createSpoutInstance(spoutName, 
entry.getValue());
+
             nodeEntities.put(spoutName, spoutReferenceable);
         }
     }
 
-    private Referenceable createSpoutInstance(String spoutName,
-                                              SpoutSpec stormSpout) {
+    private Referenceable createSpoutInstance(String spoutName, SpoutSpec 
stormSpout) {
         Referenceable spoutReferenceable = new 
Referenceable(StormDataTypes.STORM_SPOUT.getName());
+
         spoutReferenceable.set(AtlasClient.NAME, spoutName);
 
-        Serializable instance = Utils.javaDeserialize(
-                stormSpout.get_spout_object().get_serialized_java(), 
Serializable.class);
+        Serializable instance = 
Utils.javaDeserialize(stormSpout.get_spout_object().get_serialized_java(), 
Serializable.class);
+
         spoutReferenceable.set("driverClass", instance.getClass().getName());
 
         Map<String, String> flatConfigMap = 
StormTopologyUtil.getFieldValues(instance, true, null);
+
         spoutReferenceable.set("conf", flatConfigMap);
 
         return spoutReferenceable;
     }
 
-    private void addBolts(Map<String, Bolt> bolts,
-                          Map<String, Referenceable> nodeEntities) {
+    private void addBolts(Map<String, Bolt> bolts, Map<String, Referenceable> 
nodeEntities) {
         for (Map.Entry<String, Bolt> entry : bolts.entrySet()) {
             Referenceable boltInstance = createBoltInstance(entry.getKey(), 
entry.getValue());
+
             nodeEntities.put(entry.getKey(), boltInstance);
         }
     }
 
-    private Referenceable createBoltInstance(String boltName,
-                                             Bolt stormBolt) {
+    private Referenceable createBoltInstance(String boltName, Bolt stormBolt) {
         Referenceable boltReferenceable = new 
Referenceable(StormDataTypes.STORM_BOLT.getName());
 
         boltReferenceable.set(AtlasClient.NAME, boltName);
 
-        Serializable instance = Utils.javaDeserialize(
-                stormBolt.get_bolt_object().get_serialized_java(), 
Serializable.class);
+        Serializable instance = 
Utils.javaDeserialize(stormBolt.get_bolt_object().get_serialized_java(), 
Serializable.class);
+
         boltReferenceable.set("driverClass", instance.getClass().getName());
 
         Map<String, String> flatConfigMap = 
StormTopologyUtil.getFieldValues(instance, true, null);
+
         boltReferenceable.set("conf", flatConfigMap);
 
         return boltReferenceable;
     }
 
-    private void addGraphConnections(StormTopology stormTopology,
-                                     Map<String, Referenceable> nodeEntities) {
+    private void addGraphConnections(StormTopology stormTopology, Map<String, 
Referenceable> nodeEntities) {
         // adds connections between spouts and bolts
-        Map<String, Set<String>> adjacencyMap =
-                StormTopologyUtil.getAdjacencyMap(stormTopology, true);
+        Map<String, Set<String>> adjacencyMap = 
StormTopologyUtil.getAdjacencyMap(stormTopology, true);
 
         for (Map.Entry<String, Set<String>> entry : adjacencyMap.entrySet()) {
-            String nodeName = entry.getKey();
+            String      nodeName      = entry.getKey();
             Set<String> adjacencyList = adjacencyMap.get(nodeName);
+
             if (adjacencyList == null || adjacencyList.isEmpty()) {
                 continue;
             }
 
             // add outgoing links
-            Referenceable node = nodeEntities.get(nodeName);
+            Referenceable     node    = nodeEntities.get(nodeName);
             ArrayList<String> outputs = new ArrayList<>(adjacencyList.size());
+
             outputs.addAll(adjacencyList);
             node.set("outputs", outputs);
 
             // add incoming links
             for (String adjacentNodeName : adjacencyList) {
                 Referenceable adjacentNode = 
nodeEntities.get(adjacentNodeName);
+
                 @SuppressWarnings("unchecked")
                 ArrayList<String> inputs = (ArrayList<String>) 
adjacentNode.get("inputs");
+
                 if (inputs == null) {
                     inputs = new ArrayList<>();
                 }
+
                 inputs.add(nodeName);
                 adjacentNode.set("inputs", inputs);
             }

http://git-wip-us.apache.org/repos/asf/atlas/blob/9bdbb318/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java 
b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
index 38f3208..8d24035 100644
--- a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
+++ b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
@@ -19,11 +19,14 @@
 package org.apache.atlas.hook;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasConstants;
 import org.apache.atlas.kafka.NotificationProvider;
 import org.apache.atlas.notification.NotificationException;
 import org.apache.atlas.notification.NotificationInterface;
 import org.apache.atlas.notification.hook.HookNotification;
+import 
org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage;
 import org.apache.atlas.security.InMemoryJAASConfiguration;
 import org.apache.atlas.typesystem.Referenceable;
 import org.apache.atlas.typesystem.json.InstanceSerialization;
@@ -31,36 +34,48 @@ import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.codehaus.jettison.json.JSONArray;
+import org.apache.hadoop.util.ShutdownHookManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.security.PrivilegedExceptionAction;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 
 /**
  * A base class for atlas hooks.
  */
 public abstract class AtlasHook {
-
     private static final Logger LOG = LoggerFactory.getLogger(AtlasHook.class);
 
-    protected static Configuration atlasProperties;
-
+    public static final String ATLAS_NOTIFICATION_ASYNCHRONOUS                 
   = "atlas.notification.hook.asynchronous";
+    public static final String ATLAS_NOTIFICATION_ASYNCHRONOUS_MIN_THREADS     
   = "atlas.notification.hook.asynchronous.minThreads";
+    public static final String ATLAS_NOTIFICATION_ASYNCHRONOUS_MAX_THREADS     
   = "atlas.notification.hook.asynchronous.maxThreads";
+    public static final String 
ATLAS_NOTIFICATION_ASYNCHRONOUS_KEEP_ALIVE_TIME_MS = 
"atlas.notification.hook.asynchronous.keepAliveTimeMs";
+    public static final String ATLAS_NOTIFICATION_ASYNCHRONOUS_QUEUE_SIZE      
   = "atlas.notification.hook.asynchronous.queueSize";
+    public static final String ATLAS_NOTIFICATION_MAX_RETRIES                  
   = "atlas.notification.hook.retry.maxRetries";
+    public static final String ATLAS_NOTIFICATION_RETRY_INTERVAL               
   = "atlas.notification.hook.retry.interval";
+    public static final String ATLAS_NOTIFICATION_FAILED_MESSAGES_FILENAME_KEY 
   = "atlas.notification.failed.messages.filename";
+    public static final String 
ATLAS_NOTIFICATION_LOG_FAILED_MESSAGES_ENABLED_KEY = 
"atlas.notification.log.failed.messages";
+    public static final String ATLAS_HOOK_FAILED_MESSAGES_LOG_DEFAULT_NAME     
   = "atlas_hook_failed_messages.log";
+
+    protected static Configuration         atlasProperties;
     protected static NotificationInterface notificationInterface;
 
-    private static boolean logFailedMessages;
-    private static FailedMessagesLogger failedMessagesLogger;
-    private static int notificationRetryInterval;
-    public static final String ATLAS_NOTIFICATION_RETRY_INTERVAL = 
"atlas.notification.hook.retry.interval";
+    private static final int                  SHUTDOWN_HOOK_WAIT_TIME_MS = 
3000;
+    private static final boolean              logFailedMessages;
+    private static final FailedMessagesLogger failedMessagesLogger;
+    private static final int                  notificationMaxRetries;
+    private static final int                  notificationRetryInterval;
+    private static       ExecutorService      executor = null;
 
-    public static final String ATLAS_NOTIFICATION_FAILED_MESSAGES_FILENAME_KEY 
=
-            "atlas.notification.failed.messages.filename";
-    public static final String ATLAS_HOOK_FAILED_MESSAGES_LOG_DEFAULT_NAME = 
"atlas_hook_failed_messages.log";
-    public static final String 
ATLAS_NOTIFICATION_LOG_FAILED_MESSAGES_ENABLED_KEY =
-            "atlas.notification.log.failed.messages";
 
     static {
         try {
@@ -69,12 +84,15 @@ public abstract class AtlasHook {
             LOG.info("Failed to load application properties", e);
         }
 
-        String failedMessageFile = 
atlasProperties.getString(ATLAS_NOTIFICATION_FAILED_MESSAGES_FILENAME_KEY,
-                ATLAS_HOOK_FAILED_MESSAGES_LOG_DEFAULT_NAME);
+        String failedMessageFile = 
atlasProperties.getString(ATLAS_NOTIFICATION_FAILED_MESSAGES_FILENAME_KEY, 
ATLAS_HOOK_FAILED_MESSAGES_LOG_DEFAULT_NAME);
+
         logFailedMessages = 
atlasProperties.getBoolean(ATLAS_NOTIFICATION_LOG_FAILED_MESSAGES_ENABLED_KEY, 
true);
+
         if (logFailedMessages) {
             failedMessagesLogger = new FailedMessagesLogger(failedMessageFile);
             failedMessagesLogger.init();
+        } else {
+            failedMessagesLogger = null;
         }
 
         if (!isLoginKeytabBased()) {
@@ -83,8 +101,9 @@ public abstract class AtlasHook {
             }
         }
 
+        notificationMaxRetries    = 
atlasProperties.getInt(ATLAS_NOTIFICATION_MAX_RETRIES, 3);
         notificationRetryInterval = 
atlasProperties.getInt(ATLAS_NOTIFICATION_RETRY_INTERVAL, 1000);
-        notificationInterface = NotificationProvider.get();
+        notificationInterface     = NotificationProvider.get();
 
         String currentUser = "";
 
@@ -96,11 +115,39 @@ public abstract class AtlasHook {
 
         notificationInterface.setCurrentUser(currentUser);
 
+        boolean isAsync = 
atlasProperties.getBoolean(ATLAS_NOTIFICATION_ASYNCHRONOUS, Boolean.TRUE);
+
+        if (isAsync) {
+            int  minThreads      = 
atlasProperties.getInt(ATLAS_NOTIFICATION_ASYNCHRONOUS_MIN_THREADS, 1);
+            int  maxThreads      = 
atlasProperties.getInt(ATLAS_NOTIFICATION_ASYNCHRONOUS_MAX_THREADS, 5);
+            long keepAliveTimeMs = 
atlasProperties.getLong(ATLAS_NOTIFICATION_ASYNCHRONOUS_KEEP_ALIVE_TIME_MS, 
10000);
+            int  queueSize       = 
atlasProperties.getInt(ATLAS_NOTIFICATION_ASYNCHRONOUS_QUEUE_SIZE, 10000);
+
+            executor = new ThreadPoolExecutor(minThreads, maxThreads, 
keepAliveTimeMs, TimeUnit.MILLISECONDS,
+                                              new 
LinkedBlockingDeque<>(queueSize),
+                                              new 
ThreadFactoryBuilder().setNameFormat("Atlas Notifier %d").build());
+
+            ShutdownHookManager.get().addShutdownHook(new Thread() {
+                @Override
+                public void run() {
+                    try {
+                        LOG.info("==> Shutdown of Atlas Hook");
+
+                        executor.shutdown();
+                        executor.awaitTermination(SHUTDOWN_HOOK_WAIT_TIME_MS, 
TimeUnit.MILLISECONDS);
+                        executor = null;
+                    } catch (InterruptedException excp) {
+                        LOG.info("Interrupt received in shutdown.", excp);
+                    } finally {
+                        LOG.info("<== Shutdown of Atlas Hook");
+                    }
+                }
+            }, AtlasConstants.ATLAS_SHUTDOWN_HOOK_PRIORITY);
+        }
+
         LOG.info("Created Atlas Hook");
     }
 
-    protected abstract String getNumberOfRetriesPropertyKey();
-
     protected void notifyEntities(String user, Collection<Referenceable> 
entities) {
         JSONArray entitiesArray = new JSONArray();
 
@@ -110,75 +157,87 @@ public abstract class AtlasHook {
             entitiesArray.put(entityJson);
         }
 
-        List<HookNotification.HookNotificationMessage> 
hookNotificationMessages = new ArrayList<>();
+        List<HookNotificationMessage> hookNotificationMessages = new 
ArrayList<>();
         hookNotificationMessages.add(new 
HookNotification.EntityCreateRequest(user, entitiesArray));
-        notifyEntities(hookNotificationMessages);
+        notifyEntities(hookNotificationMessages, null, notificationMaxRetries);
     }
 
-    /**
-     * Notify atlas of the entity through message. The entity can be a
-     * complex entity with reference to other entities.
-     * De-duping of entities is done on server side depending on the
-     * unique attribute on the entities.
-     *
-     * @param messages   hook notification messages
-     * @param maxRetries maximum number of retries while sending message to 
messaging system
-     */
-    public static void 
notifyEntities(List<HookNotification.HookNotificationMessage> messages, int 
maxRetries) {
-        notifyEntitiesInternal(messages, maxRetries, notificationInterface, 
logFailedMessages, failedMessagesLogger);
+    public void notifyEntities(List<HookNotificationMessage> messages, 
UserGroupInformation ugi) {
+        notifyEntities(messages, ugi, notificationMaxRetries);
+    }
+
+    public void notifyEntities(List<HookNotificationMessage> messages, 
UserGroupInformation ugi, int maxRetries) {
+        if (executor == null) { // send synchronously
+            notifyEntitiesInternal(messages, maxRetries, ugi, 
notificationInterface, logFailedMessages, failedMessagesLogger);
+        } else {
+            executor.submit(new Runnable() {
+                @Override
+                public void run() {
+                    notifyEntitiesInternal(messages, maxRetries, ugi, 
notificationInterface, logFailedMessages, failedMessagesLogger);
+                }
+            });
+        }
     }
 
     @VisibleForTesting
-    static void 
notifyEntitiesInternal(List<HookNotification.HookNotificationMessage> messages, 
int maxRetries,
+    static void notifyEntitiesInternal(List<HookNotificationMessage> messages, 
int maxRetries, UserGroupInformation ugi,
                                        NotificationInterface 
notificationInterface,
                                        boolean shouldLogFailedMessages, 
FailedMessagesLogger logger) {
         if (messages == null || messages.isEmpty()) {
             return;
         }
 
-        final String message = messages.toString();
-        int numRetries = 0;
-        while (true) {
-            try {
-                
notificationInterface.send(NotificationInterface.NotificationType.HOOK, 
messages);
-                return;
-            } catch (Exception e) {
-                numRetries++;
-                if (numRetries < maxRetries) {
-                    LOG.error("Failed to send notification - attempt #{}; 
error={}", numRetries, e.getMessage());
-                    try {
-                        LOG.debug("Sleeping for {} ms before retry", 
notificationRetryInterval);
-                        Thread.sleep(notificationRetryInterval);
-                    } catch (InterruptedException ie) {
-                        LOG.error("Notification hook thread sleep 
interrupted");
-                    }
+        final int    maxAttempts         = maxRetries < 1 ? 1 : maxRetries;
+        final String message             = messages.toString();
+        Exception    notificationFailure = null;
+
+        for (int numAttempt = 1; numAttempt <= maxAttempts; numAttempt++) {
+            if (numAttempt > 1) { // retry attempt
+                try {
+                    LOG.debug("Sleeping for {} ms before retry", 
notificationRetryInterval);
 
+                    Thread.sleep(notificationRetryInterval);
+                } catch (InterruptedException ie) {
+                    LOG.error("Notification hook thread sleep interrupted");
+
+                    break;
+                }
+            }
+
+            try {
+                if (ugi == null) {
+                    
notificationInterface.send(NotificationInterface.NotificationType.HOOK, 
messages);
                 } else {
-                    if (shouldLogFailedMessages && e instanceof 
NotificationException) {
-                        List<String> failedMessages = ((NotificationException) 
e).getFailedMessages();
-                        for (String msg : failedMessages) {
-                            logger.log(msg);
+                    PrivilegedExceptionAction<Object> privilegedNotify = new 
PrivilegedExceptionAction<Object>() {
+                        @Override
+                        public Object run() throws Exception {
+                            
notificationInterface.send(NotificationInterface.NotificationType.HOOK, 
messages);
+                            return messages;
                         }
-                    }
-                    LOG.error("Failed to notify atlas for entity {} after {} 
retries. Quitting",
-                            message, maxRetries, e);
-                    return;
+                    };
+
+                    ugi.doAs(privilegedNotify);
                 }
+
+                notificationFailure = null; // notification sent successfully, 
reset error
+
+                break;
+            } catch (Exception e) {
+                notificationFailure = e;
+
+                LOG.error("Failed to send notification - attempt #{}; 
error={}", numAttempt, e.getMessage());
             }
         }
-    }
 
-    /**
-     * Notify atlas of the entity through message. The entity can be a
-     * complex entity with reference to other entities.
-     * De-duping of entities is done on server side depending on the
-     * unique attribute on the entities.
-     *
-     * @param messages hook notification messages
-     */
-    protected void 
notifyEntities(List<HookNotification.HookNotificationMessage> messages) {
-        final int maxRetries = 
atlasProperties.getInt(getNumberOfRetriesPropertyKey(), 3);
-        notifyEntities(messages, maxRetries);
+        if (shouldLogFailedMessages && notificationFailure instanceof 
NotificationException) {
+            final List<String> failedMessages = ((NotificationException) 
notificationFailure).getFailedMessages();
+
+            for (String msg : failedMessages) {
+                logger.log(msg);
+            }
+
+            LOG.error("Giving up after {} failed attempts to send notification 
to Atlas: {}", maxAttempts, message, notificationFailure);
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/atlas/blob/9bdbb318/notification/src/test/java/org/apache/atlas/hook/AtlasHookTest.java
----------------------------------------------------------------------
diff --git 
a/notification/src/test/java/org/apache/atlas/hook/AtlasHookTest.java 
b/notification/src/test/java/org/apache/atlas/hook/AtlasHookTest.java
index d59cb1c..39ca686 100644
--- a/notification/src/test/java/org/apache/atlas/hook/AtlasHookTest.java
+++ b/notification/src/test/java/org/apache/atlas/hook/AtlasHookTest.java
@@ -54,7 +54,7 @@ public class AtlasHookTest {
         List<HookNotification.HookNotificationMessage> 
hookNotificationMessages = new ArrayList<>();
         doThrow(new NotificationException(new 
Exception())).when(notificationInterface)
                 .send(NotificationInterface.NotificationType.HOOK, 
hookNotificationMessages);
-        AtlasHook.notifyEntitiesInternal(hookNotificationMessages, 0, 
notificationInterface, false,
+        AtlasHook.notifyEntitiesInternal(hookNotificationMessages, 0, null, 
notificationInterface, false,
                 failedMessagesLogger);
         // if we've reached here, the method finished OK.
     }
@@ -68,7 +68,7 @@ public class AtlasHookTest {
             };
         doThrow(new NotificationException(new 
Exception())).when(notificationInterface)
                 .send(NotificationInterface.NotificationType.HOOK, 
hookNotificationMessages);
-        AtlasHook.notifyEntitiesInternal(hookNotificationMessages, 2, 
notificationInterface, false,
+        AtlasHook.notifyEntitiesInternal(hookNotificationMessages, 2, null, 
notificationInterface, false,
                 failedMessagesLogger);
 
         verify(notificationInterface, times(2)).
@@ -85,7 +85,7 @@ public class AtlasHookTest {
         doThrow(new NotificationException(new Exception(), Arrays.asList("test message")))
                 .when(notificationInterface)
                 .send(NotificationInterface.NotificationType.HOOK, 
hookNotificationMessages);
-        AtlasHook.notifyEntitiesInternal(hookNotificationMessages, 2, 
notificationInterface, true,
+        AtlasHook.notifyEntitiesInternal(hookNotificationMessages, 2, null, 
notificationInterface, true,
                 failedMessagesLogger);
 
         verify(failedMessagesLogger, times(1)).log("test message");
@@ -97,7 +97,7 @@ public class AtlasHookTest {
         doThrow(new NotificationException(new Exception(), Arrays.asList("test message")))
                 .when(notificationInterface)
                 .send(NotificationInterface.NotificationType.HOOK, 
hookNotificationMessages);
-        AtlasHook.notifyEntitiesInternal(hookNotificationMessages, 2, 
notificationInterface, false,
+        AtlasHook.notifyEntitiesInternal(hookNotificationMessages, 2, null, 
notificationInterface, false,
                 failedMessagesLogger);
 
         verifyZeroInteractions(failedMessagesLogger);
@@ -113,7 +113,7 @@ public class AtlasHookTest {
         doThrow(new NotificationException(new Exception(), Arrays.asList("test message1", "test message2")))
                 .when(notificationInterface)
                 .send(NotificationInterface.NotificationType.HOOK, 
hookNotificationMessages);
-        AtlasHook.notifyEntitiesInternal(hookNotificationMessages, 2, 
notificationInterface, true,
+        AtlasHook.notifyEntitiesInternal(hookNotificationMessages, 2, null, 
notificationInterface, true,
                 failedMessagesLogger);
 
         verify(failedMessagesLogger, times(1)).log("test message1");
@@ -125,7 +125,7 @@ public class AtlasHookTest {
         List<HookNotification.HookNotificationMessage> 
hookNotificationMessages = new ArrayList<>();
         doThrow(new RuntimeException("test message")).when(notificationInterface)
                 .send(NotificationInterface.NotificationType.HOOK, 
hookNotificationMessages);
-        AtlasHook.notifyEntitiesInternal(hookNotificationMessages, 2, 
notificationInterface, true,
+        AtlasHook.notifyEntitiesInternal(hookNotificationMessages, 2, null, 
notificationInterface, true,
                 failedMessagesLogger);
 
         verifyZeroInteractions(failedMessagesLogger);

Reply via email to