atlas git commit: ATLAS-2823: updated hooks to support asynchronous notifications - #2
Repository: atlas
Updated Branches:
  refs/heads/master 4b2324f36 -> a7cd9ac38

ATLAS-2823: updated hooks to support asynchronous notifications - #2

Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/a7cd9ac3
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/a7cd9ac3
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/a7cd9ac3

Branch: refs/heads/master
Commit: a7cd9ac38dd5dcd105a7d5a28c50e851ee0d29a0
Parents: 4b2324f
Author: Madhan Neethiraj
Authored: Wed Aug 22 04:27:25 2018 -0700
Committer: Madhan Neethiraj
Committed: Wed Aug 22 04:27:25 2018 -0700

--
 notification/src/main/java/org/apache/atlas/hook/AtlasHook.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/atlas/blob/a7cd9ac3/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
--
diff --git a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
index 406f679..31874af 100644
--- a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
+++ b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
@@ -113,13 +113,13 @@ public abstract class AtlasHook {
 if (isAsync) {
 int minThreads = atlasProperties.getInt(ATLAS_NOTIFICATION_ASYNCHRONOUS_MIN_THREADS, 1);
-int maxThreads = atlasProperties.getInt(ATLAS_NOTIFICATION_ASYNCHRONOUS_MAX_THREADS, 5);
+int maxThreads = atlasProperties.getInt(ATLAS_NOTIFICATION_ASYNCHRONOUS_MAX_THREADS, 1);
 long keepAliveTimeMs = atlasProperties.getLong(ATLAS_NOTIFICATION_ASYNCHRONOUS_KEEP_ALIVE_TIME_MS, 1);
 int queueSize = atlasProperties.getInt(ATLAS_NOTIFICATION_ASYNCHRONOUS_QUEUE_SIZE, 1);
 executor = new ThreadPoolExecutor(minThreads, maxThreads, keepAliveTimeMs, TimeUnit.MILLISECONDS,
 new LinkedBlockingDeque<>(queueSize),
- new ThreadFactoryBuilder().setNameFormat("Atlas Notifier %d").build());
+ new ThreadFactoryBuilder().setNameFormat("Atlas Notifier %d").setDaemon(true).build());
 ShutdownHookManager.get().addShutdownHook(new Thread() {
 @Override
atlas git commit: ATLAS-2823: updated hooks to support asynchronous notifications - #2
Repository: atlas
Updated Branches:
  refs/heads/branch-0.8 d1f491112 -> b432547b8

ATLAS-2823: updated hooks to support asynchronous notifications - #2

Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/b432547b
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/b432547b
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/b432547b

Branch: refs/heads/branch-0.8
Commit: b432547b8c7fa498dbecf41a36661029620f1691
Parents: d1f4911
Author: Madhan Neethiraj
Authored: Wed Aug 22 04:15:53 2018 -0700
Committer: Madhan Neethiraj
Committed: Wed Aug 22 04:15:53 2018 -0700

--
 notification/src/main/java/org/apache/atlas/hook/AtlasHook.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/atlas/blob/b432547b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
--
diff --git a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
index 8d24035..d4b0715 100644
--- a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
+++ b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
@@ -119,13 +119,13 @@ public abstract class AtlasHook {
 if (isAsync) {
 int minThreads = atlasProperties.getInt(ATLAS_NOTIFICATION_ASYNCHRONOUS_MIN_THREADS, 1);
-int maxThreads = atlasProperties.getInt(ATLAS_NOTIFICATION_ASYNCHRONOUS_MAX_THREADS, 5);
+int maxThreads = atlasProperties.getInt(ATLAS_NOTIFICATION_ASYNCHRONOUS_MAX_THREADS, 1);
 long keepAliveTimeMs = atlasProperties.getLong(ATLAS_NOTIFICATION_ASYNCHRONOUS_KEEP_ALIVE_TIME_MS, 1);
 int queueSize = atlasProperties.getInt(ATLAS_NOTIFICATION_ASYNCHRONOUS_QUEUE_SIZE, 1);
 executor = new ThreadPoolExecutor(minThreads, maxThreads, keepAliveTimeMs, TimeUnit.MILLISECONDS,
 new LinkedBlockingDeque<>(queueSize),
- new ThreadFactoryBuilder().setNameFormat("Atlas Notifier %d").build());
+ new ThreadFactoryBuilder().setNameFormat("Atlas Notifier %d").setDaemon(true).build());
 ShutdownHookManager.get().addShutdownHook(new Thread() {
 @Override
atlas git commit: ATLAS-2823: updated hooks to support asynchronous notifications
Repository: atlas Updated Branches: refs/heads/branch-0.8 9da91296f -> 9bdbb3184 ATLAS-2823: updated hooks to support asynchronous notifications (cherry picked from commit 52ef9e7f3962da2bf77a15ab1a1bcef8d47b2877) Project: http://git-wip-us.apache.org/repos/asf/atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/9bdbb318 Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/9bdbb318 Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/9bdbb318 Branch: refs/heads/branch-0.8 Commit: 9bdbb31842c93412ebfe27d98bd8dcbabe7d80f4 Parents: 9da9129 Author: Madhan Neethiraj Authored: Tue Aug 14 16:18:19 2018 -0700 Committer: Madhan Neethiraj Committed: Thu Aug 16 18:27:39 2018 -0700 -- .../apache/atlas/falcon/hook/FalconHook.java| 85 +- .../atlas/hbase/bridge/HBaseAtlasHook.java | 274 +++ .../org/apache/atlas/hive/hook/HiveHook.java| 12 +- .../org/apache/atlas/sqoop/hook/SqoopHook.java | 14 +- .../apache/atlas/storm/hook/StormAtlasHook.java | 138 +- .../java/org/apache/atlas/hook/AtlasHook.java | 193 - .../org/apache/atlas/hook/AtlasHookTest.java| 12 +- 7 files changed, 250 insertions(+), 478 deletions(-) -- http://git-wip-us.apache.org/repos/asf/atlas/blob/9bdbb318/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java -- diff --git a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java index 188b754..6876595 100644 --- a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java +++ b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java @@ -18,30 +18,22 @@ package org.apache.atlas.falcon.hook; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.apache.atlas.AtlasConstants; import org.apache.atlas.falcon.bridge.FalconBridge; import org.apache.atlas.falcon.event.FalconEvent; import org.apache.atlas.falcon.publisher.FalconEventPublisher; import 
org.apache.atlas.hook.AtlasHook; -import org.apache.atlas.kafka.NotificationProvider; import org.apache.atlas.notification.hook.HookNotification; import org.apache.atlas.typesystem.Referenceable; import org.apache.falcon.FalconException; import org.apache.falcon.entity.store.ConfigurationStore; import org.apache.falcon.entity.v0.feed.Feed; import org.apache.falcon.entity.v0.process.Process; -import org.apache.hadoop.util.ShutdownHookManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.net.URISyntaxException; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; /** * Falcon hook sends lineage information to the Atlas Service. @@ -49,27 +41,6 @@ import java.util.concurrent.TimeUnit; public class FalconHook extends AtlasHook implements FalconEventPublisher { private static final Logger LOG = LoggerFactory.getLogger(FalconHook.class); -public static final String CONF_PREFIX = "atlas.hook.falcon."; -private static final String MIN_THREADS = CONF_PREFIX + "minThreads"; -private static final String MAX_THREADS = CONF_PREFIX + "maxThreads"; -private static final String KEEP_ALIVE_TIME = CONF_PREFIX + "keepAliveTime"; -public static final String QUEUE_SIZE = CONF_PREFIX + "queueSize"; -public static final String CONF_SYNC = CONF_PREFIX + "synchronous"; - -public static final String HOOK_NUM_RETRIES = CONF_PREFIX + "numRetries"; - -// wait time determines how long we wait before we exit the jvm on -// shutdown. Pending requests after that will not be sent. 
-private static final int WAIT_TIME = 3; -private static ExecutorService executor; - -private static final int minThreadsDefault = 5; -private static final int maxThreadsDefault = 5; -private static final long keepAliveTimeDefault = 10; -private static final int queueSizeDefault = 1; - -private static boolean sync; - private static ConfigurationStore STORE; private enum Operation { @@ -79,45 +50,11 @@ public class FalconHook extends AtlasHook implements FalconEventPublisher { static { try { -// initialize the async facility to process hook calls. We don't -// want to do this inline since it adds plenty of overhead for the query. -int minThreads = atlasProperties.getInt(MIN_THREADS, minThreadsDefault); -int maxThreads = atlasProperties.getInt(MAX_THREADS, maxThreadsDefault); -long keepAliveTime =
atlas git commit: ATLAS-2823: updated hooks to support asynchronous notifications
Repository: atlas Updated Branches: refs/heads/branch-1.0 c245049a7 -> 56ac0161e ATLAS-2823: updated hooks to support asynchronous notifications (cherry picked from commit 52ef9e7f3962da2bf77a15ab1a1bcef8d47b2877) Project: http://git-wip-us.apache.org/repos/asf/atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/56ac0161 Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/56ac0161 Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/56ac0161 Branch: refs/heads/branch-1.0 Commit: 56ac0161e4f1ba92bf44235173d2912ea60d99d2 Parents: c245049 Author: Madhan Neethiraj Authored: Tue Aug 14 16:18:19 2018 -0700 Committer: Madhan Neethiraj Committed: Wed Aug 15 15:02:31 2018 -0700 -- .../apache/atlas/falcon/hook/FalconHook.java| 85 +- .../atlas/hbase/bridge/HBaseAtlasHook.java | 273 +++ .../org/apache/atlas/hive/hook/HiveHook.java| 12 +- .../org/apache/atlas/sqoop/hook/SqoopHook.java | 14 +- .../apache/atlas/storm/hook/StormAtlasHook.java | 9 +- .../java/org/apache/atlas/hook/AtlasHook.java | 176 .../org/apache/atlas/hook/AtlasHookTest.java| 12 +- 7 files changed, 180 insertions(+), 401 deletions(-) -- http://git-wip-us.apache.org/repos/asf/atlas/blob/56ac0161/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java -- diff --git a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java index 77177b4..8c09d33 100644 --- a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java +++ b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java @@ -18,13 +18,10 @@ package org.apache.atlas.falcon.hook; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.apache.atlas.AtlasConstants; import org.apache.atlas.falcon.bridge.FalconBridge; import org.apache.atlas.falcon.event.FalconEvent; import org.apache.atlas.falcon.publisher.FalconEventPublisher; import 
org.apache.atlas.hook.AtlasHook; -import org.apache.atlas.kafka.NotificationProvider; import org.apache.atlas.model.notification.HookNotification; import org.apache.atlas.v1.model.instance.Referenceable; import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityCreateRequest; @@ -32,17 +29,12 @@ import org.apache.falcon.FalconException; import org.apache.falcon.entity.store.ConfigurationStore; import org.apache.falcon.entity.v0.feed.Feed; import org.apache.falcon.entity.v0.process.Process; -import org.apache.hadoop.util.ShutdownHookManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.net.URISyntaxException; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; /** * Falcon hook sends lineage information to the Atlas Service. @@ -50,27 +42,6 @@ import java.util.concurrent.TimeUnit; public class FalconHook extends AtlasHook implements FalconEventPublisher { private static final Logger LOG = LoggerFactory.getLogger(FalconHook.class); -public static final String CONF_PREFIX = "atlas.hook.falcon."; -private static final String MIN_THREADS = CONF_PREFIX + "minThreads"; -private static final String MAX_THREADS = CONF_PREFIX + "maxThreads"; -private static final String KEEP_ALIVE_TIME = CONF_PREFIX + "keepAliveTime"; -public static final String QUEUE_SIZE = CONF_PREFIX + "queueSize"; -public static final String CONF_SYNC = CONF_PREFIX + "synchronous"; - -public static final String HOOK_NUM_RETRIES = CONF_PREFIX + "numRetries"; - -// wait time determines how long we wait before we exit the jvm on -// shutdown. Pending requests after that will not be sent. 
-private static final int WAIT_TIME = 3; -private static ExecutorService executor; - -private static final int minThreadsDefault = 5; -private static final int maxThreadsDefault = 5; -private static final long keepAliveTimeDefault = 10; -private static final int queueSizeDefault = 1; - -private static boolean sync; - private static ConfigurationStore STORE; private enum Operation { @@ -80,45 +51,11 @@ public class FalconHook extends AtlasHook implements FalconEventPublisher { static { try { -// initialize the async facility to process hook calls. We don't -// want to do this inline since it adds plenty of overhead for the query. -int minThreads = atlasProperties.getInt(MIN_THREADS, minThreadsDefault); -int maxThreads =
atlas git commit: ATLAS-2823: updated hooks to support asynchronous notifications
Repository: atlas Updated Branches: refs/heads/master 1179aff80 -> 52ef9e7f3 ATLAS-2823: updated hooks to support asynchronous notifications Project: http://git-wip-us.apache.org/repos/asf/atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/52ef9e7f Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/52ef9e7f Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/52ef9e7f Branch: refs/heads/master Commit: 52ef9e7f3962da2bf77a15ab1a1bcef8d47b2877 Parents: 1179aff Author: Madhan Neethiraj Authored: Tue Aug 14 16:18:19 2018 -0700 Committer: Madhan Neethiraj Committed: Wed Aug 15 13:39:37 2018 -0700 -- .../apache/atlas/falcon/hook/FalconHook.java| 85 +- .../atlas/hbase/bridge/HBaseAtlasHook.java | 273 +++ .../org/apache/atlas/hive/hook/HiveHook.java| 12 +- .../org/apache/atlas/sqoop/hook/SqoopHook.java | 14 +- .../apache/atlas/storm/hook/StormAtlasHook.java | 9 +- .../java/org/apache/atlas/hook/AtlasHook.java | 176 .../org/apache/atlas/hook/AtlasHookTest.java| 12 +- 7 files changed, 180 insertions(+), 401 deletions(-) -- http://git-wip-us.apache.org/repos/asf/atlas/blob/52ef9e7f/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java -- diff --git a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java index 77177b4..8c09d33 100644 --- a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java +++ b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java @@ -18,13 +18,10 @@ package org.apache.atlas.falcon.hook; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.apache.atlas.AtlasConstants; import org.apache.atlas.falcon.bridge.FalconBridge; import org.apache.atlas.falcon.event.FalconEvent; import org.apache.atlas.falcon.publisher.FalconEventPublisher; import org.apache.atlas.hook.AtlasHook; -import org.apache.atlas.kafka.NotificationProvider; 
import org.apache.atlas.model.notification.HookNotification; import org.apache.atlas.v1.model.instance.Referenceable; import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityCreateRequest; @@ -32,17 +29,12 @@ import org.apache.falcon.FalconException; import org.apache.falcon.entity.store.ConfigurationStore; import org.apache.falcon.entity.v0.feed.Feed; import org.apache.falcon.entity.v0.process.Process; -import org.apache.hadoop.util.ShutdownHookManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.net.URISyntaxException; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; /** * Falcon hook sends lineage information to the Atlas Service. @@ -50,27 +42,6 @@ import java.util.concurrent.TimeUnit; public class FalconHook extends AtlasHook implements FalconEventPublisher { private static final Logger LOG = LoggerFactory.getLogger(FalconHook.class); -public static final String CONF_PREFIX = "atlas.hook.falcon."; -private static final String MIN_THREADS = CONF_PREFIX + "minThreads"; -private static final String MAX_THREADS = CONF_PREFIX + "maxThreads"; -private static final String KEEP_ALIVE_TIME = CONF_PREFIX + "keepAliveTime"; -public static final String QUEUE_SIZE = CONF_PREFIX + "queueSize"; -public static final String CONF_SYNC = CONF_PREFIX + "synchronous"; - -public static final String HOOK_NUM_RETRIES = CONF_PREFIX + "numRetries"; - -// wait time determines how long we wait before we exit the jvm on -// shutdown. Pending requests after that will not be sent. 
-private static final int WAIT_TIME = 3; -private static ExecutorService executor; - -private static final int minThreadsDefault = 5; -private static final int maxThreadsDefault = 5; -private static final long keepAliveTimeDefault = 10; -private static final int queueSizeDefault = 1; - -private static boolean sync; - private static ConfigurationStore STORE; private enum Operation { @@ -80,45 +51,11 @@ public class FalconHook extends AtlasHook implements FalconEventPublisher { static { try { -// initialize the async facility to process hook calls. We don't -// want to do this inline since it adds plenty of overhead for the query. -int minThreads = atlasProperties.getInt(MIN_THREADS, minThreadsDefault); -int maxThreads = atlasProperties.getInt(MAX_THREADS, maxThreadsDefault); -long keepAliveTime =