Repository: atlas Updated Branches: refs/heads/branch-0.8 9da91296f -> 9bdbb3184
ATLAS-2823: updated hooks to support asynchronous notifications (cherry picked from commit 52ef9e7f3962da2bf77a15ab1a1bcef8d47b2877) Project: http://git-wip-us.apache.org/repos/asf/atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/9bdbb318 Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/9bdbb318 Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/9bdbb318 Branch: refs/heads/branch-0.8 Commit: 9bdbb31842c93412ebfe27d98bd8dcbabe7d80f4 Parents: 9da9129 Author: Madhan Neethiraj <mad...@apache.org> Authored: Tue Aug 14 16:18:19 2018 -0700 Committer: Madhan Neethiraj <mad...@apache.org> Committed: Thu Aug 16 18:27:39 2018 -0700 ---------------------------------------------------------------------- .../apache/atlas/falcon/hook/FalconHook.java | 85 +----- .../atlas/hbase/bridge/HBaseAtlasHook.java | 274 +++---------------- .../org/apache/atlas/hive/hook/HiveHook.java | 12 +- .../org/apache/atlas/sqoop/hook/SqoopHook.java | 14 +- .../apache/atlas/storm/hook/StormAtlasHook.java | 138 +++++----- .../java/org/apache/atlas/hook/AtlasHook.java | 193 ++++++++----- .../org/apache/atlas/hook/AtlasHookTest.java | 12 +- 7 files changed, 250 insertions(+), 478 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/atlas/blob/9bdbb318/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java ---------------------------------------------------------------------- diff --git a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java index 188b754..6876595 100644 --- a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java +++ b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java @@ -18,30 +18,22 @@ package org.apache.atlas.falcon.hook; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.apache.atlas.AtlasConstants; import org.apache.atlas.falcon.bridge.FalconBridge; import org.apache.atlas.falcon.event.FalconEvent; import org.apache.atlas.falcon.publisher.FalconEventPublisher; import org.apache.atlas.hook.AtlasHook; -import org.apache.atlas.kafka.NotificationProvider; import org.apache.atlas.notification.hook.HookNotification; import org.apache.atlas.typesystem.Referenceable; import org.apache.falcon.FalconException; import org.apache.falcon.entity.store.ConfigurationStore; import org.apache.falcon.entity.v0.feed.Feed; import org.apache.falcon.entity.v0.process.Process; -import org.apache.hadoop.util.ShutdownHookManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.net.URISyntaxException; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; /** * Falcon hook sends lineage information to the Atlas Service. @@ -49,27 +41,6 @@ import java.util.concurrent.TimeUnit; public class FalconHook extends AtlasHook implements FalconEventPublisher { private static final Logger LOG = LoggerFactory.getLogger(FalconHook.class); - public static final String CONF_PREFIX = "atlas.hook.falcon."; - private static final String MIN_THREADS = CONF_PREFIX + "minThreads"; - private static final String MAX_THREADS = CONF_PREFIX + "maxThreads"; - private static final String KEEP_ALIVE_TIME = CONF_PREFIX + "keepAliveTime"; - public static final String QUEUE_SIZE = CONF_PREFIX + "queueSize"; - public static final String CONF_SYNC = CONF_PREFIX + "synchronous"; - - public static final String HOOK_NUM_RETRIES = CONF_PREFIX + "numRetries"; - - // wait time determines how long we wait before we exit the jvm on - // shutdown. Pending requests after that will not be sent. - private static final int WAIT_TIME = 3; - private static ExecutorService executor; - - private static final int minThreadsDefault = 5; - private static final int maxThreadsDefault = 5; - private static final long keepAliveTimeDefault = 10; - private static final int queueSizeDefault = 10000; - - private static boolean sync; - private static ConfigurationStore STORE; private enum Operation { @@ -79,45 +50,11 @@ public class FalconHook extends AtlasHook implements FalconEventPublisher { static { try { - // initialize the async facility to process hook calls. We don't - // want to do this inline since it adds plenty of overhead for the query. - int minThreads = atlasProperties.getInt(MIN_THREADS, minThreadsDefault); - int maxThreads = atlasProperties.getInt(MAX_THREADS, maxThreadsDefault); - long keepAliveTime = atlasProperties.getLong(KEEP_ALIVE_TIME, keepAliveTimeDefault); - int queueSize = atlasProperties.getInt(QUEUE_SIZE, queueSizeDefault); - sync = atlasProperties.getBoolean(CONF_SYNC, false); - - executor = new ThreadPoolExecutor(minThreads, maxThreads, keepAliveTime, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue<Runnable>(queueSize), - new ThreadFactoryBuilder().setNameFormat("Atlas Logger %d").build()); - - ShutdownHookManager.get().addShutdownHook(new Thread() { - @Override - public void run() { - try { - LOG.info("==> Shutdown of Atlas Falcon Hook"); - - executor.shutdown(); - executor.awaitTermination(WAIT_TIME, TimeUnit.SECONDS); - executor = null; - } catch (InterruptedException ie) { - LOG.info("Interrupt received in shutdown."); - } finally { - LOG.info("<== Shutdown of Atlas Falcon Hook"); - } - // shutdown client - } - }, AtlasConstants.ATLAS_SHUTDOWN_HOOK_PRIORITY); - STORE = ConfigurationStore.get(); - - notificationInterface = NotificationProvider.get(); - } catch (Exception e) { LOG.error("Caught exception initializing the falcon hook.", e); } - LOG.info("Created Atlas Hook for Falcon"); } @@ -125,30 +62,12 @@ public class FalconHook extends AtlasHook implements FalconEventPublisher { public void publish(final Data data) { final FalconEvent event = data.getEvent(); try { - if (sync) { - fireAndForget(event); - } else { - executor.submit(new Runnable() { - @Override - public void run() { - try { - fireAndForget(event); - } catch (Throwable e) { - LOG.info("Atlas hook failed", e); - } - } - }); - } + fireAndForget(event); } catch (Throwable t) { LOG.warn("Error in processing data {}", data, t); } } - @Override - protected String getNumberOfRetriesPropertyKey() { - return HOOK_NUM_RETRIES; - } - private void fireAndForget(FalconEvent event) throws FalconException, URISyntaxException { LOG.info("Entered Atlas hook for Falcon hook operation {}", event.getOperation()); List<HookNotification.HookNotificationMessage> messages = new ArrayList<>(); @@ -162,7 +81,7 @@ public class FalconHook extends AtlasHook implements FalconEventPublisher { break; } - notifyEntities(messages); + notifyEntities(messages, null); } private List<Referenceable> createEntities(FalconEvent event, String user) throws FalconException, URISyntaxException { http://git-wip-us.apache.org/repos/asf/atlas/blob/9bdbb318/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/bridge/HBaseAtlasHook.java ---------------------------------------------------------------------- diff --git a/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/bridge/HBaseAtlasHook.java b/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/bridge/HBaseAtlasHook.java index 314ff68..d4758a2 100644 --- a/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/bridge/HBaseAtlasHook.java +++ b/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/bridge/HBaseAtlasHook.java @@ -18,7 +18,6 @@ package org.apache.atlas.hbase.bridge; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.atlas.AtlasConstants; import org.apache.atlas.hbase.model.HBaseOperationContext; import org.apache.atlas.hbase.model.HBaseDataTypes; @@ -29,7 +28,6 @@ import org.apache.atlas.model.instance.AtlasObjectId; import org.apache.atlas.notification.hook.HookNotification.EntityCreateRequestV2; import org.apache.atlas.notification.hook.HookNotification.EntityDeleteRequestV2; import org.apache.atlas.notification.hook.HookNotification.EntityUpdateRequestV2; -import org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage; import org.apache.atlas.type.AtlasTypeUtil; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.configuration.Configuration; @@ -40,42 +38,20 @@ import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.util.ShutdownHookManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Collections; import java.util.Date; import java.util.List; import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; // This will register Hbase entities into Atlas public class HBaseAtlasHook extends AtlasHook { private static final Logger LOG = LoggerFactory.getLogger(HBaseAtlasHook.class); - public static final String CONF_PREFIX = "atlas.hook.hbase."; - public static final String HOOK_NUM_RETRIES = CONF_PREFIX + "numRetries"; - public static final String QUEUE_SIZE = CONF_PREFIX + "queueSize"; - public static final String CONF_SYNC = CONF_PREFIX + "synchronous"; - private static final String MIN_THREADS = CONF_PREFIX + "minThreads"; - private static final String MAX_THREADS = CONF_PREFIX + "maxThreads"; - private static final String KEEP_ALIVE_TIME = CONF_PREFIX + "keepAliveTime"; - - private static final int minThreadsDefault = 5; - private static final int maxThreadsDefault = 5; - private static final int queueSizeDefault = 10000; - private static final long keepAliveTimeDefault = 10; - // wait time determines how long we wait before we exit the jvm on shutdown. Pending requests after that will not be sent. - private static final int WAIT_TIME = 3; - private static boolean sync; - private static ExecutorService executor; public static final String HBASE_CLUSTER_NAME = "atlas.cluster.name"; public static final String DEFAULT_CLUSTER_NAME = "primary"; @@ -147,43 +123,6 @@ public class HBaseAtlasHook extends AtlasHook { } } - static { - try { - // initialize the async facility to process hook calls. We don't - // want to do this inline since it adds plenty of overhead for the query. - int minThreads = atlasProperties.getInt(MIN_THREADS, minThreadsDefault); - int maxThreads = atlasProperties.getInt(MAX_THREADS, maxThreadsDefault); - int queueSize = atlasProperties.getInt(QUEUE_SIZE, queueSizeDefault); - long keepAliveTime = atlasProperties.getLong(KEEP_ALIVE_TIME, keepAliveTimeDefault); - - sync = atlasProperties.getBoolean(CONF_SYNC, false); - executor = new ThreadPoolExecutor(minThreads, maxThreads, keepAliveTime, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue<Runnable>(queueSize), - new ThreadFactoryBuilder().setNameFormat("Atlas Logger %d").build()); - - ShutdownHookManager.get().addShutdownHook(new Thread() { - @Override - public void run() { - try { - LOG.info("==> Shutdown of Atlas HBase Hook"); - executor.shutdown(); - executor.awaitTermination(WAIT_TIME, TimeUnit.SECONDS); - executor = null; - } catch (InterruptedException ie) { - LOG.info("Interrupt received in shutdown.", ie); - } finally { - LOG.info("<== Shutdown of Atlas HBase Hook"); - } - // shutdown client - } - }, AtlasConstants.ATLAS_SHUTDOWN_HOOK_PRIORITY); - } catch (Exception e) { - LOG.error("Caught exception initializing the Atlas HBase hook.", e); - } - - LOG.info("Created Atlas Hook for HBase"); - } - public static HBaseAtlasHook getInstance() { HBaseAtlasHook ret = me; @@ -212,11 +151,6 @@ public class HBaseAtlasHook extends AtlasHook { this.clusterName = clusterName; } - @Override - protected String getNumberOfRetriesPropertyKey() { - return HOOK_NUM_RETRIES; - } - public void createAtlasInstances(HBaseOperationContext hbaseOperationContext) { HBaseAtlasHook.OPERATION operation = hbaseOperationContext.getOperation(); @@ -571,104 +505,21 @@ public class HBaseAtlasHook extends AtlasHook { return ret; } - private void notifyAsPrivilegedAction(final HBaseOperationContext hbaseOperationContext) { + public void sendHBaseNameSpaceOperation(final NamespaceDescriptor namespaceDescriptor, final String nameSpace, final OPERATION operation) { if (LOG.isDebugEnabled()) { - LOG.debug("==> HBaseAtlasHook.notifyAsPrivilegedAction({})", hbaseOperationContext); + LOG.debug("==> HBaseAtlasHook.sendHBaseNameSpaceOperation()"); } - final List<HookNotificationMessage> messages = hbaseOperationContext.getMessages(); - - try { - PrivilegedExceptionAction<Object> privilegedNotify = new PrivilegedExceptionAction<Object>() { - @Override - public Object run() { - notifyEntities(messages); - return hbaseOperationContext; - } - }; + HBaseOperationContext hbaseOperationContext = handleHBaseNameSpaceOperation(namespaceDescriptor, nameSpace, operation); - //Notify as 'hbase' service user in doAs mode - UserGroupInformation realUser = hbaseOperationContext.getUgi().getRealUser(); - String numberOfMessages = Integer.toString(messages.size()); - String operation = hbaseOperationContext.getOperation().toString(); - String user = hbaseOperationContext.getUgi().getShortUserName(); - - if (realUser != null) { - LOG.info("Sending notification for event {} as service user {} #messages {}", operation, realUser.getShortUserName(), numberOfMessages); - - realUser.doAs(privilegedNotify); - } else { - LOG.info("Sending notification for event {} as service user {} #messages {}", operation, user, numberOfMessages); - - hbaseOperationContext.getUgi().doAs(privilegedNotify); - } - } catch (Throwable e) { - LOG.error("Error during notify {} ", hbaseOperationContext.getOperation(), e); - } - if (LOG.isDebugEnabled()) { - LOG.debug("<== HBaseAtlasHook.notifyAsPrivilegedAction()"); + sendNotification(hbaseOperationContext); + } catch (Throwable t) { + LOG.error("HBaseAtlasHook.sendHBaseNameSpaceOperation(): failed to send notification", t); } - } - /** - * Notify atlas of the entity through message. The entity can be a - * complex entity with reference to other entities. - * De-duping of entities is done on server side depending on the - * unique attribute on the entities. - * - * @param messages hook notification messages - */ - protected void notifyEntities(List<HookNotificationMessage> messages) { - final int maxRetries = atlasProperties.getInt(HOOK_NUM_RETRIES, 3); - notifyEntities(messages, maxRetries); - } - - public void sendHBaseNameSpaceOperation(final NamespaceDescriptor namespaceDescriptor, final String nameSpace, final OPERATION operation) { if (LOG.isDebugEnabled()) { - LOG.debug("==> HBaseAtlasHook.sendHBaseNameSpaceOperation()"); - } - try { - final UserGroupInformation ugi = getUGI(); - HBaseOperationContext hbaseOperationContext = null; - if (executor == null) { - hbaseOperationContext = handleHBaseNameSpaceOperation(namespaceDescriptor, nameSpace, operation); - if (hbaseOperationContext != null) { - notifyAsPrivilegedAction(hbaseOperationContext); - } - } else { - executor.submit(new Runnable() { - HBaseOperationContext hbaseOperationContext = null; - - @Override - public void run() { - if (LOG.isDebugEnabled()) { - LOG.debug("==> HBaseAtlasHook.sendHBaseNameSpaceOperation():executor.submit()"); - } - if (ugi != null) { - try { - ugi.doAs(new PrivilegedExceptionAction<Object>() { - @Override - public Object run() { - hbaseOperationContext = handleHBaseNameSpaceOperation(namespaceDescriptor, nameSpace, operation); - return hbaseOperationContext; - } - }); - notifyAsPrivilegedAction(hbaseOperationContext); - if (LOG.isDebugEnabled()) { - LOG.debug("<== HBaseAtlasHook.sendHBaseNameSpaceOperation(){}", hbaseOperationContext); - } - } catch (Throwable e) { - LOG.error("<== HBaseAtlasHook.sendHBaseNameSpaceOperation(): Atlas hook failed due to error ", e); - } - } else { - LOG.error("<== HBaseAtlasHook.sendHBaseNameSpaceOperation(): Atlas hook failed, UserGroupInformation cannot be NULL!"); - } - } - }); - } - } catch (Throwable t) { - LOG.error("<== HBaseAtlasHook.sendHBaseNameSpaceOperation(): Submitting to thread pool failed due to error ", t); + LOG.debug("<== HBaseAtlasHook.sendHBaseNameSpaceOperation()"); } } @@ -676,50 +527,17 @@ public class HBaseAtlasHook extends AtlasHook { if (LOG.isDebugEnabled()) { LOG.debug("==> HBaseAtlasHook.sendHBaseTableOperation()"); } + try { - final UserGroupInformation ugi = getUGI(); - HBaseOperationContext hbaseOperationContext = null; - if (executor == null) { - hbaseOperationContext = handleHBaseTableOperation(hTableDescriptor, tableName, operation); - if (hbaseOperationContext != null) { - notifyAsPrivilegedAction(hbaseOperationContext); - } - if (LOG.isDebugEnabled()) { - LOG.debug("<== HBaseAtlasHook.sendHBaseTableOperation(){}", hbaseOperationContext); - } - } else { - executor.submit(new Runnable() { - HBaseOperationContext hbaseOperationContext = null; - - @Override - public void run() { - if (LOG.isDebugEnabled()) { - LOG.debug("==> HBaseAtlasHook.sendHBaseTableOperation():executor.submit()"); - } - if (ugi != null) { - try { - ugi.doAs(new PrivilegedExceptionAction<Object>() { - @Override - public Object run() { - hbaseOperationContext = handleHBaseTableOperation(hTableDescriptor, tableName, operation); - return hbaseOperationContext; - } - }); - notifyAsPrivilegedAction(hbaseOperationContext); - if (LOG.isDebugEnabled()) { - LOG.debug("<== HBaseAtlasHook.sendHBaseTableOperation(){}", hbaseOperationContext); - } - } catch (Throwable e) { - LOG.error("<== HBaseAtlasHook.sendHBaseTableOperation(): Atlas hook failed due to error ", e); - } - } else { - LOG.error("<== HBaseAtlasHook.sendHBasecolumnFamilyOperation(): Atlas hook failed, UserGroupInformation cannot be NULL!"); - } - } - }); - } + HBaseOperationContext hbaseOperationContext = handleHBaseTableOperation(hTableDescriptor, tableName, operation); + + sendNotification(hbaseOperationContext); } catch (Throwable t) { - LOG.error("<== HBaseAtlasHook.sendHBaseTableOperation(): Submitting to thread pool failed due to error ", t); + LOG.error("<== HBaseAtlasHook.sendHBaseTableOperation(): failed to send notification", t); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("<== HBaseAtlasHook.sendHBaseTableOperation()"); } } @@ -727,54 +545,30 @@ public class HBaseAtlasHook extends AtlasHook { if (LOG.isDebugEnabled()) { LOG.debug("==> HBaseAtlasHook.sendHBaseColumnFamilyOperation()"); } + try { - final UserGroupInformation ugi = getUGI(); - HBaseOperationContext hbaseOperationContext = null; - if (executor == null) { - hbaseOperationContext = handleHBaseColumnFamilyOperation(hColumnDescriptor, tableName, columnFamily, operation); - if (hbaseOperationContext != null) { - notifyAsPrivilegedAction(hbaseOperationContext); - } - if (LOG.isDebugEnabled()) { - LOG.debug("<== HBaseAtlasHook.sendHBaseColumnFamilyOperation(){}", hbaseOperationContext); - } - } else { - executor.submit(new Runnable() { - HBaseOperationContext hbaseOperationContext = null; - - @Override - public void run() { - if (LOG.isDebugEnabled()) { - LOG.debug("==> HBaseAtlasHook.sendHBaseColumnFamilyOperation():executor.submit()"); - } - if (ugi != null) { - try { - ugi.doAs(new PrivilegedExceptionAction<Object>() { - @Override - public Object run() { - hbaseOperationContext = handleHBaseColumnFamilyOperation(hColumnDescriptor, tableName, columnFamily, operation); - return hbaseOperationContext; - } - }); - notifyAsPrivilegedAction(hbaseOperationContext); - if (LOG.isDebugEnabled()) { - LOG.debug("<== HBaseAtlasHook.sendHBaseColumnFamilyOperation(){}", hbaseOperationContext); - } - } catch (Throwable e) { - LOG.error("<== HBaseAtlasHook.sendHBaseColumnFamilyOperation(): Atlas hook failed due to error ", e); - } - } else { - LOG.error("<== HBaseAtlasHook.sendHBaseColumnFamilyOperation(): Atlas hook failed, UserGroupInformation cannot be NULL!"); - } + HBaseOperationContext hbaseOperationContext = handleHBaseColumnFamilyOperation(hColumnDescriptor, tableName, columnFamily, operation); - } - }); - } + sendNotification(hbaseOperationContext); } catch (Throwable t) { - LOG.error("<== HBaseAtlasHook.sendHBaseColumnFamilyOperation(): Submitting to thread pool failed due to error ", t); + LOG.error("<== HBaseAtlasHook.sendHBaseColumnFamilyOperation(): failed to send notification", t); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("<== HBaseAtlasHook.sendHBaseColumnFamilyOperation()"); } } + private void sendNotification(HBaseOperationContext hbaseOperationContext) { + UserGroupInformation ugi = hbaseOperationContext.getUgi(); + + if (ugi != null && ugi.getRealUser() != null) { + ugi = ugi.getRealUser(); + } + + notifyEntities(hbaseOperationContext.getMessages(), ugi); + } + private HBaseOperationContext handleHBaseNameSpaceOperation(NamespaceDescriptor namespaceDescriptor, String nameSpace, OPERATION operation) { if (LOG.isDebugEnabled()) { LOG.debug("==> HBaseAtlasHook.handleHBaseNameSpaceOperation()"); http://git-wip-us.apache.org/repos/asf/atlas/blob/9bdbb318/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java index e001a06..a0a4410 100644 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java @@ -26,6 +26,8 @@ import org.apache.commons.lang.StringUtils; import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext; import org.apache.hadoop.hive.ql.hooks.HookContext; import org.apache.hadoop.hive.ql.plan.HiveOperation; +import org.apache.hadoop.hive.shims.Utils; +import org.apache.hadoop.security.UserGroupInformation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,7 +45,6 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { private static final Logger LOG = LoggerFactory.getLogger(HiveHook.class); public static final String CONF_PREFIX = "atlas.hook.hive."; - public static final String HOOK_NUM_RETRIES = CONF_PREFIX + "numRetries"; public static final String HOOK_DATABASE_NAME_CACHE_COUNT = CONF_PREFIX + "database.name.cache.count"; public static final String HOOK_TABLE_NAME_CACHE_COUNT = CONF_PREFIX + "table.name.cache.count"; public static final String CONF_CLUSTER_NAME = "atlas.cluster.name"; @@ -73,11 +74,6 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { } @Override - protected String getNumberOfRetriesPropertyKey() { - return HOOK_NUM_RETRIES; - } - - @Override public void run(HookContext hookContext) throws Exception { if (LOG.isDebugEnabled()) { LOG.debug("==> HiveHook.run({})", hookContext.getOperationName()); @@ -154,7 +150,9 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { } if (event != null) { - super.notifyEntities(event.getNotificationMessages()); + final UserGroupInformation ugi = hookContext.getUgi() == null ? Utils.getUGI() : hookContext.getUgi(); + + super.notifyEntities(event.getNotificationMessages(), ugi); } } catch (Throwable t) { LOG.error("HiveHook.run(): failed to process operation {}", hookContext.getOperationName(), t); http://git-wip-us.apache.org/repos/asf/atlas/blob/9bdbb318/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java ---------------------------------------------------------------------- diff --git a/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java b/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java index 666ec13..77468ce 100644 --- a/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java +++ b/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java @@ -52,8 +52,6 @@ import java.util.Date; public class SqoopHook extends SqoopJobDataPublisher { private static final Logger LOG = LoggerFactory.getLogger(SqoopHook.class); - public static final String CONF_PREFIX = "atlas.hook.sqoop."; - public static final String HOOK_NUM_RETRIES = CONF_PREFIX + "numRetries"; public static final String ATLAS_CLUSTER_NAME = "atlas.cluster.name"; public static final String DEFAULT_CLUSTER_NAME = "primary"; @@ -70,8 +68,12 @@ public class SqoopHook extends SqoopJobDataPublisher { public static final String INPUTS = "inputs"; public static final String OUTPUTS = "outputs"; + private static final AtlasHookImpl atlasHook; + static { org.apache.hadoop.conf.Configuration.addDefaultResource("sqoop-site.xml"); + + atlasHook = new AtlasHookImpl(); } @Override @@ -94,7 +96,7 @@ public class SqoopHook extends SqoopJobDataPublisher { HookNotificationMessage message = new EntityCreateRequestV2(AtlasHook.getUser(), entities); - AtlasHook.notifyEntities(Collections.singletonList(message), atlasProperties.getInt(HOOK_NUM_RETRIES, 3)); + atlasHook.sendNotification(message); } catch(Exception e) { LOG.error("SqoopHook.publish() failed", e); @@ -224,4 +226,10 @@ public class SqoopHook extends SqoopJobDataPublisher { return name.toString(); } + + private static class AtlasHookImpl extends AtlasHook { + public void sendNotification(HookNotificationMessage notification) { + super.notifyEntities(Collections.singletonList(notification), null); + } + } } http://git-wip-us.apache.org/repos/asf/atlas/blob/9bdbb318/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java ---------------------------------------------------------------------- diff --git a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java index c3df8fc..c47bd1b 100644 --- a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java +++ b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java @@ -56,18 +56,9 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook { public static final Logger LOG = org.slf4j.LoggerFactory.getLogger(StormAtlasHook.class); - private static final String CONF_PREFIX = "atlas.hook.storm."; - private static final String HOOK_NUM_RETRIES = CONF_PREFIX + "numRetries"; - // will be used for owner if Storm topology does not contain the owner instance - // possible if Storm is running in unsecure mode. - public static final String ANONYMOUS_OWNER = "anonymous"; - - public static final String HBASE_NAMESPACE_DEFAULT = "default"; - - @Override - protected String getNumberOfRetriesPropertyKey() { - return HOOK_NUM_RETRIES; - } + public static final String ANONYMOUS_OWNER = "anonymous"; // if Storm topology does not contain the owner instance; possible if Storm is running in unsecure mode. + public static final String HBASE_NAMESPACE_DEFAULT = "default"; + public static final String ATTRIBUTE_DB = "db"; /** * This is the client-side hook that storm fires when a topology is added. @@ -77,27 +68,28 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook { * @param stormTopology a storm topology */ @Override - public void notify(TopologyInfo topologyInfo, Map stormConf, - StormTopology stormTopology) { - + public void notify(TopologyInfo topologyInfo, Map stormConf, StormTopology stormTopology) { LOG.info("Collecting metadata for a new storm topology: {}", topologyInfo.get_name()); + try { - ArrayList<Referenceable> entities = new ArrayList<>(); - Referenceable topologyReferenceable = createTopologyInstance(topologyInfo, stormConf); - List<Referenceable> dependentEntities = addTopologyDataSets(stormTopology, topologyReferenceable, - topologyInfo.get_owner(), stormConf); - if (dependentEntities.size()>0) { + ArrayList<Referenceable> entities = new ArrayList<>(); + Referenceable topologyReferenceable = createTopologyInstance(topologyInfo, stormConf); + List<Referenceable> dependentEntities = addTopologyDataSets(stormTopology, topologyReferenceable, topologyInfo.get_owner(), stormConf); + + if (dependentEntities.size() > 0) { entities.addAll(dependentEntities); } + // create the graph for the topology - ArrayList<Referenceable> graphNodes = createTopologyGraph( - stormTopology, stormTopology.get_spouts(), stormTopology.get_bolts()); + ArrayList<Referenceable> graphNodes = createTopologyGraph(stormTopology, stormTopology.get_spouts(), stormTopology.get_bolts()); // add the connection from topology to the graph topologyReferenceable.set("nodes", graphNodes); + entities.add(topologyReferenceable); LOG.debug("notifying entities, size = {}", entities.size()); String user = getUser(topologyInfo.get_owner(), null); + notifyEntities(user, entities); } catch (Exception e) { throw new RuntimeException("Atlas hook is unable to process the topology.", e); @@ -105,15 +97,18 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook { } private Referenceable createTopologyInstance(TopologyInfo topologyInfo, Map stormConf) { - Referenceable topologyReferenceable = new Referenceable( - StormDataTypes.STORM_TOPOLOGY.getName()); + Referenceable topologyReferenceable = new Referenceable(StormDataTypes.STORM_TOPOLOGY.getName()); + topologyReferenceable.set("id", topologyInfo.get_id()); topologyReferenceable.set(AtlasClient.NAME, topologyInfo.get_name()); topologyReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, topologyInfo.get_name()); + String owner = topologyInfo.get_owner(); + if (StringUtils.isEmpty(owner)) { owner = ANONYMOUS_OWNER; } + topologyReferenceable.set(AtlasClient.OWNER, owner); topologyReferenceable.set("startTime", new Date(System.currentTimeMillis())); topologyReferenceable.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, getClusterName(stormConf)); @@ -121,30 +116,26 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook { return topologyReferenceable; } - private List<Referenceable> addTopologyDataSets(StormTopology stormTopology, - Referenceable topologyReferenceable, - String topologyOwner, - Map stormConf) { + private List<Referenceable> addTopologyDataSets(StormTopology stormTopology, Referenceable topologyReferenceable, String topologyOwner, Map stormConf) { List<Referenceable> dependentEntities = new ArrayList<>(); + // add each spout as an input data set - addTopologyInputs(topologyReferenceable, - stormTopology.get_spouts(), stormConf, topologyOwner, dependentEntities); + addTopologyInputs(topologyReferenceable, stormTopology.get_spouts(), stormConf, topologyOwner, dependentEntities); + // add the appropriate bolts as output data sets addTopologyOutputs(topologyReferenceable, stormTopology, topologyOwner, stormConf, dependentEntities); + return dependentEntities; } - private void addTopologyInputs(Referenceable topologyReferenceable, - Map<String, SpoutSpec> spouts, - Map stormConf, - String topologyOwner, List<Referenceable> dependentEntities) { + private void addTopologyInputs(Referenceable topologyReferenceable, Map<String, SpoutSpec> spouts, Map stormConf, String topologyOwner, List<Referenceable> dependentEntities) { final ArrayList<Referenceable> inputDataSets = new ArrayList<>(); - for (Map.Entry<String, SpoutSpec> entry : spouts.entrySet()) { - Serializable instance = Utils.javaDeserialize( - entry.getValue().get_spout_object().get_serialized_java(), Serializable.class); - String simpleName = instance.getClass().getSimpleName(); + for (Map.Entry<String, SpoutSpec> entry : spouts.entrySet()) { + Serializable instance = Utils.javaDeserialize(entry.getValue().get_spout_object().get_serialized_java(), Serializable.class); + String simpleName = instance.getClass().getSimpleName(); final Referenceable datasetRef = createDataSet(simpleName, topologyOwner, instance, stormConf, dependentEntities); + if (datasetRef != null) { inputDataSets.add(datasetRef); } @@ -153,19 +144,16 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook { topologyReferenceable.set("inputs", inputDataSets); } - private void addTopologyOutputs(Referenceable topologyReferenceable, - StormTopology stormTopology, String topologyOwner, - Map stormConf, List<Referenceable> dependentEntities) { - final ArrayList<Referenceable> outputDataSets = new ArrayList<>(); + private void addTopologyOutputs(Referenceable topologyReferenceable, StormTopology stormTopology, String topologyOwner, Map stormConf, List<Referenceable> dependentEntities) { + final ArrayList<Referenceable> outputDataSets = new ArrayList<>(); + Map<String, Bolt> bolts = stormTopology.get_bolts(); + Set<String> terminalBoltNames = StormTopologyUtil.getTerminalUserBoltNames(stormTopology); - Map<String, Bolt> bolts = stormTopology.get_bolts(); - Set<String> terminalBoltNames = StormTopologyUtil.getTerminalUserBoltNames(stormTopology); for (String terminalBoltName : terminalBoltNames) { - Serializable instance = Utils.javaDeserialize(bolts.get(terminalBoltName) - .get_bolt_object().get_serialized_java(), Serializable.class); + Serializable instance = Utils.javaDeserialize(bolts.get(terminalBoltName).get_bolt_object().get_serialized_java(), Serializable.class); + String dataSetType = instance.getClass().getSimpleName(); + final Referenceable datasetRef = createDataSet(dataSetType, topologyOwner, instance, stormConf, dependentEntities); - String dataSetType = instance.getClass().getSimpleName(); - final Referenceable datasetRef = createDataSet(dataSetType, topologyOwner, instance, stormConf, dependentEntities); if (datasetRef != null) { outputDataSets.add(datasetRef); } @@ -255,104 +243,110 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook { private String extractComponentClusterName(Configuration configuration, Map stormConf) { String clusterName = configuration.get(AtlasConstants.CLUSTER_NAME_KEY, null); + if (clusterName == null) { clusterName = getClusterName(stormConf); } + return clusterName; } - private ArrayList<Referenceable> createTopologyGraph(StormTopology stormTopology, - Map<String, SpoutSpec> spouts, - Map<String, Bolt> bolts) { + private ArrayList<Referenceable> createTopologyGraph(StormTopology stormTopology, Map<String, SpoutSpec> spouts, Map<String, Bolt> bolts) { // Add graph of nodes in the topology final Map<String, Referenceable> nodeEntities = new HashMap<>(); + addSpouts(spouts, nodeEntities); addBolts(bolts, nodeEntities); addGraphConnections(stormTopology, nodeEntities); ArrayList<Referenceable> nodes = new ArrayList<>(); + nodes.addAll(nodeEntities.values()); + return nodes; } - private void addSpouts(Map<String, SpoutSpec> spouts, - Map<String, Referenceable> nodeEntities) { + private void addSpouts(Map<String, SpoutSpec> spouts, Map<String, Referenceable> nodeEntities) { for (Map.Entry<String, SpoutSpec> entry : spouts.entrySet()) { - final String spoutName = entry.getKey(); - Referenceable spoutReferenceable = createSpoutInstance( - spoutName, entry.getValue()); + final String spoutName = entry.getKey(); + Referenceable spoutReferenceable = createSpoutInstance(spoutName, entry.getValue()); + nodeEntities.put(spoutName, spoutReferenceable); } } - private Referenceable createSpoutInstance(String spoutName, - SpoutSpec stormSpout) { + private Referenceable createSpoutInstance(String spoutName, SpoutSpec stormSpout) { Referenceable spoutReferenceable = new Referenceable(StormDataTypes.STORM_SPOUT.getName()); + spoutReferenceable.set(AtlasClient.NAME, spoutName); - Serializable instance = Utils.javaDeserialize( - stormSpout.get_spout_object().get_serialized_java(), Serializable.class); + Serializable instance = Utils.javaDeserialize(stormSpout.get_spout_object().get_serialized_java(), Serializable.class); + spoutReferenceable.set("driverClass", instance.getClass().getName()); Map<String, String> flatConfigMap = StormTopologyUtil.getFieldValues(instance, true, null); + spoutReferenceable.set("conf", flatConfigMap); return spoutReferenceable; } - private void addBolts(Map<String, Bolt> bolts, - Map<String, Referenceable> nodeEntities) { + private void addBolts(Map<String, Bolt> bolts, Map<String, Referenceable> nodeEntities) { for (Map.Entry<String, Bolt> entry : bolts.entrySet()) { Referenceable boltInstance = createBoltInstance(entry.getKey(), entry.getValue()); + nodeEntities.put(entry.getKey(), boltInstance); } } - private Referenceable createBoltInstance(String boltName, - Bolt stormBolt) { + private Referenceable createBoltInstance(String boltName, Bolt stormBolt) { Referenceable boltReferenceable = new Referenceable(StormDataTypes.STORM_BOLT.getName()); boltReferenceable.set(AtlasClient.NAME, boltName); - Serializable instance = Utils.javaDeserialize( - stormBolt.get_bolt_object().get_serialized_java(), Serializable.class); + Serializable instance = Utils.javaDeserialize(stormBolt.get_bolt_object().get_serialized_java(), Serializable.class); + boltReferenceable.set("driverClass", instance.getClass().getName()); Map<String, String> flatConfigMap = StormTopologyUtil.getFieldValues(instance, true, null); + boltReferenceable.set("conf", flatConfigMap); return boltReferenceable; } - private void addGraphConnections(StormTopology stormTopology, - Map<String, Referenceable> nodeEntities) { + private void addGraphConnections(StormTopology stormTopology, Map<String, Referenceable> nodeEntities) { // adds connections between spouts and bolts - Map<String, Set<String>> adjacencyMap = - StormTopologyUtil.getAdjacencyMap(stormTopology, true); + Map<String, Set<String>> adjacencyMap = StormTopologyUtil.getAdjacencyMap(stormTopology, true); for (Map.Entry<String, Set<String>> entry : adjacencyMap.entrySet()) { - String nodeName = entry.getKey(); + String nodeName = entry.getKey(); Set<String> adjacencyList = adjacencyMap.get(nodeName); + if (adjacencyList == null || adjacencyList.isEmpty()) { continue; } // add outgoing links - Referenceable node = nodeEntities.get(nodeName); + Referenceable node = nodeEntities.get(nodeName); ArrayList<String> outputs = new ArrayList<>(adjacencyList.size()); + outputs.addAll(adjacencyList); node.set("outputs", outputs); // add incoming links for (String adjacentNodeName : adjacencyList) { Referenceable adjacentNode = nodeEntities.get(adjacentNodeName); + @SuppressWarnings("unchecked") ArrayList<String> inputs = (ArrayList<String>) adjacentNode.get("inputs"); + if (inputs == null) { inputs = new ArrayList<>(); } + inputs.add(nodeName); adjacentNode.set("inputs", inputs); } http://git-wip-us.apache.org/repos/asf/atlas/blob/9bdbb318/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java index 38f3208..8d24035 100644 --- a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java +++ b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java @@ -19,11 +19,14 @@ package org.apache.atlas.hook; import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.atlas.ApplicationProperties; +import org.apache.atlas.AtlasConstants; import org.apache.atlas.kafka.NotificationProvider; import org.apache.atlas.notification.NotificationException; import org.apache.atlas.notification.NotificationInterface; import org.apache.atlas.notification.hook.HookNotification; +import org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage; import org.apache.atlas.security.InMemoryJAASConfiguration; import org.apache.atlas.typesystem.Referenceable; import org.apache.atlas.typesystem.json.InstanceSerialization; @@ -31,36 +34,48 @@ import org.apache.commons.configuration.Configuration; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.security.UserGroupInformation; import org.codehaus.jettison.json.JSONArray; +import org.apache.hadoop.util.ShutdownHookManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.security.PrivilegedExceptionAction; import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; /** * A base class for atlas hooks. */ public abstract class AtlasHook { - private static final Logger LOG = LoggerFactory.getLogger(AtlasHook.class); - protected static Configuration atlasProperties; - + public static final String ATLAS_NOTIFICATION_ASYNCHRONOUS = "atlas.notification.hook.asynchronous"; + public static final String ATLAS_NOTIFICATION_ASYNCHRONOUS_MIN_THREADS = "atlas.notification.hook.asynchronous.minThreads"; + public static final String ATLAS_NOTIFICATION_ASYNCHRONOUS_MAX_THREADS = "atlas.notification.hook.asynchronous.maxThreads"; + public static final String ATLAS_NOTIFICATION_ASYNCHRONOUS_KEEP_ALIVE_TIME_MS = "atlas.notification.hook.asynchronous.keepAliveTimeMs"; + public static final String ATLAS_NOTIFICATION_ASYNCHRONOUS_QUEUE_SIZE = "atlas.notification.hook.asynchronous.queueSize"; + public static final String ATLAS_NOTIFICATION_MAX_RETRIES = "atlas.notification.hook.retry.maxRetries"; + public static final String ATLAS_NOTIFICATION_RETRY_INTERVAL = "atlas.notification.hook.retry.interval"; + public static final String ATLAS_NOTIFICATION_FAILED_MESSAGES_FILENAME_KEY = "atlas.notification.failed.messages.filename"; + public static final String ATLAS_NOTIFICATION_LOG_FAILED_MESSAGES_ENABLED_KEY = "atlas.notification.log.failed.messages"; + public static final String ATLAS_HOOK_FAILED_MESSAGES_LOG_DEFAULT_NAME = "atlas_hook_failed_messages.log"; + + protected static Configuration atlasProperties; protected static NotificationInterface notificationInterface; - private static boolean logFailedMessages; - private static FailedMessagesLogger failedMessagesLogger; - private static int notificationRetryInterval; - public static final String ATLAS_NOTIFICATION_RETRY_INTERVAL = "atlas.notification.hook.retry.interval"; + private static final int SHUTDOWN_HOOK_WAIT_TIME_MS = 3000; + private static final boolean logFailedMessages; + private static final FailedMessagesLogger failedMessagesLogger; + private static final int notificationMaxRetries; + private static final int notificationRetryInterval; + private static ExecutorService executor = null; - public static final String ATLAS_NOTIFICATION_FAILED_MESSAGES_FILENAME_KEY = - "atlas.notification.failed.messages.filename"; - public static final String ATLAS_HOOK_FAILED_MESSAGES_LOG_DEFAULT_NAME = "atlas_hook_failed_messages.log"; - public static final String ATLAS_NOTIFICATION_LOG_FAILED_MESSAGES_ENABLED_KEY = - "atlas.notification.log.failed.messages"; static { try { @@ -69,12 +84,15 @@ public abstract class AtlasHook { LOG.info("Failed to load application properties", e); } - String failedMessageFile = atlasProperties.getString(ATLAS_NOTIFICATION_FAILED_MESSAGES_FILENAME_KEY, - ATLAS_HOOK_FAILED_MESSAGES_LOG_DEFAULT_NAME); + String failedMessageFile = atlasProperties.getString(ATLAS_NOTIFICATION_FAILED_MESSAGES_FILENAME_KEY, ATLAS_HOOK_FAILED_MESSAGES_LOG_DEFAULT_NAME); + logFailedMessages = atlasProperties.getBoolean(ATLAS_NOTIFICATION_LOG_FAILED_MESSAGES_ENABLED_KEY, true); + if (logFailedMessages) { failedMessagesLogger = new FailedMessagesLogger(failedMessageFile); failedMessagesLogger.init(); + } else { + failedMessagesLogger = null; } if (!isLoginKeytabBased()) { @@ -83,8 +101,9 @@ public abstract class AtlasHook { } } + notificationMaxRetries = atlasProperties.getInt(ATLAS_NOTIFICATION_MAX_RETRIES, 3); notificationRetryInterval = atlasProperties.getInt(ATLAS_NOTIFICATION_RETRY_INTERVAL, 1000); - notificationInterface = NotificationProvider.get(); + notificationInterface = NotificationProvider.get(); String currentUser = ""; @@ -96,11 +115,39 @@ public abstract class AtlasHook { notificationInterface.setCurrentUser(currentUser); + boolean isAsync = atlasProperties.getBoolean(ATLAS_NOTIFICATION_ASYNCHRONOUS, Boolean.TRUE); + + if (isAsync) { + int minThreads = atlasProperties.getInt(ATLAS_NOTIFICATION_ASYNCHRONOUS_MIN_THREADS, 1); + int maxThreads = atlasProperties.getInt(ATLAS_NOTIFICATION_ASYNCHRONOUS_MAX_THREADS, 5); + long keepAliveTimeMs = atlasProperties.getLong(ATLAS_NOTIFICATION_ASYNCHRONOUS_KEEP_ALIVE_TIME_MS, 10000); + int queueSize = atlasProperties.getInt(ATLAS_NOTIFICATION_ASYNCHRONOUS_QUEUE_SIZE, 10000); + + executor = new ThreadPoolExecutor(minThreads, maxThreads, keepAliveTimeMs, TimeUnit.MILLISECONDS, + new LinkedBlockingDeque<>(queueSize), + new ThreadFactoryBuilder().setNameFormat("Atlas Notifier %d").build()); + + ShutdownHookManager.get().addShutdownHook(new Thread() { + @Override + public void run() { + try { + LOG.info("==> Shutdown of Atlas Hook"); + + executor.shutdown(); + executor.awaitTermination(SHUTDOWN_HOOK_WAIT_TIME_MS, TimeUnit.MILLISECONDS); + executor = null; + } catch (InterruptedException excp) { + LOG.info("Interrupt received in shutdown.", excp); + } finally { + LOG.info("<== Shutdown of Atlas Hook"); + } + } + }, AtlasConstants.ATLAS_SHUTDOWN_HOOK_PRIORITY); + } + LOG.info("Created Atlas Hook"); } - protected abstract String getNumberOfRetriesPropertyKey(); - protected void notifyEntities(String user, Collection<Referenceable> entities) { JSONArray entitiesArray = new JSONArray(); @@ -110,75 +157,87 @@ public abstract class AtlasHook { entitiesArray.put(entityJson); } - List<HookNotification.HookNotificationMessage> hookNotificationMessages = new ArrayList<>(); + List<HookNotificationMessage> hookNotificationMessages = new ArrayList<>(); hookNotificationMessages.add(new HookNotification.EntityCreateRequest(user, entitiesArray)); - notifyEntities(hookNotificationMessages); + notifyEntities(hookNotificationMessages, null, notificationMaxRetries); } - /** - * Notify atlas of the entity through message. The entity can be a - * complex entity with reference to other entities. - * De-duping of entities is done on server side depending on the - * unique attribute on the entities. - * - * @param messages hook notification messages - * @param maxRetries maximum number of retries while sending message to messaging system - */ - public static void notifyEntities(List<HookNotification.HookNotificationMessage> messages, int maxRetries) { - notifyEntitiesInternal(messages, maxRetries, notificationInterface, logFailedMessages, failedMessagesLogger); + public void notifyEntities(List<HookNotificationMessage> messages, UserGroupInformation ugi) { + notifyEntities(messages, ugi, notificationMaxRetries); + } + + public void notifyEntities(List<HookNotificationMessage> messages, UserGroupInformation ugi, int maxRetries) { + if (executor == null) { // send synchronously + notifyEntitiesInternal(messages, maxRetries, ugi, notificationInterface, logFailedMessages, failedMessagesLogger); + } else { + executor.submit(new Runnable() { + @Override + public void run() { + notifyEntitiesInternal(messages, maxRetries, ugi, notificationInterface, logFailedMessages, failedMessagesLogger); + } + }); + } } @VisibleForTesting - static void notifyEntitiesInternal(List<HookNotification.HookNotificationMessage> messages, int maxRetries, + static void notifyEntitiesInternal(List<HookNotificationMessage> messages, int maxRetries, UserGroupInformation ugi, NotificationInterface notificationInterface, boolean shouldLogFailedMessages, FailedMessagesLogger logger) { if (messages == null || messages.isEmpty()) { return; } - final String message = messages.toString(); - int numRetries = 0; - while (true) { - try { - notificationInterface.send(NotificationInterface.NotificationType.HOOK, messages); - return; - } catch (Exception e) { - numRetries++; - if (numRetries < maxRetries) { - LOG.error("Failed to send notification - attempt #{}; error={}", numRetries, e.getMessage()); - try { - LOG.debug("Sleeping for {} ms before retry", notificationRetryInterval); - Thread.sleep(notificationRetryInterval); - } catch (InterruptedException ie) { - LOG.error("Notification hook thread sleep interrupted"); - } + final int maxAttempts = maxRetries < 1 ? 1 : maxRetries; + final String message = messages.toString(); + Exception notificationFailure = null; + + for (int numAttempt = 1; numAttempt <= maxAttempts; numAttempt++) { + if (numAttempt > 1) { // retry attempt + try { + LOG.debug("Sleeping for {} ms before retry", notificationRetryInterval); + Thread.sleep(notificationRetryInterval); + } catch (InterruptedException ie) { + LOG.error("Notification hook thread sleep interrupted"); + + break; + } + } + + try { + if (ugi == null) { + notificationInterface.send(NotificationInterface.NotificationType.HOOK, messages); } else { - if (shouldLogFailedMessages && e instanceof NotificationException) { - List<String> failedMessages = ((NotificationException) e).getFailedMessages(); - for (String msg : failedMessages) { - logger.log(msg); + PrivilegedExceptionAction<Object> privilegedNotify = new PrivilegedExceptionAction<Object>() { + @Override + public Object run() throws Exception { + notificationInterface.send(NotificationInterface.NotificationType.HOOK, messages); + return messages; } - } - LOG.error("Failed to notify atlas for entity {} after {} retries. Quitting", - message, maxRetries, e); - return; + }; + + ugi.doAs(privilegedNotify); } + + notificationFailure = null; // notification sent successfully, reset error + + break; + } catch (Exception e) { + notificationFailure = e; + + LOG.error("Failed to send notification - attempt #{}; error={}", numAttempt, e.getMessage()); } } - } - /** - * Notify atlas of the entity through message. The entity can be a - * complex entity with reference to other entities. - * De-duping of entities is done on server side depending on the - * unique attribute on the entities. - * - * @param messages hook notification messages - */ - protected void notifyEntities(List<HookNotification.HookNotificationMessage> messages) { - final int maxRetries = atlasProperties.getInt(getNumberOfRetriesPropertyKey(), 3); - notifyEntities(messages, maxRetries); + if (shouldLogFailedMessages && notificationFailure instanceof NotificationException) { + final List<String> failedMessages = ((NotificationException) notificationFailure).getFailedMessages(); + + for (String msg : failedMessages) { + logger.log(msg); + } + + LOG.error("Giving up after {} failed attempts to send notification to Atlas: {}", maxAttempts, message, notificationFailure); + } } /** http://git-wip-us.apache.org/repos/asf/atlas/blob/9bdbb318/notification/src/test/java/org/apache/atlas/hook/AtlasHookTest.java ---------------------------------------------------------------------- diff --git a/notification/src/test/java/org/apache/atlas/hook/AtlasHookTest.java b/notification/src/test/java/org/apache/atlas/hook/AtlasHookTest.java index d59cb1c..39ca686 100644 --- a/notification/src/test/java/org/apache/atlas/hook/AtlasHookTest.java +++ b/notification/src/test/java/org/apache/atlas/hook/AtlasHookTest.java @@ -54,7 +54,7 @@ public class AtlasHookTest { List<HookNotification.HookNotificationMessage> hookNotificationMessages = new ArrayList<>(); doThrow(new NotificationException(new Exception())).when(notificationInterface) .send(NotificationInterface.NotificationType.HOOK, hookNotificationMessages); - AtlasHook.notifyEntitiesInternal(hookNotificationMessages, 0, notificationInterface, false, + AtlasHook.notifyEntitiesInternal(hookNotificationMessages, 0, null, notificationInterface, false, failedMessagesLogger); // if we've reached here, the method finished OK. } @@ -68,7 +68,7 @@ public class AtlasHookTest { }; doThrow(new NotificationException(new Exception())).when(notificationInterface) .send(NotificationInterface.NotificationType.HOOK, hookNotificationMessages); - AtlasHook.notifyEntitiesInternal(hookNotificationMessages, 2, notificationInterface, false, + AtlasHook.notifyEntitiesInternal(hookNotificationMessages, 2, null, notificationInterface, false, failedMessagesLogger); verify(notificationInterface, times(2)). @@ -85,7 +85,7 @@ public class AtlasHookTest { doThrow(new NotificationException(new Exception(), Arrays.asList("test message"))) .when(notificationInterface) .send(NotificationInterface.NotificationType.HOOK, hookNotificationMessages); - AtlasHook.notifyEntitiesInternal(hookNotificationMessages, 2, notificationInterface, true, + AtlasHook.notifyEntitiesInternal(hookNotificationMessages, 2, null, notificationInterface, true, failedMessagesLogger); verify(failedMessagesLogger, times(1)).log("test message"); @@ -97,7 +97,7 @@ public class AtlasHookTest { doThrow(new NotificationException(new Exception(), Arrays.asList("test message"))) .when(notificationInterface) .send(NotificationInterface.NotificationType.HOOK, hookNotificationMessages); - AtlasHook.notifyEntitiesInternal(hookNotificationMessages, 2, notificationInterface, false, + AtlasHook.notifyEntitiesInternal(hookNotificationMessages, 2, null, notificationInterface, false, failedMessagesLogger); verifyZeroInteractions(failedMessagesLogger); @@ -113,7 +113,7 @@ public class AtlasHookTest { doThrow(new NotificationException(new Exception(), Arrays.asList("test message1", "test message2"))) .when(notificationInterface) .send(NotificationInterface.NotificationType.HOOK, hookNotificationMessages); - AtlasHook.notifyEntitiesInternal(hookNotificationMessages, 2, notificationInterface, true, + AtlasHook.notifyEntitiesInternal(hookNotificationMessages, 2, null, notificationInterface, true, failedMessagesLogger); verify(failedMessagesLogger, times(1)).log("test message1"); @@ -125,7 +125,7 @@ public class AtlasHookTest { List<HookNotification.HookNotificationMessage> hookNotificationMessages = new ArrayList<>(); doThrow(new RuntimeException("test message")).when(notificationInterface) .send(NotificationInterface.NotificationType.HOOK, hookNotificationMessages); - AtlasHook.notifyEntitiesInternal(hookNotificationMessages, 2, notificationInterface, true, + AtlasHook.notifyEntitiesInternal(hookNotificationMessages, 2, null, notificationInterface, true, failedMessagesLogger); verifyZeroInteractions(failedMessagesLogger);